Added casereader_clone function.
[pspp-builds.git] / src / data / procedure.c
index 1ecfd331a1f314571a79b1a68b9c74d4c491304c..a779771ca427d1e55befa6d51b955fe1b1bb98ce 100644 (file)
 #include <data/case-sink.h>
 #include <data/case.h>
 #include <data/casefile.h>
+#include <data/fastfile.h>
 #include <data/dictionary.h>
 #include <data/file-handle-def.h>
 #include <data/procedure.h>
 #include <data/storage-stream.h>
 #include <data/transformations.h>
 #include <data/variable.h>
-#include <language/expressions/public.h>
 #include <libpspp/alloc.h>
 #include <libpspp/misc.h>
 #include <libpspp/str.h>
 
-/*
-   Virtual File Manager (vfm):
-
-   vfm is used to process data files.  It uses the model that
-   data is read from one stream (the data source), processed,
-   then written to another (the data sink).  The data source is
-   then deleted and the data sink becomes the data source for the
-   next procedure. */
-
 /* Procedure execution data. */
 struct write_case_data
   {
     /* Function to call for each case. */
-    bool (*case_func) (struct ccase *, void *); /* Function. */
-    void *aux;                                 /* Auxiliary data. */ 
+    bool (*case_func) (const struct ccase *, void *);
+    void *aux;
 
     struct ccase trns_case;     /* Case used for transformations. */
     struct ccase sink_case;     /* Case written to sink, if
-                                   compaction is necessary. */
+                                   compacting is necessary. */
     size_t cases_written;       /* Cases output so far. */
   };
 
-/* Cases are read from vfm_source,
+/* Cases are read from proc_source,
    pass through permanent_trns_chain (which transforms them into
    the format described by permanent_dict),
-   are written to vfm_sink,
+   are written to proc_sink,
    pass through temporary_trns_chain (which transforms them into
    the format described by default_dict),
    and are finally passed to the procedure. */
-static struct case_source *vfm_source;
+static struct case_source *proc_source;
 static struct trns_chain *permanent_trns_chain;
 static struct dictionary *permanent_dict;
-static struct case_sink *vfm_sink;
+static struct case_sink *proc_sink;
 static struct trns_chain *temporary_trns_chain;
 struct dictionary *default_dict;
 
@@ -83,8 +74,8 @@ static struct trns_chain *cur_trns_chain;
    otherwise a null pointer. */
 static struct dict_compactor *compactor;
 
-/* Time at which vfm was last invoked. */
-static time_t last_vfm_invocation;
+/* Time at which proc was last invoked. */
+static time_t last_proc_invocation;
 
 /* Lag queue. */
 int n_lag;                     /* Number of cases to lag. */
@@ -94,12 +85,12 @@ static struct ccase *lag_queue; /* Array of n_lag ccase * elements. */
 
 static void add_case_limit_trns (void);
 static void add_filter_trns (void);
-static void add_process_if_trns (void);
 
-static bool internal_procedure (bool (*case_func) (struct ccase *, void *),
+static bool internal_procedure (bool (*case_func) (const struct ccase *,
+                                                   void *),
                                 bool (*end_func) (void *),
                                 void *aux);
-static void update_last_vfm_invocation (void);
+static void update_last_proc_invocation (void);
 static void create_trns_case (struct ccase *, struct dictionary *);
 static void open_active_file (void);
 static bool write_case (struct write_case_data *wc_data);
@@ -113,9 +104,9 @@ static bool close_active_file (void);
 time_t
 time_of_last_procedure (void) 
 {
-  if (last_vfm_invocation == 0)
-    update_last_vfm_invocation ();
-  return last_vfm_invocation;
+  if (last_proc_invocation == 0)
+    update_last_proc_invocation ();
+  return last_proc_invocation;
 }
 \f
 /* Regular procedure. */
@@ -136,7 +127,7 @@ time_of_last_procedure (void)
 
    Returns true if successful, false if an I/O error occurred. */
 bool
-procedure (bool (*proc_func) (struct ccase *, void *), void *aux)
+procedure (bool (*proc_func) (const struct ccase *, void *), void *aux)
 {
   return internal_procedure (proc_func, NULL, aux);
 }
@@ -153,7 +144,7 @@ struct multipass_aux_data
 
 /* Case processing function for multipass_procedure(). */
 static bool
-multipass_case_func (struct ccase *c, void *aux_data_) 
+multipass_case_func (const struct ccase *c, void *aux_data_) 
 {
   struct multipass_aux_data *aux_data = aux_data_;
   return casefile_append (aux_data->casefile, c);
@@ -178,7 +169,7 @@ multipass_procedure (bool (*proc_func) (const struct casefile *, void *aux),
   struct multipass_aux_data aux_data;
   bool ok;
 
-  aux_data.casefile = casefile_create (dict_get_next_value_idx (default_dict));
+  aux_data.casefile = fastfile_create (dict_get_next_value_idx (default_dict));
   aux_data.proc_func = proc_func;
   aux_data.aux = aux;
 
@@ -198,29 +189,27 @@ multipass_procedure (bool (*proc_func) (const struct casefile *, void *aux),
    Returns true if successful, false if an I/O error occurred (or
    if CASE_FUNC or END_FUNC ever returned false). */
 static bool
-internal_procedure (bool (*case_func) (struct ccase *, void *),
+internal_procedure (bool (*case_func) (const struct ccase *, void *),
                     bool (*end_func) (void *),
                     void *aux) 
 {
   struct write_case_data wc_data;
   bool ok = true;
 
-  assert (vfm_source != NULL);
+  assert (proc_source != NULL);
 
-  update_last_vfm_invocation ();
+  update_last_proc_invocation ();
 
   /* Optimize the trivial case where we're not going to do
      anything with the data, by not reading the data at all. */
   if (case_func == NULL && end_func == NULL
-      && case_source_is_class (vfm_source, &storage_source_class)
-      && vfm_sink == NULL
+      && case_source_is_class (proc_source, &storage_source_class)
+      && proc_sink == NULL
       && (temporary_trns_chain == NULL
           || trns_chain_is_empty (temporary_trns_chain))
       && trns_chain_is_empty (permanent_trns_chain))
     {
       n_lag = 0;
-      expr_free (process_if_expr);
-      process_if_expr = NULL;
       dict_set_case_limit (default_dict, 0);
       dict_clear_vectors (default_dict);
       return true;
@@ -231,12 +220,13 @@ internal_procedure (bool (*case_func) (struct ccase *, void *),
   wc_data.case_func = case_func;
   wc_data.aux = aux;
   create_trns_case (&wc_data.trns_case, default_dict);
-  case_create (&wc_data.sink_case, dict_get_next_value_idx (default_dict));
+  case_create (&wc_data.sink_case,
+               dict_get_compacted_value_cnt (default_dict));
   wc_data.cases_written = 0;
 
-  ok = vfm_source->class->read (vfm_source,
-                                &wc_data.trns_case,
-                                write_case, &wc_data) && ok;
+  ok = proc_source->class->read (proc_source,
+                                 &wc_data.trns_case,
+                                 write_case, &wc_data) && ok;
   if (end_func != NULL)
     ok = end_func (aux) && ok;
 
@@ -248,11 +238,11 @@ internal_procedure (bool (*case_func) (struct ccase *, void *),
   return ok;
 }
 
-/* Updates last_vfm_invocation. */
+/* Updates last_proc_invocation. */
 static void
-update_last_vfm_invocation (void) 
+update_last_proc_invocation (void) 
 {
-  last_vfm_invocation = time (NULL);
+  last_proc_invocation = time (NULL);
 }
 
 /* Creates and returns a case, initializing it from the vectors
@@ -284,7 +274,6 @@ open_active_file (void)
 {
   add_case_limit_trns ();
   add_filter_trns ();
-  add_process_if_trns ();
 
   /* Finalize transformations. */
   trns_chain_finalize (cur_trns_chain);
@@ -294,16 +283,16 @@ open_active_file (void)
   if (permanent_dict == NULL)
     permanent_dict = default_dict;
 
-  /* Figure out compaction. */
-  compactor = (dict_needs_compaction (permanent_dict)
+  /* Figure out whether to compact. */
+  compactor = (dict_compacting_would_shrink (permanent_dict)
                ? dict_make_compactor (permanent_dict)
                : NULL);
 
   /* Prepare sink. */
-  if (vfm_sink == NULL)
-    vfm_sink = create_case_sink (&storage_sink_class, permanent_dict, NULL);
-  if (vfm_sink->class->open != NULL)
-    vfm_sink->class->open (vfm_sink);
+  if (proc_sink == NULL)
+    proc_sink = create_case_sink (&storage_sink_class, permanent_dict, NULL);
+  if (proc_sink->class->open != NULL)
+    proc_sink->class->open (proc_sink);
 
   /* Allocate memory for lag queue. */
   if (n_lag > 0)
@@ -341,16 +330,16 @@ write_case (struct write_case_data *wc_data)
 
   /* Write case to replacement active file. */
   wc_data->cases_written++;
-  if (vfm_sink->class->write != NULL) 
+  if (proc_sink->class->write != NULL) 
     {
       if (compactor != NULL) 
         {
           dict_compactor_compact (compactor, &wc_data->sink_case,
                                   &wc_data->trns_case);
-          vfm_sink->class->write (vfm_sink, &wc_data->sink_case);
+          proc_sink->class->write (proc_sink, &wc_data->sink_case);
         }
       else
-        vfm_sink->class->write (vfm_sink, &wc_data->trns_case);
+        proc_sink->class->write (proc_sink, &wc_data->trns_case);
     }
   
   /* Execute temporary transformations. */
@@ -424,7 +413,7 @@ close_active_file (void)
   /* Dictionary from before TEMPORARY becomes permanent. */
   proc_cancel_temporary_transformations ();
 
-  /* Finish compaction. */
+  /* Finish compacting. */
   if (compactor != NULL) 
     {
       dict_compactor_destroy (compactor);
@@ -433,17 +422,15 @@ close_active_file (void)
     }
     
   /* Free data source. */
-  free_case_source (vfm_source);
-  vfm_source = NULL;
+  free_case_source (proc_source);
+  proc_source = NULL;
 
   /* Old data sink becomes new data source. */
-  if (vfm_sink->class->make_source != NULL)
-    vfm_source = vfm_sink->class->make_source (vfm_sink);
-  free_case_sink (vfm_sink);
-  vfm_sink = NULL;
+  if (proc_sink->class->make_source != NULL)
+    proc_source = proc_sink->class->make_source (proc_sink);
+  free_case_sink (proc_sink);
+  proc_sink = NULL;
 
-  /* Cancel TEMPORARY, PROCESS IF, FILTER, N OF CASES, vectors,
-     and get rid of all the transformations. */
   dict_clear_vectors (default_dict);
   permanent_dict = NULL;
   return proc_cancel_all_transformations ();
@@ -473,7 +460,6 @@ lagged_case (int n_before)
 /* Represents auxiliary data for handling SPLIT FILE. */
 struct split_aux_data 
   {
-    size_t case_count;          /* Number of cases so far. */
     struct ccase prev_case;     /* Data in previous case. */
 
     /* Callback functions. */
@@ -484,8 +470,8 @@ struct split_aux_data
   };
 
 static int equal_splits (const struct ccase *, const struct ccase *);
-static bool split_procedure_case_func (struct ccase *c, void *split_aux_);
-static bool split_procedure_end_func (void *split_aux_);
+static bool split_procedure_case_func (const struct ccase *c, void *);
+static bool split_procedure_end_func (void *);
 
 /* Like procedure(), but it automatically breaks the case stream
    into SPLIT FILE break groups.  Before each group of cases with
@@ -513,7 +499,6 @@ procedure_with_splits (void (*begin_func) (const struct ccase *, void *aux),
   struct split_aux_data split_aux;
   bool ok;
 
-  split_aux.case_count = 0;
   case_nullify (&split_aux.prev_case);
   split_aux.begin_func = begin_func;
   split_aux.proc_func = proc_func;
@@ -530,15 +515,15 @@ procedure_with_splits (void (*begin_func) (const struct ccase *, void *aux),
 
 /* Case callback used by procedure_with_splits(). */
 static bool
-split_procedure_case_func (struct ccase *c, void *split_aux_) 
+split_procedure_case_func (const struct ccase *c, void *split_aux_) 
 {
   struct split_aux_data *split_aux = split_aux_;
 
   /* Start a new series if needed. */
-  if (split_aux->case_count == 0
+  if (case_is_null (&split_aux->prev_case)
       || !equal_splits (c, &split_aux->prev_case))
     {
-      if (split_aux->case_count > 0 && split_aux->end_func != NULL)
+      if (!case_is_null (&split_aux->prev_case) && split_aux->end_func != NULL)
         split_aux->end_func (split_aux->func_aux);
 
       case_destroy (&split_aux->prev_case);
@@ -548,7 +533,6 @@ split_procedure_case_func (struct ccase *c, void *split_aux_)
        split_aux->begin_func (&split_aux->prev_case, split_aux->func_aux);
     }
 
-  split_aux->case_count++;
   return (split_aux->proc_func == NULL
           || split_aux->proc_func (c, split_aux->func_aux));
 }
@@ -559,7 +543,7 @@ split_procedure_end_func (void *split_aux_)
 {
   struct split_aux_data *split_aux = split_aux_;
 
-  if (split_aux->case_count > 0 && split_aux->end_func != NULL)
+  if (!case_is_null (&split_aux->prev_case) && split_aux->end_func != NULL)
     split_aux->end_func (split_aux->func_aux);
   return true;
 }
@@ -590,7 +574,7 @@ struct multipass_split_aux_data
     void *func_aux;                            /* Auxiliary data. */ 
   };
 
-static bool multipass_split_case_func (struct ccase *c, void *aux_);
+static bool multipass_split_case_func (const struct ccase *c, void *aux_);
 static bool multipass_split_end_func (void *aux_);
 static bool multipass_split_output (struct multipass_split_aux_data *);
 
@@ -618,7 +602,7 @@ multipass_procedure_with_splits (bool (*split_func) (const struct ccase *first,
 
 /* Case callback used by multipass_procedure_with_splits(). */
 static bool
-multipass_split_case_func (struct ccase *c, void *aux_)
+multipass_split_case_func (const struct ccase *c, void *aux_)
 {
   struct multipass_split_aux_data *aux = aux_;
   bool ok = true;
@@ -635,7 +619,7 @@ multipass_split_case_func (struct ccase *c, void *aux_)
         ok = multipass_split_output (aux);
 
       /* Start a new casefile. */
-      aux->casefile = casefile_create (dict_get_next_value_idx (default_dict));
+      aux->casefile = fastfile_create (dict_get_next_value_idx (default_dict));
     }
 
   return casefile_append (aux->casefile, c) && ok;
@@ -672,15 +656,10 @@ discard_variables (void)
 
   n_lag = 0;
   
-  free_case_source (vfm_source);
-  vfm_source = NULL;
+  free_case_source (proc_source);
+  proc_source = NULL;
 
   proc_cancel_all_transformations ();
-
-  expr_free (process_if_expr);
-  process_if_expr = NULL;
-
-  proc_cancel_temporary_transformations ();
 }
 \f
 /* Returns the current set of permanent transformations,
@@ -822,6 +801,7 @@ void
 proc_done (void)
 {
   discard_variables ();
+  dict_destroy (default_dict);
 }
 
 /* Sets SINK as the destination for procedure output from the
@@ -829,8 +809,8 @@ proc_done (void)
 void
 proc_set_sink (struct case_sink *sink) 
 {
-  assert (vfm_sink == NULL);
-  vfm_sink = sink;
+  assert (proc_sink == NULL);
+  proc_sink = sink;
 }
 
 /* Sets SOURCE as the source for procedure input for the next
@@ -838,8 +818,8 @@ proc_set_sink (struct case_sink *sink)
 void
 proc_set_source (struct case_source *source) 
 {
-  assert (vfm_source == NULL);
-  vfm_source = source;
+  assert (proc_source == NULL);
+  proc_source = source;
 }
 
 /* Returns true if a source for the next procedure has been
@@ -847,7 +827,7 @@ proc_set_source (struct case_source *source)
 bool
 proc_has_source (void) 
 {
-  return vfm_source != NULL;
+  return proc_source != NULL;
 }
 
 /* Returns the output from the previous procedure.
@@ -861,13 +841,13 @@ proc_capture_output (void)
 
   /* Try to make sure that this function is called immediately
      after procedure() or a similar function. */
-  assert (vfm_source != NULL);
-  assert (case_source_is_class (vfm_source, &storage_source_class));
+  assert (proc_source != NULL);
+  assert (case_source_is_class (proc_source, &storage_source_class));
   assert (trns_chain_is_empty (permanent_trns_chain));
   assert (!proc_in_temporary_transformations ());
 
-  casefile = storage_source_decapsulate (vfm_source);
-  vfm_source = NULL;
+  casefile = storage_source_decapsulate (proc_source);
+  proc_source = NULL;
 
   return casefile;
 }
@@ -895,7 +875,7 @@ add_case_limit_trns (void)
    *CASES_REMAINING. */
 static int
 case_limit_trns_proc (void *cases_remaining_,
-                      struct ccase *c UNUSED, int case_nr UNUSED) 
+                      struct ccase *c UNUSED, casenum_t case_nr UNUSED) 
 {
   size_t *cases_remaining = cases_remaining_;
   if (*cases_remaining > 0) 
@@ -934,7 +914,7 @@ add_filter_trns (void)
 /* FILTER transformation. */
 static int
 filter_trns_proc (void *filter_var_,
-                  struct ccase *c UNUSED, int case_nr UNUSED) 
+                  struct ccase *c UNUSED, casenum_t case_nr UNUSED) 
   
 {
   struct variable *filter_var = filter_var_;
@@ -942,40 +922,4 @@ filter_trns_proc (void *filter_var_,
   return (f != 0.0 && !mv_is_num_missing (&filter_var->miss, f)
           ? TRNS_CONTINUE : TRNS_DROP_CASE);
 }
-\f
-static trns_proc_func process_if_trns_proc;
-static trns_free_func process_if_trns_free;
-
-/* Adds a temporary transformation to filter data according to
-   the expression specified on PROCESS IF, if any. */
-static void
-add_process_if_trns (void) 
-{
-  if (process_if_expr != NULL) 
-    {
-      proc_start_temporary_transformations ();
-      add_transformation (process_if_trns_proc, process_if_trns_free,
-                          process_if_expr);
-      process_if_expr = NULL;
-    }
-}
-
-/* PROCESS IF transformation. */
-static int
-process_if_trns_proc (void *expression_,
-                      struct ccase *c UNUSED, int case_nr UNUSED) 
-  
-{
-  struct expression *expression = expression_;
-  return (expr_evaluate_num (expression, c, case_nr) == 1.0
-          ? TRNS_CONTINUE : TRNS_DROP_CASE);
-}
 
-/* Frees a PROCESS IF transformation. */
-static bool
-process_if_trns_free (void *expression_) 
-{
-  struct expression *expression = expression_;
-  expr_free (expression);
-  return true;
-}