Fix constness of datasheet_resize_column arguments
[pspp] / src / data / datasheet.c
1 /* PSPP - a program for statistical analysis.
2    Copyright (C) 2007, 2009, 2010, 2011, 2012 Free Software Foundation, Inc.
3
4    This program is free software: you can redistribute it and/or modify
5    it under the terms of the GNU General Public License as published by
6    the Free Software Foundation, either version 3 of the License, or
7    (at your option) any later version.
8
9    This program is distributed in the hope that it will be useful,
10    but WITHOUT ANY WARRANTY; without even the implied warranty of
11    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12    GNU General Public License for more details.
13
14    You should have received a copy of the GNU General Public License
15    along with this program.  If not, see <http://www.gnu.org/licenses/>. */
16
17 #include <config.h>
18
19 #include "data/datasheet.h"
20
21 #include <stdlib.h>
22 #include <string.h>
23
24 #include "data/casereader-provider.h"
25 #include "data/casereader.h"
26 #include "data/casewriter.h"
27 #include "data/lazy-casereader.h"
28 #include "data/settings.h"
29 #include "libpspp/array.h"
30 #include "libpspp/assertion.h"
31 #include "libpspp/misc.h"
32 #include "libpspp/range-map.h"
33 #include "libpspp/range-set.h"
34 #include "libpspp/sparse-xarray.h"
35 #include "libpspp/taint.h"
36 #include "libpspp/tower.h"
37
38 #include "gl/minmax.h"
39 #include "gl/md4.h"
40 #include "gl/xalloc.h"
41
42 struct column;
43
44 static struct axis *axis_create (void);
45 static struct axis *axis_clone (const struct axis *);
46 static void axis_destroy (struct axis *);
47
48 static void axis_hash (const struct axis *, struct md4_ctx *);
49
50 static bool axis_allocate (struct axis *, unsigned long int request,
51                            unsigned long int *start,
52                            unsigned long int *width);
53 static void axis_make_available (struct axis *,
54                                  unsigned long int start,
55                                  unsigned long int width);
56 static unsigned long int axis_extend (struct axis *, unsigned long int width);
57
58 static unsigned long int axis_map (const struct axis *, unsigned long log_pos);
59
60 static unsigned long axis_get_size (const struct axis *);
61 static void axis_insert (struct axis *,
62                          unsigned long int log_start,
63                          unsigned long int phy_start,
64                          unsigned long int cnt);
65 static void axis_remove (struct axis *,
66                          unsigned long int start, unsigned long int cnt);
67 static void axis_move (struct axis *,
68                        unsigned long int old_start,
69                        unsigned long int new_start,
70                        unsigned long int cnt);
71
72 static struct source *source_create_empty (size_t n_bytes);
73 static struct source *source_create_casereader (struct casereader *);
74 static struct source *source_clone (const struct source *);
75 static void source_destroy (struct source *);
76
77 static casenumber source_get_backing_n_rows (const struct source *);
78
79 static int source_allocate_column (struct source *, int width);
80 static void source_release_column (struct source *, int ofs, int width);
81 static bool source_in_use (const struct source *);
82
83 static bool source_read (const struct column *, casenumber row, union value *);
84 static bool source_write (const struct column *, casenumber row,
85                           const union value *);
86 static bool source_write_column (struct column *, const union value *);
87 static bool source_has_backing (const struct source *);
88
89 static int get_source_index (const struct datasheet *ds, const struct source *source);
90
91 /* A datasheet is internally composed from a set of data files,
92    called "sources".  The sources that make up a datasheet must
93    have the same number of rows (cases), but their numbers of
94    columns (variables) may vary.
95
96    A datasheet's external view is produced by mapping (permuting
97    and selecting) its internal data.  Thus, we can rearrange or
98    delete rows or columns simply by modifying the mapping.  We
99    add rows by adding rows to each source and to the row mapping.
100    We add columns by adding a new source, then adding that source
101    to the column mapping.
102
103    Each source in a datasheet can be a casereader or a
104    sparse_xarray.  Casereaders are read-only, so when sources
105    made from casereaders need to be modified, it is done
106    "virtually" through being overlaid by a sparse_xarray. */
107 struct source
108   {
109     struct range_set *avail;    /* Free bytes are set to 1s. */
110     struct sparse_xarray *data; /* Data at top level, atop the backing. */
111     struct casereader *backing; /* Backing casereader (or null). */
112     casenumber backing_rows;    /* Number of rows in backing (if backed). */
113     size_t n_used;              /* Number of column in use (if backed). */
114   };
115
116 /* A logical column. */
117 struct column
118   {
119     struct source *source;      /* Source of the underlying physical column. */
120     int value_ofs;              /* If 'source' has a backing casereader,
121                                    column's value offset in its cases. */
122     int byte_ofs;               /* Byte offset in source's sparse_xarray. */
123     int width;                  /* 0=numeric, otherwise string width. */
124   };
125
126 /* A datasheet. */
127 struct datasheet
128   {
129     /* Data sources. */
130     struct source **sources;    /* Sources, in no particular order. */
131     size_t n_sources;           /* Number of sources. */
132
133     /* Columns. */
134     struct caseproto *proto;    /* Prototype for rows (initialized lazily). */
135     struct column *columns;     /* Logical to physical column mapping. */
136     size_t n_columns;           /* Number of logical columns. */
137     unsigned column_min_alloc;  /* Min. # of columns to put in a new source. */
138
139     /* Rows. */
140     struct axis *rows;          /* Logical to physical row mapping. */
141
142     /* Tainting. */
143     struct taint *taint;        /* Indicates corrupted data. */
144   };
145
146 /* Is this operation a read or a write? */
147 enum rw_op
148   {
149     OP_READ,
150     OP_WRITE
151   };
152
153 static void allocate_column (struct datasheet *, int width, struct column *);
154 static void release_source (struct datasheet *, struct source *);
155 static bool rw_case (struct datasheet *ds, enum rw_op op,
156                      casenumber lrow, size_t start_column, size_t n_columns,
157                      union value data[]);
158
159 /* Returns the number of bytes needed to store a value with the
160    given WIDTH on disk. */
161 static size_t
162 width_to_n_bytes (int width)
163 {
164   return width == 0 ? sizeof (double) : width;
165 }
166
167 /* Returns the address of the data in VALUE (for reading or
168    writing to/from disk).  VALUE must have the given WIDTH. */
169 static void *
170 value_to_data (const union value *value_, int width)
171 {
172   union value *value = (union value *) value_;
173   assert (sizeof value->f == sizeof (double));
174   if (width == 0)
175     return &value->f;
176   else
177     return value_str_rw (value, width);
178 }
179
180 /* Returns the number of bytes needed to store all the values in
181    PROTO on disk. */
182 static size_t
183 caseproto_to_n_bytes (const struct caseproto *proto)
184 {
185   size_t n_bytes;
186   size_t i;
187
188   n_bytes = 0;
189   for (i = 0; i < caseproto_get_n_widths (proto); i++)
190     {
191       int width = caseproto_get_width (proto, i);
192       if (width >= 0)
193         n_bytes += width_to_n_bytes (width);
194     }
195   return n_bytes;
196 }
197
198 /* Creates and returns a new datasheet.
199
200    If READER is nonnull, then the datasheet initially contains
201    the contents of READER. */
202 struct datasheet *
203 datasheet_create (struct casereader *reader)
204 {
205   struct datasheet *ds = xmalloc (sizeof *ds);
206   ds->sources = NULL;
207   ds->n_sources = 0;
208   ds->proto = NULL;
209   ds->columns = NULL;
210   ds->n_columns = 0;
211   ds->column_min_alloc = 8;
212   ds->rows = axis_create ();
213   ds->taint = taint_create ();
214
215   if (reader != NULL)
216     {
217       casenumber n_rows;
218       size_t byte_ofs;
219       size_t i;
220
221       taint_propagate (casereader_get_taint (reader), ds->taint);
222
223       ds->proto = caseproto_ref (casereader_get_proto (reader));
224
225       ds->sources = xmalloc (sizeof *ds->sources);
226       ds->sources[0] = source_create_casereader (reader);
227       ds->n_sources = 1;
228
229       ds->n_columns = caseproto_get_n_widths (ds->proto);
230       ds->columns = xnmalloc (ds->n_columns, sizeof *ds->columns);
231       byte_ofs = 0;
232       for (i = 0; i < ds->n_columns; i++)
233         {
234           struct column *column = &ds->columns[i];
235           int width = caseproto_get_width (ds->proto, i);
236           column->source = ds->sources[0];
237           column->width = width;
238           if (width >= 0)
239             {
240               column->value_ofs = i;
241               column->byte_ofs = byte_ofs;
242               byte_ofs += width_to_n_bytes (column->width);
243             }
244         }
245
246       n_rows = source_get_backing_n_rows (ds->sources[0]);
247       if (n_rows > 0)
248         axis_insert (ds->rows, 0, axis_extend (ds->rows, n_rows), n_rows);
249     }
250
251   return ds;
252 }
253
254 /* Destroys datasheet DS. */
255 void
256 datasheet_destroy (struct datasheet *ds)
257 {
258   size_t i;
259
260   if (ds == NULL)
261     return;
262
263   for (i = 0; i < ds->n_sources; i++)
264     source_destroy (ds->sources[i]);
265   free (ds->sources);
266   caseproto_unref (ds->proto);
267   free (ds->columns);
268   axis_destroy (ds->rows);
269   taint_destroy (ds->taint);
270   free (ds);
271 }
272
273 /* Returns the prototype for the cases in DS.  The caller must
274    not unref the returned prototype. */
275 const struct caseproto *
276 datasheet_get_proto (const struct datasheet *ds_)
277 {
278   struct datasheet *ds = CONST_CAST (struct datasheet *, ds_);
279   if (ds->proto == NULL)
280     {
281       size_t i;
282
283       ds->proto = caseproto_create ();
284       for (i = 0; i < ds->n_columns; i++)
285         ds->proto = caseproto_add_width (ds->proto, ds->columns[i].width);
286     }
287   return ds->proto;
288 }
289
290 /* Returns the width of the given COLUMN within DS.
291    COLUMN must be less than the number of columns in DS. */
292 int
293 datasheet_get_column_width (const struct datasheet *ds, size_t column)
294 {
295   assert (column < datasheet_get_n_columns (ds));
296   return ds->columns[column].width;
297 }
298
299 /* Moves datasheet DS to a new location in memory, and returns
300    the new location.  Afterward, the datasheet must not be
301    accessed at its former location.
302
303    This function is useful for ensuring that all references to a
304    datasheet have been dropped, especially in conjunction with
305    tools like Valgrind. */
306 struct datasheet *
307 datasheet_rename (struct datasheet *ds)
308 {
309   struct datasheet *new = xmemdup (ds, sizeof *ds);
310   free (ds);
311   return new;
312 }
313
314 /* Returns true if datasheet DS is tainted.
315    A datasheet is tainted by an I/O error or by taint
316    propagation to the datasheet. */
317 bool
318 datasheet_error (const struct datasheet *ds)
319 {
320   return taint_is_tainted (ds->taint);
321 }
322
323 /* Marks datasheet DS tainted. */
324 void
325 datasheet_force_error (struct datasheet *ds)
326 {
327   taint_set_taint (ds->taint);
328 }
329
330 /* Returns datasheet DS's taint object. */
331 const struct taint *
332 datasheet_get_taint (const struct datasheet *ds)
333 {
334   return ds->taint;
335 }
336
337 /* Returns the number of rows in DS. */
338 casenumber
339 datasheet_get_n_rows (const struct datasheet *ds)
340 {
341   return axis_get_size (ds->rows);
342 }
343
344 /* Returns the number of columns in DS. */
345 size_t
346 datasheet_get_n_columns (const struct datasheet *ds)
347 {
348   return ds->n_columns;
349 }
350
351 /* Inserts a column of the given WIDTH into datasheet DS just
352    before column BEFORE.  Initializes the contents of each row in
353    the inserted column to VALUE (which must have width WIDTH).
354
355    Returns true if successful, false on failure.  In case of
356    failure, the datasheet is unchanged. */
357 bool
358 datasheet_insert_column (struct datasheet *ds,
359                          const union value *value, int width, size_t before)
360 {
361   struct column *col;
362
363   assert (before <= ds->n_columns);
364
365   ds->columns = xnrealloc (ds->columns,
366                            ds->n_columns + 1, sizeof *ds->columns);
367   insert_element (ds->columns, ds->n_columns, sizeof *ds->columns, before);
368   col = &ds->columns[before];
369   ds->n_columns++;
370
371   allocate_column (ds, width, col);
372
373   if (width >= 0 && !source_write_column (col, value))
374     {
375       datasheet_delete_columns (ds, before, 1);
376       taint_set_taint (ds->taint);
377       return false;
378     }
379
380   return true;
381 }
382
383 /* Deletes the N columns in DS starting from column START. */
384 void
385 datasheet_delete_columns (struct datasheet *ds, size_t start, size_t n)
386 {
387   assert (start + n <= ds->n_columns);
388
389   if (n > 0)
390     {
391       size_t i;
392
393       for (i = start; i < start + n; i++)
394         {
395           struct column *column = &ds->columns[i];
396           struct source *source = column->source;
397           source_release_column (source, column->byte_ofs, column->width);
398           release_source (ds, source);
399         }
400
401       remove_range (ds->columns, ds->n_columns, sizeof *ds->columns, start, n);
402       ds->n_columns -= n;
403
404       caseproto_unref (ds->proto);
405       ds->proto = NULL;
406     }
407 }
408
409 /* Moves the N columns in DS starting at position OLD_START so
410    that they then start at position NEW_START.  Equivalent to
411    deleting the column rows, then inserting them at what becomes
412    position NEW_START after the deletion. */
413 void
414 datasheet_move_columns (struct datasheet *ds,
415                         size_t old_start, size_t new_start,
416                         size_t n)
417 {
418   assert (old_start + n <= ds->n_columns);
419   assert (new_start + n <= ds->n_columns);
420
421   move_range (ds->columns, ds->n_columns, sizeof *ds->columns,
422               old_start, new_start, n);
423
424   caseproto_unref (ds->proto);
425   ds->proto = NULL;
426 }
427
428 struct resize_datasheet_value_aux
429   {
430     union value src_value;
431     size_t src_ofs;
432     int src_width;
433
434     void (*resize_cb) (const union value *, union value *, const void *aux);
435     const void *resize_cb_aux;
436
437     union value dst_value;
438     size_t dst_ofs;
439     int dst_width;
440   };
441
442 static bool
443 resize_datasheet_value (const void *src, void *dst, void *aux_)
444 {
445   struct resize_datasheet_value_aux *aux = aux_;
446
447   memcpy (value_to_data (&aux->src_value, aux->src_width),
448           (uint8_t *) src + aux->src_ofs,
449           width_to_n_bytes (aux->src_width));
450
451   aux->resize_cb (&aux->src_value, &aux->dst_value, aux->resize_cb_aux);
452
453   memcpy ((uint8_t *) dst + aux->dst_ofs,
454           value_to_data (&aux->dst_value, aux->dst_width),
455           width_to_n_bytes (aux->dst_width));
456
457   return true;
458 }
459
460 bool
461 datasheet_resize_column (struct datasheet *ds, size_t column, int new_width,
462                          void (*resize_cb) (const union value *,
463                                             union value *, const void *aux),
464                          const void *resize_cb_aux)
465 {
466   struct column old_col;
467   struct column *col;
468   int old_width;
469
470   assert (column < datasheet_get_n_columns (ds));
471
472   col = &ds->columns[column];
473   old_col = *col;
474   old_width = old_col.width;
475
476   if (new_width == -1)
477     {
478       if (old_width != -1)
479         {
480           datasheet_delete_columns (ds, column, 1);
481           datasheet_insert_column (ds, NULL, -1, column);
482         }
483     }
484   else if (old_width == -1)
485     {
486       union value value;
487       value_init (&value, new_width);
488       value_set_missing (&value, new_width);
489       if (resize_cb != NULL)
490         resize_cb (NULL, &value, resize_cb_aux);
491       datasheet_delete_columns (ds, column, 1);
492       datasheet_insert_column (ds, &value, new_width, column);
493       value_destroy (&value, new_width);
494     }
495   else if (source_has_backing (col->source))
496     {
497       unsigned long int n_rows = axis_get_size (ds->rows);
498       unsigned long int lrow;
499       union value src, dst;
500
501       source_release_column (col->source, col->byte_ofs, col->width);
502       allocate_column (ds, new_width, col);
503
504       value_init (&src, old_width);
505       value_init (&dst, new_width);
506       for (lrow = 0; lrow < n_rows; lrow++)
507         {
508           unsigned long int prow = axis_map (ds->rows, lrow);
509           if (!source_read (&old_col, prow, &src))
510             {
511               /* FIXME: back out col changes. */
512               break;
513             }
514           resize_cb (&src, &dst, resize_cb_aux);
515           if (!source_write (col, prow, &dst))
516             {
517               /* FIXME: back out col changes. */
518               break;
519             }
520         }
521       value_destroy (&src, old_width);
522       value_destroy (&dst, new_width);
523       if (lrow < n_rows)
524         return false;
525
526       release_source (ds, old_col.source);
527     }
528   else
529     {
530       struct resize_datasheet_value_aux aux;
531
532       source_release_column (col->source, col->byte_ofs, col->width);
533       allocate_column (ds, new_width, col);
534
535       value_init (&aux.src_value, old_col.width);
536       aux.src_ofs = old_col.byte_ofs;
537       aux.src_width = old_col.width;
538       aux.resize_cb = resize_cb;
539       aux.resize_cb_aux = resize_cb_aux;
540       value_init (&aux.dst_value, new_width);
541       aux.dst_ofs = col->byte_ofs;
542       aux.dst_width = new_width;
543       sparse_xarray_copy (old_col.source->data, col->source->data,
544                           resize_datasheet_value, &aux);
545       value_destroy (&aux.src_value, old_width);
546       value_destroy (&aux.dst_value, new_width);
547
548       release_source (ds, old_col.source);
549     }
550   return true;
551 }
552
553 /* Retrieves and returns the contents of the given ROW in
554    datasheet DS.  The caller owns the returned case and must
555    unref it when it is no longer needed.  Returns a null pointer
556    on I/O error. */
557 struct ccase *
558 datasheet_get_row (const struct datasheet *ds, casenumber row)
559 {
560   size_t n_columns = datasheet_get_n_columns (ds);
561   struct ccase *c = case_create (datasheet_get_proto (ds));
562   if (rw_case (CONST_CAST (struct datasheet *, ds), OP_READ,
563                row, 0, n_columns, case_data_all_rw (c)))
564     return c;
565   else
566     {
567       case_unref (c);
568       return NULL;
569     }
570 }
571
572 /* Stores the contents of case C, which is destroyed, into the
573    given ROW in DS.  Returns true on success, false on I/O error.
574    On failure, the given ROW might be partially modified or
575    corrupted. */
576 bool
577 datasheet_put_row (struct datasheet *ds, casenumber row, struct ccase *c)
578 {
579   size_t n_columns = datasheet_get_n_columns (ds);
580   bool ok = rw_case (ds, OP_WRITE, row, 0, n_columns,
581                      (union value *) case_data_all (c));
582   case_unref (c);
583   return ok;
584 }
585
586 /* Stores the values of COLUMN in DS in the given ROW in DS into
587    VALUE.  The caller must have already initialized VALUE as a
588    value of the appropriate width (as returned by
589    datasheet_get_column_width (DS, COLUMN)).  Returns true if
590    successful, false on I/O error. */
591 bool
592 datasheet_get_value (const struct datasheet *ds, casenumber row,
593                      size_t column, union value *value)
594 {
595   assert (row >= 0);
596   return rw_case (CONST_CAST (struct datasheet *, ds), OP_READ,
597                   row, column, 1, value);
598 }
599
600 /* Stores VALUE into DS in the given ROW and COLUMN.  VALUE must
601    have the correct width for COLUMN (as returned by
602    datasheet_get_column_width (DS, COLUMN)).  Returns true if
603    successful, false on I/O error.  On failure, ROW might be
604    partially modified or corrupted. */
605 bool
606 datasheet_put_value (struct datasheet *ds UNUSED, casenumber row UNUSED,
607                      size_t column UNUSED, const union value *value UNUSED)
608 {
609   return rw_case (ds, OP_WRITE, row, column, 1, (union value *) value);
610 }
611
612 /* Inserts the CNT cases at C into datasheet DS just before row
613    BEFORE.  Returns true if successful, false on I/O error.  On
614    failure, datasheet DS is not modified.
615
616    Regardless of success, this function unrefs all of the cases
617    in C. */
618 bool
619 datasheet_insert_rows (struct datasheet *ds,
620                        casenumber before, struct ccase *c[],
621                        casenumber cnt)
622 {
623   casenumber added = 0;
624   while (cnt > 0)
625     {
626       unsigned long first_phy;
627       unsigned long phy_cnt;
628       unsigned long i;
629
630       /* Allocate physical rows from the pool of available
631          rows. */
632       if (!axis_allocate (ds->rows, cnt, &first_phy, &phy_cnt))
633         {
634           /* No rows were available.  Extend the row axis to make
635              some new ones available. */
636           phy_cnt = cnt;
637           first_phy = axis_extend (ds->rows, cnt);
638         }
639
640       /* Insert the new rows into the row mapping. */
641       axis_insert (ds->rows, before, first_phy, phy_cnt);
642
643       /* Initialize the new rows. */
644       for (i = 0; i < phy_cnt; i++)
645         if (!datasheet_put_row (ds, before + i, c[i]))
646           {
647             while (++i < cnt)
648               case_unref (c[i]);
649             datasheet_delete_rows (ds, before - added, phy_cnt + added);
650             return false;
651           }
652
653       /* Advance. */
654       c += phy_cnt;
655       cnt -= phy_cnt;
656       before += phy_cnt;
657       added += phy_cnt;
658     }
659   return true;
660 }
661
662 /* Deletes the CNT rows in DS starting from row FIRST. */
663 void
664 datasheet_delete_rows (struct datasheet *ds,
665                        casenumber first, casenumber cnt)
666 {
667   size_t lrow;
668
669   /* Free up rows for reuse.
670      FIXME: optimize. */
671   for (lrow = first; lrow < first + cnt; lrow++)
672     axis_make_available (ds->rows, axis_map (ds->rows, lrow), 1);
673
674   /* Remove rows from logical-to-physical mapping. */
675   axis_remove (ds->rows, first, cnt);
676 }
677
678 /* Moves the CNT rows in DS starting at position OLD_START so
679    that they then start at position NEW_START.  Equivalent to
680    deleting the given rows, then inserting them at what becomes
681    position NEW_START after the deletion. */
682 void
683 datasheet_move_rows (struct datasheet *ds,
684                      size_t old_start, size_t new_start,
685                      size_t cnt)
686 {
687   axis_move (ds->rows, old_start, new_start, cnt);
688 }
689 \f
690 static const struct casereader_random_class datasheet_reader_class;
691
692 /* Creates and returns a casereader whose input cases are the
693    rows in datasheet DS.  From the caller's perspective, DS is
694    effectively destroyed by this operation, such that the caller
695    must not reference it again. */
696 struct casereader *
697 datasheet_make_reader (struct datasheet *ds)
698 {
699   struct casereader *reader;
700   ds = datasheet_rename (ds);
701   reader = casereader_create_random (datasheet_get_proto (ds),
702                                      datasheet_get_n_rows (ds),
703                                      &datasheet_reader_class, ds);
704   taint_propagate (datasheet_get_taint (ds), casereader_get_taint (reader));
705   return reader;
706 }
707
708 /* "read" function for the datasheet random casereader. */
709 static struct ccase *
710 datasheet_reader_read (struct casereader *reader UNUSED, void *ds_,
711                        casenumber case_idx)
712 {
713   struct datasheet *ds = ds_;
714   if (case_idx < datasheet_get_n_rows (ds))
715     {
716       struct ccase *c = datasheet_get_row (ds, case_idx);
717       if (c == NULL)
718         taint_set_taint (ds->taint);
719       return c;
720     }
721   else
722     return NULL;
723 }
724
725 /* "destroy" function for the datasheet random casereader. */
726 static void
727 datasheet_reader_destroy (struct casereader *reader UNUSED, void *ds_)
728 {
729   struct datasheet *ds = ds_;
730   datasheet_destroy (ds);
731 }
732
733 /* "advance" function for the datasheet random casereader. */
734 static void
735 datasheet_reader_advance (struct casereader *reader UNUSED, void *ds_,
736                           casenumber case_cnt)
737 {
738   struct datasheet *ds = ds_;
739   datasheet_delete_rows (ds, 0, case_cnt);
740 }
741
742 /* Random casereader class for a datasheet. */
743 static const struct casereader_random_class datasheet_reader_class =
744   {
745     datasheet_reader_read,
746     datasheet_reader_destroy,
747     datasheet_reader_advance,
748   };
749 \f
750 static void
751 allocate_column (struct datasheet *ds, int width, struct column *column)
752 {
753   caseproto_unref (ds->proto);
754   ds->proto = NULL;
755
756   column->value_ofs = -1;
757   column->width = width;
758   if (width >= 0)
759     {
760       int n_bytes;
761       size_t i;
762
763       n_bytes = width_to_n_bytes (width);
764       for (i = 0; i < ds->n_sources; i++)
765         {
766           column->source = ds->sources[i];
767           column->byte_ofs = source_allocate_column (column->source, n_bytes);
768           if (column->byte_ofs >= 0)
769             return;
770         }
771
772       column->source = source_create_empty (MAX (n_bytes,
773                                                  ds->column_min_alloc));
774       ds->sources = xnrealloc (ds->sources,
775                                ds->n_sources + 1, sizeof *ds->sources);
776       ds->sources[ds->n_sources++] = column->source;
777
778       ds->column_min_alloc = MIN (65536, ds->column_min_alloc * 2);
779
780       column->byte_ofs = source_allocate_column (column->source, n_bytes);
781       assert (column->byte_ofs >= 0);
782     }
783   else
784     {
785       column->source = NULL;
786       column->byte_ofs = -1;
787     }
788 }
789
790 static void
791 release_source (struct datasheet *ds, struct source *source)
792 {
793   if (source_has_backing (source) && !source_in_use (source))
794     {
795       /* Since only the first source to be added ever
796          has a backing, this source must have index
797          0.  */
798       assert (source == ds->sources[0]);
799       ds->sources[0] = ds->sources[--ds->n_sources];
800       source_destroy (source);
801     }
802 }
803
804 /* Reads (if OP is OP_READ) or writes (if op is OP_WRITE) the
805    N_COLUMNS columns starting from column START_COLUMN in row
806    LROW to/from the N_COLUMNS values in DATA. */
807 static bool
808 rw_case (struct datasheet *ds, enum rw_op op,
809          casenumber lrow, size_t start_column, size_t n_columns,
810          union value data[])
811 {
812   casenumber prow;
813   size_t i;
814
815   assert (lrow < datasheet_get_n_rows (ds));
816   assert (n_columns <= datasheet_get_n_columns (ds));
817   assert (start_column + n_columns <= datasheet_get_n_columns (ds));
818
819   prow = axis_map (ds->rows, lrow);
820   for (i = 0; i < n_columns; i++)
821     {
822       struct column *c = &ds->columns[start_column + i];
823       if (c->width >= 0)
824         {
825           bool ok;
826
827           if (op == OP_READ)
828             ok = source_read (c, prow, &data[i]);
829           else
830             ok = source_write (c, prow, &data[i]);
831
832           if (!ok)
833             {
834               taint_set_taint (ds->taint);
835               return false;
836             }
837         }
838     }
839   return true;
840 }
841 \f
842 /* An axis.
843
844    An axis has two functions.  First, it maintains a mapping from
845    logical (client-visible) to physical (storage) ordinates.  The
846    axis_map and axis_get_size functions inspect this mapping, and
847    the axis_insert, axis_remove, and axis_move functions modify
848    it.  Second, it tracks the set of ordinates that are unused
849    and available for reuse.  The axis_allocate,
850    axis_make_available, and axis_extend functions affect the set
851    of available ordinates. */
852 struct axis
853 {
854   struct tower log_to_phy;     /* Map from logical to physical ordinates;
855                                   contains "struct axis_group"s. */
856   struct range_set *available; /* Set of unused, available ordinates. */
857   unsigned long int phy_size;  /* Current physical length of axis. */
858 };
859
860 /* A mapping from logical to physical ordinates. */
861 struct axis_group
862 {
863   struct tower_node logical;  /* Range of logical ordinates. */
864   unsigned long phy_start;    /* First corresponding physical ordinate. */
865 };
866
867 static struct axis_group *axis_group_from_tower_node (struct tower_node *);
868 static struct tower_node *make_axis_group (unsigned long int phy_start);
869 static struct tower_node *split_axis (struct axis *, unsigned long int where);
870 static void merge_axis_nodes (struct axis *, struct tower_node *,
871                               struct tower_node **other_node);
872 static void check_axis_merged (const struct axis *axis UNUSED);
873
874 /* Creates and returns a new, initially empty axis. */
875 static struct axis *
876 axis_create (void)
877 {
878   struct axis *axis = xmalloc (sizeof *axis);
879   tower_init (&axis->log_to_phy);
880   axis->available = range_set_create ();
881   axis->phy_size = 0;
882   return axis;
883 }
884
885 /* Returns a clone of existing axis OLD.
886
887    Currently this is used only by the datasheet model checker
888    driver, but it could be otherwise useful. */
889 static struct axis *
890 axis_clone (const struct axis *old)
891 {
892   const struct tower_node *node;
893   struct axis *new;
894
895   new = xmalloc (sizeof *new);
896   tower_init (&new->log_to_phy);
897   new->available = range_set_clone (old->available, NULL);
898   new->phy_size = old->phy_size;
899
900   for (node = tower_first (&old->log_to_phy); node != NULL;
901        node = tower_next (&old->log_to_phy, node))
902     {
903       unsigned long int size = tower_node_get_size (node);
904       struct axis_group *group = tower_data (node, struct axis_group, logical);
905       tower_insert (&new->log_to_phy, size, make_axis_group (group->phy_start),
906                     NULL);
907     }
908
909   return new;
910 }
911
912 /* Adds the state of AXIS to the MD4 hash context CTX.
913
914    This is only used by the datasheet model checker driver.  It
915    is unlikely to be otherwise useful. */
916 static void
917 axis_hash (const struct axis *axis, struct md4_ctx *ctx)
918 {
919   const struct tower_node *tn;
920   const struct range_set_node *rsn;
921
922   for (tn = tower_first (&axis->log_to_phy); tn != NULL;
923        tn = tower_next (&axis->log_to_phy, tn))
924     {
925       struct axis_group *group = tower_data (tn, struct axis_group, logical);
926       unsigned long int phy_start = group->phy_start;
927       unsigned long int size = tower_node_get_size (tn);
928
929       md4_process_bytes (&phy_start, sizeof phy_start, ctx);
930       md4_process_bytes (&size, sizeof size, ctx);
931     }
932
933   RANGE_SET_FOR_EACH (rsn, axis->available)
934     {
935       unsigned long int start = range_set_node_get_start (rsn);
936       unsigned long int end = range_set_node_get_end (rsn);
937
938       md4_process_bytes (&start, sizeof start, ctx);
939       md4_process_bytes (&end, sizeof end, ctx);
940     }
941
942   md4_process_bytes (&axis->phy_size, sizeof axis->phy_size, ctx);
943 }
944
945 /* Destroys AXIS. */
946 static void
947 axis_destroy (struct axis *axis)
948 {
949   if (axis == NULL)
950     return;
951
952   while (!tower_is_empty (&axis->log_to_phy))
953     {
954       struct tower_node *node = tower_first (&axis->log_to_phy);
955       struct axis_group *group = tower_data (node, struct axis_group,
956                                              logical);
957       tower_delete (&axis->log_to_phy, node);
958       free (group);
959     }
960
961   range_set_destroy (axis->available);
962   free (axis);
963 }
964
965 /* Allocates up to REQUEST contiguous unused and available
966    ordinates from AXIS.  If successful, stores the number
967    obtained into *WIDTH and the ordinate of the first into
968    *START, marks the ordinates as now unavailable return true.
969    On failure, which occurs only if AXIS has no unused and
970    available ordinates, returns false without modifying AXIS. */
971 static bool
972 axis_allocate (struct axis *axis, unsigned long int request,
973                unsigned long int *start, unsigned long int *width)
974 {
975   return range_set_allocate (axis->available, request, start, width);
976 }
977
978 /* Marks the WIDTH contiguous ordinates in AXIS, starting from
979    START, as unused and available. */
980 static void
981 axis_make_available (struct axis *axis,
982                      unsigned long int start, unsigned long int width)
983 {
984   range_set_set1 (axis->available, start, width);
985 }
986
987 /* Extends the total physical length of AXIS by WIDTH and returns
988    the first ordinate in the new physical region. */
989 static unsigned long int
990 axis_extend (struct axis *axis, unsigned long int width)
991 {
992   unsigned long int start = axis->phy_size;
993   axis->phy_size += width;
994   return start;
995 }
996
997 /* Returns the physical ordinate in AXIS corresponding to logical
998    ordinate LOG_POS.  LOG_POS must be less than the logical
999    length of AXIS. */
1000 static unsigned long int
1001 axis_map (const struct axis *axis, unsigned long log_pos)
1002 {
1003   struct tower_node *node;
1004   struct axis_group *group;
1005   unsigned long int group_start;
1006
1007   node = tower_lookup (&axis->log_to_phy, log_pos, &group_start);
1008   group = tower_data (node, struct axis_group, logical);
1009   return group->phy_start + (log_pos - group_start);
1010 }
1011
1012 /* Returns the logical length of AXIS. */
1013 static unsigned long
1014 axis_get_size (const struct axis *axis)
1015 {
1016   return tower_height (&axis->log_to_phy);
1017 }
1018
1019 /* Inserts the CNT contiguous physical ordinates starting at
1020    PHY_START into AXIS's logical-to-physical mapping, starting at
1021    logical position LOG_START. */
1022 static void
1023 axis_insert (struct axis *axis,
1024              unsigned long int log_start, unsigned long int phy_start,
1025              unsigned long int cnt)
1026 {
1027   struct tower_node *before = split_axis (axis, log_start);
1028   struct tower_node *new = make_axis_group (phy_start);
1029   tower_insert (&axis->log_to_phy, cnt, new, before);
1030   merge_axis_nodes (axis, new, NULL);
1031   check_axis_merged (axis);
1032 }
1033
1034 /* Removes CNT ordinates from AXIS's logical-to-physical mapping
1035    starting at logical position START. */
1036 static void
1037 axis_remove (struct axis *axis,
1038              unsigned long int start, unsigned long int cnt)
1039 {
1040   if (cnt > 0)
1041     {
1042       struct tower_node *last = split_axis (axis, start + cnt);
1043       struct tower_node *cur, *next;
1044       for (cur = split_axis (axis, start); cur != last; cur = next)
1045         {
1046           next = tower_delete (&axis->log_to_phy, cur);
1047           free (axis_group_from_tower_node (cur));
1048         }
1049       merge_axis_nodes (axis, last, NULL);
1050       check_axis_merged (axis);
1051     }
1052 }
1053
1054 /* Moves the CNT ordinates in AXIS's logical-to-mapping starting
1055    at logical position OLD_START so that they then start at
1056    position NEW_START. */
1057 static void
1058 axis_move (struct axis *axis,
1059            unsigned long int old_start, unsigned long int new_start,
1060            unsigned long int cnt)
1061 {
1062   if (cnt > 0 && old_start != new_start)
1063     {
1064       struct tower_node *old_first, *old_last, *new_first;
1065       struct tower_node *merge1, *merge2;
1066       struct tower tmp_array;
1067
1068       /* Move ordinates OLD_START...(OLD_START + CNT) into new,
1069          separate TMP_ARRAY. */
1070       old_first = split_axis (axis, old_start);
1071       old_last = split_axis (axis, old_start + cnt);
1072       tower_init (&tmp_array);
1073       tower_splice (&tmp_array, NULL,
1074                     &axis->log_to_phy, old_first, old_last);
1075       merge_axis_nodes (axis, old_last, NULL);
1076       check_axis_merged (axis);
1077
1078       /* Move TMP_ARRAY to position NEW_START. */
1079       new_first = split_axis (axis, new_start);
1080       merge1 = tower_first (&tmp_array);
1081       merge2 = tower_last (&tmp_array);
1082       if (merge2 == merge1)
1083         merge2 = NULL;
1084       tower_splice (&axis->log_to_phy, new_first, &tmp_array, old_first, NULL);
1085       merge_axis_nodes (axis, merge1, &merge2);
1086       merge_axis_nodes (axis, merge2, NULL);
1087       check_axis_merged (axis);
1088     }
1089 }
1090
1091 /* Returns the axis_group in which NODE is embedded. */
1092 static struct axis_group *
1093 axis_group_from_tower_node (struct tower_node *node)
1094 {
1095   return tower_data (node, struct axis_group, logical);
1096 }
1097
1098 /* Creates and returns a new axis_group at physical position
1099    PHY_START. */
1100 static struct tower_node *
1101 make_axis_group (unsigned long phy_start)
1102 {
1103   struct axis_group *group = xmalloc (sizeof *group);
1104   group->phy_start = phy_start;
1105   return &group->logical;
1106 }
1107
1108 /* Returns the tower_node in AXIS's logical-to-physical map whose
1109    bottom edge is at exact level WHERE.  If there is no such
1110    tower_node in AXIS's logical-to-physical map, then split_axis
1111    creates one by breaking an existing tower_node into two
1112    separate ones, unless WHERE is equal to the tower height, in
1113    which case it simply returns a null pointer. */
1114 static struct tower_node *
1115 split_axis (struct axis *axis, unsigned long int where)
1116 {
1117   unsigned long int group_start;
1118   struct tower_node *group_node;
1119   struct axis_group *group;
1120
1121   assert (where <= tower_height (&axis->log_to_phy));
1122   if (where >= tower_height (&axis->log_to_phy))
1123     return NULL;
1124
1125   group_node = tower_lookup (&axis->log_to_phy, where, &group_start);
1126   group = axis_group_from_tower_node (group_node);
1127   if (where > group_start)
1128     {
1129       unsigned long int size_1 = where - group_start;
1130       unsigned long int size_2 = tower_node_get_size (group_node) - size_1;
1131       struct tower_node *next = tower_next (&axis->log_to_phy, group_node);
1132       struct tower_node *new = make_axis_group (group->phy_start + size_1);
1133       tower_resize (&axis->log_to_phy, group_node, size_1);
1134       tower_insert (&axis->log_to_phy, size_2, new, next);
1135       return new;
1136     }
1137   else
1138     return &group->logical;
1139 }
1140
1141 /* Within AXIS, attempts to merge NODE (or the last node in AXIS,
1142    if NODE is null) with its neighbor nodes.  This is possible
1143    when logically adjacent nodes are also adjacent physically (in
1144    the same order).
1145
1146    When a merge occurs, and OTHER_NODE is non-null and points to
1147    the node to be deleted, this function also updates
1148    *OTHER_NODE, if necessary, to ensure that it remains a valid
1149    pointer. */
1150 static void
1151 merge_axis_nodes (struct axis *axis, struct tower_node *node,
1152                   struct tower_node **other_node)
1153 {
1154   struct tower *t = &axis->log_to_phy;
1155   struct axis_group *group;
1156   struct tower_node *next, *prev;
1157
1158   /* Find node to potentially merge with neighbors. */
1159   if (node == NULL)
1160     node = tower_last (t);
1161   if (node == NULL)
1162     return;
1163   group = axis_group_from_tower_node (node);
1164
1165   /* Try to merge NODE with successor. */
1166   next = tower_next (t, node);
1167   if (next != NULL)
1168     {
1169       struct axis_group *next_group = axis_group_from_tower_node (next);
1170       unsigned long this_height = tower_node_get_size (node);
1171
1172       if (group->phy_start + this_height == next_group->phy_start)
1173         {
1174           unsigned long next_height = tower_node_get_size (next);
1175           tower_resize (t, node, this_height + next_height);
1176           if (other_node != NULL && *other_node == next)
1177             *other_node = tower_next (t, *other_node);
1178           tower_delete (t, next);
1179           free (next_group);
1180         }
1181     }
1182
1183   /* Try to merge NODE with predecessor. */
1184   prev = tower_prev (t, node);
1185   if (prev != NULL)
1186     {
1187       struct axis_group *prev_group = axis_group_from_tower_node (prev);
1188       unsigned long prev_height = tower_node_get_size (prev);
1189
1190       if (prev_group->phy_start + prev_height == group->phy_start)
1191         {
1192           unsigned long this_height = tower_node_get_size (node);
1193           group->phy_start = prev_group->phy_start;
1194           tower_resize (t, node, this_height + prev_height);
1195           if (other_node != NULL && *other_node == prev)
1196             *other_node = tower_next (t, *other_node);
1197           tower_delete (t, prev);
1198           free (prev_group);
1199         }
1200     }
1201 }
1202
1203 /* Verify that all potentially merge-able nodes in AXIS are
1204    actually merged. */
1205 static void
1206 check_axis_merged (const struct axis *axis UNUSED)
1207 {
1208 #if ASSERT_LEVEL >= 10
1209   struct tower_node *prev, *node;
1210
1211   for (prev = NULL, node = tower_first (&axis->log_to_phy); node != NULL;
1212        prev = node, node = tower_next (&axis->log_to_phy, node))
1213     if (prev != NULL)
1214       {
1215         struct axis_group *prev_group = axis_group_from_tower_node (prev);
1216         unsigned long prev_height = tower_node_get_size (prev);
1217         struct axis_group *node_group = axis_group_from_tower_node (node);
1218         assert (prev_group->phy_start + prev_height != node_group->phy_start);
1219       }
1220 #endif
1221 }
1222 \f
1223 /* A source. */
1224
1225 /* Creates and returns an empty, unbacked source with N_BYTES
1226    bytes per case, none of which are initially in use. */
1227 static struct source *
1228 source_create_empty (size_t n_bytes)
1229 {
1230   struct source *source = xmalloc (sizeof *source);
1231   size_t row_size = n_bytes + 4 * sizeof (void *);
1232   size_t max_memory_rows = settings_get_workspace () / row_size;
1233   source->avail = range_set_create ();
1234   range_set_set1 (source->avail, 0, n_bytes);
1235   source->data = sparse_xarray_create (n_bytes, MAX (max_memory_rows, 4));
1236   source->backing = NULL;
1237   source->backing_rows = 0;
1238   source->n_used = 0;
1239   return source;
1240 }
1241
1242 /* Creates and returns a new source backed by READER and with the
1243    same initial dimensions and content. */
1244 static struct source *
1245 source_create_casereader (struct casereader *reader)
1246 {
1247   const struct caseproto *proto = casereader_get_proto (reader);
1248   size_t n_bytes = caseproto_to_n_bytes (proto);
1249   struct source *source = source_create_empty (n_bytes);
1250   size_t n_columns;
1251   size_t i;
1252
1253   range_set_set0 (source->avail, 0, n_bytes);
1254   source->backing = reader;
1255   source->backing_rows = casereader_count_cases (reader);
1256
1257   source->n_used = 0;
1258   n_columns = caseproto_get_n_widths (proto);
1259   for (i = 0; i < n_columns; i++)
1260     if (caseproto_get_width (proto, i) >= 0)
1261       source->n_used++;
1262
1263   return source;
1264 }
1265
1266 /* Returns a clone of source OLD with the same data and backing
1267    (if any).
1268
1269    Currently this is used only by the datasheet model checker
1270    driver, but it could be otherwise useful. */
1271 static struct source *
1272 source_clone (const struct source *old)
1273 {
1274   struct source *new = xmalloc (sizeof *new);
1275   new->avail = range_set_clone (old->avail, NULL);
1276   new->data = sparse_xarray_clone (old->data);
1277   new->backing = old->backing != NULL ? casereader_clone (old->backing) : NULL;
1278   new->backing_rows = old->backing_rows;
1279   new->n_used = old->n_used;
1280   if (new->data == NULL)
1281     {
1282       source_destroy (new);
1283       new = NULL;
1284     }
1285   return new;
1286 }
1287
1288 static int
1289 source_allocate_column (struct source *source, int width)
1290 {
1291   unsigned long int start;
1292   int n_bytes;
1293
1294   assert (width >= 0);
1295   n_bytes = width_to_n_bytes (width);
1296   if (source->backing == NULL
1297       && range_set_allocate_fully (source->avail, n_bytes, &start))
1298     return start;
1299   else
1300     return -1;
1301 }
1302
1303 static void
1304 source_release_column (struct source *source, int ofs, int width)
1305 {
1306   assert (width >= 0);
1307   range_set_set1 (source->avail, ofs, width_to_n_bytes (width));
1308   if (source->backing != NULL)
1309     source->n_used--;
1310 }
1311
1312 /* Returns true if SOURCE has any columns in use,
1313    false otherwise. */
1314 static bool
1315 source_in_use (const struct source *source)
1316 {
1317   return source->n_used > 0;
1318 }
1319
1320 /* Destroys SOURCE and its data and backing, if any. */
1321 static void
1322 source_destroy (struct source *source)
1323 {
1324   if (source != NULL)
1325     {
1326       range_set_destroy (source->avail);
1327       sparse_xarray_destroy (source->data);
1328       casereader_destroy (source->backing);
1329       free (source);
1330     }
1331 }
1332
1333 /* Returns the number of rows in SOURCE's backing casereader
1334    (SOURCE must have a backing casereader). */
1335 static casenumber
1336 source_get_backing_n_rows (const struct source *source)
1337 {
1338   assert (source_has_backing (source));
1339   return source->backing_rows;
1340 }
1341
1342 /* Reads the given COLUMN from SOURCE in the given ROW, into
1343    VALUE.  Returns true if successful, false on I/O error.
1344
1345    The caller must have initialized VALUE with the proper
1346    width. */
1347 static bool
1348 source_read (const struct column *column, casenumber row, union value *value)
1349 {
1350   struct source *source = column->source;
1351
1352   assert (column->width >= 0);
1353   if (source->backing == NULL
1354       || sparse_xarray_contains_row (source->data, row))
1355     return sparse_xarray_read (source->data, row, column->byte_ofs,
1356                                width_to_n_bytes (column->width),
1357                                value_to_data (value, column->width));
1358   else
1359     {
1360       struct ccase *c = casereader_peek (source->backing, row);
1361       bool ok = c != NULL;
1362       if (ok)
1363         {
1364           value_copy (value, case_data_idx (c, column->value_ofs),
1365                       column->width);
1366           case_unref (c);
1367         }
1368       return ok;
1369     }
1370 }
1371
1372 static bool
1373 copy_case_into_source (struct source *source, struct ccase *c, casenumber row)
1374 {
1375   const struct caseproto *proto = casereader_get_proto (source->backing);
1376   size_t n_widths = caseproto_get_n_widths (proto);
1377   size_t ofs;
1378   size_t i;
1379
1380   ofs = 0;
1381   for (i = 0; i < n_widths; i++)
1382     {
1383       int width = caseproto_get_width (proto, i);
1384       if (width >= 0)
1385         {
1386           int n_bytes = width_to_n_bytes (width);
1387           if (!sparse_xarray_write (source->data, row, ofs, n_bytes,
1388                                     value_to_data (case_data_idx (c, i),
1389                                                    width)))
1390             return false;
1391           ofs += n_bytes;
1392         }
1393     }
1394   return true;
1395 }
1396
1397 /* Writes VALUE to SOURCE in the given ROW and COLUMN.  Returns
1398    true if successful, false on I/O error.  On error, the row's
1399    data may be completely or partially corrupted, both inside and
1400    outside the region to be written.  */
1401 static bool
1402 source_write (const struct column *column, casenumber row,
1403               const union value *value)
1404 {
1405   struct source *source = column->source;
1406   struct casereader *backing = source->backing;
1407
1408   assert (column->width >= 0);
1409   if (backing != NULL
1410       && !sparse_xarray_contains_row (source->data, row)
1411       && row < source->backing_rows)
1412     {
1413       struct ccase *c;
1414       bool ok;
1415
1416       c = casereader_peek (backing, row);
1417       if (c == NULL)
1418         return false;
1419
1420       ok = copy_case_into_source (source, c, row);
1421       case_unref (c);
1422       if (!ok)
1423         return false;
1424     }
1425
1426   return sparse_xarray_write (source->data, row, column->byte_ofs,
1427                               width_to_n_bytes (column->width),
1428                               value_to_data (value, column->width));
1429 }
1430
1431 /* Within SOURCE, which must not have a backing casereader,
1432    writes the VALUE_CNT values in VALUES_CNT to the VALUE_CNT
1433    columns starting from START_COLUMN, in every row, even in rows
1434    not yet otherwise initialized.  Returns true if successful,
1435    false if an I/O error occurs.
1436
1437    We don't support backing != NULL because (1) it's harder and
1438    (2) this function is only called by
1439    datasheet_insert_column, which doesn't reuse columns from
1440    sources that are backed by casereaders. */
1441 static bool
1442 source_write_column (struct column *column, const union value *value)
1443 {
1444   int width = column->width;
1445
1446   assert (column->source->backing == NULL);
1447   assert (width >= 0);
1448
1449   return sparse_xarray_write_columns (column->source->data, column->byte_ofs,
1450                                       width_to_n_bytes (width),
1451                                       value_to_data (value, width));
1452 }
1453
1454 /* Returns true if SOURCE has a backing casereader, false
1455    otherwise. */
1456 static bool
1457 source_has_backing (const struct source *source)
1458 {
1459   return source->backing != NULL;
1460 }
1461 \f
1462 /* Datasheet model checker test driver. */
1463
1464 static int
1465 get_source_index (const struct datasheet *ds, const struct source *source)
1466 {
1467   size_t i;
1468
1469   for (i = 0; i < ds->n_sources; i++)
1470     if (ds->sources[i] == source)
1471       return i;
1472   NOT_REACHED ();
1473 }
1474
1475 /* Clones the structure and contents of ODS into a new datasheet,
1476    and returns the new datasheet. */
1477 struct datasheet *
1478 clone_datasheet (const struct datasheet *ods)
1479 {
1480   struct datasheet *ds;
1481   size_t i;
1482
1483   ds = xmalloc (sizeof *ds);
1484
1485   ds->sources = xmalloc (ods->n_sources * sizeof *ds->sources);
1486   for (i = 0; i < ods->n_sources; i++)
1487     ds->sources[i] = source_clone (ods->sources[i]);
1488   ds->n_sources = ods->n_sources;
1489
1490   ds->proto = ods->proto != NULL ? caseproto_ref (ods->proto) : NULL;
1491   ds->columns = xmemdup (ods->columns, ods->n_columns * sizeof *ods->columns);
1492   for (i = 0; i < ods->n_columns; i++)
1493     ds->columns[i].source
1494       = ds->sources[get_source_index (ods, ods->columns[i].source)];
1495   ds->n_columns = ods->n_columns;
1496   ds->column_min_alloc = ods->column_min_alloc;
1497
1498   ds->rows = axis_clone (ods->rows);
1499
1500   ds->taint = taint_create ();
1501
1502   return ds;
1503 }
1504
1505 /* Hashes the structure of datasheet DS and returns the hash.
1506    We use MD4 because it is much faster than MD5 or SHA-1 but its
1507    collision resistance is just as good. */
1508 unsigned int
1509 hash_datasheet (const struct datasheet *ds)
1510 {
1511   unsigned int hash[DIV_RND_UP (20, sizeof (unsigned int))];
1512   struct md4_ctx ctx;
1513   size_t i;
1514
1515   md4_init_ctx (&ctx);
1516   for (i = 0; i < ds->n_columns; i++)
1517     {
1518       const struct column *column = &ds->columns[i];
1519       int source_n_bytes = sparse_xarray_get_n_columns (column->source->data);
1520       md4_process_bytes (&source_n_bytes, sizeof source_n_bytes, &ctx);
1521       /*md4_process_bytes (&column->byte_ofs, sizeof column->byte_ofs, &ctx);*/
1522       md4_process_bytes (&column->value_ofs, sizeof column->value_ofs, &ctx);
1523       md4_process_bytes (&column->width, sizeof column->width, &ctx);
1524     }
1525   axis_hash (ds->rows, &ctx);
1526   md4_process_bytes (&ds->column_min_alloc, sizeof ds->column_min_alloc, &ctx);
1527   md4_finish_ctx (&ctx, hash);
1528   return hash[0];
1529 }