7abe429147a893af7a48bf88eb5db99cb25313ae
[pspp-builds.git] / src / data / sparse-cases.c
1 /* PSPP - a program for statistical analysis.
2    Copyright (C) 2007, 2009 Free Software Foundation, Inc.
3
4    This program is free software: you can redistribute it and/or modify
5    it under the terms of the GNU General Public License as published by
6    the Free Software Foundation, either version 3 of the License, or
7    (at your option) any later version.
8
9    This program is distributed in the hope that it will be useful,
10    but WITHOUT ANY WARRANTY; without even the implied warranty of
11    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12    GNU General Public License for more details.
13
14    You should have received a copy of the GNU General Public License
15    along with this program.  If not, see <http://www.gnu.org/licenses/>. */
16
17 #include <config.h>
18
19 #include <data/sparse-cases.h>
20
21 #include <stdlib.h>
22 #include <string.h>
23
24 #include <data/case.h>
25 #include <data/settings.h>
26 #include <data/case-tmpfile.h>
27 #include <libpspp/assertion.h>
28 #include <libpspp/range-set.h>
29 #include <libpspp/sparse-array.h>
30
31 #include "xalloc.h"
32
33 /* A sparse array of cases. */
34 struct sparse_cases
35   {
36     size_t column_cnt;                  /* Number of values per case. */
37     union value *default_columns;       /* Defaults for unwritten cases. */
38     casenumber max_memory_cases;        /* Max cases before dumping to disk. */
39     struct sparse_array *memory;        /* Backing, if stored in memory. */
40     struct case_tmpfile *disk;          /* Backing, if stored on disk. */
41     struct range_set *disk_cases;       /* Allocated cases, if on disk. */
42   };
43
44 /* Creates and returns a new sparse array of cases with
45    COLUMN_CNT values per case. */
46 struct sparse_cases *
47 sparse_cases_create (size_t column_cnt)
48 {
49   struct sparse_cases *sc = xmalloc (sizeof *sc);
50   sc->column_cnt = column_cnt;
51   sc->default_columns = NULL;
52   sc->max_memory_cases = settings_get_workspace_cases (column_cnt);
53   sc->memory = sparse_array_create (sizeof (struct ccase *));
54   sc->disk = NULL;
55   sc->disk_cases = NULL;
56   return sc;
57 }
58
59 /* Creates and returns a new sparse array of cases that contains
60    the same data as OLD. */
61 struct sparse_cases *
62 sparse_cases_clone (const struct sparse_cases *old)
63 {
64   struct sparse_cases *new = xmalloc (sizeof *new);
65
66   new->column_cnt = old->column_cnt;
67
68   if (old->default_columns != NULL)
69     new->default_columns
70       = xmemdup (old->default_columns,
71                  old->column_cnt * sizeof *old->default_columns);
72   else
73     new->default_columns = NULL;
74
75   new->max_memory_cases = old->max_memory_cases;
76
77   if (old->memory != NULL)
78     {
79       unsigned long int idx;
80       struct ccase **cp;
81
82       new->memory = sparse_array_create (sizeof (struct ccase *));
83       for (cp = sparse_array_first (old->memory, &idx); cp != NULL;
84            cp = sparse_array_next (old->memory, idx, &idx))
85         {
86           struct ccase **ncp = sparse_array_insert (new->memory, idx);
87           *ncp = case_ref (*cp);
88         }
89     }
90   else
91     new->memory = NULL;
92
93   if (old->disk != NULL)
94     {
95       const struct range_set_node *node;
96
97       new->disk = case_tmpfile_create (old->column_cnt);
98       new->disk_cases = range_set_create ();
99       for (node = range_set_first (old->disk_cases); node != NULL;
100            node = range_set_next (old->disk_cases, node))
101         {
102           unsigned long int start = range_set_node_get_start (node);
103           unsigned long int end = range_set_node_get_end (node);
104           unsigned long int idx;
105
106           for (idx = start; idx < end; idx++) 
107             {
108               struct ccase *c = case_tmpfile_get_case (old->disk, idx);
109               if (c == NULL || !case_tmpfile_put_case (new->disk, idx, c))
110                 {
111                   sparse_cases_destroy (new);
112                   return NULL;
113                 }
114             }
115         }
116     }
117   else
118     {
119       new->disk = NULL;
120       new->disk_cases = NULL;
121     }
122
123   return new;
124 }
125
126 /* Destroys sparse array of cases SC. */
127 void
128 sparse_cases_destroy (struct sparse_cases *sc)
129 {
130   if (sc != NULL)
131     {
132       if (sc->memory != NULL)
133         {
134           unsigned long int idx;
135           struct ccase **cp;
136           for (cp = sparse_array_first (sc->memory, &idx); cp != NULL;
137                cp = sparse_array_next (sc->memory, idx, &idx))
138             case_unref (*cp);
139           sparse_array_destroy (sc->memory);
140         }
141       free (sc->default_columns);
142       case_tmpfile_destroy (sc->disk);
143       range_set_destroy (sc->disk_cases);
144       free (sc);
145     }
146 }
147
148 /* Returns the number of `union value's in each case in SC. */
149 size_t
150 sparse_cases_get_value_cnt (const struct sparse_cases *sc)
151 {
152   return sc->column_cnt;
153 }
154
155 /* Dumps the cases in SC, which must currently be stored in
156    memory, to disk.  Returns true if successful, false on I/O
157    error. */
158 static bool
159 dump_sparse_cases_to_disk (struct sparse_cases *sc)
160 {
161   unsigned long int idx;
162   struct ccase **cp;
163
164   assert (sc->memory != NULL);
165   assert (sc->disk == NULL);
166
167   sc->disk = case_tmpfile_create (sc->column_cnt);
168   sc->disk_cases = range_set_create ();
169
170   for (cp = sparse_array_first (sc->memory, &idx); cp != NULL;
171        cp = sparse_array_next (sc->memory, idx, &idx))
172     {
173       if (!case_tmpfile_put_case (sc->disk, idx, *cp))
174         {
175           case_tmpfile_destroy (sc->disk);
176           sc->disk = NULL;
177           range_set_destroy (sc->disk_cases);
178           sc->disk_cases = NULL;
179           return false;
180         }
181       range_set_insert (sc->disk_cases, idx, 1);
182     }
183   sparse_array_destroy (sc->memory);
184   sc->memory = NULL;
185   return true;
186 }
187
188 /* Returns true if any data has ever been written to ROW in SC,
189    false otherwise. */
190 bool
191 sparse_cases_contains_row (const struct sparse_cases *sc, casenumber row)
192 {
193   return (sc->memory != NULL
194           ? sparse_array_get (sc->memory, row) != NULL
195           : range_set_contains (sc->disk_cases, row));
196 }
197
198 /* Reads columns COLUMNS...(COLUMNS + VALUE_CNT), exclusive, in
199    the given ROW in SC, into the VALUE_CNT values in VALUES.
200    Returns true if successful, false on I/O error. */
201 bool
202 sparse_cases_read (struct sparse_cases *sc, casenumber row, size_t column,
203                    union value values[], size_t value_cnt)
204 {
205   assert (value_cnt <= sc->column_cnt);
206   assert (column + value_cnt <= sc->column_cnt);
207
208   if (sparse_cases_contains_row (sc, row))
209     {
210       struct ccase *c;
211       if (sc->memory != NULL)
212         {
213           struct ccase **cp = sparse_array_get (sc->memory, row);
214           c = case_ref (*cp);
215         }
216       else
217         {
218           c = case_tmpfile_get_case (sc->disk, row);
219           if (c == NULL)
220             return false;
221         }
222       case_copy_out (c, column, values, value_cnt);
223       case_unref (c);
224     }
225   else
226     {
227       assert (sc->default_columns != NULL);
228       memcpy (values, sc->default_columns + column,
229               sizeof *values * value_cnt);
230     }
231
232   return true;
233 }
234
235 /* Implements sparse_cases_write for an on-disk sparse_cases. */
236 static bool
237 write_disk_case (struct sparse_cases *sc, casenumber row, size_t column,
238                  const union value values[], size_t value_cnt)
239 {
240   struct ccase *c;
241   bool ok;
242
243   /* Get current case data. */
244   if (column == 0 && value_cnt == sc->column_cnt)
245     c = case_create (sc->column_cnt);
246   else
247     {
248       c = case_tmpfile_get_case (sc->disk, row);
249       if (c == NULL)
250         return false;
251     }
252
253   /* Copy in new data. */
254   case_copy_in (c, column, values, value_cnt);
255
256   /* Write new case. */
257   ok = case_tmpfile_put_case (sc->disk, row, c);
258   if (ok)
259     range_set_insert (sc->disk_cases, row, 1);
260
261   return ok;
262 }
263
264 /* Writes the VALUE_CNT values in VALUES into columns
265    COLUMNS...(COLUMNS + VALUE_CNT), exclusive, in the given ROW
266    in SC.
267    Returns true if successful, false on I/O error. */
268 bool
269 sparse_cases_write (struct sparse_cases *sc, casenumber row, size_t column,
270                     const union value values[], size_t value_cnt)
271 {
272   if (sc->memory != NULL)
273     {
274       struct ccase *c, **cp;
275       cp = sparse_array_get (sc->memory, row);
276       if (cp != NULL)
277         c = *cp = case_unshare (*cp);
278       else
279         {
280           if (sparse_array_count (sc->memory) >= sc->max_memory_cases)
281             {
282               if (!dump_sparse_cases_to_disk (sc))
283                 return false;
284               return write_disk_case (sc, row, column, values, value_cnt);
285             }
286
287           cp = sparse_array_insert (sc->memory, row);
288           c = *cp = case_create (sc->column_cnt);
289           if (sc->default_columns != NULL
290               && (column != 0 || value_cnt != sc->column_cnt))
291             case_copy_in (c, 0, sc->default_columns, sc->column_cnt);
292         }
293       case_copy_in (c, column, values, value_cnt);
294       return true;
295     }
296   else
297     return write_disk_case (sc, row, column, values, value_cnt);
298 }
299
300 /* Writes the VALUE_CNT values in VALUES to columns
301    START_COLUMN...(START_COLUMN + VALUE_CNT), exclusive, in every
302    row in SC, even those rows that have not yet been written.
303    Returns true if successful, false on I/O error.
304
305    The runtime of this function is linear in the number of rows
306    in SC that have already been written. */
307 bool
308 sparse_cases_write_columns (struct sparse_cases *sc, size_t start_column,
309                             const union value values[], size_t value_cnt)
310 {
311   assert (value_cnt <= sc->column_cnt);
312   assert (start_column + value_cnt <= sc->column_cnt);
313
314   /* Set defaults. */
315   if (sc->default_columns == NULL)
316     sc->default_columns = xnmalloc (sc->column_cnt,
317                                     sizeof *sc->default_columns);
318   memcpy (sc->default_columns + start_column, values,
319           value_cnt * sizeof *sc->default_columns);
320
321   /* Set individual rows. */
322   if (sc->memory != NULL)
323     {
324       struct ccase **cp;
325       unsigned long int idx;
326
327       for (cp = sparse_array_first (sc->memory, &idx); cp != NULL;
328            cp = sparse_array_next (sc->memory, idx, &idx))
329         {
330           *cp = case_unshare (*cp);
331           case_copy_in (*cp, start_column, values, value_cnt);
332         }
333     }
334   else
335     {
336       const struct range_set_node *node;
337
338       for (node = range_set_first (sc->disk_cases); node != NULL;
339            node = range_set_next (sc->disk_cases, node))
340         {
341           unsigned long int start = range_set_node_get_start (node);
342           unsigned long int end = range_set_node_get_end (node);
343           unsigned long int row;
344
345           for (row = start; row < end; row++)
346             case_tmpfile_put_values (sc->disk, row,
347                                      start_column, values, value_cnt);
348         }
349
350       if (case_tmpfile_error (sc->disk))
351         return false;
352     }
353   return true;
354 }