X-Git-Url: https://pintos-os.org/cgi-bin/gitweb.cgi?a=blobdiff_plain;f=src%2Fdata%2Fprocedure.c;h=98414a2b0478dbb5e44f306e0f99d90684b86003;hb=fd349404fdd08760cec579198c830bf5b8aede0f;hp=46a18bb463297fdf735a6e30a2d2207c0dc45e9a;hpb=92c09e564002d356d20fc1e2e131027ef89f6748;p=pspp-builds.git diff --git a/src/data/procedure.c b/src/data/procedure.c index 46a18bb4..98414a2b 100644 --- a/src/data/procedure.c +++ b/src/data/procedure.c @@ -1,20 +1,18 @@ -/* PSPP - computes sample statistics. +/* PSPP - a program for statistical analysis. Copyright (C) 1997-9, 2000, 2006, 2007 Free Software Foundation, Inc. - This program is free software; you can redistribute it and/or - modify it under the terms of the GNU General Public License as - published by the Free Software Foundation; either version 2 of the - License, or (at your option) any later version. + This program is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. - This program is distributed in the hope that it will be useful, but - WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - General Public License for more details. + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. You should have received a copy of the GNU General Public License - along with this program; if not, write to the Free Software - Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA - 02110-1301, USA. */ + along with this program. If not, see <http://www.gnu.org/licenses/>. 
*/ #include @@ -24,6 +22,7 @@ #include #include +#include #include #include #include @@ -33,12 +32,13 @@ #include #include #include -#include #include #include #include #include +#include "xalloc.h" + struct dataset { /* Cases are read from source, their transformation variables are initialized, @@ -56,12 +56,10 @@ struct dataset { struct trns_chain *temporary_trns_chain; struct dictionary *dict; - /* Callback which occurs when a procedure provides a new source for - the dataset */ - replace_source_callback *replace_source ; - - /* Callback which occurs whenever the DICT is replaced by a new one */ - replace_dictionary_callback *replace_dict; + /* Callback which occurs whenever the transformation chain(s) have + been modified */ + transformation_change_callback_func *xform_callback; + void *xform_callback_aux; /* If true, cases are discarded instead of being written to sink. */ @@ -71,9 +69,9 @@ struct dataset { added to. */ struct trns_chain *cur_trns_chain; - /* The compactor used to compact a case, if necessary; + /* The case map used to compact a case, if necessary; otherwise a null pointer. */ - struct dict_compactor *compactor; + struct case_map *compactor; /* Time at which proc was last invoked. */ time_t last_proc_invocation; @@ -84,14 +82,15 @@ struct dataset { struct ccase *lag_cases; /* Lagged cases managed by deque. */ /* Procedure data. */ - enum + enum { - PROC_COMMITTED, - PROC_OPEN, - PROC_CLOSED + PROC_COMMITTED, /* No procedure in progress. */ + PROC_OPEN, /* proc_open called, casereader still open. */ + PROC_CLOSED /* casereader from proc_open destroyed, + but proc_commit not yet called. */ } proc_state; - size_t cases_written; /* Cases output so far. */ + casenumber cases_written; /* Cases output so far. */ bool ok; /* Error status. 
*/ }; /* struct dataset */ @@ -138,7 +137,7 @@ proc_execute (struct dataset *ds) return proc_commit (ds) && ok; } -static struct casereader_class proc_casereader_class; +static const struct casereader_class proc_casereader_class; /* Opens dataset DS for reading cases with proc_read. proc_commit must be called when done. */ @@ -163,15 +162,17 @@ proc_open (struct dataset *ds) ds->permanent_dict = ds->dict; /* Prepare sink. */ - if (!ds->discard_output) + if (!ds->discard_output) { - ds->compactor = (dict_compacting_would_shrink (ds->permanent_dict) - ? dict_make_compactor (ds->permanent_dict) + struct dictionary *pd = ds->permanent_dict; + size_t compacted_value_cnt = dict_count_values (pd, 1u << DC_SCRATCH); + bool should_compact = compacted_value_cnt < dict_get_next_value_idx (pd); + ds->compactor = (should_compact + ? case_map_to_compact_dict (pd, 1u << DC_SCRATCH) : NULL); - ds->sink = autopaging_writer_create (dict_get_compacted_value_cnt ( - ds->permanent_dict)); + ds->sink = autopaging_writer_create (compacted_value_cnt); } - else + else { ds->compactor = NULL; ds->sink = NULL; @@ -193,29 +194,26 @@ proc_open (struct dataset *ds) &proc_casereader_class, ds); } +/* Returns true if a procedure is in progress, that is, if + proc_open has been called but proc_commit has not. */ bool -proc_is_open (const struct dataset *ds) +proc_is_open (const struct dataset *ds) { return ds->proc_state != PROC_COMMITTED; } -/* Reads the next case from dataset DS, which must have been - opened for reading with proc_open. - Returns true if successful, in which case a pointer to the - case is stored in *C. - Return false at end of file or if a read error occurs. In - this case a null pointer is stored in *C. */ +/* "read" function for procedure casereader. 
*/ static bool proc_casereader_read (struct casereader *reader UNUSED, void *ds_, - struct ccase *c) + struct ccase *c) { struct dataset *ds = ds_; enum trns_result retval = TRNS_DROP_CASE; assert (ds->proc_state == PROC_OPEN); - for (;;) + for (;;) { - size_t case_nr; + casenumber case_nr; assert (retval == TRNS_DROP_CASE || retval == TRNS_ERROR); if (retval == TRNS_ERROR) @@ -227,22 +225,21 @@ proc_casereader_read (struct casereader *reader UNUSED, void *ds_, if (!casereader_read (ds->source, c)) return false; case_resize (c, dict_get_next_value_idx (ds->dict)); - caseinit_init_reinit_vars (ds->caseinit, c); - caseinit_init_left_vars (ds->caseinit, c); + caseinit_init_vars (ds->caseinit, c); /* Execute permanent transformations. */ case_nr = ds->cases_written + 1; retval = trns_chain_execute (ds->permanent_trns_chain, TRNS_CONTINUE, - c, &case_nr); + c, case_nr); caseinit_update_left_vars (ds->caseinit, c); - if (retval != TRNS_CONTINUE) + if (retval != TRNS_CONTINUE) { case_destroy (c); - continue; + continue; } - + /* Write case to collection of lagged cases. */ - if (ds->n_lag > 0) + if (ds->n_lag > 0) { while (deque_count (&ds->lag) >= ds->n_lag) case_destroy (&ds->lag_cases[deque_pop_back (&ds->lag)]); @@ -251,14 +248,11 @@ proc_casereader_read (struct casereader *reader UNUSED, void *ds_, /* Write case to replacement active file. 
*/ ds->cases_written++; - if (ds->sink != NULL) + if (ds->sink != NULL) { struct ccase tmp; - if (ds->compactor != NULL) - { - case_create (&tmp, dict_get_compacted_value_cnt (ds->dict)); - dict_compactor_compact (ds->compactor, &tmp, c); - } + if (ds->compactor != NULL) + case_map_execute (ds->compactor, c, &tmp); else case_clone (&tmp, c); casewriter_write (ds->sink, &tmp); @@ -268,7 +262,7 @@ proc_casereader_read (struct casereader *reader UNUSED, void *ds_, if (ds->temporary_trns_chain != NULL) { retval = trns_chain_execute (ds->temporary_trns_chain, TRNS_CONTINUE, - c, &ds->cases_written); + c, ds->cases_written); if (retval != TRNS_CONTINUE) { case_destroy (c); @@ -280,11 +274,7 @@ proc_casereader_read (struct casereader *reader UNUSED, void *ds_, } } -/* Closes dataset DS for reading. - Returns true if successful, false if an I/O error occurred - while reading or closing the data set. - If DS has not been opened, returns true without doing - anything else. */ +/* "destroy" function for procedure casereader. */ static void proc_casereader_destroy (struct casereader *reader, void *ds_) { @@ -309,7 +299,7 @@ proc_casereader_destroy (struct casereader *reader, void *ds_) false, but the replacement active file may still be untainted.) */ bool -proc_commit (struct dataset *ds) +proc_commit (struct dataset *ds) { assert (ds->proc_state == PROC_CLOSED); ds->proc_state = PROC_COMMITTED; @@ -322,27 +312,28 @@ proc_commit (struct dataset *ds) /* Dictionary from before TEMPORARY becomes permanent. */ proc_cancel_temporary_transformations (ds); - if (!ds->discard_output) + if (!ds->discard_output) { /* Finish compacting. */ - if (ds->compactor != NULL) + if (ds->compactor != NULL) { - dict_compactor_destroy (ds->compactor); - dict_compact_values (ds->dict); + case_map_destroy (ds->compactor); ds->compactor = NULL; + + dict_delete_scratch_vars (ds->dict); + dict_compact_values (ds->dict); } - + /* Old data sink becomes new data source. 
*/ - if (ds->sink != NULL) + if (ds->sink != NULL) ds->source = casewriter_make_reader (ds->sink); } - else + else { ds->source = NULL; - ds->discard_output = false; + ds->discard_output = false; } ds->sink = NULL; - if ( ds->replace_source) ds->replace_source (ds->source); caseinit_clear (ds->caseinit); caseinit_mark_as_preinited (ds->caseinit, ds->dict); @@ -352,7 +343,8 @@ proc_commit (struct dataset *ds) return proc_cancel_all_transformations (ds) && ds->ok; } -static struct casereader_class proc_casereader_class = +/* Casereader class for procedure execution. */ +static const struct casereader_class proc_casereader_class = { proc_casereader_read, proc_casereader_destroy, @@ -392,6 +384,10 @@ proc_capture_transformations (struct dataset *ds) assert (ds->temporary_trns_chain == NULL); chain = ds->permanent_trns_chain; ds->cur_trns_chain = ds->permanent_trns_chain = trns_chain_create (); + + if ( ds->xform_callback) + ds->xform_callback (false, ds->xform_callback_aux); + return chain; } @@ -402,6 +398,8 @@ void add_transformation (struct dataset *ds, trns_proc_func *proc, trns_free_func *free, void *aux) { trns_chain_append (ds->cur_trns_chain, NULL, proc, free, aux); + if ( ds->xform_callback) + ds->xform_callback (true, ds->xform_callback_aux); } /* Adds a transformation that processes a case with PROC and @@ -416,6 +414,9 @@ add_transformation_with_finalizer (struct dataset *ds, trns_free_func *free, void *aux) { trns_chain_append (ds->cur_trns_chain, finalize, proc, free, aux); + + if ( ds->xform_callback) + ds->xform_callback (true, ds->xform_callback_aux); } /* Returns the index of the next transformation. 
@@ -450,6 +451,9 @@ proc_start_temporary_transformations (struct dataset *ds) trns_chain_finalize (ds->permanent_trns_chain); ds->temporary_trns_chain = ds->cur_trns_chain = trns_chain_create (); + + if ( ds->xform_callback) + ds->xform_callback (true, ds->xform_callback_aux); } } @@ -486,11 +490,14 @@ proc_cancel_temporary_transformations (struct dataset *ds) dict_destroy (ds->dict); ds->dict = ds->permanent_dict; ds->permanent_dict = NULL; - if (ds->replace_dict) ds->replace_dict (ds->dict); trns_chain_destroy (ds->temporary_trns_chain); ds->temporary_trns_chain = NULL; + if ( ds->xform_callback) + ds->xform_callback (!trns_chain_is_empty (ds->permanent_trns_chain), + ds->xform_callback_aux); + return true; } else @@ -508,23 +515,33 @@ proc_cancel_all_transformations (struct dataset *ds) ok = trns_chain_destroy (ds->temporary_trns_chain) && ok; ds->permanent_trns_chain = ds->cur_trns_chain = trns_chain_create (); ds->temporary_trns_chain = NULL; + if ( ds->xform_callback) + ds->xform_callback (false, ds->xform_callback_aux); + return ok; } /* Initializes procedure handling. */ struct dataset * -create_dataset (replace_source_callback *rps, - replace_dictionary_callback *rds) +create_dataset (void) { struct dataset *ds = xzalloc (sizeof(*ds)); ds->dict = dict_create (); ds->caseinit = caseinit_create (); - ds->replace_source = rps; - ds->replace_dict = rds; proc_cancel_all_transformations (ds); return ds; } + +void +dataset_add_transform_change_callback (struct dataset *ds, + transformation_change_callback_func *cb, + void *aux) +{ + ds->xform_callback = cb; + ds->xform_callback_aux = aux; +} + /* Finishes up procedure handling. 
*/ void destroy_dataset (struct dataset *ds) @@ -533,13 +550,16 @@ destroy_dataset (struct dataset *ds) dict_destroy (ds->dict); caseinit_destroy (ds->caseinit); trns_chain_destroy (ds->permanent_trns_chain); + + if ( ds->xform_callback) + ds->xform_callback (false, ds->xform_callback_aux); free (ds); } /* Causes output from the next procedure to be discarded, instead of being preserved for use as input for the next procedure. */ void -proc_discard_output (struct dataset *ds) +proc_discard_output (struct dataset *ds) { ds->discard_output = true; } @@ -555,10 +575,9 @@ proc_discard_active_file (struct dataset *ds) fh_set_default_handle (NULL); ds->n_lag = 0; - + casereader_destroy (ds->source); ds->source = NULL; - if ( ds->replace_source) ds->replace_source (NULL); proc_cancel_all_transformations (ds); } @@ -568,7 +587,7 @@ proc_discard_active_file (struct dataset *ds) void proc_set_active_file (struct dataset *ds, struct casereader *source, - struct dictionary *dict) + struct dictionary *dict) { assert (ds->proc_state == PROC_COMMITTED); assert (ds->dict != dict); @@ -577,7 +596,6 @@ proc_set_active_file (struct dataset *ds, dict_destroy (ds->dict); ds->dict = dict; - if ( ds->replace_dict) ds->replace_dict (dict); proc_set_active_file_data (ds, source); } @@ -585,11 +603,10 @@ proc_set_active_file (struct dataset *ds, /* Replaces the active file's data by READER without replacing the associated dictionary. */ bool -proc_set_active_file_data (struct dataset *ds, struct casereader *reader) +proc_set_active_file_data (struct dataset *ds, struct casereader *reader) { casereader_destroy (ds->source); ds->source = reader; - if (ds->replace_source) ds->replace_source (reader); caseinit_clear (ds->caseinit); caseinit_mark_as_preinited (ds->caseinit, ds->dict); @@ -600,32 +617,43 @@ proc_set_active_file_data (struct dataset *ds, struct casereader *reader) /* Returns true if an active file data source is available, false otherwise. 
*/ bool -proc_has_active_file (const struct dataset *ds) +proc_has_active_file (const struct dataset *ds) { return ds->source != NULL; } +/* Returns the active file data source from DS, or a null pointer + if DS has no data source, and removes it from DS. */ +struct casereader * +proc_extract_active_file_data (struct dataset *ds) +{ + struct casereader *reader = ds->source; + ds->source = NULL; + + return reader; +} + /* Checks whether DS has a corrupted active file. If so, discards it and returns false. If not, returns true without doing anything. */ bool -dataset_end_of_command (struct dataset *ds) +dataset_end_of_command (struct dataset *ds) { - if (ds->source != NULL) + if (ds->source != NULL) { - if (casereader_error (ds->source)) + if (casereader_error (ds->source)) { proc_discard_active_file (ds); return false; } - else + else { const struct taint *taint = casereader_get_taint (ds->source); taint_reset_successor_taint ((struct taint *) taint); assert (!taint_has_tainted_successor (taint)); } } - return true; + return true; } static trns_proc_func case_limit_trns_proc; @@ -636,10 +664,10 @@ static trns_free_func case_limit_trns_free; static void add_case_limit_trns (struct dataset *ds) { - size_t case_limit = dict_get_case_limit (ds->dict); + casenumber case_limit = dict_get_case_limit (ds->dict); if (case_limit != 0) { - size_t *cases_remaining = xmalloc (sizeof *cases_remaining); + casenumber *cases_remaining = xmalloc (sizeof *cases_remaining); *cases_remaining = case_limit; add_transformation (ds, case_limit_trns_proc, case_limit_trns_free, cases_remaining); @@ -706,7 +734,13 @@ dataset_dict (const struct dataset *ds) return ds->dict; } -void +const struct casereader * +dataset_source (const struct dataset *ds) +{ + return ds->source; +} + +void dataset_need_lag (struct dataset *ds, int n_before) { ds->n_lag = MAX (ds->n_lag, n_before);