Add a deque, implemented as a circular queue, to libpspp.
[pspp-builds.git] / src / data / procedure.c
index baba8fa7cefb20f049b09484d2de162f50b51694..ed69420c3e464bd71221bd3bff5ee0ad3e727037 100644 (file)
@@ -1,5 +1,5 @@
 /* PSPP - computes sample statistics.
 /* PSPP - computes sample statistics.
-   Copyright (C) 1997-9, 2000, 2006 Free Software Foundation, Inc.
+   Copyright (C) 1997-9, 2000, 2006, 2007 Free Software Foundation, Inc.
 
    This program is free software; you can redistribute it and/or
    modify it under the terms of the GNU General Public License as
 
    This program is free software; you can redistribute it and/or
    modify it under the terms of the GNU General Public License as
@@ -26,6 +26,7 @@
 #include <data/case-source.h>
 #include <data/case-sink.h>
 #include <data/case.h>
 #include <data/case-source.h>
 #include <data/case-sink.h>
 #include <data/case.h>
+#include <data/casedeque.h>
 #include <data/casefile.h>
 #include <data/fastfile.h>
 #include <data/dictionary.h>
 #include <data/casefile.h>
 #include <data/fastfile.h>
 #include <data/dictionary.h>
 #include <libpspp/str.h>
 
 struct dataset {
 #include <libpspp/str.h>
 
 struct dataset {
+
+  /* An abstract factory which creates casefiles */
+  struct casefile_factory *cf_factory;
+
+  /* Callback which occurs when a procedure provides a new source for
+     the dataset */
+  replace_source_callback *replace_source ;
+
+  /* Callback which occurs whenever the DICT is replaced by a new one */
+  replace_dictionary_callback *replace_dict;
+
   /* Cases are read from proc_source,
      pass through permanent_trns_chain (which transforms them into
      the format described by permanent_dict),
   /* Cases are read from proc_source,
      pass through permanent_trns_chain (which transforms them into
      the format described by permanent_dict),
@@ -64,11 +76,9 @@ struct dataset {
   /* Time at which proc was last invoked. */
   time_t last_proc_invocation;
 
   /* Time at which proc was last invoked. */
   time_t last_proc_invocation;
 
-  /* Lag queue. */
+  /* Cases just before ("lagging") the current one. */
   int n_lag;                   /* Number of cases to lag. */
   int n_lag;                   /* Number of cases to lag. */
-  int lag_count;               /* Number of cases in lag_queue so far. */
-  int lag_head;                /* Index where next case will be added. */
-  struct ccase *lag_queue; /* Array of n_lag ccase * elements. */
+  struct casedeque lagged_cases; /* Lagged cases. */
 
   /* Procedure data. */
   bool is_open;               /* Procedure open? */
 
   /* Procedure data. */
   bool is_open;               /* Procedure open? */
@@ -89,7 +99,6 @@ static bool internal_procedure (struct dataset *ds, case_func *,
 static void update_last_proc_invocation (struct dataset *ds);
 static void create_trns_case (struct ccase *, struct dictionary *);
 static void open_active_file (struct dataset *ds);
 static void update_last_proc_invocation (struct dataset *ds);
 static void create_trns_case (struct ccase *, struct dictionary *);
 static void open_active_file (struct dataset *ds);
-static void lag_case (struct dataset *ds, const struct ccase *c);
 static void clear_case (const struct dataset *ds, struct ccase *c);
 static bool close_active_file (struct dataset *ds);
 \f
 static void clear_case (const struct dataset *ds, struct ccase *c);
 static bool close_active_file (struct dataset *ds);
 \f
@@ -97,7 +106,7 @@ static bool close_active_file (struct dataset *ds);
 
 /* Returns the last time the data was read. */
 time_t
 
 /* Returns the last time the data was read. */
 time_t
-time_of_last_procedure (struct dataset *ds) 
+time_of_last_procedure (struct dataset *ds)
 {
   if (ds->last_proc_invocation == 0)
     update_last_proc_invocation (ds);
 {
   if (ds->last_proc_invocation == 0)
     update_last_proc_invocation (ds);
@@ -116,10 +125,10 @@ time_of_last_procedure (struct dataset *ds)
       start the next case from step 1.
 
    2. Write case to replacement active file.
       start the next case from step 1.
 
    2. Write case to replacement active file.
-   
+
    3. Execute temporary transformations.  If these drop the case,
       start the next case from step 1.
    3. Execute temporary transformations.  If these drop the case,
       start the next case from step 1.
-      
+
    4. Pass case to PROC_FUNC, passing AUX as auxiliary data.
 
    Returns true if successful, false if an I/O error occurred. */
    4. Pass case to PROC_FUNC, passing AUX as auxiliary data.
 
    Returns true if successful, false if an I/O error occurred. */
@@ -148,17 +157,17 @@ procedure (struct dataset *ds, case_func *cf, void *aux)
 \f
 /* Multipass procedure. */
 
 \f
 /* Multipass procedure. */
 
-struct multipass_aux_data 
+struct multipass_aux_data
   {
     struct casefile *casefile;
   {
     struct casefile *casefile;
-    
+
     bool (*proc_func) (const struct casefile *, void *aux);
     void *aux;
   };
 
 /* Case processing function for multipass_procedure(). */
 static bool
     bool (*proc_func) (const struct casefile *, void *aux);
     void *aux;
   };
 
 /* Case processing function for multipass_procedure(). */
 static bool
-multipass_case_func (const struct ccase *c, void *aux_data_, const struct dataset *ds UNUSED) 
+multipass_case_func (const struct ccase *c, void *aux_data_, const struct dataset *ds UNUSED)
 {
   struct multipass_aux_data *aux_data = aux_data_;
   return casefile_append (aux_data->casefile, c);
 {
   struct multipass_aux_data *aux_data = aux_data_;
   return casefile_append (aux_data->casefile, c);
@@ -166,7 +175,7 @@ multipass_case_func (const struct ccase *c, void *aux_data_, const struct datase
 
 /* End-of-file function for multipass_procedure(). */
 static bool
 
 /* End-of-file function for multipass_procedure(). */
 static bool
-multipass_end_func (void *aux_data_, const struct dataset *ds UNUSED) 
+multipass_end_func (void *aux_data_, const struct dataset *ds UNUSED)
 {
   struct multipass_aux_data *aux_data = aux_data_;
   return (aux_data->proc_func == NULL
 {
   struct multipass_aux_data *aux_data = aux_data_;
   return (aux_data->proc_func == NULL
@@ -177,12 +186,15 @@ multipass_end_func (void *aux_data_, const struct dataset *ds UNUSED)
    The entire active file is passed to PROC_FUNC, with the given
    AUX as auxiliary data, as a unit. */
 bool
    The entire active file is passed to PROC_FUNC, with the given
    AUX as auxiliary data, as a unit. */
 bool
-multipass_procedure (struct dataset *ds, casefile_func *proc_func,  void *aux) 
+multipass_procedure (struct dataset *ds, casefile_func *proc_func,  void *aux)
 {
   struct multipass_aux_data aux_data;
   bool ok;
 
 {
   struct multipass_aux_data aux_data;
   bool ok;
 
-  aux_data.casefile = fastfile_create (dict_get_next_value_idx (ds->dict));
+  aux_data.casefile =
+    ds->cf_factory->create_casefile (ds->cf_factory,
+                                    dict_get_next_value_idx (ds->dict));
+
   aux_data.proc_func = proc_func;
   aux_data.aux = aux;
 
   aux_data.proc_func = proc_func;
   aux_data.aux = aux;
 
@@ -194,6 +206,7 @@ multipass_procedure (struct dataset *ds, casefile_func *proc_func,  void *aux)
   return ok;
 }
 \f
   return ok;
 }
 \f
+
 /* Procedure implementation. */
 
 /* Executes a procedure.
 /* Procedure implementation. */
 
 /* Executes a procedure.
@@ -204,18 +217,25 @@ multipass_procedure (struct dataset *ds, casefile_func *proc_func,  void *aux)
 static bool
 internal_procedure (struct dataset *ds, case_func *proc,
                    end_func *end,
 static bool
 internal_procedure (struct dataset *ds, case_func *proc,
                    end_func *end,
-                    void *aux) 
+                    void *aux)
 {
   struct ccase *c;
   bool ok = true;
 {
   struct ccase *c;
   bool ok = true;
-  
+
   proc_open (ds);
   while (ok && proc_read (ds, &c))
     if (proc != NULL)
       ok = proc (c, aux, ds) && ok;
   if (end != NULL)
     ok = end (aux, ds) && ok;
   proc_open (ds);
   while (ok && proc_read (ds, &c))
     if (proc != NULL)
       ok = proc (c, aux, ds) && ok;
   if (end != NULL)
     ok = end (aux, ds) && ok;
-  return proc_close (ds) && ok;
+
+  if ( proc_close (ds) && ok )
+    {
+
+      return true;
+    }
+
+  return false;
 }
 
 /* Opens dataset DS for reading cases with proc_read.
 }
 
 /* Opens dataset DS for reading cases with proc_read.
@@ -244,13 +264,13 @@ proc_open (struct dataset *ds)
    Return false at end of file or if a read error occurs.  In
    this case a null pointer is stored in *C. */
 bool
    Return false at end of file or if a read error occurs.  In
    this case a null pointer is stored in *C. */
 bool
-proc_read (struct dataset *ds, struct ccase **c) 
+proc_read (struct dataset *ds, struct ccase **c)
 {
   enum trns_result retval = TRNS_DROP_CASE;
 
   assert (ds->is_open);
   *c = NULL;
 {
   enum trns_result retval = TRNS_DROP_CASE;
 
   assert (ds->is_open);
   *c = NULL;
-  for (;;) 
+  for (;;)
     {
       size_t case_nr;
 
     {
       size_t case_nr;
 
@@ -271,16 +291,21 @@ proc_read (struct dataset *ds, struct ccase **c)
                                    &ds->trns_case, &case_nr);
       if (retval != TRNS_CONTINUE)
         continue;
                                    &ds->trns_case, &case_nr);
       if (retval != TRNS_CONTINUE)
         continue;
-  
-      /* Write case to LAG queue. */
-      if (ds->n_lag)
-        lag_case (ds, &ds->trns_case);
+
+      /* Write case to collection of lagged cases. */
+      if (ds->n_lag > 0) 
+        {
+          while (casedeque_count (&ds->lagged_cases) >= ds->n_lag)
+            case_destroy (casedeque_pop_back (&ds->lagged_cases));
+          case_clone (casedeque_push_front (&ds->lagged_cases),
+                      &ds->trns_case);
+        }
 
       /* Write case to replacement active file. */
       ds->cases_written++;
 
       /* Write case to replacement active file. */
       ds->cases_written++;
-      if (ds->proc_sink->class->write != NULL) 
+      if (ds->proc_sink->class->write != NULL)
         {
         {
-          if (ds->compactor != NULL) 
+          if (ds->compactor != NULL)
             {
               dict_compactor_compact (ds->compactor, &ds->sink_case,
                                       &ds->trns_case);
             {
               dict_compactor_compact (ds->compactor, &ds->sink_case,
                                       &ds->trns_case);
@@ -289,9 +314,9 @@ proc_read (struct dataset *ds, struct ccase **c)
           else
             ds->proc_sink->class->write (ds->proc_sink, &ds->trns_case);
         }
           else
             ds->proc_sink->class->write (ds->proc_sink, &ds->trns_case);
         }
-  
+
       /* Execute temporary transformations. */
       /* Execute temporary transformations. */
-      if (ds->temporary_trns_chain != NULL) 
+      if (ds->temporary_trns_chain != NULL)
         {
           retval = trns_chain_execute (ds->temporary_trns_chain, TRNS_CONTINUE,
                                        &ds->trns_case, &ds->cases_written);
         {
           retval = trns_chain_execute (ds->temporary_trns_chain, TRNS_CONTINUE,
                                        &ds->trns_case, &ds->cases_written);
@@ -300,7 +325,7 @@ proc_read (struct dataset *ds, struct ccase **c)
         }
 
       *c = &ds->trns_case;
         }
 
       *c = &ds->trns_case;
-      return true; 
+      return true;
     }
 }
 
     }
 }
 
@@ -310,21 +335,20 @@ proc_read (struct dataset *ds, struct ccase **c)
    If DS has not been opened, returns true without doing
    anything else. */
 bool
    If DS has not been opened, returns true without doing
    anything else. */
 bool
-proc_close (struct dataset *ds) 
+proc_close (struct dataset *ds)
 {
   if (!ds->is_open)
     return true;
 
   /* Drain any remaining cases. */
 {
   if (!ds->is_open)
     return true;
 
   /* Drain any remaining cases. */
-  while (ds->ok) 
+  while (ds->ok)
     {
       struct ccase *c;
       if (!proc_read (ds, &c))
     {
       struct ccase *c;
       if (!proc_read (ds, &c))
-        break; 
+        break;
     }
     }
-  
   ds->ok = free_case_source (ds->proc_source) && ds->ok;
   ds->ok = free_case_source (ds->proc_source) && ds->ok;
-  ds->proc_source = NULL;
+  proc_set_source (ds, NULL);
 
   case_destroy (&ds->sink_case);
   case_destroy (&ds->trns_case);
 
   case_destroy (&ds->sink_case);
   case_destroy (&ds->trns_case);
@@ -337,7 +361,7 @@ proc_close (struct dataset *ds)
 
 /* Updates last_proc_invocation. */
 static void
 
 /* Updates last_proc_invocation. */
 static void
-update_last_proc_invocation (struct dataset *ds) 
+update_last_proc_invocation (struct dataset *ds)
 {
   ds->last_proc_invocation = time (NULL);
 }
 {
   ds->last_proc_invocation = time (NULL);
 }
@@ -352,7 +376,7 @@ create_trns_case (struct ccase *trns_case, struct dictionary *dict)
   size_t i;
 
   case_create (trns_case, dict_get_next_value_idx (dict));
   size_t i;
 
   case_create (trns_case, dict_get_next_value_idx (dict));
-  for (i = 0; i < var_cnt; i++) 
+  for (i = 0; i < var_cnt; i++)
     {
       struct variable *v = dict_get_var (dict, i);
       union value *value = case_data_rw (trns_case, v);
     {
       struct variable *v = dict_get_var (dict, i);
       union value *value = case_data_rw (trns_case, v);
@@ -381,40 +405,22 @@ open_active_file (struct dataset *ds)
     ds->permanent_dict = ds->dict;
 
   /* Figure out whether to compact. */
     ds->permanent_dict = ds->dict;
 
   /* Figure out whether to compact. */
-  ds->compactor = 
+  ds->compactor =
     (dict_compacting_would_shrink (ds->permanent_dict)
      ? dict_make_compactor (ds->permanent_dict)
      : NULL);
 
   /* Prepare sink. */
   if (ds->proc_sink == NULL)
     (dict_compacting_would_shrink (ds->permanent_dict)
      ? dict_make_compactor (ds->permanent_dict)
      : NULL);
 
   /* Prepare sink. */
   if (ds->proc_sink == NULL)
-    ds->proc_sink = create_case_sink (&storage_sink_class, ds->permanent_dict, NULL);
+    ds->proc_sink = create_case_sink (&storage_sink_class,
+                                     ds->permanent_dict,
+                                     ds->cf_factory,
+                                     NULL);
   if (ds->proc_sink->class->open != NULL)
     ds->proc_sink->class->open (ds->proc_sink);
 
   if (ds->proc_sink->class->open != NULL)
     ds->proc_sink->class->open (ds->proc_sink);
 
-  /* Allocate memory for lag queue. */
-  if (ds->n_lag > 0)
-    {
-      int i;
-  
-      ds->lag_count = 0;
-      ds->lag_head = 0;
-      ds->lag_queue = xnmalloc (ds->n_lag, sizeof *ds->lag_queue);
-      for (i = 0; i < ds->n_lag; i++)
-        case_nullify (&ds->lag_queue[i]);
-    }
-}
-
-/* Add C to the lag queue. */
-static void
-lag_case (struct dataset *ds, const struct ccase *c)
-{
-  if (ds->lag_count < ds->n_lag)
-    ds->lag_count++;
-  case_destroy (&ds->lag_queue[ds->lag_head]);
-  case_clone (&ds->lag_queue[ds->lag_head], c);
-  if (++ds->lag_head >= ds->n_lag)
-    ds->lag_head = 0;
+  /* Allocate memory for lagged cases. */
+  casedeque_init (&ds->lagged_cases, ds->n_lag);
 }
 
 /* Clears the variables in C that need to be cleared between
 }
 
 /* Clears the variables in C that need to be cleared between
@@ -424,17 +430,17 @@ clear_case (const struct dataset *ds, struct ccase *c)
 {
   size_t var_cnt = dict_get_var_cnt (ds->dict);
   size_t i;
 {
   size_t var_cnt = dict_get_var_cnt (ds->dict);
   size_t i;
-  
-  for (i = 0; i < var_cnt; i++) 
+
+  for (i = 0; i < var_cnt; i++)
     {
       struct variable *v = dict_get_var (ds->dict, i);
     {
       struct variable *v = dict_get_var (ds->dict, i);
-      if (!var_get_leave (v)) 
+      if (!var_get_leave (v))
         {
           if (var_is_numeric (v))
         {
           if (var_is_numeric (v))
-            case_data_rw (c, v)->f = SYSMIS; 
+            case_data_rw (c, v)->f = SYSMIS;
           else
             memset (case_data_rw (c, v)->s, ' ', var_get_width (v));
           else
             memset (case_data_rw (c, v)->s, ' ', var_get_width (v));
-        } 
+        }
     }
 }
 
     }
 }
 
@@ -442,31 +448,25 @@ clear_case (const struct dataset *ds, struct ccase *c)
 static bool
 close_active_file (struct dataset *ds)
 {
 static bool
 close_active_file (struct dataset *ds)
 {
-  /* Free memory for lag queue, and turn off lagging. */
-  if (ds->n_lag > 0)
-    {
-      int i;
-      
-      for (i = 0; i < ds->n_lag; i++)
-       case_destroy (&ds->lag_queue[i]);
-      free (ds->lag_queue);
-      ds->n_lag = 0;
-    }
-  
+  /* Free memory for lagged cases. */
+  while (!casedeque_is_empty (&ds->lagged_cases))
+    case_destroy (casedeque_pop_back (&ds->lagged_cases));
+  casedeque_destroy (&ds->lagged_cases);
+
   /* Dictionary from before TEMPORARY becomes permanent. */
   proc_cancel_temporary_transformations (ds);
 
   /* Finish compacting. */
   /* Dictionary from before TEMPORARY becomes permanent. */
   proc_cancel_temporary_transformations (ds);
 
   /* Finish compacting. */
-  if (ds->compactor != NULL) 
+  if (ds->compactor != NULL)
     {
       dict_compactor_destroy (ds->compactor);
       dict_compact_values (ds->dict);
       ds->compactor = NULL;
     }
     {
       dict_compactor_destroy (ds->compactor);
       dict_compact_values (ds->dict);
       ds->compactor = NULL;
     }
-    
+
   /* Old data sink becomes new data source. */
   if (ds->proc_sink->class->make_source != NULL)
   /* Old data sink becomes new data source. */
   if (ds->proc_sink->class->make_source != NULL)
-    ds->proc_source = ds->proc_sink->class->make_source (ds->proc_sink);
+    proc_set_source (ds, ds->proc_sink->class->make_source (ds->proc_sink) );
   free_case_sink (ds->proc_sink);
   ds->proc_sink = NULL;
 
   free_case_sink (ds->proc_sink);
   ds->proc_sink = NULL;
 
@@ -480,16 +480,11 @@ close_active_file (struct dataset *ds)
 struct ccase *
 lagged_case (const struct dataset *ds, int n_before)
 {
 struct ccase *
 lagged_case (const struct dataset *ds, int n_before)
 {
-  assert (n_before >= 1 );
+  assert (n_before >= 1);
   assert (n_before <= ds->n_lag);
 
   assert (n_before <= ds->n_lag);
 
-  if (n_before <= ds->lag_count)
-    {
-      int index = ds->lag_head - n_before;
-      if (index < 0)
-        index += ds->n_lag;
-      return &ds->lag_queue[index];
-    }
+  if (n_before <= casedeque_count (&ds->lagged_cases))
+    return casedeque_front (&ds->lagged_cases, n_before - 1);
   else
     return NULL;
 }
   else
     return NULL;
 }
@@ -497,13 +492,13 @@ lagged_case (const struct dataset *ds, int n_before)
 /* Procedure that separates the data into SPLIT FILE groups. */
 
 /* Represents auxiliary data for handling SPLIT FILE. */
 /* Procedure that separates the data into SPLIT FILE groups. */
 
 /* Represents auxiliary data for handling SPLIT FILE. */
-struct split_aux_data 
+struct split_aux_data
   {
     struct dataset *dataset;    /* The dataset */
     struct ccase prev_case;     /* Data in previous case. */
 
     /* Callback functions. */
   {
     struct dataset *dataset;    /* The dataset */
     struct ccase prev_case;     /* Data in previous case. */
 
     /* Callback functions. */
-    begin_func *begin; 
+    begin_func *begin;
     case_func *proc;
     end_func *end;
     void *func_aux;
     case_func *proc;
     end_func *end;
     void *func_aux;
@@ -523,19 +518,19 @@ static bool split_procedure_end_func (void *, const struct dataset *);
    passed to each of the functions as auxiliary data.
 
    If the active file is empty, none of BEGIN_FUNC, PROC_FUNC,
    passed to each of the functions as auxiliary data.
 
    If the active file is empty, none of BEGIN_FUNC, PROC_FUNC,
-   and END_FUNC will be called at all. 
+   and END_FUNC will be called at all.
 
    If SPLIT FILE is not in effect, then there is one break group
    (if the active file is nonempty), and BEGIN_FUNC and END_FUNC
    will be called once.
 
    If SPLIT FILE is not in effect, then there is one break group
    (if the active file is nonempty), and BEGIN_FUNC and END_FUNC
    will be called once.
-   
+
    Returns true if successful, false if an I/O error occurred. */
 bool
 procedure_with_splits (struct dataset *ds,
    Returns true if successful, false if an I/O error occurred. */
 bool
 procedure_with_splits (struct dataset *ds,
-                      begin_func begin, 
+                      begin_func begin,
                       case_func *proc,
                        end_func *end,
                       case_func *proc,
                        end_func *end,
-                       void *func_aux) 
+                       void *func_aux)
 {
   struct split_aux_data split_aux;
   bool ok;
 {
   struct split_aux_data split_aux;
   bool ok;
@@ -557,7 +552,7 @@ procedure_with_splits (struct dataset *ds,
 
 /* Case callback used by procedure_with_splits(). */
 static bool
 
 /* Case callback used by procedure_with_splits(). */
 static bool
-split_procedure_case_func (const struct ccase *c, void *split_aux_, const struct dataset *ds) 
+split_procedure_case_func (const struct ccase *c, void *split_aux_, const struct dataset *ds)
 {
   struct split_aux_data *split_aux = split_aux_;
 
 {
   struct split_aux_data *split_aux = split_aux_;
 
@@ -581,7 +576,7 @@ split_procedure_case_func (const struct ccase *c, void *split_aux_, const struct
 
 /* End-of-file callback used by procedure_with_splits(). */
 static bool
 
 /* End-of-file callback used by procedure_with_splits(). */
 static bool
-split_procedure_end_func (void *split_aux_, const struct dataset *ds) 
+split_procedure_end_func (void *split_aux_, const struct dataset *ds)
 {
   struct split_aux_data *split_aux = split_aux_;
 
 {
   struct split_aux_data *split_aux = split_aux_;
 
@@ -593,8 +588,8 @@ split_procedure_end_func (void *split_aux_, const struct dataset *ds)
 /* Compares the SPLIT FILE variables in cases A and B and returns
    nonzero only if they differ. */
 static int
 /* Compares the SPLIT FILE variables in cases A and B and returns
    nonzero only if they differ. */
 static int
-equal_splits (const struct ccase *a, const struct ccase *b, 
-             const struct dataset *ds) 
+equal_splits (const struct ccase *a, const struct ccase *b,
+             const struct dataset *ds)
 {
   return case_compare (a, b,
                        dict_get_split_vars (ds->dict),
 {
   return case_compare (a, b,
                        dict_get_split_vars (ds->dict),
@@ -606,14 +601,14 @@ equal_splits (const struct ccase *a, const struct ccase *b,
 
 /* Represents auxiliary data for handling SPLIT FILE in a
    multipass procedure. */
 
 /* Represents auxiliary data for handling SPLIT FILE in a
    multipass procedure. */
-struct multipass_split_aux_data 
+struct multipass_split_aux_data
   {
     struct dataset *dataset;    /* The dataset of the split */
     struct ccase prev_case;     /* Data in previous case. */
     struct casefile *casefile;  /* Accumulates data for a split. */
   {
     struct dataset *dataset;    /* The dataset of the split */
     struct ccase prev_case;     /* Data in previous case. */
     struct casefile *casefile;  /* Accumulates data for a split. */
-    split_func *split;          /* Function to call with the accumulated 
+    split_func *split;          /* Function to call with the accumulated
                                   data. */
                                   data. */
-    void *func_aux;             /* Auxiliary data. */ 
+    void *func_aux;             /* Auxiliary data. */
   };
 
 static bool multipass_split_case_func (const struct ccase *c, void *aux_, const struct dataset *);
   };
 
 static bool multipass_split_case_func (const struct ccase *c, void *aux_, const struct dataset *);
@@ -622,7 +617,7 @@ static bool multipass_split_output (struct multipass_split_aux_data *, const str
 
 /* Returns true if successful, false if an I/O error occurred. */
 bool
 
 /* Returns true if successful, false if an I/O error occurred. */
 bool
-multipass_procedure_with_splits (struct dataset *ds, 
+multipass_procedure_with_splits (struct dataset *ds,
                                 split_func  *split,
                                  void *func_aux)
 {
                                 split_func  *split,
                                  void *func_aux)
 {
@@ -661,8 +656,9 @@ multipass_split_case_func (const struct ccase *c, void *aux_, const struct datas
         ok = multipass_split_output (aux, ds);
 
       /* Start a new casefile. */
         ok = multipass_split_output (aux, ds);
 
       /* Start a new casefile. */
-      aux->casefile = 
-       fastfile_create (dict_get_next_value_idx (ds->dict));
+      aux->casefile =
+       ds->cf_factory->create_casefile (ds->cf_factory,
+                                        dict_get_next_value_idx (ds->dict));
     }
 
   return casefile_append (aux->casefile, c) && ok;
     }
 
   return casefile_append (aux->casefile, c) && ok;
@@ -680,7 +676,7 @@ static bool
 multipass_split_output (struct multipass_split_aux_data *aux, const struct dataset *ds)
 {
   bool ok;
 multipass_split_output (struct multipass_split_aux_data *aux, const struct dataset *ds)
 {
   bool ok;
-  
+
   assert (aux->casefile != NULL);
   ok = aux->split (&aux->prev_case, aux->casefile, aux->func_aux, ds);
   casefile_destroy (aux->casefile);
   assert (aux->casefile != NULL);
   ok = aux->split (&aux->prev_case, aux->casefile, aux->func_aux, ds);
   casefile_destroy (aux->casefile);
@@ -698,9 +694,9 @@ discard_variables (struct dataset *ds)
   fh_set_default_handle (NULL);
 
   ds->n_lag = 0;
   fh_set_default_handle (NULL);
 
   ds->n_lag = 0;
-  
+
   free_case_source (ds->proc_source);
   free_case_source (ds->proc_source);
-  ds->proc_source = NULL;
+  proc_set_source (ds, NULL);
 
   proc_cancel_all_transformations (ds);
 }
 
   proc_cancel_all_transformations (ds);
 }
@@ -709,10 +705,10 @@ discard_variables (struct dataset *ds)
    and clears the permanent transformations.
    For use by INPUT PROGRAM. */
 struct trns_chain *
    and clears the permanent transformations.
    For use by INPUT PROGRAM. */
 struct trns_chain *
-proc_capture_transformations (struct dataset *ds) 
+proc_capture_transformations (struct dataset *ds)
 {
   struct trns_chain *chain;
 {
   struct trns_chain *chain;
-  
+
   assert (ds->temporary_trns_chain == NULL);
   chain = ds->permanent_trns_chain;
   ds->cur_trns_chain = ds->permanent_trns_chain = trns_chain_create ();
   assert (ds->temporary_trns_chain == NULL);
   chain = ds->permanent_trns_chain;
   ds->cur_trns_chain = ds->permanent_trns_chain = trns_chain_create ();
@@ -734,7 +730,7 @@ add_transformation (struct dataset *ds, trns_proc_func *proc, trns_free_func *fr
    FINALIZE will be called.
    The functions are passed AUX as auxiliary data. */
 void
    FINALIZE will be called.
    The functions are passed AUX as auxiliary data. */
 void
-add_transformation_with_finalizer (struct dataset *ds, 
+add_transformation_with_finalizer (struct dataset *ds,
                                   trns_finalize_func *finalize,
                                    trns_proc_func *proc,
                                    trns_free_func *free, void *aux)
                                   trns_finalize_func *finalize,
                                    trns_proc_func *proc,
                                    trns_free_func *free, void *aux)
@@ -746,7 +742,7 @@ add_transformation_with_finalizer (struct dataset *ds,
    This value can be returned by a transformation procedure
    function to indicate a "jump" to that transformation. */
 size_t
    This value can be returned by a transformation procedure
    function to indicate a "jump" to that transformation. */
 size_t
-next_transformation (const struct dataset *ds) 
+next_transformation (const struct dataset *ds)
 {
   return trns_chain_next (ds->cur_trns_chain);
 }
 {
   return trns_chain_next (ds->cur_trns_chain);
 }
@@ -755,7 +751,7 @@ next_transformation (const struct dataset *ds)
    a temporary transformation, false if it will add a permanent
    transformation. */
 bool
    a temporary transformation, false if it will add a permanent
    transformation. */
 bool
-proc_in_temporary_transformations (const struct dataset *ds) 
+proc_in_temporary_transformations (const struct dataset *ds)
 {
   return ds->temporary_trns_chain != NULL;
 }
 {
   return ds->temporary_trns_chain != NULL;
 }
@@ -764,13 +760,14 @@ proc_in_temporary_transformations (const struct dataset *ds)
    Further calls to add_transformation() will add temporary
    transformations. */
 void
    Further calls to add_transformation() will add temporary
    transformations. */
 void
-proc_start_temporary_transformations (struct dataset *ds) 
+proc_start_temporary_transformations (struct dataset *ds)
 {
   if (!proc_in_temporary_transformations (ds))
     {
       add_case_limit_trns (ds);
 
       ds->permanent_dict = dict_clone (ds->dict);
 {
   if (!proc_in_temporary_transformations (ds))
     {
       add_case_limit_trns (ds);
 
       ds->permanent_dict = dict_clone (ds->dict);
+
       trns_chain_finalize (ds->permanent_trns_chain);
       ds->temporary_trns_chain = ds->cur_trns_chain = trns_chain_create ();
     }
       trns_chain_finalize (ds->permanent_trns_chain);
       ds->temporary_trns_chain = ds->cur_trns_chain = trns_chain_create ();
     }
@@ -781,9 +778,9 @@ proc_start_temporary_transformations (struct dataset *ds)
    permanent.
    Returns true if anything changed, false otherwise. */
 bool
    permanent.
    Returns true if anything changed, false otherwise. */
 bool
-proc_make_temporary_transformations_permanent (struct dataset *ds) 
+proc_make_temporary_transformations_permanent (struct dataset *ds)
 {
 {
-  if (proc_in_temporary_transformations (ds)) 
+  if (proc_in_temporary_transformations (ds))
     {
       trns_chain_finalize (ds->temporary_trns_chain);
       trns_chain_splice (ds->permanent_trns_chain, ds->temporary_trns_chain);
     {
       trns_chain_finalize (ds->temporary_trns_chain);
       trns_chain_splice (ds->permanent_trns_chain, ds->temporary_trns_chain);
@@ -802,12 +799,11 @@ proc_make_temporary_transformations_permanent (struct dataset *ds)
    transformations will be permanent.
    Returns true if anything changed, false otherwise. */
 bool
    transformations will be permanent.
    Returns true if anything changed, false otherwise. */
 bool
-proc_cancel_temporary_transformations (struct dataset *ds) 
+proc_cancel_temporary_transformations (struct dataset *ds)
 {
 {
-  if (proc_in_temporary_transformations (ds)) 
+  if (proc_in_temporary_transformations (ds))
     {
     {
-      dict_destroy (ds->dict);
-      ds->dict = ds->permanent_dict;
+      dataset_set_dict (ds, ds->permanent_dict);
       ds->permanent_dict = NULL;
 
       trns_chain_destroy (ds->temporary_trns_chain);
       ds->permanent_dict = NULL;
 
       trns_chain_destroy (ds->temporary_trns_chain);
@@ -834,10 +830,16 @@ proc_cancel_all_transformations (struct dataset *ds)
 \f
 /* Initializes procedure handling. */
 struct dataset *
 \f
 /* Initializes procedure handling. */
 struct dataset *
-create_dataset (void)
+create_dataset (struct casefile_factory *fact,
+               replace_source_callback *rps,
+               replace_dictionary_callback *rds
+               )
 {
   struct dataset *ds = xzalloc (sizeof(*ds));
   ds->dict = dict_create ();
 {
   struct dataset *ds = xzalloc (sizeof(*ds));
   ds->dict = dict_create ();
+  ds->cf_factory = fact;
+  ds->replace_source = rps;
+  ds->replace_dict = rds;
   proc_cancel_all_transformations (ds);
   return ds;
 }
   proc_cancel_all_transformations (ds);
   return ds;
 }
@@ -855,7 +857,7 @@ destroy_dataset (struct dataset *ds)
 /* Sets SINK as the destination for procedure output from the
    next procedure. */
 void
 /* Sets SINK as the destination for procedure output from the
    next procedure. */
 void
-proc_set_sink (struct dataset *ds, struct case_sink *sink) 
+proc_set_sink (struct dataset *ds, struct case_sink *sink)
 {
   assert (ds->proc_sink == NULL);
   ds->proc_sink = sink;
 {
   assert (ds->proc_sink == NULL);
   ds->proc_sink = sink;
@@ -864,16 +866,18 @@ proc_set_sink (struct dataset *ds, struct case_sink *sink)
 /* Sets SOURCE as the source for procedure input for the next
    procedure. */
 void
 /* Sets SOURCE as the source for procedure input for the next
    procedure. */
 void
-proc_set_source (struct dataset *ds, struct case_source *source) 
+proc_set_source (struct dataset *ds, struct case_source *source)
 {
 {
-  assert (ds->proc_source == NULL);
   ds->proc_source = source;
   ds->proc_source = source;
+
+  if ( ds->replace_source )
+    ds->replace_source (ds->proc_source);
 }
 
 /* Returns true if a source for the next procedure has been
    configured, false otherwise. */
 bool
 }
 
 /* Returns true if a source for the next procedure has been
    configured, false otherwise. */
 bool
-proc_has_source (const struct dataset *ds) 
+proc_has_source (const struct dataset *ds)
 {
   return ds->proc_source != NULL;
 }
 {
   return ds->proc_source != NULL;
 }
@@ -883,7 +887,7 @@ proc_has_source (const struct dataset *ds)
    The returned casefile is owned by the caller; it will not be
    automatically used for the next procedure's input. */
 struct casefile *
    The returned casefile is owned by the caller; it will not be
    automatically used for the next procedure's input. */
 struct casefile *
-proc_capture_output (struct dataset *ds) 
+proc_capture_output (struct dataset *ds)
 {
   struct casefile *casefile;
 
 {
   struct casefile *casefile;
 
@@ -895,7 +899,7 @@ proc_capture_output (struct dataset *ds)
   assert (!proc_in_temporary_transformations (ds));
 
   casefile = storage_source_decapsulate (ds->proc_source);
   assert (!proc_in_temporary_transformations (ds));
 
   casefile = storage_source_decapsulate (ds->proc_source);
-  ds->proc_source = NULL;
+  proc_set_source (ds, NULL);
 
   return casefile;
 }
 
   return casefile;
 }
@@ -906,7 +910,7 @@ static trns_free_func case_limit_trns_free;
 /* Adds a transformation that limits the number of cases that may
    pass through, if DS->DICT has a case limit. */
 static void
 /* Adds a transformation that limits the number of cases that may
    pass through, if DS->DICT has a case limit. */
 static void
-add_case_limit_trns (struct dataset *ds) 
+add_case_limit_trns (struct dataset *ds)
 {
   size_t case_limit = dict_get_case_limit (ds->dict);
   if (case_limit != 0)
 {
   size_t case_limit = dict_get_case_limit (ds->dict);
   if (case_limit != 0)
@@ -923,10 +927,10 @@ add_case_limit_trns (struct dataset *ds)
    *CASES_REMAINING. */
 static int
 case_limit_trns_proc (void *cases_remaining_,
    *CASES_REMAINING. */
 static int
 case_limit_trns_proc (void *cases_remaining_,
-                      struct ccase *c UNUSED, casenumber case_nr UNUSED) 
+                      struct ccase *c UNUSED, casenumber case_nr UNUSED)
 {
   size_t *cases_remaining = cases_remaining_;
 {
   size_t *cases_remaining = cases_remaining_;
-  if (*cases_remaining > 0) 
+  if (*cases_remaining > 0)
     {
       (*cases_remaining)--;
       return TRNS_CONTINUE;
     {
       (*cases_remaining)--;
       return TRNS_CONTINUE;
@@ -937,7 +941,7 @@ case_limit_trns_proc (void *cases_remaining_,
 
 /* Frees the data associated with a case limit transformation. */
 static bool
 
 /* Frees the data associated with a case limit transformation. */
 static bool
-case_limit_trns_free (void *cases_remaining_) 
+case_limit_trns_free (void *cases_remaining_)
 {
   size_t *cases_remaining = cases_remaining_;
   free (cases_remaining);
 {
   size_t *cases_remaining = cases_remaining_;
   free (cases_remaining);
@@ -949,10 +953,10 @@ static trns_proc_func filter_trns_proc;
 /* Adds a temporary transformation to filter data according to
    the variable specified on FILTER, if any. */
 static void
 /* Adds a temporary transformation to filter data according to
    the variable specified on FILTER, if any. */
 static void
-add_filter_trns (struct dataset *ds) 
+add_filter_trns (struct dataset *ds)
 {
   struct variable *filter_var = dict_get_filter (ds->dict);
 {
   struct variable *filter_var = dict_get_filter (ds->dict);
-  if (filter_var != NULL) 
+  if (filter_var != NULL)
     {
       proc_start_temporary_transformations (ds);
       add_transformation (ds, filter_trns_proc, NULL, filter_var);
     {
       proc_start_temporary_transformations (ds);
       add_transformation (ds, filter_trns_proc, NULL, filter_var);
@@ -962,12 +966,12 @@ add_filter_trns (struct dataset *ds)
 /* FILTER transformation. */
 static int
 filter_trns_proc (void *filter_var_,
 /* FILTER transformation. */
 static int
 filter_trns_proc (void *filter_var_,
-                  struct ccase *c UNUSED, casenumber case_nr UNUSED) 
-  
+                  struct ccase *c UNUSED, casenumber case_nr UNUSED)
+
 {
   struct variable *filter_var = filter_var_;
   double f = case_num (c, filter_var);
 {
   struct variable *filter_var = filter_var_;
   double f = case_num (c, filter_var);
-  return (f != 0.0 && !var_is_num_missing (filter_var, f)
+  return (f != 0.0 && !var_is_num_missing (filter_var, f, MV_ANY)
           ? TRNS_CONTINUE : TRNS_DROP_CASE);
 }
 
           ? TRNS_CONTINUE : TRNS_DROP_CASE);
 }
 
@@ -979,22 +983,31 @@ dataset_dict (const struct dataset *ds)
 }
 
 
 }
 
 
-void 
+/* Set or replace dataset DS's dictionary with DICT.
+   The old dictionary is destroyed */
+void
 dataset_set_dict (struct dataset *ds, struct dictionary *dict)
 {
 dataset_set_dict (struct dataset *ds, struct dictionary *dict)
 {
+  struct dictionary *old_dict = ds->dict;
+
+  dict_copy_callbacks (dict, ds->dict);
   ds->dict = dict;
   ds->dict = dict;
-}
 
 
-int 
-dataset_n_lag (const struct dataset *ds)
-{
-  return ds->n_lag;
+  if ( ds->replace_dict )
+    ds->replace_dict (dict);
+
+  dict_destroy (old_dict);
 }
 
 void 
 }
 
 void 
-dataset_set_n_lag (struct dataset *ds, int n_lag)
+dataset_need_lag (struct dataset *ds, int n_before)
 {
 {
-  ds->n_lag = n_lag;
+  ds->n_lag = MAX (ds->n_lag, n_before);
 }
 
 }
 
+struct casefile_factory *
+dataset_get_casefile_factory (const struct dataset *ds)
+{
+  return ds->cf_factory;
+}