/* PSPP - a program for statistical analysis.
- Copyright (C) 2007, 2009 Free Software Foundation, Inc.
+ Copyright (C) 2007, 2009, 2010, 2013 Free Software Foundation, Inc.
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
#include <config.h>
-#include <data/casereader.h>
-#include <data/casereader-provider.h>
+#include "data/casereader.h"
+#include "data/casereader-provider.h"
#include <stdlib.h>
-#include <data/casewindow.h>
-#include <data/casewriter.h>
-#include <data/settings.h>
-#include <libpspp/assertion.h>
-#include <libpspp/heap.h>
-#include <libpspp/taint.h>
+#include "data/casereader-shim.h"
+#include "data/casewriter.h"
+#include "libpspp/assertion.h"
+#include "libpspp/heap.h"
+#include "libpspp/taint.h"
-#include "xalloc.h"
+#include "gl/xalloc.h"
/* A casereader. */
struct casereader
{
struct taint *taint; /* Corrupted? */
struct caseproto *proto; /* Format of contained cases. */
- casenumber case_cnt; /* Number of cases,
+ casenumber n_cases; /* Number of cases,
CASENUMBER_MAX if unknown. */
const struct casereader_class *class; /* Class. */
void *aux; /* Auxiliary data for class. */
};
-static void insert_shim (struct casereader *);
-
/* Reads and returns the next case from READER. The caller owns
the returned case and must call case_unref on it when its data
is no longer needed. Returns a null pointer if cases have
struct ccase *
casereader_read (struct casereader *reader)
{
- if (reader->case_cnt != 0)
+ if (reader->n_cases != 0)
{
/* ->read may use casereader_swap to replace itself by
another reader and then delegate to that reader by
ever will.
To allow this to work, however, we must decrement
- case_cnt before calling ->read. If we decremented
- case_cnt after calling ->read, then this would actually
- drop two cases from case_cnt instead of one, and we'd
+ n_cases before calling ->read. If we decremented
+ n_cases after calling ->read, then this would actually
+ drop two cases from n_cases instead of one, and we'd
lose the last case in the casereader. */
struct ccase *c;
- if (reader->case_cnt != CASENUMBER_MAX)
- reader->case_cnt--;
+ if (reader->n_cases != CASENUMBER_MAX)
+ reader->n_cases--;
c = reader->class->read (reader, reader->aux);
if (c != NULL)
{
size_t n_widths UNUSED = caseproto_get_n_widths (reader->proto);
- assert (case_get_value_cnt (c) >= n_widths);
+ assert (case_get_n_values (c) >= n_widths);
expensive_assert (caseproto_equal (case_get_proto (c), 0,
reader->proto, 0, n_widths));
return c;
}
}
- reader->case_cnt = 0;
+ reader->n_cases = 0;
return NULL;
}
{
struct casereader *reader = CONST_CAST (struct casereader *, reader_);
struct casereader *clone;
- if ( reader == NULL )
+ if (reader == NULL)
return NULL;
if (reader->class->clone == NULL)
- insert_shim (reader);
+ casereader_shim_insert (reader);
clone = reader->class->clone (reader, reader->aux);
assert (clone != NULL);
assert (clone != reader);
return clone;
}
-/* Makes a copy of ORIGINAL into *NEW1 (if NEW1 is non-null) and
- *NEW2 (if NEW2 is non-null), then destroys ORIGINAL. */
-void
-casereader_split (struct casereader *original,
- struct casereader **new1, struct casereader **new2)
-{
- if (new1 != NULL && new2 != NULL)
- {
- *new1 = casereader_rename (original);
- *new2 = casereader_clone (*new1);
- }
- else if (new1 != NULL)
- *new1 = casereader_rename (original);
- else if (new2 != NULL)
- *new2 = casereader_rename (original);
- else
- casereader_destroy (original);
-}
-
/* Returns a copy of READER, which is itself destroyed.
Useful for taking over ownership of a casereader, to enforce
preventing the original owner from accessing the casereader
struct ccase *
casereader_peek (struct casereader *reader, casenumber idx)
{
- if (idx < reader->case_cnt)
+ if (idx < reader->n_cases)
{
struct ccase *c;
if (reader->class->peek == NULL)
- insert_shim (reader);
+ casereader_shim_insert (reader);
c = reader->class->peek (reader, reader->aux, idx);
if (c != NULL)
return c;
else if (casereader_error (reader))
- reader->case_cnt = 0;
+ reader->n_cases = 0;
}
- if (reader->case_cnt > idx)
- reader->case_cnt = idx;
+ if (reader->n_cases > idx)
+ reader->n_cases = idx;
return NULL;
}
bool
casereader_is_empty (struct casereader *reader)
{
- if (reader->case_cnt == 0)
+ if (reader->n_cases == 0)
return true;
else
{
actual number of cases in such a casereader, use
casereader_count_cases. */
casenumber
-casereader_get_case_cnt (struct casereader *reader)
+casereader_get_n_cases (struct casereader *reader)
{
- return reader->case_cnt;
+ return reader->n_cases;
}
static casenumber
-casereader_count_cases__ (struct casereader *reader, casenumber max_cases)
+casereader_count_cases__ (const struct casereader *reader,
+ casenumber max_cases)
{
struct casereader *clone;
casenumber n_cases;
of the contents of a clone of READER. Thus, the return value
is always correct in the absence of I/O errors. */
casenumber
-casereader_count_cases (struct casereader *reader)
+casereader_count_cases (const struct casereader *reader)
{
- if (reader->case_cnt == CASENUMBER_MAX)
- reader->case_cnt = casereader_count_cases__ (reader, CASENUMBER_MAX);
- return reader->case_cnt;
+ if (reader->n_cases == CASENUMBER_MAX)
+ {
+ struct casereader *reader_rw = CONST_CAST (struct casereader *, reader);
+ reader_rw->n_cases = casereader_count_cases__ (reader, CASENUMBER_MAX);
+ }
+ return reader->n_cases;
}
/* Truncates READER to at most N cases. */
"max_cases" member to struct casereader. We could also add a "truncate"
function to the casereader implementation, to allow the casereader to
throw away data that cannot ever be read. */
- if (reader->case_cnt == CASENUMBER_MAX)
- reader->case_cnt = casereader_count_cases__ (reader, n);
- if (reader->case_cnt > n)
- reader->case_cnt = n;
+ if (reader->n_cases == CASENUMBER_MAX)
+ reader->n_cases = casereader_count_cases__ (reader, n);
+ if (reader->n_cases > n)
+ reader->n_cases = n;
}
/* Returns the prototype for the cases in READER. The caller
/* Copies all the cases in READER to WRITER, propagating errors
- appropriately. */
+ appropriately. READER is destroyed by this function */
void
casereader_transfer (struct casereader *reader, struct casewriter *writer)
{
struct casereader *
casereader_create_sequential (const struct taint *taint,
const struct caseproto *proto,
- casenumber case_cnt,
+ casenumber n_cases,
const struct casereader_class *class, void *aux)
{
struct casereader *reader = xmalloc (sizeof *reader);
reader->taint = taint != NULL ? taint_clone (taint) : taint_create ();
reader->proto = caseproto_ref (proto);
- reader->case_cnt = case_cnt;
+ reader->n_cases = n_cases;
reader->class = class;
reader->aux = aux;
return reader;
member functions and auxiliary data to pass to those member
functions, respectively. */
struct casereader *
-casereader_create_random (const struct caseproto *proto, casenumber case_cnt,
+casereader_create_random (const struct caseproto *proto, casenumber n_cases,
const struct casereader_random_class *class,
void *aux)
{
shared->class = class;
shared->aux = aux;
shared->min_offset = 0;
- return casereader_create_sequential (NULL, proto, case_cnt,
+ return casereader_create_sequential (NULL, proto, n_cases,
&random_reader_casereader_class,
make_random_reader (shared, 0));
}
struct random_reader_shared *shared = br->shared;
return casereader_create_sequential (casereader_get_taint (reader),
reader->proto,
- casereader_get_case_cnt (reader),
+ casereader_get_n_cases (reader),
&random_reader_casereader_class,
make_random_reader (shared,
br->offset));
random_reader_peek,
};
\f
-/* Buffering shim for implementing clone and peek operations.
-
- The "clone" and "peek" operations aren't implemented by all
- types of casereaders, but we have to expose a uniform
- interface anyhow. We do this by interposing a buffering
- casereader on top of the existing casereader on the first call
- to "clone" or "peek". The buffering casereader maintains a
- window of cases that spans the positions of the original
- casereader and all of its clones (the "clone set"), from the
- position of the casereader that has read the fewest cases to
- the position of the casereader that has read the most.
-
- Thus, if all of the casereaders in the clone set are at
- approximately the same position, only a few cases are buffered
- and there is little inefficiency. If, on the other hand, one
- casereader is not used to read any cases at all, but another
- one is used to read all of the cases, the entire contents of
- the casereader is copied into the buffer. This still might
- not be so inefficient, given that case data in memory is
- shared across multiple identical copies, but in the worst case
- the window implementation will write cases to disk instead of
- maintaining them in-memory. */
-
-/* A buffering shim for a non-clonable or non-peekable
- casereader. */
-struct shim
- {
- struct casewindow *window; /* Window of buffered cases. */
- struct casereader *subreader; /* Subordinate casereader. */
- };
-
-static const struct casereader_random_class shim_class;
+\f
+static const struct casereader_class casereader_null_class;
-/* Interposes a buffering shim atop READER. */
-static void
-insert_shim (struct casereader *reader)
+/* Returns a casereader with no cases. The casereader has the prototype
+ specified by PROTO. PROTO may be specified as a null pointer, in which case
+ the casereader has no variables. */
+struct casereader *
+casereader_create_empty (const struct caseproto *proto_)
{
- const struct caseproto *proto = casereader_get_proto (reader);
- casenumber case_cnt = casereader_get_case_cnt (reader);
- struct shim *b = xmalloc (sizeof *b);
- b->window = casewindow_create (proto, settings_get_workspace_cases (proto));
- b->subreader = casereader_create_random (proto, case_cnt, &shim_class, b);
- casereader_swap (reader, b->subreader);
- taint_propagate (casewindow_get_taint (b->window),
- casereader_get_taint (reader));
- taint_propagate (casereader_get_taint (b->subreader),
- casereader_get_taint (reader));
-}
+ struct casereader *reader;
+ struct caseproto *proto;
-/* Ensures that B's window contains at least CASE_CNT cases.
- Return true if successful, false upon reaching the end of B's
- subreader or an I/O error. */
-static bool
-prime_buffer (struct shim *b, casenumber case_cnt)
-{
- while (casewindow_get_case_cnt (b->window) < case_cnt)
- {
- struct ccase *tmp = casereader_read (b->subreader);
- if (tmp == NULL)
- return false;
- casewindow_push_head (b->window, tmp);
- }
- return true;
-}
+ proto = proto_ != NULL ? caseproto_ref (proto_) : caseproto_create ();
+ reader = casereader_create_sequential (NULL, proto, 0,
+ &casereader_null_class, NULL);
+ caseproto_unref (proto);
-/* Reads the case at the given 0-based OFFSET from the front of
- the window into C. Returns the case if successful, or a null
- pointer if OFFSET is beyond the end of file or upon I/O error.
- The caller must call case_unref() on the returned case when it
- is no longer needed. */
-static struct ccase *
-shim_read (struct casereader *reader UNUSED, void *b_,
- casenumber offset)
-{
- struct shim *b = b_;
- if (!prime_buffer (b, offset + 1))
- return NULL;
- return casewindow_get_case (b->window, offset);
+ return reader;
}
-/* Destroys B. */
-static void
-shim_destroy (struct casereader *reader UNUSED, void *b_)
+static struct ccase *
+casereader_null_read (struct casereader *reader UNUSED, void *aux UNUSED)
{
- struct shim *b = b_;
- casewindow_destroy (b->window);
- casereader_destroy (b->subreader);
- free (b);
+ return NULL;
}
-/* Discards CNT cases from the front of B's window. */
static void
-shim_advance (struct casereader *reader UNUSED, void *b_, casenumber case_cnt)
+casereader_null_destroy (struct casereader *reader UNUSED, void *aux UNUSED)
{
- struct shim *b = b_;
- casewindow_pop_tail (b->window, case_cnt);
+ /* Nothing to do. */
}
-/* Class for the buffered reader. */
-static const struct casereader_random_class shim_class =
+static const struct casereader_class casereader_null_class =
{
- shim_read,
- shim_destroy,
- shim_advance,
+ casereader_null_read,
+ casereader_null_destroy,
+ NULL, /* clone */
+ NULL, /* peek */
};