+
+static struct ccase *
+car_translate (struct ccase *input, void *car_)
+{
+ struct casereader_append_rank *car = car_;
+
+ const double value = case_num (input, car->var);
+
+ if (car->prev_value != SYSMIS)
+ {
+ if (car->err && value < car->prev_value)
+ *car->err |= RANK_ERR_UNSORTED;
+ }
+
+ if (car->n_common == 1)
+ {
+ double vxx = SYSMIS;
+ casenumber k = 0;
+ double weight = 1.0;
+ if (car->weight)
+ {
+ weight = case_num (input, car->weight);
+ if (car->err && weight < 0)
+ *car->err |= RANK_ERR_NEGATIVE_WEIGHT;
+ }
+
+ do
+ {
+ struct ccase *c = casereader_peek (car->clone, car->n + ++k);
+ if (c == NULL)
+ break;
+ vxx = case_num (c, car->var);
+
+ if (vxx == value)
+ {
+ if (car->weight)
+ {
+ double w = case_num (c, car->weight);
+
+ if (car->err && w < 0)
+ *car->err |= RANK_ERR_NEGATIVE_WEIGHT;
+
+ weight += w;
+ }
+ else
+ weight += 1.0;
+ car->n_common++;
+ }
+ case_unref (c);
+ }
+ while (vxx == value);
+ car->mean_rank = car->cc + (weight + 1) / 2.0;
+ car->cc += weight;
+
+ if (car->distinct)
+ car->distinct (value, car->n_common, weight, car->aux);
+ }
+ else
+ car->n_common--;
+
+ car->n++;
+
+ input = case_unshare_and_resize (input, car->proto);
+ *case_num_rw_idx (input, caseproto_get_n_widths (car->proto) - 1)
+ = car->mean_rank;
+ car->prev_value = value;
+ return input;
+}
+
+
+\f
+
+struct consolidator
+{
+ const struct variable *key;
+ const struct variable *weight;
+ double cc;
+ double prev_cc;
+
+ casenumber n;
+ struct casereader *clone;
+ struct caseproto *proto;
+ int direction;
+};
+
+static bool
+uniquify (const struct ccase *c, void *aux)
+{
+ struct consolidator *cdr = aux;
+ const union value *current_value = case_data (c, cdr->key);
+ const int key_width = var_get_width (cdr->key);
+ const double weight = cdr->weight ? case_num (c, cdr->weight) : 1.0;
+ struct ccase *next_case = casereader_peek (cdr->clone, cdr->n + 1);
+ int dir = 0;
+
+ cdr->n ++;
+ cdr->cc += weight;
+
+ if (NULL == next_case)
+ goto end;
+
+ dir = value_compare_3way (case_data (next_case, cdr->key),
+ current_value, key_width);
+ if (dir > 0)
+ dir = 1;
+ if (dir < 0)
+ dir = -1;
+
+ case_unref (next_case);
+ if (dir != 0)
+ {
+ /* Insist that the data are sorted */
+ assert (cdr->direction == 0 || dir == cdr->direction);
+ cdr->direction = dir;
+ goto end;
+ }
+
+ return false;
+
+ end:
+ cdr->prev_cc = cdr->cc;
+ cdr->cc = 0;
+ return true;
+}
+
+
+
+static struct ccase *
+consolodate_weight (struct ccase *input, void *aux)
+{
+ struct consolidator *cdr = aux;
+ struct ccase *c;
+
+ if (cdr->weight)
+ {
+ c = case_unshare (input);
+ *case_num_rw (c, cdr->weight) = cdr->prev_cc;
+ }
+ else
+ {
+ c = case_unshare_and_resize (input, cdr->proto);
+ *case_num_rw_idx (c, caseproto_get_n_widths (cdr->proto) - 1) = cdr->prev_cc;
+ }
+
+ return c;
+}
+
+
+static bool
+uniquify_destroy (void *aux)
+{
+ struct consolidator *cdr = aux;
+
+ casereader_destroy (cdr->clone);
+ caseproto_unref (cdr->proto);
+ free (cdr);
+
+ return true;
+}
+
+
+
+/* Returns a new casereader which is based upon INPUT, but which contains a maximum
+ of one case for each distinct value of KEY.
+ If WEIGHT is non-null, then the new casereader's values for this variable
+ will be the sum of all values matching KEY.
+ IF WEIGHT is null, then the new casereader will have an additional numeric
+ value appended, which will contain the total number of cases containing
+ KEY.
+ INPUT must be sorted on KEY
+*/
+struct casereader *
+casereader_create_distinct (struct casereader *input,
+ const struct variable *key,
+ const struct variable *weight)
+{
+ struct casereader *u ;
+ struct casereader *ud ;
+ struct caseproto *output_proto = caseproto_ref (casereader_get_proto (input));
+
+ struct consolidator *cdr = xmalloc (sizeof (*cdr));
+ cdr->n = 0;
+ cdr->key = key;
+ cdr->weight = weight;
+ cdr->cc = 0;
+ cdr->clone = casereader_clone (input);
+ cdr->direction = 0;
+
+ if (NULL == cdr->weight)
+ output_proto = caseproto_add_width (output_proto, 0);
+
+ cdr->proto = output_proto;
+
+ u = casereader_create_filter_func (input, uniquify,
+ NULL, cdr, NULL);
+
+ ud = casereader_create_translator (u,
+ output_proto,
+ consolodate_weight,
+ uniquify_destroy,
+ cdr);
+
+ return ud;
+}
+