Actually implement the new procedure code and adapt all of its clients
[pspp-builds.git] / src / data / procedure.c
index 7a9b432132c09158ce849147801f4a17bc4f64a4..46a18bb463297fdf735a6e30a2d2207c0dc45e9a 100644 (file)
 #include <stdlib.h>
 #include <unistd.h>
 
-#include <data/case-source.h>
-#include <data/case-sink.h>
 #include <data/case.h>
-#include <data/casefile.h>
-#include <data/fastfile.h>
+#include <data/caseinit.h>
+#include <data/casereader.h>
+#include <data/casereader-provider.h>
+#include <data/casewriter.h>
 #include <data/dictionary.h>
 #include <data/file-handle-def.h>
 #include <data/procedure.h>
-#include <data/storage-stream.h>
 #include <data/transformations.h>
 #include <data/variable.h>
 #include <libpspp/alloc.h>
 #include <libpspp/deque.h>
 #include <libpspp/misc.h>
 #include <libpspp/str.h>
+#include <libpspp/taint.h>
 
 struct dataset {
-
-  /* An abstract factory which creates casefiles */
-  struct casefile_factory *cf_factory;
-
-  /* Callback which occurs when a procedure provides a new source for
-     the dataset */
-  replace_source_callback *replace_source ;
-
-  /* Callback which occurs whenever the DICT is replaced by a new one */
-  replace_dictionary_callback *replace_dict;
-
-  /* Cases are read from proc_source,
+  /* Cases are read from source,
+     their transformation variables are initialized,
      pass through permanent_trns_chain (which transforms them into
      the format described by permanent_dict),
-     are written to proc_sink,
+     are written to sink,
      pass through temporary_trns_chain (which transforms them into
      the format described by dict),
      and are finally passed to the procedure. */
-  struct case_source *proc_source;
+  struct casereader *source;
+  struct caseinit *caseinit;
   struct trns_chain *permanent_trns_chain;
   struct dictionary *permanent_dict;
-  struct case_sink *proc_sink;
+  struct casewriter *sink;
   struct trns_chain *temporary_trns_chain;
   struct dictionary *dict;
 
+  /* Callback which occurs when a procedure provides a new source for
+     the dataset */
+  replace_source_callback *replace_source ;
+
+  /* Callback which occurs whenever the DICT is replaced by a new one */
+  replace_dictionary_callback *replace_dict;
+
+  /* If true, cases are discarded instead of being written to
+     sink. */
+  bool discard_output;
+
   /* The transformation chain that the next transformation will be
      added to. */
   struct trns_chain *cur_trns_chain;
@@ -82,26 +84,22 @@ struct dataset {
   struct ccase *lag_cases;      /* Lagged cases managed by deque. */
 
   /* Procedure data. */
-  bool is_open;               /* Procedure open? */
-  struct ccase trns_case;     /* Case used for transformations. */
-  struct ccase sink_case;     /* Case written to sink, if
-                                 compacting is necessary. */
+  enum 
+    {
+      PROC_COMMITTED,
+      PROC_OPEN,
+      PROC_CLOSED 
+    }
+  proc_state;
   size_t cases_written;       /* Cases output so far. */
-  bool ok;
+  bool ok;                    /* Error status. */
 }; /* struct dataset */
 
 
 static void add_case_limit_trns (struct dataset *ds);
 static void add_filter_trns (struct dataset *ds);
 
-static bool internal_procedure (struct dataset *ds, case_func *,
-                                end_func *,
-                                void *aux);
 static void update_last_proc_invocation (struct dataset *ds);
-static void create_trns_case (struct ccase *, struct dictionary *);
-static void open_active_file (struct dataset *ds);
-static void clear_case (const struct dataset *ds, struct ccase *c);
-static bool close_active_file (struct dataset *ds);
 \f
 /* Public functions. */
 
@@ -116,146 +114,89 @@ time_of_last_procedure (struct dataset *ds)
 \f
 /* Regular procedure. */
 
-
-
-/* Reads the data from the input program and writes it to a new
-   active file.  For each case we read from the input program, we
-   do the following:
-
-   1. Execute permanent transformations.  If these drop the case,
-      start the next case from step 1.
-
-   2. Write case to replacement active file.
-
-   3. Execute temporary transformations.  If these drop the case,
-      start the next case from step 1.
-
-   4. Pass case to PROC_FUNC, passing AUX as auxiliary data.
-
-   Returns true if successful, false if an I/O error occurred. */
+/* Executes any pending transformations, if necessary.
+   This is not identical to the EXECUTE command in that it won't
+   always read the source data.  This can be important when the
+   source data is given inline within BEGIN DATA...END FILE. */
 bool
-procedure (struct dataset *ds, case_func *cf, void *aux)
+proc_execute (struct dataset *ds)
 {
-  update_last_proc_invocation (ds);
+  bool ok;
 
-  /* Optimize the trivial case where we're not going to do
-     anything with the data, by not reading the data at all. */
-  if (cf == NULL
-      && case_source_is_class (ds->proc_source, &storage_source_class)
-      && ds->proc_sink == NULL
-      && (ds->temporary_trns_chain == NULL
-          || trns_chain_is_empty (ds->temporary_trns_chain))
+  if ((ds->temporary_trns_chain == NULL
+       || trns_chain_is_empty (ds->temporary_trns_chain))
       && trns_chain_is_empty (ds->permanent_trns_chain))
     {
       ds->n_lag = 0;
+      ds->discard_output = false;
       dict_set_case_limit (ds->dict, 0);
       dict_clear_vectors (ds->dict);
       return true;
     }
 
-  return internal_procedure (ds, cf, NULL, aux);
+  ok = casereader_destroy (proc_open (ds));
+  return proc_commit (ds) && ok;
 }
-\f
-/* Multipass procedure. */
 
-struct multipass_aux_data
-  {
-    struct casefile *casefile;
+static struct casereader_class proc_casereader_class;
 
-    bool (*proc_func) (const struct casefile *, void *aux);
-    void *aux;
-  };
-
-/* Case processing function for multipass_procedure(). */
-static bool
-multipass_case_func (const struct ccase *c, void *aux_data_, const struct dataset *ds UNUSED)
-{
-  struct multipass_aux_data *aux_data = aux_data_;
-  return casefile_append (aux_data->casefile, c);
-}
-
-/* End-of-file function for multipass_procedure(). */
-static bool
-multipass_end_func (void *aux_data_, const struct dataset *ds UNUSED)
-{
-  struct multipass_aux_data *aux_data = aux_data_;
-  return (aux_data->proc_func == NULL
-          || aux_data->proc_func (aux_data->casefile, aux_data->aux));
-}
-
-/* Procedure that allows multiple passes over the input data.
-   The entire active file is passed to PROC_FUNC, with the given
-   AUX as auxiliary data, as a unit. */
-bool
-multipass_procedure (struct dataset *ds, casefile_func *proc_func,  void *aux)
+/* Opens dataset DS for reading cases with proc_read.
+   proc_commit must be called when done. */
+struct casereader *
+proc_open (struct dataset *ds)
 {
-  struct multipass_aux_data aux_data;
-  bool ok;
+  assert (ds->source != NULL);
+  assert (ds->proc_state == PROC_COMMITTED);
 
-  aux_data.casefile =
-    ds->cf_factory->create_casefile (ds->cf_factory,
-                                    dict_get_next_value_idx (ds->dict));
-
-  aux_data.proc_func = proc_func;
-  aux_data.aux = aux;
-
-  ok = internal_procedure (ds, multipass_case_func, multipass_end_func, &aux_data);
-  ok = !casefile_error (aux_data.casefile) && ok;
-
-  casefile_destroy (aux_data.casefile);
-
-  return ok;
-}
-\f
+  update_last_proc_invocation (ds);
 
-/* Procedure implementation. */
+  caseinit_mark_for_init (ds->caseinit, ds->dict);
 
-/* Executes a procedure.
-   Passes each case to CASE_FUNC.
-   Calls END_FUNC after the last case.
-   Returns true if successful, false if an I/O error occurred (or
-   if CASE_FUNC or END_FUNC ever returned false). */
-static bool
-internal_procedure (struct dataset *ds, case_func *proc,
-                   end_func *end,
-                    void *aux)
-{
-  struct ccase *c;
-  bool ok = true;
+  /* Finish up the collection of transformations. */
+  add_case_limit_trns (ds);
+  add_filter_trns (ds);
+  trns_chain_finalize (ds->cur_trns_chain);
 
-  proc_open (ds);
-  while (ok && proc_read (ds, &c))
-    if (proc != NULL)
-      ok = proc (c, aux, ds) && ok;
-  if (end != NULL)
-    ok = end (aux, ds) && ok;
+  /* Make permanent_dict refer to the dictionary right before
+     data reaches the sink. */
+  if (ds->permanent_dict == NULL)
+    ds->permanent_dict = ds->dict;
 
-  if ( proc_close (ds) && ok )
+  /* Prepare sink. */
+  if (!ds->discard_output) 
     {
-
-      return true;
+      ds->compactor = (dict_compacting_would_shrink (ds->permanent_dict)
+                       ? dict_make_compactor (ds->permanent_dict)
+                       : NULL);
+      ds->sink = autopaging_writer_create (dict_get_compacted_value_cnt (
+                                             ds->permanent_dict)); 
+    }
+  else 
+    {
+      ds->compactor = NULL;
+      ds->sink = NULL;
     }
 
-  return false;
-}
-
-/* Opens dataset DS for reading cases with proc_read.
-   proc_close must be called when done. */
-void
-proc_open (struct dataset *ds)
-{
-  assert (ds->proc_source != NULL);
-  assert (!ds->is_open);
-
-  update_last_proc_invocation (ds);
-
-  open_active_file (ds);
+  /* Allocate memory for lagged cases. */
+  ds->lag_cases = deque_init (&ds->lag, ds->n_lag, sizeof *ds->lag_cases);
 
-  ds->is_open = true;
-  create_trns_case (&ds->trns_case, ds->dict);
-  case_create (&ds->sink_case, dict_get_compacted_value_cnt (ds->dict));
+  ds->proc_state = PROC_OPEN;
   ds->cases_written = 0;
   ds->ok = true;
+
+  /* FIXME: use taint in dataset in place of `ok'? */
+  /* FIXME: for trivial cases we can just return a clone of
+     ds->source? */
+  return casereader_create_sequential (NULL,
+                                       dict_get_next_value_idx (ds->dict),
+                                       CASENUMBER_MAX,
+                                       &proc_casereader_class, ds);
+}
+
+bool
+proc_is_open (const struct dataset *ds) 
+{
+  return ds->proc_state != PROC_COMMITTED;
 }
 
 /* Reads the next case from dataset DS, which must have been
@@ -264,14 +205,15 @@ proc_open (struct dataset *ds)
    case is stored in *C.
    Return false at end of file or if a read error occurs.  In
    this case a null pointer is stored in *C. */
-bool
-proc_read (struct dataset *ds, struct ccase **c)
+static bool
+proc_casereader_read (struct casereader *reader UNUSED, void *ds_,
+                      struct ccase *c) 
 {
+  struct dataset *ds = ds_;
   enum trns_result retval = TRNS_DROP_CASE;
 
-  assert (ds->is_open);
-  *c = NULL;
-  for (;;)
+  assert (ds->proc_state == PROC_OPEN);
+  for (;;) 
     {
       size_t case_nr;
 
@@ -281,51 +223,59 @@ proc_read (struct dataset *ds, struct ccase **c)
       if (!ds->ok)
         return false;
 
-      /* Read a case from proc_source. */
-      clear_case (ds, &ds->trns_case);
-      if (!ds->proc_source->class->read (ds->proc_source, &ds->trns_case))
+      /* Read a case from source. */
+      if (!casereader_read (ds->source, c))
         return false;
+      case_resize (c, dict_get_next_value_idx (ds->dict));
+      caseinit_init_reinit_vars (ds->caseinit, c);
+      caseinit_init_left_vars (ds->caseinit, c);
 
       /* Execute permanent transformations.  */
       case_nr = ds->cases_written + 1;
       retval = trns_chain_execute (ds->permanent_trns_chain, TRNS_CONTINUE,
-                                   &ds->trns_case, &case_nr);
-      if (retval != TRNS_CONTINUE)
-        continue;
-
+                                   c, &case_nr);
+      caseinit_update_left_vars (ds->caseinit, c);
+      if (retval != TRNS_CONTINUE) 
+        {
+          case_destroy (c);
+          continue; 
+        }
+  
       /* Write case to collection of lagged cases. */
       if (ds->n_lag > 0) 
         {
           while (deque_count (&ds->lag) >= ds->n_lag)
             case_destroy (&ds->lag_cases[deque_pop_back (&ds->lag)]);
-          case_clone (&ds->lag_cases[deque_push_front (&ds->lag)],
-                      &ds->trns_case);
+          case_clone (&ds->lag_cases[deque_push_front (&ds->lag)], c);
         }
 
       /* Write case to replacement active file. */
       ds->cases_written++;
-      if (ds->proc_sink->class->write != NULL)
+      if (ds->sink != NULL) 
         {
-          if (ds->compactor != NULL)
+          struct ccase tmp;
+          if (ds->compactor != NULL) 
             {
-              dict_compactor_compact (ds->compactor, &ds->sink_case,
-                                      &ds->trns_case);
-              ds->proc_sink->class->write (ds->proc_sink, &ds->sink_case);
+              case_create (&tmp, dict_get_compacted_value_cnt (ds->dict));
+              dict_compactor_compact (ds->compactor, &tmp, c);
             }
           else
-            ds->proc_sink->class->write (ds->proc_sink, &ds->trns_case);
+            case_clone (&tmp, c);
+          casewriter_write (ds->sink, &tmp);
         }
 
       /* Execute temporary transformations. */
       if (ds->temporary_trns_chain != NULL)
         {
           retval = trns_chain_execute (ds->temporary_trns_chain, TRNS_CONTINUE,
-                                       &ds->trns_case, &ds->cases_written);
+                                       c, &ds->cases_written);
           if (retval != TRNS_CONTINUE)
-            continue;
+            {
+              case_destroy (c);
+              continue;
+            }
         }
 
-      *c = &ds->trns_case;
       return true;
     }
 }
@@ -335,120 +285,35 @@ proc_read (struct dataset *ds, struct ccase **c)
    while reading or closing the data set.
    If DS has not been opened, returns true without doing
    anything else. */
-bool
-proc_close (struct dataset *ds)
-{
-  if (!ds->is_open)
-    return true;
-
-  /* Drain any remaining cases. */
-  while (ds->ok)
-    {
-      struct ccase *c;
-      if (!proc_read (ds, &c))
-        break;
-    }
-  ds->ok = free_case_source (ds->proc_source) && ds->ok;
-  proc_set_source (ds, NULL);
-
-  case_destroy (&ds->sink_case);
-  case_destroy (&ds->trns_case);
-
-  ds->ok = close_active_file (ds) && ds->ok;
-  ds->is_open = false;
-
-  return ds->ok;
-}
-
-/* Updates last_proc_invocation. */
-static void
-update_last_proc_invocation (struct dataset *ds)
-{
-  ds->last_proc_invocation = time (NULL);
-}
-
-/* Creates and returns a case, initializing it from the vectors
-   that say which `value's need to be initialized just once, and
-   which ones need to be re-initialized before every case. */
 static void
-create_trns_case (struct ccase *trns_case, struct dictionary *dict)
+proc_casereader_destroy (struct casereader *reader, void *ds_)
 {
-  size_t var_cnt = dict_get_var_cnt (dict);
-  size_t i;
+  struct dataset *ds = ds_;
+  struct ccase c;
 
-  case_create (trns_case, dict_get_next_value_idx (dict));
-  for (i = 0; i < var_cnt; i++)
-    {
-      struct variable *v = dict_get_var (dict, i);
-      union value *value = case_data_rw (trns_case, v);
+  /* Make sure transformations happen for every input case, in
+     case they have side effects, and ensure that the replacement
+     active file gets all the cases it should. */
+  while (casereader_read (reader, &c))
+    case_destroy (&c);
 
-      if (var_is_numeric (v))
-        value->f = var_get_leave (v) ? 0.0 : SYSMIS;
-      else
-        memset (value->s, ' ', var_get_width (v));
-    }
+  ds->proc_state = PROC_CLOSED;
+  ds->ok = casereader_destroy (ds->source) && ds->ok;
+  ds->source = NULL;
+  proc_set_active_file_data (ds, NULL);
 }
 
-/* Makes all preparations for reading from the data source and writing
-   to the data sink. */
-static void
-open_active_file (struct dataset *ds)
-{
-  add_case_limit_trns (ds);
-  add_filter_trns (ds);
-
-  /* Finalize transformations. */
-  trns_chain_finalize (ds->cur_trns_chain);
-
-  /* Make permanent_dict refer to the dictionary right before
-     data reaches the sink. */
-  if (ds->permanent_dict == NULL)
-    ds->permanent_dict = ds->dict;
-
-  /* Figure out whether to compact. */
-  ds->compactor =
-    (dict_compacting_would_shrink (ds->permanent_dict)
-     ? dict_make_compactor (ds->permanent_dict)
-     : NULL);
-
-  /* Prepare sink. */
-  if (ds->proc_sink == NULL)
-    ds->proc_sink = create_case_sink (&storage_sink_class,
-                                     ds->permanent_dict,
-                                     ds->cf_factory,
-                                     NULL);
-  if (ds->proc_sink->class->open != NULL)
-    ds->proc_sink->class->open (ds->proc_sink);
-
-  /* Allocate memory for lagged cases. */
-  ds->lag_cases = deque_init (&ds->lag, ds->n_lag, sizeof *ds->lag_cases);
-}
-
-/* Clears the variables in C that need to be cleared between
-   processing cases.  */
-static void
-clear_case (const struct dataset *ds, struct ccase *c)
+/* Must return false if the source casereader, a transformation,
+   or the sink casewriter signaled an error.  (If a temporary
+   transformation signals an error, then the return value is
+   false, but the replacement active file may still be
+   untainted.) */
+bool
+proc_commit (struct dataset *ds) 
 {
-  size_t var_cnt = dict_get_var_cnt (ds->dict);
-  size_t i;
+  assert (ds->proc_state == PROC_CLOSED);
+  ds->proc_state = PROC_COMMITTED;
 
-  for (i = 0; i < var_cnt; i++)
-    {
-      struct variable *v = dict_get_var (ds->dict, i);
-      if (!var_get_leave (v))
-        {
-          if (var_is_numeric (v))
-            case_data_rw (c, v)->f = SYSMIS;
-          else
-            memset (case_data_rw (c, v)->s, ' ', var_get_width (v));
-        }
-    }
-}
-
-/* Closes the active file. */
-static bool
-close_active_file (struct dataset *ds)
-{
   /* Free memory for lagged cases. */
   while (!deque_is_empty (&ds->lag))
     case_destroy (&ds->lag_cases[deque_pop_back (&ds->lag)]);
@@ -457,23 +322,49 @@ close_active_file (struct dataset *ds)
   /* Dictionary from before TEMPORARY becomes permanent. */
   proc_cancel_temporary_transformations (ds);
 
-  /* Finish compacting. */
-  if (ds->compactor != NULL)
+  if (!ds->discard_output) 
     {
-      dict_compactor_destroy (ds->compactor);
-      dict_compact_values (ds->dict);
-      ds->compactor = NULL;
+      /* Finish compacting. */
+      if (ds->compactor != NULL) 
+        {
+          dict_compactor_destroy (ds->compactor);
+          dict_compact_values (ds->dict);
+          ds->compactor = NULL;
+        }
+    
+      /* Old data sink becomes new data source. */
+      if (ds->sink != NULL) 
+        ds->source = casewriter_make_reader (ds->sink);
     }
+  else 
+    {
+      ds->source = NULL;
+      ds->discard_output = false; 
+    }
+  ds->sink = NULL;
+  if ( ds->replace_source) ds->replace_source (ds->source);
 
-  /* Old data sink becomes new data source. */
-  if (ds->proc_sink->class->make_source != NULL)
-    proc_set_source (ds, ds->proc_sink->class->make_source (ds->proc_sink) );
-  free_case_sink (ds->proc_sink);
-  ds->proc_sink = NULL;
+  caseinit_clear (ds->caseinit);
+  caseinit_mark_as_preinited (ds->caseinit, ds->dict);
 
   dict_clear_vectors (ds->dict);
   ds->permanent_dict = NULL;
-  return proc_cancel_all_transformations (ds);
+  return proc_cancel_all_transformations (ds) && ds->ok;
+}
+
+static struct casereader_class proc_casereader_class = 
+  {
+    proc_casereader_read,
+    proc_casereader_destroy,
+    NULL,
+    NULL,
+  };
+
+/* Updates last_proc_invocation. */
+static void
+update_last_proc_invocation (struct dataset *ds)
+{
+  ds->last_proc_invocation = time (NULL);
 }
 \f
 /* Returns a pointer to the lagged case from N_BEFORE cases before the
@@ -490,218 +381,6 @@ lagged_case (const struct dataset *ds, int n_before)
     return NULL;
 }
 \f
-/* Procedure that separates the data into SPLIT FILE groups. */
-
-/* Represents auxiliary data for handling SPLIT FILE. */
-struct split_aux_data
-  {
-    struct dataset *dataset;    /* The dataset */
-    struct ccase prev_case;     /* Data in previous case. */
-
-    /* Callback functions. */
-    begin_func *begin;
-    case_func *proc;
-    end_func *end;
-    void *func_aux;
-  };
-
-static int equal_splits (const struct ccase *, const struct ccase *, const struct dataset *ds);
-static bool split_procedure_case_func (const struct ccase *c, void *, const struct dataset *);
-static bool split_procedure_end_func (void *, const struct dataset *);
-
-/* Like procedure(), but it automatically breaks the case stream
-   into SPLIT FILE break groups.  Before each group of cases with
-   identical SPLIT FILE variable values, BEGIN_FUNC is called
-   with the first case in the group.
-   Then PROC_FUNC is called for each case in the group (including
-   the first).
-   END_FUNC is called when the group is finished.  FUNC_AUX is
-   passed to each of the functions as auxiliary data.
-
-   If the active file is empty, none of BEGIN_FUNC, PROC_FUNC,
-   and END_FUNC will be called at all.
-
-   If SPLIT FILE is not in effect, then there is one break group
-   (if the active file is nonempty), and BEGIN_FUNC and END_FUNC
-   will be called once.
-
-   Returns true if successful, false if an I/O error occurred. */
-bool
-procedure_with_splits (struct dataset *ds,
-                      begin_func begin,
-                      case_func *proc,
-                       end_func *end,
-                       void *func_aux)
-{
-  struct split_aux_data split_aux;
-  bool ok;
-
-  case_nullify (&split_aux.prev_case);
-  split_aux.begin = begin;
-  split_aux.proc = proc;
-  split_aux.end = end;
-  split_aux.func_aux = func_aux;
-  split_aux.dataset = ds;
-
-  ok = internal_procedure (ds, split_procedure_case_func,
-                           split_procedure_end_func, &split_aux);
-
-  case_destroy (&split_aux.prev_case);
-
-  return ok;
-}
-
-/* Case callback used by procedure_with_splits(). */
-static bool
-split_procedure_case_func (const struct ccase *c, void *split_aux_, const struct dataset *ds)
-{
-  struct split_aux_data *split_aux = split_aux_;
-
-  /* Start a new series if needed. */
-  if (case_is_null (&split_aux->prev_case)
-      || !equal_splits (c, &split_aux->prev_case, split_aux->dataset))
-    {
-      if (!case_is_null (&split_aux->prev_case) && split_aux->end != NULL)
-        split_aux->end (split_aux->func_aux, ds);
-
-      case_destroy (&split_aux->prev_case);
-      case_clone (&split_aux->prev_case, c);
-
-      if (split_aux->begin != NULL)
-       split_aux->begin (&split_aux->prev_case, split_aux->func_aux, ds);
-    }
-
-  return (split_aux->proc == NULL
-          || split_aux->proc (c, split_aux->func_aux, ds));
-}
-
-/* End-of-file callback used by procedure_with_splits(). */
-static bool
-split_procedure_end_func (void *split_aux_, const struct dataset *ds)
-{
-  struct split_aux_data *split_aux = split_aux_;
-
-  if (!case_is_null (&split_aux->prev_case) && split_aux->end != NULL)
-    split_aux->end (split_aux->func_aux, ds);
-  return true;
-}
-
-/* Compares the SPLIT FILE variables in cases A and B and returns
-   nonzero only if they differ. */
-static int
-equal_splits (const struct ccase *a, const struct ccase *b,
-             const struct dataset *ds)
-{
-  return case_compare (a, b,
-                       dict_get_split_vars (ds->dict),
-                       dict_get_split_cnt (ds->dict)) == 0;
-}
-\f
-/* Multipass procedure that separates the data into SPLIT FILE
-   groups. */
-
-/* Represents auxiliary data for handling SPLIT FILE in a
-   multipass procedure. */
-struct multipass_split_aux_data
-  {
-    struct dataset *dataset;    /* The dataset of the split */
-    struct ccase prev_case;     /* Data in previous case. */
-    struct casefile *casefile;  /* Accumulates data for a split. */
-    split_func *split;          /* Function to call with the accumulated
-                                  data. */
-    void *func_aux;             /* Auxiliary data. */
-  };
-
-static bool multipass_split_case_func (const struct ccase *c, void *aux_, const struct dataset *);
-static bool multipass_split_end_func (void *aux_, const struct dataset *ds);
-static bool multipass_split_output (struct multipass_split_aux_data *, const struct dataset *ds);
-
-/* Returns true if successful, false if an I/O error occurred. */
-bool
-multipass_procedure_with_splits (struct dataset *ds,
-                                split_func  *split,
-                                 void *func_aux)
-{
-  struct multipass_split_aux_data aux;
-  bool ok;
-
-  case_nullify (&aux.prev_case);
-  aux.casefile = NULL;
-  aux.split = split;
-  aux.func_aux = func_aux;
-  aux.dataset = ds;
-
-  ok = internal_procedure (ds, multipass_split_case_func,
-                           multipass_split_end_func, &aux);
-  case_destroy (&aux.prev_case);
-
-  return ok;
-}
-
-/* Case callback used by multipass_procedure_with_splits(). */
-static bool
-multipass_split_case_func (const struct ccase *c, void *aux_, const struct dataset *ds)
-{
-  struct multipass_split_aux_data *aux = aux_;
-  bool ok = true;
-
-  /* Start a new series if needed. */
-  if (aux->casefile == NULL || ! equal_splits (c, &aux->prev_case, ds))
-    {
-      /* Record split values. */
-      case_destroy (&aux->prev_case);
-      case_clone (&aux->prev_case, c);
-
-      /* Pass any cases to split_func. */
-      if (aux->casefile != NULL)
-        ok = multipass_split_output (aux, ds);
-
-      /* Start a new casefile. */
-      aux->casefile =
-       ds->cf_factory->create_casefile (ds->cf_factory,
-                                        dict_get_next_value_idx (ds->dict));
-    }
-
-  return casefile_append (aux->casefile, c) && ok;
-}
-
-/* End-of-file callback used by multipass_procedure_with_splits(). */
-static bool
-multipass_split_end_func (void *aux_, const struct dataset *ds)
-{
-  struct multipass_split_aux_data *aux = aux_;
-  return (aux->casefile == NULL || multipass_split_output (aux, ds));
-}
-
-static bool
-multipass_split_output (struct multipass_split_aux_data *aux, const struct dataset *ds)
-{
-  bool ok;
-
-  assert (aux->casefile != NULL);
-  ok = aux->split (&aux->prev_case, aux->casefile, aux->func_aux, ds);
-  casefile_destroy (aux->casefile);
-  aux->casefile = NULL;
-
-  return ok;
-}
-\f
-/* Discards all the current state in preparation for a data-input
-   command like DATA LIST or GET. */
-void
-discard_variables (struct dataset *ds)
-{
-  dict_clear (ds->dict);
-  fh_set_default_handle (NULL);
-
-  ds->n_lag = 0;
-
-  free_case_source (ds->proc_source);
-  proc_set_source (ds, NULL);
-
-  proc_cancel_all_transformations (ds);
-}
-\f
 /* Returns the current set of permanent transformations,
    and clears the permanent transformations.
    For use by INPUT PROGRAM. */
@@ -804,8 +483,10 @@ proc_cancel_temporary_transformations (struct dataset *ds)
 {
   if (proc_in_temporary_transformations (ds))
     {
-      dataset_set_dict (ds, ds->permanent_dict);
+      dict_destroy (ds->dict);
+      ds->dict = ds->permanent_dict;
       ds->permanent_dict = NULL;
+      if (ds->replace_dict) ds->replace_dict (ds->dict);
 
       trns_chain_destroy (ds->temporary_trns_chain);
       ds->temporary_trns_chain = NULL;
@@ -822,6 +503,7 @@ bool
 proc_cancel_all_transformations (struct dataset *ds)
 {
   bool ok;
+  assert (ds->proc_state == PROC_COMMITTED);
   ok = trns_chain_destroy (ds->permanent_trns_chain);
   ok = trns_chain_destroy (ds->temporary_trns_chain) && ok;
   ds->permanent_trns_chain = ds->cur_trns_chain = trns_chain_create ();
@@ -831,14 +513,12 @@ proc_cancel_all_transformations (struct dataset *ds)
 \f
 /* Initializes procedure handling. */
 struct dataset *
-create_dataset (struct casefile_factory *fact,
-               replace_source_callback *rps,
-               replace_dictionary_callback *rds
-               )
+create_dataset (replace_source_callback *rps,
+               replace_dictionary_callback *rds)
 {
   struct dataset *ds = xzalloc (sizeof(*ds));
   ds->dict = dict_create ();
-  ds->cf_factory = fact;
+  ds->caseinit = caseinit_create ();
   ds->replace_source = rps;
   ds->replace_dict = rds;
   proc_cancel_all_transformations (ds);
@@ -849,60 +529,103 @@ create_dataset (struct casefile_factory *fact,
 void
 destroy_dataset (struct dataset *ds)
 {
-  discard_variables (ds);
+  proc_discard_active_file (ds);
   dict_destroy (ds->dict);
+  caseinit_destroy (ds->caseinit);
   trns_chain_destroy (ds->permanent_trns_chain);
   free (ds);
 }
 
-/* Sets SINK as the destination for procedure output from the
-   next procedure. */
+/* Causes output from the next procedure to be discarded, instead
+   of being preserved for use as input for the next procedure. */
 void
-proc_set_sink (struct dataset *ds, struct case_sink *sink)
+proc_discard_output (struct dataset *ds) 
 {
-  assert (ds->proc_sink == NULL);
-  ds->proc_sink = sink;
+  ds->discard_output = true;
+}
+
+/* Discards the active file dictionary, data, and
+   transformations. */
+void
+proc_discard_active_file (struct dataset *ds)
+{
+  assert (ds->proc_state == PROC_COMMITTED);
+
+  dict_clear (ds->dict);
+  fh_set_default_handle (NULL);
+
+  ds->n_lag = 0;
+  
+  casereader_destroy (ds->source);
+  ds->source = NULL;
+  if ( ds->replace_source) ds->replace_source (NULL);
+
+  proc_cancel_all_transformations (ds);
 }
 
 /* Sets SOURCE as the source for procedure input for the next
    procedure. */
 void
-proc_set_source (struct dataset *ds, struct case_source *source)
+proc_set_active_file (struct dataset *ds,
+                      struct casereader *source,
+                      struct dictionary *dict) 
 {
-  ds->proc_source = source;
+  assert (ds->proc_state == PROC_COMMITTED);
+  assert (ds->dict != dict);
+
+  proc_discard_active_file (ds);
 
-  if ( ds->replace_source )
-    ds->replace_source (ds->proc_source);
+  dict_destroy (ds->dict);
+  ds->dict = dict;
+  if ( ds->replace_dict) ds->replace_dict (dict);
+
+  proc_set_active_file_data (ds, source);
 }
 
-/* Returns true if a source for the next procedure has been
-   configured, false otherwise. */
+/* Replaces the active file's data by READER without replacing
+   the associated dictionary. */
 bool
-proc_has_source (const struct dataset *ds)
+proc_set_active_file_data (struct dataset *ds, struct casereader *reader) 
 {
-  return ds->proc_source != NULL;
-}
+  casereader_destroy (ds->source);
+  ds->source = reader;
+  if (ds->replace_source) ds->replace_source (reader);
 
-/* Returns the output from the previous procedure.
-   For use only immediately after executing a procedure.
-   The returned casefile is owned by the caller; it will not be
-   automatically used for the next procedure's input. */
-struct casefile *
-proc_capture_output (struct dataset *ds)
-{
-  struct casefile *casefile;
+  caseinit_clear (ds->caseinit);
+  caseinit_mark_as_preinited (ds->caseinit, ds->dict);
 
-  /* Try to make sure that this function is called immediately
-     after procedure() or a similar function. */
-  assert (ds->proc_source != NULL);
-  assert (case_source_is_class (ds->proc_source, &storage_source_class));
-  assert (trns_chain_is_empty (ds->permanent_trns_chain));
-  assert (!proc_in_temporary_transformations (ds));
+  return reader == NULL || !casereader_error (reader);
+}
 
-  casefile = storage_source_decapsulate (ds->proc_source);
-  proc_set_source (ds, NULL);
+/* Returns true if an active file data source is available, false
+   otherwise. */
+bool
+proc_has_active_file (const struct dataset *ds) 
+{
+  return ds->source != NULL;
+}
 
-  return casefile;
+/* Checks whether DS has a corrupted active file.  If so,
+   discards it and returns false.  If not, returns true without
+   doing anything. */
+bool
+dataset_end_of_command (struct dataset *ds) 
+{
+  if (ds->source != NULL) 
+    {
+      if (casereader_error (ds->source)) 
+        {
+          proc_discard_active_file (ds);
+          return false;
+        }
+      else 
+        {
+          const struct taint *taint = casereader_get_taint (ds->source);
+          taint_reset_successor_taint ((struct taint *) taint);
+          assert (!taint_has_tainted_successor (taint));
+        }
+    }
+  return true; 
 }
 \f
 static trns_proc_func case_limit_trns_proc;
@@ -983,32 +706,8 @@ dataset_dict (const struct dataset *ds)
   return ds->dict;
 }
 
-
-/* Set or replace dataset DS's dictionary with DICT.
-   The old dictionary is destroyed */
-void
-dataset_set_dict (struct dataset *ds, struct dictionary *dict)
-{
-  struct dictionary *old_dict = ds->dict;
-
-  dict_copy_callbacks (dict, ds->dict);
-  ds->dict = dict;
-
-  if ( ds->replace_dict )
-    ds->replace_dict (dict);
-
-  dict_destroy (old_dict);
-}
-
 void 
 dataset_need_lag (struct dataset *ds, int n_before)
 {
   ds->n_lag = MAX (ds->n_lag, n_before);
 }
-
-struct casefile_factory *
-dataset_get_casefile_factory (const struct dataset *ds)
-{
-  return ds->cf_factory;
-}
-