/* PSPP - computes sample statistics.
- Copyright (C) 1997-9, 2000, 2006 Free Software Foundation, Inc.
+ Copyright (C) 1997-9, 2000, 2006, 2007 Free Software Foundation, Inc.
This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License as
#include <data/case-source.h>
#include <data/case-sink.h>
#include <data/case.h>
+#include <data/casedeque.h>
#include <data/casefile.h>
#include <data/fastfile.h>
#include <data/dictionary.h>
/* An abstract factory which creates casefiles */
struct casefile_factory *cf_factory;
+ /* Callback which occurs when a procedure provides a new source for
+ the dataset */
+ replace_source_callback *replace_source ;
+
+ /* Callback which occurs whenever the DICT is replaced by a new one */
+ replace_dictionary_callback *replace_dict;
+
/* Cases are read from proc_source,
pass through permanent_trns_chain (which transforms them into
the format described by permanent_dict),
/* Time at which proc was last invoked. */
time_t last_proc_invocation;
- /* Lag queue. */
+ /* Cases just before ("lagging") the current one. */
int n_lag; /* Number of cases to lag. */
- int lag_count; /* Number of cases in lag_queue so far. */
- int lag_head; /* Index where next case will be added. */
- struct ccase *lag_queue; /* Array of n_lag ccase * elements. */
+ struct casedeque lagged_cases; /* Lagged cases. */
/* Procedure data. */
bool is_open; /* Procedure open? */
static void update_last_proc_invocation (struct dataset *ds);
static void create_trns_case (struct ccase *, struct dictionary *);
static void open_active_file (struct dataset *ds);
-static void lag_case (struct dataset *ds, const struct ccase *c);
static void clear_case (const struct dataset *ds, struct ccase *c);
static bool close_active_file (struct dataset *ds);
\f
/* Returns the last time the data was read. */
time_t
-time_of_last_procedure (struct dataset *ds)
+time_of_last_procedure (struct dataset *ds)
{
if (ds->last_proc_invocation == 0)
update_last_proc_invocation (ds);
start the next case from step 1.
2. Write case to replacement active file.
-
+
3. Execute temporary transformations. If these drop the case,
start the next case from step 1.
-
+
4. Pass case to PROC_FUNC, passing AUX as auxiliary data.
Returns true if successful, false if an I/O error occurred. */
\f
/* Multipass procedure. */
/* Auxiliary data handed to the per-case and end-of-file callbacks
   used by multipass_procedure(). */
-struct multipass_aux_data
+struct multipass_aux_data
{
struct casefile *casefile; /* Cases appended by multipass_case_func(). */
-
+
bool (*proc_func) (const struct casefile *, void *aux); /* Client callback; may be NULL. */
void *aux; /* Client's auxiliary data for proc_func. */
};
/* Case processing function for multipass_procedure(). */
static bool
-multipass_case_func (const struct ccase *c, void *aux_data_, const struct dataset *ds UNUSED)
+multipass_case_func (const struct ccase *c, void *aux_data_, const struct dataset *ds UNUSED)
{
struct multipass_aux_data *aux_data = aux_data_;
return casefile_append (aux_data->casefile, c);
/* End-of-file function for multipass_procedure(). */
static bool
-multipass_end_func (void *aux_data_, const struct dataset *ds UNUSED)
+multipass_end_func (void *aux_data_, const struct dataset *ds UNUSED)
{
struct multipass_aux_data *aux_data = aux_data_;
return (aux_data->proc_func == NULL
The entire active file is passed to PROC_FUNC, with the given
AUX as auxiliary data, as a unit. */
bool
-multipass_procedure (struct dataset *ds, casefile_func *proc_func, void *aux)
+multipass_procedure (struct dataset *ds, casefile_func *proc_func, void *aux)
{
struct multipass_aux_data aux_data;
bool ok;
return ok;
}
\f
+
/* Procedure implementation. */
/* Executes a procedure.
static bool
internal_procedure (struct dataset *ds, case_func *proc,
end_func *end,
- void *aux)
+ void *aux)
{
struct ccase *c;
bool ok = true;
-
+
proc_open (ds);
/* Stop reading as soon as PROC reports failure or proc_read()
   returns false. */
while (ok && proc_read (ds, &c))
if (proc != NULL)
ok = proc (c, aux, ds) && ok;
/* END, when supplied, is called even if an earlier step failed; a
   failure already recorded in OK is preserved. */
if (end != NULL)
ok = end (aux, ds) && ok;
- return proc_close (ds) && ok;
+
/* proc_close() is evaluated before OK is tested, so it is called on
   the failure path as well.  The value returned is the same one the
   removed single-line return produced. */
+ if ( proc_close (ds) && ok )
+ {
+
+ return true;
+ }
+
+ return false;
}
/* Opens dataset DS for reading cases with proc_read.
Return false at end of file or if a read error occurs. In
this case a null pointer is stored in *C. */
bool
-proc_read (struct dataset *ds, struct ccase **c)
+proc_read (struct dataset *ds, struct ccase **c)
{
enum trns_result retval = TRNS_DROP_CASE;
assert (ds->is_open);
*c = NULL;
- for (;;)
+ for (;;)
{
size_t case_nr;
&ds->trns_case, &case_nr);
if (retval != TRNS_CONTINUE)
continue;
-
- /* Write case to LAG queue. */
- if (ds->n_lag)
- lag_case (ds, &ds->trns_case);
+
+ /* Write case to collection of lagged cases. */
+ if (ds->n_lag > 0)
+ {
+ while (casedeque_count (&ds->lagged_cases) >= ds->n_lag)
+ case_destroy (casedeque_pop_back (&ds->lagged_cases));
+ case_clone (casedeque_push_front (&ds->lagged_cases),
+ &ds->trns_case);
+ }
/* Write case to replacement active file. */
ds->cases_written++;
- if (ds->proc_sink->class->write != NULL)
+ if (ds->proc_sink->class->write != NULL)
{
- if (ds->compactor != NULL)
+ if (ds->compactor != NULL)
{
dict_compactor_compact (ds->compactor, &ds->sink_case,
&ds->trns_case);
else
ds->proc_sink->class->write (ds->proc_sink, &ds->trns_case);
}
-
+
/* Execute temporary transformations. */
- if (ds->temporary_trns_chain != NULL)
+ if (ds->temporary_trns_chain != NULL)
{
retval = trns_chain_execute (ds->temporary_trns_chain, TRNS_CONTINUE,
&ds->trns_case, &ds->cases_written);
}
*c = &ds->trns_case;
- return true;
+ return true;
}
}
If DS has not been opened, returns true without doing
anything else. */
bool
-proc_close (struct dataset *ds)
+proc_close (struct dataset *ds)
{
if (!ds->is_open)
return true;
/* Drain any remaining cases. */
- while (ds->ok)
+ while (ds->ok)
{
struct ccase *c;
if (!proc_read (ds, &c))
- break;
+ break;
}
ds->ok = free_case_source (ds->proc_source) && ds->ok;
- ds->proc_source = NULL;
+ proc_set_source (ds, NULL);
case_destroy (&ds->sink_case);
case_destroy (&ds->trns_case);
/* Stores the current time, as returned by time(), in
   DS->last_proc_invocation. */
static void
-update_last_proc_invocation (struct dataset *ds)
+update_last_proc_invocation (struct dataset *ds)
{
ds->last_proc_invocation = time (NULL);
}
size_t i;
case_create (trns_case, dict_get_next_value_idx (dict));
- for (i = 0; i < var_cnt; i++)
+ for (i = 0; i < var_cnt; i++)
{
struct variable *v = dict_get_var (dict, i);
union value *value = case_data_rw (trns_case, v);
ds->permanent_dict = ds->dict;
/* Figure out whether to compact. */
- ds->compactor =
+ ds->compactor =
(dict_compacting_would_shrink (ds->permanent_dict)
? dict_make_compactor (ds->permanent_dict)
: NULL);
if (ds->proc_sink->class->open != NULL)
ds->proc_sink->class->open (ds->proc_sink);
- /* Allocate memory for lag queue. */
- if (ds->n_lag > 0)
- {
- int i;
-
- ds->lag_count = 0;
- ds->lag_head = 0;
- ds->lag_queue = xnmalloc (ds->n_lag, sizeof *ds->lag_queue);
- for (i = 0; i < ds->n_lag; i++)
- case_nullify (&ds->lag_queue[i]);
- }
-}
-
-/* Add C to the lag queue. */
-static void
-lag_case (struct dataset *ds, const struct ccase *c)
-{
- if (ds->lag_count < ds->n_lag)
- ds->lag_count++;
- case_destroy (&ds->lag_queue[ds->lag_head]);
- case_clone (&ds->lag_queue[ds->lag_head], c);
- if (++ds->lag_head >= ds->n_lag)
- ds->lag_head = 0;
+ /* Allocate memory for lagged cases. */
+ casedeque_init (&ds->lagged_cases, ds->n_lag);
}
/* Clears the variables in C that need to be cleared between
{
size_t var_cnt = dict_get_var_cnt (ds->dict);
size_t i;
-
- for (i = 0; i < var_cnt; i++)
+
+ for (i = 0; i < var_cnt; i++)
{
struct variable *v = dict_get_var (ds->dict, i);
- if (!var_get_leave (v))
+ if (!var_get_leave (v))
{
if (var_is_numeric (v))
- case_data_rw (c, v)->f = SYSMIS;
+ case_data_rw (c, v)->f = SYSMIS;
else
memset (case_data_rw (c, v)->s, ' ', var_get_width (v));
- }
+ }
}
}
static bool
close_active_file (struct dataset *ds)
{
- /* Free memory for lag queue, and turn off lagging. */
- if (ds->n_lag > 0)
- {
- int i;
-
- for (i = 0; i < ds->n_lag; i++)
- case_destroy (&ds->lag_queue[i]);
- free (ds->lag_queue);
- ds->n_lag = 0;
- }
-
+ /* Free memory for lagged cases. */
+ while (!casedeque_is_empty (&ds->lagged_cases))
+ case_destroy (casedeque_pop_back (&ds->lagged_cases));
+ casedeque_destroy (&ds->lagged_cases);
+
/* Dictionary from before TEMPORARY becomes permanent. */
proc_cancel_temporary_transformations (ds);
/* Finish compacting. */
- if (ds->compactor != NULL)
+ if (ds->compactor != NULL)
{
dict_compactor_destroy (ds->compactor);
dict_compact_values (ds->dict);
ds->compactor = NULL;
}
-
+
/* Old data sink becomes new data source. */
if (ds->proc_sink->class->make_source != NULL)
- ds->proc_source = ds->proc_sink->class->make_source (ds->proc_sink);
+ proc_set_source (ds, ds->proc_sink->class->make_source (ds->proc_sink) );
free_case_sink (ds->proc_sink);
ds->proc_sink = NULL;
struct ccase *
lagged_case (const struct dataset *ds, int n_before)
{
/* N_BEFORE must lie in the range 1...DS->n_lag. */
- assert (n_before >= 1 );
+ assert (n_before >= 1);
assert (n_before <= ds->n_lag);
- if (n_before <= ds->lag_count)
- {
- int index = ds->lag_head - n_before;
- if (index < 0)
- index += ds->n_lag;
- return &ds->lag_queue[index];
- }
/* proc_read() pushes each case onto the front of the deque, hence the
   lookup from the front at offset N_BEFORE - 1.  NOTE(review):
   presumably offset 0 is the most recently pushed case -- confirm
   against casedeque.h. */
+ if (n_before <= casedeque_count (&ds->lagged_cases))
+ return casedeque_front (&ds->lagged_cases, n_before - 1);
else
/* Fewer than N_BEFORE cases are currently held. */
return NULL;
}
/* Procedure that separates the data into SPLIT FILE groups. */
/* Represents auxiliary data for handling SPLIT FILE. */
-struct split_aux_data
+struct split_aux_data
{
struct dataset *dataset; /* The dataset */
struct ccase prev_case; /* Data in previous case. */
/* Callback functions. */
- begin_func *begin;
+ begin_func *begin;
case_func *proc;
end_func *end;
void *func_aux;
passed to each of the functions as auxiliary data.
If the active file is empty, none of BEGIN_FUNC, PROC_FUNC,
- and END_FUNC will be called at all.
+ and END_FUNC will be called at all.
If SPLIT FILE is not in effect, then there is one break group
(if the active file is nonempty), and BEGIN_FUNC and END_FUNC
will be called once.
-
+
Returns true if successful, false if an I/O error occurred. */
bool
procedure_with_splits (struct dataset *ds,
- begin_func begin,
+ begin_func begin,
case_func *proc,
end_func *end,
- void *func_aux)
+ void *func_aux)
{
struct split_aux_data split_aux;
bool ok;
/* Case callback used by procedure_with_splits(). */
static bool
-split_procedure_case_func (const struct ccase *c, void *split_aux_, const struct dataset *ds)
+split_procedure_case_func (const struct ccase *c, void *split_aux_, const struct dataset *ds)
{
struct split_aux_data *split_aux = split_aux_;
/* End-of-file callback used by procedure_with_splits(). */
static bool
-split_procedure_end_func (void *split_aux_, const struct dataset *ds)
+split_procedure_end_func (void *split_aux_, const struct dataset *ds)
{
struct split_aux_data *split_aux = split_aux_;
/* Compares the SPLIT FILE variables in cases A and B and returns
nonzero only if they differ. */
static int
-equal_splits (const struct ccase *a, const struct ccase *b,
- const struct dataset *ds)
+equal_splits (const struct ccase *a, const struct ccase *b,
+ const struct dataset *ds)
{
return case_compare (a, b,
dict_get_split_vars (ds->dict),
/* Represents auxiliary data for handling SPLIT FILE in a
multipass procedure. */
-struct multipass_split_aux_data
+struct multipass_split_aux_data
{
struct dataset *dataset; /* The dataset of the split */
struct ccase prev_case; /* Data in previous case. */
struct casefile *casefile; /* Accumulates data for a split. */
- split_func *split; /* Function to call with the accumulated
+ split_func *split; /* Function to call with the accumulated
data. */
- void *func_aux; /* Auxiliary data. */
+ void *func_aux; /* Auxiliary data. */
};
static bool multipass_split_case_func (const struct ccase *c, void *aux_, const struct dataset *);
/* Returns true if successful, false if an I/O error occurred. */
bool
-multipass_procedure_with_splits (struct dataset *ds,
+multipass_procedure_with_splits (struct dataset *ds,
split_func *split,
void *func_aux)
{
ok = multipass_split_output (aux, ds);
/* Start a new casefile. */
- aux->casefile =
+ aux->casefile =
ds->cf_factory->create_casefile (ds->cf_factory,
dict_get_next_value_idx (ds->dict));
}
multipass_split_output (struct multipass_split_aux_data *aux, const struct dataset *ds)
{
bool ok;
-
+
assert (aux->casefile != NULL);
ok = aux->split (&aux->prev_case, aux->casefile, aux->func_aux, ds);
casefile_destroy (aux->casefile);
fh_set_default_handle (NULL);
ds->n_lag = 0;
-
+
free_case_source (ds->proc_source);
- ds->proc_source = NULL;
+ proc_set_source (ds, NULL);
proc_cancel_all_transformations (ds);
}
and clears the permanent transformations.
For use by INPUT PROGRAM. */
struct trns_chain *
-proc_capture_transformations (struct dataset *ds)
+proc_capture_transformations (struct dataset *ds)
{
struct trns_chain *chain;
-
+
assert (ds->temporary_trns_chain == NULL);
chain = ds->permanent_trns_chain;
ds->cur_trns_chain = ds->permanent_trns_chain = trns_chain_create ();
FINALIZE will be called.
The functions are passed AUX as auxiliary data. */
void
-add_transformation_with_finalizer (struct dataset *ds,
+add_transformation_with_finalizer (struct dataset *ds,
trns_finalize_func *finalize,
trns_proc_func *proc,
trns_free_func *free, void *aux)
This value can be returned by a transformation procedure
function to indicate a "jump" to that transformation. */
size_t
-next_transformation (const struct dataset *ds)
+next_transformation (const struct dataset *ds)
{
return trns_chain_next (ds->cur_trns_chain);
}
a temporary transformation, false if it will add a permanent
transformation. */
bool
-proc_in_temporary_transformations (const struct dataset *ds)
+proc_in_temporary_transformations (const struct dataset *ds)
{
return ds->temporary_trns_chain != NULL;
}
Further calls to add_transformation() will add temporary
transformations. */
void
-proc_start_temporary_transformations (struct dataset *ds)
+proc_start_temporary_transformations (struct dataset *ds)
{
if (!proc_in_temporary_transformations (ds))
{
add_case_limit_trns (ds);
ds->permanent_dict = dict_clone (ds->dict);
+
trns_chain_finalize (ds->permanent_trns_chain);
ds->temporary_trns_chain = ds->cur_trns_chain = trns_chain_create ();
}
permanent.
Returns true if anything changed, false otherwise. */
bool
-proc_make_temporary_transformations_permanent (struct dataset *ds)
+proc_make_temporary_transformations_permanent (struct dataset *ds)
{
- if (proc_in_temporary_transformations (ds))
+ if (proc_in_temporary_transformations (ds))
{
trns_chain_finalize (ds->temporary_trns_chain);
trns_chain_splice (ds->permanent_trns_chain, ds->temporary_trns_chain);
transformations will be permanent.
Returns true if anything changed, false otherwise. */
bool
-proc_cancel_temporary_transformations (struct dataset *ds)
+proc_cancel_temporary_transformations (struct dataset *ds)
{
- if (proc_in_temporary_transformations (ds))
+ if (proc_in_temporary_transformations (ds))
{
- dict_destroy (ds->dict);
- ds->dict = ds->permanent_dict;
+ dataset_set_dict (ds, ds->permanent_dict);
ds->permanent_dict = NULL;
trns_chain_destroy (ds->temporary_trns_chain);
\f
/* Initializes procedure handling.
   Allocates a zero-filled dataset, gives it a dictionary obtained
   from dict_create(), and records FACT as its casefile factory, RPS
   as its replace_source callback and RDS as its replace_dict
   callback.  Either callback may be null: both are tested before
   they are called.  Returns the new dataset. */
struct dataset *
-create_dataset (struct casefile_factory *fact)
+create_dataset (struct casefile_factory *fact,
+ replace_source_callback *rps,
+ replace_dictionary_callback *rds
+ )
{
struct dataset *ds = xzalloc (sizeof(*ds));
ds->dict = dict_create ();
ds->cf_factory = fact;
+ ds->replace_source = rps;
+ ds->replace_dict = rds;
proc_cancel_all_transformations (ds);
return ds;
}
/* Sets SINK as the destination for procedure output from the
next procedure. */
void
-proc_set_sink (struct dataset *ds, struct case_sink *sink)
+proc_set_sink (struct dataset *ds, struct case_sink *sink)
{
assert (ds->proc_sink == NULL);
ds->proc_sink = sink;
/* Sets SOURCE as the source for procedure input for the next
   procedure.  SOURCE may be null to clear the source.

   Any source already recorded in DS is overwritten without being
   freed, so a caller that still owns the old source must release it
   first.  If DS has a replace_source callback, it is then invoked
   with the new source. */
void
-proc_set_source (struct dataset *ds, struct case_source *source)
+proc_set_source (struct dataset *ds, struct case_source *source)
{
- assert (ds->proc_source == NULL);
ds->proc_source = source;
+
+ if ( ds->replace_source )
+ ds->replace_source (ds->proc_source);
}
/* Returns true if a source for the next procedure has been
configured, false otherwise. */
bool
-proc_has_source (const struct dataset *ds)
+proc_has_source (const struct dataset *ds)
{
return ds->proc_source != NULL;
}
The returned casefile is owned by the caller; it will not be
automatically used for the next procedure's input. */
struct casefile *
-proc_capture_output (struct dataset *ds)
+proc_capture_output (struct dataset *ds)
{
struct casefile *casefile;
assert (!proc_in_temporary_transformations (ds));
casefile = storage_source_decapsulate (ds->proc_source);
- ds->proc_source = NULL;
+ proc_set_source (ds, NULL);
return casefile;
}
/* Adds a transformation that limits the number of cases that may
pass through, if DS->DICT has a case limit. */
static void
-add_case_limit_trns (struct dataset *ds)
+add_case_limit_trns (struct dataset *ds)
{
size_t case_limit = dict_get_case_limit (ds->dict);
if (case_limit != 0)
*CASES_REMAINING. */
static int
case_limit_trns_proc (void *cases_remaining_,
- struct ccase *c UNUSED, casenumber case_nr UNUSED)
+ struct ccase *c UNUSED, casenumber case_nr UNUSED)
{
size_t *cases_remaining = cases_remaining_;
- if (*cases_remaining > 0)
+ if (*cases_remaining > 0)
{
(*cases_remaining)--;
return TRNS_CONTINUE;
/* Frees the data associated with a case limit transformation. */
static bool
-case_limit_trns_free (void *cases_remaining_)
+case_limit_trns_free (void *cases_remaining_)
{
size_t *cases_remaining = cases_remaining_;
free (cases_remaining);
/* Adds a temporary transformation to filter data according to
the variable specified on FILTER, if any. */
static void
-add_filter_trns (struct dataset *ds)
+add_filter_trns (struct dataset *ds)
{
struct variable *filter_var = dict_get_filter (ds->dict);
- if (filter_var != NULL)
+ if (filter_var != NULL)
{
proc_start_temporary_transformations (ds);
add_transformation (ds, filter_trns_proc, NULL, filter_var);
/* FILTER transformation.
   FILTER_VAR_ is the filter variable.  Returns TRNS_CONTINUE when
   C's value for that variable is nonzero and var_is_num_missing()
   does not report it as missing; returns TRNS_DROP_CASE otherwise.
   C is read here, so only CASE_NR is marked UNUSED. */
static int
filter_trns_proc (void *filter_var_,
- struct ccase *c UNUSED, casenumber case_nr UNUSED)
-
+ struct ccase *c, casenumber case_nr UNUSED)
+
{
struct variable *filter_var = filter_var_;
double f = case_num (c, filter_var);
- return (f != 0.0 && !var_is_num_missing (filter_var, f)
+ return (f != 0.0 && !var_is_num_missing (filter_var, f, MV_ANY)
? TRNS_CONTINUE : TRNS_DROP_CASE);
}
}
-void
+/* Sets or replaces dataset DS's dictionary with DICT.
+ DICT first receives the old dictionary's callbacks through
+ dict_copy_callbacks(); DS's replace_dict callback, if any, is then
+ invoked with DICT; finally the old dictionary is destroyed.
+ DICT must therefore not be DS's current dictionary. */
+void
dataset_set_dict (struct dataset *ds, struct dictionary *dict)
{
+ struct dictionary *old_dict = ds->dict;
+
+ dict_copy_callbacks (dict, ds->dict);
ds->dict = dict;
-}
-int
-dataset_n_lag (const struct dataset *ds)
-{
- return ds->n_lag;
+ if ( ds->replace_dict )
+ ds->replace_dict (dict);
+
+ dict_destroy (old_dict);
}
/* Requests that at least N_BEFORE lagged cases be kept for DS.
   The stored lag depth is raised to N_BEFORE when that is larger and
   is never lowered here. */
void
-dataset_set_n_lag (struct dataset *ds, int n_lag)
+dataset_need_lag (struct dataset *ds, int n_before)
{
- ds->n_lag = n_lag;
+ ds->n_lag = MAX (ds->n_lag, n_before);
}
-
struct casefile_factory *
dataset_get_casefile_factory (const struct dataset *ds)
{