1 /* PSPP - a program for statistical analysis.
2 Copyright (C) 2007 Free Software Foundation, Inc.
4 This program is free software: you can redistribute it and/or modify
5 it under the terms of the GNU General Public License as published by
6 the Free Software Foundation, either version 3 of the License, or
7 (at your option) any later version.
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 GNU General Public License for more details.
14 You should have received a copy of the GNU General Public License
15 along with this program. If not, see <http://www.gnu.org/licenses/>. */
19 #include <data/casereader.h>
20 #include <data/casereader-provider.h>
24 #include <data/casewindow.h>
25 #include <data/casewriter.h>
26 #include <data/settings.h>
27 #include <libpspp/assertion.h>
28 #include <libpspp/heap.h>
29 #include <libpspp/taint.h>
36 struct taint *taint; /* Corrupted? */
37 size_t value_cnt; /* Values per case. */
38 casenumber case_cnt; /* Number of cases,
39 CASENUMBER_MAX if unknown. */
40 const struct casereader_class *class; /* Class. */
41 void *aux; /* Auxiliary data for class. */
44 static void insert_shim (struct casereader *);
46 /* Creates a new case in C and reads the next case from READER
47 into it. The caller owns C and must destroy C when its data
48 is no longer needed. Return true if successful, false when
49 cases have been exhausted or upon detection of an I/O error.
50 In the latter case, C is set to the null case.
52 The case returned is effectively consumed: it can never be
53 read again through READER. If this is inconvenient, READER
54 may be cloned in advance with casereader_clone, or
55 casereader_peek may be used instead. */
57 casereader_read (struct casereader *reader, struct ccase *c)
59 if (reader->case_cnt != 0 && reader->class->read (reader, reader->aux, c))
61 assert (case_get_value_cnt (c) >= reader->value_cnt);
62 if (reader->case_cnt != CASENUMBER_MAX)
75 Returns false if an I/O error was detected on READER, true
78 casereader_destroy (struct casereader *reader)
83 reader->class->destroy (reader, reader->aux);
84 ok = taint_destroy (reader->taint);
90 /* Returns a clone of READER. READER and its clone may be used
91 to read the same sequence of cases in the same order, barring
94 casereader_clone (const struct casereader *reader_)
96 struct casereader *reader = (struct casereader *) reader_;
97 struct casereader *clone;
101 if (reader->class->clone == NULL)
102 insert_shim (reader);
103 clone = reader->class->clone (reader, reader->aux);
104 assert (clone != NULL);
105 assert (clone != reader);
109 /* Makes a copy of ORIGINAL into *NEW1 (if NEW1 is non-null) and
110 *NEW2 (if NEW2 is non-null), then destroys ORIGINAL. */
112 casereader_split (struct casereader *original,
113 struct casereader **new1, struct casereader **new2)
115 if (new1 != NULL && new2 != NULL)
117 *new1 = casereader_rename (original);
118 *new2 = casereader_clone (*new1);
120 else if (new1 != NULL)
121 *new1 = casereader_rename (original);
122 else if (new2 != NULL)
123 *new2 = casereader_rename (original);
125 casereader_destroy (original);
128 /* Returns a copy of READER, which is itself destroyed.
129 Useful for taking over ownership of a casereader, to enforce
130 preventing the original owner from accessing the casereader
133 casereader_rename (struct casereader *reader)
135 struct casereader *new = xmemdup (reader, sizeof *reader);
140 /* Exchanges the casereaders referred to by A and B. */
142 casereader_swap (struct casereader *a, struct casereader *b)
146 struct casereader tmp = *a;
152 /* Creates a new case in C and reads the (IDX + 1)'th case from
153 READER into it. The caller owns C and must destroy C when its
154 data is no longer needed. Return true if successful, false
155 when cases have been exhausted or upon detection of an I/O
156 error. In the latter case, C is set to the null case. */
158 casereader_peek (struct casereader *reader, casenumber idx, struct ccase *c)
160 if (idx < reader->case_cnt)
162 if (reader->class->peek == NULL)
163 insert_shim (reader);
164 if (reader->class->peek (reader, reader->aux, idx, c))
166 else if (casereader_error (reader))
167 reader->case_cnt = 0;
169 if (reader->case_cnt > idx)
170 reader->case_cnt = idx;
175 /* Returns true if an I/O error or another hard error has
176 occurred on READER, a clone of READER, or on some object on
177 which READER's data has a dependency, false otherwise. */
179 casereader_error (const struct casereader *reader)
181 return taint_is_tainted (reader->taint);
184 /* Marks READER as having encountered an error.
186 Ordinarily, this function should be called by the
187 implementation of a casereader, not by the casereader's
188 client. Instead, casereader clients should usually ensure
189 that a casereader's error state is correct by using
190 taint_propagate to propagate to the casereader's taint
191 structure, which may be obtained via casereader_get_taint. */
193 casereader_force_error (struct casereader *reader)
195 taint_set_taint (reader->taint);
198 /* Returns READER's associate taint object, for use with
199 taint_propagate and other taint functions. */
201 casereader_get_taint (const struct casereader *reader)
203 return reader->taint;
206 /* Returns the number of cases that will be read by successive
207 calls to casereader_read for READER, assuming that no errors
208 occur. Upon an error condition, the case count drops to 0, so
209 that no more cases can be obtained.
211 Not all casereaders can predict the number of cases that they
212 will produce without actually reading all of them. In that
213 case, this function returns CASENUMBER_MAX. To obtain the
214 actual number of cases in such a casereader, use
215 casereader_count_cases. */
217 casereader_get_case_cnt (struct casereader *reader)
219 return reader->case_cnt;
222 /* Returns the number of cases that will be read by successive
223 calls to casereader_read for READER, assuming that no errors
224 occur. Upon an error condition, the case count drops to 0, so
225 that no more cases can be obtained.
227 For a casereader that cannot predict the number of cases it
228 will produce, this function actually reads (and discards) all
229 of the contents of a clone of READER. Thus, the return value
230 is always correct in the absence of I/O errors. */
232 casereader_count_cases (struct casereader *reader)
234 if (reader->case_cnt == CASENUMBER_MAX)
236 casenumber n_cases = 0;
239 struct casereader *clone = casereader_clone (reader);
241 for (; casereader_read (clone, &c); case_destroy (&c))
244 casereader_destroy (clone);
245 reader->case_cnt = n_cases;
248 return reader->case_cnt;
251 /* Returns the number of struct values in each case in READER. */
253 casereader_get_value_cnt (struct casereader *reader)
255 return reader->value_cnt;
258 /* Copies all the cases in READER to WRITER, propagating errors
261 casereader_transfer (struct casereader *reader, struct casewriter *writer)
265 taint_propagate (casereader_get_taint (reader),
266 casewriter_get_taint (writer));
267 while (casereader_read (reader, &c))
268 casewriter_write (writer, &c);
269 casereader_destroy (reader);
272 /* Creates and returns a new casereader. This function is
273 intended for use by casereader implementations, not by
276 This function is most suited for creating a casereader for a
277 data source that is naturally sequential.
278 casereader_create_random may be more appropriate for a data
279 source that supports random access.
281 Ordinarily, specify a null pointer for TAINT, in which case
282 the new casereader will have a new, unique taint object. If
283 the new casereader should have a clone of an existing taint
284 object, specify that object as TAINT. (This is most commonly
285 useful in an implementation of the "clone" casereader_class
286 function, in which case the cloned casereader should have the
287 same taint object as the original casereader.)
289 VALUE_CNT must be the number of struct values per case read
292 CASE_CNT is an upper limit on the number of cases that
293 casereader_read will return from the casereader in successive
294 calls. Ordinarily, this is the actual number of cases in the
295 data source or CASENUMBER_MAX if the number of cases cannot be
296 predicted in advance.
298 CLASS and AUX are a set of casereader implementation-specific
299 member functions and auxiliary data to pass to those member
300 functions, respectively. */
302 casereader_create_sequential (const struct taint *taint,
303 size_t value_cnt, casenumber case_cnt,
304 const struct casereader_class *class, void *aux)
306 struct casereader *reader = xmalloc (sizeof *reader);
307 reader->taint = taint != NULL ? taint_clone (taint) : taint_create ();
308 reader->value_cnt = value_cnt;
309 reader->case_cnt = case_cnt;
310 reader->class = class;
315 /* If READER is a casereader of the given CLASS, returns its
316 associated auxiliary data; otherwise, returns a null pointer.
318 This function is intended for use from casereader
319 implementations, not by casereader users. Even within
320 casereader implementations, its usefulness is quite limited,
321 for at least two reasons. First, every casereader member
322 function already receives a pointer to the casereader's
323 auxiliary data. Second, a casereader's class can change
324 (through a call to casereader_swap) and this is in practice
325 quite common (e.g. any call to casereader_clone on a
326 casereader that does not directly support clone will cause the
327 casereader to be replaced by a shim caseader). */
329 casereader_dynamic_cast (struct casereader *reader,
330 struct casereader_class *class)
332 return reader->class == class ? reader->aux : NULL;
335 /* Random-access casereader implementation.
337 This is a set of wrappers around casereader_create_sequential
338 and struct casereader_class to make it easy to create
339 efficient casereaders for data sources that natively support
342 /* One clone of a random reader. */
345 struct random_reader_shared *shared; /* Data shared among clones. */
346 struct heap_node heap_node; /* Node in shared data's heap of readers. */
347 casenumber offset; /* Number of cases already read. */
350 /* Returns the random_reader in which the given heap_node is
352 static struct random_reader *
353 random_reader_from_heap_node (const struct heap_node *node)
355 return heap_data (node, struct random_reader, heap_node);
358 /* Data shared among clones of a random reader. */
359 struct random_reader_shared
361 struct heap *readers; /* Heap of struct random_readers. */
362 casenumber min_offset; /* Smallest offset of any random_reader. */
363 const struct casereader_random_class *class;
367 static struct casereader_class random_reader_casereader_class;
369 /* Creates and returns a new random_reader with the given SHARED
370 data and OFFSET. Inserts the new random reader into the
372 static struct random_reader *
373 make_random_reader (struct random_reader_shared *shared, casenumber offset)
375 struct random_reader *br = xmalloc (sizeof *br);
378 heap_insert (shared->readers, &br->heap_node);
382 /* Compares random_readers A and B by offset and returns a
383 strcmp()-like result. */
385 compare_random_readers_by_offset (const struct heap_node *a_,
386 const struct heap_node *b_,
387 const void *aux UNUSED)
389 const struct random_reader *a = random_reader_from_heap_node (a_);
390 const struct random_reader *b = random_reader_from_heap_node (b_);
391 return a->offset < b->offset ? -1 : a->offset > b->offset;
394 /* Creates and returns a new casereader. This function is
395 intended for use by casereader implementations, not by
398 This function is most suited for creating a casereader for a
399 data source that supports random access.
400 casereader_create_sequential is more appropriate for a data
401 source that is naturally sequential.
403 VALUE_CNT must be the number of struct values per case read
406 CASE_CNT is an upper limit on the number of cases that
407 casereader_read will return from the casereader in successive
408 calls. Ordinarily, this is the actual number of cases in the
409 data source or CASENUMBER_MAX if the number of cases cannot be
410 predicted in advance.
412 CLASS and AUX are a set of casereader implementation-specific
413 member functions and auxiliary data to pass to those member
414 functions, respectively. */
416 casereader_create_random (size_t value_cnt, casenumber case_cnt,
417 const struct casereader_random_class *class,
420 struct random_reader_shared *shared = xmalloc (sizeof *shared);
421 shared->readers = heap_create (compare_random_readers_by_offset, NULL);
422 shared->class = class;
424 shared->min_offset = 0;
425 return casereader_create_sequential (NULL, value_cnt, case_cnt,
426 &random_reader_casereader_class,
427 make_random_reader (shared, 0));
430 /* Reassesses the min_offset in SHARED based on the minimum
431 offset in the heap. */
433 advance_random_reader (struct casereader *reader,
434 struct random_reader_shared *shared)
438 old = shared->min_offset;
439 new = random_reader_from_heap_node (heap_minimum (shared->readers))->offset;
443 shared->min_offset = new;
444 shared->class->advance (reader, shared->aux, new - old);
448 /* struct casereader_class "read" function for random reader. */
450 random_reader_read (struct casereader *reader, void *br_, struct ccase *c)
452 struct random_reader *br = br_;
453 struct random_reader_shared *shared = br->shared;
455 if (shared->class->read (reader, shared->aux,
456 br->offset - shared->min_offset, c))
459 heap_changed (shared->readers, &br->heap_node);
460 advance_random_reader (reader, shared);
467 /* struct casereader_class "destroy" function for random
470 random_reader_destroy (struct casereader *reader, void *br_)
472 struct random_reader *br = br_;
473 struct random_reader_shared *shared = br->shared;
475 heap_delete (shared->readers, &br->heap_node);
476 if (heap_is_empty (shared->readers))
478 heap_destroy (shared->readers);
479 shared->class->destroy (reader, shared->aux);
483 advance_random_reader (reader, shared);
488 /* struct casereader_class "clone" function for random reader. */
489 static struct casereader *
490 random_reader_clone (struct casereader *reader, void *br_)
492 struct random_reader *br = br_;
493 struct random_reader_shared *shared = br->shared;
494 return casereader_create_sequential (casereader_get_taint (reader),
495 casereader_get_value_cnt (reader),
496 casereader_get_case_cnt (reader),
497 &random_reader_casereader_class,
498 make_random_reader (shared,
502 /* struct casereader_class "peek" function for random reader. */
504 random_reader_peek (struct casereader *reader, void *br_,
505 casenumber idx, struct ccase *c)
507 struct random_reader *br = br_;
508 struct random_reader_shared *shared = br->shared;
510 return shared->class->read (reader, shared->aux,
511 br->offset - shared->min_offset + idx, c);
514 /* Casereader class for random reader. */
515 static struct casereader_class random_reader_casereader_class =
518 random_reader_destroy,
523 /* Buffering shim for implementing clone and peek operations.
525 The "clone" and "peek" operations aren't implemented by all
526 types of casereaders, but we have to expose a uniform
527 interface anyhow. We do this by interposing a buffering
528 casereader on top of the existing casereader on the first call
529 to "clone" or "peek". The buffering casereader maintains a
530 window of cases that spans the positions of the original
531 casereader and all of its clones (the "clone set"), from the
532 position of the casereader that has read the fewest cases to
533 the position of the casereader that has read the most.
535 Thus, if all of the casereaders in the clone set are at
536 approximately the same position, only a few cases are buffered
537 and there is little inefficiency. If, on the other hand, one
538 casereader is not used to read any cases at all, but another
539 one is used to read all of the cases, the entire contents of
540 the casereader is copied into the buffer. This still might
541 not be so inefficient, given that case data in memory is
542 shared across multiple identical copies, but in the worst case
543 the window implementation will write cases to disk instead of
544 maintaining them in-memory. */
546 /* A buffering shim for a non-clonable or non-peekable
550 struct casewindow *window; /* Window of buffered cases. */
551 struct casereader *subreader; /* Subordinate casereader. */
554 static struct casereader_random_class shim_class;
556 /* Interposes a buffering shim atop READER. */
558 insert_shim (struct casereader *reader)
560 size_t value_cnt = casereader_get_value_cnt (reader);
561 casenumber case_cnt = casereader_get_case_cnt (reader);
562 struct shim *b = xmalloc (sizeof *b);
563 b->window = casewindow_create (value_cnt, get_workspace_cases (value_cnt));
564 b->subreader = casereader_create_random (value_cnt, case_cnt,
566 casereader_swap (reader, b->subreader);
567 taint_propagate (casewindow_get_taint (b->window),
568 casereader_get_taint (reader));
569 taint_propagate (casereader_get_taint (b->subreader),
570 casereader_get_taint (reader));
573 /* Ensures that B's window contains at least CASE_CNT cases.
574 Return true if successful, false upon reaching the end of B's
575 subreader or an I/O error. */
577 prime_buffer (struct shim *b, casenumber case_cnt)
579 while (casewindow_get_case_cnt (b->window) < case_cnt)
582 if (!casereader_read (b->subreader, &tmp))
584 casewindow_push_head (b->window, &tmp);
589 /* Reads the case at the given 0-based OFFSET from the front of
590 the window into C. Returns true if successful, false if
591 OFFSET is beyond the end of file or upon I/O error. */
593 shim_read (struct casereader *reader UNUSED, void *b_,
594 casenumber offset, struct ccase *c)
597 return (prime_buffer (b, offset + 1)
598 && casewindow_get_case (b->window, offset, c));
603 shim_destroy (struct casereader *reader UNUSED, void *b_)
606 casewindow_destroy (b->window);
607 casereader_destroy (b->subreader);
611 /* Discards CNT cases from the front of B's window. */
613 shim_advance (struct casereader *reader UNUSED, void *b_, casenumber case_cnt)
616 casewindow_pop_tail (b->window, case_cnt);
619 /* Class for the buffered reader. */
620 static struct casereader_random_class shim_class =