Continue reforming procedure execution. In this phase, get rid of
[pspp-builds.git] / src / procedure.c
index 67f22c5c0dfca5a0e0442feb0a8888973bd222a1..9b94da01910e4743c3cc58955d7024e5ef2637b9 100644 (file)
@@ -35,9 +35,9 @@
 #include <data/file-handle-def.h>
 #include <data/settings.h>
 #include <data/storage-stream.h>
+#include <data/transformations.h>
 #include <data/value-labels.h>
 #include <data/variable.h>
-#include <language/control/control-stack.h>
 #include <libpspp/alloc.h>
 #include <libpspp/message.h>
 #include <libpspp/misc.h>
@@ -71,32 +71,40 @@ struct write_case_data
     size_t cases_analyzed;      /* Cases passed to procedure so far. */
   };
 
-/* The current active file, from which cases are read. */
-struct case_source *vfm_source;
-
-/* The replacement active file, to which cases are written. */
-struct case_sink *vfm_sink;
-
-/* The compactor used to compact a compact, if necessary;
+/* Cases are read from vfm_source,
+   pass through permanent_trns_chain (which transforms them into
+   the format described by permanent_dict),
+   are written to vfm_sink,
+   pass through temporary_trns_chain (which transforms them into
+   the format described by default_dict),
+   and are finally passed to the procedure. */
+static struct case_source *vfm_source;
+static struct trns_chain *permanent_trns_chain;
+static struct dictionary *permanent_dict;
+static struct case_sink *vfm_sink;
+static struct trns_chain *temporary_trns_chain;
+struct dictionary *default_dict;
+
+/* The transformation chain that the next transformation will be
+   added to. */
+static struct trns_chain *cur_trns_chain;
+
+/* The compactor used to compact a case, if necessary;
    otherwise a null pointer. */
 static struct dict_compactor *compactor;
 
 /* Time at which vfm was last invoked. */
 static time_t last_vfm_invocation;
 
-/* Whether we're inside a procedure.
-   For debugging purposes only. */
-static bool in_procedure;
-
 /* Lag queue. */
 int n_lag;                     /* Number of cases to lag. */
 static int lag_count;          /* Number of cases in lag_queue so far. */
 static int lag_head;           /* Index where next case will be added. */
 static struct ccase *lag_queue; /* Array of n_lag ccase * elements. */
 
-/* Active transformations. */
-struct transformation *t_trns;
-size_t n_trns, m_trns, f_trns;
+static void add_case_limit_trns (void);
+static void add_filter_trns (void);
+static void add_process_if_trns (void);
 
 static bool internal_procedure (bool (*proc_func) (struct ccase *, void *),
                                 void *aux);
@@ -104,11 +112,6 @@ static void update_last_vfm_invocation (void);
 static void create_trns_case (struct ccase *, struct dictionary *);
 static void open_active_file (void);
 static bool write_case (struct write_case_data *wc_data);
-static int execute_transformations (struct ccase *c,
-                                    struct transformation *trns,
-                                    int first_idx, int last_idx,
-                                    int case_num);
-static int filter_case (const struct ccase *c, int case_num);
 static void lag_case (const struct ccase *c);
 static void clear_case (struct ccase *c);
 static bool close_active_file (void);
@@ -126,26 +129,17 @@ time_of_last_procedure (void)
 
 /* Reads the data from the input program and writes it to a new
    active file.  For each case we read from the input program, we
-   do the following
+   do the following:
 
    1. Execute permanent transformations.  If these drop the case,
       start the next case from step 1.
 
-   2. N OF CASES.  If we have already written N cases, start the
-      next case from step 1.
-   
-   3. Write case to replacement active file.
+   2. Write case to replacement active file.
    
-   4. Execute temporary transformations.  If these drop the case,
+   3. Execute temporary transformations.  If these drop the case,
       start the next case from step 1.
       
-   5. FILTER, PROCESS IF.  If these drop the case, start the next
-      case from step 1.
-   
-   6. Post-TEMPORARY N OF CASES.  If we have already analyzed N
-      cases, start the next case from step 1.
-      
-   7. Pass case to PROC_FUNC, passing AUX as auxiliary data.
+   4. Pass case to PROC_FUNC, passing AUX as auxiliary data.
 
    Returns true if successful, false if an I/O error occurred. */
 bool
@@ -154,10 +148,14 @@ procedure (bool (*proc_func) (struct ccase *, void *), void *aux)
   if (proc_func == NULL
       && case_source_is_class (vfm_source, &storage_source_class)
       && vfm_sink == NULL
-      && !temporary
-      && n_trns == 0)
+      && temporary_trns_chain == NULL
+      && trns_chain_is_empty (permanent_trns_chain))
     {
-      /* Nothing to do. */
+      expr_free (process_if_expr);
+      process_if_expr = NULL;
+      dict_set_case_limit (default_dict, 0);
+      dict_clear_vectors (default_dict);
+
       update_last_vfm_invocation ();
       return true;
     }
@@ -167,8 +165,57 @@ procedure (bool (*proc_func) (struct ccase *, void *), void *aux)
       
       open_active_file ();
       ok = internal_procedure (proc_func, aux);
-      if (!close_active_file ())
-        ok = false;
+      ok = close_active_file () && ok;
+
+      return ok;
+    }
+}
+
+/* Callback function for multipass_procedure(). */
+static bool
+multipass_callback (struct ccase *c, void *cf_) 
+{
+  struct casefile *cf = cf_;
+  return casefile_append (cf, c);
+}
+
+/* Procedure that allows multiple passes over the input data.
+   The entire active file is passed to PROC_FUNC, with the given
+   AUX as auxiliary data, as a unit. */
+bool
+multipass_procedure (bool (*proc_func) (const struct casefile *, void *aux),
+                     void *aux) 
+{
+  if (case_source_is_class (vfm_source, &storage_source_class)
+      && vfm_sink == NULL
+      && temporary_trns_chain == NULL
+      && trns_chain_is_empty (permanent_trns_chain))
+    {
+      proc_func (storage_source_get_casefile (vfm_source), aux);
+
+      expr_free (process_if_expr);
+      process_if_expr = NULL;
+      dict_set_case_limit (default_dict, 0);
+      dict_clear_vectors (default_dict);
+
+      update_last_vfm_invocation ();
+      return true;
+    }
+  else 
+    {
+      struct casefile *cf;
+      bool ok;
+
+      assert (proc_func != NULL);
+
+      cf = casefile_create (dict_get_next_value_idx (default_dict));
+
+      open_active_file ();
+      ok = internal_procedure (multipass_callback, cf);
+      ok = proc_func (cf, aux) && ok;
+      ok = close_active_file () && ok;
+
+      casefile_destroy (cf);
 
       return ok;
     }
@@ -237,25 +284,26 @@ create_trns_case (struct ccase *trns_case, struct dictionary *dict)
 static void
 open_active_file (void)
 {
-  assert (!in_procedure);
-  in_procedure = true;
+  add_case_limit_trns ();
+  add_filter_trns ();
+  add_process_if_trns ();
 
-  /* Make temp_dict refer to the dictionary right before data
-     reaches the sink */
-  if (!temporary)
-    {
-      temp_trns = n_trns;
-      temp_dict = default_dict;
-    }
+  /* Finalize transformations. */
+  trns_chain_finalize (cur_trns_chain);
+
+  /* Make permanent_dict refer to the dictionary right before
+     data reaches the sink. */
+  if (permanent_dict == NULL)
+    permanent_dict = default_dict;
 
   /* Figure out compaction. */
-  compactor = (dict_needs_compaction (temp_dict)
-               ? dict_make_compactor (temp_dict)
+  compactor = (dict_needs_compaction (permanent_dict)
+               ? dict_make_compactor (permanent_dict)
                : NULL);
 
   /* Prepare sink. */
   if (vfm_sink == NULL)
-    vfm_sink = create_case_sink (&storage_sink_class, temp_dict, NULL);
+    vfm_sink = create_case_sink (&storage_sink_class, permanent_dict, NULL);
   if (vfm_sink->class->open != NULL)
     vfm_sink->class->open (vfm_sink);
 
@@ -270,9 +318,6 @@ open_active_file (void)
       for (i = 0; i < n_lag; i++)
         case_nullify (&lag_queue[i]);
     }
-
-  /* Close any unclosed DO IF or LOOP constructs. */
-  ctl_stack_clear ();
 }
 
 /* Transforms trns_case and writes it to the replacement active
@@ -282,25 +327,22 @@ open_active_file (void)
 static bool
 write_case (struct write_case_data *wc_data)
 {
-  int retval;
+  enum trns_result retval;
+  size_t case_nr;
   
   /* Execute permanent transformations.  */
-  retval = execute_transformations (&wc_data->trns_case, t_trns, f_trns,
-                                    temp_trns, wc_data->cases_written + 1);
-  if (retval != 1)
+  case_nr = wc_data->cases_written + 1;
+  retval = trns_chain_execute (permanent_trns_chain,
+                               &wc_data->trns_case, &case_nr);
+  if (retval != TRNS_CONTINUE)
     goto done;
 
-  /* N OF CASES. */
-  if (dict_get_case_limit (default_dict)
-      && wc_data->cases_written >= dict_get_case_limit (default_dict))
-    goto done;
-  wc_data->cases_written++;
-
   /* Write case to LAG queue. */
   if (n_lag)
     lag_case (&wc_data->trns_case);
 
   /* Write case to replacement active file. */
+  wc_data->cases_written++;
   if (vfm_sink->class->write != NULL) 
     {
       if (compactor != NULL) 
@@ -314,94 +356,24 @@ write_case (struct write_case_data *wc_data)
     }
   
   /* Execute temporary transformations. */
-  retval = execute_transformations (&wc_data->trns_case, t_trns, temp_trns,
-                                    n_trns, wc_data->cases_written);
-  if (retval != 1)
-    goto done;
-  
-  /* FILTER, PROCESS IF, post-TEMPORARY N OF CASES. */
-  if (filter_case (&wc_data->trns_case, wc_data->cases_written)
-      || (dict_get_case_limit (temp_dict)
-          && wc_data->cases_analyzed >= dict_get_case_limit (temp_dict)))
-    goto done;
-  wc_data->cases_analyzed++;
+  if (temporary_trns_chain != NULL) 
+    {
+      retval = trns_chain_execute (temporary_trns_chain,
+                                   &wc_data->trns_case,
+                                   &wc_data->cases_written);
+      if (retval != TRNS_CONTINUE)
+        goto done;
+    }
 
   /* Pass case to procedure. */
+  wc_data->cases_analyzed++;
   if (wc_data->proc_func != NULL)
     if (!wc_data->proc_func (&wc_data->trns_case, wc_data->aux))
-      retval = -1;
+      retval = TRNS_ERROR;
 
  done:
   clear_case (&wc_data->trns_case);
-  return retval != -1;
-}
-
-/* Transforms case C using the transformations in TRNS[] with
-   indexes FIRST_IDX through LAST_IDX, exclusive.  Case C will
-   become case CASE_NUM (1-based) in the output file.  Returns 1
-   if the case was successfully transformed, 0 if it was filtered
-   out by one of the transformations, or -1 if the procedure
-   should be abandoned due to a fatal error. */
-static int
-execute_transformations (struct ccase *c,
-                         struct transformation *trns,
-                         int first_idx, int last_idx,
-                         int case_num) 
-{
-  int idx;
-
-  for (idx = first_idx; idx != last_idx; )
-    {
-      struct transformation *t = &trns[idx];
-      int retval = t->proc (t->private, c, case_num);
-      switch (retval)
-        {
-        case TRNS_CONTINUE:
-          idx++;
-          break;
-          
-        case TRNS_DROP_CASE:
-          return 0;
-
-        case TRNS_ERROR:
-          return -1;
-
-        case TRNS_NEXT_CASE:
-          abort ();
-
-        case TRNS_END_FILE:
-          abort ();
-          
-        default:
-          idx = retval;
-          break;
-        }
-    }
-
-  return 1;
-}
-
-/* Returns nonzero if case C with case number CASE_NUM should be
-   excluded as specified on FILTER or PROCESS IF, otherwise
-   zero. */
-static int
-filter_case (const struct ccase *c, int case_idx)
-{
-  /* FILTER. */
-  struct variable *filter_var = dict_get_filter (default_dict);
-  if (filter_var != NULL) 
-    {
-      double f = case_num (c, filter_var->fv);
-      if (f == 0.0 || mv_is_num_missing (&filter_var->miss, f))
-        return 1;
-    }
-
-  /* PROCESS IF. */
-  if (process_if_expr != NULL
-      && expr_evaluate_num (process_if_expr, c, case_idx) != 1.0)
-    return 1;
-
-  return 0;
+  return retval != TRNS_ERROR;
 }
 
 /* Add C to the lag queue. */
@@ -452,13 +424,8 @@ close_active_file (void)
       n_lag = 0;
     }
   
-  /* Dictionary from before TEMPORARY becomes permanent.. */
-  if (temporary)
-    {
-      dict_destroy (default_dict);
-      default_dict = temp_dict;
-      temp_dict = NULL;
-    }
+  /* Dictionary from before TEMPORARY becomes permanent. */
+  proc_cancel_temporary_transformations ();
 
   /* Finish compaction. */
   if (compactor != NULL) 
@@ -479,16 +446,9 @@ close_active_file (void)
 
   /* Cancel TEMPORARY, PROCESS IF, FILTER, N OF CASES, vectors,
      and get rid of all the transformations. */
-  cancel_temporary ();
-  expr_free (process_if_expr);
-  process_if_expr = NULL;
-  dict_set_case_limit (default_dict, 0);
   dict_clear_vectors (default_dict);
-
-  assert (in_procedure);
-  in_procedure = false;
-
-  return cancel_transformations ();
+  permanent_dict = NULL;
+  return proc_cancel_all_transformations ();
 }
 \f
 /* Returns a pointer to the lagged case from N_BEFORE cases before the
@@ -509,56 +469,6 @@ lagged_case (int n_before)
   else
     return NULL;
 }
-   
-/* Appends TRNS to t_trns[], the list of all transformations to be
-   performed on data as it is read from the active file. */
-void
-add_transformation (trns_proc_func *proc, trns_free_func *free, void *private)
-{
-  struct transformation *trns;
-
-  assert (!in_procedure);
-
-  if (n_trns >= m_trns)
-    t_trns = x2nrealloc (t_trns, &m_trns, sizeof *t_trns);
-  trns = &t_trns[n_trns++];
-  trns->proc = proc;
-  trns->free = free;
-  trns->private = private;
-}
-
-/* Returns the index number that the next transformation added by
-   add_transformation() will receive.  A trns_proc_func that
-   returns this index causes control flow to jump to it. */
-size_t
-next_transformation (void) 
-{
-  return n_trns;
-}
-
-/* Cancels all active transformations, including any transformations
-   created by the input program.
-   Returns true if successful, false if an I/O error occurred. */
-bool
-cancel_transformations (void)
-{
-  bool ok = true;
-  size_t i;
-  for (i = 0; i < n_trns; i++)
-    {
-      struct transformation *t = &t_trns[i];
-      if (t->free != NULL) 
-        {
-          if (!t->free (t->private))
-            ok = false; 
-        }
-    }
-  n_trns = f_trns = 0;
-  free (t_trns);
-  t_trns = NULL;
-  m_trns = 0;
-  return ok;
-}
 \f
 /* Represents auxiliary data for handling SPLIT FILE. */
 struct split_aux_data 
@@ -714,7 +624,7 @@ struct multipass_split_aux_data
   };
 
 static bool multipass_split_callback (struct ccase *c, void *aux_);
-static void multipass_split_output (struct multipass_split_aux_data *);
+static bool multipass_split_output (struct multipass_split_aux_data *);
 
 /* Returns true if successful, false if an I/O error occurred. */
 bool
@@ -736,7 +646,7 @@ multipass_procedure_with_splits (bool (*split_func) (const struct casefile *,
 
   ok = internal_procedure (multipass_split_callback, &aux);
   if (aux.casefile != NULL)
-    multipass_split_output (&aux);
+    ok = multipass_split_output (&aux) && ok;
   case_destroy (&aux.prev_case);
 
   if (!close_active_file ())
@@ -750,13 +660,14 @@ static bool
 multipass_split_callback (struct ccase *c, void *aux_)
 {
   struct multipass_split_aux_data *aux = aux_;
+  bool ok = true;
 
   /* Start a new series if needed. */
   if (aux->casefile == NULL || !equal_splits (c, &aux->prev_case))
     {
       /* Pass any cases to split_func. */
       if (aux->casefile != NULL)
-        multipass_split_output (aux);
+        ok = multipass_split_output (aux);
 
       /* Start a new casefile. */
       aux->casefile = casefile_create (dict_get_next_value_idx (default_dict));
@@ -767,18 +678,21 @@ multipass_split_callback (struct ccase *c, void *aux_)
       case_clone (&aux->prev_case, c);
     }
 
-  return casefile_append (aux->casefile, c);
+  return casefile_append (aux->casefile, c) && ok;
 }
 
-static void
+static bool
 multipass_split_output (struct multipass_split_aux_data *aux)
 {
+  bool ok;
+  
   assert (aux->casefile != NULL);
-  aux->split_func (aux->casefile, aux->func_aux);
+  ok = aux->split_func (aux->casefile, aux->func_aux);
   casefile_destroy (aux->casefile);
   aux->casefile = NULL;
-}
 
+  return ok;
+}
 
 /* Discards all the current state in preparation for a data-input
    command like DATA LIST or GET. */
@@ -796,12 +710,307 @@ discard_variables (void)
       vfm_source = NULL;
     }
 
-  cancel_transformations ();
-
-  ctl_stack_clear ();
+  proc_cancel_all_transformations ();
 
   expr_free (process_if_expr);
   process_if_expr = NULL;
 
-  cancel_temporary ();
+  proc_cancel_temporary_transformations ();
+}
+\f
+/* Returns the current set of permanent transformations,
+   and clears the permanent transformations.
+   For use by INPUT PROGRAM. */
+struct trns_chain *
+proc_capture_transformations (void) 
+{
+  struct trns_chain *chain;
+  
+  assert (temporary_trns_chain == NULL);
+  chain = permanent_trns_chain;
+  cur_trns_chain = permanent_trns_chain = trns_chain_create ();
+  return chain;
+}
+
+/* Adds a transformation that processes a case with PROC and
+   frees itself with FREE to the current set of transformations.
+   The functions are passed AUX as auxiliary data. */
+void
+add_transformation (trns_proc_func *proc, trns_free_func *free, void *aux)
+{
+  trns_chain_append (cur_trns_chain, NULL, proc, free, aux);
+}
+
+/* Adds a transformation that processes a case with PROC and
+   frees itself with FREE to the current set of transformations.
+   When parsing of the block of transformations is complete,
+   FINALIZE will be called.
+   The functions are passed AUX as auxiliary data. */
+void
+add_transformation_with_finalizer (trns_finalize_func *finalize,
+                                   trns_proc_func *proc,
+                                   trns_free_func *free, void *aux)
+{
+  trns_chain_append (cur_trns_chain, finalize, proc, free, aux);
+}
+
+/* Returns the index of the next transformation.
+   This value can be returned by a transformation procedure
+   function to indicate a "jump" to that transformation. */
+size_t
+next_transformation (void) 
+{
+  return trns_chain_next (cur_trns_chain);
+}
+
+/* Returns true if the next call to add_transformation() will add
+   a temporary transformation, false if it will add a permanent
+   transformation. */
+bool
+proc_in_temporary_transformations (void) 
+{
+  return temporary_trns_chain != NULL;
+}
+
+/* Marks the start of temporary transformations.
+   Further calls to add_transformation() will add temporary
+   transformations. */
+void
+proc_start_temporary_transformations (void) 
+{
+  if (!proc_in_temporary_transformations ())
+    {
+      add_case_limit_trns ();
+
+      permanent_dict = dict_clone (default_dict);
+      trns_chain_finalize (permanent_trns_chain);
+      temporary_trns_chain = cur_trns_chain = trns_chain_create ();
+    }
+}
+
+/* Converts all the temporary transformations, if any, to
+   permanent transformations.  Further transformations will be
+   permanent.
+   Returns true if anything changed, false otherwise. */
+bool
+proc_make_temporary_transformations_permanent (void) 
+{
+  if (proc_in_temporary_transformations ()) 
+    {
+      trns_chain_finalize (temporary_trns_chain);
+      trns_chain_splice (permanent_trns_chain, temporary_trns_chain);
+      temporary_trns_chain = NULL;
+
+      dict_destroy (permanent_dict);
+      permanent_dict = NULL;
+
+      return true;
+    }
+  else
+    return false;
+}
+
+/* Cancels all temporary transformations, if any.  Further
+   transformations will be permanent.
+   Returns true if anything changed, false otherwise. */
+bool
+proc_cancel_temporary_transformations (void) 
+{
+  if (proc_in_temporary_transformations ()) 
+    {
+      dict_destroy (default_dict);
+      default_dict = permanent_dict;
+      permanent_dict = NULL;
+
+      trns_chain_destroy (temporary_trns_chain);
+      temporary_trns_chain = NULL;
+
+      return true;
+    }
+  else
+    return false;
+}
+
+/* Cancels all transformations, if any.
+   Returns true if successful, false on I/O error. */
+bool
+proc_cancel_all_transformations (void)
+{
+  bool ok;
+  ok = trns_chain_destroy (permanent_trns_chain);
+  ok = trns_chain_destroy (temporary_trns_chain) && ok;
+  permanent_trns_chain = cur_trns_chain = trns_chain_create ();
+  temporary_trns_chain = NULL;
+  return ok;
+}
+\f
+/* Initializes procedure handling. */
+void
+proc_init (void) 
+{
+  default_dict = dict_create ();
+  proc_cancel_all_transformations ();
+}
+
+/* Finishes up procedure handling. */
+void
+proc_done (void)
+{
+  discard_variables ();
+}
+
+/* Sets SINK as the destination for procedure output from the
+   next procedure. */
+void
+proc_set_sink (struct case_sink *sink) 
+{
+  assert (vfm_sink == NULL);
+  vfm_sink = sink;
+}
+
+/* Sets SOURCE as the source for procedure input for the next
+   procedure. */
+void
+proc_set_source (struct case_source *source) 
+{
+  assert (vfm_source == NULL);
+  vfm_source = source;
+}
+
+/* Returns true if a source for the next procedure has been
+   configured, false otherwise. */
+bool
+proc_has_source (void) 
+{
+  return vfm_source != NULL;
+}
+
+/* Returns the output from the previous procedure.
+   For use only immediately after executing a procedure.
+   The returned casefile is owned by the caller; it will not be
+   automatically used for the next procedure's input. */
+struct casefile *
+proc_capture_output (void) 
+{
+  struct casefile *casefile;
+
+  /* Try to make sure that this function is called immediately
+     after procedure() or a similar function. */
+  assert (vfm_source != NULL);
+  assert (case_source_is_class (vfm_source, &storage_source_class));
+  assert (trns_chain_is_empty (permanent_trns_chain));
+  assert (!proc_in_temporary_transformations ());
+
+  casefile = storage_source_decapsulate (vfm_source);
+  vfm_source = NULL;
+
+  return casefile;
+}
+\f
+static trns_proc_func case_limit_trns_proc;
+static trns_free_func case_limit_trns_free;
+
+/* Adds a transformation that limits the number of cases that may
+   pass through, if default_dict has a case limit. */
+static void
+add_case_limit_trns (void) 
+{
+  size_t case_limit = dict_get_case_limit (default_dict);
+  if (case_limit != 0)
+    {
+      size_t *cases_remaining = xmalloc (sizeof *cases_remaining);
+      *cases_remaining = case_limit;
+      add_transformation (case_limit_trns_proc, case_limit_trns_free,
+                          cases_remaining);
+      dict_set_case_limit (default_dict, 0);
+    }
+}
+
+/* Limits the maximum number of cases processed to
+   *CASES_REMAINING. */
+static int
+case_limit_trns_proc (void *cases_remaining_,
+                      struct ccase *c UNUSED, int case_nr UNUSED) 
+{
+  size_t *cases_remaining = cases_remaining_;
+  if (*cases_remaining > 0) 
+    {
+      *cases_remaining--;
+      return TRNS_CONTINUE;
+    }
+  else
+    return TRNS_DROP_CASE;
+}
+
+/* Frees the data associated with a case limit transformation. */
+static bool
+case_limit_trns_free (void *cases_remaining_) 
+{
+  size_t *cases_remaining = cases_remaining_;
+  free (cases_remaining);
+  return true;
+}
+\f
+static trns_proc_func filter_trns_proc;
+
+/* Adds a temporary transformation to filter data according to
+   the variable specified on FILTER, if any. */
+static void
+add_filter_trns (void) 
+{
+  struct variable *filter_var = dict_get_filter (default_dict);
+  if (filter_var != NULL) 
+    {
+      proc_start_temporary_transformations ();
+      add_transformation (filter_trns_proc, NULL, filter_var);
+    }
+}
+
+/* FILTER transformation. */
+static int
+filter_trns_proc (void *filter_var_,
+                  struct ccase *c UNUSED, int case_nr UNUSED) 
+  
+{
+  struct variable *filter_var = filter_var_;
+  double f = case_num (c, filter_var->fv);
+  return (f != 0.0 && !mv_is_num_missing (&filter_var->miss, f)
+          ? TRNS_CONTINUE : TRNS_DROP_CASE);
+}
+\f
+static trns_proc_func process_if_trns_proc;
+static trns_free_func process_if_trns_free;
+
+/* Adds a temporary transformation to filter data according to
+   the expression specified on PROCESS IF, if any. */
+static void
+add_process_if_trns (void) 
+{
+  if (process_if_expr != NULL) 
+    {
+      proc_start_temporary_transformations ();
+      add_transformation (process_if_trns_proc, process_if_trns_free,
+                          process_if_expr);
+      process_if_expr = NULL;
+    }
+}
+
+/* PROCESS IF transformation. */
+static int
+process_if_trns_proc (void *expression_,
+                      struct ccase *c UNUSED, int case_nr UNUSED) 
+  
+{
+  struct expression *expression = expression_;
+  return (expr_evaluate_num (expression, c, case_nr) == 1.0
+          ? TRNS_CONTINUE : TRNS_DROP_CASE);
+}
+
+/* Frees a PROCESS IF transformation. */
+static bool
+process_if_trns_free (void *expression_) 
+{
+  struct expression *expression = expression_;
+  expr_free (expression);
+  return true;
 }