Adopt use of gnulib for portability.
[pspp-builds.git] / src / vfm.c
index fb8c5febb4b85669b3b73f01ca1f25be6fe34a78..0414234c01d2be666208fb4c7db1e67c5b315974 100644 (file)
--- a/src/vfm.c
+++ b/src/vfm.c
 
    You should have received a copy of the GNU General Public License
    along with this program; if not, write to the Free Software
-   Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA
-   02111-1307, USA. */
+   Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+   02110-1301, USA. */
 
 #include <config.h>
 #include "vfm.h"
 #include "vfmP.h"
-#include <assert.h>
+#include "error.h"
 #include <errno.h>
 #include <stdio.h>
 #include <stdlib.h>
 #include <unistd.h>    /* Required by SunOS4. */
 #endif
 #include "alloc.h"
-#include "approx.h"
+#include "case.h"
+#include "casefile.h"
+#include "dictionary.h"
 #include "do-ifP.h"
 #include "error.h"
-#include "expr.h"
+#include "expressions/public.h"
 #include "misc.h"
-#include "random.h"
+#include "settings.h"
 #include "som.h"
 #include "str.h"
 #include "tab.h"
 #include "var.h"
 #include "value-labels.h"
 
+#include "gettext.h"
+#define _(msgid) gettext (msgid)
+
 /*
    Virtual File Manager (vfm):
 
-   vfm is used to process data files.  It uses the model that data is
-   read from one stream (the data source), then written to another
-   (the data sink).  The data source is then deleted and the data sink
-   becomes the data source for the next procedure. */
-
-#include "debug-print.h"
+   vfm is used to process data files.  It uses the model that
+   data is read from one stream (the data source), processed,
+   then written to another (the data sink).  The data source is
+   then deleted and the data sink becomes the data source for the
+   next procedure. */
 
 /* Procedure execution data. */
 struct write_case_data
   {
-    void (*beginfunc) (void *);
-    int (*procfunc) (struct ccase *, void *);
-    void (*endfunc) (void *);
-    void *aux;
+    /* Function to call for each case. */
+    int (*proc_func) (struct ccase *, void *); /* Function. */
+    void *aux;                                 /* Auxiliary data. */ 
+
+    struct ccase trns_case;     /* Case used for transformations. */
+    struct ccase sink_case;     /* Case written to sink, if
+                                   compaction is necessary. */
+    size_t cases_written;       /* Cases output so far. */
+    size_t cases_analyzed;      /* Cases passed to procedure so far. */
   };
 
-/* This is used to read from the active file. */
-struct case_stream *vfm_source;
-
-/* This is used to write to the replacement active file. */
-struct case_stream *vfm_sink;
+/* The current active file, from which cases are read. */
+struct case_source *vfm_source;
 
-/* Information about the data source. */
-struct stream_info vfm_source_info;
-
-/* Information about the data sink. */
-struct stream_info vfm_sink_info;
+/* The replacement active file, to which cases are written. */
+struct case_sink *vfm_sink;
 
 /* Nonzero if the case needs to have values deleted before being
    stored, zero otherwise. */
-int compaction_necessary;
-
-/* Number of values after compaction, or the same as
-   vfm_sink_info.nval, if compaction is not necessary. */
-int compaction_nval;
-
-/* Temporary case buffer with enough room for `compaction_nval'
-   `value's. */
-struct ccase *compaction_case;
-
-/* Within a session, when paging is turned on, it is never turned back
-   off.  This policy might be too aggressive. */
-static int paging = 0;
+static int compaction_necessary;
 
 /* Time at which vfm was last invoked. */
 time_t last_vfm_invocation;
 
-/* Number of cases passed to proc_func(). */
-static int case_count;
-
 /* Lag queue. */
 int n_lag;                     /* Number of cases to lag. */
 static int lag_count;          /* Number of cases in lag_queue so far. */
 static int lag_head;           /* Index where next case will be added. */
-static struct ccase **lag_queue; /* Array of n_lag ccase * elements. */
+static struct ccase *lag_queue; /* Array of n_lag ccase * elements. */
 
+static void internal_procedure (int (*proc_func) (struct ccase *, void *),
+                                void *aux);
+static void create_trns_case (struct ccase *, struct dictionary *);
 static void open_active_file (void);
-static void close_active_file (struct write_case_data *);
-static int SPLIT_FILE_procfunc (struct ccase *, void *);
-static void finish_compaction (void);
-static void lag_case (void);
-static int procedure_write_case (struct write_case_data *);
-static void clear_temp_case (void);
-static int exclude_this_case (void);
+static int write_case (struct write_case_data *wc_data);
+static int execute_transformations (struct ccase *c,
+                                    struct trns_header **trns,
+                                    int first_idx, int last_idx,
+                                    int case_num);
+static int filter_case (const struct ccase *c, int case_num);
+static void lag_case (const struct ccase *c);
+static void clear_case (struct ccase *c);
+static void close_active_file (void);
 \f
 /* Public functions. */
 
-/* Reads all the cases from the active file, transforms them by
-   the active set of transformations, calls PROCFUNC with CURCASE
-   set to the case, and writes them to a new active file.
+/* Reads the data from the input program and writes it to a new
+   active file.  For each case we read from the input program, we
+   do the following
 
-   Divides the active file into zero or more series of one or more
-   cases each.  BEGINFUNC is called before each series.  ENDFUNC is
-   called after each series.
+   1. Execute permanent transformations.  If these drop the case,
+      start the next case from step 1.
 
-   Arbitrary user-specified data AUX is passed to BEGINFUNC,
-   PROCFUNC, and ENDFUNC as auxiliary data. */
+   2. N OF CASES.  If we have already written N cases, start the
+      next case from step 1.
+   
+   3. Write case to replacement active file.
+   
+   4. Execute temporary transformations.  If these drop the case,
+      start the next case from step 1.
+      
+   5. FILTER, PROCESS IF.  If these drop the case, start the next
+      case from step 1.
+   
+   6. Post-TEMPORARY N OF CASES.  If we have already analyzed N
+      cases, start the next case from step 1.
+      
+   7. Pass case to PROC_FUNC, passing AUX as auxiliary data. */
 void
-procedure (void (*beginfunc) (void *),
-          int (*procfunc) (struct ccase *curcase, void *),
-          void (*endfunc) (void *),
-           void *aux)
+procedure (int (*proc_func) (struct ccase *, void *), void *aux)
+{
+  if (proc_func == NULL
+      && case_source_is_class (vfm_source, &storage_source_class)
+      && vfm_sink == NULL
+      && !temporary
+      && n_trns == 0)
+    {
+      /* Nothing to do. */
+      return;
+    }
+
+  open_active_file ();
+  internal_procedure (proc_func, aux);
+  close_active_file ();
+}
+
+/* Executes a procedure, as procedure(), except that the caller
+   is responsible for calling open_active_file() and
+   close_active_file(). */
+static void
+internal_procedure (int (*proc_func) (struct ccase *, void *), void *aux) 
 {
   static int recursive_call;
 
-  struct write_case_data procedure_write_data;
-  struct write_case_data split_file_data;
+  struct write_case_data wc_data;
 
   assert (++recursive_call == 1);
 
-  if (dict_get_split_cnt (default_dict) == 0) 
-    {
-      /* Normally we just use the data passed by the user. */
-      procedure_write_data.beginfunc = beginfunc;
-      procedure_write_data.procfunc = procfunc;
-      procedure_write_data.endfunc = endfunc;
-      procedure_write_data.aux = aux;
-    }
-  else
-    {
-      /* Under SPLIT FILE, we add a layer of indirection. */
-      procedure_write_data.beginfunc = NULL;
-      procedure_write_data.procfunc = SPLIT_FILE_procfunc;
-      procedure_write_data.endfunc = endfunc;
-      procedure_write_data.aux = &split_file_data;
-
-      split_file_data.beginfunc = beginfunc;
-      split_file_data.procfunc = procfunc;
-      split_file_data.endfunc = endfunc;
-      split_file_data.aux = aux;
-    }
+  wc_data.proc_func = proc_func;
+  wc_data.aux = aux;
+  create_trns_case (&wc_data.trns_case, default_dict);
+  case_create (&wc_data.sink_case, dict_get_next_value_idx (default_dict));
+  wc_data.cases_written = 0;
 
   last_vfm_invocation = time (NULL);
 
-  open_active_file ();
-  vfm_source->read (procedure_write_case, &procedure_write_data);
-  close_active_file (&procedure_write_data);
+  if (vfm_source != NULL) 
+    vfm_source->class->read (vfm_source,
+                             &wc_data.trns_case,
+                             write_case, &wc_data);
+
+  case_destroy (&wc_data.sink_case);
+  case_destroy (&wc_data.trns_case);
 
   assert (--recursive_call == 0);
 }
-\f
-/* Active file processing support.  Subtly different semantics from
-   procedure(). */
 
-static int process_active_file_write_case (struct write_case_data *data);
+/* Creates and returns a case, initializing it from the vectors
+   that say which `value's need to be initialized just once, and
+   which ones need to be re-initialized before every case. */
+static void
+create_trns_case (struct ccase *trns_case, struct dictionary *dict)
+{
+  size_t var_cnt = dict_get_var_cnt (dict);
+  size_t i;
 
-/* The casefunc might want us to stop calling it. */
-static int not_canceled;
+  case_create (trns_case, dict_get_next_value_idx (dict));
+  for (i = 0; i < var_cnt; i++) 
+    {
+      struct variable *v = dict_get_var (dict, i);
+      union value *value = case_data_rw (trns_case, v->fv);
 
-/* Reads all the cases from the active file and passes them one-by-one
-   to CASEFUNC in temp_case.  Before any cases are passed, calls
-   BEGINFUNC.  After all the cases have been passed, calls ENDFUNC.
-   BEGINFUNC, CASEFUNC, and ENDFUNC can write temp_case to the output
-   file by calling process_active_file_output_case().
+      if (v->type == NUMERIC)
+        value->f = v->reinit ? 0.0 : SYSMIS;
+      else
+        memset (value->s, ' ', v->width);
+    }
+}
 
-   process_active_file() ignores TEMPORARY, SPLIT FILE, and N. */
-void
-process_active_file (void (*beginfunc) (void *),
-                    int (*casefunc) (struct ccase *curcase, void *),
-                    void (*endfunc) (void *),
-                     void *aux)
+/* Makes all preparations for reading from the data source and writing
+   to the data sink. */
+static void
+open_active_file (void)
 {
-  struct write_case_data process_active_write_data;
+  /* Make temp_dict refer to the dictionary right before data
+     reaches the sink */
+  if (!temporary)
+    {
+      temp_trns = n_trns;
+      temp_dict = default_dict;
+    }
 
-  process_active_write_data.beginfunc = beginfunc;
-  process_active_write_data.procfunc = casefunc;
-  process_active_write_data.endfunc = endfunc;
-  process_active_write_data.aux = aux;
+  /* Figure out compaction. */
+  compaction_necessary = (dict_get_next_value_idx (temp_dict)
+                          != dict_get_compacted_value_cnt (temp_dict));
 
-  not_canceled = 1;
+  /* Prepare sink. */
+  if (vfm_sink == NULL)
+    vfm_sink = create_case_sink (&storage_sink_class, temp_dict, NULL);
+  if (vfm_sink->class->open != NULL)
+    vfm_sink->class->open (vfm_sink);
 
-  open_active_file ();
-  beginfunc (aux);
-  
-  /* There doesn't necessarily need to be an active file. */
-  if (vfm_source)
-    vfm_source->read (process_active_file_write_case,
-                      &process_active_write_data);
+  /* Allocate memory for lag queue. */
+  if (n_lag > 0)
+    {
+      int i;
   
-  endfunc (aux);
-  close_active_file (&process_active_write_data);
+      lag_count = 0;
+      lag_head = 0;
+      lag_queue = xmalloc (n_lag * sizeof *lag_queue);
+      for (i = 0; i < n_lag; i++)
+        case_nullify (&lag_queue[i]);
+    }
+
+  /* Close any unclosed DO IF or LOOP constructs. */
+  discard_ctl_stack ();
 }
 
-/* Pass the current case to casefunc. */
+/* Transforms trns_case and writes it to the replacement active
+   file if advisable.  Returns nonzero if more cases can be
+   accepted, zero otherwise.  Do not call this function again
+   after it has returned zero once.  */
 static int
-process_active_file_write_case (struct write_case_data *data)
+write_case (struct write_case_data *wc_data)
 {
-  /* Index of current transformation. */
-  int cur_trns;
+  /* Execute permanent transformations.  */
+  if (!execute_transformations (&wc_data->trns_case, t_trns, f_trns, temp_trns,
+                                wc_data->cases_written + 1))
+    goto done;
 
-  for (cur_trns = f_trns ; cur_trns != temp_trns; )
-    {
-      int code;
-       
-      code = t_trns[cur_trns]->proc (t_trns[cur_trns], temp_case);
-      switch (code)
-       {
-       case -1:
-         /* Next transformation. */
-         cur_trns++;
-         break;
-       case -2:
-         /* Delete this case. */
-         goto done;
-       default:
-         /* Go to that transformation. */
-         cur_trns = code;
-         break;
-       }
-    }
+  /* N OF CASES. */
+  if (dict_get_case_limit (default_dict)
+      && wc_data->cases_written >= dict_get_case_limit (default_dict))
+    goto done;
+  wc_data->cases_written++;
 
+  /* Write case to LAG queue. */
   if (n_lag)
-    lag_case ();
-         
-  /* Call the procedure if FILTER and PROCESS IF don't prohibit it. */
-  if (not_canceled && !exclude_this_case ())
-    not_canceled = data->procfunc (temp_case, data->aux);
+    lag_case (&wc_data->trns_case);
+
+  /* Write case to replacement active file. */
+  if (vfm_sink->class->write != NULL) 
+    {
+      if (compaction_necessary) 
+        {
+          dict_compact_case (temp_dict, &wc_data->sink_case,
+                             &wc_data->trns_case);
+          vfm_sink->class->write (vfm_sink, &wc_data->sink_case);
+        }
+      else
+        vfm_sink->class->write (vfm_sink, &wc_data->trns_case);
+    }
   
-  case_count++;
+  /* Execute temporary transformations. */
+  if (!execute_transformations (&wc_data->trns_case, t_trns, temp_trns, n_trns,
+                                wc_data->cases_written))
+    goto done;
   
- done:
-  clear_temp_case ();
+  /* FILTER, PROCESS IF, post-TEMPORARY N OF CASES. */
+  if (filter_case (&wc_data->trns_case, wc_data->cases_written)
+      || (dict_get_case_limit (temp_dict)
+          && wc_data->cases_analyzed >= dict_get_case_limit (temp_dict)))
+    goto done;
+  wc_data->cases_analyzed++;
 
-  return 1;
-}
+  /* Pass case to procedure. */
+  if (wc_data->proc_func != NULL)
+    wc_data->proc_func (&wc_data->trns_case, wc_data->aux);
 
-/* Write temp_case to the active file. */
-void
-process_active_file_output_case (void)
-{
-  vfm_sink_info.ncases++;
-  vfm_sink->write ();
+ done:
+  clear_case (&wc_data->trns_case);
+  return 1;
 }
-\f
-/* Opening the active file. */
-
-/* It might be usefully noted that the following several functions are
-   given in the order that they are called by open_active_file(). */
 
-/* Prepare to write to the replacement active file. */
-static void
-prepare_for_writing (void)
+/* Transforms case C using the transformations in TRNS[] with
+   indexes FIRST_IDX through LAST_IDX, exclusive.  Case C will
+   become case CASE_NUM (1-based) in the output file.  Returns
+   zero if the case was filtered out by one of the
+   transformations, nonzero otherwise. */
+static int
+execute_transformations (struct ccase *c,
+                         struct trns_header **trns,
+                         int first_idx, int last_idx,
+                         int case_num) 
 {
-  /* FIXME: If ALL the conditions listed below hold true, then the
-     replacement active file is guaranteed to be identical to the
-     original active file:
-
-     1. TEMPORARY was the first transformation, OR, there were no
-     transformations at all.
-
-     2. Input is not coming from an input program.
-
-     3. Compaction is not necessary.
+  int idx;
 
-     So, in this case, we shouldn't have to replace the active
-     file--it's just a waste of time and space. */
-
-  vfm_sink_info.ncases = 0;
-  vfm_sink_info.nval = dict_get_next_value_idx (default_dict);
-  vfm_sink_info.case_size = dict_get_case_size (default_dict);
-  
-  if (vfm_sink == NULL)
+  for (idx = first_idx; idx != last_idx; )
     {
-      if (vfm_sink_info.case_size * vfm_source_info.ncases > MAX_WORKSPACE
-         && !paging)
-       {
-         msg (MW, _("Workspace overflow predicted.  Max workspace is "
-                    "currently set to %d KB (%d cases at %d bytes each).  "
-                    "Paging active file to disk."),
-              MAX_WORKSPACE / 1024, MAX_WORKSPACE / vfm_sink_info.case_size,
-              vfm_sink_info.case_size);
-         
-         paging = 1;
-       }
-      
-      vfm_sink = paging ? &vfm_disk_stream : &vfm_memory_stream;
+      int retval = trns[idx]->proc (trns[idx], c, case_num);
+      switch (retval)
+        {
+        case -1:
+          idx++;
+          break;
+          
+        case -2:
+          return 0;
+          
+        default:
+          idx = retval;
+          break;
+        }
     }
-}
 
-/* Arrange for compacting the output cases for storage. */
-static void
-arrange_compaction (void)
-{
-  int count_values = 0;
-
-  {
-    int i;
-    
-    /* Count up the number of `value's that will be output. */
-    for (i = 0; i < dict_get_var_cnt (temp_dict); i++) 
-      {
-        struct variable *v = dict_get_var (temp_dict, i);
-
-        if (v->name[0] != '#')
-          {
-            assert (v->nv > 0);
-            count_values += v->nv;
-          } 
-      }
-    assert (temporary == 2
-            || count_values <= dict_get_next_value_idx (temp_dict));
-  }
-  
-  /* Compaction is only necessary if the number of `value's to output
-     differs from the number already present. */
-  compaction_nval = count_values;
-  if (temporary == 2 || count_values != dict_get_next_value_idx (temp_dict))
-    compaction_necessary = 1;
-  else
-    compaction_necessary = 0;
-  
-  if (vfm_sink->init)
-    vfm_sink->init ();
+  return 1;
 }
 
-/* Prepares the temporary case and compaction case. */
-static void
-make_temp_case (void)
+/* Returns nonzero if case C with case number CASE_NUM should be
+   exclude as specified on FILTER or PROCESS IF, otherwise
+   zero. */
+static int
+filter_case (const struct ccase *c, int case_idx)
 {
-  temp_case = xmalloc (vfm_sink_info.case_size);
+  /* FILTER. */
+  struct variable *filter_var = dict_get_filter (default_dict);
+  if (filter_var != NULL) 
+    {
+      double f = case_num (c, filter_var->fv);
+      if (f == 0.0 || f == SYSMIS || is_num_user_missing (f, filter_var))
+        return 1;
+    }
 
-  if (compaction_necessary)
-    compaction_case = xmalloc (sizeof (struct ccase)
-                              + sizeof (union value) * (compaction_nval - 1));
+  /* PROCESS IF. */
+  if (process_if_expr != NULL
+      && expr_evaluate_num (process_if_expr, c, case_idx) != 1.0)
+    return 1;
+
+  return 0;
 }
 
-#if DEBUGGING
-/* Returns the name of the variable that owns the index CCASE_INDEX
-   into ccase. */
-static const char *
-index_to_varname (int ccase_index)
+/* Add C to the lag queue. */
+static void
+lag_case (const struct ccase *c)
 {
-  int i;
-
-  for (i = 0; i < default_dict.nvar; i++)
-    {
-      struct variable *v = default_dict.var[i];
-      
-      if (ccase_index >= v->fv && ccase_index < v->fv + v->nv)
-       return default_dict.var[i]->name;
-    }
-  return _("<NOVAR>");
+  if (lag_count < n_lag)
+    lag_count++;
+  case_destroy (&lag_queue[lag_head]);
+  case_clone (&lag_queue[lag_head], c);
+  if (++lag_head >= n_lag)
+    lag_head = 0;
 }
-#endif
 
-/* Initializes temp_case from the vectors that say which `value's
-   need to be initialized just once, and which ones need to be
-   re-initialized before every case. */
+/* Clears the variables in C that need to be cleared between
+   processing cases.  */
 static void
-vector_initialization (void)
+clear_case (struct ccase *c)
 {
   size_t var_cnt = dict_get_var_cnt (default_dict);
   size_t i;
@@ -380,126 +371,32 @@ vector_initialization (void)
   for (i = 0; i < var_cnt; i++) 
     {
       struct variable *v = dict_get_var (default_dict, i);
-
-      if (v->type == NUMERIC) 
+      if (v->init && v->reinit) 
         {
-          if (v->reinit)
-            temp_case->data[v->fv].f = 0.0;
+          if (v->type == NUMERIC)
+            case_data_rw (c, v->fv)->f = SYSMIS;
           else
-            temp_case->data[v->fv].f = SYSMIS;
-        }
-      else
-        memset (temp_case->data[v->fv].s, ' ', v->width);
+            memset (case_data_rw (c, v->fv)->s, ' ', v->width);
+        } 
     }
 }
 
-/* Sets all the lag-related variables based on value of n_lag. */
-static void
-setup_lag (void)
-{
-  int i;
-  
-  if (n_lag == 0)
-    return;
-
-  lag_count = 0;
-  lag_head = 0;
-  lag_queue = xmalloc (n_lag * sizeof *lag_queue);
-  for (i = 0; i < n_lag; i++)
-    lag_queue[i] = xmalloc (dict_get_case_size (temp_dict));
-}
-
-/* There is a lot of potential confusion in the vfm and related
-   routines over the number of `value's at each stage of the process.
-   Here is each nval count, with explanation, as set up by
-   open_active_file():
-
-   vfm_source_info.nval: Number of `value's in the cases returned by
-   the source stream.  This value turns out not to be very useful, but
-   we maintain it anyway.
-
-   vfm_sink_info.nval: Number of `value's in the cases after all
-   transformations have been performed.  Never less than
-   vfm_source_info.nval.
-
-   temp_dict->nval: Number of `value's in the cases after the
-   transformations leading up to TEMPORARY have been performed.  If
-   TEMPORARY was not specified, this is equal to vfm_sink_info.nval.
-   Never less than vfm_sink_info.nval.
-
-   compaction_nval: Number of `value's in the cases after the
-   transformations leading up to TEMPORARY have been performed and the
-   case has been compacted by compact_case(), if compaction is
-   necessary.  This the number of `value's in the cases saved by the
-   sink stream.  (However, note that the cases passed to the sink
-   stream have not yet been compacted.  It is the responsibility of
-   the data sink to call compact_case().)  This may be less than,
-   greater than, or equal to vfm_source_info.nval.  `compaction'
-   becomes the new value of default_dict.nval after the procedure is
-   completed.
-
-   default_dict.nval: This is often an alias for temp_dict->nval.  As
-   such it can really have no separate existence until the procedure
-   is complete.  For this reason it should *not* be referenced inside
-   the execution of a procedure. */
-/* Makes all preparations for reading from the data source and writing
-   to the data sink. */
-static void
-open_active_file (void)
-{
-  /* Sometimes we want to refer to the dictionary that applies to the
-     data actually written to the sink.  This is either temp_dict or
-     default_dict.  However, if TEMPORARY is not on, then temp_dict
-     does not apply.  So, we can set temp_dict to default_dict in this
-     case. */
-  if (!temporary)
-    {
-      temp_trns = n_trns;
-      temp_dict = default_dict;
-    }
-
-  /* No cases passed to the procedure yet. */
-  case_count = 0;
-
-  /* The rest. */
-  prepare_for_writing ();
-  arrange_compaction ();
-  make_temp_case ();
-  vector_initialization ();
-  discard_ctl_stack ();
-  setup_lag ();
-
-  /* Debug output. */
-  debug_printf (("vfm: reading from %s source, writing to %s sink.\n",
-                vfm_source->name, vfm_sink->name));
-  debug_printf (("vfm: vfm_source_info.nval=%d, vfm_sink_info.nval=%d, "
-                "temp_dict->nval=%d, compaction_nval=%d, "
-                "default_dict.nval=%d\n",
-                vfm_source_info.nval, vfm_sink_info.nval, temp_dict->nval,
-                compaction_nval, default_dict.nval));
-}
-\f
 /* Closes the active file. */
 static void
-close_active_file (struct write_case_data *data)
+close_active_file (void)
 {
-  /* Close the current case group. */
-  if (case_count && data->endfunc != NULL)
-    data->endfunc (data->aux);
-
-  /* Stop lagging (catch up?). */
-  if (n_lag)
+  /* Free memory for lag queue, and turn off lagging. */
+  if (n_lag > 0)
     {
       int i;
       
       for (i = 0; i < n_lag; i++)
-       free (lag_queue[i]);
+       case_destroy (&lag_queue[i]);
       free (lag_queue);
       n_lag = 0;
     }
   
-  /* Assume the dictionary from right before TEMPORARY, if any.  Turn
-     off TEMPORARY. */
+  /* Dictionary from before TEMPORARY becomes permanent.. */
   if (temporary)
     {
       dict_destroy (default_dict);
@@ -509,533 +406,199 @@ close_active_file (struct write_case_data *data)
 
   /* Finish compaction. */
   if (compaction_necessary)
-    finish_compaction ();
+    dict_compact_values (default_dict);
     
-  /* Old data sink --> New data source. */
-  if (vfm_source && vfm_source->destroy_source)
-    vfm_source->destroy_source ();
-  
-  vfm_source = vfm_sink;
-  vfm_source_info.ncases = vfm_sink_info.ncases;
-  vfm_source_info.nval = compaction_nval;
-  vfm_source_info.case_size = (sizeof (struct ccase)
-                              + (compaction_nval - 1) * sizeof (union value));
-  if (vfm_source->mode)
-    vfm_source->mode ();
-
-  /* Old data sink is gone now. */
+  /* Free data source. */
+  free_case_source (vfm_source);
+  vfm_source = NULL;
+
+  /* Old data sink becomes new data source. */
+  if (vfm_sink->class->make_source != NULL)
+    vfm_source = vfm_sink->class->make_source (vfm_sink);
+  free_case_sink (vfm_sink);
   vfm_sink = NULL;
 
-  /* Cancel TEMPORARY. */
+  /* Cancel TEMPORARY, PROCESS IF, FILTER, N OF CASES, vectors,
+     and get rid of all the transformations. */
   cancel_temporary ();
-
-  /* Free temporary cases. */
-  free (temp_case);
-  temp_case = NULL;
-
-  free (compaction_case);
-  compaction_case = NULL;
-
-  /* Cancel PROCESS IF. */
   expr_free (process_if_expr);
   process_if_expr = NULL;
-
-  /* Cancel FILTER if temporary. */
   if (dict_get_filter (default_dict) != NULL && !FILTER_before_TEMPORARY)
     dict_set_filter (default_dict, NULL);
-
-  /* Cancel transformations. */
-  cancel_transformations ();
-
-  /* Turn off case limiter. */
   dict_set_case_limit (default_dict, 0);
-
-  /* Clear VECTOR vectors. */
   dict_clear_vectors (default_dict);
-
-  debug_printf (("vfm: procedure complete\n\n"));
+  cancel_transformations ();
 }
 \f
-/* Disk case stream. */
-
-/* Associated files. */
-FILE *disk_source_file;
-FILE *disk_sink_file;
+/* Storage case stream. */
 
-/* Initializes the disk sink. */
-static void
-disk_stream_init (void)
-{
-  disk_sink_file = tmpfile ();
-  if (!disk_sink_file)
-    {
-      msg (ME, _("An error occurred attempting to create a temporary "
-                "file for use as the active file: %s."),
-          strerror (errno));
-      err_failure ();
-    }
-}
+/* Information about storage sink or source. */
+struct storage_stream_info 
+  {
+    struct casefile *casefile;  /* Storage. */
+  };
 
-/* Reads all cases from the disk source and passes them one by one to
-   write_case(). */
+/* Initializes a storage sink. */
 static void
-disk_stream_read (write_case_func *write_case, write_case_data wc_data)
+storage_sink_open (struct case_sink *sink)
 {
-  int i;
+  struct storage_stream_info *info;
 
-  for (i = 0; i < vfm_source_info.ncases; i++)
-    {
-      if (!fread (temp_case, vfm_source_info.case_size, 1, disk_source_file))
-       {
-         msg (ME, _("An error occurred while attempting to read from "
-              "a temporary file created for the active file: %s."),
-              strerror (errno));
-         err_failure ();
-         return;
-       }
-
-      if (!write_case (wc_data))
-       return;
-    }
+  sink->aux = info = xmalloc (sizeof *info);
+  info->casefile = casefile_create (sink->value_cnt);
 }
 
-/* Writes temp_case to the disk sink. */
+/* Destroys storage stream represented by INFO. */
 static void
-disk_stream_write (void)
+destroy_storage_stream_info (struct storage_stream_info *info) 
 {
-  union value *src_case;
-
-  if (compaction_necessary)
+  if (info != NULL) 
     {
-      compact_case (compaction_case, temp_case);
-      src_case = (union value *) compaction_case;
-    }
-  else src_case = (union value *) temp_case;
-
-  if (fwrite (src_case, sizeof *src_case * compaction_nval, 1,
-             disk_sink_file) != 1)
-    {
-      msg (ME, _("An error occurred while attempting to write to a "
-                "temporary file used as the active file: %s."),
-          strerror (errno));
-      err_failure ();
+      casefile_destroy (info->casefile);
+      free (info); 
     }
 }
 
-/* Switches the stream from a sink to a source. */
+/* Writes case C to the storage sink SINK. */
 static void
-disk_stream_mode (void)
+storage_sink_write (struct case_sink *sink, const struct ccase *c)
 {
-  /* Rewind the sink. */
-  if (fseek (disk_sink_file, 0, SEEK_SET) != 0)
-    {
-      msg (ME, _("An error occurred while attempting to rewind a "
-                "temporary file used as the active file: %s."),
-          strerror (errno));
-      err_failure ();
-    }
-  
-  /* Sink --> source variables. */
-  disk_source_file = disk_sink_file;
+  struct storage_stream_info *info = sink->aux;
+
+  casefile_append (info->casefile, c);
 }
 
-/* Destroys the source's internal data. */
+/* Destroys internal data in SINK. */
 static void
-disk_stream_destroy_source (void)
+storage_sink_destroy (struct case_sink *sink)
 {
-  if (disk_source_file)
-    {
-      fclose (disk_source_file);
-      disk_source_file = NULL;
-    }
+  destroy_storage_stream_info (sink->aux);
 }
 
-/* Destroys the sink's internal data. */
-static void
-disk_stream_destroy_sink (void)
+/* Closes the sink and returns a storage source to read back the
+   written data. */
+static struct case_source *
+storage_sink_make_source (struct case_sink *sink) 
 {
-  if (disk_sink_file)
-    {
-      fclose (disk_sink_file);
-      disk_sink_file = NULL;
-    }
+  struct case_source *source
+    = create_case_source (&storage_source_class, sink->aux);
+  sink->aux = NULL;
+  return source;
 }
 
-/* Disk stream. */
-struct case_stream vfm_disk_stream = 
+/* Storage sink. */
+const struct case_sink_class storage_sink_class = 
   {
-    disk_stream_init,
-    disk_stream_read,
-    disk_stream_write,
-    disk_stream_mode,
-    disk_stream_destroy_source,
-    disk_stream_destroy_sink,
-    "disk",
+    "storage",
+    storage_sink_open,
+    storage_sink_write,
+    storage_sink_destroy,
+    storage_sink_make_source,
   };
 \f
-/* Memory case stream. */
+/* Storage source. */
 
-/* List of cases stored in the stream. */
-struct case_list *memory_source_cases;
-struct case_list *memory_sink_cases;
-
-/* Current case. */
-struct case_list *memory_sink_iter;
-
-/* Maximum number of cases. */
-int memory_sink_max_cases;
-
-/* Initializes the memory stream variables for writing. */
-static void
-memory_stream_init (void)
+/* Returns the number of cases that will be read by
+   storage_source_read(). */
+static int
+storage_source_count (const struct case_source *source) 
 {
-  memory_sink_cases = NULL;
-  memory_sink_iter = NULL;
-  
-  assert (compaction_nval);
-  memory_sink_max_cases = MAX_WORKSPACE / (sizeof (union value) * compaction_nval);
+  struct storage_stream_info *info = source->aux;
+
+  return casefile_get_case_cnt (info->casefile);
 }
 
-/* Reads the case stream from memory and passes it to write_case(). */
+/* Reads all cases from the storage source and passes them one by one to
+   write_case(). */
 static void
-memory_stream_read (write_case_func *write_case, write_case_data wc_data)
+storage_source_read (struct case_source *source,
+                     struct ccase *output_case,
+                     write_case_func *write_case, write_case_data wc_data)
 {
-  while (memory_source_cases != NULL)
+  struct storage_stream_info *info = source->aux;
+  struct ccase casefile_case;
+  struct casereader *reader;
+
+  for (reader = casefile_get_reader (info->casefile);
+       casereader_read (reader, &casefile_case);
+       case_destroy (&casefile_case))
     {
-      memcpy (temp_case, &memory_source_cases->c, vfm_source_info.case_size);
-      
-      {
-       struct case_list *current = memory_source_cases;
-       memory_source_cases = memory_source_cases->next;
-       free (current);
-      }
-      
-      if (!write_case (wc_data))
-       return;
+      case_copy (output_case, 0,
+                 &casefile_case, 0,
+                 casefile_get_value_cnt (info->casefile));
+      write_case (wc_data);
     }
+  casereader_destroy (reader);
 }
 
-/* Writes temp_case to the memory stream. */
+/* Destroys the source's internal data. */
 static void
-memory_stream_write (void)
+storage_source_destroy (struct case_source *source)
 {
-  struct case_list *new_case = malloc (sizeof (struct case_list)
-                                      + ((compaction_nval - 1)
-                                         * sizeof (union value)));
-
-  /* If we've got memory to spare then add it to the linked list. */
-  if (vfm_sink_info.ncases <= memory_sink_max_cases && new_case != NULL)
-    {
-      if (compaction_necessary)
-       compact_case (&new_case->c, temp_case);
-      else
-       memcpy (&new_case->c, temp_case, sizeof (union value) * compaction_nval);
-
-      /* Append case to linked list. */
-      if (memory_sink_cases)
-       memory_sink_iter = memory_sink_iter->next = new_case;
-      else
-       memory_sink_iter = memory_sink_cases = new_case;
-    }
-  else
-    {
-      /* Out of memory.  Write the active file to disk. */
-      struct case_list *cur, *next;
-
-      /* Notify the user. */
-      if (!new_case)
-       msg (MW, _("Virtual memory exhausted.  Paging active file "
-                  "to disk."));
-      else
-       msg (MW, _("Workspace limit of %d KB (%d cases at %d bytes each) "
-                  "overflowed.  Paging active file to disk."),
-            MAX_WORKSPACE / 1024, memory_sink_max_cases,
-            compaction_nval * sizeof (union value));
-
-      free (new_case);
-
-      /* Switch to a disk sink. */
-      vfm_sink = &vfm_disk_stream;
-      vfm_sink->init ();
-      paging = 1;
-
-      /* Terminate the list. */
-      if (memory_sink_iter)
-       memory_sink_iter->next = NULL;
-
-      /* Write the cases to disk and destroy them.  We can't call
-         vfm->sink->write() because of compaction. */
-      for (cur = memory_sink_cases; cur; cur = next)
-       {
-         next = cur->next;
-         if (fwrite (cur->c.data, sizeof (union value) * compaction_nval, 1,
-                     disk_sink_file) != 1)
-           {
-             msg (ME, _("An error occurred while attempting to "
-                        "write to a temporary file created as the "
-                        "active file, while paging to disk: %s."),
-                  strerror (errno));
-             err_failure ();
-           }
-         free (cur);
-       }
-
-      /* Write the current case to disk. */
-      vfm_sink->write ();
-    }
+  destroy_storage_stream_info (source->aux);
 }
 
-/* If the data is stored in memory, causes it to be written to disk.
-   To be called only *between* procedure()s, not within them. */
-void
-page_to_disk (void)
+/* Storage source. */
+const struct case_source_class storage_source_class = 
+  {
+    "storage",
+    storage_source_count,
+    storage_source_read,
+    storage_source_destroy,
+  };
+
+struct casefile *
+storage_source_get_casefile (struct case_source *source) 
 {
-  if (vfm_source == &vfm_memory_stream)
-    {
-      /* Switch to a disk sink. */
-      vfm_sink = &vfm_disk_stream;
-      vfm_sink->init ();
-      paging = 1;
-      
-      /* Write the cases to disk and destroy them.  We can't call
-         vfm->sink->write() because of compaction. */
-      {
-       struct case_list *cur, *next;
-       
-       for (cur = memory_source_cases; cur; cur = next)
-         {
-           next = cur->next;
-           if (fwrite (cur->c.data, sizeof *cur->c.data * compaction_nval, 1,
-                       disk_sink_file) != 1)
-             {
-               msg (ME, _("An error occurred while attempting to "
-                          "write to a temporary file created as the "
-                          "active file, while paging to disk: %s."),
-                    strerror (errno));
-               err_failure ();
-             }
-           free (cur);
-         }
-      }
-      
-      vfm_source = &vfm_disk_stream;
-      vfm_source->mode ();
+  struct storage_stream_info *info = source->aux;
 
-      vfm_sink = NULL;
-    }
+  assert (source->class == &storage_source_class);
+  return info->casefile;
 }
 
-/* Switch the memory stream from sink to source mode. */
-static void
-memory_stream_mode (void)
+struct case_source *
+storage_source_create (struct casefile *cf)
 {
-  /* Terminate the list. */
-  if (memory_sink_iter)
-    memory_sink_iter->next = NULL;
+  struct storage_stream_info *info;
 
-  /* Sink --> source variables. */
-  memory_source_cases = memory_sink_cases;
-  memory_sink_cases = NULL;
-}
+  info = xmalloc (sizeof *info);
+  info->casefile = cf;
 
-/* Destroy all memory source data. */
-static void
-memory_stream_destroy_source (void)
-{
-  struct case_list *cur, *next;
-  
-  for (cur = memory_source_cases; cur; cur = next)
-    {
-      next = cur->next;
-      free (cur);
-    }
-  memory_source_cases = NULL;
+  return create_case_source (&storage_source_class, info);
 }
+\f
+/* Null sink.  Used by a few procedures that keep track of output
+   themselves and would throw away anything that the sink
+   contained anyway. */
 
-/* Destroy all memory sink data. */
-static void
-memory_stream_destroy_sink (void)
-{
-  struct case_list *cur, *next;
-  
-  for (cur = memory_sink_cases; cur; cur = next)
-    {
-      next = cur->next;
-      free (cur);
-    }
-  memory_sink_cases = NULL;
-}
-  
-/* Memory stream. */
-struct case_stream vfm_memory_stream = 
+const struct case_sink_class null_sink_class = 
   {
-    memory_stream_init,
-    memory_stream_read,
-    memory_stream_write,
-    memory_stream_mode,
-    memory_stream_destroy_source,
-    memory_stream_destroy_sink,
-    "memory",
+    "null",
+    NULL,
+    NULL,
+    NULL,
+    NULL,
   };
 \f
-#include "debug-print.h"
-
-/* Add temp_case to the lag queue. */
-static void
-lag_case (void)
-{
-  if (lag_count < n_lag)
-    lag_count++;
-  memcpy (lag_queue[lag_head], temp_case,
-          dict_get_case_size (temp_dict));
-  if (++lag_head >= n_lag)
-    lag_head = 0;
-}
-
 /* Returns a pointer to the lagged case from N_BEFORE cases before the
    current one, or NULL if there haven't been that many cases yet. */
 struct ccase *
 lagged_case (int n_before)
 {
+  assert (n_before >= 1 );
   assert (n_before <= n_lag);
-  if (n_before > lag_count)
-    return NULL;
-  
-  {
-    int index = lag_head - n_before;
-    if (index < 0)
-      index += n_lag;
-    return lag_queue[index];
-  }
-}
-   
-/* Transforms temp_case and writes it to the replacement active file
-   if advisable.  Returns nonzero if more cases can be accepted, zero
-   otherwise.  Do not call this function again after it has returned
-   zero once.  */
-int
-procedure_write_case (write_case_data wc_data)
-{
-  /* Index of current transformation. */
-  int cur_trns;
-
-  /* Return value: whether it's reasonable to write any more cases. */
-  int more_cases = 1;
 
-  debug_printf ((_("transform: ")));
-
-  cur_trns = f_trns;
-  for (;;)
-    {
-      /* Output the case if this is temp_trns. */
-      if (cur_trns == temp_trns)
-       {
-         debug_printf (("REC"));
-
-         if (n_lag)
-           lag_case ();
-         
-         vfm_sink_info.ncases++;
-         vfm_sink->write ();
-
-         if (dict_get_case_limit (default_dict))
-           more_cases = (vfm_sink_info.ncases
-                          < dict_get_case_limit (default_dict));
-       }
-
-      /* Are we done? */
-      if (cur_trns >= n_trns)
-       break;
-      
-      debug_printf (("$%d", cur_trns));
-
-      /* Decide which transformation should come next. */
-      {
-       int code;
-       
-       code = t_trns[cur_trns]->proc (t_trns[cur_trns], temp_case);
-       switch (code)
-         {
-         case -1:
-           /* Next transformation. */
-           cur_trns++;
-           break;
-         case -2:
-           /* Delete this case. */
-           goto done;
-         default:
-           /* Go to that transformation. */
-           cur_trns = code;
-           break;
-         }
-      }
-    }
-
-  /* Call the beginning of group function. */
-  if (!case_count && wc_data->beginfunc != NULL)
-    wc_data->beginfunc (wc_data->aux);
-
-  /* Call the procedure if there is one and FILTER and PROCESS IF
-     don't prohibit it. */
-  if (wc_data->procfunc != NULL && !exclude_this_case ())
-    wc_data->procfunc (temp_case, wc_data->aux);
-
-  case_count++;
-  
-done:
-  debug_putc ('\n', stdout);
-
-  clear_temp_case ();
-  
-  /* Return previously determined value. */
-  return more_cases;
-}
-
-/* Clears the variables in the temporary case that need to be
-   cleared between processing cases.  */
-static void
-clear_temp_case (void)
-{
-  /* FIXME?  This is linear in the number of variables, but
-     doesn't need to be, so it's an easy optimization target. */
-  size_t var_cnt = dict_get_var_cnt (default_dict);
-  size_t i;
-  
-  for (i = 0; i < var_cnt; i++) 
-    {
-      struct variable *v = dict_get_var (default_dict, i);
-      if (v->init && v->reinit) 
-        {
-          if (v->type == NUMERIC) 
-            temp_case->data[v->fv].f = SYSMIS;
-          else
-            memset (temp_case->data[v->fv].s, ' ', v->width);
-        } 
-    }
-}
-
-/* Returns nonzero if this case should be exclude as specified on
-   FILTER or PROCESS IF, otherwise zero. */
-static int
-exclude_this_case (void)
-{
-  /* FILTER. */
-  struct variable *filter_var = dict_get_filter (default_dict);
-  if (filter_var != NULL) 
+  if (n_before <= lag_count)
     {
-      double f = temp_case->data[filter_var->fv].f;
-      if (f == 0.0 || f == SYSMIS || is_num_user_missing (f, filter_var))
-        return 1;
+      int index = lag_head - n_before;
+      if (index < 0)
+        index += n_lag;
+      return &lag_queue[index];
     }
-
-  /* PROCESS IF. */
-  if (process_if_expr != NULL
-      && expr_evaluate (process_if_expr, temp_case, NULL) != 1.0)
-    return 1;
-
-  return 0;
+  else
+    return NULL;
 }
-
+   
 /* Appends TRNS to t_trns[], the list of all transformations to be
    performed on data as it is read from the active file. */
 void
@@ -1063,12 +626,169 @@ cancel_transformations (void)
       free (t_trns[i]);
     }
   n_trns = f_trns = 0;
-  if (m_trns > 32)
+  free (t_trns);
+  t_trns = NULL;
+  m_trns = 0;
+}
+\f
+/* Creates a case source with class CLASS and auxiliary data AUX
+   and based on dictionary DICT. */
+struct case_source *
+create_case_source (const struct case_source_class *class,
+                    void *aux) 
+{
+  struct case_source *source = xmalloc (sizeof *source);
+  source->class = class;
+  source->aux = aux;
+  return source;
+}
+
+/* Destroys case source SOURCE.  It is the caller's responsible to
+   call the source's destroy function, if any. */
+void
+free_case_source (struct case_source *source) 
+{
+  if (source != NULL) 
+    {
+      if (source->class->destroy != NULL)
+        source->class->destroy (source);
+      free (source);
+    }
+}
+
+/* Returns nonzero if a case source is "complex". */
+int
+case_source_is_complex (const struct case_source *source) 
+{
+  return source != NULL && (source->class == &input_program_source_class
+                            || source->class == &file_type_source_class);
+}
+
+/* Returns nonzero if CLASS is the class of SOURCE. */
+int
+case_source_is_class (const struct case_source *source,
+                      const struct case_source_class *class) 
+{
+  return source != NULL && source->class == class;
+}
+
+/* Creates a case sink to accept cases from the given DICT with
+   class CLASS and auxiliary data AUX. */
+struct case_sink *
+create_case_sink (const struct case_sink_class *class,
+                  const struct dictionary *dict,
+                  void *aux) 
+{
+  struct case_sink *sink = xmalloc (sizeof *sink);
+  sink->class = class;
+  sink->value_cnt = dict_get_compacted_value_cnt (dict);
+  sink->aux = aux;
+  return sink;
+}
+
+/* Destroys case sink SINK.  */
+void
+free_case_sink (struct case_sink *sink) 
+{
+  if (sink != NULL) 
     {
-      free (t_trns);
-      m_trns = 0;
+      if (sink->class->destroy != NULL)
+        sink->class->destroy (sink);
+      free (sink); 
     }
 }
+\f
+/* Represents auxiliary data for handling SPLIT FILE. */
+struct split_aux_data 
+  {
+    size_t case_count;          /* Number of cases so far. */
+    struct ccase prev_case;     /* Data in previous case. */
+
+    /* Functions to call... */
+    void (*begin_func) (void *);               /* ...before data. */
+    int (*proc_func) (struct ccase *, void *); /* ...with data. */
+    void (*end_func) (void *);                 /* ...after data. */
+    void *func_aux;                            /* Auxiliary data. */ 
+  };
+
+static int equal_splits (const struct ccase *, const struct ccase *);
+static int procedure_with_splits_callback (struct ccase *, void *);
+static void dump_splits (struct ccase *);
+
+/* Like procedure(), but it automatically breaks the case stream
+   into SPLIT FILE break groups.  Before each group of cases with
+   identical SPLIT FILE variable values, BEGIN_FUNC is called.
+   Then PROC_FUNC is called with each case in the group.  
+   END_FUNC is called when the group is finished.  FUNC_AUX is
+   passed to each of the functions as auxiliary data.
+
+   If the active file is empty, none of BEGIN_FUNC, PROC_FUNC,
+   and END_FUNC will be called at all. 
+
+   If SPLIT FILE is not in effect, then there is one break group
+   (if the active file is nonempty), and BEGIN_FUNC and END_FUNC
+   will be called once. */
+void
+procedure_with_splits (void (*begin_func) (void *aux),
+                       int (*proc_func) (struct ccase *, void *aux),
+                       void (*end_func) (void *aux),
+                       void *func_aux) 
+{
+  struct split_aux_data split_aux;
+
+  split_aux.case_count = 0;
+  case_nullify (&split_aux.prev_case);
+  split_aux.begin_func = begin_func;
+  split_aux.proc_func = proc_func;
+  split_aux.end_func = end_func;
+  split_aux.func_aux = func_aux;
+
+  open_active_file ();
+  internal_procedure (procedure_with_splits_callback, &split_aux);
+  if (split_aux.case_count > 0 && end_func != NULL)
+    end_func (func_aux);
+  close_active_file ();
+
+  case_destroy (&split_aux.prev_case);
+}
+
+/* procedure() callback used by procedure_with_splits(). */
+static int
+procedure_with_splits_callback (struct ccase *c, void *split_aux_) 
+{
+  struct split_aux_data *split_aux = split_aux_;
+
+  /* Start a new series if needed. */
+  if (split_aux->case_count == 0
+      || !equal_splits (c, &split_aux->prev_case))
+    {
+      if (split_aux->case_count > 0 && split_aux->end_func != NULL)
+        split_aux->end_func (split_aux->func_aux);
+
+      dump_splits (c);
+      case_destroy (&split_aux->prev_case);
+      case_clone (&split_aux->prev_case, c);
+
+      if (split_aux->begin_func != NULL)
+       split_aux->begin_func (split_aux->func_aux);
+    }
+
+  split_aux->case_count++;
+  if (split_aux->proc_func != NULL)
+    return split_aux->proc_func (c, split_aux->func_aux);
+  else
+    return 1;
+}
+
+/* Compares the SPLIT FILE variables in cases A and B and returns
+   nonzero only if they differ. */
+static int
+equal_splits (const struct ccase *a, const struct ccase *b) 
+{
+  return case_compare (a, b,
+                       dict_get_split_vars (default_dict),
+                       dict_get_split_cnt (default_dict)) == 0;
+}
 
 /* Dumps out the values of all the split variables for the case C. */
 static void
@@ -1080,6 +800,9 @@ dump_splits (struct ccase *c)
   int i;
 
   split_cnt = dict_get_split_cnt (default_dict);
+  if (split_cnt == 0)
+    return;
+
   t = tab_create (3, split_cnt + 1, 0);
   tab_dim (t, tab_natural_dimensions);
   tab_vline (t, TAL_1 | TAL_SPACING, 1, 0, split_cnt);
@@ -1097,141 +820,90 @@ dump_splits (struct ccase *c)
       assert (v->type == NUMERIC || v->type == ALPHA);
       tab_text (t, 0, i + 1, TAB_LEFT | TAT_PRINTF, "%s", v->name);
       
-      data_out (temp_buf, &v->print, &c->data[v->fv]);
+      data_out (temp_buf, &v->print, case_data (c, v->fv));
       
       temp_buf[v->print.w] = 0;
       tab_text (t, 1, i + 1, TAT_PRINTF, "%.*s", v->print.w, temp_buf);
 
-      val_lab = val_labs_find (v->val_labs, c->data[v->fv]);
+      val_lab = val_labs_find (v->val_labs, *case_data (c, v->fv));
       if (val_lab)
        tab_text (t, 2, i + 1, TAB_LEFT, val_lab);
     }
   tab_flags (t, SOMF_NO_TITLE);
   tab_submit (t);
 }
+\f
+/* Represents auxiliary data for handling SPLIT FILE in a
+   multipass procedure. */
+struct multipass_split_aux_data 
+  {
+    struct ccase prev_case;     /* Data in previous case. */
+    struct casefile *casefile;  /* Accumulates data for a split. */
 
-/* This procfunc is substituted for the user-supplied procfunc when
-   SPLIT FILE is active.  This function forms a wrapper around that
-   procfunc by dividing the input into series. */
-static int
-SPLIT_FILE_procfunc (struct ccase *c, void *data_)
+    /* Function to call with the accumulated data. */
+    void (*split_func) (const struct casefile *, void *);
+    void *func_aux;                            /* Auxiliary data. */ 
+  };
+
+static int multipass_split_callback (struct ccase *c, void *aux_);
+static void multipass_split_output (struct multipass_split_aux_data *);
+
+void
+multipass_procedure_with_splits (void (*split_func) (const struct casefile *,
+                                                     void *),
+                                 void *func_aux) 
 {
-  struct write_case_data *data = data_;
-  static struct ccase *prev_case;
-  struct variable *const *split;
-  size_t split_cnt;
-  size_t i;
+  struct multipass_split_aux_data aux;
 
-  /* The first case always begins a new series.  We also need to
-     preserve the values of the case for later comparison. */
-  if (case_count == 0)
-    {
-      if (prev_case)
-       free (prev_case);
-      prev_case = xmalloc (vfm_sink_info.case_size);
-      memcpy (prev_case, c, vfm_sink_info.case_size);
+  assert (split_func != NULL);
 
-      dump_splits (c);
-      if (data->beginfunc != NULL)
-       data->beginfunc (data->aux);
-      
-      return data->procfunc (c, data->aux);
-    }
+  open_active_file ();
 
-  /* Compare the value of each SPLIT FILE variable to the values on
-     the previous case. */
-  split = dict_get_split_vars (default_dict);
-  split_cnt = dict_get_split_cnt (default_dict);
-  for (i = 0; i < split_cnt; i++)
-    {
-      struct variable *v = split[i];
-      
-      switch (v->type)
-       {
-       case NUMERIC:
-         if (approx_ne (c->data[v->fv].f, prev_case->data[v->fv].f))
-           goto not_equal;
-         break;
-       case ALPHA:
-         if (memcmp (c->data[v->fv].s, prev_case->data[v->fv].s, v->width))
-           goto not_equal;
-         break;
-       default:
-         assert (0);
-       }
-    }
-  return data->procfunc (c, data->aux);
-  
-not_equal:
-  /* The values of the SPLIT FILE variable are different from the
-     values on the previous case.  That means that it's time to begin
-     a new series. */
-  if (data->endfunc != NULL)
-    data->endfunc (data->aux);
-  dump_splits (c);
-  if (data->beginfunc != NULL)
-    data->beginfunc (data->aux);
-  memcpy (prev_case, c, vfm_sink_info.case_size);
-  return data->procfunc (c, data->aux);
+  case_nullify (&aux.prev_case);
+  aux.casefile = NULL;
+  aux.split_func = split_func;
+  aux.func_aux = func_aux;
+
+  internal_procedure (multipass_split_callback, &aux);
+  if (aux.casefile != NULL)
+    multipass_split_output (&aux);
+  case_destroy (&aux.prev_case);
+
+  close_active_file ();
 }
-\f
-/* Case compaction. */
 
-/* Copies case SRC to case DEST, compacting it in the process. */
-void
-compact_case (struct ccase *dest, const struct ccase *src)
+/* procedure() callback used by multipass_procedure_with_splits(). */
+static int
+multipass_split_callback (struct ccase *c, void *aux_)
 {
-  int i;
-  int nval = 0;
-  size_t var_cnt;
-  
-  assert (compaction_necessary);
+  struct multipass_split_aux_data *aux = aux_;
 
-  if (temporary == 2)
+  /* Start a new series if needed. */
+  if (aux->casefile == NULL || !equal_splits (c, &aux->prev_case))
     {
-      if (dest != compaction_case)
-       memcpy (dest, compaction_case, sizeof (union value) * compaction_nval);
-      return;
-    }
+      /* Pass any cases to split_func. */
+      if (aux->casefile != NULL)
+        multipass_split_output (aux);
 
-  /* Copy all the variables except the scratch variables from SRC to
-     DEST. */
-  var_cnt = dict_get_var_cnt (default_dict);
-  for (i = 0; i < var_cnt; i++)
-    {
-      struct variable *v = dict_get_var (default_dict, i);
-      
-      if (v->name[0] == '#')
-       continue;
+      /* Start a new casefile. */
+      aux->casefile = casefile_create (dict_get_next_value_idx (default_dict));
 
-      if (v->type == NUMERIC)
-       dest->data[nval++] = src->data[v->fv];
-      else
-       {
-         int w = DIV_RND_UP (v->width, sizeof (union value));
-         
-         memcpy (&dest->data[nval], &src->data[v->fv], w * sizeof (union value));
-         nval += w;
-       }
+      /* Record split values. */
+      dump_splits (c);
+      case_destroy (&aux->prev_case);
+      case_clone (&aux->prev_case, c);
     }
+
+  casefile_append (aux->casefile, c);
+
+  return 1;
 }
 
-/* Reassigns `fv' for each variable.  Deletes scratch variables. */
 static void
-finish_compaction (void)
+multipass_split_output (struct multipass_split_aux_data *aux)
 {
-  int i;
-
-  for (i = 0; i < dict_get_var_cnt (default_dict); )
-    {
-      struct variable *v = dict_get_var (default_dict, i);
-
-      if (v->name[0] == '#') 
-        dict_delete_var (default_dict, v);
-      else
-        i++;
-    }
-  dict_compact_values (default_dict);
+  assert (aux->casefile != NULL);
+  aux->split_func (aux->casefile, aux->func_aux);
+  casefile_destroy (aux->casefile);
+  aux->casefile = NULL;
 }
-
-