fc823049d570df34863291ab4b0650f2cba6d26b
[pspp-builds.git] / src / data / casereader-translator.c
1 /* PSPP - a program for statistical analysis.
2    Copyright (C) 2007, 2009 Free Software Foundation, Inc.
3
4    This program is free software: you can redistribute it and/or modify
5    it under the terms of the GNU General Public License as published by
6    the Free Software Foundation, either version 3 of the License, or
7    (at your option) any later version.
8
9    This program is distributed in the hope that it will be useful,
10    but WITHOUT ANY WARRANTY; without even the implied warranty of
11    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12    GNU General Public License for more details.
13
14    You should have received a copy of the GNU General Public License
15    along with this program.  If not, see <http://www.gnu.org/licenses/>. */
16
17 #include <config.h>
18 #include <data/val-type.h>
19 #include <data/casereader.h>
20 #include <stdlib.h>
21
22 #include <data/variable.h>
23 #include <data/casereader-provider.h>
24 #include <libpspp/taint.h>
25
26 #include "xalloc.h"
27
28 /* Casereader that applies a user-supplied function to translate
29    each case into another in an arbitrary fashion. */
30
31 /* A translating casereader. */
32 struct casereader_translator
33   {
34     struct casereader *subreader; /* Source of input cases. */
35
36     struct ccase *(*translate) (struct ccase *input, void *aux);
37     bool (*destroy) (void *aux);
38     void *aux;
39   };
40
41 static const struct casereader_class casereader_translator_class;
42
43 /* Creates and returns a new casereader whose cases are produced
44    by reading from SUBREADER and passing through TRANSLATE, which
45    must return the translated case, and populate it based on
46    INPUT and auxiliary data AUX.  TRANSLATE must destroy its
47    input case.
48
49    TRANSLATE may be stateful, that is, the output for a given
50    case may depend on previous cases.  If TRANSLATE is stateless,
51    then you may want to use casereader_translate_stateless
52    instead, since it sometimes performs better.
53
54    The cases returned by TRANSLATE must match OUTPUT_PROTO.
55
56    When the translating casereader is destroyed, DESTROY will be
57    called to allow any state maintained by TRANSLATE to be freed.
58
59    After this function is called, SUBREADER must not ever again
60    be referenced directly.  It will be destroyed automatically
61    when the translating casereader is destroyed. */
62 struct casereader *
63 casereader_create_translator (struct casereader *subreader,
64                               const struct caseproto *output_proto,
65                               struct ccase *(*translate) (struct ccase *input,
66                                                           void *aux),
67                               bool (*destroy) (void *aux),
68                               void *aux)
69 {
70   struct casereader_translator *ct = xmalloc (sizeof *ct);
71   struct casereader *reader;
72   ct->subreader = casereader_rename (subreader);
73   ct->translate = translate;
74   ct->destroy = destroy;
75   ct->aux = aux;
76   reader = casereader_create_sequential (
77     NULL, output_proto, casereader_get_case_cnt (ct->subreader),
78     &casereader_translator_class, ct);
79   taint_propagate (casereader_get_taint (ct->subreader),
80                    casereader_get_taint (reader));
81   return reader;
82 }
83
84 /* Internal read function for translating casereader. */
85 static struct ccase *
86 casereader_translator_read (struct casereader *reader UNUSED,
87                             void *ct_)
88 {
89   struct casereader_translator *ct = ct_;
90   struct ccase *tmp = casereader_read (ct->subreader);
91   if (tmp)
92     tmp = ct->translate (tmp, ct->aux);
93   return tmp;
94 }
95
96 /* Internal destroy function for translating casereader. */
97 static void
98 casereader_translator_destroy (struct casereader *reader UNUSED, void *ct_)
99 {
100   struct casereader_translator *ct = ct_;
101   casereader_destroy (ct->subreader);
102   ct->destroy (ct->aux);
103   free (ct);
104 }
105
106 /* Casereader class for translating casereader. */
107 static const struct casereader_class casereader_translator_class =
108   {
109     casereader_translator_read,
110     casereader_translator_destroy,
111     NULL,
112     NULL,
113   };
114 \f
115 /* Casereader that applies a user-supplied function to translate
116    each case into another in a stateless fashion. */
117
118 /* A statelessly translating casereader. */
119 struct casereader_stateless_translator
120   {
121     struct casereader *subreader; /* Source of input cases. */
122
123     casenumber case_offset;
124     struct ccase *(*translate) (struct ccase *input, casenumber,
125                                 const void *aux);
126     bool (*destroy) (void *aux);
127     void *aux;
128   };
129
130 static const struct casereader_random_class
131 casereader_stateless_translator_class;
132
133 /* Creates and returns a new casereader whose cases are produced by reading
134    from SUBREADER and passing through the TRANSLATE function.  TRANSLATE must
135    takes ownership of its input case and returns a translated case, populating
136    the translated case based on INPUT and auxiliary data AUX.
137
138    TRANSLATE must be stateless, that is, the output for a given case must not
139    depend on previous cases.  This is because cases may be retrieved in
140    arbitrary order, and some cases may be retrieved multiple times, and some
141    cases may be skipped and never retrieved at all.  If TRANSLATE is stateful,
142    use casereader_create_translator instead.
143
144    The casenumber argument to the TRANSLATE function is the absolute case
145    number in SUBREADER, that is, 0 when the first case in SUBREADER is being
146    translated, 1 when the second case is being translated, and so on.
147
148    The cases returned by TRANSLATE must match OUTPUT_PROTO.
149
150    When the stateless translating casereader is destroyed, DESTROY will be
151    called to allow any auxiliary data maintained by TRANSLATE to be freed.
152
153    After this function is called, SUBREADER must not ever again be referenced
154    directly.  It will be destroyed automatically when the translating
155    casereader is destroyed. */
156 struct casereader *
157 casereader_translate_stateless (
158   struct casereader *subreader,
159   const struct caseproto *output_proto,
160   struct ccase *(*translate) (struct ccase *input, casenumber,
161                               const void *aux),
162   bool (*destroy) (void *aux),
163   void *aux)
164 {
165   struct casereader_stateless_translator *cst = xmalloc (sizeof *cst);
166   struct casereader *reader;
167   cst->subreader = casereader_rename (subreader);
168   cst->translate = translate;
169   cst->destroy = destroy;
170   cst->aux = aux;
171   reader = casereader_create_random (
172     output_proto, casereader_get_case_cnt (cst->subreader),
173     &casereader_stateless_translator_class, cst);
174   taint_propagate (casereader_get_taint (cst->subreader),
175                    casereader_get_taint (reader));
176   return reader;
177 }
178
179 /* Internal read function for stateless translating casereader. */
180 static struct ccase *
181 casereader_stateless_translator_read (struct casereader *reader UNUSED,
182                                       void *cst_, casenumber idx)
183 {
184   struct casereader_stateless_translator *cst = cst_;
185   struct ccase *tmp = casereader_peek (cst->subreader, idx);
186   if (tmp != NULL)
187     tmp = cst->translate (tmp, cst->case_offset + idx, cst->aux);
188   return tmp;
189 }
190
191 /* Internal destroy function for translating casereader. */
192 static void
193 casereader_stateless_translator_destroy (struct casereader *reader UNUSED,
194                                          void *cst_)
195 {
196   struct casereader_stateless_translator *cst = cst_;
197   casereader_destroy (cst->subreader);
198   cst->destroy (cst->aux);
199   free (cst);
200 }
201
202 static void
203 casereader_stateless_translator_advance (struct casereader *reader UNUSED,
204                                          void *cst_, casenumber cnt)
205 {
206   struct casereader_stateless_translator *cst = cst_;
207   cst->case_offset += casereader_advance (cst->subreader, cnt);
208 }
209
210 /* Casereader class for stateless translating casereader. */
211 static const struct casereader_random_class
212 casereader_stateless_translator_class =
213   {
214     casereader_stateless_translator_read,
215     casereader_stateless_translator_destroy,
216     casereader_stateless_translator_advance,
217   };
218 \f
219
220 struct casereader_append_numeric
221 {
222   struct caseproto *proto;
223   casenumber n;
224   new_value_func *func;
225   void *aux;
226   void (*destroy) (void *aux);
227 };
228
229 static bool can_destroy (void *can_);
230
231 static struct ccase *can_translate (struct ccase *, void *can_);
232
233 /* Creates and returns a new casereader whose cases are produced
234    by reading from SUBREADER and appending an additional value,
235    generated by FUNC.  AUX is an optional parameter which
236    gets passed to FUNC. FUNC will also receive N as it, which is
237    the ordinal number of the case in the reader.  DESTROY is an
238    optional parameter used to destroy AUX.
239
240    After this function is called, SUBREADER must not ever again
241    be referenced directly.  It will be destroyed automatically
242    when the translating casereader is destroyed. */
243 struct casereader *
244 casereader_create_append_numeric (struct casereader *subreader,
245                                   new_value_func func, void *aux,
246                                   void (*destroy) (void *aux))
247 {
248   struct casereader_append_numeric *can = xmalloc (sizeof *can);
249   can->proto = caseproto_ref (casereader_get_proto (subreader));
250   can->proto = caseproto_add_width (can->proto, 0);
251   can->n = 0;
252   can->aux = aux;
253   can->func = func;
254   can->destroy = destroy;
255   return casereader_create_translator (subreader, can->proto,
256                                        can_translate, can_destroy, can);
257 }
258
259
260 static struct ccase *
261 can_translate (struct ccase *c, void *can_)
262 {
263   struct casereader_append_numeric *can = can_;
264   double new_value = can->func (c, can->n++, can->aux);
265   c = case_unshare_and_resize (c, can->proto);
266   case_data_rw_idx (c, caseproto_get_n_widths (can->proto) - 1)->f = new_value;
267   return c;
268 }
269
270 static bool
271 can_destroy (void *can_)
272 {
273   struct casereader_append_numeric *can = can_;
274   if (can->destroy)
275     can->destroy (can->aux);
276   caseproto_unref (can->proto);
277   free (can);
278   return true;
279 }
280
281 \f
282
283 struct arithmetic_sequence
284 {
285   double first;
286   double increment;
287 };
288
289 static double
290 next_arithmetic (const struct ccase *c UNUSED,
291                  casenumber n,
292                  void *aux)
293 {
294   struct arithmetic_sequence *as = aux;
295   return n * as->increment + as->first;
296 }
297
298 /* Creates and returns a new casereader whose cases are produced
299    by reading from SUBREADER and appending an additional value,
300    which takes the value FIRST in the first case, FIRST +
301    INCREMENT in the second case, FIRST + INCREMENT * 2 in the
302    third case, and so on.
303
304    After this function is called, SUBREADER must not ever again
305    be referenced directly.  It will be destroyed automatically
306    when the translating casereader is destroyed. */
307 struct casereader *
308 casereader_create_arithmetic_sequence (struct casereader *subreader,
309                                        double first, double increment)
310 {
311   struct arithmetic_sequence *as = xzalloc (sizeof *as);
312   as->first = first;
313   as->increment = increment;
314   return casereader_create_append_numeric (subreader, next_arithmetic,
315                                            as, free);
316 }
317
318
319 \f
320
321 struct casereader_append_rank
322 {
323   struct casereader *clone;
324   casenumber n;
325   const struct variable *var;
326   const struct variable *weight;
327   struct caseproto *proto;
328   casenumber n_common;
329   double mean_rank;
330   double cc;
331   distinct_func *distinct;
332   void *aux;
333   enum rank_error *err;
334   double prev_value;
335 };
336
337 static bool car_destroy (void *car_);
338
339 static struct ccase *car_translate (struct ccase *input, void *car_);
340
341 /* Creates and returns a new casereader whose cases are produced
342    by reading from SUBREADER and appending an additional value,
343    which is the rank of the observation.   W is the weight variable
344    of the dictionary containing V, or NULL if there is no weight
345    variable.
346
347    The following preconditions must be met:
348
349    1.    SUBREADER must be sorted on V.
350
351    2.    The weight variables, must be non-negative.
352
353    If either of these preconditions are not satisfied, then the rank
354    variables may not be correct.  In this case, if ERR is non-null,
355    it will be set according to the erroneous conditions encountered.
356
357    If DISTINCT_CALLBACK is non-null, then  it will be called exactly
358    once for every case containing a distinct value of V.  AUX is
359    an auxilliary pointer passed to DISTINCT_CALLBACK.
360
361    After this function is called, SUBREADER must not ever again
362    be referenced directly.  It will be destroyed automatically
363    when the translating casereader is destroyed. */
364 struct casereader *
365 casereader_create_append_rank (struct casereader *subreader,
366                                const struct variable *v,
367                                const struct variable *w,
368                                enum rank_error *err,
369                                distinct_func *distinct_callback,
370                                void *aux
371                                )
372 {
373   struct casereader_append_rank *car = xmalloc (sizeof *car);
374   car->proto = caseproto_ref (casereader_get_proto (subreader));
375   car->proto = caseproto_add_width (car->proto, 0);
376   car->weight = w;
377   car->var = v;
378   car->n = 0;
379   car->n_common = 1;
380   car->cc = 0.0;
381   car->clone = casereader_clone (subreader);
382   car->distinct = distinct_callback;
383   car->aux = aux;
384   car->err = err;
385   car->prev_value = SYSMIS;
386
387   return casereader_create_translator (subreader, car->proto,
388                                        car_translate, car_destroy, car);
389 }
390
391
392 static bool
393 car_destroy (void *car_)
394 {
395   struct casereader_append_rank *car = car_;
396   casereader_destroy (car->clone);
397   caseproto_unref (car->proto);
398   free (car);
399   return true;
400 }
401
402 static struct ccase *
403 car_translate (struct ccase *input, void *car_)
404 {
405   struct casereader_append_rank *car = car_;
406
407   const double value = case_data (input, car->var)->f;
408
409   if ( car->prev_value != SYSMIS)
410     {
411       if (car->err && value < car->prev_value)
412         *car->err |= RANK_ERR_UNSORTED;
413     }
414
415   if ( car->n_common == 1)
416     {
417       double vxx = SYSMIS;
418       casenumber k = 0;
419       double weight = 1.0;
420       if (car->weight)
421         {
422           weight = case_data (input, car->weight)->f;
423           if ( car->err && weight < 0 )
424             *car->err |= RANK_ERR_NEGATIVE_WEIGHT;
425         }
426
427       do
428         {
429           struct ccase *c = casereader_peek (car->clone, car->n + ++k);
430           if (c == NULL)
431             break;
432           vxx = case_data (c, car->var)->f;
433
434           if ( vxx == value)
435             {
436               if (car->weight)
437                 {
438                   double w = case_data (c, car->weight)->f;
439
440                   if ( car->err && w < 0 )
441                     *car->err |= RANK_ERR_NEGATIVE_WEIGHT;
442
443                   weight += w;
444                 }
445               else
446                 weight += 1.0;
447               car->n_common++;
448             }
449           case_unref (c);
450         }
451       while (vxx == value);
452       car->mean_rank = car->cc + (weight + 1) / 2.0;
453       car->cc += weight;
454
455       if (car->distinct)
456         car->distinct (value, car->n_common, weight, car->aux);
457     }
458   else
459     car->n_common--;
460
461   car->n++;
462
463   input = case_unshare_and_resize (input, car->proto);
464   case_data_rw_idx (input, caseproto_get_n_widths (car->proto) - 1)->f
465     = car->mean_rank;
466   car->prev_value = value;
467   return input;
468 }
469
470
471 \f
472
473 struct consolidator
474 {
475   const struct variable *key;
476   const struct variable *weight;
477   double cc;
478   double prev_cc;
479
480   casenumber n;
481   struct casereader *clone;
482   struct caseproto *proto;
483   int direction;
484 };
485
486 static bool
487 uniquify (const struct ccase *c, void *aux)
488 {
489   struct consolidator *cdr = aux;
490   const union value *current_value = case_data (c, cdr->key);
491   const int key_width = var_get_width (cdr->key);
492   const double weight = cdr->weight ? case_data (c, cdr->weight)->f : 1.0;
493   struct ccase *next_case = casereader_peek (cdr->clone, cdr->n + 1);
494   int dir = 0;
495
496   cdr->n ++;
497   cdr->cc += weight;
498
499   if ( NULL == next_case)
500       goto end;
501   
502   dir = value_compare_3way (case_data (next_case, cdr->key),
503                             current_value, key_width);
504   case_unref (next_case);
505   if ( dir != 0 )
506     {
507       /* Insist that the data are sorted */
508       assert (cdr->direction == 0 || dir == cdr->direction);
509       cdr->direction = dir;
510       goto end;
511     }
512   
513   return false;
514
515  end:
516   cdr->prev_cc = cdr->cc;
517   cdr->cc = 0;
518   return true;
519 }
520
521
522
523 static struct ccase *
524 consolodate_weight (struct ccase *input, void *aux)
525 {
526   struct consolidator *cdr = aux;
527   struct ccase *c;
528
529   if (cdr->weight)
530     {
531       c = case_unshare (input);
532       case_data_rw (c, cdr->weight)->f = cdr->prev_cc;
533     }
534   else
535     {
536       c = case_unshare_and_resize (input, cdr->proto);
537       case_data_rw_idx (c, caseproto_get_n_widths (cdr->proto) - 1)->f = cdr->prev_cc;    
538     }
539
540   return c;
541 }
542
543
544 static bool
545 uniquify_destroy (void *aux)
546 {
547   struct consolidator *cdr = aux;
548
549   casereader_destroy (cdr->clone);
550   caseproto_unref (cdr->proto);
551   free (cdr);
552
553   return true;
554 }
555
556
557
558 /* Returns a new casereader which is based upon INPUT, but which contains a maximum 
559    of one case for each distinct value of KEY.
560    If WEIGHT is non-null, then the new casereader's values for this variable
561    will be the sum of all values matching KEY.
562    IF WEIGHT is null, then the new casereader will have an additional numeric
563    value appended, which will contain the total number of cases containing
564    KEY.
565    INPUT must be sorted on KEY
566 */
567 struct casereader *
568 casereader_create_distinct (struct casereader *input,
569                                                const struct variable *key,
570                                                const struct variable *weight)
571 {
572   struct casereader *u ;
573   struct casereader *ud ;
574   struct caseproto *output_proto = caseproto_ref (casereader_get_proto (input));
575
576   struct consolidator *cdr = xmalloc (sizeof (*cdr));
577   cdr->n = 0;
578   cdr->key = key;
579   cdr->weight = weight;
580   cdr->cc = 0;
581   cdr->clone = casereader_clone (input);
582   cdr->direction = 0;
583
584   if ( NULL == cdr->weight )
585     output_proto = caseproto_add_width (output_proto, 0);
586
587   cdr->proto = output_proto;
588
589   u = casereader_create_filter_func (input, uniquify,
590                                      NULL, cdr, NULL);
591
592   ud = casereader_create_translator (u,
593                                      output_proto,
594                                      consolodate_weight,
595                                      uniquify_destroy,
596                                      cdr);
597
598   return ud;
599 }
600