X-Git-Url: https://pintos-os.org/cgi-bin/gitweb.cgi?a=blobdiff_plain;f=src%2Fdata%2Fprocedure.c;h=b762214d10e20a33d6bbdf89ac52253a67589453;hb=b5c82cc9aabe7e641011130240ae1b2e84348e23;hp=baba8fa7cefb20f049b09484d2de162f50b51694;hpb=5862de297bb487c81660beb3796d5c612eeb12b7;p=pspp-builds.git diff --git a/src/data/procedure.c b/src/data/procedure.c index baba8fa7..b762214d 100644 --- a/src/data/procedure.c +++ b/src/data/procedure.c @@ -1,20 +1,18 @@ -/* PSPP - computes sample statistics. - Copyright (C) 1997-9, 2000, 2006 Free Software Foundation, Inc. +/* PSPP - a program for statistical analysis. + Copyright (C) 1997-9, 2000, 2006, 2007, 2009 Free Software Foundation, Inc. - This program is free software; you can redistribute it and/or - modify it under the terms of the GNU General Public License as - published by the Free Software Foundation; either version 2 of the - License, or (at your option) any later version. + This program is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. - This program is distributed in the hope that it will be useful, but - WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - General Public License for more details. + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. You should have received a copy of the GNU General Public License - along with this program; if not, write to the Free Software - Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA - 02110-1301, USA. */ + along with this program. If not, see . */ #include @@ -23,81 +21,111 @@ #include #include -#include -#include #include -#include -#include +#include +#include +#include +#include +#include #include #include #include -#include #include #include -#include +#include #include #include +#include +#include + +#include "minmax.h" +#include "xalloc.h" struct dataset { - /* Cases are read from proc_source, + /* Cases are read from source, + their transformation variables are initialized, pass through permanent_trns_chain (which transforms them into the format described by permanent_dict), - are written to proc_sink, + are written to sink, pass through temporary_trns_chain (which transforms them into the format described by dict), and are finally passed to the procedure. */ - struct case_source *proc_source; + struct casereader *source; + struct caseinit *caseinit; struct trns_chain *permanent_trns_chain; struct dictionary *permanent_dict; - struct case_sink *proc_sink; + struct casewriter *sink; struct trns_chain *temporary_trns_chain; struct dictionary *dict; + /* Callback which occurs whenever the transformation chain(s) have + been modified */ + transformation_change_callback_func *xform_callback; + void *xform_callback_aux; + + /* If true, cases are discarded instead of being written to + sink. */ + bool discard_output; + /* The transformation chain that the next transformation will be added to. */ struct trns_chain *cur_trns_chain; - /* The compactor used to compact a case, if necessary; + /* The case map used to compact a case, if necessary; otherwise a null pointer. */ - struct dict_compactor *compactor; + struct case_map *compactor; /* Time at which proc was last invoked. */ time_t last_proc_invocation; - /* Lag queue. */ + /* Cases just before ("lagging") the current one. */ int n_lag; /* Number of cases to lag. */ - int lag_count; /* Number of cases in lag_queue so far. */ - int lag_head; /* Index where next case will be added. */ - struct ccase *lag_queue; /* Array of n_lag ccase * elements. */ + struct deque lag; /* Deque of lagged cases. */ + struct ccase **lag_cases; /* Lagged cases managed by deque. */ /* Procedure data. */ - bool is_open; /* Procedure open? */ - struct ccase trns_case; /* Case used for transformations. */ - struct ccase sink_case; /* Case written to sink, if - compacting is necessary. */ - size_t cases_written; /* Cases output so far. */ - bool ok; + enum + { + PROC_COMMITTED, /* No procedure in progress. */ + PROC_OPEN, /* proc_open called, casereader still open. */ + PROC_CLOSED /* casereader from proc_open destroyed, + but proc_commit not yet called. */ + } + proc_state; + casenumber cases_written; /* Cases output so far. */ + bool ok; /* Error status. */ + + void (*callback) (void *); /* Callback for when the dataset changes */ + void *cb_data; + }; /* struct dataset */ static void add_case_limit_trns (struct dataset *ds); static void add_filter_trns (struct dataset *ds); -static bool internal_procedure (struct dataset *ds, case_func *, - end_func *, - void *aux); static void update_last_proc_invocation (struct dataset *ds); -static void create_trns_case (struct ccase *, struct dictionary *); -static void open_active_file (struct dataset *ds); -static void lag_case (struct dataset *ds, const struct ccase *c); -static void clear_case (const struct dataset *ds, struct ccase *c); -static bool close_active_file (struct dataset *ds); + +static void +dataset_set_unsaved (const struct dataset *ds) +{ + if (ds->callback) ds->callback (ds->cb_data); +} + /* Public functions. */ +void +dataset_set_callback (struct dataset *ds, void (*cb) (void *), void *cb_data) +{ + ds->callback = cb; + ds->cb_data = cb_data; +} + + /* Returns the last time the data was read. */ time_t -time_of_last_procedure (struct dataset *ds) +time_of_last_procedure (struct dataset *ds) { if (ds->last_proc_invocation == 0) update_last_proc_invocation (ds); @@ -106,616 +134,279 @@ time_of_last_procedure (struct dataset *ds) /* Regular procedure. */ - - -/* Reads the data from the input program and writes it to a new - active file. For each case we read from the input program, we - do the following: - - 1. Execute permanent transformations. If these drop the case, - start the next case from step 1. - - 2. Write case to replacement active file. - - 3. Execute temporary transformations. If these drop the case, - start the next case from step 1. - - 4. Pass case to PROC_FUNC, passing AUX as auxiliary data. - - Returns true if successful, false if an I/O error occurred. */ +/* Executes any pending transformations, if necessary. + This is not identical to the EXECUTE command in that it won't + always read the source data. This can be important when the + source data is given inline within BEGIN DATA...END FILE. */ bool -procedure (struct dataset *ds, case_func *cf, void *aux) +proc_execute (struct dataset *ds) { - update_last_proc_invocation (ds); + bool ok; - /* Optimize the trivial case where we're not going to do - anything with the data, by not reading the data at all. */ - if (cf == NULL - && case_source_is_class (ds->proc_source, &storage_source_class) - && ds->proc_sink == NULL - && (ds->temporary_trns_chain == NULL - || trns_chain_is_empty (ds->temporary_trns_chain)) + if ((ds->temporary_trns_chain == NULL + || trns_chain_is_empty (ds->temporary_trns_chain)) && trns_chain_is_empty (ds->permanent_trns_chain)) { ds->n_lag = 0; + ds->discard_output = false; dict_set_case_limit (ds->dict, 0); dict_clear_vectors (ds->dict); return true; } - return internal_procedure (ds, cf, NULL, aux); -} - -/* Multipass procedure. */ - -struct multipass_aux_data - { - struct casefile *casefile; - - bool (*proc_func) (const struct casefile *, void *aux); - void *aux; - }; - -/* Case processing function for multipass_procedure(). */ -static bool -multipass_case_func (const struct ccase *c, void *aux_data_, const struct dataset *ds UNUSED) -{ - struct multipass_aux_data *aux_data = aux_data_; - return casefile_append (aux_data->casefile, c); + ok = casereader_destroy (proc_open (ds)); + return proc_commit (ds) && ok; } -/* End-of-file function for multipass_procedure(). */ -static bool -multipass_end_func (void *aux_data_, const struct dataset *ds UNUSED) -{ - struct multipass_aux_data *aux_data = aux_data_; - return (aux_data->proc_func == NULL - || aux_data->proc_func (aux_data->casefile, aux_data->aux)); -} +static const struct casereader_class proc_casereader_class; -/* Procedure that allows multiple passes over the input data. - The entire active file is passed to PROC_FUNC, with the given - AUX as auxiliary data, as a unit. */ -bool -multipass_procedure (struct dataset *ds, casefile_func *proc_func, void *aux) +/* Opens dataset DS for reading cases with proc_read. + proc_commit must be called when done. */ +struct casereader * +proc_open (struct dataset *ds) { - struct multipass_aux_data aux_data; - bool ok; - - aux_data.casefile = fastfile_create (dict_get_next_value_idx (ds->dict)); - aux_data.proc_func = proc_func; - aux_data.aux = aux; - - ok = internal_procedure (ds, multipass_case_func, multipass_end_func, &aux_data); - ok = !casefile_error (aux_data.casefile) && ok; + assert (ds->source != NULL); + assert (ds->proc_state == PROC_COMMITTED); - casefile_destroy (aux_data.casefile); + update_last_proc_invocation (ds); - return ok; -} - -/* Procedure implementation. */ + caseinit_mark_for_init (ds->caseinit, ds->dict); -/* Executes a procedure. - Passes each case to CASE_FUNC. - Calls END_FUNC after the last case. - Returns true if successful, false if an I/O error occurred (or - if CASE_FUNC or END_FUNC ever returned false). */ -static bool -internal_procedure (struct dataset *ds, case_func *proc, - end_func *end, - void *aux) -{ - struct ccase *c; - bool ok = true; - - proc_open (ds); - while (ok && proc_read (ds, &c)) - if (proc != NULL) - ok = proc (c, aux, ds) && ok; - if (end != NULL) - ok = end (aux, ds) && ok; - return proc_close (ds) && ok; -} + /* Finish up the collection of transformations. */ + add_case_limit_trns (ds); + add_filter_trns (ds); + trns_chain_finalize (ds->cur_trns_chain); -/* Opens dataset DS for reading cases with proc_read. - proc_close must be called when done. */ -void -proc_open (struct dataset *ds) -{ - assert (ds->proc_source != NULL); - assert (!ds->is_open); + /* Make permanent_dict refer to the dictionary right before + data reaches the sink. */ + if (ds->permanent_dict == NULL) + ds->permanent_dict = ds->dict; - update_last_proc_invocation (ds); + /* Prepare sink. */ + if (!ds->discard_output) + { + struct dictionary *pd = ds->permanent_dict; + size_t compacted_value_cnt = dict_count_values (pd, 1u << DC_SCRATCH); + if (compacted_value_cnt < dict_get_next_value_idx (pd)) + { + struct caseproto *compacted_proto; + compacted_proto = dict_get_compacted_proto (pd, 1u << DC_SCRATCH); + ds->compactor = case_map_to_compact_dict (pd, 1u << DC_SCRATCH); + ds->sink = autopaging_writer_create (compacted_proto); + caseproto_unref (compacted_proto); + } + else + { + ds->compactor = NULL; + ds->sink = autopaging_writer_create (dict_get_proto (pd)); + } + } + else + { + ds->compactor = NULL; + ds->sink = NULL; + } - open_active_file (ds); + /* Allocate memory for lagged cases. */ + ds->lag_cases = deque_init (&ds->lag, ds->n_lag, sizeof *ds->lag_cases); - ds->is_open = true; - create_trns_case (&ds->trns_case, ds->dict); - case_create (&ds->sink_case, dict_get_compacted_value_cnt (ds->dict)); + ds->proc_state = PROC_OPEN; ds->cases_written = 0; ds->ok = true; + + /* FIXME: use taint in dataset in place of `ok'? */ + /* FIXME: for trivial cases we can just return a clone of + ds->source? */ + return casereader_create_sequential (NULL, dict_get_proto (ds->dict), + CASENUMBER_MAX, + &proc_casereader_class, ds); } -/* Reads the next case from dataset DS, which must have been - opened for reading with proc_open. - Returns true if successful, in which case a pointer to the - case is stored in *C. - Return false at end of file or if a read error occurs. In - this case a null pointer is stored in *C. */ +/* Returns true if a procedure is in progress, that is, if + proc_open has been called but proc_commit has not. */ bool -proc_read (struct dataset *ds, struct ccase **c) +proc_is_open (const struct dataset *ds) { + return ds->proc_state != PROC_COMMITTED; +} + +/* "read" function for procedure casereader. */ +static struct ccase * +proc_casereader_read (struct casereader *reader UNUSED, void *ds_) +{ + struct dataset *ds = ds_; enum trns_result retval = TRNS_DROP_CASE; + struct ccase *c; - assert (ds->is_open); - *c = NULL; - for (;;) + assert (ds->proc_state == PROC_OPEN); + for (; ; case_unref (c)) { - size_t case_nr; + casenumber case_nr; assert (retval == TRNS_DROP_CASE || retval == TRNS_ERROR); if (retval == TRNS_ERROR) ds->ok = false; if (!ds->ok) - return false; + return NULL; - /* Read a case from proc_source. */ - clear_case (ds, &ds->trns_case); - if (!ds->proc_source->class->read (ds->proc_source, &ds->trns_case)) - return false; + /* Read a case from source. */ + c = casereader_read (ds->source); + if (c == NULL) + return NULL; + c = case_unshare_and_resize (c, dict_get_proto (ds->dict)); + caseinit_init_vars (ds->caseinit, c); /* Execute permanent transformations. */ case_nr = ds->cases_written + 1; retval = trns_chain_execute (ds->permanent_trns_chain, TRNS_CONTINUE, - &ds->trns_case, &case_nr); + &c, case_nr); + caseinit_update_left_vars (ds->caseinit, c); if (retval != TRNS_CONTINUE) continue; - - /* Write case to LAG queue. */ - if (ds->n_lag) - lag_case (ds, &ds->trns_case); - /* Write case to replacement active file. */ - ds->cases_written++; - if (ds->proc_sink->class->write != NULL) + /* Write case to collection of lagged cases. */ + if (ds->n_lag > 0) { - if (ds->compactor != NULL) - { - dict_compactor_compact (ds->compactor, &ds->sink_case, - &ds->trns_case); - ds->proc_sink->class->write (ds->proc_sink, &ds->sink_case); - } - else - ds->proc_sink->class->write (ds->proc_sink, &ds->trns_case); + while (deque_count (&ds->lag) >= ds->n_lag) + case_unref (ds->lag_cases[deque_pop_back (&ds->lag)]); + ds->lag_cases[deque_push_front (&ds->lag)] = case_ref (c); } - + + /* Write case to replacement active file. */ + ds->cases_written++; + if (ds->sink != NULL) + casewriter_write (ds->sink, + case_map_execute (ds->compactor, case_ref (c))); + /* Execute temporary transformations. */ - if (ds->temporary_trns_chain != NULL) + if (ds->temporary_trns_chain != NULL) { retval = trns_chain_execute (ds->temporary_trns_chain, TRNS_CONTINUE, - &ds->trns_case, &ds->cases_written); + &c, ds->cases_written); if (retval != TRNS_CONTINUE) continue; } - *c = &ds->trns_case; - return true; + return c; } } -/* Closes dataset DS for reading. - Returns true if successful, false if an I/O error occurred - while reading or closing the data set. - If DS has not been opened, returns true without doing - anything else. */ -bool -proc_close (struct dataset *ds) -{ - if (!ds->is_open) - return true; - - /* Drain any remaining cases. */ - while (ds->ok) - { - struct ccase *c; - if (!proc_read (ds, &c)) - break; - } - - ds->ok = free_case_source (ds->proc_source) && ds->ok; - ds->proc_source = NULL; - - case_destroy (&ds->sink_case); - case_destroy (&ds->trns_case); - - ds->ok = close_active_file (ds) && ds->ok; - ds->is_open = false; - - return ds->ok; -} - -/* Updates last_proc_invocation. */ +/* "destroy" function for procedure casereader. */ static void -update_last_proc_invocation (struct dataset *ds) +proc_casereader_destroy (struct casereader *reader, void *ds_) { - ds->last_proc_invocation = time (NULL); -} - -/* Creates and returns a case, initializing it from the vectors - that say which `value's need to be initialized just once, and - which ones need to be re-initialized before every case. */ -static void -create_trns_case (struct ccase *trns_case, struct dictionary *dict) -{ - size_t var_cnt = dict_get_var_cnt (dict); - size_t i; + struct dataset *ds = ds_; + struct ccase *c; - case_create (trns_case, dict_get_next_value_idx (dict)); - for (i = 0; i < var_cnt; i++) - { - struct variable *v = dict_get_var (dict, i); - union value *value = case_data_rw (trns_case, v); + /* Make sure transformations happen for every input case, in + case they have side effects, and ensure that the replacement + active file gets all the cases it should. */ + while ((c = casereader_read (reader)) != NULL) + case_unref (c); - if (var_is_numeric (v)) - value->f = var_get_leave (v) ? 0.0 : SYSMIS; - else - memset (value->s, ' ', var_get_width (v)); - } + ds->proc_state = PROC_CLOSED; + ds->ok = casereader_destroy (ds->source) && ds->ok; + ds->source = NULL; + proc_set_active_file_data (ds, NULL); } -/* Makes all preparations for reading from the data source and writing - to the data sink. */ -static void -open_active_file (struct dataset *ds) +/* Must return false if the source casereader, a transformation, + or the sink casewriter signaled an error. (If a temporary + transformation signals an error, then the return value is + false, but the replacement active file may still be + untainted.) */ +bool +proc_commit (struct dataset *ds) { - add_case_limit_trns (ds); - add_filter_trns (ds); + assert (ds->proc_state == PROC_CLOSED); + ds->proc_state = PROC_COMMITTED; - /* Finalize transformations. */ - trns_chain_finalize (ds->cur_trns_chain); + dataset_set_unsaved (ds); - /* Make permanent_dict refer to the dictionary right before - data reaches the sink. */ - if (ds->permanent_dict == NULL) - ds->permanent_dict = ds->dict; - - /* Figure out whether to compact. */ - ds->compactor = - (dict_compacting_would_shrink (ds->permanent_dict) - ? dict_make_compactor (ds->permanent_dict) - : NULL); - - /* Prepare sink. */ - if (ds->proc_sink == NULL) - ds->proc_sink = create_case_sink (&storage_sink_class, ds->permanent_dict, NULL); - if (ds->proc_sink->class->open != NULL) - ds->proc_sink->class->open (ds->proc_sink); - - /* Allocate memory for lag queue. */ - if (ds->n_lag > 0) - { - int i; - - ds->lag_count = 0; - ds->lag_head = 0; - ds->lag_queue = xnmalloc (ds->n_lag, sizeof *ds->lag_queue); - for (i = 0; i < ds->n_lag; i++) - case_nullify (&ds->lag_queue[i]); - } -} - -/* Add C to the lag queue. */ -static void -lag_case (struct dataset *ds, const struct ccase *c) -{ - if (ds->lag_count < ds->n_lag) - ds->lag_count++; - case_destroy (&ds->lag_queue[ds->lag_head]); - case_clone (&ds->lag_queue[ds->lag_head], c); - if (++ds->lag_head >= ds->n_lag) - ds->lag_head = 0; -} - -/* Clears the variables in C that need to be cleared between - processing cases. */ -static void -clear_case (const struct dataset *ds, struct ccase *c) -{ - size_t var_cnt = dict_get_var_cnt (ds->dict); - size_t i; - - for (i = 0; i < var_cnt; i++) - { - struct variable *v = dict_get_var (ds->dict, i); - if (!var_get_leave (v)) - { - if (var_is_numeric (v)) - case_data_rw (c, v)->f = SYSMIS; - else - memset (case_data_rw (c, v)->s, ' ', var_get_width (v)); - } - } -} + /* Free memory for lagged cases. */ + while (!deque_is_empty (&ds->lag)) + case_unref (ds->lag_cases[deque_pop_back (&ds->lag)]); + free (ds->lag_cases); -/* Closes the active file. */ -static bool -close_active_file (struct dataset *ds) -{ - /* Free memory for lag queue, and turn off lagging. */ - if (ds->n_lag > 0) - { - int i; - - for (i = 0; i < ds->n_lag; i++) - case_destroy (&ds->lag_queue[i]); - free (ds->lag_queue); - ds->n_lag = 0; - } - /* Dictionary from before TEMPORARY becomes permanent. */ proc_cancel_temporary_transformations (ds); - /* Finish compacting. */ - if (ds->compactor != NULL) + if (!ds->discard_output) { - dict_compactor_destroy (ds->compactor); - dict_compact_values (ds->dict); - ds->compactor = NULL; - } - - /* Old data sink becomes new data source. */ - if (ds->proc_sink->class->make_source != NULL) - ds->proc_source = ds->proc_sink->class->make_source (ds->proc_sink); - free_case_sink (ds->proc_sink); - ds->proc_sink = NULL; + /* Finish compacting. */ + if (ds->compactor != NULL) + { + case_map_destroy (ds->compactor); + ds->compactor = NULL; - dict_clear_vectors (ds->dict); - ds->permanent_dict = NULL; - return proc_cancel_all_transformations (ds); -} - -/* Returns a pointer to the lagged case from N_BEFORE cases before the - current one, or NULL if there haven't been that many cases yet. */ -struct ccase * -lagged_case (const struct dataset *ds, int n_before) -{ - assert (n_before >= 1 ); - assert (n_before <= ds->n_lag); + dict_delete_scratch_vars (ds->dict); + dict_compact_values (ds->dict); + } - if (n_before <= ds->lag_count) - { - int index = ds->lag_head - n_before; - if (index < 0) - index += ds->n_lag; - return &ds->lag_queue[index]; + /* Old data sink becomes new data source. */ + if (ds->sink != NULL) + ds->source = casewriter_make_reader (ds->sink); } else - return NULL; -} - -/* Procedure that separates the data into SPLIT FILE groups. */ - -/* Represents auxiliary data for handling SPLIT FILE. */ -struct split_aux_data - { - struct dataset *dataset; /* The dataset */ - struct ccase prev_case; /* Data in previous case. */ - - /* Callback functions. */ - begin_func *begin; - case_func *proc; - end_func *end; - void *func_aux; - }; - -static int equal_splits (const struct ccase *, const struct ccase *, const struct dataset *ds); -static bool split_procedure_case_func (const struct ccase *c, void *, const struct dataset *); -static bool split_procedure_end_func (void *, const struct dataset *); - -/* Like procedure(), but it automatically breaks the case stream - into SPLIT FILE break groups. Before each group of cases with - identical SPLIT FILE variable values, BEGIN_FUNC is called - with the first case in the group. - Then PROC_FUNC is called for each case in the group (including - the first). - END_FUNC is called when the group is finished. FUNC_AUX is - passed to each of the functions as auxiliary data. - - If the active file is empty, none of BEGIN_FUNC, PROC_FUNC, - and END_FUNC will be called at all. - - If SPLIT FILE is not in effect, then there is one break group - (if the active file is nonempty), and BEGIN_FUNC and END_FUNC - will be called once. - - Returns true if successful, false if an I/O error occurred. */ -bool -procedure_with_splits (struct dataset *ds, - begin_func begin, - case_func *proc, - end_func *end, - void *func_aux) -{ - struct split_aux_data split_aux; - bool ok; - - case_nullify (&split_aux.prev_case); - split_aux.begin = begin; - split_aux.proc = proc; - split_aux.end = end; - split_aux.func_aux = func_aux; - split_aux.dataset = ds; - - ok = internal_procedure (ds, split_procedure_case_func, - split_procedure_end_func, &split_aux); - - case_destroy (&split_aux.prev_case); - - return ok; -} - -/* Case callback used by procedure_with_splits(). */ -static bool -split_procedure_case_func (const struct ccase *c, void *split_aux_, const struct dataset *ds) -{ - struct split_aux_data *split_aux = split_aux_; - - /* Start a new series if needed. */ - if (case_is_null (&split_aux->prev_case) - || !equal_splits (c, &split_aux->prev_case, split_aux->dataset)) { - if (!case_is_null (&split_aux->prev_case) && split_aux->end != NULL) - split_aux->end (split_aux->func_aux, ds); - - case_destroy (&split_aux->prev_case); - case_clone (&split_aux->prev_case, c); - - if (split_aux->begin != NULL) - split_aux->begin (&split_aux->prev_case, split_aux->func_aux, ds); + ds->source = NULL; + ds->discard_output = false; } + ds->sink = NULL; - return (split_aux->proc == NULL - || split_aux->proc (c, split_aux->func_aux, ds)); -} - -/* End-of-file callback used by procedure_with_splits(). */ -static bool -split_procedure_end_func (void *split_aux_, const struct dataset *ds) -{ - struct split_aux_data *split_aux = split_aux_; + caseinit_clear (ds->caseinit); + caseinit_mark_as_preinited (ds->caseinit, ds->dict); - if (!case_is_null (&split_aux->prev_case) && split_aux->end != NULL) - split_aux->end (split_aux->func_aux, ds); - return true; + dict_clear_vectors (ds->dict); + ds->permanent_dict = NULL; + return proc_cancel_all_transformations (ds) && ds->ok; } -/* Compares the SPLIT FILE variables in cases A and B and returns - nonzero only if they differ. */ -static int -equal_splits (const struct ccase *a, const struct ccase *b, - const struct dataset *ds) -{ - return case_compare (a, b, - dict_get_split_vars (ds->dict), - dict_get_split_cnt (ds->dict)) == 0; -} - -/* Multipass procedure that separates the data into SPLIT FILE - groups. */ - -/* Represents auxiliary data for handling SPLIT FILE in a - multipass procedure. */ -struct multipass_split_aux_data +/* Casereader class for procedure execution. */ +static const struct casereader_class proc_casereader_class = { - struct dataset *dataset; /* The dataset of the split */ - struct ccase prev_case; /* Data in previous case. */ - struct casefile *casefile; /* Accumulates data for a split. */ - split_func *split; /* Function to call with the accumulated - data. */ - void *func_aux; /* Auxiliary data. */ + proc_casereader_read, + proc_casereader_destroy, + NULL, + NULL, }; -static bool multipass_split_case_func (const struct ccase *c, void *aux_, const struct dataset *); -static bool multipass_split_end_func (void *aux_, const struct dataset *ds); -static bool multipass_split_output (struct multipass_split_aux_data *, const struct dataset *ds); - -/* Returns true if successful, false if an I/O error occurred. */ -bool -multipass_procedure_with_splits (struct dataset *ds, - split_func *split, - void *func_aux) -{ - struct multipass_split_aux_data aux; - bool ok; - - case_nullify (&aux.prev_case); - aux.casefile = NULL; - aux.split = split; - aux.func_aux = func_aux; - aux.dataset = ds; - - ok = internal_procedure (ds, multipass_split_case_func, - multipass_split_end_func, &aux); - case_destroy (&aux.prev_case); - - return ok; -} - -/* Case callback used by multipass_procedure_with_splits(). */ -static bool -multipass_split_case_func (const struct ccase *c, void *aux_, const struct dataset *ds) -{ - struct multipass_split_aux_data *aux = aux_; - bool ok = true; - - /* Start a new series if needed. */ - if (aux->casefile == NULL || ! equal_splits (c, &aux->prev_case, ds)) - { - /* Record split values. */ - case_destroy (&aux->prev_case); - case_clone (&aux->prev_case, c); - - /* Pass any cases to split_func. */ - if (aux->casefile != NULL) - ok = multipass_split_output (aux, ds); - - /* Start a new casefile. */ - aux->casefile = - fastfile_create (dict_get_next_value_idx (ds->dict)); - } - - return casefile_append (aux->casefile, c) && ok; -} - -/* End-of-file callback used by multipass_procedure_with_splits(). */ -static bool -multipass_split_end_func (void *aux_, const struct dataset *ds) -{ - struct multipass_split_aux_data *aux = aux_; - return (aux->casefile == NULL || multipass_split_output (aux, ds)); -} - -static bool -multipass_split_output (struct multipass_split_aux_data *aux, const struct dataset *ds) +/* Updates last_proc_invocation. */ +static void +update_last_proc_invocation (struct dataset *ds) { - bool ok; - - assert (aux->casefile != NULL); - ok = aux->split (&aux->prev_case, aux->casefile, aux->func_aux, ds); - casefile_destroy (aux->casefile); - aux->casefile = NULL; - - return ok; + ds->last_proc_invocation = time (NULL); } -/* Discards all the current state in preparation for a data-input - command like DATA LIST or GET. */ -void -discard_variables (struct dataset *ds) +/* Returns a pointer to the lagged case from N_BEFORE cases before the + current one, or NULL if there haven't been that many cases yet. */ +const struct ccase * +lagged_case (const struct dataset *ds, int n_before) { - dict_clear (ds->dict); - fh_set_default_handle (NULL); - - ds->n_lag = 0; - - free_case_source (ds->proc_source); - ds->proc_source = NULL; + assert (n_before >= 1); + assert (n_before <= ds->n_lag); - proc_cancel_all_transformations (ds); + if (n_before <= deque_count (&ds->lag)) + return ds->lag_cases[deque_front (&ds->lag, n_before - 1)]; + else + return NULL; } /* Returns the current set of permanent transformations, and clears the permanent transformations. For use by INPUT PROGRAM. */ struct trns_chain * -proc_capture_transformations (struct dataset *ds) +proc_capture_transformations (struct dataset *ds) { struct trns_chain *chain; - + assert (ds->temporary_trns_chain == NULL); chain = ds->permanent_trns_chain; ds->cur_trns_chain = ds->permanent_trns_chain = trns_chain_create (); + + if ( ds->xform_callback) + ds->xform_callback (false, ds->xform_callback_aux); + return chain; } @@ -726,6 +417,8 @@ void add_transformation (struct dataset *ds, trns_proc_func *proc, trns_free_func *free, void *aux) { trns_chain_append (ds->cur_trns_chain, NULL, proc, free, aux); + if ( ds->xform_callback) + ds->xform_callback (true, ds->xform_callback_aux); } /* Adds a transformation that processes a case with PROC and @@ -734,19 +427,22 @@ add_transformation (struct dataset *ds, trns_proc_func *proc, trns_free_func *fr FINALIZE will be called. The functions are passed AUX as auxiliary data. */ void -add_transformation_with_finalizer (struct dataset *ds, +add_transformation_with_finalizer (struct dataset *ds, trns_finalize_func *finalize, trns_proc_func *proc, trns_free_func *free, void *aux) { trns_chain_append (ds->cur_trns_chain, finalize, proc, free, aux); + + if ( ds->xform_callback) + ds->xform_callback (true, ds->xform_callback_aux); } /* Returns the index of the next transformation. This value can be returned by a transformation procedure function to indicate a "jump" to that transformation. */ size_t -next_transformation (const struct dataset *ds) +next_transformation (const struct dataset *ds) { return trns_chain_next (ds->cur_trns_chain); } @@ -755,7 +451,7 @@ next_transformation (const struct dataset *ds) a temporary transformation, false if it will add a permanent transformation. */ bool -proc_in_temporary_transformations (const struct dataset *ds) +proc_in_temporary_transformations (const struct dataset *ds) { return ds->temporary_trns_chain != NULL; } @@ -764,15 +460,19 @@ proc_in_temporary_transformations (const struct dataset *ds) Further calls to add_transformation() will add temporary transformations. */ void -proc_start_temporary_transformations (struct dataset *ds) +proc_start_temporary_transformations (struct dataset *ds) { if (!proc_in_temporary_transformations (ds)) { add_case_limit_trns (ds); ds->permanent_dict = dict_clone (ds->dict); + trns_chain_finalize (ds->permanent_trns_chain); ds->temporary_trns_chain = ds->cur_trns_chain = trns_chain_create (); + + if ( ds->xform_callback) + ds->xform_callback (true, ds->xform_callback_aux); } } @@ -781,9 +481,9 @@ proc_start_temporary_transformations (struct dataset *ds) permanent. Returns true if anything changed, false otherwise. */ bool -proc_make_temporary_transformations_permanent (struct dataset *ds) +proc_make_temporary_transformations_permanent (struct dataset *ds) { - if (proc_in_temporary_transformations (ds)) + if (proc_in_temporary_transformations (ds)) { trns_chain_finalize (ds->temporary_trns_chain); trns_chain_splice (ds->permanent_trns_chain, ds->temporary_trns_chain); @@ -802,9 +502,9 @@ proc_make_temporary_transformations_permanent (struct dataset *ds) transformations will be permanent. Returns true if anything changed, false otherwise. */ bool -proc_cancel_temporary_transformations (struct dataset *ds) +proc_cancel_temporary_transformations (struct dataset *ds) { - if (proc_in_temporary_transformations (ds)) + if (proc_in_temporary_transformations (ds)) { dict_destroy (ds->dict); ds->dict = ds->permanent_dict; @@ -813,6 +513,10 @@ proc_cancel_temporary_transformations (struct dataset *ds) trns_chain_destroy (ds->temporary_trns_chain); ds->temporary_trns_chain = NULL; + if ( ds->xform_callback) + ds->xform_callback (!trns_chain_is_empty (ds->permanent_trns_chain), + ds->xform_callback_aux); + return true; } else @@ -825,79 +529,164 @@ bool proc_cancel_all_transformations (struct dataset *ds) { bool ok; + assert (ds->proc_state == PROC_COMMITTED); ok = trns_chain_destroy (ds->permanent_trns_chain); ok = trns_chain_destroy (ds->temporary_trns_chain) && ok; ds->permanent_trns_chain = ds->cur_trns_chain = trns_chain_create (); ds->temporary_trns_chain = NULL; + if ( ds->xform_callback) + ds->xform_callback (false, ds->xform_callback_aux); + return ok; } + +static void +dict_callback (struct dictionary *d UNUSED, void *ds_) +{ + struct dataset *ds = ds_; + dataset_set_unsaved (ds); +} + /* Initializes procedure handling. */ struct dataset * create_dataset (void) { struct dataset *ds = xzalloc (sizeof(*ds)); ds->dict = dict_create (); + + dict_set_change_callback (ds->dict, dict_callback, ds); + + dict_set_encoding (ds->dict, get_default_encoding ()); + + ds->caseinit = caseinit_create (); proc_cancel_all_transformations (ds); return ds; } + +void +dataset_add_transform_change_callback (struct dataset *ds, + transformation_change_callback_func *cb, + void *aux) +{ + ds->xform_callback = cb; + ds->xform_callback_aux = aux; +} + /* Finishes up procedure handling. */ void destroy_dataset (struct dataset *ds) { - discard_variables (ds); + proc_discard_active_file (ds); dict_destroy (ds->dict); + caseinit_destroy (ds->caseinit); trns_chain_destroy (ds->permanent_trns_chain); + + if ( ds->xform_callback) + ds->xform_callback (false, ds->xform_callback_aux); free (ds); } -/* Sets SINK as the destination for procedure output from the - next procedure. */ +/* Causes output from the next procedure to be discarded, instead + of being preserved for use as input for the next procedure. */ void -proc_set_sink (struct dataset *ds, struct case_sink *sink) +proc_discard_output (struct dataset *ds) { - assert (ds->proc_sink == NULL); - ds->proc_sink = sink; + ds->discard_output = true; +} + +/* Discards the active file dictionary, data, and + transformations. */ +void +proc_discard_active_file (struct dataset *ds) +{ + assert (ds->proc_state == PROC_COMMITTED); + + dict_clear (ds->dict); + fh_set_default_handle (NULL); + + ds->n_lag = 0; + + casereader_destroy (ds->source); + ds->source = NULL; + + proc_cancel_all_transformations (ds); } /* Sets SOURCE as the source for procedure input for the next procedure. */ void -proc_set_source (struct dataset *ds, struct case_source *source) +proc_set_active_file (struct dataset *ds, + struct casereader *source, + struct dictionary *dict) { - assert (ds->proc_source == NULL); - ds->proc_source = source; + assert (ds->proc_state == PROC_COMMITTED); + assert (ds->dict != dict); + + proc_discard_active_file (ds); + + dict_destroy (ds->dict); + ds->dict = dict; + dict_set_change_callback (ds->dict, dict_callback, ds); + + proc_set_active_file_data (ds, source); } -/* Returns true if a source for the next procedure has been - configured, false otherwise. */ +/* Replaces the active file's data by READER without replacing + the associated dictionary. */ bool -proc_has_source (const struct dataset *ds) +proc_set_active_file_data (struct dataset *ds, struct casereader *reader) { - return ds->proc_source != NULL; + casereader_destroy (ds->source); + ds->source = reader; + + caseinit_clear (ds->caseinit); + caseinit_mark_as_preinited (ds->caseinit, ds->dict); + + return reader == NULL || !casereader_error (reader); } -/* Returns the output from the previous procedure. - For use only immediately after executing a procedure. - The returned casefile is owned by the caller; it will not be - automatically used for the next procedure's input. */ -struct casefile * -proc_capture_output (struct dataset *ds) +/* Returns true if an active file data source is available, false + otherwise. */ +bool +proc_has_active_file (const struct dataset *ds) { - struct casefile *casefile; + return ds->source != NULL; +} - /* Try to make sure that this function is called immediately - after procedure() or a similar function. */ - assert (ds->proc_source != NULL); - assert (case_source_is_class (ds->proc_source, &storage_source_class)); - assert (trns_chain_is_empty (ds->permanent_trns_chain)); - assert (!proc_in_temporary_transformations (ds)); +/* Returns the active file data source from DS, or a null pointer + if DS has no data source, and removes it from DS. */ +struct casereader * +proc_extract_active_file_data (struct dataset *ds) +{ + struct casereader *reader = ds->source; + ds->source = NULL; - casefile = storage_source_decapsulate (ds->proc_source); - ds->proc_source = NULL; + return reader; +} - return casefile; +/* Checks whether DS has a corrupted active file. If so, + discards it and returns false. If not, returns true without + doing anything. */ +bool +dataset_end_of_command (struct dataset *ds) +{ + if (ds->source != NULL) + { + if (casereader_error (ds->source)) + { + proc_discard_active_file (ds); + return false; + } + else + { + const struct taint *taint = casereader_get_taint (ds->source); + taint_reset_successor_taint ((struct taint *) taint); + assert (!taint_has_tainted_successor (taint)); + } + } + return true; } static trns_proc_func case_limit_trns_proc; @@ -906,12 +695,12 @@ static trns_free_func case_limit_trns_free; /* Adds a transformation that limits the number of cases that may pass through, if DS->DICT has a case limit. */ static void -add_case_limit_trns (struct dataset *ds) +add_case_limit_trns (struct dataset *ds) { - size_t case_limit = dict_get_case_limit (ds->dict); + casenumber case_limit = dict_get_case_limit (ds->dict); if (case_limit != 0) { - size_t *cases_remaining = xmalloc (sizeof *cases_remaining); + casenumber *cases_remaining = xmalloc (sizeof *cases_remaining); *cases_remaining = case_limit; add_transformation (ds, case_limit_trns_proc, case_limit_trns_free, cases_remaining); @@ -923,10 +712,10 @@ add_case_limit_trns (struct dataset *ds) *CASES_REMAINING. */ static int case_limit_trns_proc (void *cases_remaining_, - struct ccase *c UNUSED, casenumber case_nr UNUSED) + struct ccase **c UNUSED, casenumber case_nr UNUSED) { size_t *cases_remaining = cases_remaining_; - if (*cases_remaining > 0) + if (*cases_remaining > 0) { (*cases_remaining)--; return TRNS_CONTINUE; @@ -937,7 +726,7 @@ case_limit_trns_proc (void *cases_remaining_, /* Frees the data associated with a case limit transformation. */ static bool -case_limit_trns_free (void *cases_remaining_) +case_limit_trns_free (void *cases_remaining_) { size_t *cases_remaining = cases_remaining_; free (cases_remaining); @@ -949,10 +738,10 @@ static trns_proc_func filter_trns_proc; /* Adds a temporary transformation to filter data according to the variable specified on FILTER, if any. */ static void -add_filter_trns (struct dataset *ds) +add_filter_trns (struct dataset *ds) { struct variable *filter_var = dict_get_filter (ds->dict); - if (filter_var != NULL) + if (filter_var != NULL) { proc_start_temporary_transformations (ds); add_transformation (ds, filter_trns_proc, NULL, filter_var); @@ -962,12 +751,12 @@ add_filter_trns (struct dataset *ds) /* FILTER transformation. */ static int filter_trns_proc (void *filter_var_, - struct ccase *c UNUSED, casenumber case_nr UNUSED) - + struct ccase **c UNUSED, casenumber case_nr UNUSED) + { struct variable *filter_var = filter_var_; - double f = case_num (c, filter_var); - return (f != 0.0 && !var_is_num_missing (filter_var, f) + double f = case_num (*c, filter_var); + return (f != 0.0 && !var_is_num_missing (filter_var, f, MV_ANY) ? TRNS_CONTINUE : TRNS_DROP_CASE); } @@ -978,23 +767,14 @@ dataset_dict (const struct dataset *ds) return ds->dict; } - -void -dataset_set_dict (struct dataset *ds, struct dictionary *dict) +const struct casereader * +dataset_source (const struct dataset *ds) { - ds->dict = dict; + return ds->source; } -int -dataset_n_lag (const struct dataset *ds) -{ - return ds->n_lag; -} - -void -dataset_set_n_lag (struct dataset *ds, int n_lag) +void +dataset_need_lag (struct dataset *ds, int n_before) { - ds->n_lag = n_lag; + ds->n_lag = MAX (ds->n_lag, n_before); } - -