X-Git-Url: https://pintos-os.org/cgi-bin/gitweb.cgi?a=blobdiff_plain;f=src%2Fdata%2Fprocedure.c;h=46a18bb463297fdf735a6e30a2d2207c0dc45e9a;hb=92c09e564002d356d20fc1e2e131027ef89f6748;hp=7a9b432132c09158ce849147801f4a17bc4f64a4;hpb=661b9ee29cbb6b89c4ea53a05b8a4eb3a5028de0;p=pspp-builds.git diff --git a/src/data/procedure.c b/src/data/procedure.c index 7a9b4321..46a18bb4 100644 --- a/src/data/procedure.c +++ b/src/data/procedure.c @@ -23,48 +23,50 @@ #include #include -#include -#include #include -#include -#include +#include +#include +#include +#include #include #include #include -#include #include #include #include #include #include #include +#include struct dataset { - - /* An abstract factory which creates casefiles */ - struct casefile_factory *cf_factory; - - /* Callback which occurs when a procedure provides a new source for - the dataset */ - replace_source_callback *replace_source ; - - /* Callback which occurs whenever the DICT is replaced by a new one */ - replace_dictionary_callback *replace_dict; - - /* Cases are read from proc_source, + /* Cases are read from source, + their transformation variables are initialized, pass through permanent_trns_chain (which transforms them into the format described by permanent_dict), - are written to proc_sink, + are written to sink, pass through temporary_trns_chain (which transforms them into the format described by dict), and are finally passed to the procedure. */ - struct case_source *proc_source; + struct casereader *source; + struct caseinit *caseinit; struct trns_chain *permanent_trns_chain; struct dictionary *permanent_dict; - struct case_sink *proc_sink; + struct casewriter *sink; struct trns_chain *temporary_trns_chain; struct dictionary *dict; + /* Callback which occurs when a procedure provides a new source for + the dataset */ + replace_source_callback *replace_source ; + + /* Callback which occurs whenever the DICT is replaced by a new one */ + replace_dictionary_callback *replace_dict; + + /* If true, cases are discarded instead of being written to + sink. */ + bool discard_output; + /* The transformation chain that the next transformation will be added to. */ struct trns_chain *cur_trns_chain; @@ -82,26 +84,22 @@ struct dataset { struct ccase *lag_cases; /* Lagged cases managed by deque. */ /* Procedure data. */ - bool is_open; /* Procedure open? */ - struct ccase trns_case; /* Case used for transformations. */ - struct ccase sink_case; /* Case written to sink, if - compacting is necessary. */ + enum + { + PROC_COMMITTED, + PROC_OPEN, + PROC_CLOSED + } + proc_state; size_t cases_written; /* Cases output so far. */ - bool ok; + bool ok; /* Error status. */ }; /* struct dataset */ static void add_case_limit_trns (struct dataset *ds); static void add_filter_trns (struct dataset *ds); -static bool internal_procedure (struct dataset *ds, case_func *, - end_func *, - void *aux); static void update_last_proc_invocation (struct dataset *ds); -static void create_trns_case (struct ccase *, struct dictionary *); -static void open_active_file (struct dataset *ds); -static void clear_case (const struct dataset *ds, struct ccase *c); -static bool close_active_file (struct dataset *ds); /* Public functions. */ @@ -116,146 +114,89 @@ time_of_last_procedure (struct dataset *ds) /* Regular procedure. */ - - -/* Reads the data from the input program and writes it to a new - active file. For each case we read from the input program, we - do the following: - - 1. Execute permanent transformations. If these drop the case, - start the next case from step 1. - - 2. Write case to replacement active file. - - 3. Execute temporary transformations. If these drop the case, - start the next case from step 1. - - 4. Pass case to PROC_FUNC, passing AUX as auxiliary data. - - Returns true if successful, false if an I/O error occurred. */ +/* Executes any pending transformations, if necessary. + This is not identical to the EXECUTE command in that it won't + always read the source data. This can be important when the + source data is given inline within BEGIN DATA...END FILE. */ bool -procedure (struct dataset *ds, case_func *cf, void *aux) +proc_execute (struct dataset *ds) { - update_last_proc_invocation (ds); + bool ok; - /* Optimize the trivial case where we're not going to do - anything with the data, by not reading the data at all. */ - if (cf == NULL - && case_source_is_class (ds->proc_source, &storage_source_class) - && ds->proc_sink == NULL - && (ds->temporary_trns_chain == NULL - || trns_chain_is_empty (ds->temporary_trns_chain)) + if ((ds->temporary_trns_chain == NULL + || trns_chain_is_empty (ds->temporary_trns_chain)) && trns_chain_is_empty (ds->permanent_trns_chain)) { ds->n_lag = 0; + ds->discard_output = false; dict_set_case_limit (ds->dict, 0); dict_clear_vectors (ds->dict); return true; } - return internal_procedure (ds, cf, NULL, aux); + ok = casereader_destroy (proc_open (ds)); + return proc_commit (ds) && ok; } - -/* Multipass procedure. */ -struct multipass_aux_data - { - struct casefile *casefile; +static struct casereader_class proc_casereader_class; - bool (*proc_func) (const struct casefile *, void *aux); - void *aux; - }; - -/* Case processing function for multipass_procedure(). */ -static bool -multipass_case_func (const struct ccase *c, void *aux_data_, const struct dataset *ds UNUSED) -{ - struct multipass_aux_data *aux_data = aux_data_; - return casefile_append (aux_data->casefile, c); -} - -/* End-of-file function for multipass_procedure(). */ -static bool -multipass_end_func (void *aux_data_, const struct dataset *ds UNUSED) -{ - struct multipass_aux_data *aux_data = aux_data_; - return (aux_data->proc_func == NULL - || aux_data->proc_func (aux_data->casefile, aux_data->aux)); -} - -/* Procedure that allows multiple passes over the input data. - The entire active file is passed to PROC_FUNC, with the given - AUX as auxiliary data, as a unit. */ -bool -multipass_procedure (struct dataset *ds, casefile_func *proc_func, void *aux) +/* Opens dataset DS for reading cases with proc_read. + proc_commit must be called when done. */ +struct casereader * +proc_open (struct dataset *ds) { - struct multipass_aux_data aux_data; - bool ok; + assert (ds->source != NULL); + assert (ds->proc_state == PROC_COMMITTED); - aux_data.casefile = - ds->cf_factory->create_casefile (ds->cf_factory, - dict_get_next_value_idx (ds->dict)); - - aux_data.proc_func = proc_func; - aux_data.aux = aux; - - ok = internal_procedure (ds, multipass_case_func, multipass_end_func, &aux_data); - ok = !casefile_error (aux_data.casefile) && ok; - - casefile_destroy (aux_data.casefile); - - return ok; -} - + update_last_proc_invocation (ds); -/* Procedure implementation. */ + caseinit_mark_for_init (ds->caseinit, ds->dict); -/* Executes a procedure. - Passes each case to CASE_FUNC. - Calls END_FUNC after the last case. - Returns true if successful, false if an I/O error occurred (or - if CASE_FUNC or END_FUNC ever returned false). */ -static bool -internal_procedure (struct dataset *ds, case_func *proc, - end_func *end, - void *aux) -{ - struct ccase *c; - bool ok = true; + /* Finish up the collection of transformations. */ + add_case_limit_trns (ds); + add_filter_trns (ds); + trns_chain_finalize (ds->cur_trns_chain); - proc_open (ds); - while (ok && proc_read (ds, &c)) - if (proc != NULL) - ok = proc (c, aux, ds) && ok; - if (end != NULL) - ok = end (aux, ds) && ok; + /* Make permanent_dict refer to the dictionary right before + data reaches the sink. */ + if (ds->permanent_dict == NULL) + ds->permanent_dict = ds->dict; - if ( proc_close (ds) && ok ) + /* Prepare sink. */ + if (!ds->discard_output) { - - return true; + ds->compactor = (dict_compacting_would_shrink (ds->permanent_dict) + ? dict_make_compactor (ds->permanent_dict) + : NULL); + ds->sink = autopaging_writer_create (dict_get_compacted_value_cnt ( + ds->permanent_dict)); + } + else + { + ds->compactor = NULL; + ds->sink = NULL; } - return false; -} - -/* Opens dataset DS for reading cases with proc_read. - proc_close must be called when done. */ -void -proc_open (struct dataset *ds) -{ - assert (ds->proc_source != NULL); - assert (!ds->is_open); - - update_last_proc_invocation (ds); - - open_active_file (ds); + /* Allocate memory for lagged cases. */ + ds->lag_cases = deque_init (&ds->lag, ds->n_lag, sizeof *ds->lag_cases); - ds->is_open = true; - create_trns_case (&ds->trns_case, ds->dict); - case_create (&ds->sink_case, dict_get_compacted_value_cnt (ds->dict)); + ds->proc_state = PROC_OPEN; ds->cases_written = 0; ds->ok = true; + + /* FIXME: use taint in dataset in place of `ok'? */ + /* FIXME: for trivial cases we can just return a clone of + ds->source? */ + return casereader_create_sequential (NULL, + dict_get_next_value_idx (ds->dict), + CASENUMBER_MAX, + &proc_casereader_class, ds); +} + +bool +proc_is_open (const struct dataset *ds) +{ + return ds->proc_state != PROC_COMMITTED; } /* Reads the next case from dataset DS, which must have been @@ -264,14 +205,15 @@ proc_open (struct dataset *ds) case is stored in *C. Return false at end of file or if a read error occurs. In this case a null pointer is stored in *C. */ -bool -proc_read (struct dataset *ds, struct ccase **c) +static bool +proc_casereader_read (struct casereader *reader UNUSED, void *ds_, + struct ccase *c) { + struct dataset *ds = ds_; enum trns_result retval = TRNS_DROP_CASE; - assert (ds->is_open); - *c = NULL; - for (;;) + assert (ds->proc_state == PROC_OPEN); + for (;;) { size_t case_nr; @@ -281,51 +223,59 @@ proc_read (struct dataset *ds, struct ccase **c) if (!ds->ok) return false; - /* Read a case from proc_source. */ - clear_case (ds, &ds->trns_case); - if (!ds->proc_source->class->read (ds->proc_source, &ds->trns_case)) + /* Read a case from source. */ + if (!casereader_read (ds->source, c)) return false; + case_resize (c, dict_get_next_value_idx (ds->dict)); + caseinit_init_reinit_vars (ds->caseinit, c); + caseinit_init_left_vars (ds->caseinit, c); /* Execute permanent transformations. */ case_nr = ds->cases_written + 1; retval = trns_chain_execute (ds->permanent_trns_chain, TRNS_CONTINUE, - &ds->trns_case, &case_nr); - if (retval != TRNS_CONTINUE) - continue; - + c, &case_nr); + caseinit_update_left_vars (ds->caseinit, c); + if (retval != TRNS_CONTINUE) + { + case_destroy (c); + continue; + } + /* Write case to collection of lagged cases. */ if (ds->n_lag > 0) { while (deque_count (&ds->lag) >= ds->n_lag) case_destroy (&ds->lag_cases[deque_pop_back (&ds->lag)]); - case_clone (&ds->lag_cases[deque_push_front (&ds->lag)], - &ds->trns_case); + case_clone (&ds->lag_cases[deque_push_front (&ds->lag)], c); } /* Write case to replacement active file. */ ds->cases_written++; - if (ds->proc_sink->class->write != NULL) + if (ds->sink != NULL) { - if (ds->compactor != NULL) + struct ccase tmp; + if (ds->compactor != NULL) { - dict_compactor_compact (ds->compactor, &ds->sink_case, - &ds->trns_case); - ds->proc_sink->class->write (ds->proc_sink, &ds->sink_case); + case_create (&tmp, dict_get_compacted_value_cnt (ds->dict)); + dict_compactor_compact (ds->compactor, &tmp, c); } else - ds->proc_sink->class->write (ds->proc_sink, &ds->trns_case); + case_clone (&tmp, c); + casewriter_write (ds->sink, &tmp); } /* Execute temporary transformations. */ if (ds->temporary_trns_chain != NULL) { retval = trns_chain_execute (ds->temporary_trns_chain, TRNS_CONTINUE, - &ds->trns_case, &ds->cases_written); + c, &ds->cases_written); if (retval != TRNS_CONTINUE) - continue; + { + case_destroy (c); + continue; + } } - *c = &ds->trns_case; return true; } } @@ -335,120 +285,35 @@ proc_read (struct dataset *ds, struct ccase **c) while reading or closing the data set. If DS has not been opened, returns true without doing anything else. */ -bool -proc_close (struct dataset *ds) -{ - if (!ds->is_open) - return true; - - /* Drain any remaining cases. */ - while (ds->ok) - { - struct ccase *c; - if (!proc_read (ds, &c)) - break; - } - ds->ok = free_case_source (ds->proc_source) && ds->ok; - proc_set_source (ds, NULL); - - case_destroy (&ds->sink_case); - case_destroy (&ds->trns_case); - - ds->ok = close_active_file (ds) && ds->ok; - ds->is_open = false; - - return ds->ok; -} - -/* Updates last_proc_invocation. */ -static void -update_last_proc_invocation (struct dataset *ds) -{ - ds->last_proc_invocation = time (NULL); -} - -/* Creates and returns a case, initializing it from the vectors - that say which `value's need to be initialized just once, and - which ones need to be re-initialized before every case. */ static void -create_trns_case (struct ccase *trns_case, struct dictionary *dict) +proc_casereader_destroy (struct casereader *reader, void *ds_) { - size_t var_cnt = dict_get_var_cnt (dict); - size_t i; + struct dataset *ds = ds_; + struct ccase c; - case_create (trns_case, dict_get_next_value_idx (dict)); - for (i = 0; i < var_cnt; i++) - { - struct variable *v = dict_get_var (dict, i); - union value *value = case_data_rw (trns_case, v); + /* Make sure transformations happen for every input case, in + case they have side effects, and ensure that the replacement + active file gets all the cases it should. */ + while (casereader_read (reader, &c)) + case_destroy (&c); - if (var_is_numeric (v)) - value->f = var_get_leave (v) ? 0.0 : SYSMIS; - else - memset (value->s, ' ', var_get_width (v)); - } + ds->proc_state = PROC_CLOSED; + ds->ok = casereader_destroy (ds->source) && ds->ok; + ds->source = NULL; + proc_set_active_file_data (ds, NULL); } -/* Makes all preparations for reading from the data source and writing - to the data sink. */ -static void -open_active_file (struct dataset *ds) -{ - add_case_limit_trns (ds); - add_filter_trns (ds); - - /* Finalize transformations. */ - trns_chain_finalize (ds->cur_trns_chain); - - /* Make permanent_dict refer to the dictionary right before - data reaches the sink. */ - if (ds->permanent_dict == NULL) - ds->permanent_dict = ds->dict; - - /* Figure out whether to compact. */ - ds->compactor = - (dict_compacting_would_shrink (ds->permanent_dict) - ? dict_make_compactor (ds->permanent_dict) - : NULL); - - /* Prepare sink. */ - if (ds->proc_sink == NULL) - ds->proc_sink = create_case_sink (&storage_sink_class, - ds->permanent_dict, - ds->cf_factory, - NULL); - if (ds->proc_sink->class->open != NULL) - ds->proc_sink->class->open (ds->proc_sink); - - /* Allocate memory for lagged cases. */ - ds->lag_cases = deque_init (&ds->lag, ds->n_lag, sizeof *ds->lag_cases); -} - -/* Clears the variables in C that need to be cleared between - processing cases. */ -static void -clear_case (const struct dataset *ds, struct ccase *c) +/* Must return false if the source casereader, a transformation, + or the sink casewriter signaled an error. (If a temporary + transformation signals an error, then the return value is + false, but the replacement active file may still be + untainted.) */ +bool +proc_commit (struct dataset *ds) { - size_t var_cnt = dict_get_var_cnt (ds->dict); - size_t i; + assert (ds->proc_state == PROC_CLOSED); + ds->proc_state = PROC_COMMITTED; - for (i = 0; i < var_cnt; i++) - { - struct variable *v = dict_get_var (ds->dict, i); - if (!var_get_leave (v)) - { - if (var_is_numeric (v)) - case_data_rw (c, v)->f = SYSMIS; - else - memset (case_data_rw (c, v)->s, ' ', var_get_width (v)); - } - } -} - -/* Closes the active file. */ -static bool -close_active_file (struct dataset *ds) -{ /* Free memory for lagged cases. */ while (!deque_is_empty (&ds->lag)) case_destroy (&ds->lag_cases[deque_pop_back (&ds->lag)]); @@ -457,23 +322,49 @@ close_active_file (struct dataset *ds) /* Dictionary from before TEMPORARY becomes permanent. */ proc_cancel_temporary_transformations (ds); - /* Finish compacting. */ - if (ds->compactor != NULL) + if (!ds->discard_output) { - dict_compactor_destroy (ds->compactor); - dict_compact_values (ds->dict); - ds->compactor = NULL; + /* Finish compacting. */ + if (ds->compactor != NULL) + { + dict_compactor_destroy (ds->compactor); + dict_compact_values (ds->dict); + ds->compactor = NULL; + } + + /* Old data sink becomes new data source. */ + if (ds->sink != NULL) + ds->source = casewriter_make_reader (ds->sink); } + else + { + ds->source = NULL; + ds->discard_output = false; + } + ds->sink = NULL; + if ( ds->replace_source) ds->replace_source (ds->source); - /* Old data sink becomes new data source. */ - if (ds->proc_sink->class->make_source != NULL) - proc_set_source (ds, ds->proc_sink->class->make_source (ds->proc_sink) ); - free_case_sink (ds->proc_sink); - ds->proc_sink = NULL; + caseinit_clear (ds->caseinit); + caseinit_mark_as_preinited (ds->caseinit, ds->dict); dict_clear_vectors (ds->dict); ds->permanent_dict = NULL; - return proc_cancel_all_transformations (ds); + return proc_cancel_all_transformations (ds) && ds->ok; +} + +static struct casereader_class proc_casereader_class = + { + proc_casereader_read, + proc_casereader_destroy, + NULL, + NULL, + }; + +/* Updates last_proc_invocation. */ +static void +update_last_proc_invocation (struct dataset *ds) +{ + ds->last_proc_invocation = time (NULL); } /* Returns a pointer to the lagged case from N_BEFORE cases before the @@ -490,218 +381,6 @@ lagged_case (const struct dataset *ds, int n_before) return NULL; } -/* Procedure that separates the data into SPLIT FILE groups. */ - -/* Represents auxiliary data for handling SPLIT FILE. */ -struct split_aux_data - { - struct dataset *dataset; /* The dataset */ - struct ccase prev_case; /* Data in previous case. */ - - /* Callback functions. */ - begin_func *begin; - case_func *proc; - end_func *end; - void *func_aux; - }; - -static int equal_splits (const struct ccase *, const struct ccase *, const struct dataset *ds); -static bool split_procedure_case_func (const struct ccase *c, void *, const struct dataset *); -static bool split_procedure_end_func (void *, const struct dataset *); - -/* Like procedure(), but it automatically breaks the case stream - into SPLIT FILE break groups. Before each group of cases with - identical SPLIT FILE variable values, BEGIN_FUNC is called - with the first case in the group. - Then PROC_FUNC is called for each case in the group (including - the first). - END_FUNC is called when the group is finished. FUNC_AUX is - passed to each of the functions as auxiliary data. - - If the active file is empty, none of BEGIN_FUNC, PROC_FUNC, - and END_FUNC will be called at all. - - If SPLIT FILE is not in effect, then there is one break group - (if the active file is nonempty), and BEGIN_FUNC and END_FUNC - will be called once. - - Returns true if successful, false if an I/O error occurred. */ -bool -procedure_with_splits (struct dataset *ds, - begin_func begin, - case_func *proc, - end_func *end, - void *func_aux) -{ - struct split_aux_data split_aux; - bool ok; - - case_nullify (&split_aux.prev_case); - split_aux.begin = begin; - split_aux.proc = proc; - split_aux.end = end; - split_aux.func_aux = func_aux; - split_aux.dataset = ds; - - ok = internal_procedure (ds, split_procedure_case_func, - split_procedure_end_func, &split_aux); - - case_destroy (&split_aux.prev_case); - - return ok; -} - -/* Case callback used by procedure_with_splits(). */ -static bool -split_procedure_case_func (const struct ccase *c, void *split_aux_, const struct dataset *ds) -{ - struct split_aux_data *split_aux = split_aux_; - - /* Start a new series if needed. */ - if (case_is_null (&split_aux->prev_case) - || !equal_splits (c, &split_aux->prev_case, split_aux->dataset)) - { - if (!case_is_null (&split_aux->prev_case) && split_aux->end != NULL) - split_aux->end (split_aux->func_aux, ds); - - case_destroy (&split_aux->prev_case); - case_clone (&split_aux->prev_case, c); - - if (split_aux->begin != NULL) - split_aux->begin (&split_aux->prev_case, split_aux->func_aux, ds); - } - - return (split_aux->proc == NULL - || split_aux->proc (c, split_aux->func_aux, ds)); -} - -/* End-of-file callback used by procedure_with_splits(). */ -static bool -split_procedure_end_func (void *split_aux_, const struct dataset *ds) -{ - struct split_aux_data *split_aux = split_aux_; - - if (!case_is_null (&split_aux->prev_case) && split_aux->end != NULL) - split_aux->end (split_aux->func_aux, ds); - return true; -} - -/* Compares the SPLIT FILE variables in cases A and B and returns - nonzero only if they differ. */ -static int -equal_splits (const struct ccase *a, const struct ccase *b, - const struct dataset *ds) -{ - return case_compare (a, b, - dict_get_split_vars (ds->dict), - dict_get_split_cnt (ds->dict)) == 0; -} - -/* Multipass procedure that separates the data into SPLIT FILE - groups. */ - -/* Represents auxiliary data for handling SPLIT FILE in a - multipass procedure. */ -struct multipass_split_aux_data - { - struct dataset *dataset; /* The dataset of the split */ - struct ccase prev_case; /* Data in previous case. */ - struct casefile *casefile; /* Accumulates data for a split. */ - split_func *split; /* Function to call with the accumulated - data. */ - void *func_aux; /* Auxiliary data. */ - }; - -static bool multipass_split_case_func (const struct ccase *c, void *aux_, const struct dataset *); -static bool multipass_split_end_func (void *aux_, const struct dataset *ds); -static bool multipass_split_output (struct multipass_split_aux_data *, const struct dataset *ds); - -/* Returns true if successful, false if an I/O error occurred. */ -bool -multipass_procedure_with_splits (struct dataset *ds, - split_func *split, - void *func_aux) -{ - struct multipass_split_aux_data aux; - bool ok; - - case_nullify (&aux.prev_case); - aux.casefile = NULL; - aux.split = split; - aux.func_aux = func_aux; - aux.dataset = ds; - - ok = internal_procedure (ds, multipass_split_case_func, - multipass_split_end_func, &aux); - case_destroy (&aux.prev_case); - - return ok; -} - -/* Case callback used by multipass_procedure_with_splits(). */ -static bool -multipass_split_case_func (const struct ccase *c, void *aux_, const struct dataset *ds) -{ - struct multipass_split_aux_data *aux = aux_; - bool ok = true; - - /* Start a new series if needed. */ - if (aux->casefile == NULL || ! equal_splits (c, &aux->prev_case, ds)) - { - /* Record split values. */ - case_destroy (&aux->prev_case); - case_clone (&aux->prev_case, c); - - /* Pass any cases to split_func. */ - if (aux->casefile != NULL) - ok = multipass_split_output (aux, ds); - - /* Start a new casefile. */ - aux->casefile = - ds->cf_factory->create_casefile (ds->cf_factory, - dict_get_next_value_idx (ds->dict)); - } - - return casefile_append (aux->casefile, c) && ok; -} - -/* End-of-file callback used by multipass_procedure_with_splits(). */ -static bool -multipass_split_end_func (void *aux_, const struct dataset *ds) -{ - struct multipass_split_aux_data *aux = aux_; - return (aux->casefile == NULL || multipass_split_output (aux, ds)); -} - -static bool -multipass_split_output (struct multipass_split_aux_data *aux, const struct dataset *ds) -{ - bool ok; - - assert (aux->casefile != NULL); - ok = aux->split (&aux->prev_case, aux->casefile, aux->func_aux, ds); - casefile_destroy (aux->casefile); - aux->casefile = NULL; - - return ok; -} - -/* Discards all the current state in preparation for a data-input - command like DATA LIST or GET. */ -void -discard_variables (struct dataset *ds) -{ - dict_clear (ds->dict); - fh_set_default_handle (NULL); - - ds->n_lag = 0; - - free_case_source (ds->proc_source); - proc_set_source (ds, NULL); - - proc_cancel_all_transformations (ds); -} - /* Returns the current set of permanent transformations, and clears the permanent transformations. For use by INPUT PROGRAM. */ @@ -804,8 +483,10 @@ proc_cancel_temporary_transformations (struct dataset *ds) { if (proc_in_temporary_transformations (ds)) { - dataset_set_dict (ds, ds->permanent_dict); + dict_destroy (ds->dict); + ds->dict = ds->permanent_dict; ds->permanent_dict = NULL; + if (ds->replace_dict) ds->replace_dict (ds->dict); trns_chain_destroy (ds->temporary_trns_chain); ds->temporary_trns_chain = NULL; @@ -822,6 +503,7 @@ bool proc_cancel_all_transformations (struct dataset *ds) { bool ok; + assert (ds->proc_state == PROC_COMMITTED); ok = trns_chain_destroy (ds->permanent_trns_chain); ok = trns_chain_destroy (ds->temporary_trns_chain) && ok; ds->permanent_trns_chain = ds->cur_trns_chain = trns_chain_create (); @@ -831,14 +513,12 @@ proc_cancel_all_transformations (struct dataset *ds) /* Initializes procedure handling. */ struct dataset * -create_dataset (struct casefile_factory *fact, - replace_source_callback *rps, - replace_dictionary_callback *rds - ) +create_dataset (replace_source_callback *rps, + replace_dictionary_callback *rds) { struct dataset *ds = xzalloc (sizeof(*ds)); ds->dict = dict_create (); - ds->cf_factory = fact; + ds->caseinit = caseinit_create (); ds->replace_source = rps; ds->replace_dict = rds; proc_cancel_all_transformations (ds); @@ -849,60 +529,103 @@ create_dataset (struct casefile_factory *fact, void destroy_dataset (struct dataset *ds) { - discard_variables (ds); + proc_discard_active_file (ds); dict_destroy (ds->dict); + caseinit_destroy (ds->caseinit); trns_chain_destroy (ds->permanent_trns_chain); free (ds); } -/* Sets SINK as the destination for procedure output from the - next procedure. */ +/* Causes output from the next procedure to be discarded, instead + of being preserved for use as input for the next procedure. */ void -proc_set_sink (struct dataset *ds, struct case_sink *sink) +proc_discard_output (struct dataset *ds) { - assert (ds->proc_sink == NULL); - ds->proc_sink = sink; + ds->discard_output = true; +} + +/* Discards the active file dictionary, data, and + transformations. */ +void +proc_discard_active_file (struct dataset *ds) +{ + assert (ds->proc_state == PROC_COMMITTED); + + dict_clear (ds->dict); + fh_set_default_handle (NULL); + + ds->n_lag = 0; + + casereader_destroy (ds->source); + ds->source = NULL; + if ( ds->replace_source) ds->replace_source (NULL); + + proc_cancel_all_transformations (ds); } /* Sets SOURCE as the source for procedure input for the next procedure. */ void -proc_set_source (struct dataset *ds, struct case_source *source) +proc_set_active_file (struct dataset *ds, + struct casereader *source, + struct dictionary *dict) { - ds->proc_source = source; + assert (ds->proc_state == PROC_COMMITTED); + assert (ds->dict != dict); + + proc_discard_active_file (ds); - if ( ds->replace_source ) - ds->replace_source (ds->proc_source); + dict_destroy (ds->dict); + ds->dict = dict; + if ( ds->replace_dict) ds->replace_dict (dict); + + proc_set_active_file_data (ds, source); } -/* Returns true if a source for the next procedure has been - configured, false otherwise. */ +/* Replaces the active file's data by READER without replacing + the associated dictionary. */ bool -proc_has_source (const struct dataset *ds) +proc_set_active_file_data (struct dataset *ds, struct casereader *reader) { - return ds->proc_source != NULL; -} + casereader_destroy (ds->source); + ds->source = reader; + if (ds->replace_source) ds->replace_source (reader); -/* Returns the output from the previous procedure. - For use only immediately after executing a procedure. - The returned casefile is owned by the caller; it will not be - automatically used for the next procedure's input. */ -struct casefile * -proc_capture_output (struct dataset *ds) -{ - struct casefile *casefile; + caseinit_clear (ds->caseinit); + caseinit_mark_as_preinited (ds->caseinit, ds->dict); - /* Try to make sure that this function is called immediately - after procedure() or a similar function. */ - assert (ds->proc_source != NULL); - assert (case_source_is_class (ds->proc_source, &storage_source_class)); - assert (trns_chain_is_empty (ds->permanent_trns_chain)); - assert (!proc_in_temporary_transformations (ds)); + return reader == NULL || !casereader_error (reader); +} - casefile = storage_source_decapsulate (ds->proc_source); - proc_set_source (ds, NULL); +/* Returns true if an active file data source is available, false + otherwise. */ +bool +proc_has_active_file (const struct dataset *ds) +{ + return ds->source != NULL; +} - return casefile; +/* Checks whether DS has a corrupted active file. If so, + discards it and returns false. If not, returns true without + doing anything. */ +bool +dataset_end_of_command (struct dataset *ds) +{ + if (ds->source != NULL) + { + if (casereader_error (ds->source)) + { + proc_discard_active_file (ds); + return false; + } + else + { + const struct taint *taint = casereader_get_taint (ds->source); + taint_reset_successor_taint ((struct taint *) taint); + assert (!taint_has_tainted_successor (taint)); + } + } + return true; } static trns_proc_func case_limit_trns_proc; @@ -983,32 +706,8 @@ dataset_dict (const struct dataset *ds) return ds->dict; } - -/* Set or replace dataset DS's dictionary with DICT. - The old dictionary is destroyed */ -void -dataset_set_dict (struct dataset *ds, struct dictionary *dict) -{ - struct dictionary *old_dict = ds->dict; - - dict_copy_callbacks (dict, ds->dict); - ds->dict = dict; - - if ( ds->replace_dict ) - ds->replace_dict (dict); - - dict_destroy (old_dict); -} - void dataset_need_lag (struct dataset *ds, int n_before) { ds->n_lag = MAX (ds->n_lag, n_before); } - -struct casefile_factory * -dataset_get_casefile_factory (const struct dataset *ds) -{ - return ds->cf_factory; -} -