dictionary: Always compact immediately upon deletion of a variable.
[pspp] / src / data / datasheet.c
1 /* PSPP - a program for statistical analysis.
2    Copyright (C) 2007, 2009, 2010, 2011, 2012, 2013 Free Software Foundation, Inc.
3
4    This program is free software: you can redistribute it and/or modify
5    it under the terms of the GNU General Public License as published by
6    the Free Software Foundation, either version 3 of the License, or
7    (at your option) any later version.
8
9    This program is distributed in the hope that it will be useful,
10    but WITHOUT ANY WARRANTY; without even the implied warranty of
11    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12    GNU General Public License for more details.
13
14    You should have received a copy of the GNU General Public License
15    along with this program.  If not, see <http://www.gnu.org/licenses/>. */
16
17 #include <config.h>
18
19 #include "data/datasheet.h"
20
21 #include <stdlib.h>
22 #include <string.h>
23
24 #include "data/casereader-provider.h"
25 #include "data/casereader.h"
26 #include "data/casewriter.h"
27 #include "data/lazy-casereader.h"
28 #include "data/settings.h"
29 #include "libpspp/array.h"
30 #include "libpspp/assertion.h"
31 #include "libpspp/misc.h"
32 #include "libpspp/range-map.h"
33 #include "libpspp/range-set.h"
34 #include "libpspp/sparse-xarray.h"
35 #include "libpspp/taint.h"
36 #include "libpspp/tower.h"
37
38 #include "gl/minmax.h"
39 #include "gl/md4.h"
40 #include "gl/xalloc.h"
41
42 struct column;
43
44 static struct axis *axis_create (void);
45 static struct axis *axis_clone (const struct axis *);
46 static void axis_destroy (struct axis *);
47
48 static void axis_hash (const struct axis *, struct md4_ctx *);
49
50 static bool axis_allocate (struct axis *, unsigned long int request,
51                            unsigned long int *start,
52                            unsigned long int *width);
53 static void axis_make_available (struct axis *,
54                                  unsigned long int start,
55                                  unsigned long int width);
56 static unsigned long int axis_extend (struct axis *, unsigned long int width);
57
58 static unsigned long int axis_map (const struct axis *, unsigned long log_pos);
59
60 static unsigned long axis_get_size (const struct axis *);
61 static void axis_insert (struct axis *,
62                          unsigned long int log_start,
63                          unsigned long int phy_start,
64                          unsigned long int n);
65 static void axis_remove (struct axis *,
66                          unsigned long int start, unsigned long int n);
67 static void axis_move (struct axis *,
68                        unsigned long int old_start,
69                        unsigned long int new_start,
70                        unsigned long int n);
71
72 static struct source *source_create_empty (size_t n_bytes);
73 static struct source *source_create_casereader (struct casereader *);
74 static struct source *source_clone (const struct source *);
75 static void source_destroy (struct source *);
76
77 static casenumber source_get_backing_n_rows (const struct source *);
78
79 static int source_allocate_column (struct source *, int width);
80 static void source_release_column (struct source *, int ofs, int width);
81 static bool source_in_use (const struct source *);
82
83 static bool source_read (const struct column *, casenumber row, union value *,
84                          size_t n);
85 static bool source_write (const struct column *, casenumber row,
86                           const union value *, size_t n);
87 static bool source_write_column (struct column *, const union value *);
88 static bool source_has_backing (const struct source *);
89
90 static int get_source_index (const struct datasheet *ds, const struct source *source);
91
92 /* A datasheet is internally composed from a set of data files,
93    called "sources".  The sources that make up a datasheet must
94    have the same number of rows (cases), but their numbers of
95    columns (variables) may vary.
96
97    A datasheet's external view is produced by mapping (permuting
98    and selecting) its internal data.  Thus, we can rearrange or
99    delete rows or columns simply by modifying the mapping.  We
100    add rows by adding rows to each source and to the row mapping.
101    We add columns by adding a new source, then adding that source
102    to the column mapping.
103
104    Each source in a datasheet can be a casereader or a
105    sparse_xarray.  Casereaders are read-only, so when sources
106    made from casereaders need to be modified, it is done
107    "virtually" through being overlaid by a sparse_xarray. */
108 struct source
109   {
110     struct range_set *avail;    /* Free bytes are set to 1s. */
111     struct sparse_xarray *data; /* Data at top level, atop the backing. */
112     struct casereader *backing; /* Backing casereader (or null). */
113     casenumber backing_rows;    /* Number of rows in backing (if backed). */
114     size_t n_used;              /* Number of column in use (if backed). */
115   };
116
117 /* A logical column. */
118 struct column
119   {
120     struct source *source;      /* Source of the underlying physical column. */
121     int value_ofs;              /* If 'source' has a backing casereader,
122                                    column's value offset in its cases. */
123     int byte_ofs;               /* Byte offset in source's sparse_xarray. */
124     int width;                  /* 0=numeric, otherwise string width. */
125   };
126
127 /* A datasheet. */
128 struct datasheet
129   {
130     /* Data sources. */
131     struct source **sources;    /* Sources, in no particular order. */
132     size_t n_sources;           /* Number of sources. */
133
134     /* Columns. */
135     struct caseproto *proto;    /* Prototype for rows (initialized lazily). */
136     struct column *columns;     /* Logical to physical column mapping. */
137     size_t n_columns;           /* Number of logical columns. */
138     unsigned column_min_alloc;  /* Min. # of columns to put in a new source. */
139
140     /* Rows. */
141     struct axis *rows;          /* Logical to physical row mapping. */
142
143     /* Tainting. */
144     struct taint *taint;        /* Indicates corrupted data. */
145   };
146
147 /* Is this operation a read or a write? */
148 enum rw_op
149   {
150     OP_READ,
151     OP_WRITE
152   };
153
154 static void allocate_column (struct datasheet *, int width, struct column *);
155 static void release_source (struct datasheet *, struct source *);
156 static bool rw_case (struct datasheet *ds, enum rw_op op,
157                      casenumber lrow, size_t start_column, size_t n_columns,
158                      union value data[]);
159
160 /* Returns the number of bytes needed to store a value with the
161    given WIDTH on disk. */
162 static size_t
163 width_to_n_bytes (int width)
164 {
165   return width == 0 ? sizeof (double) : width;
166 }
167
168 /* Returns the address of the data in VALUE (for reading or
169    writing to/from disk).  VALUE must have the given WIDTH. */
170 static void *
171 value_to_data (const union value *value_, int width)
172 {
173   union value *value = (union value *) value_;
174   assert (sizeof value->f == sizeof (double));
175   if (width == 0)
176     return &value->f;
177   else
178     return value->s;
179 }
180
181 /* Returns the number of bytes needed to store all the values in
182    PROTO on disk. */
183 static size_t
184 caseproto_to_n_bytes (const struct caseproto *proto)
185 {
186   size_t n_bytes;
187   size_t i;
188
189   n_bytes = 0;
190   for (i = 0; i < caseproto_get_n_widths (proto); i++)
191     {
192       int width = caseproto_get_width (proto, i);
193       assert (width >= 0);
194       if (width >= 0)
195         n_bytes += width_to_n_bytes (width);
196     }
197   return n_bytes;
198 }
199
200 /* Creates and returns a new datasheet.
201
202    If READER is nonnull, then the datasheet initially contains
203    the contents of READER. */
204 struct datasheet *
205 datasheet_create (struct casereader *reader)
206 {
207   struct datasheet *ds = xmalloc (sizeof *ds);
208   ds->sources = NULL;
209   ds->n_sources = 0;
210   ds->proto = NULL;
211   ds->columns = NULL;
212   ds->n_columns = 0;
213   ds->column_min_alloc = 8;
214   ds->rows = axis_create ();
215   ds->taint = taint_create ();
216
217   if (reader != NULL)
218     {
219       casenumber n_rows;
220       size_t byte_ofs;
221       size_t i;
222
223       taint_propagate (casereader_get_taint (reader), ds->taint);
224
225       ds->proto = caseproto_ref (casereader_get_proto (reader));
226
227       ds->sources = xmalloc (sizeof *ds->sources);
228       ds->sources[0] = source_create_casereader (reader);
229       ds->n_sources = 1;
230
231       ds->n_columns = caseproto_get_n_widths (ds->proto);
232       ds->columns = xnmalloc (ds->n_columns, sizeof *ds->columns);
233       byte_ofs = 0;
234       for (i = 0; i < ds->n_columns; i++)
235         {
236           struct column *column = &ds->columns[i];
237           int width = caseproto_get_width (ds->proto, i);
238           column->source = ds->sources[0];
239           column->width = width;
240           assert (width >= 0);
241           if (width >= 0)
242             {
243               column->value_ofs = i;
244               column->byte_ofs = byte_ofs;
245               byte_ofs += width_to_n_bytes (column->width);
246             }
247         }
248
249       n_rows = source_get_backing_n_rows (ds->sources[0]);
250       if (n_rows > 0)
251         axis_insert (ds->rows, 0, axis_extend (ds->rows, n_rows), n_rows);
252     }
253
254   return ds;
255 }
256
257 /* Destroys datasheet DS. */
258 void
259 datasheet_destroy (struct datasheet *ds)
260 {
261   size_t i;
262
263   if (ds == NULL)
264     return;
265
266   for (i = 0; i < ds->n_sources; i++)
267     source_destroy (ds->sources[i]);
268   free (ds->sources);
269   caseproto_unref (ds->proto);
270   free (ds->columns);
271   axis_destroy (ds->rows);
272   taint_destroy (ds->taint);
273   free (ds);
274 }
275
276 /* Returns the prototype for the cases in DS.  The caller must
277    not unref the returned prototype. */
278 const struct caseproto *
279 datasheet_get_proto (const struct datasheet *ds_)
280 {
281   struct datasheet *ds = CONST_CAST (struct datasheet *, ds_);
282   if (ds->proto == NULL)
283     {
284       size_t i;
285
286       ds->proto = caseproto_create ();
287       for (i = 0; i < ds->n_columns; i++)
288         ds->proto = caseproto_add_width (ds->proto, ds->columns[i].width);
289     }
290   return ds->proto;
291 }
292
293 /* Returns the width of the given COLUMN within DS.
294    COLUMN must be less than the number of columns in DS. */
295 int
296 datasheet_get_column_width (const struct datasheet *ds, size_t column)
297 {
298   assert (column < datasheet_get_n_columns (ds));
299   return ds->columns[column].width;
300 }
301
302 /* Moves datasheet DS to a new location in memory, and returns
303    the new location.  Afterward, the datasheet must not be
304    accessed at its former location.
305
306    This function is useful for ensuring that all references to a
307    datasheet have been dropped, especially in conjunction with
308    tools like Valgrind. */
309 struct datasheet *
310 datasheet_rename (struct datasheet *ds)
311 {
312   struct datasheet *new = xmemdup (ds, sizeof *ds);
313   free (ds);
314   return new;
315 }
316
317 /* Returns true if datasheet DS is tainted.
318    A datasheet is tainted by an I/O error or by taint
319    propagation to the datasheet. */
320 bool
321 datasheet_error (const struct datasheet *ds)
322 {
323   return taint_is_tainted (ds->taint);
324 }
325
326 /* Marks datasheet DS tainted. */
327 void
328 datasheet_force_error (struct datasheet *ds)
329 {
330   taint_set_taint (ds->taint);
331 }
332
333 /* Returns datasheet DS's taint object. */
334 const struct taint *
335 datasheet_get_taint (const struct datasheet *ds)
336 {
337   return ds->taint;
338 }
339
340 /* Returns the number of rows in DS. */
341 casenumber
342 datasheet_get_n_rows (const struct datasheet *ds)
343 {
344   return axis_get_size (ds->rows);
345 }
346
347 /* Returns the number of columns in DS. */
348 size_t
349 datasheet_get_n_columns (const struct datasheet *ds)
350 {
351   return ds->n_columns;
352 }
353
354 /* Inserts a column of the given WIDTH into datasheet DS just
355    before column BEFORE.  Initializes the contents of each row in
356    the inserted column to VALUE (which must have width WIDTH).
357
358    Returns true if successful, false on failure.  In case of
359    failure, the datasheet is unchanged. */
360 bool
361 datasheet_insert_column (struct datasheet *ds,
362                          const union value *value, int width, size_t before)
363 {
364   struct column *col;
365
366   assert (before <= ds->n_columns);
367
368   ds->columns = xnrealloc (ds->columns,
369                            ds->n_columns + 1, sizeof *ds->columns);
370   insert_element (ds->columns, ds->n_columns, sizeof *ds->columns, before);
371   col = &ds->columns[before];
372   ds->n_columns++;
373
374   allocate_column (ds, width, col);
375
376   if (width >= 0 && !source_write_column (col, value))
377     {
378       datasheet_delete_columns (ds, before, 1);
379       taint_set_taint (ds->taint);
380       return false;
381     }
382
383   return true;
384 }
385
386 /* Deletes the N columns in DS starting from column START. */
387 void
388 datasheet_delete_columns (struct datasheet *ds, size_t start, size_t n)
389 {
390   assert (start + n <= ds->n_columns);
391
392   if (n > 0)
393     {
394       size_t i;
395
396       for (i = start; i < start + n; i++)
397         {
398           struct column *column = &ds->columns[i];
399           struct source *source = column->source;
400           source_release_column (source, column->byte_ofs, column->width);
401           release_source (ds, source);
402         }
403
404       remove_range (ds->columns, ds->n_columns, sizeof *ds->columns, start, n);
405       ds->n_columns -= n;
406
407       caseproto_unref (ds->proto);
408       ds->proto = NULL;
409     }
410 }
411
412 /* Moves the N columns in DS starting at position OLD_START so
413    that they then start at position NEW_START.  Equivalent to
414    deleting the column rows, then inserting them at what becomes
415    position NEW_START after the deletion. */
416 void
417 datasheet_move_columns (struct datasheet *ds,
418                         size_t old_start, size_t new_start,
419                         size_t n)
420 {
421   assert (old_start + n <= ds->n_columns);
422   assert (new_start + n <= ds->n_columns);
423
424   move_range (ds->columns, ds->n_columns, sizeof *ds->columns,
425               old_start, new_start, n);
426
427   caseproto_unref (ds->proto);
428   ds->proto = NULL;
429 }
430
431 struct resize_datasheet_value_aux
432   {
433     union value src_value;
434     size_t src_ofs;
435     int src_width;
436
437     void (*resize_cb) (const union value *, union value *, const void *aux);
438     const void *resize_cb_aux;
439
440     union value dst_value;
441     size_t dst_ofs;
442     int dst_width;
443   };
444
445 static bool
446 resize_datasheet_value (const void *src, void *dst, void *aux_)
447 {
448   struct resize_datasheet_value_aux *aux = aux_;
449
450   memcpy (value_to_data (&aux->src_value, aux->src_width),
451           (uint8_t *) src + aux->src_ofs,
452           width_to_n_bytes (aux->src_width));
453
454   aux->resize_cb (&aux->src_value, &aux->dst_value, aux->resize_cb_aux);
455
456   memcpy ((uint8_t *) dst + aux->dst_ofs,
457           value_to_data (&aux->dst_value, aux->dst_width),
458           width_to_n_bytes (aux->dst_width));
459
460   return true;
461 }
462
463 bool
464 datasheet_resize_column (struct datasheet *ds, size_t column, int new_width,
465                          void (*resize_cb) (const union value *,
466                                             union value *, const void *aux),
467                          const void *resize_cb_aux)
468 {
469   struct column old_col;
470   struct column *col;
471   int old_width;
472
473   assert (column < datasheet_get_n_columns (ds));
474
475   col = &ds->columns[column];
476   old_col = *col;
477   old_width = old_col.width;
478   assert (old_width >= 0);
479   assert (new_width >= 0);
480
481   if (new_width == -1)
482     {
483       if (old_width != -1)
484         {
485           datasheet_delete_columns (ds, column, 1);
486           datasheet_insert_column (ds, NULL, -1, column);
487         }
488     }
489   else if (old_width == -1)
490     {
491       union value value;
492       value_init (&value, new_width);
493       value_set_missing (&value, new_width);
494       if (resize_cb != NULL)
495         resize_cb (NULL, &value, resize_cb_aux);
496       datasheet_delete_columns (ds, column, 1);
497       datasheet_insert_column (ds, &value, new_width, column);
498       value_destroy (&value, new_width);
499     }
500   else if (source_has_backing (col->source))
501     {
502       unsigned long int n_rows = axis_get_size (ds->rows);
503       unsigned long int lrow;
504       union value src, dst;
505
506       source_release_column (col->source, col->byte_ofs, col->width);
507       allocate_column (ds, new_width, col);
508
509       value_init (&src, old_width);
510       value_init (&dst, new_width);
511       for (lrow = 0; lrow < n_rows; lrow++)
512         {
513           unsigned long int prow = axis_map (ds->rows, lrow);
514           if (!source_read (&old_col, prow, &src, 1))
515             {
516               /* FIXME: back out col changes. */
517               break;
518             }
519           resize_cb (&src, &dst, resize_cb_aux);
520           if (!source_write (col, prow, &dst, 1))
521             {
522               /* FIXME: back out col changes. */
523               break;
524             }
525         }
526       value_destroy (&src, old_width);
527       value_destroy (&dst, new_width);
528       if (lrow < n_rows)
529         return false;
530
531       release_source (ds, old_col.source);
532     }
533   else
534     {
535       struct resize_datasheet_value_aux aux;
536
537       source_release_column (col->source, col->byte_ofs, col->width);
538       allocate_column (ds, new_width, col);
539
540       value_init (&aux.src_value, old_col.width);
541       aux.src_ofs = old_col.byte_ofs;
542       aux.src_width = old_col.width;
543       aux.resize_cb = resize_cb;
544       aux.resize_cb_aux = resize_cb_aux;
545       value_init (&aux.dst_value, new_width);
546       aux.dst_ofs = col->byte_ofs;
547       aux.dst_width = new_width;
548       sparse_xarray_copy (old_col.source->data, col->source->data,
549                           resize_datasheet_value, &aux);
550       value_destroy (&aux.src_value, old_width);
551       value_destroy (&aux.dst_value, new_width);
552
553       release_source (ds, old_col.source);
554     }
555   return true;
556 }
557
558 /* Retrieves and returns the contents of the given ROW in
559    datasheet DS.  The caller owns the returned case and must
560    unref it when it is no longer needed.  Returns a null pointer
561    on I/O error. */
562 struct ccase *
563 datasheet_get_row (const struct datasheet *ds, casenumber row)
564 {
565   size_t n_columns = datasheet_get_n_columns (ds);
566   struct ccase *c = case_create (datasheet_get_proto (ds));
567   if (rw_case (CONST_CAST (struct datasheet *, ds), OP_READ,
568                row, 0, n_columns, case_data_all_rw (c)))
569     return c;
570   else
571     {
572       case_unref (c);
573       return NULL;
574     }
575 }
576
577 /* Stores the contents of case C, which is destroyed, into the
578    given ROW in DS.  Returns true on success, false on I/O error.
579    On failure, the given ROW might be partially modified or
580    corrupted. */
581 bool
582 datasheet_put_row (struct datasheet *ds, casenumber row, struct ccase *c)
583 {
584   size_t n_columns = datasheet_get_n_columns (ds);
585   bool ok = rw_case (ds, OP_WRITE, row, 0, n_columns,
586                      (union value *) case_data_all (c));
587   case_unref (c);
588   return ok;
589 }
590
591 /* Stores the values of COLUMN in DS in the given ROW in DS into
592    VALUE.  The caller must have already initialized VALUE as a
593    value of the appropriate width (as returned by
594    datasheet_get_column_width (DS, COLUMN)).  Returns true if
595    successful, false on I/O error. */
596 bool
597 datasheet_get_value (const struct datasheet *ds, casenumber row,
598                      size_t column, union value *value)
599 {
600   assert (row >= 0);
601   return rw_case (CONST_CAST (struct datasheet *, ds), OP_READ,
602                   row, column, 1, value);
603 }
604
605 /* Stores VALUE into DS in the given ROW and COLUMN.  VALUE must
606    have the correct width for COLUMN (as returned by
607    datasheet_get_column_width (DS, COLUMN)).  Returns true if
608    successful, false on I/O error.  On failure, ROW might be
609    partially modified or corrupted. */
610 bool
611 datasheet_put_value (struct datasheet *ds, casenumber row,
612                      size_t column, const union value *value)
613 {
614   return rw_case (ds, OP_WRITE, row, column, 1, (union value *) value);
615 }
616
617 /* Inserts the N cases at C into datasheet DS just before row
618    BEFORE.  Returns true if successful, false on I/O error.  On
619    failure, datasheet DS is not modified.
620
621    Regardless of success, this function unrefs all of the cases
622    in C. */
623 bool
624 datasheet_insert_rows (struct datasheet *ds,
625                        casenumber before, struct ccase *c[],
626                        casenumber n)
627 {
628   casenumber added = 0;
629   while (n > 0)
630     {
631       unsigned long first_phy;
632       unsigned long n_phys;
633       unsigned long i;
634
635       /* Allocate physical rows from the pool of available
636          rows. */
637       if (!axis_allocate (ds->rows, n, &first_phy, &n_phys))
638         {
639           /* No rows were available.  Extend the row axis to make
640              some new ones available. */
641           n_phys = n;
642           first_phy = axis_extend (ds->rows, n);
643         }
644
645       /* Insert the new rows into the row mapping. */
646       axis_insert (ds->rows, before, first_phy, n_phys);
647
648       /* Initialize the new rows. */
649       for (i = 0; i < n_phys; i++)
650         if (!datasheet_put_row (ds, before + i, c[i]))
651           {
652             while (++i < n)
653               case_unref (c[i]);
654             datasheet_delete_rows (ds, before - added, n_phys + added);
655             return false;
656           }
657
658       /* Advance. */
659       c += n_phys;
660       n -= n_phys;
661       before += n_phys;
662       added += n_phys;
663     }
664   return true;
665 }
666
667 /* Deletes the N rows in DS starting from row FIRST. */
668 void
669 datasheet_delete_rows (struct datasheet *ds,
670                        casenumber first, casenumber n)
671 {
672   size_t lrow;
673
674   /* Free up rows for reuse.
675      FIXME: optimize. */
676   for (lrow = first; lrow < first + n; lrow++)
677     axis_make_available (ds->rows, axis_map (ds->rows, lrow), 1);
678
679   /* Remove rows from logical-to-physical mapping. */
680   axis_remove (ds->rows, first, n);
681 }
682
683 /* Moves the N rows in DS starting at position OLD_START so
684    that they then start at position NEW_START.  Equivalent to
685    deleting the given rows, then inserting them at what becomes
686    position NEW_START after the deletion. */
687 void
688 datasheet_move_rows (struct datasheet *ds,
689                      size_t old_start, size_t new_start,
690                      size_t n)
691 {
692   axis_move (ds->rows, old_start, new_start, n);
693 }
694 \f
695 static const struct casereader_random_class datasheet_reader_class;
696
697 /* Creates and returns a casereader whose input cases are the
698    rows in datasheet DS.  From the caller's perspective, DS is
699    effectively destroyed by this operation, such that the caller
700    must not reference it again. */
701 struct casereader *
702 datasheet_make_reader (struct datasheet *ds)
703 {
704   struct casereader *reader;
705   ds = datasheet_rename (ds);
706   reader = casereader_create_random (datasheet_get_proto (ds),
707                                      datasheet_get_n_rows (ds),
708                                      &datasheet_reader_class, ds);
709   taint_propagate (datasheet_get_taint (ds), casereader_get_taint (reader));
710   return reader;
711 }
712
713 /* "read" function for the datasheet random casereader. */
714 static struct ccase *
715 datasheet_reader_read (struct casereader *reader UNUSED, void *ds_,
716                        casenumber case_idx)
717 {
718   struct datasheet *ds = ds_;
719   if (case_idx < datasheet_get_n_rows (ds))
720     {
721       struct ccase *c = datasheet_get_row (ds, case_idx);
722       if (c == NULL)
723         taint_set_taint (ds->taint);
724       return c;
725     }
726   else
727     return NULL;
728 }
729
730 /* "destroy" function for the datasheet random casereader. */
731 static void
732 datasheet_reader_destroy (struct casereader *reader UNUSED, void *ds_)
733 {
734   struct datasheet *ds = ds_;
735   datasheet_destroy (ds);
736 }
737
738 /* "advance" function for the datasheet random casereader. */
739 static void
740 datasheet_reader_advance (struct casereader *reader UNUSED, void *ds_,
741                           casenumber n_cases)
742 {
743   struct datasheet *ds = ds_;
744   datasheet_delete_rows (ds, 0, n_cases);
745 }
746
747 /* Random casereader class for a datasheet. */
748 static const struct casereader_random_class datasheet_reader_class =
749   {
750     datasheet_reader_read,
751     datasheet_reader_destroy,
752     datasheet_reader_advance,
753   };
754 \f
755 static void
756 allocate_column (struct datasheet *ds, int width, struct column *column)
757 {
758   caseproto_unref (ds->proto);
759   ds->proto = NULL;
760
761   column->value_ofs = -1;
762   column->width = width;
763   assert (width >= 0);
764   if (width >= 0)
765     {
766       int n_bytes;
767       size_t i;
768
769       n_bytes = width_to_n_bytes (width);
770       for (i = 0; i < ds->n_sources; i++)
771         {
772           column->source = ds->sources[i];
773           column->byte_ofs = source_allocate_column (column->source, n_bytes);
774           if (column->byte_ofs >= 0)
775             return;
776         }
777
778       column->source = source_create_empty (MAX (n_bytes,
779                                                  ds->column_min_alloc));
780       ds->sources = xnrealloc (ds->sources,
781                                ds->n_sources + 1, sizeof *ds->sources);
782       ds->sources[ds->n_sources++] = column->source;
783
784       ds->column_min_alloc = MIN (65536, ds->column_min_alloc * 2);
785
786       column->byte_ofs = source_allocate_column (column->source, n_bytes);
787       assert (column->byte_ofs >= 0);
788     }
789   else
790     {
791       column->source = NULL;
792       column->byte_ofs = -1;
793     }
794 }
795
796 static void
797 release_source (struct datasheet *ds, struct source *source)
798 {
799   if (source_has_backing (source) && !source_in_use (source))
800     {
801       /* Since only the first source to be added ever
802          has a backing, this source must have index
803          0.  */
804       assert (source == ds->sources[0]);
805       ds->sources[0] = ds->sources[--ds->n_sources];
806       source_destroy (source);
807     }
808 }
809
810 /* Reads (if OP is OP_READ) or writes (if op is OP_WRITE) the
811    N_COLUMNS columns starting from column START_COLUMN in row
812    LROW to/from the N_COLUMNS values in DATA. */
813 static bool
814 rw_case (struct datasheet *ds, enum rw_op op,
815          casenumber lrow, size_t start_column, size_t n_columns,
816          union value data[])
817 {
818   struct column *columns = &ds->columns[start_column];
819   casenumber prow;
820   size_t i;
821
822   assert (lrow < datasheet_get_n_rows (ds));
823   assert (n_columns <= datasheet_get_n_columns (ds));
824   assert (start_column + n_columns <= datasheet_get_n_columns (ds));
825
826   prow = axis_map (ds->rows, lrow);
827   for (i = 0; i < n_columns;)
828     {
829       struct source *source = columns[i].source;
830       size_t j;
831       bool ok;
832
833       if (columns[i].width < 0)
834         {
835           i++;
836           continue;
837         }
838
839       for (j = i + 1; j < n_columns; j++)
840         if (columns[j].width < 0 || columns[j].source != source)
841           break;
842
843       if (op == OP_READ)
844         ok = source_read (&columns[i], prow, &data[i], j - i);
845       else
846         ok = source_write (&columns[i], prow, &data[i], j - i);
847
848       if (!ok)
849         {
850           taint_set_taint (ds->taint);
851           return false;
852         }
853
854       i = j;
855     }
856   return true;
857 }
858 \f
859 /* An axis.
860
861    An axis has two functions.  First, it maintains a mapping from
862    logical (client-visible) to physical (storage) ordinates.  The
863    axis_map and axis_get_size functions inspect this mapping, and
864    the axis_insert, axis_remove, and axis_move functions modify
865    it.  Second, it tracks the set of ordinates that are unused
866    and available for reuse.  The axis_allocate,
867    axis_make_available, and axis_extend functions affect the set
868    of available ordinates. */
869 struct axis
870 {
871   struct tower log_to_phy;     /* Map from logical to physical ordinates;
872                                   contains "struct axis_group"s. */
873   struct range_set *available; /* Set of unused, available ordinates. */
874   unsigned long int phy_size;  /* Current physical length of axis. */
875 };
876
877 /* A mapping from logical to physical ordinates. */
878 struct axis_group
879 {
880   struct tower_node logical;  /* Range of logical ordinates. */
881   unsigned long phy_start;    /* First corresponding physical ordinate. */
882 };
883
884 static struct axis_group *axis_group_from_tower_node (struct tower_node *);
885 static struct tower_node *make_axis_group (unsigned long int phy_start);
886 static struct tower_node *split_axis (struct axis *, unsigned long int where);
887 static void merge_axis_nodes (struct axis *, struct tower_node *,
888                               struct tower_node **other_node);
889 static void check_axis_merged (const struct axis *axis UNUSED);
890
891 /* Creates and returns a new, initially empty axis. */
892 static struct axis *
893 axis_create (void)
894 {
895   struct axis *axis = xmalloc (sizeof *axis);
896   tower_init (&axis->log_to_phy);
897   axis->available = range_set_create ();
898   axis->phy_size = 0;
899   return axis;
900 }
901
902 /* Returns a clone of existing axis OLD.
903
904    Currently this is used only by the datasheet model checker
905    driver, but it could be otherwise useful. */
906 static struct axis *
907 axis_clone (const struct axis *old)
908 {
909   const struct tower_node *node;
910   struct axis *new;
911
912   new = xmalloc (sizeof *new);
913   tower_init (&new->log_to_phy);
914   new->available = range_set_clone (old->available, NULL);
915   new->phy_size = old->phy_size;
916
917   for (node = tower_first (&old->log_to_phy); node != NULL;
918        node = tower_next (&old->log_to_phy, node))
919     {
920       unsigned long int size = tower_node_get_size (node);
921       struct axis_group *group = tower_data (node, struct axis_group, logical);
922       tower_insert (&new->log_to_phy, size, make_axis_group (group->phy_start),
923                     NULL);
924     }
925
926   return new;
927 }
928
929 /* Adds the state of AXIS to the MD4 hash context CTX.
930
931    This is only used by the datasheet model checker driver.  It
932    is unlikely to be otherwise useful. */
933 static void
934 axis_hash (const struct axis *axis, struct md4_ctx *ctx)
935 {
936   const struct tower_node *tn;
937   const struct range_set_node *rsn;
938
939   for (tn = tower_first (&axis->log_to_phy); tn != NULL;
940        tn = tower_next (&axis->log_to_phy, tn))
941     {
942       struct axis_group *group = tower_data (tn, struct axis_group, logical);
943       unsigned long int phy_start = group->phy_start;
944       unsigned long int size = tower_node_get_size (tn);
945
946       md4_process_bytes (&phy_start, sizeof phy_start, ctx);
947       md4_process_bytes (&size, sizeof size, ctx);
948     }
949
950   RANGE_SET_FOR_EACH (rsn, axis->available)
951     {
952       unsigned long int start = range_set_node_get_start (rsn);
953       unsigned long int end = range_set_node_get_end (rsn);
954
955       md4_process_bytes (&start, sizeof start, ctx);
956       md4_process_bytes (&end, sizeof end, ctx);
957     }
958
959   md4_process_bytes (&axis->phy_size, sizeof axis->phy_size, ctx);
960 }
961
962 /* Destroys AXIS. */
963 static void
964 axis_destroy (struct axis *axis)
965 {
966   if (axis == NULL)
967     return;
968
969   while (!tower_is_empty (&axis->log_to_phy))
970     {
971       struct tower_node *node = tower_first (&axis->log_to_phy);
972       struct axis_group *group = tower_data (node, struct axis_group,
973                                              logical);
974       tower_delete (&axis->log_to_phy, node);
975       free (group);
976     }
977
978   range_set_destroy (axis->available);
979   free (axis);
980 }
981
982 /* Allocates up to REQUEST contiguous unused and available
983    ordinates from AXIS.  If successful, stores the number
984    obtained into *WIDTH and the ordinate of the first into
985    *START, marks the ordinates as now unavailable return true.
986    On failure, which occurs only if AXIS has no unused and
987    available ordinates, returns false without modifying AXIS. */
988 static bool
989 axis_allocate (struct axis *axis, unsigned long int request,
990                unsigned long int *start, unsigned long int *width)
991 {
992   return range_set_allocate (axis->available, request, start, width);
993 }
994
995 /* Marks the WIDTH contiguous ordinates in AXIS, starting from
996    START, as unused and available. */
997 static void
998 axis_make_available (struct axis *axis,
999                      unsigned long int start, unsigned long int width)
1000 {
1001   range_set_set1 (axis->available, start, width);
1002 }
1003
1004 /* Extends the total physical length of AXIS by WIDTH and returns
1005    the first ordinate in the new physical region. */
1006 static unsigned long int
1007 axis_extend (struct axis *axis, unsigned long int width)
1008 {
1009   unsigned long int start = axis->phy_size;
1010   axis->phy_size += width;
1011   return start;
1012 }
1013
1014 /* Returns the physical ordinate in AXIS corresponding to logical
1015    ordinate LOG_POS.  LOG_POS must be less than the logical
1016    length of AXIS. */
1017 static unsigned long int
1018 axis_map (const struct axis *axis, unsigned long log_pos)
1019 {
1020   struct tower_node *node;
1021   struct axis_group *group;
1022   unsigned long int group_start;
1023
1024   node = tower_lookup (&axis->log_to_phy, log_pos, &group_start);
1025   group = tower_data (node, struct axis_group, logical);
1026   return group->phy_start + (log_pos - group_start);
1027 }
1028
1029 /* Returns the logical length of AXIS. */
1030 static unsigned long
1031 axis_get_size (const struct axis *axis)
1032 {
1033   return tower_height (&axis->log_to_phy);
1034 }
1035
1036 /* Inserts the N contiguous physical ordinates starting at
1037    PHY_START into AXIS's logical-to-physical mapping, starting at
1038    logical position LOG_START. */
1039 static void
1040 axis_insert (struct axis *axis,
1041              unsigned long int log_start, unsigned long int phy_start,
1042              unsigned long int n)
1043 {
1044   struct tower_node *before = split_axis (axis, log_start);
1045   struct tower_node *new = make_axis_group (phy_start);
1046   tower_insert (&axis->log_to_phy, n, new, before);
1047   merge_axis_nodes (axis, new, NULL);
1048   check_axis_merged (axis);
1049 }
1050
1051 /* Removes N ordinates from AXIS's logical-to-physical mapping
1052    starting at logical position START. */
1053 static void
1054 axis_remove (struct axis *axis,
1055              unsigned long int start, unsigned long int n)
1056 {
1057   if (n > 0)
1058     {
1059       struct tower_node *last = split_axis (axis, start + n);
1060       struct tower_node *cur, *next;
1061       for (cur = split_axis (axis, start); cur != last; cur = next)
1062         {
1063           next = tower_delete (&axis->log_to_phy, cur);
1064           free (axis_group_from_tower_node (cur));
1065         }
1066       merge_axis_nodes (axis, last, NULL);
1067       check_axis_merged (axis);
1068     }
1069 }
1070
1071 /* Moves the N ordinates in AXIS's logical-to-mapping starting
1072    at logical position OLD_START so that they then start at
1073    position NEW_START. */
1074 static void
1075 axis_move (struct axis *axis,
1076            unsigned long int old_start, unsigned long int new_start,
1077            unsigned long int n)
1078 {
1079   if (n > 0 && old_start != new_start)
1080     {
1081       struct tower_node *old_first, *old_last, *new_first;
1082       struct tower_node *merge1, *merge2;
1083       struct tower tmp_array;
1084
1085       /* Move ordinates OLD_START...(OLD_START + N) into new,
1086          separate TMP_ARRAY. */
1087       old_first = split_axis (axis, old_start);
1088       old_last = split_axis (axis, old_start + n);
1089       tower_init (&tmp_array);
1090       tower_splice (&tmp_array, NULL,
1091                     &axis->log_to_phy, old_first, old_last);
1092       merge_axis_nodes (axis, old_last, NULL);
1093       check_axis_merged (axis);
1094
1095       /* Move TMP_ARRAY to position NEW_START. */
1096       new_first = split_axis (axis, new_start);
1097       merge1 = tower_first (&tmp_array);
1098       merge2 = tower_last (&tmp_array);
1099       if (merge2 == merge1)
1100         merge2 = NULL;
1101       tower_splice (&axis->log_to_phy, new_first, &tmp_array, old_first, NULL);
1102       merge_axis_nodes (axis, merge1, &merge2);
1103       merge_axis_nodes (axis, merge2, NULL);
1104       check_axis_merged (axis);
1105     }
1106 }
1107
1108 /* Returns the axis_group in which NODE is embedded. */
1109 static struct axis_group *
1110 axis_group_from_tower_node (struct tower_node *node)
1111 {
1112   return tower_data (node, struct axis_group, logical);
1113 }
1114
1115 /* Creates and returns a new axis_group at physical position
1116    PHY_START. */
1117 static struct tower_node *
1118 make_axis_group (unsigned long phy_start)
1119 {
1120   struct axis_group *group = xmalloc (sizeof *group);
1121   group->phy_start = phy_start;
1122   return &group->logical;
1123 }
1124
1125 /* Returns the tower_node in AXIS's logical-to-physical map whose
1126    bottom edge is at exact level WHERE.  If there is no such
1127    tower_node in AXIS's logical-to-physical map, then split_axis
1128    creates one by breaking an existing tower_node into two
1129    separate ones, unless WHERE is equal to the tower height, in
1130    which case it simply returns a null pointer. */
1131 static struct tower_node *
1132 split_axis (struct axis *axis, unsigned long int where)
1133 {
1134   unsigned long int group_start;
1135   struct tower_node *group_node;
1136   struct axis_group *group;
1137
1138   assert (where <= tower_height (&axis->log_to_phy));
1139   if (where >= tower_height (&axis->log_to_phy))
1140     return NULL;
1141
1142   group_node = tower_lookup (&axis->log_to_phy, where, &group_start);
1143   group = axis_group_from_tower_node (group_node);
1144   if (where > group_start)
1145     {
1146       unsigned long int size_1 = where - group_start;
1147       unsigned long int size_2 = tower_node_get_size (group_node) - size_1;
1148       struct tower_node *next = tower_next (&axis->log_to_phy, group_node);
1149       struct tower_node *new = make_axis_group (group->phy_start + size_1);
1150       tower_resize (&axis->log_to_phy, group_node, size_1);
1151       tower_insert (&axis->log_to_phy, size_2, new, next);
1152       return new;
1153     }
1154   else
1155     return &group->logical;
1156 }
1157
1158 /* Within AXIS, attempts to merge NODE (or the last node in AXIS,
1159    if NODE is null) with its neighbor nodes.  This is possible
1160    when logically adjacent nodes are also adjacent physically (in
1161    the same order).
1162
1163    When a merge occurs, and OTHER_NODE is non-null and points to
1164    the node to be deleted, this function also updates
1165    *OTHER_NODE, if necessary, to ensure that it remains a valid
1166    pointer. */
1167 static void
1168 merge_axis_nodes (struct axis *axis, struct tower_node *node,
1169                   struct tower_node **other_node)
1170 {
1171   struct tower *t = &axis->log_to_phy;
1172   struct axis_group *group;
1173   struct tower_node *next, *prev;
1174
1175   /* Find node to potentially merge with neighbors. */
1176   if (node == NULL)
1177     node = tower_last (t);
1178   if (node == NULL)
1179     return;
1180   group = axis_group_from_tower_node (node);
1181
1182   /* Try to merge NODE with successor. */
1183   next = tower_next (t, node);
1184   if (next != NULL)
1185     {
1186       struct axis_group *next_group = axis_group_from_tower_node (next);
1187       unsigned long this_height = tower_node_get_size (node);
1188
1189       if (group->phy_start + this_height == next_group->phy_start)
1190         {
1191           unsigned long next_height = tower_node_get_size (next);
1192           tower_resize (t, node, this_height + next_height);
1193           if (other_node != NULL && *other_node == next)
1194             *other_node = tower_next (t, *other_node);
1195           tower_delete (t, next);
1196           free (next_group);
1197         }
1198     }
1199
1200   /* Try to merge NODE with predecessor. */
1201   prev = tower_prev (t, node);
1202   if (prev != NULL)
1203     {
1204       struct axis_group *prev_group = axis_group_from_tower_node (prev);
1205       unsigned long prev_height = tower_node_get_size (prev);
1206
1207       if (prev_group->phy_start + prev_height == group->phy_start)
1208         {
1209           unsigned long this_height = tower_node_get_size (node);
1210           group->phy_start = prev_group->phy_start;
1211           tower_resize (t, node, this_height + prev_height);
1212           if (other_node != NULL && *other_node == prev)
1213             *other_node = tower_next (t, *other_node);
1214           tower_delete (t, prev);
1215           free (prev_group);
1216         }
1217     }
1218 }
1219
1220 /* Verify that all potentially merge-able nodes in AXIS are
1221    actually merged. */
1222 static void
1223 check_axis_merged (const struct axis *axis UNUSED)
1224 {
1225 #if ASSERT_LEVEL >= 10
1226   struct tower_node *prev, *node;
1227
1228   for (prev = NULL, node = tower_first (&axis->log_to_phy); node != NULL;
1229        prev = node, node = tower_next (&axis->log_to_phy, node))
1230     if (prev != NULL)
1231       {
1232         struct axis_group *prev_group = axis_group_from_tower_node (prev);
1233         unsigned long prev_height = tower_node_get_size (prev);
1234         struct axis_group *node_group = axis_group_from_tower_node (node);
1235         assert (prev_group->phy_start + prev_height != node_group->phy_start);
1236       }
1237 #endif
1238 }
1239 \f
1240 /* A source. */
1241
1242 /* Creates and returns an empty, unbacked source with N_BYTES
1243    bytes per case, none of which are initially in use. */
1244 static struct source *
1245 source_create_empty (size_t n_bytes)
1246 {
1247   struct source *source = xmalloc (sizeof *source);
1248   size_t row_size = n_bytes + 4 * sizeof (void *);
1249   size_t max_memory_rows = settings_get_workspace () / row_size;
1250   source->avail = range_set_create ();
1251   range_set_set1 (source->avail, 0, n_bytes);
1252   source->data = sparse_xarray_create (n_bytes, MAX (max_memory_rows, 4));
1253   source->backing = NULL;
1254   source->backing_rows = 0;
1255   source->n_used = 0;
1256   return source;
1257 }
1258
1259 /* Creates and returns a new source backed by READER and with the
1260    same initial dimensions and content. */
1261 static struct source *
1262 source_create_casereader (struct casereader *reader)
1263 {
1264   const struct caseproto *proto = casereader_get_proto (reader);
1265   size_t n_bytes = caseproto_to_n_bytes (proto);
1266   struct source *source = source_create_empty (n_bytes);
1267   size_t n_columns;
1268   size_t i;
1269
1270   range_set_set0 (source->avail, 0, n_bytes);
1271   source->backing = reader;
1272   source->backing_rows = casereader_count_cases (reader);
1273
1274   source->n_used = 0;
1275   n_columns = caseproto_get_n_widths (proto);
1276   for (i = 0; i < n_columns; i++)
1277     if (caseproto_get_width (proto, i) >= 0)
1278       source->n_used++;
1279
1280   return source;
1281 }
1282
1283 /* Returns a clone of source OLD with the same data and backing
1284    (if any).
1285
1286    Currently this is used only by the datasheet model checker
1287    driver, but it could be otherwise useful. */
1288 static struct source *
1289 source_clone (const struct source *old)
1290 {
1291   struct source *new = xmalloc (sizeof *new);
1292   new->avail = range_set_clone (old->avail, NULL);
1293   new->data = sparse_xarray_clone (old->data);
1294   new->backing = old->backing != NULL ? casereader_clone (old->backing) : NULL;
1295   new->backing_rows = old->backing_rows;
1296   new->n_used = old->n_used;
1297   if (new->data == NULL)
1298     {
1299       source_destroy (new);
1300       new = NULL;
1301     }
1302   return new;
1303 }
1304
1305 static int
1306 source_allocate_column (struct source *source, int width)
1307 {
1308   unsigned long int start;
1309   int n_bytes;
1310
1311   assert (width >= 0);
1312   n_bytes = width_to_n_bytes (width);
1313   if (source->backing == NULL
1314       && range_set_allocate_fully (source->avail, n_bytes, &start))
1315     return start;
1316   else
1317     return -1;
1318 }
1319
1320 static void
1321 source_release_column (struct source *source, int ofs, int width)
1322 {
1323   assert (width >= 0);
1324   range_set_set1 (source->avail, ofs, width_to_n_bytes (width));
1325   if (source->backing != NULL)
1326     source->n_used--;
1327 }
1328
1329 /* Returns true if SOURCE has any columns in use,
1330    false otherwise. */
1331 static bool
1332 source_in_use (const struct source *source)
1333 {
1334   return source->n_used > 0;
1335 }
1336
1337 /* Destroys SOURCE and its data and backing, if any. */
1338 static void
1339 source_destroy (struct source *source)
1340 {
1341   if (source != NULL)
1342     {
1343       range_set_destroy (source->avail);
1344       sparse_xarray_destroy (source->data);
1345       casereader_destroy (source->backing);
1346       free (source);
1347     }
1348 }
1349
1350 /* Returns the number of rows in SOURCE's backing casereader
1351    (SOURCE must have a backing casereader). */
1352 static casenumber
1353 source_get_backing_n_rows (const struct source *source)
1354 {
1355   assert (source_has_backing (source));
1356   return source->backing_rows;
1357 }
1358
1359 /* Reads the N COLUMNS in the given ROW, into the N VALUES.  Returns true if
1360    successful, false on I/O error.
1361
1362    All of the COLUMNS must have the same source.
1363
1364    The caller must have initialized VALUES with the proper width. */
1365 static bool
1366 source_read (const struct column columns[], casenumber row,
1367              union value values[], size_t n)
1368 {
1369   struct source *source = columns[0].source;
1370   size_t i;
1371
1372   if (source->backing == NULL
1373       || sparse_xarray_contains_row (source->data, row))
1374     {
1375       bool ok = true;
1376
1377       for (i = 0; i < n && ok; i++)
1378         ok = sparse_xarray_read (source->data, row, columns[i].byte_ofs,
1379                                  width_to_n_bytes (columns[i].width),
1380                                  value_to_data (&values[i], columns[i].width));
1381       return ok;
1382     }
1383   else
1384     {
1385       struct ccase *c = casereader_peek (source->backing, row);
1386       bool ok = c != NULL;
1387       if (ok)
1388         {
1389           for (i = 0; i < n; i++)
1390             value_copy (&values[i], case_data_idx (c, columns[i].value_ofs),
1391                         columns[i].width);
1392           case_unref (c);
1393         }
1394       return ok;
1395     }
1396 }
1397
1398 static bool
1399 copy_case_into_source (struct source *source, struct ccase *c, casenumber row)
1400 {
1401   const struct caseproto *proto = casereader_get_proto (source->backing);
1402   size_t n_widths = caseproto_get_n_widths (proto);
1403   size_t ofs;
1404   size_t i;
1405
1406   ofs = 0;
1407   for (i = 0; i < n_widths; i++)
1408     {
1409       int width = caseproto_get_width (proto, i);
1410       assert (width >= 0);
1411       if (width >= 0)
1412         {
1413           int n_bytes = width_to_n_bytes (width);
1414           if (!sparse_xarray_write (source->data, row, ofs, n_bytes,
1415                                     value_to_data (case_data_idx (c, i),
1416                                                    width)))
1417             return false;
1418           ofs += n_bytes;
1419         }
1420     }
1421   return true;
1422 }
1423
1424 /* Writes the N VALUES to their source in the given ROW and COLUMNS.  Returns
1425    true if successful, false on I/O error.  On error, the row's data may be
1426    completely or partially corrupted, both inside and outside the region to be
1427    written.
1428
1429    All of the COLUMNS must have the same source. */
1430 static bool
1431 source_write (const struct column columns[], casenumber row,
1432               const union value values[], size_t n)
1433 {
1434   struct source *source = columns[0].source;
1435   struct casereader *backing = source->backing;
1436   size_t i;
1437
1438   if (backing != NULL
1439       && !sparse_xarray_contains_row (source->data, row)
1440       && row < source->backing_rows)
1441     {
1442       struct ccase *c;
1443       bool ok;
1444
1445       c = casereader_peek (backing, row);
1446       if (c == NULL)
1447         return false;
1448
1449       ok = copy_case_into_source (source, c, row);
1450       case_unref (c);
1451       if (!ok)
1452         return false;
1453     }
1454
1455   for (i = 0; i < n; i++)
1456     if (!sparse_xarray_write (source->data, row, columns[i].byte_ofs,
1457                               width_to_n_bytes (columns[i].width),
1458                               value_to_data (&values[i], columns[i].width)))
1459       return false;
1460   return true;
1461 }
1462
1463 static bool
1464 source_write_column (struct column *column, const union value *value)
1465 {
1466   int width = column->width;
1467
1468   assert (column->source->backing == NULL);
1469   assert (width >= 0);
1470
1471   return sparse_xarray_write_columns (column->source->data, column->byte_ofs,
1472                                       width_to_n_bytes (width),
1473                                       value_to_data (value, width));
1474 }
1475
1476 /* Returns true if SOURCE has a backing casereader, false
1477    otherwise. */
1478 static bool
1479 source_has_backing (const struct source *source)
1480 {
1481   return source->backing != NULL;
1482 }
1483 \f
1484 /* Datasheet model checker test driver. */
1485
1486 static int
1487 get_source_index (const struct datasheet *ds, const struct source *source)
1488 {
1489   size_t i;
1490
1491   for (i = 0; i < ds->n_sources; i++)
1492     if (ds->sources[i] == source)
1493       return i;
1494   NOT_REACHED ();
1495 }
1496
1497 /* Clones the structure and contents of ODS into a new datasheet,
1498    and returns the new datasheet. */
1499 struct datasheet *
1500 clone_datasheet (const struct datasheet *ods)
1501 {
1502   struct datasheet *ds;
1503   size_t i;
1504
1505   ds = xmalloc (sizeof *ds);
1506
1507   ds->sources = xmalloc (ods->n_sources * sizeof *ds->sources);
1508   for (i = 0; i < ods->n_sources; i++)
1509     ds->sources[i] = source_clone (ods->sources[i]);
1510   ds->n_sources = ods->n_sources;
1511
1512   ds->proto = ods->proto != NULL ? caseproto_ref (ods->proto) : NULL;
1513   ds->columns = xmemdup (ods->columns, ods->n_columns * sizeof *ods->columns);
1514   for (i = 0; i < ods->n_columns; i++)
1515     ds->columns[i].source
1516       = ds->sources[get_source_index (ods, ods->columns[i].source)];
1517   ds->n_columns = ods->n_columns;
1518   ds->column_min_alloc = ods->column_min_alloc;
1519
1520   ds->rows = axis_clone (ods->rows);
1521
1522   ds->taint = taint_create ();
1523
1524   return ds;
1525 }
1526
1527 /* Hashes the structure of datasheet DS and returns the hash.
1528    We use MD4 because it is much faster than MD5 or SHA-1 but its
1529    collision resistance is just as good. */
1530 unsigned int
1531 hash_datasheet (const struct datasheet *ds)
1532 {
1533   unsigned int hash[DIV_RND_UP (20, sizeof (unsigned int))];
1534   struct md4_ctx ctx;
1535   size_t i;
1536
1537   md4_init_ctx (&ctx);
1538   for (i = 0; i < ds->n_columns; i++)
1539     {
1540       const struct column *column = &ds->columns[i];
1541       int source_n_bytes = sparse_xarray_get_n_columns (column->source->data);
1542       md4_process_bytes (&source_n_bytes, sizeof source_n_bytes, &ctx);
1543       /*md4_process_bytes (&column->byte_ofs, sizeof column->byte_ofs, &ctx);*/
1544       md4_process_bytes (&column->value_ofs, sizeof column->value_ofs, &ctx);
1545       md4_process_bytes (&column->width, sizeof column->width, &ctx);
1546     }
1547   axis_hash (ds->rows, &ctx);
1548   md4_process_bytes (&ds->column_min_alloc, sizeof ds->column_min_alloc, &ctx);
1549   md4_finish_ctx (&ctx, hash);
1550   return hash[0];
1551 }