64dea786877796dfdbffd2f2daa6eeb698fd4e69
[pspp-builds.git] / src / data / datasheet.c
1 /* PSPP - a program for statistical analysis.
2    Copyright (C) 2007, 2009 Free Software Foundation, Inc.
3
4    This program is free software: you can redistribute it and/or modify
5    it under the terms of the GNU General Public License as published by
6    the Free Software Foundation, either version 3 of the License, or
7    (at your option) any later version.
8
9    This program is distributed in the hope that it will be useful,
10    but WITHOUT ANY WARRANTY; without even the implied warranty of
11    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12    GNU General Public License for more details.
13
14    You should have received a copy of the GNU General Public License
15    along with this program.  If not, see <http://www.gnu.org/licenses/>. */
16
17 #include <config.h>
18
19 #include <data/datasheet.h>
20
21 #include <stdlib.h>
22 #include <string.h>
23
24 #include <data/casereader-provider.h>
25 #include <data/casereader.h>
26 #include <data/casewriter.h>
27 #include <data/lazy-casereader.h>
28 #include <data/sparse-cases.h>
29 #include <libpspp/array.h>
30 #include <libpspp/assertion.h>
31 #include <libpspp/range-map.h>
32 #include <libpspp/range-set.h>
33 #include <libpspp/taint.h>
34 #include <libpspp/tower.h>
35
36 #include "minmax.h"
37 #include "md4.h"
38 #include "xalloc.h"
39
40 static struct axis *axis_create (void);
41 static struct axis *axis_clone (const struct axis *);
42 static void axis_destroy (struct axis *);
43
44 static void axis_hash (const struct axis *, struct md4_ctx *);
45
46 static bool axis_allocate (struct axis *, unsigned long int request,
47                            unsigned long int *start,
48                            unsigned long int *width);
49 static void axis_make_available (struct axis *,
50                                  unsigned long int start,
51                                  unsigned long int width);
52 static unsigned long int axis_extend (struct axis *, unsigned long int width);
53
54 static unsigned long int axis_map (const struct axis *, unsigned long log_pos);
55
56 static unsigned long axis_get_size (const struct axis *);
57 static void axis_insert (struct axis *,
58                          unsigned long int log_start,
59                          unsigned long int phy_start,
60                          unsigned long int cnt);
61 static void axis_remove (struct axis *,
62                          unsigned long int start, unsigned long int cnt);
63 static void axis_move (struct axis *,
64                        unsigned long int old_start,
65                        unsigned long int new_start,
66                        unsigned long int cnt);
67
68 static struct source *source_create_empty (size_t column_cnt);
69 static struct source *source_create_casereader (struct casereader *);
70 static struct source *source_clone (const struct source *);
71 static void source_destroy (struct source *);
72
73 static casenumber source_get_backing_row_cnt (const struct source *);
74 static size_t source_get_column_cnt (const struct source *);
75
76 static bool source_read (const struct source *,
77                          casenumber row, size_t column,
78                          union value[], size_t value_cnt);
79 static bool source_write (struct source *,
80                           casenumber row, size_t column,
81                           const union value[], size_t value_cnt);
82 static bool source_write_columns (struct source *, size_t start_column,
83                                   const union value[], size_t value_cnt);
84 static bool source_has_backing (const struct source *);
85 static void source_increase_use (struct source *, size_t delta);
86 static void source_decrease_use (struct source *, size_t delta);
87 static bool source_in_use (const struct source *);
88
89 /* A datasheet is internally composed from a set of data files,
90    called "sources".  The sources that make up a datasheet must
91    have the same number of rows (cases), but their numbers of
92    columns (variables) may vary.
93
94    A datasheet's external view is produced by mapping (permuting
95    and selecting) its internal data.  Thus, we can rearrange or
96    delete rows or columns simply by modifying the mapping.  We
97    add rows by adding rows to each source and to the row mapping.
98    We add columns by adding a new source, then adding that source
99    to the column mapping.
100
101    Each source in a datasheet can be a casereader or a
102    sparse_cases.  Casereaders are read-only, so when sources made
103    from casereaders need to be modified, it is done "virtually"
104    through being overlaid by a sparse_cases. */
105
106 /* A datasheet. */
107 struct datasheet
108 {
109   /* Mappings from logical to physical columns/rows. */
110   struct axis *columns;
111   struct axis *rows;
112
113   /* Mapping from physical columns to "source_info"s. */
114   struct range_map sources;
115
116   /* Minimum number of columns to put in a new source when we
117      need new columns and none are free.  We double it whenever
118      we add a new source to keep the number of file descriptors
119      needed by the datasheet to a minimum, reducing the
120      likelihood of running out. */
121   unsigned column_min_alloc;
122
123   /* Indicates corrupted data in the datasheet. */
124   struct taint *taint;
125 };
126
127 /* Maps from a range of physical columns to a source. */
128 struct source_info
129 {
130   struct range_map_node column_range;
131   struct source *source;
132 };
133
134 /* Is this operation a read or a write? */
135 enum rw_op
136   {
137     OP_READ,
138     OP_WRITE
139   };
140
141 static void free_source_info (struct datasheet *, struct source_info *);
142 static struct source_info *source_info_from_range_map (
143                                                        struct range_map_node *);
144 static bool rw_case (struct datasheet *ds, enum rw_op op,
145                      casenumber lrow, size_t start_column, size_t column_cnt,
146                      union value data[]);
147
148 /* Creates and returns a new datasheet.
149
150    If READER is nonnull, then the datasheet initially contains
151    the contents of READER. */
152 struct datasheet *
153 datasheet_create (struct casereader *reader)
154 {
155   /* Create datasheet. */
156   struct datasheet *ds = xmalloc (sizeof *ds);
157   ds->columns = axis_create ();
158   ds->rows = axis_create ();
159   range_map_init (&ds->sources);
160   ds->column_min_alloc = 1;
161   ds->taint = taint_create ();
162
163   /* Add backing. */
164   if (reader != NULL)
165     {
166       size_t column_cnt;
167       casenumber row_cnt;
168       struct source_info *si;
169
170       si = xmalloc (sizeof *si);
171       si->source = source_create_casereader (reader);
172       column_cnt = source_get_column_cnt (si->source);
173       row_cnt = source_get_backing_row_cnt (si->source);
174       source_increase_use (si->source, column_cnt);
175
176       if ( column_cnt > 0 )
177         {
178           unsigned long int column_start;
179           column_start = axis_extend (ds->columns, column_cnt);
180           axis_insert (ds->columns, 0, column_start, column_cnt);
181           range_map_insert (&ds->sources, column_start, column_cnt,
182                             &si->column_range);
183         }
184
185       if ( row_cnt > 0 )
186         {
187           unsigned long int row_start;
188           row_start = axis_extend (ds->rows, row_cnt);
189           axis_insert (ds->rows, 0, row_start, row_cnt);
190         }
191     }
192
193   return ds;
194 }
195
196 /* Destroys datasheet DS. */
197 void
198 datasheet_destroy (struct datasheet *ds)
199 {
200   if (ds == NULL)
201     return;
202
203   axis_destroy (ds->columns);
204   axis_destroy (ds->rows);
205   while (!range_map_is_empty (&ds->sources))
206     {
207       struct range_map_node *r = range_map_first (&ds->sources);
208       struct source_info *si = source_info_from_range_map (r);
209       free_source_info (ds, si);
210     }
211   taint_destroy (ds->taint);
212   free (ds);
213 }
214
215 /* Moves datasheet DS to a new location in memory, and returns
216    the new location.  Afterward, the datasheet must not be
217    accessed at its former location.
218
219    This function is useful for ensuring that all references to a
220    datasheet have been dropped, especially in conjunction with
221    tools like Valgrind. */
222 struct datasheet *
223 datasheet_rename (struct datasheet *ds)
224 {
225   struct datasheet *new = xmemdup (ds, sizeof *ds);
226   free (ds);
227   return new;
228 }
229
230 /* Returns true if datasheet DS is tainted.
231    A datasheet is tainted by an I/O error or by taint
232    propagation to the datasheet. */
233 bool
234 datasheet_error (const struct datasheet *ds)
235 {
236   return taint_is_tainted (ds->taint);
237 }
238
239 /* Marks datasheet DS tainted. */
240 void
241 datasheet_force_error (struct datasheet *ds)
242 {
243   taint_set_taint (ds->taint);
244 }
245
246 /* Returns datasheet DS's taint object. */
247 const struct taint *
248 datasheet_get_taint (const struct datasheet *ds)
249 {
250   return ds->taint;
251 }
252
253 /* Returns the number of rows in DS. */
254 casenumber
255 datasheet_get_row_cnt (const struct datasheet *ds)
256 {
257   return axis_get_size (ds->rows);
258 }
259
260 /* Returns the number of columns in DS. */
261 size_t
262 datasheet_get_column_cnt (const struct datasheet *ds)
263 {
264   return axis_get_size (ds->columns);
265 }
266
267 /* Inserts CNT columns into datasheet DS just before column
268    BEFORE.  Initializes the contents of each row in the inserted
269    columns to the CNT values in INIT_VALUES.
270
271    Returns true if successful, false on failure.  In case of
272    failure, the datasheet is unchanged. */
273 bool
274 datasheet_insert_columns (struct datasheet *ds,
275                           const union value init_values[], size_t cnt,
276                           size_t before)
277 {
278   size_t added = 0;
279   while (cnt > 0)
280     {
281       unsigned long first_phy; /* First allocated physical column. */
282       unsigned long phy_cnt;   /* Number of allocated physical columns. */
283
284       /* Allocate physical columns from the pool of available
285          columns. */
286       if (!axis_allocate (ds->columns, cnt, &first_phy, &phy_cnt))
287         {
288           /* No columns were available.  Create a new source and
289              extend the axis to make some new ones available. */
290           struct source_info *si;
291
292           phy_cnt = MAX (cnt, ds->column_min_alloc);
293           first_phy = axis_extend (ds->columns, phy_cnt);
294           ds->column_min_alloc = MIN (65536, ds->column_min_alloc * 2);
295
296           si = xmalloc (sizeof *si);
297           si->source = source_create_empty (phy_cnt);
298           range_map_insert (&ds->sources, first_phy, phy_cnt,
299                             &si->column_range);
300           if (phy_cnt > cnt)
301             {
302               axis_make_available (ds->columns, first_phy + cnt,
303                                    phy_cnt - cnt);
304               phy_cnt = cnt;
305             }
306         }
307
308       /* Initialize the columns and insert them into the columns
309          axis. */
310       while (phy_cnt > 0)
311         {
312           struct range_map_node *r; /* Range map holding FIRST_PHY column. */
313           struct source_info *s;    /* Source containing FIRST_PHY column. */
314           size_t source_avail;      /* Number of phys columns available. */
315           size_t source_cnt;        /* Number of phys columns to use. */
316
317           /* Figure out how many columns we can and want to take
318              starting at FIRST_PHY, and then insert them into the
319              columns axis. */
320           r = range_map_lookup (&ds->sources, first_phy);
321           s = source_info_from_range_map (r);
322           source_avail = range_map_node_get_end (r) - first_phy;
323           source_cnt = MIN (phy_cnt, source_avail);
324           axis_insert (ds->columns, before, first_phy, source_cnt);
325
326           /* Initialize the data for those columns in the
327              source. */
328           if (!source_write_columns (s->source,
329                                      first_phy - range_map_node_get_start (r),
330                                      init_values, source_cnt))
331             {
332               datasheet_delete_columns (ds, before - added,
333                                         source_cnt + added);
334               taint_set_taint (ds->taint);
335               return false;
336             }
337           source_increase_use (s->source, source_cnt);
338
339           /* Advance. */
340           phy_cnt -= source_cnt;
341           first_phy += source_cnt;
342           init_values += source_cnt;
343           cnt -= source_cnt;
344           before += source_cnt;
345           added += source_cnt;
346         }
347     }
348   return true;
349 }
350
351 /* Deletes the CNT columns in DS starting from column START. */
352 void
353 datasheet_delete_columns (struct datasheet *ds, size_t start, size_t cnt)
354 {
355   size_t lcol;
356
357   assert ( start + cnt <= axis_get_size (ds->columns) );
358
359   /* Free up columns for reuse. */
360   for (lcol = start; lcol < start + cnt; lcol++)
361     {
362       size_t pcol = axis_map (ds->columns, lcol);
363       struct range_map_node *r = range_map_lookup (&ds->sources, pcol);
364       struct source_info *si = source_info_from_range_map (r);
365
366       source_decrease_use (si->source, 1);
367       if (source_has_backing (si->source))
368         {
369           if (!source_in_use (si->source))
370             free_source_info (ds, si);
371         }
372       else
373         axis_make_available (ds->columns, pcol, 1);
374     }
375
376   /* Remove columns from logical-to-physical mapping. */
377   axis_remove (ds->columns, start, cnt);
378 }
379
380 /* Moves the CNT columns in DS starting at position OLD_START so
381    that they then start at position NEW_START.  Equivalent to
382    deleting the column rows, then inserting them at what becomes
383    position NEW_START after the deletion.*/
384 void
385 datasheet_move_columns (struct datasheet *ds,
386                         size_t old_start, size_t new_start,
387                         size_t cnt)
388 {
389   axis_move (ds->columns, old_start, new_start, cnt);
390 }
391
392 /* Retrieves and returns the contents of the given ROW in
393    datasheet DS.  The caller owns the returned case and must
394    unref it when it is no longer needed.  Returns a null pointer
395    on I/O error. */
396 struct ccase *
397 datasheet_get_row (const struct datasheet *ds, casenumber row)
398 {
399   size_t column_cnt = datasheet_get_column_cnt (ds);
400   struct ccase *c = case_create (column_cnt);
401   if (rw_case ((struct datasheet *) ds, OP_READ,
402                row, 0, column_cnt, case_data_all_rw (c)))
403     return c;
404   else
405     {
406       case_unref (c);
407       return NULL;
408     }
409 }
410
411 /* Stores the contents of case C, which is destroyed, into the
412    given ROW in DS.  Returns true on success, false on I/O error.
413    On failure, the given ROW might be partially modified or
414    corrupted. */
415 bool
416 datasheet_put_row (struct datasheet *ds, casenumber row, struct ccase *c)
417 {
418   size_t column_cnt = datasheet_get_column_cnt (ds);
419   bool ok = rw_case (ds, OP_WRITE, row, 0, column_cnt,
420                      (union value *) case_data_all (c));
421   case_unref (c);
422   return ok;
423 }
424
425 /* Stores the values of the WIDTH columns in DS in the given ROW
426    starting at COLUMN in DS into VALUES.  Returns true if
427    successful, false on I/O error. */
428 bool
429 datasheet_get_value (const struct datasheet *ds, casenumber row, size_t column,
430                      union value *value, int width)
431 {
432   assert ( row >= 0 );
433   return rw_case ((struct datasheet *) ds,
434                   OP_READ, row, column, value_cnt_from_width (width), value);
435 }
436
437 /* Stores the WIDTH given VALUES into the given ROW in DS
438    starting at COLUMN.  Returns true if successful, false on I/O
439    error.  On failure, the given ROW might be partially modified
440    or corrupted. */
441 bool
442 datasheet_put_value (struct datasheet *ds, casenumber row, size_t column,
443                      const union value *value, int width)
444 {
445   return rw_case (ds, OP_WRITE, row, column, value_cnt_from_width (width),
446                   (union value *) value);
447 }
448
449 /* Inserts the CNT cases at C into datasheet DS just before row
450    BEFORE.  Returns true if successful, false on I/O error.  On
451    failure, datasheet DS is not modified.
452
453    Regardless of success, this function unrefs all of the cases
454    in C. */
455 bool
456 datasheet_insert_rows (struct datasheet *ds,
457                        casenumber before, struct ccase *c[],
458                        casenumber cnt)
459 {
460   casenumber added = 0;
461   while (cnt > 0)
462     {
463       unsigned long first_phy;
464       unsigned long phy_cnt;
465       unsigned long i;
466
467       /* Allocate physical rows from the pool of available
468          rows. */
469       if (!axis_allocate (ds->rows, cnt, &first_phy, &phy_cnt))
470         {
471           /* No rows were available.  Extend the row axis to make
472              some new ones available. */
473           phy_cnt = cnt;
474           first_phy = axis_extend (ds->rows, cnt);
475         }
476
477       /* Insert the new rows into the row mapping. */
478       axis_insert (ds->rows, before, first_phy, phy_cnt);
479
480       /* Initialize the new rows. */
481       for (i = 0; i < phy_cnt; i++)
482         if (!datasheet_put_row (ds, before + i, c[i]))
483           {
484             while (++i < cnt)
485               case_unref (c[i]);
486             datasheet_delete_rows (ds, before - added, phy_cnt + added);
487             return false;
488           }
489
490       /* Advance. */
491       c += phy_cnt;
492       cnt -= phy_cnt;
493       before += phy_cnt;
494       added += phy_cnt;
495     }
496   return true;
497 }
498
499 /* Deletes the CNT rows in DS starting from row FIRST. */
500 void
501 datasheet_delete_rows (struct datasheet *ds,
502                        casenumber first, casenumber cnt)
503 {
504   size_t lrow;
505
506   /* Free up rows for reuse.
507      FIXME: optimize. */
508   for (lrow = first; lrow < first + cnt; lrow++)
509     axis_make_available (ds->rows, axis_map (ds->rows, lrow), 1);
510
511   /* Remove rows from logical-to-physical mapping. */
512   axis_remove (ds->rows, first, cnt);
513 }
514
515 /* Moves the CNT rows in DS starting at position OLD_START so
516    that they then start at position NEW_START.  Equivalent to
517    deleting the given rows, then inserting them at what becomes
518    position NEW_START after the deletion. */
519 void
520 datasheet_move_rows (struct datasheet *ds,
521                      size_t old_start, size_t new_start,
522                      size_t cnt)
523 {
524   axis_move (ds->rows, old_start, new_start, cnt);
525 }
526 \f
527 static const struct casereader_random_class datasheet_reader_class;
528
529 /* Creates and returns a casereader whose input cases are the
530    rows in datasheet DS.  From the caller's perspective, DS is
531    effectively destroyed by this operation, such that the caller
532    must not reference it again. */
533 struct casereader *
534 datasheet_make_reader (struct datasheet *ds)
535 {
536   struct casereader *reader;
537   ds = datasheet_rename (ds);
538   reader = casereader_create_random (datasheet_get_column_cnt (ds),
539                                      datasheet_get_row_cnt (ds),
540                                      &datasheet_reader_class, ds);
541   taint_propagate (datasheet_get_taint (ds), casereader_get_taint (reader));
542   return reader;
543 }
544
545 /* "read" function for the datasheet random casereader. */
546 static struct ccase *
547 datasheet_reader_read (struct casereader *reader UNUSED, void *ds_,
548                        casenumber case_idx)
549 {
550   struct datasheet *ds = ds_;
551   if (case_idx < datasheet_get_row_cnt (ds))
552     {
553       struct ccase *c = datasheet_get_row (ds, case_idx);
554       if (c == NULL)
555         taint_set_taint (ds->taint);
556       return c;
557     }
558   else
559     return NULL;
560 }
561
562 /* "destroy" function for the datasheet random casereader. */
563 static void
564 datasheet_reader_destroy (struct casereader *reader UNUSED, void *ds_)
565 {
566   struct datasheet *ds = ds_;
567   datasheet_destroy (ds);
568 }
569
570 /* "advance" function for the datasheet random casereader. */
571 static void
572 datasheet_reader_advance (struct casereader *reader UNUSED, void *ds_,
573                           casenumber case_cnt)
574 {
575   struct datasheet *ds = ds_;
576   datasheet_delete_rows (ds, 0, case_cnt);
577 }
578
579 /* Random casereader class for a datasheet. */
580 static const struct casereader_random_class datasheet_reader_class =
581   {
582     datasheet_reader_read,
583     datasheet_reader_destroy,
584     datasheet_reader_advance,
585   };
586 \f
587 /* Removes SI from DS's set of sources and destroys its
588    source. */
589 static void
590 free_source_info (struct datasheet *ds, struct source_info *si)
591 {
592   range_map_delete (&ds->sources, &si->column_range);
593   source_destroy (si->source);
594   free (si);
595 }
596
597 static struct source_info *
598 source_info_from_range_map (struct range_map_node *node)
599 {
600   return range_map_data (node, struct source_info, column_range);
601 }
602
603 /* Reads (if OP is OP_READ) or writes (if op is OP_WRITE) the
604    COLUMN_CNT columns starting from column START_COLUMN in row
605    LROW to/from the COLUMN_CNT values in DATA. */
606 static bool
607 rw_case (struct datasheet *ds, enum rw_op op,
608          casenumber lrow, size_t start_column, size_t column_cnt,
609          union value data[])
610 {
611   casenumber prow;
612   size_t lcol;
613
614   assert (lrow < datasheet_get_row_cnt (ds));
615   assert (column_cnt <= datasheet_get_column_cnt (ds));
616   assert (start_column + column_cnt <= datasheet_get_column_cnt (ds));
617
618   prow = axis_map (ds->rows, lrow);
619   for (lcol = start_column; lcol < start_column + column_cnt; lcol++, data++)
620     {
621       size_t pcol = axis_map (ds->columns, lcol);
622       struct range_map_node *r = range_map_lookup (&ds->sources, pcol);
623       struct source_info *s = source_info_from_range_map (r);
624       size_t pcol_ofs = pcol - range_map_node_get_start (r);
625       if (!(op == OP_READ
626             ? source_read (s->source, prow, pcol_ofs, data, 1)
627             : source_write (s->source, prow, pcol_ofs, data, 1)))
628         {
629           taint_set_taint (ds->taint);
630           return false;
631         }
632     }
633   return true;
634 }
635 \f
636 /* An axis.
637
638    An axis has two functions.  First, it maintains a mapping from
639    logical (client-visible) to physical (storage) ordinates.  The
640    axis_map and axis_get_size functions inspect this mapping, and
641    the axis_insert, axis_remove, and axis_move functions modify
642    it.  Second, it tracks the set of ordinates that are unused
643    and available for reuse.  (Not all unused ordinates are
644    available for reuse: in particular, unused columns that are
645    backed by a casereader are never reused.)  The axis_allocate,
646    axis_make_available, and axis_extend functions affect the set
647    of available ordinates. */
648 struct axis
649 {
650   struct tower log_to_phy;     /* Map from logical to physical ordinates;
651                                   contains "struct axis_group"s. */
652   struct range_set *available; /* Set of unused, available ordinates. */
653   unsigned long int phy_size;  /* Current physical length of axis. */
654 };
655
656 /* A mapping from logical to physical ordinates. */
657 struct axis_group
658 {
659   struct tower_node logical;  /* Range of logical ordinates. */
660   unsigned long phy_start;    /* First corresponding physical ordinate. */
661 };
662
663 static struct axis_group *axis_group_from_tower_node (struct tower_node *);
664 static struct tower_node *make_axis_group (unsigned long int phy_start);
665 static struct tower_node *split_axis (struct axis *, unsigned long int where);
666 static void merge_axis_nodes (struct axis *, struct tower_node *,
667                               struct tower_node **other_node);
668 static void check_axis_merged (const struct axis *axis UNUSED);
669
670 /* Creates and returns a new, initially empty axis. */
671 static struct axis *
672 axis_create (void)
673 {
674   struct axis *axis = xmalloc (sizeof *axis);
675   tower_init (&axis->log_to_phy);
676   axis->available = range_set_create ();
677   axis->phy_size = 0;
678   return axis;
679 }
680
681 /* Returns a clone of existing axis OLD.
682
683    Currently this is used only by the datasheet model checker
684    driver, but it could be otherwise useful. */
685 static struct axis *
686 axis_clone (const struct axis *old)
687 {
688   const struct tower_node *node;
689   struct axis *new;
690
691   new = xmalloc (sizeof *new);
692   tower_init (&new->log_to_phy);
693   new->available = range_set_clone (old->available, NULL);
694   new->phy_size = old->phy_size;
695
696   for (node = tower_first (&old->log_to_phy); node != NULL;
697        node = tower_next (&old->log_to_phy, node))
698     {
699       unsigned long int size = tower_node_get_size (node);
700       struct axis_group *group = tower_data (node, struct axis_group, logical);
701       tower_insert (&new->log_to_phy, size, make_axis_group (group->phy_start),
702                     NULL);
703     }
704
705   return new;
706 }
707
708 /* Adds the state of AXIS to the MD4 hash context CTX.
709
710    This is only used by the datasheet model checker driver.  It
711    is unlikely to be otherwise useful. */
712 static void
713 axis_hash (const struct axis *axis, struct md4_ctx *ctx)
714 {
715   const struct tower_node *tn;
716   const struct range_set_node *rsn;
717
718   for (tn = tower_first (&axis->log_to_phy); tn != NULL;
719        tn = tower_next (&axis->log_to_phy, tn))
720     {
721       struct axis_group *group = tower_data (tn, struct axis_group, logical);
722       unsigned long int phy_start = group->phy_start;
723       unsigned long int size = tower_node_get_size (tn);
724
725       md4_process_bytes (&phy_start, sizeof phy_start, ctx);
726       md4_process_bytes (&size, sizeof size, ctx);
727     }
728
729   for (rsn = range_set_first (axis->available); rsn != NULL;
730        rsn = range_set_next (axis->available, rsn))
731     {
732       unsigned long int start = range_set_node_get_start (rsn);
733       unsigned long int end = range_set_node_get_end (rsn);
734
735       md4_process_bytes (&start, sizeof start, ctx);
736       md4_process_bytes (&end, sizeof end, ctx);
737     }
738
739   md4_process_bytes (&axis->phy_size, sizeof axis->phy_size, ctx);
740 }
741
742 /* Destroys AXIS. */
743 static void
744 axis_destroy (struct axis *axis)
745 {
746   if (axis == NULL)
747     return;
748
749   while (!tower_is_empty (&axis->log_to_phy))
750     {
751       struct tower_node *node = tower_first (&axis->log_to_phy);
752       struct axis_group *group = tower_data (node, struct axis_group,
753                                              logical);
754       tower_delete (&axis->log_to_phy, node);
755       free (group);
756     }
757
758   range_set_destroy (axis->available);
759   free (axis);
760 }
761
762 /* Allocates up to REQUEST contiguous unused and available
763    ordinates from AXIS.  If successful, stores the number
764    obtained into *WIDTH and the ordinate of the first into
765    *START, marks the ordinates as now unavailable return true.
766    On failure, which occurs only if AXIS has no unused and
767    available ordinates, returns false without modifying AXIS. */
768 static bool
769 axis_allocate (struct axis *axis, unsigned long int request,
770                unsigned long int *start, unsigned long int *width)
771 {
772   return range_set_allocate (axis->available, request, start, width);
773 }
774
775 /* Marks the WIDTH contiguous ordinates in AXIS, starting from
776    START, as unused and available. */
777 static void
778 axis_make_available (struct axis *axis,
779                      unsigned long int start, unsigned long int width)
780 {
781   range_set_insert (axis->available, start, width);
782 }
783
784 /* Extends the total physical length of AXIS by WIDTH and returns
785    the first ordinate in the new physical region. */
786 static unsigned long int
787 axis_extend (struct axis *axis, unsigned long int width)
788 {
789   unsigned long int start = axis->phy_size;
790   axis->phy_size += width;
791   return start;
792 }
793
794 /* Returns the physical ordinate in AXIS corresponding to logical
795    ordinate LOG_POS.  LOG_POS must be less than the logical
796    length of AXIS. */
797 static unsigned long int
798 axis_map (const struct axis *axis, unsigned long log_pos)
799 {
800   struct tower_node *node;
801   struct axis_group *group;
802   unsigned long int group_start;
803
804   node = tower_lookup (&axis->log_to_phy, log_pos, &group_start);
805   group = tower_data (node, struct axis_group, logical);
806   return group->phy_start + (log_pos - group_start);
807 }
808
809 /* Returns the logical length of AXIS. */
810 static unsigned long
811 axis_get_size (const struct axis *axis)
812 {
813   return tower_height (&axis->log_to_phy);
814 }
815
816 /* Inserts the CNT contiguous physical ordinates starting at
817    PHY_START into AXIS's logical-to-physical mapping, starting at
818    logical position LOG_START. */
819 static void
820 axis_insert (struct axis *axis,
821              unsigned long int log_start, unsigned long int phy_start,
822              unsigned long int cnt)
823 {
824   struct tower_node *before = split_axis (axis, log_start);
825   struct tower_node *new = make_axis_group (phy_start);
826   tower_insert (&axis->log_to_phy, cnt, new, before);
827   merge_axis_nodes (axis, new, NULL);
828   check_axis_merged (axis);
829 }
830
831 /* Removes CNT ordinates from AXIS's logical-to-physical mapping
832    starting at logical position START. */
833 static void
834 axis_remove (struct axis *axis,
835              unsigned long int start, unsigned long int cnt)
836 {
837   if (cnt > 0)
838     {
839       struct tower_node *last = split_axis (axis, start + cnt);
840       struct tower_node *cur, *next;
841       for (cur = split_axis (axis, start); cur != last; cur = next)
842         {
843           next = tower_delete (&axis->log_to_phy, cur);
844           free (axis_group_from_tower_node (cur));
845         }
846       merge_axis_nodes (axis, last, NULL);
847       check_axis_merged (axis);
848     }
849 }
850
851 /* Moves the CNT ordinates in AXIS's logical-to-mapping starting
852    at logical position OLD_START so that they then start at
853    position NEW_START. */
854 static void
855 axis_move (struct axis *axis,
856            unsigned long int old_start, unsigned long int new_start,
857            unsigned long int cnt)
858 {
859   if (cnt > 0 && old_start != new_start)
860     {
861       struct tower_node *old_first, *old_last, *new_first;
862       struct tower_node *merge1, *merge2;
863       struct tower tmp_array;
864
865       /* Move ordinates OLD_START...(OLD_START + CNT) into new,
866          separate TMP_ARRAY. */
867       old_first = split_axis (axis, old_start);
868       old_last = split_axis (axis, old_start + cnt);
869       tower_init (&tmp_array);
870       tower_splice (&tmp_array, NULL,
871                     &axis->log_to_phy, old_first, old_last);
872       merge_axis_nodes (axis, old_last, NULL);
873       check_axis_merged (axis);
874
875       /* Move TMP_ARRAY to position NEW_START. */
876       new_first = split_axis (axis, new_start);
877       merge1 = tower_first (&tmp_array);
878       merge2 = tower_last (&tmp_array);
879       if (merge2 == merge1)
880         merge2 = NULL;
881       tower_splice (&axis->log_to_phy, new_first, &tmp_array, old_first, NULL);
882       merge_axis_nodes (axis, merge1, &merge2);
883       merge_axis_nodes (axis, merge2, NULL);
884       check_axis_merged (axis);
885     }
886 }
887
888 /* Returns the axis_group in which NODE is embedded. */
889 static struct axis_group *
890 axis_group_from_tower_node (struct tower_node *node)
891 {
892   return tower_data (node, struct axis_group, logical);
893 }
894
895 /* Creates and returns a new axis_group at physical position
896    PHY_START. */
897 static struct tower_node *
898 make_axis_group (unsigned long phy_start)
899 {
900   struct axis_group *group = xmalloc (sizeof *group);
901   group->phy_start = phy_start;
902   return &group->logical;
903 }
904
905 /* Returns the tower_node in AXIS's logical-to-physical map whose
906    bottom edge is at exact level WHERE.  If there is no such
907    tower_node in AXIS's logical-to-physical map, then split_axis
908    creates one by breaking an existing tower_node into two
909    separate ones, unless WHERE is equal to the tower height, in
910    which case it simply returns a null pointer. */
911 static struct tower_node *
912 split_axis (struct axis *axis, unsigned long int where)
913 {
914   unsigned long int group_start;
915   struct tower_node *group_node;
916   struct axis_group *group;
917
918   assert (where <= tower_height (&axis->log_to_phy));
919   if (where >= tower_height (&axis->log_to_phy))
920     return NULL;
921
922   group_node = tower_lookup (&axis->log_to_phy, where, &group_start);
923   group = axis_group_from_tower_node (group_node);
924   if (where > group_start)
925     {
926       unsigned long int size_1 = where - group_start;
927       unsigned long int size_2 = tower_node_get_size (group_node) - size_1;
928       struct tower_node *next = tower_next (&axis->log_to_phy, group_node);
929       struct tower_node *new = make_axis_group (group->phy_start + size_1);
930       tower_resize (&axis->log_to_phy, group_node, size_1);
931       tower_insert (&axis->log_to_phy, size_2, new, next);
932       return new;
933     }
934   else
935     return &group->logical;
936 }
937
938 /* Within AXIS, attempts to merge NODE (or the last node in AXIS,
939    if NODE is null) with its neighbor nodes.  This is possible
940    when logically adjacent nodes are also adjacent physically (in
941    the same order).
942
943    When a merge occurs, and OTHER_NODE is non-null and points to
944    the node to be deleted, this function also updates
945    *OTHER_NODE, if necessary, to ensure that it remains a valid
946    pointer. */
947 static void
948 merge_axis_nodes (struct axis *axis, struct tower_node *node,
949                   struct tower_node **other_node)
950 {
951   struct tower *t = &axis->log_to_phy;
952   struct axis_group *group;
953   struct tower_node *next, *prev;
954
955   /* Find node to potentially merge with neighbors. */
956   if (node == NULL)
957     node = tower_last (t);
958   if (node == NULL)
959     return;
960   group = axis_group_from_tower_node (node);
961
962   /* Try to merge NODE with successor. */
963   next = tower_next (t, node);
964   if (next != NULL)
965     {
966       struct axis_group *next_group = axis_group_from_tower_node (next);
967       unsigned long this_height = tower_node_get_size (node);
968
969       if (group->phy_start + this_height == next_group->phy_start)
970         {
971           unsigned long next_height = tower_node_get_size (next);
972           tower_resize (t, node, this_height + next_height);
973           if (other_node != NULL && *other_node == next)
974             *other_node = tower_next (t, *other_node);
975           tower_delete (t, next);
976           free (next_group);
977         }
978     }
979
980   /* Try to merge NODE with predecessor. */
981   prev = tower_prev (t, node);
982   if (prev != NULL)
983     {
984       struct axis_group *prev_group = axis_group_from_tower_node (prev);
985       unsigned long prev_height = tower_node_get_size (prev);
986
987       if (prev_group->phy_start + prev_height == group->phy_start)
988         {
989           unsigned long this_height = tower_node_get_size (node);
990           group->phy_start = prev_group->phy_start;
991           tower_resize (t, node, this_height + prev_height);
992           if (other_node != NULL && *other_node == prev)
993             *other_node = tower_next (t, *other_node);
994           tower_delete (t, prev);
995           free (prev_group);
996         }
997     }
998 }
999
1000 /* Verify that all potentially merge-able nodes in AXIS are
1001    actually merged. */
1002 static void
1003 check_axis_merged (const struct axis *axis UNUSED)
1004 {
1005 #if ASSERT_LEVEL >= 10
1006   struct tower_node *prev, *node;
1007
1008   for (prev = NULL, node = tower_first (&axis->log_to_phy); node != NULL;
1009        prev = node, node = tower_next (&axis->log_to_phy, node))
1010     if (prev != NULL)
1011       {
1012         struct axis_group *prev_group = axis_group_from_tower_node (prev);
1013         unsigned long prev_height = tower_node_get_size (prev);
1014         struct axis_group *node_group = axis_group_from_tower_node (node);
1015         assert (prev_group->phy_start + prev_height != node_group->phy_start);
1016       }
1017 #endif
1018 }
1019 \f
1020 /* A source. */
1021 struct source
1022 {
1023   size_t columns_used;        /* Number of columns in use by client. */
1024   struct sparse_cases *data;  /* Data at top level, atop the backing. */
1025   struct casereader *backing; /* Backing casereader (or null). */
1026   casenumber backing_rows;    /* Number of rows in backing (if nonnull). */
1027 };
1028
1029 /* Creates and returns an empty, unbacked source with COLUMN_CNT
1030    columns and an initial "columns_used" of 0. */
1031 static struct source *
1032 source_create_empty (size_t column_cnt)
1033 {
1034   struct source *source = xmalloc (sizeof *source);
1035   source->columns_used = 0;
1036   source->data = sparse_cases_create (column_cnt);
1037   source->backing = NULL;
1038   source->backing_rows = 0;
1039   return source;
1040 }
1041
1042 /* Creates and returns a new source backed by READER and with the
1043    same initial dimensions and content. */
1044 static struct source *
1045 source_create_casereader (struct casereader *reader)
1046 {
1047   struct source *source
1048     = source_create_empty (casereader_get_value_cnt (reader));
1049   source->backing = reader;
1050   source->backing_rows = casereader_count_cases (reader);
1051   return source;
1052 }
1053
1054 /* Returns a clone of source OLD with the same data and backing
1055    (if any).
1056
1057    Currently this is used only by the datasheet model checker
1058    driver, but it could be otherwise useful. */
1059 static struct source *
1060 source_clone (const struct source *old)
1061 {
1062   struct source *new = xmalloc (sizeof *new);
1063   new->columns_used = old->columns_used;
1064   new->data = sparse_cases_clone (old->data);
1065   new->backing = old->backing != NULL ? casereader_clone (old->backing) : NULL;
1066   new->backing_rows = old->backing_rows;
1067   if (new->data == NULL)
1068     {
1069       source_destroy (new);
1070       new = NULL;
1071     }
1072   return new;
1073 }
1074
1075 /* Increases the columns_used count of SOURCE by DELTA.
1076    The new value must not exceed SOURCE's number of columns. */
1077 static void
1078 source_increase_use (struct source *source, size_t delta)
1079 {
1080   source->columns_used += delta;
1081   assert (source->columns_used <= sparse_cases_get_value_cnt (source->data));
1082 }
1083
1084 /* Decreases the columns_used count of SOURCE by DELTA.
1085    This must not attempt to decrease the columns_used count below
1086    zero. */
1087 static void
1088 source_decrease_use (struct source *source, size_t delta)
1089 {
1090   assert (delta <= source->columns_used);
1091   source->columns_used -= delta;
1092 }
1093
1094 /* Returns true if SOURCE has any columns in use,
1095    false otherwise. */
1096 static bool
1097 source_in_use (const struct source *source)
1098 {
1099   return source->columns_used > 0;
1100 }
1101
1102 /* Destroys SOURCE and its data and backing, if any. */
1103 static void
1104 source_destroy (struct source *source)
1105 {
1106   if (source != NULL)
1107     {
1108       sparse_cases_destroy (source->data);
1109       casereader_destroy (source->backing);
1110       free (source);
1111     }
1112 }
1113
1114 /* Returns the number of rows in SOURCE's backing casereader
1115    (SOURCE must have a backing casereader). */
1116 static casenumber
1117 source_get_backing_row_cnt (const struct source *source)
1118 {
1119   assert (source_has_backing (source));
1120   return source->backing_rows;
1121 }
1122
1123 /* Returns the number of columns in SOURCE. */
1124 static size_t
1125 source_get_column_cnt (const struct source *source)
1126 {
1127   return sparse_cases_get_value_cnt (source->data);
1128 }
1129
1130 /* Reads VALUE_CNT columns from SOURCE in the given ROW, starting
1131    from COLUMN, into VALUES.  Returns true if successful, false
1132    on I/O error. */
1133 static bool
1134 source_read (const struct source *source,
1135              casenumber row, size_t column,
1136              union value values[], size_t value_cnt)
1137 {
1138   if (source->backing == NULL || sparse_cases_contains_row (source->data, row))
1139     return sparse_cases_read (source->data, row, column, values, value_cnt);
1140   else
1141     {
1142       struct ccase *c = casereader_peek (source->backing, row);
1143       bool ok = c != NULL;
1144       if (ok)
1145         {
1146           case_copy_out (c, column, values, value_cnt);
1147           case_unref (c);
1148         }
1149       return ok;
1150     }
1151 }
1152
1153 /* Writes the VALUE_CNT values in VALUES to SOURCE in the given
1154    ROW, starting at ROW.  Returns true if successful, false on
1155    I/O error.  On error, the row's data may be completely or
1156    partially corrupted, both inside and outside the region to be
1157    written.  */
1158 static bool
1159 source_write (struct source *source,
1160               casenumber row, size_t column,
1161               const union value values[], size_t value_cnt)
1162 {
1163   size_t column_cnt = sparse_cases_get_value_cnt (source->data);
1164   bool ok;
1165
1166   if (source->backing == NULL
1167       || (column == 0 && value_cnt == column_cnt)
1168       || sparse_cases_contains_row (source->data, row))
1169     ok = sparse_cases_write (source->data, row, column, values, value_cnt);
1170   else
1171     {
1172       struct ccase *c;
1173
1174       if (row < source->backing_rows)
1175         c = case_unshare (casereader_peek (source->backing, row));
1176       else
1177         {
1178           /* It's not one of the backed rows.  Ideally, this
1179              should never happen: we'd always be writing the full
1180              contents of new, unbacked rows in a single call to
1181              this function, so that the first case above would
1182              trigger.  But that's a little difficult at higher
1183              levels, so that we in fact usually write the full
1184              contents of new, unbacked rows in multiple calls to
1185              this function.  Make this work. */
1186           c = case_create (column_cnt);
1187         }
1188       ok = c != NULL;
1189
1190       if (ok)
1191         {
1192           case_copy_in (c, column, values, value_cnt);
1193           ok = sparse_cases_write (source->data, row, 0,
1194                                    case_data_all (c), column_cnt);
1195           case_unref (c);
1196         }
1197     }
1198   return ok;
1199 }
1200
1201 /* Within SOURCE, which must not have a backing casereader,
1202    writes the VALUE_CNT values in VALUES_CNT to the VALUE_CNT
1203    columns starting from START_COLUMN, in every row, even in rows
1204    not yet otherwise initialized.  Returns true if successful,
1205    false if an I/O error occurs.
1206
1207    We don't support backing != NULL because (1) it's harder and
1208    (2) source_write_columns is only called by
1209    datasheet_insert_columns, which doesn't reuse columns from
1210    sources that are backed by casereaders. */
1211 static bool
1212 source_write_columns (struct source *source, size_t start_column,
1213                       const union value values[], size_t value_cnt)
1214 {
1215   assert (source->backing == NULL);
1216
1217   return sparse_cases_write_columns (source->data, start_column,
1218                                      values, value_cnt);
1219 }
1220
1221 /* Returns true if SOURCE has a backing casereader, false
1222    otherwise. */
1223 static bool
1224 source_has_backing (const struct source *source)
1225 {
1226   return source->backing != NULL;
1227 }
1228 \f
1229 /* Datasheet model checker test driver. */
1230
1231 /* Maximum size of datasheet supported for model checking
1232    purposes. */
1233 #define MAX_ROWS 5
1234 #define MAX_COLS 5
1235
1236
1237 /* Clones the structure and contents of ODS into a new datasheet,
1238    and returns the new datasheet. */
1239 struct datasheet *
1240 clone_datasheet (const struct datasheet *ods)
1241 {
1242   struct datasheet *ds;
1243   struct range_map_node *r;
1244
1245   ds = xmalloc (sizeof *ds);
1246   ds->columns = axis_clone (ods->columns);
1247   ds->rows = axis_clone (ods->rows);
1248   range_map_init (&ds->sources);
1249   for (r = range_map_first (&ods->sources); r != NULL;
1250        r = range_map_next (&ods->sources, r))
1251     {
1252       const struct source_info *osi = source_info_from_range_map (r);
1253       struct source_info *si = xmalloc (sizeof *si);
1254       si->source = source_clone (osi->source);
1255       range_map_insert (&ds->sources, range_map_node_get_start (r),
1256                         range_map_node_get_width (r), &si->column_range);
1257     }
1258   ds->column_min_alloc = ods->column_min_alloc;
1259   ds->taint = taint_create ();
1260
1261   return ds;
1262 }
1263
1264
1265 /* Hashes the structure of datasheet DS and returns the hash.
1266    We use MD4 because it is much faster than MD5 or SHA-1 but its
1267    collision resistance is just as good. */
1268 unsigned int
1269 hash_datasheet (const struct datasheet *ds)
1270 {
1271   unsigned int hash[DIV_RND_UP (20, sizeof (unsigned int))];
1272   struct md4_ctx ctx;
1273   struct range_map_node *r;
1274
1275   md4_init_ctx (&ctx);
1276   axis_hash (ds->columns, &ctx);
1277   axis_hash (ds->rows, &ctx);
1278   for (r = range_map_first (&ds->sources); r != NULL;
1279        r = range_map_next (&ds->sources, r))
1280     {
1281       unsigned long int start = range_map_node_get_start (r);
1282       unsigned long int end = range_map_node_get_end (r);
1283       md4_process_bytes (&start, sizeof start, &ctx);
1284       md4_process_bytes (&end, sizeof end, &ctx);
1285     }
1286   md4_process_bytes (&ds->column_min_alloc, sizeof ds->column_min_alloc,
1287                      &ctx);
1288   md4_finish_ctx (&ctx, hash);
1289   return hash[0];
1290 }
1291