Patch #5672
[pspp-builds.git] / src / data / procedure.c
index 5341de4652a068e5794319c98b4720b0eccd6763..b6f988aabbaf46b2707940aca2e646504ced5ffa 100644 (file)
@@ -1,6 +1,5 @@
 /* PSPP - computes sample statistics.
-   Copyright (C) 1997-9, 2000, 2006 Free Software Foundation, Inc.
-   Written by Ben Pfaff <blp@gnu.org>.
+   Copyright (C) 1997-9, 2000, 2006, 2007 Free Software Foundation, Inc.
 
    This program is free software; you can redistribute it and/or
    modify it under the terms of the GNU General Public License as
 #include <libpspp/misc.h>
 #include <libpspp/str.h>
 
-/* Procedure execution data. */
-struct write_case_data
-  {
-    /* Function to call for each case. */
-    case_func_t case_func;
-    void *aux;
+struct dataset {
 
-    struct dataset *dataset;    /* The dataset concerned */
-    struct ccase trns_case;     /* Case used for transformations. */
-    struct ccase sink_case;     /* Case written to sink, if
-                                   compacting is necessary. */
-    size_t cases_written;       /* Cases output so far. */
-  };
+  /* An abstract factory which creates casefiles */
+  struct casefile_factory *cf_factory;
+
+  /* Callback which occurs when a procedure provides a new source for
+     the dataset */
+  replace_source_callback *replace_source ;
+
+  /* Callback which occurs whenever the DICT is replaced by a new one */
+  replace_dictionary_callback *replace_dict;
 
-struct dataset {
   /* Cases are read from proc_source,
      pass through permanent_trns_chain (which transforms them into
      the format described by permanent_dict),
@@ -85,21 +81,25 @@ struct dataset {
   int lag_head;                /* Index where next case will be added. */
   struct ccase *lag_queue; /* Array of n_lag ccase * elements. */
 
+  /* Procedure data. */
+  bool is_open;               /* Procedure open? */
+  struct ccase trns_case;     /* Case used for transformations. */
+  struct ccase sink_case;     /* Case written to sink, if
+                                 compacting is necessary. */
+  size_t cases_written;       /* Cases output so far. */
+  bool ok;
 }; /* struct dataset */
 
 
-struct dataset *current_dataset;
-
 static void add_case_limit_trns (struct dataset *ds);
 static void add_filter_trns (struct dataset *ds);
 
-static bool internal_procedure (struct dataset *ds, case_func_t,
-                                bool (*end_func) (void *),
+static bool internal_procedure (struct dataset *ds, case_func *,
+                                end_func *,
                                 void *aux);
 static void update_last_proc_invocation (struct dataset *ds);
 static void create_trns_case (struct ccase *, struct dictionary *);
 static void open_active_file (struct dataset *ds);
-static bool write_case (struct write_case_data *wc_data);
 static void lag_case (struct dataset *ds, const struct ccase *c);
 static void clear_case (const struct dataset *ds, struct ccase *c);
 static bool close_active_file (struct dataset *ds);
@@ -108,7 +108,7 @@ static bool close_active_file (struct dataset *ds);
 
 /* Returns the last time the data was read. */
 time_t
-time_of_last_procedure (struct dataset *ds) 
+time_of_last_procedure (struct dataset *ds)
 {
   if (ds->last_proc_invocation == 0)
     update_last_proc_invocation (ds);
@@ -127,32 +127,49 @@ time_of_last_procedure (struct dataset *ds)
       start the next case from step 1.
 
    2. Write case to replacement active file.
-   
+
    3. Execute temporary transformations.  If these drop the case,
       start the next case from step 1.
-      
+
    4. Pass case to PROC_FUNC, passing AUX as auxiliary data.
 
    Returns true if successful, false if an I/O error occurred. */
 bool
-procedure (struct dataset *ds, case_func_t cf, void *aux)
+procedure (struct dataset *ds, case_func *cf, void *aux)
 {
+  update_last_proc_invocation (ds);
+
+  /* Optimize the trivial case where we're not going to do
+     anything with the data, by not reading the data at all. */
+  if (cf == NULL
+      && case_source_is_class (ds->proc_source, &storage_source_class)
+      && ds->proc_sink == NULL
+      && (ds->temporary_trns_chain == NULL
+          || trns_chain_is_empty (ds->temporary_trns_chain))
+      && trns_chain_is_empty (ds->permanent_trns_chain))
+    {
+      ds->n_lag = 0;
+      dict_set_case_limit (ds->dict, 0);
+      dict_clear_vectors (ds->dict);
+      return true;
+    }
+
   return internal_procedure (ds, cf, NULL, aux);
 }
 \f
 /* Multipass procedure. */
 
-struct multipass_aux_data 
+struct multipass_aux_data
   {
     struct casefile *casefile;
-    
+
     bool (*proc_func) (const struct casefile *, void *aux);
     void *aux;
   };
 
 /* Case processing function for multipass_procedure(). */
 static bool
-multipass_case_func (const struct ccase *c, void *aux_data_
+multipass_case_func (const struct ccase *c, void *aux_data_, const struct dataset *ds UNUSED)
 {
   struct multipass_aux_data *aux_data = aux_data_;
   return casefile_append (aux_data->casefile, c);
@@ -160,7 +177,7 @@ multipass_case_func (const struct ccase *c, void *aux_data_)
 
 /* End-of-file function for multipass_procedure(). */
 static bool
-multipass_end_func (void *aux_data_
+multipass_end_func (void *aux_data_, const struct dataset *ds UNUSED)
 {
   struct multipass_aux_data *aux_data = aux_data_;
   return (aux_data->proc_func == NULL
@@ -171,12 +188,15 @@ multipass_end_func (void *aux_data_)
    The entire active file is passed to PROC_FUNC, with the given
    AUX as auxiliary data, as a unit. */
 bool
-multipass_procedure (struct dataset *ds, casefile_func_t proc_func,  void *aux) 
+multipass_procedure (struct dataset *ds, casefile_func *proc_func,  void *aux)
 {
   struct multipass_aux_data aux_data;
   bool ok;
 
-  aux_data.casefile = fastfile_create (dict_get_next_value_idx (ds->dict));
+  aux_data.casefile =
+    ds->cf_factory->create_casefile (ds->cf_factory,
+                                    dict_get_next_value_idx (ds->dict));
+
   aux_data.proc_func = proc_func;
   aux_data.aux = aux;
 
@@ -188,8 +208,8 @@ multipass_procedure (struct dataset *ds, casefile_func_t proc_func,  void *aux)
   return ok;
 }
 \f
-/* Procedure implementation. */
 
+/* Procedure implementation. */
 
 /* Executes a procedure.
    Passes each case to CASE_FUNC.
@@ -197,59 +217,148 @@ multipass_procedure (struct dataset *ds, casefile_func_t proc_func,  void *aux)
    Returns true if successful, false if an I/O error occurred (or
    if CASE_FUNC or END_FUNC ever returned false). */
 static bool
-internal_procedure (struct dataset *ds, case_func_t case_func,
-                    bool (*end_func) (void *),
-                    void *aux) 
+internal_procedure (struct dataset *ds, case_func *proc,
+                   end_func *end,
+                    void *aux)
 {
-  struct write_case_data wc_data;
+  struct ccase *c;
   bool ok = true;
 
+  proc_open (ds);
+  while (ok && proc_read (ds, &c))
+    if (proc != NULL)
+      ok = proc (c, aux, ds) && ok;
+  if (end != NULL)
+    ok = end (aux, ds) && ok;
+
+  if ( proc_close (ds) && ok )
+    {
+
+      return true;
+    }
+
+  return false;
+}
+
+/* Opens dataset DS for reading cases with proc_read.
+   proc_close must be called when done. */
+void
+proc_open (struct dataset *ds)
+{
   assert (ds->proc_source != NULL);
+  assert (!ds->is_open);
 
   update_last_proc_invocation (ds);
 
-  /* Optimize the trivial case where we're not going to do
-     anything with the data, by not reading the data at all. */
-  if (case_func == NULL && end_func == NULL
-      && case_source_is_class (ds->proc_source, &storage_source_class)
-      && ds->proc_sink == NULL
-      && (ds->temporary_trns_chain == NULL
-          || trns_chain_is_empty (ds->temporary_trns_chain))
-      && trns_chain_is_empty (ds->permanent_trns_chain))
+  open_active_file (ds);
+
+  ds->is_open = true;
+  create_trns_case (&ds->trns_case, ds->dict);
+  case_create (&ds->sink_case, dict_get_compacted_value_cnt (ds->dict));
+  ds->cases_written = 0;
+  ds->ok = true;
+}
+
+/* Reads the next case from dataset DS, which must have been
+   opened for reading with proc_open.
+   Returns true if successful, in which case a pointer to the
+   case is stored in *C.
+   Return false at end of file or if a read error occurs.  In
+   this case a null pointer is stored in *C. */
+bool
+proc_read (struct dataset *ds, struct ccase **c)
+{
+  enum trns_result retval = TRNS_DROP_CASE;
+
+  assert (ds->is_open);
+  *c = NULL;
+  for (;;)
     {
-      ds->n_lag = 0;
-      dict_set_case_limit (ds->dict, 0);
-      dict_clear_vectors (ds->dict);
+      size_t case_nr;
+
+      assert (retval == TRNS_DROP_CASE || retval == TRNS_ERROR);
+      if (retval == TRNS_ERROR)
+        ds->ok = false;
+      if (!ds->ok)
+        return false;
+
+      /* Read a case from proc_source. */
+      clear_case (ds, &ds->trns_case);
+      if (!ds->proc_source->class->read (ds->proc_source, &ds->trns_case))
+        return false;
+
+      /* Execute permanent transformations.  */
+      case_nr = ds->cases_written + 1;
+      retval = trns_chain_execute (ds->permanent_trns_chain, TRNS_CONTINUE,
+                                   &ds->trns_case, &case_nr);
+      if (retval != TRNS_CONTINUE)
+        continue;
+
+      /* Write case to LAG queue. */
+      if (ds->n_lag)
+        lag_case (ds, &ds->trns_case);
+
+      /* Write case to replacement active file. */
+      ds->cases_written++;
+      if (ds->proc_sink->class->write != NULL)
+        {
+          if (ds->compactor != NULL)
+            {
+              dict_compactor_compact (ds->compactor, &ds->sink_case,
+                                      &ds->trns_case);
+              ds->proc_sink->class->write (ds->proc_sink, &ds->sink_case);
+            }
+          else
+            ds->proc_sink->class->write (ds->proc_sink, &ds->trns_case);
+        }
+
+      /* Execute temporary transformations. */
+      if (ds->temporary_trns_chain != NULL)
+        {
+          retval = trns_chain_execute (ds->temporary_trns_chain, TRNS_CONTINUE,
+                                       &ds->trns_case, &ds->cases_written);
+          if (retval != TRNS_CONTINUE)
+            continue;
+        }
+
+      *c = &ds->trns_case;
       return true;
     }
-  
-  open_active_file (ds);
-  
-  wc_data.case_func = case_func;
-  wc_data.aux = aux;
-  wc_data.dataset = ds;
-  create_trns_case (&wc_data.trns_case, ds->dict);
-  case_create (&wc_data.sink_case,
-               dict_get_compacted_value_cnt (ds->dict));
-  wc_data.cases_written = 0;
-
-  ok = ds->proc_source->class->read (ds->proc_source,
-                                 &wc_data.trns_case,
-                                 write_case, &wc_data) && ok;
-  if (end_func != NULL)
-    ok = end_func (aux) && ok;
-
-  case_destroy (&wc_data.sink_case);
-  case_destroy (&wc_data.trns_case);
-
-  ok = close_active_file (ds) && ok;
+}
 
-  return ok;
+/* Closes dataset DS for reading.
+   Returns true if successful, false if an I/O error occurred
+   while reading or closing the data set.
+   If DS has not been opened, returns true without doing
+   anything else. */
+bool
+proc_close (struct dataset *ds)
+{
+  if (!ds->is_open)
+    return true;
+
+  /* Drain any remaining cases. */
+  while (ds->ok)
+    {
+      struct ccase *c;
+      if (!proc_read (ds, &c))
+        break;
+    }
+  ds->ok = free_case_source (ds->proc_source) && ds->ok;
+  proc_set_source (ds, NULL);
+
+  case_destroy (&ds->sink_case);
+  case_destroy (&ds->trns_case);
+
+  ds->ok = close_active_file (ds) && ds->ok;
+  ds->is_open = false;
+
+  return ds->ok;
 }
 
 /* Updates last_proc_invocation. */
 static void
-update_last_proc_invocation (struct dataset *ds) 
+update_last_proc_invocation (struct dataset *ds)
 {
   ds->last_proc_invocation = time (NULL);
 }
@@ -264,15 +373,15 @@ create_trns_case (struct ccase *trns_case, struct dictionary *dict)
   size_t i;
 
   case_create (trns_case, dict_get_next_value_idx (dict));
-  for (i = 0; i < var_cnt; i++) 
+  for (i = 0; i < var_cnt; i++)
     {
       struct variable *v = dict_get_var (dict, i);
-      union value *value = case_data_rw (trns_case, v->fv);
+      union value *value = case_data_rw (trns_case, v);
 
-      if (v->type == NUMERIC)
-        value->f = v->leave ? 0.0 : SYSMIS;
+      if (var_is_numeric (v))
+        value->f = var_get_leave (v) ? 0.0 : SYSMIS;
       else
-        memset (value->s, ' ', v->width);
+        memset (value->s, ' ', var_get_width (v));
     }
 }
 
@@ -293,14 +402,17 @@ open_active_file (struct dataset *ds)
     ds->permanent_dict = ds->dict;
 
   /* Figure out whether to compact. */
-  ds->compactor = 
+  ds->compactor =
     (dict_compacting_would_shrink (ds->permanent_dict)
      ? dict_make_compactor (ds->permanent_dict)
      : NULL);
 
   /* Prepare sink. */
   if (ds->proc_sink == NULL)
-    ds->proc_sink = create_case_sink (&storage_sink_class, ds->permanent_dict, NULL);
+    ds->proc_sink = create_case_sink (&storage_sink_class,
+                                     ds->permanent_dict,
+                                     ds->cf_factory,
+                                     NULL);
   if (ds->proc_sink->class->open != NULL)
     ds->proc_sink->class->open (ds->proc_sink);
 
@@ -308,7 +420,7 @@ open_active_file (struct dataset *ds)
   if (ds->n_lag > 0)
     {
       int i;
-  
+
       ds->lag_count = 0;
       ds->lag_head = 0;
       ds->lag_queue = xnmalloc (ds->n_lag, sizeof *ds->lag_queue);
@@ -317,63 +429,6 @@ open_active_file (struct dataset *ds)
     }
 }
 
-/* Transforms trns_case and writes it to the replacement active
-   file if advisable.  Returns true if more cases can be
-   accepted, false otherwise.  Do not call this function again
-   after it has returned false once.  */
-static bool
-write_case (struct write_case_data *wc_data)
-{
-  enum trns_result retval;
-  size_t case_nr;
-
-  struct dataset *ds = wc_data->dataset;
-  
-  /* Execute permanent transformations.  */
-  case_nr = wc_data->cases_written + 1;
-  retval = trns_chain_execute (ds->permanent_trns_chain,
-                               &wc_data->trns_case, &case_nr);
-  if (retval != TRNS_CONTINUE)
-    goto done;
-
-  /* Write case to LAG queue. */
-  if (ds->n_lag)
-    lag_case (ds, &wc_data->trns_case);
-
-  /* Write case to replacement active file. */
-  wc_data->cases_written++;
-  if (ds->proc_sink->class->write != NULL) 
-    {
-      if (ds->compactor != NULL) 
-        {
-          dict_compactor_compact (ds->compactor, &wc_data->sink_case,
-                                  &wc_data->trns_case);
-          ds->proc_sink->class->write (ds->proc_sink, &wc_data->sink_case);
-        }
-      else
-        ds->proc_sink->class->write (ds->proc_sink, &wc_data->trns_case);
-    }
-  
-  /* Execute temporary transformations. */
-  if (ds->temporary_trns_chain != NULL) 
-    {
-      retval = trns_chain_execute (ds->temporary_trns_chain,
-                                   &wc_data->trns_case,
-                                   &wc_data->cases_written);
-      if (retval != TRNS_CONTINUE)
-        goto done;
-    }
-
-  /* Pass case to procedure. */
-  if (wc_data->case_func != NULL)
-    if (!wc_data->case_func (&wc_data->trns_case, wc_data->aux))
-      retval = TRNS_ERROR;
-
- done:
-  clear_case (ds, &wc_data->trns_case);
-  return retval != TRNS_ERROR;
-}
-
 /* Add C to the lag queue. */
 static void
 lag_case (struct dataset *ds, const struct ccase *c)
@@ -393,17 +448,17 @@ clear_case (const struct dataset *ds, struct ccase *c)
 {
   size_t var_cnt = dict_get_var_cnt (ds->dict);
   size_t i;
-  
-  for (i = 0; i < var_cnt; i++) 
+
+  for (i = 0; i < var_cnt; i++)
     {
       struct variable *v = dict_get_var (ds->dict, i);
-      if (!v->leave) 
+      if (!var_get_leave (v))
         {
-          if (v->type == NUMERIC)
-            case_data_rw (c, v->fv)->f = SYSMIS;
+          if (var_is_numeric (v))
+            case_data_rw (c, v)->f = SYSMIS;
           else
-            memset (case_data_rw (c, v->fv)->s, ' ', v->width);
-        } 
+            memset (case_data_rw (c, v)->s, ' ', var_get_width (v));
+        }
     }
 }
 
@@ -415,31 +470,27 @@ close_active_file (struct dataset *ds)
   if (ds->n_lag > 0)
     {
       int i;
-      
+
       for (i = 0; i < ds->n_lag; i++)
        case_destroy (&ds->lag_queue[i]);
       free (ds->lag_queue);
       ds->n_lag = 0;
     }
-  
+
   /* Dictionary from before TEMPORARY becomes permanent. */
   proc_cancel_temporary_transformations (ds);
 
   /* Finish compacting. */
-  if (ds->compactor != NULL) 
+  if (ds->compactor != NULL)
     {
       dict_compactor_destroy (ds->compactor);
       dict_compact_values (ds->dict);
       ds->compactor = NULL;
     }
-    
-  /* Free data source. */
-  free_case_source (ds->proc_source);
-  ds->proc_source = NULL;
 
   /* Old data sink becomes new data source. */
   if (ds->proc_sink->class->make_source != NULL)
-    ds->proc_source = ds->proc_sink->class->make_source (ds->proc_sink);
+    proc_set_source (ds, ds->proc_sink->class->make_source (ds->proc_sink) );
   free_case_sink (ds->proc_sink);
   ds->proc_sink = NULL;
 
@@ -470,21 +521,21 @@ lagged_case (const struct dataset *ds, int n_before)
 /* Procedure that separates the data into SPLIT FILE groups. */
 
 /* Represents auxiliary data for handling SPLIT FILE. */
-struct split_aux_data 
+struct split_aux_data
   {
     struct dataset *dataset;    /* The dataset */
     struct ccase prev_case;     /* Data in previous case. */
 
     /* Callback functions. */
-    begin_func_t begin_func ; 
-    case_func_t proc_func ;
-    void (*end_func) (void *);
+    begin_func *begin;
+    case_func *proc;
+    end_func *end;
     void *func_aux;
   };
 
 static int equal_splits (const struct ccase *, const struct ccase *, const struct dataset *ds);
-static bool split_procedure_case_func (const struct ccase *c, void *);
-static bool split_procedure_end_func (void *);
+static bool split_procedure_case_func (const struct ccase *c, void *, const struct dataset *);
+static bool split_procedure_end_func (void *, const struct dataset *);
 
 /* Like procedure(), but it automatically breaks the case stream
    into SPLIT FILE break groups.  Before each group of cases with
@@ -496,27 +547,27 @@ static bool split_procedure_end_func (void *);
    passed to each of the functions as auxiliary data.
 
    If the active file is empty, none of BEGIN_FUNC, PROC_FUNC,
-   and END_FUNC will be called at all. 
+   and END_FUNC will be called at all.
 
    If SPLIT FILE is not in effect, then there is one break group
    (if the active file is nonempty), and BEGIN_FUNC and END_FUNC
    will be called once.
-   
+
    Returns true if successful, false if an I/O error occurred. */
 bool
 procedure_with_splits (struct dataset *ds,
-                      begin_func_t begin_func, 
-                      case_func_t proc_func,
-                       void (*end_func) (void *aux),
-                       void *func_aux) 
+                      begin_func begin,
+                      case_func *proc,
+                       end_func *end,
+                       void *func_aux)
 {
   struct split_aux_data split_aux;
   bool ok;
 
   case_nullify (&split_aux.prev_case);
-  split_aux.begin_func = begin_func;
-  split_aux.proc_func = proc_func;
-  split_aux.end_func = end_func;
+  split_aux.begin = begin;
+  split_aux.proc = proc;
+  split_aux.end = end;
   split_aux.func_aux = func_aux;
   split_aux.dataset = ds;
 
@@ -530,7 +581,7 @@ procedure_with_splits (struct dataset *ds,
 
 /* Case callback used by procedure_with_splits(). */
 static bool
-split_procedure_case_func (const struct ccase *c, void *split_aux_
+split_procedure_case_func (const struct ccase *c, void *split_aux_, const struct dataset *ds)
 {
   struct split_aux_data *split_aux = split_aux_;
 
@@ -538,37 +589,36 @@ split_procedure_case_func (const struct ccase *c, void *split_aux_)
   if (case_is_null (&split_aux->prev_case)
       || !equal_splits (c, &split_aux->prev_case, split_aux->dataset))
     {
-      if (!case_is_null (&split_aux->prev_case) && split_aux->end_func != NULL)
-        split_aux->end_func (split_aux->func_aux);
+      if (!case_is_null (&split_aux->prev_case) && split_aux->end != NULL)
+        split_aux->end (split_aux->func_aux, ds);
 
       case_destroy (&split_aux->prev_case);
       case_clone (&split_aux->prev_case, c);
 
-      if (split_aux->begin_func != NULL)
-       split_aux->begin_func (&split_aux->prev_case, split_aux->func_aux);
-
+      if (split_aux->begin != NULL)
+       split_aux->begin (&split_aux->prev_case, split_aux->func_aux, ds);
     }
 
-  return (split_aux->proc_func == NULL
-          || split_aux->proc_func (c, split_aux->func_aux));
+  return (split_aux->proc == NULL
+          || split_aux->proc (c, split_aux->func_aux, ds));
 }
 
 /* End-of-file callback used by procedure_with_splits(). */
 static bool
-split_procedure_end_func (void *split_aux_
+split_procedure_end_func (void *split_aux_, const struct dataset *ds)
 {
   struct split_aux_data *split_aux = split_aux_;
 
-  if (!case_is_null (&split_aux->prev_case) && split_aux->end_func != NULL)
-    split_aux->end_func (split_aux->func_aux);
+  if (!case_is_null (&split_aux->prev_case) && split_aux->end != NULL)
+    split_aux->end (split_aux->func_aux, ds);
   return true;
 }
 
 /* Compares the SPLIT FILE variables in cases A and B and returns
    nonzero only if they differ. */
 static int
-equal_splits (const struct ccase *a, const struct ccase *b, 
-             const struct dataset *ds) 
+equal_splits (const struct ccase *a, const struct ccase *b,
+             const struct dataset *ds)
 {
   return case_compare (a, b,
                        dict_get_split_vars (ds->dict),
@@ -580,28 +630,24 @@ equal_splits (const struct ccase *a, const struct ccase *b,
 
 /* Represents auxiliary data for handling SPLIT FILE in a
    multipass procedure. */
-struct multipass_split_aux_data 
+struct multipass_split_aux_data
   {
     struct dataset *dataset;    /* The dataset of the split */
     struct ccase prev_case;     /* Data in previous case. */
     struct casefile *casefile;  /* Accumulates data for a split. */
-
-    /* Function to call with the accumulated data. */
-    bool (*split_func) (const struct ccase *first, const struct casefile *,
-                        void *);
-    void *func_aux;                            /* Auxiliary data. */ 
+    split_func *split;          /* Function to call with the accumulated
+                                  data. */
+    void *func_aux;             /* Auxiliary data. */
   };
 
-static bool multipass_split_case_func (const struct ccase *c, void *aux_);
-static bool multipass_split_end_func (void *aux_);
-static bool multipass_split_output (struct multipass_split_aux_data *);
+static bool multipass_split_case_func (const struct ccase *c, void *aux_, const struct dataset *);
+static bool multipass_split_end_func (void *aux_, const struct dataset *ds);
+static bool multipass_split_output (struct multipass_split_aux_data *, const struct dataset *ds);
 
 /* Returns true if successful, false if an I/O error occurred. */
 bool
-multipass_procedure_with_splits (struct dataset *ds, 
-                                bool (*split_func) (const struct ccase *first,
-                                                     const struct casefile *,
-                                                     void *aux),
+multipass_procedure_with_splits (struct dataset *ds,
+                                split_func  *split,
                                  void *func_aux)
 {
   struct multipass_split_aux_data aux;
@@ -609,7 +655,7 @@ multipass_procedure_with_splits (struct dataset *ds,
 
   case_nullify (&aux.prev_case);
   aux.casefile = NULL;
-  aux.split_func = split_func;
+  aux.split = split;
   aux.func_aux = func_aux;
   aux.dataset = ds;
 
@@ -622,10 +668,9 @@ multipass_procedure_with_splits (struct dataset *ds,
 
 /* Case callback used by multipass_procedure_with_splits(). */
 static bool
-multipass_split_case_func (const struct ccase *c, void *aux_)
+multipass_split_case_func (const struct ccase *c, void *aux_, const struct dataset *ds)
 {
   struct multipass_split_aux_data *aux = aux_;
-  struct dataset *ds = aux->dataset;
   bool ok = true;
 
   /* Start a new series if needed. */
@@ -637,11 +682,12 @@ multipass_split_case_func (const struct ccase *c, void *aux_)
 
       /* Pass any cases to split_func. */
       if (aux->casefile != NULL)
-        ok = multipass_split_output (aux);
+        ok = multipass_split_output (aux, ds);
 
       /* Start a new casefile. */
-      aux->casefile = 
-       fastfile_create (dict_get_next_value_idx (ds->dict));
+      aux->casefile =
+       ds->cf_factory->create_casefile (ds->cf_factory,
+                                        dict_get_next_value_idx (ds->dict));
     }
 
   return casefile_append (aux->casefile, c) && ok;
@@ -649,19 +695,19 @@ multipass_split_case_func (const struct ccase *c, void *aux_)
 
 /* End-of-file callback used by multipass_procedure_with_splits(). */
 static bool
-multipass_split_end_func (void *aux_)
+multipass_split_end_func (void *aux_, const struct dataset *ds)
 {
   struct multipass_split_aux_data *aux = aux_;
-  return (aux->casefile == NULL || multipass_split_output (aux));
+  return (aux->casefile == NULL || multipass_split_output (aux, ds));
 }
 
 static bool
-multipass_split_output (struct multipass_split_aux_data *aux)
+multipass_split_output (struct multipass_split_aux_data *aux, const struct dataset *ds)
 {
   bool ok;
-  
+
   assert (aux->casefile != NULL);
-  ok = aux->split_func (&aux->prev_case, aux->casefile, aux->func_aux);
+  ok = aux->split (&aux->prev_case, aux->casefile, aux->func_aux, ds);
   casefile_destroy (aux->casefile);
   aux->casefile = NULL;
 
@@ -677,9 +723,9 @@ discard_variables (struct dataset *ds)
   fh_set_default_handle (NULL);
 
   ds->n_lag = 0;
-  
+
   free_case_source (ds->proc_source);
-  ds->proc_source = NULL;
+  proc_set_source (ds, NULL);
 
   proc_cancel_all_transformations (ds);
 }
@@ -688,10 +734,10 @@ discard_variables (struct dataset *ds)
    and clears the permanent transformations.
    For use by INPUT PROGRAM. */
 struct trns_chain *
-proc_capture_transformations (struct dataset *ds) 
+proc_capture_transformations (struct dataset *ds)
 {
   struct trns_chain *chain;
-  
+
   assert (ds->temporary_trns_chain == NULL);
   chain = ds->permanent_trns_chain;
   ds->cur_trns_chain = ds->permanent_trns_chain = trns_chain_create ();
@@ -713,7 +759,7 @@ add_transformation (struct dataset *ds, trns_proc_func *proc, trns_free_func *fr
    FINALIZE will be called.
    The functions are passed AUX as auxiliary data. */
 void
-add_transformation_with_finalizer (struct dataset *ds, 
+add_transformation_with_finalizer (struct dataset *ds,
                                   trns_finalize_func *finalize,
                                    trns_proc_func *proc,
                                    trns_free_func *free, void *aux)
@@ -725,7 +771,7 @@ add_transformation_with_finalizer (struct dataset *ds,
    This value can be returned by a transformation procedure
    function to indicate a "jump" to that transformation. */
 size_t
-next_transformation (const struct dataset *ds) 
+next_transformation (const struct dataset *ds)
 {
   return trns_chain_next (ds->cur_trns_chain);
 }
@@ -734,7 +780,7 @@ next_transformation (const struct dataset *ds)
    a temporary transformation, false if it will add a permanent
    transformation. */
 bool
-proc_in_temporary_transformations (const struct dataset *ds) 
+proc_in_temporary_transformations (const struct dataset *ds)
 {
   return ds->temporary_trns_chain != NULL;
 }
@@ -743,13 +789,14 @@ proc_in_temporary_transformations (const struct dataset *ds)
    Further calls to add_transformation() will add temporary
    transformations. */
 void
-proc_start_temporary_transformations (struct dataset *ds) 
+proc_start_temporary_transformations (struct dataset *ds)
 {
   if (!proc_in_temporary_transformations (ds))
     {
       add_case_limit_trns (ds);
 
       ds->permanent_dict = dict_clone (ds->dict);
+
       trns_chain_finalize (ds->permanent_trns_chain);
       ds->temporary_trns_chain = ds->cur_trns_chain = trns_chain_create ();
     }
@@ -760,9 +807,9 @@ proc_start_temporary_transformations (struct dataset *ds)
    permanent.
    Returns true if anything changed, false otherwise. */
 bool
-proc_make_temporary_transformations_permanent (struct dataset *ds) 
+proc_make_temporary_transformations_permanent (struct dataset *ds)
 {
-  if (proc_in_temporary_transformations (ds)) 
+  if (proc_in_temporary_transformations (ds))
     {
       trns_chain_finalize (ds->temporary_trns_chain);
       trns_chain_splice (ds->permanent_trns_chain, ds->temporary_trns_chain);
@@ -781,12 +828,11 @@ proc_make_temporary_transformations_permanent (struct dataset *ds)
    transformations will be permanent.
    Returns true if anything changed, false otherwise. */
 bool
-proc_cancel_temporary_transformations (struct dataset *ds) 
+proc_cancel_temporary_transformations (struct dataset *ds)
 {
-  if (proc_in_temporary_transformations (ds)) 
+  if (proc_in_temporary_transformations (ds))
     {
-      dict_destroy (ds->dict);
-      ds->dict = ds->permanent_dict;
+      dataset_set_dict (ds, ds->permanent_dict);
       ds->permanent_dict = NULL;
 
       trns_chain_destroy (ds->temporary_trns_chain);
@@ -813,10 +859,16 @@ proc_cancel_all_transformations (struct dataset *ds)
 \f
 /* Initializes procedure handling. */
 struct dataset *
-create_dataset (void)
+create_dataset (struct casefile_factory *fact,
+               replace_source_callback *rps,
+               replace_dictionary_callback *rds
+               )
 {
   struct dataset *ds = xzalloc (sizeof(*ds));
   ds->dict = dict_create ();
+  ds->cf_factory = fact;
+  ds->replace_source = rps;
+  ds->replace_dict = rds;
   proc_cancel_all_transformations (ds);
   return ds;
 }
@@ -827,13 +879,14 @@ destroy_dataset (struct dataset *ds)
 {
   discard_variables (ds);
   dict_destroy (ds->dict);
+  trns_chain_destroy (ds->permanent_trns_chain);
   free (ds);
 }
 
 /* Sets SINK as the destination for procedure output from the
    next procedure. */
 void
-proc_set_sink (struct dataset *ds, struct case_sink *sink) 
+proc_set_sink (struct dataset *ds, struct case_sink *sink)
 {
   assert (ds->proc_sink == NULL);
   ds->proc_sink = sink;
@@ -842,16 +895,18 @@ proc_set_sink (struct dataset *ds, struct case_sink *sink)
 /* Sets SOURCE as the source for procedure input for the next
    procedure. */
 void
-proc_set_source (struct dataset *ds, struct case_source *source) 
+proc_set_source (struct dataset *ds, struct case_source *source)
 {
-  assert (ds->proc_source == NULL);
   ds->proc_source = source;
+
+  if ( ds->replace_source )
+    ds->replace_source (ds->proc_source);
 }
 
 /* Returns true if a source for the next procedure has been
    configured, false otherwise. */
 bool
-proc_has_source (const struct dataset *ds) 
+proc_has_source (const struct dataset *ds)
 {
   return ds->proc_source != NULL;
 }
@@ -861,7 +916,7 @@ proc_has_source (const struct dataset *ds)
    The returned casefile is owned by the caller; it will not be
    automatically used for the next procedure's input. */
 struct casefile *
-proc_capture_output (struct dataset *ds) 
+proc_capture_output (struct dataset *ds)
 {
   struct casefile *casefile;
 
@@ -873,7 +928,7 @@ proc_capture_output (struct dataset *ds)
   assert (!proc_in_temporary_transformations (ds));
 
   casefile = storage_source_decapsulate (ds->proc_source);
-  ds->proc_source = NULL;
+  proc_set_source (ds, NULL);
 
   return casefile;
 }
@@ -884,7 +939,7 @@ static trns_free_func case_limit_trns_free;
 /* Adds a transformation that limits the number of cases that may
    pass through, if DS->DICT has a case limit. */
 static void
-add_case_limit_trns (struct dataset *ds) 
+add_case_limit_trns (struct dataset *ds)
 {
   size_t case_limit = dict_get_case_limit (ds->dict);
   if (case_limit != 0)
@@ -901,10 +956,10 @@ add_case_limit_trns (struct dataset *ds)
    *CASES_REMAINING. */
 static int
 case_limit_trns_proc (void *cases_remaining_,
-                      struct ccase *c UNUSED, casenum_t case_nr UNUSED) 
+                      struct ccase *c UNUSED, casenumber case_nr UNUSED)
 {
   size_t *cases_remaining = cases_remaining_;
-  if (*cases_remaining > 0) 
+  if (*cases_remaining > 0)
     {
       (*cases_remaining)--;
       return TRNS_CONTINUE;
@@ -915,7 +970,7 @@ case_limit_trns_proc (void *cases_remaining_,
 
 /* Frees the data associated with a case limit transformation. */
 static bool
-case_limit_trns_free (void *cases_remaining_) 
+case_limit_trns_free (void *cases_remaining_)
 {
   size_t *cases_remaining = cases_remaining_;
   free (cases_remaining);
@@ -927,10 +982,10 @@ static trns_proc_func filter_trns_proc;
 /* Adds a temporary transformation to filter data according to
    the variable specified on FILTER, if any. */
 static void
-add_filter_trns (struct dataset *ds) 
+add_filter_trns (struct dataset *ds)
 {
   struct variable *filter_var = dict_get_filter (ds->dict);
-  if (filter_var != NULL) 
+  if (filter_var != NULL)
     {
       proc_start_temporary_transformations (ds);
       add_transformation (ds, filter_trns_proc, NULL, filter_var);
@@ -940,12 +995,12 @@ add_filter_trns (struct dataset *ds)
 /* FILTER transformation. */
 static int
 filter_trns_proc (void *filter_var_,
-                  struct ccase *c UNUSED, casenum_t case_nr UNUSED) 
-  
+                  struct ccase *c UNUSED, casenumber case_nr UNUSED)
+
 {
   struct variable *filter_var = filter_var_;
-  double f = case_num (c, filter_var->fv);
-  return (f != 0.0 && !mv_is_num_missing (&filter_var->miss, f)
+  double f = case_num (c, filter_var);
+  return (f != 0.0 && !var_is_num_missing (filter_var, f, MV_ANY)
           ? TRNS_CONTINUE : TRNS_DROP_CASE);
 }
 
@@ -957,22 +1012,38 @@ dataset_dict (const struct dataset *ds)
 }
 
 
-void 
+/* Set or replace dataset DS's dictionary with DICT.
+   The old dictionary is destroyed */
+void
 dataset_set_dict (struct dataset *ds, struct dictionary *dict)
 {
+  struct dictionary *old_dict = ds->dict;
+
+  dict_copy_callbacks (dict, ds->dict);
   ds->dict = dict;
+
+  if ( ds->replace_dict )
+    ds->replace_dict (dict);
+
+  dict_destroy (old_dict);
 }
 
-int 
+int
 dataset_n_lag (const struct dataset *ds)
 {
   return ds->n_lag;
 }
 
-void 
+void
 dataset_set_n_lag (struct dataset *ds, int n_lag)
 {
   ds->n_lag = n_lag;
 }
 
 
+struct casefile_factory *
+dataset_get_casefile_factory (const struct dataset *ds)
+{
+  return ds->cf_factory;
+}
+