1 /* PSPP - a program for statistical analysis.
2 Copyright (C) 2007, 2009, 2010, 2013 Free Software Foundation, Inc.
4 This program is free software: you can redistribute it and/or modify
5 it under the terms of the GNU General Public License as published by
6 the Free Software Foundation, either version 3 of the License, or
7 (at your option) any later version.
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 GNU General Public License for more details.
14 You should have received a copy of the GNU General Public License
15 along with this program. If not, see <http://www.gnu.org/licenses/>. */
19 #include "data/casereader.h"
20 #include "data/casereader-provider.h"
24 #include "data/casereader-shim.h"
25 #include "data/casewriter.h"
26 #include "libpspp/assertion.h"
27 #include "libpspp/heap.h"
28 #include "libpspp/taint.h"
30 #include "gl/xalloc.h"
35 struct taint *taint; /* Corrupted? */
36 struct caseproto *proto; /* Format of contained cases. */
37 casenumber n_cases; /* Number of cases,
38 CASENUMBER_MAX if unknown. */
39 const struct casereader_class *class; /* Class. */
40 void *aux; /* Auxiliary data for class. */
43 /* Reads and returns the next case from READER. The caller owns
44 the returned case and must call case_unref on it when its data
45 is no longer needed. Returns a null pointer if cases have
46 been exhausted or upon detection of an I/O error.
48 The case returned is effectively consumed: it can never be
49 read again through READER. If this is inconvenient, READER
50 may be cloned in advance with casereader_clone, or
51 casereader_peek may be used instead. */
53 casereader_read (struct casereader *reader)
55 if (reader->n_cases != 0)
57 /* ->read may use casereader_swap to replace itself by
58 another reader and then delegate to that reader by
59 recursively calling casereader_read. Currently only
60 lazy_casereader does this and, with luck, nothing else
63 To allow this to work, however, we must decrement
64 n_cases before calling ->read. If we decremented
65 n_cases after calling ->read, then this would actually
66 drop two cases from n_cases instead of one, and we'd
67 lose the last case in the casereader. */
69 if (reader->n_cases != CASENUMBER_MAX)
71 c = reader->class->read (reader, reader->aux);
74 size_t n_widths UNUSED = caseproto_get_n_widths (reader->proto);
75 assert (case_get_n_values (c) >= n_widths);
76 expensive_assert (caseproto_equal (case_get_proto (c), 0,
77 reader->proto, 0, n_widths));
86 Returns false if an I/O error was detected on READER, true
89 casereader_destroy (struct casereader *reader)
94 reader->class->destroy (reader, reader->aux);
95 ok = taint_destroy (reader->taint);
96 caseproto_unref (reader->proto);
102 /* Returns a clone of READER. READER and its clone may be used
103 to read the same sequence of cases in the same order, barring
106 casereader_clone (const struct casereader *reader_)
108 struct casereader *reader = CONST_CAST (struct casereader *, reader_);
109 struct casereader *clone;
113 if (reader->class->clone == NULL)
114 casereader_shim_insert (reader);
115 clone = reader->class->clone (reader, reader->aux);
116 assert (clone != NULL);
117 assert (clone != reader);
121 /* Returns a copy of READER, which is itself destroyed.
122 Useful for taking over ownership of a casereader, to enforce
123 preventing the original owner from accessing the casereader
126 casereader_rename (struct casereader *reader)
128 struct casereader *new = xmemdup (reader, sizeof *reader);
133 /* Exchanges the casereaders referred to by A and B. */
135 casereader_swap (struct casereader *a, struct casereader *b)
139 struct casereader tmp = *a;
145 /* Reads and returns the (IDX + 1)'th case from READER. The
146 caller owns the returned case and must call case_unref on it
147 when it is no longer needed. Returns a null pointer if cases
148 have been exhausted or upon detection of an I/O error. */
150 casereader_peek (struct casereader *reader, casenumber idx)
152 if (idx < reader->n_cases)
155 if (reader->class->peek == NULL)
156 casereader_shim_insert (reader);
157 c = reader->class->peek (reader, reader->aux, idx);
160 else if (casereader_error (reader))
163 if (reader->n_cases > idx)
164 reader->n_cases = idx;
168 /* Returns true if no cases remain to be read from READER, or if
169 an error has occurred on READER. (A return value of false
170 does *not* mean that the next call to casereader_peek or
171 casereader_read will return true, because an error can occur
174 casereader_is_empty (struct casereader *reader)
176 if (reader->n_cases == 0)
180 struct ccase *c = casereader_peek (reader, 0);
191 /* Returns true if an I/O error or another hard error has
192 occurred on READER, a clone of READER, or on some object on
193 which READER's data has a dependency, false otherwise. */
195 casereader_error (const struct casereader *reader)
197 return taint_is_tainted (reader->taint);
200 /* Marks READER as having encountered an error.
202 Ordinarily, this function should be called by the
203 implementation of a casereader, not by the casereader's
204 client. Instead, casereader clients should usually ensure
205 that a casereader's error state is correct by using
206 taint_propagate to propagate to the casereader's taint
207 structure, which may be obtained via casereader_get_taint. */
209 casereader_force_error (struct casereader *reader)
211 taint_set_taint (reader->taint);
214 /* Returns READER's associate taint object, for use with
215 taint_propagate and other taint functions. */
217 casereader_get_taint (const struct casereader *reader)
219 return reader->taint;
222 /* Returns the number of cases that will be read by successive
223 calls to casereader_read for READER, assuming that no errors
224 occur. Upon an error condition, the case count drops to 0, so
225 that no more cases can be obtained.
227 Not all casereaders can predict the number of cases that they
228 will produce without actually reading all of them. In that
229 case, this function returns CASENUMBER_MAX. To obtain the
230 actual number of cases in such a casereader, use
231 casereader_count_cases. */
233 casereader_get_n_cases (struct casereader *reader)
235 return reader->n_cases;
239 casereader_count_cases__ (const struct casereader *reader,
240 casenumber max_cases)
242 struct casereader *clone;
245 clone = casereader_clone (reader);
246 n_cases = casereader_advance (clone, max_cases);
247 casereader_destroy (clone);
252 /* Returns the number of cases that will be read by successive
253 calls to casereader_read for READER, assuming that no errors
254 occur. Upon an error condition, the case count drops to 0, so
255 that no more cases can be obtained.
257 For a casereader that cannot predict the number of cases it
258 will produce, this function actually reads (and discards) all
259 of the contents of a clone of READER. Thus, the return value
260 is always correct in the absence of I/O errors. */
262 casereader_count_cases (const struct casereader *reader)
264 if (reader->n_cases == CASENUMBER_MAX)
266 struct casereader *reader_rw = CONST_CAST (struct casereader *, reader);
267 reader_rw->n_cases = casereader_count_cases__ (reader, CASENUMBER_MAX);
269 return reader->n_cases;
272 /* Truncates READER to at most N cases. */
274 casereader_truncate (struct casereader *reader, casenumber n)
276 /* This could be optimized, if it ever becomes too expensive, by adding a
277 "max_cases" member to struct casereader. We could also add a "truncate"
278 function to the casereader implementation, to allow the casereader to
279 throw away data that cannot ever be read. */
280 if (reader->n_cases == CASENUMBER_MAX)
281 reader->n_cases = casereader_count_cases__ (reader, n);
282 if (reader->n_cases > n)
286 /* Returns the prototype for the cases in READER. The caller
287 must not unref the returned prototype. */
288 const struct caseproto *
289 casereader_get_proto (const struct casereader *reader)
291 return reader->proto;
294 /* Skips past N cases in READER, stopping when the last case in
295 READER has been read or on an input error. Returns the number
296 of cases successfully skipped. */
298 casereader_advance (struct casereader *reader, casenumber n)
302 for (i = 0; i < n; i++)
304 struct ccase *c = casereader_read (reader);
314 /* Copies all the cases in READER to WRITER, propagating errors
315 appropriately. READER is destroyed by this function */
317 casereader_transfer (struct casereader *reader, struct casewriter *writer)
321 taint_propagate (casereader_get_taint (reader),
322 casewriter_get_taint (writer));
323 while ((c = casereader_read (reader)) != NULL)
324 casewriter_write (writer, c);
325 casereader_destroy (reader);
328 /* Creates and returns a new casereader. This function is
329 intended for use by casereader implementations, not by
332 This function is most suited for creating a casereader for a
333 data source that is naturally sequential.
334 casereader_create_random may be more appropriate for a data
335 source that supports random access.
337 Ordinarily, specify a null pointer for TAINT, in which case
338 the new casereader will have a new, unique taint object. If
339 the new casereader should have a clone of an existing taint
340 object, specify that object as TAINT. (This is most commonly
341 useful in an implementation of the "clone" casereader_class
342 function, in which case the cloned casereader should have the
343 same taint object as the original casereader.)
345 PROTO must be the prototype for the cases that may be read
346 from the casereader. The caller retains its reference to
349 CASE_CNT is an upper limit on the number of cases that
350 casereader_read will return from the casereader in successive
351 calls. Ordinarily, this is the actual number of cases in the
352 data source or CASENUMBER_MAX if the number of cases cannot be
353 predicted in advance.
355 CLASS and AUX are a set of casereader implementation-specific
356 member functions and auxiliary data to pass to those member
357 functions, respectively. */
359 casereader_create_sequential (const struct taint *taint,
360 const struct caseproto *proto,
362 const struct casereader_class *class, void *aux)
364 struct casereader *reader = xmalloc (sizeof *reader);
365 reader->taint = taint != NULL ? taint_clone (taint) : taint_create ();
366 reader->proto = caseproto_ref (proto);
367 reader->n_cases = n_cases;
368 reader->class = class;
373 /* If READER is a casereader of the given CLASS, returns its
374 associated auxiliary data; otherwise, returns a null pointer.
376 This function is intended for use from casereader
377 implementations, not by casereader users. Even within
378 casereader implementations, its usefulness is quite limited,
379 for at least two reasons. First, every casereader member
380 function already receives a pointer to the casereader's
381 auxiliary data. Second, a casereader's class can change
382 (through a call to casereader_swap) and this is in practice
383 quite common (e.g. any call to casereader_clone on a
384 casereader that does not directly support clone will cause the
385 casereader to be replaced by a shim caseader). */
387 casereader_dynamic_cast (struct casereader *reader,
388 const struct casereader_class *class)
390 return reader->class == class ? reader->aux : NULL;
393 /* Random-access casereader implementation.
395 This is a set of wrappers around casereader_create_sequential
396 and struct casereader_class to make it easy to create
397 efficient casereaders for data sources that natively support
400 /* One clone of a random reader. */
403 struct random_reader_shared *shared; /* Data shared among clones. */
404 struct heap_node heap_node; /* Node in shared data's heap of readers. */
405 casenumber offset; /* Number of cases already read. */
408 /* Returns the random_reader in which the given heap_node is
410 static struct random_reader *
411 random_reader_from_heap_node (const struct heap_node *node)
413 return heap_data (node, struct random_reader, heap_node);
416 /* Data shared among clones of a random reader. */
417 struct random_reader_shared
419 struct heap *readers; /* Heap of struct random_readers. */
420 casenumber min_offset; /* Smallest offset of any random_reader. */
421 const struct casereader_random_class *class;
425 static const struct casereader_class random_reader_casereader_class;
427 /* Creates and returns a new random_reader with the given SHARED
428 data and OFFSET. Inserts the new random reader into the
430 static struct random_reader *
431 make_random_reader (struct random_reader_shared *shared, casenumber offset)
433 struct random_reader *br = xmalloc (sizeof *br);
436 heap_insert (shared->readers, &br->heap_node);
440 /* Compares random_readers A and B by offset and returns a
441 strcmp()-like result. */
443 compare_random_readers_by_offset (const struct heap_node *a_,
444 const struct heap_node *b_,
445 const void *aux UNUSED)
447 const struct random_reader *a = random_reader_from_heap_node (a_);
448 const struct random_reader *b = random_reader_from_heap_node (b_);
449 return a->offset < b->offset ? -1 : a->offset > b->offset;
452 /* Creates and returns a new casereader. This function is
453 intended for use by casereader implementations, not by
456 This function is most suited for creating a casereader for a
457 data source that supports random access.
458 casereader_create_sequential is more appropriate for a data
459 source that is naturally sequential.
461 PROTO must be the prototype for the cases that may be read
462 from the casereader. The caller retains its reference to
465 CASE_CNT is an upper limit on the number of cases that
466 casereader_read will return from the casereader in successive
467 calls. Ordinarily, this is the actual number of cases in the
468 data source or CASENUMBER_MAX if the number of cases cannot be
469 predicted in advance.
471 CLASS and AUX are a set of casereader implementation-specific
472 member functions and auxiliary data to pass to those member
473 functions, respectively. */
475 casereader_create_random (const struct caseproto *proto, casenumber n_cases,
476 const struct casereader_random_class *class,
479 struct random_reader_shared *shared = xmalloc (sizeof *shared);
480 shared->readers = heap_create (compare_random_readers_by_offset, NULL);
481 shared->class = class;
483 shared->min_offset = 0;
484 return casereader_create_sequential (NULL, proto, n_cases,
485 &random_reader_casereader_class,
486 make_random_reader (shared, 0));
489 /* Reassesses the min_offset in SHARED based on the minimum
490 offset in the heap. */
492 advance_random_reader (struct casereader *reader,
493 struct random_reader_shared *shared)
497 old = shared->min_offset;
498 new = random_reader_from_heap_node (heap_minimum (shared->readers))->offset;
502 shared->min_offset = new;
503 shared->class->advance (reader, shared->aux, new - old);
507 /* struct casereader_class "read" function for random reader. */
508 static struct ccase *
509 random_reader_read (struct casereader *reader, void *br_)
511 struct random_reader *br = br_;
512 struct random_reader_shared *shared = br->shared;
513 struct ccase *c = shared->class->read (reader, shared->aux,
514 br->offset - shared->min_offset);
518 heap_changed (shared->readers, &br->heap_node);
519 advance_random_reader (reader, shared);
524 /* struct casereader_class "destroy" function for random
527 random_reader_destroy (struct casereader *reader, void *br_)
529 struct random_reader *br = br_;
530 struct random_reader_shared *shared = br->shared;
532 heap_delete (shared->readers, &br->heap_node);
533 if (heap_is_empty (shared->readers))
535 heap_destroy (shared->readers);
536 shared->class->destroy (reader, shared->aux);
540 advance_random_reader (reader, shared);
545 /* struct casereader_class "clone" function for random reader. */
546 static struct casereader *
547 random_reader_clone (struct casereader *reader, void *br_)
549 struct random_reader *br = br_;
550 struct random_reader_shared *shared = br->shared;
551 return casereader_create_sequential (casereader_get_taint (reader),
553 casereader_get_n_cases (reader),
554 &random_reader_casereader_class,
555 make_random_reader (shared,
559 /* struct casereader_class "peek" function for random reader. */
560 static struct ccase *
561 random_reader_peek (struct casereader *reader, void *br_, casenumber idx)
563 struct random_reader *br = br_;
564 struct random_reader_shared *shared = br->shared;
566 return shared->class->read (reader, shared->aux,
567 br->offset - shared->min_offset + idx);
570 /* Casereader class for random reader. */
571 static const struct casereader_class random_reader_casereader_class =
574 random_reader_destroy,
580 static const struct casereader_class casereader_null_class;
582 /* Returns a casereader with no cases. The casereader has the prototype
583 specified by PROTO. PROTO may be specified as a null pointer, in which case
584 the casereader has no variables. */
586 casereader_create_empty (const struct caseproto *proto_)
588 struct casereader *reader;
589 struct caseproto *proto;
591 proto = proto_ != NULL ? caseproto_ref (proto_) : caseproto_create ();
592 reader = casereader_create_sequential (NULL, proto, 0,
593 &casereader_null_class, NULL);
594 caseproto_unref (proto);
599 static struct ccase *
600 casereader_null_read (struct casereader *reader UNUSED, void *aux UNUSED)
606 casereader_null_destroy (struct casereader *reader UNUSED, void *aux UNUSED)
611 static const struct casereader_class casereader_null_class =
613 casereader_null_read,
614 casereader_null_destroy,