X-Git-Url: https://pintos-os.org/cgi-bin/gitweb.cgi?a=blobdiff_plain;f=src%2Fdata%2Fprocedure.c;h=b6f988aabbaf46b2707940aca2e646504ced5ffa;hb=4517b68e7248f22e7b7ed81f0d73179351a53047;hp=8aec2c9fe05092d1beded5ca2bddafbc12048e1d;hpb=687adf53eae434e88a47bb3409f946f3a26115a4;p=pspp-builds.git diff --git a/src/data/procedure.c b/src/data/procedure.c index 8aec2c9f..b6f988aa 100644 --- a/src/data/procedure.c +++ b/src/data/procedure.c @@ -1,5 +1,5 @@ /* PSPP - computes sample statistics. - Copyright (C) 1997-9, 2000, 2006 Free Software Foundation, Inc. + Copyright (C) 1997-9, 2000, 2006, 2007 Free Software Foundation, Inc. This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as @@ -43,6 +43,13 @@ struct dataset { /* An abstract factory which creates casefiles */ struct casefile_factory *cf_factory; + /* Callback which occurs when a procedure provides a new source for + the dataset */ + replace_source_callback *replace_source ; + + /* Callback which occurs whenever the DICT is replaced by a new one */ + replace_dictionary_callback *replace_dict; + /* Cases are read from proc_source, pass through permanent_trns_chain (which transforms them into the format described by permanent_dict), @@ -101,7 +108,7 @@ static bool close_active_file (struct dataset *ds); /* Returns the last time the data was read. */ time_t -time_of_last_procedure (struct dataset *ds) +time_of_last_procedure (struct dataset *ds) { if (ds->last_proc_invocation == 0) update_last_proc_invocation (ds); @@ -120,10 +127,10 @@ time_of_last_procedure (struct dataset *ds) start the next case from step 1. 2. Write case to replacement active file. - + 3. Execute temporary transformations. If these drop the case, start the next case from step 1. - + 4. Pass case to PROC_FUNC, passing AUX as auxiliary data. Returns true if successful, false if an I/O error occurred. */ @@ -152,17 +159,17 @@ procedure (struct dataset *ds, case_func *cf, void *aux) /* Multipass procedure. */ -struct multipass_aux_data +struct multipass_aux_data { struct casefile *casefile; - + bool (*proc_func) (const struct casefile *, void *aux); void *aux; }; /* Case processing function for multipass_procedure(). */ static bool -multipass_case_func (const struct ccase *c, void *aux_data_, const struct dataset *ds UNUSED) +multipass_case_func (const struct ccase *c, void *aux_data_, const struct dataset *ds UNUSED) { struct multipass_aux_data *aux_data = aux_data_; return casefile_append (aux_data->casefile, c); @@ -170,7 +177,7 @@ multipass_case_func (const struct ccase *c, void *aux_data_, const struct datase /* End-of-file function for multipass_procedure(). */ static bool -multipass_end_func (void *aux_data_, const struct dataset *ds UNUSED) +multipass_end_func (void *aux_data_, const struct dataset *ds UNUSED) { struct multipass_aux_data *aux_data = aux_data_; return (aux_data->proc_func == NULL @@ -181,7 +188,7 @@ multipass_end_func (void *aux_data_, const struct dataset *ds UNUSED) The entire active file is passed to PROC_FUNC, with the given AUX as auxiliary data, as a unit. */ bool -multipass_procedure (struct dataset *ds, casefile_func *proc_func, void *aux) +multipass_procedure (struct dataset *ds, casefile_func *proc_func, void *aux) { struct multipass_aux_data aux_data; bool ok; @@ -201,6 +208,7 @@ multipass_procedure (struct dataset *ds, casefile_func *proc_func, void *aux) return ok; } + /* Procedure implementation. */ /* Executes a procedure. @@ -211,18 +219,25 @@ multipass_procedure (struct dataset *ds, casefile_func *proc_func, void *aux) static bool internal_procedure (struct dataset *ds, case_func *proc, end_func *end, - void *aux) + void *aux) { struct ccase *c; bool ok = true; - + proc_open (ds); while (ok && proc_read (ds, &c)) if (proc != NULL) ok = proc (c, aux, ds) && ok; if (end != NULL) ok = end (aux, ds) && ok; - return proc_close (ds) && ok; + + if ( proc_close (ds) && ok ) + { + + return true; + } + + return false; } /* Opens dataset DS for reading cases with proc_read. @@ -251,13 +266,13 @@ proc_open (struct dataset *ds) Return false at end of file or if a read error occurs. In this case a null pointer is stored in *C. */ bool -proc_read (struct dataset *ds, struct ccase **c) +proc_read (struct dataset *ds, struct ccase **c) { enum trns_result retval = TRNS_DROP_CASE; assert (ds->is_open); *c = NULL; - for (;;) + for (;;) { size_t case_nr; @@ -278,16 +293,16 @@ proc_read (struct dataset *ds, struct ccase **c) &ds->trns_case, &case_nr); if (retval != TRNS_CONTINUE) continue; - + /* Write case to LAG queue. */ if (ds->n_lag) lag_case (ds, &ds->trns_case); /* Write case to replacement active file. */ ds->cases_written++; - if (ds->proc_sink->class->write != NULL) + if (ds->proc_sink->class->write != NULL) { - if (ds->compactor != NULL) + if (ds->compactor != NULL) { dict_compactor_compact (ds->compactor, &ds->sink_case, &ds->trns_case); @@ -296,9 +311,9 @@ proc_read (struct dataset *ds, struct ccase **c) else ds->proc_sink->class->write (ds->proc_sink, &ds->trns_case); } - + /* Execute temporary transformations. */ - if (ds->temporary_trns_chain != NULL) + if (ds->temporary_trns_chain != NULL) { retval = trns_chain_execute (ds->temporary_trns_chain, TRNS_CONTINUE, &ds->trns_case, &ds->cases_written); @@ -307,7 +322,7 @@ proc_read (struct dataset *ds, struct ccase **c) } *c = &ds->trns_case; - return true; + return true; } } @@ -317,20 +332,20 @@ proc_read (struct dataset *ds, struct ccase **c) If DS has not been opened, returns true without doing anything else. */ bool -proc_close (struct dataset *ds) +proc_close (struct dataset *ds) { if (!ds->is_open) return true; /* Drain any remaining cases. */ - while (ds->ok) + while (ds->ok) { struct ccase *c; if (!proc_read (ds, &c)) - break; + break; } ds->ok = free_case_source (ds->proc_source) && ds->ok; - ds->proc_source = NULL; + proc_set_source (ds, NULL); case_destroy (&ds->sink_case); case_destroy (&ds->trns_case); @@ -343,7 +358,7 @@ proc_close (struct dataset *ds) /* Updates last_proc_invocation. */ static void -update_last_proc_invocation (struct dataset *ds) +update_last_proc_invocation (struct dataset *ds) { ds->last_proc_invocation = time (NULL); } @@ -358,7 +373,7 @@ create_trns_case (struct ccase *trns_case, struct dictionary *dict) size_t i; case_create (trns_case, dict_get_next_value_idx (dict)); - for (i = 0; i < var_cnt; i++) + for (i = 0; i < var_cnt; i++) { struct variable *v = dict_get_var (dict, i); union value *value = case_data_rw (trns_case, v); @@ -387,7 +402,7 @@ open_active_file (struct dataset *ds) ds->permanent_dict = ds->dict; /* Figure out whether to compact. */ - ds->compactor = + ds->compactor = (dict_compacting_would_shrink (ds->permanent_dict) ? dict_make_compactor (ds->permanent_dict) : NULL); @@ -405,7 +420,7 @@ open_active_file (struct dataset *ds) if (ds->n_lag > 0) { int i; - + ds->lag_count = 0; ds->lag_head = 0; ds->lag_queue = xnmalloc (ds->n_lag, sizeof *ds->lag_queue); @@ -433,17 +448,17 @@ clear_case (const struct dataset *ds, struct ccase *c) { size_t var_cnt = dict_get_var_cnt (ds->dict); size_t i; - - for (i = 0; i < var_cnt; i++) + + for (i = 0; i < var_cnt; i++) { struct variable *v = dict_get_var (ds->dict, i); - if (!var_get_leave (v)) + if (!var_get_leave (v)) { if (var_is_numeric (v)) - case_data_rw (c, v)->f = SYSMIS; + case_data_rw (c, v)->f = SYSMIS; else memset (case_data_rw (c, v)->s, ' ', var_get_width (v)); - } + } } } @@ -455,27 +470,27 @@ close_active_file (struct dataset *ds) if (ds->n_lag > 0) { int i; - + for (i = 0; i < ds->n_lag; i++) case_destroy (&ds->lag_queue[i]); free (ds->lag_queue); ds->n_lag = 0; } - + /* Dictionary from before TEMPORARY becomes permanent. */ proc_cancel_temporary_transformations (ds); /* Finish compacting. */ - if (ds->compactor != NULL) + if (ds->compactor != NULL) { dict_compactor_destroy (ds->compactor); dict_compact_values (ds->dict); ds->compactor = NULL; } - + /* Old data sink becomes new data source. */ if (ds->proc_sink->class->make_source != NULL) - ds->proc_source = ds->proc_sink->class->make_source (ds->proc_sink); + proc_set_source (ds, ds->proc_sink->class->make_source (ds->proc_sink) ); free_case_sink (ds->proc_sink); ds->proc_sink = NULL; @@ -506,13 +521,13 @@ lagged_case (const struct dataset *ds, int n_before) /* Procedure that separates the data into SPLIT FILE groups. */ /* Represents auxiliary data for handling SPLIT FILE. */ -struct split_aux_data +struct split_aux_data { struct dataset *dataset; /* The dataset */ struct ccase prev_case; /* Data in previous case. */ /* Callback functions. */ - begin_func *begin; + begin_func *begin; case_func *proc; end_func *end; void *func_aux; @@ -532,19 +547,19 @@ static bool split_procedure_end_func (void *, const struct dataset *); passed to each of the functions as auxiliary data. If the active file is empty, none of BEGIN_FUNC, PROC_FUNC, - and END_FUNC will be called at all. + and END_FUNC will be called at all. If SPLIT FILE is not in effect, then there is one break group (if the active file is nonempty), and BEGIN_FUNC and END_FUNC will be called once. - + Returns true if successful, false if an I/O error occurred. */ bool procedure_with_splits (struct dataset *ds, - begin_func begin, + begin_func begin, case_func *proc, end_func *end, - void *func_aux) + void *func_aux) { struct split_aux_data split_aux; bool ok; @@ -566,7 +581,7 @@ procedure_with_splits (struct dataset *ds, /* Case callback used by procedure_with_splits(). */ static bool -split_procedure_case_func (const struct ccase *c, void *split_aux_, const struct dataset *ds) +split_procedure_case_func (const struct ccase *c, void *split_aux_, const struct dataset *ds) { struct split_aux_data *split_aux = split_aux_; @@ -590,7 +605,7 @@ split_procedure_case_func (const struct ccase *c, void *split_aux_, const struct /* End-of-file callback used by procedure_with_splits(). */ static bool -split_procedure_end_func (void *split_aux_, const struct dataset *ds) +split_procedure_end_func (void *split_aux_, const struct dataset *ds) { struct split_aux_data *split_aux = split_aux_; @@ -602,8 +617,8 @@ split_procedure_end_func (void *split_aux_, const struct dataset *ds) /* Compares the SPLIT FILE variables in cases A and B and returns nonzero only if they differ. */ static int -equal_splits (const struct ccase *a, const struct ccase *b, - const struct dataset *ds) +equal_splits (const struct ccase *a, const struct ccase *b, + const struct dataset *ds) { return case_compare (a, b, dict_get_split_vars (ds->dict), @@ -615,14 +630,14 @@ equal_splits (const struct ccase *a, const struct ccase *b, /* Represents auxiliary data for handling SPLIT FILE in a multipass procedure. */ -struct multipass_split_aux_data +struct multipass_split_aux_data { struct dataset *dataset; /* The dataset of the split */ struct ccase prev_case; /* Data in previous case. */ struct casefile *casefile; /* Accumulates data for a split. */ - split_func *split; /* Function to call with the accumulated + split_func *split; /* Function to call with the accumulated data. */ - void *func_aux; /* Auxiliary data. */ + void *func_aux; /* Auxiliary data. */ }; static bool multipass_split_case_func (const struct ccase *c, void *aux_, const struct dataset *); @@ -631,7 +646,7 @@ static bool multipass_split_output (struct multipass_split_aux_data *, const str /* Returns true if successful, false if an I/O error occurred. */ bool -multipass_procedure_with_splits (struct dataset *ds, +multipass_procedure_with_splits (struct dataset *ds, split_func *split, void *func_aux) { @@ -670,7 +685,7 @@ multipass_split_case_func (const struct ccase *c, void *aux_, const struct datas ok = multipass_split_output (aux, ds); /* Start a new casefile. */ - aux->casefile = + aux->casefile = ds->cf_factory->create_casefile (ds->cf_factory, dict_get_next_value_idx (ds->dict)); } @@ -690,7 +705,7 @@ static bool multipass_split_output (struct multipass_split_aux_data *aux, const struct dataset *ds) { bool ok; - + assert (aux->casefile != NULL); ok = aux->split (&aux->prev_case, aux->casefile, aux->func_aux, ds); casefile_destroy (aux->casefile); @@ -708,9 +723,9 @@ discard_variables (struct dataset *ds) fh_set_default_handle (NULL); ds->n_lag = 0; - + free_case_source (ds->proc_source); - ds->proc_source = NULL; + proc_set_source (ds, NULL); proc_cancel_all_transformations (ds); } @@ -719,10 +734,10 @@ discard_variables (struct dataset *ds) and clears the permanent transformations. For use by INPUT PROGRAM. */ struct trns_chain * -proc_capture_transformations (struct dataset *ds) +proc_capture_transformations (struct dataset *ds) { struct trns_chain *chain; - + assert (ds->temporary_trns_chain == NULL); chain = ds->permanent_trns_chain; ds->cur_trns_chain = ds->permanent_trns_chain = trns_chain_create (); @@ -744,7 +759,7 @@ add_transformation (struct dataset *ds, trns_proc_func *proc, trns_free_func *fr FINALIZE will be called. The functions are passed AUX as auxiliary data. */ void -add_transformation_with_finalizer (struct dataset *ds, +add_transformation_with_finalizer (struct dataset *ds, trns_finalize_func *finalize, trns_proc_func *proc, trns_free_func *free, void *aux) @@ -756,7 +771,7 @@ add_transformation_with_finalizer (struct dataset *ds, This value can be returned by a transformation procedure function to indicate a "jump" to that transformation. */ size_t -next_transformation (const struct dataset *ds) +next_transformation (const struct dataset *ds) { return trns_chain_next (ds->cur_trns_chain); } @@ -765,7 +780,7 @@ next_transformation (const struct dataset *ds) a temporary transformation, false if it will add a permanent transformation. */ bool -proc_in_temporary_transformations (const struct dataset *ds) +proc_in_temporary_transformations (const struct dataset *ds) { return ds->temporary_trns_chain != NULL; } @@ -774,13 +789,14 @@ proc_in_temporary_transformations (const struct dataset *ds) Further calls to add_transformation() will add temporary transformations. */ void -proc_start_temporary_transformations (struct dataset *ds) +proc_start_temporary_transformations (struct dataset *ds) { if (!proc_in_temporary_transformations (ds)) { add_case_limit_trns (ds); ds->permanent_dict = dict_clone (ds->dict); + trns_chain_finalize (ds->permanent_trns_chain); ds->temporary_trns_chain = ds->cur_trns_chain = trns_chain_create (); } @@ -791,9 +807,9 @@ proc_start_temporary_transformations (struct dataset *ds) permanent. Returns true if anything changed, false otherwise. */ bool -proc_make_temporary_transformations_permanent (struct dataset *ds) +proc_make_temporary_transformations_permanent (struct dataset *ds) { - if (proc_in_temporary_transformations (ds)) + if (proc_in_temporary_transformations (ds)) { trns_chain_finalize (ds->temporary_trns_chain); trns_chain_splice (ds->permanent_trns_chain, ds->temporary_trns_chain); @@ -812,12 +828,11 @@ proc_make_temporary_transformations_permanent (struct dataset *ds) transformations will be permanent. Returns true if anything changed, false otherwise. */ bool -proc_cancel_temporary_transformations (struct dataset *ds) +proc_cancel_temporary_transformations (struct dataset *ds) { - if (proc_in_temporary_transformations (ds)) + if (proc_in_temporary_transformations (ds)) { - dict_destroy (ds->dict); - ds->dict = ds->permanent_dict; + dataset_set_dict (ds, ds->permanent_dict); ds->permanent_dict = NULL; trns_chain_destroy (ds->temporary_trns_chain); @@ -844,11 +859,16 @@ proc_cancel_all_transformations (struct dataset *ds) /* Initializes procedure handling. */ struct dataset * -create_dataset (struct casefile_factory *fact) +create_dataset (struct casefile_factory *fact, + replace_source_callback *rps, + replace_dictionary_callback *rds + ) { struct dataset *ds = xzalloc (sizeof(*ds)); ds->dict = dict_create (); ds->cf_factory = fact; + ds->replace_source = rps; + ds->replace_dict = rds; proc_cancel_all_transformations (ds); return ds; } @@ -866,7 +886,7 @@ destroy_dataset (struct dataset *ds) /* Sets SINK as the destination for procedure output from the next procedure. */ void -proc_set_sink (struct dataset *ds, struct case_sink *sink) +proc_set_sink (struct dataset *ds, struct case_sink *sink) { assert (ds->proc_sink == NULL); ds->proc_sink = sink; @@ -875,16 +895,18 @@ proc_set_sink (struct dataset *ds, struct case_sink *sink) /* Sets SOURCE as the source for procedure input for the next procedure. */ void -proc_set_source (struct dataset *ds, struct case_source *source) +proc_set_source (struct dataset *ds, struct case_source *source) { - assert (ds->proc_source == NULL); ds->proc_source = source; + + if ( ds->replace_source ) + ds->replace_source (ds->proc_source); } /* Returns true if a source for the next procedure has been configured, false otherwise. */ bool -proc_has_source (const struct dataset *ds) +proc_has_source (const struct dataset *ds) { return ds->proc_source != NULL; } @@ -894,7 +916,7 @@ proc_has_source (const struct dataset *ds) The returned casefile is owned by the caller; it will not be automatically used for the next procedure's input. */ struct casefile * -proc_capture_output (struct dataset *ds) +proc_capture_output (struct dataset *ds) { struct casefile *casefile; @@ -906,7 +928,7 @@ proc_capture_output (struct dataset *ds) assert (!proc_in_temporary_transformations (ds)); casefile = storage_source_decapsulate (ds->proc_source); - ds->proc_source = NULL; + proc_set_source (ds, NULL); return casefile; } @@ -917,7 +939,7 @@ static trns_free_func case_limit_trns_free; /* Adds a transformation that limits the number of cases that may pass through, if DS->DICT has a case limit. */ static void -add_case_limit_trns (struct dataset *ds) +add_case_limit_trns (struct dataset *ds) { size_t case_limit = dict_get_case_limit (ds->dict); if (case_limit != 0) @@ -934,10 +956,10 @@ add_case_limit_trns (struct dataset *ds) *CASES_REMAINING. */ static int case_limit_trns_proc (void *cases_remaining_, - struct ccase *c UNUSED, casenumber case_nr UNUSED) + struct ccase *c UNUSED, casenumber case_nr UNUSED) { size_t *cases_remaining = cases_remaining_; - if (*cases_remaining > 0) + if (*cases_remaining > 0) { (*cases_remaining)--; return TRNS_CONTINUE; @@ -948,7 +970,7 @@ case_limit_trns_proc (void *cases_remaining_, /* Frees the data associated with a case limit transformation. */ static bool -case_limit_trns_free (void *cases_remaining_) +case_limit_trns_free (void *cases_remaining_) { size_t *cases_remaining = cases_remaining_; free (cases_remaining); @@ -960,10 +982,10 @@ static trns_proc_func filter_trns_proc; /* Adds a temporary transformation to filter data according to the variable specified on FILTER, if any. */ static void -add_filter_trns (struct dataset *ds) +add_filter_trns (struct dataset *ds) { struct variable *filter_var = dict_get_filter (ds->dict); - if (filter_var != NULL) + if (filter_var != NULL) { proc_start_temporary_transformations (ds); add_transformation (ds, filter_trns_proc, NULL, filter_var); @@ -973,12 +995,12 @@ add_filter_trns (struct dataset *ds) /* FILTER transformation. */ static int filter_trns_proc (void *filter_var_, - struct ccase *c UNUSED, casenumber case_nr UNUSED) - + struct ccase *c UNUSED, casenumber case_nr UNUSED) + { struct variable *filter_var = filter_var_; double f = case_num (c, filter_var); - return (f != 0.0 && !var_is_num_missing (filter_var, f) + return (f != 0.0 && !var_is_num_missing (filter_var, f, MV_ANY) ? TRNS_CONTINUE : TRNS_DROP_CASE); } @@ -990,19 +1012,29 @@ dataset_dict (const struct dataset *ds) } -void +/* Set or replace dataset DS's dictionary with DICT. + The old dictionary is destroyed */ +void dataset_set_dict (struct dataset *ds, struct dictionary *dict) { + struct dictionary *old_dict = ds->dict; + + dict_copy_callbacks (dict, ds->dict); ds->dict = dict; + + if ( ds->replace_dict ) + ds->replace_dict (dict); + + dict_destroy (old_dict); } -int +int dataset_n_lag (const struct dataset *ds) { return ds->n_lag; } -void +void dataset_set_n_lag (struct dataset *ds, int n_lag) { ds->n_lag = n_lag;