Corrected spelling of "consolidate".
[pspp-builds.git] / src / data / casereader-translator.c
1 /* PSPP - a program for statistical analysis.
2    Copyright (C) 2007, 2009 Free Software Foundation, Inc.
3
4    This program is free software: you can redistribute it and/or modify
5    it under the terms of the GNU General Public License as published by
6    the Free Software Foundation, either version 3 of the License, or
7    (at your option) any later version.
8
9    This program is distributed in the hope that it will be useful,
10    but WITHOUT ANY WARRANTY; without even the implied warranty of
11    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12    GNU General Public License for more details.
13
14    You should have received a copy of the GNU General Public License
15    along with this program.  If not, see <http://www.gnu.org/licenses/>. */
16
17 #include <config.h>
18 #include <data/val-type.h>
19 #include <data/casereader.h>
20 #include <stdlib.h>
21
22 #include <data/variable.h>
23 #include <data/casereader-provider.h>
24 #include <libpspp/taint.h>
25
26 #include "xalloc.h"
27
28 /* Casereader that applies a user-supplied function to translate
29    each case into another in an arbitrary fashion. */
30
31 /* A translating casereader. */
32 struct casereader_translator
33   {
34     struct casereader *subreader; /* Source of input cases. */
35
36     struct ccase *(*translate) (struct ccase *input, void *aux);
37     bool (*destroy) (void *aux);
38     void *aux;
39   };
40
41 static const struct casereader_class casereader_translator_class;
42
43 /* Creates and returns a new casereader whose cases are produced
44    by reading from SUBREADER and passing through TRANSLATE, which
45    must return the translated case, and populate it based on
46    INPUT and auxiliary data AUX.  TRANSLATE must destroy its
47    input case.
48
49    The cases returned by TRANSLATE must match OUTPUT_PROTO.
50
51    When the translating casereader is destroyed, DESTROY will be
52    called to allow any state maintained by TRANSLATE to be freed.
53
54    After this function is called, SUBREADER must not ever again
55    be referenced directly.  It will be destroyed automatically
56    when the translating casereader is destroyed. */
57 struct casereader *
58 casereader_create_translator (struct casereader *subreader,
59                               const struct caseproto *output_proto,
60                               struct ccase *(*translate) (struct ccase *input,
61                                                           void *aux),
62                               bool (*destroy) (void *aux),
63                               void *aux)
64 {
65   struct casereader_translator *ct = xmalloc (sizeof *ct);
66   struct casereader *reader;
67   ct->subreader = casereader_rename (subreader);
68   ct->translate = translate;
69   ct->destroy = destroy;
70   ct->aux = aux;
71   reader = casereader_create_sequential (
72     NULL, output_proto, casereader_get_case_cnt (ct->subreader),
73     &casereader_translator_class, ct);
74   taint_propagate (casereader_get_taint (ct->subreader),
75                    casereader_get_taint (reader));
76   return reader;
77 }
78
79 /* Internal read function for translating casereader. */
80 static struct ccase *
81 casereader_translator_read (struct casereader *reader UNUSED,
82                             void *ct_)
83 {
84   struct casereader_translator *ct = ct_;
85   struct ccase *tmp = casereader_read (ct->subreader);
86   if (tmp)
87     tmp = ct->translate (tmp, ct->aux);
88   return tmp;
89 }
90
91 /* Internal destroy function for translating casereader. */
92 static void
93 casereader_translator_destroy (struct casereader *reader UNUSED, void *ct_)
94 {
95   struct casereader_translator *ct = ct_;
96   casereader_destroy (ct->subreader);
97   ct->destroy (ct->aux);
98   free (ct);
99 }
100
101 /* Casereader class for translating casereader. */
102 static const struct casereader_class casereader_translator_class =
103   {
104     casereader_translator_read,
105     casereader_translator_destroy,
106     NULL,
107     NULL,
108   };
109
110 \f
111
112 struct casereader_append_numeric
113 {
114   struct caseproto *proto;
115   casenumber n;
116   new_value_func *func;
117   void *aux;
118   void (*destroy) (void *aux);
119 };
120
121 static bool can_destroy (void *can_);
122
123 static struct ccase *can_translate (struct ccase *, void *can_);
124
125 /* Creates and returns a new casereader whose cases are produced
126    by reading from SUBREADER and appending an additional value,
127    generated by FUNC.  AUX is an optional parameter which
128    gets passed to FUNC. FUNC will also receive N as it, which is
129    the ordinal number of the case in the reader.  DESTROY is an
130    optional parameter used to destroy AUX.
131
132    After this function is called, SUBREADER must not ever again
133    be referenced directly.  It will be destroyed automatically
134    when the translating casereader is destroyed. */
135 struct casereader *
136 casereader_create_append_numeric (struct casereader *subreader,
137                                   new_value_func func, void *aux,
138                                   void (*destroy) (void *aux))
139 {
140   struct casereader_append_numeric *can = xmalloc (sizeof *can);
141   can->proto = caseproto_ref (casereader_get_proto (subreader));
142   can->proto = caseproto_add_width (can->proto, 0);
143   can->n = 0;
144   can->aux = aux;
145   can->func = func;
146   can->destroy = destroy;
147   return casereader_create_translator (subreader, can->proto,
148                                        can_translate, can_destroy, can);
149 }
150
151
152 static struct ccase *
153 can_translate (struct ccase *c, void *can_)
154 {
155   struct casereader_append_numeric *can = can_;
156   double new_value = can->func (c, can->n++, can->aux);
157   c = case_unshare_and_resize (c, can->proto);
158   case_data_rw_idx (c, caseproto_get_n_widths (can->proto) - 1)->f = new_value;
159   return c;
160 }
161
162 static bool
163 can_destroy (void *can_)
164 {
165   struct casereader_append_numeric *can = can_;
166   if (can->destroy)
167     can->destroy (can->aux);
168   caseproto_unref (can->proto);
169   free (can);
170   return true;
171 }
172
173 \f
174
175 struct arithmetic_sequence
176 {
177   double first;
178   double increment;
179 };
180
181 static double
182 next_arithmetic (const struct ccase *c UNUSED,
183                  casenumber n,
184                  void *aux)
185 {
186   struct arithmetic_sequence *as = aux;
187   return n * as->increment + as->first;
188 }
189
190 /* Creates and returns a new casereader whose cases are produced
191    by reading from SUBREADER and appending an additional value,
192    which takes the value FIRST in the first case, FIRST +
193    INCREMENT in the second case, FIRST + INCREMENT * 2 in the
194    third case, and so on.
195
196    After this function is called, SUBREADER must not ever again
197    be referenced directly.  It will be destroyed automatically
198    when the translating casereader is destroyed. */
199 struct casereader *
200 casereader_create_arithmetic_sequence (struct casereader *subreader,
201                                        double first, double increment)
202 {
203   struct arithmetic_sequence *as = xzalloc (sizeof *as);
204   as->first = first;
205   as->increment = increment;
206   return casereader_create_append_numeric (subreader, next_arithmetic,
207                                            as, free);
208 }
209
210
211 \f
212
213 struct casereader_append_rank
214 {
215   struct casereader *clone;
216   casenumber n;
217   const struct variable *var;
218   const struct variable *weight;
219   struct caseproto *proto;
220   casenumber n_common;
221   double mean_rank;
222   double cc;
223   distinct_func *distinct;
224   void *aux;
225   enum rank_error *err;
226   double prev_value;
227 };
228
229 static bool car_destroy (void *car_);
230
231 static struct ccase *car_translate (struct ccase *input, void *car_);
232
233 /* Creates and returns a new casereader whose cases are produced
234    by reading from SUBREADER and appending an additional value,
235    which is the rank of the observation.   W is the weight variable
236    of the dictionary containing V, or NULL if there is no weight
237    variable.
238
239    The following preconditions must be met:
240
241    1.    SUBREADER must be sorted on V.
242
243    2.    The weight variables, must be non-negative.
244
245    If either of these preconditions are not satisfied, then the rank
246    variables may not be correct.  In this case, if ERR is non-null,
247    it will be set according to the erroneous conditions encountered.
248
249    If DISTINCT_CALLBACK is non-null, then  it will be called exactly
250    once for every case containing a distinct value of V.  AUX is
251    an auxilliary pointer passed to DISTINCT_CALLBACK.
252
253    After this function is called, SUBREADER must not ever again
254    be referenced directly.  It will be destroyed automatically
255    when the translating casereader is destroyed. */
256 struct casereader *
257 casereader_create_append_rank (struct casereader *subreader,
258                                const struct variable *v,
259                                const struct variable *w,
260                                enum rank_error *err,
261                                distinct_func *distinct_callback,
262                                void *aux
263                                )
264 {
265   struct casereader_append_rank *car = xmalloc (sizeof *car);
266   car->proto = caseproto_ref (casereader_get_proto (subreader));
267   car->proto = caseproto_add_width (car->proto, 0);
268   car->weight = w;
269   car->var = v;
270   car->n = 0;
271   car->n_common = 1;
272   car->cc = 0.0;
273   car->clone = casereader_clone (subreader);
274   car->distinct = distinct_callback;
275   car->aux = aux;
276   car->err = err;
277   car->prev_value = SYSMIS;
278
279   return casereader_create_translator (subreader, car->proto,
280                                        car_translate, car_destroy, car);
281 }
282
283
284 static bool
285 car_destroy (void *car_)
286 {
287   struct casereader_append_rank *car = car_;
288   casereader_destroy (car->clone);
289   caseproto_unref (car->proto);
290   free (car);
291   return true;
292 }
293
294 static struct ccase *
295 car_translate (struct ccase *input, void *car_)
296 {
297   struct casereader_append_rank *car = car_;
298
299   const double value = case_data (input, car->var)->f;
300
301   if ( car->prev_value != SYSMIS)
302     {
303       if (car->err && value < car->prev_value)
304         *car->err |= RANK_ERR_UNSORTED;
305     }
306
307   if ( car->n_common == 1)
308     {
309       double vxx = SYSMIS;
310       casenumber k = 0;
311       double weight = 1.0;
312       if (car->weight)
313         {
314           weight = case_data (input, car->weight)->f;
315           if ( car->err && weight < 0 )
316             *car->err |= RANK_ERR_NEGATIVE_WEIGHT;
317         }
318
319       do
320         {
321           struct ccase *c = casereader_peek (car->clone, car->n + ++k);
322           if (c == NULL)
323             break;
324           vxx = case_data (c, car->var)->f;
325
326           if ( vxx == value)
327             {
328               if (car->weight)
329                 {
330                   double w = case_data (c, car->weight)->f;
331
332                   if ( car->err && w < 0 )
333                     *car->err |= RANK_ERR_NEGATIVE_WEIGHT;
334
335                   weight += w;
336                 }
337               else
338                 weight += 1.0;
339               car->n_common++;
340             }
341           case_unref (c);
342         }
343       while (vxx == value);
344       car->mean_rank = car->cc + (weight + 1) / 2.0;
345       car->cc += weight;
346
347       if (car->distinct)
348         car->distinct (value, car->n_common, weight, car->aux);
349     }
350   else
351     car->n_common--;
352
353   car->n++;
354
355   input = case_unshare_and_resize (input, car->proto);
356   case_data_rw_idx (input, caseproto_get_n_widths (car->proto) - 1)->f
357     = car->mean_rank;
358   car->prev_value = value;
359   return input;
360 }
361
362
363 \f
364
365 struct consolidator
366 {
367   const struct variable *key;
368   const struct variable *weight;
369   double cc;
370   double prev_cc;
371
372   casenumber n;
373   struct casereader *clone;
374   struct caseproto *proto;
375   int direction;
376 };
377
378 static bool
379 uniquify (const struct ccase *c, void *aux)
380 {
381   struct consolidator *cdr = aux;
382   const union value *current_value = case_data (c, cdr->key);
383   const int key_width = var_get_width (cdr->key);
384   const double weight = cdr->weight ? case_data (c, cdr->weight)->f : 1.0;
385   const struct ccase *next_case = casereader_peek (cdr->clone, cdr->n + 1);
386   int dir = 0;
387
388   cdr->n ++;
389   cdr->cc += weight;
390
391   if ( NULL == next_case)
392       goto end;
393   
394   dir = value_compare_3way (case_data (next_case, cdr->key),
395                             current_value, key_width);
396   if ( dir != 0 )
397     {
398       /* Insist that the data are sorted */
399       assert (cdr->direction == 0 || dir == cdr->direction);
400       cdr->direction = dir;
401       goto end;
402     }
403   
404   return false;
405
406  end:
407   cdr->prev_cc = cdr->cc;
408   cdr->cc = 0;
409   return true;
410 }
411
412
413
414 static struct ccase *
415 consolodate_weight (struct ccase *input, void *aux)
416 {
417   struct consolidator *cdr = aux;
418   struct ccase *c;
419
420   c = case_unshare_and_resize (input, cdr->proto);
421
422   if (cdr->weight)
423     case_data_rw (c, cdr->weight)->f = cdr->prev_cc;
424   else
425     case_data_rw_idx (c, caseproto_get_n_widths (cdr->proto) - 1)->f = cdr->prev_cc;    
426
427   return c;
428 }
429
430
431 static bool
432 uniquify_destroy (void *aux)
433 {
434   struct consolidator *cdr = aux;
435
436   casereader_destroy (cdr->clone);
437   free (cdr);
438
439   return true;
440 }
441
442
443
444 /* Returns a new casereader which is based upon INPUT, but which contains a maximum 
445    of one case for each distinct value of KEY.
446    If WEIGHT is non-null, then the new casereader's values for this variable
447    will be the sum of all values matching KEY.
448    IF WEIGHT is null, then the new casereader will have an additional numeric
449    value appended, which will contain the total number of cases containing
450    KEY.
451    INPUT must be sorted on KEY
452 */
453 struct casereader *
454 casereader_create_distinct (struct casereader *input,
455                                                const struct variable *key,
456                                                const struct variable *weight)
457 {
458   struct casereader *u ;
459   struct casereader *ud ;
460   const struct caseproto *output_proto = casereader_get_proto (input);
461
462   struct consolidator *cdr = xmalloc (sizeof (*cdr));
463   cdr->n = 0;
464   cdr->key = key;
465   cdr->weight = weight;
466   cdr->cc = 0;
467   cdr->clone = casereader_clone (input);
468   cdr->direction = 0;
469
470   if ( NULL == cdr->weight )
471     output_proto = caseproto_add_width (output_proto, 0);
472
473   cdr->proto = output_proto;
474
475   u = casereader_create_filter_func (input, uniquify,
476                                      NULL, cdr, NULL);
477
478   ud = casereader_create_translator (u,
479                                      output_proto,
480                                      consolodate_weight,
481                                      uniquify_destroy,
482                                      cdr);
483 }
484