Moved the datasheet testing code out of src/{libspp,data}
[pspp-builds.git] / src / data / datasheet.c
1 /* PSPP - a program for statistical analysis.
2    Copyright (C) 2007 Free Software Foundation, Inc.
3
4    This program is free software: you can redistribute it and/or modify
5    it under the terms of the GNU General Public License as published by
6    the Free Software Foundation, either version 3 of the License, or
7    (at your option) any later version.
8
9    This program is distributed in the hope that it will be useful,
10    but WITHOUT ANY WARRANTY; without even the implied warranty of
11    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12    GNU General Public License for more details.
13
14    You should have received a copy of the GNU General Public License
15    along with this program.  If not, see <http://www.gnu.org/licenses/>. */
16
17 #include <config.h>
18
19 #include <data/datasheet.h>
20
21 #include <stdlib.h>
22 #include <string.h>
23
24 #include <data/casereader-provider.h>
25 #include <data/casereader.h>
26 #include <data/casewriter.h>
27 #include <data/lazy-casereader.h>
28 #include <data/sparse-cases.h>
29 #include <libpspp/array.h>
30 #include <libpspp/assertion.h>
31 #include <libpspp/range-map.h>
32 #include <libpspp/range-set.h>
33 #include <libpspp/taint.h>
34 #include <libpspp/tower.h>
35
36 #include "minmax.h"
37 #include "md4.h"
38 #include "xalloc.h"
39
40 static struct axis *axis_create (void);
41 static struct axis *axis_clone (const struct axis *);
42 static void axis_destroy (struct axis *);
43
44 static void axis_hash (const struct axis *, struct md4_ctx *);
45
46 static bool axis_allocate (struct axis *, unsigned long int request,
47                            unsigned long int *start,
48                            unsigned long int *width);
49 static void axis_make_available (struct axis *,
50                                 unsigned long int start,
51                                 unsigned long int width);
52 static unsigned long int axis_extend (struct axis *, unsigned long int width);
53
54 static unsigned long int axis_map (const struct axis *, unsigned long log_pos);
55
56 static unsigned long axis_get_size (const struct axis *);
57 static void axis_insert (struct axis *,
58                          unsigned long int log_start,
59                          unsigned long int phy_start,
60                          unsigned long int cnt);
61 static void axis_remove (struct axis *,
62                          unsigned long int start, unsigned long int cnt);
63 static void axis_move (struct axis *,
64                        unsigned long int old_start,
65                        unsigned long int new_start,
66                        unsigned long int cnt);
67
68 static struct source *source_create_empty (size_t column_cnt);
69 static struct source *source_create_casereader (struct casereader *);
70 static struct source *source_clone (const struct source *);
71 static void source_destroy (struct source *);
72
73 static casenumber source_get_backing_row_cnt (const struct source *);
74 static size_t source_get_column_cnt (const struct source *);
75
76 static bool source_read (const struct source *,
77                          casenumber row, size_t column,
78                          union value[], size_t value_cnt);
79 static bool source_write (struct source *,
80                           casenumber row, size_t column,
81                           const union value[], size_t value_cnt);
82 static bool source_write_columns (struct source *, size_t start_column,
83                                   const union value[], size_t value_cnt);
84 static bool source_has_backing (const struct source *);
85 static void source_increase_use (struct source *, size_t delta);
86 static void source_decrease_use (struct source *, size_t delta);
87 static bool source_in_use (const struct source *);
88
89 /* A datasheet is internally composed from a set of data files,
90    called "sources".  The sources that make up a datasheet must
91    have the same number of rows (cases), but their numbers of
92    columns (variables) may vary.
93
94    A datasheet's external view is produced by mapping (permuting
95    and selecting) its internal data.  Thus, we can rearrange or
96    delete rows or columns simply by modifying the mapping.  We
97    add rows by adding rows to each source and to the row mapping.
98    We add columns by adding a new source, then adding that source
99    to the column mapping.
100
101    Each source in a datasheet can be a casereader or a
102    sparse_cases.  Casereaders are read-only, so when sources made
103    from casereaders need to be modified, it is done "virtually"
104    through being overlaid by a sparse_cases. */
105
106 /* A datasheet. */
107 struct datasheet
108   {
109     /* Mappings from logical to physical columns/rows. */
110     struct axis *columns;
111     struct axis *rows;
112
113     /* Mapping from physical columns to "source_info"s. */
114     struct range_map sources;
115
116     /* Minimum number of columns to put in a new source when we
117        need new columns and none are free.  We double it whenever
118        we add a new source to keep the number of file descriptors
119        needed by the datasheet to a minimum, reducing the
120        likelihood of running out. */
121     unsigned column_min_alloc;
122
123     /* Indicates corrupted data in the datasheet. */
124     struct taint *taint;
125   };
126
127 /* Maps from a range of physical columns to a source. */
128 struct source_info
129   {
130     struct range_map_node column_range;
131     struct source *source;
132   };
133
134 /* Is this operation a read or a write? */
135 enum rw_op
136   {
137     OP_READ,
138     OP_WRITE
139   };
140
141 static void free_source_info (struct datasheet *, struct source_info *);
142 static struct source_info *source_info_from_range_map (
143   struct range_map_node *);
144 static bool rw_case (struct datasheet *ds, enum rw_op op,
145                      casenumber lrow, size_t start_column, size_t column_cnt,
146                      union value data[]);
147
148 /* Creates and returns a new datasheet.
149
150    If READER is nonnull, then the datasheet initially contains
151    the contents of READER. */
152 struct datasheet *
153 datasheet_create (struct casereader *reader)
154 {
155   /* Create datasheet. */
156   struct datasheet *ds = xmalloc (sizeof *ds);
157   ds->columns = axis_create ();
158   ds->rows = axis_create ();
159   range_map_init (&ds->sources);
160   ds->column_min_alloc = 1;
161   ds->taint = taint_create ();
162
163   /* Add backing. */
164   if (reader != NULL)
165     {
166       size_t column_cnt;
167       casenumber row_cnt;
168       struct source_info *si;
169
170       si = xmalloc (sizeof *si);
171       si->source = source_create_casereader (reader);
172       column_cnt = source_get_column_cnt (si->source);
173       row_cnt = source_get_backing_row_cnt (si->source);
174       source_increase_use (si->source, column_cnt);
175
176       if ( column_cnt > 0 )
177         {
178           unsigned long int column_start;
179       column_start = axis_extend (ds->columns, column_cnt);
180       axis_insert (ds->columns, 0, column_start, column_cnt);
181       range_map_insert (&ds->sources, column_start, column_cnt,
182                         &si->column_range);
183         }
184
185       if ( row_cnt > 0 )
186         {
187           unsigned long int row_start;
188       row_start = axis_extend (ds->rows, row_cnt);
189       axis_insert (ds->rows, 0, row_start, row_cnt);
190         }
191     }
192
193   return ds;
194 }
195
196 /* Destroys datasheet DS. */
197 void
198 datasheet_destroy (struct datasheet *ds)
199 {
200   if (ds == NULL)
201     return;
202
203   axis_destroy (ds->columns);
204   axis_destroy (ds->rows);
205   while (!range_map_is_empty (&ds->sources))
206     {
207       struct range_map_node *r = range_map_first (&ds->sources);
208       struct source_info *si = source_info_from_range_map (r);
209       free_source_info (ds, si);
210     }
211   taint_destroy (ds->taint);
212   free (ds);
213 }
214
215 /* Moves datasheet DS to a new location in memory, and returns
216    the new location.  Afterward, the datasheet must not be
217    accessed at its former location.
218
219    This function is useful for ensuring that all references to a
220    datasheet have been dropped, especially in conjunction with
221    tools like Valgrind. */
222 struct datasheet *
223 datasheet_rename (struct datasheet *ds)
224 {
225   struct datasheet *new = xmemdup (ds, sizeof *ds);
226   free (ds);
227   return new;
228 }
229
230 /* Returns true if datasheet DS is tainted.
231    A datasheet is tainted by an I/O error or by taint
232    propagation to the datasheet. */
233 bool
234 datasheet_error (const struct datasheet *ds)
235 {
236   return taint_is_tainted (ds->taint);
237 }
238
239 /* Marks datasheet DS tainted. */
240 void
241 datasheet_force_error (struct datasheet *ds)
242 {
243   taint_set_taint (ds->taint);
244 }
245
246 /* Returns datasheet DS's taint object. */
247 const struct taint *
248 datasheet_get_taint (const struct datasheet *ds)
249 {
250   return ds->taint;
251 }
252
253 /* Returns the number of rows in DS. */
254 casenumber
255 datasheet_get_row_cnt (const struct datasheet *ds)
256 {
257   return axis_get_size (ds->rows);
258 }
259
260 /* Returns the number of columns in DS. */
261 size_t
262 datasheet_get_column_cnt (const struct datasheet *ds)
263 {
264   return axis_get_size (ds->columns);
265 }
266
267 /* Inserts CNT columns into datasheet DS just before column
268    BEFORE.  Initializes the contents of each row in the inserted
269    columns to the CNT values in INIT_VALUES.
270
271    Returns true if successful, false on failure.  In case of
272    failure, the datasheet is unchanged. */
273 bool
274 datasheet_insert_columns (struct datasheet *ds,
275                           const union value init_values[], size_t cnt,
276                           size_t before)
277 {
278   size_t added = 0;
279   while (cnt > 0)
280     {
281       unsigned long first_phy; /* First allocated physical column. */
282       unsigned long phy_cnt;   /* Number of allocated physical columns. */
283
284       /* Allocate physical columns from the pool of available
285          columns. */
286       if (!axis_allocate (ds->columns, cnt, &first_phy, &phy_cnt))
287         {
288           /* No columns were available.  Create a new source and
289              extend the axis to make some new ones available. */
290           struct source_info *si;
291
292           phy_cnt = MAX (cnt, ds->column_min_alloc);
293           first_phy = axis_extend (ds->columns, phy_cnt);
294           ds->column_min_alloc = MIN (65536, ds->column_min_alloc * 2);
295
296           si = xmalloc (sizeof *si);
297           si->source = source_create_empty (phy_cnt);
298           range_map_insert (&ds->sources, first_phy, phy_cnt,
299                             &si->column_range);
300           if (phy_cnt > cnt)
301             {
302               axis_make_available (ds->columns, first_phy + cnt,
303                                    phy_cnt - cnt);
304               phy_cnt = cnt;
305             }
306         }
307
308       /* Initialize the columns and insert them into the columns
309          axis. */
310       while (phy_cnt > 0)
311         {
312           struct range_map_node *r; /* Range map holding FIRST_PHY column. */
313           struct source_info *s;    /* Source containing FIRST_PHY column. */
314           size_t source_avail;      /* Number of phys columns available. */
315           size_t source_cnt;        /* Number of phys columns to use. */
316
317           /* Figure out how many columns we can and want to take
318              starting at FIRST_PHY, and then insert them into the
319              columns axis. */
320           r = range_map_lookup (&ds->sources, first_phy);
321           s = source_info_from_range_map (r);
322           source_avail = range_map_node_get_end (r) - first_phy;
323           source_cnt = MIN (phy_cnt, source_avail);
324           axis_insert (ds->columns, before, first_phy, source_cnt);
325
326           /* Initialize the data for those columns in the
327              source. */
328           if (!source_write_columns (s->source,
329                                      first_phy - range_map_node_get_start (r),
330                                      init_values, source_cnt))
331             {
332               datasheet_delete_columns (ds, before - added,
333                                         source_cnt + added);
334               taint_set_taint (ds->taint);
335               return false;
336             }
337           source_increase_use (s->source, source_cnt);
338
339           /* Advance. */
340           phy_cnt -= source_cnt;
341           first_phy += source_cnt;
342           init_values += source_cnt;
343           cnt -= source_cnt;
344           before += source_cnt;
345           added += source_cnt;
346         }
347     }
348   return true;
349 }
350
351 /* Deletes the CNT columns in DS starting from column START. */
352 void
353 datasheet_delete_columns (struct datasheet *ds, size_t start, size_t cnt)
354 {
355   size_t lcol;
356
357   assert ( start + cnt <= axis_get_size (ds->columns) );
358
359   /* Free up columns for reuse. */
360   for (lcol = start; lcol < start + cnt; lcol++)
361     {
362       size_t pcol = axis_map (ds->columns, lcol);
363       struct range_map_node *r = range_map_lookup (&ds->sources, pcol);
364       struct source_info *si = source_info_from_range_map (r);
365
366       source_decrease_use (si->source, 1);
367       if (source_has_backing (si->source))
368         {
369           if (!source_in_use (si->source))
370             free_source_info (ds, si);
371         }
372       else
373         axis_make_available (ds->columns, pcol, 1);
374     }
375
376   /* Remove columns from logical-to-physical mapping. */
377   axis_remove (ds->columns, start, cnt);
378 }
379
380 /* Moves the CNT columns in DS starting at position OLD_START so
381    that they then start at position NEW_START.  Equivalent to
382    deleting the column rows, then inserting them at what becomes
383    position NEW_START after the deletion.*/
384 void
385 datasheet_move_columns (struct datasheet *ds,
386                         size_t old_start, size_t new_start,
387                         size_t cnt)
388 {
389   axis_move (ds->columns, old_start, new_start, cnt);
390 }
391
392 /* Retrieves the contents of the given ROW in datasheet DS into
393    newly created case C.  Returns true if successful, false on
394    I/O error. */
395 bool
396 datasheet_get_row (const struct datasheet *ds, casenumber row, struct ccase *c)
397 {
398   size_t column_cnt = datasheet_get_column_cnt (ds);
399   case_create (c, column_cnt);
400   if (rw_case ((struct datasheet *) ds, OP_READ,
401                row, 0, column_cnt, case_data_all_rw (c)))
402     return true;
403   else
404     {
405       case_destroy (c);
406       return false;
407     }
408 }
409
410 /* Stores the contents of case C, which is destroyed, into the
411    given ROW in DS.  Returns true on success, false on I/O error.
412    On failure, the given ROW might be partially modified or
413    corrupted. */
414 bool
415 datasheet_put_row (struct datasheet *ds, casenumber row, struct ccase *c)
416 {
417   size_t column_cnt = datasheet_get_column_cnt (ds);
418   bool ok = rw_case (ds, OP_WRITE, row, 0, column_cnt,
419                      (union value *) case_data_all (c));
420   case_destroy (c);
421   return ok;
422 }
423
424 /* Stores the values of the WIDTH columns in DS in the given ROW
425    starting at COLUMN in DS into VALUES.  Returns true if
426    successful, false on I/O error. */
427 bool
428 datasheet_get_value (const struct datasheet *ds, casenumber row, size_t column,
429                      union value *value, int width)
430 {
431   assert ( row >= 0 );
432   return rw_case ((struct datasheet *) ds,
433                   OP_READ, row, column, value_cnt_from_width (width), value);
434 }
435
436 /* Stores the WIDTH given VALUES into the given ROW in DS
437    starting at COLUMN.  Returns true if successful, false on I/O
438    error.  On failure, the given ROW might be partially modified
439    or corrupted. */
440 bool
441 datasheet_put_value (struct datasheet *ds, casenumber row, size_t column,
442                      const union value *value, int width)
443 {
444   return rw_case (ds, OP_WRITE, row, column, value_cnt_from_width (width),
445                   (union value *) value);
446 }
447
448 /* Inserts the CNT cases at C, which are destroyed, into
449    datasheet DS just before row BEFORE.  Returns true if
450    successful, false on I/O error.  On failure, datasheet DS is
451    not modified. */
452 bool
453 datasheet_insert_rows (struct datasheet *ds,
454                        casenumber before, struct ccase c[],
455                        casenumber cnt)
456 {
457   casenumber added = 0;
458   while (cnt > 0)
459     {
460       unsigned long first_phy;
461       unsigned long phy_cnt;
462       unsigned long i;
463
464       /* Allocate physical rows from the pool of available
465          rows. */
466       if (!axis_allocate (ds->rows, cnt, &first_phy, &phy_cnt))
467         {
468           /* No rows were available.  Extend the row axis to make
469              some new ones available. */
470           phy_cnt = cnt;
471           first_phy = axis_extend (ds->rows, cnt);
472         }
473
474       /* Insert the new rows into the row mapping. */
475       axis_insert (ds->rows, before, first_phy, phy_cnt);
476
477       /* Initialize the new rows. */
478       for (i = 0; i < phy_cnt; i++)
479         if (!datasheet_put_row (ds, before + i, &c[i]))
480           {
481             while (++i < cnt)
482               case_destroy (&c[i]);
483             datasheet_delete_rows (ds, before - added, phy_cnt + added);
484             return false;
485           }
486
487       /* Advance. */
488       c += phy_cnt;
489       cnt -= phy_cnt;
490       before += phy_cnt;
491       added += phy_cnt;
492     }
493   return true;
494 }
495
496 /* Deletes the CNT rows in DS starting from row FIRST. */
497 void
498 datasheet_delete_rows (struct datasheet *ds,
499                        casenumber first, casenumber cnt)
500 {
501   size_t lrow;
502
503   /* Free up rows for reuse.
504      FIXME: optimize. */
505   for (lrow = first; lrow < first + cnt; lrow++)
506     axis_make_available (ds->rows, axis_map (ds->rows, lrow), 1);
507
508   /* Remove rows from logical-to-physical mapping. */
509   axis_remove (ds->rows, first, cnt);
510 }
511
512 /* Moves the CNT rows in DS starting at position OLD_START so
513    that they then start at position NEW_START.  Equivalent to
514    deleting the given rows, then inserting them at what becomes
515    position NEW_START after the deletion. */
516 void
517 datasheet_move_rows (struct datasheet *ds,
518                      size_t old_start, size_t new_start,
519                      size_t cnt)
520 {
521   axis_move (ds->rows, old_start, new_start, cnt);
522 }
523 \f
524 static const struct casereader_random_class datasheet_reader_class;
525
526 /* Creates and returns a casereader whose input cases are the
527    rows in datasheet DS.  From the caller's perspective, DS is
528    effectively destroyed by this operation, such that the caller
529    must not reference it again. */
530 struct casereader *
531 datasheet_make_reader (struct datasheet *ds)
532 {
533   struct casereader *reader;
534   ds = datasheet_rename (ds);
535   reader = casereader_create_random (datasheet_get_column_cnt (ds),
536                                      datasheet_get_row_cnt (ds),
537                                      &datasheet_reader_class, ds);
538   taint_propagate (datasheet_get_taint (ds), casereader_get_taint (reader));
539   return reader;
540 }
541
542 /* "read" function for the datasheet random casereader. */
543 static bool
544 datasheet_reader_read (struct casereader *reader UNUSED, void *ds_,
545                        casenumber case_idx, struct ccase *c)
546 {
547   struct datasheet *ds = ds_;
548   if (case_idx >= datasheet_get_row_cnt (ds))
549     return false;
550   else if (datasheet_get_row (ds, case_idx, c))
551     return true;
552   else
553     {
554       taint_set_taint (ds->taint);
555       return false;
556     }
557 }
558
559 /* "destroy" function for the datasheet random casereader. */
560 static void
561 datasheet_reader_destroy (struct casereader *reader UNUSED, void *ds_)
562 {
563   struct datasheet *ds = ds_;
564   datasheet_destroy (ds);
565 }
566
567 /* "advance" function for the datasheet random casereader. */
568 static void
569 datasheet_reader_advance (struct casereader *reader UNUSED, void *ds_,
570                           casenumber case_cnt)
571 {
572   struct datasheet *ds = ds_;
573   datasheet_delete_rows (ds, 0, case_cnt);
574 }
575
576 /* Random casereader class for a datasheet. */
577 static const struct casereader_random_class datasheet_reader_class =
578   {
579     datasheet_reader_read,
580     datasheet_reader_destroy,
581     datasheet_reader_advance,
582   };
583 \f
584 /* Removes SI from DS's set of sources and destroys its
585    source. */
586 static void
587 free_source_info (struct datasheet *ds, struct source_info *si)
588 {
589   range_map_delete (&ds->sources, &si->column_range);
590   source_destroy (si->source);
591   free (si);
592 }
593
594 static struct source_info *
595 source_info_from_range_map (struct range_map_node *node)
596 {
597   return range_map_data (node, struct source_info, column_range);
598 }
599
600 /* Reads (if OP is OP_READ) or writes (if op is OP_WRITE) the
601    COLUMN_CNT columns starting from column START_COLUMN in row
602    LROW to/from the COLUMN_CNT values in DATA. */
603 static bool
604 rw_case (struct datasheet *ds, enum rw_op op,
605          casenumber lrow, size_t start_column, size_t column_cnt,
606          union value data[])
607 {
608   casenumber prow;
609   size_t lcol;
610
611   assert (lrow < datasheet_get_row_cnt (ds));
612   assert (column_cnt <= datasheet_get_column_cnt (ds));
613   assert (start_column + column_cnt <= datasheet_get_column_cnt (ds));
614
615   prow = axis_map (ds->rows, lrow);
616   for (lcol = start_column; lcol < start_column + column_cnt; lcol++, data++)
617     {
618       size_t pcol = axis_map (ds->columns, lcol);
619       struct range_map_node *r = range_map_lookup (&ds->sources, pcol);
620       struct source_info *s = source_info_from_range_map (r);
621       size_t pcol_ofs = pcol - range_map_node_get_start (r);
622       if (!(op == OP_READ
623             ? source_read (s->source, prow, pcol_ofs, data, 1)
624             : source_write (s->source, prow, pcol_ofs, data, 1)))
625         {
626           taint_set_taint (ds->taint);
627           return false;
628         }
629     }
630   return true;
631 }
632 \f
633 /* An axis.
634
635    An axis has two functions.  First, it maintains a mapping from
636    logical (client-visible) to physical (storage) ordinates.  The
637    axis_map and axis_get_size functions inspect this mapping, and
638    the axis_insert, axis_remove, and axis_move functions modify
639    it.  Second, it tracks the set of ordinates that are unused
640    and available for reuse.  (Not all unused ordinates are
641    available for reuse: in particular, unused columns that are
642    backed by a casereader are never reused.)  The axis_allocate,
643    axis_make_available, and axis_extend functions affect the set
644    of available ordinates. */
645 struct axis
646   {
647     struct tower log_to_phy;     /* Map from logical to physical ordinates;
648                                     contains "struct axis_group"s. */
649     struct range_set *available; /* Set of unused, available ordinates. */
650     unsigned long int phy_size;  /* Current physical length of axis. */
651   };
652
653 /* A mapping from logical to physical ordinates. */
654 struct axis_group
655   {
656     struct tower_node logical;  /* Range of logical ordinates. */
657     unsigned long phy_start;    /* First corresponding physical ordinate. */
658   };
659
660 static struct axis_group *axis_group_from_tower_node (struct tower_node *);
661 static struct tower_node *make_axis_group (unsigned long int phy_start);
662 static struct tower_node *split_axis (struct axis *, unsigned long int where);
663 static void merge_axis_nodes (struct axis *, struct tower_node *,
664                               struct tower_node **other_node);
665 static void check_axis_merged (const struct axis *axis UNUSED);
666
667 /* Creates and returns a new, initially empty axis. */
668 static struct axis *
669 axis_create (void)
670 {
671   struct axis *axis = xmalloc (sizeof *axis);
672   tower_init (&axis->log_to_phy);
673   axis->available = range_set_create ();
674   axis->phy_size = 0;
675   return axis;
676 }
677
678 /* Returns a clone of existing axis OLD.
679
680    Currently this is used only by the datasheet model checker
681    driver, but it could be otherwise useful. */
682 static struct axis *
683 axis_clone (const struct axis *old)
684 {
685   const struct tower_node *node;
686   struct axis *new;
687
688   new = xmalloc (sizeof *new);
689   tower_init (&new->log_to_phy);
690   new->available = range_set_clone (old->available, NULL);
691   new->phy_size = old->phy_size;
692
693   for (node = tower_first (&old->log_to_phy); node != NULL;
694        node = tower_next (&old->log_to_phy, node))
695     {
696       unsigned long int size = tower_node_get_height (node);
697       struct axis_group *group = tower_data (node, struct axis_group, logical);
698       tower_insert (&new->log_to_phy, size, make_axis_group (group->phy_start),
699                     NULL);
700     }
701
702   return new;
703 }
704
705 /* Adds the state of AXIS to the MD4 hash context CTX.
706
707    This is only used by the datasheet model checker driver.  It
708    is unlikely to be otherwise useful. */
709 static void
710 axis_hash (const struct axis *axis, struct md4_ctx *ctx)
711 {
712   const struct tower_node *tn;
713   const struct range_set_node *rsn;
714
715   for (tn = tower_first (&axis->log_to_phy); tn != NULL;
716        tn = tower_next (&axis->log_to_phy, tn))
717     {
718       struct axis_group *group = tower_data (tn, struct axis_group, logical);
719       unsigned long int phy_start = group->phy_start;
720       unsigned long int size = tower_node_get_height (tn);
721
722       md4_process_bytes (&phy_start, sizeof phy_start, ctx);
723       md4_process_bytes (&size, sizeof size, ctx);
724     }
725
726   for (rsn = range_set_first (axis->available); rsn != NULL;
727        rsn = range_set_next (axis->available, rsn))
728     {
729       unsigned long int start = range_set_node_get_start (rsn);
730       unsigned long int end = range_set_node_get_end (rsn);
731
732       md4_process_bytes (&start, sizeof start, ctx);
733       md4_process_bytes (&end, sizeof end, ctx);
734     }
735
736   md4_process_bytes (&axis->phy_size, sizeof axis->phy_size, ctx);
737 }
738
739 /* Destroys AXIS. */
740 static void
741 axis_destroy (struct axis *axis)
742 {
743   if (axis == NULL)
744     return;
745
746   while (!tower_is_empty (&axis->log_to_phy))
747     {
748       struct tower_node *node = tower_first (&axis->log_to_phy);
749       struct axis_group *group = tower_data (node, struct axis_group,
750                                              logical);
751       tower_delete (&axis->log_to_phy, node);
752       free (group);
753     }
754
755   range_set_destroy (axis->available);
756   free (axis);
757 }
758
759 /* Allocates up to REQUEST contiguous unused and available
760    ordinates from AXIS.  If successful, stores the number
761    obtained into *WIDTH and the ordinate of the first into
762    *START, marks the ordinates as now unavailable return true.
763    On failure, which occurs only if AXIS has no unused and
764    available ordinates, returns false without modifying AXIS. */
765 static bool
766 axis_allocate (struct axis *axis, unsigned long int request,
767                unsigned long int *start, unsigned long int *width)
768 {
769   return range_set_allocate (axis->available, request, start, width);
770 }
771
772 /* Marks the WIDTH contiguous ordinates in AXIS, starting from
773    START, as unused and available. */
774 static void
775 axis_make_available (struct axis *axis,
776                      unsigned long int start, unsigned long int width)
777 {
778   range_set_insert (axis->available, start, width);
779 }
780
781 /* Extends the total physical length of AXIS by WIDTH and returns
782    the first ordinate in the new physical region. */
783 static unsigned long int
784 axis_extend (struct axis *axis, unsigned long int width)
785 {
786   unsigned long int start = axis->phy_size;
787   axis->phy_size += width;
788   return start;
789 }
790
791 /* Returns the physical ordinate in AXIS corresponding to logical
792    ordinate LOG_POS.  LOG_POS must be less than the logical
793    length of AXIS. */
794 static unsigned long int
795 axis_map (const struct axis *axis, unsigned long log_pos)
796 {
797   struct tower_node *node;
798   struct axis_group *group;
799   unsigned long int group_start;
800
801   node = tower_lookup (&axis->log_to_phy, log_pos, &group_start);
802   group = tower_data (node, struct axis_group, logical);
803   return group->phy_start + (log_pos - group_start);
804 }
805
806 /* Returns the logical length of AXIS. */
807 static unsigned long
808 axis_get_size (const struct axis *axis)
809 {
810   return tower_height (&axis->log_to_phy);
811 }
812
813 /* Inserts the CNT contiguous physical ordinates starting at
814    PHY_START into AXIS's logical-to-physical mapping, starting at
815    logical position LOG_START. */
816 static void
817 axis_insert (struct axis *axis,
818              unsigned long int log_start, unsigned long int phy_start,
819              unsigned long int cnt)
820 {
821   struct tower_node *before = split_axis (axis, log_start);
822   struct tower_node *new = make_axis_group (phy_start);
823   tower_insert (&axis->log_to_phy, cnt, new, before);
824   merge_axis_nodes (axis, new, NULL);
825   check_axis_merged (axis);
826 }
827
828 /* Removes CNT ordinates from AXIS's logical-to-physical mapping
829    starting at logical position START. */
830 static void
831 axis_remove (struct axis *axis,
832              unsigned long int start, unsigned long int cnt)
833 {
834   if (cnt > 0)
835     {
836       struct tower_node *last = split_axis (axis, start + cnt);
837       struct tower_node *cur, *next;
838       for (cur = split_axis (axis, start); cur != last; cur = next)
839         {
840           next = tower_delete (&axis->log_to_phy, cur);
841           free (axis_group_from_tower_node (cur));
842         }
843       merge_axis_nodes (axis, last, NULL);
844       check_axis_merged (axis);
845     }
846 }
847
848 /* Moves the CNT ordinates in AXIS's logical-to-mapping starting
849    at logical position OLD_START so that they then start at
850    position NEW_START. */
851 static void
852 axis_move (struct axis *axis,
853            unsigned long int old_start, unsigned long int new_start,
854            unsigned long int cnt)
855 {
856   if (cnt > 0 && old_start != new_start)
857     {
858       struct tower_node *old_first, *old_last, *new_first;
859       struct tower_node *merge1, *merge2;
860       struct tower tmp_array;
861
862       /* Move ordinates OLD_START...(OLD_START + CNT) into new,
863          separate TMP_ARRAY. */
864       old_first = split_axis (axis, old_start);
865       old_last = split_axis (axis, old_start + cnt);
866       tower_init (&tmp_array);
867       tower_splice (&tmp_array, NULL,
868                     &axis->log_to_phy, old_first, old_last);
869       merge_axis_nodes (axis, old_last, NULL);
870       check_axis_merged (axis);
871
872       /* Move TMP_ARRAY to position NEW_START. */
873       new_first = split_axis (axis, new_start);
874       merge1 = tower_first (&tmp_array);
875       merge2 = tower_last (&tmp_array);
876       if (merge2 == merge1)
877         merge2 = NULL;
878       tower_splice (&axis->log_to_phy, new_first, &tmp_array, old_first, NULL);
879       merge_axis_nodes (axis, merge1, &merge2);
880       merge_axis_nodes (axis, merge2, NULL);
881       check_axis_merged (axis);
882     }
883 }
884
885 /* Returns the axis_group in which NODE is embedded. */
886 static struct axis_group *
887 axis_group_from_tower_node (struct tower_node *node)
888 {
889   return tower_data (node, struct axis_group, logical);
890 }
891
892 /* Creates and returns a new axis_group at physical position
893    PHY_START. */
894 static struct tower_node *
895 make_axis_group (unsigned long phy_start)
896 {
897   struct axis_group *group = xmalloc (sizeof *group);
898   group->phy_start = phy_start;
899   return &group->logical;
900 }
901
902 /* Returns the tower_node in AXIS's logical-to-physical map whose
903    bottom edge is at exact level WHERE.  If there is no such
904    tower_node in AXIS's logical-to-physical map, then split_axis
905    creates one by breaking an existing tower_node into two
906    separate ones, unless WHERE is equal to the tower height, in
907    which case it simply returns a null pointer. */
908 static struct tower_node *
909 split_axis (struct axis *axis, unsigned long int where)
910 {
911   unsigned long int group_start;
912   struct tower_node *group_node;
913   struct axis_group *group;
914
915   assert (where <= tower_height (&axis->log_to_phy));
916   if (where >= tower_height (&axis->log_to_phy))
917     return NULL;
918
919   group_node = tower_lookup (&axis->log_to_phy, where, &group_start);
920   group = axis_group_from_tower_node (group_node);
921   if (where > group_start)
922     {
923       unsigned long int size_1 = where - group_start;
924       unsigned long int size_2 = tower_node_get_height (group_node) - size_1;
925       struct tower_node *next = tower_next (&axis->log_to_phy, group_node);
926       struct tower_node *new = make_axis_group (group->phy_start + size_1);
927       tower_resize (&axis->log_to_phy, group_node, size_1);
928       tower_insert (&axis->log_to_phy, size_2, new, next);
929       return new;
930     }
931   else
932     return &group->logical;
933 }
934
935 /* Within AXIS, attempts to merge NODE (or the last node in AXIS,
936    if NODE is null) with its neighbor nodes.  This is possible
937    when logically adjacent nodes are also adjacent physically (in
938    the same order).
939
940    When a merge occurs, and OTHER_NODE is non-null and points to
941    the node to be deleted, this function also updates
942    *OTHER_NODE, if necessary, to ensure that it remains a valid
943    pointer. */
944 static void
945 merge_axis_nodes (struct axis *axis, struct tower_node *node,
946                   struct tower_node **other_node)
947 {
948   struct tower *t = &axis->log_to_phy;
949   struct axis_group *group;
950   struct tower_node *next, *prev;
951
952   /* Find node to potentially merge with neighbors. */
953   if (node == NULL)
954     node = tower_last (t);
955   if (node == NULL)
956     return;
957   group = axis_group_from_tower_node (node);
958
959   /* Try to merge NODE with successor. */
960   next = tower_next (t, node);
961   if (next != NULL)
962     {
963       struct axis_group *next_group = axis_group_from_tower_node (next);
964       unsigned long this_height = tower_node_get_height (node);
965
966       if (group->phy_start + this_height == next_group->phy_start)
967         {
968           unsigned long next_height = tower_node_get_height (next);
969           tower_resize (t, node, this_height + next_height);
970           if (other_node != NULL && *other_node == next)
971             *other_node = tower_next (t, *other_node);
972           tower_delete (t, next);
973           free (next_group);
974         }
975     }
976
977   /* Try to merge NODE with predecessor. */
978   prev = tower_prev (t, node);
979   if (prev != NULL)
980     {
981       struct axis_group *prev_group = axis_group_from_tower_node (prev);
982       unsigned long prev_height = tower_node_get_height (prev);
983
984       if (prev_group->phy_start + prev_height == group->phy_start)
985         {
986           unsigned long this_height = tower_node_get_height (node);
987           group->phy_start = prev_group->phy_start;
988           tower_resize (t, node, this_height + prev_height);
989           if (other_node != NULL && *other_node == prev)
990             *other_node = tower_next (t, *other_node);
991           tower_delete (t, prev);
992           free (prev_group);
993         }
994     }
995 }
996
997 /* Verify that all potentially merge-able nodes in AXIS are
998    actually merged. */
999 static void
1000 check_axis_merged (const struct axis *axis UNUSED)
1001 {
1002 #if ASSERT_LEVEL >= 10
1003   struct tower_node *prev, *node;
1004
1005   for (prev = NULL, node = tower_first (&axis->log_to_phy); node != NULL;
1006        prev = node, node = tower_next (&axis->log_to_phy, node))
1007     if (prev != NULL)
1008       {
1009         struct axis_group *prev_group = axis_group_from_tower_node (prev);
1010         unsigned long prev_height = tower_node_get_height (prev);
1011         struct axis_group *node_group = axis_group_from_tower_node (node);
1012         assert (prev_group->phy_start + prev_height != node_group->phy_start);
1013       }
1014 #endif
1015 }
1016 \f
1017 /* A source. */
1018 struct source
1019   {
1020     size_t columns_used;        /* Number of columns in use by client. */
1021     struct sparse_cases *data;  /* Data at top level, atop the backing. */
1022     struct casereader *backing; /* Backing casereader (or null). */
1023     casenumber backing_rows;    /* Number of rows in backing (if nonnull). */
1024   };
1025
1026 /* Creates and returns an empty, unbacked source with COLUMN_CNT
1027    columns and an initial "columns_used" of 0. */
1028 static struct source *
1029 source_create_empty (size_t column_cnt)
1030 {
1031   struct source *source = xmalloc (sizeof *source);
1032   source->columns_used = 0;
1033   source->data = sparse_cases_create (column_cnt);
1034   source->backing = NULL;
1035   source->backing_rows = 0;
1036   return source;
1037 }
1038
1039 /* Creates and returns a new source backed by READER and with the
1040    same initial dimensions and content. */
1041 static struct source *
1042 source_create_casereader (struct casereader *reader)
1043 {
1044   struct source *source
1045     = source_create_empty (casereader_get_value_cnt (reader));
1046   source->backing = reader;
1047   source->backing_rows = casereader_count_cases (reader);
1048   return source;
1049 }
1050
1051 /* Returns a clone of source OLD with the same data and backing
1052    (if any).
1053
1054    Currently this is used only by the datasheet model checker
1055    driver, but it could be otherwise useful. */
1056 static struct source *
1057 source_clone (const struct source *old)
1058 {
1059   struct source *new = xmalloc (sizeof *new);
1060   new->columns_used = old->columns_used;
1061   new->data = sparse_cases_clone (old->data);
1062   new->backing = old->backing != NULL ? casereader_clone (old->backing) : NULL;
1063   new->backing_rows = old->backing_rows;
1064   if (new->data == NULL)
1065     {
1066       source_destroy (new);
1067       new = NULL;
1068     }
1069   return new;
1070 }
1071
1072 /* Increases the columns_used count of SOURCE by DELTA.
1073    The new value must not exceed SOURCE's number of columns. */
1074 static void
1075 source_increase_use (struct source *source, size_t delta)
1076 {
1077   source->columns_used += delta;
1078   assert (source->columns_used <= sparse_cases_get_value_cnt (source->data));
1079 }
1080
1081 /* Decreases the columns_used count of SOURCE by DELTA.
1082    This must not attempt to decrease the columns_used count below
1083    zero. */
1084 static void
1085 source_decrease_use (struct source *source, size_t delta)
1086 {
1087   assert (delta <= source->columns_used);
1088   source->columns_used -= delta;
1089 }
1090
1091 /* Returns true if SOURCE has any columns in use,
1092    false otherwise. */
1093 static bool
1094 source_in_use (const struct source *source)
1095 {
1096   return source->columns_used > 0;
1097 }
1098
1099 /* Destroys SOURCE and its data and backing, if any. */
1100 static void
1101 source_destroy (struct source *source)
1102 {
1103   if (source != NULL)
1104     {
1105       sparse_cases_destroy (source->data);
1106       casereader_destroy (source->backing);
1107       free (source);
1108     }
1109 }
1110
1111 /* Returns the number of rows in SOURCE's backing casereader
1112    (SOURCE must have a backing casereader). */
1113 static casenumber
1114 source_get_backing_row_cnt (const struct source *source)
1115 {
1116   assert (source_has_backing (source));
1117   return source->backing_rows;
1118 }
1119
1120 /* Returns the number of columns in SOURCE. */
1121 static size_t
1122 source_get_column_cnt (const struct source *source)
1123 {
1124   return sparse_cases_get_value_cnt (source->data);
1125 }
1126
1127 /* Reads VALUE_CNT columns from SOURCE in the given ROW, starting
1128    from COLUMN, into VALUES.  Returns true if successful, false
1129    on I/O error. */
1130 static bool
1131 source_read (const struct source *source,
1132              casenumber row, size_t column,
1133              union value values[], size_t value_cnt)
1134 {
1135   if (source->backing == NULL || sparse_cases_contains_row (source->data, row))
1136     return sparse_cases_read (source->data, row, column, values, value_cnt);
1137   else
1138     {
1139       struct ccase c;
1140       bool ok;
1141
1142       assert (source->backing != NULL);
1143       ok = casereader_peek (source->backing, row, &c);
1144       if (ok)
1145         {
1146           case_copy_out (&c, column, values, value_cnt);
1147           case_destroy (&c);
1148         }
1149       return ok;
1150     }
1151 }
1152
1153 /* Writes the VALUE_CNT values in VALUES to SOURCE in the given
1154    ROW, starting at ROW.  Returns true if successful, false on
1155    I/O error.  On error, the row's data may be completely or
1156    partially corrupted, both inside and outside the region to be
1157    written.  */
1158 static bool
1159 source_write (struct source *source,
1160               casenumber row, size_t column,
1161               const union value values[], size_t value_cnt)
1162 {
1163   size_t column_cnt = sparse_cases_get_value_cnt (source->data);
1164   bool ok;
1165
1166   if (source->backing == NULL
1167       || (column == 0 && value_cnt == column_cnt)
1168       || sparse_cases_contains_row (source->data, row))
1169     ok = sparse_cases_write (source->data, row, column, values, value_cnt);
1170   else
1171     {
1172       struct ccase c;
1173       if (row < source->backing_rows)
1174         ok = casereader_peek (source->backing, row, &c);
1175       else
1176         {
1177           /* It's not one of the backed rows.  Ideally, this
1178              should never happen: we'd always be writing the full
1179              contents of new, unbacked rows in a single call to
1180              this function, so that the first case above would
1181              trigger.  But that's a little difficult at higher
1182              levels, so that we in fact usually write the full
1183              contents of new, unbacked rows in multiple calls to
1184              this function.  Make this work. */
1185           case_create (&c, column_cnt);
1186           ok = true;
1187         }
1188       if (ok)
1189         {
1190           case_copy_in (&c, column, values, value_cnt);
1191           ok = sparse_cases_write (source->data, row, 0,
1192                                    case_data_all (&c), column_cnt);
1193           case_destroy (&c);
1194         }
1195     }
1196   return ok;
1197 }
1198
1199 /* Within SOURCE, which must not have a backing casereader,
1200    writes the VALUE_CNT values in VALUES_CNT to the VALUE_CNT
1201    columns starting from START_COLUMN, in every row, even in rows
1202    not yet otherwise initialized.  Returns true if successful,
1203    false if an I/O error occurs.
1204
1205    We don't support backing != NULL because (1) it's harder and
1206    (2) source_write_columns is only called by
1207    datasheet_insert_columns, which doesn't reuse columns from
1208    sources that are backed by casereaders. */
1209 static bool
1210 source_write_columns (struct source *source, size_t start_column,
1211                       const union value values[], size_t value_cnt)
1212 {
1213   assert (source->backing == NULL);
1214
1215   return sparse_cases_write_columns (source->data, start_column,
1216                                      values, value_cnt);
1217 }
1218
1219 /* Returns true if SOURCE has a backing casereader, false
1220    otherwise. */
1221 static bool
1222 source_has_backing (const struct source *source)
1223 {
1224   return source->backing != NULL;
1225 }
1226 \f
1227 /* Datasheet model checker test driver. */
1228
1229 /* Maximum size of datasheet supported for model checking
1230    purposes. */
1231 #define MAX_ROWS 5
1232 #define MAX_COLS 5
1233
1234
1235 /* Clones the structure and contents of ODS into a new datasheet,
1236    and returns the new datasheet. */
1237 struct datasheet *
1238 clone_datasheet (const struct datasheet *ods)
1239 {
1240   struct datasheet *ds;
1241   struct range_map_node *r;
1242
1243   ds = xmalloc (sizeof *ds);
1244   ds->columns = axis_clone (ods->columns);
1245   ds->rows = axis_clone (ods->rows);
1246   range_map_init (&ds->sources);
1247   for (r = range_map_first (&ods->sources); r != NULL;
1248        r = range_map_next (&ods->sources, r))
1249     {
1250       const struct source_info *osi = source_info_from_range_map (r);
1251       struct source_info *si = xmalloc (sizeof *si);
1252       si->source = source_clone (osi->source);
1253       range_map_insert (&ds->sources, range_map_node_get_start (r),
1254                         range_map_node_get_width (r), &si->column_range);
1255     }
1256   ds->column_min_alloc = ods->column_min_alloc;
1257   ds->taint = taint_create ();
1258
1259   return ds;
1260 }
1261
1262
1263 /* Hashes the structure of datasheet DS and returns the hash.
1264    We use MD4 because it is much faster than MD5 or SHA-1 but its
1265    collision resistance is just as good. */
1266 unsigned int
1267 hash_datasheet (const struct datasheet *ds)
1268 {
1269   unsigned int hash[DIV_RND_UP (20, sizeof (unsigned int))];
1270   struct md4_ctx ctx;
1271   struct range_map_node *r;
1272
1273   md4_init_ctx (&ctx);
1274   axis_hash (ds->columns, &ctx);
1275   axis_hash (ds->rows, &ctx);
1276   for (r = range_map_first (&ds->sources); r != NULL;
1277        r = range_map_next (&ds->sources, r))
1278     {
1279       unsigned long int start = range_map_node_get_start (r);
1280       unsigned long int end = range_map_node_get_end (r);
1281       md4_process_bytes (&start, sizeof start, &ctx);
1282       md4_process_bytes (&end, sizeof end, &ctx);
1283     }
1284   md4_process_bytes (&ds->column_min_alloc, sizeof ds->column_min_alloc,
1285                       &ctx);
1286   md4_finish_ctx (&ctx, hash);
1287   return hash[0];
1288 }
1289
1290
1291