treewide: Replace <name>_cnt by n_<name>s and <name>_cap by allocated_<name>.
[pspp] / src / data / casereader.c
index da49e0323a5e2df1d952dd7c88df9c0bf220923a..379e351edc781098558a24cecc0d2dc6e266f2c3 100644 (file)
@@ -1,89 +1,99 @@
-/* PSPP - computes sample statistics.
-   Copyright (C) 2007 Free Software Foundation, Inc.
+/* PSPP - a program for statistical analysis.
+   Copyright (C) 2007, 2009, 2010, 2013 Free Software Foundation, Inc.
 
-   This program is free software; you can redistribute it and/or
-   modify it under the terms of the GNU General Public License as
-   published by the Free Software Foundation; either version 2 of the
-   License, or (at your option) any later version.
+   This program is free software: you can redistribute it and/or modify
+   it under the terms of the GNU General Public License as published by
+   the Free Software Foundation, either version 3 of the License, or
+   (at your option) any later version.
 
-   This program is distributed in the hope that it will be useful, but
-   WITHOUT ANY WARRANTY; without even the implied warranty of
-   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
-   General Public License for more details.
+   This program is distributed in the hope that it will be useful,
+   but WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+   GNU General Public License for more details.
 
    You should have received a copy of the GNU General Public License
-   along with this program; if not, write to the Free Software
-   Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
-   02110-1301, USA. */
+   along with this program.  If not, see <http://www.gnu.org/licenses/>. */
 
 #include <config.h>
 
-#include <data/casereader.h>
-#include <data/casereader-provider.h>
+#include "data/casereader.h"
+#include "data/casereader-provider.h"
 
 #include <stdlib.h>
 
-#include <data/casewindow.h>
-#include <data/casewriter.h>
-#include <data/settings.h>
-#include <libpspp/assertion.h>
-#include <libpspp/heap.h>
-#include <libpspp/taint.h>
+#include "data/casereader-shim.h"
+#include "data/casewriter.h"
+#include "libpspp/assertion.h"
+#include "libpspp/heap.h"
+#include "libpspp/taint.h"
 
-#include "xalloc.h"
+#include "gl/xalloc.h"
 
 /* A casereader. */
-struct casereader 
+struct casereader
   {
     struct taint *taint;                  /* Corrupted? */
-    size_t value_cnt;                     /* Values per case. */
-    casenumber case_cnt;                  /* Number of cases,
+    struct caseproto *proto;              /* Format of contained cases. */
+    casenumber n_cases;                   /* Number of cases,
                                              CASENUMBER_MAX if unknown. */
     const struct casereader_class *class; /* Class. */
     void *aux;                            /* Auxiliary data for class. */
   };
 
-static void insert_shim (struct casereader *);
-
-/* Creates a new case in C and reads the next case from READER
-   into it.  The caller owns C and must destroy C when its data
-   is no longer needed.  Return true if successful, false when
-   cases have been exhausted or upon detection of an I/O error.
-   In the latter case, C is set to the null case.
+/* Reads and returns the next case from READER.  The caller owns
+   the returned case and must call case_unref on it when its data
+   is no longer needed.  Returns a null pointer if cases have
+   been exhausted or upon detection of an I/O error.
 
    The case returned is effectively consumed: it can never be
    read again through READER.  If this is inconvenient, READER
    may be cloned in advance with casereader_clone, or
    casereader_peek may be used instead. */
-bool
-casereader_read (struct casereader *reader, struct ccase *c) 
+struct ccase *
+casereader_read (struct casereader *reader)
 {
-  if (reader->case_cnt != 0 && reader->class->read (reader, reader->aux, c)) 
-    {
-      assert (case_get_value_cnt (c) == reader->value_cnt);
-      if (reader->case_cnt != CASENUMBER_MAX)
-        reader->case_cnt--;
-      return true; 
-    }
-  else 
+  if (reader->n_cases != 0)
     {
-      reader->case_cnt = 0;
-      case_nullify (c);
-      return false; 
+      /* ->read may use casereader_swap to replace itself by
+         another reader and then delegate to that reader by
+         recursively calling casereader_read.  Currently only
+         lazy_casereader does this and, with luck, nothing else
+         ever will.
+
+         To allow this to work, however, we must decrement
+         n_cases before calling ->read.  If we decremented
+         n_cases after calling ->read, then this would actually
+         drop two cases from n_cases instead of one, and we'd
+         lose the last case in the casereader. */
+      struct ccase *c;
+      if (reader->n_cases != CASENUMBER_MAX)
+        reader->n_cases--;
+      c = reader->class->read (reader, reader->aux);
+      if (c != NULL)
+        {
+          size_t n_widths UNUSED = caseproto_get_n_widths (reader->proto);
+          assert (case_get_n_values (c) >= n_widths);
+          expensive_assert (caseproto_equal (case_get_proto (c), 0,
+                                             reader->proto, 0, n_widths));
+          return c;
+        }
     }
+  reader->n_cases = 0;
+  return NULL;
 }
 
 /* Destroys READER.
    Returns false if an I/O error was detected on READER, true
    otherwise. */
 bool
-casereader_destroy (struct casereader *reader) 
+casereader_destroy (struct casereader *reader)
 {
   bool ok = true;
-  if (reader != NULL) 
+  if (reader != NULL)
     {
       reader->class->destroy (reader, reader->aux);
       ok = taint_destroy (reader->taint);
+      caseproto_unref (reader->proto);
       free (reader);
     }
   return ok;
@@ -93,44 +103,27 @@ casereader_destroy (struct casereader *reader)
    to read the same sequence of cases in the same order, barring
    I/O errors. */
 struct casereader *
-casereader_clone (const struct casereader *reader_) 
+casereader_clone (const struct casereader *reader_)
 {
-  struct casereader *reader = (struct casereader *) reader_;
+  struct casereader *reader = CONST_CAST (struct casereader *, reader_);
   struct casereader *clone;
+  if (reader == NULL)
+    return NULL;
 
   if (reader->class->clone == NULL)
-    insert_shim (reader);
+    casereader_shim_insert (reader);
   clone = reader->class->clone (reader, reader->aux);
   assert (clone != NULL);
   assert (clone != reader);
   return clone;
 }
 
-/* Makes a copy of ORIGINAL into *NEW1 (if NEW1 is non-null) and
-   *NEW2 (if NEW2 is non-null), then destroys ORIGINAL. */
-void
-casereader_split (struct casereader *original,
-                  struct casereader **new1, struct casereader **new2) 
-{
-  if (new1 != NULL && new2 != NULL) 
-    {
-      *new1 = casereader_rename (original);
-      *new2 = casereader_clone (*new1);
-    }
-  else if (new1 != NULL)
-    *new1 = casereader_rename (original);
-  else if (new2 != NULL)
-    *new2 = casereader_rename (original);
-  else
-    casereader_destroy (original);
-}
-
 /* Returns a copy of READER, which is itself destroyed.
    Useful for taking over ownership of a casereader, to enforce
    preventing the original owner from accessing the casereader
    again. */
 struct casereader *
-casereader_rename (struct casereader *reader) 
+casereader_rename (struct casereader *reader)
 {
   struct casereader *new = xmemdup (reader, sizeof *reader);
   free (reader);
@@ -139,9 +132,9 @@ casereader_rename (struct casereader *reader)
 
 /* Exchanges the casereaders referred to by A and B. */
 void
-casereader_swap (struct casereader *a, struct casereader *b) 
+casereader_swap (struct casereader *a, struct casereader *b)
 {
-  if (a != b) 
+  if (a != b)
     {
       struct casereader tmp = *a;
       *a = *b;
@@ -149,34 +142,57 @@ casereader_swap (struct casereader *a, struct casereader *b)
     }
 }
 
-/* Creates a new case in C and reads the (IDX + 1)'th case from
-   READER into it.  The caller owns C and must destroy C when its
-   data is no longer needed.  Return true if successful, false
-   when cases have been exhausted or upon detection of an I/O
-   error.  In the latter case, C is set to the null case. */
-bool
-casereader_peek (struct casereader *reader, casenumber idx, struct ccase *c)
+/* Reads and returns the (IDX + 1)'th case from READER.  The
+   caller owns the returned case and must call case_unref on it
+   when it is no longer needed.  Returns a null pointer if cases
+   have been exhausted or upon detection of an I/O error. */
+struct ccase *
+casereader_peek (struct casereader *reader, casenumber idx)
 {
-  if (idx < reader->case_cnt)
+  if (idx < reader->n_cases)
     {
+      struct ccase *c;
       if (reader->class->peek == NULL)
-        insert_shim (reader);
-      if (reader->class->peek (reader, reader->aux, idx, c))
+        casereader_shim_insert (reader);
+      c = reader->class->peek (reader, reader->aux, idx);
+      if (c != NULL)
+        return c;
+      else if (casereader_error (reader))
+        reader->n_cases = 0;
+    }
+  if (reader->n_cases > idx)
+    reader->n_cases = idx;
+  return NULL;
+}
+
+/* Returns true if no cases remain to be read from READER, or if
+   an error has occurred on READER.  (A return value of false
+   does *not* mean that the next call to casereader_peek or
+   casereader_read will return true, because an error can occur
+   in the meantime.) */
+bool
+casereader_is_empty (struct casereader *reader)
+{
+  if (reader->n_cases == 0)
+    return true;
+  else
+    {
+      struct ccase *c = casereader_peek (reader, 0);
+      if (c == NULL)
         return true;
-      else if (casereader_error (reader)) 
-        reader->case_cnt = 0;
+      else
+        {
+          case_unref (c);
+          return false;
+        }
     }
-  if (reader->case_cnt > idx)
-    reader->case_cnt = idx;
-  case_nullify (c);
-  return false;
 }
 
 /* Returns true if an I/O error or another hard error has
    occurred on READER, a clone of READER, or on some object on
    which READER's data has a dependency, false otherwise. */
 bool
-casereader_error (const struct casereader *reader) 
+casereader_error (const struct casereader *reader)
 {
   return taint_is_tainted (reader->taint);
 }
@@ -190,7 +206,7 @@ casereader_error (const struct casereader *reader)
    taint_propagate to propagate to the casereader's taint
    structure, which may be obtained via casereader_get_taint. */
 void
-casereader_force_error (struct casereader *reader) 
+casereader_force_error (struct casereader *reader)
 {
   taint_set_taint (reader->taint);
 }
@@ -198,7 +214,7 @@ casereader_force_error (struct casereader *reader)
 /* Returns READER's associate taint object, for use with
    taint_propagate and other taint functions. */
 const struct taint *
-casereader_get_taint (const struct casereader *reader) 
+casereader_get_taint (const struct casereader *reader)
 {
   return reader->taint;
 }
@@ -214,9 +230,23 @@ casereader_get_taint (const struct casereader *reader)
    actual number of cases in such a casereader, use
    casereader_count_cases. */
 casenumber
-casereader_get_case_cnt (struct casereader *reader) 
+casereader_get_n_cases (struct casereader *reader)
+{
+  return reader->n_cases;
+}
+
+static casenumber
+casereader_count_cases__ (const struct casereader *reader,
+                          casenumber max_cases)
 {
-  return reader->case_cnt;
+  struct casereader *clone;
+  casenumber n_cases;
+
+  clone = casereader_clone (reader);
+  n_cases = casereader_advance (clone, max_cases);
+  casereader_destroy (clone);
+
+  return n_cases;
 }
 
 /* Returns the number of cases that will be read by successive
@@ -229,43 +259,69 @@ casereader_get_case_cnt (struct casereader *reader)
    of the contents of a clone of READER.  Thus, the return value
    is always correct in the absence of I/O errors. */
 casenumber
-casereader_count_cases (struct casereader *reader)
+casereader_count_cases (const struct casereader *reader)
 {
-  if (reader->case_cnt == CASENUMBER_MAX)
+  if (reader->n_cases == CASENUMBER_MAX)
     {
-      casenumber n_cases = 0;
-      struct ccase c;
+      struct casereader *reader_rw = CONST_CAST (struct casereader *, reader);
+      reader_rw->n_cases = casereader_count_cases__ (reader, CASENUMBER_MAX);
+    }
+  return reader->n_cases;
+}
 
-      struct casereader *clone = casereader_clone (reader);
+/* Truncates READER to at most N cases. */
+void
+casereader_truncate (struct casereader *reader, casenumber n)
+{
+  /* This could be optimized, if it ever becomes too expensive, by adding a
+     "max_cases" member to struct casereader.  We could also add a "truncate"
+     function to the casereader implementation, to allow the casereader to
+     throw away data that cannot ever be read. */
+  if (reader->n_cases == CASENUMBER_MAX)
+    reader->n_cases = casereader_count_cases__ (reader, n);
+  if (reader->n_cases > n)
+    reader->n_cases = n;
+}
 
-      for (; casereader_read (clone, &c); case_destroy (&c))
-        n_cases++;
+/* Returns the prototype for the cases in READER.  The caller
+   must not unref the returned prototype. */
+const struct caseproto *
+casereader_get_proto (const struct casereader *reader)
+{
+  return reader->proto;
+}
 
-      casereader_destroy (clone);
-      reader->case_cnt = n_cases;
+/* Skips past N cases in READER, stopping when the last case in
+   READER has been read or on an input error.  Returns the number
+   of cases successfully skipped. */
+casenumber
+casereader_advance (struct casereader *reader, casenumber n)
+{
+  casenumber i;
+
+  for (i = 0; i < n; i++)
+    {
+      struct ccase *c = casereader_read (reader);
+      if (c == NULL)
+        break;
+      case_unref (c);
     }
 
-  return reader->case_cnt;
+  return i;
 }
 
-/* Returns the number of struct values in each case in READER. */
-size_t
-casereader_get_value_cnt (struct casereader *reader) 
-{
-  return reader->value_cnt;
-}
 
 /* Copies all the cases in READER to WRITER, propagating errors
-   appropriately. */
+   appropriately. READER is destroyed by this function */
 void
 casereader_transfer (struct casereader *reader, struct casewriter *writer)
 {
-  struct ccase c;
+  struct ccase *c;
 
   taint_propagate (casereader_get_taint (reader),
                    casewriter_get_taint (writer));
-  while (casereader_read (reader, &c))
-    casewriter_write (writer, &c);
+  while ((c = casereader_read (reader)) != NULL)
+    casewriter_write (writer, c);
   casereader_destroy (reader);
 }
 
@@ -286,8 +342,9 @@ casereader_transfer (struct casereader *reader, struct casewriter *writer)
    function, in which case the cloned casereader should have the
    same taint object as the original casereader.)
 
-   VALUE_CNT must be the number of struct values per case read
-   from the casereader.
+   PROTO must be the prototype for the cases that may be read
+   from the casereader.  The caller retains its reference to
+   PROTO.
 
    CASE_CNT is an upper limit on the number of cases that
    casereader_read will return from the casereader in successive
@@ -300,17 +357,38 @@ casereader_transfer (struct casereader *reader, struct casewriter *writer)
    functions, respectively. */
 struct casereader *
 casereader_create_sequential (const struct taint *taint,
-                              size_t value_cnt, casenumber case_cnt,
-                              const struct casereader_class *class, void *aux) 
+                              const struct caseproto *proto,
+                              casenumber n_cases,
+                              const struct casereader_class *class, void *aux)
 {
   struct casereader *reader = xmalloc (sizeof *reader);
   reader->taint = taint != NULL ? taint_clone (taint) : taint_create ();
-  reader->value_cnt = value_cnt;
-  reader->case_cnt = case_cnt;
+  reader->proto = caseproto_ref (proto);
+  reader->n_cases = n_cases;
   reader->class = class;
   reader->aux = aux;
   return reader;
 }
+
+/* If READER is a casereader of the given CLASS, returns its
+   associated auxiliary data; otherwise, returns a null pointer.
+
+   This function is intended for use from casereader
+   implementations, not by casereader users.  Even within
+   casereader implementations, its usefulness is quite limited,
+   for at least two reasons.  First, every casereader member
+   function already receives a pointer to the casereader's
+   auxiliary data.  Second, a casereader's class can change
+   (through a call to casereader_swap) and this is in practice
+   quite common (e.g. any call to casereader_clone on a
+   casereader that does not directly support clone will cause the
+   casereader to be replaced by a shim caseader). */
+void *
+casereader_dynamic_cast (struct casereader *reader,
+                         const struct casereader_class *class)
+{
+  return reader->class == class ? reader->aux : NULL;
+}
 \f
 /* Random-access casereader implementation.
 
@@ -330,13 +408,13 @@ struct random_reader
 /* Returns the random_reader in which the given heap_node is
    embedded. */
 static struct random_reader *
-random_reader_from_heap_node (const struct heap_node *node) 
+random_reader_from_heap_node (const struct heap_node *node)
 {
   return heap_data (node, struct random_reader, heap_node);
 }
 
 /* Data shared among clones of a random reader. */
-struct random_reader_shared 
+struct random_reader_shared
   {
     struct heap *readers;       /* Heap of struct random_readers. */
     casenumber min_offset;      /* Smallest offset of any random_reader. */
@@ -344,7 +422,7 @@ struct random_reader_shared
     void *aux;
   };
 
-static struct casereader_class random_reader_casereader_class;
+static const struct casereader_class random_reader_casereader_class;
 
 /* Creates and returns a new random_reader with the given SHARED
    data and OFFSET.  Inserts the new random reader into the
@@ -364,7 +442,7 @@ make_random_reader (struct random_reader_shared *shared, casenumber offset)
 static int
 compare_random_readers_by_offset (const struct heap_node *a_,
                                   const struct heap_node *b_,
-                                  const void *aux UNUSED) 
+                                  const void *aux UNUSED)
 {
   const struct random_reader *a = random_reader_from_heap_node (a_);
   const struct random_reader *b = random_reader_from_heap_node (b_);
@@ -380,8 +458,9 @@ compare_random_readers_by_offset (const struct heap_node *a_,
    casereader_create_sequential is more appropriate for a data
    source that is naturally sequential.
 
-   VALUE_CNT must be the number of struct values per case read
-   from the casereader.
+   PROTO must be the prototype for the cases that may be read
+   from the casereader.  The caller retains its reference to
+   PROTO.
 
    CASE_CNT is an upper limit on the number of cases that
    casereader_read will return from the casereader in successive
@@ -393,16 +472,16 @@ compare_random_readers_by_offset (const struct heap_node *a_,
    member functions and auxiliary data to pass to those member
    functions, respectively. */
 struct casereader *
-casereader_create_random (size_t value_cnt, casenumber case_cnt,
+casereader_create_random (const struct caseproto *proto, casenumber n_cases,
                           const struct casereader_random_class *class,
-                          void *aux) 
+                          void *aux)
 {
   struct random_reader_shared *shared = xmalloc (sizeof *shared);
   shared->readers = heap_create (compare_random_readers_by_offset, NULL);
   shared->class = class;
   shared->aux = aux;
   shared->min_offset = 0;
-  return casereader_create_sequential (NULL, value_cnt, case_cnt,
+  return casereader_create_sequential (NULL, proto, n_cases,
                                        &random_reader_casereader_class,
                                        make_random_reader (shared, 0));
 }
@@ -411,7 +490,7 @@ casereader_create_random (size_t value_cnt, casenumber case_cnt,
    offset in the heap.   */
 static void
 advance_random_reader (struct casereader *reader,
-                       struct random_reader_shared *shared) 
+                       struct random_reader_shared *shared)
 {
   casenumber old, new;
 
@@ -426,34 +505,32 @@ advance_random_reader (struct casereader *reader,
 }
 
 /* struct casereader_class "read" function for random reader. */
-static bool
-random_reader_read (struct casereader *reader, void *br_, struct ccase *c)
+static struct ccase *
+random_reader_read (struct casereader *reader, void *br_)
 {
   struct random_reader *br = br_;
   struct random_reader_shared *shared = br->shared;
-
-  if (shared->class->read (reader, shared->aux,
-                           br->offset - shared->min_offset, c)) 
+  struct ccase *c = shared->class->read (reader, shared->aux,
+                                         br->offset - shared->min_offset);
+  if (c != NULL)
     {
       br->offset++;
       heap_changed (shared->readers, &br->heap_node);
-      advance_random_reader (reader, shared); 
-      return true;
+      advance_random_reader (reader, shared);
     }
-  else
-    return false; 
+  return c;
 }
 
 /* struct casereader_class "destroy" function for random
    reader. */
 static void
-random_reader_destroy (struct casereader *reader, void *br_) 
+random_reader_destroy (struct casereader *reader, void *br_)
 {
   struct random_reader *br = br_;
   struct random_reader_shared *shared = br->shared;
 
   heap_delete (shared->readers, &br->heap_node);
-  if (heap_is_empty (shared->readers)) 
+  if (heap_is_empty (shared->readers))
     {
       heap_destroy (shared->readers);
       shared->class->destroy (reader, shared->aux);
@@ -467,32 +544,31 @@ random_reader_destroy (struct casereader *reader, void *br_)
 
 /* struct casereader_class "clone" function for random reader. */
 static struct casereader *
-random_reader_clone (struct casereader *reader, void *br_) 
+random_reader_clone (struct casereader *reader, void *br_)
 {
   struct random_reader *br = br_;
   struct random_reader_shared *shared = br->shared;
   return casereader_create_sequential (casereader_get_taint (reader),
-                                       casereader_get_value_cnt (reader),
-                                       casereader_get_case_cnt (reader),
+                                       reader->proto,
+                                       casereader_get_n_cases (reader),
                                        &random_reader_casereader_class,
                                        make_random_reader (shared,
                                                            br->offset));
 }
 
 /* struct casereader_class "peek" function for random reader. */
-static bool
-random_reader_peek (struct casereader *reader, void *br_,
-                    casenumber idx, struct ccase *c) 
+static struct ccase *
+random_reader_peek (struct casereader *reader, void *br_, casenumber idx)
 {
   struct random_reader *br = br_;
   struct random_reader_shared *shared = br->shared;
 
   return shared->class->read (reader, shared->aux,
-                              br->offset - shared->min_offset + idx, c);
+                              br->offset - shared->min_offset + idx);
 }
 
 /* Casereader class for random reader. */
-static struct casereader_class random_reader_casereader_class = 
+static const struct casereader_class random_reader_casereader_class =
   {
     random_reader_read,
     random_reader_destroy,
@@ -500,106 +576,42 @@ static struct casereader_class random_reader_casereader_class =
     random_reader_peek,
   };
 \f
-/* Buffering shim for implementing clone and peek operations.
-
-   The "clone" and "peek" operations aren't implemented by all
-   types of casereaders, but we have to expose a uniform
-   interface anyhow.  We do this by interposing a buffering
-   casereader on top of the existing casereader on the first call
-   to "clone" or "peek".  The buffering casereader maintains a
-   window of cases that spans the positions of the original
-   casereader and all of its clones (the "clone set"), from the
-   position of the casereader that has read the fewest cases to
-   the position of the casereader that has read the most.  
-
-   Thus, if all of the casereaders in the clone set are at
-   approximately the same position, only a few cases are buffered
-   and there is little inefficiency.  If, on the other hand, one
-   casereader is not used to read any cases at all, but another
-   one is used to read all of the cases, the entire contents of
-   the casereader is copied into the buffer.  This still might
-   not be so inefficient, given that case data in memory is
-   shared across multiple identical copies, but in the worst case
-   the window implementation will write cases to disk instead of
-   maintaining them in-memory. */
-
-/* A buffering shim for a non-clonable or non-peekable
-   casereader. */
-struct shim 
-  {
-    struct casewindow *window;          /* Window of buffered cases. */
-    struct casereader *subreader;       /* Subordinate casereader. */
-  };
-
-static struct casereader_random_class shim_class;
+\f
+static const struct casereader_class casereader_null_class;
 
-/* Interposes a buffering shim atop READER. */
-static void
-insert_shim (struct casereader *reader) 
+/* Returns a casereader with no cases.  The casereader has the prototype
+   specified by PROTO.  PROTO may be specified as a null pointer, in which case
+   the casereader has no variables. */
+struct casereader *
+casereader_create_empty (const struct caseproto *proto_)
 {
-  size_t value_cnt = casereader_get_value_cnt (reader);
-  casenumber case_cnt = casereader_get_case_cnt (reader);
-  struct shim *b = xmalloc (sizeof *b);
-  b->window = casewindow_create (value_cnt, get_workspace_cases (value_cnt));
-  b->subreader = casereader_create_random (value_cnt, case_cnt,
-                                           &shim_class, b);
-  casereader_swap (reader, b->subreader);
-  taint_propagate (casewindow_get_taint (b->window),
-                   casereader_get_taint (reader));
-  taint_propagate (casereader_get_taint (b->subreader),
-                   casereader_get_taint (reader));
-}
+  struct casereader *reader;
+  struct caseproto *proto;
 
-/* Ensures that B's window contains at least CASE_CNT cases.
-   Return true if successful, false upon reaching the end of B's
-   subreader or an I/O error. */
-static bool
-prime_buffer (struct shim *b, casenumber case_cnt) 
-{
-  while (casewindow_get_case_cnt (b->window) < case_cnt) 
-    {
-      struct ccase tmp;
-      if (!casereader_read (b->subreader, &tmp))
-        return false;
-      casewindow_push_head (b->window, &tmp);
-    }
-  return true;
-}
+  proto = proto_ != NULL ? caseproto_ref (proto_) : caseproto_create ();
+  reader = casereader_create_sequential (NULL, proto, 0,
+                                         &casereader_null_class, NULL);
+  caseproto_unref (proto);
 
-/* Reads the case at the given 0-based OFFSET from the front of
-   the window into C.  Returns true if successful, false if
-   OFFSET is beyond the end of file or upon I/O error. */
-static bool
-shim_read (struct casereader *reader UNUSED, void *b_,
-           casenumber offset, struct ccase *c) 
-{
-  struct shim *b = b_;
-  return (prime_buffer (b, offset + 1)
-          && casewindow_get_case (b->window, offset, c));
+  return reader;
 }
 
-/* Destroys B. */
-static void
-shim_destroy (struct casereader *reader UNUSED, void *b_) 
+static struct ccase *
+casereader_null_read (struct casereader *reader UNUSED, void *aux UNUSED)
 {
-  struct shim *b = b_;
-  casewindow_destroy (b->window);
-  casereader_destroy (b->subreader);
-  free (b);
+  return NULL;
 }
 
-/* Discards CNT cases from the front of B's window. */
 static void
-shim_advance (struct casereader *reader UNUSED, void *b_, casenumber case_cnt)
+casereader_null_destroy (struct casereader *reader UNUSED, void *aux UNUSED)
 {
-  struct shim *b = b_;
-  casewindow_pop_tail (b->window, case_cnt);
+  /* Nothing to do. */
 }
 
-/* Class for the buffered reader. */
-static struct casereader_random_class shim_class = 
+static const struct casereader_class casereader_null_class =
   {
-    shim_read,
-    shim_destroy,
-    shim_advance,
+    casereader_null_read,
+    casereader_null_destroy,
+    NULL,                       /* clone */
+    NULL,                       /* peek */
   };