Respect the constness of caseproto.
[pspp-builds.git] / src / data / casereader-translator.c
index 62b38b99a6c07114bcb4159caa23c35060cbd078..c193d404dcccdf08a1a73c96109ee00c44f9e0ef 100644 (file)
@@ -19,6 +19,7 @@
 #include <data/casereader.h>
 #include <stdlib.h>
 
+#include <data/variable.h>
 #include <data/casereader-provider.h>
 #include <libpspp/taint.h>
 
@@ -361,7 +362,7 @@ car_translate (struct ccase *input, void *car_)
 
 \f
 
-struct consolodator
+struct consolidator
 {
   const struct variable *key;
   const struct variable *weight;
@@ -371,27 +372,34 @@ struct consolodator
   casenumber n;
   struct casereader *clone;
   struct caseproto *proto;
+  int direction;
 };
 
 static bool
 uniquify (const struct ccase *c, void *aux)
 {
-  struct consolodator *cdr = aux;
+  struct consolidator *cdr = aux;
   const union value *current_value = case_data (c, cdr->key);
   const int key_width = var_get_width (cdr->key);
   const double weight = cdr->weight ? case_data (c, cdr->weight)->f : 1.0;
   const struct ccase *next_case = casereader_peek (cdr->clone, cdr->n + 1);
+  int dir = 0;
 
   cdr->n ++;
   cdr->cc += weight;
 
   if ( NULL == next_case)
       goto end;
-    
-  if ( 0 != value_compare_3way (case_data (next_case, cdr->key),
-                               current_value, key_width))
-    goto end;
-
+  
+  dir = value_compare_3way (case_data (next_case, cdr->key),
+                           current_value, key_width);
+  if ( dir != 0 )
+    {
+      /* Insist that the data are sorted */
+      assert (cdr->direction == 0 || dir == cdr->direction);
+      cdr->direction = dir;
+      goto end;
+    }
   
   return false;
 
@@ -406,15 +414,19 @@ uniquify (const struct ccase *c, void *aux)
 static struct ccase *
 consolodate_weight (struct ccase *input, void *aux)
 {
-  struct consolodator *cdr = aux;
+  struct consolidator *cdr = aux;
   struct ccase *c;
 
-  c = case_unshare_and_resize (input, cdr->proto);
-
   if (cdr->weight)
-    case_data_rw (c, cdr->weight)->f = cdr->prev_cc;
+    {
+      c = case_unshare (input);
+      case_data_rw (c, cdr->weight)->f = cdr->prev_cc;
+    }
   else
-    case_data_rw_idx (c, caseproto_get_n_widths (cdr->proto) - 1)->f = cdr->prev_cc;    
+    {
+      c = case_unshare_and_resize (input, cdr->proto);
+      case_data_rw_idx (c, caseproto_get_n_widths (cdr->proto) - 1)->f = cdr->prev_cc;    
+    }
 
   return c;
 }
@@ -423,7 +435,7 @@ consolodate_weight (struct ccase *input, void *aux)
 static bool
 uniquify_destroy (void *aux)
 {
-  struct consolodator *cdr = aux;
+  struct consolidator *cdr = aux;
 
   casereader_destroy (cdr->clone);
   free (cdr);
@@ -449,14 +461,15 @@ casereader_create_distinct (struct casereader *input,
 {
   struct casereader *u ;
   struct casereader *ud ;
-  struct caseproto *output_proto = casereader_get_proto (input);
+  struct caseproto *output_proto = caseproto_clone (casereader_get_proto (input));
 
-  struct consolodator *cdr = xmalloc (sizeof (*cdr));
+  struct consolidator *cdr = xmalloc (sizeof (*cdr));
   cdr->n = 0;
   cdr->key = key;
   cdr->weight = weight;
   cdr->cc = 0;
   cdr->clone = casereader_clone (input);
+  cdr->direction = 0;
 
   if ( NULL == cdr->weight )
     output_proto = caseproto_add_width (output_proto, 0);
@@ -471,5 +484,7 @@ casereader_create_distinct (struct casereader *input,
                                     consolodate_weight,
                                     uniquify_destroy,
                                     cdr);
+
+  return ud;
 }