case-map: Make creating a case_map destroy the stage.
[pspp] / src / data / dataset.c
index e11f173948cc0ec1293ffa5f6800a8702e69e074..2af19bd756890c1490406d396d039c503abc7728 100644 (file)
@@ -36,6 +36,8 @@
 #include "data/transformations.h"
 #include "data/variable.h"
 #include "libpspp/deque.h"
+#include "libpspp/hash-functions.h"
+#include "libpspp/hmap.h"
 #include "libpspp/misc.h"
 #include "libpspp/str.h"
 #include "libpspp/taint.h"
@@ -66,6 +68,7 @@ struct dataset {
   struct caseinit *caseinit;
   struct trns_chain permanent_trns_chain;
   struct dictionary *permanent_dict;
+  struct variable *order_var;
   struct casewriter *sink;
   struct trns_chain temporary_trns_chain;
   bool temporary;
@@ -80,10 +83,6 @@ struct dataset {
      sink. */
   bool discard_output;
 
-  /* The case map used to compact a case, if necessary;
-     otherwise a null pointer. */
-  struct case_map *compactor;
-
   /* Time at which proc was last invoked. */
   time_t last_proc_invocation;
 
@@ -116,6 +115,8 @@ static void dataset_changed__ (struct dataset *);
 static void dataset_transformations_changed__ (struct dataset *,
                                                bool non_empty);
 
+static void add_measurement_level_trns (struct dataset *, struct dictionary *);
+static void cancel_measurement_level_trns (struct trns_chain *);
 static void add_case_limit_trns (struct dataset *ds);
 static void add_filter_trns (struct dataset *ds);
 
@@ -177,6 +178,7 @@ dataset_clone (struct dataset *old, const char *name)
   assert (old->sink == NULL);
   assert (!old->temporary);
   assert (!old->temporary_trns_chain.n);
+  assert (!old->n_stack);
 
   new = xzalloc (sizeof *new);
   new->name = xstrdup (name);
@@ -342,6 +344,48 @@ dataset_steal_source (struct dataset *ds)
   return reader;
 }
 
+void
+dataset_delete_vars (struct dataset *ds, struct variable **vars, size_t n)
+{
+  assert (!proc_in_temporary_transformations (ds));
+  assert (!proc_has_transformations (ds));
+  assert (n < dict_get_n_vars (ds->dict));
+
+  caseinit_mark_for_init (ds->caseinit, ds->dict);
+  ds->source = caseinit_translate_casereader_to_init_vars (
+    ds->caseinit, dict_get_proto (ds->dict), ds->source);
+  caseinit_clear (ds->caseinit);
+  caseinit_mark_as_preinited (ds->caseinit, ds->dict);
+
+  struct case_map_stage *stage = case_map_stage_create (ds->dict);
+  dict_delete_vars (ds->dict, vars, n);
+  ds->source = case_map_create_input_translator (
+    case_map_stage_to_case_map (stage), ds->source);
+  caseinit_clear (ds->caseinit);
+  caseinit_mark_as_preinited (ds->caseinit, ds->dict);
+}
+
+void
+dataset_reorder_vars (struct dataset *ds, struct variable **vars, size_t n)
+{
+  assert (!proc_in_temporary_transformations (ds));
+  assert (!proc_has_transformations (ds));
+  assert (n <= dict_get_n_vars (ds->dict));
+
+  caseinit_mark_for_init (ds->caseinit, ds->dict);
+  ds->source = caseinit_translate_casereader_to_init_vars (
+    ds->caseinit, dict_get_proto (ds->dict), ds->source);
+  caseinit_clear (ds->caseinit);
+  caseinit_mark_as_preinited (ds->caseinit, ds->dict);
+
+  struct case_map_stage *stage = case_map_stage_create (ds->dict);
+  dict_reorder_vars (ds->dict, vars, n);
+  ds->source = case_map_create_input_translator (
+    case_map_stage_to_case_map (stage), ds->source);
+  caseinit_clear (ds->caseinit);
+  caseinit_mark_as_preinited (ds->caseinit, ds->dict);
+}
+
 /* Returns a number unique to DS.  It can be used to distinguish one dataset
    from any other within a given program run, even datasets that do not exist
    at the same time. */
@@ -421,17 +465,22 @@ proc_open_filtering (struct dataset *ds, bool filter)
 {
   struct casereader *reader;
 
+  assert (ds->n_stack == 0);
   assert (ds->source != NULL);
   assert (ds->proc_state == PROC_COMMITTED);
 
   update_last_proc_invocation (ds);
 
   caseinit_mark_for_init (ds->caseinit, ds->dict);
+  ds->source = caseinit_translate_casereader_to_init_vars (
+    ds->caseinit, dict_get_proto (ds->dict), ds->source);
 
   /* Finish up the collection of transformations. */
   add_case_limit_trns (ds);
   if (filter)
     add_filter_trns (ds);
+  if (!proc_in_temporary_transformations (ds))
+    add_measurement_level_trns (ds, ds->dict);
 
   /* Make permanent_dict refer to the dictionary right before
      data reaches the sink. */
@@ -441,27 +490,16 @@ proc_open_filtering (struct dataset *ds, bool filter)
   /* Prepare sink. */
   if (!ds->discard_output)
     {
-      struct dictionary *pd = ds->permanent_dict;
-      size_t compacted_n_values = dict_count_values (pd, 1u << DC_SCRATCH);
-      if (compacted_n_values < dict_get_next_value_idx (pd))
-        {
-          struct caseproto *compacted_proto;
-          compacted_proto = dict_get_compacted_proto (pd, 1u << DC_SCRATCH);
-          ds->compactor = case_map_to_compact_dict (pd, 1u << DC_SCRATCH);
-          ds->sink = autopaging_writer_create (compacted_proto);
-          caseproto_unref (compacted_proto);
-        }
-      else
-        {
-          ds->compactor = NULL;
-          ds->sink = autopaging_writer_create (dict_get_proto (pd));
-        }
+      struct dictionary *pd = dict_clone (ds->permanent_dict);
+      struct case_map_stage *stage = case_map_stage_create (pd);
+      dict_delete_scratch_vars (pd);
+      ds->sink = case_map_create_output_translator (
+        case_map_stage_to_case_map (stage),
+        autopaging_writer_create (dict_get_proto (pd)));
+      dict_unref (pd);
     }
   else
-    {
-      ds->compactor = NULL;
-      ds->sink = NULL;
-    }
+    ds->sink = NULL;
 
   /* Allocate memory for lagged cases. */
   ds->lag_cases = deque_init (&ds->lag, ds->n_lag, sizeof *ds->lag_cases);
@@ -526,12 +564,12 @@ proc_casereader_read (struct casereader *reader UNUSED, void *ds_)
       if (c == NULL)
         return NULL;
       c = case_unshare_and_resize (c, dict_get_proto (ds->dict));
-      caseinit_init_vars (ds->caseinit, c);
+      caseinit_restore_left_vars (ds->caseinit, c);
 
       /* Execute permanent transformations.  */
       casenumber case_nr = ds->cases_written + 1;
       retval = trns_chain_execute (&ds->permanent_trns_chain, case_nr, &c);
-      caseinit_update_left_vars (ds->caseinit, c);
+      caseinit_save_left_vars (ds->caseinit, c);
       if (retval != TRNS_CONTINUE)
         continue;
 
@@ -546,8 +584,11 @@ proc_casereader_read (struct casereader *reader UNUSED, void *ds_)
       /* Write case to replacement dataset. */
       ds->cases_written++;
       if (ds->sink != NULL)
-        casewriter_write (ds->sink,
-                          case_map_execute (ds->compactor, case_ref (c)));
+        {
+          if (ds->order_var)
+            *case_num_rw (c, ds->order_var) = case_nr;
+          casewriter_write (ds->sink, case_ref (c));
+        }
 
       /* Execute temporary transformations. */
       if (ds->temporary_trns_chain.n)
@@ -609,18 +650,11 @@ proc_commit (struct dataset *ds)
 
   /* Dictionary from before TEMPORARY becomes permanent. */
   proc_cancel_temporary_transformations (ds);
+  bool ok = proc_cancel_all_transformations (ds) && ds->ok;
 
   if (!ds->discard_output)
     {
-      /* Finish compacting. */
-      if (ds->compactor != NULL)
-        {
-          case_map_destroy (ds->compactor);
-          ds->compactor = NULL;
-
-          dict_delete_scratch_vars (ds->dict);
-          dict_compact_values (ds->dict);
-        }
+      dict_delete_scratch_vars (ds->dict);
 
       /* Old data sink becomes new data source. */
       if (ds->sink != NULL)
@@ -638,7 +672,8 @@ proc_commit (struct dataset *ds)
 
   dict_clear_vectors (ds->dict);
   ds->permanent_dict = NULL;
-  return proc_cancel_all_transformations (ds) && ds->ok;
+  ds->order_var = NULL;
+  return ok;
 }
 
 /* Casereader class for procedure execution. */
@@ -699,11 +734,13 @@ proc_in_temporary_transformations (const struct dataset *ds)
 void
 proc_start_temporary_transformations (struct dataset *ds)
 {
+  assert (!ds->n_stack);
   if (!proc_in_temporary_transformations (ds))
     {
       add_case_limit_trns (ds);
 
       ds->permanent_dict = dict_clone (ds->dict);
+      add_measurement_level_trns (ds, ds->permanent_dict);
 
       ds->temporary = true;
       dataset_transformations_changed__ (ds, true);
@@ -723,6 +760,7 @@ proc_make_temporary_transformations_permanent (struct dataset *ds)
 {
   if (proc_in_temporary_transformations (ds))
     {
+      cancel_measurement_level_trns (&ds->permanent_trns_chain);
       trns_chain_splice (&ds->permanent_trns_chain, &ds->temporary_trns_chain);
 
       ds->temporary = false;
@@ -744,12 +782,12 @@ proc_cancel_temporary_transformations (struct dataset *ds)
 {
   if (proc_in_temporary_transformations (ds))
     {
+      trns_chain_clear (&ds->temporary_trns_chain);
+
       dict_unref (ds->dict);
       ds->dict = ds->permanent_dict;
       ds->permanent_dict = NULL;
 
-      trns_chain_clear (&ds->temporary_trns_chain);
-
       dataset_transformations_changed__ (ds, ds->permanent_trns_chain.n != 0);
       return true;
     }
@@ -791,6 +829,12 @@ proc_pop_transformations (struct dataset *ds, struct trns_chain *chain)
   *chain = ds->stack[--ds->n_stack];
 }
 
+bool
+proc_has_transformations (const struct dataset *ds)
+{
+  return ds->permanent_trns_chain.n || ds->temporary_trns_chain.n;
+}
+
 static enum trns_result
 store_case_num (void *var_, struct ccase **cc, casenumber case_num)
 {
@@ -802,24 +846,26 @@ store_case_num (void *var_, struct ccase **cc, casenumber case_num)
   return TRNS_CONTINUE;
 }
 
-/* Add a variable which we can sort by to get back the original order. */
+/* Add a variable $ORDERING which we can sort by to get back the original order. */
 struct variable *
 add_permanent_ordering_transformation (struct dataset *ds)
 {
-  struct variable *temp_var = dict_create_var_assert (ds->dict, "$ORDER", 0);
-  struct variable *order_var
-    = (proc_in_temporary_transformations (ds)
-       ? dict_clone_var_in_place_assert (ds->permanent_dict, temp_var)
-       : temp_var);
+  struct dictionary *d = ds->permanent_dict ? ds->permanent_dict : ds->dict;
+  struct variable *order_var = dict_create_var_assert (d, "$ORDER", 0);
+  ds->order_var = order_var;
 
-  static const struct trns_class trns_class = {
-    .name = "ordering",
-    .execute = store_case_num
-  };
-  const struct transformation t = { .class = &trns_class, .aux = order_var };
-  trns_chain_append (&ds->permanent_trns_chain, &t);
+  if (ds->permanent_dict)
+    {
+      order_var = dict_create_var_assert (ds->dict, "$ORDER", 0);
+      static const struct trns_class trns_class = {
+        .name = "ordering",
+        .execute = store_case_num
+      };
+      const struct transformation t = { .class = &trns_class, .aux = order_var };
+      trns_chain_prepend (&ds->temporary_trns_chain, &t);
+    }
 
-  return temp_var;
+  return order_var;
 }
 \f
 /* Causes output from the next procedure to be discarded, instead
@@ -910,7 +956,7 @@ filter_trns_proc (void *filter_var_,
 {
   struct variable *filter_var = filter_var_;
   double f = case_num (*c, filter_var);
-  return (f != 0.0 && !var_is_num_missing (filter_var, f, MV_ANY)
+  return (f != 0.0 && !var_is_num_missing (filter_var, f)
           ? TRNS_CONTINUE : TRNS_DROP_CASE);
 }
 
@@ -938,6 +984,256 @@ dataset_need_lag (struct dataset *ds, int n_before)
   ds->n_lag = MAX (ds->n_lag, n_before);
 }
 \f
+/* Measurement guesser, for guessing a measurement level from formats and
+   data. */
+
+struct mg_value
+  {
+    struct hmap_node hmap_node;
+    double value;
+  };
+
+struct mg_var
+  {
+    struct variable *var;
+    struct hmap *values;
+  };
+
+static void
+mg_var_uninit (struct mg_var *mgv)
+{
+  struct mg_value *mgvalue, *next;
+  HMAP_FOR_EACH_SAFE (mgvalue, next, struct mg_value, hmap_node,
+                      mgv->values)
+    {
+      hmap_delete (mgv->values, &mgvalue->hmap_node);
+      free (mgvalue);
+    }
+  hmap_destroy (mgv->values);
+  free (mgv->values);
+}
+
+static enum measure
+mg_var_interpret (const struct mg_var *mgv)
+{
+  size_t n = hmap_count (mgv->values);
+  if (!n)
+    {
+      /* All missing (or no data). */
+      return MEASURE_NOMINAL;
+    }
+
+  const struct mg_value *mgvalue;
+  HMAP_FOR_EACH (mgvalue, struct mg_value, hmap_node,
+                 mgv->values)
+    if (mgvalue->value < 10)
+      return MEASURE_NOMINAL;
+  return MEASURE_SCALE;
+}
+
+static enum measure
+mg_var_add_value (struct mg_var *mgv, double value)
+{
+  if (var_is_num_missing (mgv->var, value))
+    return MEASURE_UNKNOWN;
+  else if (value < 0 || value != floor (value))
+    return MEASURE_SCALE;
+
+  size_t hash = hash_double (value, 0);
+  struct mg_value *mgvalue;
+  HMAP_FOR_EACH_WITH_HASH (mgvalue, struct mg_value, hmap_node,
+                           hash, mgv->values)
+    if (mgvalue->value == value)
+      return MEASURE_UNKNOWN;
+
+  mgvalue = xmalloc (sizeof *mgvalue);
+  mgvalue->value = value;
+  hmap_insert (mgv->values, &mgvalue->hmap_node, hash);
+  if (hmap_count (mgv->values) >= settings_get_scalemin ())
+    return MEASURE_SCALE;
+
+  return MEASURE_UNKNOWN;
+}
+
+struct measure_guesser
+  {
+    struct mg_var *vars;
+    size_t n_vars;
+  };
+
+static struct measure_guesser *
+measure_guesser_create__ (struct dictionary *dict)
+{
+  struct mg_var *mgvs = NULL;
+  size_t n_mgvs = 0;
+  size_t allocated_mgvs = 0;
+
+  for (size_t i = 0; i < dict_get_n_vars (dict); i++)
+    {
+      struct variable *var = dict_get_var (dict, i);
+      if (var_get_measure (var) != MEASURE_UNKNOWN)
+        continue;
+
+      struct fmt_spec f = var_get_print_format (var);
+      enum measure m = var_default_measure_for_format (f.type);
+      if (m != MEASURE_UNKNOWN)
+        {
+          var_set_measure (var, m);
+          continue;
+        }
+
+      if (n_mgvs >= allocated_mgvs)
+        mgvs = x2nrealloc (mgvs, &allocated_mgvs, sizeof *mgvs);
+
+      struct mg_var *mgv = &mgvs[n_mgvs++];
+      *mgv = (struct mg_var) {
+        .var = var,
+        .values = xmalloc (sizeof *mgv->values),
+      };
+      hmap_init (mgv->values);
+    }
+  if (!n_mgvs)
+    return NULL;
+
+  struct measure_guesser *mg = xmalloc (sizeof *mg);
+  *mg = (struct measure_guesser) {
+    .vars = mgvs,
+    .n_vars = n_mgvs,
+  };
+  return mg;
+}
+
+/* Scans through DS's dictionary for variables that have an unknown measurement
+   level.  For those, if the measurement level can be guessed based on the
+   variable's type and format, sets a default.  If that's enough, returns NULL.
+   If any remain whose levels are unknown and can't be guessed that way,
+   creates and returns a structure that the caller should pass to
+   measure_guesser_add_case() or measure_guesser_run() for guessing a
+   measurement level based on the data.  */
+struct measure_guesser *
+measure_guesser_create (struct dataset *ds)
+{
+  return measure_guesser_create__ (dataset_dict (ds));
+}
+
+/* Adds data from case C to MG. */
+static void
+measure_guesser_add_case (struct measure_guesser *mg, const struct ccase *c)
+{
+  for (size_t i = 0; i < mg->n_vars; )
+    {
+      struct mg_var *mgv = &mg->vars[i];
+      double value = case_num (c, mgv->var);
+      enum measure m = mg_var_add_value (mgv, value);
+      if (m != MEASURE_UNKNOWN)
+        {
+          var_set_measure (mgv->var, m);
+
+          mg_var_uninit (mgv);
+          *mgv = mg->vars[--mg->n_vars];
+        }
+      else
+        i++;
+    }
+}
+
+/* Destroys MG. */
+void
+measure_guesser_destroy (struct measure_guesser *mg)
+{
+  if (!mg)
+    return;
+
+  for (size_t i = 0; i < mg->n_vars; i++)
+    {
+      struct mg_var *mgv = &mg->vars[i];
+      var_set_measure (mgv->var, mg_var_interpret (mgv));
+      mg_var_uninit (mgv);
+    }
+  free (mg->vars);
+  free (mg);
+}
+
+/* Adds final measurement levels based on MG, after all the cases have been
+   added. */
+static void
+measure_guesser_commit (struct measure_guesser *mg)
+{
+  for (size_t i = 0; i < mg->n_vars; i++)
+    {
+      struct mg_var *mgv = &mg->vars[i];
+      var_set_measure (mgv->var, mg_var_interpret (mgv));
+    }
+}
+
+/* Passes the cases in READER through MG and uses the data in the cases to set
+   measurement levels for the variables where they were still unknown. */
+void
+measure_guesser_run (struct measure_guesser *mg,
+                     const struct casereader *reader)
+{
+  struct casereader *r = casereader_clone (reader);
+  while (mg->n_vars > 0)
+    {
+      struct ccase *c = casereader_read (r);
+      if (!c)
+        break;
+      measure_guesser_add_case (mg, c);
+      case_unref (c);
+    }
+  casereader_destroy (r);
+
+  measure_guesser_commit (mg);
+}
+\f
+/* A transformation for guessing measurement levels. */
+
+static enum trns_result
+mg_trns_proc (void *mg_, struct ccase **c, casenumber case_nr UNUSED)
+{
+  struct measure_guesser *mg = mg_;
+  measure_guesser_add_case (mg, *c);
+  return TRNS_CONTINUE;
+}
+
+static bool
+mg_trns_free (void *mg_)
+{
+  struct measure_guesser *mg = mg_;
+  measure_guesser_commit (mg);
+  measure_guesser_destroy (mg);
+  return true;
+}
+
+static const struct trns_class mg_trns_class = {
+  .name = "add measurement level",
+  .execute = mg_trns_proc,
+  .destroy = mg_trns_free,
+};
+
+static void
+add_measurement_level_trns (struct dataset *ds, struct dictionary *dict)
+{
+  struct measure_guesser *mg = measure_guesser_create__ (dict);
+  if (mg)
+    add_transformation (ds, &mg_trns_class, mg);
+}
+
+static void
+cancel_measurement_level_trns (struct trns_chain *chain)
+{
+  if (!chain->n)
+    return;
+
+  struct transformation *trns = &chain->xforms[chain->n - 1];
+  if (trns->class != &mg_trns_class)
+    return;
+
+  struct measure_guesser *mg = trns->aux;
+  measure_guesser_destroy (mg);
+  chain->n--;
+}
+\f
 static void
 dataset_changed__ (struct dataset *ds)
 {