X-Git-Url: https://pintos-os.org/cgi-bin/gitweb.cgi?a=blobdiff_plain;f=src%2Fdata%2Fprocedure.c;h=b762214d10e20a33d6bbdf89ac52253a67589453;hb=5c3291dc396b795696e94f47780308fd7ace6fc4;hp=4dcce32b0f8ec8ac467423d52c1d25ebf4cdb951;hpb=6492b3b49661963dc6d78201a1eb3927fdf54b68;p=pspp-builds.git diff --git a/src/data/procedure.c b/src/data/procedure.c index 4dcce32b..b762214d 100644 --- a/src/data/procedure.c +++ b/src/data/procedure.c @@ -1,21 +1,18 @@ -/* PSPP - computes sample statistics. - Copyright (C) 1997-9, 2000, 2006 Free Software Foundation, Inc. - Written by Ben Pfaff . +/* PSPP - a program for statistical analysis. + Copyright (C) 1997-9, 2000, 2006, 2007, 2009 Free Software Foundation, Inc. - This program is free software; you can redistribute it and/or - modify it under the terms of the GNU General Public License as - published by the Free Software Foundation; either version 2 of the - License, or (at your option) any later version. + This program is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. - This program is distributed in the hope that it will be useful, but - WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - General Public License for more details. + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. You should have received a copy of the GNU General Public License - along with this program; if not, write to the Free Software - Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA - 02110-1301, USA. */ + along with this program. If not, see . */ #include @@ -24,654 +21,392 @@ #include #include -#include -#include #include -#include +#include +#include +#include +#include +#include #include #include #include -#include #include #include -#include +#include #include #include +#include +#include + +#include "minmax.h" +#include "xalloc.h" + +struct dataset { + /* Cases are read from source, + their transformation variables are initialized, + pass through permanent_trns_chain (which transforms them into + the format described by permanent_dict), + are written to sink, + pass through temporary_trns_chain (which transforms them into + the format described by dict), + and are finally passed to the procedure. */ + struct casereader *source; + struct caseinit *caseinit; + struct trns_chain *permanent_trns_chain; + struct dictionary *permanent_dict; + struct casewriter *sink; + struct trns_chain *temporary_trns_chain; + struct dictionary *dict; + + /* Callback which occurs whenever the transformation chain(s) have + been modified */ + transformation_change_callback_func *xform_callback; + void *xform_callback_aux; + + /* If true, cases are discarded instead of being written to + sink. */ + bool discard_output; + + /* The transformation chain that the next transformation will be + added to. */ + struct trns_chain *cur_trns_chain; + + /* The case map used to compact a case, if necessary; + otherwise a null pointer. */ + struct case_map *compactor; + + /* Time at which proc was last invoked. */ + time_t last_proc_invocation; + + /* Cases just before ("lagging") the current one. */ + int n_lag; /* Number of cases to lag. */ + struct deque lag; /* Deque of lagged cases. */ + struct ccase **lag_cases; /* Lagged cases managed by deque. */ + + /* Procedure data. */ + enum + { + PROC_COMMITTED, /* No procedure in progress. */ + PROC_OPEN, /* proc_open called, casereader still open. */ + PROC_CLOSED /* casereader from proc_open destroyed, + but proc_commit not yet called. */ + } + proc_state; + casenumber cases_written; /* Cases output so far. */ + bool ok; /* Error status. */ -/* Procedure execution data. */ -struct write_case_data - { - /* Function to call for each case. */ - bool (*case_func) (const struct ccase *, void *); - void *aux; - - struct ccase trns_case; /* Case used for transformations. */ - struct ccase sink_case; /* Case written to sink, if - compacting is necessary. */ - size_t cases_written; /* Cases output so far. */ - }; - -/* Cases are read from proc_source, - pass through permanent_trns_chain (which transforms them into - the format described by permanent_dict), - are written to proc_sink, - pass through temporary_trns_chain (which transforms them into - the format described by default_dict), - and are finally passed to the procedure. */ -static struct case_source *proc_source; -static struct trns_chain *permanent_trns_chain; -static struct dictionary *permanent_dict; -static struct case_sink *proc_sink; -static struct trns_chain *temporary_trns_chain; -struct dictionary *default_dict; - -/* The transformation chain that the next transformation will be - added to. */ -static struct trns_chain *cur_trns_chain; - -/* The compactor used to compact a case, if necessary; - otherwise a null pointer. */ -static struct dict_compactor *compactor; - -/* Time at which proc was last invoked. */ -static time_t last_proc_invocation; - -/* Lag queue. */ -int n_lag; /* Number of cases to lag. */ -static int lag_count; /* Number of cases in lag_queue so far. */ -static int lag_head; /* Index where next case will be added. */ -static struct ccase *lag_queue; /* Array of n_lag ccase * elements. */ - -static void add_case_limit_trns (void); -static void add_filter_trns (void); - -static bool internal_procedure (bool (*case_func) (const struct ccase *, - void *), - bool (*end_func) (void *), - void *aux); -static void update_last_proc_invocation (void); -static void create_trns_case (struct ccase *, struct dictionary *); -static void open_active_file (void); -static bool write_case (struct write_case_data *wc_data); -static void lag_case (const struct ccase *c); -static void clear_case (struct ccase *c); -static bool close_active_file (void); - -/* Public functions. */ + void (*callback) (void *); /* Callback for when the dataset changes */ + void *cb_data; -/* Returns the last time the data was read. */ -time_t -time_of_last_procedure (void) -{ - if (last_proc_invocation == 0) - update_last_proc_invocation (); - return last_proc_invocation; -} - -/* Regular procedure. */ +}; /* struct dataset */ -/* Reads the data from the input program and writes it to a new - active file. For each case we read from the input program, we - do the following: - 1. Execute permanent transformations. If these drop the case, - start the next case from step 1. +static void add_case_limit_trns (struct dataset *ds); +static void add_filter_trns (struct dataset *ds); - 2. Write case to replacement active file. - - 3. Execute temporary transformations. If these drop the case, - start the next case from step 1. - - 4. Pass case to PROC_FUNC, passing AUX as auxiliary data. +static void update_last_proc_invocation (struct dataset *ds); - Returns true if successful, false if an I/O error occurred. */ -bool -procedure (bool (*proc_func) (const struct ccase *, void *), void *aux) +static void +dataset_set_unsaved (const struct dataset *ds) { - return internal_procedure (proc_func, NULL, aux); + if (ds->callback) ds->callback (ds->cb_data); } - -/* Multipass procedure. */ -struct multipass_aux_data - { - struct casefile *casefile; - - bool (*proc_func) (const struct casefile *, void *aux); - void *aux; - }; + +/* Public functions. */ -/* Case processing function for multipass_procedure(). */ -static bool -multipass_case_func (const struct ccase *c, void *aux_data_) +void +dataset_set_callback (struct dataset *ds, void (*cb) (void *), void *cb_data) { - struct multipass_aux_data *aux_data = aux_data_; - return casefile_append (aux_data->casefile, c); + ds->callback = cb; + ds->cb_data = cb_data; } -/* End-of-file function for multipass_procedure(). */ -static bool -multipass_end_func (void *aux_data_) -{ - struct multipass_aux_data *aux_data = aux_data_; - return (aux_data->proc_func == NULL - || aux_data->proc_func (aux_data->casefile, aux_data->aux)); -} -/* Procedure that allows multiple passes over the input data. - The entire active file is passed to PROC_FUNC, with the given - AUX as auxiliary data, as a unit. */ -bool -multipass_procedure (bool (*proc_func) (const struct casefile *, void *aux), - void *aux) +/* Returns the last time the data was read. */ +time_t +time_of_last_procedure (struct dataset *ds) { - struct multipass_aux_data aux_data; - bool ok; - - aux_data.casefile = casefile_create (dict_get_next_value_idx (default_dict)); - aux_data.proc_func = proc_func; - aux_data.aux = aux; - - ok = internal_procedure (multipass_case_func, multipass_end_func, &aux_data); - ok = !casefile_error (aux_data.casefile) && ok; - - casefile_destroy (aux_data.casefile); - - return ok; + if (ds->last_proc_invocation == 0) + update_last_proc_invocation (ds); + return ds->last_proc_invocation; } -/* Procedure implementation. */ +/* Regular procedure. */ -/* Executes a procedure. - Passes each case to CASE_FUNC. - Calls END_FUNC after the last case. - Returns true if successful, false if an I/O error occurred (or - if CASE_FUNC or END_FUNC ever returned false). */ -static bool -internal_procedure (bool (*case_func) (const struct ccase *, void *), - bool (*end_func) (void *), - void *aux) +/* Executes any pending transformations, if necessary. + This is not identical to the EXECUTE command in that it won't + always read the source data. This can be important when the + source data is given inline within BEGIN DATA...END FILE. */ +bool +proc_execute (struct dataset *ds) { - struct write_case_data wc_data; - bool ok = true; - - assert (proc_source != NULL); - - update_last_proc_invocation (); + bool ok; - /* Optimize the trivial case where we're not going to do - anything with the data, by not reading the data at all. */ - if (case_func == NULL && end_func == NULL - && case_source_is_class (proc_source, &storage_source_class) - && proc_sink == NULL - && (temporary_trns_chain == NULL - || trns_chain_is_empty (temporary_trns_chain)) - && trns_chain_is_empty (permanent_trns_chain)) + if ((ds->temporary_trns_chain == NULL + || trns_chain_is_empty (ds->temporary_trns_chain)) + && trns_chain_is_empty (ds->permanent_trns_chain)) { - n_lag = 0; - dict_set_case_limit (default_dict, 0); - dict_clear_vectors (default_dict); + ds->n_lag = 0; + ds->discard_output = false; + dict_set_case_limit (ds->dict, 0); + dict_clear_vectors (ds->dict); return true; } - - open_active_file (); - - wc_data.case_func = case_func; - wc_data.aux = aux; - create_trns_case (&wc_data.trns_case, default_dict); - case_create (&wc_data.sink_case, - dict_get_compacted_value_cnt (default_dict)); - wc_data.cases_written = 0; - - ok = proc_source->class->read (proc_source, - &wc_data.trns_case, - write_case, &wc_data) && ok; - if (end_func != NULL) - ok = end_func (aux) && ok; - - case_destroy (&wc_data.sink_case); - case_destroy (&wc_data.trns_case); - - ok = close_active_file () && ok; - return ok; + ok = casereader_destroy (proc_open (ds)); + return proc_commit (ds) && ok; } -/* Updates last_proc_invocation. */ -static void -update_last_proc_invocation (void) -{ - last_proc_invocation = time (NULL); -} +static const struct casereader_class proc_casereader_class; -/* Creates and returns a case, initializing it from the vectors - that say which `value's need to be initialized just once, and - which ones need to be re-initialized before every case. */ -static void -create_trns_case (struct ccase *trns_case, struct dictionary *dict) +/* Opens dataset DS for reading cases with proc_read. + proc_commit must be called when done. */ +struct casereader * +proc_open (struct dataset *ds) { - size_t var_cnt = dict_get_var_cnt (dict); - size_t i; + assert (ds->source != NULL); + assert (ds->proc_state == PROC_COMMITTED); - case_create (trns_case, dict_get_next_value_idx (dict)); - for (i = 0; i < var_cnt; i++) - { - struct variable *v = dict_get_var (dict, i); - union value *value = case_data_rw (trns_case, v->fv); + update_last_proc_invocation (ds); - if (v->type == NUMERIC) - value->f = v->leave ? 0.0 : SYSMIS; - else - memset (value->s, ' ', v->width); - } -} - -/* Makes all preparations for reading from the data source and writing - to the data sink. */ -static void -open_active_file (void) -{ - add_case_limit_trns (); - add_filter_trns (); + caseinit_mark_for_init (ds->caseinit, ds->dict); - /* Finalize transformations. */ - trns_chain_finalize (cur_trns_chain); + /* Finish up the collection of transformations. */ + add_case_limit_trns (ds); + add_filter_trns (ds); + trns_chain_finalize (ds->cur_trns_chain); /* Make permanent_dict refer to the dictionary right before data reaches the sink. */ - if (permanent_dict == NULL) - permanent_dict = default_dict; - - /* Figure out whether to compact. */ - compactor = (dict_compacting_would_shrink (permanent_dict) - ? dict_make_compactor (permanent_dict) - : NULL); + if (ds->permanent_dict == NULL) + ds->permanent_dict = ds->dict; /* Prepare sink. */ - if (proc_sink == NULL) - proc_sink = create_case_sink (&storage_sink_class, permanent_dict, NULL); - if (proc_sink->class->open != NULL) - proc_sink->class->open (proc_sink); - - /* Allocate memory for lag queue. */ - if (n_lag > 0) + if (!ds->discard_output) { - int i; - - lag_count = 0; - lag_head = 0; - lag_queue = xnmalloc (n_lag, sizeof *lag_queue); - for (i = 0; i < n_lag; i++) - case_nullify (&lag_queue[i]); - } -} - -/* Transforms trns_case and writes it to the replacement active - file if advisable. Returns true if more cases can be - accepted, false otherwise. Do not call this function again - after it has returned false once. */ -static bool -write_case (struct write_case_data *wc_data) -{ - enum trns_result retval; - size_t case_nr; - - /* Execute permanent transformations. */ - case_nr = wc_data->cases_written + 1; - retval = trns_chain_execute (permanent_trns_chain, - &wc_data->trns_case, &case_nr); - if (retval != TRNS_CONTINUE) - goto done; - - /* Write case to LAG queue. */ - if (n_lag) - lag_case (&wc_data->trns_case); - - /* Write case to replacement active file. */ - wc_data->cases_written++; - if (proc_sink->class->write != NULL) - { - if (compactor != NULL) + struct dictionary *pd = ds->permanent_dict; + size_t compacted_value_cnt = dict_count_values (pd, 1u << DC_SCRATCH); + if (compacted_value_cnt < dict_get_next_value_idx (pd)) { - dict_compactor_compact (compactor, &wc_data->sink_case, - &wc_data->trns_case); - proc_sink->class->write (proc_sink, &wc_data->sink_case); + struct caseproto *compacted_proto; + compacted_proto = dict_get_compacted_proto (pd, 1u << DC_SCRATCH); + ds->compactor = case_map_to_compact_dict (pd, 1u << DC_SCRATCH); + ds->sink = autopaging_writer_create (compacted_proto); + caseproto_unref (compacted_proto); } else - proc_sink->class->write (proc_sink, &wc_data->trns_case); - } - - /* Execute temporary transformations. */ - if (temporary_trns_chain != NULL) - { - retval = trns_chain_execute (temporary_trns_chain, - &wc_data->trns_case, - &wc_data->cases_written); - if (retval != TRNS_CONTINUE) - goto done; - } - - /* Pass case to procedure. */ - if (wc_data->case_func != NULL) - if (!wc_data->case_func (&wc_data->trns_case, wc_data->aux)) - retval = TRNS_ERROR; - - done: - clear_case (&wc_data->trns_case); - return retval != TRNS_ERROR; -} - -/* Add C to the lag queue. */ -static void -lag_case (const struct ccase *c) -{ - if (lag_count < n_lag) - lag_count++; - case_destroy (&lag_queue[lag_head]); - case_clone (&lag_queue[lag_head], c); - if (++lag_head >= n_lag) - lag_head = 0; -} - -/* Clears the variables in C that need to be cleared between - processing cases. */ -static void -clear_case (struct ccase *c) -{ - size_t var_cnt = dict_get_var_cnt (default_dict); - size_t i; - - for (i = 0; i < var_cnt; i++) - { - struct variable *v = dict_get_var (default_dict, i); - if (!v->leave) { - if (v->type == NUMERIC) - case_data_rw (c, v->fv)->f = SYSMIS; - else - memset (case_data_rw (c, v->fv)->s, ' ', v->width); - } - } -} - -/* Closes the active file. */ -static bool -close_active_file (void) -{ - /* Free memory for lag queue, and turn off lagging. */ - if (n_lag > 0) - { - int i; - - for (i = 0; i < n_lag; i++) - case_destroy (&lag_queue[i]); - free (lag_queue); - n_lag = 0; + ds->compactor = NULL; + ds->sink = autopaging_writer_create (dict_get_proto (pd)); + } } - - /* Dictionary from before TEMPORARY becomes permanent. */ - proc_cancel_temporary_transformations (); - - /* Finish compacting. */ - if (compactor != NULL) + else { - dict_compactor_destroy (compactor); - dict_compact_values (default_dict); - compactor = NULL; + ds->compactor = NULL; + ds->sink = NULL; } - - /* Free data source. */ - free_case_source (proc_source); - proc_source = NULL; - /* Old data sink becomes new data source. */ - if (proc_sink->class->make_source != NULL) - proc_source = proc_sink->class->make_source (proc_sink); - free_case_sink (proc_sink); - proc_sink = NULL; + /* Allocate memory for lagged cases. */ + ds->lag_cases = deque_init (&ds->lag, ds->n_lag, sizeof *ds->lag_cases); - dict_clear_vectors (default_dict); - permanent_dict = NULL; - return proc_cancel_all_transformations (); -} - -/* Returns a pointer to the lagged case from N_BEFORE cases before the - current one, or NULL if there haven't been that many cases yet. */ -struct ccase * -lagged_case (int n_before) -{ - assert (n_before >= 1 ); - assert (n_before <= n_lag); + ds->proc_state = PROC_OPEN; + ds->cases_written = 0; + ds->ok = true; - if (n_before <= lag_count) - { - int index = lag_head - n_before; - if (index < 0) - index += n_lag; - return &lag_queue[index]; - } - else - return NULL; + /* FIXME: use taint in dataset in place of `ok'? */ + /* FIXME: for trivial cases we can just return a clone of + ds->source? */ + return casereader_create_sequential (NULL, dict_get_proto (ds->dict), + CASENUMBER_MAX, + &proc_casereader_class, ds); } - -/* Procedure that separates the data into SPLIT FILE groups. */ -/* Represents auxiliary data for handling SPLIT FILE. */ -struct split_aux_data - { - struct ccase prev_case; /* Data in previous case. */ - - /* Callback functions. */ - void (*begin_func) (const struct ccase *, void *); - bool (*proc_func) (const struct ccase *, void *); - void (*end_func) (void *); - void *func_aux; - }; - -static int equal_splits (const struct ccase *, const struct ccase *); -static bool split_procedure_case_func (const struct ccase *c, void *); -static bool split_procedure_end_func (void *); - -/* Like procedure(), but it automatically breaks the case stream - into SPLIT FILE break groups. Before each group of cases with - identical SPLIT FILE variable values, BEGIN_FUNC is called - with the first case in the group. - Then PROC_FUNC is called for each case in the group (including - the first). - END_FUNC is called when the group is finished. FUNC_AUX is - passed to each of the functions as auxiliary data. - - If the active file is empty, none of BEGIN_FUNC, PROC_FUNC, - and END_FUNC will be called at all. - - If SPLIT FILE is not in effect, then there is one break group - (if the active file is nonempty), and BEGIN_FUNC and END_FUNC - will be called once. - - Returns true if successful, false if an I/O error occurred. */ +/* Returns true if a procedure is in progress, that is, if + proc_open has been called but proc_commit has not. */ bool -procedure_with_splits (void (*begin_func) (const struct ccase *, void *aux), - bool (*proc_func) (const struct ccase *, void *aux), - void (*end_func) (void *aux), - void *func_aux) +proc_is_open (const struct dataset *ds) { - struct split_aux_data split_aux; - bool ok; - - case_nullify (&split_aux.prev_case); - split_aux.begin_func = begin_func; - split_aux.proc_func = proc_func; - split_aux.end_func = end_func; - split_aux.func_aux = func_aux; - - ok = internal_procedure (split_procedure_case_func, - split_procedure_end_func, &split_aux); - - case_destroy (&split_aux.prev_case); - - return ok; + return ds->proc_state != PROC_COMMITTED; } -/* Case callback used by procedure_with_splits(). */ -static bool -split_procedure_case_func (const struct ccase *c, void *split_aux_) +/* "read" function for procedure casereader. */ +static struct ccase * +proc_casereader_read (struct casereader *reader UNUSED, void *ds_) { - struct split_aux_data *split_aux = split_aux_; + struct dataset *ds = ds_; + enum trns_result retval = TRNS_DROP_CASE; + struct ccase *c; - /* Start a new series if needed. */ - if (case_is_null (&split_aux->prev_case) - || !equal_splits (c, &split_aux->prev_case)) + assert (ds->proc_state == PROC_OPEN); + for (; ; case_unref (c)) { - if (!case_is_null (&split_aux->prev_case) && split_aux->end_func != NULL) - split_aux->end_func (split_aux->func_aux); - - case_destroy (&split_aux->prev_case); - case_clone (&split_aux->prev_case, c); + casenumber case_nr; + + assert (retval == TRNS_DROP_CASE || retval == TRNS_ERROR); + if (retval == TRNS_ERROR) + ds->ok = false; + if (!ds->ok) + return NULL; + + /* Read a case from source. */ + c = casereader_read (ds->source); + if (c == NULL) + return NULL; + c = case_unshare_and_resize (c, dict_get_proto (ds->dict)); + caseinit_init_vars (ds->caseinit, c); + + /* Execute permanent transformations. */ + case_nr = ds->cases_written + 1; + retval = trns_chain_execute (ds->permanent_trns_chain, TRNS_CONTINUE, + &c, case_nr); + caseinit_update_left_vars (ds->caseinit, c); + if (retval != TRNS_CONTINUE) + continue; - if (split_aux->begin_func != NULL) - split_aux->begin_func (&split_aux->prev_case, split_aux->func_aux); - } + /* Write case to collection of lagged cases. */ + if (ds->n_lag > 0) + { + while (deque_count (&ds->lag) >= ds->n_lag) + case_unref (ds->lag_cases[deque_pop_back (&ds->lag)]); + ds->lag_cases[deque_push_front (&ds->lag)] = case_ref (c); + } - return (split_aux->proc_func == NULL - || split_aux->proc_func (c, split_aux->func_aux)); -} + /* Write case to replacement active file. */ + ds->cases_written++; + if (ds->sink != NULL) + casewriter_write (ds->sink, + case_map_execute (ds->compactor, case_ref (c))); -/* End-of-file callback used by procedure_with_splits(). */ -static bool -split_procedure_end_func (void *split_aux_) -{ - struct split_aux_data *split_aux = split_aux_; + /* Execute temporary transformations. */ + if (ds->temporary_trns_chain != NULL) + { + retval = trns_chain_execute (ds->temporary_trns_chain, TRNS_CONTINUE, + &c, ds->cases_written); + if (retval != TRNS_CONTINUE) + continue; + } - if (!case_is_null (&split_aux->prev_case) && split_aux->end_func != NULL) - split_aux->end_func (split_aux->func_aux); - return true; + return c; + } } -/* Compares the SPLIT FILE variables in cases A and B and returns - nonzero only if they differ. */ -static int -equal_splits (const struct ccase *a, const struct ccase *b) +/* "destroy" function for procedure casereader. */ +static void +proc_casereader_destroy (struct casereader *reader, void *ds_) { - return case_compare (a, b, - dict_get_split_vars (default_dict), - dict_get_split_cnt (default_dict)) == 0; -} - -/* Multipass procedure that separates the data into SPLIT FILE - groups. */ - -/* Represents auxiliary data for handling SPLIT FILE in a - multipass procedure. */ -struct multipass_split_aux_data - { - struct ccase prev_case; /* Data in previous case. */ - struct casefile *casefile; /* Accumulates data for a split. */ + struct dataset *ds = ds_; + struct ccase *c; - /* Function to call with the accumulated data. */ - bool (*split_func) (const struct ccase *first, const struct casefile *, - void *); - void *func_aux; /* Auxiliary data. */ - }; + /* Make sure transformations happen for every input case, in + case they have side effects, and ensure that the replacement + active file gets all the cases it should. */ + while ((c = casereader_read (reader)) != NULL) + case_unref (c); -static bool multipass_split_case_func (const struct ccase *c, void *aux_); -static bool multipass_split_end_func (void *aux_); -static bool multipass_split_output (struct multipass_split_aux_data *); + ds->proc_state = PROC_CLOSED; + ds->ok = casereader_destroy (ds->source) && ds->ok; + ds->source = NULL; + proc_set_active_file_data (ds, NULL); +} -/* Returns true if successful, false if an I/O error occurred. */ +/* Must return false if the source casereader, a transformation, + or the sink casewriter signaled an error. (If a temporary + transformation signals an error, then the return value is + false, but the replacement active file may still be + untainted.) */ bool -multipass_procedure_with_splits (bool (*split_func) (const struct ccase *first, - const struct casefile *, - void *aux), - void *func_aux) +proc_commit (struct dataset *ds) { - struct multipass_split_aux_data aux; - bool ok; + assert (ds->proc_state == PROC_CLOSED); + ds->proc_state = PROC_COMMITTED; - case_nullify (&aux.prev_case); - aux.casefile = NULL; - aux.split_func = split_func; - aux.func_aux = func_aux; + dataset_set_unsaved (ds); - ok = internal_procedure (multipass_split_case_func, - multipass_split_end_func, &aux); - case_destroy (&aux.prev_case); - - return ok; -} + /* Free memory for lagged cases. */ + while (!deque_is_empty (&ds->lag)) + case_unref (ds->lag_cases[deque_pop_back (&ds->lag)]); + free (ds->lag_cases); -/* Case callback used by multipass_procedure_with_splits(). */ -static bool -multipass_split_case_func (const struct ccase *c, void *aux_) -{ - struct multipass_split_aux_data *aux = aux_; - bool ok = true; + /* Dictionary from before TEMPORARY becomes permanent. */ + proc_cancel_temporary_transformations (ds); - /* Start a new series if needed. */ - if (aux->casefile == NULL || !equal_splits (c, &aux->prev_case)) + if (!ds->discard_output) { - /* Record split values. */ - case_destroy (&aux->prev_case); - case_clone (&aux->prev_case, c); + /* Finish compacting. */ + if (ds->compactor != NULL) + { + case_map_destroy (ds->compactor); + ds->compactor = NULL; - /* Pass any cases to split_func. */ - if (aux->casefile != NULL) - ok = multipass_split_output (aux); + dict_delete_scratch_vars (ds->dict); + dict_compact_values (ds->dict); + } - /* Start a new casefile. */ - aux->casefile = casefile_create (dict_get_next_value_idx (default_dict)); + /* Old data sink becomes new data source. */ + if (ds->sink != NULL) + ds->source = casewriter_make_reader (ds->sink); } + else + { + ds->source = NULL; + ds->discard_output = false; + } + ds->sink = NULL; - return casefile_append (aux->casefile, c) && ok; -} + caseinit_clear (ds->caseinit); + caseinit_mark_as_preinited (ds->caseinit, ds->dict); -/* End-of-file callback used by multipass_procedure_with_splits(). */ -static bool -multipass_split_end_func (void *aux_) -{ - struct multipass_split_aux_data *aux = aux_; - return (aux->casefile == NULL || multipass_split_output (aux)); + dict_clear_vectors (ds->dict); + ds->permanent_dict = NULL; + return proc_cancel_all_transformations (ds) && ds->ok; } -static bool -multipass_split_output (struct multipass_split_aux_data *aux) -{ - bool ok; - - assert (aux->casefile != NULL); - ok = aux->split_func (&aux->prev_case, aux->casefile, aux->func_aux); - casefile_destroy (aux->casefile); - aux->casefile = NULL; +/* Casereader class for procedure execution. */ +static const struct casereader_class proc_casereader_class = + { + proc_casereader_read, + proc_casereader_destroy, + NULL, + NULL, + }; - return ok; +/* Updates last_proc_invocation. */ +static void +update_last_proc_invocation (struct dataset *ds) +{ + ds->last_proc_invocation = time (NULL); } -/* Discards all the current state in preparation for a data-input - command like DATA LIST or GET. */ -void -discard_variables (void) +/* Returns a pointer to the lagged case from N_BEFORE cases before the + current one, or NULL if there haven't been that many cases yet. */ +const struct ccase * +lagged_case (const struct dataset *ds, int n_before) { - dict_clear (default_dict); - fh_set_default_handle (NULL); - - n_lag = 0; - - free_case_source (proc_source); - proc_source = NULL; + assert (n_before >= 1); + assert (n_before <= ds->n_lag); - proc_cancel_all_transformations (); + if (n_before <= deque_count (&ds->lag)) + return ds->lag_cases[deque_front (&ds->lag, n_before - 1)]; + else + return NULL; } /* Returns the current set of permanent transformations, and clears the permanent transformations. For use by INPUT PROGRAM. */ struct trns_chain * -proc_capture_transformations (void) +proc_capture_transformations (struct dataset *ds) { struct trns_chain *chain; - - assert (temporary_trns_chain == NULL); - chain = permanent_trns_chain; - cur_trns_chain = permanent_trns_chain = trns_chain_create (); + + assert (ds->temporary_trns_chain == NULL); + chain = ds->permanent_trns_chain; + ds->cur_trns_chain = ds->permanent_trns_chain = trns_chain_create (); + + if ( ds->xform_callback) + ds->xform_callback (false, ds->xform_callback_aux); + return chain; } @@ -679,9 +414,11 @@ proc_capture_transformations (void) frees itself with FREE to the current set of transformations. The functions are passed AUX as auxiliary data. */ void -add_transformation (trns_proc_func *proc, trns_free_func *free, void *aux) +add_transformation (struct dataset *ds, trns_proc_func *proc, trns_free_func *free, void *aux) { - trns_chain_append (cur_trns_chain, NULL, proc, free, aux); + trns_chain_append (ds->cur_trns_chain, NULL, proc, free, aux); + if ( ds->xform_callback) + ds->xform_callback (true, ds->xform_callback_aux); } /* Adds a transformation that processes a case with PROC and @@ -690,44 +427,52 @@ add_transformation (trns_proc_func *proc, trns_free_func *free, void *aux) FINALIZE will be called. The functions are passed AUX as auxiliary data. */ void -add_transformation_with_finalizer (trns_finalize_func *finalize, +add_transformation_with_finalizer (struct dataset *ds, + trns_finalize_func *finalize, trns_proc_func *proc, trns_free_func *free, void *aux) { - trns_chain_append (cur_trns_chain, finalize, proc, free, aux); + trns_chain_append (ds->cur_trns_chain, finalize, proc, free, aux); + + if ( ds->xform_callback) + ds->xform_callback (true, ds->xform_callback_aux); } /* Returns the index of the next transformation. This value can be returned by a transformation procedure function to indicate a "jump" to that transformation. */ size_t -next_transformation (void) +next_transformation (const struct dataset *ds) { - return trns_chain_next (cur_trns_chain); + return trns_chain_next (ds->cur_trns_chain); } /* Returns true if the next call to add_transformation() will add a temporary transformation, false if it will add a permanent transformation. */ bool -proc_in_temporary_transformations (void) +proc_in_temporary_transformations (const struct dataset *ds) { - return temporary_trns_chain != NULL; + return ds->temporary_trns_chain != NULL; } /* Marks the start of temporary transformations. Further calls to add_transformation() will add temporary transformations. */ void -proc_start_temporary_transformations (void) +proc_start_temporary_transformations (struct dataset *ds) { - if (!proc_in_temporary_transformations ()) + if (!proc_in_temporary_transformations (ds)) { - add_case_limit_trns (); + add_case_limit_trns (ds); - permanent_dict = dict_clone (default_dict); - trns_chain_finalize (permanent_trns_chain); - temporary_trns_chain = cur_trns_chain = trns_chain_create (); + ds->permanent_dict = dict_clone (ds->dict); + + trns_chain_finalize (ds->permanent_trns_chain); + ds->temporary_trns_chain = ds->cur_trns_chain = trns_chain_create (); + + if ( ds->xform_callback) + ds->xform_callback (true, ds->xform_callback_aux); } } @@ -736,16 +481,16 @@ proc_start_temporary_transformations (void) permanent. Returns true if anything changed, false otherwise. */ bool -proc_make_temporary_transformations_permanent (void) +proc_make_temporary_transformations_permanent (struct dataset *ds) { - if (proc_in_temporary_transformations ()) + if (proc_in_temporary_transformations (ds)) { - trns_chain_finalize (temporary_trns_chain); - trns_chain_splice (permanent_trns_chain, temporary_trns_chain); - temporary_trns_chain = NULL; + trns_chain_finalize (ds->temporary_trns_chain); + trns_chain_splice (ds->permanent_trns_chain, ds->temporary_trns_chain); + ds->temporary_trns_chain = NULL; - dict_destroy (permanent_dict); - permanent_dict = NULL; + dict_destroy (ds->permanent_dict); + ds->permanent_dict = NULL; return true; } @@ -757,16 +502,20 @@ proc_make_temporary_transformations_permanent (void) transformations will be permanent. Returns true if anything changed, false otherwise. */ bool -proc_cancel_temporary_transformations (void) +proc_cancel_temporary_transformations (struct dataset *ds) { - if (proc_in_temporary_transformations ()) + if (proc_in_temporary_transformations (ds)) { - dict_destroy (default_dict); - default_dict = permanent_dict; - permanent_dict = NULL; + dict_destroy (ds->dict); + ds->dict = ds->permanent_dict; + ds->permanent_dict = NULL; + + trns_chain_destroy (ds->temporary_trns_chain); + ds->temporary_trns_chain = NULL; - trns_chain_destroy (temporary_trns_chain); - temporary_trns_chain = NULL; + if ( ds->xform_callback) + ds->xform_callback (!trns_chain_is_empty (ds->permanent_trns_chain), + ds->xform_callback_aux); return true; } @@ -777,96 +526,185 @@ proc_cancel_temporary_transformations (void) /* Cancels all transformations, if any. Returns true if successful, false on I/O error. */ bool -proc_cancel_all_transformations (void) +proc_cancel_all_transformations (struct dataset *ds) { bool ok; - ok = trns_chain_destroy (permanent_trns_chain); - ok = trns_chain_destroy (temporary_trns_chain) && ok; - permanent_trns_chain = cur_trns_chain = trns_chain_create (); - temporary_trns_chain = NULL; + assert (ds->proc_state == PROC_COMMITTED); + ok = trns_chain_destroy (ds->permanent_trns_chain); + ok = trns_chain_destroy (ds->temporary_trns_chain) && ok; + ds->permanent_trns_chain = ds->cur_trns_chain = trns_chain_create (); + ds->temporary_trns_chain = NULL; + if ( ds->xform_callback) + ds->xform_callback (false, ds->xform_callback_aux); + return ok; } + +static void +dict_callback (struct dictionary *d UNUSED, void *ds_) +{ + struct dataset *ds = ds_; + dataset_set_unsaved (ds); +} + /* Initializes procedure handling. */ +struct dataset * +create_dataset (void) +{ + struct dataset *ds = xzalloc (sizeof(*ds)); + ds->dict = dict_create (); + + dict_set_change_callback (ds->dict, dict_callback, ds); + + dict_set_encoding (ds->dict, get_default_encoding ()); + + ds->caseinit = caseinit_create (); + proc_cancel_all_transformations (ds); + return ds; +} + + void -proc_init (void) +dataset_add_transform_change_callback (struct dataset *ds, + transformation_change_callback_func *cb, + void *aux) { - default_dict = dict_create (); - proc_cancel_all_transformations (); + ds->xform_callback = cb; + ds->xform_callback_aux = aux; } /* Finishes up procedure handling. */ void -proc_done (void) +destroy_dataset (struct dataset *ds) { - discard_variables (); - dict_destroy (default_dict); + proc_discard_active_file (ds); + dict_destroy (ds->dict); + caseinit_destroy (ds->caseinit); + trns_chain_destroy (ds->permanent_trns_chain); + + if ( ds->xform_callback) + ds->xform_callback (false, ds->xform_callback_aux); + free (ds); } -/* Sets SINK as the destination for procedure output from the - next procedure. */ +/* Causes output from the next procedure to be discarded, instead + of being preserved for use as input for the next procedure. */ void -proc_set_sink (struct case_sink *sink) +proc_discard_output (struct dataset *ds) { - assert (proc_sink == NULL); - proc_sink = sink; + ds->discard_output = true; +} + +/* Discards the active file dictionary, data, and + transformations. */ +void +proc_discard_active_file (struct dataset *ds) +{ + assert (ds->proc_state == PROC_COMMITTED); + + dict_clear (ds->dict); + fh_set_default_handle (NULL); + + ds->n_lag = 0; + + casereader_destroy (ds->source); + ds->source = NULL; + + proc_cancel_all_transformations (ds); } /* Sets SOURCE as the source for procedure input for the next procedure. */ void -proc_set_source (struct case_source *source) +proc_set_active_file (struct dataset *ds, + struct casereader *source, + struct dictionary *dict) { - assert (proc_source == NULL); - proc_source = source; + assert (ds->proc_state == PROC_COMMITTED); + assert (ds->dict != dict); + + proc_discard_active_file (ds); + + dict_destroy (ds->dict); + ds->dict = dict; + dict_set_change_callback (ds->dict, dict_callback, ds); + + proc_set_active_file_data (ds, source); } -/* Returns true if a source for the next procedure has been - configured, false otherwise. */ +/* Replaces the active file's data by READER without replacing + the associated dictionary. */ bool -proc_has_source (void) +proc_set_active_file_data (struct dataset *ds, struct casereader *reader) { - return proc_source != NULL; + casereader_destroy (ds->source); + ds->source = reader; + + caseinit_clear (ds->caseinit); + caseinit_mark_as_preinited (ds->caseinit, ds->dict); + + return reader == NULL || !casereader_error (reader); } -/* Returns the output from the previous procedure. - For use only immediately after executing a procedure. - The returned casefile is owned by the caller; it will not be - automatically used for the next procedure's input. */ -struct casefile * -proc_capture_output (void) +/* Returns true if an active file data source is available, false + otherwise. */ +bool +proc_has_active_file (const struct dataset *ds) { - struct casefile *casefile; + return ds->source != NULL; +} - /* Try to make sure that this function is called immediately - after procedure() or a similar function. */ - assert (proc_source != NULL); - assert (case_source_is_class (proc_source, &storage_source_class)); - assert (trns_chain_is_empty (permanent_trns_chain)); - assert (!proc_in_temporary_transformations ()); +/* Returns the active file data source from DS, or a null pointer + if DS has no data source, and removes it from DS. */ +struct casereader * +proc_extract_active_file_data (struct dataset *ds) +{ + struct casereader *reader = ds->source; + ds->source = NULL; - casefile = storage_source_decapsulate (proc_source); - proc_source = NULL; + return reader; +} - return casefile; +/* Checks whether DS has a corrupted active file. If so, + discards it and returns false. If not, returns true without + doing anything. */ +bool +dataset_end_of_command (struct dataset *ds) +{ + if (ds->source != NULL) + { + if (casereader_error (ds->source)) + { + proc_discard_active_file (ds); + return false; + } + else + { + const struct taint *taint = casereader_get_taint (ds->source); + taint_reset_successor_taint ((struct taint *) taint); + assert (!taint_has_tainted_successor (taint)); + } + } + return true; } static trns_proc_func case_limit_trns_proc; static trns_free_func case_limit_trns_free; /* Adds a transformation that limits the number of cases that may - pass through, if default_dict has a case limit. */ + pass through, if DS->DICT has a case limit. */ static void -add_case_limit_trns (void) +add_case_limit_trns (struct dataset *ds) { - size_t case_limit = dict_get_case_limit (default_dict); + casenumber case_limit = dict_get_case_limit (ds->dict); if (case_limit != 0) { - size_t *cases_remaining = xmalloc (sizeof *cases_remaining); + casenumber *cases_remaining = xmalloc (sizeof *cases_remaining); *cases_remaining = case_limit; - add_transformation (case_limit_trns_proc, case_limit_trns_free, + add_transformation (ds, case_limit_trns_proc, case_limit_trns_free, cases_remaining); - dict_set_case_limit (default_dict, 0); + dict_set_case_limit (ds->dict, 0); } } @@ -874,12 +712,12 @@ add_case_limit_trns (void) *CASES_REMAINING. */ static int case_limit_trns_proc (void *cases_remaining_, - struct ccase *c UNUSED, int case_nr UNUSED) + struct ccase **c UNUSED, casenumber case_nr UNUSED) { size_t *cases_remaining = cases_remaining_; - if (*cases_remaining > 0) + if (*cases_remaining > 0) { - *cases_remaining--; + (*cases_remaining)--; return TRNS_CONTINUE; } else @@ -888,7 +726,7 @@ case_limit_trns_proc (void *cases_remaining_, /* Frees the data associated with a case limit transformation. */ static bool -case_limit_trns_free (void *cases_remaining_) +case_limit_trns_free (void *cases_remaining_) { size_t *cases_remaining = cases_remaining_; free (cases_remaining); @@ -900,25 +738,43 @@ static trns_proc_func filter_trns_proc; /* Adds a temporary transformation to filter data according to the variable specified on FILTER, if any. */ static void -add_filter_trns (void) +add_filter_trns (struct dataset *ds) { - struct variable *filter_var = dict_get_filter (default_dict); - if (filter_var != NULL) + struct variable *filter_var = dict_get_filter (ds->dict); + if (filter_var != NULL) { - proc_start_temporary_transformations (); - add_transformation (filter_trns_proc, NULL, filter_var); + proc_start_temporary_transformations (ds); + add_transformation (ds, filter_trns_proc, NULL, filter_var); } } /* FILTER transformation. */ static int filter_trns_proc (void *filter_var_, - struct ccase *c UNUSED, int case_nr UNUSED) - + struct ccase **c UNUSED, casenumber case_nr UNUSED) + { struct variable *filter_var = filter_var_; - double f = case_num (c, filter_var->fv); - return (f != 0.0 && !mv_is_num_missing (&filter_var->miss, f) + double f = case_num (*c, filter_var); + return (f != 0.0 && !var_is_num_missing (filter_var, f, MV_ANY) ? TRNS_CONTINUE : TRNS_DROP_CASE); } + +struct dictionary * +dataset_dict (const struct dataset *ds) +{ + return ds->dict; +} + +const struct casereader * +dataset_source (const struct dataset *ds) +{ + return ds->source; +} + +void +dataset_need_lag (struct dataset *ds, int n_before) +{ + ds->n_lag = MAX (ds->n_lag, n_before); +}