63db4f36d38356293cd93d60b8e40db7408f3519
[pspp-builds.git] / src / data / datasheet.c
1 /* PSPP - computes sample statistics.
2    Copyright (C) 2007 Free Software Foundation, Inc.
3
4    This program is free software; you can redistribute it and/or
5    modify it under the terms of the GNU General Public License as
6    published by the Free Software Foundation; either version 2 of the
7    License, or (at your option) any later version.
8
9    This program is distributed in the hope that it will be useful, but
10    WITHOUT ANY WARRANTY; without even the implied warranty of
11    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12    General Public License for more details.
13
14    You should have received a copy of the GNU General Public License
15    along with this program; if not, write to the Free Software
16    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
17    02110-1301, USA. */
18
19 #include <config.h>
20
21 #include <data/datasheet.h>
22
23 #include <stdlib.h>
24 #include <string.h>
25
26 #include <data/casereader-provider.h>
27 #include <data/casereader.h>
28 #include <data/casewriter.h>
29 #include <data/sparse-cases.h>
30 #include <libpspp/array.h>
31 #include <libpspp/assertion.h>
32 #include <libpspp/model-checker.h>
33 #include <libpspp/range-map.h>
34 #include <libpspp/range-set.h>
35 #include <libpspp/taint.h>
36 #include <libpspp/tower.h>
37
38 #include "minmax.h"
39 #include "md4.h"
40 #include "xalloc.h"
41
42 static struct axis *axis_create (void);
43 static struct axis *axis_clone (const struct axis *);
44 static void axis_destroy (struct axis *);
45
46 static void axis_hash (const struct axis *, struct md4_ctx *);
47
48 static bool axis_allocate (struct axis *, unsigned long int request,
49                            unsigned long int *start,
50                            unsigned long int *width);
51 static void axis_make_available (struct axis *,
52                                 unsigned long int start,
53                                 unsigned long int width);
54 static unsigned long int axis_extend (struct axis *, unsigned long int width);
55
56 static unsigned long int axis_map (const struct axis *, unsigned long log_pos);
57
58 static unsigned long axis_get_size (const struct axis *);
59 static void axis_insert (struct axis *,
60                          unsigned long int log_start,
61                          unsigned long int phy_start,
62                          unsigned long int cnt);
63 static void axis_remove (struct axis *,
64                          unsigned long int start, unsigned long int cnt);
65 static void axis_move (struct axis *,
66                        unsigned long int old_start,
67                        unsigned long int new_start,
68                        unsigned long int cnt);
69
70 static struct source *source_create_empty (size_t column_cnt);
71 static struct source *source_create_casereader (struct casereader *);
72 static struct source *source_clone (const struct source *);
73 static void source_destroy (struct source *);
74
75 static casenumber source_get_backing_row_cnt (const struct source *);
76 static size_t source_get_column_cnt (const struct source *);
77
78 static bool source_read (const struct source *,
79                          casenumber row, size_t column,
80                          union value[], size_t value_cnt);
81 static bool source_write (struct source *,
82                           casenumber row, size_t column,
83                           const union value[], size_t value_cnt);
84 static bool source_write_columns (struct source *, size_t start_column,
85                                   const union value[], size_t value_cnt);
86 static bool source_has_backing (const struct source *);
87 static void source_increase_use (struct source *, size_t delta);
88 static void source_decrease_use (struct source *, size_t delta);
89 static bool source_in_use (const struct source *);
90
91 /* A datasheet is internally composed from a set of data files,
92    called "sources".  The sources that make up a datasheet must
93    have the same number of rows (cases), but their numbers of
94    columns (variables) may vary.
95
96    A datasheet's external view is produced by mapping (permuting
97    and selecting) its internal data.  Thus, we can rearrange or
98    delete rows or columns simply by modifying the mapping.  We
99    add rows by adding rows to each source and to the row mapping.
100    We add columns by adding a new source, then adding that source
101    to the column mapping.
102
103    Each source in a datasheet can be a casereader or a
104    sparse_cases.  Casereaders are read-only, so when sources made
105    from casereaders need to be modified, it is done "virtually"
106    through being overlaid by a sparse_cases. */
107
108 /* A datasheet. */
109 struct datasheet
110   {
111     /* Mappings from logical to physical columns/rows. */
112     struct axis *columns;
113     struct axis *rows;
114
115     /* Mapping from physical columns to "source_info"s. */
116     struct range_map sources;
117
118     /* Minimum number of columns to put in a new source when we
119        need new columns and none are free.  We double it whenever
120        we add a new source to keep the number of file descriptors
121        needed by the datasheet to a minimum, reducing the
122        likelihood of running out. */
123     unsigned column_min_alloc;
124
125     /* Indicates corrupted data in the datasheet. */
126     struct taint *taint;
127   };
128
129 /* Maps from a range of physical columns to a source. */
130 struct source_info
131   {
132     struct range_map_node column_range;
133     struct source *source;
134   };
135
136 /* Is this operation a read or a write? */
137 enum rw_op
138   {
139     OP_READ,
140     OP_WRITE
141   };
142
143 static void free_source_info (struct datasheet *, struct source_info *);
144 static struct source_info *source_info_from_range_map (
145   struct range_map_node *);
146 static bool rw_case (struct datasheet *ds, enum rw_op op,
147                      casenumber lrow, size_t start_column, size_t column_cnt,
148                      union value data[]);
149
150 /* Creates and returns a new datasheet.
151
152    If READER is nonnull, then the datasheet initially contains
153    the contents of READER.  READER become owned by the datasheet
154    and the caller must not directly reference it again. */
155 struct datasheet *
156 datasheet_create (struct casereader *reader)
157 {
158   /* Create datasheet. */
159   struct datasheet *ds = xmalloc (sizeof *ds);
160   ds->columns = axis_create ();
161   ds->rows = axis_create ();
162   range_map_init (&ds->sources);
163   ds->column_min_alloc = 1;
164   ds->taint = taint_create ();
165
166   /* Add backing. */
167   if (reader != NULL)
168     {
169       size_t column_cnt;
170       casenumber row_cnt;
171       struct source_info *si;
172
173       si = xmalloc (sizeof *si);
174       si->source = source_create_casereader (reader);
175       column_cnt = source_get_column_cnt (si->source);
176       row_cnt = source_get_backing_row_cnt (si->source);
177       source_increase_use (si->source, column_cnt);
178
179       if ( column_cnt > 0 )
180         {
181           unsigned long int column_start;
182       column_start = axis_extend (ds->columns, column_cnt);
183       axis_insert (ds->columns, 0, column_start, column_cnt);
184       range_map_insert (&ds->sources, column_start, column_cnt,
185                         &si->column_range);
186         }
187
188       if ( row_cnt > 0 )
189         {
190           unsigned long int row_start;
191       row_start = axis_extend (ds->rows, row_cnt);
192       axis_insert (ds->rows, 0, row_start, row_cnt);
193         }
194     }
195
196   return ds;
197 }
198
199 /* Destroys datasheet DS. */
200 void
201 datasheet_destroy (struct datasheet *ds)
202 {
203   if (ds == NULL)
204     return;
205
206   axis_destroy (ds->columns);
207   axis_destroy (ds->rows);
208   while (!range_map_is_empty (&ds->sources))
209     {
210       struct range_map_node *r = range_map_first (&ds->sources);
211       struct source_info *si = source_info_from_range_map (r);
212       free_source_info (ds, si);
213     }
214   taint_destroy (ds->taint);
215   free (ds);
216 }
217
218 /* Moves datasheet DS to a new location in memory, and returns
219    the new location.  Afterward, the datasheet must not be
220    accessed at its former location.
221
222    This function is useful for ensuring that all references to a
223    datasheet have been dropped, especially in conjunction with
224    tools like Valgrind. */
225 struct datasheet *
226 datasheet_rename (struct datasheet *ds)
227 {
228   struct datasheet *new = xmemdup (ds, sizeof *ds);
229   free (ds);
230   return new;
231 }
232
233 /* Returns true if datasheet DS is tainted.
234    A datasheet is tainted by an I/O error or by taint
235    propagation to the datasheet. */
236 bool
237 datasheet_error (const struct datasheet *ds)
238 {
239   return taint_is_tainted (ds->taint);
240 }
241
242 /* Marks datasheet DS tainted. */
243 void
244 datasheet_force_error (struct datasheet *ds)
245 {
246   taint_set_taint (ds->taint);
247 }
248
249 /* Returns datasheet DS's taint object. */
250 const struct taint *
251 datasheet_get_taint (const struct datasheet *ds)
252 {
253   return ds->taint;
254 }
255
256 /* Returns the number of rows in DS. */
257 casenumber
258 datasheet_get_row_cnt (const struct datasheet *ds)
259 {
260   return axis_get_size (ds->rows);
261 }
262
263 /* Returns the number of columns in DS. */
264 size_t
265 datasheet_get_column_cnt (const struct datasheet *ds)
266 {
267   return axis_get_size (ds->columns);
268 }
269
270 /* Inserts CNT columns into datasheet DS just before column
271    BEFORE.  Initializes the contents of each row in the inserted
272    columns to the CNT values in INIT_VALUES.
273
274    Returns true if successful, false on failure.  In case of
275    failure, the datasheet is unchanged. */
276 bool
277 datasheet_insert_columns (struct datasheet *ds,
278                           const union value init_values[], size_t cnt,
279                           size_t before)
280 {
281   size_t added = 0;
282   while (cnt > 0)
283     {
284       unsigned long first_phy; /* First allocated physical column. */
285       unsigned long phy_cnt;   /* Number of allocated physical columns. */
286
287       /* Allocate physical columns from the pool of available
288          columns. */
289       if (!axis_allocate (ds->columns, cnt, &first_phy, &phy_cnt))
290         {
291           /* No columns were available.  Create a new source and
292              extend the axis to make some new ones available. */
293           struct source_info *si;
294
295           phy_cnt = MAX (cnt, ds->column_min_alloc);
296           first_phy = axis_extend (ds->columns, phy_cnt);
297           ds->column_min_alloc = MIN (65536, ds->column_min_alloc * 2);
298
299           si = xmalloc (sizeof *si);
300           si->source = source_create_empty (phy_cnt);
301           range_map_insert (&ds->sources, first_phy, phy_cnt,
302                             &si->column_range);
303           if (phy_cnt > cnt)
304             {
305               axis_make_available (ds->columns, first_phy + cnt,
306                                    phy_cnt - cnt);
307               phy_cnt = cnt;
308             }
309         }
310
311       /* Initialize the columns and insert them into the columns
312          axis. */
313       while (phy_cnt > 0)
314         {
315           struct range_map_node *r; /* Range map holding FIRST_PHY column. */
316           struct source_info *s;    /* Source containing FIRST_PHY column. */
317           size_t source_avail;      /* Number of phys columns available. */
318           size_t source_cnt;        /* Number of phys columns to use. */
319
320           /* Figure out how many columns we can and want to take
321              starting at FIRST_PHY, and then insert them into the
322              columns axis. */
323           r = range_map_lookup (&ds->sources, first_phy);
324           s = source_info_from_range_map (r);
325           source_avail = range_map_node_get_end (r) - first_phy;
326           source_cnt = MIN (phy_cnt, source_avail);
327           axis_insert (ds->columns, before, first_phy, source_cnt);
328
329           /* Initialize the data for those columns in the
330              source. */
331           if (!source_write_columns (s->source,
332                                      first_phy - range_map_node_get_start (r),
333                                      init_values, source_cnt))
334             {
335               datasheet_delete_columns (ds, before - added,
336                                         source_cnt + added);
337               taint_set_taint (ds->taint);
338               return false;
339             }
340           source_increase_use (s->source, source_cnt);
341
342           /* Advance. */
343           phy_cnt -= source_cnt;
344           first_phy += source_cnt;
345           init_values += source_cnt;
346           cnt -= source_cnt;
347           before += source_cnt;
348           added += source_cnt;
349         }
350     }
351   return true;
352 }
353
354 /* Deletes the CNT columns in DS starting from column START. */
355 void
356 datasheet_delete_columns (struct datasheet *ds, size_t start, size_t cnt)
357 {
358   size_t lcol;
359
360   /* Free up columns for reuse. */
361   for (lcol = start; lcol < start + cnt; lcol++)
362     {
363       size_t pcol = axis_map (ds->columns, lcol);
364       struct range_map_node *r = range_map_lookup (&ds->sources, pcol);
365       struct source_info *si = source_info_from_range_map (r);
366
367       source_decrease_use (si->source, 1);
368       if (source_has_backing (si->source))
369         {
370           if (!source_in_use (si->source))
371             free_source_info (ds, si);
372         }
373       else
374         axis_make_available (ds->columns, pcol, 1);
375     }
376
377   /* Remove columns from logical-to-physical mapping. */
378   axis_remove (ds->columns, start, cnt);
379 }
380
381 /* Moves the CNT columns in DS starting at position OLD_START so
382    that they then start at position NEW_START.  Equivalent to
383    deleting the column rows, then inserting them at what becomes
384    position NEW_START after the deletion.*/
385 void
386 datasheet_move_columns (struct datasheet *ds,
387                         size_t old_start, size_t new_start,
388                         size_t cnt)
389 {
390   axis_move (ds->columns, old_start, new_start, cnt);
391 }
392
393 /* Retrieves the contents of the given ROW in datasheet DS into
394    newly created case C.  Returns true if successful, false on
395    I/O error. */
396 bool
397 datasheet_get_row (const struct datasheet *ds, casenumber row, struct ccase *c)
398 {
399   size_t column_cnt = datasheet_get_column_cnt (ds);
400   case_create (c, column_cnt);
401   if (rw_case ((struct datasheet *) ds, OP_READ,
402                row, 0, column_cnt, case_data_all_rw (c)))
403     return true;
404   else
405     {
406       case_destroy (c);
407       return false;
408     }
409 }
410
411 /* Stores the contents of case C, which is destroyed, into the
412    given ROW in DS.  Returns true on success, false on I/O error.
413    On failure, the given ROW might be partially modified or
414    corrupted. */
415 bool
416 datasheet_put_row (struct datasheet *ds, casenumber row, struct ccase *c)
417 {
418   size_t column_cnt = datasheet_get_column_cnt (ds);
419   bool ok = rw_case (ds, OP_WRITE, row, 0, column_cnt,
420                      (union value *) case_data_all (c));
421   case_destroy (c);
422   return ok;
423 }
424
425 /* Stores the values of the WIDTH columns in DS in the given ROW
426    starting at COLUMN in DS into VALUES.  Returns true if
427    successful, false on I/O error. */
428 bool
429 datasheet_get_value (const struct datasheet *ds, casenumber row, size_t column,
430                      union value *value, int width)
431 {
432   return rw_case ((struct datasheet *) ds,
433                   OP_READ, row, column, value_cnt_from_width (width), value);
434 }
435
436 /* Stores the WIDTH given VALUES into the given ROW in DS
437    starting at COLUMN.  Returns true if successful, false on I/O
438    error.  On failure, the given ROW might be partially modified
439    or corrupted. */
440 bool
441 datasheet_put_value (struct datasheet *ds, casenumber row, size_t column,
442                      const union value *value, int width)
443 {
444   return rw_case (ds, OP_WRITE, row, column, value_cnt_from_width (width),
445                   (union value *) value);
446 }
447
448 /* Inserts the CNT cases at C, which are destroyed, into
449    datasheet DS just before row BEFORE.  Returns true if
450    successful, false on I/O error.  On failure, datasheet DS is
451    not modified. */
452 bool
453 datasheet_insert_rows (struct datasheet *ds,
454                        casenumber before, struct ccase c[],
455                        casenumber cnt)
456 {
457   casenumber added = 0;
458   while (cnt > 0)
459     {
460       unsigned long first_phy;
461       unsigned long phy_cnt;
462       unsigned long i;
463
464       /* Allocate physical rows from the pool of available
465          rows. */
466       if (!axis_allocate (ds->rows, cnt, &first_phy, &phy_cnt))
467         {
468           /* No rows were available.  Extend the row axis to make
469              some new ones available. */
470           phy_cnt = cnt;
471           first_phy = axis_extend (ds->rows, cnt);
472         }
473
474       /* Insert the new rows into the row mapping. */
475       axis_insert (ds->rows, before, first_phy, phy_cnt);
476
477       /* Initialize the new rows. */
478       for (i = 0; i < phy_cnt; i++)
479         if (!datasheet_put_row (ds, before + i, &c[i]))
480           {
481             while (++i < cnt)
482               case_destroy (&c[i]);
483             datasheet_delete_rows (ds, before - added, phy_cnt + added);
484             return false;
485           }
486
487       /* Advance. */
488       c += phy_cnt;
489       cnt -= phy_cnt;
490       before += phy_cnt;
491       added += phy_cnt;
492     }
493   return true;
494 }
495
496 /* Deletes the CNT rows in DS starting from row FIRST. */
497 void
498 datasheet_delete_rows (struct datasheet *ds,
499                        casenumber first, casenumber cnt)
500 {
501   size_t lrow;
502
503   /* Free up rows for reuse.
504      FIXME: optimize. */
505   for (lrow = first; lrow < first + cnt; lrow++)
506     axis_make_available (ds->rows, axis_map (ds->rows, lrow), 1);
507
508   /* Remove rows from logical-to-physical mapping. */
509   axis_remove (ds->rows, first, cnt);
510 }
511
512 /* Moves the CNT rows in DS starting at position OLD_START so
513    that they then start at position NEW_START.  Equivalent to
514    deleting the given rows, then inserting them at what becomes
515    position NEW_START after the deletion. */
516 void
517 datasheet_move_rows (struct datasheet *ds,
518                      size_t old_start, size_t new_start,
519                      size_t cnt)
520 {
521   axis_move (ds->rows, old_start, new_start, cnt);
522 }
523 \f
524 static const struct casereader_random_class datasheet_reader_class;
525
526 /* Creates and returns a casereader whose input cases are the
527    rows in datasheet DS.  From the caller's perspective, DS is
528    effectively destroyed by this operation, such that the caller
529    must not reference it again. */
530 struct casereader *
531 datasheet_make_reader (struct datasheet *ds)
532 {
533   struct casereader *reader;
534   ds = datasheet_rename (ds);
535   reader = casereader_create_random (datasheet_get_column_cnt (ds),
536                                      datasheet_get_row_cnt (ds),
537                                      &datasheet_reader_class, ds);
538   taint_propagate (datasheet_get_taint (ds), casereader_get_taint (reader));
539   return reader;
540 }
541
542 /* "read" function for the datasheet random casereader. */
543 static bool
544 datasheet_reader_read (struct casereader *reader UNUSED, void *ds_,
545                        casenumber case_idx, struct ccase *c)
546 {
547   struct datasheet *ds = ds_;
548   if (case_idx >= datasheet_get_row_cnt (ds))
549     return false;
550   else if (datasheet_get_row (ds, case_idx, c))
551     return true;
552   else
553     {
554       taint_set_taint (ds->taint);
555       return false;
556     }
557 }
558
559 /* "destroy" function for the datasheet random casereader. */
560 static void
561 datasheet_reader_destroy (struct casereader *reader UNUSED, void *ds_)
562 {
563   struct datasheet *ds = ds_;
564   datasheet_destroy (ds);
565 }
566
567 /* "advance" function for the datasheet random casereader. */
568 static void
569 datasheet_reader_advance (struct casereader *reader UNUSED, void *ds_,
570                           casenumber case_cnt)
571 {
572   struct datasheet *ds = ds_;
573   datasheet_delete_rows (ds, 0, case_cnt);
574 }
575
576 /* Random casereader class for a datasheet. */
577 static const struct casereader_random_class datasheet_reader_class =
578   {
579     datasheet_reader_read,
580     datasheet_reader_destroy,
581     datasheet_reader_advance,
582   };
583 \f
584 /* Removes SI from DS's set of sources and destroys its
585    source. */
586 static void
587 free_source_info (struct datasheet *ds, struct source_info *si)
588 {
589   range_map_delete (&ds->sources, &si->column_range);
590   source_destroy (si->source);
591   free (si);
592 }
593
594 static struct source_info *
595 source_info_from_range_map (struct range_map_node *node)
596 {
597   return range_map_data (node, struct source_info, column_range);
598 }
599
600 /* Reads (if OP is OP_READ) or writes (if op is OP_WRITE) the
601    COLUMN_CNT columns starting from column START_COLUMN in row
602    LROW to/from the COLUMN_CNT values in DATA. */
603 static bool
604 rw_case (struct datasheet *ds, enum rw_op op,
605          casenumber lrow, size_t start_column, size_t column_cnt,
606          union value data[])
607 {
608   casenumber prow;
609   size_t lcol;
610
611   assert (lrow < datasheet_get_row_cnt (ds));
612   assert (column_cnt <= datasheet_get_column_cnt (ds));
613   assert (start_column + column_cnt <= datasheet_get_column_cnt (ds));
614
615   prow = axis_map (ds->rows, lrow);
616   for (lcol = start_column; lcol < start_column + column_cnt; lcol++, data++)
617     {
618       size_t pcol = axis_map (ds->columns, lcol);
619       struct range_map_node *r = range_map_lookup (&ds->sources, pcol);
620       struct source_info *s = source_info_from_range_map (r);
621       size_t pcol_ofs = pcol - range_map_node_get_start (r);
622       if (!(op == OP_READ
623             ? source_read (s->source, prow, pcol_ofs, data, 1)
624             : source_write (s->source, prow, pcol_ofs, data, 1)))
625         {
626           taint_set_taint (ds->taint);
627           return false;
628         }
629     }
630   return true;
631 }
632 \f
633 /* An axis.
634
635    An axis has two functions.  First, it maintains a mapping from
636    logical (client-visible) to physical (storage) ordinates.  The
637    axis_map and axis_get_size functions inspect this mapping, and
638    the axis_insert, axis_remove, and axis_move functions modify
639    it.  Second, it tracks the set of ordinates that are unused
640    and available for reuse.  (Not all unused ordinates are
641    available for reuse: in particular, unused columns that are
642    backed by a casereader are never reused.)  The axis_allocate,
643    axis_make_available, and axis_extend functions affect the set
644    of available ordinates. */
645 struct axis
646   {
647     struct tower log_to_phy;     /* Map from logical to physical ordinates;
648                                     contains "struct axis_group"s. */
649     struct range_set *available; /* Set of unused, available ordinates. */
650     unsigned long int phy_size;  /* Current physical length of axis. */
651   };
652
653 /* A mapping from logical to physical ordinates. */
654 struct axis_group
655   {
656     struct tower_node logical;  /* Range of logical ordinates. */
657     unsigned long phy_start;    /* First corresponding physical ordinate. */
658   };
659
660 static struct axis_group *axis_group_from_tower_node (struct tower_node *);
661 static struct tower_node *make_axis_group (unsigned long int phy_start);
662 static struct tower_node *split_axis (struct axis *, unsigned long int where);
663 static void merge_axis_nodes (struct axis *, struct tower_node *,
664                               struct tower_node **other_node);
665 static void check_axis_merged (const struct axis *axis UNUSED);
666
667 /* Creates and returns a new, initially empty axis. */
668 static struct axis *
669 axis_create (void)
670 {
671   struct axis *axis = xmalloc (sizeof *axis);
672   tower_init (&axis->log_to_phy);
673   axis->available = range_set_create ();
674   axis->phy_size = 0;
675   return axis;
676 }
677
678 /* Returns a clone of existing axis OLD.
679
680    Currently this is used only by the datasheet model checker
681    driver, but it could be otherwise useful. */
682 static struct axis *
683 axis_clone (const struct axis *old)
684 {
685   const struct tower_node *node;
686   struct axis *new;
687
688   new = xmalloc (sizeof *new);
689   tower_init (&new->log_to_phy);
690   new->available = range_set_clone (old->available, NULL);
691   new->phy_size = old->phy_size;
692
693   for (node = tower_first (&old->log_to_phy); node != NULL;
694        node = tower_next (&old->log_to_phy, node))
695     {
696       unsigned long int size = tower_node_get_height (node);
697       struct axis_group *group = tower_data (node, struct axis_group, logical);
698       tower_insert (&new->log_to_phy, size, make_axis_group (group->phy_start),
699                     NULL);
700     }
701
702   return new;
703 }
704
705 /* Adds the state of AXIS to the MD4 hash context CTX.
706
707    This is only used by the datasheet model checker driver.  It
708    is unlikely to be otherwise useful. */
709 static void
710 axis_hash (const struct axis *axis, struct md4_ctx *ctx)
711 {
712   const struct tower_node *tn;
713   const struct range_set_node *rsn;
714
715   for (tn = tower_first (&axis->log_to_phy); tn != NULL;
716        tn = tower_next (&axis->log_to_phy, tn))
717     {
718       struct axis_group *group = tower_data (tn, struct axis_group, logical);
719       unsigned long int phy_start = group->phy_start;
720       unsigned long int size = tower_node_get_height (tn);
721
722       md4_process_bytes (&phy_start, sizeof phy_start, ctx);
723       md4_process_bytes (&size, sizeof size, ctx);
724     }
725
726   for (rsn = range_set_first (axis->available); rsn != NULL;
727        rsn = range_set_next (axis->available, rsn))
728     {
729       unsigned long int start = range_set_node_get_start (rsn);
730       unsigned long int end = range_set_node_get_end (rsn);
731
732       md4_process_bytes (&start, sizeof start, ctx);
733       md4_process_bytes (&end, sizeof end, ctx);
734     }
735
736   md4_process_bytes (&axis->phy_size, sizeof axis->phy_size, ctx);
737 }
738
739 /* Destroys AXIS. */
740 static void
741 axis_destroy (struct axis *axis)
742 {
743   if (axis == NULL)
744     return;
745
746   while (!tower_is_empty (&axis->log_to_phy))
747     {
748       struct tower_node *node = tower_first (&axis->log_to_phy);
749       struct axis_group *group = tower_data (node, struct axis_group,
750                                              logical);
751       tower_delete (&axis->log_to_phy, node);
752       free (group);
753     }
754
755   range_set_destroy (axis->available);
756   free (axis);
757 }
758
759 /* Allocates up to REQUEST contiguous unused and available
760    ordinates from AXIS.  If successful, stores the number
761    obtained into *WIDTH and the ordinate of the first into
762    *START, marks the ordinates as now unavailable return true.
763    On failure, which occurs only if AXIS has no unused and
764    available ordinates, returns false without modifying AXIS. */
765 static bool
766 axis_allocate (struct axis *axis, unsigned long int request,
767                unsigned long int *start, unsigned long int *width)
768 {
769   return range_set_allocate (axis->available, request, start, width);
770 }
771
772 /* Marks the WIDTH contiguous ordinates in AXIS, starting from
773    START, as unused and available. */
774 static void
775 axis_make_available (struct axis *axis,
776                      unsigned long int start, unsigned long int width)
777 {
778   range_set_insert (axis->available, start, width);
779 }
780
781 /* Extends the total physical length of AXIS by WIDTH and returns
782    the first ordinate in the new physical region. */
783 static unsigned long int
784 axis_extend (struct axis *axis, unsigned long int width)
785 {
786   unsigned long int start = axis->phy_size;
787   axis->phy_size += width;
788   return start;
789 }
790
791 /* Returns the physical ordinate in AXIS corresponding to logical
792    ordinate LOG_POS.  LOG_POS must be less than the logical
793    length of AXIS. */
794 static unsigned long int
795 axis_map (const struct axis *axis, unsigned long log_pos)
796 {
797   struct tower_node *node;
798   struct axis_group *group;
799   unsigned long int group_start;
800
801   node = tower_lookup (&axis->log_to_phy, log_pos, &group_start);
802   group = tower_data (node, struct axis_group, logical);
803   return group->phy_start + (log_pos - group_start);
804 }
805
806 /* Returns the logical length of AXIS. */
807 static unsigned long
808 axis_get_size (const struct axis *axis)
809 {
810   return tower_height (&axis->log_to_phy);
811 }
812
813 /* Inserts the CNT contiguous physical ordinates starting at
814    PHY_START into AXIS's logical-to-physical mapping, starting at
815    logical position LOG_START. */
816 static void
817 axis_insert (struct axis *axis,
818              unsigned long int log_start, unsigned long int phy_start,
819              unsigned long int cnt)
820 {
821   struct tower_node *before = split_axis (axis, log_start);
822   struct tower_node *new = make_axis_group (phy_start);
823   tower_insert (&axis->log_to_phy, cnt, new, before);
824   merge_axis_nodes (axis, new, NULL);
825   check_axis_merged (axis);
826 }
827
828 /* Removes CNT ordinates from AXIS's logical-to-physical mapping
829    starting at logical position START. */
830 static void
831 axis_remove (struct axis *axis,
832              unsigned long int start, unsigned long int cnt)
833 {
834   if (cnt > 0)
835     {
836       struct tower_node *last = split_axis (axis, start + cnt);
837       struct tower_node *cur, *next;
838       for (cur = split_axis (axis, start); cur != last; cur = next)
839         {
840           next = tower_delete (&axis->log_to_phy, cur);
841           free (axis_group_from_tower_node (cur));
842         }
843       merge_axis_nodes (axis, last, NULL);
844       check_axis_merged (axis);
845     }
846 }
847
848 /* Moves the CNT ordinates in AXIS's logical-to-mapping starting
849    at logical position OLD_START so that they then start at
850    position NEW_START. */
851 static void
852 axis_move (struct axis *axis,
853            unsigned long int old_start, unsigned long int new_start,
854            unsigned long int cnt)
855 {
856   if (cnt > 0 && old_start != new_start)
857     {
858       struct tower_node *old_first, *old_last, *new_first;
859       struct tower_node *merge1, *merge2;
860       struct tower tmp_array;
861
862       /* Move ordinates OLD_START...(OLD_START + CNT) into new,
863          separate TMP_ARRAY. */
864       old_first = split_axis (axis, old_start);
865       old_last = split_axis (axis, old_start + cnt);
866       tower_init (&tmp_array);
867       tower_splice (&tmp_array, NULL,
868                     &axis->log_to_phy, old_first, old_last);
869       merge_axis_nodes (axis, old_last, NULL);
870       check_axis_merged (axis);
871
872       /* Move TMP_ARRAY to position NEW_START. */
873       new_first = split_axis (axis, new_start);
874       merge1 = tower_first (&tmp_array);
875       merge2 = tower_last (&tmp_array);
876       if (merge2 == merge1)
877         merge2 = NULL;
878       tower_splice (&axis->log_to_phy, new_first, &tmp_array, old_first, NULL);
879       merge_axis_nodes (axis, merge1, &merge2);
880       merge_axis_nodes (axis, merge2, NULL);
881       check_axis_merged (axis);
882     }
883 }
884
885 /* Returns the axis_group in which NODE is embedded. */
886 static struct axis_group *
887 axis_group_from_tower_node (struct tower_node *node)
888 {
889   return tower_data (node, struct axis_group, logical);
890 }
891
892 /* Creates and returns a new axis_group at physical position
893    PHY_START. */
894 static struct tower_node *
895 make_axis_group (unsigned long phy_start)
896 {
897   struct axis_group *group = xmalloc (sizeof *group);
898   group->phy_start = phy_start;
899   return &group->logical;
900 }
901
902 /* Returns the tower_node in AXIS's logical-to-physical map whose
903    bottom edge is at exact level WHERE.  If there is no such
904    tower_node in AXIS's logical-to-physical map, then split_axis
905    creates one by breaking an existing tower_node into two
906    separate ones, unless WHERE is equal to the tower height, in
907    which case it simply returns a null pointer. */
908 static struct tower_node *
909 split_axis (struct axis *axis, unsigned long int where)
910 {
911   unsigned long int group_start;
912   struct tower_node *group_node;
913   struct axis_group *group;
914
915   assert (where <= tower_height (&axis->log_to_phy));
916   if (where >= tower_height (&axis->log_to_phy))
917     return NULL;
918
919   group_node = tower_lookup (&axis->log_to_phy, where, &group_start);
920   group = axis_group_from_tower_node (group_node);
921   if (where > group_start)
922     {
923       unsigned long int size_1 = where - group_start;
924       unsigned long int size_2 = tower_node_get_height (group_node) - size_1;
925       struct tower_node *next = tower_next (&axis->log_to_phy, group_node);
926       struct tower_node *new = make_axis_group (group->phy_start + size_1);
927       tower_resize (&axis->log_to_phy, group_node, size_1);
928       tower_insert (&axis->log_to_phy, size_2, new, next);
929       return new;
930     }
931   else
932     return &group->logical;
933 }
934
935 /* Within AXIS, attempts to merge NODE (or the last node in AXIS,
936    if NODE is null) with its neighbor nodes.  This is possible
937    when logically adjacent nodes are also adjacent physically (in
938    the same order).
939
940    When a merge occurs, and OTHER_NODE is non-null and points to
941    the node to be deleted, this function also updates
942    *OTHER_NODE, if necessary, to ensure that it remains a valid
943    pointer. */
944 static void
945 merge_axis_nodes (struct axis *axis, struct tower_node *node,
946                   struct tower_node **other_node)
947 {
948   struct tower *t = &axis->log_to_phy;
949   struct axis_group *group;
950   struct tower_node *next, *prev;
951
952   /* Find node to potentially merge with neighbors. */
953   if (node == NULL)
954     node = tower_last (t);
955   if (node == NULL)
956     return;
957   group = axis_group_from_tower_node (node);
958
959   /* Try to merge NODE with successor. */
960   next = tower_next (t, node);
961   if (next != NULL)
962     {
963       struct axis_group *next_group = axis_group_from_tower_node (next);
964       unsigned long this_height = tower_node_get_height (node);
965
966       if (group->phy_start + this_height == next_group->phy_start)
967         {
968           unsigned long next_height = tower_node_get_height (next);
969           tower_resize (t, node, this_height + next_height);
970           if (other_node != NULL && *other_node == next)
971             *other_node = tower_next (t, *other_node);
972           tower_delete (t, next);
973           free (next_group);
974         }
975     }
976
977   /* Try to merge NODE with predecessor. */
978   prev = tower_prev (t, node);
979   if (prev != NULL)
980     {
981       struct axis_group *prev_group = axis_group_from_tower_node (prev);
982       unsigned long prev_height = tower_node_get_height (prev);
983
984       if (prev_group->phy_start + prev_height == group->phy_start)
985         {
986           unsigned long this_height = tower_node_get_height (node);
987           group->phy_start = prev_group->phy_start;
988           tower_resize (t, node, this_height + prev_height);
989           if (other_node != NULL && *other_node == prev)
990             *other_node = tower_next (t, *other_node);
991           tower_delete (t, prev);
992           free (prev_group);
993         }
994     }
995 }
996
997 /* Verify that all potentially merge-able nodes in AXIS are
998    actually merged. */
999 static void
1000 check_axis_merged (const struct axis *axis UNUSED)
1001 {
1002 #if ASSERT_LEVEL >= 10
1003   struct tower_node *prev, *node;
1004
1005   for (prev = NULL, node = tower_first (&axis->log_to_phy); node != NULL;
1006        prev = node, node = tower_next (&axis->log_to_phy, node))
1007     if (prev != NULL)
1008       {
1009         struct axis_group *prev_group = axis_group_from_tower_node (prev);
1010         unsigned long prev_height = tower_node_get_height (prev);
1011         struct axis_group *node_group = axis_group_from_tower_node (node);
1012         assert (prev_group->phy_start + prev_height != node_group->phy_start);
1013       }
1014 #endif
1015 }
1016 \f
1017 /* A source. */
1018 struct source
1019   {
1020     size_t columns_used;        /* Number of columns in use by client. */
1021     struct sparse_cases *data;  /* Data at top level, atop the backing. */
1022     struct casereader *backing; /* Backing casereader (or null). */
1023     casenumber backing_rows;    /* Number of rows in backing (if nonnull). */
1024   };
1025
1026 /* Creates and returns an empty, unbacked source with COLUMN_CNT
1027    columns and an initial "columns_used" of 0. */
1028 static struct source *
1029 source_create_empty (size_t column_cnt)
1030 {
1031   struct source *source = xmalloc (sizeof *source);
1032   source->columns_used = 0;
1033   source->data = sparse_cases_create (column_cnt);
1034   source->backing = NULL;
1035   source->backing_rows = 0;
1036   return source;
1037 }
1038
1039 /* Creates and returns a new source backed by READER and with the
1040    same initial dimensions and content. */
1041 static struct source *
1042 source_create_casereader (struct casereader *reader)
1043 {
1044   struct source *source
1045     = source_create_empty (casereader_get_value_cnt (reader));
1046   source->backing = reader;
1047   source->backing_rows = casereader_count_cases (reader);
1048   return source;
1049 }
1050
1051 /* Returns a clone of source OLD with the same data and backing
1052    (if any).
1053
1054    Currently this is used only by the datasheet model checker
1055    driver, but it could be otherwise useful. */
1056 static struct source *
1057 source_clone (const struct source *old)
1058 {
1059   struct source *new = xmalloc (sizeof *new);
1060   new->columns_used = old->columns_used;
1061   new->data = sparse_cases_clone (old->data);
1062   new->backing = old->backing != NULL ? casereader_clone (old->backing) : NULL;
1063   new->backing_rows = old->backing_rows;
1064   if (new->data == NULL)
1065     {
1066       source_destroy (new);
1067       new = NULL;
1068     }
1069   return new;
1070 }
1071
1072 /* Increases the columns_used count of SOURCE by DELTA.
1073    The new value must not exceed SOURCE's number of columns. */
1074 static void
1075 source_increase_use (struct source *source, size_t delta)
1076 {
1077   source->columns_used += delta;
1078   assert (source->columns_used <= sparse_cases_get_value_cnt (source->data));
1079 }
1080
1081 /* Decreases the columns_used count of SOURCE by DELTA.
1082    This must not attempt to decrease the columns_used count below
1083    zero. */
1084 static void
1085 source_decrease_use (struct source *source, size_t delta)
1086 {
1087   assert (delta <= source->columns_used);
1088   source->columns_used -= delta;
1089 }
1090
1091 /* Returns true if SOURCE has any columns in use,
1092    false otherwise. */
1093 static bool
1094 source_in_use (const struct source *source)
1095 {
1096   return source->columns_used > 0;
1097 }
1098
1099 /* Destroys SOURCE and its data and backing, if any. */
1100 static void
1101 source_destroy (struct source *source)
1102 {
1103   if (source != NULL)
1104     {
1105       sparse_cases_destroy (source->data);
1106       casereader_destroy (source->backing);
1107       free (source);
1108     }
1109 }
1110
1111 /* Returns the number of rows in SOURCE's backing casereader
1112    (SOURCE must have a backing casereader). */
1113 static casenumber
1114 source_get_backing_row_cnt (const struct source *source)
1115 {
1116   assert (source_has_backing (source));
1117   return source->backing_rows;
1118 }
1119
1120 /* Returns the number of columns in SOURCE. */
1121 static size_t
1122 source_get_column_cnt (const struct source *source)
1123 {
1124   return sparse_cases_get_value_cnt (source->data);
1125 }
1126
1127 /* Reads VALUE_CNT columns from SOURCE in the given ROW, starting
1128    from COLUMN, into VALUES.  Returns true if successful, false
1129    on I/O error. */
1130 static bool
1131 source_read (const struct source *source,
1132              casenumber row, size_t column,
1133              union value values[], size_t value_cnt)
1134 {
1135   if (source->backing == NULL || sparse_cases_contains_row (source->data, row))
1136     return sparse_cases_read (source->data, row, column, values, value_cnt);
1137   else
1138     {
1139       struct ccase c;
1140       bool ok;
1141
1142       assert (source->backing != NULL);
1143       ok = casereader_peek (source->backing, row, &c);
1144       if (ok)
1145         {
1146           case_copy_out (&c, column, values, value_cnt);
1147           case_destroy (&c);
1148         }
1149       return ok;
1150     }
1151 }
1152
1153 /* Writes the VALUE_CNT values in VALUES to SOURCE in the given
1154    ROW, starting at ROW.  Returns true if successful, false on
1155    I/O error.  On error, the row's data may be completely or
1156    partially corrupted, both inside and outside the region to be
1157    written.  */
1158 static bool
1159 source_write (struct source *source,
1160               casenumber row, size_t column,
1161               const union value values[], size_t value_cnt)
1162 {
1163   size_t column_cnt = sparse_cases_get_value_cnt (source->data);
1164   bool ok;
1165
1166   if (source->backing == NULL
1167       || (column == 0 && value_cnt == column_cnt)
1168       || sparse_cases_contains_row (source->data, row))
1169     ok = sparse_cases_write (source->data, row, column, values, value_cnt);
1170   else
1171     {
1172       struct ccase c;
1173       if (row < source->backing_rows)
1174         ok = casereader_peek (source->backing, row, &c);
1175       else
1176         {
1177           /* It's not one of the backed rows.  Ideally, this
1178              should never happen: we'd always be writing the full
1179              contents of new, unbacked rows in a single call to
1180              this function, so that the first case above would
1181              trigger.  But that's a little difficult at higher
1182              levels, so that we in fact usually write the full
1183              contents of new, unbacked rows in multiple calls to
1184              this function.  Make this work. */
1185           case_create (&c, column_cnt);
1186           ok = true;
1187         }
1188       if (ok)
1189         {
1190           case_copy_in (&c, column, values, value_cnt);
1191           ok = sparse_cases_write (source->data, row, 0,
1192                                    case_data_all (&c), column_cnt);
1193           case_destroy (&c);
1194         }
1195     }
1196   return ok;
1197 }
1198
1199 /* Within SOURCE, which must not have a backing casereader,
1200    writes the VALUE_CNT values in VALUES_CNT to the VALUE_CNT
1201    columns starting from START_COLUMN, in every row, even in rows
1202    not yet otherwise initialized.  Returns true if successful,
1203    false if an I/O error occurs.
1204
1205    We don't support backing != NULL because (1) it's harder and
1206    (2) source_write_columns is only called by
1207    datasheet_insert_columns, which doesn't reuse columns from
1208    sources that are backed by casereaders. */
1209 static bool
1210 source_write_columns (struct source *source, size_t start_column,
1211                       const union value values[], size_t value_cnt)
1212 {
1213   assert (source->backing == NULL);
1214
1215   return sparse_cases_write_columns (source->data, start_column,
1216                                      values, value_cnt);
1217 }
1218
1219 /* Returns true if SOURCE has a backing casereader, false
1220    otherwise. */
1221 static bool
1222 source_has_backing (const struct source *source)
1223 {
1224   return source->backing != NULL;
1225 }
1226 \f
1227 /* Datasheet model checker test driver. */
1228
1229 /* Maximum size of datasheet supported for model checking
1230    purposes. */
1231 #define MAX_ROWS 5
1232 #define MAX_COLS 5
1233
1234 /* Hashes the structure of datasheet DS and returns the hash.
1235    We use MD4 because it is much faster than MD5 or SHA-1 but its
1236    collision resistance is just as good. */
1237 static unsigned int
1238 hash_datasheet (const struct datasheet *ds)
1239 {
1240   unsigned int hash[DIV_RND_UP (20, sizeof (unsigned int))];
1241   struct md4_ctx ctx;
1242   struct range_map_node *r;
1243
1244   md4_init_ctx (&ctx);
1245   axis_hash (ds->columns, &ctx);
1246   axis_hash (ds->rows, &ctx);
1247   for (r = range_map_first (&ds->sources); r != NULL;
1248        r = range_map_next (&ds->sources, r))
1249     {
1250       unsigned long int start = range_map_node_get_start (r);
1251       unsigned long int end = range_map_node_get_end (r);
1252       md4_process_bytes (&start, sizeof start, &ctx);
1253       md4_process_bytes (&end, sizeof end, &ctx);
1254     }
1255   md4_process_bytes (&ds->column_min_alloc, sizeof ds->column_min_alloc,
1256                       &ctx);
1257   md4_finish_ctx (&ctx, hash);
1258   return hash[0];
1259 }
1260
1261 /* Checks that datasheet DS contains has ROW_CNT rows, COLUMN_CNT
1262    columns, and the same contents as ARRAY, reporting any
1263    mismatches via mc_error.  Then, adds DS to MC as a new state. */
1264 static void
1265 check_datasheet (struct mc *mc, struct datasheet *ds,
1266                  double array[MAX_ROWS][MAX_COLS],
1267                  size_t row_cnt, size_t column_cnt)
1268 {
1269   assert (row_cnt < MAX_ROWS);
1270   assert (column_cnt < MAX_COLS);
1271
1272   /* If it is a duplicate hash, discard the state before checking
1273      its consistency, to save time. */
1274   if (mc_discard_dup_state (mc, hash_datasheet (ds)))
1275     {
1276       datasheet_destroy (ds);
1277       return;
1278     }
1279
1280   if (row_cnt != datasheet_get_row_cnt (ds))
1281     mc_error (mc, "row count (%lu) does not match expected (%zu)",
1282               (unsigned long int) datasheet_get_row_cnt (ds), row_cnt);
1283   else if (column_cnt != datasheet_get_column_cnt (ds))
1284     mc_error (mc, "column count (%lu) does not match expected (%zu)",
1285               (unsigned long int) datasheet_get_column_cnt (ds), column_cnt);
1286   else
1287     {
1288       size_t row, col;
1289
1290       for (row = 0; row < row_cnt; row++)
1291         for (col = 0; col < column_cnt; col++)
1292           {
1293             union value v;
1294             if (!datasheet_get_value (ds, row, col, &v, 1))
1295               NOT_REACHED ();
1296             if (v.f != array[row][col])
1297               mc_error (mc, "element %zu,%zu (of %zu,%zu) differs: %g != %g",
1298                         row, col, row_cnt, column_cnt, v.f, array[row][col]);
1299           }
1300     }
1301
1302   mc_add_state (mc, ds);
1303 }
1304
1305 /* Extracts the contents of DS into DATA. */
1306 static void
1307 extract_data (const struct datasheet *ds, double data[MAX_ROWS][MAX_COLS])
1308 {
1309   size_t column_cnt = datasheet_get_column_cnt (ds);
1310   size_t row_cnt = datasheet_get_row_cnt (ds);
1311   size_t row, col;
1312
1313   assert (row_cnt < MAX_ROWS);
1314   assert (column_cnt < MAX_COLS);
1315   for (row = 0; row < row_cnt; row++)
1316     for (col = 0; col < column_cnt; col++)
1317       {
1318         union value v;
1319         if (!datasheet_get_value (ds, row, col, &v, 1))
1320           NOT_REACHED ();
1321         data[row][col] = v.f;
1322       }
1323 }
1324
1325 /* Clones the structure and contents of ODS into *DS,
1326    and the contents of ODATA into DATA. */
1327 static void
1328 clone_model (const struct datasheet *ods, double odata[MAX_ROWS][MAX_COLS],
1329              struct datasheet **ds_, double data[MAX_ROWS][MAX_COLS])
1330 {
1331   struct datasheet *ds;
1332   struct range_map_node *r;
1333
1334   /* Clone ODS into DS. */
1335   ds = *ds_ = xmalloc (sizeof *ds);
1336   ds->columns = axis_clone (ods->columns);
1337   ds->rows = axis_clone (ods->rows);
1338   range_map_init (&ds->sources);
1339   for (r = range_map_first (&ods->sources); r != NULL;
1340        r = range_map_next (&ods->sources, r))
1341     {
1342       const struct source_info *osi = source_info_from_range_map (r);
1343       struct source_info *si = xmalloc (sizeof *si);
1344       si->source = source_clone (osi->source);
1345       range_map_insert (&ds->sources, range_map_node_get_start (r),
1346                         range_map_node_get_width (r), &si->column_range);
1347     }
1348   ds->column_min_alloc = ods->column_min_alloc;
1349   ds->taint = taint_create ();
1350
1351   /* Clone ODATA into DATA. */
1352   memcpy (data, odata, MAX_ROWS * MAX_COLS * sizeof **data);
1353 }
1354
1355 /* "init" function for struct mc_class. */
1356 static void
1357 datasheet_mc_init (struct mc *mc)
1358 {
1359   struct datasheet_test_params *params = mc_get_aux (mc);
1360   struct datasheet *ds;
1361
1362   if (params->backing_rows == 0 && params->backing_cols == 0)
1363     {
1364       /* Create unbacked datasheet. */
1365       ds = datasheet_create (NULL);
1366       mc_name_operation (mc, "empty datasheet");
1367       check_datasheet (mc, ds, NULL, 0, 0);
1368     }
1369   else
1370     {
1371       /* Create datasheet with backing. */
1372       struct casewriter *writer;
1373       struct casereader *reader;
1374       double data[MAX_ROWS][MAX_COLS];
1375       int row;
1376
1377       assert (params->backing_rows > 0 && params->backing_rows <= MAX_ROWS);
1378       assert (params->backing_cols > 0 && params->backing_cols <= MAX_COLS);
1379
1380       writer = mem_writer_create (params->backing_cols);
1381       for (row = 0; row < params->backing_rows; row++)
1382         {
1383           struct ccase c;
1384           int col;
1385
1386           case_create (&c, params->backing_cols);
1387           for (col = 0; col < params->backing_cols; col++)
1388             {
1389               double value = params->next_value++;
1390               data[row][col] = value;
1391               case_data_rw_idx (&c, col)->f = value;
1392             }
1393           casewriter_write (writer, &c);
1394         }
1395       reader = casewriter_make_reader (writer);
1396       assert (reader != NULL);
1397
1398       ds = datasheet_create (reader);
1399       mc_name_operation (mc, "datasheet with (%d,%d) backing",
1400                          params->backing_rows, params->backing_cols);
1401       check_datasheet (mc, ds, data,
1402                        params->backing_rows, params->backing_cols);
1403     }
1404 }
1405
1406 /* "mutate" function for struct mc_class. */
1407 static void
1408 datasheet_mc_mutate (struct mc *mc, const void *ods_)
1409 {
1410   struct datasheet_test_params *params = mc_get_aux (mc);
1411
1412   const struct datasheet *ods = ods_;
1413   double odata[MAX_ROWS][MAX_COLS];
1414   double data[MAX_ROWS][MAX_COLS];
1415   size_t column_cnt = datasheet_get_column_cnt (ods);
1416   size_t row_cnt = datasheet_get_row_cnt (ods);
1417   size_t pos, new_pos, cnt;
1418
1419   extract_data (ods, odata);
1420
1421   /* Insert all possible numbers of columns in all possible
1422      positions. */
1423   for (pos = 0; pos <= column_cnt; pos++)
1424     for (cnt = 0; cnt <= params->max_cols - column_cnt; cnt++)
1425       if (mc_include_state (mc))
1426         {
1427           struct datasheet *ds;
1428           union value new[MAX_COLS];
1429           size_t i, j;
1430
1431           mc_name_operation (mc, "insert %zu columns at %zu", cnt, pos);
1432           clone_model (ods, odata, &ds, data);
1433
1434           for (i = 0; i < cnt; i++)
1435             new[i].f = params->next_value++;
1436
1437           if (!datasheet_insert_columns (ds, new, cnt, pos))
1438             mc_error (mc, "datasheet_insert_columns failed");
1439
1440           for (i = 0; i < row_cnt; i++)
1441             {
1442               insert_range (&data[i][0], column_cnt, sizeof data[i][0],
1443                             pos, cnt);
1444               for (j = 0; j < cnt; j++)
1445                 data[i][pos + j] = new[j].f;
1446             }
1447
1448           check_datasheet (mc, ds, data, row_cnt, column_cnt + cnt);
1449         }
1450
1451   /* Delete all possible numbers of columns from all possible
1452      positions. */
1453   for (pos = 0; pos < column_cnt; pos++)
1454     for (cnt = 0; cnt < column_cnt - pos; cnt++)
1455       if (mc_include_state (mc))
1456         {
1457           struct datasheet *ds;
1458           size_t i;
1459
1460           mc_name_operation (mc, "delete %zu columns at %zu", cnt, pos);
1461           clone_model (ods, odata, &ds, data);
1462
1463           datasheet_delete_columns (ds, pos, cnt);
1464
1465           for (i = 0; i < row_cnt; i++)
1466             remove_range (&data[i], column_cnt, sizeof *data[i], pos, cnt);
1467
1468           check_datasheet (mc, ds, data, row_cnt, column_cnt - cnt);
1469         }
1470
1471   /* Move all possible numbers of columns from all possible
1472      existing positions to all possible new positions. */
1473   for (pos = 0; pos < column_cnt; pos++)
1474     for (cnt = 0; cnt < column_cnt - pos; cnt++)
1475       for (new_pos = 0; new_pos < column_cnt - cnt; new_pos++)
1476         if (mc_include_state (mc))
1477           {
1478             struct datasheet *ds;
1479             size_t i;
1480
1481             clone_model (ods, odata, &ds, data);
1482             mc_name_operation (mc, "move %zu columns from %zu to %zu",
1483                                cnt, pos, new_pos);
1484
1485             datasheet_move_columns (ds, pos, new_pos, cnt);
1486
1487             for (i = 0; i < row_cnt; i++)
1488               move_range (&data[i], column_cnt, sizeof data[i][0],
1489                           pos, new_pos, cnt);
1490
1491             check_datasheet (mc, ds, data, row_cnt, column_cnt);
1492           }
1493
1494   /* Insert all possible numbers of rows in all possible
1495      positions. */
1496   for (pos = 0; pos <= row_cnt; pos++)
1497     for (cnt = 0; cnt <= params->max_rows - row_cnt; cnt++)
1498       if (mc_include_state (mc))
1499         {
1500           struct datasheet *ds;
1501           struct ccase c[MAX_ROWS];
1502           size_t i, j;
1503
1504           clone_model (ods, odata, &ds, data);
1505           mc_name_operation (mc, "insert %zu rows at %zu", cnt, pos);
1506
1507           for (i = 0; i < cnt; i++)
1508             {
1509               case_create (&c[i], column_cnt);
1510               for (j = 0; j < column_cnt; j++)
1511                 case_data_rw_idx (&c[i], j)->f = params->next_value++;
1512             }
1513
1514           insert_range (data, row_cnt, sizeof data[pos], pos, cnt);
1515           for (i = 0; i < cnt; i++)
1516             for (j = 0; j < column_cnt; j++)
1517               data[i + pos][j] = case_num_idx (&c[i], j);
1518
1519           if (!datasheet_insert_rows (ds, pos, c, cnt))
1520             mc_error (mc, "datasheet_insert_rows failed");
1521
1522           check_datasheet (mc, ds, data, row_cnt + cnt, column_cnt);
1523         }
1524
1525   /* Delete all possible numbers of rows from all possible
1526      positions. */
1527   for (pos = 0; pos < row_cnt; pos++)
1528     for (cnt = 0; cnt < row_cnt - pos; cnt++)
1529       if (mc_include_state (mc))
1530         {
1531           struct datasheet *ds;
1532
1533           clone_model (ods, odata, &ds, data);
1534           mc_name_operation (mc, "delete %zu rows at %zu", cnt, pos);
1535
1536           datasheet_delete_rows (ds, pos, cnt);
1537
1538           remove_range (&data[0], row_cnt, sizeof data[0], pos, cnt);
1539
1540           check_datasheet (mc, ds, data, row_cnt - cnt, column_cnt);
1541         }
1542
1543   /* Move all possible numbers of rows from all possible existing
1544      positions to all possible new positions. */
1545   for (pos = 0; pos < row_cnt; pos++)
1546     for (cnt = 0; cnt < row_cnt - pos; cnt++)
1547       for (new_pos = 0; new_pos < row_cnt - cnt; new_pos++)
1548         if (mc_include_state (mc))
1549           {
1550             struct datasheet *ds;
1551
1552             clone_model (ods, odata, &ds, data);
1553             mc_name_operation (mc, "move %zu rows from %zu to %zu",
1554                                cnt, pos, new_pos);
1555
1556             datasheet_move_rows (ds, pos, new_pos, cnt);
1557
1558             move_range (&data[0], row_cnt, sizeof data[0],
1559                         pos, new_pos, cnt);
1560
1561             check_datasheet (mc, ds, data, row_cnt, column_cnt);
1562           }
1563 }
1564
1565 /* "destroy" function for struct mc_class. */
1566 static void
1567 datasheet_mc_destroy (const struct mc *mc UNUSED, void *ds_)
1568 {
1569   struct datasheet *ds = ds_;
1570   datasheet_destroy (ds);
1571 }
1572
1573 /* Executes the model checker on the datasheet test driver with
1574    the given OPTIONS and passing in the given PARAMS, which must
1575    point to a modifiable "struct datasheet_test_params".  If any
1576    value in PARAMS is out of range, it will be adjusted into the
1577    valid range before running the test.
1578
1579    Returns the results of the model checking run. */
1580 struct mc_results *
1581 datasheet_test (struct mc_options *options, void *params_)
1582 {
1583   struct datasheet_test_params *params = params_;
1584   static const struct mc_class datasheet_mc_class =
1585     {
1586       datasheet_mc_init,
1587       datasheet_mc_mutate,
1588       datasheet_mc_destroy,
1589     };
1590
1591   params->next_value = 1;
1592   params->max_rows = MIN (params->max_rows, MAX_ROWS);
1593   params->max_cols = MIN (params->max_cols, MAX_COLS);
1594   params->backing_rows = MIN (params->backing_rows, params->max_rows);
1595   params->backing_cols = MIN (params->backing_cols, params->max_cols);
1596
1597   mc_options_set_aux (options, params);
1598   return mc_run (&datasheet_mc_class, options);
1599 }