Fix bug #21192. Thanks to John Darrington for review.
[pspp-builds.git] / src / data / casereader.c
1 /* PSPP - a program for statistical analysis.
2    Copyright (C) 2007 Free Software Foundation, Inc.
3
4    This program is free software: you can redistribute it and/or modify
5    it under the terms of the GNU General Public License as published by
6    the Free Software Foundation, either version 3 of the License, or
7    (at your option) any later version.
8
9    This program is distributed in the hope that it will be useful,
10    but WITHOUT ANY WARRANTY; without even the implied warranty of
11    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12    GNU General Public License for more details.
13
14    You should have received a copy of the GNU General Public License
15    along with this program.  If not, see <http://www.gnu.org/licenses/>. */
16
17 #include <config.h>
18
19 #include <data/casereader.h>
20 #include <data/casereader-provider.h>
21
22 #include <stdlib.h>
23
24 #include <data/casewindow.h>
25 #include <data/casewriter.h>
26 #include <data/settings.h>
27 #include <libpspp/assertion.h>
28 #include <libpspp/heap.h>
29 #include <libpspp/taint.h>
30
31 #include "xalloc.h"
32
33 /* A casereader. */
34 struct casereader
35   {
36     struct taint *taint;                  /* Corrupted? */
37     size_t value_cnt;                     /* Values per case. */
38     casenumber case_cnt;                  /* Number of cases,
39                                              CASENUMBER_MAX if unknown. */
40     const struct casereader_class *class; /* Class. */
41     void *aux;                            /* Auxiliary data for class. */
42   };
43
44 static void insert_shim (struct casereader *);
45
46 /* Creates a new case in C and reads the next case from READER
47    into it.  The caller owns C and must destroy C when its data
48    is no longer needed.  Return true if successful, false when
49    cases have been exhausted or upon detection of an I/O error.
50    In the latter case, C is set to the null case.
51
52    The case returned is effectively consumed: it can never be
53    read again through READER.  If this is inconvenient, READER
54    may be cloned in advance with casereader_clone, or
55    casereader_peek may be used instead. */
56 bool
57 casereader_read (struct casereader *reader, struct ccase *c)
58 {
59   if (reader->case_cnt != 0)
60     {
61       /* ->read may use casereader_swap to replace itself by
62          another reader and then delegate to that reader by
63          recursively calling casereader_read.  Currently only
64          lazy_casereader does this and, with luck, nothing else
65          ever will.
66
67          To allow this to work, however, we must decrement
68          case_cnt before calling ->read.  If we decremented
69          case_cnt after calling ->read, then this would actually
70          drop two cases from case_cnt instead of one, and we'd
71          lose the last case in the casereader. */
72       if (reader->case_cnt != CASENUMBER_MAX)
73         reader->case_cnt--;
74       if (reader->class->read (reader, reader->aux, c))
75         {
76           assert (case_get_value_cnt (c) >= reader->value_cnt);
77           return true;
78         }
79     }
80   reader->case_cnt = 0;
81   case_nullify (c);
82   return false;
83 }
84
85 /* Destroys READER.
86    Returns false if an I/O error was detected on READER, true
87    otherwise. */
88 bool
89 casereader_destroy (struct casereader *reader)
90 {
91   bool ok = true;
92   if (reader != NULL)
93     {
94       reader->class->destroy (reader, reader->aux);
95       ok = taint_destroy (reader->taint);
96       free (reader);
97     }
98   return ok;
99 }
100
101 /* Returns a clone of READER.  READER and its clone may be used
102    to read the same sequence of cases in the same order, barring
103    I/O errors. */
104 struct casereader *
105 casereader_clone (const struct casereader *reader_)
106 {
107   struct casereader *reader = (struct casereader *) reader_;
108   struct casereader *clone;
109   if ( reader == NULL ) 
110     return NULL;
111
112   if (reader->class->clone == NULL)
113     insert_shim (reader);
114   clone = reader->class->clone (reader, reader->aux);
115   assert (clone != NULL);
116   assert (clone != reader);
117   return clone;
118 }
119
120 /* Makes a copy of ORIGINAL into *NEW1 (if NEW1 is non-null) and
121    *NEW2 (if NEW2 is non-null), then destroys ORIGINAL. */
122 void
123 casereader_split (struct casereader *original,
124                   struct casereader **new1, struct casereader **new2)
125 {
126   if (new1 != NULL && new2 != NULL)
127     {
128       *new1 = casereader_rename (original);
129       *new2 = casereader_clone (*new1);
130     }
131   else if (new1 != NULL)
132     *new1 = casereader_rename (original);
133   else if (new2 != NULL)
134     *new2 = casereader_rename (original);
135   else
136     casereader_destroy (original);
137 }
138
139 /* Returns a copy of READER, which is itself destroyed.
140    Useful for taking over ownership of a casereader, to enforce
141    preventing the original owner from accessing the casereader
142    again. */
143 struct casereader *
144 casereader_rename (struct casereader *reader)
145 {
146   struct casereader *new = xmemdup (reader, sizeof *reader);
147   free (reader);
148   return new;
149 }
150
151 /* Exchanges the casereaders referred to by A and B. */
152 void
153 casereader_swap (struct casereader *a, struct casereader *b)
154 {
155   if (a != b)
156     {
157       struct casereader tmp = *a;
158       *a = *b;
159       *b = tmp;
160     }
161 }
162
163 /* Creates a new case in C and reads the (IDX + 1)'th case from
164    READER into it.  The caller owns C and must destroy C when its
165    data is no longer needed.  Return true if successful, false
166    when cases have been exhausted or upon detection of an I/O
167    error.  In the latter case, C is set to the null case. */
168 bool
169 casereader_peek (struct casereader *reader, casenumber idx, struct ccase *c)
170 {
171   if (idx < reader->case_cnt)
172     {
173       if (reader->class->peek == NULL)
174         insert_shim (reader);
175       if (reader->class->peek (reader, reader->aux, idx, c))
176         return true;
177       else if (casereader_error (reader))
178         reader->case_cnt = 0;
179     }
180   if (reader->case_cnt > idx)
181     reader->case_cnt = idx;
182   case_nullify (c);
183   return false;
184 }
185
186 /* Returns true if an I/O error or another hard error has
187    occurred on READER, a clone of READER, or on some object on
188    which READER's data has a dependency, false otherwise. */
189 bool
190 casereader_error (const struct casereader *reader)
191 {
192   return taint_is_tainted (reader->taint);
193 }
194
195 /* Marks READER as having encountered an error.
196
197    Ordinarily, this function should be called by the
198    implementation of a casereader, not by the casereader's
199    client.  Instead, casereader clients should usually ensure
200    that a casereader's error state is correct by using
201    taint_propagate to propagate to the casereader's taint
202    structure, which may be obtained via casereader_get_taint. */
203 void
204 casereader_force_error (struct casereader *reader)
205 {
206   taint_set_taint (reader->taint);
207 }
208
209 /* Returns READER's associate taint object, for use with
210    taint_propagate and other taint functions. */
211 const struct taint *
212 casereader_get_taint (const struct casereader *reader)
213 {
214   return reader->taint;
215 }
216
217 /* Returns the number of cases that will be read by successive
218    calls to casereader_read for READER, assuming that no errors
219    occur.  Upon an error condition, the case count drops to 0, so
220    that no more cases can be obtained.
221
222    Not all casereaders can predict the number of cases that they
223    will produce without actually reading all of them.  In that
224    case, this function returns CASENUMBER_MAX.  To obtain the
225    actual number of cases in such a casereader, use
226    casereader_count_cases. */
227 casenumber
228 casereader_get_case_cnt (struct casereader *reader)
229 {
230   return reader->case_cnt;
231 }
232
233 /* Returns the number of cases that will be read by successive
234    calls to casereader_read for READER, assuming that no errors
235    occur.  Upon an error condition, the case count drops to 0, so
236    that no more cases can be obtained.
237
238    For a casereader that cannot predict the number of cases it
239    will produce, this function actually reads (and discards) all
240    of the contents of a clone of READER.  Thus, the return value
241    is always correct in the absence of I/O errors. */
242 casenumber
243 casereader_count_cases (struct casereader *reader)
244 {
245   if (reader->case_cnt == CASENUMBER_MAX)
246     {
247       casenumber n_cases = 0;
248       struct ccase c;
249
250       struct casereader *clone = casereader_clone (reader);
251
252       for (; casereader_read (clone, &c); case_destroy (&c))
253         n_cases++;
254
255       casereader_destroy (clone);
256       reader->case_cnt = n_cases;
257     }
258
259   return reader->case_cnt;
260 }
261
262 /* Returns the number of struct values in each case in READER. */
263 size_t
264 casereader_get_value_cnt (struct casereader *reader)
265 {
266   return reader->value_cnt;
267 }
268
269 /* Copies all the cases in READER to WRITER, propagating errors
270    appropriately. */
271 void
272 casereader_transfer (struct casereader *reader, struct casewriter *writer)
273 {
274   struct ccase c;
275
276   taint_propagate (casereader_get_taint (reader),
277                    casewriter_get_taint (writer));
278   while (casereader_read (reader, &c))
279     casewriter_write (writer, &c);
280   casereader_destroy (reader);
281 }
282
283 /* Creates and returns a new casereader.  This function is
284    intended for use by casereader implementations, not by
285    casereader clients.
286
287    This function is most suited for creating a casereader for a
288    data source that is naturally sequential.
289    casereader_create_random may be more appropriate for a data
290    source that supports random access.
291
292    Ordinarily, specify a null pointer for TAINT, in which case
293    the new casereader will have a new, unique taint object.  If
294    the new casereader should have a clone of an existing taint
295    object, specify that object as TAINT.  (This is most commonly
296    useful in an implementation of the "clone" casereader_class
297    function, in which case the cloned casereader should have the
298    same taint object as the original casereader.)
299
300    VALUE_CNT must be the number of struct values per case read
301    from the casereader.
302
303    CASE_CNT is an upper limit on the number of cases that
304    casereader_read will return from the casereader in successive
305    calls.  Ordinarily, this is the actual number of cases in the
306    data source or CASENUMBER_MAX if the number of cases cannot be
307    predicted in advance.
308
309    CLASS and AUX are a set of casereader implementation-specific
310    member functions and auxiliary data to pass to those member
311    functions, respectively. */
312 struct casereader *
313 casereader_create_sequential (const struct taint *taint,
314                               size_t value_cnt, casenumber case_cnt,
315                               const struct casereader_class *class, void *aux)
316 {
317   struct casereader *reader = xmalloc (sizeof *reader);
318   reader->taint = taint != NULL ? taint_clone (taint) : taint_create ();
319   reader->value_cnt = value_cnt;
320   reader->case_cnt = case_cnt;
321   reader->class = class;
322   reader->aux = aux;
323   return reader;
324 }
325
326 /* If READER is a casereader of the given CLASS, returns its
327    associated auxiliary data; otherwise, returns a null pointer.
328
329    This function is intended for use from casereader
330    implementations, not by casereader users.  Even within
331    casereader implementations, its usefulness is quite limited,
332    for at least two reasons.  First, every casereader member
333    function already receives a pointer to the casereader's
334    auxiliary data.  Second, a casereader's class can change
335    (through a call to casereader_swap) and this is in practice
336    quite common (e.g. any call to casereader_clone on a
337    casereader that does not directly support clone will cause the
338    casereader to be replaced by a shim caseader). */
339 void *
340 casereader_dynamic_cast (struct casereader *reader,
341                          struct casereader_class *class)
342 {
343   return reader->class == class ? reader->aux : NULL;
344 }
345 \f
346 /* Random-access casereader implementation.
347
348    This is a set of wrappers around casereader_create_sequential
349    and struct casereader_class to make it easy to create
350    efficient casereaders for data sources that natively support
351    random access. */
352
353 /* One clone of a random reader. */
354 struct random_reader
355   {
356     struct random_reader_shared *shared; /* Data shared among clones. */
357     struct heap_node heap_node; /* Node in shared data's heap of readers. */
358     casenumber offset;          /* Number of cases already read. */
359   };
360
361 /* Returns the random_reader in which the given heap_node is
362    embedded. */
363 static struct random_reader *
364 random_reader_from_heap_node (const struct heap_node *node)
365 {
366   return heap_data (node, struct random_reader, heap_node);
367 }
368
369 /* Data shared among clones of a random reader. */
370 struct random_reader_shared
371   {
372     struct heap *readers;       /* Heap of struct random_readers. */
373     casenumber min_offset;      /* Smallest offset of any random_reader. */
374     const struct casereader_random_class *class;
375     void *aux;
376   };
377
378 static struct casereader_class random_reader_casereader_class;
379
380 /* Creates and returns a new random_reader with the given SHARED
381    data and OFFSET.  Inserts the new random reader into the
382    shared heap. */
383 static struct random_reader *
384 make_random_reader (struct random_reader_shared *shared, casenumber offset)
385 {
386   struct random_reader *br = xmalloc (sizeof *br);
387   br->offset = offset;
388   br->shared = shared;
389   heap_insert (shared->readers, &br->heap_node);
390   return br;
391 }
392
393 /* Compares random_readers A and B by offset and returns a
394    strcmp()-like result. */
395 static int
396 compare_random_readers_by_offset (const struct heap_node *a_,
397                                   const struct heap_node *b_,
398                                   const void *aux UNUSED)
399 {
400   const struct random_reader *a = random_reader_from_heap_node (a_);
401   const struct random_reader *b = random_reader_from_heap_node (b_);
402   return a->offset < b->offset ? -1 : a->offset > b->offset;
403 }
404
405 /* Creates and returns a new casereader.  This function is
406    intended for use by casereader implementations, not by
407    casereader clients.
408
409    This function is most suited for creating a casereader for a
410    data source that supports random access.
411    casereader_create_sequential is more appropriate for a data
412    source that is naturally sequential.
413
414    VALUE_CNT must be the number of struct values per case read
415    from the casereader.
416
417    CASE_CNT is an upper limit on the number of cases that
418    casereader_read will return from the casereader in successive
419    calls.  Ordinarily, this is the actual number of cases in the
420    data source or CASENUMBER_MAX if the number of cases cannot be
421    predicted in advance.
422
423    CLASS and AUX are a set of casereader implementation-specific
424    member functions and auxiliary data to pass to those member
425    functions, respectively. */
426 struct casereader *
427 casereader_create_random (size_t value_cnt, casenumber case_cnt,
428                           const struct casereader_random_class *class,
429                           void *aux)
430 {
431   struct random_reader_shared *shared = xmalloc (sizeof *shared);
432   shared->readers = heap_create (compare_random_readers_by_offset, NULL);
433   shared->class = class;
434   shared->aux = aux;
435   shared->min_offset = 0;
436   return casereader_create_sequential (NULL, value_cnt, case_cnt,
437                                        &random_reader_casereader_class,
438                                        make_random_reader (shared, 0));
439 }
440
441 /* Reassesses the min_offset in SHARED based on the minimum
442    offset in the heap.   */
443 static void
444 advance_random_reader (struct casereader *reader,
445                        struct random_reader_shared *shared)
446 {
447   casenumber old, new;
448
449   old = shared->min_offset;
450   new = random_reader_from_heap_node (heap_minimum (shared->readers))->offset;
451   assert (new >= old);
452   if (new > old)
453     {
454       shared->min_offset = new;
455       shared->class->advance (reader, shared->aux, new - old);
456     }
457 }
458
459 /* struct casereader_class "read" function for random reader. */
460 static bool
461 random_reader_read (struct casereader *reader, void *br_, struct ccase *c)
462 {
463   struct random_reader *br = br_;
464   struct random_reader_shared *shared = br->shared;
465
466   if (shared->class->read (reader, shared->aux,
467                            br->offset - shared->min_offset, c))
468     {
469       br->offset++;
470       heap_changed (shared->readers, &br->heap_node);
471       advance_random_reader (reader, shared);
472       return true;
473     }
474   else
475     return false;
476 }
477
478 /* struct casereader_class "destroy" function for random
479    reader. */
480 static void
481 random_reader_destroy (struct casereader *reader, void *br_)
482 {
483   struct random_reader *br = br_;
484   struct random_reader_shared *shared = br->shared;
485
486   heap_delete (shared->readers, &br->heap_node);
487   if (heap_is_empty (shared->readers))
488     {
489       heap_destroy (shared->readers);
490       shared->class->destroy (reader, shared->aux);
491       free (shared);
492     }
493   else
494     advance_random_reader (reader, shared);
495
496   free (br);
497 }
498
499 /* struct casereader_class "clone" function for random reader. */
500 static struct casereader *
501 random_reader_clone (struct casereader *reader, void *br_)
502 {
503   struct random_reader *br = br_;
504   struct random_reader_shared *shared = br->shared;
505   return casereader_create_sequential (casereader_get_taint (reader),
506                                        casereader_get_value_cnt (reader),
507                                        casereader_get_case_cnt (reader),
508                                        &random_reader_casereader_class,
509                                        make_random_reader (shared,
510                                                            br->offset));
511 }
512
513 /* struct casereader_class "peek" function for random reader. */
514 static bool
515 random_reader_peek (struct casereader *reader, void *br_,
516                     casenumber idx, struct ccase *c)
517 {
518   struct random_reader *br = br_;
519   struct random_reader_shared *shared = br->shared;
520
521   return shared->class->read (reader, shared->aux,
522                               br->offset - shared->min_offset + idx, c);
523 }
524
525 /* Casereader class for random reader. */
526 static struct casereader_class random_reader_casereader_class =
527   {
528     random_reader_read,
529     random_reader_destroy,
530     random_reader_clone,
531     random_reader_peek,
532   };
533 \f
534 /* Buffering shim for implementing clone and peek operations.
535
536    The "clone" and "peek" operations aren't implemented by all
537    types of casereaders, but we have to expose a uniform
538    interface anyhow.  We do this by interposing a buffering
539    casereader on top of the existing casereader on the first call
540    to "clone" or "peek".  The buffering casereader maintains a
541    window of cases that spans the positions of the original
542    casereader and all of its clones (the "clone set"), from the
543    position of the casereader that has read the fewest cases to
544    the position of the casereader that has read the most.
545
546    Thus, if all of the casereaders in the clone set are at
547    approximately the same position, only a few cases are buffered
548    and there is little inefficiency.  If, on the other hand, one
549    casereader is not used to read any cases at all, but another
550    one is used to read all of the cases, the entire contents of
551    the casereader is copied into the buffer.  This still might
552    not be so inefficient, given that case data in memory is
553    shared across multiple identical copies, but in the worst case
554    the window implementation will write cases to disk instead of
555    maintaining them in-memory. */
556
557 /* A buffering shim for a non-clonable or non-peekable
558    casereader. */
559 struct shim
560   {
561     struct casewindow *window;          /* Window of buffered cases. */
562     struct casereader *subreader;       /* Subordinate casereader. */
563   };
564
565 static struct casereader_random_class shim_class;
566
567 /* Interposes a buffering shim atop READER. */
568 static void
569 insert_shim (struct casereader *reader)
570 {
571   size_t value_cnt = casereader_get_value_cnt (reader);
572   casenumber case_cnt = casereader_get_case_cnt (reader);
573   struct shim *b = xmalloc (sizeof *b);
574   b->window = casewindow_create (value_cnt, get_workspace_cases (value_cnt));
575   b->subreader = casereader_create_random (value_cnt, case_cnt,
576                                            &shim_class, b);
577   casereader_swap (reader, b->subreader);
578   taint_propagate (casewindow_get_taint (b->window),
579                    casereader_get_taint (reader));
580   taint_propagate (casereader_get_taint (b->subreader),
581                    casereader_get_taint (reader));
582 }
583
584 /* Ensures that B's window contains at least CASE_CNT cases.
585    Return true if successful, false upon reaching the end of B's
586    subreader or an I/O error. */
587 static bool
588 prime_buffer (struct shim *b, casenumber case_cnt)
589 {
590   while (casewindow_get_case_cnt (b->window) < case_cnt)
591     {
592       struct ccase tmp;
593       if (!casereader_read (b->subreader, &tmp))
594         return false;
595       casewindow_push_head (b->window, &tmp);
596     }
597   return true;
598 }
599
600 /* Reads the case at the given 0-based OFFSET from the front of
601    the window into C.  Returns true if successful, false if
602    OFFSET is beyond the end of file or upon I/O error. */
603 static bool
604 shim_read (struct casereader *reader UNUSED, void *b_,
605            casenumber offset, struct ccase *c)
606 {
607   struct shim *b = b_;
608   return (prime_buffer (b, offset + 1)
609           && casewindow_get_case (b->window, offset, c));
610 }
611
612 /* Destroys B. */
613 static void
614 shim_destroy (struct casereader *reader UNUSED, void *b_)
615 {
616   struct shim *b = b_;
617   casewindow_destroy (b->window);
618   casereader_destroy (b->subreader);
619   free (b);
620 }
621
622 /* Discards CNT cases from the front of B's window. */
623 static void
624 shim_advance (struct casereader *reader UNUSED, void *b_, casenumber case_cnt)
625 {
626   struct shim *b = b_;
627   casewindow_pop_tail (b->window, case_cnt);
628 }
629
630 /* Class for the buffered reader. */
631 static struct casereader_random_class shim_class =
632   {
633     shim_read,
634     shim_destroy,
635     shim_advance,
636   };