Merge commit 'origin/stable'
[pspp-builds.git] / src / data / procedure.c
index 0741b1cc944ce479b6214aba0933d624cb8611c8..b762214d10e20a33d6bbdf89ac52253a67589453 100644 (file)
@@ -1,20 +1,18 @@
-/* PSPP - computes sample statistics.
-   Copyright (C) 1997-9, 2000, 2006, 2007 Free Software Foundation, Inc.
+/* PSPP - a program for statistical analysis.
+   Copyright (C) 1997-9, 2000, 2006, 2007, 2009 Free Software Foundation, Inc.
 
-   This program is free software; you can redistribute it and/or
-   modify it under the terms of the GNU General Public License as
-   published by the Free Software Foundation; either version 2 of the
-   License, or (at your option) any later version.
+   This program is free software: you can redistribute it and/or modify
+   it under the terms of the GNU General Public License as published by
+   the Free Software Foundation, either version 3 of the License, or
+   (at your option) any later version.
 
-   This program is distributed in the hope that it will be useful, but
-   WITHOUT ANY WARRANTY; without even the implied warranty of
-   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
-   General Public License for more details.
+   This program is distributed in the hope that it will be useful,
+   but WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+   GNU General Public License for more details.
 
    You should have received a copy of the GNU General Public License
-   along with this program; if not, write to the Free Software
-   Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
-   02110-1301, USA. */
+   along with this program.  If not, see <http://www.gnu.org/licenses/>. */
 
 #include <config.h>
 
@@ -24,6 +22,7 @@
 #include <unistd.h>
 
 #include <data/case.h>
+#include <data/case-map.h>
 #include <data/caseinit.h>
 #include <data/casereader.h>
 #include <data/casereader-provider.h>
 #include <data/procedure.h>
 #include <data/transformations.h>
 #include <data/variable.h>
-#include <libpspp/alloc.h>
 #include <libpspp/deque.h>
 #include <libpspp/misc.h>
 #include <libpspp/str.h>
 #include <libpspp/taint.h>
+#include <libpspp/i18n.h>
+
+#include "minmax.h"
+#include "xalloc.h"
 
 struct dataset {
   /* Cases are read from source,
@@ -56,12 +58,10 @@ struct dataset {
   struct trns_chain *temporary_trns_chain;
   struct dictionary *dict;
 
-  /* Callback which occurs when a procedure provides a new source for
-     the dataset */
-  replace_source_callback *replace_source ;
-
-  /* Callback which occurs whenever the DICT is replaced by a new one */
-  replace_dictionary_callback *replace_dict;
+  /* Callback which occurs whenever the transformation chain(s) have
+     been modified */
+  transformation_change_callback_func *xform_callback;
+  void *xform_callback_aux;
 
   /* If true, cases are discarded instead of being written to
      sink. */
@@ -71,9 +71,9 @@ struct dataset {
      added to. */
   struct trns_chain *cur_trns_chain;
 
-  /* The compactor used to compact a case, if necessary;
+  /* The case map used to compact a case, if necessary;
      otherwise a null pointer. */
-  struct dict_compactor *compactor;
+  struct case_map *compactor;
 
   /* Time at which proc was last invoked. */
   time_t last_proc_invocation;
@@ -81,7 +81,7 @@ struct dataset {
   /* Cases just before ("lagging") the current one. */
   int n_lag;                   /* Number of cases to lag. */
   struct deque lag;             /* Deque of lagged cases. */
-  struct ccase *lag_cases;      /* Lagged cases managed by deque. */
+  struct ccase **lag_cases;     /* Lagged cases managed by deque. */
 
   /* Procedure data. */
   enum
@@ -92,8 +92,12 @@ struct dataset {
                                    but proc_commit not yet called. */
     }
   proc_state;
-  size_t cases_written;       /* Cases output so far. */
+  casenumber cases_written;       /* Cases output so far. */
   bool ok;                    /* Error status. */
+
+  void (*callback) (void *); /* Callback for when the dataset changes */
+  void *cb_data;
+
 }; /* struct dataset */
 
 
@@ -101,9 +105,24 @@ static void add_case_limit_trns (struct dataset *ds);
 static void add_filter_trns (struct dataset *ds);
 
 static void update_last_proc_invocation (struct dataset *ds);
+
+static void
+dataset_set_unsaved (const struct dataset *ds)
+{
+  if (ds->callback) ds->callback (ds->cb_data);
+}
+
 \f
 /* Public functions. */
 
+void
+dataset_set_callback (struct dataset *ds, void (*cb) (void *), void *cb_data)
+{
+  ds->callback = cb;
+  ds->cb_data = cb_data;
+}
+
+
 /* Returns the last time the data was read. */
 time_t
 time_of_last_procedure (struct dataset *ds)
@@ -139,7 +158,7 @@ proc_execute (struct dataset *ds)
   return proc_commit (ds) && ok;
 }
 
-static struct casereader_class proc_casereader_class;
+static const struct casereader_class proc_casereader_class;
 
 /* Opens dataset DS for reading cases with proc_read.
    proc_commit must be called when done. */
@@ -166,11 +185,21 @@ proc_open (struct dataset *ds)
   /* Prepare sink. */
   if (!ds->discard_output)
     {
-      ds->compactor = (dict_compacting_would_shrink (ds->permanent_dict)
-                       ? dict_make_compactor (ds->permanent_dict)
-                       : NULL);
-      ds->sink = autopaging_writer_create (dict_get_compacted_value_cnt (
-                                             ds->permanent_dict));
+      struct dictionary *pd = ds->permanent_dict;
+      size_t compacted_value_cnt = dict_count_values (pd, 1u << DC_SCRATCH);
+      if (compacted_value_cnt < dict_get_next_value_idx (pd))
+        {
+          struct caseproto *compacted_proto;
+          compacted_proto = dict_get_compacted_proto (pd, 1u << DC_SCRATCH);
+          ds->compactor = case_map_to_compact_dict (pd, 1u << DC_SCRATCH);
+          ds->sink = autopaging_writer_create (compacted_proto);
+          caseproto_unref (compacted_proto);
+        }
+      else
+        {
+          ds->compactor = NULL;
+          ds->sink = autopaging_writer_create (dict_get_proto (pd));
+        }
     }
   else
     {
@@ -188,8 +217,7 @@ proc_open (struct dataset *ds)
   /* FIXME: use taint in dataset in place of `ok'? */
   /* FIXME: for trivial cases we can just return a clone of
      ds->source? */
-  return casereader_create_sequential (NULL,
-                                       dict_get_next_value_idx (ds->dict),
+  return casereader_create_sequential (NULL, dict_get_proto (ds->dict),
                                        CASENUMBER_MAX,
                                        &proc_casereader_class, ds);
 }
@@ -203,77 +231,63 @@ proc_is_open (const struct dataset *ds)
 }
 
 /* "read" function for procedure casereader. */
-static bool
-proc_casereader_read (struct casereader *reader UNUSED, void *ds_,
-                      struct ccase *c)
+static struct ccase *
+proc_casereader_read (struct casereader *reader UNUSED, void *ds_)
 {
   struct dataset *ds = ds_;
   enum trns_result retval = TRNS_DROP_CASE;
+  struct ccase *c;
 
   assert (ds->proc_state == PROC_OPEN);
-  for (;;)
+  for (; ; case_unref (c))
     {
-      size_t case_nr;
+      casenumber case_nr;
 
       assert (retval == TRNS_DROP_CASE || retval == TRNS_ERROR);
       if (retval == TRNS_ERROR)
         ds->ok = false;
       if (!ds->ok)
-        return false;
+        return NULL;
 
       /* Read a case from source. */
-      if (!casereader_read (ds->source, c))
-        return false;
-      case_resize (c, dict_get_next_value_idx (ds->dict));
+      c = casereader_read (ds->source);
+      if (c == NULL)
+        return NULL;
+      c = case_unshare_and_resize (c, dict_get_proto (ds->dict));
       caseinit_init_vars (ds->caseinit, c);
 
       /* Execute permanent transformations.  */
       case_nr = ds->cases_written + 1;
       retval = trns_chain_execute (ds->permanent_trns_chain, TRNS_CONTINUE,
-                                   c, &case_nr);
+                                   &c, case_nr);
       caseinit_update_left_vars (ds->caseinit, c);
       if (retval != TRNS_CONTINUE)
-        {
-          case_destroy (c);
-          continue;
-        }
+        continue;
 
       /* Write case to collection of lagged cases. */
       if (ds->n_lag > 0)
         {
           while (deque_count (&ds->lag) >= ds->n_lag)
-            case_destroy (&ds->lag_cases[deque_pop_back (&ds->lag)]);
-          case_clone (&ds->lag_cases[deque_push_front (&ds->lag)], c);
+            case_unref (ds->lag_cases[deque_pop_back (&ds->lag)]);
+          ds->lag_cases[deque_push_front (&ds->lag)] = case_ref (c);
         }
 
       /* Write case to replacement active file. */
       ds->cases_written++;
       if (ds->sink != NULL)
-        {
-          struct ccase tmp;
-          if (ds->compactor != NULL)
-            {
-              case_create (&tmp, dict_get_compacted_value_cnt (ds->dict));
-              dict_compactor_compact (ds->compactor, &tmp, c);
-            }
-          else
-            case_clone (&tmp, c);
-          casewriter_write (ds->sink, &tmp);
-        }
+        casewriter_write (ds->sink,
+                          case_map_execute (ds->compactor, case_ref (c)));
 
       /* Execute temporary transformations. */
       if (ds->temporary_trns_chain != NULL)
         {
           retval = trns_chain_execute (ds->temporary_trns_chain, TRNS_CONTINUE,
-                                       c, &ds->cases_written);
+                                       &c, ds->cases_written);
           if (retval != TRNS_CONTINUE)
-            {
-              case_destroy (c);
-              continue;
-            }
+            continue;
         }
 
-      return true;
+      return c;
     }
 }
 
@@ -282,13 +296,13 @@ static void
 proc_casereader_destroy (struct casereader *reader, void *ds_)
 {
   struct dataset *ds = ds_;
-  struct ccase c;
+  struct ccase *c;
 
   /* Make sure transformations happen for every input case, in
      case they have side effects, and ensure that the replacement
      active file gets all the cases it should. */
-  while (casereader_read (reader, &c))
-    case_destroy (&c);
+  while ((c = casereader_read (reader)) != NULL)
+    case_unref (c);
 
   ds->proc_state = PROC_CLOSED;
   ds->ok = casereader_destroy (ds->source) && ds->ok;
@@ -307,9 +321,11 @@ proc_commit (struct dataset *ds)
   assert (ds->proc_state == PROC_CLOSED);
   ds->proc_state = PROC_COMMITTED;
 
+  dataset_set_unsaved (ds);
+
   /* Free memory for lagged cases. */
   while (!deque_is_empty (&ds->lag))
-    case_destroy (&ds->lag_cases[deque_pop_back (&ds->lag)]);
+    case_unref (ds->lag_cases[deque_pop_back (&ds->lag)]);
   free (ds->lag_cases);
 
   /* Dictionary from before TEMPORARY becomes permanent. */
@@ -320,9 +336,11 @@ proc_commit (struct dataset *ds)
       /* Finish compacting. */
       if (ds->compactor != NULL)
         {
-          dict_compactor_destroy (ds->compactor);
-          dict_compact_values (ds->dict);
+          case_map_destroy (ds->compactor);
           ds->compactor = NULL;
+
+          dict_delete_scratch_vars (ds->dict);
+          dict_compact_values (ds->dict);
         }
 
       /* Old data sink becomes new data source. */
@@ -335,7 +353,6 @@ proc_commit (struct dataset *ds)
       ds->discard_output = false;
     }
   ds->sink = NULL;
-  if ( ds->replace_source) ds->replace_source (ds->source);
 
   caseinit_clear (ds->caseinit);
   caseinit_mark_as_preinited (ds->caseinit, ds->dict);
@@ -346,7 +363,7 @@ proc_commit (struct dataset *ds)
 }
 
 /* Casereader class for procedure execution. */
-static struct casereader_class proc_casereader_class =
+static const struct casereader_class proc_casereader_class =
   {
     proc_casereader_read,
     proc_casereader_destroy,
@@ -363,14 +380,14 @@ update_last_proc_invocation (struct dataset *ds)
 \f
 /* Returns a pointer to the lagged case from N_BEFORE cases before the
    current one, or NULL if there haven't been that many cases yet. */
-struct ccase *
+const struct ccase *
 lagged_case (const struct dataset *ds, int n_before)
 {
   assert (n_before >= 1);
   assert (n_before <= ds->n_lag);
 
   if (n_before <= deque_count (&ds->lag))
-    return &ds->lag_cases[deque_front (&ds->lag, n_before - 1)];
+    return ds->lag_cases[deque_front (&ds->lag, n_before - 1)];
   else
     return NULL;
 }
@@ -386,6 +403,10 @@ proc_capture_transformations (struct dataset *ds)
   assert (ds->temporary_trns_chain == NULL);
   chain = ds->permanent_trns_chain;
   ds->cur_trns_chain = ds->permanent_trns_chain = trns_chain_create ();
+
+  if ( ds->xform_callback)
+    ds->xform_callback (false, ds->xform_callback_aux);
+
   return chain;
 }
 
@@ -396,6 +417,8 @@ void
 add_transformation (struct dataset *ds, trns_proc_func *proc, trns_free_func *free, void *aux)
 {
   trns_chain_append (ds->cur_trns_chain, NULL, proc, free, aux);
+  if ( ds->xform_callback)
+    ds->xform_callback (true, ds->xform_callback_aux);
 }
 
 /* Adds a transformation that processes a case with PROC and
@@ -410,6 +433,9 @@ add_transformation_with_finalizer (struct dataset *ds,
                                    trns_free_func *free, void *aux)
 {
   trns_chain_append (ds->cur_trns_chain, finalize, proc, free, aux);
+
+  if ( ds->xform_callback)
+    ds->xform_callback (true, ds->xform_callback_aux);
 }
 
 /* Returns the index of the next transformation.
@@ -444,6 +470,9 @@ proc_start_temporary_transformations (struct dataset *ds)
 
       trns_chain_finalize (ds->permanent_trns_chain);
       ds->temporary_trns_chain = ds->cur_trns_chain = trns_chain_create ();
+
+      if ( ds->xform_callback)
+       ds->xform_callback (true, ds->xform_callback_aux);
     }
 }
 
@@ -480,11 +509,14 @@ proc_cancel_temporary_transformations (struct dataset *ds)
       dict_destroy (ds->dict);
       ds->dict = ds->permanent_dict;
       ds->permanent_dict = NULL;
-      if (ds->replace_dict) ds->replace_dict (ds->dict);
 
       trns_chain_destroy (ds->temporary_trns_chain);
       ds->temporary_trns_chain = NULL;
 
+      if ( ds->xform_callback)
+       ds->xform_callback (!trns_chain_is_empty (ds->permanent_trns_chain),
+                           ds->xform_callback_aux);
+
       return true;
     }
   else
@@ -502,23 +534,46 @@ proc_cancel_all_transformations (struct dataset *ds)
   ok = trns_chain_destroy (ds->temporary_trns_chain) && ok;
   ds->permanent_trns_chain = ds->cur_trns_chain = trns_chain_create ();
   ds->temporary_trns_chain = NULL;
+  if ( ds->xform_callback)
+    ds->xform_callback (false, ds->xform_callback_aux);
+
   return ok;
 }
 \f
+
+static void
+dict_callback (struct dictionary *d UNUSED, void *ds_)
+{
+  struct dataset *ds = ds_;
+  dataset_set_unsaved (ds);
+}
+
 /* Initializes procedure handling. */
 struct dataset *
-create_dataset (replace_source_callback *rps,
-               replace_dictionary_callback *rds)
+create_dataset (void)
 {
   struct dataset *ds = xzalloc (sizeof(*ds));
   ds->dict = dict_create ();
+
+  dict_set_change_callback (ds->dict, dict_callback, ds);
+
+  dict_set_encoding (ds->dict, get_default_encoding ());
+
   ds->caseinit = caseinit_create ();
-  ds->replace_source = rps;
-  ds->replace_dict = rds;
   proc_cancel_all_transformations (ds);
   return ds;
 }
 
+
+void
+dataset_add_transform_change_callback (struct dataset *ds,
+                                      transformation_change_callback_func *cb,
+                                      void *aux)
+{
+  ds->xform_callback = cb;
+  ds->xform_callback_aux = aux;
+}
+
 /* Finishes up procedure handling. */
 void
 destroy_dataset (struct dataset *ds)
@@ -527,6 +582,9 @@ destroy_dataset (struct dataset *ds)
   dict_destroy (ds->dict);
   caseinit_destroy (ds->caseinit);
   trns_chain_destroy (ds->permanent_trns_chain);
+
+  if ( ds->xform_callback)
+    ds->xform_callback (false, ds->xform_callback_aux);
   free (ds);
 }
 
@@ -552,7 +610,6 @@ proc_discard_active_file (struct dataset *ds)
 
   casereader_destroy (ds->source);
   ds->source = NULL;
-  if ( ds->replace_source) ds->replace_source (NULL);
 
   proc_cancel_all_transformations (ds);
 }
@@ -571,7 +628,7 @@ proc_set_active_file (struct dataset *ds,
 
   dict_destroy (ds->dict);
   ds->dict = dict;
-  if ( ds->replace_dict) ds->replace_dict (dict);
+  dict_set_change_callback (ds->dict, dict_callback, ds);
 
   proc_set_active_file_data (ds, source);
 }
@@ -583,7 +640,6 @@ proc_set_active_file_data (struct dataset *ds, struct casereader *reader)
 {
   casereader_destroy (ds->source);
   ds->source = reader;
-  if (ds->replace_source) ds->replace_source (reader);
 
   caseinit_clear (ds->caseinit);
   caseinit_mark_as_preinited (ds->caseinit, ds->dict);
@@ -599,6 +655,17 @@ proc_has_active_file (const struct dataset *ds)
   return ds->source != NULL;
 }
 
+/* Returns the active file data source from DS, or a null pointer
+   if DS has no data source, and removes it from DS. */
+struct casereader *
+proc_extract_active_file_data (struct dataset *ds)
+{
+  struct casereader *reader = ds->source;
+  ds->source = NULL;
+
+  return reader;
+}
+
 /* Checks whether DS has a corrupted active file.  If so,
    discards it and returns false.  If not, returns true without
    doing anything. */
@@ -630,10 +697,10 @@ static trns_free_func case_limit_trns_free;
 static void
 add_case_limit_trns (struct dataset *ds)
 {
-  size_t case_limit = dict_get_case_limit (ds->dict);
+  casenumber case_limit = dict_get_case_limit (ds->dict);
   if (case_limit != 0)
     {
-      size_t *cases_remaining = xmalloc (sizeof *cases_remaining);
+      casenumber *cases_remaining = xmalloc (sizeof *cases_remaining);
       *cases_remaining = case_limit;
       add_transformation (ds, case_limit_trns_proc, case_limit_trns_free,
                           cases_remaining);
@@ -645,7 +712,7 @@ add_case_limit_trns (struct dataset *ds)
    *CASES_REMAINING. */
 static int
 case_limit_trns_proc (void *cases_remaining_,
-                      struct ccase *c UNUSED, casenumber case_nr UNUSED)
+                      struct ccase **c UNUSED, casenumber case_nr UNUSED)
 {
   size_t *cases_remaining = cases_remaining_;
   if (*cases_remaining > 0)
@@ -684,11 +751,11 @@ add_filter_trns (struct dataset *ds)
 /* FILTER transformation. */
 static int
 filter_trns_proc (void *filter_var_,
-                  struct ccase *c UNUSED, casenumber case_nr UNUSED)
+                  struct ccase **c UNUSED, casenumber case_nr UNUSED)
 
 {
   struct variable *filter_var = filter_var_;
-  double f = case_num (c, filter_var);
+  double f = case_num (*c, filter_var);
   return (f != 0.0 && !var_is_num_missing (filter_var, f, MV_ANY)
           ? TRNS_CONTINUE : TRNS_DROP_CASE);
 }
@@ -700,6 +767,12 @@ dataset_dict (const struct dataset *ds)
   return ds->dict;
 }
 
+const struct casereader *
+dataset_source (const struct dataset *ds)
+{
+  return ds->source;
+}
+
 void
 dataset_need_lag (struct dataset *ds, int n_before)
 {