X-Git-Url: https://pintos-os.org/cgi-bin/gitweb.cgi?a=blobdiff_plain;f=src%2Fdata%2Fprocedure.c;h=7a9b432132c09158ce849147801f4a17bc4f64a4;hb=41a3a550334da96a9b4e5e089ad1768acf288092;hp=4354aaef8a624e98221f850dbf40d5f9627b2da7;hpb=68af3306969829d17a05cfab5c9d46cb920b7607;p=pspp-builds.git diff --git a/src/data/procedure.c b/src/data/procedure.c index 4354aaef..7a9b4321 100644 --- a/src/data/procedure.c +++ b/src/data/procedure.c @@ -35,6 +35,7 @@ #include #include #include +#include #include #include @@ -75,11 +76,10 @@ struct dataset { /* Time at which proc was last invoked. */ time_t last_proc_invocation; - /* Lag queue. */ + /* Cases just before ("lagging") the current one. */ int n_lag; /* Number of cases to lag. */ - int lag_count; /* Number of cases in lag_queue so far. */ - int lag_head; /* Index where next case will be added. */ - struct ccase *lag_queue; /* Array of n_lag ccase * elements. */ + struct deque lag; /* Deque of lagged cases. */ + struct ccase *lag_cases; /* Lagged cases managed by deque. */ /* Procedure data. */ bool is_open; /* Procedure open? */ @@ -100,7 +100,6 @@ static bool internal_procedure (struct dataset *ds, case_func *, static void update_last_proc_invocation (struct dataset *ds); static void create_trns_case (struct ccase *, struct dictionary *); static void open_active_file (struct dataset *ds); -static void lag_case (struct dataset *ds, const struct ccase *c); static void clear_case (const struct dataset *ds, struct ccase *c); static bool close_active_file (struct dataset *ds); @@ -233,8 +232,6 @@ internal_procedure (struct dataset *ds, case_func *proc, if ( proc_close (ds) && ok ) { - if ( ds->replace_source ) - ds->replace_source (ds->proc_source); return true; } @@ -296,9 +293,14 @@ proc_read (struct dataset *ds, struct ccase **c) if (retval != TRNS_CONTINUE) continue; - /* Write case to LAG queue. */ - if (ds->n_lag) - lag_case (ds, &ds->trns_case); + /* Write case to collection of lagged cases. */ + if (ds->n_lag > 0) + { + while (deque_count (&ds->lag) >= ds->n_lag) + case_destroy (&ds->lag_cases[deque_pop_back (&ds->lag)]); + case_clone (&ds->lag_cases[deque_push_front (&ds->lag)], + &ds->trns_case); + } /* Write case to replacement active file. */ ds->cases_written++; @@ -347,7 +349,7 @@ proc_close (struct dataset *ds) break; } ds->ok = free_case_source (ds->proc_source) && ds->ok; - ds->proc_source = NULL; + proc_set_source (ds, NULL); case_destroy (&ds->sink_case); case_destroy (&ds->trns_case); @@ -418,29 +420,8 @@ open_active_file (struct dataset *ds) if (ds->proc_sink->class->open != NULL) ds->proc_sink->class->open (ds->proc_sink); - /* Allocate memory for lag queue. */ - if (ds->n_lag > 0) - { - int i; - - ds->lag_count = 0; - ds->lag_head = 0; - ds->lag_queue = xnmalloc (ds->n_lag, sizeof *ds->lag_queue); - for (i = 0; i < ds->n_lag; i++) - case_nullify (&ds->lag_queue[i]); - } -} - -/* Add C to the lag queue. */ -static void -lag_case (struct dataset *ds, const struct ccase *c) -{ - if (ds->lag_count < ds->n_lag) - ds->lag_count++; - case_destroy (&ds->lag_queue[ds->lag_head]); - case_clone (&ds->lag_queue[ds->lag_head], c); - if (++ds->lag_head >= ds->n_lag) - ds->lag_head = 0; + /* Allocate memory for lagged cases. */ + ds->lag_cases = deque_init (&ds->lag, ds->n_lag, sizeof *ds->lag_cases); } /* Clears the variables in C that need to be cleared between @@ -468,16 +449,10 @@ clear_case (const struct dataset *ds, struct ccase *c) static bool close_active_file (struct dataset *ds) { - /* Free memory for lag queue, and turn off lagging. */ - if (ds->n_lag > 0) - { - int i; - - for (i = 0; i < ds->n_lag; i++) - case_destroy (&ds->lag_queue[i]); - free (ds->lag_queue); - ds->n_lag = 0; - } + /* Free memory for lagged cases. */ + while (!deque_is_empty (&ds->lag)) + case_destroy (&ds->lag_cases[deque_pop_back (&ds->lag)]); + free (ds->lag_cases); /* Dictionary from before TEMPORARY becomes permanent. */ proc_cancel_temporary_transformations (ds); @@ -492,7 +467,7 @@ close_active_file (struct dataset *ds) /* Old data sink becomes new data source. */ if (ds->proc_sink->class->make_source != NULL) - ds->proc_source = ds->proc_sink->class->make_source (ds->proc_sink); + proc_set_source (ds, ds->proc_sink->class->make_source (ds->proc_sink) ); free_case_sink (ds->proc_sink); ds->proc_sink = NULL; @@ -506,16 +481,11 @@ close_active_file (struct dataset *ds) struct ccase * lagged_case (const struct dataset *ds, int n_before) { - assert (n_before >= 1 ); + assert (n_before >= 1); assert (n_before <= ds->n_lag); - if (n_before <= ds->lag_count) - { - int index = ds->lag_head - n_before; - if (index < 0) - index += ds->n_lag; - return &ds->lag_queue[index]; - } + if (n_before <= deque_count (&ds->lag)) + return &ds->lag_cases[deque_front (&ds->lag, n_before - 1)]; else return NULL; } @@ -727,10 +697,7 @@ discard_variables (struct dataset *ds) ds->n_lag = 0; free_case_source (ds->proc_source); - ds->proc_source = NULL; - if ( ds->replace_source ) - ds->replace_source (ds->proc_source); - + proc_set_source (ds, NULL); proc_cancel_all_transformations (ds); } @@ -902,8 +869,10 @@ proc_set_sink (struct dataset *ds, struct case_sink *sink) void proc_set_source (struct dataset *ds, struct case_source *source) { - assert (ds->proc_source == NULL); ds->proc_source = source; + + if ( ds->replace_source ) + ds->replace_source (ds->proc_source); } /* Returns true if a source for the next procedure has been @@ -931,7 +900,7 @@ proc_capture_output (struct dataset *ds) assert (!proc_in_temporary_transformations (ds)); casefile = storage_source_decapsulate (ds->proc_source); - ds->proc_source = NULL; + proc_set_source (ds, NULL); return casefile; } @@ -1031,19 +1000,12 @@ dataset_set_dict (struct dataset *ds, struct dictionary *dict) dict_destroy (old_dict); } -int -dataset_n_lag (const struct dataset *ds) -{ - return ds->n_lag; -} - -void -dataset_set_n_lag (struct dataset *ds, int n_lag) +void +dataset_need_lag (struct dataset *ds, int n_before) { - ds->n_lag = n_lag; + ds->n_lag = MAX (ds->n_lag, n_before); } - struct casefile_factory * dataset_get_casefile_factory (const struct dataset *ds) {