31ee82c1f627e19b19f6846aff860d6f1c450463
[pspp] / src / data / casereader-translator.c
1 /* PSPP - a program for statistical analysis.
2    Copyright (C) 2007, 2009, 2011 Free Software Foundation, Inc.
3
4    This program is free software: you can redistribute it and/or modify
5    it under the terms of the GNU General Public License as published by
6    the Free Software Foundation, either version 3 of the License, or
7    (at your option) any later version.
8
9    This program is distributed in the hope that it will be useful,
10    but WITHOUT ANY WARRANTY; without even the implied warranty of
11    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12    GNU General Public License for more details.
13
14    You should have received a copy of the GNU General Public License
15    along with this program.  If not, see <http://www.gnu.org/licenses/>. */
16
17 #include <config.h>
18
19 #include <stdlib.h>
20
21 #include "data/casereader-provider.h"
22 #include "data/casereader.h"
23 #include "data/val-type.h"
24 #include "data/variable.h"
25 #include "libpspp/taint.h"
26
27 #include "gl/xalloc.h"
28
29 /* Casereader that applies a user-supplied function to translate
30    each case into another in an arbitrary fashion. */
31
32 /* A translating casereader. */
33 struct casereader_translator
34   {
35     struct casereader *subreader; /* Source of input cases. */
36     const struct casereader_translator_class *class;
37     void *aux;
38   };
39
40 static const struct casereader_class casereader_translator_class;
41
42 /* Creates and returns a new casereader whose cases are produced
43    by reading from SUBREADER and passing through TRANSLATE, which
44    must return the translated case, and populate it based on
45    INPUT and auxiliary data AUX.  TRANSLATE must destroy its
46    input case.
47
48    TRANSLATE may be stateful, that is, the output for a given
49    case may depend on previous cases.  If TRANSLATE is stateless,
50    then you may want to use casereader_translate_stateless
51    instead, since it sometimes performs better.
52
53    The cases returned by TRANSLATE must match OUTPUT_PROTO.
54
55    When the translating casereader is destroyed, DESTROY will be
56    called to allow any state maintained by TRANSLATE to be freed.
57
58    After this function is called, SUBREADER must not ever again
59    be referenced directly.  It will be destroyed automatically
60    when the translating casereader is destroyed. */
61 struct casereader *
62 casereader_create_translator (struct casereader *subreader,
63                               const struct caseproto *output_proto,
64                               const struct casereader_translator_class *class,
65                               void *aux)
66 {
67   struct casereader_translator *ct = xmalloc (sizeof *ct);
68   *ct = (struct casereader_translator) {
69     .subreader = casereader_rename (subreader),
70     .class = class,
71     .aux = aux,
72   };
73
74   struct casereader *reader = casereader_create_sequential (
75     NULL, output_proto, casereader_get_n_cases (ct->subreader),
76     &casereader_translator_class, ct);
77   taint_propagate (casereader_get_taint (ct->subreader),
78                    casereader_get_taint (reader));
79   return reader;
80 }
81
82 /* Internal read function for translating casereader. */
83 static struct ccase *
84 casereader_translator_read (struct casereader *reader UNUSED,
85                             void *ct_)
86 {
87   struct casereader_translator *ct = ct_;
88   struct ccase *tmp = casereader_read (ct->subreader);
89   if (tmp)
90     tmp = ct->class->translate (tmp, ct->aux);
91   return tmp;
92 }
93
94 /* Internal destroy function for translating casereader. */
95 static void
96 casereader_translator_destroy (struct casereader *reader UNUSED, void *ct_)
97 {
98   struct casereader_translator *ct = ct_;
99   casereader_destroy (ct->subreader);
100   ct->class->destroy (ct->aux);
101   free (ct);
102 }
103
104 /* Casereader class for translating casereader. */
105 static const struct casereader_class casereader_translator_class =
106   {
107     casereader_translator_read,
108     casereader_translator_destroy,
109     NULL,
110     NULL,
111   };
112 \f
113 /* Casereader that applies a user-supplied function to translate
114    each case into another in a stateless fashion. */
115
116 static const struct casereader_random_class
117 casereader_stateless_translator_class;
118
119 /* Creates and returns a new casereader whose cases are produced by reading
120    from SUBREADER and passing through the TRANSLATE function.  TRANSLATE must
121    takes ownership of its input case and returns a translated case, populating
122    the translated case based on INPUT and auxiliary data AUX.
123
124    TRANSLATE must be stateless, that is, the output for a given case must not
125    depend on previous cases.  This is because cases may be retrieved in
126    arbitrary order, and some cases may be retrieved multiple times, and some
127    cases may be skipped and never retrieved at all.  If TRANSLATE is stateful,
128    use casereader_create_translator instead.
129
130    The cases returned by TRANSLATE must match OUTPUT_PROTO.
131
132    When the stateless translating casereader is destroyed, DESTROY will be
133    called to allow any auxiliary data maintained by TRANSLATE to be freed.
134
135    After this function is called, SUBREADER must not ever again be referenced
136    directly.  It will be destroyed automatically when the translating
137    casereader is destroyed. */
138 struct casereader *
139 casereader_translate_stateless (
140   struct casereader *subreader,
141   const struct caseproto *output_proto,
142   const struct casereader_translator_class *class,
143   void *aux)
144 {
145   struct casereader_translator *ct = xmalloc (sizeof *ct);
146   *ct = (struct casereader_translator) {
147     .subreader = casereader_rename (subreader),
148     .class = class,
149     .aux = aux,
150   };
151
152   struct casereader *reader = casereader_create_random (
153     output_proto, casereader_get_n_cases (ct->subreader),
154     &casereader_stateless_translator_class, ct);
155   taint_propagate (casereader_get_taint (ct->subreader),
156                    casereader_get_taint (reader));
157   return reader;
158 }
159
160 /* Internal read function for stateless translating casereader. */
161 static struct ccase *
162 casereader_stateless_translator_read (struct casereader *reader UNUSED,
163                                       void *ct_, casenumber idx)
164 {
165   struct casereader_translator *ct = ct_;
166   struct ccase *tmp = casereader_peek (ct->subreader, idx);
167   if (tmp != NULL)
168     tmp = ct->class->translate (tmp, ct->aux);
169   return tmp;
170 }
171
172 /* Internal destroy function for translating casereader. */
173 static void
174 casereader_stateless_translator_destroy (struct casereader *reader UNUSED,
175                                          void *ct_)
176 {
177   struct casereader_translator *ct = ct_;
178   casereader_destroy (ct->subreader);
179   ct->class->destroy (ct->aux);
180   free (ct);
181 }
182
183 /* Casereader class for stateless translating casereader. */
184 static const struct casereader_random_class
185 casereader_stateless_translator_class =
186   {
187     casereader_stateless_translator_read,
188     casereader_stateless_translator_destroy,
189     NULL,
190   };
191 \f
192
193 struct casereader_append_numeric
194 {
195   struct caseproto *proto;
196   casenumber n;
197   new_value_func *func;
198   void *aux;
199   void (*destroy) (void *aux);
200 };
201
202 static bool can_destroy (void *can_);
203
204 static struct ccase *can_translate (struct ccase *, void *can_);
205
206 /* Creates and returns a new casereader whose cases are produced
207    by reading from SUBREADER and appending an additional value,
208    generated by FUNC.  AUX is an optional parameter which
209    gets passed to FUNC. FUNC will also receive N as it, which is
210    the ordinal number of the case in the reader.  DESTROY is an
211    optional parameter used to destroy AUX.
212
213    After this function is called, SUBREADER must not ever again
214    be referenced directly.  It will be destroyed automatically
215    when the translating casereader is destroyed. */
216 struct casereader *
217 casereader_create_append_numeric (struct casereader *subreader,
218                                   new_value_func func, void *aux,
219                                   void (*destroy) (void *aux))
220 {
221   struct casereader_append_numeric *can = xmalloc (sizeof *can);
222   can->proto = caseproto_ref (casereader_get_proto (subreader));
223   can->proto = caseproto_add_width (can->proto, 0);
224   can->n = 0;
225   can->aux = aux;
226   can->func = func;
227   can->destroy = destroy;
228
229   static const struct casereader_translator_class class = {
230     can_translate, can_destroy,
231   };
232   return casereader_create_translator (subreader, can->proto, &class, can);
233 }
234
235
236 static struct ccase *
237 can_translate (struct ccase *c, void *can_)
238 {
239   struct casereader_append_numeric *can = can_;
240   double new_value = can->func (c, can->n++, can->aux);
241   c = case_unshare_and_resize (c, can->proto);
242   *case_num_rw_idx (c, caseproto_get_n_widths (can->proto) - 1) = new_value;
243   return c;
244 }
245
246 static bool
247 can_destroy (void *can_)
248 {
249   struct casereader_append_numeric *can = can_;
250   if (can->destroy)
251     can->destroy (can->aux);
252   caseproto_unref (can->proto);
253   free (can);
254   return true;
255 }
256
257 \f
258
259 struct arithmetic_sequence
260 {
261   double first;
262   double increment;
263 };
264
265 static double
266 next_arithmetic (const struct ccase *c UNUSED,
267                  casenumber n,
268                  void *aux)
269 {
270   struct arithmetic_sequence *as = aux;
271   return n * as->increment + as->first;
272 }
273
274 /* Creates and returns a new casereader whose cases are produced
275    by reading from SUBREADER and appending an additional value,
276    which takes the value FIRST in the first case, FIRST +
277    INCREMENT in the second case, FIRST + INCREMENT * 2 in the
278    third case, and so on.
279
280    After this function is called, SUBREADER must not ever again
281    be referenced directly.  It will be destroyed automatically
282    when the translating casereader is destroyed. */
283 struct casereader *
284 casereader_create_arithmetic_sequence (struct casereader *subreader,
285                                        double first, double increment)
286 {
287   struct arithmetic_sequence *as = XZALLOC (struct arithmetic_sequence);
288   as->first = first;
289   as->increment = increment;
290   return casereader_create_append_numeric (subreader, next_arithmetic,
291                                            as, free);
292 }
293
294
295 \f
296
297 struct casereader_append_rank
298 {
299   struct casereader *clone;
300   casenumber n;
301   const struct variable *var;
302   const struct variable *weight;
303   struct caseproto *proto;
304   casenumber n_common;
305   double mean_rank;
306   double cc;
307   distinct_func *distinct;
308   void *aux;
309   enum rank_error *err;
310   double prev_value;
311 };
312
313 static bool car_destroy (void *car_);
314
315 static struct ccase *car_translate (struct ccase *input, void *car_);
316
317 /* Creates and returns a new casereader whose cases are produced
318    by reading from SUBREADER and appending an additional value,
319    which is the rank of the observation.   W is the weight variable
320    of the dictionary containing V, or NULL if there is no weight
321    variable.
322
323    The following preconditions must be met:
324
325    1.    SUBREADER must be sorted on V.
326
327    2.    The weight variables, must be non-negative.
328
329    If either of these preconditions are not satisfied, then the rank
330    variables may not be correct.  In this case, if ERR is non-null,
331    it will be set according to the erroneous conditions encountered.
332
333    If DISTINCT_CALLBACK is non-null, then  it will be called exactly
334    once for every case containing a distinct value of V.  AUX is
335    an auxiliary pointer passed to DISTINCT_CALLBACK.
336
337    After this function is called, SUBREADER must not ever again
338    be referenced directly.  It will be destroyed automatically
339    when the translating casereader is destroyed. */
340 struct casereader *
341 casereader_create_append_rank (struct casereader *subreader,
342                                const struct variable *v,
343                                const struct variable *w,
344                                enum rank_error *err,
345                                distinct_func *distinct_callback,
346                                void *aux
347                         )
348 {
349   struct casereader_append_rank *car = xmalloc (sizeof *car);
350   car->proto = caseproto_ref (casereader_get_proto (subreader));
351   car->proto = caseproto_add_width (car->proto, 0);
352   car->weight = w;
353   car->var = v;
354   car->n = 0;
355   car->n_common = 1;
356   car->cc = 0.0;
357   car->clone = casereader_clone (subreader);
358   car->distinct = distinct_callback;
359   car->aux = aux;
360   car->err = err;
361   car->prev_value = SYSMIS;
362
363   static const struct casereader_translator_class class = {
364     car_translate, car_destroy
365   };
366   return casereader_create_translator (subreader, car->proto, &class, car);
367 }
368
369
370 static bool
371 car_destroy (void *car_)
372 {
373   struct casereader_append_rank *car = car_;
374   casereader_destroy (car->clone);
375   caseproto_unref (car->proto);
376   free (car);
377   return true;
378 }
379
380 static struct ccase *
381 car_translate (struct ccase *input, void *car_)
382 {
383   struct casereader_append_rank *car = car_;
384
385   const double value = case_num (input, car->var);
386
387   if (car->prev_value != SYSMIS)
388     {
389       if (car->err && value < car->prev_value)
390         *car->err |= RANK_ERR_UNSORTED;
391     }
392
393   if (car->n_common == 1)
394     {
395       double vxx = SYSMIS;
396       casenumber k = 0;
397       double weight = 1.0;
398       if (car->weight)
399         {
400           weight = case_num (input, car->weight);
401           if (car->err && weight < 0)
402             *car->err |= RANK_ERR_NEGATIVE_WEIGHT;
403         }
404
405       do
406         {
407           struct ccase *c = casereader_peek (car->clone, car->n + ++k);
408           if (c == NULL)
409             break;
410           vxx = case_num (c, car->var);
411
412           if (vxx == value)
413             {
414               if (car->weight)
415                 {
416                   double w = case_num (c, car->weight);
417
418                   if (car->err && w < 0)
419                     *car->err |= RANK_ERR_NEGATIVE_WEIGHT;
420
421                   weight += w;
422                 }
423               else
424                 weight += 1.0;
425               car->n_common++;
426             }
427           case_unref (c);
428         }
429       while (vxx == value);
430       car->mean_rank = car->cc + (weight + 1) / 2.0;
431       car->cc += weight;
432
433       if (car->distinct)
434         car->distinct (value, car->n_common, weight, car->aux);
435     }
436   else
437     car->n_common--;
438
439   car->n++;
440
441   input = case_unshare_and_resize (input, car->proto);
442   *case_num_rw_idx (input, caseproto_get_n_widths (car->proto) - 1)
443     = car->mean_rank;
444   car->prev_value = value;
445   return input;
446 }
447
448
449 \f
450
451 struct consolidator
452 {
453   const struct variable *key;
454   const struct variable *weight;
455   double cc;
456   double prev_cc;
457
458   casenumber n;
459   struct casereader *clone;
460   struct caseproto *proto;
461   int direction;
462 };
463
464 static bool
465 uniquify (const struct ccase *c, void *aux)
466 {
467   struct consolidator *cdr = aux;
468   const union value *current_value = case_data (c, cdr->key);
469   const int key_width = var_get_width (cdr->key);
470   const double weight = cdr->weight ? case_num (c, cdr->weight) : 1.0;
471   struct ccase *next_case = casereader_peek (cdr->clone, cdr->n + 1);
472   int dir = 0;
473
474   cdr->n ++;
475   cdr->cc += weight;
476
477   if (NULL == next_case)
478       goto end;
479
480   dir = value_compare_3way (case_data (next_case, cdr->key),
481                             current_value, key_width);
482   if (dir > 0)
483     dir = 1;
484   if (dir < 0)
485     dir = -1;
486       
487   case_unref (next_case);
488   if (dir != 0)
489     {
490       /* Insist that the data are sorted */
491       assert (cdr->direction == 0 || dir == cdr->direction);
492       cdr->direction = dir;
493       goto end;
494     }
495
496   return false;
497
498  end:
499   cdr->prev_cc = cdr->cc;
500   cdr->cc = 0;
501   return true;
502 }
503
504
505
506 static struct ccase *
507 consolodate_weight (struct ccase *input, void *aux)
508 {
509   struct consolidator *cdr = aux;
510   struct ccase *c;
511
512   if (cdr->weight)
513     {
514       c = case_unshare (input);
515       *case_num_rw (c, cdr->weight) = cdr->prev_cc;
516     }
517   else
518     {
519       c = case_unshare_and_resize (input, cdr->proto);
520       *case_num_rw_idx (c, caseproto_get_n_widths (cdr->proto) - 1) = cdr->prev_cc;
521     }
522
523   return c;
524 }
525
526
527 static bool
528 uniquify_destroy (void *aux)
529 {
530   struct consolidator *cdr = aux;
531
532   casereader_destroy (cdr->clone);
533   caseproto_unref (cdr->proto);
534   free (cdr);
535
536   return true;
537 }
538
539
540
541 /* Returns a new casereader which is based upon INPUT, but which contains a maximum
542    of one case for each distinct value of KEY.
543    If WEIGHT is non-null, then the new casereader's values for this variable
544    will be the sum of all values matching KEY.
545    IF WEIGHT is null, then the new casereader will have an additional numeric
546    value appended, which will contain the total number of cases containing
547    KEY.
548    INPUT must be sorted on KEY
549 */
550 struct casereader *
551 casereader_create_distinct (struct casereader *input,
552                                                const struct variable *key,
553                                                const struct variable *weight)
554 {
555   struct casereader *u ;
556   struct caseproto *output_proto = caseproto_ref (casereader_get_proto (input));
557
558   struct consolidator *cdr = xmalloc (sizeof (*cdr));
559   cdr->n = 0;
560   cdr->key = key;
561   cdr->weight = weight;
562   cdr->cc = 0;
563   cdr->clone = casereader_clone (input);
564   cdr->direction = 0;
565
566   if (NULL == cdr->weight)
567     output_proto = caseproto_add_width (output_proto, 0);
568
569   cdr->proto = output_proto;
570
571   u = casereader_create_filter_func (input, uniquify,
572                                      NULL, cdr, NULL);
573
574   static const struct casereader_translator_class class = {
575     consolodate_weight, uniquify_destroy,
576   };
577   return casereader_create_translator (u, output_proto, &class, cdr);
578 }
579