Merge 'psppsheet' into 'master'.
[pspp] / src / data / casereader.c
1 /* PSPP - a program for statistical analysis.
2    Copyright (C) 2007, 2009, 2010, 2013 Free Software Foundation, Inc.
3
4    This program is free software: you can redistribute it and/or modify
5    it under the terms of the GNU General Public License as published by
6    the Free Software Foundation, either version 3 of the License, or
7    (at your option) any later version.
8
9    This program is distributed in the hope that it will be useful,
10    but WITHOUT ANY WARRANTY; without even the implied warranty of
11    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12    GNU General Public License for more details.
13
14    You should have received a copy of the GNU General Public License
15    along with this program.  If not, see <http://www.gnu.org/licenses/>. */
16
17 #include <config.h>
18
19 #include "data/casereader.h"
20 #include "data/casereader-provider.h"
21
22 #include <stdlib.h>
23
24 #include "data/casereader-shim.h"
25 #include "data/casewriter.h"
26 #include "libpspp/assertion.h"
27 #include "libpspp/heap.h"
28 #include "libpspp/taint.h"
29
30 #include "gl/xalloc.h"
31
32 /* A casereader. */
33 struct casereader
34   {
35     struct taint *taint;                  /* Corrupted? */
36     struct caseproto *proto;              /* Format of contained cases. */
37     casenumber case_cnt;                  /* Number of cases,
38                                              CASENUMBER_MAX if unknown. */
39     const struct casereader_class *class; /* Class. */
40     void *aux;                            /* Auxiliary data for class. */
41   };
42
43 /* Reads and returns the next case from READER.  The caller owns
44    the returned case and must call case_unref on it when its data
45    is no longer needed.  Returns a null pointer if cases have
46    been exhausted or upon detection of an I/O error.
47
48    The case returned is effectively consumed: it can never be
49    read again through READER.  If this is inconvenient, READER
50    may be cloned in advance with casereader_clone, or
51    casereader_peek may be used instead. */
52 struct ccase *
53 casereader_read (struct casereader *reader)
54 {
55   if (reader->case_cnt != 0)
56     {
57       /* ->read may use casereader_swap to replace itself by
58          another reader and then delegate to that reader by
59          recursively calling casereader_read.  Currently only
60          lazy_casereader does this and, with luck, nothing else
61          ever will.
62
63          To allow this to work, however, we must decrement
64          case_cnt before calling ->read.  If we decremented
65          case_cnt after calling ->read, then this would actually
66          drop two cases from case_cnt instead of one, and we'd
67          lose the last case in the casereader. */
68       struct ccase *c;
69       if (reader->case_cnt != CASENUMBER_MAX)
70         reader->case_cnt--;
71       c = reader->class->read (reader, reader->aux);
72       if (c != NULL)
73         {
74           size_t n_widths UNUSED = caseproto_get_n_widths (reader->proto);
75           assert (case_get_value_cnt (c) >= n_widths);
76           expensive_assert (caseproto_equal (case_get_proto (c), 0,
77                                              reader->proto, 0, n_widths));
78           return c;
79         }
80     }
81   reader->case_cnt = 0;
82   return NULL;
83 }
84
85 /* Destroys READER.
86    Returns false if an I/O error was detected on READER, true
87    otherwise. */
88 bool
89 casereader_destroy (struct casereader *reader)
90 {
91   bool ok = true;
92   if (reader != NULL)
93     {
94       reader->class->destroy (reader, reader->aux);
95       ok = taint_destroy (reader->taint);
96       caseproto_unref (reader->proto);
97       free (reader);
98     }
99   return ok;
100 }
101
102 /* Returns a clone of READER.  READER and its clone may be used
103    to read the same sequence of cases in the same order, barring
104    I/O errors. */
105 struct casereader *
106 casereader_clone (const struct casereader *reader_)
107 {
108   struct casereader *reader = CONST_CAST (struct casereader *, reader_);
109   struct casereader *clone;
110   if ( reader == NULL ) 
111     return NULL;
112
113   if (reader->class->clone == NULL)
114     casereader_shim_insert (reader);
115   clone = reader->class->clone (reader, reader->aux);
116   assert (clone != NULL);
117   assert (clone != reader);
118   return clone;
119 }
120
121 /* Returns a copy of READER, which is itself destroyed.
122    Useful for taking over ownership of a casereader, to enforce
123    preventing the original owner from accessing the casereader
124    again. */
125 struct casereader *
126 casereader_rename (struct casereader *reader)
127 {
128   struct casereader *new = xmemdup (reader, sizeof *reader);
129   free (reader);
130   return new;
131 }
132
133 /* Exchanges the casereaders referred to by A and B. */
134 void
135 casereader_swap (struct casereader *a, struct casereader *b)
136 {
137   if (a != b)
138     {
139       struct casereader tmp = *a;
140       *a = *b;
141       *b = tmp;
142     }
143 }
144
145 /* Reads and returns the (IDX + 1)'th case from READER.  The
146    caller owns the returned case and must call case_unref on it
147    when it is no longer needed.  Returns a null pointer if cases
148    have been exhausted or upon detection of an I/O error. */
149 struct ccase *
150 casereader_peek (struct casereader *reader, casenumber idx)
151 {
152   if (idx < reader->case_cnt)
153     {
154       struct ccase *c;
155       if (reader->class->peek == NULL)
156         casereader_shim_insert (reader);
157       c = reader->class->peek (reader, reader->aux, idx);
158       if (c != NULL)
159         return c;
160       else if (casereader_error (reader))
161         reader->case_cnt = 0;
162     }
163   if (reader->case_cnt > idx)
164     reader->case_cnt = idx;
165   return NULL;
166 }
167
168 /* Returns true if no cases remain to be read from READER, or if
169    an error has occurred on READER.  (A return value of false
170    does *not* mean that the next call to casereader_peek or
171    casereader_read will return true, because an error can occur
172    in the meantime.) */
173 bool
174 casereader_is_empty (struct casereader *reader)
175 {
176   if (reader->case_cnt == 0)
177     return true;
178   else
179     {
180       struct ccase *c = casereader_peek (reader, 0);
181       if (c == NULL)
182         return true;
183       else
184         {
185           case_unref (c);
186           return false;
187         }
188     }
189 }
190
191 /* Returns true if an I/O error or another hard error has
192    occurred on READER, a clone of READER, or on some object on
193    which READER's data has a dependency, false otherwise. */
194 bool
195 casereader_error (const struct casereader *reader)
196 {
197   return taint_is_tainted (reader->taint);
198 }
199
200 /* Marks READER as having encountered an error.
201
202    Ordinarily, this function should be called by the
203    implementation of a casereader, not by the casereader's
204    client.  Instead, casereader clients should usually ensure
205    that a casereader's error state is correct by using
206    taint_propagate to propagate to the casereader's taint
207    structure, which may be obtained via casereader_get_taint. */
208 void
209 casereader_force_error (struct casereader *reader)
210 {
211   taint_set_taint (reader->taint);
212 }
213
214 /* Returns READER's associate taint object, for use with
215    taint_propagate and other taint functions. */
216 const struct taint *
217 casereader_get_taint (const struct casereader *reader)
218 {
219   return reader->taint;
220 }
221
222 /* Returns the number of cases that will be read by successive
223    calls to casereader_read for READER, assuming that no errors
224    occur.  Upon an error condition, the case count drops to 0, so
225    that no more cases can be obtained.
226
227    Not all casereaders can predict the number of cases that they
228    will produce without actually reading all of them.  In that
229    case, this function returns CASENUMBER_MAX.  To obtain the
230    actual number of cases in such a casereader, use
231    casereader_count_cases. */
232 casenumber
233 casereader_get_case_cnt (struct casereader *reader)
234 {
235   return reader->case_cnt;
236 }
237
238 static casenumber
239 casereader_count_cases__ (const struct casereader *reader,
240                           casenumber max_cases)
241 {
242   struct casereader *clone;
243   casenumber n_cases;
244
245   /* This seems to avoid a bug in Gcc 4.4.5 where, upon
246      return from this function, the stack appeared corrupt,
247      and the program returned to the wrong address.  Oddly
248      the problem only manifested itself when used in conjunction
249      with the ODS reader, in code such as:
250
251      GET DATA /TYPE=ODS ....
252      LIST.
253   */
254 #if (__GNUC__ == 4 ) && (__GNUC_MINOR__ == 4)
255   volatile int x = 1; x++;
256 #endif
257
258   clone = casereader_clone (reader);
259   n_cases = casereader_advance (clone, max_cases);
260   casereader_destroy (clone);
261
262   return n_cases;
263 }
264
265 /* Returns the number of cases that will be read by successive
266    calls to casereader_read for READER, assuming that no errors
267    occur.  Upon an error condition, the case count drops to 0, so
268    that no more cases can be obtained.
269
270    For a casereader that cannot predict the number of cases it
271    will produce, this function actually reads (and discards) all
272    of the contents of a clone of READER.  Thus, the return value
273    is always correct in the absence of I/O errors. */
274 casenumber
275 casereader_count_cases (const struct casereader *reader)
276 {
277   if (reader->case_cnt == CASENUMBER_MAX)
278     {
279       struct casereader *reader_rw = CONST_CAST (struct casereader *, reader);
280       reader_rw->case_cnt = casereader_count_cases__ (reader, CASENUMBER_MAX);
281     }
282   return reader->case_cnt;
283 }
284
285 /* Truncates READER to at most N cases. */
286 void
287 casereader_truncate (struct casereader *reader, casenumber n)
288 {
289   /* This could be optimized, if it ever becomes too expensive, by adding a
290      "max_cases" member to struct casereader.  We could also add a "truncate"
291      function to the casereader implementation, to allow the casereader to
292      throw away data that cannot ever be read. */
293   if (reader->case_cnt == CASENUMBER_MAX)
294     reader->case_cnt = casereader_count_cases__ (reader, n);
295   if (reader->case_cnt > n)
296     reader->case_cnt = n;
297 }
298
299 /* Returns the prototype for the cases in READER.  The caller
300    must not unref the returned prototype. */
301 const struct caseproto *
302 casereader_get_proto (const struct casereader *reader)
303 {
304   return reader->proto;
305 }
306
307 /* Skips past N cases in READER, stopping when the last case in
308    READER has been read or on an input error.  Returns the number
309    of cases successfully skipped. */
310 casenumber
311 casereader_advance (struct casereader *reader, casenumber n)
312 {
313   casenumber i;
314
315   for (i = 0; i < n; i++)
316     {
317       struct ccase *c = casereader_read (reader);
318       if (c == NULL)
319         break;
320       case_unref (c);
321     }
322
323   return i;
324 }
325
326
327 /* Copies all the cases in READER to WRITER, propagating errors
328    appropriately. READER is destroyed by this function */
329 void
330 casereader_transfer (struct casereader *reader, struct casewriter *writer)
331 {
332   struct ccase *c;
333
334   taint_propagate (casereader_get_taint (reader),
335                    casewriter_get_taint (writer));
336   while ((c = casereader_read (reader)) != NULL)
337     casewriter_write (writer, c);
338   casereader_destroy (reader);
339 }
340
341 /* Creates and returns a new casereader.  This function is
342    intended for use by casereader implementations, not by
343    casereader clients.
344
345    This function is most suited for creating a casereader for a
346    data source that is naturally sequential.
347    casereader_create_random may be more appropriate for a data
348    source that supports random access.
349
350    Ordinarily, specify a null pointer for TAINT, in which case
351    the new casereader will have a new, unique taint object.  If
352    the new casereader should have a clone of an existing taint
353    object, specify that object as TAINT.  (This is most commonly
354    useful in an implementation of the "clone" casereader_class
355    function, in which case the cloned casereader should have the
356    same taint object as the original casereader.)
357
358    PROTO must be the prototype for the cases that may be read
359    from the casereader.  The caller retains its reference to
360    PROTO.
361
362    CASE_CNT is an upper limit on the number of cases that
363    casereader_read will return from the casereader in successive
364    calls.  Ordinarily, this is the actual number of cases in the
365    data source or CASENUMBER_MAX if the number of cases cannot be
366    predicted in advance.
367
368    CLASS and AUX are a set of casereader implementation-specific
369    member functions and auxiliary data to pass to those member
370    functions, respectively. */
371 struct casereader *
372 casereader_create_sequential (const struct taint *taint,
373                               const struct caseproto *proto,
374                               casenumber case_cnt,
375                               const struct casereader_class *class, void *aux)
376 {
377   struct casereader *reader = xmalloc (sizeof *reader);
378   reader->taint = taint != NULL ? taint_clone (taint) : taint_create ();
379   reader->proto = caseproto_ref (proto);
380   reader->case_cnt = case_cnt;
381   reader->class = class;
382   reader->aux = aux;
383   return reader;
384 }
385
386 /* If READER is a casereader of the given CLASS, returns its
387    associated auxiliary data; otherwise, returns a null pointer.
388
389    This function is intended for use from casereader
390    implementations, not by casereader users.  Even within
391    casereader implementations, its usefulness is quite limited,
392    for at least two reasons.  First, every casereader member
393    function already receives a pointer to the casereader's
394    auxiliary data.  Second, a casereader's class can change
395    (through a call to casereader_swap) and this is in practice
396    quite common (e.g. any call to casereader_clone on a
397    casereader that does not directly support clone will cause the
398    casereader to be replaced by a shim caseader). */
399 void *
400 casereader_dynamic_cast (struct casereader *reader,
401                          const struct casereader_class *class)
402 {
403   return reader->class == class ? reader->aux : NULL;
404 }
405 \f
406 /* Random-access casereader implementation.
407
408    This is a set of wrappers around casereader_create_sequential
409    and struct casereader_class to make it easy to create
410    efficient casereaders for data sources that natively support
411    random access. */
412
413 /* One clone of a random reader. */
414 struct random_reader
415   {
416     struct random_reader_shared *shared; /* Data shared among clones. */
417     struct heap_node heap_node; /* Node in shared data's heap of readers. */
418     casenumber offset;          /* Number of cases already read. */
419   };
420
421 /* Returns the random_reader in which the given heap_node is
422    embedded. */
423 static struct random_reader *
424 random_reader_from_heap_node (const struct heap_node *node)
425 {
426   return heap_data (node, struct random_reader, heap_node);
427 }
428
429 /* Data shared among clones of a random reader. */
430 struct random_reader_shared
431   {
432     struct heap *readers;       /* Heap of struct random_readers. */
433     casenumber min_offset;      /* Smallest offset of any random_reader. */
434     const struct casereader_random_class *class;
435     void *aux;
436   };
437
438 static const struct casereader_class random_reader_casereader_class;
439
440 /* Creates and returns a new random_reader with the given SHARED
441    data and OFFSET.  Inserts the new random reader into the
442    shared heap. */
443 static struct random_reader *
444 make_random_reader (struct random_reader_shared *shared, casenumber offset)
445 {
446   struct random_reader *br = xmalloc (sizeof *br);
447   br->offset = offset;
448   br->shared = shared;
449   heap_insert (shared->readers, &br->heap_node);
450   return br;
451 }
452
453 /* Compares random_readers A and B by offset and returns a
454    strcmp()-like result. */
455 static int
456 compare_random_readers_by_offset (const struct heap_node *a_,
457                                   const struct heap_node *b_,
458                                   const void *aux UNUSED)
459 {
460   const struct random_reader *a = random_reader_from_heap_node (a_);
461   const struct random_reader *b = random_reader_from_heap_node (b_);
462   return a->offset < b->offset ? -1 : a->offset > b->offset;
463 }
464
465 /* Creates and returns a new casereader.  This function is
466    intended for use by casereader implementations, not by
467    casereader clients.
468
469    This function is most suited for creating a casereader for a
470    data source that supports random access.
471    casereader_create_sequential is more appropriate for a data
472    source that is naturally sequential.
473
474    PROTO must be the prototype for the cases that may be read
475    from the casereader.  The caller retains its reference to
476    PROTO.
477
478    CASE_CNT is an upper limit on the number of cases that
479    casereader_read will return from the casereader in successive
480    calls.  Ordinarily, this is the actual number of cases in the
481    data source or CASENUMBER_MAX if the number of cases cannot be
482    predicted in advance.
483
484    CLASS and AUX are a set of casereader implementation-specific
485    member functions and auxiliary data to pass to those member
486    functions, respectively. */
487 struct casereader *
488 casereader_create_random (const struct caseproto *proto, casenumber case_cnt,
489                           const struct casereader_random_class *class,
490                           void *aux)
491 {
492   struct random_reader_shared *shared = xmalloc (sizeof *shared);
493   shared->readers = heap_create (compare_random_readers_by_offset, NULL);
494   shared->class = class;
495   shared->aux = aux;
496   shared->min_offset = 0;
497   return casereader_create_sequential (NULL, proto, case_cnt,
498                                        &random_reader_casereader_class,
499                                        make_random_reader (shared, 0));
500 }
501
502 /* Reassesses the min_offset in SHARED based on the minimum
503    offset in the heap.   */
504 static void
505 advance_random_reader (struct casereader *reader,
506                        struct random_reader_shared *shared)
507 {
508   casenumber old, new;
509
510   old = shared->min_offset;
511   new = random_reader_from_heap_node (heap_minimum (shared->readers))->offset;
512   assert (new >= old);
513   if (new > old)
514     {
515       shared->min_offset = new;
516       shared->class->advance (reader, shared->aux, new - old);
517     }
518 }
519
520 /* struct casereader_class "read" function for random reader. */
521 static struct ccase *
522 random_reader_read (struct casereader *reader, void *br_)
523 {
524   struct random_reader *br = br_;
525   struct random_reader_shared *shared = br->shared;
526   struct ccase *c = shared->class->read (reader, shared->aux,
527                                          br->offset - shared->min_offset);
528   if (c != NULL)
529     {
530       br->offset++;
531       heap_changed (shared->readers, &br->heap_node);
532       advance_random_reader (reader, shared);
533     }
534   return c;
535 }
536
537 /* struct casereader_class "destroy" function for random
538    reader. */
539 static void
540 random_reader_destroy (struct casereader *reader, void *br_)
541 {
542   struct random_reader *br = br_;
543   struct random_reader_shared *shared = br->shared;
544
545   heap_delete (shared->readers, &br->heap_node);
546   if (heap_is_empty (shared->readers))
547     {
548       heap_destroy (shared->readers);
549       shared->class->destroy (reader, shared->aux);
550       free (shared);
551     }
552   else
553     advance_random_reader (reader, shared);
554
555   free (br);
556 }
557
558 /* struct casereader_class "clone" function for random reader. */
559 static struct casereader *
560 random_reader_clone (struct casereader *reader, void *br_)
561 {
562   struct random_reader *br = br_;
563   struct random_reader_shared *shared = br->shared;
564   return casereader_create_sequential (casereader_get_taint (reader),
565                                        reader->proto,
566                                        casereader_get_case_cnt (reader),
567                                        &random_reader_casereader_class,
568                                        make_random_reader (shared,
569                                                            br->offset));
570 }
571
572 /* struct casereader_class "peek" function for random reader. */
573 static struct ccase *
574 random_reader_peek (struct casereader *reader, void *br_, casenumber idx)
575 {
576   struct random_reader *br = br_;
577   struct random_reader_shared *shared = br->shared;
578
579   return shared->class->read (reader, shared->aux,
580                               br->offset - shared->min_offset + idx);
581 }
582
583 /* Casereader class for random reader. */
584 static const struct casereader_class random_reader_casereader_class =
585   {
586     random_reader_read,
587     random_reader_destroy,
588     random_reader_clone,
589     random_reader_peek,
590   };
591 \f
592 \f
593 static const struct casereader_class casereader_null_class;
594
595 /* Returns a casereader with no cases.  The casereader has the prototype
596    specified by PROTO.  PROTO may be specified as a null pointer, in which case
597    the casereader has no variables. */
598 struct casereader *
599 casereader_create_empty (const struct caseproto *proto_)
600 {
601   struct casereader *reader;
602   struct caseproto *proto;
603
604   proto = proto_ != NULL ? caseproto_ref (proto_) : caseproto_create ();
605   reader = casereader_create_sequential (NULL, proto, 0,
606                                          &casereader_null_class, NULL);
607   caseproto_unref (proto);
608
609   return reader;
610 }
611
612 static struct ccase *
613 casereader_null_read (struct casereader *reader UNUSED, void *aux UNUSED)
614 {
615   return NULL;
616 }
617
618 static void
619 casereader_null_destroy (struct casereader *reader UNUSED, void *aux UNUSED)
620 {
621   /* Nothing to do. */
622 }
623
624 static const struct casereader_class casereader_null_class =
625   {
626     casereader_null_read,
627     casereader_null_destroy,
628     NULL,                       /* clone */
629     NULL,                       /* peek */
630   };