src/data/casereader-project.c \
        src/data/casereader-provider.h \
        src/data/casereader-select.c \
+       src/data/casereader-shim.c \
+       src/data/casereader-shim.h \
        src/data/casereader-translator.c \
        src/data/casereader.c \
        src/data/casereader.h \
 
--- /dev/null
+/* PSPP - a program for statistical analysis.
+   Copyright (C) 2007, 2009, 2010 Free Software Foundation, Inc.
+
+   This program is free software: you can redistribute it and/or modify
+   it under the terms of the GNU General Public License as published by
+   the Free Software Foundation, either version 3 of the License, or
+   (at your option) any later version.
+
+   This program is distributed in the hope that it will be useful,
+   but WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+   GNU General Public License for more details.
+
+   You should have received a copy of the GNU General Public License
+   along with this program.  If not, see <http://www.gnu.org/licenses/>. */
+
+#include <config.h>
+
+#include "data/casereader-shim.h"
+
+#include <stdlib.h>
+
+#include "data/casereader.h"
+#include "data/casereader-provider.h"
+#include "data/casewindow.h"
+#include "data/settings.h"
+#include "libpspp/taint.h"
+
+#include "gl/xalloc.h"
+
+/* A buffering shim casereader. */
+struct casereader_shim
+  {
+    struct casewindow *window;          /* Window of buffered cases. */
+    struct casereader *subreader;       /* Subordinate casereader. */
+  };
+
+static const struct casereader_random_class shim_class;
+
+static bool buffer_case (struct casereader_shim *s);
+
+/* Interposes a buffering shim on READER.
+
+   Returns the new shim.  The only legitimate use of the returned
+   casereader_shim is for calling casereader_shim_slurp().  If READER has no
+   clones already (which the caller should ensure, if it plans to use the
+   return value), then the returned casreader_shim is valid for that purpose
+   until, and only until, the READER's 'destroy' function is called. */
+struct casereader_shim *
+casereader_shim_insert (struct casereader *reader)
+{
+  const struct caseproto *proto = casereader_get_proto (reader);
+  casenumber case_cnt = casereader_get_case_cnt (reader);
+  struct casereader_shim *s = xmalloc (sizeof *s);
+  s->window = casewindow_create (proto, settings_get_workspace_cases (proto));
+  s->subreader = casereader_create_random (proto, case_cnt, &shim_class, s);
+  casereader_swap (reader, s->subreader);
+  taint_propagate (casewindow_get_taint (s->window),
+                   casereader_get_taint (reader));
+  taint_propagate (casereader_get_taint (s->subreader),
+                   casereader_get_taint (reader));
+  return s;
+}
+
+/* Reads all of the cases from S's subreader into S's buffer and destroys S's
+   subreader.  (This is a no-op if the subreader has already been
+   destroyed.)
+
+   Refer to the comment on casereader_shim_insert() for information on when
+   this function may be used. */
+void
+casereader_shim_slurp (struct casereader_shim *s)
+{
+  while (buffer_case (s))
+    continue;
+}
+
+/* Reads a case from S's subreader and appends it to S's window.  Returns true
+   if successful, false at the end of S's subreader or upon an I/O error. */
+static bool
+buffer_case (struct casereader_shim *s)
+{
+  struct ccase *tmp;
+
+  if (s->subreader == NULL)
+    return false;
+
+  tmp = casereader_read (s->subreader);
+  if (tmp == NULL)
+    {
+      casereader_destroy (s->subreader);
+      s->subreader = NULL;
+      return false;
+    }
+
+  casewindow_push_head (s->window, tmp);
+  return true;
+}
+
+/* Reads the case at the given 0-based OFFSET from the front of the window into
+   C.  Returns the case if successful, or a null pointer if OFFSET is beyond
+   the end of file or upon I/O error.  The caller must call case_unref() on the
+   returned case when it is no longer needed. */
+static struct ccase *
+casereader_shim_read (struct casereader *reader UNUSED, void *s_,
+                      casenumber offset)
+{
+  struct casereader_shim *s = s_;
+
+  while (casewindow_get_case_cnt (s->window) <= offset)
+    if (!buffer_case (s))
+      return false;
+
+  return casewindow_get_case (s->window, offset);
+}
+
+/* Destroys S. */
+static void
+casereader_shim_destroy (struct casereader *reader UNUSED, void *s_)
+{
+  struct casereader_shim *s = s_;
+  casewindow_destroy (s->window);
+  casereader_destroy (s->subreader);
+  free (s);
+}
+
+/* Discards CNT cases from the front of S's window. */
+static void
+casereader_shim_advance (struct casereader *reader UNUSED, void *s_,
+                         casenumber case_cnt)
+{
+  struct casereader_shim *s = s_;
+  casewindow_pop_tail (s->window, case_cnt);
+}
+
+/* Class for the buffered reader. */
+static const struct casereader_random_class shim_class =
+  {
+    casereader_shim_read,
+    casereader_shim_destroy,
+    casereader_shim_advance,
+  };
 
--- /dev/null
+/* PSPP - a program for statistical analysis.
+   Copyright (C) 2007, 2009, 2010 Free Software Foundation, Inc.
+
+   This program is free software: you can redistribute it and/or modify
+   it under the terms of the GNU General Public License as published by
+   the Free Software Foundation, either version 3 of the License, or
+   (at your option) any later version.
+
+   This program is distributed in the hope that it will be useful,
+   but WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+   GNU General Public License for more details.
+
+   You should have received a copy of the GNU General Public License
+   along with this program.  If not, see <http://www.gnu.org/licenses/>. */
+
+#ifndef DATA_CASEREADER_SHIM_H
+#define DATA_CASEREADER_SHIM_H 1
+
+/* Buffering shim for implementing clone and peek operations.
+
+   The "clone" and "peek" operations aren't implemented by all types of
+   casereaders, but we have to expose a uniform interface anyhow.  The
+   casereader buffering shim can do this by interposing a buffer on top of an
+   existing casereader.  The shim maintains a window of cases that spans the
+   positions of the original casereader and all of its clones (the "clone
+   set"), from the position of the casereader that has read the fewest cases to
+   the position of the casereader that has read the most.
+
+   Thus, if all of the casereaders in the clone set are at approximately the
+   same position, only a few cases are buffered and there is little
+   inefficiency.  If, on the other hand, one casereader is not used to read any
+   cases at all, but another one is used to read all of the cases, the entire
+   contents of the casereader is copied into the buffer.  This still might not
+   be so inefficient, given that case data in memory is shared across multiple
+   identical copies, but in the worst case the window implementation will write
+   cases to disk instead of maintaining them in-memory.
+
+   Casereader buffering shims are inserted automatically on the first call to
+   casereader_clone() or casereader_peek() for a casereader that does not
+   support those operations natively.  Thus, there is ordinarily little reason
+   to intentionally insert a shim. */
+
+struct casereader;
+struct casereader_shim *casereader_shim_insert (struct casereader *);
+void casereader_shim_slurp (struct casereader_shim *);
+
+#endif /* data/casereader-shim.h */
 
 /* PSPP - a program for statistical analysis.
-   Copyright (C) 2007, 2009 Free Software Foundation, Inc.
+   Copyright (C) 2007, 2009, 2010 Free Software Foundation, Inc.
 
    This program is free software: you can redistribute it and/or modify
    it under the terms of the GNU General Public License as published by
 
 #include <config.h>
 
-#include <data/casereader.h>
-#include <data/casereader-provider.h>
+#include "data/casereader.h"
+#include "data/casereader-provider.h"
 
 #include <stdlib.h>
 
-#include <data/casewindow.h>
-#include <data/casewriter.h>
-#include <data/settings.h>
-#include <libpspp/assertion.h>
-#include <libpspp/heap.h>
-#include <libpspp/taint.h>
+#include "data/casereader-shim.h"
+#include "data/casewriter.h"
+#include "libpspp/assertion.h"
+#include "libpspp/heap.h"
+#include "libpspp/taint.h"
 
-#include "xalloc.h"
+#include "gl/xalloc.h"
 
 /* A casereader. */
 struct casereader
     void *aux;                            /* Auxiliary data for class. */
   };
 
-static void insert_shim (struct casereader *);
-
 /* Reads and returns the next case from READER.  The caller owns
    the returned case and must call case_unref on it when its data
    is no longer needed.  Returns a null pointer if cases have
     return NULL;
 
   if (reader->class->clone == NULL)
-    insert_shim (reader);
+    casereader_shim_insert (reader);
   clone = reader->class->clone (reader, reader->aux);
   assert (clone != NULL);
   assert (clone != reader);
     {
       struct ccase *c;
       if (reader->class->peek == NULL)
-        insert_shim (reader);
+        casereader_shim_insert (reader);
       c = reader->class->peek (reader, reader->aux, idx);
       if (c != NULL)
         return c;
     random_reader_peek,
   };
 \f
-/* Buffering shim for implementing clone and peek operations.
-
-   The "clone" and "peek" operations aren't implemented by all
-   types of casereaders, but we have to expose a uniform
-   interface anyhow.  We do this by interposing a buffering
-   casereader on top of the existing casereader on the first call
-   to "clone" or "peek".  The buffering casereader maintains a
-   window of cases that spans the positions of the original
-   casereader and all of its clones (the "clone set"), from the
-   position of the casereader that has read the fewest cases to
-   the position of the casereader that has read the most.
-
-   Thus, if all of the casereaders in the clone set are at
-   approximately the same position, only a few cases are buffered
-   and there is little inefficiency.  If, on the other hand, one
-   casereader is not used to read any cases at all, but another
-   one is used to read all of the cases, the entire contents of
-   the casereader is copied into the buffer.  This still might
-   not be so inefficient, given that case data in memory is
-   shared across multiple identical copies, but in the worst case
-   the window implementation will write cases to disk instead of
-   maintaining them in-memory. */
-
-/* A buffering shim for a non-clonable or non-peekable
-   casereader. */
-struct shim
-  {
-    struct casewindow *window;          /* Window of buffered cases. */
-    struct casereader *subreader;       /* Subordinate casereader. */
-  };
-
-static const struct casereader_random_class shim_class;
-
-/* Interposes a buffering shim atop READER. */
-static void
-insert_shim (struct casereader *reader)
-{
-  const struct caseproto *proto = casereader_get_proto (reader);
-  casenumber case_cnt = casereader_get_case_cnt (reader);
-  struct shim *b = xmalloc (sizeof *b);
-  b->window = casewindow_create (proto, settings_get_workspace_cases (proto));
-  b->subreader = casereader_create_random (proto, case_cnt, &shim_class, b);
-  casereader_swap (reader, b->subreader);
-  taint_propagate (casewindow_get_taint (b->window),
-                   casereader_get_taint (reader));
-  taint_propagate (casereader_get_taint (b->subreader),
-                   casereader_get_taint (reader));
-}
-
-/* Ensures that B's window contains at least CASE_CNT cases.
-   Return true if successful, false upon reaching the end of B's
-   subreader or an I/O error. */
-static bool
-prime_buffer (struct shim *b, casenumber case_cnt)
-{
-  while (casewindow_get_case_cnt (b->window) < case_cnt)
-    {
-      struct ccase *tmp = casereader_read (b->subreader);
-      if (tmp == NULL)
-        return false;
-      casewindow_push_head (b->window, tmp);
-    }
-  return true;
-}
-
-/* Reads the case at the given 0-based OFFSET from the front of
-   the window into C.  Returns the case if successful, or a null
-   pointer if OFFSET is beyond the end of file or upon I/O error.
-   The caller must call case_unref() on the returned case when it
-   is no longer needed. */
-static struct ccase *
-shim_read (struct casereader *reader UNUSED, void *b_,
-           casenumber offset)
-{
-  struct shim *b = b_;
-  if (!prime_buffer (b, offset + 1))
-    return NULL;
-  return casewindow_get_case (b->window, offset);
-}
-
-/* Destroys B. */
-static void
-shim_destroy (struct casereader *reader UNUSED, void *b_)
-{
-  struct shim *b = b_;
-  casewindow_destroy (b->window);
-  casereader_destroy (b->subreader);
-  free (b);
-}
-
-/* Discards CNT cases from the front of B's window. */
-static void
-shim_advance (struct casereader *reader UNUSED, void *b_, casenumber case_cnt)
-{
-  struct shim *b = b_;
-  casewindow_pop_tail (b->window, case_cnt);
-}
-
-/* Class for the buffered reader. */
-static const struct casereader_random_class shim_class =
-  {
-    shim_read,
-    shim_destroy,
-    shim_advance,
-  };
 \f
 static const struct casereader_class casereader_null_class;