Merge commit 'origin/covariance'
[pspp-builds.git] / src / data / datasheet.c
1 /* PSPP - a program for statistical analysis.
2    Copyright (C) 2007, 2009 Free Software Foundation, Inc.
3
4    This program is free software: you can redistribute it and/or modify
5    it under the terms of the GNU General Public License as published by
6    the Free Software Foundation, either version 3 of the License, or
7    (at your option) any later version.
8
9    This program is distributed in the hope that it will be useful,
10    but WITHOUT ANY WARRANTY; without even the implied warranty of
11    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12    GNU General Public License for more details.
13
14    You should have received a copy of the GNU General Public License
15    along with this program.  If not, see <http://www.gnu.org/licenses/>. */
16
17 #include <config.h>
18
19 #include <data/datasheet.h>
20
21 #include <stdlib.h>
22 #include <string.h>
23
24 #include <data/casereader-provider.h>
25 #include <data/casereader.h>
26 #include <data/casewriter.h>
27 #include <data/lazy-casereader.h>
28 #include <data/settings.h>
29 #include <libpspp/array.h>
30 #include <libpspp/assertion.h>
31 #include <libpspp/misc.h>
32 #include <libpspp/range-map.h>
33 #include <libpspp/range-set.h>
34 #include <libpspp/sparse-xarray.h>
35 #include <libpspp/taint.h>
36 #include <libpspp/tower.h>
37
38 #include "minmax.h"
39 #include "md4.h"
40 #include "xalloc.h"
41
42 struct column;
43
44 static struct axis *axis_create (void);
45 static struct axis *axis_clone (const struct axis *);
46 static void axis_destroy (struct axis *);
47
48 static void axis_hash (const struct axis *, struct md4_ctx *);
49
50 static bool axis_allocate (struct axis *, unsigned long int request,
51                            unsigned long int *start,
52                            unsigned long int *width);
53 static void axis_make_available (struct axis *,
54                                  unsigned long int start,
55                                  unsigned long int width);
56 static unsigned long int axis_extend (struct axis *, unsigned long int width);
57
58 static unsigned long int axis_map (const struct axis *, unsigned long log_pos);
59
60 static unsigned long axis_get_size (const struct axis *);
61 static void axis_insert (struct axis *,
62                          unsigned long int log_start,
63                          unsigned long int phy_start,
64                          unsigned long int cnt);
65 static void axis_remove (struct axis *,
66                          unsigned long int start, unsigned long int cnt);
67 static void axis_move (struct axis *,
68                        unsigned long int old_start,
69                        unsigned long int new_start,
70                        unsigned long int cnt);
71
72 static struct source *source_create_empty (size_t n_bytes);
73 static struct source *source_create_casereader (struct casereader *);
74 static struct source *source_clone (const struct source *);
75 static void source_destroy (struct source *);
76
77 static casenumber source_get_backing_n_rows (const struct source *);
78
79 static int source_allocate_column (struct source *, int width);
80 static void source_release_column (struct source *, int ofs, int width);
81 static bool source_in_use (const struct source *);
82
83 static bool source_read (const struct column *, casenumber row, union value *);
84 static bool source_write (const struct column *, casenumber row,
85                           const union value *);
86 static bool source_write_column (struct column *, const union value *);
87 static bool source_has_backing (const struct source *);
88
89 static int get_source_index (const struct datasheet *ds, const struct source *source);
90
91 /* A datasheet is internally composed from a set of data files,
92    called "sources".  The sources that make up a datasheet must
93    have the same number of rows (cases), but their numbers of
94    columns (variables) may vary.
95
96    A datasheet's external view is produced by mapping (permuting
97    and selecting) its internal data.  Thus, we can rearrange or
98    delete rows or columns simply by modifying the mapping.  We
99    add rows by adding rows to each source and to the row mapping.
100    We add columns by adding a new source, then adding that source
101    to the column mapping.
102
103    Each source in a datasheet can be a casereader or a
104    sparse_xarray.  Casereaders are read-only, so when sources
105    made from casereaders need to be modified, it is done
106    "virtually" through being overlaid by a sparse_xarray. */
107 struct source
108   {
109     struct range_set *avail;    /* Free bytes are set to 1s. */
110     struct sparse_xarray *data; /* Data at top level, atop the backing. */
111     struct casereader *backing; /* Backing casereader (or null). */
112     casenumber backing_rows;    /* Number of rows in backing (if backed). */
113     size_t n_used;              /* Number of column in use (if backed). */
114   };
115
116 /* A logical column. */
117 struct column
118   {
119     struct source *source;      /* Source of the underlying physical column. */
120     int value_ofs;              /* If 'source' has a backing casereader,
121                                    column's value offset in its cases. */
122     int byte_ofs;               /* Byte offset in source's sparse_xarray. */
123     int width;                  /* 0=numeric, otherwise string width. */
124   };
125
126 /* A datasheet. */
127 struct datasheet
128   {
129     /* Data sources. */
130     struct source **sources;    /* Sources, in no particular order. */
131     size_t n_sources;           /* Number of sources. */
132
133     /* Columns. */
134     struct caseproto *proto;    /* Prototype for rows (initialized lazily). */
135     struct column *columns;     /* Logical to physical column mapping. */
136     size_t n_columns;           /* Number of logical columns. */
137     unsigned column_min_alloc;  /* Min. # of columns to put in a new source. */
138
139     /* Rows. */
140     struct axis *rows;          /* Logical to physical row mapping. */
141
142     /* Tainting. */
143     struct taint *taint;        /* Indicates corrupted data. */
144   };
145
146 /* Is this operation a read or a write? */
147 enum rw_op
148   {
149     OP_READ,
150     OP_WRITE
151   };
152
153 static void allocate_column (struct datasheet *, int width, struct column *);
154 static void release_source (struct datasheet *, struct source *);
155 static bool rw_case (struct datasheet *ds, enum rw_op op,
156                      casenumber lrow, size_t start_column, size_t n_columns,
157                      union value data[]);
158
159 /* Returns the number of bytes needed to store a value with the
160    given WIDTH on disk. */
161 static size_t
162 width_to_n_bytes (int width)
163 {
164   return width == 0 ? sizeof (double) : width;
165 }
166
167 /* Returns the address of the data in VALUE (for reading or
168    writing to/from disk).  VALUE must have the given WIDTH. */
169 static void *
170 value_to_data (const union value *value_, int width)
171 {
172   union value *value = (union value *) value_;
173   assert (sizeof value->f == sizeof (double));
174   if (width == 0)
175     return &value->f;
176   else
177     return value_str_rw (value, width);
178 }
179
180 /* Returns the number of bytes needed to store all the values in
181    PROTO on disk. */
182 static size_t
183 caseproto_to_n_bytes (const struct caseproto *proto)
184 {
185   size_t n_bytes;
186   size_t i;
187
188   n_bytes = 0;
189   for (i = 0; i < caseproto_get_n_widths (proto); i++)
190     {
191       int width = caseproto_get_width (proto, i);
192       if (width >= 0)
193         n_bytes += width_to_n_bytes (width);
194     }
195   return n_bytes;
196 }
197
198 /* Creates and returns a new datasheet.
199
200    If READER is nonnull, then the datasheet initially contains
201    the contents of READER. */
202 struct datasheet *
203 datasheet_create (struct casereader *reader)
204 {
205   struct datasheet *ds = xmalloc (sizeof *ds);
206   ds->sources = NULL;
207   ds->n_sources = 0;
208   ds->proto = NULL;
209   ds->columns = NULL;
210   ds->n_columns = 0;
211   ds->column_min_alloc = 8;
212   ds->rows = axis_create ();
213   ds->taint = taint_create ();
214
215   if (reader != NULL)
216     {
217       casenumber n_rows;
218       size_t byte_ofs;
219       size_t i;
220
221       taint_propagate (casereader_get_taint (reader), ds->taint);
222
223       ds->proto = caseproto_ref (casereader_get_proto (reader));
224
225       ds->sources = xmalloc (sizeof *ds->sources);
226       ds->sources[0] = source_create_casereader (reader);
227       ds->n_sources = 1;
228
229       ds->n_columns = caseproto_get_n_widths (ds->proto);
230       ds->columns = xnmalloc (ds->n_columns, sizeof *ds->columns);
231       byte_ofs = 0;
232       for (i = 0; i < ds->n_columns; i++)
233         {
234           struct column *column = &ds->columns[i];
235           int width = caseproto_get_width (ds->proto, i);
236           column->source = ds->sources[0];
237           column->width = width;
238           if (width >= 0)
239             {
240               column->value_ofs = i;
241               column->byte_ofs = byte_ofs;
242               byte_ofs += width_to_n_bytes (column->width);
243             }
244         }
245
246       n_rows = source_get_backing_n_rows (ds->sources[0]);
247       if (n_rows > 0)
248         axis_insert (ds->rows, 0, axis_extend (ds->rows, n_rows), n_rows);
249     }
250
251   return ds;
252 }
253
254 /* Destroys datasheet DS. */
255 void
256 datasheet_destroy (struct datasheet *ds)
257 {
258   size_t i;
259
260   if (ds == NULL)
261     return;
262
263   for (i = 0; i < ds->n_sources; i++)
264     source_destroy (ds->sources[i]);
265   free (ds->sources);
266   caseproto_unref (ds->proto);
267   free (ds->columns);
268   axis_destroy (ds->rows);
269   taint_destroy (ds->taint);
270   free (ds);
271 }
272
273 /* Returns the prototype for the cases in DS.  The caller must
274    not unref the returned prototype. */
275 const struct caseproto *
276 datasheet_get_proto (const struct datasheet *ds_)
277 {
278   struct datasheet *ds = CONST_CAST (struct datasheet *, ds_);
279   if (ds->proto == NULL)
280     {
281       size_t i;
282
283       ds->proto = caseproto_create ();
284       for (i = 0; i < ds->n_columns; i++)
285         ds->proto = caseproto_add_width (ds->proto, ds->columns[i].width);
286     }
287   return ds->proto;
288 }
289
290 /* Returns the width of the given COLUMN within DS.
291    COLUMN must be less than the number of columns in DS. */
292 int
293 datasheet_get_column_width (const struct datasheet *ds, size_t column)
294 {
295   assert (column < datasheet_get_n_columns (ds));
296   return ds->columns[column].width;
297 }
298
299 /* Moves datasheet DS to a new location in memory, and returns
300    the new location.  Afterward, the datasheet must not be
301    accessed at its former location.
302
303    This function is useful for ensuring that all references to a
304    datasheet have been dropped, especially in conjunction with
305    tools like Valgrind. */
306 struct datasheet *
307 datasheet_rename (struct datasheet *ds)
308 {
309   struct datasheet *new = xmemdup (ds, sizeof *ds);
310   free (ds);
311   return new;
312 }
313
314 /* Returns true if datasheet DS is tainted.
315    A datasheet is tainted by an I/O error or by taint
316    propagation to the datasheet. */
317 bool
318 datasheet_error (const struct datasheet *ds)
319 {
320   return taint_is_tainted (ds->taint);
321 }
322
323 /* Marks datasheet DS tainted. */
324 void
325 datasheet_force_error (struct datasheet *ds)
326 {
327   taint_set_taint (ds->taint);
328 }
329
330 /* Returns datasheet DS's taint object. */
331 const struct taint *
332 datasheet_get_taint (const struct datasheet *ds)
333 {
334   return ds->taint;
335 }
336
337 /* Returns the number of rows in DS. */
338 casenumber
339 datasheet_get_n_rows (const struct datasheet *ds)
340 {
341   return axis_get_size (ds->rows);
342 }
343
344 /* Returns the number of columns in DS. */
345 size_t
346 datasheet_get_n_columns (const struct datasheet *ds)
347 {
348   return ds->n_columns;
349 }
350
351 /* Inserts a column of the given WIDTH into datasheet DS just
352    before column BEFORE.  Initializes the contents of each row in
353    the inserted column to VALUE (which must have width WIDTH).
354
355    Returns true if successful, false on failure.  In case of
356    failure, the datasheet is unchanged. */
357 bool
358 datasheet_insert_column (struct datasheet *ds,
359                          const union value *value, int width, size_t before)
360 {
361   struct column *col;
362
363   ds->columns = xnrealloc (ds->columns,
364                            ds->n_columns + 1, sizeof *ds->columns);
365   insert_element (ds->columns, ds->n_columns, sizeof *ds->columns, before);
366   col = &ds->columns[before];
367   ds->n_columns++;
368
369   allocate_column (ds, width, col);
370
371   if (width >= 0 && !source_write_column (col, value))
372     {
373       datasheet_delete_columns (ds, before, 1);
374       taint_set_taint (ds->taint);
375       return false;
376     }
377
378   return true;
379 }
380
381 /* Deletes the N columns in DS starting from column START. */
382 void
383 datasheet_delete_columns (struct datasheet *ds, size_t start, size_t n)
384 {
385   if (n > 0)
386     {
387       size_t i;
388
389       for (i = start; i < start + n; i++)
390         {
391           struct column *column = &ds->columns[i];
392           struct source *source = column->source;
393           source_release_column (source, column->byte_ofs, column->width);
394           release_source (ds, source);
395         }
396
397       remove_range (ds->columns, ds->n_columns, sizeof *ds->columns, start, n);
398       ds->n_columns -= n;
399
400       caseproto_unref (ds->proto);
401       ds->proto = NULL;
402     }
403 }
404
405 /* Moves the N columns in DS starting at position OLD_START so
406    that they then start at position NEW_START.  Equivalent to
407    deleting the column rows, then inserting them at what becomes
408    position NEW_START after the deletion. */
409 void
410 datasheet_move_columns (struct datasheet *ds,
411                         size_t old_start, size_t new_start,
412                         size_t n)
413 {
414   move_range (ds->columns, ds->n_columns, sizeof *ds->columns,
415               old_start, new_start, n);
416
417   caseproto_unref (ds->proto);
418   ds->proto = NULL;
419 }
420
421 struct resize_datasheet_value_aux
422   {
423     union value src_value;
424     size_t src_ofs;
425     int src_width;
426
427     void (*resize_cb) (const union value *, union value *, void *aux);
428     void *resize_cb_aux;
429
430     union value dst_value;
431     size_t dst_ofs;
432     int dst_width;
433   };
434
435 static bool
436 resize_datasheet_value (const void *src, void *dst, void *aux_)
437 {
438   struct resize_datasheet_value_aux *aux = aux_;
439
440   memcpy (value_to_data (&aux->src_value, aux->src_width),
441           (uint8_t *) src + aux->src_ofs,
442           width_to_n_bytes (aux->src_width));
443
444   aux->resize_cb (&aux->src_value, &aux->dst_value, aux->resize_cb_aux);
445
446   memcpy ((uint8_t *) dst + aux->dst_ofs,
447           value_to_data (&aux->dst_value, aux->dst_width),
448           width_to_n_bytes (aux->dst_width));
449
450   return true;
451 }
452
453 bool
454 datasheet_resize_column (struct datasheet *ds, size_t column, int new_width,
455                          void (*resize_cb) (const union value *,
456                                             union value *, void *aux),
457                          void *resize_cb_aux)
458 {
459   struct column old_col;
460   struct column *col;
461   int old_width;
462
463   assert (column < datasheet_get_n_columns (ds));
464
465   col = &ds->columns[column];
466   old_col = *col;
467   old_width = old_col.width;
468
469   if (new_width == -1)
470     {
471       if (old_width != -1)
472         {
473           datasheet_delete_columns (ds, column, 1);
474           datasheet_insert_column (ds, NULL, -1, column);
475         }
476     }
477   else if (old_width == -1)
478     {
479       union value value;
480       value_init (&value, new_width);
481       value_set_missing (&value, new_width);
482       if (resize_cb != NULL)
483         resize_cb (NULL, &value, resize_cb_aux);
484       datasheet_delete_columns (ds, column, 1);
485       datasheet_insert_column (ds, &value, new_width, column);
486       value_destroy (&value, new_width);
487     }
488   else if (source_has_backing (col->source))
489     {
490       unsigned long int n_rows = axis_get_size (ds->rows);
491       unsigned long int lrow;
492       union value src, dst;
493
494       source_release_column (col->source, col->byte_ofs, col->width);
495       allocate_column (ds, new_width, col);
496
497       value_init (&src, old_width);
498       value_init (&dst, new_width);
499       for (lrow = 0; lrow < n_rows; lrow++)
500         {
501           unsigned long int prow = axis_map (ds->rows, lrow);
502           if (!source_read (&old_col, prow, &src))
503             {
504               /* FIXME: back out col changes. */
505               return false;
506             }
507           resize_cb (&src, &dst, resize_cb_aux);
508           if (!source_write (col, prow, &dst))
509             {
510               /* FIXME: back out col changes. */
511               return false;
512             }
513         }
514
515       release_source (ds, old_col.source);
516     }
517   else
518     {
519       struct resize_datasheet_value_aux aux;
520
521       source_release_column (col->source, col->byte_ofs, col->width);
522       allocate_column (ds, new_width, col);
523
524       value_init (&aux.src_value, old_col.width);
525       aux.src_ofs = old_col.byte_ofs;
526       aux.src_width = old_col.width;
527       aux.resize_cb = resize_cb;
528       aux.resize_cb_aux = resize_cb_aux;
529       value_init (&aux.dst_value, new_width);
530       aux.dst_ofs = col->byte_ofs;
531       aux.dst_width = new_width;
532       sparse_xarray_copy (old_col.source->data, col->source->data,
533                           resize_datasheet_value, &aux);
534       value_destroy (&aux.src_value, old_width);
535       value_destroy (&aux.dst_value, new_width);
536
537       release_source (ds, old_col.source);
538     }
539   return true;
540 }
541
542 /* Retrieves and returns the contents of the given ROW in
543    datasheet DS.  The caller owns the returned case and must
544    unref it when it is no longer needed.  Returns a null pointer
545    on I/O error. */
546 struct ccase *
547 datasheet_get_row (const struct datasheet *ds, casenumber row)
548 {
549   size_t n_columns = datasheet_get_n_columns (ds);
550   struct ccase *c = case_create (datasheet_get_proto (ds));
551   if (rw_case (CONST_CAST (struct datasheet *, ds), OP_READ,
552                row, 0, n_columns, case_data_all_rw (c)))
553     return c;
554   else
555     {
556       case_unref (c);
557       return NULL;
558     }
559 }
560
561 /* Stores the contents of case C, which is destroyed, into the
562    given ROW in DS.  Returns true on success, false on I/O error.
563    On failure, the given ROW might be partially modified or
564    corrupted. */
565 bool
566 datasheet_put_row (struct datasheet *ds, casenumber row, struct ccase *c)
567 {
568   size_t n_columns = datasheet_get_n_columns (ds);
569   bool ok = rw_case (ds, OP_WRITE, row, 0, n_columns,
570                      (union value *) case_data_all (c));
571   case_unref (c);
572   return ok;
573 }
574
575 /* Stores the values of COLUMN in DS in the given ROW in DS into
576    VALUE.  The caller must have already initialized VALUE as a
577    value of the appropriate width (as returned by
578    datasheet_get_column_width (DS, COLUMN)).  Returns true if
579    successful, false on I/O error. */
580 bool
581 datasheet_get_value (const struct datasheet *ds, casenumber row,
582                      size_t column, union value *value)
583 {
584   assert (row >= 0);
585   return rw_case (CONST_CAST (struct datasheet *, ds), OP_READ,
586                   row, column, 1, value);
587 }
588
589 /* Stores VALUE into DS in the given ROW and COLUMN.  VALUE must
590    have the correct width for COLUMN (as returned by
591    datasheet_get_column_width (DS, COLUMN)).  Returns true if
592    successful, false on I/O error.  On failure, ROW might be
593    partially modified or corrupted. */
594 bool
595 datasheet_put_value (struct datasheet *ds UNUSED, casenumber row UNUSED,
596                      size_t column UNUSED, const union value *value UNUSED)
597 {
598   return rw_case (ds, OP_WRITE, row, column, 1, (union value *) value);
599 }
600
601 /* Inserts the CNT cases at C into datasheet DS just before row
602    BEFORE.  Returns true if successful, false on I/O error.  On
603    failure, datasheet DS is not modified.
604
605    Regardless of success, this function unrefs all of the cases
606    in C. */
607 bool
608 datasheet_insert_rows (struct datasheet *ds,
609                        casenumber before, struct ccase *c[],
610                        casenumber cnt)
611 {
612   casenumber added = 0;
613   while (cnt > 0)
614     {
615       unsigned long first_phy;
616       unsigned long phy_cnt;
617       unsigned long i;
618
619       /* Allocate physical rows from the pool of available
620          rows. */
621       if (!axis_allocate (ds->rows, cnt, &first_phy, &phy_cnt))
622         {
623           /* No rows were available.  Extend the row axis to make
624              some new ones available. */
625           phy_cnt = cnt;
626           first_phy = axis_extend (ds->rows, cnt);
627         }
628
629       /* Insert the new rows into the row mapping. */
630       axis_insert (ds->rows, before, first_phy, phy_cnt);
631
632       /* Initialize the new rows. */
633       for (i = 0; i < phy_cnt; i++)
634         if (!datasheet_put_row (ds, before + i, c[i]))
635           {
636             while (++i < cnt)
637               case_unref (c[i]);
638             datasheet_delete_rows (ds, before - added, phy_cnt + added);
639             return false;
640           }
641
642       /* Advance. */
643       c += phy_cnt;
644       cnt -= phy_cnt;
645       before += phy_cnt;
646       added += phy_cnt;
647     }
648   return true;
649 }
650
651 /* Deletes the CNT rows in DS starting from row FIRST. */
652 void
653 datasheet_delete_rows (struct datasheet *ds,
654                        casenumber first, casenumber cnt)
655 {
656   size_t lrow;
657
658   /* Free up rows for reuse.
659      FIXME: optimize. */
660   for (lrow = first; lrow < first + cnt; lrow++)
661     axis_make_available (ds->rows, axis_map (ds->rows, lrow), 1);
662
663   /* Remove rows from logical-to-physical mapping. */
664   axis_remove (ds->rows, first, cnt);
665 }
666
667 /* Moves the CNT rows in DS starting at position OLD_START so
668    that they then start at position NEW_START.  Equivalent to
669    deleting the given rows, then inserting them at what becomes
670    position NEW_START after the deletion. */
671 void
672 datasheet_move_rows (struct datasheet *ds,
673                      size_t old_start, size_t new_start,
674                      size_t cnt)
675 {
676   axis_move (ds->rows, old_start, new_start, cnt);
677 }
678 \f
679 static const struct casereader_random_class datasheet_reader_class;
680
681 /* Creates and returns a casereader whose input cases are the
682    rows in datasheet DS.  From the caller's perspective, DS is
683    effectively destroyed by this operation, such that the caller
684    must not reference it again. */
685 struct casereader *
686 datasheet_make_reader (struct datasheet *ds)
687 {
688   struct casereader *reader;
689   ds = datasheet_rename (ds);
690   reader = casereader_create_random (datasheet_get_proto (ds),
691                                      datasheet_get_n_rows (ds),
692                                      &datasheet_reader_class, ds);
693   taint_propagate (datasheet_get_taint (ds), casereader_get_taint (reader));
694   return reader;
695 }
696
697 /* "read" function for the datasheet random casereader. */
698 static struct ccase *
699 datasheet_reader_read (struct casereader *reader UNUSED, void *ds_,
700                        casenumber case_idx)
701 {
702   struct datasheet *ds = ds_;
703   if (case_idx < datasheet_get_n_rows (ds))
704     {
705       struct ccase *c = datasheet_get_row (ds, case_idx);
706       if (c == NULL)
707         taint_set_taint (ds->taint);
708       return c;
709     }
710   else
711     return NULL;
712 }
713
714 /* "destroy" function for the datasheet random casereader. */
715 static void
716 datasheet_reader_destroy (struct casereader *reader UNUSED, void *ds_)
717 {
718   struct datasheet *ds = ds_;
719   datasheet_destroy (ds);
720 }
721
722 /* "advance" function for the datasheet random casereader. */
723 static void
724 datasheet_reader_advance (struct casereader *reader UNUSED, void *ds_,
725                           casenumber case_cnt)
726 {
727   struct datasheet *ds = ds_;
728   datasheet_delete_rows (ds, 0, case_cnt);
729 }
730
731 /* Random casereader class for a datasheet. */
732 static const struct casereader_random_class datasheet_reader_class =
733   {
734     datasheet_reader_read,
735     datasheet_reader_destroy,
736     datasheet_reader_advance,
737   };
738 \f
739 static void
740 allocate_column (struct datasheet *ds, int width, struct column *column)
741 {
742   caseproto_unref (ds->proto);
743   ds->proto = NULL;
744
745   column->value_ofs = -1;
746   column->width = width;
747   if (width >= 0)
748     {
749       int n_bytes;
750       size_t i;
751
752       n_bytes = width_to_n_bytes (width);
753       for (i = 0; i < ds->n_sources; i++)
754         {
755           column->source = ds->sources[i];
756           column->byte_ofs = source_allocate_column (column->source, n_bytes);
757           if (column->byte_ofs >= 0)
758             return;
759         }
760
761       column->source = source_create_empty (MAX (n_bytes,
762                                                  ds->column_min_alloc));
763       ds->sources = xnrealloc (ds->sources,
764                                ds->n_sources + 1, sizeof *ds->sources);
765       ds->sources[ds->n_sources++] = column->source;
766
767       ds->column_min_alloc = MIN (65536, ds->column_min_alloc * 2);
768
769       column->byte_ofs = source_allocate_column (column->source, n_bytes);
770       assert (column->byte_ofs >= 0);
771     }
772   else
773     {
774       column->source = NULL;
775       column->byte_ofs = -1;
776     }
777 }
778
779 static void
780 release_source (struct datasheet *ds, struct source *source)
781 {
782   if (source_has_backing (source) && !source_in_use (source))
783     {
784       /* Since only the first source to be added ever
785          has a backing, this source must have index
786          0.  */
787       assert (source == ds->sources[0]);
788       ds->sources[0] = ds->sources[--ds->n_sources];
789       source_destroy (source);
790     }
791 }
792
793 /* Reads (if OP is OP_READ) or writes (if op is OP_WRITE) the
794    N_COLUMNS columns starting from column START_COLUMN in row
795    LROW to/from the N_COLUMNS values in DATA. */
796 static bool
797 rw_case (struct datasheet *ds, enum rw_op op,
798          casenumber lrow, size_t start_column, size_t n_columns,
799          union value data[])
800 {
801   casenumber prow;
802   size_t i;
803
804   assert (lrow < datasheet_get_n_rows (ds));
805   assert (n_columns <= datasheet_get_n_columns (ds));
806   assert (start_column + n_columns <= datasheet_get_n_columns (ds));
807
808   prow = axis_map (ds->rows, lrow);
809   for (i = 0; i < n_columns; i++)
810     {
811       struct column *c = &ds->columns[start_column + i];
812       if (c->width >= 0)
813         {
814           bool ok;
815
816           if (op == OP_READ)
817             ok = source_read (c, prow, &data[i]);
818           else
819             ok = source_write (c, prow, &data[i]);
820
821           if (!ok)
822             {
823               taint_set_taint (ds->taint);
824               return false;
825             }
826         }
827     }
828   return true;
829 }
830 \f
831 /* An axis.
832
833    An axis has two functions.  First, it maintains a mapping from
834    logical (client-visible) to physical (storage) ordinates.  The
835    axis_map and axis_get_size functions inspect this mapping, and
836    the axis_insert, axis_remove, and axis_move functions modify
837    it.  Second, it tracks the set of ordinates that are unused
838    and available for reuse.  The axis_allocate,
839    axis_make_available, and axis_extend functions affect the set
840    of available ordinates. */
841 struct axis
842 {
843   struct tower log_to_phy;     /* Map from logical to physical ordinates;
844                                   contains "struct axis_group"s. */
845   struct range_set *available; /* Set of unused, available ordinates. */
846   unsigned long int phy_size;  /* Current physical length of axis. */
847 };
848
849 /* A mapping from logical to physical ordinates. */
850 struct axis_group
851 {
852   struct tower_node logical;  /* Range of logical ordinates. */
853   unsigned long phy_start;    /* First corresponding physical ordinate. */
854 };
855
856 static struct axis_group *axis_group_from_tower_node (struct tower_node *);
857 static struct tower_node *make_axis_group (unsigned long int phy_start);
858 static struct tower_node *split_axis (struct axis *, unsigned long int where);
859 static void merge_axis_nodes (struct axis *, struct tower_node *,
860                               struct tower_node **other_node);
861 static void check_axis_merged (const struct axis *axis UNUSED);
862
863 /* Creates and returns a new, initially empty axis. */
864 static struct axis *
865 axis_create (void)
866 {
867   struct axis *axis = xmalloc (sizeof *axis);
868   tower_init (&axis->log_to_phy);
869   axis->available = range_set_create ();
870   axis->phy_size = 0;
871   return axis;
872 }
873
874 /* Returns a clone of existing axis OLD.
875
876    Currently this is used only by the datasheet model checker
877    driver, but it could be otherwise useful. */
878 static struct axis *
879 axis_clone (const struct axis *old)
880 {
881   const struct tower_node *node;
882   struct axis *new;
883
884   new = xmalloc (sizeof *new);
885   tower_init (&new->log_to_phy);
886   new->available = range_set_clone (old->available, NULL);
887   new->phy_size = old->phy_size;
888
889   for (node = tower_first (&old->log_to_phy); node != NULL;
890        node = tower_next (&old->log_to_phy, node))
891     {
892       unsigned long int size = tower_node_get_size (node);
893       struct axis_group *group = tower_data (node, struct axis_group, logical);
894       tower_insert (&new->log_to_phy, size, make_axis_group (group->phy_start),
895                     NULL);
896     }
897
898   return new;
899 }
900
901 /* Adds the state of AXIS to the MD4 hash context CTX.
902
903    This is only used by the datasheet model checker driver.  It
904    is unlikely to be otherwise useful. */
905 static void
906 axis_hash (const struct axis *axis, struct md4_ctx *ctx)
907 {
908   const struct tower_node *tn;
909   const struct range_set_node *rsn;
910
911   for (tn = tower_first (&axis->log_to_phy); tn != NULL;
912        tn = tower_next (&axis->log_to_phy, tn))
913     {
914       struct axis_group *group = tower_data (tn, struct axis_group, logical);
915       unsigned long int phy_start = group->phy_start;
916       unsigned long int size = tower_node_get_size (tn);
917
918       md4_process_bytes (&phy_start, sizeof phy_start, ctx);
919       md4_process_bytes (&size, sizeof size, ctx);
920     }
921
922   for (rsn = range_set_first (axis->available); rsn != NULL;
923        rsn = range_set_next (axis->available, rsn))
924     {
925       unsigned long int start = range_set_node_get_start (rsn);
926       unsigned long int end = range_set_node_get_end (rsn);
927
928       md4_process_bytes (&start, sizeof start, ctx);
929       md4_process_bytes (&end, sizeof end, ctx);
930     }
931
932   md4_process_bytes (&axis->phy_size, sizeof axis->phy_size, ctx);
933 }
934
935 /* Destroys AXIS. */
936 static void
937 axis_destroy (struct axis *axis)
938 {
939   if (axis == NULL)
940     return;
941
942   while (!tower_is_empty (&axis->log_to_phy))
943     {
944       struct tower_node *node = tower_first (&axis->log_to_phy);
945       struct axis_group *group = tower_data (node, struct axis_group,
946                                              logical);
947       tower_delete (&axis->log_to_phy, node);
948       free (group);
949     }
950
951   range_set_destroy (axis->available);
952   free (axis);
953 }
954
955 /* Allocates up to REQUEST contiguous unused and available
956    ordinates from AXIS.  If successful, stores the number
957    obtained into *WIDTH and the ordinate of the first into
958    *START, marks the ordinates as now unavailable return true.
959    On failure, which occurs only if AXIS has no unused and
960    available ordinates, returns false without modifying AXIS. */
961 static bool
962 axis_allocate (struct axis *axis, unsigned long int request,
963                unsigned long int *start, unsigned long int *width)
964 {
965   return range_set_allocate (axis->available, request, start, width);
966 }
967
968 /* Marks the WIDTH contiguous ordinates in AXIS, starting from
969    START, as unused and available. */
970 static void
971 axis_make_available (struct axis *axis,
972                      unsigned long int start, unsigned long int width)
973 {
974   range_set_insert (axis->available, start, width);
975 }
976
977 /* Extends the total physical length of AXIS by WIDTH and returns
978    the first ordinate in the new physical region. */
979 static unsigned long int
980 axis_extend (struct axis *axis, unsigned long int width)
981 {
982   unsigned long int start = axis->phy_size;
983   axis->phy_size += width;
984   return start;
985 }
986
987 /* Returns the physical ordinate in AXIS corresponding to logical
988    ordinate LOG_POS.  LOG_POS must be less than the logical
989    length of AXIS. */
990 static unsigned long int
991 axis_map (const struct axis *axis, unsigned long log_pos)
992 {
993   struct tower_node *node;
994   struct axis_group *group;
995   unsigned long int group_start;
996
997   node = tower_lookup (&axis->log_to_phy, log_pos, &group_start);
998   group = tower_data (node, struct axis_group, logical);
999   return group->phy_start + (log_pos - group_start);
1000 }
1001
1002 /* Returns the logical length of AXIS. */
1003 static unsigned long
1004 axis_get_size (const struct axis *axis)
1005 {
1006   return tower_height (&axis->log_to_phy);
1007 }
1008
1009 /* Inserts the CNT contiguous physical ordinates starting at
1010    PHY_START into AXIS's logical-to-physical mapping, starting at
1011    logical position LOG_START. */
1012 static void
1013 axis_insert (struct axis *axis,
1014              unsigned long int log_start, unsigned long int phy_start,
1015              unsigned long int cnt)
1016 {
1017   struct tower_node *before = split_axis (axis, log_start);
1018   struct tower_node *new = make_axis_group (phy_start);
1019   tower_insert (&axis->log_to_phy, cnt, new, before);
1020   merge_axis_nodes (axis, new, NULL);
1021   check_axis_merged (axis);
1022 }
1023
1024 /* Removes CNT ordinates from AXIS's logical-to-physical mapping
1025    starting at logical position START. */
1026 static void
1027 axis_remove (struct axis *axis,
1028              unsigned long int start, unsigned long int cnt)
1029 {
1030   if (cnt > 0)
1031     {
1032       struct tower_node *last = split_axis (axis, start + cnt);
1033       struct tower_node *cur, *next;
1034       for (cur = split_axis (axis, start); cur != last; cur = next)
1035         {
1036           next = tower_delete (&axis->log_to_phy, cur);
1037           free (axis_group_from_tower_node (cur));
1038         }
1039       merge_axis_nodes (axis, last, NULL);
1040       check_axis_merged (axis);
1041     }
1042 }
1043
1044 /* Moves the CNT ordinates in AXIS's logical-to-mapping starting
1045    at logical position OLD_START so that they then start at
1046    position NEW_START. */
1047 static void
1048 axis_move (struct axis *axis,
1049            unsigned long int old_start, unsigned long int new_start,
1050            unsigned long int cnt)
1051 {
1052   if (cnt > 0 && old_start != new_start)
1053     {
1054       struct tower_node *old_first, *old_last, *new_first;
1055       struct tower_node *merge1, *merge2;
1056       struct tower tmp_array;
1057
1058       /* Move ordinates OLD_START...(OLD_START + CNT) into new,
1059          separate TMP_ARRAY. */
1060       old_first = split_axis (axis, old_start);
1061       old_last = split_axis (axis, old_start + cnt);
1062       tower_init (&tmp_array);
1063       tower_splice (&tmp_array, NULL,
1064                     &axis->log_to_phy, old_first, old_last);
1065       merge_axis_nodes (axis, old_last, NULL);
1066       check_axis_merged (axis);
1067
1068       /* Move TMP_ARRAY to position NEW_START. */
1069       new_first = split_axis (axis, new_start);
1070       merge1 = tower_first (&tmp_array);
1071       merge2 = tower_last (&tmp_array);
1072       if (merge2 == merge1)
1073         merge2 = NULL;
1074       tower_splice (&axis->log_to_phy, new_first, &tmp_array, old_first, NULL);
1075       merge_axis_nodes (axis, merge1, &merge2);
1076       merge_axis_nodes (axis, merge2, NULL);
1077       check_axis_merged (axis);
1078     }
1079 }
1080
1081 /* Returns the axis_group in which NODE is embedded. */
1082 static struct axis_group *
1083 axis_group_from_tower_node (struct tower_node *node)
1084 {
1085   return tower_data (node, struct axis_group, logical);
1086 }
1087
1088 /* Creates and returns a new axis_group at physical position
1089    PHY_START. */
1090 static struct tower_node *
1091 make_axis_group (unsigned long phy_start)
1092 {
1093   struct axis_group *group = xmalloc (sizeof *group);
1094   group->phy_start = phy_start;
1095   return &group->logical;
1096 }
1097
1098 /* Returns the tower_node in AXIS's logical-to-physical map whose
1099    bottom edge is at exact level WHERE.  If there is no such
1100    tower_node in AXIS's logical-to-physical map, then split_axis
1101    creates one by breaking an existing tower_node into two
1102    separate ones, unless WHERE is equal to the tower height, in
1103    which case it simply returns a null pointer. */
1104 static struct tower_node *
1105 split_axis (struct axis *axis, unsigned long int where)
1106 {
1107   unsigned long int group_start;
1108   struct tower_node *group_node;
1109   struct axis_group *group;
1110
1111   assert (where <= tower_height (&axis->log_to_phy));
1112   if (where >= tower_height (&axis->log_to_phy))
1113     return NULL;
1114
1115   group_node = tower_lookup (&axis->log_to_phy, where, &group_start);
1116   group = axis_group_from_tower_node (group_node);
1117   if (where > group_start)
1118     {
1119       unsigned long int size_1 = where - group_start;
1120       unsigned long int size_2 = tower_node_get_size (group_node) - size_1;
1121       struct tower_node *next = tower_next (&axis->log_to_phy, group_node);
1122       struct tower_node *new = make_axis_group (group->phy_start + size_1);
1123       tower_resize (&axis->log_to_phy, group_node, size_1);
1124       tower_insert (&axis->log_to_phy, size_2, new, next);
1125       return new;
1126     }
1127   else
1128     return &group->logical;
1129 }
1130
1131 /* Within AXIS, attempts to merge NODE (or the last node in AXIS,
1132    if NODE is null) with its neighbor nodes.  This is possible
1133    when logically adjacent nodes are also adjacent physically (in
1134    the same order).
1135
1136    When a merge occurs, and OTHER_NODE is non-null and points to
1137    the node to be deleted, this function also updates
1138    *OTHER_NODE, if necessary, to ensure that it remains a valid
1139    pointer. */
1140 static void
1141 merge_axis_nodes (struct axis *axis, struct tower_node *node,
1142                   struct tower_node **other_node)
1143 {
1144   struct tower *t = &axis->log_to_phy;
1145   struct axis_group *group;
1146   struct tower_node *next, *prev;
1147
1148   /* Find node to potentially merge with neighbors. */
1149   if (node == NULL)
1150     node = tower_last (t);
1151   if (node == NULL)
1152     return;
1153   group = axis_group_from_tower_node (node);
1154
1155   /* Try to merge NODE with successor. */
1156   next = tower_next (t, node);
1157   if (next != NULL)
1158     {
1159       struct axis_group *next_group = axis_group_from_tower_node (next);
1160       unsigned long this_height = tower_node_get_size (node);
1161
1162       if (group->phy_start + this_height == next_group->phy_start)
1163         {
1164           unsigned long next_height = tower_node_get_size (next);
1165           tower_resize (t, node, this_height + next_height);
1166           if (other_node != NULL && *other_node == next)
1167             *other_node = tower_next (t, *other_node);
1168           tower_delete (t, next);
1169           free (next_group);
1170         }
1171     }
1172
1173   /* Try to merge NODE with predecessor. */
1174   prev = tower_prev (t, node);
1175   if (prev != NULL)
1176     {
1177       struct axis_group *prev_group = axis_group_from_tower_node (prev);
1178       unsigned long prev_height = tower_node_get_size (prev);
1179
1180       if (prev_group->phy_start + prev_height == group->phy_start)
1181         {
1182           unsigned long this_height = tower_node_get_size (node);
1183           group->phy_start = prev_group->phy_start;
1184           tower_resize (t, node, this_height + prev_height);
1185           if (other_node != NULL && *other_node == prev)
1186             *other_node = tower_next (t, *other_node);
1187           tower_delete (t, prev);
1188           free (prev_group);
1189         }
1190     }
1191 }
1192
1193 /* Verify that all potentially merge-able nodes in AXIS are
1194    actually merged. */
1195 static void
1196 check_axis_merged (const struct axis *axis UNUSED)
1197 {
1198 #if ASSERT_LEVEL >= 10
1199   struct tower_node *prev, *node;
1200
1201   for (prev = NULL, node = tower_first (&axis->log_to_phy); node != NULL;
1202        prev = node, node = tower_next (&axis->log_to_phy, node))
1203     if (prev != NULL)
1204       {
1205         struct axis_group *prev_group = axis_group_from_tower_node (prev);
1206         unsigned long prev_height = tower_node_get_size (prev);
1207         struct axis_group *node_group = axis_group_from_tower_node (node);
1208         assert (prev_group->phy_start + prev_height != node_group->phy_start);
1209       }
1210 #endif
1211 }
1212 \f
1213 /* A source. */
1214
1215 /* Creates and returns an empty, unbacked source with N_BYTES
1216    bytes per case, none of which are initially in use. */
1217 static struct source *
1218 source_create_empty (size_t n_bytes)
1219 {
1220   struct source *source = xmalloc (sizeof *source);
1221   size_t row_size = n_bytes + 4 * sizeof (void *);
1222   size_t max_memory_rows = settings_get_workspace () / row_size;
1223   source->avail = range_set_create ();
1224   range_set_insert (source->avail, 0, n_bytes);
1225   source->data = sparse_xarray_create (n_bytes, MAX (max_memory_rows, 4));
1226   source->backing = NULL;
1227   source->backing_rows = 0;
1228   source->n_used = 0;
1229   return source;
1230 }
1231
1232 /* Creates and returns a new source backed by READER and with the
1233    same initial dimensions and content. */
1234 static struct source *
1235 source_create_casereader (struct casereader *reader)
1236 {
1237   const struct caseproto *proto = casereader_get_proto (reader);
1238   size_t n_bytes = caseproto_to_n_bytes (proto);
1239   struct source *source = source_create_empty (n_bytes);
1240   size_t n_columns;
1241   size_t i;
1242
1243   range_set_delete (source->avail, 0, n_bytes);
1244   source->backing = reader;
1245   source->backing_rows = casereader_count_cases (reader);
1246
1247   source->n_used = 0;
1248   n_columns = caseproto_get_n_widths (proto);
1249   for (i = 0; i < n_columns; i++)
1250     if (caseproto_get_width (proto, i) >= 0)
1251       source->n_used++;
1252
1253   return source;
1254 }
1255
1256 /* Returns a clone of source OLD with the same data and backing
1257    (if any).
1258
1259    Currently this is used only by the datasheet model checker
1260    driver, but it could be otherwise useful. */
1261 static struct source *
1262 source_clone (const struct source *old)
1263 {
1264   struct source *new = xmalloc (sizeof *new);
1265   new->avail = range_set_clone (old->avail, NULL);
1266   new->data = sparse_xarray_clone (old->data);
1267   new->backing = old->backing != NULL ? casereader_clone (old->backing) : NULL;
1268   new->backing_rows = old->backing_rows;
1269   new->n_used = old->n_used;
1270   if (new->data == NULL)
1271     {
1272       source_destroy (new);
1273       new = NULL;
1274     }
1275   return new;
1276 }
1277
1278 static int
1279 source_allocate_column (struct source *source, int width)
1280 {
1281   unsigned long int start;
1282   int n_bytes;
1283
1284   assert (width >= 0);
1285   n_bytes = width_to_n_bytes (width);
1286   if (source->backing == NULL
1287       && range_set_allocate_fully (source->avail, n_bytes, &start))
1288     return start;
1289   else
1290     return -1;
1291 }
1292
1293 static void
1294 source_release_column (struct source *source, int ofs, int width)
1295 {
1296   assert (width >= 0);
1297   range_set_insert (source->avail, ofs, width_to_n_bytes (width));
1298   if (source->backing != NULL)
1299     source->n_used--;
1300 }
1301
1302 /* Returns true if SOURCE has any columns in use,
1303    false otherwise. */
1304 static bool
1305 source_in_use (const struct source *source)
1306 {
1307   return source->n_used > 0;
1308 }
1309
1310 /* Destroys SOURCE and its data and backing, if any. */
1311 static void
1312 source_destroy (struct source *source)
1313 {
1314   if (source != NULL)
1315     {
1316       range_set_destroy (source->avail);
1317       sparse_xarray_destroy (source->data);
1318       casereader_destroy (source->backing);
1319       free (source);
1320     }
1321 }
1322
1323 /* Returns the number of rows in SOURCE's backing casereader
1324    (SOURCE must have a backing casereader). */
1325 static casenumber
1326 source_get_backing_n_rows (const struct source *source)
1327 {
1328   assert (source_has_backing (source));
1329   return source->backing_rows;
1330 }
1331
1332 /* Reads the given COLUMN from SOURCE in the given ROW, into
1333    VALUE.  Returns true if successful, false on I/O error.
1334
1335    The caller must have initialized VALUE with the proper
1336    width. */
1337 static bool
1338 source_read (const struct column *column, casenumber row, union value *value)
1339 {
1340   struct source *source = column->source;
1341
1342   assert (column->width >= 0);
1343   if (source->backing == NULL
1344       || sparse_xarray_contains_row (source->data, row))
1345     return sparse_xarray_read (source->data, row, column->byte_ofs,
1346                                width_to_n_bytes (column->width),
1347                                value_to_data (value, column->width));
1348   else
1349     {
1350       struct ccase *c = casereader_peek (source->backing, row);
1351       bool ok = c != NULL;
1352       if (ok)
1353         {
1354           value_copy (value, case_data_idx (c, column->value_ofs),
1355                       column->width);
1356           case_unref (c);
1357         }
1358       return ok;
1359     }
1360 }
1361
1362 static bool
1363 copy_case_into_source (struct source *source, struct ccase *c, casenumber row)
1364 {
1365   const struct caseproto *proto = casereader_get_proto (source->backing);
1366   size_t n_widths = caseproto_get_n_widths (proto);
1367   size_t ofs;
1368   size_t i;
1369
1370   ofs = 0;
1371   for (i = 0; i < n_widths; i++)
1372     {
1373       int width = caseproto_get_width (proto, i);
1374       if (width >= 0)
1375         {
1376           int n_bytes = width_to_n_bytes (width);
1377           if (!sparse_xarray_write (source->data, row, ofs, n_bytes,
1378                                     value_to_data (case_data_idx (c, i),
1379                                                    width)))
1380             return false;
1381           ofs += n_bytes;
1382         }
1383     }
1384   return true;
1385 }
1386
1387 /* Writes VALUE to SOURCE in the given ROW and COLUMN.  Returns
1388    true if successful, false on I/O error.  On error, the row's
1389    data may be completely or partially corrupted, both inside and
1390    outside the region to be written.  */
1391 static bool
1392 source_write (const struct column *column, casenumber row,
1393               const union value *value)
1394 {
1395   struct source *source = column->source;
1396   struct casereader *backing = source->backing;
1397
1398   assert (column->width >= 0);
1399   if (backing != NULL
1400       && !sparse_xarray_contains_row (source->data, row)
1401       && row < source->backing_rows)
1402     {
1403       struct ccase *c;
1404       bool ok;
1405
1406       c = casereader_peek (backing, row);
1407       if (c == NULL)
1408         return false;
1409
1410       ok = copy_case_into_source (source, c, row);
1411       case_unref (c);
1412       if (!ok)
1413         return false;
1414     }
1415
1416   return sparse_xarray_write (source->data, row, column->byte_ofs,
1417                               width_to_n_bytes (column->width),
1418                               value_to_data (value, column->width));
1419 }
1420
1421 /* Within SOURCE, which must not have a backing casereader,
1422    writes the VALUE_CNT values in VALUES_CNT to the VALUE_CNT
1423    columns starting from START_COLUMN, in every row, even in rows
1424    not yet otherwise initialized.  Returns true if successful,
1425    false if an I/O error occurs.
1426
1427    We don't support backing != NULL because (1) it's harder and
1428    (2) this function is only called by
1429    datasheet_insert_column, which doesn't reuse columns from
1430    sources that are backed by casereaders. */
1431 static bool
1432 source_write_column (struct column *column, const union value *value)
1433 {
1434   int width = column->width;
1435
1436   assert (column->source->backing == NULL);
1437   assert (width >= 0);
1438
1439   return sparse_xarray_write_columns (column->source->data, column->byte_ofs,
1440                                       width_to_n_bytes (width),
1441                                       value_to_data (value, width));
1442 }
1443
1444 /* Returns true if SOURCE has a backing casereader, false
1445    otherwise. */
1446 static bool
1447 source_has_backing (const struct source *source)
1448 {
1449   return source->backing != NULL;
1450 }
1451 \f
1452 /* Datasheet model checker test driver. */
1453
1454 static int
1455 get_source_index (const struct datasheet *ds, const struct source *source)
1456 {
1457   size_t i;
1458
1459   for (i = 0; i < ds->n_sources; i++)
1460     if (ds->sources[i] == source)
1461       return i;
1462   NOT_REACHED ();
1463 }
1464
1465 /* Clones the structure and contents of ODS into a new datasheet,
1466    and returns the new datasheet. */
1467 struct datasheet *
1468 clone_datasheet (const struct datasheet *ods)
1469 {
1470   struct datasheet *ds;
1471   size_t i;
1472
1473   ds = xmalloc (sizeof *ds);
1474
1475   ds->sources = xmalloc (ods->n_sources * sizeof *ds->sources);
1476   for (i = 0; i < ods->n_sources; i++)
1477     ds->sources[i] = source_clone (ods->sources[i]);
1478   ds->n_sources = ods->n_sources;
1479
1480   ds->proto = ods->proto != NULL ? caseproto_ref (ods->proto) : NULL;
1481   ds->columns = xmemdup (ods->columns, ods->n_columns * sizeof *ods->columns);
1482   for (i = 0; i < ods->n_columns; i++)
1483     ds->columns[i].source
1484       = ds->sources[get_source_index (ods, ods->columns[i].source)];
1485   ds->n_columns = ods->n_columns;
1486   ds->column_min_alloc = ods->column_min_alloc;
1487
1488   ds->rows = axis_clone (ods->rows);
1489
1490   ds->taint = taint_create ();
1491
1492   return ds;
1493 }
1494
1495 /* Hashes the structure of datasheet DS and returns the hash.
1496    We use MD4 because it is much faster than MD5 or SHA-1 but its
1497    collision resistance is just as good. */
1498 unsigned int
1499 hash_datasheet (const struct datasheet *ds)
1500 {
1501   unsigned int hash[DIV_RND_UP (20, sizeof (unsigned int))];
1502   struct md4_ctx ctx;
1503   size_t i;
1504
1505   md4_init_ctx (&ctx);
1506   for (i = 0; i < ds->n_columns; i++)
1507     {
1508       const struct column *column = &ds->columns[i];
1509       int source_n_bytes = sparse_xarray_get_n_columns (column->source->data);
1510       md4_process_bytes (&source_n_bytes, sizeof source_n_bytes, &ctx);
1511       /*md4_process_bytes (&column->byte_ofs, sizeof column->byte_ofs, &ctx);*/
1512       md4_process_bytes (&column->value_ofs, sizeof column->value_ofs, &ctx);
1513       md4_process_bytes (&column->width, sizeof column->width, &ctx);
1514     }
1515   axis_hash (ds->rows, &ctx);
1516   md4_process_bytes (&ds->column_min_alloc, sizeof ds->column_min_alloc, &ctx);
1517   md4_finish_ctx (&ctx, hash);
1518   return hash[0];
1519 }