/* PSPP - a program for statistical analysis.
- Copyright (C) 1997-9, 2000, 2006, 2007 Free Software Foundation, Inc.
+ Copyright (C) 1997-9, 2000, 2006, 2007, 2009, 2010 Free Software Foundation, Inc.
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
#include <config.h>
+#include "data/procedure.h"
+
#include <errno.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
-#include <data/case.h>
-#include <data/caseinit.h>
-#include <data/casereader.h>
-#include <data/casereader-provider.h>
-#include <data/casewriter.h>
-#include <data/dictionary.h>
-#include <data/file-handle-def.h>
-#include <data/procedure.h>
-#include <data/transformations.h>
-#include <data/variable.h>
-#include <libpspp/alloc.h>
-#include <libpspp/deque.h>
-#include <libpspp/misc.h>
-#include <libpspp/str.h>
-#include <libpspp/taint.h>
+#include "data/case.h"
+#include "data/case-map.h"
+#include "data/caseinit.h"
+#include "data/casereader.h"
+#include "data/casereader-provider.h"
+#include "data/casereader-shim.h"
+#include "data/casewriter.h"
+#include "data/dictionary.h"
+#include "data/file-handle-def.h"
+#include "data/transformations.h"
+#include "data/variable.h"
+#include "libpspp/deque.h"
+#include "libpspp/misc.h"
+#include "libpspp/str.h"
+#include "libpspp/taint.h"
+#include "libpspp/i18n.h"
+
+#include "gl/minmax.h"
+#include "gl/xalloc.h"
struct dataset {
/* Cases are read from source,
struct trns_chain *temporary_trns_chain;
struct dictionary *dict;
- /* Callback which occurs when a procedure provides a new source for
- the dataset */
- replace_source_callback *replace_source ;
-
- /* Callback which occurs whenever the DICT is replaced by a new one */
- replace_dictionary_callback *replace_dict;
+ /* Callback which occurs whenever the transformation chain(s) have
+ been modified */
+ transformation_change_callback_func *xform_callback;
+ void *xform_callback_aux;
/* If true, cases are discarded instead of being written to
sink. */
added to. */
struct trns_chain *cur_trns_chain;
- /* The compactor used to compact a case, if necessary;
+ /* The case map used to compact a case, if necessary;
otherwise a null pointer. */
- struct dict_compactor *compactor;
+ struct case_map *compactor;
/* Time at which proc was last invoked. */
time_t last_proc_invocation;
/* Cases just before ("lagging") the current one. */
int n_lag; /* Number of cases to lag. */
struct deque lag; /* Deque of lagged cases. */
- struct ccase *lag_cases; /* Lagged cases managed by deque. */
+ struct ccase **lag_cases; /* Lagged cases managed by deque. */
/* Procedure data. */
enum
but proc_commit not yet called. */
}
proc_state;
- casenumber cases_written; /* Cases output so far. */
- bool ok; /* Error status. */
+ casenumber cases_written; /* Cases output so far. */
+ bool ok; /* Error status. */
+ struct casereader_shim *shim; /* Shim on proc_open() casereader. */
+
+ void (*callback) (void *); /* Callback for when the dataset changes */
+ void *cb_data;
+
}; /* struct dataset */
static void add_filter_trns (struct dataset *ds);
static void update_last_proc_invocation (struct dataset *ds);
+
+static void
+dataset_set_unsaved (const struct dataset *ds)
+{
+ if (ds->callback) ds->callback (ds->cb_data);
+}
+
\f
/* Public functions. */
+void
+dataset_set_callback (struct dataset *ds, void (*cb) (void *), void *cb_data)
+{
+ ds->callback = cb;
+ ds->cb_data = cb_data;
+}
+
+
/* Returns the last time the data was read. */
time_t
time_of_last_procedure (struct dataset *ds)
return proc_commit (ds) && ok;
}
-static struct casereader_class proc_casereader_class;
+static const struct casereader_class proc_casereader_class;
+
+/* Opens dataset DS for reading cases with proc_read. If FILTER is true, then
+ cases filtered out with FILTER BY will not be included in the casereader
+ (which is usually desirable). If FILTER is false, all cases will be
+ included regardless of FILTER BY settings.
-/* Opens dataset DS for reading cases with proc_read.
proc_commit must be called when done. */
struct casereader *
-proc_open (struct dataset *ds)
+proc_open_filtering (struct dataset *ds, bool filter)
{
+ struct casereader *reader;
+
assert (ds->source != NULL);
assert (ds->proc_state == PROC_COMMITTED);
/* Finish up the collection of transformations. */
add_case_limit_trns (ds);
- add_filter_trns (ds);
+ if (filter)
+ add_filter_trns (ds);
trns_chain_finalize (ds->cur_trns_chain);
/* Make permanent_dict refer to the dictionary right before
/* Prepare sink. */
if (!ds->discard_output)
{
- ds->compactor = (dict_compacting_would_shrink (ds->permanent_dict)
- ? dict_make_compactor (ds->permanent_dict)
- : NULL);
- ds->sink = autopaging_writer_create (dict_get_compacted_value_cnt (
- ds->permanent_dict));
+ struct dictionary *pd = ds->permanent_dict;
+ size_t compacted_value_cnt = dict_count_values (pd, 1u << DC_SCRATCH);
+ if (compacted_value_cnt < dict_get_next_value_idx (pd))
+ {
+ struct caseproto *compacted_proto;
+ compacted_proto = dict_get_compacted_proto (pd, 1u << DC_SCRATCH);
+ ds->compactor = case_map_to_compact_dict (pd, 1u << DC_SCRATCH);
+ ds->sink = autopaging_writer_create (compacted_proto);
+ caseproto_unref (compacted_proto);
+ }
+ else
+ {
+ ds->compactor = NULL;
+ ds->sink = autopaging_writer_create (dict_get_proto (pd));
+ }
}
else
{
/* FIXME: use taint in dataset in place of `ok'? */
/* FIXME: for trivial cases we can just return a clone of
ds->source? */
- return casereader_create_sequential (NULL,
- dict_get_next_value_idx (ds->dict),
- CASENUMBER_MAX,
- &proc_casereader_class, ds);
+
+ /* Create casereader and insert a shim on top. The shim allows us to
+ arbitrarily extend the casereader's lifetime, by slurping the cases into
+ the shim's buffer in proc_commit(). That is especially useful when output
+ table_items are generated directly from the procedure casereader (e.g. by
+ the LIST procedure) when we are using an output driver that keeps a
+ reference to the output items passed to it (e.g. the GUI output driver in
+ PSPPIRE). */
+ reader = casereader_create_sequential (NULL, dict_get_proto (ds->dict),
+ CASENUMBER_MAX,
+ &proc_casereader_class, ds);
+ ds->shim = casereader_shim_insert (reader);
+ return reader;
+}
+
+/* Opens dataset DS for reading cases with proc_read.
+ proc_commit must be called when done. */
+struct casereader *
+proc_open (struct dataset *ds)
+{
+ return proc_open_filtering (ds, true);
}
/* Returns true if a procedure is in progress, that is, if
}
/* "read" function for procedure casereader. */
-static bool
-proc_casereader_read (struct casereader *reader UNUSED, void *ds_,
- struct ccase *c)
+static struct ccase *
+proc_casereader_read (struct casereader *reader UNUSED, void *ds_)
{
struct dataset *ds = ds_;
enum trns_result retval = TRNS_DROP_CASE;
+ struct ccase *c;
assert (ds->proc_state == PROC_OPEN);
- for (;;)
+ for (; ; case_unref (c))
{
casenumber case_nr;
if (retval == TRNS_ERROR)
ds->ok = false;
if (!ds->ok)
- return false;
+ return NULL;
/* Read a case from source. */
- if (!casereader_read (ds->source, c))
- return false;
- case_resize (c, dict_get_next_value_idx (ds->dict));
+ c = casereader_read (ds->source);
+ if (c == NULL)
+ return NULL;
+ c = case_unshare_and_resize (c, dict_get_proto (ds->dict));
caseinit_init_vars (ds->caseinit, c);
/* Execute permanent transformations. */
case_nr = ds->cases_written + 1;
retval = trns_chain_execute (ds->permanent_trns_chain, TRNS_CONTINUE,
- c, case_nr);
+ &c, case_nr);
caseinit_update_left_vars (ds->caseinit, c);
if (retval != TRNS_CONTINUE)
- {
- case_destroy (c);
- continue;
- }
+ continue;
/* Write case to collection of lagged cases. */
if (ds->n_lag > 0)
{
while (deque_count (&ds->lag) >= ds->n_lag)
- case_destroy (&ds->lag_cases[deque_pop_back (&ds->lag)]);
- case_clone (&ds->lag_cases[deque_push_front (&ds->lag)], c);
+ case_unref (ds->lag_cases[deque_pop_back (&ds->lag)]);
+ ds->lag_cases[deque_push_front (&ds->lag)] = case_ref (c);
}
/* Write case to replacement active file. */
ds->cases_written++;
if (ds->sink != NULL)
- {
- struct ccase tmp;
- if (ds->compactor != NULL)
- {
- case_create (&tmp, dict_get_compacted_value_cnt (ds->dict));
- dict_compactor_compact (ds->compactor, &tmp, c);
- }
- else
- case_clone (&tmp, c);
- casewriter_write (ds->sink, &tmp);
- }
+ casewriter_write (ds->sink,
+ case_map_execute (ds->compactor, case_ref (c)));
/* Execute temporary transformations. */
if (ds->temporary_trns_chain != NULL)
{
retval = trns_chain_execute (ds->temporary_trns_chain, TRNS_CONTINUE,
- c, ds->cases_written);
+ &c, ds->cases_written);
if (retval != TRNS_CONTINUE)
- {
- case_destroy (c);
- continue;
- }
+ continue;
}
- return true;
+ return c;
}
}
proc_casereader_destroy (struct casereader *reader, void *ds_)
{
struct dataset *ds = ds_;
- struct ccase c;
+ struct ccase *c;
+
+ /* We are always the subreader for a casereader_buffer, so if we're being
+ destroyed then it's because the casereader_buffer has read all the cases
+ that it ever will. */
+ ds->shim = NULL;
/* Make sure transformations happen for every input case, in
case they have side effects, and ensure that the replacement
active file gets all the cases it should. */
- while (casereader_read (reader, &c))
- case_destroy (&c);
+ while ((c = casereader_read (reader)) != NULL)
+ case_unref (c);
ds->proc_state = PROC_CLOSED;
ds->ok = casereader_destroy (ds->source) && ds->ok;
bool
proc_commit (struct dataset *ds)
{
+ if (ds->shim != NULL)
+ casereader_shim_slurp (ds->shim);
+
assert (ds->proc_state == PROC_CLOSED);
ds->proc_state = PROC_COMMITTED;
+ dataset_set_unsaved (ds);
+
/* Free memory for lagged cases. */
while (!deque_is_empty (&ds->lag))
- case_destroy (&ds->lag_cases[deque_pop_back (&ds->lag)]);
+ case_unref (ds->lag_cases[deque_pop_back (&ds->lag)]);
free (ds->lag_cases);
/* Dictionary from before TEMPORARY becomes permanent. */
/* Finish compacting. */
if (ds->compactor != NULL)
{
- dict_compactor_destroy (ds->compactor);
- dict_compact_values (ds->dict);
+ case_map_destroy (ds->compactor);
ds->compactor = NULL;
+
+ dict_delete_scratch_vars (ds->dict);
+ dict_compact_values (ds->dict);
}
/* Old data sink becomes new data source. */
ds->discard_output = false;
}
ds->sink = NULL;
- if ( ds->replace_source) ds->replace_source (ds->source);
caseinit_clear (ds->caseinit);
caseinit_mark_as_preinited (ds->caseinit, ds->dict);
}
/* Casereader class for procedure execution. */
-static struct casereader_class proc_casereader_class =
+static const struct casereader_class proc_casereader_class =
{
proc_casereader_read,
proc_casereader_destroy,
\f
/* Returns a pointer to the lagged case from N_BEFORE cases before the
current one, or NULL if there haven't been that many cases yet. */
-struct ccase *
+const struct ccase *
lagged_case (const struct dataset *ds, int n_before)
{
assert (n_before >= 1);
assert (n_before <= ds->n_lag);
if (n_before <= deque_count (&ds->lag))
- return &ds->lag_cases[deque_front (&ds->lag, n_before - 1)];
+ return ds->lag_cases[deque_front (&ds->lag, n_before - 1)];
else
return NULL;
}
assert (ds->temporary_trns_chain == NULL);
chain = ds->permanent_trns_chain;
ds->cur_trns_chain = ds->permanent_trns_chain = trns_chain_create ();
+
+ if ( ds->xform_callback)
+ ds->xform_callback (false, ds->xform_callback_aux);
+
return chain;
}
add_transformation (struct dataset *ds, trns_proc_func *proc, trns_free_func *free, void *aux)
{
trns_chain_append (ds->cur_trns_chain, NULL, proc, free, aux);
+ if ( ds->xform_callback)
+ ds->xform_callback (true, ds->xform_callback_aux);
}
/* Adds a transformation that processes a case with PROC and
trns_free_func *free, void *aux)
{
trns_chain_append (ds->cur_trns_chain, finalize, proc, free, aux);
+
+ if ( ds->xform_callback)
+ ds->xform_callback (true, ds->xform_callback_aux);
}
/* Returns the index of the next transformation.
trns_chain_finalize (ds->permanent_trns_chain);
ds->temporary_trns_chain = ds->cur_trns_chain = trns_chain_create ();
+
+ if ( ds->xform_callback)
+ ds->xform_callback (true, ds->xform_callback_aux);
}
}
dict_destroy (ds->dict);
ds->dict = ds->permanent_dict;
ds->permanent_dict = NULL;
- if (ds->replace_dict) ds->replace_dict (ds->dict);
trns_chain_destroy (ds->temporary_trns_chain);
ds->temporary_trns_chain = NULL;
+ if ( ds->xform_callback)
+ ds->xform_callback (!trns_chain_is_empty (ds->permanent_trns_chain),
+ ds->xform_callback_aux);
+
return true;
}
else
ok = trns_chain_destroy (ds->temporary_trns_chain) && ok;
ds->permanent_trns_chain = ds->cur_trns_chain = trns_chain_create ();
ds->temporary_trns_chain = NULL;
+ if ( ds->xform_callback)
+ ds->xform_callback (false, ds->xform_callback_aux);
+
return ok;
}
\f
+
+static void
+dict_callback (struct dictionary *d UNUSED, void *ds_)
+{
+ struct dataset *ds = ds_;
+ dataset_set_unsaved (ds);
+}
+
/* Initializes procedure handling. */
struct dataset *
-create_dataset (replace_source_callback *rps,
- replace_dictionary_callback *rds)
+create_dataset (void)
{
struct dataset *ds = xzalloc (sizeof(*ds));
ds->dict = dict_create ();
+
+ dict_set_change_callback (ds->dict, dict_callback, ds);
+
+ dict_set_encoding (ds->dict, get_default_encoding ());
+
ds->caseinit = caseinit_create ();
- ds->replace_source = rps;
- ds->replace_dict = rds;
proc_cancel_all_transformations (ds);
return ds;
}
+
+void
+dataset_add_transform_change_callback (struct dataset *ds,
+ transformation_change_callback_func *cb,
+ void *aux)
+{
+ ds->xform_callback = cb;
+ ds->xform_callback_aux = aux;
+}
+
/* Finishes up procedure handling. */
void
destroy_dataset (struct dataset *ds)
dict_destroy (ds->dict);
caseinit_destroy (ds->caseinit);
trns_chain_destroy (ds->permanent_trns_chain);
+
+ if ( ds->xform_callback)
+ ds->xform_callback (false, ds->xform_callback_aux);
free (ds);
}
casereader_destroy (ds->source);
ds->source = NULL;
- if ( ds->replace_source) ds->replace_source (NULL);
proc_cancel_all_transformations (ds);
}
dict_destroy (ds->dict);
ds->dict = dict;
- if ( ds->replace_dict) ds->replace_dict (dict);
+ dict_set_change_callback (ds->dict, dict_callback, ds);
proc_set_active_file_data (ds, source);
}
{
casereader_destroy (ds->source);
ds->source = reader;
- if (ds->replace_source) ds->replace_source (reader);
caseinit_clear (ds->caseinit);
caseinit_mark_as_preinited (ds->caseinit, ds->dict);
return ds->source != NULL;
}
+/* Returns the active file data source from DS, or a null pointer
+ if DS has no data source, and removes it from DS. */
+struct casereader *
+proc_extract_active_file_data (struct dataset *ds)
+{
+ struct casereader *reader = ds->source;
+ ds->source = NULL;
+
+ return reader;
+}
+
/* Checks whether DS has a corrupted active file. If so,
discards it and returns false. If not, returns true without
doing anything. */
else
{
const struct taint *taint = casereader_get_taint (ds->source);
- taint_reset_successor_taint ((struct taint *) taint);
+ taint_reset_successor_taint (CONST_CAST (struct taint *, taint));
assert (!taint_has_tainted_successor (taint));
}
}
static void
add_case_limit_trns (struct dataset *ds)
{
- size_t case_limit = dict_get_case_limit (ds->dict);
+ casenumber case_limit = dict_get_case_limit (ds->dict);
if (case_limit != 0)
{
- size_t *cases_remaining = xmalloc (sizeof *cases_remaining);
+ casenumber *cases_remaining = xmalloc (sizeof *cases_remaining);
*cases_remaining = case_limit;
add_transformation (ds, case_limit_trns_proc, case_limit_trns_free,
cases_remaining);
*CASES_REMAINING. */
static int
case_limit_trns_proc (void *cases_remaining_,
- struct ccase *c UNUSED, casenumber case_nr UNUSED)
+ struct ccase **c UNUSED, casenumber case_nr UNUSED)
{
size_t *cases_remaining = cases_remaining_;
if (*cases_remaining > 0)
/* FILTER transformation. */
static int
filter_trns_proc (void *filter_var_,
- struct ccase *c UNUSED, casenumber case_nr UNUSED)
+ struct ccase **c UNUSED, casenumber case_nr UNUSED)
{
struct variable *filter_var = filter_var_;
- double f = case_num (c, filter_var);
+ double f = case_num (*c, filter_var);
return (f != 0.0 && !var_is_num_missing (filter_var, f, MV_ANY)
? TRNS_CONTINUE : TRNS_DROP_CASE);
}