Fixed a bug reading empty strings from Postgres databases.
[pspp] / src / data / sparse-cases.c
1 /* PSPP - a program for statistical analysis.
2    Copyright (C) 2007 Free Software Foundation, Inc.
3
4    This program is free software: you can redistribute it and/or modify
5    it under the terms of the GNU General Public License as published by
6    the Free Software Foundation, either version 3 of the License, or
7    (at your option) any later version.
8
9    This program is distributed in the hope that it will be useful,
10    but WITHOUT ANY WARRANTY; without even the implied warranty of
11    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12    GNU General Public License for more details.
13
14    You should have received a copy of the GNU General Public License
15    along with this program.  If not, see <http://www.gnu.org/licenses/>. */
16
17 #include <config.h>
18
19 #include <data/sparse-cases.h>
20
21 #include <stdlib.h>
22 #include <string.h>
23
24 #include <data/settings.h>
25 #include <data/case-tmpfile.h>
26 #include <libpspp/assertion.h>
27 #include <libpspp/range-set.h>
28 #include <libpspp/sparse-array.h>
29
30 #include "xalloc.h"
31
32 /* A sparse array of cases. */
33 struct sparse_cases
34   {
35     size_t column_cnt;                  /* Number of values per case. */
36     union value *default_columns;       /* Defaults for unwritten cases. */
37     casenumber max_memory_cases;        /* Max cases before dumping to disk. */
38     struct sparse_array *memory;        /* Backing, if stored in memory. */
39     struct case_tmpfile *disk;          /* Backing, if stored on disk. */
40     struct range_set *disk_cases;       /* Allocated cases, if on disk. */
41   };
42
43 /* Creates and returns a new sparse array of cases with
44    COLUMN_CNT values per case. */
45 struct sparse_cases *
46 sparse_cases_create (size_t column_cnt)
47 {
48   struct sparse_cases *sc = xmalloc (sizeof *sc);
49   sc->column_cnt = column_cnt;
50   sc->default_columns = NULL;
51   sc->max_memory_cases = settings_get_workspace_cases (column_cnt);
52   sc->memory = sparse_array_create (sizeof (struct ccase));
53   sc->disk = NULL;
54   sc->disk_cases = NULL;
55   return sc;
56 }
57
58 /* Creates and returns a new sparse array of cases that contains
59    the same data as OLD. */
60 struct sparse_cases *
61 sparse_cases_clone (const struct sparse_cases *old)
62 {
63   struct sparse_cases *new = xmalloc (sizeof *new);
64
65   new->column_cnt = old->column_cnt;
66
67   if (old->default_columns != NULL)
68     new->default_columns
69       = xmemdup (old->default_columns,
70                  old->column_cnt * sizeof *old->default_columns);
71   else
72     new->default_columns = NULL;
73
74   new->max_memory_cases = old->max_memory_cases;
75
76   if (old->memory != NULL)
77     {
78       unsigned long int idx;
79       struct ccase *c;
80
81       new->memory = sparse_array_create (sizeof (struct ccase));
82       for (c = sparse_array_scan (old->memory, NULL, &idx); c != NULL;
83            c = sparse_array_scan (old->memory, &idx, &idx))
84         case_clone (sparse_array_insert (new->memory, idx), c);
85     }
86   else
87     new->memory = NULL;
88
89   if (old->disk != NULL)
90     {
91       const struct range_set_node *node;
92
93       new->disk = case_tmpfile_create (old->column_cnt);
94       new->disk_cases = range_set_create ();
95       for (node = range_set_first (old->disk_cases); node != NULL;
96            node = range_set_next (old->disk_cases, node))
97         {
98           unsigned long int start = range_set_node_get_start (node);
99           unsigned long int end = range_set_node_get_end (node);
100           unsigned long int idx;
101           struct ccase c;
102
103           for (idx = start; idx < end; idx++)
104             if (!case_tmpfile_get_case (old->disk, idx, &c)
105                 || !case_tmpfile_put_case (new->disk, idx, &c))
106               {
107                 sparse_cases_destroy (new);
108                 return NULL;
109               }
110         }
111     }
112   else
113     {
114       new->disk = NULL;
115       new->disk_cases = NULL;
116     }
117
118   return new;
119 }
120
121 /* Destroys sparse array of cases SC. */
122 void
123 sparse_cases_destroy (struct sparse_cases *sc)
124 {
125   if (sc != NULL)
126     {
127       if (sc->memory != NULL)
128         {
129           unsigned long int idx;
130           struct ccase *c;
131           for (c = sparse_array_scan (sc->memory, NULL, &idx); c != NULL;
132                c = sparse_array_scan (sc->memory, &idx, &idx))
133             case_destroy (c);
134           sparse_array_destroy (sc->memory);
135         }
136       free (sc->default_columns);
137       case_tmpfile_destroy (sc->disk);
138       range_set_destroy (sc->disk_cases);
139       free (sc);
140     }
141 }
142
143 /* Returns the number of `union value's in each case in SC. */
144 size_t
145 sparse_cases_get_value_cnt (const struct sparse_cases *sc)
146 {
147   return sc->column_cnt;
148 }
149
150 /* Dumps the cases in SC, which must currently be stored in
151    memory, to disk.  Returns true if successful, false on I/O
152    error. */
153 static bool
154 dump_sparse_cases_to_disk (struct sparse_cases *sc)
155 {
156   unsigned long int idx;
157   struct ccase *c;
158
159   assert (sc->memory != NULL);
160   assert (sc->disk == NULL);
161
162   sc->disk = case_tmpfile_create (sc->column_cnt);
163   sc->disk_cases = range_set_create ();
164
165   for (c = sparse_array_scan (sc->memory, NULL, &idx); c != NULL;
166        c = sparse_array_scan (sc->memory, &idx, &idx))
167     {
168       if (!case_tmpfile_put_case (sc->disk, idx, c))
169         {
170           case_tmpfile_destroy (sc->disk);
171           sc->disk = NULL;
172           range_set_destroy (sc->disk_cases);
173           sc->disk_cases = NULL;
174           return false;
175         }
176       range_set_insert (sc->disk_cases, idx, 1);
177     }
178   sparse_array_destroy (sc->memory);
179   sc->memory = NULL;
180   return true;
181 }
182
183 /* Returns true if any data has ever been written to ROW in SC,
184    false otherwise. */
185 bool
186 sparse_cases_contains_row (const struct sparse_cases *sc, casenumber row)
187 {
188   return (sc->memory != NULL
189           ? sparse_array_get (sc->memory, row) != NULL
190           : range_set_contains (sc->disk_cases, row));
191 }
192
193 /* Reads columns COLUMNS...(COLUMNS + VALUE_CNT), exclusive, in
194    the given ROW in SC, into the VALUE_CNT values in VALUES.
195    Returns true if successful, false on I/O error. */
196 bool
197 sparse_cases_read (struct sparse_cases *sc, casenumber row, size_t column,
198                    union value values[], size_t value_cnt)
199 {
200   assert (value_cnt <= sc->column_cnt);
201   assert (column + value_cnt <= sc->column_cnt);
202
203   if (sparse_cases_contains_row (sc, row))
204     {
205       struct ccase c;
206       if (sc->memory != NULL)
207         case_clone (&c, sparse_array_get (sc->memory, row));
208       else if (!case_tmpfile_get_case (sc->disk, row, &c))
209         return false;
210       case_copy_out (&c, column, values, value_cnt);
211       case_destroy (&c);
212     }
213   else
214     {
215       assert (sc->default_columns != NULL);
216       memcpy (values, sc->default_columns + column,
217               sizeof *values * value_cnt);
218     }
219
220   return true;
221 }
222
223 /* Implements sparse_cases_write for an on-disk sparse_cases. */
224 static bool
225 write_disk_case (struct sparse_cases *sc, casenumber row, size_t column,
226                  const union value values[], size_t value_cnt)
227 {
228   struct ccase c;
229   bool ok;
230
231   /* Get current case data. */
232   if (column == 0 && value_cnt == sc->column_cnt)
233     case_create (&c, sc->column_cnt);
234   else if (!case_tmpfile_get_case (sc->disk, row, &c))
235     return false;
236
237   /* Copy in new data. */
238   case_copy_in (&c, column, values, value_cnt);
239
240   /* Write new case. */
241   ok = case_tmpfile_put_case (sc->disk, row, &c);
242   if (ok)
243     range_set_insert (sc->disk_cases, row, 1);
244
245   return ok;
246 }
247
248 /* Writes the VALUE_CNT values in VALUES into columns
249    COLUMNS...(COLUMNS + VALUE_CNT), exclusive, in the given ROW
250    in SC.
251    Returns true if successful, false on I/O error. */
252 bool
253 sparse_cases_write (struct sparse_cases *sc, casenumber row, size_t column,
254                     const union value values[], size_t value_cnt)
255 {
256   if (sc->memory != NULL)
257     {
258       struct ccase *c = sparse_array_get (sc->memory, row);
259       if (c == NULL)
260         {
261           if (sparse_array_count (sc->memory) >= sc->max_memory_cases)
262             {
263               if (!dump_sparse_cases_to_disk (sc))
264                 return false;
265               return write_disk_case (sc, row, column, values, value_cnt);
266             }
267
268           c = sparse_array_insert (sc->memory, row);
269           case_create (c, sc->column_cnt);
270           if (sc->default_columns != NULL
271               && (column != 0 || value_cnt != sc->column_cnt))
272             case_copy_in (c, 0, sc->default_columns, sc->column_cnt);
273         }
274       case_copy_in (c, column, values, value_cnt);
275       return true;
276     }
277   else
278     return write_disk_case (sc, row, column, values, value_cnt);
279 }
280
281 /* Writes the VALUE_CNT values in VALUES to columns
282    START_COLUMN...(START_COLUMN + VALUE_CNT), exclusive, in every
283    row in SC, even those rows that have not yet been written.
284    Returns true if successful, false on I/O error.
285
286    The runtime of this function is linear in the number of rows
287    in SC that have already been written. */
288 bool
289 sparse_cases_write_columns (struct sparse_cases *sc, size_t start_column,
290                             const union value values[], size_t value_cnt)
291 {
292   assert (value_cnt <= sc->column_cnt);
293   assert (start_column + value_cnt <= sc->column_cnt);
294
295   /* Set defaults. */
296   if (sc->default_columns == NULL)
297     sc->default_columns = xnmalloc (sc->column_cnt,
298                                     sizeof *sc->default_columns);
299   memcpy (sc->default_columns + start_column, values,
300           value_cnt * sizeof *sc->default_columns);
301
302   /* Set individual rows. */
303   if (sc->memory != NULL)
304     {
305       struct ccase *c;
306       unsigned long int idx;
307
308       for (c = sparse_array_scan (sc->memory, NULL, &idx); c != NULL;
309            c = sparse_array_scan (sc->memory, &idx, &idx))
310         case_copy_in (c, start_column, values, value_cnt);
311     }
312   else
313     {
314       const struct range_set_node *node;
315
316       for (node = range_set_first (sc->disk_cases); node != NULL;
317            node = range_set_next (sc->disk_cases, node))
318         {
319           unsigned long int start = range_set_node_get_start (node);
320           unsigned long int end = range_set_node_get_end (node);
321           unsigned long int row;
322
323           for (row = start; row < end; row++)
324             case_tmpfile_put_values (sc->disk, row,
325                                      start_column, values, value_cnt);
326         }
327
328       if (case_tmpfile_error (sc->disk))
329         return false;
330     }
331   return true;
332 }