case-map: Make creating a case_map destroy the stage.
[pspp] / src / data / dataset.c
index 9503521207c0fd78a5bbbc2fc18a9c55a579da53..2af19bd756890c1490406d396d039c503abc7728 100644 (file)
@@ -68,6 +68,7 @@ struct dataset {
   struct caseinit *caseinit;
   struct trns_chain permanent_trns_chain;
   struct dictionary *permanent_dict;
+  struct variable *order_var;
   struct casewriter *sink;
   struct trns_chain temporary_trns_chain;
   bool temporary;
@@ -82,10 +83,6 @@ struct dataset {
      sink. */
   bool discard_output;
 
-  /* The case map used to compact a case, if necessary;
-     otherwise a null pointer. */
-  struct case_map *compactor;
-
   /* Time at which proc was last invoked. */
   time_t last_proc_invocation;
 
@@ -360,10 +357,31 @@ dataset_delete_vars (struct dataset *ds, struct variable **vars, size_t n)
   caseinit_clear (ds->caseinit);
   caseinit_mark_as_preinited (ds->caseinit, ds->dict);
 
+  struct case_map_stage *stage = case_map_stage_create (ds->dict);
   dict_delete_vars (ds->dict, vars, n);
   ds->source = case_map_create_input_translator (
-    case_map_to_compact_dict (ds->dict, 0), ds->source);
-  dict_compact_values (ds->dict);
+    case_map_stage_to_case_map (stage), ds->source);
+  caseinit_clear (ds->caseinit);
+  caseinit_mark_as_preinited (ds->caseinit, ds->dict);
+}
+
+void
+dataset_reorder_vars (struct dataset *ds, struct variable **vars, size_t n)
+{
+  assert (!proc_in_temporary_transformations (ds));
+  assert (!proc_has_transformations (ds));
+  assert (n <= dict_get_n_vars (ds->dict));
+
+  caseinit_mark_for_init (ds->caseinit, ds->dict);
+  ds->source = caseinit_translate_casereader_to_init_vars (
+    ds->caseinit, dict_get_proto (ds->dict), ds->source);
+  caseinit_clear (ds->caseinit);
+  caseinit_mark_as_preinited (ds->caseinit, ds->dict);
+
+  struct case_map_stage *stage = case_map_stage_create (ds->dict);
+  dict_reorder_vars (ds->dict, vars, n);
+  ds->source = case_map_create_input_translator (
+    case_map_stage_to_case_map (stage), ds->source);
   caseinit_clear (ds->caseinit);
   caseinit_mark_as_preinited (ds->caseinit, ds->dict);
 }
@@ -472,27 +490,16 @@ proc_open_filtering (struct dataset *ds, bool filter)
   /* Prepare sink. */
   if (!ds->discard_output)
     {
-      struct dictionary *pd = ds->permanent_dict;
-      size_t compacted_n_values = dict_count_values (pd, DC_SCRATCH);
-      if (compacted_n_values < dict_get_next_value_idx (pd))
-        {
-          struct caseproto *compacted_proto;
-          compacted_proto = dict_get_compacted_proto (pd, DC_SCRATCH);
-          ds->compactor = case_map_to_compact_dict (pd, DC_SCRATCH);
-          ds->sink = autopaging_writer_create (compacted_proto);
-          caseproto_unref (compacted_proto);
-        }
-      else
-        {
-          ds->compactor = NULL;
-          ds->sink = autopaging_writer_create (dict_get_proto (pd));
-        }
+      struct dictionary *pd = dict_clone (ds->permanent_dict);
+      struct case_map_stage *stage = case_map_stage_create (pd);
+      dict_delete_scratch_vars (pd);
+      ds->sink = case_map_create_output_translator (
+        case_map_stage_to_case_map (stage),
+        autopaging_writer_create (dict_get_proto (pd)));
+      dict_unref (pd);
     }
   else
-    {
-      ds->compactor = NULL;
-      ds->sink = NULL;
-    }
+    ds->sink = NULL;
 
   /* Allocate memory for lagged cases. */
   ds->lag_cases = deque_init (&ds->lag, ds->n_lag, sizeof *ds->lag_cases);
@@ -577,8 +584,11 @@ proc_casereader_read (struct casereader *reader UNUSED, void *ds_)
       /* Write case to replacement dataset. */
       ds->cases_written++;
       if (ds->sink != NULL)
-        casewriter_write (ds->sink,
-                          case_map_execute (ds->compactor, case_ref (c)));
+        {
+          if (ds->order_var)
+            *case_num_rw (c, ds->order_var) = case_nr;
+          casewriter_write (ds->sink, case_ref (c));
+        }
 
       /* Execute temporary transformations. */
       if (ds->temporary_trns_chain.n)
@@ -644,15 +654,7 @@ proc_commit (struct dataset *ds)
 
   if (!ds->discard_output)
     {
-      /* Finish compacting. */
-      if (ds->compactor != NULL)
-        {
-          case_map_destroy (ds->compactor);
-          ds->compactor = NULL;
-
-          dict_delete_scratch_vars (ds->dict);
-          dict_compact_values (ds->dict);
-        }
+      dict_delete_scratch_vars (ds->dict);
 
       /* Old data sink becomes new data source. */
       if (ds->sink != NULL)
@@ -670,6 +672,7 @@ proc_commit (struct dataset *ds)
 
   dict_clear_vectors (ds->dict);
   ds->permanent_dict = NULL;
+  ds->order_var = NULL;
   return ok;
 }
 
@@ -843,24 +846,26 @@ store_case_num (void *var_, struct ccase **cc, casenumber case_num)
   return TRNS_CONTINUE;
 }
 
-/* Add a variable which we can sort by to get back the original order. */
+/* Add a variable $ORDERING which we can sort by to get back the original order. */
 struct variable *
 add_permanent_ordering_transformation (struct dataset *ds)
 {
-  struct variable *temp_var = dict_create_var_assert (ds->dict, "$ORDER", 0);
-  struct variable *order_var
-    = (proc_in_temporary_transformations (ds)
-       ? dict_clone_var_in_place_assert (ds->permanent_dict, temp_var)
-       : temp_var);
+  struct dictionary *d = ds->permanent_dict ? ds->permanent_dict : ds->dict;
+  struct variable *order_var = dict_create_var_assert (d, "$ORDER", 0);
+  ds->order_var = order_var;
 
-  static const struct trns_class trns_class = {
-    .name = "ordering",
-    .execute = store_case_num
-  };
-  const struct transformation t = { .class = &trns_class, .aux = order_var };
-  trns_chain_append (&ds->permanent_trns_chain, &t);
+  if (ds->permanent_dict)
+    {
+      order_var = dict_create_var_assert (ds->dict, "$ORDER", 0);
+      static const struct trns_class trns_class = {
+        .name = "ordering",
+        .execute = store_case_num
+      };
+      const struct transformation t = { .class = &trns_class, .aux = order_var };
+      trns_chain_prepend (&ds->temporary_trns_chain, &t);
+    }
 
-  return temp_var;
+  return order_var;
 }
 \f
 /* Causes output from the next procedure to be discarded, instead