X-Git-Url: https://pintos-os.org/cgi-bin/gitweb.cgi?a=blobdiff_plain;f=src%2Fdata%2Fprocedure.c;h=ed69420c3e464bd71221bd3bff5ee0ad3e727037;hb=df936ae7a823634e599df091f8ae935d7842fbaa;hp=2154f689c8f50d7a3370d8d99a935f2056590897;hpb=ecd26ec19e9f8a58079a1c5fa06b39484787ab7e;p=pspp-builds.git diff --git a/src/data/procedure.c b/src/data/procedure.c index 2154f689..ed69420c 100644 --- a/src/data/procedure.c +++ b/src/data/procedure.c @@ -1,5 +1,5 @@ /* PSPP - computes sample statistics. - Copyright (C) 1997-9, 2000, 2006 Free Software Foundation, Inc. + Copyright (C) 1997-9, 2000, 2006, 2007 Free Software Foundation, Inc. This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as @@ -26,6 +26,7 @@ #include #include #include +#include #include #include #include @@ -75,11 +76,9 @@ struct dataset { /* Time at which proc was last invoked. */ time_t last_proc_invocation; - /* Lag queue. */ + /* Cases just before ("lagging") the current one. */ int n_lag; /* Number of cases to lag. */ - int lag_count; /* Number of cases in lag_queue so far. */ - int lag_head; /* Index where next case will be added. */ - struct ccase *lag_queue; /* Array of n_lag ccase * elements. */ + struct casedeque lagged_cases; /* Lagged cases. */ /* Procedure data. */ bool is_open; /* Procedure open? */ @@ -100,7 +99,6 @@ static bool internal_procedure (struct dataset *ds, case_func *, static void update_last_proc_invocation (struct dataset *ds); static void create_trns_case (struct ccase *, struct dictionary *); static void open_active_file (struct dataset *ds); -static void lag_case (struct dataset *ds, const struct ccase *c); static void clear_case (const struct dataset *ds, struct ccase *c); static bool close_active_file (struct dataset *ds); @@ -108,7 +106,7 @@ static bool close_active_file (struct dataset *ds); /* Returns the last time the data was read. */ time_t -time_of_last_procedure (struct dataset *ds) +time_of_last_procedure (struct dataset *ds) { if (ds->last_proc_invocation == 0) update_last_proc_invocation (ds); @@ -127,10 +125,10 @@ time_of_last_procedure (struct dataset *ds) start the next case from step 1. 2. Write case to replacement active file. - + 3. Execute temporary transformations. If these drop the case, start the next case from step 1. - + 4. Pass case to PROC_FUNC, passing AUX as auxiliary data. Returns true if successful, false if an I/O error occurred. */ @@ -162,7 +160,7 @@ procedure (struct dataset *ds, case_func *cf, void *aux) struct multipass_aux_data { struct casefile *casefile; - + bool (*proc_func) (const struct casefile *, void *aux); void *aux; }; @@ -233,8 +231,6 @@ internal_procedure (struct dataset *ds, case_func *proc, if ( proc_close (ds) && ok ) { - if ( ds->replace_source ) - ds->replace_source (ds->proc_source); return true; } @@ -295,10 +291,15 @@ proc_read (struct dataset *ds, struct ccase **c) &ds->trns_case, &case_nr); if (retval != TRNS_CONTINUE) continue; - - /* Write case to LAG queue. */ - if (ds->n_lag) - lag_case (ds, &ds->trns_case); + + /* Write case to collection of lagged cases. */ + if (ds->n_lag > 0) + { + while (casedeque_count (&ds->lagged_cases) >= ds->n_lag) + case_destroy (casedeque_pop_back (&ds->lagged_cases)); + case_clone (casedeque_push_front (&ds->lagged_cases), + &ds->trns_case); + } /* Write case to replacement active file. */ ds->cases_written++; @@ -313,7 +314,7 @@ proc_read (struct dataset *ds, struct ccase **c) else ds->proc_sink->class->write (ds->proc_sink, &ds->trns_case); } - + /* Execute temporary transformations. */ if (ds->temporary_trns_chain != NULL) { @@ -347,7 +348,7 @@ proc_close (struct dataset *ds) break; } ds->ok = free_case_source (ds->proc_source) && ds->ok; - ds->proc_source = NULL; + proc_set_source (ds, NULL); case_destroy (&ds->sink_case); case_destroy (&ds->trns_case); @@ -418,29 +419,8 @@ open_active_file (struct dataset *ds) if (ds->proc_sink->class->open != NULL) ds->proc_sink->class->open (ds->proc_sink); - /* Allocate memory for lag queue. */ - if (ds->n_lag > 0) - { - int i; - - ds->lag_count = 0; - ds->lag_head = 0; - ds->lag_queue = xnmalloc (ds->n_lag, sizeof *ds->lag_queue); - for (i = 0; i < ds->n_lag; i++) - case_nullify (&ds->lag_queue[i]); - } -} - -/* Add C to the lag queue. */ -static void -lag_case (struct dataset *ds, const struct ccase *c) -{ - if (ds->lag_count < ds->n_lag) - ds->lag_count++; - case_destroy (&ds->lag_queue[ds->lag_head]); - case_clone (&ds->lag_queue[ds->lag_head], c); - if (++ds->lag_head >= ds->n_lag) - ds->lag_head = 0; + /* Allocate memory for lagged cases. */ + casedeque_init (&ds->lagged_cases, ds->n_lag); } /* Clears the variables in C that need to be cleared between @@ -450,7 +430,7 @@ clear_case (const struct dataset *ds, struct ccase *c) { size_t var_cnt = dict_get_var_cnt (ds->dict); size_t i; - + for (i = 0; i < var_cnt; i++) { struct variable *v = dict_get_var (ds->dict, i); @@ -468,17 +448,11 @@ clear_case (const struct dataset *ds, struct ccase *c) static bool close_active_file (struct dataset *ds) { - /* Free memory for lag queue, and turn off lagging. */ - if (ds->n_lag > 0) - { - int i; - - for (i = 0; i < ds->n_lag; i++) - case_destroy (&ds->lag_queue[i]); - free (ds->lag_queue); - ds->n_lag = 0; - } - + /* Free memory for lagged cases. */ + while (!casedeque_is_empty (&ds->lagged_cases)) + case_destroy (casedeque_pop_back (&ds->lagged_cases)); + casedeque_destroy (&ds->lagged_cases); + /* Dictionary from before TEMPORARY becomes permanent. */ proc_cancel_temporary_transformations (ds); @@ -489,10 +463,10 @@ close_active_file (struct dataset *ds) dict_compact_values (ds->dict); ds->compactor = NULL; } - + /* Old data sink becomes new data source. */ if (ds->proc_sink->class->make_source != NULL) - ds->proc_source = ds->proc_sink->class->make_source (ds->proc_sink); + proc_set_source (ds, ds->proc_sink->class->make_source (ds->proc_sink) ); free_case_sink (ds->proc_sink); ds->proc_sink = NULL; @@ -506,16 +480,11 @@ close_active_file (struct dataset *ds) struct ccase * lagged_case (const struct dataset *ds, int n_before) { - assert (n_before >= 1 ); + assert (n_before >= 1); assert (n_before <= ds->n_lag); - if (n_before <= ds->lag_count) - { - int index = ds->lag_head - n_before; - if (index < 0) - index += ds->n_lag; - return &ds->lag_queue[index]; - } + if (n_before <= casedeque_count (&ds->lagged_cases)) + return casedeque_front (&ds->lagged_cases, n_before - 1); else return NULL; } @@ -554,7 +523,7 @@ static bool split_procedure_end_func (void *, const struct dataset *); If SPLIT FILE is not in effect, then there is one break group (if the active file is nonempty), and BEGIN_FUNC and END_FUNC will be called once. - + Returns true if successful, false if an I/O error occurred. */ bool procedure_with_splits (struct dataset *ds, @@ -707,7 +676,7 @@ static bool multipass_split_output (struct multipass_split_aux_data *aux, const struct dataset *ds) { bool ok; - + assert (aux->casefile != NULL); ok = aux->split (&aux->prev_case, aux->casefile, aux->func_aux, ds); casefile_destroy (aux->casefile); @@ -725,12 +694,9 @@ discard_variables (struct dataset *ds) fh_set_default_handle (NULL); ds->n_lag = 0; - - free_case_source (ds->proc_source); - ds->proc_source = NULL; - if ( ds->replace_source ) - ds->replace_source (ds->proc_source); + free_case_source (ds->proc_source); + proc_set_source (ds, NULL); proc_cancel_all_transformations (ds); } @@ -742,7 +708,7 @@ struct trns_chain * proc_capture_transformations (struct dataset *ds) { struct trns_chain *chain; - + assert (ds->temporary_trns_chain == NULL); chain = ds->permanent_trns_chain; ds->cur_trns_chain = ds->permanent_trns_chain = trns_chain_create (); @@ -801,6 +767,7 @@ proc_start_temporary_transformations (struct dataset *ds) add_case_limit_trns (ds); ds->permanent_dict = dict_clone (ds->dict); + trns_chain_finalize (ds->permanent_trns_chain); ds->temporary_trns_chain = ds->cur_trns_chain = trns_chain_create (); } @@ -836,8 +803,7 @@ proc_cancel_temporary_transformations (struct dataset *ds) { if (proc_in_temporary_transformations (ds)) { - dict_destroy (ds->dict); - ds->dict = ds->permanent_dict; + dataset_set_dict (ds, ds->permanent_dict); ds->permanent_dict = NULL; trns_chain_destroy (ds->temporary_trns_chain); @@ -902,8 +868,10 @@ proc_set_sink (struct dataset *ds, struct case_sink *sink) void proc_set_source (struct dataset *ds, struct case_source *source) { - assert (ds->proc_source == NULL); ds->proc_source = source; + + if ( ds->replace_source ) + ds->replace_source (ds->proc_source); } /* Returns true if a source for the next procedure has been @@ -931,7 +899,7 @@ proc_capture_output (struct dataset *ds) assert (!proc_in_temporary_transformations (ds)); casefile = storage_source_decapsulate (ds->proc_source); - ds->proc_source = NULL; + proc_set_source (ds, NULL); return casefile; } @@ -1015,29 +983,28 @@ dataset_dict (const struct dataset *ds) } +/* Set or replace dataset DS's dictionary with DICT. + The old dictionary is destroyed */ void dataset_set_dict (struct dataset *ds, struct dictionary *dict) { + struct dictionary *old_dict = ds->dict; + dict_copy_callbacks (dict, ds->dict); ds->dict = dict; if ( ds->replace_dict ) ds->replace_dict (dict); -} -int -dataset_n_lag (const struct dataset *ds) -{ - return ds->n_lag; + dict_destroy (old_dict); } -void -dataset_set_n_lag (struct dataset *ds, int n_lag) +void +dataset_need_lag (struct dataset *ds, int n_before) { - ds->n_lag = n_lag; + ds->n_lag = MAX (ds->n_lag, n_before); } - struct casefile_factory * dataset_get_casefile_factory (const struct dataset *ds) {