#include <unistd.h>
#include <data/case.h>
+#include <data/case-map.h>
#include <data/caseinit.h>
#include <data/casereader.h>
#include <data/casereader-provider.h>
#include <data/procedure.h>
#include <data/transformations.h>
#include <data/variable.h>
-#include <libpspp/alloc.h>
#include <libpspp/deque.h>
#include <libpspp/misc.h>
#include <libpspp/str.h>
#include <libpspp/taint.h>
+#include "xalloc.h"
struct dataset {
/* Cases are read from source,
struct trns_chain *temporary_trns_chain;
struct dictionary *dict;
- /* Callback which occurs when a procedure provides a new source for
- the dataset */
- replace_source_callback *replace_source ;
-
- /* Callback which occurs whenever the DICT is replaced by a new one */
- replace_dictionary_callback *replace_dict;
-
/* Callback which occurs whenever the transformation chain(s) have
been modified */
transformation_change_callback_func *xform_callback;
added to. */
struct trns_chain *cur_trns_chain;
- /* The compactor used to compact a case, if necessary;
+ /* The case map used to compact a case, if necessary;
otherwise a null pointer. */
- struct dict_compactor *compactor;
+ struct case_map *compactor;
/* Time at which proc was last invoked. */
time_t last_proc_invocation;
return proc_commit (ds) && ok;
}
-static struct casereader_class proc_casereader_class;
+static const struct casereader_class proc_casereader_class;
/* Opens dataset DS for reading cases with proc_read.
proc_commit must be called when done. */
/* Prepare sink. */
if (!ds->discard_output)
{
- ds->compactor = (dict_compacting_would_shrink (ds->permanent_dict)
- ? dict_make_compactor (ds->permanent_dict)
+ struct dictionary *pd = ds->permanent_dict;
+ size_t compacted_value_cnt = dict_count_values (pd, 1u << DC_SCRATCH);
+ bool should_compact = compacted_value_cnt < dict_get_next_value_idx (pd);
+ ds->compactor = (should_compact
+ ? case_map_to_compact_dict (pd, 1u << DC_SCRATCH)
: NULL);
- ds->sink = autopaging_writer_create (dict_get_compacted_value_cnt (
- ds->permanent_dict));
+ ds->sink = autopaging_writer_create (compacted_value_cnt);
}
else
{
{
struct ccase tmp;
if (ds->compactor != NULL)
- {
- case_create (&tmp, dict_get_compacted_value_cnt (ds->dict));
- dict_compactor_compact (ds->compactor, &tmp, c);
- }
+ case_map_execute (ds->compactor, c, &tmp);
else
case_clone (&tmp, c);
casewriter_write (ds->sink, &tmp);
/* Finish compacting. */
if (ds->compactor != NULL)
{
- dict_compactor_destroy (ds->compactor);
- dict_compact_values (ds->dict);
+ case_map_destroy (ds->compactor);
ds->compactor = NULL;
+
+ dict_delete_scratch_vars (ds->dict);
+ dict_compact_values (ds->dict);
}
/* Old data sink becomes new data source. */
ds->discard_output = false;
}
ds->sink = NULL;
- if ( ds->replace_source) ds->replace_source (ds->source);
caseinit_clear (ds->caseinit);
caseinit_mark_as_preinited (ds->caseinit, ds->dict);
}
/* Casereader class for procedure execution. */
-static struct casereader_class proc_casereader_class =
+static const struct casereader_class proc_casereader_class =
{
proc_casereader_read,
proc_casereader_destroy,
dict_destroy (ds->dict);
ds->dict = ds->permanent_dict;
ds->permanent_dict = NULL;
- if (ds->replace_dict) ds->replace_dict (ds->dict);
trns_chain_destroy (ds->temporary_trns_chain);
ds->temporary_trns_chain = NULL;
\f
/* Initializes procedure handling. */
struct dataset *
-create_dataset (transformation_change_callback_func *cb, void *aux)
+create_dataset (void)
{
struct dataset *ds = xzalloc (sizeof(*ds));
ds->dict = dict_create ();
ds->caseinit = caseinit_create ();
- ds->xform_callback = cb;
- ds->xform_callback_aux = aux;
proc_cancel_all_transformations (ds);
return ds;
}
casereader_destroy (ds->source);
ds->source = NULL;
- if ( ds->replace_source) ds->replace_source (NULL);
proc_cancel_all_transformations (ds);
}
dict_destroy (ds->dict);
ds->dict = dict;
- if ( ds->replace_dict) ds->replace_dict (dict);
proc_set_active_file_data (ds, source);
}
{
casereader_destroy (ds->source);
ds->source = reader;
- if (ds->replace_source) ds->replace_source (reader);
caseinit_clear (ds->caseinit);
caseinit_mark_as_preinited (ds->caseinit, ds->dict);
return ds->source != NULL;
}
+/* Returns the active file data source from DS, or a null pointer
+ if DS has no data source, and removes it from DS. */
+struct casereader *
+proc_extract_active_file_data (struct dataset *ds)
+{
+ struct casereader *reader = ds->source;
+ ds->source = NULL;
+
+ return reader;
+}
+
/* Checks whether DS has a corrupted active file. If so,
discards it and returns false. If not, returns true without
doing anything. */
static void
add_case_limit_trns (struct dataset *ds)
{
- size_t case_limit = dict_get_case_limit (ds->dict);
+ casenumber case_limit = dict_get_case_limit (ds->dict);
if (case_limit != 0)
{
- size_t *cases_remaining = xmalloc (sizeof *cases_remaining);
+ casenumber *cases_remaining = xmalloc (sizeof *cases_remaining);
*cases_remaining = case_limit;
add_transformation (ds, case_limit_trns_proc, case_limit_trns_free,
cases_remaining);