Redo VFM interface. Get rid of compaction_necessary, compaction_nval,
[pspp-builds.git] / src / vfm.c
index 51bd02cc5c14e8707789670ec81e4324ed947697..4a693bc297f2b05a788cc2252b3111b5917def19 100644 (file)
--- a/src/vfm.c
+++ b/src/vfm.c
 /* Procedure execution data. */
 struct write_case_data
   {
-    /* Functions to call... */
-    void (*begin_func) (void *);               /* ...before data. */
-    int (*proc_func) (struct ccase *, void *); /* ...with data. */
-    void (*end_func) (void *);                 /* ...after data. */
-    void *func_aux;                            /* Auxiliary data. */ 
+    /* Function to call for each case. */
+    int (*proc_func) (struct ccase *, void *); /* Function. */
+    void *aux;                                 /* Auxiliary data. */ 
 
-    /* Extra auxiliary data. */
-    void *aux;
+    struct ccase *trns_case;    /* Case used for transformations. */
+    struct ccase *sink_case;    /* Case written to sink, if
+                                   compaction is necessary. */
+    size_t cases_written;       /* Cases output so far. */
+    size_t cases_analyzed;      /* Cases passed to procedure so far. */
   };
 
 /* The current active file, from which cases are read. */
@@ -70,14 +71,7 @@ struct case_sink *vfm_sink;
 
 /* Nonzero if the case needs to have values deleted before being
    stored, zero otherwise. */
-int compaction_necessary;
-
-/* Number of values after compaction. */
-int compaction_nval;
-
-/* Temporary case buffer with enough room for `compaction_nval'
-   `value's. */
-struct ccase *compaction_case;
+static int compaction_necessary;
 
 /* Nonzero means that we've overflowed our allotted workspace.
    After that happens once during a session, we always store the
@@ -88,205 +82,79 @@ static int workspace_overflow = 0;
 /* Time at which vfm was last invoked. */
 time_t last_vfm_invocation;
 
-/* Number of cases passed to proc_func(). */
-static int case_count;
-
 /* Lag queue. */
 int n_lag;                     /* Number of cases to lag. */
 static int lag_count;          /* Number of cases in lag_queue so far. */
 static int lag_head;           /* Index where next case will be added. */
 static struct ccase **lag_queue; /* Array of n_lag ccase * elements. */
 
-static struct ccase *create_trns_case (struct dictionary *dict);
+static struct ccase *create_trns_case (struct dictionary *);
 static void open_active_file (void);
-static void close_active_file (struct write_case_data *);
-static int SPLIT_FILE_proc_func (struct ccase *, void *);
-static void finish_compaction (void);
-static void lag_case (const struct ccase *);
-static write_case_func procedure_write_case;
-static void clear_case (struct ccase *);
-static int exclude_this_case (const struct ccase *, int case_num);
+static int write_case (struct write_case_data *wc_data);
+static int execute_transformations (struct ccase *c,
+                                    struct trns_header **trns,
+                                    int first_idx, int last_idx,
+                                    int case_num);
+static int filter_case (const struct ccase *c, int case_num);
+static void lag_case (const struct ccase *c);
+static void compact_case (struct ccase *dest, const struct ccase *src);
+static void clear_case (struct ccase *c);
+static void close_active_file (void);
 \f
 /* Public functions. */
 
-/* Auxiliary data for executing a procedure. */
-struct procedure_aux_data 
-  {
-    struct ccase *trns_case;    /* Case used for transformations. */
-    size_t cases_written;       /* Number of cases written so far. */
-  };
-
-/* Auxiliary data for SPLIT FILE. */
-struct split_aux_data 
-  {
-    struct ccase *prev_case;    /* Data in previous case. */
-  };
-
-/* Reads all the cases from the active file, transforms them by
-   the active set of transformations, passes each of them to
-   PROC_FUNC, and writes them to a new active file.
+/* Reads the data from the input program and writes it to a new
+   active file.  For each case we read from the input program, we
+   do the following
 
-   Divides the active file into zero or more series of one or more
-   cases each.  BEGIN_FUNC is called before each series.  END_FUNC is
-   called after each series.
+   1. Execute permanent transformations.  If these drop the case,
+      start the next case from step 1.
 
-   Arbitrary user-specified data AUX is passed to BEGIN_FUNC,
-   PROC_FUNC, and END_FUNC as auxiliary data. */
+   2. N OF CASES.  If we have already written N cases, start the
+      next case from step 1.
+   
+   3. Write case to replacement active file.
+   
+   4. Execute temporary transformations.  If these drop the case,
+      start the next case from step 1.
+      
+   5. FILTER, PROCESS IF.  If these drop the case, start the next
+      case from step 1.
+   
+   6. Post-TEMPORARY N OF CASES.  If we have already analyzed N
+      cases, start the next case from step 1.
+      
+   7. Pass case to PROC_FUNC, passing AUX as auxiliary data. */
 void
-procedure (void (*begin_func) (void *),
-          int (*proc_func) (struct ccase *, void *),
-          void (*end_func) (void *),
-           void *func_aux)
+procedure (int (*proc_func) (struct ccase *, void *), void *aux)
 {
   static int recursive_call;
 
-  struct write_case_data procedure_write_data;
-  struct procedure_aux_data proc_aux;
-
-  struct write_case_data split_file_data;
-  struct split_aux_data split_aux;
-  int split;
+  struct write_case_data wc_data;
 
   assert (++recursive_call == 1);
 
-  proc_aux.cases_written = 0;
-  proc_aux.trns_case = create_trns_case (default_dict);
-
-  /* Normally we just use the data passed by the user. */
-  procedure_write_data.begin_func = begin_func;
-  procedure_write_data.proc_func = proc_func;
-  procedure_write_data.end_func = end_func;
-  procedure_write_data.func_aux = func_aux;
-  procedure_write_data.aux = &proc_aux;
-
-  /* Under SPLIT FILE, we add a layer of indirection. */
-  split = dict_get_split_cnt (default_dict) > 0;
-  if (split) 
-    {
-      split_file_data = procedure_write_data;
-      split_file_data.aux = &split_aux;
-
-      split_aux.prev_case = xmalloc (dict_get_case_size (default_dict));
-
-      procedure_write_data.begin_func = NULL;
-      procedure_write_data.proc_func = SPLIT_FILE_proc_func;
-      procedure_write_data.end_func = end_func;
-      procedure_write_data.func_aux = &split_file_data;
-    }
+  wc_data.proc_func = proc_func;
+  wc_data.aux = aux;
+  wc_data.trns_case = create_trns_case (default_dict);
+  wc_data.sink_case = xmalloc (dict_get_case_size (default_dict));
+  wc_data.cases_written = 0;
 
   last_vfm_invocation = time (NULL);
 
   open_active_file ();
   if (vfm_source != NULL) 
     vfm_source->class->read (vfm_source,
-                             proc_aux.trns_case,
-                             procedure_write_case, &procedure_write_data);
-  close_active_file (&procedure_write_data);
-
-  if (split)
-    free (split_aux.prev_case);
+                             wc_data.trns_case,
+                             write_case, &wc_data);
+  close_active_file ();
 
-  free (proc_aux.trns_case);
+  free (wc_data.sink_case);
+  free (wc_data.trns_case);
 
   assert (--recursive_call == 0);
 }
-\f
-/* Active file processing support.  Subtly different semantics from
-   procedure(). */
-
-static write_case_func process_active_file_write_case;
-
-/* The case_func might want us to stop calling it. */
-static int not_canceled;
-
-/* Reads all the cases from the active file and passes them
-   one-by-one to CASE_FUNC.  Before any cases are passed, calls
-   BEGIN_FUNC.  After all the cases have been passed, calls
-   END_FUNC.  BEGIN_FUNC, CASE_FUNC, and END_FUNC can write to
-   the output file by calling process_active_file_output_case().
-
-   process_active_file() ignores TEMPORARY, SPLIT FILE, and N. */
-void
-process_active_file (void (*begin_func) (void *),
-                    int (*case_func) (struct ccase *, void *),
-                    void (*end_func) (void *),
-                     void *func_aux)
-{
-  struct procedure_aux_data proc_aux;
-  struct write_case_data process_active_write_data;
-
-  proc_aux.cases_written = 0;
-  proc_aux.trns_case = create_trns_case (default_dict);
-
-  process_active_write_data.begin_func = begin_func;
-  process_active_write_data.proc_func = case_func;
-  process_active_write_data.end_func = end_func;
-  process_active_write_data.func_aux = func_aux;
-  process_active_write_data.aux = &proc_aux;
-
-  not_canceled = 1;
-
-  open_active_file ();
-  begin_func (func_aux);
-  if (vfm_source != NULL)
-    vfm_source->class->read (vfm_source, proc_aux.trns_case,
-                             process_active_file_write_case,
-                             &process_active_write_data);
-  end_func (func_aux);
-  close_active_file (&process_active_write_data);
-}
-
-/* Pass the current case to case_func. */
-static int
-process_active_file_write_case (struct write_case_data *wc_data)
-{
-  struct procedure_aux_data *proc_aux = wc_data->aux;
-  int cur_trns;         /* Index of current transformation. */
-
-  for (cur_trns = f_trns; cur_trns != temp_trns; )
-    {
-      int code;
-       
-      code = t_trns[cur_trns]->proc (t_trns[cur_trns], proc_aux->trns_case,
-                                     case_count + 1);
-      switch (code)
-       {
-       case -1:
-         /* Next transformation. */
-         cur_trns++;
-         break;
-       case -2:
-         /* Delete this case. */
-         goto done;
-       default:
-         /* Go to that transformation. */
-         cur_trns = code;
-         break;
-       }
-    }
-
-  if (n_lag)
-    lag_case (proc_aux->trns_case);
-         
-  /* Call the procedure if FILTER and PROCESS IF don't prohibit it. */
-  if (not_canceled && !exclude_this_case (proc_aux->trns_case, case_count + 1))
-    not_canceled = wc_data->proc_func (proc_aux->trns_case, wc_data->func_aux);
-  
-  case_count++;
-  
- done:
-  clear_case (proc_aux->trns_case);
-
-  return 1;
-}
 
-/* Write the given case to the active file. */
-void
-process_active_file_output_case (const struct ccase *c)
-{
-  vfm_sink->class->write (vfm_sink, c);
-}
-\f
 /* Creates and returns a case, initializing it from the vectors
    that say which `value's need to be initialized just once, and
    which ones need to be re-initialized before every case. */
@@ -313,159 +181,228 @@ create_trns_case (struct dictionary *dict)
     }
   return c;
 }
-\f
-/* Opening the active file. */
 
-/* It might be usefully noted that the following several functions are
-   given in the order that they are called by open_active_file(). */
-
-/* Prepare to write to the replacement active file. */
+/* Makes all preparations for reading from the data source and writing
+   to the data sink. */
 static void
-prepare_for_writing (void)
+open_active_file (void)
 {
+  /* Make temp_dict refer to the dictionary right before data
+     reaches the sink */
+  if (!temporary)
+    {
+      temp_trns = n_trns;
+      temp_dict = default_dict;
+    }
+
+  /* Figure out compaction. */
+  compaction_necessary = (dict_get_next_value_idx (temp_dict)
+                          != dict_get_compacted_value_cnt (temp_dict));
+
+  /* Prepare sink. */
   if (vfm_sink == NULL)
+    vfm_sink = create_case_sink (&storage_sink_class, temp_dict, NULL);
+  if (vfm_sink->class->open != NULL)
+    vfm_sink->class->open (vfm_sink);
+
+  /* Allocate memory for lag queue. */
+  if (n_lag > 0)
     {
-      if (workspace_overflow)
-        vfm_sink = create_case_sink (&disk_sink_class, NULL);
-      else
-        vfm_sink = create_case_sink (&memory_sink_class, NULL);
+      int i;
+  
+      lag_count = 0;
+      lag_head = 0;
+      lag_queue = xmalloc (n_lag * sizeof *lag_queue);
+      for (i = 0; i < n_lag; i++)
+        lag_queue[i] = xmalloc (dict_get_case_size (temp_dict));
     }
+
+  /* Close any unclosed DO IF or LOOP constructs. */
+  discard_ctl_stack ();
 }
 
-/* Arrange for compacting the output cases for storage. */
-static void
-arrange_compaction (void)
+/* Transforms trns_case and writes it to the replacement active
+   file if advisable.  Returns nonzero if more cases can be
+   accepted, zero otherwise.  Do not call this function again
+   after it has returned zero once.  */
+static int
+write_case (struct write_case_data *wc_data)
 {
-  int count_values = 0;
+  /* Execute permanent transformations.  */
+  if (!execute_transformations (wc_data->trns_case, t_trns, f_trns, temp_trns,
+                                wc_data->cases_written + 1))
+    goto done;
+
+  /* N OF CASES. */
+  if (dict_get_case_limit (default_dict)
+      && wc_data->cases_written >= dict_get_case_limit (default_dict))
+    goto done;
+  wc_data->cases_written++;
+
+  /* Write case to LAG queue. */
+  if (n_lag)
+    lag_case (wc_data->trns_case);
 
-  {
-    int i;
-    
-    /* Count up the number of `value's that will be output. */
-    for (i = 0; i < dict_get_var_cnt (temp_dict); i++) 
-      {
-        struct variable *v = dict_get_var (temp_dict, i);
-
-        if (dict_class_from_id (v->name) != DC_SCRATCH)
-          {
-            assert (v->nv > 0);
-            count_values += v->nv;
-          } 
-      }
-    assert (temporary == 2
-            || count_values <= dict_get_next_value_idx (temp_dict));
-  }
+  /* Write case to replacement active file. */
+  if (vfm_sink->class->write != NULL) 
+    {
+      if (compaction_necessary) 
+        {
+          compact_case (wc_data->sink_case, wc_data->trns_case);
+          vfm_sink->class->write (vfm_sink, wc_data->sink_case);
+        }
+      else
+        vfm_sink->class->write (vfm_sink, wc_data->trns_case);
+    }
   
-  /* Compaction is only necessary if the number of `value's to output
-     differs from the number already present. */
-  compaction_nval = count_values;
-  if (temporary == 2 || count_values != dict_get_next_value_idx (temp_dict))
-    compaction_necessary = 1;
-  else
-    compaction_necessary = 0;
+  /* Execute temporary transformations. */
+  if (!execute_transformations (wc_data->trns_case, t_trns, temp_trns, n_trns,
+                                wc_data->cases_written))
+    goto done;
   
-  if (vfm_sink->class->open != NULL)
-    vfm_sink->class->open (vfm_sink);
+  /* FILTER, PROCESS IF, post-TEMPORARY N OF CASES. */
+  if (filter_case (wc_data->trns_case, wc_data->cases_written)
+      || (dict_get_case_limit (temp_dict)
+          && wc_data->cases_analyzed >= dict_get_case_limit (temp_dict)))
+    goto done;
+  wc_data->cases_analyzed++;
 
-  if (compaction_necessary)
-    compaction_case = xmalloc (sizeof (struct ccase)
-                              + sizeof (union value) * (compaction_nval - 1));
+  /* Pass case to procedure. */
+  if (wc_data->proc_func != NULL)
+    wc_data->proc_func (wc_data->trns_case, wc_data->aux);
 
+ done:
+  clear_case (wc_data->trns_case);
+  return 1;
 }
 
-#if DEBUGGING
-/* Returns the name of the variable that owns the index CCASE_INDEX
-   into ccase. */
-static const char *
-index_to_varname (int ccase_index)
+/* Transforms case C using the transformations in TRNS[] with
+   indexes FIRST_IDX through LAST_IDX, exclusive.  Case C will
+   become case CASE_NUM (1-based) in the output file.  Returns
+   zero if the case was filtered out by one of the
+   transformations, nonzero otherwise. */
+static int
+execute_transformations (struct ccase *c,
+                         struct trns_header **trns,
+                         int first_idx, int last_idx,
+                         int case_num) 
 {
-  int i;
+  int idx;
 
-  for (i = 0; i < default_dict.nvar; i++)
+  for (idx = first_idx; idx != last_idx; )
     {
-      struct variable *v = default_dict.var[i];
-      
-      if (ccase_index >= v->fv && ccase_index < v->fv + v->nv)
-       return default_dict.var[i]->name;
+      int retval = trns[idx]->proc (trns[idx], c, case_num);
+      switch (retval)
+        {
+        case -1:
+          idx++;
+          break;
+          
+        case -2:
+          return 0;
+          
+        default:
+          idx = retval;
+          break;
+        }
     }
-  return _("<NOVAR>");
+
+  return 1;
+}
+
+/* Returns nonzero if case C with case number CASE_NUM should be
+   exclude as specified on FILTER or PROCESS IF, otherwise
+   zero. */
+static int
+filter_case (const struct ccase *c, int case_num)
+{
+  /* FILTER. */
+  struct variable *filter_var = dict_get_filter (default_dict);
+  if (filter_var != NULL) 
+    {
+      double f = c->data[filter_var->fv].f;
+      if (f == 0.0 || f == SYSMIS || is_num_user_missing (f, filter_var))
+        return 1;
+    }
+
+  /* PROCESS IF. */
+  if (process_if_expr != NULL
+      && expr_evaluate (process_if_expr, c, case_num, NULL) != 1.0)
+    return 1;
+
+  return 0;
+}
+
+/* Add C to the lag queue. */
+static void
+lag_case (const struct ccase *c)
+{
+  if (lag_count < n_lag)
+    lag_count++;
+  memcpy (lag_queue[lag_head], c, dict_get_case_size (temp_dict));
+  if (++lag_head >= n_lag)
+    lag_head = 0;
 }
-#endif
 
-/* Sets all the lag-related variables based on value of n_lag. */
+/* Copies case SRC to case DEST, compacting it in the process. */
 static void
-setup_lag (void)
+compact_case (struct ccase *dest, const struct ccase *src)
 {
   int i;
+  int nval = 0;
+  size_t var_cnt;
   
-  if (n_lag == 0)
-    return;
+  assert (compaction_necessary);
+
+  /* Copy all the variables except scratch variables from SRC to
+     DEST. */
+  var_cnt = dict_get_var_cnt (default_dict);
+  for (i = 0; i < var_cnt; i++)
+    {
+      struct variable *v = dict_get_var (default_dict, i);
+      
+      if (dict_class_from_id (v->name) == DC_SCRATCH)
+       continue;
 
-  lag_count = 0;
-  lag_head = 0;
-  lag_queue = xmalloc (n_lag * sizeof *lag_queue);
-  for (i = 0; i < n_lag; i++)
-    lag_queue[i] = xmalloc (dict_get_case_size (temp_dict));
+      if (v->type == NUMERIC)
+       dest->data[nval++] = src->data[v->fv];
+      else
+       {
+         int w = DIV_RND_UP (v->width, sizeof (union value));
+         
+         memcpy (&dest->data[nval], &src->data[v->fv], w * sizeof (union value));
+         nval += w;
+       }
+    }
 }
 
-/* There is a lot of potential confusion in the vfm and related
-   routines over the number of `value's at each stage of the process.
-   Here is each nval count, with explanation, as set up by
-   open_active_file():
-
-   temp_dict->nval: Number of `value's in the cases after the
-   transformations leading up to TEMPORARY have been performed.
-
-   compaction_nval: Number of `value's in the cases after the
-   transformations leading up to TEMPORARY have been performed
-   and the case has been compacted by compact_case(), if
-   compaction is necessary.  This the number of `value's in the
-   cases saved by the sink stream.  (However, note that the cases
-   passed to the sink stream have not yet been compacted.  It is
-   the responsibility of the data sink to call compact_case().)
-   `compaction' becomes the new value of default_dict.nval after
-   the procedure is completed.
-
-   default_dict.nval: This is often an alias for temp_dict->nval.
-   As such it can really have no separate existence until the
-   procedure is complete.  For this reason it should *not* be
-   referenced inside the execution of a procedure. */
-/* Makes all preparations for reading from the data source and writing
-   to the data sink. */
+/* Clears the variables in C that need to be cleared between
+   processing cases.  */
 static void
-open_active_file (void)
+clear_case (struct ccase *c)
 {
-  /* Sometimes we want to refer to the dictionary that applies to the
-     data actually written to the sink.  This is either temp_dict or
-     default_dict.  However, if TEMPORARY is not on, then temp_dict
-     does not apply.  So, we can set temp_dict to default_dict in this
-     case. */
-  if (!temporary)
+  size_t var_cnt = dict_get_var_cnt (default_dict);
+  size_t i;
+  
+  for (i = 0; i < var_cnt; i++) 
     {
-      temp_trns = n_trns;
-      temp_dict = default_dict;
+      struct variable *v = dict_get_var (default_dict, i);
+      if (v->init && v->reinit) 
+        {
+          if (v->type == NUMERIC) 
+            c->data[v->fv].f = SYSMIS;
+          else
+            memset (c->data[v->fv].s, ' ', v->width);
+        } 
     }
-
-  /* No cases passed to the procedure yet. */
-  case_count = 0;
-
-  /* The rest. */
-  prepare_for_writing ();
-  arrange_compaction ();
-  discard_ctl_stack ();
-  setup_lag ();
 }
-\f
+
 /* Closes the active file. */
 static void
-close_active_file (struct write_case_data *data)
+close_active_file (void)
 {
-  /* Close the current case group. */
-  if (case_count && data->end_func != NULL)
-    data->end_func (data->func_aux);
-
-  /* Stop lagging (catch up?). */
-  if (n_lag)
+  /* Free memory for lag queue, and turn off lagging. */
+  if (n_lag > 0)
     {
       int i;
       
@@ -475,8 +412,7 @@ close_active_file (struct write_case_data *data)
       n_lag = 0;
     }
   
-  /* Assume the dictionary from right before TEMPORARY, if any.  Turn
-     off TEMPORARY. */
+  /* Dictionary from before TEMPORARY becomes permanent.. */
   if (temporary)
     {
       dict_destroy (default_dict);
@@ -486,9 +422,9 @@ close_active_file (struct write_case_data *data)
 
   /* Finish compaction. */
   if (compaction_necessary)
-    finish_compaction ();
+    dict_compact_values (default_dict);
     
-  /* Old data sink --> New data source. */
+  /* Free data source. */
   if (vfm_source != NULL) 
     {
       if (vfm_source->class->destroy != NULL)
@@ -496,240 +432,156 @@ close_active_file (struct write_case_data *data)
       free (vfm_source);
     }
 
+  /* Old data sink becomes new data source. */
   if (vfm_sink->class->make_source != NULL)
     vfm_source = vfm_sink->class->make_source (vfm_sink);
   else
     vfm_source = NULL;
-
-  /* Old data sink is gone now. */
-  free (vfm_sink);
+  free_case_sink (vfm_sink);
   vfm_sink = NULL;
 
-  /* Cancel TEMPORARY. */
+  /* Cancel TEMPORARY, PROCESS IF, FILTER, N OF CASES, vectors,
+     and get rid of all the transformations. */
   cancel_temporary ();
-
-  /* Free temporary cases. */
-  free (compaction_case);
-  compaction_case = NULL;
-
-  /* Cancel PROCESS IF. */
   expr_free (process_if_expr);
   process_if_expr = NULL;
-
-  /* Cancel FILTER if temporary. */
   if (dict_get_filter (default_dict) != NULL && !FILTER_before_TEMPORARY)
     dict_set_filter (default_dict, NULL);
-
-  /* Cancel transformations. */
-  cancel_transformations ();
-
-  /* Turn off case limiter. */
   dict_set_case_limit (default_dict, 0);
-
-  /* Clear VECTOR vectors. */
   dict_clear_vectors (default_dict);
+  cancel_transformations ();
 }
 \f
-/* Disk case stream. */
+/* Storage case stream. */
 
-/* Information about disk sink or source. */
-struct disk_stream_info 
+/* Information about storage sink or source. */
+struct storage_stream_info 
   {
-    FILE *file;                 /* Output file. */
-    size_t case_cnt;            /* Number of cases written so far. */
+    size_t case_cnt;            /* Number of cases. */
     size_t case_size;           /* Number of bytes in case. */
+    enum { DISK, MEMORY } mode; /* Where is data stored? */
+
+    /* Disk storage.  */
+    FILE *file;                 /* Data file. */
+
+    /* Memory storage. */
+    int max_cases;              /* Maximum cases before switching to disk. */
+    struct case_list *head;     /* First case in list. */
+    struct case_list *tail;     /* Last case in list. */
   };
 
-/* Initializes the disk sink. */
+static void open_storage_file (struct storage_stream_info *info);
+
+/* Initializes a storage sink. */
 static void
-disk_sink_create (struct case_sink *sink)
+storage_sink_open (struct case_sink *sink)
 {
-  struct disk_stream_info *info = xmalloc (sizeof *info);
-  info->file = tmpfile ();
+  struct storage_stream_info *info;
+
+  sink->aux = info = xmalloc (sizeof *info);
   info->case_cnt = 0;
-  info->case_size = compaction_nval;
-  sink->aux = info;
-  if (info->file == NULL)
+  info->case_size = sink->value_cnt * sizeof (union value);
+  info->file = NULL;
+  info->max_cases = 0;
+  info->head = info->tail = NULL;
+  if (workspace_overflow) 
     {
-      msg (ME, _("An error occurred attempting to create a temporary "
-                "file for use as the active file: %s."),
-          strerror (errno));
-      err_failure ();
+      info->mode = DISK;
+      open_storage_file (info);
+    }
+  else 
+    {
+      info->mode = MEMORY; 
+      info->max_cases = (set_max_workspace
+                         / (sizeof (struct case_list) + info->case_size));
     }
 }
 
-/* Writes case C to the disk sink. */
+/* Creates a new temporary file and puts it into INFO. */
 static void
-disk_sink_write (struct case_sink *sink, const struct ccase *c)
+open_storage_file (struct storage_stream_info *info) 
 {
-  struct disk_stream_info *info = sink->aux;
-  const union value *src_case;
-
-  if (compaction_necessary)
+  info->file = tmpfile ();
+  if (info->file == NULL)
     {
-      compact_case (compaction_case, c);
-      src_case = compaction_case->data;
+      msg (ME, _("An error occurred creating a temporary "
+                 "file for use as the active file: %s."),
+           strerror (errno));
+      err_failure ();
     }
-  else src_case = c->data;
+}
 
-  info->case_cnt++;
-  if (fwrite (src_case, sizeof *src_case * compaction_nval, 1,
-              info->file) != 1)
+/* Writes the VALUE_CNT values in VALUES to FILE. */
+static void
+write_storage_file (FILE *file, const union value *values, size_t value_cnt) 
+{
+  if (fwrite (values, sizeof *values * value_cnt, 1, file) != 1)
     {
-      msg (ME, _("An error occurred while attempting to write to a "
+      msg (ME, _("An error occurred writing to a "
                 "temporary file used as the active file: %s."),
           strerror (errno));
       err_failure ();
     }
 }
 
-/* Destroys the sink's internal data. */
+/* If INFO represents records in memory, moves them to disk.
+   Each comprises VALUE_CNT `union value's. */
 static void
-disk_sink_destroy (struct case_sink *sink)
+storage_to_disk (struct storage_stream_info *info, size_t value_cnt) 
 {
-  struct disk_stream_info *info = sink->aux;
-  if (info->file != NULL)
-    fclose (info->file);
+  struct case_list *cur, *next;
+
+  if (info->mode == MEMORY) 
+    {
+      info->mode = DISK;
+      open_storage_file (info);
+      for (cur = info->head; cur; cur = next)
+        {
+          next = cur->next;
+          write_storage_file (info->file, cur->c.data, value_cnt);
+          free (cur);
+        }
+      info->head = info->tail = NULL; 
+    }
 }
 
-/* Closes and destroys the sink and returns a disk source to read
-   back the written data. */
-static struct case_source *
-disk_sink_make_source (struct case_sink *sink) 
+/* Destroys storage stream represented by INFO. */
+static void
+destroy_storage_stream_info (struct storage_stream_info *info) 
 {
-  struct disk_stream_info *info = sink->aux;
-    
-  /* Rewind the file. */
-  assert (info->file != NULL);
-  if (fseek (info->file, 0, SEEK_SET) != 0)
+  if (info->mode == DISK) 
     {
-      msg (ME, _("An error occurred while attempting to rewind a "
-                "temporary file used as the active file: %s."),
-          strerror (errno));
-      err_failure ();
+      if (info->file != NULL)
+        fclose (info->file); 
     }
+  else 
+    {
+      struct case_list *cur, *next;
   
-  return create_case_source (&disk_source_class, default_dict, info);
-}
-
-/* Disk sink. */
-const struct case_sink_class disk_sink_class = 
-  {
-    "disk",
-    disk_sink_create,
-    disk_sink_write,
-    disk_sink_destroy,
-    disk_sink_make_source,
-  };
-\f
-/* Disk source. */
-
-/* Returns the number of cases that will be read by
-   disk_source_read(). */
-static int
-disk_source_count (const struct case_source *source) 
-{
-  struct disk_stream_info *info = source->aux;
-
-  return info->case_cnt;
-}
-
-/* Reads all cases from the disk source and passes them one by one to
-   write_case(). */
-static void
-disk_source_read (struct case_source *source,
-                  struct ccase *c,
-                  write_case_func *write_case, write_case_data wc_data)
-{
-  struct disk_stream_info *info = source->aux;
-  int i;
-
-  for (i = 0; i < info->case_cnt; i++)
-    {
-      if (!fread (c, info->case_size, 1, info->file))
-       {
-         msg (ME, _("An error occurred while attempting to read from "
-              "a temporary file created for the active file: %s."),
-              strerror (errno));
-         err_failure ();
-         break;
-       }
-
-      if (!write_case (wc_data))
-       break;
+      for (cur = info->head; cur; cur = next)
+        {
+          next = cur->next;
+          free (cur);
+        }
     }
+  free (info); 
 }
 
-/* Destroys the source's internal data. */
-static void
-disk_source_destroy (struct case_source *source)
-{
-  struct disk_stream_info *info = source->aux;
-  if (info->file != NULL)
-    fclose (info->file);
-  free (info);
-}
-
-/* Disk source. */
-const struct case_source_class disk_source_class = 
-  {
-    "disk",
-    disk_source_count,
-    disk_source_read,
-    disk_source_destroy,
-  };
-\f
-/* Memory case stream. */
-
-/* Memory sink data. */
-struct memory_sink_info
-  {
-    size_t case_cnt;            /* Number of cases. */
-    size_t case_size;           /* Case size in bytes. */
-    int max_cases;              /* Maximum cases before switching to disk. */
-    struct case_list *head;     /* First case in list. */
-    struct case_list *tail;     /* Last case in list. */
-  };
-
-/* Memory source data. */
-struct memory_source_info 
-  {
-    size_t case_cnt;            /* Number of cases. */
-    size_t case_size;           /* Case size in bytes. */
-    struct case_list *cases;    /* List of cases. */
-  };
-
-/* Creates the SINK memory sink. */
-static void
-memory_sink_create (struct case_sink *sink) 
-{
-  struct memory_sink_info *info;
-  
-  sink->aux = info = xmalloc (sizeof *info);
-
-  assert (compaction_nval > 0);
-  info->case_cnt = 0;
-  info->case_size = compaction_nval * sizeof (union value);
-  info->max_cases = set_max_workspace / info->case_size;
-  info->head = info->tail = NULL;
-}
-
-/* Writes case C to memory sink SINK. */
+/* Writes case C to the storage sink SINK. */
 static void
-memory_sink_write (struct case_sink *sink, const struct ccase *c) 
+storage_sink_write (struct case_sink *sink, const struct ccase *c)
 {
-  struct memory_sink_info *info = sink->aux;
-  size_t case_size;
-  struct case_list *new_case;
+  struct storage_stream_info *info = sink->aux;
 
-  case_size = sizeof (struct case_list)
-                      + ((compaction_nval - 1) * sizeof (union value));
-  new_case = malloc (case_size);
-
-  /* If we've got memory to spare then add it to the linked list. */
-  if (info->case_cnt <= info->max_cases && new_case != NULL)
+  info->case_cnt++;
+  if (info->mode == MEMORY) 
     {
-      info->case_cnt++;
+      struct case_list *new_case;
+
+      /* Copy case. */
+      new_case = xmalloc (sizeof (struct case_list)
+                          + ((sink->value_cnt - 1) * sizeof (union value)));
+      memcpy (&new_case->c, c, sizeof (union value) * sink->value_cnt);
 
       /* Append case to linked list. */
       new_case->next = NULL;
@@ -739,221 +591,188 @@ memory_sink_write (struct case_sink *sink, const struct ccase *c)
         info->head = new_case;
       info->tail = new_case;
 
-      /* Copy data into case. */
-      if (compaction_necessary)
-       compact_case (&new_case->c, c);
-      else
-       memcpy (&new_case->c, c, sizeof (union value) * compaction_nval);
-    }
-  else
-    {
-      /* Out of memory.  Write the active file to disk. */
-      struct case_list *cur, *next;
-
-      /* Notify the user. */
-      if (!new_case)
-       msg (MW, _("Virtual memory exhausted.  Writing active file "
-                  "to disk."));
-      else
-       msg (MW, _("Workspace limit of %d KB (%d cases at %d bytes each) "
-                  "overflowed.  Writing active file to disk."),
-            set_max_workspace / 1024, info->max_cases,
-            compaction_nval * sizeof (union value));
-
-      free (new_case);
-
-      /* Switch to a disk sink. */
-      vfm_sink = create_case_sink (&disk_sink_class, NULL);
-      vfm_sink->class->open (vfm_sink);
-      workspace_overflow = 1;
-
-      /* Write the cases to disk and destroy them.  We can't call
-         vfm->sink->write() because of compaction. */
-      for (cur = info->head; cur; cur = next)
-       {
-         next = cur->next;
-         if (fwrite (cur->c.data, sizeof (union value) * compaction_nval, 1,
-                     vfm_sink->aux) != 1)
-           {
-             msg (ME, _("An error occurred while attempting to "
-                        "write to a temporary file created as the "
-                        "active file: %s."),
-                  strerror (errno));
-             err_failure ();
-           }
-         free (cur);
-       }
-
-      /* Write the current case to disk. */
-      vfm_sink->class->write (vfm_sink, c);
-    }
-}
-
-/* If the data is stored in memory, causes it to be written to disk.
-   To be called only *between* procedure()s, not within them. */
-void
-write_active_file_to_disk (void)
-{
-  if (case_source_is_class (vfm_source, &memory_source_class))
-    {
-      struct memory_source_info *info = vfm_source->aux;
+      /* Dump all the cases to disk if we've run out of
+         workspace. */
+      if (info->case_cnt > info->max_cases) 
+        {
+          workspace_overflow = 1;
+          msg (MW, _("Workspace limit of %d KB (%d cases at %d bytes each) "
+                     "overflowed.  Writing active file to disk."),
+               set_max_workspace / 1024, info->max_cases,
+               sizeof (struct case_list) + info->case_size);
 
-      /* Switch to a disk sink. */
-      vfm_sink = create_case_sink (&disk_sink_class, NULL);
-      vfm_sink->class->open (vfm_sink);
-      workspace_overflow = 1;
-      
-      /* Write the cases to disk and destroy them.  We can't call
-         vfm->sink->write() because of compaction. */
-      {
-       struct case_list *cur, *next;
-       
-       for (cur = info->cases; cur; cur = next)
-         {
-           next = cur->next;
-           if (fwrite (cur->c.data, sizeof *cur->c.data * compaction_nval, 1,
-                       vfm_sink->aux) != 1)
-             {
-               msg (ME, _("An error occurred while attempting to "
-                          "write to a temporary file created as the "
-                          "active file: %s."),
-                    strerror (errno));
-               err_failure ();
-             }
-           free (cur);
-         }
-      }
-      
-      vfm_source = vfm_sink->class->make_source (vfm_sink);
-      vfm_sink = NULL;
+          storage_to_disk (info, sink->value_cnt);
+        }
     }
+  else 
+    write_storage_file (info->file, c->data, sink->value_cnt);
 }
 
-/* Destroy all memory sink data. */
+/* Destroys internal data in SINK. */
 static void
-memory_sink_destroy (struct case_sink *sink)
+storage_sink_destroy (struct case_sink *sink)
 {
-  struct memory_sink_info *info = sink->aux;
-  struct case_list *cur, *next;
-  
-  for (cur = info->head; cur; cur = next)
-    {
-      next = cur->next;
-      free (cur);
-    }
-  free (info);
+  destroy_storage_stream_info (sink->aux);
 }
 
-/* Switch the memory stream from sink to source mode. */
+/* Closes and destroys the sink and returns a storage source to
+   read back the written data. */
 static struct case_source *
-memory_sink_make_source (struct case_sink *sink)
+storage_sink_make_source (struct case_sink *sink) 
 {
-  struct memory_sink_info *sink_info = sink->aux;
-  struct memory_source_info *source_info;
-
-  source_info = xmalloc (sizeof *source_info);
-  source_info->case_cnt = sink_info->case_cnt;
-  source_info->case_size = sink_info->case_size;
-  source_info->cases = sink_info->head;
+  struct storage_stream_info *info = sink->aux;
 
-  free (sink_info);
+  if (info->mode == DISK) 
+    {
+      /* Rewind the file. */
+      assert (info->file != NULL);
+      if (fseek (info->file, 0, SEEK_SET) != 0)
+        {
+          msg (ME, _("An error occurred while attempting to rewind a "
+                     "temporary file used as the active file: %s."),
+               strerror (errno));
+          err_failure ();
+        }
+    }
 
-  return create_case_source (&memory_source_class,
-                             default_dict, source_info);
+  return create_case_source (&storage_source_class, sink->dict, info); 
 }
 
-const struct case_sink_class memory_sink_class = 
+/* Storage sink. */
+const struct case_sink_class storage_sink_class = 
   {
-    "memory",
-    memory_sink_create,
-    memory_sink_write,
-    memory_sink_destroy,
-    memory_sink_make_source,
+    "storage",
+    storage_sink_open,
+    storage_sink_write,
+    storage_sink_destroy,
+    storage_sink_make_source,
   };
+\f
+/* Storage source. */
 
-/* Returns the number of cases in the source. */
+/* Returns the number of cases that will be read by
+   storage_source_read(). */
 static int
-memory_source_count (const struct case_source *source) 
+storage_source_count (const struct case_source *source) 
 {
-  struct memory_source_info *info = source->aux;
+  struct storage_stream_info *info = source->aux;
 
   return info->case_cnt;
 }
 
-/* Reads the case stream from memory and passes it to write_case(). */
+/* Reads all cases from the storage source and passes them one by one to
+   write_case(). */
 static void
-memory_source_read (struct case_source *source,
-                    struct ccase *c,
-                    write_case_func *write_case, write_case_data wc_data)
+storage_source_read (struct case_source *source,
+                     struct ccase *c,
+                     write_case_func *write_case, write_case_data wc_data)
 {
-  struct memory_source_info *info = source->aux;
+  struct storage_stream_info *info = source->aux;
 
-  while (info->cases != NULL
+  if (info->mode == DISK
     {
-      struct case_list *iter = info->cases;
-      memcpy (c, &iter->c, info->case_size);
-      if (!write_case (wc_data)) 
-        break;
+      int i;
+
+      for (i = 0; i < info->case_cnt; i++)
+        {
+          if (!fread (c, info->case_size, 1, info->file))
+            {
+              msg (ME, _("An error occurred while attempting to read from "
+                         "a temporary file created for the active file: %s."),
+                   strerror (errno));
+              err_failure ();
+              break;
+            }
+
+          if (!write_case (wc_data))
+            break;
+        }
+    }
+  else 
+    {
+      while (info->head != NULL) 
+        {
+          struct case_list *iter = info->head;
+          memcpy (c, &iter->c, info->case_size);
+          if (!write_case (wc_data)) 
+            break;
             
-      info->cases = iter->next;
-      free (iter);
+          info->head = iter->next;
+          free (iter);
+        }
+      info->tail = NULL;
     }
 }
 
-/* Destroy all memory source data. */
+/* Destroys the source's internal data. */
 static void
-memory_source_destroy (struct case_source *source)
+storage_source_destroy (struct case_source *source)
 {
-  struct memory_source_info *info = source->aux;
-  struct case_list *cur, *next;
-  
-  for (cur = info->cases; cur; cur = next)
-    {
-      next = cur->next;
-      free (cur);
-    }
-  free (info);
+  destroy_storage_stream_info (source->aux);
 }
 
-/* Returns the list of cases in memory source SOURCE. */
+/* Storage source. */
+const struct case_source_class storage_source_class = 
+  {
+    "storage",
+    storage_source_count,
+    storage_source_read,
+    storage_source_destroy,
+  };
+
+/* Returns nonzero only if SOURCE is stored on disk (instead of
+   in memory). */
+int
+storage_source_on_disk (const struct case_source *source) 
+{
+  struct storage_stream_info *info = source->aux;
+
+  return info->mode == DISK;
+}
+
+/* Returns the list of cases in storage source SOURCE. */
 struct case_list *
-memory_source_get_cases (const struct case_source *source) 
+storage_source_get_cases (const struct case_source *source) 
 {
-  struct memory_source_info *info = source->aux;
+  struct storage_stream_info *info = source->aux;
 
-  return info->cases;
+  assert (info->mode == MEMORY);
+  return info->head;
 }
 
 /* Sets the list of cases in memory source SOURCE to CASES. */
 void
-memory_source_set_cases (const struct case_source *source,
-                         struct case_list *cases) 
+storage_source_set_cases (const struct case_source *source,
+                          struct case_list *cases) 
 {
-  struct memory_source_info *info = source->aux;
+  struct storage_stream_info *info = source->aux;
 
-  info->cases = cases;
+  assert (info->mode == MEMORY);
+  info->head = cases;
 }
 
-/* Memory stream. */
-const struct case_source_class memory_source_class = 
-  {
-    "memory",
-    memory_source_count,
-    memory_source_read,
-    memory_source_destroy,
-  };
-\f
-/* Add C to the lag queue. */
-static void
-lag_case (const struct ccase *c)
+/* If SOURCE has its cases in memory, writes them to disk. */
+void
+storage_source_to_disk (struct case_source *source) 
 {
-  if (lag_count < n_lag)
-    lag_count++;
-  memcpy (lag_queue[lag_head], c, dict_get_case_size (temp_dict));
-  if (++lag_head >= n_lag)
-    lag_head = 0;
+  struct storage_stream_info *info = source->aux;
+
+  storage_to_disk (info, source->value_cnt);
 }
+\f
+/* Null sink.  Used by a few procedures that keep track of output
+   themselves and would throw away anything that the sink
+   contained anyway. */
 
+const struct case_sink_class null_sink_class = 
+  {
+    "null",
+    NULL,
+    NULL,
+    NULL,
+    NULL,
+  };
+\f
 /* Returns a pointer to the lagged case from N_BEFORE cases before the
    current one, or NULL if there haven't been that many cases yet. */
 struct ccase *
@@ -971,132 +790,6 @@ lagged_case (int n_before)
   }
 }
    
-/* Transforms trns_case and writes it to the replacement active
-   file if advisable.  Returns nonzero if more cases can be
-   accepted, zero otherwise.  Do not call this function again
-   after it has returned zero once.  */
-int
-procedure_write_case (write_case_data wc_data)
-{
-  struct procedure_aux_data *proc_aux = wc_data->aux;
-
-  /* Index of current transformation. */
-  int cur_trns;
-
-  /* Return value: whether it's reasonable to write any more cases. */
-  int more_cases = 1;
-
-  cur_trns = f_trns;
-  for (;;)
-    {
-      /* Output the case if this is temp_trns. */
-      if (cur_trns == temp_trns)
-       {
-          int case_limit;
-
-         if (n_lag)
-           lag_case (proc_aux->trns_case);
-         
-         vfm_sink->class->write (vfm_sink, proc_aux->trns_case);
-
-          proc_aux->cases_written++;
-          case_limit = dict_get_case_limit (default_dict);
-         if (case_limit != 0 && proc_aux->cases_written >= case_limit)
-            more_cases = 0;
-       }
-
-      /* Are we done? */
-      if (cur_trns >= n_trns)
-       break;
-      
-      /* Decide which transformation should come next. */
-      {
-       int code;
-       
-       code = t_trns[cur_trns]->proc (t_trns[cur_trns], proc_aux->trns_case,
-                                       proc_aux->cases_written + 1);
-       switch (code)
-         {
-         case -1:
-           /* Next transformation. */
-           cur_trns++;
-           break;
-         case -2:
-           /* Delete this case. */
-           goto done;
-         default:
-           /* Go to that transformation. */
-           cur_trns = code;
-           break;
-         }
-      }
-    }
-
-  /* Call the beginning of group function. */
-  if (!case_count && wc_data->begin_func != NULL)
-    wc_data->begin_func (wc_data->func_aux);
-
-  /* Call the procedure if there is one and FILTER and PROCESS IF
-     don't prohibit it. */
-  if (wc_data->proc_func != NULL
-      && !exclude_this_case (proc_aux->trns_case, proc_aux->cases_written + 1))
-    wc_data->proc_func (proc_aux->trns_case, wc_data->func_aux);
-
-  case_count++;
-  
-done:
-  clear_case (proc_aux->trns_case);
-  
-  /* Return previously determined value. */
-  return more_cases;
-}
-
-/* Clears the variables in C that need to be cleared between
-   processing cases.  */
-static void
-clear_case (struct ccase *c)
-{
-  /* FIXME?  This is linear in the number of variables, but
-     doesn't need to be, so it's an easy optimization target. */
-  size_t var_cnt = dict_get_var_cnt (default_dict);
-  size_t i;
-  
-  for (i = 0; i < var_cnt; i++) 
-    {
-      struct variable *v = dict_get_var (default_dict, i);
-      if (v->init && v->reinit) 
-        {
-          if (v->type == NUMERIC) 
-            c->data[v->fv].f = SYSMIS;
-          else
-            memset (c->data[v->fv].s, ' ', v->width);
-        } 
-    }
-}
-
-/* Returns nonzero if case C with case number CASE_NUM should be
-   exclude as specified on FILTER or PROCESS IF, otherwise
-   zero. */
-static int
-exclude_this_case (const struct ccase *c, int case_num)
-{
-  /* FILTER. */
-  struct variable *filter_var = dict_get_filter (default_dict);
-  if (filter_var != NULL) 
-    {
-      double f = c->data[filter_var->fv].f;
-      if (f == 0.0 || f == SYSMIS || is_num_user_missing (f, filter_var))
-        return 1;
-    }
-
-  /* PROCESS IF. */
-  if (process_if_expr != NULL
-      && expr_evaluate (process_if_expr, c, case_num, NULL) != 1.0)
-    return 1;
-
-  return 0;
-}
-
 /* Appends TRNS to t_trns[], the list of all transformations to be
    performed on data as it is read from the active file. */
 void
@@ -1130,74 +823,150 @@ cancel_transformations (void)
       m_trns = 0;
     }
 }
+\f
+/* Creates a case source with class CLASS and auxiliary data AUX
+   and based on dictionary DICT. */
+struct case_source *
+create_case_source (const struct case_source_class *class,
+                    const struct dictionary *dict,
+                    void *aux) 
+{
+  struct case_source *source = xmalloc (sizeof *source);
+  source->class = class;
+  source->value_cnt = dict_get_next_value_idx (dict);
+  source->aux = aux;
+  return source;
+}
 
-/* Dumps out the values of all the split variables for the case C. */
-static void
-dump_splits (struct ccase *c)
+/* Returns nonzero if a case source is "complex". */
+int
+case_source_is_complex (const struct case_source *source) 
 {
-  struct variable *const *split;
-  struct tab_table *t;
-  size_t split_cnt;
-  int i;
+  return source != NULL && (source->class == &input_program_source_class
+                            || source->class == &file_type_source_class);
+}
 
-  split_cnt = dict_get_split_cnt (default_dict);
-  t = tab_create (3, split_cnt + 1, 0);
-  tab_dim (t, tab_natural_dimensions);
-  tab_vline (t, TAL_1 | TAL_SPACING, 1, 0, split_cnt);
-  tab_vline (t, TAL_1 | TAL_SPACING, 2, 0, split_cnt);
-  tab_text (t, 0, 0, TAB_NONE, _("Variable"));
-  tab_text (t, 1, 0, TAB_LEFT, _("Value"));
-  tab_text (t, 2, 0, TAB_LEFT, _("Label"));
-  split = dict_get_split_vars (default_dict);
-  for (i = 0; i < split_cnt; i++)
-    {
-      struct variable *v = split[i];
-      char temp_buf[80];
-      const char *val_lab;
+/* Returns nonzero if CLASS is the class of SOURCE. */
+int
+case_source_is_class (const struct case_source *source,
+                      const struct case_source_class *class) 
+{
+  return source != NULL && source->class == class;
+}
 
-      assert (v->type == NUMERIC || v->type == ALPHA);
-      tab_text (t, 0, i + 1, TAB_LEFT | TAT_PRINTF, "%s", v->name);
-      
-      data_out (temp_buf, &v->print, &c->data[v->fv]);
-      
-      temp_buf[v->print.w] = 0;
-      tab_text (t, 1, i + 1, TAT_PRINTF, "%.*s", v->print.w, temp_buf);
+/* Creates a case sink with class CLASS and auxiliary data
+   AUX. */
+struct case_sink *
+create_case_sink (const struct case_sink_class *class,
+                  const struct dictionary *dict,
+                  void *aux) 
+{
+  struct case_sink *sink = xmalloc (sizeof *sink);
+  sink->class = class;
+  sink->dict = dict;
+  sink->idx_to_fv = dict_get_compacted_idx_to_fv (dict);
+  sink->value_cnt = dict_get_compacted_value_cnt (dict);
+  sink->aux = aux;
+  return sink;
+}
 
-      val_lab = val_labs_find (v->val_labs, c->data[v->fv]);
-      if (val_lab)
-       tab_text (t, 2, i + 1, TAB_LEFT, val_lab);
-    }
-  tab_flags (t, SOMF_NO_TITLE);
-  tab_submit (t);
+/* Destroys case sink SINK.  It is the caller's responsible to
+   call the sink's destroy function, if any. */
+void
+free_case_sink (struct case_sink *sink) 
+{
+  free (sink->idx_to_fv);
+  free (sink);
+}
+\f
+/* Represents auxiliary data for handling SPLIT FILE. */
+struct split_aux_data 
+  {
+    size_t case_count;          /* Number of cases so far. */
+    struct ccase *prev_case;    /* Data in previous case. */
+
+    /* Functions to call... */
+    void (*begin_func) (void *);               /* ...before data. */
+    int (*proc_func) (struct ccase *, void *); /* ...with data. */
+    void (*end_func) (void *);                 /* ...after data. */
+    void *func_aux;                            /* Auxiliary data. */ 
+  };
+
+static int equal_splits (const struct ccase *, const struct ccase *);
+static int procedure_with_splits_callback (struct ccase *, void *);
+static void dump_splits (struct ccase *);
+
+/* Like procedure(), but it automatically breaks the case stream
+   into SPLIT FILE break groups.  Before each group of cases with
+   identical SPLIT FILE variable values, BEGIN_FUNC is called.
+   Then PROC_FUNC is called with each case in the group.  
+   END_FUNC is called when the group is finished.  FUNC_AUX is
+   passed to each of the functions as auxiliary data.
+
+   If the active file is empty, none of BEGIN_FUNC, PROC_FUNC,
+   and END_FUNC will be called at all. 
+
+   If SPLIT FILE is not in effect, then there is one break group
+   (if the active file is nonempty), and BEGIN_FUNC and END_FUNC
+   will be called once. */
+void
+procedure_with_splits (void (*begin_func) (void *aux),
+                       int (*proc_func) (struct ccase *, void *aux),
+                       void (*end_func) (void *aux),
+                       void *func_aux) 
+{
+  struct split_aux_data split_aux;
+
+  split_aux.case_count = 0;
+  split_aux.prev_case = xmalloc (dict_get_case_size (default_dict));
+  split_aux.begin_func = begin_func;
+  split_aux.proc_func = proc_func;
+  split_aux.end_func = end_func;
+  split_aux.func_aux = func_aux;
+
+  procedure (procedure_with_splits_callback, &split_aux);
+
+  if (split_aux.case_count > 0 && end_func != NULL)
+    end_func (func_aux);
+  free (split_aux.prev_case);
 }
 
-/* This proc_func is substituted for the user-supplied proc_func when
-   SPLIT FILE is active.  This function forms a wrapper around that
-   proc_func by dividing the input into series. */
+/* procedure() callback used by procedure_with_splits(). */
 static int
-SPLIT_FILE_proc_func (struct ccase *c, void *data_)
+procedure_with_splits_callback (struct ccase *c, void *split_aux_) 
 {
-  struct write_case_data *data = data_;
-  struct split_aux_data *split_aux = data->aux;
-  struct variable *const *split;
-  size_t split_cnt;
-  size_t i;
+  struct split_aux_data *split_aux = split_aux_;
 
-  /* The first case always begins a new series.  We also need to
-     preserve the values of the case for later comparison. */
-  if (case_count == 0)
+  /* Start a new series if needed. */
+  if (split_aux->case_count == 0
+      || !equal_splits (c, split_aux->prev_case))
     {
-      memcpy (split_aux->prev_case, c, dict_get_case_size (default_dict));
+      if (split_aux->case_count > 0 && split_aux->end_func != NULL)
+        split_aux->end_func (split_aux->func_aux);
 
       dump_splits (c);
-      if (data->begin_func != NULL)
-       data->begin_func (data->func_aux);
-      
-      return data->proc_func (c, data->func_aux);
+      memcpy (split_aux->prev_case, c, dict_get_case_size (default_dict));
+
+      if (split_aux->begin_func != NULL)
+       split_aux->begin_func (split_aux->func_aux);
     }
 
-  /* Compare the value of each SPLIT FILE variable to the values on
-     the previous case. */
+  split_aux->case_count++;
+  if (split_aux->proc_func != NULL)
+    return split_aux->proc_func (c, split_aux->func_aux);
+  else
+    return 1;
+}
+
+/* Compares the SPLIT FILE variables in cases A and B and returns
+   nonzero only if they differ. */
+static int
+equal_splits (const struct ccase *a, const struct ccase *b) 
+{
+  struct variable *const *split;
+  size_t split_cnt;
+  size_t i;
+    
   split = dict_get_split_vars (default_dict);
   split_cnt = dict_get_split_cnt (default_dict);
   for (i = 0; i < split_cnt; i++)
@@ -1207,129 +976,60 @@ SPLIT_FILE_proc_func (struct ccase *c, void *data_)
       switch (v->type)
        {
        case NUMERIC:
-         if (c->data[v->fv].f != split_aux->prev_case->data[v->fv].f)
-           goto not_equal;
+         if (a->data[v->fv].f != b->data[v->fv].f)
+            return 0;
          break;
        case ALPHA:
-         if (memcmp (c->data[v->fv].s,
-                      split_aux->prev_case->data[v->fv].s, v->width))
-           goto not_equal;
+         if (memcmp (a->data[v->fv].s, b->data[v->fv].s, v->width))
+            return 0;
          break;
        default:
          assert (0);
        }
     }
-  return data->proc_func (c, data->func_aux);
-  
-not_equal:
-  /* The values of the SPLIT FILE variable are different from the
-     values on the previous case.  That means that it's time to begin
-     a new series. */
-  if (data->end_func != NULL)
-    data->end_func (data->func_aux);
-  dump_splits (c);
-  if (data->begin_func != NULL)
-    data->begin_func (data->func_aux);
-  memcpy (split_aux->prev_case, c, dict_get_case_size (default_dict));
-  return data->proc_func (c, data->func_aux);
-}
-\f
-/* Case compaction. */
-
-/* Copies case SRC to case DEST, compacting it in the process. */
-void
-compact_case (struct ccase *dest, const struct ccase *src)
-{
-  int i;
-  int nval = 0;
-  size_t var_cnt;
-  
-  assert (compaction_necessary);
-
-  if (temporary == 2)
-    {
-      if (dest != compaction_case)
-       memcpy (dest, compaction_case, sizeof (union value) * compaction_nval);
-      return;
-    }
 
-  /* Copy all the variables except the scratch variables from SRC to
-     DEST. */
-  var_cnt = dict_get_var_cnt (default_dict);
-  for (i = 0; i < var_cnt; i++)
-    {
-      struct variable *v = dict_get_var (default_dict, i);
-      
-      if (dict_class_from_id (v->name) == DC_SCRATCH)
-       continue;
-
-      if (v->type == NUMERIC)
-       dest->data[nval++] = src->data[v->fv];
-      else
-       {
-         int w = DIV_RND_UP (v->width, sizeof (union value));
-         
-         memcpy (&dest->data[nval], &src->data[v->fv], w * sizeof (union value));
-         nval += w;
-       }
-    }
+  return 1;
 }
 
-/* Reassigns `fv' for each variable.  Deletes scratch variables. */
+/* Dumps out the values of all the split variables for the case C. */
 static void
-finish_compaction (void)
+dump_splits (struct ccase *c)
 {
+  struct variable *const *split;
+  struct tab_table *t;
+  size_t split_cnt;
   int i;
 
-  for (i = 0; i < dict_get_var_cnt (default_dict); )
-    {
-      struct variable *v = dict_get_var (default_dict, i);
-
-      if (dict_class_from_id (v->name) == DC_SCRATCH) 
-        dict_delete_var (default_dict, v);
-      else
-        i++;
-    }
-  dict_compact_values (default_dict);
-}
-
-/* Creates a case source with class CLASS and auxiliary data AUX
-   and based on dictionary DICT. */
-struct case_source *
-create_case_source (const struct case_source_class *class,
-                    const struct dictionary *dict,
-                    void *aux) 
-{
-  struct case_source *source = xmalloc (sizeof *source);
-  source->class = class;
-  source->value_cnt = dict_get_next_value_idx (dict);
-  source->aux = aux;
-  return source;
-}
+  split_cnt = dict_get_split_cnt (default_dict);
+  if (split_cnt == 0)
+    return;
 
-/* Returns nonzero if a case source is "complex". */
-int
-case_source_is_complex (const struct case_source *source) 
-{
-  return source != NULL && (source->class == &input_program_source_class
-                            || source->class == &file_type_source_class);
-}
+  t = tab_create (3, split_cnt + 1, 0);
+  tab_dim (t, tab_natural_dimensions);
+  tab_vline (t, TAL_1 | TAL_SPACING, 1, 0, split_cnt);
+  tab_vline (t, TAL_1 | TAL_SPACING, 2, 0, split_cnt);
+  tab_text (t, 0, 0, TAB_NONE, _("Variable"));
+  tab_text (t, 1, 0, TAB_LEFT, _("Value"));
+  tab_text (t, 2, 0, TAB_LEFT, _("Label"));
+  split = dict_get_split_vars (default_dict);
+  for (i = 0; i < split_cnt; i++)
+    {
+      struct variable *v = split[i];
+      char temp_buf[80];
+      const char *val_lab;
 
-/* Returns nonzero if CLASS is the class of SOURCE. */
-int
-case_source_is_class (const struct case_source *source,
-                      const struct case_source_class *class) 
-{
-  return source != NULL && source->class == class;
-}
+      assert (v->type == NUMERIC || v->type == ALPHA);
+      tab_text (t, 0, i + 1, TAB_LEFT | TAT_PRINTF, "%s", v->name);
+      
+      data_out (temp_buf, &v->print, &c->data[v->fv]);
+      
+      temp_buf[v->print.w] = 0;
+      tab_text (t, 1, i + 1, TAT_PRINTF, "%.*s", v->print.w, temp_buf);
 
-/* Creates a case sink with class CLASS and auxiliary data
-   AUX. */
-struct case_sink *
-create_case_sink (const struct case_sink_class *class, void *aux) 
-{
-  struct case_sink *sink = xmalloc (sizeof *sink);
-  sink->class = class;
-  sink->aux = aux;
-  return sink;
+      val_lab = val_labs_find (v->val_labs, c->data[v->fv]);
+      if (val_lab)
+       tab_text (t, 2, i + 1, TAB_LEFT, val_lab);
+    }
+  tab_flags (t, SOMF_NO_TITLE);
+  tab_submit (t);
 }