datasheet: Fix bugs in datasheet_resize_column() found with new test.
[pspp-builds.git] / src / data / datasheet.c
1 /* PSPP - a program for statistical analysis.
2    Copyright (C) 2007, 2009 Free Software Foundation, Inc.
3
4    This program is free software: you can redistribute it and/or modify
5    it under the terms of the GNU General Public License as published by
6    the Free Software Foundation, either version 3 of the License, or
7    (at your option) any later version.
8
9    This program is distributed in the hope that it will be useful,
10    but WITHOUT ANY WARRANTY; without even the implied warranty of
11    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12    GNU General Public License for more details.
13
14    You should have received a copy of the GNU General Public License
15    along with this program.  If not, see <http://www.gnu.org/licenses/>. */
16
17 #include <config.h>
18
19 #include <data/datasheet.h>
20
21 #include <stdlib.h>
22 #include <string.h>
23
24 #include <data/casereader-provider.h>
25 #include <data/casereader.h>
26 #include <data/casewriter.h>
27 #include <data/lazy-casereader.h>
28 #include <data/settings.h>
29 #include <libpspp/array.h>
30 #include <libpspp/assertion.h>
31 #include <libpspp/misc.h>
32 #include <libpspp/range-map.h>
33 #include <libpspp/range-set.h>
34 #include <libpspp/sparse-xarray.h>
35 #include <libpspp/taint.h>
36 #include <libpspp/tower.h>
37
38 #include "minmax.h"
39 #include "md4.h"
40 #include "xalloc.h"
41
42 struct column;
43
44 static struct axis *axis_create (void);
45 static struct axis *axis_clone (const struct axis *);
46 static void axis_destroy (struct axis *);
47
48 static void axis_hash (const struct axis *, struct md4_ctx *);
49
50 static bool axis_allocate (struct axis *, unsigned long int request,
51                            unsigned long int *start,
52                            unsigned long int *width);
53 static void axis_make_available (struct axis *,
54                                  unsigned long int start,
55                                  unsigned long int width);
56 static unsigned long int axis_extend (struct axis *, unsigned long int width);
57
58 static unsigned long int axis_map (const struct axis *, unsigned long log_pos);
59
60 static unsigned long axis_get_size (const struct axis *);
61 static void axis_insert (struct axis *,
62                          unsigned long int log_start,
63                          unsigned long int phy_start,
64                          unsigned long int cnt);
65 static void axis_remove (struct axis *,
66                          unsigned long int start, unsigned long int cnt);
67 static void axis_move (struct axis *,
68                        unsigned long int old_start,
69                        unsigned long int new_start,
70                        unsigned long int cnt);
71
72 static struct source *source_create_empty (size_t n_bytes);
73 static struct source *source_create_casereader (struct casereader *);
74 static struct source *source_clone (const struct source *);
75 static void source_destroy (struct source *);
76
77 static casenumber source_get_backing_n_rows (const struct source *);
78
79 static int source_allocate_column (struct source *, int width);
80 static void source_release_column (struct source *, int ofs, int width);
81 static bool source_in_use (const struct source *);
82
83 static bool source_read (const struct column *, casenumber row, union value *);
84 static bool source_write (const struct column *, casenumber row,
85                           const union value *);
86 static bool source_write_column (struct column *, const union value *);
87 static bool source_has_backing (const struct source *);
88
89 static int get_source_index (const struct datasheet *ds, const struct source *source);
90
91 /* A datasheet is internally composed from a set of data files,
92    called "sources".  The sources that make up a datasheet must
93    have the same number of rows (cases), but their numbers of
94    columns (variables) may vary.
95
96    A datasheet's external view is produced by mapping (permuting
97    and selecting) its internal data.  Thus, we can rearrange or
98    delete rows or columns simply by modifying the mapping.  We
99    add rows by adding rows to each source and to the row mapping.
100    We add columns by adding a new source, then adding that source
101    to the column mapping.
102
103    Each source in a datasheet can be a casereader or a
104    sparse_xarray.  Casereaders are read-only, so when sources
105    made from casereaders need to be modified, it is done
106    "virtually" through being overlaid by a sparse_xarray. */
107 struct source
108   {
109     struct range_set *avail;    /* Free bytes are set to 1s. */
110     struct sparse_xarray *data; /* Data at top level, atop the backing. */
111     struct casereader *backing; /* Backing casereader (or null). */
112     casenumber backing_rows;    /* Number of rows in backing (if backed). */
113     size_t n_used;              /* Number of column in use (if backed). */
114   };
115
116 /* A logical column. */
117 struct column
118   {
119     struct source *source;      /* Source of the underlying physical column. */
120     int value_ofs;              /* If 'source' has a backing casereader,
121                                    column's value offset in its cases. */
122     int byte_ofs;               /* Byte offset in source's sparse_xarray. */
123     int width;                  /* 0=numeric, otherwise string width. */
124   };
125
126 /* A datasheet. */
127 struct datasheet
128   {
129     /* Data sources. */
130     struct source **sources;    /* Sources, in no particular order. */
131     size_t n_sources;           /* Number of sources. */
132
133     /* Columns. */
134     struct caseproto *proto;    /* Prototype for rows (initialized lazily). */
135     struct column *columns;     /* Logical to physical column mapping. */
136     size_t n_columns;           /* Number of logical columns. */
137     unsigned column_min_alloc;  /* Min. # of columns to put in a new source. */
138
139     /* Rows. */
140     struct axis *rows;          /* Logical to physical row mapping. */
141
142     /* Tainting. */
143     struct taint *taint;        /* Indicates corrupted data. */
144   };
145
146 /* Is this operation a read or a write? */
147 enum rw_op
148   {
149     OP_READ,
150     OP_WRITE
151   };
152
153 static void allocate_column (struct datasheet *, int width, struct column *);
154 static void release_source (struct datasheet *, struct source *);
155 static bool rw_case (struct datasheet *ds, enum rw_op op,
156                      casenumber lrow, size_t start_column, size_t n_columns,
157                      union value data[]);
158
159 /* Returns the number of bytes needed to store a value with the
160    given WIDTH on disk. */
161 static size_t
162 width_to_n_bytes (int width)
163 {
164   return width == 0 ? sizeof (double) : width;
165 }
166
167 /* Returns the address of the data in VALUE (for reading or
168    writing to/from disk).  VALUE must have the given WIDTH. */
169 static void *
170 value_to_data (const union value *value_, int width)
171 {
172   union value *value = (union value *) value_;
173   assert (sizeof value->f == sizeof (double));
174   if (width == 0)
175     return &value->f;
176   else
177     return value_str_rw (value, width);
178 }
179
180 /* Returns the number of bytes needed to store all the values in
181    PROTO on disk. */
182 static size_t
183 caseproto_to_n_bytes (const struct caseproto *proto)
184 {
185   size_t n_bytes;
186   size_t i;
187
188   n_bytes = 0;
189   for (i = 0; i < caseproto_get_n_widths (proto); i++)
190     {
191       int width = caseproto_get_width (proto, i);
192       if (width >= 0)
193         n_bytes += width_to_n_bytes (width);
194     }
195   return n_bytes;
196 }
197
198 /* Creates and returns a new datasheet.
199
200    If READER is nonnull, then the datasheet initially contains
201    the contents of READER. */
202 struct datasheet *
203 datasheet_create (struct casereader *reader)
204 {
205   struct datasheet *ds = xmalloc (sizeof *ds);
206   ds->sources = NULL;
207   ds->n_sources = 0;
208   ds->proto = NULL;
209   ds->columns = NULL;
210   ds->n_columns = 0;
211   ds->column_min_alloc = 8;
212   ds->rows = axis_create ();
213   ds->taint = taint_create ();
214
215   if (reader != NULL)
216     {
217       casenumber n_rows;
218       size_t byte_ofs;
219       size_t i;
220
221       taint_propagate (casereader_get_taint (reader), ds->taint);
222
223       ds->proto = caseproto_ref (casereader_get_proto (reader));
224
225       ds->sources = xmalloc (sizeof *ds->sources);
226       ds->sources[0] = source_create_casereader (reader);
227       ds->n_sources = 1;
228
229       ds->n_columns = caseproto_get_n_widths (ds->proto);
230       ds->columns = xnmalloc (ds->n_columns, sizeof *ds->columns);
231       byte_ofs = 0;
232       for (i = 0; i < ds->n_columns; i++)
233         {
234           struct column *column = &ds->columns[i];
235           int width = caseproto_get_width (ds->proto, i);
236           column->source = ds->sources[0];
237           column->width = width;
238           if (width >= 0)
239             {
240               column->value_ofs = i;
241               column->byte_ofs = byte_ofs;
242               byte_ofs += width_to_n_bytes (column->width);
243             }
244         }
245
246       n_rows = source_get_backing_n_rows (ds->sources[0]);
247       if (n_rows > 0)
248         axis_insert (ds->rows, 0, axis_extend (ds->rows, n_rows), n_rows);
249     }
250
251   return ds;
252 }
253
254 /* Destroys datasheet DS. */
255 void
256 datasheet_destroy (struct datasheet *ds)
257 {
258   size_t i;
259
260   if (ds == NULL)
261     return;
262
263   for (i = 0; i < ds->n_sources; i++)
264     source_destroy (ds->sources[i]);
265   free (ds->sources);
266   caseproto_unref (ds->proto);
267   free (ds->columns);
268   axis_destroy (ds->rows);
269   taint_destroy (ds->taint);
270   free (ds);
271 }
272
273 /* Returns the prototype for the cases in DS.  The caller must
274    not unref the returned prototype. */
275 const struct caseproto *
276 datasheet_get_proto (const struct datasheet *ds_)
277 {
278   struct datasheet *ds = (struct datasheet *) ds_;
279   if (ds->proto == NULL)
280     {
281       size_t i;
282
283       ds->proto = caseproto_create ();
284       for (i = 0; i < ds->n_columns; i++)
285         ds->proto = caseproto_add_width (ds->proto, ds->columns[i].width);
286     }
287   return ds->proto;
288 }
289
290 /* Returns the width of the given COLUMN within DS.
291    COLUMN must be less than the number of columns in DS. */
292 int
293 datasheet_get_column_width (const struct datasheet *ds, size_t column)
294 {
295   assert (column < datasheet_get_n_columns (ds));
296   return ds->columns[column].width;
297 }
298
299 /* Moves datasheet DS to a new location in memory, and returns
300    the new location.  Afterward, the datasheet must not be
301    accessed at its former location.
302
303    This function is useful for ensuring that all references to a
304    datasheet have been dropped, especially in conjunction with
305    tools like Valgrind. */
306 struct datasheet *
307 datasheet_rename (struct datasheet *ds)
308 {
309   struct datasheet *new = xmemdup (ds, sizeof *ds);
310   free (ds);
311   return new;
312 }
313
314 /* Returns true if datasheet DS is tainted.
315    A datasheet is tainted by an I/O error or by taint
316    propagation to the datasheet. */
317 bool
318 datasheet_error (const struct datasheet *ds)
319 {
320   return taint_is_tainted (ds->taint);
321 }
322
323 /* Marks datasheet DS tainted. */
324 void
325 datasheet_force_error (struct datasheet *ds)
326 {
327   taint_set_taint (ds->taint);
328 }
329
330 /* Returns datasheet DS's taint object. */
331 const struct taint *
332 datasheet_get_taint (const struct datasheet *ds)
333 {
334   return ds->taint;
335 }
336
337 /* Returns the number of rows in DS. */
338 casenumber
339 datasheet_get_n_rows (const struct datasheet *ds)
340 {
341   return axis_get_size (ds->rows);
342 }
343
344 /* Returns the number of columns in DS. */
345 size_t
346 datasheet_get_n_columns (const struct datasheet *ds)
347 {
348   return ds->n_columns;
349 }
350
351 /* Inserts a column of the given WIDTH into datasheet DS just
352    before column BEFORE.  Initializes the contents of each row in
353    the inserted column to VALUE (which must have width WIDTH).
354
355    Returns true if successful, false on failure.  In case of
356    failure, the datasheet is unchanged. */
357 bool
358 datasheet_insert_column (struct datasheet *ds,
359                          const union value *value, int width, size_t before)
360 {
361   struct column *col;
362
363   ds->columns = xnrealloc (ds->columns,
364                            ds->n_columns + 1, sizeof *ds->columns);
365   insert_element (ds->columns, ds->n_columns, sizeof *ds->columns, before);
366   col = &ds->columns[before];
367   ds->n_columns++;
368
369   allocate_column (ds, width, col);
370
371   if (width >= 0 && !source_write_column (col, value))
372     {
373       datasheet_delete_columns (ds, before, 1);
374       taint_set_taint (ds->taint);
375       return false;
376     }
377
378   return true;
379 }
380
381 /* Deletes the N columns in DS starting from column START. */
382 void
383 datasheet_delete_columns (struct datasheet *ds, size_t start, size_t n)
384 {
385   if (n > 0)
386     {
387       size_t i;
388
389       for (i = start; i < start + n; i++)
390         {
391           struct column *column = &ds->columns[i];
392           struct source *source = column->source;
393           source_release_column (source, column->byte_ofs, column->width);
394           release_source (ds, source);
395         }
396
397       remove_range (ds->columns, ds->n_columns, sizeof *ds->columns, start, n);
398       ds->n_columns -= n;
399
400       caseproto_unref (ds->proto);
401       ds->proto = NULL;
402     }
403 }
404
405 /* Moves the N columns in DS starting at position OLD_START so
406    that they then start at position NEW_START.  Equivalent to
407    deleting the column rows, then inserting them at what becomes
408    position NEW_START after the deletion. */
409 void
410 datasheet_move_columns (struct datasheet *ds,
411                         size_t old_start, size_t new_start,
412                         size_t n)
413 {
414   move_range (ds->columns, ds->n_columns, sizeof *ds->columns,
415               old_start, new_start, n);
416
417   caseproto_unref (ds->proto);
418   ds->proto = NULL;
419 }
420
421 struct resize_datasheet_value_aux
422   {
423     union value src_value;
424     size_t src_ofs;
425     int src_width;
426
427     void (*resize_cb) (const union value *, union value *, void *aux);
428     void *resize_cb_aux;
429
430     union value dst_value;
431     size_t dst_ofs;
432     int dst_width;
433   };
434
435 static bool
436 resize_datasheet_value (const void *src, void *dst, void *aux_)
437 {
438   struct resize_datasheet_value_aux *aux = aux_;
439
440   memcpy (value_to_data (&aux->src_value, aux->src_width),
441           (uint8_t *) src + aux->src_ofs,
442           width_to_n_bytes (aux->src_width));
443
444   aux->resize_cb (&aux->src_value, &aux->dst_value, aux->resize_cb_aux);
445
446   memcpy ((uint8_t *) dst + aux->dst_ofs,
447           value_to_data (&aux->dst_value, aux->dst_width),
448           width_to_n_bytes (aux->dst_width));
449
450   return true;
451 }
452
453 bool
454 datasheet_resize_column (struct datasheet *ds, size_t column, int new_width,
455                          void (*resize_cb) (const union value *,
456                                             union value *, void *aux),
457                          void *resize_cb_aux)
458 {
459   struct column old_col;
460   struct column *col;
461   int old_width;
462
463   assert (column < datasheet_get_n_columns (ds));
464
465   col = &ds->columns[column];
466   old_col = *col;
467   old_width = old_col.width;
468
469   if (new_width == -1)
470     {
471       if (old_width != -1)
472         {
473           datasheet_delete_columns (ds, column, 1);
474           datasheet_insert_column (ds, NULL, -1, column);
475         }
476     }
477   else if (old_width == -1)
478     {
479       union value value;
480       value_init (&value, new_width);
481       value_set_missing (&value, new_width);
482       if (resize_cb != NULL)
483         resize_cb (NULL, &value, resize_cb_aux);
484       datasheet_delete_columns (ds, column, 1);
485       datasheet_insert_column (ds, &value, new_width, column);
486       value_destroy (&value, new_width);
487     }
488   else if (source_has_backing (col->source))
489     {
490       unsigned long int n_rows = axis_get_size (ds->rows);
491       unsigned long int lrow;
492       union value src, dst;
493
494       source_release_column (col->source, col->byte_ofs, col->width);
495       allocate_column (ds, new_width, col);
496
497       value_init (&src, old_width);
498       value_init (&dst, new_width);
499       for (lrow = 0; lrow < n_rows; lrow++)
500         {
501           unsigned long int prow = axis_map (ds->rows, lrow);
502           if (!source_read (&old_col, prow, &src))
503             {
504               /* FIXME: back out col changes. */
505               return false;
506             }
507           resize_cb (&src, &dst, resize_cb_aux);
508           if (!source_write (col, prow, &dst))
509             {
510               /* FIXME: back out col changes. */
511               return false;
512             }
513         }
514
515       release_source (ds, old_col.source);
516     }
517   else
518     {
519       struct resize_datasheet_value_aux aux;
520
521       source_release_column (col->source, col->byte_ofs, col->width);
522       allocate_column (ds, new_width, col);
523
524       value_init (&aux.src_value, old_col.width);
525       aux.src_ofs = old_col.byte_ofs;
526       aux.src_width = old_col.width;
527       aux.resize_cb = resize_cb;
528       aux.resize_cb_aux = resize_cb_aux;
529       value_init (&aux.dst_value, new_width);
530       aux.dst_ofs = col->byte_ofs;
531       aux.dst_width = new_width;
532       sparse_xarray_copy (old_col.source->data, col->source->data,
533                           resize_datasheet_value, &aux);
534       value_destroy (&aux.src_value, old_width);
535       value_destroy (&aux.dst_value, new_width);
536
537       release_source (ds, old_col.source);
538     }
539   return true;
540 }
541
542 /* Retrieves and returns the contents of the given ROW in
543    datasheet DS.  The caller owns the returned case and must
544    unref it when it is no longer needed.  Returns a null pointer
545    on I/O error. */
546 struct ccase *
547 datasheet_get_row (const struct datasheet *ds, casenumber row)
548 {
549   size_t n_columns = datasheet_get_n_columns (ds);
550   struct ccase *c = case_create (datasheet_get_proto (ds));
551   if (rw_case ((struct datasheet *) ds, OP_READ,
552                row, 0, n_columns, case_data_all_rw (c)))
553     return c;
554   else
555     {
556       case_unref (c);
557       return NULL;
558     }
559 }
560
561 /* Stores the contents of case C, which is destroyed, into the
562    given ROW in DS.  Returns true on success, false on I/O error.
563    On failure, the given ROW might be partially modified or
564    corrupted. */
565 bool
566 datasheet_put_row (struct datasheet *ds, casenumber row, struct ccase *c)
567 {
568   size_t n_columns = datasheet_get_n_columns (ds);
569   bool ok = rw_case (ds, OP_WRITE, row, 0, n_columns,
570                      (union value *) case_data_all (c));
571   case_unref (c);
572   return ok;
573 }
574
575 /* Stores the values of COLUMN in DS in the given ROW in DS into
576    VALUE.  The caller must have already initialized VALUE as a
577    value of the appropriate width (as returned by
578    datasheet_get_column_width (DS, COLUMN)).  Returns true if
579    successful, false on I/O error. */
580 bool
581 datasheet_get_value (const struct datasheet *ds, casenumber row,
582                      size_t column, union value *value)
583 {
584   assert (row >= 0);
585   return rw_case ((struct datasheet *) ds, OP_READ, row, column, 1, value);
586 }
587
588 /* Stores VALUE into DS in the given ROW and COLUMN.  VALUE must
589    have the correct width for COLUMN (as returned by
590    datasheet_get_column_width (DS, COLUMN)).  Returns true if
591    successful, false on I/O error.  On failure, ROW might be
592    partially modified or corrupted. */
593 bool
594 datasheet_put_value (struct datasheet *ds UNUSED, casenumber row UNUSED,
595                      size_t column UNUSED, const union value *value UNUSED)
596 {
597   return rw_case (ds, OP_WRITE, row, column, 1, (union value *) value);
598 }
599
600 /* Inserts the CNT cases at C into datasheet DS just before row
601    BEFORE.  Returns true if successful, false on I/O error.  On
602    failure, datasheet DS is not modified.
603
604    Regardless of success, this function unrefs all of the cases
605    in C. */
606 bool
607 datasheet_insert_rows (struct datasheet *ds,
608                        casenumber before, struct ccase *c[],
609                        casenumber cnt)
610 {
611   casenumber added = 0;
612   while (cnt > 0)
613     {
614       unsigned long first_phy;
615       unsigned long phy_cnt;
616       unsigned long i;
617
618       /* Allocate physical rows from the pool of available
619          rows. */
620       if (!axis_allocate (ds->rows, cnt, &first_phy, &phy_cnt))
621         {
622           /* No rows were available.  Extend the row axis to make
623              some new ones available. */
624           phy_cnt = cnt;
625           first_phy = axis_extend (ds->rows, cnt);
626         }
627
628       /* Insert the new rows into the row mapping. */
629       axis_insert (ds->rows, before, first_phy, phy_cnt);
630
631       /* Initialize the new rows. */
632       for (i = 0; i < phy_cnt; i++)
633         if (!datasheet_put_row (ds, before + i, c[i]))
634           {
635             while (++i < cnt)
636               case_unref (c[i]);
637             datasheet_delete_rows (ds, before - added, phy_cnt + added);
638             return false;
639           }
640
641       /* Advance. */
642       c += phy_cnt;
643       cnt -= phy_cnt;
644       before += phy_cnt;
645       added += phy_cnt;
646     }
647   return true;
648 }
649
650 /* Deletes the CNT rows in DS starting from row FIRST. */
651 void
652 datasheet_delete_rows (struct datasheet *ds,
653                        casenumber first, casenumber cnt)
654 {
655   size_t lrow;
656
657   /* Free up rows for reuse.
658      FIXME: optimize. */
659   for (lrow = first; lrow < first + cnt; lrow++)
660     axis_make_available (ds->rows, axis_map (ds->rows, lrow), 1);
661
662   /* Remove rows from logical-to-physical mapping. */
663   axis_remove (ds->rows, first, cnt);
664 }
665
666 /* Moves the CNT rows in DS starting at position OLD_START so
667    that they then start at position NEW_START.  Equivalent to
668    deleting the given rows, then inserting them at what becomes
669    position NEW_START after the deletion. */
670 void
671 datasheet_move_rows (struct datasheet *ds,
672                      size_t old_start, size_t new_start,
673                      size_t cnt)
674 {
675   axis_move (ds->rows, old_start, new_start, cnt);
676 }
677 \f
678 static const struct casereader_random_class datasheet_reader_class;
679
680 /* Creates and returns a casereader whose input cases are the
681    rows in datasheet DS.  From the caller's perspective, DS is
682    effectively destroyed by this operation, such that the caller
683    must not reference it again. */
684 struct casereader *
685 datasheet_make_reader (struct datasheet *ds)
686 {
687   struct casereader *reader;
688   ds = datasheet_rename (ds);
689   reader = casereader_create_random (datasheet_get_proto (ds),
690                                      datasheet_get_n_rows (ds),
691                                      &datasheet_reader_class, ds);
692   taint_propagate (datasheet_get_taint (ds), casereader_get_taint (reader));
693   return reader;
694 }
695
696 /* "read" function for the datasheet random casereader. */
697 static struct ccase *
698 datasheet_reader_read (struct casereader *reader UNUSED, void *ds_,
699                        casenumber case_idx)
700 {
701   struct datasheet *ds = ds_;
702   if (case_idx < datasheet_get_n_rows (ds))
703     {
704       struct ccase *c = datasheet_get_row (ds, case_idx);
705       if (c == NULL)
706         taint_set_taint (ds->taint);
707       return c;
708     }
709   else
710     return NULL;
711 }
712
713 /* "destroy" function for the datasheet random casereader. */
714 static void
715 datasheet_reader_destroy (struct casereader *reader UNUSED, void *ds_)
716 {
717   struct datasheet *ds = ds_;
718   datasheet_destroy (ds);
719 }
720
721 /* "advance" function for the datasheet random casereader. */
722 static void
723 datasheet_reader_advance (struct casereader *reader UNUSED, void *ds_,
724                           casenumber case_cnt)
725 {
726   struct datasheet *ds = ds_;
727   datasheet_delete_rows (ds, 0, case_cnt);
728 }
729
730 /* Random casereader class for a datasheet. */
731 static const struct casereader_random_class datasheet_reader_class =
732   {
733     datasheet_reader_read,
734     datasheet_reader_destroy,
735     datasheet_reader_advance,
736   };
737 \f
738 static void
739 allocate_column (struct datasheet *ds, int width, struct column *column)
740 {
741   caseproto_unref (ds->proto);
742   ds->proto = NULL;
743
744   column->value_ofs = -1;
745   column->width = width;
746   if (width >= 0)
747     {
748       int n_bytes;
749       size_t i;
750
751       n_bytes = width_to_n_bytes (width);
752       for (i = 0; i < ds->n_sources; i++)
753         {
754           column->source = ds->sources[i];
755           column->byte_ofs = source_allocate_column (column->source, n_bytes);
756           if (column->byte_ofs >= 0)
757             return;
758         }
759
760       column->source = source_create_empty (MAX (n_bytes,
761                                                  ds->column_min_alloc));
762       ds->sources = xnrealloc (ds->sources,
763                                ds->n_sources + 1, sizeof *ds->sources);
764       ds->sources[ds->n_sources++] = column->source;
765
766       ds->column_min_alloc = MIN (65536, ds->column_min_alloc * 2);
767
768       column->byte_ofs = source_allocate_column (column->source, n_bytes);
769       assert (column->byte_ofs >= 0);
770     }
771   else
772     {
773       column->source = NULL;
774       column->byte_ofs = -1;
775     }
776 }
777
778 static void
779 release_source (struct datasheet *ds, struct source *source)
780 {
781   if (source_has_backing (source) && !source_in_use (source))
782     {
783       /* Since only the first source to be added ever
784          has a backing, this source must have index
785          0.  */
786       assert (source == ds->sources[0]);
787       ds->sources[0] = ds->sources[--ds->n_sources];
788       source_destroy (source);
789     }
790 }
791
792 /* Reads (if OP is OP_READ) or writes (if op is OP_WRITE) the
793    N_COLUMNS columns starting from column START_COLUMN in row
794    LROW to/from the N_COLUMNS values in DATA. */
795 static bool
796 rw_case (struct datasheet *ds, enum rw_op op,
797          casenumber lrow, size_t start_column, size_t n_columns,
798          union value data[])
799 {
800   casenumber prow;
801   size_t i;
802
803   assert (lrow < datasheet_get_n_rows (ds));
804   assert (n_columns <= datasheet_get_n_columns (ds));
805   assert (start_column + n_columns <= datasheet_get_n_columns (ds));
806
807   prow = axis_map (ds->rows, lrow);
808   for (i = 0; i < n_columns; i++)
809     {
810       struct column *c = &ds->columns[start_column + i];
811       if (c->width >= 0)
812         {
813           bool ok;
814
815           if (op == OP_READ)
816             ok = source_read (c, prow, &data[i]);
817           else
818             ok = source_write (c, prow, &data[i]);
819
820           if (!ok)
821             {
822               taint_set_taint (ds->taint);
823               return false;
824             }
825         }
826     }
827   return true;
828 }
829 \f
830 /* An axis.
831
832    An axis has two functions.  First, it maintains a mapping from
833    logical (client-visible) to physical (storage) ordinates.  The
834    axis_map and axis_get_size functions inspect this mapping, and
835    the axis_insert, axis_remove, and axis_move functions modify
836    it.  Second, it tracks the set of ordinates that are unused
837    and available for reuse.  The axis_allocate,
838    axis_make_available, and axis_extend functions affect the set
839    of available ordinates. */
840 struct axis
841 {
842   struct tower log_to_phy;     /* Map from logical to physical ordinates;
843                                   contains "struct axis_group"s. */
844   struct range_set *available; /* Set of unused, available ordinates. */
845   unsigned long int phy_size;  /* Current physical length of axis. */
846 };
847
848 /* A mapping from logical to physical ordinates. */
849 struct axis_group
850 {
851   struct tower_node logical;  /* Range of logical ordinates. */
852   unsigned long phy_start;    /* First corresponding physical ordinate. */
853 };
854
855 static struct axis_group *axis_group_from_tower_node (struct tower_node *);
856 static struct tower_node *make_axis_group (unsigned long int phy_start);
857 static struct tower_node *split_axis (struct axis *, unsigned long int where);
858 static void merge_axis_nodes (struct axis *, struct tower_node *,
859                               struct tower_node **other_node);
860 static void check_axis_merged (const struct axis *axis UNUSED);
861
862 /* Creates and returns a new, initially empty axis. */
863 static struct axis *
864 axis_create (void)
865 {
866   struct axis *axis = xmalloc (sizeof *axis);
867   tower_init (&axis->log_to_phy);
868   axis->available = range_set_create ();
869   axis->phy_size = 0;
870   return axis;
871 }
872
873 /* Returns a clone of existing axis OLD.
874
875    Currently this is used only by the datasheet model checker
876    driver, but it could be otherwise useful. */
877 static struct axis *
878 axis_clone (const struct axis *old)
879 {
880   const struct tower_node *node;
881   struct axis *new;
882
883   new = xmalloc (sizeof *new);
884   tower_init (&new->log_to_phy);
885   new->available = range_set_clone (old->available, NULL);
886   new->phy_size = old->phy_size;
887
888   for (node = tower_first (&old->log_to_phy); node != NULL;
889        node = tower_next (&old->log_to_phy, node))
890     {
891       unsigned long int size = tower_node_get_size (node);
892       struct axis_group *group = tower_data (node, struct axis_group, logical);
893       tower_insert (&new->log_to_phy, size, make_axis_group (group->phy_start),
894                     NULL);
895     }
896
897   return new;
898 }
899
900 /* Adds the state of AXIS to the MD4 hash context CTX.
901
902    This is only used by the datasheet model checker driver.  It
903    is unlikely to be otherwise useful. */
904 static void
905 axis_hash (const struct axis *axis, struct md4_ctx *ctx)
906 {
907   const struct tower_node *tn;
908   const struct range_set_node *rsn;
909
910   for (tn = tower_first (&axis->log_to_phy); tn != NULL;
911        tn = tower_next (&axis->log_to_phy, tn))
912     {
913       struct axis_group *group = tower_data (tn, struct axis_group, logical);
914       unsigned long int phy_start = group->phy_start;
915       unsigned long int size = tower_node_get_size (tn);
916
917       md4_process_bytes (&phy_start, sizeof phy_start, ctx);
918       md4_process_bytes (&size, sizeof size, ctx);
919     }
920
921   for (rsn = range_set_first (axis->available); rsn != NULL;
922        rsn = range_set_next (axis->available, rsn))
923     {
924       unsigned long int start = range_set_node_get_start (rsn);
925       unsigned long int end = range_set_node_get_end (rsn);
926
927       md4_process_bytes (&start, sizeof start, ctx);
928       md4_process_bytes (&end, sizeof end, ctx);
929     }
930
931   md4_process_bytes (&axis->phy_size, sizeof axis->phy_size, ctx);
932 }
933
934 /* Destroys AXIS. */
935 static void
936 axis_destroy (struct axis *axis)
937 {
938   if (axis == NULL)
939     return;
940
941   while (!tower_is_empty (&axis->log_to_phy))
942     {
943       struct tower_node *node = tower_first (&axis->log_to_phy);
944       struct axis_group *group = tower_data (node, struct axis_group,
945                                              logical);
946       tower_delete (&axis->log_to_phy, node);
947       free (group);
948     }
949
950   range_set_destroy (axis->available);
951   free (axis);
952 }
953
954 /* Allocates up to REQUEST contiguous unused and available
955    ordinates from AXIS.  If successful, stores the number
956    obtained into *WIDTH and the ordinate of the first into
957    *START, marks the ordinates as now unavailable return true.
958    On failure, which occurs only if AXIS has no unused and
959    available ordinates, returns false without modifying AXIS. */
960 static bool
961 axis_allocate (struct axis *axis, unsigned long int request,
962                unsigned long int *start, unsigned long int *width)
963 {
964   return range_set_allocate (axis->available, request, start, width);
965 }
966
967 /* Marks the WIDTH contiguous ordinates in AXIS, starting from
968    START, as unused and available. */
969 static void
970 axis_make_available (struct axis *axis,
971                      unsigned long int start, unsigned long int width)
972 {
973   range_set_insert (axis->available, start, width);
974 }
975
976 /* Extends the total physical length of AXIS by WIDTH and returns
977    the first ordinate in the new physical region. */
978 static unsigned long int
979 axis_extend (struct axis *axis, unsigned long int width)
980 {
981   unsigned long int start = axis->phy_size;
982   axis->phy_size += width;
983   return start;
984 }
985
986 /* Returns the physical ordinate in AXIS corresponding to logical
987    ordinate LOG_POS.  LOG_POS must be less than the logical
988    length of AXIS. */
989 static unsigned long int
990 axis_map (const struct axis *axis, unsigned long log_pos)
991 {
992   struct tower_node *node;
993   struct axis_group *group;
994   unsigned long int group_start;
995
996   node = tower_lookup (&axis->log_to_phy, log_pos, &group_start);
997   group = tower_data (node, struct axis_group, logical);
998   return group->phy_start + (log_pos - group_start);
999 }
1000
1001 /* Returns the logical length of AXIS. */
1002 static unsigned long
1003 axis_get_size (const struct axis *axis)
1004 {
1005   return tower_height (&axis->log_to_phy);
1006 }
1007
1008 /* Inserts the CNT contiguous physical ordinates starting at
1009    PHY_START into AXIS's logical-to-physical mapping, starting at
1010    logical position LOG_START. */
1011 static void
1012 axis_insert (struct axis *axis,
1013              unsigned long int log_start, unsigned long int phy_start,
1014              unsigned long int cnt)
1015 {
1016   struct tower_node *before = split_axis (axis, log_start);
1017   struct tower_node *new = make_axis_group (phy_start);
1018   tower_insert (&axis->log_to_phy, cnt, new, before);
1019   merge_axis_nodes (axis, new, NULL);
1020   check_axis_merged (axis);
1021 }
1022
1023 /* Removes CNT ordinates from AXIS's logical-to-physical mapping
1024    starting at logical position START. */
1025 static void
1026 axis_remove (struct axis *axis,
1027              unsigned long int start, unsigned long int cnt)
1028 {
1029   if (cnt > 0)
1030     {
1031       struct tower_node *last = split_axis (axis, start + cnt);
1032       struct tower_node *cur, *next;
1033       for (cur = split_axis (axis, start); cur != last; cur = next)
1034         {
1035           next = tower_delete (&axis->log_to_phy, cur);
1036           free (axis_group_from_tower_node (cur));
1037         }
1038       merge_axis_nodes (axis, last, NULL);
1039       check_axis_merged (axis);
1040     }
1041 }
1042
1043 /* Moves the CNT ordinates in AXIS's logical-to-mapping starting
1044    at logical position OLD_START so that they then start at
1045    position NEW_START. */
1046 static void
1047 axis_move (struct axis *axis,
1048            unsigned long int old_start, unsigned long int new_start,
1049            unsigned long int cnt)
1050 {
1051   if (cnt > 0 && old_start != new_start)
1052     {
1053       struct tower_node *old_first, *old_last, *new_first;
1054       struct tower_node *merge1, *merge2;
1055       struct tower tmp_array;
1056
1057       /* Move ordinates OLD_START...(OLD_START + CNT) into new,
1058          separate TMP_ARRAY. */
1059       old_first = split_axis (axis, old_start);
1060       old_last = split_axis (axis, old_start + cnt);
1061       tower_init (&tmp_array);
1062       tower_splice (&tmp_array, NULL,
1063                     &axis->log_to_phy, old_first, old_last);
1064       merge_axis_nodes (axis, old_last, NULL);
1065       check_axis_merged (axis);
1066
1067       /* Move TMP_ARRAY to position NEW_START. */
1068       new_first = split_axis (axis, new_start);
1069       merge1 = tower_first (&tmp_array);
1070       merge2 = tower_last (&tmp_array);
1071       if (merge2 == merge1)
1072         merge2 = NULL;
1073       tower_splice (&axis->log_to_phy, new_first, &tmp_array, old_first, NULL);
1074       merge_axis_nodes (axis, merge1, &merge2);
1075       merge_axis_nodes (axis, merge2, NULL);
1076       check_axis_merged (axis);
1077     }
1078 }
1079
1080 /* Returns the axis_group in which NODE is embedded. */
1081 static struct axis_group *
1082 axis_group_from_tower_node (struct tower_node *node)
1083 {
1084   return tower_data (node, struct axis_group, logical);
1085 }
1086
1087 /* Creates and returns a new axis_group at physical position
1088    PHY_START. */
1089 static struct tower_node *
1090 make_axis_group (unsigned long phy_start)
1091 {
1092   struct axis_group *group = xmalloc (sizeof *group);
1093   group->phy_start = phy_start;
1094   return &group->logical;
1095 }
1096
1097 /* Returns the tower_node in AXIS's logical-to-physical map whose
1098    bottom edge is at exact level WHERE.  If there is no such
1099    tower_node in AXIS's logical-to-physical map, then split_axis
1100    creates one by breaking an existing tower_node into two
1101    separate ones, unless WHERE is equal to the tower height, in
1102    which case it simply returns a null pointer. */
1103 static struct tower_node *
1104 split_axis (struct axis *axis, unsigned long int where)
1105 {
1106   unsigned long int group_start;
1107   struct tower_node *group_node;
1108   struct axis_group *group;
1109
1110   assert (where <= tower_height (&axis->log_to_phy));
1111   if (where >= tower_height (&axis->log_to_phy))
1112     return NULL;
1113
1114   group_node = tower_lookup (&axis->log_to_phy, where, &group_start);
1115   group = axis_group_from_tower_node (group_node);
1116   if (where > group_start)
1117     {
1118       unsigned long int size_1 = where - group_start;
1119       unsigned long int size_2 = tower_node_get_size (group_node) - size_1;
1120       struct tower_node *next = tower_next (&axis->log_to_phy, group_node);
1121       struct tower_node *new = make_axis_group (group->phy_start + size_1);
1122       tower_resize (&axis->log_to_phy, group_node, size_1);
1123       tower_insert (&axis->log_to_phy, size_2, new, next);
1124       return new;
1125     }
1126   else
1127     return &group->logical;
1128 }
1129
1130 /* Within AXIS, attempts to merge NODE (or the last node in AXIS,
1131    if NODE is null) with its neighbor nodes.  This is possible
1132    when logically adjacent nodes are also adjacent physically (in
1133    the same order).
1134
1135    When a merge occurs, and OTHER_NODE is non-null and points to
1136    the node to be deleted, this function also updates
1137    *OTHER_NODE, if necessary, to ensure that it remains a valid
1138    pointer. */
1139 static void
1140 merge_axis_nodes (struct axis *axis, struct tower_node *node,
1141                   struct tower_node **other_node)
1142 {
1143   struct tower *t = &axis->log_to_phy;
1144   struct axis_group *group;
1145   struct tower_node *next, *prev;
1146
1147   /* Find node to potentially merge with neighbors. */
1148   if (node == NULL)
1149     node = tower_last (t);
1150   if (node == NULL)
1151     return;
1152   group = axis_group_from_tower_node (node);
1153
1154   /* Try to merge NODE with successor. */
1155   next = tower_next (t, node);
1156   if (next != NULL)
1157     {
1158       struct axis_group *next_group = axis_group_from_tower_node (next);
1159       unsigned long this_height = tower_node_get_size (node);
1160
1161       if (group->phy_start + this_height == next_group->phy_start)
1162         {
1163           unsigned long next_height = tower_node_get_size (next);
1164           tower_resize (t, node, this_height + next_height);
1165           if (other_node != NULL && *other_node == next)
1166             *other_node = tower_next (t, *other_node);
1167           tower_delete (t, next);
1168           free (next_group);
1169         }
1170     }
1171
1172   /* Try to merge NODE with predecessor. */
1173   prev = tower_prev (t, node);
1174   if (prev != NULL)
1175     {
1176       struct axis_group *prev_group = axis_group_from_tower_node (prev);
1177       unsigned long prev_height = tower_node_get_size (prev);
1178
1179       if (prev_group->phy_start + prev_height == group->phy_start)
1180         {
1181           unsigned long this_height = tower_node_get_size (node);
1182           group->phy_start = prev_group->phy_start;
1183           tower_resize (t, node, this_height + prev_height);
1184           if (other_node != NULL && *other_node == prev)
1185             *other_node = tower_next (t, *other_node);
1186           tower_delete (t, prev);
1187           free (prev_group);
1188         }
1189     }
1190 }
1191
1192 /* Verify that all potentially merge-able nodes in AXIS are
1193    actually merged. */
1194 static void
1195 check_axis_merged (const struct axis *axis UNUSED)
1196 {
1197 #if ASSERT_LEVEL >= 10
1198   struct tower_node *prev, *node;
1199
1200   for (prev = NULL, node = tower_first (&axis->log_to_phy); node != NULL;
1201        prev = node, node = tower_next (&axis->log_to_phy, node))
1202     if (prev != NULL)
1203       {
1204         struct axis_group *prev_group = axis_group_from_tower_node (prev);
1205         unsigned long prev_height = tower_node_get_size (prev);
1206         struct axis_group *node_group = axis_group_from_tower_node (node);
1207         assert (prev_group->phy_start + prev_height != node_group->phy_start);
1208       }
1209 #endif
1210 }
1211 \f
1212 /* A source. */
1213
1214 /* Creates and returns an empty, unbacked source with N_BYTES
1215    bytes per case, none of which are initially in use. */
1216 static struct source *
1217 source_create_empty (size_t n_bytes)
1218 {
1219   struct source *source = xmalloc (sizeof *source);
1220   size_t row_size = n_bytes + 4 * sizeof (void *);
1221   size_t max_memory_rows = settings_get_workspace () / row_size;
1222   source->avail = range_set_create ();
1223   range_set_insert (source->avail, 0, n_bytes);
1224   source->data = sparse_xarray_create (n_bytes, MAX (max_memory_rows, 4));
1225   source->backing = NULL;
1226   source->backing_rows = 0;
1227   source->n_used = 0;
1228   return source;
1229 }
1230
1231 /* Creates and returns a new source backed by READER and with the
1232    same initial dimensions and content. */
1233 static struct source *
1234 source_create_casereader (struct casereader *reader)
1235 {
1236   const struct caseproto *proto = casereader_get_proto (reader);
1237   size_t n_bytes = caseproto_to_n_bytes (proto);
1238   struct source *source = source_create_empty (n_bytes);
1239   size_t n_columns;
1240   size_t i;
1241
1242   range_set_delete (source->avail, 0, n_bytes);
1243   source->backing = reader;
1244   source->backing_rows = casereader_count_cases (reader);
1245
1246   source->n_used = 0;
1247   n_columns = caseproto_get_n_widths (proto);
1248   for (i = 0; i < n_columns; i++)
1249     if (caseproto_get_width (proto, i) >= 0)
1250       source->n_used++;
1251
1252   return source;
1253 }
1254
1255 /* Returns a clone of source OLD with the same data and backing
1256    (if any).
1257
1258    Currently this is used only by the datasheet model checker
1259    driver, but it could be otherwise useful. */
1260 static struct source *
1261 source_clone (const struct source *old)
1262 {
1263   struct source *new = xmalloc (sizeof *new);
1264   new->avail = range_set_clone (old->avail, NULL);
1265   new->data = sparse_xarray_clone (old->data);
1266   new->backing = old->backing != NULL ? casereader_clone (old->backing) : NULL;
1267   new->backing_rows = old->backing_rows;
1268   new->n_used = old->n_used;
1269   if (new->data == NULL)
1270     {
1271       source_destroy (new);
1272       new = NULL;
1273     }
1274   return new;
1275 }
1276
1277 static int
1278 source_allocate_column (struct source *source, int width)
1279 {
1280   unsigned long int start;
1281   int n_bytes;
1282
1283   assert (width >= 0);
1284   n_bytes = width_to_n_bytes (width);
1285   if (source->backing == NULL
1286       && range_set_allocate_fully (source->avail, n_bytes, &start))
1287     return start;
1288   else
1289     return -1;
1290 }
1291
1292 static void
1293 source_release_column (struct source *source, int ofs, int width)
1294 {
1295   assert (width >= 0);
1296   range_set_insert (source->avail, ofs, width_to_n_bytes (width));
1297   if (source->backing != NULL)
1298     source->n_used--;
1299 }
1300
1301 /* Returns true if SOURCE has any columns in use,
1302    false otherwise. */
1303 static bool
1304 source_in_use (const struct source *source)
1305 {
1306   return source->n_used > 0;
1307 }
1308
1309 /* Destroys SOURCE and its data and backing, if any. */
1310 static void
1311 source_destroy (struct source *source)
1312 {
1313   if (source != NULL)
1314     {
1315       range_set_destroy (source->avail);
1316       sparse_xarray_destroy (source->data);
1317       casereader_destroy (source->backing);
1318       free (source);
1319     }
1320 }
1321
1322 /* Returns the number of rows in SOURCE's backing casereader
1323    (SOURCE must have a backing casereader). */
1324 static casenumber
1325 source_get_backing_n_rows (const struct source *source)
1326 {
1327   assert (source_has_backing (source));
1328   return source->backing_rows;
1329 }
1330
1331 /* Reads the given COLUMN from SOURCE in the given ROW, into
1332    VALUE.  Returns true if successful, false on I/O error.
1333
1334    The caller must have initialized VALUE with the proper
1335    width. */
1336 static bool
1337 source_read (const struct column *column, casenumber row, union value *value)
1338 {
1339   struct source *source = column->source;
1340
1341   assert (column->width >= 0);
1342   if (source->backing == NULL
1343       || sparse_xarray_contains_row (source->data, row))
1344     return sparse_xarray_read (source->data, row, column->byte_ofs,
1345                                width_to_n_bytes (column->width),
1346                                value_to_data (value, column->width));
1347   else
1348     {
1349       struct ccase *c = casereader_peek (source->backing, row);
1350       bool ok = c != NULL;
1351       if (ok)
1352         {
1353           value_copy (value, case_data_idx (c, column->value_ofs),
1354                       column->width);
1355           case_unref (c);
1356         }
1357       return ok;
1358     }
1359 }
1360
1361 static bool
1362 copy_case_into_source (struct source *source, struct ccase *c, casenumber row)
1363 {
1364   const struct caseproto *proto = casereader_get_proto (source->backing);
1365   size_t n_widths = caseproto_get_n_widths (proto);
1366   size_t ofs;
1367   size_t i;
1368
1369   ofs = 0;
1370   for (i = 0; i < n_widths; i++)
1371     {
1372       int width = caseproto_get_width (proto, i);
1373       if (width >= 0)
1374         {
1375           int n_bytes = width_to_n_bytes (width);
1376           if (!sparse_xarray_write (source->data, row, ofs, n_bytes,
1377                                     value_to_data (case_data_idx (c, i),
1378                                                    width)))
1379             return false;
1380           ofs += n_bytes;
1381         }
1382     }
1383   return true;
1384 }
1385
1386 /* Writes VALUE to SOURCE in the given ROW and COLUMN.  Returns
1387    true if successful, false on I/O error.  On error, the row's
1388    data may be completely or partially corrupted, both inside and
1389    outside the region to be written.  */
1390 static bool
1391 source_write (const struct column *column, casenumber row,
1392               const union value *value)
1393 {
1394   struct source *source = column->source;
1395   struct casereader *backing = source->backing;
1396
1397   assert (column->width >= 0);
1398   if (backing != NULL
1399       && !sparse_xarray_contains_row (source->data, row)
1400       && row < source->backing_rows)
1401     {
1402       struct ccase *c;
1403       bool ok;
1404
1405       c = casereader_peek (backing, row);
1406       if (c == NULL)
1407         return false;
1408
1409       ok = copy_case_into_source (source, c, row);
1410       case_unref (c);
1411       if (!ok)
1412         return false;
1413     }
1414
1415   return sparse_xarray_write (source->data, row, column->byte_ofs,
1416                               width_to_n_bytes (column->width),
1417                               value_to_data (value, column->width));
1418 }
1419
1420 /* Within SOURCE, which must not have a backing casereader,
1421    writes the VALUE_CNT values in VALUES_CNT to the VALUE_CNT
1422    columns starting from START_COLUMN, in every row, even in rows
1423    not yet otherwise initialized.  Returns true if successful,
1424    false if an I/O error occurs.
1425
1426    We don't support backing != NULL because (1) it's harder and
1427    (2) this function is only called by
1428    datasheet_insert_column, which doesn't reuse columns from
1429    sources that are backed by casereaders. */
1430 static bool
1431 source_write_column (struct column *column, const union value *value)
1432 {
1433   int width = column->width;
1434
1435   assert (column->source->backing == NULL);
1436   assert (width >= 0);
1437
1438   return sparse_xarray_write_columns (column->source->data, column->byte_ofs,
1439                                       width_to_n_bytes (width),
1440                                       value_to_data (value, width));
1441 }
1442
1443 /* Returns true if SOURCE has a backing casereader, false
1444    otherwise. */
1445 static bool
1446 source_has_backing (const struct source *source)
1447 {
1448   return source->backing != NULL;
1449 }
1450 \f
1451 /* Datasheet model checker test driver. */
1452
1453 static int
1454 get_source_index (const struct datasheet *ds, const struct source *source)
1455 {
1456   size_t i;
1457
1458   for (i = 0; i < ds->n_sources; i++)
1459     if (ds->sources[i] == source)
1460       return i;
1461   NOT_REACHED ();
1462 }
1463
1464 /* Clones the structure and contents of ODS into a new datasheet,
1465    and returns the new datasheet. */
1466 struct datasheet *
1467 clone_datasheet (const struct datasheet *ods)
1468 {
1469   struct datasheet *ds;
1470   size_t i;
1471
1472   ds = xmalloc (sizeof *ds);
1473
1474   ds->sources = xmalloc (ods->n_sources * sizeof *ds->sources);
1475   for (i = 0; i < ods->n_sources; i++)
1476     ds->sources[i] = source_clone (ods->sources[i]);
1477   ds->n_sources = ods->n_sources;
1478
1479   ds->proto = ods->proto != NULL ? caseproto_ref (ods->proto) : NULL;
1480   ds->columns = xmemdup (ods->columns, ods->n_columns * sizeof *ods->columns);
1481   for (i = 0; i < ods->n_columns; i++)
1482     ds->columns[i].source
1483       = ds->sources[get_source_index (ods, ods->columns[i].source)];
1484   ds->n_columns = ods->n_columns;
1485   ds->column_min_alloc = ods->column_min_alloc;
1486
1487   ds->rows = axis_clone (ods->rows);
1488
1489   ds->taint = taint_create ();
1490
1491   return ds;
1492 }
1493
1494 /* Hashes the structure of datasheet DS and returns the hash.
1495    We use MD4 because it is much faster than MD5 or SHA-1 but its
1496    collision resistance is just as good. */
1497 unsigned int
1498 hash_datasheet (const struct datasheet *ds)
1499 {
1500   unsigned int hash[DIV_RND_UP (20, sizeof (unsigned int))];
1501   struct md4_ctx ctx;
1502   size_t i;
1503
1504   md4_init_ctx (&ctx);
1505   for (i = 0; i < ds->n_columns; i++)
1506     {
1507       const struct column *column = &ds->columns[i];
1508       int source_n_bytes = sparse_xarray_get_n_columns (column->source->data);
1509       md4_process_bytes (&source_n_bytes, sizeof source_n_bytes, &ctx);
1510       /*md4_process_bytes (&column->byte_ofs, sizeof column->byte_ofs, &ctx);*/
1511       md4_process_bytes (&column->value_ofs, sizeof column->value_ofs, &ctx);
1512       md4_process_bytes (&column->width, sizeof column->width, &ctx);
1513     }
1514   axis_hash (ds->rows, &ctx);
1515   md4_process_bytes (&ds->column_min_alloc, sizeof ds->column_min_alloc, &ctx);
1516   md4_finish_ctx (&ctx, hash);
1517   return hash[0];
1518 }