Eliminate temp_case, and a few other cleanups.
[pspp-builds.git] / src / vfm.c
index e9c7d2048064d5abdc406fe1ddcadf83681f33dc..07bfaad30a33d9d58d842fbbfb2e33d6d404a53a 100644 (file)
--- a/src/vfm.c
+++ b/src/vfm.c
 #include <unistd.h>    /* Required by SunOS4. */
 #endif
 #include "alloc.h"
-#include "approx.h"
 #include "do-ifP.h"
 #include "error.h"
 #include "expr.h"
 #include "misc.h"
 #include "random.h"
+#include "settings.h"
 #include "som.h"
 #include "str.h"
 #include "tab.h"
 /*
    Virtual File Manager (vfm):
 
-   vfm is used to process data files.  It uses the model that data is
-   read from one stream (the data source), then written to another
-   (the data sink).  The data source is then deleted and the data sink
-   becomes the data source for the next procedure. */
-
-#include "debug-print.h"
+   vfm is used to process data files.  It uses the model that
+   data is read from one stream (the data source), processed,
+   then written to another (the data sink).  The data source is
+   then deleted and the data sink becomes the data source for the
+   next procedure. */
 
 /* Procedure execution data. */
 struct write_case_data
   {
-    void (*beginfunc) (void *);
-    int (*procfunc) (struct ccase *, void *);
-    void (*endfunc) (void *);
+    /* Functions to call... */
+    void (*begin_func) (void *);               /* ...before data. */
+    int (*proc_func) (struct ccase *, void *); /* ...with data. */
+    void (*end_func) (void *);                 /* ...after data. */
+    void *func_aux;                            /* Auxiliary data. */ 
+
+    /* Extra auxiliary data. */
     void *aux;
   };
 
-/* This is used to read from the active file. */
-struct case_stream *vfm_source;
-
-/* This is used to write to the replacement active file. */
-struct case_stream *vfm_sink;
-
-/* Information about the data source. */
-struct stream_info vfm_source_info;
+/* The current active file, from which cases are read. */
+struct case_source *vfm_source;
 
-/* Information about the data sink. */
-struct stream_info vfm_sink_info;
+/* The replacement active file, to which cases are written. */
+struct case_sink *vfm_sink;
 
 /* Nonzero if the case needs to have values deleted before being
    stored, zero otherwise. */
 int compaction_necessary;
 
-/* Number of values after compaction, or the same as
-   vfm_sink_info.nval, if compaction is not necessary. */
+/* Number of values after compaction. */
 int compaction_nval;
 
 /* Temporary case buffer with enough room for `compaction_nval'
    `value's. */
 struct ccase *compaction_case;
 
-/* Within a session, when paging is turned on, it is never turned back
-   off.  This policy might be too aggressive. */
-static int paging = 0;
+/* Nonzero means that we've overflowed our allotted workspace.
+   After that happens once during a session, we always store the
+   active file on disk instead of in memory.  (This policy may be
+   too aggressive.) */
+static int workspace_overflow = 0;
 
 /* Time at which vfm was last invoked. */
 time_t last_vfm_invocation;
@@ -99,119 +97,158 @@ static int lag_count;             /* Number of cases in lag_queue so far. */
 static int lag_head;           /* Index where next case will be added. */
 static struct ccase **lag_queue; /* Array of n_lag ccase * elements. */
 
+static struct ccase *create_trns_case (struct dictionary *dict);
 static void open_active_file (void);
 static void close_active_file (struct write_case_data *);
-static int SPLIT_FILE_procfunc (struct ccase *, void *);
+static int SPLIT_FILE_proc_func (struct ccase *, void *);
 static void finish_compaction (void);
-static void lag_case (void);
-static int procedure_write_case (struct write_case_data *);
-static void clear_temp_case (void);
-static int exclude_this_case (void);
+static void lag_case (const struct ccase *);
+static write_case_func procedure_write_case;
+static void clear_case (struct ccase *);
+static int exclude_this_case (const struct ccase *, int case_num);
 \f
 /* Public functions. */
 
+/* Auxiliary data for executing a procedure. */
+struct procedure_aux_data 
+  {
+    struct ccase *trns_case;    /* Case used for transformations. */
+    size_t cases_written;       /* Number of cases written so far. */
+  };
+
+/* Auxiliary data for SPLIT FILE. */
+struct split_aux_data 
+  {
+    struct ccase *prev_case;    /* Data in previous case. */
+  };
+
 /* Reads all the cases from the active file, transforms them by
-   the active set of transformations, calls PROCFUNC with CURCASE
-   set to the case, and writes them to a new active file.
+   the active set of transformations, passes each of them to
+   PROC_FUNC, and writes them to a new active file.
 
    Divides the active file into zero or more series of one or more
-   cases each.  BEGINFUNC is called before each series.  ENDFUNC is
+   cases each.  BEGIN_FUNC is called before each series.  END_FUNC is
    called after each series.
 
-   Arbitrary user-specified data AUX is passed to BEGINFUNC,
-   PROCFUNC, and ENDFUNC as auxiliary data. */
+   Arbitrary user-specified data AUX is passed to BEGIN_FUNC,
+   PROC_FUNC, and END_FUNC as auxiliary data. */
 void
-procedure (void (*beginfunc) (void *),
-          int (*procfunc) (struct ccase *curcase, void *),
-          void (*endfunc) (void *),
-           void *aux)
+procedure (void (*begin_func) (void *),
+          int (*proc_func) (struct ccase *, void *),
+          void (*end_func) (void *),
+           void *func_aux)
 {
+  static int recursive_call;
+
   struct write_case_data procedure_write_data;
+  struct procedure_aux_data proc_aux;
+
   struct write_case_data split_file_data;
+  struct split_aux_data split_aux;
+  int split;
 
-  if (dict_get_split_cnt (default_dict) == 0) 
-    {
-      /* Normally we just use the data passed by the user. */
-      procedure_write_data.beginfunc = beginfunc;
-      procedure_write_data.procfunc = procfunc;
-      procedure_write_data.endfunc = endfunc;
-      procedure_write_data.aux = aux;
-    }
-  else
+  assert (++recursive_call == 1);
+
+  proc_aux.cases_written = 0;
+  proc_aux.trns_case = create_trns_case (default_dict);
+
+  /* Normally we just use the data passed by the user. */
+  procedure_write_data.begin_func = begin_func;
+  procedure_write_data.proc_func = proc_func;
+  procedure_write_data.end_func = end_func;
+  procedure_write_data.func_aux = func_aux;
+  procedure_write_data.aux = &proc_aux;
+
+  /* Under SPLIT FILE, we add a layer of indirection. */
+  split = dict_get_split_cnt (default_dict) > 0;
+  if (split) 
     {
-      /* Under SPLIT FILE, we add a layer of indirection. */
-      procedure_write_data.beginfunc = NULL;
-      procedure_write_data.procfunc = SPLIT_FILE_procfunc;
-      procedure_write_data.endfunc = endfunc;
-      procedure_write_data.aux = &split_file_data;
-
-      split_file_data.beginfunc = beginfunc;
-      split_file_data.procfunc = procfunc;
-      split_file_data.endfunc = endfunc;
-      split_file_data.aux = aux;
+      split_file_data = procedure_write_data;
+      split_file_data.aux = &split_aux;
+
+      split_aux.prev_case = xmalloc (dict_get_case_size (default_dict));
+
+      procedure_write_data.begin_func = NULL;
+      procedure_write_data.proc_func = SPLIT_FILE_proc_func;
+      procedure_write_data.end_func = end_func;
+      procedure_write_data.func_aux = &split_file_data;
     }
 
   last_vfm_invocation = time (NULL);
 
   open_active_file ();
-  vfm_source->read (procedure_write_case, &procedure_write_data);
+  if (vfm_source != NULL) 
+    vfm_source->class->read (vfm_source,
+                             proc_aux.trns_case,
+                             procedure_write_case, &procedure_write_data);
   close_active_file (&procedure_write_data);
+
+  if (split)
+    free (split_aux.prev_case);
+
+  free (proc_aux.trns_case);
+
+  assert (--recursive_call == 0);
 }
 \f
 /* Active file processing support.  Subtly different semantics from
    procedure(). */
 
-static int process_active_file_write_case (struct write_case_data *data);
+static write_case_func process_active_file_write_case;
 
-/* The casefunc might want us to stop calling it. */
+/* The case_func might want us to stop calling it. */
 static int not_canceled;
 
-/* Reads all the cases from the active file and passes them one-by-one
-   to CASEFUNC in temp_case.  Before any cases are passed, calls
-   BEGINFUNC.  After all the cases have been passed, calls ENDFUNC.
-   BEGINFUNC, CASEFUNC, and ENDFUNC can write temp_case to the output
-   file by calling process_active_file_output_case().
+/* Reads all the cases from the active file and passes them
+   one-by-one to CASE_FUNC.  Before any cases are passed, calls
+   BEGIN_FUNC.  After all the cases have been passed, calls
+   END_FUNC.  BEGIN_FUNC, CASE_FUNC, and END_FUNC can write to
+   the output file by calling process_active_file_output_case().
 
    process_active_file() ignores TEMPORARY, SPLIT FILE, and N. */
 void
-process_active_file (void (*beginfunc) (void *),
-                    int (*casefunc) (struct ccase *curcase, void *),
-                    void (*endfunc) (void *),
-                     void *aux)
+process_active_file (void (*begin_func) (void *),
+                    int (*case_func) (struct ccase *, void *),
+                    void (*end_func) (void *),
+                     void *func_aux)
 {
+  struct procedure_aux_data proc_aux;
   struct write_case_data process_active_write_data;
 
-  process_active_write_data.beginfunc = beginfunc;
-  process_active_write_data.procfunc = casefunc;
-  process_active_write_data.endfunc = endfunc;
-  process_active_write_data.aux = aux;
+  proc_aux.cases_written = 0;
+  proc_aux.trns_case = create_trns_case (default_dict);
+
+  process_active_write_data.begin_func = begin_func;
+  process_active_write_data.proc_func = case_func;
+  process_active_write_data.end_func = end_func;
+  process_active_write_data.func_aux = func_aux;
+  process_active_write_data.aux = &proc_aux;
 
   not_canceled = 1;
 
   open_active_file ();
-  beginfunc (aux);
-  
-  /* There doesn't necessarily need to be an active file. */
-  if (vfm_source)
-    vfm_source->read (process_active_file_write_case,
-                      &process_active_write_data);
-  
-  endfunc (aux);
+  begin_func (func_aux);
+  if (vfm_source != NULL)
+    vfm_source->class->read (vfm_source, proc_aux.trns_case,
+                             process_active_file_write_case,
+                             &process_active_write_data);
+  end_func (func_aux);
   close_active_file (&process_active_write_data);
 }
 
-/* Pass the current case to casefunc. */
+/* Pass the current case to case_func. */
 static int
-process_active_file_write_case (struct write_case_data *data)
+process_active_file_write_case (struct write_case_data *wc_data)
 {
-  /* Index of current transformation. */
-  int cur_trns;
+  struct procedure_aux_data *proc_aux = wc_data->aux;
+  int cur_trns;         /* Index of current transformation. */
 
-  for (cur_trns = f_trns ; cur_trns != temp_trns; )
+  for (cur_trns = f_trns; cur_trns != temp_trns; )
     {
       int code;
        
-      code = t_trns[cur_trns]->proc (t_trns[cur_trns], temp_case);
+      code = t_trns[cur_trns]->proc (t_trns[cur_trns], proc_aux->trns_case,
+                                     case_count + 1);
       switch (code)
        {
        case -1:
@@ -229,26 +266,52 @@ process_active_file_write_case (struct write_case_data *data)
     }
 
   if (n_lag)
-    lag_case ();
+    lag_case (proc_aux->trns_case);
          
   /* Call the procedure if FILTER and PROCESS IF don't prohibit it. */
-  if (not_canceled && !exclude_this_case ())
-    not_canceled = data->procfunc (temp_case, data->aux);
+  if (not_canceled && !exclude_this_case (proc_aux->trns_case, case_count + 1))
+    not_canceled = wc_data->proc_func (proc_aux->trns_case, wc_data->func_aux);
   
   case_count++;
   
  done:
-  clear_temp_case ();
+  clear_case (proc_aux->trns_case);
 
   return 1;
 }
 
-/* Write temp_case to the active file. */
+/* Write the given case to the active file. */
 void
-process_active_file_output_case (void)
+process_active_file_output_case (const struct ccase *c)
 {
-  vfm_sink_info.ncases++;
-  vfm_sink->write ();
+  vfm_sink->class->write (vfm_sink, c);
+}
+\f
+/* Creates and returns a case, initializing it from the vectors
+   that say which `value's need to be initialized just once, and
+   which ones need to be re-initialized before every case. */
+static struct ccase *
+create_trns_case (struct dictionary *dict)
+{
+  struct ccase *c = xmalloc (dict_get_case_size (dict));
+  size_t var_cnt = dict_get_var_cnt (dict);
+  size_t i;
+
+  for (i = 0; i < var_cnt; i++) 
+    {
+      struct variable *v = dict_get_var (dict, i);
+
+      if (v->type == NUMERIC) 
+        {
+          if (v->reinit)
+            c->data[v->fv].f = 0.0;
+          else
+            c->data[v->fv].f = SYSMIS;
+        }
+      else
+        memset (c->data[v->fv].s, ' ', v->width);
+    }
+  return c;
 }
 \f
 /* Opening the active file. */
@@ -260,39 +323,12 @@ process_active_file_output_case (void)
 static void
 prepare_for_writing (void)
 {
-  /* FIXME: If ALL the conditions listed below hold true, then the
-     replacement active file is guaranteed to be identical to the
-     original active file:
-
-     1. TEMPORARY was the first transformation, OR, there were no
-     transformations at all.
-
-     2. Input is not coming from an input program.
-
-     3. Compaction is not necessary.
-
-     So, in this case, we shouldn't have to replace the active
-     file--it's just a waste of time and space. */
-
-  vfm_sink_info.ncases = 0;
-  vfm_sink_info.nval = dict_get_next_value_idx (default_dict);
-  vfm_sink_info.case_size = dict_get_case_size (default_dict);
-  
   if (vfm_sink == NULL)
     {
-      if (vfm_sink_info.case_size * vfm_source_info.ncases > MAX_WORKSPACE
-         && !paging)
-       {
-         msg (MW, _("Workspace overflow predicted.  Max workspace is "
-                    "currently set to %d KB (%d cases at %d bytes each).  "
-                    "Paging active file to disk."),
-              MAX_WORKSPACE / 1024, MAX_WORKSPACE / vfm_sink_info.case_size,
-              vfm_sink_info.case_size);
-         
-         paging = 1;
-       }
-      
-      vfm_sink = paging ? &vfm_disk_stream : &vfm_memory_stream;
+      if (workspace_overflow)
+        vfm_sink = create_case_sink (&disk_sink_class, NULL);
+      else
+        vfm_sink = create_case_sink (&memory_sink_class, NULL);
     }
 }
 
@@ -328,19 +364,13 @@ arrange_compaction (void)
   else
     compaction_necessary = 0;
   
-  if (vfm_sink->init)
-    vfm_sink->init ();
-}
-
-/* Prepares the temporary case and compaction case. */
-static void
-make_temp_case (void)
-{
-  temp_case = xmalloc (vfm_sink_info.case_size);
+  if (vfm_sink->class->open != NULL)
+    vfm_sink->class->open (vfm_sink);
 
   if (compaction_necessary)
     compaction_case = xmalloc (sizeof (struct ccase)
                               + sizeof (union value) * (compaction_nval - 1));
+
 }
 
 #if DEBUGGING
@@ -362,31 +392,6 @@ index_to_varname (int ccase_index)
 }
 #endif
 
-/* Initializes temp_case from the vectors that say which `value's
-   need to be initialized just once, and which ones need to be
-   re-initialized before every case. */
-static void
-vector_initialization (void)
-{
-  size_t var_cnt = dict_get_var_cnt (default_dict);
-  size_t i;
-  
-  for (i = 0; i < var_cnt; i++) 
-    {
-      struct variable *v = dict_get_var (default_dict, i);
-
-      if (v->type == NUMERIC) 
-        {
-          if (v->reinit)
-            temp_case->data[v->fv].f = 0.0;
-          else
-            temp_case->data[v->fv].f = SYSMIS;
-        }
-      else
-        memset (temp_case->data[v->fv].s, ' ', v->width);
-    }
-}
-
 /* Sets all the lag-related variables based on value of n_lag. */
 static void
 setup_lag (void)
@@ -408,34 +413,23 @@ setup_lag (void)
    Here is each nval count, with explanation, as set up by
    open_active_file():
 
-   vfm_source_info.nval: Number of `value's in the cases returned by
-   the source stream.  This value turns out not to be very useful, but
-   we maintain it anyway.
-
-   vfm_sink_info.nval: Number of `value's in the cases after all
-   transformations have been performed.  Never less than
-   vfm_source_info.nval.
-
    temp_dict->nval: Number of `value's in the cases after the
-   transformations leading up to TEMPORARY have been performed.  If
-   TEMPORARY was not specified, this is equal to vfm_sink_info.nval.
-   Never less than vfm_sink_info.nval.
+   transformations leading up to TEMPORARY have been performed.
 
    compaction_nval: Number of `value's in the cases after the
-   transformations leading up to TEMPORARY have been performed and the
-   case has been compacted by compact_case(), if compaction is
-   necessary.  This the number of `value's in the cases saved by the
-   sink stream.  (However, note that the cases passed to the sink
-   stream have not yet been compacted.  It is the responsibility of
-   the data sink to call compact_case().)  This may be less than,
-   greater than, or equal to vfm_source_info.nval.  `compaction'
-   becomes the new value of default_dict.nval after the procedure is
-   completed.
-
-   default_dict.nval: This is often an alias for temp_dict->nval.  As
-   such it can really have no separate existence until the procedure
-   is complete.  For this reason it should *not* be referenced inside
-   the execution of a procedure. */
+   transformations leading up to TEMPORARY have been performed
+   and the case has been compacted by compact_case(), if
+   compaction is necessary.  This the number of `value's in the
+   cases saved by the sink stream.  (However, note that the cases
+   passed to the sink stream have not yet been compacted.  It is
+   the responsibility of the data sink to call compact_case().)
+   `compaction' becomes the new value of default_dict.nval after
+   the procedure is completed.
+
+   default_dict.nval: This is often an alias for temp_dict->nval.
+   As such it can really have no separate existence until the
+   procedure is complete.  For this reason it should *not* be
+   referenced inside the execution of a procedure. */
 /* Makes all preparations for reading from the data source and writing
    to the data sink. */
 static void
@@ -458,19 +452,8 @@ open_active_file (void)
   /* The rest. */
   prepare_for_writing ();
   arrange_compaction ();
-  make_temp_case ();
-  vector_initialization ();
   discard_ctl_stack ();
   setup_lag ();
-
-  /* Debug output. */
-  debug_printf (("vfm: reading from %s source, writing to %s sink.\n",
-                vfm_source->name, vfm_sink->name));
-  debug_printf (("vfm: vfm_source_info.nval=%d, vfm_sink_info.nval=%d, "
-                "temp_dict->nval=%d, compaction_nval=%d, "
-                "default_dict.nval=%d\n",
-                vfm_source_info.nval, vfm_sink_info.nval, temp_dict->nval,
-                compaction_nval, default_dict.nval));
 }
 \f
 /* Closes the active file. */
@@ -478,8 +461,8 @@ static void
 close_active_file (struct write_case_data *data)
 {
   /* Close the current case group. */
-  if (case_count && data->endfunc != NULL)
-    data->endfunc (data->aux);
+  if (case_count && data->end_func != NULL)
+    data->end_func (data->func_aux);
 
   /* Stop lagging (catch up?). */
   if (n_lag)
@@ -506,27 +489,26 @@ close_active_file (struct write_case_data *data)
     finish_compaction ();
     
   /* Old data sink --> New data source. */
-  if (vfm_source && vfm_source->destroy_source)
-    vfm_source->destroy_source ();
-  
-  vfm_source = vfm_sink;
-  vfm_source_info.ncases = vfm_sink_info.ncases;
-  vfm_source_info.nval = compaction_nval;
-  vfm_source_info.case_size = (sizeof (struct ccase)
-                              + (compaction_nval - 1) * sizeof (union value));
-  if (vfm_source->mode)
-    vfm_source->mode ();
+  if (vfm_source != NULL) 
+    {
+      if (vfm_source->class->destroy != NULL)
+        vfm_source->class->destroy (vfm_source);
+      free (vfm_source);
+    }
+
+  if (vfm_sink->class->make_source != NULL)
+    vfm_source = vfm_sink->class->make_source (vfm_sink);
+  else
+    vfm_source = NULL;
 
   /* Old data sink is gone now. */
+  free (vfm_sink);
   vfm_sink = NULL;
 
   /* Cancel TEMPORARY. */
   cancel_temporary ();
 
   /* Free temporary cases. */
-  free (temp_case);
-  temp_case = NULL;
-
   free (compaction_case);
   compaction_case = NULL;
 
@@ -546,22 +528,28 @@ close_active_file (struct write_case_data *data)
 
   /* Clear VECTOR vectors. */
   dict_clear_vectors (default_dict);
-
-  debug_printf (("vfm: procedure complete\n\n"));
 }
 \f
 /* Disk case stream. */
 
-/* Associated files. */
-FILE *disk_source_file;
-FILE *disk_sink_file;
+/* Information about disk sink or source. */
+struct disk_stream_info 
+  {
+    FILE *file;                 /* Output file. */
+    size_t case_cnt;            /* Number of cases written so far. */
+    size_t case_size;           /* Number of bytes in case. */
+  };
 
 /* Initializes the disk sink. */
 static void
-disk_stream_init (void)
+disk_sink_create (struct case_sink *sink)
 {
-  disk_sink_file = tmpfile ();
-  if (!disk_sink_file)
+  struct disk_stream_info *info = xmalloc (sizeof *info);
+  info->file = tmpfile ();
+  info->case_cnt = 0;
+  info->case_size = compaction_nval;
+  sink->aux = info;
+  if (info->file == NULL)
     {
       msg (ME, _("An error occurred attempting to create a temporary "
                 "file for use as the active file: %s."),
@@ -570,44 +558,23 @@ disk_stream_init (void)
     }
 }
 
-/* Reads all cases from the disk source and passes them one by one to
-   write_case(). */
-static void
-disk_stream_read (write_case_func *write_case, write_case_data wc_data)
-{
-  int i;
-
-  for (i = 0; i < vfm_source_info.ncases; i++)
-    {
-      if (!fread (temp_case, vfm_source_info.case_size, 1, disk_source_file))
-       {
-         msg (ME, _("An error occurred while attempting to read from "
-              "a temporary file created for the active file: %s."),
-              strerror (errno));
-         err_failure ();
-         return;
-       }
-
-      if (!write_case (wc_data))
-       return;
-    }
-}
-
-/* Writes temp_case to the disk sink. */
+/* Writes case C to the disk sink. */
 static void
-disk_stream_write (void)
+disk_sink_write (struct case_sink *sink, const struct ccase *c)
 {
-  union value *src_case;
+  struct disk_stream_info *info = sink->aux;
+  const union value *src_case;
 
   if (compaction_necessary)
     {
-      compact_case (compaction_case, temp_case);
-      src_case = (union value *) compaction_case;
+      compact_case (compaction_case, c);
+      src_case = compaction_case->data;
     }
-  else src_case = (union value *) temp_case;
+  else src_case = c->data;
 
+  info->case_cnt++;
   if (fwrite (src_case, sizeof *src_case * compaction_nval, 1,
-             disk_sink_file) != 1)
+              info->file) != 1)
     {
       msg (ME, _("An error occurred while attempting to write to a "
                 "temporary file used as the active file: %s."),
@@ -616,12 +583,25 @@ disk_stream_write (void)
     }
 }
 
-/* Switches the stream from a sink to a source. */
+/* Destroys the sink's internal data. */
 static void
-disk_stream_mode (void)
+disk_sink_destroy (struct case_sink *sink)
+{
+  struct disk_stream_info *info = sink->aux;
+  if (info->file != NULL)
+    fclose (info->file);
+}
+
+/* Closes and destroys the sink and returns a disk source to read
+   back the written data. */
+static struct case_source *
+disk_sink_make_source (struct case_sink *sink) 
 {
-  /* Rewind the sink. */
-  if (fseek (disk_sink_file, 0, SEEK_SET) != 0)
+  struct disk_stream_info *info = sink->aux;
+    
+  /* Rewind the file. */
+  assert (info->file != NULL);
+  if (fseek (info->file, 0, SEEK_SET) != 0)
     {
       msg (ME, _("An error occurred while attempting to rewind a "
                 "temporary file used as the active file: %s."),
@@ -629,107 +609,141 @@ disk_stream_mode (void)
       err_failure ();
     }
   
-  /* Sink --> source variables. */
-  disk_source_file = disk_sink_file;
+  return create_case_source (&disk_source_class, default_dict, info);
 }
 
-/* Destroys the source's internal data. */
+/* Disk sink. */
+const struct case_sink_class disk_sink_class = 
+  {
+    "disk",
+    disk_sink_create,
+    disk_sink_write,
+    disk_sink_destroy,
+    disk_sink_make_source,
+  };
+\f
+/* Disk source. */
+
+/* Returns the number of cases that will be read by
+   disk_source_read(). */
+static int
+disk_source_count (const struct case_source *source) 
+{
+  struct disk_stream_info *info = source->aux;
+
+  return info->case_cnt;
+}
+
+/* Reads all cases from the disk source and passes them one by one to
+   write_case(). */
 static void
-disk_stream_destroy_source (void)
+disk_source_read (struct case_source *source,
+                  struct ccase *c,
+                  write_case_func *write_case, write_case_data wc_data)
 {
-  if (disk_source_file)
+  struct disk_stream_info *info = source->aux;
+  int i;
+
+  for (i = 0; i < info->case_cnt; i++)
     {
-      fclose (disk_source_file);
-      disk_source_file = NULL;
+      if (!fread (c, info->case_size, 1, info->file))
+       {
+         msg (ME, _("An error occurred while attempting to read from "
+              "a temporary file created for the active file: %s."),
+              strerror (errno));
+         err_failure ();
+         break;
+       }
+
+      if (!write_case (wc_data))
+       break;
     }
 }
 
-/* Destroys the sink's internal data. */
+/* Destroys the source's internal data. */
 static void
-disk_stream_destroy_sink (void)
+disk_source_destroy (struct case_source *source)
 {
-  if (disk_sink_file)
-    {
-      fclose (disk_sink_file);
-      disk_sink_file = NULL;
-    }
+  struct disk_stream_info *info = source->aux;
+  if (info->file != NULL)
+    fclose (info->file);
+  free (info);
 }
 
-/* Disk stream. */
-struct case_stream vfm_disk_stream = 
+/* Disk source. */
+const struct case_source_class disk_source_class = 
   {
-    disk_stream_init,
-    disk_stream_read,
-    disk_stream_write,
-    disk_stream_mode,
-    disk_stream_destroy_source,
-    disk_stream_destroy_sink,
     "disk",
+    disk_source_count,
+    disk_source_read,
+    disk_source_destroy,
   };
 \f
 /* Memory case stream. */
 
-/* List of cases stored in the stream. */
-struct case_list *memory_source_cases;
-struct case_list *memory_sink_cases;
-
-/* Current case. */
-struct case_list *memory_sink_iter;
+/* Memory sink data. */
+struct memory_sink_info
+  {
+    size_t case_cnt;            /* Number of cases. */
+    size_t case_size;           /* Case size in bytes. */
+    int max_cases;              /* Maximum cases before switching to disk. */
+    struct case_list *head;     /* First case in list. */
+    struct case_list *tail;     /* Last case in list. */
+  };
 
-/* Maximum number of cases. */
-int memory_sink_max_cases;
+/* Memory source data. */
+struct memory_source_info 
+  {
+    size_t case_cnt;            /* Number of cases. */
+    size_t case_size;           /* Case size in bytes. */
+    struct case_list *cases;    /* List of cases. */
+  };
 
-/* Initializes the memory stream variables for writing. */
+/* Creates the SINK memory sink. */
 static void
-memory_stream_init (void)
+memory_sink_create (struct case_sink *sink) 
 {
-  memory_sink_cases = NULL;
-  memory_sink_iter = NULL;
+  struct memory_sink_info *info;
   
-  assert (compaction_nval);
-  memory_sink_max_cases = MAX_WORKSPACE / (sizeof (union value) * compaction_nval);
-}
+  sink->aux = info = xmalloc (sizeof *info);
 
-/* Reads the case stream from memory and passes it to write_case(). */
-static void
-memory_stream_read (write_case_func *write_case, write_case_data wc_data)
-{
-  while (memory_source_cases != NULL)
-    {
-      memcpy (temp_case, &memory_source_cases->c, vfm_source_info.case_size);
-      
-      {
-       struct case_list *current = memory_source_cases;
-       memory_source_cases = memory_source_cases->next;
-       free (current);
-      }
-      
-      if (!write_case (wc_data))
-       return;
-    }
+  assert (compaction_nval > 0);
+  info->case_cnt = 0;
+  info->case_size = compaction_nval * sizeof (union value);
+  info->max_cases = set_max_workspace / info->case_size;
+  info->head = info->tail = NULL;
 }
 
-/* Writes temp_case to the memory stream. */
+/* Writes case C to memory sink SINK. */
 static void
-memory_stream_write (void)
+memory_sink_write (struct case_sink *sink, const struct ccase *c) 
 {
-  struct case_list *new_case = malloc (sizeof (struct case_list)
-                                      + ((compaction_nval - 1)
-                                         * sizeof (union value)));
+  struct memory_sink_info *info = sink->aux;
+  size_t case_size;
+  struct case_list *new_case;
+
+  case_size = sizeof (struct case_list)
+                      + ((compaction_nval - 1) * sizeof (union value));
+  new_case = malloc (case_size);
 
   /* If we've got memory to spare then add it to the linked list. */
-  if (vfm_sink_info.ncases <= memory_sink_max_cases && new_case != NULL)
+  if (info->case_cnt <= info->max_cases && new_case != NULL)
     {
-      if (compaction_necessary)
-       compact_case (&new_case->c, temp_case);
-      else
-       memcpy (&new_case->c, temp_case, sizeof (union value) * compaction_nval);
+      info->case_cnt++;
 
       /* Append case to linked list. */
-      if (memory_sink_cases)
-       memory_sink_iter = memory_sink_iter->next = new_case;
+      new_case->next = NULL;
+      if (info->head != NULL)
+        info->tail->next = new_case;
       else
-       memory_sink_iter = memory_sink_cases = new_case;
+        info->head = new_case;
+      info->tail = new_case;
+
+      /* Copy data into case. */
+      if (compaction_necessary)
+       compact_case (&new_case->c, c);
+      else
+       memcpy (&new_case->c, c, sizeof (union value) * compaction_nval);
     }
   else
     {
@@ -738,36 +752,32 @@ memory_stream_write (void)
 
       /* Notify the user. */
       if (!new_case)
-       msg (MW, _("Virtual memory exhausted.  Paging active file "
+       msg (MW, _("Virtual memory exhausted.  Writing active file "
                   "to disk."));
       else
        msg (MW, _("Workspace limit of %d KB (%d cases at %d bytes each) "
-                  "overflowed.  Paging active file to disk."),
-            MAX_WORKSPACE / 1024, memory_sink_max_cases,
+                  "overflowed.  Writing active file to disk."),
+            set_max_workspace / 1024, info->max_cases,
             compaction_nval * sizeof (union value));
 
       free (new_case);
 
       /* Switch to a disk sink. */
-      vfm_sink = &vfm_disk_stream;
-      vfm_sink->init ();
-      paging = 1;
-
-      /* Terminate the list. */
-      if (memory_sink_iter)
-       memory_sink_iter->next = NULL;
+      vfm_sink = create_case_sink (&disk_sink_class, NULL);
+      vfm_sink->class->open (vfm_sink);
+      workspace_overflow = 1;
 
       /* Write the cases to disk and destroy them.  We can't call
          vfm->sink->write() because of compaction. */
-      for (cur = memory_sink_cases; cur; cur = next)
+      for (cur = info->head; cur; cur = next)
        {
          next = cur->next;
          if (fwrite (cur->c.data, sizeof (union value) * compaction_nval, 1,
-                     disk_sink_file) != 1)
+                     vfm_sink->aux) != 1)
            {
              msg (ME, _("An error occurred while attempting to "
                         "write to a temporary file created as the "
-                        "active file, while paging to disk: %s."),
+                        "active file: %s."),
                   strerror (errno));
              err_failure ();
            }
@@ -775,36 +785,38 @@ memory_stream_write (void)
        }
 
       /* Write the current case to disk. */
-      vfm_sink->write ();
+      vfm_sink->class->write (vfm_sink, c);
     }
 }
 
 /* If the data is stored in memory, causes it to be written to disk.
    To be called only *between* procedure()s, not within them. */
 void
-page_to_disk (void)
+write_active_file_to_disk (void)
 {
-  if (vfm_source == &vfm_memory_stream)
+  if (case_source_is_class (vfm_source, &memory_source_class))
     {
+      struct memory_source_info *info = vfm_source->aux;
+
       /* Switch to a disk sink. */
-      vfm_sink = &vfm_disk_stream;
-      vfm_sink->init ();
-      paging = 1;
+      vfm_sink = create_case_sink (&disk_sink_class, NULL);
+      vfm_sink->class->open (vfm_sink);
+      workspace_overflow = 1;
       
       /* Write the cases to disk and destroy them.  We can't call
          vfm->sink->write() because of compaction. */
       {
        struct case_list *cur, *next;
        
-       for (cur = memory_source_cases; cur; cur = next)
+       for (cur = info->cases; cur; cur = next)
          {
            next = cur->next;
            if (fwrite (cur->c.data, sizeof *cur->c.data * compaction_nval, 1,
-                       disk_sink_file) != 1)
+                       vfm_sink->aux) != 1)
              {
                msg (ME, _("An error occurred while attempting to "
                           "write to a temporary file created as the "
-                          "active file, while paging to disk: %s."),
+                          "active file: %s."),
                     strerror (errno));
                err_failure ();
              }
@@ -812,76 +824,132 @@ page_to_disk (void)
          }
       }
       
-      vfm_source = &vfm_disk_stream;
-      vfm_source->mode ();
-
+      vfm_source = vfm_sink->class->make_source (vfm_sink);
       vfm_sink = NULL;
     }
 }
 
-/* Switch the memory stream from sink to source mode. */
+/* Destroy all memory sink data. */
 static void
-memory_stream_mode (void)
+memory_sink_destroy (struct case_sink *sink)
+{
+  struct memory_sink_info *info = sink->aux;
+  struct case_list *cur, *next;
+  
+  for (cur = info->head; cur; cur = next)
+    {
+      next = cur->next;
+      free (cur);
+    }
+  free (info);
+}
+
+/* Switch the memory stream from sink to source mode. */
+static struct case_source *
+memory_sink_make_source (struct case_sink *sink)
 {
-  /* Terminate the list. */
-  if (memory_sink_iter)
-    memory_sink_iter->next = NULL;
+  struct memory_sink_info *sink_info = sink->aux;
+  struct memory_source_info *source_info;
+
+  source_info = xmalloc (sizeof *source_info);
+  source_info->case_cnt = sink_info->case_cnt;
+  source_info->case_size = sink_info->case_size;
+  source_info->cases = sink_info->head;
 
-  /* Sink --> source variables. */
-  memory_source_cases = memory_sink_cases;
-  memory_sink_cases = NULL;
+  free (sink_info);
+
+  return create_case_source (&memory_source_class,
+                             default_dict, source_info);
 }
 
-/* Destroy all memory source data. */
+const struct case_sink_class memory_sink_class = 
+  {
+    "memory",
+    memory_sink_create,
+    memory_sink_write,
+    memory_sink_destroy,
+    memory_sink_make_source,
+  };
+
+/* Returns the number of cases in the source. */
+static int
+memory_source_count (const struct case_source *source) 
+{
+  struct memory_source_info *info = source->aux;
+
+  return info->case_cnt;
+}
+
+/* Reads the case stream from memory and passes it to write_case(). */
 static void
-memory_stream_destroy_source (void)
+memory_source_read (struct case_source *source,
+                    struct ccase *c,
+                    write_case_func *write_case, write_case_data wc_data)
 {
-  struct case_list *cur, *next;
-  
-  for (cur = memory_source_cases; cur; cur = next)
+  struct memory_source_info *info = source->aux;
+
+  while (info->cases != NULL) 
     {
-      next = cur->next;
-      free (cur);
+      struct case_list *iter = info->cases;
+      memcpy (c, &iter->c, info->case_size);
+      if (!write_case (wc_data)) 
+        break;
+            
+      info->cases = iter->next;
+      free (iter);
     }
-  memory_source_cases = NULL;
 }
 
-/* Destroy all memory sink data. */
+/* Destroy all memory source data. */
 static void
-memory_stream_destroy_sink (void)
+memory_source_destroy (struct case_source *source)
 {
+  struct memory_source_info *info = source->aux;
   struct case_list *cur, *next;
   
-  for (cur = memory_sink_cases; cur; cur = next)
+  for (cur = info->cases; cur; cur = next)
     {
       next = cur->next;
       free (cur);
     }
-  memory_sink_cases = NULL;
+  free (info);
 }
-  
+
+/* Returns the list of cases in memory source SOURCE. */
+struct case_list *
+memory_source_get_cases (const struct case_source *source) 
+{
+  struct memory_source_info *info = source->aux;
+
+  return info->cases;
+}
+
+/* Sets the list of cases in memory source SOURCE to CASES. */
+void
+memory_source_set_cases (const struct case_source *source,
+                         struct case_list *cases) 
+{
+  struct memory_source_info *info = source->aux;
+
+  info->cases = cases;
+}
+
 /* Memory stream. */
-struct case_stream vfm_memory_stream = 
+const struct case_source_class memory_source_class = 
   {
-    memory_stream_init,
-    memory_stream_read,
-    memory_stream_write,
-    memory_stream_mode,
-    memory_stream_destroy_source,
-    memory_stream_destroy_sink,
     "memory",
+    memory_source_count,
+    memory_source_read,
+    memory_source_destroy,
   };
 \f
-#include "debug-print.h"
-
-/* Add temp_case to the lag queue. */
+/* Add C to the lag queue. */
 static void
-lag_case (void)
+lag_case (const struct ccase *c)
 {
   if (lag_count < n_lag)
     lag_count++;
-  memcpy (lag_queue[lag_head], temp_case,
-          dict_get_case_size (temp_dict));
+  memcpy (lag_queue[lag_head], c, dict_get_case_size (temp_dict));
   if (++lag_head >= n_lag)
     lag_head = 0;
 }
@@ -903,51 +971,50 @@ lagged_case (int n_before)
   }
 }
    
-/* Transforms temp_case and writes it to the replacement active file
-   if advisable.  Returns nonzero if more cases can be accepted, zero
-   otherwise.  Do not call this function again after it has returned
-   zero once.  */
+/* Transforms trns_case and writes it to the replacement active
+   file if advisable.  Returns nonzero if more cases can be
+   accepted, zero otherwise.  Do not call this function again
+   after it has returned zero once.  */
 int
 procedure_write_case (write_case_data wc_data)
 {
+  struct procedure_aux_data *proc_aux = wc_data->aux;
+
   /* Index of current transformation. */
   int cur_trns;
 
   /* Return value: whether it's reasonable to write any more cases. */
   int more_cases = 1;
 
-  debug_printf ((_("transform: ")));
-
   cur_trns = f_trns;
   for (;;)
     {
       /* Output the case if this is temp_trns. */
       if (cur_trns == temp_trns)
        {
-         debug_printf (("REC"));
+          int case_limit;
 
          if (n_lag)
-           lag_case ();
+           lag_case (proc_aux->trns_case);
          
-         vfm_sink_info.ncases++;
-         vfm_sink->write ();
+         vfm_sink->class->write (vfm_sink, proc_aux->trns_case);
 
-         if (dict_get_case_limit (default_dict))
-           more_cases = (vfm_sink_info.ncases
-                          < dict_get_case_limit (default_dict));
+          proc_aux->cases_written++;
+          case_limit = dict_get_case_limit (default_dict);
+         if (case_limit != 0 && proc_aux->cases_written >= case_limit)
+            more_cases = 0;
        }
 
       /* Are we done? */
       if (cur_trns >= n_trns)
        break;
       
-      debug_printf (("$%d", cur_trns));
-
       /* Decide which transformation should come next. */
       {
        int code;
        
-       code = t_trns[cur_trns]->proc (t_trns[cur_trns], temp_case);
+       code = t_trns[cur_trns]->proc (t_trns[cur_trns], proc_aux->trns_case,
+                                       proc_aux->cases_written + 1);
        switch (code)
          {
          case -1:
@@ -966,29 +1033,28 @@ procedure_write_case (write_case_data wc_data)
     }
 
   /* Call the beginning of group function. */
-  if (!case_count && wc_data->beginfunc != NULL)
-    wc_data->beginfunc (wc_data->aux);
+  if (!case_count && wc_data->begin_func != NULL)
+    wc_data->begin_func (wc_data->func_aux);
 
   /* Call the procedure if there is one and FILTER and PROCESS IF
      don't prohibit it. */
-  if (wc_data->procfunc != NULL && !exclude_this_case ())
-    wc_data->procfunc (temp_case, wc_data->aux);
+  if (wc_data->proc_func != NULL
+      && !exclude_this_case (proc_aux->trns_case, proc_aux->cases_written + 1))
+    wc_data->proc_func (proc_aux->trns_case, wc_data->func_aux);
 
   case_count++;
   
 done:
-  debug_putc ('\n', stdout);
-
-  clear_temp_case ();
+  clear_case (proc_aux->trns_case);
   
   /* Return previously determined value. */
   return more_cases;
 }
 
-/* Clears the variables in the temporary case that need to be
-   cleared between processing cases.  */
+/* Clears the variables in C that need to be cleared between
+   processing cases.  */
 static void
-clear_temp_case (void)
+clear_case (struct ccase *c)
 {
   /* FIXME?  This is linear in the number of variables, but
      doesn't need to be, so it's an easy optimization target. */
@@ -1001,30 +1067,31 @@ clear_temp_case (void)
       if (v->init && v->reinit) 
         {
           if (v->type == NUMERIC) 
-            temp_case->data[v->fv].f = SYSMIS;
+            c->data[v->fv].f = SYSMIS;
           else
-            memset (temp_case->data[v->fv].s, ' ', v->width);
+            memset (c->data[v->fv].s, ' ', v->width);
         } 
     }
 }
 
-/* Returns nonzero if this case should be exclude as specified on
-   FILTER or PROCESS IF, otherwise zero. */
+/* Returns nonzero if case C with case number CASE_NUM should be
+   exclude as specified on FILTER or PROCESS IF, otherwise
+   zero. */
 static int
-exclude_this_case (void)
+exclude_this_case (const struct ccase *c, int case_num)
 {
   /* FILTER. */
   struct variable *filter_var = dict_get_filter (default_dict);
   if (filter_var != NULL) 
     {
-      double f = temp_case->data[filter_var->fv].f;
+      double f = c->data[filter_var->fv].f;
       if (f == 0.0 || f == SYSMIS || is_num_user_missing (f, filter_var))
         return 1;
     }
 
   /* PROCESS IF. */
   if (process_if_expr != NULL
-      && expr_evaluate (process_if_expr, temp_case, NULL) != 1.0)
+      && expr_evaluate (process_if_expr, c, case_num, NULL) != 1.0)
     return 1;
 
   return 0;
@@ -1104,14 +1171,14 @@ dump_splits (struct ccase *c)
   tab_submit (t);
 }
 
-/* This procfunc is substituted for the user-supplied procfunc when
+/* This proc_func is substituted for the user-supplied proc_func when
    SPLIT FILE is active.  This function forms a wrapper around that
-   procfunc by dividing the input into series. */
+   proc_func by dividing the input into series. */
 static int
-SPLIT_FILE_procfunc (struct ccase *c, void *data_)
+SPLIT_FILE_proc_func (struct ccase *c, void *data_)
 {
   struct write_case_data *data = data_;
-  static struct ccase *prev_case;
+  struct split_aux_data *split_aux = data->aux;
   struct variable *const *split;
   size_t split_cnt;
   size_t i;
@@ -1120,16 +1187,13 @@ SPLIT_FILE_procfunc (struct ccase *c, void *data_)
      preserve the values of the case for later comparison. */
   if (case_count == 0)
     {
-      if (prev_case)
-       free (prev_case);
-      prev_case = xmalloc (vfm_sink_info.case_size);
-      memcpy (prev_case, c, vfm_sink_info.case_size);
+      memcpy (split_aux->prev_case, c, dict_get_case_size (default_dict));
 
       dump_splits (c);
-      if (data->beginfunc != NULL)
-       data->beginfunc (data->aux);
+      if (data->begin_func != NULL)
+       data->begin_func (data->func_aux);
       
-      return data->procfunc (c, data->aux);
+      return data->proc_func (c, data->func_aux);
     }
 
   /* Compare the value of each SPLIT FILE variable to the values on
@@ -1143,30 +1207,31 @@ SPLIT_FILE_procfunc (struct ccase *c, void *data_)
       switch (v->type)
        {
        case NUMERIC:
-         if (approx_ne (c->data[v->fv].f, prev_case->data[v->fv].f))
+         if (c->data[v->fv].f != split_aux->prev_case->data[v->fv].f)
            goto not_equal;
          break;
        case ALPHA:
-         if (memcmp (c->data[v->fv].s, prev_case->data[v->fv].s, v->width))
+         if (memcmp (c->data[v->fv].s,
+                      split_aux->prev_case->data[v->fv].s, v->width))
            goto not_equal;
          break;
        default:
          assert (0);
        }
     }
-  return data->procfunc (c, data->aux);
+  return data->proc_func (c, data->func_aux);
   
 not_equal:
   /* The values of the SPLIT FILE variable are different from the
      values on the previous case.  That means that it's time to begin
      a new series. */
-  if (data->endfunc != NULL)
-    data->endfunc (data->aux);
+  if (data->end_func != NULL)
+    data->end_func (data->func_aux);
   dump_splits (c);
-  if (data->beginfunc != NULL)
-    data->beginfunc (data->aux);
-  memcpy (prev_case, c, vfm_sink_info.case_size);
-  return data->procfunc (c, data->aux);
+  if (data->begin_func != NULL)
+    data->begin_func (data->func_aux);
+  memcpy (split_aux->prev_case, c, dict_get_case_size (default_dict));
+  return data->proc_func (c, data->func_aux);
 }
 \f
 /* Case compaction. */
@@ -1228,4 +1293,43 @@ finish_compaction (void)
   dict_compact_values (default_dict);
 }
 
-  
+/* Creates a case source with class CLASS and auxiliary data AUX
+   and based on dictionary DICT. */
+struct case_source *
+create_case_source (const struct case_source_class *class,
+                    const struct dictionary *dict,
+                    void *aux) 
+{
+  struct case_source *source = xmalloc (sizeof *source);
+  source->class = class;
+  source->value_cnt = dict_get_next_value_idx (dict);
+  source->aux = aux;
+  return source;
+}
+
+/* Returns nonzero if a case source is "complex". */
+int
+case_source_is_complex (const struct case_source *source) 
+{
+  return source != NULL && (source->class == &input_program_source_class
+                            || source->class == &file_type_source_class);
+}
+
+/* Returns nonzero if CLASS is the class of SOURCE. */
+int
+case_source_is_class (const struct case_source *source,
+                      const struct case_source_class *class) 
+{
+  return source != NULL && source->class == class;
+}
+
+/* Creates a case sink with class CLASS and auxiliary data
+   AUX. */
+struct case_sink *
+create_case_sink (const struct case_sink_class *class, void *aux) 
+{
+  struct case_sink *sink = xmalloc (sizeof *sink);
+  sink->class = class;
+  sink->aux = aux;
+  return sink;
+}