Rewrite expression code.
[pspp-builds.git] / src / casefile.c
index 9c889b64226e726e88917e98f5024b5509022059..97929c570eaaa81b655cba62f6a2c32dacdfacd7 100644 (file)
 #include <string.h>
 #include <unistd.h>
 #include "alloc.h"
+#include "case.h"
 #include "error.h"
 #include "misc.h"
+#include "mkfile.h"
+#include "settings.h"
 #include "var.h"
-#include "workspace.h"
 
-#define IO_BUF_SIZE 8192
+#ifdef HAVE_VALGRIND_VALGRIND_H
+#include <valgrind/valgrind.h>
+#endif
+
+#define IO_BUF_SIZE (8192 / sizeof (union value))
 
 /* A casefile is a sequentially accessible array of immutable
    cases.  It may be stored in memory or on disk as workspace
    file.  Once any cases have been read, no more cases may be
    appended.  The entire file is discarded at once. */
 
+/* In-memory cases are arranged in an array of arrays.  The top
+   level is variable size and the size of each bottom level array
+   is fixed at the number of cases defined here.  */
+#define CASES_PER_BLOCK 128             
+
 /* A casefile. */
 struct casefile 
   {
     /* Basic data. */
     struct casefile *next, *prev;       /* Next, prev in global list. */
-    size_t case_size;                   /* Case size in bytes. */
-    size_t case_list_size;              /* Bytes to allocate for case_lists. */
+    size_t value_cnt;                   /* Case size in `union value's. */
+    size_t case_acct_size;              /* Case size for accounting. */
     unsigned long case_cnt;             /* Number of cases stored. */
     enum { MEMORY, DISK } storage;      /* Where cases are stored. */
     enum { WRITE, READ } mode;          /* Is writing or reading allowed? */
     struct casereader *readers;         /* List of our readers. */
+    int being_destroyed;                /* Does a destructive reader exist? */
 
     /* Memory storage. */
-    struct case_list *head, *tail;      /* Beginning, end of list of cases. */
+    struct ccase **cases;               /* Pointer to array of cases. */
 
     /* Disk storage. */
     int fd;                             /* File descriptor, -1 if none. */
     char *filename;                     /* Filename. */
-    unsigned char *buffer;              /* I/O buffer, NULL if none. */
-    size_t buffer_used;                 /* Number of bytes used in buffer. */
-    size_t buffer_size;                 /* Buffer size in bytes. */
+    union value *buffer;                /* I/O buffer, NULL if none. */
+    size_t buffer_used;                 /* Number of values used in buffer. */
+    size_t buffer_size;                 /* Buffer size in values. */
   };
 
-/* For reading out the casing in a casefile. */
+/* For reading out the cases in a casefile. */
 struct casereader 
   {
     struct casereader *next, *prev;     /* Next, prev in casefile's list. */
     struct casefile *cf;                /* Our casefile. */
     unsigned long case_idx;             /* Case number of current case. */
-
-    /* Memory storage. */
-    struct case_list *cur;              /* Current case. */
+    int destructive;                    /* Is this a destructive reader? */
 
     /* Disk storage. */
     int fd;                             /* File descriptor. */
-    unsigned char *buffer;              /* I/O buffer. */
-    size_t buffer_pos;                  /* Byte offset of buffer position. */
+    union value *buffer;                /* I/O buffer. */
+    size_t buffer_pos;                  /* Offset of buffer position. */
+    struct ccase c;                     /* Current case. */
   };
 
-struct casefile *casefiles;
+/* Return the case number of the current case */
+unsigned long
+casereader_cnum(const struct casereader *r)
+{
+  return r->case_idx;
+}
+
+/* Doubly linked list of all casefiles. */
+static struct casefile *casefiles;
+
+/* Number of bytes of case allocated in in-memory casefiles. */
+static size_t case_bytes;
 
 static void register_atexit (void);
 static void exit_handler (void);
@@ -92,12 +114,13 @@ static void fill_buffer (struct casereader *reader);
 
 static int safe_open (const char *filename, int flags);
 static int safe_close (int fd);
-static int full_read (int fd, char *buffer, size_t size);
-static int full_write (int fd, const char *buffer, size_t size);
+static int full_read (int fd, void *buffer, size_t size);
+static int full_write (int fd, const void *buffer, size_t size);
 
-/* Creates and returns a casefile to store cases of CASE_SIZE bytes each. */
+/* Creates and returns a casefile to store cases of VALUE_CNT
+   `union value's each. */
 struct casefile *
-casefile_create (size_t case_size
+casefile_create (size_t value_cnt
 {
   struct casefile *cf = xmalloc (sizeof *cf);
   cf->next = casefiles;
@@ -105,19 +128,20 @@ casefile_create (size_t case_size)
   if (cf->next != NULL)
     cf->next->prev = cf;
   casefiles = cf;
-  cf->case_size = case_size;
-  cf->case_list_size = sizeof *cf->head + case_size - sizeof *cf->head->c.data;
+  cf->value_cnt = value_cnt;
+  cf->case_acct_size = (cf->value_cnt + 4) * sizeof *cf->buffer;
   cf->case_cnt = 0;
   cf->storage = MEMORY;
   cf->mode = WRITE;
   cf->readers = NULL;
-  cf->head = cf->tail = NULL;
+  cf->being_destroyed = 0;
+  cf->cases = NULL;
   cf->fd = -1;
   cf->filename = NULL;
   cf->buffer = NULL;
-  cf->buffer_size = ROUND_UP (case_size, IO_BUF_SIZE);
-  if (case_size > 0 && cf->buffer_size % case_size > 512)
-    cf->buffer_size = case_size;
+  cf->buffer_size = ROUND_UP (cf->value_cnt, IO_BUF_SIZE);
+  if (cf->value_cnt > 0 && cf->buffer_size % cf->value_cnt > 64)
+    cf->buffer_size = cf->value_cnt;
   cf->buffer_used = 0;
   register_atexit ();
   return cf;
@@ -129,8 +153,6 @@ casefile_destroy (struct casefile *cf)
 {
   if (cf != NULL) 
     {
-      struct case_list *iter, *next;
-
       if (cf->next != NULL)
         cf->next->prev = cf->prev;
       if (cf->prev != NULL)
@@ -141,10 +163,24 @@ casefile_destroy (struct casefile *cf)
       while (cf->readers != NULL) 
         casereader_destroy (cf->readers);
 
-      for (iter = cf->head; iter != NULL; iter = next
+      if (cf->cases != NULL
         {
-          next = iter->next;
-          workspace_free (iter, cf->case_list_size);
+          size_t idx, block_cnt;
+
+          case_bytes -= cf->case_cnt * cf->case_acct_size;
+          for (idx = 0; idx < cf->case_cnt; idx++)
+            {
+              size_t block_idx = idx / CASES_PER_BLOCK;
+              size_t case_idx = idx % CASES_PER_BLOCK;
+              struct ccase *c = &cf->cases[block_idx][case_idx];
+              case_destroy (c);
+            }
+
+          block_cnt = DIV_RND_UP (cf->case_cnt, CASES_PER_BLOCK);
+          for (idx = 0; idx < block_cnt; idx++)
+            free (cf->cases[idx]);
+
+          free (cf->cases);
         }
 
       if (cf->fd != -1)
@@ -153,6 +189,7 @@ casefile_destroy (struct casefile *cf)
       if (cf->filename != NULL && remove (cf->filename) == -1) 
         msg (ME, _("%s: Removing temporary file: %s."),
              cf->filename, strerror (errno));
+      free (cf->filename);
 
       free (cf->buffer);
 
@@ -170,13 +207,45 @@ casefile_in_core (const struct casefile *cf)
   return cf->storage == MEMORY;
 }
 
-/* Returns the number of bytes in a case for CF. */
+/* Puts a casefile to "sleep", that is, minimizes the resources
+   needed for it by closing its file descriptor and freeing its
+   buffer.  This is useful if we need so many casefiles that we
+   might not have enough memory and file descriptors to go
+   around.
+
+   For simplicity, this implementation always converts the
+   casefile to reader mode.  If this turns out to be a problem,
+   with a little extra work we could also support sleeping
+   writers. */
+void
+casefile_sleep (const struct casefile *cf_) 
+{
+  struct casefile *cf = (struct casefile *) cf_;
+  assert (cf != NULL);
+
+  casefile_mode_reader (cf);
+  casefile_to_disk (cf);
+  flush_buffer (cf);
+
+  if (cf->fd != -1) 
+    {
+      safe_close (cf->fd);
+      cf->fd = -1;
+    }
+  if (cf->buffer != NULL) 
+    {
+      free (cf->buffer);
+      cf->buffer = NULL;
+    }
+}
+
+/* Returns the number of `union value's in a case for CF. */
 size_t
-casefile_get_case_size (const struct casefile *cf) 
+casefile_get_value_cnt (const struct casefile *cf) 
 {
   assert (cf != NULL);
 
-  return cf->case_size;
+  return cf->value_cnt;
 }
 
 /* Returns the number of cases in casefile CF. */
@@ -188,8 +257,8 @@ casefile_get_case_cnt (const struct casefile *cf)
   return cf->case_cnt;
 }
 
-/* Appends case C to casefile CF.  Not valid after any reader for CF has been
-   created. */
+/* Appends a copy of case C to casefile CF.  Not valid after any
+   reader for CF has been created. */
 void
 casefile_append (struct casefile *cf, const struct ccase *c) 
 {
@@ -197,23 +266,31 @@ casefile_append (struct casefile *cf, const struct ccase *c)
   assert (c != NULL);
   assert (cf->mode == WRITE);
 
-  cf->case_cnt++;
-
   /* Try memory first. */
   if (cf->storage == MEMORY) 
     {
-      struct case_list *new_case;
-
-      new_case = workspace_malloc (cf->case_list_size);
-      if (new_case != NULL) 
+      if (case_bytes < get_max_workspace ())
         {
-          memcpy (&new_case->c, c, cf->case_size);
-          new_case->next = NULL;
-          if (cf->head != NULL)
-            cf->tail->next = new_case;
-          else
-            cf->head = new_case;
-          cf->tail = new_case;
+          size_t block_idx = cf->case_cnt / CASES_PER_BLOCK;
+          size_t case_idx = cf->case_cnt % CASES_PER_BLOCK;
+          struct ccase new_case;
+
+          case_bytes += cf->case_acct_size;
+          case_clone (&new_case, c);
+          if (case_idx == 0) 
+            {
+              if ((block_idx & (block_idx - 1)) == 0) 
+                {
+                  size_t block_cap = block_idx == 0 ? 1 : block_idx * 2;
+                  cf->cases = xrealloc (cf->cases,
+                                        sizeof *cf->cases * block_cap);
+                }
+
+              cf->cases[block_idx] = xmalloc (sizeof **cf->cases
+                                              * CASES_PER_BLOCK);
+            }
+
+          case_move (&cf->cases[block_idx][case_idx], &new_case);
         }
       else
         {
@@ -224,6 +301,17 @@ casefile_append (struct casefile *cf, const struct ccase *c)
     }
   else
     write_case_to_disk (cf, c);
+
+  cf->case_cnt++;
+}
+
+/* Appends case C to casefile CF, which takes over ownership of
+   C.  Not valid after any reader for CF has been created. */
+void
+casefile_append_xfer (struct casefile *cf, struct ccase *c) 
+{
+  casefile_append (cf, c);
+  case_destroy (c);
 }
 
 /* Writes case C to casefile CF's disk buffer, first flushing the buffer to
@@ -231,67 +319,42 @@ casefile_append (struct casefile *cf, const struct ccase *c)
 static void
 write_case_to_disk (struct casefile *cf, const struct ccase *c) 
 {
-  memcpy (cf->buffer + cf->buffer_used, c->data, cf->case_size);
-  cf->buffer_used += cf->case_size;
-  if (cf->buffer_used + cf->case_size > cf->buffer_size)
+  case_to_values (c, cf->buffer + cf->buffer_used, cf->value_cnt);
+  cf->buffer_used += cf->value_cnt;
+  if (cf->buffer_used + cf->value_cnt > cf->buffer_size)
     flush_buffer (cf);
-
 }
 
+/* If any bytes in CF's output buffer are used, flush them to
+   disk. */
 static void
 flush_buffer (struct casefile *cf) 
 {
   if (cf->buffer_used > 0) 
     {
-      if (!full_write (cf->fd, cf->buffer, cf->buffer_size)) 
+      if (!full_write (cf->fd, cf->buffer,
+                       cf->buffer_size * sizeof *cf->buffer)) 
         msg (FE, _("Error writing temporary file: %s."), strerror (errno));
 
       cf->buffer_used = 0;
     } 
 }
 
-/* Creates a temporary file and stores its name in *FILENAME and
-   a file descriptor for it in *FD.  Returns success.  Caller is
-   responsible for freeing *FILENAME. */
-static int
-make_temp_file (int *fd, char **filename)
-{
-  const char *parent_dir;
-
-  assert (filename != NULL);
-  assert (fd != NULL);
-
-  if (getenv ("TMPDIR") != NULL)
-    parent_dir = getenv ("TMPDIR");
-  else
-    parent_dir = P_tmpdir;
-
-  *filename = xmalloc (strlen (parent_dir) + 32);
-  sprintf (*filename, "%s%cpsppXXXXXX", parent_dir, DIR_SEPARATOR);
-  *fd = mkstemp (*filename);
-  if (*fd < 0)
-    {
-      msg (FE, _("%s: Creating temporary file: %s."),
-           *filename, strerror (errno));
-      free (*filename);
-      *filename = NULL;
-      return 0;
-    }
-  return 1;
-}
 
 /* If CF is currently stored in memory, writes it to disk.  Readers, if any,
    retain their current positions. */
 void
-casefile_to_disk (struct casefile *cf
+casefile_to_disk (const struct casefile *cf_
 {
-  struct case_list *iter, *next;
+  struct casefile *cf = (struct casefile *) cf_;
   struct casereader *reader;
   
   assert (cf != NULL);
-  
+
   if (cf->storage == MEMORY)
     {
+      size_t idx, block_cnt;
+      
       assert (cf->filename == NULL);
       assert (cf->fd == -1);
       assert (cf->buffer_used == 0);
@@ -299,116 +362,41 @@ casefile_to_disk (struct casefile *cf)
       cf->storage = DISK;
       if (!make_temp_file (&cf->fd, &cf->filename))
         err_failure ();
-#if HAVE_POSIX_FADVISE
-      posix_fadvise (cf->fd, 0, 0, POSIX_FADV_SEQUENTIAL);
-#endif
-      cf->buffer = xmalloc (cf->buffer_size);
-      memset (cf->buffer, 0, cf->buffer_size);
+      cf->buffer = xmalloc (cf->buffer_size * sizeof *cf->buffer);
+      memset (cf->buffer, 0, cf->buffer_size * sizeof *cf->buffer);
 
-      for (iter = cf->head; iter != NULL; iter = next) 
+      case_bytes -= cf->case_cnt * cf->case_acct_size;
+      for (idx = 0; idx < cf->case_cnt; idx++)
         {
-          next = iter->next;
-          write_case_to_disk (cf, &iter->c);
-          workspace_free (iter, cf->case_list_size);
+          size_t block_idx = idx / CASES_PER_BLOCK;
+          size_t case_idx = idx % CASES_PER_BLOCK;
+          struct ccase *c = &cf->cases[block_idx][case_idx];
+          write_case_to_disk (cf, c);
+          case_destroy (c);
         }
-      flush_buffer (cf);
-      cf->head = cf->tail = NULL;
+
+      block_cnt = DIV_RND_UP (cf->case_cnt, CASES_PER_BLOCK);
+      for (idx = 0; idx < block_cnt; idx++)
+        free (cf->cases[idx]);
+
+      free (cf->cases);
+      cf->cases = NULL;
+
+      if (cf->mode == READ)
+        flush_buffer (cf);
 
       for (reader = cf->readers; reader != NULL; reader = reader->next)
         reader_open_file (reader);
     }
 }
 
-/* Merges lists A and B into a single list, which is returned.  Cases are
-   compared according to comparison function COMPARE, which receives auxiliary
-   data AUX. */
-static struct case_list *
-merge (struct case_list *a, struct case_list *b,
-       int (*compare) (const struct ccase *,
-                       const struct ccase *, void *aux),
-       void *aux) 
-{
-  struct case_list head;
-  struct case_list *tail = &head;
-
-  while (a != NULL && b != NULL)
-    if (compare (&a->c, &b->c, aux) < 0) 
-      {
-        tail->next = a;
-        tail = a;
-        a = a->next;
-      }
-    else 
-      {
-        tail->next = b;
-        tail = b;
-        b = b->next;
-      }
-
-  tail->next = a == NULL ? b : a;
-
-  return head.next;
-}
-
-/* Sorts the list beginning at FIRST, returning the new first case.  Cases are
-   compared according to comparison function COMPARE, which receives auxiliary
-   data AUX. */
-static struct case_list *
-merge_sort (struct case_list *first,
-            int (*compare) (const struct ccase *,
-                            const struct ccase *, void *aux),
-            void *aux) 
-{
-  /* FIXME: we should use a "natural" merge sort to take
-     advantage of the natural order of the data. */
-  struct case_list *middle, *last, *tmp;
-
-  /* A list of zero or one elements is already sorted. */
-  if (first == NULL || first->next == NULL)
-    return first;
-
-  middle = first;
-  last = first->next;
-  while (last != NULL && last->next != NULL) 
-    {
-      middle = middle->next;
-      last = last->next->next;
-    }
-  tmp = middle;
-  middle = middle->next;
-  tmp->next = NULL;
-  return merge (merge_sort (first, compare, aux),
-                merge_sort (middle, compare, aux),
-                compare, aux);
-}
-
-/* Tries to sort casefile CF according to comparison function
-   COMPARE, which is passes auxiliary data AUX.  If successful,
-   returns nonzero.  Currently only sorting of in-memory
-   casefiles is implemented. */
-int
-casefile_sort (struct casefile *cf,
-               int (*compare) (const struct ccase *,
-                               const struct ccase *, void *aux),
-               void *aux)
+/* Changes CF to reader mode, ensuring that no more cases may be
+   added.  Creating a casereader for CF has the same effect. */
+void
+casefile_mode_reader (struct casefile *cf) 
 {
   assert (cf != NULL);
-  assert (compare != NULL);
-
-  cf->mode = WRITE;
-
-  if (cf->case_cnt < 2)
-    return 1;
-  else if (cf->storage == DISK)
-    return 0;
-  else 
-    {
-      cf->head = cf->tail = merge_sort (cf->head, compare, aux);
-      while (cf->tail->next != NULL)
-        cf->tail = cf->tail->next;
-
-      return 1; 
-    }
+  cf->mode = READ;
 }
 
 /* Creates and returns a casereader for CF.  A casereader can be used to
@@ -419,6 +407,9 @@ casefile_get_reader (const struct casefile *cf_)
   struct casefile *cf = (struct casefile *) cf_;
   struct casereader *reader;
 
+  assert (cf != NULL);
+  assert (!cf->being_destroyed);
+
   /* Flush the buffer to disk if it's not empty. */
   if (cf->mode == WRITE && cf->storage == DISK)
     flush_buffer (cf);
@@ -426,26 +417,43 @@ casefile_get_reader (const struct casefile *cf_)
   cf->mode = READ;
 
   reader = xmalloc (sizeof *reader);
-  reader->cf = cf;
   reader->next = cf->readers;
   if (cf->readers != NULL)
     reader->next->prev = reader;
-  reader->prev = NULL;
   cf->readers = reader;
+  reader->prev = NULL;
+  reader->cf = cf;
   reader->case_idx = 0;
-  reader->cur = NULL;
+  reader->destructive = 0;
   reader->fd = -1;
   reader->buffer = NULL;
   reader->buffer_pos = 0;
+  case_nullify (&reader->c);
 
-  if (reader->cf->storage == MEMORY) 
-    reader->cur = cf->head;
-  else
+  if (reader->cf->storage == DISK) 
     reader_open_file (reader);
 
   return reader;
 }
 
+/* Creates and returns a destructive casereader for CF.  Like a
+   normal casereader, a destructive casereader sequentially reads
+   the cases in a casefile.  Unlike a normal casereader, a
+   destructive reader cannot operate concurrently with any other
+   reader.  (This restriction could be relaxed in a few ways, but
+   it is so far unnecessary for other code.) */
+struct casereader *
+casefile_get_destructive_reader (struct casefile *cf) 
+{
+  struct casereader *reader;
+  
+  assert (cf->readers == NULL);
+  reader = casefile_get_reader (cf);
+  reader->destructive = 1;
+  cf->being_destroyed = 1;
+  return reader;
+}
+
 /* Opens a disk file for READER and seeks to the current position as indicated
    by case_idx.  Normally the current position is the beginning of the file,
    but casefile_to_disk may cause the file to be opened at a different
@@ -453,103 +461,140 @@ casefile_get_reader (const struct casefile *cf_)
 static void
 reader_open_file (struct casereader *reader) 
 {
-  size_t buffer_case_cnt;
+  struct casefile *cf = reader->cf;
   off_t file_ofs;
 
-  if (reader->case_idx >= reader->cf->case_cnt)
+  if (reader->case_idx >= cf->case_cnt)
     return;
 
-  if (reader->cf->fd != -1) 
+  if (cf->fd != -1) 
     {
-      reader->fd = reader->cf->fd;
-      reader->cf->fd = -1;
+      reader->fd = cf->fd;
+      cf->fd = -1;
     }
   else 
     {
-      reader->fd = safe_open (reader->cf->filename, O_RDONLY);
+      reader->fd = safe_open (cf->filename, O_RDONLY);
       if (reader->fd < 0)
         msg (FE, _("%s: Opening temporary file: %s."),
-             reader->cf->filename, strerror (errno));
+             cf->filename, strerror (errno));
     }
 
-  if (reader->cf->buffer != NULL) 
+  if (cf->buffer != NULL) 
     {
-      reader->buffer = reader->cf->buffer;
-      reader->cf->buffer = NULL; 
+      reader->buffer = cf->buffer;
+      cf->buffer = NULL; 
     }
   else 
     {
-      reader->buffer = xmalloc (reader->cf->buffer_size);
-      memset (reader->buffer, 0, reader->cf->buffer_size); 
+      reader->buffer = xmalloc (cf->buffer_size * sizeof *cf->buffer);
+      memset (reader->buffer, 0, cf->buffer_size * sizeof *cf->buffer); 
     }
 
-  if (reader->cf->case_size != 0) 
+  if (cf->value_cnt != 0) 
     {
-      buffer_case_cnt = reader->cf->buffer_size / reader->cf->case_size;
-      file_ofs = ((off_t) reader->case_idx
-                  / buffer_case_cnt * reader->cf->buffer_size);
+      size_t buffer_case_cnt = cf->buffer_size / cf->value_cnt;
+      file_ofs = ((off_t) reader->case_idx / buffer_case_cnt
+                  * cf->buffer_size * sizeof *cf->buffer);
       reader->buffer_pos = (reader->case_idx % buffer_case_cnt
-                            * reader->cf->case_size);
+                            * cf->value_cnt);
     }
   else 
     file_ofs = 0;
-#if HAVE_POSIX_FADVISE
-  posix_fadvise (reader->fd, file_ofs, 0, POSIX_FADV_SEQUENTIAL);
-#endif
   if (lseek (reader->fd, file_ofs, SEEK_SET) != file_ofs)
     msg (FE, _("%s: Seeking temporary file: %s."),
-         reader->cf->filename, strerror (errno));
+         cf->filename, strerror (errno));
 
-  if (reader->cf->case_cnt > 0 && reader->cf->case_size > 0)
+  if (cf->case_cnt > 0 && cf->value_cnt > 0)
     fill_buffer (reader);
+
+  case_create (&reader->c, cf->value_cnt);
 }
 
 /* Fills READER's buffer by reading a block from disk. */
 static void
 fill_buffer (struct casereader *reader)
 {
-  int retval = full_read (reader->fd, reader->buffer, reader->cf->buffer_size);
+  int retval = full_read (reader->fd, reader->buffer,
+                          reader->cf->buffer_size * sizeof *reader->buffer);
   if (retval < 0)
     msg (FE, _("%s: Reading temporary file: %s."),
          reader->cf->filename, strerror (errno));
-  else if (retval != reader->cf->buffer_size)
+  else if (retval != reader->cf->buffer_size * sizeof *reader->buffer)
     msg (FE, _("%s: Temporary file ended unexpectedly."),
          reader->cf->filename); 
 }
 
+/* Returns the casefile that READER reads. */
+const struct casefile *
+casereader_get_casefile (const struct casereader *reader) 
+{
+  assert (reader != NULL);
+  
+  return reader->cf;
+}
+
+/* Reads a copy of the next case from READER into C.
+   Caller is responsible for destroying C. */
 int
-casereader_read (struct casereader *reader, const struct ccase **c) 
+casereader_read (struct casereader *reader, struct ccase *c) 
 {
   assert (reader != NULL);
   
   if (reader->case_idx >= reader->cf->case_cnt) 
-    {
-      *c = NULL;
-      return 0;
-    }
+    return 0;
 
-  reader->case_idx++;
   if (reader->cf->storage == MEMORY) 
     {
-      assert (reader->cur != NULL);
-      *c = &reader->cur->c;
-      reader->cur = reader->cur->next;
+      size_t block_idx = reader->case_idx / CASES_PER_BLOCK;
+      size_t case_idx = reader->case_idx % CASES_PER_BLOCK;
+
+      case_clone (c, &reader->cf->cases[block_idx][case_idx]);
+      reader->case_idx++;
       return 1;
     }
   else 
     {
-      if (reader->buffer_pos + reader->cf->case_size > reader->cf->buffer_size)
+      if (reader->buffer_pos + reader->cf->value_cnt > reader->cf->buffer_size)
         {
           fill_buffer (reader);
           reader->buffer_pos = 0;
         }
 
-      *c = (struct ccase *) (reader->buffer + reader->buffer_pos);
-      reader->buffer_pos += reader->cf->case_size;
+      case_from_values (&reader->c, reader->buffer + reader->buffer_pos,
+                        reader->cf->value_cnt);
+      reader->buffer_pos += reader->cf->value_cnt;
+      reader->case_idx++;
+
+      case_clone (c, &reader->c);
       return 1;
     }
 }
 
+/* Reads the next case from READER into C and transfers ownership
+   to the caller.  Caller is responsible for destroying C. */
+int
+casereader_read_xfer (struct casereader *reader, struct ccase *c)
+{
+  assert (reader != NULL);
+
+  if (reader->destructive == 0
+      || reader->case_idx >= reader->cf->case_cnt
+      || reader->cf->storage == DISK) 
+    return casereader_read (reader, c);
+  else 
+    {
+      size_t block_idx = reader->case_idx / CASES_PER_BLOCK;
+      size_t case_idx = reader->case_idx % CASES_PER_BLOCK;
+      struct ccase *read_case = &reader->cf->cases[block_idx][case_idx];
+
+      case_move (c, read_case);
+      reader->case_idx++;
+      return 1;
+    }
+}
+
+/* Destroys READER. */
 void
 casereader_destroy (struct casereader *reader)
 {
@@ -567,14 +612,21 @@ casereader_destroy (struct casereader *reader)
   else
     free (reader->buffer);
 
-  if (reader->cf->fd == -1)
-    reader->cf->fd = reader->fd;
-  else
-    safe_close (reader->fd);
+  if (reader->fd != -1) 
+    {
+      if (reader->cf->fd == -1)
+        reader->cf->fd = reader->fd;
+      else
+        safe_close (reader->fd);
+    }
+  
+  case_destroy (&reader->c);
 
   free (reader);
 }
 
+/* Calls open(), passing FILENAME and FLAGS, repeating as necessary
+   to deal with interrupted calls. */
 static int
 safe_open (const char *filename, int flags) 
 {
@@ -589,6 +641,8 @@ safe_open (const char *filename, int flags)
   return fd;
 }
 
+/* Calls close(), passing FD, repeating as necessary to deal with
+   interrupted calls. */
 static int safe_close (int fd) 
 {
   int retval;
@@ -602,9 +656,12 @@ static int safe_close (int fd)
   return retval;
 }
 
+/* Calls read(), passing FD, BUFFER, and SIZE, repeating as
+   necessary to deal with interrupted calls. */
 static int
-full_read (int fd, char *buffer, size_t size) 
+full_read (int fd, void *buffer_, size_t size) 
 {
+  char *buffer = buffer_;
   size_t bytes_read = 0;
   
   while (bytes_read < size)
@@ -621,9 +678,12 @@ full_read (int fd, char *buffer, size_t size)
   return bytes_read;
 }
 
+/* Calls write(), passing FD, BUFFER, and SIZE, repeating as
+   necessary to deal with interrupted calls. */
 static int
-full_write (int fd, const char *buffer, size_t size) 
+full_write (int fd, const void *buffer_, size_t size) 
 {
+  const char *buffer = buffer_;
   size_t bytes_written = 0;
   
   while (bytes_written < size)
@@ -638,6 +698,9 @@ full_write (int fd, const char *buffer, size_t size)
   return bytes_written;
 }
 
+
+/* Registers our exit handler with atexit() if it has not already
+   been registered. */
 static void
 register_atexit (void) 
 {
@@ -649,6 +712,10 @@ register_atexit (void)
     }
 }
 
+
+
+/* atexit() handler that closes and deletes our temporary
+   files. */
 static void
 exit_handler (void) 
 {
@@ -656,13 +723,14 @@ exit_handler (void)
     casefile_destroy (casefiles);
 }
 \f
+#include <gsl/gsl_rng.h>
 #include <stdarg.h>
 #include "command.h"
-#include "random.h"
 #include "lexer.h"
 
-static void test_casefile (int pattern, size_t case_size, size_t case_cnt);
-static struct ccase *get_random_case (size_t case_size, size_t case_idx);
+static void test_casefile (int pattern, size_t value_cnt, size_t case_cnt);
+static void get_random_case (struct ccase *, size_t value_cnt,
+                             size_t case_idx);
 static void write_random_case (struct casefile *cf, size_t case_idx);
 static void read_and_verify_random_case (struct casefile *cf,
                                          struct casereader *reader,
@@ -692,7 +760,7 @@ cmd_debug_casefile (void)
   if (token != '.')
     return lex_end_of_command ();
     
-  for (pattern = 0; pattern < 5; pattern++) 
+  for (pattern = 0; pattern < 6; pattern++) 
     {
       const size_t *size;
 
@@ -702,7 +770,7 @@ cmd_debug_casefile (void)
 
           for (case_cnt = 0; case_cnt <= case_max;
                case_cnt = (case_cnt * 2) + 1)
-            test_casefile (pattern, *size * sizeof (union value), case_cnt);
+            test_casefile (pattern, *size, case_cnt);
         }
     }
   printf ("Casefile tests succeeded.\n");
@@ -710,25 +778,28 @@ cmd_debug_casefile (void)
 }
 
 static void
-test_casefile (int pattern, size_t case_size, size_t case_cnt) 
+test_casefile (int pattern, size_t value_cnt, size_t case_cnt) 
 {
-  int zero = 0;
   struct casefile *cf;
   struct casereader *r1, *r2;
-  const struct ccase *c;
-  struct rng *rng;
+  struct ccase c;
+  gsl_rng *rng;
   size_t i, j;
 
-  rng = rng_create ();
-  rng_seed (rng, &zero, sizeof zero);
-  cf = casefile_create (case_size);
+  rng = gsl_rng_alloc (gsl_rng_mt19937);
+  cf = casefile_create (value_cnt);
+  if (pattern == 5)
+    casefile_to_disk (cf);
   for (i = 0; i < case_cnt; i++)
     write_random_case (cf, i);
+  if (pattern == 5)
+    casefile_sleep (cf);
   r1 = casefile_get_reader (cf);
   r2 = casefile_get_reader (cf);
   switch (pattern) 
     {
     case 0:
+    case 5:
       for (i = 0; i < case_cnt; i++) 
         {
           read_and_verify_random_case (cf, r1, i);
@@ -747,8 +818,8 @@ test_casefile (int pattern, size_t case_size, size_t case_cnt)
       for (i = j = 0; i < case_cnt; i++) 
         {
           read_and_verify_random_case (cf, r1, i);
-          if (rng_get_int (rng) % pattern == 0)
-            read_and_verify_random_case (cf, r2, j++);
+          if (gsl_rng_get (rng) % pattern == 0) 
+            read_and_verify_random_case (cf, r2, j++); 
           if (i == case_cnt / 2)
             casefile_to_disk (cf);
         }
@@ -764,41 +835,70 @@ test_casefile (int pattern, size_t case_size, size_t case_cnt)
     casereader_destroy (r1);
   if (pattern != 2)
     casereader_destroy (r2);
+  if (pattern > 2) 
+    {
+      r1 = casefile_get_destructive_reader (cf);
+      for (i = 0; i < case_cnt; i++) 
+        {
+          struct ccase read_case, expected_case;
+          
+          get_random_case (&expected_case, value_cnt, i);
+          if (!casereader_read_xfer (r1, &read_case)) 
+            fail_test ("Premature end of casefile.");
+          for (j = 0; j < value_cnt; j++) 
+            {
+              double a = case_num (&read_case, j);
+              double b = case_num (&expected_case, j);
+              if (a != b)
+                fail_test ("Case %lu fails comparison.", (unsigned long) i); 
+            }
+          case_destroy (&expected_case);
+          case_destroy (&read_case);
+        }
+      casereader_destroy (r1);
+    }
   casefile_destroy (cf);
-  rng_destroy (rng);
+  gsl_rng_free (rng);
 }
 
-static struct ccase *
-get_random_case (size_t case_size, size_t case_idx) 
+static void
+get_random_case (struct ccase *c, size_t value_cnt, size_t case_idx) 
 {
-  struct ccase *c = xmalloc (case_size);
-  memset (c, case_idx % 257, case_size);
-  return c;
+  int i;
+  case_create (c, value_cnt);
+  for (i = 0; i < value_cnt; i++)
+    case_data_rw (c, i)->f = case_idx % 257 + i;
 }
 
 static void
 write_random_case (struct casefile *cf, size_t case_idx) 
 {
-  struct ccase *c = get_random_case (casefile_get_case_size (cf), case_idx);
-  casefile_append (cf, c);
-  free (c);
+  struct ccase c;
+  get_random_case (&c, casefile_get_value_cnt (cf), case_idx);
+  casefile_append_xfer (cf, &c);
 }
 
 static void
 read_and_verify_random_case (struct casefile *cf,
                              struct casereader *reader, size_t case_idx) 
 {
-  const struct ccase *read_case;
-  struct ccase *expected_case;
-  size_t case_size;
-
-  case_size = casefile_get_case_size (cf);
-  expected_case = get_random_case (case_size, case_idx);
+  struct ccase read_case, expected_case;
+  size_t value_cnt;
+  size_t i;
+  
+  value_cnt = casefile_get_value_cnt (cf);
+  get_random_case (&expected_case, value_cnt, case_idx);
   if (!casereader_read (reader, &read_case)) 
     fail_test ("Premature end of casefile.");
-  if (memcmp (read_case, expected_case, case_size))
-    fail_test ("Case %lu fails comparison.", (unsigned long) case_idx);
-  free (expected_case);
+  for (i = 0; i < value_cnt; i++) 
+    {
+      double a = case_num (&read_case, i);
+      double b = case_num (&expected_case, i);
+      if (a != b)
+        fail_test ("Case %lu fails comparison.", (unsigned long) case_idx); 
+    }
+  case_destroy (&read_case);
+  case_destroy (&expected_case);
 }
 
 static void