Change "union value" to dynamically allocate long strings.
[pspp-builds.git] / src / data / datasheet.c
1 /* PSPP - a program for statistical analysis.
2    Copyright (C) 2007, 2009 Free Software Foundation, Inc.
3
4    This program is free software: you can redistribute it and/or modify
5    it under the terms of the GNU General Public License as published by
6    the Free Software Foundation, either version 3 of the License, or
7    (at your option) any later version.
8
9    This program is distributed in the hope that it will be useful,
10    but WITHOUT ANY WARRANTY; without even the implied warranty of
11    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12    GNU General Public License for more details.
13
14    You should have received a copy of the GNU General Public License
15    along with this program.  If not, see <http://www.gnu.org/licenses/>. */
16
17 #include <config.h>
18
19 #include <data/datasheet.h>
20
21 #include <stdlib.h>
22 #include <string.h>
23
24 #include <data/casereader-provider.h>
25 #include <data/casereader.h>
26 #include <data/casewriter.h>
27 #include <data/lazy-casereader.h>
28 #include <data/settings.h>
29 #include <libpspp/array.h>
30 #include <libpspp/assertion.h>
31 #include <libpspp/misc.h>
32 #include <libpspp/range-map.h>
33 #include <libpspp/range-set.h>
34 #include <libpspp/sparse-xarray.h>
35 #include <libpspp/taint.h>
36 #include <libpspp/tower.h>
37
38 #include "minmax.h"
39 #include "md4.h"
40 #include "xalloc.h"
41
42 struct column;
43
44 static struct axis *axis_create (void);
45 static struct axis *axis_clone (const struct axis *);
46 static void axis_destroy (struct axis *);
47
48 static void axis_hash (const struct axis *, struct md4_ctx *);
49
50 static bool axis_allocate (struct axis *, unsigned long int request,
51                            unsigned long int *start,
52                            unsigned long int *width);
53 static void axis_make_available (struct axis *,
54                                  unsigned long int start,
55                                  unsigned long int width);
56 static unsigned long int axis_extend (struct axis *, unsigned long int width);
57
58 static unsigned long int axis_map (const struct axis *, unsigned long log_pos);
59
60 static unsigned long axis_get_size (const struct axis *);
61 static void axis_insert (struct axis *,
62                          unsigned long int log_start,
63                          unsigned long int phy_start,
64                          unsigned long int cnt);
65 static void axis_remove (struct axis *,
66                          unsigned long int start, unsigned long int cnt);
67 static void axis_move (struct axis *,
68                        unsigned long int old_start,
69                        unsigned long int new_start,
70                        unsigned long int cnt);
71
72 static struct source *source_create_empty (size_t n_bytes);
73 static struct source *source_create_casereader (struct casereader *);
74 static struct source *source_clone (const struct source *);
75 static void source_destroy (struct source *);
76
77 static casenumber source_get_backing_n_rows (const struct source *);
78
79 static int source_allocate_column (struct source *, int width);
80 static void source_release_column (struct source *, int ofs, int width);
81 static bool source_in_use (const struct source *);
82
83 static bool source_read (const struct column *, casenumber row, union value *);
84 static bool source_write (const struct column *, casenumber row,
85                           const union value *);
86 static bool source_write_column (struct column *, const union value *);
87 static bool source_has_backing (const struct source *);
88
89 static int get_source_index (const struct datasheet *ds, const struct source *source);
90
91 /* A datasheet is internally composed from a set of data files,
92    called "sources".  The sources that make up a datasheet must
93    have the same number of rows (cases), but their numbers of
94    columns (variables) may vary.
95
96    A datasheet's external view is produced by mapping (permuting
97    and selecting) its internal data.  Thus, we can rearrange or
98    delete rows or columns simply by modifying the mapping.  We
99    add rows by adding rows to each source and to the row mapping.
100    We add columns by adding a new source, then adding that source
101    to the column mapping.
102
103    Each source in a datasheet can be a casereader or a
104    sparse_xarray.  Casereaders are read-only, so when sources
105    made from casereaders need to be modified, it is done
106    "virtually" through being overlaid by a sparse_xarray. */
107 struct source
108   {
109     struct range_set *avail;    /* Free bytes are set to 1s. */
110     struct sparse_xarray *data; /* Data at top level, atop the backing. */
111     struct casereader *backing; /* Backing casereader (or null). */
112     casenumber backing_rows;    /* Number of rows in backing (if backed). */
113     size_t n_used;              /* Number of column in use (if backed). */
114   };
115
116 /* A logical column. */
117 struct column
118   {
119     struct source *source;      /* Source of the underlying physical column. */
120     int value_ofs;              /* If 'source' has a backing casereader,
121                                    column's value offset in its cases. */
122     int byte_ofs;               /* Byte offset in source's sparse_xarray. */
123     int width;                  /* 0=numeric, otherwise string width. */
124   };
125
126 /* A datasheet. */
127 struct datasheet
128   {
129     /* Data sources. */
130     struct source **sources;    /* Sources, in no particular order. */
131     size_t n_sources;           /* Number of sources. */
132
133     /* Columns. */
134     struct caseproto *proto;    /* Prototype for rows (initialized lazily). */
135     struct column *columns;     /* Logical to physical column mapping. */
136     size_t n_columns;           /* Number of logical columns. */
137     unsigned column_min_alloc;  /* Min. # of columns to put in a new source. */
138
139     /* Rows. */
140     struct axis *rows;          /* Logical to physical row mapping. */
141
142     /* Tainting. */
143     struct taint *taint;        /* Indicates corrupted data. */
144   };
145
146 /* Is this operation a read or a write? */
147 enum rw_op
148   {
149     OP_READ,
150     OP_WRITE
151   };
152
153 static void allocate_column (struct datasheet *, int width, struct column *);
154 static void release_source (struct datasheet *, struct source *);
155 static bool rw_case (struct datasheet *ds, enum rw_op op,
156                      casenumber lrow, size_t start_column, size_t n_columns,
157                      union value data[]);
158
159 /* Returns the number of bytes needed to store a value with the
160    given WIDTH on disk. */
161 static size_t
162 width_to_n_bytes (int width)
163 {
164   return width == 0 ? sizeof (double) : width;
165 }
166
167 /* Returns the address of the data in VALUE (for reading or
168    writing to/from disk).  VALUE must have the given WIDTH. */
169 static void *
170 value_to_data (const union value *value_, int width)
171 {
172   union value *value = (union value *) value_;
173   assert (sizeof value->f == sizeof (double));
174   if (width == 0)
175     return &value->f;
176   else
177     return value_str_rw (value, width);
178 }
179
180 /* Returns the number of bytes needed to store all the values in
181    PROTO on disk. */
182 static size_t
183 caseproto_to_n_bytes (const struct caseproto *proto)
184 {
185   size_t n_bytes;
186   size_t i;
187
188   n_bytes = 0;
189   for (i = 0; i < caseproto_get_n_widths (proto); i++)
190     {
191       int width = caseproto_get_width (proto, i);
192       if (width >= 0)
193         n_bytes += width_to_n_bytes (width);
194     }
195   return n_bytes;
196 }
197
198 /* Creates and returns a new datasheet.
199
200    If READER is nonnull, then the datasheet initially contains
201    the contents of READER. */
202 struct datasheet *
203 datasheet_create (struct casereader *reader)
204 {
205   struct datasheet *ds = xmalloc (sizeof *ds);
206   ds->sources = NULL;
207   ds->n_sources = 0;
208   ds->proto = NULL;
209   ds->columns = NULL;
210   ds->n_columns = 0;
211   ds->column_min_alloc = 8;
212   ds->rows = axis_create ();
213   ds->taint = taint_create ();
214
215   if (reader != NULL)
216     {
217       casenumber n_rows;
218       size_t byte_ofs;
219       size_t i;
220
221       taint_propagate (casereader_get_taint (reader), ds->taint);
222
223       ds->proto = caseproto_ref (casereader_get_proto (reader));
224
225       ds->sources = xmalloc (sizeof *ds->sources);
226       ds->sources[0] = source_create_casereader (reader);
227       ds->n_sources = 1;
228
229       ds->n_columns = caseproto_get_n_widths (ds->proto);
230       ds->columns = xnmalloc (ds->n_columns, sizeof *ds->columns);
231       byte_ofs = 0;
232       for (i = 0; i < ds->n_columns; i++)
233         {
234           struct column *column = &ds->columns[i];
235           int width = caseproto_get_width (ds->proto, i);
236           column->source = ds->sources[0];
237           column->width = width;
238           if (width >= 0)
239             {
240               column->value_ofs = i;
241               column->byte_ofs = byte_ofs;
242               byte_ofs += width_to_n_bytes (column->width);
243             }
244         }
245
246       n_rows = source_get_backing_n_rows (ds->sources[0]);
247       if (n_rows > 0)
248         axis_insert (ds->rows, 0, axis_extend (ds->rows, n_rows), n_rows);
249     }
250
251   return ds;
252 }
253
254 /* Destroys datasheet DS. */
255 void
256 datasheet_destroy (struct datasheet *ds)
257 {
258   size_t i;
259
260   if (ds == NULL)
261     return;
262
263   for (i = 0; i < ds->n_sources; i++)
264     source_destroy (ds->sources[i]);
265   free (ds->sources);
266   caseproto_unref (ds->proto);
267   free (ds->columns);
268   axis_destroy (ds->rows);
269   taint_destroy (ds->taint);
270   free (ds);
271 }
272
273 /* Returns the prototype for the cases in DS.  The caller must
274    not unref the returned prototype. */
275 const struct caseproto *
276 datasheet_get_proto (const struct datasheet *ds_)
277 {
278   struct datasheet *ds = (struct datasheet *) ds_;
279   if (ds->proto == NULL)
280     {
281       size_t i;
282
283       ds->proto = caseproto_create ();
284       for (i = 0; i < ds->n_columns; i++)
285         ds->proto = caseproto_add_width (ds->proto, ds->columns[i].width);
286     }
287   return ds->proto;
288 }
289
290 /* Returns the width of the given COLUMN within DS.
291    COLUMN must be less than the number of columns in DS. */
292 int
293 datasheet_get_column_width (const struct datasheet *ds, size_t column)
294 {
295   assert (column < datasheet_get_n_columns (ds));
296   return ds->columns[column].width;
297 }
298
299 /* Moves datasheet DS to a new location in memory, and returns
300    the new location.  Afterward, the datasheet must not be
301    accessed at its former location.
302
303    This function is useful for ensuring that all references to a
304    datasheet have been dropped, especially in conjunction with
305    tools like Valgrind. */
306 struct datasheet *
307 datasheet_rename (struct datasheet *ds)
308 {
309   struct datasheet *new = xmemdup (ds, sizeof *ds);
310   free (ds);
311   return new;
312 }
313
314 /* Returns true if datasheet DS is tainted.
315    A datasheet is tainted by an I/O error or by taint
316    propagation to the datasheet. */
317 bool
318 datasheet_error (const struct datasheet *ds)
319 {
320   return taint_is_tainted (ds->taint);
321 }
322
323 /* Marks datasheet DS tainted. */
324 void
325 datasheet_force_error (struct datasheet *ds)
326 {
327   taint_set_taint (ds->taint);
328 }
329
330 /* Returns datasheet DS's taint object. */
331 const struct taint *
332 datasheet_get_taint (const struct datasheet *ds)
333 {
334   return ds->taint;
335 }
336
337 /* Returns the number of rows in DS. */
338 casenumber
339 datasheet_get_n_rows (const struct datasheet *ds)
340 {
341   return axis_get_size (ds->rows);
342 }
343
344 /* Returns the number of columns in DS. */
345 size_t
346 datasheet_get_n_columns (const struct datasheet *ds)
347 {
348   return ds->n_columns;
349 }
350
351 /* Inserts a column of the given WIDTH into datasheet DS just
352    before column BEFORE.  Initializes the contents of each row in
353    the inserted column to VALUE (which must have width WIDTH).
354
355    Returns true if successful, false on failure.  In case of
356    failure, the datasheet is unchanged. */
357 bool
358 datasheet_insert_column (struct datasheet *ds,
359                          const union value *value, int width, size_t before)
360 {
361   struct column *col;
362
363   ds->columns = xnrealloc (ds->columns,
364                            ds->n_columns + 1, sizeof *ds->columns);
365   insert_element (ds->columns, ds->n_columns, sizeof *ds->columns, before);
366   col = &ds->columns[before];
367   ds->n_columns++;
368
369   allocate_column (ds, width, col);
370
371   if (width >= 0 && !source_write_column (col, value))
372     {
373       datasheet_delete_columns (ds, before, 1);
374       taint_set_taint (ds->taint);
375       return false;
376     }
377
378   return true;
379 }
380
381 /* Deletes the N columns in DS starting from column START. */
382 void
383 datasheet_delete_columns (struct datasheet *ds, size_t start, size_t n)
384 {
385   if (n > 0)
386     {
387       size_t i;
388
389       for (i = start; i < start + n; i++)
390         {
391           struct column *column = &ds->columns[i];
392           struct source *source = column->source;
393           source_release_column (source, column->byte_ofs, column->width);
394           release_source (ds, source);
395         }
396
397       remove_range (ds->columns, ds->n_columns, sizeof *ds->columns, start, n);
398       ds->n_columns -= n;
399
400       caseproto_unref (ds->proto);
401       ds->proto = NULL;
402     }
403 }
404
405 /* Moves the N columns in DS starting at position OLD_START so
406    that they then start at position NEW_START.  Equivalent to
407    deleting the column rows, then inserting them at what becomes
408    position NEW_START after the deletion. */
409 void
410 datasheet_move_columns (struct datasheet *ds,
411                         size_t old_start, size_t new_start,
412                         size_t n)
413 {
414   move_range (ds->columns, ds->n_columns, sizeof *ds->columns,
415               old_start, new_start, n);
416
417   caseproto_unref (ds->proto);
418   ds->proto = NULL;
419 }
420
421 struct resize_datasheet_value_aux
422   {
423     union value src_value;
424     size_t src_ofs;
425     int src_width;
426
427     void (*resize_cb) (const union value *, union value *, void *aux);
428     void *resize_cb_aux;
429
430     union value dst_value;
431     size_t dst_ofs;
432     int dst_width;
433   };
434
435 static bool
436 resize_datasheet_value (const void *src, void *dst, void *aux_)
437 {
438   struct resize_datasheet_value_aux *aux = aux_;
439
440   memcpy (value_to_data (&aux->src_value, aux->src_width),
441           (uint8_t *) src + aux->src_ofs,
442           width_to_n_bytes (aux->src_width));
443
444   aux->resize_cb (&aux->src_value, &aux->dst_value, aux->resize_cb_aux);
445
446   memcpy ((uint8_t *) dst + aux->dst_ofs,
447           value_to_data (&aux->dst_value, aux->dst_width),
448           width_to_n_bytes (aux->dst_width));
449
450   return true;
451 }
452
453 bool
454 datasheet_resize_column (struct datasheet *ds, size_t column, int new_width,
455                          void (*resize_cb) (const union value *,
456                                             union value *, void *aux),
457                          void *resize_cb_aux)
458 {
459   /* XXX needs a test. */
460   struct column old_col;
461   struct column *col;
462   int old_width;
463
464   assert (column < datasheet_get_n_columns (ds));
465
466   col = &ds->columns[column];
467   old_col = *col;
468   old_width = old_col.width;
469
470   if (old_width == new_width)
471     {
472       /* FIXME: for consistency, we should call resize_cb() on
473          each row. */
474     }
475   else if (new_width == -1)
476     {
477       datasheet_delete_columns (ds, column, 1);
478       datasheet_insert_column (ds, NULL, -1, column);
479     }
480   else if (old_width == -1)
481     {
482       union value value;
483       value_init (&value, new_width);
484       value_set_missing (&value, new_width);
485       if (resize_cb != NULL)
486         resize_cb (NULL, &value, resize_cb_aux);
487       datasheet_delete_columns (ds, column, 1);
488       datasheet_insert_column (ds, &value, new_width, column);
489       value_destroy (&value, new_width);
490     }
491   else if (source_has_backing (col->source))
492     {
493       unsigned long int n_rows = axis_get_size (ds->rows);
494       union value src, dst;
495       size_t row;
496
497       source_release_column (col->source, col->byte_ofs, col->width);
498       allocate_column (ds, new_width, col);
499
500       value_init (&src, old_width);
501       value_init (&dst, new_width);
502       for (row = 0; row < n_rows; row++)
503         {
504           if (!source_read (&old_col, row, &src))
505             {
506               /* FIXME: back out col changes. */
507               return false;
508             }
509           resize_cb (&src, &dst, resize_cb_aux);
510           if (!source_write (col, row, &dst))
511             {
512               /* FIXME: back out col changes. */
513               return false;
514             }
515         }
516
517       release_source (ds, old_col.source);
518     }
519   else
520     {
521       struct resize_datasheet_value_aux aux;
522
523       source_release_column (col->source, col->byte_ofs, col->width);
524       allocate_column (ds, new_width, col);
525
526       value_init (&aux.src_value, old_col.width);
527       aux.src_ofs = old_col.byte_ofs;
528       aux.src_width = old_col.width;
529       aux.resize_cb = resize_cb;
530       aux.resize_cb_aux = resize_cb_aux;
531       value_init (&aux.dst_value, new_width);
532       aux.dst_ofs = col->byte_ofs;
533       aux.dst_width = new_width;
534       sparse_xarray_copy (old_col.source->data, col->source->data,
535                           resize_datasheet_value, &aux);
536       value_destroy (&aux.src_value, old_width);
537       value_destroy (&aux.dst_value, new_width);
538
539       release_source (ds, old_col.source);
540     }
541   return true;
542 }
543
544 /* Retrieves and returns the contents of the given ROW in
545    datasheet DS.  The caller owns the returned case and must
546    unref it when it is no longer needed.  Returns a null pointer
547    on I/O error. */
548 struct ccase *
549 datasheet_get_row (const struct datasheet *ds, casenumber row)
550 {
551   size_t n_columns = datasheet_get_n_columns (ds);
552   struct ccase *c = case_create (datasheet_get_proto (ds));
553   if (rw_case ((struct datasheet *) ds, OP_READ,
554                row, 0, n_columns, case_data_all_rw (c)))
555     return c;
556   else
557     {
558       case_unref (c);
559       return NULL;
560     }
561 }
562
563 /* Stores the contents of case C, which is destroyed, into the
564    given ROW in DS.  Returns true on success, false on I/O error.
565    On failure, the given ROW might be partially modified or
566    corrupted. */
567 bool
568 datasheet_put_row (struct datasheet *ds, casenumber row, struct ccase *c)
569 {
570   size_t n_columns = datasheet_get_n_columns (ds);
571   bool ok = rw_case (ds, OP_WRITE, row, 0, n_columns,
572                      (union value *) case_data_all (c));
573   case_unref (c);
574   return ok;
575 }
576
577 /* Stores the values of COLUMN in DS in the given ROW in DS into
578    VALUE.  The caller must have already initialized VALUE as a
579    value of the appropriate width (as returned by
580    datasheet_get_column_width (DS, COLUMN)).  Returns true if
581    successful, false on I/O error. */
582 bool
583 datasheet_get_value (const struct datasheet *ds, casenumber row,
584                      size_t column, union value *value)
585 {
586   assert (row >= 0);
587   return rw_case ((struct datasheet *) ds, OP_READ, row, column, 1, value);
588 }
589
590 /* Stores VALUE into DS in the given ROW and COLUMN.  VALUE must
591    have the correct width for COLUMN (as returned by
592    datasheet_get_column_width (DS, COLUMN)).  Returns true if
593    successful, false on I/O error.  On failure, ROW might be
594    partially modified or corrupted. */
595 bool
596 datasheet_put_value (struct datasheet *ds UNUSED, casenumber row UNUSED,
597                      size_t column UNUSED, const union value *value UNUSED)
598 {
599   return rw_case (ds, OP_WRITE, row, column, 1, (union value *) value);
600 }
601
602 /* Inserts the CNT cases at C into datasheet DS just before row
603    BEFORE.  Returns true if successful, false on I/O error.  On
604    failure, datasheet DS is not modified.
605
606    Regardless of success, this function unrefs all of the cases
607    in C. */
608 bool
609 datasheet_insert_rows (struct datasheet *ds,
610                        casenumber before, struct ccase *c[],
611                        casenumber cnt)
612 {
613   casenumber added = 0;
614   while (cnt > 0)
615     {
616       unsigned long first_phy;
617       unsigned long phy_cnt;
618       unsigned long i;
619
620       /* Allocate physical rows from the pool of available
621          rows. */
622       if (!axis_allocate (ds->rows, cnt, &first_phy, &phy_cnt))
623         {
624           /* No rows were available.  Extend the row axis to make
625              some new ones available. */
626           phy_cnt = cnt;
627           first_phy = axis_extend (ds->rows, cnt);
628         }
629
630       /* Insert the new rows into the row mapping. */
631       axis_insert (ds->rows, before, first_phy, phy_cnt);
632
633       /* Initialize the new rows. */
634       for (i = 0; i < phy_cnt; i++)
635         if (!datasheet_put_row (ds, before + i, c[i]))
636           {
637             while (++i < cnt)
638               case_unref (c[i]);
639             datasheet_delete_rows (ds, before - added, phy_cnt + added);
640             return false;
641           }
642
643       /* Advance. */
644       c += phy_cnt;
645       cnt -= phy_cnt;
646       before += phy_cnt;
647       added += phy_cnt;
648     }
649   return true;
650 }
651
652 /* Deletes the CNT rows in DS starting from row FIRST. */
653 void
654 datasheet_delete_rows (struct datasheet *ds,
655                        casenumber first, casenumber cnt)
656 {
657   size_t lrow;
658
659   /* Free up rows for reuse.
660      FIXME: optimize. */
661   for (lrow = first; lrow < first + cnt; lrow++)
662     axis_make_available (ds->rows, axis_map (ds->rows, lrow), 1);
663
664   /* Remove rows from logical-to-physical mapping. */
665   axis_remove (ds->rows, first, cnt);
666 }
667
668 /* Moves the CNT rows in DS starting at position OLD_START so
669    that they then start at position NEW_START.  Equivalent to
670    deleting the given rows, then inserting them at what becomes
671    position NEW_START after the deletion. */
672 void
673 datasheet_move_rows (struct datasheet *ds,
674                      size_t old_start, size_t new_start,
675                      size_t cnt)
676 {
677   axis_move (ds->rows, old_start, new_start, cnt);
678 }
679 \f
680 static const struct casereader_random_class datasheet_reader_class;
681
682 /* Creates and returns a casereader whose input cases are the
683    rows in datasheet DS.  From the caller's perspective, DS is
684    effectively destroyed by this operation, such that the caller
685    must not reference it again. */
686 struct casereader *
687 datasheet_make_reader (struct datasheet *ds)
688 {
689   struct casereader *reader;
690   ds = datasheet_rename (ds);
691   reader = casereader_create_random (datasheet_get_proto (ds),
692                                      datasheet_get_n_rows (ds),
693                                      &datasheet_reader_class, ds);
694   taint_propagate (datasheet_get_taint (ds), casereader_get_taint (reader));
695   return reader;
696 }
697
698 /* "read" function for the datasheet random casereader. */
699 static struct ccase *
700 datasheet_reader_read (struct casereader *reader UNUSED, void *ds_,
701                        casenumber case_idx)
702 {
703   struct datasheet *ds = ds_;
704   if (case_idx < datasheet_get_n_rows (ds))
705     {
706       struct ccase *c = datasheet_get_row (ds, case_idx);
707       if (c == NULL)
708         taint_set_taint (ds->taint);
709       return c;
710     }
711   else
712     return NULL;
713 }
714
715 /* "destroy" function for the datasheet random casereader. */
716 static void
717 datasheet_reader_destroy (struct casereader *reader UNUSED, void *ds_)
718 {
719   struct datasheet *ds = ds_;
720   datasheet_destroy (ds);
721 }
722
723 /* "advance" function for the datasheet random casereader. */
724 static void
725 datasheet_reader_advance (struct casereader *reader UNUSED, void *ds_,
726                           casenumber case_cnt)
727 {
728   struct datasheet *ds = ds_;
729   datasheet_delete_rows (ds, 0, case_cnt);
730 }
731
732 /* Random casereader class for a datasheet. */
733 static const struct casereader_random_class datasheet_reader_class =
734   {
735     datasheet_reader_read,
736     datasheet_reader_destroy,
737     datasheet_reader_advance,
738   };
739 \f
740 static void
741 allocate_column (struct datasheet *ds, int width, struct column *column)
742 {
743   caseproto_unref (ds->proto);
744   ds->proto = NULL;
745
746   column->value_ofs = -1;
747   column->width = width;
748   if (width >= 0)
749     {
750       int n_bytes;
751       size_t i;
752
753       n_bytes = width_to_n_bytes (width);
754       for (i = 0; i < ds->n_sources; i++)
755         {
756           column->source = ds->sources[i];
757           column->byte_ofs = source_allocate_column (column->source, n_bytes);
758           if (column->byte_ofs >= 0)
759             return;
760         }
761
762       column->source = source_create_empty (MAX (n_bytes,
763                                                  ds->column_min_alloc));
764       ds->sources = xnrealloc (ds->sources,
765                                ds->n_sources + 1, sizeof *ds->sources);
766       ds->sources[ds->n_sources++] = column->source;
767
768       ds->column_min_alloc = MIN (65536, ds->column_min_alloc * 2);
769
770       column->byte_ofs = source_allocate_column (column->source, n_bytes);
771       assert (column->byte_ofs >= 0);
772     }
773   else
774     {
775       column->source = NULL;
776       column->byte_ofs = -1;
777     }
778 }
779
780 static void
781 release_source (struct datasheet *ds, struct source *source)
782 {
783   if (source_has_backing (source) && !source_in_use (source))
784     {
785       /* Since only the first source to be added ever
786          has a backing, this source must have index
787          0.  */
788       assert (source == ds->sources[0]);
789       ds->sources[0] = ds->sources[--ds->n_sources];
790       source_destroy (source);
791     }
792 }
793
794 /* Reads (if OP is OP_READ) or writes (if op is OP_WRITE) the
795    N_COLUMNS columns starting from column START_COLUMN in row
796    LROW to/from the N_COLUMNS values in DATA. */
797 static bool
798 rw_case (struct datasheet *ds, enum rw_op op,
799          casenumber lrow, size_t start_column, size_t n_columns,
800          union value data[])
801 {
802   casenumber prow;
803   size_t i;
804
805   assert (lrow < datasheet_get_n_rows (ds));
806   assert (n_columns <= datasheet_get_n_columns (ds));
807   assert (start_column + n_columns <= datasheet_get_n_columns (ds));
808
809   prow = axis_map (ds->rows, lrow);
810   for (i = 0; i < n_columns; i++)
811     {
812       struct column *c = &ds->columns[start_column + i];
813       if (c->width >= 0)
814         {
815           bool ok;
816
817           if (op == OP_READ)
818             ok = source_read (c, prow, &data[i]);
819           else
820             ok = source_write (c, prow, &data[i]);
821
822           if (!ok)
823             {
824               taint_set_taint (ds->taint);
825               return false;
826             }
827         }
828     }
829   return true;
830 }
831 \f
832 /* An axis.
833
834    An axis has two functions.  First, it maintains a mapping from
835    logical (client-visible) to physical (storage) ordinates.  The
836    axis_map and axis_get_size functions inspect this mapping, and
837    the axis_insert, axis_remove, and axis_move functions modify
838    it.  Second, it tracks the set of ordinates that are unused
839    and available for reuse.  The axis_allocate,
840    axis_make_available, and axis_extend functions affect the set
841    of available ordinates. */
842 struct axis
843 {
844   struct tower log_to_phy;     /* Map from logical to physical ordinates;
845                                   contains "struct axis_group"s. */
846   struct range_set *available; /* Set of unused, available ordinates. */
847   unsigned long int phy_size;  /* Current physical length of axis. */
848 };
849
850 /* A mapping from logical to physical ordinates. */
851 struct axis_group
852 {
853   struct tower_node logical;  /* Range of logical ordinates. */
854   unsigned long phy_start;    /* First corresponding physical ordinate. */
855 };
856
857 static struct axis_group *axis_group_from_tower_node (struct tower_node *);
858 static struct tower_node *make_axis_group (unsigned long int phy_start);
859 static struct tower_node *split_axis (struct axis *, unsigned long int where);
860 static void merge_axis_nodes (struct axis *, struct tower_node *,
861                               struct tower_node **other_node);
862 static void check_axis_merged (const struct axis *axis UNUSED);
863
864 /* Creates and returns a new, initially empty axis. */
865 static struct axis *
866 axis_create (void)
867 {
868   struct axis *axis = xmalloc (sizeof *axis);
869   tower_init (&axis->log_to_phy);
870   axis->available = range_set_create ();
871   axis->phy_size = 0;
872   return axis;
873 }
874
875 /* Returns a clone of existing axis OLD.
876
877    Currently this is used only by the datasheet model checker
878    driver, but it could be otherwise useful. */
879 static struct axis *
880 axis_clone (const struct axis *old)
881 {
882   const struct tower_node *node;
883   struct axis *new;
884
885   new = xmalloc (sizeof *new);
886   tower_init (&new->log_to_phy);
887   new->available = range_set_clone (old->available, NULL);
888   new->phy_size = old->phy_size;
889
890   for (node = tower_first (&old->log_to_phy); node != NULL;
891        node = tower_next (&old->log_to_phy, node))
892     {
893       unsigned long int size = tower_node_get_size (node);
894       struct axis_group *group = tower_data (node, struct axis_group, logical);
895       tower_insert (&new->log_to_phy, size, make_axis_group (group->phy_start),
896                     NULL);
897     }
898
899   return new;
900 }
901
902 /* Adds the state of AXIS to the MD4 hash context CTX.
903
904    This is only used by the datasheet model checker driver.  It
905    is unlikely to be otherwise useful. */
906 static void
907 axis_hash (const struct axis *axis, struct md4_ctx *ctx)
908 {
909   const struct tower_node *tn;
910   const struct range_set_node *rsn;
911
912   for (tn = tower_first (&axis->log_to_phy); tn != NULL;
913        tn = tower_next (&axis->log_to_phy, tn))
914     {
915       struct axis_group *group = tower_data (tn, struct axis_group, logical);
916       unsigned long int phy_start = group->phy_start;
917       unsigned long int size = tower_node_get_size (tn);
918
919       md4_process_bytes (&phy_start, sizeof phy_start, ctx);
920       md4_process_bytes (&size, sizeof size, ctx);
921     }
922
923   for (rsn = range_set_first (axis->available); rsn != NULL;
924        rsn = range_set_next (axis->available, rsn))
925     {
926       unsigned long int start = range_set_node_get_start (rsn);
927       unsigned long int end = range_set_node_get_end (rsn);
928
929       md4_process_bytes (&start, sizeof start, ctx);
930       md4_process_bytes (&end, sizeof end, ctx);
931     }
932
933   md4_process_bytes (&axis->phy_size, sizeof axis->phy_size, ctx);
934 }
935
936 /* Destroys AXIS. */
937 static void
938 axis_destroy (struct axis *axis)
939 {
940   if (axis == NULL)
941     return;
942
943   while (!tower_is_empty (&axis->log_to_phy))
944     {
945       struct tower_node *node = tower_first (&axis->log_to_phy);
946       struct axis_group *group = tower_data (node, struct axis_group,
947                                              logical);
948       tower_delete (&axis->log_to_phy, node);
949       free (group);
950     }
951
952   range_set_destroy (axis->available);
953   free (axis);
954 }
955
956 /* Allocates up to REQUEST contiguous unused and available
957    ordinates from AXIS.  If successful, stores the number
958    obtained into *WIDTH and the ordinate of the first into
959    *START, marks the ordinates as now unavailable return true.
960    On failure, which occurs only if AXIS has no unused and
961    available ordinates, returns false without modifying AXIS. */
962 static bool
963 axis_allocate (struct axis *axis, unsigned long int request,
964                unsigned long int *start, unsigned long int *width)
965 {
966   return range_set_allocate (axis->available, request, start, width);
967 }
968
969 /* Marks the WIDTH contiguous ordinates in AXIS, starting from
970    START, as unused and available. */
971 static void
972 axis_make_available (struct axis *axis,
973                      unsigned long int start, unsigned long int width)
974 {
975   range_set_insert (axis->available, start, width);
976 }
977
978 /* Extends the total physical length of AXIS by WIDTH and returns
979    the first ordinate in the new physical region. */
980 static unsigned long int
981 axis_extend (struct axis *axis, unsigned long int width)
982 {
983   unsigned long int start = axis->phy_size;
984   axis->phy_size += width;
985   return start;
986 }
987
988 /* Returns the physical ordinate in AXIS corresponding to logical
989    ordinate LOG_POS.  LOG_POS must be less than the logical
990    length of AXIS. */
991 static unsigned long int
992 axis_map (const struct axis *axis, unsigned long log_pos)
993 {
994   struct tower_node *node;
995   struct axis_group *group;
996   unsigned long int group_start;
997
998   node = tower_lookup (&axis->log_to_phy, log_pos, &group_start);
999   group = tower_data (node, struct axis_group, logical);
1000   return group->phy_start + (log_pos - group_start);
1001 }
1002
1003 /* Returns the logical length of AXIS. */
1004 static unsigned long
1005 axis_get_size (const struct axis *axis)
1006 {
1007   return tower_height (&axis->log_to_phy);
1008 }
1009
1010 /* Inserts the CNT contiguous physical ordinates starting at
1011    PHY_START into AXIS's logical-to-physical mapping, starting at
1012    logical position LOG_START. */
1013 static void
1014 axis_insert (struct axis *axis,
1015              unsigned long int log_start, unsigned long int phy_start,
1016              unsigned long int cnt)
1017 {
1018   struct tower_node *before = split_axis (axis, log_start);
1019   struct tower_node *new = make_axis_group (phy_start);
1020   tower_insert (&axis->log_to_phy, cnt, new, before);
1021   merge_axis_nodes (axis, new, NULL);
1022   check_axis_merged (axis);
1023 }
1024
1025 /* Removes CNT ordinates from AXIS's logical-to-physical mapping
1026    starting at logical position START. */
1027 static void
1028 axis_remove (struct axis *axis,
1029              unsigned long int start, unsigned long int cnt)
1030 {
1031   if (cnt > 0)
1032     {
1033       struct tower_node *last = split_axis (axis, start + cnt);
1034       struct tower_node *cur, *next;
1035       for (cur = split_axis (axis, start); cur != last; cur = next)
1036         {
1037           next = tower_delete (&axis->log_to_phy, cur);
1038           free (axis_group_from_tower_node (cur));
1039         }
1040       merge_axis_nodes (axis, last, NULL);
1041       check_axis_merged (axis);
1042     }
1043 }
1044
1045 /* Moves the CNT ordinates in AXIS's logical-to-mapping starting
1046    at logical position OLD_START so that they then start at
1047    position NEW_START. */
1048 static void
1049 axis_move (struct axis *axis,
1050            unsigned long int old_start, unsigned long int new_start,
1051            unsigned long int cnt)
1052 {
1053   if (cnt > 0 && old_start != new_start)
1054     {
1055       struct tower_node *old_first, *old_last, *new_first;
1056       struct tower_node *merge1, *merge2;
1057       struct tower tmp_array;
1058
1059       /* Move ordinates OLD_START...(OLD_START + CNT) into new,
1060          separate TMP_ARRAY. */
1061       old_first = split_axis (axis, old_start);
1062       old_last = split_axis (axis, old_start + cnt);
1063       tower_init (&tmp_array);
1064       tower_splice (&tmp_array, NULL,
1065                     &axis->log_to_phy, old_first, old_last);
1066       merge_axis_nodes (axis, old_last, NULL);
1067       check_axis_merged (axis);
1068
1069       /* Move TMP_ARRAY to position NEW_START. */
1070       new_first = split_axis (axis, new_start);
1071       merge1 = tower_first (&tmp_array);
1072       merge2 = tower_last (&tmp_array);
1073       if (merge2 == merge1)
1074         merge2 = NULL;
1075       tower_splice (&axis->log_to_phy, new_first, &tmp_array, old_first, NULL);
1076       merge_axis_nodes (axis, merge1, &merge2);
1077       merge_axis_nodes (axis, merge2, NULL);
1078       check_axis_merged (axis);
1079     }
1080 }
1081
1082 /* Returns the axis_group in which NODE is embedded. */
1083 static struct axis_group *
1084 axis_group_from_tower_node (struct tower_node *node)
1085 {
1086   return tower_data (node, struct axis_group, logical);
1087 }
1088
1089 /* Creates and returns a new axis_group at physical position
1090    PHY_START. */
1091 static struct tower_node *
1092 make_axis_group (unsigned long phy_start)
1093 {
1094   struct axis_group *group = xmalloc (sizeof *group);
1095   group->phy_start = phy_start;
1096   return &group->logical;
1097 }
1098
1099 /* Returns the tower_node in AXIS's logical-to-physical map whose
1100    bottom edge is at exact level WHERE.  If there is no such
1101    tower_node in AXIS's logical-to-physical map, then split_axis
1102    creates one by breaking an existing tower_node into two
1103    separate ones, unless WHERE is equal to the tower height, in
1104    which case it simply returns a null pointer. */
1105 static struct tower_node *
1106 split_axis (struct axis *axis, unsigned long int where)
1107 {
1108   unsigned long int group_start;
1109   struct tower_node *group_node;
1110   struct axis_group *group;
1111
1112   assert (where <= tower_height (&axis->log_to_phy));
1113   if (where >= tower_height (&axis->log_to_phy))
1114     return NULL;
1115
1116   group_node = tower_lookup (&axis->log_to_phy, where, &group_start);
1117   group = axis_group_from_tower_node (group_node);
1118   if (where > group_start)
1119     {
1120       unsigned long int size_1 = where - group_start;
1121       unsigned long int size_2 = tower_node_get_size (group_node) - size_1;
1122       struct tower_node *next = tower_next (&axis->log_to_phy, group_node);
1123       struct tower_node *new = make_axis_group (group->phy_start + size_1);
1124       tower_resize (&axis->log_to_phy, group_node, size_1);
1125       tower_insert (&axis->log_to_phy, size_2, new, next);
1126       return new;
1127     }
1128   else
1129     return &group->logical;
1130 }
1131
1132 /* Within AXIS, attempts to merge NODE (or the last node in AXIS,
1133    if NODE is null) with its neighbor nodes.  This is possible
1134    when logically adjacent nodes are also adjacent physically (in
1135    the same order).
1136
1137    When a merge occurs, and OTHER_NODE is non-null and points to
1138    the node to be deleted, this function also updates
1139    *OTHER_NODE, if necessary, to ensure that it remains a valid
1140    pointer. */
1141 static void
1142 merge_axis_nodes (struct axis *axis, struct tower_node *node,
1143                   struct tower_node **other_node)
1144 {
1145   struct tower *t = &axis->log_to_phy;
1146   struct axis_group *group;
1147   struct tower_node *next, *prev;
1148
1149   /* Find node to potentially merge with neighbors. */
1150   if (node == NULL)
1151     node = tower_last (t);
1152   if (node == NULL)
1153     return;
1154   group = axis_group_from_tower_node (node);
1155
1156   /* Try to merge NODE with successor. */
1157   next = tower_next (t, node);
1158   if (next != NULL)
1159     {
1160       struct axis_group *next_group = axis_group_from_tower_node (next);
1161       unsigned long this_height = tower_node_get_size (node);
1162
1163       if (group->phy_start + this_height == next_group->phy_start)
1164         {
1165           unsigned long next_height = tower_node_get_size (next);
1166           tower_resize (t, node, this_height + next_height);
1167           if (other_node != NULL && *other_node == next)
1168             *other_node = tower_next (t, *other_node);
1169           tower_delete (t, next);
1170           free (next_group);
1171         }
1172     }
1173
1174   /* Try to merge NODE with predecessor. */
1175   prev = tower_prev (t, node);
1176   if (prev != NULL)
1177     {
1178       struct axis_group *prev_group = axis_group_from_tower_node (prev);
1179       unsigned long prev_height = tower_node_get_size (prev);
1180
1181       if (prev_group->phy_start + prev_height == group->phy_start)
1182         {
1183           unsigned long this_height = tower_node_get_size (node);
1184           group->phy_start = prev_group->phy_start;
1185           tower_resize (t, node, this_height + prev_height);
1186           if (other_node != NULL && *other_node == prev)
1187             *other_node = tower_next (t, *other_node);
1188           tower_delete (t, prev);
1189           free (prev_group);
1190         }
1191     }
1192 }
1193
1194 /* Verify that all potentially merge-able nodes in AXIS are
1195    actually merged. */
1196 static void
1197 check_axis_merged (const struct axis *axis UNUSED)
1198 {
1199 #if ASSERT_LEVEL >= 10
1200   struct tower_node *prev, *node;
1201
1202   for (prev = NULL, node = tower_first (&axis->log_to_phy); node != NULL;
1203        prev = node, node = tower_next (&axis->log_to_phy, node))
1204     if (prev != NULL)
1205       {
1206         struct axis_group *prev_group = axis_group_from_tower_node (prev);
1207         unsigned long prev_height = tower_node_get_size (prev);
1208         struct axis_group *node_group = axis_group_from_tower_node (node);
1209         assert (prev_group->phy_start + prev_height != node_group->phy_start);
1210       }
1211 #endif
1212 }
1213 \f
1214 /* A source. */
1215
1216 /* Creates and returns an empty, unbacked source with N_BYTES
1217    bytes per case, none of which are initially in use. */
1218 static struct source *
1219 source_create_empty (size_t n_bytes)
1220 {
1221   struct source *source = xmalloc (sizeof *source);
1222   size_t row_size = n_bytes + 4 * sizeof (void *);
1223   size_t max_memory_rows = settings_get_workspace () / row_size;
1224   source->avail = range_set_create ();
1225   range_set_insert (source->avail, 0, n_bytes);
1226   source->data = sparse_xarray_create (n_bytes, MAX (max_memory_rows, 4));
1227   source->backing = NULL;
1228   source->backing_rows = 0;
1229   source->n_used = 0;
1230   return source;
1231 }
1232
1233 /* Creates and returns a new source backed by READER and with the
1234    same initial dimensions and content. */
1235 static struct source *
1236 source_create_casereader (struct casereader *reader)
1237 {
1238   const struct caseproto *proto = casereader_get_proto (reader);
1239   size_t n_bytes = caseproto_to_n_bytes (proto);
1240   struct source *source = source_create_empty (n_bytes);
1241   size_t n_columns;
1242   size_t i;
1243
1244   range_set_delete (source->avail, 0, n_bytes);
1245   source->backing = reader;
1246   source->backing_rows = casereader_count_cases (reader);
1247
1248   source->n_used = 0;
1249   n_columns = caseproto_get_n_widths (proto);
1250   for (i = 0; i < n_columns; i++)
1251     if (caseproto_get_width (proto, i) >= 0)
1252       source->n_used++;
1253
1254   return source;
1255 }
1256
1257 /* Returns a clone of source OLD with the same data and backing
1258    (if any).
1259
1260    Currently this is used only by the datasheet model checker
1261    driver, but it could be otherwise useful. */
1262 static struct source *
1263 source_clone (const struct source *old)
1264 {
1265   struct source *new = xmalloc (sizeof *new);
1266   new->avail = range_set_clone (old->avail, NULL);
1267   new->data = sparse_xarray_clone (old->data);
1268   new->backing = old->backing != NULL ? casereader_clone (old->backing) : NULL;
1269   new->backing_rows = old->backing_rows;
1270   new->n_used = old->n_used;
1271   if (new->data == NULL)
1272     {
1273       source_destroy (new);
1274       new = NULL;
1275     }
1276   return new;
1277 }
1278
1279 static int
1280 source_allocate_column (struct source *source, int width)
1281 {
1282   unsigned long int start;
1283   int n_bytes;
1284
1285   assert (width >= 0);
1286   n_bytes = width_to_n_bytes (width);
1287   if (source->backing == NULL
1288       && range_set_allocate_fully (source->avail, n_bytes, &start))
1289     return start;
1290   else
1291     return -1;
1292 }
1293
1294 static void
1295 source_release_column (struct source *source, int ofs, int width)
1296 {
1297   assert (width >= 0);
1298   range_set_insert (source->avail, ofs, width_to_n_bytes (width));
1299   if (source->backing != NULL)
1300     source->n_used--;
1301 }
1302
1303 /* Returns true if SOURCE has any columns in use,
1304    false otherwise. */
1305 static bool
1306 source_in_use (const struct source *source)
1307 {
1308   return source->n_used > 0;
1309 }
1310
1311 /* Destroys SOURCE and its data and backing, if any. */
1312 static void
1313 source_destroy (struct source *source)
1314 {
1315   if (source != NULL)
1316     {
1317       range_set_destroy (source->avail);
1318       sparse_xarray_destroy (source->data);
1319       casereader_destroy (source->backing);
1320       free (source);
1321     }
1322 }
1323
1324 /* Returns the number of rows in SOURCE's backing casereader
1325    (SOURCE must have a backing casereader). */
1326 static casenumber
1327 source_get_backing_n_rows (const struct source *source)
1328 {
1329   assert (source_has_backing (source));
1330   return source->backing_rows;
1331 }
1332
1333 /* Reads the given COLUMN from SOURCE in the given ROW, into
1334    VALUE.  Returns true if successful, false on I/O error.
1335
1336    The caller must have initialized VALUE with the proper
1337    width. */
1338 static bool
1339 source_read (const struct column *column, casenumber row, union value *value)
1340 {
1341   struct source *source = column->source;
1342
1343   assert (column->width >= 0);
1344   if (source->backing == NULL
1345       || sparse_xarray_contains_row (source->data, row))
1346     return sparse_xarray_read (source->data, row, column->byte_ofs,
1347                                width_to_n_bytes (column->width),
1348                                value_to_data (value, column->width));
1349   else
1350     {
1351       struct ccase *c = casereader_peek (source->backing, row);
1352       bool ok = c != NULL;
1353       if (ok)
1354         {
1355           value_copy (value, case_data_idx (c, column->value_ofs),
1356                       column->width);
1357           case_unref (c);
1358         }
1359       return ok;
1360     }
1361 }
1362
1363 static bool
1364 copy_case_into_source (struct source *source, struct ccase *c, casenumber row)
1365 {
1366   const struct caseproto *proto = casereader_get_proto (source->backing);
1367   size_t n_widths = caseproto_get_n_widths (proto);
1368   size_t ofs;
1369   size_t i;
1370
1371   ofs = 0;
1372   for (i = 0; i < n_widths; i++)
1373     {
1374       int width = caseproto_get_width (proto, i);
1375       if (width >= 0)
1376         {
1377           int n_bytes = width_to_n_bytes (width);
1378           if (!sparse_xarray_write (source->data, row, ofs, n_bytes,
1379                                     value_to_data (case_data_idx (c, i),
1380                                                    width)))
1381             return false;
1382           ofs += n_bytes;
1383         }
1384     }
1385   return true;
1386 }
1387
1388 /* Writes VALUE to SOURCE in the given ROW and COLUMN.  Returns
1389    true if successful, false on I/O error.  On error, the row's
1390    data may be completely or partially corrupted, both inside and
1391    outside the region to be written.  */
1392 static bool
1393 source_write (const struct column *column, casenumber row,
1394               const union value *value)
1395 {
1396   struct source *source = column->source;
1397   struct casereader *backing = source->backing;
1398
1399   assert (column->width >= 0);
1400   if (backing != NULL
1401       && !sparse_xarray_contains_row (source->data, row)
1402       && row < source->backing_rows)
1403     {
1404       struct ccase *c;
1405       bool ok;
1406
1407       c = casereader_peek (backing, row);
1408       if (c == NULL)
1409         return false;
1410
1411       ok = copy_case_into_source (source, c, row);
1412       case_unref (c);
1413       if (!ok)
1414         return false;
1415     }
1416
1417   return sparse_xarray_write (source->data, row, column->byte_ofs,
1418                               width_to_n_bytes (column->width),
1419                               value_to_data (value, column->width));
1420 }
1421
1422 /* Within SOURCE, which must not have a backing casereader,
1423    writes the VALUE_CNT values in VALUES_CNT to the VALUE_CNT
1424    columns starting from START_COLUMN, in every row, even in rows
1425    not yet otherwise initialized.  Returns true if successful,
1426    false if an I/O error occurs.
1427
1428    We don't support backing != NULL because (1) it's harder and
1429    (2) this function is only called by
1430    datasheet_insert_column, which doesn't reuse columns from
1431    sources that are backed by casereaders. */
1432 static bool
1433 source_write_column (struct column *column, const union value *value)
1434 {
1435   int width = column->width;
1436
1437   assert (column->source->backing == NULL);
1438   assert (width >= 0);
1439
1440   return sparse_xarray_write_columns (column->source->data, column->byte_ofs,
1441                                       width_to_n_bytes (width),
1442                                       value_to_data (value, width));
1443 }
1444
1445 /* Returns true if SOURCE has a backing casereader, false
1446    otherwise. */
1447 static bool
1448 source_has_backing (const struct source *source)
1449 {
1450   return source->backing != NULL;
1451 }
1452 \f
1453 /* Datasheet model checker test driver. */
1454
1455 static int
1456 get_source_index (const struct datasheet *ds, const struct source *source)
1457 {
1458   size_t i;
1459
1460   for (i = 0; i < ds->n_sources; i++)
1461     if (ds->sources[i] == source)
1462       return i;
1463   NOT_REACHED ();
1464 }
1465
1466 /* Clones the structure and contents of ODS into a new datasheet,
1467    and returns the new datasheet. */
1468 struct datasheet *
1469 clone_datasheet (const struct datasheet *ods)
1470 {
1471   struct datasheet *ds;
1472   size_t i;
1473
1474   ds = xmalloc (sizeof *ds);
1475
1476   ds->sources = xmalloc (ods->n_sources * sizeof *ds->sources);
1477   for (i = 0; i < ods->n_sources; i++)
1478     ds->sources[i] = source_clone (ods->sources[i]);
1479   ds->n_sources = ods->n_sources;
1480
1481   ds->proto = ods->proto != NULL ? caseproto_ref (ods->proto) : NULL;
1482   ds->columns = xmemdup (ods->columns, ods->n_columns * sizeof *ods->columns);
1483   for (i = 0; i < ods->n_columns; i++)
1484     ds->columns[i].source
1485       = ds->sources[get_source_index (ods, ods->columns[i].source)];
1486   ds->n_columns = ods->n_columns;
1487   ds->column_min_alloc = ods->column_min_alloc;
1488
1489   ds->rows = axis_clone (ods->rows);
1490
1491   ds->taint = taint_create ();
1492
1493   return ds;
1494 }
1495
1496 /* Hashes the structure of datasheet DS and returns the hash.
1497    We use MD4 because it is much faster than MD5 or SHA-1 but its
1498    collision resistance is just as good. */
1499 unsigned int
1500 hash_datasheet (const struct datasheet *ds)
1501 {
1502   unsigned int hash[DIV_RND_UP (20, sizeof (unsigned int))];
1503   struct md4_ctx ctx;
1504   size_t i;
1505
1506   md4_init_ctx (&ctx);
1507   for (i = 0; i < ds->n_columns; i++)
1508     {
1509       const struct column *column = &ds->columns[i];
1510       int source_n_bytes = sparse_xarray_get_n_columns (column->source->data);
1511       md4_process_bytes (&source_n_bytes, sizeof source_n_bytes, &ctx);
1512       /*md4_process_bytes (&column->byte_ofs, sizeof column->byte_ofs, &ctx);*/
1513       md4_process_bytes (&column->value_ofs, sizeof column->value_ofs, &ctx);
1514       md4_process_bytes (&column->width, sizeof column->width, &ctx);
1515     }
1516   axis_hash (ds->rows, &ctx);
1517   md4_process_bytes (&ds->column_min_alloc, sizeof ds->column_min_alloc, &ctx);
1518   md4_finish_ctx (&ctx, hash);
1519   return hash[0];
1520 }