casereader: Factor buffering shim out into separate source file.
[pspp] / src / data / casereader.c
1 /* PSPP - a program for statistical analysis.
2    Copyright (C) 2007, 2009, 2010 Free Software Foundation, Inc.
3
4    This program is free software: you can redistribute it and/or modify
5    it under the terms of the GNU General Public License as published by
6    the Free Software Foundation, either version 3 of the License, or
7    (at your option) any later version.
8
9    This program is distributed in the hope that it will be useful,
10    but WITHOUT ANY WARRANTY; without even the implied warranty of
11    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12    GNU General Public License for more details.
13
14    You should have received a copy of the GNU General Public License
15    along with this program.  If not, see <http://www.gnu.org/licenses/>. */
16
17 #include <config.h>
18
19 #include "data/casereader.h"
20 #include "data/casereader-provider.h"
21
22 #include <stdlib.h>
23
24 #include "data/casereader-shim.h"
25 #include "data/casewriter.h"
26 #include "libpspp/assertion.h"
27 #include "libpspp/heap.h"
28 #include "libpspp/taint.h"
29
30 #include "gl/xalloc.h"
31
32 /* A casereader. */
33 struct casereader
34   {
35     struct taint *taint;                  /* Corrupted? */
36     struct caseproto *proto;              /* Format of contained cases. */
37     casenumber case_cnt;                  /* Number of cases,
38                                              CASENUMBER_MAX if unknown. */
39     const struct casereader_class *class; /* Class. */
40     void *aux;                            /* Auxiliary data for class. */
41   };
42
43 /* Reads and returns the next case from READER.  The caller owns
44    the returned case and must call case_unref on it when its data
45    is no longer needed.  Returns a null pointer if cases have
46    been exhausted or upon detection of an I/O error.
47
48    The case returned is effectively consumed: it can never be
49    read again through READER.  If this is inconvenient, READER
50    may be cloned in advance with casereader_clone, or
51    casereader_peek may be used instead. */
52 struct ccase *
53 casereader_read (struct casereader *reader)
54 {
55   if (reader->case_cnt != 0)
56     {
57       /* ->read may use casereader_swap to replace itself by
58          another reader and then delegate to that reader by
59          recursively calling casereader_read.  Currently only
60          lazy_casereader does this and, with luck, nothing else
61          ever will.
62
63          To allow this to work, however, we must decrement
64          case_cnt before calling ->read.  If we decremented
65          case_cnt after calling ->read, then this would actually
66          drop two cases from case_cnt instead of one, and we'd
67          lose the last case in the casereader. */
68       struct ccase *c;
69       if (reader->case_cnt != CASENUMBER_MAX)
70         reader->case_cnt--;
71       c = reader->class->read (reader, reader->aux);
72       if (c != NULL)
73         {
74           size_t n_widths UNUSED = caseproto_get_n_widths (reader->proto);
75           assert (case_get_value_cnt (c) >= n_widths);
76           expensive_assert (caseproto_equal (case_get_proto (c), 0,
77                                              reader->proto, 0, n_widths));
78           return c;
79         }
80     }
81   reader->case_cnt = 0;
82   return NULL;
83 }
84
85 /* Destroys READER.
86    Returns false if an I/O error was detected on READER, true
87    otherwise. */
88 bool
89 casereader_destroy (struct casereader *reader)
90 {
91   bool ok = true;
92   if (reader != NULL)
93     {
94       reader->class->destroy (reader, reader->aux);
95       ok = taint_destroy (reader->taint);
96       caseproto_unref (reader->proto);
97       free (reader);
98     }
99   return ok;
100 }
101
102 /* Returns a clone of READER.  READER and its clone may be used
103    to read the same sequence of cases in the same order, barring
104    I/O errors. */
105 struct casereader *
106 casereader_clone (const struct casereader *reader_)
107 {
108   struct casereader *reader = CONST_CAST (struct casereader *, reader_);
109   struct casereader *clone;
110   if ( reader == NULL ) 
111     return NULL;
112
113   if (reader->class->clone == NULL)
114     casereader_shim_insert (reader);
115   clone = reader->class->clone (reader, reader->aux);
116   assert (clone != NULL);
117   assert (clone != reader);
118   return clone;
119 }
120
121 /* Makes a copy of ORIGINAL into *NEW1 (if NEW1 is non-null) and
122    *NEW2 (if NEW2 is non-null), then destroys ORIGINAL. */
123 void
124 casereader_split (struct casereader *original,
125                   struct casereader **new1, struct casereader **new2)
126 {
127   if (new1 != NULL && new2 != NULL)
128     {
129       *new1 = casereader_rename (original);
130       *new2 = casereader_clone (*new1);
131     }
132   else if (new1 != NULL)
133     *new1 = casereader_rename (original);
134   else if (new2 != NULL)
135     *new2 = casereader_rename (original);
136   else
137     casereader_destroy (original);
138 }
139
140 /* Returns a copy of READER, which is itself destroyed.
141    Useful for taking over ownership of a casereader, to enforce
142    preventing the original owner from accessing the casereader
143    again. */
144 struct casereader *
145 casereader_rename (struct casereader *reader)
146 {
147   struct casereader *new = xmemdup (reader, sizeof *reader);
148   free (reader);
149   return new;
150 }
151
152 /* Exchanges the casereaders referred to by A and B. */
153 void
154 casereader_swap (struct casereader *a, struct casereader *b)
155 {
156   if (a != b)
157     {
158       struct casereader tmp = *a;
159       *a = *b;
160       *b = tmp;
161     }
162 }
163
164 /* Reads and returns the (IDX + 1)'th case from READER.  The
165    caller owns the returned case and must call case_unref on it
166    when it is no longer needed.  Returns a null pointer if cases
167    have been exhausted or upon detection of an I/O error. */
168 struct ccase *
169 casereader_peek (struct casereader *reader, casenumber idx)
170 {
171   if (idx < reader->case_cnt)
172     {
173       struct ccase *c;
174       if (reader->class->peek == NULL)
175         casereader_shim_insert (reader);
176       c = reader->class->peek (reader, reader->aux, idx);
177       if (c != NULL)
178         return c;
179       else if (casereader_error (reader))
180         reader->case_cnt = 0;
181     }
182   if (reader->case_cnt > idx)
183     reader->case_cnt = idx;
184   return NULL;
185 }
186
187 /* Returns true if no cases remain to be read from READER, or if
188    an error has occurred on READER.  (A return value of false
189    does *not* mean that the next call to casereader_peek or
190    casereader_read will return true, because an error can occur
191    in the meantime.) */
192 bool
193 casereader_is_empty (struct casereader *reader)
194 {
195   if (reader->case_cnt == 0)
196     return true;
197   else
198     {
199       struct ccase *c = casereader_peek (reader, 0);
200       if (c == NULL)
201         return true;
202       else
203         {
204           case_unref (c);
205           return false;
206         }
207     }
208 }
209
210 /* Returns true if an I/O error or another hard error has
211    occurred on READER, a clone of READER, or on some object on
212    which READER's data has a dependency, false otherwise. */
213 bool
214 casereader_error (const struct casereader *reader)
215 {
216   return taint_is_tainted (reader->taint);
217 }
218
219 /* Marks READER as having encountered an error.
220
221    Ordinarily, this function should be called by the
222    implementation of a casereader, not by the casereader's
223    client.  Instead, casereader clients should usually ensure
224    that a casereader's error state is correct by using
225    taint_propagate to propagate to the casereader's taint
226    structure, which may be obtained via casereader_get_taint. */
227 void
228 casereader_force_error (struct casereader *reader)
229 {
230   taint_set_taint (reader->taint);
231 }
232
233 /* Returns READER's associate taint object, for use with
234    taint_propagate and other taint functions. */
235 const struct taint *
236 casereader_get_taint (const struct casereader *reader)
237 {
238   return reader->taint;
239 }
240
241 /* Returns the number of cases that will be read by successive
242    calls to casereader_read for READER, assuming that no errors
243    occur.  Upon an error condition, the case count drops to 0, so
244    that no more cases can be obtained.
245
246    Not all casereaders can predict the number of cases that they
247    will produce without actually reading all of them.  In that
248    case, this function returns CASENUMBER_MAX.  To obtain the
249    actual number of cases in such a casereader, use
250    casereader_count_cases. */
251 casenumber
252 casereader_get_case_cnt (struct casereader *reader)
253 {
254   return reader->case_cnt;
255 }
256
257 static casenumber
258 casereader_count_cases__ (const struct casereader *reader,
259                           casenumber max_cases)
260 {
261   struct casereader *clone;
262   casenumber n_cases;
263
264   clone = casereader_clone (reader);
265   n_cases = casereader_advance (clone, max_cases);
266   casereader_destroy (clone);
267
268   return n_cases;
269 }
270
271 /* Returns the number of cases that will be read by successive
272    calls to casereader_read for READER, assuming that no errors
273    occur.  Upon an error condition, the case count drops to 0, so
274    that no more cases can be obtained.
275
276    For a casereader that cannot predict the number of cases it
277    will produce, this function actually reads (and discards) all
278    of the contents of a clone of READER.  Thus, the return value
279    is always correct in the absence of I/O errors. */
280 casenumber
281 casereader_count_cases (const struct casereader *reader)
282 {
283   if (reader->case_cnt == CASENUMBER_MAX)
284     {
285       struct casereader *reader_rw = CONST_CAST (struct casereader *, reader);
286       reader_rw->case_cnt = casereader_count_cases__ (reader, CASENUMBER_MAX);
287     }
288   return reader->case_cnt;
289 }
290
291 /* Truncates READER to at most N cases. */
292 void
293 casereader_truncate (struct casereader *reader, casenumber n)
294 {
295   /* This could be optimized, if it ever becomes too expensive, by adding a
296      "max_cases" member to struct casereader.  We could also add a "truncate"
297      function to the casereader implementation, to allow the casereader to
298      throw away data that cannot ever be read. */
299   if (reader->case_cnt == CASENUMBER_MAX)
300     reader->case_cnt = casereader_count_cases__ (reader, n);
301   if (reader->case_cnt > n)
302     reader->case_cnt = n;
303 }
304
305 /* Returns the prototype for the cases in READER.  The caller
306    must not unref the returned prototype. */
307 const struct caseproto *
308 casereader_get_proto (const struct casereader *reader)
309 {
310   return reader->proto;
311 }
312
313 /* Skips past N cases in READER, stopping when the last case in
314    READER has been read or on an input error.  Returns the number
315    of cases successfully skipped. */
316 casenumber
317 casereader_advance (struct casereader *reader, casenumber n)
318 {
319   casenumber i;
320
321   for (i = 0; i < n; i++)
322     {
323       struct ccase *c = casereader_read (reader);
324       if (c == NULL)
325         break;
326       case_unref (c);
327     }
328
329   return i;
330 }
331
332
333 /* Copies all the cases in READER to WRITER, propagating errors
334    appropriately. */
335 void
336 casereader_transfer (struct casereader *reader, struct casewriter *writer)
337 {
338   struct ccase *c;
339
340   taint_propagate (casereader_get_taint (reader),
341                    casewriter_get_taint (writer));
342   while ((c = casereader_read (reader)) != NULL)
343     casewriter_write (writer, c);
344   casereader_destroy (reader);
345 }
346
347 /* Creates and returns a new casereader.  This function is
348    intended for use by casereader implementations, not by
349    casereader clients.
350
351    This function is most suited for creating a casereader for a
352    data source that is naturally sequential.
353    casereader_create_random may be more appropriate for a data
354    source that supports random access.
355
356    Ordinarily, specify a null pointer for TAINT, in which case
357    the new casereader will have a new, unique taint object.  If
358    the new casereader should have a clone of an existing taint
359    object, specify that object as TAINT.  (This is most commonly
360    useful in an implementation of the "clone" casereader_class
361    function, in which case the cloned casereader should have the
362    same taint object as the original casereader.)
363
364    PROTO must be the prototype for the cases that may be read
365    from the casereader.  The caller retains its reference to
366    PROTO.
367
368    CASE_CNT is an upper limit on the number of cases that
369    casereader_read will return from the casereader in successive
370    calls.  Ordinarily, this is the actual number of cases in the
371    data source or CASENUMBER_MAX if the number of cases cannot be
372    predicted in advance.
373
374    CLASS and AUX are a set of casereader implementation-specific
375    member functions and auxiliary data to pass to those member
376    functions, respectively. */
377 struct casereader *
378 casereader_create_sequential (const struct taint *taint,
379                               const struct caseproto *proto,
380                               casenumber case_cnt,
381                               const struct casereader_class *class, void *aux)
382 {
383   struct casereader *reader = xmalloc (sizeof *reader);
384   reader->taint = taint != NULL ? taint_clone (taint) : taint_create ();
385   reader->proto = caseproto_ref (proto);
386   reader->case_cnt = case_cnt;
387   reader->class = class;
388   reader->aux = aux;
389   return reader;
390 }
391
392 /* If READER is a casereader of the given CLASS, returns its
393    associated auxiliary data; otherwise, returns a null pointer.
394
395    This function is intended for use from casereader
396    implementations, not by casereader users.  Even within
397    casereader implementations, its usefulness is quite limited,
398    for at least two reasons.  First, every casereader member
399    function already receives a pointer to the casereader's
400    auxiliary data.  Second, a casereader's class can change
401    (through a call to casereader_swap) and this is in practice
402    quite common (e.g. any call to casereader_clone on a
403    casereader that does not directly support clone will cause the
404    casereader to be replaced by a shim caseader). */
405 void *
406 casereader_dynamic_cast (struct casereader *reader,
407                          const struct casereader_class *class)
408 {
409   return reader->class == class ? reader->aux : NULL;
410 }
411 \f
412 /* Random-access casereader implementation.
413
414    This is a set of wrappers around casereader_create_sequential
415    and struct casereader_class to make it easy to create
416    efficient casereaders for data sources that natively support
417    random access. */
418
419 /* One clone of a random reader. */
420 struct random_reader
421   {
422     struct random_reader_shared *shared; /* Data shared among clones. */
423     struct heap_node heap_node; /* Node in shared data's heap of readers. */
424     casenumber offset;          /* Number of cases already read. */
425   };
426
427 /* Returns the random_reader in which the given heap_node is
428    embedded. */
429 static struct random_reader *
430 random_reader_from_heap_node (const struct heap_node *node)
431 {
432   return heap_data (node, struct random_reader, heap_node);
433 }
434
435 /* Data shared among clones of a random reader. */
436 struct random_reader_shared
437   {
438     struct heap *readers;       /* Heap of struct random_readers. */
439     casenumber min_offset;      /* Smallest offset of any random_reader. */
440     const struct casereader_random_class *class;
441     void *aux;
442   };
443
444 static const struct casereader_class random_reader_casereader_class;
445
446 /* Creates and returns a new random_reader with the given SHARED
447    data and OFFSET.  Inserts the new random reader into the
448    shared heap. */
449 static struct random_reader *
450 make_random_reader (struct random_reader_shared *shared, casenumber offset)
451 {
452   struct random_reader *br = xmalloc (sizeof *br);
453   br->offset = offset;
454   br->shared = shared;
455   heap_insert (shared->readers, &br->heap_node);
456   return br;
457 }
458
459 /* Compares random_readers A and B by offset and returns a
460    strcmp()-like result. */
461 static int
462 compare_random_readers_by_offset (const struct heap_node *a_,
463                                   const struct heap_node *b_,
464                                   const void *aux UNUSED)
465 {
466   const struct random_reader *a = random_reader_from_heap_node (a_);
467   const struct random_reader *b = random_reader_from_heap_node (b_);
468   return a->offset < b->offset ? -1 : a->offset > b->offset;
469 }
470
471 /* Creates and returns a new casereader.  This function is
472    intended for use by casereader implementations, not by
473    casereader clients.
474
475    This function is most suited for creating a casereader for a
476    data source that supports random access.
477    casereader_create_sequential is more appropriate for a data
478    source that is naturally sequential.
479
480    PROTO must be the prototype for the cases that may be read
481    from the casereader.  The caller retains its reference to
482    PROTO.
483
484    CASE_CNT is an upper limit on the number of cases that
485    casereader_read will return from the casereader in successive
486    calls.  Ordinarily, this is the actual number of cases in the
487    data source or CASENUMBER_MAX if the number of cases cannot be
488    predicted in advance.
489
490    CLASS and AUX are a set of casereader implementation-specific
491    member functions and auxiliary data to pass to those member
492    functions, respectively. */
493 struct casereader *
494 casereader_create_random (const struct caseproto *proto, casenumber case_cnt,
495                           const struct casereader_random_class *class,
496                           void *aux)
497 {
498   struct random_reader_shared *shared = xmalloc (sizeof *shared);
499   shared->readers = heap_create (compare_random_readers_by_offset, NULL);
500   shared->class = class;
501   shared->aux = aux;
502   shared->min_offset = 0;
503   return casereader_create_sequential (NULL, proto, case_cnt,
504                                        &random_reader_casereader_class,
505                                        make_random_reader (shared, 0));
506 }
507
508 /* Reassesses the min_offset in SHARED based on the minimum
509    offset in the heap.   */
510 static void
511 advance_random_reader (struct casereader *reader,
512                        struct random_reader_shared *shared)
513 {
514   casenumber old, new;
515
516   old = shared->min_offset;
517   new = random_reader_from_heap_node (heap_minimum (shared->readers))->offset;
518   assert (new >= old);
519   if (new > old)
520     {
521       shared->min_offset = new;
522       shared->class->advance (reader, shared->aux, new - old);
523     }
524 }
525
526 /* struct casereader_class "read" function for random reader. */
527 static struct ccase *
528 random_reader_read (struct casereader *reader, void *br_)
529 {
530   struct random_reader *br = br_;
531   struct random_reader_shared *shared = br->shared;
532   struct ccase *c = shared->class->read (reader, shared->aux,
533                                          br->offset - shared->min_offset);
534   if (c != NULL)
535     {
536       br->offset++;
537       heap_changed (shared->readers, &br->heap_node);
538       advance_random_reader (reader, shared);
539     }
540   return c;
541 }
542
543 /* struct casereader_class "destroy" function for random
544    reader. */
545 static void
546 random_reader_destroy (struct casereader *reader, void *br_)
547 {
548   struct random_reader *br = br_;
549   struct random_reader_shared *shared = br->shared;
550
551   heap_delete (shared->readers, &br->heap_node);
552   if (heap_is_empty (shared->readers))
553     {
554       heap_destroy (shared->readers);
555       shared->class->destroy (reader, shared->aux);
556       free (shared);
557     }
558   else
559     advance_random_reader (reader, shared);
560
561   free (br);
562 }
563
564 /* struct casereader_class "clone" function for random reader. */
565 static struct casereader *
566 random_reader_clone (struct casereader *reader, void *br_)
567 {
568   struct random_reader *br = br_;
569   struct random_reader_shared *shared = br->shared;
570   return casereader_create_sequential (casereader_get_taint (reader),
571                                        reader->proto,
572                                        casereader_get_case_cnt (reader),
573                                        &random_reader_casereader_class,
574                                        make_random_reader (shared,
575                                                            br->offset));
576 }
577
578 /* struct casereader_class "peek" function for random reader. */
579 static struct ccase *
580 random_reader_peek (struct casereader *reader, void *br_, casenumber idx)
581 {
582   struct random_reader *br = br_;
583   struct random_reader_shared *shared = br->shared;
584
585   return shared->class->read (reader, shared->aux,
586                               br->offset - shared->min_offset + idx);
587 }
588
589 /* Casereader class for random reader. */
590 static const struct casereader_class random_reader_casereader_class =
591   {
592     random_reader_read,
593     random_reader_destroy,
594     random_reader_clone,
595     random_reader_peek,
596   };
597 \f
598 \f
599 static const struct casereader_class casereader_null_class;
600
601 /* Returns a casereader with no cases.  The casereader has the prototype
602    specified by PROTO.  PROTO may be specified as a null pointer, in which case
603    the casereader has no variables. */
604 struct casereader *
605 casereader_create_empty (const struct caseproto *proto_)
606 {
607   struct casereader *reader;
608   struct caseproto *proto;
609
610   proto = proto_ != NULL ? caseproto_ref (proto_) : caseproto_create ();
611   reader = casereader_create_sequential (NULL, proto, 0,
612                                          &casereader_null_class, NULL);
613   caseproto_unref (proto);
614
615   return reader;
616 }
617
618 static struct ccase *
619 casereader_null_read (struct casereader *reader UNUSED, void *aux UNUSED)
620 {
621   return NULL;
622 }
623
624 static void
625 casereader_null_destroy (struct casereader *reader UNUSED, void *aux UNUSED)
626 {
627   /* Nothing to do. */
628 }
629
630 static const struct casereader_class casereader_null_class =
631   {
632     casereader_null_read,
633     casereader_null_destroy,
634     NULL,                       /* clone */
635     NULL,                       /* peek */
636   };