casereader: Implement save, load.
[pspp] / src / data / casereader.c
1 /* PSPP - a program for statistical analysis.
2    Copyright (C) 2007, 2009, 2010, 2013 Free Software Foundation, Inc.
3
4    This program is free software: you can redistribute it and/or modify
5    it under the terms of the GNU General Public License as published by
6    the Free Software Foundation, either version 3 of the License, or
7    (at your option) any later version.
8
9    This program is distributed in the hope that it will be useful,
10    but WITHOUT ANY WARRANTY; without even the implied warranty of
11    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12    GNU General Public License for more details.
13
14    You should have received a copy of the GNU General Public License
15    along with this program.  If not, see <http://www.gnu.org/licenses/>. */
16
17 #include <config.h>
18
19 #include "data/casereader.h"
20 #include "data/casereader-provider.h"
21
22 #include <stdlib.h>
23
24 #include "data/casereader-shim.h"
25 #include "data/casewriter.h"
26 #include "libpspp/assertion.h"
27 #include "libpspp/heap.h"
28 #include "libpspp/taint.h"
29
30 #include "gl/xalloc.h"
31
32 /* A casereader. */
33 struct casereader
34   {
35     struct taint *taint;                  /* Corrupted? */
36     struct caseproto *proto;              /* Format of contained cases. */
37     casenumber case_cnt;                  /* Number of cases,
38                                              CASENUMBER_MAX if unknown. */
39     const struct casereader_class *class; /* Class. */
40     void *aux;                            /* Auxiliary data for class. */
41   };
42
43 /* Reads and returns the next case from READER.  The caller owns
44    the returned case and must call case_unref on it when its data
45    is no longer needed.  Returns a null pointer if cases have
46    been exhausted or upon detection of an I/O error.
47
48    The case returned is effectively consumed: it can never be
49    read again through READER.  If this is inconvenient, READER
50    may be cloned in advance with casereader_clone, or
51    casereader_peek may be used instead. */
52 struct ccase *
53 casereader_read (struct casereader *reader)
54 {
55   if (reader->case_cnt != 0)
56     {
57       /* ->read may use casereader_swap to replace itself by
58          another reader and then delegate to that reader by
59          recursively calling casereader_read.  Currently only
60          lazy_casereader does this and, with luck, nothing else
61          ever will.
62
63          To allow this to work, however, we must decrement
64          case_cnt before calling ->read.  If we decremented
65          case_cnt after calling ->read, then this would actually
66          drop two cases from case_cnt instead of one, and we'd
67          lose the last case in the casereader. */
68       struct ccase *c;
69       if (reader->case_cnt != CASENUMBER_MAX)
70         reader->case_cnt--;
71       c = reader->class->read (reader, reader->aux);
72       if (c != NULL)
73         {
74           size_t n_widths UNUSED = caseproto_get_n_widths (reader->proto);
75           assert (case_get_value_cnt (c) >= n_widths);
76           expensive_assert (caseproto_equal (case_get_proto (c), 0,
77                                              reader->proto, 0, n_widths));
78           return c;
79         }
80     }
81   reader->case_cnt = 0;
82   return NULL;
83 }
84
85 /* Destroys READER.
86    Returns false if an I/O error was detected on READER, true
87    otherwise. */
88 bool
89 casereader_destroy (struct casereader *reader)
90 {
91   bool ok = true;
92   if (reader != NULL)
93     {
94       reader->class->destroy (reader, reader->aux);
95       ok = taint_destroy (reader->taint);
96       caseproto_unref (reader->proto);
97       free (reader);
98     }
99   return ok;
100 }
101
102 /* Returns a clone of READER.  READER and its clone may be used
103    to read the same sequence of cases in the same order, barring
104    I/O errors. */
105 struct casereader *
106 casereader_clone (const struct casereader *reader_)
107 {
108   struct casereader *reader = CONST_CAST (struct casereader *, reader_);
109   struct casereader *clone;
110   if ( reader == NULL ) 
111     return NULL;
112
113   if (reader->class->clone == NULL)
114     casereader_shim_insert (reader);
115   clone = reader->class->clone (reader, reader->aux);
116   assert (clone != NULL);
117   assert (clone != reader);
118   return clone;
119 }
120
121 /* Returns a copy of READER, which is itself destroyed.
122    Useful for taking over ownership of a casereader, to enforce
123    preventing the original owner from accessing the casereader
124    again. */
125 struct casereader *
126 casereader_rename (struct casereader *reader)
127 {
128   struct casereader *new = xmemdup (reader, sizeof *reader);
129   free (reader);
130   return new;
131 }
132
133 /* Exchanges the casereaders referred to by A and B. */
134 void
135 casereader_swap (struct casereader *a, struct casereader *b)
136 {
137   if (a != b)
138     {
139       struct casereader tmp = *a;
140       *a = *b;
141       *b = tmp;
142     }
143 }
144
145 /* Reads and returns the (IDX + 1)'th case from READER.  The
146    caller owns the returned case and must call case_unref on it
147    when it is no longer needed.  Returns a null pointer if cases
148    have been exhausted or upon detection of an I/O error. */
149 struct ccase *
150 casereader_peek (struct casereader *reader, casenumber idx)
151 {
152   if (idx < reader->case_cnt)
153     {
154       struct ccase *c;
155       if (reader->class->peek == NULL)
156         casereader_shim_insert (reader);
157       c = reader->class->peek (reader, reader->aux, idx);
158       if (c != NULL)
159         return c;
160       else if (casereader_error (reader))
161         reader->case_cnt = 0;
162     }
163   if (reader->case_cnt > idx)
164     reader->case_cnt = idx;
165   return NULL;
166 }
167
168 /* Returns true if no cases remain to be read from READER, or if
169    an error has occurred on READER.  (A return value of false
170    does *not* mean that the next call to casereader_peek or
171    casereader_read will return true, because an error can occur
172    in the meantime.) */
173 bool
174 casereader_is_empty (struct casereader *reader)
175 {
176   if (reader->case_cnt == 0)
177     return true;
178   else
179     {
180       struct ccase *c = casereader_peek (reader, 0);
181       if (c == NULL)
182         return true;
183       else
184         {
185           case_unref (c);
186           return false;
187         }
188     }
189 }
190
191 /* Returns true if an I/O error or another hard error has
192    occurred on READER, a clone of READER, or on some object on
193    which READER's data has a dependency, false otherwise. */
194 bool
195 casereader_error (const struct casereader *reader)
196 {
197   return taint_is_tainted (reader->taint);
198 }
199
200 /* Marks READER as having encountered an error.
201
202    Ordinarily, this function should be called by the
203    implementation of a casereader, not by the casereader's
204    client.  Instead, casereader clients should usually ensure
205    that a casereader's error state is correct by using
206    taint_propagate to propagate to the casereader's taint
207    structure, which may be obtained via casereader_get_taint. */
208 void
209 casereader_force_error (struct casereader *reader)
210 {
211   taint_set_taint (reader->taint);
212 }
213
214 /* Returns READER's associate taint object, for use with
215    taint_propagate and other taint functions. */
216 const struct taint *
217 casereader_get_taint (const struct casereader *reader)
218 {
219   return reader->taint;
220 }
221
222 /* Returns the number of cases that will be read by successive
223    calls to casereader_read for READER, assuming that no errors
224    occur.  Upon an error condition, the case count drops to 0, so
225    that no more cases can be obtained.
226
227    Not all casereaders can predict the number of cases that they
228    will produce without actually reading all of them.  In that
229    case, this function returns CASENUMBER_MAX.  To obtain the
230    actual number of cases in such a casereader, use
231    casereader_count_cases. */
232 casenumber
233 casereader_get_case_cnt (struct casereader *reader)
234 {
235   return reader->case_cnt;
236 }
237
238 static casenumber
239 casereader_count_cases__ (const struct casereader *reader,
240                           casenumber max_cases)
241 {
242   struct casereader *clone;
243   casenumber n_cases;
244
245   clone = casereader_clone (reader);
246   n_cases = casereader_advance (clone, max_cases);
247   casereader_destroy (clone);
248
249   return n_cases;
250 }
251
252 /* Returns the number of cases that will be read by successive
253    calls to casereader_read for READER, assuming that no errors
254    occur.  Upon an error condition, the case count drops to 0, so
255    that no more cases can be obtained.
256
257    For a casereader that cannot predict the number of cases it
258    will produce, this function actually reads (and discards) all
259    of the contents of a clone of READER.  Thus, the return value
260    is always correct in the absence of I/O errors. */
261 casenumber
262 casereader_count_cases (const struct casereader *reader)
263 {
264   if (reader->case_cnt == CASENUMBER_MAX)
265     {
266       struct casereader *reader_rw = CONST_CAST (struct casereader *, reader);
267       reader_rw->case_cnt = casereader_count_cases__ (reader, CASENUMBER_MAX);
268     }
269   return reader->case_cnt;
270 }
271
272 /* Truncates READER to at most N cases. */
273 void
274 casereader_truncate (struct casereader *reader, casenumber n)
275 {
276   /* This could be optimized, if it ever becomes too expensive, by adding a
277      "max_cases" member to struct casereader.  We could also add a "truncate"
278      function to the casereader implementation, to allow the casereader to
279      throw away data that cannot ever be read. */
280   if (reader->case_cnt == CASENUMBER_MAX)
281     reader->case_cnt = casereader_count_cases__ (reader, n);
282   if (reader->case_cnt > n)
283     reader->case_cnt = n;
284 }
285
286 /* Returns the prototype for the cases in READER.  The caller
287    must not unref the returned prototype. */
288 const struct caseproto *
289 casereader_get_proto (const struct casereader *reader)
290 {
291   return reader->proto;
292 }
293
294 /* Skips past N cases in READER, stopping when the last case in
295    READER has been read or on an input error.  Returns the number
296    of cases successfully skipped. */
297 casenumber
298 casereader_advance (struct casereader *reader, casenumber n)
299 {
300   casenumber i;
301
302   for (i = 0; i < n; i++)
303     {
304       struct ccase *c = casereader_read (reader);
305       if (c == NULL)
306         break;
307       case_unref (c);
308     }
309
310   return i;
311 }
312
313
314 /* Copies all the cases in READER to WRITER, propagating errors
315    appropriately. READER is destroyed by this function */
316 void
317 casereader_transfer (struct casereader *reader, struct casewriter *writer)
318 {
319   struct ccase *c;
320
321   taint_propagate (casereader_get_taint (reader),
322                    casewriter_get_taint (writer));
323   while ((c = casereader_read (reader)) != NULL)
324     casewriter_write (writer, c);
325   casereader_destroy (reader);
326 }
327
328 struct pxd_object *
329 casereader_save (const struct casereader *reader_, struct pxd *pxd)
330 {
331   struct pxd_array_builder ab;
332   struct casereader *reader;
333   struct pxd_builder b;
334   struct ccase *c;
335
336   pxd_array_builder_init (&ab, pxd);
337   reader = casereader_clone (reader_);
338   while ((c = casereader_read (reader)) != NULL)
339     {
340       pxd_array_builder_add (&ab, case_save (c, pxd));
341       case_unref (c);
342     }
343   casereader_destroy (reader);
344
345   pxd_builder_init (&b, pxd);
346   pxd_builder_put_link (&b, caseproto_save (reader->proto, pxd));
347   pxd_builder_put_link (&b, pxd_array_builder_commit (&ab));
348
349   return pxd_builder_commit (&b);
350 }
351
352 struct casereader *
353 casereader_load (struct pxd_object *object, const struct pxd *pxd)
354 {
355   struct casewriter *writer;
356   struct caseproto *proto;
357   struct pxd_array array;
358   struct pxd_parser p;
359   size_t i;
360
361   pxd_parser_init (&p, object, pxd);
362
363   proto = caseproto_load (pxd_parser_get_link (&p), pxd);
364   writer = autopaging_writer_create (proto);
365   caseproto_unref (proto);
366
367   pxd_array_init (&array, pxd_parser_get_link (&p), pxd);
368   for (i = 0; i < pxd_array_size (&array); i++)
369     casewriter_write (writer, case_load (pxd_array_get (&array, i), pxd));
370   pxd_array_destroy (&array);
371
372   pxd_parser_destroy (&p);
373
374   return casewriter_make_reader (writer);
375 }
376
377
378 /* Creates and returns a new casereader.  This function is
379    intended for use by casereader implementations, not by
380    casereader clients.
381
382    This function is most suited for creating a casereader for a
383    data source that is naturally sequential.
384    casereader_create_random may be more appropriate for a data
385    source that supports random access.
386
387    Ordinarily, specify a null pointer for TAINT, in which case
388    the new casereader will have a new, unique taint object.  If
389    the new casereader should have a clone of an existing taint
390    object, specify that object as TAINT.  (This is most commonly
391    useful in an implementation of the "clone" casereader_class
392    function, in which case the cloned casereader should have the
393    same taint object as the original casereader.)
394
395    PROTO must be the prototype for the cases that may be read
396    from the casereader.  The caller retains its reference to
397    PROTO.
398
399    CASE_CNT is an upper limit on the number of cases that
400    casereader_read will return from the casereader in successive
401    calls.  Ordinarily, this is the actual number of cases in the
402    data source or CASENUMBER_MAX if the number of cases cannot be
403    predicted in advance.
404
405    CLASS and AUX are a set of casereader implementation-specific
406    member functions and auxiliary data to pass to those member
407    functions, respectively. */
408 struct casereader *
409 casereader_create_sequential (const struct taint *taint,
410                               const struct caseproto *proto,
411                               casenumber case_cnt,
412                               const struct casereader_class *class, void *aux)
413 {
414   struct casereader *reader = xmalloc (sizeof *reader);
415   reader->taint = taint != NULL ? taint_clone (taint) : taint_create ();
416   reader->proto = caseproto_ref (proto);
417   reader->case_cnt = case_cnt;
418   reader->class = class;
419   reader->aux = aux;
420   return reader;
421 }
422
423 /* If READER is a casereader of the given CLASS, returns its
424    associated auxiliary data; otherwise, returns a null pointer.
425
426    This function is intended for use from casereader
427    implementations, not by casereader users.  Even within
428    casereader implementations, its usefulness is quite limited,
429    for at least two reasons.  First, every casereader member
430    function already receives a pointer to the casereader's
431    auxiliary data.  Second, a casereader's class can change
432    (through a call to casereader_swap) and this is in practice
433    quite common (e.g. any call to casereader_clone on a
434    casereader that does not directly support clone will cause the
435    casereader to be replaced by a shim caseader). */
436 void *
437 casereader_dynamic_cast (struct casereader *reader,
438                          const struct casereader_class *class)
439 {
440   return reader->class == class ? reader->aux : NULL;
441 }
442 \f
443 /* Random-access casereader implementation.
444
445    This is a set of wrappers around casereader_create_sequential
446    and struct casereader_class to make it easy to create
447    efficient casereaders for data sources that natively support
448    random access. */
449
450 /* One clone of a random reader. */
451 struct random_reader
452   {
453     struct random_reader_shared *shared; /* Data shared among clones. */
454     struct heap_node heap_node; /* Node in shared data's heap of readers. */
455     casenumber offset;          /* Number of cases already read. */
456   };
457
458 /* Returns the random_reader in which the given heap_node is
459    embedded. */
460 static struct random_reader *
461 random_reader_from_heap_node (const struct heap_node *node)
462 {
463   return heap_data (node, struct random_reader, heap_node);
464 }
465
466 /* Data shared among clones of a random reader. */
467 struct random_reader_shared
468   {
469     struct heap *readers;       /* Heap of struct random_readers. */
470     casenumber min_offset;      /* Smallest offset of any random_reader. */
471     const struct casereader_random_class *class;
472     void *aux;
473   };
474
475 static const struct casereader_class random_reader_casereader_class;
476
477 /* Creates and returns a new random_reader with the given SHARED
478    data and OFFSET.  Inserts the new random reader into the
479    shared heap. */
480 static struct random_reader *
481 make_random_reader (struct random_reader_shared *shared, casenumber offset)
482 {
483   struct random_reader *br = xmalloc (sizeof *br);
484   br->offset = offset;
485   br->shared = shared;
486   heap_insert (shared->readers, &br->heap_node);
487   return br;
488 }
489
490 /* Compares random_readers A and B by offset and returns a
491    strcmp()-like result. */
492 static int
493 compare_random_readers_by_offset (const struct heap_node *a_,
494                                   const struct heap_node *b_,
495                                   const void *aux UNUSED)
496 {
497   const struct random_reader *a = random_reader_from_heap_node (a_);
498   const struct random_reader *b = random_reader_from_heap_node (b_);
499   return a->offset < b->offset ? -1 : a->offset > b->offset;
500 }
501
502 /* Creates and returns a new casereader.  This function is
503    intended for use by casereader implementations, not by
504    casereader clients.
505
506    This function is most suited for creating a casereader for a
507    data source that supports random access.
508    casereader_create_sequential is more appropriate for a data
509    source that is naturally sequential.
510
511    PROTO must be the prototype for the cases that may be read
512    from the casereader.  The caller retains its reference to
513    PROTO.
514
515    CASE_CNT is an upper limit on the number of cases that
516    casereader_read will return from the casereader in successive
517    calls.  Ordinarily, this is the actual number of cases in the
518    data source or CASENUMBER_MAX if the number of cases cannot be
519    predicted in advance.
520
521    CLASS and AUX are a set of casereader implementation-specific
522    member functions and auxiliary data to pass to those member
523    functions, respectively. */
524 struct casereader *
525 casereader_create_random (const struct caseproto *proto, casenumber case_cnt,
526                           const struct casereader_random_class *class,
527                           void *aux)
528 {
529   struct random_reader_shared *shared = xmalloc (sizeof *shared);
530   shared->readers = heap_create (compare_random_readers_by_offset, NULL);
531   shared->class = class;
532   shared->aux = aux;
533   shared->min_offset = 0;
534   return casereader_create_sequential (NULL, proto, case_cnt,
535                                        &random_reader_casereader_class,
536                                        make_random_reader (shared, 0));
537 }
538
539 /* Reassesses the min_offset in SHARED based on the minimum
540    offset in the heap.   */
541 static void
542 advance_random_reader (struct casereader *reader,
543                        struct random_reader_shared *shared)
544 {
545   casenumber old, new;
546
547   old = shared->min_offset;
548   new = random_reader_from_heap_node (heap_minimum (shared->readers))->offset;
549   assert (new >= old);
550   if (new > old)
551     {
552       shared->min_offset = new;
553       shared->class->advance (reader, shared->aux, new - old);
554     }
555 }
556
557 /* struct casereader_class "read" function for random reader. */
558 static struct ccase *
559 random_reader_read (struct casereader *reader, void *br_)
560 {
561   struct random_reader *br = br_;
562   struct random_reader_shared *shared = br->shared;
563   struct ccase *c = shared->class->read (reader, shared->aux,
564                                          br->offset - shared->min_offset);
565   if (c != NULL)
566     {
567       br->offset++;
568       heap_changed (shared->readers, &br->heap_node);
569       advance_random_reader (reader, shared);
570     }
571   return c;
572 }
573
574 /* struct casereader_class "destroy" function for random
575    reader. */
576 static void
577 random_reader_destroy (struct casereader *reader, void *br_)
578 {
579   struct random_reader *br = br_;
580   struct random_reader_shared *shared = br->shared;
581
582   heap_delete (shared->readers, &br->heap_node);
583   if (heap_is_empty (shared->readers))
584     {
585       heap_destroy (shared->readers);
586       shared->class->destroy (reader, shared->aux);
587       free (shared);
588     }
589   else
590     advance_random_reader (reader, shared);
591
592   free (br);
593 }
594
595 /* struct casereader_class "clone" function for random reader. */
596 static struct casereader *
597 random_reader_clone (struct casereader *reader, void *br_)
598 {
599   struct random_reader *br = br_;
600   struct random_reader_shared *shared = br->shared;
601   return casereader_create_sequential (casereader_get_taint (reader),
602                                        reader->proto,
603                                        casereader_get_case_cnt (reader),
604                                        &random_reader_casereader_class,
605                                        make_random_reader (shared,
606                                                            br->offset));
607 }
608
609 /* struct casereader_class "peek" function for random reader. */
610 static struct ccase *
611 random_reader_peek (struct casereader *reader, void *br_, casenumber idx)
612 {
613   struct random_reader *br = br_;
614   struct random_reader_shared *shared = br->shared;
615
616   return shared->class->read (reader, shared->aux,
617                               br->offset - shared->min_offset + idx);
618 }
619
620 /* Casereader class for random reader. */
621 static const struct casereader_class random_reader_casereader_class =
622   {
623     random_reader_read,
624     random_reader_destroy,
625     random_reader_clone,
626     random_reader_peek,
627   };
628 \f
629 \f
630 static const struct casereader_class casereader_null_class;
631
632 /* Returns a casereader with no cases.  The casereader has the prototype
633    specified by PROTO.  PROTO may be specified as a null pointer, in which case
634    the casereader has no variables. */
635 struct casereader *
636 casereader_create_empty (const struct caseproto *proto_)
637 {
638   struct casereader *reader;
639   struct caseproto *proto;
640
641   proto = proto_ != NULL ? caseproto_ref (proto_) : caseproto_create ();
642   reader = casereader_create_sequential (NULL, proto, 0,
643                                          &casereader_null_class, NULL);
644   caseproto_unref (proto);
645
646   return reader;
647 }
648
649 static struct ccase *
650 casereader_null_read (struct casereader *reader UNUSED, void *aux UNUSED)
651 {
652   return NULL;
653 }
654
655 static void
656 casereader_null_destroy (struct casereader *reader UNUSED, void *aux UNUSED)
657 {
658   /* Nothing to do. */
659 }
660
661 static const struct casereader_class casereader_null_class =
662   {
663     casereader_null_read,
664     casereader_null_destroy,
665     NULL,                       /* clone */
666     NULL,                       /* peek */
667   };