de59e73fe842f4fa94009d24b626cc9157464306
[pspp-builds.git] / src / data / datasheet.c
1 /* PSPP - a program for statistical analysis.
2    Copyright (C) 2007, 2009, 2010 Free Software Foundation, Inc.
3
4    This program is free software: you can redistribute it and/or modify
5    it under the terms of the GNU General Public License as published by
6    the Free Software Foundation, either version 3 of the License, or
7    (at your option) any later version.
8
9    This program is distributed in the hope that it will be useful,
10    but WITHOUT ANY WARRANTY; without even the implied warranty of
11    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12    GNU General Public License for more details.
13
14    You should have received a copy of the GNU General Public License
15    along with this program.  If not, see <http://www.gnu.org/licenses/>. */
16
17 #include <config.h>
18
19 #include <data/datasheet.h>
20
21 #include <stdlib.h>
22 #include <string.h>
23
24 #include <data/casereader-provider.h>
25 #include <data/casereader.h>
26 #include <data/casewriter.h>
27 #include <data/lazy-casereader.h>
28 #include <data/settings.h>
29 #include <libpspp/array.h>
30 #include <libpspp/assertion.h>
31 #include <libpspp/misc.h>
32 #include <libpspp/range-map.h>
33 #include <libpspp/range-set.h>
34 #include <libpspp/sparse-xarray.h>
35 #include <libpspp/taint.h>
36 #include <libpspp/tower.h>
37
38 #include "minmax.h"
39 #include "md4.h"
40 #include "xalloc.h"
41
42 struct column;
43
44 static struct axis *axis_create (void);
45 static struct axis *axis_clone (const struct axis *);
46 static void axis_destroy (struct axis *);
47
48 static void axis_hash (const struct axis *, struct md4_ctx *);
49
50 static bool axis_allocate (struct axis *, unsigned long int request,
51                            unsigned long int *start,
52                            unsigned long int *width);
53 static void axis_make_available (struct axis *,
54                                  unsigned long int start,
55                                  unsigned long int width);
56 static unsigned long int axis_extend (struct axis *, unsigned long int width);
57
58 static unsigned long int axis_map (const struct axis *, unsigned long log_pos);
59
60 static unsigned long axis_get_size (const struct axis *);
61 static void axis_insert (struct axis *,
62                          unsigned long int log_start,
63                          unsigned long int phy_start,
64                          unsigned long int cnt);
65 static void axis_remove (struct axis *,
66                          unsigned long int start, unsigned long int cnt);
67 static void axis_move (struct axis *,
68                        unsigned long int old_start,
69                        unsigned long int new_start,
70                        unsigned long int cnt);
71
72 static struct source *source_create_empty (size_t n_bytes);
73 static struct source *source_create_casereader (struct casereader *);
74 static struct source *source_clone (const struct source *);
75 static void source_destroy (struct source *);
76
77 static casenumber source_get_backing_n_rows (const struct source *);
78
79 static int source_allocate_column (struct source *, int width);
80 static void source_release_column (struct source *, int ofs, int width);
81 static bool source_in_use (const struct source *);
82
83 static bool source_read (const struct column *, casenumber row, union value *);
84 static bool source_write (const struct column *, casenumber row,
85                           const union value *);
86 static bool source_write_column (struct column *, const union value *);
87 static bool source_has_backing (const struct source *);
88
89 static int get_source_index (const struct datasheet *ds, const struct source *source);
90
91 /* A datasheet is internally composed from a set of data files,
92    called "sources".  The sources that make up a datasheet must
93    have the same number of rows (cases), but their numbers of
94    columns (variables) may vary.
95
96    A datasheet's external view is produced by mapping (permuting
97    and selecting) its internal data.  Thus, we can rearrange or
98    delete rows or columns simply by modifying the mapping.  We
99    add rows by adding rows to each source and to the row mapping.
100    We add columns by adding a new source, then adding that source
101    to the column mapping.
102
103    Each source in a datasheet can be a casereader or a
104    sparse_xarray.  Casereaders are read-only, so when sources
105    made from casereaders need to be modified, it is done
106    "virtually" through being overlaid by a sparse_xarray. */
107 struct source
108   {
109     struct range_set *avail;    /* Free bytes are set to 1s. */
110     struct sparse_xarray *data; /* Data at top level, atop the backing. */
111     struct casereader *backing; /* Backing casereader (or null). */
112     casenumber backing_rows;    /* Number of rows in backing (if backed). */
113     size_t n_used;              /* Number of column in use (if backed). */
114   };
115
116 /* A logical column. */
117 struct column
118   {
119     struct source *source;      /* Source of the underlying physical column. */
120     int value_ofs;              /* If 'source' has a backing casereader,
121                                    column's value offset in its cases. */
122     int byte_ofs;               /* Byte offset in source's sparse_xarray. */
123     int width;                  /* 0=numeric, otherwise string width. */
124   };
125
126 /* A datasheet. */
127 struct datasheet
128   {
129     /* Data sources. */
130     struct source **sources;    /* Sources, in no particular order. */
131     size_t n_sources;           /* Number of sources. */
132
133     /* Columns. */
134     struct caseproto *proto;    /* Prototype for rows (initialized lazily). */
135     struct column *columns;     /* Logical to physical column mapping. */
136     size_t n_columns;           /* Number of logical columns. */
137     unsigned column_min_alloc;  /* Min. # of columns to put in a new source. */
138
139     /* Rows. */
140     struct axis *rows;          /* Logical to physical row mapping. */
141
142     /* Tainting. */
143     struct taint *taint;        /* Indicates corrupted data. */
144   };
145
146 /* Is this operation a read or a write? */
147 enum rw_op
148   {
149     OP_READ,
150     OP_WRITE
151   };
152
153 static void allocate_column (struct datasheet *, int width, struct column *);
154 static void release_source (struct datasheet *, struct source *);
155 static bool rw_case (struct datasheet *ds, enum rw_op op,
156                      casenumber lrow, size_t start_column, size_t n_columns,
157                      union value data[]);
158
159 /* Returns the number of bytes needed to store a value with the
160    given WIDTH on disk. */
161 static size_t
162 width_to_n_bytes (int width)
163 {
164   return width == 0 ? sizeof (double) : width;
165 }
166
167 /* Returns the address of the data in VALUE (for reading or
168    writing to/from disk).  VALUE must have the given WIDTH. */
169 static void *
170 value_to_data (const union value *value_, int width)
171 {
172   union value *value = (union value *) value_;
173   assert (sizeof value->f == sizeof (double));
174   if (width == 0)
175     return &value->f;
176   else
177     return value_str_rw (value, width);
178 }
179
180 /* Returns the number of bytes needed to store all the values in
181    PROTO on disk. */
182 static size_t
183 caseproto_to_n_bytes (const struct caseproto *proto)
184 {
185   size_t n_bytes;
186   size_t i;
187
188   n_bytes = 0;
189   for (i = 0; i < caseproto_get_n_widths (proto); i++)
190     {
191       int width = caseproto_get_width (proto, i);
192       if (width >= 0)
193         n_bytes += width_to_n_bytes (width);
194     }
195   return n_bytes;
196 }
197
198 /* Creates and returns a new datasheet.
199
200    If READER is nonnull, then the datasheet initially contains
201    the contents of READER. */
202 struct datasheet *
203 datasheet_create (struct casereader *reader)
204 {
205   struct datasheet *ds = xmalloc (sizeof *ds);
206   ds->sources = NULL;
207   ds->n_sources = 0;
208   ds->proto = NULL;
209   ds->columns = NULL;
210   ds->n_columns = 0;
211   ds->column_min_alloc = 8;
212   ds->rows = axis_create ();
213   ds->taint = taint_create ();
214
215   if (reader != NULL)
216     {
217       casenumber n_rows;
218       size_t byte_ofs;
219       size_t i;
220
221       taint_propagate (casereader_get_taint (reader), ds->taint);
222
223       ds->proto = caseproto_ref (casereader_get_proto (reader));
224
225       ds->sources = xmalloc (sizeof *ds->sources);
226       ds->sources[0] = source_create_casereader (reader);
227       ds->n_sources = 1;
228
229       ds->n_columns = caseproto_get_n_widths (ds->proto);
230       ds->columns = xnmalloc (ds->n_columns, sizeof *ds->columns);
231       byte_ofs = 0;
232       for (i = 0; i < ds->n_columns; i++)
233         {
234           struct column *column = &ds->columns[i];
235           int width = caseproto_get_width (ds->proto, i);
236           column->source = ds->sources[0];
237           column->width = width;
238           if (width >= 0)
239             {
240               column->value_ofs = i;
241               column->byte_ofs = byte_ofs;
242               byte_ofs += width_to_n_bytes (column->width);
243             }
244         }
245
246       n_rows = source_get_backing_n_rows (ds->sources[0]);
247       if (n_rows > 0)
248         axis_insert (ds->rows, 0, axis_extend (ds->rows, n_rows), n_rows);
249     }
250
251   return ds;
252 }
253
254 /* Destroys datasheet DS. */
255 void
256 datasheet_destroy (struct datasheet *ds)
257 {
258   size_t i;
259
260   if (ds == NULL)
261     return;
262
263   for (i = 0; i < ds->n_sources; i++)
264     source_destroy (ds->sources[i]);
265   free (ds->sources);
266   caseproto_unref (ds->proto);
267   free (ds->columns);
268   axis_destroy (ds->rows);
269   taint_destroy (ds->taint);
270   free (ds);
271 }
272
273 /* Returns the prototype for the cases in DS.  The caller must
274    not unref the returned prototype. */
275 const struct caseproto *
276 datasheet_get_proto (const struct datasheet *ds_)
277 {
278   struct datasheet *ds = CONST_CAST (struct datasheet *, ds_);
279   if (ds->proto == NULL)
280     {
281       size_t i;
282
283       ds->proto = caseproto_create ();
284       for (i = 0; i < ds->n_columns; i++)
285         ds->proto = caseproto_add_width (ds->proto, ds->columns[i].width);
286     }
287   return ds->proto;
288 }
289
290 /* Returns the width of the given COLUMN within DS.
291    COLUMN must be less than the number of columns in DS. */
292 int
293 datasheet_get_column_width (const struct datasheet *ds, size_t column)
294 {
295   assert (column < datasheet_get_n_columns (ds));
296   return ds->columns[column].width;
297 }
298
299 /* Moves datasheet DS to a new location in memory, and returns
300    the new location.  Afterward, the datasheet must not be
301    accessed at its former location.
302
303    This function is useful for ensuring that all references to a
304    datasheet have been dropped, especially in conjunction with
305    tools like Valgrind. */
306 struct datasheet *
307 datasheet_rename (struct datasheet *ds)
308 {
309   struct datasheet *new = xmemdup (ds, sizeof *ds);
310   free (ds);
311   return new;
312 }
313
314 /* Returns true if datasheet DS is tainted.
315    A datasheet is tainted by an I/O error or by taint
316    propagation to the datasheet. */
317 bool
318 datasheet_error (const struct datasheet *ds)
319 {
320   return taint_is_tainted (ds->taint);
321 }
322
323 /* Marks datasheet DS tainted. */
324 void
325 datasheet_force_error (struct datasheet *ds)
326 {
327   taint_set_taint (ds->taint);
328 }
329
330 /* Returns datasheet DS's taint object. */
331 const struct taint *
332 datasheet_get_taint (const struct datasheet *ds)
333 {
334   return ds->taint;
335 }
336
337 /* Returns the number of rows in DS. */
338 casenumber
339 datasheet_get_n_rows (const struct datasheet *ds)
340 {
341   return axis_get_size (ds->rows);
342 }
343
344 /* Returns the number of columns in DS. */
345 size_t
346 datasheet_get_n_columns (const struct datasheet *ds)
347 {
348   return ds->n_columns;
349 }
350
351 /* Inserts a column of the given WIDTH into datasheet DS just
352    before column BEFORE.  Initializes the contents of each row in
353    the inserted column to VALUE (which must have width WIDTH).
354
355    Returns true if successful, false on failure.  In case of
356    failure, the datasheet is unchanged. */
357 bool
358 datasheet_insert_column (struct datasheet *ds,
359                          const union value *value, int width, size_t before)
360 {
361   struct column *col;
362
363   assert (before <= ds->n_columns);
364
365   ds->columns = xnrealloc (ds->columns,
366                            ds->n_columns + 1, sizeof *ds->columns);
367   insert_element (ds->columns, ds->n_columns, sizeof *ds->columns, before);
368   col = &ds->columns[before];
369   ds->n_columns++;
370
371   allocate_column (ds, width, col);
372
373   if (width >= 0 && !source_write_column (col, value))
374     {
375       datasheet_delete_columns (ds, before, 1);
376       taint_set_taint (ds->taint);
377       return false;
378     }
379
380   return true;
381 }
382
383 /* Deletes the N columns in DS starting from column START. */
384 void
385 datasheet_delete_columns (struct datasheet *ds, size_t start, size_t n)
386 {
387   assert (start + n <= ds->n_columns);
388
389   if (n > 0)
390     {
391       size_t i;
392
393       for (i = start; i < start + n; i++)
394         {
395           struct column *column = &ds->columns[i];
396           struct source *source = column->source;
397           source_release_column (source, column->byte_ofs, column->width);
398           release_source (ds, source);
399         }
400
401       remove_range (ds->columns, ds->n_columns, sizeof *ds->columns, start, n);
402       ds->n_columns -= n;
403
404       caseproto_unref (ds->proto);
405       ds->proto = NULL;
406     }
407 }
408
409 /* Moves the N columns in DS starting at position OLD_START so
410    that they then start at position NEW_START.  Equivalent to
411    deleting the column rows, then inserting them at what becomes
412    position NEW_START after the deletion. */
413 void
414 datasheet_move_columns (struct datasheet *ds,
415                         size_t old_start, size_t new_start,
416                         size_t n)
417 {
418   assert (old_start + n <= ds->n_columns);
419   assert (new_start + n <= ds->n_columns);
420
421   move_range (ds->columns, ds->n_columns, sizeof *ds->columns,
422               old_start, new_start, n);
423
424   caseproto_unref (ds->proto);
425   ds->proto = NULL;
426 }
427
428 struct resize_datasheet_value_aux
429   {
430     union value src_value;
431     size_t src_ofs;
432     int src_width;
433
434     void (*resize_cb) (const union value *, union value *, void *aux);
435     void *resize_cb_aux;
436
437     union value dst_value;
438     size_t dst_ofs;
439     int dst_width;
440   };
441
442 static bool
443 resize_datasheet_value (const void *src, void *dst, void *aux_)
444 {
445   struct resize_datasheet_value_aux *aux = aux_;
446
447   memcpy (value_to_data (&aux->src_value, aux->src_width),
448           (uint8_t *) src + aux->src_ofs,
449           width_to_n_bytes (aux->src_width));
450
451   aux->resize_cb (&aux->src_value, &aux->dst_value, aux->resize_cb_aux);
452
453   memcpy ((uint8_t *) dst + aux->dst_ofs,
454           value_to_data (&aux->dst_value, aux->dst_width),
455           width_to_n_bytes (aux->dst_width));
456
457   return true;
458 }
459
460 bool
461 datasheet_resize_column (struct datasheet *ds, size_t column, int new_width,
462                          void (*resize_cb) (const union value *,
463                                             union value *, void *aux),
464                          void *resize_cb_aux)
465 {
466   struct column old_col;
467   struct column *col;
468   int old_width;
469
470   assert (column < datasheet_get_n_columns (ds));
471
472   col = &ds->columns[column];
473   old_col = *col;
474   old_width = old_col.width;
475
476   if (new_width == -1)
477     {
478       if (old_width != -1)
479         {
480           datasheet_delete_columns (ds, column, 1);
481           datasheet_insert_column (ds, NULL, -1, column);
482         }
483     }
484   else if (old_width == -1)
485     {
486       union value value;
487       value_init (&value, new_width);
488       value_set_missing (&value, new_width);
489       if (resize_cb != NULL)
490         resize_cb (NULL, &value, resize_cb_aux);
491       datasheet_delete_columns (ds, column, 1);
492       datasheet_insert_column (ds, &value, new_width, column);
493       value_destroy (&value, new_width);
494     }
495   else if (source_has_backing (col->source))
496     {
497       unsigned long int n_rows = axis_get_size (ds->rows);
498       unsigned long int lrow;
499       union value src, dst;
500
501       source_release_column (col->source, col->byte_ofs, col->width);
502       allocate_column (ds, new_width, col);
503
504       value_init (&src, old_width);
505       value_init (&dst, new_width);
506       for (lrow = 0; lrow < n_rows; lrow++)
507         {
508           unsigned long int prow = axis_map (ds->rows, lrow);
509           if (!source_read (&old_col, prow, &src))
510             {
511               /* FIXME: back out col changes. */
512               return false;
513             }
514           resize_cb (&src, &dst, resize_cb_aux);
515           if (!source_write (col, prow, &dst))
516             {
517               /* FIXME: back out col changes. */
518               return false;
519             }
520         }
521
522       release_source (ds, old_col.source);
523     }
524   else
525     {
526       struct resize_datasheet_value_aux aux;
527
528       source_release_column (col->source, col->byte_ofs, col->width);
529       allocate_column (ds, new_width, col);
530
531       value_init (&aux.src_value, old_col.width);
532       aux.src_ofs = old_col.byte_ofs;
533       aux.src_width = old_col.width;
534       aux.resize_cb = resize_cb;
535       aux.resize_cb_aux = resize_cb_aux;
536       value_init (&aux.dst_value, new_width);
537       aux.dst_ofs = col->byte_ofs;
538       aux.dst_width = new_width;
539       sparse_xarray_copy (old_col.source->data, col->source->data,
540                           resize_datasheet_value, &aux);
541       value_destroy (&aux.src_value, old_width);
542       value_destroy (&aux.dst_value, new_width);
543
544       release_source (ds, old_col.source);
545     }
546   return true;
547 }
548
549 /* Retrieves and returns the contents of the given ROW in
550    datasheet DS.  The caller owns the returned case and must
551    unref it when it is no longer needed.  Returns a null pointer
552    on I/O error. */
553 struct ccase *
554 datasheet_get_row (const struct datasheet *ds, casenumber row)
555 {
556   size_t n_columns = datasheet_get_n_columns (ds);
557   struct ccase *c = case_create (datasheet_get_proto (ds));
558   if (rw_case (CONST_CAST (struct datasheet *, ds), OP_READ,
559                row, 0, n_columns, case_data_all_rw (c)))
560     return c;
561   else
562     {
563       case_unref (c);
564       return NULL;
565     }
566 }
567
568 /* Stores the contents of case C, which is destroyed, into the
569    given ROW in DS.  Returns true on success, false on I/O error.
570    On failure, the given ROW might be partially modified or
571    corrupted. */
572 bool
573 datasheet_put_row (struct datasheet *ds, casenumber row, struct ccase *c)
574 {
575   size_t n_columns = datasheet_get_n_columns (ds);
576   bool ok = rw_case (ds, OP_WRITE, row, 0, n_columns,
577                      (union value *) case_data_all (c));
578   case_unref (c);
579   return ok;
580 }
581
582 /* Stores the values of COLUMN in DS in the given ROW in DS into
583    VALUE.  The caller must have already initialized VALUE as a
584    value of the appropriate width (as returned by
585    datasheet_get_column_width (DS, COLUMN)).  Returns true if
586    successful, false on I/O error. */
587 bool
588 datasheet_get_value (const struct datasheet *ds, casenumber row,
589                      size_t column, union value *value)
590 {
591   assert (row >= 0);
592   return rw_case (CONST_CAST (struct datasheet *, ds), OP_READ,
593                   row, column, 1, value);
594 }
595
596 /* Stores VALUE into DS in the given ROW and COLUMN.  VALUE must
597    have the correct width for COLUMN (as returned by
598    datasheet_get_column_width (DS, COLUMN)).  Returns true if
599    successful, false on I/O error.  On failure, ROW might be
600    partially modified or corrupted. */
601 bool
602 datasheet_put_value (struct datasheet *ds UNUSED, casenumber row UNUSED,
603                      size_t column UNUSED, const union value *value UNUSED)
604 {
605   return rw_case (ds, OP_WRITE, row, column, 1, (union value *) value);
606 }
607
608 /* Inserts the CNT cases at C into datasheet DS just before row
609    BEFORE.  Returns true if successful, false on I/O error.  On
610    failure, datasheet DS is not modified.
611
612    Regardless of success, this function unrefs all of the cases
613    in C. */
614 bool
615 datasheet_insert_rows (struct datasheet *ds,
616                        casenumber before, struct ccase *c[],
617                        casenumber cnt)
618 {
619   casenumber added = 0;
620   while (cnt > 0)
621     {
622       unsigned long first_phy;
623       unsigned long phy_cnt;
624       unsigned long i;
625
626       /* Allocate physical rows from the pool of available
627          rows. */
628       if (!axis_allocate (ds->rows, cnt, &first_phy, &phy_cnt))
629         {
630           /* No rows were available.  Extend the row axis to make
631              some new ones available. */
632           phy_cnt = cnt;
633           first_phy = axis_extend (ds->rows, cnt);
634         }
635
636       /* Insert the new rows into the row mapping. */
637       axis_insert (ds->rows, before, first_phy, phy_cnt);
638
639       /* Initialize the new rows. */
640       for (i = 0; i < phy_cnt; i++)
641         if (!datasheet_put_row (ds, before + i, c[i]))
642           {
643             while (++i < cnt)
644               case_unref (c[i]);
645             datasheet_delete_rows (ds, before - added, phy_cnt + added);
646             return false;
647           }
648
649       /* Advance. */
650       c += phy_cnt;
651       cnt -= phy_cnt;
652       before += phy_cnt;
653       added += phy_cnt;
654     }
655   return true;
656 }
657
658 /* Deletes the CNT rows in DS starting from row FIRST. */
659 void
660 datasheet_delete_rows (struct datasheet *ds,
661                        casenumber first, casenumber cnt)
662 {
663   size_t lrow;
664
665   /* Free up rows for reuse.
666      FIXME: optimize. */
667   for (lrow = first; lrow < first + cnt; lrow++)
668     axis_make_available (ds->rows, axis_map (ds->rows, lrow), 1);
669
670   /* Remove rows from logical-to-physical mapping. */
671   axis_remove (ds->rows, first, cnt);
672 }
673
674 /* Moves the CNT rows in DS starting at position OLD_START so
675    that they then start at position NEW_START.  Equivalent to
676    deleting the given rows, then inserting them at what becomes
677    position NEW_START after the deletion. */
678 void
679 datasheet_move_rows (struct datasheet *ds,
680                      size_t old_start, size_t new_start,
681                      size_t cnt)
682 {
683   axis_move (ds->rows, old_start, new_start, cnt);
684 }
685 \f
686 static const struct casereader_random_class datasheet_reader_class;
687
688 /* Creates and returns a casereader whose input cases are the
689    rows in datasheet DS.  From the caller's perspective, DS is
690    effectively destroyed by this operation, such that the caller
691    must not reference it again. */
692 struct casereader *
693 datasheet_make_reader (struct datasheet *ds)
694 {
695   struct casereader *reader;
696   ds = datasheet_rename (ds);
697   reader = casereader_create_random (datasheet_get_proto (ds),
698                                      datasheet_get_n_rows (ds),
699                                      &datasheet_reader_class, ds);
700   taint_propagate (datasheet_get_taint (ds), casereader_get_taint (reader));
701   return reader;
702 }
703
704 /* "read" function for the datasheet random casereader. */
705 static struct ccase *
706 datasheet_reader_read (struct casereader *reader UNUSED, void *ds_,
707                        casenumber case_idx)
708 {
709   struct datasheet *ds = ds_;
710   if (case_idx < datasheet_get_n_rows (ds))
711     {
712       struct ccase *c = datasheet_get_row (ds, case_idx);
713       if (c == NULL)
714         taint_set_taint (ds->taint);
715       return c;
716     }
717   else
718     return NULL;
719 }
720
721 /* "destroy" function for the datasheet random casereader. */
722 static void
723 datasheet_reader_destroy (struct casereader *reader UNUSED, void *ds_)
724 {
725   struct datasheet *ds = ds_;
726   datasheet_destroy (ds);
727 }
728
729 /* "advance" function for the datasheet random casereader. */
730 static void
731 datasheet_reader_advance (struct casereader *reader UNUSED, void *ds_,
732                           casenumber case_cnt)
733 {
734   struct datasheet *ds = ds_;
735   datasheet_delete_rows (ds, 0, case_cnt);
736 }
737
738 /* Random casereader class for a datasheet. */
739 static const struct casereader_random_class datasheet_reader_class =
740   {
741     datasheet_reader_read,
742     datasheet_reader_destroy,
743     datasheet_reader_advance,
744   };
745 \f
746 static void
747 allocate_column (struct datasheet *ds, int width, struct column *column)
748 {
749   caseproto_unref (ds->proto);
750   ds->proto = NULL;
751
752   column->value_ofs = -1;
753   column->width = width;
754   if (width >= 0)
755     {
756       int n_bytes;
757       size_t i;
758
759       n_bytes = width_to_n_bytes (width);
760       for (i = 0; i < ds->n_sources; i++)
761         {
762           column->source = ds->sources[i];
763           column->byte_ofs = source_allocate_column (column->source, n_bytes);
764           if (column->byte_ofs >= 0)
765             return;
766         }
767
768       column->source = source_create_empty (MAX (n_bytes,
769                                                  ds->column_min_alloc));
770       ds->sources = xnrealloc (ds->sources,
771                                ds->n_sources + 1, sizeof *ds->sources);
772       ds->sources[ds->n_sources++] = column->source;
773
774       ds->column_min_alloc = MIN (65536, ds->column_min_alloc * 2);
775
776       column->byte_ofs = source_allocate_column (column->source, n_bytes);
777       assert (column->byte_ofs >= 0);
778     }
779   else
780     {
781       column->source = NULL;
782       column->byte_ofs = -1;
783     }
784 }
785
786 static void
787 release_source (struct datasheet *ds, struct source *source)
788 {
789   if (source_has_backing (source) && !source_in_use (source))
790     {
791       /* Since only the first source to be added ever
792          has a backing, this source must have index
793          0.  */
794       assert (source == ds->sources[0]);
795       ds->sources[0] = ds->sources[--ds->n_sources];
796       source_destroy (source);
797     }
798 }
799
800 /* Reads (if OP is OP_READ) or writes (if op is OP_WRITE) the
801    N_COLUMNS columns starting from column START_COLUMN in row
802    LROW to/from the N_COLUMNS values in DATA. */
803 static bool
804 rw_case (struct datasheet *ds, enum rw_op op,
805          casenumber lrow, size_t start_column, size_t n_columns,
806          union value data[])
807 {
808   casenumber prow;
809   size_t i;
810
811   assert (lrow < datasheet_get_n_rows (ds));
812   assert (n_columns <= datasheet_get_n_columns (ds));
813   assert (start_column + n_columns <= datasheet_get_n_columns (ds));
814
815   prow = axis_map (ds->rows, lrow);
816   for (i = 0; i < n_columns; i++)
817     {
818       struct column *c = &ds->columns[start_column + i];
819       if (c->width >= 0)
820         {
821           bool ok;
822
823           if (op == OP_READ)
824             ok = source_read (c, prow, &data[i]);
825           else
826             ok = source_write (c, prow, &data[i]);
827
828           if (!ok)
829             {
830               taint_set_taint (ds->taint);
831               return false;
832             }
833         }
834     }
835   return true;
836 }
837 \f
838 /* An axis.
839
840    An axis has two functions.  First, it maintains a mapping from
841    logical (client-visible) to physical (storage) ordinates.  The
842    axis_map and axis_get_size functions inspect this mapping, and
843    the axis_insert, axis_remove, and axis_move functions modify
844    it.  Second, it tracks the set of ordinates that are unused
845    and available for reuse.  The axis_allocate,
846    axis_make_available, and axis_extend functions affect the set
847    of available ordinates. */
848 struct axis
849 {
850   struct tower log_to_phy;     /* Map from logical to physical ordinates;
851                                   contains "struct axis_group"s. */
852   struct range_set *available; /* Set of unused, available ordinates. */
853   unsigned long int phy_size;  /* Current physical length of axis. */
854 };
855
856 /* A mapping from logical to physical ordinates. */
857 struct axis_group
858 {
859   struct tower_node logical;  /* Range of logical ordinates. */
860   unsigned long phy_start;    /* First corresponding physical ordinate. */
861 };
862
863 static struct axis_group *axis_group_from_tower_node (struct tower_node *);
864 static struct tower_node *make_axis_group (unsigned long int phy_start);
865 static struct tower_node *split_axis (struct axis *, unsigned long int where);
866 static void merge_axis_nodes (struct axis *, struct tower_node *,
867                               struct tower_node **other_node);
868 static void check_axis_merged (const struct axis *axis UNUSED);
869
870 /* Creates and returns a new, initially empty axis. */
871 static struct axis *
872 axis_create (void)
873 {
874   struct axis *axis = xmalloc (sizeof *axis);
875   tower_init (&axis->log_to_phy);
876   axis->available = range_set_create ();
877   axis->phy_size = 0;
878   return axis;
879 }
880
881 /* Returns a clone of existing axis OLD.
882
883    Currently this is used only by the datasheet model checker
884    driver, but it could be otherwise useful. */
885 static struct axis *
886 axis_clone (const struct axis *old)
887 {
888   const struct tower_node *node;
889   struct axis *new;
890
891   new = xmalloc (sizeof *new);
892   tower_init (&new->log_to_phy);
893   new->available = range_set_clone (old->available, NULL);
894   new->phy_size = old->phy_size;
895
896   for (node = tower_first (&old->log_to_phy); node != NULL;
897        node = tower_next (&old->log_to_phy, node))
898     {
899       unsigned long int size = tower_node_get_size (node);
900       struct axis_group *group = tower_data (node, struct axis_group, logical);
901       tower_insert (&new->log_to_phy, size, make_axis_group (group->phy_start),
902                     NULL);
903     }
904
905   return new;
906 }
907
908 /* Adds the state of AXIS to the MD4 hash context CTX.
909
910    This is only used by the datasheet model checker driver.  It
911    is unlikely to be otherwise useful. */
912 static void
913 axis_hash (const struct axis *axis, struct md4_ctx *ctx)
914 {
915   const struct tower_node *tn;
916   const struct range_set_node *rsn;
917
918   for (tn = tower_first (&axis->log_to_phy); tn != NULL;
919        tn = tower_next (&axis->log_to_phy, tn))
920     {
921       struct axis_group *group = tower_data (tn, struct axis_group, logical);
922       unsigned long int phy_start = group->phy_start;
923       unsigned long int size = tower_node_get_size (tn);
924
925       md4_process_bytes (&phy_start, sizeof phy_start, ctx);
926       md4_process_bytes (&size, sizeof size, ctx);
927     }
928
929   for (rsn = range_set_first (axis->available); rsn != NULL;
930        rsn = range_set_next (axis->available, rsn))
931     {
932       unsigned long int start = range_set_node_get_start (rsn);
933       unsigned long int end = range_set_node_get_end (rsn);
934
935       md4_process_bytes (&start, sizeof start, ctx);
936       md4_process_bytes (&end, sizeof end, ctx);
937     }
938
939   md4_process_bytes (&axis->phy_size, sizeof axis->phy_size, ctx);
940 }
941
942 /* Destroys AXIS. */
943 static void
944 axis_destroy (struct axis *axis)
945 {
946   if (axis == NULL)
947     return;
948
949   while (!tower_is_empty (&axis->log_to_phy))
950     {
951       struct tower_node *node = tower_first (&axis->log_to_phy);
952       struct axis_group *group = tower_data (node, struct axis_group,
953                                              logical);
954       tower_delete (&axis->log_to_phy, node);
955       free (group);
956     }
957
958   range_set_destroy (axis->available);
959   free (axis);
960 }
961
962 /* Allocates up to REQUEST contiguous unused and available
963    ordinates from AXIS.  If successful, stores the number
964    obtained into *WIDTH and the ordinate of the first into
965    *START, marks the ordinates as now unavailable return true.
966    On failure, which occurs only if AXIS has no unused and
967    available ordinates, returns false without modifying AXIS. */
968 static bool
969 axis_allocate (struct axis *axis, unsigned long int request,
970                unsigned long int *start, unsigned long int *width)
971 {
972   return range_set_allocate (axis->available, request, start, width);
973 }
974
975 /* Marks the WIDTH contiguous ordinates in AXIS, starting from
976    START, as unused and available. */
977 static void
978 axis_make_available (struct axis *axis,
979                      unsigned long int start, unsigned long int width)
980 {
981   range_set_insert (axis->available, start, width);
982 }
983
984 /* Extends the total physical length of AXIS by WIDTH and returns
985    the first ordinate in the new physical region. */
986 static unsigned long int
987 axis_extend (struct axis *axis, unsigned long int width)
988 {
989   unsigned long int start = axis->phy_size;
990   axis->phy_size += width;
991   return start;
992 }
993
994 /* Returns the physical ordinate in AXIS corresponding to logical
995    ordinate LOG_POS.  LOG_POS must be less than the logical
996    length of AXIS. */
997 static unsigned long int
998 axis_map (const struct axis *axis, unsigned long log_pos)
999 {
1000   struct tower_node *node;
1001   struct axis_group *group;
1002   unsigned long int group_start;
1003
1004   node = tower_lookup (&axis->log_to_phy, log_pos, &group_start);
1005   group = tower_data (node, struct axis_group, logical);
1006   return group->phy_start + (log_pos - group_start);
1007 }
1008
1009 /* Returns the logical length of AXIS. */
1010 static unsigned long
1011 axis_get_size (const struct axis *axis)
1012 {
1013   return tower_height (&axis->log_to_phy);
1014 }
1015
1016 /* Inserts the CNT contiguous physical ordinates starting at
1017    PHY_START into AXIS's logical-to-physical mapping, starting at
1018    logical position LOG_START. */
1019 static void
1020 axis_insert (struct axis *axis,
1021              unsigned long int log_start, unsigned long int phy_start,
1022              unsigned long int cnt)
1023 {
1024   struct tower_node *before = split_axis (axis, log_start);
1025   struct tower_node *new = make_axis_group (phy_start);
1026   tower_insert (&axis->log_to_phy, cnt, new, before);
1027   merge_axis_nodes (axis, new, NULL);
1028   check_axis_merged (axis);
1029 }
1030
1031 /* Removes CNT ordinates from AXIS's logical-to-physical mapping
1032    starting at logical position START. */
1033 static void
1034 axis_remove (struct axis *axis,
1035              unsigned long int start, unsigned long int cnt)
1036 {
1037   if (cnt > 0)
1038     {
1039       struct tower_node *last = split_axis (axis, start + cnt);
1040       struct tower_node *cur, *next;
1041       for (cur = split_axis (axis, start); cur != last; cur = next)
1042         {
1043           next = tower_delete (&axis->log_to_phy, cur);
1044           free (axis_group_from_tower_node (cur));
1045         }
1046       merge_axis_nodes (axis, last, NULL);
1047       check_axis_merged (axis);
1048     }
1049 }
1050
1051 /* Moves the CNT ordinates in AXIS's logical-to-mapping starting
1052    at logical position OLD_START so that they then start at
1053    position NEW_START. */
1054 static void
1055 axis_move (struct axis *axis,
1056            unsigned long int old_start, unsigned long int new_start,
1057            unsigned long int cnt)
1058 {
1059   if (cnt > 0 && old_start != new_start)
1060     {
1061       struct tower_node *old_first, *old_last, *new_first;
1062       struct tower_node *merge1, *merge2;
1063       struct tower tmp_array;
1064
1065       /* Move ordinates OLD_START...(OLD_START + CNT) into new,
1066          separate TMP_ARRAY. */
1067       old_first = split_axis (axis, old_start);
1068       old_last = split_axis (axis, old_start + cnt);
1069       tower_init (&tmp_array);
1070       tower_splice (&tmp_array, NULL,
1071                     &axis->log_to_phy, old_first, old_last);
1072       merge_axis_nodes (axis, old_last, NULL);
1073       check_axis_merged (axis);
1074
1075       /* Move TMP_ARRAY to position NEW_START. */
1076       new_first = split_axis (axis, new_start);
1077       merge1 = tower_first (&tmp_array);
1078       merge2 = tower_last (&tmp_array);
1079       if (merge2 == merge1)
1080         merge2 = NULL;
1081       tower_splice (&axis->log_to_phy, new_first, &tmp_array, old_first, NULL);
1082       merge_axis_nodes (axis, merge1, &merge2);
1083       merge_axis_nodes (axis, merge2, NULL);
1084       check_axis_merged (axis);
1085     }
1086 }
1087
1088 /* Returns the axis_group in which NODE is embedded. */
1089 static struct axis_group *
1090 axis_group_from_tower_node (struct tower_node *node)
1091 {
1092   return tower_data (node, struct axis_group, logical);
1093 }
1094
1095 /* Creates and returns a new axis_group at physical position
1096    PHY_START. */
1097 static struct tower_node *
1098 make_axis_group (unsigned long phy_start)
1099 {
1100   struct axis_group *group = xmalloc (sizeof *group);
1101   group->phy_start = phy_start;
1102   return &group->logical;
1103 }
1104
1105 /* Returns the tower_node in AXIS's logical-to-physical map whose
1106    bottom edge is at exact level WHERE.  If there is no such
1107    tower_node in AXIS's logical-to-physical map, then split_axis
1108    creates one by breaking an existing tower_node into two
1109    separate ones, unless WHERE is equal to the tower height, in
1110    which case it simply returns a null pointer. */
1111 static struct tower_node *
1112 split_axis (struct axis *axis, unsigned long int where)
1113 {
1114   unsigned long int group_start;
1115   struct tower_node *group_node;
1116   struct axis_group *group;
1117
1118   assert (where <= tower_height (&axis->log_to_phy));
1119   if (where >= tower_height (&axis->log_to_phy))
1120     return NULL;
1121
1122   group_node = tower_lookup (&axis->log_to_phy, where, &group_start);
1123   group = axis_group_from_tower_node (group_node);
1124   if (where > group_start)
1125     {
1126       unsigned long int size_1 = where - group_start;
1127       unsigned long int size_2 = tower_node_get_size (group_node) - size_1;
1128       struct tower_node *next = tower_next (&axis->log_to_phy, group_node);
1129       struct tower_node *new = make_axis_group (group->phy_start + size_1);
1130       tower_resize (&axis->log_to_phy, group_node, size_1);
1131       tower_insert (&axis->log_to_phy, size_2, new, next);
1132       return new;
1133     }
1134   else
1135     return &group->logical;
1136 }
1137
1138 /* Within AXIS, attempts to merge NODE (or the last node in AXIS,
1139    if NODE is null) with its neighbor nodes.  This is possible
1140    when logically adjacent nodes are also adjacent physically (in
1141    the same order).
1142
1143    When a merge occurs, and OTHER_NODE is non-null and points to
1144    the node to be deleted, this function also updates
1145    *OTHER_NODE, if necessary, to ensure that it remains a valid
1146    pointer. */
1147 static void
1148 merge_axis_nodes (struct axis *axis, struct tower_node *node,
1149                   struct tower_node **other_node)
1150 {
1151   struct tower *t = &axis->log_to_phy;
1152   struct axis_group *group;
1153   struct tower_node *next, *prev;
1154
1155   /* Find node to potentially merge with neighbors. */
1156   if (node == NULL)
1157     node = tower_last (t);
1158   if (node == NULL)
1159     return;
1160   group = axis_group_from_tower_node (node);
1161
1162   /* Try to merge NODE with successor. */
1163   next = tower_next (t, node);
1164   if (next != NULL)
1165     {
1166       struct axis_group *next_group = axis_group_from_tower_node (next);
1167       unsigned long this_height = tower_node_get_size (node);
1168
1169       if (group->phy_start + this_height == next_group->phy_start)
1170         {
1171           unsigned long next_height = tower_node_get_size (next);
1172           tower_resize (t, node, this_height + next_height);
1173           if (other_node != NULL && *other_node == next)
1174             *other_node = tower_next (t, *other_node);
1175           tower_delete (t, next);
1176           free (next_group);
1177         }
1178     }
1179
1180   /* Try to merge NODE with predecessor. */
1181   prev = tower_prev (t, node);
1182   if (prev != NULL)
1183     {
1184       struct axis_group *prev_group = axis_group_from_tower_node (prev);
1185       unsigned long prev_height = tower_node_get_size (prev);
1186
1187       if (prev_group->phy_start + prev_height == group->phy_start)
1188         {
1189           unsigned long this_height = tower_node_get_size (node);
1190           group->phy_start = prev_group->phy_start;
1191           tower_resize (t, node, this_height + prev_height);
1192           if (other_node != NULL && *other_node == prev)
1193             *other_node = tower_next (t, *other_node);
1194           tower_delete (t, prev);
1195           free (prev_group);
1196         }
1197     }
1198 }
1199
1200 /* Verify that all potentially merge-able nodes in AXIS are
1201    actually merged. */
1202 static void
1203 check_axis_merged (const struct axis *axis UNUSED)
1204 {
1205 #if ASSERT_LEVEL >= 10
1206   struct tower_node *prev, *node;
1207
1208   for (prev = NULL, node = tower_first (&axis->log_to_phy); node != NULL;
1209        prev = node, node = tower_next (&axis->log_to_phy, node))
1210     if (prev != NULL)
1211       {
1212         struct axis_group *prev_group = axis_group_from_tower_node (prev);
1213         unsigned long prev_height = tower_node_get_size (prev);
1214         struct axis_group *node_group = axis_group_from_tower_node (node);
1215         assert (prev_group->phy_start + prev_height != node_group->phy_start);
1216       }
1217 #endif
1218 }
1219 \f
1220 /* A source. */
1221
1222 /* Creates and returns an empty, unbacked source with N_BYTES
1223    bytes per case, none of which are initially in use. */
1224 static struct source *
1225 source_create_empty (size_t n_bytes)
1226 {
1227   struct source *source = xmalloc (sizeof *source);
1228   size_t row_size = n_bytes + 4 * sizeof (void *);
1229   size_t max_memory_rows = settings_get_workspace () / row_size;
1230   source->avail = range_set_create ();
1231   range_set_insert (source->avail, 0, n_bytes);
1232   source->data = sparse_xarray_create (n_bytes, MAX (max_memory_rows, 4));
1233   source->backing = NULL;
1234   source->backing_rows = 0;
1235   source->n_used = 0;
1236   return source;
1237 }
1238
1239 /* Creates and returns a new source backed by READER and with the
1240    same initial dimensions and content. */
1241 static struct source *
1242 source_create_casereader (struct casereader *reader)
1243 {
1244   const struct caseproto *proto = casereader_get_proto (reader);
1245   size_t n_bytes = caseproto_to_n_bytes (proto);
1246   struct source *source = source_create_empty (n_bytes);
1247   size_t n_columns;
1248   size_t i;
1249
1250   range_set_delete (source->avail, 0, n_bytes);
1251   source->backing = reader;
1252   source->backing_rows = casereader_count_cases (reader);
1253
1254   source->n_used = 0;
1255   n_columns = caseproto_get_n_widths (proto);
1256   for (i = 0; i < n_columns; i++)
1257     if (caseproto_get_width (proto, i) >= 0)
1258       source->n_used++;
1259
1260   return source;
1261 }
1262
1263 /* Returns a clone of source OLD with the same data and backing
1264    (if any).
1265
1266    Currently this is used only by the datasheet model checker
1267    driver, but it could be otherwise useful. */
1268 static struct source *
1269 source_clone (const struct source *old)
1270 {
1271   struct source *new = xmalloc (sizeof *new);
1272   new->avail = range_set_clone (old->avail, NULL);
1273   new->data = sparse_xarray_clone (old->data);
1274   new->backing = old->backing != NULL ? casereader_clone (old->backing) : NULL;
1275   new->backing_rows = old->backing_rows;
1276   new->n_used = old->n_used;
1277   if (new->data == NULL)
1278     {
1279       source_destroy (new);
1280       new = NULL;
1281     }
1282   return new;
1283 }
1284
1285 static int
1286 source_allocate_column (struct source *source, int width)
1287 {
1288   unsigned long int start;
1289   int n_bytes;
1290
1291   assert (width >= 0);
1292   n_bytes = width_to_n_bytes (width);
1293   if (source->backing == NULL
1294       && range_set_allocate_fully (source->avail, n_bytes, &start))
1295     return start;
1296   else
1297     return -1;
1298 }
1299
1300 static void
1301 source_release_column (struct source *source, int ofs, int width)
1302 {
1303   assert (width >= 0);
1304   range_set_insert (source->avail, ofs, width_to_n_bytes (width));
1305   if (source->backing != NULL)
1306     source->n_used--;
1307 }
1308
1309 /* Returns true if SOURCE has any columns in use,
1310    false otherwise. */
1311 static bool
1312 source_in_use (const struct source *source)
1313 {
1314   return source->n_used > 0;
1315 }
1316
1317 /* Destroys SOURCE and its data and backing, if any. */
1318 static void
1319 source_destroy (struct source *source)
1320 {
1321   if (source != NULL)
1322     {
1323       range_set_destroy (source->avail);
1324       sparse_xarray_destroy (source->data);
1325       casereader_destroy (source->backing);
1326       free (source);
1327     }
1328 }
1329
1330 /* Returns the number of rows in SOURCE's backing casereader
1331    (SOURCE must have a backing casereader). */
1332 static casenumber
1333 source_get_backing_n_rows (const struct source *source)
1334 {
1335   assert (source_has_backing (source));
1336   return source->backing_rows;
1337 }
1338
1339 /* Reads the given COLUMN from SOURCE in the given ROW, into
1340    VALUE.  Returns true if successful, false on I/O error.
1341
1342    The caller must have initialized VALUE with the proper
1343    width. */
1344 static bool
1345 source_read (const struct column *column, casenumber row, union value *value)
1346 {
1347   struct source *source = column->source;
1348
1349   assert (column->width >= 0);
1350   if (source->backing == NULL
1351       || sparse_xarray_contains_row (source->data, row))
1352     return sparse_xarray_read (source->data, row, column->byte_ofs,
1353                                width_to_n_bytes (column->width),
1354                                value_to_data (value, column->width));
1355   else
1356     {
1357       struct ccase *c = casereader_peek (source->backing, row);
1358       bool ok = c != NULL;
1359       if (ok)
1360         {
1361           value_copy (value, case_data_idx (c, column->value_ofs),
1362                       column->width);
1363           case_unref (c);
1364         }
1365       return ok;
1366     }
1367 }
1368
1369 static bool
1370 copy_case_into_source (struct source *source, struct ccase *c, casenumber row)
1371 {
1372   const struct caseproto *proto = casereader_get_proto (source->backing);
1373   size_t n_widths = caseproto_get_n_widths (proto);
1374   size_t ofs;
1375   size_t i;
1376
1377   ofs = 0;
1378   for (i = 0; i < n_widths; i++)
1379     {
1380       int width = caseproto_get_width (proto, i);
1381       if (width >= 0)
1382         {
1383           int n_bytes = width_to_n_bytes (width);
1384           if (!sparse_xarray_write (source->data, row, ofs, n_bytes,
1385                                     value_to_data (case_data_idx (c, i),
1386                                                    width)))
1387             return false;
1388           ofs += n_bytes;
1389         }
1390     }
1391   return true;
1392 }
1393
1394 /* Writes VALUE to SOURCE in the given ROW and COLUMN.  Returns
1395    true if successful, false on I/O error.  On error, the row's
1396    data may be completely or partially corrupted, both inside and
1397    outside the region to be written.  */
1398 static bool
1399 source_write (const struct column *column, casenumber row,
1400               const union value *value)
1401 {
1402   struct source *source = column->source;
1403   struct casereader *backing = source->backing;
1404
1405   assert (column->width >= 0);
1406   if (backing != NULL
1407       && !sparse_xarray_contains_row (source->data, row)
1408       && row < source->backing_rows)
1409     {
1410       struct ccase *c;
1411       bool ok;
1412
1413       c = casereader_peek (backing, row);
1414       if (c == NULL)
1415         return false;
1416
1417       ok = copy_case_into_source (source, c, row);
1418       case_unref (c);
1419       if (!ok)
1420         return false;
1421     }
1422
1423   return sparse_xarray_write (source->data, row, column->byte_ofs,
1424                               width_to_n_bytes (column->width),
1425                               value_to_data (value, column->width));
1426 }
1427
1428 /* Within SOURCE, which must not have a backing casereader,
1429    writes the VALUE_CNT values in VALUES_CNT to the VALUE_CNT
1430    columns starting from START_COLUMN, in every row, even in rows
1431    not yet otherwise initialized.  Returns true if successful,
1432    false if an I/O error occurs.
1433
1434    We don't support backing != NULL because (1) it's harder and
1435    (2) this function is only called by
1436    datasheet_insert_column, which doesn't reuse columns from
1437    sources that are backed by casereaders. */
1438 static bool
1439 source_write_column (struct column *column, const union value *value)
1440 {
1441   int width = column->width;
1442
1443   assert (column->source->backing == NULL);
1444   assert (width >= 0);
1445
1446   return sparse_xarray_write_columns (column->source->data, column->byte_ofs,
1447                                       width_to_n_bytes (width),
1448                                       value_to_data (value, width));
1449 }
1450
1451 /* Returns true if SOURCE has a backing casereader, false
1452    otherwise. */
1453 static bool
1454 source_has_backing (const struct source *source)
1455 {
1456   return source->backing != NULL;
1457 }
1458 \f
1459 /* Datasheet model checker test driver. */
1460
1461 static int
1462 get_source_index (const struct datasheet *ds, const struct source *source)
1463 {
1464   size_t i;
1465
1466   for (i = 0; i < ds->n_sources; i++)
1467     if (ds->sources[i] == source)
1468       return i;
1469   NOT_REACHED ();
1470 }
1471
1472 /* Clones the structure and contents of ODS into a new datasheet,
1473    and returns the new datasheet. */
1474 struct datasheet *
1475 clone_datasheet (const struct datasheet *ods)
1476 {
1477   struct datasheet *ds;
1478   size_t i;
1479
1480   ds = xmalloc (sizeof *ds);
1481
1482   ds->sources = xmalloc (ods->n_sources * sizeof *ds->sources);
1483   for (i = 0; i < ods->n_sources; i++)
1484     ds->sources[i] = source_clone (ods->sources[i]);
1485   ds->n_sources = ods->n_sources;
1486
1487   ds->proto = ods->proto != NULL ? caseproto_ref (ods->proto) : NULL;
1488   ds->columns = xmemdup (ods->columns, ods->n_columns * sizeof *ods->columns);
1489   for (i = 0; i < ods->n_columns; i++)
1490     ds->columns[i].source
1491       = ds->sources[get_source_index (ods, ods->columns[i].source)];
1492   ds->n_columns = ods->n_columns;
1493   ds->column_min_alloc = ods->column_min_alloc;
1494
1495   ds->rows = axis_clone (ods->rows);
1496
1497   ds->taint = taint_create ();
1498
1499   return ds;
1500 }
1501
1502 /* Hashes the structure of datasheet DS and returns the hash.
1503    We use MD4 because it is much faster than MD5 or SHA-1 but its
1504    collision resistance is just as good. */
1505 unsigned int
1506 hash_datasheet (const struct datasheet *ds)
1507 {
1508   unsigned int hash[DIV_RND_UP (20, sizeof (unsigned int))];
1509   struct md4_ctx ctx;
1510   size_t i;
1511
1512   md4_init_ctx (&ctx);
1513   for (i = 0; i < ds->n_columns; i++)
1514     {
1515       const struct column *column = &ds->columns[i];
1516       int source_n_bytes = sparse_xarray_get_n_columns (column->source->data);
1517       md4_process_bytes (&source_n_bytes, sizeof source_n_bytes, &ctx);
1518       /*md4_process_bytes (&column->byte_ofs, sizeof column->byte_ofs, &ctx);*/
1519       md4_process_bytes (&column->value_ofs, sizeof column->value_ofs, &ctx);
1520       md4_process_bytes (&column->width, sizeof column->width, &ctx);
1521     }
1522   axis_hash (ds->rows, &ctx);
1523   md4_process_bytes (&ds->column_min_alloc, sizeof ds->column_min_alloc, &ctx);
1524   md4_finish_ctx (&ctx, hash);
1525   return hash[0];
1526 }