1 /* PSPP - a program for statistical analysis.
2 Copyright (C) 2007 Free Software Foundation, Inc.
4 This program is free software: you can redistribute it and/or modify
5 it under the terms of the GNU General Public License as published by
6 the Free Software Foundation, either version 3 of the License, or
7 (at your option) any later version.
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 GNU General Public License for more details.
14 You should have received a copy of the GNU General Public License
15 along with this program. If not, see <http://www.gnu.org/licenses/>. */
19 #include <data/casereader.h>
20 #include <data/casereader-provider.h>
24 #include <data/casewindow.h>
25 #include <data/casewriter.h>
26 #include <data/settings.h>
27 #include <libpspp/assertion.h>
28 #include <libpspp/heap.h>
29 #include <libpspp/taint.h>
/* Members of the casereader object.  The functions below reach them
   through a struct casereader pointer (reader->taint,
   reader->value_cnt, and so on).
   NOTE(review): the enclosing struct header and braces are not
   visible in this listing. */
36 struct taint *taint; /* Error state; queried with taint_is_tainted. */
37 size_t value_cnt; /* Values per case. */
38 casenumber case_cnt; /* Number of cases still to be read,
39 CASENUMBER_MAX if unknown. */
40 const struct casereader_class *class; /* Implementation callbacks. */
41 void *aux; /* Auxiliary data passed to the class callbacks. */
/* Forward declaration: insert_shim is defined after the random-reader
   code, but casereader_clone and casereader_peek call it earlier. */
44 static void insert_shim (struct casereader *);
46 /* Creates a new case in C and reads the next case from READER
47 into it. The caller owns C and must destroy C when its data
48 is no longer needed. Return true if successful, false when
49 cases have been exhausted or upon detection of an I/O error.
50 In the latter case, C is set to the null case.
52 The case returned is effectively consumed: it can never be
53 read again through READER. If this is inconvenient, READER
54 may be cloned in advance with casereader_clone, or
55 casereader_peek may be used instead. */
57 casereader_read (struct casereader *reader, struct ccase *c)
/* && short-circuits, so once the case count has reached zero the
   class's read callback is no longer invoked. */
59 if (reader->case_cnt != 0 && reader->class->read (reader, reader->aux, c))
/* A case handed back by the class must hold at least as many values
   as the reader advertises. */
61 assert (case_get_value_cnt (c) >= reader->value_cnt);
/* CASENUMBER_MAX marks an unknown count, so it is excluded here.
   NOTE(review): the statement guarded by this condition is not
   visible in this listing -- presumably it decrements case_cnt;
   confirm against the full file. */
62 if (reader->case_cnt != CASENUMBER_MAX)
75 /* Destroys READER.  Returns false if an I/O error was detected on READER, true otherwise. */
78 casereader_destroy (struct casereader *reader)
/* Let the implementation release its auxiliary data first. */
83 reader->class->destroy (reader, reader->aux);
/* The result of taint_destroy is kept in OK.
   NOTE(review): OK's declaration and the return statement are not
   visible here; presumably OK is what the function returns. */
84 ok = taint_destroy (reader->taint);
90 /* Returns a clone of READER. READER and its clone may be used
91 to read the same sequence of cases in the same order, barring I/O errors. */
94 casereader_clone (const struct casereader *reader_)
/* The parameter is const for the caller's benefit, but cloning may
   have to install a shim on the reader, so the qualifier is cast
   away. */
96 struct casereader *reader = (struct casereader *) reader_;
97 struct casereader *clone;
/* A class without a clone callback gets the buffering shim
   interposed before the callback is invoked. */
101 if (reader->class->clone == NULL)
102 insert_shim (reader);
103 clone = reader->class->clone (reader, reader->aux);
/* The callback must produce a distinct, non-null reader. */
104 assert (clone != NULL);
105 assert (clone != reader);
109 /* Makes a copy of ORIGINAL into *NEW1 (if NEW1 is non-null) and
110 *NEW2 (if NEW2 is non-null), then destroys ORIGINAL. */
112 casereader_split (struct casereader *original,
113 struct casereader **new1, struct casereader **new2)
/* Both outputs requested: ORIGINAL is renamed into *NEW1, and *NEW2
   is a clone of that renamed reader. */
115 if (new1 != NULL && new2 != NULL)
117 *new1 = casereader_rename (original);
118 *new2 = casereader_clone (*new1);
/* Exactly one output requested: hand ORIGINAL over by renaming it. */
120 else if (new1 != NULL)
121 *new1 = casereader_rename (original);
122 else if (new2 != NULL)
123 *new2 = casereader_rename (original);
/* NOTE(review): the listing skips a line before this call; presumably
   it is the final `else', so that ORIGINAL is destroyed only when
   neither output was requested.  Confirm against the full file. */
125 casereader_destroy (original);
128 /* Returns a copy of READER, which is itself destroyed.
129 Useful for taking over ownership of a casereader, to enforce
130 preventing the original owner from accessing the casereader again. */
133 casereader_rename (struct casereader *reader)
/* Copy the whole struct (sizeof *reader bytes) into NEW.
   NOTE(review): the rest of the body is not visible here; presumably
   READER itself is released afterwards -- confirm. */
135 struct casereader *new = xmemdup (reader, sizeof *reader);
140 /* Exchanges the casereaders referred to by A and B. */
142 casereader_swap (struct casereader *a, struct casereader *b)
/* Save A's contents by value.
   NOTE(review): the assignments that complete the exchange are not
   visible in this listing. */
146 struct casereader tmp = *a;
152 /* Creates a new case in C and reads the (IDX + 1)'th case from
153 READER into it. The caller owns C and must destroy C when its
154 data is no longer needed. Return true if successful, false
155 when cases have been exhausted or upon detection of an I/O
156 error. In the latter case, C is set to the null case. */
158 casereader_peek (struct casereader *reader, casenumber idx, struct ccase *c)
/* Only positions below the current case count are attempted. */
160 if (idx < reader->case_cnt)
/* A class without a peek callback gets the buffering shim. */
162 if (reader->class->peek == NULL)
163 insert_shim (reader);
164 if (reader->class->peek (reader, reader->aux, idx, c))
/* The peek callback reported failure: if the reader is in an error
   state, zero the case count so that no more cases are read. */
166 else if (casereader_error (reader))
167 reader->case_cnt = 0;
/* Lower the case count to IDX when it is larger.
   NOTE(review): this is presumably reached only after a failed or
   skipped peek; the success-path statement is not visible here. */
169 if (reader->case_cnt > idx)
170 reader->case_cnt = idx;
175 /* Returns true if an I/O error or another hard error has
176 occurred on READER, a clone of READER, or on some object on
177 which READER's data has a dependency, false otherwise. */
179 casereader_error (const struct casereader *reader)
/* The error state is read from the reader's taint object. */
181 return taint_is_tainted (reader->taint);
184 /* Marks READER as having encountered an error.
186 Ordinarily, this function should be called by the
187 implementation of a casereader, not by the casereader's
188 client. Instead, casereader clients should usually ensure
189 that a casereader's error state is correct by using
190 taint_propagate to propagate to the casereader's taint
191 structure, which may be obtained via casereader_get_taint. */
193 casereader_force_error (struct casereader *reader)
/* Sets the taint that casereader_error later tests. */
195 taint_set_taint (reader->taint);
198 /* Returns READER's associated taint object, for use with
199 taint_propagate and other taint functions. */
201 casereader_get_taint (const struct casereader *reader)
/* Returns the reader's own taint pointer, not a copy. */
203 return reader->taint;
206 /* Returns the number of cases that will be read by successive
207 calls to casereader_read for READER, assuming that no errors
208 occur. Upon an error condition, the case count drops to 0, so
209 that no more cases can be obtained.
211 Not all casereaders can predict the number of cases that they
212 will produce without actually reading all of them. In that
213 case, this function returns CASENUMBER_MAX. To obtain the
214 actual number of cases in such a casereader, use
215 casereader_count_cases. */
217 casereader_get_case_cnt (struct casereader *reader)
/* Plain field read; the value may be CASENUMBER_MAX when the count is
   unknown. */
219 return reader->case_cnt;
222 /* Returns the number of cases that will be read by successive
223 calls to casereader_read for READER, assuming that no errors
224 occur. Upon an error condition, the case count drops to 0, so
225 that no more cases can be obtained.
227 For a casereader that cannot predict the number of cases it
228 will produce, this function actually reads (and discards) all
229 of the contents of a clone of READER. Thus, the return value
230 is always correct in the absence of I/O errors. */
232 casereader_count_cases (struct casereader *reader)
/* Work is needed only when the count is still unknown. */
234 if (reader->case_cnt == CASENUMBER_MAX)
236 casenumber n_cases = 0;
/* Read from a clone rather than from READER itself. */
239 struct casereader *clone = casereader_clone (reader);
/* Each case read from the clone is destroyed by the loop's step
   expression.
   NOTE(review): the loop body and the declaration of C are not
   visible here; presumably the body increments n_cases. */
241 for (; casereader_read (clone, &c); case_destroy (&c))
244 casereader_destroy (clone);
/* Store the result in READER so later calls see a known count. */
245 reader->case_cnt = n_cases;
248 return reader->case_cnt;
251 /* Returns the number of struct values in each case in READER. */
253 casereader_get_value_cnt (struct casereader *reader)
/* Plain field read of the per-case value count. */
255 return reader->value_cnt;
258 /* Copies all the cases in READER to WRITER, propagating errors appropriately. */
261 casereader_transfer (struct casereader *reader, struct casewriter *writer)
/* Link READER's taint to WRITER's before any case is copied.
   NOTE(review): presumably the first argument is the source of the
   propagation and the second its destination -- confirm against
   taint.h. */
265 taint_propagate (casereader_get_taint (reader),
266 casewriter_get_taint (writer));
/* Read until casereader_read reports failure, writing each case to
   WRITER.  NOTE(review): the declaration of C is not visible here. */
267 while (casereader_read (reader, &c))
268 casewriter_write (writer, &c);
/* READER is destroyed here; the caller must not use it afterwards. */
269 casereader_destroy (reader);
272 /* Creates and returns a new casereader. This function is
273 intended for use by casereader implementations, not by casereader clients.
276 This function is most suited for creating a casereader for a
277 data source that is naturally sequential.
278 casereader_create_random may be more appropriate for a data
279 source that supports random access.
281 Ordinarily, specify a null pointer for TAINT, in which case
282 the new casereader will have a new, unique taint object. If
283 the new casereader should have a clone of an existing taint
284 object, specify that object as TAINT. (This is most commonly
285 useful in an implementation of the "clone" casereader_class
286 function, in which case the cloned casereader should have the
287 same taint object as the original casereader.)
289 VALUE_CNT must be the number of struct values per case read from the casereader.
292 CASE_CNT is an upper limit on the number of cases that
293 casereader_read will return from the casereader in successive
294 calls. Ordinarily, this is the actual number of cases in the
295 data source or CASENUMBER_MAX if the number of cases cannot be
296 predicted in advance.
298 CLASS and AUX are a set of casereader implementation-specific
299 member functions and auxiliary data to pass to those member
300 functions, respectively. */
302 casereader_create_sequential (const struct taint *taint,
303 size_t value_cnt, casenumber case_cnt,
304 const struct casereader_class *class, void *aux)
306 struct casereader *reader = xmalloc (sizeof *reader);
/* A non-null TAINT is cloned; a null TAINT gets a newly created
   taint object instead. */
307 reader->taint = taint != NULL ? taint_clone (taint) : taint_create ();
308 reader->value_cnt = value_cnt;
309 reader->case_cnt = case_cnt;
310 reader->class = class;
/* NOTE(review): the assignment of AUX and the return statement are
   not visible in this listing. */
315 /* Random-access casereader implementation.
317 This is a set of wrappers around casereader_create_sequential
318 and struct casereader_class to make it easy to create
319 efficient casereaders for data sources that natively support random access. */
322 /* One clone of a random reader. */
/* Per-clone state of a random reader.  The random_reader_* callbacks
   below receive it through their void *br_ argument.
   NOTE(review): the struct header and braces are not visible in this
   listing. */
325 struct random_reader_shared *shared; /* Data shared among clones. */
326 struct heap_node heap_node; /* Node in shared data's heap of readers. */
327 casenumber offset; /* Number of cases already read. */
330 /* Returns the random_reader in which the given heap_node is embedded. */
332 static struct random_reader *
333 random_reader_from_heap_node (const struct heap_node *node)
/* heap_data receives the node, the containing struct type and the
   member name; presumably it maps the member pointer back to its
   enclosing struct -- confirm against libpspp/heap.h. */
335 return heap_data (node, struct random_reader, heap_node);
338 /* Data shared among clones of a random reader. */
339 struct random_reader_shared
341 struct heap *readers; /* Heap of struct random_readers. */
342 casenumber min_offset; /* Smallest offset of any random_reader. */
/* Implementation callbacks; read, destroy and advance are invoked
   through this pointer by the functions below. */
343 const struct casereader_random_class *class;
/* NOTE(review): the functions below also use shared->aux, whose
   declaration is not visible in this listing. */
/* Forward declaration: the table is defined after its callbacks, but
   casereader_create_random and random_reader_clone need its address
   earlier. */
347 static struct casereader_class random_reader_casereader_class;
349 /* Creates and returns a new random_reader with the given SHARED
350 data and OFFSET. Inserts the new random reader into the heap of readers in SHARED. */
352 static struct random_reader *
353 make_random_reader (struct random_reader_shared *shared, casenumber offset)
355 struct random_reader *br = xmalloc (sizeof *br);
/* Register the new reader in the heap kept in SHARED.
   NOTE(review): the assignments to br->shared and br->offset and the
   return statement are not visible here; the heap comparison reads
   offset, so presumably offset is set before this insertion. */
358 heap_insert (shared->readers, &br->heap_node);
362 /* Compares random_readers A and B by offset and returns a
363 strcmp()-like result. */
365 compare_random_readers_by_offset (const struct heap_node *a_,
366 const struct heap_node *b_,
367 const void *aux UNUSED)
/* Map the heap nodes back to the random_readers that embed them. */
369 const struct random_reader *a = random_reader_from_heap_node (a_);
370 const struct random_reader *b = random_reader_from_heap_node (b_);
/* -1 when A's offset is smaller; otherwise the result of the `>'
   comparison: 1 when A's offset is larger, 0 when they are equal. */
371 return a->offset < b->offset ? -1 : a->offset > b->offset;
374 /* Creates and returns a new casereader. This function is
375 intended for use by casereader implementations, not by casereader clients.
378 This function is most suited for creating a casereader for a
379 data source that supports random access.
380 casereader_create_sequential is more appropriate for a data
381 source that is naturally sequential.
383 VALUE_CNT must be the number of struct values per case read from the casereader.
386 CASE_CNT is an upper limit on the number of cases that
387 casereader_read will return from the casereader in successive
388 calls. Ordinarily, this is the actual number of cases in the
389 data source or CASENUMBER_MAX if the number of cases cannot be
390 predicted in advance.
392 CLASS and AUX are a set of casereader implementation-specific
393 member functions and auxiliary data to pass to those member
394 functions, respectively. */
396 casereader_create_random (size_t value_cnt, casenumber case_cnt,
397 const struct casereader_random_class *class,
/* NOTE(review): the last parameter line and the statement numbered
   403 are not visible in this listing. */
400 struct random_reader_shared *shared = xmalloc (sizeof *shared);
/* The heap of readers is ordered by each reader's offset. */
401 shared->readers = heap_create (compare_random_readers_by_offset, NULL);
402 shared->class = class;
404 shared->min_offset = 0;
/* Wrap the shared state in an ordinary casereader.  The null taint
   argument makes casereader_create_sequential create a new taint
   object, and the first random_reader, at offset 0, becomes the
   casereader's auxiliary data. */
405 return casereader_create_sequential (NULL, value_cnt, case_cnt,
406 &random_reader_casereader_class,
407 make_random_reader (shared, 0));
410 /* Reassesses the min_offset in SHARED based on the minimum
411 offset in the heap. */
413 advance_random_reader (struct casereader *reader,
414 struct random_reader_shared *shared)
/* NOTE(review): the declarations of OLD and NEW are not visible in
   this listing. */
418 old = shared->min_offset;
/* The heap is ordered by offset, so its minimum node belongs to the
   reader with the smallest offset. */
419 new = random_reader_from_heap_node (heap_minimum (shared->readers))->offset;
/* Record the new minimum and pass the distance it moved to the
   implementation's advance callback.
   NOTE(review): the listing skips lines before this point; presumably
   a condition guards these two statements -- confirm. */
423 shared->min_offset = new;
424 shared->class->advance (reader, shared->aux, new - old);
428 /* struct casereader_class "read" function for random reader. */
430 random_reader_read (struct casereader *reader, void *br_, struct ccase *c)
432 struct random_reader *br = br_;
433 struct random_reader_shared *shared = br->shared;
/* The implementation is addressed relative to the smallest offset of
   any clone, not by absolute position. */
435 if (shared->class->read (reader, shared->aux,
436 br->offset - shared->min_offset, c))
/* Tell the heap that this reader's node changed, then re-evaluate
   the shared minimum.
   NOTE(review): the listing skips a line before this call; presumably
   it advances br->offset -- confirm against the full file. */
439 heap_changed (shared->readers, &br->heap_node);
440 advance_random_reader (reader, shared);
447 /* struct casereader_class "destroy" function for random reader. */
450 random_reader_destroy (struct casereader *reader, void *br_)
452 struct random_reader *br = br_;
453 struct random_reader_shared *shared = br->shared;
/* Remove this clone from the shared heap. */
455 heap_delete (shared->readers, &br->heap_node);
/* An empty heap means this was the last clone, so the heap and the
   implementation's own data are destroyed. */
456 if (heap_is_empty (shared->readers))
458 heap_destroy (shared->readers);
459 shared->class->destroy (reader, shared->aux);
/* NOTE(review): the listing skips lines before this call; presumably
   it sits in the `else' branch, re-evaluating the minimum offset
   while other clones remain.  The freeing of SHARED and BR is not
   visible here. */
463 advance_random_reader (reader, shared);
468 /* struct casereader_class "clone" function for random reader. */
469 static struct casereader *
470 random_reader_clone (struct casereader *reader, void *br_)
472 struct random_reader *br = br_;
473 struct random_reader_shared *shared = br->shared;
/* The clone takes READER's value count and case count, uses the same
   class table, and gets a new random_reader attached to the same
   SHARED data.  READER's taint is passed as a non-null taint, which
   casereader_create_sequential clones.
   NOTE(review): the final argument line of make_random_reader is not
   visible in this listing; presumably it passes br->offset. */
474 return casereader_create_sequential (casereader_get_taint (reader),
475 casereader_get_value_cnt (reader),
476 casereader_get_case_cnt (reader),
477 &random_reader_casereader_class,
478 make_random_reader (shared,
482 /* struct casereader_class "peek" function for random reader. */
484 random_reader_peek (struct casereader *reader, void *br_,
485 casenumber idx, struct ccase *c)
487 struct random_reader *br = br_;
488 struct random_reader_shared *shared = br->shared;
/* Same relative addressing as random_reader_read, shifted IDX cases
   further on.  Nothing here modifies br->offset or the heap. */
490 return shared->class->read (reader, shared->aux,
491 br->offset - shared->min_offset + idx, c);
494 /* Casereader class for random reader. */
/* NOTE(review): only the destroy entry of the initializer is visible
   in this listing; the other entries are presumably
   random_reader_read, random_reader_clone and random_reader_peek --
   confirm against the full file. */
495 static struct casereader_class random_reader_casereader_class =
498 random_reader_destroy,
503 /* Buffering shim for implementing clone and peek operations.
505 The "clone" and "peek" operations aren't implemented by all
506 types of casereaders, but we have to expose a uniform
507 interface anyhow. We do this by interposing a buffering
508 casereader on top of the existing casereader on the first call
509 to "clone" or "peek". The buffering casereader maintains a
510 window of cases that spans the positions of the original
511 casereader and all of its clones (the "clone set"), from the
512 position of the casereader that has read the fewest cases to
513 the position of the casereader that has read the most.
515 Thus, if all of the casereaders in the clone set are at
516 approximately the same position, only a few cases are buffered
517 and there is little inefficiency. If, on the other hand, one
518 casereader is not used to read any cases at all, but another
519 one is used to read all of the cases, the entire contents of
520 the casereader is copied into the buffer. This still might
521 not be so inefficient, given that case data in memory is
522 shared across multiple identical copies, but in the worst case
523 the window implementation will write cases to disk instead of
524 maintaining them in-memory. */
526 /* A buffering shim for a non-clonable or non-peekable casereader. */
/* Members of the shim state.  The shim_* callbacks below use them
   through a pointer B.
   NOTE(review): the struct header and braces are not visible in this
   listing. */
530 struct casewindow *window; /* Window of buffered cases. */
531 struct casereader *subreader; /* Subordinate casereader. */
/* Forward declaration: the table is defined after the shim_*
   callbacks at the end of the file. */
534 static struct casereader_random_class shim_class;
536 /* Interposes a buffering shim atop READER. */
538 insert_shim (struct casereader *reader)
540 size_t value_cnt = casereader_get_value_cnt (reader);
541 casenumber case_cnt = casereader_get_case_cnt (reader);
542 struct shim *b = xmalloc (sizeof *b);
/* The window's second argument comes from get_workspace_cases.
   NOTE(review): presumably a limit on the number of cases kept in
   memory -- confirm against casewindow.h. */
543 b->window = casewindow_create (value_cnt, get_workspace_cases (value_cnt));
/* Create the random-access reader that will front the shim.
   NOTE(review): the remaining arguments of this call are not visible
   in this listing. */
544 b->subreader = casereader_create_random (value_cnt, case_cnt,
/* Exchange the two readers: the caller's READER pointer now refers to
   the new random reader, and b->subreader to the original one. */
546 casereader_swap (reader, b->subreader);
/* Propagate taint from both the window and the subordinate reader to
   READER. */
547 taint_propagate (casewindow_get_taint (b->window),
548 casereader_get_taint (reader));
549 taint_propagate (casereader_get_taint (b->subreader),
550 casereader_get_taint (reader));
553 /* Ensures that B's window contains at least CASE_CNT cases.
554 Return true if successful, false upon reaching the end of B's
555 subreader or an I/O error. */
557 prime_buffer (struct shim *b, casenumber case_cnt)
/* Keep pulling cases from the subreader until the window holds
   CASE_CNT of them. */
559 while (casewindow_get_case_cnt (b->window) < case_cnt)
/* NOTE(review): the declaration of TMP and the statement taken when
   the read fails are not visible here; presumably it returns false. */
562 if (!casereader_read (b->subreader, &tmp))
/* Each newly read case is pushed at the head of the window. */
564 casewindow_push_head (b->window, &tmp);
569 /* Reads the case at the given 0-based OFFSET from the front of
570 the window into C. Returns true if successful, false if
571 OFFSET is beyond the end of file or upon I/O error. */
573 shim_read (struct casereader *reader UNUSED, void *b_,
574 casenumber offset, struct ccase *c)
/* OFFSET is 0-based, so the window must hold OFFSET + 1 cases before
   the case can be fetched; && skips the fetch when priming fails.
   NOTE(review): the declaration of B (from B_) is not visible here. */
577 return (prime_buffer (b, offset + 1)
578 && casewindow_get_case (b->window, offset, c));
/* Destroys the window and the subordinate casereader held by the
   shim B_.
   NOTE(review): the declaration of B and any freeing of B itself are
   not visible in this listing. */
583 shim_destroy (struct casereader *reader UNUSED, void *b_)
586 casewindow_destroy (b->window);
587 casereader_destroy (b->subreader);
591 /* Discards CASE_CNT cases from the front of B's window. */
593 shim_advance (struct casereader *reader UNUSED, void *b_, casenumber case_cnt)
/* New cases are pushed at the window's head (see prime_buffer), so
   the cases being discarded are popped from its tail.
   NOTE(review): the declaration of B (from B_) is not visible here. */
596 casewindow_pop_tail (b->window, case_cnt);
599 /* Class for the buffered reader. */
600 static struct casereader_random_class shim_class =