1 /* PSPP - a program for statistical analysis.
2 Copyright (C) 2007, 2009 Free Software Foundation, Inc.
4 This program is free software: you can redistribute it and/or modify
5 it under the terms of the GNU General Public License as published by
6 the Free Software Foundation, either version 3 of the License, or
7 (at your option) any later version.
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 GNU General Public License for more details.
14 You should have received a copy of the GNU General Public License
15 along with this program. If not, see <http://www.gnu.org/licenses/>. */
19 #include <data/sparse-cases.h>
24 #include <data/case.h>
25 #include <data/settings.h>
26 #include <data/case-tmpfile.h>
27 #include <libpspp/assertion.h>
28 #include <libpspp/range-set.h>
29 #include <libpspp/sparse-array.h>
33 /* A sparse array of cases. */
/* The functions in this file select the backing store by testing
   MEMORY: when it is non-null, rows are looked up in MEMORY;
   otherwise DISK and DISK_CASES are used. */
36 size_t column_cnt; /* Number of values per case. */
/* Null until default values are first stored; read back for rows
   that have never been written. */
37 union value *default_columns; /* Defaults for unwritten cases. */
/* Compared against the number of rows in MEMORY before a new row is
   inserted there. */
38 casenumber max_memory_cases; /* Max cases before dumping to disk. */
/* Sparse array whose elements are `struct ccase *' pointers. */
39 struct sparse_array *memory; /* Backing, if stored in memory. */
40 struct case_tmpfile *disk; /* Backing, if stored on disk. */
/* Set of row numbers that have been written to DISK. */
41 struct range_set *disk_cases; /* Allocated cases, if on disk. */
44 /* Creates and returns a new sparse array of cases with
45 COLUMN_CNT values per case. */
/* The new array starts out backed by an in-memory sparse array of
   `struct ccase *' pointers and has no default column values.  Its
   in-memory row limit is whatever settings_get_workspace_cases()
   returns for COLUMN_CNT. */
47 sparse_cases_create (size_t column_cnt)
49 struct sparse_cases *sc = xmalloc (sizeof *sc);
50 sc->column_cnt = column_cnt;
/* No defaults yet. */
51 sc->default_columns = NULL;
52 sc->max_memory_cases = settings_get_workspace_cases (column_cnt);
/* Each element of the array holds one pointer to a case. */
53 sc->memory = sparse_array_create (sizeof (struct ccase *));
/* Nothing has been written to disk. */
55 sc->disk_cases = NULL;
59 /* Creates and returns a new sparse array of cases that contains
60 the same data as OLD. */
62 sparse_cases_clone (const struct sparse_cases *old)
64 struct sparse_cases *new = xmalloc (sizeof *new);
66 new->column_cnt = old->column_cnt;
68 if (old->default_columns != NULL)
70 = xmemdup (old->default_columns,
71 old->column_cnt * sizeof *old->default_columns);
73 new->default_columns = NULL;
75 new->max_memory_cases = old->max_memory_cases;
77 if (old->memory != NULL)
79 unsigned long int idx;
82 new->memory = sparse_array_create (sizeof (struct ccase *));
83 for (cp = sparse_array_scan (old->memory, NULL, &idx); cp != NULL;
84 cp = sparse_array_scan (old->memory, &idx, &idx))
86 struct ccase **ncp = sparse_array_insert (new->memory, idx);
87 *ncp = case_ref (*cp);
93 if (old->disk != NULL)
95 const struct range_set_node *node;
97 new->disk = case_tmpfile_create (old->column_cnt);
98 new->disk_cases = range_set_create ();
99 for (node = range_set_first (old->disk_cases); node != NULL;
100 node = range_set_next (old->disk_cases, node))
102 unsigned long int start = range_set_node_get_start (node);
103 unsigned long int end = range_set_node_get_end (node);
104 unsigned long int idx;
106 for (idx = start; idx < end; idx++)
108 struct ccase *c = case_tmpfile_get_case (old->disk, idx);
109 if (c == NULL || !case_tmpfile_put_case (new->disk, idx, c))
111 sparse_cases_destroy (new);
120 new->disk_cases = NULL;
126 /* Destroys sparse array of cases SC. */
/* Tears down the in-memory array (when there is one), the default
   column values, the temporary file and the set of on-disk rows. */
128 sparse_cases_destroy (struct sparse_cases *sc)
132 if (sc->memory != NULL)
134 unsigned long int idx;
/* Visit every populated slot before the array itself is destroyed. */
136 for (cp = sparse_array_scan (sc->memory, NULL, &idx); cp != NULL;
137 cp = sparse_array_scan (sc->memory, &idx, &idx))
139 sparse_array_destroy (sc->memory);
141 free (sc->default_columns);
/* These two calls are not guarded by a test of their argument.
   NOTE(review): sparse_cases_create() leaves disk_cases null, so this
   assumes both destroy functions accept a null pointer -- confirm. */
142 case_tmpfile_destroy (sc->disk);
143 range_set_destroy (sc->disk_cases);
148 /* Returns the number of `union value's in each case in SC. */
/* This is the column count stored when SC was created or cloned. */
150 sparse_cases_get_value_cnt (const struct sparse_cases *sc)
152 return sc->column_cnt;
155 /* Dumps the cases in SC, which must currently be stored in
156 memory, to disk. Returns true if successful, false on I/O
   error. */
/* Every in-memory row is written to a newly created temporary file
   under the same row number and recorded in SC->disk_cases; the
   in-memory array is then destroyed.  If a write fails, the
   temporary file and the row set are destroyed again.

   NOTE(review): the failure path leaves SC->memory in place.
   Whether case_tmpfile_put_case() keeps or releases the cases it was
   handed is not shown in this file, so confirm that rows already
   written before the failure are still safe to use afterwards. */
159 dump_sparse_cases_to_disk (struct sparse_cases *sc)
161 unsigned long int idx;
/* Preconditions: SC is memory-backed and has no temporary file yet. */
164 assert (sc->memory != NULL);
165 assert (sc->disk == NULL);
167 sc->disk = case_tmpfile_create (sc->column_cnt);
168 sc->disk_cases = range_set_create ();
/* Walk the populated rows in the in-memory array. */
170 for (cp = sparse_array_scan (sc->memory, NULL, &idx); cp != NULL;
171 cp = sparse_array_scan (sc->memory, &idx, &idx))
173 if (!case_tmpfile_put_case (sc->disk, idx, *cp))
/* Write failed: discard the partly written on-disk state. */
175 case_tmpfile_destroy (sc->disk);
177 range_set_destroy (sc->disk_cases);
178 sc->disk_cases = NULL;
/* Row IDX is now on disk. */
181 range_set_insert (sc->disk_cases, idx, 1);
183 sparse_array_destroy (sc->memory);
188 /* Returns true if any data has ever been written to ROW in SC,
   false otherwise. */
/* A memory-backed SC is asked whether its sparse array has an
   element at ROW; otherwise the set of rows recorded in
   SC->disk_cases is consulted. */
191 sparse_cases_contains_row (const struct sparse_cases *sc, casenumber row)
193 return (sc->memory != NULL
194 ? sparse_array_get (sc->memory, row) != NULL
195 : range_set_contains (sc->disk_cases, row));
198 /* Reads columns COLUMNS...(COLUMNS + VALUE_CNT), exclusive, in
199 the given ROW in SC, into the VALUE_CNT values in VALUES.
200 Returns true if successful, false on I/O error. */
/* "COLUMNS" above is the COLUMN parameter.  The requested range
   must lie within SC's column count (asserted).

   A row that has been written is fetched from the in-memory array
   or from the temporary file, and the requested values are copied
   out of it.  A row that has never been written is filled from
   SC->default_columns, which must be non-null in that case
   (asserted). */
202 sparse_cases_read (struct sparse_cases *sc, casenumber row, size_t column,
203 union value values[], size_t value_cnt)
205 assert (value_cnt <= sc->column_cnt);
206 assert (column + value_cnt <= sc->column_cnt);
208 if (sparse_cases_contains_row (sc, row))
/* Written row, memory-backed. */
211 if (sc->memory != NULL)
213 struct ccase **cp = sparse_array_get (sc->memory, row);
/* Written row, disk-backed. */
218 c = case_tmpfile_get_case (sc->disk, row);
222 case_copy_out (c, column, values, value_cnt);
/* Unwritten row: hand back the column defaults. */
227 assert (sc->default_columns != NULL);
228 memcpy (values, sc->default_columns + column,
229 sizeof *values * value_cnt);
235 /* Implements sparse_cases_write for an on-disk sparse_cases. */
/* A write that covers every column (COLUMN == 0 and VALUE_CNT equal
   to SC's column count) starts from a newly created case.  Any other
   write first fetches ROW's current case from the temporary file.
   The new values are then copied into the case, the case is written
   back under ROW, and ROW is recorded in SC->disk_cases. */
237 write_disk_case (struct sparse_cases *sc, casenumber row, size_t column,
238 const union value values[], size_t value_cnt)
243 /* Get current case data. */
244 if (column == 0 && value_cnt == sc->column_cnt)
245 c = case_create (sc->column_cnt);
248 c = case_tmpfile_get_case (sc->disk, row);
253 /* Copy in new data. */
254 case_copy_in (c, column, values, value_cnt);
256 /* Write new case. */
257 ok = case_tmpfile_put_case (sc->disk, row, c);
259 range_set_insert (sc->disk_cases, row, 1);
264 /* Writes the VALUE_CNT values in VALUES into columns
265 COLUMNS...(COLUMNS + VALUE_CNT), exclusive, in the given ROW
267 Returns true if successful, false on I/O error. */
/* "COLUMNS" above is the COLUMN parameter.

   Memory-backed SC: a row that already exists is modified in place.
   For a new row, if the in-memory array already holds at least
   SC->max_memory_cases rows, everything is first dumped to disk and
   the write is carried out by write_disk_case(); otherwise a new
   case is created and inserted at ROW.

   Disk-backed SC: the write is delegated to write_disk_case(). */
269 sparse_cases_write (struct sparse_cases *sc, casenumber row, size_t column,
270 const union value values[], size_t value_cnt)
272 if (sc->memory != NULL)
274 struct ccase *c, **cp;
275 cp = sparse_array_get (sc->memory, row);
/* Existing row: store the result of case_unshare() back in the slot
   (presumably a private copy if the case was shared -- confirm)
   before modifying it. */
277 c = *cp = case_unshare (*cp);
/* New row: check the in-memory row limit before inserting. */
280 if (sparse_array_count (sc->memory) >= sc->max_memory_cases)
282 if (!dump_sparse_cases_to_disk (sc))
284 return write_disk_case (sc, row, column, values, value_cnt);
287 cp = sparse_array_insert (sc->memory, row);
288 c = *cp = case_create (sc->column_cnt);
/* A write that does not cover every column starts from the column
   defaults, when there are any. */
289 if (sc->default_columns != NULL
290 && (column != 0 || value_cnt != sc->column_cnt))
291 case_copy_in (c, 0, sc->default_columns, sc->column_cnt);
/* Store the caller's values. */
293 case_copy_in (c, column, values, value_cnt);
/* Disk-backed SC. */
297 return write_disk_case (sc, row, column, values, value_cnt);
300 /* Writes the VALUE_CNT values in VALUES to columns
301 START_COLUMN...(START_COLUMN + VALUE_CNT), exclusive, in every
302 row in SC, even those rows that have not yet been written.
303 Returns true if successful, false on I/O error.
305 The runtime of this function is linear in the number of rows
306 in SC that have already been written. */
308 sparse_cases_write_columns (struct sparse_cases *sc, size_t start_column,
309 const union value values[], size_t value_cnt)
311 assert (value_cnt <= sc->column_cnt);
312 assert (start_column + value_cnt <= sc->column_cnt);
315 if (sc->default_columns == NULL)
316 sc->default_columns = xnmalloc (sc->column_cnt,
317 sizeof *sc->default_columns);
318 memcpy (sc->default_columns + start_column, values,
319 value_cnt * sizeof *sc->default_columns);
321 /* Set individual rows. */
322 if (sc->memory != NULL)
325 unsigned long int idx;
327 for (cp = sparse_array_scan (sc->memory, NULL, &idx); cp != NULL;
328 cp = sparse_array_scan (sc->memory, &idx, &idx))
330 *cp = case_unshare (*cp);
331 case_copy_in (*cp, start_column, values, value_cnt);
336 const struct range_set_node *node;
338 for (node = range_set_first (sc->disk_cases); node != NULL;
339 node = range_set_next (sc->disk_cases, node))
341 unsigned long int start = range_set_node_get_start (node);
342 unsigned long int end = range_set_node_get_end (node);
343 unsigned long int row;
345 for (row = start; row < end; row++)
346 case_tmpfile_put_values (sc->disk, row,
347 start_column, values, value_cnt);
350 if (case_tmpfile_error (sc->disk))