1 /* PSPP - a program for statistical analysis.
2 Copyright (C) 2007, 2009 Free Software Foundation, Inc.
4 This program is free software: you can redistribute it and/or modify
5 it under the terms of the GNU General Public License as published by
6 the Free Software Foundation, either version 3 of the License, or
7 (at your option) any later version.
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 GNU General Public License for more details.
14 You should have received a copy of the GNU General Public License
15 along with this program. If not, see <http://www.gnu.org/licenses/>. */
19 #include <data/casereader.h>
20 #include <data/casereader-provider.h>
24 #include <data/casewindow.h>
25 #include <data/casewriter.h>
26 #include <data/settings.h>
27 #include <libpspp/assertion.h>
28 #include <libpspp/heap.h>
29 #include <libpspp/taint.h>
36 struct taint *taint; /* Corrupted? */
37 size_t value_cnt; /* Values per case. */
38 casenumber case_cnt; /* Number of cases,
39 CASENUMBER_MAX if unknown. */
40 const struct casereader_class *class; /* Class. */
41 void *aux; /* Auxiliary data for class. */
44 static void insert_shim (struct casereader *);
46 /* Reads and returns the next case from READER. The caller owns
47 the returned case and must call case_unref on it when its data
48 is no longer needed. Returns a null pointer if cases have
49 been exhausted or upon detection of an I/O error.
51 The case returned is effectively consumed: it can never be
52 read again through READER. If this is inconvenient, READER
53 may be cloned in advance with casereader_clone, or
54 casereader_peek may be used instead. */
56 casereader_read (struct casereader *reader)
58 if (reader->case_cnt != 0)
60 /* ->read may use casereader_swap to replace itself by
61 another reader and then delegate to that reader by
62 recursively calling casereader_read. Currently only
63 lazy_casereader does this and, with luck, nothing else
66 To allow this to work, however, we must decrement
67 case_cnt before calling ->read. If we decremented
68 case_cnt after calling ->read, then this would actually
69 drop two cases from case_cnt instead of one, and we'd
70 lose the last case in the casereader. */
72 if (reader->case_cnt != CASENUMBER_MAX)
74 c = reader->class->read (reader, reader->aux);
77 assert (case_get_value_cnt (c) >= reader->value_cnt);
86 Returns false if an I/O error was detected on READER, true
89 casereader_destroy (struct casereader *reader)
94 reader->class->destroy (reader, reader->aux);
95 ok = taint_destroy (reader->taint);
101 /* Returns a clone of READER. READER and its clone may be used
102 to read the same sequence of cases in the same order, barring
105 casereader_clone (const struct casereader *reader_)
107 struct casereader *reader = (struct casereader *) reader_;
108 struct casereader *clone;
109 if ( reader == NULL )
112 if (reader->class->clone == NULL)
113 insert_shim (reader);
114 clone = reader->class->clone (reader, reader->aux);
115 assert (clone != NULL);
116 assert (clone != reader);
120 /* Makes a copy of ORIGINAL into *NEW1 (if NEW1 is non-null) and
121 *NEW2 (if NEW2 is non-null), then destroys ORIGINAL. */
123 casereader_split (struct casereader *original,
124 struct casereader **new1, struct casereader **new2)
126 if (new1 != NULL && new2 != NULL)
128 *new1 = casereader_rename (original);
129 *new2 = casereader_clone (*new1);
131 else if (new1 != NULL)
132 *new1 = casereader_rename (original);
133 else if (new2 != NULL)
134 *new2 = casereader_rename (original);
136 casereader_destroy (original);
139 /* Returns a copy of READER, which is itself destroyed.
140 Useful for taking over ownership of a casereader, to enforce
141 preventing the original owner from accessing the casereader
144 casereader_rename (struct casereader *reader)
146 struct casereader *new = xmemdup (reader, sizeof *reader);
151 /* Exchanges the casereaders referred to by A and B. */
153 casereader_swap (struct casereader *a, struct casereader *b)
157 struct casereader tmp = *a;
163 /* Reads and returns the (IDX + 1)'th case from READER. The
164 caller owns the returned case and must call case_unref on it
165 when it is no longer needed. Returns a null pointer if cases
166 have been exhausted or upon detection of an I/O error. */
168 casereader_peek (struct casereader *reader, casenumber idx)
170 if (idx < reader->case_cnt)
173 if (reader->class->peek == NULL)
174 insert_shim (reader);
175 c = reader->class->peek (reader, reader->aux, idx);
178 else if (casereader_error (reader))
179 reader->case_cnt = 0;
181 if (reader->case_cnt > idx)
182 reader->case_cnt = idx;
186 /* Returns true if no cases remain to be read from READER, or if
187 an error has occurred on READER. (A return value of false
188 does *not* mean that the next call to casereader_peek or
189 casereader_read will return a case, because an error can occur
192 casereader_is_empty (struct casereader *reader)
194 if (reader->case_cnt == 0)
198 struct ccase *c = casereader_peek (reader, 0);
209 /* Returns true if an I/O error or another hard error has
210 occurred on READER, a clone of READER, or on some object on
211 which READER's data has a dependency, false otherwise. */
213 casereader_error (const struct casereader *reader)
215 return taint_is_tainted (reader->taint);
218 /* Marks READER as having encountered an error.
220 Ordinarily, this function should be called by the
221 implementation of a casereader, not by the casereader's
222 client. Instead, casereader clients should usually ensure
223 that a casereader's error state is correct by using
224 taint_propagate to propagate to the casereader's taint
225 structure, which may be obtained via casereader_get_taint. */
227 casereader_force_error (struct casereader *reader)
229 taint_set_taint (reader->taint);
232 /* Returns READER's associated taint object, for use with
233 taint_propagate and other taint functions. */
235 casereader_get_taint (const struct casereader *reader)
237 return reader->taint;
240 /* Returns the number of cases that will be read by successive
241 calls to casereader_read for READER, assuming that no errors
242 occur. Upon an error condition, the case count drops to 0, so
243 that no more cases can be obtained.
245 Not all casereaders can predict the number of cases that they
246 will produce without actually reading all of them. In that
247 case, this function returns CASENUMBER_MAX. To obtain the
248 actual number of cases in such a casereader, use
249 casereader_count_cases. */
251 casereader_get_case_cnt (struct casereader *reader)
253 return reader->case_cnt;
256 /* Returns the number of cases that will be read by successive
257 calls to casereader_read for READER, assuming that no errors
258 occur. Upon an error condition, the case count drops to 0, so
259 that no more cases can be obtained.
261 For a casereader that cannot predict the number of cases it
262 will produce, this function actually reads (and discards) all
263 of the contents of a clone of READER. Thus, the return value
264 is always correct in the absence of I/O errors. */
266 casereader_count_cases (struct casereader *reader)
268 if (reader->case_cnt == CASENUMBER_MAX)
270 casenumber n_cases = 0;
273 struct casereader *clone = casereader_clone (reader);
275 for (; (c = casereader_read (clone)) != NULL; case_unref (c))
278 casereader_destroy (clone);
279 reader->case_cnt = n_cases;
282 return reader->case_cnt;
285 /* Returns the number of struct values in each case in READER. */
287 casereader_get_value_cnt (struct casereader *reader)
289 return reader->value_cnt;
292 /* Copies all the cases in READER to WRITER, propagating errors
295 casereader_transfer (struct casereader *reader, struct casewriter *writer)
299 taint_propagate (casereader_get_taint (reader),
300 casewriter_get_taint (writer));
301 while ((c = casereader_read (reader)) != NULL)
302 casewriter_write (writer, c);
303 casereader_destroy (reader);
306 /* Creates and returns a new casereader. This function is
307 intended for use by casereader implementations, not by
310 This function is most suited for creating a casereader for a
311 data source that is naturally sequential.
312 casereader_create_random may be more appropriate for a data
313 source that supports random access.
315 Ordinarily, specify a null pointer for TAINT, in which case
316 the new casereader will have a new, unique taint object. If
317 the new casereader should have a clone of an existing taint
318 object, specify that object as TAINT. (This is most commonly
319 useful in an implementation of the "clone" casereader_class
320 function, in which case the cloned casereader should have the
321 same taint object as the original casereader.)
323 VALUE_CNT must be the number of struct values per case read
326 CASE_CNT is an upper limit on the number of cases that
327 casereader_read will return from the casereader in successive
328 calls. Ordinarily, this is the actual number of cases in the
329 data source or CASENUMBER_MAX if the number of cases cannot be
330 predicted in advance.
332 CLASS and AUX are a set of casereader implementation-specific
333 member functions and auxiliary data to pass to those member
334 functions, respectively. */
336 casereader_create_sequential (const struct taint *taint,
337 size_t value_cnt, casenumber case_cnt,
338 const struct casereader_class *class, void *aux)
340 struct casereader *reader = xmalloc (sizeof *reader);
341 reader->taint = taint != NULL ? taint_clone (taint) : taint_create ();
342 reader->value_cnt = value_cnt;
343 reader->case_cnt = case_cnt;
344 reader->class = class;
349 /* If READER is a casereader of the given CLASS, returns its
350 associated auxiliary data; otherwise, returns a null pointer.
352 This function is intended for use from casereader
353 implementations, not by casereader users. Even within
354 casereader implementations, its usefulness is quite limited,
355 for at least two reasons. First, every casereader member
356 function already receives a pointer to the casereader's
357 auxiliary data. Second, a casereader's class can change
358 (through a call to casereader_swap) and this is in practice
359 quite common (e.g. any call to casereader_clone on a
360 casereader that does not directly support clone will cause the
361 casereader to be replaced by a shim casereader). */
363 casereader_dynamic_cast (struct casereader *reader,
364 const struct casereader_class *class)
366 return reader->class == class ? reader->aux : NULL;
369 /* Random-access casereader implementation.
371 This is a set of wrappers around casereader_create_sequential
372 and struct casereader_class to make it easy to create
373 efficient casereaders for data sources that natively support
376 /* One clone of a random reader. */
379 struct random_reader_shared *shared; /* Data shared among clones. */
380 struct heap_node heap_node; /* Node in shared data's heap of readers. */
381 casenumber offset; /* Number of cases already read. */
384 /* Returns the random_reader in which the given heap_node is
386 static struct random_reader *
387 random_reader_from_heap_node (const struct heap_node *node)
389 return heap_data (node, struct random_reader, heap_node);
392 /* Data shared among clones of a random reader. */
393 struct random_reader_shared
395 struct heap *readers; /* Heap of struct random_readers. */
396 casenumber min_offset; /* Smallest offset of any random_reader. */
397 const struct casereader_random_class *class;
401 static const struct casereader_class random_reader_casereader_class;
403 /* Creates and returns a new random_reader with the given SHARED
404 data and OFFSET. Inserts the new random reader into the
406 static struct random_reader *
407 make_random_reader (struct random_reader_shared *shared, casenumber offset)
409 struct random_reader *br = xmalloc (sizeof *br);
412 heap_insert (shared->readers, &br->heap_node);
416 /* Compares random_readers A and B by offset and returns a
417 strcmp()-like result. */
419 compare_random_readers_by_offset (const struct heap_node *a_,
420 const struct heap_node *b_,
421 const void *aux UNUSED)
423 const struct random_reader *a = random_reader_from_heap_node (a_);
424 const struct random_reader *b = random_reader_from_heap_node (b_);
425 return a->offset < b->offset ? -1 : a->offset > b->offset;
428 /* Creates and returns a new casereader. This function is
429 intended for use by casereader implementations, not by
432 This function is most suited for creating a casereader for a
433 data source that supports random access.
434 casereader_create_sequential is more appropriate for a data
435 source that is naturally sequential.
437 VALUE_CNT must be the number of struct values per case read
440 CASE_CNT is an upper limit on the number of cases that
441 casereader_read will return from the casereader in successive
442 calls. Ordinarily, this is the actual number of cases in the
443 data source or CASENUMBER_MAX if the number of cases cannot be
444 predicted in advance.
446 CLASS and AUX are a set of casereader implementation-specific
447 member functions and auxiliary data to pass to those member
448 functions, respectively. */
450 casereader_create_random (size_t value_cnt, casenumber case_cnt,
451 const struct casereader_random_class *class,
454 struct random_reader_shared *shared = xmalloc (sizeof *shared);
455 shared->readers = heap_create (compare_random_readers_by_offset, NULL);
456 shared->class = class;
458 shared->min_offset = 0;
459 return casereader_create_sequential (NULL, value_cnt, case_cnt,
460 &random_reader_casereader_class,
461 make_random_reader (shared, 0));
464 /* Reassesses the min_offset in SHARED based on the minimum
465 offset in the heap. */
467 advance_random_reader (struct casereader *reader,
468 struct random_reader_shared *shared)
472 old = shared->min_offset;
473 new = random_reader_from_heap_node (heap_minimum (shared->readers))->offset;
477 shared->min_offset = new;
478 shared->class->advance (reader, shared->aux, new - old);
482 /* struct casereader_class "read" function for random reader. */
483 static struct ccase *
484 random_reader_read (struct casereader *reader, void *br_)
486 struct random_reader *br = br_;
487 struct random_reader_shared *shared = br->shared;
488 struct ccase *c = shared->class->read (reader, shared->aux,
489 br->offset - shared->min_offset);
493 heap_changed (shared->readers, &br->heap_node);
494 advance_random_reader (reader, shared);
499 /* struct casereader_class "destroy" function for random
502 random_reader_destroy (struct casereader *reader, void *br_)
504 struct random_reader *br = br_;
505 struct random_reader_shared *shared = br->shared;
507 heap_delete (shared->readers, &br->heap_node);
508 if (heap_is_empty (shared->readers))
510 heap_destroy (shared->readers);
511 shared->class->destroy (reader, shared->aux);
515 advance_random_reader (reader, shared);
520 /* struct casereader_class "clone" function for random reader. */
521 static struct casereader *
522 random_reader_clone (struct casereader *reader, void *br_)
524 struct random_reader *br = br_;
525 struct random_reader_shared *shared = br->shared;
526 return casereader_create_sequential (casereader_get_taint (reader),
527 casereader_get_value_cnt (reader),
528 casereader_get_case_cnt (reader),
529 &random_reader_casereader_class,
530 make_random_reader (shared,
534 /* struct casereader_class "peek" function for random reader. */
535 static struct ccase *
536 random_reader_peek (struct casereader *reader, void *br_, casenumber idx)
538 struct random_reader *br = br_;
539 struct random_reader_shared *shared = br->shared;
541 return shared->class->read (reader, shared->aux,
542 br->offset - shared->min_offset + idx);
545 /* Casereader class for random reader. */
546 static const struct casereader_class random_reader_casereader_class =
549 random_reader_destroy,
554 /* Buffering shim for implementing clone and peek operations.
556 The "clone" and "peek" operations aren't implemented by all
557 types of casereaders, but we have to expose a uniform
558 interface anyhow. We do this by interposing a buffering
559 casereader on top of the existing casereader on the first call
560 to "clone" or "peek". The buffering casereader maintains a
561 window of cases that spans the positions of the original
562 casereader and all of its clones (the "clone set"), from the
563 position of the casereader that has read the fewest cases to
564 the position of the casereader that has read the most.
566 Thus, if all of the casereaders in the clone set are at
567 approximately the same position, only a few cases are buffered
568 and there is little inefficiency. If, on the other hand, one
569 casereader is not used to read any cases at all, but another
570 one is used to read all of the cases, the entire contents of
571 the casereader is copied into the buffer. This still might
572 not be so inefficient, given that case data in memory is
573 shared across multiple identical copies, but in the worst case
574 the window implementation will write cases to disk instead of
575 maintaining them in-memory. */
577 /* A buffering shim for a non-clonable or non-peekable
581 struct casewindow *window; /* Window of buffered cases. */
582 struct casereader *subreader; /* Subordinate casereader. */
585 static const struct casereader_random_class shim_class;
587 /* Interposes a buffering shim atop READER. */
589 insert_shim (struct casereader *reader)
591 size_t value_cnt = casereader_get_value_cnt (reader);
592 casenumber case_cnt = casereader_get_case_cnt (reader);
593 struct shim *b = xmalloc (sizeof *b);
594 b->window = casewindow_create (value_cnt, settings_get_workspace_cases (value_cnt));
595 b->subreader = casereader_create_random (value_cnt, case_cnt,
597 casereader_swap (reader, b->subreader);
598 taint_propagate (casewindow_get_taint (b->window),
599 casereader_get_taint (reader));
600 taint_propagate (casereader_get_taint (b->subreader),
601 casereader_get_taint (reader));
604 /* Ensures that B's window contains at least CASE_CNT cases.
605 Returns true if successful, false upon reaching the end of B's
606 subreader or an I/O error. */
608 prime_buffer (struct shim *b, casenumber case_cnt)
610 while (casewindow_get_case_cnt (b->window) < case_cnt)
612 struct ccase *tmp = casereader_read (b->subreader);
615 casewindow_push_head (b->window, tmp);
620 /* Reads the case at the given 0-based OFFSET from the front of
621 the window. Returns the case if successful, or a null
622 pointer if OFFSET is beyond the end of file or upon I/O error.
623 The caller must call case_unref() on the returned case when it
624 is no longer needed. */
625 static struct ccase *
626 shim_read (struct casereader *reader UNUSED, void *b_,
630 if (!prime_buffer (b, offset + 1))
632 return casewindow_get_case (b->window, offset);
637 shim_destroy (struct casereader *reader UNUSED, void *b_)
640 casewindow_destroy (b->window);
641 casereader_destroy (b->subreader);
645 /* Discards CASE_CNT cases from the front of B's window. */
647 shim_advance (struct casereader *reader UNUSED, void *b_, casenumber case_cnt)
650 casewindow_pop_tail (b->window, case_cnt);
653 /* Class for the buffered reader. */
654 static const struct casereader_random_class shim_class =