Fix some typos (found by codespell)
[pspp] / src / data / casereader-translator.c
index d55e18e50292282a6e7c6d95233802fb42c920ed..d3388aa68b4d98ca3b69052338a412651aa9d5f3 100644 (file)
@@ -1,5 +1,5 @@
 /* PSPP - a program for statistical analysis.
-   Copyright (C) 2007, 2009 Free Software Foundation, Inc.
+   Copyright (C) 2007, 2009, 2011 Free Software Foundation, Inc.
 
    This program is free software: you can redistribute it and/or modify
    it under the terms of the GNU General Public License as published by
    along with this program.  If not, see <http://www.gnu.org/licenses/>. */
 
 #include <config.h>
-#include <data/val-type.h>
-#include <data/casereader.h>
+
 #include <stdlib.h>
 
-#include <data/casereader-provider.h>
-#include <libpspp/taint.h>
+#include "data/casereader-provider.h"
+#include "data/casereader.h"
+#include "data/val-type.h"
+#include "data/variable.h"
+#include "libpspp/taint.h"
 
-#include "xalloc.h"
+#include "gl/xalloc.h"
 
 /* Casereader that applies a user-supplied function to translate
    each case into another in an arbitrary fashion. */
@@ -41,9 +43,16 @@ static const struct casereader_class casereader_translator_class;
 
 /* Creates and returns a new casereader whose cases are produced
    by reading from SUBREADER and passing through TRANSLATE, which
-   must return the translated case, with OUTPUT_VALUE_CNT values,
-   and populate it based on INPUT and auxiliary data AUX.
-   TRANSLATE must destroy its input case.
+   must return the translated case, and populate it based on
+   INPUT and auxiliary data AUX.  TRANSLATE must destroy its
+   input case.
+
+   TRANSLATE may be stateful, that is, the output for a given
+   case may depend on previous cases.  If TRANSLATE is stateless,
+   then you may want to use casereader_translate_stateless
+   instead, since it sometimes performs better.
+
+   The cases returned by TRANSLATE must match OUTPUT_PROTO.
 
    When the translating casereader is destroyed, DESTROY will be
    called to allow any state maintained by TRANSLATE to be freed.
@@ -53,7 +62,7 @@ static const struct casereader_class casereader_translator_class;
    when the translating casereader is destroyed. */
 struct casereader *
 casereader_create_translator (struct casereader *subreader,
-                              size_t output_value_cnt,
+                              const struct caseproto *output_proto,
                               struct ccase *(*translate) (struct ccase *input,
                                                           void *aux),
                               bool (*destroy) (void *aux),
@@ -66,7 +75,7 @@ casereader_create_translator (struct casereader *subreader,
   ct->destroy = destroy;
   ct->aux = aux;
   reader = casereader_create_sequential (
-    NULL, output_value_cnt, casereader_get_case_cnt (ct->subreader),
+    NULL, output_proto, casereader_get_case_cnt (ct->subreader),
     &casereader_translator_class, ct);
   taint_propagate (casereader_get_taint (ct->subreader),
                    casereader_get_taint (reader));
@@ -103,12 +112,115 @@ static const struct casereader_class casereader_translator_class =
     NULL,
     NULL,
   };
+\f
+/* Casereader that applies a user-supplied function to translate
+   each case into another in a stateless fashion. */
+
+/* A statelessly translating casereader. */
+struct casereader_stateless_translator
+  {
+    struct casereader *subreader; /* Source of input cases. */
+
+    casenumber case_offset;
+    struct ccase *(*translate) (struct ccase *input, casenumber,
+                                const void *aux);
+    bool (*destroy) (void *aux);
+    void *aux;
+  };
 
+static const struct casereader_random_class
+casereader_stateless_translator_class;
+
+/* Creates and returns a new casereader whose cases are produced by reading
+   from SUBREADER and passing through the TRANSLATE function.  TRANSLATE must
+   takes ownership of its input case and returns a translated case, populating
+   the translated case based on INPUT and auxiliary data AUX.
+
+   TRANSLATE must be stateless, that is, the output for a given case must not
+   depend on previous cases.  This is because cases may be retrieved in
+   arbitrary order, and some cases may be retrieved multiple times, and some
+   cases may be skipped and never retrieved at all.  If TRANSLATE is stateful,
+   use casereader_create_translator instead.
+
+   The casenumber argument to the TRANSLATE function is the absolute case
+   number in SUBREADER, that is, 0 when the first case in SUBREADER is being
+   translated, 1 when the second case is being translated, and so on.
+
+   The cases returned by TRANSLATE must match OUTPUT_PROTO.
+
+   When the stateless translating casereader is destroyed, DESTROY will be
+   called to allow any auxiliary data maintained by TRANSLATE to be freed.
+
+   After this function is called, SUBREADER must not ever again be referenced
+   directly.  It will be destroyed automatically when the translating
+   casereader is destroyed. */
+struct casereader *
+casereader_translate_stateless (
+  struct casereader *subreader,
+  const struct caseproto *output_proto,
+  struct ccase *(*translate) (struct ccase *input, casenumber,
+                              const void *aux),
+  bool (*destroy) (void *aux),
+  void *aux)
+{
+  struct casereader_stateless_translator *cst = xmalloc (sizeof *cst);
+  struct casereader *reader;
+  cst->subreader = casereader_rename (subreader);
+  cst->translate = translate;
+  cst->destroy = destroy;
+  cst->aux = aux;
+  reader = casereader_create_random (
+    output_proto, casereader_get_case_cnt (cst->subreader),
+    &casereader_stateless_translator_class, cst);
+  taint_propagate (casereader_get_taint (cst->subreader),
+                   casereader_get_taint (reader));
+  return reader;
+}
+
+/* Internal read function for stateless translating casereader. */
+static struct ccase *
+casereader_stateless_translator_read (struct casereader *reader UNUSED,
+                                      void *cst_, casenumber idx)
+{
+  struct casereader_stateless_translator *cst = cst_;
+  struct ccase *tmp = casereader_peek (cst->subreader, idx);
+  if (tmp != NULL)
+    tmp = cst->translate (tmp, cst->case_offset + idx, cst->aux);
+  return tmp;
+}
+
+/* Internal destroy function for translating casereader. */
+static void
+casereader_stateless_translator_destroy (struct casereader *reader UNUSED,
+                                         void *cst_)
+{
+  struct casereader_stateless_translator *cst = cst_;
+  casereader_destroy (cst->subreader);
+  cst->destroy (cst->aux);
+  free (cst);
+}
+
+static void
+casereader_stateless_translator_advance (struct casereader *reader UNUSED,
+                                         void *cst_, casenumber cnt)
+{
+  struct casereader_stateless_translator *cst = cst_;
+  cst->case_offset += casereader_advance (cst->subreader, cnt);
+}
+
+/* Casereader class for stateless translating casereader. */
+static const struct casereader_random_class
+casereader_stateless_translator_class =
+  {
+    casereader_stateless_translator_read,
+    casereader_stateless_translator_destroy,
+    casereader_stateless_translator_advance,
+  };
 \f
 
 struct casereader_append_numeric
 {
-  int value_ofs;
+  struct caseproto *proto;
   casenumber n;
   new_value_func *func;
   void *aux;
@@ -135,12 +247,13 @@ casereader_create_append_numeric (struct casereader *subreader,
                                  void (*destroy) (void *aux))
 {
   struct casereader_append_numeric *can = xmalloc (sizeof *can);
-  can->value_ofs = casereader_get_value_cnt (subreader);
+  can->proto = caseproto_ref (casereader_get_proto (subreader));
+  can->proto = caseproto_add_width (can->proto, 0);
   can->n = 0;
   can->aux = aux;
   can->func = func;
   can->destroy = destroy;
-  return casereader_create_translator (subreader, can->value_ofs + 1,
+  return casereader_create_translator (subreader, can->proto,
                                        can_translate, can_destroy, can);
 }
 
@@ -150,8 +263,8 @@ can_translate (struct ccase *c, void *can_)
 {
   struct casereader_append_numeric *can = can_;
   double new_value = can->func (c, can->n++, can->aux);
-  c = case_unshare_and_resize (c, can->value_ofs + 1);
-  case_data_rw_idx (c, can->value_ofs)->f = new_value;
+  c = case_unshare_and_resize (c, can->proto);
+  case_data_rw_idx (c, caseproto_get_n_widths (can->proto) - 1)->f = new_value;
   return c;
 }
 
@@ -161,6 +274,7 @@ can_destroy (void *can_)
   struct casereader_append_numeric *can = can_;
   if (can->destroy)
     can->destroy (can->aux);
+  caseproto_unref (can->proto);
   free (can);
   return true;
 }
@@ -211,7 +325,7 @@ struct casereader_append_rank
   casenumber n;
   const struct variable *var;
   const struct variable *weight;
-  int value_ofs;
+  struct caseproto *proto;
   casenumber n_common;
   double mean_rank;
   double cc;
@@ -243,7 +357,7 @@ static struct ccase *car_translate (struct ccase *input, void *car_);
 
    If DISTINCT_CALLBACK is non-null, then  it will be called exactly
    once for every case containing a distinct value of V.  AUX is
-   an auxilliary pointer passed to DISTINCT_CALLBACK.
+   an auxiliary pointer passed to DISTINCT_CALLBACK.
 
    After this function is called, SUBREADER must not ever again
    be referenced directly.  It will be destroyed automatically
@@ -258,7 +372,8 @@ casereader_create_append_rank (struct casereader *subreader,
                               )
 {
   struct casereader_append_rank *car = xmalloc (sizeof *car);
-  car->value_ofs = casereader_get_value_cnt (subreader);
+  car->proto = caseproto_ref (casereader_get_proto (subreader));
+  car->proto = caseproto_add_width (car->proto, 0);
   car->weight = w;
   car->var = v;
   car->n = 0;
@@ -270,7 +385,7 @@ casereader_create_append_rank (struct casereader *subreader,
   car->err = err;
   car->prev_value = SYSMIS;
 
-  return casereader_create_translator (subreader, car->value_ofs + 1,
+  return casereader_create_translator (subreader, car->proto,
                                        car_translate, car_destroy, car);
 }
 
@@ -280,6 +395,7 @@ car_destroy (void *car_)
 {
   struct casereader_append_rank *car = car_;
   casereader_destroy (car->clone);
+  caseproto_unref (car->proto);
   free (car);
   return true;
 }
@@ -345,10 +461,141 @@ car_translate (struct ccase *input, void *car_)
 
   car->n++;
 
-  input = case_unshare_and_resize (input, car->value_ofs + 1);
-  case_data_rw_idx (input, car->value_ofs)->f = car->mean_rank ;
+  input = case_unshare_and_resize (input, car->proto);
+  case_data_rw_idx (input, caseproto_get_n_widths (car->proto) - 1)->f
+    = car->mean_rank;
   car->prev_value = value;
   return input;
 }
 
 
+\f
+
+struct consolidator
+{
+  const struct variable *key;
+  const struct variable *weight;
+  double cc;
+  double prev_cc;
+
+  casenumber n;
+  struct casereader *clone;
+  struct caseproto *proto;
+  int direction;
+};
+
+static bool
+uniquify (const struct ccase *c, void *aux)
+{
+  struct consolidator *cdr = aux;
+  const union value *current_value = case_data (c, cdr->key);
+  const int key_width = var_get_width (cdr->key);
+  const double weight = cdr->weight ? case_data (c, cdr->weight)->f : 1.0;
+  struct ccase *next_case = casereader_peek (cdr->clone, cdr->n + 1);
+  int dir = 0;
+
+  cdr->n ++;
+  cdr->cc += weight;
+
+  if ( NULL == next_case)
+      goto end;
+
+  dir = value_compare_3way (case_data (next_case, cdr->key),
+                           current_value, key_width);
+  case_unref (next_case);
+  if ( dir != 0 )
+    {
+      /* Insist that the data are sorted */
+      assert (cdr->direction == 0 || dir == cdr->direction);
+      cdr->direction = dir;
+      goto end;
+    }
+
+  return false;
+
+ end:
+  cdr->prev_cc = cdr->cc;
+  cdr->cc = 0;
+  return true;
+}
+
+
+
+static struct ccase *
+consolodate_weight (struct ccase *input, void *aux)
+{
+  struct consolidator *cdr = aux;
+  struct ccase *c;
+
+  if (cdr->weight)
+    {
+      c = case_unshare (input);
+      case_data_rw (c, cdr->weight)->f = cdr->prev_cc;
+    }
+  else
+    {
+      c = case_unshare_and_resize (input, cdr->proto);
+      case_data_rw_idx (c, caseproto_get_n_widths (cdr->proto) - 1)->f = cdr->prev_cc;
+    }
+
+  return c;
+}
+
+
+static bool
+uniquify_destroy (void *aux)
+{
+  struct consolidator *cdr = aux;
+
+  casereader_destroy (cdr->clone);
+  caseproto_unref (cdr->proto);
+  free (cdr);
+
+  return true;
+}
+
+
+
+/* Returns a new casereader which is based upon INPUT, but which contains a maximum
+   of one case for each distinct value of KEY.
+   If WEIGHT is non-null, then the new casereader's values for this variable
+   will be the sum of all values matching KEY.
+   IF WEIGHT is null, then the new casereader will have an additional numeric
+   value appended, which will contain the total number of cases containing
+   KEY.
+   INPUT must be sorted on KEY
+*/
+struct casereader *
+casereader_create_distinct (struct casereader *input,
+                                              const struct variable *key,
+                                              const struct variable *weight)
+{
+  struct casereader *u ;
+  struct casereader *ud ;
+  struct caseproto *output_proto = caseproto_ref (casereader_get_proto (input));
+
+  struct consolidator *cdr = xmalloc (sizeof (*cdr));
+  cdr->n = 0;
+  cdr->key = key;
+  cdr->weight = weight;
+  cdr->cc = 0;
+  cdr->clone = casereader_clone (input);
+  cdr->direction = 0;
+
+  if ( NULL == cdr->weight )
+    output_proto = caseproto_add_width (output_proto, 0);
+
+  cdr->proto = output_proto;
+
+  u = casereader_create_filter_func (input, uniquify,
+                                    NULL, cdr, NULL);
+
+  ud = casereader_create_translator (u,
+                                    output_proto,
+                                    consolodate_weight,
+                                    uniquify_destroy,
+                                    cdr);
+
+  return ud;
+}
+