X-Git-Url: https://pintos-os.org/cgi-bin/gitweb.cgi?a=blobdiff_plain;f=src%2Fdata%2Fcasereader.c;h=379e351edc781098558a24cecc0d2dc6e266f2c3;hb=refs%2Fheads%2Fctables7;hp=3d27a9192a79bd63466e25e2059ce91824e94241;hpb=9a331fe64eb814ae5c1322e21717a04fb254bf65;p=pspp diff --git a/src/data/casereader.c b/src/data/casereader.c index 3d27a9192a..379e351edc 100644 --- a/src/data/casereader.c +++ b/src/data/casereader.c @@ -1,5 +1,5 @@ /* PSPP - a program for statistical analysis. - Copyright (C) 2007, 2009 Free Software Foundation, Inc. + Copyright (C) 2007, 2009, 2010, 2013 Free Software Foundation, Inc. This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by @@ -16,33 +16,30 @@ #include -#include -#include +#include "data/casereader.h" +#include "data/casereader-provider.h" #include -#include -#include -#include -#include -#include -#include +#include "data/casereader-shim.h" +#include "data/casewriter.h" +#include "libpspp/assertion.h" +#include "libpspp/heap.h" +#include "libpspp/taint.h" -#include "xalloc.h" +#include "gl/xalloc.h" /* A casereader. */ struct casereader { struct taint *taint; /* Corrupted? */ - size_t value_cnt; /* Values per case. */ - casenumber case_cnt; /* Number of cases, + struct caseproto *proto; /* Format of contained cases. */ + casenumber n_cases; /* Number of cases, CASENUMBER_MAX if unknown. */ const struct casereader_class *class; /* Class. */ void *aux; /* Auxiliary data for class. */ }; -static void insert_shim (struct casereader *); - /* Reads and returns the next case from READER. The caller owns the returned case and must call case_unref on it when its data is no longer needed. Returns a null pointer if cases have @@ -55,7 +52,7 @@ static void insert_shim (struct casereader *); struct ccase * casereader_read (struct casereader *reader) { - if (reader->case_cnt != 0) + if (reader->n_cases != 0) { /* ->read may use casereader_swap to replace itself by another reader and then delegate to that reader by @@ -64,21 +61,24 @@ casereader_read (struct casereader *reader) ever will. To allow this to work, however, we must decrement - case_cnt before calling ->read. If we decremented - case_cnt after calling ->read, then this would actually - drop two cases from case_cnt instead of one, and we'd + n_cases before calling ->read. If we decremented + n_cases after calling ->read, then this would actually + drop two cases from n_cases instead of one, and we'd lose the last case in the casereader. */ struct ccase *c; - if (reader->case_cnt != CASENUMBER_MAX) - reader->case_cnt--; + if (reader->n_cases != CASENUMBER_MAX) + reader->n_cases--; c = reader->class->read (reader, reader->aux); if (c != NULL) { - assert (case_get_value_cnt (c) >= reader->value_cnt); + size_t n_widths UNUSED = caseproto_get_n_widths (reader->proto); + assert (case_get_n_values (c) >= n_widths); + expensive_assert (caseproto_equal (case_get_proto (c), 0, + reader->proto, 0, n_widths)); return c; } } - reader->case_cnt = 0; + reader->n_cases = 0; return NULL; } @@ -93,6 +93,7 @@ casereader_destroy (struct casereader *reader) { reader->class->destroy (reader, reader->aux); ok = taint_destroy (reader->taint); + caseproto_unref (reader->proto); free (reader); } return ok; @@ -104,38 +105,19 @@ casereader_destroy (struct casereader *reader) struct casereader * casereader_clone (const struct casereader *reader_) { - struct casereader *reader = (struct casereader *) reader_; + struct casereader *reader = CONST_CAST (struct casereader *, reader_); struct casereader *clone; - if ( reader == NULL ) + if (reader == NULL) return NULL; if (reader->class->clone == NULL) - insert_shim (reader); + casereader_shim_insert (reader); clone = reader->class->clone (reader, reader->aux); assert (clone != NULL); assert (clone != reader); return clone; } -/* Makes a copy of ORIGINAL into *NEW1 (if NEW1 is non-null) and - *NEW2 (if NEW2 is non-null), then destroys ORIGINAL. */ -void -casereader_split (struct casereader *original, - struct casereader **new1, struct casereader **new2) -{ - if (new1 != NULL && new2 != NULL) - { - *new1 = casereader_rename (original); - *new2 = casereader_clone (*new1); - } - else if (new1 != NULL) - *new1 = casereader_rename (original); - else if (new2 != NULL) - *new2 = casereader_rename (original); - else - casereader_destroy (original); -} - /* Returns a copy of READER, which is itself destroyed. Useful for taking over ownership of a casereader, to enforce preventing the original owner from accessing the casereader @@ -167,19 +149,19 @@ casereader_swap (struct casereader *a, struct casereader *b) struct ccase * casereader_peek (struct casereader *reader, casenumber idx) { - if (idx < reader->case_cnt) + if (idx < reader->n_cases) { struct ccase *c; if (reader->class->peek == NULL) - insert_shim (reader); + casereader_shim_insert (reader); c = reader->class->peek (reader, reader->aux, idx); if (c != NULL) return c; else if (casereader_error (reader)) - reader->case_cnt = 0; + reader->n_cases = 0; } - if (reader->case_cnt > idx) - reader->case_cnt = idx; + if (reader->n_cases > idx) + reader->n_cases = idx; return NULL; } @@ -191,7 +173,7 @@ casereader_peek (struct casereader *reader, casenumber idx) bool casereader_is_empty (struct casereader *reader) { - if (reader->case_cnt == 0) + if (reader->n_cases == 0) return true; else { @@ -248,9 +230,23 @@ casereader_get_taint (const struct casereader *reader) actual number of cases in such a casereader, use casereader_count_cases. */ casenumber -casereader_get_case_cnt (struct casereader *reader) +casereader_get_n_cases (struct casereader *reader) +{ + return reader->n_cases; +} + +static casenumber +casereader_count_cases__ (const struct casereader *reader, + casenumber max_cases) { - return reader->case_cnt; + struct casereader *clone; + casenumber n_cases; + + clone = casereader_clone (reader); + n_cases = casereader_advance (clone, max_cases); + casereader_destroy (clone); + + return n_cases; } /* Returns the number of cases that will be read by successive @@ -263,34 +259,60 @@ casereader_get_case_cnt (struct casereader *reader) of the contents of a clone of READER. Thus, the return value is always correct in the absence of I/O errors. */ casenumber -casereader_count_cases (struct casereader *reader) +casereader_count_cases (const struct casereader *reader) { - if (reader->case_cnt == CASENUMBER_MAX) + if (reader->n_cases == CASENUMBER_MAX) { - casenumber n_cases = 0; - struct ccase *c; + struct casereader *reader_rw = CONST_CAST (struct casereader *, reader); + reader_rw->n_cases = casereader_count_cases__ (reader, CASENUMBER_MAX); + } + return reader->n_cases; +} - struct casereader *clone = casereader_clone (reader); +/* Truncates READER to at most N cases. */ +void +casereader_truncate (struct casereader *reader, casenumber n) +{ + /* This could be optimized, if it ever becomes too expensive, by adding a + "max_cases" member to struct casereader. We could also add a "truncate" + function to the casereader implementation, to allow the casereader to + throw away data that cannot ever be read. */ + if (reader->n_cases == CASENUMBER_MAX) + reader->n_cases = casereader_count_cases__ (reader, n); + if (reader->n_cases > n) + reader->n_cases = n; +} - for (; (c = casereader_read (clone)) != NULL; case_unref (c)) - n_cases++; +/* Returns the prototype for the cases in READER. The caller + must not unref the returned prototype. */ +const struct caseproto * +casereader_get_proto (const struct casereader *reader) +{ + return reader->proto; +} - casereader_destroy (clone); - reader->case_cnt = n_cases; +/* Skips past N cases in READER, stopping when the last case in + READER has been read or on an input error. Returns the number + of cases successfully skipped. */ +casenumber +casereader_advance (struct casereader *reader, casenumber n) +{ + casenumber i; + + for (i = 0; i < n; i++) + { + struct ccase *c = casereader_read (reader); + if (c == NULL) + break; + case_unref (c); } - return reader->case_cnt; + return i; } -/* Returns the number of struct values in each case in READER. */ -size_t -casereader_get_value_cnt (struct casereader *reader) -{ - return reader->value_cnt; -} /* Copies all the cases in READER to WRITER, propagating errors - appropriately. */ + appropriately. READER is destroyed by this function */ void casereader_transfer (struct casereader *reader, struct casewriter *writer) { @@ -320,8 +342,9 @@ casereader_transfer (struct casereader *reader, struct casewriter *writer) function, in which case the cloned casereader should have the same taint object as the original casereader.) - VALUE_CNT must be the number of struct values per case read - from the casereader. + PROTO must be the prototype for the cases that may be read + from the casereader. The caller retains its reference to + PROTO. CASE_CNT is an upper limit on the number of cases that casereader_read will return from the casereader in successive @@ -334,13 +357,14 @@ casereader_transfer (struct casereader *reader, struct casewriter *writer) functions, respectively. */ struct casereader * casereader_create_sequential (const struct taint *taint, - size_t value_cnt, casenumber case_cnt, + const struct caseproto *proto, + casenumber n_cases, const struct casereader_class *class, void *aux) { struct casereader *reader = xmalloc (sizeof *reader); reader->taint = taint != NULL ? taint_clone (taint) : taint_create (); - reader->value_cnt = value_cnt; - reader->case_cnt = case_cnt; + reader->proto = caseproto_ref (proto); + reader->n_cases = n_cases; reader->class = class; reader->aux = aux; return reader; @@ -434,8 +458,9 @@ compare_random_readers_by_offset (const struct heap_node *a_, casereader_create_sequential is more appropriate for a data source that is naturally sequential. - VALUE_CNT must be the number of struct values per case read - from the casereader. + PROTO must be the prototype for the cases that may be read + from the casereader. The caller retains its reference to + PROTO. CASE_CNT is an upper limit on the number of cases that casereader_read will return from the casereader in successive @@ -447,7 +472,7 @@ compare_random_readers_by_offset (const struct heap_node *a_, member functions and auxiliary data to pass to those member functions, respectively. */ struct casereader * -casereader_create_random (size_t value_cnt, casenumber case_cnt, +casereader_create_random (const struct caseproto *proto, casenumber n_cases, const struct casereader_random_class *class, void *aux) { @@ -456,7 +481,7 @@ casereader_create_random (size_t value_cnt, casenumber case_cnt, shared->class = class; shared->aux = aux; shared->min_offset = 0; - return casereader_create_sequential (NULL, value_cnt, case_cnt, + return casereader_create_sequential (NULL, proto, n_cases, &random_reader_casereader_class, make_random_reader (shared, 0)); } @@ -524,8 +549,8 @@ random_reader_clone (struct casereader *reader, void *br_) struct random_reader *br = br_; struct random_reader_shared *shared = br->shared; return casereader_create_sequential (casereader_get_taint (reader), - casereader_get_value_cnt (reader), - casereader_get_case_cnt (reader), + reader->proto, + casereader_get_n_cases (reader), &random_reader_casereader_class, make_random_reader (shared, br->offset)); @@ -551,109 +576,42 @@ static const struct casereader_class random_reader_casereader_class = random_reader_peek, }; -/* Buffering shim for implementing clone and peek operations. - - The "clone" and "peek" operations aren't implemented by all - types of casereaders, but we have to expose a uniform - interface anyhow. We do this by interposing a buffering - casereader on top of the existing casereader on the first call - to "clone" or "peek". The buffering casereader maintains a - window of cases that spans the positions of the original - casereader and all of its clones (the "clone set"), from the - position of the casereader that has read the fewest cases to - the position of the casereader that has read the most. - - Thus, if all of the casereaders in the clone set are at - approximately the same position, only a few cases are buffered - and there is little inefficiency. If, on the other hand, one - casereader is not used to read any cases at all, but another - one is used to read all of the cases, the entire contents of - the casereader is copied into the buffer. This still might - not be so inefficient, given that case data in memory is - shared across multiple identical copies, but in the worst case - the window implementation will write cases to disk instead of - maintaining them in-memory. */ - -/* A buffering shim for a non-clonable or non-peekable - casereader. */ -struct shim - { - struct casewindow *window; /* Window of buffered cases. */ - struct casereader *subreader; /* Subordinate casereader. */ - }; - -static const struct casereader_random_class shim_class; + +static const struct casereader_class casereader_null_class; -/* Interposes a buffering shim atop READER. */ -static void -insert_shim (struct casereader *reader) +/* Returns a casereader with no cases. The casereader has the prototype + specified by PROTO. PROTO may be specified as a null pointer, in which case + the casereader has no variables. */ +struct casereader * +casereader_create_empty (const struct caseproto *proto_) { - size_t value_cnt = casereader_get_value_cnt (reader); - casenumber case_cnt = casereader_get_case_cnt (reader); - struct shim *b = xmalloc (sizeof *b); - b->window = casewindow_create (value_cnt, settings_get_workspace_cases (value_cnt)); - b->subreader = casereader_create_random (value_cnt, case_cnt, - &shim_class, b); - casereader_swap (reader, b->subreader); - taint_propagate (casewindow_get_taint (b->window), - casereader_get_taint (reader)); - taint_propagate (casereader_get_taint (b->subreader), - casereader_get_taint (reader)); -} + struct casereader *reader; + struct caseproto *proto; -/* Ensures that B's window contains at least CASE_CNT cases. - Return true if successful, false upon reaching the end of B's - subreader or an I/O error. */ -static bool -prime_buffer (struct shim *b, casenumber case_cnt) -{ - while (casewindow_get_case_cnt (b->window) < case_cnt) - { - struct ccase *tmp = casereader_read (b->subreader); - if (tmp == NULL) - return false; - casewindow_push_head (b->window, tmp); - } - return true; -} + proto = proto_ != NULL ? caseproto_ref (proto_) : caseproto_create (); + reader = casereader_create_sequential (NULL, proto, 0, + &casereader_null_class, NULL); + caseproto_unref (proto); -/* Reads the case at the given 0-based OFFSET from the front of - the window into C. Returns the case if successful, or a null - pointer if OFFSET is beyond the end of file or upon I/O error. - The caller must call case_unref() on the returned case when it - is no longer needed. */ -static struct ccase * -shim_read (struct casereader *reader UNUSED, void *b_, - casenumber offset) -{ - struct shim *b = b_; - if (!prime_buffer (b, offset + 1)) - return NULL; - return casewindow_get_case (b->window, offset); + return reader; } -/* Destroys B. */ -static void -shim_destroy (struct casereader *reader UNUSED, void *b_) +static struct ccase * +casereader_null_read (struct casereader *reader UNUSED, void *aux UNUSED) { - struct shim *b = b_; - casewindow_destroy (b->window); - casereader_destroy (b->subreader); - free (b); + return NULL; } -/* Discards CNT cases from the front of B's window. */ static void -shim_advance (struct casereader *reader UNUSED, void *b_, casenumber case_cnt) +casereader_null_destroy (struct casereader *reader UNUSED, void *aux UNUSED) { - struct shim *b = b_; - casewindow_pop_tail (b->window, case_cnt); + /* Nothing to do. */ } -/* Class for the buffered reader. */ -static const struct casereader_random_class shim_class = +static const struct casereader_class casereader_null_class = { - shim_read, - shim_destroy, - shim_advance, + casereader_null_read, + casereader_null_destroy, + NULL, /* clone */ + NULL, /* peek */ };