X-Git-Url: https://pintos-os.org/cgi-bin/gitweb.cgi?a=blobdiff_plain;f=src%2Fdata%2Fprocedure.c;h=a45f497afd1268012a77064a5d49b33839c974f8;hb=ac6bc481cef04a4f69a55075d09927c8617be14b;hp=60be6b47efac3440e2173436ddeb9b7bf4f70eed;hpb=6e352378b703f57313fdd0f628b99b19ff25d055;p=pspp-builds.git diff --git a/src/data/procedure.c b/src/data/procedure.c index 60be6b47..a45f497a 100644 --- a/src/data/procedure.c +++ b/src/data/procedure.c @@ -1,5 +1,5 @@ /* PSPP - a program for statistical analysis. - Copyright (C) 1997-9, 2000, 2006, 2007 Free Software Foundation, Inc. + Copyright (C) 1997-9, 2000, 2006, 2007, 2009, 2010 Free Software Foundation, Inc. This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by @@ -16,27 +16,32 @@ #include +#include "data/procedure.h" + #include #include #include #include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include - +#include "data/case.h" +#include "data/case-map.h" +#include "data/caseinit.h" +#include "data/casereader.h" +#include "data/casereader-provider.h" +#include "data/casereader-shim.h" +#include "data/casewriter.h" +#include "data/dictionary.h" +#include "data/file-handle-def.h" +#include "data/transformations.h" +#include "data/variable.h" +#include "libpspp/deque.h" +#include "libpspp/misc.h" +#include "libpspp/str.h" +#include "libpspp/taint.h" +#include "libpspp/i18n.h" + +#include "gl/minmax.h" +#include "gl/xalloc.h" struct dataset { /* Cases are read from source, @@ -55,13 +60,6 @@ struct dataset { struct trns_chain *temporary_trns_chain; struct dictionary *dict; - /* Callback which occurs when a procedure provides a new source for - the dataset */ - replace_source_callback *replace_source ; - - /* Callback which occurs whenever the DICT is replaced by a new one */ - replace_dictionary_callback *replace_dict; - /* Callback which occurs whenever the transformation chain(s) have been modified */ transformation_change_callback_func *xform_callback; @@ -75,9 +73,9 @@ struct dataset { added to. */ struct trns_chain *cur_trns_chain; - /* The compactor used to compact a case, if necessary; + /* The case map used to compact a case, if necessary; otherwise a null pointer. */ - struct dict_compactor *compactor; + struct case_map *compactor; /* Time at which proc was last invoked. */ time_t last_proc_invocation; @@ -85,7 +83,7 @@ struct dataset { /* Cases just before ("lagging") the current one. */ int n_lag; /* Number of cases to lag. */ struct deque lag; /* Deque of lagged cases. */ - struct ccase *lag_cases; /* Lagged cases managed by deque. */ + struct ccase **lag_cases; /* Lagged cases managed by deque. */ /* Procedure data. */ enum @@ -96,8 +94,15 @@ struct dataset { but proc_commit not yet called. */ } proc_state; - casenumber cases_written; /* Cases output so far. */ - bool ok; /* Error status. */ + casenumber cases_written; /* Cases output so far. */ + bool ok; /* Error status. */ + struct casereader_shim *shim; /* Shim on proc_open() casereader. */ + + void (*callback) (void *); /* Callback for when the dataset changes */ + void *cb_data; + + /* Default encoding for reading syntax files. */ + char *syntax_encoding; }; /* struct dataset */ @@ -105,9 +110,36 @@ static void add_case_limit_trns (struct dataset *ds); static void add_filter_trns (struct dataset *ds); static void update_last_proc_invocation (struct dataset *ds); + +static void +dataset_set_unsaved (const struct dataset *ds) +{ + if (ds->callback) ds->callback (ds->cb_data); +} + /* Public functions. */ +void +dataset_set_callback (struct dataset *ds, void (*cb) (void *), void *cb_data) +{ + ds->callback = cb; + ds->cb_data = cb_data; +} + +void +dataset_set_default_syntax_encoding (struct dataset *ds, const char *encoding) +{ + free (ds->syntax_encoding); + ds->syntax_encoding = xstrdup (encoding); +} + +const char * +dataset_get_default_syntax_encoding (const struct dataset *ds) +{ + return ds->syntax_encoding; +} + /* Returns the last time the data was read. */ time_t time_of_last_procedure (struct dataset *ds) @@ -143,13 +175,19 @@ proc_execute (struct dataset *ds) return proc_commit (ds) && ok; } -static struct casereader_class proc_casereader_class; +static const struct casereader_class proc_casereader_class; + +/* Opens dataset DS for reading cases with proc_read. If FILTER is true, then + cases filtered out with FILTER BY will not be included in the casereader + (which is usually desirable). If FILTER is false, all cases will be + included regardless of FILTER BY settings. -/* Opens dataset DS for reading cases with proc_read. proc_commit must be called when done. */ struct casereader * -proc_open (struct dataset *ds) +proc_open_filtering (struct dataset *ds, bool filter) { + struct casereader *reader; + assert (ds->source != NULL); assert (ds->proc_state == PROC_COMMITTED); @@ -159,7 +197,8 @@ proc_open (struct dataset *ds) /* Finish up the collection of transformations. */ add_case_limit_trns (ds); - add_filter_trns (ds); + if (filter) + add_filter_trns (ds); trns_chain_finalize (ds->cur_trns_chain); /* Make permanent_dict refer to the dictionary right before @@ -172,11 +211,19 @@ proc_open (struct dataset *ds) { struct dictionary *pd = ds->permanent_dict; size_t compacted_value_cnt = dict_count_values (pd, 1u << DC_SCRATCH); - bool should_compact = compacted_value_cnt < dict_get_next_value_idx (pd); - ds->compactor = (should_compact - ? dict_make_compactor (pd, 1u << DC_SCRATCH) - : NULL); - ds->sink = autopaging_writer_create (compacted_value_cnt); + if (compacted_value_cnt < dict_get_next_value_idx (pd)) + { + struct caseproto *compacted_proto; + compacted_proto = dict_get_compacted_proto (pd, 1u << DC_SCRATCH); + ds->compactor = case_map_to_compact_dict (pd, 1u << DC_SCRATCH); + ds->sink = autopaging_writer_create (compacted_proto); + caseproto_unref (compacted_proto); + } + else + { + ds->compactor = NULL; + ds->sink = autopaging_writer_create (dict_get_proto (pd)); + } } else { @@ -194,10 +241,27 @@ proc_open (struct dataset *ds) /* FIXME: use taint in dataset in place of `ok'? */ /* FIXME: for trivial cases we can just return a clone of ds->source? */ - return casereader_create_sequential (NULL, - dict_get_next_value_idx (ds->dict), - CASENUMBER_MAX, - &proc_casereader_class, ds); + + /* Create casereader and insert a shim on top. The shim allows us to + arbitrarily extend the casereader's lifetime, by slurping the cases into + the shim's buffer in proc_commit(). That is especially useful when output + table_items are generated directly from the procedure casereader (e.g. by + the LIST procedure) when we are using an output driver that keeps a + reference to the output items passed to it (e.g. the GUI output driver in + PSPPIRE). */ + reader = casereader_create_sequential (NULL, dict_get_proto (ds->dict), + CASENUMBER_MAX, + &proc_casereader_class, ds); + ds->shim = casereader_shim_insert (reader); + return reader; +} + +/* Opens dataset DS for reading cases with proc_read. + proc_commit must be called when done. */ +struct casereader * +proc_open (struct dataset *ds) +{ + return proc_open_filtering (ds, true); } /* Returns true if a procedure is in progress, that is, if @@ -209,15 +273,15 @@ proc_is_open (const struct dataset *ds) } /* "read" function for procedure casereader. */ -static bool -proc_casereader_read (struct casereader *reader UNUSED, void *ds_, - struct ccase *c) +static struct ccase * +proc_casereader_read (struct casereader *reader UNUSED, void *ds_) { struct dataset *ds = ds_; enum trns_result retval = TRNS_DROP_CASE; + struct ccase *c; assert (ds->proc_state == PROC_OPEN); - for (;;) + for (; ; case_unref (c)) { casenumber case_nr; @@ -225,61 +289,47 @@ proc_casereader_read (struct casereader *reader UNUSED, void *ds_, if (retval == TRNS_ERROR) ds->ok = false; if (!ds->ok) - return false; + return NULL; /* Read a case from source. */ - if (!casereader_read (ds->source, c)) - return false; - case_resize (c, dict_get_next_value_idx (ds->dict)); + c = casereader_read (ds->source); + if (c == NULL) + return NULL; + c = case_unshare_and_resize (c, dict_get_proto (ds->dict)); caseinit_init_vars (ds->caseinit, c); /* Execute permanent transformations. */ case_nr = ds->cases_written + 1; retval = trns_chain_execute (ds->permanent_trns_chain, TRNS_CONTINUE, - c, case_nr); + &c, case_nr); caseinit_update_left_vars (ds->caseinit, c); if (retval != TRNS_CONTINUE) - { - case_destroy (c); - continue; - } + continue; /* Write case to collection of lagged cases. */ if (ds->n_lag > 0) { while (deque_count (&ds->lag) >= ds->n_lag) - case_destroy (&ds->lag_cases[deque_pop_back (&ds->lag)]); - case_clone (&ds->lag_cases[deque_push_front (&ds->lag)], c); + case_unref (ds->lag_cases[deque_pop_back (&ds->lag)]); + ds->lag_cases[deque_push_front (&ds->lag)] = case_ref (c); } /* Write case to replacement active file. */ ds->cases_written++; if (ds->sink != NULL) - { - struct ccase tmp; - if (ds->compactor != NULL) - { - case_create (&tmp, casewriter_get_value_cnt (ds->sink)); - dict_compactor_compact (ds->compactor, &tmp, c); - } - else - case_clone (&tmp, c); - casewriter_write (ds->sink, &tmp); - } + casewriter_write (ds->sink, + case_map_execute (ds->compactor, case_ref (c))); /* Execute temporary transformations. */ if (ds->temporary_trns_chain != NULL) { retval = trns_chain_execute (ds->temporary_trns_chain, TRNS_CONTINUE, - c, ds->cases_written); + &c, ds->cases_written); if (retval != TRNS_CONTINUE) - { - case_destroy (c); - continue; - } + continue; } - return true; + return c; } } @@ -288,13 +338,18 @@ static void proc_casereader_destroy (struct casereader *reader, void *ds_) { struct dataset *ds = ds_; - struct ccase c; + struct ccase *c; + + /* We are always the subreader for a casereader_buffer, so if we're being + destroyed then it's because the casereader_buffer has read all the cases + that it ever will. */ + ds->shim = NULL; /* Make sure transformations happen for every input case, in case they have side effects, and ensure that the replacement active file gets all the cases it should. */ - while (casereader_read (reader, &c)) - case_destroy (&c); + while ((c = casereader_read (reader)) != NULL) + case_unref (c); ds->proc_state = PROC_CLOSED; ds->ok = casereader_destroy (ds->source) && ds->ok; @@ -310,12 +365,17 @@ proc_casereader_destroy (struct casereader *reader, void *ds_) bool proc_commit (struct dataset *ds) { + if (ds->shim != NULL) + casereader_shim_slurp (ds->shim); + assert (ds->proc_state == PROC_CLOSED); ds->proc_state = PROC_COMMITTED; + dataset_set_unsaved (ds); + /* Free memory for lagged cases. */ while (!deque_is_empty (&ds->lag)) - case_destroy (&ds->lag_cases[deque_pop_back (&ds->lag)]); + case_unref (ds->lag_cases[deque_pop_back (&ds->lag)]); free (ds->lag_cases); /* Dictionary from before TEMPORARY becomes permanent. */ @@ -326,7 +386,7 @@ proc_commit (struct dataset *ds) /* Finish compacting. */ if (ds->compactor != NULL) { - dict_compactor_destroy (ds->compactor); + case_map_destroy (ds->compactor); ds->compactor = NULL; dict_delete_scratch_vars (ds->dict); @@ -343,7 +403,6 @@ proc_commit (struct dataset *ds) ds->discard_output = false; } ds->sink = NULL; - if ( ds->replace_source) ds->replace_source (ds->source); caseinit_clear (ds->caseinit); caseinit_mark_as_preinited (ds->caseinit, ds->dict); @@ -354,7 +413,7 @@ proc_commit (struct dataset *ds) } /* Casereader class for procedure execution. */ -static struct casereader_class proc_casereader_class = +static const struct casereader_class proc_casereader_class = { proc_casereader_read, proc_casereader_destroy, @@ -371,14 +430,14 @@ update_last_proc_invocation (struct dataset *ds) /* Returns a pointer to the lagged case from N_BEFORE cases before the current one, or NULL if there haven't been that many cases yet. */ -struct ccase * +const struct ccase * lagged_case (const struct dataset *ds, int n_before) { assert (n_before >= 1); assert (n_before <= ds->n_lag); if (n_before <= deque_count (&ds->lag)) - return &ds->lag_cases[deque_front (&ds->lag, n_before - 1)]; + return ds->lag_cases[deque_front (&ds->lag, n_before - 1)]; else return NULL; } @@ -500,7 +559,6 @@ proc_cancel_temporary_transformations (struct dataset *ds) dict_destroy (ds->dict); ds->dict = ds->permanent_dict; ds->permanent_dict = NULL; - if (ds->replace_dict) ds->replace_dict (ds->dict); trns_chain_destroy (ds->temporary_trns_chain); ds->temporary_trns_chain = NULL; @@ -532,16 +590,30 @@ proc_cancel_all_transformations (struct dataset *ds) return ok; } + +static void +dict_callback (struct dictionary *d UNUSED, void *ds_) +{ + struct dataset *ds = ds_; + dataset_set_unsaved (ds); +} + /* Initializes procedure handling. */ struct dataset * -create_dataset (transformation_change_callback_func *cb, void *aux) +create_dataset (void) { struct dataset *ds = xzalloc (sizeof(*ds)); ds->dict = dict_create (); + + dict_set_change_callback (ds->dict, dict_callback, ds); + + dict_set_encoding (ds->dict, get_default_encoding ()); + ds->caseinit = caseinit_create (); - ds->xform_callback = cb; - ds->xform_callback_aux = aux; proc_cancel_all_transformations (ds); + + ds->syntax_encoding = xstrdup ("Auto"); + return ds; } @@ -566,6 +638,8 @@ destroy_dataset (struct dataset *ds) if ( ds->xform_callback) ds->xform_callback (false, ds->xform_callback_aux); + + free (ds->syntax_encoding); free (ds); } @@ -591,7 +665,6 @@ proc_discard_active_file (struct dataset *ds) casereader_destroy (ds->source); ds->source = NULL; - if ( ds->replace_source) ds->replace_source (NULL); proc_cancel_all_transformations (ds); } @@ -610,7 +683,7 @@ proc_set_active_file (struct dataset *ds, dict_destroy (ds->dict); ds->dict = dict; - if ( ds->replace_dict) ds->replace_dict (dict); + dict_set_change_callback (ds->dict, dict_callback, ds); proc_set_active_file_data (ds, source); } @@ -622,7 +695,6 @@ proc_set_active_file_data (struct dataset *ds, struct casereader *reader) { casereader_destroy (ds->source); ds->source = reader; - if (ds->replace_source) ds->replace_source (reader); caseinit_clear (ds->caseinit); caseinit_mark_as_preinited (ds->caseinit, ds->dict); @@ -638,6 +710,17 @@ proc_has_active_file (const struct dataset *ds) return ds->source != NULL; } +/* Returns the active file data source from DS, or a null pointer + if DS has no data source, and removes it from DS. */ +struct casereader * +proc_extract_active_file_data (struct dataset *ds) +{ + struct casereader *reader = ds->source; + ds->source = NULL; + + return reader; +} + /* Checks whether DS has a corrupted active file. If so, discards it and returns false. If not, returns true without doing anything. */ @@ -654,7 +737,7 @@ dataset_end_of_command (struct dataset *ds) else { const struct taint *taint = casereader_get_taint (ds->source); - taint_reset_successor_taint ((struct taint *) taint); + taint_reset_successor_taint (CONST_CAST (struct taint *, taint)); assert (!taint_has_tainted_successor (taint)); } } @@ -669,10 +752,10 @@ static trns_free_func case_limit_trns_free; static void add_case_limit_trns (struct dataset *ds) { - size_t case_limit = dict_get_case_limit (ds->dict); + casenumber case_limit = dict_get_case_limit (ds->dict); if (case_limit != 0) { - size_t *cases_remaining = xmalloc (sizeof *cases_remaining); + casenumber *cases_remaining = xmalloc (sizeof *cases_remaining); *cases_remaining = case_limit; add_transformation (ds, case_limit_trns_proc, case_limit_trns_free, cases_remaining); @@ -684,7 +767,7 @@ add_case_limit_trns (struct dataset *ds) *CASES_REMAINING. */ static int case_limit_trns_proc (void *cases_remaining_, - struct ccase *c UNUSED, casenumber case_nr UNUSED) + struct ccase **c UNUSED, casenumber case_nr UNUSED) { size_t *cases_remaining = cases_remaining_; if (*cases_remaining > 0) @@ -723,11 +806,11 @@ add_filter_trns (struct dataset *ds) /* FILTER transformation. */ static int filter_trns_proc (void *filter_var_, - struct ccase *c UNUSED, casenumber case_nr UNUSED) + struct ccase **c UNUSED, casenumber case_nr UNUSED) { struct variable *filter_var = filter_var_; - double f = case_num (c, filter_var); + double f = case_num (*c, filter_var); return (f != 0.0 && !var_is_num_missing (filter_var, f, MV_ANY) ? TRNS_CONTINUE : TRNS_DROP_CASE); }