1 /* PSPP - a program for statistical analysis.
2 Copyright (C) 2007, 2009, 2010, 2013 Free Software Foundation, Inc.
4 This program is free software: you can redistribute it and/or modify
5 it under the terms of the GNU General Public License as published by
6 the Free Software Foundation, either version 3 of the License, or
7 (at your option) any later version.
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 GNU General Public License for more details.
14 You should have received a copy of the GNU General Public License
15 along with this program. If not, see <http://www.gnu.org/licenses/>. */
19 #include "data/casereader.h"
20 #include "data/casereader-provider.h"
24 #include "data/casereader-shim.h"
25 #include "data/casewriter.h"
26 #include "libpspp/assertion.h"
27 #include "libpspp/heap.h"
28 #include "libpspp/taint.h"
30 #include "gl/xalloc.h"
35 struct taint *taint; /* Corrupted? */
36 struct caseproto *proto; /* Format of contained cases. */
37 casenumber case_cnt; /* Number of cases,
38 CASENUMBER_MAX if unknown. */
39 const struct casereader_class *class; /* Class. */
40 void *aux; /* Auxiliary data for class. */
43 /* Reads and returns the next case from READER. The caller owns
44 the returned case and must call case_unref on it when its data
45 is no longer needed. Returns a null pointer if cases have
46 been exhausted or upon detection of an I/O error.
48 The case returned is effectively consumed: it can never be
49 read again through READER. If this is inconvenient, READER
50 may be cloned in advance with casereader_clone, or
51 casereader_peek may be used instead. */
53 casereader_read (struct casereader *reader)
55 if (reader->case_cnt != 0)
57 /* ->read may use casereader_swap to replace itself by
58 another reader and then delegate to that reader by
59 recursively calling casereader_read. Currently only
60 lazy_casereader does this and, with luck, nothing else does.
63 To allow this to work, however, we must decrement
64 case_cnt before calling ->read. If we decremented
65 case_cnt after calling ->read, then this would actually
66 drop two cases from case_cnt instead of one, and we'd
67 lose the last case in the casereader. */
69 if (reader->case_cnt != CASENUMBER_MAX)
71 c = reader->class->read (reader, reader->aux);
74 size_t n_widths UNUSED = caseproto_get_n_widths (reader->proto);
75 assert (case_get_value_cnt (c) >= n_widths);
76 expensive_assert (caseproto_equal (case_get_proto (c), 0,
77 reader->proto, 0, n_widths));
86 /* Returns false if an I/O error was detected on READER, true otherwise. */
89 casereader_destroy (struct casereader *reader)
94 reader->class->destroy (reader, reader->aux);
95 ok = taint_destroy (reader->taint);
96 caseproto_unref (reader->proto);
102 /* Returns a clone of READER. READER and its clone may be used
103 to read the same sequence of cases in the same order, barring I/O errors. */
106 casereader_clone (const struct casereader *reader_)
108 struct casereader *reader = CONST_CAST (struct casereader *, reader_);
109 struct casereader *clone;
110 if ( reader == NULL )
113 if (reader->class->clone == NULL)
114 casereader_shim_insert (reader);
115 clone = reader->class->clone (reader, reader->aux);
116 assert (clone != NULL);
117 assert (clone != reader);
121 /* Returns a copy of READER, which is itself destroyed.
122 Useful for taking over ownership of a casereader, so that the
123 original owner can no longer access the casereader. */
126 casereader_rename (struct casereader *reader)
128 struct casereader *new = xmemdup (reader, sizeof *reader);
133 /* Exchanges the casereaders referred to by A and B. */
135 casereader_swap (struct casereader *a, struct casereader *b)
139 struct casereader tmp = *a;
145 /* Reads and returns the (IDX + 1)'th case from READER. The
146 caller owns the returned case and must call case_unref on it
147 when it is no longer needed. Returns a null pointer if cases
148 have been exhausted or upon detection of an I/O error. */
150 casereader_peek (struct casereader *reader, casenumber idx)
152 if (idx < reader->case_cnt)
155 if (reader->class->peek == NULL)
156 casereader_shim_insert (reader);
157 c = reader->class->peek (reader, reader->aux, idx);
160 else if (casereader_error (reader))
161 reader->case_cnt = 0;
163 if (reader->case_cnt > idx)
164 reader->case_cnt = idx;
168 /* Returns true if no cases remain to be read from READER, or if
169 an error has occurred on READER. (A return value of false
170 does *not* mean that the next call to casereader_peek or
171 casereader_read will return a case, because an error can occur in the meantime.) */
174 casereader_is_empty (struct casereader *reader)
176 if (reader->case_cnt == 0)
180 struct ccase *c = casereader_peek (reader, 0);
191 /* Returns true if an I/O error or another hard error has
192 occurred on READER, a clone of READER, or on some object on
193 which READER's data has a dependency, false otherwise. */
195 casereader_error (const struct casereader *reader)
197 return taint_is_tainted (reader->taint);
200 /* Marks READER as having encountered an error.
202 Ordinarily, this function should be called by the
203 implementation of a casereader, not by the casereader's
204 client. Instead, casereader clients should usually ensure
205 that a casereader's error state is correct by using
206 taint_propagate to propagate to the casereader's taint
207 structure, which may be obtained via casereader_get_taint. */
209 casereader_force_error (struct casereader *reader)
211 taint_set_taint (reader->taint);
214 /* Returns READER's associated taint object, for use with
215 taint_propagate and other taint functions. */
217 casereader_get_taint (const struct casereader *reader)
219 return reader->taint;
222 /* Returns the number of cases that will be read by successive
223 calls to casereader_read for READER, assuming that no errors
224 occur. Upon an error condition, the case count drops to 0, so
225 that no more cases can be obtained.
227 Not all casereaders can predict the number of cases that they
228 will produce without actually reading all of them. In that
229 case, this function returns CASENUMBER_MAX. To obtain the
230 actual number of cases in such a casereader, use
231 casereader_count_cases. */
233 casereader_get_case_cnt (struct casereader *reader)
235 return reader->case_cnt;
/* Helper used by casereader_count_cases and casereader_truncate.
   Counts cases without consuming READER's own cases: it clones
   READER, advances the clone by up to MAX_CASES cases, and then
   destroys the clone.  casereader_advance reports how many cases
   it actually skipped; that figure is stored in n_cases.

   NOTE(review): the declaration of n_cases and the return
   statement fall on lines missing from this listing; presumably
   n_cases is what is returned -- confirm against the full file. */
239 casereader_count_cases__ (const struct casereader *reader,
240 casenumber max_cases)
242 struct casereader *clone;
245 clone = casereader_clone (reader);
246 n_cases = casereader_advance (clone, max_cases);
247 casereader_destroy (clone);
252 /* Returns the number of cases that will be read by successive
253 calls to casereader_read for READER, assuming that no errors
254 occur. Upon an error condition, the case count drops to 0, so
255 that no more cases can be obtained.
257 For a casereader that cannot predict the number of cases it
258 will produce, this function actually reads (and discards) all
259 of the contents of a clone of READER. Thus, the return value
260 is always correct in the absence of I/O errors. */
262 casereader_count_cases (const struct casereader *reader)
264 if (reader->case_cnt == CASENUMBER_MAX)
/* The count is unknown, so it is obtained by reading through a
   clone.  The result is stored back into READER, which is why
   const is cast away here: a later call then finds a known
   case_cnt and does not repeat the counting. */
266 struct casereader *reader_rw = CONST_CAST (struct casereader *, reader);
267 reader_rw->case_cnt = casereader_count_cases__ (reader, CASENUMBER_MAX);
269 return reader->case_cnt;
272 /* Truncates READER to at most N cases. */
274 casereader_truncate (struct casereader *reader, casenumber n)
276 /* This could be optimized, if it ever becomes too expensive, by adding a
277 "max_cases" member to struct casereader. We could also add a "truncate"
278 function to the casereader implementation, to allow the casereader to
279 throw away data that cannot ever be read. */
/* With an unknown count, count through a clone but stop after at
   most N cases: the counting helper advances the clone by no more
   than N, which is all that is needed to apply the limit. */
280 if (reader->case_cnt == CASENUMBER_MAX)
281 reader->case_cnt = casereader_count_cases__ (reader, n);
/* Clamp a known count that exceeds N down to N. */
282 if (reader->case_cnt > n)
283 reader->case_cnt = n;
286 /* Returns the prototype for the cases in READER. The caller
287 must not unref the returned prototype. */
288 const struct caseproto *
289 casereader_get_proto (const struct casereader *reader)
291 return reader->proto;
294 /* Skips past N cases in READER, stopping when the last case in
295 READER has been read or on an input error. Returns the number
296 of cases successfully skipped. */
298 casereader_advance (struct casereader *reader, casenumber n)
302 for (i = 0; i < n; i++)
304 struct ccase *c = casereader_read (reader);
314 /* Copies all the cases in READER to WRITER, propagating errors
315 appropriately. READER is destroyed by this function. */
317 casereader_transfer (struct casereader *reader, struct casewriter *writer)
321 taint_propagate (casereader_get_taint (reader),
322 casewriter_get_taint (writer));
323 while ((c = casereader_read (reader)) != NULL)
324 casewriter_write (writer, c);
325 casereader_destroy (reader);
/* Saves the contents of READER_ into PXD and returns the committed
   object.  The cases are read from a clone of READER_, so READER_'s
   own cases are not consumed.  Each case read from the clone is
   saved with case_save and appended to an array builder.  The
   resulting object holds two links, in this order: the saved case
   prototype, then the committed array of saved cases.

   NOTE(review): the declaration of c and the remainder of the loop
   body are on lines missing from this listing. */
329 casereader_save (const struct casereader *reader_, struct pxd *pxd)
331 struct pxd_array_builder ab;
332 struct casereader *reader;
333 struct pxd_builder b;
336 pxd_array_builder_init (&ab, pxd);
337 reader = casereader_clone (reader_);
338 while ((c = casereader_read (reader)) != NULL)
340 pxd_array_builder_add (&ab, case_save (c, pxd));
343 casereader_destroy (reader);
345 pxd_builder_init (&b, pxd);
/* NOTE(review): reader (the clone) was passed to casereader_destroy
   just above, and casereader_destroy drops the reader's reference to
   its proto (and presumably frees the reader itself).  Reading
   reader->proto here therefore looks like a use-after-destroy.
   reader_->proto, or moving the destroy call after this line, would
   avoid it -- confirm and fix in the full file. */
346 pxd_builder_put_link (&b, caseproto_save (reader->proto, pxd));
347 pxd_builder_put_link (&b, pxd_array_builder_commit (&ab));
349 return pxd_builder_commit (&b);
/* Builds and returns a casereader from OBJECT stored in PXD.  The
   links are consumed in the same order in which casereader_save
   writes them: first the case prototype, then the array of cases.
   Every case in the array is loaded and written to an autopaging
   casewriter created for that prototype; the writer is finally
   converted into the casereader that is returned.

   NOTE(review): the declarations of p and i are on lines missing
   from this listing. */
353 casereader_load (struct pxd_object *object, const struct pxd *pxd)
355 struct casewriter *writer;
356 struct caseproto *proto;
357 struct pxd_array array;
361 pxd_parser_init (&p, object, pxd);
/* First link: the prototype.  The local reference is released right
   after the writer has been created from it. */
363 proto = caseproto_load (pxd_parser_get_link (&p), pxd);
364 writer = autopaging_writer_create (proto);
365 caseproto_unref (proto);
/* Second link: the array of saved cases, copied into the writer in
   array order. */
367 pxd_array_init (&array, pxd_parser_get_link (&p), pxd);
368 for (i = 0; i < pxd_array_size (&array); i++)
369 casewriter_write (writer, case_load (pxd_array_get (&array, i), pxd));
370 pxd_array_destroy (&array);
372 pxd_parser_destroy (&p);
374 return casewriter_make_reader (writer);
378 /* Creates and returns a new casereader. This function is
379 intended for use by casereader implementations, not by casereader clients.
382 This function is most suited for creating a casereader for a
383 data source that is naturally sequential.
384 casereader_create_random may be more appropriate for a data
385 source that supports random access.
387 Ordinarily, specify a null pointer for TAINT, in which case
388 the new casereader will have a new, unique taint object. If
389 the new casereader should have a clone of an existing taint
390 object, specify that object as TAINT. (This is most commonly
391 useful in an implementation of the "clone" casereader_class
392 function, in which case the cloned casereader should have the
393 same taint object as the original casereader.)
395 PROTO must be the prototype for the cases that may be read
396 from the casereader. The caller retains its reference to PROTO.
399 CASE_CNT is an upper limit on the number of cases that
400 casereader_read will return from the casereader in successive
401 calls. Ordinarily, this is the actual number of cases in the
402 data source or CASENUMBER_MAX if the number of cases cannot be
403 predicted in advance.
405 CLASS and AUX are a set of casereader implementation-specific
406 member functions and auxiliary data to pass to those member
407 functions, respectively. */
409 casereader_create_sequential (const struct taint *taint,
410 const struct caseproto *proto,
412 const struct casereader_class *class, void *aux)
414 struct casereader *reader = xmalloc (sizeof *reader);
415 reader->taint = taint != NULL ? taint_clone (taint) : taint_create ();
416 reader->proto = caseproto_ref (proto);
417 reader->case_cnt = case_cnt;
418 reader->class = class;
423 /* If READER is a casereader of the given CLASS, returns its
424 associated auxiliary data; otherwise, returns a null pointer.
426 This function is intended for use from casereader
427 implementations, not by casereader users. Even within
428 casereader implementations, its usefulness is quite limited,
429 for at least two reasons. First, every casereader member
430 function already receives a pointer to the casereader's
431 auxiliary data. Second, a casereader's class can change
432 (through a call to casereader_swap) and this is in practice
433 quite common (e.g. any call to casereader_clone on a
434 casereader that does not directly support clone will cause the
435 casereader to be replaced by a shim casereader). */
437 casereader_dynamic_cast (struct casereader *reader,
438 const struct casereader_class *class)
440 return reader->class == class ? reader->aux : NULL;
443 /* Random-access casereader implementation.
445 This is a set of wrappers around casereader_create_sequential
446 and struct casereader_class to make it easy to create
447 efficient casereaders for data sources that natively support random access. */
450 /* One clone of a random reader. */
453 struct random_reader_shared *shared; /* Data shared among clones. */
454 struct heap_node heap_node; /* Node in shared data's heap of readers. */
455 casenumber offset; /* Number of cases already read. */
458 /* Returns the random_reader in which the given heap_node is embedded. */
460 static struct random_reader *
461 random_reader_from_heap_node (const struct heap_node *node)
463 return heap_data (node, struct random_reader, heap_node);
466 /* Data shared among clones of a random reader. */
467 struct random_reader_shared
469 struct heap *readers; /* Heap of struct random_readers. */
470 casenumber min_offset; /* Smallest offset of any random_reader. */
471 const struct casereader_random_class *class;
475 static const struct casereader_class random_reader_casereader_class;
477 /* Creates and returns a new random_reader with the given SHARED
478 data and OFFSET. Inserts the new random reader into the shared heap of readers. */
480 static struct random_reader *
481 make_random_reader (struct random_reader_shared *shared, casenumber offset)
483 struct random_reader *br = xmalloc (sizeof *br);
486 heap_insert (shared->readers, &br->heap_node);
490 /* Compares random_readers A and B by offset and returns a
491 strcmp()-like result. */
493 compare_random_readers_by_offset (const struct heap_node *a_,
494 const struct heap_node *b_,
495 const void *aux UNUSED)
497 const struct random_reader *a = random_reader_from_heap_node (a_);
498 const struct random_reader *b = random_reader_from_heap_node (b_);
499 return a->offset < b->offset ? -1 : a->offset > b->offset;
502 /* Creates and returns a new casereader. This function is
503 intended for use by casereader implementations, not by casereader clients.
506 This function is most suited for creating a casereader for a
507 data source that supports random access.
508 casereader_create_sequential is more appropriate for a data
509 source that is naturally sequential.
511 PROTO must be the prototype for the cases that may be read
512 from the casereader. The caller retains its reference to PROTO.
515 CASE_CNT is an upper limit on the number of cases that
516 casereader_read will return from the casereader in successive
517 calls. Ordinarily, this is the actual number of cases in the
518 data source or CASENUMBER_MAX if the number of cases cannot be
519 predicted in advance.
521 CLASS and AUX are a set of casereader implementation-specific
522 member functions and auxiliary data to pass to those member
523 functions, respectively. */
525 casereader_create_random (const struct caseproto *proto, casenumber case_cnt,
526 const struct casereader_random_class *class,
529 struct random_reader_shared *shared = xmalloc (sizeof *shared);
530 shared->readers = heap_create (compare_random_readers_by_offset, NULL);
531 shared->class = class;
533 shared->min_offset = 0;
534 return casereader_create_sequential (NULL, proto, case_cnt,
535 &random_reader_casereader_class,
536 make_random_reader (shared, 0));
539 /* Reassesses the min_offset in SHARED based on the minimum
540 offset in the heap. */
542 advance_random_reader (struct casereader *reader,
543 struct random_reader_shared *shared)
547 old = shared->min_offset;
548 new = random_reader_from_heap_node (heap_minimum (shared->readers))->offset;
552 shared->min_offset = new;
553 shared->class->advance (reader, shared->aux, new - old);
557 /* struct casereader_class "read" function for random reader. */
558 static struct ccase *
559 random_reader_read (struct casereader *reader, void *br_)
561 struct random_reader *br = br_;
562 struct random_reader_shared *shared = br->shared;
/* The index handed to the underlying class's read function is this
   clone's offset minus shared->min_offset, i.e. its position
   relative to the smallest offset held by any clone. */
563 struct ccase *c = shared->class->read (reader, shared->aux,
564 br->offset - shared->min_offset);
/* NOTE(review): the lines between the read above and the calls
   below are missing from this listing.  Presumably br->offset is
   advanced after a successful read, which would be why the heap is
   told that this node changed -- confirm.  The shared minimum offset
   is then reassessed. */
568 heap_changed (shared->readers, &br->heap_node);
569 advance_random_reader (reader, shared);
574 /* struct casereader_class "destroy" function for random reader. */
577 random_reader_destroy (struct casereader *reader, void *br_)
579 struct random_reader *br = br_;
580 struct random_reader_shared *shared = br->shared;
582 heap_delete (shared->readers, &br->heap_node);
583 if (heap_is_empty (shared->readers))
585 heap_destroy (shared->readers);
586 shared->class->destroy (reader, shared->aux);
590 advance_random_reader (reader, shared);
595 /* struct casereader_class "clone" function for random reader. */
596 static struct casereader *
597 random_reader_clone (struct casereader *reader, void *br_)
599 struct random_reader *br = br_;
600 struct random_reader_shared *shared = br->shared;
601 return casereader_create_sequential (casereader_get_taint (reader),
603 casereader_get_case_cnt (reader),
604 &random_reader_casereader_class,
605 make_random_reader (shared,
609 /* struct casereader_class "peek" function for random reader. */
610 static struct ccase *
611 random_reader_peek (struct casereader *reader, void *br_, casenumber idx)
613 struct random_reader *br = br_;
614 struct random_reader_shared *shared = br->shared;
/* Peeking goes through the underlying class's read function.  The
   index is this clone's position relative to shared->min_offset,
   plus IDX to look that many cases ahead.  Nothing in the visible
   lines changes br->offset or the heap. */
616 return shared->class->read (reader, shared->aux,
617 br->offset - shared->min_offset + idx);
620 /* Casereader class for random reader. */
621 static const struct casereader_class random_reader_casereader_class =
624 random_reader_destroy,
630 static const struct casereader_class casereader_null_class;
632 /* Returns a casereader with no cases. The casereader has the prototype
633 specified by PROTO. PROTO may be specified as a null pointer, in which case
634 the casereader has no variables. */
636 casereader_create_empty (const struct caseproto *proto_)
638 struct casereader *reader;
639 struct caseproto *proto;
641 proto = proto_ != NULL ? caseproto_ref (proto_) : caseproto_create ();
642 reader = casereader_create_sequential (NULL, proto, 0,
643 &casereader_null_class, NULL);
644 caseproto_unref (proto);
649 static struct ccase *
650 casereader_null_read (struct casereader *reader UNUSED, void *aux UNUSED)
656 casereader_null_destroy (struct casereader *reader UNUSED, void *aux UNUSED)
661 static const struct casereader_class casereader_null_class =
663 casereader_null_read,
664 casereader_null_destroy,