Fix bug #20910.
[pspp-builds.git] / src / data / casereader.c
1 /* PSPP - a program for statistical analysis.
2    Copyright (C) 2007 Free Software Foundation, Inc.
3
4    This program is free software: you can redistribute it and/or modify
5    it under the terms of the GNU General Public License as published by
6    the Free Software Foundation, either version 3 of the License, or
7    (at your option) any later version.
8
9    This program is distributed in the hope that it will be useful,
10    but WITHOUT ANY WARRANTY; without even the implied warranty of
11    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12    GNU General Public License for more details.
13
14    You should have received a copy of the GNU General Public License
15    along with this program.  If not, see <http://www.gnu.org/licenses/>. */
16
17 #include <config.h>
18
19 #include <data/casereader.h>
20 #include <data/casereader-provider.h>
21
22 #include <stdlib.h>
23
24 #include <data/casewindow.h>
25 #include <data/casewriter.h>
26 #include <data/settings.h>
27 #include <libpspp/assertion.h>
28 #include <libpspp/heap.h>
29 #include <libpspp/taint.h>
30
31 #include "xalloc.h"
32
33 /* A casereader. */
34 struct casereader
35   {
36     struct taint *taint;                  /* Corrupted? */
37     size_t value_cnt;                     /* Values per case. */
38     casenumber case_cnt;                  /* Number of cases,
39                                              CASENUMBER_MAX if unknown. */
40     const struct casereader_class *class; /* Class. */
41     void *aux;                            /* Auxiliary data for class. */
42   };
43
44 static void insert_shim (struct casereader *);
45
46 /* Creates a new case in C and reads the next case from READER
47    into it.  The caller owns C and must destroy C when its data
48    is no longer needed.  Return true if successful, false when
49    cases have been exhausted or upon detection of an I/O error.
50    In the latter case, C is set to the null case.
51
52    The case returned is effectively consumed: it can never be
53    read again through READER.  If this is inconvenient, READER
54    may be cloned in advance with casereader_clone, or
55    casereader_peek may be used instead. */
56 bool
57 casereader_read (struct casereader *reader, struct ccase *c)
58 {
59   if (reader->case_cnt != 0 && reader->class->read (reader, reader->aux, c))
60     {
61       assert (case_get_value_cnt (c) >= reader->value_cnt);
62       if (reader->case_cnt != CASENUMBER_MAX)
63         reader->case_cnt--;
64       return true;
65     }
66   else
67     {
68       reader->case_cnt = 0;
69       case_nullify (c);
70       return false;
71     }
72 }
73
74 /* Destroys READER.
75    Returns false if an I/O error was detected on READER, true
76    otherwise. */
77 bool
78 casereader_destroy (struct casereader *reader)
79 {
80   bool ok = true;
81   if (reader != NULL)
82     {
83       reader->class->destroy (reader, reader->aux);
84       ok = taint_destroy (reader->taint);
85       free (reader);
86     }
87   return ok;
88 }
89
90 /* Returns a clone of READER.  READER and its clone may be used
91    to read the same sequence of cases in the same order, barring
92    I/O errors. */
93 struct casereader *
94 casereader_clone (const struct casereader *reader_)
95 {
96   struct casereader *reader = (struct casereader *) reader_;
97   struct casereader *clone;
98   if ( reader == NULL ) 
99     return NULL;
100
101   if (reader->class->clone == NULL)
102     insert_shim (reader);
103   clone = reader->class->clone (reader, reader->aux);
104   assert (clone != NULL);
105   assert (clone != reader);
106   return clone;
107 }
108
109 /* Makes a copy of ORIGINAL into *NEW1 (if NEW1 is non-null) and
110    *NEW2 (if NEW2 is non-null), then destroys ORIGINAL. */
111 void
112 casereader_split (struct casereader *original,
113                   struct casereader **new1, struct casereader **new2)
114 {
115   if (new1 != NULL && new2 != NULL)
116     {
117       *new1 = casereader_rename (original);
118       *new2 = casereader_clone (*new1);
119     }
120   else if (new1 != NULL)
121     *new1 = casereader_rename (original);
122   else if (new2 != NULL)
123     *new2 = casereader_rename (original);
124   else
125     casereader_destroy (original);
126 }
127
128 /* Returns a copy of READER, which is itself destroyed.
129    Useful for taking over ownership of a casereader, to enforce
130    preventing the original owner from accessing the casereader
131    again. */
132 struct casereader *
133 casereader_rename (struct casereader *reader)
134 {
135   struct casereader *new = xmemdup (reader, sizeof *reader);
136   free (reader);
137   return new;
138 }
139
140 /* Exchanges the casereaders referred to by A and B. */
141 void
142 casereader_swap (struct casereader *a, struct casereader *b)
143 {
144   if (a != b)
145     {
146       struct casereader tmp = *a;
147       *a = *b;
148       *b = tmp;
149     }
150 }
151
152 /* Creates a new case in C and reads the (IDX + 1)'th case from
153    READER into it.  The caller owns C and must destroy C when its
154    data is no longer needed.  Return true if successful, false
155    when cases have been exhausted or upon detection of an I/O
156    error.  In the latter case, C is set to the null case. */
157 bool
158 casereader_peek (struct casereader *reader, casenumber idx, struct ccase *c)
159 {
160   if (idx < reader->case_cnt)
161     {
162       if (reader->class->peek == NULL)
163         insert_shim (reader);
164       if (reader->class->peek (reader, reader->aux, idx, c))
165         return true;
166       else if (casereader_error (reader))
167         reader->case_cnt = 0;
168     }
169   if (reader->case_cnt > idx)
170     reader->case_cnt = idx;
171   case_nullify (c);
172   return false;
173 }
174
175 /* Returns true if an I/O error or another hard error has
176    occurred on READER, a clone of READER, or on some object on
177    which READER's data has a dependency, false otherwise. */
178 bool
179 casereader_error (const struct casereader *reader)
180 {
181   return taint_is_tainted (reader->taint);
182 }
183
184 /* Marks READER as having encountered an error.
185
186    Ordinarily, this function should be called by the
187    implementation of a casereader, not by the casereader's
188    client.  Instead, casereader clients should usually ensure
189    that a casereader's error state is correct by using
190    taint_propagate to propagate to the casereader's taint
191    structure, which may be obtained via casereader_get_taint. */
192 void
193 casereader_force_error (struct casereader *reader)
194 {
195   taint_set_taint (reader->taint);
196 }
197
198 /* Returns READER's associate taint object, for use with
199    taint_propagate and other taint functions. */
200 const struct taint *
201 casereader_get_taint (const struct casereader *reader)
202 {
203   return reader->taint;
204 }
205
206 /* Returns the number of cases that will be read by successive
207    calls to casereader_read for READER, assuming that no errors
208    occur.  Upon an error condition, the case count drops to 0, so
209    that no more cases can be obtained.
210
211    Not all casereaders can predict the number of cases that they
212    will produce without actually reading all of them.  In that
213    case, this function returns CASENUMBER_MAX.  To obtain the
214    actual number of cases in such a casereader, use
215    casereader_count_cases. */
216 casenumber
217 casereader_get_case_cnt (struct casereader *reader)
218 {
219   return reader->case_cnt;
220 }
221
222 /* Returns the number of cases that will be read by successive
223    calls to casereader_read for READER, assuming that no errors
224    occur.  Upon an error condition, the case count drops to 0, so
225    that no more cases can be obtained.
226
227    For a casereader that cannot predict the number of cases it
228    will produce, this function actually reads (and discards) all
229    of the contents of a clone of READER.  Thus, the return value
230    is always correct in the absence of I/O errors. */
231 casenumber
232 casereader_count_cases (struct casereader *reader)
233 {
234   if (reader->case_cnt == CASENUMBER_MAX)
235     {
236       casenumber n_cases = 0;
237       struct ccase c;
238
239       struct casereader *clone = casereader_clone (reader);
240
241       for (; casereader_read (clone, &c); case_destroy (&c))
242         n_cases++;
243
244       casereader_destroy (clone);
245       reader->case_cnt = n_cases;
246     }
247
248   return reader->case_cnt;
249 }
250
251 /* Returns the number of struct values in each case in READER. */
252 size_t
253 casereader_get_value_cnt (struct casereader *reader)
254 {
255   return reader->value_cnt;
256 }
257
258 /* Copies all the cases in READER to WRITER, propagating errors
259    appropriately. */
260 void
261 casereader_transfer (struct casereader *reader, struct casewriter *writer)
262 {
263   struct ccase c;
264
265   taint_propagate (casereader_get_taint (reader),
266                    casewriter_get_taint (writer));
267   while (casereader_read (reader, &c))
268     casewriter_write (writer, &c);
269   casereader_destroy (reader);
270 }
271
272 /* Creates and returns a new casereader.  This function is
273    intended for use by casereader implementations, not by
274    casereader clients.
275
276    This function is most suited for creating a casereader for a
277    data source that is naturally sequential.
278    casereader_create_random may be more appropriate for a data
279    source that supports random access.
280
281    Ordinarily, specify a null pointer for TAINT, in which case
282    the new casereader will have a new, unique taint object.  If
283    the new casereader should have a clone of an existing taint
284    object, specify that object as TAINT.  (This is most commonly
285    useful in an implementation of the "clone" casereader_class
286    function, in which case the cloned casereader should have the
287    same taint object as the original casereader.)
288
289    VALUE_CNT must be the number of struct values per case read
290    from the casereader.
291
292    CASE_CNT is an upper limit on the number of cases that
293    casereader_read will return from the casereader in successive
294    calls.  Ordinarily, this is the actual number of cases in the
295    data source or CASENUMBER_MAX if the number of cases cannot be
296    predicted in advance.
297
298    CLASS and AUX are a set of casereader implementation-specific
299    member functions and auxiliary data to pass to those member
300    functions, respectively. */
301 struct casereader *
302 casereader_create_sequential (const struct taint *taint,
303                               size_t value_cnt, casenumber case_cnt,
304                               const struct casereader_class *class, void *aux)
305 {
306   struct casereader *reader = xmalloc (sizeof *reader);
307   reader->taint = taint != NULL ? taint_clone (taint) : taint_create ();
308   reader->value_cnt = value_cnt;
309   reader->case_cnt = case_cnt;
310   reader->class = class;
311   reader->aux = aux;
312   return reader;
313 }
314
315 /* If READER is a casereader of the given CLASS, returns its
316    associated auxiliary data; otherwise, returns a null pointer.
317
318    This function is intended for use from casereader
319    implementations, not by casereader users.  Even within
320    casereader implementations, its usefulness is quite limited,
321    for at least two reasons.  First, every casereader member
322    function already receives a pointer to the casereader's
323    auxiliary data.  Second, a casereader's class can change
324    (through a call to casereader_swap) and this is in practice
325    quite common (e.g. any call to casereader_clone on a
326    casereader that does not directly support clone will cause the
327    casereader to be replaced by a shim caseader). */
328 void *
329 casereader_dynamic_cast (struct casereader *reader,
330                          struct casereader_class *class)
331 {
332   return reader->class == class ? reader->aux : NULL;
333 }
334 \f
335 /* Random-access casereader implementation.
336
337    This is a set of wrappers around casereader_create_sequential
338    and struct casereader_class to make it easy to create
339    efficient casereaders for data sources that natively support
340    random access. */
341
342 /* One clone of a random reader. */
343 struct random_reader
344   {
345     struct random_reader_shared *shared; /* Data shared among clones. */
346     struct heap_node heap_node; /* Node in shared data's heap of readers. */
347     casenumber offset;          /* Number of cases already read. */
348   };
349
350 /* Returns the random_reader in which the given heap_node is
351    embedded. */
352 static struct random_reader *
353 random_reader_from_heap_node (const struct heap_node *node)
354 {
355   return heap_data (node, struct random_reader, heap_node);
356 }
357
358 /* Data shared among clones of a random reader. */
359 struct random_reader_shared
360   {
361     struct heap *readers;       /* Heap of struct random_readers. */
362     casenumber min_offset;      /* Smallest offset of any random_reader. */
363     const struct casereader_random_class *class;
364     void *aux;
365   };
366
367 static struct casereader_class random_reader_casereader_class;
368
369 /* Creates and returns a new random_reader with the given SHARED
370    data and OFFSET.  Inserts the new random reader into the
371    shared heap. */
372 static struct random_reader *
373 make_random_reader (struct random_reader_shared *shared, casenumber offset)
374 {
375   struct random_reader *br = xmalloc (sizeof *br);
376   br->offset = offset;
377   br->shared = shared;
378   heap_insert (shared->readers, &br->heap_node);
379   return br;
380 }
381
382 /* Compares random_readers A and B by offset and returns a
383    strcmp()-like result. */
384 static int
385 compare_random_readers_by_offset (const struct heap_node *a_,
386                                   const struct heap_node *b_,
387                                   const void *aux UNUSED)
388 {
389   const struct random_reader *a = random_reader_from_heap_node (a_);
390   const struct random_reader *b = random_reader_from_heap_node (b_);
391   return a->offset < b->offset ? -1 : a->offset > b->offset;
392 }
393
394 /* Creates and returns a new casereader.  This function is
395    intended for use by casereader implementations, not by
396    casereader clients.
397
398    This function is most suited for creating a casereader for a
399    data source that supports random access.
400    casereader_create_sequential is more appropriate for a data
401    source that is naturally sequential.
402
403    VALUE_CNT must be the number of struct values per case read
404    from the casereader.
405
406    CASE_CNT is an upper limit on the number of cases that
407    casereader_read will return from the casereader in successive
408    calls.  Ordinarily, this is the actual number of cases in the
409    data source or CASENUMBER_MAX if the number of cases cannot be
410    predicted in advance.
411
412    CLASS and AUX are a set of casereader implementation-specific
413    member functions and auxiliary data to pass to those member
414    functions, respectively. */
415 struct casereader *
416 casereader_create_random (size_t value_cnt, casenumber case_cnt,
417                           const struct casereader_random_class *class,
418                           void *aux)
419 {
420   struct random_reader_shared *shared = xmalloc (sizeof *shared);
421   shared->readers = heap_create (compare_random_readers_by_offset, NULL);
422   shared->class = class;
423   shared->aux = aux;
424   shared->min_offset = 0;
425   return casereader_create_sequential (NULL, value_cnt, case_cnt,
426                                        &random_reader_casereader_class,
427                                        make_random_reader (shared, 0));
428 }
429
430 /* Reassesses the min_offset in SHARED based on the minimum
431    offset in the heap.   */
432 static void
433 advance_random_reader (struct casereader *reader,
434                        struct random_reader_shared *shared)
435 {
436   casenumber old, new;
437
438   old = shared->min_offset;
439   new = random_reader_from_heap_node (heap_minimum (shared->readers))->offset;
440   assert (new >= old);
441   if (new > old)
442     {
443       shared->min_offset = new;
444       shared->class->advance (reader, shared->aux, new - old);
445     }
446 }
447
448 /* struct casereader_class "read" function for random reader. */
449 static bool
450 random_reader_read (struct casereader *reader, void *br_, struct ccase *c)
451 {
452   struct random_reader *br = br_;
453   struct random_reader_shared *shared = br->shared;
454
455   if (shared->class->read (reader, shared->aux,
456                            br->offset - shared->min_offset, c))
457     {
458       br->offset++;
459       heap_changed (shared->readers, &br->heap_node);
460       advance_random_reader (reader, shared);
461       return true;
462     }
463   else
464     return false;
465 }
466
467 /* struct casereader_class "destroy" function for random
468    reader. */
469 static void
470 random_reader_destroy (struct casereader *reader, void *br_)
471 {
472   struct random_reader *br = br_;
473   struct random_reader_shared *shared = br->shared;
474
475   heap_delete (shared->readers, &br->heap_node);
476   if (heap_is_empty (shared->readers))
477     {
478       heap_destroy (shared->readers);
479       shared->class->destroy (reader, shared->aux);
480       free (shared);
481     }
482   else
483     advance_random_reader (reader, shared);
484
485   free (br);
486 }
487
488 /* struct casereader_class "clone" function for random reader. */
489 static struct casereader *
490 random_reader_clone (struct casereader *reader, void *br_)
491 {
492   struct random_reader *br = br_;
493   struct random_reader_shared *shared = br->shared;
494   return casereader_create_sequential (casereader_get_taint (reader),
495                                        casereader_get_value_cnt (reader),
496                                        casereader_get_case_cnt (reader),
497                                        &random_reader_casereader_class,
498                                        make_random_reader (shared,
499                                                            br->offset));
500 }
501
502 /* struct casereader_class "peek" function for random reader. */
503 static bool
504 random_reader_peek (struct casereader *reader, void *br_,
505                     casenumber idx, struct ccase *c)
506 {
507   struct random_reader *br = br_;
508   struct random_reader_shared *shared = br->shared;
509
510   return shared->class->read (reader, shared->aux,
511                               br->offset - shared->min_offset + idx, c);
512 }
513
514 /* Casereader class for random reader. */
515 static struct casereader_class random_reader_casereader_class =
516   {
517     random_reader_read,
518     random_reader_destroy,
519     random_reader_clone,
520     random_reader_peek,
521   };
522 \f
523 /* Buffering shim for implementing clone and peek operations.
524
525    The "clone" and "peek" operations aren't implemented by all
526    types of casereaders, but we have to expose a uniform
527    interface anyhow.  We do this by interposing a buffering
528    casereader on top of the existing casereader on the first call
529    to "clone" or "peek".  The buffering casereader maintains a
530    window of cases that spans the positions of the original
531    casereader and all of its clones (the "clone set"), from the
532    position of the casereader that has read the fewest cases to
533    the position of the casereader that has read the most.
534
535    Thus, if all of the casereaders in the clone set are at
536    approximately the same position, only a few cases are buffered
537    and there is little inefficiency.  If, on the other hand, one
538    casereader is not used to read any cases at all, but another
539    one is used to read all of the cases, the entire contents of
540    the casereader is copied into the buffer.  This still might
541    not be so inefficient, given that case data in memory is
542    shared across multiple identical copies, but in the worst case
543    the window implementation will write cases to disk instead of
544    maintaining them in-memory. */
545
546 /* A buffering shim for a non-clonable or non-peekable
547    casereader. */
548 struct shim
549   {
550     struct casewindow *window;          /* Window of buffered cases. */
551     struct casereader *subreader;       /* Subordinate casereader. */
552   };
553
554 static struct casereader_random_class shim_class;
555
556 /* Interposes a buffering shim atop READER. */
557 static void
558 insert_shim (struct casereader *reader)
559 {
560   size_t value_cnt = casereader_get_value_cnt (reader);
561   casenumber case_cnt = casereader_get_case_cnt (reader);
562   struct shim *b = xmalloc (sizeof *b);
563   b->window = casewindow_create (value_cnt, get_workspace_cases (value_cnt));
564   b->subreader = casereader_create_random (value_cnt, case_cnt,
565                                            &shim_class, b);
566   casereader_swap (reader, b->subreader);
567   taint_propagate (casewindow_get_taint (b->window),
568                    casereader_get_taint (reader));
569   taint_propagate (casereader_get_taint (b->subreader),
570                    casereader_get_taint (reader));
571 }
572
573 /* Ensures that B's window contains at least CASE_CNT cases.
574    Return true if successful, false upon reaching the end of B's
575    subreader or an I/O error. */
576 static bool
577 prime_buffer (struct shim *b, casenumber case_cnt)
578 {
579   while (casewindow_get_case_cnt (b->window) < case_cnt)
580     {
581       struct ccase tmp;
582       if (!casereader_read (b->subreader, &tmp))
583         return false;
584       casewindow_push_head (b->window, &tmp);
585     }
586   return true;
587 }
588
589 /* Reads the case at the given 0-based OFFSET from the front of
590    the window into C.  Returns true if successful, false if
591    OFFSET is beyond the end of file or upon I/O error. */
592 static bool
593 shim_read (struct casereader *reader UNUSED, void *b_,
594            casenumber offset, struct ccase *c)
595 {
596   struct shim *b = b_;
597   return (prime_buffer (b, offset + 1)
598           && casewindow_get_case (b->window, offset, c));
599 }
600
601 /* Destroys B. */
602 static void
603 shim_destroy (struct casereader *reader UNUSED, void *b_)
604 {
605   struct shim *b = b_;
606   casewindow_destroy (b->window);
607   casereader_destroy (b->subreader);
608   free (b);
609 }
610
611 /* Discards CNT cases from the front of B's window. */
612 static void
613 shim_advance (struct casereader *reader UNUSED, void *b_, casenumber case_cnt)
614 {
615   struct shim *b = b_;
616   casewindow_pop_tail (b->window, case_cnt);
617 }
618
619 /* Class for the buffered reader. */
620 static struct casereader_random_class shim_class =
621   {
622     shim_read,
623     shim_destroy,
624     shim_advance,
625   };