better tests
[pspp] / src / data / casereader.c
1 /* PSPP - a program for statistical analysis.
2    Copyright (C) 2007, 2009, 2010, 2013 Free Software Foundation, Inc.
3
4    This program is free software: you can redistribute it and/or modify
5    it under the terms of the GNU General Public License as published by
6    the Free Software Foundation, either version 3 of the License, or
7    (at your option) any later version.
8
9    This program is distributed in the hope that it will be useful,
10    but WITHOUT ANY WARRANTY; without even the implied warranty of
11    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12    GNU General Public License for more details.
13
14    You should have received a copy of the GNU General Public License
15    along with this program.  If not, see <http://www.gnu.org/licenses/>. */
16
17 #include <config.h>
18
19 #include "data/casereader.h"
20 #include "data/casereader-provider.h"
21
22 #include <stdlib.h>
23
24 #include "data/casereader-shim.h"
25 #include "data/casewriter.h"
26 #include "libpspp/assertion.h"
27 #include "libpspp/heap.h"
28 #include "libpspp/taint.h"
29
30 #include "gl/xalloc.h"
31
32 /* A casereader. */
33 struct casereader
34   {
35     struct taint *taint;                  /* Corrupted? */
36     struct caseproto *proto;              /* Format of contained cases. */
37     casenumber n_cases;                   /* Number of cases,
38                                              CASENUMBER_MAX if unknown. */
39     const struct casereader_class *class; /* Class. */
40     void *aux;                            /* Auxiliary data for class. */
41   };
42
43 /* Reads and returns the next case from READER.  The caller owns
44    the returned case and must call case_unref on it when its data
45    is no longer needed.  Returns a null pointer if cases have
46    been exhausted or upon detection of an I/O error.
47
48    The case returned is effectively consumed: it can never be
49    read again through READER.  If this is inconvenient, READER
50    may be cloned in advance with casereader_clone, or
51    casereader_peek may be used instead. */
52 struct ccase *
53 casereader_read (struct casereader *reader)
54 {
55   if (reader->n_cases != 0)
56     {
57       /* ->read may use casereader_swap to replace itself by
58          another reader and then delegate to that reader by
59          recursively calling casereader_read.  Currently only
60          lazy_casereader does this and, with luck, nothing else
61          ever will.
62
63          To allow this to work, however, we must decrement
64          n_cases before calling ->read.  If we decremented
65          n_cases after calling ->read, then this would actually
66          drop two cases from n_cases instead of one, and we'd
67          lose the last case in the casereader. */
68       struct ccase *c;
69       if (reader->n_cases != CASENUMBER_MAX)
70         reader->n_cases--;
71       c = reader->class->read (reader, reader->aux);
72       if (c != NULL)
73         {
74           size_t n_widths UNUSED = caseproto_get_n_widths (reader->proto);
75           assert (case_get_n_values (c) >= n_widths);
76           expensive_assert (caseproto_equal (case_get_proto (c), 0,
77                                              reader->proto, 0, n_widths));
78           return c;
79         }
80     }
81   reader->n_cases = 0;
82   return NULL;
83 }
84
85 /* Destroys READER.
86    Returns false if an I/O error was detected on READER, true
87    otherwise. */
88 bool
89 casereader_destroy (struct casereader *reader)
90 {
91   bool ok = true;
92   if (reader != NULL)
93     {
94       reader->class->destroy (reader, reader->aux);
95       ok = taint_destroy (reader->taint);
96       caseproto_unref (reader->proto);
97       free (reader);
98     }
99   return ok;
100 }
101
102 /* Returns a clone of READER.  READER and its clone may be used
103    to read the same sequence of cases in the same order, barring
104    I/O errors. */
105 struct casereader *
106 casereader_clone (const struct casereader *reader_)
107 {
108   struct casereader *reader = CONST_CAST (struct casereader *, reader_);
109   struct casereader *clone;
110   if (reader == NULL)
111     return NULL;
112
113   if (reader->class->clone == NULL)
114     casereader_shim_insert (reader);
115   clone = reader->class->clone (reader, reader->aux);
116   assert (clone != NULL);
117   assert (clone != reader);
118   return clone;
119 }
120
121 /* Returns a copy of READER, which is itself destroyed.
122    Useful for taking over ownership of a casereader, to enforce
123    preventing the original owner from accessing the casereader
124    again. */
125 struct casereader *
126 casereader_rename (struct casereader *reader)
127 {
128   struct casereader *new = xmemdup (reader, sizeof *reader);
129   free (reader);
130   return new;
131 }
132
133 /* Exchanges the casereaders referred to by A and B. */
134 void
135 casereader_swap (struct casereader *a, struct casereader *b)
136 {
137   if (a != b)
138     {
139       struct casereader tmp = *a;
140       *a = *b;
141       *b = tmp;
142     }
143 }
144
145 /* Reads and returns the (IDX + 1)'th case from READER.  The
146    caller owns the returned case and must call case_unref on it
147    when it is no longer needed.  Returns a null pointer if cases
148    have been exhausted or upon detection of an I/O error. */
149 struct ccase *
150 casereader_peek (struct casereader *reader, casenumber idx)
151 {
152   if (idx < reader->n_cases)
153     {
154       struct ccase *c;
155       if (reader->class->peek == NULL)
156         casereader_shim_insert (reader);
157       c = reader->class->peek (reader, reader->aux, idx);
158       if (c != NULL)
159         return c;
160       else if (casereader_error (reader))
161         reader->n_cases = 0;
162     }
163   if (reader->n_cases > idx)
164     reader->n_cases = idx;
165   return NULL;
166 }
167
168 /* Returns true if no cases remain to be read from READER, or if
169    an error has occurred on READER.  (A return value of false
170    does *not* mean that the next call to casereader_peek or
171    casereader_read will return true, because an error can occur
172    in the meantime.) */
173 bool
174 casereader_is_empty (struct casereader *reader)
175 {
176   if (reader->n_cases == 0)
177     return true;
178   else
179     {
180       struct ccase *c = casereader_peek (reader, 0);
181       if (c == NULL)
182         return true;
183       else
184         {
185           case_unref (c);
186           return false;
187         }
188     }
189 }
190
191 /* Returns true if an I/O error or another hard error has
192    occurred on READER, a clone of READER, or on some object on
193    which READER's data has a dependency, false otherwise. */
194 bool
195 casereader_error (const struct casereader *reader)
196 {
197   return taint_is_tainted (reader->taint);
198 }
199
200 /* Marks READER as having encountered an error.
201
202    Ordinarily, this function should be called by the
203    implementation of a casereader, not by the casereader's
204    client.  Instead, casereader clients should usually ensure
205    that a casereader's error state is correct by using
206    taint_propagate to propagate to the casereader's taint
207    structure, which may be obtained via casereader_get_taint. */
208 void
209 casereader_force_error (struct casereader *reader)
210 {
211   taint_set_taint (reader->taint);
212 }
213
214 /* Returns READER's associate taint object, for use with
215    taint_propagate and other taint functions. */
216 const struct taint *
217 casereader_get_taint (const struct casereader *reader)
218 {
219   return reader->taint;
220 }
221
222 /* Returns the number of cases that will be read by successive
223    calls to casereader_read for READER, assuming that no errors
224    occur.  Upon an error condition, the case count drops to 0, so
225    that no more cases can be obtained.
226
227    Not all casereaders can predict the number of cases that they
228    will produce without actually reading all of them.  In that
229    case, this function returns CASENUMBER_MAX.  To obtain the
230    actual number of cases in such a casereader, use
231    casereader_count_cases. */
232 casenumber
233 casereader_get_n_cases (struct casereader *reader)
234 {
235   return reader->n_cases;
236 }
237
238 static casenumber
239 casereader_count_cases__ (const struct casereader *reader,
240                           casenumber max_cases)
241 {
242   struct casereader *clone;
243   casenumber n_cases;
244
245   clone = casereader_clone (reader);
246   n_cases = casereader_advance (clone, max_cases);
247   casereader_destroy (clone);
248
249   return n_cases;
250 }
251
252 /* Returns the number of cases that will be read by successive
253    calls to casereader_read for READER, assuming that no errors
254    occur.  Upon an error condition, the case count drops to 0, so
255    that no more cases can be obtained.
256
257    For a casereader that cannot predict the number of cases it
258    will produce, this function actually reads (and discards) all
259    of the contents of a clone of READER.  Thus, the return value
260    is always correct in the absence of I/O errors. */
261 casenumber
262 casereader_count_cases (const struct casereader *reader)
263 {
264   if (reader->n_cases == CASENUMBER_MAX)
265     {
266       struct casereader *reader_rw = CONST_CAST (struct casereader *, reader);
267       reader_rw->n_cases = casereader_count_cases__ (reader, CASENUMBER_MAX);
268     }
269   return reader->n_cases;
270 }
271
272 /* Truncates READER to at most N cases. */
273 void
274 casereader_truncate (struct casereader *reader, casenumber n)
275 {
276   /* This could be optimized, if it ever becomes too expensive, by adding a
277      "max_cases" member to struct casereader.  We could also add a "truncate"
278      function to the casereader implementation, to allow the casereader to
279      throw away data that cannot ever be read. */
280   if (reader->n_cases == CASENUMBER_MAX)
281     reader->n_cases = casereader_count_cases__ (reader, n);
282   if (reader->n_cases > n)
283     reader->n_cases = n;
284 }
285
286 /* Returns the prototype for the cases in READER.  The caller
287    must not unref the returned prototype. */
288 const struct caseproto *
289 casereader_get_proto (const struct casereader *reader)
290 {
291   return reader->proto;
292 }
293
294 /* Skips past N cases in READER, stopping when the last case in
295    READER has been read or on an input error.  Returns the number
296    of cases successfully skipped. */
297 casenumber
298 casereader_advance (struct casereader *reader, casenumber n)
299 {
300   casenumber i;
301
302   for (i = 0; i < n; i++)
303     {
304       struct ccase *c = casereader_read (reader);
305       if (c == NULL)
306         break;
307       case_unref (c);
308     }
309
310   return i;
311 }
312
313
314 /* Copies all the cases in READER to WRITER, propagating errors
315    appropriately. READER is destroyed by this function */
316 void
317 casereader_transfer (struct casereader *reader, struct casewriter *writer)
318 {
319   struct ccase *c;
320
321   taint_propagate (casereader_get_taint (reader),
322                    casewriter_get_taint (writer));
323   while ((c = casereader_read (reader)) != NULL)
324     casewriter_write (writer, c);
325   casereader_destroy (reader);
326 }
327
328 /* Creates and returns a new casereader.  This function is
329    intended for use by casereader implementations, not by
330    casereader clients.
331
332    This function is most suited for creating a casereader for a
333    data source that is naturally sequential.
334    casereader_create_random may be more appropriate for a data
335    source that supports random access.
336
337    Ordinarily, specify a null pointer for TAINT, in which case
338    the new casereader will have a new, unique taint object.  If
339    the new casereader should have a clone of an existing taint
340    object, specify that object as TAINT.  (This is most commonly
341    useful in an implementation of the "clone" casereader_class
342    function, in which case the cloned casereader should have the
343    same taint object as the original casereader.)
344
345    PROTO must be the prototype for the cases that may be read
346    from the casereader.  The caller retains its reference to
347    PROTO.
348
349    CASE_CNT is an upper limit on the number of cases that
350    casereader_read will return from the casereader in successive
351    calls.  Ordinarily, this is the actual number of cases in the
352    data source or CASENUMBER_MAX if the number of cases cannot be
353    predicted in advance.
354
355    CLASS and AUX are a set of casereader implementation-specific
356    member functions and auxiliary data to pass to those member
357    functions, respectively. */
358 struct casereader *
359 casereader_create_sequential (const struct taint *taint,
360                               const struct caseproto *proto,
361                               casenumber n_cases,
362                               const struct casereader_class *class, void *aux)
363 {
364   struct casereader *reader = xmalloc (sizeof *reader);
365   reader->taint = taint != NULL ? taint_clone (taint) : taint_create ();
366   reader->proto = caseproto_ref (proto);
367   reader->n_cases = n_cases;
368   reader->class = class;
369   reader->aux = aux;
370   return reader;
371 }
372
373 /* If READER is a casereader of the given CLASS, returns its
374    associated auxiliary data; otherwise, returns a null pointer.
375
376    This function is intended for use from casereader
377    implementations, not by casereader users.  Even within
378    casereader implementations, its usefulness is quite limited,
379    for at least two reasons.  First, every casereader member
380    function already receives a pointer to the casereader's
381    auxiliary data.  Second, a casereader's class can change
382    (through a call to casereader_swap) and this is in practice
383    quite common (e.g. any call to casereader_clone on a
384    casereader that does not directly support clone will cause the
385    casereader to be replaced by a shim caseader). */
386 void *
387 casereader_dynamic_cast (struct casereader *reader,
388                          const struct casereader_class *class)
389 {
390   return reader->class == class ? reader->aux : NULL;
391 }
392 \f
393 /* Random-access casereader implementation.
394
395    This is a set of wrappers around casereader_create_sequential
396    and struct casereader_class to make it easy to create
397    efficient casereaders for data sources that natively support
398    random access. */
399
400 /* One clone of a random reader. */
401 struct random_reader
402   {
403     struct random_reader_shared *shared; /* Data shared among clones. */
404     struct heap_node heap_node; /* Node in shared data's heap of readers. */
405     casenumber offset;          /* Number of cases already read. */
406   };
407
408 /* Returns the random_reader in which the given heap_node is
409    embedded. */
410 static struct random_reader *
411 random_reader_from_heap_node (const struct heap_node *node)
412 {
413   return heap_data (node, struct random_reader, heap_node);
414 }
415
416 /* Data shared among clones of a random reader. */
417 struct random_reader_shared
418   {
419     struct heap *readers;       /* Heap of struct random_readers. */
420     casenumber min_offset;      /* Smallest offset of any random_reader. */
421     const struct casereader_random_class *class;
422     void *aux;
423   };
424
425 static const struct casereader_class random_reader_casereader_class;
426
427 /* Creates and returns a new random_reader with the given SHARED
428    data and OFFSET.  Inserts the new random reader into the
429    shared heap. */
430 static struct random_reader *
431 make_random_reader (struct random_reader_shared *shared, casenumber offset)
432 {
433   struct random_reader *br = xmalloc (sizeof *br);
434   br->offset = offset;
435   br->shared = shared;
436   heap_insert (shared->readers, &br->heap_node);
437   return br;
438 }
439
440 /* Compares random_readers A and B by offset and returns a
441    strcmp()-like result. */
442 static int
443 compare_random_readers_by_offset (const struct heap_node *a_,
444                                   const struct heap_node *b_,
445                                   const void *aux UNUSED)
446 {
447   const struct random_reader *a = random_reader_from_heap_node (a_);
448   const struct random_reader *b = random_reader_from_heap_node (b_);
449   return a->offset < b->offset ? -1 : a->offset > b->offset;
450 }
451
452 /* Creates and returns a new casereader.  This function is
453    intended for use by casereader implementations, not by
454    casereader clients.
455
456    This function is most suited for creating a casereader for a
457    data source that supports random access.
458    casereader_create_sequential is more appropriate for a data
459    source that is naturally sequential.
460
461    PROTO must be the prototype for the cases that may be read
462    from the casereader.  The caller retains its reference to
463    PROTO.
464
465    CASE_CNT is an upper limit on the number of cases that
466    casereader_read will return from the casereader in successive
467    calls.  Ordinarily, this is the actual number of cases in the
468    data source or CASENUMBER_MAX if the number of cases cannot be
469    predicted in advance.
470
471    CLASS and AUX are a set of casereader implementation-specific
472    member functions and auxiliary data to pass to those member
473    functions, respectively. */
474 struct casereader *
475 casereader_create_random (const struct caseproto *proto, casenumber n_cases,
476                           const struct casereader_random_class *class,
477                           void *aux)
478 {
479   struct random_reader_shared *shared = xmalloc (sizeof *shared);
480   shared->readers = heap_create (compare_random_readers_by_offset, NULL);
481   shared->class = class;
482   shared->aux = aux;
483   shared->min_offset = 0;
484   return casereader_create_sequential (NULL, proto, n_cases,
485                                        &random_reader_casereader_class,
486                                        make_random_reader (shared, 0));
487 }
488
489 /* Reassesses the min_offset in SHARED based on the minimum
490    offset in the heap.   */
491 static void
492 advance_random_reader (struct casereader *reader,
493                        struct random_reader_shared *shared)
494 {
495   casenumber old, new;
496
497   old = shared->min_offset;
498   new = random_reader_from_heap_node (heap_minimum (shared->readers))->offset;
499   assert (new >= old);
500   if (new > old)
501     {
502       shared->min_offset = new;
503       shared->class->advance (reader, shared->aux, new - old);
504     }
505 }
506
507 /* struct casereader_class "read" function for random reader. */
508 static struct ccase *
509 random_reader_read (struct casereader *reader, void *br_)
510 {
511   struct random_reader *br = br_;
512   struct random_reader_shared *shared = br->shared;
513   struct ccase *c = shared->class->read (reader, shared->aux,
514                                          br->offset - shared->min_offset);
515   if (c != NULL)
516     {
517       br->offset++;
518       heap_changed (shared->readers, &br->heap_node);
519       advance_random_reader (reader, shared);
520     }
521   return c;
522 }
523
524 /* struct casereader_class "destroy" function for random
525    reader. */
526 static void
527 random_reader_destroy (struct casereader *reader, void *br_)
528 {
529   struct random_reader *br = br_;
530   struct random_reader_shared *shared = br->shared;
531
532   heap_delete (shared->readers, &br->heap_node);
533   if (heap_is_empty (shared->readers))
534     {
535       heap_destroy (shared->readers);
536       shared->class->destroy (reader, shared->aux);
537       free (shared);
538     }
539   else
540     advance_random_reader (reader, shared);
541
542   free (br);
543 }
544
545 /* struct casereader_class "clone" function for random reader. */
546 static struct casereader *
547 random_reader_clone (struct casereader *reader, void *br_)
548 {
549   struct random_reader *br = br_;
550   struct random_reader_shared *shared = br->shared;
551   return casereader_create_sequential (casereader_get_taint (reader),
552                                        reader->proto,
553                                        casereader_get_n_cases (reader),
554                                        &random_reader_casereader_class,
555                                        make_random_reader (shared,
556                                                            br->offset));
557 }
558
559 /* struct casereader_class "peek" function for random reader. */
560 static struct ccase *
561 random_reader_peek (struct casereader *reader, void *br_, casenumber idx)
562 {
563   struct random_reader *br = br_;
564   struct random_reader_shared *shared = br->shared;
565
566   return shared->class->read (reader, shared->aux,
567                               br->offset - shared->min_offset + idx);
568 }
569
570 /* Casereader class for random reader. */
571 static const struct casereader_class random_reader_casereader_class =
572   {
573     random_reader_read,
574     random_reader_destroy,
575     random_reader_clone,
576     random_reader_peek,
577   };
578 \f
579 \f
580 static const struct casereader_class casereader_null_class;
581
582 /* Returns a casereader with no cases.  The casereader has the prototype
583    specified by PROTO.  PROTO may be specified as a null pointer, in which case
584    the casereader has no variables. */
585 struct casereader *
586 casereader_create_empty (const struct caseproto *proto_)
587 {
588   struct casereader *reader;
589   struct caseproto *proto;
590
591   proto = proto_ != NULL ? caseproto_ref (proto_) : caseproto_create ();
592   reader = casereader_create_sequential (NULL, proto, 0,
593                                          &casereader_null_class, NULL);
594   caseproto_unref (proto);
595
596   return reader;
597 }
598
599 static struct ccase *
600 casereader_null_read (struct casereader *reader UNUSED, void *aux UNUSED)
601 {
602   return NULL;
603 }
604
605 static void
606 casereader_null_destroy (struct casereader *reader UNUSED, void *aux UNUSED)
607 {
608   /* Nothing to do. */
609 }
610
611 static const struct casereader_class casereader_null_class =
612   {
613     casereader_null_read,
614     casereader_null_destroy,
615     NULL,                       /* clone */
616     NULL,                       /* peek */
617   };