1 /* PSPP - a program for statistical analysis.
2 Copyright (C) 2007, 2009, 2010, 2011, 2012 Free Software Foundation, Inc.
4 This program is free software: you can redistribute it and/or modify
5 it under the terms of the GNU General Public License as published by
6 the Free Software Foundation, either version 3 of the License, or
7 (at your option) any later version.
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 GNU General Public License for more details.
14 You should have received a copy of the GNU General Public License
15 along with this program. If not, see <http://www.gnu.org/licenses/>. */
19 #include "data/datasheet.h"
24 #include "data/casereader-provider.h"
25 #include "data/casereader.h"
26 #include "data/casewriter.h"
27 #include "data/lazy-casereader.h"
28 #include "data/settings.h"
29 #include "libpspp/array.h"
30 #include "libpspp/assertion.h"
31 #include "libpspp/misc.h"
32 #include "libpspp/range-map.h"
33 #include "libpspp/range-set.h"
34 #include "libpspp/sparse-xarray.h"
35 #include "libpspp/taint.h"
36 #include "libpspp/tower.h"
38 #include "gl/minmax.h"
40 #include "gl/xalloc.h"
/* Forward declarations of the file-local axis helpers. */
44 static struct axis *axis_create (void);
45 static struct axis *axis_clone (const struct axis *);
46 static void axis_destroy (struct axis *);
48 static void axis_hash (const struct axis *, struct md4_ctx *);
50 static bool axis_allocate (struct axis *, unsigned long int request,
51 unsigned long int *start,
52 unsigned long int *width);
53 static void axis_make_available (struct axis *,
54 unsigned long int start,
55 unsigned long int width);
56 static unsigned long int axis_extend (struct axis *, unsigned long int width);
58 static unsigned long int axis_map (const struct axis *, unsigned long log_pos);
60 static unsigned long axis_get_size (const struct axis *);
61 static void axis_insert (struct axis *,
62 unsigned long int log_start,
63 unsigned long int phy_start,
64 unsigned long int cnt);
65 static void axis_remove (struct axis *,
66 unsigned long int start, unsigned long int cnt);
67 static void axis_move (struct axis *,
68 unsigned long int old_start,
69 unsigned long int new_start,
70 unsigned long int cnt);
/* Forward declarations of the file-local source helpers. */
72 static struct source *source_create_empty (size_t n_bytes);
73 static struct source *source_create_casereader (struct casereader *);
74 static struct source *source_clone (const struct source *);
75 static void source_destroy (struct source *);
77 static casenumber source_get_backing_n_rows (const struct source *);
79 static int source_allocate_column (struct source *, int width);
80 static void source_release_column (struct source *, int ofs, int width);
81 static bool source_in_use (const struct source *);
83 static bool source_read (const struct column *, casenumber row, union value *);
84 static bool source_write (const struct column *, casenumber row,
/* NOTE(review): the last parameter line of source_write is not visible
   in this listing; callers below pass a pointer to a union value. */
86 static bool source_write_column (struct column *, const union value *);
87 static bool source_has_backing (const struct source *);
89 static int get_source_index (const struct datasheet *ds, const struct source *source);
91 /* A datasheet is internally composed from a set of data files,
92 called "sources". The sources that make up a datasheet must
93 have the same number of rows (cases), but their numbers of
94 columns (variables) may vary.
96 A datasheet's external view is produced by mapping (permuting
97 and selecting) its internal data. Thus, we can rearrange or
98 delete rows or columns simply by modifying the mapping. We
99 add rows by adding rows to each source and to the row mapping.
100 We add columns by adding a new source, then adding that source
101 to the column mapping.
103 Each source in a datasheet can be a casereader or a
104 sparse_xarray. Casereaders are read-only, so when sources
105 made from casereaders need to be modified, it is done
106 "virtually" through being overlaid by a sparse_xarray. */
/* Members of a source (accessed below as source->avail, source->data,
   source->backing, and so on).
   NOTE(review): the enclosing structure header and closing brace are
   not visible in this listing. */
109 struct range_set *avail; /* Free bytes are set to 1s. */
110 struct sparse_xarray *data; /* Data at top level, atop the backing. */
111 struct casereader *backing; /* Backing casereader (or null). */
112 casenumber backing_rows; /* Number of rows in backing (if backed). */
113 size_t n_used; /* Number of column in use (if backed). */
116 /* A logical column. */
/* NOTE(review): the structure header and braces are not visible in this
   listing.  Besides the widths described on 'width' below,
   datasheet_resize_column passes a width of -1 to
   datasheet_insert_column; its exact meaning should be confirmed. */
119 struct source *source; /* Source of the underlying physical column. */
120 int value_ofs; /* If 'source' has a backing casereader,
121 column's value offset in its cases. */
122 int byte_ofs; /* Byte offset in source's sparse_xarray. */
123 int width; /* 0=numeric, otherwise string width. */
/* Members of a datasheet: its sources, its logical columns, its row
   axis, and its taint object.
   NOTE(review): the structure header, closing brace, and any section
   comments between the members are not visible in this listing. */
130 struct source **sources; /* Sources, in no particular order. */
131 size_t n_sources; /* Number of sources. */
134 struct caseproto *proto; /* Prototype for rows (initialized lazily). */
135 struct column *columns; /* Logical to physical column mapping. */
136 size_t n_columns; /* Number of logical columns. */
137 unsigned column_min_alloc; /* Min. # of columns to put in a new source. */
140 struct axis *rows; /* Logical to physical row mapping. */
143 struct taint *taint; /* Indicates corrupted data. */
146 /* Is this operation a read or a write? */
/* Forward declarations of datasheet-level helpers defined later in
   this file. */
153 static void allocate_column (struct datasheet *, int width, struct column *);
154 static void release_source (struct datasheet *, struct source *);
155 static bool rw_case (struct datasheet *ds, enum rw_op op,
156 casenumber lrow, size_t start_column, size_t n_columns,
/* NOTE(review): the final parameter line of rw_case is not visible in
   this listing. */
159 /* Returns the number of bytes needed to store a value with the
160 given WIDTH on disk. */
/* A WIDTH of 0 (numeric) takes sizeof (double) bytes; any other WIDTH
   takes WIDTH bytes. */
162 width_to_n_bytes (int width)
164 return width == 0 ? sizeof (double) : width;
167 /* Returns the address of the data in VALUE (for reading or
168 writing to/from disk). VALUE must have the given WIDTH. */
/* The const qualifier on VALUE_ is cast away so that the same helper
   serves both reads and writes.
   NOTE(review): the lines between the assertion and the final return
   are not visible in this listing; only the value_str_rw path is
   shown. */
170 value_to_data (const union value *value_, int width)
172 union value *value = (union value *) value_;
173 assert (sizeof value->f == sizeof (double));
177 return value_str_rw (value, width);
180 /* Returns the number of bytes needed to store all the values in
   PROTO, summing width_to_n_bytes over its widths.
   NOTE(review): a line between fetching WIDTH and adding to the total
   is not visible in this listing, so there may be a condition on WIDTH;
   the declarations and return are not visible either. */
183 caseproto_to_n_bytes (const struct caseproto *proto)
189 for (i = 0; i < caseproto_get_n_widths (proto); i++)
191 int width = caseproto_get_width (proto, i);
193 n_bytes += width_to_n_bytes (width);
198 /* Creates and returns a new datasheet.
200 If READER is nonnull, then the datasheet initially contains
201 the contents of READER. */
/* NOTE(review): the test on READER, the initialization of several
   members, and the return statement are not visible in this listing. */
203 datasheet_create (struct casereader *reader)
205 struct datasheet *ds = xmalloc (sizeof *ds);
211 ds->column_min_alloc = 8;
212 ds->rows = axis_create ();
213 ds->taint = taint_create ();
/* Taint on READER is propagated to the new datasheet. */
221 taint_propagate (casereader_get_taint (reader), ds->taint);
223 ds->proto = caseproto_ref (casereader_get_proto (reader));
/* READER becomes the backing of the first source, sources[0]. */
225 ds->sources = xmalloc (sizeof *ds->sources);
226 ds->sources[0] = source_create_casereader (reader);
/* One logical column per width in the prototype, laid out
   consecutively: value offsets count columns, byte offsets accumulate
   each column's on-disk size. */
229 ds->n_columns = caseproto_get_n_widths (ds->proto);
230 ds->columns = xnmalloc (ds->n_columns, sizeof *ds->columns);
232 for (i = 0; i < ds->n_columns; i++)
234 struct column *column = &ds->columns[i];
235 int width = caseproto_get_width (ds->proto, i);
236 column->source = ds->sources[0];
237 column->width = width;
240 column->value_ofs = i;
241 column->byte_ofs = byte_ofs;
242 byte_ofs += width_to_n_bytes (column->width);
/* Map logical rows 0...N_ROWS onto freshly extended physical rows. */
246 n_rows = source_get_backing_n_rows (ds->sources[0]);
248 axis_insert (ds->rows, 0, axis_extend (ds->rows, n_rows), n_rows);
254 /* Destroys datasheet DS. */
/* Destroys every source, drops the reference to the cached prototype,
   and destroys the row axis and the taint object.
   NOTE(review): any null check on DS and the freeing of the arrays and
   of DS itself are not visible in this listing. */
256 datasheet_destroy (struct datasheet *ds)
263 for (i = 0; i < ds->n_sources; i++)
264 source_destroy (ds->sources[i]);
266 caseproto_unref (ds->proto);
268 axis_destroy (ds->rows);
269 taint_destroy (ds->taint);
273 /* Returns the prototype for the cases in DS. The caller must
274 not unref the returned prototype. */
/* The prototype is built on demand: when ds->proto is null, a new one
   is created from the width of each logical column.  The const
   qualifier on DS_ is cast away so that the result can be cached.
   NOTE(review): the return statement is not visible in this listing. */
275 const struct caseproto *
276 datasheet_get_proto (const struct datasheet *ds_)
278 struct datasheet *ds = CONST_CAST (struct datasheet *, ds_);
279 if (ds->proto == NULL)
283 ds->proto = caseproto_create ();
284 for (i = 0; i < ds->n_columns; i++)
285 ds->proto = caseproto_add_width (ds->proto, ds->columns[i].width);
290 /* Returns the width of the given COLUMN within DS.
291 COLUMN must be less than the number of columns in DS. */
/* The bound on COLUMN is enforced with an assertion. */
293 datasheet_get_column_width (const struct datasheet *ds, size_t column)
295 assert (column < datasheet_get_n_columns (ds));
296 return ds->columns[column].width;
299 /* Moves datasheet DS to a new location in memory, and returns
300 the new location. Afterward, the datasheet must not be
301 accessed at its former location.
303 This function is useful for ensuring that all references to a
304 datasheet have been dropped, especially in conjunction with
305 tools like Valgrind. */
/* The new location is a byte-for-byte copy of *DS, so it shares the
   pointers held by the old structure.
   NOTE(review): the release of the old structure and the return
   statement are not visible in this listing. */
307 datasheet_rename (struct datasheet *ds)
309 struct datasheet *new = xmemdup (ds, sizeof *ds);
314 /* Returns true if datasheet DS is tainted.
315 A datasheet is tainted by an I/O error or by taint
316 propagation to the datasheet. */
/* Simply reports the state of DS's taint object. */
318 datasheet_error (const struct datasheet *ds)
320 return taint_is_tainted (ds->taint);
323 /* Marks datasheet DS tainted. */
/* After this call datasheet_error (DS) reports true, since both operate
   on ds->taint. */
325 datasheet_force_error (struct datasheet *ds)
327 taint_set_taint (ds->taint);
330 /* Returns datasheet DS's taint object. */
/* NOTE(review): the body of this function is not visible in this
   listing. */
332 datasheet_get_taint (const struct datasheet *ds)
337 /* Returns the number of rows in DS. */
/* This is the logical length of the row axis. */
339 datasheet_get_n_rows (const struct datasheet *ds)
341 return axis_get_size (ds->rows);
344 /* Returns the number of columns in DS. */
/* This is the number of logical columns, ds->n_columns. */
346 datasheet_get_n_columns (const struct datasheet *ds)
348 return ds->n_columns;
351 /* Inserts a column of the given WIDTH into datasheet DS just
352 before column BEFORE. Initializes the contents of each row in
353 the inserted column to VALUE (which must have width WIDTH).
355 Returns true if successful, false on failure. In case of
356 failure, the datasheet is unchanged. */
/* A negative WIDTH skips the initialization with VALUE.  If writing the
   initial values fails, the new column is deleted again and DS is
   tainted.
   NOTE(review): the update of ds->n_columns and the return statements
   are not visible in this listing. */
358 datasheet_insert_column (struct datasheet *ds,
359 const union value *value, int width, size_t before)
363 assert (before <= ds->n_columns);
/* Grow the column array by one and open a gap at BEFORE. */
365 ds->columns = xnrealloc (ds->columns,
366 ds->n_columns + 1, sizeof *ds->columns);
367 insert_element (ds->columns, ds->n_columns, sizeof *ds->columns, before);
368 col = &ds->columns[before];
371 allocate_column (ds, width, col);
373 if (width >= 0 && !source_write_column (col, value))
375 datasheet_delete_columns (ds, before, 1);
376 taint_set_taint (ds->taint);
383 /* Deletes the N columns in DS starting from column START. */
/* Each deleted column's bytes are returned to its source, and
   release_source is given the chance to drop a source that is no longer
   needed.  The cached prototype is unreferenced because the set of
   columns changed.
   NOTE(review): some lines (such as a condition around the loop, the
   update of ds->n_columns, and the reset of ds->proto) are not visible
   in this listing. */
385 datasheet_delete_columns (struct datasheet *ds, size_t start, size_t n)
387 assert (start + n <= ds->n_columns);
393 for (i = start; i < start + n; i++)
395 struct column *column = &ds->columns[i];
396 struct source *source = column->source;
397 source_release_column (source, column->byte_ofs, column->width);
398 release_source (ds, source);
401 remove_range (ds->columns, ds->n_columns, sizeof *ds->columns, start, n);
404 caseproto_unref (ds->proto);
409 /* Moves the N columns in DS starting at position OLD_START so
410 that they then start at position NEW_START. Equivalent to
411 deleting the given columns, then inserting them at what becomes
412 position NEW_START after the deletion. */
/* Only the logical column array is rearranged; the cached prototype is
   unreferenced because the column order changed.
   NOTE(review): the last parameter line and the reset of ds->proto are
   not visible in this listing. */
414 datasheet_move_columns (struct datasheet *ds,
415 size_t old_start, size_t new_start,
418 assert (old_start + n <= ds->n_columns);
419 assert (new_start + n <= ds->n_columns);
421 move_range (ds->columns, ds->n_columns, sizeof *ds->columns,
422 old_start, new_start, n);
424 caseproto_unref (ds->proto);
/* Auxiliary data passed to resize_datasheet_value.
   NOTE(review): only some members are visible in this listing; the
   code below also uses src_ofs, src_width, resize_cb_aux, dst_ofs and
   dst_width. */
428 struct resize_datasheet_value_aux
430 union value src_value;
434 void (*resize_cb) (const union value *, union value *, void *aux);
437 union value dst_value;
/* Callback passed to sparse_xarray_copy by datasheet_resize_column.
   Converts one row: copies the old value out of SRC at AUX's source
   offset, converts it with AUX's resize callback, and copies the result
   into DST at AUX's destination offset.
   NOTE(review): the return statement is not visible in this listing. */
443 resize_datasheet_value (const void *src, void *dst, void *aux_)
445 struct resize_datasheet_value_aux *aux = aux_;
447 memcpy (value_to_data (&aux->src_value, aux->src_width),
448 (uint8_t *) src + aux->src_ofs,
449 width_to_n_bytes (aux->src_width));
451 aux->resize_cb (&aux->src_value, &aux->dst_value, aux->resize_cb_aux);
453 memcpy ((uint8_t *) dst + aux->dst_ofs,
454 value_to_data (&aux->dst_value, aux->dst_width),
455 width_to_n_bytes (aux->dst_width));
/* Changes the width of COLUMN in DS to NEW_WIDTH.  Where existing
   values must be converted, RESIZE_CB is called with the old value,
   storage for the new value, and RESIZE_CB_AUX.
   NOTE(review): several lines of this function are not visible in this
   listing, including the last parameter, the initialization of OLD_COL
   (presumably a copy of the column before it is changed), the first
   branch condition, and the return statements. */
461 datasheet_resize_column (struct datasheet *ds, size_t column, int new_width,
462 void (*resize_cb) (const union value *,
463 union value *, void *aux),
466 struct column old_col;
470 assert (column < datasheet_get_n_columns (ds));
472 col = &ds->columns[column];
474 old_width = old_col.width;
/* First case (condition not visible here): replace the column with one
   inserted with a null value and width -1. */
480 datasheet_delete_columns (ds, column, 1);
481 datasheet_insert_column (ds, NULL, -1, column);
/* The old column had width -1: build a missing value of the new width,
   let RESIZE_CB (if any) adjust it, and replace the column with one
   initialized to that value. */
484 else if (old_width == -1)
487 value_init (&value, new_width);
488 value_set_missing (&value, new_width);
489 if (resize_cb != NULL)
490 resize_cb (NULL, &value, resize_cb_aux);
491 datasheet_delete_columns (ds, column, 1);
492 datasheet_insert_column (ds, &value, new_width, column);
493 value_destroy (&value, new_width);
/* The column's source has a backing casereader: convert row by row,
   reading through the old column description and writing through the
   newly allocated one. */
495 else if (source_has_backing (col->source))
497 unsigned long int n_rows = axis_get_size (ds->rows);
498 unsigned long int lrow;
499 union value src, dst;
501 source_release_column (col->source, col->byte_ofs, col->width);
502 allocate_column (ds, new_width, col);
504 value_init (&src, old_width);
505 value_init (&dst, new_width);
506 for (lrow = 0; lrow < n_rows; lrow++)
508 unsigned long int prow = axis_map (ds->rows, lrow);
509 if (!source_read (&old_col, prow, &src))
511 /* FIXME: back out col changes. */
514 resize_cb (&src, &dst, resize_cb_aux);
515 if (!source_write (col, prow, &dst))
517 /* FIXME: back out col changes. */
/* NOTE(review): no value_destroy calls for SRC and DST are visible in
   this branch; check for a leak against the upstream file. */
522 release_source (ds, old_col.source);
/* Remaining case: convert every row in one pass with
   sparse_xarray_copy, using resize_datasheet_value as the per-row
   callback. */
526 struct resize_datasheet_value_aux aux;
528 source_release_column (col->source, col->byte_ofs, col->width);
529 allocate_column (ds, new_width, col);
531 value_init (&aux.src_value, old_col.width);
532 aux.src_ofs = old_col.byte_ofs;
533 aux.src_width = old_col.width;
534 aux.resize_cb = resize_cb;
535 aux.resize_cb_aux = resize_cb_aux;
536 value_init (&aux.dst_value, new_width);
537 aux.dst_ofs = col->byte_ofs;
538 aux.dst_width = new_width;
539 sparse_xarray_copy (old_col.source->data, col->source->data,
540 resize_datasheet_value, &aux);
541 value_destroy (&aux.src_value, old_width);
542 value_destroy (&aux.dst_value, new_width);
544 release_source (ds, old_col.source);
549 /* Retrieves and returns the contents of the given ROW in
550 datasheet DS. The caller owns the returned case and must
551 unref it when it is no longer needed. Returns a null pointer
   under a condition that is cut off in this listing.
   NOTE(review): the code that follows the rw_case call, including both
   return paths, is not visible here. */
554 datasheet_get_row (const struct datasheet *ds, casenumber row)
556 size_t n_columns = datasheet_get_n_columns (ds);
557 struct ccase *c = case_create (datasheet_get_proto (ds));
/* Read every column of ROW into the values of the new case. */
558 if (rw_case (CONST_CAST (struct datasheet *, ds), OP_READ,
559 row, 0, n_columns, case_data_all_rw (c)))
568 /* Stores the contents of case C, which is destroyed, into the
569 given ROW in DS. Returns true on success, false on I/O error.
570 On failure, the given ROW might be partially modified or
   otherwise left in an unspecified state (the end of this comment is
   cut off in this listing).
   NOTE(review): the lines after the rw_case call, where C would be
   released and OK returned, are not visible here. */
573 datasheet_put_row (struct datasheet *ds, casenumber row, struct ccase *c)
575 size_t n_columns = datasheet_get_n_columns (ds);
/* The cast drops the const qualifier from the case's values; rw_case is
   called with OP_WRITE here. */
576 bool ok = rw_case (ds, OP_WRITE, row, 0, n_columns,
577 (union value *) case_data_all (c));
582 /* Stores the values of COLUMN in DS in the given ROW in DS into
583 VALUE. The caller must have already initialized VALUE as a
584 value of the appropriate width (as returned by
585 datasheet_get_column_width (DS, COLUMN)). Returns true if
586 successful, false on I/O error. */
/* Implemented as a one-column read through rw_case.
   NOTE(review): lines between the signature and the return are not
   visible in this listing. */
588 datasheet_get_value (const struct datasheet *ds, casenumber row,
589 size_t column, union value *value)
592 return rw_case (CONST_CAST (struct datasheet *, ds), OP_READ,
593 row, column, 1, value);
596 /* Stores VALUE into DS in the given ROW and COLUMN. VALUE must
597 have the correct width for COLUMN (as returned by
598 datasheet_get_column_width (DS, COLUMN)). Returns true if
599 successful, false on I/O error. On failure, ROW might be
600 partially modified or corrupted. */
/* Implemented as a one-column write through rw_case; the cast drops the
   const qualifier from VALUE.
   NOTE(review): all four parameters are marked UNUSED although each is
   passed to rw_case; the markers look stale. */
602 datasheet_put_value (struct datasheet *ds UNUSED, casenumber row UNUSED,
603 size_t column UNUSED, const union value *value UNUSED)
605 return rw_case (ds, OP_WRITE, row, column, 1, (union value *) value);
608 /* Inserts the CNT cases at C into datasheet DS just before row
609 BEFORE. Returns true if successful, false on I/O error. On
610 failure, datasheet DS is not modified.
612 Regardless of success, this function unrefs all of the cases
   passed in (the end of this comment is cut off in this listing). */
/* NOTE(review): several lines are not visible here, including the last
   parameter, the enclosing loop, the assignment of PHY_CNT when the
   axis is extended, the cleanup on failure, the per-iteration advance,
   and the return statements. */
615 datasheet_insert_rows (struct datasheet *ds,
616 casenumber before, struct ccase *c[],
619 casenumber added = 0;
622 unsigned long first_phy;
623 unsigned long phy_cnt;
626 /* Allocate physical rows from the pool of available
   rows. */
628 if (!axis_allocate (ds->rows, cnt, &first_phy, &phy_cnt))
630 /* No rows were available. Extend the row axis to make
631 some new ones available. */
633 first_phy = axis_extend (ds->rows, cnt);
636 /* Insert the new rows into the row mapping. */
637 axis_insert (ds->rows, before, first_phy, phy_cnt);
639 /* Initialize the new rows. */
640 for (i = 0; i < phy_cnt; i++)
641 if (!datasheet_put_row (ds, before + i, c[i]))
/* On failure, delete the rows from this batch together with the ADDED
   rows inserted before it. */
645 datasheet_delete_rows (ds, before - added, phy_cnt + added);
658 /* Deletes the CNT rows in DS starting from row FIRST. */
660 datasheet_delete_rows (struct datasheet *ds,
661 casenumber first, casenumber cnt)
665 /* Free up rows for reuse.  Each logical row is mapped to its physical
   row, which is marked available one at a time. */
667 for (lrow = first; lrow < first + cnt; lrow++)
668 axis_make_available (ds->rows, axis_map (ds->rows, lrow), 1);
670 /* Remove rows from logical-to-physical mapping. */
671 axis_remove (ds->rows, first, cnt);
674 /* Moves the CNT rows in DS starting at position OLD_START so
675 that they then start at position NEW_START. Equivalent to
676 deleting the given rows, then inserting them at what becomes
677 position NEW_START after the deletion. */
/* Only the row axis mapping is rearranged, through axis_move.
   NOTE(review): the last parameter line is not visible in this
   listing. */
679 datasheet_move_rows (struct datasheet *ds,
680 size_t old_start, size_t new_start,
683 axis_move (ds->rows, old_start, new_start, cnt);
/* Declared here so that datasheet_make_reader can refer to the class,
   which is defined after the reader callbacks. */
686 static const struct casereader_random_class datasheet_reader_class;
688 /* Creates and returns a casereader whose input cases are the
689 rows in datasheet DS. From the caller's perspective, DS is
690 effectively destroyed by this operation, such that the caller
691 must not reference it again. */
/* DS is first moved with datasheet_rename, so the caller's pointer is no
   longer the one in use.  The moved datasheet becomes the reader's
   auxiliary data, and the datasheet's taint is propagated to the
   reader.
   NOTE(review): the return statement is not visible in this listing. */
693 datasheet_make_reader (struct datasheet *ds)
695 struct casereader *reader;
696 ds = datasheet_rename (ds);
697 reader = casereader_create_random (datasheet_get_proto (ds),
698 datasheet_get_n_rows (ds),
699 &datasheet_reader_class, ds);
700 taint_propagate (datasheet_get_taint (ds), casereader_get_taint (reader));
704 /* "read" function for the datasheet random casereader. */
/* Fetches row CASE_IDX with datasheet_get_row when it is within the
   datasheet.
   NOTE(review): the last parameter line, the condition under which the
   datasheet is tainted, and the return statements are not visible in
   this listing. */
705 static struct ccase *
706 datasheet_reader_read (struct casereader *reader UNUSED, void *ds_,
709 struct datasheet *ds = ds_;
710 if (case_idx < datasheet_get_n_rows (ds))
712 struct ccase *c = datasheet_get_row (ds, case_idx);
714 taint_set_taint (ds->taint);
721 /* "destroy" function for the datasheet random casereader. */
/* Destroys the datasheet passed as the reader's auxiliary data. */
723 datasheet_reader_destroy (struct casereader *reader UNUSED, void *ds_)
725 struct datasheet *ds = ds_;
726 datasheet_destroy (ds);
729 /* "advance" function for the datasheet random casereader. */
/* Advancing deletes the first CASE_CNT rows of the datasheet.
   NOTE(review): the last parameter line is not visible in this
   listing. */
731 datasheet_reader_advance (struct casereader *reader UNUSED, void *ds_,
734 struct datasheet *ds = ds_;
735 datasheet_delete_rows (ds, 0, case_cnt);
738 /* Random casereader class for a datasheet. */
/* Lists the read, destroy, and advance callbacks defined above, in that
   order. */
739 static const struct casereader_random_class datasheet_reader_class =
741 datasheet_reader_read,
742 datasheet_reader_destroy,
743 datasheet_reader_advance,
/* Sets up COLUMN as a column of the given WIDTH in DS, taking space from
   an existing source when one has room and otherwise creating a new
   source.  The cached prototype is unreferenced because the set of
   columns is changing.
   NOTE(review): several lines are not visible in this listing,
   including the condition that selects between the allocation path and
   the final assignments (which leave COLUMN without a source), and the
   early return once an existing source provides space. */
747 allocate_column (struct datasheet *ds, int width, struct column *column)
749 caseproto_unref (ds->proto);
752 column->value_ofs = -1;
753 column->width = width;
/* Try each existing source in turn; a nonnegative byte offset means that
   the source had room. */
759 n_bytes = width_to_n_bytes (width);
760 for (i = 0; i < ds->n_sources; i++)
762 column->source = ds->sources[i];
763 column->byte_ofs = source_allocate_column (column->source, n_bytes);
764 if (column->byte_ofs >= 0)
/* Create a new source at least column_min_alloc bytes wide and append
   it to the source array. */
768 column->source = source_create_empty (MAX (n_bytes,
769 ds->column_min_alloc));
770 ds->sources = xnrealloc (ds->sources,
771 ds->n_sources + 1, sizeof *ds->sources);
772 ds->sources[ds->n_sources++] = column->source;
/* Double the minimum size for the next new source, capped at 65536. */
774 ds->column_min_alloc = MIN (65536, ds->column_min_alloc * 2);
776 column->byte_ofs = source_allocate_column (column->source, n_bytes);
777 assert (column->byte_ofs >= 0);
781 column->source = NULL;
782 column->byte_ofs = -1;
/* Destroys SOURCE and removes it from DS if it has a backing casereader
   and none of its columns is still in use; otherwise does nothing.  The
   freed slot 0 is refilled with the last source in the array. */
787 release_source (struct datasheet *ds, struct source *source)
789 if (source_has_backing (source) && !source_in_use (source))
791 /* Since only the first source to be added ever
792 has a backing, this source must have index
   0. */
794 assert (source == ds->sources[0]);
795 ds->sources[0] = ds->sources[--ds->n_sources];
796 source_destroy (source);
800 /* Reads (if OP is OP_READ) or writes (if op is OP_WRITE) the
801 N_COLUMNS columns starting from column START_COLUMN in row
802 LROW to/from the N_COLUMNS values in DATA. */
/* LROW is a logical row; it is mapped to a physical row once, and each
   column is then read or written through its own source.
   NOTE(review): the last parameter line, the test on OP, any per-column
   condition, the condition under which DS is tainted, and the return
   statements are not visible in this listing. */
804 rw_case (struct datasheet *ds, enum rw_op op,
805 casenumber lrow, size_t start_column, size_t n_columns,
811 assert (lrow < datasheet_get_n_rows (ds));
812 assert (n_columns <= datasheet_get_n_columns (ds));
813 assert (start_column + n_columns <= datasheet_get_n_columns (ds));
815 prow = axis_map (ds->rows, lrow);
816 for (i = 0; i < n_columns; i++)
818 struct column *c = &ds->columns[start_column + i];
824 ok = source_read (c, prow, &data[i]);
826 ok = source_write (c, prow, &data[i]);
830 taint_set_taint (ds->taint);
840 /* An axis has two functions. First, it maintains a mapping from
841 logical (client-visible) to physical (storage) ordinates. The
842 axis_map and axis_get_size functions inspect this mapping, and
843 the axis_insert, axis_remove, and axis_move functions modify
844 it. Second, it tracks the set of ordinates that are unused
845 and available for reuse. The axis_allocate,
846 axis_make_available, and axis_extend functions affect the set
847 of available ordinates. */
/* Members of an axis.
   NOTE(review): the structure header and closing brace are not visible
   in this listing. */
850 struct tower log_to_phy; /* Map from logical to physical ordinates;
851 contains "struct axis_group"s. */
852 struct range_set *available; /* Set of unused, available ordinates. */
853 unsigned long int phy_size; /* Current physical length of axis. */
856 /* A mapping from logical to physical ordinates. */
/* Each group covers a run of logical ordinates whose physical ordinates
   are consecutive, starting at phy_start; the run's length is the size
   of the tower node.
   NOTE(review): the structure header and closing brace are not visible
   in this listing. */
859 struct tower_node logical; /* Range of logical ordinates. */
860 unsigned long phy_start; /* First corresponding physical ordinate. */
/* Forward declarations of the axis_group and tower helpers used by the
   axis functions below. */
863 static struct axis_group *axis_group_from_tower_node (struct tower_node *);
864 static struct tower_node *make_axis_group (unsigned long int phy_start);
865 static struct tower_node *split_axis (struct axis *, unsigned long int where);
866 static void merge_axis_nodes (struct axis *, struct tower_node *,
867 struct tower_node **other_node);
868 static void check_axis_merged (const struct axis *axis UNUSED);
870 /* Creates and returns a new, initially empty axis. */
/* The logical-to-physical tower starts empty and the set of available
   ordinates starts empty.
   NOTE(review): the function header, the initialization of phy_size,
   and the return statement are not visible in this listing. */
874 struct axis *axis = xmalloc (sizeof *axis);
875 tower_init (&axis->log_to_phy);
876 axis->available = range_set_create ();
881 /* Returns a clone of existing axis OLD.
883 Currently this is used only by the datasheet model checker
884 driver, but it could be otherwise useful. */
/* The available set and physical size are copied, and a new axis_group
   is created for every node of OLD's tower, in order.
   NOTE(review): the end of the tower_insert call and the return
   statement are not visible in this listing. */
886 axis_clone (const struct axis *old)
888 const struct tower_node *node;
891 new = xmalloc (sizeof *new);
892 tower_init (&new->log_to_phy);
893 new->available = range_set_clone (old->available, NULL);
894 new->phy_size = old->phy_size;
896 for (node = tower_first (&old->log_to_phy); node != NULL;
897 node = tower_next (&old->log_to_phy, node))
899 unsigned long int size = tower_node_get_size (node);
900 struct axis_group *group = tower_data (node, struct axis_group, logical);
901 tower_insert (&new->log_to_phy, size, make_axis_group (group->phy_start),
908 /* Adds the state of AXIS to the MD4 hash context CTX.
910 This is only used by the datasheet model checker driver. It
911 is unlikely to be otherwise useful. */
913 axis_hash (const struct axis *axis, struct md4_ctx *ctx)
915 const struct tower_node *tn;
916 const struct range_set_node *rsn;
/* Hash every group of the logical-to-physical map: its first physical
   ordinate and its size. */
918 for (tn = tower_first (&axis->log_to_phy); tn != NULL;
919 tn = tower_next (&axis->log_to_phy, tn))
921 struct axis_group *group = tower_data (tn, struct axis_group, logical);
922 unsigned long int phy_start = group->phy_start;
923 unsigned long int size = tower_node_get_size (tn);
925 md4_process_bytes (&phy_start, sizeof phy_start, ctx);
926 md4_process_bytes (&size, sizeof size, ctx);
/* Hash the start and end of every range in the available set. */
929 RANGE_SET_FOR_EACH (rsn, axis->available)
931 unsigned long int start = range_set_node_get_start (rsn);
932 unsigned long int end = range_set_node_get_end (rsn);
934 md4_process_bytes (&start, sizeof start, ctx);
935 md4_process_bytes (&end, sizeof end, ctx);
/* Finally hash the physical size. */
938 md4_process_bytes (&axis->phy_size, sizeof axis->phy_size, ctx);
/* Destroys AXIS: removes every node from its logical-to-physical tower
   and destroys its set of available ordinates.
   NOTE(review): any null check on AXIS, the freeing of each group, and
   the freeing of AXIS itself are not visible in this listing. */
943 axis_destroy (struct axis *axis)
948 while (!tower_is_empty (&axis->log_to_phy))
950 struct tower_node *node = tower_first (&axis->log_to_phy);
951 struct axis_group *group = tower_data (node, struct axis_group,
953 tower_delete (&axis->log_to_phy, node);
957 range_set_destroy (axis->available);
961 /* Allocates up to REQUEST contiguous unused and available
962 ordinates from AXIS. If successful, stores the number
963 obtained into *WIDTH and the ordinate of the first into
964 *START, marks the ordinates as now unavailable, and returns true.
965 On failure, which occurs only if AXIS has no unused and
966 available ordinates, returns false without modifying AXIS. */
/* Delegates directly to range_set_allocate on the available set. */
968 axis_allocate (struct axis *axis, unsigned long int request,
969 unsigned long int *start, unsigned long int *width)
971 return range_set_allocate (axis->available, request, start, width);
974 /* Marks the WIDTH contiguous ordinates in AXIS, starting from
975 START, as unused and available. */
/* Sets the corresponding range in the available set to 1s. */
977 axis_make_available (struct axis *axis,
978 unsigned long int start, unsigned long int width)
980 range_set_set1 (axis->available, start, width);
983 /* Extends the total physical length of AXIS by WIDTH and returns
984 the first ordinate in the new physical region. */
/* The first new ordinate is the physical size before the extension.
   NOTE(review): the return statement is not visible in this listing. */
985 static unsigned long int
986 axis_extend (struct axis *axis, unsigned long int width)
988 unsigned long int start = axis->phy_size;
989 axis->phy_size += width;
993 /* Returns the physical ordinate in AXIS corresponding to logical
994 ordinate LOG_POS. LOG_POS must be less than the logical
   length of AXIS. */
996 static unsigned long int
997 axis_map (const struct axis *axis, unsigned long log_pos)
999 struct tower_node *node;
1000 struct axis_group *group;
1001 unsigned long int group_start;
/* Find the group containing LOG_POS and the logical ordinate at which
   that group starts, then offset the group's first physical ordinate by
   LOG_POS's distance from the group start. */
1003 node = tower_lookup (&axis->log_to_phy, log_pos, &group_start);
1004 group = tower_data (node, struct axis_group, logical);
1005 return group->phy_start + (log_pos - group_start);
1008 /* Returns the logical length of AXIS. */
/* This is the height of the logical-to-physical tower. */
1009 static unsigned long
1010 axis_get_size (const struct axis *axis)
1012 return tower_height (&axis->log_to_phy);
1015 /* Inserts the CNT contiguous physical ordinates starting at
1016 PHY_START into AXIS's logical-to-physical mapping, starting at
1017 logical position LOG_START. */
1019 axis_insert (struct axis *axis,
1020 unsigned long int log_start, unsigned long int phy_start,
1021 unsigned long int cnt)
/* Ensure that a node boundary exists at LOG_START, insert a new group of
   size CNT before it, then merge the new group with any physically
   adjacent neighbors. */
1023 struct tower_node *before = split_axis (axis, log_start);
1024 struct tower_node *new = make_axis_group (phy_start);
1025 tower_insert (&axis->log_to_phy, cnt, new, before);
1026 merge_axis_nodes (axis, new, NULL);
1027 check_axis_merged (axis);
1030 /* Removes CNT ordinates from AXIS's logical-to-physical mapping
1031 starting at logical position START. */
/* NOTE(review): a line before the first declaration is not visible in
   this listing; there may be a guard on CNT. */
1033 axis_remove (struct axis *axis,
1034 unsigned long int start, unsigned long int cnt)
/* Create node boundaries at both ends of the range, delete and free
   every node in between, then merge the node that followed the range
   with its neighbors. */
1038 struct tower_node *last = split_axis (axis, start + cnt);
1039 struct tower_node *cur, *next;
1040 for (cur = split_axis (axis, start); cur != last; cur = next)
1042 next = tower_delete (&axis->log_to_phy, cur);
1043 free (axis_group_from_tower_node (cur));
1045 merge_axis_nodes (axis, last, NULL);
1046 check_axis_merged (axis);
1050 /* Moves the CNT ordinates in AXIS's logical-to-physical mapping
1051 starting at logical position OLD_START so that they then start at
1052 position NEW_START. */
/* Nothing is done when CNT is zero or the two positions are equal. */
1054 axis_move (struct axis *axis,
1055 unsigned long int old_start, unsigned long int new_start,
1056 unsigned long int cnt)
1058 if (cnt > 0 && old_start != new_start)
1060 struct tower_node *old_first, *old_last, *new_first;
1061 struct tower_node *merge1, *merge2;
1062 struct tower tmp_array;
1064 /* Move ordinates OLD_START...(OLD_START + CNT) into new,
1065 separate TMP_ARRAY. */
1066 old_first = split_axis (axis, old_start);
1067 old_last = split_axis (axis, old_start + cnt);
1068 tower_init (&tmp_array);
1069 tower_splice (&tmp_array, NULL,
1070 &axis->log_to_phy, old_first, old_last);
1071 merge_axis_nodes (axis, old_last, NULL);
1072 check_axis_merged (axis);
1074 /* Move TMP_ARRAY to position NEW_START. */
1075 new_first = split_axis (axis, new_start);
/* Remember the first and last moved nodes so that both ends can be
   merged with their new neighbors after the splice.
   NOTE(review): the statement controlled by the comparison below is not
   visible in this listing. */
1076 merge1 = tower_first (&tmp_array);
1077 merge2 = tower_last (&tmp_array);
1078 if (merge2 == merge1)
1080 tower_splice (&axis->log_to_phy, new_first, &tmp_array, old_first, NULL);
/* Merging at MERGE1 may delete the node MERGE2 points to, so MERGE2 is
   passed by address to be kept valid. */
1081 merge_axis_nodes (axis, merge1, &merge2);
1082 merge_axis_nodes (axis, merge2, NULL);
1083 check_axis_merged (axis);
1087 /* Returns the axis_group in which NODE is embedded. */
/* NODE must be the 'logical' member of a struct axis_group. */
1088 static struct axis_group *
1089 axis_group_from_tower_node (struct tower_node *node)
1091 return tower_data (node, struct axis_group, logical);
1094 /* Creates and returns a new axis_group at physical position
   PHY_START.  The value returned is the group's embedded tower node,
   which the caller inserts into a tower with the desired size. */
1096 static struct tower_node *
1097 make_axis_group (unsigned long phy_start)
1099 struct axis_group *group = xmalloc (sizeof *group);
1100 group->phy_start = phy_start;
1101 return &group->logical;
1104 /* Returns the tower_node in AXIS's logical-to-physical map whose
1105 bottom edge is at exact level WHERE. If there is no such
1106 tower_node in AXIS's logical-to-physical map, then split_axis
1107 creates one by breaking an existing tower_node into two
1108 separate ones, unless WHERE is equal to the tower height, in
1109 which case it simply returns a null pointer. */
/* NOTE(review): the statement controlled by the height test and the
   lines at the end of the splitting branch are not visible in this
   listing; the final return as shown uses GROUP, so the branch
   presumably leaves GROUP or the result referring to the new node.
   Confirm against the upstream file. */
1110 static struct tower_node *
1111 split_axis (struct axis *axis, unsigned long int where)
1113 unsigned long int group_start;
1114 struct tower_node *group_node;
1115 struct axis_group *group;
1117 assert (where <= tower_height (&axis->log_to_phy));
1118 if (where >= tower_height (&axis->log_to_phy))
1121 group_node = tower_lookup (&axis->log_to_phy, where, &group_start);
1122 group = axis_group_from_tower_node (group_node);
/* WHERE falls strictly inside the group: shrink the existing node to
   the part below WHERE and insert a new node for the remainder, whose
   physical start is advanced by the size of the first part. */
1123 if (where > group_start)
1125 unsigned long int size_1 = where - group_start;
1126 unsigned long int size_2 = tower_node_get_size (group_node) - size_1;
1127 struct tower_node *next = tower_next (&axis->log_to_phy, group_node);
1128 struct tower_node *new = make_axis_group (group->phy_start + size_1);
1129 tower_resize (&axis->log_to_phy, group_node, size_1);
1130 tower_insert (&axis->log_to_phy, size_2, new, next);
1134 return &group->logical;
1137 /* Within AXIS, attempts to merge NODE (or the last node in AXIS,
1138 if NODE is null) with its neighbor nodes. This is possible
1139 when logically adjacent nodes are also adjacent physically (in
   the same order).
1142 When a merge occurs, and OTHER_NODE is non-null and points to
1143 the node to be deleted, this function also updates
1144 *OTHER_NODE, if necessary, to ensure that it remains a valid
   pointer.
   NOTE(review): parts of this comment and several lines of the body
   (null checks on NODE, NEXT and PREV, and the freeing of deleted
   groups) are not visible in this listing. */
1147 merge_axis_nodes (struct axis *axis, struct tower_node *node,
1148 struct tower_node **other_node)
1150 struct tower *t = &axis->log_to_phy;
1151 struct axis_group *group;
1152 struct tower_node *next, *prev;
1154 /* Find node to potentially merge with neighbors. */
1156 node = tower_last (t);
1159 group = axis_group_from_tower_node (node);
1161 /* Try to merge NODE with successor. */
1162 next = tower_next (t, node);
1165 struct axis_group *next_group = axis_group_from_tower_node (next);
1166 unsigned long this_height = tower_node_get_size (node);
/* The successor continues NODE physically: absorb its size into NODE
   and delete it. */
1168 if (group->phy_start + this_height == next_group->phy_start)
1170 unsigned long next_height = tower_node_get_size (next);
1171 tower_resize (t, node, this_height + next_height);
1172 if (other_node != NULL && *other_node == next)
1173 *other_node = tower_next (t, *other_node);
1174 tower_delete (t, next);
1179 /* Try to merge NODE with predecessor. */
1180 prev = tower_prev (t, node);
1183 struct axis_group *prev_group = axis_group_from_tower_node (prev);
1184 unsigned long prev_height = tower_node_get_size (prev);
/* NODE continues the predecessor physically: NODE takes over the
   predecessor's physical start and size, and the predecessor is
   deleted. */
1186 if (prev_group->phy_start + prev_height == group->phy_start)
1188 unsigned long this_height = tower_node_get_size (node);
1189 group->phy_start = prev_group->phy_start;
1190 tower_resize (t, node, this_height + prev_height);
1191 if (other_node != NULL && *other_node == prev)
1192 *other_node = tower_next (t, *other_node);
1193 tower_delete (t, prev);
1199 /* Verify that all potentially merge-able nodes in AXIS are merged. */
/* AXIS is marked UNUSED because the body below is compiled only when
   ASSERT_LEVEL is at least 10. */
1202 check_axis_merged (const struct axis *axis UNUSED)
1204 #if ASSERT_LEVEL >= 10
1205 struct tower_node *prev, *node;
/* Walk the tower from its first node onward, with PREV trailing one
   node behind NODE (PREV starts out null). */
1207 for (prev = NULL, node = tower_first (&axis->log_to_phy); node != NULL;
1208 prev = node, node = tower_next (&axis->log_to_phy, node))
/* NOTE(review): the test that skips the first iteration, where PREV is
   null, is not visible in this listing. */
1211 struct axis_group *prev_group = axis_group_from_tower_node (prev);
1212 unsigned long prev_height = tower_node_get_size (prev);
1213 struct axis_group *node_group = axis_group_from_tower_node (node);
/* PREV must not end physically where NODE begins: such a pair is what
   merge_axis_nodes would have combined into a single node. */
1214 assert (prev_group->phy_start + prev_height != node_group->phy_start);
1221 /* Creates and returns an empty, unbacked source with N_BYTES
1222 bytes per case, none of which are initially in use. */
1223 static struct source *
1224 source_create_empty (size_t n_bytes)
1226 struct source *source = xmalloc (sizeof *source);
/* Estimated size of one row: its N_BYTES of data plus an allowance of
   four pointers (presumably per-row bookkeeping overhead -- TODO
   confirm). */
1227 size_t row_size = n_bytes + 4 * sizeof (void *);
/* How many rows of that size fit in the value returned by
   settings_get_workspace. */
1228 size_t max_memory_rows = settings_get_workspace () / row_size;
1229 source->avail = range_set_create ();
/* Mark the whole byte range [0, N_BYTES) as available. */
1230 range_set_set1 (source->avail, 0, n_bytes);
/* The row limit handed to sparse_xarray_create is never below 4, even
   when the workspace computation yields less. */
1231 source->data = sparse_xarray_create (n_bytes, MAX (max_memory_rows, 4));
/* No backing casereader, hence no backing rows. */
1232 source->backing = NULL;
1233 source->backing_rows = 0;
/* NOTE(review): the remaining initialization and the return statement
   are not visible in this listing. */
1238 /* Creates and returns a new source backed by READER and with the
1239 same initial dimensions and content. */
1240 static struct source *
1241 source_create_casereader (struct casereader *reader)
1243 const struct caseproto *proto = casereader_get_proto (reader);
/* Size the source from READER's case prototype. */
1244 size_t n_bytes = caseproto_to_n_bytes (proto);
1245 struct source *source = source_create_empty (n_bytes);
/* source_create_empty marks all N_BYTES bytes available; clear that
   marking for the whole range. */
1249 range_set_set0 (source->avail, 0, n_bytes);
/* READER itself is stored, not a clone; source_destroy later passes it
   to casereader_destroy. */
1250 source->backing = reader;
/* Record the reader's case count once, as the number of backing
   rows. */
1251 source->backing_rows = casereader_count_cases (reader);
1254 n_columns = caseproto_get_n_widths (proto);
/* Visit every width in the prototype, selecting those that are
   nonnegative.  NOTE(review): the statement controlled by this test is
   not visible in this listing. */
1255 for (i = 0; i < n_columns; i++)
1256 if (caseproto_get_width (proto, i) >= 0)
1262 /* Returns a clone of source OLD with the same data and backing
casereader, if any (the backing reader is itself cloned).

1265 Currently this is used only by the datasheet model checker
1266 driver, but it could be otherwise useful. */
1267 static struct source *
1268 source_clone (const struct source *old)
1270 struct source *new = xmalloc (sizeof *new);
1271 new->avail = range_set_clone (old->avail, NULL);
1272 new->data = sparse_xarray_clone (old->data);
/* Clone the backing casereader only if OLD has one. */
1273 new->backing = old->backing != NULL ? casereader_clone (old->backing) : NULL;
1274 new->backing_rows = old->backing_rows;
1275 new->n_used = old->n_used;
/* A null result from sparse_xarray_clone is treated as failure, and
   the partially built clone is destroyed.  NOTE(review): the rest of
   this branch and the return statement are not visible in this
   listing. */
1276 if (new->data == NULL)
1278 source_destroy (new);
/* Tries to allocate room in SOURCE for a column of the given WIDTH,
   which must be nonnegative.  Allocation is attempted only when SOURCE
   has no backing casereader.
   NOTE(review): the return statements are not visible in this listing,
   so the success and failure values are not documented here. */
1285 source_allocate_column (struct source *source, int width)
1287 unsigned long int start;
1290 assert (width >= 0);
/* Convert the column width to the number of bytes it occupies. */
1291 n_bytes = width_to_n_bytes (width);
/* Succeeds only for an unbacked source for which
   range_set_allocate_fully can supply N_BYTES bytes from the available
   set; the resulting position is stored in START. */
1292 if (source->backing == NULL
1293 && range_set_allocate_fully (source->avail, n_bytes, &start))
/* Marks as available again the bytes in SOURCE that belong to a column
   of the given WIDTH (which must be nonnegative) starting at byte
   offset OFS. */
1300 source_release_column (struct source *source, int ofs, int width)
1302 assert (width >= 0);
/* Return the column's byte range to the available set. */
1303 range_set_set1 (source->avail, ofs, width_to_n_bytes (width));
/* Additional bookkeeping applies when SOURCE has a backing casereader.
   NOTE(review): the controlled statement is not visible in this
   listing. */
1304 if (source->backing != NULL)
1308 /* Returns true if SOURCE has any columns in use, false otherwise. */
1311 source_in_use (const struct source *source)
/* In use exactly when the source's n_used counter is positive. */
1313 return source->n_used > 0;
1316 /* Destroys SOURCE and its data and backing, if any. */
1318 source_destroy (struct source *source)
/* Release, in order, the availability set, the row data, and the
   backing casereader.  source->backing is passed to casereader_destroy
   without a null test here, and unbacked sources store NULL in it, so
   casereader_destroy presumably tolerates a null reader -- TODO
   confirm.
   NOTE(review): any guard around these calls and the release of SOURCE
   itself are not visible in this listing. */
1322 range_set_destroy (source->avail);
1323 sparse_xarray_destroy (source->data);
1324 casereader_destroy (source->backing);
1329 /* Returns the number of rows in SOURCE's backing casereader
1330 (SOURCE must have a backing casereader). */
1332 source_get_backing_n_rows (const struct source *source)
/* The precondition is enforced by assertion. */
1334 assert (source_has_backing (source));
/* This is the count stored in the source (source_create_casereader
   records it), not a fresh query of the reader. */
1335 return source->backing_rows;
1338 /* Reads the given COLUMN from its source in the given ROW, into
1339 VALUE. Returns true if successful, false on I/O error.
1341 The caller must have initialized VALUE with the proper width. */
1344 source_read (const struct column *column, casenumber row, union value *value)
1346 struct source *source = column->source;
1348 assert (column->width >= 0);
/* Read from the sparse array when the source is unbacked or when the
   array already holds ROW. */
1349 if (source->backing == NULL
1350 || sparse_xarray_contains_row (source->data, row))
/* The sparse array is addressed by COLUMN's byte offset and byte
   length. */
1351 return sparse_xarray_read (source->data, row, column->byte_ofs,
1352 width_to_n_bytes (column->width),
1353 value_to_data (value, column->width));
/* Otherwise take the row from the backing casereader; a null case
   counts as failure. */
1356 struct ccase *c = casereader_peek (source->backing, row);
1357 bool ok = c != NULL;
/* Within a case, the column is located by its value offset rather than
   its byte offset.  NOTE(review): the rest of this statement and the
   return are not visible in this listing. */
1360 value_copy (value, case_data_idx (c, column->value_ofs),
/* Writes data from case C into ROW of SOURCE's sparse array, walking
   the widths in the prototype of SOURCE's backing casereader (so
   SOURCE must have one).
   NOTE(review): parts of the loop body -- including how OFS is
   maintained and which widths are skipped -- and the return value are
   not visible in this listing. */
1369 copy_case_into_source (struct source *source, struct ccase *c, casenumber row)
1371 const struct caseproto *proto = casereader_get_proto (source->backing);
1372 size_t n_widths = caseproto_get_n_widths (proto);
/* One iteration per width in the prototype. */
1377 for (i = 0; i < n_widths; i++)
1379 int width = caseproto_get_width (proto, i);
/* Byte length of this column's data. */
1382 int n_bytes = width_to_n_bytes (width);
/* Store value I of the case at byte offset OFS in ROW; a false result
   from sparse_xarray_write is handled by the (not visible) branch. */
1383 if (!sparse_xarray_write (source->data, row, ofs, n_bytes,
1384 value_to_data (case_data_idx (c, i),
1393 /* Writes VALUE to COLUMN's source in the given ROW. Returns
1394 true if successful, false on I/O error. On error, the row's
1395 data may be completely or partially corrupted, both inside and
1396 outside the region to be written. */
1398 source_write (const struct column *column, casenumber row,
1399 const union value *value)
1401 struct source *source = column->source;
1402 struct casereader *backing = source->backing;
1404 assert (column->width >= 0);
/* Visible parts of the condition: ROW is not yet in the sparse array
   and lies within the backing rows.  NOTE(review): the first operand
   of this condition is not visible in this listing. */
1406 && !sparse_xarray_contains_row (source->data, row)
1407 && row < source->backing_rows)
/* Fetch the whole backing case for ROW and copy it into the sparse
   array, presumably so that the row's other columns keep their backing
   values once the row lives in the array -- TODO confirm. */
1412 c = casereader_peek (backing, row);
1416 ok = copy_case_into_source (source, c, row);
/* Store VALUE at COLUMN's byte offset within ROW. */
1422 return sparse_xarray_write (source->data, row, column->byte_ofs,
1423 width_to_n_bytes (column->width),
1424 value_to_data (value, column->width));
1427 /* Within COLUMN's source, which must not have a backing
1428 casereader, writes VALUE into COLUMN in every row, even in
1429 rows not yet otherwise initialized, by way of
1430 sparse_xarray_write_columns. Returns true if successful,
1431 false if an I/O error occurs.
1433 We don't support backing != NULL because (1) it's harder and
1434 (2) this function is only called by
1435 datasheet_insert_column, which doesn't reuse columns from
1436 sources that are backed by casereaders. */
1438 source_write_column (struct column *column, const union value *value)
1440 int width = column->width;
/* Both preconditions are enforced by assertion: an unbacked source and
   a nonnegative width. */
1442 assert (column->source->backing == NULL);
1443 assert (width >= 0);
/* The write covers COLUMN's byte range, starting at its byte offset. */
1445 return sparse_xarray_write_columns (column->source->data, column->byte_ofs,
1446 width_to_n_bytes (width),
1447 value_to_data (value, width));
1450 /* Returns true if SOURCE has a backing casereader, false otherwise. */
1453 source_has_backing (const struct source *source)
/* A source is backed exactly when its backing pointer is non-null. */
1455 return source->backing != NULL;
1458 /* Datasheet model checker test driver. */
/* Searches DS's array of sources for the entry that is the very same
   object as SOURCE (pointers are compared, not contents).
   NOTE(review): the statements that produce the result, both on a
   match and when no entry matches, are not visible in this listing. */
1461 get_source_index (const struct datasheet *ds, const struct source *source)
1465 for (i = 0; i < ds->n_sources; i++)
1466 if (ds->sources[i] == source)
1471 /* Clones the structure and contents of ODS into a new datasheet,
1472 and returns the new datasheet. */
1474 clone_datasheet (const struct datasheet *ods)
1476 struct datasheet *ds;
1479 ds = xmalloc (sizeof *ds);
/* Clone every source individually, into an array of the same length as
   ODS's.  Each source_clone result is stored as returned. */
1481 ds->sources = xmalloc (ods->n_sources * sizeof *ds->sources);
1482 for (i = 0; i < ods->n_sources; i++)
1483 ds->sources[i] = source_clone (ods->sources[i]);
1484 ds->n_sources = ods->n_sources;
/* The case prototype, if ODS has one, is obtained through
   caseproto_ref rather than rebuilt. */
1486 ds->proto = ods->proto != NULL ? caseproto_ref (ods->proto) : NULL;
/* Bytewise copy of the column array.  After this the copied columns'
   source pointers still refer to ODS's sources... */
1487 ds->columns = xmemdup (ods->columns, ods->n_columns * sizeof *ods->columns);
/* ...so point each one at the clone that sits at the same index in
   DS's source array. */
1488 for (i = 0; i < ods->n_columns; i++)
1489 ds->columns[i].source
1490 = ds->sources[get_source_index (ods, ods->columns[i].source)];
1491 ds->n_columns = ods->n_columns;
1492 ds->column_min_alloc = ods->column_min_alloc;
1494 ds->rows = axis_clone (ods->rows);
/* The clone gets a newly created taint object rather than ODS's. */
1496 ds->taint = taint_create ();
/* NOTE(review): the return statement is not visible in this
   listing. */
1501 /* Hashes the structure of datasheet DS and returns the hash.
1502 We use MD4 because it is much faster than MD5 or SHA-1 but its
1503 collision resistance is just as good.
NOTE(review): that claim can only be meant for accidental collisions;
MD4 is not collision-resistant against deliberate attack. */
1505 hash_datasheet (const struct datasheet *ds)
/* Digest buffer of at least 20 bytes, expressed in units of unsigned
   int. */
1507 unsigned int hash[DIV_RND_UP (20, sizeof (unsigned int))];
1511 md4_init_ctx (&ctx);
/* Feed three properties of each column into the hash: the n_columns
   count of its source's sparse array, its value offset, and its
   width. */
1512 for (i = 0; i < ds->n_columns; i++)
1514 const struct column *column = &ds->columns[i];
1515 int source_n_bytes = sparse_xarray_get_n_columns (column->source->data);
1516 md4_process_bytes (&source_n_bytes, sizeof source_n_bytes, &ctx);
/* The column's byte offset is left out of the hash; the call below is
   commented out. */
1517 /*md4_process_bytes (&column->byte_ofs, sizeof column->byte_ofs, &ctx);*/
1518 md4_process_bytes (&column->value_ofs, sizeof column->value_ofs, &ctx);
1519 md4_process_bytes (&column->width, sizeof column->width, &ctx);
/* Then the row axis and the minimum column allocation. */
1521 axis_hash (ds->rows, &ctx);
1522 md4_process_bytes (&ds->column_min_alloc, sizeof ds->column_min_alloc, &ctx);
/* Finalize the digest into HASH.  NOTE(review): the statement that
   returns the result is not visible in this listing. */
1523 md4_finish_ctx (&ctx, hash);