Change "union value" to dynamically allocate long strings.
[pspp-builds.git] / src / data / procedure.c
index 16dcd745929f9962d489fd58f66bcd4110fcfc13..b762214d10e20a33d6bbdf89ac52253a67589453 100644 (file)
@@ -1,21 +1,18 @@
-/* PSPP - computes sample statistics.
-   Copyright (C) 1997-9, 2000, 2006 Free Software Foundation, Inc.
-   Written by Ben Pfaff <blp@gnu.org>.
+/* PSPP - a program for statistical analysis.
+   Copyright (C) 1997-9, 2000, 2006, 2007, 2009 Free Software Foundation, Inc.
 
-   This program is free software; you can redistribute it and/or
-   modify it under the terms of the GNU General Public License as
-   published by the Free Software Foundation; either version 2 of the
-   License, or (at your option) any later version.
+   This program is free software: you can redistribute it and/or modify
+   it under the terms of the GNU General Public License as published by
+   the Free Software Foundation, either version 3 of the License, or
+   (at your option) any later version.
 
-   This program is distributed in the hope that it will be useful, but
-   WITHOUT ANY WARRANTY; without even the implied warranty of
-   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
-   General Public License for more details.
+   This program is distributed in the hope that it will be useful,
+   but WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+   GNU General Public License for more details.
 
    You should have received a copy of the GNU General Public License
-   along with this program; if not, write to the Free Software
-   Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
-   02110-1301, USA. */
+   along with this program.  If not, see <http://www.gnu.org/licenses/>. */
 
 #include <config.h>
 
 #include <stdlib.h>
 #include <unistd.h>
 
-#include <data/case-source.h>
-#include <data/case-sink.h>
 #include <data/case.h>
-#include <data/casefile.h>
+#include <data/case-map.h>
+#include <data/caseinit.h>
+#include <data/casereader.h>
+#include <data/casereader-provider.h>
+#include <data/casewriter.h>
 #include <data/dictionary.h>
 #include <data/file-handle-def.h>
 #include <data/procedure.h>
-#include <data/storage-stream.h>
 #include <data/transformations.h>
 #include <data/variable.h>
-#include <libpspp/alloc.h>
+#include <libpspp/deque.h>
 #include <libpspp/misc.h>
 #include <libpspp/str.h>
+#include <libpspp/taint.h>
+#include <libpspp/i18n.h>
+
+#include "minmax.h"
+#include "xalloc.h"
+
+struct dataset {
+  /* Cases are read from source,
+     their transformation variables are initialized,
+     pass through permanent_trns_chain (which transforms them into
+     the format described by permanent_dict),
+     are written to sink,
+     pass through temporary_trns_chain (which transforms them into
+     the format described by dict),
+     and are finally passed to the procedure. */
+  struct casereader *source;
+  struct caseinit *caseinit;
+  struct trns_chain *permanent_trns_chain;
+  struct dictionary *permanent_dict;
+  struct casewriter *sink;
+  struct trns_chain *temporary_trns_chain;
+  struct dictionary *dict;
+
+  /* Callback which occurs whenever the transformation chain(s) have
+     been modified */
+  transformation_change_callback_func *xform_callback;
+  void *xform_callback_aux;
+
+  /* If true, cases are discarded instead of being written to
+     sink. */
+  bool discard_output;
+
+  /* The transformation chain that the next transformation will be
+     added to. */
+  struct trns_chain *cur_trns_chain;
+
+  /* The case map used to compact a case, if necessary;
+     otherwise a null pointer. */
+  struct case_map *compactor;
+
+  /* Time at which proc was last invoked. */
+  time_t last_proc_invocation;
+
+  /* Cases just before ("lagging") the current one. */
+  int n_lag;                   /* Number of cases to lag. */
+  struct deque lag;             /* Deque of lagged cases. */
+  struct ccase **lag_cases;     /* Lagged cases managed by deque. */
+
+  /* Procedure data. */
+  enum
+    {
+      PROC_COMMITTED,           /* No procedure in progress. */
+      PROC_OPEN,                /* proc_open called, casereader still open. */
+      PROC_CLOSED               /* casereader from proc_open destroyed,
+                                   but proc_commit not yet called. */
+    }
+  proc_state;
+  casenumber cases_written;       /* Cases output so far. */
+  bool ok;                    /* Error status. */
 
-/* Procedure execution data. */
-struct write_case_data
-  {
-    /* Function to call for each case. */
-    bool (*case_func) (const struct ccase *, void *);
-    void *aux;
-
-    struct ccase trns_case;     /* Case used for transformations. */
-    struct ccase sink_case;     /* Case written to sink, if
-                                   compaction is necessary. */
-    size_t cases_written;       /* Cases output so far. */
-  };
-
-/* Cases are read from proc_source,
-   pass through permanent_trns_chain (which transforms them into
-   the format described by permanent_dict),
-   are written to proc_sink,
-   pass through temporary_trns_chain (which transforms them into
-   the format described by default_dict),
-   and are finally passed to the procedure. */
-static struct case_source *proc_source;
-static struct trns_chain *permanent_trns_chain;
-static struct dictionary *permanent_dict;
-static struct case_sink *proc_sink;
-static struct trns_chain *temporary_trns_chain;
-struct dictionary *default_dict;
-
-/* The transformation chain that the next transformation will be
-   added to. */
-static struct trns_chain *cur_trns_chain;
-
-/* The compactor used to compact a case, if necessary;
-   otherwise a null pointer. */
-static struct dict_compactor *compactor;
-
-/* Time at which proc was last invoked. */
-static time_t last_proc_invocation;
-
-/* Lag queue. */
-int n_lag;                     /* Number of cases to lag. */
-static int lag_count;          /* Number of cases in lag_queue so far. */
-static int lag_head;           /* Index where next case will be added. */
-static struct ccase *lag_queue; /* Array of n_lag ccase * elements. */
-
-static void add_case_limit_trns (void);
-static void add_filter_trns (void);
-
-static bool internal_procedure (bool (*case_func) (const struct ccase *,
-                                                   void *),
-                                bool (*end_func) (void *),
-                                void *aux);
-static void update_last_proc_invocation (void);
-static void create_trns_case (struct ccase *, struct dictionary *);
-static void open_active_file (void);
-static bool write_case (struct write_case_data *wc_data);
-static void lag_case (const struct ccase *c);
-static void clear_case (struct ccase *c);
-static bool close_active_file (void);
-\f
-/* Public functions. */
+  void (*callback) (void *); /* Callback for when the dataset changes */
+  void *cb_data;
 
-/* Returns the last time the data was read. */
-time_t
-time_of_last_procedure (void) 
-{
-  if (last_proc_invocation == 0)
-    update_last_proc_invocation ();
-  return last_proc_invocation;
-}
-\f
-/* Regular procedure. */
+}; /* struct dataset */
 
-/* Reads the data from the input program and writes it to a new
-   active file.  For each case we read from the input program, we
-   do the following:
 
-   1. Execute permanent transformations.  If these drop the case,
-      start the next case from step 1.
+static void add_case_limit_trns (struct dataset *ds);
+static void add_filter_trns (struct dataset *ds);
 
-   2. Write case to replacement active file.
-   
-   3. Execute temporary transformations.  If these drop the case,
-      start the next case from step 1.
-      
-   4. Pass case to PROC_FUNC, passing AUX as auxiliary data.
+static void update_last_proc_invocation (struct dataset *ds);
 
-   Returns true if successful, false if an I/O error occurred. */
-bool
-procedure (bool (*proc_func) (const struct ccase *, void *), void *aux)
+static void
+dataset_set_unsaved (const struct dataset *ds)
 {
-  return internal_procedure (proc_func, NULL, aux);
+  if (ds->callback) ds->callback (ds->cb_data);
 }
-\f
-/* Multipass procedure. */
 
-struct multipass_aux_data 
-  {
-    struct casefile *casefile;
-    
-    bool (*proc_func) (const struct casefile *, void *aux);
-    void *aux;
-  };
+\f
+/* Public functions. */
 
-/* Case processing function for multipass_procedure(). */
-static bool
-multipass_case_func (const struct ccase *c, void *aux_data_) 
+void
+dataset_set_callback (struct dataset *ds, void (*cb) (void *), void *cb_data)
 {
-  struct multipass_aux_data *aux_data = aux_data_;
-  return casefile_append (aux_data->casefile, c);
+  ds->callback = cb;
+  ds->cb_data = cb_data;
 }
 
-/* End-of-file function for multipass_procedure(). */
-static bool
-multipass_end_func (void *aux_data_) 
-{
-  struct multipass_aux_data *aux_data = aux_data_;
-  return (aux_data->proc_func == NULL
-          || aux_data->proc_func (aux_data->casefile, aux_data->aux));
-}
 
-/* Procedure that allows multiple passes over the input data.
-   The entire active file is passed to PROC_FUNC, with the given
-   AUX as auxiliary data, as a unit. */
-bool
-multipass_procedure (bool (*proc_func) (const struct casefile *, void *aux),
-                     void *aux) 
+/* Returns the last time the data was read. */
+time_t
+time_of_last_procedure (struct dataset *ds)
 {
-  struct multipass_aux_data aux_data;
-  bool ok;
-
-  aux_data.casefile = casefile_create (dict_get_next_value_idx (default_dict));
-  aux_data.proc_func = proc_func;
-  aux_data.aux = aux;
-
-  ok = internal_procedure (multipass_case_func, multipass_end_func, &aux_data);
-  ok = !casefile_error (aux_data.casefile) && ok;
-
-  casefile_destroy (aux_data.casefile);
-
-  return ok;
+  if (ds->last_proc_invocation == 0)
+    update_last_proc_invocation (ds);
+  return ds->last_proc_invocation;
 }
 \f
-/* Procedure implementation. */
+/* Regular procedure. */
 
-/* Executes a procedure.
-   Passes each case to CASE_FUNC.
-   Calls END_FUNC after the last case.
-   Returns true if successful, false if an I/O error occurred (or
-   if CASE_FUNC or END_FUNC ever returned false). */
-static bool
-internal_procedure (bool (*case_func) (const struct ccase *, void *),
-                    bool (*end_func) (void *),
-                    void *aux) 
+/* Executes any pending transformations, if necessary.
+   This is not identical to the EXECUTE command in that it won't
+   always read the source data.  This can be important when the
+   source data is given inline within BEGIN DATA...END FILE. */
+bool
+proc_execute (struct dataset *ds)
 {
-  struct write_case_data wc_data;
-  bool ok = true;
-
-  assert (proc_source != NULL);
-
-  update_last_proc_invocation ();
+  bool ok;
 
-  /* Optimize the trivial case where we're not going to do
-     anything with the data, by not reading the data at all. */
-  if (case_func == NULL && end_func == NULL
-      && case_source_is_class (proc_source, &storage_source_class)
-      && proc_sink == NULL
-      && (temporary_trns_chain == NULL
-          || trns_chain_is_empty (temporary_trns_chain))
-      && trns_chain_is_empty (permanent_trns_chain))
+  if ((ds->temporary_trns_chain == NULL
+       || trns_chain_is_empty (ds->temporary_trns_chain))
+      && trns_chain_is_empty (ds->permanent_trns_chain))
     {
-      n_lag = 0;
-      dict_set_case_limit (default_dict, 0);
-      dict_clear_vectors (default_dict);
+      ds->n_lag = 0;
+      ds->discard_output = false;
+      dict_set_case_limit (ds->dict, 0);
+      dict_clear_vectors (ds->dict);
       return true;
     }
-  
-  open_active_file ();
-  
-  wc_data.case_func = case_func;
-  wc_data.aux = aux;
-  create_trns_case (&wc_data.trns_case, default_dict);
-  case_create (&wc_data.sink_case, dict_get_next_value_idx (default_dict));
-  wc_data.cases_written = 0;
-
-  ok = proc_source->class->read (proc_source,
-                                 &wc_data.trns_case,
-                                 write_case, &wc_data) && ok;
-  if (end_func != NULL)
-    ok = end_func (aux) && ok;
-
-  case_destroy (&wc_data.sink_case);
-  case_destroy (&wc_data.trns_case);
-
-  ok = close_active_file () && ok;
 
-  return ok;
+  ok = casereader_destroy (proc_open (ds));
+  return proc_commit (ds) && ok;
 }
 
-/* Updates last_proc_invocation. */
-static void
-update_last_proc_invocation (void) 
-{
-  last_proc_invocation = time (NULL);
-}
+static const struct casereader_class proc_casereader_class;
 
-/* Creates and returns a case, initializing it from the vectors
-   that say which `value's need to be initialized just once, and
-   which ones need to be re-initialized before every case. */
-static void
-create_trns_case (struct ccase *trns_case, struct dictionary *dict)
+/* Opens dataset DS for reading cases with proc_read.
+   proc_commit must be called when done. */
+struct casereader *
+proc_open (struct dataset *ds)
 {
-  size_t var_cnt = dict_get_var_cnt (dict);
-  size_t i;
+  assert (ds->source != NULL);
+  assert (ds->proc_state == PROC_COMMITTED);
 
-  case_create (trns_case, dict_get_next_value_idx (dict));
-  for (i = 0; i < var_cnt; i++) 
-    {
-      struct variable *v = dict_get_var (dict, i);
-      union value *value = case_data_rw (trns_case, v->fv);
+  update_last_proc_invocation (ds);
 
-      if (v->type == NUMERIC)
-        value->f = v->leave ? 0.0 : SYSMIS;
-      else
-        memset (value->s, ' ', v->width);
-    }
-}
-
-/* Makes all preparations for reading from the data source and writing
-   to the data sink. */
-static void
-open_active_file (void)
-{
-  add_case_limit_trns ();
-  add_filter_trns ();
+  caseinit_mark_for_init (ds->caseinit, ds->dict);
 
-  /* Finalize transformations. */
-  trns_chain_finalize (cur_trns_chain);
+  /* Finish up the collection of transformations. */
+  add_case_limit_trns (ds);
+  add_filter_trns (ds);
+  trns_chain_finalize (ds->cur_trns_chain);
 
   /* Make permanent_dict refer to the dictionary right before
      data reaches the sink. */
-  if (permanent_dict == NULL)
-    permanent_dict = default_dict;
-
-  /* Figure out compaction. */
-  compactor = (dict_needs_compaction (permanent_dict)
-               ? dict_make_compactor (permanent_dict)
-               : NULL);
+  if (ds->permanent_dict == NULL)
+    ds->permanent_dict = ds->dict;
 
   /* Prepare sink. */
-  if (proc_sink == NULL)
-    proc_sink = create_case_sink (&storage_sink_class, permanent_dict, NULL);
-  if (proc_sink->class->open != NULL)
-    proc_sink->class->open (proc_sink);
-
-  /* Allocate memory for lag queue. */
-  if (n_lag > 0)
+  if (!ds->discard_output)
     {
-      int i;
-  
-      lag_count = 0;
-      lag_head = 0;
-      lag_queue = xnmalloc (n_lag, sizeof *lag_queue);
-      for (i = 0; i < n_lag; i++)
-        case_nullify (&lag_queue[i]);
-    }
-}
-
-/* Transforms trns_case and writes it to the replacement active
-   file if advisable.  Returns true if more cases can be
-   accepted, false otherwise.  Do not call this function again
-   after it has returned false once.  */
-static bool
-write_case (struct write_case_data *wc_data)
-{
-  enum trns_result retval;
-  size_t case_nr;
-  
-  /* Execute permanent transformations.  */
-  case_nr = wc_data->cases_written + 1;
-  retval = trns_chain_execute (permanent_trns_chain,
-                               &wc_data->trns_case, &case_nr);
-  if (retval != TRNS_CONTINUE)
-    goto done;
-
-  /* Write case to LAG queue. */
-  if (n_lag)
-    lag_case (&wc_data->trns_case);
-
-  /* Write case to replacement active file. */
-  wc_data->cases_written++;
-  if (proc_sink->class->write != NULL) 
-    {
-      if (compactor != NULL) 
+      struct dictionary *pd = ds->permanent_dict;
+      size_t compacted_value_cnt = dict_count_values (pd, 1u << DC_SCRATCH);
+      if (compacted_value_cnt < dict_get_next_value_idx (pd))
         {
-          dict_compactor_compact (compactor, &wc_data->sink_case,
-                                  &wc_data->trns_case);
-          proc_sink->class->write (proc_sink, &wc_data->sink_case);
+          struct caseproto *compacted_proto;
+          compacted_proto = dict_get_compacted_proto (pd, 1u << DC_SCRATCH);
+          ds->compactor = case_map_to_compact_dict (pd, 1u << DC_SCRATCH);
+          ds->sink = autopaging_writer_create (compacted_proto);
+          caseproto_unref (compacted_proto);
         }
       else
-        proc_sink->class->write (proc_sink, &wc_data->trns_case);
-    }
-  
-  /* Execute temporary transformations. */
-  if (temporary_trns_chain != NULL) 
-    {
-      retval = trns_chain_execute (temporary_trns_chain,
-                                   &wc_data->trns_case,
-                                   &wc_data->cases_written);
-      if (retval != TRNS_CONTINUE)
-        goto done;
-    }
-
-  /* Pass case to procedure. */
-  if (wc_data->case_func != NULL)
-    if (!wc_data->case_func (&wc_data->trns_case, wc_data->aux))
-      retval = TRNS_ERROR;
-
- done:
-  clear_case (&wc_data->trns_case);
-  return retval != TRNS_ERROR;
-}
-
-/* Add C to the lag queue. */
-static void
-lag_case (const struct ccase *c)
-{
-  if (lag_count < n_lag)
-    lag_count++;
-  case_destroy (&lag_queue[lag_head]);
-  case_clone (&lag_queue[lag_head], c);
-  if (++lag_head >= n_lag)
-    lag_head = 0;
-}
-
-/* Clears the variables in C that need to be cleared between
-   processing cases.  */
-static void
-clear_case (struct ccase *c)
-{
-  size_t var_cnt = dict_get_var_cnt (default_dict);
-  size_t i;
-  
-  for (i = 0; i < var_cnt; i++) 
-    {
-      struct variable *v = dict_get_var (default_dict, i);
-      if (!v->leave) 
         {
-          if (v->type == NUMERIC)
-            case_data_rw (c, v->fv)->f = SYSMIS;
-          else
-            memset (case_data_rw (c, v->fv)->s, ' ', v->width);
-        } 
-    }
-}
-
-/* Closes the active file. */
-static bool
-close_active_file (void)
-{
-  /* Free memory for lag queue, and turn off lagging. */
-  if (n_lag > 0)
-    {
-      int i;
-      
-      for (i = 0; i < n_lag; i++)
-       case_destroy (&lag_queue[i]);
-      free (lag_queue);
-      n_lag = 0;
+          ds->compactor = NULL;
+          ds->sink = autopaging_writer_create (dict_get_proto (pd));
+        }
     }
-  
-  /* Dictionary from before TEMPORARY becomes permanent. */
-  proc_cancel_temporary_transformations ();
-
-  /* Finish compaction. */
-  if (compactor != NULL) 
+  else
     {
-      dict_compactor_destroy (compactor);
-      dict_compact_values (default_dict);
-      compactor = NULL;
+      ds->compactor = NULL;
+      ds->sink = NULL;
     }
-    
-  /* Free data source. */
-  free_case_source (proc_source);
-  proc_source = NULL;
 
-  /* Old data sink becomes new data source. */
-  if (proc_sink->class->make_source != NULL)
-    proc_source = proc_sink->class->make_source (proc_sink);
-  free_case_sink (proc_sink);
-  proc_sink = NULL;
+  /* Allocate memory for lagged cases. */
+  ds->lag_cases = deque_init (&ds->lag, ds->n_lag, sizeof *ds->lag_cases);
 
-  dict_clear_vectors (default_dict);
-  permanent_dict = NULL;
-  return proc_cancel_all_transformations ();
-}
-\f
-/* Returns a pointer to the lagged case from N_BEFORE cases before the
-   current one, or NULL if there haven't been that many cases yet. */
-struct ccase *
-lagged_case (int n_before)
-{
-  assert (n_before >= 1 );
-  assert (n_before <= n_lag);
+  ds->proc_state = PROC_OPEN;
+  ds->cases_written = 0;
+  ds->ok = true;
 
-  if (n_before <= lag_count)
-    {
-      int index = lag_head - n_before;
-      if (index < 0)
-        index += n_lag;
-      return &lag_queue[index];
-    }
-  else
-    return NULL;
+  /* FIXME: use taint in dataset in place of `ok'? */
+  /* FIXME: for trivial cases we can just return a clone of
+     ds->source? */
+  return casereader_create_sequential (NULL, dict_get_proto (ds->dict),
+                                       CASENUMBER_MAX,
+                                       &proc_casereader_class, ds);
 }
-\f
-/* Procedure that separates the data into SPLIT FILE groups. */
 
-/* Represents auxiliary data for handling SPLIT FILE. */
-struct split_aux_data 
-  {
-    size_t case_count;          /* Number of cases so far. */
-    struct ccase prev_case;     /* Data in previous case. */
-
-    /* Callback functions. */
-    void (*begin_func) (const struct ccase *, void *);
-    bool (*proc_func) (const struct ccase *, void *);
-    void (*end_func) (void *);
-    void *func_aux;
-  };
-
-static int equal_splits (const struct ccase *, const struct ccase *);
-static bool split_procedure_case_func (const struct ccase *c, void *);
-static bool split_procedure_end_func (void *);
-
-/* Like procedure(), but it automatically breaks the case stream
-   into SPLIT FILE break groups.  Before each group of cases with
-   identical SPLIT FILE variable values, BEGIN_FUNC is called
-   with the first case in the group.
-   Then PROC_FUNC is called for each case in the group (including
-   the first).
-   END_FUNC is called when the group is finished.  FUNC_AUX is
-   passed to each of the functions as auxiliary data.
-
-   If the active file is empty, none of BEGIN_FUNC, PROC_FUNC,
-   and END_FUNC will be called at all. 
-
-   If SPLIT FILE is not in effect, then there is one break group
-   (if the active file is nonempty), and BEGIN_FUNC and END_FUNC
-   will be called once.
-   
-   Returns true if successful, false if an I/O error occurred. */
+/* Returns true if a procedure is in progress, that is, if
+   proc_open has been called but proc_commit has not. */
 bool
-procedure_with_splits (void (*begin_func) (const struct ccase *, void *aux),
-                       bool (*proc_func) (const struct ccase *, void *aux),
-                       void (*end_func) (void *aux),
-                       void *func_aux) 
+proc_is_open (const struct dataset *ds)
 {
-  struct split_aux_data split_aux;
-  bool ok;
-
-  split_aux.case_count = 0;
-  case_nullify (&split_aux.prev_case);
-  split_aux.begin_func = begin_func;
-  split_aux.proc_func = proc_func;
-  split_aux.end_func = end_func;
-  split_aux.func_aux = func_aux;
-
-  ok = internal_procedure (split_procedure_case_func,
-                           split_procedure_end_func, &split_aux);
-
-  case_destroy (&split_aux.prev_case);
-
-  return ok;
+  return ds->proc_state != PROC_COMMITTED;
 }
 
-/* Case callback used by procedure_with_splits(). */
-static bool
-split_procedure_case_func (const struct ccase *c, void *split_aux_) 
+/* "read" function for procedure casereader. */
+static struct ccase *
+proc_casereader_read (struct casereader *reader UNUSED, void *ds_)
 {
-  struct split_aux_data *split_aux = split_aux_;
+  struct dataset *ds = ds_;
+  enum trns_result retval = TRNS_DROP_CASE;
+  struct ccase *c;
 
-  /* Start a new series if needed. */
-  if (split_aux->case_count == 0
-      || !equal_splits (c, &split_aux->prev_case))
+  assert (ds->proc_state == PROC_OPEN);
+  for (; ; case_unref (c))
     {
-      if (split_aux->case_count > 0 && split_aux->end_func != NULL)
-        split_aux->end_func (split_aux->func_aux);
-
-      case_destroy (&split_aux->prev_case);
-      case_clone (&split_aux->prev_case, c);
+      casenumber case_nr;
+
+      assert (retval == TRNS_DROP_CASE || retval == TRNS_ERROR);
+      if (retval == TRNS_ERROR)
+        ds->ok = false;
+      if (!ds->ok)
+        return NULL;
+
+      /* Read a case from source. */
+      c = casereader_read (ds->source);
+      if (c == NULL)
+        return NULL;
+      c = case_unshare_and_resize (c, dict_get_proto (ds->dict));
+      caseinit_init_vars (ds->caseinit, c);
+
+      /* Execute permanent transformations.  */
+      case_nr = ds->cases_written + 1;
+      retval = trns_chain_execute (ds->permanent_trns_chain, TRNS_CONTINUE,
+                                   &c, case_nr);
+      caseinit_update_left_vars (ds->caseinit, c);
+      if (retval != TRNS_CONTINUE)
+        continue;
 
-      if (split_aux->begin_func != NULL)
-       split_aux->begin_func (&split_aux->prev_case, split_aux->func_aux);
-    }
+      /* Write case to collection of lagged cases. */
+      if (ds->n_lag > 0)
+        {
+          while (deque_count (&ds->lag) >= ds->n_lag)
+            case_unref (ds->lag_cases[deque_pop_back (&ds->lag)]);
+          ds->lag_cases[deque_push_front (&ds->lag)] = case_ref (c);
+        }
 
-  split_aux->case_count++;
-  return (split_aux->proc_func == NULL
-          || split_aux->proc_func (c, split_aux->func_aux));
-}
+      /* Write case to replacement active file. */
+      ds->cases_written++;
+      if (ds->sink != NULL)
+        casewriter_write (ds->sink,
+                          case_map_execute (ds->compactor, case_ref (c)));
 
-/* End-of-file callback used by procedure_with_splits(). */
-static bool
-split_procedure_end_func (void *split_aux_) 
-{
-  struct split_aux_data *split_aux = split_aux_;
+      /* Execute temporary transformations. */
+      if (ds->temporary_trns_chain != NULL)
+        {
+          retval = trns_chain_execute (ds->temporary_trns_chain, TRNS_CONTINUE,
+                                       &c, ds->cases_written);
+          if (retval != TRNS_CONTINUE)
+            continue;
+        }
 
-  if (split_aux->case_count > 0 && split_aux->end_func != NULL)
-    split_aux->end_func (split_aux->func_aux);
-  return true;
+      return c;
+    }
 }
 
-/* Compares the SPLIT FILE variables in cases A and B and returns
-   nonzero only if they differ. */
-static int
-equal_splits (const struct ccase *a, const struct ccase *b) 
+/* "destroy" function for procedure casereader. */
+static void
+proc_casereader_destroy (struct casereader *reader, void *ds_)
 {
-  return case_compare (a, b,
-                       dict_get_split_vars (default_dict),
-                       dict_get_split_cnt (default_dict)) == 0;
-}
-\f
-/* Multipass procedure that separates the data into SPLIT FILE
-   groups. */
+  struct dataset *ds = ds_;
+  struct ccase *c;
 
-/* Represents auxiliary data for handling SPLIT FILE in a
-   multipass procedure. */
-struct multipass_split_aux_data 
-  {
-    struct ccase prev_case;     /* Data in previous case. */
-    struct casefile *casefile;  /* Accumulates data for a split. */
+  /* Make sure transformations happen for every input case, in
+     case they have side effects, and ensure that the replacement
+     active file gets all the cases it should. */
+  while ((c = casereader_read (reader)) != NULL)
+    case_unref (c);
 
-    /* Function to call with the accumulated data. */
-    bool (*split_func) (const struct ccase *first, const struct casefile *,
-                        void *);
-    void *func_aux;                            /* Auxiliary data. */ 
-  };
-
-static bool multipass_split_case_func (const struct ccase *c, void *aux_);
-static bool multipass_split_end_func (void *aux_);
-static bool multipass_split_output (struct multipass_split_aux_data *);
+  ds->proc_state = PROC_CLOSED;
+  ds->ok = casereader_destroy (ds->source) && ds->ok;
+  ds->source = NULL;
+  proc_set_active_file_data (ds, NULL);
+}
 
-/* Returns true if successful, false if an I/O error occurred. */
+/* Must return false if the source casereader, a transformation,
+   or the sink casewriter signaled an error.  (If a temporary
+   transformation signals an error, then the return value is
+   false, but the replacement active file may still be
+   untainted.) */
 bool
-multipass_procedure_with_splits (bool (*split_func) (const struct ccase *first,
-                                                     const struct casefile *,
-                                                     void *aux),
-                                 void *func_aux)
+proc_commit (struct dataset *ds)
 {
-  struct multipass_split_aux_data aux;
-  bool ok;
+  assert (ds->proc_state == PROC_CLOSED);
+  ds->proc_state = PROC_COMMITTED;
 
-  case_nullify (&aux.prev_case);
-  aux.casefile = NULL;
-  aux.split_func = split_func;
-  aux.func_aux = func_aux;
+  dataset_set_unsaved (ds);
 
-  ok = internal_procedure (multipass_split_case_func,
-                           multipass_split_end_func, &aux);
-  case_destroy (&aux.prev_case);
+  /* Free memory for lagged cases. */
+  while (!deque_is_empty (&ds->lag))
+    case_unref (ds->lag_cases[deque_pop_back (&ds->lag)]);
+  free (ds->lag_cases);
 
-  return ok;
-}
-
-/* Case callback used by multipass_procedure_with_splits(). */
-static bool
-multipass_split_case_func (const struct ccase *c, void *aux_)
-{
-  struct multipass_split_aux_data *aux = aux_;
-  bool ok = true;
+  /* Dictionary from before TEMPORARY becomes permanent. */
+  proc_cancel_temporary_transformations (ds);
 
-  /* Start a new series if needed. */
-  if (aux->casefile == NULL || !equal_splits (c, &aux->prev_case))
+  if (!ds->discard_output)
     {
-      /* Record split values. */
-      case_destroy (&aux->prev_case);
-      case_clone (&aux->prev_case, c);
+      /* Finish compacting. */
+      if (ds->compactor != NULL)
+        {
+          case_map_destroy (ds->compactor);
+          ds->compactor = NULL;
 
-      /* Pass any cases to split_func. */
-      if (aux->casefile != NULL)
-        ok = multipass_split_output (aux);
+          dict_delete_scratch_vars (ds->dict);
+          dict_compact_values (ds->dict);
+        }
 
-      /* Start a new casefile. */
-      aux->casefile = casefile_create (dict_get_next_value_idx (default_dict));
+      /* Old data sink becomes new data source. */
+      if (ds->sink != NULL)
+        ds->source = casewriter_make_reader (ds->sink);
+    }
+  else
+    {
+      ds->source = NULL;
+      ds->discard_output = false;
     }
+  ds->sink = NULL;
 
-  return casefile_append (aux->casefile, c) && ok;
-}
+  caseinit_clear (ds->caseinit);
+  caseinit_mark_as_preinited (ds->caseinit, ds->dict);
 
-/* End-of-file callback used by multipass_procedure_with_splits(). */
-static bool
-multipass_split_end_func (void *aux_)
-{
-  struct multipass_split_aux_data *aux = aux_;
-  return (aux->casefile == NULL || multipass_split_output (aux));
+  dict_clear_vectors (ds->dict);
+  ds->permanent_dict = NULL;
+  return proc_cancel_all_transformations (ds) && ds->ok;
 }
 
-static bool
-multipass_split_output (struct multipass_split_aux_data *aux)
-{
-  bool ok;
-  
-  assert (aux->casefile != NULL);
-  ok = aux->split_func (&aux->prev_case, aux->casefile, aux->func_aux);
-  casefile_destroy (aux->casefile);
-  aux->casefile = NULL;
+/* Casereader class for procedure execution. */
+static const struct casereader_class proc_casereader_class =
+  {
+    proc_casereader_read,
+    proc_casereader_destroy,
+    NULL,
+    NULL,
+  };
 
-  return ok;
+/* Updates last_proc_invocation. */
+static void
+update_last_proc_invocation (struct dataset *ds)
+{
+  ds->last_proc_invocation = time (NULL);
 }
 \f
-/* Discards all the current state in preparation for a data-input
-   command like DATA LIST or GET. */
-void
-discard_variables (void)
+/* Returns a pointer to the lagged case from N_BEFORE cases before the
+   current one, or NULL if there haven't been that many cases yet. */
+const struct ccase *
+lagged_case (const struct dataset *ds, int n_before)
 {
-  dict_clear (default_dict);
-  fh_set_default_handle (NULL);
-
-  n_lag = 0;
-  
-  free_case_source (proc_source);
-  proc_source = NULL;
+  assert (n_before >= 1);
+  assert (n_before <= ds->n_lag);
 
-  proc_cancel_all_transformations ();
+  if (n_before <= deque_count (&ds->lag))
+    return ds->lag_cases[deque_front (&ds->lag, n_before - 1)];
+  else
+    return NULL;
 }
 \f
 /* Returns the current set of permanent transformations,
    and clears the permanent transformations.
    For use by INPUT PROGRAM. */
 struct trns_chain *
-proc_capture_transformations (void) 
+proc_capture_transformations (struct dataset *ds)
 {
   struct trns_chain *chain;
-  
-  assert (temporary_trns_chain == NULL);
-  chain = permanent_trns_chain;
-  cur_trns_chain = permanent_trns_chain = trns_chain_create ();
+
+  assert (ds->temporary_trns_chain == NULL);
+  chain = ds->permanent_trns_chain;
+  ds->cur_trns_chain = ds->permanent_trns_chain = trns_chain_create ();
+
+  if ( ds->xform_callback)
+    ds->xform_callback (false, ds->xform_callback_aux);
+
   return chain;
 }
 
@@ -681,9 +414,11 @@ proc_capture_transformations (void)
    frees itself with FREE to the current set of transformations.
    The functions are passed AUX as auxiliary data. */
 void
-add_transformation (trns_proc_func *proc, trns_free_func *free, void *aux)
+add_transformation (struct dataset *ds, trns_proc_func *proc, trns_free_func *free, void *aux)
 {
-  trns_chain_append (cur_trns_chain, NULL, proc, free, aux);
+  trns_chain_append (ds->cur_trns_chain, NULL, proc, free, aux);
+  if ( ds->xform_callback)
+    ds->xform_callback (true, ds->xform_callback_aux);
 }
 
 /* Adds a transformation that processes a case with PROC and
@@ -692,44 +427,52 @@ add_transformation (trns_proc_func *proc, trns_free_func *free, void *aux)
    FINALIZE will be called.
    The functions are passed AUX as auxiliary data. */
 void
-add_transformation_with_finalizer (trns_finalize_func *finalize,
+add_transformation_with_finalizer (struct dataset *ds,
+                                  trns_finalize_func *finalize,
                                    trns_proc_func *proc,
                                    trns_free_func *free, void *aux)
 {
-  trns_chain_append (cur_trns_chain, finalize, proc, free, aux);
+  trns_chain_append (ds->cur_trns_chain, finalize, proc, free, aux);
+
+  if ( ds->xform_callback)
+    ds->xform_callback (true, ds->xform_callback_aux);
 }
 
 /* Returns the index of the next transformation.
    This value can be returned by a transformation procedure
    function to indicate a "jump" to that transformation. */
 size_t
-next_transformation (void) 
+next_transformation (const struct dataset *ds)
 {
-  return trns_chain_next (cur_trns_chain);
+  return trns_chain_next (ds->cur_trns_chain);
 }
 
 /* Returns true if the next call to add_transformation() will add
    a temporary transformation, false if it will add a permanent
    transformation. */
 bool
-proc_in_temporary_transformations (void) 
+proc_in_temporary_transformations (const struct dataset *ds)
 {
-  return temporary_trns_chain != NULL;
+  return ds->temporary_trns_chain != NULL;
 }
 
 /* Marks the start of temporary transformations.
    Further calls to add_transformation() will add temporary
    transformations. */
 void
-proc_start_temporary_transformations (void) 
+proc_start_temporary_transformations (struct dataset *ds)
 {
-  if (!proc_in_temporary_transformations ())
+  if (!proc_in_temporary_transformations (ds))
     {
-      add_case_limit_trns ();
+      add_case_limit_trns (ds);
 
-      permanent_dict = dict_clone (default_dict);
-      trns_chain_finalize (permanent_trns_chain);
-      temporary_trns_chain = cur_trns_chain = trns_chain_create ();
+      ds->permanent_dict = dict_clone (ds->dict);
+
+      trns_chain_finalize (ds->permanent_trns_chain);
+      ds->temporary_trns_chain = ds->cur_trns_chain = trns_chain_create ();
+
+      if ( ds->xform_callback)
+       ds->xform_callback (true, ds->xform_callback_aux);
     }
 }
 
@@ -738,16 +481,16 @@ proc_start_temporary_transformations (void)
    permanent.
    Returns true if anything changed, false otherwise. */
 bool
-proc_make_temporary_transformations_permanent (void) 
+proc_make_temporary_transformations_permanent (struct dataset *ds)
 {
-  if (proc_in_temporary_transformations ()) 
+  if (proc_in_temporary_transformations (ds))
     {
-      trns_chain_finalize (temporary_trns_chain);
-      trns_chain_splice (permanent_trns_chain, temporary_trns_chain);
-      temporary_trns_chain = NULL;
+      trns_chain_finalize (ds->temporary_trns_chain);
+      trns_chain_splice (ds->permanent_trns_chain, ds->temporary_trns_chain);
+      ds->temporary_trns_chain = NULL;
 
-      dict_destroy (permanent_dict);
-      permanent_dict = NULL;
+      dict_destroy (ds->permanent_dict);
+      ds->permanent_dict = NULL;
 
       return true;
     }
@@ -759,16 +502,20 @@ proc_make_temporary_transformations_permanent (void)
    transformations will be permanent.
    Returns true if anything changed, false otherwise. */
 bool
-proc_cancel_temporary_transformations (void) 
+proc_cancel_temporary_transformations (struct dataset *ds)
 {
-  if (proc_in_temporary_transformations ()) 
+  if (proc_in_temporary_transformations (ds))
     {
-      dict_destroy (default_dict);
-      default_dict = permanent_dict;
-      permanent_dict = NULL;
+      dict_destroy (ds->dict);
+      ds->dict = ds->permanent_dict;
+      ds->permanent_dict = NULL;
+
+      trns_chain_destroy (ds->temporary_trns_chain);
+      ds->temporary_trns_chain = NULL;
 
-      trns_chain_destroy (temporary_trns_chain);
-      temporary_trns_chain = NULL;
+      if ( ds->xform_callback)
+       ds->xform_callback (!trns_chain_is_empty (ds->permanent_trns_chain),
+                           ds->xform_callback_aux);
 
       return true;
     }
@@ -779,95 +526,185 @@ proc_cancel_temporary_transformations (void)
 /* Cancels all transformations, if any.
    Returns true if successful, false on I/O error. */
 bool
-proc_cancel_all_transformations (void)
+proc_cancel_all_transformations (struct dataset *ds)
 {
   bool ok;
-  ok = trns_chain_destroy (permanent_trns_chain);
-  ok = trns_chain_destroy (temporary_trns_chain) && ok;
-  permanent_trns_chain = cur_trns_chain = trns_chain_create ();
-  temporary_trns_chain = NULL;
+  assert (ds->proc_state == PROC_COMMITTED);
+  ok = trns_chain_destroy (ds->permanent_trns_chain);
+  ok = trns_chain_destroy (ds->temporary_trns_chain) && ok;
+  ds->permanent_trns_chain = ds->cur_trns_chain = trns_chain_create ();
+  ds->temporary_trns_chain = NULL;
+  if ( ds->xform_callback)
+    ds->xform_callback (false, ds->xform_callback_aux);
+
   return ok;
 }
 \f
+
+static void
+dict_callback (struct dictionary *d UNUSED, void *ds_)
+{
+  struct dataset *ds = ds_;
+  dataset_set_unsaved (ds);
+}
+
 /* Initializes procedure handling. */
+struct dataset *
+create_dataset (void)
+{
+  struct dataset *ds = xzalloc (sizeof(*ds));
+  ds->dict = dict_create ();
+
+  dict_set_change_callback (ds->dict, dict_callback, ds);
+
+  dict_set_encoding (ds->dict, get_default_encoding ());
+
+  ds->caseinit = caseinit_create ();
+  proc_cancel_all_transformations (ds);
+  return ds;
+}
+
+
 void
-proc_init (void) 
+dataset_add_transform_change_callback (struct dataset *ds,
+                                      transformation_change_callback_func *cb,
+                                      void *aux)
 {
-  default_dict = dict_create ();
-  proc_cancel_all_transformations ();
+  ds->xform_callback = cb;
+  ds->xform_callback_aux = aux;
 }
 
 /* Finishes up procedure handling. */
 void
-proc_done (void)
+destroy_dataset (struct dataset *ds)
 {
-  discard_variables ();
+  proc_discard_active_file (ds);
+  dict_destroy (ds->dict);
+  caseinit_destroy (ds->caseinit);
+  trns_chain_destroy (ds->permanent_trns_chain);
+
+  if ( ds->xform_callback)
+    ds->xform_callback (false, ds->xform_callback_aux);
+  free (ds);
 }
 
-/* Sets SINK as the destination for procedure output from the
-   next procedure. */
+/* Causes output from the next procedure to be discarded, instead
+   of being preserved for use as input for the next procedure. */
 void
-proc_set_sink (struct case_sink *sink) 
+proc_discard_output (struct dataset *ds)
 {
-  assert (proc_sink == NULL);
-  proc_sink = sink;
+  ds->discard_output = true;
+}
+
+/* Discards the active file dictionary, data, and
+   transformations. */
+void
+proc_discard_active_file (struct dataset *ds)
+{
+  assert (ds->proc_state == PROC_COMMITTED);
+
+  dict_clear (ds->dict);
+  fh_set_default_handle (NULL);
+
+  ds->n_lag = 0;
+
+  casereader_destroy (ds->source);
+  ds->source = NULL;
+
+  proc_cancel_all_transformations (ds);
 }
 
 /* Sets SOURCE as the source for procedure input for the next
    procedure. */
 void
-proc_set_source (struct case_source *source) 
+proc_set_active_file (struct dataset *ds,
+                      struct casereader *source,
+                      struct dictionary *dict)
 {
-  assert (proc_source == NULL);
-  proc_source = source;
+  assert (ds->proc_state == PROC_COMMITTED);
+  assert (ds->dict != dict);
+
+  proc_discard_active_file (ds);
+
+  dict_destroy (ds->dict);
+  ds->dict = dict;
+  dict_set_change_callback (ds->dict, dict_callback, ds);
+
+  proc_set_active_file_data (ds, source);
 }
 
-/* Returns true if a source for the next procedure has been
-   configured, false otherwise. */
+/* Replaces the active file's data by READER without replacing
+   the associated dictionary. */
 bool
-proc_has_source (void) 
+proc_set_active_file_data (struct dataset *ds, struct casereader *reader)
 {
-  return proc_source != NULL;
+  casereader_destroy (ds->source);
+  ds->source = reader;
+
+  caseinit_clear (ds->caseinit);
+  caseinit_mark_as_preinited (ds->caseinit, ds->dict);
+
+  return reader == NULL || !casereader_error (reader);
 }
 
-/* Returns the output from the previous procedure.
-   For use only immediately after executing a procedure.
-   The returned casefile is owned by the caller; it will not be
-   automatically used for the next procedure's input. */
-struct casefile *
-proc_capture_output (void) 
+/* Returns true if an active file data source is available, false
+   otherwise. */
+bool
+proc_has_active_file (const struct dataset *ds)
 {
-  struct casefile *casefile;
+  return ds->source != NULL;
+}
 
-  /* Try to make sure that this function is called immediately
-     after procedure() or a similar function. */
-  assert (proc_source != NULL);
-  assert (case_source_is_class (proc_source, &storage_source_class));
-  assert (trns_chain_is_empty (permanent_trns_chain));
-  assert (!proc_in_temporary_transformations ());
+/* Returns the active file data source from DS, or a null pointer
+   if DS has no data source, and removes it from DS. */
+struct casereader *
+proc_extract_active_file_data (struct dataset *ds)
+{
+  struct casereader *reader = ds->source;
+  ds->source = NULL;
 
-  casefile = storage_source_decapsulate (proc_source);
-  proc_source = NULL;
+  return reader;
+}
 
-  return casefile;
+/* Checks whether DS has a corrupted active file.  If so,
+   discards it and returns false.  If not, returns true without
+   doing anything. */
+bool
+dataset_end_of_command (struct dataset *ds)
+{
+  if (ds->source != NULL)
+    {
+      if (casereader_error (ds->source))
+        {
+          proc_discard_active_file (ds);
+          return false;
+        }
+      else
+        {
+          const struct taint *taint = casereader_get_taint (ds->source);
+          taint_reset_successor_taint ((struct taint *) taint);
+          assert (!taint_has_tainted_successor (taint));
+        }
+    }
+  return true;
 }
 \f
 static trns_proc_func case_limit_trns_proc;
 static trns_free_func case_limit_trns_free;
 
 /* Adds a transformation that limits the number of cases that may
-   pass through, if default_dict has a case limit. */
+   pass through, if DS->DICT has a case limit. */
 static void
-add_case_limit_trns (void) 
+add_case_limit_trns (struct dataset *ds)
 {
-  size_t case_limit = dict_get_case_limit (default_dict);
+  casenumber case_limit = dict_get_case_limit (ds->dict);
   if (case_limit != 0)
     {
-      size_t *cases_remaining = xmalloc (sizeof *cases_remaining);
+      casenumber *cases_remaining = xmalloc (sizeof *cases_remaining);
       *cases_remaining = case_limit;
-      add_transformation (case_limit_trns_proc, case_limit_trns_free,
+      add_transformation (ds, case_limit_trns_proc, case_limit_trns_free,
                           cases_remaining);
-      dict_set_case_limit (default_dict, 0);
+      dict_set_case_limit (ds->dict, 0);
     }
 }
 
@@ -875,12 +712,12 @@ add_case_limit_trns (void)
    *CASES_REMAINING. */
 static int
 case_limit_trns_proc (void *cases_remaining_,
-                      struct ccase *c UNUSED, int case_nr UNUSED) 
+                      struct ccase **c UNUSED, casenumber case_nr UNUSED)
 {
   size_t *cases_remaining = cases_remaining_;
-  if (*cases_remaining > 0) 
+  if (*cases_remaining > 0)
     {
-      *cases_remaining--;
+      (*cases_remaining)--;
       return TRNS_CONTINUE;
     }
   else
@@ -889,7 +726,7 @@ case_limit_trns_proc (void *cases_remaining_,
 
 /* Frees the data associated with a case limit transformation. */
 static bool
-case_limit_trns_free (void *cases_remaining_) 
+case_limit_trns_free (void *cases_remaining_)
 {
   size_t *cases_remaining = cases_remaining_;
   free (cases_remaining);
@@ -901,25 +738,43 @@ static trns_proc_func filter_trns_proc;
 /* Adds a temporary transformation to filter data according to
    the variable specified on FILTER, if any. */
 static void
-add_filter_trns (void) 
+add_filter_trns (struct dataset *ds)
 {
-  struct variable *filter_var = dict_get_filter (default_dict);
-  if (filter_var != NULL) 
+  struct variable *filter_var = dict_get_filter (ds->dict);
+  if (filter_var != NULL)
     {
-      proc_start_temporary_transformations ();
-      add_transformation (filter_trns_proc, NULL, filter_var);
+      proc_start_temporary_transformations (ds);
+      add_transformation (ds, filter_trns_proc, NULL, filter_var);
     }
 }
 
 /* FILTER transformation. */
 static int
 filter_trns_proc (void *filter_var_,
-                  struct ccase *c UNUSED, int case_nr UNUSED) 
-  
+                  struct ccase **c UNUSED, casenumber case_nr UNUSED)
+
 {
   struct variable *filter_var = filter_var_;
-  double f = case_num (c, filter_var->fv);
-  return (f != 0.0 && !mv_is_num_missing (&filter_var->miss, f)
+  double f = case_num (*c, filter_var);
+  return (f != 0.0 && !var_is_num_missing (filter_var, f, MV_ANY)
           ? TRNS_CONTINUE : TRNS_DROP_CASE);
 }
 
+
+struct dictionary *
+dataset_dict (const struct dataset *ds)
+{
+  return ds->dict;
+}
+
+const struct casereader *
+dataset_source (const struct dataset *ds)
+{
+  return ds->source;
+}
+
+void
+dataset_need_lag (struct dataset *ds, int n_before)
+{
+  ds->n_lag = MAX (ds->n_lag, n_before);
+}