1 /* PSPP - computes sample statistics.
2 Copyright (C) 2007 Free Software Foundation, Inc.
4 This program is free software; you can redistribute it and/or
5 modify it under the terms of the GNU General Public License as
6 published by the Free Software Foundation; either version 2 of the
7 License, or (at your option) any later version.
9 This program is distributed in the hope that it will be useful, but
10 WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12 General Public License for more details.
14 You should have received a copy of the GNU General Public License
15 along with this program; if not, write to the Free Software
16 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
21 #include <data/casereader.h>
22 #include <data/casereader-provider.h>
26 #include <data/casewindow.h>
27 #include <data/casewriter.h>
28 #include <data/settings.h>
29 #include <libpspp/assertion.h>
30 #include <libpspp/heap.h>
31 #include <libpspp/taint.h>
38 struct taint *taint; /* Corrupted? */
39 size_t value_cnt; /* Values per case. */
40 casenumber case_cnt; /* Number of cases,
41 CASENUMBER_MAX if unknown. */
42 const struct casereader_class *class; /* Class. */
43 void *aux; /* Auxiliary data for class. */
46 static void insert_shim (struct casereader *);
48 /* Creates a new case in C and reads the next case from READER
49 into it. The caller owns C and must destroy C when its data
50 is no longer needed. Return true if successful, false when
51 cases have been exhausted or upon detection of an I/O error.
52 In the latter case, C is set to the null case.
54 The case returned is effectively consumed: it can never be
55 read again through READER. If this is inconvenient, READER
56 may be cloned in advance with casereader_clone, or
57 casereader_peek may be used instead. */
59 casereader_read (struct casereader *reader, struct ccase *c)
61 if (reader->case_cnt != 0 && reader->class->read (reader, reader->aux, c))
63 assert (case_get_value_cnt (c) == reader->value_cnt);
64 if (reader->case_cnt != CASENUMBER_MAX)
77 Returns false if an I/O error was detected on READER, true
80 casereader_destroy (struct casereader *reader)
85 reader->class->destroy (reader, reader->aux);
86 ok = taint_destroy (reader->taint);
92 /* Returns a clone of READER. READER and its clone may be used
93 to read the same sequence of cases in the same order, barring
96 casereader_clone (const struct casereader *reader_)
98 struct casereader *reader = (struct casereader *) reader_;
99 struct casereader *clone;
100 if ( reader == NULL )
103 if (reader->class->clone == NULL)
104 insert_shim (reader);
105 clone = reader->class->clone (reader, reader->aux);
106 assert (clone != NULL);
107 assert (clone != reader);
111 /* Makes a copy of ORIGINAL into *NEW1 (if NEW1 is non-null) and
112 *NEW2 (if NEW2 is non-null), then destroys ORIGINAL. */
114 casereader_split (struct casereader *original,
115 struct casereader **new1, struct casereader **new2)
117 if (new1 != NULL && new2 != NULL)
119 *new1 = casereader_rename (original);
120 *new2 = casereader_clone (*new1);
122 else if (new1 != NULL)
123 *new1 = casereader_rename (original);
124 else if (new2 != NULL)
125 *new2 = casereader_rename (original);
127 casereader_destroy (original);
130 /* Returns a copy of READER, which is itself destroyed.
131 Useful for taking over ownership of a casereader, to enforce
132 preventing the original owner from accessing the casereader
135 casereader_rename (struct casereader *reader)
137 struct casereader *new = xmemdup (reader, sizeof *reader);
142 /* Exchanges the casereaders referred to by A and B. */
144 casereader_swap (struct casereader *a, struct casereader *b)
148 struct casereader tmp = *a;
154 /* Creates a new case in C and reads the (IDX + 1)'th case from
155 READER into it. The caller owns C and must destroy C when its
156 data is no longer needed. Return true if successful, false
157 when cases have been exhausted or upon detection of an I/O
158 error. In the latter case, C is set to the null case. */
160 casereader_peek (struct casereader *reader, casenumber idx, struct ccase *c)
162 if (idx < reader->case_cnt)
164 if (reader->class->peek == NULL)
165 insert_shim (reader);
166 if (reader->class->peek (reader, reader->aux, idx, c))
168 else if (casereader_error (reader))
169 reader->case_cnt = 0;
171 if (reader->case_cnt > idx)
172 reader->case_cnt = idx;
177 /* Returns true if an I/O error or another hard error has
178 occurred on READER, a clone of READER, or on some object on
179 which READER's data has a dependency, false otherwise. */
181 casereader_error (const struct casereader *reader)
183 return taint_is_tainted (reader->taint);
186 /* Marks READER as having encountered an error.
188 Ordinarily, this function should be called by the
189 implementation of a casereader, not by the casereader's
190 client. Instead, casereader clients should usually ensure
191 that a casereader's error state is correct by using
192 taint_propagate to propagate to the casereader's taint
193 structure, which may be obtained via casereader_get_taint. */
195 casereader_force_error (struct casereader *reader)
197 taint_set_taint (reader->taint);
200 /* Returns READER's associate taint object, for use with
201 taint_propagate and other taint functions. */
203 casereader_get_taint (const struct casereader *reader)
205 return reader->taint;
208 /* Returns the number of cases that will be read by successive
209 calls to casereader_read for READER, assuming that no errors
210 occur. Upon an error condition, the case count drops to 0, so
211 that no more cases can be obtained.
213 Not all casereaders can predict the number of cases that they
214 will produce without actually reading all of them. In that
215 case, this function returns CASENUMBER_MAX. To obtain the
216 actual number of cases in such a casereader, use
217 casereader_count_cases. */
219 casereader_get_case_cnt (struct casereader *reader)
221 return reader->case_cnt;
224 /* Returns the number of cases that will be read by successive
225 calls to casereader_read for READER, assuming that no errors
226 occur. Upon an error condition, the case count drops to 0, so
227 that no more cases can be obtained.
229 For a casereader that cannot predict the number of cases it
230 will produce, this function actually reads (and discards) all
231 of the contents of a clone of READER. Thus, the return value
232 is always correct in the absence of I/O errors. */
234 casereader_count_cases (struct casereader *reader)
236 if (reader->case_cnt == CASENUMBER_MAX)
238 casenumber n_cases = 0;
241 struct casereader *clone = casereader_clone (reader);
243 for (; casereader_read (clone, &c); case_destroy (&c))
246 casereader_destroy (clone);
247 reader->case_cnt = n_cases;
250 return reader->case_cnt;
253 /* Returns the number of struct values in each case in READER. */
255 casereader_get_value_cnt (struct casereader *reader)
257 return reader->value_cnt;
260 /* Copies all the cases in READER to WRITER, propagating errors
263 casereader_transfer (struct casereader *reader, struct casewriter *writer)
267 taint_propagate (casereader_get_taint (reader),
268 casewriter_get_taint (writer));
269 while (casereader_read (reader, &c))
270 casewriter_write (writer, &c);
271 casereader_destroy (reader);
274 /* Creates and returns a new casereader. This function is
275 intended for use by casereader implementations, not by
278 This function is most suited for creating a casereader for a
279 data source that is naturally sequential.
280 casereader_create_random may be more appropriate for a data
281 source that supports random access.
283 Ordinarily, specify a null pointer for TAINT, in which case
284 the new casereader will have a new, unique taint object. If
285 the new casereader should have a clone of an existing taint
286 object, specify that object as TAINT. (This is most commonly
287 useful in an implementation of the "clone" casereader_class
288 function, in which case the cloned casereader should have the
289 same taint object as the original casereader.)
291 VALUE_CNT must be the number of struct values per case read
294 CASE_CNT is an upper limit on the number of cases that
295 casereader_read will return from the casereader in successive
296 calls. Ordinarily, this is the actual number of cases in the
297 data source or CASENUMBER_MAX if the number of cases cannot be
298 predicted in advance.
300 CLASS and AUX are a set of casereader implementation-specific
301 member functions and auxiliary data to pass to those member
302 functions, respectively. */
304 casereader_create_sequential (const struct taint *taint,
305 size_t value_cnt, casenumber case_cnt,
306 const struct casereader_class *class, void *aux)
308 struct casereader *reader = xmalloc (sizeof *reader);
309 reader->taint = taint != NULL ? taint_clone (taint) : taint_create ();
310 reader->value_cnt = value_cnt;
311 reader->case_cnt = case_cnt;
312 reader->class = class;
317 /* Random-access casereader implementation.
319 This is a set of wrappers around casereader_create_sequential
320 and struct casereader_class to make it easy to create
321 efficient casereaders for data sources that natively support
324 /* One clone of a random reader. */
327 struct random_reader_shared *shared; /* Data shared among clones. */
328 struct heap_node heap_node; /* Node in shared data's heap of readers. */
329 casenumber offset; /* Number of cases already read. */
332 /* Returns the random_reader in which the given heap_node is
334 static struct random_reader *
335 random_reader_from_heap_node (const struct heap_node *node)
337 return heap_data (node, struct random_reader, heap_node);
340 /* Data shared among clones of a random reader. */
341 struct random_reader_shared
343 struct heap *readers; /* Heap of struct random_readers. */
344 casenumber min_offset; /* Smallest offset of any random_reader. */
345 const struct casereader_random_class *class;
349 static struct casereader_class random_reader_casereader_class;
351 /* Creates and returns a new random_reader with the given SHARED
352 data and OFFSET. Inserts the new random reader into the
354 static struct random_reader *
355 make_random_reader (struct random_reader_shared *shared, casenumber offset)
357 struct random_reader *br = xmalloc (sizeof *br);
360 heap_insert (shared->readers, &br->heap_node);
364 /* Compares random_readers A and B by offset and returns a
365 strcmp()-like result. */
367 compare_random_readers_by_offset (const struct heap_node *a_,
368 const struct heap_node *b_,
369 const void *aux UNUSED)
371 const struct random_reader *a = random_reader_from_heap_node (a_);
372 const struct random_reader *b = random_reader_from_heap_node (b_);
373 return a->offset < b->offset ? -1 : a->offset > b->offset;
376 /* Creates and returns a new casereader. This function is
377 intended for use by casereader implementations, not by
380 This function is most suited for creating a casereader for a
381 data source that supports random access.
382 casereader_create_sequential is more appropriate for a data
383 source that is naturally sequential.
385 VALUE_CNT must be the number of struct values per case read
388 CASE_CNT is an upper limit on the number of cases that
389 casereader_read will return from the casereader in successive
390 calls. Ordinarily, this is the actual number of cases in the
391 data source or CASENUMBER_MAX if the number of cases cannot be
392 predicted in advance.
394 CLASS and AUX are a set of casereader implementation-specific
395 member functions and auxiliary data to pass to those member
396 functions, respectively. */
398 casereader_create_random (size_t value_cnt, casenumber case_cnt,
399 const struct casereader_random_class *class,
402 struct random_reader_shared *shared = xmalloc (sizeof *shared);
403 shared->readers = heap_create (compare_random_readers_by_offset, NULL);
404 shared->class = class;
406 shared->min_offset = 0;
407 return casereader_create_sequential (NULL, value_cnt, case_cnt,
408 &random_reader_casereader_class,
409 make_random_reader (shared, 0));
412 /* Reassesses the min_offset in SHARED based on the minimum
413 offset in the heap. */
415 advance_random_reader (struct casereader *reader,
416 struct random_reader_shared *shared)
420 old = shared->min_offset;
421 new = random_reader_from_heap_node (heap_minimum (shared->readers))->offset;
425 shared->min_offset = new;
426 shared->class->advance (reader, shared->aux, new - old);
430 /* struct casereader_class "read" function for random reader. */
432 random_reader_read (struct casereader *reader, void *br_, struct ccase *c)
434 struct random_reader *br = br_;
435 struct random_reader_shared *shared = br->shared;
437 if (shared->class->read (reader, shared->aux,
438 br->offset - shared->min_offset, c))
441 heap_changed (shared->readers, &br->heap_node);
442 advance_random_reader (reader, shared);
449 /* struct casereader_class "destroy" function for random
452 random_reader_destroy (struct casereader *reader, void *br_)
454 struct random_reader *br = br_;
455 struct random_reader_shared *shared = br->shared;
457 heap_delete (shared->readers, &br->heap_node);
458 if (heap_is_empty (shared->readers))
460 heap_destroy (shared->readers);
461 shared->class->destroy (reader, shared->aux);
465 advance_random_reader (reader, shared);
470 /* struct casereader_class "clone" function for random reader. */
471 static struct casereader *
472 random_reader_clone (struct casereader *reader, void *br_)
474 struct random_reader *br = br_;
475 struct random_reader_shared *shared = br->shared;
476 return casereader_create_sequential (casereader_get_taint (reader),
477 casereader_get_value_cnt (reader),
478 casereader_get_case_cnt (reader),
479 &random_reader_casereader_class,
480 make_random_reader (shared,
484 /* struct casereader_class "peek" function for random reader. */
486 random_reader_peek (struct casereader *reader, void *br_,
487 casenumber idx, struct ccase *c)
489 struct random_reader *br = br_;
490 struct random_reader_shared *shared = br->shared;
492 return shared->class->read (reader, shared->aux,
493 br->offset - shared->min_offset + idx, c);
496 /* Casereader class for random reader. */
497 static struct casereader_class random_reader_casereader_class =
500 random_reader_destroy,
505 /* Buffering shim for implementing clone and peek operations.
507 The "clone" and "peek" operations aren't implemented by all
508 types of casereaders, but we have to expose a uniform
509 interface anyhow. We do this by interposing a buffering
510 casereader on top of the existing casereader on the first call
511 to "clone" or "peek". The buffering casereader maintains a
512 window of cases that spans the positions of the original
513 casereader and all of its clones (the "clone set"), from the
514 position of the casereader that has read the fewest cases to
515 the position of the casereader that has read the most.
517 Thus, if all of the casereaders in the clone set are at
518 approximately the same position, only a few cases are buffered
519 and there is little inefficiency. If, on the other hand, one
520 casereader is not used to read any cases at all, but another
521 one is used to read all of the cases, the entire contents of
522 the casereader is copied into the buffer. This still might
523 not be so inefficient, given that case data in memory is
524 shared across multiple identical copies, but in the worst case
525 the window implementation will write cases to disk instead of
526 maintaining them in-memory. */
528 /* A buffering shim for a non-clonable or non-peekable
532 struct casewindow *window; /* Window of buffered cases. */
533 struct casereader *subreader; /* Subordinate casereader. */
536 static struct casereader_random_class shim_class;
538 /* Interposes a buffering shim atop READER. */
540 insert_shim (struct casereader *reader)
542 size_t value_cnt = casereader_get_value_cnt (reader);
543 casenumber case_cnt = casereader_get_case_cnt (reader);
544 struct shim *b = xmalloc (sizeof *b);
545 b->window = casewindow_create (value_cnt, get_workspace_cases (value_cnt));
546 b->subreader = casereader_create_random (value_cnt, case_cnt,
548 casereader_swap (reader, b->subreader);
549 taint_propagate (casewindow_get_taint (b->window),
550 casereader_get_taint (reader));
551 taint_propagate (casereader_get_taint (b->subreader),
552 casereader_get_taint (reader));
555 /* Ensures that B's window contains at least CASE_CNT cases.
556 Return true if successful, false upon reaching the end of B's
557 subreader or an I/O error. */
559 prime_buffer (struct shim *b, casenumber case_cnt)
561 while (casewindow_get_case_cnt (b->window) < case_cnt)
564 if (!casereader_read (b->subreader, &tmp))
566 casewindow_push_head (b->window, &tmp);
571 /* Reads the case at the given 0-based OFFSET from the front of
572 the window into C. Returns true if successful, false if
573 OFFSET is beyond the end of file or upon I/O error. */
575 shim_read (struct casereader *reader UNUSED, void *b_,
576 casenumber offset, struct ccase *c)
579 return (prime_buffer (b, offset + 1)
580 && casewindow_get_case (b->window, offset, c));
585 shim_destroy (struct casereader *reader UNUSED, void *b_)
588 casewindow_destroy (b->window);
589 casereader_destroy (b->subreader);
593 /* Discards CNT cases from the front of B's window. */
595 shim_advance (struct casereader *reader UNUSED, void *b_, casenumber case_cnt)
598 casewindow_pop_tail (b->window, case_cnt);
601 /* Class for the buffered reader. */
602 static struct casereader_random_class shim_class =