Continue reforming procedure execution. In this phase, get rid of
[pspp-builds.git] / src / procedure.c
index 67f22c5c0dfca5a0e0442feb0a8888973bd222a1..9b94da01910e4743c3cc58955d7024e5ef2637b9 100644 (file)
@@ -35,9 +35,9 @@
 #include <data/file-handle-def.h>
 #include <data/settings.h>
 #include <data/storage-stream.h>
 #include <data/file-handle-def.h>
 #include <data/settings.h>
 #include <data/storage-stream.h>
+#include <data/transformations.h>
 #include <data/value-labels.h>
 #include <data/variable.h>
 #include <data/value-labels.h>
 #include <data/variable.h>
-#include <language/control/control-stack.h>
 #include <libpspp/alloc.h>
 #include <libpspp/message.h>
 #include <libpspp/misc.h>
 #include <libpspp/alloc.h>
 #include <libpspp/message.h>
 #include <libpspp/misc.h>
@@ -71,32 +71,40 @@ struct write_case_data
     size_t cases_analyzed;      /* Cases passed to procedure so far. */
   };
 
     size_t cases_analyzed;      /* Cases passed to procedure so far. */
   };
 
-/* The current active file, from which cases are read. */
-struct case_source *vfm_source;
-
-/* The replacement active file, to which cases are written. */
-struct case_sink *vfm_sink;
-
-/* The compactor used to compact a compact, if necessary;
+/* Cases are read from vfm_source,
+   pass through permanent_trns_chain (which transforms them into
+   the format described by permanent_dict),
+   are written to vfm_sink,
+   pass through temporary_trns_chain (which transforms them into
+   the format described by default_dict),
+   and are finally passed to the procedure. */
+static struct case_source *vfm_source;
+static struct trns_chain *permanent_trns_chain;
+static struct dictionary *permanent_dict;
+static struct case_sink *vfm_sink;
+static struct trns_chain *temporary_trns_chain;
+struct dictionary *default_dict;
+
+/* The transformation chain that the next transformation will be
+   added to. */
+static struct trns_chain *cur_trns_chain;
+
+/* The compactor used to compact a case, if necessary;
    otherwise a null pointer. */
 static struct dict_compactor *compactor;
 
 /* Time at which vfm was last invoked. */
 static time_t last_vfm_invocation;
 
    otherwise a null pointer. */
 static struct dict_compactor *compactor;
 
 /* Time at which vfm was last invoked. */
 static time_t last_vfm_invocation;
 
-/* Whether we're inside a procedure.
-   For debugging purposes only. */
-static bool in_procedure;
-
 /* Lag queue. */
 int n_lag;                     /* Number of cases to lag. */
 static int lag_count;          /* Number of cases in lag_queue so far. */
 static int lag_head;           /* Index where next case will be added. */
 static struct ccase *lag_queue; /* Array of n_lag ccase * elements. */
 
 /* Lag queue. */
 int n_lag;                     /* Number of cases to lag. */
 static int lag_count;          /* Number of cases in lag_queue so far. */
 static int lag_head;           /* Index where next case will be added. */
 static struct ccase *lag_queue; /* Array of n_lag ccase * elements. */
 
-/* Active transformations. */
-struct transformation *t_trns;
-size_t n_trns, m_trns, f_trns;
+static void add_case_limit_trns (void);
+static void add_filter_trns (void);
+static void add_process_if_trns (void);
 
 static bool internal_procedure (bool (*proc_func) (struct ccase *, void *),
                                 void *aux);
 
 static bool internal_procedure (bool (*proc_func) (struct ccase *, void *),
                                 void *aux);
@@ -104,11 +112,6 @@ static void update_last_vfm_invocation (void);
 static void create_trns_case (struct ccase *, struct dictionary *);
 static void open_active_file (void);
 static bool write_case (struct write_case_data *wc_data);
 static void create_trns_case (struct ccase *, struct dictionary *);
 static void open_active_file (void);
 static bool write_case (struct write_case_data *wc_data);
-static int execute_transformations (struct ccase *c,
-                                    struct transformation *trns,
-                                    int first_idx, int last_idx,
-                                    int case_num);
-static int filter_case (const struct ccase *c, int case_num);
 static void lag_case (const struct ccase *c);
 static void clear_case (struct ccase *c);
 static bool close_active_file (void);
 static void lag_case (const struct ccase *c);
 static void clear_case (struct ccase *c);
 static bool close_active_file (void);
@@ -126,26 +129,17 @@ time_of_last_procedure (void)
 
 /* Reads the data from the input program and writes it to a new
    active file.  For each case we read from the input program, we
 
 /* Reads the data from the input program and writes it to a new
    active file.  For each case we read from the input program, we
-   do the following
+   do the following:
 
    1. Execute permanent transformations.  If these drop the case,
       start the next case from step 1.
 
 
    1. Execute permanent transformations.  If these drop the case,
       start the next case from step 1.
 
-   2. N OF CASES.  If we have already written N cases, start the
-      next case from step 1.
-   
-   3. Write case to replacement active file.
+   2. Write case to replacement active file.
    
    
-   4. Execute temporary transformations.  If these drop the case,
+   3. Execute temporary transformations.  If these drop the case,
       start the next case from step 1.
       
       start the next case from step 1.
       
-   5. FILTER, PROCESS IF.  If these drop the case, start the next
-      case from step 1.
-   
-   6. Post-TEMPORARY N OF CASES.  If we have already analyzed N
-      cases, start the next case from step 1.
-      
-   7. Pass case to PROC_FUNC, passing AUX as auxiliary data.
+   4. Pass case to PROC_FUNC, passing AUX as auxiliary data.
 
    Returns true if successful, false if an I/O error occurred. */
 bool
 
    Returns true if successful, false if an I/O error occurred. */
 bool
@@ -154,10 +148,14 @@ procedure (bool (*proc_func) (struct ccase *, void *), void *aux)
   if (proc_func == NULL
       && case_source_is_class (vfm_source, &storage_source_class)
       && vfm_sink == NULL
   if (proc_func == NULL
       && case_source_is_class (vfm_source, &storage_source_class)
       && vfm_sink == NULL
-      && !temporary
-      && n_trns == 0)
+      && temporary_trns_chain == NULL
+      && trns_chain_is_empty (permanent_trns_chain))
     {
     {
-      /* Nothing to do. */
+      expr_free (process_if_expr);
+      process_if_expr = NULL;
+      dict_set_case_limit (default_dict, 0);
+      dict_clear_vectors (default_dict);
+
       update_last_vfm_invocation ();
       return true;
     }
       update_last_vfm_invocation ();
       return true;
     }
@@ -167,8 +165,57 @@ procedure (bool (*proc_func) (struct ccase *, void *), void *aux)
       
       open_active_file ();
       ok = internal_procedure (proc_func, aux);
       
       open_active_file ();
       ok = internal_procedure (proc_func, aux);
-      if (!close_active_file ())
-        ok = false;
+      ok = close_active_file () && ok;
+
+      return ok;
+    }
+}
+
+/* Callback function for multipass_procedure(). */
+static bool
+multipass_callback (struct ccase *c, void *cf_) 
+{
+  struct casefile *cf = cf_;
+  return casefile_append (cf, c);
+}
+
+/* Procedure that allows multiple passes over the input data.
+   The entire active file is passed to PROC_FUNC, with the given
+   AUX as auxiliary data, as a unit. */
+bool
+multipass_procedure (bool (*proc_func) (const struct casefile *, void *aux),
+                     void *aux) 
+{
+  if (case_source_is_class (vfm_source, &storage_source_class)
+      && vfm_sink == NULL
+      && temporary_trns_chain == NULL
+      && trns_chain_is_empty (permanent_trns_chain))
+    {
+      proc_func (storage_source_get_casefile (vfm_source), aux);
+
+      expr_free (process_if_expr);
+      process_if_expr = NULL;
+      dict_set_case_limit (default_dict, 0);
+      dict_clear_vectors (default_dict);
+
+      update_last_vfm_invocation ();
+      return true;
+    }
+  else 
+    {
+      struct casefile *cf;
+      bool ok;
+
+      assert (proc_func != NULL);
+
+      cf = casefile_create (dict_get_next_value_idx (default_dict));
+
+      open_active_file ();
+      ok = internal_procedure (multipass_callback, cf);
+      ok = proc_func (cf, aux) && ok;
+      ok = close_active_file () && ok;
+
+      casefile_destroy (cf);
 
       return ok;
     }
 
       return ok;
     }
@@ -237,25 +284,26 @@ create_trns_case (struct ccase *trns_case, struct dictionary *dict)
 static void
 open_active_file (void)
 {
 static void
 open_active_file (void)
 {
-  assert (!in_procedure);
-  in_procedure = true;
+  add_case_limit_trns ();
+  add_filter_trns ();
+  add_process_if_trns ();
 
 
-  /* Make temp_dict refer to the dictionary right before data
-     reaches the sink */
-  if (!temporary)
-    {
-      temp_trns = n_trns;
-      temp_dict = default_dict;
-    }
+  /* Finalize transformations. */
+  trns_chain_finalize (cur_trns_chain);
+
+  /* Make permanent_dict refer to the dictionary right before
+     data reaches the sink. */
+  if (permanent_dict == NULL)
+    permanent_dict = default_dict;
 
   /* Figure out compaction. */
 
   /* Figure out compaction. */
-  compactor = (dict_needs_compaction (temp_dict)
-               ? dict_make_compactor (temp_dict)
+  compactor = (dict_needs_compaction (permanent_dict)
+               ? dict_make_compactor (permanent_dict)
                : NULL);
 
   /* Prepare sink. */
   if (vfm_sink == NULL)
                : NULL);
 
   /* Prepare sink. */
   if (vfm_sink == NULL)
-    vfm_sink = create_case_sink (&storage_sink_class, temp_dict, NULL);
+    vfm_sink = create_case_sink (&storage_sink_class, permanent_dict, NULL);
   if (vfm_sink->class->open != NULL)
     vfm_sink->class->open (vfm_sink);
 
   if (vfm_sink->class->open != NULL)
     vfm_sink->class->open (vfm_sink);
 
@@ -270,9 +318,6 @@ open_active_file (void)
       for (i = 0; i < n_lag; i++)
         case_nullify (&lag_queue[i]);
     }
       for (i = 0; i < n_lag; i++)
         case_nullify (&lag_queue[i]);
     }
-
-  /* Close any unclosed DO IF or LOOP constructs. */
-  ctl_stack_clear ();
 }
 
 /* Transforms trns_case and writes it to the replacement active
 }
 
 /* Transforms trns_case and writes it to the replacement active
@@ -282,25 +327,22 @@ open_active_file (void)
 static bool
 write_case (struct write_case_data *wc_data)
 {
 static bool
 write_case (struct write_case_data *wc_data)
 {
-  int retval;
+  enum trns_result retval;
+  size_t case_nr;
   
   /* Execute permanent transformations.  */
   
   /* Execute permanent transformations.  */
-  retval = execute_transformations (&wc_data->trns_case, t_trns, f_trns,
-                                    temp_trns, wc_data->cases_written + 1);
-  if (retval != 1)
+  case_nr = wc_data->cases_written + 1;
+  retval = trns_chain_execute (permanent_trns_chain,
+                               &wc_data->trns_case, &case_nr);
+  if (retval != TRNS_CONTINUE)
     goto done;
 
     goto done;
 
-  /* N OF CASES. */
-  if (dict_get_case_limit (default_dict)
-      && wc_data->cases_written >= dict_get_case_limit (default_dict))
-    goto done;
-  wc_data->cases_written++;
-
   /* Write case to LAG queue. */
   if (n_lag)
     lag_case (&wc_data->trns_case);
 
   /* Write case to replacement active file. */
   /* Write case to LAG queue. */
   if (n_lag)
     lag_case (&wc_data->trns_case);
 
   /* Write case to replacement active file. */
+  wc_data->cases_written++;
   if (vfm_sink->class->write != NULL) 
     {
       if (compactor != NULL) 
   if (vfm_sink->class->write != NULL) 
     {
       if (compactor != NULL) 
@@ -314,94 +356,24 @@ write_case (struct write_case_data *wc_data)
     }
   
   /* Execute temporary transformations. */
     }
   
   /* Execute temporary transformations. */
-  retval = execute_transformations (&wc_data->trns_case, t_trns, temp_trns,
-                                    n_trns, wc_data->cases_written);
-  if (retval != 1)
-    goto done;
-  
-  /* FILTER, PROCESS IF, post-TEMPORARY N OF CASES. */
-  if (filter_case (&wc_data->trns_case, wc_data->cases_written)
-      || (dict_get_case_limit (temp_dict)
-          && wc_data->cases_analyzed >= dict_get_case_limit (temp_dict)))
-    goto done;
-  wc_data->cases_analyzed++;
+  if (temporary_trns_chain != NULL) 
+    {
+      retval = trns_chain_execute (temporary_trns_chain,
+                                   &wc_data->trns_case,
+                                   &wc_data->cases_written);
+      if (retval != TRNS_CONTINUE)
+        goto done;
+    }
 
   /* Pass case to procedure. */
 
   /* Pass case to procedure. */
+  wc_data->cases_analyzed++;
   if (wc_data->proc_func != NULL)
     if (!wc_data->proc_func (&wc_data->trns_case, wc_data->aux))
   if (wc_data->proc_func != NULL)
     if (!wc_data->proc_func (&wc_data->trns_case, wc_data->aux))
-      retval = -1;
+      retval = TRNS_ERROR;
 
  done:
   clear_case (&wc_data->trns_case);
 
  done:
   clear_case (&wc_data->trns_case);
-  return retval != -1;
-}
-
-/* Transforms case C using the transformations in TRNS[] with
-   indexes FIRST_IDX through LAST_IDX, exclusive.  Case C will
-   become case CASE_NUM (1-based) in the output file.  Returns 1
-   if the case was successfully transformed, 0 if it was filtered
-   out by one of the transformations, or -1 if the procedure
-   should be abandoned due to a fatal error. */
-static int
-execute_transformations (struct ccase *c,
-                         struct transformation *trns,
-                         int first_idx, int last_idx,
-                         int case_num) 
-{
-  int idx;
-
-  for (idx = first_idx; idx != last_idx; )
-    {
-      struct transformation *t = &trns[idx];
-      int retval = t->proc (t->private, c, case_num);
-      switch (retval)
-        {
-        case TRNS_CONTINUE:
-          idx++;
-          break;
-          
-        case TRNS_DROP_CASE:
-          return 0;
-
-        case TRNS_ERROR:
-          return -1;
-
-        case TRNS_NEXT_CASE:
-          abort ();
-
-        case TRNS_END_FILE:
-          abort ();
-          
-        default:
-          idx = retval;
-          break;
-        }
-    }
-
-  return 1;
-}
-
-/* Returns nonzero if case C with case number CASE_NUM should be
-   excluded as specified on FILTER or PROCESS IF, otherwise
-   zero. */
-static int
-filter_case (const struct ccase *c, int case_idx)
-{
-  /* FILTER. */
-  struct variable *filter_var = dict_get_filter (default_dict);
-  if (filter_var != NULL) 
-    {
-      double f = case_num (c, filter_var->fv);
-      if (f == 0.0 || mv_is_num_missing (&filter_var->miss, f))
-        return 1;
-    }
-
-  /* PROCESS IF. */
-  if (process_if_expr != NULL
-      && expr_evaluate_num (process_if_expr, c, case_idx) != 1.0)
-    return 1;
-
-  return 0;
+  return retval != TRNS_ERROR;
 }
 
 /* Add C to the lag queue. */
 }
 
 /* Add C to the lag queue. */
@@ -452,13 +424,8 @@ close_active_file (void)
       n_lag = 0;
     }
   
       n_lag = 0;
     }
   
-  /* Dictionary from before TEMPORARY becomes permanent.. */
-  if (temporary)
-    {
-      dict_destroy (default_dict);
-      default_dict = temp_dict;
-      temp_dict = NULL;
-    }
+  /* Dictionary from before TEMPORARY becomes permanent. */
+  proc_cancel_temporary_transformations ();
 
   /* Finish compaction. */
   if (compactor != NULL) 
 
   /* Finish compaction. */
   if (compactor != NULL) 
@@ -479,16 +446,9 @@ close_active_file (void)
 
   /* Cancel TEMPORARY, PROCESS IF, FILTER, N OF CASES, vectors,
      and get rid of all the transformations. */
 
   /* Cancel TEMPORARY, PROCESS IF, FILTER, N OF CASES, vectors,
      and get rid of all the transformations. */
-  cancel_temporary ();
-  expr_free (process_if_expr);
-  process_if_expr = NULL;
-  dict_set_case_limit (default_dict, 0);
   dict_clear_vectors (default_dict);
   dict_clear_vectors (default_dict);
-
-  assert (in_procedure);
-  in_procedure = false;
-
-  return cancel_transformations ();
+  permanent_dict = NULL;
+  return proc_cancel_all_transformations ();
 }
 \f
 /* Returns a pointer to the lagged case from N_BEFORE cases before the
 }
 \f
 /* Returns a pointer to the lagged case from N_BEFORE cases before the
@@ -509,56 +469,6 @@ lagged_case (int n_before)
   else
     return NULL;
 }
   else
     return NULL;
 }
-   
-/* Appends TRNS to t_trns[], the list of all transformations to be
-   performed on data as it is read from the active file. */
-void
-add_transformation (trns_proc_func *proc, trns_free_func *free, void *private)
-{
-  struct transformation *trns;
-
-  assert (!in_procedure);
-
-  if (n_trns >= m_trns)
-    t_trns = x2nrealloc (t_trns, &m_trns, sizeof *t_trns);
-  trns = &t_trns[n_trns++];
-  trns->proc = proc;
-  trns->free = free;
-  trns->private = private;
-}
-
-/* Returns the index number that the next transformation added by
-   add_transformation() will receive.  A trns_proc_func that
-   returns this index causes control flow to jump to it. */
-size_t
-next_transformation (void) 
-{
-  return n_trns;
-}
-
-/* Cancels all active transformations, including any transformations
-   created by the input program.
-   Returns true if successful, false if an I/O error occurred. */
-bool
-cancel_transformations (void)
-{
-  bool ok = true;
-  size_t i;
-  for (i = 0; i < n_trns; i++)
-    {
-      struct transformation *t = &t_trns[i];
-      if (t->free != NULL) 
-        {
-          if (!t->free (t->private))
-            ok = false; 
-        }
-    }
-  n_trns = f_trns = 0;
-  free (t_trns);
-  t_trns = NULL;
-  m_trns = 0;
-  return ok;
-}
 \f
 /* Represents auxiliary data for handling SPLIT FILE. */
 struct split_aux_data 
 \f
 /* Represents auxiliary data for handling SPLIT FILE. */
 struct split_aux_data 
@@ -714,7 +624,7 @@ struct multipass_split_aux_data
   };
 
 static bool multipass_split_callback (struct ccase *c, void *aux_);
   };
 
 static bool multipass_split_callback (struct ccase *c, void *aux_);
-static void multipass_split_output (struct multipass_split_aux_data *);
+static bool multipass_split_output (struct multipass_split_aux_data *);
 
 /* Returns true if successful, false if an I/O error occurred. */
 bool
 
 /* Returns true if successful, false if an I/O error occurred. */
 bool
@@ -736,7 +646,7 @@ multipass_procedure_with_splits (bool (*split_func) (const struct casefile *,
 
   ok = internal_procedure (multipass_split_callback, &aux);
   if (aux.casefile != NULL)
 
   ok = internal_procedure (multipass_split_callback, &aux);
   if (aux.casefile != NULL)
-    multipass_split_output (&aux);
+    ok = multipass_split_output (&aux) && ok;
   case_destroy (&aux.prev_case);
 
   if (!close_active_file ())
   case_destroy (&aux.prev_case);
 
   if (!close_active_file ())
@@ -750,13 +660,14 @@ static bool
 multipass_split_callback (struct ccase *c, void *aux_)
 {
   struct multipass_split_aux_data *aux = aux_;
 multipass_split_callback (struct ccase *c, void *aux_)
 {
   struct multipass_split_aux_data *aux = aux_;
+  bool ok = true;
 
   /* Start a new series if needed. */
   if (aux->casefile == NULL || !equal_splits (c, &aux->prev_case))
     {
       /* Pass any cases to split_func. */
       if (aux->casefile != NULL)
 
   /* Start a new series if needed. */
   if (aux->casefile == NULL || !equal_splits (c, &aux->prev_case))
     {
       /* Pass any cases to split_func. */
       if (aux->casefile != NULL)
-        multipass_split_output (aux);
+        ok = multipass_split_output (aux);
 
       /* Start a new casefile. */
       aux->casefile = casefile_create (dict_get_next_value_idx (default_dict));
 
       /* Start a new casefile. */
       aux->casefile = casefile_create (dict_get_next_value_idx (default_dict));
@@ -767,18 +678,21 @@ multipass_split_callback (struct ccase *c, void *aux_)
       case_clone (&aux->prev_case, c);
     }
 
       case_clone (&aux->prev_case, c);
     }
 
-  return casefile_append (aux->casefile, c);
+  return casefile_append (aux->casefile, c) && ok;
 }
 
 }
 
-static void
+static bool
 multipass_split_output (struct multipass_split_aux_data *aux)
 {
 multipass_split_output (struct multipass_split_aux_data *aux)
 {
+  bool ok;
+  
   assert (aux->casefile != NULL);
   assert (aux->casefile != NULL);
-  aux->split_func (aux->casefile, aux->func_aux);
+  ok = aux->split_func (aux->casefile, aux->func_aux);
   casefile_destroy (aux->casefile);
   aux->casefile = NULL;
   casefile_destroy (aux->casefile);
   aux->casefile = NULL;
-}
 
 
+  return ok;
+}
 
 /* Discards all the current state in preparation for a data-input
    command like DATA LIST or GET. */
 
 /* Discards all the current state in preparation for a data-input
    command like DATA LIST or GET. */
@@ -796,12 +710,307 @@ discard_variables (void)
       vfm_source = NULL;
     }
 
       vfm_source = NULL;
     }
 
-  cancel_transformations ();
-
-  ctl_stack_clear ();
+  proc_cancel_all_transformations ();
 
   expr_free (process_if_expr);
   process_if_expr = NULL;
 
 
   expr_free (process_if_expr);
   process_if_expr = NULL;
 
-  cancel_temporary ();
+  proc_cancel_temporary_transformations ();
+}
+\f
+/* Returns the current set of permanent transformations,
+   and clears the permanent transformations.
+   For use by INPUT PROGRAM. */
+struct trns_chain *
+proc_capture_transformations (void) 
+{
+  struct trns_chain *chain;
+  
+  assert (temporary_trns_chain == NULL);
+  chain = permanent_trns_chain;
+  cur_trns_chain = permanent_trns_chain = trns_chain_create ();
+  return chain;
+}
+
+/* Adds a transformation that processes a case with PROC and
+   frees itself with FREE to the current set of transformations.
+   The functions are passed AUX as auxiliary data. */
+void
+add_transformation (trns_proc_func *proc, trns_free_func *free, void *aux)
+{
+  trns_chain_append (cur_trns_chain, NULL, proc, free, aux);
+}
+
+/* Adds a transformation that processes a case with PROC and
+   frees itself with FREE to the current set of transformations.
+   When parsing of the block of transformations is complete,
+   FINALIZE will be called.
+   The functions are passed AUX as auxiliary data. */
+void
+add_transformation_with_finalizer (trns_finalize_func *finalize,
+                                   trns_proc_func *proc,
+                                   trns_free_func *free, void *aux)
+{
+  trns_chain_append (cur_trns_chain, finalize, proc, free, aux);
+}
+
+/* Returns the index of the next transformation.
+   This value can be returned by a transformation procedure
+   function to indicate a "jump" to that transformation. */
+size_t
+next_transformation (void) 
+{
+  return trns_chain_next (cur_trns_chain);
+}
+
+/* Returns true if the next call to add_transformation() will add
+   a temporary transformation, false if it will add a permanent
+   transformation. */
+bool
+proc_in_temporary_transformations (void) 
+{
+  return temporary_trns_chain != NULL;
+}
+
+/* Marks the start of temporary transformations.
+   Further calls to add_transformation() will add temporary
+   transformations. */
+void
+proc_start_temporary_transformations (void) 
+{
+  if (!proc_in_temporary_transformations ())
+    {
+      add_case_limit_trns ();
+
+      permanent_dict = dict_clone (default_dict);
+      trns_chain_finalize (permanent_trns_chain);
+      temporary_trns_chain = cur_trns_chain = trns_chain_create ();
+    }
+}
+
+/* Converts all the temporary transformations, if any, to
+   permanent transformations.  Further transformations will be
+   permanent.
+   Returns true if anything changed, false otherwise. */
+bool
+proc_make_temporary_transformations_permanent (void) 
+{
+  if (proc_in_temporary_transformations ()) 
+    {
+      trns_chain_finalize (temporary_trns_chain);
+      trns_chain_splice (permanent_trns_chain, temporary_trns_chain);
+      temporary_trns_chain = NULL;
+
+      dict_destroy (permanent_dict);
+      permanent_dict = NULL;
+
+      return true;
+    }
+  else
+    return false;
+}
+
+/* Cancels all temporary transformations, if any.  Further
+   transformations will be permanent.
+   Returns true if anything changed, false otherwise. */
+bool
+proc_cancel_temporary_transformations (void) 
+{
+  if (proc_in_temporary_transformations ()) 
+    {
+      dict_destroy (default_dict);
+      default_dict = permanent_dict;
+      permanent_dict = NULL;
+
+      trns_chain_destroy (temporary_trns_chain);
+      temporary_trns_chain = NULL;
+
+      return true;
+    }
+  else
+    return false;
+}
+
+/* Cancels all transformations, if any.
+   Returns true if successful, false on I/O error. */
+bool
+proc_cancel_all_transformations (void)
+{
+  bool ok;
+  ok = trns_chain_destroy (permanent_trns_chain);
+  ok = trns_chain_destroy (temporary_trns_chain) && ok;
+  permanent_trns_chain = cur_trns_chain = trns_chain_create ();
+  temporary_trns_chain = NULL;
+  return ok;
+}
+\f
+/* Initializes procedure handling. */
+void
+proc_init (void) 
+{
+  default_dict = dict_create ();
+  proc_cancel_all_transformations ();
+}
+
+/* Finishes up procedure handling. */
+void
+proc_done (void)
+{
+  discard_variables ();
+}
+
+/* Sets SINK as the destination for procedure output from the
+   next procedure. */
+void
+proc_set_sink (struct case_sink *sink) 
+{
+  assert (vfm_sink == NULL);
+  vfm_sink = sink;
+}
+
+/* Sets SOURCE as the source for procedure input for the next
+   procedure. */
+void
+proc_set_source (struct case_source *source) 
+{
+  assert (vfm_source == NULL);
+  vfm_source = source;
+}
+
+/* Returns true if a source for the next procedure has been
+   configured, false otherwise. */
+bool
+proc_has_source (void) 
+{
+  return vfm_source != NULL;
+}
+
+/* Returns the output from the previous procedure.
+   For use only immediately after executing a procedure.
+   The returned casefile is owned by the caller; it will not be
+   automatically used for the next procedure's input. */
+struct casefile *
+proc_capture_output (void) 
+{
+  struct casefile *casefile;
+
+  /* Try to make sure that this function is called immediately
+     after procedure() or a similar function. */
+  assert (vfm_source != NULL);
+  assert (case_source_is_class (vfm_source, &storage_source_class));
+  assert (trns_chain_is_empty (permanent_trns_chain));
+  assert (!proc_in_temporary_transformations ());
+
+  casefile = storage_source_decapsulate (vfm_source);
+  vfm_source = NULL;
+
+  return casefile;
+}
+\f
+static trns_proc_func case_limit_trns_proc;
+static trns_free_func case_limit_trns_free;
+
+/* Adds a transformation that limits the number of cases that may
+   pass through, if default_dict has a case limit. */
+static void
+add_case_limit_trns (void) 
+{
+  size_t case_limit = dict_get_case_limit (default_dict);
+  if (case_limit != 0)
+    {
+      size_t *cases_remaining = xmalloc (sizeof *cases_remaining);
+      *cases_remaining = case_limit;
+      add_transformation (case_limit_trns_proc, case_limit_trns_free,
+                          cases_remaining);
+      dict_set_case_limit (default_dict, 0);
+    }
+}
+
+/* Limits the maximum number of cases processed to
+   *CASES_REMAINING. */
+static int
+case_limit_trns_proc (void *cases_remaining_,
+                      struct ccase *c UNUSED, int case_nr UNUSED) 
+{
+  size_t *cases_remaining = cases_remaining_;
+  if (*cases_remaining > 0) 
+    {
+      *cases_remaining--;
+      return TRNS_CONTINUE;
+    }
+  else
+    return TRNS_DROP_CASE;
+}
+
+/* Frees the data associated with a case limit transformation. */
+static bool
+case_limit_trns_free (void *cases_remaining_) 
+{
+  size_t *cases_remaining = cases_remaining_;
+  free (cases_remaining);
+  return true;
+}
+\f
+static trns_proc_func filter_trns_proc;
+
+/* Adds a temporary transformation to filter data according to
+   the variable specified on FILTER, if any. */
+static void
+add_filter_trns (void) 
+{
+  struct variable *filter_var = dict_get_filter (default_dict);
+  if (filter_var != NULL) 
+    {
+      proc_start_temporary_transformations ();
+      add_transformation (filter_trns_proc, NULL, filter_var);
+    }
+}
+
+/* FILTER transformation. */
+static int
+filter_trns_proc (void *filter_var_,
+                  struct ccase *c UNUSED, int case_nr UNUSED) 
+  
+{
+  struct variable *filter_var = filter_var_;
+  double f = case_num (c, filter_var->fv);
+  return (f != 0.0 && !mv_is_num_missing (&filter_var->miss, f)
+          ? TRNS_CONTINUE : TRNS_DROP_CASE);
+}
+\f
+static trns_proc_func process_if_trns_proc;
+static trns_free_func process_if_trns_free;
+
+/* Adds a temporary transformation to filter data according to
+   the expression specified on PROCESS IF, if any. */
+static void
+add_process_if_trns (void) 
+{
+  if (process_if_expr != NULL) 
+    {
+      proc_start_temporary_transformations ();
+      add_transformation (process_if_trns_proc, process_if_trns_free,
+                          process_if_expr);
+      process_if_expr = NULL;
+    }
+}
+
+/* PROCESS IF transformation. */
+static int
+process_if_trns_proc (void *expression_,
+                      struct ccase *c UNUSED, int case_nr UNUSED) 
+  
+{
+  struct expression *expression = expression_;
+  return (expr_evaluate_num (expression, c, case_nr) == 1.0
+          ? TRNS_CONTINUE : TRNS_DROP_CASE);
+}
+
+/* Frees a PROCESS IF transformation. */
+static bool
+process_if_trns_free (void *expression_) 
+{
+  struct expression *expression = expression_;
+  expr_free (expression);
+  return true;
 }
 }