Fix memory leaks.
[pspp-builds.git] / src / casefile.c
index d2c1038330018da29e2b2a90463e97372d96c3dd..2ff3a5742f3492ff65c719c38ebb6ebe03c96219 100644 (file)
 #include <string.h>
 #include <unistd.h>
 #include "alloc.h"
+#include "case.h"
 #include "error.h"
 #include "misc.h"
+#include "settings.h"
 #include "var.h"
-#include "workspace.h"
 
 #ifdef HAVE_VALGRIND_VALGRIND_H
 #include <valgrind/valgrind.h>
    file.  Once any cases have been read, no more cases may be
    appended.  The entire file is discarded at once. */
 
+/* In-memory cases are arranged in an array of arrays.  The top
+   level is variable size and the size of each bottom level array
+   is fixed at the number of cases defined here.  */
+#define CASES_PER_BLOCK 128             
+
 /* A casefile. */
 struct casefile 
   {
     /* Basic data. */
     struct casefile *next, *prev;       /* Next, prev in global list. */
+    size_t value_cnt;                   /* Case size in `union value's. */
     size_t case_size;                   /* Case size in bytes. */
-    size_t case_list_size;              /* Bytes to allocate for case_lists. */
+    size_t case_acct_size;              /* Case size for accounting. */
     unsigned long case_cnt;             /* Number of cases stored. */
     enum { MEMORY, DISK } storage;      /* Where cases are stored. */
     enum { WRITE, READ } mode;          /* Is writing or reading allowed? */
     struct casereader *readers;         /* List of our readers. */
+    int being_destroyed;                /* Does a destructive reader exist? */
 
     /* Memory storage. */
-    struct case_list *head, *tail;      /* Beginning, end of list of cases. */
+    struct ccase **cases;               /* Pointer to array of cases. */
 
     /* Disk storage. */
     int fd;                             /* File descriptor, -1 if none. */
@@ -68,23 +76,26 @@ struct casefile
     size_t buffer_size;                 /* Buffer size in bytes. */
   };
 
-/* For reading out the casing in a casefile. */
+/* For reading out the cases in a casefile. */
 struct casereader 
   {
     struct casereader *next, *prev;     /* Next, prev in casefile's list. */
     struct casefile *cf;                /* Our casefile. */
     unsigned long case_idx;             /* Case number of current case. */
-
-    /* Memory storage. */
-    struct case_list *cur;              /* Current case. */
+    int destructive;                    /* Is this a destructive reader? */
 
     /* Disk storage. */
     int fd;                             /* File descriptor. */
     unsigned char *buffer;              /* I/O buffer. */
     size_t buffer_pos;                  /* Byte offset of buffer position. */
+    struct ccase c;                     /* Current case. */
   };
 
-struct casefile *casefiles;
+/* Doubly linked list of all casefiles. */
+static struct casefile *casefiles;
+
+/* Number of bytes of case allocated in in-memory casefiles. */
+static size_t case_bytes;
 
 static void register_atexit (void);
 static void exit_handler (void);
@@ -99,9 +110,10 @@ static int safe_close (int fd);
 static int full_read (int fd, char *buffer, size_t size);
 static int full_write (int fd, const char *buffer, size_t size);
 
-/* Creates and returns a casefile to store cases of CASE_SIZE bytes each. */
+/* Creates and returns a casefile to store cases of VALUE_CNT
+   `union value's each. */
 struct casefile *
-casefile_create (size_t case_size
+casefile_create (size_t value_cnt
 {
   struct casefile *cf = xmalloc (sizeof *cf);
   cf->next = casefiles;
@@ -109,19 +121,21 @@ casefile_create (size_t case_size)
   if (cf->next != NULL)
     cf->next->prev = cf;
   casefiles = cf;
-  cf->case_size = case_size;
-  cf->case_list_size = sizeof *cf->head + case_size - sizeof *cf->head->c.data;
+  cf->value_cnt = value_cnt;
+  cf->case_size = case_serial_size (value_cnt);
+  cf->case_acct_size = cf->case_size + 4 * sizeof (void *);
   cf->case_cnt = 0;
   cf->storage = MEMORY;
   cf->mode = WRITE;
   cf->readers = NULL;
-  cf->head = cf->tail = NULL;
+  cf->being_destroyed = 0;
+  cf->cases = NULL;
   cf->fd = -1;
   cf->filename = NULL;
   cf->buffer = NULL;
-  cf->buffer_size = ROUND_UP (case_size, IO_BUF_SIZE);
-  if (case_size > 0 && cf->buffer_size % case_size > 512)
-    cf->buffer_size = case_size;
+  cf->buffer_size = ROUND_UP (cf->case_size, IO_BUF_SIZE);
+  if (cf->case_size > 0 && cf->buffer_size % cf->case_size > 512)
+    cf->buffer_size = cf->case_size;
   cf->buffer_used = 0;
   register_atexit ();
   return cf;
@@ -133,8 +147,6 @@ casefile_destroy (struct casefile *cf)
 {
   if (cf != NULL) 
     {
-      struct case_list *iter, *next;
-
       if (cf->next != NULL)
         cf->next->prev = cf->prev;
       if (cf->prev != NULL)
@@ -145,10 +157,24 @@ casefile_destroy (struct casefile *cf)
       while (cf->readers != NULL) 
         casereader_destroy (cf->readers);
 
-      for (iter = cf->head; iter != NULL; iter = next
+      if (cf->cases != NULL
         {
-          next = iter->next;
-          workspace_free (iter, cf->case_list_size);
+          size_t idx, block_cnt;
+
+          case_bytes -= cf->case_cnt * cf->case_acct_size;
+          for (idx = 0; idx < cf->case_cnt; idx++)
+            {
+              size_t block_idx = idx / CASES_PER_BLOCK;
+              size_t case_idx = idx % CASES_PER_BLOCK;
+              struct ccase *c = &cf->cases[block_idx][case_idx];
+              case_destroy (c);
+            }
+
+          block_cnt = DIV_RND_UP (cf->case_cnt, CASES_PER_BLOCK);
+          for (idx = 0; idx < block_cnt; idx++)
+            free (cf->cases[idx]);
+
+          free (cf->cases);
         }
 
       if (cf->fd != -1)
@@ -175,13 +201,44 @@ casefile_in_core (const struct casefile *cf)
   return cf->storage == MEMORY;
 }
 
-/* Returns the number of bytes in a case for CF. */
+/* Puts a casefile to "sleep", that is, minimizes the resources
+   needed for it by closing its file descriptor and freeing its
+   buffer.  This is useful if we need so many casefiles that we
+   might not have enough memory and file descriptors to go
+   around.
+
+   For simplicity, this implementation always converts the
+   casefile to reader mode.  If this turns out to be a problem,
+   with a little extra work we could also support sleeping
+   writers. */
+void
+casefile_sleep (const struct casefile *cf_) 
+{
+  struct casefile *cf = (struct casefile *) cf_;
+  assert (cf != NULL);
+
+  casefile_mode_reader (cf);
+  casefile_to_disk (cf);
+
+  if (cf->fd != -1) 
+    {
+      safe_close (cf->fd);
+      cf->fd = -1;
+    }
+  if (cf->buffer != NULL) 
+    {
+      free (cf->buffer);
+      cf->buffer = NULL;
+    }
+}
+
+/* Returns the number of `union value's in a case for CF. */
 size_t
-casefile_get_case_size (const struct casefile *cf) 
+casefile_get_value_cnt (const struct casefile *cf) 
 {
   assert (cf != NULL);
 
-  return cf->case_size;
+  return cf->value_cnt;
 }
 
 /* Returns the number of cases in casefile CF. */
@@ -193,8 +250,8 @@ casefile_get_case_cnt (const struct casefile *cf)
   return cf->case_cnt;
 }
 
-/* Appends case C to casefile CF.  Not valid after any reader for CF has been
-   created. */
+/* Appends a copy of case C to casefile CF.  Not valid after any
+   reader for CF has been created. */
 void
 casefile_append (struct casefile *cf, const struct ccase *c) 
 {
@@ -202,23 +259,31 @@ casefile_append (struct casefile *cf, const struct ccase *c)
   assert (c != NULL);
   assert (cf->mode == WRITE);
 
-  cf->case_cnt++;
-
   /* Try memory first. */
   if (cf->storage == MEMORY) 
     {
-      struct case_list *new_case;
-
-      new_case = workspace_malloc (cf->case_list_size);
-      if (new_case != NULL) 
+      if (case_bytes < get_max_workspace ())
         {
-          memcpy (&new_case->c, c, cf->case_size);
-          new_case->next = NULL;
-          if (cf->head != NULL)
-            cf->tail->next = new_case;
-          else
-            cf->head = new_case;
-          cf->tail = new_case;
+          size_t block_idx = cf->case_cnt / CASES_PER_BLOCK;
+          size_t case_idx = cf->case_cnt % CASES_PER_BLOCK;
+          struct ccase new_case;
+
+          case_bytes += cf->case_acct_size;
+          case_clone (&new_case, c);
+          if (case_idx == 0) 
+            {
+              if ((block_idx & (block_idx - 1)) == 0) 
+                {
+                  size_t block_cap = block_idx == 0 ? 1 : block_idx * 2;
+                  cf->cases = xrealloc (cf->cases,
+                                        sizeof *cf->cases * block_cap);
+                }
+
+              cf->cases[block_idx] = xmalloc (sizeof **cf->cases
+                                              * CASES_PER_BLOCK);
+            }
+
+          case_move (&cf->cases[block_idx][case_idx], &new_case);
         }
       else
         {
@@ -229,6 +294,17 @@ casefile_append (struct casefile *cf, const struct ccase *c)
     }
   else
     write_case_to_disk (cf, c);
+
+  cf->case_cnt++;
+}
+
+/* Appends case C to casefile CF, which takes over ownership of
+   C.  Not valid after any reader for CF has been created. */
+void
+casefile_append_xfer (struct casefile *cf, struct ccase *c) 
+{
+  casefile_append (cf, c);
+  case_destroy (c);
 }
 
 /* Writes case C to casefile CF's disk buffer, first flushing the buffer to
@@ -236,13 +312,14 @@ casefile_append (struct casefile *cf, const struct ccase *c)
 static void
 write_case_to_disk (struct casefile *cf, const struct ccase *c) 
 {
-  memcpy (cf->buffer + cf->buffer_used, c->data, cf->case_size);
+  case_serialize (c, cf->buffer + cf->buffer_used, cf->case_size);
   cf->buffer_used += cf->case_size;
   if (cf->buffer_used + cf->case_size > cf->buffer_size)
     flush_buffer (cf);
-
 }
 
+/* If any bytes in CF's output buffer are used, flush them to
+   disk. */
 static void
 flush_buffer (struct casefile *cf) 
 {
@@ -285,35 +362,20 @@ make_temp_file (int *fd, char **filename)
   return 1;
 }
 
-static void
-call_posix_fadvise (int fd UNUSED,
-                    off_t offset UNUSED, off_t len UNUSED,
-                    int advice UNUSED) 
-{
-#ifdef HAVE_VALGRIND_VALGRIND_H
-  /* Valgrind doesn't know about posix_fadvise() as of this
-     writing. */
-  if (RUNNING_ON_VALGRIND)
-    return; 
-#endif
-
-#ifdef HAVE_POSIX_FADVISE
-  posix_fadvise (fd, offset, len, advice);
-#endif
-}
-
 /* If CF is currently stored in memory, writes it to disk.  Readers, if any,
    retain their current positions. */
 void
-casefile_to_disk (struct casefile *cf
+casefile_to_disk (const struct casefile *cf_
 {
-  struct case_list *iter, *next;
+  struct casefile *cf = (struct casefile *) cf_;
   struct casereader *reader;
   
   assert (cf != NULL);
   
   if (cf->storage == MEMORY)
     {
+      size_t idx, block_cnt;
+      
       assert (cf->filename == NULL);
       assert (cf->fd == -1);
       assert (cf->buffer_used == 0);
@@ -321,114 +383,41 @@ casefile_to_disk (struct casefile *cf)
       cf->storage = DISK;
       if (!make_temp_file (&cf->fd, &cf->filename))
         err_failure ();
-      call_posix_fadvise (cf->fd, 0, 0, POSIX_FADV_SEQUENTIAL);
       cf->buffer = xmalloc (cf->buffer_size);
       memset (cf->buffer, 0, cf->buffer_size);
 
-      for (iter = cf->head; iter != NULL; iter = next) 
+      case_bytes -= cf->case_cnt * cf->case_acct_size;
+      for (idx = 0; idx < cf->case_cnt; idx++)
         {
-          next = iter->next;
-          write_case_to_disk (cf, &iter->c);
-          workspace_free (iter, cf->case_list_size);
+          size_t block_idx = idx / CASES_PER_BLOCK;
+          size_t case_idx = idx % CASES_PER_BLOCK;
+          struct ccase *c = &cf->cases[block_idx][case_idx];
+          write_case_to_disk (cf, c);
+          case_destroy (c);
         }
-      flush_buffer (cf);
-      cf->head = cf->tail = NULL;
+
+      block_cnt = DIV_RND_UP (cf->case_cnt, CASES_PER_BLOCK);
+      for (idx = 0; idx < block_cnt; idx++)
+        free (cf->cases[idx]);
+
+      free (cf->cases);
+      cf->cases = NULL;
+
+      if (cf->mode == READ)
+        flush_buffer (cf);
 
       for (reader = cf->readers; reader != NULL; reader = reader->next)
         reader_open_file (reader);
     }
 }
 
-/* Merges lists A and B into a single list, which is returned.  Cases are
-   compared according to comparison function COMPARE, which receives auxiliary
-   data AUX. */
-static struct case_list *
-merge (struct case_list *a, struct case_list *b,
-       int (*compare) (const struct ccase *,
-                       const struct ccase *, void *aux),
-       void *aux) 
-{
-  struct case_list head;
-  struct case_list *tail = &head;
-
-  while (a != NULL && b != NULL)
-    if (compare (&a->c, &b->c, aux) < 0) 
-      {
-        tail->next = a;
-        tail = a;
-        a = a->next;
-      }
-    else 
-      {
-        tail->next = b;
-        tail = b;
-        b = b->next;
-      }
-
-  tail->next = a == NULL ? b : a;
-
-  return head.next;
-}
-
-/* Sorts the list beginning at FIRST, returning the new first case.  Cases are
-   compared according to comparison function COMPARE, which receives auxiliary
-   data AUX. */
-static struct case_list *
-merge_sort (struct case_list *first,
-            int (*compare) (const struct ccase *,
-                            const struct ccase *, void *aux),
-            void *aux) 
-{
-  /* FIXME: we should use a "natural" merge sort to take
-     advantage of the natural order of the data. */
-  struct case_list *middle, *last, *tmp;
-
-  /* A list of zero or one elements is already sorted. */
-  if (first == NULL || first->next == NULL)
-    return first;
-
-  middle = first;
-  last = first->next;
-  while (last != NULL && last->next != NULL) 
-    {
-      middle = middle->next;
-      last = last->next->next;
-    }
-  tmp = middle;
-  middle = middle->next;
-  tmp->next = NULL;
-  return merge (merge_sort (first, compare, aux),
-                merge_sort (middle, compare, aux),
-                compare, aux);
-}
-
-/* Tries to sort casefile CF according to comparison function
-   COMPARE, which is passes auxiliary data AUX.  If successful,
-   returns nonzero.  Currently only sorting of in-memory
-   casefiles is implemented. */
-int
-casefile_sort (struct casefile *cf,
-               int (*compare) (const struct ccase *,
-                               const struct ccase *, void *aux),
-               void *aux)
+/* Changes CF to reader mode, ensuring that no more cases may be
+   added.  Creating a casereader for CF has the same effect. */
+void
+casefile_mode_reader (struct casefile *cf) 
 {
   assert (cf != NULL);
-  assert (compare != NULL);
-
-  cf->mode = WRITE;
-
-  if (cf->case_cnt < 2)
-    return 1;
-  else if (cf->storage == DISK)
-    return 0;
-  else 
-    {
-      cf->head = cf->tail = merge_sort (cf->head, compare, aux);
-      while (cf->tail->next != NULL)
-        cf->tail = cf->tail->next;
-
-      return 1; 
-    }
+  cf->mode = READ;
 }
 
 /* Creates and returns a casereader for CF.  A casereader can be used to
@@ -439,6 +428,9 @@ casefile_get_reader (const struct casefile *cf_)
   struct casefile *cf = (struct casefile *) cf_;
   struct casereader *reader;
 
+  assert (cf != NULL);
+  assert (!cf->being_destroyed);
+
   /* Flush the buffer to disk if it's not empty. */
   if (cf->mode == WRITE && cf->storage == DISK)
     flush_buffer (cf);
@@ -453,19 +445,35 @@ casefile_get_reader (const struct casefile *cf_)
   reader->prev = NULL;
   cf->readers = reader;
   reader->case_idx = 0;
-  reader->cur = NULL;
   reader->fd = -1;
   reader->buffer = NULL;
   reader->buffer_pos = 0;
+  case_nullify (&reader->c);
 
-  if (reader->cf->storage == MEMORY) 
-    reader->cur = cf->head;
-  else
+  if (reader->cf->storage == DISK) 
     reader_open_file (reader);
 
   return reader;
 }
 
+/* Creates and returns a destructive casereader for CF.  Like a
+   normal casereader, a destructive casereader sequentially reads
+   the cases in a casefile.  Unlike a normal casereader, a
+   destructive reader cannot operate concurrently with any other
+   reader.  (This restriction could be relaxed in a few ways, but
+   it is so far unnecessary for other code.) */
+struct casereader *
+casefile_get_destructive_reader (struct casefile *cf) 
+{
+  struct casereader *reader;
+  
+  assert (cf->readers == NULL);
+  reader = casefile_get_reader (cf);
+  reader->destructive = 1;
+  cf->being_destroyed = 1;
+  return reader;
+}
+
 /* Opens a disk file for READER and seeks to the current position as indicated
    by case_idx.  Normally the current position is the beginning of the file,
    but casefile_to_disk may cause the file to be opened at a different
@@ -473,53 +481,55 @@ casefile_get_reader (const struct casefile *cf_)
 static void
 reader_open_file (struct casereader *reader) 
 {
+  struct casefile *cf = reader->cf;
   size_t buffer_case_cnt;
   off_t file_ofs;
 
-  if (reader->case_idx >= reader->cf->case_cnt)
+  if (reader->case_idx >= cf->case_cnt)
     return;
 
-  if (reader->cf->fd != -1) 
+  if (cf->fd != -1) 
     {
-      reader->fd = reader->cf->fd;
-      reader->cf->fd = -1;
+      reader->fd = cf->fd;
+      cf->fd = -1;
     }
   else 
     {
-      reader->fd = safe_open (reader->cf->filename, O_RDONLY);
+      reader->fd = safe_open (cf->filename, O_RDONLY);
       if (reader->fd < 0)
         msg (FE, _("%s: Opening temporary file: %s."),
-             reader->cf->filename, strerror (errno));
+             cf->filename, strerror (errno));
     }
 
-  if (reader->cf->buffer != NULL) 
+  if (cf->buffer != NULL) 
     {
-      reader->buffer = reader->cf->buffer;
-      reader->cf->buffer = NULL; 
+      reader->buffer = cf->buffer;
+      cf->buffer = NULL; 
     }
   else 
     {
-      reader->buffer = xmalloc (reader->cf->buffer_size);
-      memset (reader->buffer, 0, reader->cf->buffer_size); 
+      reader->buffer = xmalloc (cf->buffer_size);
+      memset (reader->buffer, 0, cf->buffer_size); 
     }
 
-  if (reader->cf->case_size != 0) 
+  if (cf->case_size != 0) 
     {
-      buffer_case_cnt = reader->cf->buffer_size / reader->cf->case_size;
+      buffer_case_cnt = cf->buffer_size / cf->case_size;
       file_ofs = ((off_t) reader->case_idx
-                  / buffer_case_cnt * reader->cf->buffer_size);
+                  / buffer_case_cnt * cf->buffer_size);
       reader->buffer_pos = (reader->case_idx % buffer_case_cnt
-                            * reader->cf->case_size);
+                            * cf->case_size);
     }
   else 
     file_ofs = 0;
-  call_posix_fadvise (reader->fd, file_ofs, 0, POSIX_FADV_SEQUENTIAL);
   if (lseek (reader->fd, file_ofs, SEEK_SET) != file_ofs)
     msg (FE, _("%s: Seeking temporary file: %s."),
-         reader->cf->filename, strerror (errno));
+         cf->filename, strerror (errno));
 
-  if (reader->cf->case_cnt > 0 && reader->cf->case_size > 0)
+  if (cf->case_cnt > 0 && cf->case_size > 0)
     fill_buffer (reader);
+
+  case_create (&reader->c, cf->value_cnt);
 }
 
 /* Fills READER's buffer by reading a block from disk. */
@@ -535,23 +545,32 @@ fill_buffer (struct casereader *reader)
          reader->cf->filename); 
 }
 
+/* Returns the casefile that READER reads. */
+const struct casefile *
+casereader_get_casefile (const struct casereader *reader) 
+{
+  assert (reader != NULL);
+  
+  return reader->cf;
+}
+
+/* Reads a copy of the next case from READER into C.
+   Caller is responsible for destroying C. */
 int
-casereader_read (struct casereader *reader, const struct ccase **c) 
+casereader_read (struct casereader *reader, struct ccase *c) 
 {
   assert (reader != NULL);
   
   if (reader->case_idx >= reader->cf->case_cnt) 
-    {
-      *c = NULL;
-      return 0;
-    }
+    return 0;
 
-  reader->case_idx++;
   if (reader->cf->storage == MEMORY) 
     {
-      assert (reader->cur != NULL);
-      *c = &reader->cur->c;
-      reader->cur = reader->cur->next;
+      size_t block_idx = reader->case_idx / CASES_PER_BLOCK;
+      size_t case_idx = reader->case_idx % CASES_PER_BLOCK;
+
+      case_clone (c, &reader->cf->cases[block_idx][case_idx]);
+      reader->case_idx++;
       return 1;
     }
   else 
@@ -562,12 +581,40 @@ casereader_read (struct casereader *reader, const struct ccase **c)
           reader->buffer_pos = 0;
         }
 
-      *c = (struct ccase *) (reader->buffer + reader->buffer_pos);
+      case_unserialize (&reader->c, reader->buffer + reader->buffer_pos,
+                        reader->cf->case_size);
       reader->buffer_pos += reader->cf->case_size;
+      reader->case_idx++;
+
+      case_clone (c, &reader->c);
+      return 1;
+    }
+}
+
+/* Reads the next case from READER into C and transfers ownership
+   to the caller.  Caller is responsible for destroying C. */
+int
+casereader_read_xfer (struct casereader *reader, struct ccase *c)
+{
+  assert (reader != NULL);
+
+  if (reader->destructive == 0
+      || reader->case_idx >= reader->cf->case_cnt
+      || reader->cf->storage == DISK) 
+    return casereader_read (reader, c);
+  else 
+    {
+      size_t block_idx = reader->case_idx / CASES_PER_BLOCK;
+      size_t case_idx = reader->case_idx % CASES_PER_BLOCK;
+      struct ccase *read_case = &reader->cf->cases[block_idx][case_idx];
+
+      case_move (c, read_case);
+      reader->case_idx++;
       return 1;
     }
 }
 
+/* Destroys READER. */
 void
 casereader_destroy (struct casereader *reader)
 {
@@ -593,9 +640,13 @@ casereader_destroy (struct casereader *reader)
         safe_close (reader->fd);
     }
   
+  case_destroy (&reader->c);
+
   free (reader);
 }
 
+/* Calls open(), passing FILENAME and FLAGS, repeating as necessary
+   to deal with interrupted calls. */
 static int
 safe_open (const char *filename, int flags) 
 {
@@ -610,6 +661,8 @@ safe_open (const char *filename, int flags)
   return fd;
 }
 
+/* Calls close(), passing FD, repeating as necessary to deal with
+   interrupted calls. */
 static int safe_close (int fd) 
 {
   int retval;
@@ -623,6 +676,8 @@ static int safe_close (int fd)
   return retval;
 }
 
+/* Calls read(), passing FD, BUFFER, and SIZE, repeating as
+   necessary to deal with interrupted calls. */
 static int
 full_read (int fd, char *buffer, size_t size) 
 {
@@ -642,6 +697,8 @@ full_read (int fd, char *buffer, size_t size)
   return bytes_read;
 }
 
+/* Calls write(), passing FD, BUFFER, and SIZE, repeating as
+   necessary to deal with interrupted calls. */
 static int
 full_write (int fd, const char *buffer, size_t size) 
 {
@@ -659,6 +716,8 @@ full_write (int fd, const char *buffer, size_t size)
   return bytes_written;
 }
 
+/* Registers our exit handler with atexit() if it has not already
+   been registered. */
 static void
 register_atexit (void) 
 {
@@ -670,6 +729,8 @@ register_atexit (void)
     }
 }
 
+/* atexit() handler that closes and deletes our temporary
+   files. */
 static void
 exit_handler (void) 
 {
@@ -682,8 +743,9 @@ exit_handler (void)
 #include "random.h"
 #include "lexer.h"
 
-static void test_casefile (int pattern, size_t case_size, size_t case_cnt);
-static struct ccase *get_random_case (size_t case_size, size_t case_idx);
+static void test_casefile (int pattern, size_t value_cnt, size_t case_cnt);
+static void get_random_case (struct ccase *, size_t value_cnt,
+                             size_t case_idx);
 static void write_random_case (struct casefile *cf, size_t case_idx);
 static void read_and_verify_random_case (struct casefile *cf,
                                          struct casereader *reader,
@@ -723,7 +785,7 @@ cmd_debug_casefile (void)
 
           for (case_cnt = 0; case_cnt <= case_max;
                case_cnt = (case_cnt * 2) + 1)
-            test_casefile (pattern, *size * sizeof (union value), case_cnt);
+            test_casefile (pattern, *size, case_cnt);
         }
     }
   printf ("Casefile tests succeeded.\n");
@@ -731,18 +793,18 @@ cmd_debug_casefile (void)
 }
 
 static void
-test_casefile (int pattern, size_t case_size, size_t case_cnt) 
+test_casefile (int pattern, size_t value_cnt, size_t case_cnt) 
 {
   int zero = 0;
   struct casefile *cf;
   struct casereader *r1, *r2;
-  const struct ccase *c;
+  struct ccase c;
   struct rng *rng;
   size_t i, j;
 
   rng = rng_create ();
   rng_seed (rng, &zero, sizeof zero);
-  cf = casefile_create (case_size);
+  cf = casefile_create (value_cnt);
   for (i = 0; i < case_cnt; i++)
     write_random_case (cf, i);
   r1 = casefile_get_reader (cf);
@@ -785,41 +847,70 @@ test_casefile (int pattern, size_t case_size, size_t case_cnt)
     casereader_destroy (r1);
   if (pattern != 2)
     casereader_destroy (r2);
+  if (pattern > 2) 
+    {
+      r1 = casefile_get_destructive_reader (cf);
+      for (i = 0; i < case_cnt; i++) 
+        {
+          struct ccase read_case, expected_case;
+          
+          get_random_case (&expected_case, value_cnt, i);
+          if (!casereader_read_xfer (r1, &read_case)) 
+            fail_test ("Premature end of casefile.");
+          for (j = 0; j < value_cnt; j++) 
+            {
+              double a = case_num (&read_case, j);
+              double b = case_num (&expected_case, j);
+              if (a != b)
+                fail_test ("Case %lu fails comparison.", (unsigned long) i); 
+            }
+          case_destroy (&expected_case);
+          case_destroy (&read_case);
+        }
+      casereader_destroy (r1);
+    }
   casefile_destroy (cf);
   rng_destroy (rng);
 }
 
-static struct ccase *
-get_random_case (size_t case_size, size_t case_idx) 
+static void
+get_random_case (struct ccase *c, size_t value_cnt, size_t case_idx) 
 {
-  struct ccase *c = xmalloc (case_size);
-  memset (c, case_idx % 257, case_size);
-  return c;
+  int i;
+  case_create (c, value_cnt);
+  for (i = 0; i < value_cnt; i++)
+    case_data_rw (c, i)->f = case_idx % 257 + i;
 }
 
 static void
 write_random_case (struct casefile *cf, size_t case_idx) 
 {
-  struct ccase *c = get_random_case (casefile_get_case_size (cf), case_idx);
-  casefile_append (cf, c);
-  free (c);
+  struct ccase c;
+  get_random_case (&c, casefile_get_value_cnt (cf), case_idx);
+  casefile_append_xfer (cf, &c);
 }
 
 static void
 read_and_verify_random_case (struct casefile *cf,
                              struct casereader *reader, size_t case_idx) 
 {
-  const struct ccase *read_case;
-  struct ccase *expected_case;
-  size_t case_size;
-
-  case_size = casefile_get_case_size (cf);
-  expected_case = get_random_case (case_size, case_idx);
+  struct ccase read_case, expected_case;
+  size_t value_cnt;
+  size_t i;
+  
+  value_cnt = casefile_get_value_cnt (cf);
+  get_random_case (&expected_case, value_cnt, case_idx);
   if (!casereader_read (reader, &read_case)) 
     fail_test ("Premature end of casefile.");
-  if (memcmp (read_case, expected_case, case_size))
-    fail_test ("Case %lu fails comparison.", (unsigned long) case_idx);
-  free (expected_case);
+  for (i = 0; i < value_cnt; i++) 
+    {
+      double a = case_num (&read_case, i);
+      double b = case_num (&expected_case, i);
+      if (a != b)
+        fail_test ("Case %lu fails comparison.", (unsigned long) case_idx); 
+    }
+  case_destroy (&read_case);
+  case_destroy (&expected_case);
 }
 
 static void