/* PSPP - computes sample statistics.
- Copyright (C) 1997-9, 2000, 2006 Free Software Foundation, Inc.
- Written by Ben Pfaff <blp@gnu.org>.
+ Copyright (C) 1997-9, 2000, 2006, 2007 Free Software Foundation, Inc.
This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License as
#include <libpspp/misc.h>
#include <libpspp/str.h>
-/* Procedure execution data. */
-struct write_case_data
- {
- /* Function to call for each case. */
- case_func_t case_func;
- void *aux;
+struct dataset {
- struct dataset *dataset; /* The dataset concerned */
- struct ccase trns_case; /* Case used for transformations. */
- struct ccase sink_case; /* Case written to sink, if
- compacting is necessary. */
- size_t cases_written; /* Cases output so far. */
- };
+ /* An abstract factory which creates casefiles */
+ struct casefile_factory *cf_factory;
+
+ /* Callback which occurs when a procedure provides a new source for
+ the dataset */
+ replace_source_callback *replace_source ;
+
+ /* Callback which occurs whenever the DICT is replaced by a new one */
+ replace_dictionary_callback *replace_dict;
-struct dataset {
/* Cases are read from proc_source,
pass through permanent_trns_chain (which transforms them into
the format described by permanent_dict),
int lag_head; /* Index where next case will be added. */
struct ccase *lag_queue; /* Array of n_lag ccase * elements. */
+ /* Procedure data. */
+ bool is_open; /* Procedure open? */
+ struct ccase trns_case; /* Case used for transformations. */
+ struct ccase sink_case; /* Case written to sink, if
+ compacting is necessary. */
+ size_t cases_written; /* Cases output so far. */
+ bool ok;
}; /* struct dataset */
-struct dataset *current_dataset;
-
static void add_case_limit_trns (struct dataset *ds);
static void add_filter_trns (struct dataset *ds);
-static bool internal_procedure (struct dataset *ds, case_func_t,
- bool (*end_func) (void *),
+static bool internal_procedure (struct dataset *ds, case_func *,
+ end_func *,
void *aux);
static void update_last_proc_invocation (struct dataset *ds);
static void create_trns_case (struct ccase *, struct dictionary *);
static void open_active_file (struct dataset *ds);
-static bool write_case (struct write_case_data *wc_data);
static void lag_case (struct dataset *ds, const struct ccase *c);
static void clear_case (const struct dataset *ds, struct ccase *c);
static bool close_active_file (struct dataset *ds);
/* Returns the last time the data was read. */
time_t
-time_of_last_procedure (struct dataset *ds)
+time_of_last_procedure (struct dataset *ds)
{
if (ds->last_proc_invocation == 0)
update_last_proc_invocation (ds);
start the next case from step 1.
2. Write case to replacement active file.
-
+
3. Execute temporary transformations. If these drop the case,
start the next case from step 1.
-
+
4. Pass case to PROC_FUNC, passing AUX as auxiliary data.
Returns true if successful, false if an I/O error occurred. */
bool
-procedure (struct dataset *ds, case_func_t cf, void *aux)
+procedure (struct dataset *ds, case_func *cf, void *aux)
{
+ update_last_proc_invocation (ds);
+
+ /* Optimize the trivial case where we're not going to do
+ anything with the data, by not reading the data at all. */
+ if (cf == NULL
+ && case_source_is_class (ds->proc_source, &storage_source_class)
+ && ds->proc_sink == NULL
+ && (ds->temporary_trns_chain == NULL
+ || trns_chain_is_empty (ds->temporary_trns_chain))
+ && trns_chain_is_empty (ds->permanent_trns_chain))
+ {
+ ds->n_lag = 0;
+ dict_set_case_limit (ds->dict, 0);
+ dict_clear_vectors (ds->dict);
+ return true;
+ }
+
return internal_procedure (ds, cf, NULL, aux);
}
\f
/* Multipass procedure. */
-struct multipass_aux_data
+struct multipass_aux_data
{
struct casefile *casefile;
-
+
bool (*proc_func) (const struct casefile *, void *aux);
void *aux;
};
/* Case processing function for multipass_procedure(). */
static bool
-multipass_case_func (const struct ccase *c, void *aux_data_)
+multipass_case_func (const struct ccase *c, void *aux_data_, const struct dataset *ds UNUSED)
{
struct multipass_aux_data *aux_data = aux_data_;
return casefile_append (aux_data->casefile, c);
/* End-of-file function for multipass_procedure(). */
static bool
-multipass_end_func (void *aux_data_)
+multipass_end_func (void *aux_data_, const struct dataset *ds UNUSED)
{
struct multipass_aux_data *aux_data = aux_data_;
return (aux_data->proc_func == NULL
The entire active file is passed to PROC_FUNC, with the given
AUX as auxiliary data, as a unit. */
bool
-multipass_procedure (struct dataset *ds, casefile_func_t proc_func, void *aux)
+multipass_procedure (struct dataset *ds, casefile_func *proc_func, void *aux)
{
struct multipass_aux_data aux_data;
bool ok;
- aux_data.casefile = fastfile_create (dict_get_next_value_idx (ds->dict));
+ aux_data.casefile =
+ ds->cf_factory->create_casefile (ds->cf_factory,
+ dict_get_next_value_idx (ds->dict));
+
aux_data.proc_func = proc_func;
aux_data.aux = aux;
return ok;
}
\f
-/* Procedure implementation. */
+/* Procedure implementation. */
/* Executes a procedure.
Passes each case to CASE_FUNC.
Returns true if successful, false if an I/O error occurred (or
if CASE_FUNC or END_FUNC ever returned false). */
static bool
-internal_procedure (struct dataset *ds, case_func_t case_func,
- bool (*end_func) (void *),
- void *aux)
+internal_procedure (struct dataset *ds, case_func *proc,
+ end_func *end,
+ void *aux)
{
- struct write_case_data wc_data;
+ struct ccase *c;
bool ok = true;
+ proc_open (ds);
+ while (ok && proc_read (ds, &c))
+ if (proc != NULL)
+ ok = proc (c, aux, ds) && ok;
+ if (end != NULL)
+ ok = end (aux, ds) && ok;
+
+ if ( proc_close (ds) && ok )
+ {
+
+ return true;
+ }
+
+ return false;
+}
+
+/* Opens dataset DS for reading cases with proc_read.
+ proc_close must be called when done. */
+void
+proc_open (struct dataset *ds)
+{
assert (ds->proc_source != NULL);
+ assert (!ds->is_open);
update_last_proc_invocation (ds);
- /* Optimize the trivial case where we're not going to do
- anything with the data, by not reading the data at all. */
- if (case_func == NULL && end_func == NULL
- && case_source_is_class (ds->proc_source, &storage_source_class)
- && ds->proc_sink == NULL
- && (ds->temporary_trns_chain == NULL
- || trns_chain_is_empty (ds->temporary_trns_chain))
- && trns_chain_is_empty (ds->permanent_trns_chain))
+ open_active_file (ds);
+
+ ds->is_open = true;
+ create_trns_case (&ds->trns_case, ds->dict);
+ case_create (&ds->sink_case, dict_get_compacted_value_cnt (ds->dict));
+ ds->cases_written = 0;
+ ds->ok = true;
+}
+
+/* Reads the next case from dataset DS, which must have been
+ opened for reading with proc_open.
+ Returns true if successful, in which case a pointer to the
+ case is stored in *C.
+ Return false at end of file or if a read error occurs. In
+ this case a null pointer is stored in *C. */
+bool
+proc_read (struct dataset *ds, struct ccase **c)
+{
+ enum trns_result retval = TRNS_DROP_CASE;
+
+ assert (ds->is_open);
+ *c = NULL;
+ for (;;)
{
- ds->n_lag = 0;
- dict_set_case_limit (ds->dict, 0);
- dict_clear_vectors (ds->dict);
+ size_t case_nr;
+
+ assert (retval == TRNS_DROP_CASE || retval == TRNS_ERROR);
+ if (retval == TRNS_ERROR)
+ ds->ok = false;
+ if (!ds->ok)
+ return false;
+
+ /* Read a case from proc_source. */
+ clear_case (ds, &ds->trns_case);
+ if (!ds->proc_source->class->read (ds->proc_source, &ds->trns_case))
+ return false;
+
+ /* Execute permanent transformations. */
+ case_nr = ds->cases_written + 1;
+ retval = trns_chain_execute (ds->permanent_trns_chain, TRNS_CONTINUE,
+ &ds->trns_case, &case_nr);
+ if (retval != TRNS_CONTINUE)
+ continue;
+
+ /* Write case to LAG queue. */
+ if (ds->n_lag)
+ lag_case (ds, &ds->trns_case);
+
+ /* Write case to replacement active file. */
+ ds->cases_written++;
+ if (ds->proc_sink->class->write != NULL)
+ {
+ if (ds->compactor != NULL)
+ {
+ dict_compactor_compact (ds->compactor, &ds->sink_case,
+ &ds->trns_case);
+ ds->proc_sink->class->write (ds->proc_sink, &ds->sink_case);
+ }
+ else
+ ds->proc_sink->class->write (ds->proc_sink, &ds->trns_case);
+ }
+
+ /* Execute temporary transformations. */
+ if (ds->temporary_trns_chain != NULL)
+ {
+ retval = trns_chain_execute (ds->temporary_trns_chain, TRNS_CONTINUE,
+ &ds->trns_case, &ds->cases_written);
+ if (retval != TRNS_CONTINUE)
+ continue;
+ }
+
+ *c = &ds->trns_case;
return true;
}
-
- open_active_file (ds);
-
- wc_data.case_func = case_func;
- wc_data.aux = aux;
- wc_data.dataset = ds;
- create_trns_case (&wc_data.trns_case, ds->dict);
- case_create (&wc_data.sink_case,
- dict_get_compacted_value_cnt (ds->dict));
- wc_data.cases_written = 0;
-
- ok = ds->proc_source->class->read (ds->proc_source,
- &wc_data.trns_case,
- write_case, &wc_data) && ok;
- if (end_func != NULL)
- ok = end_func (aux) && ok;
-
- case_destroy (&wc_data.sink_case);
- case_destroy (&wc_data.trns_case);
-
- ok = close_active_file (ds) && ok;
+}
- return ok;
+/* Closes dataset DS for reading.
+ Returns true if successful, false if an I/O error occurred
+ while reading or closing the data set.
+ If DS has not been opened, returns true without doing
+ anything else. */
+bool
+proc_close (struct dataset *ds)
+{
+ if (!ds->is_open)
+ return true;
+
+ /* Drain any remaining cases. */
+ while (ds->ok)
+ {
+ struct ccase *c;
+ if (!proc_read (ds, &c))
+ break;
+ }
+ ds->ok = free_case_source (ds->proc_source) && ds->ok;
+ proc_set_source (ds, NULL);
+
+ case_destroy (&ds->sink_case);
+ case_destroy (&ds->trns_case);
+
+ ds->ok = close_active_file (ds) && ds->ok;
+ ds->is_open = false;
+
+ return ds->ok;
}
/* Updates last_proc_invocation. */
static void
-update_last_proc_invocation (struct dataset *ds)
+update_last_proc_invocation (struct dataset *ds)
{
ds->last_proc_invocation = time (NULL);
}
size_t i;
case_create (trns_case, dict_get_next_value_idx (dict));
- for (i = 0; i < var_cnt; i++)
+ for (i = 0; i < var_cnt; i++)
{
struct variable *v = dict_get_var (dict, i);
- union value *value = case_data_rw (trns_case, v->fv);
+ union value *value = case_data_rw (trns_case, v);
- if (v->type == NUMERIC)
- value->f = v->leave ? 0.0 : SYSMIS;
+ if (var_is_numeric (v))
+ value->f = var_get_leave (v) ? 0.0 : SYSMIS;
else
- memset (value->s, ' ', v->width);
+ memset (value->s, ' ', var_get_width (v));
}
}
ds->permanent_dict = ds->dict;
/* Figure out whether to compact. */
- ds->compactor =
+ ds->compactor =
(dict_compacting_would_shrink (ds->permanent_dict)
? dict_make_compactor (ds->permanent_dict)
: NULL);
/* Prepare sink. */
if (ds->proc_sink == NULL)
- ds->proc_sink = create_case_sink (&storage_sink_class, ds->permanent_dict, NULL);
+ ds->proc_sink = create_case_sink (&storage_sink_class,
+ ds->permanent_dict,
+ ds->cf_factory,
+ NULL);
if (ds->proc_sink->class->open != NULL)
ds->proc_sink->class->open (ds->proc_sink);
if (ds->n_lag > 0)
{
int i;
-
+
ds->lag_count = 0;
ds->lag_head = 0;
ds->lag_queue = xnmalloc (ds->n_lag, sizeof *ds->lag_queue);
}
}
-/* Transforms trns_case and writes it to the replacement active
- file if advisable. Returns true if more cases can be
- accepted, false otherwise. Do not call this function again
- after it has returned false once. */
-static bool
-write_case (struct write_case_data *wc_data)
-{
- enum trns_result retval;
- size_t case_nr;
-
- struct dataset *ds = wc_data->dataset;
-
- /* Execute permanent transformations. */
- case_nr = wc_data->cases_written + 1;
- retval = trns_chain_execute (ds->permanent_trns_chain,
- &wc_data->trns_case, &case_nr);
- if (retval != TRNS_CONTINUE)
- goto done;
-
- /* Write case to LAG queue. */
- if (ds->n_lag)
- lag_case (ds, &wc_data->trns_case);
-
- /* Write case to replacement active file. */
- wc_data->cases_written++;
- if (ds->proc_sink->class->write != NULL)
- {
- if (ds->compactor != NULL)
- {
- dict_compactor_compact (ds->compactor, &wc_data->sink_case,
- &wc_data->trns_case);
- ds->proc_sink->class->write (ds->proc_sink, &wc_data->sink_case);
- }
- else
- ds->proc_sink->class->write (ds->proc_sink, &wc_data->trns_case);
- }
-
- /* Execute temporary transformations. */
- if (ds->temporary_trns_chain != NULL)
- {
- retval = trns_chain_execute (ds->temporary_trns_chain,
- &wc_data->trns_case,
- &wc_data->cases_written);
- if (retval != TRNS_CONTINUE)
- goto done;
- }
-
- /* Pass case to procedure. */
- if (wc_data->case_func != NULL)
- if (!wc_data->case_func (&wc_data->trns_case, wc_data->aux))
- retval = TRNS_ERROR;
-
- done:
- clear_case (ds, &wc_data->trns_case);
- return retval != TRNS_ERROR;
-}
-
/* Add C to the lag queue. */
static void
lag_case (struct dataset *ds, const struct ccase *c)
{
size_t var_cnt = dict_get_var_cnt (ds->dict);
size_t i;
-
- for (i = 0; i < var_cnt; i++)
+
+ for (i = 0; i < var_cnt; i++)
{
struct variable *v = dict_get_var (ds->dict, i);
- if (!v->leave)
+ if (!var_get_leave (v))
{
- if (v->type == NUMERIC)
- case_data_rw (c, v->fv)->f = SYSMIS;
+ if (var_is_numeric (v))
+ case_data_rw (c, v)->f = SYSMIS;
else
- memset (case_data_rw (c, v->fv)->s, ' ', v->width);
- }
+ memset (case_data_rw (c, v)->s, ' ', var_get_width (v));
+ }
}
}
if (ds->n_lag > 0)
{
int i;
-
+
for (i = 0; i < ds->n_lag; i++)
case_destroy (&ds->lag_queue[i]);
free (ds->lag_queue);
ds->n_lag = 0;
}
-
+
/* Dictionary from before TEMPORARY becomes permanent. */
proc_cancel_temporary_transformations (ds);
/* Finish compacting. */
- if (ds->compactor != NULL)
+ if (ds->compactor != NULL)
{
dict_compactor_destroy (ds->compactor);
dict_compact_values (ds->dict);
ds->compactor = NULL;
}
-
- /* Free data source. */
- free_case_source (ds->proc_source);
- ds->proc_source = NULL;
/* Old data sink becomes new data source. */
if (ds->proc_sink->class->make_source != NULL)
- ds->proc_source = ds->proc_sink->class->make_source (ds->proc_sink);
+ proc_set_source (ds, ds->proc_sink->class->make_source (ds->proc_sink) );
free_case_sink (ds->proc_sink);
ds->proc_sink = NULL;
/* Procedure that separates the data into SPLIT FILE groups. */
/* Represents auxiliary data for handling SPLIT FILE. */
-struct split_aux_data
+struct split_aux_data
{
struct dataset *dataset; /* The dataset */
struct ccase prev_case; /* Data in previous case. */
/* Callback functions. */
- begin_func_t begin_func ;
- case_func_t proc_func ;
- void (*end_func) (void *);
+ begin_func *begin;
+ case_func *proc;
+ end_func *end;
void *func_aux;
};
static int equal_splits (const struct ccase *, const struct ccase *, const struct dataset *ds);
-static bool split_procedure_case_func (const struct ccase *c, void *);
-static bool split_procedure_end_func (void *);
+static bool split_procedure_case_func (const struct ccase *c, void *, const struct dataset *);
+static bool split_procedure_end_func (void *, const struct dataset *);
/* Like procedure(), but it automatically breaks the case stream
into SPLIT FILE break groups. Before each group of cases with
passed to each of the functions as auxiliary data.
If the active file is empty, none of BEGIN_FUNC, PROC_FUNC,
- and END_FUNC will be called at all.
+ and END_FUNC will be called at all.
If SPLIT FILE is not in effect, then there is one break group
(if the active file is nonempty), and BEGIN_FUNC and END_FUNC
will be called once.
-
+
Returns true if successful, false if an I/O error occurred. */
bool
procedure_with_splits (struct dataset *ds,
- begin_func_t begin_func,
- case_func_t proc_func,
- void (*end_func) (void *aux),
- void *func_aux)
+ begin_func begin,
+ case_func *proc,
+ end_func *end,
+ void *func_aux)
{
struct split_aux_data split_aux;
bool ok;
case_nullify (&split_aux.prev_case);
- split_aux.begin_func = begin_func;
- split_aux.proc_func = proc_func;
- split_aux.end_func = end_func;
+ split_aux.begin = begin;
+ split_aux.proc = proc;
+ split_aux.end = end;
split_aux.func_aux = func_aux;
split_aux.dataset = ds;
/* Case callback used by procedure_with_splits(). */
static bool
-split_procedure_case_func (const struct ccase *c, void *split_aux_)
+split_procedure_case_func (const struct ccase *c, void *split_aux_, const struct dataset *ds)
{
struct split_aux_data *split_aux = split_aux_;
if (case_is_null (&split_aux->prev_case)
|| !equal_splits (c, &split_aux->prev_case, split_aux->dataset))
{
- if (!case_is_null (&split_aux->prev_case) && split_aux->end_func != NULL)
- split_aux->end_func (split_aux->func_aux);
+ if (!case_is_null (&split_aux->prev_case) && split_aux->end != NULL)
+ split_aux->end (split_aux->func_aux, ds);
case_destroy (&split_aux->prev_case);
case_clone (&split_aux->prev_case, c);
- if (split_aux->begin_func != NULL)
- split_aux->begin_func (&split_aux->prev_case, split_aux->func_aux);
-
+ if (split_aux->begin != NULL)
+ split_aux->begin (&split_aux->prev_case, split_aux->func_aux, ds);
}
- return (split_aux->proc_func == NULL
- || split_aux->proc_func (c, split_aux->func_aux));
+ return (split_aux->proc == NULL
+ || split_aux->proc (c, split_aux->func_aux, ds));
}
/* End-of-file callback used by procedure_with_splits(). */
static bool
-split_procedure_end_func (void *split_aux_)
+split_procedure_end_func (void *split_aux_, const struct dataset *ds)
{
struct split_aux_data *split_aux = split_aux_;
- if (!case_is_null (&split_aux->prev_case) && split_aux->end_func != NULL)
- split_aux->end_func (split_aux->func_aux);
+ if (!case_is_null (&split_aux->prev_case) && split_aux->end != NULL)
+ split_aux->end (split_aux->func_aux, ds);
return true;
}
/* Compares the SPLIT FILE variables in cases A and B and returns
nonzero only if they differ. */
static int
-equal_splits (const struct ccase *a, const struct ccase *b,
- const struct dataset *ds)
+equal_splits (const struct ccase *a, const struct ccase *b,
+ const struct dataset *ds)
{
return case_compare (a, b,
dict_get_split_vars (ds->dict),
/* Represents auxiliary data for handling SPLIT FILE in a
multipass procedure. */
-struct multipass_split_aux_data
+struct multipass_split_aux_data
{
struct dataset *dataset; /* The dataset of the split */
struct ccase prev_case; /* Data in previous case. */
struct casefile *casefile; /* Accumulates data for a split. */
-
- /* Function to call with the accumulated data. */
- bool (*split_func) (const struct ccase *first, const struct casefile *,
- void *);
- void *func_aux; /* Auxiliary data. */
+ split_func *split; /* Function to call with the accumulated
+ data. */
+ void *func_aux; /* Auxiliary data. */
};
-static bool multipass_split_case_func (const struct ccase *c, void *aux_);
-static bool multipass_split_end_func (void *aux_);
-static bool multipass_split_output (struct multipass_split_aux_data *);
+static bool multipass_split_case_func (const struct ccase *c, void *aux_, const struct dataset *);
+static bool multipass_split_end_func (void *aux_, const struct dataset *ds);
+static bool multipass_split_output (struct multipass_split_aux_data *, const struct dataset *ds);
/* Returns true if successful, false if an I/O error occurred. */
bool
-multipass_procedure_with_splits (struct dataset *ds,
- bool (*split_func) (const struct ccase *first,
- const struct casefile *,
- void *aux),
+multipass_procedure_with_splits (struct dataset *ds,
+ split_func *split,
void *func_aux)
{
struct multipass_split_aux_data aux;
case_nullify (&aux.prev_case);
aux.casefile = NULL;
- aux.split_func = split_func;
+ aux.split = split;
aux.func_aux = func_aux;
aux.dataset = ds;
/* Case callback used by multipass_procedure_with_splits(). */
static bool
-multipass_split_case_func (const struct ccase *c, void *aux_)
+multipass_split_case_func (const struct ccase *c, void *aux_, const struct dataset *ds)
{
struct multipass_split_aux_data *aux = aux_;
- struct dataset *ds = aux->dataset;
bool ok = true;
/* Start a new series if needed. */
/* Pass any cases to split_func. */
if (aux->casefile != NULL)
- ok = multipass_split_output (aux);
+ ok = multipass_split_output (aux, ds);
/* Start a new casefile. */
- aux->casefile =
- fastfile_create (dict_get_next_value_idx (ds->dict));
+ aux->casefile =
+ ds->cf_factory->create_casefile (ds->cf_factory,
+ dict_get_next_value_idx (ds->dict));
}
return casefile_append (aux->casefile, c) && ok;
/* End-of-file callback used by multipass_procedure_with_splits(). */
static bool
-multipass_split_end_func (void *aux_)
+multipass_split_end_func (void *aux_, const struct dataset *ds)
{
struct multipass_split_aux_data *aux = aux_;
- return (aux->casefile == NULL || multipass_split_output (aux));
+ return (aux->casefile == NULL || multipass_split_output (aux, ds));
}
static bool
-multipass_split_output (struct multipass_split_aux_data *aux)
+multipass_split_output (struct multipass_split_aux_data *aux, const struct dataset *ds)
{
bool ok;
-
+
assert (aux->casefile != NULL);
- ok = aux->split_func (&aux->prev_case, aux->casefile, aux->func_aux);
+ ok = aux->split (&aux->prev_case, aux->casefile, aux->func_aux, ds);
casefile_destroy (aux->casefile);
aux->casefile = NULL;
fh_set_default_handle (NULL);
ds->n_lag = 0;
-
+
free_case_source (ds->proc_source);
- ds->proc_source = NULL;
+ proc_set_source (ds, NULL);
proc_cancel_all_transformations (ds);
}
and clears the permanent transformations.
For use by INPUT PROGRAM. */
struct trns_chain *
-proc_capture_transformations (struct dataset *ds)
+proc_capture_transformations (struct dataset *ds)
{
struct trns_chain *chain;
-
+
assert (ds->temporary_trns_chain == NULL);
chain = ds->permanent_trns_chain;
ds->cur_trns_chain = ds->permanent_trns_chain = trns_chain_create ();
FINALIZE will be called.
The functions are passed AUX as auxiliary data. */
void
-add_transformation_with_finalizer (struct dataset *ds,
+add_transformation_with_finalizer (struct dataset *ds,
trns_finalize_func *finalize,
trns_proc_func *proc,
trns_free_func *free, void *aux)
This value can be returned by a transformation procedure
function to indicate a "jump" to that transformation. */
size_t
-next_transformation (const struct dataset *ds)
+next_transformation (const struct dataset *ds)
{
return trns_chain_next (ds->cur_trns_chain);
}
a temporary transformation, false if it will add a permanent
transformation. */
bool
-proc_in_temporary_transformations (const struct dataset *ds)
+proc_in_temporary_transformations (const struct dataset *ds)
{
return ds->temporary_trns_chain != NULL;
}
Further calls to add_transformation() will add temporary
transformations. */
void
-proc_start_temporary_transformations (struct dataset *ds)
+proc_start_temporary_transformations (struct dataset *ds)
{
if (!proc_in_temporary_transformations (ds))
{
add_case_limit_trns (ds);
ds->permanent_dict = dict_clone (ds->dict);
+
trns_chain_finalize (ds->permanent_trns_chain);
ds->temporary_trns_chain = ds->cur_trns_chain = trns_chain_create ();
}
permanent.
Returns true if anything changed, false otherwise. */
bool
-proc_make_temporary_transformations_permanent (struct dataset *ds)
+proc_make_temporary_transformations_permanent (struct dataset *ds)
{
- if (proc_in_temporary_transformations (ds))
+ if (proc_in_temporary_transformations (ds))
{
trns_chain_finalize (ds->temporary_trns_chain);
trns_chain_splice (ds->permanent_trns_chain, ds->temporary_trns_chain);
transformations will be permanent.
Returns true if anything changed, false otherwise. */
bool
-proc_cancel_temporary_transformations (struct dataset *ds)
+proc_cancel_temporary_transformations (struct dataset *ds)
{
- if (proc_in_temporary_transformations (ds))
+ if (proc_in_temporary_transformations (ds))
{
- dict_destroy (ds->dict);
- ds->dict = ds->permanent_dict;
+ dataset_set_dict (ds, ds->permanent_dict);
ds->permanent_dict = NULL;
trns_chain_destroy (ds->temporary_trns_chain);
\f
/* Initializes procedure handling. */
struct dataset *
-create_dataset (void)
+create_dataset (struct casefile_factory *fact,
+ replace_source_callback *rps,
+ replace_dictionary_callback *rds
+ )
{
struct dataset *ds = xzalloc (sizeof(*ds));
ds->dict = dict_create ();
+ ds->cf_factory = fact;
+ ds->replace_source = rps;
+ ds->replace_dict = rds;
proc_cancel_all_transformations (ds);
return ds;
}
{
discard_variables (ds);
dict_destroy (ds->dict);
+ trns_chain_destroy (ds->permanent_trns_chain);
free (ds);
}
/* Sets SINK as the destination for procedure output from the
next procedure. */
void
-proc_set_sink (struct dataset *ds, struct case_sink *sink)
+proc_set_sink (struct dataset *ds, struct case_sink *sink)
{
assert (ds->proc_sink == NULL);
ds->proc_sink = sink;
/* Sets SOURCE as the source for procedure input for the next
procedure. */
void
-proc_set_source (struct dataset *ds, struct case_source *source)
+proc_set_source (struct dataset *ds, struct case_source *source)
{
- assert (ds->proc_source == NULL);
ds->proc_source = source;
+
+ if ( ds->replace_source )
+ ds->replace_source (ds->proc_source);
}
/* Returns true if a source for the next procedure has been
configured, false otherwise. */
bool
-proc_has_source (const struct dataset *ds)
+proc_has_source (const struct dataset *ds)
{
return ds->proc_source != NULL;
}
The returned casefile is owned by the caller; it will not be
automatically used for the next procedure's input. */
struct casefile *
-proc_capture_output (struct dataset *ds)
+proc_capture_output (struct dataset *ds)
{
struct casefile *casefile;
assert (!proc_in_temporary_transformations (ds));
casefile = storage_source_decapsulate (ds->proc_source);
- ds->proc_source = NULL;
+ proc_set_source (ds, NULL);
return casefile;
}
/* Adds a transformation that limits the number of cases that may
pass through, if DS->DICT has a case limit. */
static void
-add_case_limit_trns (struct dataset *ds)
+add_case_limit_trns (struct dataset *ds)
{
size_t case_limit = dict_get_case_limit (ds->dict);
if (case_limit != 0)
*CASES_REMAINING. */
static int
case_limit_trns_proc (void *cases_remaining_,
- struct ccase *c UNUSED, casenum_t case_nr UNUSED)
+ struct ccase *c UNUSED, casenumber case_nr UNUSED)
{
size_t *cases_remaining = cases_remaining_;
- if (*cases_remaining > 0)
+ if (*cases_remaining > 0)
{
(*cases_remaining)--;
return TRNS_CONTINUE;
/* Frees the data associated with a case limit transformation. */
static bool
-case_limit_trns_free (void *cases_remaining_)
+case_limit_trns_free (void *cases_remaining_)
{
size_t *cases_remaining = cases_remaining_;
free (cases_remaining);
/* Adds a temporary transformation to filter data according to
the variable specified on FILTER, if any. */
static void
-add_filter_trns (struct dataset *ds)
+add_filter_trns (struct dataset *ds)
{
struct variable *filter_var = dict_get_filter (ds->dict);
- if (filter_var != NULL)
+ if (filter_var != NULL)
{
proc_start_temporary_transformations (ds);
add_transformation (ds, filter_trns_proc, NULL, filter_var);
/* FILTER transformation. */
static int
filter_trns_proc (void *filter_var_,
- struct ccase *c UNUSED, casenum_t case_nr UNUSED)
-
+ struct ccase *c UNUSED, casenumber case_nr UNUSED)
+
{
struct variable *filter_var = filter_var_;
- double f = case_num (c, filter_var->fv);
- return (f != 0.0 && !mv_is_num_missing (&filter_var->miss, f)
+ double f = case_num (c, filter_var);
+ return (f != 0.0 && !var_is_num_missing (filter_var, f, MV_ANY)
? TRNS_CONTINUE : TRNS_DROP_CASE);
}
}
-void
+/* Set or replace dataset DS's dictionary with DICT.
+ The old dictionary is destroyed */
+void
dataset_set_dict (struct dataset *ds, struct dictionary *dict)
{
+ struct dictionary *old_dict = ds->dict;
+
+ dict_copy_callbacks (dict, ds->dict);
ds->dict = dict;
+
+ if ( ds->replace_dict )
+ ds->replace_dict (dict);
+
+ dict_destroy (old_dict);
}
-int
+int
dataset_n_lag (const struct dataset *ds)
{
return ds->n_lag;
}
-void
+void
dataset_set_n_lag (struct dataset *ds, int n_lag)
{
ds->n_lag = n_lag;
}
+struct casefile_factory *
+dataset_get_casefile_factory (const struct dataset *ds)
+{
+ return ds->cf_factory;
+}
+