casereader: New function casereader_truncate().
[pspp-builds.git] / src / data / casereader.c
1 /* PSPP - a program for statistical analysis.
2    Copyright (C) 2007, 2009 Free Software Foundation, Inc.
3
4    This program is free software: you can redistribute it and/or modify
5    it under the terms of the GNU General Public License as published by
6    the Free Software Foundation, either version 3 of the License, or
7    (at your option) any later version.
8
9    This program is distributed in the hope that it will be useful,
10    but WITHOUT ANY WARRANTY; without even the implied warranty of
11    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12    GNU General Public License for more details.
13
14    You should have received a copy of the GNU General Public License
15    along with this program.  If not, see <http://www.gnu.org/licenses/>. */
16
17 #include <config.h>
18
19 #include <data/casereader.h>
20 #include <data/casereader-provider.h>
21
22 #include <stdlib.h>
23
24 #include <data/casewindow.h>
25 #include <data/casewriter.h>
26 #include <data/settings.h>
27 #include <libpspp/assertion.h>
28 #include <libpspp/heap.h>
29 #include <libpspp/taint.h>
30
31 #include "xalloc.h"
32
33 /* A casereader. */
34 struct casereader
35   {
36     struct taint *taint;                  /* Corrupted? */
37     struct caseproto *proto;              /* Format of contained cases. */
38     casenumber case_cnt;                  /* Number of cases,
39                                              CASENUMBER_MAX if unknown. */
40     const struct casereader_class *class; /* Class. */
41     void *aux;                            /* Auxiliary data for class. */
42   };
43
44 static void insert_shim (struct casereader *);
45
46 /* Reads and returns the next case from READER.  The caller owns
47    the returned case and must call case_unref on it when its data
48    is no longer needed.  Returns a null pointer if cases have
49    been exhausted or upon detection of an I/O error.
50
51    The case returned is effectively consumed: it can never be
52    read again through READER.  If this is inconvenient, READER
53    may be cloned in advance with casereader_clone, or
54    casereader_peek may be used instead. */
55 struct ccase *
56 casereader_read (struct casereader *reader)
57 {
58   if (reader->case_cnt != 0)
59     {
60       /* ->read may use casereader_swap to replace itself by
61          another reader and then delegate to that reader by
62          recursively calling casereader_read.  Currently only
63          lazy_casereader does this and, with luck, nothing else
64          ever will.
65
66          To allow this to work, however, we must decrement
67          case_cnt before calling ->read.  If we decremented
68          case_cnt after calling ->read, then this would actually
69          drop two cases from case_cnt instead of one, and we'd
70          lose the last case in the casereader. */
71       struct ccase *c;
72       if (reader->case_cnt != CASENUMBER_MAX)
73         reader->case_cnt--;
74       c = reader->class->read (reader, reader->aux);
75       if (c != NULL)
76         {
77           size_t n_widths UNUSED = caseproto_get_n_widths (reader->proto);
78           assert (case_get_value_cnt (c) >= n_widths);
79           expensive_assert (caseproto_equal (case_get_proto (c), 0,
80                                              reader->proto, 0, n_widths));
81           return c;
82         }
83     }
84   reader->case_cnt = 0;
85   return NULL;
86 }
87
88 /* Destroys READER.
89    Returns false if an I/O error was detected on READER, true
90    otherwise. */
91 bool
92 casereader_destroy (struct casereader *reader)
93 {
94   bool ok = true;
95   if (reader != NULL)
96     {
97       reader->class->destroy (reader, reader->aux);
98       ok = taint_destroy (reader->taint);
99       caseproto_unref (reader->proto);
100       free (reader);
101     }
102   return ok;
103 }
104
105 /* Returns a clone of READER.  READER and its clone may be used
106    to read the same sequence of cases in the same order, barring
107    I/O errors. */
108 struct casereader *
109 casereader_clone (const struct casereader *reader_)
110 {
111   struct casereader *reader = CONST_CAST (struct casereader *, reader_);
112   struct casereader *clone;
113   if ( reader == NULL ) 
114     return NULL;
115
116   if (reader->class->clone == NULL)
117     insert_shim (reader);
118   clone = reader->class->clone (reader, reader->aux);
119   assert (clone != NULL);
120   assert (clone != reader);
121   return clone;
122 }
123
124 /* Makes a copy of ORIGINAL into *NEW1 (if NEW1 is non-null) and
125    *NEW2 (if NEW2 is non-null), then destroys ORIGINAL. */
126 void
127 casereader_split (struct casereader *original,
128                   struct casereader **new1, struct casereader **new2)
129 {
130   if (new1 != NULL && new2 != NULL)
131     {
132       *new1 = casereader_rename (original);
133       *new2 = casereader_clone (*new1);
134     }
135   else if (new1 != NULL)
136     *new1 = casereader_rename (original);
137   else if (new2 != NULL)
138     *new2 = casereader_rename (original);
139   else
140     casereader_destroy (original);
141 }
142
143 /* Returns a copy of READER, which is itself destroyed.
144    Useful for taking over ownership of a casereader, to enforce
145    preventing the original owner from accessing the casereader
146    again. */
147 struct casereader *
148 casereader_rename (struct casereader *reader)
149 {
150   struct casereader *new = xmemdup (reader, sizeof *reader);
151   free (reader);
152   return new;
153 }
154
155 /* Exchanges the casereaders referred to by A and B. */
156 void
157 casereader_swap (struct casereader *a, struct casereader *b)
158 {
159   if (a != b)
160     {
161       struct casereader tmp = *a;
162       *a = *b;
163       *b = tmp;
164     }
165 }
166
167 /* Reads and returns the (IDX + 1)'th case from READER.  The
168    caller owns the returned case and must call case_unref on it
169    when it is no longer needed.  Returns a null pointer if cases
170    have been exhausted or upon detection of an I/O error. */
171 struct ccase *
172 casereader_peek (struct casereader *reader, casenumber idx)
173 {
174   if (idx < reader->case_cnt)
175     {
176       struct ccase *c;
177       if (reader->class->peek == NULL)
178         insert_shim (reader);
179       c = reader->class->peek (reader, reader->aux, idx);
180       if (c != NULL)
181         return c;
182       else if (casereader_error (reader))
183         reader->case_cnt = 0;
184     }
185   if (reader->case_cnt > idx)
186     reader->case_cnt = idx;
187   return NULL;
188 }
189
190 /* Returns true if no cases remain to be read from READER, or if
191    an error has occurred on READER.  (A return value of false
192    does *not* mean that the next call to casereader_peek or
193    casereader_read will return true, because an error can occur
194    in the meantime.) */
195 bool
196 casereader_is_empty (struct casereader *reader)
197 {
198   if (reader->case_cnt == 0)
199     return true;
200   else
201     {
202       struct ccase *c = casereader_peek (reader, 0);
203       if (c == NULL)
204         return true;
205       else
206         {
207           case_unref (c);
208           return false;
209         }
210     }
211 }
212
213 /* Returns true if an I/O error or another hard error has
214    occurred on READER, a clone of READER, or on some object on
215    which READER's data has a dependency, false otherwise. */
216 bool
217 casereader_error (const struct casereader *reader)
218 {
219   return taint_is_tainted (reader->taint);
220 }
221
222 /* Marks READER as having encountered an error.
223
224    Ordinarily, this function should be called by the
225    implementation of a casereader, not by the casereader's
226    client.  Instead, casereader clients should usually ensure
227    that a casereader's error state is correct by using
228    taint_propagate to propagate to the casereader's taint
229    structure, which may be obtained via casereader_get_taint. */
230 void
231 casereader_force_error (struct casereader *reader)
232 {
233   taint_set_taint (reader->taint);
234 }
235
236 /* Returns READER's associate taint object, for use with
237    taint_propagate and other taint functions. */
238 const struct taint *
239 casereader_get_taint (const struct casereader *reader)
240 {
241   return reader->taint;
242 }
243
244 /* Returns the number of cases that will be read by successive
245    calls to casereader_read for READER, assuming that no errors
246    occur.  Upon an error condition, the case count drops to 0, so
247    that no more cases can be obtained.
248
249    Not all casereaders can predict the number of cases that they
250    will produce without actually reading all of them.  In that
251    case, this function returns CASENUMBER_MAX.  To obtain the
252    actual number of cases in such a casereader, use
253    casereader_count_cases. */
254 casenumber
255 casereader_get_case_cnt (struct casereader *reader)
256 {
257   return reader->case_cnt;
258 }
259
260 static casenumber
261 casereader_count_cases__ (struct casereader *reader, casenumber max_cases)
262 {
263   struct casereader *clone;
264   casenumber n_cases;
265
266   clone = casereader_clone (reader);
267   for (n_cases = 0; n_cases < max_cases; n_cases++)
268     {
269       struct ccase *c = casereader_read (clone);
270       if (c == NULL)
271         break;
272       case_unref (c);
273     }
274   casereader_destroy (clone);
275
276   return n_cases;
277 }
278
279 /* Returns the number of cases that will be read by successive
280    calls to casereader_read for READER, assuming that no errors
281    occur.  Upon an error condition, the case count drops to 0, so
282    that no more cases can be obtained.
283
284    For a casereader that cannot predict the number of cases it
285    will produce, this function actually reads (and discards) all
286    of the contents of a clone of READER.  Thus, the return value
287    is always correct in the absence of I/O errors. */
288 casenumber
289 casereader_count_cases (struct casereader *reader)
290 {
291   if (reader->case_cnt == CASENUMBER_MAX)
292     reader->case_cnt = casereader_count_cases__ (reader, CASENUMBER_MAX);
293   return reader->case_cnt;
294 }
295
296 /* Truncates READER to at most N cases. */
297 void
298 casereader_truncate (struct casereader *reader, casenumber n)
299 {
300   /* This could be optimized, if it ever becomes too expensive, by adding a
301      "max_cases" member to struct casereader.  We could also add a "truncate"
302      function to the casereader implementation, to allow the casereader to
303      throw away data that cannot ever be read. */
304   if (reader->case_cnt == CASENUMBER_MAX)
305     reader->case_cnt = casereader_count_cases__ (reader, n);
306   if (reader->case_cnt > n)
307     reader->case_cnt = n;
308 }
309
310 /* Returns the prototype for the cases in READER.  The caller
311    must not unref the returned prototype. */
312 const struct caseproto *
313 casereader_get_proto (const struct casereader *reader)
314 {
315   return reader->proto;
316 }
317
318 /* Copies all the cases in READER to WRITER, propagating errors
319    appropriately. */
320 void
321 casereader_transfer (struct casereader *reader, struct casewriter *writer)
322 {
323   struct ccase *c;
324
325   taint_propagate (casereader_get_taint (reader),
326                    casewriter_get_taint (writer));
327   while ((c = casereader_read (reader)) != NULL)
328     casewriter_write (writer, c);
329   casereader_destroy (reader);
330 }
331
332 /* Creates and returns a new casereader.  This function is
333    intended for use by casereader implementations, not by
334    casereader clients.
335
336    This function is most suited for creating a casereader for a
337    data source that is naturally sequential.
338    casereader_create_random may be more appropriate for a data
339    source that supports random access.
340
341    Ordinarily, specify a null pointer for TAINT, in which case
342    the new casereader will have a new, unique taint object.  If
343    the new casereader should have a clone of an existing taint
344    object, specify that object as TAINT.  (This is most commonly
345    useful in an implementation of the "clone" casereader_class
346    function, in which case the cloned casereader should have the
347    same taint object as the original casereader.)
348
349    PROTO must be the prototype for the cases that may be read
350    from the casereader.  The caller retains its reference to
351    PROTO.
352
353    CASE_CNT is an upper limit on the number of cases that
354    casereader_read will return from the casereader in successive
355    calls.  Ordinarily, this is the actual number of cases in the
356    data source or CASENUMBER_MAX if the number of cases cannot be
357    predicted in advance.
358
359    CLASS and AUX are a set of casereader implementation-specific
360    member functions and auxiliary data to pass to those member
361    functions, respectively. */
362 struct casereader *
363 casereader_create_sequential (const struct taint *taint,
364                               const struct caseproto *proto,
365                               casenumber case_cnt,
366                               const struct casereader_class *class, void *aux)
367 {
368   struct casereader *reader = xmalloc (sizeof *reader);
369   reader->taint = taint != NULL ? taint_clone (taint) : taint_create ();
370   reader->proto = caseproto_ref (proto);
371   reader->case_cnt = case_cnt;
372   reader->class = class;
373   reader->aux = aux;
374   return reader;
375 }
376
377 /* If READER is a casereader of the given CLASS, returns its
378    associated auxiliary data; otherwise, returns a null pointer.
379
380    This function is intended for use from casereader
381    implementations, not by casereader users.  Even within
382    casereader implementations, its usefulness is quite limited,
383    for at least two reasons.  First, every casereader member
384    function already receives a pointer to the casereader's
385    auxiliary data.  Second, a casereader's class can change
386    (through a call to casereader_swap) and this is in practice
387    quite common (e.g. any call to casereader_clone on a
388    casereader that does not directly support clone will cause the
389    casereader to be replaced by a shim caseader). */
390 void *
391 casereader_dynamic_cast (struct casereader *reader,
392                          const struct casereader_class *class)
393 {
394   return reader->class == class ? reader->aux : NULL;
395 }
396 \f
397 /* Random-access casereader implementation.
398
399    This is a set of wrappers around casereader_create_sequential
400    and struct casereader_class to make it easy to create
401    efficient casereaders for data sources that natively support
402    random access. */
403
404 /* One clone of a random reader. */
405 struct random_reader
406   {
407     struct random_reader_shared *shared; /* Data shared among clones. */
408     struct heap_node heap_node; /* Node in shared data's heap of readers. */
409     casenumber offset;          /* Number of cases already read. */
410   };
411
412 /* Returns the random_reader in which the given heap_node is
413    embedded. */
414 static struct random_reader *
415 random_reader_from_heap_node (const struct heap_node *node)
416 {
417   return heap_data (node, struct random_reader, heap_node);
418 }
419
420 /* Data shared among clones of a random reader. */
421 struct random_reader_shared
422   {
423     struct heap *readers;       /* Heap of struct random_readers. */
424     casenumber min_offset;      /* Smallest offset of any random_reader. */
425     const struct casereader_random_class *class;
426     void *aux;
427   };
428
429 static const struct casereader_class random_reader_casereader_class;
430
431 /* Creates and returns a new random_reader with the given SHARED
432    data and OFFSET.  Inserts the new random reader into the
433    shared heap. */
434 static struct random_reader *
435 make_random_reader (struct random_reader_shared *shared, casenumber offset)
436 {
437   struct random_reader *br = xmalloc (sizeof *br);
438   br->offset = offset;
439   br->shared = shared;
440   heap_insert (shared->readers, &br->heap_node);
441   return br;
442 }
443
444 /* Compares random_readers A and B by offset and returns a
445    strcmp()-like result. */
446 static int
447 compare_random_readers_by_offset (const struct heap_node *a_,
448                                   const struct heap_node *b_,
449                                   const void *aux UNUSED)
450 {
451   const struct random_reader *a = random_reader_from_heap_node (a_);
452   const struct random_reader *b = random_reader_from_heap_node (b_);
453   return a->offset < b->offset ? -1 : a->offset > b->offset;
454 }
455
456 /* Creates and returns a new casereader.  This function is
457    intended for use by casereader implementations, not by
458    casereader clients.
459
460    This function is most suited for creating a casereader for a
461    data source that supports random access.
462    casereader_create_sequential is more appropriate for a data
463    source that is naturally sequential.
464
465    PROTO must be the prototype for the cases that may be read
466    from the casereader.  The caller retains its reference to
467    PROTO.
468
469    CASE_CNT is an upper limit on the number of cases that
470    casereader_read will return from the casereader in successive
471    calls.  Ordinarily, this is the actual number of cases in the
472    data source or CASENUMBER_MAX if the number of cases cannot be
473    predicted in advance.
474
475    CLASS and AUX are a set of casereader implementation-specific
476    member functions and auxiliary data to pass to those member
477    functions, respectively. */
478 struct casereader *
479 casereader_create_random (const struct caseproto *proto, casenumber case_cnt,
480                           const struct casereader_random_class *class,
481                           void *aux)
482 {
483   struct random_reader_shared *shared = xmalloc (sizeof *shared);
484   shared->readers = heap_create (compare_random_readers_by_offset, NULL);
485   shared->class = class;
486   shared->aux = aux;
487   shared->min_offset = 0;
488   return casereader_create_sequential (NULL, proto, case_cnt,
489                                        &random_reader_casereader_class,
490                                        make_random_reader (shared, 0));
491 }
492
493 /* Reassesses the min_offset in SHARED based on the minimum
494    offset in the heap.   */
495 static void
496 advance_random_reader (struct casereader *reader,
497                        struct random_reader_shared *shared)
498 {
499   casenumber old, new;
500
501   old = shared->min_offset;
502   new = random_reader_from_heap_node (heap_minimum (shared->readers))->offset;
503   assert (new >= old);
504   if (new > old)
505     {
506       shared->min_offset = new;
507       shared->class->advance (reader, shared->aux, new - old);
508     }
509 }
510
511 /* struct casereader_class "read" function for random reader. */
512 static struct ccase *
513 random_reader_read (struct casereader *reader, void *br_)
514 {
515   struct random_reader *br = br_;
516   struct random_reader_shared *shared = br->shared;
517   struct ccase *c = shared->class->read (reader, shared->aux,
518                                          br->offset - shared->min_offset);
519   if (c != NULL)
520     {
521       br->offset++;
522       heap_changed (shared->readers, &br->heap_node);
523       advance_random_reader (reader, shared);
524     }
525   return c;
526 }
527
528 /* struct casereader_class "destroy" function for random
529    reader. */
530 static void
531 random_reader_destroy (struct casereader *reader, void *br_)
532 {
533   struct random_reader *br = br_;
534   struct random_reader_shared *shared = br->shared;
535
536   heap_delete (shared->readers, &br->heap_node);
537   if (heap_is_empty (shared->readers))
538     {
539       heap_destroy (shared->readers);
540       shared->class->destroy (reader, shared->aux);
541       free (shared);
542     }
543   else
544     advance_random_reader (reader, shared);
545
546   free (br);
547 }
548
549 /* struct casereader_class "clone" function for random reader. */
550 static struct casereader *
551 random_reader_clone (struct casereader *reader, void *br_)
552 {
553   struct random_reader *br = br_;
554   struct random_reader_shared *shared = br->shared;
555   return casereader_create_sequential (casereader_get_taint (reader),
556                                        reader->proto,
557                                        casereader_get_case_cnt (reader),
558                                        &random_reader_casereader_class,
559                                        make_random_reader (shared,
560                                                            br->offset));
561 }
562
563 /* struct casereader_class "peek" function for random reader. */
564 static struct ccase *
565 random_reader_peek (struct casereader *reader, void *br_, casenumber idx)
566 {
567   struct random_reader *br = br_;
568   struct random_reader_shared *shared = br->shared;
569
570   return shared->class->read (reader, shared->aux,
571                               br->offset - shared->min_offset + idx);
572 }
573
574 /* Casereader class for random reader. */
575 static const struct casereader_class random_reader_casereader_class =
576   {
577     random_reader_read,
578     random_reader_destroy,
579     random_reader_clone,
580     random_reader_peek,
581   };
582 \f
583 /* Buffering shim for implementing clone and peek operations.
584
585    The "clone" and "peek" operations aren't implemented by all
586    types of casereaders, but we have to expose a uniform
587    interface anyhow.  We do this by interposing a buffering
588    casereader on top of the existing casereader on the first call
589    to "clone" or "peek".  The buffering casereader maintains a
590    window of cases that spans the positions of the original
591    casereader and all of its clones (the "clone set"), from the
592    position of the casereader that has read the fewest cases to
593    the position of the casereader that has read the most.
594
595    Thus, if all of the casereaders in the clone set are at
596    approximately the same position, only a few cases are buffered
597    and there is little inefficiency.  If, on the other hand, one
598    casereader is not used to read any cases at all, but another
599    one is used to read all of the cases, the entire contents of
600    the casereader is copied into the buffer.  This still might
601    not be so inefficient, given that case data in memory is
602    shared across multiple identical copies, but in the worst case
603    the window implementation will write cases to disk instead of
604    maintaining them in-memory. */
605
606 /* A buffering shim for a non-clonable or non-peekable
607    casereader. */
608 struct shim
609   {
610     struct casewindow *window;          /* Window of buffered cases. */
611     struct casereader *subreader;       /* Subordinate casereader. */
612   };
613
614 static const struct casereader_random_class shim_class;
615
616 /* Interposes a buffering shim atop READER. */
617 static void
618 insert_shim (struct casereader *reader)
619 {
620   const struct caseproto *proto = casereader_get_proto (reader);
621   casenumber case_cnt = casereader_get_case_cnt (reader);
622   struct shim *b = xmalloc (sizeof *b);
623   b->window = casewindow_create (proto, settings_get_workspace_cases (proto));
624   b->subreader = casereader_create_random (proto, case_cnt, &shim_class, b);
625   casereader_swap (reader, b->subreader);
626   taint_propagate (casewindow_get_taint (b->window),
627                    casereader_get_taint (reader));
628   taint_propagate (casereader_get_taint (b->subreader),
629                    casereader_get_taint (reader));
630 }
631
632 /* Ensures that B's window contains at least CASE_CNT cases.
633    Return true if successful, false upon reaching the end of B's
634    subreader or an I/O error. */
635 static bool
636 prime_buffer (struct shim *b, casenumber case_cnt)
637 {
638   while (casewindow_get_case_cnt (b->window) < case_cnt)
639     {
640       struct ccase *tmp = casereader_read (b->subreader);
641       if (tmp == NULL)
642         return false;
643       casewindow_push_head (b->window, tmp);
644     }
645   return true;
646 }
647
648 /* Reads the case at the given 0-based OFFSET from the front of
649    the window into C.  Returns the case if successful, or a null
650    pointer if OFFSET is beyond the end of file or upon I/O error.
651    The caller must call case_unref() on the returned case when it
652    is no longer needed. */
653 static struct ccase *
654 shim_read (struct casereader *reader UNUSED, void *b_,
655            casenumber offset)
656 {
657   struct shim *b = b_;
658   if (!prime_buffer (b, offset + 1))
659     return NULL;
660   return casewindow_get_case (b->window, offset);
661 }
662
663 /* Destroys B. */
664 static void
665 shim_destroy (struct casereader *reader UNUSED, void *b_)
666 {
667   struct shim *b = b_;
668   casewindow_destroy (b->window);
669   casereader_destroy (b->subreader);
670   free (b);
671 }
672
673 /* Discards CNT cases from the front of B's window. */
674 static void
675 shim_advance (struct casereader *reader UNUSED, void *b_, casenumber case_cnt)
676 {
677   struct shim *b = b_;
678   casewindow_pop_tail (b->window, case_cnt);
679 }
680
681 /* Class for the buffered reader. */
682 static const struct casereader_random_class shim_class =
683   {
684     shim_read,
685     shim_destroy,
686     shim_advance,
687   };