Change "union value" to dynamically allocate long strings.
[pspp-builds.git] / src / data / datasheet.c
index 64dea786877796dfdbffd2f2daa6eeb698fd4e69..4a48368546ad71715a0b99f241251efb35b9a928 100644 (file)
 #include <data/casereader.h>
 #include <data/casewriter.h>
 #include <data/lazy-casereader.h>
-#include <data/sparse-cases.h>
+#include <data/settings.h>
 #include <libpspp/array.h>
 #include <libpspp/assertion.h>
+#include <libpspp/misc.h>
 #include <libpspp/range-map.h>
 #include <libpspp/range-set.h>
+#include <libpspp/sparse-xarray.h>
 #include <libpspp/taint.h>
 #include <libpspp/tower.h>
 
@@ -37,6 +39,8 @@
 #include "md4.h"
 #include "xalloc.h"
 
+struct column;
+
 static struct axis *axis_create (void);
 static struct axis *axis_clone (const struct axis *);
 static void axis_destroy (struct axis *);
@@ -65,27 +69,25 @@ static void axis_move (struct axis *,
                        unsigned long int new_start,
                        unsigned long int cnt);
 
-static struct source *source_create_empty (size_t column_cnt);
+static struct source *source_create_empty (size_t n_bytes);
 static struct source *source_create_casereader (struct casereader *);
 static struct source *source_clone (const struct source *);
 static void source_destroy (struct source *);
 
-static casenumber source_get_backing_row_cnt (const struct source *);
-static size_t source_get_column_cnt (const struct source *);
-
-static bool source_read (const struct source *,
-                         casenumber row, size_t column,
-                         union value[], size_t value_cnt);
-static bool source_write (struct source *,
-                          casenumber row, size_t column,
-                          const union value[], size_t value_cnt);
-static bool source_write_columns (struct source *, size_t start_column,
-                                  const union value[], size_t value_cnt);
-static bool source_has_backing (const struct source *);
-static void source_increase_use (struct source *, size_t delta);
-static void source_decrease_use (struct source *, size_t delta);
+static casenumber source_get_backing_n_rows (const struct source *);
+
+static int source_allocate_column (struct source *, int width);
+static void source_release_column (struct source *, int ofs, int width);
 static bool source_in_use (const struct source *);
 
+static bool source_read (const struct column *, casenumber row, union value *);
+static bool source_write (const struct column *, casenumber row,
+                          const union value *);
+static bool source_write_column (struct column *, const union value *);
+static bool source_has_backing (const struct source *);
+
+static int get_source_index (const struct datasheet *ds, const struct source *source);
+
 /* A datasheet is internally composed from a set of data files,
    called "sources".  The sources that make up a datasheet must
    have the same number of rows (cases), but their numbers of
@@ -99,37 +101,47 @@ static bool source_in_use (const struct source *);
    to the column mapping.
 
    Each source in a datasheet can be a casereader or a
-   sparse_cases.  Casereaders are read-only, so when sources made
-   from casereaders need to be modified, it is done "virtually"
-   through being overlaid by a sparse_cases. */
+   sparse_xarray.  Casereaders are read-only, so when sources
+   made from casereaders need to be modified, it is done
+   "virtually" through being overlaid by a sparse_xarray. */
+struct source
+  {
+    struct range_set *avail;    /* Free bytes are set to 1s. */
+    struct sparse_xarray *data; /* Data at top level, atop the backing. */
+    struct casereader *backing; /* Backing casereader (or null). */
+    casenumber backing_rows;    /* Number of rows in backing (if backed). */
+    size_t n_used;              /* Number of column in use (if backed). */
+  };
+
+/* A logical column. */
+struct column
+  {
+    struct source *source;      /* Source of the underlying physical column. */
+    int value_ofs;              /* If 'source' has a backing casereader,
+                                   column's value offset in its cases. */
+    int byte_ofs;               /* Byte offset in source's sparse_xarray. */
+    int width;                  /* 0=numeric, otherwise string width. */
+  };
 
 /* A datasheet. */
 struct datasheet
-{
-  /* Mappings from logical to physical columns/rows. */
-  struct axis *columns;
-  struct axis *rows;
-
-  /* Mapping from physical columns to "source_info"s. */
-  struct range_map sources;
+  {
+    /* Data sources. */
+    struct source **sources;    /* Sources, in no particular order. */
+    size_t n_sources;           /* Number of sources. */
 
-  /* Minimum number of columns to put in a new source when we
-     need new columns and none are free.  We double it whenever
-     we add a new source to keep the number of file descriptors
-     needed by the datasheet to a minimum, reducing the
-     likelihood of running out. */
-  unsigned column_min_alloc;
+    /* Columns. */
+    struct caseproto *proto;    /* Prototype for rows (initialized lazily). */
+    struct column *columns;     /* Logical to physical column mapping. */
+    size_t n_columns;           /* Number of logical columns. */
+    unsigned column_min_alloc;  /* Min. # of columns to put in a new source. */
 
-  /* Indicates corrupted data in the datasheet. */
-  struct taint *taint;
-};
+    /* Rows. */
+    struct axis *rows;          /* Logical to physical row mapping. */
 
-/* Maps from a range of physical columns to a source. */
-struct source_info
-{
-  struct range_map_node column_range;
-  struct source *source;
-};
+    /* Tainting. */
+    struct taint *taint;        /* Indicates corrupted data. */
+  };
 
 /* Is this operation a read or a write? */
 enum rw_op
@@ -138,13 +150,51 @@ enum rw_op
     OP_WRITE
   };
 
-static void free_source_info (struct datasheet *, struct source_info *);
-static struct source_info *source_info_from_range_map (
-                                                      struct range_map_node *);
+static void allocate_column (struct datasheet *, int width, struct column *);
+static void release_source (struct datasheet *, struct source *);
 static bool rw_case (struct datasheet *ds, enum rw_op op,
-                     casenumber lrow, size_t start_column, size_t column_cnt,
+                     casenumber lrow, size_t start_column, size_t n_columns,
                      union value data[]);
 
+/* Returns the number of bytes needed to store a value with the
+   given WIDTH on disk. */
+static size_t
+width_to_n_bytes (int width)
+{
+  return width == 0 ? sizeof (double) : width;
+}
+
+/* Returns the address of the data in VALUE (for reading or
+   writing to/from disk).  VALUE must have the given WIDTH. */
+static void *
+value_to_data (const union value *value_, int width)
+{
+  union value *value = (union value *) value_;
+  assert (sizeof value->f == sizeof (double));
+  if (width == 0)
+    return &value->f;
+  else
+    return value_str_rw (value, width);
+}
+
+/* Returns the number of bytes needed to store all the values in
+   PROTO on disk. */
+static size_t
+caseproto_to_n_bytes (const struct caseproto *proto)
+{
+  size_t n_bytes;
+  size_t i;
+
+  n_bytes = 0;
+  for (i = 0; i < caseproto_get_n_widths (proto); i++)
+    {
+      int width = caseproto_get_width (proto, i);
+      if (width >= 0)
+        n_bytes += width_to_n_bytes (width);
+    }
+  return n_bytes;
+}
+
 /* Creates and returns a new datasheet.
 
    If READER is nonnull, then the datasheet initially contains
@@ -152,42 +202,50 @@ static bool rw_case (struct datasheet *ds, enum rw_op op,
 struct datasheet *
 datasheet_create (struct casereader *reader)
 {
-  /* Create datasheet. */
   struct datasheet *ds = xmalloc (sizeof *ds);
-  ds->columns = axis_create ();
+  ds->sources = NULL;
+  ds->n_sources = 0;
+  ds->proto = NULL;
+  ds->columns = NULL;
+  ds->n_columns = 0;
+  ds->column_min_alloc = 8;
   ds->rows = axis_create ();
-  range_map_init (&ds->sources);
-  ds->column_min_alloc = 1;
   ds->taint = taint_create ();
 
-  /* Add backing. */
   if (reader != NULL)
     {
-      size_t column_cnt;
-      casenumber row_cnt;
-      struct source_info *si;
-
-      si = xmalloc (sizeof *si);
-      si->source = source_create_casereader (reader);
-      column_cnt = source_get_column_cnt (si->source);
-      row_cnt = source_get_backing_row_cnt (si->source);
-      source_increase_use (si->source, column_cnt);
-
-      if ( column_cnt > 0 )
-       {
-         unsigned long int column_start;
-         column_start = axis_extend (ds->columns, column_cnt);
-         axis_insert (ds->columns, 0, column_start, column_cnt);
-         range_map_insert (&ds->sources, column_start, column_cnt,
-                           &si->column_range);
-       }
-
-      if ( row_cnt > 0 )
-       {
-         unsigned long int row_start;
-         row_start = axis_extend (ds->rows, row_cnt);
-         axis_insert (ds->rows, 0, row_start, row_cnt);
-       }
+      casenumber n_rows;
+      size_t byte_ofs;
+      size_t i;
+
+      taint_propagate (casereader_get_taint (reader), ds->taint);
+
+      ds->proto = caseproto_ref (casereader_get_proto (reader));
+
+      ds->sources = xmalloc (sizeof *ds->sources);
+      ds->sources[0] = source_create_casereader (reader);
+      ds->n_sources = 1;
+
+      ds->n_columns = caseproto_get_n_widths (ds->proto);
+      ds->columns = xnmalloc (ds->n_columns, sizeof *ds->columns);
+      byte_ofs = 0;
+      for (i = 0; i < ds->n_columns; i++)
+        {
+          struct column *column = &ds->columns[i];
+          int width = caseproto_get_width (ds->proto, i);
+          column->source = ds->sources[0];
+          column->width = width;
+          if (width >= 0)
+            {
+              column->value_ofs = i;
+              column->byte_ofs = byte_ofs;
+              byte_ofs += width_to_n_bytes (column->width);
+            }
+        }
+
+      n_rows = source_get_backing_n_rows (ds->sources[0]);
+      if (n_rows > 0)
+        axis_insert (ds->rows, 0, axis_extend (ds->rows, n_rows), n_rows);
     }
 
   return ds;
@@ -197,21 +255,47 @@ datasheet_create (struct casereader *reader)
 void
 datasheet_destroy (struct datasheet *ds)
 {
+  size_t i;
+
   if (ds == NULL)
     return;
 
-  axis_destroy (ds->columns);
+  for (i = 0; i < ds->n_sources; i++)
+    source_destroy (ds->sources[i]);
+  free (ds->sources);
+  caseproto_unref (ds->proto);
+  free (ds->columns);
   axis_destroy (ds->rows);
-  while (!range_map_is_empty (&ds->sources))
-    {
-      struct range_map_node *r = range_map_first (&ds->sources);
-      struct source_info *si = source_info_from_range_map (r);
-      free_source_info (ds, si);
-    }
   taint_destroy (ds->taint);
   free (ds);
 }
 
+/* Returns the prototype for the cases in DS.  The caller must
+   not unref the returned prototype. */
+const struct caseproto *
+datasheet_get_proto (const struct datasheet *ds_)
+{
+  struct datasheet *ds = (struct datasheet *) ds_;
+  if (ds->proto == NULL)
+    {
+      size_t i;
+
+      ds->proto = caseproto_create ();
+      for (i = 0; i < ds->n_columns; i++)
+        ds->proto = caseproto_add_width (ds->proto, ds->columns[i].width);
+    }
+  return ds->proto;
+}
+
+/* Returns the width of the given COLUMN within DS.
+   COLUMN must be less than the number of columns in DS. */
+int
+datasheet_get_column_width (const struct datasheet *ds, size_t column)
+{
+  assert (column < datasheet_get_n_columns (ds));
+  return ds->columns[column].width;
+}
+
 /* Moves datasheet DS to a new location in memory, and returns
    the new location.  Afterward, the datasheet must not be
    accessed at its former location.
@@ -252,141 +336,209 @@ datasheet_get_taint (const struct datasheet *ds)
 
 /* Returns the number of rows in DS. */
 casenumber
-datasheet_get_row_cnt (const struct datasheet *ds)
+datasheet_get_n_rows (const struct datasheet *ds)
 {
   return axis_get_size (ds->rows);
 }
 
 /* Returns the number of columns in DS. */
 size_t
-datasheet_get_column_cnt (const struct datasheet *ds)
+datasheet_get_n_columns (const struct datasheet *ds)
 {
-  return axis_get_size (ds->columns);
+  return ds->n_columns;
 }
 
-/* Inserts CNT columns into datasheet DS just before column
-   BEFORE.  Initializes the contents of each row in the inserted
-   columns to the CNT values in INIT_VALUES.
+/* Inserts a column of the given WIDTH into datasheet DS just
+   before column BEFORE.  Initializes the contents of each row in
+   the inserted column to VALUE (which must have width WIDTH).
 
    Returns true if successful, false on failure.  In case of
    failure, the datasheet is unchanged. */
 bool
-datasheet_insert_columns (struct datasheet *ds,
-                          const union value init_values[], size_t cnt,
-                          size_t before)
+datasheet_insert_column (struct datasheet *ds,
+                         const union value *value, int width, size_t before)
 {
-  size_t added = 0;
-  while (cnt > 0)
-    {
-      unsigned long first_phy; /* First allocated physical column. */
-      unsigned long phy_cnt;   /* Number of allocated physical columns. */
+  struct column *col;
 
-      /* Allocate physical columns from the pool of available
-         columns. */
-      if (!axis_allocate (ds->columns, cnt, &first_phy, &phy_cnt))
-        {
-          /* No columns were available.  Create a new source and
-             extend the axis to make some new ones available. */
-          struct source_info *si;
-
-          phy_cnt = MAX (cnt, ds->column_min_alloc);
-          first_phy = axis_extend (ds->columns, phy_cnt);
-          ds->column_min_alloc = MIN (65536, ds->column_min_alloc * 2);
-
-          si = xmalloc (sizeof *si);
-          si->source = source_create_empty (phy_cnt);
-          range_map_insert (&ds->sources, first_phy, phy_cnt,
-                            &si->column_range);
-          if (phy_cnt > cnt)
-            {
-              axis_make_available (ds->columns, first_phy + cnt,
-                                   phy_cnt - cnt);
-              phy_cnt = cnt;
-            }
-        }
+  ds->columns = xnrealloc (ds->columns,
+                           ds->n_columns + 1, sizeof *ds->columns);
+  insert_element (ds->columns, ds->n_columns, sizeof *ds->columns, before);
+  col = &ds->columns[before];
+  ds->n_columns++;
 
-      /* Initialize the columns and insert them into the columns
-         axis. */
-      while (phy_cnt > 0)
-        {
-          struct range_map_node *r; /* Range map holding FIRST_PHY column. */
-          struct source_info *s;    /* Source containing FIRST_PHY column. */
-          size_t source_avail;      /* Number of phys columns available. */
-          size_t source_cnt;        /* Number of phys columns to use. */
-
-          /* Figure out how many columns we can and want to take
-             starting at FIRST_PHY, and then insert them into the
-             columns axis. */
-          r = range_map_lookup (&ds->sources, first_phy);
-          s = source_info_from_range_map (r);
-          source_avail = range_map_node_get_end (r) - first_phy;
-          source_cnt = MIN (phy_cnt, source_avail);
-          axis_insert (ds->columns, before, first_phy, source_cnt);
-
-          /* Initialize the data for those columns in the
-             source. */
-          if (!source_write_columns (s->source,
-                                     first_phy - range_map_node_get_start (r),
-                                     init_values, source_cnt))
-            {
-              datasheet_delete_columns (ds, before - added,
-                                        source_cnt + added);
-              taint_set_taint (ds->taint);
-              return false;
-            }
-          source_increase_use (s->source, source_cnt);
-
-          /* Advance. */
-          phy_cnt -= source_cnt;
-          first_phy += source_cnt;
-          init_values += source_cnt;
-          cnt -= source_cnt;
-          before += source_cnt;
-          added += source_cnt;
-        }
+  allocate_column (ds, width, col);
+
+  if (width >= 0 && !source_write_column (col, value))
+    {
+      datasheet_delete_columns (ds, before, 1);
+      taint_set_taint (ds->taint);
+      return false;
     }
+
   return true;
 }
 
-/* Deletes the CNT columns in DS starting from column START. */
+/* Deletes the N columns in DS starting from column START. */
 void
-datasheet_delete_columns (struct datasheet *ds, size_t start, size_t cnt)
+datasheet_delete_columns (struct datasheet *ds, size_t start, size_t n)
 {
-  size_t lcol;
-
-  assert ( start + cnt <= axis_get_size (ds->columns) );
-
-  /* Free up columns for reuse. */
-  for (lcol = start; lcol < start + cnt; lcol++)
+  if (n > 0)
     {
-      size_t pcol = axis_map (ds->columns, lcol);
-      struct range_map_node *r = range_map_lookup (&ds->sources, pcol);
-      struct source_info *si = source_info_from_range_map (r);
+      size_t i;
 
-      source_decrease_use (si->source, 1);
-      if (source_has_backing (si->source))
+      for (i = start; i < start + n; i++)
         {
-          if (!source_in_use (si->source))
-            free_source_info (ds, si);
+          struct column *column = &ds->columns[i];
+          struct source *source = column->source;
+          source_release_column (source, column->byte_ofs, column->width);
+          release_source (ds, source);
         }
-      else
-        axis_make_available (ds->columns, pcol, 1);
-    }
 
-  /* Remove columns from logical-to-physical mapping. */
-  axis_remove (ds->columns, start, cnt);
+      remove_range (ds->columns, ds->n_columns, sizeof *ds->columns, start, n);
+      ds->n_columns -= n;
+
+      caseproto_unref (ds->proto);
+      ds->proto = NULL;
+    }
 }
 
-/* Moves the CNT columns in DS starting at position OLD_START so
+/* Moves the N columns in DS starting at position OLD_START so
    that they then start at position NEW_START.  Equivalent to
    deleting the column rows, then inserting them at what becomes
-   position NEW_START after the deletion.*/
+   position NEW_START after the deletion. */
 void
 datasheet_move_columns (struct datasheet *ds,
                         size_t old_start, size_t new_start,
-                        size_t cnt)
+                        size_t n)
+{
+  move_range (ds->columns, ds->n_columns, sizeof *ds->columns,
+              old_start, new_start, n);
+
+  caseproto_unref (ds->proto);
+  ds->proto = NULL;
+}
+
+struct resize_datasheet_value_aux
+  {
+    union value src_value;
+    size_t src_ofs;
+    int src_width;
+
+    void (*resize_cb) (const union value *, union value *, void *aux);
+    void *resize_cb_aux;
+
+    union value dst_value;
+    size_t dst_ofs;
+    int dst_width;
+  };
+
+static bool
+resize_datasheet_value (const void *src, void *dst, void *aux_)
 {
-  axis_move (ds->columns, old_start, new_start, cnt);
+  struct resize_datasheet_value_aux *aux = aux_;
+
+  memcpy (value_to_data (&aux->src_value, aux->src_width),
+          (uint8_t *) src + aux->src_ofs,
+          width_to_n_bytes (aux->src_width));
+
+  aux->resize_cb (&aux->src_value, &aux->dst_value, aux->resize_cb_aux);
+
+  memcpy ((uint8_t *) dst + aux->dst_ofs,
+          value_to_data (&aux->dst_value, aux->dst_width),
+          width_to_n_bytes (aux->dst_width));
+
+  return true;
+}
+
+bool
+datasheet_resize_column (struct datasheet *ds, size_t column, int new_width,
+                         void (*resize_cb) (const union value *,
+                                            union value *, void *aux),
+                         void *resize_cb_aux)
+{
+  /* XXX needs a test. */
+  struct column old_col;
+  struct column *col;
+  int old_width;
+
+  assert (column < datasheet_get_n_columns (ds));
+
+  col = &ds->columns[column];
+  old_col = *col;
+  old_width = old_col.width;
+
+  if (old_width == new_width)
+    {
+      /* FIXME: for consistency, we should call resize_cb() on
+         each row. */
+    }
+  else if (new_width == -1)
+    {
+      datasheet_delete_columns (ds, column, 1);
+      datasheet_insert_column (ds, NULL, -1, column);
+    }
+  else if (old_width == -1)
+    {
+      union value value;
+      value_init (&value, new_width);
+      value_set_missing (&value, new_width);
+      if (resize_cb != NULL)
+        resize_cb (NULL, &value, resize_cb_aux);
+      datasheet_delete_columns (ds, column, 1);
+      datasheet_insert_column (ds, &value, new_width, column);
+      value_destroy (&value, new_width);
+    }
+  else if (source_has_backing (col->source))
+    {
+      unsigned long int n_rows = axis_get_size (ds->rows);
+      union value src, dst;
+      size_t row;
+
+      source_release_column (col->source, col->byte_ofs, col->width);
+      allocate_column (ds, new_width, col);
+
+      value_init (&src, old_width);
+      value_init (&dst, new_width);
+      for (row = 0; row < n_rows; row++)
+        {
+          if (!source_read (&old_col, row, &src))
+            {
+              /* FIXME: back out col changes. */
+              return false;
+            }
+          resize_cb (&src, &dst, resize_cb_aux);
+          if (!source_write (col, row, &dst))
+            {
+              /* FIXME: back out col changes. */
+              return false;
+            }
+        }
+
+      release_source (ds, old_col.source);
+    }
+  else
+    {
+      struct resize_datasheet_value_aux aux;
+
+      source_release_column (col->source, col->byte_ofs, col->width);
+      allocate_column (ds, new_width, col);
+
+      value_init (&aux.src_value, old_col.width);
+      aux.src_ofs = old_col.byte_ofs;
+      aux.src_width = old_col.width;
+      aux.resize_cb = resize_cb;
+      aux.resize_cb_aux = resize_cb_aux;
+      value_init (&aux.dst_value, new_width);
+      aux.dst_ofs = col->byte_ofs;
+      aux.dst_width = new_width;
+      sparse_xarray_copy (old_col.source->data, col->source->data,
+                          resize_datasheet_value, &aux);
+      value_destroy (&aux.src_value, old_width);
+      value_destroy (&aux.dst_value, new_width);
+
+      release_source (ds, old_col.source);
+    }
+  return true;
 }
 
 /* Retrieves and returns the contents of the given ROW in
@@ -396,10 +548,10 @@ datasheet_move_columns (struct datasheet *ds,
 struct ccase *
 datasheet_get_row (const struct datasheet *ds, casenumber row)
 {
-  size_t column_cnt = datasheet_get_column_cnt (ds);
-  struct ccase *c = case_create (column_cnt);
+  size_t n_columns = datasheet_get_n_columns (ds);
+  struct ccase *c = case_create (datasheet_get_proto (ds));
   if (rw_case ((struct datasheet *) ds, OP_READ,
-               row, 0, column_cnt, case_data_all_rw (c)))
+               row, 0, n_columns, case_data_all_rw (c)))
     return c;
   else
     {
@@ -415,35 +567,36 @@ datasheet_get_row (const struct datasheet *ds, casenumber row)
 bool
 datasheet_put_row (struct datasheet *ds, casenumber row, struct ccase *c)
 {
-  size_t column_cnt = datasheet_get_column_cnt (ds);
-  bool ok = rw_case (ds, OP_WRITE, row, 0, column_cnt,
+  size_t n_columns = datasheet_get_n_columns (ds);
+  bool ok = rw_case (ds, OP_WRITE, row, 0, n_columns,
                      (union value *) case_data_all (c));
   case_unref (c);
   return ok;
 }
 
-/* Stores the values of the WIDTH columns in DS in the given ROW
-   starting at COLUMN in DS into VALUES.  Returns true if
+/* Stores the values of COLUMN in DS in the given ROW in DS into
+   VALUE.  The caller must have already initialized VALUE as a
+   value of the appropriate width (as returned by
+   datasheet_get_column_width (DS, COLUMN)).  Returns true if
    successful, false on I/O error. */
 bool
-datasheet_get_value (const struct datasheet *ds, casenumber row, size_t column,
-                     union value *value, int width)
+datasheet_get_value (const struct datasheet *ds, casenumber row,
+                     size_t column, union value *value)
 {
-  assert ( row >= 0 );
-  return rw_case ((struct datasheet *) ds,
-                  OP_READ, row, column, value_cnt_from_width (width), value);
+  assert (row >= 0);
+  return rw_case ((struct datasheet *) ds, OP_READ, row, column, 1, value);
 }
 
-/* Stores the WIDTH given VALUES into the given ROW in DS
-   starting at COLUMN.  Returns true if successful, false on I/O
-   error.  On failure, the given ROW might be partially modified
-   or corrupted. */
+/* Stores VALUE into DS in the given ROW and COLUMN.  VALUE must
+   have the correct width for COLUMN (as returned by
+   datasheet_get_column_width (DS, COLUMN)).  Returns true if
+   successful, false on I/O error.  On failure, ROW might be
+   partially modified or corrupted. */
 bool
-datasheet_put_value (struct datasheet *ds, casenumber row, size_t column,
-                     const union value *value, int width)
+datasheet_put_value (struct datasheet *ds UNUSED, casenumber row UNUSED,
+                     size_t column UNUSED, const union value *value UNUSED)
 {
-  return rw_case (ds, OP_WRITE, row, column, value_cnt_from_width (width),
-                  (union value *) value);
+  return rw_case (ds, OP_WRITE, row, column, 1, (union value *) value);
 }
 
 /* Inserts the CNT cases at C into datasheet DS just before row
@@ -535,8 +688,8 @@ datasheet_make_reader (struct datasheet *ds)
 {
   struct casereader *reader;
   ds = datasheet_rename (ds);
-  reader = casereader_create_random (datasheet_get_column_cnt (ds),
-                                     datasheet_get_row_cnt (ds),
+  reader = casereader_create_random (datasheet_get_proto (ds),
+                                     datasheet_get_n_rows (ds),
                                      &datasheet_reader_class, ds);
   taint_propagate (datasheet_get_taint (ds), casereader_get_taint (reader));
   return reader;
@@ -548,7 +701,7 @@ datasheet_reader_read (struct casereader *reader UNUSED, void *ds_,
                        casenumber case_idx)
 {
   struct datasheet *ds = ds_;
-  if (case_idx < datasheet_get_row_cnt (ds))
+  if (case_idx < datasheet_get_n_rows (ds))
     {
       struct ccase *c = datasheet_get_row (ds, case_idx);
       if (c == NULL)
@@ -584,50 +737,93 @@ static const struct casereader_random_class datasheet_reader_class =
     datasheet_reader_advance,
   };
 \f
-/* Removes SI from DS's set of sources and destroys its
-   source. */
 static void
-free_source_info (struct datasheet *ds, struct source_info *si)
+allocate_column (struct datasheet *ds, int width, struct column *column)
 {
-  range_map_delete (&ds->sources, &si->column_range);
-  source_destroy (si->source);
-  free (si);
+  caseproto_unref (ds->proto);
+  ds->proto = NULL;
+
+  column->value_ofs = -1;
+  column->width = width;
+  if (width >= 0)
+    {
+      int n_bytes;
+      size_t i;
+
+      n_bytes = width_to_n_bytes (width);
+      for (i = 0; i < ds->n_sources; i++)
+        {
+          column->source = ds->sources[i];
+          column->byte_ofs = source_allocate_column (column->source, n_bytes);
+          if (column->byte_ofs >= 0)
+            return;
+        }
+
+      column->source = source_create_empty (MAX (n_bytes,
+                                                 ds->column_min_alloc));
+      ds->sources = xnrealloc (ds->sources,
+                               ds->n_sources + 1, sizeof *ds->sources);
+      ds->sources[ds->n_sources++] = column->source;
+
+      ds->column_min_alloc = MIN (65536, ds->column_min_alloc * 2);
+
+      column->byte_ofs = source_allocate_column (column->source, n_bytes);
+      assert (column->byte_ofs >= 0);
+    }
+  else
+    {
+      column->source = NULL;
+      column->byte_ofs = -1;
+    }
 }
 
-static struct source_info *
-source_info_from_range_map (struct range_map_node *node)
+static void
+release_source (struct datasheet *ds, struct source *source)
 {
-  return range_map_data (node, struct source_info, column_range);
+  if (source_has_backing (source) && !source_in_use (source))
+    {
+      /* Since only the first source to be added ever
+         has a backing, this source must have index
+         0.  */
+      assert (source == ds->sources[0]);
+      ds->sources[0] = ds->sources[--ds->n_sources];
+      source_destroy (source);
+    }
 }
 
 /* Reads (if OP is OP_READ) or writes (if op is OP_WRITE) the
-   COLUMN_CNT columns starting from column START_COLUMN in row
-   LROW to/from the COLUMN_CNT values in DATA. */
+   N_COLUMNS columns starting from column START_COLUMN in row
+   LROW to/from the N_COLUMNS values in DATA. */
 static bool
 rw_case (struct datasheet *ds, enum rw_op op,
-         casenumber lrow, size_t start_column, size_t column_cnt,
+         casenumber lrow, size_t start_column, size_t n_columns,
          union value data[])
 {
   casenumber prow;
-  size_t lcol;
+  size_t i;
 
-  assert (lrow < datasheet_get_row_cnt (ds));
-  assert (column_cnt <= datasheet_get_column_cnt (ds));
-  assert (start_column + column_cnt <= datasheet_get_column_cnt (ds));
+  assert (lrow < datasheet_get_n_rows (ds));
+  assert (n_columns <= datasheet_get_n_columns (ds));
+  assert (start_column + n_columns <= datasheet_get_n_columns (ds));
 
   prow = axis_map (ds->rows, lrow);
-  for (lcol = start_column; lcol < start_column + column_cnt; lcol++, data++)
+  for (i = 0; i < n_columns; i++)
     {
-      size_t pcol = axis_map (ds->columns, lcol);
-      struct range_map_node *r = range_map_lookup (&ds->sources, pcol);
-      struct source_info *s = source_info_from_range_map (r);
-      size_t pcol_ofs = pcol - range_map_node_get_start (r);
-      if (!(op == OP_READ
-            ? source_read (s->source, prow, pcol_ofs, data, 1)
-            : source_write (s->source, prow, pcol_ofs, data, 1)))
+      struct column *c = &ds->columns[start_column + i];
+      if (c->width >= 0)
         {
-          taint_set_taint (ds->taint);
-          return false;
+          bool ok;
+
+          if (op == OP_READ)
+            ok = source_read (c, prow, &data[i]);
+          else
+            ok = source_write (c, prow, &data[i]);
+
+          if (!ok)
+            {
+              taint_set_taint (ds->taint);
+              return false;
+            }
         }
     }
   return true;
@@ -640,9 +836,7 @@ rw_case (struct datasheet *ds, enum rw_op op,
    axis_map and axis_get_size functions inspect this mapping, and
    the axis_insert, axis_remove, and axis_move functions modify
    it.  Second, it tracks the set of ordinates that are unused
-   and available for reuse.  (Not all unused ordinates are
-   available for reuse: in particular, unused columns that are
-   backed by a casereader are never reused.)  The axis_allocate,
+   and available for reuse.  The axis_allocate,
    axis_make_available, and axis_extend functions affect the set
    of available ordinates. */
 struct axis
@@ -1018,24 +1212,21 @@ check_axis_merged (const struct axis *axis UNUSED)
 }
 \f
 /* A source. */
-struct source
-{
-  size_t columns_used;        /* Number of columns in use by client. */
-  struct sparse_cases *data;  /* Data at top level, atop the backing. */
-  struct casereader *backing; /* Backing casereader (or null). */
-  casenumber backing_rows;    /* Number of rows in backing (if nonnull). */
-};
 
-/* Creates and returns an empty, unbacked source with COLUMN_CNT
-   columns and an initial "columns_used" of 0. */
+/* Creates and returns an empty, unbacked source with N_BYTES
+   bytes per case, none of which are initially in use. */
 static struct source *
-source_create_empty (size_t column_cnt)
+source_create_empty (size_t n_bytes)
 {
   struct source *source = xmalloc (sizeof *source);
-  source->columns_used = 0;
-  source->data = sparse_cases_create (column_cnt);
+  size_t row_size = n_bytes + 4 * sizeof (void *);
+  size_t max_memory_rows = settings_get_workspace () / row_size;
+  source->avail = range_set_create ();
+  range_set_insert (source->avail, 0, n_bytes);
+  source->data = sparse_xarray_create (n_bytes, MAX (max_memory_rows, 4));
   source->backing = NULL;
   source->backing_rows = 0;
+  source->n_used = 0;
   return source;
 }
 
@@ -1044,10 +1235,22 @@ source_create_empty (size_t column_cnt)
 static struct source *
 source_create_casereader (struct casereader *reader)
 {
-  struct source *source
-    = source_create_empty (casereader_get_value_cnt (reader));
+  const struct caseproto *proto = casereader_get_proto (reader);
+  size_t n_bytes = caseproto_to_n_bytes (proto);
+  struct source *source = source_create_empty (n_bytes);
+  size_t n_columns;
+  size_t i;
+
+  range_set_delete (source->avail, 0, n_bytes);
   source->backing = reader;
   source->backing_rows = casereader_count_cases (reader);
+
+  source->n_used = 0;
+  n_columns = caseproto_get_n_widths (proto);
+  for (i = 0; i < n_columns; i++)
+    if (caseproto_get_width (proto, i) >= 0)
+      source->n_used++;
+
   return source;
 }
 
@@ -1060,10 +1263,11 @@ static struct source *
 source_clone (const struct source *old)
 {
   struct source *new = xmalloc (sizeof *new);
-  new->columns_used = old->columns_used;
-  new->data = sparse_cases_clone (old->data);
+  new->avail = range_set_clone (old->avail, NULL);
+  new->data = sparse_xarray_clone (old->data);
   new->backing = old->backing != NULL ? casereader_clone (old->backing) : NULL;
   new->backing_rows = old->backing_rows;
+  new->n_used = old->n_used;
   if (new->data == NULL)
     {
       source_destroy (new);
@@ -1072,23 +1276,28 @@ source_clone (const struct source *old)
   return new;
 }
 
-/* Increases the columns_used count of SOURCE by DELTA.
-   The new value must not exceed SOURCE's number of columns. */
-static void
-source_increase_use (struct source *source, size_t delta)
+static int
+source_allocate_column (struct source *source, int width)
 {
-  source->columns_used += delta;
-  assert (source->columns_used <= sparse_cases_get_value_cnt (source->data));
+  unsigned long int start;
+  int n_bytes;
+
+  assert (width >= 0);
+  n_bytes = width_to_n_bytes (width);
+  if (source->backing == NULL
+      && range_set_allocate_fully (source->avail, n_bytes, &start))
+    return start;
+  else
+    return -1;
 }
 
-/* Decreases the columns_used count of SOURCE by DELTA.
-   This must not attempt to decrease the columns_used count below
-   zero. */
 static void
-source_decrease_use (struct source *source, size_t delta)
+source_release_column (struct source *source, int ofs, int width)
 {
-  assert (delta <= source->columns_used);
-  source->columns_used -= delta;
+  assert (width >= 0);
+  range_set_insert (source->avail, ofs, width_to_n_bytes (width));
+  if (source->backing != NULL)
+    source->n_used--;
 }
 
 /* Returns true if SOURCE has any columns in use,
@@ -1096,7 +1305,7 @@ source_decrease_use (struct source *source, size_t delta)
 static bool
 source_in_use (const struct source *source)
 {
-  return source->columns_used > 0;
+  return source->n_used > 0;
 }
 
 /* Destroys SOURCE and its data and backing, if any. */
@@ -1105,7 +1314,8 @@ source_destroy (struct source *source)
 {
   if (source != NULL)
     {
-      sparse_cases_destroy (source->data);
+      range_set_destroy (source->avail);
+      sparse_xarray_destroy (source->data);
       casereader_destroy (source->backing);
       free (source);
     }
@@ -1114,88 +1324,99 @@ source_destroy (struct source *source)
 /* Returns the number of rows in SOURCE's backing casereader
    (SOURCE must have a backing casereader). */
 static casenumber
-source_get_backing_row_cnt (const struct source *source)
+source_get_backing_n_rows (const struct source *source)
 {
   assert (source_has_backing (source));
   return source->backing_rows;
 }
 
-/* Returns the number of columns in SOURCE. */
-static size_t
-source_get_column_cnt (const struct source *source)
-{
-  return sparse_cases_get_value_cnt (source->data);
-}
+/* Reads the given COLUMN from SOURCE in the given ROW, into
+   VALUE.  Returns true if successful, false on I/O error.
 
-/* Reads VALUE_CNT columns from SOURCE in the given ROW, starting
-   from COLUMN, into VALUES.  Returns true if successful, false
-   on I/O error. */
+   The caller must have initialized VALUE with the proper
+   width. */
 static bool
-source_read (const struct source *source,
-             casenumber row, size_t column,
-             union value values[], size_t value_cnt)
+source_read (const struct column *column, casenumber row, union value *value)
 {
-  if (source->backing == NULL || sparse_cases_contains_row (source->data, row))
-    return sparse_cases_read (source->data, row, column, values, value_cnt);
+  struct source *source = column->source;
+
+  assert (column->width >= 0);
+  if (source->backing == NULL
+      || sparse_xarray_contains_row (source->data, row))
+    return sparse_xarray_read (source->data, row, column->byte_ofs,
+                               width_to_n_bytes (column->width),
+                               value_to_data (value, column->width));
   else
     {
       struct ccase *c = casereader_peek (source->backing, row);
       bool ok = c != NULL;
       if (ok)
         {
-          case_copy_out (c, column, values, value_cnt);
+          value_copy (value, case_data_idx (c, column->value_ofs),
+                      column->width);
           case_unref (c);
         }
       return ok;
     }
 }
 
-/* Writes the VALUE_CNT values in VALUES to SOURCE in the given
-   ROW, starting at ROW.  Returns true if successful, false on
-   I/O error.  On error, the row's data may be completely or
-   partially corrupted, both inside and outside the region to be
-   written.  */
 static bool
-source_write (struct source *source,
-              casenumber row, size_t column,
-              const union value values[], size_t value_cnt)
+copy_case_into_source (struct source *source, struct ccase *c, casenumber row)
 {
-  size_t column_cnt = sparse_cases_get_value_cnt (source->data);
-  bool ok;
+  const struct caseproto *proto = casereader_get_proto (source->backing);
+  size_t n_widths = caseproto_get_n_widths (proto);
+  size_t ofs;
+  size_t i;
 
-  if (source->backing == NULL
-      || (column == 0 && value_cnt == column_cnt)
-      || sparse_cases_contains_row (source->data, row))
-    ok = sparse_cases_write (source->data, row, column, values, value_cnt);
-  else
+  ofs = 0;
+  for (i = 0; i < n_widths; i++)
     {
-      struct ccase *c;
-
-      if (row < source->backing_rows)
-        c = case_unshare (casereader_peek (source->backing, row));
-      else
+      int width = caseproto_get_width (proto, i);
+      if (width >= 0)
         {
-          /* It's not one of the backed rows.  Ideally, this
-             should never happen: we'd always be writing the full
-             contents of new, unbacked rows in a single call to
-             this function, so that the first case above would
-             trigger.  But that's a little difficult at higher
-             levels, so that we in fact usually write the full
-             contents of new, unbacked rows in multiple calls to
-             this function.  Make this work. */
-          c = case_create (column_cnt);
+          int n_bytes = width_to_n_bytes (width);
+          if (!sparse_xarray_write (source->data, row, ofs, n_bytes,
+                                    value_to_data (case_data_idx (c, i),
+                                                   width)))
+            return false;
+          ofs += n_bytes;
         }
-      ok = c != NULL;
+    }
+  return true;
+}
 
-      if (ok)
-        {
-          case_copy_in (c, column, values, value_cnt);
-          ok = sparse_cases_write (source->data, row, 0,
-                                   case_data_all (c), column_cnt);
-          case_unref (c);
-        }
+/* Writes VALUE to SOURCE in the given ROW and COLUMN.  Returns
+   true if successful, false on I/O error.  On error, the row's
+   data may be completely or partially corrupted, both inside and
+   outside the region to be written.  */
+static bool
+source_write (const struct column *column, casenumber row,
+              const union value *value)
+{
+  struct source *source = column->source;
+  struct casereader *backing = source->backing;
+
+  assert (column->width >= 0);
+  if (backing != NULL
+      && !sparse_xarray_contains_row (source->data, row)
+      && row < source->backing_rows)
+    {
+      struct ccase *c;
+      bool ok;
+
+      c = casereader_peek (backing, row);
+      if (c == NULL)
+        return false;
+
+      ok = copy_case_into_source (source, c, row);
+      case_unref (c);
+      if (!ok)
+        return false;
     }
-  return ok;
+
+  return sparse_xarray_write (source->data, row, column->byte_ofs,
+                              width_to_n_bytes (column->width),
+                              value_to_data (value, column->width));
 }
 
 /* Within SOURCE, which must not have a backing casereader,
@@ -1205,17 +1426,20 @@ source_write (struct source *source,
    false if an I/O error occurs.
 
    We don't support backing != NULL because (1) it's harder and
-   (2) source_write_columns is only called by
-   datasheet_insert_columns, which doesn't reuse columns from
+   (2) this function is only called by
+   datasheet_insert_column, which doesn't reuse columns from
    sources that are backed by casereaders. */
 static bool
-source_write_columns (struct source *source, size_t start_column,
-                      const union value values[], size_t value_cnt)
+source_write_column (struct column *column, const union value *value)
 {
-  assert (source->backing == NULL);
+  int width = column->width;
+
+  assert (column->source->backing == NULL);
+  assert (width >= 0);
 
-  return sparse_cases_write_columns (source->data, start_column,
-                                     values, value_cnt);
+  return sparse_xarray_write_columns (column->source->data, column->byte_ofs,
+                                      width_to_n_bytes (width),
+                                      value_to_data (value, width));
 }
 
 /* Returns true if SOURCE has a backing casereader, false
@@ -1228,11 +1452,16 @@ source_has_backing (const struct source *source)
 \f
 /* Datasheet model checker test driver. */
 
-/* Maximum size of datasheet supported for model checking
-   purposes. */
-#define MAX_ROWS 5
-#define MAX_COLS 5
+static int
+get_source_index (const struct datasheet *ds, const struct source *source)
+{
+  size_t i;
 
+  for (i = 0; i < ds->n_sources; i++)
+    if (ds->sources[i] == source)
+      return i;
+  NOT_REACHED ();
+}
 
 /* Clones the structure and contents of ODS into a new datasheet,
    and returns the new datasheet. */
@@ -1240,28 +1469,30 @@ struct datasheet *
 clone_datasheet (const struct datasheet *ods)
 {
   struct datasheet *ds;
-  struct range_map_node *r;
+  size_t i;
 
   ds = xmalloc (sizeof *ds);
-  ds->columns = axis_clone (ods->columns);
-  ds->rows = axis_clone (ods->rows);
-  range_map_init (&ds->sources);
-  for (r = range_map_first (&ods->sources); r != NULL;
-       r = range_map_next (&ods->sources, r))
-    {
-      const struct source_info *osi = source_info_from_range_map (r);
-      struct source_info *si = xmalloc (sizeof *si);
-      si->source = source_clone (osi->source);
-      range_map_insert (&ds->sources, range_map_node_get_start (r),
-                        range_map_node_get_width (r), &si->column_range);
-    }
+
+  ds->sources = xmalloc (ods->n_sources * sizeof *ds->sources);
+  for (i = 0; i < ods->n_sources; i++)
+    ds->sources[i] = source_clone (ods->sources[i]);
+  ds->n_sources = ods->n_sources;
+
+  ds->proto = ods->proto != NULL ? caseproto_ref (ods->proto) : NULL;
+  ds->columns = xmemdup (ods->columns, ods->n_columns * sizeof *ods->columns);
+  for (i = 0; i < ods->n_columns; i++)
+    ds->columns[i].source
+      = ds->sources[get_source_index (ods, ods->columns[i].source)];
+  ds->n_columns = ods->n_columns;
   ds->column_min_alloc = ods->column_min_alloc;
+
+  ds->rows = axis_clone (ods->rows);
+
   ds->taint = taint_create ();
 
   return ds;
 }
 
-
 /* Hashes the structure of datasheet DS and returns the hash.
    We use MD4 because it is much faster than MD5 or SHA-1 but its
    collision resistance is just as good. */
@@ -1270,22 +1501,20 @@ hash_datasheet (const struct datasheet *ds)
 {
   unsigned int hash[DIV_RND_UP (20, sizeof (unsigned int))];
   struct md4_ctx ctx;
-  struct range_map_node *r;
+  size_t i;
 
   md4_init_ctx (&ctx);
-  axis_hash (ds->columns, &ctx);
-  axis_hash (ds->rows, &ctx);
-  for (r = range_map_first (&ds->sources); r != NULL;
-       r = range_map_next (&ds->sources, r))
+  for (i = 0; i < ds->n_columns; i++)
     {
-      unsigned long int start = range_map_node_get_start (r);
-      unsigned long int end = range_map_node_get_end (r);
-      md4_process_bytes (&start, sizeof start, &ctx);
-      md4_process_bytes (&end, sizeof end, &ctx);
+      const struct column *column = &ds->columns[i];
+      int source_n_bytes = sparse_xarray_get_n_columns (column->source->data);
+      md4_process_bytes (&source_n_bytes, sizeof source_n_bytes, &ctx);
+      /*md4_process_bytes (&column->byte_ofs, sizeof column->byte_ofs, &ctx);*/
+      md4_process_bytes (&column->value_ofs, sizeof column->value_ofs, &ctx);
+      md4_process_bytes (&column->width, sizeof column->width, &ctx);
     }
-  md4_process_bytes (&ds->column_min_alloc, sizeof ds->column_min_alloc,
-                    &ctx);
+  axis_hash (ds->rows, &ctx);
+  md4_process_bytes (&ds->column_min_alloc, sizeof ds->column_min_alloc, &ctx);
   md4_finish_ctx (&ctx, hash);
   return hash[0];
 }
-