X-Git-Url: https://pintos-os.org/cgi-bin/gitweb.cgi?a=blobdiff_plain;f=src%2Fdata%2Fprocedure.c;h=b6f988aabbaf46b2707940aca2e646504ced5ffa;hb=4517b68e7248f22e7b7ed81f0d73179351a53047;hp=16dcd745929f9962d489fd58f66bcd4110fcfc13;hpb=87668a0dae5d2e33c843cfd1b21223bd0a525220;p=pspp-builds.git diff --git a/src/data/procedure.c b/src/data/procedure.c index 16dcd745..b6f988aa 100644 --- a/src/data/procedure.c +++ b/src/data/procedure.c @@ -1,6 +1,5 @@ /* PSPP - computes sample statistics. - Copyright (C) 1997-9, 2000, 2006 Free Software Foundation, Inc. - Written by Ben Pfaff . + Copyright (C) 1997-9, 2000, 2006, 2007 Free Software Foundation, Inc. This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as @@ -28,6 +27,7 @@ #include #include #include +#include #include #include #include @@ -38,78 +38,87 @@ #include #include -/* Procedure execution data. */ -struct write_case_data - { - /* Function to call for each case. */ - bool (*case_func) (const struct ccase *, void *); - void *aux; +struct dataset { + + /* An abstract factory which creates casefiles */ + struct casefile_factory *cf_factory; + + /* Callback which occurs when a procedure provides a new source for + the dataset */ + replace_source_callback *replace_source ; + + /* Callback which occurs whenever the DICT is replaced by a new one */ + replace_dictionary_callback *replace_dict; + + /* Cases are read from proc_source, + pass through permanent_trns_chain (which transforms them into + the format described by permanent_dict), + are written to proc_sink, + pass through temporary_trns_chain (which transforms them into + the format described by dict), + and are finally passed to the procedure. */ + struct case_source *proc_source; + struct trns_chain *permanent_trns_chain; + struct dictionary *permanent_dict; + struct case_sink *proc_sink; + struct trns_chain *temporary_trns_chain; + struct dictionary *dict; + + /* The transformation chain that the next transformation will be + added to. */ + struct trns_chain *cur_trns_chain; + + /* The compactor used to compact a case, if necessary; + otherwise a null pointer. */ + struct dict_compactor *compactor; + + /* Time at which proc was last invoked. */ + time_t last_proc_invocation; + + /* Lag queue. */ + int n_lag; /* Number of cases to lag. */ + int lag_count; /* Number of cases in lag_queue so far. */ + int lag_head; /* Index where next case will be added. */ + struct ccase *lag_queue; /* Array of n_lag ccase * elements. */ + + /* Procedure data. */ + bool is_open; /* Procedure open? */ + struct ccase trns_case; /* Case used for transformations. */ + struct ccase sink_case; /* Case written to sink, if + compacting is necessary. */ + size_t cases_written; /* Cases output so far. */ + bool ok; +}; /* struct dataset */ - struct ccase trns_case; /* Case used for transformations. */ - struct ccase sink_case; /* Case written to sink, if - compaction is necessary. */ - size_t cases_written; /* Cases output so far. */ - }; -/* Cases are read from proc_source, - pass through permanent_trns_chain (which transforms them into - the format described by permanent_dict), - are written to proc_sink, - pass through temporary_trns_chain (which transforms them into - the format described by default_dict), - and are finally passed to the procedure. */ -static struct case_source *proc_source; -static struct trns_chain *permanent_trns_chain; -static struct dictionary *permanent_dict; -static struct case_sink *proc_sink; -static struct trns_chain *temporary_trns_chain; -struct dictionary *default_dict; - -/* The transformation chain that the next transformation will be - added to. */ -static struct trns_chain *cur_trns_chain; - -/* The compactor used to compact a case, if necessary; - otherwise a null pointer. */ -static struct dict_compactor *compactor; - -/* Time at which proc was last invoked. */ -static time_t last_proc_invocation; - -/* Lag queue. */ -int n_lag; /* Number of cases to lag. */ -static int lag_count; /* Number of cases in lag_queue so far. */ -static int lag_head; /* Index where next case will be added. */ -static struct ccase *lag_queue; /* Array of n_lag ccase * elements. */ - -static void add_case_limit_trns (void); -static void add_filter_trns (void); - -static bool internal_procedure (bool (*case_func) (const struct ccase *, - void *), - bool (*end_func) (void *), +static void add_case_limit_trns (struct dataset *ds); +static void add_filter_trns (struct dataset *ds); + +static bool internal_procedure (struct dataset *ds, case_func *, + end_func *, void *aux); -static void update_last_proc_invocation (void); +static void update_last_proc_invocation (struct dataset *ds); static void create_trns_case (struct ccase *, struct dictionary *); -static void open_active_file (void); -static bool write_case (struct write_case_data *wc_data); -static void lag_case (const struct ccase *c); -static void clear_case (struct ccase *c); -static bool close_active_file (void); +static void open_active_file (struct dataset *ds); +static void lag_case (struct dataset *ds, const struct ccase *c); +static void clear_case (const struct dataset *ds, struct ccase *c); +static bool close_active_file (struct dataset *ds); /* Public functions. */ /* Returns the last time the data was read. */ time_t -time_of_last_procedure (void) +time_of_last_procedure (struct dataset *ds) { - if (last_proc_invocation == 0) - update_last_proc_invocation (); - return last_proc_invocation; + if (ds->last_proc_invocation == 0) + update_last_proc_invocation (ds); + return ds->last_proc_invocation; } /* Regular procedure. */ + + /* Reads the data from the input program and writes it to a new active file. For each case we read from the input program, we do the following: @@ -118,32 +127,49 @@ time_of_last_procedure (void) start the next case from step 1. 2. Write case to replacement active file. - + 3. Execute temporary transformations. If these drop the case, start the next case from step 1. - + 4. Pass case to PROC_FUNC, passing AUX as auxiliary data. Returns true if successful, false if an I/O error occurred. */ bool -procedure (bool (*proc_func) (const struct ccase *, void *), void *aux) +procedure (struct dataset *ds, case_func *cf, void *aux) { - return internal_procedure (proc_func, NULL, aux); + update_last_proc_invocation (ds); + + /* Optimize the trivial case where we're not going to do + anything with the data, by not reading the data at all. */ + if (cf == NULL + && case_source_is_class (ds->proc_source, &storage_source_class) + && ds->proc_sink == NULL + && (ds->temporary_trns_chain == NULL + || trns_chain_is_empty (ds->temporary_trns_chain)) + && trns_chain_is_empty (ds->permanent_trns_chain)) + { + ds->n_lag = 0; + dict_set_case_limit (ds->dict, 0); + dict_clear_vectors (ds->dict); + return true; + } + + return internal_procedure (ds, cf, NULL, aux); } /* Multipass procedure. */ -struct multipass_aux_data +struct multipass_aux_data { struct casefile *casefile; - + bool (*proc_func) (const struct casefile *, void *aux); void *aux; }; /* Case processing function for multipass_procedure(). */ static bool -multipass_case_func (const struct ccase *c, void *aux_data_) +multipass_case_func (const struct ccase *c, void *aux_data_, const struct dataset *ds UNUSED) { struct multipass_aux_data *aux_data = aux_data_; return casefile_append (aux_data->casefile, c); @@ -151,7 +177,7 @@ multipass_case_func (const struct ccase *c, void *aux_data_) /* End-of-file function for multipass_procedure(). */ static bool -multipass_end_func (void *aux_data_) +multipass_end_func (void *aux_data_, const struct dataset *ds UNUSED) { struct multipass_aux_data *aux_data = aux_data_; return (aux_data->proc_func == NULL @@ -162,17 +188,19 @@ multipass_end_func (void *aux_data_) The entire active file is passed to PROC_FUNC, with the given AUX as auxiliary data, as a unit. */ bool -multipass_procedure (bool (*proc_func) (const struct casefile *, void *aux), - void *aux) +multipass_procedure (struct dataset *ds, casefile_func *proc_func, void *aux) { struct multipass_aux_data aux_data; bool ok; - aux_data.casefile = casefile_create (dict_get_next_value_idx (default_dict)); + aux_data.casefile = + ds->cf_factory->create_casefile (ds->cf_factory, + dict_get_next_value_idx (ds->dict)); + aux_data.proc_func = proc_func; aux_data.aux = aux; - ok = internal_procedure (multipass_case_func, multipass_end_func, &aux_data); + ok = internal_procedure (ds, multipass_case_func, multipass_end_func, &aux_data); ok = !casefile_error (aux_data.casefile) && ok; casefile_destroy (aux_data.casefile); @@ -180,6 +208,7 @@ multipass_procedure (bool (*proc_func) (const struct casefile *, void *aux), return ok; } + /* Procedure implementation. */ /* Executes a procedure. @@ -188,59 +217,150 @@ multipass_procedure (bool (*proc_func) (const struct casefile *, void *aux), Returns true if successful, false if an I/O error occurred (or if CASE_FUNC or END_FUNC ever returned false). */ static bool -internal_procedure (bool (*case_func) (const struct ccase *, void *), - bool (*end_func) (void *), - void *aux) +internal_procedure (struct dataset *ds, case_func *proc, + end_func *end, + void *aux) { - struct write_case_data wc_data; + struct ccase *c; bool ok = true; - assert (proc_source != NULL); + proc_open (ds); + while (ok && proc_read (ds, &c)) + if (proc != NULL) + ok = proc (c, aux, ds) && ok; + if (end != NULL) + ok = end (aux, ds) && ok; - update_last_proc_invocation (); + if ( proc_close (ds) && ok ) + { - /* Optimize the trivial case where we're not going to do - anything with the data, by not reading the data at all. */ - if (case_func == NULL && end_func == NULL - && case_source_is_class (proc_source, &storage_source_class) - && proc_sink == NULL - && (temporary_trns_chain == NULL - || trns_chain_is_empty (temporary_trns_chain)) - && trns_chain_is_empty (permanent_trns_chain)) + return true; + } + + return false; +} + +/* Opens dataset DS for reading cases with proc_read. + proc_close must be called when done. */ +void +proc_open (struct dataset *ds) +{ + assert (ds->proc_source != NULL); + assert (!ds->is_open); + + update_last_proc_invocation (ds); + + open_active_file (ds); + + ds->is_open = true; + create_trns_case (&ds->trns_case, ds->dict); + case_create (&ds->sink_case, dict_get_compacted_value_cnt (ds->dict)); + ds->cases_written = 0; + ds->ok = true; +} + +/* Reads the next case from dataset DS, which must have been + opened for reading with proc_open. + Returns true if successful, in which case a pointer to the + case is stored in *C. + Return false at end of file or if a read error occurs. In + this case a null pointer is stored in *C. */ +bool +proc_read (struct dataset *ds, struct ccase **c) +{ + enum trns_result retval = TRNS_DROP_CASE; + + assert (ds->is_open); + *c = NULL; + for (;;) { - n_lag = 0; - dict_set_case_limit (default_dict, 0); - dict_clear_vectors (default_dict); + size_t case_nr; + + assert (retval == TRNS_DROP_CASE || retval == TRNS_ERROR); + if (retval == TRNS_ERROR) + ds->ok = false; + if (!ds->ok) + return false; + + /* Read a case from proc_source. */ + clear_case (ds, &ds->trns_case); + if (!ds->proc_source->class->read (ds->proc_source, &ds->trns_case)) + return false; + + /* Execute permanent transformations. */ + case_nr = ds->cases_written + 1; + retval = trns_chain_execute (ds->permanent_trns_chain, TRNS_CONTINUE, + &ds->trns_case, &case_nr); + if (retval != TRNS_CONTINUE) + continue; + + /* Write case to LAG queue. */ + if (ds->n_lag) + lag_case (ds, &ds->trns_case); + + /* Write case to replacement active file. */ + ds->cases_written++; + if (ds->proc_sink->class->write != NULL) + { + if (ds->compactor != NULL) + { + dict_compactor_compact (ds->compactor, &ds->sink_case, + &ds->trns_case); + ds->proc_sink->class->write (ds->proc_sink, &ds->sink_case); + } + else + ds->proc_sink->class->write (ds->proc_sink, &ds->trns_case); + } + + /* Execute temporary transformations. */ + if (ds->temporary_trns_chain != NULL) + { + retval = trns_chain_execute (ds->temporary_trns_chain, TRNS_CONTINUE, + &ds->trns_case, &ds->cases_written); + if (retval != TRNS_CONTINUE) + continue; + } + + *c = &ds->trns_case; return true; } - - open_active_file (); - - wc_data.case_func = case_func; - wc_data.aux = aux; - create_trns_case (&wc_data.trns_case, default_dict); - case_create (&wc_data.sink_case, dict_get_next_value_idx (default_dict)); - wc_data.cases_written = 0; - - ok = proc_source->class->read (proc_source, - &wc_data.trns_case, - write_case, &wc_data) && ok; - if (end_func != NULL) - ok = end_func (aux) && ok; - - case_destroy (&wc_data.sink_case); - case_destroy (&wc_data.trns_case); - - ok = close_active_file () && ok; +} - return ok; +/* Closes dataset DS for reading. + Returns true if successful, false if an I/O error occurred + while reading or closing the data set. + If DS has not been opened, returns true without doing + anything else. */ +bool +proc_close (struct dataset *ds) +{ + if (!ds->is_open) + return true; + + /* Drain any remaining cases. */ + while (ds->ok) + { + struct ccase *c; + if (!proc_read (ds, &c)) + break; + } + ds->ok = free_case_source (ds->proc_source) && ds->ok; + proc_set_source (ds, NULL); + + case_destroy (&ds->sink_case); + case_destroy (&ds->trns_case); + + ds->ok = close_active_file (ds) && ds->ok; + ds->is_open = false; + + return ds->ok; } /* Updates last_proc_invocation. */ static void -update_last_proc_invocation (void) +update_last_proc_invocation (struct dataset *ds) { - last_proc_invocation = time (NULL); + ds->last_proc_invocation = time (NULL); } /* Creates and returns a case, initializing it from the vectors @@ -253,201 +373,146 @@ create_trns_case (struct ccase *trns_case, struct dictionary *dict) size_t i; case_create (trns_case, dict_get_next_value_idx (dict)); - for (i = 0; i < var_cnt; i++) + for (i = 0; i < var_cnt; i++) { struct variable *v = dict_get_var (dict, i); - union value *value = case_data_rw (trns_case, v->fv); + union value *value = case_data_rw (trns_case, v); - if (v->type == NUMERIC) - value->f = v->leave ? 0.0 : SYSMIS; + if (var_is_numeric (v)) + value->f = var_get_leave (v) ? 0.0 : SYSMIS; else - memset (value->s, ' ', v->width); + memset (value->s, ' ', var_get_width (v)); } } /* Makes all preparations for reading from the data source and writing to the data sink. */ static void -open_active_file (void) +open_active_file (struct dataset *ds) { - add_case_limit_trns (); - add_filter_trns (); + add_case_limit_trns (ds); + add_filter_trns (ds); /* Finalize transformations. */ - trns_chain_finalize (cur_trns_chain); + trns_chain_finalize (ds->cur_trns_chain); /* Make permanent_dict refer to the dictionary right before data reaches the sink. */ - if (permanent_dict == NULL) - permanent_dict = default_dict; + if (ds->permanent_dict == NULL) + ds->permanent_dict = ds->dict; - /* Figure out compaction. */ - compactor = (dict_needs_compaction (permanent_dict) - ? dict_make_compactor (permanent_dict) - : NULL); + /* Figure out whether to compact. */ + ds->compactor = + (dict_compacting_would_shrink (ds->permanent_dict) + ? dict_make_compactor (ds->permanent_dict) + : NULL); /* Prepare sink. */ - if (proc_sink == NULL) - proc_sink = create_case_sink (&storage_sink_class, permanent_dict, NULL); - if (proc_sink->class->open != NULL) - proc_sink->class->open (proc_sink); + if (ds->proc_sink == NULL) + ds->proc_sink = create_case_sink (&storage_sink_class, + ds->permanent_dict, + ds->cf_factory, + NULL); + if (ds->proc_sink->class->open != NULL) + ds->proc_sink->class->open (ds->proc_sink); /* Allocate memory for lag queue. */ - if (n_lag > 0) + if (ds->n_lag > 0) { int i; - - lag_count = 0; - lag_head = 0; - lag_queue = xnmalloc (n_lag, sizeof *lag_queue); - for (i = 0; i < n_lag; i++) - case_nullify (&lag_queue[i]); - } -} -/* Transforms trns_case and writes it to the replacement active - file if advisable. Returns true if more cases can be - accepted, false otherwise. Do not call this function again - after it has returned false once. */ -static bool -write_case (struct write_case_data *wc_data) -{ - enum trns_result retval; - size_t case_nr; - - /* Execute permanent transformations. */ - case_nr = wc_data->cases_written + 1; - retval = trns_chain_execute (permanent_trns_chain, - &wc_data->trns_case, &case_nr); - if (retval != TRNS_CONTINUE) - goto done; - - /* Write case to LAG queue. */ - if (n_lag) - lag_case (&wc_data->trns_case); - - /* Write case to replacement active file. */ - wc_data->cases_written++; - if (proc_sink->class->write != NULL) - { - if (compactor != NULL) - { - dict_compactor_compact (compactor, &wc_data->sink_case, - &wc_data->trns_case); - proc_sink->class->write (proc_sink, &wc_data->sink_case); - } - else - proc_sink->class->write (proc_sink, &wc_data->trns_case); - } - - /* Execute temporary transformations. */ - if (temporary_trns_chain != NULL) - { - retval = trns_chain_execute (temporary_trns_chain, - &wc_data->trns_case, - &wc_data->cases_written); - if (retval != TRNS_CONTINUE) - goto done; + ds->lag_count = 0; + ds->lag_head = 0; + ds->lag_queue = xnmalloc (ds->n_lag, sizeof *ds->lag_queue); + for (i = 0; i < ds->n_lag; i++) + case_nullify (&ds->lag_queue[i]); } - - /* Pass case to procedure. */ - if (wc_data->case_func != NULL) - if (!wc_data->case_func (&wc_data->trns_case, wc_data->aux)) - retval = TRNS_ERROR; - - done: - clear_case (&wc_data->trns_case); - return retval != TRNS_ERROR; } /* Add C to the lag queue. */ static void -lag_case (const struct ccase *c) +lag_case (struct dataset *ds, const struct ccase *c) { - if (lag_count < n_lag) - lag_count++; - case_destroy (&lag_queue[lag_head]); - case_clone (&lag_queue[lag_head], c); - if (++lag_head >= n_lag) - lag_head = 0; + if (ds->lag_count < ds->n_lag) + ds->lag_count++; + case_destroy (&ds->lag_queue[ds->lag_head]); + case_clone (&ds->lag_queue[ds->lag_head], c); + if (++ds->lag_head >= ds->n_lag) + ds->lag_head = 0; } /* Clears the variables in C that need to be cleared between processing cases. */ static void -clear_case (struct ccase *c) +clear_case (const struct dataset *ds, struct ccase *c) { - size_t var_cnt = dict_get_var_cnt (default_dict); + size_t var_cnt = dict_get_var_cnt (ds->dict); size_t i; - - for (i = 0; i < var_cnt; i++) + + for (i = 0; i < var_cnt; i++) { - struct variable *v = dict_get_var (default_dict, i); - if (!v->leave) + struct variable *v = dict_get_var (ds->dict, i); + if (!var_get_leave (v)) { - if (v->type == NUMERIC) - case_data_rw (c, v->fv)->f = SYSMIS; + if (var_is_numeric (v)) + case_data_rw (c, v)->f = SYSMIS; else - memset (case_data_rw (c, v->fv)->s, ' ', v->width); - } + memset (case_data_rw (c, v)->s, ' ', var_get_width (v)); + } } } /* Closes the active file. */ static bool -close_active_file (void) +close_active_file (struct dataset *ds) { /* Free memory for lag queue, and turn off lagging. */ - if (n_lag > 0) + if (ds->n_lag > 0) { int i; - - for (i = 0; i < n_lag; i++) - case_destroy (&lag_queue[i]); - free (lag_queue); - n_lag = 0; + + for (i = 0; i < ds->n_lag; i++) + case_destroy (&ds->lag_queue[i]); + free (ds->lag_queue); + ds->n_lag = 0; } - + /* Dictionary from before TEMPORARY becomes permanent. */ - proc_cancel_temporary_transformations (); + proc_cancel_temporary_transformations (ds); - /* Finish compaction. */ - if (compactor != NULL) + /* Finish compacting. */ + if (ds->compactor != NULL) { - dict_compactor_destroy (compactor); - dict_compact_values (default_dict); - compactor = NULL; + dict_compactor_destroy (ds->compactor); + dict_compact_values (ds->dict); + ds->compactor = NULL; } - - /* Free data source. */ - free_case_source (proc_source); - proc_source = NULL; /* Old data sink becomes new data source. */ - if (proc_sink->class->make_source != NULL) - proc_source = proc_sink->class->make_source (proc_sink); - free_case_sink (proc_sink); - proc_sink = NULL; - - dict_clear_vectors (default_dict); - permanent_dict = NULL; - return proc_cancel_all_transformations (); + if (ds->proc_sink->class->make_source != NULL) + proc_set_source (ds, ds->proc_sink->class->make_source (ds->proc_sink) ); + free_case_sink (ds->proc_sink); + ds->proc_sink = NULL; + + dict_clear_vectors (ds->dict); + ds->permanent_dict = NULL; + return proc_cancel_all_transformations (ds); } /* Returns a pointer to the lagged case from N_BEFORE cases before the current one, or NULL if there haven't been that many cases yet. */ struct ccase * -lagged_case (int n_before) +lagged_case (const struct dataset *ds, int n_before) { assert (n_before >= 1 ); - assert (n_before <= n_lag); + assert (n_before <= ds->n_lag); - if (n_before <= lag_count) + if (n_before <= ds->lag_count) { - int index = lag_head - n_before; + int index = ds->lag_head - n_before; if (index < 0) - index += n_lag; - return &lag_queue[index]; + index += ds->n_lag; + return &ds->lag_queue[index]; } else return NULL; @@ -456,21 +521,21 @@ lagged_case (int n_before) /* Procedure that separates the data into SPLIT FILE groups. */ /* Represents auxiliary data for handling SPLIT FILE. */ -struct split_aux_data +struct split_aux_data { - size_t case_count; /* Number of cases so far. */ + struct dataset *dataset; /* The dataset */ struct ccase prev_case; /* Data in previous case. */ /* Callback functions. */ - void (*begin_func) (const struct ccase *, void *); - bool (*proc_func) (const struct ccase *, void *); - void (*end_func) (void *); + begin_func *begin; + case_func *proc; + end_func *end; void *func_aux; }; -static int equal_splits (const struct ccase *, const struct ccase *); -static bool split_procedure_case_func (const struct ccase *c, void *); -static bool split_procedure_end_func (void *); +static int equal_splits (const struct ccase *, const struct ccase *, const struct dataset *ds); +static bool split_procedure_case_func (const struct ccase *c, void *, const struct dataset *); +static bool split_procedure_end_func (void *, const struct dataset *); /* Like procedure(), but it automatically breaks the case stream into SPLIT FILE break groups. Before each group of cases with @@ -482,30 +547,31 @@ static bool split_procedure_end_func (void *); passed to each of the functions as auxiliary data. If the active file is empty, none of BEGIN_FUNC, PROC_FUNC, - and END_FUNC will be called at all. + and END_FUNC will be called at all. If SPLIT FILE is not in effect, then there is one break group (if the active file is nonempty), and BEGIN_FUNC and END_FUNC will be called once. - + Returns true if successful, false if an I/O error occurred. */ bool -procedure_with_splits (void (*begin_func) (const struct ccase *, void *aux), - bool (*proc_func) (const struct ccase *, void *aux), - void (*end_func) (void *aux), - void *func_aux) +procedure_with_splits (struct dataset *ds, + begin_func begin, + case_func *proc, + end_func *end, + void *func_aux) { struct split_aux_data split_aux; bool ok; - split_aux.case_count = 0; case_nullify (&split_aux.prev_case); - split_aux.begin_func = begin_func; - split_aux.proc_func = proc_func; - split_aux.end_func = end_func; + split_aux.begin = begin; + split_aux.proc = proc; + split_aux.end = end; split_aux.func_aux = func_aux; + split_aux.dataset = ds; - ok = internal_procedure (split_procedure_case_func, + ok = internal_procedure (ds, split_procedure_case_func, split_procedure_end_func, &split_aux); case_destroy (&split_aux.prev_case); @@ -515,48 +581,48 @@ procedure_with_splits (void (*begin_func) (const struct ccase *, void *aux), /* Case callback used by procedure_with_splits(). */ static bool -split_procedure_case_func (const struct ccase *c, void *split_aux_) +split_procedure_case_func (const struct ccase *c, void *split_aux_, const struct dataset *ds) { struct split_aux_data *split_aux = split_aux_; /* Start a new series if needed. */ - if (split_aux->case_count == 0 - || !equal_splits (c, &split_aux->prev_case)) + if (case_is_null (&split_aux->prev_case) + || !equal_splits (c, &split_aux->prev_case, split_aux->dataset)) { - if (split_aux->case_count > 0 && split_aux->end_func != NULL) - split_aux->end_func (split_aux->func_aux); + if (!case_is_null (&split_aux->prev_case) && split_aux->end != NULL) + split_aux->end (split_aux->func_aux, ds); case_destroy (&split_aux->prev_case); case_clone (&split_aux->prev_case, c); - if (split_aux->begin_func != NULL) - split_aux->begin_func (&split_aux->prev_case, split_aux->func_aux); + if (split_aux->begin != NULL) + split_aux->begin (&split_aux->prev_case, split_aux->func_aux, ds); } - split_aux->case_count++; - return (split_aux->proc_func == NULL - || split_aux->proc_func (c, split_aux->func_aux)); + return (split_aux->proc == NULL + || split_aux->proc (c, split_aux->func_aux, ds)); } /* End-of-file callback used by procedure_with_splits(). */ static bool -split_procedure_end_func (void *split_aux_) +split_procedure_end_func (void *split_aux_, const struct dataset *ds) { struct split_aux_data *split_aux = split_aux_; - if (split_aux->case_count > 0 && split_aux->end_func != NULL) - split_aux->end_func (split_aux->func_aux); + if (!case_is_null (&split_aux->prev_case) && split_aux->end != NULL) + split_aux->end (split_aux->func_aux, ds); return true; } /* Compares the SPLIT FILE variables in cases A and B and returns nonzero only if they differ. */ static int -equal_splits (const struct ccase *a, const struct ccase *b) +equal_splits (const struct ccase *a, const struct ccase *b, + const struct dataset *ds) { return case_compare (a, b, - dict_get_split_vars (default_dict), - dict_get_split_cnt (default_dict)) == 0; + dict_get_split_vars (ds->dict), + dict_get_split_cnt (ds->dict)) == 0; } /* Multipass procedure that separates the data into SPLIT FILE @@ -564,26 +630,24 @@ equal_splits (const struct ccase *a, const struct ccase *b) /* Represents auxiliary data for handling SPLIT FILE in a multipass procedure. */ -struct multipass_split_aux_data +struct multipass_split_aux_data { + struct dataset *dataset; /* The dataset of the split */ struct ccase prev_case; /* Data in previous case. */ struct casefile *casefile; /* Accumulates data for a split. */ - - /* Function to call with the accumulated data. */ - bool (*split_func) (const struct ccase *first, const struct casefile *, - void *); - void *func_aux; /* Auxiliary data. */ + split_func *split; /* Function to call with the accumulated + data. */ + void *func_aux; /* Auxiliary data. */ }; -static bool multipass_split_case_func (const struct ccase *c, void *aux_); -static bool multipass_split_end_func (void *aux_); -static bool multipass_split_output (struct multipass_split_aux_data *); +static bool multipass_split_case_func (const struct ccase *c, void *aux_, const struct dataset *); +static bool multipass_split_end_func (void *aux_, const struct dataset *ds); +static bool multipass_split_output (struct multipass_split_aux_data *, const struct dataset *ds); /* Returns true if successful, false if an I/O error occurred. */ bool -multipass_procedure_with_splits (bool (*split_func) (const struct ccase *first, - const struct casefile *, - void *aux), +multipass_procedure_with_splits (struct dataset *ds, + split_func *split, void *func_aux) { struct multipass_split_aux_data aux; @@ -591,10 +655,11 @@ multipass_procedure_with_splits (bool (*split_func) (const struct ccase *first, case_nullify (&aux.prev_case); aux.casefile = NULL; - aux.split_func = split_func; + aux.split = split; aux.func_aux = func_aux; + aux.dataset = ds; - ok = internal_procedure (multipass_split_case_func, + ok = internal_procedure (ds, multipass_split_case_func, multipass_split_end_func, &aux); case_destroy (&aux.prev_case); @@ -603,13 +668,13 @@ multipass_procedure_with_splits (bool (*split_func) (const struct ccase *first, /* Case callback used by multipass_procedure_with_splits(). */ static bool -multipass_split_case_func (const struct ccase *c, void *aux_) +multipass_split_case_func (const struct ccase *c, void *aux_, const struct dataset *ds) { struct multipass_split_aux_data *aux = aux_; bool ok = true; /* Start a new series if needed. */ - if (aux->casefile == NULL || !equal_splits (c, &aux->prev_case)) + if (aux->casefile == NULL || ! equal_splits (c, &aux->prev_case, ds)) { /* Record split values. */ case_destroy (&aux->prev_case); @@ -617,10 +682,12 @@ multipass_split_case_func (const struct ccase *c, void *aux_) /* Pass any cases to split_func. */ if (aux->casefile != NULL) - ok = multipass_split_output (aux); + ok = multipass_split_output (aux, ds); /* Start a new casefile. */ - aux->casefile = casefile_create (dict_get_next_value_idx (default_dict)); + aux->casefile = + ds->cf_factory->create_casefile (ds->cf_factory, + dict_get_next_value_idx (ds->dict)); } return casefile_append (aux->casefile, c) && ok; @@ -628,19 +695,19 @@ multipass_split_case_func (const struct ccase *c, void *aux_) /* End-of-file callback used by multipass_procedure_with_splits(). */ static bool -multipass_split_end_func (void *aux_) +multipass_split_end_func (void *aux_, const struct dataset *ds) { struct multipass_split_aux_data *aux = aux_; - return (aux->casefile == NULL || multipass_split_output (aux)); + return (aux->casefile == NULL || multipass_split_output (aux, ds)); } static bool -multipass_split_output (struct multipass_split_aux_data *aux) +multipass_split_output (struct multipass_split_aux_data *aux, const struct dataset *ds) { bool ok; - + assert (aux->casefile != NULL); - ok = aux->split_func (&aux->prev_case, aux->casefile, aux->func_aux); + ok = aux->split (&aux->prev_case, aux->casefile, aux->func_aux, ds); casefile_destroy (aux->casefile); aux->casefile = NULL; @@ -650,30 +717,30 @@ multipass_split_output (struct multipass_split_aux_data *aux) /* Discards all the current state in preparation for a data-input command like DATA LIST or GET. */ void -discard_variables (void) +discard_variables (struct dataset *ds) { - dict_clear (default_dict); + dict_clear (ds->dict); fh_set_default_handle (NULL); - n_lag = 0; - - free_case_source (proc_source); - proc_source = NULL; + ds->n_lag = 0; - proc_cancel_all_transformations (); + free_case_source (ds->proc_source); + proc_set_source (ds, NULL); + + proc_cancel_all_transformations (ds); } /* Returns the current set of permanent transformations, and clears the permanent transformations. For use by INPUT PROGRAM. */ struct trns_chain * -proc_capture_transformations (void) +proc_capture_transformations (struct dataset *ds) { struct trns_chain *chain; - - assert (temporary_trns_chain == NULL); - chain = permanent_trns_chain; - cur_trns_chain = permanent_trns_chain = trns_chain_create (); + + assert (ds->temporary_trns_chain == NULL); + chain = ds->permanent_trns_chain; + ds->cur_trns_chain = ds->permanent_trns_chain = trns_chain_create (); return chain; } @@ -681,9 +748,9 @@ proc_capture_transformations (void) frees itself with FREE to the current set of transformations. The functions are passed AUX as auxiliary data. */ void -add_transformation (trns_proc_func *proc, trns_free_func *free, void *aux) +add_transformation (struct dataset *ds, trns_proc_func *proc, trns_free_func *free, void *aux) { - trns_chain_append (cur_trns_chain, NULL, proc, free, aux); + trns_chain_append (ds->cur_trns_chain, NULL, proc, free, aux); } /* Adds a transformation that processes a case with PROC and @@ -692,44 +759,46 @@ add_transformation (trns_proc_func *proc, trns_free_func *free, void *aux) FINALIZE will be called. The functions are passed AUX as auxiliary data. */ void -add_transformation_with_finalizer (trns_finalize_func *finalize, +add_transformation_with_finalizer (struct dataset *ds, + trns_finalize_func *finalize, trns_proc_func *proc, trns_free_func *free, void *aux) { - trns_chain_append (cur_trns_chain, finalize, proc, free, aux); + trns_chain_append (ds->cur_trns_chain, finalize, proc, free, aux); } /* Returns the index of the next transformation. This value can be returned by a transformation procedure function to indicate a "jump" to that transformation. */ size_t -next_transformation (void) +next_transformation (const struct dataset *ds) { - return trns_chain_next (cur_trns_chain); + return trns_chain_next (ds->cur_trns_chain); } /* Returns true if the next call to add_transformation() will add a temporary transformation, false if it will add a permanent transformation. */ bool -proc_in_temporary_transformations (void) +proc_in_temporary_transformations (const struct dataset *ds) { - return temporary_trns_chain != NULL; + return ds->temporary_trns_chain != NULL; } /* Marks the start of temporary transformations. Further calls to add_transformation() will add temporary transformations. */ void -proc_start_temporary_transformations (void) +proc_start_temporary_transformations (struct dataset *ds) { - if (!proc_in_temporary_transformations ()) + if (!proc_in_temporary_transformations (ds)) { - add_case_limit_trns (); + add_case_limit_trns (ds); + + ds->permanent_dict = dict_clone (ds->dict); - permanent_dict = dict_clone (default_dict); - trns_chain_finalize (permanent_trns_chain); - temporary_trns_chain = cur_trns_chain = trns_chain_create (); + trns_chain_finalize (ds->permanent_trns_chain); + ds->temporary_trns_chain = ds->cur_trns_chain = trns_chain_create (); } } @@ -738,16 +807,16 @@ proc_start_temporary_transformations (void) permanent. Returns true if anything changed, false otherwise. */ bool -proc_make_temporary_transformations_permanent (void) +proc_make_temporary_transformations_permanent (struct dataset *ds) { - if (proc_in_temporary_transformations ()) + if (proc_in_temporary_transformations (ds)) { - trns_chain_finalize (temporary_trns_chain); - trns_chain_splice (permanent_trns_chain, temporary_trns_chain); - temporary_trns_chain = NULL; + trns_chain_finalize (ds->temporary_trns_chain); + trns_chain_splice (ds->permanent_trns_chain, ds->temporary_trns_chain); + ds->temporary_trns_chain = NULL; - dict_destroy (permanent_dict); - permanent_dict = NULL; + dict_destroy (ds->permanent_dict); + ds->permanent_dict = NULL; return true; } @@ -759,16 +828,15 @@ proc_make_temporary_transformations_permanent (void) transformations will be permanent. Returns true if anything changed, false otherwise. */ bool -proc_cancel_temporary_transformations (void) +proc_cancel_temporary_transformations (struct dataset *ds) { - if (proc_in_temporary_transformations ()) + if (proc_in_temporary_transformations (ds)) { - dict_destroy (default_dict); - default_dict = permanent_dict; - permanent_dict = NULL; + dataset_set_dict (ds, ds->permanent_dict); + ds->permanent_dict = NULL; - trns_chain_destroy (temporary_trns_chain); - temporary_trns_chain = NULL; + trns_chain_destroy (ds->temporary_trns_chain); + ds->temporary_trns_chain = NULL; return true; } @@ -779,55 +847,68 @@ proc_cancel_temporary_transformations (void) /* Cancels all transformations, if any. Returns true if successful, false on I/O error. */ bool -proc_cancel_all_transformations (void) +proc_cancel_all_transformations (struct dataset *ds) { bool ok; - ok = trns_chain_destroy (permanent_trns_chain); - ok = trns_chain_destroy (temporary_trns_chain) && ok; - permanent_trns_chain = cur_trns_chain = trns_chain_create (); - temporary_trns_chain = NULL; + ok = trns_chain_destroy (ds->permanent_trns_chain); + ok = trns_chain_destroy (ds->temporary_trns_chain) && ok; + ds->permanent_trns_chain = ds->cur_trns_chain = trns_chain_create (); + ds->temporary_trns_chain = NULL; return ok; } /* Initializes procedure handling. */ -void -proc_init (void) +struct dataset * +create_dataset (struct casefile_factory *fact, + replace_source_callback *rps, + replace_dictionary_callback *rds + ) { - default_dict = dict_create (); - proc_cancel_all_transformations (); + struct dataset *ds = xzalloc (sizeof(*ds)); + ds->dict = dict_create (); + ds->cf_factory = fact; + ds->replace_source = rps; + ds->replace_dict = rds; + proc_cancel_all_transformations (ds); + return ds; } /* Finishes up procedure handling. */ void -proc_done (void) +destroy_dataset (struct dataset *ds) { - discard_variables (); + discard_variables (ds); + dict_destroy (ds->dict); + trns_chain_destroy (ds->permanent_trns_chain); + free (ds); } /* Sets SINK as the destination for procedure output from the next procedure. */ void -proc_set_sink (struct case_sink *sink) +proc_set_sink (struct dataset *ds, struct case_sink *sink) { - assert (proc_sink == NULL); - proc_sink = sink; + assert (ds->proc_sink == NULL); + ds->proc_sink = sink; } /* Sets SOURCE as the source for procedure input for the next procedure. */ void -proc_set_source (struct case_source *source) +proc_set_source (struct dataset *ds, struct case_source *source) { - assert (proc_source == NULL); - proc_source = source; + ds->proc_source = source; + + if ( ds->replace_source ) + ds->replace_source (ds->proc_source); } /* Returns true if a source for the next procedure has been configured, false otherwise. */ bool -proc_has_source (void) +proc_has_source (const struct dataset *ds) { - return proc_source != NULL; + return ds->proc_source != NULL; } /* Returns the output from the previous procedure. @@ -835,19 +916,19 @@ proc_has_source (void) The returned casefile is owned by the caller; it will not be automatically used for the next procedure's input. */ struct casefile * -proc_capture_output (void) +proc_capture_output (struct dataset *ds) { struct casefile *casefile; /* Try to make sure that this function is called immediately after procedure() or a similar function. */ - assert (proc_source != NULL); - assert (case_source_is_class (proc_source, &storage_source_class)); - assert (trns_chain_is_empty (permanent_trns_chain)); - assert (!proc_in_temporary_transformations ()); + assert (ds->proc_source != NULL); + assert (case_source_is_class (ds->proc_source, &storage_source_class)); + assert (trns_chain_is_empty (ds->permanent_trns_chain)); + assert (!proc_in_temporary_transformations (ds)); - casefile = storage_source_decapsulate (proc_source); - proc_source = NULL; + casefile = storage_source_decapsulate (ds->proc_source); + proc_set_source (ds, NULL); return casefile; } @@ -856,18 +937,18 @@ static trns_proc_func case_limit_trns_proc; static trns_free_func case_limit_trns_free; /* Adds a transformation that limits the number of cases that may - pass through, if default_dict has a case limit. */ + pass through, if DS->DICT has a case limit. */ static void -add_case_limit_trns (void) +add_case_limit_trns (struct dataset *ds) { - size_t case_limit = dict_get_case_limit (default_dict); + size_t case_limit = dict_get_case_limit (ds->dict); if (case_limit != 0) { size_t *cases_remaining = xmalloc (sizeof *cases_remaining); *cases_remaining = case_limit; - add_transformation (case_limit_trns_proc, case_limit_trns_free, + add_transformation (ds, case_limit_trns_proc, case_limit_trns_free, cases_remaining); - dict_set_case_limit (default_dict, 0); + dict_set_case_limit (ds->dict, 0); } } @@ -875,12 +956,12 @@ add_case_limit_trns (void) *CASES_REMAINING. */ static int case_limit_trns_proc (void *cases_remaining_, - struct ccase *c UNUSED, int case_nr UNUSED) + struct ccase *c UNUSED, casenumber case_nr UNUSED) { size_t *cases_remaining = cases_remaining_; - if (*cases_remaining > 0) + if (*cases_remaining > 0) { - *cases_remaining--; + (*cases_remaining)--; return TRNS_CONTINUE; } else @@ -889,7 +970,7 @@ case_limit_trns_proc (void *cases_remaining_, /* Frees the data associated with a case limit transformation. */ static bool -case_limit_trns_free (void *cases_remaining_) +case_limit_trns_free (void *cases_remaining_) { size_t *cases_remaining = cases_remaining_; free (cases_remaining); @@ -901,25 +982,68 @@ static trns_proc_func filter_trns_proc; /* Adds a temporary transformation to filter data according to the variable specified on FILTER, if any. */ static void -add_filter_trns (void) +add_filter_trns (struct dataset *ds) { - struct variable *filter_var = dict_get_filter (default_dict); - if (filter_var != NULL) + struct variable *filter_var = dict_get_filter (ds->dict); + if (filter_var != NULL) { - proc_start_temporary_transformations (); - add_transformation (filter_trns_proc, NULL, filter_var); + proc_start_temporary_transformations (ds); + add_transformation (ds, filter_trns_proc, NULL, filter_var); } } /* FILTER transformation. */ static int filter_trns_proc (void *filter_var_, - struct ccase *c UNUSED, int case_nr UNUSED) - + struct ccase *c UNUSED, casenumber case_nr UNUSED) + { struct variable *filter_var = filter_var_; - double f = case_num (c, filter_var->fv); - return (f != 0.0 && !mv_is_num_missing (&filter_var->miss, f) + double f = case_num (c, filter_var); + return (f != 0.0 && !var_is_num_missing (filter_var, f, MV_ANY) ? TRNS_CONTINUE : TRNS_DROP_CASE); } + +struct dictionary * +dataset_dict (const struct dataset *ds) +{ + return ds->dict; +} + + +/* Set or replace dataset DS's dictionary with DICT. + The old dictionary is destroyed */ +void +dataset_set_dict (struct dataset *ds, struct dictionary *dict) +{ + struct dictionary *old_dict = ds->dict; + + dict_copy_callbacks (dict, ds->dict); + ds->dict = dict; + + if ( ds->replace_dict ) + ds->replace_dict (dict); + + dict_destroy (old_dict); +} + +int +dataset_n_lag (const struct dataset *ds) +{ + return ds->n_lag; +} + +void +dataset_set_n_lag (struct dataset *ds, int n_lag) +{ + ds->n_lag = n_lag; +} + + +struct casefile_factory * +dataset_get_casefile_factory (const struct dataset *ds) +{ + return ds->cf_factory; +} +