/* PSPP - a program for statistical analysis.
- Copyright (C) 2007 Free Software Foundation, Inc.
+ Copyright (C) 2007, 2009 Free Software Foundation, Inc.
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
struct casereader
{
struct taint *taint; /* Corrupted? */
- size_t value_cnt; /* Values per case. */
+ struct caseproto *proto; /* Format of contained cases. */
casenumber case_cnt; /* Number of cases,
CASENUMBER_MAX if unknown. */
const struct casereader_class *class; /* Class. */
static void insert_shim (struct casereader *);
-/* Creates a new case in C and reads the next case from READER
- into it. The caller owns C and must destroy C when its data
- is no longer needed. Return true if successful, false when
- cases have been exhausted or upon detection of an I/O error.
- In the latter case, C is set to the null case.
+/* Reads and returns the next case from READER. The caller owns
+ the returned case and must call case_unref on it when its data
+ is no longer needed. Returns a null pointer if cases have
+ been exhausted or upon detection of an I/O error.
The case returned is effectively consumed: it can never be
read again through READER. If this is inconvenient, READER
may be cloned in advance with casereader_clone, or
casereader_peek may be used instead. */
-bool
-casereader_read (struct casereader *reader, struct ccase *c)
+struct ccase *
+casereader_read (struct casereader *reader)
{
- if (reader->case_cnt != 0 && reader->class->read (reader, reader->aux, c))
+ if (reader->case_cnt != 0)
{
- assert (case_get_value_cnt (c) == reader->value_cnt);
+ /* ->read may use casereader_swap to replace itself by
+ another reader and then delegate to that reader by
+ recursively calling casereader_read. Currently only
+ lazy_casereader does this and, with luck, nothing else
+ ever will.
+
+ To allow this to work, however, we must decrement
+ case_cnt before calling ->read. If we decremented
+ case_cnt after calling ->read, then this would actually
+ drop two cases from case_cnt instead of one, and we'd
+ lose the last case in the casereader. */
+ struct ccase *c;
if (reader->case_cnt != CASENUMBER_MAX)
reader->case_cnt--;
- return true;
- }
- else
- {
- reader->case_cnt = 0;
- case_nullify (c);
- return false;
+ c = reader->class->read (reader, reader->aux);
+ if (c != NULL)
+ {
+ size_t n_widths UNUSED = caseproto_get_n_widths (reader->proto);
+ assert (case_get_value_cnt (c) >= n_widths);
+ expensive_assert (caseproto_equal (case_get_proto (c), 0,
+ reader->proto, 0, n_widths));
+ return c;
+ }
}
+ reader->case_cnt = 0;
+ return NULL;
}
/* Destroys READER.
{
reader->class->destroy (reader, reader->aux);
ok = taint_destroy (reader->taint);
+ caseproto_unref (reader->proto);
free (reader);
}
return ok;
}
}
-/* Creates a new case in C and reads the (IDX + 1)'th case from
- READER into it. The caller owns C and must destroy C when its
- data is no longer needed. Return true if successful, false
- when cases have been exhausted or upon detection of an I/O
- error. In the latter case, C is set to the null case. */
-bool
-casereader_peek (struct casereader *reader, casenumber idx, struct ccase *c)
+/* Reads and returns the (IDX + 1)'th case from READER. The
+ caller owns the returned case and must call case_unref on it
+ when it is no longer needed. Returns a null pointer if cases
+ have been exhausted or upon detection of an I/O error. */
+struct ccase *
+casereader_peek (struct casereader *reader, casenumber idx)
{
if (idx < reader->case_cnt)
{
+ struct ccase *c;
if (reader->class->peek == NULL)
insert_shim (reader);
- if (reader->class->peek (reader, reader->aux, idx, c))
- return true;
+ c = reader->class->peek (reader, reader->aux, idx);
+ if (c != NULL)
+ return c;
else if (casereader_error (reader))
reader->case_cnt = 0;
}
if (reader->case_cnt > idx)
reader->case_cnt = idx;
- case_nullify (c);
- return false;
+ return NULL;
+}
+
+/* Returns true if no cases remain to be read from READER, or if
+ an error has occurred on READER. (A return value of false
+ does *not* mean that the next call to casereader_peek or
+ casereader_read will return true, because an error can occur
+ in the meantime.) */
+bool
+casereader_is_empty (struct casereader *reader)
+{
+ if (reader->case_cnt == 0)
+ return true;
+ else
+ {
+ struct ccase *c = casereader_peek (reader, 0);
+ if (c == NULL)
+ return true;
+ else
+ {
+ case_unref (c);
+ return false;
+ }
+ }
}
/* Returns true if an I/O error or another hard error has
if (reader->case_cnt == CASENUMBER_MAX)
{
casenumber n_cases = 0;
- struct ccase c;
+ struct ccase *c;
struct casereader *clone = casereader_clone (reader);
- for (; casereader_read (clone, &c); case_destroy (&c))
+ for (; (c = casereader_read (clone)) != NULL; case_unref (c))
n_cases++;
casereader_destroy (clone);
return reader->case_cnt;
}
-/* Returns the number of struct values in each case in READER. */
-size_t
-casereader_get_value_cnt (struct casereader *reader)
+/* Returns the prototype for the cases in READER. The caller
+ must not unref the returned prototype. */
+const struct caseproto *
+casereader_get_proto (const struct casereader *reader)
{
- return reader->value_cnt;
+ return reader->proto;
}
/* Copies all the cases in READER to WRITER, propagating errors
void
casereader_transfer (struct casereader *reader, struct casewriter *writer)
{
- struct ccase c;
+ struct ccase *c;
taint_propagate (casereader_get_taint (reader),
casewriter_get_taint (writer));
- while (casereader_read (reader, &c))
- casewriter_write (writer, &c);
+ while ((c = casereader_read (reader)) != NULL)
+ casewriter_write (writer, c);
casereader_destroy (reader);
}
function, in which case the cloned casereader should have the
same taint object as the original casereader.)
- VALUE_CNT must be the number of struct values per case read
- from the casereader.
+ PROTO must be the prototype for the cases that may be read
+ from the casereader. The caller retains its reference to
+ PROTO.
CASE_CNT is an upper limit on the number of cases that
casereader_read will return from the casereader in successive
functions, respectively. */
struct casereader *
casereader_create_sequential (const struct taint *taint,
- size_t value_cnt, casenumber case_cnt,
+ const struct caseproto *proto,
+ casenumber case_cnt,
const struct casereader_class *class, void *aux)
{
struct casereader *reader = xmalloc (sizeof *reader);
reader->taint = taint != NULL ? taint_clone (taint) : taint_create ();
- reader->value_cnt = value_cnt;
+ reader->proto = caseproto_ref (proto);
reader->case_cnt = case_cnt;
reader->class = class;
reader->aux = aux;
return reader;
}
+
+/* If READER is a casereader of the given CLASS, returns its
+ associated auxiliary data; otherwise, returns a null pointer.
+
+ This function is intended for use from casereader
+ implementations, not by casereader users. Even within
+ casereader implementations, its usefulness is quite limited,
+ for at least two reasons. First, every casereader member
+ function already receives a pointer to the casereader's
+ auxiliary data. Second, a casereader's class can change
+ (through a call to casereader_swap) and this is in practice
+ quite common (e.g. any call to casereader_clone on a
+ casereader that does not directly support clone will cause the
+ casereader to be replaced by a shim caseader). */
+void *
+casereader_dynamic_cast (struct casereader *reader,
+ const struct casereader_class *class)
+{
+ return reader->class == class ? reader->aux : NULL;
+}
\f
/* Random-access casereader implementation.
void *aux;
};
-static struct casereader_class random_reader_casereader_class;
+static const struct casereader_class random_reader_casereader_class;
/* Creates and returns a new random_reader with the given SHARED
data and OFFSET. Inserts the new random reader into the
casereader_create_sequential is more appropriate for a data
source that is naturally sequential.
- VALUE_CNT must be the number of struct values per case read
- from the casereader.
+ PROTO must be the prototype for the cases that may be read
+ from the casereader. The caller retains its reference to
+ PROTO.
CASE_CNT is an upper limit on the number of cases that
casereader_read will return from the casereader in successive
member functions and auxiliary data to pass to those member
functions, respectively. */
struct casereader *
-casereader_create_random (size_t value_cnt, casenumber case_cnt,
+casereader_create_random (const struct caseproto *proto, casenumber case_cnt,
const struct casereader_random_class *class,
void *aux)
{
shared->class = class;
shared->aux = aux;
shared->min_offset = 0;
- return casereader_create_sequential (NULL, value_cnt, case_cnt,
+ return casereader_create_sequential (NULL, proto, case_cnt,
&random_reader_casereader_class,
make_random_reader (shared, 0));
}
}
/* struct casereader_class "read" function for random reader. */
-static bool
-random_reader_read (struct casereader *reader, void *br_, struct ccase *c)
+static struct ccase *
+random_reader_read (struct casereader *reader, void *br_)
{
struct random_reader *br = br_;
struct random_reader_shared *shared = br->shared;
-
- if (shared->class->read (reader, shared->aux,
- br->offset - shared->min_offset, c))
+ struct ccase *c = shared->class->read (reader, shared->aux,
+ br->offset - shared->min_offset);
+ if (c != NULL)
{
br->offset++;
heap_changed (shared->readers, &br->heap_node);
advance_random_reader (reader, shared);
- return true;
}
- else
- return false;
+ return c;
}
/* struct casereader_class "destroy" function for random
struct random_reader *br = br_;
struct random_reader_shared *shared = br->shared;
return casereader_create_sequential (casereader_get_taint (reader),
- casereader_get_value_cnt (reader),
+ reader->proto,
casereader_get_case_cnt (reader),
&random_reader_casereader_class,
make_random_reader (shared,
}
/* struct casereader_class "peek" function for random reader. */
-static bool
-random_reader_peek (struct casereader *reader, void *br_,
- casenumber idx, struct ccase *c)
+static struct ccase *
+random_reader_peek (struct casereader *reader, void *br_, casenumber idx)
{
struct random_reader *br = br_;
struct random_reader_shared *shared = br->shared;
return shared->class->read (reader, shared->aux,
- br->offset - shared->min_offset + idx, c);
+ br->offset - shared->min_offset + idx);
}
/* Casereader class for random reader. */
-static struct casereader_class random_reader_casereader_class =
+static const struct casereader_class random_reader_casereader_class =
{
random_reader_read,
random_reader_destroy,
struct casereader *subreader; /* Subordinate casereader. */
};
-static struct casereader_random_class shim_class;
+static const struct casereader_random_class shim_class;
/* Interposes a buffering shim atop READER. */
static void
insert_shim (struct casereader *reader)
{
- size_t value_cnt = casereader_get_value_cnt (reader);
+ const struct caseproto *proto = casereader_get_proto (reader);
casenumber case_cnt = casereader_get_case_cnt (reader);
struct shim *b = xmalloc (sizeof *b);
- b->window = casewindow_create (value_cnt, get_workspace_cases (value_cnt));
- b->subreader = casereader_create_random (value_cnt, case_cnt,
- &shim_class, b);
+ b->window = casewindow_create (proto, settings_get_workspace_cases (proto));
+ b->subreader = casereader_create_random (proto, case_cnt, &shim_class, b);
casereader_swap (reader, b->subreader);
taint_propagate (casewindow_get_taint (b->window),
casereader_get_taint (reader));
{
while (casewindow_get_case_cnt (b->window) < case_cnt)
{
- struct ccase tmp;
- if (!casereader_read (b->subreader, &tmp))
+ struct ccase *tmp = casereader_read (b->subreader);
+ if (tmp == NULL)
return false;
- casewindow_push_head (b->window, &tmp);
+ casewindow_push_head (b->window, tmp);
}
return true;
}
/* Reads the case at the given 0-based OFFSET from the front of
- the window into C. Returns true if successful, false if
- OFFSET is beyond the end of file or upon I/O error. */
-static bool
+ the window into C. Returns the case if successful, or a null
+ pointer if OFFSET is beyond the end of file or upon I/O error.
+ The caller must call case_unref() on the returned case when it
+ is no longer needed. */
+static struct ccase *
shim_read (struct casereader *reader UNUSED, void *b_,
- casenumber offset, struct ccase *c)
+ casenumber offset)
{
struct shim *b = b_;
- return (prime_buffer (b, offset + 1)
- && casewindow_get_case (b->window, offset, c));
+ if (!prime_buffer (b, offset + 1))
+ return NULL;
+ return casewindow_get_case (b->window, offset);
}
/* Destroys B. */
}
/* Class for the buffered reader. */
-static struct casereader_random_class shim_class =
+static const struct casereader_random_class shim_class =
{
shim_read,
shim_destroy,