/* PSPP - computes sample statistics.
- Copyright (C) 1997-9, 2000, 2006 Free Software Foundation, Inc.
+ Copyright (C) 1997-9, 2000, 2006, 2007 Free Software Foundation, Inc.
This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License as
#include <data/case-source.h>
#include <data/case-sink.h>
#include <data/case.h>
+#include <data/casedeque.h>
#include <data/casefile.h>
#include <data/fastfile.h>
#include <data/dictionary.h>
/* Time at which proc was last invoked. */
time_t last_proc_invocation;
- /* Lag queue. */
+ /* Cases just before ("lagging") the current one. */
int n_lag; /* Number of cases to lag. */
- int lag_count; /* Number of cases in lag_queue so far. */
- int lag_head; /* Index where next case will be added. */
- struct ccase *lag_queue; /* Array of n_lag ccase * elements. */
+ struct casedeque lagged_cases; /* Lagged cases. */
/* Procedure data. */
bool is_open; /* Procedure open? */
static void update_last_proc_invocation (struct dataset *ds);
static void create_trns_case (struct ccase *, struct dictionary *);
static void open_active_file (struct dataset *ds);
-static void lag_case (struct dataset *ds, const struct ccase *c);
static void clear_case (const struct dataset *ds, struct ccase *c);
static bool close_active_file (struct dataset *ds);
\f
/* Returns the last time the data was read. */
time_t
-time_of_last_procedure (struct dataset *ds)
+time_of_last_procedure (struct dataset *ds)
{
if (ds->last_proc_invocation == 0)
update_last_proc_invocation (ds);
start the next case from step 1.
2. Write case to replacement active file.
-
+
3. Execute temporary transformations. If these drop the case,
start the next case from step 1.
-
+
4. Pass case to PROC_FUNC, passing AUX as auxiliary data.
Returns true if successful, false if an I/O error occurred. */
struct multipass_aux_data
{
struct casefile *casefile;
-
+
bool (*proc_func) (const struct casefile *, void *aux);
void *aux;
};
if ( proc_close (ds) && ok )
{
- if ( ds->replace_source )
- ds->replace_source (ds->proc_source);
return true;
}
&ds->trns_case, &case_nr);
if (retval != TRNS_CONTINUE)
continue;
-
- /* Write case to LAG queue. */
- if (ds->n_lag)
- lag_case (ds, &ds->trns_case);
+
+ /* Write case to collection of lagged cases. */
+ if (ds->n_lag > 0)
+ {
+ while (casedeque_count (&ds->lagged_cases) >= ds->n_lag)
+ case_destroy (casedeque_pop_back (&ds->lagged_cases));
+ case_clone (casedeque_push_front (&ds->lagged_cases),
+ &ds->trns_case);
+ }
/* Write case to replacement active file. */
ds->cases_written++;
else
ds->proc_sink->class->write (ds->proc_sink, &ds->trns_case);
}
-
+
/* Execute temporary transformations. */
if (ds->temporary_trns_chain != NULL)
{
break;
}
ds->ok = free_case_source (ds->proc_source) && ds->ok;
- ds->proc_source = NULL;
+ proc_set_source (ds, NULL);
case_destroy (&ds->sink_case);
case_destroy (&ds->trns_case);
if (ds->proc_sink->class->open != NULL)
ds->proc_sink->class->open (ds->proc_sink);
- /* Allocate memory for lag queue. */
- if (ds->n_lag > 0)
- {
- int i;
-
- ds->lag_count = 0;
- ds->lag_head = 0;
- ds->lag_queue = xnmalloc (ds->n_lag, sizeof *ds->lag_queue);
- for (i = 0; i < ds->n_lag; i++)
- case_nullify (&ds->lag_queue[i]);
- }
-}
-
-/* Add C to the lag queue. */
-static void
-lag_case (struct dataset *ds, const struct ccase *c)
-{
- if (ds->lag_count < ds->n_lag)
- ds->lag_count++;
- case_destroy (&ds->lag_queue[ds->lag_head]);
- case_clone (&ds->lag_queue[ds->lag_head], c);
- if (++ds->lag_head >= ds->n_lag)
- ds->lag_head = 0;
+ /* Allocate memory for lagged cases. */
+ casedeque_init (&ds->lagged_cases, ds->n_lag);
}
/* Clears the variables in C that need to be cleared between
{
size_t var_cnt = dict_get_var_cnt (ds->dict);
size_t i;
-
+
for (i = 0; i < var_cnt; i++)
{
struct variable *v = dict_get_var (ds->dict, i);
static bool
close_active_file (struct dataset *ds)
{
- /* Free memory for lag queue, and turn off lagging. */
- if (ds->n_lag > 0)
- {
- int i;
-
- for (i = 0; i < ds->n_lag; i++)
- case_destroy (&ds->lag_queue[i]);
- free (ds->lag_queue);
- ds->n_lag = 0;
- }
-
+ /* Free memory for lagged cases. */
+ while (!casedeque_is_empty (&ds->lagged_cases))
+ case_destroy (casedeque_pop_back (&ds->lagged_cases));
+ casedeque_destroy (&ds->lagged_cases);
+
/* Dictionary from before TEMPORARY becomes permanent. */
proc_cancel_temporary_transformations (ds);
dict_compact_values (ds->dict);
ds->compactor = NULL;
}
-
+
/* Old data sink becomes new data source. */
if (ds->proc_sink->class->make_source != NULL)
- ds->proc_source = ds->proc_sink->class->make_source (ds->proc_sink);
+ proc_set_source (ds, ds->proc_sink->class->make_source (ds->proc_sink) );
free_case_sink (ds->proc_sink);
ds->proc_sink = NULL;
struct ccase *
lagged_case (const struct dataset *ds, int n_before)
{
- assert (n_before >= 1 );
+ assert (n_before >= 1);
assert (n_before <= ds->n_lag);
- if (n_before <= ds->lag_count)
- {
- int index = ds->lag_head - n_before;
- if (index < 0)
- index += ds->n_lag;
- return &ds->lag_queue[index];
- }
+ if (n_before <= casedeque_count (&ds->lagged_cases))
+ return casedeque_front (&ds->lagged_cases, n_before - 1);
else
return NULL;
}
If SPLIT FILE is not in effect, then there is one break group
(if the active file is nonempty), and BEGIN_FUNC and END_FUNC
will be called once.
-
+
Returns true if successful, false if an I/O error occurred. */
bool
procedure_with_splits (struct dataset *ds,
multipass_split_output (struct multipass_split_aux_data *aux, const struct dataset *ds)
{
bool ok;
-
+
assert (aux->casefile != NULL);
ok = aux->split (&aux->prev_case, aux->casefile, aux->func_aux, ds);
casefile_destroy (aux->casefile);
fh_set_default_handle (NULL);
ds->n_lag = 0;
-
- free_case_source (ds->proc_source);
- ds->proc_source = NULL;
- if ( ds->replace_source )
- ds->replace_source (ds->proc_source);
+ free_case_source (ds->proc_source);
+ proc_set_source (ds, NULL);
proc_cancel_all_transformations (ds);
}
proc_capture_transformations (struct dataset *ds)
{
struct trns_chain *chain;
-
+
assert (ds->temporary_trns_chain == NULL);
chain = ds->permanent_trns_chain;
ds->cur_trns_chain = ds->permanent_trns_chain = trns_chain_create ();
add_case_limit_trns (ds);
ds->permanent_dict = dict_clone (ds->dict);
+
trns_chain_finalize (ds->permanent_trns_chain);
ds->temporary_trns_chain = ds->cur_trns_chain = trns_chain_create ();
}
{
if (proc_in_temporary_transformations (ds))
{
- dict_destroy (ds->dict);
- ds->dict = ds->permanent_dict;
+ dataset_set_dict (ds, ds->permanent_dict);
ds->permanent_dict = NULL;
trns_chain_destroy (ds->temporary_trns_chain);
void
proc_set_source (struct dataset *ds, struct case_source *source)
{
- assert (ds->proc_source == NULL);
ds->proc_source = source;
+
+ if ( ds->replace_source )
+ ds->replace_source (ds->proc_source);
}
/* Returns true if a source for the next procedure has been
assert (!proc_in_temporary_transformations (ds));
casefile = storage_source_decapsulate (ds->proc_source);
- ds->proc_source = NULL;
+ proc_set_source (ds, NULL);
return casefile;
}
}
+/* Set or replace dataset DS's dictionary with DICT.
+ The old dictionary is destroyed */
void
dataset_set_dict (struct dataset *ds, struct dictionary *dict)
{
+ struct dictionary *old_dict = ds->dict;
+
dict_copy_callbacks (dict, ds->dict);
ds->dict = dict;
if ( ds->replace_dict )
ds->replace_dict (dict);
-}
-int
-dataset_n_lag (const struct dataset *ds)
-{
- return ds->n_lag;
+ dict_destroy (old_dict);
}
-void
-dataset_set_n_lag (struct dataset *ds, int n_lag)
+void
+dataset_need_lag (struct dataset *ds, int n_before)
{
- ds->n_lag = n_lag;
+ ds->n_lag = MAX (ds->n_lag, n_before);
}
-
struct casefile_factory *
dataset_get_casefile_factory (const struct dataset *ds)
{