Merge remote branch 'origin/master' into import-gui
[pspp] / src / data / casereader.c
1 /* pspp - a program for statistical analysis.
2    Copyright (C) 2007, 2009, 2010, 2013 Free Software Foundation, Inc.
3
4    This program is free software: you can redistribute it and/or modify
5    it under the terms of the GNU General Public License as published by
6    the Free Software Foundation, either version 3 of the License, or
7    (at your option) any later version.
8
9    This program is distributed in the hope that it will be useful,
10    but WITHOUT ANY WARRANTY; without even the implied warranty of
11    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12    GNU General Public License for more details.
13
14    You should have received a copy of the GNU General Public License
15    along with this program.  If not, see <http://www.gnu.org/licenses/>. */
16
17 #include <config.h>
18
19 #include "data/casereader.h"
20 #include "data/casereader-provider.h"
21
22 #include <stdlib.h>
23
24 #include "data/casereader-shim.h"
25 #include "data/casewriter.h"
26 #include "libpspp/assertion.h"
27 #include "libpspp/heap.h"
28 #include "libpspp/taint.h"
29
30 #include "gl/xalloc.h"
31
32 /* A casereader. */
33 struct casereader
34   {
35     struct taint *taint;                  /* Corrupted? */
36     struct caseproto *proto;              /* Format of contained cases. */
37     casenumber case_cnt;                  /* Number of cases,
38                                              CASENUMBER_MAX if unknown. */
39     const struct casereader_class *class; /* Class. */
40     void *aux;                            /* Auxiliary data for class. */
41   };
42
43 /* Reads and returns the next case from READER.  The caller owns
44    the returned case and must call case_unref on it when its data
45    is no longer needed.  Returns a null pointer if cases have
46    been exhausted or upon detection of an I/O error.
47
48    The case returned is effectively consumed: it can never be
49    read again through READER.  If this is inconvenient, READER
50    may be cloned in advance with casereader_clone, or
51    casereader_peek may be used instead. */
52 struct ccase *
53 casereader_read (struct casereader *reader)
54 {
55   if (reader->case_cnt != 0)
56     {
57       /* ->read may use casereader_swap to replace itself by
58          another reader and then delegate to that reader by
59          recursively calling casereader_read.  Currently only
60          lazy_casereader does this and, with luck, nothing else
61          ever will.
62
63          To allow this to work, however, we must decrement
64          case_cnt before calling ->read.  If we decremented
65          case_cnt after calling ->read, then this would actually
66          drop two cases from case_cnt instead of one, and we'd
67          lose the last case in the casereader. */
68       struct ccase *c;
69       if (reader->case_cnt != CASENUMBER_MAX)
70         reader->case_cnt--;
71       c = reader->class->read (reader, reader->aux);
72       if (c != NULL)
73         {
74           size_t n_widths UNUSED = caseproto_get_n_widths (reader->proto);
75           assert (case_get_value_cnt (c) >= n_widths);
76           expensive_assert (caseproto_equal (case_get_proto (c), 0,
77                                              reader->proto, 0, n_widths));
78           return c;
79         }
80     }
81   reader->case_cnt = 0;
82   return NULL;
83 }
84
85 /* Destroys READER.
86    Returns false if an I/O error was detected on READER, true
87    otherwise. */
88 bool
89 casereader_destroy (struct casereader *reader)
90 {
91   bool ok = true;
92   if (reader != NULL)
93     {
94       reader->class->destroy (reader, reader->aux);
95       ok = taint_destroy (reader->taint);
96       caseproto_unref (reader->proto);
97       free (reader);
98     }
99   return ok;
100 }
101
102 /* Returns a clone of READER.  READER and its clone may be used
103    to read the same sequence of cases in the same order, barring
104    I/O errors. */
105 struct casereader *
106 casereader_clone (const struct casereader *reader_)
107 {
108   struct casereader *reader = CONST_CAST (struct casereader *, reader_);
109   struct casereader *clone;
110   if ( reader == NULL ) 
111     return NULL;
112
113   if (reader->class->clone == NULL)
114     casereader_shim_insert (reader);
115   clone = reader->class->clone (reader, reader->aux);
116   assert (clone != NULL);
117   assert (clone != reader);
118   return clone;
119 }
120
121 /* Returns a copy of READER, which is itself destroyed.
122    Useful for taking over ownership of a casereader, to enforce
123    preventing the original owner from accessing the casereader
124    again. */
125 struct casereader *
126 casereader_rename (struct casereader *reader)
127 {
128   struct casereader *new = xmemdup (reader, sizeof *reader);
129   free (reader);
130   return new;
131 }
132
133 /* Exchanges the casereaders referred to by A and B. */
134 void
135 casereader_swap (struct casereader *a, struct casereader *b)
136 {
137   if (a != b)
138     {
139       struct casereader tmp = *a;
140       *a = *b;
141       *b = tmp;
142     }
143 }
144
145 /* Reads and returns the (IDX + 1)'th case from READER.  The
146    caller owns the returned case and must call case_unref on it
147    when it is no longer needed.  Returns a null pointer if cases
148    have been exhausted or upon detection of an I/O error. */
149 struct ccase *
150 casereader_peek (struct casereader *reader, casenumber idx)
151 {
152   if (idx < reader->case_cnt)
153     {
154       struct ccase *c;
155       if (reader->class->peek == NULL)
156         casereader_shim_insert (reader);
157       c = reader->class->peek (reader, reader->aux, idx);
158       if (c != NULL)
159         return c;
160       else if (casereader_error (reader))
161         reader->case_cnt = 0;
162     }
163   if (reader->case_cnt > idx)
164     reader->case_cnt = idx;
165   return NULL;
166 }
167
168 /* Returns true if no cases remain to be read from READER, or if
169    an error has occurred on READER.  (A return value of false
170    does *not* mean that the next call to casereader_peek or
171    casereader_read will return true, because an error can occur
172    in the meantime.) */
173 bool
174 casereader_is_empty (struct casereader *reader)
175 {
176   if (reader->case_cnt == 0)
177     return true;
178   else
179     {
180       struct ccase *c = casereader_peek (reader, 0);
181       if (c == NULL)
182         return true;
183       else
184         {
185           case_unref (c);
186           return false;
187         }
188     }
189 }
190
191 /* Returns true if an I/O error or another hard error has
192    occurred on READER, a clone of READER, or on some object on
193    which READER's data has a dependency, false otherwise. */
194 bool
195 casereader_error (const struct casereader *reader)
196 {
197   return taint_is_tainted (reader->taint);
198 }
199
200 /* Marks READER as having encountered an error.
201
202    Ordinarily, this function should be called by the
203    implementation of a casereader, not by the casereader's
204    client.  Instead, casereader clients should usually ensure
205    that a casereader's error state is correct by using
206    taint_propagate to propagate to the casereader's taint
207    structure, which may be obtained via casereader_get_taint. */
208 void
209 casereader_force_error (struct casereader *reader)
210 {
211   taint_set_taint (reader->taint);
212 }
213
214 /* Returns READER's associate taint object, for use with
215    taint_propagate and other taint functions. */
216 const struct taint *
217 casereader_get_taint (const struct casereader *reader)
218 {
219   return reader->taint;
220 }
221
222 /* Returns the number of cases that will be read by successive
223    calls to casereader_read for READER, assuming that no errors
224    occur.  Upon an error condition, the case count drops to 0, so
225    that no more cases can be obtained.
226
227    Not all casereaders can predict the number of cases that they
228    will produce without actually reading all of them.  In that
229    case, this function returns CASENUMBER_MAX.  To obtain the
230    actual number of cases in such a casereader, use
231    casereader_count_cases. */
232 casenumber
233 casereader_get_case_cnt (struct casereader *reader)
234 {
235   return reader->case_cnt;
236 }
237
238 static casenumber
239 casereader_count_cases__ (const struct casereader *reader,
240                           casenumber max_cases)
241 {
242   struct casereader *clone = casereader_clone (reader);
243   casenumber n_cases = casereader_advance (clone, max_cases);
244 #if (__GNUC__ == 4 ) && (__GNUC_MINOR__ == 4)
245   volatile int x = 1; x++;
246 #endif
247   casereader_destroy (clone);
248   return n_cases;
249 }
250
251 /* Returns the number of cases that will be read by successive
252    calls to casereader_read for READER, assuming that no errors
253    occur.  Upon an error condition, the case count drops to 0, so
254    that no more cases can be obtained.
255
256    For a casereader that cannot predict the number of cases it
257    will produce, this function actually reads (and discards) all
258    of the contents of a clone of READER.  Thus, the return value
259    is always correct in the absence of I/O errors. */
260 casenumber
261 casereader_count_cases (const struct casereader *reader)
262 {
263   if (reader->case_cnt == CASENUMBER_MAX)
264     {
265       struct casereader *reader_rw = CONST_CAST (struct casereader *, reader);
266       reader_rw->case_cnt = casereader_count_cases__ (reader, CASENUMBER_MAX);
267     }
268   return reader->case_cnt;
269 }
270
271 /* Truncates READER to at most N cases. */
272 void
273 casereader_truncate (struct casereader *reader, casenumber n)
274 {
275   /* This could be optimized, if it ever becomes too expensive, by adding a
276      "max_cases" member to struct casereader.  We could also add a "truncate"
277      function to the casereader implementation, to allow the casereader to
278      throw away data that cannot ever be read. */
279   if (reader->case_cnt == CASENUMBER_MAX)
280     reader->case_cnt = casereader_count_cases__ (reader, n);
281   if (reader->case_cnt > n)
282     reader->case_cnt = n;
283 }
284
285 /* Returns the prototype for the cases in READER.  The caller
286    must not unref the returned prototype. */
287 const struct caseproto *
288 casereader_get_proto (const struct casereader *reader)
289 {
290   return reader->proto;
291 }
292
293 /* Skips past N cases in READER, stopping when the last case in
294    READER has been read or on an input error.  Returns the number
295    of cases successfully skipped. */
296 casenumber
297 casereader_advance (struct casereader *reader, casenumber n)
298 {
299   casenumber i;
300
301   for (i = 0; i < n; i++)
302     {
303       struct ccase *c = casereader_read (reader);
304       if (c == NULL)
305         break;
306       case_unref (c);
307     }
308
309   return i;
310 }
311
312
313 /* Copies all the cases in READER to WRITER, propagating errors
314    appropriately. READER is destroyed by this function */
315 void
316 casereader_transfer (struct casereader *reader, struct casewriter *writer)
317 {
318   struct ccase *c;
319
320   taint_propagate (casereader_get_taint (reader),
321                    casewriter_get_taint (writer));
322   while ((c = casereader_read (reader)) != NULL)
323     casewriter_write (writer, c);
324   casereader_destroy (reader);
325 }
326
327 /* Creates and returns a new casereader.  This function is
328    intended for use by casereader implementations, not by
329    casereader clients.
330
331    This function is most suited for creating a casereader for a
332    data source that is naturally sequential.
333    casereader_create_random may be more appropriate for a data
334    source that supports random access.
335
336    Ordinarily, specify a null pointer for TAINT, in which case
337    the new casereader will have a new, unique taint object.  If
338    the new casereader should have a clone of an existing taint
339    object, specify that object as TAINT.  (This is most commonly
340    useful in an implementation of the "clone" casereader_class
341    function, in which case the cloned casereader should have the
342    same taint object as the original casereader.)
343
344    PROTO must be the prototype for the cases that may be read
345    from the casereader.  The caller retains its reference to
346    PROTO.
347
348    CASE_CNT is an upper limit on the number of cases that
349    casereader_read will return from the casereader in successive
350    calls.  Ordinarily, this is the actual number of cases in the
351    data source or CASENUMBER_MAX if the number of cases cannot be
352    predicted in advance.
353
354    CLASS and AUX are a set of casereader implementation-specific
355    member functions and auxiliary data to pass to those member
356    functions, respectively. */
357 struct casereader *
358 casereader_create_sequential (const struct taint *taint,
359                               const struct caseproto *proto,
360                               casenumber case_cnt,
361                               const struct casereader_class *class, void *aux)
362 {
363   struct casereader *reader = xmalloc (sizeof *reader);
364   reader->taint = taint != NULL ? taint_clone (taint) : taint_create ();
365   reader->proto = caseproto_ref (proto);
366   reader->case_cnt = case_cnt;
367   reader->class = class;
368   reader->aux = aux;
369   return reader;
370 }
371
372 /* If READER is a casereader of the given CLASS, returns its
373    associated auxiliary data; otherwise, returns a null pointer.
374
375    This function is intended for use from casereader
376    implementations, not by casereader users.  Even within
377    casereader implementations, its usefulness is quite limited,
378    for at least two reasons.  First, every casereader member
379    function already receives a pointer to the casereader's
380    auxiliary data.  Second, a casereader's class can change
381    (through a call to casereader_swap) and this is in practice
382    quite common (e.g. any call to casereader_clone on a
383    casereader that does not directly support clone will cause the
384    casereader to be replaced by a shim caseader). */
385 void *
386 casereader_dynamic_cast (struct casereader *reader,
387                          const struct casereader_class *class)
388 {
389   return reader->class == class ? reader->aux : NULL;
390 }
391 \f
392 /* Random-access casereader implementation.
393
394    This is a set of wrappers around casereader_create_sequential
395    and struct casereader_class to make it easy to create
396    efficient casereaders for data sources that natively support
397    random access. */
398
399 /* One clone of a random reader. */
400 struct random_reader
401   {
402     struct random_reader_shared *shared; /* Data shared among clones. */
403     struct heap_node heap_node; /* Node in shared data's heap of readers. */
404     casenumber offset;          /* Number of cases already read. */
405   };
406
407 /* Returns the random_reader in which the given heap_node is
408    embedded. */
409 static struct random_reader *
410 random_reader_from_heap_node (const struct heap_node *node)
411 {
412   return heap_data (node, struct random_reader, heap_node);
413 }
414
415 /* Data shared among clones of a random reader. */
416 struct random_reader_shared
417   {
418     struct heap *readers;       /* Heap of struct random_readers. */
419     casenumber min_offset;      /* Smallest offset of any random_reader. */
420     const struct casereader_random_class *class;
421     void *aux;
422   };
423
424 static const struct casereader_class random_reader_casereader_class;
425
426 /* Creates and returns a new random_reader with the given SHARED
427    data and OFFSET.  Inserts the new random reader into the
428    shared heap. */
429 static struct random_reader *
430 make_random_reader (struct random_reader_shared *shared, casenumber offset)
431 {
432   struct random_reader *br = xmalloc (sizeof *br);
433   br->offset = offset;
434   br->shared = shared;
435   heap_insert (shared->readers, &br->heap_node);
436   return br;
437 }
438
439 /* Compares random_readers A and B by offset and returns a
440    strcmp()-like result. */
441 static int
442 compare_random_readers_by_offset (const struct heap_node *a_,
443                                   const struct heap_node *b_,
444                                   const void *aux UNUSED)
445 {
446   const struct random_reader *a = random_reader_from_heap_node (a_);
447   const struct random_reader *b = random_reader_from_heap_node (b_);
448   return a->offset < b->offset ? -1 : a->offset > b->offset;
449 }
450
451 /* Creates and returns a new casereader.  This function is
452    intended for use by casereader implementations, not by
453    casereader clients.
454
455    This function is most suited for creating a casereader for a
456    data source that supports random access.
457    casereader_create_sequential is more appropriate for a data
458    source that is naturally sequential.
459
460    PROTO must be the prototype for the cases that may be read
461    from the casereader.  The caller retains its reference to
462    PROTO.
463
464    CASE_CNT is an upper limit on the number of cases that
465    casereader_read will return from the casereader in successive
466    calls.  Ordinarily, this is the actual number of cases in the
467    data source or CASENUMBER_MAX if the number of cases cannot be
468    predicted in advance.
469
470    CLASS and AUX are a set of casereader implementation-specific
471    member functions and auxiliary data to pass to those member
472    functions, respectively. */
473 struct casereader *
474 casereader_create_random (const struct caseproto *proto, casenumber case_cnt,
475                           const struct casereader_random_class *class,
476                           void *aux)
477 {
478   struct random_reader_shared *shared = xmalloc (sizeof *shared);
479   shared->readers = heap_create (compare_random_readers_by_offset, NULL);
480   shared->class = class;
481   shared->aux = aux;
482   shared->min_offset = 0;
483   return casereader_create_sequential (NULL, proto, case_cnt,
484                                        &random_reader_casereader_class,
485                                        make_random_reader (shared, 0));
486 }
487
488 /* Reassesses the min_offset in SHARED based on the minimum
489    offset in the heap.   */
490 static void
491 advance_random_reader (struct casereader *reader,
492                        struct random_reader_shared *shared)
493 {
494   casenumber old, new;
495
496   old = shared->min_offset;
497   new = random_reader_from_heap_node (heap_minimum (shared->readers))->offset;
498   assert (new >= old);
499   if (new > old)
500     {
501       shared->min_offset = new;
502       shared->class->advance (reader, shared->aux, new - old);
503     }
504 }
505
506 /* struct casereader_class "read" function for random reader. */
507 static struct ccase *
508 random_reader_read (struct casereader *reader, void *br_)
509 {
510   struct random_reader *br = br_;
511   struct random_reader_shared *shared = br->shared;
512   struct ccase *c = shared->class->read (reader, shared->aux,
513                                          br->offset - shared->min_offset);
514   if (c != NULL)
515     {
516       br->offset++;
517       heap_changed (shared->readers, &br->heap_node);
518       advance_random_reader (reader, shared);
519     }
520   return c;
521 }
522
523 /* struct casereader_class "destroy" function for random
524    reader. */
525 static void
526 random_reader_destroy (struct casereader *reader, void *br_)
527 {
528   struct random_reader *br = br_;
529   struct random_reader_shared *shared = br->shared;
530
531   heap_delete (shared->readers, &br->heap_node);
532   if (heap_is_empty (shared->readers))
533     {
534       heap_destroy (shared->readers);
535       shared->class->destroy (reader, shared->aux);
536       free (shared);
537     }
538   else
539     advance_random_reader (reader, shared);
540
541   free (br);
542 }
543
544 /* struct casereader_class "clone" function for random reader. */
545 static struct casereader *
546 random_reader_clone (struct casereader *reader, void *br_)
547 {
548   struct random_reader *br = br_;
549   struct random_reader_shared *shared = br->shared;
550   return casereader_create_sequential (casereader_get_taint (reader),
551                                        reader->proto,
552                                        casereader_get_case_cnt (reader),
553                                        &random_reader_casereader_class,
554                                        make_random_reader (shared,
555                                                            br->offset));
556 }
557
558 /* struct casereader_class "peek" function for random reader. */
559 static struct ccase *
560 random_reader_peek (struct casereader *reader, void *br_, casenumber idx)
561 {
562   struct random_reader *br = br_;
563   struct random_reader_shared *shared = br->shared;
564
565   return shared->class->read (reader, shared->aux,
566                               br->offset - shared->min_offset + idx);
567 }
568
569 /* Casereader class for random reader. */
570 static const struct casereader_class random_reader_casereader_class =
571   {
572     random_reader_read,
573     random_reader_destroy,
574     random_reader_clone,
575     random_reader_peek,
576   };
577 \f
578 \f
579 static const struct casereader_class casereader_null_class;
580
581 /* Returns a casereader with no cases.  The casereader has the prototype
582    specified by PROTO.  PROTO may be specified as a null pointer, in which case
583    the casereader has no variables. */
584 struct casereader *
585 casereader_create_empty (const struct caseproto *proto_)
586 {
587   struct casereader *reader;
588   struct caseproto *proto;
589
590   proto = proto_ != NULL ? caseproto_ref (proto_) : caseproto_create ();
591   reader = casereader_create_sequential (NULL, proto, 0,
592                                          &casereader_null_class, NULL);
593   caseproto_unref (proto);
594
595   return reader;
596 }
597
598 static struct ccase *
599 casereader_null_read (struct casereader *reader UNUSED, void *aux UNUSED)
600 {
601   return NULL;
602 }
603
604 static void
605 casereader_null_destroy (struct casereader *reader UNUSED, void *aux UNUSED)
606 {
607   /* Nothing to do. */
608 }
609
610 static const struct casereader_class casereader_null_class =
611   {
612     casereader_null_read,
613     casereader_null_destroy,
614     NULL,                       /* clone */
615     NULL,                       /* peek */
616   };