Adopt use of gnulib for portability.
[pspp-builds.git] / src / vfm.c
index b5d2a8d3564e76641e692bace61df3fe3befcf76..0414234c01d2be666208fb4c7db1e67c5b315974 100644 (file)
--- a/src/vfm.c
+++ b/src/vfm.c
 
    You should have received a copy of the GNU General Public License
    along with this program; if not, write to the Free Software
-   Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA
-   02111-1307, USA. */
+   Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+   02110-1301, USA. */
 
 #include <config.h>
 #include "vfm.h"
 #include "vfmP.h"
-#include <assert.h>
+#include "error.h"
 #include <errno.h>
 #include <stdio.h>
 #include <stdlib.h>
 #include <unistd.h>    /* Required by SunOS4. */
 #endif
 #include "alloc.h"
+#include "case.h"
+#include "casefile.h"
+#include "dictionary.h"
 #include "do-ifP.h"
 #include "error.h"
-#include "expr.h"
+#include "expressions/public.h"
 #include "misc.h"
-#include "random.h"
 #include "settings.h"
 #include "som.h"
 #include "str.h"
@@ -40,6 +42,9 @@
 #include "var.h"
 #include "value-labels.h"
 
+#include "gettext.h"
+#define _(msgid) gettext (msgid)
+
 /*
    Virtual File Manager (vfm):
 
@@ -56,8 +61,8 @@ struct write_case_data
     int (*proc_func) (struct ccase *, void *); /* Function. */
     void *aux;                                 /* Auxiliary data. */ 
 
-    struct ccase *trns_case;    /* Case used for transformations. */
-    struct ccase *sink_case;    /* Case written to sink, if
+    struct ccase trns_case;     /* Case used for transformations. */
+    struct ccase sink_case;     /* Case written to sink, if
                                    compaction is necessary. */
     size_t cases_written;       /* Cases output so far. */
     size_t cases_analyzed;      /* Cases passed to procedure so far. */
@@ -73,12 +78,6 @@ struct case_sink *vfm_sink;
    stored, zero otherwise. */
 static int compaction_necessary;
 
-/* Nonzero means that we've overflowed our allotted workspace.
-   After that happens once during a session, we always store the
-   active file on disk instead of in memory.  (This policy may be
-   too aggressive.) */
-static int workspace_overflow = 0;
-
 /* Time at which vfm was last invoked. */
 time_t last_vfm_invocation;
 
@@ -86,9 +85,11 @@ time_t last_vfm_invocation;
 int n_lag;                     /* Number of cases to lag. */
 static int lag_count;          /* Number of cases in lag_queue so far. */
 static int lag_head;           /* Index where next case will be added. */
-static struct ccase **lag_queue; /* Array of n_lag ccase * elements. */
+static struct ccase *lag_queue; /* Array of n_lag ccase * elements. */
 
-static struct ccase *create_trns_case (struct dictionary *);
+static void internal_procedure (int (*proc_func) (struct ccase *, void *),
+                                void *aux);
+static void create_trns_case (struct ccase *, struct dictionary *);
 static void open_active_file (void);
 static int write_case (struct write_case_data *wc_data);
 static int execute_transformations (struct ccase *c,
@@ -97,7 +98,6 @@ static int execute_transformations (struct ccase *c,
                                     int case_num);
 static int filter_case (const struct ccase *c, int case_num);
 static void lag_case (const struct ccase *c);
-static void compact_case (struct ccase *dest, const struct ccase *src);
 static void clear_case (struct ccase *c);
 static void close_active_file (void);
 \f
@@ -127,6 +127,27 @@ static void close_active_file (void);
    7. Pass case to PROC_FUNC, passing AUX as auxiliary data. */
 void
 procedure (int (*proc_func) (struct ccase *, void *), void *aux)
+{
+  if (proc_func == NULL
+      && case_source_is_class (vfm_source, &storage_source_class)
+      && vfm_sink == NULL
+      && !temporary
+      && n_trns == 0)
+    {
+      /* Nothing to do. */
+      return;
+    }
+
+  open_active_file ();
+  internal_procedure (proc_func, aux);
+  close_active_file ();
+}
+
+/* Executes a procedure, as procedure(), except that the caller
+   is responsible for calling open_active_file() and
+   close_active_file(). */
+static void
+internal_procedure (int (*proc_func) (struct ccase *, void *), void *aux) 
 {
   static int recursive_call;
 
@@ -136,21 +157,19 @@ procedure (int (*proc_func) (struct ccase *, void *), void *aux)
 
   wc_data.proc_func = proc_func;
   wc_data.aux = aux;
-  wc_data.trns_case = create_trns_case (default_dict);
-  wc_data.sink_case = xmalloc (dict_get_case_size (default_dict));
+  create_trns_case (&wc_data.trns_case, default_dict);
+  case_create (&wc_data.sink_case, dict_get_next_value_idx (default_dict));
   wc_data.cases_written = 0;
 
   last_vfm_invocation = time (NULL);
 
-  open_active_file ();
   if (vfm_source != NULL) 
     vfm_source->class->read (vfm_source,
-                             wc_data.trns_case,
+                             &wc_data.trns_case,
                              write_case, &wc_data);
-  close_active_file ();
 
-  free (wc_data.sink_case);
-  free (wc_data.trns_case);
+  case_destroy (&wc_data.sink_case);
+  case_destroy (&wc_data.trns_case);
 
   assert (--recursive_call == 0);
 }
@@ -158,28 +177,23 @@ procedure (int (*proc_func) (struct ccase *, void *), void *aux)
 /* Creates and returns a case, initializing it from the vectors
    that say which `value's need to be initialized just once, and
    which ones need to be re-initialized before every case. */
-static struct ccase *
-create_trns_case (struct dictionary *dict)
+static void
+create_trns_case (struct ccase *trns_case, struct dictionary *dict)
 {
-  struct ccase *c = xmalloc (dict_get_case_size (dict));
   size_t var_cnt = dict_get_var_cnt (dict);
   size_t i;
 
+  case_create (trns_case, dict_get_next_value_idx (dict));
   for (i = 0; i < var_cnt; i++) 
     {
       struct variable *v = dict_get_var (dict, i);
+      union value *value = case_data_rw (trns_case, v->fv);
 
-      if (v->type == NUMERIC) 
-        {
-          if (v->reinit)
-            c->data[v->fv].f = 0.0;
-          else
-            c->data[v->fv].f = SYSMIS;
-        }
+      if (v->type == NUMERIC)
+        value->f = v->reinit ? 0.0 : SYSMIS;
       else
-        memset (c->data[v->fv].s, ' ', v->width);
+        memset (value->s, ' ', v->width);
     }
-  return c;
 }
 
 /* Makes all preparations for reading from the data source and writing
@@ -214,7 +228,7 @@ open_active_file (void)
       lag_head = 0;
       lag_queue = xmalloc (n_lag * sizeof *lag_queue);
       for (i = 0; i < n_lag; i++)
-        lag_queue[i] = xmalloc (dict_get_case_size (temp_dict));
+        case_nullify (&lag_queue[i]);
     }
 
   /* Close any unclosed DO IF or LOOP constructs. */
@@ -229,7 +243,7 @@ static int
 write_case (struct write_case_data *wc_data)
 {
   /* Execute permanent transformations.  */
-  if (!execute_transformations (wc_data->trns_case, t_trns, f_trns, temp_trns,
+  if (!execute_transformations (&wc_data->trns_case, t_trns, f_trns, temp_trns,
                                 wc_data->cases_written + 1))
     goto done;
 
@@ -241,27 +255,28 @@ write_case (struct write_case_data *wc_data)
 
   /* Write case to LAG queue. */
   if (n_lag)
-    lag_case (wc_data->trns_case);
+    lag_case (&wc_data->trns_case);
 
   /* Write case to replacement active file. */
   if (vfm_sink->class->write != NULL) 
     {
       if (compaction_necessary) 
         {
-          compact_case (wc_data->sink_case, wc_data->trns_case);
-          vfm_sink->class->write (vfm_sink, wc_data->sink_case);
+          dict_compact_case (temp_dict, &wc_data->sink_case,
+                             &wc_data->trns_case);
+          vfm_sink->class->write (vfm_sink, &wc_data->sink_case);
         }
       else
-        vfm_sink->class->write (vfm_sink, wc_data->trns_case);
+        vfm_sink->class->write (vfm_sink, &wc_data->trns_case);
     }
   
   /* Execute temporary transformations. */
-  if (!execute_transformations (wc_data->trns_case, t_trns, temp_trns, n_trns,
+  if (!execute_transformations (&wc_data->trns_case, t_trns, temp_trns, n_trns,
                                 wc_data->cases_written))
     goto done;
   
   /* FILTER, PROCESS IF, post-TEMPORARY N OF CASES. */
-  if (filter_case (wc_data->trns_case, wc_data->cases_written)
+  if (filter_case (&wc_data->trns_case, wc_data->cases_written)
       || (dict_get_case_limit (temp_dict)
           && wc_data->cases_analyzed >= dict_get_case_limit (temp_dict)))
     goto done;
@@ -269,10 +284,10 @@ write_case (struct write_case_data *wc_data)
 
   /* Pass case to procedure. */
   if (wc_data->proc_func != NULL)
-    wc_data->proc_func (wc_data->trns_case, wc_data->aux);
+    wc_data->proc_func (&wc_data->trns_case, wc_data->aux);
 
  done:
-  clear_case (wc_data->trns_case);
+  clear_case (&wc_data->trns_case);
   return 1;
 }
 
@@ -314,20 +329,20 @@ execute_transformations (struct ccase *c,
    exclude as specified on FILTER or PROCESS IF, otherwise
    zero. */
 static int
-filter_case (const struct ccase *c, int case_num)
+filter_case (const struct ccase *c, int case_idx)
 {
   /* FILTER. */
   struct variable *filter_var = dict_get_filter (default_dict);
   if (filter_var != NULL) 
     {
-      double f = c->data[filter_var->fv].f;
+      double f = case_num (c, filter_var->fv);
       if (f == 0.0 || f == SYSMIS || is_num_user_missing (f, filter_var))
         return 1;
     }
 
   /* PROCESS IF. */
   if (process_if_expr != NULL
-      && expr_evaluate (process_if_expr, c, case_num, NULL) != 1.0)
+      && expr_evaluate_num (process_if_expr, c, case_idx) != 1.0)
     return 1;
 
   return 0;
@@ -339,43 +354,12 @@ lag_case (const struct ccase *c)
 {
   if (lag_count < n_lag)
     lag_count++;
-  memcpy (lag_queue[lag_head], c, dict_get_case_size (temp_dict));
+  case_destroy (&lag_queue[lag_head]);
+  case_clone (&lag_queue[lag_head], c);
   if (++lag_head >= n_lag)
     lag_head = 0;
 }
 
-/* Copies case SRC to case DEST, compacting it in the process. */
-static void
-compact_case (struct ccase *dest, const struct ccase *src)
-{
-  int i;
-  int nval = 0;
-  size_t var_cnt;
-  
-  assert (compaction_necessary);
-
-  /* Copy all the variables except scratch variables from SRC to
-     DEST. */
-  var_cnt = dict_get_var_cnt (default_dict);
-  for (i = 0; i < var_cnt; i++)
-    {
-      struct variable *v = dict_get_var (default_dict, i);
-      
-      if (dict_class_from_id (v->name) == DC_SCRATCH)
-       continue;
-
-      if (v->type == NUMERIC)
-       dest->data[nval++] = src->data[v->fv];
-      else
-       {
-         int w = DIV_RND_UP (v->width, sizeof (union value));
-         
-         memcpy (&dest->data[nval], &src->data[v->fv], w * sizeof (union value));
-         nval += w;
-       }
-    }
-}
-
 /* Clears the variables in C that need to be cleared between
    processing cases.  */
 static void
@@ -389,10 +373,10 @@ clear_case (struct ccase *c)
       struct variable *v = dict_get_var (default_dict, i);
       if (v->init && v->reinit) 
         {
-          if (v->type == NUMERIC) 
-            c->data[v->fv].f = SYSMIS;
+          if (v->type == NUMERIC)
+            case_data_rw (c, v->fv)->f = SYSMIS;
           else
-            memset (c->data[v->fv].s, ' ', v->width);
+            memset (case_data_rw (c, v->fv)->s, ' ', v->width);
         } 
     }
 }
@@ -407,7 +391,7 @@ close_active_file (void)
       int i;
       
       for (i = 0; i < n_lag; i++)
-       free (lag_queue[i]);
+       case_destroy (&lag_queue[i]);
       free (lag_queue);
       n_lag = 0;
     }
@@ -425,22 +409,12 @@ close_active_file (void)
     dict_compact_values (default_dict);
     
   /* Free data source. */
-  if (vfm_source != NULL) 
-    {
-      if (vfm_source->class->destroy != NULL)
-        vfm_source->class->destroy (vfm_source);
-      free (vfm_source);
-    }
+  free_case_source (vfm_source);
+  vfm_source = NULL;
 
   /* Old data sink becomes new data source. */
   if (vfm_sink->class->make_source != NULL)
     vfm_source = vfm_sink->class->make_source (vfm_sink);
-  else 
-    {
-      if (vfm_sink->class->destroy != NULL)
-        vfm_sink->class->destroy (vfm_sink);
-      vfm_source = NULL; 
-    }
   free_case_sink (vfm_sink);
   vfm_sink = NULL;
 
@@ -461,21 +435,9 @@ close_active_file (void)
 /* Information about storage sink or source. */
 struct storage_stream_info 
   {
-    size_t case_cnt;            /* Number of cases. */
-    size_t case_size;           /* Number of bytes in case. */
-    enum { DISK, MEMORY } mode; /* Where is data stored? */
-
-    /* Disk storage.  */
-    FILE *file;                 /* Data file. */
-
-    /* Memory storage. */
-    int max_cases;              /* Maximum cases before switching to disk. */
-    struct case_list *head;     /* First case in list. */
-    struct case_list *tail;     /* Last case in list. */
+    struct casefile *casefile;  /* Storage. */
   };
 
-static void open_storage_file (struct storage_stream_info *info);
-
 /* Initializes a storage sink. */
 static void
 storage_sink_open (struct case_sink *sink)
@@ -483,92 +445,18 @@ storage_sink_open (struct case_sink *sink)
   struct storage_stream_info *info;
 
   sink->aux = info = xmalloc (sizeof *info);
-  info->case_cnt = 0;
-  info->case_size = sink->value_cnt * sizeof (union value);
-  info->file = NULL;
-  info->max_cases = 0;
-  info->head = info->tail = NULL;
-  if (workspace_overflow) 
-    {
-      info->mode = DISK;
-      open_storage_file (info);
-    }
-  else 
-    {
-      info->mode = MEMORY; 
-      info->max_cases = (get_max_workspace()
-                         / (sizeof (struct case_list) + info->case_size));
-    }
-}
-
-/* Creates a new temporary file and puts it into INFO. */
-static void
-open_storage_file (struct storage_stream_info *info) 
-{
-  info->file = tmpfile ();
-  if (info->file == NULL)
-    {
-      msg (ME, _("An error occurred creating a temporary "
-                 "file for use as the active file: %s."),
-           strerror (errno));
-      err_failure ();
-    }
-}
-
-/* Writes the VALUE_CNT values in VALUES to FILE. */
-static void
-write_storage_file (FILE *file, const union value *values, size_t value_cnt) 
-{
-  if (fwrite (values, sizeof *values * value_cnt, 1, file) != 1)
-    {
-      msg (ME, _("An error occurred writing to a "
-                "temporary file used as the active file: %s."),
-          strerror (errno));
-      err_failure ();
-    }
-}
-
-/* If INFO represents records in memory, moves them to disk.
-   Each comprises VALUE_CNT `union value's. */
-static void
-storage_to_disk (struct storage_stream_info *info, size_t value_cnt) 
-{
-  struct case_list *cur, *next;
-
-  if (info->mode == MEMORY) 
-    {
-      info->mode = DISK;
-      open_storage_file (info);
-      for (cur = info->head; cur; cur = next)
-        {
-          next = cur->next;
-          write_storage_file (info->file, cur->c.data, value_cnt);
-          free (cur);
-        }
-      info->head = info->tail = NULL; 
-    }
+  info->casefile = casefile_create (sink->value_cnt);
 }
 
 /* Destroys storage stream represented by INFO. */
 static void
 destroy_storage_stream_info (struct storage_stream_info *info) 
 {
-  if (info->mode == DISK
+  if (info != NULL
     {
-      if (info->file != NULL)
-        fclose (info->file); 
+      casefile_destroy (info->casefile);
+      free (info); 
     }
-  else 
-    {
-      struct case_list *cur, *next;
-  
-      for (cur = info->head; cur; cur = next)
-        {
-          next = cur->next;
-          free (cur);
-        }
-    }
-  free (info); 
 }
 
 /* Writes case C to the storage sink SINK. */
@@ -577,39 +465,7 @@ storage_sink_write (struct case_sink *sink, const struct ccase *c)
 {
   struct storage_stream_info *info = sink->aux;
 
-  info->case_cnt++;
-  if (info->mode == MEMORY) 
-    {
-      struct case_list *new_case;
-
-      /* Copy case. */
-      new_case = xmalloc (sizeof (struct case_list)
-                          + ((sink->value_cnt - 1) * sizeof (union value)));
-      memcpy (&new_case->c, c, sizeof (union value) * sink->value_cnt);
-
-      /* Append case to linked list. */
-      new_case->next = NULL;
-      if (info->head != NULL)
-        info->tail->next = new_case;
-      else
-        info->head = new_case;
-      info->tail = new_case;
-
-      /* Dump all the cases to disk if we've run out of
-         workspace. */
-      if (info->case_cnt > info->max_cases) 
-        {
-          workspace_overflow = 1;
-          msg (MW, _("Workspace limit of %d KB (%d cases at %d bytes each) "
-                     "overflowed.  Writing active file to disk."),
-               get_max_workspace() / 1024, info->max_cases,
-               sizeof (struct case_list) + info->case_size);
-
-          storage_to_disk (info, sink->value_cnt);
-        }
-    }
-  else 
-    write_storage_file (info->file, c->data, sink->value_cnt);
+  casefile_append (info->casefile, c);
 }
 
 /* Destroys internal data in SINK. */
@@ -619,27 +475,15 @@ storage_sink_destroy (struct case_sink *sink)
   destroy_storage_stream_info (sink->aux);
 }
 
-/* Closes and destroys the sink and returns a storage source to
-   read back the written data. */
+/* Closes the sink and returns a storage source to read back the
+   written data. */
 static struct case_source *
 storage_sink_make_source (struct case_sink *sink) 
 {
-  struct storage_stream_info *info = sink->aux;
-
-  if (info->mode == DISK) 
-    {
-      /* Rewind the file. */
-      assert (info->file != NULL);
-      if (fseek (info->file, 0, SEEK_SET) != 0)
-        {
-          msg (ME, _("An error occurred while attempting to rewind a "
-                     "temporary file used as the active file: %s."),
-               strerror (errno));
-          err_failure ();
-        }
-    }
-
-  return create_case_source (&storage_source_class, sink->dict, info); 
+  struct case_source *source
+    = create_case_source (&storage_source_class, sink->aux);
+  sink->aux = NULL;
+  return source;
 }
 
 /* Storage sink. */
@@ -661,51 +505,30 @@ storage_source_count (const struct case_source *source)
 {
   struct storage_stream_info *info = source->aux;
 
-  return info->case_cnt;
+  return casefile_get_case_cnt (info->casefile);
 }
 
 /* Reads all cases from the storage source and passes them one by one to
    write_case(). */
 static void
 storage_source_read (struct case_source *source,
-                     struct ccase *c,
+                     struct ccase *output_case,
                      write_case_func *write_case, write_case_data wc_data)
 {
   struct storage_stream_info *info = source->aux;
+  struct ccase casefile_case;
+  struct casereader *reader;
 
-  if (info->mode == DISK) 
+  for (reader = casefile_get_reader (info->casefile);
+       casereader_read (reader, &casefile_case);
+       case_destroy (&casefile_case))
     {
-      int i;
-
-      for (i = 0; i < info->case_cnt; i++)
-        {
-          if (!fread (c, info->case_size, 1, info->file))
-            {
-              msg (ME, _("An error occurred while attempting to read from "
-                         "a temporary file created for the active file: %s."),
-                   strerror (errno));
-              err_failure ();
-              break;
-            }
-
-          if (!write_case (wc_data))
-            break;
-        }
-    }
-  else 
-    {
-      while (info->head != NULL) 
-        {
-          struct case_list *iter = info->head;
-          memcpy (c, &iter->c, info->case_size);
-          if (!write_case (wc_data)) 
-            break;
-            
-          info->head = iter->next;
-          free (iter);
-        }
-      info->tail = NULL;
+      case_copy (output_case, 0,
+                 &casefile_case, 0,
+                 casefile_get_value_cnt (info->casefile));
+      write_case (wc_data);
     }
+  casereader_destroy (reader);
 }
 
 /* Destroys the source's internal data. */
@@ -724,44 +547,24 @@ const struct case_source_class storage_source_class =
     storage_source_destroy,
   };
 
-/* Returns nonzero only if SOURCE is stored on disk (instead of
-   in memory). */
-int
-storage_source_on_disk (const struct case_source *source) 
-{
-  struct storage_stream_info *info = source->aux;
-
-  return info->mode == DISK;
-}
-
-/* Returns the list of cases in storage source SOURCE. */
-struct case_list *
-storage_source_get_cases (const struct case_source *source) 
+struct casefile *
+storage_source_get_casefile (struct case_source *source) 
 {
   struct storage_stream_info *info = source->aux;
 
-  assert (info->mode == MEMORY);
-  return info->head;
+  assert (source->class == &storage_source_class);
+  return info->casefile;
 }
 
-/* Sets the list of cases in memory source SOURCE to CASES. */
-void
-storage_source_set_cases (const struct case_source *source,
-                          struct case_list *cases) 
+struct case_source *
+storage_source_create (struct casefile *cf)
 {
-  struct storage_stream_info *info = source->aux;
+  struct storage_stream_info *info;
 
-  assert (info->mode == MEMORY);
-  info->head = cases;
-}
+  info = xmalloc (sizeof *info);
+  info->casefile = cf;
 
-/* If SOURCE has its cases in memory, writes them to disk. */
-void
-storage_source_to_disk (struct case_source *source) 
-{
-  struct storage_stream_info *info = source->aux;
-
-  storage_to_disk (info, source->value_cnt);
+  return create_case_source (&storage_source_class, info);
 }
 \f
 /* Null sink.  Used by a few procedures that keep track of output
@@ -782,16 +585,18 @@ const struct case_sink_class null_sink_class =
 struct ccase *
 lagged_case (int n_before)
 {
+  assert (n_before >= 1 );
   assert (n_before <= n_lag);
-  if (n_before > lag_count)
+
+  if (n_before <= lag_count)
+    {
+      int index = lag_head - n_before;
+      if (index < 0)
+        index += n_lag;
+      return &lag_queue[index];
+    }
+  else
     return NULL;
-  
-  {
-    int index = lag_head - n_before;
-    if (index < 0)
-      index += n_lag;
-    return lag_queue[index];
-  }
 }
    
 /* Appends TRNS to t_trns[], the list of all transformations to be
@@ -821,27 +626,36 @@ cancel_transformations (void)
       free (t_trns[i]);
     }
   n_trns = f_trns = 0;
-  if (m_trns > 32)
-    {
-      free (t_trns);
-      m_trns = 0;
-    }
+  free (t_trns);
+  t_trns = NULL;
+  m_trns = 0;
 }
 \f
 /* Creates a case source with class CLASS and auxiliary data AUX
    and based on dictionary DICT. */
 struct case_source *
 create_case_source (const struct case_source_class *class,
-                    const struct dictionary *dict,
                     void *aux) 
 {
   struct case_source *source = xmalloc (sizeof *source);
   source->class = class;
-  source->value_cnt = dict_get_next_value_idx (dict);
   source->aux = aux;
   return source;
 }
 
+/* Destroys case source SOURCE.  It is the caller's responsible to
+   call the source's destroy function, if any. */
+void
+free_case_source (struct case_source *source) 
+{
+  if (source != NULL) 
+    {
+      if (source->class->destroy != NULL)
+        source->class->destroy (source);
+      free (source);
+    }
+}
+
 /* Returns nonzero if a case source is "complex". */
 int
 case_source_is_complex (const struct case_source *source) 
@@ -858,8 +672,8 @@ case_source_is_class (const struct case_source *source,
   return source != NULL && source->class == class;
 }
 
-/* Creates a case sink with class CLASS and auxiliary data
-   AUX. */
+/* Creates a case sink to accept cases from the given DICT with
+   class CLASS and auxiliary data AUX. */
 struct case_sink *
 create_case_sink (const struct case_sink_class *class,
                   const struct dictionary *dict,
@@ -867,27 +681,28 @@ create_case_sink (const struct case_sink_class *class,
 {
   struct case_sink *sink = xmalloc (sizeof *sink);
   sink->class = class;
-  sink->dict = dict;
-  sink->idx_to_fv = dict_get_compacted_idx_to_fv (dict);
   sink->value_cnt = dict_get_compacted_value_cnt (dict);
   sink->aux = aux;
   return sink;
 }
 
-/* Destroys case sink SINK.  It is the caller's responsible to
-   call the sink's destroy function, if any. */
+/* Destroys case sink SINK.  */
 void
 free_case_sink (struct case_sink *sink) 
 {
-  free (sink->idx_to_fv);
-  free (sink);
+  if (sink != NULL) 
+    {
+      if (sink->class->destroy != NULL)
+        sink->class->destroy (sink);
+      free (sink); 
+    }
 }
 \f
 /* Represents auxiliary data for handling SPLIT FILE. */
 struct split_aux_data 
   {
     size_t case_count;          /* Number of cases so far. */
-    struct ccase *prev_case;    /* Data in previous case. */
+    struct ccase prev_case;     /* Data in previous case. */
 
     /* Functions to call... */
     void (*begin_func) (void *);               /* ...before data. */
@@ -922,17 +737,19 @@ procedure_with_splits (void (*begin_func) (void *aux),
   struct split_aux_data split_aux;
 
   split_aux.case_count = 0;
-  split_aux.prev_case = xmalloc (dict_get_case_size (default_dict));
+  case_nullify (&split_aux.prev_case);
   split_aux.begin_func = begin_func;
   split_aux.proc_func = proc_func;
   split_aux.end_func = end_func;
   split_aux.func_aux = func_aux;
 
-  procedure (procedure_with_splits_callback, &split_aux);
-
+  open_active_file ();
+  internal_procedure (procedure_with_splits_callback, &split_aux);
   if (split_aux.case_count > 0 && end_func != NULL)
     end_func (func_aux);
-  free (split_aux.prev_case);
+  close_active_file ();
+
+  case_destroy (&split_aux.prev_case);
 }
 
 /* procedure() callback used by procedure_with_splits(). */
@@ -943,13 +760,14 @@ procedure_with_splits_callback (struct ccase *c, void *split_aux_)
 
   /* Start a new series if needed. */
   if (split_aux->case_count == 0
-      || !equal_splits (c, split_aux->prev_case))
+      || !equal_splits (c, &split_aux->prev_case))
     {
       if (split_aux->case_count > 0 && split_aux->end_func != NULL)
         split_aux->end_func (split_aux->func_aux);
 
       dump_splits (c);
-      memcpy (split_aux->prev_case, c, dict_get_case_size (default_dict));
+      case_destroy (&split_aux->prev_case);
+      case_clone (&split_aux->prev_case, c);
 
       if (split_aux->begin_func != NULL)
        split_aux->begin_func (split_aux->func_aux);
@@ -967,32 +785,9 @@ procedure_with_splits_callback (struct ccase *c, void *split_aux_)
 static int
 equal_splits (const struct ccase *a, const struct ccase *b) 
 {
-  struct variable *const *split;
-  size_t split_cnt;
-  size_t i;
-    
-  split = dict_get_split_vars (default_dict);
-  split_cnt = dict_get_split_cnt (default_dict);
-  for (i = 0; i < split_cnt; i++)
-    {
-      struct variable *v = split[i];
-      
-      switch (v->type)
-       {
-       case NUMERIC:
-         if (a->data[v->fv].f != b->data[v->fv].f)
-            return 0;
-         break;
-       case ALPHA:
-         if (memcmp (a->data[v->fv].s, b->data[v->fv].s, v->width))
-            return 0;
-         break;
-       default:
-         assert (0);
-       }
-    }
-
-  return 1;
+  return case_compare (a, b,
+                       dict_get_split_vars (default_dict),
+                       dict_get_split_cnt (default_dict)) == 0;
 }
 
 /* Dumps out the values of all the split variables for the case C. */
@@ -1025,15 +820,90 @@ dump_splits (struct ccase *c)
       assert (v->type == NUMERIC || v->type == ALPHA);
       tab_text (t, 0, i + 1, TAB_LEFT | TAT_PRINTF, "%s", v->name);
       
-      data_out (temp_buf, &v->print, &c->data[v->fv]);
+      data_out (temp_buf, &v->print, case_data (c, v->fv));
       
       temp_buf[v->print.w] = 0;
       tab_text (t, 1, i + 1, TAT_PRINTF, "%.*s", v->print.w, temp_buf);
 
-      val_lab = val_labs_find (v->val_labs, c->data[v->fv]);
+      val_lab = val_labs_find (v->val_labs, *case_data (c, v->fv));
       if (val_lab)
        tab_text (t, 2, i + 1, TAB_LEFT, val_lab);
     }
   tab_flags (t, SOMF_NO_TITLE);
   tab_submit (t);
 }
+\f
+/* Represents auxiliary data for handling SPLIT FILE in a
+   multipass procedure. */
+struct multipass_split_aux_data 
+  {
+    struct ccase prev_case;     /* Data in previous case. */
+    struct casefile *casefile;  /* Accumulates data for a split. */
+
+    /* Function to call with the accumulated data. */
+    void (*split_func) (const struct casefile *, void *);
+    void *func_aux;                            /* Auxiliary data. */ 
+  };
+
+static int multipass_split_callback (struct ccase *c, void *aux_);
+static void multipass_split_output (struct multipass_split_aux_data *);
+
+void
+multipass_procedure_with_splits (void (*split_func) (const struct casefile *,
+                                                     void *),
+                                 void *func_aux) 
+{
+  struct multipass_split_aux_data aux;
+
+  assert (split_func != NULL);
+
+  open_active_file ();
+
+  case_nullify (&aux.prev_case);
+  aux.casefile = NULL;
+  aux.split_func = split_func;
+  aux.func_aux = func_aux;
+
+  internal_procedure (multipass_split_callback, &aux);
+  if (aux.casefile != NULL)
+    multipass_split_output (&aux);
+  case_destroy (&aux.prev_case);
+
+  close_active_file ();
+}
+
+/* procedure() callback used by multipass_procedure_with_splits(). */
+static int
+multipass_split_callback (struct ccase *c, void *aux_)
+{
+  struct multipass_split_aux_data *aux = aux_;
+
+  /* Start a new series if needed. */
+  if (aux->casefile == NULL || !equal_splits (c, &aux->prev_case))
+    {
+      /* Pass any cases to split_func. */
+      if (aux->casefile != NULL)
+        multipass_split_output (aux);
+
+      /* Start a new casefile. */
+      aux->casefile = casefile_create (dict_get_next_value_idx (default_dict));
+
+      /* Record split values. */
+      dump_splits (c);
+      case_destroy (&aux->prev_case);
+      case_clone (&aux->prev_case, c);
+    }
+
+  casefile_append (aux->casefile, c);
+
+  return 1;
+}
+
+static void
+multipass_split_output (struct multipass_split_aux_data *aux)
+{
+  assert (aux->casefile != NULL);
+  aux->split_func (aux->casefile, aux->func_aux);
+  casefile_destroy (aux->casefile);
+  aux->casefile = NULL;
+}