Oops - revert change that shouldn't have been applied.
[pspp] / src / vfm.c
index 45d9c28e8b436284bc795bd033b4479cf8a62433..82af200015d6aa40dc19f4fb6215d8a7c0480201 100644 (file)
--- a/src/vfm.c
+++ b/src/vfm.c
 #include <unistd.h>    /* Required by SunOS4. */
 #endif
 #include "alloc.h"
 #include <unistd.h>    /* Required by SunOS4. */
 #endif
 #include "alloc.h"
-#include "approx.h"
 #include "do-ifP.h"
 #include "error.h"
 #include "expr.h"
 #include "misc.h"
 #include "random.h"
 #include "do-ifP.h"
 #include "error.h"
 #include "expr.h"
 #include "misc.h"
 #include "random.h"
+#include "settings.h"
 #include "som.h"
 #include "str.h"
 #include "tab.h"
 #include "som.h"
 #include "str.h"
 #include "tab.h"
 /*
    Virtual File Manager (vfm):
 
 /*
    Virtual File Manager (vfm):
 
-   vfm is used to process data files.  It uses the model that data is
-   read from one stream (the data source), then written to another
-   (the data sink).  The data source is then deleted and the data sink
-   becomes the data source for the next procedure. */
+   vfm is used to process data files.  It uses the model that
+   data is read from one stream (the data source), processed,
+   then written to another (the data sink).  The data source is
+   then deleted and the data sink becomes the data source for the
+   next procedure. */
 
 #include "debug-print.h"
 
 
 #include "debug-print.h"
 
@@ -59,11 +60,11 @@ struct write_case_data
     void *aux;
   };
 
     void *aux;
   };
 
-/* This is used to read from the active file. */
-struct case_stream *vfm_source;
+/* The current active file, from which cases are read. */
+struct case_source *vfm_source;
 
 
-/* This is used to write to the replacement active file. */
-struct case_stream *vfm_sink;
+/* The replacement active file, to which cases are written. */
+struct case_sink *vfm_sink;
 
 /* Information about the data source. */
 struct stream_info vfm_source_info;
 
 /* Information about the data source. */
 struct stream_info vfm_source_info;
@@ -71,17 +72,6 @@ struct stream_info vfm_source_info;
 /* Information about the data sink. */
 struct stream_info vfm_sink_info;
 
 /* Information about the data sink. */
 struct stream_info vfm_sink_info;
 
-/* Filter variable and  `value' index. */
-static struct variable *filter_var;
-static int filter_index;
-
-#define FILTERED                                                       \
-       (filter_index != -1                                             \
-        && (temp_case->data[filter_index].f == 0.0                     \
-            || temp_case->data[filter_index].f == SYSMIS               \
-            || is_num_user_missing (temp_case->data[filter_index].f,   \
-                                    filter_var)))
-
 /* Nonzero if the case needs to have values deleted before being
    stored, zero otherwise. */
 int compaction_necessary;
 /* Nonzero if the case needs to have values deleted before being
    stored, zero otherwise. */
 int compaction_necessary;
@@ -94,9 +84,11 @@ int compaction_nval;
    `value's. */
 struct ccase *compaction_case;
 
    `value's. */
 struct ccase *compaction_case;
 
-/* Within a session, when paging is turned on, it is never turned back
-   off.  This policy might be too aggressive. */
-static int paging = 0;
+/* Nonzero means that we've overflowed our allotted workspace.
+   After that happens once during a session, we always store the
+   active file on disk instead of in memory.  (This policy may be
+   too aggressive.) */
+static int workspace_overflow = 0;
 
 /* Time at which vfm was last invoked. */
 time_t last_vfm_invocation;
 
 /* Time at which vfm was last invoked. */
 time_t last_vfm_invocation;
@@ -117,12 +109,13 @@ static void finish_compaction (void);
 static void lag_case (void);
 static int procedure_write_case (struct write_case_data *);
 static void clear_temp_case (void);
 static void lag_case (void);
 static int procedure_write_case (struct write_case_data *);
 static void clear_temp_case (void);
+static int exclude_this_case (void);
 \f
 /* Public functions. */
 
 /* Reads all the cases from the active file, transforms them by
    the active set of transformations, calls PROCFUNC with CURCASE
 \f
 /* Public functions. */
 
 /* Reads all the cases from the active file, transforms them by
    the active set of transformations, calls PROCFUNC with CURCASE
-   set to the case , and writes them to a new active file.
+   set to the case, and writes them to a new active file.
 
    Divides the active file into zero or more series of one or more
    cases each.  BEGINFUNC is called before each series.  ENDFUNC is
 
    Divides the active file into zero or more series of one or more
    cases each.  BEGINFUNC is called before each series.  ENDFUNC is
@@ -136,9 +129,13 @@ procedure (void (*beginfunc) (void *),
           void (*endfunc) (void *),
            void *aux)
 {
           void (*endfunc) (void *),
            void *aux)
 {
+  static int recursive_call;
+
   struct write_case_data procedure_write_data;
   struct write_case_data split_file_data;
 
   struct write_case_data procedure_write_data;
   struct write_case_data split_file_data;
 
+  assert (++recursive_call == 1);
+
   if (dict_get_split_cnt (default_dict) == 0) 
     {
       /* Normally we just use the data passed by the user. */
   if (dict_get_split_cnt (default_dict) == 0) 
     {
       /* Normally we just use the data passed by the user. */
@@ -164,8 +161,12 @@ procedure (void (*beginfunc) (void *),
   last_vfm_invocation = time (NULL);
 
   open_active_file ();
   last_vfm_invocation = time (NULL);
 
   open_active_file ();
-  vfm_source->read (procedure_write_case, &procedure_write_data);
+  if (vfm_source != NULL) 
+    vfm_source->class->read (vfm_source,
+                             procedure_write_case, &procedure_write_data);
   close_active_file (&procedure_write_data);
   close_active_file (&procedure_write_data);
+
+  assert (--recursive_call == 0);
 }
 \f
 /* Active file processing support.  Subtly different semantics from
 }
 \f
 /* Active file processing support.  Subtly different semantics from
@@ -202,9 +203,9 @@ process_active_file (void (*beginfunc) (void *),
   beginfunc (aux);
   
   /* There doesn't necessarily need to be an active file. */
   beginfunc (aux);
   
   /* There doesn't necessarily need to be an active file. */
-  if (vfm_source)
-    vfm_source->read (process_active_file_write_case,
-                      &process_active_write_data);
+  if (vfm_source != NULL)
+    vfm_source->class->read (vfm_source, process_active_file_write_case,
+                             &process_active_write_data);
   
   endfunc (aux);
   close_active_file (&process_active_write_data);
   
   endfunc (aux);
   close_active_file (&process_active_write_data);
@@ -242,10 +243,7 @@ process_active_file_write_case (struct write_case_data *data)
     lag_case ();
          
   /* Call the procedure if FILTER and PROCESS IF don't prohibit it. */
     lag_case ();
          
   /* Call the procedure if FILTER and PROCESS IF don't prohibit it. */
-  if (not_canceled
-      && !FILTERED
-      && (process_if_expr == NULL ||
-         expr_evaluate (process_if_expr, temp_case, NULL) == 1.0))
+  if (not_canceled && !exclude_this_case ())
     not_canceled = data->procfunc (temp_case, data->aux);
   
   case_count++;
     not_canceled = data->procfunc (temp_case, data->aux);
   
   case_count++;
@@ -261,7 +259,7 @@ void
 process_active_file_output_case (void)
 {
   vfm_sink_info.ncases++;
 process_active_file_output_case (void)
 {
   vfm_sink_info.ncases++;
-  vfm_sink->write ();
+  vfm_sink->class->write (vfm_sink, temp_case);
 }
 \f
 /* Opening the active file. */
 }
 \f
 /* Opening the active file. */
@@ -288,26 +286,27 @@ prepare_for_writing (void)
      file--it's just a waste of time and space. */
 
   vfm_sink_info.ncases = 0;
      file--it's just a waste of time and space. */
 
   vfm_sink_info.ncases = 0;
-  vfm_sink_info.nval = dict_get_value_cnt (default_dict);
-  vfm_sink_info.case_size = (sizeof (struct ccase)
-                            + ((dict_get_value_cnt (default_dict) - 1)
-                                * sizeof (union value)));
+  vfm_sink_info.nval = dict_get_next_value_idx (default_dict);
+  vfm_sink_info.case_size = dict_get_case_size (default_dict);
   
   if (vfm_sink == NULL)
     {
   
   if (vfm_sink == NULL)
     {
-      if (vfm_sink_info.case_size * vfm_source_info.ncases > MAX_WORKSPACE
-         && !paging)
+      if (vfm_sink_info.case_size * vfm_source_info.ncases > set_max_workspace
+         && !workspace_overflow)
        {
          msg (MW, _("Workspace overflow predicted.  Max workspace is "
                     "currently set to %d KB (%d cases at %d bytes each).  "
        {
          msg (MW, _("Workspace overflow predicted.  Max workspace is "
                     "currently set to %d KB (%d cases at %d bytes each).  "
-                    "Paging active file to disk."),
-              MAX_WORKSPACE / 1024, MAX_WORKSPACE / vfm_sink_info.case_size,
+                    "Writing active file to disk."),
+              set_max_workspace / 1024, set_max_workspace / vfm_sink_info.case_size,
               vfm_sink_info.case_size);
          
               vfm_sink_info.case_size);
          
-         paging = 1;
+         workspace_overflow = 1;
        }
        }
-      
-      vfm_sink = paging ? &vfm_disk_stream : &vfm_memory_stream;
+
+      if (workspace_overflow)
+        vfm_sink = create_case_sink (&disk_sink_class, NULL);
+      else
+        vfm_sink = create_case_sink (&memory_sink_class, NULL);
     }
 }
 
     }
 }
 
@@ -331,17 +330,20 @@ arrange_compaction (void)
             count_values += v->nv;
           } 
       }
             count_values += v->nv;
           } 
       }
-    assert (temporary == 2 || count_values <= dict_get_value_cnt (temp_dict));
+    assert (temporary == 2
+            || count_values <= dict_get_next_value_idx (temp_dict));
   }
   
   /* Compaction is only necessary if the number of `value's to output
      differs from the number already present. */
   compaction_nval = count_values;
   }
   
   /* Compaction is only necessary if the number of `value's to output
      differs from the number already present. */
   compaction_nval = count_values;
-  compaction_necessary = (temporary == 2
-                          || count_values != dict_get_value_cnt (temp_dict));
+  if (temporary == 2 || count_values != dict_get_next_value_idx (temp_dict))
+    compaction_necessary = 1;
+  else
+    compaction_necessary = 0;
   
   
-  if (vfm_sink->init)
-    vfm_sink->init ();
+  if (vfm_sink->class->open != NULL)
+    vfm_sink->class->open (vfm_sink);
 }
 
 /* Prepares the temporary case and compaction case. */
 }
 
 /* Prepares the temporary case and compaction case. */
@@ -399,21 +401,6 @@ vector_initialization (void)
     }
 }
 
     }
 }
 
-/* Sets filter_index to an appropriate value. */
-static void
-setup_filter (void)
-{
-  filter_var = dict_get_filter (default_dict);
-  
-  if (filter_var != NULL)
-    {
-      assert (filter_var->type == NUMERIC);
-      filter_index = filter_var->index;
-    } else {
-      filter_index = -1;
-    }
-}
-
 /* Sets all the lag-related variables based on value of n_lag. */
 static void
 setup_lag (void)
 /* Sets all the lag-related variables based on value of n_lag. */
 static void
 setup_lag (void)
@@ -427,8 +414,7 @@ setup_lag (void)
   lag_head = 0;
   lag_queue = xmalloc (n_lag * sizeof *lag_queue);
   for (i = 0; i < n_lag; i++)
   lag_head = 0;
   lag_queue = xmalloc (n_lag * sizeof *lag_queue);
   for (i = 0; i < n_lag; i++)
-    lag_queue[i] = xmalloc (dict_get_value_cnt (temp_dict)
-                            * sizeof **lag_queue);
+    lag_queue[i] = xmalloc (dict_get_case_size (temp_dict));
 }
 
 /* There is a lot of potential confusion in the vfm and related
 }
 
 /* There is a lot of potential confusion in the vfm and related
@@ -489,7 +475,6 @@ open_active_file (void)
   make_temp_case ();
   vector_initialization ();
   discard_ctl_stack ();
   make_temp_case ();
   vector_initialization ();
   discard_ctl_stack ();
-  setup_filter ();
   setup_lag ();
 
   /* Debug output. */
   setup_lag ();
 
   /* Debug output. */
@@ -535,18 +520,21 @@ close_active_file (struct write_case_data *data)
     finish_compaction ();
     
   /* Old data sink --> New data source. */
     finish_compaction ();
     
   /* Old data sink --> New data source. */
-  if (vfm_source && vfm_source->destroy_source)
-    vfm_source->destroy_source ();
-  
-  vfm_source = vfm_sink;
+  if (vfm_source != NULL) 
+    {
+      if (vfm_source->class->destroy != NULL)
+        vfm_source->class->destroy (vfm_source);
+      free (vfm_source);
+    }
+
+  vfm_source = vfm_sink->class->make_source (vfm_sink);
   vfm_source_info.ncases = vfm_sink_info.ncases;
   vfm_source_info.nval = compaction_nval;
   vfm_source_info.case_size = (sizeof (struct ccase)
                               + (compaction_nval - 1) * sizeof (union value));
   vfm_source_info.ncases = vfm_sink_info.ncases;
   vfm_source_info.nval = compaction_nval;
   vfm_source_info.case_size = (sizeof (struct ccase)
                               + (compaction_nval - 1) * sizeof (union value));
-  if (vfm_source->mode)
-    vfm_source->mode ();
 
   /* Old data sink is gone now. */
 
   /* Old data sink is gone now. */
+  free (vfm_sink);
   vfm_sink = NULL;
 
   /* Cancel TEMPORARY. */
   vfm_sink = NULL;
 
   /* Cancel TEMPORARY. */
@@ -564,7 +552,7 @@ close_active_file (struct write_case_data *data)
   process_if_expr = NULL;
 
   /* Cancel FILTER if temporary. */
   process_if_expr = NULL;
 
   /* Cancel FILTER if temporary. */
-  if (filter_var != NULL && !FILTER_before_TEMPORARY)
+  if (dict_get_filter (default_dict) != NULL && !FILTER_before_TEMPORARY)
     dict_set_filter (default_dict, NULL);
 
   /* Cancel transformations. */
     dict_set_filter (default_dict, NULL);
 
   /* Cancel transformations. */
@@ -581,16 +569,12 @@ close_active_file (struct write_case_data *data)
 \f
 /* Disk case stream. */
 
 \f
 /* Disk case stream. */
 
-/* Associated files. */
-FILE *disk_source_file;
-FILE *disk_sink_file;
-
 /* Initializes the disk sink. */
 static void
 /* Initializes the disk sink. */
 static void
-disk_stream_init (void)
+disk_sink_create (struct case_sink *sink)
 {
 {
-  disk_sink_file = tmpfile ();
-  if (!disk_sink_file)
+  sink->aux = tmpfile ();
+  if (!sink->aux)
     {
       msg (ME, _("An error occurred attempting to create a temporary "
                 "file for use as the active file: %s."),
     {
       msg (ME, _("An error occurred attempting to create a temporary "
                 "file for use as the active file: %s."),
@@ -599,44 +583,21 @@ disk_stream_init (void)
     }
 }
 
     }
 }
 
-/* Reads all cases from the disk source and passes them one by one to
-   write_case(). */
-static void
-disk_stream_read (write_case_func *write_case, write_case_data wc_data)
-{
-  int i;
-
-  for (i = 0; i < vfm_source_info.ncases; i++)
-    {
-      if (!fread (temp_case, vfm_source_info.case_size, 1, disk_source_file))
-       {
-         msg (ME, _("An error occurred while attempting to read from "
-              "a temporary file created for the active file: %s."),
-              strerror (errno));
-         err_failure ();
-         return;
-       }
-
-      if (!write_case (wc_data))
-       return;
-    }
-}
-
 /* Writes temp_case to the disk sink. */
 static void
 /* Writes temp_case to the disk sink. */
 static void
-disk_stream_write (void)
+disk_sink_write (struct case_sink *sink, struct ccase *c)
 {
 {
+  FILE *file = sink->aux;
   union value *src_case;
 
   if (compaction_necessary)
     {
   union value *src_case;
 
   if (compaction_necessary)
     {
-      compact_case (compaction_case, temp_case);
-      src_case = (union value *) compaction_case;
+      compact_case (compaction_case, c);
+      src_case = compaction_case->data;
     }
     }
-  else src_case = (union value *) temp_case;
+  else src_case = c->data;
 
 
-  if (fwrite (src_case, sizeof *src_case * compaction_nval, 1,
-             disk_sink_file) != 1)
+  if (fwrite (src_case, sizeof *src_case * compaction_nval, 1, file) != 1)
     {
       msg (ME, _("An error occurred while attempting to write to a "
                 "temporary file used as the active file: %s."),
     {
       msg (ME, _("An error occurred while attempting to write to a "
                 "temporary file used as the active file: %s."),
@@ -645,12 +606,25 @@ disk_stream_write (void)
     }
 }
 
     }
 }
 
-/* Switches the stream from a sink to a source. */
+/* Destroys the sink's internal data. */
 static void
 static void
-disk_stream_mode (void)
+disk_sink_destroy (struct case_sink *sink)
+{
+  FILE *file = sink->aux;
+  if (file != NULL)
+    fclose (file);
+}
+
+/* Closes and destroys the sink and returns a disk source to read
+   back the written data. */
+static struct case_source *
+disk_sink_make_source (struct case_sink *sink) 
 {
 {
-  /* Rewind the sink. */
-  if (fseek (disk_sink_file, 0, SEEK_SET) != 0)
+  FILE *file = sink->aux;
+  
+  /* Rewind the file. */
+  assert (file != NULL);
+  if (fseek (file, 0, SEEK_SET) != 0)
     {
       msg (ME, _("An error occurred while attempting to rewind a "
                 "temporary file used as the active file: %s."),
     {
       msg (ME, _("An error occurred while attempting to rewind a "
                 "temporary file used as the active file: %s."),
@@ -658,107 +632,118 @@ disk_stream_mode (void)
       err_failure ();
     }
   
       err_failure ();
     }
   
-  /* Sink --> source variables. */
-  disk_source_file = disk_sink_file;
+  return create_case_source (&disk_source_class, file);
 }
 
 }
 
-/* Destroys the source's internal data. */
+/* Disk sink. */
+const struct case_sink_class disk_sink_class = 
+  {
+    "disk",
+    disk_sink_create,
+    disk_sink_write,
+    disk_sink_destroy,
+    disk_sink_make_source,
+  };
+\f
+/* Disk source. */
+
+/* Reads all cases from the disk source and passes them one by one to
+   write_case(). */
 static void
 static void
-disk_stream_destroy_source (void)
+disk_source_read (struct case_source *source,
+                  write_case_func *write_case, write_case_data wc_data)
 {
 {
-  if (disk_source_file)
+  FILE *file = source->aux;
+  int i;
+
+  for (i = 0; i < vfm_source_info.ncases; i++)
     {
     {
-      fclose (disk_source_file);
-      disk_source_file = NULL;
+      if (!fread (temp_case, vfm_source_info.case_size, 1, file))
+       {
+         msg (ME, _("An error occurred while attempting to read from "
+              "a temporary file created for the active file: %s."),
+              strerror (errno));
+         err_failure ();
+         return;
+       }
+
+      if (!write_case (wc_data))
+       return;
     }
 }
 
     }
 }
 
-/* Destroys the sink's internal data. */
+/* Destroys the source's internal data. */
 static void
 static void
-disk_stream_destroy_sink (void)
+disk_source_destroy (struct case_source *source)
 {
 {
-  if (disk_sink_file)
-    {
-      fclose (disk_sink_file);
-      disk_sink_file = NULL;
-    }
+  FILE *file = source->aux;
+  if (file != NULL)
+    fclose (file);
 }
 
 }
 
-/* Disk stream. */
-struct case_stream vfm_disk_stream = 
+/* Disk source. */
+const struct case_source_class disk_source_class = 
   {
   {
-    disk_stream_init,
-    disk_stream_read,
-    disk_stream_write,
-    disk_stream_mode,
-    disk_stream_destroy_source,
-    disk_stream_destroy_sink,
     "disk",
     "disk",
+    disk_source_read,
+    disk_source_destroy,
   };
 \f
 /* Memory case stream. */
 
   };
 \f
 /* Memory case stream. */
 
-/* List of cases stored in the stream. */
-struct case_list *memory_source_cases;
-struct case_list *memory_sink_cases;
-
-/* Current case. */
-struct case_list *memory_sink_iter;
+/* Memory sink data. */
+struct memory_sink_info
+  {
+    int max_cases;              /* Maximum cases before switching to disk. */
+    struct case_list *head;     /* First case in list. */
+    struct case_list *tail;     /* Last case in list. */
+  };
 
 
-/* Maximum number of cases. */
-int memory_sink_max_cases;
+/* Memory source data. */
+struct memory_source_info 
+  {
+    struct case_list *cases;    /* List of cases. */
+  };
 
 
-/* Initializes the memory stream variables for writing. */
 static void
 static void
-memory_stream_init (void)
+memory_sink_create (struct case_sink *sink) 
 {
 {
-  memory_sink_cases = NULL;
-  memory_sink_iter = NULL;
+  struct memory_sink_info *info;
   
   
-  assert (compaction_nval);
-  memory_sink_max_cases = MAX_WORKSPACE / (sizeof (union value) * compaction_nval);
-}
+  sink->aux = info = xmalloc (sizeof *info);
 
 
-/* Reads the case stream from memory and passes it to write_case(). */
-static void
-memory_stream_read (write_case_func *write_case, write_case_data wc_data)
-{
-  while (memory_source_cases != NULL)
-    {
-      memcpy (temp_case, &memory_source_cases->c, vfm_source_info.case_size);
-      
-      {
-       struct case_list *current = memory_source_cases;
-       memory_source_cases = memory_source_cases->next;
-       free (current);
-      }
-      
-      if (!write_case (wc_data))
-       return;
-    }
+  assert (compaction_nval > 0);
+  info->max_cases = set_max_workspace / (sizeof (union value) * compaction_nval);
+  info->head = info->tail = NULL;
 }
 
 }
 
-/* Writes temp_case to the memory stream. */
 static void
 static void
-memory_stream_write (void)
+memory_sink_write (struct case_sink *sink, struct ccase *c) 
 {
 {
-  struct case_list *new_case = malloc (sizeof (struct case_list)
-                                      + ((compaction_nval - 1)
-                                         * sizeof (union value)));
+  struct memory_sink_info *info = sink->aux;
+  size_t case_size;
+  struct case_list *new_case;
+
+  case_size = sizeof (struct case_list)
+                      + ((compaction_nval - 1) * sizeof (union value));
+  new_case = malloc (case_size);
 
   /* If we've got memory to spare then add it to the linked list. */
 
   /* If we've got memory to spare then add it to the linked list. */
-  if (vfm_sink_info.ncases <= memory_sink_max_cases && new_case != NULL)
+  if (vfm_sink_info.ncases <= info->max_cases && new_case != NULL)
     {
     {
-      if (compaction_necessary)
-       compact_case (&new_case->c, temp_case);
+      /* Append case to linked list. */
+      new_case->next = NULL;
+      if (info->head != NULL)
+        info->tail->next = new_case;
       else
       else
-       memcpy (&new_case->c, temp_case, sizeof (union value) * compaction_nval);
+        info->head = new_case;
+      info->tail = new_case;
 
 
-      /* Append case to linked list. */
-      if (memory_sink_cases)
-       memory_sink_iter = memory_sink_iter->next = new_case;
+      /* Copy data into case. */
+      if (compaction_necessary)
+       compact_case (&new_case->c, c);
       else
       else
-       memory_sink_iter = memory_sink_cases = new_case;
+       memcpy (&new_case->c, c, sizeof (union value) * compaction_nval);
     }
   else
     {
     }
   else
     {
@@ -767,36 +752,32 @@ memory_stream_write (void)
 
       /* Notify the user. */
       if (!new_case)
 
       /* Notify the user. */
       if (!new_case)
-       msg (MW, _("Virtual memory exhausted.  Paging active file "
+       msg (MW, _("Virtual memory exhausted.  Writing active file "
                   "to disk."));
       else
        msg (MW, _("Workspace limit of %d KB (%d cases at %d bytes each) "
                   "to disk."));
       else
        msg (MW, _("Workspace limit of %d KB (%d cases at %d bytes each) "
-                  "overflowed.  Paging active file to disk."),
-            MAX_WORKSPACE / 1024, memory_sink_max_cases,
+                  "overflowed.  Writing active file to disk."),
+            set_max_workspace / 1024, info->max_cases,
             compaction_nval * sizeof (union value));
 
       free (new_case);
 
       /* Switch to a disk sink. */
             compaction_nval * sizeof (union value));
 
       free (new_case);
 
       /* Switch to a disk sink. */
-      vfm_sink = &vfm_disk_stream;
-      vfm_sink->init ();
-      paging = 1;
-
-      /* Terminate the list. */
-      if (memory_sink_iter)
-       memory_sink_iter->next = NULL;
+      vfm_sink = create_case_sink (&disk_sink_class, NULL);
+      vfm_sink->class->open (vfm_sink);
+      workspace_overflow = 1;
 
       /* Write the cases to disk and destroy them.  We can't call
          vfm->sink->write() because of compaction. */
 
       /* Write the cases to disk and destroy them.  We can't call
          vfm->sink->write() because of compaction. */
-      for (cur = memory_sink_cases; cur; cur = next)
+      for (cur = info->head; cur; cur = next)
        {
          next = cur->next;
          if (fwrite (cur->c.data, sizeof (union value) * compaction_nval, 1,
        {
          next = cur->next;
          if (fwrite (cur->c.data, sizeof (union value) * compaction_nval, 1,
-                     disk_sink_file) != 1)
+                     vfm_sink->aux) != 1)
            {
              msg (ME, _("An error occurred while attempting to "
                         "write to a temporary file created as the "
            {
              msg (ME, _("An error occurred while attempting to "
                         "write to a temporary file created as the "
-                        "active file, while paging to disk: %s."),
+                        "active file: %s."),
                   strerror (errno));
              err_failure ();
            }
                   strerror (errno));
              err_failure ();
            }
@@ -804,36 +785,38 @@ memory_stream_write (void)
        }
 
       /* Write the current case to disk. */
        }
 
       /* Write the current case to disk. */
-      vfm_sink->write ();
+      vfm_sink->class->write (vfm_sink, c);
     }
 }
 
 /* If the data is stored in memory, causes it to be written to disk.
    To be called only *between* procedure()s, not within them. */
 void
     }
 }
 
 /* If the data is stored in memory, causes it to be written to disk.
    To be called only *between* procedure()s, not within them. */
 void
-page_to_disk (void)
+write_active_file_to_disk (void)
 {
 {
-  if (vfm_source == &vfm_memory_stream)
+  if (case_source_is_class (vfm_source, &memory_source_class))
     {
     {
+      struct memory_source_info *info = vfm_source->aux;
+
       /* Switch to a disk sink. */
       /* Switch to a disk sink. */
-      vfm_sink = &vfm_disk_stream;
-      vfm_sink->init ();
-      paging = 1;
+      vfm_sink = create_case_sink (&disk_sink_class, NULL);
+      vfm_sink->class->open (vfm_sink);
+      workspace_overflow = 1;
       
       /* Write the cases to disk and destroy them.  We can't call
          vfm->sink->write() because of compaction. */
       {
        struct case_list *cur, *next;
        
       
       /* Write the cases to disk and destroy them.  We can't call
          vfm->sink->write() because of compaction. */
       {
        struct case_list *cur, *next;
        
-       for (cur = memory_source_cases; cur; cur = next)
+       for (cur = info->cases; cur; cur = next)
          {
            next = cur->next;
            if (fwrite (cur->c.data, sizeof *cur->c.data * compaction_nval, 1,
          {
            next = cur->next;
            if (fwrite (cur->c.data, sizeof *cur->c.data * compaction_nval, 1,
-                       disk_sink_file) != 1)
+                       vfm_sink->aux) != 1)
              {
                msg (ME, _("An error occurred while attempting to "
                           "write to a temporary file created as the "
              {
                msg (ME, _("An error occurred while attempting to "
                           "write to a temporary file created as the "
-                          "active file, while paging to disk: %s."),
+                          "active file: %s."),
                     strerror (errno));
                err_failure ();
              }
                     strerror (errno));
                err_failure ();
              }
@@ -841,64 +824,107 @@ page_to_disk (void)
          }
       }
       
          }
       }
       
-      vfm_source = &vfm_disk_stream;
-      vfm_source->mode ();
-
+      vfm_source = vfm_sink->class->make_source (vfm_sink);
       vfm_sink = NULL;
     }
 }
 
       vfm_sink = NULL;
     }
 }
 
-/* Switch the memory stream from sink to source mode. */
+/* Destroy all memory sink data. */
 static void
 static void
-memory_stream_mode (void)
+memory_sink_destroy (struct case_sink *sink)
 {
 {
-  /* Terminate the list. */
-  if (memory_sink_iter)
-    memory_sink_iter->next = NULL;
+  struct memory_sink_info *info = sink->aux;
+  struct case_list *cur, *next;
+  
+  for (cur = info->head; cur; cur = next)
+    {
+      next = cur->next;
+      free (cur);
+    }
+  free (info);
+}
+
+/* Switch the memory stream from sink to source mode. */
+static struct case_source *
+memory_sink_make_source (struct case_sink *sink)
+{
+  struct memory_sink_info *sink_info = sink->aux;
+  struct memory_source_info *source_info;
+
+  source_info = xmalloc (sizeof *source_info);
+  source_info->cases = sink_info->head;
+
+  free (sink_info);
 
 
-  /* Sink --> source variables. */
-  memory_source_cases = memory_sink_cases;
-  memory_sink_cases = NULL;
+  return create_case_source (&memory_source_class, source_info);
 }
 
 }
 
-/* Destroy all memory source data. */
+const struct case_sink_class memory_sink_class = 
+  {
+    "memory",
+    memory_sink_create,
+    memory_sink_write,
+    memory_sink_destroy,
+    memory_sink_make_source,
+  };
+
+/* Reads the case stream from memory and passes it to write_case(). */
 static void
 static void
-memory_stream_destroy_source (void)
+memory_source_read (struct case_source *source,
+                    write_case_func *write_case, write_case_data wc_data)
 {
 {
-  struct case_list *cur, *next;
-  
-  for (cur = memory_source_cases; cur; cur = next)
+  struct memory_source_info *info = source->aux;
+
+  while (info->cases != NULL) 
     {
     {
-      next = cur->next;
-      free (cur);
+      struct case_list *iter = info->cases;
+      info->cases = iter->next;
+      memcpy (temp_case, &iter->c, vfm_source_info.case_size);
+      free (iter);
+      
+      if (!write_case (wc_data))
+       return;
     }
     }
-  memory_source_cases = NULL;
 }
 
 }
 
-/* Destroy all memory sink data. */
+/* Destroy all memory source data. */
 static void
 static void
-memory_stream_destroy_sink (void)
+memory_source_destroy (struct case_source *source)
 {
 {
+  struct memory_source_info *info = source->aux;
   struct case_list *cur, *next;
   
   struct case_list *cur, *next;
   
-  for (cur = memory_sink_cases; cur; cur = next)
+  for (cur = info->cases; cur; cur = next)
     {
       next = cur->next;
       free (cur);
     }
     {
       next = cur->next;
       free (cur);
     }
-  memory_sink_cases = NULL;
+  free (info);
 }
 }
-  
+
+struct case_list *
+memory_source_get_cases (const struct case_source *source) 
+{
+  struct memory_source_info *info = source->aux;
+
+  return info->cases;
+}
+
+void
+memory_source_set_cases (const struct case_source *source,
+                         struct case_list *cases) 
+{
+  struct memory_source_info *info = source->aux;
+
+  info->cases = cases;
+}
+
 /* Memory stream. */
 /* Memory stream. */
-struct case_stream vfm_memory_stream = 
+const struct case_source_class memory_source_class = 
   {
   {
-    memory_stream_init,
-    memory_stream_read,
-    memory_stream_write,
-    memory_stream_mode,
-    memory_stream_destroy_source,
-    memory_stream_destroy_sink,
     "memory",
     "memory",
+    memory_source_read,
+    memory_source_destroy,
   };
 \f
 #include "debug-print.h"
   };
 \f
 #include "debug-print.h"
@@ -910,7 +936,7 @@ lag_case (void)
   if (lag_count < n_lag)
     lag_count++;
   memcpy (lag_queue[lag_head], temp_case,
   if (lag_count < n_lag)
     lag_count++;
   memcpy (lag_queue[lag_head], temp_case,
-          sizeof (union value) * dict_get_value_cnt (temp_dict));
+          dict_get_case_size (temp_dict));
   if (++lag_head >= n_lag)
     lag_head = 0;
 }
   if (++lag_head >= n_lag)
     lag_head = 0;
 }
@@ -959,7 +985,7 @@ procedure_write_case (write_case_data wc_data)
            lag_case ();
          
          vfm_sink_info.ncases++;
            lag_case ();
          
          vfm_sink_info.ncases++;
-         vfm_sink->write ();
+         vfm_sink->class->write (vfm_sink, temp_case);
 
          if (dict_get_case_limit (default_dict))
            more_cases = (vfm_sink_info.ncases
 
          if (dict_get_case_limit (default_dict))
            more_cases = (vfm_sink_info.ncases
@@ -1000,10 +1026,7 @@ procedure_write_case (write_case_data wc_data)
 
   /* Call the procedure if there is one and FILTER and PROCESS IF
      don't prohibit it. */
 
   /* Call the procedure if there is one and FILTER and PROCESS IF
      don't prohibit it. */
-  if (wc_data->procfunc != NULL
-      && !FILTERED
-      && (process_if_expr == NULL ||
-         expr_evaluate (process_if_expr, temp_case, NULL) == 1.0))
+  if (wc_data->procfunc != NULL && !exclude_this_case ())
     wc_data->procfunc (temp_case, wc_data->aux);
 
   case_count++;
     wc_data->procfunc (temp_case, wc_data->aux);
 
   case_count++;
@@ -1040,6 +1063,28 @@ clear_temp_case (void)
     }
 }
 
     }
 }
 
+/* Returns nonzero if this case should be exclude as specified on
+   FILTER or PROCESS IF, otherwise zero. */
+static int
+exclude_this_case (void)
+{
+  /* FILTER. */
+  struct variable *filter_var = dict_get_filter (default_dict);
+  if (filter_var != NULL) 
+    {
+      double f = temp_case->data[filter_var->fv].f;
+      if (f == 0.0 || f == SYSMIS || is_num_user_missing (f, filter_var))
+        return 1;
+    }
+
+  /* PROCESS IF. */
+  if (process_if_expr != NULL
+      && expr_evaluate (process_if_expr, temp_case, NULL) != 1.0)
+    return 1;
+
+  return 0;
+}
+
 /* Appends TRNS to t_trns[], the list of all transformations to be
    performed on data as it is read from the active file. */
 void
 /* Appends TRNS to t_trns[], the list of all transformations to be
    performed on data as it is read from the active file. */
 void
@@ -1101,12 +1146,7 @@ dump_splits (struct ccase *c)
       assert (v->type == NUMERIC || v->type == ALPHA);
       tab_text (t, 0, i + 1, TAB_LEFT | TAT_PRINTF, "%s", v->name);
       
       assert (v->type == NUMERIC || v->type == ALPHA);
       tab_text (t, 0, i + 1, TAB_LEFT | TAT_PRINTF, "%s", v->name);
       
-      {
-       union value val = c->data[v->fv];
-       if (v->type == ALPHA)
-         val.c = c->data[v->fv].s;
-       data_out (temp_buf, &v->print, &val);
-      }
+      data_out (temp_buf, &v->print, &c->data[v->fv]);
       
       temp_buf[v->print.w] = 0;
       tab_text (t, 1, i + 1, TAT_PRINTF, "%.*s", v->print.w, temp_buf);
       
       temp_buf[v->print.w] = 0;
       tab_text (t, 1, i + 1, TAT_PRINTF, "%.*s", v->print.w, temp_buf);
@@ -1158,7 +1198,7 @@ SPLIT_FILE_procfunc (struct ccase *c, void *data_)
       switch (v->type)
        {
        case NUMERIC:
       switch (v->type)
        {
        case NUMERIC:
-         if (approx_ne (c->data[v->fv].f, prev_case->data[v->fv].f))
+         if (c->data[v->fv].f != prev_case->data[v->fv].f)
            goto not_equal;
          break;
        case ALPHA:
            goto not_equal;
          break;
        case ALPHA:
@@ -1243,4 +1283,36 @@ finish_compaction (void)
   dict_compact_values (default_dict);
 }
 
   dict_compact_values (default_dict);
 }
 
-  
+struct case_source *
+create_case_source (const struct case_source_class *class, void *aux) 
+{
+  struct case_source *source = xmalloc (sizeof *source);
+  source->class = class;
+  source->aux = aux;
+  return source;
+}
+
+int
+case_source_is_complex (const struct case_source *source) 
+{
+  return source != NULL && (source->class == &input_program_source_class
+                            || source->class == &file_type_source_class);
+}
+
+int
+case_source_is_class (const struct case_source *source,
+                      const struct case_source_class *class) 
+{
+  return source != NULL && source->class == class;
+
+}
+
+struct case_sink *
+create_case_sink (const struct case_sink_class *class, void *aux) 
+{
+  struct case_sink *sink = xmalloc (sizeof *sink);
+  sink->class = class;
+  sink->aux = aux;
+  return sink;
+}
+