Rewrite expression code.
[pspp-builds.git] / src / vfm.c
index a847ea65f25d5285e714eb881eab9f82faa2711c..796e3bcff07eda9daac4101bb5793d8721d2dd8f 100644 (file)
--- a/src/vfm.c
+++ b/src/vfm.c
@@ -20,7 +20,7 @@
 #include <config.h>
 #include "vfm.h"
 #include "vfmP.h"
-#include <assert.h>
+#include "error.h"
 #include <errno.h>
 #include <stdio.h>
 #include <stdlib.h>
 #include <unistd.h>    /* Required by SunOS4. */
 #endif
 #include "alloc.h"
+#include "case.h"
+#include "casefile.h"
+#include "dictionary.h"
 #include "do-ifP.h"
 #include "error.h"
-#include "expr.h"
+#include "expressions/public.h"
 #include "misc.h"
-#include "random.h"
 #include "settings.h"
 #include "som.h"
 #include "str.h"
 /* Procedure execution data. */
 struct write_case_data
   {
-    /* Functions to call... */
-    void (*begin_func) (void *);               /* ...before data. */
-    int (*proc_func) (struct ccase *, void *); /* ...with data. */
-    void (*end_func) (void *);                 /* ...after data. */
-    void *func_aux;                            /* Auxiliary data. */ 
-
-    /* Extra auxiliary data. */
-    void *aux;
+    /* Function to call for each case. */
+    int (*proc_func) (struct ccase *, void *); /* Function. */
+    void *aux;                                 /* Auxiliary data. */ 
+
+    struct ccase trns_case;     /* Case used for transformations. */
+    struct ccase sink_case;     /* Case written to sink, if
+                                   compaction is necessary. */
+    size_t cases_written;       /* Cases output so far. */
+    size_t cases_analyzed;      /* Cases passed to procedure so far. */
   };
 
 /* The current active file, from which cases are read. */
@@ -70,312 +73,331 @@ struct case_sink *vfm_sink;
 
 /* Nonzero if the case needs to have values deleted before being
    stored, zero otherwise. */
-int compaction_necessary;
-
-/* Number of values after compaction. */
-int compaction_nval;
-
-/* Temporary case buffer with enough room for `compaction_nval'
-   `value's. */
-struct ccase *compaction_case;
-
-/* Nonzero means that we've overflowed our allotted workspace.
-   After that happens once during a session, we always store the
-   active file on disk instead of in memory.  (This policy may be
-   too aggressive.) */
-static int workspace_overflow = 0;
+static int compaction_necessary;
 
 /* Time at which vfm was last invoked. */
 time_t last_vfm_invocation;
 
-/* Number of cases passed to proc_func(). */
-static int case_count;
-
 /* Lag queue. */
 int n_lag;                     /* Number of cases to lag. */
 static int lag_count;          /* Number of cases in lag_queue so far. */
 static int lag_head;           /* Index where next case will be added. */
-static struct ccase **lag_queue; /* Array of n_lag ccase * elements. */
+static struct ccase *lag_queue; /* Array of n_lag ccase * elements. */
 
+static void internal_procedure (int (*proc_func) (struct ccase *, void *),
+                                void *aux);
+static void create_trns_case (struct ccase *, struct dictionary *);
 static void open_active_file (void);
-static void close_active_file (struct write_case_data *);
-static int SPLIT_FILE_proc_func (struct ccase *, void *);
-static void finish_compaction (void);
-static void lag_case (void);
-static int procedure_write_case (struct write_case_data *);
-static void clear_temp_case (void);
-static int exclude_this_case (int case_num);
+static int write_case (struct write_case_data *wc_data);
+static int execute_transformations (struct ccase *c,
+                                    struct trns_header **trns,
+                                    int first_idx, int last_idx,
+                                    int case_num);
+static int filter_case (const struct ccase *c, int case_num);
+static void lag_case (const struct ccase *c);
+static void compact_case (struct ccase *dest, const struct ccase *src);
+static void clear_case (struct ccase *c);
+static void close_active_file (void);
 \f
 /* Public functions. */
 
-struct procedure_aux_data 
-  {
-    size_t cases_written;       /* Number of cases written so far. */
-  };
+/* Reads the data from the input program and writes it to a new
+   active file.  For each case we read from the input program, we
+   do the following
 
-struct split_aux_data 
-  {
-    struct ccase *prev_case;    /* Data in previous case. */
-  };
+   1. Execute permanent transformations.  If these drop the case,
+      start the next case from step 1.
 
-/* Reads all the cases from the active file, transforms them by
-   the active set of transformations, calls PROC_FUNC with CURCASE
-   set to the case, and writes them to a new active file.
+   2. N OF CASES.  If we have already written N cases, start the
+      next case from step 1.
+   
+   3. Write case to replacement active file.
+   
+   4. Execute temporary transformations.  If these drop the case,
+      start the next case from step 1.
+      
+   5. FILTER, PROCESS IF.  If these drop the case, start the next
+      case from step 1.
+   
+   6. Post-TEMPORARY N OF CASES.  If we have already analyzed N
+      cases, start the next case from step 1.
+      
+   7. Pass case to PROC_FUNC, passing AUX as auxiliary data. */
+void
+procedure (int (*proc_func) (struct ccase *, void *), void *aux)
+{
+  if (proc_func == NULL
+      && case_source_is_class (vfm_source, &storage_source_class)
+      && vfm_sink == NULL
+      && !temporary
+      && n_trns == 0)
+    {
+      /* Nothing to do. */
+      return;
+    }
 
-   Divides the active file into zero or more series of one or more
-   cases each.  BEGIN_FUNC is called before each series.  END_FUNC is
-   called after each series.
+  open_active_file ();
+  internal_procedure (proc_func, aux);
+  close_active_file ();
+}
 
-   Arbitrary user-specified data AUX is passed to BEGIN_FUNC,
-   PROC_FUNC, and END_FUNC as auxiliary data. */
-void
-procedure (void (*begin_func) (void *),
-          int (*proc_func) (struct ccase *curcase, void *),
-          void (*end_func) (void *),
-           void *func_aux)
+/* Executes a procedure, as procedure(), except that the caller
+   is responsible for calling open_active_file() and
+   close_active_file(). */
+static void
+internal_procedure (int (*proc_func) (struct ccase *, void *), void *aux) 
 {
   static int recursive_call;
 
-  struct write_case_data procedure_write_data;
-  struct procedure_aux_data proc_aux;
-
-  struct write_case_data split_file_data;
-  struct split_aux_data split_aux;
-  int split;
+  struct write_case_data wc_data;
 
   assert (++recursive_call == 1);
 
-  proc_aux.cases_written = 0;
-
-  /* Normally we just use the data passed by the user. */
-  procedure_write_data.begin_func = begin_func;
-  procedure_write_data.proc_func = proc_func;
-  procedure_write_data.end_func = end_func;
-  procedure_write_data.func_aux = func_aux;
-  procedure_write_data.aux = &proc_aux;
-
-  /* Under SPLIT FILE, we add a layer of indirection. */
-  split = dict_get_split_cnt (default_dict) > 0;
-  if (split) 
-    {
-      split_file_data = procedure_write_data;
-      split_file_data.aux = &split_aux;
-
-      split_aux.prev_case = xmalloc (dict_get_case_size (default_dict));
-
-      procedure_write_data.begin_func = NULL;
-      procedure_write_data.proc_func = SPLIT_FILE_proc_func;
-      procedure_write_data.end_func = end_func;
-      procedure_write_data.func_aux = &split_file_data;
-    }
+  wc_data.proc_func = proc_func;
+  wc_data.aux = aux;
+  create_trns_case (&wc_data.trns_case, default_dict);
+  case_create (&wc_data.sink_case, dict_get_next_value_idx (default_dict));
+  wc_data.cases_written = 0;
 
   last_vfm_invocation = time (NULL);
 
-  open_active_file ();
   if (vfm_source != NULL) 
     vfm_source->class->read (vfm_source,
-                             procedure_write_case, &procedure_write_data);
-  close_active_file (&procedure_write_data);
+                             &wc_data.trns_case,
+                             write_case, &wc_data);
 
-  if (split)
-    free (split_aux.prev_case);
+  case_destroy (&wc_data.sink_case);
+  case_destroy (&wc_data.trns_case);
 
   assert (--recursive_call == 0);
 }
-\f
-/* Active file processing support.  Subtly different semantics from
-   procedure(). */
 
-static int process_active_file_write_case (struct write_case_data *data);
+/* Creates and returns a case, initializing it from the vectors
+   that say which `value's need to be initialized just once, and
+   which ones need to be re-initialized before every case. */
+static void
+create_trns_case (struct ccase *trns_case, struct dictionary *dict)
+{
+  size_t var_cnt = dict_get_var_cnt (dict);
+  size_t i;
 
-/* The casefunc might want us to stop calling it. */
-static int not_canceled;
+  case_create (trns_case, dict_get_next_value_idx (dict));
+  for (i = 0; i < var_cnt; i++) 
+    {
+      struct variable *v = dict_get_var (dict, i);
+      union value *value = case_data_rw (trns_case, v->fv);
 
-/* Reads all the cases from the active file and passes them one-by-one
-   to CASEFUNC in temp_case.  Before any cases are passed, calls
-   BEGIN_FUNC.  After all the cases have been passed, calls END_FUNC.
-   BEGIN_FUNC, CASEFUNC, and END_FUNC can write temp_case to the output
-   file by calling process_active_file_output_case().
+      if (v->type == NUMERIC)
+        value->f = v->reinit ? 0.0 : SYSMIS;
+      else
+        memset (value->s, ' ', v->width);
+    }
+}
 
-   process_active_file() ignores TEMPORARY, SPLIT FILE, and N. */
-void
-process_active_file (void (*begin_func) (void *),
-                    int (*casefunc) (struct ccase *curcase, void *),
-                    void (*end_func) (void *),
-                     void *func_aux)
+/* Makes all preparations for reading from the data source and writing
+   to the data sink. */
+static void
+open_active_file (void)
 {
-  struct write_case_data process_active_write_data;
+  /* Make temp_dict refer to the dictionary right before data
+     reaches the sink */
+  if (!temporary)
+    {
+      temp_trns = n_trns;
+      temp_dict = default_dict;
+    }
 
-  process_active_write_data.begin_func = begin_func;
-  process_active_write_data.proc_func = casefunc;
-  process_active_write_data.end_func = end_func;
-  process_active_write_data.func_aux = func_aux;
+  /* Figure out compaction. */
+  compaction_necessary = (dict_get_next_value_idx (temp_dict)
+                          != dict_get_compacted_value_cnt (temp_dict));
 
-  not_canceled = 1;
+  /* Prepare sink. */
+  if (vfm_sink == NULL)
+    vfm_sink = create_case_sink (&storage_sink_class, temp_dict, NULL);
+  if (vfm_sink->class->open != NULL)
+    vfm_sink->class->open (vfm_sink);
 
-  open_active_file ();
-  begin_func (func_aux);
-  if (vfm_source != NULL)
-    vfm_source->class->read (vfm_source, process_active_file_write_case,
-                             &process_active_write_data);
-  end_func (func_aux);
-  close_active_file (&process_active_write_data);
+  /* Allocate memory for lag queue. */
+  if (n_lag > 0)
+    {
+      int i;
+  
+      lag_count = 0;
+      lag_head = 0;
+      lag_queue = xmalloc (n_lag * sizeof *lag_queue);
+      for (i = 0; i < n_lag; i++)
+        case_nullify (&lag_queue[i]);
+    }
+
+  /* Close any unclosed DO IF or LOOP constructs. */
+  discard_ctl_stack ();
 }
 
-/* Pass the current case to casefunc. */
+/* Transforms trns_case and writes it to the replacement active
+   file if advisable.  Returns nonzero if more cases can be
+   accepted, zero otherwise.  Do not call this function again
+   after it has returned zero once.  */
 static int
-process_active_file_write_case (struct write_case_data *data)
+write_case (struct write_case_data *wc_data)
 {
-  /* Index of current transformation. */
-  int cur_trns;
+  /* Execute permanent transformations.  */
+  if (!execute_transformations (&wc_data->trns_case, t_trns, f_trns, temp_trns,
+                                wc_data->cases_written + 1))
+    goto done;
 
-  for (cur_trns = f_trns ; cur_trns != temp_trns; )
-    {
-      int code;
-       
-      code = t_trns[cur_trns]->proc (t_trns[cur_trns], temp_case,
-                                     case_count + 1);
-      switch (code)
-       {
-       case -1:
-         /* Next transformation. */
-         cur_trns++;
-         break;
-       case -2:
-         /* Delete this case. */
-         goto done;
-       default:
-         /* Go to that transformation. */
-         cur_trns = code;
-         break;
-       }
-    }
+  /* N OF CASES. */
+  if (dict_get_case_limit (default_dict)
+      && wc_data->cases_written >= dict_get_case_limit (default_dict))
+    goto done;
+  wc_data->cases_written++;
 
+  /* Write case to LAG queue. */
   if (n_lag)
-    lag_case ();
-         
-  /* Call the procedure if FILTER and PROCESS IF don't prohibit it. */
-  if (not_canceled && !exclude_this_case (case_count + 1))
-    not_canceled = data->proc_func (temp_case, data->func_aux);
+    lag_case (&wc_data->trns_case);
+
+  /* Write case to replacement active file. */
+  if (vfm_sink->class->write != NULL) 
+    {
+      if (compaction_necessary) 
+        {
+          compact_case (&wc_data->sink_case, &wc_data->trns_case);
+          vfm_sink->class->write (vfm_sink, &wc_data->sink_case);
+        }
+      else
+        vfm_sink->class->write (vfm_sink, &wc_data->trns_case);
+    }
   
-  case_count++;
+  /* Execute temporary transformations. */
+  if (!execute_transformations (&wc_data->trns_case, t_trns, temp_trns, n_trns,
+                                wc_data->cases_written))
+    goto done;
   
- done:
-  clear_temp_case ();
+  /* FILTER, PROCESS IF, post-TEMPORARY N OF CASES. */
+  if (filter_case (&wc_data->trns_case, wc_data->cases_written)
+      || (dict_get_case_limit (temp_dict)
+          && wc_data->cases_analyzed >= dict_get_case_limit (temp_dict)))
+    goto done;
+  wc_data->cases_analyzed++;
 
-  return 1;
-}
+  /* Pass case to procedure. */
+  if (wc_data->proc_func != NULL)
+    wc_data->proc_func (&wc_data->trns_case, wc_data->aux);
 
-/* Write temp_case to the active file. */
-void
-process_active_file_output_case (void)
-{
-  vfm_sink->class->write (vfm_sink, temp_case);
+ done:
+  clear_case (&wc_data->trns_case);
+  return 1;
 }
-\f
-/* Opening the active file. */
 
-/* It might be usefully noted that the following several functions are
-   given in the order that they are called by open_active_file(). */
-
-/* Prepare to write to the replacement active file. */
-static void
-prepare_for_writing (void)
+/* Transforms case C using the transformations in TRNS[] with
+   indexes FIRST_IDX through LAST_IDX, exclusive.  Case C will
+   become case CASE_NUM (1-based) in the output file.  Returns
+   zero if the case was filtered out by one of the
+   transformations, nonzero otherwise. */
+static int
+execute_transformations (struct ccase *c,
+                         struct trns_header **trns,
+                         int first_idx, int last_idx,
+                         int case_num) 
 {
-  /* FIXME: If ALL the conditions listed below hold true, then the
-     replacement active file is guaranteed to be identical to the
-     original active file:
-
-     1. TEMPORARY was the first transformation, OR, there were no
-     transformations at all.
+  int idx;
 
-     2. Input is not coming from an input program.
-
-     3. Compaction is not necessary.
-
-     So, in this case, we shouldn't have to replace the active
-     file--it's just a waste of time and space. */
-
-  if (vfm_sink == NULL)
+  for (idx = first_idx; idx != last_idx; )
     {
-      if (workspace_overflow)
-        vfm_sink = create_case_sink (&disk_sink_class, NULL);
-      else
-        vfm_sink = create_case_sink (&memory_sink_class, NULL);
+      int retval = trns[idx]->proc (trns[idx], c, case_num);
+      switch (retval)
+        {
+        case -1:
+          idx++;
+          break;
+          
+        case -2:
+          return 0;
+          
+        default:
+          idx = retval;
+          break;
+        }
     }
+
+  return 1;
 }
 
-/* Arrange for compacting the output cases for storage. */
-static void
-arrange_compaction (void)
+/* Returns nonzero if case C with case number CASE_NUM should be
+   exclude as specified on FILTER or PROCESS IF, otherwise
+   zero. */
+static int
+filter_case (const struct ccase *c, int case_idx)
 {
-  int count_values = 0;
+  /* FILTER. */
+  struct variable *filter_var = dict_get_filter (default_dict);
+  if (filter_var != NULL) 
+    {
+      double f = case_num (c, filter_var->fv);
+      if (f == 0.0 || f == SYSMIS || is_num_user_missing (f, filter_var))
+        return 1;
+    }
 
-  {
-    int i;
-    
-    /* Count up the number of `value's that will be output. */
-    for (i = 0; i < dict_get_var_cnt (temp_dict); i++) 
-      {
-        struct variable *v = dict_get_var (temp_dict, i);
-
-        if (v->name[0] != '#')
-          {
-            assert (v->nv > 0);
-            count_values += v->nv;
-          } 
-      }
-    assert (temporary == 2
-            || count_values <= dict_get_next_value_idx (temp_dict));
-  }
-  
-  /* Compaction is only necessary if the number of `value's to output
-     differs from the number already present. */
-  compaction_nval = count_values;
-  if (temporary == 2 || count_values != dict_get_next_value_idx (temp_dict))
-    compaction_necessary = 1;
-  else
-    compaction_necessary = 0;
-  
-  if (vfm_sink->class->open != NULL)
-    vfm_sink->class->open (vfm_sink);
+  /* PROCESS IF. */
+  if (process_if_expr != NULL
+      && expr_evaluate_num (process_if_expr, c, case_idx) != 1.0)
+    return 1;
+
+  return 0;
 }
 
-/* Prepares the temporary case and compaction case. */
+/* Add C to the lag queue. */
 static void
-make_temp_case (void)
+lag_case (const struct ccase *c)
 {
-  temp_case = xmalloc (dict_get_case_size (default_dict));
-
-  if (compaction_necessary)
-    compaction_case = xmalloc (sizeof (struct ccase)
-                              + sizeof (union value) * (compaction_nval - 1));
+  if (lag_count < n_lag)
+    lag_count++;
+  case_destroy (&lag_queue[lag_head]);
+  case_clone (&lag_queue[lag_head], c);
+  if (++lag_head >= n_lag)
+    lag_head = 0;
 }
 
-#if DEBUGGING
-/* Returns the name of the variable that owns the index CCASE_INDEX
-   into ccase. */
-static const char *
-index_to_varname (int ccase_index)
+/* Copies case SRC to case DEST, compacting it in the process. */
+static void
+compact_case (struct ccase *dest, const struct ccase *src)
 {
   int i;
+  int nval = 0;
+  size_t var_cnt;
+  
+  assert (compaction_necessary);
 
-  for (i = 0; i < default_dict.nvar; i++)
+  /* Copy all the variables except scratch variables from SRC to
+     DEST. */
+  /* FIXME: this should be temp_dict not default_dict I guess. */
+  var_cnt = dict_get_var_cnt (default_dict);
+  for (i = 0; i < var_cnt; i++)
     {
-      struct variable *v = default_dict.var[i];
+      struct variable *v = dict_get_var (default_dict, i);
       
-      if (ccase_index >= v->fv && ccase_index < v->fv + v->nv)
-       return default_dict.var[i]->name;
+      if (dict_class_from_id (v->name) == DC_SCRATCH)
+       continue;
+
+      if (v->type == NUMERIC) 
+        {
+          case_data_rw (dest, nval)->f = case_num (src, v->fv);
+          nval++; 
+        }
+      else
+       {
+         int w = DIV_RND_UP (v->width, sizeof (union value));
+         
+         memcpy (case_data_rw (dest, nval), case_str (src, v->fv),
+                  w * sizeof (union value));
+         nval += w;
+       }
     }
-  return _("<NOVAR>");
 }
-#endif
 
-/* Initializes temp_case from the vectors that say which `value's
-   need to be initialized just once, and which ones need to be
-   re-initialized before every case. */
+/* Clears the variables in C that need to be cleared between
+   processing cases.  */
 static void
-vector_initialization (void)
+clear_case (struct ccase *c)
 {
   size_t var_cnt = dict_get_var_cnt (default_dict);
   size_t i;
@@ -383,106 +405,32 @@ vector_initialization (void)
   for (i = 0; i < var_cnt; i++) 
     {
       struct variable *v = dict_get_var (default_dict, i);
-
-      if (v->type == NUMERIC) 
+      if (v->init && v->reinit) 
         {
-          if (v->reinit)
-            temp_case->data[v->fv].f = 0.0;
+          if (v->type == NUMERIC)
+            case_data_rw (c, v->fv)->f = SYSMIS;
           else
-            temp_case->data[v->fv].f = SYSMIS;
-        }
-      else
-        memset (temp_case->data[v->fv].s, ' ', v->width);
+            memset (case_data_rw (c, v->fv)->s, ' ', v->width);
+        } 
     }
 }
 
-/* Sets all the lag-related variables based on value of n_lag. */
-static void
-setup_lag (void)
-{
-  int i;
-  
-  if (n_lag == 0)
-    return;
-
-  lag_count = 0;
-  lag_head = 0;
-  lag_queue = xmalloc (n_lag * sizeof *lag_queue);
-  for (i = 0; i < n_lag; i++)
-    lag_queue[i] = xmalloc (dict_get_case_size (temp_dict));
-}
-
-/* There is a lot of potential confusion in the vfm and related
-   routines over the number of `value's at each stage of the process.
-   Here is each nval count, with explanation, as set up by
-   open_active_file():
-
-   temp_dict->nval: Number of `value's in the cases after the
-   transformations leading up to TEMPORARY have been performed.
-
-   compaction_nval: Number of `value's in the cases after the
-   transformations leading up to TEMPORARY have been performed
-   and the case has been compacted by compact_case(), if
-   compaction is necessary.  This the number of `value's in the
-   cases saved by the sink stream.  (However, note that the cases
-   passed to the sink stream have not yet been compacted.  It is
-   the responsibility of the data sink to call compact_case().)
-   `compaction' becomes the new value of default_dict.nval after
-   the procedure is completed.
-
-   default_dict.nval: This is often an alias for temp_dict->nval.
-   As such it can really have no separate existence until the
-   procedure is complete.  For this reason it should *not* be
-   referenced inside the execution of a procedure. */
-/* Makes all preparations for reading from the data source and writing
-   to the data sink. */
-static void
-open_active_file (void)
-{
-  /* Sometimes we want to refer to the dictionary that applies to the
-     data actually written to the sink.  This is either temp_dict or
-     default_dict.  However, if TEMPORARY is not on, then temp_dict
-     does not apply.  So, we can set temp_dict to default_dict in this
-     case. */
-  if (!temporary)
-    {
-      temp_trns = n_trns;
-      temp_dict = default_dict;
-    }
-
-  /* No cases passed to the procedure yet. */
-  case_count = 0;
-
-  /* The rest. */
-  prepare_for_writing ();
-  arrange_compaction ();
-  make_temp_case ();
-  vector_initialization ();
-  discard_ctl_stack ();
-  setup_lag ();
-}
-\f
 /* Closes the active file. */
 static void
-close_active_file (struct write_case_data *data)
+close_active_file (void)
 {
-  /* Close the current case group. */
-  if (case_count && data->end_func != NULL)
-    data->end_func (data->func_aux);
-
-  /* Stop lagging (catch up?). */
-  if (n_lag)
+  /* Free memory for lag queue, and turn off lagging. */
+  if (n_lag > 0)
     {
       int i;
       
       for (i = 0; i < n_lag; i++)
-       free (lag_queue[i]);
+       case_destroy (&lag_queue[i]);
       free (lag_queue);
       n_lag = 0;
     }
   
-  /* Assume the dictionary from right before TEMPORARY, if any.  Turn
-     off TEMPORARY. */
+  /* Dictionary from before TEMPORARY becomes permanent.. */
   if (temporary)
     {
       dict_destroy (default_dict);
@@ -492,643 +440,415 @@ close_active_file (struct write_case_data *data)
 
   /* Finish compaction. */
   if (compaction_necessary)
-    finish_compaction ();
+    dict_compact_values (default_dict);
     
-  /* Old data sink --> New data source. */
+  /* Free data source. */
   if (vfm_source != NULL) 
     {
-      if (vfm_source->class->destroy != NULL)
-        vfm_source->class->destroy (vfm_source);
-      free (vfm_source);
+      free_case_source (vfm_source);
+      vfm_source = NULL;
     }
 
-  vfm_source = vfm_sink->class->make_source (vfm_sink);
-
-  /* Old data sink is gone now. */
-  free (vfm_sink);
+  /* Old data sink becomes new data source. */
+  if (vfm_sink->class->make_source != NULL)
+    vfm_source = vfm_sink->class->make_source (vfm_sink);
+  free_case_sink (vfm_sink);
   vfm_sink = NULL;
 
-  /* Cancel TEMPORARY. */
+  /* Cancel TEMPORARY, PROCESS IF, FILTER, N OF CASES, vectors,
+     and get rid of all the transformations. */
   cancel_temporary ();
-
-  /* Free temporary cases. */
-  free (temp_case);
-  temp_case = NULL;
-
-  free (compaction_case);
-  compaction_case = NULL;
-
-  /* Cancel PROCESS IF. */
   expr_free (process_if_expr);
   process_if_expr = NULL;
-
-  /* Cancel FILTER if temporary. */
   if (dict_get_filter (default_dict) != NULL && !FILTER_before_TEMPORARY)
     dict_set_filter (default_dict, NULL);
-
-  /* Cancel transformations. */
-  cancel_transformations ();
-
-  /* Turn off case limiter. */
   dict_set_case_limit (default_dict, 0);
-
-  /* Clear VECTOR vectors. */
   dict_clear_vectors (default_dict);
+  cancel_transformations ();
 }
 \f
-/* Disk case stream. */
+/* Storage case stream. */
 
-/* Information about disk sink or source. */
-struct disk_stream_info 
+/* Information about storage sink or source. */
+struct storage_stream_info 
   {
-    FILE *file;                 /* Output file. */
-    size_t case_cnt;            /* Number of cases written so far. */
-    size_t case_size;           /* Number of bytes in case. */
+    struct casefile *casefile;  /* Storage. */
   };
 
-/* Initializes the disk sink. */
+/* Initializes a storage sink. */
 static void
-disk_sink_create (struct case_sink *sink)
+storage_sink_open (struct case_sink *sink)
 {
-  struct disk_stream_info *info = xmalloc (sizeof *info);
-  info->file = tmpfile ();
-  info->case_cnt = 0;
-  info->case_size = compaction_nval;
-  sink->aux = info;
-  if (info->file == NULL)
-    {
-      msg (ME, _("An error occurred attempting to create a temporary "
-                "file for use as the active file: %s."),
-          strerror (errno));
-      err_failure ();
-    }
+  struct storage_stream_info *info;
+
+  sink->aux = info = xmalloc (sizeof *info);
+  info->casefile = casefile_create (sink->value_cnt);
 }
 
-/* Writes temp_case to the disk sink. */
+/* Destroys storage stream represented by INFO. */
 static void
-disk_sink_write (struct case_sink *sink, struct ccase *c)
+destroy_storage_stream_info (struct storage_stream_info *info) 
 {
-  struct disk_stream_info *info = sink->aux;
-  union value *src_case;
-
-  if (compaction_necessary)
+  if (info != NULL) 
     {
-      compact_case (compaction_case, c);
-      src_case = compaction_case->data;
+      casefile_destroy (info->casefile);
+      free (info); 
     }
-  else src_case = c->data;
+}
 
-  info->case_cnt++;
-  if (fwrite (src_case, sizeof *src_case * compaction_nval, 1,
-              info->file) != 1)
-    {
-      msg (ME, _("An error occurred while attempting to write to a "
-                "temporary file used as the active file: %s."),
-          strerror (errno));
-      err_failure ();
-    }
+/* Writes case C to the storage sink SINK. */
+static void
+storage_sink_write (struct case_sink *sink, const struct ccase *c)
+{
+  struct storage_stream_info *info = sink->aux;
+
+  casefile_append (info->casefile, c);
 }
 
-/* Destroys the sink's internal data. */
+/* Destroys internal data in SINK. */
 static void
-disk_sink_destroy (struct case_sink *sink)
+storage_sink_destroy (struct case_sink *sink)
 {
-  struct disk_stream_info *info = sink->aux;
-  if (info->file != NULL)
-    fclose (info->file);
+  destroy_storage_stream_info (sink->aux);
 }
 
-/* Closes and destroys the sink and returns a disk source to read
-   back the written data. */
+/* Closes the sink and returns a storage source to read back the
+   written data. */
 static struct case_source *
-disk_sink_make_source (struct case_sink *sink) 
+storage_sink_make_source (struct case_sink *sink) 
 {
-  struct disk_stream_info *info = sink->aux;
-    
-  /* Rewind the file. */
-  assert (info->file != NULL);
-  if (fseek (info->file, 0, SEEK_SET) != 0)
-    {
-      msg (ME, _("An error occurred while attempting to rewind a "
-                "temporary file used as the active file: %s."),
-          strerror (errno));
-      err_failure ();
-    }
-  
-  return create_case_source (&disk_source_class, info);
+  struct case_source *source
+    = create_case_source (&storage_source_class, sink->dict, sink->aux);
+  sink->aux = NULL;
+  return source;
 }
 
-/* Disk sink. */
-const struct case_sink_class disk_sink_class = 
+/* Storage sink. */
+const struct case_sink_class storage_sink_class = 
   {
-    "disk",
-    disk_sink_create,
-    disk_sink_write,
-    disk_sink_destroy,
-    disk_sink_make_source,
+    "storage",
+    storage_sink_open,
+    storage_sink_write,
+    storage_sink_destroy,
+    storage_sink_make_source,
   };
 \f
-/* Disk source. */
+/* Storage source. */
 
 /* Returns the number of cases that will be read by
-   disk_source_read(). */
+   storage_source_read(). */
 static int
-disk_source_count (const struct case_source *source) 
+storage_source_count (const struct case_source *source) 
 {
-  struct disk_stream_info *info = source->aux;
+  struct storage_stream_info *info = source->aux;
 
-  return info->case_cnt;
+  return casefile_get_case_cnt (info->casefile);
 }
 
-/* Reads all cases from the disk source and passes them one by one to
+/* Reads all cases from the storage source and passes them one by one to
    write_case(). */
 static void
-disk_source_read (struct case_source *source,
-                  write_case_func *write_case, write_case_data wc_data)
+storage_source_read (struct case_source *source,
+                     struct ccase *output_case,
+                     write_case_func *write_case, write_case_data wc_data)
 {
-  struct disk_stream_info *info = source->aux;
-  int i;
+  struct storage_stream_info *info = source->aux;
+  struct ccase casefile_case;
+  struct casereader *reader;
 
-  for (i = 0; i < info->case_cnt; i++)
+  for (reader = casefile_get_reader (info->casefile);
+       casereader_read (reader, &casefile_case);
+       case_destroy (&casefile_case))
     {
-      if (!fread (temp_case, info->case_size, 1, info->file))
-       {
-         msg (ME, _("An error occurred while attempting to read from "
-              "a temporary file created for the active file: %s."),
-              strerror (errno));
-         err_failure ();
-         return;
-       }
-
-      if (!write_case (wc_data))
-       return;
+      case_copy (output_case, 0,
+                 &casefile_case, 0,
+                 casefile_get_value_cnt (info->casefile));
+      write_case (wc_data);
     }
+  casereader_destroy (reader);
 }
 
 /* Destroys the source's internal data. */
 static void
-disk_source_destroy (struct case_source *source)
+storage_source_destroy (struct case_source *source)
 {
-  struct disk_stream_info *info = source->aux;
-  if (info->file != NULL)
-    fclose (info->file);
-  free (info);
+  destroy_storage_stream_info (source->aux);
 }
 
-/* Disk source. */
-const struct case_source_class disk_source_class = 
-  {
-    "disk",
-    disk_source_count,
-    disk_source_read,
-    disk_source_destroy,
-  };
-\f
-/* Memory case stream. */
-
-/* Memory sink data. */
-struct memory_sink_info
-  {
-    size_t case_cnt;            /* Number of cases. */
-    size_t case_size;           /* Case size in bytes. */
-    int max_cases;              /* Maximum cases before switching to disk. */
-    struct case_list *head;     /* First case in list. */
-    struct case_list *tail;     /* Last case in list. */
-  };
-
-/* Memory source data. */
-struct memory_source_info 
+/* Storage source. */
+const struct case_source_class storage_source_class = 
   {
-    size_t case_cnt;            /* Number of cases. */
-    size_t case_size;           /* Case size in bytes. */
-    struct case_list *cases;    /* List of cases. */
+    "storage",
+    storage_source_count,
+    storage_source_read,
+    storage_source_destroy,
   };
 
-static void
-memory_sink_create (struct case_sink *sink
+struct casefile *
+storage_source_get_casefile (struct case_source *source
 {
-  struct memory_sink_info *info;
-  
-  sink->aux = info = xmalloc (sizeof *info);
+  struct storage_stream_info *info = source->aux;
 
-  assert (compaction_nval > 0);
-  info->case_cnt = 0;
-  info->case_size = compaction_nval * sizeof (union value);
-  info->max_cases = set_max_workspace / info->case_size;
-  info->head = info->tail = NULL;
+  assert (source->class == &storage_source_class);
+  return info->casefile;
 }
 
-static void
-memory_sink_write (struct case_sink *sink, struct ccase *c
+struct case_source *
+storage_source_create (struct casefile *cf, const struct dictionary *dict
 {
-  struct memory_sink_info *info = sink->aux;
-  size_t case_size;
-  struct case_list *new_case;
-
-  case_size = sizeof (struct case_list)
-                      + ((compaction_nval - 1) * sizeof (union value));
-  new_case = malloc (case_size);
+  struct storage_stream_info *info;
 
-  /* If we've got memory to spare then add it to the linked list. */
-  if (info->case_cnt <= info->max_cases && new_case != NULL)
-    {
-      info->case_cnt++;
+  info = xmalloc (sizeof *info);
+  info->casefile = cf;
 
-      /* Append case to linked list. */
-      new_case->next = NULL;
-      if (info->head != NULL)
-        info->tail->next = new_case;
-      else
-        info->head = new_case;
-      info->tail = new_case;
+  return create_case_source (&storage_source_class, dict, info);
+}
+\f
+/* Null sink.  Used by a few procedures that keep track of output
+   themselves and would throw away anything that the sink
+   contained anyway. */
 
-      /* Copy data into case. */
-      if (compaction_necessary)
-       compact_case (&new_case->c, c);
-      else
-       memcpy (&new_case->c, c, sizeof (union value) * compaction_nval);
+const struct case_sink_class null_sink_class = 
+  {
+    "null",
+    NULL,
+    NULL,
+    NULL,
+    NULL,
+  };
+\f
+/* Returns a pointer to the lagged case from N_BEFORE cases before the
+   current one, or NULL if there haven't been that many cases yet. */
+struct ccase *
+lagged_case (int n_before)
+{
+  assert (n_before >= 1 && n_before <= n_lag);
+  if (n_before <= lag_count)
+    {
+      int index = lag_head - n_before;
+      if (index < 0)
+        index += n_lag;
+      return &lag_queue[index];
     }
   else
+    return NULL;
+}
+   
+/* Appends TRNS to t_trns[], the list of all transformations to be
+   performed on data as it is read from the active file. */
+void
+add_transformation (struct trns_header * trns)
+{
+  if (n_trns >= m_trns)
     {
-      /* Out of memory.  Write the active file to disk. */
-      struct case_list *cur, *next;
-
-      /* Notify the user. */
-      if (!new_case)
-       msg (MW, _("Virtual memory exhausted.  Writing active file "
-                  "to disk."));
-      else
-       msg (MW, _("Workspace limit of %d KB (%d cases at %d bytes each) "
-                  "overflowed.  Writing active file to disk."),
-            set_max_workspace / 1024, info->max_cases,
-            compaction_nval * sizeof (union value));
-
-      free (new_case);
-
-      /* Switch to a disk sink. */
-      vfm_sink = create_case_sink (&disk_sink_class, NULL);
-      vfm_sink->class->open (vfm_sink);
-      workspace_overflow = 1;
-
-      /* Write the cases to disk and destroy them.  We can't call
-         vfm->sink->write() because of compaction. */
-      for (cur = info->head; cur; cur = next)
-       {
-         next = cur->next;
-         if (fwrite (cur->c.data, sizeof (union value) * compaction_nval, 1,
-                     vfm_sink->aux) != 1)
-           {
-             msg (ME, _("An error occurred while attempting to "
-                        "write to a temporary file created as the "
-                        "active file: %s."),
-                  strerror (errno));
-             err_failure ();
-           }
-         free (cur);
-       }
-
-      /* Write the current case to disk. */
-      vfm_sink->class->write (vfm_sink, c);
+      m_trns += 16;
+      t_trns = xrealloc (t_trns, sizeof *t_trns * m_trns);
     }
+  t_trns[n_trns] = trns;
+  trns->index = n_trns++;
 }
 
-/* If the data is stored in memory, causes it to be written to disk.
-   To be called only *between* procedure()s, not within them. */
+/* Cancels all active transformations, including any transformations
+   created by the input program. */
 void
-write_active_file_to_disk (void)
+cancel_transformations (void)
 {
-  if (case_source_is_class (vfm_source, &memory_source_class))
+  int i;
+  for (i = 0; i < n_trns; i++)
     {
-      struct memory_source_info *info = vfm_source->aux;
-
-      /* Switch to a disk sink. */
-      vfm_sink = create_case_sink (&disk_sink_class, NULL);
-      vfm_sink->class->open (vfm_sink);
-      workspace_overflow = 1;
-      
-      /* Write the cases to disk and destroy them.  We can't call
-         vfm->sink->write() because of compaction. */
-      {
-       struct case_list *cur, *next;
-       
-       for (cur = info->cases; cur; cur = next)
-         {
-           next = cur->next;
-           if (fwrite (cur->c.data, sizeof *cur->c.data * compaction_nval, 1,
-                       vfm_sink->aux) != 1)
-             {
-               msg (ME, _("An error occurred while attempting to "
-                          "write to a temporary file created as the "
-                          "active file: %s."),
-                    strerror (errno));
-               err_failure ();
-             }
-           free (cur);
-         }
-      }
-      
-      vfm_source = vfm_sink->class->make_source (vfm_sink);
-      vfm_sink = NULL;
+      if (t_trns[i]->free)
+       t_trns[i]->free (t_trns[i]);
+      free (t_trns[i]);
     }
+  n_trns = f_trns = 0;
+  free (t_trns);
+  t_trns=NULL;
+  m_trns = 0;
+}
+\f
+/* Creates a case source with class CLASS and auxiliary data AUX
+   and based on dictionary DICT. */
+struct case_source *
+create_case_source (const struct case_source_class *class,
+                    const struct dictionary *dict,
+                    void *aux) 
+{
+  struct case_source *source = xmalloc (sizeof *source);
+  source->class = class;
+  source->value_cnt = dict_get_next_value_idx (dict);
+  source->aux = aux;
+  return source;
 }
 
-/* Destroy all memory sink data. */
-static void
-memory_sink_destroy (struct case_sink *sink)
+/* Destroys case source SOURCE.  It is the caller's responsible to
+   call the source's destroy function, if any. */
+void
+free_case_source (struct case_source *source) 
 {
-  struct memory_sink_info *info = sink->aux;
-  struct case_list *cur, *next;
-  
-  for (cur = info->head; cur; cur = next)
+  if (source != NULL) 
     {
-      next = cur->next;
-      free (cur);
+      if (source->class->destroy != NULL)
+        source->class->destroy (source);
+      free (source);
     }
-  free (info);
 }
 
-/* Switch the memory stream from sink to source mode. */
-static struct case_source *
-memory_sink_make_source (struct case_sink *sink)
+/* Returns nonzero if a case source is "complex". */
+int
+case_source_is_complex (const struct case_source *source) 
 {
-  struct memory_sink_info *sink_info = sink->aux;
-  struct memory_source_info *source_info;
-
-  source_info = xmalloc (sizeof *source_info);
-  source_info->case_cnt = sink_info->case_cnt;
-  source_info->case_size = sink_info->case_size;
-  source_info->cases = sink_info->head;
-
-  free (sink_info);
-
-  return create_case_source (&memory_source_class, source_info);
+  return source != NULL && (source->class == &input_program_source_class
+                            || source->class == &file_type_source_class);
 }
 
-const struct case_sink_class memory_sink_class = 
-  {
-    "memory",
-    memory_sink_create,
-    memory_sink_write,
-    memory_sink_destroy,
-    memory_sink_make_source,
-  };
-
-/* Returns the number of cases in the source. */
-static int
-memory_source_count (const struct case_source *source) 
+/* Returns nonzero if CLASS is the class of SOURCE. */
+int
+case_source_is_class (const struct case_source *source,
+                      const struct case_source_class *class) 
 {
-  struct memory_source_info *info = source->aux;
-
-  return info->case_cnt;
+  return source != NULL && source->class == class;
 }
 
-/* Reads the case stream from memory and passes it to write_case(). */
-static void
-memory_source_read (struct case_source *source,
-                    write_case_func *write_case, write_case_data wc_data)
+/* Creates a case sink with class CLASS and auxiliary data
+   AUX. */
+struct case_sink *
+create_case_sink (const struct case_sink_class *class,
+                  const struct dictionary *dict,
+                  void *aux) 
 {
-  struct memory_source_info *info = source->aux;
-
-  while (info->cases != NULL) 
-    {
-      struct case_list *iter = info->cases;
-      info->cases = iter->next;
-      memcpy (temp_case, &iter->c, info->case_size);
-      free (iter);
-      
-      if (!write_case (wc_data))
-       return;
-    }
+  struct case_sink *sink = xmalloc (sizeof *sink);
+  sink->class = class;
+  sink->dict = dict;
+  sink->idx_to_fv = dict_get_compacted_idx_to_fv (dict);
+  sink->value_cnt = dict_get_compacted_value_cnt (dict);
+  sink->aux = aux;
+  return sink;
 }
 
-/* Destroy all memory source data. */
-static void
-memory_source_destroy (struct case_source *source)
+/* Destroys case sink SINK.  */
+void
+free_case_sink (struct case_sink *sink) 
 {
-  struct memory_source_info *info = source->aux;
-  struct case_list *cur, *next;
-  
-  for (cur = info->cases; cur; cur = next)
+  if (sink != NULL) 
     {
-      next = cur->next;
-      free (cur);
+      if (sink->class->destroy != NULL)
+        sink->class->destroy (sink);
+      free (sink->idx_to_fv);
+      free (sink); 
     }
-  free (info);
-}
-
-struct case_list *
-memory_source_get_cases (const struct case_source *source) 
-{
-  struct memory_source_info *info = source->aux;
-
-  return info->cases;
-}
-
-void
-memory_source_set_cases (const struct case_source *source,
-                         struct case_list *cases) 
-{
-  struct memory_source_info *info = source->aux;
-
-  info->cases = cases;
 }
-
-/* Memory stream. */
-const struct case_source_class memory_source_class = 
-  {
-    "memory",
-    memory_source_count,
-    memory_source_read,
-    memory_source_destroy,
-  };
 \f
-/* Add temp_case to the lag queue. */
-static void
-lag_case (void)
-{
-  if (lag_count < n_lag)
-    lag_count++;
-  memcpy (lag_queue[lag_head], temp_case,
-          dict_get_case_size (temp_dict));
-  if (++lag_head >= n_lag)
-    lag_head = 0;
-}
-
-/* Returns a pointer to the lagged case from N_BEFORE cases before the
-   current one, or NULL if there haven't been that many cases yet. */
-struct ccase *
-lagged_case (int n_before)
-{
-  assert (n_before <= n_lag);
-  if (n_before > lag_count)
-    return NULL;
-  
+/* Represents auxiliary data for handling SPLIT FILE. */
+struct split_aux_data 
   {
-    int index = lag_head - n_before;
-    if (index < 0)
-      index += n_lag;
-    return lag_queue[index];
-  }
-}
-   
-/* Transforms temp_case and writes it to the replacement active file
-   if advisable.  Returns nonzero if more cases can be accepted, zero
-   otherwise.  Do not call this function again after it has returned
-   zero once.  */
-int
-procedure_write_case (write_case_data wc_data)
-{
-  struct procedure_aux_data *proc_aux = wc_data->aux;
+    size_t case_count;          /* Number of cases so far. */
+    struct ccase prev_case;     /* Data in previous case. */
 
-  /* Index of current transformation. */
-  int cur_trns;
-
-  /* Return value: whether it's reasonable to write any more cases. */
-  int more_cases = 1;
-
-  cur_trns = f_trns;
-  for (;;)
-    {
-      /* Output the case if this is temp_trns. */
-      if (cur_trns == temp_trns)
-       {
-          int case_limit;
+    /* Functions to call... */
+    void (*begin_func) (void *);               /* ...before data. */
+    int (*proc_func) (struct ccase *, void *); /* ...with data. */
+    void (*end_func) (void *);                 /* ...after data. */
+    void *func_aux;                            /* Auxiliary data. */ 
+  };
 
-         if (n_lag)
-           lag_case ();
-         
-         vfm_sink->class->write (vfm_sink, temp_case);
+static int equal_splits (const struct ccase *, const struct ccase *);
+static int procedure_with_splits_callback (struct ccase *, void *);
+static void dump_splits (struct ccase *);
 
-          proc_aux->cases_written++;
-          case_limit = dict_get_case_limit (default_dict);
-         if (case_limit != 0 && proc_aux->cases_written >= case_limit)
-            more_cases = 0;
-       }
+/* Like procedure(), but it automatically breaks the case stream
+   into SPLIT FILE break groups.  Before each group of cases with
+   identical SPLIT FILE variable values, BEGIN_FUNC is called.
+   Then PROC_FUNC is called with each case in the group.  
+   END_FUNC is called when the group is finished.  FUNC_AUX is
+   passed to each of the functions as auxiliary data.
 
-      /* Are we done? */
-      if (cur_trns >= n_trns)
-       break;
-      
-      /* Decide which transformation should come next. */
-      {
-       int code;
-       
-       code = t_trns[cur_trns]->proc (t_trns[cur_trns], temp_case,
-                                       proc_aux->cases_written + 1);
-       switch (code)
-         {
-         case -1:
-           /* Next transformation. */
-           cur_trns++;
-           break;
-         case -2:
-           /* Delete this case. */
-           goto done;
-         default:
-           /* Go to that transformation. */
-           cur_trns = code;
-           break;
-         }
-      }
-    }
+   If the active file is empty, none of BEGIN_FUNC, PROC_FUNC,
+   and END_FUNC will be called at all. 
 
-  /* Call the beginning of group function. */
-  if (!case_count && wc_data->begin_func != NULL)
-    wc_data->begin_func (wc_data->func_aux);
+   If SPLIT FILE is not in effect, then there is one break group
+   (if the active file is nonempty), and BEGIN_FUNC and END_FUNC
+   will be called once. */
+void
+procedure_with_splits (void (*begin_func) (void *aux),
+                       int (*proc_func) (struct ccase *, void *aux),
+                       void (*end_func) (void *aux),
+                       void *func_aux) 
+{
+  struct split_aux_data split_aux;
 
-  /* Call the procedure if there is one and FILTER and PROCESS IF
-     don't prohibit it. */
-  if (wc_data->proc_func != NULL
-      && !exclude_this_case (proc_aux->cases_written + 1))
-    wc_data->proc_func (temp_case, wc_data->func_aux);
+  split_aux.case_count = 0;
+  case_nullify (&split_aux.prev_case);
+  split_aux.begin_func = begin_func;
+  split_aux.proc_func = proc_func;
+  split_aux.end_func = end_func;
+  split_aux.func_aux = func_aux;
 
-  case_count++;
-  
-done:
-  clear_temp_case ();
-  
-  /* Return previously determined value. */
-  return more_cases;
-}
+  procedure (procedure_with_splits_callback, &split_aux);
 
-/* Clears the variables in the temporary case that need to be
-   cleared between processing cases.  */
-static void
-clear_temp_case (void)
-{
-  /* FIXME?  This is linear in the number of variables, but
-     doesn't need to be, so it's an easy optimization target. */
-  size_t var_cnt = dict_get_var_cnt (default_dict);
-  size_t i;
-  
-  for (i = 0; i < var_cnt; i++) 
-    {
-      struct variable *v = dict_get_var (default_dict, i);
-      if (v->init && v->reinit) 
-        {
-          if (v->type == NUMERIC) 
-            temp_case->data[v->fv].f = SYSMIS;
-          else
-            memset (temp_case->data[v->fv].s, ' ', v->width);
-        } 
-    }
+  if (split_aux.case_count > 0 && end_func != NULL)
+    end_func (func_aux);
+  case_destroy (&split_aux.prev_case);
 }
 
-/* Returns nonzero if this case (numbered CASE_NUM) should be
-   exclude as specified on FILTER or PROCESS IF, otherwise
-   zero. */
+/* procedure() callback used by procedure_with_splits(). */
 static int
-exclude_this_case (int case_num)
+procedure_with_splits_callback (struct ccase *c, void *split_aux_) 
 {
-  /* FILTER. */
-  struct variable *filter_var = dict_get_filter (default_dict);
-  if (filter_var != NULL) 
+  struct split_aux_data *split_aux = split_aux_;
+
+  /* Start a new series if needed. */
+  if (split_aux->case_count == 0
+      || !equal_splits (c, &split_aux->prev_case))
     {
-      double f = temp_case->data[filter_var->fv].f;
-      if (f == 0.0 || f == SYSMIS || is_num_user_missing (f, filter_var))
-        return 1;
+      if (split_aux->case_count > 0 && split_aux->end_func != NULL)
+        split_aux->end_func (split_aux->func_aux);
+
+      dump_splits (c);
+      case_destroy (&split_aux->prev_case);
+      case_clone (&split_aux->prev_case, c);
+
+      if (split_aux->begin_func != NULL)
+       split_aux->begin_func (split_aux->func_aux);
     }
 
-  /* PROCESS IF. */
-  if (process_if_expr != NULL
-      && expr_evaluate (process_if_expr, temp_case, case_num, NULL) != 1.0)
+  split_aux->case_count++;
+  if (split_aux->proc_func != NULL)
+    return split_aux->proc_func (c, split_aux->func_aux);
+  else
     return 1;
-
-  return 0;
 }
 
-/* Appends TRNS to t_trns[], the list of all transformations to be
-   performed on data as it is read from the active file. */
-void
-add_transformation (struct trns_header * trns)
+/* Compares the SPLIT FILE variables in cases A and B and returns
+   nonzero only if they differ. */
+static int
+equal_splits (const struct ccase *a, const struct ccase *b) 
 {
-  if (n_trns >= m_trns)
+  struct variable *const *split;
+  size_t split_cnt;
+  size_t i;
+    
+  split = dict_get_split_vars (default_dict);
+  split_cnt = dict_get_split_cnt (default_dict);
+  for (i = 0; i < split_cnt; i++)
     {
-      m_trns += 16;
-      t_trns = xrealloc (t_trns, sizeof *t_trns * m_trns);
+      struct variable *v = split[i];
+      
+      switch (v->type)
+       {
+       case NUMERIC:
+         if (case_num (a, v->fv) != case_num (b, v->fv))
+            return 0;
+         break;
+       case ALPHA:
+         if (memcmp (case_str (a, v->fv), case_str (b, v->fv), v->width))
+            return 0;
+         break;
+       default:
+         assert (0);
+       }
     }
-  t_trns[n_trns] = trns;
-  trns->index = n_trns++;
-}
 
-/* Cancels all active transformations, including any transformations
-   created by the input program. */
-void
-cancel_transformations (void)
-{
-  int i;
-  for (i = 0; i < n_trns; i++)
-    {
-      if (t_trns[i]->free)
-       t_trns[i]->free (t_trns[i]);
-      free (t_trns[i]);
-    }
-  n_trns = f_trns = 0;
-  if (m_trns > 32)
-    {
-      free (t_trns);
-      m_trns = 0;
-    }
+  return 1;
 }
 
 /* Dumps out the values of all the split variables for the case C. */
@@ -1141,6 +861,9 @@ dump_splits (struct ccase *c)
   int i;
 
   split_cnt = dict_get_split_cnt (default_dict);
+  if (split_cnt == 0)
+    return;
+
   t = tab_create (3, split_cnt + 1, 0);
   tab_dim (t, tab_natural_dimensions);
   tab_vline (t, TAL_1 | TAL_SPACING, 1, 0, split_cnt);
@@ -1158,170 +881,90 @@ dump_splits (struct ccase *c)
       assert (v->type == NUMERIC || v->type == ALPHA);
       tab_text (t, 0, i + 1, TAB_LEFT | TAT_PRINTF, "%s", v->name);
       
-      data_out (temp_buf, &v->print, &c->data[v->fv]);
+      data_out (temp_buf, &v->print, case_data (c, v->fv));
       
       temp_buf[v->print.w] = 0;
       tab_text (t, 1, i + 1, TAT_PRINTF, "%.*s", v->print.w, temp_buf);
 
-      val_lab = val_labs_find (v->val_labs, c->data[v->fv]);
+      val_lab = val_labs_find (v->val_labs, *case_data (c, v->fv));
       if (val_lab)
        tab_text (t, 2, i + 1, TAB_LEFT, val_lab);
     }
   tab_flags (t, SOMF_NO_TITLE);
   tab_submit (t);
 }
+\f
+/* Represents auxiliary data for handling SPLIT FILE in a
+   multipass procedure. */
+struct multipass_split_aux_data 
+  {
+    struct ccase prev_case;     /* Data in previous case. */
+    struct casefile *casefile;  /* Accumulates data for a split. */
 
-/* This proc_func is substituted for the user-supplied proc_func when
-   SPLIT FILE is active.  This function forms a wrapper around that
-   proc_func by dividing the input into series. */
-static int
-SPLIT_FILE_proc_func (struct ccase *c, void *data_)
-{
-  struct write_case_data *data = data_;
-  struct split_aux_data *split_aux = data->aux;
-  struct variable *const *split;
-  size_t split_cnt;
-  size_t i;
-
-  /* The first case always begins a new series.  We also need to
-     preserve the values of the case for later comparison. */
-  if (case_count == 0)
-    {
-      memcpy (split_aux->prev_case, c, dict_get_case_size (default_dict));
-
-      dump_splits (c);
-      if (data->begin_func != NULL)
-       data->begin_func (data->func_aux);
-      
-      return data->proc_func (c, data->func_aux);
-    }
+    /* Function to call with the accumulated data. */
+    void (*split_func) (const struct casefile *, void *);
+    void *func_aux;                            /* Auxiliary data. */ 
+  };
 
-  /* Compare the value of each SPLIT FILE variable to the values on
-     the previous case. */
-  split = dict_get_split_vars (default_dict);
-  split_cnt = dict_get_split_cnt (default_dict);
-  for (i = 0; i < split_cnt; i++)
-    {
-      struct variable *v = split[i];
-      
-      switch (v->type)
-       {
-       case NUMERIC:
-         if (c->data[v->fv].f != split_aux->prev_case->data[v->fv].f)
-           goto not_equal;
-         break;
-       case ALPHA:
-         if (memcmp (c->data[v->fv].s,
-                      split_aux->prev_case->data[v->fv].s, v->width))
-           goto not_equal;
-         break;
-       default:
-         assert (0);
-       }
-    }
-  return data->proc_func (c, data->func_aux);
-  
-not_equal:
-  /* The values of the SPLIT FILE variable are different from the
-     values on the previous case.  That means that it's time to begin
-     a new series. */
-  if (data->end_func != NULL)
-    data->end_func (data->func_aux);
-  dump_splits (c);
-  if (data->begin_func != NULL)
-    data->begin_func (data->func_aux);
-  memcpy (split_aux->prev_case, c, dict_get_case_size (default_dict));
-  return data->proc_func (c, data->func_aux);
-}
-\f
-/* Case compaction. */
+static int multipass_split_callback (struct ccase *c, void *aux_);
+static void multipass_split_output (struct multipass_split_aux_data *);
 
-/* Copies case SRC to case DEST, compacting it in the process. */
 void
-compact_case (struct ccase *dest, const struct ccase *src)
+multipass_procedure_with_splits (void (*split_func) (const struct casefile *,
+                                                     void *),
+                                 void *func_aux) 
 {
-  int i;
-  int nval = 0;
-  size_t var_cnt;
-  
-  assert (compaction_necessary);
+  struct multipass_split_aux_data aux;
 
-  if (temporary == 2)
-    {
-      if (dest != compaction_case)
-       memcpy (dest, compaction_case, sizeof (union value) * compaction_nval);
-      return;
-    }
+  assert (split_func != NULL);
 
-  /* Copy all the variables except the scratch variables from SRC to
-     DEST. */
-  var_cnt = dict_get_var_cnt (default_dict);
-  for (i = 0; i < var_cnt; i++)
-    {
-      struct variable *v = dict_get_var (default_dict, i);
-      
-      if (v->name[0] == '#')
-       continue;
+  open_active_file ();
 
-      if (v->type == NUMERIC)
-       dest->data[nval++] = src->data[v->fv];
-      else
-       {
-         int w = DIV_RND_UP (v->width, sizeof (union value));
-         
-         memcpy (&dest->data[nval], &src->data[v->fv], w * sizeof (union value));
-         nval += w;
-       }
-    }
+  case_nullify (&aux.prev_case);
+  aux.casefile = NULL;
+  aux.split_func = split_func;
+  aux.func_aux = func_aux;
+
+  internal_procedure (multipass_split_callback, &aux);
+  if (aux.casefile != NULL)
+    multipass_split_output (&aux);
+  case_destroy (&aux.prev_case);
+
+  close_active_file ();
 }
 
-/* Reassigns `fv' for each variable.  Deletes scratch variables. */
-static void
-finish_compaction (void)
+/* procedure() callback used by multipass_procedure_with_splits(). */
+static int
+multipass_split_callback (struct ccase *c, void *aux_)
 {
-  int i;
+  struct multipass_split_aux_data *aux = aux_;
 
-  for (i = 0; i < dict_get_var_cnt (default_dict); )
+  /* Start a new series if needed. */
+  if (aux->casefile == NULL || !equal_splits (c, &aux->prev_case))
     {
-      struct variable *v = dict_get_var (default_dict, i);
+      /* Pass any cases to split_func. */
+      if (aux->casefile != NULL)
+        multipass_split_output (aux);
 
-      if (v->name[0] == '#') 
-        dict_delete_var (default_dict, v);
-      else
-        i++;
-    }
-  dict_compact_values (default_dict);
-}
+      /* Start a new casefile. */
+      aux->casefile = casefile_create (dict_get_next_value_idx (default_dict));
 
-struct case_source *
-create_case_source (const struct case_source_class *class, void *aux) 
-{
-  struct case_source *source = xmalloc (sizeof *source);
-  source->class = class;
-  source->aux = aux;
-  return source;
-}
+      /* Record split values. */
+      dump_splits (c);
+      case_destroy (&aux->prev_case);
+      case_clone (&aux->prev_case, c);
+    }
 
-int
-case_source_is_complex (const struct case_source *source) 
-{
-  return source != NULL && (source->class == &input_program_source_class
-                            || source->class == &file_type_source_class);
-}
+  casefile_append (aux->casefile, c);
 
-int
-case_source_is_class (const struct case_source *source,
-                      const struct case_source_class *class) 
-{
-  return source != NULL && source->class == class;
+  return 1;
 }
 
-struct case_sink *
-create_case_sink (const struct case_sink_class *class, void *aux) 
+static void
+multipass_split_output (struct multipass_split_aux_data *aux)
 {
-  struct case_sink *sink = xmalloc (sizeof *sink);
-  sink->class = class;
-  sink->aux = aux;
-  return sink;
+  assert (aux->casefile != NULL);
+  aux->split_func (aux->casefile, aux->func_aux);
+  casefile_destroy (aux->casefile);
+  aux->casefile = NULL;
 }
-