1 /* PSPP - a program for statistical analysis.
2 Copyright (C) 2007, 2009, 2010 Free Software Foundation, Inc.
4 This program is free software: you can redistribute it and/or modify
5 it under the terms of the GNU General Public License as published by
6 the Free Software Foundation, either version 3 of the License, or
7 (at your option) any later version.
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 GNU General Public License for more details.
14 You should have received a copy of the GNU General Public License
15 along with this program. If not, see <http://www.gnu.org/licenses/>. */
19 #include "data/casereader.h"
20 #include "data/casereader-provider.h"
24 #include "data/casereader-shim.h"
25 #include "data/casewriter.h"
26 #include "libpspp/assertion.h"
27 #include "libpspp/heap.h"
28 #include "libpspp/taint.h"
30 #include "gl/xalloc.h"
35 struct taint *taint; /* Corrupted? */
36 struct caseproto *proto; /* Format of contained cases. */
37 casenumber case_cnt; /* Number of cases,
38 CASENUMBER_MAX if unknown. */
39 const struct casereader_class *class; /* Class. */
40 void *aux; /* Auxiliary data for class. */
43 /* Reads and returns the next case from READER. The caller owns
44 the returned case and must call case_unref on it when its data
45 is no longer needed. Returns a null pointer if cases have
46 been exhausted or upon detection of an I/O error.
48 The case returned is effectively consumed: it can never be
49 read again through READER. If this is inconvenient, READER
50 may be cloned in advance with casereader_clone, or
51 casereader_peek may be used instead. */
53 casereader_read (struct casereader *reader)
55 if (reader->case_cnt != 0)
57 /* ->read may use casereader_swap to replace itself by
58 another reader and then delegate to that reader by
59 recursively calling casereader_read. Currently only
60 lazy_casereader does this and, with luck, nothing else
63 To allow this to work, however, we must decrement
64 case_cnt before calling ->read. If we decremented
65 case_cnt after calling ->read, then this would actually
66 drop two cases from case_cnt instead of one, and we'd
67 lose the last case in the casereader. */
69 if (reader->case_cnt != CASENUMBER_MAX)
71 c = reader->class->read (reader, reader->aux);
74 size_t n_widths UNUSED = caseproto_get_n_widths (reader->proto);
75 assert (case_get_value_cnt (c) >= n_widths);
76 expensive_assert (caseproto_equal (case_get_proto (c), 0,
77 reader->proto, 0, n_widths));
86 Returns false if an I/O error was detected on READER, true
89 casereader_destroy (struct casereader *reader)
94 reader->class->destroy (reader, reader->aux);
95 ok = taint_destroy (reader->taint);
96 caseproto_unref (reader->proto);
102 /* Returns a clone of READER. READER and its clone may be used
103 to read the same sequence of cases in the same order, barring
106 casereader_clone (const struct casereader *reader_)
108 struct casereader *reader = CONST_CAST (struct casereader *, reader_);
109 struct casereader *clone;
110 if ( reader == NULL )
113 if (reader->class->clone == NULL)
114 casereader_shim_insert (reader);
115 clone = reader->class->clone (reader, reader->aux);
116 assert (clone != NULL);
117 assert (clone != reader);
121 /* Makes a copy of ORIGINAL into *NEW1 (if NEW1 is non-null) and
122 *NEW2 (if NEW2 is non-null), then destroys ORIGINAL. */
124 casereader_split (struct casereader *original,
125 struct casereader **new1, struct casereader **new2)
127 if (new1 != NULL && new2 != NULL)
129 *new1 = casereader_rename (original);
130 *new2 = casereader_clone (*new1);
132 else if (new1 != NULL)
133 *new1 = casereader_rename (original);
134 else if (new2 != NULL)
135 *new2 = casereader_rename (original);
137 casereader_destroy (original);
140 /* Returns a copy of READER, which is itself destroyed.
141 Useful for taking over ownership of a casereader, to enforce
142 preventing the original owner from accessing the casereader
145 casereader_rename (struct casereader *reader)
147 struct casereader *new = xmemdup (reader, sizeof *reader);
152 /* Exchanges the casereaders referred to by A and B. */
154 casereader_swap (struct casereader *a, struct casereader *b)
158 struct casereader tmp = *a;
164 /* Reads and returns the (IDX + 1)'th case from READER. The
165 caller owns the returned case and must call case_unref on it
166 when it is no longer needed. Returns a null pointer if cases
167 have been exhausted or upon detection of an I/O error. */
169 casereader_peek (struct casereader *reader, casenumber idx)
171 if (idx < reader->case_cnt)
174 if (reader->class->peek == NULL)
175 casereader_shim_insert (reader);
176 c = reader->class->peek (reader, reader->aux, idx);
179 else if (casereader_error (reader))
180 reader->case_cnt = 0;
182 if (reader->case_cnt > idx)
183 reader->case_cnt = idx;
187 /* Returns true if no cases remain to be read from READER, or if
188 an error has occurred on READER. (A return value of false
189 does *not* mean that the next call to casereader_peek or
190 casereader_read will return a case, because an error can occur
193 casereader_is_empty (struct casereader *reader)
195 if (reader->case_cnt == 0)
199 struct ccase *c = casereader_peek (reader, 0);
210 /* Returns true if an I/O error or another hard error has
211 occurred on READER, a clone of READER, or on some object on
212 which READER's data has a dependency, false otherwise. */
214 casereader_error (const struct casereader *reader)
216 return taint_is_tainted (reader->taint);
219 /* Marks READER as having encountered an error.
221 Ordinarily, this function should be called by the
222 implementation of a casereader, not by the casereader's
223 client. Instead, casereader clients should usually ensure
224 that a casereader's error state is correct by using
225 taint_propagate to propagate to the casereader's taint
226 structure, which may be obtained via casereader_get_taint. */
228 casereader_force_error (struct casereader *reader)
230 taint_set_taint (reader->taint);
233 /* Returns READER's associated taint object, for use with
234 taint_propagate and other taint functions. */
236 casereader_get_taint (const struct casereader *reader)
238 return reader->taint;
241 /* Returns the number of cases that will be read by successive
242 calls to casereader_read for READER, assuming that no errors
243 occur. Upon an error condition, the case count drops to 0, so
244 that no more cases can be obtained.
246 Not all casereaders can predict the number of cases that they
247 will produce without actually reading all of them. In that
248 case, this function returns CASENUMBER_MAX. To obtain the
249 actual number of cases in such a casereader, use
250 casereader_count_cases. */
252 casereader_get_case_cnt (struct casereader *reader)
254 return reader->case_cnt;
258 casereader_count_cases__ (const struct casereader *reader,
259 casenumber max_cases)
261 struct casereader *clone;
264 clone = casereader_clone (reader);
265 n_cases = casereader_advance (clone, max_cases);
266 casereader_destroy (clone);
271 /* Returns the number of cases that will be read by successive
272 calls to casereader_read for READER, assuming that no errors
273 occur. Upon an error condition, the case count drops to 0, so
274 that no more cases can be obtained.
276 For a casereader that cannot predict the number of cases it
277 will produce, this function actually reads (and discards) all
278 of the contents of a clone of READER. Thus, the return value
279 is always correct in the absence of I/O errors. */
281 casereader_count_cases (const struct casereader *reader)
283 if (reader->case_cnt == CASENUMBER_MAX)
285 struct casereader *reader_rw = CONST_CAST (struct casereader *, reader);
286 reader_rw->case_cnt = casereader_count_cases__ (reader, CASENUMBER_MAX);
288 return reader->case_cnt;
291 /* Truncates READER to at most N cases. */
293 casereader_truncate (struct casereader *reader, casenumber n)
295 /* This could be optimized, if it ever becomes too expensive, by adding a
296 "max_cases" member to struct casereader. We could also add a "truncate"
297 function to the casereader implementation, to allow the casereader to
298 throw away data that cannot ever be read. */
299 if (reader->case_cnt == CASENUMBER_MAX)
300 reader->case_cnt = casereader_count_cases__ (reader, n);
301 if (reader->case_cnt > n)
302 reader->case_cnt = n;
305 /* Returns the prototype for the cases in READER. The caller
306 must not unref the returned prototype. */
307 const struct caseproto *
308 casereader_get_proto (const struct casereader *reader)
310 return reader->proto;
313 /* Skips past N cases in READER, stopping when the last case in
314 READER has been read or on an input error. Returns the number
315 of cases successfully skipped. */
317 casereader_advance (struct casereader *reader, casenumber n)
321 for (i = 0; i < n; i++)
323 struct ccase *c = casereader_read (reader);
333 /* Copies all the cases in READER to WRITER, propagating errors
334 appropriately. READER is destroyed by this function. */
336 casereader_transfer (struct casereader *reader, struct casewriter *writer)
340 taint_propagate (casereader_get_taint (reader),
341 casewriter_get_taint (writer));
342 while ((c = casereader_read (reader)) != NULL)
343 casewriter_write (writer, c);
344 casereader_destroy (reader);
347 /* Creates and returns a new casereader. This function is
348 intended for use by casereader implementations, not by
351 This function is most suited for creating a casereader for a
352 data source that is naturally sequential.
353 casereader_create_random may be more appropriate for a data
354 source that supports random access.
356 Ordinarily, specify a null pointer for TAINT, in which case
357 the new casereader will have a new, unique taint object. If
358 the new casereader should have a clone of an existing taint
359 object, specify that object as TAINT. (This is most commonly
360 useful in an implementation of the "clone" casereader_class
361 function, in which case the cloned casereader should have the
362 same taint object as the original casereader.)
364 PROTO must be the prototype for the cases that may be read
365 from the casereader. The caller retains its reference to
368 CASE_CNT is an upper limit on the number of cases that
369 casereader_read will return from the casereader in successive
370 calls. Ordinarily, this is the actual number of cases in the
371 data source or CASENUMBER_MAX if the number of cases cannot be
372 predicted in advance.
374 CLASS and AUX are a set of casereader implementation-specific
375 member functions and auxiliary data to pass to those member
376 functions, respectively. */
378 casereader_create_sequential (const struct taint *taint,
379 const struct caseproto *proto,
381 const struct casereader_class *class, void *aux)
383 struct casereader *reader = xmalloc (sizeof *reader);
384 reader->taint = taint != NULL ? taint_clone (taint) : taint_create ();
385 reader->proto = caseproto_ref (proto);
386 reader->case_cnt = case_cnt;
387 reader->class = class;
392 /* If READER is a casereader of the given CLASS, returns its
393 associated auxiliary data; otherwise, returns a null pointer.
395 This function is intended for use from casereader
396 implementations, not by casereader users. Even within
397 casereader implementations, its usefulness is quite limited,
398 for at least two reasons. First, every casereader member
399 function already receives a pointer to the casereader's
400 auxiliary data. Second, a casereader's class can change
401 (through a call to casereader_swap) and this is in practice
402 quite common (e.g. any call to casereader_clone on a
403 casereader that does not directly support clone will cause the
404 casereader to be replaced by a shim casereader). */
406 casereader_dynamic_cast (struct casereader *reader,
407 const struct casereader_class *class)
409 return reader->class == class ? reader->aux : NULL;
412 /* Random-access casereader implementation.
414 This is a set of wrappers around casereader_create_sequential
415 and struct casereader_class to make it easy to create
416 efficient casereaders for data sources that natively support
419 /* One clone of a random reader. */
422 struct random_reader_shared *shared; /* Data shared among clones. */
423 struct heap_node heap_node; /* Node in shared data's heap of readers. */
424 casenumber offset; /* Number of cases already read. */
427 /* Returns the random_reader in which the given heap_node is
429 static struct random_reader *
430 random_reader_from_heap_node (const struct heap_node *node)
432 return heap_data (node, struct random_reader, heap_node);
435 /* Data shared among clones of a random reader. */
436 struct random_reader_shared
438 struct heap *readers; /* Heap of struct random_readers. */
439 casenumber min_offset; /* Smallest offset of any random_reader. */
440 const struct casereader_random_class *class;
444 static const struct casereader_class random_reader_casereader_class;
446 /* Creates and returns a new random_reader with the given SHARED
447 data and OFFSET. Inserts the new random reader into the
449 static struct random_reader *
450 make_random_reader (struct random_reader_shared *shared, casenumber offset)
452 struct random_reader *br = xmalloc (sizeof *br);
455 heap_insert (shared->readers, &br->heap_node);
459 /* Compares random_readers A and B by offset and returns a
460 strcmp()-like result. */
462 compare_random_readers_by_offset (const struct heap_node *a_,
463 const struct heap_node *b_,
464 const void *aux UNUSED)
466 const struct random_reader *a = random_reader_from_heap_node (a_);
467 const struct random_reader *b = random_reader_from_heap_node (b_);
468 return a->offset < b->offset ? -1 : a->offset > b->offset;
471 /* Creates and returns a new casereader. This function is
472 intended for use by casereader implementations, not by
475 This function is most suited for creating a casereader for a
476 data source that supports random access.
477 casereader_create_sequential is more appropriate for a data
478 source that is naturally sequential.
480 PROTO must be the prototype for the cases that may be read
481 from the casereader. The caller retains its reference to
484 CASE_CNT is an upper limit on the number of cases that
485 casereader_read will return from the casereader in successive
486 calls. Ordinarily, this is the actual number of cases in the
487 data source or CASENUMBER_MAX if the number of cases cannot be
488 predicted in advance.
490 CLASS and AUX are a set of casereader implementation-specific
491 member functions and auxiliary data to pass to those member
492 functions, respectively. */
494 casereader_create_random (const struct caseproto *proto, casenumber case_cnt,
495 const struct casereader_random_class *class,
498 struct random_reader_shared *shared = xmalloc (sizeof *shared);
499 shared->readers = heap_create (compare_random_readers_by_offset, NULL);
500 shared->class = class;
502 shared->min_offset = 0;
503 return casereader_create_sequential (NULL, proto, case_cnt,
504 &random_reader_casereader_class,
505 make_random_reader (shared, 0));
508 /* Reassesses the min_offset in SHARED based on the minimum
509 offset in the heap. */
511 advance_random_reader (struct casereader *reader,
512 struct random_reader_shared *shared)
516 old = shared->min_offset;
517 new = random_reader_from_heap_node (heap_minimum (shared->readers))->offset;
521 shared->min_offset = new;
522 shared->class->advance (reader, shared->aux, new - old);
526 /* struct casereader_class "read" function for random reader. */
527 static struct ccase *
528 random_reader_read (struct casereader *reader, void *br_)
530 struct random_reader *br = br_;
531 struct random_reader_shared *shared = br->shared;
532 struct ccase *c = shared->class->read (reader, shared->aux,
533 br->offset - shared->min_offset);
537 heap_changed (shared->readers, &br->heap_node);
538 advance_random_reader (reader, shared);
543 /* struct casereader_class "destroy" function for random
546 random_reader_destroy (struct casereader *reader, void *br_)
548 struct random_reader *br = br_;
549 struct random_reader_shared *shared = br->shared;
551 heap_delete (shared->readers, &br->heap_node);
552 if (heap_is_empty (shared->readers))
554 heap_destroy (shared->readers);
555 shared->class->destroy (reader, shared->aux);
559 advance_random_reader (reader, shared);
564 /* struct casereader_class "clone" function for random reader. */
565 static struct casereader *
566 random_reader_clone (struct casereader *reader, void *br_)
568 struct random_reader *br = br_;
569 struct random_reader_shared *shared = br->shared;
570 return casereader_create_sequential (casereader_get_taint (reader),
572 casereader_get_case_cnt (reader),
573 &random_reader_casereader_class,
574 make_random_reader (shared,
578 /* struct casereader_class "peek" function for random reader. */
579 static struct ccase *
580 random_reader_peek (struct casereader *reader, void *br_, casenumber idx)
582 struct random_reader *br = br_;
583 struct random_reader_shared *shared = br->shared;
585 return shared->class->read (reader, shared->aux,
586 br->offset - shared->min_offset + idx);
589 /* Casereader class for random reader. */
590 static const struct casereader_class random_reader_casereader_class =
593 random_reader_destroy,
599 static const struct casereader_class casereader_null_class;
601 /* Returns a casereader with no cases. The casereader has the prototype
602 specified by PROTO. PROTO may be specified as a null pointer, in which case
603 the casereader has no variables. */
605 casereader_create_empty (const struct caseproto *proto_)
607 struct casereader *reader;
608 struct caseproto *proto;
610 proto = proto_ != NULL ? caseproto_ref (proto_) : caseproto_create ();
611 reader = casereader_create_sequential (NULL, proto, 0,
612 &casereader_null_class, NULL);
613 caseproto_unref (proto);
618 static struct ccase *
619 casereader_null_read (struct casereader *reader UNUSED, void *aux UNUSED)
625 casereader_null_destroy (struct casereader *reader UNUSED, void *aux UNUSED)
630 static const struct casereader_class casereader_null_class =
632 casereader_null_read,
633 casereader_null_destroy,