#include <libpspp/misc.h>
#include <libpspp/str.h>
-/* Procedure execution data. */
-struct write_case_data
- {
- /* Function to call for each case. */
- case_func *proc;
- void *aux;
-
- struct dataset *dataset; /* The dataset concerned */
- struct ccase trns_case; /* Case used for transformations. */
- struct ccase sink_case; /* Case written to sink, if
- compacting is necessary. */
- size_t cases_written; /* Cases output so far. */
- };
-
struct dataset {
/* Cases are read from proc_source,
pass through permanent_trns_chain (which transforms them into
int lag_head; /* Index where next case will be added. */
struct ccase *lag_queue; /* Array of n_lag ccase * elements. */
+ /* Procedure data. */
+ bool is_open; /* Procedure open? */
+ struct ccase trns_case; /* Case used for transformations. */
+ struct ccase sink_case; /* Case written to sink, if
+ compacting is necessary. */
+ size_t cases_written; /* Cases output so far. */
+ bool ok;
}; /* struct dataset */
static void update_last_proc_invocation (struct dataset *ds);
static void create_trns_case (struct ccase *, struct dictionary *);
static void open_active_file (struct dataset *ds);
-static bool write_case (struct write_case_data *wc_data);
static void lag_case (struct dataset *ds, const struct ccase *c);
static void clear_case (const struct dataset *ds, struct ccase *c);
static bool close_active_file (struct dataset *ds);
bool
procedure (struct dataset *ds, case_func *cf, void *aux)
{
+ update_last_proc_invocation (ds);
+
+ /* Optimize the trivial case where we're not going to do
+ anything with the data, by not reading the data at all. */
+ if (cf == NULL
+ && case_source_is_class (ds->proc_source, &storage_source_class)
+ && ds->proc_sink == NULL
+ && (ds->temporary_trns_chain == NULL
+ || trns_chain_is_empty (ds->temporary_trns_chain))
+ && trns_chain_is_empty (ds->permanent_trns_chain))
+ {
+ ds->n_lag = 0;
+ dict_set_case_limit (ds->dict, 0);
+ dict_clear_vectors (ds->dict);
+ return true;
+ }
+
return internal_procedure (ds, cf, NULL, aux);
}
\f
\f
/* Procedure implementation. */
-
/* Executes a procedure.
Passes each case to CASE_FUNC.
Calls END_FUNC after the last case.
end_func *end,
void *aux)
{
- struct write_case_data wc_data;
+ struct ccase *c;
bool ok = true;
+
+ proc_open (ds);
+ while (ok && proc_read (ds, &c))
+ if (proc != NULL)
+ ok = proc (c, aux, ds) && ok;
+ if (end != NULL)
+ ok = end (aux, ds) && ok;
+ return proc_close (ds) && ok;
+}
+/* Opens dataset DS for reading cases with proc_read.
+ proc_close must be called when done. */
+void
+proc_open (struct dataset *ds)
+{
assert (ds->proc_source != NULL);
+ assert (!ds->is_open);
update_last_proc_invocation (ds);
- /* Optimize the trivial case where we're not going to do
- anything with the data, by not reading the data at all. */
- if (proc == NULL && end == NULL
- && case_source_is_class (ds->proc_source, &storage_source_class)
- && ds->proc_sink == NULL
- && (ds->temporary_trns_chain == NULL
- || trns_chain_is_empty (ds->temporary_trns_chain))
- && trns_chain_is_empty (ds->permanent_trns_chain))
+ open_active_file (ds);
+
+ ds->is_open = true;
+ create_trns_case (&ds->trns_case, ds->dict);
+ case_create (&ds->sink_case, dict_get_compacted_value_cnt (ds->dict));
+ ds->cases_written = 0;
+ ds->ok = true;
+}
+
+/* Reads the next case from dataset DS, which must have been
+ opened for reading with proc_open.
+ Returns true if successful, in which case a pointer to the
+ case is stored in *C.
+ Return false at end of file or if a read error occurs. In
+ this case a null pointer is stored in *C. */
+bool
+proc_read (struct dataset *ds, struct ccase **c)
+{
+ enum trns_result retval = TRNS_DROP_CASE;
+
+ assert (ds->is_open);
+ *c = NULL;
+ for (;;)
{
- ds->n_lag = 0;
- dict_set_case_limit (ds->dict, 0);
- dict_clear_vectors (ds->dict);
- return true;
- }
+ size_t case_nr;
+
+ assert (retval == TRNS_DROP_CASE || retval == TRNS_ERROR);
+ if (retval == TRNS_ERROR)
+ ds->ok = false;
+ if (!ds->ok)
+ return false;
+
+ /* Read a case from proc_source. */
+ clear_case (ds, &ds->trns_case);
+ if (!ds->proc_source->class->read (ds->proc_source, &ds->trns_case))
+ return false;
+
+ /* Execute permanent transformations. */
+ case_nr = ds->cases_written + 1;
+ retval = trns_chain_execute (ds->permanent_trns_chain, TRNS_CONTINUE,
+ &ds->trns_case, &case_nr);
+ if (retval != TRNS_CONTINUE)
+ continue;
- open_active_file (ds);
+ /* Write case to LAG queue. */
+ if (ds->n_lag)
+ lag_case (ds, &ds->trns_case);
+
+ /* Write case to replacement active file. */
+ ds->cases_written++;
+ if (ds->proc_sink->class->write != NULL)
+ {
+ if (ds->compactor != NULL)
+ {
+ dict_compactor_compact (ds->compactor, &ds->sink_case,
+ &ds->trns_case);
+ ds->proc_sink->class->write (ds->proc_sink, &ds->sink_case);
+ }
+ else
+ ds->proc_sink->class->write (ds->proc_sink, &ds->trns_case);
+ }
- wc_data.proc = proc;
- wc_data.aux = aux;
- wc_data.dataset = ds;
- create_trns_case (&wc_data.trns_case, ds->dict);
- case_create (&wc_data.sink_case,
- dict_get_compacted_value_cnt (ds->dict));
- wc_data.cases_written = 0;
-
- ok = ds->proc_source->class->read (ds->proc_source,
- &wc_data.trns_case,
- write_case, &wc_data) && ok;
- if (end != NULL)
- ok = end (aux, ds) && ok;
+ /* Execute temporary transformations. */
+ if (ds->temporary_trns_chain != NULL)
+ {
+ retval = trns_chain_execute (ds->temporary_trns_chain, TRNS_CONTINUE,
+ &ds->trns_case, &ds->cases_written);
+ if (retval != TRNS_CONTINUE)
+ continue;
+ }
- case_destroy (&wc_data.sink_case);
- case_destroy (&wc_data.trns_case);
+ *c = &ds->trns_case;
+ return true;
+ }
+}
- ok = close_active_file (ds) && ok;
+/* Closes dataset DS for reading.
+ Returns true if successful, false if an I/O error occurred
+ while reading or closing the data set.
+ If DS has not been opened, returns true without doing
+ anything else. */
+bool
+proc_close (struct dataset *ds)
+{
+ if (!ds->is_open)
+ return true;
- return ok;
+ /* Drain any remaining cases. */
+ while (ds->ok)
+ {
+ struct ccase *c;
+ if (!proc_read (ds, &c))
+ break;
+ }
+
+ ds->ok = free_case_source (ds->proc_source) && ds->ok;
+ ds->proc_source = NULL;
+
+ case_destroy (&ds->sink_case);
+ case_destroy (&ds->trns_case);
+
+ ds->ok = close_active_file (ds) && ds->ok;
+ ds->is_open = false;
+
+ return ds->ok;
}
/* Updates last_proc_invocation. */
}
}
-/* Transforms trns_case and writes it to the replacement active
- file if advisable. Returns true if more cases can be
- accepted, false otherwise. Do not call this function again
- after it has returned false once. */
-static bool
-write_case (struct write_case_data *wc_data)
-{
- enum trns_result retval;
- size_t case_nr;
-
- struct dataset *ds = wc_data->dataset;
-
- /* Execute permanent transformations. */
- case_nr = wc_data->cases_written + 1;
- retval = trns_chain_execute (ds->permanent_trns_chain,
- &wc_data->trns_case, &case_nr);
- if (retval != TRNS_CONTINUE)
- goto done;
-
- /* Write case to LAG queue. */
- if (ds->n_lag)
- lag_case (ds, &wc_data->trns_case);
-
- /* Write case to replacement active file. */
- wc_data->cases_written++;
- if (ds->proc_sink->class->write != NULL)
- {
- if (ds->compactor != NULL)
- {
- dict_compactor_compact (ds->compactor, &wc_data->sink_case,
- &wc_data->trns_case);
- ds->proc_sink->class->write (ds->proc_sink, &wc_data->sink_case);
- }
- else
- ds->proc_sink->class->write (ds->proc_sink, &wc_data->trns_case);
- }
-
- /* Execute temporary transformations. */
- if (ds->temporary_trns_chain != NULL)
- {
- retval = trns_chain_execute (ds->temporary_trns_chain,
- &wc_data->trns_case,
- &wc_data->cases_written);
- if (retval != TRNS_CONTINUE)
- goto done;
- }
-
- /* Pass case to procedure. */
- if (wc_data->proc != NULL)
- if (!wc_data->proc (&wc_data->trns_case, wc_data->aux, ds))
- retval = TRNS_ERROR;
-
- done:
- clear_case (ds, &wc_data->trns_case);
- return retval != TRNS_ERROR;
-}
-
/* Add C to the lag queue. */
static void
lag_case (struct dataset *ds, const struct ccase *c)
ds->compactor = NULL;
}
- /* Free data source. */
- free_case_source (ds->proc_source);
- ds->proc_source = NULL;
-
/* Old data sink becomes new data source. */
if (ds->proc_sink->class->make_source != NULL)
ds->proc_source = ds->proc_sink->class->make_source (ds->proc_sink);