1 /* PSPP - a program for statistical analysis.
2 Copyright (C) 2007, 2009, 2010, 2011, 2012, 2013 Free Software Foundation, Inc.
4 This program is free software: you can redistribute it and/or modify
5 it under the terms of the GNU General Public License as published by
6 the Free Software Foundation, either version 3 of the License, or
7 (at your option) any later version.
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 GNU General Public License for more details.
14 You should have received a copy of the GNU General Public License
15 along with this program. If not, see <http://www.gnu.org/licenses/>. */
19 #include "data/datasheet.h"
24 #include "data/casereader-provider.h"
25 #include "data/casereader.h"
26 #include "data/casewriter.h"
27 #include "data/lazy-casereader.h"
28 #include "data/settings.h"
29 #include "libpspp/array.h"
30 #include "libpspp/assertion.h"
31 #include "libpspp/misc.h"
32 #include "libpspp/range-map.h"
33 #include "libpspp/range-set.h"
34 #include "libpspp/sparse-xarray.h"
35 #include "libpspp/taint.h"
36 #include "libpspp/tower.h"
38 #include "gl/minmax.h"
40 #include "gl/xalloc.h"
44 static struct axis *axis_create (void);
45 static struct axis *axis_clone (const struct axis *);
46 static void axis_destroy (struct axis *);
48 static void axis_hash (const struct axis *, struct md4_ctx *);
50 static bool axis_allocate (struct axis *, unsigned long int request,
51 unsigned long int *start,
52 unsigned long int *width);
53 static void axis_make_available (struct axis *,
54 unsigned long int start,
55 unsigned long int width);
56 static unsigned long int axis_extend (struct axis *, unsigned long int width);
58 static unsigned long int axis_map (const struct axis *, unsigned long log_pos);
60 static unsigned long axis_get_size (const struct axis *);
61 static void axis_insert (struct axis *,
62 unsigned long int log_start,
63 unsigned long int phy_start,
65 static void axis_remove (struct axis *,
66 unsigned long int start, unsigned long int n);
67 static void axis_move (struct axis *,
68 unsigned long int old_start,
69 unsigned long int new_start,
72 static struct source *source_create_empty (size_t n_bytes);
73 static struct source *source_create_casereader (struct casereader *);
74 static struct source *source_clone (const struct source *);
75 static void source_destroy (struct source *);
77 static casenumber source_get_backing_n_rows (const struct source *);
79 static int source_allocate_column (struct source *, int width);
80 static void source_release_column (struct source *, int ofs, int width);
81 static bool source_in_use (const struct source *);
83 static bool source_read (const struct column *, casenumber row, union value *,
85 static bool source_write (const struct column *, casenumber row,
86 const union value *, size_t n);
87 static bool source_write_column (struct column *, const union value *);
88 static bool source_has_backing (const struct source *);
90 static int get_source_index (const struct datasheet *ds, const struct source *source);
92 /* A datasheet is internally composed from a set of data files,
93 called "sources". The sources that make up a datasheet must
94 have the same number of rows (cases), but their numbers of
95 columns (variables) may vary.
97 A datasheet's external view is produced by mapping (permuting
98 and selecting) its internal data. Thus, we can rearrange or
99 delete rows or columns simply by modifying the mapping. We
100 add rows by adding rows to each source and to the row mapping.
101 We add columns by adding a new source, then adding that source
102 to the column mapping.
104 Each source in a datasheet can be a casereader or a
105 sparse_xarray. Casereaders are read-only, so when sources
106 made from casereaders need to be modified, it is done
107 "virtually" through being overlaid by a sparse_xarray. */
110 struct range_set *avail; /* Free bytes are set to 1s. */
111 struct sparse_xarray *data; /* Data at top level, atop the backing. */
112 struct casereader *backing; /* Backing casereader (or null). */
113 casenumber backing_rows; /* Number of rows in backing (if backed). */
114 size_t n_used; /* Number of column in use (if backed). */
117 /* A logical column. */
120 struct source *source; /* Source of the underlying physical column. */
121 int value_ofs; /* If 'source' has a backing casereader,
122 column's value offset in its cases. */
123 int byte_ofs; /* Byte offset in source's sparse_xarray. */
124 int width; /* 0=numeric, otherwise string width. */
131 struct source **sources; /* Sources, in no particular order. */
132 size_t n_sources; /* Number of sources. */
135 struct caseproto *proto; /* Prototype for rows (initialized lazily). */
136 struct column *columns; /* Logical to physical column mapping. */
137 size_t n_columns; /* Number of logical columns. */
138 unsigned column_min_alloc; /* Min. # of columns to put in a new source. */
141 struct axis *rows; /* Logical to physical row mapping. */
144 struct taint *taint; /* Indicates corrupted data. */
147 /* Is this operation a read or a write? */
154 static void allocate_column (struct datasheet *, int width, struct column *);
155 static void release_source (struct datasheet *, struct source *);
156 static bool rw_case (struct datasheet *ds, enum rw_op op,
157 casenumber lrow, size_t start_column, size_t n_columns,
160 /* Returns the number of bytes needed to store a value with the
161 given WIDTH on disk. */
163 width_to_n_bytes (int width)
165 return width == 0 ? sizeof (double) : width;
168 /* Returns the address of the data in VALUE (for reading or
169 writing to/from disk). VALUE must have the given WIDTH. */
171 value_to_data (const union value *value_, int width)
173 union value *value = (union value *) value_;
174 assert (sizeof value->f == sizeof (double));
181 /* Returns the number of bytes needed to store all the values in
184 caseproto_to_n_bytes (const struct caseproto *proto)
190 for (i = 0; i < caseproto_get_n_widths (proto); i++)
192 int width = caseproto_get_width (proto, i);
195 n_bytes += width_to_n_bytes (width);
200 /* Creates and returns a new datasheet.
202 If READER is nonnull, then the datasheet initially contains
203 the contents of READER. */
205 datasheet_create (struct casereader *reader)
207 struct datasheet *ds = xmalloc (sizeof *ds);
213 ds->column_min_alloc = 8;
214 ds->rows = axis_create ();
215 ds->taint = taint_create ();
223 taint_propagate (casereader_get_taint (reader), ds->taint);
225 ds->proto = caseproto_ref (casereader_get_proto (reader));
227 ds->sources = xmalloc (sizeof *ds->sources);
228 ds->sources[0] = source_create_casereader (reader);
231 ds->n_columns = caseproto_get_n_widths (ds->proto);
232 ds->columns = xnmalloc (ds->n_columns, sizeof *ds->columns);
234 for (i = 0; i < ds->n_columns; i++)
236 struct column *column = &ds->columns[i];
237 int width = caseproto_get_width (ds->proto, i);
238 column->source = ds->sources[0];
239 column->width = width;
243 column->value_ofs = i;
244 column->byte_ofs = byte_ofs;
245 byte_ofs += width_to_n_bytes (column->width);
249 n_rows = source_get_backing_n_rows (ds->sources[0]);
251 axis_insert (ds->rows, 0, axis_extend (ds->rows, n_rows), n_rows);
257 /* Destroys datasheet DS. */
259 datasheet_destroy (struct datasheet *ds)
266 for (i = 0; i < ds->n_sources; i++)
267 source_destroy (ds->sources[i]);
269 caseproto_unref (ds->proto);
271 axis_destroy (ds->rows);
272 taint_destroy (ds->taint);
276 /* Returns the prototype for the cases in DS. The caller must
277 not unref the returned prototype. */
278 const struct caseproto *
279 datasheet_get_proto (const struct datasheet *ds_)
281 struct datasheet *ds = CONST_CAST (struct datasheet *, ds_);
282 if (ds->proto == NULL)
286 ds->proto = caseproto_create ();
287 for (i = 0; i < ds->n_columns; i++)
288 ds->proto = caseproto_add_width (ds->proto, ds->columns[i].width);
293 /* Returns the width of the given COLUMN within DS.
294 COLUMN must be less than the number of columns in DS. */
296 datasheet_get_column_width (const struct datasheet *ds, size_t column)
298 assert (column < datasheet_get_n_columns (ds));
299 return ds->columns[column].width;
302 /* Moves datasheet DS to a new location in memory, and returns
303 the new location. Afterward, the datasheet must not be
304 accessed at its former location.
306 This function is useful for ensuring that all references to a
307 datasheet have been dropped, especially in conjunction with
308 tools like Valgrind. */
310 datasheet_rename (struct datasheet *ds)
312 struct datasheet *new = xmemdup (ds, sizeof *ds);
317 /* Returns true if datasheet DS is tainted.
318 A datasheet is tainted by an I/O error or by taint
319 propagation to the datasheet. */
321 datasheet_error (const struct datasheet *ds)
323 return taint_is_tainted (ds->taint);
326 /* Marks datasheet DS tainted. */
328 datasheet_force_error (struct datasheet *ds)
330 taint_set_taint (ds->taint);
333 /* Returns datasheet DS's taint object. */
335 datasheet_get_taint (const struct datasheet *ds)
340 /* Returns the number of rows in DS. */
342 datasheet_get_n_rows (const struct datasheet *ds)
344 return axis_get_size (ds->rows);
347 /* Returns the number of columns in DS. */
349 datasheet_get_n_columns (const struct datasheet *ds)
351 return ds->n_columns;
354 /* Inserts a column of the given WIDTH into datasheet DS just
355 before column BEFORE. Initializes the contents of each row in
356 the inserted column to VALUE (which must have width WIDTH).
358 Returns true if successful, false on failure. In case of
359 failure, the datasheet is unchanged. */
361 datasheet_insert_column (struct datasheet *ds,
362 const union value *value, int width, size_t before)
366 assert (before <= ds->n_columns);
368 ds->columns = xnrealloc (ds->columns,
369 ds->n_columns + 1, sizeof *ds->columns);
370 insert_element (ds->columns, ds->n_columns, sizeof *ds->columns, before);
371 col = &ds->columns[before];
374 allocate_column (ds, width, col);
376 if (width >= 0 && !source_write_column (col, value))
378 datasheet_delete_columns (ds, before, 1);
379 taint_set_taint (ds->taint);
386 /* Deletes the N columns in DS starting from column START. */
388 datasheet_delete_columns (struct datasheet *ds, size_t start, size_t n)
390 assert (start + n <= ds->n_columns);
396 for (i = start; i < start + n; i++)
398 struct column *column = &ds->columns[i];
399 struct source *source = column->source;
400 source_release_column (source, column->byte_ofs, column->width);
401 release_source (ds, source);
404 remove_range (ds->columns, ds->n_columns, sizeof *ds->columns, start, n);
407 caseproto_unref (ds->proto);
412 /* Moves the N columns in DS starting at position OLD_START so
413 that they then start at position NEW_START. Equivalent to
414 deleting the column rows, then inserting them at what becomes
415 position NEW_START after the deletion. */
417 datasheet_move_columns (struct datasheet *ds,
418 size_t old_start, size_t new_start,
421 assert (old_start + n <= ds->n_columns);
422 assert (new_start + n <= ds->n_columns);
424 move_range (ds->columns, ds->n_columns, sizeof *ds->columns,
425 old_start, new_start, n);
427 caseproto_unref (ds->proto);
431 struct resize_datasheet_value_aux
433 union value src_value;
437 void (*resize_cb) (const union value *, union value *, const void *aux);
438 const void *resize_cb_aux;
440 union value dst_value;
446 resize_datasheet_value (const void *src, void *dst, void *aux_)
448 struct resize_datasheet_value_aux *aux = aux_;
450 memcpy (value_to_data (&aux->src_value, aux->src_width),
451 (uint8_t *) src + aux->src_ofs,
452 width_to_n_bytes (aux->src_width));
454 aux->resize_cb (&aux->src_value, &aux->dst_value, aux->resize_cb_aux);
456 memcpy ((uint8_t *) dst + aux->dst_ofs,
457 value_to_data (&aux->dst_value, aux->dst_width),
458 width_to_n_bytes (aux->dst_width));
464 datasheet_resize_column (struct datasheet *ds, size_t column, int new_width,
465 void (*resize_cb) (const union value *,
466 union value *, const void *aux),
467 const void *resize_cb_aux)
469 struct column old_col;
473 assert (column < datasheet_get_n_columns (ds));
475 col = &ds->columns[column];
477 old_width = old_col.width;
478 assert (old_width >= 0);
479 assert (new_width >= 0);
485 datasheet_delete_columns (ds, column, 1);
486 datasheet_insert_column (ds, NULL, -1, column);
489 else if (old_width == -1)
492 value_init (&value, new_width);
493 value_set_missing (&value, new_width);
494 if (resize_cb != NULL)
495 resize_cb (NULL, &value, resize_cb_aux);
496 datasheet_delete_columns (ds, column, 1);
497 datasheet_insert_column (ds, &value, new_width, column);
498 value_destroy (&value, new_width);
500 else if (source_has_backing (col->source))
502 unsigned long int n_rows = axis_get_size (ds->rows);
503 unsigned long int lrow;
504 union value src, dst;
506 source_release_column (col->source, col->byte_ofs, col->width);
507 allocate_column (ds, new_width, col);
509 value_init (&src, old_width);
510 value_init (&dst, new_width);
511 for (lrow = 0; lrow < n_rows; lrow++)
513 unsigned long int prow = axis_map (ds->rows, lrow);
514 if (!source_read (&old_col, prow, &src, 1))
516 /* FIXME: back out col changes. */
519 resize_cb (&src, &dst, resize_cb_aux);
520 if (!source_write (col, prow, &dst, 1))
522 /* FIXME: back out col changes. */
526 value_destroy (&src, old_width);
527 value_destroy (&dst, new_width);
531 release_source (ds, old_col.source);
535 struct resize_datasheet_value_aux aux;
537 source_release_column (col->source, col->byte_ofs, col->width);
538 allocate_column (ds, new_width, col);
540 value_init (&aux.src_value, old_col.width);
541 aux.src_ofs = old_col.byte_ofs;
542 aux.src_width = old_col.width;
543 aux.resize_cb = resize_cb;
544 aux.resize_cb_aux = resize_cb_aux;
545 value_init (&aux.dst_value, new_width);
546 aux.dst_ofs = col->byte_ofs;
547 aux.dst_width = new_width;
548 sparse_xarray_copy (old_col.source->data, col->source->data,
549 resize_datasheet_value, &aux);
550 value_destroy (&aux.src_value, old_width);
551 value_destroy (&aux.dst_value, new_width);
553 release_source (ds, old_col.source);
558 /* Retrieves and returns the contents of the given ROW in
559 datasheet DS. The caller owns the returned case and must
560 unref it when it is no longer needed. Returns a null pointer
563 datasheet_get_row (const struct datasheet *ds, casenumber row)
565 size_t n_columns = datasheet_get_n_columns (ds);
566 struct ccase *c = case_create (datasheet_get_proto (ds));
567 if (rw_case (CONST_CAST (struct datasheet *, ds), OP_READ,
568 row, 0, n_columns, case_data_all_rw (c)))
577 /* Stores the contents of case C, which is destroyed, into the
578 given ROW in DS. Returns true on success, false on I/O error.
579 On failure, the given ROW might be partially modified or
582 datasheet_put_row (struct datasheet *ds, casenumber row, struct ccase *c)
584 size_t n_columns = datasheet_get_n_columns (ds);
585 bool ok = rw_case (ds, OP_WRITE, row, 0, n_columns,
586 (union value *) case_data_all (c));
591 /* Stores the values of COLUMN in DS in the given ROW in DS into
592 VALUE. The caller must have already initialized VALUE as a
593 value of the appropriate width (as returned by
594 datasheet_get_column_width (DS, COLUMN)). Returns true if
595 successful, false on I/O error. */
597 datasheet_get_value (const struct datasheet *ds, casenumber row,
598 size_t column, union value *value)
601 return rw_case (CONST_CAST (struct datasheet *, ds), OP_READ,
602 row, column, 1, value);
605 /* Stores VALUE into DS in the given ROW and COLUMN. VALUE must
606 have the correct width for COLUMN (as returned by
607 datasheet_get_column_width (DS, COLUMN)). Returns true if
608 successful, false on I/O error. On failure, ROW might be
609 partially modified or corrupted. */
611 datasheet_put_value (struct datasheet *ds, casenumber row,
612 size_t column, const union value *value)
614 return rw_case (ds, OP_WRITE, row, column, 1, (union value *) value);
617 /* Inserts the N cases at C into datasheet DS just before row
618 BEFORE. Returns true if successful, false on I/O error. On
619 failure, datasheet DS is not modified.
621 Regardless of success, this function unrefs all of the cases
624 datasheet_insert_rows (struct datasheet *ds,
625 casenumber before, struct ccase *c[],
628 casenumber added = 0;
631 unsigned long first_phy;
632 unsigned long n_phys;
635 /* Allocate physical rows from the pool of available
637 if (!axis_allocate (ds->rows, n, &first_phy, &n_phys))
639 /* No rows were available. Extend the row axis to make
640 some new ones available. */
642 first_phy = axis_extend (ds->rows, n);
645 /* Insert the new rows into the row mapping. */
646 axis_insert (ds->rows, before, first_phy, n_phys);
648 /* Initialize the new rows. */
649 for (i = 0; i < n_phys; i++)
650 if (!datasheet_put_row (ds, before + i, c[i]))
654 datasheet_delete_rows (ds, before - added, n_phys + added);
667 /* Deletes the N rows in DS starting from row FIRST. */
669 datasheet_delete_rows (struct datasheet *ds,
670 casenumber first, casenumber n)
674 /* Free up rows for reuse.
676 for (lrow = first; lrow < first + n; lrow++)
677 axis_make_available (ds->rows, axis_map (ds->rows, lrow), 1);
679 /* Remove rows from logical-to-physical mapping. */
680 axis_remove (ds->rows, first, n);
683 /* Moves the N rows in DS starting at position OLD_START so
684 that they then start at position NEW_START. Equivalent to
685 deleting the given rows, then inserting them at what becomes
686 position NEW_START after the deletion. */
688 datasheet_move_rows (struct datasheet *ds,
689 size_t old_start, size_t new_start,
692 axis_move (ds->rows, old_start, new_start, n);
695 static const struct casereader_random_class datasheet_reader_class;
697 /* Creates and returns a casereader whose input cases are the
698 rows in datasheet DS. From the caller's perspective, DS is
699 effectively destroyed by this operation, such that the caller
700 must not reference it again. */
702 datasheet_make_reader (struct datasheet *ds)
704 struct casereader *reader;
705 ds = datasheet_rename (ds);
706 reader = casereader_create_random (datasheet_get_proto (ds),
707 datasheet_get_n_rows (ds),
708 &datasheet_reader_class, ds);
709 taint_propagate (datasheet_get_taint (ds), casereader_get_taint (reader));
713 /* "read" function for the datasheet random casereader. */
714 static struct ccase *
715 datasheet_reader_read (struct casereader *reader UNUSED, void *ds_,
718 struct datasheet *ds = ds_;
719 if (case_idx < datasheet_get_n_rows (ds))
721 struct ccase *c = datasheet_get_row (ds, case_idx);
723 taint_set_taint (ds->taint);
730 /* "destroy" function for the datasheet random casereader. */
732 datasheet_reader_destroy (struct casereader *reader UNUSED, void *ds_)
734 struct datasheet *ds = ds_;
735 datasheet_destroy (ds);
738 /* "advance" function for the datasheet random casereader. */
740 datasheet_reader_advance (struct casereader *reader UNUSED, void *ds_,
743 struct datasheet *ds = ds_;
744 datasheet_delete_rows (ds, 0, n_cases);
747 /* Random casereader class for a datasheet. */
748 static const struct casereader_random_class datasheet_reader_class =
750 datasheet_reader_read,
751 datasheet_reader_destroy,
752 datasheet_reader_advance,
756 allocate_column (struct datasheet *ds, int width, struct column *column)
758 caseproto_unref (ds->proto);
761 column->value_ofs = -1;
762 column->width = width;
769 n_bytes = width_to_n_bytes (width);
770 for (i = 0; i < ds->n_sources; i++)
772 column->source = ds->sources[i];
773 column->byte_ofs = source_allocate_column (column->source, n_bytes);
774 if (column->byte_ofs >= 0)
778 column->source = source_create_empty (MAX (n_bytes,
779 ds->column_min_alloc));
780 ds->sources = xnrealloc (ds->sources,
781 ds->n_sources + 1, sizeof *ds->sources);
782 ds->sources[ds->n_sources++] = column->source;
784 ds->column_min_alloc = MIN (65536, ds->column_min_alloc * 2);
786 column->byte_ofs = source_allocate_column (column->source, n_bytes);
787 assert (column->byte_ofs >= 0);
791 column->source = NULL;
792 column->byte_ofs = -1;
797 release_source (struct datasheet *ds, struct source *source)
799 if (source_has_backing (source) && !source_in_use (source))
801 /* Since only the first source to be added ever
802 has a backing, this source must have index
804 assert (source == ds->sources[0]);
805 ds->sources[0] = ds->sources[--ds->n_sources];
806 source_destroy (source);
810 /* Reads (if OP is OP_READ) or writes (if op is OP_WRITE) the
811 N_COLUMNS columns starting from column START_COLUMN in row
812 LROW to/from the N_COLUMNS values in DATA. */
814 rw_case (struct datasheet *ds, enum rw_op op,
815 casenumber lrow, size_t start_column, size_t n_columns,
818 struct column *columns = &ds->columns[start_column];
822 assert (lrow < datasheet_get_n_rows (ds));
823 assert (n_columns <= datasheet_get_n_columns (ds));
824 assert (start_column + n_columns <= datasheet_get_n_columns (ds));
826 prow = axis_map (ds->rows, lrow);
827 for (i = 0; i < n_columns;)
829 struct source *source = columns[i].source;
833 if (columns[i].width < 0)
839 for (j = i + 1; j < n_columns; j++)
840 if (columns[j].width < 0 || columns[j].source != source)
844 ok = source_read (&columns[i], prow, &data[i], j - i);
846 ok = source_write (&columns[i], prow, &data[i], j - i);
850 taint_set_taint (ds->taint);
861 An axis has two functions. First, it maintains a mapping from
862 logical (client-visible) to physical (storage) ordinates. The
863 axis_map and axis_get_size functions inspect this mapping, and
864 the axis_insert, axis_remove, and axis_move functions modify
865 it. Second, it tracks the set of ordinates that are unused
866 and available for reuse. The axis_allocate,
867 axis_make_available, and axis_extend functions affect the set
868 of available ordinates. */
871 struct tower log_to_phy; /* Map from logical to physical ordinates;
872 contains "struct axis_group"s. */
873 struct range_set *available; /* Set of unused, available ordinates. */
874 unsigned long int phy_size; /* Current physical length of axis. */
877 /* A mapping from logical to physical ordinates. */
880 struct tower_node logical; /* Range of logical ordinates. */
881 unsigned long phy_start; /* First corresponding physical ordinate. */
884 static struct axis_group *axis_group_from_tower_node (struct tower_node *);
885 static struct tower_node *make_axis_group (unsigned long int phy_start);
886 static struct tower_node *split_axis (struct axis *, unsigned long int where);
887 static void merge_axis_nodes (struct axis *, struct tower_node *,
888 struct tower_node **other_node);
889 static void check_axis_merged (const struct axis *axis UNUSED);
891 /* Creates and returns a new, initially empty axis. */
895 struct axis *axis = xmalloc (sizeof *axis);
896 tower_init (&axis->log_to_phy);
897 axis->available = range_set_create ();
902 /* Returns a clone of existing axis OLD.
904 Currently this is used only by the datasheet model checker
905 driver, but it could be otherwise useful. */
907 axis_clone (const struct axis *old)
909 const struct tower_node *node;
912 new = xmalloc (sizeof *new);
913 tower_init (&new->log_to_phy);
914 new->available = range_set_clone (old->available, NULL);
915 new->phy_size = old->phy_size;
917 for (node = tower_first (&old->log_to_phy); node != NULL;
918 node = tower_next (&old->log_to_phy, node))
920 unsigned long int size = tower_node_get_size (node);
921 struct axis_group *group = tower_data (node, struct axis_group, logical);
922 tower_insert (&new->log_to_phy, size, make_axis_group (group->phy_start),
929 /* Adds the state of AXIS to the MD4 hash context CTX.
931 This is only used by the datasheet model checker driver. It
932 is unlikely to be otherwise useful. */
934 axis_hash (const struct axis *axis, struct md4_ctx *ctx)
936 const struct tower_node *tn;
937 const struct range_set_node *rsn;
939 for (tn = tower_first (&axis->log_to_phy); tn != NULL;
940 tn = tower_next (&axis->log_to_phy, tn))
942 struct axis_group *group = tower_data (tn, struct axis_group, logical);
943 unsigned long int phy_start = group->phy_start;
944 unsigned long int size = tower_node_get_size (tn);
946 md4_process_bytes (&phy_start, sizeof phy_start, ctx);
947 md4_process_bytes (&size, sizeof size, ctx);
950 RANGE_SET_FOR_EACH (rsn, axis->available)
952 unsigned long int start = range_set_node_get_start (rsn);
953 unsigned long int end = range_set_node_get_end (rsn);
955 md4_process_bytes (&start, sizeof start, ctx);
956 md4_process_bytes (&end, sizeof end, ctx);
959 md4_process_bytes (&axis->phy_size, sizeof axis->phy_size, ctx);
964 axis_destroy (struct axis *axis)
969 while (!tower_is_empty (&axis->log_to_phy))
971 struct tower_node *node = tower_first (&axis->log_to_phy);
972 struct axis_group *group = tower_data (node, struct axis_group,
974 tower_delete (&axis->log_to_phy, node);
978 range_set_destroy (axis->available);
982 /* Allocates up to REQUEST contiguous unused and available
983 ordinates from AXIS. If successful, stores the number
984 obtained into *WIDTH and the ordinate of the first into
985 *START, marks the ordinates as now unavailable return true.
986 On failure, which occurs only if AXIS has no unused and
987 available ordinates, returns false without modifying AXIS. */
989 axis_allocate (struct axis *axis, unsigned long int request,
990 unsigned long int *start, unsigned long int *width)
992 return range_set_allocate (axis->available, request, start, width);
995 /* Marks the WIDTH contiguous ordinates in AXIS, starting from
996 START, as unused and available. */
998 axis_make_available (struct axis *axis,
999 unsigned long int start, unsigned long int width)
1001 range_set_set1 (axis->available, start, width);
1004 /* Extends the total physical length of AXIS by WIDTH and returns
1005 the first ordinate in the new physical region. */
1006 static unsigned long int
1007 axis_extend (struct axis *axis, unsigned long int width)
1009 unsigned long int start = axis->phy_size;
1010 axis->phy_size += width;
1014 /* Returns the physical ordinate in AXIS corresponding to logical
1015 ordinate LOG_POS. LOG_POS must be less than the logical
1017 static unsigned long int
1018 axis_map (const struct axis *axis, unsigned long log_pos)
1020 struct tower_node *node;
1021 struct axis_group *group;
1022 unsigned long int group_start;
1024 node = tower_lookup (&axis->log_to_phy, log_pos, &group_start);
1025 group = tower_data (node, struct axis_group, logical);
1026 return group->phy_start + (log_pos - group_start);
1029 /* Returns the logical length of AXIS. */
1030 static unsigned long
1031 axis_get_size (const struct axis *axis)
1033 return tower_height (&axis->log_to_phy);
1036 /* Inserts the N contiguous physical ordinates starting at
1037 PHY_START into AXIS's logical-to-physical mapping, starting at
1038 logical position LOG_START. */
1040 axis_insert (struct axis *axis,
1041 unsigned long int log_start, unsigned long int phy_start,
1042 unsigned long int n)
1044 struct tower_node *before = split_axis (axis, log_start);
1045 struct tower_node *new = make_axis_group (phy_start);
1046 tower_insert (&axis->log_to_phy, n, new, before);
1047 merge_axis_nodes (axis, new, NULL);
1048 check_axis_merged (axis);
1051 /* Removes N ordinates from AXIS's logical-to-physical mapping
1052 starting at logical position START. */
1054 axis_remove (struct axis *axis,
1055 unsigned long int start, unsigned long int n)
1059 struct tower_node *last = split_axis (axis, start + n);
1060 struct tower_node *cur, *next;
1061 for (cur = split_axis (axis, start); cur != last; cur = next)
1063 next = tower_delete (&axis->log_to_phy, cur);
1064 free (axis_group_from_tower_node (cur));
1066 merge_axis_nodes (axis, last, NULL);
1067 check_axis_merged (axis);
1071 /* Moves the N ordinates in AXIS's logical-to-mapping starting
1072 at logical position OLD_START so that they then start at
1073 position NEW_START. */
1075 axis_move (struct axis *axis,
1076 unsigned long int old_start, unsigned long int new_start,
1077 unsigned long int n)
1079 if (n > 0 && old_start != new_start)
1081 struct tower_node *old_first, *old_last, *new_first;
1082 struct tower_node *merge1, *merge2;
1083 struct tower tmp_array;
1085 /* Move ordinates OLD_START...(OLD_START + N) into new,
1086 separate TMP_ARRAY. */
1087 old_first = split_axis (axis, old_start);
1088 old_last = split_axis (axis, old_start + n);
1089 tower_init (&tmp_array);
1090 tower_splice (&tmp_array, NULL,
1091 &axis->log_to_phy, old_first, old_last);
1092 merge_axis_nodes (axis, old_last, NULL);
1093 check_axis_merged (axis);
1095 /* Move TMP_ARRAY to position NEW_START. */
1096 new_first = split_axis (axis, new_start);
1097 merge1 = tower_first (&tmp_array);
1098 merge2 = tower_last (&tmp_array);
1099 if (merge2 == merge1)
1101 tower_splice (&axis->log_to_phy, new_first, &tmp_array, old_first, NULL);
1102 merge_axis_nodes (axis, merge1, &merge2);
1103 merge_axis_nodes (axis, merge2, NULL);
1104 check_axis_merged (axis);
1108 /* Returns the axis_group in which NODE is embedded. */
1109 static struct axis_group *
1110 axis_group_from_tower_node (struct tower_node *node)
1112 return tower_data (node, struct axis_group, logical);
1115 /* Creates and returns a new axis_group at physical position
1117 static struct tower_node *
1118 make_axis_group (unsigned long phy_start)
1120 struct axis_group *group = xmalloc (sizeof *group);
1121 group->phy_start = phy_start;
1122 return &group->logical;
1125 /* Returns the tower_node in AXIS's logical-to-physical map whose
1126 bottom edge is at exact level WHERE. If there is no such
1127 tower_node in AXIS's logical-to-physical map, then split_axis
1128 creates one by breaking an existing tower_node into two
1129 separate ones, unless WHERE is equal to the tower height, in
1130 which case it simply returns a null pointer. */
1131 static struct tower_node *
1132 split_axis (struct axis *axis, unsigned long int where)
1134 unsigned long int group_start;
1135 struct tower_node *group_node;
1136 struct axis_group *group;
1138 assert (where <= tower_height (&axis->log_to_phy));
1139 if (where >= tower_height (&axis->log_to_phy))
1142 group_node = tower_lookup (&axis->log_to_phy, where, &group_start);
1143 group = axis_group_from_tower_node (group_node);
1144 if (where > group_start)
1146 unsigned long int size_1 = where - group_start;
1147 unsigned long int size_2 = tower_node_get_size (group_node) - size_1;
1148 struct tower_node *next = tower_next (&axis->log_to_phy, group_node);
1149 struct tower_node *new = make_axis_group (group->phy_start + size_1);
1150 tower_resize (&axis->log_to_phy, group_node, size_1);
1151 tower_insert (&axis->log_to_phy, size_2, new, next);
1155 return &group->logical;
1158 /* Within AXIS, attempts to merge NODE (or the last node in AXIS,
1159 if NODE is null) with its neighbor nodes. This is possible
1160 when logically adjacent nodes are also adjacent physically (in
1163 When a merge occurs, and OTHER_NODE is non-null and points to
1164 the node to be deleted, this function also updates
1165 *OTHER_NODE, if necessary, to ensure that it remains a valid
1168 merge_axis_nodes (struct axis *axis, struct tower_node *node,
1169 struct tower_node **other_node)
1171 struct tower *t = &axis->log_to_phy;
1172 struct axis_group *group;
1173 struct tower_node *next, *prev;
1175 /* Find node to potentially merge with neighbors. */
1177 node = tower_last (t);
1180 group = axis_group_from_tower_node (node);
1182 /* Try to merge NODE with successor. */
1183 next = tower_next (t, node);
1186 struct axis_group *next_group = axis_group_from_tower_node (next);
1187 unsigned long this_height = tower_node_get_size (node);
1189 if (group->phy_start + this_height == next_group->phy_start)
1191 unsigned long next_height = tower_node_get_size (next);
1192 tower_resize (t, node, this_height + next_height);
1193 if (other_node != NULL && *other_node == next)
1194 *other_node = tower_next (t, *other_node);
1195 tower_delete (t, next);
1200 /* Try to merge NODE with predecessor. */
1201 prev = tower_prev (t, node);
1204 struct axis_group *prev_group = axis_group_from_tower_node (prev);
1205 unsigned long prev_height = tower_node_get_size (prev);
1207 if (prev_group->phy_start + prev_height == group->phy_start)
1209 unsigned long this_height = tower_node_get_size (node);
1210 group->phy_start = prev_group->phy_start;
1211 tower_resize (t, node, this_height + prev_height);
1212 if (other_node != NULL && *other_node == prev)
1213 *other_node = tower_next (t, *other_node);
1214 tower_delete (t, prev);
1220 /* Verify that all potentially merge-able nodes in AXIS are
1223 check_axis_merged (const struct axis *axis UNUSED)
1225 #if ASSERT_LEVEL >= 10
1226 struct tower_node *prev, *node;
1228 for (prev = NULL, node = tower_first (&axis->log_to_phy); node != NULL;
1229 prev = node, node = tower_next (&axis->log_to_phy, node))
1232 struct axis_group *prev_group = axis_group_from_tower_node (prev);
1233 unsigned long prev_height = tower_node_get_size (prev);
1234 struct axis_group *node_group = axis_group_from_tower_node (node);
1235 assert (prev_group->phy_start + prev_height != node_group->phy_start);
1242 /* Creates and returns an empty, unbacked source with N_BYTES
1243 bytes per case, none of which are initially in use. */
1244 static struct source *
1245 source_create_empty (size_t n_bytes)
1247 struct source *source = xmalloc (sizeof *source);
1248 size_t row_size = n_bytes + 4 * sizeof (void *);
1249 size_t max_memory_rows = settings_get_workspace () / row_size;
1250 source->avail = range_set_create ();
1251 range_set_set1 (source->avail, 0, n_bytes);
1252 source->data = sparse_xarray_create (n_bytes, MAX (max_memory_rows, 4));
1253 source->backing = NULL;
1254 source->backing_rows = 0;
1259 /* Creates and returns a new source backed by READER and with the
1260 same initial dimensions and content. */
1261 static struct source *
1262 source_create_casereader (struct casereader *reader)
1264 const struct caseproto *proto = casereader_get_proto (reader);
1265 size_t n_bytes = caseproto_to_n_bytes (proto);
1266 struct source *source = source_create_empty (n_bytes);
1270 range_set_set0 (source->avail, 0, n_bytes);
1271 source->backing = reader;
1272 source->backing_rows = casereader_count_cases (reader);
1275 n_columns = caseproto_get_n_widths (proto);
1276 for (i = 0; i < n_columns; i++)
1277 if (caseproto_get_width (proto, i) >= 0)
1283 /* Returns a clone of source OLD with the same data and backing
1286 Currently this is used only by the datasheet model checker
1287 driver, but it could be otherwise useful. */
1288 static struct source *
1289 source_clone (const struct source *old)
1291 struct source *new = xmalloc (sizeof *new);
1292 new->avail = range_set_clone (old->avail, NULL);
1293 new->data = sparse_xarray_clone (old->data);
1294 new->backing = old->backing != NULL ? casereader_clone (old->backing) : NULL;
1295 new->backing_rows = old->backing_rows;
1296 new->n_used = old->n_used;
1297 if (new->data == NULL)
1299 source_destroy (new);
1306 source_allocate_column (struct source *source, int width)
1308 unsigned long int start;
1311 assert (width >= 0);
1312 n_bytes = width_to_n_bytes (width);
1313 if (source->backing == NULL
1314 && range_set_allocate_fully (source->avail, n_bytes, &start))
1321 source_release_column (struct source *source, int ofs, int width)
1323 assert (width >= 0);
1324 range_set_set1 (source->avail, ofs, width_to_n_bytes (width));
1325 if (source->backing != NULL)
1329 /* Returns true if SOURCE has any columns in use,
1332 source_in_use (const struct source *source)
1334 return source->n_used > 0;
1337 /* Destroys SOURCE and its data and backing, if any. */
1339 source_destroy (struct source *source)
1343 range_set_destroy (source->avail);
1344 sparse_xarray_destroy (source->data);
1345 casereader_destroy (source->backing);
1350 /* Returns the number of rows in SOURCE's backing casereader
1351 (SOURCE must have a backing casereader). */
1353 source_get_backing_n_rows (const struct source *source)
1355 assert (source_has_backing (source));
1356 return source->backing_rows;
1359 /* Reads the N COLUMNS in the given ROW, into the N VALUES. Returns true if
1360 successful, false on I/O error.
1362 All of the COLUMNS must have the same source.
1364 The caller must have initialized VALUES with the proper width. */
1366 source_read (const struct column columns[], casenumber row,
1367 union value values[], size_t n)
1369 struct source *source = columns[0].source;
1372 if (source->backing == NULL
1373 || sparse_xarray_contains_row (source->data, row))
1377 for (i = 0; i < n && ok; i++)
1378 ok = sparse_xarray_read (source->data, row, columns[i].byte_ofs,
1379 width_to_n_bytes (columns[i].width),
1380 value_to_data (&values[i], columns[i].width));
1385 struct ccase *c = casereader_peek (source->backing, row);
1386 bool ok = c != NULL;
1389 for (i = 0; i < n; i++)
1390 value_copy (&values[i], case_data_idx (c, columns[i].value_ofs),
1399 copy_case_into_source (struct source *source, struct ccase *c, casenumber row)
1401 const struct caseproto *proto = casereader_get_proto (source->backing);
1402 size_t n_widths = caseproto_get_n_widths (proto);
1407 for (i = 0; i < n_widths; i++)
1409 int width = caseproto_get_width (proto, i);
1410 assert (width >= 0);
1413 int n_bytes = width_to_n_bytes (width);
1414 if (!sparse_xarray_write (source->data, row, ofs, n_bytes,
1415 value_to_data (case_data_idx (c, i),
1424 /* Writes the N VALUES to their source in the given ROW and COLUMNS. Returns
1425 true if successful, false on I/O error. On error, the row's data may be
1426 completely or partially corrupted, both inside and outside the region to be
1429 All of the COLUMNS must have the same source. */
1431 source_write (const struct column columns[], casenumber row,
1432 const union value values[], size_t n)
1434 struct source *source = columns[0].source;
1435 struct casereader *backing = source->backing;
1439 && !sparse_xarray_contains_row (source->data, row)
1440 && row < source->backing_rows)
1445 c = casereader_peek (backing, row);
1449 ok = copy_case_into_source (source, c, row);
1455 for (i = 0; i < n; i++)
1456 if (!sparse_xarray_write (source->data, row, columns[i].byte_ofs,
1457 width_to_n_bytes (columns[i].width),
1458 value_to_data (&values[i], columns[i].width)))
1464 source_write_column (struct column *column, const union value *value)
1466 int width = column->width;
1468 assert (column->source->backing == NULL);
1469 assert (width >= 0);
1471 return sparse_xarray_write_columns (column->source->data, column->byte_ofs,
1472 width_to_n_bytes (width),
1473 value_to_data (value, width));
1476 /* Returns true if SOURCE has a backing casereader, false
1479 source_has_backing (const struct source *source)
1481 return source->backing != NULL;
1484 /* Datasheet model checker test driver. */
1487 get_source_index (const struct datasheet *ds, const struct source *source)
1491 for (i = 0; i < ds->n_sources; i++)
1492 if (ds->sources[i] == source)
1497 /* Clones the structure and contents of ODS into a new datasheet,
1498 and returns the new datasheet. */
1500 clone_datasheet (const struct datasheet *ods)
1502 struct datasheet *ds;
1505 ds = xmalloc (sizeof *ds);
1507 ds->sources = xmalloc (ods->n_sources * sizeof *ds->sources);
1508 for (i = 0; i < ods->n_sources; i++)
1509 ds->sources[i] = source_clone (ods->sources[i]);
1510 ds->n_sources = ods->n_sources;
1512 ds->proto = ods->proto != NULL ? caseproto_ref (ods->proto) : NULL;
1513 ds->columns = xmemdup (ods->columns, ods->n_columns * sizeof *ods->columns);
1514 for (i = 0; i < ods->n_columns; i++)
1515 ds->columns[i].source
1516 = ds->sources[get_source_index (ods, ods->columns[i].source)];
1517 ds->n_columns = ods->n_columns;
1518 ds->column_min_alloc = ods->column_min_alloc;
1520 ds->rows = axis_clone (ods->rows);
1522 ds->taint = taint_create ();
1527 /* Hashes the structure of datasheet DS and returns the hash.
1528 We use MD4 because it is much faster than MD5 or SHA-1 but its
1529 collision resistance is just as good. */
1531 hash_datasheet (const struct datasheet *ds)
1533 unsigned int hash[DIV_RND_UP (20, sizeof (unsigned int))];
1537 md4_init_ctx (&ctx);
1538 for (i = 0; i < ds->n_columns; i++)
1540 const struct column *column = &ds->columns[i];
1541 int source_n_bytes = sparse_xarray_get_n_columns (column->source->data);
1542 md4_process_bytes (&source_n_bytes, sizeof source_n_bytes, &ctx);
1543 /*md4_process_bytes (&column->byte_ofs, sizeof column->byte_ofs, &ctx);*/
1544 md4_process_bytes (&column->value_ofs, sizeof column->value_ofs, &ctx);
1545 md4_process_bytes (&column->width, sizeof column->width, &ctx);
1547 axis_hash (ds->rows, &ctx);
1548 md4_process_bytes (&ds->column_min_alloc, sizeof ds->column_min_alloc, &ctx);
1549 md4_finish_ctx (&ctx, hash);