3c062d9068b6355ace4f8e3caf005b89cd7c25ad
[pspp-builds.git] / src / data / casereader.c
1 /* PSPP - computes sample statistics.
2    Copyright (C) 2007 Free Software Foundation, Inc.
3
4    This program is free software; you can redistribute it and/or
5    modify it under the terms of the GNU General Public License as
6    published by the Free Software Foundation; either version 2 of the
7    License, or (at your option) any later version.
8
9    This program is distributed in the hope that it will be useful, but
10    WITHOUT ANY WARRANTY; without even the implied warranty of
11    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12    General Public License for more details.
13
14    You should have received a copy of the GNU General Public License
15    along with this program; if not, write to the Free Software
16    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
17    02110-1301, USA. */
18
19 #include <config.h>
20
21 #include <data/casereader.h>
22 #include <data/casereader-provider.h>
23
24 #include <stdlib.h>
25
26 #include <data/casewindow.h>
27 #include <data/casewriter.h>
28 #include <data/settings.h>
29 #include <libpspp/assertion.h>
30 #include <libpspp/heap.h>
31 #include <libpspp/taint.h>
32
33 #include "xalloc.h"
34
35 /* A casereader. */
36 struct casereader
37   {
38     struct taint *taint;                  /* Corrupted? */
39     size_t value_cnt;                     /* Values per case. */
40     casenumber case_cnt;                  /* Number of cases,
41                                              CASENUMBER_MAX if unknown. */
42     const struct casereader_class *class; /* Class. */
43     void *aux;                            /* Auxiliary data for class. */
44   };
45
46 static void insert_shim (struct casereader *);
47
48 /* Creates a new case in C and reads the next case from READER
49    into it.  The caller owns C and must destroy C when its data
50    is no longer needed.  Return true if successful, false when
51    cases have been exhausted or upon detection of an I/O error.
52    In the latter case, C is set to the null case.
53
54    The case returned is effectively consumed: it can never be
55    read again through READER.  If this is inconvenient, READER
56    may be cloned in advance with casereader_clone, or
57    casereader_peek may be used instead. */
58 bool
59 casereader_read (struct casereader *reader, struct ccase *c)
60 {
61   if (reader->case_cnt != 0 && reader->class->read (reader, reader->aux, c))
62     {
63       assert (case_get_value_cnt (c) == reader->value_cnt);
64       if (reader->case_cnt != CASENUMBER_MAX)
65         reader->case_cnt--;
66       return true;
67     }
68   else
69     {
70       reader->case_cnt = 0;
71       case_nullify (c);
72       return false;
73     }
74 }
75
76 /* Destroys READER.
77    Returns false if an I/O error was detected on READER, true
78    otherwise. */
79 bool
80 casereader_destroy (struct casereader *reader)
81 {
82   bool ok = true;
83   if (reader != NULL)
84     {
85       reader->class->destroy (reader, reader->aux);
86       ok = taint_destroy (reader->taint);
87       free (reader);
88     }
89   return ok;
90 }
91
92 /* Returns a clone of READER.  READER and its clone may be used
93    to read the same sequence of cases in the same order, barring
94    I/O errors. */
95 struct casereader *
96 casereader_clone (const struct casereader *reader_)
97 {
98   struct casereader *reader = (struct casereader *) reader_;
99   struct casereader *clone;
100   if ( reader == NULL ) 
101     return NULL;
102
103   if (reader->class->clone == NULL)
104     insert_shim (reader);
105   clone = reader->class->clone (reader, reader->aux);
106   assert (clone != NULL);
107   assert (clone != reader);
108   return clone;
109 }
110
111 /* Makes a copy of ORIGINAL into *NEW1 (if NEW1 is non-null) and
112    *NEW2 (if NEW2 is non-null), then destroys ORIGINAL. */
113 void
114 casereader_split (struct casereader *original,
115                   struct casereader **new1, struct casereader **new2)
116 {
117   if (new1 != NULL && new2 != NULL)
118     {
119       *new1 = casereader_rename (original);
120       *new2 = casereader_clone (*new1);
121     }
122   else if (new1 != NULL)
123     *new1 = casereader_rename (original);
124   else if (new2 != NULL)
125     *new2 = casereader_rename (original);
126   else
127     casereader_destroy (original);
128 }
129
130 /* Returns a copy of READER, which is itself destroyed.
131    Useful for taking over ownership of a casereader, to enforce
132    preventing the original owner from accessing the casereader
133    again. */
134 struct casereader *
135 casereader_rename (struct casereader *reader)
136 {
137   struct casereader *new = xmemdup (reader, sizeof *reader);
138   free (reader);
139   return new;
140 }
141
142 /* Exchanges the casereaders referred to by A and B. */
143 void
144 casereader_swap (struct casereader *a, struct casereader *b)
145 {
146   if (a != b)
147     {
148       struct casereader tmp = *a;
149       *a = *b;
150       *b = tmp;
151     }
152 }
153
154 /* Creates a new case in C and reads the (IDX + 1)'th case from
155    READER into it.  The caller owns C and must destroy C when its
156    data is no longer needed.  Return true if successful, false
157    when cases have been exhausted or upon detection of an I/O
158    error.  In the latter case, C is set to the null case. */
159 bool
160 casereader_peek (struct casereader *reader, casenumber idx, struct ccase *c)
161 {
162   if (idx < reader->case_cnt)
163     {
164       if (reader->class->peek == NULL)
165         insert_shim (reader);
166       if (reader->class->peek (reader, reader->aux, idx, c))
167         return true;
168       else if (casereader_error (reader))
169         reader->case_cnt = 0;
170     }
171   if (reader->case_cnt > idx)
172     reader->case_cnt = idx;
173   case_nullify (c);
174   return false;
175 }
176
177 /* Returns true if an I/O error or another hard error has
178    occurred on READER, a clone of READER, or on some object on
179    which READER's data has a dependency, false otherwise. */
180 bool
181 casereader_error (const struct casereader *reader)
182 {
183   return taint_is_tainted (reader->taint);
184 }
185
186 /* Marks READER as having encountered an error.
187
188    Ordinarily, this function should be called by the
189    implementation of a casereader, not by the casereader's
190    client.  Instead, casereader clients should usually ensure
191    that a casereader's error state is correct by using
192    taint_propagate to propagate to the casereader's taint
193    structure, which may be obtained via casereader_get_taint. */
194 void
195 casereader_force_error (struct casereader *reader)
196 {
197   taint_set_taint (reader->taint);
198 }
199
200 /* Returns READER's associate taint object, for use with
201    taint_propagate and other taint functions. */
202 const struct taint *
203 casereader_get_taint (const struct casereader *reader)
204 {
205   return reader->taint;
206 }
207
208 /* Returns the number of cases that will be read by successive
209    calls to casereader_read for READER, assuming that no errors
210    occur.  Upon an error condition, the case count drops to 0, so
211    that no more cases can be obtained.
212
213    Not all casereaders can predict the number of cases that they
214    will produce without actually reading all of them.  In that
215    case, this function returns CASENUMBER_MAX.  To obtain the
216    actual number of cases in such a casereader, use
217    casereader_count_cases. */
218 casenumber
219 casereader_get_case_cnt (struct casereader *reader)
220 {
221   return reader->case_cnt;
222 }
223
224 /* Returns the number of cases that will be read by successive
225    calls to casereader_read for READER, assuming that no errors
226    occur.  Upon an error condition, the case count drops to 0, so
227    that no more cases can be obtained.
228
229    For a casereader that cannot predict the number of cases it
230    will produce, this function actually reads (and discards) all
231    of the contents of a clone of READER.  Thus, the return value
232    is always correct in the absence of I/O errors. */
233 casenumber
234 casereader_count_cases (struct casereader *reader)
235 {
236   if (reader->case_cnt == CASENUMBER_MAX)
237     {
238       casenumber n_cases = 0;
239       struct ccase c;
240
241       struct casereader *clone = casereader_clone (reader);
242
243       for (; casereader_read (clone, &c); case_destroy (&c))
244         n_cases++;
245
246       casereader_destroy (clone);
247       reader->case_cnt = n_cases;
248     }
249
250   return reader->case_cnt;
251 }
252
253 /* Returns the number of struct values in each case in READER. */
254 size_t
255 casereader_get_value_cnt (struct casereader *reader)
256 {
257   return reader->value_cnt;
258 }
259
260 /* Copies all the cases in READER to WRITER, propagating errors
261    appropriately. */
262 void
263 casereader_transfer (struct casereader *reader, struct casewriter *writer)
264 {
265   struct ccase c;
266
267   taint_propagate (casereader_get_taint (reader),
268                    casewriter_get_taint (writer));
269   while (casereader_read (reader, &c))
270     casewriter_write (writer, &c);
271   casereader_destroy (reader);
272 }
273
274 /* Creates and returns a new casereader.  This function is
275    intended for use by casereader implementations, not by
276    casereader clients.
277
278    This function is most suited for creating a casereader for a
279    data source that is naturally sequential.
280    casereader_create_random may be more appropriate for a data
281    source that supports random access.
282
283    Ordinarily, specify a null pointer for TAINT, in which case
284    the new casereader will have a new, unique taint object.  If
285    the new casereader should have a clone of an existing taint
286    object, specify that object as TAINT.  (This is most commonly
287    useful in an implementation of the "clone" casereader_class
288    function, in which case the cloned casereader should have the
289    same taint object as the original casereader.)
290
291    VALUE_CNT must be the number of struct values per case read
292    from the casereader.
293
294    CASE_CNT is an upper limit on the number of cases that
295    casereader_read will return from the casereader in successive
296    calls.  Ordinarily, this is the actual number of cases in the
297    data source or CASENUMBER_MAX if the number of cases cannot be
298    predicted in advance.
299
300    CLASS and AUX are a set of casereader implementation-specific
301    member functions and auxiliary data to pass to those member
302    functions, respectively. */
303 struct casereader *
304 casereader_create_sequential (const struct taint *taint,
305                               size_t value_cnt, casenumber case_cnt,
306                               const struct casereader_class *class, void *aux)
307 {
308   struct casereader *reader = xmalloc (sizeof *reader);
309   reader->taint = taint != NULL ? taint_clone (taint) : taint_create ();
310   reader->value_cnt = value_cnt;
311   reader->case_cnt = case_cnt;
312   reader->class = class;
313   reader->aux = aux;
314   return reader;
315 }
316 \f
317 /* Random-access casereader implementation.
318
319    This is a set of wrappers around casereader_create_sequential
320    and struct casereader_class to make it easy to create
321    efficient casereaders for data sources that natively support
322    random access. */
323
324 /* One clone of a random reader. */
325 struct random_reader
326   {
327     struct random_reader_shared *shared; /* Data shared among clones. */
328     struct heap_node heap_node; /* Node in shared data's heap of readers. */
329     casenumber offset;          /* Number of cases already read. */
330   };
331
332 /* Returns the random_reader in which the given heap_node is
333    embedded. */
334 static struct random_reader *
335 random_reader_from_heap_node (const struct heap_node *node)
336 {
337   return heap_data (node, struct random_reader, heap_node);
338 }
339
340 /* Data shared among clones of a random reader. */
341 struct random_reader_shared
342   {
343     struct heap *readers;       /* Heap of struct random_readers. */
344     casenumber min_offset;      /* Smallest offset of any random_reader. */
345     const struct casereader_random_class *class;
346     void *aux;
347   };
348
349 static struct casereader_class random_reader_casereader_class;
350
351 /* Creates and returns a new random_reader with the given SHARED
352    data and OFFSET.  Inserts the new random reader into the
353    shared heap. */
354 static struct random_reader *
355 make_random_reader (struct random_reader_shared *shared, casenumber offset)
356 {
357   struct random_reader *br = xmalloc (sizeof *br);
358   br->offset = offset;
359   br->shared = shared;
360   heap_insert (shared->readers, &br->heap_node);
361   return br;
362 }
363
364 /* Compares random_readers A and B by offset and returns a
365    strcmp()-like result. */
366 static int
367 compare_random_readers_by_offset (const struct heap_node *a_,
368                                   const struct heap_node *b_,
369                                   const void *aux UNUSED)
370 {
371   const struct random_reader *a = random_reader_from_heap_node (a_);
372   const struct random_reader *b = random_reader_from_heap_node (b_);
373   return a->offset < b->offset ? -1 : a->offset > b->offset;
374 }
375
376 /* Creates and returns a new casereader.  This function is
377    intended for use by casereader implementations, not by
378    casereader clients.
379
380    This function is most suited for creating a casereader for a
381    data source that supports random access.
382    casereader_create_sequential is more appropriate for a data
383    source that is naturally sequential.
384
385    VALUE_CNT must be the number of struct values per case read
386    from the casereader.
387
388    CASE_CNT is an upper limit on the number of cases that
389    casereader_read will return from the casereader in successive
390    calls.  Ordinarily, this is the actual number of cases in the
391    data source or CASENUMBER_MAX if the number of cases cannot be
392    predicted in advance.
393
394    CLASS and AUX are a set of casereader implementation-specific
395    member functions and auxiliary data to pass to those member
396    functions, respectively. */
397 struct casereader *
398 casereader_create_random (size_t value_cnt, casenumber case_cnt,
399                           const struct casereader_random_class *class,
400                           void *aux)
401 {
402   struct random_reader_shared *shared = xmalloc (sizeof *shared);
403   shared->readers = heap_create (compare_random_readers_by_offset, NULL);
404   shared->class = class;
405   shared->aux = aux;
406   shared->min_offset = 0;
407   return casereader_create_sequential (NULL, value_cnt, case_cnt,
408                                        &random_reader_casereader_class,
409                                        make_random_reader (shared, 0));
410 }
411
412 /* Reassesses the min_offset in SHARED based on the minimum
413    offset in the heap.   */
414 static void
415 advance_random_reader (struct casereader *reader,
416                        struct random_reader_shared *shared)
417 {
418   casenumber old, new;
419
420   old = shared->min_offset;
421   new = random_reader_from_heap_node (heap_minimum (shared->readers))->offset;
422   assert (new >= old);
423   if (new > old)
424     {
425       shared->min_offset = new;
426       shared->class->advance (reader, shared->aux, new - old);
427     }
428 }
429
430 /* struct casereader_class "read" function for random reader. */
431 static bool
432 random_reader_read (struct casereader *reader, void *br_, struct ccase *c)
433 {
434   struct random_reader *br = br_;
435   struct random_reader_shared *shared = br->shared;
436
437   if (shared->class->read (reader, shared->aux,
438                            br->offset - shared->min_offset, c))
439     {
440       br->offset++;
441       heap_changed (shared->readers, &br->heap_node);
442       advance_random_reader (reader, shared);
443       return true;
444     }
445   else
446     return false;
447 }
448
449 /* struct casereader_class "destroy" function for random
450    reader. */
451 static void
452 random_reader_destroy (struct casereader *reader, void *br_)
453 {
454   struct random_reader *br = br_;
455   struct random_reader_shared *shared = br->shared;
456
457   heap_delete (shared->readers, &br->heap_node);
458   if (heap_is_empty (shared->readers))
459     {
460       heap_destroy (shared->readers);
461       shared->class->destroy (reader, shared->aux);
462       free (shared);
463     }
464   else
465     advance_random_reader (reader, shared);
466
467   free (br);
468 }
469
470 /* struct casereader_class "clone" function for random reader. */
471 static struct casereader *
472 random_reader_clone (struct casereader *reader, void *br_)
473 {
474   struct random_reader *br = br_;
475   struct random_reader_shared *shared = br->shared;
476   return casereader_create_sequential (casereader_get_taint (reader),
477                                        casereader_get_value_cnt (reader),
478                                        casereader_get_case_cnt (reader),
479                                        &random_reader_casereader_class,
480                                        make_random_reader (shared,
481                                                            br->offset));
482 }
483
484 /* struct casereader_class "peek" function for random reader. */
485 static bool
486 random_reader_peek (struct casereader *reader, void *br_,
487                     casenumber idx, struct ccase *c)
488 {
489   struct random_reader *br = br_;
490   struct random_reader_shared *shared = br->shared;
491
492   return shared->class->read (reader, shared->aux,
493                               br->offset - shared->min_offset + idx, c);
494 }
495
496 /* Casereader class for random reader. */
497 static struct casereader_class random_reader_casereader_class =
498   {
499     random_reader_read,
500     random_reader_destroy,
501     random_reader_clone,
502     random_reader_peek,
503   };
504 \f
505 /* Buffering shim for implementing clone and peek operations.
506
507    The "clone" and "peek" operations aren't implemented by all
508    types of casereaders, but we have to expose a uniform
509    interface anyhow.  We do this by interposing a buffering
510    casereader on top of the existing casereader on the first call
511    to "clone" or "peek".  The buffering casereader maintains a
512    window of cases that spans the positions of the original
513    casereader and all of its clones (the "clone set"), from the
514    position of the casereader that has read the fewest cases to
515    the position of the casereader that has read the most.
516
517    Thus, if all of the casereaders in the clone set are at
518    approximately the same position, only a few cases are buffered
519    and there is little inefficiency.  If, on the other hand, one
520    casereader is not used to read any cases at all, but another
521    one is used to read all of the cases, the entire contents of
522    the casereader is copied into the buffer.  This still might
523    not be so inefficient, given that case data in memory is
524    shared across multiple identical copies, but in the worst case
525    the window implementation will write cases to disk instead of
526    maintaining them in-memory. */
527
528 /* A buffering shim for a non-clonable or non-peekable
529    casereader. */
530 struct shim
531   {
532     struct casewindow *window;          /* Window of buffered cases. */
533     struct casereader *subreader;       /* Subordinate casereader. */
534   };
535
536 static struct casereader_random_class shim_class;
537
538 /* Interposes a buffering shim atop READER. */
539 static void
540 insert_shim (struct casereader *reader)
541 {
542   size_t value_cnt = casereader_get_value_cnt (reader);
543   casenumber case_cnt = casereader_get_case_cnt (reader);
544   struct shim *b = xmalloc (sizeof *b);
545   b->window = casewindow_create (value_cnt, get_workspace_cases (value_cnt));
546   b->subreader = casereader_create_random (value_cnt, case_cnt,
547                                            &shim_class, b);
548   casereader_swap (reader, b->subreader);
549   taint_propagate (casewindow_get_taint (b->window),
550                    casereader_get_taint (reader));
551   taint_propagate (casereader_get_taint (b->subreader),
552                    casereader_get_taint (reader));
553 }
554
555 /* Ensures that B's window contains at least CASE_CNT cases.
556    Return true if successful, false upon reaching the end of B's
557    subreader or an I/O error. */
558 static bool
559 prime_buffer (struct shim *b, casenumber case_cnt)
560 {
561   while (casewindow_get_case_cnt (b->window) < case_cnt)
562     {
563       struct ccase tmp;
564       if (!casereader_read (b->subreader, &tmp))
565         return false;
566       casewindow_push_head (b->window, &tmp);
567     }
568   return true;
569 }
570
571 /* Reads the case at the given 0-based OFFSET from the front of
572    the window into C.  Returns true if successful, false if
573    OFFSET is beyond the end of file or upon I/O error. */
574 static bool
575 shim_read (struct casereader *reader UNUSED, void *b_,
576            casenumber offset, struct ccase *c)
577 {
578   struct shim *b = b_;
579   return (prime_buffer (b, offset + 1)
580           && casewindow_get_case (b->window, offset, c));
581 }
582
583 /* Destroys B. */
584 static void
585 shim_destroy (struct casereader *reader UNUSED, void *b_)
586 {
587   struct shim *b = b_;
588   casewindow_destroy (b->window);
589   casereader_destroy (b->subreader);
590   free (b);
591 }
592
593 /* Discards CNT cases from the front of B's window. */
594 static void
595 shim_advance (struct casereader *reader UNUSED, void *b_, casenumber case_cnt)
596 {
597   struct shim *b = b_;
598   casewindow_pop_tail (b->window, case_cnt);
599 }
600
601 /* Class for the buffered reader. */
602 static struct casereader_random_class shim_class =
603   {
604     shim_read,
605     shim_destroy,
606     shim_advance,
607   };