1 /* PSPP - a program for statistical analysis.
2 Copyright (C) 2007, 2009, 2010, 2013 Free Software Foundation, Inc.
4 This program is free software: you can redistribute it and/or modify
5 it under the terms of the GNU General Public License as published by
6 the Free Software Foundation, either version 3 of the License, or
7 (at your option) any later version.
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 GNU General Public License for more details.
14 You should have received a copy of the GNU General Public License
15 along with this program. If not, see <http://www.gnu.org/licenses/>. */
19 #include "data/casereader.h"
20 #include "data/casereader-provider.h"
24 #include "data/casereader-shim.h"
25 #include "data/casewriter.h"
26 #include "libpspp/assertion.h"
27 #include "libpspp/heap.h"
28 #include "libpspp/taint.h"
30 #include "gl/xalloc.h"
35 struct taint *taint; /* Corrupted? */
36 struct caseproto *proto; /* Format of contained cases. */
37 casenumber case_cnt; /* Number of cases,
38 CASENUMBER_MAX if unknown. */
39 const struct casereader_class *class; /* Class. */
40 void *aux; /* Auxiliary data for class. */
43 /* Reads and returns the next case from READER. The caller owns
44 the returned case and must call case_unref on it when its data
45 is no longer needed. Returns a null pointer if cases have
46 been exhausted or upon detection of an I/O error.
48 The case returned is effectively consumed: it can never be
49 read again through READER. If this is inconvenient, READER
50 may be cloned in advance with casereader_clone, or
51 casereader_peek may be used instead. */
53 casereader_read (struct casereader *reader)
55 if (reader->case_cnt != 0)
57 /* ->read may use casereader_swap to replace itself by
58 another reader and then delegate to that reader by
59 recursively calling casereader_read. Currently only
60 lazy_casereader does this and, with luck, nothing else
63 To allow this to work, however, we must decrement
64 case_cnt before calling ->read. If we decremented
65 case_cnt after calling ->read, then this would actually
66 drop two cases from case_cnt instead of one, and we'd
67 lose the last case in the casereader. */
69 if (reader->case_cnt != CASENUMBER_MAX)
71 c = reader->class->read (reader, reader->aux);
74 size_t n_widths UNUSED = caseproto_get_n_widths (reader->proto);
75 assert (case_get_value_cnt (c) >= n_widths);
76 expensive_assert (caseproto_equal (case_get_proto (c), 0,
77 reader->proto, 0, n_widths));
86 Returns false if an I/O error was detected on READER, true
89 casereader_destroy (struct casereader *reader)
94 reader->class->destroy (reader, reader->aux);
95 ok = taint_destroy (reader->taint);
96 caseproto_unref (reader->proto);
102 /* Returns a clone of READER. READER and its clone may be used
103 to read the same sequence of cases in the same order, barring
106 casereader_clone (const struct casereader *reader_)
108 struct casereader *reader = CONST_CAST (struct casereader *, reader_);
109 struct casereader *clone;
110 if ( reader == NULL )
113 if (reader->class->clone == NULL)
114 casereader_shim_insert (reader);
115 clone = reader->class->clone (reader, reader->aux);
116 assert (clone != NULL);
117 assert (clone != reader);
121 /* Returns a copy of READER, which is itself destroyed.
122 Useful for taking over ownership of a casereader, to enforce
123 preventing the original owner from accessing the casereader
126 casereader_rename (struct casereader *reader)
128 struct casereader *new = xmemdup (reader, sizeof *reader);
133 /* Exchanges the casereaders referred to by A and B. */
135 casereader_swap (struct casereader *a, struct casereader *b)
139 struct casereader tmp = *a;
145 /* Reads and returns the (IDX + 1)'th case from READER. The
146 caller owns the returned case and must call case_unref on it
147 when it is no longer needed. Returns a null pointer if cases
148 have been exhausted or upon detection of an I/O error. */
150 casereader_peek (struct casereader *reader, casenumber idx)
152 if (idx < reader->case_cnt)
155 if (reader->class->peek == NULL)
156 casereader_shim_insert (reader);
157 c = reader->class->peek (reader, reader->aux, idx);
160 else if (casereader_error (reader))
161 reader->case_cnt = 0;
163 if (reader->case_cnt > idx)
164 reader->case_cnt = idx;
168 /* Returns true if no cases remain to be read from READER, or if
169 an error has occurred on READER. (A return value of false
170 does *not* mean that the next call to casereader_peek or
171 casereader_read will return true, because an error can occur
174 casereader_is_empty (struct casereader *reader)
176 if (reader->case_cnt == 0)
180 struct ccase *c = casereader_peek (reader, 0);
191 /* Returns true if an I/O error or another hard error has
192 occurred on READER, a clone of READER, or on some object on
193 which READER's data has a dependency, false otherwise. */
195 casereader_error (const struct casereader *reader)
197 return taint_is_tainted (reader->taint);
200 /* Marks READER as having encountered an error.
202 Ordinarily, this function should be called by the
203 implementation of a casereader, not by the casereader's
204 client. Instead, casereader clients should usually ensure
205 that a casereader's error state is correct by using
206 taint_propagate to propagate to the casereader's taint
207 structure, which may be obtained via casereader_get_taint. */
209 casereader_force_error (struct casereader *reader)
211 taint_set_taint (reader->taint);
214 /* Returns READER's associate taint object, for use with
215 taint_propagate and other taint functions. */
217 casereader_get_taint (const struct casereader *reader)
219 return reader->taint;
222 /* Returns the number of cases that will be read by successive
223 calls to casereader_read for READER, assuming that no errors
224 occur. Upon an error condition, the case count drops to 0, so
225 that no more cases can be obtained.
227 Not all casereaders can predict the number of cases that they
228 will produce without actually reading all of them. In that
229 case, this function returns CASENUMBER_MAX. To obtain the
230 actual number of cases in such a casereader, use
231 casereader_count_cases. */
233 casereader_get_case_cnt (struct casereader *reader)
235 return reader->case_cnt;
239 casereader_count_cases__ (const struct casereader *reader,
240 casenumber max_cases)
242 struct casereader *clone;
245 /* This seems to avoid a bug in Gcc 4.4.5 where, upon
246 return from this function, the stack appeared corrupt,
247 and the program returned to the wrong address. Oddly
248 the problem only manifested itself when used in conjunction
249 with the ODS reader, in code such as:
251 GET DATA /TYPE=ODS ....
254 #if (__GNUC__ == 4 ) && (__GNUC_MINOR__ == 4)
255 volatile int x = 1; x++;
258 clone = casereader_clone (reader);
259 n_cases = casereader_advance (clone, max_cases);
260 casereader_destroy (clone);
265 /* Returns the number of cases that will be read by successive
266 calls to casereader_read for READER, assuming that no errors
267 occur. Upon an error condition, the case count drops to 0, so
268 that no more cases can be obtained.
270 For a casereader that cannot predict the number of cases it
271 will produce, this function actually reads (and discards) all
272 of the contents of a clone of READER. Thus, the return value
273 is always correct in the absence of I/O errors. */
275 casereader_count_cases (const struct casereader *reader)
277 if (reader->case_cnt == CASENUMBER_MAX)
279 struct casereader *reader_rw = CONST_CAST (struct casereader *, reader);
280 reader_rw->case_cnt = casereader_count_cases__ (reader, CASENUMBER_MAX);
282 return reader->case_cnt;
285 /* Truncates READER to at most N cases. */
287 casereader_truncate (struct casereader *reader, casenumber n)
289 /* This could be optimized, if it ever becomes too expensive, by adding a
290 "max_cases" member to struct casereader. We could also add a "truncate"
291 function to the casereader implementation, to allow the casereader to
292 throw away data that cannot ever be read. */
293 if (reader->case_cnt == CASENUMBER_MAX)
294 reader->case_cnt = casereader_count_cases__ (reader, n);
295 if (reader->case_cnt > n)
296 reader->case_cnt = n;
299 /* Returns the prototype for the cases in READER. The caller
300 must not unref the returned prototype. */
301 const struct caseproto *
302 casereader_get_proto (const struct casereader *reader)
304 return reader->proto;
307 /* Skips past N cases in READER, stopping when the last case in
308 READER has been read or on an input error. Returns the number
309 of cases successfully skipped. */
311 casereader_advance (struct casereader *reader, casenumber n)
315 for (i = 0; i < n; i++)
317 struct ccase *c = casereader_read (reader);
327 /* Copies all the cases in READER to WRITER, propagating errors
328 appropriately. READER is destroyed by this function */
330 casereader_transfer (struct casereader *reader, struct casewriter *writer)
334 taint_propagate (casereader_get_taint (reader),
335 casewriter_get_taint (writer));
336 while ((c = casereader_read (reader)) != NULL)
337 casewriter_write (writer, c);
338 casereader_destroy (reader);
341 /* Creates and returns a new casereader. This function is
342 intended for use by casereader implementations, not by
345 This function is most suited for creating a casereader for a
346 data source that is naturally sequential.
347 casereader_create_random may be more appropriate for a data
348 source that supports random access.
350 Ordinarily, specify a null pointer for TAINT, in which case
351 the new casereader will have a new, unique taint object. If
352 the new casereader should have a clone of an existing taint
353 object, specify that object as TAINT. (This is most commonly
354 useful in an implementation of the "clone" casereader_class
355 function, in which case the cloned casereader should have the
356 same taint object as the original casereader.)
358 PROTO must be the prototype for the cases that may be read
359 from the casereader. The caller retains its reference to
362 CASE_CNT is an upper limit on the number of cases that
363 casereader_read will return from the casereader in successive
364 calls. Ordinarily, this is the actual number of cases in the
365 data source or CASENUMBER_MAX if the number of cases cannot be
366 predicted in advance.
368 CLASS and AUX are a set of casereader implementation-specific
369 member functions and auxiliary data to pass to those member
370 functions, respectively. */
372 casereader_create_sequential (const struct taint *taint,
373 const struct caseproto *proto,
375 const struct casereader_class *class, void *aux)
377 struct casereader *reader = xmalloc (sizeof *reader);
378 reader->taint = taint != NULL ? taint_clone (taint) : taint_create ();
379 reader->proto = caseproto_ref (proto);
380 reader->case_cnt = case_cnt;
381 reader->class = class;
386 /* If READER is a casereader of the given CLASS, returns its
387 associated auxiliary data; otherwise, returns a null pointer.
389 This function is intended for use from casereader
390 implementations, not by casereader users. Even within
391 casereader implementations, its usefulness is quite limited,
392 for at least two reasons. First, every casereader member
393 function already receives a pointer to the casereader's
394 auxiliary data. Second, a casereader's class can change
395 (through a call to casereader_swap) and this is in practice
396 quite common (e.g. any call to casereader_clone on a
397 casereader that does not directly support clone will cause the
398 casereader to be replaced by a shim caseader). */
400 casereader_dynamic_cast (struct casereader *reader,
401 const struct casereader_class *class)
403 return reader->class == class ? reader->aux : NULL;
406 /* Random-access casereader implementation.
408 This is a set of wrappers around casereader_create_sequential
409 and struct casereader_class to make it easy to create
410 efficient casereaders for data sources that natively support
413 /* One clone of a random reader. */
416 struct random_reader_shared *shared; /* Data shared among clones. */
417 struct heap_node heap_node; /* Node in shared data's heap of readers. */
418 casenumber offset; /* Number of cases already read. */
421 /* Returns the random_reader in which the given heap_node is
423 static struct random_reader *
424 random_reader_from_heap_node (const struct heap_node *node)
426 return heap_data (node, struct random_reader, heap_node);
429 /* Data shared among clones of a random reader. */
430 struct random_reader_shared
432 struct heap *readers; /* Heap of struct random_readers. */
433 casenumber min_offset; /* Smallest offset of any random_reader. */
434 const struct casereader_random_class *class;
438 static const struct casereader_class random_reader_casereader_class;
440 /* Creates and returns a new random_reader with the given SHARED
441 data and OFFSET. Inserts the new random reader into the
443 static struct random_reader *
444 make_random_reader (struct random_reader_shared *shared, casenumber offset)
446 struct random_reader *br = xmalloc (sizeof *br);
449 heap_insert (shared->readers, &br->heap_node);
453 /* Compares random_readers A and B by offset and returns a
454 strcmp()-like result. */
456 compare_random_readers_by_offset (const struct heap_node *a_,
457 const struct heap_node *b_,
458 const void *aux UNUSED)
460 const struct random_reader *a = random_reader_from_heap_node (a_);
461 const struct random_reader *b = random_reader_from_heap_node (b_);
462 return a->offset < b->offset ? -1 : a->offset > b->offset;
465 /* Creates and returns a new casereader. This function is
466 intended for use by casereader implementations, not by
469 This function is most suited for creating a casereader for a
470 data source that supports random access.
471 casereader_create_sequential is more appropriate for a data
472 source that is naturally sequential.
474 PROTO must be the prototype for the cases that may be read
475 from the casereader. The caller retains its reference to
478 CASE_CNT is an upper limit on the number of cases that
479 casereader_read will return from the casereader in successive
480 calls. Ordinarily, this is the actual number of cases in the
481 data source or CASENUMBER_MAX if the number of cases cannot be
482 predicted in advance.
484 CLASS and AUX are a set of casereader implementation-specific
485 member functions and auxiliary data to pass to those member
486 functions, respectively. */
488 casereader_create_random (const struct caseproto *proto, casenumber case_cnt,
489 const struct casereader_random_class *class,
492 struct random_reader_shared *shared = xmalloc (sizeof *shared);
493 shared->readers = heap_create (compare_random_readers_by_offset, NULL);
494 shared->class = class;
496 shared->min_offset = 0;
497 return casereader_create_sequential (NULL, proto, case_cnt,
498 &random_reader_casereader_class,
499 make_random_reader (shared, 0));
502 /* Reassesses the min_offset in SHARED based on the minimum
503 offset in the heap. */
505 advance_random_reader (struct casereader *reader,
506 struct random_reader_shared *shared)
510 old = shared->min_offset;
511 new = random_reader_from_heap_node (heap_minimum (shared->readers))->offset;
515 shared->min_offset = new;
516 shared->class->advance (reader, shared->aux, new - old);
520 /* struct casereader_class "read" function for random reader. */
521 static struct ccase *
522 random_reader_read (struct casereader *reader, void *br_)
524 struct random_reader *br = br_;
525 struct random_reader_shared *shared = br->shared;
526 struct ccase *c = shared->class->read (reader, shared->aux,
527 br->offset - shared->min_offset);
531 heap_changed (shared->readers, &br->heap_node);
532 advance_random_reader (reader, shared);
537 /* struct casereader_class "destroy" function for random
540 random_reader_destroy (struct casereader *reader, void *br_)
542 struct random_reader *br = br_;
543 struct random_reader_shared *shared = br->shared;
545 heap_delete (shared->readers, &br->heap_node);
546 if (heap_is_empty (shared->readers))
548 heap_destroy (shared->readers);
549 shared->class->destroy (reader, shared->aux);
553 advance_random_reader (reader, shared);
558 /* struct casereader_class "clone" function for random reader. */
559 static struct casereader *
560 random_reader_clone (struct casereader *reader, void *br_)
562 struct random_reader *br = br_;
563 struct random_reader_shared *shared = br->shared;
564 return casereader_create_sequential (casereader_get_taint (reader),
566 casereader_get_case_cnt (reader),
567 &random_reader_casereader_class,
568 make_random_reader (shared,
572 /* struct casereader_class "peek" function for random reader. */
573 static struct ccase *
574 random_reader_peek (struct casereader *reader, void *br_, casenumber idx)
576 struct random_reader *br = br_;
577 struct random_reader_shared *shared = br->shared;
579 return shared->class->read (reader, shared->aux,
580 br->offset - shared->min_offset + idx);
583 /* Casereader class for random reader. */
584 static const struct casereader_class random_reader_casereader_class =
587 random_reader_destroy,
593 static const struct casereader_class casereader_null_class;
595 /* Returns a casereader with no cases. The casereader has the prototype
596 specified by PROTO. PROTO may be specified as a null pointer, in which case
597 the casereader has no variables. */
599 casereader_create_empty (const struct caseproto *proto_)
601 struct casereader *reader;
602 struct caseproto *proto;
604 proto = proto_ != NULL ? caseproto_ref (proto_) : caseproto_create ();
605 reader = casereader_create_sequential (NULL, proto, 0,
606 &casereader_null_class, NULL);
607 caseproto_unref (proto);
612 static struct ccase *
613 casereader_null_read (struct casereader *reader UNUSED, void *aux UNUSED)
619 casereader_null_destroy (struct casereader *reader UNUSED, void *aux UNUSED)
624 static const struct casereader_class casereader_null_class =
626 casereader_null_read,
627 casereader_null_destroy,