+
+static struct ccase *
+consolodate_weight (struct ccase *input, void *aux)
+{
+ struct consolidator *cdr = aux;
+ struct ccase *c;
+
+ if (cdr->weight)
+ {
+ c = case_unshare (input);
+ case_data_rw (c, cdr->weight)->f = cdr->prev_cc;
+ }
+ else
+ {
+ c = case_unshare_and_resize (input, cdr->proto);
+ case_data_rw_idx (c, caseproto_get_n_widths (cdr->proto) - 1)->f = cdr->prev_cc;
+ }
+
+ return c;
+}
+
+
+static bool
+uniquify_destroy (void *aux)
+{
+ struct consolidator *cdr = aux;
+
+ casereader_destroy (cdr->clone);
+ caseproto_unref (cdr->proto);
+ free (cdr);
+
+ return true;
+}
+
+
+
+/* Returns a new casereader which is based upon INPUT, but which contains a maximum
+ of one case for each distinct value of KEY.
+ If WEIGHT is non-null, then the new casereader's values for this variable
+ will be the sum of all values matching KEY.
+ IF WEIGHT is null, then the new casereader will have an additional numeric
+ value appended, which will contain the total number of cases containing
+ KEY.
+ INPUT must be sorted on KEY
+*/
+struct casereader *
+casereader_create_distinct (struct casereader *input,
+ const struct variable *key,
+ const struct variable *weight)
+{
+ struct casereader *u ;
+ struct casereader *ud ;
+ struct caseproto *output_proto = caseproto_ref (casereader_get_proto (input));
+
+ struct consolidator *cdr = xmalloc (sizeof (*cdr));
+ cdr->n = 0;
+ cdr->key = key;
+ cdr->weight = weight;
+ cdr->cc = 0;
+ cdr->clone = casereader_clone (input);
+ cdr->direction = 0;
+
+ if ( NULL == cdr->weight )
+ output_proto = caseproto_add_width (output_proto, 0);
+
+ cdr->proto = output_proto;
+
+ u = casereader_create_filter_func (input, uniquify,
+ NULL, cdr, NULL);
+
+ ud = casereader_create_translator (u,
+ output_proto,
+ consolodate_weight,
+ uniquify_destroy,
+ cdr);
+
+ return ud;
+}
+