Patch #5672
[pspp-builds.git] / src / data / procedure.c
index b59f128299ecce3efd0056922360ec6acb59fba9..b6f988aabbaf46b2707940aca2e646504ced5ffa 100644 (file)
@@ -1,6 +1,5 @@
 /* PSPP - computes sample statistics.
 /* PSPP - computes sample statistics.
-   Copyright (C) 1997-9, 2000, 2006 Free Software Foundation, Inc.
-   Written by Ben Pfaff <blp@gnu.org>.
+   Copyright (C) 1997-9, 2000, 2006, 2007 Free Software Foundation, Inc.
 
    This program is free software; you can redistribute it and/or
    modify it under the terms of the GNU General Public License as
 
    This program is free software; you can redistribute it and/or
    modify it under the terms of the GNU General Public License as
 #include <libpspp/misc.h>
 #include <libpspp/str.h>
 
 #include <libpspp/misc.h>
 #include <libpspp/str.h>
 
-/* Procedure execution data. */
-struct write_case_data
-  {
-    /* Function to call for each case. */
-    case_func *proc;
-    void *aux;
+struct dataset {
 
 
-    struct dataset *dataset;    /* The dataset concerned */
-    struct ccase trns_case;     /* Case used for transformations. */
-    struct ccase sink_case;     /* Case written to sink, if
-                                   compacting is necessary. */
-    size_t cases_written;       /* Cases output so far. */
-  };
+  /* An abstract factory which creates casefiles */
+  struct casefile_factory *cf_factory;
+
+  /* Callback which occurs when a procedure provides a new source for
+     the dataset */
+  replace_source_callback *replace_source ;
+
+  /* Callback which occurs whenever the DICT is replaced by a new one */
+  replace_dictionary_callback *replace_dict;
 
 
-struct dataset {
   /* Cases are read from proc_source,
      pass through permanent_trns_chain (which transforms them into
      the format described by permanent_dict),
   /* Cases are read from proc_source,
      pass through permanent_trns_chain (which transforms them into
      the format described by permanent_dict),
@@ -85,6 +81,13 @@ struct dataset {
   int lag_head;                /* Index where next case will be added. */
   struct ccase *lag_queue; /* Array of n_lag ccase * elements. */
 
   int lag_head;                /* Index where next case will be added. */
   struct ccase *lag_queue; /* Array of n_lag ccase * elements. */
 
+  /* Procedure data. */
+  bool is_open;               /* Procedure open? */
+  struct ccase trns_case;     /* Case used for transformations. */
+  struct ccase sink_case;     /* Case written to sink, if
+                                 compacting is necessary. */
+  size_t cases_written;       /* Cases output so far. */
+  bool ok;
 }; /* struct dataset */
 
 
 }; /* struct dataset */
 
 
@@ -97,7 +100,6 @@ static bool internal_procedure (struct dataset *ds, case_func *,
 static void update_last_proc_invocation (struct dataset *ds);
 static void create_trns_case (struct ccase *, struct dictionary *);
 static void open_active_file (struct dataset *ds);
 static void update_last_proc_invocation (struct dataset *ds);
 static void create_trns_case (struct ccase *, struct dictionary *);
 static void open_active_file (struct dataset *ds);
-static bool write_case (struct write_case_data *wc_data);
 static void lag_case (struct dataset *ds, const struct ccase *c);
 static void clear_case (const struct dataset *ds, struct ccase *c);
 static bool close_active_file (struct dataset *ds);
 static void lag_case (struct dataset *ds, const struct ccase *c);
 static void clear_case (const struct dataset *ds, struct ccase *c);
 static bool close_active_file (struct dataset *ds);
@@ -106,7 +108,7 @@ static bool close_active_file (struct dataset *ds);
 
 /* Returns the last time the data was read. */
 time_t
 
 /* Returns the last time the data was read. */
 time_t
-time_of_last_procedure (struct dataset *ds) 
+time_of_last_procedure (struct dataset *ds)
 {
   if (ds->last_proc_invocation == 0)
     update_last_proc_invocation (ds);
 {
   if (ds->last_proc_invocation == 0)
     update_last_proc_invocation (ds);
@@ -125,32 +127,49 @@ time_of_last_procedure (struct dataset *ds)
       start the next case from step 1.
 
    2. Write case to replacement active file.
       start the next case from step 1.
 
    2. Write case to replacement active file.
-   
+
    3. Execute temporary transformations.  If these drop the case,
       start the next case from step 1.
    3. Execute temporary transformations.  If these drop the case,
       start the next case from step 1.
-      
+
    4. Pass case to PROC_FUNC, passing AUX as auxiliary data.
 
    Returns true if successful, false if an I/O error occurred. */
 bool
 procedure (struct dataset *ds, case_func *cf, void *aux)
 {
    4. Pass case to PROC_FUNC, passing AUX as auxiliary data.
 
    Returns true if successful, false if an I/O error occurred. */
 bool
 procedure (struct dataset *ds, case_func *cf, void *aux)
 {
+  update_last_proc_invocation (ds);
+
+  /* Optimize the trivial case where we're not going to do
+     anything with the data, by not reading the data at all. */
+  if (cf == NULL
+      && case_source_is_class (ds->proc_source, &storage_source_class)
+      && ds->proc_sink == NULL
+      && (ds->temporary_trns_chain == NULL
+          || trns_chain_is_empty (ds->temporary_trns_chain))
+      && trns_chain_is_empty (ds->permanent_trns_chain))
+    {
+      ds->n_lag = 0;
+      dict_set_case_limit (ds->dict, 0);
+      dict_clear_vectors (ds->dict);
+      return true;
+    }
+
   return internal_procedure (ds, cf, NULL, aux);
 }
 \f
 /* Multipass procedure. */
 
   return internal_procedure (ds, cf, NULL, aux);
 }
 \f
 /* Multipass procedure. */
 
-struct multipass_aux_data 
+struct multipass_aux_data
   {
     struct casefile *casefile;
   {
     struct casefile *casefile;
-    
+
     bool (*proc_func) (const struct casefile *, void *aux);
     void *aux;
   };
 
 /* Case processing function for multipass_procedure(). */
 static bool
     bool (*proc_func) (const struct casefile *, void *aux);
     void *aux;
   };
 
 /* Case processing function for multipass_procedure(). */
 static bool
-multipass_case_func (const struct ccase *c, void *aux_data_, const struct dataset *ds UNUSED) 
+multipass_case_func (const struct ccase *c, void *aux_data_, const struct dataset *ds UNUSED)
 {
   struct multipass_aux_data *aux_data = aux_data_;
   return casefile_append (aux_data->casefile, c);
 {
   struct multipass_aux_data *aux_data = aux_data_;
   return casefile_append (aux_data->casefile, c);
@@ -158,7 +177,7 @@ multipass_case_func (const struct ccase *c, void *aux_data_, const struct datase
 
 /* End-of-file function for multipass_procedure(). */
 static bool
 
 /* End-of-file function for multipass_procedure(). */
 static bool
-multipass_end_func (void *aux_data_, const struct dataset *ds UNUSED) 
+multipass_end_func (void *aux_data_, const struct dataset *ds UNUSED)
 {
   struct multipass_aux_data *aux_data = aux_data_;
   return (aux_data->proc_func == NULL
 {
   struct multipass_aux_data *aux_data = aux_data_;
   return (aux_data->proc_func == NULL
@@ -169,12 +188,15 @@ multipass_end_func (void *aux_data_, const struct dataset *ds UNUSED)
    The entire active file is passed to PROC_FUNC, with the given
    AUX as auxiliary data, as a unit. */
 bool
    The entire active file is passed to PROC_FUNC, with the given
    AUX as auxiliary data, as a unit. */
 bool
-multipass_procedure (struct dataset *ds, casefile_func *proc_func,  void *aux) 
+multipass_procedure (struct dataset *ds, casefile_func *proc_func,  void *aux)
 {
   struct multipass_aux_data aux_data;
   bool ok;
 
 {
   struct multipass_aux_data aux_data;
   bool ok;
 
-  aux_data.casefile = fastfile_create (dict_get_next_value_idx (ds->dict));
+  aux_data.casefile =
+    ds->cf_factory->create_casefile (ds->cf_factory,
+                                    dict_get_next_value_idx (ds->dict));
+
   aux_data.proc_func = proc_func;
   aux_data.aux = aux;
 
   aux_data.proc_func = proc_func;
   aux_data.aux = aux;
 
@@ -186,8 +208,8 @@ multipass_procedure (struct dataset *ds, casefile_func *proc_func,  void *aux)
   return ok;
 }
 \f
   return ok;
 }
 \f
-/* Procedure implementation. */
 
 
+/* Procedure implementation. */
 
 /* Executes a procedure.
    Passes each case to CASE_FUNC.
 
 /* Executes a procedure.
    Passes each case to CASE_FUNC.
@@ -197,57 +219,146 @@ multipass_procedure (struct dataset *ds, casefile_func *proc_func,  void *aux)
 static bool
 internal_procedure (struct dataset *ds, case_func *proc,
                    end_func *end,
 static bool
 internal_procedure (struct dataset *ds, case_func *proc,
                    end_func *end,
-                    void *aux) 
+                    void *aux)
 {
 {
-  struct write_case_data wc_data;
+  struct ccase *c;
   bool ok = true;
 
   bool ok = true;
 
+  proc_open (ds);
+  while (ok && proc_read (ds, &c))
+    if (proc != NULL)
+      ok = proc (c, aux, ds) && ok;
+  if (end != NULL)
+    ok = end (aux, ds) && ok;
+
+  if ( proc_close (ds) && ok )
+    {
+
+      return true;
+    }
+
+  return false;
+}
+
+/* Opens dataset DS for reading cases with proc_read.
+   proc_close must be called when done. */
+void
+proc_open (struct dataset *ds)
+{
   assert (ds->proc_source != NULL);
   assert (ds->proc_source != NULL);
+  assert (!ds->is_open);
 
   update_last_proc_invocation (ds);
 
 
   update_last_proc_invocation (ds);
 
-  /* Optimize the trivial case where we're not going to do
-     anything with the data, by not reading the data at all. */
-  if (proc == NULL && end == NULL
-      && case_source_is_class (ds->proc_source, &storage_source_class)
-      && ds->proc_sink == NULL
-      && (ds->temporary_trns_chain == NULL
-          || trns_chain_is_empty (ds->temporary_trns_chain))
-      && trns_chain_is_empty (ds->permanent_trns_chain))
+  open_active_file (ds);
+
+  ds->is_open = true;
+  create_trns_case (&ds->trns_case, ds->dict);
+  case_create (&ds->sink_case, dict_get_compacted_value_cnt (ds->dict));
+  ds->cases_written = 0;
+  ds->ok = true;
+}
+
+/* Reads the next case from dataset DS, which must have been
+   opened for reading with proc_open.
+   Returns true if successful, in which case a pointer to the
+   case is stored in *C.
+   Return false at end of file or if a read error occurs.  In
+   this case a null pointer is stored in *C. */
+bool
+proc_read (struct dataset *ds, struct ccase **c)
+{
+  enum trns_result retval = TRNS_DROP_CASE;
+
+  assert (ds->is_open);
+  *c = NULL;
+  for (;;)
     {
     {
-      ds->n_lag = 0;
-      dict_set_case_limit (ds->dict, 0);
-      dict_clear_vectors (ds->dict);
+      size_t case_nr;
+
+      assert (retval == TRNS_DROP_CASE || retval == TRNS_ERROR);
+      if (retval == TRNS_ERROR)
+        ds->ok = false;
+      if (!ds->ok)
+        return false;
+
+      /* Read a case from proc_source. */
+      clear_case (ds, &ds->trns_case);
+      if (!ds->proc_source->class->read (ds->proc_source, &ds->trns_case))
+        return false;
+
+      /* Execute permanent transformations.  */
+      case_nr = ds->cases_written + 1;
+      retval = trns_chain_execute (ds->permanent_trns_chain, TRNS_CONTINUE,
+                                   &ds->trns_case, &case_nr);
+      if (retval != TRNS_CONTINUE)
+        continue;
+
+      /* Write case to LAG queue. */
+      if (ds->n_lag)
+        lag_case (ds, &ds->trns_case);
+
+      /* Write case to replacement active file. */
+      ds->cases_written++;
+      if (ds->proc_sink->class->write != NULL)
+        {
+          if (ds->compactor != NULL)
+            {
+              dict_compactor_compact (ds->compactor, &ds->sink_case,
+                                      &ds->trns_case);
+              ds->proc_sink->class->write (ds->proc_sink, &ds->sink_case);
+            }
+          else
+            ds->proc_sink->class->write (ds->proc_sink, &ds->trns_case);
+        }
+
+      /* Execute temporary transformations. */
+      if (ds->temporary_trns_chain != NULL)
+        {
+          retval = trns_chain_execute (ds->temporary_trns_chain, TRNS_CONTINUE,
+                                       &ds->trns_case, &ds->cases_written);
+          if (retval != TRNS_CONTINUE)
+            continue;
+        }
+
+      *c = &ds->trns_case;
       return true;
     }
       return true;
     }
-  
-  open_active_file (ds);
-  
-  wc_data.proc = proc;
-  wc_data.aux = aux;
-  wc_data.dataset = ds;
-  create_trns_case (&wc_data.trns_case, ds->dict);
-  case_create (&wc_data.sink_case,
-               dict_get_compacted_value_cnt (ds->dict));
-  wc_data.cases_written = 0;
-
-  ok = ds->proc_source->class->read (ds->proc_source,
-                                 &wc_data.trns_case,
-                                 write_case, &wc_data) && ok;
-  if (end != NULL)
-    ok = end (aux, ds) && ok;
+}
 
 
-  case_destroy (&wc_data.sink_case);
-  case_destroy (&wc_data.trns_case);
+/* Closes dataset DS for reading.
+   Returns true if successful, false if an I/O error occurred
+   while reading or closing the data set.
+   If DS has not been opened, returns true without doing
+   anything else. */
+bool
+proc_close (struct dataset *ds)
+{
+  if (!ds->is_open)
+    return true;
 
 
-  ok = close_active_file (ds) && ok;
+  /* Drain any remaining cases. */
+  while (ds->ok)
+    {
+      struct ccase *c;
+      if (!proc_read (ds, &c))
+        break;
+    }
+  ds->ok = free_case_source (ds->proc_source) && ds->ok;
+  proc_set_source (ds, NULL);
 
 
-  return ok;
+  case_destroy (&ds->sink_case);
+  case_destroy (&ds->trns_case);
+
+  ds->ok = close_active_file (ds) && ds->ok;
+  ds->is_open = false;
+
+  return ds->ok;
 }
 
 /* Updates last_proc_invocation. */
 static void
 }
 
 /* Updates last_proc_invocation. */
 static void
-update_last_proc_invocation (struct dataset *ds) 
+update_last_proc_invocation (struct dataset *ds)
 {
   ds->last_proc_invocation = time (NULL);
 }
 {
   ds->last_proc_invocation = time (NULL);
 }
@@ -262,15 +373,15 @@ create_trns_case (struct ccase *trns_case, struct dictionary *dict)
   size_t i;
 
   case_create (trns_case, dict_get_next_value_idx (dict));
   size_t i;
 
   case_create (trns_case, dict_get_next_value_idx (dict));
-  for (i = 0; i < var_cnt; i++) 
+  for (i = 0; i < var_cnt; i++)
     {
       struct variable *v = dict_get_var (dict, i);
     {
       struct variable *v = dict_get_var (dict, i);
-      union value *value = case_data_rw (trns_case, v->fv);
+      union value *value = case_data_rw (trns_case, v);
 
 
-      if (v->type == NUMERIC)
-        value->f = v->leave ? 0.0 : SYSMIS;
+      if (var_is_numeric (v))
+        value->f = var_get_leave (v) ? 0.0 : SYSMIS;
       else
       else
-        memset (value->s, ' ', v->width);
+        memset (value->s, ' ', var_get_width (v));
     }
 }
 
     }
 }
 
@@ -291,14 +402,17 @@ open_active_file (struct dataset *ds)
     ds->permanent_dict = ds->dict;
 
   /* Figure out whether to compact. */
     ds->permanent_dict = ds->dict;
 
   /* Figure out whether to compact. */
-  ds->compactor = 
+  ds->compactor =
     (dict_compacting_would_shrink (ds->permanent_dict)
      ? dict_make_compactor (ds->permanent_dict)
      : NULL);
 
   /* Prepare sink. */
   if (ds->proc_sink == NULL)
     (dict_compacting_would_shrink (ds->permanent_dict)
      ? dict_make_compactor (ds->permanent_dict)
      : NULL);
 
   /* Prepare sink. */
   if (ds->proc_sink == NULL)
-    ds->proc_sink = create_case_sink (&storage_sink_class, ds->permanent_dict, NULL);
+    ds->proc_sink = create_case_sink (&storage_sink_class,
+                                     ds->permanent_dict,
+                                     ds->cf_factory,
+                                     NULL);
   if (ds->proc_sink->class->open != NULL)
     ds->proc_sink->class->open (ds->proc_sink);
 
   if (ds->proc_sink->class->open != NULL)
     ds->proc_sink->class->open (ds->proc_sink);
 
@@ -306,7 +420,7 @@ open_active_file (struct dataset *ds)
   if (ds->n_lag > 0)
     {
       int i;
   if (ds->n_lag > 0)
     {
       int i;
-  
+
       ds->lag_count = 0;
       ds->lag_head = 0;
       ds->lag_queue = xnmalloc (ds->n_lag, sizeof *ds->lag_queue);
       ds->lag_count = 0;
       ds->lag_head = 0;
       ds->lag_queue = xnmalloc (ds->n_lag, sizeof *ds->lag_queue);
@@ -315,63 +429,6 @@ open_active_file (struct dataset *ds)
     }
 }
 
     }
 }
 
-/* Transforms trns_case and writes it to the replacement active
-   file if advisable.  Returns true if more cases can be
-   accepted, false otherwise.  Do not call this function again
-   after it has returned false once.  */
-static bool
-write_case (struct write_case_data *wc_data)
-{
-  enum trns_result retval;
-  size_t case_nr;
-
-  struct dataset *ds = wc_data->dataset;
-  
-  /* Execute permanent transformations.  */
-  case_nr = wc_data->cases_written + 1;
-  retval = trns_chain_execute (ds->permanent_trns_chain,
-                               &wc_data->trns_case, &case_nr);
-  if (retval != TRNS_CONTINUE)
-    goto done;
-
-  /* Write case to LAG queue. */
-  if (ds->n_lag)
-    lag_case (ds, &wc_data->trns_case);
-
-  /* Write case to replacement active file. */
-  wc_data->cases_written++;
-  if (ds->proc_sink->class->write != NULL) 
-    {
-      if (ds->compactor != NULL) 
-        {
-          dict_compactor_compact (ds->compactor, &wc_data->sink_case,
-                                  &wc_data->trns_case);
-          ds->proc_sink->class->write (ds->proc_sink, &wc_data->sink_case);
-        }
-      else
-        ds->proc_sink->class->write (ds->proc_sink, &wc_data->trns_case);
-    }
-  
-  /* Execute temporary transformations. */
-  if (ds->temporary_trns_chain != NULL) 
-    {
-      retval = trns_chain_execute (ds->temporary_trns_chain,
-                                   &wc_data->trns_case,
-                                   &wc_data->cases_written);
-      if (retval != TRNS_CONTINUE)
-        goto done;
-    }
-
-  /* Pass case to procedure. */
-  if (wc_data->proc != NULL)
-    if (!wc_data->proc (&wc_data->trns_case, wc_data->aux, ds))
-      retval = TRNS_ERROR;
-
- done:
-  clear_case (ds, &wc_data->trns_case);
-  return retval != TRNS_ERROR;
-}
-
 /* Add C to the lag queue. */
 static void
 lag_case (struct dataset *ds, const struct ccase *c)
 /* Add C to the lag queue. */
 static void
 lag_case (struct dataset *ds, const struct ccase *c)
@@ -391,17 +448,17 @@ clear_case (const struct dataset *ds, struct ccase *c)
 {
   size_t var_cnt = dict_get_var_cnt (ds->dict);
   size_t i;
 {
   size_t var_cnt = dict_get_var_cnt (ds->dict);
   size_t i;
-  
-  for (i = 0; i < var_cnt; i++) 
+
+  for (i = 0; i < var_cnt; i++)
     {
       struct variable *v = dict_get_var (ds->dict, i);
     {
       struct variable *v = dict_get_var (ds->dict, i);
-      if (!v->leave) 
+      if (!var_get_leave (v))
         {
         {
-          if (v->type == NUMERIC)
-            case_data_rw (c, v->fv)->f = SYSMIS;
+          if (var_is_numeric (v))
+            case_data_rw (c, v)->f = SYSMIS;
           else
           else
-            memset (case_data_rw (c, v->fv)->s, ' ', v->width);
-        } 
+            memset (case_data_rw (c, v)->s, ' ', var_get_width (v));
+        }
     }
 }
 
     }
 }
 
@@ -413,31 +470,27 @@ close_active_file (struct dataset *ds)
   if (ds->n_lag > 0)
     {
       int i;
   if (ds->n_lag > 0)
     {
       int i;
-      
+
       for (i = 0; i < ds->n_lag; i++)
        case_destroy (&ds->lag_queue[i]);
       free (ds->lag_queue);
       ds->n_lag = 0;
     }
       for (i = 0; i < ds->n_lag; i++)
        case_destroy (&ds->lag_queue[i]);
       free (ds->lag_queue);
       ds->n_lag = 0;
     }
-  
+
   /* Dictionary from before TEMPORARY becomes permanent. */
   proc_cancel_temporary_transformations (ds);
 
   /* Finish compacting. */
   /* Dictionary from before TEMPORARY becomes permanent. */
   proc_cancel_temporary_transformations (ds);
 
   /* Finish compacting. */
-  if (ds->compactor != NULL) 
+  if (ds->compactor != NULL)
     {
       dict_compactor_destroy (ds->compactor);
       dict_compact_values (ds->dict);
       ds->compactor = NULL;
     }
     {
       dict_compactor_destroy (ds->compactor);
       dict_compact_values (ds->dict);
       ds->compactor = NULL;
     }
-    
-  /* Free data source. */
-  free_case_source (ds->proc_source);
-  ds->proc_source = NULL;
 
   /* Old data sink becomes new data source. */
   if (ds->proc_sink->class->make_source != NULL)
 
   /* Old data sink becomes new data source. */
   if (ds->proc_sink->class->make_source != NULL)
-    ds->proc_source = ds->proc_sink->class->make_source (ds->proc_sink);
+    proc_set_source (ds, ds->proc_sink->class->make_source (ds->proc_sink) );
   free_case_sink (ds->proc_sink);
   ds->proc_sink = NULL;
 
   free_case_sink (ds->proc_sink);
   ds->proc_sink = NULL;
 
@@ -468,13 +521,13 @@ lagged_case (const struct dataset *ds, int n_before)
 /* Procedure that separates the data into SPLIT FILE groups. */
 
 /* Represents auxiliary data for handling SPLIT FILE. */
 /* Procedure that separates the data into SPLIT FILE groups. */
 
 /* Represents auxiliary data for handling SPLIT FILE. */
-struct split_aux_data 
+struct split_aux_data
   {
     struct dataset *dataset;    /* The dataset */
     struct ccase prev_case;     /* Data in previous case. */
 
     /* Callback functions. */
   {
     struct dataset *dataset;    /* The dataset */
     struct ccase prev_case;     /* Data in previous case. */
 
     /* Callback functions. */
-    begin_func *begin; 
+    begin_func *begin;
     case_func *proc;
     end_func *end;
     void *func_aux;
     case_func *proc;
     end_func *end;
     void *func_aux;
@@ -494,19 +547,19 @@ static bool split_procedure_end_func (void *, const struct dataset *);
    passed to each of the functions as auxiliary data.
 
    If the active file is empty, none of BEGIN_FUNC, PROC_FUNC,
    passed to each of the functions as auxiliary data.
 
    If the active file is empty, none of BEGIN_FUNC, PROC_FUNC,
-   and END_FUNC will be called at all. 
+   and END_FUNC will be called at all.
 
    If SPLIT FILE is not in effect, then there is one break group
    (if the active file is nonempty), and BEGIN_FUNC and END_FUNC
    will be called once.
 
    If SPLIT FILE is not in effect, then there is one break group
    (if the active file is nonempty), and BEGIN_FUNC and END_FUNC
    will be called once.
-   
+
    Returns true if successful, false if an I/O error occurred. */
 bool
 procedure_with_splits (struct dataset *ds,
    Returns true if successful, false if an I/O error occurred. */
 bool
 procedure_with_splits (struct dataset *ds,
-                      begin_func begin, 
+                      begin_func begin,
                       case_func *proc,
                        end_func *end,
                       case_func *proc,
                        end_func *end,
-                       void *func_aux) 
+                       void *func_aux)
 {
   struct split_aux_data split_aux;
   bool ok;
 {
   struct split_aux_data split_aux;
   bool ok;
@@ -528,7 +581,7 @@ procedure_with_splits (struct dataset *ds,
 
 /* Case callback used by procedure_with_splits(). */
 static bool
 
 /* Case callback used by procedure_with_splits(). */
 static bool
-split_procedure_case_func (const struct ccase *c, void *split_aux_, const struct dataset *ds) 
+split_procedure_case_func (const struct ccase *c, void *split_aux_, const struct dataset *ds)
 {
   struct split_aux_data *split_aux = split_aux_;
 
 {
   struct split_aux_data *split_aux = split_aux_;
 
@@ -552,7 +605,7 @@ split_procedure_case_func (const struct ccase *c, void *split_aux_, const struct
 
 /* End-of-file callback used by procedure_with_splits(). */
 static bool
 
 /* End-of-file callback used by procedure_with_splits(). */
 static bool
-split_procedure_end_func (void *split_aux_, const struct dataset *ds) 
+split_procedure_end_func (void *split_aux_, const struct dataset *ds)
 {
   struct split_aux_data *split_aux = split_aux_;
 
 {
   struct split_aux_data *split_aux = split_aux_;
 
@@ -564,8 +617,8 @@ split_procedure_end_func (void *split_aux_, const struct dataset *ds)
 /* Compares the SPLIT FILE variables in cases A and B and returns
    nonzero only if they differ. */
 static int
 /* Compares the SPLIT FILE variables in cases A and B and returns
    nonzero only if they differ. */
 static int
-equal_splits (const struct ccase *a, const struct ccase *b, 
-             const struct dataset *ds) 
+equal_splits (const struct ccase *a, const struct ccase *b,
+             const struct dataset *ds)
 {
   return case_compare (a, b,
                        dict_get_split_vars (ds->dict),
 {
   return case_compare (a, b,
                        dict_get_split_vars (ds->dict),
@@ -577,14 +630,14 @@ equal_splits (const struct ccase *a, const struct ccase *b,
 
 /* Represents auxiliary data for handling SPLIT FILE in a
    multipass procedure. */
 
 /* Represents auxiliary data for handling SPLIT FILE in a
    multipass procedure. */
-struct multipass_split_aux_data 
+struct multipass_split_aux_data
   {
     struct dataset *dataset;    /* The dataset of the split */
     struct ccase prev_case;     /* Data in previous case. */
     struct casefile *casefile;  /* Accumulates data for a split. */
   {
     struct dataset *dataset;    /* The dataset of the split */
     struct ccase prev_case;     /* Data in previous case. */
     struct casefile *casefile;  /* Accumulates data for a split. */
-    split_func *split;          /* Function to call with the accumulated 
+    split_func *split;          /* Function to call with the accumulated
                                   data. */
                                   data. */
-    void *func_aux;             /* Auxiliary data. */ 
+    void *func_aux;             /* Auxiliary data. */
   };
 
 static bool multipass_split_case_func (const struct ccase *c, void *aux_, const struct dataset *);
   };
 
 static bool multipass_split_case_func (const struct ccase *c, void *aux_, const struct dataset *);
@@ -593,7 +646,7 @@ static bool multipass_split_output (struct multipass_split_aux_data *, const str
 
 /* Returns true if successful, false if an I/O error occurred. */
 bool
 
 /* Returns true if successful, false if an I/O error occurred. */
 bool
-multipass_procedure_with_splits (struct dataset *ds, 
+multipass_procedure_with_splits (struct dataset *ds,
                                 split_func  *split,
                                  void *func_aux)
 {
                                 split_func  *split,
                                  void *func_aux)
 {
@@ -632,8 +685,9 @@ multipass_split_case_func (const struct ccase *c, void *aux_, const struct datas
         ok = multipass_split_output (aux, ds);
 
       /* Start a new casefile. */
         ok = multipass_split_output (aux, ds);
 
       /* Start a new casefile. */
-      aux->casefile = 
-       fastfile_create (dict_get_next_value_idx (ds->dict));
+      aux->casefile =
+       ds->cf_factory->create_casefile (ds->cf_factory,
+                                        dict_get_next_value_idx (ds->dict));
     }
 
   return casefile_append (aux->casefile, c) && ok;
     }
 
   return casefile_append (aux->casefile, c) && ok;
@@ -651,7 +705,7 @@ static bool
 multipass_split_output (struct multipass_split_aux_data *aux, const struct dataset *ds)
 {
   bool ok;
 multipass_split_output (struct multipass_split_aux_data *aux, const struct dataset *ds)
 {
   bool ok;
-  
+
   assert (aux->casefile != NULL);
   ok = aux->split (&aux->prev_case, aux->casefile, aux->func_aux, ds);
   casefile_destroy (aux->casefile);
   assert (aux->casefile != NULL);
   ok = aux->split (&aux->prev_case, aux->casefile, aux->func_aux, ds);
   casefile_destroy (aux->casefile);
@@ -669,9 +723,9 @@ discard_variables (struct dataset *ds)
   fh_set_default_handle (NULL);
 
   ds->n_lag = 0;
   fh_set_default_handle (NULL);
 
   ds->n_lag = 0;
-  
+
   free_case_source (ds->proc_source);
   free_case_source (ds->proc_source);
-  ds->proc_source = NULL;
+  proc_set_source (ds, NULL);
 
   proc_cancel_all_transformations (ds);
 }
 
   proc_cancel_all_transformations (ds);
 }
@@ -680,10 +734,10 @@ discard_variables (struct dataset *ds)
    and clears the permanent transformations.
    For use by INPUT PROGRAM. */
 struct trns_chain *
    and clears the permanent transformations.
    For use by INPUT PROGRAM. */
 struct trns_chain *
-proc_capture_transformations (struct dataset *ds) 
+proc_capture_transformations (struct dataset *ds)
 {
   struct trns_chain *chain;
 {
   struct trns_chain *chain;
-  
+
   assert (ds->temporary_trns_chain == NULL);
   chain = ds->permanent_trns_chain;
   ds->cur_trns_chain = ds->permanent_trns_chain = trns_chain_create ();
   assert (ds->temporary_trns_chain == NULL);
   chain = ds->permanent_trns_chain;
   ds->cur_trns_chain = ds->permanent_trns_chain = trns_chain_create ();
@@ -705,7 +759,7 @@ add_transformation (struct dataset *ds, trns_proc_func *proc, trns_free_func *fr
    FINALIZE will be called.
    The functions are passed AUX as auxiliary data. */
 void
    FINALIZE will be called.
    The functions are passed AUX as auxiliary data. */
 void
-add_transformation_with_finalizer (struct dataset *ds, 
+add_transformation_with_finalizer (struct dataset *ds,
                                   trns_finalize_func *finalize,
                                    trns_proc_func *proc,
                                    trns_free_func *free, void *aux)
                                   trns_finalize_func *finalize,
                                    trns_proc_func *proc,
                                    trns_free_func *free, void *aux)
@@ -717,7 +771,7 @@ add_transformation_with_finalizer (struct dataset *ds,
    This value can be returned by a transformation procedure
    function to indicate a "jump" to that transformation. */
 size_t
    This value can be returned by a transformation procedure
    function to indicate a "jump" to that transformation. */
 size_t
-next_transformation (const struct dataset *ds) 
+next_transformation (const struct dataset *ds)
 {
   return trns_chain_next (ds->cur_trns_chain);
 }
 {
   return trns_chain_next (ds->cur_trns_chain);
 }
@@ -726,7 +780,7 @@ next_transformation (const struct dataset *ds)
    a temporary transformation, false if it will add a permanent
    transformation. */
 bool
    a temporary transformation, false if it will add a permanent
    transformation. */
 bool
-proc_in_temporary_transformations (const struct dataset *ds) 
+proc_in_temporary_transformations (const struct dataset *ds)
 {
   return ds->temporary_trns_chain != NULL;
 }
 {
   return ds->temporary_trns_chain != NULL;
 }
@@ -735,13 +789,14 @@ proc_in_temporary_transformations (const struct dataset *ds)
    Further calls to add_transformation() will add temporary
    transformations. */
 void
    Further calls to add_transformation() will add temporary
    transformations. */
 void
-proc_start_temporary_transformations (struct dataset *ds) 
+proc_start_temporary_transformations (struct dataset *ds)
 {
   if (!proc_in_temporary_transformations (ds))
     {
       add_case_limit_trns (ds);
 
       ds->permanent_dict = dict_clone (ds->dict);
 {
   if (!proc_in_temporary_transformations (ds))
     {
       add_case_limit_trns (ds);
 
       ds->permanent_dict = dict_clone (ds->dict);
+
       trns_chain_finalize (ds->permanent_trns_chain);
       ds->temporary_trns_chain = ds->cur_trns_chain = trns_chain_create ();
     }
       trns_chain_finalize (ds->permanent_trns_chain);
       ds->temporary_trns_chain = ds->cur_trns_chain = trns_chain_create ();
     }
@@ -752,9 +807,9 @@ proc_start_temporary_transformations (struct dataset *ds)
    permanent.
    Returns true if anything changed, false otherwise. */
 bool
    permanent.
    Returns true if anything changed, false otherwise. */
 bool
-proc_make_temporary_transformations_permanent (struct dataset *ds) 
+proc_make_temporary_transformations_permanent (struct dataset *ds)
 {
 {
-  if (proc_in_temporary_transformations (ds)) 
+  if (proc_in_temporary_transformations (ds))
     {
       trns_chain_finalize (ds->temporary_trns_chain);
       trns_chain_splice (ds->permanent_trns_chain, ds->temporary_trns_chain);
     {
       trns_chain_finalize (ds->temporary_trns_chain);
       trns_chain_splice (ds->permanent_trns_chain, ds->temporary_trns_chain);
@@ -773,12 +828,11 @@ proc_make_temporary_transformations_permanent (struct dataset *ds)
    transformations will be permanent.
    Returns true if anything changed, false otherwise. */
 bool
    transformations will be permanent.
    Returns true if anything changed, false otherwise. */
 bool
-proc_cancel_temporary_transformations (struct dataset *ds) 
+proc_cancel_temporary_transformations (struct dataset *ds)
 {
 {
-  if (proc_in_temporary_transformations (ds)) 
+  if (proc_in_temporary_transformations (ds))
     {
     {
-      dict_destroy (ds->dict);
-      ds->dict = ds->permanent_dict;
+      dataset_set_dict (ds, ds->permanent_dict);
       ds->permanent_dict = NULL;
 
       trns_chain_destroy (ds->temporary_trns_chain);
       ds->permanent_dict = NULL;
 
       trns_chain_destroy (ds->temporary_trns_chain);
@@ -805,10 +859,16 @@ proc_cancel_all_transformations (struct dataset *ds)
 \f
 /* Initializes procedure handling. */
 struct dataset *
 \f
 /* Initializes procedure handling. */
 struct dataset *
-create_dataset (void)
+create_dataset (struct casefile_factory *fact,
+               replace_source_callback *rps,
+               replace_dictionary_callback *rds
+               )
 {
   struct dataset *ds = xzalloc (sizeof(*ds));
   ds->dict = dict_create ();
 {
   struct dataset *ds = xzalloc (sizeof(*ds));
   ds->dict = dict_create ();
+  ds->cf_factory = fact;
+  ds->replace_source = rps;
+  ds->replace_dict = rds;
   proc_cancel_all_transformations (ds);
   return ds;
 }
   proc_cancel_all_transformations (ds);
   return ds;
 }
@@ -826,7 +886,7 @@ destroy_dataset (struct dataset *ds)
 /* Sets SINK as the destination for procedure output from the
    next procedure. */
 void
 /* Sets SINK as the destination for procedure output from the
    next procedure. */
 void
-proc_set_sink (struct dataset *ds, struct case_sink *sink) 
+proc_set_sink (struct dataset *ds, struct case_sink *sink)
 {
   assert (ds->proc_sink == NULL);
   ds->proc_sink = sink;
 {
   assert (ds->proc_sink == NULL);
   ds->proc_sink = sink;
@@ -835,16 +895,18 @@ proc_set_sink (struct dataset *ds, struct case_sink *sink)
 /* Sets SOURCE as the source for procedure input for the next
    procedure. */
 void
 /* Sets SOURCE as the source for procedure input for the next
    procedure. */
 void
-proc_set_source (struct dataset *ds, struct case_source *source) 
+proc_set_source (struct dataset *ds, struct case_source *source)
 {
 {
-  assert (ds->proc_source == NULL);
   ds->proc_source = source;
   ds->proc_source = source;
+
+  if ( ds->replace_source )
+    ds->replace_source (ds->proc_source);
 }
 
 /* Returns true if a source for the next procedure has been
    configured, false otherwise. */
 bool
 }
 
 /* Returns true if a source for the next procedure has been
    configured, false otherwise. */
 bool
-proc_has_source (const struct dataset *ds) 
+proc_has_source (const struct dataset *ds)
 {
   return ds->proc_source != NULL;
 }
 {
   return ds->proc_source != NULL;
 }
@@ -854,7 +916,7 @@ proc_has_source (const struct dataset *ds)
    The returned casefile is owned by the caller; it will not be
    automatically used for the next procedure's input. */
 struct casefile *
    The returned casefile is owned by the caller; it will not be
    automatically used for the next procedure's input. */
 struct casefile *
-proc_capture_output (struct dataset *ds) 
+proc_capture_output (struct dataset *ds)
 {
   struct casefile *casefile;
 
 {
   struct casefile *casefile;
 
@@ -866,7 +928,7 @@ proc_capture_output (struct dataset *ds)
   assert (!proc_in_temporary_transformations (ds));
 
   casefile = storage_source_decapsulate (ds->proc_source);
   assert (!proc_in_temporary_transformations (ds));
 
   casefile = storage_source_decapsulate (ds->proc_source);
-  ds->proc_source = NULL;
+  proc_set_source (ds, NULL);
 
   return casefile;
 }
 
   return casefile;
 }
@@ -877,7 +939,7 @@ static trns_free_func case_limit_trns_free;
 /* Adds a transformation that limits the number of cases that may
    pass through, if DS->DICT has a case limit. */
 static void
 /* Adds a transformation that limits the number of cases that may
    pass through, if DS->DICT has a case limit. */
 static void
-add_case_limit_trns (struct dataset *ds) 
+add_case_limit_trns (struct dataset *ds)
 {
   size_t case_limit = dict_get_case_limit (ds->dict);
   if (case_limit != 0)
 {
   size_t case_limit = dict_get_case_limit (ds->dict);
   if (case_limit != 0)
@@ -894,10 +956,10 @@ add_case_limit_trns (struct dataset *ds)
    *CASES_REMAINING. */
 static int
 case_limit_trns_proc (void *cases_remaining_,
    *CASES_REMAINING. */
 static int
 case_limit_trns_proc (void *cases_remaining_,
-                      struct ccase *c UNUSED, casenumber case_nr UNUSED) 
+                      struct ccase *c UNUSED, casenumber case_nr UNUSED)
 {
   size_t *cases_remaining = cases_remaining_;
 {
   size_t *cases_remaining = cases_remaining_;
-  if (*cases_remaining > 0) 
+  if (*cases_remaining > 0)
     {
       (*cases_remaining)--;
       return TRNS_CONTINUE;
     {
       (*cases_remaining)--;
       return TRNS_CONTINUE;
@@ -908,7 +970,7 @@ case_limit_trns_proc (void *cases_remaining_,
 
 /* Frees the data associated with a case limit transformation. */
 static bool
 
 /* Frees the data associated with a case limit transformation. */
 static bool
-case_limit_trns_free (void *cases_remaining_) 
+case_limit_trns_free (void *cases_remaining_)
 {
   size_t *cases_remaining = cases_remaining_;
   free (cases_remaining);
 {
   size_t *cases_remaining = cases_remaining_;
   free (cases_remaining);
@@ -920,10 +982,10 @@ static trns_proc_func filter_trns_proc;
 /* Adds a temporary transformation to filter data according to
    the variable specified on FILTER, if any. */
 static void
 /* Adds a temporary transformation to filter data according to
    the variable specified on FILTER, if any. */
 static void
-add_filter_trns (struct dataset *ds) 
+add_filter_trns (struct dataset *ds)
 {
   struct variable *filter_var = dict_get_filter (ds->dict);
 {
   struct variable *filter_var = dict_get_filter (ds->dict);
-  if (filter_var != NULL) 
+  if (filter_var != NULL)
     {
       proc_start_temporary_transformations (ds);
       add_transformation (ds, filter_trns_proc, NULL, filter_var);
     {
       proc_start_temporary_transformations (ds);
       add_transformation (ds, filter_trns_proc, NULL, filter_var);
@@ -933,12 +995,12 @@ add_filter_trns (struct dataset *ds)
 /* FILTER transformation. */
 static int
 filter_trns_proc (void *filter_var_,
 /* FILTER transformation. */
 static int
 filter_trns_proc (void *filter_var_,
-                  struct ccase *c UNUSED, casenumber case_nr UNUSED) 
-  
+                  struct ccase *c UNUSED, casenumber case_nr UNUSED)
+
 {
   struct variable *filter_var = filter_var_;
 {
   struct variable *filter_var = filter_var_;
-  double f = case_num (c, filter_var->fv);
-  return (f != 0.0 && !mv_is_num_missing (&filter_var->miss, f)
+  double f = case_num (c, filter_var);
+  return (f != 0.0 && !var_is_num_missing (filter_var, f, MV_ANY)
           ? TRNS_CONTINUE : TRNS_DROP_CASE);
 }
 
           ? TRNS_CONTINUE : TRNS_DROP_CASE);
 }
 
@@ -950,22 +1012,38 @@ dataset_dict (const struct dataset *ds)
 }
 
 
 }
 
 
-void 
+/* Set or replace dataset DS's dictionary with DICT.
+   The old dictionary is destroyed */
+void
 dataset_set_dict (struct dataset *ds, struct dictionary *dict)
 {
 dataset_set_dict (struct dataset *ds, struct dictionary *dict)
 {
+  struct dictionary *old_dict = ds->dict;
+
+  dict_copy_callbacks (dict, ds->dict);
   ds->dict = dict;
   ds->dict = dict;
+
+  if ( ds->replace_dict )
+    ds->replace_dict (dict);
+
+  dict_destroy (old_dict);
 }
 
 }
 
-int 
+int
 dataset_n_lag (const struct dataset *ds)
 {
   return ds->n_lag;
 }
 
 dataset_n_lag (const struct dataset *ds)
 {
   return ds->n_lag;
 }
 
-void 
+void
 dataset_set_n_lag (struct dataset *ds, int n_lag)
 {
   ds->n_lag = n_lag;
 }
 
 
 dataset_set_n_lag (struct dataset *ds, int n_lag)
 {
   ds->n_lag = n_lag;
 }
 
 
+struct casefile_factory *
+dataset_get_casefile_factory (const struct dataset *ds)
+{
+  return ds->cf_factory;
+}
+