1a12221b4b294a4ccd1b8b9f0c6da44ccafaf993
[pspp-builds.git] / src / data / sparse-cases.c
1 /* PSPP - computes sample statistics.
2    Copyright (C) 2007 Free Software Foundation, Inc.
3
4    This program is free software; you can redistribute it and/or
5    modify it under the terms of the GNU General Public License as
6    published by the Free Software Foundation; either version 2 of the
7    License, or (at your option) any later version.
8
9    This program is distributed in the hope that it will be useful, but
10    WITHOUT ANY WARRANTY; without even the implied warranty of
11    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12    General Public License for more details.
13
14    You should have received a copy of the GNU General Public License
15    along with this program; if not, write to the Free Software
16    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
17    02110-1301, USA. */
18
19 #include <config.h>
20
21 #include <data/sparse-cases.h>
22
23 #include <stdlib.h>
24 #include <string.h>
25
26 #include <data/settings.h>
27 #include <data/case-tmpfile.h>
28 #include <libpspp/assertion.h>
29 #include <libpspp/range-set.h>
30 #include <libpspp/sparse-array.h>
31
32 #include "xalloc.h"
33
34 /* A sparse array of cases. */
35 struct sparse_cases
36   {
37     size_t column_cnt;                  /* Number of values per case. */
38     union value *default_columns;       /* Defaults for unwritten cases. */
39     casenumber max_memory_cases;        /* Max cases before dumping to disk. */
40     struct sparse_array *memory;        /* Backing, if stored in memory. */
41     struct case_tmpfile *disk;          /* Backing, if stored on disk. */
42     struct range_set *disk_cases;       /* Allocated cases, if on disk. */
43   };
44
45 /* Creates and returns a new sparse array of cases with
46    COLUMN_CNT values per case. */
47 struct sparse_cases *
48 sparse_cases_create (size_t column_cnt)
49 {
50   struct sparse_cases *sc = xmalloc (sizeof *sc);
51   sc->column_cnt = column_cnt;
52   sc->default_columns = NULL;
53   sc->max_memory_cases = get_workspace_cases (column_cnt);
54   sc->memory = sparse_array_create (sizeof (struct ccase));
55   sc->disk = NULL;
56   sc->disk_cases = NULL;
57   return sc;
58 }
59
60 /* Creates and returns a new sparse array of cases that contains
61    the same data as OLD. */
62 struct sparse_cases *
63 sparse_cases_clone (const struct sparse_cases *old)
64 {
65   struct sparse_cases *new = xmalloc (sizeof *new);
66
67   new->column_cnt = old->column_cnt;
68
69   if (old->default_columns != NULL)
70     new->default_columns
71       = xmemdup (old->default_columns,
72                  old->column_cnt * sizeof *old->default_columns);
73   else
74     new->default_columns = NULL;
75
76   new->max_memory_cases = old->max_memory_cases;
77
78   if (old->memory != NULL)
79     {
80       unsigned long int idx;
81       struct ccase *c;
82
83       new->memory = sparse_array_create (sizeof (struct ccase));
84       for (c = sparse_array_scan (old->memory, NULL, &idx); c != NULL;
85            c = sparse_array_scan (old->memory, &idx, &idx))
86         case_clone (sparse_array_insert (new->memory, idx), c);
87     }
88   else
89     new->memory = NULL;
90
91   if (old->disk != NULL)
92     {
93       const struct range_set_node *node;
94
95       new->disk = case_tmpfile_create (old->column_cnt);
96       new->disk_cases = range_set_create ();
97       for (node = range_set_first (old->disk_cases); node != NULL;
98            node = range_set_next (old->disk_cases, node))
99         {
100           unsigned long int start = range_set_node_get_start (node);
101           unsigned long int end = range_set_node_get_end (node);
102           unsigned long int idx;
103           struct ccase c;
104
105           for (idx = start; idx < end; idx++)
106             if (!case_tmpfile_get_case (old->disk, idx, &c)
107                 || !case_tmpfile_put_case (new->disk, idx, &c))
108               {
109                 sparse_cases_destroy (new);
110                 return NULL;
111               }
112         }
113     }
114   else
115     {
116       new->disk = NULL;
117       new->disk_cases = NULL;
118     }
119
120   return new;
121 }
122
123 /* Destroys sparse array of cases SC. */
124 void
125 sparse_cases_destroy (struct sparse_cases *sc)
126 {
127   if (sc != NULL)
128     {
129       if (sc->memory != NULL)
130         {
131           unsigned long int idx;
132           struct ccase *c;
133           for (c = sparse_array_scan (sc->memory, NULL, &idx); c != NULL;
134                c = sparse_array_scan (sc->memory, &idx, &idx))
135             case_destroy (c);
136           sparse_array_destroy (sc->memory);
137         }
138       free (sc->default_columns);
139       case_tmpfile_destroy (sc->disk);
140       range_set_destroy (sc->disk_cases);
141       free (sc);
142     }
143 }
144
145 /* Returns the number of `union value's in each case in SC. */
146 size_t
147 sparse_cases_get_value_cnt (const struct sparse_cases *sc)
148 {
149   return sc->column_cnt;
150 }
151
152 /* Dumps the cases in SC, which must currently be stored in
153    memory, to disk.  Returns true if successful, false on I/O
154    error. */
155 static bool
156 dump_sparse_cases_to_disk (struct sparse_cases *sc)
157 {
158   unsigned long int idx;
159   struct ccase *c;
160
161   assert (sc->memory != NULL);
162   assert (sc->disk == NULL);
163
164   sc->disk = case_tmpfile_create (sc->column_cnt);
165   sc->disk_cases = range_set_create ();
166
167   for (c = sparse_array_scan (sc->memory, NULL, &idx); c != NULL;
168        c = sparse_array_scan (sc->memory, &idx, &idx))
169     {
170       if (!case_tmpfile_put_case (sc->disk, idx, c))
171         {
172           case_tmpfile_destroy (sc->disk);
173           sc->disk = NULL;
174           range_set_destroy (sc->disk_cases);
175           sc->disk_cases = NULL;
176           return false;
177         }
178       range_set_insert (sc->disk_cases, idx, 1);
179     }
180   sparse_array_destroy (sc->memory);
181   sc->memory = NULL;
182   return true;
183 }
184
185 /* Returns true if any data has ever been written to ROW in SC,
186    false otherwise. */
187 bool
188 sparse_cases_contains_row (const struct sparse_cases *sc, casenumber row)
189 {
190   return (sc->memory != NULL
191           ? sparse_array_get (sc->memory, row) != NULL
192           : range_set_contains (sc->disk_cases, row));
193 }
194
195 /* Reads columns COLUMNS...(COLUMNS + VALUE_CNT), exclusive, in
196    the given ROW in SC, into the VALUE_CNT values in VALUES.
197    Returns true if successful, false on I/O error. */
198 bool
199 sparse_cases_read (struct sparse_cases *sc, casenumber row, size_t column,
200                    union value values[], size_t value_cnt)
201 {
202   assert (value_cnt <= sc->column_cnt);
203   assert (column + value_cnt <= sc->column_cnt);
204
205   if (sparse_cases_contains_row (sc, row))
206     {
207       struct ccase c;
208       if (sc->memory != NULL)
209         case_clone (&c, sparse_array_get (sc->memory, row));
210       else if (!case_tmpfile_get_case (sc->disk, row, &c))
211         return false;
212       case_copy_out (&c, column, values, value_cnt);
213       case_destroy (&c);
214     }
215   else
216     {
217       assert (sc->default_columns != NULL);
218       memcpy (values, sc->default_columns + column,
219               sizeof *values * value_cnt);
220     }
221
222   return true;
223 }
224
225 /* Implements sparse_cases_write for an on-disk sparse_cases. */
226 static bool
227 write_disk_case (struct sparse_cases *sc, casenumber row, size_t column,
228                  const union value values[], size_t value_cnt)
229 {
230   struct ccase c;
231   bool ok;
232
233   /* Get current case data. */
234   if (column == 0 && value_cnt == sc->column_cnt)
235     case_create (&c, sc->column_cnt);
236   else if (!case_tmpfile_get_case (sc->disk, row, &c))
237     return false;
238
239   /* Copy in new data. */
240   case_copy_in (&c, column, values, value_cnt);
241
242   /* Write new case. */
243   ok = case_tmpfile_put_case (sc->disk, row, &c);
244   if (ok)
245     range_set_insert (sc->disk_cases, row, 1);
246
247   return ok;
248 }
249
250 /* Writes the VALUE_CNT values in VALUES into columns
251    COLUMNS...(COLUMNS + VALUE_CNT), exclusive, in the given ROW
252    in SC.
253    Returns true if successful, false on I/O error. */
254 bool
255 sparse_cases_write (struct sparse_cases *sc, casenumber row, size_t column,
256                     const union value values[], size_t value_cnt)
257 {
258   if (sc->memory != NULL)
259     {
260       struct ccase *c = sparse_array_get (sc->memory, row);
261       if (c == NULL)
262         {
263           if (sparse_array_count (sc->memory) >= sc->max_memory_cases)
264             {
265               if (!dump_sparse_cases_to_disk (sc))
266                 return false;
267               return write_disk_case (sc, row, column, values, value_cnt);
268             }
269
270           c = sparse_array_insert (sc->memory, row);
271           case_create (c, sc->column_cnt);
272           if (sc->default_columns != NULL
273               && (column != 0 || value_cnt != sc->column_cnt))
274             case_copy_in (c, 0, sc->default_columns, sc->column_cnt);
275         }
276       case_copy_in (c, column, values, value_cnt);
277       return true;
278     }
279   else
280     return write_disk_case (sc, row, column, values, value_cnt);
281 }
282
283 /* Writes the VALUE_CNT values in VALUES to columns
284    START_COLUMN...(START_COLUMN + VALUE_CNT), exclusive, in every
285    row in SC, even those rows that have not yet been written.
286    Returns true if successful, false on I/O error.
287
288    The runtime of this function is linear in the number of rows
289    in SC that have already been written. */
290 bool
291 sparse_cases_write_columns (struct sparse_cases *sc, size_t start_column,
292                             const union value values[], size_t value_cnt)
293 {
294   assert (value_cnt <= sc->column_cnt);
295   assert (start_column + value_cnt <= sc->column_cnt);
296
297   /* Set defaults. */
298   if (sc->default_columns == NULL)
299     sc->default_columns = xnmalloc (sc->column_cnt,
300                                     sizeof *sc->default_columns);
301   memcpy (sc->default_columns + start_column, values,
302           value_cnt * sizeof *sc->default_columns);
303
304   /* Set individual rows. */
305   if (sc->memory != NULL)
306     {
307       struct ccase *c;
308       unsigned long int idx;
309
310       for (c = sparse_array_scan (sc->memory, NULL, &idx); c != NULL;
311            c = sparse_array_scan (sc->memory, &idx, &idx))
312         case_copy_in (c, start_column, values, value_cnt);
313     }
314   else
315     {
316       const struct range_set_node *node;
317
318       for (node = range_set_first (sc->disk_cases); node != NULL;
319            node = range_set_next (sc->disk_cases, node))
320         {
321           unsigned long int start = range_set_node_get_start (node);
322           unsigned long int end = range_set_node_get_end (node);
323           unsigned long int row;
324
325           for (row = start; row < end; row++)
326             case_tmpfile_put_values (sc->disk, row,
327                                      start_column, values, value_cnt);
328         }
329
330       if (case_tmpfile_error (sc->disk))
331         return false;
332     }
333   return true;
334 }