/* PSPP - a program for statistical analysis.
- Copyright (C) 2007, 2009 Free Software Foundation, Inc.
+ Copyright (C) 2007, 2009, 2010, 2013 Free Software Foundation, Inc.
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
#include <config.h>
-#include <data/casereader.h>
-#include <data/casereader-provider.h>
+#include "data/casereader.h"
+#include "data/casereader-provider.h"
#include <stdlib.h>
-#include <data/casewindow.h>
-#include <data/casewriter.h>
-#include <data/settings.h>
-#include <libpspp/assertion.h>
-#include <libpspp/heap.h>
-#include <libpspp/taint.h>
+#include "data/casereader-shim.h"
+#include "data/casewriter.h"
+#include "libpspp/assertion.h"
+#include "libpspp/heap.h"
+#include "libpspp/taint.h"
-#include "xalloc.h"
+#include "gl/xalloc.h"
/* A casereader. */
struct casereader
{
struct taint *taint; /* Corrupted? */
- size_t value_cnt; /* Values per case. */
+ struct caseproto *proto; /* Format of contained cases. */
casenumber case_cnt; /* Number of cases,
CASENUMBER_MAX if unknown. */
const struct casereader_class *class; /* Class. */
void *aux; /* Auxiliary data for class. */
};
-static void insert_shim (struct casereader *);
-
/* Reads and returns the next case from READER. The caller owns
the returned case and must call case_unref on it when its data
is no longer needed. Returns a null pointer if cases have
c = reader->class->read (reader, reader->aux);
if (c != NULL)
{
- assert (case_get_value_cnt (c) >= reader->value_cnt);
+ size_t n_widths UNUSED = caseproto_get_n_widths (reader->proto);
+ assert (case_get_value_cnt (c) >= n_widths);
+ expensive_assert (caseproto_equal (case_get_proto (c), 0,
+ reader->proto, 0, n_widths));
return c;
}
}
{
reader->class->destroy (reader, reader->aux);
ok = taint_destroy (reader->taint);
+ caseproto_unref (reader->proto);
free (reader);
}
return ok;
struct casereader *
casereader_clone (const struct casereader *reader_)
{
- struct casereader *reader = (struct casereader *) reader_;
+ struct casereader *reader = CONST_CAST (struct casereader *, reader_);
struct casereader *clone;
if ( reader == NULL )
return NULL;
if (reader->class->clone == NULL)
- insert_shim (reader);
+ casereader_shim_insert (reader);
clone = reader->class->clone (reader, reader->aux);
assert (clone != NULL);
assert (clone != reader);
return clone;
}
-/* Makes a copy of ORIGINAL into *NEW1 (if NEW1 is non-null) and
- *NEW2 (if NEW2 is non-null), then destroys ORIGINAL. */
-void
-casereader_split (struct casereader *original,
- struct casereader **new1, struct casereader **new2)
-{
- if (new1 != NULL && new2 != NULL)
- {
- *new1 = casereader_rename (original);
- *new2 = casereader_clone (*new1);
- }
- else if (new1 != NULL)
- *new1 = casereader_rename (original);
- else if (new2 != NULL)
- *new2 = casereader_rename (original);
- else
- casereader_destroy (original);
-}
-
/* Returns a copy of READER, which is itself destroyed.
Useful for taking over ownership of a casereader, to enforce
preventing the original owner from accessing the casereader
{
struct ccase *c;
if (reader->class->peek == NULL)
- insert_shim (reader);
+ casereader_shim_insert (reader);
c = reader->class->peek (reader, reader->aux, idx);
if (c != NULL)
return c;
return reader->case_cnt;
}
+static casenumber
+casereader_count_cases__ (const struct casereader *reader,
+ casenumber max_cases)
+{
+ struct casereader *clone;
+ casenumber n_cases;
+
+ clone = casereader_clone (reader);
+ n_cases = casereader_advance (clone, max_cases);
+ casereader_destroy (clone);
+
+ return n_cases;
+}
+
/* Returns the number of cases that will be read by successive
calls to casereader_read for READER, assuming that no errors
occur. Upon an error condition, the case count drops to 0, so
of the contents of a clone of READER. Thus, the return value
is always correct in the absence of I/O errors. */
casenumber
-casereader_count_cases (struct casereader *reader)
+casereader_count_cases (const struct casereader *reader)
{
if (reader->case_cnt == CASENUMBER_MAX)
{
- casenumber n_cases = 0;
- struct ccase *c;
+ struct casereader *reader_rw = CONST_CAST (struct casereader *, reader);
+ reader_rw->case_cnt = casereader_count_cases__ (reader, CASENUMBER_MAX);
+ }
+ return reader->case_cnt;
+}
- struct casereader *clone = casereader_clone (reader);
+/* Truncates READER to at most N cases. */
+void
+casereader_truncate (struct casereader *reader, casenumber n)
+{
+ /* This could be optimized, if it ever becomes too expensive, by adding a
+ "max_cases" member to struct casereader. We could also add a "truncate"
+ function to the casereader implementation, to allow the casereader to
+ throw away data that cannot ever be read. */
+ if (reader->case_cnt == CASENUMBER_MAX)
+ reader->case_cnt = casereader_count_cases__ (reader, n);
+ if (reader->case_cnt > n)
+ reader->case_cnt = n;
+}
- for (; (c = casereader_read (clone)) != NULL; case_unref (c))
- n_cases++;
+/* Returns the prototype for the cases in READER. The caller
+ must not unref the returned prototype. */
+const struct caseproto *
+casereader_get_proto (const struct casereader *reader)
+{
+ return reader->proto;
+}
+
+/* Skips past N cases in READER, stopping when the last case in
+ READER has been read or on an input error. Returns the number
+ of cases successfully skipped. */
+casenumber
+casereader_advance (struct casereader *reader, casenumber n)
+{
+ casenumber i;
- casereader_destroy (clone);
- reader->case_cnt = n_cases;
+ for (i = 0; i < n; i++)
+ {
+ struct ccase *c = casereader_read (reader);
+ if (c == NULL)
+ break;
+ case_unref (c);
}
- return reader->case_cnt;
+ return i;
}
-/* Returns the number of struct values in each case in READER. */
-size_t
-casereader_get_value_cnt (struct casereader *reader)
-{
- return reader->value_cnt;
-}
/* Copies all the cases in READER to WRITER, propagating errors
- appropriately. */
+ appropriately. READER is destroyed by this function */
void
casereader_transfer (struct casereader *reader, struct casewriter *writer)
{
casereader_destroy (reader);
}
+struct pxd_object *
+casereader_save (const struct casereader *reader_, struct pxd *pxd)
+{
+ struct pxd_array_builder ab;
+ struct casereader *reader;
+ struct pxd_builder b;
+ struct ccase *c;
+
+ pxd_array_builder_init (&ab, pxd);
+ reader = casereader_clone (reader_);
+ while ((c = casereader_read (reader)) != NULL)
+ {
+ pxd_array_builder_add (&ab, case_save (c, pxd));
+ case_unref (c);
+ }
+ casereader_destroy (reader);
+
+ pxd_builder_init (&b, pxd);
+ pxd_builder_put_link (&b, caseproto_save (reader->proto, pxd));
+ pxd_builder_put_link (&b, pxd_array_builder_commit (&ab));
+
+ return pxd_builder_commit (&b);
+}
+
+struct casereader *
+casereader_load (struct pxd_object *object, const struct pxd *pxd)
+{
+ struct casewriter *writer;
+ struct caseproto *proto;
+ struct pxd_array array;
+ struct pxd_parser p;
+ size_t i;
+
+ pxd_parser_init (&p, object, pxd);
+
+ proto = caseproto_load (pxd_parser_get_link (&p), pxd);
+ writer = autopaging_writer_create (proto);
+ caseproto_unref (proto);
+
+ pxd_array_init (&array, pxd_parser_get_link (&p), pxd);
+ for (i = 0; i < pxd_array_size (&array); i++)
+ casewriter_write (writer, case_load (pxd_array_get (&array, i), pxd));
+ pxd_array_destroy (&array);
+
+ pxd_parser_destroy (&p);
+
+ return casewriter_make_reader (writer);
+}
+
+
/* Creates and returns a new casereader. This function is
intended for use by casereader implementations, not by
casereader clients.
function, in which case the cloned casereader should have the
same taint object as the original casereader.)
- VALUE_CNT must be the number of struct values per case read
- from the casereader.
+ PROTO must be the prototype for the cases that may be read
+ from the casereader. The caller retains its reference to
+ PROTO.
CASE_CNT is an upper limit on the number of cases that
casereader_read will return from the casereader in successive
functions, respectively. */
struct casereader *
casereader_create_sequential (const struct taint *taint,
- size_t value_cnt, casenumber case_cnt,
+ const struct caseproto *proto,
+ casenumber case_cnt,
const struct casereader_class *class, void *aux)
{
struct casereader *reader = xmalloc (sizeof *reader);
reader->taint = taint != NULL ? taint_clone (taint) : taint_create ();
- reader->value_cnt = value_cnt;
+ reader->proto = caseproto_ref (proto);
reader->case_cnt = case_cnt;
reader->class = class;
reader->aux = aux;
casereader_create_sequential is more appropriate for a data
source that is naturally sequential.
- VALUE_CNT must be the number of struct values per case read
- from the casereader.
+ PROTO must be the prototype for the cases that may be read
+ from the casereader. The caller retains its reference to
+ PROTO.
CASE_CNT is an upper limit on the number of cases that
casereader_read will return from the casereader in successive
member functions and auxiliary data to pass to those member
functions, respectively. */
struct casereader *
-casereader_create_random (size_t value_cnt, casenumber case_cnt,
+casereader_create_random (const struct caseproto *proto, casenumber case_cnt,
const struct casereader_random_class *class,
void *aux)
{
shared->class = class;
shared->aux = aux;
shared->min_offset = 0;
- return casereader_create_sequential (NULL, value_cnt, case_cnt,
+ return casereader_create_sequential (NULL, proto, case_cnt,
&random_reader_casereader_class,
make_random_reader (shared, 0));
}
struct random_reader *br = br_;
struct random_reader_shared *shared = br->shared;
return casereader_create_sequential (casereader_get_taint (reader),
- casereader_get_value_cnt (reader),
+ reader->proto,
casereader_get_case_cnt (reader),
&random_reader_casereader_class,
make_random_reader (shared,
random_reader_peek,
};
\f
-/* Buffering shim for implementing clone and peek operations.
-
- The "clone" and "peek" operations aren't implemented by all
- types of casereaders, but we have to expose a uniform
- interface anyhow. We do this by interposing a buffering
- casereader on top of the existing casereader on the first call
- to "clone" or "peek". The buffering casereader maintains a
- window of cases that spans the positions of the original
- casereader and all of its clones (the "clone set"), from the
- position of the casereader that has read the fewest cases to
- the position of the casereader that has read the most.
-
- Thus, if all of the casereaders in the clone set are at
- approximately the same position, only a few cases are buffered
- and there is little inefficiency. If, on the other hand, one
- casereader is not used to read any cases at all, but another
- one is used to read all of the cases, the entire contents of
- the casereader is copied into the buffer. This still might
- not be so inefficient, given that case data in memory is
- shared across multiple identical copies, but in the worst case
- the window implementation will write cases to disk instead of
- maintaining them in-memory. */
-
-/* A buffering shim for a non-clonable or non-peekable
- casereader. */
-struct shim
- {
- struct casewindow *window; /* Window of buffered cases. */
- struct casereader *subreader; /* Subordinate casereader. */
- };
+\f
+static const struct casereader_class casereader_null_class;
-static const struct casereader_random_class shim_class;
+/* Returns a casereader with no cases. The casereader has the prototype
+ specified by PROTO. PROTO may be specified as a null pointer, in which case
+ the casereader has no variables. */
+struct casereader *
+casereader_create_empty (const struct caseproto *proto_)
+{
+ struct casereader *reader;
+ struct caseproto *proto;
-/* Interposes a buffering shim atop READER. */
-static void
-insert_shim (struct casereader *reader)
-{
- size_t value_cnt = casereader_get_value_cnt (reader);
- casenumber case_cnt = casereader_get_case_cnt (reader);
- struct shim *b = xmalloc (sizeof *b);
- b->window = casewindow_create (value_cnt, settings_get_workspace_cases (value_cnt));
- b->subreader = casereader_create_random (value_cnt, case_cnt,
- &shim_class, b);
- casereader_swap (reader, b->subreader);
- taint_propagate (casewindow_get_taint (b->window),
- casereader_get_taint (reader));
- taint_propagate (casereader_get_taint (b->subreader),
- casereader_get_taint (reader));
-}
-
-/* Ensures that B's window contains at least CASE_CNT cases.
- Return true if successful, false upon reaching the end of B's
- subreader or an I/O error. */
-static bool
-prime_buffer (struct shim *b, casenumber case_cnt)
-{
- while (casewindow_get_case_cnt (b->window) < case_cnt)
- {
- struct ccase *tmp = casereader_read (b->subreader);
- if (tmp == NULL)
- return false;
- casewindow_push_head (b->window, tmp);
- }
- return true;
-}
+ proto = proto_ != NULL ? caseproto_ref (proto_) : caseproto_create ();
+ reader = casereader_create_sequential (NULL, proto, 0,
+ &casereader_null_class, NULL);
+ caseproto_unref (proto);
-/* Reads the case at the given 0-based OFFSET from the front of
- the window into C. Returns the case if successful, or a null
- pointer if OFFSET is beyond the end of file or upon I/O error.
- The caller must call case_unref() on the returned case when it
- is no longer needed. */
-static struct ccase *
-shim_read (struct casereader *reader UNUSED, void *b_,
- casenumber offset)
-{
- struct shim *b = b_;
- if (!prime_buffer (b, offset + 1))
- return NULL;
- return casewindow_get_case (b->window, offset);
+ return reader;
}
-/* Destroys B. */
-static void
-shim_destroy (struct casereader *reader UNUSED, void *b_)
+static struct ccase *
+casereader_null_read (struct casereader *reader UNUSED, void *aux UNUSED)
{
- struct shim *b = b_;
- casewindow_destroy (b->window);
- casereader_destroy (b->subreader);
- free (b);
+ return NULL;
}
-/* Discards CNT cases from the front of B's window. */
static void
-shim_advance (struct casereader *reader UNUSED, void *b_, casenumber case_cnt)
+casereader_null_destroy (struct casereader *reader UNUSED, void *aux UNUSED)
{
- struct shim *b = b_;
- casewindow_pop_tail (b->window, case_cnt);
+ /* Nothing to do. */
}
-/* Class for the buffered reader. */
-static const struct casereader_random_class shim_class =
+static const struct casereader_class casereader_null_class =
{
- shim_read,
- shim_destroy,
- shim_advance,
+ casereader_null_read,
+ casereader_null_destroy,
+ NULL, /* clone */
+ NULL, /* peek */
};