-/* PSPP - computes sample statistics.
- Copyright (C) 1997-9, 2000, 2006 Free Software Foundation, Inc.
- Written by Ben Pfaff <blp@gnu.org>.
+/* PSPP - a program for statistical analysis.
+ Copyright (C) 1997-9, 2000, 2006, 2007 Free Software Foundation, Inc.
- This program is free software; you can redistribute it and/or
- modify it under the terms of the GNU General Public License as
- published by the Free Software Foundation; either version 2 of the
- License, or (at your option) any later version.
+ This program is free software: you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation, either version 3 of the License, or
+ (at your option) any later version.
- This program is distributed in the hope that it will be useful, but
- WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- General Public License for more details.
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
You should have received a copy of the GNU General Public License
- along with this program; if not, write to the Free Software
- Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
- 02110-1301, USA. */
+ along with this program. If not, see <http://www.gnu.org/licenses/>. */
#include <config.h>
#include <stdlib.h>
#include <unistd.h>
-#include <data/case-source.h>
-#include <data/case-sink.h>
#include <data/case.h>
-#include <data/casefile.h>
-#include <data/fastfile.h>
+#include <data/case-map.h>
+#include <data/caseinit.h>
+#include <data/casereader.h>
+#include <data/casereader-provider.h>
+#include <data/casewriter.h>
#include <data/dictionary.h>
#include <data/file-handle-def.h>
#include <data/procedure.h>
-#include <data/storage-stream.h>
#include <data/transformations.h>
#include <data/variable.h>
-#include <libpspp/alloc.h>
+#include <libpspp/deque.h>
#include <libpspp/misc.h>
#include <libpspp/str.h>
+#include <libpspp/taint.h>
-/* Procedure execution data. */
-struct write_case_data
- {
- /* Function to call for each case. */
- case_func *proc;
- void *aux;
-
- struct dataset *dataset; /* The dataset concerned */
- struct ccase trns_case; /* Case used for transformations. */
- struct ccase sink_case; /* Case written to sink, if
- compacting is necessary. */
- size_t cases_written; /* Cases output so far. */
- };
+#include "xalloc.h"
struct dataset {
- /* Cases are read from proc_source,
+ /* Cases are read from source,
+ their transformation variables are initialized,
pass through permanent_trns_chain (which transforms them into
the format described by permanent_dict),
- are written to proc_sink,
+ are written to sink,
pass through temporary_trns_chain (which transforms them into
the format described by dict),
and are finally passed to the procedure. */
- struct case_source *proc_source;
+ struct casereader *source;
+ struct caseinit *caseinit;
struct trns_chain *permanent_trns_chain;
struct dictionary *permanent_dict;
- struct case_sink *proc_sink;
+ struct casewriter *sink;
struct trns_chain *temporary_trns_chain;
struct dictionary *dict;
+ /* Callback which occurs whenever the transformation chain(s) have
+ been modified */
+ transformation_change_callback_func *xform_callback;
+ void *xform_callback_aux;
+
+ /* If true, cases are discarded instead of being written to
+ sink. */
+ bool discard_output;
+
/* The transformation chain that the next transformation will be
added to. */
struct trns_chain *cur_trns_chain;
- /* The compactor used to compact a case, if necessary;
+ /* The case map used to compact a case, if necessary;
otherwise a null pointer. */
- struct dict_compactor *compactor;
+ struct case_map *compactor;
/* Time at which proc was last invoked. */
time_t last_proc_invocation;
- /* Lag queue. */
+ /* Cases just before ("lagging") the current one. */
int n_lag; /* Number of cases to lag. */
- int lag_count; /* Number of cases in lag_queue so far. */
- int lag_head; /* Index where next case will be added. */
- struct ccase *lag_queue; /* Array of n_lag ccase * elements. */
+ struct deque lag; /* Deque of lagged cases. */
+ struct ccase *lag_cases; /* Lagged cases managed by deque. */
+ /* Procedure data. */
+ enum
+ {
+ PROC_COMMITTED, /* No procedure in progress. */
+ PROC_OPEN, /* proc_open called, casereader still open. */
+ PROC_CLOSED /* casereader from proc_open destroyed,
+ but proc_commit not yet called. */
+ }
+ proc_state;
+ casenumber cases_written; /* Cases output so far. */
+ bool ok; /* Error status. */
}; /* struct dataset */
static void add_case_limit_trns (struct dataset *ds);
static void add_filter_trns (struct dataset *ds);
-static bool internal_procedure (struct dataset *ds, case_func *,
- end_func *,
- void *aux);
static void update_last_proc_invocation (struct dataset *ds);
-static void create_trns_case (struct ccase *, struct dictionary *);
-static void open_active_file (struct dataset *ds);
-static bool write_case (struct write_case_data *wc_data);
-static void lag_case (struct dataset *ds, const struct ccase *c);
-static void clear_case (const struct dataset *ds, struct ccase *c);
-static bool close_active_file (struct dataset *ds);
\f
/* Public functions. */
/* Returns the last time the data was read. */
time_t
-time_of_last_procedure (struct dataset *ds)
+time_of_last_procedure (struct dataset *ds)
{
if (ds->last_proc_invocation == 0)
update_last_proc_invocation (ds);
\f
/* Regular procedure. */
-
-
-/* Reads the data from the input program and writes it to a new
- active file. For each case we read from the input program, we
- do the following:
-
- 1. Execute permanent transformations. If these drop the case,
- start the next case from step 1.
-
- 2. Write case to replacement active file.
-
- 3. Execute temporary transformations. If these drop the case,
- start the next case from step 1.
-
- 4. Pass case to PROC_FUNC, passing AUX as auxiliary data.
-
- Returns true if successful, false if an I/O error occurred. */
+/* Executes any pending transformations, if necessary.
+ This is not identical to the EXECUTE command in that it won't
+ always read the source data. This can be important when the
+ source data is given inline within BEGIN DATA...END FILE. */
bool
-procedure (struct dataset *ds, case_func *cf, void *aux)
+proc_execute (struct dataset *ds)
{
- return internal_procedure (ds, cf, NULL, aux);
-}
-\f
-/* Multipass procedure. */
-
-struct multipass_aux_data
- {
- struct casefile *casefile;
-
- bool (*proc_func) (const struct casefile *, void *aux);
- void *aux;
- };
-
-/* Case processing function for multipass_procedure(). */
-static bool
-multipass_case_func (const struct ccase *c, void *aux_data_, const struct dataset *ds UNUSED)
-{
- struct multipass_aux_data *aux_data = aux_data_;
- return casefile_append (aux_data->casefile, c);
-}
-
-/* End-of-file function for multipass_procedure(). */
-static bool
-multipass_end_func (void *aux_data_, const struct dataset *ds UNUSED)
-{
- struct multipass_aux_data *aux_data = aux_data_;
- return (aux_data->proc_func == NULL
- || aux_data->proc_func (aux_data->casefile, aux_data->aux));
-}
-
-/* Procedure that allows multiple passes over the input data.
- The entire active file is passed to PROC_FUNC, with the given
- AUX as auxiliary data, as a unit. */
-bool
-multipass_procedure (struct dataset *ds, casefile_func *proc_func, void *aux)
-{
- struct multipass_aux_data aux_data;
bool ok;
- aux_data.casefile = fastfile_create (dict_get_next_value_idx (ds->dict));
- aux_data.proc_func = proc_func;
- aux_data.aux = aux;
-
- ok = internal_procedure (ds, multipass_case_func, multipass_end_func, &aux_data);
- ok = !casefile_error (aux_data.casefile) && ok;
-
- casefile_destroy (aux_data.casefile);
-
- return ok;
-}
-\f
-/* Procedure implementation. */
-
-
-/* Executes a procedure.
- Passes each case to CASE_FUNC.
- Calls END_FUNC after the last case.
- Returns true if successful, false if an I/O error occurred (or
- if CASE_FUNC or END_FUNC ever returned false). */
-static bool
-internal_procedure (struct dataset *ds, case_func *proc,
- end_func *end,
- void *aux)
-{
- struct write_case_data wc_data;
- bool ok = true;
-
- assert (ds->proc_source != NULL);
-
- update_last_proc_invocation (ds);
-
- /* Optimize the trivial case where we're not going to do
- anything with the data, by not reading the data at all. */
- if (proc == NULL && end == NULL
- && case_source_is_class (ds->proc_source, &storage_source_class)
- && ds->proc_sink == NULL
- && (ds->temporary_trns_chain == NULL
- || trns_chain_is_empty (ds->temporary_trns_chain))
+ if ((ds->temporary_trns_chain == NULL
+ || trns_chain_is_empty (ds->temporary_trns_chain))
&& trns_chain_is_empty (ds->permanent_trns_chain))
{
ds->n_lag = 0;
+ ds->discard_output = false;
dict_set_case_limit (ds->dict, 0);
dict_clear_vectors (ds->dict);
return true;
}
-
- open_active_file (ds);
-
- wc_data.proc = proc;
- wc_data.aux = aux;
- wc_data.dataset = ds;
- create_trns_case (&wc_data.trns_case, ds->dict);
- case_create (&wc_data.sink_case,
- dict_get_compacted_value_cnt (ds->dict));
- wc_data.cases_written = 0;
-
- ok = ds->proc_source->class->read (ds->proc_source,
- &wc_data.trns_case,
- write_case, &wc_data) && ok;
- if (end != NULL)
- ok = end (aux, ds) && ok;
-
- case_destroy (&wc_data.sink_case);
- case_destroy (&wc_data.trns_case);
-
- ok = close_active_file (ds) && ok;
- return ok;
+ ok = casereader_destroy (proc_open (ds));
+ return proc_commit (ds) && ok;
}
-/* Updates last_proc_invocation. */
-static void
-update_last_proc_invocation (struct dataset *ds)
-{
- ds->last_proc_invocation = time (NULL);
-}
+static const struct casereader_class proc_casereader_class;
-/* Creates and returns a case, initializing it from the vectors
- that say which `value's need to be initialized just once, and
- which ones need to be re-initialized before every case. */
-static void
-create_trns_case (struct ccase *trns_case, struct dictionary *dict)
+/* Opens dataset DS for reading cases with proc_read.
+ proc_commit must be called when done. */
+struct casereader *
+proc_open (struct dataset *ds)
{
- size_t var_cnt = dict_get_var_cnt (dict);
- size_t i;
+ assert (ds->source != NULL);
+ assert (ds->proc_state == PROC_COMMITTED);
- case_create (trns_case, dict_get_next_value_idx (dict));
- for (i = 0; i < var_cnt; i++)
- {
- struct variable *v = dict_get_var (dict, i);
- union value *value = case_data_rw (trns_case, v);
+ update_last_proc_invocation (ds);
- if (var_is_numeric (v))
- value->f = var_get_leave (v) ? 0.0 : SYSMIS;
- else
- memset (value->s, ' ', var_get_width (v));
- }
-}
+ caseinit_mark_for_init (ds->caseinit, ds->dict);
-/* Makes all preparations for reading from the data source and writing
- to the data sink. */
-static void
-open_active_file (struct dataset *ds)
-{
+ /* Finish up the collection of transformations. */
add_case_limit_trns (ds);
add_filter_trns (ds);
-
- /* Finalize transformations. */
trns_chain_finalize (ds->cur_trns_chain);
/* Make permanent_dict refer to the dictionary right before
if (ds->permanent_dict == NULL)
ds->permanent_dict = ds->dict;
- /* Figure out whether to compact. */
- ds->compactor =
- (dict_compacting_would_shrink (ds->permanent_dict)
- ? dict_make_compactor (ds->permanent_dict)
- : NULL);
-
/* Prepare sink. */
- if (ds->proc_sink == NULL)
- ds->proc_sink = create_case_sink (&storage_sink_class, ds->permanent_dict, NULL);
- if (ds->proc_sink->class->open != NULL)
- ds->proc_sink->class->open (ds->proc_sink);
-
- /* Allocate memory for lag queue. */
- if (ds->n_lag > 0)
+ if (!ds->discard_output)
{
- int i;
-
- ds->lag_count = 0;
- ds->lag_head = 0;
- ds->lag_queue = xnmalloc (ds->n_lag, sizeof *ds->lag_queue);
- for (i = 0; i < ds->n_lag; i++)
- case_nullify (&ds->lag_queue[i]);
+ struct dictionary *pd = ds->permanent_dict;
+ size_t compacted_value_cnt = dict_count_values (pd, 1u << DC_SCRATCH);
+ bool should_compact = compacted_value_cnt < dict_get_next_value_idx (pd);
+ ds->compactor = (should_compact
+ ? case_map_to_compact_dict (pd, 1u << DC_SCRATCH)
+ : NULL);
+ ds->sink = autopaging_writer_create (compacted_value_cnt);
}
-}
-
-/* Transforms trns_case and writes it to the replacement active
- file if advisable. Returns true if more cases can be
- accepted, false otherwise. Do not call this function again
- after it has returned false once. */
-static bool
-write_case (struct write_case_data *wc_data)
-{
- enum trns_result retval;
- size_t case_nr;
-
- struct dataset *ds = wc_data->dataset;
-
- /* Execute permanent transformations. */
- case_nr = wc_data->cases_written + 1;
- retval = trns_chain_execute (ds->permanent_trns_chain,
- &wc_data->trns_case, &case_nr);
- if (retval != TRNS_CONTINUE)
- goto done;
-
- /* Write case to LAG queue. */
- if (ds->n_lag)
- lag_case (ds, &wc_data->trns_case);
-
- /* Write case to replacement active file. */
- wc_data->cases_written++;
- if (ds->proc_sink->class->write != NULL)
- {
- if (ds->compactor != NULL)
- {
- dict_compactor_compact (ds->compactor, &wc_data->sink_case,
- &wc_data->trns_case);
- ds->proc_sink->class->write (ds->proc_sink, &wc_data->sink_case);
- }
- else
- ds->proc_sink->class->write (ds->proc_sink, &wc_data->trns_case);
- }
-
- /* Execute temporary transformations. */
- if (ds->temporary_trns_chain != NULL)
- {
- retval = trns_chain_execute (ds->temporary_trns_chain,
- &wc_data->trns_case,
- &wc_data->cases_written);
- if (retval != TRNS_CONTINUE)
- goto done;
- }
-
- /* Pass case to procedure. */
- if (wc_data->proc != NULL)
- if (!wc_data->proc (&wc_data->trns_case, wc_data->aux, ds))
- retval = TRNS_ERROR;
-
- done:
- clear_case (ds, &wc_data->trns_case);
- return retval != TRNS_ERROR;
-}
-
-/* Add C to the lag queue. */
-static void
-lag_case (struct dataset *ds, const struct ccase *c)
-{
- if (ds->lag_count < ds->n_lag)
- ds->lag_count++;
- case_destroy (&ds->lag_queue[ds->lag_head]);
- case_clone (&ds->lag_queue[ds->lag_head], c);
- if (++ds->lag_head >= ds->n_lag)
- ds->lag_head = 0;
-}
-
-/* Clears the variables in C that need to be cleared between
- processing cases. */
-static void
-clear_case (const struct dataset *ds, struct ccase *c)
-{
- size_t var_cnt = dict_get_var_cnt (ds->dict);
- size_t i;
-
- for (i = 0; i < var_cnt; i++)
- {
- struct variable *v = dict_get_var (ds->dict, i);
- if (!var_get_leave (v))
- {
- if (var_is_numeric (v))
- case_data_rw (c, v)->f = SYSMIS;
- else
- memset (case_data_rw (c, v)->s, ' ', var_get_width (v));
- }
- }
-}
-
-/* Closes the active file. */
-static bool
-close_active_file (struct dataset *ds)
-{
- /* Free memory for lag queue, and turn off lagging. */
- if (ds->n_lag > 0)
- {
- int i;
-
- for (i = 0; i < ds->n_lag; i++)
- case_destroy (&ds->lag_queue[i]);
- free (ds->lag_queue);
- ds->n_lag = 0;
- }
-
- /* Dictionary from before TEMPORARY becomes permanent. */
- proc_cancel_temporary_transformations (ds);
-
- /* Finish compacting. */
- if (ds->compactor != NULL)
+ else
{
- dict_compactor_destroy (ds->compactor);
- dict_compact_values (ds->dict);
ds->compactor = NULL;
+ ds->sink = NULL;
}
-
- /* Free data source. */
- free_case_source (ds->proc_source);
- ds->proc_source = NULL;
- /* Old data sink becomes new data source. */
- if (ds->proc_sink->class->make_source != NULL)
- ds->proc_source = ds->proc_sink->class->make_source (ds->proc_sink);
- free_case_sink (ds->proc_sink);
- ds->proc_sink = NULL;
+ /* Allocate memory for lagged cases. */
+ ds->lag_cases = deque_init (&ds->lag, ds->n_lag, sizeof *ds->lag_cases);
- dict_clear_vectors (ds->dict);
- ds->permanent_dict = NULL;
- return proc_cancel_all_transformations (ds);
-}
-\f
-/* Returns a pointer to the lagged case from N_BEFORE cases before the
- current one, or NULL if there haven't been that many cases yet. */
-struct ccase *
-lagged_case (const struct dataset *ds, int n_before)
-{
- assert (n_before >= 1 );
- assert (n_before <= ds->n_lag);
+ ds->proc_state = PROC_OPEN;
+ ds->cases_written = 0;
+ ds->ok = true;
- if (n_before <= ds->lag_count)
- {
- int index = ds->lag_head - n_before;
- if (index < 0)
- index += ds->n_lag;
- return &ds->lag_queue[index];
- }
- else
- return NULL;
+ /* FIXME: use taint in dataset in place of `ok'? */
+ /* FIXME: for trivial cases we can just return a clone of
+ ds->source? */
+ return casereader_create_sequential (NULL,
+ dict_get_next_value_idx (ds->dict),
+ CASENUMBER_MAX,
+ &proc_casereader_class, ds);
}
-\f
-/* Procedure that separates the data into SPLIT FILE groups. */
-
-/* Represents auxiliary data for handling SPLIT FILE. */
-struct split_aux_data
- {
- struct dataset *dataset; /* The dataset */
- struct ccase prev_case; /* Data in previous case. */
-
- /* Callback functions. */
- begin_func *begin;
- case_func *proc;
- end_func *end;
- void *func_aux;
- };
-static int equal_splits (const struct ccase *, const struct ccase *, const struct dataset *ds);
-static bool split_procedure_case_func (const struct ccase *c, void *, const struct dataset *);
-static bool split_procedure_end_func (void *, const struct dataset *);
-
-/* Like procedure(), but it automatically breaks the case stream
- into SPLIT FILE break groups. Before each group of cases with
- identical SPLIT FILE variable values, BEGIN_FUNC is called
- with the first case in the group.
- Then PROC_FUNC is called for each case in the group (including
- the first).
- END_FUNC is called when the group is finished. FUNC_AUX is
- passed to each of the functions as auxiliary data.
-
- If the active file is empty, none of BEGIN_FUNC, PROC_FUNC,
- and END_FUNC will be called at all.
-
- If SPLIT FILE is not in effect, then there is one break group
- (if the active file is nonempty), and BEGIN_FUNC and END_FUNC
- will be called once.
-
- Returns true if successful, false if an I/O error occurred. */
+/* Returns true if a procedure is in progress, that is, if
+ proc_open has been called but proc_commit has not. */
bool
-procedure_with_splits (struct dataset *ds,
- begin_func begin,
- case_func *proc,
- end_func *end,
- void *func_aux)
+proc_is_open (const struct dataset *ds)
{
- struct split_aux_data split_aux;
- bool ok;
-
- case_nullify (&split_aux.prev_case);
- split_aux.begin = begin;
- split_aux.proc = proc;
- split_aux.end = end;
- split_aux.func_aux = func_aux;
- split_aux.dataset = ds;
-
- ok = internal_procedure (ds, split_procedure_case_func,
- split_procedure_end_func, &split_aux);
-
- case_destroy (&split_aux.prev_case);
-
- return ok;
+ return ds->proc_state != PROC_COMMITTED;
}
-/* Case callback used by procedure_with_splits(). */
+/* "read" function for procedure casereader. */
static bool
-split_procedure_case_func (const struct ccase *c, void *split_aux_, const struct dataset *ds)
+proc_casereader_read (struct casereader *reader UNUSED, void *ds_,
+ struct ccase *c)
{
- struct split_aux_data *split_aux = split_aux_;
+ struct dataset *ds = ds_;
+ enum trns_result retval = TRNS_DROP_CASE;
- /* Start a new series if needed. */
- if (case_is_null (&split_aux->prev_case)
- || !equal_splits (c, &split_aux->prev_case, split_aux->dataset))
+ assert (ds->proc_state == PROC_OPEN);
+ for (;;)
{
- if (!case_is_null (&split_aux->prev_case) && split_aux->end != NULL)
- split_aux->end (split_aux->func_aux, ds);
-
- case_destroy (&split_aux->prev_case);
- case_clone (&split_aux->prev_case, c);
+ casenumber case_nr;
+
+ assert (retval == TRNS_DROP_CASE || retval == TRNS_ERROR);
+ if (retval == TRNS_ERROR)
+ ds->ok = false;
+ if (!ds->ok)
+ return false;
+
+ /* Read a case from source. */
+ if (!casereader_read (ds->source, c))
+ return false;
+ case_resize (c, dict_get_next_value_idx (ds->dict));
+ caseinit_init_vars (ds->caseinit, c);
+
+ /* Execute permanent transformations. */
+ case_nr = ds->cases_written + 1;
+ retval = trns_chain_execute (ds->permanent_trns_chain, TRNS_CONTINUE,
+ c, case_nr);
+ caseinit_update_left_vars (ds->caseinit, c);
+ if (retval != TRNS_CONTINUE)
+ {
+ case_destroy (c);
+ continue;
+ }
- if (split_aux->begin != NULL)
- split_aux->begin (&split_aux->prev_case, split_aux->func_aux, ds);
- }
+ /* Write case to collection of lagged cases. */
+ if (ds->n_lag > 0)
+ {
+ while (deque_count (&ds->lag) >= ds->n_lag)
+ case_destroy (&ds->lag_cases[deque_pop_back (&ds->lag)]);
+ case_clone (&ds->lag_cases[deque_push_front (&ds->lag)], c);
+ }
- return (split_aux->proc == NULL
- || split_aux->proc (c, split_aux->func_aux, ds));
-}
+ /* Write case to replacement active file. */
+ ds->cases_written++;
+ if (ds->sink != NULL)
+ {
+ struct ccase tmp;
+ if (ds->compactor != NULL)
+ case_map_execute (ds->compactor, c, &tmp);
+ else
+ case_clone (&tmp, c);
+ casewriter_write (ds->sink, &tmp);
+ }
-/* End-of-file callback used by procedure_with_splits(). */
-static bool
-split_procedure_end_func (void *split_aux_, const struct dataset *ds)
-{
- struct split_aux_data *split_aux = split_aux_;
+ /* Execute temporary transformations. */
+ if (ds->temporary_trns_chain != NULL)
+ {
+ retval = trns_chain_execute (ds->temporary_trns_chain, TRNS_CONTINUE,
+ c, ds->cases_written);
+ if (retval != TRNS_CONTINUE)
+ {
+ case_destroy (c);
+ continue;
+ }
+ }
- if (!case_is_null (&split_aux->prev_case) && split_aux->end != NULL)
- split_aux->end (split_aux->func_aux, ds);
- return true;
+ return true;
+ }
}
-/* Compares the SPLIT FILE variables in cases A and B and returns
- nonzero only if they differ. */
-static int
-equal_splits (const struct ccase *a, const struct ccase *b,
- const struct dataset *ds)
+/* "destroy" function for procedure casereader. */
+static void
+proc_casereader_destroy (struct casereader *reader, void *ds_)
{
- return case_compare (a, b,
- dict_get_split_vars (ds->dict),
- dict_get_split_cnt (ds->dict)) == 0;
-}
-\f
-/* Multipass procedure that separates the data into SPLIT FILE
- groups. */
+ struct dataset *ds = ds_;
+ struct ccase c;
-/* Represents auxiliary data for handling SPLIT FILE in a
- multipass procedure. */
-struct multipass_split_aux_data
- {
- struct dataset *dataset; /* The dataset of the split */
- struct ccase prev_case; /* Data in previous case. */
- struct casefile *casefile; /* Accumulates data for a split. */
- split_func *split; /* Function to call with the accumulated
- data. */
- void *func_aux; /* Auxiliary data. */
- };
+ /* Make sure transformations happen for every input case, in
+ case they have side effects, and ensure that the replacement
+ active file gets all the cases it should. */
+ while (casereader_read (reader, &c))
+ case_destroy (&c);
-static bool multipass_split_case_func (const struct ccase *c, void *aux_, const struct dataset *);
-static bool multipass_split_end_func (void *aux_, const struct dataset *ds);
-static bool multipass_split_output (struct multipass_split_aux_data *, const struct dataset *ds);
+ ds->proc_state = PROC_CLOSED;
+ ds->ok = casereader_destroy (ds->source) && ds->ok;
+ ds->source = NULL;
+ proc_set_active_file_data (ds, NULL);
+}
-/* Returns true if successful, false if an I/O error occurred. */
+/* Must return false if the source casereader, a transformation,
+ or the sink casewriter signaled an error. (If a temporary
+ transformation signals an error, then the return value is
+ false, but the replacement active file may still be
+ untainted.) */
bool
-multipass_procedure_with_splits (struct dataset *ds,
- split_func *split,
- void *func_aux)
+proc_commit (struct dataset *ds)
{
- struct multipass_split_aux_data aux;
- bool ok;
-
- case_nullify (&aux.prev_case);
- aux.casefile = NULL;
- aux.split = split;
- aux.func_aux = func_aux;
- aux.dataset = ds;
+ assert (ds->proc_state == PROC_CLOSED);
+ ds->proc_state = PROC_COMMITTED;
- ok = internal_procedure (ds, multipass_split_case_func,
- multipass_split_end_func, &aux);
- case_destroy (&aux.prev_case);
+ /* Free memory for lagged cases. */
+ while (!deque_is_empty (&ds->lag))
+ case_destroy (&ds->lag_cases[deque_pop_back (&ds->lag)]);
+ free (ds->lag_cases);
- return ok;
-}
-
-/* Case callback used by multipass_procedure_with_splits(). */
-static bool
-multipass_split_case_func (const struct ccase *c, void *aux_, const struct dataset *ds)
-{
- struct multipass_split_aux_data *aux = aux_;
- bool ok = true;
+ /* Dictionary from before TEMPORARY becomes permanent. */
+ proc_cancel_temporary_transformations (ds);
- /* Start a new series if needed. */
- if (aux->casefile == NULL || ! equal_splits (c, &aux->prev_case, ds))
+ if (!ds->discard_output)
{
- /* Record split values. */
- case_destroy (&aux->prev_case);
- case_clone (&aux->prev_case, c);
+ /* Finish compacting. */
+ if (ds->compactor != NULL)
+ {
+ case_map_destroy (ds->compactor);
+ ds->compactor = NULL;
- /* Pass any cases to split_func. */
- if (aux->casefile != NULL)
- ok = multipass_split_output (aux, ds);
+ dict_delete_scratch_vars (ds->dict);
+ dict_compact_values (ds->dict);
+ }
- /* Start a new casefile. */
- aux->casefile =
- fastfile_create (dict_get_next_value_idx (ds->dict));
+ /* Old data sink becomes new data source. */
+ if (ds->sink != NULL)
+ ds->source = casewriter_make_reader (ds->sink);
+ }
+ else
+ {
+ ds->source = NULL;
+ ds->discard_output = false;
}
+ ds->sink = NULL;
- return casefile_append (aux->casefile, c) && ok;
-}
+ caseinit_clear (ds->caseinit);
+ caseinit_mark_as_preinited (ds->caseinit, ds->dict);
-/* End-of-file callback used by multipass_procedure_with_splits(). */
-static bool
-multipass_split_end_func (void *aux_, const struct dataset *ds)
-{
- struct multipass_split_aux_data *aux = aux_;
- return (aux->casefile == NULL || multipass_split_output (aux, ds));
+ dict_clear_vectors (ds->dict);
+ ds->permanent_dict = NULL;
+ return proc_cancel_all_transformations (ds) && ds->ok;
}
-static bool
-multipass_split_output (struct multipass_split_aux_data *aux, const struct dataset *ds)
-{
- bool ok;
-
- assert (aux->casefile != NULL);
- ok = aux->split (&aux->prev_case, aux->casefile, aux->func_aux, ds);
- casefile_destroy (aux->casefile);
- aux->casefile = NULL;
+/* Casereader class for procedure execution. */
+static const struct casereader_class proc_casereader_class =
+ {
+ proc_casereader_read,
+ proc_casereader_destroy,
+ NULL,
+ NULL,
+ };
- return ok;
+/* Updates last_proc_invocation. */
+static void
+update_last_proc_invocation (struct dataset *ds)
+{
+ ds->last_proc_invocation = time (NULL);
}
\f
-/* Discards all the current state in preparation for a data-input
- command like DATA LIST or GET. */
-void
-discard_variables (struct dataset *ds)
+/* Returns a pointer to the lagged case from N_BEFORE cases before the
+ current one, or NULL if there haven't been that many cases yet. */
+struct ccase *
+lagged_case (const struct dataset *ds, int n_before)
{
- dict_clear (ds->dict);
- fh_set_default_handle (NULL);
-
- ds->n_lag = 0;
-
- free_case_source (ds->proc_source);
- ds->proc_source = NULL;
+ assert (n_before >= 1);
+ assert (n_before <= ds->n_lag);
- proc_cancel_all_transformations (ds);
+ if (n_before <= deque_count (&ds->lag))
+ return &ds->lag_cases[deque_front (&ds->lag, n_before - 1)];
+ else
+ return NULL;
}
\f
/* Returns the current set of permanent transformations,
and clears the permanent transformations.
For use by INPUT PROGRAM. */
struct trns_chain *
-proc_capture_transformations (struct dataset *ds)
+proc_capture_transformations (struct dataset *ds)
{
struct trns_chain *chain;
-
+
assert (ds->temporary_trns_chain == NULL);
chain = ds->permanent_trns_chain;
ds->cur_trns_chain = ds->permanent_trns_chain = trns_chain_create ();
+
+ if ( ds->xform_callback)
+ ds->xform_callback (false, ds->xform_callback_aux);
+
return chain;
}
add_transformation (struct dataset *ds, trns_proc_func *proc, trns_free_func *free, void *aux)
{
trns_chain_append (ds->cur_trns_chain, NULL, proc, free, aux);
+ if ( ds->xform_callback)
+ ds->xform_callback (true, ds->xform_callback_aux);
}
/* Adds a transformation that processes a case with PROC and
FINALIZE will be called.
The functions are passed AUX as auxiliary data. */
void
-add_transformation_with_finalizer (struct dataset *ds,
+add_transformation_with_finalizer (struct dataset *ds,
trns_finalize_func *finalize,
trns_proc_func *proc,
trns_free_func *free, void *aux)
{
trns_chain_append (ds->cur_trns_chain, finalize, proc, free, aux);
+
+ if ( ds->xform_callback)
+ ds->xform_callback (true, ds->xform_callback_aux);
}
/* Returns the index of the next transformation.
This value can be returned by a transformation procedure
function to indicate a "jump" to that transformation. */
size_t
-next_transformation (const struct dataset *ds)
+next_transformation (const struct dataset *ds)
{
return trns_chain_next (ds->cur_trns_chain);
}
a temporary transformation, false if it will add a permanent
transformation. */
bool
-proc_in_temporary_transformations (const struct dataset *ds)
+proc_in_temporary_transformations (const struct dataset *ds)
{
return ds->temporary_trns_chain != NULL;
}
Further calls to add_transformation() will add temporary
transformations. */
void
-proc_start_temporary_transformations (struct dataset *ds)
+proc_start_temporary_transformations (struct dataset *ds)
{
if (!proc_in_temporary_transformations (ds))
{
add_case_limit_trns (ds);
ds->permanent_dict = dict_clone (ds->dict);
+
trns_chain_finalize (ds->permanent_trns_chain);
ds->temporary_trns_chain = ds->cur_trns_chain = trns_chain_create ();
+
+ if ( ds->xform_callback)
+ ds->xform_callback (true, ds->xform_callback_aux);
}
}
permanent.
Returns true if anything changed, false otherwise. */
bool
-proc_make_temporary_transformations_permanent (struct dataset *ds)
+proc_make_temporary_transformations_permanent (struct dataset *ds)
{
- if (proc_in_temporary_transformations (ds))
+ if (proc_in_temporary_transformations (ds))
{
trns_chain_finalize (ds->temporary_trns_chain);
trns_chain_splice (ds->permanent_trns_chain, ds->temporary_trns_chain);
transformations will be permanent.
Returns true if anything changed, false otherwise. */
bool
-proc_cancel_temporary_transformations (struct dataset *ds)
+proc_cancel_temporary_transformations (struct dataset *ds)
{
- if (proc_in_temporary_transformations (ds))
+ if (proc_in_temporary_transformations (ds))
{
dict_destroy (ds->dict);
ds->dict = ds->permanent_dict;
trns_chain_destroy (ds->temporary_trns_chain);
ds->temporary_trns_chain = NULL;
+ if ( ds->xform_callback)
+ ds->xform_callback (!trns_chain_is_empty (ds->permanent_trns_chain),
+ ds->xform_callback_aux);
+
return true;
}
else
proc_cancel_all_transformations (struct dataset *ds)
{
bool ok;
+ assert (ds->proc_state == PROC_COMMITTED);
ok = trns_chain_destroy (ds->permanent_trns_chain);
ok = trns_chain_destroy (ds->temporary_trns_chain) && ok;
ds->permanent_trns_chain = ds->cur_trns_chain = trns_chain_create ();
ds->temporary_trns_chain = NULL;
+ if ( ds->xform_callback)
+ ds->xform_callback (false, ds->xform_callback_aux);
+
return ok;
}
\f
{
struct dataset *ds = xzalloc (sizeof(*ds));
ds->dict = dict_create ();
+ ds->caseinit = caseinit_create ();
proc_cancel_all_transformations (ds);
return ds;
}
+
+void
+dataset_add_transform_change_callback (struct dataset *ds,
+ transformation_change_callback_func *cb,
+ void *aux)
+{
+ ds->xform_callback = cb;
+ ds->xform_callback_aux = aux;
+}
+
/* Finishes up procedure handling. */
void
destroy_dataset (struct dataset *ds)
{
- discard_variables (ds);
+ proc_discard_active_file (ds);
dict_destroy (ds->dict);
+ caseinit_destroy (ds->caseinit);
trns_chain_destroy (ds->permanent_trns_chain);
+
+ if ( ds->xform_callback)
+ ds->xform_callback (false, ds->xform_callback_aux);
free (ds);
}
-/* Sets SINK as the destination for procedure output from the
- next procedure. */
+/* Causes output from the next procedure to be discarded, instead
+ of being preserved for use as input for the next procedure. */
+void
+proc_discard_output (struct dataset *ds)
+{
+ ds->discard_output = true;
+}
+
+/* Discards the active file dictionary, data, and
+ transformations. */
void
-proc_set_sink (struct dataset *ds, struct case_sink *sink)
+proc_discard_active_file (struct dataset *ds)
{
- assert (ds->proc_sink == NULL);
- ds->proc_sink = sink;
+ assert (ds->proc_state == PROC_COMMITTED);
+
+ dict_clear (ds->dict);
+ fh_set_default_handle (NULL);
+
+ ds->n_lag = 0;
+
+ casereader_destroy (ds->source);
+ ds->source = NULL;
+
+ proc_cancel_all_transformations (ds);
}
/* Sets SOURCE as the source for procedure input for the next
procedure. */
void
-proc_set_source (struct dataset *ds, struct case_source *source)
+proc_set_active_file (struct dataset *ds,
+ struct casereader *source,
+ struct dictionary *dict)
{
- assert (ds->proc_source == NULL);
- ds->proc_source = source;
+ assert (ds->proc_state == PROC_COMMITTED);
+ assert (ds->dict != dict);
+
+ proc_discard_active_file (ds);
+
+ dict_destroy (ds->dict);
+ ds->dict = dict;
+
+ proc_set_active_file_data (ds, source);
}
-/* Returns true if a source for the next procedure has been
- configured, false otherwise. */
+/* Replaces the active file's data by READER without replacing
+ the associated dictionary. */
bool
-proc_has_source (const struct dataset *ds)
+proc_set_active_file_data (struct dataset *ds, struct casereader *reader)
{
- return ds->proc_source != NULL;
+ casereader_destroy (ds->source);
+ ds->source = reader;
+
+ caseinit_clear (ds->caseinit);
+ caseinit_mark_as_preinited (ds->caseinit, ds->dict);
+
+ return reader == NULL || !casereader_error (reader);
}
-/* Returns the output from the previous procedure.
- For use only immediately after executing a procedure.
- The returned casefile is owned by the caller; it will not be
- automatically used for the next procedure's input. */
-struct casefile *
-proc_capture_output (struct dataset *ds)
+/* Returns true if an active file data source is available, false
+ otherwise. */
+bool
+proc_has_active_file (const struct dataset *ds)
{
- struct casefile *casefile;
+ return ds->source != NULL;
+}
- /* Try to make sure that this function is called immediately
- after procedure() or a similar function. */
- assert (ds->proc_source != NULL);
- assert (case_source_is_class (ds->proc_source, &storage_source_class));
- assert (trns_chain_is_empty (ds->permanent_trns_chain));
- assert (!proc_in_temporary_transformations (ds));
+/* Returns the active file data source from DS, or a null pointer
+ if DS has no data source, and removes it from DS. */
+struct casereader *
+proc_extract_active_file_data (struct dataset *ds)
+{
+ struct casereader *reader = ds->source;
+ ds->source = NULL;
- casefile = storage_source_decapsulate (ds->proc_source);
- ds->proc_source = NULL;
+ return reader;
+}
- return casefile;
+/* Checks whether DS has a corrupted active file. If so,
+ discards it and returns false. If not, returns true without
+ doing anything. */
+bool
+dataset_end_of_command (struct dataset *ds)
+{
+ if (ds->source != NULL)
+ {
+ if (casereader_error (ds->source))
+ {
+ proc_discard_active_file (ds);
+ return false;
+ }
+ else
+ {
+ const struct taint *taint = casereader_get_taint (ds->source);
+ taint_reset_successor_taint ((struct taint *) taint);
+ assert (!taint_has_tainted_successor (taint));
+ }
+ }
+ return true;
}
\f
static trns_proc_func case_limit_trns_proc;
/* Adds a transformation that limits the number of cases that may
pass through, if DS->DICT has a case limit. */
static void
-add_case_limit_trns (struct dataset *ds)
+add_case_limit_trns (struct dataset *ds)
{
- size_t case_limit = dict_get_case_limit (ds->dict);
+ casenumber case_limit = dict_get_case_limit (ds->dict);
if (case_limit != 0)
{
- size_t *cases_remaining = xmalloc (sizeof *cases_remaining);
+ casenumber *cases_remaining = xmalloc (sizeof *cases_remaining);
*cases_remaining = case_limit;
add_transformation (ds, case_limit_trns_proc, case_limit_trns_free,
cases_remaining);
*CASES_REMAINING. */
static int
case_limit_trns_proc (void *cases_remaining_,
- struct ccase *c UNUSED, casenumber case_nr UNUSED)
+ struct ccase *c UNUSED, casenumber case_nr UNUSED)
{
size_t *cases_remaining = cases_remaining_;
- if (*cases_remaining > 0)
+ if (*cases_remaining > 0)
{
(*cases_remaining)--;
return TRNS_CONTINUE;
/* Frees the data associated with a case limit transformation. */
static bool
-case_limit_trns_free (void *cases_remaining_)
+case_limit_trns_free (void *cases_remaining_)
{
size_t *cases_remaining = cases_remaining_;
free (cases_remaining);
/* Adds a temporary transformation to filter data according to
the variable specified on FILTER, if any. */
static void
-add_filter_trns (struct dataset *ds)
+add_filter_trns (struct dataset *ds)
{
struct variable *filter_var = dict_get_filter (ds->dict);
- if (filter_var != NULL)
+ if (filter_var != NULL)
{
proc_start_temporary_transformations (ds);
add_transformation (ds, filter_trns_proc, NULL, filter_var);
/* FILTER transformation. */
static int
filter_trns_proc (void *filter_var_,
- struct ccase *c UNUSED, casenumber case_nr UNUSED)
-
+ struct ccase *c UNUSED, casenumber case_nr UNUSED)
+
{
struct variable *filter_var = filter_var_;
double f = case_num (c, filter_var);
- return (f != 0.0 && !var_is_num_missing (filter_var, f)
+ return (f != 0.0 && !var_is_num_missing (filter_var, f, MV_ANY)
? TRNS_CONTINUE : TRNS_DROP_CASE);
}
return ds->dict;
}
-
-void
-dataset_set_dict (struct dataset *ds, struct dictionary *dict)
+const struct casereader *
+dataset_source (const struct dataset *ds)
{
- ds->dict = dict;
-}
-
-int
-dataset_n_lag (const struct dataset *ds)
-{
- return ds->n_lag;
+ return ds->source;
}
-void
-dataset_set_n_lag (struct dataset *ds, int n_lag)
+void
+dataset_need_lag (struct dataset *ds, int n_before)
{
- ds->n_lag = n_lag;
+ ds->n_lag = MAX (ds->n_lag, n_before);
}
-
-