/* PSPP - computes sample statistics.
Copyright (C) 1997-9, 2000, 2006 Free Software Foundation, Inc.
- Written by Ben Pfaff <blp@gnu.org>.
This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License as
#include <libpspp/misc.h>
#include <libpspp/str.h>
-/* Procedure execution data. */
-struct write_case_data
- {
- /* Function to call for each case. */
- case_func *proc;
- void *aux;
+struct dataset {
- struct dataset *dataset; /* The dataset concerned */
- struct ccase trns_case; /* Case used for transformations. */
- struct ccase sink_case; /* Case written to sink, if
- compacting is necessary. */
- size_t cases_written; /* Cases output so far. */
- };
+ /* An abstract factory which creates casefiles */
+ struct casefile_factory *cf_factory;
+
+ /* Callback which occurs when a procedure provides a new source for
+ the dataset */
+ replace_source_callback *replace_source ;
+
+ /* Callback which occurs whenever the DICT is replaced by a new one */
+ replace_dictionary_callback *replace_dict;
-struct dataset {
/* Cases are read from proc_source,
pass through permanent_trns_chain (which transforms them into
the format described by permanent_dict),
int lag_head; /* Index where next case will be added. */
struct ccase *lag_queue; /* Array of n_lag ccase * elements. */
+ /* Procedure data. */
+ bool is_open; /* Procedure open? */
+ struct ccase trns_case; /* Case used for transformations. */
+ struct ccase sink_case; /* Case written to sink, if
+ compacting is necessary. */
+ size_t cases_written; /* Cases output so far. */
+ bool ok;
}; /* struct dataset */
static void update_last_proc_invocation (struct dataset *ds);
static void create_trns_case (struct ccase *, struct dictionary *);
static void open_active_file (struct dataset *ds);
-static bool write_case (struct write_case_data *wc_data);
static void lag_case (struct dataset *ds, const struct ccase *c);
static void clear_case (const struct dataset *ds, struct ccase *c);
static bool close_active_file (struct dataset *ds);
start the next case from step 1.
2. Write case to replacement active file.
-
+
3. Execute temporary transformations. If these drop the case,
start the next case from step 1.
-
+
4. Pass case to PROC_FUNC, passing AUX as auxiliary data.
Returns true if successful, false if an I/O error occurred. */
bool
procedure (struct dataset *ds, case_func *cf, void *aux)
{
+ update_last_proc_invocation (ds);
+
+ /* Optimize the trivial case where we're not going to do
+ anything with the data, by not reading the data at all.
+ This applies only when there is no case callback, the source
+ is a storage source, no sink has been set, and both
+ transformation chains are empty (or, for the temporary chain,
+ absent). */
+ if (cf == NULL
+ && case_source_is_class (ds->proc_source, &storage_source_class)
+ && ds->proc_sink == NULL
+ && (ds->temporary_trns_chain == NULL
+ || trns_chain_is_empty (ds->temporary_trns_chain))
+ && trns_chain_is_empty (ds->permanent_trns_chain))
+ {
/* Clear the lag count, the dictionary's case limit and its
vectors, then report success.  The source is left untouched
and proc_open is never reached on this path. */
+ ds->n_lag = 0;
+ dict_set_case_limit (ds->dict, 0);
+ dict_clear_vectors (ds->dict);
+ return true;
+ }
+
/* NULL: a plain procedure supplies no end-of-file callback. */
return internal_procedure (ds, cf, NULL, aux);
}
\f
/* Multipass procedure. */
/* Auxiliary data shared by multipass_case_func and
multipass_end_func. */
-struct multipass_aux_data
+struct multipass_aux_data
{
struct casefile *casefile; /* Receives each case appended by multipass_case_func. */
-
+
bool (*proc_func) (const struct casefile *, void *aux); /* May be null; multipass_end_func tests for that. */
void *aux; /* Auxiliary data handed to proc_func. */
};
/* Case processing function for multipass_procedure(). */
static bool
-multipass_case_func (const struct ccase *c, void *aux_data_, const struct dataset *ds UNUSED)
+multipass_case_func (const struct ccase *c, void *aux_data_, const struct dataset *ds UNUSED)
{
struct multipass_aux_data *aux_data = aux_data_;
return casefile_append (aux_data->casefile, c);
/* End-of-file function for multipass_procedure(). */
static bool
-multipass_end_func (void *aux_data_, const struct dataset *ds UNUSED)
+multipass_end_func (void *aux_data_, const struct dataset *ds UNUSED)
{
struct multipass_aux_data *aux_data = aux_data_;
return (aux_data->proc_func == NULL
The entire active file is passed to PROC_FUNC, with the given
AUX as auxiliary data, as a unit. */
bool
-multipass_procedure (struct dataset *ds, casefile_func *proc_func, void *aux)
+multipass_procedure (struct dataset *ds, casefile_func *proc_func, void *aux)
{
struct multipass_aux_data aux_data;
bool ok;
- aux_data.casefile = fastfile_create (dict_get_next_value_idx (ds->dict));
+ aux_data.casefile =
+ ds->cf_factory->create_casefile (ds->cf_factory,
+ dict_get_next_value_idx (ds->dict));
+
aux_data.proc_func = proc_func;
aux_data.aux = aux;
return ok;
}
\f
-/* Procedure implementation. */
+/* Procedure implementation. */
/* Executes a procedure.
Passes each case to CASE_FUNC.
static bool
internal_procedure (struct dataset *ds, case_func *proc,
end_func *end,
- void *aux)
+ void *aux)
{
- struct write_case_data wc_data;
+ struct ccase *c;
bool ok = true;
+ proc_open (ds);
/* Feed cases to PROC until proc_read stops returning cases or
a callback fails.  A null PROC still consumes every case. */
+ while (ok && proc_read (ds, &c))
+ if (proc != NULL)
+ ok = proc (c, aux, ds) && ok;
/* END runs even if PROC already failed; a failure in either
callback keeps OK false. */
+ if (end != NULL)
+ ok = end (aux, ds) && ok;
+
/* proc_close is the left operand of &&, so it is always called,
even when OK is already false.  The replace_source callback,
if any, is notified only when everything succeeded. */
+ if ( proc_close (ds) && ok )
+ {
+ if ( ds->replace_source )
+ ds->replace_source (ds->proc_source);
+
+ return true;
+ }
+
+ return false;
+}
+
+/* Opens dataset DS for reading cases with proc_read.
+ proc_close must be called when done.
+ DS must have a source and must not already be open; both
+ conditions are asserted. */
+void
+proc_open (struct dataset *ds)
+{
assert (ds->proc_source != NULL);
+ assert (!ds->is_open);
update_last_proc_invocation (ds);
- /* Optimize the trivial case where we're not going to do
- anything with the data, by not reading the data at all. */
- if (proc == NULL && end == NULL
- && case_source_is_class (ds->proc_source, &storage_source_class)
- && ds->proc_sink == NULL
- && (ds->temporary_trns_chain == NULL
- || trns_chain_is_empty (ds->temporary_trns_chain))
- && trns_chain_is_empty (ds->permanent_trns_chain))
+ open_active_file (ds);
+
/* Initialize the per-procedure state that proc_read and
proc_close operate on: the two working cases, the count of
cases written, and the sticky success flag. */
+ ds->is_open = true;
+ create_trns_case (&ds->trns_case, ds->dict);
+ case_create (&ds->sink_case, dict_get_compacted_value_cnt (ds->dict));
+ ds->cases_written = 0;
+ ds->ok = true;
+}
+
+/* Reads the next case from dataset DS, which must have been
+ opened for reading with proc_open.
+ Returns true if successful, in which case a pointer to the
+ case is stored in *C.
+ Returns false at end of file or if a read error occurs. In
+ this case a null pointer is stored in *C.
+ The case stored in *C is DS's own trns_case, which the next
+ call overwrites. */
+bool
+proc_read (struct dataset *ds, struct ccase **c)
+{
/* Starts as TRNS_DROP_CASE so that the assertion at the top of
the loop holds on the first pass. */
+ enum trns_result retval = TRNS_DROP_CASE;
+
+ assert (ds->is_open);
+ *c = NULL;
+ for (;;)
{
- ds->n_lag = 0;
- dict_set_case_limit (ds->dict, 0);
- dict_clear_vectors (ds->dict);
+ size_t case_nr;
+
/* Control returns here through `continue' whenever a chain did
not yield TRNS_CONTINUE.  A TRNS_ERROR clears ds->ok, after
which this and every later call returns false. */
+ assert (retval == TRNS_DROP_CASE || retval == TRNS_ERROR);
+ if (retval == TRNS_ERROR)
+ ds->ok = false;
+ if (!ds->ok)
+ return false;
+
+ /* Read a case from proc_source. */
+ clear_case (ds, &ds->trns_case);
+ if (!ds->proc_source->class->read (ds->proc_source, &ds->trns_case))
+ return false;
+
+ /* Execute permanent transformations.
+ The chain receives a copy of the prospective case number;
+ cases_written itself is incremented only below, once the
+ case has passed this chain. */
+ case_nr = ds->cases_written + 1;
+ retval = trns_chain_execute (ds->permanent_trns_chain, TRNS_CONTINUE,
+ &ds->trns_case, &case_nr);
+ if (retval != TRNS_CONTINUE)
+ continue;
+
+ /* Write case to LAG queue. */
+ if (ds->n_lag)
+ lag_case (ds, &ds->trns_case);
+
+ /* Write case to replacement active file.
+ When a compactor exists the case is first compacted into
+ sink_case and that copy is written instead. */
+ ds->cases_written++;
+ if (ds->proc_sink->class->write != NULL)
+ {
+ if (ds->compactor != NULL)
+ {
+ dict_compactor_compact (ds->compactor, &ds->sink_case,
+ &ds->trns_case);
+ ds->proc_sink->class->write (ds->proc_sink, &ds->sink_case);
+ }
+ else
+ ds->proc_sink->class->write (ds->proc_sink, &ds->trns_case);
+ }
+
+ /* Execute temporary transformations.
+ These run after the sink write, so a case dropped here has
+ already been written to the replacement active file. */
+ if (ds->temporary_trns_chain != NULL)
+ {
+ retval = trns_chain_execute (ds->temporary_trns_chain, TRNS_CONTINUE,
+ &ds->trns_case, &ds->cases_written);
+ if (retval != TRNS_CONTINUE)
+ continue;
+ }
+
+ *c = &ds->trns_case;
return true;
}
-
- open_active_file (ds);
-
- wc_data.proc = proc;
- wc_data.aux = aux;
- wc_data.dataset = ds;
- create_trns_case (&wc_data.trns_case, ds->dict);
- case_create (&wc_data.sink_case,
- dict_get_compacted_value_cnt (ds->dict));
- wc_data.cases_written = 0;
-
- ok = ds->proc_source->class->read (ds->proc_source,
- &wc_data.trns_case,
- write_case, &wc_data) && ok;
- if (end != NULL)
- ok = end (aux, ds) && ok;
+}
+
+/* Closes dataset DS for reading.
+ Returns true if successful, false if an I/O error occurred
+ while reading or closing the data set.
+ If DS has not been opened, returns true without doing
+ anything else.
+ On return DS is no longer open and the old source has been
+ freed. */
+bool
+proc_close (struct dataset *ds)
+{
+ if (!ds->is_open)
+ return true;
+
+ /* Drain any remaining cases.
+ proc_read still transforms each one and writes it to the
+ sink, so the caller need not have read to end of file. */
+ while (ds->ok)
+ {
+ struct ccase *c;
+ if (!proc_read (ds, &c))
+ break;
+ }
/* ds->ok accumulates failures: once false it stays false through
the source teardown and close_active_file below. */
+ ds->ok = free_case_source (ds->proc_source) && ds->ok;
+ ds->proc_source = NULL;
- case_destroy (&wc_data.sink_case);
- case_destroy (&wc_data.trns_case);
+ case_destroy (&ds->sink_case);
+ case_destroy (&ds->trns_case);
- ok = close_active_file (ds) && ok;
+ ds->ok = close_active_file (ds) && ds->ok;
+ ds->is_open = false;
- return ok;
+ return ds->ok;
}
/* Updates last_proc_invocation: stores the current calendar time,
as returned by time (NULL), in DS. */
static void
-update_last_proc_invocation (struct dataset *ds)
+update_last_proc_invocation (struct dataset *ds)
{
ds->last_proc_invocation = time (NULL);
}
size_t i;
case_create (trns_case, dict_get_next_value_idx (dict));
- for (i = 0; i < var_cnt; i++)
+ for (i = 0; i < var_cnt; i++)
{
struct variable *v = dict_get_var (dict, i);
union value *value = case_data_rw (trns_case, v);
ds->permanent_dict = ds->dict;
/* Figure out whether to compact. */
- ds->compactor =
+ ds->compactor =
(dict_compacting_would_shrink (ds->permanent_dict)
? dict_make_compactor (ds->permanent_dict)
: NULL);
/* Prepare sink. */
if (ds->proc_sink == NULL)
- ds->proc_sink = create_case_sink (&storage_sink_class, ds->permanent_dict, NULL);
+ ds->proc_sink = create_case_sink (&storage_sink_class,
+ ds->permanent_dict,
+ ds->cf_factory,
+ NULL);
if (ds->proc_sink->class->open != NULL)
ds->proc_sink->class->open (ds->proc_sink);
if (ds->n_lag > 0)
{
int i;
-
+
ds->lag_count = 0;
ds->lag_head = 0;
ds->lag_queue = xnmalloc (ds->n_lag, sizeof *ds->lag_queue);
}
}
-/* Transforms trns_case and writes it to the replacement active
- file if advisable. Returns true if more cases can be
- accepted, false otherwise. Do not call this function again
- after it has returned false once. */
-static bool
-write_case (struct write_case_data *wc_data)
-{
- enum trns_result retval;
- size_t case_nr;
-
- struct dataset *ds = wc_data->dataset;
-
- /* Execute permanent transformations. */
- case_nr = wc_data->cases_written + 1;
- retval = trns_chain_execute (ds->permanent_trns_chain,
- &wc_data->trns_case, &case_nr);
- if (retval != TRNS_CONTINUE)
- goto done;
-
- /* Write case to LAG queue. */
- if (ds->n_lag)
- lag_case (ds, &wc_data->trns_case);
-
- /* Write case to replacement active file. */
- wc_data->cases_written++;
- if (ds->proc_sink->class->write != NULL)
- {
- if (ds->compactor != NULL)
- {
- dict_compactor_compact (ds->compactor, &wc_data->sink_case,
- &wc_data->trns_case);
- ds->proc_sink->class->write (ds->proc_sink, &wc_data->sink_case);
- }
- else
- ds->proc_sink->class->write (ds->proc_sink, &wc_data->trns_case);
- }
-
- /* Execute temporary transformations. */
- if (ds->temporary_trns_chain != NULL)
- {
- retval = trns_chain_execute (ds->temporary_trns_chain,
- &wc_data->trns_case,
- &wc_data->cases_written);
- if (retval != TRNS_CONTINUE)
- goto done;
- }
-
- /* Pass case to procedure. */
- if (wc_data->proc != NULL)
- if (!wc_data->proc (&wc_data->trns_case, wc_data->aux, ds))
- retval = TRNS_ERROR;
-
- done:
- clear_case (ds, &wc_data->trns_case);
- return retval != TRNS_ERROR;
-}
-
/* Add C to the lag queue. */
static void
lag_case (struct dataset *ds, const struct ccase *c)
{
size_t var_cnt = dict_get_var_cnt (ds->dict);
size_t i;
-
- for (i = 0; i < var_cnt; i++)
+
+ for (i = 0; i < var_cnt; i++)
{
struct variable *v = dict_get_var (ds->dict, i);
- if (!var_get_leave (v))
+ if (!var_get_leave (v))
{
if (var_is_numeric (v))
- case_data_rw (c, v)->f = SYSMIS;
+ case_data_rw (c, v)->f = SYSMIS;
else
memset (case_data_rw (c, v)->s, ' ', var_get_width (v));
- }
+ }
}
}
if (ds->n_lag > 0)
{
int i;
-
+
for (i = 0; i < ds->n_lag; i++)
case_destroy (&ds->lag_queue[i]);
free (ds->lag_queue);
ds->n_lag = 0;
}
-
+
/* Dictionary from before TEMPORARY becomes permanent. */
proc_cancel_temporary_transformations (ds);
/* Finish compacting. */
- if (ds->compactor != NULL)
+ if (ds->compactor != NULL)
{
dict_compactor_destroy (ds->compactor);
dict_compact_values (ds->dict);
ds->compactor = NULL;
}
-
- /* Free data source. */
- free_case_source (ds->proc_source);
- ds->proc_source = NULL;
-
+
/* Old data sink becomes new data source. */
if (ds->proc_sink->class->make_source != NULL)
ds->proc_source = ds->proc_sink->class->make_source (ds->proc_sink);
/* Procedure that separates the data into SPLIT FILE groups. */
/* Represents auxiliary data for handling SPLIT FILE. */
-struct split_aux_data
+struct split_aux_data
{
struct dataset *dataset; /* The dataset */
struct ccase prev_case; /* Data in previous case. */
/* Callback functions. */
- begin_func *begin;
+ begin_func *begin;
case_func *proc;
end_func *end;
void *func_aux;
passed to each of the functions as auxiliary data.
If the active file is empty, none of BEGIN_FUNC, PROC_FUNC,
- and END_FUNC will be called at all.
+ and END_FUNC will be called at all.
If SPLIT FILE is not in effect, then there is one break group
(if the active file is nonempty), and BEGIN_FUNC and END_FUNC
will be called once.
-
+
Returns true if successful, false if an I/O error occurred. */
bool
procedure_with_splits (struct dataset *ds,
- begin_func begin,
+ begin_func begin,
case_func *proc,
end_func *end,
- void *func_aux)
+ void *func_aux)
{
struct split_aux_data split_aux;
bool ok;
/* Case callback used by procedure_with_splits(). */
static bool
-split_procedure_case_func (const struct ccase *c, void *split_aux_, const struct dataset *ds)
+split_procedure_case_func (const struct ccase *c, void *split_aux_, const struct dataset *ds)
{
struct split_aux_data *split_aux = split_aux_;
/* End-of-file callback used by procedure_with_splits(). */
static bool
-split_procedure_end_func (void *split_aux_, const struct dataset *ds)
+split_procedure_end_func (void *split_aux_, const struct dataset *ds)
{
struct split_aux_data *split_aux = split_aux_;
/* Compares the SPLIT FILE variables in cases A and B and returns
nonzero only if they differ. */
static int
-equal_splits (const struct ccase *a, const struct ccase *b,
- const struct dataset *ds)
+equal_splits (const struct ccase *a, const struct ccase *b,
+ const struct dataset *ds)
{
return case_compare (a, b,
dict_get_split_vars (ds->dict),
/* Represents auxiliary data for handling SPLIT FILE in a
multipass procedure. */
-struct multipass_split_aux_data
+struct multipass_split_aux_data
{
struct dataset *dataset; /* The dataset of the split */
struct ccase prev_case; /* Data in previous case. */
struct casefile *casefile; /* Accumulates data for a split. */
- split_func *split; /* Function to call with the accumulated
+ split_func *split; /* Function to call with the accumulated
data. */
- void *func_aux; /* Auxiliary data. */
+ void *func_aux; /* Auxiliary data. */
};
static bool multipass_split_case_func (const struct ccase *c, void *aux_, const struct dataset *);
/* Returns true if successful, false if an I/O error occurred. */
bool
-multipass_procedure_with_splits (struct dataset *ds,
+multipass_procedure_with_splits (struct dataset *ds,
split_func *split,
void *func_aux)
{
ok = multipass_split_output (aux, ds);
/* Start a new casefile. */
- aux->casefile =
- fastfile_create (dict_get_next_value_idx (ds->dict));
+ aux->casefile =
+ ds->cf_factory->create_casefile (ds->cf_factory,
+ dict_get_next_value_idx (ds->dict));
}
return casefile_append (aux->casefile, c) && ok;
multipass_split_output (struct multipass_split_aux_data *aux, const struct dataset *ds)
{
bool ok;
-
+
assert (aux->casefile != NULL);
ok = aux->split (&aux->prev_case, aux->casefile, aux->func_aux, ds);
casefile_destroy (aux->casefile);
fh_set_default_handle (NULL);
ds->n_lag = 0;
-
+
free_case_source (ds->proc_source);
ds->proc_source = NULL;
+ if ( ds->replace_source )
+ ds->replace_source (ds->proc_source);
+
proc_cancel_all_transformations (ds);
}
and clears the permanent transformations.
For use by INPUT PROGRAM. */
struct trns_chain *
-proc_capture_transformations (struct dataset *ds)
+proc_capture_transformations (struct dataset *ds)
{
struct trns_chain *chain;
-
+
assert (ds->temporary_trns_chain == NULL);
chain = ds->permanent_trns_chain;
ds->cur_trns_chain = ds->permanent_trns_chain = trns_chain_create ();
FINALIZE will be called.
The functions are passed AUX as auxiliary data. */
void
-add_transformation_with_finalizer (struct dataset *ds,
+add_transformation_with_finalizer (struct dataset *ds,
trns_finalize_func *finalize,
trns_proc_func *proc,
trns_free_func *free, void *aux)
This value can be returned by a transformation procedure
function to indicate a "jump" to that transformation. */
size_t
-next_transformation (const struct dataset *ds)
+next_transformation (const struct dataset *ds)
{
/* Asks DS's current transformation chain (cur_trns_chain) and
returns whatever trns_chain_next reports for it. */
return trns_chain_next (ds->cur_trns_chain);
}
a temporary transformation, false if it will add a permanent
transformation. */
bool
-proc_in_temporary_transformations (const struct dataset *ds)
+proc_in_temporary_transformations (const struct dataset *ds)
{
/* A non-null temporary_trns_chain is the only thing tested here. */
return ds->temporary_trns_chain != NULL;
}
Further calls to add_transformation() will add temporary
transformations. */
void
-proc_start_temporary_transformations (struct dataset *ds)
+proc_start_temporary_transformations (struct dataset *ds)
{
if (!proc_in_temporary_transformations (ds))
{
permanent.
Returns true if anything changed, false otherwise. */
bool
-proc_make_temporary_transformations_permanent (struct dataset *ds)
+proc_make_temporary_transformations_permanent (struct dataset *ds)
{
- if (proc_in_temporary_transformations (ds))
+ if (proc_in_temporary_transformations (ds))
{
trns_chain_finalize (ds->temporary_trns_chain);
trns_chain_splice (ds->permanent_trns_chain, ds->temporary_trns_chain);
transformations will be permanent.
Returns true if anything changed, false otherwise. */
bool
-proc_cancel_temporary_transformations (struct dataset *ds)
+proc_cancel_temporary_transformations (struct dataset *ds)
{
- if (proc_in_temporary_transformations (ds))
+ if (proc_in_temporary_transformations (ds))
{
dict_destroy (ds->dict);
ds->dict = ds->permanent_dict;
\f
/* Initializes procedure handling.
Allocates a dataset with xzalloc, gives it a newly created
dictionary, and records the casefile factory FACT together with
the callbacks RPS (source replaced) and RDS (dictionary
replaced).  Either callback may be null; callers in this file
test them before use.  Returns the new dataset. */
struct dataset *
-create_dataset (void)
+create_dataset (struct casefile_factory *fact,
+ replace_source_callback *rps,
+ replace_dictionary_callback *rds
+ )
{
struct dataset *ds = xzalloc (sizeof(*ds));
ds->dict = dict_create ();
+ ds->cf_factory = fact;
+ ds->replace_source = rps;
+ ds->replace_dict = rds;
proc_cancel_all_transformations (ds);
return ds;
}
/* Sets SINK as the destination for procedure output from the
next procedure. */
void
-proc_set_sink (struct dataset *ds, struct case_sink *sink)
+proc_set_sink (struct dataset *ds, struct case_sink *sink)
{
assert (ds->proc_sink == NULL);
ds->proc_sink = sink;
/* Sets SOURCE as the source for procedure input for the next
procedure. */
void
-proc_set_source (struct dataset *ds, struct case_source *source)
+proc_set_source (struct dataset *ds, struct case_source *source)
{
assert (ds->proc_source == NULL);
ds->proc_source = source;
/* Returns true if a source for the next procedure has been
configured, false otherwise.
The test is simply whether DS->proc_source is non-null;
proc_set_source assigns it and proc_close resets it to NULL. */
bool
-proc_has_source (const struct dataset *ds)
+proc_has_source (const struct dataset *ds)
{
return ds->proc_source != NULL;
}
The returned casefile is owned by the caller; it will not be
automatically used for the next procedure's input. */
struct casefile *
-proc_capture_output (struct dataset *ds)
+proc_capture_output (struct dataset *ds)
{
struct casefile *casefile;
/* Adds a transformation that limits the number of cases that may
pass through, if DS->DICT has a case limit. */
static void
-add_case_limit_trns (struct dataset *ds)
+add_case_limit_trns (struct dataset *ds)
{
size_t case_limit = dict_get_case_limit (ds->dict);
if (case_limit != 0)
*CASES_REMAINING. */
static int
case_limit_trns_proc (void *cases_remaining_,
- struct ccase *c UNUSED, casenumber case_nr UNUSED)
+ struct ccase *c UNUSED, casenumber case_nr UNUSED)
{
size_t *cases_remaining = cases_remaining_;
- if (*cases_remaining > 0)
+ if (*cases_remaining > 0)
{
(*cases_remaining)--;
return TRNS_CONTINUE;
/* Frees the data associated with a case limit transformation. */
static bool
-case_limit_trns_free (void *cases_remaining_)
+case_limit_trns_free (void *cases_remaining_)
{
size_t *cases_remaining = cases_remaining_;
free (cases_remaining);
/* Adds a temporary transformation to filter data according to
the variable specified on FILTER, if any. */
static void
-add_filter_trns (struct dataset *ds)
+add_filter_trns (struct dataset *ds)
{
struct variable *filter_var = dict_get_filter (ds->dict);
- if (filter_var != NULL)
+ if (filter_var != NULL)
{
proc_start_temporary_transformations (ds);
add_transformation (ds, filter_trns_proc, NULL, filter_var);
/* FILTER transformation.
FILTER_VAR_ is the filter variable.  The case continues
(TRNS_CONTINUE) when its value for that variable is nonzero and
not missing; otherwise the case is dropped (TRNS_DROP_CASE).
NOTE(review): C is annotated UNUSED but is read by case_num
below; the annotation looks stale -- confirm and remove. */
static int
filter_trns_proc (void *filter_var_,
- struct ccase *c UNUSED, casenumber case_nr UNUSED)
-
+ struct ccase *c UNUSED, casenumber case_nr UNUSED)
+
{
struct variable *filter_var = filter_var_;
double f = case_num (c, filter_var);
- return (f != 0.0 && !var_is_num_missing (filter_var, f)
+ return (f != 0.0 && !var_is_num_missing (filter_var, f, MV_ANY)
? TRNS_CONTINUE : TRNS_DROP_CASE);
}
}
/* Makes DICT the dictionary of DS and, if a replace_dict callback
was registered, notifies it with DICT.
The previous dictionary is not destroyed here. */
-void
+void
dataset_set_dict (struct dataset *ds, struct dictionary *dict)
{
/* Called while ds->dict still refers to the old dictionary;
presumably carries the old dictionary's callbacks over to DICT
-- TODO confirm argument order against dict_copy_callbacks. */
+ dict_copy_callbacks (dict, ds->dict);
ds->dict = dict;
+
+ if ( ds->replace_dict )
+ ds->replace_dict (dict);
}
/* Returns the n_lag value currently stored in DS. */
-int
+int
dataset_n_lag (const struct dataset *ds)
{
return ds->n_lag;
}
/* Stores N_LAG in DS.  No range check is made here. */
-void
+void
dataset_set_n_lag (struct dataset *ds, int n_lag)
{
ds->n_lag = n_lag;
}
/* Returns the casefile factory that was stored in DS by
create_dataset.  The pointer is returned as is; nothing is
copied. */
+struct casefile_factory *
+dataset_get_casefile_factory (const struct dataset *ds)
+{
+ return ds->cf_factory;
+}
+