casereader: Make parameter to casereader_count_cases const.
[pspp-builds.git] / src / data / casereader.c
1 /* PSPP - a program for statistical analysis.
2    Copyright (C) 2007, 2009 Free Software Foundation, Inc.
3
4    This program is free software: you can redistribute it and/or modify
5    it under the terms of the GNU General Public License as published by
6    the Free Software Foundation, either version 3 of the License, or
7    (at your option) any later version.
8
9    This program is distributed in the hope that it will be useful,
10    but WITHOUT ANY WARRANTY; without even the implied warranty of
11    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12    GNU General Public License for more details.
13
14    You should have received a copy of the GNU General Public License
15    along with this program.  If not, see <http://www.gnu.org/licenses/>. */
16
17 #include <config.h>
18
19 #include <data/casereader.h>
20 #include <data/casereader-provider.h>
21
22 #include <stdlib.h>
23
24 #include <data/casewindow.h>
25 #include <data/casewriter.h>
26 #include <data/settings.h>
27 #include <libpspp/assertion.h>
28 #include <libpspp/heap.h>
29 #include <libpspp/taint.h>
30
31 #include "xalloc.h"
32
33 /* A casereader. */
34 struct casereader
35   {
36     struct taint *taint;                  /* Corrupted? */
37     struct caseproto *proto;              /* Format of contained cases. */
38     casenumber case_cnt;                  /* Number of cases,
39                                              CASENUMBER_MAX if unknown. */
40     const struct casereader_class *class; /* Class. */
41     void *aux;                            /* Auxiliary data for class. */
42   };
43
44 static void insert_shim (struct casereader *);
45
46 /* Reads and returns the next case from READER.  The caller owns
47    the returned case and must call case_unref on it when its data
48    is no longer needed.  Returns a null pointer if cases have
49    been exhausted or upon detection of an I/O error.
50
51    The case returned is effectively consumed: it can never be
52    read again through READER.  If this is inconvenient, READER
53    may be cloned in advance with casereader_clone, or
54    casereader_peek may be used instead. */
55 struct ccase *
56 casereader_read (struct casereader *reader)
57 {
58   if (reader->case_cnt != 0)
59     {
60       /* ->read may use casereader_swap to replace itself by
61          another reader and then delegate to that reader by
62          recursively calling casereader_read.  Currently only
63          lazy_casereader does this and, with luck, nothing else
64          ever will.
65
66          To allow this to work, however, we must decrement
67          case_cnt before calling ->read.  If we decremented
68          case_cnt after calling ->read, then this would actually
69          drop two cases from case_cnt instead of one, and we'd
70          lose the last case in the casereader. */
71       struct ccase *c;
72       if (reader->case_cnt != CASENUMBER_MAX)
73         reader->case_cnt--;
74       c = reader->class->read (reader, reader->aux);
75       if (c != NULL)
76         {
77           size_t n_widths UNUSED = caseproto_get_n_widths (reader->proto);
78           assert (case_get_value_cnt (c) >= n_widths);
79           expensive_assert (caseproto_equal (case_get_proto (c), 0,
80                                              reader->proto, 0, n_widths));
81           return c;
82         }
83     }
84   reader->case_cnt = 0;
85   return NULL;
86 }
87
88 /* Destroys READER.
89    Returns false if an I/O error was detected on READER, true
90    otherwise. */
91 bool
92 casereader_destroy (struct casereader *reader)
93 {
94   bool ok = true;
95   if (reader != NULL)
96     {
97       reader->class->destroy (reader, reader->aux);
98       ok = taint_destroy (reader->taint);
99       caseproto_unref (reader->proto);
100       free (reader);
101     }
102   return ok;
103 }
104
105 /* Returns a clone of READER.  READER and its clone may be used
106    to read the same sequence of cases in the same order, barring
107    I/O errors. */
108 struct casereader *
109 casereader_clone (const struct casereader *reader_)
110 {
111   struct casereader *reader = CONST_CAST (struct casereader *, reader_);
112   struct casereader *clone;
113   if ( reader == NULL ) 
114     return NULL;
115
116   if (reader->class->clone == NULL)
117     insert_shim (reader);
118   clone = reader->class->clone (reader, reader->aux);
119   assert (clone != NULL);
120   assert (clone != reader);
121   return clone;
122 }
123
124 /* Makes a copy of ORIGINAL into *NEW1 (if NEW1 is non-null) and
125    *NEW2 (if NEW2 is non-null), then destroys ORIGINAL. */
126 void
127 casereader_split (struct casereader *original,
128                   struct casereader **new1, struct casereader **new2)
129 {
130   if (new1 != NULL && new2 != NULL)
131     {
132       *new1 = casereader_rename (original);
133       *new2 = casereader_clone (*new1);
134     }
135   else if (new1 != NULL)
136     *new1 = casereader_rename (original);
137   else if (new2 != NULL)
138     *new2 = casereader_rename (original);
139   else
140     casereader_destroy (original);
141 }
142
143 /* Returns a copy of READER, which is itself destroyed.
144    Useful for taking over ownership of a casereader, to enforce
145    preventing the original owner from accessing the casereader
146    again. */
147 struct casereader *
148 casereader_rename (struct casereader *reader)
149 {
150   struct casereader *new = xmemdup (reader, sizeof *reader);
151   free (reader);
152   return new;
153 }
154
155 /* Exchanges the casereaders referred to by A and B. */
156 void
157 casereader_swap (struct casereader *a, struct casereader *b)
158 {
159   if (a != b)
160     {
161       struct casereader tmp = *a;
162       *a = *b;
163       *b = tmp;
164     }
165 }
166
167 /* Reads and returns the (IDX + 1)'th case from READER.  The
168    caller owns the returned case and must call case_unref on it
169    when it is no longer needed.  Returns a null pointer if cases
170    have been exhausted or upon detection of an I/O error. */
171 struct ccase *
172 casereader_peek (struct casereader *reader, casenumber idx)
173 {
174   if (idx < reader->case_cnt)
175     {
176       struct ccase *c;
177       if (reader->class->peek == NULL)
178         insert_shim (reader);
179       c = reader->class->peek (reader, reader->aux, idx);
180       if (c != NULL)
181         return c;
182       else if (casereader_error (reader))
183         reader->case_cnt = 0;
184     }
185   if (reader->case_cnt > idx)
186     reader->case_cnt = idx;
187   return NULL;
188 }
189
190 /* Returns true if no cases remain to be read from READER, or if
191    an error has occurred on READER.  (A return value of false
192    does *not* mean that the next call to casereader_peek or
193    casereader_read will return true, because an error can occur
194    in the meantime.) */
195 bool
196 casereader_is_empty (struct casereader *reader)
197 {
198   if (reader->case_cnt == 0)
199     return true;
200   else
201     {
202       struct ccase *c = casereader_peek (reader, 0);
203       if (c == NULL)
204         return true;
205       else
206         {
207           case_unref (c);
208           return false;
209         }
210     }
211 }
212
213 /* Returns true if an I/O error or another hard error has
214    occurred on READER, a clone of READER, or on some object on
215    which READER's data has a dependency, false otherwise. */
216 bool
217 casereader_error (const struct casereader *reader)
218 {
219   return taint_is_tainted (reader->taint);
220 }
221
222 /* Marks READER as having encountered an error.
223
224    Ordinarily, this function should be called by the
225    implementation of a casereader, not by the casereader's
226    client.  Instead, casereader clients should usually ensure
227    that a casereader's error state is correct by using
228    taint_propagate to propagate to the casereader's taint
229    structure, which may be obtained via casereader_get_taint. */
230 void
231 casereader_force_error (struct casereader *reader)
232 {
233   taint_set_taint (reader->taint);
234 }
235
236 /* Returns READER's associate taint object, for use with
237    taint_propagate and other taint functions. */
238 const struct taint *
239 casereader_get_taint (const struct casereader *reader)
240 {
241   return reader->taint;
242 }
243
244 /* Returns the number of cases that will be read by successive
245    calls to casereader_read for READER, assuming that no errors
246    occur.  Upon an error condition, the case count drops to 0, so
247    that no more cases can be obtained.
248
249    Not all casereaders can predict the number of cases that they
250    will produce without actually reading all of them.  In that
251    case, this function returns CASENUMBER_MAX.  To obtain the
252    actual number of cases in such a casereader, use
253    casereader_count_cases. */
254 casenumber
255 casereader_get_case_cnt (struct casereader *reader)
256 {
257   return reader->case_cnt;
258 }
259
260 static casenumber
261 casereader_count_cases__ (const struct casereader *reader,
262                           casenumber max_cases)
263 {
264   struct casereader *clone;
265   casenumber n_cases;
266
267   clone = casereader_clone (reader);
268   n_cases = casereader_advance (clone, max_cases);
269   casereader_destroy (clone);
270
271   return n_cases;
272 }
273
274 /* Returns the number of cases that will be read by successive
275    calls to casereader_read for READER, assuming that no errors
276    occur.  Upon an error condition, the case count drops to 0, so
277    that no more cases can be obtained.
278
279    For a casereader that cannot predict the number of cases it
280    will produce, this function actually reads (and discards) all
281    of the contents of a clone of READER.  Thus, the return value
282    is always correct in the absence of I/O errors. */
283 casenumber
284 casereader_count_cases (const struct casereader *reader)
285 {
286   if (reader->case_cnt == CASENUMBER_MAX)
287     {
288       struct casereader *reader_rw = CONST_CAST (struct casereader *, reader);
289       reader_rw->case_cnt = casereader_count_cases__ (reader, CASENUMBER_MAX);
290     }
291   return reader->case_cnt;
292 }
293
294 /* Truncates READER to at most N cases. */
295 void
296 casereader_truncate (struct casereader *reader, casenumber n)
297 {
298   /* This could be optimized, if it ever becomes too expensive, by adding a
299      "max_cases" member to struct casereader.  We could also add a "truncate"
300      function to the casereader implementation, to allow the casereader to
301      throw away data that cannot ever be read. */
302   if (reader->case_cnt == CASENUMBER_MAX)
303     reader->case_cnt = casereader_count_cases__ (reader, n);
304   if (reader->case_cnt > n)
305     reader->case_cnt = n;
306 }
307
308 /* Returns the prototype for the cases in READER.  The caller
309    must not unref the returned prototype. */
310 const struct caseproto *
311 casereader_get_proto (const struct casereader *reader)
312 {
313   return reader->proto;
314 }
315
316 /* Skips past N cases in READER, stopping when the last case in
317    READER has been read or on an input error.  Returns the number
318    of cases successfully skipped. */
319 casenumber
320 casereader_advance (struct casereader *reader, casenumber n)
321 {
322   casenumber i;
323
324   for (i = 0; i < n; i++)
325     {
326       struct ccase *c = casereader_read (reader);
327       if (c == NULL)
328         break;
329       case_unref (c);
330     }
331
332   return i;
333 }
334
335
336 /* Copies all the cases in READER to WRITER, propagating errors
337    appropriately. */
338 void
339 casereader_transfer (struct casereader *reader, struct casewriter *writer)
340 {
341   struct ccase *c;
342
343   taint_propagate (casereader_get_taint (reader),
344                    casewriter_get_taint (writer));
345   while ((c = casereader_read (reader)) != NULL)
346     casewriter_write (writer, c);
347   casereader_destroy (reader);
348 }
349
350 /* Creates and returns a new casereader.  This function is
351    intended for use by casereader implementations, not by
352    casereader clients.
353
354    This function is most suited for creating a casereader for a
355    data source that is naturally sequential.
356    casereader_create_random may be more appropriate for a data
357    source that supports random access.
358
359    Ordinarily, specify a null pointer for TAINT, in which case
360    the new casereader will have a new, unique taint object.  If
361    the new casereader should have a clone of an existing taint
362    object, specify that object as TAINT.  (This is most commonly
363    useful in an implementation of the "clone" casereader_class
364    function, in which case the cloned casereader should have the
365    same taint object as the original casereader.)
366
367    PROTO must be the prototype for the cases that may be read
368    from the casereader.  The caller retains its reference to
369    PROTO.
370
371    CASE_CNT is an upper limit on the number of cases that
372    casereader_read will return from the casereader in successive
373    calls.  Ordinarily, this is the actual number of cases in the
374    data source or CASENUMBER_MAX if the number of cases cannot be
375    predicted in advance.
376
377    CLASS and AUX are a set of casereader implementation-specific
378    member functions and auxiliary data to pass to those member
379    functions, respectively. */
380 struct casereader *
381 casereader_create_sequential (const struct taint *taint,
382                               const struct caseproto *proto,
383                               casenumber case_cnt,
384                               const struct casereader_class *class, void *aux)
385 {
386   struct casereader *reader = xmalloc (sizeof *reader);
387   reader->taint = taint != NULL ? taint_clone (taint) : taint_create ();
388   reader->proto = caseproto_ref (proto);
389   reader->case_cnt = case_cnt;
390   reader->class = class;
391   reader->aux = aux;
392   return reader;
393 }
394
395 /* If READER is a casereader of the given CLASS, returns its
396    associated auxiliary data; otherwise, returns a null pointer.
397
398    This function is intended for use from casereader
399    implementations, not by casereader users.  Even within
400    casereader implementations, its usefulness is quite limited,
401    for at least two reasons.  First, every casereader member
402    function already receives a pointer to the casereader's
403    auxiliary data.  Second, a casereader's class can change
404    (through a call to casereader_swap) and this is in practice
405    quite common (e.g. any call to casereader_clone on a
406    casereader that does not directly support clone will cause the
407    casereader to be replaced by a shim caseader). */
408 void *
409 casereader_dynamic_cast (struct casereader *reader,
410                          const struct casereader_class *class)
411 {
412   return reader->class == class ? reader->aux : NULL;
413 }
414 \f
415 /* Random-access casereader implementation.
416
417    This is a set of wrappers around casereader_create_sequential
418    and struct casereader_class to make it easy to create
419    efficient casereaders for data sources that natively support
420    random access. */
421
422 /* One clone of a random reader. */
423 struct random_reader
424   {
425     struct random_reader_shared *shared; /* Data shared among clones. */
426     struct heap_node heap_node; /* Node in shared data's heap of readers. */
427     casenumber offset;          /* Number of cases already read. */
428   };
429
430 /* Returns the random_reader in which the given heap_node is
431    embedded. */
432 static struct random_reader *
433 random_reader_from_heap_node (const struct heap_node *node)
434 {
435   return heap_data (node, struct random_reader, heap_node);
436 }
437
438 /* Data shared among clones of a random reader. */
439 struct random_reader_shared
440   {
441     struct heap *readers;       /* Heap of struct random_readers. */
442     casenumber min_offset;      /* Smallest offset of any random_reader. */
443     const struct casereader_random_class *class;
444     void *aux;
445   };
446
447 static const struct casereader_class random_reader_casereader_class;
448
449 /* Creates and returns a new random_reader with the given SHARED
450    data and OFFSET.  Inserts the new random reader into the
451    shared heap. */
452 static struct random_reader *
453 make_random_reader (struct random_reader_shared *shared, casenumber offset)
454 {
455   struct random_reader *br = xmalloc (sizeof *br);
456   br->offset = offset;
457   br->shared = shared;
458   heap_insert (shared->readers, &br->heap_node);
459   return br;
460 }
461
462 /* Compares random_readers A and B by offset and returns a
463    strcmp()-like result. */
464 static int
465 compare_random_readers_by_offset (const struct heap_node *a_,
466                                   const struct heap_node *b_,
467                                   const void *aux UNUSED)
468 {
469   const struct random_reader *a = random_reader_from_heap_node (a_);
470   const struct random_reader *b = random_reader_from_heap_node (b_);
471   return a->offset < b->offset ? -1 : a->offset > b->offset;
472 }
473
474 /* Creates and returns a new casereader.  This function is
475    intended for use by casereader implementations, not by
476    casereader clients.
477
478    This function is most suited for creating a casereader for a
479    data source that supports random access.
480    casereader_create_sequential is more appropriate for a data
481    source that is naturally sequential.
482
483    PROTO must be the prototype for the cases that may be read
484    from the casereader.  The caller retains its reference to
485    PROTO.
486
487    CASE_CNT is an upper limit on the number of cases that
488    casereader_read will return from the casereader in successive
489    calls.  Ordinarily, this is the actual number of cases in the
490    data source or CASENUMBER_MAX if the number of cases cannot be
491    predicted in advance.
492
493    CLASS and AUX are a set of casereader implementation-specific
494    member functions and auxiliary data to pass to those member
495    functions, respectively. */
496 struct casereader *
497 casereader_create_random (const struct caseproto *proto, casenumber case_cnt,
498                           const struct casereader_random_class *class,
499                           void *aux)
500 {
501   struct random_reader_shared *shared = xmalloc (sizeof *shared);
502   shared->readers = heap_create (compare_random_readers_by_offset, NULL);
503   shared->class = class;
504   shared->aux = aux;
505   shared->min_offset = 0;
506   return casereader_create_sequential (NULL, proto, case_cnt,
507                                        &random_reader_casereader_class,
508                                        make_random_reader (shared, 0));
509 }
510
511 /* Reassesses the min_offset in SHARED based on the minimum
512    offset in the heap.   */
513 static void
514 advance_random_reader (struct casereader *reader,
515                        struct random_reader_shared *shared)
516 {
517   casenumber old, new;
518
519   old = shared->min_offset;
520   new = random_reader_from_heap_node (heap_minimum (shared->readers))->offset;
521   assert (new >= old);
522   if (new > old)
523     {
524       shared->min_offset = new;
525       shared->class->advance (reader, shared->aux, new - old);
526     }
527 }
528
529 /* struct casereader_class "read" function for random reader. */
530 static struct ccase *
531 random_reader_read (struct casereader *reader, void *br_)
532 {
533   struct random_reader *br = br_;
534   struct random_reader_shared *shared = br->shared;
535   struct ccase *c = shared->class->read (reader, shared->aux,
536                                          br->offset - shared->min_offset);
537   if (c != NULL)
538     {
539       br->offset++;
540       heap_changed (shared->readers, &br->heap_node);
541       advance_random_reader (reader, shared);
542     }
543   return c;
544 }
545
546 /* struct casereader_class "destroy" function for random
547    reader. */
548 static void
549 random_reader_destroy (struct casereader *reader, void *br_)
550 {
551   struct random_reader *br = br_;
552   struct random_reader_shared *shared = br->shared;
553
554   heap_delete (shared->readers, &br->heap_node);
555   if (heap_is_empty (shared->readers))
556     {
557       heap_destroy (shared->readers);
558       shared->class->destroy (reader, shared->aux);
559       free (shared);
560     }
561   else
562     advance_random_reader (reader, shared);
563
564   free (br);
565 }
566
567 /* struct casereader_class "clone" function for random reader. */
568 static struct casereader *
569 random_reader_clone (struct casereader *reader, void *br_)
570 {
571   struct random_reader *br = br_;
572   struct random_reader_shared *shared = br->shared;
573   return casereader_create_sequential (casereader_get_taint (reader),
574                                        reader->proto,
575                                        casereader_get_case_cnt (reader),
576                                        &random_reader_casereader_class,
577                                        make_random_reader (shared,
578                                                            br->offset));
579 }
580
581 /* struct casereader_class "peek" function for random reader. */
582 static struct ccase *
583 random_reader_peek (struct casereader *reader, void *br_, casenumber idx)
584 {
585   struct random_reader *br = br_;
586   struct random_reader_shared *shared = br->shared;
587
588   return shared->class->read (reader, shared->aux,
589                               br->offset - shared->min_offset + idx);
590 }
591
592 /* Casereader class for random reader. */
593 static const struct casereader_class random_reader_casereader_class =
594   {
595     random_reader_read,
596     random_reader_destroy,
597     random_reader_clone,
598     random_reader_peek,
599   };
600 \f
601 /* Buffering shim for implementing clone and peek operations.
602
603    The "clone" and "peek" operations aren't implemented by all
604    types of casereaders, but we have to expose a uniform
605    interface anyhow.  We do this by interposing a buffering
606    casereader on top of the existing casereader on the first call
607    to "clone" or "peek".  The buffering casereader maintains a
608    window of cases that spans the positions of the original
609    casereader and all of its clones (the "clone set"), from the
610    position of the casereader that has read the fewest cases to
611    the position of the casereader that has read the most.
612
613    Thus, if all of the casereaders in the clone set are at
614    approximately the same position, only a few cases are buffered
615    and there is little inefficiency.  If, on the other hand, one
616    casereader is not used to read any cases at all, but another
617    one is used to read all of the cases, the entire contents of
618    the casereader is copied into the buffer.  This still might
619    not be so inefficient, given that case data in memory is
620    shared across multiple identical copies, but in the worst case
621    the window implementation will write cases to disk instead of
622    maintaining them in-memory. */
623
624 /* A buffering shim for a non-clonable or non-peekable
625    casereader. */
626 struct shim
627   {
628     struct casewindow *window;          /* Window of buffered cases. */
629     struct casereader *subreader;       /* Subordinate casereader. */
630   };
631
632 static const struct casereader_random_class shim_class;
633
634 /* Interposes a buffering shim atop READER. */
635 static void
636 insert_shim (struct casereader *reader)
637 {
638   const struct caseproto *proto = casereader_get_proto (reader);
639   casenumber case_cnt = casereader_get_case_cnt (reader);
640   struct shim *b = xmalloc (sizeof *b);
641   b->window = casewindow_create (proto, settings_get_workspace_cases (proto));
642   b->subreader = casereader_create_random (proto, case_cnt, &shim_class, b);
643   casereader_swap (reader, b->subreader);
644   taint_propagate (casewindow_get_taint (b->window),
645                    casereader_get_taint (reader));
646   taint_propagate (casereader_get_taint (b->subreader),
647                    casereader_get_taint (reader));
648 }
649
650 /* Ensures that B's window contains at least CASE_CNT cases.
651    Return true if successful, false upon reaching the end of B's
652    subreader or an I/O error. */
653 static bool
654 prime_buffer (struct shim *b, casenumber case_cnt)
655 {
656   while (casewindow_get_case_cnt (b->window) < case_cnt)
657     {
658       struct ccase *tmp = casereader_read (b->subreader);
659       if (tmp == NULL)
660         return false;
661       casewindow_push_head (b->window, tmp);
662     }
663   return true;
664 }
665
666 /* Reads the case at the given 0-based OFFSET from the front of
667    the window into C.  Returns the case if successful, or a null
668    pointer if OFFSET is beyond the end of file or upon I/O error.
669    The caller must call case_unref() on the returned case when it
670    is no longer needed. */
671 static struct ccase *
672 shim_read (struct casereader *reader UNUSED, void *b_,
673            casenumber offset)
674 {
675   struct shim *b = b_;
676   if (!prime_buffer (b, offset + 1))
677     return NULL;
678   return casewindow_get_case (b->window, offset);
679 }
680
681 /* Destroys B. */
682 static void
683 shim_destroy (struct casereader *reader UNUSED, void *b_)
684 {
685   struct shim *b = b_;
686   casewindow_destroy (b->window);
687   casereader_destroy (b->subreader);
688   free (b);
689 }
690
691 /* Discards CNT cases from the front of B's window. */
692 static void
693 shim_advance (struct casereader *reader UNUSED, void *b_, casenumber case_cnt)
694 {
695   struct shim *b = b_;
696   casewindow_pop_tail (b->window, case_cnt);
697 }
698
699 /* Class for the buffered reader. */
700 static const struct casereader_random_class shim_class =
701   {
702     shim_read,
703     shim_destroy,
704     shim_advance,
705   };
706 \f
707 static const struct casereader_class casereader_null_class;
708
709 /* Returns a casereader with no cases.  The casereader has the prototype
710    specified by PROTO.  PROTO may be specified as a null pointer, in which case
711    the casereader has no variables. */
712 struct casereader *
713 casereader_create_empty (const struct caseproto *proto_)
714 {
715   struct casereader *reader;
716   struct caseproto *proto;
717
718   proto = proto_ != NULL ? caseproto_ref (proto_) : caseproto_create ();
719   reader = casereader_create_sequential (NULL, proto, 0,
720                                          &casereader_null_class, NULL);
721   caseproto_unref (proto);
722
723   return reader;
724 }
725
726 static struct ccase *
727 casereader_null_read (struct casereader *reader UNUSED, void *aux UNUSED)
728 {
729   return NULL;
730 }
731
732 static void
733 casereader_null_destroy (struct casereader *reader UNUSED, void *aux UNUSED)
734 {
735   /* Nothing to do. */
736 }
737
738 static const struct casereader_class casereader_null_class =
739   {
740     casereader_null_read,
741     casereader_null_destroy,
742     NULL,                       /* clone */
743     NULL,                       /* peek */
744   };