Eliminate temp_case, and a few other cleanups.
[pspp-builds.git] / src / vfm.c
index 333851d61176071a6e7b90ad9d62f7d9e9adf354..07bfaad30a33d9d58d842fbbfb2e33d6d404a53a 100644 (file)
--- a/src/vfm.c
+++ b/src/vfm.c
    Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA
    02111-1307, USA. */
 
-/* AIX requires this to be the first thing in the file.  */
 #include <config.h>
-#if __GNUC__
-#define alloca __builtin_alloca
-#else
-#if HAVE_ALLOCA_H
-#include <alloca.h>
-#else
-#ifdef _AIX
-#pragma alloca
-#else
-#ifndef alloca                 /* predefined by HP cc +Olibcalls */
-char *alloca ();
-#endif
-#endif
-#endif
-#endif
-
+#include "vfm.h"
+#include "vfmP.h"
 #include <assert.h>
 #include <errno.h>
 #include <stdio.h>
@@ -43,88 +28,66 @@ char *alloca ();
 #include <unistd.h>    /* Required by SunOS4. */
 #endif
 #include "alloc.h"
-#include "approx.h"
 #include "do-ifP.h"
 #include "error.h"
 #include "expr.h"
 #include "misc.h"
 #include "random.h"
+#include "settings.h"
 #include "som.h"
 #include "str.h"
 #include "tab.h"
 #include "var.h"
-#include "vector.h"
-#include "vfm.h"
-#include "vfmP.h"
+#include "value-labels.h"
 
 /*
    Virtual File Manager (vfm):
 
-   vfm is used to process data files.  It uses the model that data is
-   read from one stream (the data source), then written to another
-   (the data sink).  The data source is then deleted and the data sink
-   becomes the data source for the next procedure. */
-
-#undef DEBUGGING
-/*#define DEBUGGING 1 */
-#include "debug-print.h"
+   vfm is used to process data files.  It uses the model that
+   data is read from one stream (the data source), processed,
+   then written to another (the data sink).  The data source is
+   then deleted and the data sink becomes the data source for the
+   next procedure. */
 
-/* This is used to read from the active file. */
-struct case_stream *vfm_source;
-
-/* `value' indexes to initialize to particular values for certain cases. */
-struct long_vec reinit_sysmis;         /* SYSMIS for every case. */
-struct long_vec reinit_blanks;         /* Blanks for every case. */
-struct long_vec init_zero;             /* Zero for first case only. */
-struct long_vec init_blanks;           /* Blanks for first case only. */
-
-/* This is used to write to the replacement active file. */
-struct case_stream *vfm_sink;
-
-/* Information about the data source. */
-struct stream_info vfm_source_info;
-
-/* Information about the data sink. */
-struct stream_info vfm_sink_info;
+/* Procedure execution data. */
+struct write_case_data
+  {
+    /* Functions to call... */
+    void (*begin_func) (void *);               /* ...before data. */
+    int (*proc_func) (struct ccase *, void *); /* ...with data. */
+    void (*end_func) (void *);                 /* ...after data. */
+    void *func_aux;                            /* Auxiliary data. */ 
+
+    /* Extra auxiliary data. */
+    void *aux;
+  };
 
-/* Filter variable and  `value' index. */
-static struct variable *filter_var;
-static int filter_index;
+/* The current active file, from which cases are read. */
+struct case_source *vfm_source;
 
-#define FILTERED                                                       \
-       (filter_index != -1                                             \
-        && (temp_case->data[filter_index].f == 0.0                     \
-            || temp_case->data[filter_index].f == SYSMIS               \
-            || is_num_user_missing (temp_case->data[filter_index].f,   \
-                                    filter_var)))
+/* The replacement active file, to which cases are written. */
+struct case_sink *vfm_sink;
 
 /* Nonzero if the case needs to have values deleted before being
    stored, zero otherwise. */
 int compaction_necessary;
 
-/* Number of values after compaction, or the same as
-   vfm_sink_info.nval, if compaction is not necessary. */
+/* Number of values after compaction. */
 int compaction_nval;
 
 /* Temporary case buffer with enough room for `compaction_nval'
    `value's. */
 struct ccase *compaction_case;
 
-/* Within a session, when paging is turned on, it is never turned back
-   off.  This policy might be too aggressive. */
-static int paging = 0;
+/* Nonzero means that we've overflowed our allotted workspace.
+   After that happens once during a session, we always store the
+   active file on disk instead of in memory.  (This policy may be
+   too aggressive.) */
+static int workspace_overflow = 0;
 
 /* Time at which vfm was last invoked. */
 time_t last_vfm_invocation;
 
-/* Functions called during procedure processing. */
-static int (*proc_func) (struct ccase *);      /* Called for each case. */
-static int (*virt_proc_func) (struct ccase *); /* From SPLIT_FILE_procfunc. */
-static void (*begin_func) (void);      /* Called at beginning of a series. */
-static void (*virt_begin_func) (void); /* Called by SPLIT_FILE_procfunc. */
-static void (*end_func) (void);        /* Called after end of a series. */
-int (*write_case) (void);
-
 /* Number of cases passed to proc_func(). */
 static int case_count;
 
@@ -134,97 +97,158 @@ static int lag_count;             /* Number of cases in lag_queue so far. */
 static int lag_head;           /* Index where next case will be added. */
 static struct ccase **lag_queue; /* Array of n_lag ccase * elements. */
 
+static struct ccase *create_trns_case (struct dictionary *dict);
 static void open_active_file (void);
-static void close_active_file (void);
-static int SPLIT_FILE_procfunc (struct ccase *);
+static void close_active_file (struct write_case_data *);
+static int SPLIT_FILE_proc_func (struct ccase *, void *);
 static void finish_compaction (void);
-static void lag_case (void);
-static int procedure_write_case (void);
+static void lag_case (const struct ccase *);
+static write_case_func procedure_write_case;
+static void clear_case (struct ccase *);
+static int exclude_this_case (const struct ccase *, int case_num);
 \f
 /* Public functions. */
 
-/* Reads all the cases from the active file, transforms them by the
-   active set of transformations, calls PROCFUNC with CURCASE set to
-   the case and CASENUM set to the case number, and writes them to a
-   new active file.
+/* Auxiliary data for executing a procedure. */
+struct procedure_aux_data 
+  {
+    struct ccase *trns_case;    /* Case used for transformations. */
+    size_t cases_written;       /* Number of cases written so far. */
+  };
+
+/* Auxiliary data for SPLIT FILE. */
+struct split_aux_data 
+  {
+    struct ccase *prev_case;    /* Data in previous case. */
+  };
+
+/* Reads all the cases from the active file, transforms them by
+   the active set of transformations, passes each of them to
+   PROC_FUNC, and writes them to a new active file.
 
    Divides the active file into zero or more series of one or more
-   cases each.  BEGINFUNC is called before each series.  ENDFUNC is
-   called after each series. */
+   cases each.  BEGIN_FUNC is called before each series.  END_FUNC is
+   called after each series.
+
+   Arbitrary user-specified data AUX is passed to BEGIN_FUNC,
+   PROC_FUNC, and END_FUNC as auxiliary data. */
 void
-procedure (void (*beginfunc) (void),
-          int (*procfunc) (struct ccase *curcase),
-          void (*endfunc) (void))
+procedure (void (*begin_func) (void *),
+          int (*proc_func) (struct ccase *, void *),
+          void (*end_func) (void *),
+           void *func_aux)
 {
-  end_func = endfunc;
-  write_case = procedure_write_case;
+  static int recursive_call;
+
+  struct write_case_data procedure_write_data;
+  struct procedure_aux_data proc_aux;
+
+  struct write_case_data split_file_data;
+  struct split_aux_data split_aux;
+  int split;
+
+  assert (++recursive_call == 1);
+
+  proc_aux.cases_written = 0;
+  proc_aux.trns_case = create_trns_case (default_dict);
 
-  if (default_dict.n_splits && procfunc != NULL)
+  /* Normally we just use the data passed by the user. */
+  procedure_write_data.begin_func = begin_func;
+  procedure_write_data.proc_func = proc_func;
+  procedure_write_data.end_func = end_func;
+  procedure_write_data.func_aux = func_aux;
+  procedure_write_data.aux = &proc_aux;
+
+  /* Under SPLIT FILE, we add a layer of indirection. */
+  split = dict_get_split_cnt (default_dict) > 0;
+  if (split) 
     {
-      virt_proc_func = procfunc;
-      proc_func = SPLIT_FILE_procfunc;
-      
-      virt_begin_func = beginfunc;
-      begin_func = NULL;
-    } else {
-      begin_func = beginfunc;
-      proc_func = procfunc;
+      split_file_data = procedure_write_data;
+      split_file_data.aux = &split_aux;
+
+      split_aux.prev_case = xmalloc (dict_get_case_size (default_dict));
+
+      procedure_write_data.begin_func = NULL;
+      procedure_write_data.proc_func = SPLIT_FILE_proc_func;
+      procedure_write_data.end_func = end_func;
+      procedure_write_data.func_aux = &split_file_data;
     }
 
   last_vfm_invocation = time (NULL);
 
   open_active_file ();
-  vfm_source->read ();
-  close_active_file ();
+  if (vfm_source != NULL) 
+    vfm_source->class->read (vfm_source,
+                             proc_aux.trns_case,
+                             procedure_write_case, &procedure_write_data);
+  close_active_file (&procedure_write_data);
+
+  if (split)
+    free (split_aux.prev_case);
+
+  free (proc_aux.trns_case);
+
+  assert (--recursive_call == 0);
 }
 \f
 /* Active file processing support.  Subtly different semantics from
    procedure(). */
 
-static int process_active_file_write_case (void);
+static write_case_func process_active_file_write_case;
 
-/* The casefunc might want us to stop calling it. */
+/* The case_func might want us to stop calling it. */
 static int not_canceled;
 
-/* Reads all the cases from the active file and passes them one-by-one
-   to CASEFUNC in temp_case.  Before any cases are passed, calls
-   BEGINFUNC.  After all the cases have been passed, calls ENDFUNC.
-   BEGINFUNC, CASEFUNC, and ENDFUNC can write temp_case to the output
-   file by calling process_active_file_output_case().
+/* Reads all the cases from the active file and passes them
+   one-by-one to CASE_FUNC.  Before any cases are passed, calls
+   BEGIN_FUNC.  After all the cases have been passed, calls
+   END_FUNC.  BEGIN_FUNC, CASE_FUNC, and END_FUNC can write to
+   the output file by calling process_active_file_output_case().
 
    process_active_file() ignores TEMPORARY, SPLIT FILE, and N. */
 void
-process_active_file (void (*beginfunc) (void),
-                    int (*casefunc) (struct ccase *curcase),
-                    void (*endfunc) (void))
+process_active_file (void (*begin_func) (void *),
+                    int (*case_func) (struct ccase *, void *),
+                    void (*end_func) (void *),
+                     void *func_aux)
 {
-  proc_func = casefunc;
-  write_case = process_active_file_write_case;
+  struct procedure_aux_data proc_aux;
+  struct write_case_data process_active_write_data;
+
+  proc_aux.cases_written = 0;
+  proc_aux.trns_case = create_trns_case (default_dict);
+
+  process_active_write_data.begin_func = begin_func;
+  process_active_write_data.proc_func = case_func;
+  process_active_write_data.end_func = end_func;
+  process_active_write_data.func_aux = func_aux;
+  process_active_write_data.aux = &proc_aux;
+
   not_canceled = 1;
 
   open_active_file ();
-  beginfunc ();
-  
-  /* There doesn't necessarily need to be an active file. */
-  if (vfm_source)
-    vfm_source->read ();
-  
-  endfunc ();
-  close_active_file ();
+  begin_func (func_aux);
+  if (vfm_source != NULL)
+    vfm_source->class->read (vfm_source, proc_aux.trns_case,
+                             process_active_file_write_case,
+                             &process_active_write_data);
+  end_func (func_aux);
+  close_active_file (&process_active_write_data);
 }
 
-/* Pass the current case to casefunc. */
+/* Pass the current case to case_func. */
 static int
-process_active_file_write_case (void)
+process_active_file_write_case (struct write_case_data *wc_data)
 {
-  /* Index of current transformation. */
-  int cur_trns;
+  struct procedure_aux_data *proc_aux = wc_data->aux;
+  int cur_trns;         /* Index of current transformation. */
 
-  for (cur_trns = f_trns ; cur_trns != temp_trns; )
+  for (cur_trns = f_trns; cur_trns != temp_trns; )
     {
       int code;
        
-      code = t_trns[cur_trns]->proc (t_trns[cur_trns], temp_case);
+      code = t_trns[cur_trns]->proc (t_trns[cur_trns], proc_aux->trns_case,
+                                     case_count + 1);
       switch (code)
        {
        case -1:
@@ -242,37 +266,52 @@ process_active_file_write_case (void)
     }
 
   if (n_lag)
-    lag_case ();
+    lag_case (proc_aux->trns_case);
          
   /* Call the procedure if FILTER and PROCESS IF don't prohibit it. */
-  if (not_canceled
-      && !FILTERED
-      && (process_if_expr == NULL ||
-         expr_evaluate (process_if_expr, temp_case, NULL) == 1.0))
-    not_canceled = proc_func (temp_case);
+  if (not_canceled && !exclude_this_case (proc_aux->trns_case, case_count + 1))
+    not_canceled = wc_data->proc_func (proc_aux->trns_case, wc_data->func_aux);
   
   case_count++;
   
  done:
-  {
-    long *lp;
+  clear_case (proc_aux->trns_case);
 
-    /* This case is finished.  Initialize the variables for the next case. */
-    for (lp = reinit_sysmis.vec; *lp != -1;)
-      temp_case->data[*lp++].f = SYSMIS;
-    for (lp = reinit_blanks.vec; *lp != -1;)
-      memset (temp_case->data[*lp++].s, ' ', MAX_SHORT_STRING);
-  }
-  
   return 1;
 }
 
-/* Write temp_case to the active file. */
+/* Write the given case to the active file. */
 void
-process_active_file_output_case (void)
+process_active_file_output_case (const struct ccase *c)
 {
-  vfm_sink_info.ncases++;
-  vfm_sink->write ();
+  vfm_sink->class->write (vfm_sink, c);
+}
+\f
+/* Creates and returns a case, initializing it from the vectors
+   that say which `value's need to be initialized just once, and
+   which ones need to be re-initialized before every case. */
+static struct ccase *
+create_trns_case (struct dictionary *dict)
+{
+  struct ccase *c = xmalloc (dict_get_case_size (dict));
+  size_t var_cnt = dict_get_var_cnt (dict);
+  size_t i;
+
+  for (i = 0; i < var_cnt; i++) 
+    {
+      struct variable *v = dict_get_var (dict, i);
+
+      if (v->type == NUMERIC) 
+        {
+          if (v->reinit)
+            c->data[v->fv].f = 0.0;
+          else
+            c->data[v->fv].f = SYSMIS;
+        }
+      else
+        memset (c->data[v->fv].s, ' ', v->width);
+    }
+  return c;
 }
 \f
 /* Opening the active file. */
@@ -284,40 +323,12 @@ process_active_file_output_case (void)
 static void
 prepare_for_writing (void)
 {
-  /* FIXME: If ALL the conditions listed below hold true, then the
-     replacement active file is guaranteed to be identical to the
-     original active file:
-
-     1. TEMPORARY was the first transformation, OR, there were no
-     transformations at all.
-
-     2. Input is not coming from an input program.
-
-     3. Compaction is not necessary.
-
-     So, in this case, we shouldn't have to replace the active
-     file--it's just a waste of time and space. */
-
-  vfm_sink_info.ncases = 0;
-  vfm_sink_info.nval = default_dict.nval;
-  vfm_sink_info.case_size = (sizeof (struct ccase)
-                            + (default_dict.nval - 1) * sizeof (union value));
-  
   if (vfm_sink == NULL)
     {
-      if (vfm_sink_info.case_size * vfm_source_info.ncases > MAX_WORKSPACE
-         && !paging)
-       {
-         msg (MW, _("Workspace overflow predicted.  Max workspace is "
-                    "currently set to %d KB (%d cases at %d bytes each).  "
-                    "Paging active file to disk."),
-              MAX_WORKSPACE / 1024, MAX_WORKSPACE / vfm_sink_info.case_size,
-              vfm_sink_info.case_size);
-         
-         paging = 1;
-       }
-      
-      vfm_sink = paging ? &vfm_disk_stream : &vfm_memory_stream;
+      if (workspace_overflow)
+        vfm_sink = create_case_sink (&disk_sink_class, NULL);
+      else
+        vfm_sink = create_case_sink (&memory_sink_class, NULL);
     }
 }
 
@@ -331,50 +342,35 @@ arrange_compaction (void)
     int i;
     
     /* Count up the number of `value's that will be output. */
-    for (i = 0; i < temp_dict->nvar; i++)
-      if (temp_dict->var[i]->name[0] != '#')
-       {
-         assert (temp_dict->var[i]->nv > 0);
-         count_values += temp_dict->var[i]->nv;
-       }
-    assert (temporary == 2 || count_values <= temp_dict->nval);
+    for (i = 0; i < dict_get_var_cnt (temp_dict); i++) 
+      {
+        struct variable *v = dict_get_var (temp_dict, i);
+
+        if (v->name[0] != '#')
+          {
+            assert (v->nv > 0);
+            count_values += v->nv;
+          } 
+      }
+    assert (temporary == 2
+            || count_values <= dict_get_next_value_idx (temp_dict));
   }
   
   /* Compaction is only necessary if the number of `value's to output
      differs from the number already present. */
   compaction_nval = count_values;
-  compaction_necessary = temporary == 2 || count_values != temp_dict->nval;
+  if (temporary == 2 || count_values != dict_get_next_value_idx (temp_dict))
+    compaction_necessary = 1;
+  else
+    compaction_necessary = 0;
   
-  if (vfm_sink->init)
-    vfm_sink->init ();
-}
-
-/* Prepares the temporary case and compaction case. */
-static void
-make_temp_case (void)
-{
-  temp_case = xmalloc (vfm_sink_info.case_size);
+  if (vfm_sink->class->open != NULL)
+    vfm_sink->class->open (vfm_sink);
 
   if (compaction_necessary)
     compaction_case = xmalloc (sizeof (struct ccase)
                               + sizeof (union value) * (compaction_nval - 1));
-  
-#if __CHECKER__
-  /* Initialize the unused trailing parts of string variables to avoid
-     spurious warnings from Checker. */
-  {
-    int i;
-    
-    for (i = 0; i < default_dict.nvar; i++)
-      {
-       struct variable *v = default_dict.var[i];
-      
-       if (v->type == ALPHA && v->width % 8 != 0)
-         memcpy (&temp_case->data[v->fv + v->nv - 1]
-                 .s[v->width % 8], _("!ERROR!"), 8 - v->width % 8);
-      }
-  }
-#endif
+
 }
 
 #if DEBUGGING
@@ -387,7 +383,7 @@ index_to_varname (int ccase_index)
 
   for (i = 0; i < default_dict.nvar; i++)
     {
-      variable *v = default_dict.var[i];
+      struct variable *v = default_dict.var[i];
       
       if (ccase_index >= v->fv && ccase_index < v->fv + v->nv)
        return default_dict.var[i]->name;
@@ -396,70 +392,6 @@ index_to_varname (int ccase_index)
 }
 #endif
 
-/* Initializes temp_case from the vectors that say which `value's need
-   to be initialized just once, and which ones need to be
-   re-initialized before every case. */
-static void
-vector_initialization (void)
-{
-  int i;
-  long *lp;
-  
-  /* Just once. */
-  for (i = 0; i < init_zero.n; i++)
-    temp_case->data[init_zero.vec[i]].f = 0.0;
-  for (i = 0; i < init_blanks.n; i++)
-    memset (temp_case->data[init_blanks.vec[i]].s, ' ', MAX_SHORT_STRING);
-
-  /* These vectors need to be repeatedly accessed, so we add a
-     sentinel to (hopefully) improve speed. */
-  vec_insert (&reinit_sysmis, -1);
-  vec_insert (&reinit_blanks, -1);
-
-  for (lp = reinit_sysmis.vec; *lp != -1;)
-    temp_case->data[*lp++].f = SYSMIS;
-  for (lp = reinit_blanks.vec; *lp != -1;)
-    memset (temp_case->data[*lp++].s, ' ', MAX_SHORT_STRING);
-  
-#if DEBUGGING
-  printf ("vfm: init_zero=");
-  for (i = 0; i < init_zero.n; i++)
-    printf ("%s%s", i ? "," : "", index_to_varname (init_zero.vec[i]));
-  printf (" init_blanks=");
-  for (i = 0; i < init_blanks.n; i++)
-    printf ("%s%s", i ? "," : "", index_to_varname (init_blanks.vec[i]));
-  printf (" reinit_sysmis=");
-  for (lp = reinit_sysmis.vec; *lp != -1; lp++)
-    printf ("%s%s", lp != reinit_sysmis.vec ? "," : "",
-           index_to_varname (*lp));
-  printf (" reinit_blanks=");
-  for (lp = reinit_blanks.vec; *lp != -1; lp++)
-    printf ("%s%s", lp != reinit_blanks.vec ? "," : "",
-           index_to_varname (*lp));
-  printf ("\n");
-#endif
-}
-
-/* Sets filter_index to an appropriate value. */
-static void
-setup_filter (void)
-{
-  filter_index = -1;
-  
-  if (default_dict.filter_var[0])
-    {
-      struct variable *fv = find_variable (default_dict.filter_var);
-      
-      if (fv == NULL || fv->type == ALPHA)
-       default_dict.filter_var[0] = 0;
-      else
-       {
-         filter_index = fv->index;
-         filter_var = fv;
-       }
-    }
-}
-
 /* Sets all the lag-related variables based on value of n_lag. */
 static void
 setup_lag (void)
@@ -473,7 +405,7 @@ setup_lag (void)
   lag_head = 0;
   lag_queue = xmalloc (n_lag * sizeof *lag_queue);
   for (i = 0; i < n_lag; i++)
-    lag_queue[i] = xmalloc (temp_dict->nval * sizeof **lag_queue);
+    lag_queue[i] = xmalloc (dict_get_case_size (temp_dict));
 }
 
 /* There is a lot of potential confusion in the vfm and related
@@ -481,34 +413,23 @@ setup_lag (void)
    Here is each nval count, with explanation, as set up by
    open_active_file():
 
-   vfm_source_info.nval: Number of `value's in the cases returned by
-   the source stream.  This value turns out not to be very useful, but
-   we maintain it anyway.
-
-   vfm_sink_info.nval: Number of `value's in the cases after all
-   transformations have been performed.  Never less than
-   vfm_source_info.nval.
-
    temp_dict->nval: Number of `value's in the cases after the
-   transformations leading up to TEMPORARY have been performed.  If
-   TEMPORARY was not specified, this is equal to vfm_sink_info.nval.
-   Never less than vfm_sink_info.nval.
+   transformations leading up to TEMPORARY have been performed.
 
    compaction_nval: Number of `value's in the cases after the
-   transformations leading up to TEMPORARY have been performed and the
-   case has been compacted by compact_case(), if compaction is
-   necessary.  This the number of `value's in the cases saved by the
-   sink stream.  (However, note that the cases passed to the sink
-   stream have not yet been compacted.  It is the responsibility of
-   the data sink to call compact_case().)  This may be less than,
-   greater than, or equal to vfm_source_info.nval.  `compaction'
-   becomes the new value of default_dict.nval after the procedure is
-   completed.
-
-   default_dict.nval: This is often an alias for temp_dict->nval.  As
-   such it can really have no separate existence until the procedure
-   is complete.  For this reason it should *not* be referenced inside
-   the execution of a procedure. */
+   transformations leading up to TEMPORARY have been performed
+   and the case has been compacted by compact_case(), if
+   compaction is necessary.  This the number of `value's in the
+   cases saved by the sink stream.  (However, note that the cases
+   passed to the sink stream have not yet been compacted.  It is
+   the responsibility of the data sink to call compact_case().)
+   `compaction' becomes the new value of default_dict.nval after
+   the procedure is completed.
+
+   default_dict.nval: This is often an alias for temp_dict->nval.
+   As such it can really have no separate existence until the
+   procedure is complete.  For this reason it should *not* be
+   referenced inside the execution of a procedure. */
 /* Makes all preparations for reading from the data source and writing
    to the data sink. */
 static void
@@ -522,7 +443,7 @@ open_active_file (void)
   if (!temporary)
     {
       temp_trns = n_trns;
-      temp_dict = &default_dict;
+      temp_dict = default_dict;
     }
 
   /* No cases passed to the procedure yet. */
@@ -531,30 +452,17 @@ open_active_file (void)
   /* The rest. */
   prepare_for_writing ();
   arrange_compaction ();
-  make_temp_case ();
-  vector_initialization ();
-  setup_randomize ();
   discard_ctl_stack ();
-  setup_filter ();
   setup_lag ();
-
-  /* Debug output. */
-  debug_printf (("vfm: reading from %s source, writing to %s sink.\n",
-                vfm_source->name, vfm_sink->name));
-  debug_printf (("vfm: vfm_source_info.nval=%d, vfm_sink_info.nval=%d, "
-                "temp_dict->nval=%d, compaction_nval=%d, "
-                "default_dict.nval=%d\n",
-                vfm_source_info.nval, vfm_sink_info.nval, temp_dict->nval,
-                compaction_nval, default_dict.nval));
 }
 \f
 /* Closes the active file. */
 static void
-close_active_file (void)
+close_active_file (struct write_case_data *data)
 {
   /* Close the current case group. */
-  if (case_count && end_func != NULL)
-    end_func ();
+  if (case_count && data->end_func != NULL)
+    data->end_func (data->func_aux);
 
   /* Stop lagging (catch up?). */
   if (n_lag)
@@ -571,37 +479,36 @@ close_active_file (void)
      off TEMPORARY. */
   if (temporary)
     {
-      restore_dictionary (temp_dict);
+      dict_destroy (default_dict);
+      default_dict = temp_dict;
       temp_dict = NULL;
     }
 
-  /* The default dictionary assumes the compacted data size. */
-  default_dict.nval = compaction_nval;
+  /* Finish compaction. */
+  if (compaction_necessary)
+    finish_compaction ();
     
   /* Old data sink --> New data source. */
-  if (vfm_source && vfm_source->destroy_source)
-    vfm_source->destroy_source ();
-  
-  vfm_source = vfm_sink;
-  vfm_source_info.ncases = vfm_sink_info.ncases;
-  vfm_source_info.nval = compaction_nval;
-  vfm_source_info.case_size = (sizeof (struct ccase)
-                              + (compaction_nval - 1) * sizeof (union value));
-  if (vfm_source->mode)
-    vfm_source->mode ();
+  if (vfm_source != NULL) 
+    {
+      if (vfm_source->class->destroy != NULL)
+        vfm_source->class->destroy (vfm_source);
+      free (vfm_source);
+    }
+
+  if (vfm_sink->class->make_source != NULL)
+    vfm_source = vfm_sink->class->make_source (vfm_sink);
+  else
+    vfm_source = NULL;
 
   /* Old data sink is gone now. */
+  free (vfm_sink);
   vfm_sink = NULL;
 
-  /* Finish compaction. */
-  if (compaction_necessary)
-    finish_compaction ();
+  /* Cancel TEMPORARY. */
   cancel_temporary ();
 
   /* Free temporary cases. */
-  free (temp_case);
-  temp_case = NULL;
-
   free (compaction_case);
   compaction_case = NULL;
 
@@ -610,47 +517,39 @@ close_active_file (void)
   process_if_expr = NULL;
 
   /* Cancel FILTER if temporary. */
-  if (filter_index != -1 && !FILTER_before_TEMPORARY)
-    default_dict.filter_var[0] = 0;
+  if (dict_get_filter (default_dict) != NULL && !FILTER_before_TEMPORARY)
+    dict_set_filter (default_dict, NULL);
 
   /* Cancel transformations. */
   cancel_transformations ();
 
-  /* Clear value-initialization vectors. */
-  vec_clear (&init_zero);
-  vec_clear (&init_blanks);
-  vec_clear (&reinit_sysmis);
-  vec_clear (&reinit_blanks);
-
   /* Turn off case limiter. */
-  default_dict.N = 0;
+  dict_set_case_limit (default_dict, 0);
 
   /* Clear VECTOR vectors. */
-  {
-    int i;
-
-    for (i = 0; i < nvec; i++)
-      free (vec[i].v);
-    free (vec);
-    vec = NULL;
-    nvec = 0;
-  }
-
-  debug_printf (("vfm: procedure complete\n\n"));
+  dict_clear_vectors (default_dict);
 }
 \f
 /* Disk case stream. */
 
-/* Associated files. */
-FILE *disk_source_file;
-FILE *disk_sink_file;
+/* Information about disk sink or source. */
+struct disk_stream_info 
+  {
+    FILE *file;                 /* Output file. */
+    size_t case_cnt;            /* Number of cases written so far. */
+    size_t case_size;           /* Number of bytes in case. */
+  };
 
 /* Initializes the disk sink. */
 static void
-disk_stream_init (void)
+disk_sink_create (struct case_sink *sink)
 {
-  disk_sink_file = tmpfile ();
-  if (!disk_sink_file)
+  struct disk_stream_info *info = xmalloc (sizeof *info);
+  info->file = tmpfile ();
+  info->case_cnt = 0;
+  info->case_size = compaction_nval;
+  sink->aux = info;
+  if (info->file == NULL)
     {
       msg (ME, _("An error occurred attempting to create a temporary "
                 "file for use as the active file: %s."),
@@ -659,44 +558,23 @@ disk_stream_init (void)
     }
 }
 
-/* Reads all cases from the disk source and passes them one by one to
-   write_case(). */
+/* Writes case C to the disk sink. */
 static void
-disk_stream_read (void)
+disk_sink_write (struct case_sink *sink, const struct ccase *c)
 {
-  int i;
-
-  for (i = 0; i < vfm_source_info.ncases; i++)
-    {
-      if (!fread (temp_case, vfm_source_info.case_size, 1, disk_source_file))
-       {
-         msg (ME, _("An error occurred while attempting to read from "
-              "a temporary file created for the active file: %s."),
-              strerror (errno));
-         err_failure ();
-         return;
-       }
-
-      if (!write_case ())
-       return;
-    }
-}
-
-/* Writes temp_case to the disk sink. */
-static void
-disk_stream_write (void)
-{
-  union value *src_case;
+  struct disk_stream_info *info = sink->aux;
+  const union value *src_case;
 
   if (compaction_necessary)
     {
-      compact_case (compaction_case, temp_case);
-      src_case = (union value *) compaction_case;
+      compact_case (compaction_case, c);
+      src_case = compaction_case->data;
     }
-  else src_case = (union value *) temp_case;
+  else src_case = c->data;
 
+  info->case_cnt++;
   if (fwrite (src_case, sizeof *src_case * compaction_nval, 1,
-             disk_sink_file) != 1)
+              info->file) != 1)
     {
       msg (ME, _("An error occurred while attempting to write to a "
                 "temporary file used as the active file: %s."),
@@ -705,12 +583,25 @@ disk_stream_write (void)
     }
 }
 
-/* Switches the stream from a sink to a source. */
+/* Destroys the sink's internal data. */
 static void
-disk_stream_mode (void)
+disk_sink_destroy (struct case_sink *sink)
+{
+  struct disk_stream_info *info = sink->aux;
+  if (info->file != NULL)
+    fclose (info->file);
+}
+
+/* Closes and destroys the sink and returns a disk source to read
+   back the written data. */
+static struct case_source *
+disk_sink_make_source (struct case_sink *sink) 
 {
-  /* Rewind the sink. */
-  if (fseek (disk_sink_file, 0, SEEK_SET) != 0)
+  struct disk_stream_info *info = sink->aux;
+    
+  /* Rewind the file. */
+  assert (info->file != NULL);
+  if (fseek (info->file, 0, SEEK_SET) != 0)
     {
       msg (ME, _("An error occurred while attempting to rewind a "
                 "temporary file used as the active file: %s."),
@@ -718,107 +609,141 @@ disk_stream_mode (void)
       err_failure ();
     }
   
-  /* Sink --> source variables. */
-  disk_source_file = disk_sink_file;
+  return create_case_source (&disk_source_class, default_dict, info);
 }
 
-/* Destroys the source's internal data. */
+/* Disk sink. */
+const struct case_sink_class disk_sink_class = 
+  {
+    "disk",
+    disk_sink_create,
+    disk_sink_write,
+    disk_sink_destroy,
+    disk_sink_make_source,
+  };
+\f
+/* Disk source. */
+
+/* Returns the number of cases that will be read by
+   disk_source_read(). */
+static int
+disk_source_count (const struct case_source *source) 
+{
+  struct disk_stream_info *info = source->aux;
+
+  return info->case_cnt;
+}
+
+/* Reads all cases from the disk source and passes them one by one to
+   write_case(). */
 static void
-disk_stream_destroy_source (void)
+disk_source_read (struct case_source *source,
+                  struct ccase *c,
+                  write_case_func *write_case, write_case_data wc_data)
 {
-  if (disk_source_file)
+  struct disk_stream_info *info = source->aux;
+  int i;
+
+  for (i = 0; i < info->case_cnt; i++)
     {
-      fclose (disk_source_file);
-      disk_source_file = NULL;
+      if (!fread (c, info->case_size, 1, info->file))
+       {
+         msg (ME, _("An error occurred while attempting to read from "
+              "a temporary file created for the active file: %s."),
+              strerror (errno));
+         err_failure ();
+         break;
+       }
+
+      if (!write_case (wc_data))
+       break;
     }
 }
 
-/* Destroys the sink's internal data. */
+/* Destroys the source's internal data. */
 static void
-disk_stream_destroy_sink (void)
+disk_source_destroy (struct case_source *source)
 {
-  if (disk_sink_file)
-    {
-      fclose (disk_sink_file);
-      disk_sink_file = NULL;
-    }
+  struct disk_stream_info *info = source->aux;
+  if (info->file != NULL)
+    fclose (info->file);
+  free (info);
 }
 
-/* Disk stream. */
-struct case_stream vfm_disk_stream = 
+/* Disk source. */
+const struct case_source_class disk_source_class = 
   {
-    disk_stream_init,
-    disk_stream_read,
-    disk_stream_write,
-    disk_stream_mode,
-    disk_stream_destroy_source,
-    disk_stream_destroy_sink,
     "disk",
+    disk_source_count,
+    disk_source_read,
+    disk_source_destroy,
   };
 \f
 /* Memory case stream. */
 
-/* List of cases stored in the stream. */
-struct case_list *memory_source_cases;
-struct case_list *memory_sink_cases;
-
-/* Current case. */
-struct case_list *memory_sink_iter;
+/* Memory sink data. */
+struct memory_sink_info
+  {
+    size_t case_cnt;            /* Number of cases. */
+    size_t case_size;           /* Case size in bytes. */
+    int max_cases;              /* Maximum cases before switching to disk. */
+    struct case_list *head;     /* First case in list. */
+    struct case_list *tail;     /* Last case in list. */
+  };
 
-/* Maximum number of cases. */
-int memory_sink_max_cases;
+/* Memory source data. */
+struct memory_source_info 
+  {
+    size_t case_cnt;            /* Number of cases. */
+    size_t case_size;           /* Case size in bytes. */
+    struct case_list *cases;    /* List of cases. */
+  };
 
-/* Initializes the memory stream variables for writing. */
+/* Creates the SINK memory sink. */
 static void
-memory_stream_init (void)
+memory_sink_create (struct case_sink *sink) 
 {
-  memory_sink_cases = NULL;
-  memory_sink_iter = NULL;
+  struct memory_sink_info *info;
   
-  assert (compaction_nval);
-  memory_sink_max_cases = MAX_WORKSPACE / (sizeof (union value) * compaction_nval);
-}
+  sink->aux = info = xmalloc (sizeof *info);
 
-/* Reads the case stream from memory and passes it to write_case(). */
-static void
-memory_stream_read (void)
-{
-  while (memory_source_cases != NULL)
-    {
-      memcpy (temp_case, &memory_source_cases->c, vfm_source_info.case_size);
-      
-      {
-       struct case_list *current = memory_source_cases;
-       memory_source_cases = memory_source_cases->next;
-       free (current);
-      }
-      
-      if (!write_case ())
-       return;
-    }
+  assert (compaction_nval > 0);
+  info->case_cnt = 0;
+  info->case_size = compaction_nval * sizeof (union value);
+  info->max_cases = set_max_workspace / info->case_size;
+  info->head = info->tail = NULL;
 }
 
-/* Writes temp_case to the memory stream. */
+/* Writes case C to memory sink SINK. */
 static void
-memory_stream_write (void)
+memory_sink_write (struct case_sink *sink, const struct ccase *c) 
 {
-  struct case_list *new_case = malloc (sizeof (struct case_list)
-                                      + ((compaction_nval - 1)
-                                         * sizeof (union value)));
+  struct memory_sink_info *info = sink->aux;
+  size_t case_size;
+  struct case_list *new_case;
+
+  case_size = sizeof (struct case_list)
+                      + ((compaction_nval - 1) * sizeof (union value));
+  new_case = malloc (case_size);
 
   /* If we've got memory to spare then add it to the linked list. */
-  if (vfm_sink_info.ncases <= memory_sink_max_cases && new_case != NULL)
+  if (info->case_cnt <= info->max_cases && new_case != NULL)
     {
-      if (compaction_necessary)
-       compact_case (&new_case->c, temp_case);
-      else
-       memcpy (&new_case->c, temp_case, sizeof (union value) * compaction_nval);
+      info->case_cnt++;
 
       /* Append case to linked list. */
-      if (memory_sink_cases)
-       memory_sink_iter = memory_sink_iter->next = new_case;
+      new_case->next = NULL;
+      if (info->head != NULL)
+        info->tail->next = new_case;
       else
-       memory_sink_iter = memory_sink_cases = new_case;
+        info->head = new_case;
+      info->tail = new_case;
+
+      /* Copy data into case. */
+      if (compaction_necessary)
+       compact_case (&new_case->c, c);
+      else
+       memcpy (&new_case->c, c, sizeof (union value) * compaction_nval);
     }
   else
     {
@@ -827,36 +752,32 @@ memory_stream_write (void)
 
       /* Notify the user. */
       if (!new_case)
-       msg (MW, _("Virtual memory exhausted.  Paging active file "
+       msg (MW, _("Virtual memory exhausted.  Writing active file "
                   "to disk."));
       else
        msg (MW, _("Workspace limit of %d KB (%d cases at %d bytes each) "
-                  "overflowed.  Paging active file to disk."),
-            MAX_WORKSPACE / 1024, memory_sink_max_cases,
+                  "overflowed.  Writing active file to disk."),
+            set_max_workspace / 1024, info->max_cases,
             compaction_nval * sizeof (union value));
 
       free (new_case);
 
       /* Switch to a disk sink. */
-      vfm_sink = &vfm_disk_stream;
-      vfm_sink->init ();
-      paging = 1;
-
-      /* Terminate the list. */
-      if (memory_sink_iter)
-       memory_sink_iter->next = NULL;
+      vfm_sink = create_case_sink (&disk_sink_class, NULL);
+      vfm_sink->class->open (vfm_sink);
+      workspace_overflow = 1;
 
       /* Write the cases to disk and destroy them.  We can't call
          vfm->sink->write() because of compaction. */
-      for (cur = memory_sink_cases; cur; cur = next)
+      for (cur = info->head; cur; cur = next)
        {
          next = cur->next;
          if (fwrite (cur->c.data, sizeof (union value) * compaction_nval, 1,
-                     disk_sink_file) != 1)
+                     vfm_sink->aux) != 1)
            {
              msg (ME, _("An error occurred while attempting to "
                         "write to a temporary file created as the "
-                        "active file, while paging to disk: %s."),
+                        "active file: %s."),
                   strerror (errno));
              err_failure ();
            }
@@ -864,36 +785,38 @@ memory_stream_write (void)
        }
 
       /* Write the current case to disk. */
-      vfm_sink->write ();
+      vfm_sink->class->write (vfm_sink, c);
     }
 }
 
 /* If the data is stored in memory, causes it to be written to disk.
    To be called only *between* procedure()s, not within them. */
 void
-page_to_disk (void)
+write_active_file_to_disk (void)
 {
-  if (vfm_source == &vfm_memory_stream)
+  if (case_source_is_class (vfm_source, &memory_source_class))
     {
+      struct memory_source_info *info = vfm_source->aux;
+
       /* Switch to a disk sink. */
-      vfm_sink = &vfm_disk_stream;
-      vfm_sink->init ();
-      paging = 1;
+      vfm_sink = create_case_sink (&disk_sink_class, NULL);
+      vfm_sink->class->open (vfm_sink);
+      workspace_overflow = 1;
       
       /* Write the cases to disk and destroy them.  We can't call
          vfm->sink->write() because of compaction. */
       {
        struct case_list *cur, *next;
        
-       for (cur = memory_source_cases; cur; cur = next)
+       for (cur = info->cases; cur; cur = next)
          {
            next = cur->next;
            if (fwrite (cur->c.data, sizeof *cur->c.data * compaction_nval, 1,
-                       disk_sink_file) != 1)
+                       vfm_sink->aux) != 1)
              {
                msg (ME, _("An error occurred while attempting to "
                           "write to a temporary file created as the "
-                          "active file, while paging to disk: %s."),
+                          "active file: %s."),
                     strerror (errno));
                err_failure ();
              }
@@ -901,76 +824,132 @@ page_to_disk (void)
          }
       }
       
-      vfm_source = &vfm_disk_stream;
-      vfm_source->mode ();
-
+      vfm_source = vfm_sink->class->make_source (vfm_sink);
       vfm_sink = NULL;
     }
 }
 
-/* Switch the memory stream from sink to source mode. */
+/* Destroy all memory sink data. */
 static void
-memory_stream_mode (void)
+memory_sink_destroy (struct case_sink *sink)
 {
-  /* Terminate the list. */
-  if (memory_sink_iter)
-    memory_sink_iter->next = NULL;
+  struct memory_sink_info *info = sink->aux;
+  struct case_list *cur, *next;
+  
+  for (cur = info->head; cur; cur = next)
+    {
+      next = cur->next;
+      free (cur);
+    }
+  free (info);
+}
+
+/* Switch the memory stream from sink to source mode. */
+static struct case_source *
+memory_sink_make_source (struct case_sink *sink)
+{
+  struct memory_sink_info *sink_info = sink->aux;
+  struct memory_source_info *source_info;
 
-  /* Sink --> source variables. */
-  memory_source_cases = memory_sink_cases;
-  memory_sink_cases = NULL;
+  source_info = xmalloc (sizeof *source_info);
+  source_info->case_cnt = sink_info->case_cnt;
+  source_info->case_size = sink_info->case_size;
+  source_info->cases = sink_info->head;
+
+  free (sink_info);
+
+  return create_case_source (&memory_source_class,
+                             default_dict, source_info);
 }
 
-/* Destroy all memory source data. */
+const struct case_sink_class memory_sink_class = 
+  {
+    "memory",
+    memory_sink_create,
+    memory_sink_write,
+    memory_sink_destroy,
+    memory_sink_make_source,
+  };
+
+/* Returns the number of cases in the source. */
+static int
+memory_source_count (const struct case_source *source) 
+{
+  struct memory_source_info *info = source->aux;
+
+  return info->case_cnt;
+}
+
+/* Reads the case stream from memory and passes it to write_case(). */
 static void
-memory_stream_destroy_source (void)
+memory_source_read (struct case_source *source,
+                    struct ccase *c,
+                    write_case_func *write_case, write_case_data wc_data)
 {
-  struct case_list *cur, *next;
-  
-  for (cur = memory_source_cases; cur; cur = next)
+  struct memory_source_info *info = source->aux;
+
+  while (info->cases != NULL) 
     {
-      next = cur->next;
-      free (cur);
+      struct case_list *iter = info->cases;
+      memcpy (c, &iter->c, info->case_size);
+      if (!write_case (wc_data)) 
+        break;
+            
+      info->cases = iter->next;
+      free (iter);
     }
-  memory_source_cases = NULL;
 }
 
-/* Destroy all memory sink data. */
+/* Destroy all memory source data. */
 static void
-memory_stream_destroy_sink (void)
+memory_source_destroy (struct case_source *source)
 {
+  struct memory_source_info *info = source->aux;
   struct case_list *cur, *next;
   
-  for (cur = memory_sink_cases; cur; cur = next)
+  for (cur = info->cases; cur; cur = next)
     {
       next = cur->next;
       free (cur);
     }
-  memory_sink_cases = NULL;
+  free (info);
 }
-  
+
+/* Returns the list of cases in memory source SOURCE. */
+struct case_list *
+memory_source_get_cases (const struct case_source *source) 
+{
+  struct memory_source_info *info = source->aux;
+
+  return info->cases;
+}
+
+/* Sets the list of cases in memory source SOURCE to CASES. */
+void
+memory_source_set_cases (const struct case_source *source,
+                         struct case_list *cases) 
+{
+  struct memory_source_info *info = source->aux;
+
+  info->cases = cases;
+}
+
 /* Memory stream. */
-struct case_stream vfm_memory_stream = 
+const struct case_source_class memory_source_class = 
   {
-    memory_stream_init,
-    memory_stream_read,
-    memory_stream_write,
-    memory_stream_mode,
-    memory_stream_destroy_source,
-    memory_stream_destroy_sink,
     "memory",
+    memory_source_count,
+    memory_source_read,
+    memory_source_destroy,
   };
 \f
-#undef DEBUGGING
-#include "debug-print.h"
-
-/* Add temp_case to the lag queue. */
+/* Add C to the lag queue. */
 static void
-lag_case (void)
+lag_case (const struct ccase *c)
 {
   if (lag_count < n_lag)
     lag_count++;
-  memcpy (lag_queue[lag_head], temp_case, sizeof (union value) * temp_dict->nval);
+  memcpy (lag_queue[lag_head], c, dict_get_case_size (temp_dict));
   if (++lag_head >= n_lag)
     lag_head = 0;
 }
@@ -992,50 +971,50 @@ lagged_case (int n_before)
   }
 }
    
-/* Transforms temp_case and writes it to the replacement active file
-   if advisable.  Returns nonzero if more cases can be accepted, zero
-   otherwise.  Do not call this function again after it has returned
-   zero once.  */
+/* Transforms trns_case and writes it to the replacement active
+   file if advisable.  Returns nonzero if more cases can be
+   accepted, zero otherwise.  Do not call this function again
+   after it has returned zero once.  */
 int
-procedure_write_case (void)
+procedure_write_case (write_case_data wc_data)
 {
+  struct procedure_aux_data *proc_aux = wc_data->aux;
+
   /* Index of current transformation. */
   int cur_trns;
 
   /* Return value: whether it's reasonable to write any more cases. */
   int more_cases = 1;
 
-  debug_printf ((_("transform: ")));
-
   cur_trns = f_trns;
   for (;;)
     {
       /* Output the case if this is temp_trns. */
       if (cur_trns == temp_trns)
        {
-         debug_printf (("REC"));
+          int case_limit;
 
          if (n_lag)
-           lag_case ();
+           lag_case (proc_aux->trns_case);
          
-         vfm_sink_info.ncases++;
-         vfm_sink->write ();
+         vfm_sink->class->write (vfm_sink, proc_aux->trns_case);
 
-         if (default_dict.N)
-           more_cases = vfm_sink_info.ncases < default_dict.N;
+          proc_aux->cases_written++;
+          case_limit = dict_get_case_limit (default_dict);
+         if (case_limit != 0 && proc_aux->cases_written >= case_limit)
+            more_cases = 0;
        }
 
       /* Are we done? */
       if (cur_trns >= n_trns)
        break;
       
-      debug_printf (("$%d", cur_trns));
-
       /* Decide which transformation should come next. */
       {
        int code;
        
-       code = t_trns[cur_trns]->proc (t_trns[cur_trns], temp_case);
+       code = t_trns[cur_trns]->proc (t_trns[cur_trns], proc_aux->trns_case,
+                                       proc_aux->cases_written + 1);
        switch (code)
          {
          case -1:
@@ -1054,36 +1033,70 @@ procedure_write_case (void)
     }
 
   /* Call the beginning of group function. */
-  if (!case_count && begin_func != NULL)
-    begin_func ();
+  if (!case_count && wc_data->begin_func != NULL)
+    wc_data->begin_func (wc_data->func_aux);
 
   /* Call the procedure if there is one and FILTER and PROCESS IF
      don't prohibit it. */
-  if (proc_func != NULL
-      && !FILTERED
-      && (process_if_expr == NULL ||
-         expr_evaluate (process_if_expr, temp_case, NULL) == 1.0))
-    proc_func (temp_case);
+  if (wc_data->proc_func != NULL
+      && !exclude_this_case (proc_aux->trns_case, proc_aux->cases_written + 1))
+    wc_data->proc_func (proc_aux->trns_case, wc_data->func_aux);
 
   case_count++;
   
 done:
-  debug_putc ('\n', stdout);
-  
-  {
-    long *lp;
-
-    /* This case is finished.  Initialize the variables for the next case. */
-    for (lp = reinit_sysmis.vec; *lp != -1;)
-      temp_case->data[*lp++].f = SYSMIS;
-    for (lp = reinit_blanks.vec; *lp != -1;)
-      memset (temp_case->data[*lp++].s, ' ', MAX_SHORT_STRING);
-  }
+  clear_case (proc_aux->trns_case);
   
   /* Return previously determined value. */
   return more_cases;
 }
 
+/* Clears the variables in C that need to be cleared between
+   processing cases.  */
+static void
+clear_case (struct ccase *c)
+{
+  /* FIXME?  This is linear in the number of variables, but
+     doesn't need to be, so it's an easy optimization target. */
+  size_t var_cnt = dict_get_var_cnt (default_dict);
+  size_t i;
+  
+  for (i = 0; i < var_cnt; i++) 
+    {
+      struct variable *v = dict_get_var (default_dict, i);
+      if (v->init && v->reinit) 
+        {
+          if (v->type == NUMERIC) 
+            c->data[v->fv].f = SYSMIS;
+          else
+            memset (c->data[v->fv].s, ' ', v->width);
+        } 
+    }
+}
+
+/* Returns nonzero if case C with case number CASE_NUM should be
+   exclude as specified on FILTER or PROCESS IF, otherwise
+   zero. */
+static int
+exclude_this_case (const struct ccase *c, int case_num)
+{
+  /* FILTER. */
+  struct variable *filter_var = dict_get_filter (default_dict);
+  if (filter_var != NULL) 
+    {
+      double f = c->data[filter_var->fv].f;
+      if (f == 0.0 || f == SYSMIS || is_num_user_missing (f, filter_var))
+        return 1;
+    }
+
+  /* PROCESS IF. */
+  if (process_if_expr != NULL
+      && expr_evaluate (process_if_expr, c, case_num, NULL) != 1.0)
+    return 1;
+
+  return 0;
+}
+
 /* Appends TRNS to t_trns[], the list of all transformations to be
    performed on data as it is read from the active file. */
 void
@@ -1122,37 +1135,35 @@ cancel_transformations (void)
 static void
 dump_splits (struct ccase *c)
 {
-  struct variable **iter;
+  struct variable *const *split;
   struct tab_table *t;
+  size_t split_cnt;
   int i;
 
-  t = tab_create (3, default_dict.n_splits + 1, 0);
+  split_cnt = dict_get_split_cnt (default_dict);
+  t = tab_create (3, split_cnt + 1, 0);
   tab_dim (t, tab_natural_dimensions);
-  tab_vline (t, TAL_1 | TAL_SPACING, 1, 0, default_dict.n_splits);
-  tab_vline (t, TAL_1 | TAL_SPACING, 2, 0, default_dict.n_splits);
+  tab_vline (t, TAL_1 | TAL_SPACING, 1, 0, split_cnt);
+  tab_vline (t, TAL_1 | TAL_SPACING, 2, 0, split_cnt);
   tab_text (t, 0, 0, TAB_NONE, _("Variable"));
   tab_text (t, 1, 0, TAB_LEFT, _("Value"));
   tab_text (t, 2, 0, TAB_LEFT, _("Label"));
-  for (iter = default_dict.splits, i = 0; *iter; iter++, i++)
+  split = dict_get_split_vars (default_dict);
+  for (i = 0; i < split_cnt; i++)
     {
-      struct variable *v = *iter;
+      struct variable *v = split[i];
       char temp_buf[80];
-      char *val_lab;
+      const char *val_lab;
 
       assert (v->type == NUMERIC || v->type == ALPHA);
       tab_text (t, 0, i + 1, TAB_LEFT | TAT_PRINTF, "%s", v->name);
       
-      {
-       union value val = c->data[v->fv];
-       if (v->type == ALPHA)
-         val.c = c->data[v->fv].s;
-       data_out (temp_buf, &v->print, &val);
-      }
+      data_out (temp_buf, &v->print, &c->data[v->fv]);
       
       temp_buf[v->print.w] = 0;
       tab_text (t, 1, i + 1, TAT_PRINTF, "%.*s", v->print.w, temp_buf);
 
-      val_lab = get_val_lab (v, c->data[v->fv], 0);
+      val_lab = val_labs_find (v->val_labs, c->data[v->fv]);
       if (val_lab)
        tab_text (t, 2, i + 1, TAB_LEFT, val_lab);
     }
@@ -1160,64 +1171,67 @@ dump_splits (struct ccase *c)
   tab_submit (t);
 }
 
-/* This procfunc is substituted for the user-supplied procfunc when
+/* This proc_func is substituted for the user-supplied proc_func when
    SPLIT FILE is active.  This function forms a wrapper around that
-   procfunc by dividing the input into series. */
+   proc_func by dividing the input into series. */
 static int
-SPLIT_FILE_procfunc (struct ccase *c)
+SPLIT_FILE_proc_func (struct ccase *c, void *data_)
 {
-  static struct ccase *prev_case;
-  struct variable **iter;
+  struct write_case_data *data = data_;
+  struct split_aux_data *split_aux = data->aux;
+  struct variable *const *split;
+  size_t split_cnt;
+  size_t i;
 
   /* The first case always begins a new series.  We also need to
      preserve the values of the case for later comparison. */
   if (case_count == 0)
     {
-      if (prev_case)
-       free (prev_case);
-      prev_case = xmalloc (vfm_sink_info.case_size);
-      memcpy (prev_case, c, vfm_sink_info.case_size);
+      memcpy (split_aux->prev_case, c, dict_get_case_size (default_dict));
 
       dump_splits (c);
-      if (virt_begin_func != NULL)
-       virt_begin_func ();
+      if (data->begin_func != NULL)
+       data->begin_func (data->func_aux);
       
-      return virt_proc_func (c);
+      return data->proc_func (c, data->func_aux);
     }
 
   /* Compare the value of each SPLIT FILE variable to the values on
      the previous case. */
-  for (iter = default_dict.splits; *iter; iter++)
+  split = dict_get_split_vars (default_dict);
+  split_cnt = dict_get_split_cnt (default_dict);
+  for (i = 0; i < split_cnt; i++)
     {
-      struct variable *v = *iter;
+      struct variable *v = split[i];
       
       switch (v->type)
        {
        case NUMERIC:
-         if (approx_ne (c->data[v->fv].f, prev_case->data[v->fv].f))
+         if (c->data[v->fv].f != split_aux->prev_case->data[v->fv].f)
            goto not_equal;
          break;
        case ALPHA:
-         if (memcmp (c->data[v->fv].s, prev_case->data[v->fv].s, v->width))
+         if (memcmp (c->data[v->fv].s,
+                      split_aux->prev_case->data[v->fv].s, v->width))
            goto not_equal;
          break;
        default:
          assert (0);
        }
     }
-  return virt_proc_func (c);
+  return data->proc_func (c, data->func_aux);
   
 not_equal:
   /* The values of the SPLIT FILE variable are different from the
      values on the previous case.  That means that it's time to begin
      a new series. */
-  if (end_func != NULL)
-    end_func ();
+  if (data->end_func != NULL)
+    data->end_func (data->func_aux);
   dump_splits (c);
-  if (virt_begin_func != NULL)
-    virt_begin_func ();
-  memcpy (prev_case, c, vfm_sink_info.case_size);
-  return virt_proc_func (c);
+  if (data->begin_func != NULL)
+    data->begin_func (data->func_aux);
+  memcpy (split_aux->prev_case, c, dict_get_case_size (default_dict));
+  return data->proc_func (c, data->func_aux);
 }
 \f
 /* Case compaction. */
@@ -1228,6 +1242,7 @@ compact_case (struct ccase *dest, const struct ccase *src)
 {
   int i;
   int nval = 0;
+  size_t var_cnt;
   
   assert (compaction_necessary);
 
@@ -1240,9 +1255,10 @@ compact_case (struct ccase *dest, const struct ccase *src)
 
   /* Copy all the variables except the scratch variables from SRC to
      DEST. */
-  for (i = 0; i < default_dict.nvar; i++)
+  var_cnt = dict_get_var_cnt (default_dict);
+  for (i = 0; i < var_cnt; i++)
     {
-      struct variable *v = default_dict.var[i];
+      struct variable *v = dict_get_var (default_dict, i);
       
       if (v->name[0] == '#')
        continue;
@@ -1263,35 +1279,57 @@ compact_case (struct ccase *dest, const struct ccase *src)
 static void
 finish_compaction (void)
 {
-  int copy_index = 0;
-  int nval = 0;
   int i;
 
-  for (i = 0; i < default_dict.nvar; i++)
+  for (i = 0; i < dict_get_var_cnt (default_dict); )
     {
-      struct variable *v = default_dict.var[i];
-
-      if (v->name[0] == '#')
-       {
-         clear_variable (&default_dict, v);
-         free (v);
-         continue;
-       }
+      struct variable *v = dict_get_var (default_dict, i);
 
-      v->fv = nval;
-      if (v->type == NUMERIC)
-       nval++;
+      if (v->name[0] == '#') 
+        dict_delete_var (default_dict, v);
       else
-       nval += DIV_RND_UP (v->width, sizeof (union value));
-      
-      default_dict.var[copy_index++] = v;
-    }
-  if (copy_index != default_dict.nvar)
-    {
-      default_dict.var = xrealloc (default_dict.var,
-                                  sizeof *default_dict.var * copy_index);
-      default_dict.nvar = copy_index;
+        i++;
     }
+  dict_compact_values (default_dict);
 }
 
-  
+/* Creates a case source with class CLASS and auxiliary data AUX
+   and based on dictionary DICT. */
+struct case_source *
+create_case_source (const struct case_source_class *class,
+                    const struct dictionary *dict,
+                    void *aux) 
+{
+  struct case_source *source = xmalloc (sizeof *source);
+  source->class = class;
+  source->value_cnt = dict_get_next_value_idx (dict);
+  source->aux = aux;
+  return source;
+}
+
+/* Returns nonzero if a case source is "complex". */
+int
+case_source_is_complex (const struct case_source *source) 
+{
+  return source != NULL && (source->class == &input_program_source_class
+                            || source->class == &file_type_source_class);
+}
+
+/* Returns nonzero if CLASS is the class of SOURCE. */
+int
+case_source_is_class (const struct case_source *source,
+                      const struct case_source_class *class) 
+{
+  return source != NULL && source->class == class;
+}
+
+/* Creates a case sink with class CLASS and auxiliary data
+   AUX. */
+struct case_sink *
+create_case_sink (const struct case_sink_class *class, void *aux) 
+{
+  struct case_sink *sink = xmalloc (sizeof *sink);
+  sink->class = class;
+  sink->aux = aux;
+  return sink;
+}