}
+\f
+
+struct consolodator
+{
+ const struct variable *key;
+ const struct variable *weight;
+ double cc;
+ double prev_cc;
+
+ casenumber n;
+ struct casereader *clone;
+ struct caseproto *proto;
+};
+
+static bool
+uniquify (const struct ccase *c, void *aux)
+{
+ struct consolodator *cdr = aux;
+ const union value *current_value = case_data (c, cdr->key);
+ const int key_width = var_get_width (cdr->key);
+ const double weight = cdr->weight ? case_data (c, cdr->weight)->f : 1.0;
+ const struct ccase *next_case = casereader_peek (cdr->clone, cdr->n + 1);
+
+ cdr->n ++;
+ cdr->cc += weight;
+
+ if ( NULL == next_case)
+ goto end;
+
+ if ( 0 != value_compare_3way (case_data (next_case, cdr->key),
+ current_value, key_width))
+ goto end;
+
+
+ return false;
+
+ end:
+ cdr->prev_cc = cdr->cc;
+ cdr->cc = 0;
+ return true;
+}
+
+
+
+static struct ccase *
+consolodate_weight (struct ccase *input, void *aux)
+{
+ struct consolodator *cdr = aux;
+ struct ccase *c;
+
+ c = case_unshare_and_resize (input, cdr->proto);
+
+ if (cdr->weight)
+ case_data_rw (c, cdr->weight)->f = cdr->prev_cc;
+ else
+ case_data_rw_idx (c, caseproto_get_n_widths (cdr->proto) - 1)->f = cdr->prev_cc;
+
+ return c;
+}
+
+
+static bool
+uniquify_destroy (void *aux)
+{
+ struct consolodator *cdr = aux;
+
+ casereader_destroy (cdr->clone);
+ free (cdr);
+
+ return true;
+}
+
+
+
+/* Returns a new casereader which is based upon INPUT, but which contains a maximum
+ of one case for each distinct value of KEY.
+ If WEIGHT is non-null, then the new casereader's values for this variable
+ will be the sum of all values matching KEY.
+ IF WEIGHT is null, then the new casereader will have an additional numeric
+ value appended, which will contain the total number of cases containing
+ KEY.
+ INPUT must be sorted on KEY
+*/
+struct casereader *
+casereader_create_distinct (struct casereader *input,
+ const struct variable *key,
+ const struct variable *weight)
+{
+ struct casereader *u ;
+ struct casereader *ud ;
+ struct caseproto *output_proto = casereader_get_proto (input);
+
+ struct consolodator *cdr = xmalloc (sizeof (*cdr));
+ cdr->n = 0;
+ cdr->key = key;
+ cdr->weight = weight;
+ cdr->cc = 0;
+ cdr->clone = casereader_clone (input);
+
+ if ( NULL == cdr->weight )
+ output_proto = caseproto_add_width (output_proto, 0);
+
+ cdr->proto = output_proto;
+
+ u = casereader_create_filter_func (input, uniquify,
+ NULL, cdr, NULL);
+
+ ud = casereader_create_translator (u,
+ output_proto,
+ consolodate_weight,
+ uniquify_destroy,
+ cdr);
+}
+