X-Git-Url: https://pintos-os.org/cgi-bin/gitweb.cgi?a=blobdiff_plain;f=src%2Fdata%2Fprocedure.c;h=2154f689c8f50d7a3370d8d99a935f2056590897;hb=ecd26ec19e9f8a58079a1c5fa06b39484787ab7e;hp=29dffbcaf8303dd9daf607d19eec7cfd76a1590a;hpb=9cb85c5eb1b0e041f4b7b7f1c5ab9cce55de524f;p=pspp-builds.git diff --git a/src/data/procedure.c b/src/data/procedure.c index 29dffbca..2154f689 100644 --- a/src/data/procedure.c +++ b/src/data/procedure.c @@ -43,6 +43,13 @@ struct dataset { /* An abstract factory which creates casefiles */ struct casefile_factory *cf_factory; + /* Callback which occurs when a procedure provides a new source for + the dataset */ + replace_source_callback *replace_source ; + + /* Callback which occurs whenever the DICT is replaced by a new one */ + replace_dictionary_callback *replace_dict; + /* Cases are read from proc_source, pass through permanent_trns_chain (which transforms them into the format described by permanent_dict), @@ -120,10 +127,10 @@ time_of_last_procedure (struct dataset *ds) start the next case from step 1. 2. Write case to replacement active file. - + 3. Execute temporary transformations. If these drop the case, start the next case from step 1. - + 4. Pass case to PROC_FUNC, passing AUX as auxiliary data. Returns true if successful, false if an I/O error occurred. */ @@ -152,17 +159,17 @@ procedure (struct dataset *ds, case_func *cf, void *aux) /* Multipass procedure. */ -struct multipass_aux_data +struct multipass_aux_data { struct casefile *casefile; - + bool (*proc_func) (const struct casefile *, void *aux); void *aux; }; /* Case processing function for multipass_procedure(). */ static bool -multipass_case_func (const struct ccase *c, void *aux_data_, const struct dataset *ds UNUSED) +multipass_case_func (const struct ccase *c, void *aux_data_, const struct dataset *ds UNUSED) { struct multipass_aux_data *aux_data = aux_data_; return casefile_append (aux_data->casefile, c); @@ -170,7 +177,7 @@ multipass_case_func (const struct ccase *c, void *aux_data_, const struct datase /* End-of-file function for multipass_procedure(). */ static bool -multipass_end_func (void *aux_data_, const struct dataset *ds UNUSED) +multipass_end_func (void *aux_data_, const struct dataset *ds UNUSED) { struct multipass_aux_data *aux_data = aux_data_; return (aux_data->proc_func == NULL @@ -181,7 +188,7 @@ multipass_end_func (void *aux_data_, const struct dataset *ds UNUSED) The entire active file is passed to PROC_FUNC, with the given AUX as auxiliary data, as a unit. */ bool -multipass_procedure (struct dataset *ds, casefile_func *proc_func, void *aux) +multipass_procedure (struct dataset *ds, casefile_func *proc_func, void *aux) { struct multipass_aux_data aux_data; bool ok; @@ -201,6 +208,7 @@ multipass_procedure (struct dataset *ds, casefile_func *proc_func, void *aux) return ok; } + /* Procedure implementation. */ /* Executes a procedure. @@ -211,18 +219,27 @@ multipass_procedure (struct dataset *ds, casefile_func *proc_func, void *aux) static bool internal_procedure (struct dataset *ds, case_func *proc, end_func *end, - void *aux) + void *aux) { struct ccase *c; bool ok = true; - + proc_open (ds); while (ok && proc_read (ds, &c)) if (proc != NULL) ok = proc (c, aux, ds) && ok; if (end != NULL) ok = end (aux, ds) && ok; - return proc_close (ds) && ok; + + if ( proc_close (ds) && ok ) + { + if ( ds->replace_source ) + ds->replace_source (ds->proc_source); + + return true; + } + + return false; } /* Opens dataset DS for reading cases with proc_read. @@ -251,13 +268,13 @@ proc_open (struct dataset *ds) Return false at end of file or if a read error occurs. In this case a null pointer is stored in *C. */ bool -proc_read (struct dataset *ds, struct ccase **c) +proc_read (struct dataset *ds, struct ccase **c) { enum trns_result retval = TRNS_DROP_CASE; assert (ds->is_open); *c = NULL; - for (;;) + for (;;) { size_t case_nr; @@ -278,16 +295,16 @@ proc_read (struct dataset *ds, struct ccase **c) &ds->trns_case, &case_nr); if (retval != TRNS_CONTINUE) continue; - + /* Write case to LAG queue. */ if (ds->n_lag) lag_case (ds, &ds->trns_case); /* Write case to replacement active file. */ ds->cases_written++; - if (ds->proc_sink->class->write != NULL) + if (ds->proc_sink->class->write != NULL) { - if (ds->compactor != NULL) + if (ds->compactor != NULL) { dict_compactor_compact (ds->compactor, &ds->sink_case, &ds->trns_case); @@ -296,9 +313,9 @@ proc_read (struct dataset *ds, struct ccase **c) else ds->proc_sink->class->write (ds->proc_sink, &ds->trns_case); } - + /* Execute temporary transformations. */ - if (ds->temporary_trns_chain != NULL) + if (ds->temporary_trns_chain != NULL) { retval = trns_chain_execute (ds->temporary_trns_chain, TRNS_CONTINUE, &ds->trns_case, &ds->cases_written); @@ -307,7 +324,7 @@ proc_read (struct dataset *ds, struct ccase **c) } *c = &ds->trns_case; - return true; + return true; } } @@ -317,17 +334,17 @@ proc_read (struct dataset *ds, struct ccase **c) If DS has not been opened, returns true without doing anything else. */ bool -proc_close (struct dataset *ds) +proc_close (struct dataset *ds) { if (!ds->is_open) return true; /* Drain any remaining cases. */ - while (ds->ok) + while (ds->ok) { struct ccase *c; if (!proc_read (ds, &c)) - break; + break; } ds->ok = free_case_source (ds->proc_source) && ds->ok; ds->proc_source = NULL; @@ -343,7 +360,7 @@ proc_close (struct dataset *ds) /* Updates last_proc_invocation. */ static void -update_last_proc_invocation (struct dataset *ds) +update_last_proc_invocation (struct dataset *ds) { ds->last_proc_invocation = time (NULL); } @@ -358,7 +375,7 @@ create_trns_case (struct ccase *trns_case, struct dictionary *dict) size_t i; case_create (trns_case, dict_get_next_value_idx (dict)); - for (i = 0; i < var_cnt; i++) + for (i = 0; i < var_cnt; i++) { struct variable *v = dict_get_var (dict, i); union value *value = case_data_rw (trns_case, v); @@ -387,7 +404,7 @@ open_active_file (struct dataset *ds) ds->permanent_dict = ds->dict; /* Figure out whether to compact. */ - ds->compactor = + ds->compactor = (dict_compacting_would_shrink (ds->permanent_dict) ? dict_make_compactor (ds->permanent_dict) : NULL); @@ -405,7 +422,7 @@ open_active_file (struct dataset *ds) if (ds->n_lag > 0) { int i; - + ds->lag_count = 0; ds->lag_head = 0; ds->lag_queue = xnmalloc (ds->n_lag, sizeof *ds->lag_queue); @@ -433,17 +450,17 @@ clear_case (const struct dataset *ds, struct ccase *c) { size_t var_cnt = dict_get_var_cnt (ds->dict); size_t i; - - for (i = 0; i < var_cnt; i++) + + for (i = 0; i < var_cnt; i++) { struct variable *v = dict_get_var (ds->dict, i); - if (!var_get_leave (v)) + if (!var_get_leave (v)) { if (var_is_numeric (v)) - case_data_rw (c, v)->f = SYSMIS; + case_data_rw (c, v)->f = SYSMIS; else memset (case_data_rw (c, v)->s, ' ', var_get_width (v)); - } + } } } @@ -455,24 +472,24 @@ close_active_file (struct dataset *ds) if (ds->n_lag > 0) { int i; - + for (i = 0; i < ds->n_lag; i++) case_destroy (&ds->lag_queue[i]); free (ds->lag_queue); ds->n_lag = 0; } - + /* Dictionary from before TEMPORARY becomes permanent. */ proc_cancel_temporary_transformations (ds); /* Finish compacting. */ - if (ds->compactor != NULL) + if (ds->compactor != NULL) { dict_compactor_destroy (ds->compactor); dict_compact_values (ds->dict); ds->compactor = NULL; } - + /* Old data sink becomes new data source. */ if (ds->proc_sink->class->make_source != NULL) ds->proc_source = ds->proc_sink->class->make_source (ds->proc_sink); @@ -506,13 +523,13 @@ lagged_case (const struct dataset *ds, int n_before) /* Procedure that separates the data into SPLIT FILE groups. */ /* Represents auxiliary data for handling SPLIT FILE. */ -struct split_aux_data +struct split_aux_data { struct dataset *dataset; /* The dataset */ struct ccase prev_case; /* Data in previous case. */ /* Callback functions. */ - begin_func *begin; + begin_func *begin; case_func *proc; end_func *end; void *func_aux; @@ -532,19 +549,19 @@ static bool split_procedure_end_func (void *, const struct dataset *); passed to each of the functions as auxiliary data. If the active file is empty, none of BEGIN_FUNC, PROC_FUNC, - and END_FUNC will be called at all. + and END_FUNC will be called at all. If SPLIT FILE is not in effect, then there is one break group (if the active file is nonempty), and BEGIN_FUNC and END_FUNC will be called once. - + Returns true if successful, false if an I/O error occurred. */ bool procedure_with_splits (struct dataset *ds, - begin_func begin, + begin_func begin, case_func *proc, end_func *end, - void *func_aux) + void *func_aux) { struct split_aux_data split_aux; bool ok; @@ -566,7 +583,7 @@ procedure_with_splits (struct dataset *ds, /* Case callback used by procedure_with_splits(). */ static bool -split_procedure_case_func (const struct ccase *c, void *split_aux_, const struct dataset *ds) +split_procedure_case_func (const struct ccase *c, void *split_aux_, const struct dataset *ds) { struct split_aux_data *split_aux = split_aux_; @@ -590,7 +607,7 @@ split_procedure_case_func (const struct ccase *c, void *split_aux_, const struct /* End-of-file callback used by procedure_with_splits(). */ static bool -split_procedure_end_func (void *split_aux_, const struct dataset *ds) +split_procedure_end_func (void *split_aux_, const struct dataset *ds) { struct split_aux_data *split_aux = split_aux_; @@ -602,8 +619,8 @@ split_procedure_end_func (void *split_aux_, const struct dataset *ds) /* Compares the SPLIT FILE variables in cases A and B and returns nonzero only if they differ. */ static int -equal_splits (const struct ccase *a, const struct ccase *b, - const struct dataset *ds) +equal_splits (const struct ccase *a, const struct ccase *b, + const struct dataset *ds) { return case_compare (a, b, dict_get_split_vars (ds->dict), @@ -615,14 +632,14 @@ equal_splits (const struct ccase *a, const struct ccase *b, /* Represents auxiliary data for handling SPLIT FILE in a multipass procedure. */ -struct multipass_split_aux_data +struct multipass_split_aux_data { struct dataset *dataset; /* The dataset of the split */ struct ccase prev_case; /* Data in previous case. */ struct casefile *casefile; /* Accumulates data for a split. */ - split_func *split; /* Function to call with the accumulated + split_func *split; /* Function to call with the accumulated data. */ - void *func_aux; /* Auxiliary data. */ + void *func_aux; /* Auxiliary data. */ }; static bool multipass_split_case_func (const struct ccase *c, void *aux_, const struct dataset *); @@ -631,7 +648,7 @@ static bool multipass_split_output (struct multipass_split_aux_data *, const str /* Returns true if successful, false if an I/O error occurred. */ bool -multipass_procedure_with_splits (struct dataset *ds, +multipass_procedure_with_splits (struct dataset *ds, split_func *split, void *func_aux) { @@ -670,7 +687,7 @@ multipass_split_case_func (const struct ccase *c, void *aux_, const struct datas ok = multipass_split_output (aux, ds); /* Start a new casefile. */ - aux->casefile = + aux->casefile = ds->cf_factory->create_casefile (ds->cf_factory, dict_get_next_value_idx (ds->dict)); } @@ -690,7 +707,7 @@ static bool multipass_split_output (struct multipass_split_aux_data *aux, const struct dataset *ds) { bool ok; - + assert (aux->casefile != NULL); ok = aux->split (&aux->prev_case, aux->casefile, aux->func_aux, ds); casefile_destroy (aux->casefile); @@ -708,9 +725,12 @@ discard_variables (struct dataset *ds) fh_set_default_handle (NULL); ds->n_lag = 0; - + free_case_source (ds->proc_source); ds->proc_source = NULL; + if ( ds->replace_source ) + ds->replace_source (ds->proc_source); + proc_cancel_all_transformations (ds); } @@ -719,10 +739,10 @@ discard_variables (struct dataset *ds) and clears the permanent transformations. For use by INPUT PROGRAM. */ struct trns_chain * -proc_capture_transformations (struct dataset *ds) +proc_capture_transformations (struct dataset *ds) { struct trns_chain *chain; - + assert (ds->temporary_trns_chain == NULL); chain = ds->permanent_trns_chain; ds->cur_trns_chain = ds->permanent_trns_chain = trns_chain_create (); @@ -744,7 +764,7 @@ add_transformation (struct dataset *ds, trns_proc_func *proc, trns_free_func *fr FINALIZE will be called. The functions are passed AUX as auxiliary data. */ void -add_transformation_with_finalizer (struct dataset *ds, +add_transformation_with_finalizer (struct dataset *ds, trns_finalize_func *finalize, trns_proc_func *proc, trns_free_func *free, void *aux) @@ -756,7 +776,7 @@ add_transformation_with_finalizer (struct dataset *ds, This value can be returned by a transformation procedure function to indicate a "jump" to that transformation. */ size_t -next_transformation (const struct dataset *ds) +next_transformation (const struct dataset *ds) { return trns_chain_next (ds->cur_trns_chain); } @@ -765,7 +785,7 @@ next_transformation (const struct dataset *ds) a temporary transformation, false if it will add a permanent transformation. */ bool -proc_in_temporary_transformations (const struct dataset *ds) +proc_in_temporary_transformations (const struct dataset *ds) { return ds->temporary_trns_chain != NULL; } @@ -774,7 +794,7 @@ proc_in_temporary_transformations (const struct dataset *ds) Further calls to add_transformation() will add temporary transformations. */ void -proc_start_temporary_transformations (struct dataset *ds) +proc_start_temporary_transformations (struct dataset *ds) { if (!proc_in_temporary_transformations (ds)) { @@ -791,9 +811,9 @@ proc_start_temporary_transformations (struct dataset *ds) permanent. Returns true if anything changed, false otherwise. */ bool -proc_make_temporary_transformations_permanent (struct dataset *ds) +proc_make_temporary_transformations_permanent (struct dataset *ds) { - if (proc_in_temporary_transformations (ds)) + if (proc_in_temporary_transformations (ds)) { trns_chain_finalize (ds->temporary_trns_chain); trns_chain_splice (ds->permanent_trns_chain, ds->temporary_trns_chain); @@ -812,9 +832,9 @@ proc_make_temporary_transformations_permanent (struct dataset *ds) transformations will be permanent. Returns true if anything changed, false otherwise. */ bool -proc_cancel_temporary_transformations (struct dataset *ds) +proc_cancel_temporary_transformations (struct dataset *ds) { - if (proc_in_temporary_transformations (ds)) + if (proc_in_temporary_transformations (ds)) { dict_destroy (ds->dict); ds->dict = ds->permanent_dict; @@ -844,11 +864,16 @@ proc_cancel_all_transformations (struct dataset *ds) /* Initializes procedure handling. */ struct dataset * -create_dataset (struct casefile_factory *fact) +create_dataset (struct casefile_factory *fact, + replace_source_callback *rps, + replace_dictionary_callback *rds + ) { struct dataset *ds = xzalloc (sizeof(*ds)); ds->dict = dict_create (); ds->cf_factory = fact; + ds->replace_source = rps; + ds->replace_dict = rds; proc_cancel_all_transformations (ds); return ds; } @@ -866,7 +891,7 @@ destroy_dataset (struct dataset *ds) /* Sets SINK as the destination for procedure output from the next procedure. */ void -proc_set_sink (struct dataset *ds, struct case_sink *sink) +proc_set_sink (struct dataset *ds, struct case_sink *sink) { assert (ds->proc_sink == NULL); ds->proc_sink = sink; @@ -875,7 +900,7 @@ proc_set_sink (struct dataset *ds, struct case_sink *sink) /* Sets SOURCE as the source for procedure input for the next procedure. */ void -proc_set_source (struct dataset *ds, struct case_source *source) +proc_set_source (struct dataset *ds, struct case_source *source) { assert (ds->proc_source == NULL); ds->proc_source = source; @@ -884,7 +909,7 @@ proc_set_source (struct dataset *ds, struct case_source *source) /* Returns true if a source for the next procedure has been configured, false otherwise. */ bool -proc_has_source (const struct dataset *ds) +proc_has_source (const struct dataset *ds) { return ds->proc_source != NULL; } @@ -894,7 +919,7 @@ proc_has_source (const struct dataset *ds) The returned casefile is owned by the caller; it will not be automatically used for the next procedure's input. */ struct casefile * -proc_capture_output (struct dataset *ds) +proc_capture_output (struct dataset *ds) { struct casefile *casefile; @@ -917,7 +942,7 @@ static trns_free_func case_limit_trns_free; /* Adds a transformation that limits the number of cases that may pass through, if DS->DICT has a case limit. */ static void -add_case_limit_trns (struct dataset *ds) +add_case_limit_trns (struct dataset *ds) { size_t case_limit = dict_get_case_limit (ds->dict); if (case_limit != 0) @@ -934,10 +959,10 @@ add_case_limit_trns (struct dataset *ds) *CASES_REMAINING. */ static int case_limit_trns_proc (void *cases_remaining_, - struct ccase *c UNUSED, casenumber case_nr UNUSED) + struct ccase *c UNUSED, casenumber case_nr UNUSED) { size_t *cases_remaining = cases_remaining_; - if (*cases_remaining > 0) + if (*cases_remaining > 0) { (*cases_remaining)--; return TRNS_CONTINUE; @@ -948,7 +973,7 @@ case_limit_trns_proc (void *cases_remaining_, /* Frees the data associated with a case limit transformation. */ static bool -case_limit_trns_free (void *cases_remaining_) +case_limit_trns_free (void *cases_remaining_) { size_t *cases_remaining = cases_remaining_; free (cases_remaining); @@ -960,10 +985,10 @@ static trns_proc_func filter_trns_proc; /* Adds a temporary transformation to filter data according to the variable specified on FILTER, if any. */ static void -add_filter_trns (struct dataset *ds) +add_filter_trns (struct dataset *ds) { struct variable *filter_var = dict_get_filter (ds->dict); - if (filter_var != NULL) + if (filter_var != NULL) { proc_start_temporary_transformations (ds); add_transformation (ds, filter_trns_proc, NULL, filter_var); @@ -973,8 +998,8 @@ add_filter_trns (struct dataset *ds) /* FILTER transformation. */ static int filter_trns_proc (void *filter_var_, - struct ccase *c UNUSED, casenumber case_nr UNUSED) - + struct ccase *c UNUSED, casenumber case_nr UNUSED) + { struct variable *filter_var = filter_var_; double f = case_num (c, filter_var); @@ -990,19 +1015,23 @@ dataset_dict (const struct dataset *ds) } -void +void dataset_set_dict (struct dataset *ds, struct dictionary *dict) { + dict_copy_callbacks (dict, ds->dict); ds->dict = dict; + + if ( ds->replace_dict ) + ds->replace_dict (dict); } -int +int dataset_n_lag (const struct dataset *ds) { return ds->n_lag; } -void +void dataset_set_n_lag (struct dataset *ds, int n_lag) { ds->n_lag = n_lag;