Patch #5672
[pspp-builds.git] / src / data / procedure.c
index 16dcd745929f9962d489fd58f66bcd4110fcfc13..b6f988aabbaf46b2707940aca2e646504ced5ffa 100644 (file)
@@ -1,6 +1,5 @@
 /* PSPP - computes sample statistics.
 /* PSPP - computes sample statistics.
-   Copyright (C) 1997-9, 2000, 2006 Free Software Foundation, Inc.
-   Written by Ben Pfaff <blp@gnu.org>.
+   Copyright (C) 1997-9, 2000, 2006, 2007 Free Software Foundation, Inc.
 
    This program is free software; you can redistribute it and/or
    modify it under the terms of the GNU General Public License as
 
    This program is free software; you can redistribute it and/or
    modify it under the terms of the GNU General Public License as
@@ -28,6 +27,7 @@
 #include <data/case-sink.h>
 #include <data/case.h>
 #include <data/casefile.h>
 #include <data/case-sink.h>
 #include <data/case.h>
 #include <data/casefile.h>
+#include <data/fastfile.h>
 #include <data/dictionary.h>
 #include <data/file-handle-def.h>
 #include <data/procedure.h>
 #include <data/dictionary.h>
 #include <data/file-handle-def.h>
 #include <data/procedure.h>
 #include <libpspp/misc.h>
 #include <libpspp/str.h>
 
 #include <libpspp/misc.h>
 #include <libpspp/str.h>
 
-/* Procedure execution data. */
-struct write_case_data
-  {
-    /* Function to call for each case. */
-    bool (*case_func) (const struct ccase *, void *);
-    void *aux;
+struct dataset {
+
+  /* An abstract factory which creates casefiles */
+  struct casefile_factory *cf_factory;
+
+  /* Callback which occurs when a procedure provides a new source for
+     the dataset */
+  replace_source_callback *replace_source ;
+
+  /* Callback which occurs whenever the DICT is replaced by a new one */
+  replace_dictionary_callback *replace_dict;
+
+  /* Cases are read from proc_source,
+     pass through permanent_trns_chain (which transforms them into
+     the format described by permanent_dict),
+     are written to proc_sink,
+     pass through temporary_trns_chain (which transforms them into
+     the format described by dict),
+     and are finally passed to the procedure. */
+  struct case_source *proc_source;
+  struct trns_chain *permanent_trns_chain;
+  struct dictionary *permanent_dict;
+  struct case_sink *proc_sink;
+  struct trns_chain *temporary_trns_chain;
+  struct dictionary *dict;
+
+  /* The transformation chain that the next transformation will be
+     added to. */
+  struct trns_chain *cur_trns_chain;
+
+  /* The compactor used to compact a case, if necessary;
+     otherwise a null pointer. */
+  struct dict_compactor *compactor;
+
+  /* Time at which proc was last invoked. */
+  time_t last_proc_invocation;
+
+  /* Lag queue. */
+  int n_lag;                   /* Number of cases to lag. */
+  int lag_count;               /* Number of cases in lag_queue so far. */
+  int lag_head;                /* Index where next case will be added. */
+  struct ccase *lag_queue; /* Array of n_lag ccase * elements. */
+
+  /* Procedure data. */
+  bool is_open;               /* Procedure open? */
+  struct ccase trns_case;     /* Case used for transformations. */
+  struct ccase sink_case;     /* Case written to sink, if
+                                 compacting is necessary. */
+  size_t cases_written;       /* Cases output so far. */
+  bool ok;
+}; /* struct dataset */
 
 
-    struct ccase trns_case;     /* Case used for transformations. */
-    struct ccase sink_case;     /* Case written to sink, if
-                                   compaction is necessary. */
-    size_t cases_written;       /* Cases output so far. */
-  };
 
 
-/* Cases are read from proc_source,
-   pass through permanent_trns_chain (which transforms them into
-   the format described by permanent_dict),
-   are written to proc_sink,
-   pass through temporary_trns_chain (which transforms them into
-   the format described by default_dict),
-   and are finally passed to the procedure. */
-static struct case_source *proc_source;
-static struct trns_chain *permanent_trns_chain;
-static struct dictionary *permanent_dict;
-static struct case_sink *proc_sink;
-static struct trns_chain *temporary_trns_chain;
-struct dictionary *default_dict;
-
-/* The transformation chain that the next transformation will be
-   added to. */
-static struct trns_chain *cur_trns_chain;
-
-/* The compactor used to compact a case, if necessary;
-   otherwise a null pointer. */
-static struct dict_compactor *compactor;
-
-/* Time at which proc was last invoked. */
-static time_t last_proc_invocation;
-
-/* Lag queue. */
-int n_lag;                     /* Number of cases to lag. */
-static int lag_count;          /* Number of cases in lag_queue so far. */
-static int lag_head;           /* Index where next case will be added. */
-static struct ccase *lag_queue; /* Array of n_lag ccase * elements. */
-
-static void add_case_limit_trns (void);
-static void add_filter_trns (void);
-
-static bool internal_procedure (bool (*case_func) (const struct ccase *,
-                                                   void *),
-                                bool (*end_func) (void *),
+static void add_case_limit_trns (struct dataset *ds);
+static void add_filter_trns (struct dataset *ds);
+
+static bool internal_procedure (struct dataset *ds, case_func *,
+                                end_func *,
                                 void *aux);
                                 void *aux);
-static void update_last_proc_invocation (void);
+static void update_last_proc_invocation (struct dataset *ds);
 static void create_trns_case (struct ccase *, struct dictionary *);
 static void create_trns_case (struct ccase *, struct dictionary *);
-static void open_active_file (void);
-static bool write_case (struct write_case_data *wc_data);
-static void lag_case (const struct ccase *c);
-static void clear_case (struct ccase *c);
-static bool close_active_file (void);
+static void open_active_file (struct dataset *ds);
+static void lag_case (struct dataset *ds, const struct ccase *c);
+static void clear_case (const struct dataset *ds, struct ccase *c);
+static bool close_active_file (struct dataset *ds);
 \f
 /* Public functions. */
 
 /* Returns the last time the data was read. */
 time_t
 \f
 /* Public functions. */
 
 /* Returns the last time the data was read. */
 time_t
-time_of_last_procedure (void) 
+time_of_last_procedure (struct dataset *ds)
 {
 {
-  if (last_proc_invocation == 0)
-    update_last_proc_invocation ();
-  return last_proc_invocation;
+  if (ds->last_proc_invocation == 0)
+    update_last_proc_invocation (ds);
+  return ds->last_proc_invocation;
 }
 \f
 /* Regular procedure. */
 
 }
 \f
 /* Regular procedure. */
 
+
+
 /* Reads the data from the input program and writes it to a new
    active file.  For each case we read from the input program, we
    do the following:
 /* Reads the data from the input program and writes it to a new
    active file.  For each case we read from the input program, we
    do the following:
@@ -118,32 +127,49 @@ time_of_last_procedure (void)
       start the next case from step 1.
 
    2. Write case to replacement active file.
       start the next case from step 1.
 
    2. Write case to replacement active file.
-   
+
    3. Execute temporary transformations.  If these drop the case,
       start the next case from step 1.
    3. Execute temporary transformations.  If these drop the case,
       start the next case from step 1.
-      
+
    4. Pass case to PROC_FUNC, passing AUX as auxiliary data.
 
    Returns true if successful, false if an I/O error occurred. */
 bool
    4. Pass case to PROC_FUNC, passing AUX as auxiliary data.
 
    Returns true if successful, false if an I/O error occurred. */
 bool
-procedure (bool (*proc_func) (const struct ccase *, void *), void *aux)
+procedure (struct dataset *ds, case_func *cf, void *aux)
 {
 {
-  return internal_procedure (proc_func, NULL, aux);
+  update_last_proc_invocation (ds);
+
+  /* Optimize the trivial case where we're not going to do
+     anything with the data, by not reading the data at all. */
+  if (cf == NULL
+      && case_source_is_class (ds->proc_source, &storage_source_class)
+      && ds->proc_sink == NULL
+      && (ds->temporary_trns_chain == NULL
+          || trns_chain_is_empty (ds->temporary_trns_chain))
+      && trns_chain_is_empty (ds->permanent_trns_chain))
+    {
+      ds->n_lag = 0;
+      dict_set_case_limit (ds->dict, 0);
+      dict_clear_vectors (ds->dict);
+      return true;
+    }
+
+  return internal_procedure (ds, cf, NULL, aux);
 }
 \f
 /* Multipass procedure. */
 
 }
 \f
 /* Multipass procedure. */
 
-struct multipass_aux_data 
+struct multipass_aux_data
   {
     struct casefile *casefile;
   {
     struct casefile *casefile;
-    
+
     bool (*proc_func) (const struct casefile *, void *aux);
     void *aux;
   };
 
 /* Case processing function for multipass_procedure(). */
 static bool
     bool (*proc_func) (const struct casefile *, void *aux);
     void *aux;
   };
 
 /* Case processing function for multipass_procedure(). */
 static bool
-multipass_case_func (const struct ccase *c, void *aux_data_
+multipass_case_func (const struct ccase *c, void *aux_data_, const struct dataset *ds UNUSED)
 {
   struct multipass_aux_data *aux_data = aux_data_;
   return casefile_append (aux_data->casefile, c);
 {
   struct multipass_aux_data *aux_data = aux_data_;
   return casefile_append (aux_data->casefile, c);
@@ -151,7 +177,7 @@ multipass_case_func (const struct ccase *c, void *aux_data_)
 
 /* End-of-file function for multipass_procedure(). */
 static bool
 
 /* End-of-file function for multipass_procedure(). */
 static bool
-multipass_end_func (void *aux_data_
+multipass_end_func (void *aux_data_, const struct dataset *ds UNUSED)
 {
   struct multipass_aux_data *aux_data = aux_data_;
   return (aux_data->proc_func == NULL
 {
   struct multipass_aux_data *aux_data = aux_data_;
   return (aux_data->proc_func == NULL
@@ -162,17 +188,19 @@ multipass_end_func (void *aux_data_)
    The entire active file is passed to PROC_FUNC, with the given
    AUX as auxiliary data, as a unit. */
 bool
    The entire active file is passed to PROC_FUNC, with the given
    AUX as auxiliary data, as a unit. */
 bool
-multipass_procedure (bool (*proc_func) (const struct casefile *, void *aux),
-                     void *aux) 
+multipass_procedure (struct dataset *ds, casefile_func *proc_func,  void *aux)
 {
   struct multipass_aux_data aux_data;
   bool ok;
 
 {
   struct multipass_aux_data aux_data;
   bool ok;
 
-  aux_data.casefile = casefile_create (dict_get_next_value_idx (default_dict));
+  aux_data.casefile =
+    ds->cf_factory->create_casefile (ds->cf_factory,
+                                    dict_get_next_value_idx (ds->dict));
+
   aux_data.proc_func = proc_func;
   aux_data.aux = aux;
 
   aux_data.proc_func = proc_func;
   aux_data.aux = aux;
 
-  ok = internal_procedure (multipass_case_func, multipass_end_func, &aux_data);
+  ok = internal_procedure (ds, multipass_case_func, multipass_end_func, &aux_data);
   ok = !casefile_error (aux_data.casefile) && ok;
 
   casefile_destroy (aux_data.casefile);
   ok = !casefile_error (aux_data.casefile) && ok;
 
   casefile_destroy (aux_data.casefile);
@@ -180,6 +208,7 @@ multipass_procedure (bool (*proc_func) (const struct casefile *, void *aux),
   return ok;
 }
 \f
   return ok;
 }
 \f
+
 /* Procedure implementation. */
 
 /* Executes a procedure.
 /* Procedure implementation. */
 
 /* Executes a procedure.
@@ -188,59 +217,150 @@ multipass_procedure (bool (*proc_func) (const struct casefile *, void *aux),
    Returns true if successful, false if an I/O error occurred (or
    if CASE_FUNC or END_FUNC ever returned false). */
 static bool
    Returns true if successful, false if an I/O error occurred (or
    if CASE_FUNC or END_FUNC ever returned false). */
 static bool
-internal_procedure (bool (*case_func) (const struct ccase *, void *),
-                    bool (*end_func) (void *),
-                    void *aux) 
+internal_procedure (struct dataset *ds, case_func *proc,
+                   end_func *end,
+                    void *aux)
 {
 {
-  struct write_case_data wc_data;
+  struct ccase *c;
   bool ok = true;
 
   bool ok = true;
 
-  assert (proc_source != NULL);
+  proc_open (ds);
+  while (ok && proc_read (ds, &c))
+    if (proc != NULL)
+      ok = proc (c, aux, ds) && ok;
+  if (end != NULL)
+    ok = end (aux, ds) && ok;
 
 
-  update_last_proc_invocation ();
+  if ( proc_close (ds) && ok )
+    {
 
 
-  /* Optimize the trivial case where we're not going to do
-     anything with the data, by not reading the data at all. */
-  if (case_func == NULL && end_func == NULL
-      && case_source_is_class (proc_source, &storage_source_class)
-      && proc_sink == NULL
-      && (temporary_trns_chain == NULL
-          || trns_chain_is_empty (temporary_trns_chain))
-      && trns_chain_is_empty (permanent_trns_chain))
+      return true;
+    }
+
+  return false;
+}
+
+/* Opens dataset DS for reading cases with proc_read.
+   proc_close must be called when done. */
+void
+proc_open (struct dataset *ds)
+{
+  assert (ds->proc_source != NULL);
+  assert (!ds->is_open);
+
+  update_last_proc_invocation (ds);
+
+  open_active_file (ds);
+
+  ds->is_open = true;
+  create_trns_case (&ds->trns_case, ds->dict);
+  case_create (&ds->sink_case, dict_get_compacted_value_cnt (ds->dict));
+  ds->cases_written = 0;
+  ds->ok = true;
+}
+
+/* Reads the next case from dataset DS, which must have been
+   opened for reading with proc_open.
+   Returns true if successful, in which case a pointer to the
+   case is stored in *C.
+   Return false at end of file or if a read error occurs.  In
+   this case a null pointer is stored in *C. */
+bool
+proc_read (struct dataset *ds, struct ccase **c)
+{
+  enum trns_result retval = TRNS_DROP_CASE;
+
+  assert (ds->is_open);
+  *c = NULL;
+  for (;;)
     {
     {
-      n_lag = 0;
-      dict_set_case_limit (default_dict, 0);
-      dict_clear_vectors (default_dict);
+      size_t case_nr;
+
+      assert (retval == TRNS_DROP_CASE || retval == TRNS_ERROR);
+      if (retval == TRNS_ERROR)
+        ds->ok = false;
+      if (!ds->ok)
+        return false;
+
+      /* Read a case from proc_source. */
+      clear_case (ds, &ds->trns_case);
+      if (!ds->proc_source->class->read (ds->proc_source, &ds->trns_case))
+        return false;
+
+      /* Execute permanent transformations.  */
+      case_nr = ds->cases_written + 1;
+      retval = trns_chain_execute (ds->permanent_trns_chain, TRNS_CONTINUE,
+                                   &ds->trns_case, &case_nr);
+      if (retval != TRNS_CONTINUE)
+        continue;
+
+      /* Write case to LAG queue. */
+      if (ds->n_lag)
+        lag_case (ds, &ds->trns_case);
+
+      /* Write case to replacement active file. */
+      ds->cases_written++;
+      if (ds->proc_sink->class->write != NULL)
+        {
+          if (ds->compactor != NULL)
+            {
+              dict_compactor_compact (ds->compactor, &ds->sink_case,
+                                      &ds->trns_case);
+              ds->proc_sink->class->write (ds->proc_sink, &ds->sink_case);
+            }
+          else
+            ds->proc_sink->class->write (ds->proc_sink, &ds->trns_case);
+        }
+
+      /* Execute temporary transformations. */
+      if (ds->temporary_trns_chain != NULL)
+        {
+          retval = trns_chain_execute (ds->temporary_trns_chain, TRNS_CONTINUE,
+                                       &ds->trns_case, &ds->cases_written);
+          if (retval != TRNS_CONTINUE)
+            continue;
+        }
+
+      *c = &ds->trns_case;
       return true;
     }
       return true;
     }
-  
-  open_active_file ();
-  
-  wc_data.case_func = case_func;
-  wc_data.aux = aux;
-  create_trns_case (&wc_data.trns_case, default_dict);
-  case_create (&wc_data.sink_case, dict_get_next_value_idx (default_dict));
-  wc_data.cases_written = 0;
-
-  ok = proc_source->class->read (proc_source,
-                                 &wc_data.trns_case,
-                                 write_case, &wc_data) && ok;
-  if (end_func != NULL)
-    ok = end_func (aux) && ok;
-
-  case_destroy (&wc_data.sink_case);
-  case_destroy (&wc_data.trns_case);
-
-  ok = close_active_file () && ok;
+}
 
 
-  return ok;
+/* Closes dataset DS for reading.
+   Returns true if successful, false if an I/O error occurred
+   while reading or closing the data set.
+   If DS has not been opened, returns true without doing
+   anything else. */
+bool
+proc_close (struct dataset *ds)
+{
+  if (!ds->is_open)
+    return true;
+
+  /* Drain any remaining cases. */
+  while (ds->ok)
+    {
+      struct ccase *c;
+      if (!proc_read (ds, &c))
+        break;
+    }
+  ds->ok = free_case_source (ds->proc_source) && ds->ok;
+  proc_set_source (ds, NULL);
+
+  case_destroy (&ds->sink_case);
+  case_destroy (&ds->trns_case);
+
+  ds->ok = close_active_file (ds) && ds->ok;
+  ds->is_open = false;
+
+  return ds->ok;
 }
 
 /* Updates last_proc_invocation. */
 static void
 }
 
 /* Updates last_proc_invocation. */
 static void
-update_last_proc_invocation (void) 
+update_last_proc_invocation (struct dataset *ds)
 {
 {
-  last_proc_invocation = time (NULL);
+  ds->last_proc_invocation = time (NULL);
 }
 
 /* Creates and returns a case, initializing it from the vectors
 }
 
 /* Creates and returns a case, initializing it from the vectors
@@ -253,201 +373,146 @@ create_trns_case (struct ccase *trns_case, struct dictionary *dict)
   size_t i;
 
   case_create (trns_case, dict_get_next_value_idx (dict));
   size_t i;
 
   case_create (trns_case, dict_get_next_value_idx (dict));
-  for (i = 0; i < var_cnt; i++) 
+  for (i = 0; i < var_cnt; i++)
     {
       struct variable *v = dict_get_var (dict, i);
     {
       struct variable *v = dict_get_var (dict, i);
-      union value *value = case_data_rw (trns_case, v->fv);
+      union value *value = case_data_rw (trns_case, v);
 
 
-      if (v->type == NUMERIC)
-        value->f = v->leave ? 0.0 : SYSMIS;
+      if (var_is_numeric (v))
+        value->f = var_get_leave (v) ? 0.0 : SYSMIS;
       else
       else
-        memset (value->s, ' ', v->width);
+        memset (value->s, ' ', var_get_width (v));
     }
 }
 
 /* Makes all preparations for reading from the data source and writing
    to the data sink. */
 static void
     }
 }
 
 /* Makes all preparations for reading from the data source and writing
    to the data sink. */
 static void
-open_active_file (void)
+open_active_file (struct dataset *ds)
 {
 {
-  add_case_limit_trns ();
-  add_filter_trns ();
+  add_case_limit_trns (ds);
+  add_filter_trns (ds);
 
   /* Finalize transformations. */
 
   /* Finalize transformations. */
-  trns_chain_finalize (cur_trns_chain);
+  trns_chain_finalize (ds->cur_trns_chain);
 
   /* Make permanent_dict refer to the dictionary right before
      data reaches the sink. */
 
   /* Make permanent_dict refer to the dictionary right before
      data reaches the sink. */
-  if (permanent_dict == NULL)
-    permanent_dict = default_dict;
+  if (ds->permanent_dict == NULL)
+    ds->permanent_dict = ds->dict;
 
 
-  /* Figure out compaction. */
-  compactor = (dict_needs_compaction (permanent_dict)
-               ? dict_make_compactor (permanent_dict)
-               : NULL);
+  /* Figure out whether to compact. */
+  ds->compactor =
+    (dict_compacting_would_shrink (ds->permanent_dict)
+     ? dict_make_compactor (ds->permanent_dict)
+     : NULL);
 
   /* Prepare sink. */
 
   /* Prepare sink. */
-  if (proc_sink == NULL)
-    proc_sink = create_case_sink (&storage_sink_class, permanent_dict, NULL);
-  if (proc_sink->class->open != NULL)
-    proc_sink->class->open (proc_sink);
+  if (ds->proc_sink == NULL)
+    ds->proc_sink = create_case_sink (&storage_sink_class,
+                                     ds->permanent_dict,
+                                     ds->cf_factory,
+                                     NULL);
+  if (ds->proc_sink->class->open != NULL)
+    ds->proc_sink->class->open (ds->proc_sink);
 
   /* Allocate memory for lag queue. */
 
   /* Allocate memory for lag queue. */
-  if (n_lag > 0)
+  if (ds->n_lag > 0)
     {
       int i;
     {
       int i;
-  
-      lag_count = 0;
-      lag_head = 0;
-      lag_queue = xnmalloc (n_lag, sizeof *lag_queue);
-      for (i = 0; i < n_lag; i++)
-        case_nullify (&lag_queue[i]);
-    }
-}
 
 
-/* Transforms trns_case and writes it to the replacement active
-   file if advisable.  Returns true if more cases can be
-   accepted, false otherwise.  Do not call this function again
-   after it has returned false once.  */
-static bool
-write_case (struct write_case_data *wc_data)
-{
-  enum trns_result retval;
-  size_t case_nr;
-  
-  /* Execute permanent transformations.  */
-  case_nr = wc_data->cases_written + 1;
-  retval = trns_chain_execute (permanent_trns_chain,
-                               &wc_data->trns_case, &case_nr);
-  if (retval != TRNS_CONTINUE)
-    goto done;
-
-  /* Write case to LAG queue. */
-  if (n_lag)
-    lag_case (&wc_data->trns_case);
-
-  /* Write case to replacement active file. */
-  wc_data->cases_written++;
-  if (proc_sink->class->write != NULL) 
-    {
-      if (compactor != NULL) 
-        {
-          dict_compactor_compact (compactor, &wc_data->sink_case,
-                                  &wc_data->trns_case);
-          proc_sink->class->write (proc_sink, &wc_data->sink_case);
-        }
-      else
-        proc_sink->class->write (proc_sink, &wc_data->trns_case);
-    }
-  
-  /* Execute temporary transformations. */
-  if (temporary_trns_chain != NULL) 
-    {
-      retval = trns_chain_execute (temporary_trns_chain,
-                                   &wc_data->trns_case,
-                                   &wc_data->cases_written);
-      if (retval != TRNS_CONTINUE)
-        goto done;
+      ds->lag_count = 0;
+      ds->lag_head = 0;
+      ds->lag_queue = xnmalloc (ds->n_lag, sizeof *ds->lag_queue);
+      for (i = 0; i < ds->n_lag; i++)
+        case_nullify (&ds->lag_queue[i]);
     }
     }
-
-  /* Pass case to procedure. */
-  if (wc_data->case_func != NULL)
-    if (!wc_data->case_func (&wc_data->trns_case, wc_data->aux))
-      retval = TRNS_ERROR;
-
- done:
-  clear_case (&wc_data->trns_case);
-  return retval != TRNS_ERROR;
 }
 
 /* Add C to the lag queue. */
 static void
 }
 
 /* Add C to the lag queue. */
 static void
-lag_case (const struct ccase *c)
+lag_case (struct dataset *ds, const struct ccase *c)
 {
 {
-  if (lag_count < n_lag)
-    lag_count++;
-  case_destroy (&lag_queue[lag_head]);
-  case_clone (&lag_queue[lag_head], c);
-  if (++lag_head >= n_lag)
-    lag_head = 0;
+  if (ds->lag_count < ds->n_lag)
+    ds->lag_count++;
+  case_destroy (&ds->lag_queue[ds->lag_head]);
+  case_clone (&ds->lag_queue[ds->lag_head], c);
+  if (++ds->lag_head >= ds->n_lag)
+    ds->lag_head = 0;
 }
 
 /* Clears the variables in C that need to be cleared between
    processing cases.  */
 static void
 }
 
 /* Clears the variables in C that need to be cleared between
    processing cases.  */
 static void
-clear_case (struct ccase *c)
+clear_case (const struct dataset *ds, struct ccase *c)
 {
 {
-  size_t var_cnt = dict_get_var_cnt (default_dict);
+  size_t var_cnt = dict_get_var_cnt (ds->dict);
   size_t i;
   size_t i;
-  
-  for (i = 0; i < var_cnt; i++) 
+
+  for (i = 0; i < var_cnt; i++)
     {
     {
-      struct variable *v = dict_get_var (default_dict, i);
-      if (!v->leave) 
+      struct variable *v = dict_get_var (ds->dict, i);
+      if (!var_get_leave (v))
         {
         {
-          if (v->type == NUMERIC)
-            case_data_rw (c, v->fv)->f = SYSMIS;
+          if (var_is_numeric (v))
+            case_data_rw (c, v)->f = SYSMIS;
           else
           else
-            memset (case_data_rw (c, v->fv)->s, ' ', v->width);
-        } 
+            memset (case_data_rw (c, v)->s, ' ', var_get_width (v));
+        }
     }
 }
 
 /* Closes the active file. */
 static bool
     }
 }
 
 /* Closes the active file. */
 static bool
-close_active_file (void)
+close_active_file (struct dataset *ds)
 {
   /* Free memory for lag queue, and turn off lagging. */
 {
   /* Free memory for lag queue, and turn off lagging. */
-  if (n_lag > 0)
+  if (ds->n_lag > 0)
     {
       int i;
     {
       int i;
-      
-      for (i = 0; i < n_lag; i++)
-       case_destroy (&lag_queue[i]);
-      free (lag_queue);
-      n_lag = 0;
+
+      for (i = 0; i < ds->n_lag; i++)
+       case_destroy (&ds->lag_queue[i]);
+      free (ds->lag_queue);
+      ds->n_lag = 0;
     }
     }
-  
+
   /* Dictionary from before TEMPORARY becomes permanent. */
   /* Dictionary from before TEMPORARY becomes permanent. */
-  proc_cancel_temporary_transformations ();
+  proc_cancel_temporary_transformations (ds);
 
 
-  /* Finish compaction. */
-  if (compactor != NULL) 
+  /* Finish compacting. */
+  if (ds->compactor != NULL)
     {
     {
-      dict_compactor_destroy (compactor);
-      dict_compact_values (default_dict);
-      compactor = NULL;
+      dict_compactor_destroy (ds->compactor);
+      dict_compact_values (ds->dict);
+      ds->compactor = NULL;
     }
     }
-    
-  /* Free data source. */
-  free_case_source (proc_source);
-  proc_source = NULL;
 
   /* Old data sink becomes new data source. */
 
   /* Old data sink becomes new data source. */
-  if (proc_sink->class->make_source != NULL)
-    proc_source = proc_sink->class->make_source (proc_sink);
-  free_case_sink (proc_sink);
-  proc_sink = NULL;
-
-  dict_clear_vectors (default_dict);
-  permanent_dict = NULL;
-  return proc_cancel_all_transformations ();
+  if (ds->proc_sink->class->make_source != NULL)
+    proc_set_source (ds, ds->proc_sink->class->make_source (ds->proc_sink) );
+  free_case_sink (ds->proc_sink);
+  ds->proc_sink = NULL;
+
+  dict_clear_vectors (ds->dict);
+  ds->permanent_dict = NULL;
+  return proc_cancel_all_transformations (ds);
 }
 \f
 /* Returns a pointer to the lagged case from N_BEFORE cases before the
    current one, or NULL if there haven't been that many cases yet. */
 struct ccase *
 }
 \f
 /* Returns a pointer to the lagged case from N_BEFORE cases before the
    current one, or NULL if there haven't been that many cases yet. */
 struct ccase *
-lagged_case (int n_before)
+lagged_case (const struct dataset *ds, int n_before)
 {
   assert (n_before >= 1 );
 {
   assert (n_before >= 1 );
-  assert (n_before <= n_lag);
+  assert (n_before <= ds->n_lag);
 
 
-  if (n_before <= lag_count)
+  if (n_before <= ds->lag_count)
     {
     {
-      int index = lag_head - n_before;
+      int index = ds->lag_head - n_before;
       if (index < 0)
       if (index < 0)
-        index += n_lag;
-      return &lag_queue[index];
+        index += ds->n_lag;
+      return &ds->lag_queue[index];
     }
   else
     return NULL;
     }
   else
     return NULL;
@@ -456,21 +521,21 @@ lagged_case (int n_before)
 /* Procedure that separates the data into SPLIT FILE groups. */
 
 /* Represents auxiliary data for handling SPLIT FILE. */
 /* Procedure that separates the data into SPLIT FILE groups. */
 
 /* Represents auxiliary data for handling SPLIT FILE. */
-struct split_aux_data 
+struct split_aux_data
   {
   {
-    size_t case_count;          /* Number of cases so far. */
+    struct dataset *dataset;    /* The dataset */
     struct ccase prev_case;     /* Data in previous case. */
 
     /* Callback functions. */
     struct ccase prev_case;     /* Data in previous case. */
 
     /* Callback functions. */
-    void (*begin_func) (const struct ccase *, void *);
-    bool (*proc_func) (const struct ccase *, void *);
-    void (*end_func) (void *);
+    begin_func *begin;
+    case_func *proc;
+    end_func *end;
     void *func_aux;
   };
 
     void *func_aux;
   };
 
-static int equal_splits (const struct ccase *, const struct ccase *);
-static bool split_procedure_case_func (const struct ccase *c, void *);
-static bool split_procedure_end_func (void *);
+static int equal_splits (const struct ccase *, const struct ccase *, const struct dataset *ds);
+static bool split_procedure_case_func (const struct ccase *c, void *, const struct dataset *);
+static bool split_procedure_end_func (void *, const struct dataset *);
 
 /* Like procedure(), but it automatically breaks the case stream
    into SPLIT FILE break groups.  Before each group of cases with
 
 /* Like procedure(), but it automatically breaks the case stream
    into SPLIT FILE break groups.  Before each group of cases with
@@ -482,30 +547,31 @@ static bool split_procedure_end_func (void *);
    passed to each of the functions as auxiliary data.
 
    If the active file is empty, none of BEGIN_FUNC, PROC_FUNC,
    passed to each of the functions as auxiliary data.
 
    If the active file is empty, none of BEGIN_FUNC, PROC_FUNC,
-   and END_FUNC will be called at all. 
+   and END_FUNC will be called at all.
 
    If SPLIT FILE is not in effect, then there is one break group
    (if the active file is nonempty), and BEGIN_FUNC and END_FUNC
    will be called once.
 
    If SPLIT FILE is not in effect, then there is one break group
    (if the active file is nonempty), and BEGIN_FUNC and END_FUNC
    will be called once.
-   
+
    Returns true if successful, false if an I/O error occurred. */
 bool
    Returns true if successful, false if an I/O error occurred. */
 bool
-procedure_with_splits (void (*begin_func) (const struct ccase *, void *aux),
-                       bool (*proc_func) (const struct ccase *, void *aux),
-                       void (*end_func) (void *aux),
-                       void *func_aux) 
+procedure_with_splits (struct dataset *ds,
+                      begin_func begin,
+                      case_func *proc,
+                       end_func *end,
+                       void *func_aux)
 {
   struct split_aux_data split_aux;
   bool ok;
 
 {
   struct split_aux_data split_aux;
   bool ok;
 
-  split_aux.case_count = 0;
   case_nullify (&split_aux.prev_case);
   case_nullify (&split_aux.prev_case);
-  split_aux.begin_func = begin_func;
-  split_aux.proc_func = proc_func;
-  split_aux.end_func = end_func;
+  split_aux.begin = begin;
+  split_aux.proc = proc;
+  split_aux.end = end;
   split_aux.func_aux = func_aux;
   split_aux.func_aux = func_aux;
+  split_aux.dataset = ds;
 
 
-  ok = internal_procedure (split_procedure_case_func,
+  ok = internal_procedure (ds, split_procedure_case_func,
                            split_procedure_end_func, &split_aux);
 
   case_destroy (&split_aux.prev_case);
                            split_procedure_end_func, &split_aux);
 
   case_destroy (&split_aux.prev_case);
@@ -515,48 +581,48 @@ procedure_with_splits (void (*begin_func) (const struct ccase *, void *aux),
 
 /* Case callback used by procedure_with_splits(). */
 static bool
 
 /* Case callback used by procedure_with_splits(). */
 static bool
-split_procedure_case_func (const struct ccase *c, void *split_aux_
+split_procedure_case_func (const struct ccase *c, void *split_aux_, const struct dataset *ds)
 {
   struct split_aux_data *split_aux = split_aux_;
 
   /* Start a new series if needed. */
 {
   struct split_aux_data *split_aux = split_aux_;
 
   /* Start a new series if needed. */
-  if (split_aux->case_count == 0
-      || !equal_splits (c, &split_aux->prev_case))
+  if (case_is_null (&split_aux->prev_case)
+      || !equal_splits (c, &split_aux->prev_case, split_aux->dataset))
     {
     {
-      if (split_aux->case_count > 0 && split_aux->end_func != NULL)
-        split_aux->end_func (split_aux->func_aux);
+      if (!case_is_null (&split_aux->prev_case) && split_aux->end != NULL)
+        split_aux->end (split_aux->func_aux, ds);
 
       case_destroy (&split_aux->prev_case);
       case_clone (&split_aux->prev_case, c);
 
 
       case_destroy (&split_aux->prev_case);
       case_clone (&split_aux->prev_case, c);
 
-      if (split_aux->begin_func != NULL)
-       split_aux->begin_func (&split_aux->prev_case, split_aux->func_aux);
+      if (split_aux->begin != NULL)
+       split_aux->begin (&split_aux->prev_case, split_aux->func_aux, ds);
     }
 
     }
 
-  split_aux->case_count++;
-  return (split_aux->proc_func == NULL
-          || split_aux->proc_func (c, split_aux->func_aux));
+  return (split_aux->proc == NULL
+          || split_aux->proc (c, split_aux->func_aux, ds));
 }
 
 /* End-of-file callback used by procedure_with_splits(). */
 static bool
 }
 
 /* End-of-file callback used by procedure_with_splits(). */
 static bool
-split_procedure_end_func (void *split_aux_
+split_procedure_end_func (void *split_aux_, const struct dataset *ds)
 {
   struct split_aux_data *split_aux = split_aux_;
 
 {
   struct split_aux_data *split_aux = split_aux_;
 
-  if (split_aux->case_count > 0 && split_aux->end_func != NULL)
-    split_aux->end_func (split_aux->func_aux);
+  if (!case_is_null (&split_aux->prev_case) && split_aux->end != NULL)
+    split_aux->end (split_aux->func_aux, ds);
   return true;
 }
 
 /* Compares the SPLIT FILE variables in cases A and B and returns
    nonzero only if they differ. */
 static int
   return true;
 }
 
 /* Compares the SPLIT FILE variables in cases A and B and returns
    nonzero only if they differ. */
 static int
-equal_splits (const struct ccase *a, const struct ccase *b) 
+equal_splits (const struct ccase *a, const struct ccase *b,
+             const struct dataset *ds)
 {
   return case_compare (a, b,
 {
   return case_compare (a, b,
-                       dict_get_split_vars (default_dict),
-                       dict_get_split_cnt (default_dict)) == 0;
+                       dict_get_split_vars (ds->dict),
+                       dict_get_split_cnt (ds->dict)) == 0;
 }
 \f
 /* Multipass procedure that separates the data into SPLIT FILE
 }
 \f
 /* Multipass procedure that separates the data into SPLIT FILE
@@ -564,26 +630,24 @@ equal_splits (const struct ccase *a, const struct ccase *b)
 
 /* Represents auxiliary data for handling SPLIT FILE in a
    multipass procedure. */
 
 /* Represents auxiliary data for handling SPLIT FILE in a
    multipass procedure. */
-struct multipass_split_aux_data 
+struct multipass_split_aux_data
   {
   {
+    struct dataset *dataset;    /* The dataset of the split */
     struct ccase prev_case;     /* Data in previous case. */
     struct casefile *casefile;  /* Accumulates data for a split. */
     struct ccase prev_case;     /* Data in previous case. */
     struct casefile *casefile;  /* Accumulates data for a split. */
-
-    /* Function to call with the accumulated data. */
-    bool (*split_func) (const struct ccase *first, const struct casefile *,
-                        void *);
-    void *func_aux;                            /* Auxiliary data. */ 
+    split_func *split;          /* Function to call with the accumulated
+                                  data. */
+    void *func_aux;             /* Auxiliary data. */
   };
 
   };
 
-static bool multipass_split_case_func (const struct ccase *c, void *aux_);
-static bool multipass_split_end_func (void *aux_);
-static bool multipass_split_output (struct multipass_split_aux_data *);
+static bool multipass_split_case_func (const struct ccase *c, void *aux_, const struct dataset *);
+static bool multipass_split_end_func (void *aux_, const struct dataset *ds);
+static bool multipass_split_output (struct multipass_split_aux_data *, const struct dataset *ds);
 
 /* Returns true if successful, false if an I/O error occurred. */
 bool
 
 /* Returns true if successful, false if an I/O error occurred. */
 bool
-multipass_procedure_with_splits (bool (*split_func) (const struct ccase *first,
-                                                     const struct casefile *,
-                                                     void *aux),
+multipass_procedure_with_splits (struct dataset *ds,
+                                split_func  *split,
                                  void *func_aux)
 {
   struct multipass_split_aux_data aux;
                                  void *func_aux)
 {
   struct multipass_split_aux_data aux;
@@ -591,10 +655,11 @@ multipass_procedure_with_splits (bool (*split_func) (const struct ccase *first,
 
   case_nullify (&aux.prev_case);
   aux.casefile = NULL;
 
   case_nullify (&aux.prev_case);
   aux.casefile = NULL;
-  aux.split_func = split_func;
+  aux.split = split;
   aux.func_aux = func_aux;
   aux.func_aux = func_aux;
+  aux.dataset = ds;
 
 
-  ok = internal_procedure (multipass_split_case_func,
+  ok = internal_procedure (ds, multipass_split_case_func,
                            multipass_split_end_func, &aux);
   case_destroy (&aux.prev_case);
 
                            multipass_split_end_func, &aux);
   case_destroy (&aux.prev_case);
 
@@ -603,13 +668,13 @@ multipass_procedure_with_splits (bool (*split_func) (const struct ccase *first,
 
 /* Case callback used by multipass_procedure_with_splits(). */
 static bool
 
 /* Case callback used by multipass_procedure_with_splits(). */
 static bool
-multipass_split_case_func (const struct ccase *c, void *aux_)
+multipass_split_case_func (const struct ccase *c, void *aux_, const struct dataset *ds)
 {
   struct multipass_split_aux_data *aux = aux_;
   bool ok = true;
 
   /* Start a new series if needed. */
 {
   struct multipass_split_aux_data *aux = aux_;
   bool ok = true;
 
   /* Start a new series if needed. */
-  if (aux->casefile == NULL || !equal_splits (c, &aux->prev_case))
+  if (aux->casefile == NULL || ! equal_splits (c, &aux->prev_case, ds))
     {
       /* Record split values. */
       case_destroy (&aux->prev_case);
     {
       /* Record split values. */
       case_destroy (&aux->prev_case);
@@ -617,10 +682,12 @@ multipass_split_case_func (const struct ccase *c, void *aux_)
 
       /* Pass any cases to split_func. */
       if (aux->casefile != NULL)
 
       /* Pass any cases to split_func. */
       if (aux->casefile != NULL)
-        ok = multipass_split_output (aux);
+        ok = multipass_split_output (aux, ds);
 
       /* Start a new casefile. */
 
       /* Start a new casefile. */
-      aux->casefile = casefile_create (dict_get_next_value_idx (default_dict));
+      aux->casefile =
+       ds->cf_factory->create_casefile (ds->cf_factory,
+                                        dict_get_next_value_idx (ds->dict));
     }
 
   return casefile_append (aux->casefile, c) && ok;
     }
 
   return casefile_append (aux->casefile, c) && ok;
@@ -628,19 +695,19 @@ multipass_split_case_func (const struct ccase *c, void *aux_)
 
 /* End-of-file callback used by multipass_procedure_with_splits(). */
 static bool
 
 /* End-of-file callback used by multipass_procedure_with_splits(). */
 static bool
-multipass_split_end_func (void *aux_)
+multipass_split_end_func (void *aux_, const struct dataset *ds)
 {
   struct multipass_split_aux_data *aux = aux_;
 {
   struct multipass_split_aux_data *aux = aux_;
-  return (aux->casefile == NULL || multipass_split_output (aux));
+  return (aux->casefile == NULL || multipass_split_output (aux, ds));
 }
 
 static bool
 }
 
 static bool
-multipass_split_output (struct multipass_split_aux_data *aux)
+multipass_split_output (struct multipass_split_aux_data *aux, const struct dataset *ds)
 {
   bool ok;
 {
   bool ok;
-  
+
   assert (aux->casefile != NULL);
   assert (aux->casefile != NULL);
-  ok = aux->split_func (&aux->prev_case, aux->casefile, aux->func_aux);
+  ok = aux->split (&aux->prev_case, aux->casefile, aux->func_aux, ds);
   casefile_destroy (aux->casefile);
   aux->casefile = NULL;
 
   casefile_destroy (aux->casefile);
   aux->casefile = NULL;
 
@@ -650,30 +717,30 @@ multipass_split_output (struct multipass_split_aux_data *aux)
 /* Discards all the current state in preparation for a data-input
    command like DATA LIST or GET. */
 void
 /* Discards all the current state in preparation for a data-input
    command like DATA LIST or GET. */
 void
-discard_variables (void)
+discard_variables (struct dataset *ds)
 {
 {
-  dict_clear (default_dict);
+  dict_clear (ds->dict);
   fh_set_default_handle (NULL);
 
   fh_set_default_handle (NULL);
 
-  n_lag = 0;
-  
-  free_case_source (proc_source);
-  proc_source = NULL;
+  ds->n_lag = 0;
 
 
-  proc_cancel_all_transformations ();
+  free_case_source (ds->proc_source);
+  proc_set_source (ds, NULL);
+
+  proc_cancel_all_transformations (ds);
 }
 \f
 /* Returns the current set of permanent transformations,
    and clears the permanent transformations.
    For use by INPUT PROGRAM. */
 struct trns_chain *
 }
 \f
 /* Returns the current set of permanent transformations,
    and clears the permanent transformations.
    For use by INPUT PROGRAM. */
 struct trns_chain *
-proc_capture_transformations (void) 
+proc_capture_transformations (struct dataset *ds)
 {
   struct trns_chain *chain;
 {
   struct trns_chain *chain;
-  
-  assert (temporary_trns_chain == NULL);
-  chain = permanent_trns_chain;
-  cur_trns_chain = permanent_trns_chain = trns_chain_create ();
+
+  assert (ds->temporary_trns_chain == NULL);
+  chain = ds->permanent_trns_chain;
+  ds->cur_trns_chain = ds->permanent_trns_chain = trns_chain_create ();
   return chain;
 }
 
   return chain;
 }
 
@@ -681,9 +748,9 @@ proc_capture_transformations (void)
    frees itself with FREE to the current set of transformations.
    The functions are passed AUX as auxiliary data. */
 void
    frees itself with FREE to the current set of transformations.
    The functions are passed AUX as auxiliary data. */
 void
-add_transformation (trns_proc_func *proc, trns_free_func *free, void *aux)
+add_transformation (struct dataset *ds, trns_proc_func *proc, trns_free_func *free, void *aux)
 {
 {
-  trns_chain_append (cur_trns_chain, NULL, proc, free, aux);
+  trns_chain_append (ds->cur_trns_chain, NULL, proc, free, aux);
 }
 
 /* Adds a transformation that processes a case with PROC and
 }
 
 /* Adds a transformation that processes a case with PROC and
@@ -692,44 +759,46 @@ add_transformation (trns_proc_func *proc, trns_free_func *free, void *aux)
    FINALIZE will be called.
    The functions are passed AUX as auxiliary data. */
 void
    FINALIZE will be called.
    The functions are passed AUX as auxiliary data. */
 void
-add_transformation_with_finalizer (trns_finalize_func *finalize,
+add_transformation_with_finalizer (struct dataset *ds,
+                                  trns_finalize_func *finalize,
                                    trns_proc_func *proc,
                                    trns_free_func *free, void *aux)
 {
                                    trns_proc_func *proc,
                                    trns_free_func *free, void *aux)
 {
-  trns_chain_append (cur_trns_chain, finalize, proc, free, aux);
+  trns_chain_append (ds->cur_trns_chain, finalize, proc, free, aux);
 }
 
 /* Returns the index of the next transformation.
    This value can be returned by a transformation procedure
    function to indicate a "jump" to that transformation. */
 size_t
 }
 
 /* Returns the index of the next transformation.
    This value can be returned by a transformation procedure
    function to indicate a "jump" to that transformation. */
 size_t
-next_transformation (void) 
+next_transformation (const struct dataset *ds)
 {
 {
-  return trns_chain_next (cur_trns_chain);
+  return trns_chain_next (ds->cur_trns_chain);
 }
 
 /* Returns true if the next call to add_transformation() will add
    a temporary transformation, false if it will add a permanent
    transformation. */
 bool
 }
 
 /* Returns true if the next call to add_transformation() will add
    a temporary transformation, false if it will add a permanent
    transformation. */
 bool
-proc_in_temporary_transformations (void) 
+proc_in_temporary_transformations (const struct dataset *ds)
 {
 {
-  return temporary_trns_chain != NULL;
+  return ds->temporary_trns_chain != NULL;
 }
 
 /* Marks the start of temporary transformations.
    Further calls to add_transformation() will add temporary
    transformations. */
 void
 }
 
 /* Marks the start of temporary transformations.
    Further calls to add_transformation() will add temporary
    transformations. */
 void
-proc_start_temporary_transformations (void) 
+proc_start_temporary_transformations (struct dataset *ds)
 {
 {
-  if (!proc_in_temporary_transformations ())
+  if (!proc_in_temporary_transformations (ds))
     {
     {
-      add_case_limit_trns ();
+      add_case_limit_trns (ds);
+
+      ds->permanent_dict = dict_clone (ds->dict);
 
 
-      permanent_dict = dict_clone (default_dict);
-      trns_chain_finalize (permanent_trns_chain);
-      temporary_trns_chain = cur_trns_chain = trns_chain_create ();
+      trns_chain_finalize (ds->permanent_trns_chain);
+      ds->temporary_trns_chain = ds->cur_trns_chain = trns_chain_create ();
     }
 }
 
     }
 }
 
@@ -738,16 +807,16 @@ proc_start_temporary_transformations (void)
    permanent.
    Returns true if anything changed, false otherwise. */
 bool
    permanent.
    Returns true if anything changed, false otherwise. */
 bool
-proc_make_temporary_transformations_permanent (void) 
+proc_make_temporary_transformations_permanent (struct dataset *ds)
 {
 {
-  if (proc_in_temporary_transformations ()) 
+  if (proc_in_temporary_transformations (ds))
     {
     {
-      trns_chain_finalize (temporary_trns_chain);
-      trns_chain_splice (permanent_trns_chain, temporary_trns_chain);
-      temporary_trns_chain = NULL;
+      trns_chain_finalize (ds->temporary_trns_chain);
+      trns_chain_splice (ds->permanent_trns_chain, ds->temporary_trns_chain);
+      ds->temporary_trns_chain = NULL;
 
 
-      dict_destroy (permanent_dict);
-      permanent_dict = NULL;
+      dict_destroy (ds->permanent_dict);
+      ds->permanent_dict = NULL;
 
       return true;
     }
 
       return true;
     }
@@ -759,16 +828,15 @@ proc_make_temporary_transformations_permanent (void)
    transformations will be permanent.
    Returns true if anything changed, false otherwise. */
 bool
    transformations will be permanent.
    Returns true if anything changed, false otherwise. */
 bool
-proc_cancel_temporary_transformations (void) 
+proc_cancel_temporary_transformations (struct dataset *ds)
 {
 {
-  if (proc_in_temporary_transformations ()) 
+  if (proc_in_temporary_transformations (ds))
     {
     {
-      dict_destroy (default_dict);
-      default_dict = permanent_dict;
-      permanent_dict = NULL;
+      dataset_set_dict (ds, ds->permanent_dict);
+      ds->permanent_dict = NULL;
 
 
-      trns_chain_destroy (temporary_trns_chain);
-      temporary_trns_chain = NULL;
+      trns_chain_destroy (ds->temporary_trns_chain);
+      ds->temporary_trns_chain = NULL;
 
       return true;
     }
 
       return true;
     }
@@ -779,55 +847,68 @@ proc_cancel_temporary_transformations (void)
 /* Cancels all transformations, if any.
    Returns true if successful, false on I/O error. */
 bool
 /* Cancels all transformations, if any.
    Returns true if successful, false on I/O error. */
 bool
-proc_cancel_all_transformations (void)
+proc_cancel_all_transformations (struct dataset *ds)
 {
   bool ok;
 {
   bool ok;
-  ok = trns_chain_destroy (permanent_trns_chain);
-  ok = trns_chain_destroy (temporary_trns_chain) && ok;
-  permanent_trns_chain = cur_trns_chain = trns_chain_create ();
-  temporary_trns_chain = NULL;
+  ok = trns_chain_destroy (ds->permanent_trns_chain);
+  ok = trns_chain_destroy (ds->temporary_trns_chain) && ok;
+  ds->permanent_trns_chain = ds->cur_trns_chain = trns_chain_create ();
+  ds->temporary_trns_chain = NULL;
   return ok;
 }
 \f
 /* Initializes procedure handling. */
   return ok;
 }
 \f
 /* Initializes procedure handling. */
-void
-proc_init (void) 
+struct dataset *
+create_dataset (struct casefile_factory *fact,
+               replace_source_callback *rps,
+               replace_dictionary_callback *rds
+               )
 {
 {
-  default_dict = dict_create ();
-  proc_cancel_all_transformations ();
+  struct dataset *ds = xzalloc (sizeof(*ds));
+  ds->dict = dict_create ();
+  ds->cf_factory = fact;
+  ds->replace_source = rps;
+  ds->replace_dict = rds;
+  proc_cancel_all_transformations (ds);
+  return ds;
 }
 
 /* Finishes up procedure handling. */
 void
 }
 
 /* Finishes up procedure handling. */
 void
-proc_done (void)
+destroy_dataset (struct dataset *ds)
 {
 {
-  discard_variables ();
+  discard_variables (ds);
+  dict_destroy (ds->dict);
+  trns_chain_destroy (ds->permanent_trns_chain);
+  free (ds);
 }
 
 /* Sets SINK as the destination for procedure output from the
    next procedure. */
 void
 }
 
 /* Sets SINK as the destination for procedure output from the
    next procedure. */
 void
-proc_set_sink (struct case_sink *sink) 
+proc_set_sink (struct dataset *ds, struct case_sink *sink)
 {
 {
-  assert (proc_sink == NULL);
-  proc_sink = sink;
+  assert (ds->proc_sink == NULL);
+  ds->proc_sink = sink;
 }
 
 /* Sets SOURCE as the source for procedure input for the next
    procedure. */
 void
 }
 
 /* Sets SOURCE as the source for procedure input for the next
    procedure. */
 void
-proc_set_source (struct case_source *source) 
+proc_set_source (struct dataset *ds, struct case_source *source)
 {
 {
-  assert (proc_source == NULL);
-  proc_source = source;
+  ds->proc_source = source;
+
+  if ( ds->replace_source )
+    ds->replace_source (ds->proc_source);
 }
 
 /* Returns true if a source for the next procedure has been
    configured, false otherwise. */
 bool
 }
 
 /* Returns true if a source for the next procedure has been
    configured, false otherwise. */
 bool
-proc_has_source (void) 
+proc_has_source (const struct dataset *ds)
 {
 {
-  return proc_source != NULL;
+  return ds->proc_source != NULL;
 }
 
 /* Returns the output from the previous procedure.
 }
 
 /* Returns the output from the previous procedure.
@@ -835,19 +916,19 @@ proc_has_source (void)
    The returned casefile is owned by the caller; it will not be
    automatically used for the next procedure's input. */
 struct casefile *
    The returned casefile is owned by the caller; it will not be
    automatically used for the next procedure's input. */
 struct casefile *
-proc_capture_output (void) 
+proc_capture_output (struct dataset *ds)
 {
   struct casefile *casefile;
 
   /* Try to make sure that this function is called immediately
      after procedure() or a similar function. */
 {
   struct casefile *casefile;
 
   /* Try to make sure that this function is called immediately
      after procedure() or a similar function. */
-  assert (proc_source != NULL);
-  assert (case_source_is_class (proc_source, &storage_source_class));
-  assert (trns_chain_is_empty (permanent_trns_chain));
-  assert (!proc_in_temporary_transformations ());
+  assert (ds->proc_source != NULL);
+  assert (case_source_is_class (ds->proc_source, &storage_source_class));
+  assert (trns_chain_is_empty (ds->permanent_trns_chain));
+  assert (!proc_in_temporary_transformations (ds));
 
 
-  casefile = storage_source_decapsulate (proc_source);
-  proc_source = NULL;
+  casefile = storage_source_decapsulate (ds->proc_source);
+  proc_set_source (ds, NULL);
 
   return casefile;
 }
 
   return casefile;
 }
@@ -856,18 +937,18 @@ static trns_proc_func case_limit_trns_proc;
 static trns_free_func case_limit_trns_free;
 
 /* Adds a transformation that limits the number of cases that may
 static trns_free_func case_limit_trns_free;
 
 /* Adds a transformation that limits the number of cases that may
-   pass through, if default_dict has a case limit. */
+   pass through, if DS->DICT has a case limit. */
 static void
 static void
-add_case_limit_trns (void) 
+add_case_limit_trns (struct dataset *ds)
 {
 {
-  size_t case_limit = dict_get_case_limit (default_dict);
+  size_t case_limit = dict_get_case_limit (ds->dict);
   if (case_limit != 0)
     {
       size_t *cases_remaining = xmalloc (sizeof *cases_remaining);
       *cases_remaining = case_limit;
   if (case_limit != 0)
     {
       size_t *cases_remaining = xmalloc (sizeof *cases_remaining);
       *cases_remaining = case_limit;
-      add_transformation (case_limit_trns_proc, case_limit_trns_free,
+      add_transformation (ds, case_limit_trns_proc, case_limit_trns_free,
                           cases_remaining);
                           cases_remaining);
-      dict_set_case_limit (default_dict, 0);
+      dict_set_case_limit (ds->dict, 0);
     }
 }
 
     }
 }
 
@@ -875,12 +956,12 @@ add_case_limit_trns (void)
    *CASES_REMAINING. */
 static int
 case_limit_trns_proc (void *cases_remaining_,
    *CASES_REMAINING. */
 static int
 case_limit_trns_proc (void *cases_remaining_,
-                      struct ccase *c UNUSED, int case_nr UNUSED) 
+                      struct ccase *c UNUSED, casenumber case_nr UNUSED)
 {
   size_t *cases_remaining = cases_remaining_;
 {
   size_t *cases_remaining = cases_remaining_;
-  if (*cases_remaining > 0) 
+  if (*cases_remaining > 0)
     {
     {
-      *cases_remaining--;
+      (*cases_remaining)--;
       return TRNS_CONTINUE;
     }
   else
       return TRNS_CONTINUE;
     }
   else
@@ -889,7 +970,7 @@ case_limit_trns_proc (void *cases_remaining_,
 
 /* Frees the data associated with a case limit transformation. */
 static bool
 
 /* Frees the data associated with a case limit transformation. */
 static bool
-case_limit_trns_free (void *cases_remaining_) 
+case_limit_trns_free (void *cases_remaining_)
 {
   size_t *cases_remaining = cases_remaining_;
   free (cases_remaining);
 {
   size_t *cases_remaining = cases_remaining_;
   free (cases_remaining);
@@ -901,25 +982,68 @@ static trns_proc_func filter_trns_proc;
 /* Adds a temporary transformation to filter data according to
    the variable specified on FILTER, if any. */
 static void
 /* Adds a temporary transformation to filter data according to
    the variable specified on FILTER, if any. */
 static void
-add_filter_trns (void) 
+add_filter_trns (struct dataset *ds)
 {
 {
-  struct variable *filter_var = dict_get_filter (default_dict);
-  if (filter_var != NULL) 
+  struct variable *filter_var = dict_get_filter (ds->dict);
+  if (filter_var != NULL)
     {
     {
-      proc_start_temporary_transformations ();
-      add_transformation (filter_trns_proc, NULL, filter_var);
+      proc_start_temporary_transformations (ds);
+      add_transformation (ds, filter_trns_proc, NULL, filter_var);
     }
 }
 
 /* FILTER transformation. */
 static int
 filter_trns_proc (void *filter_var_,
     }
 }
 
 /* FILTER transformation. */
 static int
 filter_trns_proc (void *filter_var_,
-                  struct ccase *c UNUSED, int case_nr UNUSED) 
-  
+                  struct ccase *c UNUSED, casenumber case_nr UNUSED)
+
 {
   struct variable *filter_var = filter_var_;
 {
   struct variable *filter_var = filter_var_;
-  double f = case_num (c, filter_var->fv);
-  return (f != 0.0 && !mv_is_num_missing (&filter_var->miss, f)
+  double f = case_num (c, filter_var);
+  return (f != 0.0 && !var_is_num_missing (filter_var, f, MV_ANY)
           ? TRNS_CONTINUE : TRNS_DROP_CASE);
 }
 
           ? TRNS_CONTINUE : TRNS_DROP_CASE);
 }
 
+
+struct dictionary *
+dataset_dict (const struct dataset *ds)
+{
+  return ds->dict;
+}
+
+
+/* Set or replace dataset DS's dictionary with DICT.
+   The old dictionary is destroyed */
+void
+dataset_set_dict (struct dataset *ds, struct dictionary *dict)
+{
+  struct dictionary *old_dict = ds->dict;
+
+  dict_copy_callbacks (dict, ds->dict);
+  ds->dict = dict;
+
+  if ( ds->replace_dict )
+    ds->replace_dict (dict);
+
+  dict_destroy (old_dict);
+}
+
+int
+dataset_n_lag (const struct dataset *ds)
+{
+  return ds->n_lag;
+}
+
+void
+dataset_set_n_lag (struct dataset *ds, int n_lag)
+{
+  ds->n_lag = n_lag;
+}
+
+
+struct casefile_factory *
+dataset_get_casefile_factory (const struct dataset *ds)
+{
+  return ds->cf_factory;
+}
+