1 /* PSPP - a program for statistical analysis.
2 Copyright (C) 2007, 2009 Free Software Foundation, Inc.
4 This program is free software: you can redistribute it and/or modify
5 it under the terms of the GNU General Public License as published by
6 the Free Software Foundation, either version 3 of the License, or
7 (at your option) any later version.
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 GNU General Public License for more details.
14 You should have received a copy of the GNU General Public License
15 along with this program. If not, see <http://www.gnu.org/licenses/>. */
19 #include <data/casereader.h>
20 #include <data/casereader-provider.h>
24 #include <data/casewindow.h>
25 #include <data/casewriter.h>
26 #include <data/settings.h>
27 #include <libpspp/assertion.h>
28 #include <libpspp/heap.h>
29 #include <libpspp/taint.h>
/* Members of the casereader object.
   NOTE(review): the enclosing struct header and braces do not appear in
   this listing; confirm against the complete file. */
36 struct taint *taint; /* Taint object; casereader_error reports whether it is tainted. */
37 struct caseproto *proto; /* Format of contained cases; a reference is held
   (caseproto_ref on creation, caseproto_unref on destruction). */
38 casenumber case_cnt; /* Number of cases,
39 CASENUMBER_MAX if unknown. */
40 const struct casereader_class *class; /* Implementation's member functions
   (read, destroy, clone and peek are called in this file). */
41 void *aux; /* Auxiliary data for class, passed to each member function. */
/* Forward declaration: casereader_clone and casereader_peek call
   insert_shim before its definition appears later in the file. */
44 static void insert_shim (struct casereader *);
46 /* Reads and returns the next case from READER. The caller owns
47 the returned case and must call case_unref on it when its data
48 is no longer needed. Returns a null pointer if cases have
49 been exhausted or upon detection of an I/O error.
51 The case returned is effectively consumed: it can never be
52 read again through READER. If this is inconvenient, READER
53 may be cloned in advance with casereader_clone, or
54 casereader_peek may be used instead. */
/* NOTE(review): the return type, braces, the statement guarded by the
   CASENUMBER_MAX test, and the return statements of this function do not
   appear in this listing; confirm them against the complete file. */
56 casereader_read (struct casereader *reader)
/* A zero count means no cases remain, so the class is not consulted. */
58 if (reader->case_cnt != 0)
60 /* ->read may use casereader_swap to replace itself by
61 another reader and then delegate to that reader by
62 recursively calling casereader_read. Currently only
63 lazy_casereader does this and, with luck, nothing else
will ever need to.
66 To allow this to work, however, we must decrement
67 case_cnt before calling ->read. If we decremented
68 case_cnt after calling ->read, then this would actually
69 drop two cases from case_cnt instead of one, and we'd
70 lose the last case in the casereader. */
/* CASENUMBER_MAX marks an unknown count, which is exempt from the
   adjustment guarded here. */
72 if (reader->case_cnt != CASENUMBER_MAX)
74 c = reader->class->read (reader, reader->aux);
/* Sanity checks on the case just read: it must hold at least as many
   values as READER's prototype has widths, and its prototype apparently
   must agree with READER's over those widths.  n_widths is marked UNUSED,
   presumably because the assertions can be compiled out -- confirm. */
77 size_t n_widths UNUSED = caseproto_get_n_widths (reader->proto);
78 assert (case_get_value_cnt (c) >= n_widths);
79 expensive_assert (caseproto_equal (case_get_proto (c), 0,
80 reader->proto, 0, n_widths));
89 /* Destroys READER. Returns false if an I/O error was detected on READER, true otherwise. */
/* NOTE(review): the declaration of "ok", any guard on READER, the release
   of READER's own storage and the return statement do not appear in this
   listing; confirm against the complete file. */
92 casereader_destroy (struct casereader *reader)
/* Let the implementation release its auxiliary data first. */
97 reader->class->destroy (reader, reader->aux);
/* The result of destroying the taint is kept in "ok"; presumably that is
   the value returned -- confirm. */
98 ok = taint_destroy (reader->taint);
/* Drop the reference taken with caseproto_ref when the reader was
   created. */
99 caseproto_unref (reader->proto);
105 /* Returns a clone of READER. READER and its clone may be used
106 to read the same sequence of cases in the same order, barring I/O errors. */
/* NOTE(review): the return type, braces, the statement guarded by the null
   test and the return statement do not appear in this listing. */
109 casereader_clone (const struct casereader *reader_)
/* Drop the const qualifier: insert_shim below takes a non-const reader. */
111 struct casereader *reader = CONST_CAST (struct casereader *, reader_);
112 struct casereader *clone;
/* A null READER is handled specially; what is done for it is not visible
   here. */
113 if ( reader == NULL )
/* A class without a clone function is wrapped in the buffering shim
   before the clone member function is called. */
116 if (reader->class->clone == NULL)
117 insert_shim (reader);
118 clone = reader->class->clone (reader, reader->aux);
/* The class must hand back a distinct, non-null reader. */
119 assert (clone != NULL);
120 assert (clone != reader);
124 /* Makes a copy of ORIGINAL into *NEW1 (if NEW1 is non-null) and
125 *NEW2 (if NEW2 is non-null), then destroys ORIGINAL. */
127 casereader_split (struct casereader *original,
128 struct casereader **new1, struct casereader **new2)
/* Both outputs wanted: ORIGINAL is taken over as *NEW1, which is then
   cloned to produce *NEW2. */
130 if (new1 != NULL && new2 != NULL)
132 *new1 = casereader_rename (original);
133 *new2 = casereader_clone (*new1);
/* Only one output wanted: ORIGINAL is simply renamed into it. */
135 else if (new1 != NULL)
136 *new1 = casereader_rename (original);
137 else if (new2 != NULL)
138 *new2 = casereader_rename (original);
/* NOTE(review): presumably reached only when both NEW1 and NEW2 are null
   (the line introducing this branch is not visible in this listing) --
   confirm. */
140 casereader_destroy (original);
143 /* Returns a copy of READER, which is itself destroyed.
144 Useful for taking over ownership of a casereader, to enforce
145 preventing the original owner from accessing the casereader again. */
148 casereader_rename (struct casereader *reader)
/* Copies sizeof *reader bytes of *READER (xmemdup presumably allocates the
   copy), so the copy carries the same taint, proto, class and aux
   pointers.  NOTE(review): the disposal of READER and the return statement
   are not visible in this listing. */
150 struct casereader *new = xmemdup (reader, sizeof *reader);
155 /* Exchanges the casereaders referred to by A and B. */
157 casereader_swap (struct casereader *a, struct casereader *b)
/* Save a structure copy of *A in a temporary.  NOTE(review): the
   assignments that complete the exchange are not visible in this
   listing. */
161 struct casereader tmp = *a;
167 /* Reads and returns the (IDX + 1)'th case from READER. The
168 caller owns the returned case and must call case_unref on it
169 when it is no longer needed. Returns a null pointer if cases
170 have been exhausted or upon detection of an I/O error. */
/* NOTE(review): the return type, braces, the declaration of "c" and the
   return statements do not appear in this listing. */
172 casereader_peek (struct casereader *reader, casenumber idx)
/* Only consult the class when IDX lies within the cases believed to
   remain. */
174 if (idx < reader->case_cnt)
/* A class without a peek function is wrapped in the buffering shim
   first. */
177 if (reader->class->peek == NULL)
178 insert_shim (reader);
179 c = reader->class->peek (reader, reader->aux, idx);
/* Once an error has been recorded no further cases can be obtained, so the
   count is dropped to zero. */
182 else if (casereader_error (reader))
183 reader->case_cnt = 0;
/* Lower the count to IDX.  NOTE(review): presumably reached when no case
   could be produced at IDX -- the surrounding lines are not visible;
   confirm. */
185 if (reader->case_cnt > idx)
186 reader->case_cnt = idx;
190 /* Returns true if no cases remain to be read from READER, or if
191 an error has occurred on READER. (A return value of false
192 does *not* mean that the next call to casereader_peek or
193 casereader_read will return a case, because an error can occur in the meantime.) */
196 casereader_is_empty (struct casereader *reader)
/* A zero count answers the question without consulting the class. */
198 if (reader->case_cnt == 0)
/* Otherwise peek at the next case.  NOTE(review): the handling of the
   peeked case and the return statements are not visible in this
   listing. */
202 struct ccase *c = casereader_peek (reader, 0);
213 /* Returns true if an I/O error or another hard error has
214 occurred on READER, a clone of READER, or on some object on
215 which READER's data has a dependency, false otherwise. */
217 casereader_error (const struct casereader *reader)
/* The error state is whatever READER's taint object reports. */
219 return taint_is_tainted (reader->taint);
222 /* Marks READER as having encountered an error.
224 Ordinarily, this function should be called by the
225 implementation of a casereader, not by the casereader's
226 client. Instead, casereader clients should usually ensure
227 that a casereader's error state is correct by using
228 taint_propagate to propagate to the casereader's taint
229 structure, which may be obtained via casereader_get_taint. */
231 casereader_force_error (struct casereader *reader)
/* Taints READER's taint object, which is what casereader_error
   inspects. */
233 taint_set_taint (reader->taint);
236 /* Returns READER's associated taint object, for use with
237 taint_propagate and other taint functions. */
239 casereader_get_taint (const struct casereader *reader)
/* Hands back the stored pointer itself; no clone is made here. */
241 return reader->taint;
244 /* Returns the number of cases that will be read by successive
245 calls to casereader_read for READER, assuming that no errors
246 occur. Upon an error condition, the case count drops to 0, so
247 that no more cases can be obtained.
249 Not all casereaders can predict the number of cases that they
250 will produce without actually reading all of them. In that
251 case, this function returns CASENUMBER_MAX. To obtain the
252 actual number of cases in such a casereader, use
253 casereader_count_cases. */
255 casereader_get_case_cnt (struct casereader *reader)
/* Returns the stored count without reading anything; it may be
   CASENUMBER_MAX when the count is unknown. */
257 return reader->case_cnt;
/* Helper for casereader_count_cases and casereader_truncate: reads cases
   from a clone of READER, for at most MAX_CASES iterations.
   NOTE(review): the return type, the declaration of n_cases, the loop's
   exit and cleanup of each case, and the return statement do not appear in
   this listing. */
261 casereader_count_cases__ (struct casereader *reader, casenumber max_cases)
263 struct casereader *clone;
/* Reads go to the clone, so READER itself is not read from here. */
266 clone = casereader_clone (reader);
267 for (n_cases = 0; n_cases < max_cases; n_cases++)
269 struct ccase *c = casereader_read (clone);
274 casereader_destroy (clone);
279 /* Returns the number of cases that will be read by successive
280 calls to casereader_read for READER, assuming that no errors
281 occur. Upon an error condition, the case count drops to 0, so
282 that no more cases can be obtained.
284 For a casereader that cannot predict the number of cases it
285 will produce, this function actually reads (and discards) all
286 of the contents of a clone of READER. Thus, the return value
287 is always correct in the absence of I/O errors. */
289 casereader_count_cases (struct casereader *reader)
/* Unknown count: count all cases through a clone and store the result in
   case_cnt, so that later calls return it directly. */
291 if (reader->case_cnt == CASENUMBER_MAX)
292 reader->case_cnt = casereader_count_cases__ (reader, CASENUMBER_MAX);
293 return reader->case_cnt;
296 /* Truncates READER to at most N cases. */
298 casereader_truncate (struct casereader *reader, casenumber n)
300 /* This could be optimized, if it ever becomes too expensive, by adding a
301 "max_cases" member to struct casereader. We could also add a "truncate"
302 function to the casereader implementation, to allow the casereader to
303 throw away data that cannot ever be read. */
/* Unknown count: counting is limited to N iterations, which is enough to
   decide the truncated length. */
304 if (reader->case_cnt == CASENUMBER_MAX)
305 reader->case_cnt = casereader_count_cases__ (reader, n);
/* Truncation itself just lowers the stored count. */
306 if (reader->case_cnt > n)
307 reader->case_cnt = n;
310 /* Returns the prototype for the cases in READER. The caller
311 must not unref the returned prototype. */
312 const struct caseproto *
313 casereader_get_proto (const struct casereader *reader)
/* Returns the stored pointer; no additional reference is taken here. */
315 return reader->proto;
318 /* Copies all the cases in READER to WRITER, propagating errors appropriately, then destroys READER. */
/* NOTE(review): the return type, the declaration of "c" and any return
   statement do not appear in this listing. */
321 casereader_transfer (struct casereader *reader, struct casewriter *writer)
/* Link READER's taint to WRITER's (presumably in that direction --
   confirm), so that an error on READER is reflected on WRITER. */
325 taint_propagate (casereader_get_taint (reader),
326 casewriter_get_taint (writer));
/* Each case read is handed to WRITER and not unref'd here; presumably
   casewriter_write takes ownership -- confirm. */
327 while ((c = casereader_read (reader)) != NULL)
328 casewriter_write (writer, c);
329 casereader_destroy (reader);
332 /* Creates and returns a new casereader. This function is
333 intended for use by casereader implementations, not by casereader clients.
336 This function is most suited for creating a casereader for a
337 data source that is naturally sequential.
338 casereader_create_random may be more appropriate for a data
339 source that supports random access.
341 Ordinarily, specify a null pointer for TAINT, in which case
342 the new casereader will have a new, unique taint object. If
343 the new casereader should have a clone of an existing taint
344 object, specify that object as TAINT. (This is most commonly
345 useful in an implementation of the "clone" casereader_class
346 function, in which case the cloned casereader should have the
347 same taint object as the original casereader.)
349 PROTO must be the prototype for the cases that may be read
350 from the casereader. The caller retains its reference to PROTO.
353 CASE_CNT is an upper limit on the number of cases that
354 casereader_read will return from the casereader in successive
355 calls. Ordinarily, this is the actual number of cases in the
356 data source or CASENUMBER_MAX if the number of cases cannot be
357 predicted in advance.
359 CLASS and AUX are a set of casereader implementation-specific
360 member functions and auxiliary data to pass to those member
361 functions, respectively. */
/* NOTE(review): the return type, the CASE_CNT parameter line, the
   assignment of AUX and the return statement do not appear in this
   listing. */
363 casereader_create_sequential (const struct taint *taint,
364 const struct caseproto *proto,
366 const struct casereader_class *class, void *aux)
368 struct casereader *reader = xmalloc (sizeof *reader);
/* Clone the caller's taint when one is given; otherwise start with a fresh
   taint object. */
369 reader->taint = taint != NULL ? taint_clone (taint) : taint_create ();
/* The reader takes its own reference to PROTO; casereader_destroy releases
   it with caseproto_unref. */
370 reader->proto = caseproto_ref (proto);
371 reader->case_cnt = case_cnt;
372 reader->class = class;
377 /* If READER is a casereader of the given CLASS, returns its
378 associated auxiliary data; otherwise, returns a null pointer.
380 This function is intended for use from casereader
381 implementations, not by casereader users. Even within
382 casereader implementations, its usefulness is quite limited,
383 for at least two reasons. First, every casereader member
384 function already receives a pointer to the casereader's
385 auxiliary data. Second, a casereader's class can change
386 (through a call to casereader_swap) and this is in practice
387 quite common (e.g. any call to casereader_clone on a
388 casereader that does not directly support clone will cause the
389 casereader to be replaced by a shim casereader). */
391 casereader_dynamic_cast (struct casereader *reader,
392 const struct casereader_class *class)
/* Classes are compared by pointer identity, so only the exact class object
   matches. */
394 return reader->class == class ? reader->aux : NULL;
397 /* Random-access casereader implementation.
399 This is a set of wrappers around casereader_create_sequential
400 and struct casereader_class to make it easy to create
401 efficient casereaders for data sources that natively support random access. */
404 /* One clone of a random reader. */
/* Members of struct random_reader (the type named in
   random_reader_from_heap_node).  NOTE(review): the struct header and
   braces do not appear in this listing. */
407 struct random_reader_shared *shared; /* Data shared among clones. */
408 struct heap_node heap_node; /* Node in shared data's heap of readers. */
409 casenumber offset; /* Number of cases already read; also the key that
   compare_random_readers_by_offset orders the heap by. */
412 /* Returns the random_reader in which the given heap_node is embedded. */
414 static struct random_reader *
415 random_reader_from_heap_node (const struct heap_node *node)
/* heap_data is given the node, the containing type and the member name;
   presumably it converts the member pointer back to its container --
   confirm in libpspp/heap.h. */
417 return heap_data (node, struct random_reader, heap_node);
420 /* Data shared among clones of a random reader. */
/* NOTE(review): the braces and the "aux" member (used below as
   shared->aux) do not appear in this listing. */
421 struct random_reader_shared
423 struct heap *readers; /* Heap of struct random_readers. */
424 casenumber min_offset; /* Smallest offset of any random_reader. */
425 const struct casereader_random_class *class; /* Implementation's read,
   destroy and advance functions, as called in this file. */
/* Forward declaration: casereader_create_random and random_reader_clone
   take this table's address, and it is defined after the member functions
   it lists. */
429 static const struct casereader_class random_reader_casereader_class;
431 /* Creates and returns a new random_reader with the given SHARED
432 data and OFFSET. Inserts the new random reader into the shared heap. */
434 static struct random_reader *
435 make_random_reader (struct random_reader_shared *shared, casenumber offset)
437 struct random_reader *br = xmalloc (sizeof *br);
/* Register the new reader in the heap kept in SHARED.  NOTE(review): the
   assignments of br->shared and br->offset and the return statement are
   not visible in this listing. */
440 heap_insert (shared->readers, &br->heap_node);
444 /* Compares random_readers A and B by offset and returns a
445 strcmp()-like result. */
447 compare_random_readers_by_offset (const struct heap_node *a_,
448 const struct heap_node *b_,
449 const void *aux UNUSED)
451 const struct random_reader *a = random_reader_from_heap_node (a_);
452 const struct random_reader *b = random_reader_from_heap_node (b_);
/* Yields -1, 0 or 1 using comparisons only, so no subtraction of offsets
   is involved. */
453 return a->offset < b->offset ? -1 : a->offset > b->offset;
456 /* Creates and returns a new casereader. This function is
457 intended for use by casereader implementations, not by casereader clients.
460 This function is most suited for creating a casereader for a
461 data source that supports random access.
462 casereader_create_sequential is more appropriate for a data
463 source that is naturally sequential.
465 PROTO must be the prototype for the cases that may be read
466 from the casereader. The caller retains its reference to PROTO.
469 CASE_CNT is an upper limit on the number of cases that
470 casereader_read will return from the casereader in successive
471 calls. Ordinarily, this is the actual number of cases in the
472 data source or CASENUMBER_MAX if the number of cases cannot be
473 predicted in advance.
475 CLASS and AUX are a set of casereader implementation-specific
476 member functions and auxiliary data to pass to those member
477 functions, respectively. */
/* NOTE(review): the return type, the AUX parameter line and the assignment
   of shared->aux do not appear in this listing. */
479 casereader_create_random (const struct caseproto *proto, casenumber case_cnt,
480 const struct casereader_random_class *class,
483 struct random_reader_shared *shared = xmalloc (sizeof *shared);
/* The heap orders readers by offset, so its minimum is the reader that has
   read the fewest cases. */
484 shared->readers = heap_create (compare_random_readers_by_offset, NULL);
485 shared->class = class;
487 shared->min_offset = 0;
/* The first reader starts at offset 0 and gets a fresh taint (null TAINT
   argument). */
488 return casereader_create_sequential (NULL, proto, case_cnt,
489 &random_reader_casereader_class,
490 make_random_reader (shared, 0));
493 /* Reassesses the min_offset in SHARED based on the minimum
494 offset in the heap. */
/* NOTE(review): the declarations of "old" and "new" and any condition
   guarding the final two statements do not appear in this listing. */
496 advance_random_reader (struct casereader *reader,
497 struct random_reader_shared *shared)
501 old = shared->min_offset;
/* The heap's minimum node belongs to the reader with the smallest
   offset. */
502 new = random_reader_from_heap_node (heap_minimum (shared->readers))->offset;
/* Record the new minimum and tell the implementation by how many cases it
   moved. */
506 shared->min_offset = new;
507 shared->class->advance (reader, shared->aux, new - old);
511 /* struct casereader_class "read" function for random reader. */
512 static struct ccase *
513 random_reader_read (struct casereader *reader, void *br_)
515 struct random_reader *br = br_;
516 struct random_reader_shared *shared = br->shared;
/* The implementation is addressed relative to min_offset, the smallest
   offset among the clones. */
517 struct ccase *c = shared->class->read (reader, shared->aux,
518 br->offset - shared->min_offset);
/* Re-sort this reader within the heap and re-evaluate the minimum.
   NOTE(review): presumably br->offset is advanced on a line not visible
   in this listing, as is the return of C -- confirm. */
522 heap_changed (shared->readers, &br->heap_node);
523 advance_random_reader (reader, shared);
528 /* struct casereader_class "destroy" function for random reader. */
/* NOTE(review): the return type, braces, the "else" separating the two
   outcomes and the freeing of BR/SHARED do not appear in this listing. */
531 random_reader_destroy (struct casereader *reader, void *br_)
533 struct random_reader *br = br_;
534 struct random_reader_shared *shared = br->shared;
/* Unregister this clone from the shared heap. */
536 heap_delete (shared->readers, &br->heap_node);
/* No clones left: tear down the heap and let the implementation destroy
   its data. */
537 if (heap_is_empty (shared->readers))
539 heap_destroy (shared->readers);
540 shared->class->destroy (reader, shared->aux);
/* Presumably the alternative branch: other clones remain, so the minimum
   offset is re-evaluated -- confirm. */
544 advance_random_reader (reader, shared);
549 /* struct casereader_class "clone" function for random reader. */
550 static struct casereader *
551 random_reader_clone (struct casereader *reader, void *br_)
553 struct random_reader *br = br_;
554 struct random_reader_shared *shared = br->shared;
/* The clone is built from READER's taint and current case count and joins
   the same SHARED data.  NOTE(review): the prototype argument and the
   second argument of make_random_reader are not visible in this
   listing. */
555 return casereader_create_sequential (casereader_get_taint (reader),
557 casereader_get_case_cnt (reader),
558 &random_reader_casereader_class,
559 make_random_reader (shared,
563 /* struct casereader_class "peek" function for random reader. */
564 static struct ccase *
565 random_reader_peek (struct casereader *reader, void *br_, casenumber idx)
567 struct random_reader *br = br_;
568 struct random_reader_shared *shared = br->shared;
/* Same addressing as random_reader_read plus IDX; neither br->offset nor
   the heap is touched in the lines shown here. */
570 return shared->class->read (reader, shared->aux,
571 br->offset - shared->min_offset + idx);
574 /* Casereader class for random reader. */
/* NOTE(review): only the destroy entry of this table appears in this
   listing; the braces and the remaining entries are not visible. */
575 static const struct casereader_class random_reader_casereader_class =
578 random_reader_destroy,
583 /* Buffering shim for implementing clone and peek operations.
585 The "clone" and "peek" operations aren't implemented by all
586 types of casereaders, but we have to expose a uniform
587 interface anyhow. We do this by interposing a buffering
588 casereader on top of the existing casereader on the first call
589 to "clone" or "peek". The buffering casereader maintains a
590 window of cases that spans the positions of the original
591 casereader and all of its clones (the "clone set"), from the
592 position of the casereader that has read the fewest cases to
593 the position of the casereader that has read the most.
595 Thus, if all of the casereaders in the clone set are at
596 approximately the same position, only a few cases are buffered
597 and there is little inefficiency. If, on the other hand, one
598 casereader is not used to read any cases at all, but another
599 one is used to read all of the cases, the entire contents of
600 the casereader is copied into the buffer. This still might
601 not be so inefficient, given that case data in memory is
602 shared across multiple identical copies, but in the worst case
603 the window implementation will write cases to disk instead of
604 maintaining them in-memory. */
606 /* A buffering shim for a non-clonable or non-peekable casereader. */
/* Members of struct shim (the type allocated in insert_shim).
   NOTE(review): the struct header and braces do not appear in this
   listing. */
610 struct casewindow *window; /* Window of buffered cases. */
611 struct casereader *subreader; /* Subordinate casereader, from which
   prime_buffer reads cases into the window. */
/* Forward declaration: insert_shim takes this table's address before the
   table is defined at the end of the file. */
614 static const struct casereader_random_class shim_class;
616 /* Interposes a buffering shim atop READER. */
618 insert_shim (struct casereader *reader)
620 const struct caseproto *proto = casereader_get_proto (reader);
621 casenumber case_cnt = casereader_get_case_cnt (reader);
622 struct shim *b = xmalloc (sizeof *b);
/* The window's size limit comes from the workspace setting for this
   prototype. */
623 b->window = casewindow_create (proto, settings_get_workspace_cases (proto));
/* Build a random-access reader backed by the shim, then exchange it with
   READER: casereader_swap exchanges the two readers, so afterwards READER
   is the shim-backed reader and b->subreader is the original
   implementation. */
624 b->subreader = casereader_create_random (proto, case_cnt, &shim_class, b);
625 casereader_swap (reader, b->subreader);
/* Link the window's and the subreader's taints to READER's (presumably so
   that errors on either are reflected on READER -- confirm the argument
   order of taint_propagate). */
626 taint_propagate (casewindow_get_taint (b->window),
627 casereader_get_taint (reader));
628 taint_propagate (casereader_get_taint (b->subreader),
629 casereader_get_taint (reader));
632 /* Ensures that B's window contains at least CASE_CNT cases.
633 Returns true if successful, false upon reaching the end of B's
634 subreader or an I/O error. */
/* NOTE(review): the return type, braces, the handling of a null read and
   the return statements do not appear in this listing. */
636 prime_buffer (struct shim *b, casenumber case_cnt)
/* Pull cases from the subreader until the window holds CASE_CNT of
   them. */
638 while (casewindow_get_case_cnt (b->window) < case_cnt)
640 struct ccase *tmp = casereader_read (b->subreader);
643 casewindow_push_head (b->window, tmp);
648 /* Reads the case at the given 0-based OFFSET from the front of
649 the window. Returns the case if successful, or a null
650 pointer if OFFSET is beyond the end of file or upon I/O error.
651 The caller must call case_unref() on the returned case when it
652 is no longer needed. */
/* NOTE(review): the OFFSET parameter line, the declaration of "b" and the
   statement taken when priming fails do not appear in this listing. */
653 static struct ccase *
654 shim_read (struct casereader *reader UNUSED, void *b_,
/* OFFSET is 0-based, so the window must hold OFFSET + 1 cases before the
   requested one can be fetched. */
658 if (!prime_buffer (b, offset + 1))
660 return casewindow_get_case (b->window, offset);
/* Destroy function for the shim: releases the window and the subordinate
   casereader.  NOTE(review): the declaration of "b" and the freeing of the
   shim itself are not visible in this listing. */
665 shim_destroy (struct casereader *reader UNUSED, void *b_)
668 casewindow_destroy (b->window);
669 casereader_destroy (b->subreader);
673 /* Discards CASE_CNT cases from the front of B's window. */
675 shim_advance (struct casereader *reader UNUSED, void *b_, casenumber case_cnt)
/* Cases are added with casewindow_push_head (see prime_buffer) and removed
   here from the tail.  NOTE(review): the declaration of "b" is not visible
   in this listing. */
678 casewindow_pop_tail (b->window, case_cnt);
681 /* Class for the buffered reader. */
682 static const struct casereader_random_class shim_class =