Change "union value" to dynamically allocate long strings.
[pspp-builds.git] / src / data / casereader-translator.c
index ae22f1297e19dbe5a12e6d9e2ec49f75ce8de087..548c22fb2ddd646653debaeb50e22fae70dd9c81 100644 (file)
@@ -1,5 +1,5 @@
 /* PSPP - a program for statistical analysis.
-   Copyright (C) 2007 Free Software Foundation, Inc.
+   Copyright (C) 2007, 2009 Free Software Foundation, Inc.
 
    This program is free software: you can redistribute it and/or modify
    it under the terms of the GNU General Public License as published by
@@ -32,7 +32,7 @@ struct casereader_translator
   {
     struct casereader *subreader; /* Source of input cases. */
 
-    void (*translate) (struct ccase *input, struct ccase *output, void *aux);
+    struct ccase *(*translate) (struct ccase *input, void *aux);
     bool (*destroy) (void *aux);
     void *aux;
   };
@@ -41,9 +41,11 @@ static const struct casereader_class casereader_translator_class;
 
 /* Creates and returns a new casereader whose cases are produced
    by reading from SUBREADER and passing through TRANSLATE, which
-   must create case OUTPUT, with OUTPUT_VALUE_CNT values, and
-   populate it based on INPUT and auxiliary data AUX.  TRANSLATE
-   must also destroy INPUT.
+   must return the translated case, and populate it based on
+   INPUT and auxiliary data AUX.  TRANSLATE must destroy its
+   input case.
+
+   The cases returned by TRANSLATE must match OUTPUT_PROTO.
 
    When the translating casereader is destroyed, DESTROY will be
    called to allow any state maintained by TRANSLATE to be freed.
@@ -53,10 +55,9 @@ static const struct casereader_class casereader_translator_class;
    when the translating casereader is destroyed. */
 struct casereader *
 casereader_create_translator (struct casereader *subreader,
-                              size_t output_value_cnt,
-                              void (*translate) (struct ccase *input,
-                                                 struct ccase *output,
-                                                 void *aux),
+                              const struct caseproto *output_proto,
+                              struct ccase *(*translate) (struct ccase *input,
+                                                          void *aux),
                               bool (*destroy) (void *aux),
                               void *aux)
 {
@@ -67,7 +68,7 @@ casereader_create_translator (struct casereader *subreader,
   ct->destroy = destroy;
   ct->aux = aux;
   reader = casereader_create_sequential (
-    NULL, output_value_cnt, casereader_get_case_cnt (ct->subreader),
+    NULL, output_proto, casereader_get_case_cnt (ct->subreader),
     &casereader_translator_class, ct);
   taint_propagate (casereader_get_taint (ct->subreader),
                    casereader_get_taint (reader));
@@ -75,20 +76,15 @@ casereader_create_translator (struct casereader *subreader,
 }
 
 /* Internal read function for translating casereader. */
-static bool
+static struct ccase *
 casereader_translator_read (struct casereader *reader UNUSED,
-                            void *ct_, struct ccase *c)
+                            void *ct_)
 {
   struct casereader_translator *ct = ct_;
-  struct ccase tmp;
-
-  if (casereader_read (ct->subreader, &tmp))
-    {
-      ct->translate (&tmp, c, ct->aux);
-      return true;
-    }
-  else
-    return false;
+  struct ccase *tmp = casereader_read (ct->subreader);
+  if (tmp)
+    tmp = ct->translate (tmp, ct->aux);
+  return tmp;
 }
 
 /* Internal destroy function for translating casereader. */
@@ -114,7 +110,7 @@ static const struct casereader_class casereader_translator_class =
 
 struct casereader_append_numeric
 {
-  int value_ofs;
+  struct caseproto *proto;
   casenumber n;
   new_value_func *func;
   void *aux;
@@ -123,8 +119,7 @@ struct casereader_append_numeric
 
 static bool can_destroy (void *can_);
 
-static void can_translate (struct ccase *input, struct ccase *output,
-                          void *can_);
+static struct ccase *can_translate (struct ccase *, void *can_);
 
 /* Creates and returns a new casereader whose cases are produced
    by reading from SUBREADER and appending an additional value,
@@ -142,25 +137,25 @@ casereader_create_append_numeric (struct casereader *subreader,
                                  void (*destroy) (void *aux))
 {
   struct casereader_append_numeric *can = xmalloc (sizeof *can);
-  can->value_ofs = casereader_get_value_cnt (subreader);
+  can->proto = caseproto_ref (casereader_get_proto (subreader));
+  can->proto = caseproto_add_width (can->proto, 0);
   can->n = 0;
   can->aux = aux;
   can->func = func;
   can->destroy = destroy;
-  return casereader_create_translator (subreader, can->value_ofs + 1,
+  return casereader_create_translator (subreader, can->proto,
                                        can_translate, can_destroy, can);
 }
 
 
-static void
-can_translate (struct ccase *input, struct ccase *output, void *can_)
+static struct ccase *
+can_translate (struct ccase *c, void *can_)
 {
   struct casereader_append_numeric *can = can_;
-  double new_value = can->func (input, can->n++, can->aux);
-  case_nullify (output);
-  case_move (output, input);
-  case_resize (output, can->value_ofs + 1);
-  case_data_rw_idx (output, can->value_ofs)->f = new_value;
+  double new_value = can->func (c, can->n++, can->aux);
+  c = case_unshare_and_resize (c, can->proto);
+  case_data_rw_idx (c, caseproto_get_n_widths (can->proto) - 1)->f = new_value;
+  return c;
 }
 
 static bool
@@ -169,6 +164,7 @@ can_destroy (void *can_)
   struct casereader_append_numeric *can = can_;
   if (can->destroy)
     can->destroy (can->aux);
+  caseproto_unref (can->proto);
   free (can);
   return true;
 }
@@ -219,7 +215,7 @@ struct casereader_append_rank
   casenumber n;
   const struct variable *var;
   const struct variable *weight;
-  int value_ofs;
+  struct caseproto *proto;
   casenumber n_common;
   double mean_rank;
   double cc;
@@ -231,8 +227,7 @@ struct casereader_append_rank
 
 static bool car_destroy (void *car_);
 
-static void car_translate (struct ccase *input, struct ccase *output,
-                          void *car_);
+static struct ccase *car_translate (struct ccase *input, void *car_);
 
 /* Creates and returns a new casereader whose cases are produced
    by reading from SUBREADER and appending an additional value,
@@ -267,7 +262,8 @@ casereader_create_append_rank (struct casereader *subreader,
                               )
 {
   struct casereader_append_rank *car = xmalloc (sizeof *car);
-  car->value_ofs = casereader_get_value_cnt (subreader);
+  car->proto = caseproto_ref (casereader_get_proto (subreader));
+  car->proto = caseproto_add_width (car->proto, 0);
   car->weight = w;
   car->var = v;
   car->n = 0;
@@ -279,7 +275,7 @@ casereader_create_append_rank (struct casereader *subreader,
   car->err = err;
   car->prev_value = SYSMIS;
 
-  return casereader_create_translator (subreader, car->value_ofs + 1,
+  return casereader_create_translator (subreader, car->proto,
                                        car_translate, car_destroy, car);
 }
 
@@ -289,13 +285,13 @@ car_destroy (void *car_)
 {
   struct casereader_append_rank *car = car_;
   casereader_destroy (car->clone);
+  caseproto_unref (car->proto);
   free (car);
   return true;
 }
 
-
-static void
-car_translate (struct ccase *input, struct ccase *output,  void *car_)
+static struct ccase *
+car_translate (struct ccase *input, void *car_)
 {
   struct casereader_append_rank *car = car_;
 
@@ -321,16 +317,16 @@ car_translate (struct ccase *input, struct ccase *output,  void *car_)
 
       do
        {
-         struct ccase c;
-         if ( ! casereader_peek (car->clone, car->n + ++k, &c))
+         struct ccase *c = casereader_peek (car->clone, car->n + ++k);
+         if (c == NULL)
            break;
-         vxx = case_data (&c, car->var)->f;
+         vxx = case_data (c, car->var)->f;
 
          if ( vxx == value)
            {
              if (car->weight)
                {
-                 double w = case_data (&c, car->weight)->f;
+                 double w = case_data (c, car->weight)->f;
 
                  if ( car->err && w < 0 )
                    *car->err |= RANK_ERR_NEGATIVE_WEIGHT;
@@ -341,7 +337,7 @@ car_translate (struct ccase *input, struct ccase *output,  void *car_)
                weight += 1.0;
              car->n_common++;
            }
-         case_destroy (&c);
+          case_unref (c);
        }
       while (vxx == value);
       car->mean_rank = car->cc + (weight + 1) / 2.0;
@@ -355,11 +351,11 @@ car_translate (struct ccase *input, struct ccase *output,  void *car_)
 
   car->n++;
 
-  case_nullify (output);
-  case_move (output, input);
-  case_resize (output, car->value_ofs + 1);
-  case_data_rw_idx (output, car->value_ofs)->f = car->mean_rank ;
+  input = case_unshare_and_resize (input, car->proto);
+  case_data_rw_idx (input, caseproto_get_n_widths (car->proto) - 1)->f
+    = car->mean_rank;
   car->prev_value = value;
+  return input;
 }