Change "union value" to dynamically allocate long strings.
[pspp-builds.git] / src / data / casereader.c
1 /* PSPP - a program for statistical analysis.
2    Copyright (C) 2007, 2009 Free Software Foundation, Inc.
3
4    This program is free software: you can redistribute it and/or modify
5    it under the terms of the GNU General Public License as published by
6    the Free Software Foundation, either version 3 of the License, or
7    (at your option) any later version.
8
9    This program is distributed in the hope that it will be useful,
10    but WITHOUT ANY WARRANTY; without even the implied warranty of
11    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12    GNU General Public License for more details.
13
14    You should have received a copy of the GNU General Public License
15    along with this program.  If not, see <http://www.gnu.org/licenses/>. */
16
17 #include <config.h>
18
19 #include <data/casereader.h>
20 #include <data/casereader-provider.h>
21
22 #include <stdlib.h>
23
24 #include <data/casewindow.h>
25 #include <data/casewriter.h>
26 #include <data/settings.h>
27 #include <libpspp/assertion.h>
28 #include <libpspp/heap.h>
29 #include <libpspp/taint.h>
30
31 #include "xalloc.h"
32
33 /* A casereader. */
34 struct casereader
35   {
36     struct taint *taint;                  /* Corrupted? */
37     struct caseproto *proto;              /* Format of contained cases. */
38     casenumber case_cnt;                  /* Number of cases,
39                                              CASENUMBER_MAX if unknown. */
40     const struct casereader_class *class; /* Class. */
41     void *aux;                            /* Auxiliary data for class. */
42   };
43
44 static void insert_shim (struct casereader *);
45
46 /* Reads and returns the next case from READER.  The caller owns
47    the returned case and must call case_unref on it when its data
48    is no longer needed.  Returns a null pointer if cases have
49    been exhausted or upon detection of an I/O error.
50
51    The case returned is effectively consumed: it can never be
52    read again through READER.  If this is inconvenient, READER
53    may be cloned in advance with casereader_clone, or
54    casereader_peek may be used instead. */
55 struct ccase *
56 casereader_read (struct casereader *reader)
57 {
58   if (reader->case_cnt != 0)
59     {
60       /* ->read may use casereader_swap to replace itself by
61          another reader and then delegate to that reader by
62          recursively calling casereader_read.  Currently only
63          lazy_casereader does this and, with luck, nothing else
64          ever will.
65
66          To allow this to work, however, we must decrement
67          case_cnt before calling ->read.  If we decremented
68          case_cnt after calling ->read, then this would actually
69          drop two cases from case_cnt instead of one, and we'd
70          lose the last case in the casereader. */
71       struct ccase *c;
72       if (reader->case_cnt != CASENUMBER_MAX)
73         reader->case_cnt--;
74       c = reader->class->read (reader, reader->aux);
75       if (c != NULL)
76         {
77           size_t n_widths UNUSED = caseproto_get_n_widths (reader->proto);
78           assert (case_get_value_cnt (c) >= n_widths);
79           expensive_assert (caseproto_equal (case_get_proto (c), 0,
80                                              reader->proto, 0, n_widths));
81           return c;
82         }
83     }
84   reader->case_cnt = 0;
85   return NULL;
86 }
87
88 /* Destroys READER.
89    Returns false if an I/O error was detected on READER, true
90    otherwise. */
91 bool
92 casereader_destroy (struct casereader *reader)
93 {
94   bool ok = true;
95   if (reader != NULL)
96     {
97       reader->class->destroy (reader, reader->aux);
98       ok = taint_destroy (reader->taint);
99       caseproto_unref (reader->proto);
100       free (reader);
101     }
102   return ok;
103 }
104
105 /* Returns a clone of READER.  READER and its clone may be used
106    to read the same sequence of cases in the same order, barring
107    I/O errors. */
108 struct casereader *
109 casereader_clone (const struct casereader *reader_)
110 {
111   struct casereader *reader = (struct casereader *) reader_;
112   struct casereader *clone;
113   if ( reader == NULL ) 
114     return NULL;
115
116   if (reader->class->clone == NULL)
117     insert_shim (reader);
118   clone = reader->class->clone (reader, reader->aux);
119   assert (clone != NULL);
120   assert (clone != reader);
121   return clone;
122 }
123
124 /* Makes a copy of ORIGINAL into *NEW1 (if NEW1 is non-null) and
125    *NEW2 (if NEW2 is non-null), then destroys ORIGINAL. */
126 void
127 casereader_split (struct casereader *original,
128                   struct casereader **new1, struct casereader **new2)
129 {
130   if (new1 != NULL && new2 != NULL)
131     {
132       *new1 = casereader_rename (original);
133       *new2 = casereader_clone (*new1);
134     }
135   else if (new1 != NULL)
136     *new1 = casereader_rename (original);
137   else if (new2 != NULL)
138     *new2 = casereader_rename (original);
139   else
140     casereader_destroy (original);
141 }
142
143 /* Returns a copy of READER, which is itself destroyed.
144    Useful for taking over ownership of a casereader, to enforce
145    preventing the original owner from accessing the casereader
146    again. */
147 struct casereader *
148 casereader_rename (struct casereader *reader)
149 {
150   struct casereader *new = xmemdup (reader, sizeof *reader);
151   free (reader);
152   return new;
153 }
154
155 /* Exchanges the casereaders referred to by A and B. */
156 void
157 casereader_swap (struct casereader *a, struct casereader *b)
158 {
159   if (a != b)
160     {
161       struct casereader tmp = *a;
162       *a = *b;
163       *b = tmp;
164     }
165 }
166
167 /* Reads and returns the (IDX + 1)'th case from READER.  The
168    caller owns the returned case and must call case_unref on it
169    when it is no longer needed.  Returns a null pointer if cases
170    have been exhausted or upon detection of an I/O error. */
171 struct ccase *
172 casereader_peek (struct casereader *reader, casenumber idx)
173 {
174   if (idx < reader->case_cnt)
175     {
176       struct ccase *c;
177       if (reader->class->peek == NULL)
178         insert_shim (reader);
179       c = reader->class->peek (reader, reader->aux, idx);
180       if (c != NULL)
181         return c;
182       else if (casereader_error (reader))
183         reader->case_cnt = 0;
184     }
185   if (reader->case_cnt > idx)
186     reader->case_cnt = idx;
187   return NULL;
188 }
189
190 /* Returns true if no cases remain to be read from READER, or if
191    an error has occurred on READER.  (A return value of false
192    does *not* mean that the next call to casereader_peek or
193    casereader_read will return true, because an error can occur
194    in the meantime.) */
195 bool
196 casereader_is_empty (struct casereader *reader)
197 {
198   if (reader->case_cnt == 0)
199     return true;
200   else
201     {
202       struct ccase *c = casereader_peek (reader, 0);
203       if (c == NULL)
204         return true;
205       else
206         {
207           case_unref (c);
208           return false;
209         }
210     }
211 }
212
213 /* Returns true if an I/O error or another hard error has
214    occurred on READER, a clone of READER, or on some object on
215    which READER's data has a dependency, false otherwise. */
216 bool
217 casereader_error (const struct casereader *reader)
218 {
219   return taint_is_tainted (reader->taint);
220 }
221
222 /* Marks READER as having encountered an error.
223
224    Ordinarily, this function should be called by the
225    implementation of a casereader, not by the casereader's
226    client.  Instead, casereader clients should usually ensure
227    that a casereader's error state is correct by using
228    taint_propagate to propagate to the casereader's taint
229    structure, which may be obtained via casereader_get_taint. */
230 void
231 casereader_force_error (struct casereader *reader)
232 {
233   taint_set_taint (reader->taint);
234 }
235
236 /* Returns READER's associate taint object, for use with
237    taint_propagate and other taint functions. */
238 const struct taint *
239 casereader_get_taint (const struct casereader *reader)
240 {
241   return reader->taint;
242 }
243
244 /* Returns the number of cases that will be read by successive
245    calls to casereader_read for READER, assuming that no errors
246    occur.  Upon an error condition, the case count drops to 0, so
247    that no more cases can be obtained.
248
249    Not all casereaders can predict the number of cases that they
250    will produce without actually reading all of them.  In that
251    case, this function returns CASENUMBER_MAX.  To obtain the
252    actual number of cases in such a casereader, use
253    casereader_count_cases. */
254 casenumber
255 casereader_get_case_cnt (struct casereader *reader)
256 {
257   return reader->case_cnt;
258 }
259
260 /* Returns the number of cases that will be read by successive
261    calls to casereader_read for READER, assuming that no errors
262    occur.  Upon an error condition, the case count drops to 0, so
263    that no more cases can be obtained.
264
265    For a casereader that cannot predict the number of cases it
266    will produce, this function actually reads (and discards) all
267    of the contents of a clone of READER.  Thus, the return value
268    is always correct in the absence of I/O errors. */
269 casenumber
270 casereader_count_cases (struct casereader *reader)
271 {
272   if (reader->case_cnt == CASENUMBER_MAX)
273     {
274       casenumber n_cases = 0;
275       struct ccase *c;
276
277       struct casereader *clone = casereader_clone (reader);
278
279       for (; (c = casereader_read (clone)) != NULL; case_unref (c))
280         n_cases++;
281
282       casereader_destroy (clone);
283       reader->case_cnt = n_cases;
284     }
285
286   return reader->case_cnt;
287 }
288
289 /* Returns the prototype for the cases in READER.  The caller
290    must not unref the returned prototype. */
291 const struct caseproto *
292 casereader_get_proto (const struct casereader *reader)
293 {
294   return reader->proto;
295 }
296
297 /* Copies all the cases in READER to WRITER, propagating errors
298    appropriately. */
299 void
300 casereader_transfer (struct casereader *reader, struct casewriter *writer)
301 {
302   struct ccase *c;
303
304   taint_propagate (casereader_get_taint (reader),
305                    casewriter_get_taint (writer));
306   while ((c = casereader_read (reader)) != NULL)
307     casewriter_write (writer, c);
308   casereader_destroy (reader);
309 }
310
311 /* Creates and returns a new casereader.  This function is
312    intended for use by casereader implementations, not by
313    casereader clients.
314
315    This function is most suited for creating a casereader for a
316    data source that is naturally sequential.
317    casereader_create_random may be more appropriate for a data
318    source that supports random access.
319
320    Ordinarily, specify a null pointer for TAINT, in which case
321    the new casereader will have a new, unique taint object.  If
322    the new casereader should have a clone of an existing taint
323    object, specify that object as TAINT.  (This is most commonly
324    useful in an implementation of the "clone" casereader_class
325    function, in which case the cloned casereader should have the
326    same taint object as the original casereader.)
327
328    PROTO must be the prototype for the cases that may be read
329    from the casereader.  The caller retains its reference to
330    PROTO.
331
332    CASE_CNT is an upper limit on the number of cases that
333    casereader_read will return from the casereader in successive
334    calls.  Ordinarily, this is the actual number of cases in the
335    data source or CASENUMBER_MAX if the number of cases cannot be
336    predicted in advance.
337
338    CLASS and AUX are a set of casereader implementation-specific
339    member functions and auxiliary data to pass to those member
340    functions, respectively. */
341 struct casereader *
342 casereader_create_sequential (const struct taint *taint,
343                               const struct caseproto *proto,
344                               casenumber case_cnt,
345                               const struct casereader_class *class, void *aux)
346 {
347   struct casereader *reader = xmalloc (sizeof *reader);
348   reader->taint = taint != NULL ? taint_clone (taint) : taint_create ();
349   reader->proto = caseproto_ref (proto);
350   reader->case_cnt = case_cnt;
351   reader->class = class;
352   reader->aux = aux;
353   return reader;
354 }
355
356 /* If READER is a casereader of the given CLASS, returns its
357    associated auxiliary data; otherwise, returns a null pointer.
358
359    This function is intended for use from casereader
360    implementations, not by casereader users.  Even within
361    casereader implementations, its usefulness is quite limited,
362    for at least two reasons.  First, every casereader member
363    function already receives a pointer to the casereader's
364    auxiliary data.  Second, a casereader's class can change
365    (through a call to casereader_swap) and this is in practice
366    quite common (e.g. any call to casereader_clone on a
367    casereader that does not directly support clone will cause the
368    casereader to be replaced by a shim caseader). */
369 void *
370 casereader_dynamic_cast (struct casereader *reader,
371                          const struct casereader_class *class)
372 {
373   return reader->class == class ? reader->aux : NULL;
374 }
375 \f
376 /* Random-access casereader implementation.
377
378    This is a set of wrappers around casereader_create_sequential
379    and struct casereader_class to make it easy to create
380    efficient casereaders for data sources that natively support
381    random access. */
382
383 /* One clone of a random reader. */
384 struct random_reader
385   {
386     struct random_reader_shared *shared; /* Data shared among clones. */
387     struct heap_node heap_node; /* Node in shared data's heap of readers. */
388     casenumber offset;          /* Number of cases already read. */
389   };
390
391 /* Returns the random_reader in which the given heap_node is
392    embedded. */
393 static struct random_reader *
394 random_reader_from_heap_node (const struct heap_node *node)
395 {
396   return heap_data (node, struct random_reader, heap_node);
397 }
398
399 /* Data shared among clones of a random reader. */
400 struct random_reader_shared
401   {
402     struct heap *readers;       /* Heap of struct random_readers. */
403     casenumber min_offset;      /* Smallest offset of any random_reader. */
404     const struct casereader_random_class *class;
405     void *aux;
406   };
407
408 static const struct casereader_class random_reader_casereader_class;
409
410 /* Creates and returns a new random_reader with the given SHARED
411    data and OFFSET.  Inserts the new random reader into the
412    shared heap. */
413 static struct random_reader *
414 make_random_reader (struct random_reader_shared *shared, casenumber offset)
415 {
416   struct random_reader *br = xmalloc (sizeof *br);
417   br->offset = offset;
418   br->shared = shared;
419   heap_insert (shared->readers, &br->heap_node);
420   return br;
421 }
422
423 /* Compares random_readers A and B by offset and returns a
424    strcmp()-like result. */
425 static int
426 compare_random_readers_by_offset (const struct heap_node *a_,
427                                   const struct heap_node *b_,
428                                   const void *aux UNUSED)
429 {
430   const struct random_reader *a = random_reader_from_heap_node (a_);
431   const struct random_reader *b = random_reader_from_heap_node (b_);
432   return a->offset < b->offset ? -1 : a->offset > b->offset;
433 }
434
435 /* Creates and returns a new casereader.  This function is
436    intended for use by casereader implementations, not by
437    casereader clients.
438
439    This function is most suited for creating a casereader for a
440    data source that supports random access.
441    casereader_create_sequential is more appropriate for a data
442    source that is naturally sequential.
443
444    PROTO must be the prototype for the cases that may be read
445    from the casereader.  The caller retains its reference to
446    PROTO.
447
448    CASE_CNT is an upper limit on the number of cases that
449    casereader_read will return from the casereader in successive
450    calls.  Ordinarily, this is the actual number of cases in the
451    data source or CASENUMBER_MAX if the number of cases cannot be
452    predicted in advance.
453
454    CLASS and AUX are a set of casereader implementation-specific
455    member functions and auxiliary data to pass to those member
456    functions, respectively. */
457 struct casereader *
458 casereader_create_random (const struct caseproto *proto, casenumber case_cnt,
459                           const struct casereader_random_class *class,
460                           void *aux)
461 {
462   struct random_reader_shared *shared = xmalloc (sizeof *shared);
463   shared->readers = heap_create (compare_random_readers_by_offset, NULL);
464   shared->class = class;
465   shared->aux = aux;
466   shared->min_offset = 0;
467   return casereader_create_sequential (NULL, proto, case_cnt,
468                                        &random_reader_casereader_class,
469                                        make_random_reader (shared, 0));
470 }
471
472 /* Reassesses the min_offset in SHARED based on the minimum
473    offset in the heap.   */
474 static void
475 advance_random_reader (struct casereader *reader,
476                        struct random_reader_shared *shared)
477 {
478   casenumber old, new;
479
480   old = shared->min_offset;
481   new = random_reader_from_heap_node (heap_minimum (shared->readers))->offset;
482   assert (new >= old);
483   if (new > old)
484     {
485       shared->min_offset = new;
486       shared->class->advance (reader, shared->aux, new - old);
487     }
488 }
489
490 /* struct casereader_class "read" function for random reader. */
491 static struct ccase *
492 random_reader_read (struct casereader *reader, void *br_)
493 {
494   struct random_reader *br = br_;
495   struct random_reader_shared *shared = br->shared;
496   struct ccase *c = shared->class->read (reader, shared->aux,
497                                          br->offset - shared->min_offset);
498   if (c != NULL)
499     {
500       br->offset++;
501       heap_changed (shared->readers, &br->heap_node);
502       advance_random_reader (reader, shared);
503     }
504   return c;
505 }
506
507 /* struct casereader_class "destroy" function for random
508    reader. */
509 static void
510 random_reader_destroy (struct casereader *reader, void *br_)
511 {
512   struct random_reader *br = br_;
513   struct random_reader_shared *shared = br->shared;
514
515   heap_delete (shared->readers, &br->heap_node);
516   if (heap_is_empty (shared->readers))
517     {
518       heap_destroy (shared->readers);
519       shared->class->destroy (reader, shared->aux);
520       free (shared);
521     }
522   else
523     advance_random_reader (reader, shared);
524
525   free (br);
526 }
527
528 /* struct casereader_class "clone" function for random reader. */
529 static struct casereader *
530 random_reader_clone (struct casereader *reader, void *br_)
531 {
532   struct random_reader *br = br_;
533   struct random_reader_shared *shared = br->shared;
534   return casereader_create_sequential (casereader_get_taint (reader),
535                                        reader->proto,
536                                        casereader_get_case_cnt (reader),
537                                        &random_reader_casereader_class,
538                                        make_random_reader (shared,
539                                                            br->offset));
540 }
541
542 /* struct casereader_class "peek" function for random reader. */
543 static struct ccase *
544 random_reader_peek (struct casereader *reader, void *br_, casenumber idx)
545 {
546   struct random_reader *br = br_;
547   struct random_reader_shared *shared = br->shared;
548
549   return shared->class->read (reader, shared->aux,
550                               br->offset - shared->min_offset + idx);
551 }
552
553 /* Casereader class for random reader. */
554 static const struct casereader_class random_reader_casereader_class =
555   {
556     random_reader_read,
557     random_reader_destroy,
558     random_reader_clone,
559     random_reader_peek,
560   };
561 \f
562 /* Buffering shim for implementing clone and peek operations.
563
564    The "clone" and "peek" operations aren't implemented by all
565    types of casereaders, but we have to expose a uniform
566    interface anyhow.  We do this by interposing a buffering
567    casereader on top of the existing casereader on the first call
568    to "clone" or "peek".  The buffering casereader maintains a
569    window of cases that spans the positions of the original
570    casereader and all of its clones (the "clone set"), from the
571    position of the casereader that has read the fewest cases to
572    the position of the casereader that has read the most.
573
574    Thus, if all of the casereaders in the clone set are at
575    approximately the same position, only a few cases are buffered
576    and there is little inefficiency.  If, on the other hand, one
577    casereader is not used to read any cases at all, but another
578    one is used to read all of the cases, the entire contents of
579    the casereader is copied into the buffer.  This still might
580    not be so inefficient, given that case data in memory is
581    shared across multiple identical copies, but in the worst case
582    the window implementation will write cases to disk instead of
583    maintaining them in-memory. */
584
585 /* A buffering shim for a non-clonable or non-peekable
586    casereader. */
587 struct shim
588   {
589     struct casewindow *window;          /* Window of buffered cases. */
590     struct casereader *subreader;       /* Subordinate casereader. */
591   };
592
593 static const struct casereader_random_class shim_class;
594
595 /* Interposes a buffering shim atop READER. */
596 static void
597 insert_shim (struct casereader *reader)
598 {
599   const struct caseproto *proto = casereader_get_proto (reader);
600   casenumber case_cnt = casereader_get_case_cnt (reader);
601   struct shim *b = xmalloc (sizeof *b);
602   b->window = casewindow_create (proto, settings_get_workspace_cases (proto));
603   b->subreader = casereader_create_random (proto, case_cnt, &shim_class, b);
604   casereader_swap (reader, b->subreader);
605   taint_propagate (casewindow_get_taint (b->window),
606                    casereader_get_taint (reader));
607   taint_propagate (casereader_get_taint (b->subreader),
608                    casereader_get_taint (reader));
609 }
610
611 /* Ensures that B's window contains at least CASE_CNT cases.
612    Return true if successful, false upon reaching the end of B's
613    subreader or an I/O error. */
614 static bool
615 prime_buffer (struct shim *b, casenumber case_cnt)
616 {
617   while (casewindow_get_case_cnt (b->window) < case_cnt)
618     {
619       struct ccase *tmp = casereader_read (b->subreader);
620       if (tmp == NULL)
621         return false;
622       casewindow_push_head (b->window, tmp);
623     }
624   return true;
625 }
626
627 /* Reads the case at the given 0-based OFFSET from the front of
628    the window into C.  Returns the case if successful, or a null
629    pointer if OFFSET is beyond the end of file or upon I/O error.
630    The caller must call case_unref() on the returned case when it
631    is no longer needed. */
632 static struct ccase *
633 shim_read (struct casereader *reader UNUSED, void *b_,
634            casenumber offset)
635 {
636   struct shim *b = b_;
637   if (!prime_buffer (b, offset + 1))
638     return NULL;
639   return casewindow_get_case (b->window, offset);
640 }
641
642 /* Destroys B. */
643 static void
644 shim_destroy (struct casereader *reader UNUSED, void *b_)
645 {
646   struct shim *b = b_;
647   casewindow_destroy (b->window);
648   casereader_destroy (b->subreader);
649   free (b);
650 }
651
652 /* Discards CNT cases from the front of B's window. */
653 static void
654 shim_advance (struct casereader *reader UNUSED, void *b_, casenumber case_cnt)
655 {
656   struct shim *b = b_;
657   casewindow_pop_tail (b->window, case_cnt);
658 }
659
660 /* Class for the buffered reader. */
661 static const struct casereader_random_class shim_class =
662   {
663     shim_read,
664     shim_destroy,
665     shim_advance,
666   };