3d27a9192a79bd63466e25e2059ce91824e94241
[pspp-builds.git] / src / data / casereader.c
1 /* PSPP - a program for statistical analysis.
2    Copyright (C) 2007, 2009 Free Software Foundation, Inc.
3
4    This program is free software: you can redistribute it and/or modify
5    it under the terms of the GNU General Public License as published by
6    the Free Software Foundation, either version 3 of the License, or
7    (at your option) any later version.
8
9    This program is distributed in the hope that it will be useful,
10    but WITHOUT ANY WARRANTY; without even the implied warranty of
11    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12    GNU General Public License for more details.
13
14    You should have received a copy of the GNU General Public License
15    along with this program.  If not, see <http://www.gnu.org/licenses/>. */
16
17 #include <config.h>
18
19 #include <data/casereader.h>
20 #include <data/casereader-provider.h>
21
22 #include <stdlib.h>
23
24 #include <data/casewindow.h>
25 #include <data/casewriter.h>
26 #include <data/settings.h>
27 #include <libpspp/assertion.h>
28 #include <libpspp/heap.h>
29 #include <libpspp/taint.h>
30
31 #include "xalloc.h"
32
33 /* A casereader. */
34 struct casereader
35   {
36     struct taint *taint;                  /* Corrupted? */
37     size_t value_cnt;                     /* Values per case. */
38     casenumber case_cnt;                  /* Number of cases,
39                                              CASENUMBER_MAX if unknown. */
40     const struct casereader_class *class; /* Class. */
41     void *aux;                            /* Auxiliary data for class. */
42   };
43
44 static void insert_shim (struct casereader *);
45
46 /* Reads and returns the next case from READER.  The caller owns
47    the returned case and must call case_unref on it when its data
48    is no longer needed.  Returns a null pointer if cases have
49    been exhausted or upon detection of an I/O error.
50
51    The case returned is effectively consumed: it can never be
52    read again through READER.  If this is inconvenient, READER
53    may be cloned in advance with casereader_clone, or
54    casereader_peek may be used instead. */
55 struct ccase *
56 casereader_read (struct casereader *reader)
57 {
58   if (reader->case_cnt != 0)
59     {
60       /* ->read may use casereader_swap to replace itself by
61          another reader and then delegate to that reader by
62          recursively calling casereader_read.  Currently only
63          lazy_casereader does this and, with luck, nothing else
64          ever will.
65
66          To allow this to work, however, we must decrement
67          case_cnt before calling ->read.  If we decremented
68          case_cnt after calling ->read, then this would actually
69          drop two cases from case_cnt instead of one, and we'd
70          lose the last case in the casereader. */
71       struct ccase *c;
72       if (reader->case_cnt != CASENUMBER_MAX)
73         reader->case_cnt--;
74       c = reader->class->read (reader, reader->aux);
75       if (c != NULL)
76         {
77           assert (case_get_value_cnt (c) >= reader->value_cnt);
78           return c;
79         }
80     }
81   reader->case_cnt = 0;
82   return NULL;
83 }
84
85 /* Destroys READER.
86    Returns false if an I/O error was detected on READER, true
87    otherwise. */
88 bool
89 casereader_destroy (struct casereader *reader)
90 {
91   bool ok = true;
92   if (reader != NULL)
93     {
94       reader->class->destroy (reader, reader->aux);
95       ok = taint_destroy (reader->taint);
96       free (reader);
97     }
98   return ok;
99 }
100
101 /* Returns a clone of READER.  READER and its clone may be used
102    to read the same sequence of cases in the same order, barring
103    I/O errors. */
104 struct casereader *
105 casereader_clone (const struct casereader *reader_)
106 {
107   struct casereader *reader = (struct casereader *) reader_;
108   struct casereader *clone;
109   if ( reader == NULL ) 
110     return NULL;
111
112   if (reader->class->clone == NULL)
113     insert_shim (reader);
114   clone = reader->class->clone (reader, reader->aux);
115   assert (clone != NULL);
116   assert (clone != reader);
117   return clone;
118 }
119
120 /* Makes a copy of ORIGINAL into *NEW1 (if NEW1 is non-null) and
121    *NEW2 (if NEW2 is non-null), then destroys ORIGINAL. */
122 void
123 casereader_split (struct casereader *original,
124                   struct casereader **new1, struct casereader **new2)
125 {
126   if (new1 != NULL && new2 != NULL)
127     {
128       *new1 = casereader_rename (original);
129       *new2 = casereader_clone (*new1);
130     }
131   else if (new1 != NULL)
132     *new1 = casereader_rename (original);
133   else if (new2 != NULL)
134     *new2 = casereader_rename (original);
135   else
136     casereader_destroy (original);
137 }
138
139 /* Returns a copy of READER, which is itself destroyed.
140    Useful for taking over ownership of a casereader, to enforce
141    preventing the original owner from accessing the casereader
142    again. */
143 struct casereader *
144 casereader_rename (struct casereader *reader)
145 {
146   struct casereader *new = xmemdup (reader, sizeof *reader);
147   free (reader);
148   return new;
149 }
150
151 /* Exchanges the casereaders referred to by A and B. */
152 void
153 casereader_swap (struct casereader *a, struct casereader *b)
154 {
155   if (a != b)
156     {
157       struct casereader tmp = *a;
158       *a = *b;
159       *b = tmp;
160     }
161 }
162
163 /* Reads and returns the (IDX + 1)'th case from READER.  The
164    caller owns the returned case and must call case_unref on it
165    when it is no longer needed.  Returns a null pointer if cases
166    have been exhausted or upon detection of an I/O error. */
167 struct ccase *
168 casereader_peek (struct casereader *reader, casenumber idx)
169 {
170   if (idx < reader->case_cnt)
171     {
172       struct ccase *c;
173       if (reader->class->peek == NULL)
174         insert_shim (reader);
175       c = reader->class->peek (reader, reader->aux, idx);
176       if (c != NULL)
177         return c;
178       else if (casereader_error (reader))
179         reader->case_cnt = 0;
180     }
181   if (reader->case_cnt > idx)
182     reader->case_cnt = idx;
183   return NULL;
184 }
185
186 /* Returns true if no cases remain to be read from READER, or if
187    an error has occurred on READER.  (A return value of false
188    does *not* mean that the next call to casereader_peek or
189    casereader_read will return true, because an error can occur
190    in the meantime.) */
191 bool
192 casereader_is_empty (struct casereader *reader)
193 {
194   if (reader->case_cnt == 0)
195     return true;
196   else
197     {
198       struct ccase *c = casereader_peek (reader, 0);
199       if (c == NULL)
200         return true;
201       else
202         {
203           case_unref (c);
204           return false;
205         }
206     }
207 }
208
209 /* Returns true if an I/O error or another hard error has
210    occurred on READER, a clone of READER, or on some object on
211    which READER's data has a dependency, false otherwise. */
212 bool
213 casereader_error (const struct casereader *reader)
214 {
215   return taint_is_tainted (reader->taint);
216 }
217
218 /* Marks READER as having encountered an error.
219
220    Ordinarily, this function should be called by the
221    implementation of a casereader, not by the casereader's
222    client.  Instead, casereader clients should usually ensure
223    that a casereader's error state is correct by using
224    taint_propagate to propagate to the casereader's taint
225    structure, which may be obtained via casereader_get_taint. */
226 void
227 casereader_force_error (struct casereader *reader)
228 {
229   taint_set_taint (reader->taint);
230 }
231
232 /* Returns READER's associate taint object, for use with
233    taint_propagate and other taint functions. */
234 const struct taint *
235 casereader_get_taint (const struct casereader *reader)
236 {
237   return reader->taint;
238 }
239
240 /* Returns the number of cases that will be read by successive
241    calls to casereader_read for READER, assuming that no errors
242    occur.  Upon an error condition, the case count drops to 0, so
243    that no more cases can be obtained.
244
245    Not all casereaders can predict the number of cases that they
246    will produce without actually reading all of them.  In that
247    case, this function returns CASENUMBER_MAX.  To obtain the
248    actual number of cases in such a casereader, use
249    casereader_count_cases. */
250 casenumber
251 casereader_get_case_cnt (struct casereader *reader)
252 {
253   return reader->case_cnt;
254 }
255
256 /* Returns the number of cases that will be read by successive
257    calls to casereader_read for READER, assuming that no errors
258    occur.  Upon an error condition, the case count drops to 0, so
259    that no more cases can be obtained.
260
261    For a casereader that cannot predict the number of cases it
262    will produce, this function actually reads (and discards) all
263    of the contents of a clone of READER.  Thus, the return value
264    is always correct in the absence of I/O errors. */
265 casenumber
266 casereader_count_cases (struct casereader *reader)
267 {
268   if (reader->case_cnt == CASENUMBER_MAX)
269     {
270       casenumber n_cases = 0;
271       struct ccase *c;
272
273       struct casereader *clone = casereader_clone (reader);
274
275       for (; (c = casereader_read (clone)) != NULL; case_unref (c))
276         n_cases++;
277
278       casereader_destroy (clone);
279       reader->case_cnt = n_cases;
280     }
281
282   return reader->case_cnt;
283 }
284
285 /* Returns the number of struct values in each case in READER. */
286 size_t
287 casereader_get_value_cnt (struct casereader *reader)
288 {
289   return reader->value_cnt;
290 }
291
292 /* Copies all the cases in READER to WRITER, propagating errors
293    appropriately. */
294 void
295 casereader_transfer (struct casereader *reader, struct casewriter *writer)
296 {
297   struct ccase *c;
298
299   taint_propagate (casereader_get_taint (reader),
300                    casewriter_get_taint (writer));
301   while ((c = casereader_read (reader)) != NULL)
302     casewriter_write (writer, c);
303   casereader_destroy (reader);
304 }
305
306 /* Creates and returns a new casereader.  This function is
307    intended for use by casereader implementations, not by
308    casereader clients.
309
310    This function is most suited for creating a casereader for a
311    data source that is naturally sequential.
312    casereader_create_random may be more appropriate for a data
313    source that supports random access.
314
315    Ordinarily, specify a null pointer for TAINT, in which case
316    the new casereader will have a new, unique taint object.  If
317    the new casereader should have a clone of an existing taint
318    object, specify that object as TAINT.  (This is most commonly
319    useful in an implementation of the "clone" casereader_class
320    function, in which case the cloned casereader should have the
321    same taint object as the original casereader.)
322
323    VALUE_CNT must be the number of struct values per case read
324    from the casereader.
325
326    CASE_CNT is an upper limit on the number of cases that
327    casereader_read will return from the casereader in successive
328    calls.  Ordinarily, this is the actual number of cases in the
329    data source or CASENUMBER_MAX if the number of cases cannot be
330    predicted in advance.
331
332    CLASS and AUX are a set of casereader implementation-specific
333    member functions and auxiliary data to pass to those member
334    functions, respectively. */
335 struct casereader *
336 casereader_create_sequential (const struct taint *taint,
337                               size_t value_cnt, casenumber case_cnt,
338                               const struct casereader_class *class, void *aux)
339 {
340   struct casereader *reader = xmalloc (sizeof *reader);
341   reader->taint = taint != NULL ? taint_clone (taint) : taint_create ();
342   reader->value_cnt = value_cnt;
343   reader->case_cnt = case_cnt;
344   reader->class = class;
345   reader->aux = aux;
346   return reader;
347 }
348
349 /* If READER is a casereader of the given CLASS, returns its
350    associated auxiliary data; otherwise, returns a null pointer.
351
352    This function is intended for use from casereader
353    implementations, not by casereader users.  Even within
354    casereader implementations, its usefulness is quite limited,
355    for at least two reasons.  First, every casereader member
356    function already receives a pointer to the casereader's
357    auxiliary data.  Second, a casereader's class can change
358    (through a call to casereader_swap) and this is in practice
359    quite common (e.g. any call to casereader_clone on a
360    casereader that does not directly support clone will cause the
361    casereader to be replaced by a shim caseader). */
362 void *
363 casereader_dynamic_cast (struct casereader *reader,
364                          const struct casereader_class *class)
365 {
366   return reader->class == class ? reader->aux : NULL;
367 }
368 \f
369 /* Random-access casereader implementation.
370
371    This is a set of wrappers around casereader_create_sequential
372    and struct casereader_class to make it easy to create
373    efficient casereaders for data sources that natively support
374    random access. */
375
376 /* One clone of a random reader. */
377 struct random_reader
378   {
379     struct random_reader_shared *shared; /* Data shared among clones. */
380     struct heap_node heap_node; /* Node in shared data's heap of readers. */
381     casenumber offset;          /* Number of cases already read. */
382   };
383
384 /* Returns the random_reader in which the given heap_node is
385    embedded. */
386 static struct random_reader *
387 random_reader_from_heap_node (const struct heap_node *node)
388 {
389   return heap_data (node, struct random_reader, heap_node);
390 }
391
392 /* Data shared among clones of a random reader. */
393 struct random_reader_shared
394   {
395     struct heap *readers;       /* Heap of struct random_readers. */
396     casenumber min_offset;      /* Smallest offset of any random_reader. */
397     const struct casereader_random_class *class;
398     void *aux;
399   };
400
401 static const struct casereader_class random_reader_casereader_class;
402
403 /* Creates and returns a new random_reader with the given SHARED
404    data and OFFSET.  Inserts the new random reader into the
405    shared heap. */
406 static struct random_reader *
407 make_random_reader (struct random_reader_shared *shared, casenumber offset)
408 {
409   struct random_reader *br = xmalloc (sizeof *br);
410   br->offset = offset;
411   br->shared = shared;
412   heap_insert (shared->readers, &br->heap_node);
413   return br;
414 }
415
416 /* Compares random_readers A and B by offset and returns a
417    strcmp()-like result. */
418 static int
419 compare_random_readers_by_offset (const struct heap_node *a_,
420                                   const struct heap_node *b_,
421                                   const void *aux UNUSED)
422 {
423   const struct random_reader *a = random_reader_from_heap_node (a_);
424   const struct random_reader *b = random_reader_from_heap_node (b_);
425   return a->offset < b->offset ? -1 : a->offset > b->offset;
426 }
427
428 /* Creates and returns a new casereader.  This function is
429    intended for use by casereader implementations, not by
430    casereader clients.
431
432    This function is most suited for creating a casereader for a
433    data source that supports random access.
434    casereader_create_sequential is more appropriate for a data
435    source that is naturally sequential.
436
437    VALUE_CNT must be the number of struct values per case read
438    from the casereader.
439
440    CASE_CNT is an upper limit on the number of cases that
441    casereader_read will return from the casereader in successive
442    calls.  Ordinarily, this is the actual number of cases in the
443    data source or CASENUMBER_MAX if the number of cases cannot be
444    predicted in advance.
445
446    CLASS and AUX are a set of casereader implementation-specific
447    member functions and auxiliary data to pass to those member
448    functions, respectively. */
449 struct casereader *
450 casereader_create_random (size_t value_cnt, casenumber case_cnt,
451                           const struct casereader_random_class *class,
452                           void *aux)
453 {
454   struct random_reader_shared *shared = xmalloc (sizeof *shared);
455   shared->readers = heap_create (compare_random_readers_by_offset, NULL);
456   shared->class = class;
457   shared->aux = aux;
458   shared->min_offset = 0;
459   return casereader_create_sequential (NULL, value_cnt, case_cnt,
460                                        &random_reader_casereader_class,
461                                        make_random_reader (shared, 0));
462 }
463
464 /* Reassesses the min_offset in SHARED based on the minimum
465    offset in the heap.   */
466 static void
467 advance_random_reader (struct casereader *reader,
468                        struct random_reader_shared *shared)
469 {
470   casenumber old, new;
471
472   old = shared->min_offset;
473   new = random_reader_from_heap_node (heap_minimum (shared->readers))->offset;
474   assert (new >= old);
475   if (new > old)
476     {
477       shared->min_offset = new;
478       shared->class->advance (reader, shared->aux, new - old);
479     }
480 }
481
482 /* struct casereader_class "read" function for random reader. */
483 static struct ccase *
484 random_reader_read (struct casereader *reader, void *br_)
485 {
486   struct random_reader *br = br_;
487   struct random_reader_shared *shared = br->shared;
488   struct ccase *c = shared->class->read (reader, shared->aux,
489                                          br->offset - shared->min_offset);
490   if (c != NULL)
491     {
492       br->offset++;
493       heap_changed (shared->readers, &br->heap_node);
494       advance_random_reader (reader, shared);
495     }
496   return c;
497 }
498
499 /* struct casereader_class "destroy" function for random
500    reader. */
501 static void
502 random_reader_destroy (struct casereader *reader, void *br_)
503 {
504   struct random_reader *br = br_;
505   struct random_reader_shared *shared = br->shared;
506
507   heap_delete (shared->readers, &br->heap_node);
508   if (heap_is_empty (shared->readers))
509     {
510       heap_destroy (shared->readers);
511       shared->class->destroy (reader, shared->aux);
512       free (shared);
513     }
514   else
515     advance_random_reader (reader, shared);
516
517   free (br);
518 }
519
520 /* struct casereader_class "clone" function for random reader. */
521 static struct casereader *
522 random_reader_clone (struct casereader *reader, void *br_)
523 {
524   struct random_reader *br = br_;
525   struct random_reader_shared *shared = br->shared;
526   return casereader_create_sequential (casereader_get_taint (reader),
527                                        casereader_get_value_cnt (reader),
528                                        casereader_get_case_cnt (reader),
529                                        &random_reader_casereader_class,
530                                        make_random_reader (shared,
531                                                            br->offset));
532 }
533
534 /* struct casereader_class "peek" function for random reader. */
535 static struct ccase *
536 random_reader_peek (struct casereader *reader, void *br_, casenumber idx)
537 {
538   struct random_reader *br = br_;
539   struct random_reader_shared *shared = br->shared;
540
541   return shared->class->read (reader, shared->aux,
542                               br->offset - shared->min_offset + idx);
543 }
544
545 /* Casereader class for random reader. */
546 static const struct casereader_class random_reader_casereader_class =
547   {
548     random_reader_read,
549     random_reader_destroy,
550     random_reader_clone,
551     random_reader_peek,
552   };
553 \f
554 /* Buffering shim for implementing clone and peek operations.
555
556    The "clone" and "peek" operations aren't implemented by all
557    types of casereaders, but we have to expose a uniform
558    interface anyhow.  We do this by interposing a buffering
559    casereader on top of the existing casereader on the first call
560    to "clone" or "peek".  The buffering casereader maintains a
561    window of cases that spans the positions of the original
562    casereader and all of its clones (the "clone set"), from the
563    position of the casereader that has read the fewest cases to
564    the position of the casereader that has read the most.
565
566    Thus, if all of the casereaders in the clone set are at
567    approximately the same position, only a few cases are buffered
568    and there is little inefficiency.  If, on the other hand, one
569    casereader is not used to read any cases at all, but another
570    one is used to read all of the cases, the entire contents of
571    the casereader is copied into the buffer.  This still might
572    not be so inefficient, given that case data in memory is
573    shared across multiple identical copies, but in the worst case
574    the window implementation will write cases to disk instead of
575    maintaining them in-memory. */
576
577 /* A buffering shim for a non-clonable or non-peekable
578    casereader. */
579 struct shim
580   {
581     struct casewindow *window;          /* Window of buffered cases. */
582     struct casereader *subreader;       /* Subordinate casereader. */
583   };
584
585 static const struct casereader_random_class shim_class;
586
587 /* Interposes a buffering shim atop READER. */
588 static void
589 insert_shim (struct casereader *reader)
590 {
591   size_t value_cnt = casereader_get_value_cnt (reader);
592   casenumber case_cnt = casereader_get_case_cnt (reader);
593   struct shim *b = xmalloc (sizeof *b);
594   b->window = casewindow_create (value_cnt, settings_get_workspace_cases (value_cnt));
595   b->subreader = casereader_create_random (value_cnt, case_cnt,
596                                            &shim_class, b);
597   casereader_swap (reader, b->subreader);
598   taint_propagate (casewindow_get_taint (b->window),
599                    casereader_get_taint (reader));
600   taint_propagate (casereader_get_taint (b->subreader),
601                    casereader_get_taint (reader));
602 }
603
604 /* Ensures that B's window contains at least CASE_CNT cases.
605    Return true if successful, false upon reaching the end of B's
606    subreader or an I/O error. */
607 static bool
608 prime_buffer (struct shim *b, casenumber case_cnt)
609 {
610   while (casewindow_get_case_cnt (b->window) < case_cnt)
611     {
612       struct ccase *tmp = casereader_read (b->subreader);
613       if (tmp == NULL)
614         return false;
615       casewindow_push_head (b->window, tmp);
616     }
617   return true;
618 }
619
620 /* Reads the case at the given 0-based OFFSET from the front of
621    the window into C.  Returns the case if successful, or a null
622    pointer if OFFSET is beyond the end of file or upon I/O error.
623    The caller must call case_unref() on the returned case when it
624    is no longer needed. */
625 static struct ccase *
626 shim_read (struct casereader *reader UNUSED, void *b_,
627            casenumber offset)
628 {
629   struct shim *b = b_;
630   if (!prime_buffer (b, offset + 1))
631     return NULL;
632   return casewindow_get_case (b->window, offset);
633 }
634
635 /* Destroys B. */
636 static void
637 shim_destroy (struct casereader *reader UNUSED, void *b_)
638 {
639   struct shim *b = b_;
640   casewindow_destroy (b->window);
641   casereader_destroy (b->subreader);
642   free (b);
643 }
644
645 /* Discards CNT cases from the front of B's window. */
646 static void
647 shim_advance (struct casereader *reader UNUSED, void *b_, casenumber case_cnt)
648 {
649   struct shim *b = b_;
650   casewindow_pop_tail (b->window, case_cnt);
651 }
652
653 /* Class for the buffered reader. */
654 static const struct casereader_random_class shim_class =
655   {
656     shim_read,
657     shim_destroy,
658     shim_advance,
659   };