1 /* PSPP - a program for statistical analysis.
2 Copyright (C) 2007, 2009 Free Software Foundation, Inc.
4 This program is free software: you can redistribute it and/or modify
5 it under the terms of the GNU General Public License as published by
6 the Free Software Foundation, either version 3 of the License, or
7 (at your option) any later version.
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 GNU General Public License for more details.
14 You should have received a copy of the GNU General Public License
15 along with this program. If not, see <http://www.gnu.org/licenses/>. */
19 #include <data/casereader.h>
20 #include <data/casereader-provider.h>
24 #include <data/casewindow.h>
25 #include <data/casewriter.h>
26 #include <data/settings.h>
27 #include <libpspp/assertion.h>
28 #include <libpspp/heap.h>
29 #include <libpspp/taint.h>
36 struct taint *taint; /* Corrupted? */
37 struct caseproto *proto; /* Format of contained cases. */
38 casenumber case_cnt; /* Number of cases,
39 CASENUMBER_MAX if unknown. */
40 const struct casereader_class *class; /* Class. */
41 void *aux; /* Auxiliary data for class. */
44 static void insert_shim (struct casereader *);
46 /* Reads and returns the next case from READER. The caller owns
47 the returned case and must call case_unref on it when its data
48 is no longer needed. Returns a null pointer if cases have
49 been exhausted or upon detection of an I/O error.
51 The case returned is effectively consumed: it can never be
52 read again through READER. If this is inconvenient, READER
53 may be cloned in advance with casereader_clone, or
54 casereader_peek may be used instead. */
56 casereader_read (struct casereader *reader)
58 if (reader->case_cnt != 0)
60 /* ->read may use casereader_swap to replace itself by
61 another reader and then delegate to that reader by
62 recursively calling casereader_read. Currently only
63 lazy_casereader does this and, with luck, nothing else
66 To allow this to work, however, we must decrement
67 case_cnt before calling ->read. If we decremented
68 case_cnt after calling ->read, then this would actually
69 drop two cases from case_cnt instead of one, and we'd
70 lose the last case in the casereader. */
72 if (reader->case_cnt != CASENUMBER_MAX)
74 c = reader->class->read (reader, reader->aux);
77 size_t n_widths UNUSED = caseproto_get_n_widths (reader->proto);
78 assert (case_get_value_cnt (c) >= n_widths);
79 expensive_assert (caseproto_equal (case_get_proto (c), 0,
80 reader->proto, 0, n_widths));
89 Returns false if an I/O error was detected on READER, true
92 casereader_destroy (struct casereader *reader)
97 reader->class->destroy (reader, reader->aux);
98 ok = taint_destroy (reader->taint);
99 caseproto_unref (reader->proto);
105 /* Returns a clone of READER. READER and its clone may be used
106 to read the same sequence of cases in the same order, barring
109 casereader_clone (const struct casereader *reader_)
111 struct casereader *reader = CONST_CAST (struct casereader *, reader_);
112 struct casereader *clone;
113 if ( reader == NULL )
116 if (reader->class->clone == NULL)
117 insert_shim (reader);
118 clone = reader->class->clone (reader, reader->aux);
119 assert (clone != NULL);
120 assert (clone != reader);
124 /* Makes a copy of ORIGINAL into *NEW1 (if NEW1 is non-null) and
125 *NEW2 (if NEW2 is non-null), then destroys ORIGINAL. */
127 casereader_split (struct casereader *original,
128 struct casereader **new1, struct casereader **new2)
130 if (new1 != NULL && new2 != NULL)
132 *new1 = casereader_rename (original);
133 *new2 = casereader_clone (*new1);
135 else if (new1 != NULL)
136 *new1 = casereader_rename (original);
137 else if (new2 != NULL)
138 *new2 = casereader_rename (original);
140 casereader_destroy (original);
143 /* Returns a copy of READER, which is itself destroyed.
144 Useful for taking over ownership of a casereader, to enforce
145 preventing the original owner from accessing the casereader
148 casereader_rename (struct casereader *reader)
150 struct casereader *new = xmemdup (reader, sizeof *reader);
155 /* Exchanges the casereaders referred to by A and B. */
157 casereader_swap (struct casereader *a, struct casereader *b)
161 struct casereader tmp = *a;
167 /* Reads and returns the (IDX + 1)'th case from READER. The
168 caller owns the returned case and must call case_unref on it
169 when it is no longer needed. Returns a null pointer if cases
170 have been exhausted or upon detection of an I/O error. */
172 casereader_peek (struct casereader *reader, casenumber idx)
174 if (idx < reader->case_cnt)
177 if (reader->class->peek == NULL)
178 insert_shim (reader);
179 c = reader->class->peek (reader, reader->aux, idx);
182 else if (casereader_error (reader))
183 reader->case_cnt = 0;
185 if (reader->case_cnt > idx)
186 reader->case_cnt = idx;
190 /* Returns true if no cases remain to be read from READER, or if
191 an error has occurred on READER. (A return value of false
192 does *not* mean that the next call to casereader_peek or
193 casereader_read will return true, because an error can occur
196 casereader_is_empty (struct casereader *reader)
198 if (reader->case_cnt == 0)
202 struct ccase *c = casereader_peek (reader, 0);
213 /* Returns true if an I/O error or another hard error has
214 occurred on READER, a clone of READER, or on some object on
215 which READER's data has a dependency, false otherwise. */
217 casereader_error (const struct casereader *reader)
219 return taint_is_tainted (reader->taint);
222 /* Marks READER as having encountered an error.
224 Ordinarily, this function should be called by the
225 implementation of a casereader, not by the casereader's
226 client. Instead, casereader clients should usually ensure
227 that a casereader's error state is correct by using
228 taint_propagate to propagate to the casereader's taint
229 structure, which may be obtained via casereader_get_taint. */
231 casereader_force_error (struct casereader *reader)
233 taint_set_taint (reader->taint);
236 /* Returns READER's associate taint object, for use with
237 taint_propagate and other taint functions. */
239 casereader_get_taint (const struct casereader *reader)
241 return reader->taint;
244 /* Returns the number of cases that will be read by successive
245 calls to casereader_read for READER, assuming that no errors
246 occur. Upon an error condition, the case count drops to 0, so
247 that no more cases can be obtained.
249 Not all casereaders can predict the number of cases that they
250 will produce without actually reading all of them. In that
251 case, this function returns CASENUMBER_MAX. To obtain the
252 actual number of cases in such a casereader, use
253 casereader_count_cases. */
255 casereader_get_case_cnt (struct casereader *reader)
257 return reader->case_cnt;
261 casereader_count_cases__ (const struct casereader *reader,
262 casenumber max_cases)
264 struct casereader *clone;
267 clone = casereader_clone (reader);
268 n_cases = casereader_advance (clone, max_cases);
269 casereader_destroy (clone);
274 /* Returns the number of cases that will be read by successive
275 calls to casereader_read for READER, assuming that no errors
276 occur. Upon an error condition, the case count drops to 0, so
277 that no more cases can be obtained.
279 For a casereader that cannot predict the number of cases it
280 will produce, this function actually reads (and discards) all
281 of the contents of a clone of READER. Thus, the return value
282 is always correct in the absence of I/O errors. */
284 casereader_count_cases (const struct casereader *reader)
286 if (reader->case_cnt == CASENUMBER_MAX)
288 struct casereader *reader_rw = CONST_CAST (struct casereader *, reader);
289 reader_rw->case_cnt = casereader_count_cases__ (reader, CASENUMBER_MAX);
291 return reader->case_cnt;
294 /* Truncates READER to at most N cases. */
296 casereader_truncate (struct casereader *reader, casenumber n)
298 /* This could be optimized, if it ever becomes too expensive, by adding a
299 "max_cases" member to struct casereader. We could also add a "truncate"
300 function to the casereader implementation, to allow the casereader to
301 throw away data that cannot ever be read. */
302 if (reader->case_cnt == CASENUMBER_MAX)
303 reader->case_cnt = casereader_count_cases__ (reader, n);
304 if (reader->case_cnt > n)
305 reader->case_cnt = n;
308 /* Returns the prototype for the cases in READER. The caller
309 must not unref the returned prototype. */
310 const struct caseproto *
311 casereader_get_proto (const struct casereader *reader)
313 return reader->proto;
316 /* Skips past N cases in READER, stopping when the last case in
317 READER has been read or on an input error. Returns the number
318 of cases successfully skipped. */
320 casereader_advance (struct casereader *reader, casenumber n)
324 for (i = 0; i < n; i++)
326 struct ccase *c = casereader_read (reader);
336 /* Copies all the cases in READER to WRITER, propagating errors
339 casereader_transfer (struct casereader *reader, struct casewriter *writer)
343 taint_propagate (casereader_get_taint (reader),
344 casewriter_get_taint (writer));
345 while ((c = casereader_read (reader)) != NULL)
346 casewriter_write (writer, c);
347 casereader_destroy (reader);
350 /* Creates and returns a new casereader. This function is
351 intended for use by casereader implementations, not by
354 This function is most suited for creating a casereader for a
355 data source that is naturally sequential.
356 casereader_create_random may be more appropriate for a data
357 source that supports random access.
359 Ordinarily, specify a null pointer for TAINT, in which case
360 the new casereader will have a new, unique taint object. If
361 the new casereader should have a clone of an existing taint
362 object, specify that object as TAINT. (This is most commonly
363 useful in an implementation of the "clone" casereader_class
364 function, in which case the cloned casereader should have the
365 same taint object as the original casereader.)
367 PROTO must be the prototype for the cases that may be read
368 from the casereader. The caller retains its reference to
371 CASE_CNT is an upper limit on the number of cases that
372 casereader_read will return from the casereader in successive
373 calls. Ordinarily, this is the actual number of cases in the
374 data source or CASENUMBER_MAX if the number of cases cannot be
375 predicted in advance.
377 CLASS and AUX are a set of casereader implementation-specific
378 member functions and auxiliary data to pass to those member
379 functions, respectively. */
381 casereader_create_sequential (const struct taint *taint,
382 const struct caseproto *proto,
384 const struct casereader_class *class, void *aux)
386 struct casereader *reader = xmalloc (sizeof *reader);
387 reader->taint = taint != NULL ? taint_clone (taint) : taint_create ();
388 reader->proto = caseproto_ref (proto);
389 reader->case_cnt = case_cnt;
390 reader->class = class;
395 /* If READER is a casereader of the given CLASS, returns its
396 associated auxiliary data; otherwise, returns a null pointer.
398 This function is intended for use from casereader
399 implementations, not by casereader users. Even within
400 casereader implementations, its usefulness is quite limited,
401 for at least two reasons. First, every casereader member
402 function already receives a pointer to the casereader's
403 auxiliary data. Second, a casereader's class can change
404 (through a call to casereader_swap) and this is in practice
405 quite common (e.g. any call to casereader_clone on a
406 casereader that does not directly support clone will cause the
407 casereader to be replaced by a shim caseader). */
409 casereader_dynamic_cast (struct casereader *reader,
410 const struct casereader_class *class)
412 return reader->class == class ? reader->aux : NULL;
415 /* Random-access casereader implementation.
417 This is a set of wrappers around casereader_create_sequential
418 and struct casereader_class to make it easy to create
419 efficient casereaders for data sources that natively support
422 /* One clone of a random reader. */
425 struct random_reader_shared *shared; /* Data shared among clones. */
426 struct heap_node heap_node; /* Node in shared data's heap of readers. */
427 casenumber offset; /* Number of cases already read. */
430 /* Returns the random_reader in which the given heap_node is
432 static struct random_reader *
433 random_reader_from_heap_node (const struct heap_node *node)
435 return heap_data (node, struct random_reader, heap_node);
438 /* Data shared among clones of a random reader. */
439 struct random_reader_shared
441 struct heap *readers; /* Heap of struct random_readers. */
442 casenumber min_offset; /* Smallest offset of any random_reader. */
443 const struct casereader_random_class *class;
447 static const struct casereader_class random_reader_casereader_class;
449 /* Creates and returns a new random_reader with the given SHARED
450 data and OFFSET. Inserts the new random reader into the
452 static struct random_reader *
453 make_random_reader (struct random_reader_shared *shared, casenumber offset)
455 struct random_reader *br = xmalloc (sizeof *br);
458 heap_insert (shared->readers, &br->heap_node);
462 /* Compares random_readers A and B by offset and returns a
463 strcmp()-like result. */
465 compare_random_readers_by_offset (const struct heap_node *a_,
466 const struct heap_node *b_,
467 const void *aux UNUSED)
469 const struct random_reader *a = random_reader_from_heap_node (a_);
470 const struct random_reader *b = random_reader_from_heap_node (b_);
471 return a->offset < b->offset ? -1 : a->offset > b->offset;
474 /* Creates and returns a new casereader. This function is
475 intended for use by casereader implementations, not by
478 This function is most suited for creating a casereader for a
479 data source that supports random access.
480 casereader_create_sequential is more appropriate for a data
481 source that is naturally sequential.
483 PROTO must be the prototype for the cases that may be read
484 from the casereader. The caller retains its reference to
487 CASE_CNT is an upper limit on the number of cases that
488 casereader_read will return from the casereader in successive
489 calls. Ordinarily, this is the actual number of cases in the
490 data source or CASENUMBER_MAX if the number of cases cannot be
491 predicted in advance.
493 CLASS and AUX are a set of casereader implementation-specific
494 member functions and auxiliary data to pass to those member
495 functions, respectively. */
497 casereader_create_random (const struct caseproto *proto, casenumber case_cnt,
498 const struct casereader_random_class *class,
501 struct random_reader_shared *shared = xmalloc (sizeof *shared);
502 shared->readers = heap_create (compare_random_readers_by_offset, NULL);
503 shared->class = class;
505 shared->min_offset = 0;
506 return casereader_create_sequential (NULL, proto, case_cnt,
507 &random_reader_casereader_class,
508 make_random_reader (shared, 0));
511 /* Reassesses the min_offset in SHARED based on the minimum
512 offset in the heap. */
514 advance_random_reader (struct casereader *reader,
515 struct random_reader_shared *shared)
519 old = shared->min_offset;
520 new = random_reader_from_heap_node (heap_minimum (shared->readers))->offset;
524 shared->min_offset = new;
525 shared->class->advance (reader, shared->aux, new - old);
529 /* struct casereader_class "read" function for random reader. */
530 static struct ccase *
531 random_reader_read (struct casereader *reader, void *br_)
533 struct random_reader *br = br_;
534 struct random_reader_shared *shared = br->shared;
535 struct ccase *c = shared->class->read (reader, shared->aux,
536 br->offset - shared->min_offset);
540 heap_changed (shared->readers, &br->heap_node);
541 advance_random_reader (reader, shared);
546 /* struct casereader_class "destroy" function for random
549 random_reader_destroy (struct casereader *reader, void *br_)
551 struct random_reader *br = br_;
552 struct random_reader_shared *shared = br->shared;
554 heap_delete (shared->readers, &br->heap_node);
555 if (heap_is_empty (shared->readers))
557 heap_destroy (shared->readers);
558 shared->class->destroy (reader, shared->aux);
562 advance_random_reader (reader, shared);
567 /* struct casereader_class "clone" function for random reader. */
568 static struct casereader *
569 random_reader_clone (struct casereader *reader, void *br_)
571 struct random_reader *br = br_;
572 struct random_reader_shared *shared = br->shared;
573 return casereader_create_sequential (casereader_get_taint (reader),
575 casereader_get_case_cnt (reader),
576 &random_reader_casereader_class,
577 make_random_reader (shared,
581 /* struct casereader_class "peek" function for random reader. */
582 static struct ccase *
583 random_reader_peek (struct casereader *reader, void *br_, casenumber idx)
585 struct random_reader *br = br_;
586 struct random_reader_shared *shared = br->shared;
588 return shared->class->read (reader, shared->aux,
589 br->offset - shared->min_offset + idx);
592 /* Casereader class for random reader. */
593 static const struct casereader_class random_reader_casereader_class =
596 random_reader_destroy,
601 /* Buffering shim for implementing clone and peek operations.
603 The "clone" and "peek" operations aren't implemented by all
604 types of casereaders, but we have to expose a uniform
605 interface anyhow. We do this by interposing a buffering
606 casereader on top of the existing casereader on the first call
607 to "clone" or "peek". The buffering casereader maintains a
608 window of cases that spans the positions of the original
609 casereader and all of its clones (the "clone set"), from the
610 position of the casereader that has read the fewest cases to
611 the position of the casereader that has read the most.
613 Thus, if all of the casereaders in the clone set are at
614 approximately the same position, only a few cases are buffered
615 and there is little inefficiency. If, on the other hand, one
616 casereader is not used to read any cases at all, but another
617 one is used to read all of the cases, the entire contents of
618 the casereader is copied into the buffer. This still might
619 not be so inefficient, given that case data in memory is
620 shared across multiple identical copies, but in the worst case
621 the window implementation will write cases to disk instead of
622 maintaining them in-memory. */
624 /* A buffering shim for a non-clonable or non-peekable
628 struct casewindow *window; /* Window of buffered cases. */
629 struct casereader *subreader; /* Subordinate casereader. */
632 static const struct casereader_random_class shim_class;
634 /* Interposes a buffering shim atop READER. */
636 insert_shim (struct casereader *reader)
638 const struct caseproto *proto = casereader_get_proto (reader);
639 casenumber case_cnt = casereader_get_case_cnt (reader);
640 struct shim *b = xmalloc (sizeof *b);
641 b->window = casewindow_create (proto, settings_get_workspace_cases (proto));
642 b->subreader = casereader_create_random (proto, case_cnt, &shim_class, b);
643 casereader_swap (reader, b->subreader);
644 taint_propagate (casewindow_get_taint (b->window),
645 casereader_get_taint (reader));
646 taint_propagate (casereader_get_taint (b->subreader),
647 casereader_get_taint (reader));
650 /* Ensures that B's window contains at least CASE_CNT cases.
651 Return true if successful, false upon reaching the end of B's
652 subreader or an I/O error. */
654 prime_buffer (struct shim *b, casenumber case_cnt)
656 while (casewindow_get_case_cnt (b->window) < case_cnt)
658 struct ccase *tmp = casereader_read (b->subreader);
661 casewindow_push_head (b->window, tmp);
666 /* Reads the case at the given 0-based OFFSET from the front of
667 the window into C. Returns the case if successful, or a null
668 pointer if OFFSET is beyond the end of file or upon I/O error.
669 The caller must call case_unref() on the returned case when it
670 is no longer needed. */
671 static struct ccase *
672 shim_read (struct casereader *reader UNUSED, void *b_,
676 if (!prime_buffer (b, offset + 1))
678 return casewindow_get_case (b->window, offset);
683 shim_destroy (struct casereader *reader UNUSED, void *b_)
686 casewindow_destroy (b->window);
687 casereader_destroy (b->subreader);
691 /* Discards CNT cases from the front of B's window. */
693 shim_advance (struct casereader *reader UNUSED, void *b_, casenumber case_cnt)
696 casewindow_pop_tail (b->window, case_cnt);
699 /* Class for the buffered reader. */
700 static const struct casereader_random_class shim_class =
707 static const struct casereader_class casereader_null_class;
709 /* Returns a casereader with no cases. The casereader has the prototype
710 specified by PROTO. PROTO may be specified as a null pointer, in which case
711 the casereader has no variables. */
713 casereader_create_empty (const struct caseproto *proto_)
715 struct casereader *reader;
716 struct caseproto *proto;
718 proto = proto_ != NULL ? caseproto_ref (proto_) : caseproto_create ();
719 reader = casereader_create_sequential (NULL, proto, 0,
720 &casereader_null_class, NULL);
721 caseproto_unref (proto);
726 static struct ccase *
727 casereader_null_read (struct casereader *reader UNUSED, void *aux UNUSED)
733 casereader_null_destroy (struct casereader *reader UNUSED, void *aux UNUSED)
738 static const struct casereader_class casereader_null_class =
740 casereader_null_read,
741 casereader_null_destroy,