1 /* PSPP - a program for statistical analysis.
2 Copyright (C) 2007, 2009 Free Software Foundation, Inc.
4 This program is free software: you can redistribute it and/or modify
5 it under the terms of the GNU General Public License as published by
6 the Free Software Foundation, either version 3 of the License, or
7 (at your option) any later version.
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 GNU General Public License for more details.
14 You should have received a copy of the GNU General Public License
15 along with this program. If not, see <http://www.gnu.org/licenses/>. */
19 #include <data/casereader.h>
20 #include <data/casereader-provider.h>
24 #include <data/casewindow.h>
25 #include <data/casewriter.h>
26 #include <data/settings.h>
27 #include <libpspp/assertion.h>
28 #include <libpspp/heap.h>
29 #include <libpspp/taint.h>
36 struct taint *taint; /* Corrupted? */
37 struct caseproto *proto; /* Format of contained cases. */
38 casenumber case_cnt; /* Number of cases,
39 CASENUMBER_MAX if unknown. */
40 const struct casereader_class *class; /* Class. */
41 void *aux; /* Auxiliary data for class. */
44 static void insert_shim (struct casereader *);
46 /* Reads and returns the next case from READER. The caller owns
47 the returned case and must call case_unref on it when its data
48 is no longer needed. Returns a null pointer if cases have
49 been exhausted or upon detection of an I/O error.
51 The case returned is effectively consumed: it can never be
52 read again through READER. If this is inconvenient, READER
53 may be cloned in advance with casereader_clone, or
54 casereader_peek may be used instead. */
56 casereader_read (struct casereader *reader)
58 if (reader->case_cnt != 0)
60 /* ->read may use casereader_swap to replace itself by
61 another reader and then delegate to that reader by
62 recursively calling casereader_read. Currently only
63 lazy_casereader does this and, with luck, nothing else
66 To allow this to work, however, we must decrement
67 case_cnt before calling ->read. If we decremented
68 case_cnt after calling ->read, then this would actually
69 drop two cases from case_cnt instead of one, and we'd
70 lose the last case in the casereader. */
72 if (reader->case_cnt != CASENUMBER_MAX)
74 c = reader->class->read (reader, reader->aux);
77 size_t n_widths UNUSED = caseproto_get_n_widths (reader->proto);
78 assert (case_get_value_cnt (c) >= n_widths);
79 expensive_assert (caseproto_equal (case_get_proto (c), 0,
80 reader->proto, 0, n_widths));
89 Returns false if an I/O error was detected on READER, true
92 casereader_destroy (struct casereader *reader)
97 reader->class->destroy (reader, reader->aux);
98 ok = taint_destroy (reader->taint);
99 caseproto_unref (reader->proto);
105 /* Returns a clone of READER. READER and its clone may be used
106 to read the same sequence of cases in the same order, barring
109 casereader_clone (const struct casereader *reader_)
111 struct casereader *reader = (struct casereader *) reader_;
112 struct casereader *clone;
113 if ( reader == NULL )
116 if (reader->class->clone == NULL)
117 insert_shim (reader);
118 clone = reader->class->clone (reader, reader->aux);
119 assert (clone != NULL);
120 assert (clone != reader);
124 /* Makes a copy of ORIGINAL into *NEW1 (if NEW1 is non-null) and
125 *NEW2 (if NEW2 is non-null), then destroys ORIGINAL. */
127 casereader_split (struct casereader *original,
128 struct casereader **new1, struct casereader **new2)
130 if (new1 != NULL && new2 != NULL)
132 *new1 = casereader_rename (original);
133 *new2 = casereader_clone (*new1);
135 else if (new1 != NULL)
136 *new1 = casereader_rename (original);
137 else if (new2 != NULL)
138 *new2 = casereader_rename (original);
140 casereader_destroy (original);
143 /* Returns a copy of READER, which is itself destroyed.
144 Useful for taking over ownership of a casereader, to enforce
145 preventing the original owner from accessing the casereader
148 casereader_rename (struct casereader *reader)
150 struct casereader *new = xmemdup (reader, sizeof *reader);
155 /* Exchanges the casereaders referred to by A and B. */
157 casereader_swap (struct casereader *a, struct casereader *b)
161 struct casereader tmp = *a;
167 /* Reads and returns the (IDX + 1)'th case from READER. The
168 caller owns the returned case and must call case_unref on it
169 when it is no longer needed. Returns a null pointer if cases
170 have been exhausted or upon detection of an I/O error. */
172 casereader_peek (struct casereader *reader, casenumber idx)
174 if (idx < reader->case_cnt)
177 if (reader->class->peek == NULL)
178 insert_shim (reader);
179 c = reader->class->peek (reader, reader->aux, idx);
182 else if (casereader_error (reader))
183 reader->case_cnt = 0;
185 if (reader->case_cnt > idx)
186 reader->case_cnt = idx;
190 /* Returns true if no cases remain to be read from READER, or if
191 an error has occurred on READER. (A return value of false
192 does *not* mean that the next call to casereader_peek or
193 casereader_read will return true, because an error can occur
196 casereader_is_empty (struct casereader *reader)
198 if (reader->case_cnt == 0)
202 struct ccase *c = casereader_peek (reader, 0);
213 /* Returns true if an I/O error or another hard error has
214 occurred on READER, a clone of READER, or on some object on
215 which READER's data has a dependency, false otherwise. */
217 casereader_error (const struct casereader *reader)
219 return taint_is_tainted (reader->taint);
222 /* Marks READER as having encountered an error.
224 Ordinarily, this function should be called by the
225 implementation of a casereader, not by the casereader's
226 client. Instead, casereader clients should usually ensure
227 that a casereader's error state is correct by using
228 taint_propagate to propagate to the casereader's taint
229 structure, which may be obtained via casereader_get_taint. */
231 casereader_force_error (struct casereader *reader)
233 taint_set_taint (reader->taint);
236 /* Returns READER's associate taint object, for use with
237 taint_propagate and other taint functions. */
239 casereader_get_taint (const struct casereader *reader)
241 return reader->taint;
244 /* Returns the number of cases that will be read by successive
245 calls to casereader_read for READER, assuming that no errors
246 occur. Upon an error condition, the case count drops to 0, so
247 that no more cases can be obtained.
249 Not all casereaders can predict the number of cases that they
250 will produce without actually reading all of them. In that
251 case, this function returns CASENUMBER_MAX. To obtain the
252 actual number of cases in such a casereader, use
253 casereader_count_cases. */
255 casereader_get_case_cnt (struct casereader *reader)
257 return reader->case_cnt;
260 /* Returns the number of cases that will be read by successive
261 calls to casereader_read for READER, assuming that no errors
262 occur. Upon an error condition, the case count drops to 0, so
263 that no more cases can be obtained.
265 For a casereader that cannot predict the number of cases it
266 will produce, this function actually reads (and discards) all
267 of the contents of a clone of READER. Thus, the return value
268 is always correct in the absence of I/O errors. */
270 casereader_count_cases (struct casereader *reader)
272 if (reader->case_cnt == CASENUMBER_MAX)
274 casenumber n_cases = 0;
277 struct casereader *clone = casereader_clone (reader);
279 for (; (c = casereader_read (clone)) != NULL; case_unref (c))
282 casereader_destroy (clone);
283 reader->case_cnt = n_cases;
286 return reader->case_cnt;
289 /* Returns the prototype for the cases in READER. The caller
290 must not unref the returned prototype. */
291 const struct caseproto *
292 casereader_get_proto (const struct casereader *reader)
294 return reader->proto;
297 /* Copies all the cases in READER to WRITER, propagating errors
300 casereader_transfer (struct casereader *reader, struct casewriter *writer)
304 taint_propagate (casereader_get_taint (reader),
305 casewriter_get_taint (writer));
306 while ((c = casereader_read (reader)) != NULL)
307 casewriter_write (writer, c);
308 casereader_destroy (reader);
311 /* Creates and returns a new casereader. This function is
312 intended for use by casereader implementations, not by
315 This function is most suited for creating a casereader for a
316 data source that is naturally sequential.
317 casereader_create_random may be more appropriate for a data
318 source that supports random access.
320 Ordinarily, specify a null pointer for TAINT, in which case
321 the new casereader will have a new, unique taint object. If
322 the new casereader should have a clone of an existing taint
323 object, specify that object as TAINT. (This is most commonly
324 useful in an implementation of the "clone" casereader_class
325 function, in which case the cloned casereader should have the
326 same taint object as the original casereader.)
328 PROTO must be the prototype for the cases that may be read
329 from the casereader. The caller retains its reference to
332 CASE_CNT is an upper limit on the number of cases that
333 casereader_read will return from the casereader in successive
334 calls. Ordinarily, this is the actual number of cases in the
335 data source or CASENUMBER_MAX if the number of cases cannot be
336 predicted in advance.
338 CLASS and AUX are a set of casereader implementation-specific
339 member functions and auxiliary data to pass to those member
340 functions, respectively. */
342 casereader_create_sequential (const struct taint *taint,
343 const struct caseproto *proto,
345 const struct casereader_class *class, void *aux)
347 struct casereader *reader = xmalloc (sizeof *reader);
348 reader->taint = taint != NULL ? taint_clone (taint) : taint_create ();
349 reader->proto = caseproto_ref (proto);
350 reader->case_cnt = case_cnt;
351 reader->class = class;
356 /* If READER is a casereader of the given CLASS, returns its
357 associated auxiliary data; otherwise, returns a null pointer.
359 This function is intended for use from casereader
360 implementations, not by casereader users. Even within
361 casereader implementations, its usefulness is quite limited,
362 for at least two reasons. First, every casereader member
363 function already receives a pointer to the casereader's
364 auxiliary data. Second, a casereader's class can change
365 (through a call to casereader_swap) and this is in practice
366 quite common (e.g. any call to casereader_clone on a
367 casereader that does not directly support clone will cause the
368 casereader to be replaced by a shim caseader). */
370 casereader_dynamic_cast (struct casereader *reader,
371 const struct casereader_class *class)
373 return reader->class == class ? reader->aux : NULL;
376 /* Random-access casereader implementation.
378 This is a set of wrappers around casereader_create_sequential
379 and struct casereader_class to make it easy to create
380 efficient casereaders for data sources that natively support
383 /* One clone of a random reader. */
386 struct random_reader_shared *shared; /* Data shared among clones. */
387 struct heap_node heap_node; /* Node in shared data's heap of readers. */
388 casenumber offset; /* Number of cases already read. */
391 /* Returns the random_reader in which the given heap_node is
393 static struct random_reader *
394 random_reader_from_heap_node (const struct heap_node *node)
396 return heap_data (node, struct random_reader, heap_node);
399 /* Data shared among clones of a random reader. */
400 struct random_reader_shared
402 struct heap *readers; /* Heap of struct random_readers. */
403 casenumber min_offset; /* Smallest offset of any random_reader. */
404 const struct casereader_random_class *class;
408 static const struct casereader_class random_reader_casereader_class;
410 /* Creates and returns a new random_reader with the given SHARED
411 data and OFFSET. Inserts the new random reader into the
413 static struct random_reader *
414 make_random_reader (struct random_reader_shared *shared, casenumber offset)
416 struct random_reader *br = xmalloc (sizeof *br);
419 heap_insert (shared->readers, &br->heap_node);
423 /* Compares random_readers A and B by offset and returns a
424 strcmp()-like result. */
426 compare_random_readers_by_offset (const struct heap_node *a_,
427 const struct heap_node *b_,
428 const void *aux UNUSED)
430 const struct random_reader *a = random_reader_from_heap_node (a_);
431 const struct random_reader *b = random_reader_from_heap_node (b_);
432 return a->offset < b->offset ? -1 : a->offset > b->offset;
435 /* Creates and returns a new casereader. This function is
436 intended for use by casereader implementations, not by
439 This function is most suited for creating a casereader for a
440 data source that supports random access.
441 casereader_create_sequential is more appropriate for a data
442 source that is naturally sequential.
444 PROTO must be the prototype for the cases that may be read
445 from the casereader. The caller retains its reference to
448 CASE_CNT is an upper limit on the number of cases that
449 casereader_read will return from the casereader in successive
450 calls. Ordinarily, this is the actual number of cases in the
451 data source or CASENUMBER_MAX if the number of cases cannot be
452 predicted in advance.
454 CLASS and AUX are a set of casereader implementation-specific
455 member functions and auxiliary data to pass to those member
456 functions, respectively. */
458 casereader_create_random (const struct caseproto *proto, casenumber case_cnt,
459 const struct casereader_random_class *class,
462 struct random_reader_shared *shared = xmalloc (sizeof *shared);
463 shared->readers = heap_create (compare_random_readers_by_offset, NULL);
464 shared->class = class;
466 shared->min_offset = 0;
467 return casereader_create_sequential (NULL, proto, case_cnt,
468 &random_reader_casereader_class,
469 make_random_reader (shared, 0));
472 /* Reassesses the min_offset in SHARED based on the minimum
473 offset in the heap. */
475 advance_random_reader (struct casereader *reader,
476 struct random_reader_shared *shared)
480 old = shared->min_offset;
481 new = random_reader_from_heap_node (heap_minimum (shared->readers))->offset;
485 shared->min_offset = new;
486 shared->class->advance (reader, shared->aux, new - old);
490 /* struct casereader_class "read" function for random reader. */
491 static struct ccase *
492 random_reader_read (struct casereader *reader, void *br_)
494 struct random_reader *br = br_;
495 struct random_reader_shared *shared = br->shared;
496 struct ccase *c = shared->class->read (reader, shared->aux,
497 br->offset - shared->min_offset);
501 heap_changed (shared->readers, &br->heap_node);
502 advance_random_reader (reader, shared);
507 /* struct casereader_class "destroy" function for random
510 random_reader_destroy (struct casereader *reader, void *br_)
512 struct random_reader *br = br_;
513 struct random_reader_shared *shared = br->shared;
515 heap_delete (shared->readers, &br->heap_node);
516 if (heap_is_empty (shared->readers))
518 heap_destroy (shared->readers);
519 shared->class->destroy (reader, shared->aux);
523 advance_random_reader (reader, shared);
528 /* struct casereader_class "clone" function for random reader. */
529 static struct casereader *
530 random_reader_clone (struct casereader *reader, void *br_)
532 struct random_reader *br = br_;
533 struct random_reader_shared *shared = br->shared;
534 return casereader_create_sequential (casereader_get_taint (reader),
536 casereader_get_case_cnt (reader),
537 &random_reader_casereader_class,
538 make_random_reader (shared,
542 /* struct casereader_class "peek" function for random reader. */
543 static struct ccase *
544 random_reader_peek (struct casereader *reader, void *br_, casenumber idx)
546 struct random_reader *br = br_;
547 struct random_reader_shared *shared = br->shared;
549 return shared->class->read (reader, shared->aux,
550 br->offset - shared->min_offset + idx);
553 /* Casereader class for random reader. */
554 static const struct casereader_class random_reader_casereader_class =
557 random_reader_destroy,
562 /* Buffering shim for implementing clone and peek operations.
564 The "clone" and "peek" operations aren't implemented by all
565 types of casereaders, but we have to expose a uniform
566 interface anyhow. We do this by interposing a buffering
567 casereader on top of the existing casereader on the first call
568 to "clone" or "peek". The buffering casereader maintains a
569 window of cases that spans the positions of the original
570 casereader and all of its clones (the "clone set"), from the
571 position of the casereader that has read the fewest cases to
572 the position of the casereader that has read the most.
574 Thus, if all of the casereaders in the clone set are at
575 approximately the same position, only a few cases are buffered
576 and there is little inefficiency. If, on the other hand, one
577 casereader is not used to read any cases at all, but another
578 one is used to read all of the cases, the entire contents of
579 the casereader is copied into the buffer. This still might
580 not be so inefficient, given that case data in memory is
581 shared across multiple identical copies, but in the worst case
582 the window implementation will write cases to disk instead of
583 maintaining them in-memory. */
585 /* A buffering shim for a non-clonable or non-peekable
589 struct casewindow *window; /* Window of buffered cases. */
590 struct casereader *subreader; /* Subordinate casereader. */
593 static const struct casereader_random_class shim_class;
595 /* Interposes a buffering shim atop READER. */
597 insert_shim (struct casereader *reader)
599 const struct caseproto *proto = casereader_get_proto (reader);
600 casenumber case_cnt = casereader_get_case_cnt (reader);
601 struct shim *b = xmalloc (sizeof *b);
602 b->window = casewindow_create (proto, settings_get_workspace_cases (proto));
603 b->subreader = casereader_create_random (proto, case_cnt, &shim_class, b);
604 casereader_swap (reader, b->subreader);
605 taint_propagate (casewindow_get_taint (b->window),
606 casereader_get_taint (reader));
607 taint_propagate (casereader_get_taint (b->subreader),
608 casereader_get_taint (reader));
611 /* Ensures that B's window contains at least CASE_CNT cases.
612 Return true if successful, false upon reaching the end of B's
613 subreader or an I/O error. */
615 prime_buffer (struct shim *b, casenumber case_cnt)
617 while (casewindow_get_case_cnt (b->window) < case_cnt)
619 struct ccase *tmp = casereader_read (b->subreader);
622 casewindow_push_head (b->window, tmp);
627 /* Reads the case at the given 0-based OFFSET from the front of
628 the window into C. Returns the case if successful, or a null
629 pointer if OFFSET is beyond the end of file or upon I/O error.
630 The caller must call case_unref() on the returned case when it
631 is no longer needed. */
632 static struct ccase *
633 shim_read (struct casereader *reader UNUSED, void *b_,
637 if (!prime_buffer (b, offset + 1))
639 return casewindow_get_case (b->window, offset);
644 shim_destroy (struct casereader *reader UNUSED, void *b_)
647 casewindow_destroy (b->window);
648 casereader_destroy (b->subreader);
652 /* Discards CNT cases from the front of B's window. */
654 shim_advance (struct casereader *reader UNUSED, void *b_, casenumber case_cnt)
657 casewindow_pop_tail (b->window, case_cnt);
660 /* Class for the buffered reader. */
661 static const struct casereader_random_class shim_class =