Remove callbacks from struct dataset. Closes patch #6075
[pspp-builds.git] / src / data / procedure.c
index 55fe5a48ad8b8ea69d66f5234549af61444325df..f280b6828a314b20a7e6a3c912fcab9f29da23d5 100644 (file)
@@ -1,20 +1,18 @@
-/* PSPP - computes sample statistics.
+/* PSPP - a program for statistical analysis.
    Copyright (C) 1997-9, 2000, 2006, 2007 Free Software Foundation, Inc.
 
-   This program is free software; you can redistribute it and/or
-   modify it under the terms of the GNU General Public License as
-   published by the Free Software Foundation; either version 2 of the
-   License, or (at your option) any later version.
+   This program is free software: you can redistribute it and/or modify
+   it under the terms of the GNU General Public License as published by
+   the Free Software Foundation, either version 3 of the License, or
+   (at your option) any later version.
 
-   This program is distributed in the hope that it will be useful, but
-   WITHOUT ANY WARRANTY; without even the implied warranty of
-   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
-   General Public License for more details.
+   This program is distributed in the hope that it will be useful,
+   but WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+   GNU General Public License for more details.
 
    You should have received a copy of the GNU General Public License
-   along with this program; if not, write to the Free Software
-   Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
-   02110-1301, USA. */
+   along with this program.  If not, see <http://www.gnu.org/licenses/>. */
 
 #include <config.h>
 
@@ -24,6 +22,7 @@
 #include <unistd.h>
 
 #include <data/case.h>
+#include <data/case-map.h>
 #include <data/caseinit.h>
 #include <data/casereader.h>
 #include <data/casereader-provider.h>
@@ -39,6 +38,7 @@
 #include <libpspp/str.h>
 #include <libpspp/taint.h>
 
+
 struct dataset {
   /* Cases are read from source,
      their transformation variables are initialized,
@@ -56,12 +56,10 @@ struct dataset {
   struct trns_chain *temporary_trns_chain;
   struct dictionary *dict;
 
-  /* Callback which occurs when a procedure provides a new source for
-     the dataset */
-  replace_source_callback *replace_source ;
-
-  /* Callback which occurs whenever the DICT is replaced by a new one */
-  replace_dictionary_callback *replace_dict;
+  /* Callback which occurs whenever the transformation chain(s) have
+     been modified */
+  transformation_change_callback_func *xform_callback;
+  void *xform_callback_aux;
 
   /* If true, cases are discarded instead of being written to
      sink. */
@@ -71,9 +69,9 @@ struct dataset {
      added to. */
   struct trns_chain *cur_trns_chain;
 
-  /* The compactor used to compact a case, if necessary;
+  /* The case map used to compact a case, if necessary;
      otherwise a null pointer. */
-  struct dict_compactor *compactor;
+  struct case_map *compactor;
 
   /* Time at which proc was last invoked. */
   time_t last_proc_invocation;
@@ -86,12 +84,13 @@ struct dataset {
   /* Procedure data. */
   enum
     {
-      PROC_COMMITTED,
-      PROC_OPEN,
-      PROC_CLOSED
+      PROC_COMMITTED,           /* No procedure in progress. */
+      PROC_OPEN,                /* proc_open called, casereader still open. */
+      PROC_CLOSED               /* casereader from proc_open destroyed,
+                                   but proc_commit not yet called. */
     }
   proc_state;
-  size_t cases_written;       /* Cases output so far. */
+  casenumber cases_written;       /* Cases output so far. */
   bool ok;                    /* Error status. */
 }; /* struct dataset */
 
@@ -165,11 +164,13 @@ proc_open (struct dataset *ds)
   /* Prepare sink. */
   if (!ds->discard_output)
     {
-      ds->compactor = (dict_compacting_would_shrink (ds->permanent_dict)
-                       ? dict_make_compactor (ds->permanent_dict)
+      struct dictionary *pd = ds->permanent_dict;
+      size_t compacted_value_cnt = dict_count_values (pd, 1u << DC_SCRATCH);
+      bool should_compact = compacted_value_cnt < dict_get_next_value_idx (pd);
+      ds->compactor = (should_compact
+                       ? case_map_to_compact_dict (pd, 1u << DC_SCRATCH)
                        : NULL);
-      ds->sink = autopaging_writer_create (dict_get_compacted_value_cnt (
-                                             ds->permanent_dict));
+      ds->sink = autopaging_writer_create (compacted_value_cnt);
     }
   else
     {
@@ -193,18 +194,15 @@ proc_open (struct dataset *ds)
                                        &proc_casereader_class, ds);
 }
 
+/* Returns true if a procedure is in progress, that is, if
+   proc_open has been called but proc_commit has not. */
 bool
 proc_is_open (const struct dataset *ds)
 {
   return ds->proc_state != PROC_COMMITTED;
 }
 
-/* Reads the next case from dataset DS, which must have been
-   opened for reading with proc_open.
-   Returns true if successful, in which case a pointer to the
-   case is stored in *C.
-   Return false at end of file or if a read error occurs.  In
-   this case a null pointer is stored in *C. */
+/* "read" function for procedure casereader. */
 static bool
 proc_casereader_read (struct casereader *reader UNUSED, void *ds_,
                       struct ccase *c)
@@ -215,7 +213,7 @@ proc_casereader_read (struct casereader *reader UNUSED, void *ds_,
   assert (ds->proc_state == PROC_OPEN);
   for (;;)
     {
-      size_t case_nr;
+      casenumber case_nr;
 
       assert (retval == TRNS_DROP_CASE || retval == TRNS_ERROR);
       if (retval == TRNS_ERROR)
@@ -227,13 +225,12 @@ proc_casereader_read (struct casereader *reader UNUSED, void *ds_,
       if (!casereader_read (ds->source, c))
         return false;
       case_resize (c, dict_get_next_value_idx (ds->dict));
-      caseinit_init_reinit_vars (ds->caseinit, c);
-      caseinit_init_left_vars (ds->caseinit, c);
+      caseinit_init_vars (ds->caseinit, c);
 
       /* Execute permanent transformations.  */
       case_nr = ds->cases_written + 1;
       retval = trns_chain_execute (ds->permanent_trns_chain, TRNS_CONTINUE,
-                                   c, &case_nr);
+                                   c, case_nr);
       caseinit_update_left_vars (ds->caseinit, c);
       if (retval != TRNS_CONTINUE)
         {
@@ -255,10 +252,7 @@ proc_casereader_read (struct casereader *reader UNUSED, void *ds_,
         {
           struct ccase tmp;
           if (ds->compactor != NULL)
-            {
-              case_create (&tmp, dict_get_compacted_value_cnt (ds->dict));
-              dict_compactor_compact (ds->compactor, &tmp, c);
-            }
+            case_map_execute (ds->compactor, c, &tmp);
           else
             case_clone (&tmp, c);
           casewriter_write (ds->sink, &tmp);
@@ -268,7 +262,7 @@ proc_casereader_read (struct casereader *reader UNUSED, void *ds_,
       if (ds->temporary_trns_chain != NULL)
         {
           retval = trns_chain_execute (ds->temporary_trns_chain, TRNS_CONTINUE,
-                                       c, &ds->cases_written);
+                                       c, ds->cases_written);
           if (retval != TRNS_CONTINUE)
             {
               case_destroy (c);
@@ -280,11 +274,7 @@ proc_casereader_read (struct casereader *reader UNUSED, void *ds_,
     }
 }
 
-/* Closes dataset DS for reading.
-   Returns true if successful, false if an I/O error occurred
-   while reading or closing the data set.
-   If DS has not been opened, returns true without doing
-   anything else. */
+/* "destroy" function for procedure casereader. */
 static void
 proc_casereader_destroy (struct casereader *reader, void *ds_)
 {
@@ -327,9 +317,11 @@ proc_commit (struct dataset *ds)
       /* Finish compacting. */
       if (ds->compactor != NULL)
         {
-          dict_compactor_destroy (ds->compactor);
-          dict_compact_values (ds->dict);
+          case_map_destroy (ds->compactor);
           ds->compactor = NULL;
+
+          dict_delete_scratch_vars (ds->dict);
+          dict_compact_values (ds->dict);
         }
 
       /* Old data sink becomes new data source. */
@@ -342,7 +334,6 @@ proc_commit (struct dataset *ds)
       ds->discard_output = false;
     }
   ds->sink = NULL;
-  if ( ds->replace_source) ds->replace_source (ds->source);
 
   caseinit_clear (ds->caseinit);
   caseinit_mark_as_preinited (ds->caseinit, ds->dict);
@@ -352,6 +343,7 @@ proc_commit (struct dataset *ds)
   return proc_cancel_all_transformations (ds) && ds->ok;
 }
 
+/* Casereader class for procedure execution. */
 static struct casereader_class proc_casereader_class =
   {
     proc_casereader_read,
@@ -392,6 +384,10 @@ proc_capture_transformations (struct dataset *ds)
   assert (ds->temporary_trns_chain == NULL);
   chain = ds->permanent_trns_chain;
   ds->cur_trns_chain = ds->permanent_trns_chain = trns_chain_create ();
+
+  if ( ds->xform_callback)
+    ds->xform_callback (false, ds->xform_callback_aux);
+
   return chain;
 }
 
@@ -402,6 +398,8 @@ void
 add_transformation (struct dataset *ds, trns_proc_func *proc, trns_free_func *free, void *aux)
 {
   trns_chain_append (ds->cur_trns_chain, NULL, proc, free, aux);
+  if ( ds->xform_callback)
+    ds->xform_callback (true, ds->xform_callback_aux);
 }
 
 /* Adds a transformation that processes a case with PROC and
@@ -416,6 +414,9 @@ add_transformation_with_finalizer (struct dataset *ds,
                                    trns_free_func *free, void *aux)
 {
   trns_chain_append (ds->cur_trns_chain, finalize, proc, free, aux);
+
+  if ( ds->xform_callback)
+    ds->xform_callback (true, ds->xform_callback_aux);
 }
 
 /* Returns the index of the next transformation.
@@ -450,6 +451,9 @@ proc_start_temporary_transformations (struct dataset *ds)
 
       trns_chain_finalize (ds->permanent_trns_chain);
       ds->temporary_trns_chain = ds->cur_trns_chain = trns_chain_create ();
+
+      if ( ds->xform_callback)
+       ds->xform_callback (true, ds->xform_callback_aux);
     }
 }
 
@@ -486,11 +490,14 @@ proc_cancel_temporary_transformations (struct dataset *ds)
       dict_destroy (ds->dict);
       ds->dict = ds->permanent_dict;
       ds->permanent_dict = NULL;
-      if (ds->replace_dict) ds->replace_dict (ds->dict);
 
       trns_chain_destroy (ds->temporary_trns_chain);
       ds->temporary_trns_chain = NULL;
 
+      if ( ds->xform_callback)
+       ds->xform_callback (!trns_chain_is_empty (ds->permanent_trns_chain),
+                           ds->xform_callback_aux);
+
       return true;
     }
   else
@@ -508,23 +515,33 @@ proc_cancel_all_transformations (struct dataset *ds)
   ok = trns_chain_destroy (ds->temporary_trns_chain) && ok;
   ds->permanent_trns_chain = ds->cur_trns_chain = trns_chain_create ();
   ds->temporary_trns_chain = NULL;
+  if ( ds->xform_callback)
+    ds->xform_callback (false, ds->xform_callback_aux);
+
   return ok;
 }
 \f
 /* Initializes procedure handling. */
 struct dataset *
-create_dataset (replace_source_callback *rps,
-               replace_dictionary_callback *rds)
+create_dataset (void)
 {
   struct dataset *ds = xzalloc (sizeof(*ds));
   ds->dict = dict_create ();
   ds->caseinit = caseinit_create ();
-  ds->replace_source = rps;
-  ds->replace_dict = rds;
   proc_cancel_all_transformations (ds);
   return ds;
 }
 
+
+void
+dataset_add_transform_change_callback (struct dataset *ds,
+                                      transformation_change_callback_func *cb,
+                                      void *aux)
+{
+  ds->xform_callback = cb;
+  ds->xform_callback_aux = aux;
+}
+
 /* Finishes up procedure handling. */
 void
 destroy_dataset (struct dataset *ds)
@@ -533,6 +550,9 @@ destroy_dataset (struct dataset *ds)
   dict_destroy (ds->dict);
   caseinit_destroy (ds->caseinit);
   trns_chain_destroy (ds->permanent_trns_chain);
+
+  if ( ds->xform_callback)
+    ds->xform_callback (false, ds->xform_callback_aux);
   free (ds);
 }
 
@@ -558,7 +578,6 @@ proc_discard_active_file (struct dataset *ds)
 
   casereader_destroy (ds->source);
   ds->source = NULL;
-  if ( ds->replace_source) ds->replace_source (NULL);
 
   proc_cancel_all_transformations (ds);
 }
@@ -577,7 +596,6 @@ proc_set_active_file (struct dataset *ds,
 
   dict_destroy (ds->dict);
   ds->dict = dict;
-  if ( ds->replace_dict) ds->replace_dict (dict);
 
   proc_set_active_file_data (ds, source);
 }
@@ -589,7 +607,6 @@ proc_set_active_file_data (struct dataset *ds, struct casereader *reader)
 {
   casereader_destroy (ds->source);
   ds->source = reader;
-  if (ds->replace_source) ds->replace_source (reader);
 
   caseinit_clear (ds->caseinit);
   caseinit_mark_as_preinited (ds->caseinit, ds->dict);
@@ -605,6 +622,17 @@ proc_has_active_file (const struct dataset *ds)
   return ds->source != NULL;
 }
 
+/* Returns the active file data source from DS, or a null pointer
+   if DS has no data source, and removes it from DS. */
+struct casereader *
+proc_extract_active_file_data (struct dataset *ds)
+{
+  struct casereader *reader = ds->source;
+  ds->source = NULL;
+
+  return reader;
+}
+
 /* Checks whether DS has a corrupted active file.  If so,
    discards it and returns false.  If not, returns true without
    doing anything. */
@@ -706,6 +734,12 @@ dataset_dict (const struct dataset *ds)
   return ds->dict;
 }
 
+const struct casereader *
+dataset_source (const struct dataset *ds)
+{
+  return ds->source;
+}
+
 void
 dataset_need_lag (struct dataset *ds, int n_before)
 {