casereader: New function casereader_advance().
[pspp-builds.git] / src / data / casereader.c
1 /* PSPP - a program for statistical analysis.
2    Copyright (C) 2007, 2009 Free Software Foundation, Inc.
3
4    This program is free software: you can redistribute it and/or modify
5    it under the terms of the GNU General Public License as published by
6    the Free Software Foundation, either version 3 of the License, or
7    (at your option) any later version.
8
9    This program is distributed in the hope that it will be useful,
10    but WITHOUT ANY WARRANTY; without even the implied warranty of
11    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12    GNU General Public License for more details.
13
14    You should have received a copy of the GNU General Public License
15    along with this program.  If not, see <http://www.gnu.org/licenses/>. */
16
17 #include <config.h>
18
19 #include <data/casereader.h>
20 #include <data/casereader-provider.h>
21
22 #include <stdlib.h>
23
24 #include <data/casewindow.h>
25 #include <data/casewriter.h>
26 #include <data/settings.h>
27 #include <libpspp/assertion.h>
28 #include <libpspp/heap.h>
29 #include <libpspp/taint.h>
30
31 #include "xalloc.h"
32
33 /* A casereader. */
34 struct casereader
35   {
36     struct taint *taint;                  /* Corrupted? */
37     struct caseproto *proto;              /* Format of contained cases. */
38     casenumber case_cnt;                  /* Number of cases,
39                                              CASENUMBER_MAX if unknown. */
40     const struct casereader_class *class; /* Class. */
41     void *aux;                            /* Auxiliary data for class. */
42   };
43
44 static void insert_shim (struct casereader *);
45
46 /* Reads and returns the next case from READER.  The caller owns
47    the returned case and must call case_unref on it when its data
48    is no longer needed.  Returns a null pointer if cases have
49    been exhausted or upon detection of an I/O error.
50
51    The case returned is effectively consumed: it can never be
52    read again through READER.  If this is inconvenient, READER
53    may be cloned in advance with casereader_clone, or
54    casereader_peek may be used instead. */
55 struct ccase *
56 casereader_read (struct casereader *reader)
57 {
58   if (reader->case_cnt != 0)
59     {
60       /* ->read may use casereader_swap to replace itself by
61          another reader and then delegate to that reader by
62          recursively calling casereader_read.  Currently only
63          lazy_casereader does this and, with luck, nothing else
64          ever will.
65
66          To allow this to work, however, we must decrement
67          case_cnt before calling ->read.  If we decremented
68          case_cnt after calling ->read, then this would actually
69          drop two cases from case_cnt instead of one, and we'd
70          lose the last case in the casereader. */
71       struct ccase *c;
72       if (reader->case_cnt != CASENUMBER_MAX)
73         reader->case_cnt--;
74       c = reader->class->read (reader, reader->aux);
75       if (c != NULL)
76         {
77           size_t n_widths UNUSED = caseproto_get_n_widths (reader->proto);
78           assert (case_get_value_cnt (c) >= n_widths);
79           expensive_assert (caseproto_equal (case_get_proto (c), 0,
80                                              reader->proto, 0, n_widths));
81           return c;
82         }
83     }
84   reader->case_cnt = 0;
85   return NULL;
86 }
87
88 /* Destroys READER.
89    Returns false if an I/O error was detected on READER, true
90    otherwise. */
91 bool
92 casereader_destroy (struct casereader *reader)
93 {
94   bool ok = true;
95   if (reader != NULL)
96     {
97       reader->class->destroy (reader, reader->aux);
98       ok = taint_destroy (reader->taint);
99       caseproto_unref (reader->proto);
100       free (reader);
101     }
102   return ok;
103 }
104
105 /* Returns a clone of READER.  READER and its clone may be used
106    to read the same sequence of cases in the same order, barring
107    I/O errors. */
108 struct casereader *
109 casereader_clone (const struct casereader *reader_)
110 {
111   struct casereader *reader = CONST_CAST (struct casereader *, reader_);
112   struct casereader *clone;
113   if ( reader == NULL ) 
114     return NULL;
115
116   if (reader->class->clone == NULL)
117     insert_shim (reader);
118   clone = reader->class->clone (reader, reader->aux);
119   assert (clone != NULL);
120   assert (clone != reader);
121   return clone;
122 }
123
124 /* Makes a copy of ORIGINAL into *NEW1 (if NEW1 is non-null) and
125    *NEW2 (if NEW2 is non-null), then destroys ORIGINAL. */
126 void
127 casereader_split (struct casereader *original,
128                   struct casereader **new1, struct casereader **new2)
129 {
130   if (new1 != NULL && new2 != NULL)
131     {
132       *new1 = casereader_rename (original);
133       *new2 = casereader_clone (*new1);
134     }
135   else if (new1 != NULL)
136     *new1 = casereader_rename (original);
137   else if (new2 != NULL)
138     *new2 = casereader_rename (original);
139   else
140     casereader_destroy (original);
141 }
142
143 /* Returns a copy of READER, which is itself destroyed.
144    Useful for taking over ownership of a casereader, to enforce
145    preventing the original owner from accessing the casereader
146    again. */
147 struct casereader *
148 casereader_rename (struct casereader *reader)
149 {
150   struct casereader *new = xmemdup (reader, sizeof *reader);
151   free (reader);
152   return new;
153 }
154
155 /* Exchanges the casereaders referred to by A and B. */
156 void
157 casereader_swap (struct casereader *a, struct casereader *b)
158 {
159   if (a != b)
160     {
161       struct casereader tmp = *a;
162       *a = *b;
163       *b = tmp;
164     }
165 }
166
167 /* Reads and returns the (IDX + 1)'th case from READER.  The
168    caller owns the returned case and must call case_unref on it
169    when it is no longer needed.  Returns a null pointer if cases
170    have been exhausted or upon detection of an I/O error. */
171 struct ccase *
172 casereader_peek (struct casereader *reader, casenumber idx)
173 {
174   if (idx < reader->case_cnt)
175     {
176       struct ccase *c;
177       if (reader->class->peek == NULL)
178         insert_shim (reader);
179       c = reader->class->peek (reader, reader->aux, idx);
180       if (c != NULL)
181         return c;
182       else if (casereader_error (reader))
183         reader->case_cnt = 0;
184     }
185   if (reader->case_cnt > idx)
186     reader->case_cnt = idx;
187   return NULL;
188 }
189
190 /* Returns true if no cases remain to be read from READER, or if
191    an error has occurred on READER.  (A return value of false
192    does *not* mean that the next call to casereader_peek or
193    casereader_read will return true, because an error can occur
194    in the meantime.) */
195 bool
196 casereader_is_empty (struct casereader *reader)
197 {
198   if (reader->case_cnt == 0)
199     return true;
200   else
201     {
202       struct ccase *c = casereader_peek (reader, 0);
203       if (c == NULL)
204         return true;
205       else
206         {
207           case_unref (c);
208           return false;
209         }
210     }
211 }
212
213 /* Returns true if an I/O error or another hard error has
214    occurred on READER, a clone of READER, or on some object on
215    which READER's data has a dependency, false otherwise. */
216 bool
217 casereader_error (const struct casereader *reader)
218 {
219   return taint_is_tainted (reader->taint);
220 }
221
222 /* Marks READER as having encountered an error.
223
224    Ordinarily, this function should be called by the
225    implementation of a casereader, not by the casereader's
226    client.  Instead, casereader clients should usually ensure
227    that a casereader's error state is correct by using
228    taint_propagate to propagate to the casereader's taint
229    structure, which may be obtained via casereader_get_taint. */
230 void
231 casereader_force_error (struct casereader *reader)
232 {
233   taint_set_taint (reader->taint);
234 }
235
236 /* Returns READER's associate taint object, for use with
237    taint_propagate and other taint functions. */
238 const struct taint *
239 casereader_get_taint (const struct casereader *reader)
240 {
241   return reader->taint;
242 }
243
244 /* Returns the number of cases that will be read by successive
245    calls to casereader_read for READER, assuming that no errors
246    occur.  Upon an error condition, the case count drops to 0, so
247    that no more cases can be obtained.
248
249    Not all casereaders can predict the number of cases that they
250    will produce without actually reading all of them.  In that
251    case, this function returns CASENUMBER_MAX.  To obtain the
252    actual number of cases in such a casereader, use
253    casereader_count_cases. */
254 casenumber
255 casereader_get_case_cnt (struct casereader *reader)
256 {
257   return reader->case_cnt;
258 }
259
260 static casenumber
261 casereader_count_cases__ (struct casereader *reader, casenumber max_cases)
262 {
263   struct casereader *clone;
264   casenumber n_cases;
265
266   clone = casereader_clone (reader);
267   n_cases = casereader_advance (clone, max_cases);
268   casereader_destroy (clone);
269
270   return n_cases;
271 }
272
273 /* Returns the number of cases that will be read by successive
274    calls to casereader_read for READER, assuming that no errors
275    occur.  Upon an error condition, the case count drops to 0, so
276    that no more cases can be obtained.
277
278    For a casereader that cannot predict the number of cases it
279    will produce, this function actually reads (and discards) all
280    of the contents of a clone of READER.  Thus, the return value
281    is always correct in the absence of I/O errors. */
282 casenumber
283 casereader_count_cases (struct casereader *reader)
284 {
285   if (reader->case_cnt == CASENUMBER_MAX)
286     reader->case_cnt = casereader_count_cases__ (reader, CASENUMBER_MAX);
287   return reader->case_cnt;
288 }
289
290 /* Truncates READER to at most N cases. */
291 void
292 casereader_truncate (struct casereader *reader, casenumber n)
293 {
294   /* This could be optimized, if it ever becomes too expensive, by adding a
295      "max_cases" member to struct casereader.  We could also add a "truncate"
296      function to the casereader implementation, to allow the casereader to
297      throw away data that cannot ever be read. */
298   if (reader->case_cnt == CASENUMBER_MAX)
299     reader->case_cnt = casereader_count_cases__ (reader, n);
300   if (reader->case_cnt > n)
301     reader->case_cnt = n;
302 }
303
304 /* Returns the prototype for the cases in READER.  The caller
305    must not unref the returned prototype. */
306 const struct caseproto *
307 casereader_get_proto (const struct casereader *reader)
308 {
309   return reader->proto;
310 }
311
312 /* Skips past N cases in READER, stopping when the last case in
313    READER has been read or on an input error.  Returns the number
314    of cases successfully skipped. */
315 casenumber
316 casereader_advance (struct casereader *reader, casenumber n)
317 {
318   casenumber i;
319
320   for (i = 0; i < n; i++)
321     {
322       struct ccase *c = casereader_read (reader);
323       if (c == NULL)
324         break;
325       case_unref (c);
326     }
327
328   return i;
329 }
330
331
332 /* Copies all the cases in READER to WRITER, propagating errors
333    appropriately. */
334 void
335 casereader_transfer (struct casereader *reader, struct casewriter *writer)
336 {
337   struct ccase *c;
338
339   taint_propagate (casereader_get_taint (reader),
340                    casewriter_get_taint (writer));
341   while ((c = casereader_read (reader)) != NULL)
342     casewriter_write (writer, c);
343   casereader_destroy (reader);
344 }
345
346 /* Creates and returns a new casereader.  This function is
347    intended for use by casereader implementations, not by
348    casereader clients.
349
350    This function is most suited for creating a casereader for a
351    data source that is naturally sequential.
352    casereader_create_random may be more appropriate for a data
353    source that supports random access.
354
355    Ordinarily, specify a null pointer for TAINT, in which case
356    the new casereader will have a new, unique taint object.  If
357    the new casereader should have a clone of an existing taint
358    object, specify that object as TAINT.  (This is most commonly
359    useful in an implementation of the "clone" casereader_class
360    function, in which case the cloned casereader should have the
361    same taint object as the original casereader.)
362
363    PROTO must be the prototype for the cases that may be read
364    from the casereader.  The caller retains its reference to
365    PROTO.
366
367    CASE_CNT is an upper limit on the number of cases that
368    casereader_read will return from the casereader in successive
369    calls.  Ordinarily, this is the actual number of cases in the
370    data source or CASENUMBER_MAX if the number of cases cannot be
371    predicted in advance.
372
373    CLASS and AUX are a set of casereader implementation-specific
374    member functions and auxiliary data to pass to those member
375    functions, respectively. */
376 struct casereader *
377 casereader_create_sequential (const struct taint *taint,
378                               const struct caseproto *proto,
379                               casenumber case_cnt,
380                               const struct casereader_class *class, void *aux)
381 {
382   struct casereader *reader = xmalloc (sizeof *reader);
383   reader->taint = taint != NULL ? taint_clone (taint) : taint_create ();
384   reader->proto = caseproto_ref (proto);
385   reader->case_cnt = case_cnt;
386   reader->class = class;
387   reader->aux = aux;
388   return reader;
389 }
390
391 /* If READER is a casereader of the given CLASS, returns its
392    associated auxiliary data; otherwise, returns a null pointer.
393
394    This function is intended for use from casereader
395    implementations, not by casereader users.  Even within
396    casereader implementations, its usefulness is quite limited,
397    for at least two reasons.  First, every casereader member
398    function already receives a pointer to the casereader's
399    auxiliary data.  Second, a casereader's class can change
400    (through a call to casereader_swap) and this is in practice
401    quite common (e.g. any call to casereader_clone on a
402    casereader that does not directly support clone will cause the
403    casereader to be replaced by a shim caseader). */
404 void *
405 casereader_dynamic_cast (struct casereader *reader,
406                          const struct casereader_class *class)
407 {
408   return reader->class == class ? reader->aux : NULL;
409 }
410 \f
411 /* Random-access casereader implementation.
412
413    This is a set of wrappers around casereader_create_sequential
414    and struct casereader_class to make it easy to create
415    efficient casereaders for data sources that natively support
416    random access. */
417
418 /* One clone of a random reader. */
419 struct random_reader
420   {
421     struct random_reader_shared *shared; /* Data shared among clones. */
422     struct heap_node heap_node; /* Node in shared data's heap of readers. */
423     casenumber offset;          /* Number of cases already read. */
424   };
425
426 /* Returns the random_reader in which the given heap_node is
427    embedded. */
428 static struct random_reader *
429 random_reader_from_heap_node (const struct heap_node *node)
430 {
431   return heap_data (node, struct random_reader, heap_node);
432 }
433
434 /* Data shared among clones of a random reader. */
435 struct random_reader_shared
436   {
437     struct heap *readers;       /* Heap of struct random_readers. */
438     casenumber min_offset;      /* Smallest offset of any random_reader. */
439     const struct casereader_random_class *class;
440     void *aux;
441   };
442
443 static const struct casereader_class random_reader_casereader_class;
444
445 /* Creates and returns a new random_reader with the given SHARED
446    data and OFFSET.  Inserts the new random reader into the
447    shared heap. */
448 static struct random_reader *
449 make_random_reader (struct random_reader_shared *shared, casenumber offset)
450 {
451   struct random_reader *br = xmalloc (sizeof *br);
452   br->offset = offset;
453   br->shared = shared;
454   heap_insert (shared->readers, &br->heap_node);
455   return br;
456 }
457
458 /* Compares random_readers A and B by offset and returns a
459    strcmp()-like result. */
460 static int
461 compare_random_readers_by_offset (const struct heap_node *a_,
462                                   const struct heap_node *b_,
463                                   const void *aux UNUSED)
464 {
465   const struct random_reader *a = random_reader_from_heap_node (a_);
466   const struct random_reader *b = random_reader_from_heap_node (b_);
467   return a->offset < b->offset ? -1 : a->offset > b->offset;
468 }
469
470 /* Creates and returns a new casereader.  This function is
471    intended for use by casereader implementations, not by
472    casereader clients.
473
474    This function is most suited for creating a casereader for a
475    data source that supports random access.
476    casereader_create_sequential is more appropriate for a data
477    source that is naturally sequential.
478
479    PROTO must be the prototype for the cases that may be read
480    from the casereader.  The caller retains its reference to
481    PROTO.
482
483    CASE_CNT is an upper limit on the number of cases that
484    casereader_read will return from the casereader in successive
485    calls.  Ordinarily, this is the actual number of cases in the
486    data source or CASENUMBER_MAX if the number of cases cannot be
487    predicted in advance.
488
489    CLASS and AUX are a set of casereader implementation-specific
490    member functions and auxiliary data to pass to those member
491    functions, respectively. */
492 struct casereader *
493 casereader_create_random (const struct caseproto *proto, casenumber case_cnt,
494                           const struct casereader_random_class *class,
495                           void *aux)
496 {
497   struct random_reader_shared *shared = xmalloc (sizeof *shared);
498   shared->readers = heap_create (compare_random_readers_by_offset, NULL);
499   shared->class = class;
500   shared->aux = aux;
501   shared->min_offset = 0;
502   return casereader_create_sequential (NULL, proto, case_cnt,
503                                        &random_reader_casereader_class,
504                                        make_random_reader (shared, 0));
505 }
506
507 /* Reassesses the min_offset in SHARED based on the minimum
508    offset in the heap.   */
509 static void
510 advance_random_reader (struct casereader *reader,
511                        struct random_reader_shared *shared)
512 {
513   casenumber old, new;
514
515   old = shared->min_offset;
516   new = random_reader_from_heap_node (heap_minimum (shared->readers))->offset;
517   assert (new >= old);
518   if (new > old)
519     {
520       shared->min_offset = new;
521       shared->class->advance (reader, shared->aux, new - old);
522     }
523 }
524
525 /* struct casereader_class "read" function for random reader. */
526 static struct ccase *
527 random_reader_read (struct casereader *reader, void *br_)
528 {
529   struct random_reader *br = br_;
530   struct random_reader_shared *shared = br->shared;
531   struct ccase *c = shared->class->read (reader, shared->aux,
532                                          br->offset - shared->min_offset);
533   if (c != NULL)
534     {
535       br->offset++;
536       heap_changed (shared->readers, &br->heap_node);
537       advance_random_reader (reader, shared);
538     }
539   return c;
540 }
541
542 /* struct casereader_class "destroy" function for random
543    reader. */
544 static void
545 random_reader_destroy (struct casereader *reader, void *br_)
546 {
547   struct random_reader *br = br_;
548   struct random_reader_shared *shared = br->shared;
549
550   heap_delete (shared->readers, &br->heap_node);
551   if (heap_is_empty (shared->readers))
552     {
553       heap_destroy (shared->readers);
554       shared->class->destroy (reader, shared->aux);
555       free (shared);
556     }
557   else
558     advance_random_reader (reader, shared);
559
560   free (br);
561 }
562
563 /* struct casereader_class "clone" function for random reader. */
564 static struct casereader *
565 random_reader_clone (struct casereader *reader, void *br_)
566 {
567   struct random_reader *br = br_;
568   struct random_reader_shared *shared = br->shared;
569   return casereader_create_sequential (casereader_get_taint (reader),
570                                        reader->proto,
571                                        casereader_get_case_cnt (reader),
572                                        &random_reader_casereader_class,
573                                        make_random_reader (shared,
574                                                            br->offset));
575 }
576
577 /* struct casereader_class "peek" function for random reader. */
578 static struct ccase *
579 random_reader_peek (struct casereader *reader, void *br_, casenumber idx)
580 {
581   struct random_reader *br = br_;
582   struct random_reader_shared *shared = br->shared;
583
584   return shared->class->read (reader, shared->aux,
585                               br->offset - shared->min_offset + idx);
586 }
587
588 /* Casereader class for random reader. */
589 static const struct casereader_class random_reader_casereader_class =
590   {
591     random_reader_read,
592     random_reader_destroy,
593     random_reader_clone,
594     random_reader_peek,
595   };
596 \f
597 /* Buffering shim for implementing clone and peek operations.
598
599    The "clone" and "peek" operations aren't implemented by all
600    types of casereaders, but we have to expose a uniform
601    interface anyhow.  We do this by interposing a buffering
602    casereader on top of the existing casereader on the first call
603    to "clone" or "peek".  The buffering casereader maintains a
604    window of cases that spans the positions of the original
605    casereader and all of its clones (the "clone set"), from the
606    position of the casereader that has read the fewest cases to
607    the position of the casereader that has read the most.
608
609    Thus, if all of the casereaders in the clone set are at
610    approximately the same position, only a few cases are buffered
611    and there is little inefficiency.  If, on the other hand, one
612    casereader is not used to read any cases at all, but another
613    one is used to read all of the cases, the entire contents of
614    the casereader is copied into the buffer.  This still might
615    not be so inefficient, given that case data in memory is
616    shared across multiple identical copies, but in the worst case
617    the window implementation will write cases to disk instead of
618    maintaining them in-memory. */
619
620 /* A buffering shim for a non-clonable or non-peekable
621    casereader. */
622 struct shim
623   {
624     struct casewindow *window;          /* Window of buffered cases. */
625     struct casereader *subreader;       /* Subordinate casereader. */
626   };
627
628 static const struct casereader_random_class shim_class;
629
630 /* Interposes a buffering shim atop READER. */
631 static void
632 insert_shim (struct casereader *reader)
633 {
634   const struct caseproto *proto = casereader_get_proto (reader);
635   casenumber case_cnt = casereader_get_case_cnt (reader);
636   struct shim *b = xmalloc (sizeof *b);
637   b->window = casewindow_create (proto, settings_get_workspace_cases (proto));
638   b->subreader = casereader_create_random (proto, case_cnt, &shim_class, b);
639   casereader_swap (reader, b->subreader);
640   taint_propagate (casewindow_get_taint (b->window),
641                    casereader_get_taint (reader));
642   taint_propagate (casereader_get_taint (b->subreader),
643                    casereader_get_taint (reader));
644 }
645
646 /* Ensures that B's window contains at least CASE_CNT cases.
647    Return true if successful, false upon reaching the end of B's
648    subreader or an I/O error. */
649 static bool
650 prime_buffer (struct shim *b, casenumber case_cnt)
651 {
652   while (casewindow_get_case_cnt (b->window) < case_cnt)
653     {
654       struct ccase *tmp = casereader_read (b->subreader);
655       if (tmp == NULL)
656         return false;
657       casewindow_push_head (b->window, tmp);
658     }
659   return true;
660 }
661
662 /* Reads the case at the given 0-based OFFSET from the front of
663    the window into C.  Returns the case if successful, or a null
664    pointer if OFFSET is beyond the end of file or upon I/O error.
665    The caller must call case_unref() on the returned case when it
666    is no longer needed. */
667 static struct ccase *
668 shim_read (struct casereader *reader UNUSED, void *b_,
669            casenumber offset)
670 {
671   struct shim *b = b_;
672   if (!prime_buffer (b, offset + 1))
673     return NULL;
674   return casewindow_get_case (b->window, offset);
675 }
676
677 /* Destroys B. */
678 static void
679 shim_destroy (struct casereader *reader UNUSED, void *b_)
680 {
681   struct shim *b = b_;
682   casewindow_destroy (b->window);
683   casereader_destroy (b->subreader);
684   free (b);
685 }
686
687 /* Discards CNT cases from the front of B's window. */
688 static void
689 shim_advance (struct casereader *reader UNUSED, void *b_, casenumber case_cnt)
690 {
691   struct shim *b = b_;
692   casewindow_pop_tail (b->window, case_cnt);
693 }
694
695 /* Class for the buffered reader. */
696 static const struct casereader_random_class shim_class =
697   {
698     shim_read,
699     shim_destroy,
700     shim_advance,
701   };