Oops - revert change that shouldn't have been applied.
[pspp] / src / vfm.c
index 3fdefb5db2940dd609ab813133d95207d0a8263f..82af200015d6aa40dc19f4fb6215d8a7c0480201 100644 (file)
--- a/src/vfm.c
+++ b/src/vfm.c
    Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA
    02111-1307, USA. */
 
    Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA
    02111-1307, USA. */
 
-/* AIX requires this to be the first thing in the file.  */
 #include <config.h>
 #include <config.h>
-#if __GNUC__
-#define alloca __builtin_alloca
-#else
-#if HAVE_ALLOCA_H
-#include <alloca.h>
-#else
-#ifdef _AIX
-#pragma alloca
-#else
-#ifndef alloca                 /* predefined by HP cc +Olibcalls */
-char *alloca ();
-#endif
-#endif
-#endif
-#endif
-
+#include "vfm.h"
+#include "vfmP.h"
 #include <assert.h>
 #include <errno.h>
 #include <stdio.h>
 #include <assert.h>
 #include <errno.h>
 #include <stdio.h>
@@ -43,42 +28,43 @@ char *alloca ();
 #include <unistd.h>    /* Required by SunOS4. */
 #endif
 #include "alloc.h"
 #include <unistd.h>    /* Required by SunOS4. */
 #endif
 #include "alloc.h"
-#include "approx.h"
 #include "do-ifP.h"
 #include "error.h"
 #include "expr.h"
 #include "misc.h"
 #include "random.h"
 #include "do-ifP.h"
 #include "error.h"
 #include "expr.h"
 #include "misc.h"
 #include "random.h"
+#include "settings.h"
 #include "som.h"
 #include "str.h"
 #include "tab.h"
 #include "var.h"
 #include "som.h"
 #include "str.h"
 #include "tab.h"
 #include "var.h"
-#include "vector.h"
 #include "value-labels.h"
 #include "value-labels.h"
-#include "vfm.h"
-#include "vfmP.h"
 
 /*
    Virtual File Manager (vfm):
 
 
 /*
    Virtual File Manager (vfm):
 
-   vfm is used to process data files.  It uses the model that data is
-   read from one stream (the data source), then written to another
-   (the data sink).  The data source is then deleted and the data sink
-   becomes the data source for the next procedure. */
+   vfm is used to process data files.  It uses the model that
+   data is read from one stream (the data source), processed,
+   then written to another (the data sink).  The data source is
+   then deleted and the data sink becomes the data source for the
+   next procedure. */
 
 #include "debug-print.h"
 
 
 #include "debug-print.h"
 
-/* This is used to read from the active file. */
-struct case_stream *vfm_source;
+/* Procedure execution data. */
+struct write_case_data
+  {
+    void (*beginfunc) (void *);
+    int (*procfunc) (struct ccase *, void *);
+    void (*endfunc) (void *);
+    void *aux;
+  };
 
 
-/* `value' indexes to initialize to particular values for certain cases. */
-struct long_vec reinit_sysmis;         /* SYSMIS for every case. */
-struct long_vec reinit_blanks;         /* Blanks for every case. */
-struct long_vec init_zero;             /* Zero for first case only. */
-struct long_vec init_blanks;           /* Blanks for first case only. */
+/* The current active file, from which cases are read. */
+struct case_source *vfm_source;
 
 
-/* This is used to write to the replacement active file. */
-struct case_stream *vfm_sink;
+/* The replacement active file, to which cases are written. */
+struct case_sink *vfm_sink;
 
 /* Information about the data source. */
 struct stream_info vfm_source_info;
 
 /* Information about the data source. */
 struct stream_info vfm_source_info;
@@ -86,17 +72,6 @@ struct stream_info vfm_source_info;
 /* Information about the data sink. */
 struct stream_info vfm_sink_info;
 
 /* Information about the data sink. */
 struct stream_info vfm_sink_info;
 
-/* Filter variable and  `value' index. */
-static struct variable *filter_var;
-static int filter_index;
-
-#define FILTERED                                                       \
-       (filter_index != -1                                             \
-        && (temp_case->data[filter_index].f == 0.0                     \
-            || temp_case->data[filter_index].f == SYSMIS               \
-            || is_num_user_missing (temp_case->data[filter_index].f,   \
-                                    filter_var)))
-
 /* Nonzero if the case needs to have values deleted before being
    stored, zero otherwise. */
 int compaction_necessary;
 /* Nonzero if the case needs to have values deleted before being
    stored, zero otherwise. */
 int compaction_necessary;
@@ -109,21 +84,15 @@ int compaction_nval;
    `value's. */
 struct ccase *compaction_case;
 
    `value's. */
 struct ccase *compaction_case;
 
-/* Within a session, when paging is turned on, it is never turned back
-   off.  This policy might be too aggressive. */
-static int paging = 0;
+/* Nonzero means that we've overflowed our allotted workspace.
+   After that happens once during a session, we always store the
+   active file on disk instead of in memory.  (This policy may be
+   too aggressive.) */
+static int workspace_overflow = 0;
 
 /* Time at which vfm was last invoked. */
 time_t last_vfm_invocation;
 
 
 /* Time at which vfm was last invoked. */
 time_t last_vfm_invocation;
 
-/* Functions called during procedure processing. */
-static int (*proc_func) (struct ccase *);      /* Called for each case. */
-static int (*virt_proc_func) (struct ccase *); /* From SPLIT_FILE_procfunc. */
-static void (*begin_func) (void);      /* Called at beginning of a series. */
-static void (*virt_begin_func) (void); /* Called by SPLIT_FILE_procfunc. */
-static void (*end_func) (void);        /* Called after end of a series. */
-int (*write_case) (void);
-
 /* Number of cases passed to proc_func(). */
 static int case_count;
 
 /* Number of cases passed to proc_func(). */
 static int case_count;
 
@@ -134,53 +103,76 @@ static int lag_head;              /* Index where next case will be added. */
 static struct ccase **lag_queue; /* Array of n_lag ccase * elements. */
 
 static void open_active_file (void);
 static struct ccase **lag_queue; /* Array of n_lag ccase * elements. */
 
 static void open_active_file (void);
-static void close_active_file (void);
-static int SPLIT_FILE_procfunc (struct ccase *);
+static void close_active_file (struct write_case_data *);
+static int SPLIT_FILE_procfunc (struct ccase *, void *);
 static void finish_compaction (void);
 static void lag_case (void);
 static void finish_compaction (void);
 static void lag_case (void);
-static int procedure_write_case (void);
+static int procedure_write_case (struct write_case_data *);
+static void clear_temp_case (void);
+static int exclude_this_case (void);
 \f
 /* Public functions. */
 
 \f
 /* Public functions. */
 
-/* Reads all the cases from the active file, transforms them by the
-   active set of transformations, calls PROCFUNC with CURCASE set to
-   the case and CASENUM set to the case number, and writes them to a
-   new active file.
+/* Reads all the cases from the active file, transforms them by
+   the active set of transformations, calls PROCFUNC with CURCASE
+   set to the case, and writes them to a new active file.
 
    Divides the active file into zero or more series of one or more
    cases each.  BEGINFUNC is called before each series.  ENDFUNC is
 
    Divides the active file into zero or more series of one or more
    cases each.  BEGINFUNC is called before each series.  ENDFUNC is
-   called after each series. */
+   called after each series.
+
+   Arbitrary user-specified data AUX is passed to BEGINFUNC,
+   PROCFUNC, and ENDFUNC as auxiliary data. */
 void
 void
-procedure (void (*beginfunc) (void),
-          int (*procfunc) (struct ccase *curcase),
-          void (*endfunc) (void))
+procedure (void (*beginfunc) (void *),
+          int (*procfunc) (struct ccase *curcase, void *),
+          void (*endfunc) (void *),
+           void *aux)
 {
 {
-  end_func = endfunc;
-  write_case = procedure_write_case;
+  static int recursive_call;
 
 
-  if (default_dict.n_splits && procfunc != NULL)
+  struct write_case_data procedure_write_data;
+  struct write_case_data split_file_data;
+
+  assert (++recursive_call == 1);
+
+  if (dict_get_split_cnt (default_dict) == 0) 
     {
     {
-      virt_proc_func = procfunc;
-      proc_func = SPLIT_FILE_procfunc;
-      
-      virt_begin_func = beginfunc;
-      begin_func = NULL;
-    } else {
-      begin_func = beginfunc;
-      proc_func = procfunc;
+      /* Normally we just use the data passed by the user. */
+      procedure_write_data.beginfunc = beginfunc;
+      procedure_write_data.procfunc = procfunc;
+      procedure_write_data.endfunc = endfunc;
+      procedure_write_data.aux = aux;
+    }
+  else
+    {
+      /* Under SPLIT FILE, we add a layer of indirection. */
+      procedure_write_data.beginfunc = NULL;
+      procedure_write_data.procfunc = SPLIT_FILE_procfunc;
+      procedure_write_data.endfunc = endfunc;
+      procedure_write_data.aux = &split_file_data;
+
+      split_file_data.beginfunc = beginfunc;
+      split_file_data.procfunc = procfunc;
+      split_file_data.endfunc = endfunc;
+      split_file_data.aux = aux;
     }
 
   last_vfm_invocation = time (NULL);
 
   open_active_file ();
     }
 
   last_vfm_invocation = time (NULL);
 
   open_active_file ();
-  vfm_source->read ();
-  close_active_file ();
+  if (vfm_source != NULL) 
+    vfm_source->class->read (vfm_source,
+                             procedure_write_case, &procedure_write_data);
+  close_active_file (&procedure_write_data);
+
+  assert (--recursive_call == 0);
 }
 \f
 /* Active file processing support.  Subtly different semantics from
    procedure(). */
 
 }
 \f
 /* Active file processing support.  Subtly different semantics from
    procedure(). */
 
-static int process_active_file_write_case (void);
+static int process_active_file_write_case (struct write_case_data *data);
 
 /* The casefunc might want us to stop calling it. */
 static int not_canceled;
 
 /* The casefunc might want us to stop calling it. */
 static int not_canceled;
@@ -193,28 +185,35 @@ static int not_canceled;
 
    process_active_file() ignores TEMPORARY, SPLIT FILE, and N. */
 void
 
    process_active_file() ignores TEMPORARY, SPLIT FILE, and N. */
 void
-process_active_file (void (*beginfunc) (void),
-                    int (*casefunc) (struct ccase *curcase),
-                    void (*endfunc) (void))
+process_active_file (void (*beginfunc) (void *),
+                    int (*casefunc) (struct ccase *curcase, void *),
+                    void (*endfunc) (void *),
+                     void *aux)
 {
 {
-  proc_func = casefunc;
-  write_case = process_active_file_write_case;
+  struct write_case_data process_active_write_data;
+
+  process_active_write_data.beginfunc = beginfunc;
+  process_active_write_data.procfunc = casefunc;
+  process_active_write_data.endfunc = endfunc;
+  process_active_write_data.aux = aux;
+
   not_canceled = 1;
 
   open_active_file ();
   not_canceled = 1;
 
   open_active_file ();
-  beginfunc ();
+  beginfunc (aux);
   
   /* There doesn't necessarily need to be an active file. */
   
   /* There doesn't necessarily need to be an active file. */
-  if (vfm_source)
-    vfm_source->read ();
+  if (vfm_source != NULL)
+    vfm_source->class->read (vfm_source, process_active_file_write_case,
+                             &process_active_write_data);
   
   
-  endfunc ();
-  close_active_file ();
+  endfunc (aux);
+  close_active_file (&process_active_write_data);
 }
 
 /* Pass the current case to casefunc. */
 static int
 }
 
 /* Pass the current case to casefunc. */
 static int
-process_active_file_write_case (void)
+process_active_file_write_case (struct write_case_data *data)
 {
   /* Index of current transformation. */
   int cur_trns;
 {
   /* Index of current transformation. */
   int cur_trns;
@@ -244,25 +243,14 @@ process_active_file_write_case (void)
     lag_case ();
          
   /* Call the procedure if FILTER and PROCESS IF don't prohibit it. */
     lag_case ();
          
   /* Call the procedure if FILTER and PROCESS IF don't prohibit it. */
-  if (not_canceled
-      && !FILTERED
-      && (process_if_expr == NULL ||
-         expr_evaluate (process_if_expr, temp_case, NULL) == 1.0))
-    not_canceled = proc_func (temp_case);
+  if (not_canceled && !exclude_this_case ())
+    not_canceled = data->procfunc (temp_case, data->aux);
   
   case_count++;
   
  done:
   
   case_count++;
   
  done:
-  {
-    long *lp;
+  clear_temp_case ();
 
 
-    /* This case is finished.  Initialize the variables for the next case. */
-    for (lp = reinit_sysmis.vec; *lp != -1;)
-      temp_case->data[*lp++].f = SYSMIS;
-    for (lp = reinit_blanks.vec; *lp != -1;)
-      memset (temp_case->data[*lp++].s, ' ', MAX_SHORT_STRING);
-  }
-  
   return 1;
 }
 
   return 1;
 }
 
@@ -271,7 +259,7 @@ void
 process_active_file_output_case (void)
 {
   vfm_sink_info.ncases++;
 process_active_file_output_case (void)
 {
   vfm_sink_info.ncases++;
-  vfm_sink->write ();
+  vfm_sink->class->write (vfm_sink, temp_case);
 }
 \f
 /* Opening the active file. */
 }
 \f
 /* Opening the active file. */
@@ -298,25 +286,27 @@ prepare_for_writing (void)
      file--it's just a waste of time and space. */
 
   vfm_sink_info.ncases = 0;
      file--it's just a waste of time and space. */
 
   vfm_sink_info.ncases = 0;
-  vfm_sink_info.nval = default_dict.nval;
-  vfm_sink_info.case_size = (sizeof (struct ccase)
-                            + (default_dict.nval - 1) * sizeof (union value));
+  vfm_sink_info.nval = dict_get_next_value_idx (default_dict);
+  vfm_sink_info.case_size = dict_get_case_size (default_dict);
   
   if (vfm_sink == NULL)
     {
   
   if (vfm_sink == NULL)
     {
-      if (vfm_sink_info.case_size * vfm_source_info.ncases > MAX_WORKSPACE
-         && !paging)
+      if (vfm_sink_info.case_size * vfm_source_info.ncases > set_max_workspace
+         && !workspace_overflow)
        {
          msg (MW, _("Workspace overflow predicted.  Max workspace is "
                     "currently set to %d KB (%d cases at %d bytes each).  "
        {
          msg (MW, _("Workspace overflow predicted.  Max workspace is "
                     "currently set to %d KB (%d cases at %d bytes each).  "
-                    "Paging active file to disk."),
-              MAX_WORKSPACE / 1024, MAX_WORKSPACE / vfm_sink_info.case_size,
+                    "Writing active file to disk."),
+              set_max_workspace / 1024, set_max_workspace / vfm_sink_info.case_size,
               vfm_sink_info.case_size);
          
               vfm_sink_info.case_size);
          
-         paging = 1;
+         workspace_overflow = 1;
        }
        }
-      
-      vfm_sink = paging ? &vfm_disk_stream : &vfm_memory_stream;
+
+      if (workspace_overflow)
+        vfm_sink = create_case_sink (&disk_sink_class, NULL);
+      else
+        vfm_sink = create_case_sink (&memory_sink_class, NULL);
     }
 }
 
     }
 }
 
@@ -330,22 +320,30 @@ arrange_compaction (void)
     int i;
     
     /* Count up the number of `value's that will be output. */
     int i;
     
     /* Count up the number of `value's that will be output. */
-    for (i = 0; i < temp_dict->nvar; i++)
-      if (temp_dict->var[i]->name[0] != '#')
-       {
-         assert (temp_dict->var[i]->nv > 0);
-         count_values += temp_dict->var[i]->nv;
-       }
-    assert (temporary == 2 || count_values <= temp_dict->nval);
+    for (i = 0; i < dict_get_var_cnt (temp_dict); i++) 
+      {
+        struct variable *v = dict_get_var (temp_dict, i);
+
+        if (v->name[0] != '#')
+          {
+            assert (v->nv > 0);
+            count_values += v->nv;
+          } 
+      }
+    assert (temporary == 2
+            || count_values <= dict_get_next_value_idx (temp_dict));
   }
   
   /* Compaction is only necessary if the number of `value's to output
      differs from the number already present. */
   compaction_nval = count_values;
   }
   
   /* Compaction is only necessary if the number of `value's to output
      differs from the number already present. */
   compaction_nval = count_values;
-  compaction_necessary = temporary == 2 || count_values != temp_dict->nval;
+  if (temporary == 2 || count_values != dict_get_next_value_idx (temp_dict))
+    compaction_necessary = 1;
+  else
+    compaction_necessary = 0;
   
   
-  if (vfm_sink->init)
-    vfm_sink->init ();
+  if (vfm_sink->class->open != NULL)
+    vfm_sink->class->open (vfm_sink);
 }
 
 /* Prepares the temporary case and compaction case. */
 }
 
 /* Prepares the temporary case and compaction case. */
@@ -378,67 +376,28 @@ index_to_varname (int ccase_index)
 }
 #endif
 
 }
 #endif
 
-/* Initializes temp_case from the vectors that say which `value's need
-   to be initialized just once, and which ones need to be
+/* Initializes temp_case from the vectors that say which `value's
+   need to be initialized just once, and which ones need to be
    re-initialized before every case. */
 static void
 vector_initialization (void)
 {
    re-initialized before every case. */
 static void
 vector_initialization (void)
 {
-  int i;
-  long *lp;
-  
-  /* Just once. */
-  for (i = 0; i < init_zero.n; i++)
-    temp_case->data[init_zero.vec[i]].f = 0.0;
-  for (i = 0; i < init_blanks.n; i++)
-    memset (temp_case->data[init_blanks.vec[i]].s, ' ', MAX_SHORT_STRING);
-
-  /* These vectors need to be repeatedly accessed, so we add a
-     sentinel to (hopefully) improve speed. */
-  vec_insert (&reinit_sysmis, -1);
-  vec_insert (&reinit_blanks, -1);
-
-  for (lp = reinit_sysmis.vec; *lp != -1;)
-    temp_case->data[*lp++].f = SYSMIS;
-  for (lp = reinit_blanks.vec; *lp != -1;)
-    memset (temp_case->data[*lp++].s, ' ', MAX_SHORT_STRING);
-  
-#if DEBUGGING
-  printf ("vfm: init_zero=");
-  for (i = 0; i < init_zero.n; i++)
-    printf ("%s%s", i ? "," : "", index_to_varname (init_zero.vec[i]));
-  printf (" init_blanks=");
-  for (i = 0; i < init_blanks.n; i++)
-    printf ("%s%s", i ? "," : "", index_to_varname (init_blanks.vec[i]));
-  printf (" reinit_sysmis=");
-  for (lp = reinit_sysmis.vec; *lp != -1; lp++)
-    printf ("%s%s", lp != reinit_sysmis.vec ? "," : "",
-           index_to_varname (*lp));
-  printf (" reinit_blanks=");
-  for (lp = reinit_blanks.vec; *lp != -1; lp++)
-    printf ("%s%s", lp != reinit_blanks.vec ? "," : "",
-           index_to_varname (*lp));
-  printf ("\n");
-#endif
-}
-
-/* Sets filter_index to an appropriate value. */
-static void
-setup_filter (void)
-{
-  filter_index = -1;
+  size_t var_cnt = dict_get_var_cnt (default_dict);
+  size_t i;
   
   
-  if (default_dict.filter_var[0])
+  for (i = 0; i < var_cnt; i++) 
     {
     {
-      struct variable *fv = find_variable (default_dict.filter_var);
-      
-      if (fv == NULL || fv->type == ALPHA)
-       default_dict.filter_var[0] = 0;
+      struct variable *v = dict_get_var (default_dict, i);
+
+      if (v->type == NUMERIC) 
+        {
+          if (v->reinit)
+            temp_case->data[v->fv].f = 0.0;
+          else
+            temp_case->data[v->fv].f = SYSMIS;
+        }
       else
       else
-       {
-         filter_index = fv->index;
-         filter_var = fv;
-       }
+        memset (temp_case->data[v->fv].s, ' ', v->width);
     }
 }
 
     }
 }
 
@@ -455,7 +414,7 @@ setup_lag (void)
   lag_head = 0;
   lag_queue = xmalloc (n_lag * sizeof *lag_queue);
   for (i = 0; i < n_lag; i++)
   lag_head = 0;
   lag_queue = xmalloc (n_lag * sizeof *lag_queue);
   for (i = 0; i < n_lag; i++)
-    lag_queue[i] = xmalloc (temp_dict->nval * sizeof **lag_queue);
+    lag_queue[i] = xmalloc (dict_get_case_size (temp_dict));
 }
 
 /* There is a lot of potential confusion in the vfm and related
 }
 
 /* There is a lot of potential confusion in the vfm and related
@@ -504,7 +463,7 @@ open_active_file (void)
   if (!temporary)
     {
       temp_trns = n_trns;
   if (!temporary)
     {
       temp_trns = n_trns;
-      temp_dict = &default_dict;
+      temp_dict = default_dict;
     }
 
   /* No cases passed to the procedure yet. */
     }
 
   /* No cases passed to the procedure yet. */
@@ -516,7 +475,6 @@ open_active_file (void)
   make_temp_case ();
   vector_initialization ();
   discard_ctl_stack ();
   make_temp_case ();
   vector_initialization ();
   discard_ctl_stack ();
-  setup_filter ();
   setup_lag ();
 
   /* Debug output. */
   setup_lag ();
 
   /* Debug output. */
@@ -531,11 +489,11 @@ open_active_file (void)
 \f
 /* Closes the active file. */
 static void
 \f
 /* Closes the active file. */
 static void
-close_active_file (void)
+close_active_file (struct write_case_data *data)
 {
   /* Close the current case group. */
 {
   /* Close the current case group. */
-  if (case_count && end_func != NULL)
-    end_func ();
+  if (case_count && data->endfunc != NULL)
+    data->endfunc (data->aux);
 
   /* Stop lagging (catch up?). */
   if (n_lag)
 
   /* Stop lagging (catch up?). */
   if (n_lag)
@@ -552,31 +510,34 @@ close_active_file (void)
      off TEMPORARY. */
   if (temporary)
     {
      off TEMPORARY. */
   if (temporary)
     {
-      restore_dictionary (temp_dict);
+      dict_destroy (default_dict);
+      default_dict = temp_dict;
       temp_dict = NULL;
     }
 
       temp_dict = NULL;
     }
 
-  /* The default dictionary assumes the compacted data size. */
-  default_dict.nval = compaction_nval;
+  /* Finish compaction. */
+  if (compaction_necessary)
+    finish_compaction ();
     
   /* Old data sink --> New data source. */
     
   /* Old data sink --> New data source. */
-  if (vfm_source && vfm_source->destroy_source)
-    vfm_source->destroy_source ();
-  
-  vfm_source = vfm_sink;
+  if (vfm_source != NULL) 
+    {
+      if (vfm_source->class->destroy != NULL)
+        vfm_source->class->destroy (vfm_source);
+      free (vfm_source);
+    }
+
+  vfm_source = vfm_sink->class->make_source (vfm_sink);
   vfm_source_info.ncases = vfm_sink_info.ncases;
   vfm_source_info.nval = compaction_nval;
   vfm_source_info.case_size = (sizeof (struct ccase)
                               + (compaction_nval - 1) * sizeof (union value));
   vfm_source_info.ncases = vfm_sink_info.ncases;
   vfm_source_info.nval = compaction_nval;
   vfm_source_info.case_size = (sizeof (struct ccase)
                               + (compaction_nval - 1) * sizeof (union value));
-  if (vfm_source->mode)
-    vfm_source->mode ();
 
   /* Old data sink is gone now. */
 
   /* Old data sink is gone now. */
+  free (vfm_sink);
   vfm_sink = NULL;
 
   vfm_sink = NULL;
 
-  /* Finish compaction. */
-  if (compaction_necessary)
-    finish_compaction ();
+  /* Cancel TEMPORARY. */
   cancel_temporary ();
 
   /* Free temporary cases. */
   cancel_temporary ();
 
   /* Free temporary cases. */
@@ -591,47 +552,29 @@ close_active_file (void)
   process_if_expr = NULL;
 
   /* Cancel FILTER if temporary. */
   process_if_expr = NULL;
 
   /* Cancel FILTER if temporary. */
-  if (filter_index != -1 && !FILTER_before_TEMPORARY)
-    default_dict.filter_var[0] = 0;
+  if (dict_get_filter (default_dict) != NULL && !FILTER_before_TEMPORARY)
+    dict_set_filter (default_dict, NULL);
 
   /* Cancel transformations. */
   cancel_transformations ();
 
 
   /* Cancel transformations. */
   cancel_transformations ();
 
-  /* Clear value-initialization vectors. */
-  vec_clear (&init_zero);
-  vec_clear (&init_blanks);
-  vec_clear (&reinit_sysmis);
-  vec_clear (&reinit_blanks);
-
   /* Turn off case limiter. */
   /* Turn off case limiter. */
-  default_dict.N = 0;
+  dict_set_case_limit (default_dict, 0);
 
   /* Clear VECTOR vectors. */
 
   /* Clear VECTOR vectors. */
-  {
-    int i;
-
-    for (i = 0; i < nvec; i++)
-      free (vec[i].v);
-    free (vec);
-    vec = NULL;
-    nvec = 0;
-  }
+  dict_clear_vectors (default_dict);
 
   debug_printf (("vfm: procedure complete\n\n"));
 }
 \f
 /* Disk case stream. */
 
 
   debug_printf (("vfm: procedure complete\n\n"));
 }
 \f
 /* Disk case stream. */
 
-/* Associated files. */
-FILE *disk_source_file;
-FILE *disk_sink_file;
-
 /* Initializes the disk sink. */
 static void
 /* Initializes the disk sink. */
 static void
-disk_stream_init (void)
+disk_sink_create (struct case_sink *sink)
 {
 {
-  disk_sink_file = tmpfile ();
-  if (!disk_sink_file)
+  sink->aux = tmpfile ();
+  if (!sink->aux)
     {
       msg (ME, _("An error occurred attempting to create a temporary "
                 "file for use as the active file: %s."),
     {
       msg (ME, _("An error occurred attempting to create a temporary "
                 "file for use as the active file: %s."),
@@ -640,44 +583,21 @@ disk_stream_init (void)
     }
 }
 
     }
 }
 
-/* Reads all cases from the disk source and passes them one by one to
-   write_case(). */
-static void
-disk_stream_read (void)
-{
-  int i;
-
-  for (i = 0; i < vfm_source_info.ncases; i++)
-    {
-      if (!fread (temp_case, vfm_source_info.case_size, 1, disk_source_file))
-       {
-         msg (ME, _("An error occurred while attempting to read from "
-              "a temporary file created for the active file: %s."),
-              strerror (errno));
-         err_failure ();
-         return;
-       }
-
-      if (!write_case ())
-       return;
-    }
-}
-
 /* Writes temp_case to the disk sink. */
 static void
 /* Writes temp_case to the disk sink. */
 static void
-disk_stream_write (void)
+disk_sink_write (struct case_sink *sink, struct ccase *c)
 {
 {
+  FILE *file = sink->aux;
   union value *src_case;
 
   if (compaction_necessary)
     {
   union value *src_case;
 
   if (compaction_necessary)
     {
-      compact_case (compaction_case, temp_case);
-      src_case = (union value *) compaction_case;
+      compact_case (compaction_case, c);
+      src_case = compaction_case->data;
     }
     }
-  else src_case = (union value *) temp_case;
+  else src_case = c->data;
 
 
-  if (fwrite (src_case, sizeof *src_case * compaction_nval, 1,
-             disk_sink_file) != 1)
+  if (fwrite (src_case, sizeof *src_case * compaction_nval, 1, file) != 1)
     {
       msg (ME, _("An error occurred while attempting to write to a "
                 "temporary file used as the active file: %s."),
     {
       msg (ME, _("An error occurred while attempting to write to a "
                 "temporary file used as the active file: %s."),
@@ -686,12 +606,25 @@ disk_stream_write (void)
     }
 }
 
     }
 }
 
-/* Switches the stream from a sink to a source. */
+/* Destroys the sink's internal data. */
 static void
 static void
-disk_stream_mode (void)
+disk_sink_destroy (struct case_sink *sink)
 {
 {
-  /* Rewind the sink. */
-  if (fseek (disk_sink_file, 0, SEEK_SET) != 0)
+  FILE *file = sink->aux;
+  if (file != NULL)
+    fclose (file);
+}
+
+/* Closes and destroys the sink and returns a disk source to read
+   back the written data. */
+static struct case_source *
+disk_sink_make_source (struct case_sink *sink) 
+{
+  FILE *file = sink->aux;
+  
+  /* Rewind the file. */
+  assert (file != NULL);
+  if (fseek (file, 0, SEEK_SET) != 0)
     {
       msg (ME, _("An error occurred while attempting to rewind a "
                 "temporary file used as the active file: %s."),
     {
       msg (ME, _("An error occurred while attempting to rewind a "
                 "temporary file used as the active file: %s."),
@@ -699,107 +632,118 @@ disk_stream_mode (void)
       err_failure ();
     }
   
       err_failure ();
     }
   
-  /* Sink --> source variables. */
-  disk_source_file = disk_sink_file;
+  return create_case_source (&disk_source_class, file);
 }
 
 }
 
-/* Destroys the source's internal data. */
+/* Disk sink. */
+const struct case_sink_class disk_sink_class = 
+  {
+    "disk",
+    disk_sink_create,
+    disk_sink_write,
+    disk_sink_destroy,
+    disk_sink_make_source,
+  };
+\f
+/* Disk source. */
+
+/* Reads all cases from the disk source and passes them one by one to
+   write_case(). */
 static void
 static void
-disk_stream_destroy_source (void)
+disk_source_read (struct case_source *source,
+                  write_case_func *write_case, write_case_data wc_data)
 {
 {
-  if (disk_source_file)
+  FILE *file = source->aux;
+  int i;
+
+  for (i = 0; i < vfm_source_info.ncases; i++)
     {
     {
-      fclose (disk_source_file);
-      disk_source_file = NULL;
+      if (!fread (temp_case, vfm_source_info.case_size, 1, file))
+       {
+         msg (ME, _("An error occurred while attempting to read from "
+              "a temporary file created for the active file: %s."),
+              strerror (errno));
+         err_failure ();
+         return;
+       }
+
+      if (!write_case (wc_data))
+       return;
     }
 }
 
     }
 }
 
-/* Destroys the sink's internal data. */
+/* Destroys the source's internal data. */
 static void
 static void
-disk_stream_destroy_sink (void)
+disk_source_destroy (struct case_source *source)
 {
 {
-  if (disk_sink_file)
-    {
-      fclose (disk_sink_file);
-      disk_sink_file = NULL;
-    }
+  FILE *file = source->aux;
+  if (file != NULL)
+    fclose (file);
 }
 
 }
 
-/* Disk stream. */
-struct case_stream vfm_disk_stream = 
+/* Disk source. */
+const struct case_source_class disk_source_class = 
   {
   {
-    disk_stream_init,
-    disk_stream_read,
-    disk_stream_write,
-    disk_stream_mode,
-    disk_stream_destroy_source,
-    disk_stream_destroy_sink,
     "disk",
     "disk",
+    disk_source_read,
+    disk_source_destroy,
   };
 \f
 /* Memory case stream. */
 
   };
 \f
 /* Memory case stream. */
 
-/* List of cases stored in the stream. */
-struct case_list *memory_source_cases;
-struct case_list *memory_sink_cases;
-
-/* Current case. */
-struct case_list *memory_sink_iter;
+/* Memory sink data. */
+struct memory_sink_info
+  {
+    int max_cases;              /* Maximum cases before switching to disk. */
+    struct case_list *head;     /* First case in list. */
+    struct case_list *tail;     /* Last case in list. */
+  };
 
 
-/* Maximum number of cases. */
-int memory_sink_max_cases;
+/* Memory source data. */
+struct memory_source_info 
+  {
+    struct case_list *cases;    /* List of cases. */
+  };
 
 
-/* Initializes the memory stream variables for writing. */
 static void
 static void
-memory_stream_init (void)
+memory_sink_create (struct case_sink *sink) 
 {
 {
-  memory_sink_cases = NULL;
-  memory_sink_iter = NULL;
+  struct memory_sink_info *info;
   
   
-  assert (compaction_nval);
-  memory_sink_max_cases = MAX_WORKSPACE / (sizeof (union value) * compaction_nval);
-}
+  sink->aux = info = xmalloc (sizeof *info);
 
 
-/* Reads the case stream from memory and passes it to write_case(). */
-static void
-memory_stream_read (void)
-{
-  while (memory_source_cases != NULL)
-    {
-      memcpy (temp_case, &memory_source_cases->c, vfm_source_info.case_size);
-      
-      {
-       struct case_list *current = memory_source_cases;
-       memory_source_cases = memory_source_cases->next;
-       free (current);
-      }
-      
-      if (!write_case ())
-       return;
-    }
+  assert (compaction_nval > 0);
+  info->max_cases = set_max_workspace / (sizeof (union value) * compaction_nval);
+  info->head = info->tail = NULL;
 }
 
 }
 
-/* Writes temp_case to the memory stream. */
 static void
 static void
-memory_stream_write (void)
+memory_sink_write (struct case_sink *sink, struct ccase *c) 
 {
 {
-  struct case_list *new_case = malloc (sizeof (struct case_list)
-                                      + ((compaction_nval - 1)
-                                         * sizeof (union value)));
+  struct memory_sink_info *info = sink->aux;
+  size_t case_size;
+  struct case_list *new_case;
+
+  case_size = sizeof (struct case_list)
+                      + ((compaction_nval - 1) * sizeof (union value));
+  new_case = malloc (case_size);
 
   /* If we've got memory to spare then add it to the linked list. */
 
   /* If we've got memory to spare then add it to the linked list. */
-  if (vfm_sink_info.ncases <= memory_sink_max_cases && new_case != NULL)
+  if (vfm_sink_info.ncases <= info->max_cases && new_case != NULL)
     {
     {
-      if (compaction_necessary)
-       compact_case (&new_case->c, temp_case);
+      /* Append case to linked list. */
+      new_case->next = NULL;
+      if (info->head != NULL)
+        info->tail->next = new_case;
       else
       else
-       memcpy (&new_case->c, temp_case, sizeof (union value) * compaction_nval);
+        info->head = new_case;
+      info->tail = new_case;
 
 
-      /* Append case to linked list. */
-      if (memory_sink_cases)
-       memory_sink_iter = memory_sink_iter->next = new_case;
+      /* Copy data into case. */
+      if (compaction_necessary)
+       compact_case (&new_case->c, c);
       else
       else
-       memory_sink_iter = memory_sink_cases = new_case;
+       memcpy (&new_case->c, c, sizeof (union value) * compaction_nval);
     }
   else
     {
     }
   else
     {
@@ -808,36 +752,32 @@ memory_stream_write (void)
 
       /* Notify the user. */
       if (!new_case)
 
       /* Notify the user. */
       if (!new_case)
-       msg (MW, _("Virtual memory exhausted.  Paging active file "
+       msg (MW, _("Virtual memory exhausted.  Writing active file "
                   "to disk."));
       else
        msg (MW, _("Workspace limit of %d KB (%d cases at %d bytes each) "
                   "to disk."));
       else
        msg (MW, _("Workspace limit of %d KB (%d cases at %d bytes each) "
-                  "overflowed.  Paging active file to disk."),
-            MAX_WORKSPACE / 1024, memory_sink_max_cases,
+                  "overflowed.  Writing active file to disk."),
+            set_max_workspace / 1024, info->max_cases,
             compaction_nval * sizeof (union value));
 
       free (new_case);
 
       /* Switch to a disk sink. */
             compaction_nval * sizeof (union value));
 
       free (new_case);
 
       /* Switch to a disk sink. */
-      vfm_sink = &vfm_disk_stream;
-      vfm_sink->init ();
-      paging = 1;
-
-      /* Terminate the list. */
-      if (memory_sink_iter)
-       memory_sink_iter->next = NULL;
+      vfm_sink = create_case_sink (&disk_sink_class, NULL);
+      vfm_sink->class->open (vfm_sink);
+      workspace_overflow = 1;
 
       /* Write the cases to disk and destroy them.  We can't call
          vfm->sink->write() because of compaction. */
 
       /* Write the cases to disk and destroy them.  We can't call
          vfm->sink->write() because of compaction. */
-      for (cur = memory_sink_cases; cur; cur = next)
+      for (cur = info->head; cur; cur = next)
        {
          next = cur->next;
          if (fwrite (cur->c.data, sizeof (union value) * compaction_nval, 1,
        {
          next = cur->next;
          if (fwrite (cur->c.data, sizeof (union value) * compaction_nval, 1,
-                     disk_sink_file) != 1)
+                     vfm_sink->aux) != 1)
            {
              msg (ME, _("An error occurred while attempting to "
                         "write to a temporary file created as the "
            {
              msg (ME, _("An error occurred while attempting to "
                         "write to a temporary file created as the "
-                        "active file, while paging to disk: %s."),
+                        "active file: %s."),
                   strerror (errno));
              err_failure ();
            }
                   strerror (errno));
              err_failure ();
            }
@@ -845,36 +785,38 @@ memory_stream_write (void)
        }
 
       /* Write the current case to disk. */
        }
 
       /* Write the current case to disk. */
-      vfm_sink->write ();
+      vfm_sink->class->write (vfm_sink, c);
     }
 }
 
 /* If the data is stored in memory, causes it to be written to disk.
    To be called only *between* procedure()s, not within them. */
 void
     }
 }
 
 /* If the data is stored in memory, causes it to be written to disk.
    To be called only *between* procedure()s, not within them. */
 void
-page_to_disk (void)
+write_active_file_to_disk (void)
 {
 {
-  if (vfm_source == &vfm_memory_stream)
+  if (case_source_is_class (vfm_source, &memory_source_class))
     {
     {
+      struct memory_source_info *info = vfm_source->aux;
+
       /* Switch to a disk sink. */
       /* Switch to a disk sink. */
-      vfm_sink = &vfm_disk_stream;
-      vfm_sink->init ();
-      paging = 1;
+      vfm_sink = create_case_sink (&disk_sink_class, NULL);
+      vfm_sink->class->open (vfm_sink);
+      workspace_overflow = 1;
       
       /* Write the cases to disk and destroy them.  We can't call
          vfm->sink->write() because of compaction. */
       {
        struct case_list *cur, *next;
        
       
       /* Write the cases to disk and destroy them.  We can't call
          vfm->sink->write() because of compaction. */
       {
        struct case_list *cur, *next;
        
-       for (cur = memory_source_cases; cur; cur = next)
+       for (cur = info->cases; cur; cur = next)
          {
            next = cur->next;
            if (fwrite (cur->c.data, sizeof *cur->c.data * compaction_nval, 1,
          {
            next = cur->next;
            if (fwrite (cur->c.data, sizeof *cur->c.data * compaction_nval, 1,
-                       disk_sink_file) != 1)
+                       vfm_sink->aux) != 1)
              {
                msg (ME, _("An error occurred while attempting to "
                           "write to a temporary file created as the "
              {
                msg (ME, _("An error occurred while attempting to "
                           "write to a temporary file created as the "
-                          "active file, while paging to disk: %s."),
+                          "active file: %s."),
                     strerror (errno));
                err_failure ();
              }
                     strerror (errno));
                err_failure ();
              }
@@ -882,64 +824,107 @@ page_to_disk (void)
          }
       }
       
          }
       }
       
-      vfm_source = &vfm_disk_stream;
-      vfm_source->mode ();
-
+      vfm_source = vfm_sink->class->make_source (vfm_sink);
       vfm_sink = NULL;
     }
 }
 
       vfm_sink = NULL;
     }
 }
 
-/* Switch the memory stream from sink to source mode. */
+/* Destroy all memory sink data. */
 static void
 static void
-memory_stream_mode (void)
+memory_sink_destroy (struct case_sink *sink)
 {
 {
-  /* Terminate the list. */
-  if (memory_sink_iter)
-    memory_sink_iter->next = NULL;
+  struct memory_sink_info *info = sink->aux;
+  struct case_list *cur, *next;
+  
+  for (cur = info->head; cur; cur = next)
+    {
+      next = cur->next;
+      free (cur);
+    }
+  free (info);
+}
+
+/* Switch the memory stream from sink to source mode. */
+static struct case_source *
+memory_sink_make_source (struct case_sink *sink)
+{
+  struct memory_sink_info *sink_info = sink->aux;
+  struct memory_source_info *source_info;
 
 
-  /* Sink --> source variables. */
-  memory_source_cases = memory_sink_cases;
-  memory_sink_cases = NULL;
+  source_info = xmalloc (sizeof *source_info);
+  source_info->cases = sink_info->head;
+
+  free (sink_info);
+
+  return create_case_source (&memory_source_class, source_info);
 }
 
 }
 
-/* Destroy all memory source data. */
+const struct case_sink_class memory_sink_class = 
+  {
+    "memory",
+    memory_sink_create,
+    memory_sink_write,
+    memory_sink_destroy,
+    memory_sink_make_source,
+  };
+
+/* Reads the case stream from memory and passes it to write_case(). */
 static void
 static void
-memory_stream_destroy_source (void)
+memory_source_read (struct case_source *source,
+                    write_case_func *write_case, write_case_data wc_data)
 {
 {
-  struct case_list *cur, *next;
-  
-  for (cur = memory_source_cases; cur; cur = next)
+  struct memory_source_info *info = source->aux;
+
+  while (info->cases != NULL) 
     {
     {
-      next = cur->next;
-      free (cur);
+      struct case_list *iter = info->cases;
+      info->cases = iter->next;
+      memcpy (temp_case, &iter->c, vfm_source_info.case_size);
+      free (iter);
+      
+      if (!write_case (wc_data))
+       return;
     }
     }
-  memory_source_cases = NULL;
 }
 
 }
 
-/* Destroy all memory sink data. */
+/* Destroy all memory source data. */
 static void
 static void
-memory_stream_destroy_sink (void)
+memory_source_destroy (struct case_source *source)
 {
 {
+  struct memory_source_info *info = source->aux;
   struct case_list *cur, *next;
   
   struct case_list *cur, *next;
   
-  for (cur = memory_sink_cases; cur; cur = next)
+  for (cur = info->cases; cur; cur = next)
     {
       next = cur->next;
       free (cur);
     }
     {
       next = cur->next;
       free (cur);
     }
-  memory_sink_cases = NULL;
+  free (info);
 }
 }
-  
+
+struct case_list *
+memory_source_get_cases (const struct case_source *source) 
+{
+  struct memory_source_info *info = source->aux;
+
+  return info->cases;
+}
+
+void
+memory_source_set_cases (const struct case_source *source,
+                         struct case_list *cases) 
+{
+  struct memory_source_info *info = source->aux;
+
+  info->cases = cases;
+}
+
 /* Memory stream. */
 /* Memory stream. */
-struct case_stream vfm_memory_stream = 
+const struct case_source_class memory_source_class = 
   {
   {
-    memory_stream_init,
-    memory_stream_read,
-    memory_stream_write,
-    memory_stream_mode,
-    memory_stream_destroy_source,
-    memory_stream_destroy_sink,
     "memory",
     "memory",
+    memory_source_read,
+    memory_source_destroy,
   };
 \f
 #include "debug-print.h"
   };
 \f
 #include "debug-print.h"
@@ -950,7 +935,8 @@ lag_case (void)
 {
   if (lag_count < n_lag)
     lag_count++;
 {
   if (lag_count < n_lag)
     lag_count++;
-  memcpy (lag_queue[lag_head], temp_case, sizeof (union value) * temp_dict->nval);
+  memcpy (lag_queue[lag_head], temp_case,
+          dict_get_case_size (temp_dict));
   if (++lag_head >= n_lag)
     lag_head = 0;
 }
   if (++lag_head >= n_lag)
     lag_head = 0;
 }
@@ -977,7 +963,7 @@ lagged_case (int n_before)
    otherwise.  Do not call this function again after it has returned
    zero once.  */
 int
    otherwise.  Do not call this function again after it has returned
    zero once.  */
 int
-procedure_write_case (void)
+procedure_write_case (write_case_data wc_data)
 {
   /* Index of current transformation. */
   int cur_trns;
 {
   /* Index of current transformation. */
   int cur_trns;
@@ -999,10 +985,11 @@ procedure_write_case (void)
            lag_case ();
          
          vfm_sink_info.ncases++;
            lag_case ();
          
          vfm_sink_info.ncases++;
-         vfm_sink->write ();
+         vfm_sink->class->write (vfm_sink, temp_case);
 
 
-         if (default_dict.N)
-           more_cases = vfm_sink_info.ncases < default_dict.N;
+         if (dict_get_case_limit (default_dict))
+           more_cases = (vfm_sink_info.ncases
+                          < dict_get_case_limit (default_dict));
        }
 
       /* Are we done? */
        }
 
       /* Are we done? */
@@ -1034,36 +1021,70 @@ procedure_write_case (void)
     }
 
   /* Call the beginning of group function. */
     }
 
   /* Call the beginning of group function. */
-  if (!case_count && begin_func != NULL)
-    begin_func ();
+  if (!case_count && wc_data->beginfunc != NULL)
+    wc_data->beginfunc (wc_data->aux);
 
   /* Call the procedure if there is one and FILTER and PROCESS IF
      don't prohibit it. */
 
   /* Call the procedure if there is one and FILTER and PROCESS IF
      don't prohibit it. */
-  if (proc_func != NULL
-      && !FILTERED
-      && (process_if_expr == NULL ||
-         expr_evaluate (process_if_expr, temp_case, NULL) == 1.0))
-    proc_func (temp_case);
+  if (wc_data->procfunc != NULL && !exclude_this_case ())
+    wc_data->procfunc (temp_case, wc_data->aux);
 
   case_count++;
   
 done:
   debug_putc ('\n', stdout);
 
   case_count++;
   
 done:
   debug_putc ('\n', stdout);
-  
-  {
-    long *lp;
 
 
-    /* This case is finished.  Initialize the variables for the next case. */
-    for (lp = reinit_sysmis.vec; *lp != -1;)
-      temp_case->data[*lp++].f = SYSMIS;
-    for (lp = reinit_blanks.vec; *lp != -1;)
-      memset (temp_case->data[*lp++].s, ' ', MAX_SHORT_STRING);
-  }
+  clear_temp_case ();
   
   /* Return previously determined value. */
   return more_cases;
 }
 
   
   /* Return previously determined value. */
   return more_cases;
 }
 
+/* Clears the variables in the temporary case that need to be
+   cleared between processing cases.  */
+static void
+clear_temp_case (void)
+{
+  /* FIXME?  This is linear in the number of variables, but
+     doesn't need to be, so it's an easy optimization target. */
+  size_t var_cnt = dict_get_var_cnt (default_dict);
+  size_t i;
+  
+  for (i = 0; i < var_cnt; i++) 
+    {
+      struct variable *v = dict_get_var (default_dict, i);
+      if (v->init && v->reinit) 
+        {
+          if (v->type == NUMERIC) 
+            temp_case->data[v->fv].f = SYSMIS;
+          else
+            memset (temp_case->data[v->fv].s, ' ', v->width);
+        } 
+    }
+}
+
+/* Returns nonzero if this case should be exclude as specified on
+   FILTER or PROCESS IF, otherwise zero. */
+static int
+exclude_this_case (void)
+{
+  /* FILTER. */
+  struct variable *filter_var = dict_get_filter (default_dict);
+  if (filter_var != NULL) 
+    {
+      double f = temp_case->data[filter_var->fv].f;
+      if (f == 0.0 || f == SYSMIS || is_num_user_missing (f, filter_var))
+        return 1;
+    }
+
+  /* PROCESS IF. */
+  if (process_if_expr != NULL
+      && expr_evaluate (process_if_expr, temp_case, NULL) != 1.0)
+    return 1;
+
+  return 0;
+}
+
 /* Appends TRNS to t_trns[], the list of all transformations to be
    performed on data as it is read from the active file. */
 void
 /* Appends TRNS to t_trns[], the list of all transformations to be
    performed on data as it is read from the active file. */
 void
@@ -1102,32 +1123,30 @@ cancel_transformations (void)
 static void
 dump_splits (struct ccase *c)
 {
 static void
 dump_splits (struct ccase *c)
 {
-  struct variable **iter;
+  struct variable *const *split;
   struct tab_table *t;
   struct tab_table *t;
+  size_t split_cnt;
   int i;
 
   int i;
 
-  t = tab_create (3, default_dict.n_splits + 1, 0);
+  split_cnt = dict_get_split_cnt (default_dict);
+  t = tab_create (3, split_cnt + 1, 0);
   tab_dim (t, tab_natural_dimensions);
   tab_dim (t, tab_natural_dimensions);
-  tab_vline (t, TAL_1 | TAL_SPACING, 1, 0, default_dict.n_splits);
-  tab_vline (t, TAL_1 | TAL_SPACING, 2, 0, default_dict.n_splits);
+  tab_vline (t, TAL_1 | TAL_SPACING, 1, 0, split_cnt);
+  tab_vline (t, TAL_1 | TAL_SPACING, 2, 0, split_cnt);
   tab_text (t, 0, 0, TAB_NONE, _("Variable"));
   tab_text (t, 1, 0, TAB_LEFT, _("Value"));
   tab_text (t, 2, 0, TAB_LEFT, _("Label"));
   tab_text (t, 0, 0, TAB_NONE, _("Variable"));
   tab_text (t, 1, 0, TAB_LEFT, _("Value"));
   tab_text (t, 2, 0, TAB_LEFT, _("Label"));
-  for (iter = default_dict.splits, i = 0; *iter; iter++, i++)
+  split = dict_get_split_vars (default_dict);
+  for (i = 0; i < split_cnt; i++)
     {
     {
-      struct variable *v = *iter;
+      struct variable *v = split[i];
       char temp_buf[80];
       const char *val_lab;
 
       assert (v->type == NUMERIC || v->type == ALPHA);
       tab_text (t, 0, i + 1, TAB_LEFT | TAT_PRINTF, "%s", v->name);
       
       char temp_buf[80];
       const char *val_lab;
 
       assert (v->type == NUMERIC || v->type == ALPHA);
       tab_text (t, 0, i + 1, TAB_LEFT | TAT_PRINTF, "%s", v->name);
       
-      {
-       union value val = c->data[v->fv];
-       if (v->type == ALPHA)
-         val.c = c->data[v->fv].s;
-       data_out (temp_buf, &v->print, &val);
-      }
+      data_out (temp_buf, &v->print, &c->data[v->fv]);
       
       temp_buf[v->print.w] = 0;
       tab_text (t, 1, i + 1, TAT_PRINTF, "%.*s", v->print.w, temp_buf);
       
       temp_buf[v->print.w] = 0;
       tab_text (t, 1, i + 1, TAT_PRINTF, "%.*s", v->print.w, temp_buf);
@@ -1144,10 +1163,13 @@ dump_splits (struct ccase *c)
    SPLIT FILE is active.  This function forms a wrapper around that
    procfunc by dividing the input into series. */
 static int
    SPLIT FILE is active.  This function forms a wrapper around that
    procfunc by dividing the input into series. */
 static int
-SPLIT_FILE_procfunc (struct ccase *c)
+SPLIT_FILE_procfunc (struct ccase *c, void *data_)
 {
 {
+  struct write_case_data *data = data_;
   static struct ccase *prev_case;
   static struct ccase *prev_case;
-  struct variable **iter;
+  struct variable *const *split;
+  size_t split_cnt;
+  size_t i;
 
   /* The first case always begins a new series.  We also need to
      preserve the values of the case for later comparison. */
 
   /* The first case always begins a new series.  We also need to
      preserve the values of the case for later comparison. */
@@ -1159,22 +1181,24 @@ SPLIT_FILE_procfunc (struct ccase *c)
       memcpy (prev_case, c, vfm_sink_info.case_size);
 
       dump_splits (c);
       memcpy (prev_case, c, vfm_sink_info.case_size);
 
       dump_splits (c);
-      if (virt_begin_func != NULL)
-       virt_begin_func ();
+      if (data->beginfunc != NULL)
+       data->beginfunc (data->aux);
       
       
-      return virt_proc_func (c);
+      return data->procfunc (c, data->aux);
     }
 
   /* Compare the value of each SPLIT FILE variable to the values on
      the previous case. */
     }
 
   /* Compare the value of each SPLIT FILE variable to the values on
      the previous case. */
-  for (iter = default_dict.splits; *iter; iter++)
+  split = dict_get_split_vars (default_dict);
+  split_cnt = dict_get_split_cnt (default_dict);
+  for (i = 0; i < split_cnt; i++)
     {
     {
-      struct variable *v = *iter;
+      struct variable *v = split[i];
       
       switch (v->type)
        {
        case NUMERIC:
       
       switch (v->type)
        {
        case NUMERIC:
-         if (approx_ne (c->data[v->fv].f, prev_case->data[v->fv].f))
+         if (c->data[v->fv].f != prev_case->data[v->fv].f)
            goto not_equal;
          break;
        case ALPHA:
            goto not_equal;
          break;
        case ALPHA:
@@ -1185,19 +1209,19 @@ SPLIT_FILE_procfunc (struct ccase *c)
          assert (0);
        }
     }
          assert (0);
        }
     }
-  return virt_proc_func (c);
+  return data->procfunc (c, data->aux);
   
 not_equal:
   /* The values of the SPLIT FILE variable are different from the
      values on the previous case.  That means that it's time to begin
      a new series. */
   
 not_equal:
   /* The values of the SPLIT FILE variable are different from the
      values on the previous case.  That means that it's time to begin
      a new series. */
-  if (end_func != NULL)
-    end_func ();
+  if (data->endfunc != NULL)
+    data->endfunc (data->aux);
   dump_splits (c);
   dump_splits (c);
-  if (virt_begin_func != NULL)
-    virt_begin_func ();
+  if (data->beginfunc != NULL)
+    data->beginfunc (data->aux);
   memcpy (prev_case, c, vfm_sink_info.case_size);
   memcpy (prev_case, c, vfm_sink_info.case_size);
-  return virt_proc_func (c);
+  return data->procfunc (c, data->aux);
 }
 \f
 /* Case compaction. */
 }
 \f
 /* Case compaction. */
@@ -1208,6 +1232,7 @@ compact_case (struct ccase *dest, const struct ccase *src)
 {
   int i;
   int nval = 0;
 {
   int i;
   int nval = 0;
+  size_t var_cnt;
   
   assert (compaction_necessary);
 
   
   assert (compaction_necessary);
 
@@ -1220,9 +1245,10 @@ compact_case (struct ccase *dest, const struct ccase *src)
 
   /* Copy all the variables except the scratch variables from SRC to
      DEST. */
 
   /* Copy all the variables except the scratch variables from SRC to
      DEST. */
-  for (i = 0; i < default_dict.nvar; i++)
+  var_cnt = dict_get_var_cnt (default_dict);
+  for (i = 0; i < var_cnt; i++)
     {
     {
-      struct variable *v = default_dict.var[i];
+      struct variable *v = dict_get_var (default_dict, i);
       
       if (v->name[0] == '#')
        continue;
       
       if (v->name[0] == '#')
        continue;
@@ -1243,35 +1269,50 @@ compact_case (struct ccase *dest, const struct ccase *src)
 static void
 finish_compaction (void)
 {
 static void
 finish_compaction (void)
 {
-  int copy_index = 0;
-  int nval = 0;
   int i;
 
   int i;
 
-  for (i = 0; i < default_dict.nvar; i++)
+  for (i = 0; i < dict_get_var_cnt (default_dict); )
     {
     {
-      struct variable *v = default_dict.var[i];
-
-      if (v->name[0] == '#')
-       {
-         clear_variable (&default_dict, v);
-         free (v);
-         continue;
-       }
+      struct variable *v = dict_get_var (default_dict, i);
 
 
-      v->fv = nval;
-      if (v->type == NUMERIC)
-       nval++;
+      if (v->name[0] == '#') 
+        dict_delete_var (default_dict, v);
       else
       else
-       nval += DIV_RND_UP (v->width, sizeof (union value));
-      
-      default_dict.var[copy_index++] = v;
-    }
-  if (copy_index != default_dict.nvar)
-    {
-      default_dict.var = xrealloc (default_dict.var,
-                                  sizeof *default_dict.var * copy_index);
-      default_dict.nvar = copy_index;
+        i++;
     }
     }
+  dict_compact_values (default_dict);
+}
+
+struct case_source *
+create_case_source (const struct case_source_class *class, void *aux) 
+{
+  struct case_source *source = xmalloc (sizeof *source);
+  source->class = class;
+  source->aux = aux;
+  return source;
+}
+
+int
+case_source_is_complex (const struct case_source *source) 
+{
+  return source != NULL && (source->class == &input_program_source_class
+                            || source->class == &file_type_source_class);
+}
+
+int
+case_source_is_class (const struct case_source *source,
+                      const struct case_source_class *class) 
+{
+  return source != NULL && source->class == class;
+
+}
+
+struct case_sink *
+create_case_sink (const struct case_sink_class *class, void *aux) 
+{
+  struct case_sink *sink = xmalloc (sizeof *sink);
+  sink->class = class;
+  sink->aux = aux;
+  return sink;
 }
 
 }
 
-