1 /* PSPP - a program for statistical analysis.
2 Copyright (C) 2007, 2009 Free Software Foundation, Inc.
4 This program is free software: you can redistribute it and/or modify
5 it under the terms of the GNU General Public License as published by
6 the Free Software Foundation, either version 3 of the License, or
7 (at your option) any later version.
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 GNU General Public License for more details.
14 You should have received a copy of the GNU General Public License
15 along with this program. If not, see <http://www.gnu.org/licenses/>. */
19 #include <data/casereader.h>
20 #include <data/casereader-provider.h>
24 #include <data/casewindow.h>
25 #include <data/casewriter.h>
26 #include <data/settings.h>
27 #include <libpspp/assertion.h>
28 #include <libpspp/heap.h>
29 #include <libpspp/taint.h>
/* Members of a casereader; the functions in this file reach them through
   a `struct casereader *'. */
36 struct taint *taint; /* Error state, queried by casereader_error. */
37 struct caseproto *proto; /* Format of contained cases. */
38 casenumber case_cnt; /* Number of cases,
39 CASENUMBER_MAX if unknown. */
40 const struct casereader_class *class; /* Class: the read, destroy, clone and peek functions called in this file. */
41 void *aux; /* Auxiliary data for class. */
/* Forward declaration: insert_shim is defined further down in this file,
   but casereader_clone and casereader_peek call it before that point. */
44 static void insert_shim (struct casereader *);
46 /* Reads and returns the next case from READER. The caller owns
47 the returned case and must call case_unref on it when its data
48 is no longer needed. Returns a null pointer if cases have
49 been exhausted or upon detection of an I/O error.
51 The case returned is effectively consumed: it can never be
52 read again through READER. If this is inconvenient, READER
53 may be cloned in advance with casereader_clone, or
54 casereader_peek may be used instead. */
56 casereader_read (struct casereader *reader)
/* The class's read function is only consulted while the case count is
   nonzero. */
58 if (reader->case_cnt != 0)
60 /* ->read may use casereader_swap to replace itself by
61 another reader and then delegate to that reader by
62 recursively calling casereader_read. Currently only
63 lazy_casereader does this and, with luck, nothing else
66 To allow this to work, however, we must decrement
67 case_cnt before calling ->read. If we decremented
68 case_cnt after calling ->read, then this would actually
69 drop two cases from case_cnt instead of one, and we'd
70 lose the last case in the casereader. */
/* CASENUMBER_MAX marks an unknown count (see the case_cnt member), so the
   decrement described above is applied only to a known count. */
72 if (reader->case_cnt != CASENUMBER_MAX)
74 c = reader->class->read (reader, reader->aux);
/* Consistency checks on the case handed back by the class: it must hold
   at least as many values as READER's prototype has widths, and
   (expensive_assert only) its prototype is compared with READER's;
   presumably the comparison covers the first N_WIDTHS widths of each. */
77 size_t n_widths UNUSED = caseproto_get_n_widths (reader->proto);
78 assert (case_get_value_cnt (c) >= n_widths);
79 expensive_assert (caseproto_equal (case_get_proto (c), 0,
80 reader->proto, 0, n_widths));
89 Returns false if an I/O error was detected on READER, true
92 casereader_destroy (struct casereader *reader)
/* The class's destroy function gets READER and its auxiliary data before
   the generic members are released. */
97 reader->class->destroy (reader, reader->aux);
/* OK receives the result of destroying the taint object; presumably this
   is the value the function returns (the return statement is not visible
   here). */
98 ok = taint_destroy (reader->taint);
/* Drops the prototype reference that casereader_create_sequential took
   for READER. */
99 caseproto_unref (reader->proto);
105 /* Returns a clone of READER. READER and its clone may be used
106 to read the same sequence of cases in the same order, barring
109 casereader_clone (const struct casereader *reader_)
/* READER_ is const for callers, but cloning may have to modify the reader
   in place (insert_shim below), hence the cast to a non-const pointer. */
111 struct casereader *reader = CONST_CAST (struct casereader *, reader_);
112 struct casereader *clone;
113 if ( reader == NULL )
/* A class without a clone function cannot clone itself, so the buffering
   shim is interposed first. The call that follows relies on READER's
   class having a clone function at that point. */
116 if (reader->class->clone == NULL)
117 insert_shim (reader);
118 clone = reader->class->clone (reader, reader->aux);
/* The class must hand back a distinct, non-null casereader. */
119 assert (clone != NULL);
120 assert (clone != reader);
124 /* Makes a copy of ORIGINAL into *NEW1 (if NEW1 is non-null) and
125 *NEW2 (if NEW2 is non-null), then destroys ORIGINAL. */
127 casereader_split (struct casereader *original,
128 struct casereader **new1, struct casereader **new2)
/* Both outputs requested: *NEW1 takes over ORIGINAL through
   casereader_rename, and *NEW2 is made as a clone of *NEW1. */
130 if (new1 != NULL && new2 != NULL)
132 *new1 = casereader_rename (original);
133 *new2 = casereader_clone (*new1);
/* Only one output requested: it takes over ORIGINAL directly. */
135 else if (new1 != NULL)
136 *new1 = casereader_rename (original);
137 else if (new2 != NULL)
138 *new2 = casereader_rename (original);
/* NOTE(review): casereader_rename is documented as destroying its
   argument, so this call presumably belongs to a final branch for the
   case where neither output was requested; that branch keyword is not
   visible here -- confirm. */
140 casereader_destroy (original);
143 /* Returns a copy of READER, which is itself destroyed.
144 Useful for taking over ownership of a casereader, to enforce
145 preventing the original owner from accessing the casereader
148 casereader_rename (struct casereader *reader)
/* The copy duplicates the casereader structure itself (sizeof *reader
   bytes), so it carries the same taint, prototype, class and auxiliary
   data pointers as READER rather than copies of those objects. */
150 struct casereader *new = xmemdup (reader, sizeof *reader);
155 /* Exchanges the casereaders referred to by A and B. */
157 casereader_swap (struct casereader *a, struct casereader *b)
/* The exchange operates on whole structures: the contents of *A are saved
   by value in a temporary here. */
161 struct casereader tmp = *a;
167 /* Reads and returns the (IDX + 1)'th case from READER. The
168 caller owns the returned case and must call case_unref on it
169 when it is no longer needed. Returns a null pointer if cases
170 have been exhausted or upon detection of an I/O error. */
172 casereader_peek (struct casereader *reader, casenumber idx)
/* Only indexes below the current case count are passed to the class. */
174 if (idx < reader->case_cnt)
/* A class without a peek function gets the buffering shim interposed, in
   the same way casereader_clone does for a missing clone function. */
177 if (reader->class->peek == NULL)
178 insert_shim (reader);
179 c = reader->class->peek (reader, reader->aux, idx);
/* Once an error has been detected, the count drops to zero so that no
   further cases are read. */
182 else if (casereader_error (reader))
183 reader->case_cnt = 0;
/* Clamps the count to IDX; presumably this is reached only when no case
   could be returned for IDX (the return of the peeked case is not visible
   here). */
185 if (reader->case_cnt > idx)
186 reader->case_cnt = idx;
190 /* Returns true if no cases remain to be read from READER, or if
191 an error has occurred on READER. (A return value of false
192 does *not* mean that the next call to casereader_peek or
193 casereader_read will return a case, because an error can occur
196 casereader_is_empty (struct casereader *reader)
/* A zero count settles the question without touching the data. */
198 if (reader->case_cnt == 0)
/* A nonzero count is only an upper limit (and may be CASENUMBER_MAX for
   "unknown"), so the next case is actually peeked at. */
202 struct ccase *c = casereader_peek (reader, 0);
213 /* Returns true if an I/O error or another hard error has
214 occurred on READER, a clone of READER, or on some object on
215 which READER's data has a dependency, false otherwise. */
217 casereader_error (const struct casereader *reader)
/* The error state is read from READER's taint object. */
219 return taint_is_tainted (reader->taint);
222 /* Marks READER as having encountered an error.
224 Ordinarily, this function should be called by the
225 implementation of a casereader, not by the casereader's
226 client. Instead, casereader clients should usually ensure
227 that a casereader's error state is correct by using
228 taint_propagate to propagate to the casereader's taint
229 structure, which may be obtained via casereader_get_taint. */
231 casereader_force_error (struct casereader *reader)
/* Sets the taint on READER's taint object, the same object that
   casereader_error inspects. */
233 taint_set_taint (reader->taint);
236 /* Returns READER's associated taint object, for use with
237 taint_propagate and other taint functions. */
239 casereader_get_taint (const struct casereader *reader)
241 return reader->taint;
244 /* Returns the number of cases that will be read by successive
245 calls to casereader_read for READER, assuming that no errors
246 occur. Upon an error condition, the case count drops to 0, so
247 that no more cases can be obtained.
249 Not all casereaders can predict the number of cases that they
250 will produce without actually reading all of them. In that
251 case, this function returns CASENUMBER_MAX. To obtain the
252 actual number of cases in such a casereader, use
253 casereader_count_cases. */
255 casereader_get_case_cnt (struct casereader *reader)
/* Reports the stored count as is; nothing is read from READER. */
257 return reader->case_cnt;
/* Helper for casereader_count_cases and casereader_truncate. Counts cases
   by advancing a clone of READER by up to MAX_CASES cases, so that cases
   are consumed from the clone rather than through READER itself. The
   clone is destroyed before returning. */
261 casereader_count_cases__ (struct casereader *reader, casenumber max_cases)
263 struct casereader *clone;
266 clone = casereader_clone (reader);
/* casereader_advance reports how many cases it managed to skip. */
267 n_cases = casereader_advance (clone, max_cases);
268 casereader_destroy (clone);
273 /* Returns the number of cases that will be read by successive
274 calls to casereader_read for READER, assuming that no errors
275 occur. Upon an error condition, the case count drops to 0, so
276 that no more cases can be obtained.
278 For a casereader that cannot predict the number of cases it
279 will produce, this function actually reads (and discards) all
280 of the contents of a clone of READER. Thus, the return value
281 is always correct in the absence of I/O errors. */
283 casereader_count_cases (struct casereader *reader)
/* An unknown count is resolved once and stored back into READER, so that
   later calls find a known count and return it directly. */
285 if (reader->case_cnt == CASENUMBER_MAX)
286 reader->case_cnt = casereader_count_cases__ (reader, CASENUMBER_MAX);
287 return reader->case_cnt;
290 /* Truncates READER to at most N cases. */
292 casereader_truncate (struct casereader *reader, casenumber n)
294 /* This could be optimized, if it ever becomes too expensive, by adding a
295 "max_cases" member to struct casereader. We could also add a "truncate"
296 function to the casereader implementation, to allow the casereader to
297 throw away data that cannot ever be read. */
/* With an unknown count, at most N cases are counted on a clone; that is
   enough to tell whether READER holds fewer than N cases. */
298 if (reader->case_cnt == CASENUMBER_MAX)
299 reader->case_cnt = casereader_count_cases__ (reader, n);
/* A known count above N is simply lowered to N. */
300 if (reader->case_cnt > n)
301 reader->case_cnt = n;
304 /* Returns the prototype for the cases in READER. The caller
305 must not unref the returned prototype. */
306 const struct caseproto *
307 casereader_get_proto (const struct casereader *reader)
/* The pointer is handed out without taking a new reference, which is why
   the caller must not unref it. */
309 return reader->proto;
312 /* Skips past N cases in READER, stopping when the last case in
313 READER has been read or on an input error. Returns the number
314 of cases successfully skipped. */
316 casereader_advance (struct casereader *reader, casenumber n)
/* Cases are skipped one at a time by reading them with casereader_read;
   I counts the iterations. */
320 for (i = 0; i < n; i++)
322 struct ccase *c = casereader_read (reader);
332 /* Copies all the cases in READER to WRITER, propagating errors
335 casereader_transfer (struct casereader *reader, struct casewriter *writer)
/* Connects READER's taint object with WRITER's; presumably taint flows
   from the first argument to the second, so that an error on READER also
   marks WRITER. */
339 taint_propagate (casereader_get_taint (reader),
340 casewriter_get_taint (writer));
/* Every case read from READER is handed to WRITER until casereader_read
   returns a null pointer. */
341 while ((c = casereader_read (reader)) != NULL)
342 casewriter_write (writer, c);
/* READER is consumed by this function: it is destroyed afterward. */
343 casereader_destroy (reader);
346 /* Creates and returns a new casereader. This function is
347 intended for use by casereader implementations, not by
350 This function is most suited for creating a casereader for a
351 data source that is naturally sequential.
352 casereader_create_random may be more appropriate for a data
353 source that supports random access.
355 Ordinarily, specify a null pointer for TAINT, in which case
356 the new casereader will have a new, unique taint object. If
357 the new casereader should have a clone of an existing taint
358 object, specify that object as TAINT. (This is most commonly
359 useful in an implementation of the "clone" casereader_class
360 function, in which case the cloned casereader should have the
361 same taint object as the original casereader.)
363 PROTO must be the prototype for the cases that may be read
364 from the casereader. The caller retains its reference to
367 CASE_CNT is an upper limit on the number of cases that
368 casereader_read will return from the casereader in successive
369 calls. Ordinarily, this is the actual number of cases in the
370 data source or CASENUMBER_MAX if the number of cases cannot be
371 predicted in advance.
373 CLASS and AUX are a set of casereader implementation-specific
374 member functions and auxiliary data to pass to those member
375 functions, respectively. */
377 casereader_create_sequential (const struct taint *taint,
378 const struct caseproto *proto,
380 const struct casereader_class *class, void *aux)
382 struct casereader *reader = xmalloc (sizeof *reader);
/* A null TAINT yields a newly created taint object; otherwise the reader
   gets a clone of TAINT. */
383 reader->taint = taint != NULL ? taint_clone (taint) : taint_create ();
/* The reader takes a reference of its own on PROTO; casereader_destroy
   is where that reference is dropped again. */
384 reader->proto = caseproto_ref (proto);
385 reader->case_cnt = case_cnt;
386 reader->class = class;
391 /* If READER is a casereader of the given CLASS, returns its
392 associated auxiliary data; otherwise, returns a null pointer.
394 This function is intended for use from casereader
395 implementations, not by casereader users. Even within
396 casereader implementations, its usefulness is quite limited,
397 for at least two reasons. First, every casereader member
398 function already receives a pointer to the casereader's
399 auxiliary data. Second, a casereader's class can change
400 (through a call to casereader_swap) and this is in practice
401 quite common (e.g. any call to casereader_clone on a
402 casereader that does not directly support clone will cause the
403 casereader to be replaced by a shim casereader). */
405 casereader_dynamic_cast (struct casereader *reader,
406 const struct casereader_class *class)
/* Class identity is decided by comparing the class pointers. */
408 return reader->class == class ? reader->aux : NULL;
411 /* Random-access casereader implementation.
413 This is a set of wrappers around casereader_create_sequential
414 and struct casereader_class to make it easy to create
415 efficient casereaders for data sources that natively support
418 /* One clone of a random reader. */
/* make_random_reader creates one of these for the initial casereader
   (casereader_create_random) and for each clone (random_reader_clone). */
421 struct random_reader_shared *shared; /* Data shared among clones. */
422 struct heap_node heap_node; /* Node in shared data's heap of readers. */
423 casenumber offset; /* Number of cases already read. */
426 /* Returns the random_reader in which the given heap_node is
428 static struct random_reader *
429 random_reader_from_heap_node (const struct heap_node *node)
/* heap_data is given the node, the containing structure type and the name
   of the node member; presumably it converts a pointer to the member into
   a pointer to the containing struct random_reader. */
431 return heap_data (node, struct random_reader, heap_node);
434 /* Data shared among clones of a random reader. */
435 struct random_reader_shared
437 struct heap *readers; /* Heap of struct random_readers. */
438 casenumber min_offset; /* Smallest offset of any random_reader. */
439 const struct casereader_random_class *class; /* Random-access class; its read, advance and destroy functions are called in this file. */
/* NOTE(review): the code in this file also reads `shared->aux'; that
   member is not among the lines visible here. */
/* Forward declaration: the class object is initialized after the random
   reader functions, but casereader_create_random and random_reader_clone
   take its address earlier in the file. */
443 static const struct casereader_class random_reader_casereader_class;
445 /* Creates and returns a new random_reader with the given SHARED
446 data and OFFSET. Inserts the new random reader into the
448 static struct random_reader *
449 make_random_reader (struct random_reader_shared *shared, casenumber offset)
451 struct random_reader *br = xmalloc (sizeof *br);
/* The reader joins SHARED's heap, which advance_random_reader consults to
   find the smallest offset among all readers. */
454 heap_insert (shared->readers, &br->heap_node);
458 /* Compares random_readers A and B by offset and returns a
459 strcmp()-like result. */
461 compare_random_readers_by_offset (const struct heap_node *a_,
462 const struct heap_node *b_,
463 const void *aux UNUSED)
465 const struct random_reader *a = random_reader_from_heap_node (a_);
466 const struct random_reader *b = random_reader_from_heap_node (b_);
/* Yields -1 if A's offset is smaller; otherwise the value of the `>'
   comparison, that is, 1 if A's offset is larger and 0 if they are
   equal. */
467 return a->offset < b->offset ? -1 : a->offset > b->offset;
470 /* Creates and returns a new casereader. This function is
471 intended for use by casereader implementations, not by
474 This function is most suited for creating a casereader for a
475 data source that supports random access.
476 casereader_create_sequential is more appropriate for a data
477 source that is naturally sequential.
479 PROTO must be the prototype for the cases that may be read
480 from the casereader. The caller retains its reference to
483 CASE_CNT is an upper limit on the number of cases that
484 casereader_read will return from the casereader in successive
485 calls. Ordinarily, this is the actual number of cases in the
486 data source or CASENUMBER_MAX if the number of cases cannot be
487 predicted in advance.
489 CLASS and AUX are a set of casereader implementation-specific
490 member functions and auxiliary data to pass to those member
491 functions, respectively. */
493 casereader_create_random (const struct caseproto *proto, casenumber case_cnt,
494 const struct casereader_random_class *class,
497 struct random_reader_shared *shared = xmalloc (sizeof *shared);
/* The heap orders all readers sharing this data by their offsets. */
498 shared->readers = heap_create (compare_random_readers_by_offset, NULL);
499 shared->class = class;
501 shared->min_offset = 0;
/* The result is an ordinary casereader of the random reader class, with a
   new taint object (null TAINT argument) and a first random_reader at
   offset 0 as its auxiliary data. */
502 return casereader_create_sequential (NULL, proto, case_cnt,
503 &random_reader_casereader_class,
504 make_random_reader (shared, 0));
507 /* Reassesses the min_offset in SHARED based on the minimum
508 offset in the heap. */
510 advance_random_reader (struct casereader *reader,
511 struct random_reader_shared *shared)
515 old = shared->min_offset;
/* The heap's minimum element is the reader with the smallest offset. */
516 new = random_reader_from_heap_node (heap_minimum (shared->readers))->offset;
/* The underlying class is told by how many cases the minimum moved. For
   the shim class, shim_advance uses this to discard that many cases from
   its window. */
520 shared->min_offset = new;
521 shared->class->advance (reader, shared->aux, new - old);
525 /* struct casereader_class "read" function for random reader. */
526 static struct ccase *
527 random_reader_read (struct casereader *reader, void *br_)
529 struct random_reader *br = br_;
530 struct random_reader_shared *shared = br->shared;
/* The underlying class's read function takes an offset relative to the
   shared minimum offset, not an absolute case number. */
531 struct ccase *c = shared->class->read (reader, shared->aux,
532 br->offset - shared->min_offset);
/* The heap is notified that this reader's node changed and the shared
   minimum is reassessed; presumably this follows an update of BR's
   offset that is not visible here. */
536 heap_changed (shared->readers, &br->heap_node);
537 advance_random_reader (reader, shared);
542 /* struct casereader_class "destroy" function for random
545 random_reader_destroy (struct casereader *reader, void *br_)
547 struct random_reader *br = br_;
548 struct random_reader_shared *shared = br->shared;
/* This reader leaves the shared heap first. */
550 heap_delete (shared->readers, &br->heap_node);
/* If it was the last reader, the heap and the underlying data source are
   destroyed as well. */
551 if (heap_is_empty (shared->readers))
553 heap_destroy (shared->readers);
554 shared->class->destroy (reader, shared->aux);
/* Presumably the alternative branch (other readers remain): the shared
   minimum offset is reassessed now that this reader is gone. */
558 advance_random_reader (reader, shared);
563 /* struct casereader_class "clone" function for random reader. */
564 static struct casereader *
565 random_reader_clone (struct casereader *reader, void *br_)
567 struct random_reader *br = br_;
568 struct random_reader_shared *shared = br->shared;
/* The clone is created with READER's taint as the TAINT argument, READER's
   current case count, the same class, and a new random_reader attached to
   the same SHARED data. */
569 return casereader_create_sequential (casereader_get_taint (reader),
571 casereader_get_case_cnt (reader),
572 &random_reader_casereader_class,
573 make_random_reader (shared,
577 /* struct casereader_class "peek" function for random reader. */
578 static struct ccase *
579 random_reader_peek (struct casereader *reader, void *br_, casenumber idx)
581 struct random_reader *br = br_;
582 struct random_reader_shared *shared = br->shared;
/* Uses the same relative-offset lookup as random_reader_read, IDX cases
   further on; nothing visible here changes BR's offset or the heap. */
584 return shared->class->read (reader, shared->aux,
585 br->offset - shared->min_offset + idx);
588 /* Casereader class for random reader. */
589 static const struct casereader_class random_reader_casereader_class =
/* The member functions are listed positionally; only the destroy entry is
   visible here. */
592 random_reader_destroy,
597 /* Buffering shim for implementing clone and peek operations.
599 The "clone" and "peek" operations aren't implemented by all
600 types of casereaders, but we have to expose a uniform
601 interface anyhow. We do this by interposing a buffering
602 casereader on top of the existing casereader on the first call
603 to "clone" or "peek". The buffering casereader maintains a
604 window of cases that spans the positions of the original
605 casereader and all of its clones (the "clone set"), from the
606 position of the casereader that has read the fewest cases to
607 the position of the casereader that has read the most.
609 Thus, if all of the casereaders in the clone set are at
610 approximately the same position, only a few cases are buffered
611 and there is little inefficiency. If, on the other hand, one
612 casereader is not used to read any cases at all, but another
613 one is used to read all of the cases, the entire contents of
614 the casereader is copied into the buffer. This still might
615 not be so inefficient, given that case data in memory is
616 shared across multiple identical copies, but in the worst case
617 the window implementation will write cases to disk instead of
618 maintaining them in-memory. */
620 /* A buffering shim for a non-clonable or non-peekable
/* insert_shim allocates one of these for each casereader it wraps. */
624 struct casewindow *window; /* Window of buffered cases. */
625 struct casereader *subreader; /* Subordinate casereader. */
/* Forward declaration: insert_shim takes the address of the shim class
   before the class object is initialized later in the file. */
628 static const struct casereader_random_class shim_class;
630 /* Interposes a buffering shim atop READER. */
632 insert_shim (struct casereader *reader)
634 const struct caseproto *proto = casereader_get_proto (reader);
635 casenumber case_cnt = casereader_get_case_cnt (reader);
636 struct shim *b = xmalloc (sizeof *b);
/* The window holds cases of READER's prototype; its second argument comes
   from the workspace setting for that prototype (presumably a limit on
   the cases kept in memory). */
637 b->window = casewindow_create (proto, settings_get_workspace_cases (proto));
/* A new random-access casereader is created on top of the shim, and then
   its contents are exchanged with READER's. After the swap, READER is the
   shim-backed random reader, and B->subreader refers to the reader that
   READER used to be. */
638 b->subreader = casereader_create_random (proto, case_cnt, &shim_class, b);
639 casereader_swap (reader, b->subreader);
/* The taints of the window and of the subreader are linked with READER's
   taint; presumably errors flow from the first argument to the second. */
640 taint_propagate (casewindow_get_taint (b->window),
641 casereader_get_taint (reader));
642 taint_propagate (casereader_get_taint (b->subreader),
643 casereader_get_taint (reader));
646 /* Ensures that B's window contains at least CASE_CNT cases.
647 Return true if successful, false upon reaching the end of B's
648 subreader or an I/O error. */
650 prime_buffer (struct shim *b, casenumber case_cnt)
/* Cases are pulled from the subreader and pushed onto the head of the
   window while the window holds fewer than CASE_CNT cases. */
652 while (casewindow_get_case_cnt (b->window) < case_cnt)
654 struct ccase *tmp = casereader_read (b->subreader);
657 casewindow_push_head (b->window, tmp);
662 /* Reads the case at the given 0-based OFFSET from the front of
663 the window. Returns the case if successful, or a null
664 pointer if OFFSET is beyond the end of file or upon I/O error.
665 The caller must call case_unref() on the returned case when it
666 is no longer needed. */
667 static struct ccase *
668 shim_read (struct casereader *reader UNUSED, void *b_,
/* OFFSET is 0-based, so the window has to hold OFFSET + 1 cases before
   the case at OFFSET can be fetched from it. */
672 if (!prime_buffer (b, offset + 1))
674 return casewindow_get_case (b->window, offset);
/* Destroy function for the shim: releases the case window and the
   subordinate casereader that the shim holds. (Whether the shim structure
   itself is freed is not visible here.) */
679 shim_destroy (struct casereader *reader UNUSED, void *b_)
682 casewindow_destroy (b->window);
683 casereader_destroy (b->subreader);
687 /* Discards CASE_CNT cases from the front of B's window. */
689 shim_advance (struct casereader *reader UNUSED, void *b_, casenumber case_cnt)
/* prime_buffer pushes new cases at the head of the window; the cases
   dropped here are taken from the tail. */
692 casewindow_pop_tail (b->window, case_cnt);
695 /* Class for the buffered reader. */
/* This is the random-access class that insert_shim passes to
   casereader_create_random; its member list is not visible here. */
696 static const struct casereader_random_class shim_class =
/* Forward declaration: casereader_create_empty takes the address of the
   null class before the class object is initialized below. */
703 static const struct casereader_class casereader_null_class;
705 /* Returns a casereader with no cases. The casereader has the prototype
706 specified by PROTO. PROTO may be specified as a null pointer, in which case
707 the casereader has no variables. */
709 casereader_create_empty (const struct caseproto *proto_)
711 struct casereader *reader;
712 struct caseproto *proto;
/* Either way this function ends up holding a prototype reference of its
   own: a new reference to PROTO_, or a newly created prototype when
   PROTO_ is null. */
714 proto = proto_ != NULL ? caseproto_ref (proto_) : caseproto_create ();
/* The reader is created with a case count of 0 and the null class. */
715 reader = casereader_create_sequential (NULL, proto, 0,
716 &casereader_null_class, NULL);
/* casereader_create_sequential took its own reference on PROTO, so the
   one held here is dropped. */
717 caseproto_unref (proto);
/* "read" function of the null class used by casereader_create_empty.
   Both parameters are marked UNUSED; the body is not visible here. */
722 static struct ccase *
723 casereader_null_read (struct casereader *reader UNUSED, void *aux UNUSED)
/* "destroy" function of the null class used by casereader_create_empty.
   Both parameters are marked UNUSED; the body is not visible here. */
729 casereader_null_destroy (struct casereader *reader UNUSED, void *aux UNUSED)
/* Class of the empty casereader returned by casereader_create_empty; the
   member functions are listed positionally. */
734 static const struct casereader_class casereader_null_class =
736 casereader_null_read,
737 casereader_null_destroy,