X-Git-Url: https://pintos-os.org/cgi-bin/gitweb.cgi?a=blobdiff_plain;f=src%2Fdata%2Fprocedure.c;h=ed69420c3e464bd71221bd3bff5ee0ad3e727037;hb=8b708f4291deb6ee408e8f62a694b638c581c40e;hp=5341de4652a068e5794319c98b4720b0eccd6763;hpb=42489b63e0b4bec2e20c2f55c9791234f7b41764;p=pspp-builds.git diff --git a/src/data/procedure.c b/src/data/procedure.c index 5341de46..ed69420c 100644 --- a/src/data/procedure.c +++ b/src/data/procedure.c @@ -1,6 +1,5 @@ /* PSPP - computes sample statistics. - Copyright (C) 1997-9, 2000, 2006 Free Software Foundation, Inc. - Written by Ben Pfaff . + Copyright (C) 1997-9, 2000, 2006, 2007 Free Software Foundation, Inc. This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as @@ -27,6 +26,7 @@ #include #include #include +#include #include #include #include @@ -39,21 +39,18 @@ #include #include -/* Procedure execution data. */ -struct write_case_data - { - /* Function to call for each case. */ - case_func_t case_func; - void *aux; +struct dataset { - struct dataset *dataset; /* The dataset concerned */ - struct ccase trns_case; /* Case used for transformations. */ - struct ccase sink_case; /* Case written to sink, if - compacting is necessary. */ - size_t cases_written; /* Cases output so far. */ - }; + /* An abstract factory which creates casefiles */ + struct casefile_factory *cf_factory; + + /* Callback which occurs when a procedure provides a new source for + the dataset */ + replace_source_callback *replace_source ; + + /* Callback which occurs whenever the DICT is replaced by a new one */ + replace_dictionary_callback *replace_dict; -struct dataset { /* Cases are read from proc_source, pass through permanent_trns_chain (which transforms them into the format described by permanent_dict), @@ -79,28 +76,29 @@ struct dataset { /* Time at which proc was last invoked. */ time_t last_proc_invocation; - /* Lag queue. */ + /* Cases just before ("lagging") the current one. */ int n_lag; /* Number of cases to lag. */ - int lag_count; /* Number of cases in lag_queue so far. */ - int lag_head; /* Index where next case will be added. */ - struct ccase *lag_queue; /* Array of n_lag ccase * elements. */ - + struct casedeque lagged_cases; /* Lagged cases. */ + + /* Procedure data. */ + bool is_open; /* Procedure open? */ + struct ccase trns_case; /* Case used for transformations. */ + struct ccase sink_case; /* Case written to sink, if + compacting is necessary. */ + size_t cases_written; /* Cases output so far. */ + bool ok; }; /* struct dataset */ -struct dataset *current_dataset; - static void add_case_limit_trns (struct dataset *ds); static void add_filter_trns (struct dataset *ds); -static bool internal_procedure (struct dataset *ds, case_func_t, - bool (*end_func) (void *), +static bool internal_procedure (struct dataset *ds, case_func *, + end_func *, void *aux); static void update_last_proc_invocation (struct dataset *ds); static void create_trns_case (struct ccase *, struct dictionary *); static void open_active_file (struct dataset *ds); -static bool write_case (struct write_case_data *wc_data); -static void lag_case (struct dataset *ds, const struct ccase *c); static void clear_case (const struct dataset *ds, struct ccase *c); static bool close_active_file (struct dataset *ds); @@ -108,7 +106,7 @@ static bool close_active_file (struct dataset *ds); /* Returns the last time the data was read. */ time_t -time_of_last_procedure (struct dataset *ds) +time_of_last_procedure (struct dataset *ds) { if (ds->last_proc_invocation == 0) update_last_proc_invocation (ds); @@ -127,32 +125,49 @@ time_of_last_procedure (struct dataset *ds) start the next case from step 1. 2. Write case to replacement active file. - + 3. Execute temporary transformations. If these drop the case, start the next case from step 1. - + 4. Pass case to PROC_FUNC, passing AUX as auxiliary data. Returns true if successful, false if an I/O error occurred. */ bool -procedure (struct dataset *ds, case_func_t cf, void *aux) +procedure (struct dataset *ds, case_func *cf, void *aux) { + update_last_proc_invocation (ds); + + /* Optimize the trivial case where we're not going to do + anything with the data, by not reading the data at all. */ + if (cf == NULL + && case_source_is_class (ds->proc_source, &storage_source_class) + && ds->proc_sink == NULL + && (ds->temporary_trns_chain == NULL + || trns_chain_is_empty (ds->temporary_trns_chain)) + && trns_chain_is_empty (ds->permanent_trns_chain)) + { + ds->n_lag = 0; + dict_set_case_limit (ds->dict, 0); + dict_clear_vectors (ds->dict); + return true; + } + return internal_procedure (ds, cf, NULL, aux); } /* Multipass procedure. */ -struct multipass_aux_data +struct multipass_aux_data { struct casefile *casefile; - + bool (*proc_func) (const struct casefile *, void *aux); void *aux; }; /* Case processing function for multipass_procedure(). */ static bool -multipass_case_func (const struct ccase *c, void *aux_data_) +multipass_case_func (const struct ccase *c, void *aux_data_, const struct dataset *ds UNUSED) { struct multipass_aux_data *aux_data = aux_data_; return casefile_append (aux_data->casefile, c); @@ -160,7 +175,7 @@ multipass_case_func (const struct ccase *c, void *aux_data_) /* End-of-file function for multipass_procedure(). */ static bool -multipass_end_func (void *aux_data_) +multipass_end_func (void *aux_data_, const struct dataset *ds UNUSED) { struct multipass_aux_data *aux_data = aux_data_; return (aux_data->proc_func == NULL @@ -171,12 +186,15 @@ multipass_end_func (void *aux_data_) The entire active file is passed to PROC_FUNC, with the given AUX as auxiliary data, as a unit. */ bool -multipass_procedure (struct dataset *ds, casefile_func_t proc_func, void *aux) +multipass_procedure (struct dataset *ds, casefile_func *proc_func, void *aux) { struct multipass_aux_data aux_data; bool ok; - aux_data.casefile = fastfile_create (dict_get_next_value_idx (ds->dict)); + aux_data.casefile = + ds->cf_factory->create_casefile (ds->cf_factory, + dict_get_next_value_idx (ds->dict)); + aux_data.proc_func = proc_func; aux_data.aux = aux; @@ -188,8 +206,8 @@ multipass_procedure (struct dataset *ds, casefile_func_t proc_func, void *aux) return ok; } -/* Procedure implementation. */ +/* Procedure implementation. */ /* Executes a procedure. Passes each case to CASE_FUNC. @@ -197,59 +215,153 @@ multipass_procedure (struct dataset *ds, casefile_func_t proc_func, void *aux) Returns true if successful, false if an I/O error occurred (or if CASE_FUNC or END_FUNC ever returned false). */ static bool -internal_procedure (struct dataset *ds, case_func_t case_func, - bool (*end_func) (void *), - void *aux) +internal_procedure (struct dataset *ds, case_func *proc, + end_func *end, + void *aux) { - struct write_case_data wc_data; + struct ccase *c; bool ok = true; + proc_open (ds); + while (ok && proc_read (ds, &c)) + if (proc != NULL) + ok = proc (c, aux, ds) && ok; + if (end != NULL) + ok = end (aux, ds) && ok; + + if ( proc_close (ds) && ok ) + { + + return true; + } + + return false; +} + +/* Opens dataset DS for reading cases with proc_read. + proc_close must be called when done. */ +void +proc_open (struct dataset *ds) +{ assert (ds->proc_source != NULL); + assert (!ds->is_open); update_last_proc_invocation (ds); - /* Optimize the trivial case where we're not going to do - anything with the data, by not reading the data at all. */ - if (case_func == NULL && end_func == NULL - && case_source_is_class (ds->proc_source, &storage_source_class) - && ds->proc_sink == NULL - && (ds->temporary_trns_chain == NULL - || trns_chain_is_empty (ds->temporary_trns_chain)) - && trns_chain_is_empty (ds->permanent_trns_chain)) + open_active_file (ds); + + ds->is_open = true; + create_trns_case (&ds->trns_case, ds->dict); + case_create (&ds->sink_case, dict_get_compacted_value_cnt (ds->dict)); + ds->cases_written = 0; + ds->ok = true; +} + +/* Reads the next case from dataset DS, which must have been + opened for reading with proc_open. + Returns true if successful, in which case a pointer to the + case is stored in *C. + Return false at end of file or if a read error occurs. In + this case a null pointer is stored in *C. */ +bool +proc_read (struct dataset *ds, struct ccase **c) +{ + enum trns_result retval = TRNS_DROP_CASE; + + assert (ds->is_open); + *c = NULL; + for (;;) { - ds->n_lag = 0; - dict_set_case_limit (ds->dict, 0); - dict_clear_vectors (ds->dict); + size_t case_nr; + + assert (retval == TRNS_DROP_CASE || retval == TRNS_ERROR); + if (retval == TRNS_ERROR) + ds->ok = false; + if (!ds->ok) + return false; + + /* Read a case from proc_source. */ + clear_case (ds, &ds->trns_case); + if (!ds->proc_source->class->read (ds->proc_source, &ds->trns_case)) + return false; + + /* Execute permanent transformations. */ + case_nr = ds->cases_written + 1; + retval = trns_chain_execute (ds->permanent_trns_chain, TRNS_CONTINUE, + &ds->trns_case, &case_nr); + if (retval != TRNS_CONTINUE) + continue; + + /* Write case to collection of lagged cases. */ + if (ds->n_lag > 0) + { + while (casedeque_count (&ds->lagged_cases) >= ds->n_lag) + case_destroy (casedeque_pop_back (&ds->lagged_cases)); + case_clone (casedeque_push_front (&ds->lagged_cases), + &ds->trns_case); + } + + /* Write case to replacement active file. */ + ds->cases_written++; + if (ds->proc_sink->class->write != NULL) + { + if (ds->compactor != NULL) + { + dict_compactor_compact (ds->compactor, &ds->sink_case, + &ds->trns_case); + ds->proc_sink->class->write (ds->proc_sink, &ds->sink_case); + } + else + ds->proc_sink->class->write (ds->proc_sink, &ds->trns_case); + } + + /* Execute temporary transformations. */ + if (ds->temporary_trns_chain != NULL) + { + retval = trns_chain_execute (ds->temporary_trns_chain, TRNS_CONTINUE, + &ds->trns_case, &ds->cases_written); + if (retval != TRNS_CONTINUE) + continue; + } + + *c = &ds->trns_case; return true; } - - open_active_file (ds); - - wc_data.case_func = case_func; - wc_data.aux = aux; - wc_data.dataset = ds; - create_trns_case (&wc_data.trns_case, ds->dict); - case_create (&wc_data.sink_case, - dict_get_compacted_value_cnt (ds->dict)); - wc_data.cases_written = 0; - - ok = ds->proc_source->class->read (ds->proc_source, - &wc_data.trns_case, - write_case, &wc_data) && ok; - if (end_func != NULL) - ok = end_func (aux) && ok; - - case_destroy (&wc_data.sink_case); - case_destroy (&wc_data.trns_case); - - ok = close_active_file (ds) && ok; +} - return ok; +/* Closes dataset DS for reading. + Returns true if successful, false if an I/O error occurred + while reading or closing the data set. + If DS has not been opened, returns true without doing + anything else. */ +bool +proc_close (struct dataset *ds) +{ + if (!ds->is_open) + return true; + + /* Drain any remaining cases. */ + while (ds->ok) + { + struct ccase *c; + if (!proc_read (ds, &c)) + break; + } + ds->ok = free_case_source (ds->proc_source) && ds->ok; + proc_set_source (ds, NULL); + + case_destroy (&ds->sink_case); + case_destroy (&ds->trns_case); + + ds->ok = close_active_file (ds) && ds->ok; + ds->is_open = false; + + return ds->ok; } /* Updates last_proc_invocation. */ static void -update_last_proc_invocation (struct dataset *ds) +update_last_proc_invocation (struct dataset *ds) { ds->last_proc_invocation = time (NULL); } @@ -264,15 +376,15 @@ create_trns_case (struct ccase *trns_case, struct dictionary *dict) size_t i; case_create (trns_case, dict_get_next_value_idx (dict)); - for (i = 0; i < var_cnt; i++) + for (i = 0; i < var_cnt; i++) { struct variable *v = dict_get_var (dict, i); - union value *value = case_data_rw (trns_case, v->fv); + union value *value = case_data_rw (trns_case, v); - if (v->type == NUMERIC) - value->f = v->leave ? 0.0 : SYSMIS; + if (var_is_numeric (v)) + value->f = var_get_leave (v) ? 0.0 : SYSMIS; else - memset (value->s, ' ', v->width); + memset (value->s, ' ', var_get_width (v)); } } @@ -293,97 +405,22 @@ open_active_file (struct dataset *ds) ds->permanent_dict = ds->dict; /* Figure out whether to compact. */ - ds->compactor = + ds->compactor = (dict_compacting_would_shrink (ds->permanent_dict) ? dict_make_compactor (ds->permanent_dict) : NULL); /* Prepare sink. */ if (ds->proc_sink == NULL) - ds->proc_sink = create_case_sink (&storage_sink_class, ds->permanent_dict, NULL); + ds->proc_sink = create_case_sink (&storage_sink_class, + ds->permanent_dict, + ds->cf_factory, + NULL); if (ds->proc_sink->class->open != NULL) ds->proc_sink->class->open (ds->proc_sink); - /* Allocate memory for lag queue. */ - if (ds->n_lag > 0) - { - int i; - - ds->lag_count = 0; - ds->lag_head = 0; - ds->lag_queue = xnmalloc (ds->n_lag, sizeof *ds->lag_queue); - for (i = 0; i < ds->n_lag; i++) - case_nullify (&ds->lag_queue[i]); - } -} - -/* Transforms trns_case and writes it to the replacement active - file if advisable. Returns true if more cases can be - accepted, false otherwise. Do not call this function again - after it has returned false once. */ -static bool -write_case (struct write_case_data *wc_data) -{ - enum trns_result retval; - size_t case_nr; - - struct dataset *ds = wc_data->dataset; - - /* Execute permanent transformations. */ - case_nr = wc_data->cases_written + 1; - retval = trns_chain_execute (ds->permanent_trns_chain, - &wc_data->trns_case, &case_nr); - if (retval != TRNS_CONTINUE) - goto done; - - /* Write case to LAG queue. */ - if (ds->n_lag) - lag_case (ds, &wc_data->trns_case); - - /* Write case to replacement active file. */ - wc_data->cases_written++; - if (ds->proc_sink->class->write != NULL) - { - if (ds->compactor != NULL) - { - dict_compactor_compact (ds->compactor, &wc_data->sink_case, - &wc_data->trns_case); - ds->proc_sink->class->write (ds->proc_sink, &wc_data->sink_case); - } - else - ds->proc_sink->class->write (ds->proc_sink, &wc_data->trns_case); - } - - /* Execute temporary transformations. */ - if (ds->temporary_trns_chain != NULL) - { - retval = trns_chain_execute (ds->temporary_trns_chain, - &wc_data->trns_case, - &wc_data->cases_written); - if (retval != TRNS_CONTINUE) - goto done; - } - - /* Pass case to procedure. */ - if (wc_data->case_func != NULL) - if (!wc_data->case_func (&wc_data->trns_case, wc_data->aux)) - retval = TRNS_ERROR; - - done: - clear_case (ds, &wc_data->trns_case); - return retval != TRNS_ERROR; -} - -/* Add C to the lag queue. */ -static void -lag_case (struct dataset *ds, const struct ccase *c) -{ - if (ds->lag_count < ds->n_lag) - ds->lag_count++; - case_destroy (&ds->lag_queue[ds->lag_head]); - case_clone (&ds->lag_queue[ds->lag_head], c); - if (++ds->lag_head >= ds->n_lag) - ds->lag_head = 0; + /* Allocate memory for lagged cases. */ + casedeque_init (&ds->lagged_cases, ds->n_lag); } /* Clears the variables in C that need to be cleared between @@ -393,17 +430,17 @@ clear_case (const struct dataset *ds, struct ccase *c) { size_t var_cnt = dict_get_var_cnt (ds->dict); size_t i; - - for (i = 0; i < var_cnt; i++) + + for (i = 0; i < var_cnt; i++) { struct variable *v = dict_get_var (ds->dict, i); - if (!v->leave) + if (!var_get_leave (v)) { - if (v->type == NUMERIC) - case_data_rw (c, v->fv)->f = SYSMIS; + if (var_is_numeric (v)) + case_data_rw (c, v)->f = SYSMIS; else - memset (case_data_rw (c, v->fv)->s, ' ', v->width); - } + memset (case_data_rw (c, v)->s, ' ', var_get_width (v)); + } } } @@ -411,35 +448,25 @@ clear_case (const struct dataset *ds, struct ccase *c) static bool close_active_file (struct dataset *ds) { - /* Free memory for lag queue, and turn off lagging. */ - if (ds->n_lag > 0) - { - int i; - - for (i = 0; i < ds->n_lag; i++) - case_destroy (&ds->lag_queue[i]); - free (ds->lag_queue); - ds->n_lag = 0; - } - + /* Free memory for lagged cases. */ + while (!casedeque_is_empty (&ds->lagged_cases)) + case_destroy (casedeque_pop_back (&ds->lagged_cases)); + casedeque_destroy (&ds->lagged_cases); + /* Dictionary from before TEMPORARY becomes permanent. */ proc_cancel_temporary_transformations (ds); /* Finish compacting. */ - if (ds->compactor != NULL) + if (ds->compactor != NULL) { dict_compactor_destroy (ds->compactor); dict_compact_values (ds->dict); ds->compactor = NULL; } - - /* Free data source. */ - free_case_source (ds->proc_source); - ds->proc_source = NULL; /* Old data sink becomes new data source. */ if (ds->proc_sink->class->make_source != NULL) - ds->proc_source = ds->proc_sink->class->make_source (ds->proc_sink); + proc_set_source (ds, ds->proc_sink->class->make_source (ds->proc_sink) ); free_case_sink (ds->proc_sink); ds->proc_sink = NULL; @@ -453,16 +480,11 @@ close_active_file (struct dataset *ds) struct ccase * lagged_case (const struct dataset *ds, int n_before) { - assert (n_before >= 1 ); + assert (n_before >= 1); assert (n_before <= ds->n_lag); - if (n_before <= ds->lag_count) - { - int index = ds->lag_head - n_before; - if (index < 0) - index += ds->n_lag; - return &ds->lag_queue[index]; - } + if (n_before <= casedeque_count (&ds->lagged_cases)) + return casedeque_front (&ds->lagged_cases, n_before - 1); else return NULL; } @@ -470,21 +492,21 @@ lagged_case (const struct dataset *ds, int n_before) /* Procedure that separates the data into SPLIT FILE groups. */ /* Represents auxiliary data for handling SPLIT FILE. */ -struct split_aux_data +struct split_aux_data { struct dataset *dataset; /* The dataset */ struct ccase prev_case; /* Data in previous case. */ /* Callback functions. */ - begin_func_t begin_func ; - case_func_t proc_func ; - void (*end_func) (void *); + begin_func *begin; + case_func *proc; + end_func *end; void *func_aux; }; static int equal_splits (const struct ccase *, const struct ccase *, const struct dataset *ds); -static bool split_procedure_case_func (const struct ccase *c, void *); -static bool split_procedure_end_func (void *); +static bool split_procedure_case_func (const struct ccase *c, void *, const struct dataset *); +static bool split_procedure_end_func (void *, const struct dataset *); /* Like procedure(), but it automatically breaks the case stream into SPLIT FILE break groups. Before each group of cases with @@ -496,27 +518,27 @@ static bool split_procedure_end_func (void *); passed to each of the functions as auxiliary data. If the active file is empty, none of BEGIN_FUNC, PROC_FUNC, - and END_FUNC will be called at all. + and END_FUNC will be called at all. If SPLIT FILE is not in effect, then there is one break group (if the active file is nonempty), and BEGIN_FUNC and END_FUNC will be called once. - + Returns true if successful, false if an I/O error occurred. */ bool procedure_with_splits (struct dataset *ds, - begin_func_t begin_func, - case_func_t proc_func, - void (*end_func) (void *aux), - void *func_aux) + begin_func begin, + case_func *proc, + end_func *end, + void *func_aux) { struct split_aux_data split_aux; bool ok; case_nullify (&split_aux.prev_case); - split_aux.begin_func = begin_func; - split_aux.proc_func = proc_func; - split_aux.end_func = end_func; + split_aux.begin = begin; + split_aux.proc = proc; + split_aux.end = end; split_aux.func_aux = func_aux; split_aux.dataset = ds; @@ -530,7 +552,7 @@ procedure_with_splits (struct dataset *ds, /* Case callback used by procedure_with_splits(). */ static bool -split_procedure_case_func (const struct ccase *c, void *split_aux_) +split_procedure_case_func (const struct ccase *c, void *split_aux_, const struct dataset *ds) { struct split_aux_data *split_aux = split_aux_; @@ -538,37 +560,36 @@ split_procedure_case_func (const struct ccase *c, void *split_aux_) if (case_is_null (&split_aux->prev_case) || !equal_splits (c, &split_aux->prev_case, split_aux->dataset)) { - if (!case_is_null (&split_aux->prev_case) && split_aux->end_func != NULL) - split_aux->end_func (split_aux->func_aux); + if (!case_is_null (&split_aux->prev_case) && split_aux->end != NULL) + split_aux->end (split_aux->func_aux, ds); case_destroy (&split_aux->prev_case); case_clone (&split_aux->prev_case, c); - if (split_aux->begin_func != NULL) - split_aux->begin_func (&split_aux->prev_case, split_aux->func_aux); - + if (split_aux->begin != NULL) + split_aux->begin (&split_aux->prev_case, split_aux->func_aux, ds); } - return (split_aux->proc_func == NULL - || split_aux->proc_func (c, split_aux->func_aux)); + return (split_aux->proc == NULL + || split_aux->proc (c, split_aux->func_aux, ds)); } /* End-of-file callback used by procedure_with_splits(). */ static bool -split_procedure_end_func (void *split_aux_) +split_procedure_end_func (void *split_aux_, const struct dataset *ds) { struct split_aux_data *split_aux = split_aux_; - if (!case_is_null (&split_aux->prev_case) && split_aux->end_func != NULL) - split_aux->end_func (split_aux->func_aux); + if (!case_is_null (&split_aux->prev_case) && split_aux->end != NULL) + split_aux->end (split_aux->func_aux, ds); return true; } /* Compares the SPLIT FILE variables in cases A and B and returns nonzero only if they differ. */ static int -equal_splits (const struct ccase *a, const struct ccase *b, - const struct dataset *ds) +equal_splits (const struct ccase *a, const struct ccase *b, + const struct dataset *ds) { return case_compare (a, b, dict_get_split_vars (ds->dict), @@ -580,28 +601,24 @@ equal_splits (const struct ccase *a, const struct ccase *b, /* Represents auxiliary data for handling SPLIT FILE in a multipass procedure. */ -struct multipass_split_aux_data +struct multipass_split_aux_data { struct dataset *dataset; /* The dataset of the split */ struct ccase prev_case; /* Data in previous case. */ struct casefile *casefile; /* Accumulates data for a split. */ - - /* Function to call with the accumulated data. */ - bool (*split_func) (const struct ccase *first, const struct casefile *, - void *); - void *func_aux; /* Auxiliary data. */ + split_func *split; /* Function to call with the accumulated + data. */ + void *func_aux; /* Auxiliary data. */ }; -static bool multipass_split_case_func (const struct ccase *c, void *aux_); -static bool multipass_split_end_func (void *aux_); -static bool multipass_split_output (struct multipass_split_aux_data *); +static bool multipass_split_case_func (const struct ccase *c, void *aux_, const struct dataset *); +static bool multipass_split_end_func (void *aux_, const struct dataset *ds); +static bool multipass_split_output (struct multipass_split_aux_data *, const struct dataset *ds); /* Returns true if successful, false if an I/O error occurred. */ bool -multipass_procedure_with_splits (struct dataset *ds, - bool (*split_func) (const struct ccase *first, - const struct casefile *, - void *aux), +multipass_procedure_with_splits (struct dataset *ds, + split_func *split, void *func_aux) { struct multipass_split_aux_data aux; @@ -609,7 +626,7 @@ multipass_procedure_with_splits (struct dataset *ds, case_nullify (&aux.prev_case); aux.casefile = NULL; - aux.split_func = split_func; + aux.split = split; aux.func_aux = func_aux; aux.dataset = ds; @@ -622,10 +639,9 @@ multipass_procedure_with_splits (struct dataset *ds, /* Case callback used by multipass_procedure_with_splits(). */ static bool -multipass_split_case_func (const struct ccase *c, void *aux_) +multipass_split_case_func (const struct ccase *c, void *aux_, const struct dataset *ds) { struct multipass_split_aux_data *aux = aux_; - struct dataset *ds = aux->dataset; bool ok = true; /* Start a new series if needed. */ @@ -637,11 +653,12 @@ multipass_split_case_func (const struct ccase *c, void *aux_) /* Pass any cases to split_func. */ if (aux->casefile != NULL) - ok = multipass_split_output (aux); + ok = multipass_split_output (aux, ds); /* Start a new casefile. */ - aux->casefile = - fastfile_create (dict_get_next_value_idx (ds->dict)); + aux->casefile = + ds->cf_factory->create_casefile (ds->cf_factory, + dict_get_next_value_idx (ds->dict)); } return casefile_append (aux->casefile, c) && ok; @@ -649,19 +666,19 @@ multipass_split_case_func (const struct ccase *c, void *aux_) /* End-of-file callback used by multipass_procedure_with_splits(). */ static bool -multipass_split_end_func (void *aux_) +multipass_split_end_func (void *aux_, const struct dataset *ds) { struct multipass_split_aux_data *aux = aux_; - return (aux->casefile == NULL || multipass_split_output (aux)); + return (aux->casefile == NULL || multipass_split_output (aux, ds)); } static bool -multipass_split_output (struct multipass_split_aux_data *aux) +multipass_split_output (struct multipass_split_aux_data *aux, const struct dataset *ds) { bool ok; - + assert (aux->casefile != NULL); - ok = aux->split_func (&aux->prev_case, aux->casefile, aux->func_aux); + ok = aux->split (&aux->prev_case, aux->casefile, aux->func_aux, ds); casefile_destroy (aux->casefile); aux->casefile = NULL; @@ -677,9 +694,9 @@ discard_variables (struct dataset *ds) fh_set_default_handle (NULL); ds->n_lag = 0; - + free_case_source (ds->proc_source); - ds->proc_source = NULL; + proc_set_source (ds, NULL); proc_cancel_all_transformations (ds); } @@ -688,10 +705,10 @@ discard_variables (struct dataset *ds) and clears the permanent transformations. For use by INPUT PROGRAM. */ struct trns_chain * -proc_capture_transformations (struct dataset *ds) +proc_capture_transformations (struct dataset *ds) { struct trns_chain *chain; - + assert (ds->temporary_trns_chain == NULL); chain = ds->permanent_trns_chain; ds->cur_trns_chain = ds->permanent_trns_chain = trns_chain_create (); @@ -713,7 +730,7 @@ add_transformation (struct dataset *ds, trns_proc_func *proc, trns_free_func *fr FINALIZE will be called. The functions are passed AUX as auxiliary data. */ void -add_transformation_with_finalizer (struct dataset *ds, +add_transformation_with_finalizer (struct dataset *ds, trns_finalize_func *finalize, trns_proc_func *proc, trns_free_func *free, void *aux) @@ -725,7 +742,7 @@ add_transformation_with_finalizer (struct dataset *ds, This value can be returned by a transformation procedure function to indicate a "jump" to that transformation. */ size_t -next_transformation (const struct dataset *ds) +next_transformation (const struct dataset *ds) { return trns_chain_next (ds->cur_trns_chain); } @@ -734,7 +751,7 @@ next_transformation (const struct dataset *ds) a temporary transformation, false if it will add a permanent transformation. */ bool -proc_in_temporary_transformations (const struct dataset *ds) +proc_in_temporary_transformations (const struct dataset *ds) { return ds->temporary_trns_chain != NULL; } @@ -743,13 +760,14 @@ proc_in_temporary_transformations (const struct dataset *ds) Further calls to add_transformation() will add temporary transformations. */ void -proc_start_temporary_transformations (struct dataset *ds) +proc_start_temporary_transformations (struct dataset *ds) { if (!proc_in_temporary_transformations (ds)) { add_case_limit_trns (ds); ds->permanent_dict = dict_clone (ds->dict); + trns_chain_finalize (ds->permanent_trns_chain); ds->temporary_trns_chain = ds->cur_trns_chain = trns_chain_create (); } @@ -760,9 +778,9 @@ proc_start_temporary_transformations (struct dataset *ds) permanent. Returns true if anything changed, false otherwise. */ bool -proc_make_temporary_transformations_permanent (struct dataset *ds) +proc_make_temporary_transformations_permanent (struct dataset *ds) { - if (proc_in_temporary_transformations (ds)) + if (proc_in_temporary_transformations (ds)) { trns_chain_finalize (ds->temporary_trns_chain); trns_chain_splice (ds->permanent_trns_chain, ds->temporary_trns_chain); @@ -781,12 +799,11 @@ proc_make_temporary_transformations_permanent (struct dataset *ds) transformations will be permanent. Returns true if anything changed, false otherwise. */ bool -proc_cancel_temporary_transformations (struct dataset *ds) +proc_cancel_temporary_transformations (struct dataset *ds) { - if (proc_in_temporary_transformations (ds)) + if (proc_in_temporary_transformations (ds)) { - dict_destroy (ds->dict); - ds->dict = ds->permanent_dict; + dataset_set_dict (ds, ds->permanent_dict); ds->permanent_dict = NULL; trns_chain_destroy (ds->temporary_trns_chain); @@ -813,10 +830,16 @@ proc_cancel_all_transformations (struct dataset *ds) /* Initializes procedure handling. */ struct dataset * -create_dataset (void) +create_dataset (struct casefile_factory *fact, + replace_source_callback *rps, + replace_dictionary_callback *rds + ) { struct dataset *ds = xzalloc (sizeof(*ds)); ds->dict = dict_create (); + ds->cf_factory = fact; + ds->replace_source = rps; + ds->replace_dict = rds; proc_cancel_all_transformations (ds); return ds; } @@ -827,13 +850,14 @@ destroy_dataset (struct dataset *ds) { discard_variables (ds); dict_destroy (ds->dict); + trns_chain_destroy (ds->permanent_trns_chain); free (ds); } /* Sets SINK as the destination for procedure output from the next procedure. */ void -proc_set_sink (struct dataset *ds, struct case_sink *sink) +proc_set_sink (struct dataset *ds, struct case_sink *sink) { assert (ds->proc_sink == NULL); ds->proc_sink = sink; @@ -842,16 +866,18 @@ proc_set_sink (struct dataset *ds, struct case_sink *sink) /* Sets SOURCE as the source for procedure input for the next procedure. */ void -proc_set_source (struct dataset *ds, struct case_source *source) +proc_set_source (struct dataset *ds, struct case_source *source) { - assert (ds->proc_source == NULL); ds->proc_source = source; + + if ( ds->replace_source ) + ds->replace_source (ds->proc_source); } /* Returns true if a source for the next procedure has been configured, false otherwise. */ bool -proc_has_source (const struct dataset *ds) +proc_has_source (const struct dataset *ds) { return ds->proc_source != NULL; } @@ -861,7 +887,7 @@ proc_has_source (const struct dataset *ds) The returned casefile is owned by the caller; it will not be automatically used for the next procedure's input. */ struct casefile * -proc_capture_output (struct dataset *ds) +proc_capture_output (struct dataset *ds) { struct casefile *casefile; @@ -873,7 +899,7 @@ proc_capture_output (struct dataset *ds) assert (!proc_in_temporary_transformations (ds)); casefile = storage_source_decapsulate (ds->proc_source); - ds->proc_source = NULL; + proc_set_source (ds, NULL); return casefile; } @@ -884,7 +910,7 @@ static trns_free_func case_limit_trns_free; /* Adds a transformation that limits the number of cases that may pass through, if DS->DICT has a case limit. */ static void -add_case_limit_trns (struct dataset *ds) +add_case_limit_trns (struct dataset *ds) { size_t case_limit = dict_get_case_limit (ds->dict); if (case_limit != 0) @@ -901,10 +927,10 @@ add_case_limit_trns (struct dataset *ds) *CASES_REMAINING. */ static int case_limit_trns_proc (void *cases_remaining_, - struct ccase *c UNUSED, casenum_t case_nr UNUSED) + struct ccase *c UNUSED, casenumber case_nr UNUSED) { size_t *cases_remaining = cases_remaining_; - if (*cases_remaining > 0) + if (*cases_remaining > 0) { (*cases_remaining)--; return TRNS_CONTINUE; @@ -915,7 +941,7 @@ case_limit_trns_proc (void *cases_remaining_, /* Frees the data associated with a case limit transformation. */ static bool -case_limit_trns_free (void *cases_remaining_) +case_limit_trns_free (void *cases_remaining_) { size_t *cases_remaining = cases_remaining_; free (cases_remaining); @@ -927,10 +953,10 @@ static trns_proc_func filter_trns_proc; /* Adds a temporary transformation to filter data according to the variable specified on FILTER, if any. */ static void -add_filter_trns (struct dataset *ds) +add_filter_trns (struct dataset *ds) { struct variable *filter_var = dict_get_filter (ds->dict); - if (filter_var != NULL) + if (filter_var != NULL) { proc_start_temporary_transformations (ds); add_transformation (ds, filter_trns_proc, NULL, filter_var); @@ -940,12 +966,12 @@ add_filter_trns (struct dataset *ds) /* FILTER transformation. */ static int filter_trns_proc (void *filter_var_, - struct ccase *c UNUSED, casenum_t case_nr UNUSED) - + struct ccase *c UNUSED, casenumber case_nr UNUSED) + { struct variable *filter_var = filter_var_; - double f = case_num (c, filter_var->fv); - return (f != 0.0 && !mv_is_num_missing (&filter_var->miss, f) + double f = case_num (c, filter_var); + return (f != 0.0 && !var_is_num_missing (filter_var, f, MV_ANY) ? TRNS_CONTINUE : TRNS_DROP_CASE); } @@ -957,22 +983,31 @@ dataset_dict (const struct dataset *ds) } -void +/* Set or replace dataset DS's dictionary with DICT. + The old dictionary is destroyed */ +void dataset_set_dict (struct dataset *ds, struct dictionary *dict) { + struct dictionary *old_dict = ds->dict; + + dict_copy_callbacks (dict, ds->dict); ds->dict = dict; -} -int -dataset_n_lag (const struct dataset *ds) -{ - return ds->n_lag; + if ( ds->replace_dict ) + ds->replace_dict (dict); + + dict_destroy (old_dict); } void -dataset_set_n_lag (struct dataset *ds, int n_lag) +dataset_need_lag (struct dataset *ds, int n_before) { - ds->n_lag = n_lag; + ds->n_lag = MAX (ds->n_lag, n_before); } +struct casefile_factory * +dataset_get_casefile_factory (const struct dataset *ds) +{ + return ds->cf_factory; +}