New data structure sparse_xarray.
[pspp-builds.git] / src / libpspp / sparse-xarray.c
1 /* PSPP - a program for statistical analysis.
2    Copyright (C) 2007, 2009 Free Software Foundation, Inc.
3
4    This program is free software: you can redistribute it and/or modify
5    it under the terms of the GNU General Public License as published by
6    the Free Software Foundation, either version 3 of the License, or
7    (at your option) any later version.
8
9    This program is distributed in the hope that it will be useful,
10    but WITHOUT ANY WARRANTY; without even the implied warranty of
11    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12    GNU General Public License for more details.
13
14    You should have received a copy of the GNU General Public License
15    along with this program.  If not, see <http://www.gnu.org/licenses/>. */
16
17 #include <config.h>
18
19 #include <libpspp/sparse-xarray.h>
20
21 #include <stdint.h>
22 #include <stdlib.h>
23 #include <string.h>
24
25 #include <libpspp/assertion.h>
26 #include <libpspp/misc.h>
27 #include <libpspp/range-set.h>
28 #include <libpspp/sparse-array.h>
29 #include <libpspp/tmpfile.h>
30
31 #include "md4.h"
32 #include "minmax.h"
33 #include "xalloc.h"
34
35 /* A sparse array of arrays of bytes. */
36 struct sparse_xarray
37   {
38     size_t n_bytes;                     /* Number of bytes per row. */
39     uint8_t *default_row;               /* Defaults for unwritten rows. */
40     unsigned long int max_memory_rows;  /* Max rows before dumping to disk. */
41     struct sparse_array *memory;        /* Backing, if stored in memory. */
42     struct tmpfile *disk;               /* Backing, if stored on disk. */
43     struct range_set *disk_rows;        /* Allocated rows, if on disk. */
44   };
45
46 static bool UNUSED
47 range_is_valid (const struct sparse_xarray *sx, size_t ofs, size_t n)
48 {
49   return n <= sx->n_bytes && ofs <= sx->n_bytes && ofs + n <= sx->n_bytes;
50 }
51
52 /* Creates and returns a new sparse array of arrays of bytes.
53    Each row in the sparse array will consist of N_BYTES bytes.
54    If fewer than MAX_MEMORY_ROWS rows are added to the array,
55    then it will be kept in memory; if more than that are added,
56    then it will be stored on disk. */
57 struct sparse_xarray *
58 sparse_xarray_create (size_t n_bytes, size_t max_memory_rows)
59 {
60   struct sparse_xarray *sx = xmalloc (sizeof *sx);
61   sx->n_bytes = n_bytes;
62   sx->default_row = xzalloc (n_bytes);
63   sx->max_memory_rows = max_memory_rows;
64   sx->memory = sparse_array_create (sizeof (uint8_t *));
65   sx->disk = NULL;
66   sx->disk_rows = NULL;
67   return sx;
68 }
69
70 /* Creates and returns a new sparse array of rows that contains
71    the same data as OLD.  Returns a null pointer if cloning
72    fails. */
73 struct sparse_xarray *
74 sparse_xarray_clone (const struct sparse_xarray *old)
75 {
76   struct sparse_xarray *new = xmalloc (sizeof *new);
77
78   new->n_bytes = old->n_bytes;
79
80   new->default_row = xmemdup (old->default_row, old->n_bytes);
81
82   new->max_memory_rows = old->max_memory_rows;
83
84   if (old->memory != NULL)
85     {
86       unsigned long int idx;
87       uint8_t **old_row;
88
89       new->memory = sparse_array_create (sizeof (uint8_t *));
90       for (old_row = sparse_array_first (old->memory, &idx); old_row != NULL;
91            old_row = sparse_array_next (old->memory, idx, &idx))
92         {
93           uint8_t **new_row = sparse_array_insert (new->memory, idx);
94           *new_row = xmemdup (*old_row, new->n_bytes);
95         }
96     }
97   else
98     new->memory = NULL;
99
100   if (old->disk != NULL)
101     {
102       const struct range_set_node *node;
103       void *tmp = xmalloc (old->n_bytes);
104
105       new->disk = tmpfile_create ();
106       new->disk_rows = range_set_clone (old->disk_rows, NULL);
107       for (node = range_set_first (old->disk_rows); node != NULL;
108            node = range_set_next (old->disk_rows, node))
109         {
110           unsigned long int start = range_set_node_get_start (node);
111           unsigned long int end = range_set_node_get_end (node);
112           unsigned long int idx;
113
114           for (idx = start; idx < end; idx++)
115             {
116               off_t offset = (off_t) idx * old->n_bytes;
117               if (!tmpfile_read (old->disk, offset, old->n_bytes, tmp)
118                   || !tmpfile_write (new->disk, offset, old->n_bytes, tmp))
119                 {
120                   free (tmp);
121                   sparse_xarray_destroy (new);
122                   return NULL;
123                 }
124             }
125         }
126       free (tmp);
127     }
128   else
129     {
130       new->disk = NULL;
131       new->disk_rows = NULL;
132     }
133
134   return new;
135 }
136
137 /* Destroys sparse array of rows SX. */
138 void
139 sparse_xarray_destroy (struct sparse_xarray *sx)
140 {
141   if (sx != NULL)
142     {
143       free (sx->default_row);
144       if (sx->memory != NULL)
145         {
146           unsigned long int idx;
147           uint8_t **row;
148           for (row = sparse_array_first (sx->memory, &idx); row != NULL;
149                row = sparse_array_next (sx->memory, idx, &idx))
150             free (*row);
151           sparse_array_destroy (sx->memory);
152         }
153       tmpfile_destroy (sx->disk);
154       range_set_destroy (sx->disk_rows);
155       free (sx);
156     }
157 }
158
159 /* Returns the number of bytes in each row in SX. */
160 size_t
161 sparse_xarray_get_n_columns (const struct sparse_xarray *sx)
162 {
163   return sx->n_bytes;
164 }
165
166 /* Returns the number of rows in SX. */
167 size_t
168 sparse_xarray_get_n_rows (const struct sparse_xarray *sx)
169 {
170   if (sx->memory)
171     {
172       unsigned long int idx;
173       return sparse_array_last (sx->memory, &idx) != NULL ? idx + 1 : 0;
174     }
175   else
176     {
177       const struct range_set_node *last = range_set_last (sx->disk_rows);
178       return last != NULL ? range_set_node_get_end (last) : 0;
179     }
180 }
181
182 /* Dumps the rows in SX, which must currently be stored in
183    memory, to disk.  Returns true if successful, false on I/O
184    error. */
185 static bool
186 dump_sparse_xarray_to_disk (struct sparse_xarray *sx)
187 {
188   unsigned long int idx;
189   uint8_t **row;
190
191   assert (sx->memory != NULL);
192   assert (sx->disk == NULL);
193
194   sx->disk = tmpfile_create ();
195   sx->disk_rows = range_set_create ();
196
197   for (row = sparse_array_first (sx->memory, &idx); row != NULL;
198        row = sparse_array_next (sx->memory, idx, &idx))
199     {
200       if (!tmpfile_write (sx->disk, (off_t) idx * sx->n_bytes, sx->n_bytes,
201                           *row))
202         {
203           tmpfile_destroy (sx->disk);
204           sx->disk = NULL;
205           range_set_destroy (sx->disk_rows);
206           sx->disk_rows = NULL;
207           return false;
208         }
209       range_set_insert (sx->disk_rows, idx, 1);
210     }
211   sparse_array_destroy (sx->memory);
212   sx->memory = NULL;
213   return true;
214 }
215
216 /* Returns true if any data has ever been written to ROW in SX,
217    false otherwise. */
218 bool
219 sparse_xarray_contains_row (const struct sparse_xarray *sx,
220                             unsigned long int row)
221 {
222   return (sx->memory != NULL
223           ? sparse_array_get (sx->memory, row) != NULL
224           : range_set_contains (sx->disk_rows, row));
225 }
226
227 /* Reads columns COLUMNS...(COLUMNS + VALUE_CNT), exclusive, in
228    the given ROW in SX, into the VALUE_CNT values in VALUES.
229    Returns true if successful, false on I/O error. */
230 bool
231 sparse_xarray_read (const struct sparse_xarray *sx, unsigned long int row,
232                     size_t start, size_t n, void *data)
233 {
234   assert (range_is_valid (sx, start, n));
235
236   if (sx->memory != NULL)
237     {
238       uint8_t **p = sparse_array_get (sx->memory, row);
239       if (p != NULL)
240         {
241           memcpy (data, *p + start, n);
242           return true;
243         }
244     }
245   else
246     {
247       if (range_set_contains (sx->disk_rows, row))
248         return tmpfile_read (sx->disk, (off_t) row * sx->n_bytes + start,
249                              n, data);
250     }
251
252   memcpy (data, sx->default_row + start, n);
253   return true;
254 }
255
256 /* Implements sparse_xarray_write for an on-disk sparse_xarray. */
257 static bool
258 write_disk_row (struct sparse_xarray *sx, unsigned long int row,
259                 size_t start, size_t n, const void *data)
260 {
261   off_t ofs = (off_t) row * sx->n_bytes;
262   if (range_set_contains (sx->disk_rows, row))
263     return tmpfile_write (sx->disk, ofs + start, n, data);
264   else
265     {
266       range_set_insert (sx->disk_rows, row, 1);
267       return (tmpfile_write (sx->disk, ofs, start, sx->default_row)
268               && tmpfile_write (sx->disk, ofs + start, n, data)
269               && tmpfile_write (sx->disk, ofs + start + n,
270                                 sx->n_bytes - start - n,
271                                 sx->default_row + start + n));
272     }
273 }
274
275 /* Writes the VALUE_CNT values in VALUES into columns
276    COLUMNS...(COLUMNS + VALUE_CNT), exclusive, in the given ROW
277    in SX.
278    Returns true if successful, false on I/O error. */
279 bool
280 sparse_xarray_write (struct sparse_xarray *sx, unsigned long int row,
281                      size_t start, size_t n, const void *data)
282 {
283   assert (range_is_valid (sx, start, n));
284   if (sx->memory != NULL)
285     {
286       uint8_t **p = sparse_array_get (sx->memory, row);
287       if (p == NULL)
288         {
289           if (sparse_array_count (sx->memory) < sx->max_memory_rows)
290             {
291               p = sparse_array_insert (sx->memory, row);
292               *p = xmemdup (sx->default_row, sx->n_bytes);
293             }
294           else
295             {
296               if (!dump_sparse_xarray_to_disk (sx))
297                 return false;
298               return write_disk_row (sx, row, start, n, data);
299             }
300         }
301       memcpy (*p + start, data, n);
302       return true;
303     }
304   else
305     return write_disk_row (sx, row, start, n, data);
306 }
307
308 /* Writes the VALUE_CNT values in VALUES to columns
309    START_COLUMN...(START_COLUMN + VALUE_CNT), exclusive, in every
310    row in SX, even those rows that have not yet been written.
311    Returns true if successful, false on I/O error.
312
313    The runtime of this function is linear in the number of rows
314    in SX that have already been written. */
315 bool
316 sparse_xarray_write_columns (struct sparse_xarray *sx, size_t start,
317                              size_t n, const void *data)
318 {
319   assert (range_is_valid (sx, start, n));
320
321   /* Set defaults. */
322   memcpy (sx->default_row + start, data, n);
323
324   /* Set individual rows. */
325   if (sx->memory != NULL)
326     {
327       unsigned long int idx;
328       uint8_t **p;
329
330       for (p = sparse_array_first (sx->memory, &idx); p != NULL;
331            p = sparse_array_next (sx->memory, idx, &idx))
332         memcpy (*p + start, data, n);
333     }
334   else
335     {
336       const struct range_set_node *node;
337
338       for (node = range_set_first (sx->disk_rows); node != NULL;
339            node = range_set_next (sx->disk_rows, node))
340         {
341           unsigned long int start_row = range_set_node_get_start (node);
342           unsigned long int end_row = range_set_node_get_end (node);
343           unsigned long int row;
344
345           for (row = start_row; row < end_row; row++)
346             {
347               off_t offset = (off_t) row * sx->n_bytes;
348               if (!tmpfile_write (sx->disk, offset + start, n, data))
349                 break;
350             }
351         }
352
353       if (tmpfile_error (sx->disk))
354         return false;
355     }
356   return true;
357 }
358
359 static unsigned long int
360 scan_first (const struct sparse_xarray *sx)
361 {
362   if (sx->memory)
363     {
364       unsigned long int idx;
365       return sparse_array_first (sx->memory, &idx) ? idx : ULONG_MAX;
366     }
367   else
368     return range_set_scan (sx->disk_rows, 0);
369 }
370
371 static unsigned long int
372 scan_next (const struct sparse_xarray *sx, unsigned long int start)
373 {
374   if (sx->memory)
375     {
376       unsigned long int idx;
377       return sparse_array_next (sx->memory, start, &idx) ? idx : ULONG_MAX;
378     }
379   else
380     return range_set_scan (sx->disk_rows, start + 1);
381 }
382
383 /* Only works for rows for which sparse_xarray_contains_row()
384    would return true. */
385 static uint8_t *
386 get_row (const struct sparse_xarray *sx, unsigned long int idx,
387          uint8_t *buffer)
388 {
389   if (sx->memory)
390     {
391       uint8_t **p = sparse_array_get (sx->memory, idx);
392       return *p;
393     }
394   else if (tmpfile_read (sx->disk, (off_t) idx * sx->n_bytes,
395                          sx->n_bytes, buffer))
396     return buffer;
397   else
398     return NULL;
399 }
400
401 /* Iterates over all the rows in SX and DX, passing each pair of
402    rows with equal indexes to CB.  CB's modifications, if any, to
403    destination rows are written back to DX.
404
405    All rows that are actually in use in SX or in DX or both are
406    passed to CB.  If a row is in use in SX but not in DX, or vice
407    versa, then the "default" row (as set by
408    sparse_xarray_write_columns) is passed as the contents of the
409    other row.
410
411    CB is also called once with the default row from SX and the
412    default row from DX.  Modifying the data passed as the default
413    row from DX will change DX's default row.
414
415    Returns true if successful, false if I/O on SX or DX fails or
416    if CB returns false.  On failure, the contents of DX are
417    undefined. */
418 bool
419 sparse_xarray_copy (const struct sparse_xarray *sx, struct sparse_xarray *dx,
420                     bool (*cb) (const void *src, void *dst, void *aux),
421                     void *aux)
422 {
423   bool success = true;
424
425   if (!cb (sx->default_row, dx->default_row, aux))
426     return false;
427
428   if (sx == dx)
429     {
430       if (sx->memory)
431         {
432           unsigned long int idx;
433           uint8_t **row;
434
435           for (row = sparse_array_first (sx->memory, &idx); row != NULL;
436                row = sparse_array_next (sx->memory, idx, &idx))
437             {
438               success = cb (*row, *row, aux);
439               if (!success)
440                 break;
441             }
442         }
443       else if (sx->disk)
444         {
445           const struct range_set_node *node;
446           void *tmp = xmalloc (sx->n_bytes);
447
448           for (node = range_set_first (sx->disk_rows); node != NULL;
449                node = range_set_next (sx->disk_rows, node))
450             {
451               unsigned long int start = range_set_node_get_start (node);
452               unsigned long int end = range_set_node_get_end (node);
453               unsigned long int row;
454
455               for (row = start; row < end; row++)
456                 {
457                   off_t offset = (off_t) row * sx->n_bytes;
458                   success = (tmpfile_read (sx->disk, offset, sx->n_bytes, tmp)
459                              && cb (tmp, tmp, aux)
460                              && tmpfile_write (dx->disk, offset,
461                                                dx->n_bytes, tmp));
462                   if (!success)
463                     break;
464                 }
465             }
466
467           free (tmp);
468         }
469     }
470   else
471     {
472       unsigned long int src_idx = scan_first (sx);
473       unsigned long int dst_idx = scan_first (dx);
474
475       uint8_t *tmp_src_row = xmalloc (sx->n_bytes);
476       uint8_t *tmp_dst_row = xmalloc (dx->n_bytes);
477
478       for (;;)
479         {
480           unsigned long int idx;
481           const uint8_t *src_row;
482           uint8_t *dst_row;
483
484           /* Determine the index of the row to process.  If
485              src_idx == dst_idx, then the row has been written in
486              both SX and DX.  Otherwise, it has been written in
487              only the sparse_xarray corresponding to the smaller
488              index, and has the default contents in the other. */
489           idx = MIN (src_idx, dst_idx);
490           if (idx == ULONG_MAX)
491             break;
492
493           /* Obtain a copy of the source row as src_row. */
494           if (idx == src_idx)
495             src_row = get_row (sx, idx, tmp_src_row);
496           else
497             src_row = sx->default_row;
498
499           /* Obtain the destination row as dst_row. */
500           if (idx == dst_idx)
501             dst_row = get_row (dx, idx, tmp_dst_row);
502           else if (dx->memory
503                    && sparse_array_count (dx->memory) < dx->max_memory_rows)
504             {
505               uint8_t **p = sparse_array_insert (dx->memory, idx);
506               dst_row = *p = xmemdup (dx->default_row, dx->n_bytes);
507             }
508           else
509             {
510               memcpy (tmp_dst_row, dx->default_row, dx->n_bytes);
511               dst_row = tmp_dst_row;
512             }
513
514           /* Run the callback. */
515           success = cb (src_row, dst_row, aux);
516           if (!success)
517             break;
518
519           /* Write back the destination row, if necessary. */
520           if (dst_row == tmp_dst_row)
521             {
522               success = sparse_xarray_write (dx, idx, 0, dx->n_bytes, dst_row);
523               if (!success)
524                 break;
525             }
526           else
527             {
528               /* Nothing to do: we modified the destination row in-place. */
529             }
530
531           /* Advance to the next row. */
532           if (src_idx == idx)
533             src_idx = scan_next (sx, src_idx);
534           if (dst_idx == idx)
535             dst_idx = scan_next (dx, dst_idx);
536         }
537
538       free (tmp_src_row);
539       free (tmp_dst_row);
540     }
541
542   return success;
543 }
544
545 /* Returns a hash value for SX suitable for use with the model
546    checker.  The value in BASIS is folded into the hash.
547
548    The returned hash value is *not* suitable for storage and
549    retrieval of sparse_xarrays that have identical contents,
550    because it will return different hash values for
551    sparse_xarrays that have the same contents (and it's slow).
552
553    We use MD4 because it is much faster than MD5 or SHA-1 but its
554    collision resistance is just as good. */
555 unsigned int
556 sparse_xarray_model_checker_hash (const struct sparse_xarray *sx,
557                                   unsigned int basis)
558 {
559   unsigned int hash[DIV_RND_UP (20, sizeof (unsigned int))];
560   struct md4_ctx ctx;
561
562   md4_init_ctx (&ctx);
563   md4_process_bytes (&basis, sizeof basis, &ctx);
564   md4_process_bytes (&sx->n_bytes, sizeof sx->n_bytes, &ctx);
565   md4_process_bytes (sx->default_row, sx->n_bytes, &ctx);
566
567   if (sx->memory)
568     {
569       unsigned long int idx;
570       uint8_t **row;
571
572       md4_process_bytes ("m", 1, &ctx);
573       md4_process_bytes (&sx->max_memory_rows, sizeof sx->max_memory_rows,
574                          &ctx);
575       for (row = sparse_array_first (sx->memory, &idx); row != NULL;
576            row = sparse_array_next (sx->memory, idx, &idx))
577         {
578           md4_process_bytes (&idx, sizeof idx, &ctx);
579           md4_process_bytes (*row, sx->n_bytes, &ctx);
580         }
581     }
582   else
583     {
584       const struct range_set_node *node;
585       void *tmp = xmalloc (sx->n_bytes);
586
587       md4_process_bytes ("d", 1, &ctx);
588       for (node = range_set_first (sx->disk_rows); node != NULL;
589            node = range_set_next (sx->disk_rows, node))
590         {
591           unsigned long int start = range_set_node_get_start (node);
592           unsigned long int end = range_set_node_get_end (node);
593           unsigned long int idx;
594
595           for (idx = start; idx < end; idx++)
596             {
597               off_t offset = (off_t) idx * sx->n_bytes;
598               if (!tmpfile_read (sx->disk, offset, sx->n_bytes, tmp))
599                 NOT_REACHED ();
600               md4_process_bytes (&idx, sizeof idx, &ctx);
601               md4_process_bytes (tmp, sx->n_bytes, &ctx);
602             }
603         }
604       free (tmp);
605     }
606   md4_finish_ctx (&ctx, hash);
607   return hash[0];
608 }