Beginning of VFM cleanup.
[pspp-builds.git] / src / vfm.c
index 331f287f8c4d1de47283e8da46d909ee62d83c4f..82af200015d6aa40dc19f4fb6215d8a7c0480201 100644 (file)
--- a/src/vfm.c
+++ b/src/vfm.c
    Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA
    02111-1307, USA. */
 
-/* AIX requires this to be the first thing in the file.  */
 #include <config.h>
-#if __GNUC__
-#define alloca __builtin_alloca
-#else
-#if HAVE_ALLOCA_H
-#include <alloca.h>
-#else
-#ifdef _AIX
-#pragma alloca
-#else
-#ifndef alloca                 /* predefined by HP cc +Olibcalls */
-char *alloca ();
-#endif
-#endif
-#endif
-#endif
-
+#include "vfm.h"
+#include "vfmP.h"
 #include <assert.h>
 #include <errno.h>
 #include <stdio.h>
@@ -43,43 +28,43 @@ char *alloca ();
 #include <unistd.h>    /* Required by SunOS4. */
 #endif
 #include "alloc.h"
-#include "approx.h"
 #include "do-ifP.h"
 #include "error.h"
 #include "expr.h"
 #include "misc.h"
 #include "random.h"
+#include "settings.h"
 #include "som.h"
 #include "str.h"
 #include "tab.h"
 #include "var.h"
-#include "vector.h"
-#include "vfm.h"
-#include "vfmP.h"
+#include "value-labels.h"
 
 /*
    Virtual File Manager (vfm):
 
-   vfm is used to process data files.  It uses the model that data is
-   read from one stream (the data source), then written to another
-   (the data sink).  The data source is then deleted and the data sink
-   becomes the data source for the next procedure. */
+   vfm is used to process data files.  It uses the model that
+   data is read from one stream (the data source), processed,
+   then written to another (the data sink).  The data source is
+   then deleted and the data sink becomes the data source for the
+   next procedure. */
 
-#undef DEBUGGING
-/*#define DEBUGGING 1 */
 #include "debug-print.h"
 
-/* This is used to read from the active file. */
-struct case_stream *vfm_source;
+/* Procedure execution data. */
+struct write_case_data
+  {
+    void (*beginfunc) (void *);
+    int (*procfunc) (struct ccase *, void *);
+    void (*endfunc) (void *);
+    void *aux;
+  };
 
-/* `value' indexes to initialize to particular values for certain cases. */
-struct long_vec reinit_sysmis;         /* SYSMIS for every case. */
-struct long_vec reinit_blanks;         /* Blanks for every case. */
-struct long_vec init_zero;             /* Zero for first case only. */
-struct long_vec init_blanks;           /* Blanks for first case only. */
+/* The current active file, from which cases are read. */
+struct case_source *vfm_source;
 
-/* This is used to write to the replacement active file. */
-struct case_stream *vfm_sink;
+/* The replacement active file, to which cases are written. */
+struct case_sink *vfm_sink;
 
 /* Information about the data source. */
 struct stream_info vfm_source_info;
@@ -87,17 +72,6 @@ struct stream_info vfm_source_info;
 /* Information about the data sink. */
 struct stream_info vfm_sink_info;
 
-/* Filter variable and  `value' index. */
-static struct variable *filter_var;
-static int filter_index;
-
-#define FILTERED                                                       \
-       (filter_index != -1                                             \
-        && (temp_case->data[filter_index].f == 0.0                     \
-            || temp_case->data[filter_index].f == SYSMIS               \
-            || is_num_user_missing (temp_case->data[filter_index].f,   \
-                                    filter_var)))
-
 /* Nonzero if the case needs to have values deleted before being
    stored, zero otherwise. */
 int compaction_necessary;
@@ -110,21 +84,15 @@ int compaction_nval;
    `value's. */
 struct ccase *compaction_case;
 
-/* Within a session, when paging is turned on, it is never turned back
-   off.  This policy might be too aggressive. */
-static int paging = 0;
+/* Nonzero means that we've overflowed our allotted workspace.
+   After that happens once during a session, we always store the
+   active file on disk instead of in memory.  (This policy may be
+   too aggressive.) */
+static int workspace_overflow = 0;
 
 /* Time at which vfm was last invoked. */
 time_t last_vfm_invocation;
 
-/* Functions called during procedure processing. */
-static int (*proc_func) (struct ccase *);      /* Called for each case. */
-static int (*virt_proc_func) (struct ccase *); /* From SPLIT_FILE_procfunc. */
-static void (*begin_func) (void);      /* Called at beginning of a series. */
-static void (*virt_begin_func) (void); /* Called by SPLIT_FILE_procfunc. */
-static void (*end_func) (void);        /* Called after end of a series. */
-int (*write_case) (void);
-
 /* Number of cases passed to proc_func(). */
 static int case_count;
 
@@ -135,53 +103,76 @@ static int lag_head;              /* Index where next case will be added. */
 static struct ccase **lag_queue; /* Array of n_lag ccase * elements. */
 
 static void open_active_file (void);
-static void close_active_file (void);
-static int SPLIT_FILE_procfunc (struct ccase *);
+static void close_active_file (struct write_case_data *);
+static int SPLIT_FILE_procfunc (struct ccase *, void *);
 static void finish_compaction (void);
 static void lag_case (void);
-static int procedure_write_case (void);
+static int procedure_write_case (struct write_case_data *);
+static void clear_temp_case (void);
+static int exclude_this_case (void);
 \f
 /* Public functions. */
 
-/* Reads all the cases from the active file, transforms them by the
-   active set of transformations, calls PROCFUNC with CURCASE set to
-   the case and CASENUM set to the case number, and writes them to a
-   new active file.
+/* Reads all the cases from the active file, transforms them by
+   the active set of transformations, calls PROCFUNC with CURCASE
+   set to the case, and writes them to a new active file.
 
    Divides the active file into zero or more series of one or more
    cases each.  BEGINFUNC is called before each series.  ENDFUNC is
-   called after each series. */
+   called after each series.
+
+   Arbitrary user-specified data AUX is passed to BEGINFUNC,
+   PROCFUNC, and ENDFUNC as auxiliary data. */
 void
-procedure (void (*beginfunc) (void),
-          int (*procfunc) (struct ccase *curcase),
-          void (*endfunc) (void))
+procedure (void (*beginfunc) (void *),
+          int (*procfunc) (struct ccase *curcase, void *),
+          void (*endfunc) (void *),
+           void *aux)
 {
-  end_func = endfunc;
-  write_case = procedure_write_case;
+  static int recursive_call;
 
-  if (default_dict.n_splits && procfunc != NULL)
+  struct write_case_data procedure_write_data;
+  struct write_case_data split_file_data;
+
+  assert (++recursive_call == 1);
+
+  if (dict_get_split_cnt (default_dict) == 0) 
     {
-      virt_proc_func = procfunc;
-      proc_func = SPLIT_FILE_procfunc;
-      
-      virt_begin_func = beginfunc;
-      begin_func = NULL;
-    } else {
-      begin_func = beginfunc;
-      proc_func = procfunc;
+      /* Normally we just use the data passed by the user. */
+      procedure_write_data.beginfunc = beginfunc;
+      procedure_write_data.procfunc = procfunc;
+      procedure_write_data.endfunc = endfunc;
+      procedure_write_data.aux = aux;
+    }
+  else
+    {
+      /* Under SPLIT FILE, we add a layer of indirection. */
+      procedure_write_data.beginfunc = NULL;
+      procedure_write_data.procfunc = SPLIT_FILE_procfunc;
+      procedure_write_data.endfunc = endfunc;
+      procedure_write_data.aux = &split_file_data;
+
+      split_file_data.beginfunc = beginfunc;
+      split_file_data.procfunc = procfunc;
+      split_file_data.endfunc = endfunc;
+      split_file_data.aux = aux;
     }
 
   last_vfm_invocation = time (NULL);
 
   open_active_file ();
-  vfm_source->read ();
-  close_active_file ();
+  if (vfm_source != NULL) 
+    vfm_source->class->read (vfm_source,
+                             procedure_write_case, &procedure_write_data);
+  close_active_file (&procedure_write_data);
+
+  assert (--recursive_call == 0);
 }
 \f
 /* Active file processing support.  Subtly different semantics from
    procedure(). */
 
-static int process_active_file_write_case (void);
+static int process_active_file_write_case (struct write_case_data *data);
 
 /* The casefunc might want us to stop calling it. */
 static int not_canceled;
@@ -194,28 +185,35 @@ static int not_canceled;
 
    process_active_file() ignores TEMPORARY, SPLIT FILE, and N. */
 void
-process_active_file (void (*beginfunc) (void),
-                    int (*casefunc) (struct ccase *curcase),
-                    void (*endfunc) (void))
+process_active_file (void (*beginfunc) (void *),
+                    int (*casefunc) (struct ccase *curcase, void *),
+                    void (*endfunc) (void *),
+                     void *aux)
 {
-  proc_func = casefunc;
-  write_case = process_active_file_write_case;
+  struct write_case_data process_active_write_data;
+
+  process_active_write_data.beginfunc = beginfunc;
+  process_active_write_data.procfunc = casefunc;
+  process_active_write_data.endfunc = endfunc;
+  process_active_write_data.aux = aux;
+
   not_canceled = 1;
 
   open_active_file ();
-  beginfunc ();
+  beginfunc (aux);
   
   /* There doesn't necessarily need to be an active file. */
-  if (vfm_source)
-    vfm_source->read ();
+  if (vfm_source != NULL)
+    vfm_source->class->read (vfm_source, process_active_file_write_case,
+                             &process_active_write_data);
   
-  endfunc ();
-  close_active_file ();
+  endfunc (aux);
+  close_active_file (&process_active_write_data);
 }
 
 /* Pass the current case to casefunc. */
 static int
-process_active_file_write_case (void)
+process_active_file_write_case (struct write_case_data *data)
 {
   /* Index of current transformation. */
   int cur_trns;
@@ -245,25 +243,14 @@ process_active_file_write_case (void)
     lag_case ();
          
   /* Call the procedure if FILTER and PROCESS IF don't prohibit it. */
-  if (not_canceled
-      && !FILTERED
-      && (process_if_expr == NULL ||
-         expr_evaluate (process_if_expr, temp_case, NULL) == 1.0))
-    not_canceled = proc_func (temp_case);
+  if (not_canceled && !exclude_this_case ())
+    not_canceled = data->procfunc (temp_case, data->aux);
   
   case_count++;
   
  done:
-  {
-    long *lp;
+  clear_temp_case ();
 
-    /* This case is finished.  Initialize the variables for the next case. */
-    for (lp = reinit_sysmis.vec; *lp != -1;)
-      temp_case->data[*lp++].f = SYSMIS;
-    for (lp = reinit_blanks.vec; *lp != -1;)
-      memset (temp_case->data[*lp++].s, ' ', MAX_SHORT_STRING);
-  }
-  
   return 1;
 }
 
@@ -272,7 +259,7 @@ void
 process_active_file_output_case (void)
 {
   vfm_sink_info.ncases++;
-  vfm_sink->write ();
+  vfm_sink->class->write (vfm_sink, temp_case);
 }
 \f
 /* Opening the active file. */
@@ -299,25 +286,27 @@ prepare_for_writing (void)
      file--it's just a waste of time and space. */
 
   vfm_sink_info.ncases = 0;
-  vfm_sink_info.nval = default_dict.nval;
-  vfm_sink_info.case_size = (sizeof (struct ccase)
-                            + (default_dict.nval - 1) * sizeof (union value));
+  vfm_sink_info.nval = dict_get_next_value_idx (default_dict);
+  vfm_sink_info.case_size = dict_get_case_size (default_dict);
   
   if (vfm_sink == NULL)
     {
-      if (vfm_sink_info.case_size * vfm_source_info.ncases > MAX_WORKSPACE
-         && !paging)
+      if (vfm_sink_info.case_size * vfm_source_info.ncases > set_max_workspace
+         && !workspace_overflow)
        {
          msg (MW, _("Workspace overflow predicted.  Max workspace is "
                     "currently set to %d KB (%d cases at %d bytes each).  "
-                    "Paging active file to disk."),
-              MAX_WORKSPACE / 1024, MAX_WORKSPACE / vfm_sink_info.case_size,
+                    "Writing active file to disk."),
+              set_max_workspace / 1024, set_max_workspace / vfm_sink_info.case_size,
               vfm_sink_info.case_size);
          
-         paging = 1;
+         workspace_overflow = 1;
        }
-      
-      vfm_sink = paging ? &vfm_disk_stream : &vfm_memory_stream;
+
+      if (workspace_overflow)
+        vfm_sink = create_case_sink (&disk_sink_class, NULL);
+      else
+        vfm_sink = create_case_sink (&memory_sink_class, NULL);
     }
 }
 
@@ -331,22 +320,30 @@ arrange_compaction (void)
     int i;
     
     /* Count up the number of `value's that will be output. */
-    for (i = 0; i < temp_dict->nvar; i++)
-      if (temp_dict->var[i]->name[0] != '#')
-       {
-         assert (temp_dict->var[i]->nv > 0);
-         count_values += temp_dict->var[i]->nv;
-       }
-    assert (temporary == 2 || count_values <= temp_dict->nval);
+    for (i = 0; i < dict_get_var_cnt (temp_dict); i++) 
+      {
+        struct variable *v = dict_get_var (temp_dict, i);
+
+        if (v->name[0] != '#')
+          {
+            assert (v->nv > 0);
+            count_values += v->nv;
+          } 
+      }
+    assert (temporary == 2
+            || count_values <= dict_get_next_value_idx (temp_dict));
   }
   
   /* Compaction is only necessary if the number of `value's to output
      differs from the number already present. */
   compaction_nval = count_values;
-  compaction_necessary = temporary == 2 || count_values != temp_dict->nval;
+  if (temporary == 2 || count_values != dict_get_next_value_idx (temp_dict))
+    compaction_necessary = 1;
+  else
+    compaction_necessary = 0;
   
-  if (vfm_sink->init)
-    vfm_sink->init ();
+  if (vfm_sink->class->open != NULL)
+    vfm_sink->class->open (vfm_sink);
 }
 
 /* Prepares the temporary case and compaction case. */
@@ -370,7 +367,7 @@ index_to_varname (int ccase_index)
 
   for (i = 0; i < default_dict.nvar; i++)
     {
-      variable *v = default_dict.var[i];
+      struct variable *v = default_dict.var[i];
       
       if (ccase_index >= v->fv && ccase_index < v->fv + v->nv)
        return default_dict.var[i]->name;
@@ -379,67 +376,28 @@ index_to_varname (int ccase_index)
 }
 #endif
 
-/* Initializes temp_case from the vectors that say which `value's need
-   to be initialized just once, and which ones need to be
+/* Initializes temp_case from the vectors that say which `value's
+   need to be initialized just once, and which ones need to be
    re-initialized before every case. */
 static void
 vector_initialization (void)
 {
-  int i;
-  long *lp;
-  
-  /* Just once. */
-  for (i = 0; i < init_zero.n; i++)
-    temp_case->data[init_zero.vec[i]].f = 0.0;
-  for (i = 0; i < init_blanks.n; i++)
-    memset (temp_case->data[init_blanks.vec[i]].s, ' ', MAX_SHORT_STRING);
-
-  /* These vectors need to be repeatedly accessed, so we add a
-     sentinel to (hopefully) improve speed. */
-  vec_insert (&reinit_sysmis, -1);
-  vec_insert (&reinit_blanks, -1);
-
-  for (lp = reinit_sysmis.vec; *lp != -1;)
-    temp_case->data[*lp++].f = SYSMIS;
-  for (lp = reinit_blanks.vec; *lp != -1;)
-    memset (temp_case->data[*lp++].s, ' ', MAX_SHORT_STRING);
-  
-#if DEBUGGING
-  printf ("vfm: init_zero=");
-  for (i = 0; i < init_zero.n; i++)
-    printf ("%s%s", i ? "," : "", index_to_varname (init_zero.vec[i]));
-  printf (" init_blanks=");
-  for (i = 0; i < init_blanks.n; i++)
-    printf ("%s%s", i ? "," : "", index_to_varname (init_blanks.vec[i]));
-  printf (" reinit_sysmis=");
-  for (lp = reinit_sysmis.vec; *lp != -1; lp++)
-    printf ("%s%s", lp != reinit_sysmis.vec ? "," : "",
-           index_to_varname (*lp));
-  printf (" reinit_blanks=");
-  for (lp = reinit_blanks.vec; *lp != -1; lp++)
-    printf ("%s%s", lp != reinit_blanks.vec ? "," : "",
-           index_to_varname (*lp));
-  printf ("\n");
-#endif
-}
-
-/* Sets filter_index to an appropriate value. */
-static void
-setup_filter (void)
-{
-  filter_index = -1;
+  size_t var_cnt = dict_get_var_cnt (default_dict);
+  size_t i;
   
-  if (default_dict.filter_var[0])
+  for (i = 0; i < var_cnt; i++) 
     {
-      struct variable *fv = find_variable (default_dict.filter_var);
-      
-      if (fv == NULL || fv->type == ALPHA)
-       default_dict.filter_var[0] = 0;
+      struct variable *v = dict_get_var (default_dict, i);
+
+      if (v->type == NUMERIC) 
+        {
+          if (v->reinit)
+            temp_case->data[v->fv].f = 0.0;
+          else
+            temp_case->data[v->fv].f = SYSMIS;
+        }
       else
-       {
-         filter_index = fv->index;
-         filter_var = fv;
-       }
+        memset (temp_case->data[v->fv].s, ' ', v->width);
     }
 }
 
@@ -456,7 +414,7 @@ setup_lag (void)
   lag_head = 0;
   lag_queue = xmalloc (n_lag * sizeof *lag_queue);
   for (i = 0; i < n_lag; i++)
-    lag_queue[i] = xmalloc (temp_dict->nval * sizeof **lag_queue);
+    lag_queue[i] = xmalloc (dict_get_case_size (temp_dict));
 }
 
 /* There is a lot of potential confusion in the vfm and related
@@ -505,7 +463,7 @@ open_active_file (void)
   if (!temporary)
     {
       temp_trns = n_trns;
-      temp_dict = &default_dict;
+      temp_dict = default_dict;
     }
 
   /* No cases passed to the procedure yet. */
@@ -516,9 +474,7 @@ open_active_file (void)
   arrange_compaction ();
   make_temp_case ();
   vector_initialization ();
-  setup_randomize ();
   discard_ctl_stack ();
-  setup_filter ();
   setup_lag ();
 
   /* Debug output. */
@@ -533,11 +489,11 @@ open_active_file (void)
 \f
 /* Closes the active file. */
 static void
-close_active_file (void)
+close_active_file (struct write_case_data *data)
 {
   /* Close the current case group. */
-  if (case_count && end_func != NULL)
-    end_func ();
+  if (case_count && data->endfunc != NULL)
+    data->endfunc (data->aux);
 
   /* Stop lagging (catch up?). */
   if (n_lag)
@@ -554,31 +510,34 @@ close_active_file (void)
      off TEMPORARY. */
   if (temporary)
     {
-      restore_dictionary (temp_dict);
+      dict_destroy (default_dict);
+      default_dict = temp_dict;
       temp_dict = NULL;
     }
 
-  /* The default dictionary assumes the compacted data size. */
-  default_dict.nval = compaction_nval;
+  /* Finish compaction. */
+  if (compaction_necessary)
+    finish_compaction ();
     
   /* Old data sink --> New data source. */
-  if (vfm_source && vfm_source->destroy_source)
-    vfm_source->destroy_source ();
-  
-  vfm_source = vfm_sink;
+  if (vfm_source != NULL) 
+    {
+      if (vfm_source->class->destroy != NULL)
+        vfm_source->class->destroy (vfm_source);
+      free (vfm_source);
+    }
+
+  vfm_source = vfm_sink->class->make_source (vfm_sink);
   vfm_source_info.ncases = vfm_sink_info.ncases;
   vfm_source_info.nval = compaction_nval;
   vfm_source_info.case_size = (sizeof (struct ccase)
                               + (compaction_nval - 1) * sizeof (union value));
-  if (vfm_source->mode)
-    vfm_source->mode ();
 
   /* Old data sink is gone now. */
+  free (vfm_sink);
   vfm_sink = NULL;
 
-  /* Finish compaction. */
-  if (compaction_necessary)
-    finish_compaction ();
+  /* Cancel TEMPORARY. */
   cancel_temporary ();
 
   /* Free temporary cases. */
@@ -593,47 +552,29 @@ close_active_file (void)
   process_if_expr = NULL;
 
   /* Cancel FILTER if temporary. */
-  if (filter_index != -1 && !FILTER_before_TEMPORARY)
-    default_dict.filter_var[0] = 0;
+  if (dict_get_filter (default_dict) != NULL && !FILTER_before_TEMPORARY)
+    dict_set_filter (default_dict, NULL);
 
   /* Cancel transformations. */
   cancel_transformations ();
 
-  /* Clear value-initialization vectors. */
-  vec_clear (&init_zero);
-  vec_clear (&init_blanks);
-  vec_clear (&reinit_sysmis);
-  vec_clear (&reinit_blanks);
-
   /* Turn off case limiter. */
-  default_dict.N = 0;
+  dict_set_case_limit (default_dict, 0);
 
   /* Clear VECTOR vectors. */
-  {
-    int i;
-
-    for (i = 0; i < nvec; i++)
-      free (vec[i].v);
-    free (vec);
-    vec = NULL;
-    nvec = 0;
-  }
+  dict_clear_vectors (default_dict);
 
   debug_printf (("vfm: procedure complete\n\n"));
 }
 \f
 /* Disk case stream. */
 
-/* Associated files. */
-FILE *disk_source_file;
-FILE *disk_sink_file;
-
 /* Initializes the disk sink. */
 static void
-disk_stream_init (void)
+disk_sink_create (struct case_sink *sink)
 {
-  disk_sink_file = tmpfile ();
-  if (!disk_sink_file)
+  sink->aux = tmpfile ();
+  if (!sink->aux)
     {
       msg (ME, _("An error occurred attempting to create a temporary "
                 "file for use as the active file: %s."),
@@ -642,44 +583,21 @@ disk_stream_init (void)
     }
 }
 
-/* Reads all cases from the disk source and passes them one by one to
-   write_case(). */
-static void
-disk_stream_read (void)
-{
-  int i;
-
-  for (i = 0; i < vfm_source_info.ncases; i++)
-    {
-      if (!fread (temp_case, vfm_source_info.case_size, 1, disk_source_file))
-       {
-         msg (ME, _("An error occurred while attempting to read from "
-              "a temporary file created for the active file: %s."),
-              strerror (errno));
-         err_failure ();
-         return;
-       }
-
-      if (!write_case ())
-       return;
-    }
-}
-
 /* Writes temp_case to the disk sink. */
 static void
-disk_stream_write (void)
+disk_sink_write (struct case_sink *sink, struct ccase *c)
 {
+  FILE *file = sink->aux;
   union value *src_case;
 
   if (compaction_necessary)
     {
-      compact_case (compaction_case, temp_case);
-      src_case = (union value *) compaction_case;
+      compact_case (compaction_case, c);
+      src_case = compaction_case->data;
     }
-  else src_case = (union value *) temp_case;
+  else src_case = c->data;
 
-  if (fwrite (src_case, sizeof *src_case * compaction_nval, 1,
-             disk_sink_file) != 1)
+  if (fwrite (src_case, sizeof *src_case * compaction_nval, 1, file) != 1)
     {
       msg (ME, _("An error occurred while attempting to write to a "
                 "temporary file used as the active file: %s."),
@@ -688,12 +606,25 @@ disk_stream_write (void)
     }
 }
 
-/* Switches the stream from a sink to a source. */
+/* Destroys the sink's internal data. */
 static void
-disk_stream_mode (void)
+disk_sink_destroy (struct case_sink *sink)
 {
-  /* Rewind the sink. */
-  if (fseek (disk_sink_file, 0, SEEK_SET) != 0)
+  FILE *file = sink->aux;
+  if (file != NULL)
+    fclose (file);
+}
+
+/* Closes and destroys the sink and returns a disk source to read
+   back the written data. */
+static struct case_source *
+disk_sink_make_source (struct case_sink *sink) 
+{
+  FILE *file = sink->aux;
+  
+  /* Rewind the file. */
+  assert (file != NULL);
+  if (fseek (file, 0, SEEK_SET) != 0)
     {
       msg (ME, _("An error occurred while attempting to rewind a "
                 "temporary file used as the active file: %s."),
@@ -701,107 +632,118 @@ disk_stream_mode (void)
       err_failure ();
     }
   
-  /* Sink --> source variables. */
-  disk_source_file = disk_sink_file;
+  return create_case_source (&disk_source_class, file);
 }
 
-/* Destroys the source's internal data. */
+/* Disk sink. */
+const struct case_sink_class disk_sink_class = 
+  {
+    "disk",
+    disk_sink_create,
+    disk_sink_write,
+    disk_sink_destroy,
+    disk_sink_make_source,
+  };
+\f
+/* Disk source. */
+
+/* Reads all cases from the disk source and passes them one by one to
+   write_case(). */
 static void
-disk_stream_destroy_source (void)
+disk_source_read (struct case_source *source,
+                  write_case_func *write_case, write_case_data wc_data)
 {
-  if (disk_source_file)
+  FILE *file = source->aux;
+  int i;
+
+  for (i = 0; i < vfm_source_info.ncases; i++)
     {
-      fclose (disk_source_file);
-      disk_source_file = NULL;
+      if (!fread (temp_case, vfm_source_info.case_size, 1, file))
+       {
+         msg (ME, _("An error occurred while attempting to read from "
+              "a temporary file created for the active file: %s."),
+              strerror (errno));
+         err_failure ();
+         return;
+       }
+
+      if (!write_case (wc_data))
+       return;
     }
 }
 
-/* Destroys the sink's internal data. */
+/* Destroys the source's internal data. */
 static void
-disk_stream_destroy_sink (void)
+disk_source_destroy (struct case_source *source)
 {
-  if (disk_sink_file)
-    {
-      fclose (disk_sink_file);
-      disk_sink_file = NULL;
-    }
+  FILE *file = source->aux;
+  if (file != NULL)
+    fclose (file);
 }
 
-/* Disk stream. */
-struct case_stream vfm_disk_stream = 
+/* Disk source. */
+const struct case_source_class disk_source_class = 
   {
-    disk_stream_init,
-    disk_stream_read,
-    disk_stream_write,
-    disk_stream_mode,
-    disk_stream_destroy_source,
-    disk_stream_destroy_sink,
     "disk",
+    disk_source_read,
+    disk_source_destroy,
   };
 \f
 /* Memory case stream. */
 
-/* List of cases stored in the stream. */
-struct case_list *memory_source_cases;
-struct case_list *memory_sink_cases;
-
-/* Current case. */
-struct case_list *memory_sink_iter;
+/* Memory sink data. */
+struct memory_sink_info
+  {
+    int max_cases;              /* Maximum cases before switching to disk. */
+    struct case_list *head;     /* First case in list. */
+    struct case_list *tail;     /* Last case in list. */
+  };
 
-/* Maximum number of cases. */
-int memory_sink_max_cases;
+/* Memory source data. */
+struct memory_source_info 
+  {
+    struct case_list *cases;    /* List of cases. */
+  };
 
-/* Initializes the memory stream variables for writing. */
 static void
-memory_stream_init (void)
+memory_sink_create (struct case_sink *sink) 
 {
-  memory_sink_cases = NULL;
-  memory_sink_iter = NULL;
+  struct memory_sink_info *info;
   
-  assert (compaction_nval);
-  memory_sink_max_cases = MAX_WORKSPACE / (sizeof (union value) * compaction_nval);
-}
+  sink->aux = info = xmalloc (sizeof *info);
 
-/* Reads the case stream from memory and passes it to write_case(). */
-static void
-memory_stream_read (void)
-{
-  while (memory_source_cases != NULL)
-    {
-      memcpy (temp_case, &memory_source_cases->c, vfm_source_info.case_size);
-      
-      {
-       struct case_list *current = memory_source_cases;
-       memory_source_cases = memory_source_cases->next;
-       free (current);
-      }
-      
-      if (!write_case ())
-       return;
-    }
+  assert (compaction_nval > 0);
+  info->max_cases = set_max_workspace / (sizeof (union value) * compaction_nval);
+  info->head = info->tail = NULL;
 }
 
-/* Writes temp_case to the memory stream. */
 static void
-memory_stream_write (void)
+memory_sink_write (struct case_sink *sink, struct ccase *c) 
 {
-  struct case_list *new_case = malloc (sizeof (struct case_list)
-                                      + ((compaction_nval - 1)
-                                         * sizeof (union value)));
+  struct memory_sink_info *info = sink->aux;
+  size_t case_size;
+  struct case_list *new_case;
+
+  case_size = sizeof (struct case_list)
+                      + ((compaction_nval - 1) * sizeof (union value));
+  new_case = malloc (case_size);
 
   /* If we've got memory to spare then add it to the linked list. */
-  if (vfm_sink_info.ncases <= memory_sink_max_cases && new_case != NULL)
+  if (vfm_sink_info.ncases <= info->max_cases && new_case != NULL)
     {
-      if (compaction_necessary)
-       compact_case (&new_case->c, temp_case);
+      /* Append case to linked list. */
+      new_case->next = NULL;
+      if (info->head != NULL)
+        info->tail->next = new_case;
       else
-       memcpy (&new_case->c, temp_case, sizeof (union value) * compaction_nval);
+        info->head = new_case;
+      info->tail = new_case;
 
-      /* Append case to linked list. */
-      if (memory_sink_cases)
-       memory_sink_iter = memory_sink_iter->next = new_case;
+      /* Copy data into case. */
+      if (compaction_necessary)
+       compact_case (&new_case->c, c);
       else
-       memory_sink_iter = memory_sink_cases = new_case;
+       memcpy (&new_case->c, c, sizeof (union value) * compaction_nval);
     }
   else
     {
@@ -810,36 +752,32 @@ memory_stream_write (void)
 
       /* Notify the user. */
       if (!new_case)
-       msg (MW, _("Virtual memory exhausted.  Paging active file "
+       msg (MW, _("Virtual memory exhausted.  Writing active file "
                   "to disk."));
       else
        msg (MW, _("Workspace limit of %d KB (%d cases at %d bytes each) "
-                  "overflowed.  Paging active file to disk."),
-            MAX_WORKSPACE / 1024, memory_sink_max_cases,
+                  "overflowed.  Writing active file to disk."),
+            set_max_workspace / 1024, info->max_cases,
             compaction_nval * sizeof (union value));
 
       free (new_case);
 
       /* Switch to a disk sink. */
-      vfm_sink = &vfm_disk_stream;
-      vfm_sink->init ();
-      paging = 1;
-
-      /* Terminate the list. */
-      if (memory_sink_iter)
-       memory_sink_iter->next = NULL;
+      vfm_sink = create_case_sink (&disk_sink_class, NULL);
+      vfm_sink->class->open (vfm_sink);
+      workspace_overflow = 1;
 
       /* Write the cases to disk and destroy them.  We can't call
          vfm->sink->write() because of compaction. */
-      for (cur = memory_sink_cases; cur; cur = next)
+      for (cur = info->head; cur; cur = next)
        {
          next = cur->next;
          if (fwrite (cur->c.data, sizeof (union value) * compaction_nval, 1,
-                     disk_sink_file) != 1)
+                     vfm_sink->aux) != 1)
            {
              msg (ME, _("An error occurred while attempting to "
                         "write to a temporary file created as the "
-                        "active file, while paging to disk: %s."),
+                        "active file: %s."),
                   strerror (errno));
              err_failure ();
            }
@@ -847,36 +785,38 @@ memory_stream_write (void)
        }
 
       /* Write the current case to disk. */
-      vfm_sink->write ();
+      vfm_sink->class->write (vfm_sink, c);
     }
 }
 
 /* If the data is stored in memory, causes it to be written to disk.
    To be called only *between* procedure()s, not within them. */
 void
-page_to_disk (void)
+write_active_file_to_disk (void)
 {
-  if (vfm_source == &vfm_memory_stream)
+  if (case_source_is_class (vfm_source, &memory_source_class))
     {
+      struct memory_source_info *info = vfm_source->aux;
+
       /* Switch to a disk sink. */
-      vfm_sink = &vfm_disk_stream;
-      vfm_sink->init ();
-      paging = 1;
+      vfm_sink = create_case_sink (&disk_sink_class, NULL);
+      vfm_sink->class->open (vfm_sink);
+      workspace_overflow = 1;
       
       /* Write the cases to disk and destroy them.  We can't call
          vfm->sink->write() because of compaction. */
       {
        struct case_list *cur, *next;
        
-       for (cur = memory_source_cases; cur; cur = next)
+       for (cur = info->cases; cur; cur = next)
          {
            next = cur->next;
            if (fwrite (cur->c.data, sizeof *cur->c.data * compaction_nval, 1,
-                       disk_sink_file) != 1)
+                       vfm_sink->aux) != 1)
              {
                msg (ME, _("An error occurred while attempting to "
                           "write to a temporary file created as the "
-                          "active file, while paging to disk: %s."),
+                          "active file: %s."),
                     strerror (errno));
                err_failure ();
              }
@@ -884,67 +824,109 @@ page_to_disk (void)
          }
       }
       
-      vfm_source = &vfm_disk_stream;
-      vfm_source->mode ();
-
+      vfm_source = vfm_sink->class->make_source (vfm_sink);
       vfm_sink = NULL;
     }
 }
 
-/* Switch the memory stream from sink to source mode. */
+/* Destroy all memory sink data. */
 static void
-memory_stream_mode (void)
+memory_sink_destroy (struct case_sink *sink)
 {
-  /* Terminate the list. */
-  if (memory_sink_iter)
-    memory_sink_iter->next = NULL;
+  struct memory_sink_info *info = sink->aux;
+  struct case_list *cur, *next;
+  
+  for (cur = info->head; cur; cur = next)
+    {
+      next = cur->next;
+      free (cur);
+    }
+  free (info);
+}
+
+/* Switch the memory stream from sink to source mode. */
+static struct case_source *
+memory_sink_make_source (struct case_sink *sink)
+{
+  struct memory_sink_info *sink_info = sink->aux;
+  struct memory_source_info *source_info;
 
-  /* Sink --> source variables. */
-  memory_source_cases = memory_sink_cases;
-  memory_sink_cases = NULL;
+  source_info = xmalloc (sizeof *source_info);
+  source_info->cases = sink_info->head;
+
+  free (sink_info);
+
+  return create_case_source (&memory_source_class, source_info);
 }
 
-/* Destroy all memory source data. */
+const struct case_sink_class memory_sink_class = 
+  {
+    "memory",
+    memory_sink_create,
+    memory_sink_write,
+    memory_sink_destroy,
+    memory_sink_make_source,
+  };
+
+/* Reads the case stream from memory and passes it to write_case(). */
 static void
-memory_stream_destroy_source (void)
+memory_source_read (struct case_source *source,
+                    write_case_func *write_case, write_case_data wc_data)
 {
-  struct case_list *cur, *next;
-  
-  for (cur = memory_source_cases; cur; cur = next)
+  struct memory_source_info *info = source->aux;
+
+  while (info->cases != NULL) 
     {
-      next = cur->next;
-      free (cur);
+      struct case_list *iter = info->cases;
+      info->cases = iter->next;
+      memcpy (temp_case, &iter->c, vfm_source_info.case_size);
+      free (iter);
+      
+      if (!write_case (wc_data))
+       return;
     }
-  memory_source_cases = NULL;
 }
 
-/* Destroy all memory sink data. */
+/* Destroy all memory source data. */
 static void
-memory_stream_destroy_sink (void)
+memory_source_destroy (struct case_source *source)
 {
+  struct memory_source_info *info = source->aux;
   struct case_list *cur, *next;
   
-  for (cur = memory_sink_cases; cur; cur = next)
+  for (cur = info->cases; cur; cur = next)
     {
       next = cur->next;
       free (cur);
     }
-  memory_sink_cases = NULL;
+  free (info);
 }
-  
+
+struct case_list *
+memory_source_get_cases (const struct case_source *source) 
+{
+  struct memory_source_info *info = source->aux;
+
+  return info->cases;
+}
+
+void
+memory_source_set_cases (const struct case_source *source,
+                         struct case_list *cases) 
+{
+  struct memory_source_info *info = source->aux;
+
+  info->cases = cases;
+}
+
 /* Memory stream. */
-struct case_stream vfm_memory_stream = 
+const struct case_source_class memory_source_class = 
   {
-    memory_stream_init,
-    memory_stream_read,
-    memory_stream_write,
-    memory_stream_mode,
-    memory_stream_destroy_source,
-    memory_stream_destroy_sink,
     "memory",
+    memory_source_read,
+    memory_source_destroy,
   };
 \f
-#undef DEBUGGING
 #include "debug-print.h"
 
 /* Add temp_case to the lag queue. */
@@ -953,7 +935,8 @@ lag_case (void)
 {
   if (lag_count < n_lag)
     lag_count++;
-  memcpy (lag_queue[lag_head], temp_case, sizeof (union value) * temp_dict->nval);
+  memcpy (lag_queue[lag_head], temp_case,
+          dict_get_case_size (temp_dict));
   if (++lag_head >= n_lag)
     lag_head = 0;
 }
@@ -980,7 +963,7 @@ lagged_case (int n_before)
    otherwise.  Do not call this function again after it has returned
    zero once.  */
 int
-procedure_write_case (void)
+procedure_write_case (write_case_data wc_data)
 {
   /* Index of current transformation. */
   int cur_trns;
@@ -1002,10 +985,11 @@ procedure_write_case (void)
            lag_case ();
          
          vfm_sink_info.ncases++;
-         vfm_sink->write ();
+         vfm_sink->class->write (vfm_sink, temp_case);
 
-         if (default_dict.N)
-           more_cases = vfm_sink_info.ncases < default_dict.N;
+         if (dict_get_case_limit (default_dict))
+           more_cases = (vfm_sink_info.ncases
+                          < dict_get_case_limit (default_dict));
        }
 
       /* Are we done? */
@@ -1037,36 +1021,70 @@ procedure_write_case (void)
     }
 
   /* Call the beginning of group function. */
-  if (!case_count && begin_func != NULL)
-    begin_func ();
+  if (!case_count && wc_data->beginfunc != NULL)
+    wc_data->beginfunc (wc_data->aux);
 
   /* Call the procedure if there is one and FILTER and PROCESS IF
      don't prohibit it. */
-  if (proc_func != NULL
-      && !FILTERED
-      && (process_if_expr == NULL ||
-         expr_evaluate (process_if_expr, temp_case, NULL) == 1.0))
-    proc_func (temp_case);
+  if (wc_data->procfunc != NULL && !exclude_this_case ())
+    wc_data->procfunc (temp_case, wc_data->aux);
 
   case_count++;
   
 done:
   debug_putc ('\n', stdout);
-  
-  {
-    long *lp;
 
-    /* This case is finished.  Initialize the variables for the next case. */
-    for (lp = reinit_sysmis.vec; *lp != -1;)
-      temp_case->data[*lp++].f = SYSMIS;
-    for (lp = reinit_blanks.vec; *lp != -1;)
-      memset (temp_case->data[*lp++].s, ' ', MAX_SHORT_STRING);
-  }
+  clear_temp_case ();
   
   /* Return previously determined value. */
   return more_cases;
 }
 
+/* Clears the variables in the temporary case that need to be
+   cleared between processing cases.  */
+static void
+clear_temp_case (void)
+{
+  /* FIXME?  This is linear in the number of variables, but
+     doesn't need to be, so it's an easy optimization target. */
+  size_t var_cnt = dict_get_var_cnt (default_dict);
+  size_t i;
+  
+  for (i = 0; i < var_cnt; i++) 
+    {
+      struct variable *v = dict_get_var (default_dict, i);
+      if (v->init && v->reinit) 
+        {
+          if (v->type == NUMERIC) 
+            temp_case->data[v->fv].f = SYSMIS;
+          else
+            memset (temp_case->data[v->fv].s, ' ', v->width);
+        } 
+    }
+}
+
+/* Returns nonzero if this case should be exclude as specified on
+   FILTER or PROCESS IF, otherwise zero. */
+static int
+exclude_this_case (void)
+{
+  /* FILTER. */
+  struct variable *filter_var = dict_get_filter (default_dict);
+  if (filter_var != NULL) 
+    {
+      double f = temp_case->data[filter_var->fv].f;
+      if (f == 0.0 || f == SYSMIS || is_num_user_missing (f, filter_var))
+        return 1;
+    }
+
+  /* PROCESS IF. */
+  if (process_if_expr != NULL
+      && expr_evaluate (process_if_expr, temp_case, NULL) != 1.0)
+    return 1;
+
+  return 0;
+}
+
 /* Appends TRNS to t_trns[], the list of all transformations to be
    performed on data as it is read from the active file. */
 void
@@ -1105,37 +1123,35 @@ cancel_transformations (void)
 static void
 dump_splits (struct ccase *c)
 {
-  struct variable **iter;
+  struct variable *const *split;
   struct tab_table *t;
+  size_t split_cnt;
   int i;
 
-  t = tab_create (3, default_dict.n_splits + 1, 0);
+  split_cnt = dict_get_split_cnt (default_dict);
+  t = tab_create (3, split_cnt + 1, 0);
   tab_dim (t, tab_natural_dimensions);
-  tab_vline (t, TAL_1 | TAL_SPACING, 1, 0, default_dict.n_splits);
-  tab_vline (t, TAL_1 | TAL_SPACING, 2, 0, default_dict.n_splits);
+  tab_vline (t, TAL_1 | TAL_SPACING, 1, 0, split_cnt);
+  tab_vline (t, TAL_1 | TAL_SPACING, 2, 0, split_cnt);
   tab_text (t, 0, 0, TAB_NONE, _("Variable"));
   tab_text (t, 1, 0, TAB_LEFT, _("Value"));
   tab_text (t, 2, 0, TAB_LEFT, _("Label"));
-  for (iter = default_dict.splits, i = 0; *iter; iter++, i++)
+  split = dict_get_split_vars (default_dict);
+  for (i = 0; i < split_cnt; i++)
     {
-      struct variable *v = *iter;
+      struct variable *v = split[i];
       char temp_buf[80];
-      char *val_lab;
+      const char *val_lab;
 
       assert (v->type == NUMERIC || v->type == ALPHA);
       tab_text (t, 0, i + 1, TAB_LEFT | TAT_PRINTF, "%s", v->name);
       
-      {
-       union value val = c->data[v->fv];
-       if (v->type == ALPHA)
-         val.c = c->data[v->fv].s;
-       data_out (temp_buf, &v->print, &val);
-      }
+      data_out (temp_buf, &v->print, &c->data[v->fv]);
       
       temp_buf[v->print.w] = 0;
       tab_text (t, 1, i + 1, TAT_PRINTF, "%.*s", v->print.w, temp_buf);
 
-      val_lab = get_val_lab (v, c->data[v->fv], 0);
+      val_lab = val_labs_find (v->val_labs, c->data[v->fv]);
       if (val_lab)
        tab_text (t, 2, i + 1, TAB_LEFT, val_lab);
     }
@@ -1147,10 +1163,13 @@ dump_splits (struct ccase *c)
    SPLIT FILE is active.  This function forms a wrapper around that
    procfunc by dividing the input into series. */
 static int
-SPLIT_FILE_procfunc (struct ccase *c)
+SPLIT_FILE_procfunc (struct ccase *c, void *data_)
 {
+  struct write_case_data *data = data_;
   static struct ccase *prev_case;
-  struct variable **iter;
+  struct variable *const *split;
+  size_t split_cnt;
+  size_t i;
 
   /* The first case always begins a new series.  We also need to
      preserve the values of the case for later comparison. */
@@ -1162,22 +1181,24 @@ SPLIT_FILE_procfunc (struct ccase *c)
       memcpy (prev_case, c, vfm_sink_info.case_size);
 
       dump_splits (c);
-      if (virt_begin_func != NULL)
-       virt_begin_func ();
+      if (data->beginfunc != NULL)
+       data->beginfunc (data->aux);
       
-      return virt_proc_func (c);
+      return data->procfunc (c, data->aux);
     }
 
   /* Compare the value of each SPLIT FILE variable to the values on
      the previous case. */
-  for (iter = default_dict.splits; *iter; iter++)
+  split = dict_get_split_vars (default_dict);
+  split_cnt = dict_get_split_cnt (default_dict);
+  for (i = 0; i < split_cnt; i++)
     {
-      struct variable *v = *iter;
+      struct variable *v = split[i];
       
       switch (v->type)
        {
        case NUMERIC:
-         if (approx_ne (c->data[v->fv].f, prev_case->data[v->fv].f))
+         if (c->data[v->fv].f != prev_case->data[v->fv].f)
            goto not_equal;
          break;
        case ALPHA:
@@ -1188,19 +1209,19 @@ SPLIT_FILE_procfunc (struct ccase *c)
          assert (0);
        }
     }
-  return virt_proc_func (c);
+  return data->procfunc (c, data->aux);
   
 not_equal:
   /* The values of the SPLIT FILE variable are different from the
      values on the previous case.  That means that it's time to begin
      a new series. */
-  if (end_func != NULL)
-    end_func ();
+  if (data->endfunc != NULL)
+    data->endfunc (data->aux);
   dump_splits (c);
-  if (virt_begin_func != NULL)
-    virt_begin_func ();
+  if (data->beginfunc != NULL)
+    data->beginfunc (data->aux);
   memcpy (prev_case, c, vfm_sink_info.case_size);
-  return virt_proc_func (c);
+  return data->procfunc (c, data->aux);
 }
 \f
 /* Case compaction. */
@@ -1211,6 +1232,7 @@ compact_case (struct ccase *dest, const struct ccase *src)
 {
   int i;
   int nval = 0;
+  size_t var_cnt;
   
   assert (compaction_necessary);
 
@@ -1223,9 +1245,10 @@ compact_case (struct ccase *dest, const struct ccase *src)
 
   /* Copy all the variables except the scratch variables from SRC to
      DEST. */
-  for (i = 0; i < default_dict.nvar; i++)
+  var_cnt = dict_get_var_cnt (default_dict);
+  for (i = 0; i < var_cnt; i++)
     {
-      struct variable *v = default_dict.var[i];
+      struct variable *v = dict_get_var (default_dict, i);
       
       if (v->name[0] == '#')
        continue;
@@ -1246,35 +1269,50 @@ compact_case (struct ccase *dest, const struct ccase *src)
 static void
 finish_compaction (void)
 {
-  int copy_index = 0;
-  int nval = 0;
   int i;
 
-  for (i = 0; i < default_dict.nvar; i++)
+  for (i = 0; i < dict_get_var_cnt (default_dict); )
     {
-      struct variable *v = default_dict.var[i];
-
-      if (v->name[0] == '#')
-       {
-         clear_variable (&default_dict, v);
-         free (v);
-         continue;
-       }
+      struct variable *v = dict_get_var (default_dict, i);
 
-      v->fv = nval;
-      if (v->type == NUMERIC)
-       nval++;
+      if (v->name[0] == '#') 
+        dict_delete_var (default_dict, v);
       else
-       nval += DIV_RND_UP (v->width, sizeof (union value));
-      
-      default_dict.var[copy_index++] = v;
-    }
-  if (copy_index != default_dict.nvar)
-    {
-      default_dict.var = xrealloc (default_dict.var,
-                                  sizeof *default_dict.var * copy_index);
-      default_dict.nvar = copy_index;
+        i++;
     }
+  dict_compact_values (default_dict);
+}
+
+struct case_source *
+create_case_source (const struct case_source_class *class, void *aux) 
+{
+  struct case_source *source = xmalloc (sizeof *source);
+  source->class = class;
+  source->aux = aux;
+  return source;
+}
+
+int
+case_source_is_complex (const struct case_source *source) 
+{
+  return source != NULL && (source->class == &input_program_source_class
+                            || source->class == &file_type_source_class);
+}
+
+int
+case_source_is_class (const struct case_source *source,
+                      const struct case_source_class *class) 
+{
+  return source != NULL && source->class == class;
+
+}
+
+struct case_sink *
+create_case_sink (const struct case_sink_class *class, void *aux) 
+{
+  struct case_sink *sink = xmalloc (sizeof *sink);
+  sink->class = class;
+  sink->aux = aux;
+  return sink;
 }
 
-