/* PSPP - a program for statistical analysis.
- Copyright (C) 2007, 2009 Free Software Foundation, Inc.
+ Copyright (C) 2007, 2009, 2011 Free Software Foundation, Inc.
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
along with this program. If not, see <http://www.gnu.org/licenses/>. */
#include <config.h>
-#include <data/val-type.h>
-#include <data/casereader.h>
+
#include <stdlib.h>
-#include <data/casereader-provider.h>
-#include <libpspp/taint.h>
+#include "data/casereader-provider.h"
+#include "data/casereader.h"
+#include "data/val-type.h"
+#include "data/variable.h"
+#include "libpspp/taint.h"
-#include "xalloc.h"
+#include "gl/xalloc.h"
/* Casereader that applies a user-supplied function to translate
each case into another in an arbitrary fashion. */
INPUT and auxiliary data AUX. TRANSLATE must destroy its
input case.
+ TRANSLATE may be stateful, that is, the output for a given
+ case may depend on previous cases. If TRANSLATE is stateless,
+ then you may want to use casereader_translate_stateless
+ instead, since it sometimes performs better.
+
The cases returned by TRANSLATE must match OUTPUT_PROTO.
When the translating casereader is destroyed, DESTROY will be
NULL,
NULL,
};
+\f
+/* Casereader that applies a user-supplied function to translate
+ each case into another in a stateless fashion. */
+
+/* A statelessly translating casereader. */
+struct casereader_stateless_translator
+ {
+ struct casereader *subreader; /* Source of input cases. */
+
+ casenumber case_offset; /* Running total of the values returned by
+   casereader_advance on SUBREADER; added to the index given to TRANSLATE. */
+ struct ccase *(*translate) (struct ccase *input, casenumber,
+ const void *aux); /* Applied to each case peeked from SUBREADER. */
+ bool (*destroy) (void *aux); /* Called with AUX when the reader is destroyed. */
+ void *aux; /* Auxiliary data passed to TRANSLATE and DESTROY. */
+ };
+
+static const struct casereader_random_class
+casereader_stateless_translator_class;
+
+/* Creates and returns a new casereader whose cases are produced by reading
+   from SUBREADER and passing through the TRANSLATE function.  TRANSLATE
+   takes ownership of its input case and returns a translated case,
+   populating the translated case based on INPUT and auxiliary data AUX.
+
+   TRANSLATE must be stateless, that is, the output for a given case must not
+   depend on previous cases.  This is because cases may be retrieved in
+   arbitrary order, and some cases may be retrieved multiple times, and some
+   cases may be skipped and never retrieved at all.  If TRANSLATE is stateful,
+   use casereader_create_translator instead.
+
+   The casenumber argument to the TRANSLATE function is the absolute case
+   number in SUBREADER, that is, 0 when the first case in SUBREADER is being
+   translated, 1 when the second case is being translated, and so on.
+
+   The cases returned by TRANSLATE must match OUTPUT_PROTO.
+
+   When the stateless translating casereader is destroyed, DESTROY will be
+   called to allow any auxiliary data maintained by TRANSLATE to be freed.
+   DESTROY is called unconditionally, so it must not be a null pointer.
+
+   After this function is called, SUBREADER must not ever again be referenced
+   directly.  It will be destroyed automatically when the translating
+   casereader is destroyed. */
+struct casereader *
+casereader_translate_stateless (
+  struct casereader *subreader,
+  const struct caseproto *output_proto,
+  struct ccase *(*translate) (struct ccase *input, casenumber,
+                              const void *aux),
+  bool (*destroy) (void *aux),
+  void *aux)
+{
+  struct casereader_stateless_translator *cst = xmalloc (sizeof *cst);
+  struct casereader *reader;
+
+  cst->subreader = casereader_rename (subreader);
+  /* Nothing has been advanced past yet.  This must be set explicitly:
+     the memory obtained from xmalloc is not initialized, and the read
+     and advance callbacks both read this field. */
+  cst->case_offset = 0;
+  cst->translate = translate;
+  cst->destroy = destroy;
+  cst->aux = aux;
+
+  reader = casereader_create_random (
+    output_proto, casereader_get_case_cnt (cst->subreader),
+    &casereader_stateless_translator_class, cst);
+
+  /* Propagate taint from the subreader to the new reader. */
+  taint_propagate (casereader_get_taint (cst->subreader),
+                   casereader_get_taint (reader));
+  return reader;
+}
+
+/* Read callback for the stateless translating casereader.  Peeks the case
+   at index IDX in the subreader and, if there is one, hands it to the
+   user's translate function together with its absolute case number
+   (the accumulated offset plus IDX).  Returns the translated case, or a
+   null pointer if the subreader has no case at IDX. */
+static struct ccase *
+casereader_stateless_translator_read (struct casereader *reader UNUSED,
+                                      void *cst_, casenumber idx)
+{
+  struct casereader_stateless_translator *translator = cst_;
+  struct ccase *input = casereader_peek (translator->subreader, idx);
+
+  if (input == NULL)
+    return NULL;
+
+  return translator->translate (input, translator->case_offset + idx,
+                                translator->aux);
+}
+
+/* Destroy callback for the stateless translating casereader.  Destroys
+   the subreader, then lets the user's destroy function release the
+   auxiliary data, and finally frees the translator itself. */
+static void
+casereader_stateless_translator_destroy (struct casereader *reader UNUSED,
+                                         void *cst_)
+{
+  struct casereader_stateless_translator *translator = cst_;
+
+  casereader_destroy (translator->subreader);
+  translator->destroy (translator->aux);
+
+  free (translator);
+}
+
+/* Advance callback for the stateless translating casereader.  Advances
+   the subreader by CNT cases and adds whatever casereader_advance returns
+   to the running offset, which the read callback uses to compute the case
+   number passed to the translate function. */
+static void
+casereader_stateless_translator_advance (struct casereader *reader UNUSED,
+                                         void *cst_, casenumber cnt)
+{
+  struct casereader_stateless_translator *translator = cst_;
+
+  translator->case_offset += casereader_advance (translator->subreader, cnt);
+}
+
+/* Casereader class for stateless translating casereader.
+   The initializer is positional: read, destroy and advance callbacks,
+   in that order. */
+static const struct casereader_random_class
+casereader_stateless_translator_class =
+ {
+ casereader_stateless_translator_read, /* Peeks a case by index and translates it. */
+ casereader_stateless_translator_destroy, /* Destroys the subreader, the aux data and the translator. */
+ casereader_stateless_translator_advance, /* Advances the subreader and updates case_offset. */
+ };
\f
struct casereader_append_numeric
If DISTINCT_CALLBACK is non-null, then it will be called exactly
once for every case containing a distinct value of V. AUX is
- an auxilliary pointer passed to DISTINCT_CALLBACK.
+ an auxiliary pointer passed to DISTINCT_CALLBACK.
After this function is called, SUBREADER must not ever again
be referenced directly. It will be destroyed automatically
\f
-struct consolodator
+struct consolidator
{
const struct variable *key;
const struct variable *weight;
casenumber n;
struct casereader *clone;
struct caseproto *proto;
+ int direction;
};
static bool
uniquify (const struct ccase *c, void *aux)
{
- struct consolodator *cdr = aux;
+ struct consolidator *cdr = aux;
const union value *current_value = case_data (c, cdr->key);
const int key_width = var_get_width (cdr->key);
const double weight = cdr->weight ? case_data (c, cdr->weight)->f : 1.0;
- const struct ccase *next_case = casereader_peek (cdr->clone, cdr->n + 1);
+ struct ccase *next_case = casereader_peek (cdr->clone, cdr->n + 1);
+ int dir = 0;
cdr->n ++;
cdr->cc += weight;
if ( NULL == next_case)
goto end;
-
- if ( 0 != value_compare_3way (case_data (next_case, cdr->key),
- current_value, key_width))
- goto end;
-
+ dir = value_compare_3way (case_data (next_case, cdr->key),
+ current_value, key_width);
+ case_unref (next_case);
+ if ( dir != 0 )
+ {
+ /* Insist that the data are sorted */
+ assert (cdr->direction == 0 || dir == cdr->direction);
+ cdr->direction = dir;
+ goto end;
+ }
+
return false;
end:
static struct ccase *
consolodate_weight (struct ccase *input, void *aux)
{
- struct consolodator *cdr = aux;
+ struct consolidator *cdr = aux; /* AUX is the consolidator state. */
struct ccase *c;
- c = case_unshare_and_resize (input, cdr->proto);
-
if (cdr->weight)
- case_data_rw (c, cdr->weight)->f = cdr->prev_cc;
+ { /* A weight variable was given: store prev_cc in it. */
+ c = case_unshare (input);
+ case_data_rw (c, cdr->weight)->f = cdr->prev_cc;
+ }
else
- case_data_rw_idx (c, caseproto_get_n_widths (cdr->proto) - 1)->f = cdr->prev_cc;
+ { /* No weight variable: resize the case to cdr->proto and store
+   prev_cc in the last value of that prototype. */
+ c = case_unshare_and_resize (input, cdr->proto);
+ case_data_rw_idx (c, caseproto_get_n_widths (cdr->proto) - 1)->f = cdr->prev_cc;
+ }
return c;
}
static bool
uniquify_destroy (void *aux)
{
- struct consolodator *cdr = aux;
+ struct consolidator *cdr = aux;
casereader_destroy (cdr->clone);
+ caseproto_unref (cdr->proto);
free (cdr);
return true;
-/* Returns a new casereader which is based upon INPUT, but which contains a maximum
+/* Returns a new casereader which is based upon INPUT, but which contains a maximum
of one case for each distinct value of KEY.
If WEIGHT is non-null, then the new casereader's values for this variable
will be the sum of all values matching KEY.
{
struct casereader *u ;
struct casereader *ud ;
- struct caseproto *output_proto = casereader_get_proto (input);
+ struct caseproto *output_proto = caseproto_ref (casereader_get_proto (input));
- struct consolodator *cdr = xmalloc (sizeof (*cdr));
+ struct consolidator *cdr = xmalloc (sizeof (*cdr));
cdr->n = 0;
cdr->key = key;
cdr->weight = weight;
cdr->cc = 0;
cdr->clone = casereader_clone (input);
+ cdr->direction = 0;
if ( NULL == cdr->weight )
output_proto = caseproto_add_width (output_proto, 0);
consolodate_weight,
uniquify_destroy,
cdr);
+
+ return ud;
}