1 /* PSPP - a program for statistical analysis.
2 Copyright (C) 2007 Free Software Foundation, Inc.
4 This program is free software: you can redistribute it and/or modify
5 it under the terms of the GNU General Public License as published by
6 the Free Software Foundation, either version 3 of the License, or
7 (at your option) any later version.
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 GNU General Public License for more details.
14 You should have received a copy of the GNU General Public License
15 along with this program. If not, see <http://www.gnu.org/licenses/>. */
19 #include <data/casereader.h>
20 #include <data/casereader-provider.h>
24 #include <data/casewindow.h>
25 #include <data/casewriter.h>
26 #include <data/settings.h>
27 #include <libpspp/assertion.h>
28 #include <libpspp/heap.h>
29 #include <libpspp/taint.h>
36 struct taint *taint; /* Corrupted? */
37 size_t value_cnt; /* Values per case. */
38 casenumber case_cnt; /* Number of cases,
39 CASENUMBER_MAX if unknown. */
40 const struct casereader_class *class; /* Class. */
41 void *aux; /* Auxiliary data for class. */
44 static void insert_shim (struct casereader *);
46 /* Creates a new case in C and reads the next case from READER
47 into it. The caller owns C and must destroy C when its data
48 is no longer needed. Returns true if successful, false when
49 cases have been exhausted or upon detection of an I/O error.
50 In the latter case, C is set to the null case.
52 The case returned is effectively consumed: it can never be
53 read again through READER. If this is inconvenient, READER
54 may be cloned in advance with casereader_clone, or
55 casereader_peek may be used instead. */
57 casereader_read (struct casereader *reader, struct ccase *c)
59 if (reader->case_cnt != 0)
61 /* ->read may use casereader_swap to replace itself by
62 another reader and then delegate to that reader by
63 recursively calling casereader_read. Currently only
64 lazy_casereader does this and, with luck, nothing else
67 To allow this to work, however, we must decrement
68 case_cnt before calling ->read. If we decremented
69 case_cnt after calling ->read, then this would actually
70 drop two cases from case_cnt instead of one, and we'd
71 lose the last case in the casereader. */
72 if (reader->case_cnt != CASENUMBER_MAX)
74 if (reader->class->read (reader, reader->aux, c))
76 assert (case_get_value_cnt (c) >= reader->value_cnt);
86 Returns false if an I/O error was detected on READER, true
89 casereader_destroy (struct casereader *reader)
94 reader->class->destroy (reader, reader->aux);
95 ok = taint_destroy (reader->taint);
101 /* Returns a clone of READER. READER and its clone may be used
102 to read the same sequence of cases in the same order, barring
105 casereader_clone (const struct casereader *reader_)
107 struct casereader *reader = (struct casereader *) reader_;
108 struct casereader *clone;
109 if ( reader == NULL )
112 if (reader->class->clone == NULL)
113 insert_shim (reader);
114 clone = reader->class->clone (reader, reader->aux);
115 assert (clone != NULL);
116 assert (clone != reader);
120 /* Makes a copy of ORIGINAL into *NEW1 (if NEW1 is non-null) and
121 *NEW2 (if NEW2 is non-null), then destroys ORIGINAL. */
123 casereader_split (struct casereader *original,
124 struct casereader **new1, struct casereader **new2)
126 if (new1 != NULL && new2 != NULL)
128 *new1 = casereader_rename (original);
129 *new2 = casereader_clone (*new1);
131 else if (new1 != NULL)
132 *new1 = casereader_rename (original);
133 else if (new2 != NULL)
134 *new2 = casereader_rename (original);
136 casereader_destroy (original);
139 /* Returns a copy of READER, which is itself destroyed.
140 Useful for taking over ownership of a casereader, to enforce
141 preventing the original owner from accessing the casereader
144 casereader_rename (struct casereader *reader)
146 struct casereader *new = xmemdup (reader, sizeof *reader);
151 /* Exchanges the casereaders referred to by A and B. */
153 casereader_swap (struct casereader *a, struct casereader *b)
157 struct casereader tmp = *a;
163 /* Creates a new case in C and reads the (IDX + 1)'th case from
164 READER into it. The caller owns C and must destroy C when its
165 data is no longer needed. Returns true if successful, false
166 when cases have been exhausted or upon detection of an I/O
167 error. In the latter case, C is set to the null case. */
169 casereader_peek (struct casereader *reader, casenumber idx, struct ccase *c)
171 if (idx < reader->case_cnt)
173 if (reader->class->peek == NULL)
174 insert_shim (reader);
175 if (reader->class->peek (reader, reader->aux, idx, c))
177 else if (casereader_error (reader))
178 reader->case_cnt = 0;
180 if (reader->case_cnt > idx)
181 reader->case_cnt = idx;
186 /* Returns true if no cases remain to be read from READER, or if
187 an error has occurred on READER. (A return value of false
188 does *not* mean that the next call to casereader_peek or
189 casereader_read will return true, because an error can occur
192 casereader_is_empty (struct casereader *reader)
195 if (reader->case_cnt == 0 || !casereader_peek (reader, 0, &c))
204 /* Returns true if an I/O error or another hard error has
205 occurred on READER, a clone of READER, or on some object on
206 which READER's data has a dependency, false otherwise. */
208 casereader_error (const struct casereader *reader)
210 return taint_is_tainted (reader->taint);
213 /* Marks READER as having encountered an error.
215 Ordinarily, this function should be called by the
216 implementation of a casereader, not by the casereader's
217 client. Instead, casereader clients should usually ensure
218 that a casereader's error state is correct by using
219 taint_propagate to propagate to the casereader's taint
220 structure, which may be obtained via casereader_get_taint. */
222 casereader_force_error (struct casereader *reader)
224 taint_set_taint (reader->taint);
227 /* Returns READER's associated taint object, for use with
228 taint_propagate and other taint functions. */
230 casereader_get_taint (const struct casereader *reader)
232 return reader->taint;
235 /* Returns the number of cases that will be read by successive
236 calls to casereader_read for READER, assuming that no errors
237 occur. Upon an error condition, the case count drops to 0, so
238 that no more cases can be obtained.
240 Not all casereaders can predict the number of cases that they
241 will produce without actually reading all of them. In that
242 case, this function returns CASENUMBER_MAX. To obtain the
243 actual number of cases in such a casereader, use
244 casereader_count_cases. */
246 casereader_get_case_cnt (struct casereader *reader)
248 return reader->case_cnt;
251 /* Returns the number of cases that will be read by successive
252 calls to casereader_read for READER, assuming that no errors
253 occur. Upon an error condition, the case count drops to 0, so
254 that no more cases can be obtained.
256 For a casereader that cannot predict the number of cases it
257 will produce, this function actually reads (and discards) all
258 of the contents of a clone of READER. Thus, the return value
259 is always correct in the absence of I/O errors. */
261 casereader_count_cases (struct casereader *reader)
263 if (reader->case_cnt == CASENUMBER_MAX)
265 casenumber n_cases = 0;
268 struct casereader *clone = casereader_clone (reader);
270 for (; casereader_read (clone, &c); case_destroy (&c))
273 casereader_destroy (clone);
274 reader->case_cnt = n_cases;
277 return reader->case_cnt;
280 /* Returns the number of struct values in each case in READER. */
282 casereader_get_value_cnt (struct casereader *reader)
284 return reader->value_cnt;
287 /* Copies all the cases in READER to WRITER, propagating errors
290 casereader_transfer (struct casereader *reader, struct casewriter *writer)
294 taint_propagate (casereader_get_taint (reader),
295 casewriter_get_taint (writer));
296 while (casereader_read (reader, &c))
297 casewriter_write (writer, &c);
298 casereader_destroy (reader);
301 /* Creates and returns a new casereader. This function is
302 intended for use by casereader implementations, not by
305 This function is most suited for creating a casereader for a
306 data source that is naturally sequential.
307 casereader_create_random may be more appropriate for a data
308 source that supports random access.
310 Ordinarily, specify a null pointer for TAINT, in which case
311 the new casereader will have a new, unique taint object. If
312 the new casereader should have a clone of an existing taint
313 object, specify that object as TAINT. (This is most commonly
314 useful in an implementation of the "clone" casereader_class
315 function, in which case the cloned casereader should have the
316 same taint object as the original casereader.)
318 VALUE_CNT must be the number of struct values per case read
321 CASE_CNT is an upper limit on the number of cases that
322 casereader_read will return from the casereader in successive
323 calls. Ordinarily, this is the actual number of cases in the
324 data source or CASENUMBER_MAX if the number of cases cannot be
325 predicted in advance.
327 CLASS and AUX are a set of casereader implementation-specific
328 member functions and auxiliary data to pass to those member
329 functions, respectively. */
331 casereader_create_sequential (const struct taint *taint,
332 size_t value_cnt, casenumber case_cnt,
333 const struct casereader_class *class, void *aux)
335 struct casereader *reader = xmalloc (sizeof *reader);
336 reader->taint = taint != NULL ? taint_clone (taint) : taint_create ();
337 reader->value_cnt = value_cnt;
338 reader->case_cnt = case_cnt;
339 reader->class = class;
344 /* If READER is a casereader of the given CLASS, returns its
345 associated auxiliary data; otherwise, returns a null pointer.
347 This function is intended for use from casereader
348 implementations, not by casereader users. Even within
349 casereader implementations, its usefulness is quite limited,
350 for at least two reasons. First, every casereader member
351 function already receives a pointer to the casereader's
352 auxiliary data. Second, a casereader's class can change
353 (through a call to casereader_swap) and this is in practice
354 quite common (e.g. any call to casereader_clone on a
355 casereader that does not directly support clone will cause the
356 casereader to be replaced by a shim casereader). */
358 casereader_dynamic_cast (struct casereader *reader,
359 const struct casereader_class *class)
361 return reader->class == class ? reader->aux : NULL;
364 /* Random-access casereader implementation.
366 This is a set of wrappers around casereader_create_sequential
367 and struct casereader_class to make it easy to create
368 efficient casereaders for data sources that natively support
371 /* One clone of a random reader. */
374 struct random_reader_shared *shared; /* Data shared among clones. */
375 struct heap_node heap_node; /* Node in shared data's heap of readers. */
376 casenumber offset; /* Number of cases already read. */
379 /* Returns the random_reader in which the given heap_node is
381 static struct random_reader *
382 random_reader_from_heap_node (const struct heap_node *node)
384 return heap_data (node, struct random_reader, heap_node);
387 /* Data shared among clones of a random reader. */
388 struct random_reader_shared
390 struct heap *readers; /* Heap of struct random_readers. */
391 casenumber min_offset; /* Smallest offset of any random_reader. */
392 const struct casereader_random_class *class;
396 static const struct casereader_class random_reader_casereader_class;
398 /* Creates and returns a new random_reader with the given SHARED
399 data and OFFSET. Inserts the new random reader into the
401 static struct random_reader *
402 make_random_reader (struct random_reader_shared *shared, casenumber offset)
404 struct random_reader *br = xmalloc (sizeof *br);
407 heap_insert (shared->readers, &br->heap_node);
411 /* Compares random_readers A and B by offset and returns a
412 strcmp()-like result. */
414 compare_random_readers_by_offset (const struct heap_node *a_,
415 const struct heap_node *b_,
416 const void *aux UNUSED)
418 const struct random_reader *a = random_reader_from_heap_node (a_);
419 const struct random_reader *b = random_reader_from_heap_node (b_);
420 return a->offset < b->offset ? -1 : a->offset > b->offset;
423 /* Creates and returns a new casereader. This function is
424 intended for use by casereader implementations, not by
427 This function is most suited for creating a casereader for a
428 data source that supports random access.
429 casereader_create_sequential is more appropriate for a data
430 source that is naturally sequential.
432 VALUE_CNT must be the number of struct values per case read
435 CASE_CNT is an upper limit on the number of cases that
436 casereader_read will return from the casereader in successive
437 calls. Ordinarily, this is the actual number of cases in the
438 data source or CASENUMBER_MAX if the number of cases cannot be
439 predicted in advance.
441 CLASS and AUX are a set of casereader implementation-specific
442 member functions and auxiliary data to pass to those member
443 functions, respectively. */
445 casereader_create_random (size_t value_cnt, casenumber case_cnt,
446 const struct casereader_random_class *class,
449 struct random_reader_shared *shared = xmalloc (sizeof *shared);
450 shared->readers = heap_create (compare_random_readers_by_offset, NULL);
451 shared->class = class;
453 shared->min_offset = 0;
454 return casereader_create_sequential (NULL, value_cnt, case_cnt,
455 &random_reader_casereader_class,
456 make_random_reader (shared, 0));
459 /* Reassesses the min_offset in SHARED based on the minimum
460 offset in the heap. */
462 advance_random_reader (struct casereader *reader,
463 struct random_reader_shared *shared)
467 old = shared->min_offset;
468 new = random_reader_from_heap_node (heap_minimum (shared->readers))->offset;
472 shared->min_offset = new;
473 shared->class->advance (reader, shared->aux, new - old);
477 /* struct casereader_class "read" function for random reader. */
479 random_reader_read (struct casereader *reader, void *br_, struct ccase *c)
481 struct random_reader *br = br_;
482 struct random_reader_shared *shared = br->shared;
484 if (shared->class->read (reader, shared->aux,
485 br->offset - shared->min_offset, c))
488 heap_changed (shared->readers, &br->heap_node);
489 advance_random_reader (reader, shared);
496 /* struct casereader_class "destroy" function for random
499 random_reader_destroy (struct casereader *reader, void *br_)
501 struct random_reader *br = br_;
502 struct random_reader_shared *shared = br->shared;
504 heap_delete (shared->readers, &br->heap_node);
505 if (heap_is_empty (shared->readers))
507 heap_destroy (shared->readers);
508 shared->class->destroy (reader, shared->aux);
512 advance_random_reader (reader, shared);
517 /* struct casereader_class "clone" function for random reader. */
518 static struct casereader *
519 random_reader_clone (struct casereader *reader, void *br_)
521 struct random_reader *br = br_;
522 struct random_reader_shared *shared = br->shared;
523 return casereader_create_sequential (casereader_get_taint (reader),
524 casereader_get_value_cnt (reader),
525 casereader_get_case_cnt (reader),
526 &random_reader_casereader_class,
527 make_random_reader (shared,
531 /* struct casereader_class "peek" function for random reader. */
533 random_reader_peek (struct casereader *reader, void *br_,
534 casenumber idx, struct ccase *c)
536 struct random_reader *br = br_;
537 struct random_reader_shared *shared = br->shared;
539 return shared->class->read (reader, shared->aux,
540 br->offset - shared->min_offset + idx, c);
543 /* Casereader class for random reader. */
544 static const struct casereader_class random_reader_casereader_class =
547 random_reader_destroy,
552 /* Buffering shim for implementing clone and peek operations.
554 The "clone" and "peek" operations aren't implemented by all
555 types of casereaders, but we have to expose a uniform
556 interface anyhow. We do this by interposing a buffering
557 casereader on top of the existing casereader on the first call
558 to "clone" or "peek". The buffering casereader maintains a
559 window of cases that spans the positions of the original
560 casereader and all of its clones (the "clone set"), from the
561 position of the casereader that has read the fewest cases to
562 the position of the casereader that has read the most.
564 Thus, if all of the casereaders in the clone set are at
565 approximately the same position, only a few cases are buffered
566 and there is little inefficiency. If, on the other hand, one
567 casereader is not used to read any cases at all, but another
568 one is used to read all of the cases, the entire contents of
569 the casereader are copied into the buffer. This still might
570 not be so inefficient, given that case data in memory is
571 shared across multiple identical copies, but in the worst case
572 the window implementation will write cases to disk instead of
573 maintaining them in-memory. */
575 /* A buffering shim for a non-clonable or non-peekable
579 struct casewindow *window; /* Window of buffered cases. */
580 struct casereader *subreader; /* Subordinate casereader. */
583 static const struct casereader_random_class shim_class;
585 /* Interposes a buffering shim atop READER. */
587 insert_shim (struct casereader *reader)
589 size_t value_cnt = casereader_get_value_cnt (reader);
590 casenumber case_cnt = casereader_get_case_cnt (reader);
591 struct shim *b = xmalloc (sizeof *b);
592 b->window = casewindow_create (value_cnt, settings_get_workspace_cases (value_cnt));
593 b->subreader = casereader_create_random (value_cnt, case_cnt,
595 casereader_swap (reader, b->subreader);
596 taint_propagate (casewindow_get_taint (b->window),
597 casereader_get_taint (reader));
598 taint_propagate (casereader_get_taint (b->subreader),
599 casereader_get_taint (reader));
602 /* Ensures that B's window contains at least CASE_CNT cases.
603 Returns true if successful, false upon reaching the end of B's
604 subreader or an I/O error. */
606 prime_buffer (struct shim *b, casenumber case_cnt)
608 while (casewindow_get_case_cnt (b->window) < case_cnt)
611 if (!casereader_read (b->subreader, &tmp))
613 casewindow_push_head (b->window, &tmp);
618 /* Reads the case at the given 0-based OFFSET from the front of
619 the window into C. Returns true if successful, false if
620 OFFSET is beyond the end of file or upon I/O error. */
622 shim_read (struct casereader *reader UNUSED, void *b_,
623 casenumber offset, struct ccase *c)
626 return (prime_buffer (b, offset + 1)
627 && casewindow_get_case (b->window, offset, c));
632 shim_destroy (struct casereader *reader UNUSED, void *b_)
635 casewindow_destroy (b->window);
636 casereader_destroy (b->subreader);
640 /* Discards CASE_CNT cases from the front of B's window. */
642 shim_advance (struct casereader *reader UNUSED, void *b_, casenumber case_cnt)
645 casewindow_pop_tail (b->window, case_cnt);
648 /* Class for the buffered reader. */
649 static const struct casereader_random_class shim_class =