+
+\f
+
+struct casereader_append_rank
+{
+ struct casereader *clone;
+ casenumber n;
+ const struct variable *var;
+ const struct variable *weight;
+ struct caseproto *proto;
+ casenumber n_common;
+ double mean_rank;
+ double cc;
+ distinct_func *distinct;
+ void *aux;
+ enum rank_error *err;
+ double prev_value;
+};
+
+static bool car_destroy (void *car_);
+
+static struct ccase *car_translate (struct ccase *input, void *car_);
+
+/* Creates and returns a new casereader whose cases are produced
+ by reading from SUBREADER and appending an additional value,
+ which is the rank of the observation. W is the weight variable
+ of the dictionary containing V, or NULL if there is no weight
+ variable.
+
+ The following preconditions must be met:
+
+ 1. SUBREADER must be sorted on V.
+
+ 2. The weight variables, must be non-negative.
+
+ If either of these preconditions are not satisfied, then the rank
+ variables may not be correct. In this case, if ERR is non-null,
+ it will be set according to the erroneous conditions encountered.
+
+ If DISTINCT_CALLBACK is non-null, then it will be called exactly
+ once for every case containing a distinct value of V. AUX is
+ an auxilliary pointer passed to DISTINCT_CALLBACK.
+
+ After this function is called, SUBREADER must not ever again
+ be referenced directly. It will be destroyed automatically
+ when the translating casereader is destroyed. */
+struct casereader *
+casereader_create_append_rank (struct casereader *subreader,
+ const struct variable *v,
+ const struct variable *w,
+ enum rank_error *err,
+ distinct_func *distinct_callback,
+ void *aux
+ )
+{
+ struct casereader_append_rank *car = xmalloc (sizeof *car);
+ car->proto = caseproto_ref (casereader_get_proto (subreader));
+ car->proto = caseproto_add_width (car->proto, 0);
+ car->weight = w;
+ car->var = v;
+ car->n = 0;
+ car->n_common = 1;
+ car->cc = 0.0;
+ car->clone = casereader_clone (subreader);
+ car->distinct = distinct_callback;
+ car->aux = aux;
+ car->err = err;
+ car->prev_value = SYSMIS;
+
+ return casereader_create_translator (subreader, car->proto,
+ car_translate, car_destroy, car);
+}
+
+
+static bool
+car_destroy (void *car_)
+{
+ struct casereader_append_rank *car = car_;
+ casereader_destroy (car->clone);
+ caseproto_unref (car->proto);
+ free (car);
+ return true;
+}
+
+static struct ccase *
+car_translate (struct ccase *input, void *car_)
+{
+ struct casereader_append_rank *car = car_;
+
+ const double value = case_data (input, car->var)->f;
+
+ if ( car->prev_value != SYSMIS)
+ {
+ if (car->err && value < car->prev_value)
+ *car->err |= RANK_ERR_UNSORTED;
+ }
+
+ if ( car->n_common == 1)
+ {
+ double vxx = SYSMIS;
+ casenumber k = 0;
+ double weight = 1.0;
+ if (car->weight)
+ {
+ weight = case_data (input, car->weight)->f;
+ if ( car->err && weight < 0 )
+ *car->err |= RANK_ERR_NEGATIVE_WEIGHT;
+ }
+
+ do
+ {
+ struct ccase *c = casereader_peek (car->clone, car->n + ++k);
+ if (c == NULL)
+ break;
+ vxx = case_data (c, car->var)->f;
+
+ if ( vxx == value)
+ {
+ if (car->weight)
+ {
+ double w = case_data (c, car->weight)->f;
+
+ if ( car->err && w < 0 )
+ *car->err |= RANK_ERR_NEGATIVE_WEIGHT;
+
+ weight += w;
+ }
+ else
+ weight += 1.0;
+ car->n_common++;
+ }
+ case_unref (c);
+ }
+ while (vxx == value);
+ car->mean_rank = car->cc + (weight + 1) / 2.0;
+ car->cc += weight;
+
+ if (car->distinct)
+ car->distinct (value, car->n_common, weight, car->aux);
+ }
+ else
+ car->n_common--;
+
+ car->n++;
+
+ input = case_unshare_and_resize (input, car->proto);
+ case_data_rw_idx (input, caseproto_get_n_widths (car->proto) - 1)->f
+ = car->mean_rank;
+ car->prev_value = value;
+ return input;
+}
+
+