casereader-translator: Add a class for casereader translators.
[pspp] / src / data / casereader-translator.c
1 /* PSPP - a program for statistical analysis.
2    Copyright (C) 2007, 2009, 2011 Free Software Foundation, Inc.
3
4    This program is free software: you can redistribute it and/or modify
5    it under the terms of the GNU General Public License as published by
6    the Free Software Foundation, either version 3 of the License, or
7    (at your option) any later version.
8
9    This program is distributed in the hope that it will be useful,
10    but WITHOUT ANY WARRANTY; without even the implied warranty of
11    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12    GNU General Public License for more details.
13
14    You should have received a copy of the GNU General Public License
15    along with this program.  If not, see <http://www.gnu.org/licenses/>. */
16
17 #include <config.h>
18
19 #include <stdlib.h>
20
21 #include "data/casereader-provider.h"
22 #include "data/casereader.h"
23 #include "data/val-type.h"
24 #include "data/variable.h"
25 #include "libpspp/taint.h"
26
27 #include "gl/xalloc.h"
28
29 /* Casereader that applies a user-supplied function to translate
30    each case into another in an arbitrary fashion. */
31
32 /* A translating casereader. */
33 struct casereader_translator
34   {
35     struct casereader *subreader; /* Source of input cases. */
36     const struct casereader_translator_class *class;
37     void *aux;
38   };
39
40 static const struct casereader_class casereader_translator_class;
41
42 /* Creates and returns a new casereader whose cases are produced
43    by reading from SUBREADER and passing through TRANSLATE, which
44    must return the translated case, and populate it based on
45    INPUT and auxiliary data AUX.  TRANSLATE must destroy its
46    input case.
47
48    TRANSLATE may be stateful, that is, the output for a given
49    case may depend on previous cases.  If TRANSLATE is stateless,
50    then you may want to use casereader_translate_stateless
51    instead, since it sometimes performs better.
52
53    The cases returned by TRANSLATE must match OUTPUT_PROTO.
54
55    When the translating casereader is destroyed, DESTROY will be
56    called to allow any state maintained by TRANSLATE to be freed.
57
58    After this function is called, SUBREADER must not ever again
59    be referenced directly.  It will be destroyed automatically
60    when the translating casereader is destroyed. */
61 struct casereader *
62 casereader_create_translator (struct casereader *subreader,
63                               const struct caseproto *output_proto,
64                               const struct casereader_translator_class *class,
65                               void *aux)
66 {
67   struct casereader_translator *ct = xmalloc (sizeof *ct);
68   *ct = (struct casereader_translator) {
69     .subreader = casereader_rename (subreader),
70     .class = class,
71     .aux = aux,
72   };
73
74   struct casereader *reader = casereader_create_sequential (
75     NULL, output_proto, casereader_get_n_cases (ct->subreader),
76     &casereader_translator_class, ct);
77   taint_propagate (casereader_get_taint (ct->subreader),
78                    casereader_get_taint (reader));
79   return reader;
80 }
81
82 /* Internal read function for translating casereader. */
83 static struct ccase *
84 casereader_translator_read (struct casereader *reader UNUSED,
85                             void *ct_)
86 {
87   struct casereader_translator *ct = ct_;
88   struct ccase *tmp = casereader_read (ct->subreader);
89   if (tmp)
90     tmp = ct->class->translate (tmp, ct->aux);
91   return tmp;
92 }
93
94 /* Internal destroy function for translating casereader. */
95 static void
96 casereader_translator_destroy (struct casereader *reader UNUSED, void *ct_)
97 {
98   struct casereader_translator *ct = ct_;
99   casereader_destroy (ct->subreader);
100   ct->class->destroy (ct->aux);
101   free (ct);
102 }
103
104 /* Casereader class for translating casereader. */
105 static const struct casereader_class casereader_translator_class =
106   {
107     casereader_translator_read,
108     casereader_translator_destroy,
109     NULL,
110     NULL,
111   };
112 \f
113 /* Casereader that applies a user-supplied function to translate
114    each case into another in a stateless fashion. */
115
116 /* A statelessly translating casereader. */
117 struct casereader_stateless_translator
118   {
119     struct casereader *subreader; /* Source of input cases. */
120
121     casenumber case_offset;
122     const struct casereader_translator_class *class;
123     void *aux;
124   };
125
126 static const struct casereader_random_class
127 casereader_stateless_translator_class;
128
129 /* Creates and returns a new casereader whose cases are produced by reading
130    from SUBREADER and passing through the TRANSLATE function.  TRANSLATE must
131    takes ownership of its input case and returns a translated case, populating
132    the translated case based on INPUT and auxiliary data AUX.
133
134    TRANSLATE must be stateless, that is, the output for a given case must not
135    depend on previous cases.  This is because cases may be retrieved in
136    arbitrary order, and some cases may be retrieved multiple times, and some
137    cases may be skipped and never retrieved at all.  If TRANSLATE is stateful,
138    use casereader_create_translator instead.
139
140    The cases returned by TRANSLATE must match OUTPUT_PROTO.
141
142    When the stateless translating casereader is destroyed, DESTROY will be
143    called to allow any auxiliary data maintained by TRANSLATE to be freed.
144
145    After this function is called, SUBREADER must not ever again be referenced
146    directly.  It will be destroyed automatically when the translating
147    casereader is destroyed. */
148 struct casereader *
149 casereader_translate_stateless (
150   struct casereader *subreader,
151   const struct caseproto *output_proto,
152   const struct casereader_translator_class *class,
153   void *aux)
154 {
155   struct casereader_stateless_translator *cst = xmalloc (sizeof *cst);
156   *cst = (struct casereader_stateless_translator) {
157     .subreader = casereader_rename (subreader),
158     .class = class,
159     .aux = aux,
160   };
161   struct casereader *reader = casereader_create_random (
162     output_proto, casereader_get_n_cases (cst->subreader),
163     &casereader_stateless_translator_class, cst);
164   taint_propagate (casereader_get_taint (cst->subreader),
165                    casereader_get_taint (reader));
166   return reader;
167 }
168
169 /* Internal read function for stateless translating casereader. */
170 static struct ccase *
171 casereader_stateless_translator_read (struct casereader *reader UNUSED,
172                                       void *cst_, casenumber idx)
173 {
174   struct casereader_stateless_translator *cst = cst_;
175   struct ccase *tmp = casereader_peek (cst->subreader, idx);
176   if (tmp != NULL)
177     tmp = cst->class->translate (tmp, cst->aux);
178   return tmp;
179 }
180
181 /* Internal destroy function for translating casereader. */
182 static void
183 casereader_stateless_translator_destroy (struct casereader *reader UNUSED,
184                                          void *cst_)
185 {
186   struct casereader_stateless_translator *cst = cst_;
187   casereader_destroy (cst->subreader);
188   cst->class->destroy (cst->aux);
189   free (cst);
190 }
191
192 static void
193 casereader_stateless_translator_advance (struct casereader *reader UNUSED,
194                                          void *cst_, casenumber n)
195 {
196   struct casereader_stateless_translator *cst = cst_;
197   cst->case_offset += casereader_advance (cst->subreader, n);
198 }
199
200 /* Casereader class for stateless translating casereader. */
201 static const struct casereader_random_class
202 casereader_stateless_translator_class =
203   {
204     casereader_stateless_translator_read,
205     casereader_stateless_translator_destroy,
206     casereader_stateless_translator_advance,
207   };
208 \f
209
210 struct casereader_append_numeric
211 {
212   struct caseproto *proto;
213   casenumber n;
214   new_value_func *func;
215   void *aux;
216   void (*destroy) (void *aux);
217 };
218
219 static bool can_destroy (void *can_);
220
221 static struct ccase *can_translate (struct ccase *, void *can_);
222
223 /* Creates and returns a new casereader whose cases are produced
224    by reading from SUBREADER and appending an additional value,
225    generated by FUNC.  AUX is an optional parameter which
226    gets passed to FUNC. FUNC will also receive N as it, which is
227    the ordinal number of the case in the reader.  DESTROY is an
228    optional parameter used to destroy AUX.
229
230    After this function is called, SUBREADER must not ever again
231    be referenced directly.  It will be destroyed automatically
232    when the translating casereader is destroyed. */
233 struct casereader *
234 casereader_create_append_numeric (struct casereader *subreader,
235                                   new_value_func func, void *aux,
236                                   void (*destroy) (void *aux))
237 {
238   struct casereader_append_numeric *can = xmalloc (sizeof *can);
239   can->proto = caseproto_ref (casereader_get_proto (subreader));
240   can->proto = caseproto_add_width (can->proto, 0);
241   can->n = 0;
242   can->aux = aux;
243   can->func = func;
244   can->destroy = destroy;
245
246   static const struct casereader_translator_class class = {
247     can_translate, can_destroy,
248   };
249   return casereader_create_translator (subreader, can->proto, &class, can);
250 }
251
252
253 static struct ccase *
254 can_translate (struct ccase *c, void *can_)
255 {
256   struct casereader_append_numeric *can = can_;
257   double new_value = can->func (c, can->n++, can->aux);
258   c = case_unshare_and_resize (c, can->proto);
259   *case_num_rw_idx (c, caseproto_get_n_widths (can->proto) - 1) = new_value;
260   return c;
261 }
262
263 static bool
264 can_destroy (void *can_)
265 {
266   struct casereader_append_numeric *can = can_;
267   if (can->destroy)
268     can->destroy (can->aux);
269   caseproto_unref (can->proto);
270   free (can);
271   return true;
272 }
273
274 \f
275
276 struct arithmetic_sequence
277 {
278   double first;
279   double increment;
280 };
281
282 static double
283 next_arithmetic (const struct ccase *c UNUSED,
284                  casenumber n,
285                  void *aux)
286 {
287   struct arithmetic_sequence *as = aux;
288   return n * as->increment + as->first;
289 }
290
291 /* Creates and returns a new casereader whose cases are produced
292    by reading from SUBREADER and appending an additional value,
293    which takes the value FIRST in the first case, FIRST +
294    INCREMENT in the second case, FIRST + INCREMENT * 2 in the
295    third case, and so on.
296
297    After this function is called, SUBREADER must not ever again
298    be referenced directly.  It will be destroyed automatically
299    when the translating casereader is destroyed. */
300 struct casereader *
301 casereader_create_arithmetic_sequence (struct casereader *subreader,
302                                        double first, double increment)
303 {
304   struct arithmetic_sequence *as = XZALLOC (struct arithmetic_sequence);
305   as->first = first;
306   as->increment = increment;
307   return casereader_create_append_numeric (subreader, next_arithmetic,
308                                            as, free);
309 }
310
311
312 \f
313
314 struct casereader_append_rank
315 {
316   struct casereader *clone;
317   casenumber n;
318   const struct variable *var;
319   const struct variable *weight;
320   struct caseproto *proto;
321   casenumber n_common;
322   double mean_rank;
323   double cc;
324   distinct_func *distinct;
325   void *aux;
326   enum rank_error *err;
327   double prev_value;
328 };
329
330 static bool car_destroy (void *car_);
331
332 static struct ccase *car_translate (struct ccase *input, void *car_);
333
334 /* Creates and returns a new casereader whose cases are produced
335    by reading from SUBREADER and appending an additional value,
336    which is the rank of the observation.   W is the weight variable
337    of the dictionary containing V, or NULL if there is no weight
338    variable.
339
340    The following preconditions must be met:
341
342    1.    SUBREADER must be sorted on V.
343
344    2.    The weight variables, must be non-negative.
345
346    If either of these preconditions are not satisfied, then the rank
347    variables may not be correct.  In this case, if ERR is non-null,
348    it will be set according to the erroneous conditions encountered.
349
350    If DISTINCT_CALLBACK is non-null, then  it will be called exactly
351    once for every case containing a distinct value of V.  AUX is
352    an auxiliary pointer passed to DISTINCT_CALLBACK.
353
354    After this function is called, SUBREADER must not ever again
355    be referenced directly.  It will be destroyed automatically
356    when the translating casereader is destroyed. */
357 struct casereader *
358 casereader_create_append_rank (struct casereader *subreader,
359                                const struct variable *v,
360                                const struct variable *w,
361                                enum rank_error *err,
362                                distinct_func *distinct_callback,
363                                void *aux
364                         )
365 {
366   struct casereader_append_rank *car = xmalloc (sizeof *car);
367   car->proto = caseproto_ref (casereader_get_proto (subreader));
368   car->proto = caseproto_add_width (car->proto, 0);
369   car->weight = w;
370   car->var = v;
371   car->n = 0;
372   car->n_common = 1;
373   car->cc = 0.0;
374   car->clone = casereader_clone (subreader);
375   car->distinct = distinct_callback;
376   car->aux = aux;
377   car->err = err;
378   car->prev_value = SYSMIS;
379
380   static const struct casereader_translator_class class = {
381     car_translate, car_destroy
382   };
383   return casereader_create_translator (subreader, car->proto, &class, car);
384 }
385
386
387 static bool
388 car_destroy (void *car_)
389 {
390   struct casereader_append_rank *car = car_;
391   casereader_destroy (car->clone);
392   caseproto_unref (car->proto);
393   free (car);
394   return true;
395 }
396
397 static struct ccase *
398 car_translate (struct ccase *input, void *car_)
399 {
400   struct casereader_append_rank *car = car_;
401
402   const double value = case_num (input, car->var);
403
404   if (car->prev_value != SYSMIS)
405     {
406       if (car->err && value < car->prev_value)
407         *car->err |= RANK_ERR_UNSORTED;
408     }
409
410   if (car->n_common == 1)
411     {
412       double vxx = SYSMIS;
413       casenumber k = 0;
414       double weight = 1.0;
415       if (car->weight)
416         {
417           weight = case_num (input, car->weight);
418           if (car->err && weight < 0)
419             *car->err |= RANK_ERR_NEGATIVE_WEIGHT;
420         }
421
422       do
423         {
424           struct ccase *c = casereader_peek (car->clone, car->n + ++k);
425           if (c == NULL)
426             break;
427           vxx = case_num (c, car->var);
428
429           if (vxx == value)
430             {
431               if (car->weight)
432                 {
433                   double w = case_num (c, car->weight);
434
435                   if (car->err && w < 0)
436                     *car->err |= RANK_ERR_NEGATIVE_WEIGHT;
437
438                   weight += w;
439                 }
440               else
441                 weight += 1.0;
442               car->n_common++;
443             }
444           case_unref (c);
445         }
446       while (vxx == value);
447       car->mean_rank = car->cc + (weight + 1) / 2.0;
448       car->cc += weight;
449
450       if (car->distinct)
451         car->distinct (value, car->n_common, weight, car->aux);
452     }
453   else
454     car->n_common--;
455
456   car->n++;
457
458   input = case_unshare_and_resize (input, car->proto);
459   *case_num_rw_idx (input, caseproto_get_n_widths (car->proto) - 1)
460     = car->mean_rank;
461   car->prev_value = value;
462   return input;
463 }
464
465
466 \f
467
468 struct consolidator
469 {
470   const struct variable *key;
471   const struct variable *weight;
472   double cc;
473   double prev_cc;
474
475   casenumber n;
476   struct casereader *clone;
477   struct caseproto *proto;
478   int direction;
479 };
480
481 static bool
482 uniquify (const struct ccase *c, void *aux)
483 {
484   struct consolidator *cdr = aux;
485   const union value *current_value = case_data (c, cdr->key);
486   const int key_width = var_get_width (cdr->key);
487   const double weight = cdr->weight ? case_num (c, cdr->weight) : 1.0;
488   struct ccase *next_case = casereader_peek (cdr->clone, cdr->n + 1);
489   int dir = 0;
490
491   cdr->n ++;
492   cdr->cc += weight;
493
494   if (NULL == next_case)
495       goto end;
496
497   dir = value_compare_3way (case_data (next_case, cdr->key),
498                             current_value, key_width);
499   if (dir > 0)
500     dir = 1;
501   if (dir < 0)
502     dir = -1;
503       
504   case_unref (next_case);
505   if (dir != 0)
506     {
507       /* Insist that the data are sorted */
508       assert (cdr->direction == 0 || dir == cdr->direction);
509       cdr->direction = dir;
510       goto end;
511     }
512
513   return false;
514
515  end:
516   cdr->prev_cc = cdr->cc;
517   cdr->cc = 0;
518   return true;
519 }
520
521
522
523 static struct ccase *
524 consolodate_weight (struct ccase *input, void *aux)
525 {
526   struct consolidator *cdr = aux;
527   struct ccase *c;
528
529   if (cdr->weight)
530     {
531       c = case_unshare (input);
532       *case_num_rw (c, cdr->weight) = cdr->prev_cc;
533     }
534   else
535     {
536       c = case_unshare_and_resize (input, cdr->proto);
537       *case_num_rw_idx (c, caseproto_get_n_widths (cdr->proto) - 1) = cdr->prev_cc;
538     }
539
540   return c;
541 }
542
543
544 static bool
545 uniquify_destroy (void *aux)
546 {
547   struct consolidator *cdr = aux;
548
549   casereader_destroy (cdr->clone);
550   caseproto_unref (cdr->proto);
551   free (cdr);
552
553   return true;
554 }
555
556
557
558 /* Returns a new casereader which is based upon INPUT, but which contains a maximum
559    of one case for each distinct value of KEY.
560    If WEIGHT is non-null, then the new casereader's values for this variable
561    will be the sum of all values matching KEY.
562    IF WEIGHT is null, then the new casereader will have an additional numeric
563    value appended, which will contain the total number of cases containing
564    KEY.
565    INPUT must be sorted on KEY
566 */
567 struct casereader *
568 casereader_create_distinct (struct casereader *input,
569                                                const struct variable *key,
570                                                const struct variable *weight)
571 {
572   struct casereader *u ;
573   struct caseproto *output_proto = caseproto_ref (casereader_get_proto (input));
574
575   struct consolidator *cdr = xmalloc (sizeof (*cdr));
576   cdr->n = 0;
577   cdr->key = key;
578   cdr->weight = weight;
579   cdr->cc = 0;
580   cdr->clone = casereader_clone (input);
581   cdr->direction = 0;
582
583   if (NULL == cdr->weight)
584     output_proto = caseproto_add_width (output_proto, 0);
585
586   cdr->proto = output_proto;
587
588   u = casereader_create_filter_func (input, uniquify,
589                                      NULL, cdr, NULL);
590
591   static const struct casereader_translator_class class = {
592     consolodate_weight, uniquify_destroy,
593   };
594   return casereader_create_translator (u, output_proto, &class, cdr);
595 }
596