make-file: Pass O_BINARY or O_TEXT to open() call in replace_file_start().
[pspp] / src / data / casereader.c
index da49e0323a5e2df1d952dd7c88df9c0bf220923a..578a865dfd459372d528708b866038d32ca48eb4 100644 (file)
@@ -1,89 +1,99 @@
-/* PSPP - computes sample statistics.
-   Copyright (C) 2007 Free Software Foundation, Inc.
+/* PSPP - a program for statistical analysis.
+   Copyright (C) 2007, 2009, 2010, 2013 Free Software Foundation, Inc.
 
 
-   This program is free software; you can redistribute it and/or
-   modify it under the terms of the GNU General Public License as
-   published by the Free Software Foundation; either version 2 of the
-   License, or (at your option) any later version.
+   This program is free software: you can redistribute it and/or modify
+   it under the terms of the GNU General Public License as published by
+   the Free Software Foundation, either version 3 of the License, or
+   (at your option) any later version.
 
 
-   This program is distributed in the hope that it will be useful, but
-   WITHOUT ANY WARRANTY; without even the implied warranty of
-   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
-   General Public License for more details.
+   This program is distributed in the hope that it will be useful,
+   but WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+   GNU General Public License for more details.
 
    You should have received a copy of the GNU General Public License
 
    You should have received a copy of the GNU General Public License
-   along with this program; if not, write to the Free Software
-   Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
-   02110-1301, USA. */
+   along with this program.  If not, see <http://www.gnu.org/licenses/>. */
 
 #include <config.h>
 
 
 #include <config.h>
 
-#include <data/casereader.h>
-#include <data/casereader-provider.h>
+#include "data/casereader.h"
+#include "data/casereader-provider.h"
 
 #include <stdlib.h>
 
 
 #include <stdlib.h>
 
-#include <data/casewindow.h>
-#include <data/casewriter.h>
-#include <data/settings.h>
-#include <libpspp/assertion.h>
-#include <libpspp/heap.h>
-#include <libpspp/taint.h>
+#include "data/casereader-shim.h"
+#include "data/casewriter.h"
+#include "libpspp/assertion.h"
+#include "libpspp/heap.h"
+#include "libpspp/taint.h"
 
 
-#include "xalloc.h"
+#include "gl/xalloc.h"
 
 /* A casereader. */
 
 /* A casereader. */
-struct casereader 
+struct casereader
   {
     struct taint *taint;                  /* Corrupted? */
   {
     struct taint *taint;                  /* Corrupted? */
-    size_t value_cnt;                     /* Values per case. */
+    struct caseproto *proto;              /* Format of contained cases. */
     casenumber case_cnt;                  /* Number of cases,
                                              CASENUMBER_MAX if unknown. */
     const struct casereader_class *class; /* Class. */
     void *aux;                            /* Auxiliary data for class. */
   };
 
     casenumber case_cnt;                  /* Number of cases,
                                              CASENUMBER_MAX if unknown. */
     const struct casereader_class *class; /* Class. */
     void *aux;                            /* Auxiliary data for class. */
   };
 
-static void insert_shim (struct casereader *);
-
-/* Creates a new case in C and reads the next case from READER
-   into it.  The caller owns C and must destroy C when its data
-   is no longer needed.  Return true if successful, false when
-   cases have been exhausted or upon detection of an I/O error.
-   In the latter case, C is set to the null case.
+/* Reads and returns the next case from READER.  The caller owns
+   the returned case and must call case_unref on it when its data
+   is no longer needed.  Returns a null pointer if cases have
+   been exhausted or upon detection of an I/O error.
 
    The case returned is effectively consumed: it can never be
    read again through READER.  If this is inconvenient, READER
    may be cloned in advance with casereader_clone, or
    casereader_peek may be used instead. */
 
    The case returned is effectively consumed: it can never be
    read again through READER.  If this is inconvenient, READER
    may be cloned in advance with casereader_clone, or
    casereader_peek may be used instead. */
-bool
-casereader_read (struct casereader *reader, struct ccase *c) 
+struct ccase *
+casereader_read (struct casereader *reader)
 {
 {
-  if (reader->case_cnt != 0 && reader->class->read (reader, reader->aux, c)) 
+  if (reader->case_cnt != 0)
     {
     {
-      assert (case_get_value_cnt (c) == reader->value_cnt);
+      /* ->read may use casereader_swap to replace itself by
+         another reader and then delegate to that reader by
+         recursively calling casereader_read.  Currently only
+         lazy_casereader does this and, with luck, nothing else
+         ever will.
+
+         To allow this to work, however, we must decrement
+         case_cnt before calling ->read.  If we decremented
+         case_cnt after calling ->read, then this would actually
+         drop two cases from case_cnt instead of one, and we'd
+         lose the last case in the casereader. */
+      struct ccase *c;
       if (reader->case_cnt != CASENUMBER_MAX)
         reader->case_cnt--;
       if (reader->case_cnt != CASENUMBER_MAX)
         reader->case_cnt--;
-      return true; 
-    }
-  else 
-    {
-      reader->case_cnt = 0;
-      case_nullify (c);
-      return false; 
+      c = reader->class->read (reader, reader->aux);
+      if (c != NULL)
+        {
+          size_t n_widths UNUSED = caseproto_get_n_widths (reader->proto);
+          assert (case_get_value_cnt (c) >= n_widths);
+          expensive_assert (caseproto_equal (case_get_proto (c), 0,
+                                             reader->proto, 0, n_widths));
+          return c;
+        }
     }
     }
+  reader->case_cnt = 0;
+  return NULL;
 }
 
 /* Destroys READER.
    Returns false if an I/O error was detected on READER, true
    otherwise. */
 bool
 }
 
 /* Destroys READER.
    Returns false if an I/O error was detected on READER, true
    otherwise. */
 bool
-casereader_destroy (struct casereader *reader) 
+casereader_destroy (struct casereader *reader)
 {
   bool ok = true;
 {
   bool ok = true;
-  if (reader != NULL) 
+  if (reader != NULL)
     {
       reader->class->destroy (reader, reader->aux);
       ok = taint_destroy (reader->taint);
     {
       reader->class->destroy (reader, reader->aux);
       ok = taint_destroy (reader->taint);
+      caseproto_unref (reader->proto);
       free (reader);
     }
   return ok;
       free (reader);
     }
   return ok;
@@ -93,44 +103,27 @@ casereader_destroy (struct casereader *reader)
    to read the same sequence of cases in the same order, barring
    I/O errors. */
 struct casereader *
    to read the same sequence of cases in the same order, barring
    I/O errors. */
 struct casereader *
-casereader_clone (const struct casereader *reader_) 
+casereader_clone (const struct casereader *reader_)
 {
 {
-  struct casereader *reader = (struct casereader *) reader_;
+  struct casereader *reader = CONST_CAST (struct casereader *, reader_);
   struct casereader *clone;
   struct casereader *clone;
+  if (reader == NULL)
+    return NULL;
 
   if (reader->class->clone == NULL)
 
   if (reader->class->clone == NULL)
-    insert_shim (reader);
+    casereader_shim_insert (reader);
   clone = reader->class->clone (reader, reader->aux);
   assert (clone != NULL);
   assert (clone != reader);
   return clone;
 }
 
   clone = reader->class->clone (reader, reader->aux);
   assert (clone != NULL);
   assert (clone != reader);
   return clone;
 }
 
-/* Makes a copy of ORIGINAL into *NEW1 (if NEW1 is non-null) and
-   *NEW2 (if NEW2 is non-null), then destroys ORIGINAL. */
-void
-casereader_split (struct casereader *original,
-                  struct casereader **new1, struct casereader **new2) 
-{
-  if (new1 != NULL && new2 != NULL) 
-    {
-      *new1 = casereader_rename (original);
-      *new2 = casereader_clone (*new1);
-    }
-  else if (new1 != NULL)
-    *new1 = casereader_rename (original);
-  else if (new2 != NULL)
-    *new2 = casereader_rename (original);
-  else
-    casereader_destroy (original);
-}
-
 /* Returns a copy of READER, which is itself destroyed.
    Useful for taking over ownership of a casereader, to enforce
    preventing the original owner from accessing the casereader
    again. */
 struct casereader *
 /* Returns a copy of READER, which is itself destroyed.
    Useful for taking over ownership of a casereader, to enforce
    preventing the original owner from accessing the casereader
    again. */
 struct casereader *
-casereader_rename (struct casereader *reader) 
+casereader_rename (struct casereader *reader)
 {
   struct casereader *new = xmemdup (reader, sizeof *reader);
   free (reader);
 {
   struct casereader *new = xmemdup (reader, sizeof *reader);
   free (reader);
@@ -139,9 +132,9 @@ casereader_rename (struct casereader *reader)
 
 /* Exchanges the casereaders referred to by A and B. */
 void
 
 /* Exchanges the casereaders referred to by A and B. */
 void
-casereader_swap (struct casereader *a, struct casereader *b) 
+casereader_swap (struct casereader *a, struct casereader *b)
 {
 {
-  if (a != b) 
+  if (a != b)
     {
       struct casereader tmp = *a;
       *a = *b;
     {
       struct casereader tmp = *a;
       *a = *b;
@@ -149,34 +142,57 @@ casereader_swap (struct casereader *a, struct casereader *b)
     }
 }
 
     }
 }
 
-/* Creates a new case in C and reads the (IDX + 1)'th case from
-   READER into it.  The caller owns C and must destroy C when its
-   data is no longer needed.  Return true if successful, false
-   when cases have been exhausted or upon detection of an I/O
-   error.  In the latter case, C is set to the null case. */
-bool
-casereader_peek (struct casereader *reader, casenumber idx, struct ccase *c)
+/* Reads and returns the (IDX + 1)'th case from READER.  The
+   caller owns the returned case and must call case_unref on it
+   when it is no longer needed.  Returns a null pointer if cases
+   have been exhausted or upon detection of an I/O error. */
+struct ccase *
+casereader_peek (struct casereader *reader, casenumber idx)
 {
   if (idx < reader->case_cnt)
     {
 {
   if (idx < reader->case_cnt)
     {
+      struct ccase *c;
       if (reader->class->peek == NULL)
       if (reader->class->peek == NULL)
-        insert_shim (reader);
-      if (reader->class->peek (reader, reader->aux, idx, c))
-        return true;
-      else if (casereader_error (reader)) 
+        casereader_shim_insert (reader);
+      c = reader->class->peek (reader, reader->aux, idx);
+      if (c != NULL)
+        return c;
+      else if (casereader_error (reader))
         reader->case_cnt = 0;
     }
   if (reader->case_cnt > idx)
     reader->case_cnt = idx;
         reader->case_cnt = 0;
     }
   if (reader->case_cnt > idx)
     reader->case_cnt = idx;
-  case_nullify (c);
-  return false;
+  return NULL;
+}
+
+/* Returns true if no cases remain to be read from READER, or if
+   an error has occurred on READER.  (A return value of false
+   does *not* mean that the next call to casereader_peek or
+   casereader_read will return true, because an error can occur
+   in the meantime.) */
+bool
+casereader_is_empty (struct casereader *reader)
+{
+  if (reader->case_cnt == 0)
+    return true;
+  else
+    {
+      struct ccase *c = casereader_peek (reader, 0);
+      if (c == NULL)
+        return true;
+      else
+        {
+          case_unref (c);
+          return false;
+        }
+    }
 }
 
 /* Returns true if an I/O error or another hard error has
    occurred on READER, a clone of READER, or on some object on
    which READER's data has a dependency, false otherwise. */
 bool
 }
 
 /* Returns true if an I/O error or another hard error has
    occurred on READER, a clone of READER, or on some object on
    which READER's data has a dependency, false otherwise. */
 bool
-casereader_error (const struct casereader *reader) 
+casereader_error (const struct casereader *reader)
 {
   return taint_is_tainted (reader->taint);
 }
 {
   return taint_is_tainted (reader->taint);
 }
@@ -190,7 +206,7 @@ casereader_error (const struct casereader *reader)
    taint_propagate to propagate to the casereader's taint
    structure, which may be obtained via casereader_get_taint. */
 void
    taint_propagate to propagate to the casereader's taint
    structure, which may be obtained via casereader_get_taint. */
 void
-casereader_force_error (struct casereader *reader) 
+casereader_force_error (struct casereader *reader)
 {
   taint_set_taint (reader->taint);
 }
 {
   taint_set_taint (reader->taint);
 }
@@ -198,7 +214,7 @@ casereader_force_error (struct casereader *reader)
 /* Returns READER's associate taint object, for use with
    taint_propagate and other taint functions. */
 const struct taint *
 /* Returns READER's associate taint object, for use with
    taint_propagate and other taint functions. */
 const struct taint *
-casereader_get_taint (const struct casereader *reader) 
+casereader_get_taint (const struct casereader *reader)
 {
   return reader->taint;
 }
 {
   return reader->taint;
 }
@@ -214,11 +230,25 @@ casereader_get_taint (const struct casereader *reader)
    actual number of cases in such a casereader, use
    casereader_count_cases. */
 casenumber
    actual number of cases in such a casereader, use
    casereader_count_cases. */
 casenumber
-casereader_get_case_cnt (struct casereader *reader) 
+casereader_get_case_cnt (struct casereader *reader)
 {
   return reader->case_cnt;
 }
 
 {
   return reader->case_cnt;
 }
 
+static casenumber
+casereader_count_cases__ (const struct casereader *reader,
+                          casenumber max_cases)
+{
+  struct casereader *clone;
+  casenumber n_cases;
+
+  clone = casereader_clone (reader);
+  n_cases = casereader_advance (clone, max_cases);
+  casereader_destroy (clone);
+
+  return n_cases;
+}
+
 /* Returns the number of cases that will be read by successive
    calls to casereader_read for READER, assuming that no errors
    occur.  Upon an error condition, the case count drops to 0, so
 /* Returns the number of cases that will be read by successive
    calls to casereader_read for READER, assuming that no errors
    occur.  Upon an error condition, the case count drops to 0, so
@@ -229,43 +259,69 @@ casereader_get_case_cnt (struct casereader *reader)
    of the contents of a clone of READER.  Thus, the return value
    is always correct in the absence of I/O errors. */
 casenumber
    of the contents of a clone of READER.  Thus, the return value
    is always correct in the absence of I/O errors. */
 casenumber
-casereader_count_cases (struct casereader *reader)
+casereader_count_cases (const struct casereader *reader)
 {
   if (reader->case_cnt == CASENUMBER_MAX)
     {
 {
   if (reader->case_cnt == CASENUMBER_MAX)
     {
-      casenumber n_cases = 0;
-      struct ccase c;
+      struct casereader *reader_rw = CONST_CAST (struct casereader *, reader);
+      reader_rw->case_cnt = casereader_count_cases__ (reader, CASENUMBER_MAX);
+    }
+  return reader->case_cnt;
+}
+
+/* Truncates READER to at most N cases. */
+void
+casereader_truncate (struct casereader *reader, casenumber n)
+{
+  /* This could be optimized, if it ever becomes too expensive, by adding a
+     "max_cases" member to struct casereader.  We could also add a "truncate"
+     function to the casereader implementation, to allow the casereader to
+     throw away data that cannot ever be read. */
+  if (reader->case_cnt == CASENUMBER_MAX)
+    reader->case_cnt = casereader_count_cases__ (reader, n);
+  if (reader->case_cnt > n)
+    reader->case_cnt = n;
+}
 
 
-      struct casereader *clone = casereader_clone (reader);
+/* Returns the prototype for the cases in READER.  The caller
+   must not unref the returned prototype. */
+const struct caseproto *
+casereader_get_proto (const struct casereader *reader)
+{
+  return reader->proto;
+}
 
 
-      for (; casereader_read (clone, &c); case_destroy (&c))
-        n_cases++;
+/* Skips past N cases in READER, stopping when the last case in
+   READER has been read or on an input error.  Returns the number
+   of cases successfully skipped. */
+casenumber
+casereader_advance (struct casereader *reader, casenumber n)
+{
+  casenumber i;
 
 
-      casereader_destroy (clone);
-      reader->case_cnt = n_cases;
+  for (i = 0; i < n; i++)
+    {
+      struct ccase *c = casereader_read (reader);
+      if (c == NULL)
+        break;
+      case_unref (c);
     }
 
     }
 
-  return reader->case_cnt;
+  return i;
 }
 
 }
 
-/* Returns the number of struct values in each case in READER. */
-size_t
-casereader_get_value_cnt (struct casereader *reader) 
-{
-  return reader->value_cnt;
-}
 
 /* Copies all the cases in READER to WRITER, propagating errors
 
 /* Copies all the cases in READER to WRITER, propagating errors
-   appropriately. */
+   appropriately. READER is destroyed by this function */
 void
 casereader_transfer (struct casereader *reader, struct casewriter *writer)
 {
 void
 casereader_transfer (struct casereader *reader, struct casewriter *writer)
 {
-  struct ccase c;
+  struct ccase *c;
 
   taint_propagate (casereader_get_taint (reader),
                    casewriter_get_taint (writer));
 
   taint_propagate (casereader_get_taint (reader),
                    casewriter_get_taint (writer));
-  while (casereader_read (reader, &c))
-    casewriter_write (writer, &c);
+  while ((c = casereader_read (reader)) != NULL)
+    casewriter_write (writer, c);
   casereader_destroy (reader);
 }
 
   casereader_destroy (reader);
 }
 
@@ -286,8 +342,9 @@ casereader_transfer (struct casereader *reader, struct casewriter *writer)
    function, in which case the cloned casereader should have the
    same taint object as the original casereader.)
 
    function, in which case the cloned casereader should have the
    same taint object as the original casereader.)
 
-   VALUE_CNT must be the number of struct values per case read
-   from the casereader.
+   PROTO must be the prototype for the cases that may be read
+   from the casereader.  The caller retains its reference to
+   PROTO.
 
    CASE_CNT is an upper limit on the number of cases that
    casereader_read will return from the casereader in successive
 
    CASE_CNT is an upper limit on the number of cases that
    casereader_read will return from the casereader in successive
@@ -300,17 +357,38 @@ casereader_transfer (struct casereader *reader, struct casewriter *writer)
    functions, respectively. */
 struct casereader *
 casereader_create_sequential (const struct taint *taint,
    functions, respectively. */
 struct casereader *
 casereader_create_sequential (const struct taint *taint,
-                              size_t value_cnt, casenumber case_cnt,
-                              const struct casereader_class *class, void *aux) 
+                              const struct caseproto *proto,
+                              casenumber case_cnt,
+                              const struct casereader_class *class, void *aux)
 {
   struct casereader *reader = xmalloc (sizeof *reader);
   reader->taint = taint != NULL ? taint_clone (taint) : taint_create ();
 {
   struct casereader *reader = xmalloc (sizeof *reader);
   reader->taint = taint != NULL ? taint_clone (taint) : taint_create ();
-  reader->value_cnt = value_cnt;
+  reader->proto = caseproto_ref (proto);
   reader->case_cnt = case_cnt;
   reader->class = class;
   reader->aux = aux;
   return reader;
 }
   reader->case_cnt = case_cnt;
   reader->class = class;
   reader->aux = aux;
   return reader;
 }
+
+/* If READER is a casereader of the given CLASS, returns its
+   associated auxiliary data; otherwise, returns a null pointer.
+
+   This function is intended for use from casereader
+   implementations, not by casereader users.  Even within
+   casereader implementations, its usefulness is quite limited,
+   for at least two reasons.  First, every casereader member
+   function already receives a pointer to the casereader's
+   auxiliary data.  Second, a casereader's class can change
+   (through a call to casereader_swap) and this is in practice
+   quite common (e.g. any call to casereader_clone on a
+   casereader that does not directly support clone will cause the
+   casereader to be replaced by a shim caseader). */
+void *
+casereader_dynamic_cast (struct casereader *reader,
+                         const struct casereader_class *class)
+{
+  return reader->class == class ? reader->aux : NULL;
+}
 \f
 /* Random-access casereader implementation.
 
 \f
 /* Random-access casereader implementation.
 
@@ -330,13 +408,13 @@ struct random_reader
 /* Returns the random_reader in which the given heap_node is
    embedded. */
 static struct random_reader *
 /* Returns the random_reader in which the given heap_node is
    embedded. */
 static struct random_reader *
-random_reader_from_heap_node (const struct heap_node *node) 
+random_reader_from_heap_node (const struct heap_node *node)
 {
   return heap_data (node, struct random_reader, heap_node);
 }
 
 /* Data shared among clones of a random reader. */
 {
   return heap_data (node, struct random_reader, heap_node);
 }
 
 /* Data shared among clones of a random reader. */
-struct random_reader_shared 
+struct random_reader_shared
   {
     struct heap *readers;       /* Heap of struct random_readers. */
     casenumber min_offset;      /* Smallest offset of any random_reader. */
   {
     struct heap *readers;       /* Heap of struct random_readers. */
     casenumber min_offset;      /* Smallest offset of any random_reader. */
@@ -344,7 +422,7 @@ struct random_reader_shared
     void *aux;
   };
 
     void *aux;
   };
 
-static struct casereader_class random_reader_casereader_class;
+static const struct casereader_class random_reader_casereader_class;
 
 /* Creates and returns a new random_reader with the given SHARED
    data and OFFSET.  Inserts the new random reader into the
 
 /* Creates and returns a new random_reader with the given SHARED
    data and OFFSET.  Inserts the new random reader into the
@@ -364,7 +442,7 @@ make_random_reader (struct random_reader_shared *shared, casenumber offset)
 static int
 compare_random_readers_by_offset (const struct heap_node *a_,
                                   const struct heap_node *b_,
 static int
 compare_random_readers_by_offset (const struct heap_node *a_,
                                   const struct heap_node *b_,
-                                  const void *aux UNUSED) 
+                                  const void *aux UNUSED)
 {
   const struct random_reader *a = random_reader_from_heap_node (a_);
   const struct random_reader *b = random_reader_from_heap_node (b_);
 {
   const struct random_reader *a = random_reader_from_heap_node (a_);
   const struct random_reader *b = random_reader_from_heap_node (b_);
@@ -380,8 +458,9 @@ compare_random_readers_by_offset (const struct heap_node *a_,
    casereader_create_sequential is more appropriate for a data
    source that is naturally sequential.
 
    casereader_create_sequential is more appropriate for a data
    source that is naturally sequential.
 
-   VALUE_CNT must be the number of struct values per case read
-   from the casereader.
+   PROTO must be the prototype for the cases that may be read
+   from the casereader.  The caller retains its reference to
+   PROTO.
 
    CASE_CNT is an upper limit on the number of cases that
    casereader_read will return from the casereader in successive
 
    CASE_CNT is an upper limit on the number of cases that
    casereader_read will return from the casereader in successive
@@ -393,16 +472,16 @@ compare_random_readers_by_offset (const struct heap_node *a_,
    member functions and auxiliary data to pass to those member
    functions, respectively. */
 struct casereader *
    member functions and auxiliary data to pass to those member
    functions, respectively. */
 struct casereader *
-casereader_create_random (size_t value_cnt, casenumber case_cnt,
+casereader_create_random (const struct caseproto *proto, casenumber case_cnt,
                           const struct casereader_random_class *class,
                           const struct casereader_random_class *class,
-                          void *aux) 
+                          void *aux)
 {
   struct random_reader_shared *shared = xmalloc (sizeof *shared);
   shared->readers = heap_create (compare_random_readers_by_offset, NULL);
   shared->class = class;
   shared->aux = aux;
   shared->min_offset = 0;
 {
   struct random_reader_shared *shared = xmalloc (sizeof *shared);
   shared->readers = heap_create (compare_random_readers_by_offset, NULL);
   shared->class = class;
   shared->aux = aux;
   shared->min_offset = 0;
-  return casereader_create_sequential (NULL, value_cnt, case_cnt,
+  return casereader_create_sequential (NULL, proto, case_cnt,
                                        &random_reader_casereader_class,
                                        make_random_reader (shared, 0));
 }
                                        &random_reader_casereader_class,
                                        make_random_reader (shared, 0));
 }
@@ -411,7 +490,7 @@ casereader_create_random (size_t value_cnt, casenumber case_cnt,
    offset in the heap.   */
 static void
 advance_random_reader (struct casereader *reader,
    offset in the heap.   */
 static void
 advance_random_reader (struct casereader *reader,
-                       struct random_reader_shared *shared) 
+                       struct random_reader_shared *shared)
 {
   casenumber old, new;
 
 {
   casenumber old, new;
 
@@ -426,34 +505,32 @@ advance_random_reader (struct casereader *reader,
 }
 
 /* struct casereader_class "read" function for random reader. */
 }
 
 /* struct casereader_class "read" function for random reader. */
-static bool
-random_reader_read (struct casereader *reader, void *br_, struct ccase *c)
+static struct ccase *
+random_reader_read (struct casereader *reader, void *br_)
 {
   struct random_reader *br = br_;
   struct random_reader_shared *shared = br->shared;
 {
   struct random_reader *br = br_;
   struct random_reader_shared *shared = br->shared;
-
-  if (shared->class->read (reader, shared->aux,
-                           br->offset - shared->min_offset, c)) 
+  struct ccase *c = shared->class->read (reader, shared->aux,
+                                         br->offset - shared->min_offset);
+  if (c != NULL)
     {
       br->offset++;
       heap_changed (shared->readers, &br->heap_node);
     {
       br->offset++;
       heap_changed (shared->readers, &br->heap_node);
-      advance_random_reader (reader, shared); 
-      return true;
+      advance_random_reader (reader, shared);
     }
     }
-  else
-    return false; 
+  return c;
 }
 
 /* struct casereader_class "destroy" function for random
    reader. */
 static void
 }
 
 /* struct casereader_class "destroy" function for random
    reader. */
 static void
-random_reader_destroy (struct casereader *reader, void *br_) 
+random_reader_destroy (struct casereader *reader, void *br_)
 {
   struct random_reader *br = br_;
   struct random_reader_shared *shared = br->shared;
 
   heap_delete (shared->readers, &br->heap_node);
 {
   struct random_reader *br = br_;
   struct random_reader_shared *shared = br->shared;
 
   heap_delete (shared->readers, &br->heap_node);
-  if (heap_is_empty (shared->readers)) 
+  if (heap_is_empty (shared->readers))
     {
       heap_destroy (shared->readers);
       shared->class->destroy (reader, shared->aux);
     {
       heap_destroy (shared->readers);
       shared->class->destroy (reader, shared->aux);
@@ -467,12 +544,12 @@ random_reader_destroy (struct casereader *reader, void *br_)
 
 /* struct casereader_class "clone" function for random reader. */
 static struct casereader *
 
 /* struct casereader_class "clone" function for random reader. */
 static struct casereader *
-random_reader_clone (struct casereader *reader, void *br_) 
+random_reader_clone (struct casereader *reader, void *br_)
 {
   struct random_reader *br = br_;
   struct random_reader_shared *shared = br->shared;
   return casereader_create_sequential (casereader_get_taint (reader),
 {
   struct random_reader *br = br_;
   struct random_reader_shared *shared = br->shared;
   return casereader_create_sequential (casereader_get_taint (reader),
-                                       casereader_get_value_cnt (reader),
+                                       reader->proto,
                                        casereader_get_case_cnt (reader),
                                        &random_reader_casereader_class,
                                        make_random_reader (shared,
                                        casereader_get_case_cnt (reader),
                                        &random_reader_casereader_class,
                                        make_random_reader (shared,
@@ -480,19 +557,18 @@ random_reader_clone (struct casereader *reader, void *br_)
 }
 
 /* struct casereader_class "peek" function for random reader. */
 }
 
 /* struct casereader_class "peek" function for random reader. */
-static bool
-random_reader_peek (struct casereader *reader, void *br_,
-                    casenumber idx, struct ccase *c) 
+static struct ccase *
+random_reader_peek (struct casereader *reader, void *br_, casenumber idx)
 {
   struct random_reader *br = br_;
   struct random_reader_shared *shared = br->shared;
 
   return shared->class->read (reader, shared->aux,
 {
   struct random_reader *br = br_;
   struct random_reader_shared *shared = br->shared;
 
   return shared->class->read (reader, shared->aux,
-                              br->offset - shared->min_offset + idx, c);
+                              br->offset - shared->min_offset + idx);
 }
 
 /* Casereader class for random reader. */
 }
 
 /* Casereader class for random reader. */
-static struct casereader_class random_reader_casereader_class = 
+static const struct casereader_class random_reader_casereader_class =
   {
     random_reader_read,
     random_reader_destroy,
   {
     random_reader_read,
     random_reader_destroy,
@@ -500,106 +576,42 @@ static struct casereader_class random_reader_casereader_class =
     random_reader_peek,
   };
 \f
     random_reader_peek,
   };
 \f
-/* Buffering shim for implementing clone and peek operations.
-
-   The "clone" and "peek" operations aren't implemented by all
-   types of casereaders, but we have to expose a uniform
-   interface anyhow.  We do this by interposing a buffering
-   casereader on top of the existing casereader on the first call
-   to "clone" or "peek".  The buffering casereader maintains a
-   window of cases that spans the positions of the original
-   casereader and all of its clones (the "clone set"), from the
-   position of the casereader that has read the fewest cases to
-   the position of the casereader that has read the most.  
-
-   Thus, if all of the casereaders in the clone set are at
-   approximately the same position, only a few cases are buffered
-   and there is little inefficiency.  If, on the other hand, one
-   casereader is not used to read any cases at all, but another
-   one is used to read all of the cases, the entire contents of
-   the casereader is copied into the buffer.  This still might
-   not be so inefficient, given that case data in memory is
-   shared across multiple identical copies, but in the worst case
-   the window implementation will write cases to disk instead of
-   maintaining them in-memory. */
-
-/* A buffering shim for a non-clonable or non-peekable
-   casereader. */
-struct shim 
-  {
-    struct casewindow *window;          /* Window of buffered cases. */
-    struct casereader *subreader;       /* Subordinate casereader. */
-  };
-
-static struct casereader_random_class shim_class;
+\f
+static const struct casereader_class casereader_null_class;
 
 
-/* Interposes a buffering shim atop READER. */
-static void
-insert_shim (struct casereader *reader) 
+/* Returns a casereader with no cases.  The casereader has the prototype
+   specified by PROTO.  PROTO may be specified as a null pointer, in which case
+   the casereader has no variables. */
+struct casereader *
+casereader_create_empty (const struct caseproto *proto_)
 {
 {
-  size_t value_cnt = casereader_get_value_cnt (reader);
-  casenumber case_cnt = casereader_get_case_cnt (reader);
-  struct shim *b = xmalloc (sizeof *b);
-  b->window = casewindow_create (value_cnt, get_workspace_cases (value_cnt));
-  b->subreader = casereader_create_random (value_cnt, case_cnt,
-                                           &shim_class, b);
-  casereader_swap (reader, b->subreader);
-  taint_propagate (casewindow_get_taint (b->window),
-                   casereader_get_taint (reader));
-  taint_propagate (casereader_get_taint (b->subreader),
-                   casereader_get_taint (reader));
-}
+  struct casereader *reader;
+  struct caseproto *proto;
 
 
-/* Ensures that B's window contains at least CASE_CNT cases.
-   Return true if successful, false upon reaching the end of B's
-   subreader or an I/O error. */
-static bool
-prime_buffer (struct shim *b, casenumber case_cnt) 
-{
-  while (casewindow_get_case_cnt (b->window) < case_cnt) 
-    {
-      struct ccase tmp;
-      if (!casereader_read (b->subreader, &tmp))
-        return false;
-      casewindow_push_head (b->window, &tmp);
-    }
-  return true;
-}
+  proto = proto_ != NULL ? caseproto_ref (proto_) : caseproto_create ();
+  reader = casereader_create_sequential (NULL, proto, 0,
+                                         &casereader_null_class, NULL);
+  caseproto_unref (proto);
 
 
-/* Reads the case at the given 0-based OFFSET from the front of
-   the window into C.  Returns true if successful, false if
-   OFFSET is beyond the end of file or upon I/O error. */
-static bool
-shim_read (struct casereader *reader UNUSED, void *b_,
-           casenumber offset, struct ccase *c) 
-{
-  struct shim *b = b_;
-  return (prime_buffer (b, offset + 1)
-          && casewindow_get_case (b->window, offset, c));
+  return reader;
 }
 
 }
 
-/* Destroys B. */
-static void
-shim_destroy (struct casereader *reader UNUSED, void *b_) 
+static struct ccase *
+casereader_null_read (struct casereader *reader UNUSED, void *aux UNUSED)
 {
 {
-  struct shim *b = b_;
-  casewindow_destroy (b->window);
-  casereader_destroy (b->subreader);
-  free (b);
+  return NULL;
 }
 
 }
 
-/* Discards CNT cases from the front of B's window. */
 static void
 static void
-shim_advance (struct casereader *reader UNUSED, void *b_, casenumber case_cnt)
+casereader_null_destroy (struct casereader *reader UNUSED, void *aux UNUSED)
 {
 {
-  struct shim *b = b_;
-  casewindow_pop_tail (b->window, case_cnt);
+  /* Nothing to do. */
 }
 
 }
 
-/* Class for the buffered reader. */
-static struct casereader_random_class shim_class = 
+static const struct casereader_class casereader_null_class =
   {
   {
-    shim_read,
-    shim_destroy,
-    shim_advance,
+    casereader_null_read,
+    casereader_null_destroy,
+    NULL,                       /* clone */
+    NULL,                       /* peek */
   };
   };