casereader-translator: Add a class for casereader translators.
[pspp] / src / data / casereader.c
1 /* PSPP - a program for statistical analysis.
2    Copyright (C) 2007, 2009, 2010, 2013 Free Software Foundation, Inc.
3
4    This program is free software: you can redistribute it and/or modify
5    it under the terms of the GNU General Public License as published by
6    the Free Software Foundation, either version 3 of the License, or
7    (at your option) any later version.
8
9    This program is distributed in the hope that it will be useful,
10    but WITHOUT ANY WARRANTY; without even the implied warranty of
11    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12    GNU General Public License for more details.
13
14    You should have received a copy of the GNU General Public License
15    along with this program.  If not, see <http://www.gnu.org/licenses/>. */
16
17 #include <config.h>
18
19 #include "data/casereader.h"
20 #include "data/casereader-provider.h"
21
22 #include <stdlib.h>
23
24 #include "data/casereader-shim.h"
25 #include "data/casewriter.h"
26 #include "libpspp/assertion.h"
27 #include "libpspp/heap.h"
28 #include "libpspp/taint.h"
29
30 #include "gl/xalloc.h"
31
32 /* A casereader. */
33 struct casereader
34   {
35     struct taint *taint;                  /* Corrupted? */
36     struct caseproto *proto;              /* Format of contained cases. */
37     casenumber n_cases;                   /* Number of cases,
38                                              CASENUMBER_MAX if unknown. */
39     const struct casereader_class *class; /* Class. */
40     void *aux;                            /* Auxiliary data for class. */
41   };
42
43 /* Reads and returns the next case from READER.  The caller owns
44    the returned case and must call case_unref on it when its data
45    is no longer needed.  Returns a null pointer if cases have
46    been exhausted or upon detection of an I/O error.
47
48    The case returned is effectively consumed: it can never be
49    read again through READER.  If this is inconvenient, READER
50    may be cloned in advance with casereader_clone, or
51    casereader_peek may be used instead. */
52 struct ccase *
53 casereader_read (struct casereader *reader)
54 {
55   if (reader->n_cases != 0)
56     {
57       /* ->read may use casereader_swap to replace itself by
58          another reader and then delegate to that reader by
59          recursively calling casereader_read.  Currently only
60          lazy_casereader does this and, with luck, nothing else
61          ever will.
62
63          To allow this to work, however, we must decrement
64          n_cases before calling ->read.  If we decremented
65          n_cases after calling ->read, then this would actually
66          drop two cases from n_cases instead of one, and we'd
67          lose the last case in the casereader. */
68       struct ccase *c;
69       if (reader->n_cases != CASENUMBER_MAX)
70         reader->n_cases--;
71       c = reader->class->read (reader, reader->aux);
72       if (c != NULL)
73         {
74           size_t n_widths UNUSED = caseproto_get_n_widths (reader->proto);
75           assert (case_get_n_values (c) >= n_widths);
76           expensive_assert (caseproto_equal (case_get_proto (c), 0,
77                                              reader->proto, 0, n_widths));
78           return c;
79         }
80     }
81   reader->n_cases = 0;
82   return NULL;
83 }
84
85 /* Destroys READER.
86    Returns false if an I/O error was detected on READER, true
87    otherwise. */
88 bool
89 casereader_destroy (struct casereader *reader)
90 {
91   bool ok = true;
92   if (reader != NULL)
93     {
94       reader->class->destroy (reader, reader->aux);
95       ok = taint_destroy (reader->taint);
96       caseproto_unref (reader->proto);
97       free (reader);
98     }
99   return ok;
100 }
101
102 /* Returns a clone of READER.  READER and its clone may be used
103    to read the same sequence of cases in the same order, barring
104    I/O errors. */
105 struct casereader *
106 casereader_clone (const struct casereader *reader_)
107 {
108   struct casereader *reader = CONST_CAST (struct casereader *, reader_);
109   struct casereader *clone;
110   if (reader == NULL)
111     return NULL;
112
113   if (reader->class->clone == NULL)
114     casereader_shim_insert (reader);
115   clone = reader->class->clone (reader, reader->aux);
116   assert (clone != NULL);
117   assert (clone != reader);
118   return clone;
119 }
120
121 /* Returns a copy of READER, which is itself destroyed.
122    Useful for taking over ownership of a casereader, to enforce
123    preventing the original owner from accessing the casereader
124    again. */
125 struct casereader *
126 casereader_rename (struct casereader *reader)
127 {
128   struct casereader *new = xmemdup (reader, sizeof *reader);
129   free (reader);
130   return new;
131 }
132
133 /* Exchanges the casereaders referred to by A and B. */
134 void
135 casereader_swap (struct casereader *a, struct casereader *b)
136 {
137   if (a != b)
138     {
139       struct casereader tmp = *a;
140       *a = *b;
141       *b = tmp;
142     }
143 }
144
145 /* Reads and returns the (IDX + 1)'th case from READER.  The
146    caller owns the returned case and must call case_unref on it
147    when it is no longer needed.  Returns a null pointer if cases
148    have been exhausted or upon detection of an I/O error. */
149 struct ccase *
150 casereader_peek (const struct casereader *reader_, casenumber idx)
151 {
152   struct casereader *reader = CONST_CAST (struct casereader *, reader_);
153   if (idx < reader->n_cases)
154     {
155       struct ccase *c;
156       if (reader->class->peek == NULL)
157         casereader_shim_insert (reader);
158       c = reader->class->peek (reader, reader->aux, idx);
159       if (c != NULL)
160         return c;
161       else if (casereader_error (reader))
162         reader->n_cases = 0;
163     }
164   if (reader->n_cases > idx)
165     reader->n_cases = idx;
166   return NULL;
167 }
168
169 /* Returns true if no cases remain to be read from READER, or if
170    an error has occurred on READER.  (A return value of false
171    does *not* mean that the next call to casereader_peek or
172    casereader_read will return true, because an error can occur
173    in the meantime.) */
174 bool
175 casereader_is_empty (const struct casereader *reader)
176 {
177   if (reader->n_cases == 0)
178     return true;
179   else
180     {
181       struct ccase *c = casereader_peek (reader, 0);
182       if (c == NULL)
183         return true;
184       else
185         {
186           case_unref (c);
187           return false;
188         }
189     }
190 }
191
192 /* Returns true if an I/O error or another hard error has
193    occurred on READER, a clone of READER, or on some object on
194    which READER's data has a dependency, false otherwise. */
195 bool
196 casereader_error (const struct casereader *reader)
197 {
198   return taint_is_tainted (reader->taint);
199 }
200
201 /* Marks READER as having encountered an error.
202
203    Ordinarily, this function should be called by the
204    implementation of a casereader, not by the casereader's
205    client.  Instead, casereader clients should usually ensure
206    that a casereader's error state is correct by using
207    taint_propagate to propagate to the casereader's taint
208    structure, which may be obtained via casereader_get_taint. */
209 void
210 casereader_force_error (struct casereader *reader)
211 {
212   taint_set_taint (reader->taint);
213 }
214
215 /* Returns READER's associate taint object, for use with
216    taint_propagate and other taint functions. */
217 const struct taint *
218 casereader_get_taint (const struct casereader *reader)
219 {
220   return reader->taint;
221 }
222
223 /* Returns the number of cases that will be read by successive
224    calls to casereader_read for READER, assuming that no errors
225    occur.  Upon an error condition, the case count drops to 0, so
226    that no more cases can be obtained.
227
228    Not all casereaders can predict the number of cases that they
229    will produce without actually reading all of them.  In that
230    case, this function returns CASENUMBER_MAX.  To obtain the
231    actual number of cases in such a casereader, use
232    casereader_count_cases. */
233 casenumber
234 casereader_get_n_cases (struct casereader *reader)
235 {
236   return reader->n_cases;
237 }
238
239 static casenumber
240 casereader_count_cases__ (const struct casereader *reader,
241                           casenumber max_cases)
242 {
243   struct casereader *clone;
244   casenumber n_cases;
245
246   clone = casereader_clone (reader);
247   n_cases = casereader_advance (clone, max_cases);
248   casereader_destroy (clone);
249
250   return n_cases;
251 }
252
253 /* Returns the number of cases that will be read by successive
254    calls to casereader_read for READER, assuming that no errors
255    occur.  Upon an error condition, the case count drops to 0, so
256    that no more cases can be obtained.
257
258    For a casereader that cannot predict the number of cases it
259    will produce, this function actually reads (and discards) all
260    of the contents of a clone of READER.  Thus, the return value
261    is always correct in the absence of I/O errors. */
262 casenumber
263 casereader_count_cases (const struct casereader *reader)
264 {
265   if (reader->n_cases == CASENUMBER_MAX)
266     {
267       struct casereader *reader_rw = CONST_CAST (struct casereader *, reader);
268       reader_rw->n_cases = casereader_count_cases__ (reader, CASENUMBER_MAX);
269     }
270   return reader->n_cases;
271 }
272
273 /* Truncates READER to at most N cases. */
274 void
275 casereader_truncate (struct casereader *reader, casenumber n)
276 {
277   /* This could be optimized, if it ever becomes too expensive, by adding a
278      "max_cases" member to struct casereader.  We could also add a "truncate"
279      function to the casereader implementation, to allow the casereader to
280      throw away data that cannot ever be read. */
281   if (reader->n_cases == CASENUMBER_MAX)
282     reader->n_cases = casereader_count_cases__ (reader, n);
283   if (reader->n_cases > n)
284     reader->n_cases = n;
285 }
286
287 /* Returns the prototype for the cases in READER.  The caller
288    must not unref the returned prototype. */
289 const struct caseproto *
290 casereader_get_proto (const struct casereader *reader)
291 {
292   return reader->proto;
293 }
294
295 /* Skips past N cases in READER, stopping when the last case in
296    READER has been read or on an input error.  Returns the number
297    of cases successfully skipped. */
298 casenumber
299 casereader_advance (struct casereader *reader, casenumber n)
300 {
301   casenumber i;
302
303   for (i = 0; i < n; i++)
304     {
305       struct ccase *c = casereader_read (reader);
306       if (c == NULL)
307         break;
308       case_unref (c);
309     }
310
311   return i;
312 }
313
314
315 /* Copies all the cases in READER to WRITER, propagating errors
316    appropriately. READER is destroyed by this function */
317 void
318 casereader_transfer (struct casereader *reader, struct casewriter *writer)
319 {
320   struct ccase *c;
321
322   taint_propagate (casereader_get_taint (reader),
323                    casewriter_get_taint (writer));
324   while ((c = casereader_read (reader)) != NULL)
325     casewriter_write (writer, c);
326   casereader_destroy (reader);
327 }
328
329 /* Creates and returns a new casereader.  This function is
330    intended for use by casereader implementations, not by
331    casereader clients.
332
333    This function is most suited for creating a casereader for a
334    data source that is naturally sequential.
335    casereader_create_random may be more appropriate for a data
336    source that supports random access.
337
338    Ordinarily, specify a null pointer for TAINT, in which case
339    the new casereader will have a new, unique taint object.  If
340    the new casereader should have a clone of an existing taint
341    object, specify that object as TAINT.  (This is most commonly
342    useful in an implementation of the "clone" casereader_class
343    function, in which case the cloned casereader should have the
344    same taint object as the original casereader.)
345
346    PROTO must be the prototype for the cases that may be read
347    from the casereader.  The caller retains its reference to
348    PROTO.
349
350    CASE_CNT is an upper limit on the number of cases that
351    casereader_read will return from the casereader in successive
352    calls.  Ordinarily, this is the actual number of cases in the
353    data source or CASENUMBER_MAX if the number of cases cannot be
354    predicted in advance.
355
356    CLASS and AUX are a set of casereader implementation-specific
357    member functions and auxiliary data to pass to those member
358    functions, respectively. */
359 struct casereader *
360 casereader_create_sequential (const struct taint *taint,
361                               const struct caseproto *proto,
362                               casenumber n_cases,
363                               const struct casereader_class *class, void *aux)
364 {
365   struct casereader *reader = xmalloc (sizeof *reader);
366   reader->taint = taint != NULL ? taint_clone (taint) : taint_create ();
367   reader->proto = caseproto_ref (proto);
368   reader->n_cases = n_cases;
369   reader->class = class;
370   reader->aux = aux;
371   return reader;
372 }
373
374 /* If READER is a casereader of the given CLASS, returns its
375    associated auxiliary data; otherwise, returns a null pointer.
376
377    This function is intended for use from casereader
378    implementations, not by casereader users.  Even within
379    casereader implementations, its usefulness is quite limited,
380    for at least two reasons.  First, every casereader member
381    function already receives a pointer to the casereader's
382    auxiliary data.  Second, a casereader's class can change
383    (through a call to casereader_swap) and this is in practice
384    quite common (e.g. any call to casereader_clone on a
385    casereader that does not directly support clone will cause the
386    casereader to be replaced by a shim caseader). */
387 void *
388 casereader_dynamic_cast (struct casereader *reader,
389                          const struct casereader_class *class)
390 {
391   return reader->class == class ? reader->aux : NULL;
392 }
393 \f
394 /* Random-access casereader implementation.
395
396    This is a set of wrappers around casereader_create_sequential
397    and struct casereader_class to make it easy to create
398    efficient casereaders for data sources that natively support
399    random access. */
400
401 /* One clone of a random reader. */
402 struct random_reader
403   {
404     struct random_reader_shared *shared; /* Data shared among clones. */
405     struct heap_node heap_node; /* Node in shared data's heap of readers. */
406     casenumber offset;          /* Number of cases already read. */
407   };
408
409 /* Returns the random_reader in which the given heap_node is
410    embedded. */
411 static struct random_reader *
412 random_reader_from_heap_node (const struct heap_node *node)
413 {
414   return heap_data (node, struct random_reader, heap_node);
415 }
416
417 /* Data shared among clones of a random reader. */
418 struct random_reader_shared
419   {
420     struct heap *readers;       /* Heap of struct random_readers. */
421     casenumber min_offset;      /* Smallest offset of any random_reader. */
422     const struct casereader_random_class *class;
423     void *aux;
424   };
425
426 static const struct casereader_class random_reader_casereader_class;
427
428 /* Creates and returns a new random_reader with the given SHARED
429    data and OFFSET.  Inserts the new random reader into the
430    shared heap. */
431 static struct random_reader *
432 make_random_reader (struct random_reader_shared *shared, casenumber offset)
433 {
434   struct random_reader *br = xmalloc (sizeof *br);
435   br->offset = offset;
436   br->shared = shared;
437   heap_insert (shared->readers, &br->heap_node);
438   return br;
439 }
440
441 /* Compares random_readers A and B by offset and returns a
442    strcmp()-like result. */
443 static int
444 compare_random_readers_by_offset (const struct heap_node *a_,
445                                   const struct heap_node *b_,
446                                   const void *aux UNUSED)
447 {
448   const struct random_reader *a = random_reader_from_heap_node (a_);
449   const struct random_reader *b = random_reader_from_heap_node (b_);
450   return a->offset < b->offset ? -1 : a->offset > b->offset;
451 }
452
453 /* Creates and returns a new casereader.  This function is
454    intended for use by casereader implementations, not by
455    casereader clients.
456
457    This function is most suited for creating a casereader for a
458    data source that supports random access.
459    casereader_create_sequential is more appropriate for a data
460    source that is naturally sequential.
461
462    PROTO must be the prototype for the cases that may be read
463    from the casereader.  The caller retains its reference to
464    PROTO.
465
466    CASE_CNT is an upper limit on the number of cases that
467    casereader_read will return from the casereader in successive
468    calls.  Ordinarily, this is the actual number of cases in the
469    data source or CASENUMBER_MAX if the number of cases cannot be
470    predicted in advance.
471
472    CLASS and AUX are a set of casereader implementation-specific
473    member functions and auxiliary data to pass to those member
474    functions, respectively. */
475 struct casereader *
476 casereader_create_random (const struct caseproto *proto, casenumber n_cases,
477                           const struct casereader_random_class *class,
478                           void *aux)
479 {
480   struct random_reader_shared *shared = xmalloc (sizeof *shared);
481   shared->readers = heap_create (compare_random_readers_by_offset, NULL);
482   shared->class = class;
483   shared->aux = aux;
484   shared->min_offset = 0;
485   return casereader_create_sequential (NULL, proto, n_cases,
486                                        &random_reader_casereader_class,
487                                        make_random_reader (shared, 0));
488 }
489
490 /* Reassesses the min_offset in SHARED based on the minimum
491    offset in the heap.   */
492 static void
493 advance_random_reader (struct casereader *reader,
494                        struct random_reader_shared *shared)
495 {
496   casenumber old, new;
497
498   old = shared->min_offset;
499   new = random_reader_from_heap_node (heap_minimum (shared->readers))->offset;
500   assert (new >= old);
501   if (new > old)
502     {
503       shared->min_offset = new;
504       if (shared->class->advance)
505         shared->class->advance (reader, shared->aux, new - old);
506     }
507 }
508
509 /* struct casereader_class "read" function for random reader. */
510 static struct ccase *
511 random_reader_read (struct casereader *reader, void *br_)
512 {
513   struct random_reader *br = br_;
514   struct random_reader_shared *shared = br->shared;
515   struct ccase *c = shared->class->read (reader, shared->aux,
516                                          br->offset - shared->min_offset);
517   if (c != NULL)
518     {
519       br->offset++;
520       heap_changed (shared->readers, &br->heap_node);
521       advance_random_reader (reader, shared);
522     }
523   return c;
524 }
525
526 /* struct casereader_class "destroy" function for random
527    reader. */
528 static void
529 random_reader_destroy (struct casereader *reader, void *br_)
530 {
531   struct random_reader *br = br_;
532   struct random_reader_shared *shared = br->shared;
533
534   heap_delete (shared->readers, &br->heap_node);
535   if (heap_is_empty (shared->readers))
536     {
537       heap_destroy (shared->readers);
538       shared->class->destroy (reader, shared->aux);
539       free (shared);
540     }
541   else
542     advance_random_reader (reader, shared);
543
544   free (br);
545 }
546
547 /* struct casereader_class "clone" function for random reader. */
548 static struct casereader *
549 random_reader_clone (struct casereader *reader, void *br_)
550 {
551   struct random_reader *br = br_;
552   struct random_reader_shared *shared = br->shared;
553   return casereader_create_sequential (casereader_get_taint (reader),
554                                        reader->proto,
555                                        casereader_get_n_cases (reader),
556                                        &random_reader_casereader_class,
557                                        make_random_reader (shared,
558                                                            br->offset));
559 }
560
561 /* struct casereader_class "peek" function for random reader. */
562 static struct ccase *
563 random_reader_peek (struct casereader *reader, void *br_, casenumber idx)
564 {
565   struct random_reader *br = br_;
566   struct random_reader_shared *shared = br->shared;
567
568   return shared->class->read (reader, shared->aux,
569                               br->offset - shared->min_offset + idx);
570 }
571
572 /* Casereader class for random reader. */
573 static const struct casereader_class random_reader_casereader_class =
574   {
575     random_reader_read,
576     random_reader_destroy,
577     random_reader_clone,
578     random_reader_peek,
579   };
580 \f
581 \f
582 static const struct casereader_class casereader_null_class;
583
584 /* Returns a casereader with no cases.  The casereader has the prototype
585    specified by PROTO.  PROTO may be specified as a null pointer, in which case
586    the casereader has no variables. */
587 struct casereader *
588 casereader_create_empty (const struct caseproto *proto_)
589 {
590   struct casereader *reader;
591   struct caseproto *proto;
592
593   proto = proto_ != NULL ? caseproto_ref (proto_) : caseproto_create ();
594   reader = casereader_create_sequential (NULL, proto, 0,
595                                          &casereader_null_class, NULL);
596   caseproto_unref (proto);
597
598   return reader;
599 }
600
601 static struct ccase *
602 casereader_null_read (struct casereader *reader UNUSED, void *aux UNUSED)
603 {
604   return NULL;
605 }
606
607 static void
608 casereader_null_destroy (struct casereader *reader UNUSED, void *aux UNUSED)
609 {
610   /* Nothing to do. */
611 }
612
613 static const struct casereader_class casereader_null_class =
614   {
615     casereader_null_read,
616     casereader_null_destroy,
617     NULL,                       /* clone */
618     NULL,                       /* peek */
619   };