Corrected spelling of "consolidate".
[pspp-builds.git] / src / data / casereader-translator.c
index d55e18e50292282a6e7c6d95233802fb42c920ed..a2963980253790dd5f398cf20fd44420350c26d3 100644 (file)
@@ -19,6 +19,7 @@
 #include <data/casereader.h>
 #include <stdlib.h>
 
 #include <data/casereader.h>
 #include <stdlib.h>
 
+#include <data/variable.h>
 #include <data/casereader-provider.h>
 #include <libpspp/taint.h>
 
 #include <data/casereader-provider.h>
 #include <libpspp/taint.h>
 
@@ -41,9 +42,11 @@ static const struct casereader_class casereader_translator_class;
 
 /* Creates and returns a new casereader whose cases are produced
    by reading from SUBREADER and passing through TRANSLATE, which
 
 /* Creates and returns a new casereader whose cases are produced
    by reading from SUBREADER and passing through TRANSLATE, which
-   must return the translated case, with OUTPUT_VALUE_CNT values,
-   and populate it based on INPUT and auxiliary data AUX.
-   TRANSLATE must destroy its input case.
+   must return the translated case, and populate it based on
+   INPUT and auxiliary data AUX.  TRANSLATE must destroy its
+   input case.
+
+   The cases returned by TRANSLATE must match OUTPUT_PROTO.
 
    When the translating casereader is destroyed, DESTROY will be
    called to allow any state maintained by TRANSLATE to be freed.
 
    When the translating casereader is destroyed, DESTROY will be
    called to allow any state maintained by TRANSLATE to be freed.
@@ -53,7 +56,7 @@ static const struct casereader_class casereader_translator_class;
    when the translating casereader is destroyed. */
 struct casereader *
 casereader_create_translator (struct casereader *subreader,
    when the translating casereader is destroyed. */
 struct casereader *
 casereader_create_translator (struct casereader *subreader,
-                              size_t output_value_cnt,
+                              const struct caseproto *output_proto,
                               struct ccase *(*translate) (struct ccase *input,
                                                           void *aux),
                               bool (*destroy) (void *aux),
                               struct ccase *(*translate) (struct ccase *input,
                                                           void *aux),
                               bool (*destroy) (void *aux),
@@ -66,7 +69,7 @@ casereader_create_translator (struct casereader *subreader,
   ct->destroy = destroy;
   ct->aux = aux;
   reader = casereader_create_sequential (
   ct->destroy = destroy;
   ct->aux = aux;
   reader = casereader_create_sequential (
-    NULL, output_value_cnt, casereader_get_case_cnt (ct->subreader),
+    NULL, output_proto, casereader_get_case_cnt (ct->subreader),
     &casereader_translator_class, ct);
   taint_propagate (casereader_get_taint (ct->subreader),
                    casereader_get_taint (reader));
     &casereader_translator_class, ct);
   taint_propagate (casereader_get_taint (ct->subreader),
                    casereader_get_taint (reader));
@@ -108,7 +111,7 @@ static const struct casereader_class casereader_translator_class =
 
 struct casereader_append_numeric
 {
 
 struct casereader_append_numeric
 {
-  int value_ofs;
+  struct caseproto *proto;
   casenumber n;
   new_value_func *func;
   void *aux;
   casenumber n;
   new_value_func *func;
   void *aux;
@@ -135,12 +138,13 @@ casereader_create_append_numeric (struct casereader *subreader,
                                  void (*destroy) (void *aux))
 {
   struct casereader_append_numeric *can = xmalloc (sizeof *can);
                                  void (*destroy) (void *aux))
 {
   struct casereader_append_numeric *can = xmalloc (sizeof *can);
-  can->value_ofs = casereader_get_value_cnt (subreader);
+  can->proto = caseproto_ref (casereader_get_proto (subreader));
+  can->proto = caseproto_add_width (can->proto, 0);
   can->n = 0;
   can->aux = aux;
   can->func = func;
   can->destroy = destroy;
   can->n = 0;
   can->aux = aux;
   can->func = func;
   can->destroy = destroy;
-  return casereader_create_translator (subreader, can->value_ofs + 1,
+  return casereader_create_translator (subreader, can->proto,
                                        can_translate, can_destroy, can);
 }
 
                                        can_translate, can_destroy, can);
 }
 
@@ -150,8 +154,8 @@ can_translate (struct ccase *c, void *can_)
 {
   struct casereader_append_numeric *can = can_;
   double new_value = can->func (c, can->n++, can->aux);
 {
   struct casereader_append_numeric *can = can_;
   double new_value = can->func (c, can->n++, can->aux);
-  c = case_unshare_and_resize (c, can->value_ofs + 1);
-  case_data_rw_idx (c, can->value_ofs)->f = new_value;
+  c = case_unshare_and_resize (c, can->proto);
+  case_data_rw_idx (c, caseproto_get_n_widths (can->proto) - 1)->f = new_value;
   return c;
 }
 
   return c;
 }
 
@@ -161,6 +165,7 @@ can_destroy (void *can_)
   struct casereader_append_numeric *can = can_;
   if (can->destroy)
     can->destroy (can->aux);
   struct casereader_append_numeric *can = can_;
   if (can->destroy)
     can->destroy (can->aux);
+  caseproto_unref (can->proto);
   free (can);
   return true;
 }
   free (can);
   return true;
 }
@@ -211,7 +216,7 @@ struct casereader_append_rank
   casenumber n;
   const struct variable *var;
   const struct variable *weight;
   casenumber n;
   const struct variable *var;
   const struct variable *weight;
-  int value_ofs;
+  struct caseproto *proto;
   casenumber n_common;
   double mean_rank;
   double cc;
   casenumber n_common;
   double mean_rank;
   double cc;
@@ -258,7 +263,8 @@ casereader_create_append_rank (struct casereader *subreader,
                               )
 {
   struct casereader_append_rank *car = xmalloc (sizeof *car);
                               )
 {
   struct casereader_append_rank *car = xmalloc (sizeof *car);
-  car->value_ofs = casereader_get_value_cnt (subreader);
+  car->proto = caseproto_ref (casereader_get_proto (subreader));
+  car->proto = caseproto_add_width (car->proto, 0);
   car->weight = w;
   car->var = v;
   car->n = 0;
   car->weight = w;
   car->var = v;
   car->n = 0;
@@ -270,7 +276,7 @@ casereader_create_append_rank (struct casereader *subreader,
   car->err = err;
   car->prev_value = SYSMIS;
 
   car->err = err;
   car->prev_value = SYSMIS;
 
-  return casereader_create_translator (subreader, car->value_ofs + 1,
+  return casereader_create_translator (subreader, car->proto,
                                        car_translate, car_destroy, car);
 }
 
                                        car_translate, car_destroy, car);
 }
 
@@ -280,6 +286,7 @@ car_destroy (void *car_)
 {
   struct casereader_append_rank *car = car_;
   casereader_destroy (car->clone);
 {
   struct casereader_append_rank *car = car_;
   casereader_destroy (car->clone);
+  caseproto_unref (car->proto);
   free (car);
   return true;
 }
   free (car);
   return true;
 }
@@ -345,10 +352,133 @@ car_translate (struct ccase *input, void *car_)
 
   car->n++;
 
 
   car->n++;
 
-  input = case_unshare_and_resize (input, car->value_ofs + 1);
-  case_data_rw_idx (input, car->value_ofs)->f = car->mean_rank ;
+  input = case_unshare_and_resize (input, car->proto);
+  case_data_rw_idx (input, caseproto_get_n_widths (car->proto) - 1)->f
+    = car->mean_rank;
   car->prev_value = value;
   return input;
 }
 
 
   car->prev_value = value;
   return input;
 }
 
 
+\f
+
+struct consolidator
+{
+  const struct variable *key;
+  const struct variable *weight;
+  double cc;
+  double prev_cc;
+
+  casenumber n;
+  struct casereader *clone;
+  struct caseproto *proto;
+  int direction;
+};
+
+static bool
+uniquify (const struct ccase *c, void *aux)
+{
+  struct consolidator *cdr = aux;
+  const union value *current_value = case_data (c, cdr->key);
+  const int key_width = var_get_width (cdr->key);
+  const double weight = cdr->weight ? case_data (c, cdr->weight)->f : 1.0;
+  const struct ccase *next_case = casereader_peek (cdr->clone, cdr->n + 1);
+  int dir = 0;
+
+  cdr->n ++;
+  cdr->cc += weight;
+
+  if ( NULL == next_case)
+      goto end;
+  
+  dir = value_compare_3way (case_data (next_case, cdr->key),
+                           current_value, key_width);
+  if ( dir != 0 )
+    {
+      /* Insist that the data are sorted */
+      assert (cdr->direction == 0 || dir == cdr->direction);
+      cdr->direction = dir;
+      goto end;
+    }
+  
+  return false;
+
+ end:
+  cdr->prev_cc = cdr->cc;
+  cdr->cc = 0;
+  return true;
+}
+
+
+
+static struct ccase *
+consolodate_weight (struct ccase *input, void *aux)
+{
+  struct consolidator *cdr = aux;
+  struct ccase *c;
+
+  c = case_unshare_and_resize (input, cdr->proto);
+
+  if (cdr->weight)
+    case_data_rw (c, cdr->weight)->f = cdr->prev_cc;
+  else
+    case_data_rw_idx (c, caseproto_get_n_widths (cdr->proto) - 1)->f = cdr->prev_cc;    
+
+  return c;
+}
+
+
+static bool
+uniquify_destroy (void *aux)
+{
+  struct consolidator *cdr = aux;
+
+  casereader_destroy (cdr->clone);
+  free (cdr);
+
+  return true;
+}
+
+
+
+/* Returns a new casereader which is based upon INPUT, but which contains a maximum 
+   of one case for each distinct value of KEY.
+   If WEIGHT is non-null, then the new casereader's values for this variable
+   will be the sum of all values matching KEY.
+   IF WEIGHT is null, then the new casereader will have an additional numeric
+   value appended, which will contain the total number of cases containing
+   KEY.
+   INPUT must be sorted on KEY
+*/
+struct casereader *
+casereader_create_distinct (struct casereader *input,
+                                              const struct variable *key,
+                                              const struct variable *weight)
+{
+  struct casereader *u ;
+  struct casereader *ud ;
+  const struct caseproto *output_proto = casereader_get_proto (input);
+
+  struct consolidator *cdr = xmalloc (sizeof (*cdr));
+  cdr->n = 0;
+  cdr->key = key;
+  cdr->weight = weight;
+  cdr->cc = 0;
+  cdr->clone = casereader_clone (input);
+  cdr->direction = 0;
+
+  if ( NULL == cdr->weight )
+    output_proto = caseproto_add_width (output_proto, 0);
+
+  cdr->proto = output_proto;
+
+  u = casereader_create_filter_func (input, uniquify,
+                                    NULL, cdr, NULL);
+
+  ud = casereader_create_translator (u,
+                                    output_proto,
+                                    consolodate_weight,
+                                    uniquify_destroy,
+                                    cdr);
+}
+