X-Git-Url: https://pintos-os.org/cgi-bin/gitweb.cgi?a=blobdiff_plain;f=src%2Fdata%2Fprocedure.c;h=8aec2c9fe05092d1beded5ca2bddafbc12048e1d;hb=687adf53eae434e88a47bb3409f946f3a26115a4;hp=7cd34369a122824751245023f4b03e9b1340db22;hpb=888d0f91d57e0c3c5a4206c30ac71eb87bf44227;p=pspp-builds.git diff --git a/src/data/procedure.c b/src/data/procedure.c index 7cd34369..8aec2c9f 100644 --- a/src/data/procedure.c +++ b/src/data/procedure.c @@ -1,6 +1,5 @@ /* PSPP - computes sample statistics. Copyright (C) 1997-9, 2000, 2006 Free Software Foundation, Inc. - Written by Ben Pfaff . This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as @@ -39,21 +38,11 @@ #include #include -/* Procedure execution data. */ -struct write_case_data - { - /* Function to call for each case. */ - case_func *proc; - void *aux; +struct dataset { - struct dataset *dataset; /* The dataset concerned */ - struct ccase trns_case; /* Case used for transformations. */ - struct ccase sink_case; /* Case written to sink, if - compacting is necessary. */ - size_t cases_written; /* Cases output so far. */ - }; + /* An abstract factory which creates casefiles */ + struct casefile_factory *cf_factory; -struct dataset { /* Cases are read from proc_source, pass through permanent_trns_chain (which transforms them into the format described by permanent_dict), @@ -85,6 +74,13 @@ struct dataset { int lag_head; /* Index where next case will be added. */ struct ccase *lag_queue; /* Array of n_lag ccase * elements. */ + /* Procedure data. */ + bool is_open; /* Procedure open? */ + struct ccase trns_case; /* Case used for transformations. */ + struct ccase sink_case; /* Case written to sink, if + compacting is necessary. */ + size_t cases_written; /* Cases output so far. */ + bool ok; }; /* struct dataset */ @@ -97,7 +93,6 @@ static bool internal_procedure (struct dataset *ds, case_func *, static void update_last_proc_invocation (struct dataset *ds); static void create_trns_case (struct ccase *, struct dictionary *); static void open_active_file (struct dataset *ds); -static bool write_case (struct write_case_data *wc_data); static void lag_case (struct dataset *ds, const struct ccase *c); static void clear_case (const struct dataset *ds, struct ccase *c); static bool close_active_file (struct dataset *ds); @@ -135,6 +130,23 @@ time_of_last_procedure (struct dataset *ds) bool procedure (struct dataset *ds, case_func *cf, void *aux) { + update_last_proc_invocation (ds); + + /* Optimize the trivial case where we're not going to do + anything with the data, by not reading the data at all. */ + if (cf == NULL + && case_source_is_class (ds->proc_source, &storage_source_class) + && ds->proc_sink == NULL + && (ds->temporary_trns_chain == NULL + || trns_chain_is_empty (ds->temporary_trns_chain)) + && trns_chain_is_empty (ds->permanent_trns_chain)) + { + ds->n_lag = 0; + dict_set_case_limit (ds->dict, 0); + dict_clear_vectors (ds->dict); + return true; + } + return internal_procedure (ds, cf, NULL, aux); } @@ -174,7 +186,10 @@ multipass_procedure (struct dataset *ds, casefile_func *proc_func, void *aux) struct multipass_aux_data aux_data; bool ok; - aux_data.casefile = fastfile_create (dict_get_next_value_idx (ds->dict)); + aux_data.casefile = + ds->cf_factory->create_casefile (ds->cf_factory, + dict_get_next_value_idx (ds->dict)); + aux_data.proc_func = proc_func; aux_data.aux = aux; @@ -188,7 +203,6 @@ multipass_procedure (struct dataset *ds, casefile_func *proc_func, void *aux) /* Procedure implementation. */ - /* Executes a procedure. Passes each case to CASE_FUNC. Calls END_FUNC after the last case. @@ -199,50 +213,132 @@ internal_procedure (struct dataset *ds, case_func *proc, end_func *end, void *aux) { - struct write_case_data wc_data; + struct ccase *c; bool ok = true; + + proc_open (ds); + while (ok && proc_read (ds, &c)) + if (proc != NULL) + ok = proc (c, aux, ds) && ok; + if (end != NULL) + ok = end (aux, ds) && ok; + return proc_close (ds) && ok; +} +/* Opens dataset DS for reading cases with proc_read. + proc_close must be called when done. */ +void +proc_open (struct dataset *ds) +{ assert (ds->proc_source != NULL); + assert (!ds->is_open); update_last_proc_invocation (ds); - /* Optimize the trivial case where we're not going to do - anything with the data, by not reading the data at all. */ - if (proc == NULL && end == NULL - && case_source_is_class (ds->proc_source, &storage_source_class) - && ds->proc_sink == NULL - && (ds->temporary_trns_chain == NULL - || trns_chain_is_empty (ds->temporary_trns_chain)) - && trns_chain_is_empty (ds->permanent_trns_chain)) + open_active_file (ds); + + ds->is_open = true; + create_trns_case (&ds->trns_case, ds->dict); + case_create (&ds->sink_case, dict_get_compacted_value_cnt (ds->dict)); + ds->cases_written = 0; + ds->ok = true; +} + +/* Reads the next case from dataset DS, which must have been + opened for reading with proc_open. + Returns true if successful, in which case a pointer to the + case is stored in *C. + Return false at end of file or if a read error occurs. In + this case a null pointer is stored in *C. */ +bool +proc_read (struct dataset *ds, struct ccase **c) +{ + enum trns_result retval = TRNS_DROP_CASE; + + assert (ds->is_open); + *c = NULL; + for (;;) { - ds->n_lag = 0; - dict_set_case_limit (ds->dict, 0); - dict_clear_vectors (ds->dict); - return true; - } + size_t case_nr; + + assert (retval == TRNS_DROP_CASE || retval == TRNS_ERROR); + if (retval == TRNS_ERROR) + ds->ok = false; + if (!ds->ok) + return false; + + /* Read a case from proc_source. */ + clear_case (ds, &ds->trns_case); + if (!ds->proc_source->class->read (ds->proc_source, &ds->trns_case)) + return false; + + /* Execute permanent transformations. */ + case_nr = ds->cases_written + 1; + retval = trns_chain_execute (ds->permanent_trns_chain, TRNS_CONTINUE, + &ds->trns_case, &case_nr); + if (retval != TRNS_CONTINUE) + continue; - open_active_file (ds); + /* Write case to LAG queue. */ + if (ds->n_lag) + lag_case (ds, &ds->trns_case); + + /* Write case to replacement active file. */ + ds->cases_written++; + if (ds->proc_sink->class->write != NULL) + { + if (ds->compactor != NULL) + { + dict_compactor_compact (ds->compactor, &ds->sink_case, + &ds->trns_case); + ds->proc_sink->class->write (ds->proc_sink, &ds->sink_case); + } + else + ds->proc_sink->class->write (ds->proc_sink, &ds->trns_case); + } - wc_data.proc = proc; - wc_data.aux = aux; - wc_data.dataset = ds; - create_trns_case (&wc_data.trns_case, ds->dict); - case_create (&wc_data.sink_case, - dict_get_compacted_value_cnt (ds->dict)); - wc_data.cases_written = 0; - - ok = ds->proc_source->class->read (ds->proc_source, - &wc_data.trns_case, - write_case, &wc_data) && ok; - if (end != NULL) - ok = end (aux, ds) && ok; + /* Execute temporary transformations. */ + if (ds->temporary_trns_chain != NULL) + { + retval = trns_chain_execute (ds->temporary_trns_chain, TRNS_CONTINUE, + &ds->trns_case, &ds->cases_written); + if (retval != TRNS_CONTINUE) + continue; + } - case_destroy (&wc_data.sink_case); - case_destroy (&wc_data.trns_case); + *c = &ds->trns_case; + return true; + } +} - ok = close_active_file (ds) && ok; +/* Closes dataset DS for reading. + Returns true if successful, false if an I/O error occurred + while reading or closing the data set. + If DS has not been opened, returns true without doing + anything else. */ +bool +proc_close (struct dataset *ds) +{ + if (!ds->is_open) + return true; - return ok; + /* Drain any remaining cases. */ + while (ds->ok) + { + struct ccase *c; + if (!proc_read (ds, &c)) + break; + } + ds->ok = free_case_source (ds->proc_source) && ds->ok; + ds->proc_source = NULL; + + case_destroy (&ds->sink_case); + case_destroy (&ds->trns_case); + + ds->ok = close_active_file (ds) && ds->ok; + ds->is_open = false; + + return ds->ok; } /* Updates last_proc_invocation. */ @@ -265,7 +361,7 @@ create_trns_case (struct ccase *trns_case, struct dictionary *dict) for (i = 0; i < var_cnt; i++) { struct variable *v = dict_get_var (dict, i); - union value *value = case_data_rw (trns_case, v->fv); + union value *value = case_data_rw (trns_case, v); if (var_is_numeric (v)) value->f = var_get_leave (v) ? 0.0 : SYSMIS; @@ -298,7 +394,10 @@ open_active_file (struct dataset *ds) /* Prepare sink. */ if (ds->proc_sink == NULL) - ds->proc_sink = create_case_sink (&storage_sink_class, ds->permanent_dict, NULL); + ds->proc_sink = create_case_sink (&storage_sink_class, + ds->permanent_dict, + ds->cf_factory, + NULL); if (ds->proc_sink->class->open != NULL) ds->proc_sink->class->open (ds->proc_sink); @@ -315,63 +414,6 @@ open_active_file (struct dataset *ds) } } -/* Transforms trns_case and writes it to the replacement active - file if advisable. Returns true if more cases can be - accepted, false otherwise. Do not call this function again - after it has returned false once. */ -static bool -write_case (struct write_case_data *wc_data) -{ - enum trns_result retval; - size_t case_nr; - - struct dataset *ds = wc_data->dataset; - - /* Execute permanent transformations. */ - case_nr = wc_data->cases_written + 1; - retval = trns_chain_execute (ds->permanent_trns_chain, - &wc_data->trns_case, &case_nr); - if (retval != TRNS_CONTINUE) - goto done; - - /* Write case to LAG queue. */ - if (ds->n_lag) - lag_case (ds, &wc_data->trns_case); - - /* Write case to replacement active file. */ - wc_data->cases_written++; - if (ds->proc_sink->class->write != NULL) - { - if (ds->compactor != NULL) - { - dict_compactor_compact (ds->compactor, &wc_data->sink_case, - &wc_data->trns_case); - ds->proc_sink->class->write (ds->proc_sink, &wc_data->sink_case); - } - else - ds->proc_sink->class->write (ds->proc_sink, &wc_data->trns_case); - } - - /* Execute temporary transformations. */ - if (ds->temporary_trns_chain != NULL) - { - retval = trns_chain_execute (ds->temporary_trns_chain, - &wc_data->trns_case, - &wc_data->cases_written); - if (retval != TRNS_CONTINUE) - goto done; - } - - /* Pass case to procedure. */ - if (wc_data->proc != NULL) - if (!wc_data->proc (&wc_data->trns_case, wc_data->aux, ds)) - retval = TRNS_ERROR; - - done: - clear_case (ds, &wc_data->trns_case); - return retval != TRNS_ERROR; -} - /* Add C to the lag queue. */ static void lag_case (struct dataset *ds, const struct ccase *c) @@ -398,9 +440,9 @@ clear_case (const struct dataset *ds, struct ccase *c) if (!var_get_leave (v)) { if (var_is_numeric (v)) - case_data_rw (c, v->fv)->f = SYSMIS; + case_data_rw (c, v)->f = SYSMIS; else - memset (case_data_rw (c, v->fv)->s, ' ', var_get_width (v)); + memset (case_data_rw (c, v)->s, ' ', var_get_width (v)); } } } @@ -431,10 +473,6 @@ close_active_file (struct dataset *ds) ds->compactor = NULL; } - /* Free data source. */ - free_case_source (ds->proc_source); - ds->proc_source = NULL; - /* Old data sink becomes new data source. */ if (ds->proc_sink->class->make_source != NULL) ds->proc_source = ds->proc_sink->class->make_source (ds->proc_sink); @@ -633,7 +671,8 @@ multipass_split_case_func (const struct ccase *c, void *aux_, const struct datas /* Start a new casefile. */ aux->casefile = - fastfile_create (dict_get_next_value_idx (ds->dict)); + ds->cf_factory->create_casefile (ds->cf_factory, + dict_get_next_value_idx (ds->dict)); } return casefile_append (aux->casefile, c) && ok; @@ -805,10 +844,11 @@ proc_cancel_all_transformations (struct dataset *ds) /* Initializes procedure handling. */ struct dataset * -create_dataset (void) +create_dataset (struct casefile_factory *fact) { struct dataset *ds = xzalloc (sizeof(*ds)); ds->dict = dict_create (); + ds->cf_factory = fact; proc_cancel_all_transformations (ds); return ds; } @@ -937,7 +977,7 @@ filter_trns_proc (void *filter_var_, { struct variable *filter_var = filter_var_; - double f = case_num (c, filter_var->fv); + double f = case_num (c, filter_var); return (f != 0.0 && !var_is_num_missing (filter_var, f) ? TRNS_CONTINUE : TRNS_DROP_CASE); } @@ -969,3 +1009,9 @@ dataset_set_n_lag (struct dataset *ds, int n_lag) } +struct casefile_factory * +dataset_get_casefile_factory (const struct dataset *ds) +{ + return ds->cf_factory; +} +