#include <data/casereader.h>
#include <stdlib.h>
+#include <data/variable.h>
#include <data/casereader-provider.h>
#include <libpspp/taint.h>
\f
-struct consolodator
+struct consolidator
{
const struct variable *key;
const struct variable *weight;
casenumber n;
struct casereader *clone;
struct caseproto *proto;
+ int direction;
};
static bool
uniquify (const struct ccase *c, void *aux)
{
- struct consolodator *cdr = aux;
+ struct consolidator *cdr = aux;
const union value *current_value = case_data (c, cdr->key);
const int key_width = var_get_width (cdr->key);
const double weight = cdr->weight ? case_data (c, cdr->weight)->f : 1.0;
const struct ccase *next_case = casereader_peek (cdr->clone, cdr->n + 1);
+ int dir = 0;
cdr->n ++;
cdr->cc += weight;
if ( NULL == next_case)
goto end;
-
- if ( 0 != value_compare_3way (case_data (next_case, cdr->key),
- current_value, key_width))
- goto end;
-
+
+ dir = value_compare_3way (case_data (next_case, cdr->key),
+ current_value, key_width);
+ if ( dir != 0 )
+ {
+ /* Insist that the data are sorted */
+ assert (cdr->direction == 0 || dir == cdr->direction);
+ cdr->direction = dir;
+ goto end;
+ }
return false;
static struct ccase *
consolodate_weight (struct ccase *input, void *aux)
{
- struct consolodator *cdr = aux;
+ struct consolidator *cdr = aux;
struct ccase *c;
- c = case_unshare_and_resize (input, cdr->proto);
-
if (cdr->weight)
- case_data_rw (c, cdr->weight)->f = cdr->prev_cc;
+ {
+ c = case_unshare (input);
+ case_data_rw (c, cdr->weight)->f = cdr->prev_cc;
+ }
else
- case_data_rw_idx (c, caseproto_get_n_widths (cdr->proto) - 1)->f = cdr->prev_cc;
+ {
+ c = case_unshare_and_resize (input, cdr->proto);
+ case_data_rw_idx (c, caseproto_get_n_widths (cdr->proto) - 1)->f = cdr->prev_cc;
+ }
return c;
}
static bool
uniquify_destroy (void *aux)
{
- struct consolodator *cdr = aux;
+ struct consolidator *cdr = aux;
casereader_destroy (cdr->clone);
+ caseproto_unref (cdr->proto);
free (cdr);
return true;
{
struct casereader *u ;
struct casereader *ud ;
- struct caseproto *output_proto = casereader_get_proto (input);
+ struct caseproto *output_proto = caseproto_ref (casereader_get_proto (input));
- struct consolodator *cdr = xmalloc (sizeof (*cdr));
+ struct consolidator *cdr = xmalloc (sizeof (*cdr));
cdr->n = 0;
cdr->key = key;
cdr->weight = weight;
cdr->cc = 0;
cdr->clone = casereader_clone (input);
+ cdr->direction = 0;
if ( NULL == cdr->weight )
output_proto = caseproto_add_width (output_proto, 0);
consolodate_weight,
uniquify_destroy,
cdr);
+
+ return ud;
}