X-Git-Url: https://pintos-os.org/cgi-bin/gitweb.cgi?a=blobdiff_plain;f=src%2Fdata%2Fcasereader.c;h=98c10273b314dd4bbc4f81170e485136a828a756;hb=38993354cabb6fc37bb882be92f9a49e9aeb4c88;hp=e6d859c9df5c101799f086aa12b58f670b36561d;hpb=c217e77f75f84c5e31ea0f698014069cd4e78e0c;p=pspp diff --git a/src/data/casereader.c b/src/data/casereader.c index e6d859c9df..98c10273b3 100644 --- a/src/data/casereader.c +++ b/src/data/casereader.c @@ -1,5 +1,5 @@ /* PSPP - a program for statistical analysis. - Copyright (C) 2007 Free Software Foundation, Inc. + Copyright (C) 2007, 2009, 2010 Free Software Foundation, Inc. This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by @@ -16,45 +16,41 @@ #include -#include -#include +#include "data/casereader.h" +#include "data/casereader-provider.h" #include -#include -#include -#include -#include -#include -#include +#include "data/casereader-shim.h" +#include "data/casewriter.h" +#include "libpspp/assertion.h" +#include "libpspp/heap.h" +#include "libpspp/taint.h" -#include "xalloc.h" +#include "gl/xalloc.h" /* A casereader. */ struct casereader { struct taint *taint; /* Corrupted? */ - size_t value_cnt; /* Values per case. */ + struct caseproto *proto; /* Format of contained cases. */ casenumber case_cnt; /* Number of cases, CASENUMBER_MAX if unknown. */ const struct casereader_class *class; /* Class. */ void *aux; /* Auxiliary data for class. */ }; -static void insert_shim (struct casereader *); - -/* Creates a new case in C and reads the next case from READER - into it. The caller owns C and must destroy C when its data - is no longer needed. Return true if successful, false when - cases have been exhausted or upon detection of an I/O error. - In the latter case, C is set to the null case. +/* Reads and returns the next case from READER. The caller owns + the returned case and must call case_unref on it when its data + is no longer needed. Returns a null pointer if cases have + been exhausted or upon detection of an I/O error. The case returned is effectively consumed: it can never be read again through READER. If this is inconvenient, READER may be cloned in advance with casereader_clone, or casereader_peek may be used instead. */ -bool -casereader_read (struct casereader *reader, struct ccase *c) +struct ccase * +casereader_read (struct casereader *reader) { if (reader->case_cnt != 0) { @@ -69,17 +65,21 @@ casereader_read (struct casereader *reader, struct ccase *c) case_cnt after calling ->read, then this would actually drop two cases from case_cnt instead of one, and we'd lose the last case in the casereader. */ + struct ccase *c; if (reader->case_cnt != CASENUMBER_MAX) reader->case_cnt--; - if (reader->class->read (reader, reader->aux, c)) + c = reader->class->read (reader, reader->aux); + if (c != NULL) { - assert (case_get_value_cnt (c) >= reader->value_cnt); - return true; + size_t n_widths UNUSED = caseproto_get_n_widths (reader->proto); + assert (case_get_value_cnt (c) >= n_widths); + expensive_assert (caseproto_equal (case_get_proto (c), 0, + reader->proto, 0, n_widths)); + return c; } } reader->case_cnt = 0; - case_nullify (c); - return false; + return NULL; } /* Destroys READER. @@ -93,6 +93,7 @@ casereader_destroy (struct casereader *reader) { reader->class->destroy (reader, reader->aux); ok = taint_destroy (reader->taint); + caseproto_unref (reader->proto); free (reader); } return ok; @@ -104,13 +105,13 @@ casereader_destroy (struct casereader *reader) struct casereader * casereader_clone (const struct casereader *reader_) { - struct casereader *reader = (struct casereader *) reader_; + struct casereader *reader = CONST_CAST (struct casereader *, reader_); struct casereader *clone; if ( reader == NULL ) return NULL; if (reader->class->clone == NULL) - insert_shim (reader); + casereader_shim_insert (reader); clone = reader->class->clone (reader, reader->aux); assert (clone != NULL); assert (clone != reader); @@ -160,27 +161,27 @@ casereader_swap (struct casereader *a, struct casereader *b) } } -/* Creates a new case in C and reads the (IDX + 1)'th case from - READER into it. The caller owns C and must destroy C when its - data is no longer needed. Return true if successful, false - when cases have been exhausted or upon detection of an I/O - error. In the latter case, C is set to the null case. */ -bool -casereader_peek (struct casereader *reader, casenumber idx, struct ccase *c) +/* Reads and returns the (IDX + 1)'th case from READER. The + caller owns the returned case and must call case_unref on it + when it is no longer needed. Returns a null pointer if cases + have been exhausted or upon detection of an I/O error. */ +struct ccase * +casereader_peek (struct casereader *reader, casenumber idx) { if (idx < reader->case_cnt) { + struct ccase *c; if (reader->class->peek == NULL) - insert_shim (reader); - if (reader->class->peek (reader, reader->aux, idx, c)) - return true; + casereader_shim_insert (reader); + c = reader->class->peek (reader, reader->aux, idx); + if (c != NULL) + return c; else if (casereader_error (reader)) reader->case_cnt = 0; } if (reader->case_cnt > idx) reader->case_cnt = idx; - case_nullify (c); - return false; + return NULL; } /* Returns true if no cases remain to be read from READER, or if @@ -191,13 +192,18 @@ casereader_peek (struct casereader *reader, casenumber idx, struct ccase *c) bool casereader_is_empty (struct casereader *reader) { - struct ccase c; - if (reader->case_cnt == 0 || !casereader_peek (reader, 0, &c)) + if (reader->case_cnt == 0) return true; else { - case_destroy (&c); - return false; + struct ccase *c = casereader_peek (reader, 0); + if (c == NULL) + return true; + else + { + case_unref (c); + return false; + } } } @@ -248,6 +254,20 @@ casereader_get_case_cnt (struct casereader *reader) return reader->case_cnt; } +static casenumber +casereader_count_cases__ (const struct casereader *reader, + casenumber max_cases) +{ + struct casereader *clone; + casenumber n_cases; + + clone = casereader_clone (reader); + n_cases = casereader_advance (clone, max_cases); + casereader_destroy (clone); + + return n_cases; +} + /* Returns the number of cases that will be read by successive calls to casereader_read for READER, assuming that no errors occur. Upon an error condition, the case count drops to 0, so @@ -258,43 +278,69 @@ casereader_get_case_cnt (struct casereader *reader) of the contents of a clone of READER. Thus, the return value is always correct in the absence of I/O errors. */ casenumber -casereader_count_cases (struct casereader *reader) +casereader_count_cases (const struct casereader *reader) { if (reader->case_cnt == CASENUMBER_MAX) { - casenumber n_cases = 0; - struct ccase c; + struct casereader *reader_rw = CONST_CAST (struct casereader *, reader); + reader_rw->case_cnt = casereader_count_cases__ (reader, CASENUMBER_MAX); + } + return reader->case_cnt; +} - struct casereader *clone = casereader_clone (reader); +/* Truncates READER to at most N cases. */ +void +casereader_truncate (struct casereader *reader, casenumber n) +{ + /* This could be optimized, if it ever becomes too expensive, by adding a + "max_cases" member to struct casereader. We could also add a "truncate" + function to the casereader implementation, to allow the casereader to + throw away data that cannot ever be read. */ + if (reader->case_cnt == CASENUMBER_MAX) + reader->case_cnt = casereader_count_cases__ (reader, n); + if (reader->case_cnt > n) + reader->case_cnt = n; +} - for (; casereader_read (clone, &c); case_destroy (&c)) - n_cases++; +/* Returns the prototype for the cases in READER. The caller + must not unref the returned prototype. */ +const struct caseproto * +casereader_get_proto (const struct casereader *reader) +{ + return reader->proto; +} - casereader_destroy (clone); - reader->case_cnt = n_cases; +/* Skips past N cases in READER, stopping when the last case in + READER has been read or on an input error. Returns the number + of cases successfully skipped. */ +casenumber +casereader_advance (struct casereader *reader, casenumber n) +{ + casenumber i; + + for (i = 0; i < n; i++) + { + struct ccase *c = casereader_read (reader); + if (c == NULL) + break; + case_unref (c); } - return reader->case_cnt; + return i; } -/* Returns the number of struct values in each case in READER. */ -size_t -casereader_get_value_cnt (struct casereader *reader) -{ - return reader->value_cnt; -} /* Copies all the cases in READER to WRITER, propagating errors appropriately. */ void casereader_transfer (struct casereader *reader, struct casewriter *writer) { - struct ccase c; + struct ccase *c; taint_propagate (casereader_get_taint (reader), casewriter_get_taint (writer)); - while (casereader_read (reader, &c)) - casewriter_write (writer, &c); + while ((c = casereader_read (reader)) != NULL) + casewriter_write (writer, c); casereader_destroy (reader); } @@ -315,8 +361,9 @@ casereader_transfer (struct casereader *reader, struct casewriter *writer) function, in which case the cloned casereader should have the same taint object as the original casereader.) - VALUE_CNT must be the number of struct values per case read - from the casereader. + PROTO must be the prototype for the cases that may be read + from the casereader. The caller retains its reference to + PROTO. CASE_CNT is an upper limit on the number of cases that casereader_read will return from the casereader in successive @@ -329,12 +376,13 @@ casereader_transfer (struct casereader *reader, struct casewriter *writer) functions, respectively. */ struct casereader * casereader_create_sequential (const struct taint *taint, - size_t value_cnt, casenumber case_cnt, + const struct caseproto *proto, + casenumber case_cnt, const struct casereader_class *class, void *aux) { struct casereader *reader = xmalloc (sizeof *reader); reader->taint = taint != NULL ? taint_clone (taint) : taint_create (); - reader->value_cnt = value_cnt; + reader->proto = caseproto_ref (proto); reader->case_cnt = case_cnt; reader->class = class; reader->aux = aux; @@ -356,7 +404,7 @@ casereader_create_sequential (const struct taint *taint, casereader to be replaced by a shim caseader). */ void * casereader_dynamic_cast (struct casereader *reader, - struct casereader_class *class) + const struct casereader_class *class) { return reader->class == class ? reader->aux : NULL; } @@ -393,7 +441,7 @@ struct random_reader_shared void *aux; }; -static struct casereader_class random_reader_casereader_class; +static const struct casereader_class random_reader_casereader_class; /* Creates and returns a new random_reader with the given SHARED data and OFFSET. Inserts the new random reader into the @@ -429,8 +477,9 @@ compare_random_readers_by_offset (const struct heap_node *a_, casereader_create_sequential is more appropriate for a data source that is naturally sequential. - VALUE_CNT must be the number of struct values per case read - from the casereader. + PROTO must be the prototype for the cases that may be read + from the casereader. The caller retains its reference to + PROTO. CASE_CNT is an upper limit on the number of cases that casereader_read will return from the casereader in successive @@ -442,7 +491,7 @@ compare_random_readers_by_offset (const struct heap_node *a_, member functions and auxiliary data to pass to those member functions, respectively. */ struct casereader * -casereader_create_random (size_t value_cnt, casenumber case_cnt, +casereader_create_random (const struct caseproto *proto, casenumber case_cnt, const struct casereader_random_class *class, void *aux) { @@ -451,7 +500,7 @@ casereader_create_random (size_t value_cnt, casenumber case_cnt, shared->class = class; shared->aux = aux; shared->min_offset = 0; - return casereader_create_sequential (NULL, value_cnt, case_cnt, + return casereader_create_sequential (NULL, proto, case_cnt, &random_reader_casereader_class, make_random_reader (shared, 0)); } @@ -475,22 +524,20 @@ advance_random_reader (struct casereader *reader, } /* struct casereader_class "read" function for random reader. */ -static bool -random_reader_read (struct casereader *reader, void *br_, struct ccase *c) +static struct ccase * +random_reader_read (struct casereader *reader, void *br_) { struct random_reader *br = br_; struct random_reader_shared *shared = br->shared; - - if (shared->class->read (reader, shared->aux, - br->offset - shared->min_offset, c)) + struct ccase *c = shared->class->read (reader, shared->aux, + br->offset - shared->min_offset); + if (c != NULL) { br->offset++; heap_changed (shared->readers, &br->heap_node); advance_random_reader (reader, shared); - return true; } - else - return false; + return c; } /* struct casereader_class "destroy" function for random @@ -521,7 +568,7 @@ random_reader_clone (struct casereader *reader, void *br_) struct random_reader *br = br_; struct random_reader_shared *shared = br->shared; return casereader_create_sequential (casereader_get_taint (reader), - casereader_get_value_cnt (reader), + reader->proto, casereader_get_case_cnt (reader), &random_reader_casereader_class, make_random_reader (shared, @@ -529,19 +576,18 @@ random_reader_clone (struct casereader *reader, void *br_) } /* struct casereader_class "peek" function for random reader. */ -static bool -random_reader_peek (struct casereader *reader, void *br_, - casenumber idx, struct ccase *c) +static struct ccase * +random_reader_peek (struct casereader *reader, void *br_, casenumber idx) { struct random_reader *br = br_; struct random_reader_shared *shared = br->shared; return shared->class->read (reader, shared->aux, - br->offset - shared->min_offset + idx, c); + br->offset - shared->min_offset + idx); } /* Casereader class for random reader. */ -static struct casereader_class random_reader_casereader_class = +static const struct casereader_class random_reader_casereader_class = { random_reader_read, random_reader_destroy, @@ -549,106 +595,42 @@ static struct casereader_class random_reader_casereader_class = random_reader_peek, }; -/* Buffering shim for implementing clone and peek operations. - - The "clone" and "peek" operations aren't implemented by all - types of casereaders, but we have to expose a uniform - interface anyhow. We do this by interposing a buffering - casereader on top of the existing casereader on the first call - to "clone" or "peek". The buffering casereader maintains a - window of cases that spans the positions of the original - casereader and all of its clones (the "clone set"), from the - position of the casereader that has read the fewest cases to - the position of the casereader that has read the most. - - Thus, if all of the casereaders in the clone set are at - approximately the same position, only a few cases are buffered - and there is little inefficiency. If, on the other hand, one - casereader is not used to read any cases at all, but another - one is used to read all of the cases, the entire contents of - the casereader is copied into the buffer. This still might - not be so inefficient, given that case data in memory is - shared across multiple identical copies, but in the worst case - the window implementation will write cases to disk instead of - maintaining them in-memory. */ - -/* A buffering shim for a non-clonable or non-peekable - casereader. */ -struct shim - { - struct casewindow *window; /* Window of buffered cases. */ - struct casereader *subreader; /* Subordinate casereader. */ - }; + +static const struct casereader_class casereader_null_class; -static struct casereader_random_class shim_class; +/* Returns a casereader with no cases. The casereader has the prototype + specified by PROTO. PROTO may be specified as a null pointer, in which case + the casereader has no variables. */ +struct casereader * +casereader_create_empty (const struct caseproto *proto_) +{ + struct casereader *reader; + struct caseproto *proto; -/* Interposes a buffering shim atop READER. */ -static void -insert_shim (struct casereader *reader) -{ - size_t value_cnt = casereader_get_value_cnt (reader); - casenumber case_cnt = casereader_get_case_cnt (reader); - struct shim *b = xmalloc (sizeof *b); - b->window = casewindow_create (value_cnt, get_workspace_cases (value_cnt)); - b->subreader = casereader_create_random (value_cnt, case_cnt, - &shim_class, b); - casereader_swap (reader, b->subreader); - taint_propagate (casewindow_get_taint (b->window), - casereader_get_taint (reader)); - taint_propagate (casereader_get_taint (b->subreader), - casereader_get_taint (reader)); -} - -/* Ensures that B's window contains at least CASE_CNT cases. - Return true if successful, false upon reaching the end of B's - subreader or an I/O error. */ -static bool -prime_buffer (struct shim *b, casenumber case_cnt) -{ - while (casewindow_get_case_cnt (b->window) < case_cnt) - { - struct ccase tmp; - if (!casereader_read (b->subreader, &tmp)) - return false; - casewindow_push_head (b->window, &tmp); - } - return true; -} + proto = proto_ != NULL ? caseproto_ref (proto_) : caseproto_create (); + reader = casereader_create_sequential (NULL, proto, 0, + &casereader_null_class, NULL); + caseproto_unref (proto); -/* Reads the case at the given 0-based OFFSET from the front of - the window into C. Returns true if successful, false if - OFFSET is beyond the end of file or upon I/O error. */ -static bool -shim_read (struct casereader *reader UNUSED, void *b_, - casenumber offset, struct ccase *c) -{ - struct shim *b = b_; - return (prime_buffer (b, offset + 1) - && casewindow_get_case (b->window, offset, c)); + return reader; } -/* Destroys B. */ -static void -shim_destroy (struct casereader *reader UNUSED, void *b_) +static struct ccase * +casereader_null_read (struct casereader *reader UNUSED, void *aux UNUSED) { - struct shim *b = b_; - casewindow_destroy (b->window); - casereader_destroy (b->subreader); - free (b); + return NULL; } -/* Discards CNT cases from the front of B's window. */ static void -shim_advance (struct casereader *reader UNUSED, void *b_, casenumber case_cnt) +casereader_null_destroy (struct casereader *reader UNUSED, void *aux UNUSED) { - struct shim *b = b_; - casewindow_pop_tail (b->window, case_cnt); + /* Nothing to do. */ } -/* Class for the buffered reader. */ -static struct casereader_random_class shim_class = +static const struct casereader_class casereader_null_class = { - shim_read, - shim_destroy, - shim_advance, + casereader_null_read, + casereader_null_destroy, + NULL, /* clone */ + NULL, /* peek */ };