Encapsulated the static data of procedure.[ch] into a single object, to be
[pspp-builds.git] / src / data / procedure.c
index 1843852a64f7b635b1a4d08ede407c43c6692e58..5341de4652a068e5794319c98b4720b0eccd6763 100644 (file)
 struct write_case_data
   {
     /* Function to call for each case. */
-    bool (*case_func) (const struct ccase *, void *);
+    case_func_t case_func;
     void *aux;
 
+    struct dataset *dataset;    /* The dataset concerned */
     struct ccase trns_case;     /* Case used for transformations. */
     struct ccase sink_case;     /* Case written to sink, if
                                    compacting is necessary. */
     size_t cases_written;       /* Cases output so far. */
   };
 
-/* Cases are read from proc_source,
-   pass through permanent_trns_chain (which transforms them into
-   the format described by permanent_dict),
-   are written to proc_sink,
-   pass through temporary_trns_chain (which transforms them into
-   the format described by default_dict),
-   and are finally passed to the procedure. */
-static struct case_source *proc_source;
-static struct trns_chain *permanent_trns_chain;
-static struct dictionary *permanent_dict;
-static struct case_sink *proc_sink;
-static struct trns_chain *temporary_trns_chain;
-struct dictionary *default_dict;
-
-/* The transformation chain that the next transformation will be
-   added to. */
-static struct trns_chain *cur_trns_chain;
-
-/* The compactor used to compact a case, if necessary;
-   otherwise a null pointer. */
-static struct dict_compactor *compactor;
-
-/* Time at which proc was last invoked. */
-static time_t last_proc_invocation;
-
-/* Lag queue. */
-int n_lag;                     /* Number of cases to lag. */
-static int lag_count;          /* Number of cases in lag_queue so far. */
-static int lag_head;           /* Index where next case will be added. */
-static struct ccase *lag_queue; /* Array of n_lag ccase * elements. */
-
-static void add_case_limit_trns (void);
-static void add_filter_trns (void);
-
-static bool internal_procedure (bool (*case_func) (const struct ccase *,
-                                                   void *),
+struct dataset {
+  /* Cases are read from proc_source,
+     pass through permanent_trns_chain (which transforms them into
+     the format described by permanent_dict),
+     are written to proc_sink,
+     pass through temporary_trns_chain (which transforms them into
+     the format described by dict),
+     and are finally passed to the procedure. */
+  struct case_source *proc_source;
+  struct trns_chain *permanent_trns_chain;
+  struct dictionary *permanent_dict;
+  struct case_sink *proc_sink;
+  struct trns_chain *temporary_trns_chain;
+  struct dictionary *dict;
+
+  /* The transformation chain that the next transformation will be
+     added to. */
+  struct trns_chain *cur_trns_chain;
+
+  /* The compactor used to compact a case, if necessary;
+     otherwise a null pointer. */
+  struct dict_compactor *compactor;
+
+  /* Time at which proc was last invoked. */
+  time_t last_proc_invocation;
+
+  /* Lag queue. */
+  int n_lag;                   /* Number of cases to lag. */
+  int lag_count;               /* Number of cases in lag_queue so far. */
+  int lag_head;                /* Index where next case will be added. */
+  struct ccase *lag_queue; /* Array of n_lag ccase * elements. */
+
+}; /* struct dataset */
+
+
+struct dataset *current_dataset;
+
+static void add_case_limit_trns (struct dataset *ds);
+static void add_filter_trns (struct dataset *ds);
+
+static bool internal_procedure (struct dataset *ds, case_func_t,
                                 bool (*end_func) (void *),
                                 void *aux);
-static void update_last_proc_invocation (void);
+static void update_last_proc_invocation (struct dataset *ds);
 static void create_trns_case (struct ccase *, struct dictionary *);
-static void open_active_file (void);
+static void open_active_file (struct dataset *ds);
 static bool write_case (struct write_case_data *wc_data);
-static void lag_case (const struct ccase *c);
-static void clear_case (struct ccase *c);
-static bool close_active_file (void);
+static void lag_case (struct dataset *ds, const struct ccase *c);
+static void clear_case (const struct dataset *ds, struct ccase *c);
+static bool close_active_file (struct dataset *ds);
 \f
 /* Public functions. */
 
 /* Returns the last time the data was read. */
 time_t
-time_of_last_procedure (void
+time_of_last_procedure (struct dataset *ds
 {
-  if (last_proc_invocation == 0)
-    update_last_proc_invocation ();
-  return last_proc_invocation;
+  if (ds->last_proc_invocation == 0)
+    update_last_proc_invocation (ds);
+  return ds->last_proc_invocation;
 }
 \f
 /* Regular procedure. */
 
+
+
 /* Reads the data from the input program and writes it to a new
    active file.  For each case we read from the input program, we
    do the following:
@@ -127,9 +135,9 @@ time_of_last_procedure (void)
 
    Returns true if successful, false if an I/O error occurred. */
 bool
-procedure (bool (*proc_func) (const struct ccase *, void *), void *aux)
+procedure (struct dataset *ds, case_func_t cf, void *aux)
 {
-  return internal_procedure (proc_func, NULL, aux);
+  return internal_procedure (ds, cf, NULL, aux);
 }
 \f
 /* Multipass procedure. */
@@ -163,17 +171,16 @@ multipass_end_func (void *aux_data_)
    The entire active file is passed to PROC_FUNC, with the given
    AUX as auxiliary data, as a unit. */
 bool
-multipass_procedure (bool (*proc_func) (const struct casefile *, void *aux),
-                     void *aux) 
+multipass_procedure (struct dataset *ds, casefile_func_t proc_func,  void *aux) 
 {
   struct multipass_aux_data aux_data;
   bool ok;
 
-  aux_data.casefile = fastfile_create (dict_get_next_value_idx (default_dict));
+  aux_data.casefile = fastfile_create (dict_get_next_value_idx (ds->dict));
   aux_data.proc_func = proc_func;
   aux_data.aux = aux;
 
-  ok = internal_procedure (multipass_case_func, multipass_end_func, &aux_data);
+  ok = internal_procedure (ds, multipass_case_func, multipass_end_func, &aux_data);
   ok = !casefile_error (aux_data.casefile) && ok;
 
   casefile_destroy (aux_data.casefile);
@@ -183,48 +190,50 @@ multipass_procedure (bool (*proc_func) (const struct casefile *, void *aux),
 \f
 /* Procedure implementation. */
 
+
 /* Executes a procedure.
    Passes each case to CASE_FUNC.
    Calls END_FUNC after the last case.
    Returns true if successful, false if an I/O error occurred (or
    if CASE_FUNC or END_FUNC ever returned false). */
 static bool
-internal_procedure (bool (*case_func) (const struct ccase *, void *),
+internal_procedure (struct dataset *ds, case_func_t case_func,
                     bool (*end_func) (void *),
                     void *aux) 
 {
   struct write_case_data wc_data;
   bool ok = true;
 
-  assert (proc_source != NULL);
+  assert (ds->proc_source != NULL);
 
-  update_last_proc_invocation ();
+  update_last_proc_invocation (ds);
 
   /* Optimize the trivial case where we're not going to do
      anything with the data, by not reading the data at all. */
   if (case_func == NULL && end_func == NULL
-      && case_source_is_class (proc_source, &storage_source_class)
-      && proc_sink == NULL
-      && (temporary_trns_chain == NULL
-          || trns_chain_is_empty (temporary_trns_chain))
-      && trns_chain_is_empty (permanent_trns_chain))
+      && case_source_is_class (ds->proc_source, &storage_source_class)
+      && ds->proc_sink == NULL
+      && (ds->temporary_trns_chain == NULL
+          || trns_chain_is_empty (ds->temporary_trns_chain))
+      && trns_chain_is_empty (ds->permanent_trns_chain))
     {
-      n_lag = 0;
-      dict_set_case_limit (default_dict, 0);
-      dict_clear_vectors (default_dict);
+      ds->n_lag = 0;
+      dict_set_case_limit (ds->dict, 0);
+      dict_clear_vectors (ds->dict);
       return true;
     }
   
-  open_active_file ();
+  open_active_file (ds);
   
   wc_data.case_func = case_func;
   wc_data.aux = aux;
-  create_trns_case (&wc_data.trns_case, default_dict);
+  wc_data.dataset = ds;
+  create_trns_case (&wc_data.trns_case, ds->dict);
   case_create (&wc_data.sink_case,
-               dict_get_compacted_value_cnt (default_dict));
+               dict_get_compacted_value_cnt (ds->dict));
   wc_data.cases_written = 0;
 
-  ok = proc_source->class->read (proc_source,
+  ok = ds->proc_source->class->read (ds->proc_source,
                                  &wc_data.trns_case,
                                  write_case, &wc_data) && ok;
   if (end_func != NULL)
@@ -233,16 +242,16 @@ internal_procedure (bool (*case_func) (const struct ccase *, void *),
   case_destroy (&wc_data.sink_case);
   case_destroy (&wc_data.trns_case);
 
-  ok = close_active_file () && ok;
+  ok = close_active_file (ds) && ok;
 
   return ok;
 }
 
 /* Updates last_proc_invocation. */
 static void
-update_last_proc_invocation (void
+update_last_proc_invocation (struct dataset *ds
 {
-  last_proc_invocation = time (NULL);
+  ds->last_proc_invocation = time (NULL);
 }
 
 /* Creates and returns a case, initializing it from the vectors
@@ -270,40 +279,41 @@ create_trns_case (struct ccase *trns_case, struct dictionary *dict)
 /* Makes all preparations for reading from the data source and writing
    to the data sink. */
 static void
-open_active_file (void)
+open_active_file (struct dataset *ds)
 {
-  add_case_limit_trns ();
-  add_filter_trns ();
+  add_case_limit_trns (ds);
+  add_filter_trns (ds);
 
   /* Finalize transformations. */
-  trns_chain_finalize (cur_trns_chain);
+  trns_chain_finalize (ds->cur_trns_chain);
 
   /* Make permanent_dict refer to the dictionary right before
      data reaches the sink. */
-  if (permanent_dict == NULL)
-    permanent_dict = default_dict;
+  if (ds->permanent_dict == NULL)
+    ds->permanent_dict = ds->dict;
 
   /* Figure out whether to compact. */
-  compactor = (dict_compacting_would_shrink (permanent_dict)
-               ? dict_make_compactor (permanent_dict)
-               : NULL);
+  ds->compactor = 
+    (dict_compacting_would_shrink (ds->permanent_dict)
+     ? dict_make_compactor (ds->permanent_dict)
+     : NULL);
 
   /* Prepare sink. */
-  if (proc_sink == NULL)
-    proc_sink = create_case_sink (&storage_sink_class, permanent_dict, NULL);
-  if (proc_sink->class->open != NULL)
-    proc_sink->class->open (proc_sink);
+  if (ds->proc_sink == NULL)
+    ds->proc_sink = create_case_sink (&storage_sink_class, ds->permanent_dict, NULL);
+  if (ds->proc_sink->class->open != NULL)
+    ds->proc_sink->class->open (ds->proc_sink);
 
   /* Allocate memory for lag queue. */
-  if (n_lag > 0)
+  if (ds->n_lag > 0)
     {
       int i;
   
-      lag_count = 0;
-      lag_head = 0;
-      lag_queue = xnmalloc (n_lag, sizeof *lag_queue);
-      for (i = 0; i < n_lag; i++)
-        case_nullify (&lag_queue[i]);
+      ds->lag_count = 0;
+      ds->lag_head = 0;
+      ds->lag_queue = xnmalloc (ds->n_lag, sizeof *ds->lag_queue);
+      for (i = 0; i < ds->n_lag; i++)
+        case_nullify (&ds->lag_queue[i]);
     }
 }
 
@@ -316,36 +326,38 @@ write_case (struct write_case_data *wc_data)
 {
   enum trns_result retval;
   size_t case_nr;
+
+  struct dataset *ds = wc_data->dataset;
   
   /* Execute permanent transformations.  */
   case_nr = wc_data->cases_written + 1;
-  retval = trns_chain_execute (permanent_trns_chain,
+  retval = trns_chain_execute (ds->permanent_trns_chain,
                                &wc_data->trns_case, &case_nr);
   if (retval != TRNS_CONTINUE)
     goto done;
 
   /* Write case to LAG queue. */
-  if (n_lag)
-    lag_case (&wc_data->trns_case);
+  if (ds->n_lag)
+    lag_case (ds, &wc_data->trns_case);
 
   /* Write case to replacement active file. */
   wc_data->cases_written++;
-  if (proc_sink->class->write != NULL) 
+  if (ds->proc_sink->class->write != NULL) 
     {
-      if (compactor != NULL) 
+      if (ds->compactor != NULL) 
         {
-          dict_compactor_compact (compactor, &wc_data->sink_case,
+          dict_compactor_compact (ds->compactor, &wc_data->sink_case,
                                   &wc_data->trns_case);
-          proc_sink->class->write (proc_sink, &wc_data->sink_case);
+          ds->proc_sink->class->write (ds->proc_sink, &wc_data->sink_case);
         }
       else
-        proc_sink->class->write (proc_sink, &wc_data->trns_case);
+        ds->proc_sink->class->write (ds->proc_sink, &wc_data->trns_case);
     }
   
   /* Execute temporary transformations. */
-  if (temporary_trns_chain != NULL) 
+  if (ds->temporary_trns_chain != NULL) 
     {
-      retval = trns_chain_execute (temporary_trns_chain,
+      retval = trns_chain_execute (ds->temporary_trns_chain,
                                    &wc_data->trns_case,
                                    &wc_data->cases_written);
       if (retval != TRNS_CONTINUE)
@@ -358,33 +370,33 @@ write_case (struct write_case_data *wc_data)
       retval = TRNS_ERROR;
 
  done:
-  clear_case (&wc_data->trns_case);
+  clear_case (ds, &wc_data->trns_case);
   return retval != TRNS_ERROR;
 }
 
 /* Add C to the lag queue. */
 static void
-lag_case (const struct ccase *c)
+lag_case (struct dataset *ds, const struct ccase *c)
 {
-  if (lag_count < n_lag)
-    lag_count++;
-  case_destroy (&lag_queue[lag_head]);
-  case_clone (&lag_queue[lag_head], c);
-  if (++lag_head >= n_lag)
-    lag_head = 0;
+  if (ds->lag_count < ds->n_lag)
+    ds->lag_count++;
+  case_destroy (&ds->lag_queue[ds->lag_head]);
+  case_clone (&ds->lag_queue[ds->lag_head], c);
+  if (++ds->lag_head >= ds->n_lag)
+    ds->lag_head = 0;
 }
 
 /* Clears the variables in C that need to be cleared between
    processing cases.  */
 static void
-clear_case (struct ccase *c)
+clear_case (const struct dataset *ds, struct ccase *c)
 {
-  size_t var_cnt = dict_get_var_cnt (default_dict);
+  size_t var_cnt = dict_get_var_cnt (ds->dict);
   size_t i;
   
   for (i = 0; i < var_cnt; i++) 
     {
-      struct variable *v = dict_get_var (default_dict, i);
+      struct variable *v = dict_get_var (ds->dict, i);
       if (!v->leave) 
         {
           if (v->type == NUMERIC)
@@ -397,59 +409,59 @@ clear_case (struct ccase *c)
 
 /* Closes the active file. */
 static bool
-close_active_file (void)
+close_active_file (struct dataset *ds)
 {
   /* Free memory for lag queue, and turn off lagging. */
-  if (n_lag > 0)
+  if (ds->n_lag > 0)
     {
       int i;
       
-      for (i = 0; i < n_lag; i++)
-       case_destroy (&lag_queue[i]);
-      free (lag_queue);
-      n_lag = 0;
+      for (i = 0; i < ds->n_lag; i++)
+       case_destroy (&ds->lag_queue[i]);
+      free (ds->lag_queue);
+      ds->n_lag = 0;
     }
   
   /* Dictionary from before TEMPORARY becomes permanent. */
-  proc_cancel_temporary_transformations ();
+  proc_cancel_temporary_transformations (ds);
 
   /* Finish compacting. */
-  if (compactor != NULL) 
+  if (ds->compactor != NULL) 
     {
-      dict_compactor_destroy (compactor);
-      dict_compact_values (default_dict);
-      compactor = NULL;
+      dict_compactor_destroy (ds->compactor);
+      dict_compact_values (ds->dict);
+      ds->compactor = NULL;
     }
     
   /* Free data source. */
-  free_case_source (proc_source);
-  proc_source = NULL;
+  free_case_source (ds->proc_source);
+  ds->proc_source = NULL;
 
   /* Old data sink becomes new data source. */
-  if (proc_sink->class->make_source != NULL)
-    proc_source = proc_sink->class->make_source (proc_sink);
-  free_case_sink (proc_sink);
-  proc_sink = NULL;
+  if (ds->proc_sink->class->make_source != NULL)
+    ds->proc_source = ds->proc_sink->class->make_source (ds->proc_sink);
+  free_case_sink (ds->proc_sink);
+  ds->proc_sink = NULL;
 
-  dict_clear_vectors (default_dict);
-  permanent_dict = NULL;
-  return proc_cancel_all_transformations ();
+  dict_clear_vectors (ds->dict);
+  ds->permanent_dict = NULL;
+  return proc_cancel_all_transformations (ds);
 }
 \f
 /* Returns a pointer to the lagged case from N_BEFORE cases before the
    current one, or NULL if there haven't been that many cases yet. */
 struct ccase *
-lagged_case (int n_before)
+lagged_case (const struct dataset *ds, int n_before)
 {
   assert (n_before >= 1 );
-  assert (n_before <= n_lag);
+  assert (n_before <= ds->n_lag);
 
-  if (n_before <= lag_count)
+  if (n_before <= ds->lag_count)
     {
-      int index = lag_head - n_before;
+      int index = ds->lag_head - n_before;
       if (index < 0)
-        index += n_lag;
-      return &lag_queue[index];
+        index += ds->n_lag;
+      return &ds->lag_queue[index];
     }
   else
     return NULL;
@@ -460,16 +472,17 @@ lagged_case (int n_before)
 /* Represents auxiliary data for handling SPLIT FILE. */
 struct split_aux_data 
   {
+    struct dataset *dataset;    /* The dataset */
     struct ccase prev_case;     /* Data in previous case. */
 
     /* Callback functions. */
-    void (*begin_func) (const struct ccase *, void *);
-    bool (*proc_func) (const struct ccase *, void *);
+    begin_func_t begin_func ; 
+    case_func_t proc_func ;
     void (*end_func) (void *);
     void *func_aux;
   };
 
-static int equal_splits (const struct ccase *, const struct ccase *);
+static int equal_splits (const struct ccase *, const struct ccase *, const struct dataset *ds);
 static bool split_procedure_case_func (const struct ccase *c, void *);
 static bool split_procedure_end_func (void *);
 
@@ -491,8 +504,9 @@ static bool split_procedure_end_func (void *);
    
    Returns true if successful, false if an I/O error occurred. */
 bool
-procedure_with_splits (void (*begin_func) (const struct ccase *, void *aux),
-                       bool (*proc_func) (const struct ccase *, void *aux),
+procedure_with_splits (struct dataset *ds,
+                      begin_func_t begin_func, 
+                      case_func_t proc_func,
                        void (*end_func) (void *aux),
                        void *func_aux) 
 {
@@ -504,8 +518,9 @@ procedure_with_splits (void (*begin_func) (const struct ccase *, void *aux),
   split_aux.proc_func = proc_func;
   split_aux.end_func = end_func;
   split_aux.func_aux = func_aux;
+  split_aux.dataset = ds;
 
-  ok = internal_procedure (split_procedure_case_func,
+  ok = internal_procedure (ds, split_procedure_case_func,
                            split_procedure_end_func, &split_aux);
 
   case_destroy (&split_aux.prev_case);
@@ -521,7 +536,7 @@ split_procedure_case_func (const struct ccase *c, void *split_aux_)
 
   /* Start a new series if needed. */
   if (case_is_null (&split_aux->prev_case)
-      || !equal_splits (c, &split_aux->prev_case))
+      || !equal_splits (c, &split_aux->prev_case, split_aux->dataset))
     {
       if (!case_is_null (&split_aux->prev_case) && split_aux->end_func != NULL)
         split_aux->end_func (split_aux->func_aux);
@@ -531,6 +546,7 @@ split_procedure_case_func (const struct ccase *c, void *split_aux_)
 
       if (split_aux->begin_func != NULL)
        split_aux->begin_func (&split_aux->prev_case, split_aux->func_aux);
+
     }
 
   return (split_aux->proc_func == NULL
@@ -551,11 +567,12 @@ split_procedure_end_func (void *split_aux_)
 /* Compares the SPLIT FILE variables in cases A and B and returns
    nonzero only if they differ. */
 static int
-equal_splits (const struct ccase *a, const struct ccase *b) 
+equal_splits (const struct ccase *a, const struct ccase *b, 
+             const struct dataset *ds) 
 {
   return case_compare (a, b,
-                       dict_get_split_vars (default_dict),
-                       dict_get_split_cnt (default_dict)) == 0;
+                       dict_get_split_vars (ds->dict),
+                       dict_get_split_cnt (ds->dict)) == 0;
 }
 \f
 /* Multipass procedure that separates the data into SPLIT FILE
@@ -565,6 +582,7 @@ equal_splits (const struct ccase *a, const struct ccase *b)
    multipass procedure. */
 struct multipass_split_aux_data 
   {
+    struct dataset *dataset;    /* The dataset of the split */
     struct ccase prev_case;     /* Data in previous case. */
     struct casefile *casefile;  /* Accumulates data for a split. */
 
@@ -580,7 +598,8 @@ static bool multipass_split_output (struct multipass_split_aux_data *);
 
 /* Returns true if successful, false if an I/O error occurred. */
 bool
-multipass_procedure_with_splits (bool (*split_func) (const struct ccase *first,
+multipass_procedure_with_splits (struct dataset *ds, 
+                                bool (*split_func) (const struct ccase *first,
                                                      const struct casefile *,
                                                      void *aux),
                                  void *func_aux)
@@ -592,8 +611,9 @@ multipass_procedure_with_splits (bool (*split_func) (const struct ccase *first,
   aux.casefile = NULL;
   aux.split_func = split_func;
   aux.func_aux = func_aux;
+  aux.dataset = ds;
 
-  ok = internal_procedure (multipass_split_case_func,
+  ok = internal_procedure (ds, multipass_split_case_func,
                            multipass_split_end_func, &aux);
   case_destroy (&aux.prev_case);
 
@@ -605,10 +625,11 @@ static bool
 multipass_split_case_func (const struct ccase *c, void *aux_)
 {
   struct multipass_split_aux_data *aux = aux_;
+  struct dataset *ds = aux->dataset;
   bool ok = true;
 
   /* Start a new series if needed. */
-  if (aux->casefile == NULL || !equal_splits (c, &aux->prev_case))
+  if (aux->casefile == NULL || ! equal_splits (c, &aux->prev_case, ds))
     {
       /* Record split values. */
       case_destroy (&aux->prev_case);
@@ -619,7 +640,8 @@ multipass_split_case_func (const struct ccase *c, void *aux_)
         ok = multipass_split_output (aux);
 
       /* Start a new casefile. */
-      aux->casefile = fastfile_create (dict_get_next_value_idx (default_dict));
+      aux->casefile = 
+       fastfile_create (dict_get_next_value_idx (ds->dict));
     }
 
   return casefile_append (aux->casefile, c) && ok;
@@ -649,30 +671,30 @@ multipass_split_output (struct multipass_split_aux_data *aux)
 /* Discards all the current state in preparation for a data-input
    command like DATA LIST or GET. */
 void
-discard_variables (void)
+discard_variables (struct dataset *ds)
 {
-  dict_clear (default_dict);
+  dict_clear (ds->dict);
   fh_set_default_handle (NULL);
 
-  n_lag = 0;
+  ds->n_lag = 0;
   
-  free_case_source (proc_source);
-  proc_source = NULL;
+  free_case_source (ds->proc_source);
+  ds->proc_source = NULL;
 
-  proc_cancel_all_transformations ();
+  proc_cancel_all_transformations (ds);
 }
 \f
 /* Returns the current set of permanent transformations,
    and clears the permanent transformations.
    For use by INPUT PROGRAM. */
 struct trns_chain *
-proc_capture_transformations (void
+proc_capture_transformations (struct dataset *ds
 {
   struct trns_chain *chain;
   
-  assert (temporary_trns_chain == NULL);
-  chain = permanent_trns_chain;
-  cur_trns_chain = permanent_trns_chain = trns_chain_create ();
+  assert (ds->temporary_trns_chain == NULL);
+  chain = ds->permanent_trns_chain;
+  ds->cur_trns_chain = ds->permanent_trns_chain = trns_chain_create ();
   return chain;
 }
 
@@ -680,9 +702,9 @@ proc_capture_transformations (void)
    frees itself with FREE to the current set of transformations.
    The functions are passed AUX as auxiliary data. */
 void
-add_transformation (trns_proc_func *proc, trns_free_func *free, void *aux)
+add_transformation (struct dataset *ds, trns_proc_func *proc, trns_free_func *free, void *aux)
 {
-  trns_chain_append (cur_trns_chain, NULL, proc, free, aux);
+  trns_chain_append (ds->cur_trns_chain, NULL, proc, free, aux);
 }
 
 /* Adds a transformation that processes a case with PROC and
@@ -691,44 +713,45 @@ add_transformation (trns_proc_func *proc, trns_free_func *free, void *aux)
    FINALIZE will be called.
    The functions are passed AUX as auxiliary data. */
 void
-add_transformation_with_finalizer (trns_finalize_func *finalize,
+add_transformation_with_finalizer (struct dataset *ds, 
+                                  trns_finalize_func *finalize,
                                    trns_proc_func *proc,
                                    trns_free_func *free, void *aux)
 {
-  trns_chain_append (cur_trns_chain, finalize, proc, free, aux);
+  trns_chain_append (ds->cur_trns_chain, finalize, proc, free, aux);
 }
 
 /* Returns the index of the next transformation.
    This value can be returned by a transformation procedure
    function to indicate a "jump" to that transformation. */
 size_t
-next_transformation (void
+next_transformation (const struct dataset *ds
 {
-  return trns_chain_next (cur_trns_chain);
+  return trns_chain_next (ds->cur_trns_chain);
 }
 
 /* Returns true if the next call to add_transformation() will add
    a temporary transformation, false if it will add a permanent
    transformation. */
 bool
-proc_in_temporary_transformations (void
+proc_in_temporary_transformations (const struct dataset *ds
 {
-  return temporary_trns_chain != NULL;
+  return ds->temporary_trns_chain != NULL;
 }
 
 /* Marks the start of temporary transformations.
    Further calls to add_transformation() will add temporary
    transformations. */
 void
-proc_start_temporary_transformations (void
+proc_start_temporary_transformations (struct dataset *ds
 {
-  if (!proc_in_temporary_transformations ())
+  if (!proc_in_temporary_transformations (ds))
     {
-      add_case_limit_trns ();
+      add_case_limit_trns (ds);
 
-      permanent_dict = dict_clone (default_dict);
-      trns_chain_finalize (permanent_trns_chain);
-      temporary_trns_chain = cur_trns_chain = trns_chain_create ();
+      ds->permanent_dict = dict_clone (ds->dict);
+      trns_chain_finalize (ds->permanent_trns_chain);
+      ds->temporary_trns_chain = ds->cur_trns_chain = trns_chain_create ();
     }
 }
 
@@ -737,16 +760,16 @@ proc_start_temporary_transformations (void)
    permanent.
    Returns true if anything changed, false otherwise. */
 bool
-proc_make_temporary_transformations_permanent (void
+proc_make_temporary_transformations_permanent (struct dataset *ds
 {
-  if (proc_in_temporary_transformations ()) 
+  if (proc_in_temporary_transformations (ds)) 
     {
-      trns_chain_finalize (temporary_trns_chain);
-      trns_chain_splice (permanent_trns_chain, temporary_trns_chain);
-      temporary_trns_chain = NULL;
+      trns_chain_finalize (ds->temporary_trns_chain);
+      trns_chain_splice (ds->permanent_trns_chain, ds->temporary_trns_chain);
+      ds->temporary_trns_chain = NULL;
 
-      dict_destroy (permanent_dict);
-      permanent_dict = NULL;
+      dict_destroy (ds->permanent_dict);
+      ds->permanent_dict = NULL;
 
       return true;
     }
@@ -758,16 +781,16 @@ proc_make_temporary_transformations_permanent (void)
    transformations will be permanent.
    Returns true if anything changed, false otherwise. */
 bool
-proc_cancel_temporary_transformations (void
+proc_cancel_temporary_transformations (struct dataset *ds
 {
-  if (proc_in_temporary_transformations ()) 
+  if (proc_in_temporary_transformations (ds)) 
     {
-      dict_destroy (default_dict);
-      default_dict = permanent_dict;
-      permanent_dict = NULL;
+      dict_destroy (ds->dict);
+      ds->dict = ds->permanent_dict;
+      ds->permanent_dict = NULL;
 
-      trns_chain_destroy (temporary_trns_chain);
-      temporary_trns_chain = NULL;
+      trns_chain_destroy (ds->temporary_trns_chain);
+      ds->temporary_trns_chain = NULL;
 
       return true;
     }
@@ -778,56 +801,59 @@ proc_cancel_temporary_transformations (void)
 /* Cancels all transformations, if any.
    Returns true if successful, false on I/O error. */
 bool
-proc_cancel_all_transformations (void)
+proc_cancel_all_transformations (struct dataset *ds)
 {
   bool ok;
-  ok = trns_chain_destroy (permanent_trns_chain);
-  ok = trns_chain_destroy (temporary_trns_chain) && ok;
-  permanent_trns_chain = cur_trns_chain = trns_chain_create ();
-  temporary_trns_chain = NULL;
+  ok = trns_chain_destroy (ds->permanent_trns_chain);
+  ok = trns_chain_destroy (ds->temporary_trns_chain) && ok;
+  ds->permanent_trns_chain = ds->cur_trns_chain = trns_chain_create ();
+  ds->temporary_trns_chain = NULL;
   return ok;
 }
 \f
 /* Initializes procedure handling. */
-void
-proc_init (void) 
+struct dataset *
+create_dataset (void)
 {
-  default_dict = dict_create ();
-  proc_cancel_all_transformations ();
+  struct dataset *ds = xzalloc (sizeof(*ds));
+  ds->dict = dict_create ();
+  proc_cancel_all_transformations (ds);
+  return ds;
 }
 
 /* Finishes up procedure handling. */
 void
-proc_done (void)
+destroy_dataset (struct dataset *ds)
 {
-  discard_variables ();
-  dict_destroy (default_dict);
+  discard_variables (ds);
+  dict_destroy (ds->dict);
+  free (ds);
 }
 
 /* Sets SINK as the destination for procedure output from the
    next procedure. */
 void
-proc_set_sink (struct case_sink *sink) 
+proc_set_sink (struct dataset *ds, struct case_sink *sink) 
 {
-  assert (proc_sink == NULL);
-  proc_sink = sink;
+  assert (ds->proc_sink == NULL);
+  ds->proc_sink = sink;
 }
 
 /* Sets SOURCE as the source for procedure input for the next
    procedure. */
 void
-proc_set_source (struct case_source *source) 
+proc_set_source (struct dataset *ds, struct case_source *source) 
 {
-  assert (proc_source == NULL);
-  proc_source = source;
+  assert (ds->proc_source == NULL);
+  ds->proc_source = source;
 }
 
 /* Returns true if a source for the next procedure has been
    configured, false otherwise. */
 bool
-proc_has_source (void
+proc_has_source (const struct dataset *ds
 {
-  return proc_source != NULL;
+  return ds->proc_source != NULL;
 }
 
 /* Returns the output from the previous procedure.
@@ -835,19 +861,19 @@ proc_has_source (void)
    The returned casefile is owned by the caller; it will not be
    automatically used for the next procedure's input. */
 struct casefile *
-proc_capture_output (void
+proc_capture_output (struct dataset *ds
 {
   struct casefile *casefile;
 
   /* Try to make sure that this function is called immediately
      after procedure() or a similar function. */
-  assert (proc_source != NULL);
-  assert (case_source_is_class (proc_source, &storage_source_class));
-  assert (trns_chain_is_empty (permanent_trns_chain));
-  assert (!proc_in_temporary_transformations ());
+  assert (ds->proc_source != NULL);
+  assert (case_source_is_class (ds->proc_source, &storage_source_class));
+  assert (trns_chain_is_empty (ds->permanent_trns_chain));
+  assert (!proc_in_temporary_transformations (ds));
 
-  casefile = storage_source_decapsulate (proc_source);
-  proc_source = NULL;
+  casefile = storage_source_decapsulate (ds->proc_source);
+  ds->proc_source = NULL;
 
   return casefile;
 }
@@ -856,18 +882,18 @@ static trns_proc_func case_limit_trns_proc;
 static trns_free_func case_limit_trns_free;
 
 /* Adds a transformation that limits the number of cases that may
-   pass through, if default_dict has a case limit. */
+   pass through, if DS->DICT has a case limit. */
 static void
-add_case_limit_trns (void
+add_case_limit_trns (struct dataset *ds
 {
-  size_t case_limit = dict_get_case_limit (default_dict);
+  size_t case_limit = dict_get_case_limit (ds->dict);
   if (case_limit != 0)
     {
       size_t *cases_remaining = xmalloc (sizeof *cases_remaining);
       *cases_remaining = case_limit;
-      add_transformation (case_limit_trns_proc, case_limit_trns_free,
+      add_transformation (ds, case_limit_trns_proc, case_limit_trns_free,
                           cases_remaining);
-      dict_set_case_limit (default_dict, 0);
+      dict_set_case_limit (ds->dict, 0);
     }
 }
 
@@ -901,13 +927,13 @@ static trns_proc_func filter_trns_proc;
 /* Adds a temporary transformation to filter data according to
    the variable specified on FILTER, if any. */
 static void
-add_filter_trns (void
+add_filter_trns (struct dataset *ds
 {
-  struct variable *filter_var = dict_get_filter (default_dict);
+  struct variable *filter_var = dict_get_filter (ds->dict);
   if (filter_var != NULL) 
     {
-      proc_start_temporary_transformations ();
-      add_transformation (filter_trns_proc, NULL, filter_var);
+      proc_start_temporary_transformations (ds);
+      add_transformation (ds, filter_trns_proc, NULL, filter_var);
     }
 }
 
@@ -923,3 +949,30 @@ filter_trns_proc (void *filter_var_,
           ? TRNS_CONTINUE : TRNS_DROP_CASE);
 }
 
+
+struct dictionary *
+dataset_dict (const struct dataset *ds)
+{
+  return ds->dict;
+}
+
+
+void 
+dataset_set_dict (struct dataset *ds, struct dictionary *dict)
+{
+  ds->dict = dict;
+}
+
+int 
+dataset_n_lag (const struct dataset *ds)
+{
+  return ds->n_lag;
+}
+
+void 
+dataset_set_n_lag (struct dataset *ds, int n_lag)
+{
+  ds->n_lag = n_lag;
+}
+
+