1 /* PSPP - a program for statistical analysis.
2 Copyright (C) 2007, 2009, 2010, 2013 Free Software Foundation, Inc.
4 This program is free software: you can redistribute it and/or modify
5 it under the terms of the GNU General Public License as published by
6 the Free Software Foundation, either version 3 of the License, or
7 (at your option) any later version.
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 GNU General Public License for more details.
14 You should have received a copy of the GNU General Public License
15 along with this program. If not, see <http://www.gnu.org/licenses/>. */
19 #include "data/casereader.h"
20 #include "data/casereader-provider.h"
24 #include "data/casereader-shim.h"
25 #include "data/casewriter.h"
26 #include "libpspp/assertion.h"
27 #include "libpspp/heap.h"
28 #include "libpspp/taint.h"
30 #include "gl/xalloc.h"
35 struct taint *taint; /* Corrupted? */
36 struct caseproto *proto; /* Format of contained cases. */
37 casenumber case_cnt; /* Number of cases,
38 CASENUMBER_MAX if unknown. */
39 const struct casereader_class *class; /* Class. */
40 void *aux; /* Auxiliary data for class. */
43 /* Reads and returns the next case from READER. The caller owns
44 the returned case and must call case_unref on it when its data
45 is no longer needed. Returns a null pointer if cases have
46 been exhausted or upon detection of an I/O error.
48 The case returned is effectively consumed: it can never be
49 read again through READER. If this is inconvenient, READER
50 may be cloned in advance with casereader_clone, or
51 casereader_peek may be used instead. */
53 casereader_read (struct casereader *reader)
55 if (reader->case_cnt != 0)
/* ->read may use casereader_swap to replace itself by
   another reader and then delegate to that reader by
   recursively calling casereader_read. Currently only
   lazy_casereader does this and, with luck, nothing else
   ever will.

   To allow this to work, however, we must decrement
   case_cnt before calling ->read. If we decremented
   case_cnt after calling ->read, then this would actually
   drop two cases from case_cnt instead of one, and we'd
   lose the last case in the casereader. */
69 if (reader->case_cnt != CASENUMBER_MAX)
71 c = reader->class->read (reader, reader->aux);
74 size_t n_widths UNUSED = caseproto_get_n_widths (reader->proto);
75 assert (case_get_value_cnt (c) >= n_widths);
76 expensive_assert (caseproto_equal (case_get_proto (c), 0,
77 reader->proto, 0, n_widths));
/* Destroys READER.
   Returns false if an I/O error was detected on READER, true
   otherwise. */
89 casereader_destroy (struct casereader *reader)
94 reader->class->destroy (reader, reader->aux);
95 ok = taint_destroy (reader->taint);
96 caseproto_unref (reader->proto);
/* Returns a clone of READER. READER and its clone may be used
   to read the same sequence of cases in the same order, barring
   I/O errors. */
106 casereader_clone (const struct casereader *reader_)
108 struct casereader *reader = CONST_CAST (struct casereader *, reader_);
109 struct casereader *clone;
110 if ( reader == NULL )
113 if (reader->class->clone == NULL)
114 casereader_shim_insert (reader);
115 clone = reader->class->clone (reader, reader->aux);
116 assert (clone != NULL);
117 assert (clone != reader);
/* Returns a copy of READER, which is itself destroyed.
   Useful for taking over ownership of a casereader, to enforce
   preventing the original owner from accessing the casereader
   again. */
126 casereader_rename (struct casereader *reader)
128 struct casereader *new = xmemdup (reader, sizeof *reader);
133 /* Exchanges the casereaders referred to by A and B. */
135 casereader_swap (struct casereader *a, struct casereader *b)
139 struct casereader tmp = *a;
145 /* Reads and returns the (IDX + 1)'th case from READER. The
146 caller owns the returned case and must call case_unref on it
147 when it is no longer needed. Returns a null pointer if cases
148 have been exhausted or upon detection of an I/O error. */
150 casereader_peek (struct casereader *reader, casenumber idx)
152 if (idx < reader->case_cnt)
155 if (reader->class->peek == NULL)
156 casereader_shim_insert (reader);
157 c = reader->class->peek (reader, reader->aux, idx);
160 else if (casereader_error (reader))
161 reader->case_cnt = 0;
163 if (reader->case_cnt > idx)
164 reader->case_cnt = idx;
/* Returns true if no cases remain to be read from READER, or if
   an error has occurred on READER. (A return value of false
   does *not* mean that the next call to casereader_peek or
   casereader_read will return a case, because an error can occur
   in the meantime.) */
174 casereader_is_empty (struct casereader *reader)
176 if (reader->case_cnt == 0)
180 struct ccase *c = casereader_peek (reader, 0);
191 /* Returns true if an I/O error or another hard error has
192 occurred on READER, a clone of READER, or on some object on
193 which READER's data has a dependency, false otherwise. */
195 casereader_error (const struct casereader *reader)
197 return taint_is_tainted (reader->taint);
200 /* Marks READER as having encountered an error.
202 Ordinarily, this function should be called by the
203 implementation of a casereader, not by the casereader's
204 client. Instead, casereader clients should usually ensure
205 that a casereader's error state is correct by using
206 taint_propagate to propagate to the casereader's taint
207 structure, which may be obtained via casereader_get_taint. */
209 casereader_force_error (struct casereader *reader)
211 taint_set_taint (reader->taint);
/* Returns READER's associated taint object, for use with
   taint_propagate and other taint functions. */
217 casereader_get_taint (const struct casereader *reader)
219 return reader->taint;
222 /* Returns the number of cases that will be read by successive
223 calls to casereader_read for READER, assuming that no errors
224 occur. Upon an error condition, the case count drops to 0, so
225 that no more cases can be obtained.
227 Not all casereaders can predict the number of cases that they
228 will produce without actually reading all of them. In that
229 case, this function returns CASENUMBER_MAX. To obtain the
230 actual number of cases in such a casereader, use
231 casereader_count_cases. */
233 casereader_get_case_cnt (struct casereader *reader)
235 return reader->case_cnt;
239 casereader_count_cases__ (const struct casereader *reader,
240 casenumber max_cases)
242 struct casereader *clone = casereader_clone (reader);
243 casenumber n_cases = casereader_advance (clone, max_cases);
244 #if (__GNUC__ == 4 ) && (__GNUC_MINOR__ == 4)
247 casereader_destroy (clone);
251 /* Returns the number of cases that will be read by successive
252 calls to casereader_read for READER, assuming that no errors
253 occur. Upon an error condition, the case count drops to 0, so
254 that no more cases can be obtained.
256 For a casereader that cannot predict the number of cases it
257 will produce, this function actually reads (and discards) all
258 of the contents of a clone of READER. Thus, the return value
259 is always correct in the absence of I/O errors. */
261 casereader_count_cases (const struct casereader *reader)
263 if (reader->case_cnt == CASENUMBER_MAX)
265 struct casereader *reader_rw = CONST_CAST (struct casereader *, reader);
266 reader_rw->case_cnt = casereader_count_cases__ (reader, CASENUMBER_MAX);
268 return reader->case_cnt;
271 /* Truncates READER to at most N cases. */
273 casereader_truncate (struct casereader *reader, casenumber n)
275 /* This could be optimized, if it ever becomes too expensive, by adding a
276 "max_cases" member to struct casereader. We could also add a "truncate"
277 function to the casereader implementation, to allow the casereader to
278 throw away data that cannot ever be read. */
279 if (reader->case_cnt == CASENUMBER_MAX)
280 reader->case_cnt = casereader_count_cases__ (reader, n);
281 if (reader->case_cnt > n)
282 reader->case_cnt = n;
285 /* Returns the prototype for the cases in READER. The caller
286 must not unref the returned prototype. */
287 const struct caseproto *
288 casereader_get_proto (const struct casereader *reader)
290 return reader->proto;
293 /* Skips past N cases in READER, stopping when the last case in
294 READER has been read or on an input error. Returns the number
295 of cases successfully skipped. */
297 casereader_advance (struct casereader *reader, casenumber n)
301 for (i = 0; i < n; i++)
303 struct ccase *c = casereader_read (reader);
/* Copies all the cases in READER to WRITER, propagating errors
   appropriately. READER is destroyed by this function. */
316 casereader_transfer (struct casereader *reader, struct casewriter *writer)
320 taint_propagate (casereader_get_taint (reader),
321 casewriter_get_taint (writer));
322 while ((c = casereader_read (reader)) != NULL)
323 casewriter_write (writer, c);
324 casereader_destroy (reader);
/* Creates and returns a new casereader. This function is
   intended for use by casereader implementations, not by
   casereader clients.

   This function is most suited for creating a casereader for a
   data source that is naturally sequential.
   casereader_create_random may be more appropriate for a data
   source that supports random access.

   Ordinarily, specify a null pointer for TAINT, in which case
   the new casereader will have a new, unique taint object. If
   the new casereader should have a clone of an existing taint
   object, specify that object as TAINT. (This is most commonly
   useful in an implementation of the "clone" casereader_class
   function, in which case the cloned casereader should have the
   same taint object as the original casereader.)

   PROTO must be the prototype for the cases that may be read
   from the casereader. The caller retains its reference to
   PROTO.

   CASE_CNT is an upper limit on the number of cases that
   casereader_read will return from the casereader in successive
   calls. Ordinarily, this is the actual number of cases in the
   data source or CASENUMBER_MAX if the number of cases cannot be
   predicted in advance.

   CLASS and AUX are a set of casereader implementation-specific
   member functions and auxiliary data to pass to those member
   functions, respectively. */
358 casereader_create_sequential (const struct taint *taint,
359 const struct caseproto *proto,
361 const struct casereader_class *class, void *aux)
363 struct casereader *reader = xmalloc (sizeof *reader);
364 reader->taint = taint != NULL ? taint_clone (taint) : taint_create ();
365 reader->proto = caseproto_ref (proto);
366 reader->case_cnt = case_cnt;
367 reader->class = class;
/* If READER is a casereader of the given CLASS, returns its
   associated auxiliary data; otherwise, returns a null pointer.

   This function is intended for use from casereader
   implementations, not by casereader users. Even within
   casereader implementations, its usefulness is quite limited,
   for at least two reasons. First, every casereader member
   function already receives a pointer to the casereader's
   auxiliary data. Second, a casereader's class can change
   (through a call to casereader_swap) and this is in practice
   quite common (e.g. any call to casereader_clone on a
   casereader that does not directly support clone will cause the
   casereader to be replaced by a shim casereader). */
386 casereader_dynamic_cast (struct casereader *reader,
387 const struct casereader_class *class)
389 return reader->class == class ? reader->aux : NULL;
/* Random-access casereader implementation.

   This is a set of wrappers around casereader_create_sequential
   and struct casereader_class to make it easy to create
   efficient casereaders for data sources that natively support
   random access. */
399 /* One clone of a random reader. */
402 struct random_reader_shared *shared; /* Data shared among clones. */
403 struct heap_node heap_node; /* Node in shared data's heap of readers. */
404 casenumber offset; /* Number of cases already read. */
407 /* Returns the random_reader in which the given heap_node is
409 static struct random_reader *
410 random_reader_from_heap_node (const struct heap_node *node)
412 return heap_data (node, struct random_reader, heap_node);
415 /* Data shared among clones of a random reader. */
416 struct random_reader_shared
418 struct heap *readers; /* Heap of struct random_readers. */
419 casenumber min_offset; /* Smallest offset of any random_reader. */
420 const struct casereader_random_class *class;
424 static const struct casereader_class random_reader_casereader_class;
/* Creates and returns a new random_reader with the given SHARED
   data and OFFSET. Inserts the new random reader into the
   shared heap. */
429 static struct random_reader *
430 make_random_reader (struct random_reader_shared *shared, casenumber offset)
432 struct random_reader *br = xmalloc (sizeof *br);
435 heap_insert (shared->readers, &br->heap_node);
439 /* Compares random_readers A and B by offset and returns a
440 strcmp()-like result. */
442 compare_random_readers_by_offset (const struct heap_node *a_,
443 const struct heap_node *b_,
444 const void *aux UNUSED)
446 const struct random_reader *a = random_reader_from_heap_node (a_);
447 const struct random_reader *b = random_reader_from_heap_node (b_);
448 return a->offset < b->offset ? -1 : a->offset > b->offset;
/* Creates and returns a new casereader. This function is
   intended for use by casereader implementations, not by
   casereader clients.

   This function is most suited for creating a casereader for a
   data source that supports random access.
   casereader_create_sequential is more appropriate for a data
   source that is naturally sequential.

   PROTO must be the prototype for the cases that may be read
   from the casereader. The caller retains its reference to
   PROTO.

   CASE_CNT is an upper limit on the number of cases that
   casereader_read will return from the casereader in successive
   calls. Ordinarily, this is the actual number of cases in the
   data source or CASENUMBER_MAX if the number of cases cannot be
   predicted in advance.

   CLASS and AUX are a set of casereader implementation-specific
   member functions and auxiliary data to pass to those member
   functions, respectively. */
474 casereader_create_random (const struct caseproto *proto, casenumber case_cnt,
475 const struct casereader_random_class *class,
478 struct random_reader_shared *shared = xmalloc (sizeof *shared);
479 shared->readers = heap_create (compare_random_readers_by_offset, NULL);
480 shared->class = class;
482 shared->min_offset = 0;
483 return casereader_create_sequential (NULL, proto, case_cnt,
484 &random_reader_casereader_class,
485 make_random_reader (shared, 0));
488 /* Reassesses the min_offset in SHARED based on the minimum
489 offset in the heap. */
491 advance_random_reader (struct casereader *reader,
492 struct random_reader_shared *shared)
496 old = shared->min_offset;
497 new = random_reader_from_heap_node (heap_minimum (shared->readers))->offset;
501 shared->min_offset = new;
502 shared->class->advance (reader, shared->aux, new - old);
506 /* struct casereader_class "read" function for random reader. */
507 static struct ccase *
508 random_reader_read (struct casereader *reader, void *br_)
510 struct random_reader *br = br_;
511 struct random_reader_shared *shared = br->shared;
512 struct ccase *c = shared->class->read (reader, shared->aux,
513 br->offset - shared->min_offset);
517 heap_changed (shared->readers, &br->heap_node);
518 advance_random_reader (reader, shared);
/* struct casereader_class "destroy" function for random
   reader. */
526 random_reader_destroy (struct casereader *reader, void *br_)
528 struct random_reader *br = br_;
529 struct random_reader_shared *shared = br->shared;
531 heap_delete (shared->readers, &br->heap_node);
532 if (heap_is_empty (shared->readers))
534 heap_destroy (shared->readers);
535 shared->class->destroy (reader, shared->aux);
539 advance_random_reader (reader, shared);
544 /* struct casereader_class "clone" function for random reader. */
545 static struct casereader *
546 random_reader_clone (struct casereader *reader, void *br_)
548 struct random_reader *br = br_;
549 struct random_reader_shared *shared = br->shared;
550 return casereader_create_sequential (casereader_get_taint (reader),
552 casereader_get_case_cnt (reader),
553 &random_reader_casereader_class,
554 make_random_reader (shared,
558 /* struct casereader_class "peek" function for random reader. */
559 static struct ccase *
560 random_reader_peek (struct casereader *reader, void *br_, casenumber idx)
562 struct random_reader *br = br_;
563 struct random_reader_shared *shared = br->shared;
565 return shared->class->read (reader, shared->aux,
566 br->offset - shared->min_offset + idx);
569 /* Casereader class for random reader. */
570 static const struct casereader_class random_reader_casereader_class =
573 random_reader_destroy,
579 static const struct casereader_class casereader_null_class;
581 /* Returns a casereader with no cases. The casereader has the prototype
582 specified by PROTO. PROTO may be specified as a null pointer, in which case
583 the casereader has no variables. */
585 casereader_create_empty (const struct caseproto *proto_)
587 struct casereader *reader;
588 struct caseproto *proto;
590 proto = proto_ != NULL ? caseproto_ref (proto_) : caseproto_create ();
591 reader = casereader_create_sequential (NULL, proto, 0,
592 &casereader_null_class, NULL);
593 caseproto_unref (proto);
598 static struct ccase *
599 casereader_null_read (struct casereader *reader UNUSED, void *aux UNUSED)
605 casereader_null_destroy (struct casereader *reader UNUSED, void *aux UNUSED)
610 static const struct casereader_class casereader_null_class =
612 casereader_null_read,
613 casereader_null_destroy,