Change license from GPLv2+ to GPLv3+.
[pspp-builds.git] / src / data / casereader.c
1 /* PSPP - a program for statistical analysis.
2    Copyright (C) 2007 Free Software Foundation, Inc.
3
4    This program is free software: you can redistribute it and/or modify
5    it under the terms of the GNU General Public License as published by
6    the Free Software Foundation, either version 3 of the License, or
7    (at your option) any later version.
8
9    This program is distributed in the hope that it will be useful,
10    but WITHOUT ANY WARRANTY; without even the implied warranty of
11    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12    GNU General Public License for more details.
13
14    You should have received a copy of the GNU General Public License
15    along with this program.  If not, see <http://www.gnu.org/licenses/>. */
16
17 #include <config.h>
18
19 #include <data/casereader.h>
20 #include <data/casereader-provider.h>
21
22 #include <stdlib.h>
23
24 #include <data/casewindow.h>
25 #include <data/casewriter.h>
26 #include <data/settings.h>
27 #include <libpspp/assertion.h>
28 #include <libpspp/heap.h>
29 #include <libpspp/taint.h>
30
31 #include "xalloc.h"
32
33 /* A casereader. */
34 struct casereader
35   {
36     struct taint *taint;                  /* Corrupted? */
37     size_t value_cnt;                     /* Values per case. */
38     casenumber case_cnt;                  /* Number of cases,
39                                              CASENUMBER_MAX if unknown. */
40     const struct casereader_class *class; /* Class. */
41     void *aux;                            /* Auxiliary data for class. */
42   };
43
44 static void insert_shim (struct casereader *);
45
46 /* Creates a new case in C and reads the next case from READER
47    into it.  The caller owns C and must destroy C when its data
48    is no longer needed.  Return true if successful, false when
49    cases have been exhausted or upon detection of an I/O error.
50    In the latter case, C is set to the null case.
51
52    The case returned is effectively consumed: it can never be
53    read again through READER.  If this is inconvenient, READER
54    may be cloned in advance with casereader_clone, or
55    casereader_peek may be used instead. */
56 bool
57 casereader_read (struct casereader *reader, struct ccase *c)
58 {
59   if (reader->case_cnt != 0 && reader->class->read (reader, reader->aux, c))
60     {
61       assert (case_get_value_cnt (c) == reader->value_cnt);
62       if (reader->case_cnt != CASENUMBER_MAX)
63         reader->case_cnt--;
64       return true;
65     }
66   else
67     {
68       reader->case_cnt = 0;
69       case_nullify (c);
70       return false;
71     }
72 }
73
74 /* Destroys READER.
75    Returns false if an I/O error was detected on READER, true
76    otherwise. */
77 bool
78 casereader_destroy (struct casereader *reader)
79 {
80   bool ok = true;
81   if (reader != NULL)
82     {
83       reader->class->destroy (reader, reader->aux);
84       ok = taint_destroy (reader->taint);
85       free (reader);
86     }
87   return ok;
88 }
89
90 /* Returns a clone of READER.  READER and its clone may be used
91    to read the same sequence of cases in the same order, barring
92    I/O errors. */
93 struct casereader *
94 casereader_clone (const struct casereader *reader_)
95 {
96   struct casereader *reader = (struct casereader *) reader_;
97   struct casereader *clone;
98   if ( reader == NULL ) 
99     return NULL;
100
101   if (reader->class->clone == NULL)
102     insert_shim (reader);
103   clone = reader->class->clone (reader, reader->aux);
104   assert (clone != NULL);
105   assert (clone != reader);
106   return clone;
107 }
108
109 /* Makes a copy of ORIGINAL into *NEW1 (if NEW1 is non-null) and
110    *NEW2 (if NEW2 is non-null), then destroys ORIGINAL. */
111 void
112 casereader_split (struct casereader *original,
113                   struct casereader **new1, struct casereader **new2)
114 {
115   if (new1 != NULL && new2 != NULL)
116     {
117       *new1 = casereader_rename (original);
118       *new2 = casereader_clone (*new1);
119     }
120   else if (new1 != NULL)
121     *new1 = casereader_rename (original);
122   else if (new2 != NULL)
123     *new2 = casereader_rename (original);
124   else
125     casereader_destroy (original);
126 }
127
128 /* Returns a copy of READER, which is itself destroyed.
129    Useful for taking over ownership of a casereader, to enforce
130    preventing the original owner from accessing the casereader
131    again. */
132 struct casereader *
133 casereader_rename (struct casereader *reader)
134 {
135   struct casereader *new = xmemdup (reader, sizeof *reader);
136   free (reader);
137   return new;
138 }
139
140 /* Exchanges the casereaders referred to by A and B. */
141 void
142 casereader_swap (struct casereader *a, struct casereader *b)
143 {
144   if (a != b)
145     {
146       struct casereader tmp = *a;
147       *a = *b;
148       *b = tmp;
149     }
150 }
151
152 /* Creates a new case in C and reads the (IDX + 1)'th case from
153    READER into it.  The caller owns C and must destroy C when its
154    data is no longer needed.  Return true if successful, false
155    when cases have been exhausted or upon detection of an I/O
156    error.  In the latter case, C is set to the null case. */
157 bool
158 casereader_peek (struct casereader *reader, casenumber idx, struct ccase *c)
159 {
160   if (idx < reader->case_cnt)
161     {
162       if (reader->class->peek == NULL)
163         insert_shim (reader);
164       if (reader->class->peek (reader, reader->aux, idx, c))
165         return true;
166       else if (casereader_error (reader))
167         reader->case_cnt = 0;
168     }
169   if (reader->case_cnt > idx)
170     reader->case_cnt = idx;
171   case_nullify (c);
172   return false;
173 }
174
175 /* Returns true if an I/O error or another hard error has
176    occurred on READER, a clone of READER, or on some object on
177    which READER's data has a dependency, false otherwise. */
178 bool
179 casereader_error (const struct casereader *reader)
180 {
181   return taint_is_tainted (reader->taint);
182 }
183
184 /* Marks READER as having encountered an error.
185
186    Ordinarily, this function should be called by the
187    implementation of a casereader, not by the casereader's
188    client.  Instead, casereader clients should usually ensure
189    that a casereader's error state is correct by using
190    taint_propagate to propagate to the casereader's taint
191    structure, which may be obtained via casereader_get_taint. */
192 void
193 casereader_force_error (struct casereader *reader)
194 {
195   taint_set_taint (reader->taint);
196 }
197
198 /* Returns READER's associate taint object, for use with
199    taint_propagate and other taint functions. */
200 const struct taint *
201 casereader_get_taint (const struct casereader *reader)
202 {
203   return reader->taint;
204 }
205
206 /* Returns the number of cases that will be read by successive
207    calls to casereader_read for READER, assuming that no errors
208    occur.  Upon an error condition, the case count drops to 0, so
209    that no more cases can be obtained.
210
211    Not all casereaders can predict the number of cases that they
212    will produce without actually reading all of them.  In that
213    case, this function returns CASENUMBER_MAX.  To obtain the
214    actual number of cases in such a casereader, use
215    casereader_count_cases. */
216 casenumber
217 casereader_get_case_cnt (struct casereader *reader)
218 {
219   return reader->case_cnt;
220 }
221
222 /* Returns the number of cases that will be read by successive
223    calls to casereader_read for READER, assuming that no errors
224    occur.  Upon an error condition, the case count drops to 0, so
225    that no more cases can be obtained.
226
227    For a casereader that cannot predict the number of cases it
228    will produce, this function actually reads (and discards) all
229    of the contents of a clone of READER.  Thus, the return value
230    is always correct in the absence of I/O errors. */
231 casenumber
232 casereader_count_cases (struct casereader *reader)
233 {
234   if (reader->case_cnt == CASENUMBER_MAX)
235     {
236       casenumber n_cases = 0;
237       struct ccase c;
238
239       struct casereader *clone = casereader_clone (reader);
240
241       for (; casereader_read (clone, &c); case_destroy (&c))
242         n_cases++;
243
244       casereader_destroy (clone);
245       reader->case_cnt = n_cases;
246     }
247
248   return reader->case_cnt;
249 }
250
251 /* Returns the number of struct values in each case in READER. */
252 size_t
253 casereader_get_value_cnt (struct casereader *reader)
254 {
255   return reader->value_cnt;
256 }
257
258 /* Copies all the cases in READER to WRITER, propagating errors
259    appropriately. */
260 void
261 casereader_transfer (struct casereader *reader, struct casewriter *writer)
262 {
263   struct ccase c;
264
265   taint_propagate (casereader_get_taint (reader),
266                    casewriter_get_taint (writer));
267   while (casereader_read (reader, &c))
268     casewriter_write (writer, &c);
269   casereader_destroy (reader);
270 }
271
272 /* Creates and returns a new casereader.  This function is
273    intended for use by casereader implementations, not by
274    casereader clients.
275
276    This function is most suited for creating a casereader for a
277    data source that is naturally sequential.
278    casereader_create_random may be more appropriate for a data
279    source that supports random access.
280
281    Ordinarily, specify a null pointer for TAINT, in which case
282    the new casereader will have a new, unique taint object.  If
283    the new casereader should have a clone of an existing taint
284    object, specify that object as TAINT.  (This is most commonly
285    useful in an implementation of the "clone" casereader_class
286    function, in which case the cloned casereader should have the
287    same taint object as the original casereader.)
288
289    VALUE_CNT must be the number of struct values per case read
290    from the casereader.
291
292    CASE_CNT is an upper limit on the number of cases that
293    casereader_read will return from the casereader in successive
294    calls.  Ordinarily, this is the actual number of cases in the
295    data source or CASENUMBER_MAX if the number of cases cannot be
296    predicted in advance.
297
298    CLASS and AUX are a set of casereader implementation-specific
299    member functions and auxiliary data to pass to those member
300    functions, respectively. */
301 struct casereader *
302 casereader_create_sequential (const struct taint *taint,
303                               size_t value_cnt, casenumber case_cnt,
304                               const struct casereader_class *class, void *aux)
305 {
306   struct casereader *reader = xmalloc (sizeof *reader);
307   reader->taint = taint != NULL ? taint_clone (taint) : taint_create ();
308   reader->value_cnt = value_cnt;
309   reader->case_cnt = case_cnt;
310   reader->class = class;
311   reader->aux = aux;
312   return reader;
313 }
314 \f
315 /* Random-access casereader implementation.
316
317    This is a set of wrappers around casereader_create_sequential
318    and struct casereader_class to make it easy to create
319    efficient casereaders for data sources that natively support
320    random access. */
321
322 /* One clone of a random reader. */
323 struct random_reader
324   {
325     struct random_reader_shared *shared; /* Data shared among clones. */
326     struct heap_node heap_node; /* Node in shared data's heap of readers. */
327     casenumber offset;          /* Number of cases already read. */
328   };
329
330 /* Returns the random_reader in which the given heap_node is
331    embedded. */
332 static struct random_reader *
333 random_reader_from_heap_node (const struct heap_node *node)
334 {
335   return heap_data (node, struct random_reader, heap_node);
336 }
337
338 /* Data shared among clones of a random reader. */
339 struct random_reader_shared
340   {
341     struct heap *readers;       /* Heap of struct random_readers. */
342     casenumber min_offset;      /* Smallest offset of any random_reader. */
343     const struct casereader_random_class *class;
344     void *aux;
345   };
346
347 static struct casereader_class random_reader_casereader_class;
348
349 /* Creates and returns a new random_reader with the given SHARED
350    data and OFFSET.  Inserts the new random reader into the
351    shared heap. */
352 static struct random_reader *
353 make_random_reader (struct random_reader_shared *shared, casenumber offset)
354 {
355   struct random_reader *br = xmalloc (sizeof *br);
356   br->offset = offset;
357   br->shared = shared;
358   heap_insert (shared->readers, &br->heap_node);
359   return br;
360 }
361
362 /* Compares random_readers A and B by offset and returns a
363    strcmp()-like result. */
364 static int
365 compare_random_readers_by_offset (const struct heap_node *a_,
366                                   const struct heap_node *b_,
367                                   const void *aux UNUSED)
368 {
369   const struct random_reader *a = random_reader_from_heap_node (a_);
370   const struct random_reader *b = random_reader_from_heap_node (b_);
371   return a->offset < b->offset ? -1 : a->offset > b->offset;
372 }
373
374 /* Creates and returns a new casereader.  This function is
375    intended for use by casereader implementations, not by
376    casereader clients.
377
378    This function is most suited for creating a casereader for a
379    data source that supports random access.
380    casereader_create_sequential is more appropriate for a data
381    source that is naturally sequential.
382
383    VALUE_CNT must be the number of struct values per case read
384    from the casereader.
385
386    CASE_CNT is an upper limit on the number of cases that
387    casereader_read will return from the casereader in successive
388    calls.  Ordinarily, this is the actual number of cases in the
389    data source or CASENUMBER_MAX if the number of cases cannot be
390    predicted in advance.
391
392    CLASS and AUX are a set of casereader implementation-specific
393    member functions and auxiliary data to pass to those member
394    functions, respectively. */
395 struct casereader *
396 casereader_create_random (size_t value_cnt, casenumber case_cnt,
397                           const struct casereader_random_class *class,
398                           void *aux)
399 {
400   struct random_reader_shared *shared = xmalloc (sizeof *shared);
401   shared->readers = heap_create (compare_random_readers_by_offset, NULL);
402   shared->class = class;
403   shared->aux = aux;
404   shared->min_offset = 0;
405   return casereader_create_sequential (NULL, value_cnt, case_cnt,
406                                        &random_reader_casereader_class,
407                                        make_random_reader (shared, 0));
408 }
409
410 /* Reassesses the min_offset in SHARED based on the minimum
411    offset in the heap.   */
412 static void
413 advance_random_reader (struct casereader *reader,
414                        struct random_reader_shared *shared)
415 {
416   casenumber old, new;
417
418   old = shared->min_offset;
419   new = random_reader_from_heap_node (heap_minimum (shared->readers))->offset;
420   assert (new >= old);
421   if (new > old)
422     {
423       shared->min_offset = new;
424       shared->class->advance (reader, shared->aux, new - old);
425     }
426 }
427
428 /* struct casereader_class "read" function for random reader. */
429 static bool
430 random_reader_read (struct casereader *reader, void *br_, struct ccase *c)
431 {
432   struct random_reader *br = br_;
433   struct random_reader_shared *shared = br->shared;
434
435   if (shared->class->read (reader, shared->aux,
436                            br->offset - shared->min_offset, c))
437     {
438       br->offset++;
439       heap_changed (shared->readers, &br->heap_node);
440       advance_random_reader (reader, shared);
441       return true;
442     }
443   else
444     return false;
445 }
446
447 /* struct casereader_class "destroy" function for random
448    reader. */
449 static void
450 random_reader_destroy (struct casereader *reader, void *br_)
451 {
452   struct random_reader *br = br_;
453   struct random_reader_shared *shared = br->shared;
454
455   heap_delete (shared->readers, &br->heap_node);
456   if (heap_is_empty (shared->readers))
457     {
458       heap_destroy (shared->readers);
459       shared->class->destroy (reader, shared->aux);
460       free (shared);
461     }
462   else
463     advance_random_reader (reader, shared);
464
465   free (br);
466 }
467
468 /* struct casereader_class "clone" function for random reader. */
469 static struct casereader *
470 random_reader_clone (struct casereader *reader, void *br_)
471 {
472   struct random_reader *br = br_;
473   struct random_reader_shared *shared = br->shared;
474   return casereader_create_sequential (casereader_get_taint (reader),
475                                        casereader_get_value_cnt (reader),
476                                        casereader_get_case_cnt (reader),
477                                        &random_reader_casereader_class,
478                                        make_random_reader (shared,
479                                                            br->offset));
480 }
481
482 /* struct casereader_class "peek" function for random reader. */
483 static bool
484 random_reader_peek (struct casereader *reader, void *br_,
485                     casenumber idx, struct ccase *c)
486 {
487   struct random_reader *br = br_;
488   struct random_reader_shared *shared = br->shared;
489
490   return shared->class->read (reader, shared->aux,
491                               br->offset - shared->min_offset + idx, c);
492 }
493
494 /* Casereader class for random reader. */
495 static struct casereader_class random_reader_casereader_class =
496   {
497     random_reader_read,
498     random_reader_destroy,
499     random_reader_clone,
500     random_reader_peek,
501   };
502 \f
503 /* Buffering shim for implementing clone and peek operations.
504
505    The "clone" and "peek" operations aren't implemented by all
506    types of casereaders, but we have to expose a uniform
507    interface anyhow.  We do this by interposing a buffering
508    casereader on top of the existing casereader on the first call
509    to "clone" or "peek".  The buffering casereader maintains a
510    window of cases that spans the positions of the original
511    casereader and all of its clones (the "clone set"), from the
512    position of the casereader that has read the fewest cases to
513    the position of the casereader that has read the most.
514
515    Thus, if all of the casereaders in the clone set are at
516    approximately the same position, only a few cases are buffered
517    and there is little inefficiency.  If, on the other hand, one
518    casereader is not used to read any cases at all, but another
519    one is used to read all of the cases, the entire contents of
520    the casereader is copied into the buffer.  This still might
521    not be so inefficient, given that case data in memory is
522    shared across multiple identical copies, but in the worst case
523    the window implementation will write cases to disk instead of
524    maintaining them in-memory. */
525
526 /* A buffering shim for a non-clonable or non-peekable
527    casereader. */
528 struct shim
529   {
530     struct casewindow *window;          /* Window of buffered cases. */
531     struct casereader *subreader;       /* Subordinate casereader. */
532   };
533
534 static struct casereader_random_class shim_class;
535
536 /* Interposes a buffering shim atop READER. */
537 static void
538 insert_shim (struct casereader *reader)
539 {
540   size_t value_cnt = casereader_get_value_cnt (reader);
541   casenumber case_cnt = casereader_get_case_cnt (reader);
542   struct shim *b = xmalloc (sizeof *b);
543   b->window = casewindow_create (value_cnt, get_workspace_cases (value_cnt));
544   b->subreader = casereader_create_random (value_cnt, case_cnt,
545                                            &shim_class, b);
546   casereader_swap (reader, b->subreader);
547   taint_propagate (casewindow_get_taint (b->window),
548                    casereader_get_taint (reader));
549   taint_propagate (casereader_get_taint (b->subreader),
550                    casereader_get_taint (reader));
551 }
552
553 /* Ensures that B's window contains at least CASE_CNT cases.
554    Return true if successful, false upon reaching the end of B's
555    subreader or an I/O error. */
556 static bool
557 prime_buffer (struct shim *b, casenumber case_cnt)
558 {
559   while (casewindow_get_case_cnt (b->window) < case_cnt)
560     {
561       struct ccase tmp;
562       if (!casereader_read (b->subreader, &tmp))
563         return false;
564       casewindow_push_head (b->window, &tmp);
565     }
566   return true;
567 }
568
569 /* Reads the case at the given 0-based OFFSET from the front of
570    the window into C.  Returns true if successful, false if
571    OFFSET is beyond the end of file or upon I/O error. */
572 static bool
573 shim_read (struct casereader *reader UNUSED, void *b_,
574            casenumber offset, struct ccase *c)
575 {
576   struct shim *b = b_;
577   return (prime_buffer (b, offset + 1)
578           && casewindow_get_case (b->window, offset, c));
579 }
580
581 /* Destroys B. */
582 static void
583 shim_destroy (struct casereader *reader UNUSED, void *b_)
584 {
585   struct shim *b = b_;
586   casewindow_destroy (b->window);
587   casereader_destroy (b->subreader);
588   free (b);
589 }
590
591 /* Discards CNT cases from the front of B's window. */
592 static void
593 shim_advance (struct casereader *reader UNUSED, void *b_, casenumber case_cnt)
594 {
595   struct shim *b = b_;
596   casewindow_pop_tail (b->window, case_cnt);
597 }
598
599 /* Class for the buffered reader. */
600 static struct casereader_random_class shim_class =
601   {
602     shim_read,
603     shim_destroy,
604     shim_advance,
605   };