dataset: Replace 'compactor' by a translating casewriter.
authorBen Pfaff <blp@cs.stanford.edu>
Sun, 5 Mar 2023 19:08:00 +0000 (11:08 -0800)
committerBen Pfaff <blp@cs.stanford.edu>
Sun, 5 Mar 2023 19:26:29 +0000 (11:26 -0800)
src/data/dataset.c

index 72dfb8b41f54db04f27e790c84b751a3b838856e..0e88c2c18681d686e58a373cb67ef301b22f86fa 100644 (file)
@@ -83,10 +83,6 @@ struct dataset {
      sink. */
   bool discard_output;
 
      sink. */
   bool discard_output;
 
-  /* The case map used to compact a case, if necessary;
-     otherwise a null pointer. */
-  struct case_map *compactor;
-
   /* Time at which proc was last invoked. */
   time_t last_proc_invocation;
 
   /* Time at which proc was last invoked. */
   time_t last_proc_invocation;
 
@@ -499,16 +495,14 @@ proc_open_filtering (struct dataset *ds, bool filter)
       struct dictionary *pd = dict_clone (ds->permanent_dict);
       struct case_map_stage *stage = case_map_stage_create (pd);
       dict_delete_scratch_vars (pd);
       struct dictionary *pd = dict_clone (ds->permanent_dict);
       struct case_map_stage *stage = case_map_stage_create (pd);
       dict_delete_scratch_vars (pd);
-      ds->compactor = case_map_stage_get_case_map (stage);
+      ds->sink = case_map_create_output_translator (
+        case_map_stage_get_case_map (stage),
+        autopaging_writer_create (dict_get_proto (pd)));
       case_map_stage_destroy (stage);
       case_map_stage_destroy (stage);
-      ds->sink = autopaging_writer_create (dict_get_proto (pd));
       dict_unref (pd);
     }
   else
       dict_unref (pd);
     }
   else
-    {
-      ds->compactor = NULL;
-      ds->sink = NULL;
-    }
+    ds->sink = NULL;
 
   /* Allocate memory for lagged cases. */
   ds->lag_cases = deque_init (&ds->lag, ds->n_lag, sizeof *ds->lag_cases);
 
   /* Allocate memory for lagged cases. */
   ds->lag_cases = deque_init (&ds->lag, ds->n_lag, sizeof *ds->lag_cases);
@@ -596,8 +590,7 @@ proc_casereader_read (struct casereader *reader UNUSED, void *ds_)
         {
           if (ds->order_var)
             *case_num_rw (c, ds->order_var) = case_nr;
         {
           if (ds->order_var)
             *case_num_rw (c, ds->order_var) = case_nr;
-          casewriter_write (ds->sink,
-                            case_map_execute (ds->compactor, case_ref (c)));
+          casewriter_write (ds->sink, case_ref (c));
         }
 
       /* Execute temporary transformations. */
         }
 
       /* Execute temporary transformations. */
@@ -664,14 +657,7 @@ proc_commit (struct dataset *ds)
 
   if (!ds->discard_output)
     {
 
   if (!ds->discard_output)
     {
-      /* Finish compacting. */
-      if (ds->compactor != NULL)
-        {
-          case_map_destroy (ds->compactor);
-          ds->compactor = NULL;
-
-          dict_delete_scratch_vars (ds->dict);
-        }
+      dict_delete_scratch_vars (ds->dict);
 
       /* Old data sink becomes new data source. */
       if (ds->sink != NULL)
 
       /* Old data sink becomes new data source. */
       if (ds->sink != NULL)