Remove callbacks from struct dataset. Closes patch #6075
[pspp-builds.git] / src / data / procedure.c
index 46a18bb463297fdf735a6e30a2d2207c0dc45e9a..f280b6828a314b20a7e6a3c912fcab9f29da23d5 100644 (file)
@@ -1,20 +1,18 @@
-/* PSPP - computes sample statistics.
+/* PSPP - a program for statistical analysis.
    Copyright (C) 1997-9, 2000, 2006, 2007 Free Software Foundation, Inc.
 
-   This program is free software; you can redistribute it and/or
-   modify it under the terms of the GNU General Public License as
-   published by the Free Software Foundation; either version 2 of the
-   License, or (at your option) any later version.
+   This program is free software: you can redistribute it and/or modify
+   it under the terms of the GNU General Public License as published by
+   the Free Software Foundation, either version 3 of the License, or
+   (at your option) any later version.
 
-   This program is distributed in the hope that it will be useful, but
-   WITHOUT ANY WARRANTY; without even the implied warranty of
-   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
-   General Public License for more details.
+   This program is distributed in the hope that it will be useful,
+   but WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+   GNU General Public License for more details.
 
    You should have received a copy of the GNU General Public License
-   along with this program; if not, write to the Free Software
-   Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
-   02110-1301, USA. */
+   along with this program.  If not, see <http://www.gnu.org/licenses/>. */
 
 #include <config.h>
 
@@ -24,6 +22,7 @@
 #include <unistd.h>
 
 #include <data/case.h>
+#include <data/case-map.h>
 #include <data/caseinit.h>
 #include <data/casereader.h>
 #include <data/casereader-provider.h>
@@ -39,6 +38,7 @@
 #include <libpspp/str.h>
 #include <libpspp/taint.h>
 
+
 struct dataset {
   /* Cases are read from source,
      their transformation variables are initialized,
@@ -56,12 +56,10 @@ struct dataset {
   struct trns_chain *temporary_trns_chain;
   struct dictionary *dict;
 
-  /* Callback which occurs when a procedure provides a new source for
-     the dataset */
-  replace_source_callback *replace_source ;
-
-  /* Callback which occurs whenever the DICT is replaced by a new one */
-  replace_dictionary_callback *replace_dict;
+  /* Callback which occurs whenever the transformation chain(s) have
+     been modified */
+  transformation_change_callback_func *xform_callback;
+  void *xform_callback_aux;
 
   /* If true, cases are discarded instead of being written to
      sink. */
@@ -71,9 +69,9 @@ struct dataset {
      added to. */
   struct trns_chain *cur_trns_chain;
 
-  /* The compactor used to compact a case, if necessary;
+  /* The case map used to compact a case, if necessary;
      otherwise a null pointer. */
-  struct dict_compactor *compactor;
+  struct case_map *compactor;
 
   /* Time at which proc was last invoked. */
   time_t last_proc_invocation;
@@ -84,14 +82,15 @@ struct dataset {
   struct ccase *lag_cases;      /* Lagged cases managed by deque. */
 
   /* Procedure data. */
-  enum 
+  enum
     {
-      PROC_COMMITTED,
-      PROC_OPEN,
-      PROC_CLOSED 
+      PROC_COMMITTED,           /* No procedure in progress. */
+      PROC_OPEN,                /* proc_open called, casereader still open. */
+      PROC_CLOSED               /* casereader from proc_open destroyed,
+                                   but proc_commit not yet called. */
     }
   proc_state;
-  size_t cases_written;       /* Cases output so far. */
+  casenumber cases_written;       /* Cases output so far. */
   bool ok;                    /* Error status. */
 }; /* struct dataset */
 
@@ -163,15 +162,17 @@ proc_open (struct dataset *ds)
     ds->permanent_dict = ds->dict;
 
   /* Prepare sink. */
-  if (!ds->discard_output) 
+  if (!ds->discard_output)
     {
-      ds->compactor = (dict_compacting_would_shrink (ds->permanent_dict)
-                       ? dict_make_compactor (ds->permanent_dict)
+      struct dictionary *pd = ds->permanent_dict;
+      size_t compacted_value_cnt = dict_count_values (pd, 1u << DC_SCRATCH);
+      bool should_compact = compacted_value_cnt < dict_get_next_value_idx (pd);
+      ds->compactor = (should_compact
+                       ? case_map_to_compact_dict (pd, 1u << DC_SCRATCH)
                        : NULL);
-      ds->sink = autopaging_writer_create (dict_get_compacted_value_cnt (
-                                             ds->permanent_dict)); 
+      ds->sink = autopaging_writer_create (compacted_value_cnt);
     }
-  else 
+  else
     {
       ds->compactor = NULL;
       ds->sink = NULL;
@@ -193,29 +194,26 @@ proc_open (struct dataset *ds)
                                        &proc_casereader_class, ds);
 }
 
+/* Returns true if a procedure is in progress, that is, if
+   proc_open has been called but proc_commit has not. */
 bool
-proc_is_open (const struct dataset *ds) 
+proc_is_open (const struct dataset *ds)
 {
   return ds->proc_state != PROC_COMMITTED;
 }
 
-/* Reads the next case from dataset DS, which must have been
-   opened for reading with proc_open.
-   Returns true if successful, in which case a pointer to the
-   case is stored in *C.
-   Return false at end of file or if a read error occurs.  In
-   this case a null pointer is stored in *C. */
+/* "read" function for procedure casereader. */
 static bool
 proc_casereader_read (struct casereader *reader UNUSED, void *ds_,
-                      struct ccase *c) 
+                      struct ccase *c)
 {
   struct dataset *ds = ds_;
   enum trns_result retval = TRNS_DROP_CASE;
 
   assert (ds->proc_state == PROC_OPEN);
-  for (;;) 
+  for (;;)
     {
-      size_t case_nr;
+      casenumber case_nr;
 
       assert (retval == TRNS_DROP_CASE || retval == TRNS_ERROR);
       if (retval == TRNS_ERROR)
@@ -227,22 +225,21 @@ proc_casereader_read (struct casereader *reader UNUSED, void *ds_,
       if (!casereader_read (ds->source, c))
         return false;
       case_resize (c, dict_get_next_value_idx (ds->dict));
-      caseinit_init_reinit_vars (ds->caseinit, c);
-      caseinit_init_left_vars (ds->caseinit, c);
+      caseinit_init_vars (ds->caseinit, c);
 
       /* Execute permanent transformations.  */
       case_nr = ds->cases_written + 1;
       retval = trns_chain_execute (ds->permanent_trns_chain, TRNS_CONTINUE,
-                                   c, &case_nr);
+                                   c, case_nr);
       caseinit_update_left_vars (ds->caseinit, c);
-      if (retval != TRNS_CONTINUE) 
+      if (retval != TRNS_CONTINUE)
         {
           case_destroy (c);
-          continue; 
+          continue;
         }
-  
+
       /* Write case to collection of lagged cases. */
-      if (ds->n_lag > 0) 
+      if (ds->n_lag > 0)
         {
           while (deque_count (&ds->lag) >= ds->n_lag)
             case_destroy (&ds->lag_cases[deque_pop_back (&ds->lag)]);
@@ -251,14 +248,11 @@ proc_casereader_read (struct casereader *reader UNUSED, void *ds_,
 
       /* Write case to replacement active file. */
       ds->cases_written++;
-      if (ds->sink != NULL) 
+      if (ds->sink != NULL)
         {
           struct ccase tmp;
-          if (ds->compactor != NULL) 
-            {
-              case_create (&tmp, dict_get_compacted_value_cnt (ds->dict));
-              dict_compactor_compact (ds->compactor, &tmp, c);
-            }
+          if (ds->compactor != NULL)
+            case_map_execute (ds->compactor, c, &tmp);
           else
             case_clone (&tmp, c);
           casewriter_write (ds->sink, &tmp);
@@ -268,7 +262,7 @@ proc_casereader_read (struct casereader *reader UNUSED, void *ds_,
       if (ds->temporary_trns_chain != NULL)
         {
           retval = trns_chain_execute (ds->temporary_trns_chain, TRNS_CONTINUE,
-                                       c, &ds->cases_written);
+                                       c, ds->cases_written);
           if (retval != TRNS_CONTINUE)
             {
               case_destroy (c);
@@ -280,11 +274,7 @@ proc_casereader_read (struct casereader *reader UNUSED, void *ds_,
     }
 }
 
-/* Closes dataset DS for reading.
-   Returns true if successful, false if an I/O error occurred
-   while reading or closing the data set.
-   If DS has not been opened, returns true without doing
-   anything else. */
+/* "destroy" function for procedure casereader. */
 static void
 proc_casereader_destroy (struct casereader *reader, void *ds_)
 {
@@ -309,7 +299,7 @@ proc_casereader_destroy (struct casereader *reader, void *ds_)
    false, but the replacement active file may still be
    untainted.) */
 bool
-proc_commit (struct dataset *ds) 
+proc_commit (struct dataset *ds)
 {
   assert (ds->proc_state == PROC_CLOSED);
   ds->proc_state = PROC_COMMITTED;
@@ -322,27 +312,28 @@ proc_commit (struct dataset *ds)
   /* Dictionary from before TEMPORARY becomes permanent. */
   proc_cancel_temporary_transformations (ds);
 
-  if (!ds->discard_output) 
+  if (!ds->discard_output)
     {
       /* Finish compacting. */
-      if (ds->compactor != NULL) 
+      if (ds->compactor != NULL)
         {
-          dict_compactor_destroy (ds->compactor);
-          dict_compact_values (ds->dict);
+          case_map_destroy (ds->compactor);
           ds->compactor = NULL;
+
+          dict_delete_scratch_vars (ds->dict);
+          dict_compact_values (ds->dict);
         }
-    
+
       /* Old data sink becomes new data source. */
-      if (ds->sink != NULL) 
+      if (ds->sink != NULL)
         ds->source = casewriter_make_reader (ds->sink);
     }
-  else 
+  else
     {
       ds->source = NULL;
-      ds->discard_output = false; 
+      ds->discard_output = false;
     }
   ds->sink = NULL;
-  if ( ds->replace_source) ds->replace_source (ds->source);
 
   caseinit_clear (ds->caseinit);
   caseinit_mark_as_preinited (ds->caseinit, ds->dict);
@@ -352,7 +343,8 @@ proc_commit (struct dataset *ds)
   return proc_cancel_all_transformations (ds) && ds->ok;
 }
 
-static struct casereader_class proc_casereader_class = 
+/* Casereader class for procedure execution. */
+static struct casereader_class proc_casereader_class =
   {
     proc_casereader_read,
     proc_casereader_destroy,
@@ -392,6 +384,10 @@ proc_capture_transformations (struct dataset *ds)
   assert (ds->temporary_trns_chain == NULL);
   chain = ds->permanent_trns_chain;
   ds->cur_trns_chain = ds->permanent_trns_chain = trns_chain_create ();
+
+  if ( ds->xform_callback)
+    ds->xform_callback (false, ds->xform_callback_aux);
+
   return chain;
 }
 
@@ -402,6 +398,8 @@ void
 add_transformation (struct dataset *ds, trns_proc_func *proc, trns_free_func *free, void *aux)
 {
   trns_chain_append (ds->cur_trns_chain, NULL, proc, free, aux);
+  if ( ds->xform_callback)
+    ds->xform_callback (true, ds->xform_callback_aux);
 }
 
 /* Adds a transformation that processes a case with PROC and
@@ -416,6 +414,9 @@ add_transformation_with_finalizer (struct dataset *ds,
                                    trns_free_func *free, void *aux)
 {
   trns_chain_append (ds->cur_trns_chain, finalize, proc, free, aux);
+
+  if ( ds->xform_callback)
+    ds->xform_callback (true, ds->xform_callback_aux);
 }
 
 /* Returns the index of the next transformation.
@@ -450,6 +451,9 @@ proc_start_temporary_transformations (struct dataset *ds)
 
       trns_chain_finalize (ds->permanent_trns_chain);
       ds->temporary_trns_chain = ds->cur_trns_chain = trns_chain_create ();
+
+      if ( ds->xform_callback)
+       ds->xform_callback (true, ds->xform_callback_aux);
     }
 }
 
@@ -486,11 +490,14 @@ proc_cancel_temporary_transformations (struct dataset *ds)
       dict_destroy (ds->dict);
       ds->dict = ds->permanent_dict;
       ds->permanent_dict = NULL;
-      if (ds->replace_dict) ds->replace_dict (ds->dict);
 
       trns_chain_destroy (ds->temporary_trns_chain);
       ds->temporary_trns_chain = NULL;
 
+      if ( ds->xform_callback)
+       ds->xform_callback (!trns_chain_is_empty (ds->permanent_trns_chain),
+                           ds->xform_callback_aux);
+
       return true;
     }
   else
@@ -508,23 +515,33 @@ proc_cancel_all_transformations (struct dataset *ds)
   ok = trns_chain_destroy (ds->temporary_trns_chain) && ok;
   ds->permanent_trns_chain = ds->cur_trns_chain = trns_chain_create ();
   ds->temporary_trns_chain = NULL;
+  if ( ds->xform_callback)
+    ds->xform_callback (false, ds->xform_callback_aux);
+
   return ok;
 }
 \f
 /* Initializes procedure handling. */
 struct dataset *
-create_dataset (replace_source_callback *rps,
-               replace_dictionary_callback *rds)
+create_dataset (void)
 {
   struct dataset *ds = xzalloc (sizeof(*ds));
   ds->dict = dict_create ();
   ds->caseinit = caseinit_create ();
-  ds->replace_source = rps;
-  ds->replace_dict = rds;
   proc_cancel_all_transformations (ds);
   return ds;
 }
 
+
+void
+dataset_add_transform_change_callback (struct dataset *ds,
+                                      transformation_change_callback_func *cb,
+                                      void *aux)
+{
+  ds->xform_callback = cb;
+  ds->xform_callback_aux = aux;
+}
+
 /* Finishes up procedure handling. */
 void
 destroy_dataset (struct dataset *ds)
@@ -533,13 +550,16 @@ destroy_dataset (struct dataset *ds)
   dict_destroy (ds->dict);
   caseinit_destroy (ds->caseinit);
   trns_chain_destroy (ds->permanent_trns_chain);
+
+  if ( ds->xform_callback)
+    ds->xform_callback (false, ds->xform_callback_aux);
   free (ds);
 }
 
 /* Causes output from the next procedure to be discarded, instead
    of being preserved for use as input for the next procedure. */
 void
-proc_discard_output (struct dataset *ds) 
+proc_discard_output (struct dataset *ds)
 {
   ds->discard_output = true;
 }
@@ -555,10 +575,9 @@ proc_discard_active_file (struct dataset *ds)
   fh_set_default_handle (NULL);
 
   ds->n_lag = 0;
-  
+
   casereader_destroy (ds->source);
   ds->source = NULL;
-  if ( ds->replace_source) ds->replace_source (NULL);
 
   proc_cancel_all_transformations (ds);
 }
@@ -568,7 +587,7 @@ proc_discard_active_file (struct dataset *ds)
 void
 proc_set_active_file (struct dataset *ds,
                       struct casereader *source,
-                      struct dictionary *dict) 
+                      struct dictionary *dict)
 {
   assert (ds->proc_state == PROC_COMMITTED);
   assert (ds->dict != dict);
@@ -577,7 +596,6 @@ proc_set_active_file (struct dataset *ds,
 
   dict_destroy (ds->dict);
   ds->dict = dict;
-  if ( ds->replace_dict) ds->replace_dict (dict);
 
   proc_set_active_file_data (ds, source);
 }
@@ -585,11 +603,10 @@ proc_set_active_file (struct dataset *ds,
 /* Replaces the active file's data by READER without replacing
    the associated dictionary. */
 bool
-proc_set_active_file_data (struct dataset *ds, struct casereader *reader) 
+proc_set_active_file_data (struct dataset *ds, struct casereader *reader)
 {
   casereader_destroy (ds->source);
   ds->source = reader;
-  if (ds->replace_source) ds->replace_source (reader);
 
   caseinit_clear (ds->caseinit);
   caseinit_mark_as_preinited (ds->caseinit, ds->dict);
@@ -600,32 +617,43 @@ proc_set_active_file_data (struct dataset *ds, struct casereader *reader)
 /* Returns true if an active file data source is available, false
    otherwise. */
 bool
-proc_has_active_file (const struct dataset *ds) 
+proc_has_active_file (const struct dataset *ds)
 {
   return ds->source != NULL;
 }
 
+/* Returns the active file data source from DS, or a null pointer
+   if DS has no data source, and removes it from DS. */
+struct casereader *
+proc_extract_active_file_data (struct dataset *ds)
+{
+  struct casereader *reader = ds->source;
+  ds->source = NULL;
+
+  return reader;
+}
+
 /* Checks whether DS has a corrupted active file.  If so,
    discards it and returns false.  If not, returns true without
    doing anything. */
 bool
-dataset_end_of_command (struct dataset *ds) 
+dataset_end_of_command (struct dataset *ds)
 {
-  if (ds->source != NULL) 
+  if (ds->source != NULL)
     {
-      if (casereader_error (ds->source)) 
+      if (casereader_error (ds->source))
         {
           proc_discard_active_file (ds);
           return false;
         }
-      else 
+      else
         {
           const struct taint *taint = casereader_get_taint (ds->source);
           taint_reset_successor_taint ((struct taint *) taint);
           assert (!taint_has_tainted_successor (taint));
         }
     }
-  return true; 
+  return true;
 }
 \f
 static trns_proc_func case_limit_trns_proc;
@@ -706,7 +734,13 @@ dataset_dict (const struct dataset *ds)
   return ds->dict;
 }
 
-void 
+const struct casereader *
+dataset_source (const struct dataset *ds)
+{
+  return ds->source;
+}
+
+void
 dataset_need_lag (struct dataset *ds, int n_before)
 {
   ds->n_lag = MAX (ds->n_lag, n_before);