Merge remote branch 'origin/master' into import-gui
[pspp] / src / data / casereader.c
1 /* pspp - a program for statistical analysis.
2    Copyright (C) 2007, 2009, 2010, 2013 Free Software Foundation, Inc.
3
4    This program is free software: you can redistribute it and/or modify
5    it under the terms of the GNU General Public License as published by
6    the Free Software Foundation, either version 3 of the License, or
7    (at your option) any later version.
8
9    This program is distributed in the hope that it will be useful,
10    but WITHOUT ANY WARRANTY; without even the implied warranty of
11    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12    GNU General Public License for more details.
13
14    You should have received a copy of the GNU General Public License
15    along with this program.  If not, see <http://www.gnu.org/licenses/>. */
16
17 #include <config.h>
18
19 #include "data/casereader.h"
20 #include "data/casereader-provider.h"
21
22 #include <stdlib.h>
23
24 #include "data/casereader-shim.h"
25 #include "data/casewriter.h"
26 #include "libpspp/assertion.h"
27 #include "libpspp/heap.h"
28 #include "libpspp/taint.h"
29
30 #include "gl/xalloc.h"
31
32 /* A casereader. */
33 struct casereader
34   {
35     struct taint *taint;                  /* Corrupted? */
36     struct caseproto *proto;              /* Format of contained cases. */
37     casenumber case_cnt;                  /* Number of cases,
38                                              CASENUMBER_MAX if unknown. */
39     const struct casereader_class *class; /* Class. */
40     void *aux;                            /* Auxiliary data for class. */
41   };
42
43 /* Reads and returns the next case from READER.  The caller owns
44    the returned case and must call case_unref on it when its data
45    is no longer needed.  Returns a null pointer if cases have
46    been exhausted or upon detection of an I/O error.
47
48    The case returned is effectively consumed: it can never be
49    read again through READER.  If this is inconvenient, READER
50    may be cloned in advance with casereader_clone, or
51    casereader_peek may be used instead. */
52 struct ccase *
53 casereader_read (struct casereader *reader)
54 {
55   if (reader->case_cnt != 0)
56     {
57       /* ->read may use casereader_swap to replace itself by
58          another reader and then delegate to that reader by
59          recursively calling casereader_read.  Currently only
60          lazy_casereader does this and, with luck, nothing else
61          ever will.
62
63          To allow this to work, however, we must decrement
64          case_cnt before calling ->read.  If we decremented
65          case_cnt after calling ->read, then this would actually
66          drop two cases from case_cnt instead of one, and we'd
67          lose the last case in the casereader. */
68       struct ccase *c;
69       if (reader->case_cnt != CASENUMBER_MAX)
70         reader->case_cnt--;
71       c = reader->class->read (reader, reader->aux);
72       if (c != NULL)
73         {
74           size_t n_widths UNUSED = caseproto_get_n_widths (reader->proto);
75           assert (case_get_value_cnt (c) >= n_widths);
76           expensive_assert (caseproto_equal (case_get_proto (c), 0,
77                                              reader->proto, 0, n_widths));
78           return c;
79         }
80     }
81   reader->case_cnt = 0;
82   return NULL;
83 }
84
85 /* Destroys READER.
86    Returns false if an I/O error was detected on READER, true
87    otherwise. */
88 bool
89 casereader_destroy (struct casereader *reader)
90 {
91   bool ok = true;
92   if (reader != NULL)
93     {
94       reader->class->destroy (reader, reader->aux);
95       ok = taint_destroy (reader->taint);
96       caseproto_unref (reader->proto);
97       free (reader);
98     }
99   return ok;
100 }
101
102 /* Returns a clone of READER.  READER and its clone may be used
103    to read the same sequence of cases in the same order, barring
104    I/O errors. */
105 struct casereader *
106 casereader_clone (const struct casereader *reader_)
107 {
108   struct casereader *reader = CONST_CAST (struct casereader *, reader_);
109   struct casereader *clone;
110   if ( reader == NULL ) 
111     return NULL;
112
113   if (reader->class->clone == NULL)
114     casereader_shim_insert (reader);
115   clone = reader->class->clone (reader, reader->aux);
116   assert (clone != NULL);
117   assert (clone != reader);
118   return clone;
119 }
120
121 /* Returns a copy of READER, which is itself destroyed.
122    Useful for taking over ownership of a casereader, to enforce
123    preventing the original owner from accessing the casereader
124    again. */
125 struct casereader *
126 casereader_rename (struct casereader *reader)
127 {
128   struct casereader *new = xmemdup (reader, sizeof *reader);
129   free (reader);
130   return new;
131 }
132
133 /* Exchanges the casereaders referred to by A and B. */
134 void
135 casereader_swap (struct casereader *a, struct casereader *b)
136 {
137   if (a != b)
138     {
139       struct casereader tmp = *a;
140       *a = *b;
141       *b = tmp;
142     }
143 }
144
145 /* Reads and returns the (IDX + 1)'th case from READER.  The
146    caller owns the returned case and must call case_unref on it
147    when it is no longer needed.  Returns a null pointer if cases
148    have been exhausted or upon detection of an I/O error. */
149 struct ccase *
150 casereader_peek (struct casereader *reader, casenumber idx)
151 {
152   if (idx < reader->case_cnt)
153     {
154       struct ccase *c;
155       if (reader->class->peek == NULL)
156         casereader_shim_insert (reader);
157       c = reader->class->peek (reader, reader->aux, idx);
158       if (c != NULL)
159         return c;
160       else if (casereader_error (reader))
161         reader->case_cnt = 0;
162     }
163   if (reader->case_cnt > idx)
164     reader->case_cnt = idx;
165   return NULL;
166 }
167
168 /* Returns true if no cases remain to be read from READER, or if
169    an error has occurred on READER.  (A return value of false
170    does *not* mean that the next call to casereader_peek or
171    casereader_read will return true, because an error can occur
172    in the meantime.) */
173 bool
174 casereader_is_empty (struct casereader *reader)
175 {
176   if (reader->case_cnt == 0)
177     return true;
178   else
179     {
180       struct ccase *c = casereader_peek (reader, 0);
181       if (c == NULL)
182         return true;
183       else
184         {
185           case_unref (c);
186           return false;
187         }
188     }
189 }
190
191 /* Returns true if an I/O error or another hard error has
192    occurred on READER, a clone of READER, or on some object on
193    which READER's data has a dependency, false otherwise. */
194 bool
195 casereader_error (const struct casereader *reader)
196 {
197   return taint_is_tainted (reader->taint);
198 }
199
200 /* Marks READER as having encountered an error.
201
202    Ordinarily, this function should be called by the
203    implementation of a casereader, not by the casereader's
204    client.  Instead, casereader clients should usually ensure
205    that a casereader's error state is correct by using
206    taint_propagate to propagate to the casereader's taint
207    structure, which may be obtained via casereader_get_taint. */
208 void
209 casereader_force_error (struct casereader *reader)
210 {
211   taint_set_taint (reader->taint);
212 }
213
214 /* Returns READER's associate taint object, for use with
215    taint_propagate and other taint functions. */
216 const struct taint *
217 casereader_get_taint (const struct casereader *reader)
218 {
219   return reader->taint;
220 }
221
222 /* Returns the number of cases that will be read by successive
223    calls to casereader_read for READER, assuming that no errors
224    occur.  Upon an error condition, the case count drops to 0, so
225    that no more cases can be obtained.
226
227    Not all casereaders can predict the number of cases that they
228    will produce without actually reading all of them.  In that
229    case, this function returns CASENUMBER_MAX.  To obtain the
230    actual number of cases in such a casereader, use
231    casereader_count_cases. */
232 casenumber
233 casereader_get_case_cnt (struct casereader *reader)
234 {
235   return reader->case_cnt;
236 }
237
238 static casenumber
239 casereader_count_cases__ (const struct casereader *reader,
240                           casenumber max_cases)
241 {
242   struct casereader *clone;
243   casenumber n_cases;
244
245   /* This seems to avoid a bug in Gcc 4.4.5 where, upon
246      return from this function, the stack appeared corrupt,
247      and the program returned to the wrong address.  Oddly
248      the problem only manifested itself when used in conjunction
249      with the ODS reader, in code such as:
250
251      GET DATA /TYPE=ODS ....
252      LIST.
253   */
254 #if (__GNUC__ == 4 ) && (__GNUC_MINOR__ == 4)
255   volatile int x = 1; x++;
256 #endif
257
258   clone = casereader_clone (reader);
259   n_cases = casereader_advance (clone, max_cases);
260   casereader_destroy (clone);
261   return n_cases;
262 }
263
264 /* Returns the number of cases that will be read by successive
265    calls to casereader_read for READER, assuming that no errors
266    occur.  Upon an error condition, the case count drops to 0, so
267    that no more cases can be obtained.
268
269    For a casereader that cannot predict the number of cases it
270    will produce, this function actually reads (and discards) all
271    of the contents of a clone of READER.  Thus, the return value
272    is always correct in the absence of I/O errors. */
273 casenumber
274 casereader_count_cases (const struct casereader *reader)
275 {
276   if (reader->case_cnt == CASENUMBER_MAX)
277     {
278       struct casereader *reader_rw = CONST_CAST (struct casereader *, reader);
279       reader_rw->case_cnt = casereader_count_cases__ (reader, CASENUMBER_MAX);
280     }
281   return reader->case_cnt;
282 }
283
284 /* Truncates READER to at most N cases. */
285 void
286 casereader_truncate (struct casereader *reader, casenumber n)
287 {
288   /* This could be optimized, if it ever becomes too expensive, by adding a
289      "max_cases" member to struct casereader.  We could also add a "truncate"
290      function to the casereader implementation, to allow the casereader to
291      throw away data that cannot ever be read. */
292   if (reader->case_cnt == CASENUMBER_MAX)
293     reader->case_cnt = casereader_count_cases__ (reader, n);
294   if (reader->case_cnt > n)
295     reader->case_cnt = n;
296 }
297
298 /* Returns the prototype for the cases in READER.  The caller
299    must not unref the returned prototype. */
300 const struct caseproto *
301 casereader_get_proto (const struct casereader *reader)
302 {
303   return reader->proto;
304 }
305
306 /* Skips past N cases in READER, stopping when the last case in
307    READER has been read or on an input error.  Returns the number
308    of cases successfully skipped. */
309 casenumber
310 casereader_advance (struct casereader *reader, casenumber n)
311 {
312   casenumber i;
313
314   for (i = 0; i < n; i++)
315     {
316       struct ccase *c = casereader_read (reader);
317       if (c == NULL)
318         break;
319       case_unref (c);
320     }
321
322   return i;
323 }
324
325
326 /* Copies all the cases in READER to WRITER, propagating errors
327    appropriately. READER is destroyed by this function */
328 void
329 casereader_transfer (struct casereader *reader, struct casewriter *writer)
330 {
331   struct ccase *c;
332
333   taint_propagate (casereader_get_taint (reader),
334                    casewriter_get_taint (writer));
335   while ((c = casereader_read (reader)) != NULL)
336     casewriter_write (writer, c);
337   casereader_destroy (reader);
338 }
339
340 /* Creates and returns a new casereader.  This function is
341    intended for use by casereader implementations, not by
342    casereader clients.
343
344    This function is most suited for creating a casereader for a
345    data source that is naturally sequential.
346    casereader_create_random may be more appropriate for a data
347    source that supports random access.
348
349    Ordinarily, specify a null pointer for TAINT, in which case
350    the new casereader will have a new, unique taint object.  If
351    the new casereader should have a clone of an existing taint
352    object, specify that object as TAINT.  (This is most commonly
353    useful in an implementation of the "clone" casereader_class
354    function, in which case the cloned casereader should have the
355    same taint object as the original casereader.)
356
357    PROTO must be the prototype for the cases that may be read
358    from the casereader.  The caller retains its reference to
359    PROTO.
360
361    CASE_CNT is an upper limit on the number of cases that
362    casereader_read will return from the casereader in successive
363    calls.  Ordinarily, this is the actual number of cases in the
364    data source or CASENUMBER_MAX if the number of cases cannot be
365    predicted in advance.
366
367    CLASS and AUX are a set of casereader implementation-specific
368    member functions and auxiliary data to pass to those member
369    functions, respectively. */
370 struct casereader *
371 casereader_create_sequential (const struct taint *taint,
372                               const struct caseproto *proto,
373                               casenumber case_cnt,
374                               const struct casereader_class *class, void *aux)
375 {
376   struct casereader *reader = xmalloc (sizeof *reader);
377   reader->taint = taint != NULL ? taint_clone (taint) : taint_create ();
378   reader->proto = caseproto_ref (proto);
379   reader->case_cnt = case_cnt;
380   reader->class = class;
381   reader->aux = aux;
382   return reader;
383 }
384
385 /* If READER is a casereader of the given CLASS, returns its
386    associated auxiliary data; otherwise, returns a null pointer.
387
388    This function is intended for use from casereader
389    implementations, not by casereader users.  Even within
390    casereader implementations, its usefulness is quite limited,
391    for at least two reasons.  First, every casereader member
392    function already receives a pointer to the casereader's
393    auxiliary data.  Second, a casereader's class can change
394    (through a call to casereader_swap) and this is in practice
395    quite common (e.g. any call to casereader_clone on a
396    casereader that does not directly support clone will cause the
397    casereader to be replaced by a shim caseader). */
398 void *
399 casereader_dynamic_cast (struct casereader *reader,
400                          const struct casereader_class *class)
401 {
402   return reader->class == class ? reader->aux : NULL;
403 }
404 \f
405 /* Random-access casereader implementation.
406
407    This is a set of wrappers around casereader_create_sequential
408    and struct casereader_class to make it easy to create
409    efficient casereaders for data sources that natively support
410    random access. */
411
412 /* One clone of a random reader. */
413 struct random_reader
414   {
415     struct random_reader_shared *shared; /* Data shared among clones. */
416     struct heap_node heap_node; /* Node in shared data's heap of readers. */
417     casenumber offset;          /* Number of cases already read. */
418   };
419
420 /* Returns the random_reader in which the given heap_node is
421    embedded. */
422 static struct random_reader *
423 random_reader_from_heap_node (const struct heap_node *node)
424 {
425   return heap_data (node, struct random_reader, heap_node);
426 }
427
428 /* Data shared among clones of a random reader. */
429 struct random_reader_shared
430   {
431     struct heap *readers;       /* Heap of struct random_readers. */
432     casenumber min_offset;      /* Smallest offset of any random_reader. */
433     const struct casereader_random_class *class;
434     void *aux;
435   };
436
437 static const struct casereader_class random_reader_casereader_class;
438
439 /* Creates and returns a new random_reader with the given SHARED
440    data and OFFSET.  Inserts the new random reader into the
441    shared heap. */
442 static struct random_reader *
443 make_random_reader (struct random_reader_shared *shared, casenumber offset)
444 {
445   struct random_reader *br = xmalloc (sizeof *br);
446   br->offset = offset;
447   br->shared = shared;
448   heap_insert (shared->readers, &br->heap_node);
449   return br;
450 }
451
452 /* Compares random_readers A and B by offset and returns a
453    strcmp()-like result. */
454 static int
455 compare_random_readers_by_offset (const struct heap_node *a_,
456                                   const struct heap_node *b_,
457                                   const void *aux UNUSED)
458 {
459   const struct random_reader *a = random_reader_from_heap_node (a_);
460   const struct random_reader *b = random_reader_from_heap_node (b_);
461   return a->offset < b->offset ? -1 : a->offset > b->offset;
462 }
463
464 /* Creates and returns a new casereader.  This function is
465    intended for use by casereader implementations, not by
466    casereader clients.
467
468    This function is most suited for creating a casereader for a
469    data source that supports random access.
470    casereader_create_sequential is more appropriate for a data
471    source that is naturally sequential.
472
473    PROTO must be the prototype for the cases that may be read
474    from the casereader.  The caller retains its reference to
475    PROTO.
476
477    CASE_CNT is an upper limit on the number of cases that
478    casereader_read will return from the casereader in successive
479    calls.  Ordinarily, this is the actual number of cases in the
480    data source or CASENUMBER_MAX if the number of cases cannot be
481    predicted in advance.
482
483    CLASS and AUX are a set of casereader implementation-specific
484    member functions and auxiliary data to pass to those member
485    functions, respectively. */
486 struct casereader *
487 casereader_create_random (const struct caseproto *proto, casenumber case_cnt,
488                           const struct casereader_random_class *class,
489                           void *aux)
490 {
491   struct random_reader_shared *shared = xmalloc (sizeof *shared);
492   shared->readers = heap_create (compare_random_readers_by_offset, NULL);
493   shared->class = class;
494   shared->aux = aux;
495   shared->min_offset = 0;
496   return casereader_create_sequential (NULL, proto, case_cnt,
497                                        &random_reader_casereader_class,
498                                        make_random_reader (shared, 0));
499 }
500
501 /* Reassesses the min_offset in SHARED based on the minimum
502    offset in the heap.   */
503 static void
504 advance_random_reader (struct casereader *reader,
505                        struct random_reader_shared *shared)
506 {
507   casenumber old, new;
508
509   old = shared->min_offset;
510   new = random_reader_from_heap_node (heap_minimum (shared->readers))->offset;
511   assert (new >= old);
512   if (new > old)
513     {
514       shared->min_offset = new;
515       shared->class->advance (reader, shared->aux, new - old);
516     }
517 }
518
519 /* struct casereader_class "read" function for random reader. */
520 static struct ccase *
521 random_reader_read (struct casereader *reader, void *br_)
522 {
523   struct random_reader *br = br_;
524   struct random_reader_shared *shared = br->shared;
525   struct ccase *c = shared->class->read (reader, shared->aux,
526                                          br->offset - shared->min_offset);
527   if (c != NULL)
528     {
529       br->offset++;
530       heap_changed (shared->readers, &br->heap_node);
531       advance_random_reader (reader, shared);
532     }
533   return c;
534 }
535
536 /* struct casereader_class "destroy" function for random
537    reader. */
538 static void
539 random_reader_destroy (struct casereader *reader, void *br_)
540 {
541   struct random_reader *br = br_;
542   struct random_reader_shared *shared = br->shared;
543
544   heap_delete (shared->readers, &br->heap_node);
545   if (heap_is_empty (shared->readers))
546     {
547       heap_destroy (shared->readers);
548       shared->class->destroy (reader, shared->aux);
549       free (shared);
550     }
551   else
552     advance_random_reader (reader, shared);
553
554   free (br);
555 }
556
557 /* struct casereader_class "clone" function for random reader. */
558 static struct casereader *
559 random_reader_clone (struct casereader *reader, void *br_)
560 {
561   struct random_reader *br = br_;
562   struct random_reader_shared *shared = br->shared;
563   return casereader_create_sequential (casereader_get_taint (reader),
564                                        reader->proto,
565                                        casereader_get_case_cnt (reader),
566                                        &random_reader_casereader_class,
567                                        make_random_reader (shared,
568                                                            br->offset));
569 }
570
571 /* struct casereader_class "peek" function for random reader. */
572 static struct ccase *
573 random_reader_peek (struct casereader *reader, void *br_, casenumber idx)
574 {
575   struct random_reader *br = br_;
576   struct random_reader_shared *shared = br->shared;
577
578   return shared->class->read (reader, shared->aux,
579                               br->offset - shared->min_offset + idx);
580 }
581
582 /* Casereader class for random reader. */
583 static const struct casereader_class random_reader_casereader_class =
584   {
585     random_reader_read,
586     random_reader_destroy,
587     random_reader_clone,
588     random_reader_peek,
589   };
590 \f
591 \f
592 static const struct casereader_class casereader_null_class;
593
594 /* Returns a casereader with no cases.  The casereader has the prototype
595    specified by PROTO.  PROTO may be specified as a null pointer, in which case
596    the casereader has no variables. */
597 struct casereader *
598 casereader_create_empty (const struct caseproto *proto_)
599 {
600   struct casereader *reader;
601   struct caseproto *proto;
602
603   proto = proto_ != NULL ? caseproto_ref (proto_) : caseproto_create ();
604   reader = casereader_create_sequential (NULL, proto, 0,
605                                          &casereader_null_class, NULL);
606   caseproto_unref (proto);
607
608   return reader;
609 }
610
611 static struct ccase *
612 casereader_null_read (struct casereader *reader UNUSED, void *aux UNUSED)
613 {
614   return NULL;
615 }
616
617 static void
618 casereader_null_destroy (struct casereader *reader UNUSED, void *aux UNUSED)
619 {
620   /* Nothing to do. */
621 }
622
623 static const struct casereader_class casereader_null_class =
624   {
625     casereader_null_read,
626     casereader_null_destroy,
627     NULL,                       /* clone */
628     NULL,                       /* peek */
629   };