#include <data/file-handle-def.h>
#include <data/settings.h>
#include <data/storage-stream.h>
+#include <data/transformations.h>
#include <data/value-labels.h>
#include <data/variable.h>
-#include <language/control/control-stack.h>
#include <libpspp/alloc.h>
#include <libpspp/message.h>
#include <libpspp/misc.h>
size_t cases_analyzed; /* Cases passed to procedure so far. */
};
-/* The current active file, from which cases are read. */
-struct case_source *vfm_source;
-
-/* The replacement active file, to which cases are written. */
-struct case_sink *vfm_sink;
-
-/* The compactor used to compact a compact, if necessary;
+/* Cases are read from vfm_source,
+ pass through permanent_trns_chain (which transforms them into
+ the format described by permanent_dict),
+ are written to vfm_sink,
+ pass through temporary_trns_chain (which transforms them into
+ the format described by default_dict),
+ and are finally passed to the procedure. */
+static struct case_source *vfm_source;
+static struct trns_chain *permanent_trns_chain;
+static struct dictionary *permanent_dict;
+static struct case_sink *vfm_sink;
+static struct trns_chain *temporary_trns_chain;
+struct dictionary *default_dict;
+
+/* The transformation chain that the next transformation will be
+ added to. */
+static struct trns_chain *cur_trns_chain;
+
+/* The compactor used to compact a case, if necessary;
otherwise a null pointer. */
static struct dict_compactor *compactor;
/* Time at which vfm was last invoked. */
static time_t last_vfm_invocation;
-/* Whether we're inside a procedure.
- For debugging purposes only. */
-static bool in_procedure;
-
/* Lag queue. */
int n_lag; /* Number of cases to lag. */
static int lag_count; /* Number of cases in lag_queue so far. */
static int lag_head; /* Index where next case will be added. */
static struct ccase *lag_queue; /* Array of n_lag ccase * elements. */
-/* Active transformations. */
-struct transformation *t_trns;
-size_t n_trns, m_trns, f_trns;
+static void add_case_limit_trns (void);
+static void add_filter_trns (void);
+static void add_process_if_trns (void);
static bool internal_procedure (bool (*proc_func) (struct ccase *, void *),
void *aux);
static void create_trns_case (struct ccase *, struct dictionary *);
static void open_active_file (void);
static bool write_case (struct write_case_data *wc_data);
-static int execute_transformations (struct ccase *c,
- struct transformation *trns,
- int first_idx, int last_idx,
- int case_num);
-static int filter_case (const struct ccase *c, int case_num);
static void lag_case (const struct ccase *c);
static void clear_case (struct ccase *c);
static bool close_active_file (void);
/* Reads the data from the input program and writes it to a new
active file. For each case we read from the input program, we
- do the following
+ do the following:
1. Execute permanent transformations. If these drop the case,
start the next case from step 1.
- 2. N OF CASES. If we have already written N cases, start the
- next case from step 1.
-
- 3. Write case to replacement active file.
+ 2. Write case to replacement active file.
- 4. Execute temporary transformations. If these drop the case,
+ 3. Execute temporary transformations. If these drop the case,
start the next case from step 1.
- 5. FILTER, PROCESS IF. If these drop the case, start the next
- case from step 1.
-
- 6. Post-TEMPORARY N OF CASES. If we have already analyzed N
- cases, start the next case from step 1.
-
- 7. Pass case to PROC_FUNC, passing AUX as auxiliary data.
+ 4. Pass case to PROC_FUNC, passing AUX as auxiliary data.
Returns true if successful, false if an I/O error occurred. */
bool
if (proc_func == NULL
&& case_source_is_class (vfm_source, &storage_source_class)
&& vfm_sink == NULL
- && !temporary
- && n_trns == 0)
+ && temporary_trns_chain == NULL
+ && trns_chain_is_empty (permanent_trns_chain))
{
- /* Nothing to do. */
+ expr_free (process_if_expr);
+ process_if_expr = NULL;
+ dict_set_case_limit (default_dict, 0);
+ dict_clear_vectors (default_dict);
+
update_last_vfm_invocation ();
return true;
}
open_active_file ();
ok = internal_procedure (proc_func, aux);
- if (!close_active_file ())
- ok = false;
+ ok = close_active_file () && ok;
+
+ return ok;
+ }
+}
+
+/* Callback function for multipass_procedure(). */
+static bool
+multipass_callback (struct ccase *c, void *cf_)
+{
+ struct casefile *cf = cf_;
+ return casefile_append (cf, c);
+}
+
+/* Procedure that allows multiple passes over the input data.
+ The entire active file is passed to PROC_FUNC, with the given
+ AUX as auxiliary data, as a unit. */
+bool
+multipass_procedure (bool (*proc_func) (const struct casefile *, void *aux),
+ void *aux)
+{
+ if (case_source_is_class (vfm_source, &storage_source_class)
+ && vfm_sink == NULL
+ && temporary_trns_chain == NULL
+ && trns_chain_is_empty (permanent_trns_chain))
+ {
+ proc_func (storage_source_get_casefile (vfm_source), aux);
+
+ expr_free (process_if_expr);
+ process_if_expr = NULL;
+ dict_set_case_limit (default_dict, 0);
+ dict_clear_vectors (default_dict);
+
+ update_last_vfm_invocation ();
+ return true;
+ }
+ else
+ {
+ struct casefile *cf;
+ bool ok;
+
+ assert (proc_func != NULL);
+
+ cf = casefile_create (dict_get_next_value_idx (default_dict));
+
+ open_active_file ();
+ ok = internal_procedure (multipass_callback, cf);
+ ok = proc_func (cf, aux) && ok;
+ ok = close_active_file () && ok;
+
+ casefile_destroy (cf);
return ok;
}
static void
open_active_file (void)
{
- assert (!in_procedure);
- in_procedure = true;
+ add_case_limit_trns ();
+ add_filter_trns ();
+ add_process_if_trns ();
- /* Make temp_dict refer to the dictionary right before data
- reaches the sink */
- if (!temporary)
- {
- temp_trns = n_trns;
- temp_dict = default_dict;
- }
+ /* Finalize transformations. */
+ trns_chain_finalize (cur_trns_chain);
+
+ /* Make permanent_dict refer to the dictionary right before
+ data reaches the sink. */
+ if (permanent_dict == NULL)
+ permanent_dict = default_dict;
/* Figure out compaction. */
- compactor = (dict_needs_compaction (temp_dict)
- ? dict_make_compactor (temp_dict)
+ compactor = (dict_needs_compaction (permanent_dict)
+ ? dict_make_compactor (permanent_dict)
: NULL);
/* Prepare sink. */
if (vfm_sink == NULL)
- vfm_sink = create_case_sink (&storage_sink_class, temp_dict, NULL);
+ vfm_sink = create_case_sink (&storage_sink_class, permanent_dict, NULL);
if (vfm_sink->class->open != NULL)
vfm_sink->class->open (vfm_sink);
for (i = 0; i < n_lag; i++)
case_nullify (&lag_queue[i]);
}
-
- /* Close any unclosed DO IF or LOOP constructs. */
- ctl_stack_clear ();
}
/* Transforms trns_case and writes it to the replacement active
static bool
write_case (struct write_case_data *wc_data)
{
- int retval;
+ enum trns_result retval;
+ size_t case_nr;
/* Execute permanent transformations. */
- retval = execute_transformations (&wc_data->trns_case, t_trns, f_trns,
- temp_trns, wc_data->cases_written + 1);
- if (retval != 1)
+ case_nr = wc_data->cases_written + 1;
+ retval = trns_chain_execute (permanent_trns_chain,
+ &wc_data->trns_case, &case_nr);
+ if (retval != TRNS_CONTINUE)
goto done;
- /* N OF CASES. */
- if (dict_get_case_limit (default_dict)
- && wc_data->cases_written >= dict_get_case_limit (default_dict))
- goto done;
- wc_data->cases_written++;
-
/* Write case to LAG queue. */
if (n_lag)
lag_case (&wc_data->trns_case);
/* Write case to replacement active file. */
+ wc_data->cases_written++;
if (vfm_sink->class->write != NULL)
{
if (compactor != NULL)
}
/* Execute temporary transformations. */
- retval = execute_transformations (&wc_data->trns_case, t_trns, temp_trns,
- n_trns, wc_data->cases_written);
- if (retval != 1)
- goto done;
-
- /* FILTER, PROCESS IF, post-TEMPORARY N OF CASES. */
- if (filter_case (&wc_data->trns_case, wc_data->cases_written)
- || (dict_get_case_limit (temp_dict)
- && wc_data->cases_analyzed >= dict_get_case_limit (temp_dict)))
- goto done;
- wc_data->cases_analyzed++;
+ if (temporary_trns_chain != NULL)
+ {
+ retval = trns_chain_execute (temporary_trns_chain,
+ &wc_data->trns_case,
+ &wc_data->cases_written);
+ if (retval != TRNS_CONTINUE)
+ goto done;
+ }
/* Pass case to procedure. */
+ wc_data->cases_analyzed++;
if (wc_data->proc_func != NULL)
if (!wc_data->proc_func (&wc_data->trns_case, wc_data->aux))
- retval = -1;
+ retval = TRNS_ERROR;
done:
clear_case (&wc_data->trns_case);
- return retval != -1;
-}
-
-/* Transforms case C using the transformations in TRNS[] with
- indexes FIRST_IDX through LAST_IDX, exclusive. Case C will
- become case CASE_NUM (1-based) in the output file. Returns 1
- if the case was successfully transformed, 0 if it was filtered
- out by one of the transformations, or -1 if the procedure
- should be abandoned due to a fatal error. */
-static int
-execute_transformations (struct ccase *c,
- struct transformation *trns,
- int first_idx, int last_idx,
- int case_num)
-{
- int idx;
-
- for (idx = first_idx; idx != last_idx; )
- {
- struct transformation *t = &trns[idx];
- int retval = t->proc (t->private, c, case_num);
- switch (retval)
- {
- case TRNS_CONTINUE:
- idx++;
- break;
-
- case TRNS_DROP_CASE:
- return 0;
-
- case TRNS_ERROR:
- return -1;
-
- case TRNS_NEXT_CASE:
- abort ();
-
- case TRNS_END_FILE:
- abort ();
-
- default:
- idx = retval;
- break;
- }
- }
-
- return 1;
-}
-
-/* Returns nonzero if case C with case number CASE_NUM should be
- excluded as specified on FILTER or PROCESS IF, otherwise
- zero. */
-static int
-filter_case (const struct ccase *c, int case_idx)
-{
- /* FILTER. */
- struct variable *filter_var = dict_get_filter (default_dict);
- if (filter_var != NULL)
- {
- double f = case_num (c, filter_var->fv);
- if (f == 0.0 || mv_is_num_missing (&filter_var->miss, f))
- return 1;
- }
-
- /* PROCESS IF. */
- if (process_if_expr != NULL
- && expr_evaluate_num (process_if_expr, c, case_idx) != 1.0)
- return 1;
-
- return 0;
+ return retval != TRNS_ERROR;
}
/* Add C to the lag queue. */
n_lag = 0;
}
- /* Dictionary from before TEMPORARY becomes permanent.. */
- if (temporary)
- {
- dict_destroy (default_dict);
- default_dict = temp_dict;
- temp_dict = NULL;
- }
+ /* Dictionary from before TEMPORARY becomes permanent. */
+ proc_cancel_temporary_transformations ();
/* Finish compaction. */
if (compactor != NULL)
/* Cancel TEMPORARY, PROCESS IF, FILTER, N OF CASES, vectors,
and get rid of all the transformations. */
- cancel_temporary ();
- expr_free (process_if_expr);
- process_if_expr = NULL;
- dict_set_case_limit (default_dict, 0);
dict_clear_vectors (default_dict);
-
- assert (in_procedure);
- in_procedure = false;
-
- return cancel_transformations ();
+ permanent_dict = NULL;
+ return proc_cancel_all_transformations ();
}
\f
/* Returns a pointer to the lagged case from N_BEFORE cases before the
else
return NULL;
}
-
-/* Appends TRNS to t_trns[], the list of all transformations to be
- performed on data as it is read from the active file. */
-void
-add_transformation (trns_proc_func *proc, trns_free_func *free, void *private)
-{
- struct transformation *trns;
-
- assert (!in_procedure);
-
- if (n_trns >= m_trns)
- t_trns = x2nrealloc (t_trns, &m_trns, sizeof *t_trns);
- trns = &t_trns[n_trns++];
- trns->proc = proc;
- trns->free = free;
- trns->private = private;
-}
-
-/* Returns the index number that the next transformation added by
- add_transformation() will receive. A trns_proc_func that
- returns this index causes control flow to jump to it. */
-size_t
-next_transformation (void)
-{
- return n_trns;
-}
-
-/* Cancels all active transformations, including any transformations
- created by the input program.
- Returns true if successful, false if an I/O error occurred. */
-bool
-cancel_transformations (void)
-{
- bool ok = true;
- size_t i;
- for (i = 0; i < n_trns; i++)
- {
- struct transformation *t = &t_trns[i];
- if (t->free != NULL)
- {
- if (!t->free (t->private))
- ok = false;
- }
- }
- n_trns = f_trns = 0;
- free (t_trns);
- t_trns = NULL;
- m_trns = 0;
- return ok;
-}
\f
/* Represents auxiliary data for handling SPLIT FILE. */
struct split_aux_data
};
static bool multipass_split_callback (struct ccase *c, void *aux_);
-static void multipass_split_output (struct multipass_split_aux_data *);
+static bool multipass_split_output (struct multipass_split_aux_data *);
/* Returns true if successful, false if an I/O error occurred. */
bool
ok = internal_procedure (multipass_split_callback, &aux);
if (aux.casefile != NULL)
- multipass_split_output (&aux);
+ ok = multipass_split_output (&aux) && ok;
case_destroy (&aux.prev_case);
if (!close_active_file ())
multipass_split_callback (struct ccase *c, void *aux_)
{
struct multipass_split_aux_data *aux = aux_;
+ bool ok = true;
/* Start a new series if needed. */
if (aux->casefile == NULL || !equal_splits (c, &aux->prev_case))
{
/* Pass any cases to split_func. */
if (aux->casefile != NULL)
- multipass_split_output (aux);
+ ok = multipass_split_output (aux);
/* Start a new casefile. */
aux->casefile = casefile_create (dict_get_next_value_idx (default_dict));
case_clone (&aux->prev_case, c);
}
- return casefile_append (aux->casefile, c);
+ return casefile_append (aux->casefile, c) && ok;
}
-static void
+static bool
multipass_split_output (struct multipass_split_aux_data *aux)
{
+ bool ok;
+
assert (aux->casefile != NULL);
- aux->split_func (aux->casefile, aux->func_aux);
+ ok = aux->split_func (aux->casefile, aux->func_aux);
casefile_destroy (aux->casefile);
aux->casefile = NULL;
-}
+ return ok;
+}
/* Discards all the current state in preparation for a data-input
command like DATA LIST or GET. */
vfm_source = NULL;
}
- cancel_transformations ();
-
- ctl_stack_clear ();
+ proc_cancel_all_transformations ();
expr_free (process_if_expr);
process_if_expr = NULL;
- cancel_temporary ();
+ proc_cancel_temporary_transformations ();
+}
+\f
+/* Returns the current set of permanent transformations,
+ and clears the permanent transformations.
+ For use by INPUT PROGRAM. */
+struct trns_chain *
+proc_capture_transformations (void)
+{
+ struct trns_chain *chain;
+
+ assert (temporary_trns_chain == NULL);
+ chain = permanent_trns_chain;
+ cur_trns_chain = permanent_trns_chain = trns_chain_create ();
+ return chain;
+}
+
+/* Adds a transformation that processes a case with PROC and
+ frees itself with FREE to the current set of transformations.
+ The functions are passed AUX as auxiliary data. */
+void
+add_transformation (trns_proc_func *proc, trns_free_func *free, void *aux)
+{
+ trns_chain_append (cur_trns_chain, NULL, proc, free, aux);
+}
+
+/* Adds a transformation that processes a case with PROC and
+ frees itself with FREE to the current set of transformations.
+ When parsing of the block of transformations is complete,
+ FINALIZE will be called.
+ The functions are passed AUX as auxiliary data. */
+void
+add_transformation_with_finalizer (trns_finalize_func *finalize,
+ trns_proc_func *proc,
+ trns_free_func *free, void *aux)
+{
+ trns_chain_append (cur_trns_chain, finalize, proc, free, aux);
+}
+
+/* Returns the index of the next transformation.
+ This value can be returned by a transformation procedure
+ function to indicate a "jump" to that transformation. */
+size_t
+next_transformation (void)
+{
+ return trns_chain_next (cur_trns_chain);
+}
+
+/* Returns true if the next call to add_transformation() will add
+ a temporary transformation, false if it will add a permanent
+ transformation. */
+bool
+proc_in_temporary_transformations (void)
+{
+ return temporary_trns_chain != NULL;
+}
+
+/* Marks the start of temporary transformations.
+ Further calls to add_transformation() will add temporary
+ transformations. */
+void
+proc_start_temporary_transformations (void)
+{
+ if (!proc_in_temporary_transformations ())
+ {
+ add_case_limit_trns ();
+
+ permanent_dict = dict_clone (default_dict);
+ trns_chain_finalize (permanent_trns_chain);
+ temporary_trns_chain = cur_trns_chain = trns_chain_create ();
+ }
+}
+
+/* Converts all the temporary transformations, if any, to
+ permanent transformations. Further transformations will be
+ permanent.
+ Returns true if anything changed, false otherwise. */
+bool
+proc_make_temporary_transformations_permanent (void)
+{
+ if (proc_in_temporary_transformations ())
+ {
+ trns_chain_finalize (temporary_trns_chain);
+ trns_chain_splice (permanent_trns_chain, temporary_trns_chain);
+ temporary_trns_chain = NULL;
+
+ dict_destroy (permanent_dict);
+ permanent_dict = NULL;
+
+ return true;
+ }
+ else
+ return false;
+}
+
+/* Cancels all temporary transformations, if any. Further
+ transformations will be permanent.
+ Returns true if anything changed, false otherwise. */
+bool
+proc_cancel_temporary_transformations (void)
+{
+ if (proc_in_temporary_transformations ())
+ {
+ dict_destroy (default_dict);
+ default_dict = permanent_dict;
+ permanent_dict = NULL;
+
+ trns_chain_destroy (temporary_trns_chain);
+ temporary_trns_chain = NULL;
+
+ return true;
+ }
+ else
+ return false;
+}
+
+/* Cancels all transformations, if any.
+ Returns true if successful, false on I/O error. */
+bool
+proc_cancel_all_transformations (void)
+{
+ bool ok;
+ ok = trns_chain_destroy (permanent_trns_chain);
+ ok = trns_chain_destroy (temporary_trns_chain) && ok;
+ permanent_trns_chain = cur_trns_chain = trns_chain_create ();
+ temporary_trns_chain = NULL;
+ return ok;
+}
+\f
+/* Initializes procedure handling. */
+void
+proc_init (void)
+{
+ default_dict = dict_create ();
+ proc_cancel_all_transformations ();
+}
+
+/* Finishes up procedure handling. */
+void
+proc_done (void)
+{
+ discard_variables ();
+}
+
+/* Sets SINK as the destination for procedure output from the
+ next procedure. */
+void
+proc_set_sink (struct case_sink *sink)
+{
+ assert (vfm_sink == NULL);
+ vfm_sink = sink;
+}
+
+/* Sets SOURCE as the source for procedure input for the next
+ procedure. */
+void
+proc_set_source (struct case_source *source)
+{
+ assert (vfm_source == NULL);
+ vfm_source = source;
+}
+
+/* Returns true if a source for the next procedure has been
+ configured, false otherwise. */
+bool
+proc_has_source (void)
+{
+ return vfm_source != NULL;
+}
+
+/* Returns the output from the previous procedure.
+ For use only immediately after executing a procedure.
+ The returned casefile is owned by the caller; it will not be
+ automatically used for the next procedure's input. */
+struct casefile *
+proc_capture_output (void)
+{
+ struct casefile *casefile;
+
+ /* Try to make sure that this function is called immediately
+ after procedure() or a similar function. */
+ assert (vfm_source != NULL);
+ assert (case_source_is_class (vfm_source, &storage_source_class));
+ assert (trns_chain_is_empty (permanent_trns_chain));
+ assert (!proc_in_temporary_transformations ());
+
+ casefile = storage_source_decapsulate (vfm_source);
+ vfm_source = NULL;
+
+ return casefile;
+}
+\f
+static trns_proc_func case_limit_trns_proc;
+static trns_free_func case_limit_trns_free;
+
+/* Adds a transformation that limits the number of cases that may
+ pass through, if default_dict has a case limit. */
+static void
+add_case_limit_trns (void)
+{
+ size_t case_limit = dict_get_case_limit (default_dict);
+ if (case_limit != 0)
+ {
+ size_t *cases_remaining = xmalloc (sizeof *cases_remaining);
+ *cases_remaining = case_limit;
+ add_transformation (case_limit_trns_proc, case_limit_trns_free,
+ cases_remaining);
+ dict_set_case_limit (default_dict, 0);
+ }
+}
+
+/* Limits the maximum number of cases processed to
+ *CASES_REMAINING. */
+static int
+case_limit_trns_proc (void *cases_remaining_,
+ struct ccase *c UNUSED, int case_nr UNUSED)
+{
+ size_t *cases_remaining = cases_remaining_;
+ if (*cases_remaining > 0)
+ {
+ *cases_remaining--;
+ return TRNS_CONTINUE;
+ }
+ else
+ return TRNS_DROP_CASE;
+}
+
+/* Frees the data associated with a case limit transformation. */
+static bool
+case_limit_trns_free (void *cases_remaining_)
+{
+ size_t *cases_remaining = cases_remaining_;
+ free (cases_remaining);
+ return true;
+}
+\f
+static trns_proc_func filter_trns_proc;
+
+/* Adds a temporary transformation to filter data according to
+ the variable specified on FILTER, if any. */
+static void
+add_filter_trns (void)
+{
+ struct variable *filter_var = dict_get_filter (default_dict);
+ if (filter_var != NULL)
+ {
+ proc_start_temporary_transformations ();
+ add_transformation (filter_trns_proc, NULL, filter_var);
+ }
+}
+
+/* FILTER transformation. */
+static int
+filter_trns_proc (void *filter_var_,
+ struct ccase *c UNUSED, int case_nr UNUSED)
+
+{
+ struct variable *filter_var = filter_var_;
+ double f = case_num (c, filter_var->fv);
+ return (f != 0.0 && !mv_is_num_missing (&filter_var->miss, f)
+ ? TRNS_CONTINUE : TRNS_DROP_CASE);
+}
+\f
+static trns_proc_func process_if_trns_proc;
+static trns_free_func process_if_trns_free;
+
+/* Adds a temporary transformation to filter data according to
+ the expression specified on PROCESS IF, if any. */
+static void
+add_process_if_trns (void)
+{
+ if (process_if_expr != NULL)
+ {
+ proc_start_temporary_transformations ();
+ add_transformation (process_if_trns_proc, process_if_trns_free,
+ process_if_expr);
+ process_if_expr = NULL;
+ }
+}
+
+/* PROCESS IF transformation. */
+static int
+process_if_trns_proc (void *expression_,
+ struct ccase *c UNUSED, int case_nr UNUSED)
+
+{
+ struct expression *expression = expression_;
+ return (expr_evaluate_num (expression, c, case_nr) == 1.0
+ ? TRNS_CONTINUE : TRNS_DROP_CASE);
+}
+
+/* Frees a PROCESS IF transformation. */
+static bool
+process_if_trns_free (void *expression_)
+{
+ struct expression *expression = expression_;
+ expr_free (expression);
+ return true;
}