1 /* pspp - a program for statistical analysis.
2 Copyright (C) 2007, 2009, 2010, 2013 Free Software Foundation, Inc.
4 This program is free software: you can redistribute it and/or modify
5 it under the terms of the GNU General Public License as published by
6 the Free Software Foundation, either version 3 of the License, or
7 (at your option) any later version.
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 GNU General Public License for more details.
14 You should have received a copy of the GNU General Public License
15 along with this program. If not, see <http://www.gnu.org/licenses/>. */
#include "data/casereader.h"
#include "data/casereader-provider.h"

#include <stdlib.h>

#include "data/casereader-shim.h"
#include "data/casewriter.h"
#include "libpspp/assertion.h"
#include "libpspp/heap.h"
#include "libpspp/taint.h"

#include "gl/xalloc.h"
35 struct taint *taint; /* Corrupted? */
36 struct caseproto *proto; /* Format of contained cases. */
37 casenumber case_cnt; /* Number of cases,
38 CASENUMBER_MAX if unknown. */
39 const struct casereader_class *class; /* Class. */
40 void *aux; /* Auxiliary data for class. */
43 /* Reads and returns the next case from READER. The caller owns
44 the returned case and must call case_unref on it when its data
45 is no longer needed. Returns a null pointer if cases have
46 been exhausted or upon detection of an I/O error.
48 The case returned is effectively consumed: it can never be
49 read again through READER. If this is inconvenient, READER
50 may be cloned in advance with casereader_clone, or
51 casereader_peek may be used instead. */
53 casereader_read (struct casereader *reader)
55 if (reader->case_cnt != 0)
57 /* ->read may use casereader_swap to replace itself by
58 another reader and then delegate to that reader by
59 recursively calling casereader_read. Currently only
60 lazy_casereader does this and, with luck, nothing else
63 To allow this to work, however, we must decrement
64 case_cnt before calling ->read. If we decremented
65 case_cnt after calling ->read, then this would actually
66 drop two cases from case_cnt instead of one, and we'd
67 lose the last case in the casereader. */
69 if (reader->case_cnt != CASENUMBER_MAX)
71 c = reader->class->read (reader, reader->aux);
74 size_t n_widths UNUSED = caseproto_get_n_widths (reader->proto);
75 assert (case_get_value_cnt (c) >= n_widths);
76 expensive_assert (caseproto_equal (case_get_proto (c), 0,
77 reader->proto, 0, n_widths));
86 Returns false if an I/O error was detected on READER, true
89 casereader_destroy (struct casereader *reader)
94 reader->class->destroy (reader, reader->aux);
95 ok = taint_destroy (reader->taint);
96 caseproto_unref (reader->proto);
102 /* Returns a clone of READER. READER and its clone may be used
103 to read the same sequence of cases in the same order, barring
106 casereader_clone (const struct casereader *reader_)
108 struct casereader *reader = CONST_CAST (struct casereader *, reader_);
109 struct casereader *clone;
110 if ( reader == NULL )
113 if (reader->class->clone == NULL)
114 casereader_shim_insert (reader);
115 clone = reader->class->clone (reader, reader->aux);
116 assert (clone != NULL);
117 assert (clone != reader);
121 /* Returns a copy of READER, which is itself destroyed.
122 Useful for taking over ownership of a casereader, to enforce
123 preventing the original owner from accessing the casereader
126 casereader_rename (struct casereader *reader)
128 struct casereader *new = xmemdup (reader, sizeof *reader);
133 /* Exchanges the casereaders referred to by A and B. */
135 casereader_swap (struct casereader *a, struct casereader *b)
139 struct casereader tmp = *a;
145 /* Reads and returns the (IDX + 1)'th case from READER. The
146 caller owns the returned case and must call case_unref on it
147 when it is no longer needed. Returns a null pointer if cases
148 have been exhausted or upon detection of an I/O error. */
150 casereader_peek (struct casereader *reader, casenumber idx)
152 if (idx < reader->case_cnt)
155 if (reader->class->peek == NULL)
156 casereader_shim_insert (reader);
157 c = reader->class->peek (reader, reader->aux, idx);
160 else if (casereader_error (reader))
161 reader->case_cnt = 0;
163 if (reader->case_cnt > idx)
164 reader->case_cnt = idx;
168 /* Returns true if no cases remain to be read from READER, or if
169 an error has occurred on READER. (A return value of false
170 does *not* mean that the next call to casereader_peek or
171 casereader_read will return true, because an error can occur
174 casereader_is_empty (struct casereader *reader)
176 if (reader->case_cnt == 0)
180 struct ccase *c = casereader_peek (reader, 0);
191 /* Returns true if an I/O error or another hard error has
192 occurred on READER, a clone of READER, or on some object on
193 which READER's data has a dependency, false otherwise. */
195 casereader_error (const struct casereader *reader)
197 return taint_is_tainted (reader->taint);
200 /* Marks READER as having encountered an error.
202 Ordinarily, this function should be called by the
203 implementation of a casereader, not by the casereader's
204 client. Instead, casereader clients should usually ensure
205 that a casereader's error state is correct by using
206 taint_propagate to propagate to the casereader's taint
207 structure, which may be obtained via casereader_get_taint. */
209 casereader_force_error (struct casereader *reader)
211 taint_set_taint (reader->taint);
214 /* Returns READER's associate taint object, for use with
215 taint_propagate and other taint functions. */
217 casereader_get_taint (const struct casereader *reader)
219 return reader->taint;
222 /* Returns the number of cases that will be read by successive
223 calls to casereader_read for READER, assuming that no errors
224 occur. Upon an error condition, the case count drops to 0, so
225 that no more cases can be obtained.
227 Not all casereaders can predict the number of cases that they
228 will produce without actually reading all of them. In that
229 case, this function returns CASENUMBER_MAX. To obtain the
230 actual number of cases in such a casereader, use
231 casereader_count_cases. */
233 casereader_get_case_cnt (struct casereader *reader)
235 return reader->case_cnt;
239 casereader_count_cases__ (const struct casereader *reader,
240 casenumber max_cases)
242 struct casereader *clone;
245 /* This seems to avoid a bug in Gcc 4.4.5 where, upon
246 return from this function, the stack appeared corrupt,
247 and the program returned to the wrong address. Oddly
248 the problem only manifested itself when used in conjunction
249 with the ODS reader, in code such as:
251 GET DATA /TYPE=ODS ....
254 #if (__GNUC__ == 4 ) && (__GNUC_MINOR__ == 4)
255 volatile int x = 1; x++;
258 clone = casereader_clone (reader);
259 n_cases = casereader_advance (clone, max_cases);
260 casereader_destroy (clone);
264 /* Returns the number of cases that will be read by successive
265 calls to casereader_read for READER, assuming that no errors
266 occur. Upon an error condition, the case count drops to 0, so
267 that no more cases can be obtained.
269 For a casereader that cannot predict the number of cases it
270 will produce, this function actually reads (and discards) all
271 of the contents of a clone of READER. Thus, the return value
272 is always correct in the absence of I/O errors. */
274 casereader_count_cases (const struct casereader *reader)
276 if (reader->case_cnt == CASENUMBER_MAX)
278 struct casereader *reader_rw = CONST_CAST (struct casereader *, reader);
279 reader_rw->case_cnt = casereader_count_cases__ (reader, CASENUMBER_MAX);
281 return reader->case_cnt;
284 /* Truncates READER to at most N cases. */
286 casereader_truncate (struct casereader *reader, casenumber n)
288 /* This could be optimized, if it ever becomes too expensive, by adding a
289 "max_cases" member to struct casereader. We could also add a "truncate"
290 function to the casereader implementation, to allow the casereader to
291 throw away data that cannot ever be read. */
292 if (reader->case_cnt == CASENUMBER_MAX)
293 reader->case_cnt = casereader_count_cases__ (reader, n);
294 if (reader->case_cnt > n)
295 reader->case_cnt = n;
298 /* Returns the prototype for the cases in READER. The caller
299 must not unref the returned prototype. */
300 const struct caseproto *
301 casereader_get_proto (const struct casereader *reader)
303 return reader->proto;
306 /* Skips past N cases in READER, stopping when the last case in
307 READER has been read or on an input error. Returns the number
308 of cases successfully skipped. */
310 casereader_advance (struct casereader *reader, casenumber n)
314 for (i = 0; i < n; i++)
316 struct ccase *c = casereader_read (reader);
/* Copies all the cases in READER to WRITER, propagating errors
   appropriately.  READER is destroyed by this function. */
void
casereader_transfer (struct casereader *reader, struct casewriter *writer)
{
  struct ccase *c;

  /* Link the taints before copying so that an error on READER
     also marks WRITER as tainted. */
  taint_propagate (casereader_get_taint (reader),
                   casewriter_get_taint (writer));
  while ((c = casereader_read (reader)) != NULL)
    casewriter_write (writer, c);
  casereader_destroy (reader);
}
340 /* Creates and returns a new casereader. This function is
341 intended for use by casereader implementations, not by
344 This function is most suited for creating a casereader for a
345 data source that is naturally sequential.
346 casereader_create_random may be more appropriate for a data
347 source that supports random access.
349 Ordinarily, specify a null pointer for TAINT, in which case
350 the new casereader will have a new, unique taint object. If
351 the new casereader should have a clone of an existing taint
352 object, specify that object as TAINT. (This is most commonly
353 useful in an implementation of the "clone" casereader_class
354 function, in which case the cloned casereader should have the
355 same taint object as the original casereader.)
357 PROTO must be the prototype for the cases that may be read
358 from the casereader. The caller retains its reference to
361 CASE_CNT is an upper limit on the number of cases that
362 casereader_read will return from the casereader in successive
363 calls. Ordinarily, this is the actual number of cases in the
364 data source or CASENUMBER_MAX if the number of cases cannot be
365 predicted in advance.
367 CLASS and AUX are a set of casereader implementation-specific
368 member functions and auxiliary data to pass to those member
369 functions, respectively. */
371 casereader_create_sequential (const struct taint *taint,
372 const struct caseproto *proto,
374 const struct casereader_class *class, void *aux)
376 struct casereader *reader = xmalloc (sizeof *reader);
377 reader->taint = taint != NULL ? taint_clone (taint) : taint_create ();
378 reader->proto = caseproto_ref (proto);
379 reader->case_cnt = case_cnt;
380 reader->class = class;
385 /* If READER is a casereader of the given CLASS, returns its
386 associated auxiliary data; otherwise, returns a null pointer.
388 This function is intended for use from casereader
389 implementations, not by casereader users. Even within
390 casereader implementations, its usefulness is quite limited,
391 for at least two reasons. First, every casereader member
392 function already receives a pointer to the casereader's
393 auxiliary data. Second, a casereader's class can change
394 (through a call to casereader_swap) and this is in practice
395 quite common (e.g. any call to casereader_clone on a
396 casereader that does not directly support clone will cause the
397 casereader to be replaced by a shim caseader). */
399 casereader_dynamic_cast (struct casereader *reader,
400 const struct casereader_class *class)
402 return reader->class == class ? reader->aux : NULL;
405 /* Random-access casereader implementation.
407 This is a set of wrappers around casereader_create_sequential
408 and struct casereader_class to make it easy to create
409 efficient casereaders for data sources that natively support
412 /* One clone of a random reader. */
415 struct random_reader_shared *shared; /* Data shared among clones. */
416 struct heap_node heap_node; /* Node in shared data's heap of readers. */
417 casenumber offset; /* Number of cases already read. */
420 /* Returns the random_reader in which the given heap_node is
422 static struct random_reader *
423 random_reader_from_heap_node (const struct heap_node *node)
425 return heap_data (node, struct random_reader, heap_node);
428 /* Data shared among clones of a random reader. */
429 struct random_reader_shared
431 struct heap *readers; /* Heap of struct random_readers. */
432 casenumber min_offset; /* Smallest offset of any random_reader. */
433 const struct casereader_random_class *class;
437 static const struct casereader_class random_reader_casereader_class;
439 /* Creates and returns a new random_reader with the given SHARED
440 data and OFFSET. Inserts the new random reader into the
442 static struct random_reader *
443 make_random_reader (struct random_reader_shared *shared, casenumber offset)
445 struct random_reader *br = xmalloc (sizeof *br);
448 heap_insert (shared->readers, &br->heap_node);
452 /* Compares random_readers A and B by offset and returns a
453 strcmp()-like result. */
455 compare_random_readers_by_offset (const struct heap_node *a_,
456 const struct heap_node *b_,
457 const void *aux UNUSED)
459 const struct random_reader *a = random_reader_from_heap_node (a_);
460 const struct random_reader *b = random_reader_from_heap_node (b_);
461 return a->offset < b->offset ? -1 : a->offset > b->offset;
464 /* Creates and returns a new casereader. This function is
465 intended for use by casereader implementations, not by
468 This function is most suited for creating a casereader for a
469 data source that supports random access.
470 casereader_create_sequential is more appropriate for a data
471 source that is naturally sequential.
473 PROTO must be the prototype for the cases that may be read
474 from the casereader. The caller retains its reference to
477 CASE_CNT is an upper limit on the number of cases that
478 casereader_read will return from the casereader in successive
479 calls. Ordinarily, this is the actual number of cases in the
480 data source or CASENUMBER_MAX if the number of cases cannot be
481 predicted in advance.
483 CLASS and AUX are a set of casereader implementation-specific
484 member functions and auxiliary data to pass to those member
485 functions, respectively. */
487 casereader_create_random (const struct caseproto *proto, casenumber case_cnt,
488 const struct casereader_random_class *class,
491 struct random_reader_shared *shared = xmalloc (sizeof *shared);
492 shared->readers = heap_create (compare_random_readers_by_offset, NULL);
493 shared->class = class;
495 shared->min_offset = 0;
496 return casereader_create_sequential (NULL, proto, case_cnt,
497 &random_reader_casereader_class,
498 make_random_reader (shared, 0));
501 /* Reassesses the min_offset in SHARED based on the minimum
502 offset in the heap. */
504 advance_random_reader (struct casereader *reader,
505 struct random_reader_shared *shared)
509 old = shared->min_offset;
510 new = random_reader_from_heap_node (heap_minimum (shared->readers))->offset;
514 shared->min_offset = new;
515 shared->class->advance (reader, shared->aux, new - old);
519 /* struct casereader_class "read" function for random reader. */
520 static struct ccase *
521 random_reader_read (struct casereader *reader, void *br_)
523 struct random_reader *br = br_;
524 struct random_reader_shared *shared = br->shared;
525 struct ccase *c = shared->class->read (reader, shared->aux,
526 br->offset - shared->min_offset);
530 heap_changed (shared->readers, &br->heap_node);
531 advance_random_reader (reader, shared);
536 /* struct casereader_class "destroy" function for random
539 random_reader_destroy (struct casereader *reader, void *br_)
541 struct random_reader *br = br_;
542 struct random_reader_shared *shared = br->shared;
544 heap_delete (shared->readers, &br->heap_node);
545 if (heap_is_empty (shared->readers))
547 heap_destroy (shared->readers);
548 shared->class->destroy (reader, shared->aux);
552 advance_random_reader (reader, shared);
557 /* struct casereader_class "clone" function for random reader. */
558 static struct casereader *
559 random_reader_clone (struct casereader *reader, void *br_)
561 struct random_reader *br = br_;
562 struct random_reader_shared *shared = br->shared;
563 return casereader_create_sequential (casereader_get_taint (reader),
565 casereader_get_case_cnt (reader),
566 &random_reader_casereader_class,
567 make_random_reader (shared,
571 /* struct casereader_class "peek" function for random reader. */
572 static struct ccase *
573 random_reader_peek (struct casereader *reader, void *br_, casenumber idx)
575 struct random_reader *br = br_;
576 struct random_reader_shared *shared = br->shared;
578 return shared->class->read (reader, shared->aux,
579 br->offset - shared->min_offset + idx);
582 /* Casereader class for random reader. */
583 static const struct casereader_class random_reader_casereader_class =
586 random_reader_destroy,
592 static const struct casereader_class casereader_null_class;
594 /* Returns a casereader with no cases. The casereader has the prototype
595 specified by PROTO. PROTO may be specified as a null pointer, in which case
596 the casereader has no variables. */
598 casereader_create_empty (const struct caseproto *proto_)
600 struct casereader *reader;
601 struct caseproto *proto;
603 proto = proto_ != NULL ? caseproto_ref (proto_) : caseproto_create ();
604 reader = casereader_create_sequential (NULL, proto, 0,
605 &casereader_null_class, NULL);
606 caseproto_unref (proto);
611 static struct ccase *
612 casereader_null_read (struct casereader *reader UNUSED, void *aux UNUSED)
618 casereader_null_destroy (struct casereader *reader UNUSED, void *aux UNUSED)
623 static const struct casereader_class casereader_null_class =
625 casereader_null_read,
626 casereader_null_destroy,