1 /* PSPP - a program for statistical analysis.
2 Copyright (C) 2007 Free Software Foundation, Inc.
4 This program is free software: you can redistribute it and/or modify
5 it under the terms of the GNU General Public License as published by
6 the Free Software Foundation, either version 3 of the License, or
7 (at your option) any later version.
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 GNU General Public License for more details.
14 You should have received a copy of the GNU General Public License
15 along with this program. If not, see <http://www.gnu.org/licenses/>. */
19 #include <data/casereader.h>
20 #include <data/casereader-provider.h>
24 #include <data/casewindow.h>
25 #include <data/casewriter.h>
26 #include <data/settings.h>
27 #include <libpspp/assertion.h>
28 #include <libpspp/heap.h>
29 #include <libpspp/taint.h>
36 struct taint *taint; /* Corrupted? */
37 size_t value_cnt; /* Values per case. */
38 casenumber case_cnt; /* Number of cases,
39 CASENUMBER_MAX if unknown. */
40 const struct casereader_class *class; /* Class. */
41 void *aux; /* Auxiliary data for class. */
44 static void insert_shim (struct casereader *);
46 /* Creates a new case in C and reads the next case from READER
47 into it. The caller owns C and must destroy C when its data
48 is no longer needed. Return true if successful, false when
49 cases have been exhausted or upon detection of an I/O error.
50 In the latter case, C is set to the null case.
52 The case returned is effectively consumed: it can never be
53 read again through READER. If this is inconvenient, READER
54 may be cloned in advance with casereader_clone, or
55 casereader_peek may be used instead. */
57 casereader_read (struct casereader *reader, struct ccase *c)
59 if (reader->case_cnt != 0)
61 /* ->read may use casereader_swap to replace itself by
62 another reader and then delegate to that reader by
63 recursively calling casereader_read. Currently only
64 lazy_casereader does this and, with luck, nothing else
67 To allow this to work, however, we must decrement
68 case_cnt before calling ->read. If we decremented
69 case_cnt after calling ->read, then this would actually
70 drop two cases from case_cnt instead of one, and we'd
71 lose the last case in the casereader. */
72 if (reader->case_cnt != CASENUMBER_MAX)
74 if (reader->class->read (reader, reader->aux, c))
76 assert (case_get_value_cnt (c) >= reader->value_cnt);
86 Returns false if an I/O error was detected on READER, true
89 casereader_destroy (struct casereader *reader)
94 reader->class->destroy (reader, reader->aux);
95 ok = taint_destroy (reader->taint);
101 /* Returns a clone of READER. READER and its clone may be used
102 to read the same sequence of cases in the same order, barring
105 casereader_clone (const struct casereader *reader_)
107 struct casereader *reader = (struct casereader *) reader_;
108 struct casereader *clone;
109 if ( reader == NULL )
112 if (reader->class->clone == NULL)
113 insert_shim (reader);
114 clone = reader->class->clone (reader, reader->aux);
115 assert (clone != NULL);
116 assert (clone != reader);
120 /* Makes a copy of ORIGINAL into *NEW1 (if NEW1 is non-null) and
121 *NEW2 (if NEW2 is non-null), then destroys ORIGINAL. */
123 casereader_split (struct casereader *original,
124 struct casereader **new1, struct casereader **new2)
126 if (new1 != NULL && new2 != NULL)
128 *new1 = casereader_rename (original);
129 *new2 = casereader_clone (*new1);
131 else if (new1 != NULL)
132 *new1 = casereader_rename (original);
133 else if (new2 != NULL)
134 *new2 = casereader_rename (original);
136 casereader_destroy (original);
139 /* Returns a copy of READER, which is itself destroyed.
140 Useful for taking over ownership of a casereader, to enforce
141 preventing the original owner from accessing the casereader
144 casereader_rename (struct casereader *reader)
146 struct casereader *new = xmemdup (reader, sizeof *reader);
151 /* Exchanges the casereaders referred to by A and B. */
153 casereader_swap (struct casereader *a, struct casereader *b)
157 struct casereader tmp = *a;
163 /* Creates a new case in C and reads the (IDX + 1)'th case from
164 READER into it. The caller owns C and must destroy C when its
165 data is no longer needed. Return true if successful, false
166 when cases have been exhausted or upon detection of an I/O
167 error. In the latter case, C is set to the null case. */
169 casereader_peek (struct casereader *reader, casenumber idx, struct ccase *c)
171 if (idx < reader->case_cnt)
173 if (reader->class->peek == NULL)
174 insert_shim (reader);
175 if (reader->class->peek (reader, reader->aux, idx, c))
177 else if (casereader_error (reader))
178 reader->case_cnt = 0;
180 if (reader->case_cnt > idx)
181 reader->case_cnt = idx;
186 /* Returns true if an I/O error or another hard error has
187 occurred on READER, a clone of READER, or on some object on
188 which READER's data has a dependency, false otherwise. */
190 casereader_error (const struct casereader *reader)
192 return taint_is_tainted (reader->taint);
195 /* Marks READER as having encountered an error.
197 Ordinarily, this function should be called by the
198 implementation of a casereader, not by the casereader's
199 client. Instead, casereader clients should usually ensure
200 that a casereader's error state is correct by using
201 taint_propagate to propagate to the casereader's taint
202 structure, which may be obtained via casereader_get_taint. */
204 casereader_force_error (struct casereader *reader)
206 taint_set_taint (reader->taint);
209 /* Returns READER's associate taint object, for use with
210 taint_propagate and other taint functions. */
212 casereader_get_taint (const struct casereader *reader)
214 return reader->taint;
217 /* Returns the number of cases that will be read by successive
218 calls to casereader_read for READER, assuming that no errors
219 occur. Upon an error condition, the case count drops to 0, so
220 that no more cases can be obtained.
222 Not all casereaders can predict the number of cases that they
223 will produce without actually reading all of them. In that
224 case, this function returns CASENUMBER_MAX. To obtain the
225 actual number of cases in such a casereader, use
226 casereader_count_cases. */
228 casereader_get_case_cnt (struct casereader *reader)
230 return reader->case_cnt;
233 /* Returns the number of cases that will be read by successive
234 calls to casereader_read for READER, assuming that no errors
235 occur. Upon an error condition, the case count drops to 0, so
236 that no more cases can be obtained.
238 For a casereader that cannot predict the number of cases it
239 will produce, this function actually reads (and discards) all
240 of the contents of a clone of READER. Thus, the return value
241 is always correct in the absence of I/O errors. */
243 casereader_count_cases (struct casereader *reader)
245 if (reader->case_cnt == CASENUMBER_MAX)
247 casenumber n_cases = 0;
250 struct casereader *clone = casereader_clone (reader);
252 for (; casereader_read (clone, &c); case_destroy (&c))
255 casereader_destroy (clone);
256 reader->case_cnt = n_cases;
259 return reader->case_cnt;
262 /* Returns the number of struct values in each case in READER. */
264 casereader_get_value_cnt (struct casereader *reader)
266 return reader->value_cnt;
269 /* Copies all the cases in READER to WRITER, propagating errors
272 casereader_transfer (struct casereader *reader, struct casewriter *writer)
276 taint_propagate (casereader_get_taint (reader),
277 casewriter_get_taint (writer));
278 while (casereader_read (reader, &c))
279 casewriter_write (writer, &c);
280 casereader_destroy (reader);
283 /* Creates and returns a new casereader. This function is
284 intended for use by casereader implementations, not by
287 This function is most suited for creating a casereader for a
288 data source that is naturally sequential.
289 casereader_create_random may be more appropriate for a data
290 source that supports random access.
292 Ordinarily, specify a null pointer for TAINT, in which case
293 the new casereader will have a new, unique taint object. If
294 the new casereader should have a clone of an existing taint
295 object, specify that object as TAINT. (This is most commonly
296 useful in an implementation of the "clone" casereader_class
297 function, in which case the cloned casereader should have the
298 same taint object as the original casereader.)
300 VALUE_CNT must be the number of struct values per case read
303 CASE_CNT is an upper limit on the number of cases that
304 casereader_read will return from the casereader in successive
305 calls. Ordinarily, this is the actual number of cases in the
306 data source or CASENUMBER_MAX if the number of cases cannot be
307 predicted in advance.
309 CLASS and AUX are a set of casereader implementation-specific
310 member functions and auxiliary data to pass to those member
311 functions, respectively. */
313 casereader_create_sequential (const struct taint *taint,
314 size_t value_cnt, casenumber case_cnt,
315 const struct casereader_class *class, void *aux)
317 struct casereader *reader = xmalloc (sizeof *reader);
318 reader->taint = taint != NULL ? taint_clone (taint) : taint_create ();
319 reader->value_cnt = value_cnt;
320 reader->case_cnt = case_cnt;
321 reader->class = class;
326 /* If READER is a casereader of the given CLASS, returns its
327 associated auxiliary data; otherwise, returns a null pointer.
329 This function is intended for use from casereader
330 implementations, not by casereader users. Even within
331 casereader implementations, its usefulness is quite limited,
332 for at least two reasons. First, every casereader member
333 function already receives a pointer to the casereader's
334 auxiliary data. Second, a casereader's class can change
335 (through a call to casereader_swap) and this is in practice
336 quite common (e.g. any call to casereader_clone on a
337 casereader that does not directly support clone will cause the
338 casereader to be replaced by a shim caseader). */
340 casereader_dynamic_cast (struct casereader *reader,
341 struct casereader_class *class)
343 return reader->class == class ? reader->aux : NULL;
346 /* Random-access casereader implementation.
348 This is a set of wrappers around casereader_create_sequential
349 and struct casereader_class to make it easy to create
350 efficient casereaders for data sources that natively support
353 /* One clone of a random reader. */
356 struct random_reader_shared *shared; /* Data shared among clones. */
357 struct heap_node heap_node; /* Node in shared data's heap of readers. */
358 casenumber offset; /* Number of cases already read. */
361 /* Returns the random_reader in which the given heap_node is
363 static struct random_reader *
364 random_reader_from_heap_node (const struct heap_node *node)
366 return heap_data (node, struct random_reader, heap_node);
369 /* Data shared among clones of a random reader. */
370 struct random_reader_shared
372 struct heap *readers; /* Heap of struct random_readers. */
373 casenumber min_offset; /* Smallest offset of any random_reader. */
374 const struct casereader_random_class *class;
378 static struct casereader_class random_reader_casereader_class;
380 /* Creates and returns a new random_reader with the given SHARED
381 data and OFFSET. Inserts the new random reader into the
383 static struct random_reader *
384 make_random_reader (struct random_reader_shared *shared, casenumber offset)
386 struct random_reader *br = xmalloc (sizeof *br);
389 heap_insert (shared->readers, &br->heap_node);
393 /* Compares random_readers A and B by offset and returns a
394 strcmp()-like result. */
396 compare_random_readers_by_offset (const struct heap_node *a_,
397 const struct heap_node *b_,
398 const void *aux UNUSED)
400 const struct random_reader *a = random_reader_from_heap_node (a_);
401 const struct random_reader *b = random_reader_from_heap_node (b_);
402 return a->offset < b->offset ? -1 : a->offset > b->offset;
405 /* Creates and returns a new casereader. This function is
406 intended for use by casereader implementations, not by
409 This function is most suited for creating a casereader for a
410 data source that supports random access.
411 casereader_create_sequential is more appropriate for a data
412 source that is naturally sequential.
414 VALUE_CNT must be the number of struct values per case read
417 CASE_CNT is an upper limit on the number of cases that
418 casereader_read will return from the casereader in successive
419 calls. Ordinarily, this is the actual number of cases in the
420 data source or CASENUMBER_MAX if the number of cases cannot be
421 predicted in advance.
423 CLASS and AUX are a set of casereader implementation-specific
424 member functions and auxiliary data to pass to those member
425 functions, respectively. */
427 casereader_create_random (size_t value_cnt, casenumber case_cnt,
428 const struct casereader_random_class *class,
431 struct random_reader_shared *shared = xmalloc (sizeof *shared);
432 shared->readers = heap_create (compare_random_readers_by_offset, NULL);
433 shared->class = class;
435 shared->min_offset = 0;
436 return casereader_create_sequential (NULL, value_cnt, case_cnt,
437 &random_reader_casereader_class,
438 make_random_reader (shared, 0));
441 /* Reassesses the min_offset in SHARED based on the minimum
442 offset in the heap. */
444 advance_random_reader (struct casereader *reader,
445 struct random_reader_shared *shared)
449 old = shared->min_offset;
450 new = random_reader_from_heap_node (heap_minimum (shared->readers))->offset;
454 shared->min_offset = new;
455 shared->class->advance (reader, shared->aux, new - old);
459 /* struct casereader_class "read" function for random reader. */
461 random_reader_read (struct casereader *reader, void *br_, struct ccase *c)
463 struct random_reader *br = br_;
464 struct random_reader_shared *shared = br->shared;
466 if (shared->class->read (reader, shared->aux,
467 br->offset - shared->min_offset, c))
470 heap_changed (shared->readers, &br->heap_node);
471 advance_random_reader (reader, shared);
478 /* struct casereader_class "destroy" function for random
481 random_reader_destroy (struct casereader *reader, void *br_)
483 struct random_reader *br = br_;
484 struct random_reader_shared *shared = br->shared;
486 heap_delete (shared->readers, &br->heap_node);
487 if (heap_is_empty (shared->readers))
489 heap_destroy (shared->readers);
490 shared->class->destroy (reader, shared->aux);
494 advance_random_reader (reader, shared);
499 /* struct casereader_class "clone" function for random reader. */
500 static struct casereader *
501 random_reader_clone (struct casereader *reader, void *br_)
503 struct random_reader *br = br_;
504 struct random_reader_shared *shared = br->shared;
505 return casereader_create_sequential (casereader_get_taint (reader),
506 casereader_get_value_cnt (reader),
507 casereader_get_case_cnt (reader),
508 &random_reader_casereader_class,
509 make_random_reader (shared,
513 /* struct casereader_class "peek" function for random reader. */
515 random_reader_peek (struct casereader *reader, void *br_,
516 casenumber idx, struct ccase *c)
518 struct random_reader *br = br_;
519 struct random_reader_shared *shared = br->shared;
521 return shared->class->read (reader, shared->aux,
522 br->offset - shared->min_offset + idx, c);
525 /* Casereader class for random reader. */
526 static struct casereader_class random_reader_casereader_class =
529 random_reader_destroy,
534 /* Buffering shim for implementing clone and peek operations.
536 The "clone" and "peek" operations aren't implemented by all
537 types of casereaders, but we have to expose a uniform
538 interface anyhow. We do this by interposing a buffering
539 casereader on top of the existing casereader on the first call
540 to "clone" or "peek". The buffering casereader maintains a
541 window of cases that spans the positions of the original
542 casereader and all of its clones (the "clone set"), from the
543 position of the casereader that has read the fewest cases to
544 the position of the casereader that has read the most.
546 Thus, if all of the casereaders in the clone set are at
547 approximately the same position, only a few cases are buffered
548 and there is little inefficiency. If, on the other hand, one
549 casereader is not used to read any cases at all, but another
550 one is used to read all of the cases, the entire contents of
551 the casereader is copied into the buffer. This still might
552 not be so inefficient, given that case data in memory is
553 shared across multiple identical copies, but in the worst case
554 the window implementation will write cases to disk instead of
555 maintaining them in-memory. */
557 /* A buffering shim for a non-clonable or non-peekable
561 struct casewindow *window; /* Window of buffered cases. */
562 struct casereader *subreader; /* Subordinate casereader. */
565 static struct casereader_random_class shim_class;
567 /* Interposes a buffering shim atop READER. */
569 insert_shim (struct casereader *reader)
571 size_t value_cnt = casereader_get_value_cnt (reader);
572 casenumber case_cnt = casereader_get_case_cnt (reader);
573 struct shim *b = xmalloc (sizeof *b);
574 b->window = casewindow_create (value_cnt, get_workspace_cases (value_cnt));
575 b->subreader = casereader_create_random (value_cnt, case_cnt,
577 casereader_swap (reader, b->subreader);
578 taint_propagate (casewindow_get_taint (b->window),
579 casereader_get_taint (reader));
580 taint_propagate (casereader_get_taint (b->subreader),
581 casereader_get_taint (reader));
584 /* Ensures that B's window contains at least CASE_CNT cases.
585 Return true if successful, false upon reaching the end of B's
586 subreader or an I/O error. */
588 prime_buffer (struct shim *b, casenumber case_cnt)
590 while (casewindow_get_case_cnt (b->window) < case_cnt)
593 if (!casereader_read (b->subreader, &tmp))
595 casewindow_push_head (b->window, &tmp);
600 /* Reads the case at the given 0-based OFFSET from the front of
601 the window into C. Returns true if successful, false if
602 OFFSET is beyond the end of file or upon I/O error. */
604 shim_read (struct casereader *reader UNUSED, void *b_,
605 casenumber offset, struct ccase *c)
608 return (prime_buffer (b, offset + 1)
609 && casewindow_get_case (b->window, offset, c));
614 shim_destroy (struct casereader *reader UNUSED, void *b_)
617 casewindow_destroy (b->window);
618 casereader_destroy (b->subreader);
622 /* Discards CNT cases from the front of B's window. */
624 shim_advance (struct casereader *reader UNUSED, void *b_, casenumber case_cnt)
627 casewindow_pop_tail (b->window, case_cnt);
630 /* Class for the buffered reader. */
631 static struct casereader_random_class shim_class =