New data structure sparse_xarray.
authorBen Pfaff <blp@gnu.org>
Thu, 7 May 2009 05:47:51 +0000 (22:47 -0700)
committerBen Pfaff <blp@gnu.org>
Sun, 7 Jun 2009 04:11:08 +0000 (21:11 -0700)
src/libpspp/sparse-xarray.c [new file with mode: 0644]
src/libpspp/sparse-xarray.h [new file with mode: 0644]
tests/automake.mk
tests/libpspp/sparse-xarray-test.c [new file with mode: 0644]
tests/libpspp/sparse-xarray-test.sh [new file with mode: 0755]

diff --git a/src/libpspp/sparse-xarray.c b/src/libpspp/sparse-xarray.c
new file mode 100644 (file)
index 0000000..b6f69d9
--- /dev/null
@@ -0,0 +1,608 @@
+/* PSPP - a program for statistical analysis.
+   Copyright (C) 2007, 2009 Free Software Foundation, Inc.
+
+   This program is free software: you can redistribute it and/or modify
+   it under the terms of the GNU General Public License as published by
+   the Free Software Foundation, either version 3 of the License, or
+   (at your option) any later version.
+
+   This program is distributed in the hope that it will be useful,
+   but WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+   GNU General Public License for more details.
+
+   You should have received a copy of the GNU General Public License
+   along with this program.  If not, see <http://www.gnu.org/licenses/>. */
+
+#include <config.h>
+
+#include <libpspp/sparse-xarray.h>
+
+#include <stdint.h>
+#include <stdlib.h>
+#include <string.h>
+
+#include <libpspp/assertion.h>
+#include <libpspp/misc.h>
+#include <libpspp/range-set.h>
+#include <libpspp/sparse-array.h>
+#include <libpspp/tmpfile.h>
+
+#include "md4.h"
+#include "minmax.h"
+#include "xalloc.h"
+
+/* A sparse array of arrays of bytes. */
+struct sparse_xarray
+  {
+    size_t n_bytes;                     /* Number of bytes per row. */
+    uint8_t *default_row;               /* Defaults for unwritten rows. */
+    unsigned long int max_memory_rows;  /* Max rows before dumping to disk. */
+    struct sparse_array *memory;        /* Backing, if stored in memory. */
+    struct tmpfile *disk;               /* Backing, if stored on disk. */
+    struct range_set *disk_rows;        /* Allocated rows, if on disk. */
+  };
+
+static bool UNUSED
+range_is_valid (const struct sparse_xarray *sx, size_t ofs, size_t n)
+{
+  return n <= sx->n_bytes && ofs <= sx->n_bytes && ofs + n <= sx->n_bytes;
+}
+
+/* Creates and returns a new sparse array of arrays of bytes.
+   Each row in the sparse array will consist of N_BYTES bytes.
+   If fewer than MAX_MEMORY_ROWS rows are added to the array,
+   then it will be kept in memory; if more than that are added,
+   then it will be stored on disk. */
+struct sparse_xarray *
+sparse_xarray_create (size_t n_bytes, size_t max_memory_rows)
+{
+  struct sparse_xarray *sx = xmalloc (sizeof *sx);
+  sx->n_bytes = n_bytes;
+  sx->default_row = xzalloc (n_bytes);
+  sx->max_memory_rows = max_memory_rows;
+  sx->memory = sparse_array_create (sizeof (uint8_t *));
+  sx->disk = NULL;
+  sx->disk_rows = NULL;
+  return sx;
+}
+
+/* Creates and returns a new sparse array of rows that contains
+   the same data as OLD.  Returns a null pointer if cloning
+   fails. */
+struct sparse_xarray *
+sparse_xarray_clone (const struct sparse_xarray *old)
+{
+  struct sparse_xarray *new = xmalloc (sizeof *new);
+
+  new->n_bytes = old->n_bytes;
+
+  new->default_row = xmemdup (old->default_row, old->n_bytes);
+
+  new->max_memory_rows = old->max_memory_rows;
+
+  if (old->memory != NULL)
+    {
+      unsigned long int idx;
+      uint8_t **old_row;
+
+      new->memory = sparse_array_create (sizeof (uint8_t *));
+      for (old_row = sparse_array_first (old->memory, &idx); old_row != NULL;
+           old_row = sparse_array_next (old->memory, idx, &idx))
+        {
+          uint8_t **new_row = sparse_array_insert (new->memory, idx);
+          *new_row = xmemdup (*old_row, new->n_bytes);
+        }
+    }
+  else
+    new->memory = NULL;
+
+  if (old->disk != NULL)
+    {
+      const struct range_set_node *node;
+      void *tmp = xmalloc (old->n_bytes);
+
+      new->disk = tmpfile_create ();
+      new->disk_rows = range_set_clone (old->disk_rows, NULL);
+      for (node = range_set_first (old->disk_rows); node != NULL;
+           node = range_set_next (old->disk_rows, node))
+        {
+          unsigned long int start = range_set_node_get_start (node);
+          unsigned long int end = range_set_node_get_end (node);
+          unsigned long int idx;
+
+          for (idx = start; idx < end; idx++)
+            {
+              off_t offset = (off_t) idx * old->n_bytes;
+              if (!tmpfile_read (old->disk, offset, old->n_bytes, tmp)
+                  || !tmpfile_write (new->disk, offset, old->n_bytes, tmp))
+                {
+                  free (tmp);
+                  sparse_xarray_destroy (new);
+                  return NULL;
+                }
+            }
+        }
+      free (tmp);
+    }
+  else
+    {
+      new->disk = NULL;
+      new->disk_rows = NULL;
+    }
+
+  return new;
+}
+
+/* Destroys sparse array of rows SX. */
+void
+sparse_xarray_destroy (struct sparse_xarray *sx)
+{
+  if (sx != NULL)
+    {
+      free (sx->default_row);
+      if (sx->memory != NULL)
+        {
+          unsigned long int idx;
+          uint8_t **row;
+          for (row = sparse_array_first (sx->memory, &idx); row != NULL;
+               row = sparse_array_next (sx->memory, idx, &idx))
+            free (*row);
+          sparse_array_destroy (sx->memory);
+        }
+      tmpfile_destroy (sx->disk);
+      range_set_destroy (sx->disk_rows);
+      free (sx);
+    }
+}
+
+/* Returns the number of bytes in each row in SX. */
+size_t
+sparse_xarray_get_n_columns (const struct sparse_xarray *sx)
+{
+  return sx->n_bytes;
+}
+
+/* Returns the number of rows in SX. */
+size_t
+sparse_xarray_get_n_rows (const struct sparse_xarray *sx)
+{
+  if (sx->memory)
+    {
+      unsigned long int idx;
+      return sparse_array_last (sx->memory, &idx) != NULL ? idx + 1 : 0;
+    }
+  else
+    {
+      const struct range_set_node *last = range_set_last (sx->disk_rows);
+      return last != NULL ? range_set_node_get_end (last) : 0;
+    }
+}
+
+/* Dumps the rows in SX, which must currently be stored in
+   memory, to disk.  Returns true if successful, false on I/O
+   error. */
+static bool
+dump_sparse_xarray_to_disk (struct sparse_xarray *sx)
+{
+  unsigned long int idx;
+  uint8_t **row;
+
+  assert (sx->memory != NULL);
+  assert (sx->disk == NULL);
+
+  sx->disk = tmpfile_create ();
+  sx->disk_rows = range_set_create ();
+
+  for (row = sparse_array_first (sx->memory, &idx); row != NULL;
+       row = sparse_array_next (sx->memory, idx, &idx))
+    {
+      if (!tmpfile_write (sx->disk, (off_t) idx * sx->n_bytes, sx->n_bytes,
+                          *row))
+        {
+          tmpfile_destroy (sx->disk);
+          sx->disk = NULL;
+          range_set_destroy (sx->disk_rows);
+          sx->disk_rows = NULL;
+          return false;
+        }
+      range_set_insert (sx->disk_rows, idx, 1);
+    }
+  sparse_array_destroy (sx->memory);
+  sx->memory = NULL;
+  return true;
+}
+
+/* Returns true if any data has ever been written to ROW in SX,
+   false otherwise. */
+bool
+sparse_xarray_contains_row (const struct sparse_xarray *sx,
+                            unsigned long int row)
+{
+  return (sx->memory != NULL
+          ? sparse_array_get (sx->memory, row) != NULL
+          : range_set_contains (sx->disk_rows, row));
+}
+
+/* Reads columns COLUMNS...(COLUMNS + VALUE_CNT), exclusive, in
+   the given ROW in SX, into the VALUE_CNT values in VALUES.
+   Returns true if successful, false on I/O error. */
+bool
+sparse_xarray_read (const struct sparse_xarray *sx, unsigned long int row,
+                    size_t start, size_t n, void *data)
+{
+  assert (range_is_valid (sx, start, n));
+
+  if (sx->memory != NULL)
+    {
+      uint8_t **p = sparse_array_get (sx->memory, row);
+      if (p != NULL)
+        {
+          memcpy (data, *p + start, n);
+          return true;
+        }
+    }
+  else
+    {
+      if (range_set_contains (sx->disk_rows, row))
+        return tmpfile_read (sx->disk, (off_t) row * sx->n_bytes + start,
+                             n, data);
+    }
+
+  memcpy (data, sx->default_row + start, n);
+  return true;
+}
+
+/* Implements sparse_xarray_write for an on-disk sparse_xarray. */
+static bool
+write_disk_row (struct sparse_xarray *sx, unsigned long int row,
+                size_t start, size_t n, const void *data)
+{
+  off_t ofs = (off_t) row * sx->n_bytes;
+  if (range_set_contains (sx->disk_rows, row))
+    return tmpfile_write (sx->disk, ofs + start, n, data);
+  else
+    {
+      range_set_insert (sx->disk_rows, row, 1);
+      return (tmpfile_write (sx->disk, ofs, start, sx->default_row)
+              && tmpfile_write (sx->disk, ofs + start, n, data)
+              && tmpfile_write (sx->disk, ofs + start + n,
+                                sx->n_bytes - start - n,
+                                sx->default_row + start + n));
+    }
+}
+
+/* Writes the VALUE_CNT values in VALUES into columns
+   COLUMNS...(COLUMNS + VALUE_CNT), exclusive, in the given ROW
+   in SX.
+   Returns true if successful, false on I/O error. */
+bool
+sparse_xarray_write (struct sparse_xarray *sx, unsigned long int row,
+                     size_t start, size_t n, const void *data)
+{
+  assert (range_is_valid (sx, start, n));
+  if (sx->memory != NULL)
+    {
+      uint8_t **p = sparse_array_get (sx->memory, row);
+      if (p == NULL)
+        {
+          if (sparse_array_count (sx->memory) < sx->max_memory_rows)
+            {
+              p = sparse_array_insert (sx->memory, row);
+              *p = xmemdup (sx->default_row, sx->n_bytes);
+            }
+          else
+            {
+              if (!dump_sparse_xarray_to_disk (sx))
+                return false;
+              return write_disk_row (sx, row, start, n, data);
+            }
+        }
+      memcpy (*p + start, data, n);
+      return true;
+    }
+  else
+    return write_disk_row (sx, row, start, n, data);
+}
+
+/* Writes the VALUE_CNT values in VALUES to columns
+   START_COLUMN...(START_COLUMN + VALUE_CNT), exclusive, in every
+   row in SX, even those rows that have not yet been written.
+   Returns true if successful, false on I/O error.
+
+   The runtime of this function is linear in the number of rows
+   in SX that have already been written. */
+bool
+sparse_xarray_write_columns (struct sparse_xarray *sx, size_t start,
+                             size_t n, const void *data)
+{
+  assert (range_is_valid (sx, start, n));
+
+  /* Set defaults. */
+  memcpy (sx->default_row + start, data, n);
+
+  /* Set individual rows. */
+  if (sx->memory != NULL)
+    {
+      unsigned long int idx;
+      uint8_t **p;
+
+      for (p = sparse_array_first (sx->memory, &idx); p != NULL;
+           p = sparse_array_next (sx->memory, idx, &idx))
+        memcpy (*p + start, data, n);
+    }
+  else
+    {
+      const struct range_set_node *node;
+
+      for (node = range_set_first (sx->disk_rows); node != NULL;
+           node = range_set_next (sx->disk_rows, node))
+        {
+          unsigned long int start_row = range_set_node_get_start (node);
+          unsigned long int end_row = range_set_node_get_end (node);
+          unsigned long int row;
+
+          for (row = start_row; row < end_row; row++)
+            {
+              off_t offset = (off_t) row * sx->n_bytes;
+              if (!tmpfile_write (sx->disk, offset + start, n, data))
+                break;
+            }
+        }
+
+      if (tmpfile_error (sx->disk))
+        return false;
+    }
+  return true;
+}
+
+static unsigned long int
+scan_first (const struct sparse_xarray *sx)
+{
+  if (sx->memory)
+    {
+      unsigned long int idx;
+      return sparse_array_first (sx->memory, &idx) ? idx : ULONG_MAX;
+    }
+  else
+    return range_set_scan (sx->disk_rows, 0);
+}
+
+static unsigned long int
+scan_next (const struct sparse_xarray *sx, unsigned long int start)
+{
+  if (sx->memory)
+    {
+      unsigned long int idx;
+      return sparse_array_next (sx->memory, start, &idx) ? idx : ULONG_MAX;
+    }
+  else
+    return range_set_scan (sx->disk_rows, start + 1);
+}
+
+/* Only works for rows for which sparse_xarray_contains_row()
+   would return true. */
+static uint8_t *
+get_row (const struct sparse_xarray *sx, unsigned long int idx,
+         uint8_t *buffer)
+{
+  if (sx->memory)
+    {
+      uint8_t **p = sparse_array_get (sx->memory, idx);
+      return *p;
+    }
+  else if (tmpfile_read (sx->disk, (off_t) idx * sx->n_bytes,
+                         sx->n_bytes, buffer))
+    return buffer;
+  else
+    return NULL;
+}
+
+/* Iterates over all the rows in SX and DX, passing each pair of
+   rows with equal indexes to CB.  CB's modifications, if any, to
+   destination rows are written back to DX.
+
+   All rows that are actually in use in SX or in DX or both are
+   passed to CB.  If a row is in use in SX but not in DX, or vice
+   versa, then the "default" row (as set by
+   sparse_xarray_write_columns) is passed as the contents of the
+   other row.
+
+   CB is also called once with the default row from SX and the
+   default row from DX.  Modifying the data passed as the default
+   row from DX will change DX's default row.
+
+   Returns true if successful, false if I/O on SX or DX fails or
+   if CB returns false.  On failure, the contents of DX are
+   undefined. */
+bool
+sparse_xarray_copy (const struct sparse_xarray *sx, struct sparse_xarray *dx,
+                    bool (*cb) (const void *src, void *dst, void *aux),
+                    void *aux)
+{
+  bool success = true;
+
+  if (!cb (sx->default_row, dx->default_row, aux))
+    return false;
+
+  if (sx == dx)
+    {
+      if (sx->memory)
+        {
+          unsigned long int idx;
+          uint8_t **row;
+
+          for (row = sparse_array_first (sx->memory, &idx); row != NULL;
+               row = sparse_array_next (sx->memory, idx, &idx))
+            {
+              success = cb (*row, *row, aux);
+              if (!success)
+                break;
+            }
+        }
+      else if (sx->disk)
+        {
+          const struct range_set_node *node;
+          void *tmp = xmalloc (sx->n_bytes);
+
+          for (node = range_set_first (sx->disk_rows); node != NULL;
+               node = range_set_next (sx->disk_rows, node))
+            {
+              unsigned long int start = range_set_node_get_start (node);
+              unsigned long int end = range_set_node_get_end (node);
+              unsigned long int row;
+
+              for (row = start; row < end; row++)
+                {
+                  off_t offset = (off_t) row * sx->n_bytes;
+                  success = (tmpfile_read (sx->disk, offset, sx->n_bytes, tmp)
+                             && cb (tmp, tmp, aux)
+                             && tmpfile_write (dx->disk, offset,
+                                               dx->n_bytes, tmp));
+                  if (!success)
+                    break;
+                }
+            }
+
+          free (tmp);
+        }
+    }
+  else
+    {
+      unsigned long int src_idx = scan_first (sx);
+      unsigned long int dst_idx = scan_first (dx);
+
+      uint8_t *tmp_src_row = xmalloc (sx->n_bytes);
+      uint8_t *tmp_dst_row = xmalloc (dx->n_bytes);
+
+      for (;;)
+        {
+          unsigned long int idx;
+          const uint8_t *src_row;
+          uint8_t *dst_row;
+
+          /* Determine the index of the row to process.  If
+             src_idx == dst_idx, then the row has been written in
+             both SX and DX.  Otherwise, it has been written in
+             only the sparse_xarray corresponding to the smaller
+             index, and has the default contents in the other. */
+          idx = MIN (src_idx, dst_idx);
+          if (idx == ULONG_MAX)
+            break;
+
+          /* Obtain a copy of the source row as src_row. */
+          if (idx == src_idx)
+            src_row = get_row (sx, idx, tmp_src_row);
+          else
+            src_row = sx->default_row;
+
+          /* Obtain the destination row as dst_row. */
+          if (idx == dst_idx)
+            dst_row = get_row (dx, idx, tmp_dst_row);
+          else if (dx->memory
+                   && sparse_array_count (dx->memory) < dx->max_memory_rows)
+            {
+              uint8_t **p = sparse_array_insert (dx->memory, idx);
+              dst_row = *p = xmemdup (dx->default_row, dx->n_bytes);
+            }
+          else
+            {
+              memcpy (tmp_dst_row, dx->default_row, dx->n_bytes);
+              dst_row = tmp_dst_row;
+            }
+
+          /* Run the callback. */
+          success = cb (src_row, dst_row, aux);
+          if (!success)
+            break;
+
+          /* Write back the destination row, if necessary. */
+          if (dst_row == tmp_dst_row)
+            {
+              success = sparse_xarray_write (dx, idx, 0, dx->n_bytes, dst_row);
+              if (!success)
+                break;
+            }
+          else
+            {
+              /* Nothing to do: we modified the destination row in-place. */
+            }
+
+          /* Advance to the next row. */
+          if (src_idx == idx)
+            src_idx = scan_next (sx, src_idx);
+          if (dst_idx == idx)
+            dst_idx = scan_next (dx, dst_idx);
+        }
+
+      free (tmp_src_row);
+      free (tmp_dst_row);
+    }
+
+  return success;
+}
+
+/* Returns a hash value for SX suitable for use with the model
+   checker.  The value in BASIS is folded into the hash.
+
+   The returned hash value is *not* suitable for storage and
+   retrieval of sparse_xarrays that have identical contents,
+   because it will return different hash values for
+   sparse_xarrays that have the same contents (and it's slow).
+
+   We use MD4 because it is much faster than MD5 or SHA-1 but its
+   collision resistance is just as good. */
+unsigned int
+sparse_xarray_model_checker_hash (const struct sparse_xarray *sx,
+                                  unsigned int basis)
+{
+  unsigned int hash[DIV_RND_UP (20, sizeof (unsigned int))];
+  struct md4_ctx ctx;
+
+  md4_init_ctx (&ctx);
+  md4_process_bytes (&basis, sizeof basis, &ctx);
+  md4_process_bytes (&sx->n_bytes, sizeof sx->n_bytes, &ctx);
+  md4_process_bytes (sx->default_row, sx->n_bytes, &ctx);
+
+  if (sx->memory)
+    {
+      unsigned long int idx;
+      uint8_t **row;
+
+      md4_process_bytes ("m", 1, &ctx);
+      md4_process_bytes (&sx->max_memory_rows, sizeof sx->max_memory_rows,
+                         &ctx);
+      for (row = sparse_array_first (sx->memory, &idx); row != NULL;
+           row = sparse_array_next (sx->memory, idx, &idx))
+        {
+          md4_process_bytes (&idx, sizeof idx, &ctx);
+          md4_process_bytes (*row, sx->n_bytes, &ctx);
+        }
+    }
+  else
+    {
+      const struct range_set_node *node;
+      void *tmp = xmalloc (sx->n_bytes);
+
+      md4_process_bytes ("d", 1, &ctx);
+      for (node = range_set_first (sx->disk_rows); node != NULL;
+           node = range_set_next (sx->disk_rows, node))
+        {
+          unsigned long int start = range_set_node_get_start (node);
+          unsigned long int end = range_set_node_get_end (node);
+          unsigned long int idx;
+
+          for (idx = start; idx < end; idx++)
+            {
+              off_t offset = (off_t) idx * sx->n_bytes;
+              if (!tmpfile_read (sx->disk, offset, sx->n_bytes, tmp))
+                NOT_REACHED ();
+              md4_process_bytes (&idx, sizeof idx, &ctx);
+              md4_process_bytes (tmp, sx->n_bytes, &ctx);
+            }
+        }
+      free (tmp);
+    }
+  md4_finish_ctx (&ctx, hash);
+  return hash[0];
+}
diff --git a/src/libpspp/sparse-xarray.h b/src/libpspp/sparse-xarray.h
new file mode 100644 (file)
index 0000000..15ba49f
--- /dev/null
@@ -0,0 +1,72 @@
+/* PSPP - a program for statistical analysis.
+   Copyright (C) 2007, 2009 Free Software Foundation, Inc.
+
+   This program is free software: you can redistribute it and/or modify
+   it under the terms of the GNU General Public License as published by
+   the Free Software Foundation, either version 3 of the License, or
+   (at your option) any later version.
+
+   This program is distributed in the hope that it will be useful,
+   but WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+   GNU General Public License for more details.
+
+   You should have received a copy of the GNU General Public License
+   along with this program.  If not, see <http://www.gnu.org/licenses/>. */
+
+/* Sparse 2-d array.
+
+   Implements a sparse array, in particular a sparse array of
+   byte arrays.  Each row is either present or absent, and each
+   row that is present consists of a fixed number of bytes
+   (columns).  Data in the array may be accessed randomly by
+   column and row.  When the number of rows stored in the array
+   is small, the data is stored in memory; when it is large, the
+   data is stored in a temporary file.
+
+   The sparse_xarray_write_columns function provides a somewhat
+   unusual ability: to write a given value to every row in a
+   column or set of columns.  This overwrites any values
+   previously written into those columns.  For rows that have
+   never been written, this function sets "default" values that
+   later writes can override.  The default values are initially
+   all zero bytes.
+
+   The array keeps track of which row have been written.  Reading
+   from a row that has never been written yields the default
+   values.  It is permissible to write to only some columns in a
+   row and leave the rest of the row's data at the default
+   values. */
+
+#ifndef LIBPSPP_SPARSE_XARRAY_H
+#define LIBPSPP_SPARSE_XARRAY_H 1
+
+#include <stddef.h>
+#include <stdbool.h>
+
+struct sparse_xarray *sparse_xarray_create (size_t n_columns,
+                                            size_t max_memory_rows);
+struct sparse_xarray *sparse_xarray_clone (const struct sparse_xarray *);
+void sparse_xarray_destroy (struct sparse_xarray *);
+
+size_t sparse_xarray_get_n_columns (const struct sparse_xarray *);
+size_t sparse_xarray_get_n_rows (const struct sparse_xarray *);
+
+bool sparse_xarray_contains_row (const struct sparse_xarray *,
+                                 unsigned long int row);
+bool sparse_xarray_read (const struct sparse_xarray *, unsigned long int row,
+                         size_t start, size_t n, void *);
+bool sparse_xarray_write (struct sparse_xarray *, unsigned long int row,
+                          size_t start, size_t n, const void *);
+bool sparse_xarray_write_columns (struct sparse_xarray *, size_t start,
+                                  size_t n, const void *);
+bool sparse_xarray_copy (const struct sparse_xarray *src,
+                         struct sparse_xarray *dst,
+                         bool (*cb) (const void *src, void *dst, void *aux),
+                         void *aux);
+
+/* For testing purposes only. */
+unsigned int sparse_xarray_model_checker_hash (const struct sparse_xarray *,
+                                               unsigned int basis);
+
+#endif /* libpspp/sparse-libpspp.h */
index 1e3b3a97d6a834ebce6e5031e12f300d63833ba6..533744e79b778e17409be1b73092f07561921bc3 100644 (file)
@@ -152,6 +152,7 @@ dist_TESTS = \
        tests/bugs/temp-freq.sh \
        tests/bugs/print-crash.sh \
        tests/bugs/keep-all.sh \
+       tests/libpspp/sparse-xarray-test.sh \
        tests/output/paper-size.sh \
        tests/xforms/recode.sh \
        tests/stats/descript-basic.sh \
@@ -194,7 +195,8 @@ TESTS = $(dist_TESTS) $(nodist_TESTS)
 
 check_PROGRAMS += \
        $(nodist_TESTS) \
-       tests/formats/inexactify
+       tests/formats/inexactify \
+       tests/libpspp/sparse-xarray-test
 
 tests_libpspp_ll_test_SOURCES = \
        src/libpspp/ll.c \
@@ -295,6 +297,21 @@ tests_libpspp_sparse_array_test_SOURCES = \
 tests_libpspp_sparse_array_test_LDADD = gl/libgl.la @LIBINTL@ 
 tests_libpspp_sparse_array_test_CPPFLAGS = $(AM_CPPFLAGS) -DASSERT_LEVEL=10
 
+tests_libpspp_sparse_xarray_test_SOURCES = \
+       src/libpspp/argv-parser.c \
+       src/libpspp/bt.c \
+       src/libpspp/deque.c \
+       src/libpspp/model-checker.c \
+       src/libpspp/range-set.c \
+       src/libpspp/sparse-array.c \
+       src/libpspp/sparse-xarray.c \
+       src/libpspp/str.c \
+       src/libpspp/pool.c \
+       src/libpspp/tmpfile.c \
+       tests/libpspp/sparse-xarray-test.c
+tests_libpspp_sparse_xarray_test_LDADD = gl/libgl.la @LIBINTL@ 
+tests_libpspp_sparse_xarray_test_CPPFLAGS = $(AM_CPPFLAGS) -DASSERT_LEVEL=10
+
 tests_formats_inexactify_SOURCES = tests/formats/inexactify.c
 
 noinst_PROGRAMS += tests/dissect-sysfile
diff --git a/tests/libpspp/sparse-xarray-test.c b/tests/libpspp/sparse-xarray-test.c
new file mode 100644 (file)
index 0000000..798b842
--- /dev/null
@@ -0,0 +1,644 @@
+/* PSPP - a program for statistical analysis.
+   Copyright (C) 2007, 2009 Free Software Foundation, Inc.
+
+   This program is free software: you can redistribute it and/or modify
+   it under the terms of the GNU General Public License as published by
+   the Free Software Foundation, either version 3 of the License, or
+   (at your option) any later version.
+
+   This program is distributed in the hope that it will be useful,
+   but WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+   GNU General Public License for more details.
+
+   You should have received a copy of the GNU General Public License
+   along with this program.  If not, see <http://www.gnu.org/licenses/>. */
+
+/* This is a test program for the sparse array routines defined
+   in sparse-xarray.c.  This test program aims to be as
+   comprehensive as possible.  "gcov -b" should report 100%
+   coverage of lines and branches in sparse-xarray.c when
+   compiled with -DNDEBUG.  "valgrind --leak-check=yes
+   --show-reachable=yes" should give a clean report. */
+
+#ifdef HAVE_CONFIG_H
+#include <config.h>
+#endif
+
+#include <getopt.h>
+#include <assert.h>
+#include <limits.h>
+#include <stdint.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+
+#include <libpspp/argv-parser.h>
+#include <libpspp/assertion.h>
+#include <libpspp/hash-functions.h>
+#include <libpspp/model-checker.h>
+#include <libpspp/sparse-xarray.h>
+#include <libpspp/str.h>
+
+#include "error.h"
+#include "minmax.h"
+#include "progname.h"
+#include "xalloc.h"
+\f
+/* Maximum size of sparse_xarray supported for model checking
+   purposes. */
+#define MAX_ROWS 5
+#define MAX_COLS 5
+
+/* Test parameters. */
+struct test_params
+  {
+    /* Controlling the test state space. */
+    int n_columns;              /* Number of columns in each row. */
+    int max_rows;               /* Maximum number of rows. */
+    int max_memory_rows;        /* Max rows before writing to disk. */
+    unsigned char n_values;     /* Number of unique cell values. */
+    int n_xarrays;              /* Number of sparse_xarrays in state. */
+
+    /* Types of operations to perform. */
+    bool write_cells;           /* Write to individual cells. */
+    bool write_rows;            /* Write whole rows. */
+    bool write_columns;         /* Write whole columns. */
+    bool copy_within_xarray;    /* Copy column ranges in a single xarray. */
+  };
+
+struct test_state
+  {
+    struct sparse_xarray *xarrays[2];
+  };
+
+static void
+test_state_destroy (const struct test_params *params, struct test_state *ts)
+{
+  int i;
+
+  for (i = 0; i < params->n_xarrays; i++)
+    sparse_xarray_destroy (ts->xarrays[i]);
+}
+
+static struct test_state *
+test_state_clone (const struct test_params *params,
+                  const struct test_state *ots)
+{
+  struct test_state *ts;
+  int i;
+
+  ts = xmalloc (sizeof *ts);
+  for (i = 0; i < params->n_xarrays; i++)
+    {
+      ts->xarrays[i] = sparse_xarray_clone (ots->xarrays[i]);
+      if (ts->xarrays[i] == NULL)
+        NOT_REACHED ();
+    }
+  return ts;
+}
+
+struct xarray_model
+  {
+    uint8_t data[MAX_ROWS][MAX_COLS];
+    bool contains_row[MAX_ROWS];
+  };
+
+struct test_model
+  {
+    struct xarray_model models[2];
+  };
+
+/* Extracts the contents of TS into TM. */
+static void
+test_model_extract (const struct test_params *params,
+                    const struct test_state *ts, struct test_model *tm)
+{
+  int i;
+
+  for (i = 0; i < params->n_xarrays; i++)
+    {
+      const struct sparse_xarray *sx = ts->xarrays[i];
+      struct xarray_model *model = &tm->models[i];
+      size_t n_columns = sparse_xarray_get_n_columns (sx);
+      size_t n_rows = sparse_xarray_get_n_rows (sx);
+      size_t row;
+
+      assert (n_rows < MAX_ROWS);
+      assert (n_columns < MAX_COLS);
+      for (row = 0; row < params->max_rows; row++)
+        {
+          model->contains_row[row] = sparse_xarray_contains_row (sx, row);
+          if (!sparse_xarray_read (sx, row, 0, n_columns, model->data[row]))
+            NOT_REACHED ();
+        }
+    }
+}
+
+/* Checks that test state TS matches the test model TM and
+   reports any mismatches via mc_error.  Then, adds SX to MC as a
+   new state. */
+static void
+check_state (struct mc *mc, struct test_state *ts, const struct test_model *tm)
+{
+  const struct test_params *params = mc_get_aux (mc);
+  int n_columns = params->n_columns;
+  unsigned int hash;
+  int i;
+
+  for (i = 0; i < params->n_xarrays; i++)
+    {
+      const struct xarray_model *model = &tm->models[i];
+      const struct sparse_xarray *sx = ts->xarrays[i];
+      bool difference;
+      int row, col;
+      int n_rows;
+
+      assert (n_columns < MAX_COLS);
+
+      /* Check row count. */
+      n_rows = 0;
+      for (row = 0; row < params->max_rows; row++)
+        if (model->contains_row[row])
+          n_rows = row + 1;
+      if (n_rows != sparse_xarray_get_n_rows (sx))
+        mc_error (mc, "xarray %d: row count (%zu) does not match expected "
+                  "(%d)", i, sparse_xarray_get_n_rows (sx), n_rows);
+
+      /* Check row containment. */
+      for (row = 0; row < params->max_rows; row++)
+        {
+          bool contains = sparse_xarray_contains_row (sx, row);
+          if (contains && !model->contains_row[row])
+            mc_error (mc, "xarray %d: row %d is contained by sparse_xarray "
+                      "but should not be", i, row);
+          else if (!contains && model->contains_row[row])
+            mc_error (mc, "xarray %d: row %d is not contained by "
+                      "sparse_xarray but should be", i, row);
+        }
+
+      /* Check contents. */
+      difference = false;
+      for (row = 0; row < params->max_rows; row++)
+        {
+          unsigned char data[MAX_COLS];
+
+          if (!sparse_xarray_read (sx, row, 0, n_columns, data))
+            NOT_REACHED ();
+          for (col = 0; col < params->n_columns; col++)
+            if (data[col] != model->data[row][col])
+              {
+                mc_error (mc, "xarray %d: element %zu,%zu (of %zu,%zu) "
+                          "differs: %d should be %d",
+                          i, row, col, n_rows, n_columns, data[col],
+                          model->data[row][col]);
+                difference = true;
+              }
+        }
+
+      if (difference)
+        {
+          struct string ds;
+
+          mc_error (mc, "xarray %d: expected:", i);
+          ds_init_empty (&ds);
+          for (row = 0; row < params->max_rows; row++)
+            {
+              ds_clear (&ds);
+              for (col = 0; col < n_columns; col++)
+                ds_put_format (&ds, " %d", model->data[row][col]);
+              mc_error (mc, "xarray %d: row %zu:%s", i, row, ds_cstr (&ds));
+            }
+
+          mc_error (mc, "xarray %d: actual:", i);
+          ds_init_empty (&ds);
+          for (row = 0; row < params->max_rows; row++)
+            {
+              unsigned char data[MAX_COLS];
+
+              if (!sparse_xarray_read (sx, row, 0, n_columns, data))
+                NOT_REACHED ();
+
+              ds_clear (&ds);
+              for (col = 0; col < n_columns; col++)
+                ds_put_format (&ds, " %d", data[col]);
+              mc_error (mc, "xarray %d: row %zu:%s", i, row, ds_cstr (&ds));
+            }
+
+          ds_destroy (&ds);
+        }
+    }
+
+  hash = 0;
+  for (i = 0; i < params->n_xarrays; i++)
+    hash = sparse_xarray_model_checker_hash (ts->xarrays[i], hash);
+  if (mc_discard_dup_state (mc, hash))
+    test_state_destroy (params, ts);
+  else
+    mc_add_state (mc, ts);
+}
+
+static bool
+next_data (unsigned char *data, int n, int n_values)
+{
+  int i;
+  for (i = n - 1; i >= 0; i--)
+    {
+      data[i]++;
+      if (data[i] < n_values)
+        return true;
+      data[i] = 0;
+    }
+  return false;
+}
+
+struct copy_columns_params
+  {
+    int n;                      /* Number of columns to copy. */
+    int src;                    /* Offset of first source column. */
+    int dst;                    /* Offset of first destination column. */
+  };
+
+static bool
+copy_columns (const void *src_, void *dst_, void *copy_)
+{
+  const struct copy_columns_params *copy = copy_;
+  const uint8_t *src = src_;
+  uint8_t *dst = dst_;
+
+  memmove (dst + copy->dst, src + copy->src, copy->n);
+  return true;
+}
+
+/* "init" function for struct mc_class. */
+static void
+sparse_xarray_mc_init (struct mc *mc)
+{
+  struct test_params *params = mc_get_aux (mc);
+  struct test_state *ts;
+  struct test_model tm;
+  int i;
+
+  mc_name_operation (mc, "empty sparse_xarray with n_columns=%zu, "
+                     "max_memory_rows=%zu",
+                     params->n_columns, params->max_memory_rows);
+  ts = xmalloc (sizeof *ts);
+  for (i = 0; i < params->n_xarrays; i++)
+    ts->xarrays[i] = sparse_xarray_create (params->n_columns,
+                                           params->max_memory_rows);
+  memset (&tm, 0, sizeof tm);
+  check_state (mc, ts, &tm);
+}
+
+/* "mutate" function for struct mc_class. */
+static void
+sparse_xarray_mc_mutate (struct mc *mc, const void *ots_)
+{
+  struct test_params *params = mc_get_aux (mc);
+  size_t n_columns = params->n_columns;
+  const struct test_state *ots = ots_;
+  struct test_model otm;
+  int i;
+
+  test_model_extract (params, ots, &otm);
+  for (i = 0; i < params->n_xarrays; i++)
+    {
+      unsigned char value;
+      int row, col, n, src, dst;
+
+      /* Write all possible values to each possible single cell. */
+      if (params->write_cells)
+        for (row = 0; row < params->max_rows; row++)
+          for (col = 0; col < n_columns; col++)
+            for (value = 0; value < params->n_values; value++)
+              if (mc_include_state (mc))
+                {
+                  struct test_state *ts = test_state_clone (params, ots);
+                  struct sparse_xarray *sx = ts->xarrays[i];
+                  struct test_model tm = otm;
+                  struct xarray_model *model = &tm.models[i];
+
+                  mc_name_operation (mc, "xarray %d: set (%d,%d) to %d",
+                                     i, row, col, value);
+                  if (!sparse_xarray_write (sx, row, col, 1, &value))
+                    NOT_REACHED ();
+                  model->data[row][col] = value;
+                  model->contains_row[row] = true;
+                  check_state (mc, ts, &tm);
+                }
+
+      /* Write all possible row contents to each row. */
+      if (params->write_rows)
+        for (row = 0; row < params->max_rows; row++)
+          {
+            struct test_model tm = otm;
+            struct xarray_model *model = &tm.models[i];
+
+            memset (model->data[row], 0, n_columns);
+            model->contains_row[row] = true;
+            do
+              {
+                if (mc_include_state (mc))
+                  {
+                    struct test_state *ts = test_state_clone (params, ots);
+                    struct sparse_xarray *sx = ts->xarrays[i];
+                    char row_string[MAX_COLS + 1];
+
+                    mc_name_operation (mc, "xarray %d: set row %d to %s",
+                                       i, row, row_string);
+                    for (col = 0; col < n_columns; col++)
+                      {
+                        value = model->data[row][col];
+                        row_string[col] = value < 10 ? '0' + value : '*';
+                      }
+                    row_string[n_columns] = '\0';
+                    if (!sparse_xarray_write (sx, row, 0, n_columns,
+                                              model->data[row]))
+                      NOT_REACHED ();
+                    check_state (mc, ts, &tm);
+                  }
+              }
+            while (next_data (model->data[row], n_columns, params->n_values));
+          }
+
+      /* Write all possible values to each possible column. */
+      if (params->write_columns)
+        for (col = 0; col < n_columns; col++)
+          for (value = 0; value < params->n_values; value++)
+            if (mc_include_state (mc))
+              {
+                struct test_state *ts = test_state_clone (params, ots);
+                struct sparse_xarray *sx = ts->xarrays[i];
+                struct test_model tm = otm;
+                struct xarray_model *model = &tm.models[i];
+
+                mc_name_operation (mc, "xarray %d: write value %d to "
+                                   "column %d", i, value, col);
+                if (!sparse_xarray_write_columns (sx, col, 1, &value))
+                  NOT_REACHED ();
+                for (row = 0; row < params->max_rows; row++)
+                  model->data[row][col] = value;
+                check_state (mc, ts, &tm);
+              }
+
+      /* Copy all possible column ranges within a single sparse_xarray. */
+      if (params->copy_within_xarray)
+        for (n = 1; n <= n_columns; n++)
+          for (src = 0; src <= n_columns - n; src++)
+            for (dst = 0; dst <= n_columns - n; dst++)
+              if (mc_include_state (mc))
+                {
+                  struct copy_columns_params copy_aux;
+                  struct test_state *ts = test_state_clone (params, ots);
+                  struct sparse_xarray *sx = ts->xarrays[i];
+                  struct test_model tm = otm;
+                  struct xarray_model *model = &tm.models[i];
+
+                  mc_name_operation (mc, "xarray %d: copy %d columns from "
+                                     "offset %d to offset %d", i, n, src, dst);
+
+                  copy_aux.n = n;
+                  copy_aux.src = src;
+                  copy_aux.dst = dst;
+                  if (!sparse_xarray_copy (sx, sx, copy_columns, &copy_aux))
+                    NOT_REACHED ();
+
+                  for (row = 0; row < params->max_rows; row++)
+                    memmove (&model->data[row][dst],
+                             &model->data[row][src], n);
+
+                  check_state (mc, ts, &tm);
+                }
+    }
+
+  if (params->n_xarrays == 2)
+    {
+      int row, n, src, dst;
+
+      /* Copy all possible column ranges from xarrays[0] to xarrays[1]. */
+      for (n = 1; n <= n_columns; n++)
+        for (src = 0; src <= n_columns - n; src++)
+          for (dst = 0; dst <= n_columns - n; dst++)
+            if (mc_include_state (mc))
+              {
+                struct copy_columns_params copy_aux;
+                struct test_state *ts = test_state_clone (params, ots);
+                struct test_model tm = otm;
+
+                mc_name_operation (mc, "copy %d columns from offset %d in "
+                                   "xarray 0 to offset %d in xarray 1",
+                                   n, src, dst);
+
+                copy_aux.n = n;
+                copy_aux.src = src;
+                copy_aux.dst = dst;
+                if (!sparse_xarray_copy (ts->xarrays[0], ts->xarrays[1],
+                                         copy_columns, &copy_aux))
+                  NOT_REACHED ();
+
+                for (row = 0; row < params->max_rows; row++)
+                  {
+                    if (tm.models[0].contains_row[row])
+                      tm.models[1].contains_row[row] = true;
+                    memmove (&tm.models[1].data[row][dst],
+                             &tm.models[0].data[row][src], n);
+                  }
+
+                check_state (mc, ts, &tm);
+              }
+    }
+}
+
+/* "destroy" function for struct mc_class. */
+static void
+sparse_xarray_mc_destroy (const struct mc *mc UNUSED, void *ts_)
+{
+  struct test_params *params = mc_get_aux (mc);
+  struct test_state *ts = ts_;
+
+  test_state_destroy (params, ts);
+}
+
+static void
+usage (void)
+{
+  printf ("%s, for testing the sparse_xarray implementation.\n"
+          "Usage: %s [OPTION]...\n"
+          "\nTest state space parameters (min...max, default):\n"
+          "  --columns=N          Number of columns per row (0...5, 3)\n"
+          "  --max-rows=N         Maximum number of rows (0...5, 3)\n"
+          "  --max-memory-rows=N  Max rows before paging to disk (0...5, 3)\n"
+          "  --values=N           Number of unique cell values (1...254, 3)\n"
+          "  --xarrays=N          Number of xarrays at a time (1...2, 1)\n"
+          "\nTest operation parameters:\n"
+          "  --no-write-cells     Do not write individual cells\n"
+          "  --no-write-rows      Do not write whole rows\n"
+          "  --no-write-columns   Do not write whole columns\n"
+          "  --no-copy-columns    Do not copy column ranges in an xarray\n",
+          program_name, program_name);
+  mc_options_usage ();
+  fputs ("\nOther options:\n"
+         "  --help               Display this help message\n"
+         "\nReport bugs to <bug-gnu-pspp@gnu.org>\n",
+         stdout);
+  exit (0);
+}
+\f
+enum
+  {
+    OPT_COLUMNS,
+    OPT_MAX_ROWS,
+    OPT_MAX_MEMORY_ROWS,
+    OPT_VALUES,
+    OPT_XARRAYS,
+    OPT_NO_WRITE_CELLS,
+    OPT_NO_WRITE_ROWS,
+    OPT_NO_WRITE_COLUMNS,
+    OPT_NO_COPY_COLUMNS,
+    OPT_HELP
+  };
+
+static struct argv_option sparse_xarray_argv_options[] =
+  {
+    {"columns", 0, required_argument, OPT_COLUMNS},
+    {"max-rows", 0, required_argument, OPT_MAX_ROWS},
+    {"max-memory-rows", 0, required_argument, OPT_MAX_MEMORY_ROWS},
+    {"values", 0, required_argument, OPT_VALUES},
+    {"xarrays", 0, required_argument, OPT_XARRAYS},
+    {"no-write-cells", 0, no_argument, OPT_NO_WRITE_CELLS},
+    {"no-write-rows", 0, no_argument, OPT_NO_WRITE_ROWS},
+    {"no-write-columns", 0, no_argument, OPT_NO_WRITE_COLUMNS},
+    {"no-copy-columns", 0, no_argument, OPT_NO_COPY_COLUMNS},
+    {"help", 'h', no_argument, OPT_HELP},
+  };
+enum { N_SPARSE_XARRAY_ARGV_OPTIONS = (sizeof sparse_xarray_argv_options
+                                       / sizeof *sparse_xarray_argv_options) };
+
+static void
+sparse_xarray_option_callback (int id, void *params_)
+{
+  struct test_params *params = params_;
+  switch (id)
+    {
+    case OPT_COLUMNS:
+      params->n_columns = atoi (optarg);
+      break;
+
+    case OPT_MAX_ROWS:
+      params->max_rows = atoi (optarg);
+      break;
+
+    case OPT_MAX_MEMORY_ROWS:
+      params->max_memory_rows = atoi (optarg);
+      break;
+
+    case OPT_VALUES:
+      params->n_values = atoi (optarg);
+      break;
+
+    case OPT_XARRAYS:
+      params->n_xarrays = atoi (optarg);
+      break;
+
+    case OPT_NO_WRITE_CELLS:
+      params->write_cells = false;
+      break;
+
+    case OPT_NO_WRITE_ROWS:
+      params->write_rows = false;
+      break;
+
+    case OPT_NO_WRITE_COLUMNS:
+      params->write_columns = false;
+      break;
+
+    case OPT_NO_COPY_COLUMNS:
+      params->copy_within_xarray = false;
+      break;
+
+    case OPT_HELP:
+      usage ();
+      break;
+
+    default:
+      NOT_REACHED ();
+    }
+}
+
+int
+main (int argc, char *argv[])
+{
+  static const struct mc_class sparse_xarray_mc_class =
+    {
+      sparse_xarray_mc_init,
+      sparse_xarray_mc_mutate,
+      sparse_xarray_mc_destroy,
+    };
+
+  struct test_params params;
+  struct mc_options *options;
+  struct mc_results *results;
+  struct argv_parser *parser;
+  int verbosity;
+  bool success;
+
+  set_program_name (argv[0]);
+
+  /* Default parameters. */
+  params.n_columns = 3;
+  params.max_rows = 3;
+  params.max_memory_rows = 3;
+  params.n_values = 3;
+  params.n_xarrays = 1;
+  params.write_cells = true;
+  params.write_rows = true;
+  params.write_columns = true;
+  params.copy_within_xarray = true;
+
+  /* Parse command line. */
+  parser = argv_parser_create ();
+  options = mc_options_create ();
+  mc_options_register_argv_parser (options, parser);
+  argv_parser_add_options (parser, sparse_xarray_argv_options,
+                           N_SPARSE_XARRAY_ARGV_OPTIONS,
+                           sparse_xarray_option_callback, &params);
+  if (!argv_parser_run (parser, argc, argv))
+    exit (EXIT_FAILURE);
+  argv_parser_destroy (parser);
+  verbosity = mc_options_get_verbosity (options);
+
+  /* Force parameters into allowed ranges. */
+  params.n_columns = MAX (0, MIN (params.n_columns, MAX_COLS));
+  params.max_rows = MAX (0, MIN (params.max_rows, MAX_ROWS));
+  params.max_memory_rows = MAX (0, MIN (params.max_memory_rows,
+                                        params.max_rows));
+  params.n_values = MIN (254, MAX (1, params.n_values));
+  params.n_xarrays = MAX (1, MIN (2, params.n_xarrays));
+  mc_options_set_aux (options, &params);
+  results = mc_run (&sparse_xarray_mc_class, options);
+
+  /* Output results. */
+  success = (mc_results_get_stop_reason (results) != MC_MAX_ERROR_COUNT
+             && mc_results_get_stop_reason (results) != MC_INTERRUPTED);
+  if (verbosity > 0 || !success)
+    {
+      printf ("Parameters: "
+              "--columns=%d --max-rows=%d --max-memory-rows=%d --values=%d "
+              "--xarrays=%d",
+              params.n_columns, params.max_rows, params.max_memory_rows,
+              params.n_values, params.n_xarrays);
+      if (!params.write_cells)
+        printf (" --no-write-cells");
+      if (!params.write_rows)
+        printf (" --no-write-rows");
+      if (!params.write_columns)
+        printf (" --no-write-columns");
+      if (!params.copy_within_xarray)
+        printf (" --no-copy-columns");
+      printf ("\n\n");
+      mc_results_print (results, stdout);
+    }
+  mc_results_destroy (results);
+
+  return success ? 0 : EXIT_FAILURE;
+}
diff --git a/tests/libpspp/sparse-xarray-test.sh b/tests/libpspp/sparse-xarray-test.sh
new file mode 100755 (executable)
index 0000000..87defa9
--- /dev/null
@@ -0,0 +1,62 @@
+#!/bin/sh
+
+# This program tests the sparse_xarray abstract data type implementation.
+
+set -e
+
+: ${top_builddir:=.}
+RUN_TEST="${top_builddir}/tests/libpspp/sparse-xarray-test --verbosity=0"
+
+# Each on-disk sparse_xarray eats up a file descriptor, so for the
+# tests that involve on-disk sparse_xarrays we need to limit the
+# maximum length of the queue.  Figure out how many file descriptors
+# we can let the test program open at once.
+OPEN_MAX=`getconf OPEN_MAX 2>/dev/null`
+case $OPEN_MAX in
+    [0-9]*)
+       # Divide by 2 because some fds are used by other code.
+       queue_limit=`expr $OPEN_MAX / 2` 
+       ;;
+    undefined) 
+       # Assume that any system with a dynamic fd limit has a large limit.
+       queue_limit=500 
+       ;;
+    *)
+       case `uname -m 2>/dev/null` in
+           CYGWIN*)
+                # Cygwin claims a 256-fd limit as OPEN_MAX in <limits.h>.
+               queue_limit=128
+               ;;
+           MINGW*)
+               # The following email claims that Mingw should have a
+               # 2048-fd limit:
+               # http://www.mail-archive.com/squid-users@squid-cache.org/msg35249.html
+               queue_limit=1024
+               ;;
+           *)
+               # This seems fairly conservative these days.
+               queue_limit=50
+               ;;
+       esac
+       ;;
+esac
+
+# Test completely in-memory sparse_xarray.  --values=3 would be a
+# slightly better test but takes much longer.
+$RUN_TEST --columns=3 --max-rows=3 --max-memory-rows=3 --values=2
+
+# Test on-disk sparse_xarrays.
+for max_memory_rows in 0 1 2; do
+    $RUN_TEST --columns=2 --max-rows=3 --max-memory-rows=$max_memory_rows --values=2 --queue-limit=$queue_limit
+done
+
+# Test copying back and forth between a pair of sparse_xarrays in
+# memory.
+$RUN_TEST --columns=2 --max-rows=2 --max-memory-rows=2 --values=2 --xarrays=2 --no-write-rows --no-copy-columns
+
+# Test copying back and forth between a pair of sparse_xarrays on
+# disk.  These parameters are ridiculously low, but it's necessary
+# unless we want the tests to take a very long time.
+for max_memory_rows in 0 1; do
+    $RUN_TEST --columns=1 --max-rows=2 --max-memory-rows=$max_memory_rows --values=2 --xarrays=2 --queue-limit=`expr $queue_limit / 2` --no-write-rows --no-copy-columns
+done