better tests
[pspp] / src / data / datasheet.c
1 /* PSPP - a program for statistical analysis.
2    Copyright (C) 2007, 2009, 2010, 2011, 2012, 2013 Free Software Foundation, Inc.
3
4    This program is free software: you can redistribute it and/or modify
5    it under the terms of the GNU General Public License as published by
6    the Free Software Foundation, either version 3 of the License, or
7    (at your option) any later version.
8
9    This program is distributed in the hope that it will be useful,
10    but WITHOUT ANY WARRANTY; without even the implied warranty of
11    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12    GNU General Public License for more details.
13
14    You should have received a copy of the GNU General Public License
15    along with this program.  If not, see <http://www.gnu.org/licenses/>. */
16
17 #include <config.h>
18
19 #include "data/datasheet.h"
20
21 #include <stdlib.h>
22 #include <string.h>
23
24 #include "data/casereader-provider.h"
25 #include "data/casereader.h"
26 #include "data/casewriter.h"
27 #include "data/lazy-casereader.h"
28 #include "data/settings.h"
29 #include "libpspp/array.h"
30 #include "libpspp/assertion.h"
31 #include "libpspp/misc.h"
32 #include "libpspp/range-map.h"
33 #include "libpspp/range-set.h"
34 #include "libpspp/sparse-xarray.h"
35 #include "libpspp/taint.h"
36 #include "libpspp/tower.h"
37
38 #include "gl/minmax.h"
39 #include "gl/md4.h"
40 #include "gl/xalloc.h"
41
42 struct column;
43
44 static struct axis *axis_create (void);
45 static struct axis *axis_clone (const struct axis *);
46 static void axis_destroy (struct axis *);
47
48 static void axis_hash (const struct axis *, struct md4_ctx *);
49
50 static bool axis_allocate (struct axis *, unsigned long int request,
51                            unsigned long int *start,
52                            unsigned long int *width);
53 static void axis_make_available (struct axis *,
54                                  unsigned long int start,
55                                  unsigned long int width);
56 static unsigned long int axis_extend (struct axis *, unsigned long int width);
57
58 static unsigned long int axis_map (const struct axis *, unsigned long log_pos);
59
60 static unsigned long axis_get_size (const struct axis *);
61 static void axis_insert (struct axis *,
62                          unsigned long int log_start,
63                          unsigned long int phy_start,
64                          unsigned long int cnt);
65 static void axis_remove (struct axis *,
66                          unsigned long int start, unsigned long int cnt);
67 static void axis_move (struct axis *,
68                        unsigned long int old_start,
69                        unsigned long int new_start,
70                        unsigned long int cnt);
71
72 static struct source *source_create_empty (size_t n_bytes);
73 static struct source *source_create_casereader (struct casereader *);
74 static struct source *source_clone (const struct source *);
75 static void source_destroy (struct source *);
76
77 static casenumber source_get_backing_n_rows (const struct source *);
78
79 static int source_allocate_column (struct source *, int width);
80 static void source_release_column (struct source *, int ofs, int width);
81 static bool source_in_use (const struct source *);
82
83 static bool source_read (const struct column *, casenumber row, union value *,
84                          size_t n);
85 static bool source_write (const struct column *, casenumber row,
86                           const union value *, size_t n);
87 static bool source_write_column (struct column *, const union value *);
88 static bool source_has_backing (const struct source *);
89
90 static int get_source_index (const struct datasheet *ds, const struct source *source);
91
92 /* A datasheet is internally composed from a set of data files,
93    called "sources".  The sources that make up a datasheet must
94    have the same number of rows (cases), but their numbers of
95    columns (variables) may vary.
96
97    A datasheet's external view is produced by mapping (permuting
98    and selecting) its internal data.  Thus, we can rearrange or
99    delete rows or columns simply by modifying the mapping.  We
100    add rows by adding rows to each source and to the row mapping.
101    We add columns by adding a new source, then adding that source
102    to the column mapping.
103
104    Each source in a datasheet can be a casereader or a
105    sparse_xarray.  Casereaders are read-only, so when sources
106    made from casereaders need to be modified, it is done
107    "virtually" through being overlaid by a sparse_xarray. */
108 struct source
109   {
110     struct range_set *avail;    /* Free bytes are set to 1s. */
111     struct sparse_xarray *data; /* Data at top level, atop the backing. */
112     struct casereader *backing; /* Backing casereader (or null). */
113     casenumber backing_rows;    /* Number of rows in backing (if backed). */
114     size_t n_used;              /* Number of column in use (if backed). */
115   };
116
117 /* A logical column. */
118 struct column
119   {
120     struct source *source;      /* Source of the underlying physical column. */
121     int value_ofs;              /* If 'source' has a backing casereader,
122                                    column's value offset in its cases. */
123     int byte_ofs;               /* Byte offset in source's sparse_xarray. */
124     int width;                  /* 0=numeric, otherwise string width. */
125   };
126
127 /* A datasheet. */
128 struct datasheet
129   {
130     /* Data sources. */
131     struct source **sources;    /* Sources, in no particular order. */
132     size_t n_sources;           /* Number of sources. */
133
134     /* Columns. */
135     struct caseproto *proto;    /* Prototype for rows (initialized lazily). */
136     struct column *columns;     /* Logical to physical column mapping. */
137     size_t n_columns;           /* Number of logical columns. */
138     unsigned column_min_alloc;  /* Min. # of columns to put in a new source. */
139
140     /* Rows. */
141     struct axis *rows;          /* Logical to physical row mapping. */
142
143     /* Tainting. */
144     struct taint *taint;        /* Indicates corrupted data. */
145   };
146
147 /* Is this operation a read or a write? */
148 enum rw_op
149   {
150     OP_READ,
151     OP_WRITE
152   };
153
154 static void allocate_column (struct datasheet *, int width, struct column *);
155 static void release_source (struct datasheet *, struct source *);
156 static bool rw_case (struct datasheet *ds, enum rw_op op,
157                      casenumber lrow, size_t start_column, size_t n_columns,
158                      union value data[]);
159
160 /* Returns the number of bytes needed to store a value with the
161    given WIDTH on disk. */
162 static size_t
163 width_to_n_bytes (int width)
164 {
165   return width == 0 ? sizeof (double) : width;
166 }
167
168 /* Returns the address of the data in VALUE (for reading or
169    writing to/from disk).  VALUE must have the given WIDTH. */
170 static void *
171 value_to_data (const union value *value_, int width)
172 {
173   union value *value = (union value *) value_;
174   assert (sizeof value->f == sizeof (double));
175   if (width == 0)
176     return &value->f;
177   else
178     return value->s;
179 }
180
181 /* Returns the number of bytes needed to store all the values in
182    PROTO on disk. */
183 static size_t
184 caseproto_to_n_bytes (const struct caseproto *proto)
185 {
186   size_t n_bytes;
187   size_t i;
188
189   n_bytes = 0;
190   for (i = 0; i < caseproto_get_n_widths (proto); i++)
191     {
192       int width = caseproto_get_width (proto, i);
193       if (width >= 0)
194         n_bytes += width_to_n_bytes (width);
195     }
196   return n_bytes;
197 }
198
199 /* Creates and returns a new datasheet.
200
201    If READER is nonnull, then the datasheet initially contains
202    the contents of READER. */
203 struct datasheet *
204 datasheet_create (struct casereader *reader)
205 {
206   struct datasheet *ds = xmalloc (sizeof *ds);
207   ds->sources = NULL;
208   ds->n_sources = 0;
209   ds->proto = NULL;
210   ds->columns = NULL;
211   ds->n_columns = 0;
212   ds->column_min_alloc = 8;
213   ds->rows = axis_create ();
214   ds->taint = taint_create ();
215
216   if (reader != NULL)
217     {
218       casenumber n_rows;
219       size_t byte_ofs;
220       size_t i;
221
222       taint_propagate (casereader_get_taint (reader), ds->taint);
223
224       ds->proto = caseproto_ref (casereader_get_proto (reader));
225
226       ds->sources = xmalloc (sizeof *ds->sources);
227       ds->sources[0] = source_create_casereader (reader);
228       ds->n_sources = 1;
229
230       ds->n_columns = caseproto_get_n_widths (ds->proto);
231       ds->columns = xnmalloc (ds->n_columns, sizeof *ds->columns);
232       byte_ofs = 0;
233       for (i = 0; i < ds->n_columns; i++)
234         {
235           struct column *column = &ds->columns[i];
236           int width = caseproto_get_width (ds->proto, i);
237           column->source = ds->sources[0];
238           column->width = width;
239           if (width >= 0)
240             {
241               column->value_ofs = i;
242               column->byte_ofs = byte_ofs;
243               byte_ofs += width_to_n_bytes (column->width);
244             }
245         }
246
247       n_rows = source_get_backing_n_rows (ds->sources[0]);
248       if (n_rows > 0)
249         axis_insert (ds->rows, 0, axis_extend (ds->rows, n_rows), n_rows);
250     }
251
252   return ds;
253 }
254
255 /* Destroys datasheet DS. */
256 void
257 datasheet_destroy (struct datasheet *ds)
258 {
259   size_t i;
260
261   if (ds == NULL)
262     return;
263
264   for (i = 0; i < ds->n_sources; i++)
265     source_destroy (ds->sources[i]);
266   free (ds->sources);
267   caseproto_unref (ds->proto);
268   free (ds->columns);
269   axis_destroy (ds->rows);
270   taint_destroy (ds->taint);
271   free (ds);
272 }
273
274 /* Returns the prototype for the cases in DS.  The caller must
275    not unref the returned prototype. */
276 const struct caseproto *
277 datasheet_get_proto (const struct datasheet *ds_)
278 {
279   struct datasheet *ds = CONST_CAST (struct datasheet *, ds_);
280   if (ds->proto == NULL)
281     {
282       size_t i;
283
284       ds->proto = caseproto_create ();
285       for (i = 0; i < ds->n_columns; i++)
286         ds->proto = caseproto_add_width (ds->proto, ds->columns[i].width);
287     }
288   return ds->proto;
289 }
290
291 /* Returns the width of the given COLUMN within DS.
292    COLUMN must be less than the number of columns in DS. */
293 int
294 datasheet_get_column_width (const struct datasheet *ds, size_t column)
295 {
296   assert (column < datasheet_get_n_columns (ds));
297   return ds->columns[column].width;
298 }
299
300 /* Moves datasheet DS to a new location in memory, and returns
301    the new location.  Afterward, the datasheet must not be
302    accessed at its former location.
303
304    This function is useful for ensuring that all references to a
305    datasheet have been dropped, especially in conjunction with
306    tools like Valgrind. */
307 struct datasheet *
308 datasheet_rename (struct datasheet *ds)
309 {
310   struct datasheet *new = xmemdup (ds, sizeof *ds);
311   free (ds);
312   return new;
313 }
314
315 /* Returns true if datasheet DS is tainted.
316    A datasheet is tainted by an I/O error or by taint
317    propagation to the datasheet. */
318 bool
319 datasheet_error (const struct datasheet *ds)
320 {
321   return taint_is_tainted (ds->taint);
322 }
323
324 /* Marks datasheet DS tainted. */
325 void
326 datasheet_force_error (struct datasheet *ds)
327 {
328   taint_set_taint (ds->taint);
329 }
330
331 /* Returns datasheet DS's taint object. */
332 const struct taint *
333 datasheet_get_taint (const struct datasheet *ds)
334 {
335   return ds->taint;
336 }
337
338 /* Returns the number of rows in DS. */
339 casenumber
340 datasheet_get_n_rows (const struct datasheet *ds)
341 {
342   return axis_get_size (ds->rows);
343 }
344
345 /* Returns the number of columns in DS. */
346 size_t
347 datasheet_get_n_columns (const struct datasheet *ds)
348 {
349   return ds->n_columns;
350 }
351
352 /* Inserts a column of the given WIDTH into datasheet DS just
353    before column BEFORE.  Initializes the contents of each row in
354    the inserted column to VALUE (which must have width WIDTH).
355
356    Returns true if successful, false on failure.  In case of
357    failure, the datasheet is unchanged. */
358 bool
359 datasheet_insert_column (struct datasheet *ds,
360                          const union value *value, int width, size_t before)
361 {
362   struct column *col;
363
364   assert (before <= ds->n_columns);
365
366   ds->columns = xnrealloc (ds->columns,
367                            ds->n_columns + 1, sizeof *ds->columns);
368   insert_element (ds->columns, ds->n_columns, sizeof *ds->columns, before);
369   col = &ds->columns[before];
370   ds->n_columns++;
371
372   allocate_column (ds, width, col);
373
374   if (width >= 0 && !source_write_column (col, value))
375     {
376       datasheet_delete_columns (ds, before, 1);
377       taint_set_taint (ds->taint);
378       return false;
379     }
380
381   return true;
382 }
383
384 /* Deletes the N columns in DS starting from column START. */
385 void
386 datasheet_delete_columns (struct datasheet *ds, size_t start, size_t n)
387 {
388   assert (start + n <= ds->n_columns);
389
390   if (n > 0)
391     {
392       size_t i;
393
394       for (i = start; i < start + n; i++)
395         {
396           struct column *column = &ds->columns[i];
397           struct source *source = column->source;
398           source_release_column (source, column->byte_ofs, column->width);
399           release_source (ds, source);
400         }
401
402       remove_range (ds->columns, ds->n_columns, sizeof *ds->columns, start, n);
403       ds->n_columns -= n;
404
405       caseproto_unref (ds->proto);
406       ds->proto = NULL;
407     }
408 }
409
410 /* Moves the N columns in DS starting at position OLD_START so
411    that they then start at position NEW_START.  Equivalent to
412    deleting the column rows, then inserting them at what becomes
413    position NEW_START after the deletion. */
414 void
415 datasheet_move_columns (struct datasheet *ds,
416                         size_t old_start, size_t new_start,
417                         size_t n)
418 {
419   assert (old_start + n <= ds->n_columns);
420   assert (new_start + n <= ds->n_columns);
421
422   move_range (ds->columns, ds->n_columns, sizeof *ds->columns,
423               old_start, new_start, n);
424
425   caseproto_unref (ds->proto);
426   ds->proto = NULL;
427 }
428
429 struct resize_datasheet_value_aux
430   {
431     union value src_value;
432     size_t src_ofs;
433     int src_width;
434
435     void (*resize_cb) (const union value *, union value *, const void *aux);
436     const void *resize_cb_aux;
437
438     union value dst_value;
439     size_t dst_ofs;
440     int dst_width;
441   };
442
443 static bool
444 resize_datasheet_value (const void *src, void *dst, void *aux_)
445 {
446   struct resize_datasheet_value_aux *aux = aux_;
447
448   memcpy (value_to_data (&aux->src_value, aux->src_width),
449           (uint8_t *) src + aux->src_ofs,
450           width_to_n_bytes (aux->src_width));
451
452   aux->resize_cb (&aux->src_value, &aux->dst_value, aux->resize_cb_aux);
453
454   memcpy ((uint8_t *) dst + aux->dst_ofs,
455           value_to_data (&aux->dst_value, aux->dst_width),
456           width_to_n_bytes (aux->dst_width));
457
458   return true;
459 }
460
461 bool
462 datasheet_resize_column (struct datasheet *ds, size_t column, int new_width,
463                          void (*resize_cb) (const union value *,
464                                             union value *, const void *aux),
465                          const void *resize_cb_aux)
466 {
467   struct column old_col;
468   struct column *col;
469   int old_width;
470
471   assert (column < datasheet_get_n_columns (ds));
472
473   col = &ds->columns[column];
474   old_col = *col;
475   old_width = old_col.width;
476
477   if (new_width == -1)
478     {
479       if (old_width != -1)
480         {
481           datasheet_delete_columns (ds, column, 1);
482           datasheet_insert_column (ds, NULL, -1, column);
483         }
484     }
485   else if (old_width == -1)
486     {
487       union value value;
488       value_init (&value, new_width);
489       value_set_missing (&value, new_width);
490       if (resize_cb != NULL)
491         resize_cb (NULL, &value, resize_cb_aux);
492       datasheet_delete_columns (ds, column, 1);
493       datasheet_insert_column (ds, &value, new_width, column);
494       value_destroy (&value, new_width);
495     }
496   else if (source_has_backing (col->source))
497     {
498       unsigned long int n_rows = axis_get_size (ds->rows);
499       unsigned long int lrow;
500       union value src, dst;
501
502       source_release_column (col->source, col->byte_ofs, col->width);
503       allocate_column (ds, new_width, col);
504
505       value_init (&src, old_width);
506       value_init (&dst, new_width);
507       for (lrow = 0; lrow < n_rows; lrow++)
508         {
509           unsigned long int prow = axis_map (ds->rows, lrow);
510           if (!source_read (&old_col, prow, &src, 1))
511             {
512               /* FIXME: back out col changes. */
513               break;
514             }
515           resize_cb (&src, &dst, resize_cb_aux);
516           if (!source_write (col, prow, &dst, 1))
517             {
518               /* FIXME: back out col changes. */
519               break;
520             }
521         }
522       value_destroy (&src, old_width);
523       value_destroy (&dst, new_width);
524       if (lrow < n_rows)
525         return false;
526
527       release_source (ds, old_col.source);
528     }
529   else
530     {
531       struct resize_datasheet_value_aux aux;
532
533       source_release_column (col->source, col->byte_ofs, col->width);
534       allocate_column (ds, new_width, col);
535
536       value_init (&aux.src_value, old_col.width);
537       aux.src_ofs = old_col.byte_ofs;
538       aux.src_width = old_col.width;
539       aux.resize_cb = resize_cb;
540       aux.resize_cb_aux = resize_cb_aux;
541       value_init (&aux.dst_value, new_width);
542       aux.dst_ofs = col->byte_ofs;
543       aux.dst_width = new_width;
544       sparse_xarray_copy (old_col.source->data, col->source->data,
545                           resize_datasheet_value, &aux);
546       value_destroy (&aux.src_value, old_width);
547       value_destroy (&aux.dst_value, new_width);
548
549       release_source (ds, old_col.source);
550     }
551   return true;
552 }
553
554 /* Retrieves and returns the contents of the given ROW in
555    datasheet DS.  The caller owns the returned case and must
556    unref it when it is no longer needed.  Returns a null pointer
557    on I/O error. */
558 struct ccase *
559 datasheet_get_row (const struct datasheet *ds, casenumber row)
560 {
561   size_t n_columns = datasheet_get_n_columns (ds);
562   struct ccase *c = case_create (datasheet_get_proto (ds));
563   if (rw_case (CONST_CAST (struct datasheet *, ds), OP_READ,
564                row, 0, n_columns, case_data_all_rw (c)))
565     return c;
566   else
567     {
568       case_unref (c);
569       return NULL;
570     }
571 }
572
573 /* Stores the contents of case C, which is destroyed, into the
574    given ROW in DS.  Returns true on success, false on I/O error.
575    On failure, the given ROW might be partially modified or
576    corrupted. */
577 bool
578 datasheet_put_row (struct datasheet *ds, casenumber row, struct ccase *c)
579 {
580   size_t n_columns = datasheet_get_n_columns (ds);
581   bool ok = rw_case (ds, OP_WRITE, row, 0, n_columns,
582                      (union value *) case_data_all (c));
583   case_unref (c);
584   return ok;
585 }
586
587 /* Stores the values of COLUMN in DS in the given ROW in DS into
588    VALUE.  The caller must have already initialized VALUE as a
589    value of the appropriate width (as returned by
590    datasheet_get_column_width (DS, COLUMN)).  Returns true if
591    successful, false on I/O error. */
592 bool
593 datasheet_get_value (const struct datasheet *ds, casenumber row,
594                      size_t column, union value *value)
595 {
596   assert (row >= 0);
597   return rw_case (CONST_CAST (struct datasheet *, ds), OP_READ,
598                   row, column, 1, value);
599 }
600
601 /* Stores VALUE into DS in the given ROW and COLUMN.  VALUE must
602    have the correct width for COLUMN (as returned by
603    datasheet_get_column_width (DS, COLUMN)).  Returns true if
604    successful, false on I/O error.  On failure, ROW might be
605    partially modified or corrupted. */
606 bool
607 datasheet_put_value (struct datasheet *ds, casenumber row,
608                      size_t column, const union value *value)
609 {
610   return rw_case (ds, OP_WRITE, row, column, 1, (union value *) value);
611 }
612
613 /* Inserts the CNT cases at C into datasheet DS just before row
614    BEFORE.  Returns true if successful, false on I/O error.  On
615    failure, datasheet DS is not modified.
616
617    Regardless of success, this function unrefs all of the cases
618    in C. */
619 bool
620 datasheet_insert_rows (struct datasheet *ds,
621                        casenumber before, struct ccase *c[],
622                        casenumber cnt)
623 {
624   casenumber added = 0;
625   while (cnt > 0)
626     {
627       unsigned long first_phy;
628       unsigned long n_phys;
629       unsigned long i;
630
631       /* Allocate physical rows from the pool of available
632          rows. */
633       if (!axis_allocate (ds->rows, cnt, &first_phy, &n_phys))
634         {
635           /* No rows were available.  Extend the row axis to make
636              some new ones available. */
637           n_phys = cnt;
638           first_phy = axis_extend (ds->rows, cnt);
639         }
640
641       /* Insert the new rows into the row mapping. */
642       axis_insert (ds->rows, before, first_phy, n_phys);
643
644       /* Initialize the new rows. */
645       for (i = 0; i < n_phys; i++)
646         if (!datasheet_put_row (ds, before + i, c[i]))
647           {
648             while (++i < cnt)
649               case_unref (c[i]);
650             datasheet_delete_rows (ds, before - added, n_phys + added);
651             return false;
652           }
653
654       /* Advance. */
655       c += n_phys;
656       cnt -= n_phys;
657       before += n_phys;
658       added += n_phys;
659     }
660   return true;
661 }
662
663 /* Deletes the CNT rows in DS starting from row FIRST. */
664 void
665 datasheet_delete_rows (struct datasheet *ds,
666                        casenumber first, casenumber cnt)
667 {
668   size_t lrow;
669
670   /* Free up rows for reuse.
671      FIXME: optimize. */
672   for (lrow = first; lrow < first + cnt; lrow++)
673     axis_make_available (ds->rows, axis_map (ds->rows, lrow), 1);
674
675   /* Remove rows from logical-to-physical mapping. */
676   axis_remove (ds->rows, first, cnt);
677 }
678
679 /* Moves the CNT rows in DS starting at position OLD_START so
680    that they then start at position NEW_START.  Equivalent to
681    deleting the given rows, then inserting them at what becomes
682    position NEW_START after the deletion. */
683 void
684 datasheet_move_rows (struct datasheet *ds,
685                      size_t old_start, size_t new_start,
686                      size_t cnt)
687 {
688   axis_move (ds->rows, old_start, new_start, cnt);
689 }
690 \f
691 static const struct casereader_random_class datasheet_reader_class;
692
693 /* Creates and returns a casereader whose input cases are the
694    rows in datasheet DS.  From the caller's perspective, DS is
695    effectively destroyed by this operation, such that the caller
696    must not reference it again. */
697 struct casereader *
698 datasheet_make_reader (struct datasheet *ds)
699 {
700   struct casereader *reader;
701   ds = datasheet_rename (ds);
702   reader = casereader_create_random (datasheet_get_proto (ds),
703                                      datasheet_get_n_rows (ds),
704                                      &datasheet_reader_class, ds);
705   taint_propagate (datasheet_get_taint (ds), casereader_get_taint (reader));
706   return reader;
707 }
708
709 /* "read" function for the datasheet random casereader. */
710 static struct ccase *
711 datasheet_reader_read (struct casereader *reader UNUSED, void *ds_,
712                        casenumber case_idx)
713 {
714   struct datasheet *ds = ds_;
715   if (case_idx < datasheet_get_n_rows (ds))
716     {
717       struct ccase *c = datasheet_get_row (ds, case_idx);
718       if (c == NULL)
719         taint_set_taint (ds->taint);
720       return c;
721     }
722   else
723     return NULL;
724 }
725
726 /* "destroy" function for the datasheet random casereader. */
727 static void
728 datasheet_reader_destroy (struct casereader *reader UNUSED, void *ds_)
729 {
730   struct datasheet *ds = ds_;
731   datasheet_destroy (ds);
732 }
733
734 /* "advance" function for the datasheet random casereader. */
735 static void
736 datasheet_reader_advance (struct casereader *reader UNUSED, void *ds_,
737                           casenumber n_cases)
738 {
739   struct datasheet *ds = ds_;
740   datasheet_delete_rows (ds, 0, n_cases);
741 }
742
743 /* Random casereader class for a datasheet. */
744 static const struct casereader_random_class datasheet_reader_class =
745   {
746     datasheet_reader_read,
747     datasheet_reader_destroy,
748     datasheet_reader_advance,
749   };
750 \f
751 static void
752 allocate_column (struct datasheet *ds, int width, struct column *column)
753 {
754   caseproto_unref (ds->proto);
755   ds->proto = NULL;
756
757   column->value_ofs = -1;
758   column->width = width;
759   if (width >= 0)
760     {
761       int n_bytes;
762       size_t i;
763
764       n_bytes = width_to_n_bytes (width);
765       for (i = 0; i < ds->n_sources; i++)
766         {
767           column->source = ds->sources[i];
768           column->byte_ofs = source_allocate_column (column->source, n_bytes);
769           if (column->byte_ofs >= 0)
770             return;
771         }
772
773       column->source = source_create_empty (MAX (n_bytes,
774                                                  ds->column_min_alloc));
775       ds->sources = xnrealloc (ds->sources,
776                                ds->n_sources + 1, sizeof *ds->sources);
777       ds->sources[ds->n_sources++] = column->source;
778
779       ds->column_min_alloc = MIN (65536, ds->column_min_alloc * 2);
780
781       column->byte_ofs = source_allocate_column (column->source, n_bytes);
782       assert (column->byte_ofs >= 0);
783     }
784   else
785     {
786       column->source = NULL;
787       column->byte_ofs = -1;
788     }
789 }
790
791 static void
792 release_source (struct datasheet *ds, struct source *source)
793 {
794   if (source_has_backing (source) && !source_in_use (source))
795     {
796       /* Since only the first source to be added ever
797          has a backing, this source must have index
798          0.  */
799       assert (source == ds->sources[0]);
800       ds->sources[0] = ds->sources[--ds->n_sources];
801       source_destroy (source);
802     }
803 }
804
805 /* Reads (if OP is OP_READ) or writes (if op is OP_WRITE) the
806    N_COLUMNS columns starting from column START_COLUMN in row
807    LROW to/from the N_COLUMNS values in DATA. */
808 static bool
809 rw_case (struct datasheet *ds, enum rw_op op,
810          casenumber lrow, size_t start_column, size_t n_columns,
811          union value data[])
812 {
813   struct column *columns = &ds->columns[start_column];
814   casenumber prow;
815   size_t i;
816
817   assert (lrow < datasheet_get_n_rows (ds));
818   assert (n_columns <= datasheet_get_n_columns (ds));
819   assert (start_column + n_columns <= datasheet_get_n_columns (ds));
820
821   prow = axis_map (ds->rows, lrow);
822   for (i = 0; i < n_columns;)
823     {
824       struct source *source = columns[i].source;
825       size_t j;
826       bool ok;
827
828       if (columns[i].width < 0)
829         {
830           i++;
831           continue;
832         }
833
834       for (j = i + 1; j < n_columns; j++)
835         if (columns[j].width < 0 || columns[j].source != source)
836           break;
837
838       if (op == OP_READ)
839         ok = source_read (&columns[i], prow, &data[i], j - i);
840       else
841         ok = source_write (&columns[i], prow, &data[i], j - i);
842
843       if (!ok)
844         {
845           taint_set_taint (ds->taint);
846           return false;
847         }
848
849       i = j;
850     }
851   return true;
852 }
853 \f
854 /* An axis.
855
856    An axis has two functions.  First, it maintains a mapping from
857    logical (client-visible) to physical (storage) ordinates.  The
858    axis_map and axis_get_size functions inspect this mapping, and
859    the axis_insert, axis_remove, and axis_move functions modify
860    it.  Second, it tracks the set of ordinates that are unused
861    and available for reuse.  The axis_allocate,
862    axis_make_available, and axis_extend functions affect the set
863    of available ordinates. */
864 struct axis
865 {
866   struct tower log_to_phy;     /* Map from logical to physical ordinates;
867                                   contains "struct axis_group"s. */
868   struct range_set *available; /* Set of unused, available ordinates. */
869   unsigned long int phy_size;  /* Current physical length of axis. */
870 };
871
872 /* A mapping from logical to physical ordinates. */
873 struct axis_group
874 {
875   struct tower_node logical;  /* Range of logical ordinates. */
876   unsigned long phy_start;    /* First corresponding physical ordinate. */
877 };
878
879 static struct axis_group *axis_group_from_tower_node (struct tower_node *);
880 static struct tower_node *make_axis_group (unsigned long int phy_start);
881 static struct tower_node *split_axis (struct axis *, unsigned long int where);
882 static void merge_axis_nodes (struct axis *, struct tower_node *,
883                               struct tower_node **other_node);
884 static void check_axis_merged (const struct axis *axis UNUSED);
885
886 /* Creates and returns a new, initially empty axis. */
887 static struct axis *
888 axis_create (void)
889 {
890   struct axis *axis = xmalloc (sizeof *axis);
891   tower_init (&axis->log_to_phy);
892   axis->available = range_set_create ();
893   axis->phy_size = 0;
894   return axis;
895 }
896
897 /* Returns a clone of existing axis OLD.
898
899    Currently this is used only by the datasheet model checker
900    driver, but it could be otherwise useful. */
901 static struct axis *
902 axis_clone (const struct axis *old)
903 {
904   const struct tower_node *node;
905   struct axis *new;
906
907   new = xmalloc (sizeof *new);
908   tower_init (&new->log_to_phy);
909   new->available = range_set_clone (old->available, NULL);
910   new->phy_size = old->phy_size;
911
912   for (node = tower_first (&old->log_to_phy); node != NULL;
913        node = tower_next (&old->log_to_phy, node))
914     {
915       unsigned long int size = tower_node_get_size (node);
916       struct axis_group *group = tower_data (node, struct axis_group, logical);
917       tower_insert (&new->log_to_phy, size, make_axis_group (group->phy_start),
918                     NULL);
919     }
920
921   return new;
922 }
923
924 /* Adds the state of AXIS to the MD4 hash context CTX.
925
926    This is only used by the datasheet model checker driver.  It
927    is unlikely to be otherwise useful. */
928 static void
929 axis_hash (const struct axis *axis, struct md4_ctx *ctx)
930 {
931   const struct tower_node *tn;
932   const struct range_set_node *rsn;
933
934   for (tn = tower_first (&axis->log_to_phy); tn != NULL;
935        tn = tower_next (&axis->log_to_phy, tn))
936     {
937       struct axis_group *group = tower_data (tn, struct axis_group, logical);
938       unsigned long int phy_start = group->phy_start;
939       unsigned long int size = tower_node_get_size (tn);
940
941       md4_process_bytes (&phy_start, sizeof phy_start, ctx);
942       md4_process_bytes (&size, sizeof size, ctx);
943     }
944
945   RANGE_SET_FOR_EACH (rsn, axis->available)
946     {
947       unsigned long int start = range_set_node_get_start (rsn);
948       unsigned long int end = range_set_node_get_end (rsn);
949
950       md4_process_bytes (&start, sizeof start, ctx);
951       md4_process_bytes (&end, sizeof end, ctx);
952     }
953
954   md4_process_bytes (&axis->phy_size, sizeof axis->phy_size, ctx);
955 }
956
957 /* Destroys AXIS. */
958 static void
959 axis_destroy (struct axis *axis)
960 {
961   if (axis == NULL)
962     return;
963
964   while (!tower_is_empty (&axis->log_to_phy))
965     {
966       struct tower_node *node = tower_first (&axis->log_to_phy);
967       struct axis_group *group = tower_data (node, struct axis_group,
968                                              logical);
969       tower_delete (&axis->log_to_phy, node);
970       free (group);
971     }
972
973   range_set_destroy (axis->available);
974   free (axis);
975 }
976
977 /* Allocates up to REQUEST contiguous unused and available
978    ordinates from AXIS.  If successful, stores the number
979    obtained into *WIDTH and the ordinate of the first into
980    *START, marks the ordinates as now unavailable return true.
981    On failure, which occurs only if AXIS has no unused and
982    available ordinates, returns false without modifying AXIS. */
983 static bool
984 axis_allocate (struct axis *axis, unsigned long int request,
985                unsigned long int *start, unsigned long int *width)
986 {
987   return range_set_allocate (axis->available, request, start, width);
988 }
989
990 /* Marks the WIDTH contiguous ordinates in AXIS, starting from
991    START, as unused and available. */
992 static void
993 axis_make_available (struct axis *axis,
994                      unsigned long int start, unsigned long int width)
995 {
996   range_set_set1 (axis->available, start, width);
997 }
998
999 /* Extends the total physical length of AXIS by WIDTH and returns
1000    the first ordinate in the new physical region. */
1001 static unsigned long int
1002 axis_extend (struct axis *axis, unsigned long int width)
1003 {
1004   unsigned long int start = axis->phy_size;
1005   axis->phy_size += width;
1006   return start;
1007 }
1008
1009 /* Returns the physical ordinate in AXIS corresponding to logical
1010    ordinate LOG_POS.  LOG_POS must be less than the logical
1011    length of AXIS. */
1012 static unsigned long int
1013 axis_map (const struct axis *axis, unsigned long log_pos)
1014 {
1015   struct tower_node *node;
1016   struct axis_group *group;
1017   unsigned long int group_start;
1018
1019   node = tower_lookup (&axis->log_to_phy, log_pos, &group_start);
1020   group = tower_data (node, struct axis_group, logical);
1021   return group->phy_start + (log_pos - group_start);
1022 }
1023
1024 /* Returns the logical length of AXIS. */
1025 static unsigned long
1026 axis_get_size (const struct axis *axis)
1027 {
1028   return tower_height (&axis->log_to_phy);
1029 }
1030
1031 /* Inserts the CNT contiguous physical ordinates starting at
1032    PHY_START into AXIS's logical-to-physical mapping, starting at
1033    logical position LOG_START. */
1034 static void
1035 axis_insert (struct axis *axis,
1036              unsigned long int log_start, unsigned long int phy_start,
1037              unsigned long int cnt)
1038 {
1039   struct tower_node *before = split_axis (axis, log_start);
1040   struct tower_node *new = make_axis_group (phy_start);
1041   tower_insert (&axis->log_to_phy, cnt, new, before);
1042   merge_axis_nodes (axis, new, NULL);
1043   check_axis_merged (axis);
1044 }
1045
1046 /* Removes CNT ordinates from AXIS's logical-to-physical mapping
1047    starting at logical position START. */
1048 static void
1049 axis_remove (struct axis *axis,
1050              unsigned long int start, unsigned long int cnt)
1051 {
1052   if (cnt > 0)
1053     {
1054       struct tower_node *last = split_axis (axis, start + cnt);
1055       struct tower_node *cur, *next;
1056       for (cur = split_axis (axis, start); cur != last; cur = next)
1057         {
1058           next = tower_delete (&axis->log_to_phy, cur);
1059           free (axis_group_from_tower_node (cur));
1060         }
1061       merge_axis_nodes (axis, last, NULL);
1062       check_axis_merged (axis);
1063     }
1064 }
1065
1066 /* Moves the CNT ordinates in AXIS's logical-to-mapping starting
1067    at logical position OLD_START so that they then start at
1068    position NEW_START. */
1069 static void
1070 axis_move (struct axis *axis,
1071            unsigned long int old_start, unsigned long int new_start,
1072            unsigned long int cnt)
1073 {
1074   if (cnt > 0 && old_start != new_start)
1075     {
1076       struct tower_node *old_first, *old_last, *new_first;
1077       struct tower_node *merge1, *merge2;
1078       struct tower tmp_array;
1079
1080       /* Move ordinates OLD_START...(OLD_START + CNT) into new,
1081          separate TMP_ARRAY. */
1082       old_first = split_axis (axis, old_start);
1083       old_last = split_axis (axis, old_start + cnt);
1084       tower_init (&tmp_array);
1085       tower_splice (&tmp_array, NULL,
1086                     &axis->log_to_phy, old_first, old_last);
1087       merge_axis_nodes (axis, old_last, NULL);
1088       check_axis_merged (axis);
1089
1090       /* Move TMP_ARRAY to position NEW_START. */
1091       new_first = split_axis (axis, new_start);
1092       merge1 = tower_first (&tmp_array);
1093       merge2 = tower_last (&tmp_array);
1094       if (merge2 == merge1)
1095         merge2 = NULL;
1096       tower_splice (&axis->log_to_phy, new_first, &tmp_array, old_first, NULL);
1097       merge_axis_nodes (axis, merge1, &merge2);
1098       merge_axis_nodes (axis, merge2, NULL);
1099       check_axis_merged (axis);
1100     }
1101 }
1102
1103 /* Returns the axis_group in which NODE is embedded. */
1104 static struct axis_group *
1105 axis_group_from_tower_node (struct tower_node *node)
1106 {
1107   return tower_data (node, struct axis_group, logical);
1108 }
1109
1110 /* Creates and returns a new axis_group at physical position
1111    PHY_START. */
1112 static struct tower_node *
1113 make_axis_group (unsigned long phy_start)
1114 {
1115   struct axis_group *group = xmalloc (sizeof *group);
1116   group->phy_start = phy_start;
1117   return &group->logical;
1118 }
1119
1120 /* Returns the tower_node in AXIS's logical-to-physical map whose
1121    bottom edge is at exact level WHERE.  If there is no such
1122    tower_node in AXIS's logical-to-physical map, then split_axis
1123    creates one by breaking an existing tower_node into two
1124    separate ones, unless WHERE is equal to the tower height, in
1125    which case it simply returns a null pointer. */
1126 static struct tower_node *
1127 split_axis (struct axis *axis, unsigned long int where)
1128 {
1129   unsigned long int group_start;
1130   struct tower_node *group_node;
1131   struct axis_group *group;
1132
1133   assert (where <= tower_height (&axis->log_to_phy));
1134   if (where >= tower_height (&axis->log_to_phy))
1135     return NULL;
1136
1137   group_node = tower_lookup (&axis->log_to_phy, where, &group_start);
1138   group = axis_group_from_tower_node (group_node);
1139   if (where > group_start)
1140     {
1141       unsigned long int size_1 = where - group_start;
1142       unsigned long int size_2 = tower_node_get_size (group_node) - size_1;
1143       struct tower_node *next = tower_next (&axis->log_to_phy, group_node);
1144       struct tower_node *new = make_axis_group (group->phy_start + size_1);
1145       tower_resize (&axis->log_to_phy, group_node, size_1);
1146       tower_insert (&axis->log_to_phy, size_2, new, next);
1147       return new;
1148     }
1149   else
1150     return &group->logical;
1151 }
1152
1153 /* Within AXIS, attempts to merge NODE (or the last node in AXIS,
1154    if NODE is null) with its neighbor nodes.  This is possible
1155    when logically adjacent nodes are also adjacent physically (in
1156    the same order).
1157
1158    When a merge occurs, and OTHER_NODE is non-null and points to
1159    the node to be deleted, this function also updates
1160    *OTHER_NODE, if necessary, to ensure that it remains a valid
1161    pointer. */
1162 static void
1163 merge_axis_nodes (struct axis *axis, struct tower_node *node,
1164                   struct tower_node **other_node)
1165 {
1166   struct tower *t = &axis->log_to_phy;
1167   struct axis_group *group;
1168   struct tower_node *next, *prev;
1169
1170   /* Find node to potentially merge with neighbors. */
1171   if (node == NULL)
1172     node = tower_last (t);
1173   if (node == NULL)
1174     return;
1175   group = axis_group_from_tower_node (node);
1176
1177   /* Try to merge NODE with successor. */
1178   next = tower_next (t, node);
1179   if (next != NULL)
1180     {
1181       struct axis_group *next_group = axis_group_from_tower_node (next);
1182       unsigned long this_height = tower_node_get_size (node);
1183
1184       if (group->phy_start + this_height == next_group->phy_start)
1185         {
1186           unsigned long next_height = tower_node_get_size (next);
1187           tower_resize (t, node, this_height + next_height);
1188           if (other_node != NULL && *other_node == next)
1189             *other_node = tower_next (t, *other_node);
1190           tower_delete (t, next);
1191           free (next_group);
1192         }
1193     }
1194
1195   /* Try to merge NODE with predecessor. */
1196   prev = tower_prev (t, node);
1197   if (prev != NULL)
1198     {
1199       struct axis_group *prev_group = axis_group_from_tower_node (prev);
1200       unsigned long prev_height = tower_node_get_size (prev);
1201
1202       if (prev_group->phy_start + prev_height == group->phy_start)
1203         {
1204           unsigned long this_height = tower_node_get_size (node);
1205           group->phy_start = prev_group->phy_start;
1206           tower_resize (t, node, this_height + prev_height);
1207           if (other_node != NULL && *other_node == prev)
1208             *other_node = tower_next (t, *other_node);
1209           tower_delete (t, prev);
1210           free (prev_group);
1211         }
1212     }
1213 }
1214
1215 /* Verify that all potentially merge-able nodes in AXIS are
1216    actually merged. */
1217 static void
1218 check_axis_merged (const struct axis *axis UNUSED)
1219 {
1220 #if ASSERT_LEVEL >= 10
1221   struct tower_node *prev, *node;
1222
1223   for (prev = NULL, node = tower_first (&axis->log_to_phy); node != NULL;
1224        prev = node, node = tower_next (&axis->log_to_phy, node))
1225     if (prev != NULL)
1226       {
1227         struct axis_group *prev_group = axis_group_from_tower_node (prev);
1228         unsigned long prev_height = tower_node_get_size (prev);
1229         struct axis_group *node_group = axis_group_from_tower_node (node);
1230         assert (prev_group->phy_start + prev_height != node_group->phy_start);
1231       }
1232 #endif
1233 }
1234 \f
1235 /* A source. */
1236
1237 /* Creates and returns an empty, unbacked source with N_BYTES
1238    bytes per case, none of which are initially in use. */
1239 static struct source *
1240 source_create_empty (size_t n_bytes)
1241 {
1242   struct source *source = xmalloc (sizeof *source);
1243   size_t row_size = n_bytes + 4 * sizeof (void *);
1244   size_t max_memory_rows = settings_get_workspace () / row_size;
1245   source->avail = range_set_create ();
1246   range_set_set1 (source->avail, 0, n_bytes);
1247   source->data = sparse_xarray_create (n_bytes, MAX (max_memory_rows, 4));
1248   source->backing = NULL;
1249   source->backing_rows = 0;
1250   source->n_used = 0;
1251   return source;
1252 }
1253
1254 /* Creates and returns a new source backed by READER and with the
1255    same initial dimensions and content. */
1256 static struct source *
1257 source_create_casereader (struct casereader *reader)
1258 {
1259   const struct caseproto *proto = casereader_get_proto (reader);
1260   size_t n_bytes = caseproto_to_n_bytes (proto);
1261   struct source *source = source_create_empty (n_bytes);
1262   size_t n_columns;
1263   size_t i;
1264
1265   range_set_set0 (source->avail, 0, n_bytes);
1266   source->backing = reader;
1267   source->backing_rows = casereader_count_cases (reader);
1268
1269   source->n_used = 0;
1270   n_columns = caseproto_get_n_widths (proto);
1271   for (i = 0; i < n_columns; i++)
1272     if (caseproto_get_width (proto, i) >= 0)
1273       source->n_used++;
1274
1275   return source;
1276 }
1277
1278 /* Returns a clone of source OLD with the same data and backing
1279    (if any).
1280
1281    Currently this is used only by the datasheet model checker
1282    driver, but it could be otherwise useful. */
1283 static struct source *
1284 source_clone (const struct source *old)
1285 {
1286   struct source *new = xmalloc (sizeof *new);
1287   new->avail = range_set_clone (old->avail, NULL);
1288   new->data = sparse_xarray_clone (old->data);
1289   new->backing = old->backing != NULL ? casereader_clone (old->backing) : NULL;
1290   new->backing_rows = old->backing_rows;
1291   new->n_used = old->n_used;
1292   if (new->data == NULL)
1293     {
1294       source_destroy (new);
1295       new = NULL;
1296     }
1297   return new;
1298 }
1299
1300 static int
1301 source_allocate_column (struct source *source, int width)
1302 {
1303   unsigned long int start;
1304   int n_bytes;
1305
1306   assert (width >= 0);
1307   n_bytes = width_to_n_bytes (width);
1308   if (source->backing == NULL
1309       && range_set_allocate_fully (source->avail, n_bytes, &start))
1310     return start;
1311   else
1312     return -1;
1313 }
1314
1315 static void
1316 source_release_column (struct source *source, int ofs, int width)
1317 {
1318   assert (width >= 0);
1319   range_set_set1 (source->avail, ofs, width_to_n_bytes (width));
1320   if (source->backing != NULL)
1321     source->n_used--;
1322 }
1323
1324 /* Returns true if SOURCE has any columns in use,
1325    false otherwise. */
1326 static bool
1327 source_in_use (const struct source *source)
1328 {
1329   return source->n_used > 0;
1330 }
1331
1332 /* Destroys SOURCE and its data and backing, if any. */
1333 static void
1334 source_destroy (struct source *source)
1335 {
1336   if (source != NULL)
1337     {
1338       range_set_destroy (source->avail);
1339       sparse_xarray_destroy (source->data);
1340       casereader_destroy (source->backing);
1341       free (source);
1342     }
1343 }
1344
1345 /* Returns the number of rows in SOURCE's backing casereader
1346    (SOURCE must have a backing casereader). */
1347 static casenumber
1348 source_get_backing_n_rows (const struct source *source)
1349 {
1350   assert (source_has_backing (source));
1351   return source->backing_rows;
1352 }
1353
1354 /* Reads the N COLUMNS in the given ROW, into the N VALUES.  Returns true if
1355    successful, false on I/O error.
1356
1357    All of the COLUMNS must have the same source.
1358
1359    The caller must have initialized VALUES with the proper width. */
1360 static bool
1361 source_read (const struct column columns[], casenumber row,
1362              union value values[], size_t n)
1363 {
1364   struct source *source = columns[0].source;
1365   size_t i;
1366
1367   if (source->backing == NULL
1368       || sparse_xarray_contains_row (source->data, row))
1369     {
1370       bool ok = true;
1371
1372       for (i = 0; i < n && ok; i++)
1373         ok = sparse_xarray_read (source->data, row, columns[i].byte_ofs,
1374                                  width_to_n_bytes (columns[i].width),
1375                                  value_to_data (&values[i], columns[i].width));
1376       return ok;
1377     }
1378   else
1379     {
1380       struct ccase *c = casereader_peek (source->backing, row);
1381       bool ok = c != NULL;
1382       if (ok)
1383         {
1384           for (i = 0; i < n; i++)
1385             value_copy (&values[i], case_data_idx (c, columns[i].value_ofs),
1386                         columns[i].width);
1387           case_unref (c);
1388         }
1389       return ok;
1390     }
1391 }
1392
1393 static bool
1394 copy_case_into_source (struct source *source, struct ccase *c, casenumber row)
1395 {
1396   const struct caseproto *proto = casereader_get_proto (source->backing);
1397   size_t n_widths = caseproto_get_n_widths (proto);
1398   size_t ofs;
1399   size_t i;
1400
1401   ofs = 0;
1402   for (i = 0; i < n_widths; i++)
1403     {
1404       int width = caseproto_get_width (proto, i);
1405       if (width >= 0)
1406         {
1407           int n_bytes = width_to_n_bytes (width);
1408           if (!sparse_xarray_write (source->data, row, ofs, n_bytes,
1409                                     value_to_data (case_data_idx (c, i),
1410                                                    width)))
1411             return false;
1412           ofs += n_bytes;
1413         }
1414     }
1415   return true;
1416 }
1417
1418 /* Writes the N VALUES to their source in the given ROW and COLUMNS.  Returns
1419    true if successful, false on I/O error.  On error, the row's data may be
1420    completely or partially corrupted, both inside and outside the region to be
1421    written.
1422
1423    All of the COLUMNS must have the same source. */
1424 static bool
1425 source_write (const struct column columns[], casenumber row,
1426               const union value values[], size_t n)
1427 {
1428   struct source *source = columns[0].source;
1429   struct casereader *backing = source->backing;
1430   size_t i;
1431
1432   if (backing != NULL
1433       && !sparse_xarray_contains_row (source->data, row)
1434       && row < source->backing_rows)
1435     {
1436       struct ccase *c;
1437       bool ok;
1438
1439       c = casereader_peek (backing, row);
1440       if (c == NULL)
1441         return false;
1442
1443       ok = copy_case_into_source (source, c, row);
1444       case_unref (c);
1445       if (!ok)
1446         return false;
1447     }
1448
1449   for (i = 0; i < n; i++)
1450     if (!sparse_xarray_write (source->data, row, columns[i].byte_ofs,
1451                               width_to_n_bytes (columns[i].width),
1452                               value_to_data (&values[i], columns[i].width)))
1453       return false;
1454   return true;
1455 }
1456
1457 /* Within SOURCE, which must not have a backing casereader,
1458    writes the VALUE_CNT values in VALUES_CNT to the VALUE_CNT
1459    columns starting from START_COLUMN, in every row, even in rows
1460    not yet otherwise initialized.  Returns true if successful,
1461    false if an I/O error occurs.
1462
1463    We don't support backing != NULL because (1) it's harder and
1464    (2) this function is only called by
1465    datasheet_insert_column, which doesn't reuse columns from
1466    sources that are backed by casereaders. */
1467 static bool
1468 source_write_column (struct column *column, const union value *value)
1469 {
1470   int width = column->width;
1471
1472   assert (column->source->backing == NULL);
1473   assert (width >= 0);
1474
1475   return sparse_xarray_write_columns (column->source->data, column->byte_ofs,
1476                                       width_to_n_bytes (width),
1477                                       value_to_data (value, width));
1478 }
1479
1480 /* Returns true if SOURCE has a backing casereader, false
1481    otherwise. */
1482 static bool
1483 source_has_backing (const struct source *source)
1484 {
1485   return source->backing != NULL;
1486 }
1487 \f
1488 /* Datasheet model checker test driver. */
1489
1490 static int
1491 get_source_index (const struct datasheet *ds, const struct source *source)
1492 {
1493   size_t i;
1494
1495   for (i = 0; i < ds->n_sources; i++)
1496     if (ds->sources[i] == source)
1497       return i;
1498   NOT_REACHED ();
1499 }
1500
1501 /* Clones the structure and contents of ODS into a new datasheet,
1502    and returns the new datasheet. */
1503 struct datasheet *
1504 clone_datasheet (const struct datasheet *ods)
1505 {
1506   struct datasheet *ds;
1507   size_t i;
1508
1509   ds = xmalloc (sizeof *ds);
1510
1511   ds->sources = xmalloc (ods->n_sources * sizeof *ds->sources);
1512   for (i = 0; i < ods->n_sources; i++)
1513     ds->sources[i] = source_clone (ods->sources[i]);
1514   ds->n_sources = ods->n_sources;
1515
1516   ds->proto = ods->proto != NULL ? caseproto_ref (ods->proto) : NULL;
1517   ds->columns = xmemdup (ods->columns, ods->n_columns * sizeof *ods->columns);
1518   for (i = 0; i < ods->n_columns; i++)
1519     ds->columns[i].source
1520       = ds->sources[get_source_index (ods, ods->columns[i].source)];
1521   ds->n_columns = ods->n_columns;
1522   ds->column_min_alloc = ods->column_min_alloc;
1523
1524   ds->rows = axis_clone (ods->rows);
1525
1526   ds->taint = taint_create ();
1527
1528   return ds;
1529 }
1530
1531 /* Hashes the structure of datasheet DS and returns the hash.
1532    We use MD4 because it is much faster than MD5 or SHA-1 but its
1533    collision resistance is just as good. */
1534 unsigned int
1535 hash_datasheet (const struct datasheet *ds)
1536 {
1537   unsigned int hash[DIV_RND_UP (20, sizeof (unsigned int))];
1538   struct md4_ctx ctx;
1539   size_t i;
1540
1541   md4_init_ctx (&ctx);
1542   for (i = 0; i < ds->n_columns; i++)
1543     {
1544       const struct column *column = &ds->columns[i];
1545       int source_n_bytes = sparse_xarray_get_n_columns (column->source->data);
1546       md4_process_bytes (&source_n_bytes, sizeof source_n_bytes, &ctx);
1547       /*md4_process_bytes (&column->byte_ofs, sizeof column->byte_ofs, &ctx);*/
1548       md4_process_bytes (&column->value_ofs, sizeof column->value_ofs, &ctx);
1549       md4_process_bytes (&column->width, sizeof column->width, &ctx);
1550     }
1551   axis_hash (ds->rows, &ctx);
1552   md4_process_bytes (&ds->column_min_alloc, sizeof ds->column_min_alloc, &ctx);
1553   md4_finish_ctx (&ctx, hash);
1554   return hash[0];
1555 }