range-set: New macro RANGE_SET_FOR_EACH to make iteration easier.
[pspp] / src / libpspp / sparse-xarray.c
1 /* PSPP - a program for statistical analysis.
2    Copyright (C) 2007, 2009, 2010, 2011, 2012 Free Software Foundation, Inc.
3
4    This program is free software: you can redistribute it and/or modify
5    it under the terms of the GNU General Public License as published by
6    the Free Software Foundation, either version 3 of the License, or
7    (at your option) any later version.
8
9    This program is distributed in the hope that it will be useful,
10    but WITHOUT ANY WARRANTY; without even the implied warranty of
11    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12    GNU General Public License for more details.
13
14    You should have received a copy of the GNU General Public License
15    along with this program.  If not, see <http://www.gnu.org/licenses/>. */
16
17 #include <config.h>
18
19 #include "libpspp/sparse-xarray.h"
20
21 #include <limits.h>
22 #include <stdint.h>
23 #include <stdlib.h>
24 #include <string.h>
25
26 #include "libpspp/assertion.h"
27 #include "libpspp/ext-array.h"
28 #include "libpspp/misc.h"
29 #include "libpspp/range-set.h"
30 #include "libpspp/sparse-array.h"
31
32 #include "gl/md4.h"
33 #include "gl/minmax.h"
34 #include "gl/xalloc.h"
35
36 /* A sparse array of arrays of bytes. */
37 struct sparse_xarray
38   {
39     size_t n_bytes;                     /* Number of bytes per row. */
40     uint8_t *default_row;               /* Defaults for unwritten rows. */
41     unsigned long int max_memory_rows;  /* Max rows before dumping to disk. */
42     struct sparse_array *memory;        /* Backing, if stored in memory. */
43     struct ext_array *disk;             /* Backing, if stored on disk. */
44     struct range_set *disk_rows;        /* Allocated rows, if on disk. */
45   };
46
47 static bool UNUSED
48 range_is_valid (const struct sparse_xarray *sx, size_t ofs, size_t n)
49 {
50   return n <= sx->n_bytes && ofs <= sx->n_bytes && ofs + n <= sx->n_bytes;
51 }
52
53 /* Creates and returns a new sparse array of arrays of bytes.
54    Each row in the sparse array will consist of N_BYTES bytes.
55    If fewer than MAX_MEMORY_ROWS rows are added to the array,
56    then it will be kept in memory; if more than that are added,
57    then it will be stored on disk. */
58 struct sparse_xarray *
59 sparse_xarray_create (size_t n_bytes, size_t max_memory_rows)
60 {
61   struct sparse_xarray *sx = xmalloc (sizeof *sx);
62   sx->n_bytes = n_bytes;
63   sx->default_row = xzalloc (n_bytes);
64   sx->max_memory_rows = max_memory_rows;
65   sx->memory = sparse_array_create (sizeof (uint8_t *));
66   sx->disk = NULL;
67   sx->disk_rows = NULL;
68   return sx;
69 }
70
71 /* Creates and returns a new sparse array of rows that contains
72    the same data as OLD.  Returns a null pointer if cloning
73    fails. */
74 struct sparse_xarray *
75 sparse_xarray_clone (const struct sparse_xarray *old)
76 {
77   struct sparse_xarray *new = xmalloc (sizeof *new);
78
79   new->n_bytes = old->n_bytes;
80
81   new->default_row = xmemdup (old->default_row, old->n_bytes);
82
83   new->max_memory_rows = old->max_memory_rows;
84
85   if (old->memory != NULL)
86     {
87       unsigned long int idx;
88       uint8_t **old_row;
89
90       new->memory = sparse_array_create (sizeof (uint8_t *));
91       for (old_row = sparse_array_first (old->memory, &idx); old_row != NULL;
92            old_row = sparse_array_next (old->memory, idx, &idx))
93         {
94           uint8_t **new_row = sparse_array_insert (new->memory, idx);
95           *new_row = xmemdup (*old_row, new->n_bytes);
96         }
97     }
98   else
99     new->memory = NULL;
100
101   if (old->disk != NULL)
102     {
103       const struct range_set_node *node;
104       void *tmp = xmalloc (old->n_bytes);
105
106       new->disk = ext_array_create ();
107       new->disk_rows = range_set_clone (old->disk_rows, NULL);
108       RANGE_SET_FOR_EACH (node, old->disk_rows)
109         {
110           unsigned long int start = range_set_node_get_start (node);
111           unsigned long int end = range_set_node_get_end (node);
112           unsigned long int idx;
113
114           for (idx = start; idx < end; idx++)
115             {
116               off_t offset = (off_t) idx * old->n_bytes;
117               if (!ext_array_read (old->disk, offset, old->n_bytes, tmp)
118                   || !ext_array_write (new->disk, offset, old->n_bytes, tmp))
119                 {
120                   free (tmp);
121                   sparse_xarray_destroy (new);
122                   return NULL;
123                 }
124             }
125         }
126       free (tmp);
127     }
128   else
129     {
130       new->disk = NULL;
131       new->disk_rows = NULL;
132     }
133
134   return new;
135 }
136
137 /* Destroys sparse array of rows SX. */
138 void
139 sparse_xarray_destroy (struct sparse_xarray *sx)
140 {
141   if (sx != NULL)
142     {
143       free (sx->default_row);
144       if (sx->memory != NULL)
145         {
146           unsigned long int idx;
147           uint8_t **row;
148           for (row = sparse_array_first (sx->memory, &idx); row != NULL;
149                row = sparse_array_next (sx->memory, idx, &idx))
150             free (*row);
151           sparse_array_destroy (sx->memory);
152         }
153       ext_array_destroy (sx->disk);
154       range_set_destroy (sx->disk_rows);
155       free (sx);
156     }
157 }
158
159 /* Returns the number of bytes in each row in SX. */
160 size_t
161 sparse_xarray_get_n_columns (const struct sparse_xarray *sx)
162 {
163   return sx->n_bytes;
164 }
165
166 /* Returns the number of rows in SX. */
167 size_t
168 sparse_xarray_get_n_rows (const struct sparse_xarray *sx)
169 {
170   if (sx->memory)
171     {
172       unsigned long int idx;
173       return sparse_array_last (sx->memory, &idx) != NULL ? idx + 1 : 0;
174     }
175   else
176     {
177       const struct range_set_node *last = range_set_last (sx->disk_rows);
178       return last != NULL ? range_set_node_get_end (last) : 0;
179     }
180 }
181
182 /* Dumps the rows in SX, which must currently be stored in
183    memory, to disk.  Returns true if successful, false on I/O
184    error. */
185 static bool
186 dump_sparse_xarray_to_disk (struct sparse_xarray *sx)
187 {
188   unsigned long int idx;
189   uint8_t **row;
190
191   assert (sx->memory != NULL);
192   assert (sx->disk == NULL);
193
194   sx->disk = ext_array_create ();
195   sx->disk_rows = range_set_create ();
196
197   for (row = sparse_array_first (sx->memory, &idx); row != NULL;
198        row = sparse_array_next (sx->memory, idx, &idx))
199     {
200       if (!ext_array_write (sx->disk, (off_t) idx * sx->n_bytes, sx->n_bytes,
201                           *row))
202         {
203           ext_array_destroy (sx->disk);
204           sx->disk = NULL;
205           range_set_destroy (sx->disk_rows);
206           sx->disk_rows = NULL;
207           return false;
208         }
209       range_set_set1 (sx->disk_rows, idx, 1);
210     }
211   sparse_array_destroy (sx->memory);
212   sx->memory = NULL;
213   return true;
214 }
215
216 /* Returns true if any data has ever been written to ROW in SX,
217    false otherwise. */
218 bool
219 sparse_xarray_contains_row (const struct sparse_xarray *sx,
220                             unsigned long int row)
221 {
222   return (sx->memory != NULL
223           ? sparse_array_get (sx->memory, row) != NULL
224           : range_set_contains (sx->disk_rows, row));
225 }
226
227 /* Reads columns COLUMNS...(COLUMNS + VALUE_CNT), exclusive, in
228    the given ROW in SX, into the VALUE_CNT values in VALUES.
229    Returns true if successful, false on I/O error. */
230 bool
231 sparse_xarray_read (const struct sparse_xarray *sx, unsigned long int row,
232                     size_t start, size_t n, void *data)
233 {
234   assert (range_is_valid (sx, start, n));
235
236   if (sx->memory != NULL)
237     {
238       uint8_t **p = sparse_array_get (sx->memory, row);
239       if (p != NULL)
240         {
241           memcpy (data, *p + start, n);
242           return true;
243         }
244     }
245   else
246     {
247       if (range_set_contains (sx->disk_rows, row))
248         return ext_array_read (sx->disk, (off_t) row * sx->n_bytes + start,
249                                n, data);
250     }
251
252   memcpy (data, sx->default_row + start, n);
253   return true;
254 }
255
256 /* Implements sparse_xarray_write for an on-disk sparse_xarray. */
257 static bool
258 write_disk_row (struct sparse_xarray *sx, unsigned long int row,
259                 size_t start, size_t n, const void *data)
260 {
261   off_t ofs = (off_t) row * sx->n_bytes;
262   if (range_set_contains (sx->disk_rows, row))
263     return ext_array_write (sx->disk, ofs + start, n, data);
264   else
265     {
266       range_set_set1 (sx->disk_rows, row, 1);
267       return (ext_array_write (sx->disk, ofs, start, sx->default_row)
268               && ext_array_write (sx->disk, ofs + start, n, data)
269               && ext_array_write (sx->disk, ofs + start + n,
270                                   sx->n_bytes - start - n,
271                                   sx->default_row + start + n));
272     }
273 }
274
275 /* Writes the VALUE_CNT values in VALUES into columns
276    COLUMNS...(COLUMNS + VALUE_CNT), exclusive, in the given ROW
277    in SX.
278    Returns true if successful, false on I/O error. */
279 bool
280 sparse_xarray_write (struct sparse_xarray *sx, unsigned long int row,
281                      size_t start, size_t n, const void *data)
282 {
283   assert (range_is_valid (sx, start, n));
284   if (sx->memory != NULL)
285     {
286       uint8_t **p = sparse_array_get (sx->memory, row);
287       if (p == NULL)
288         {
289           if (sparse_array_count (sx->memory) < sx->max_memory_rows)
290             {
291               p = sparse_array_insert (sx->memory, row);
292               *p = xmemdup (sx->default_row, sx->n_bytes);
293             }
294           else
295             {
296               if (!dump_sparse_xarray_to_disk (sx))
297                 return false;
298               return write_disk_row (sx, row, start, n, data);
299             }
300         }
301       memcpy (*p + start, data, n);
302       return true;
303     }
304   else
305     return write_disk_row (sx, row, start, n, data);
306 }
307
308 /* Writes the VALUE_CNT values in VALUES to columns
309    START_COLUMN...(START_COLUMN + VALUE_CNT), exclusive, in every
310    row in SX, even those rows that have not yet been written.
311    Returns true if successful, false on I/O error.
312
313    The runtime of this function is linear in the number of rows
314    in SX that have already been written. */
315 bool
316 sparse_xarray_write_columns (struct sparse_xarray *sx, size_t start,
317                              size_t n, const void *data)
318 {
319   assert (range_is_valid (sx, start, n));
320
321   /* Set defaults. */
322   memcpy (sx->default_row + start, data, n);
323
324   /* Set individual rows. */
325   if (sx->memory != NULL)
326     {
327       unsigned long int idx;
328       uint8_t **p;
329
330       for (p = sparse_array_first (sx->memory, &idx); p != NULL;
331            p = sparse_array_next (sx->memory, idx, &idx))
332         memcpy (*p + start, data, n);
333     }
334   else
335     {
336       const struct range_set_node *node;
337
338       RANGE_SET_FOR_EACH (node, sx->disk_rows)
339         {
340           unsigned long int start_row = range_set_node_get_start (node);
341           unsigned long int end_row = range_set_node_get_end (node);
342           unsigned long int row;
343
344           for (row = start_row; row < end_row; row++)
345             {
346               off_t offset = (off_t) row * sx->n_bytes;
347               if (!ext_array_write (sx->disk, offset + start, n, data))
348                 break;
349             }
350         }
351
352       if (ext_array_error (sx->disk))
353         return false;
354     }
355   return true;
356 }
357
358 static unsigned long int
359 scan_first (const struct sparse_xarray *sx)
360 {
361   if (sx->memory)
362     {
363       unsigned long int idx;
364       return sparse_array_first (sx->memory, &idx) ? idx : ULONG_MAX;
365     }
366   else
367     return range_set_scan (sx->disk_rows, 0);
368 }
369
370 static unsigned long int
371 scan_next (const struct sparse_xarray *sx, unsigned long int start)
372 {
373   if (sx->memory)
374     {
375       unsigned long int idx;
376       return sparse_array_next (sx->memory, start, &idx) ? idx : ULONG_MAX;
377     }
378   else
379     return range_set_scan (sx->disk_rows, start + 1);
380 }
381
382 /* Only works for rows for which sparse_xarray_contains_row()
383    would return true. */
384 static uint8_t *
385 get_row (const struct sparse_xarray *sx, unsigned long int idx,
386          uint8_t *buffer)
387 {
388   if (sx->memory)
389     {
390       uint8_t **p = sparse_array_get (sx->memory, idx);
391       return *p;
392     }
393   else if (ext_array_read (sx->disk, (off_t) idx * sx->n_bytes,
394                            sx->n_bytes, buffer))
395     return buffer;
396   else
397     return NULL;
398 }
399
400 /* Iterates over all the rows in SX and DX, passing each pair of
401    rows with equal indexes to CB.  CB's modifications, if any, to
402    destination rows are written back to DX.
403
404    All rows that are actually in use in SX or in DX or both are
405    passed to CB.  If a row is in use in SX but not in DX, or vice
406    versa, then the "default" row (as set by
407    sparse_xarray_write_columns) is passed as the contents of the
408    other row.
409
410    CB is also called once with the default row from SX and the
411    default row from DX.  Modifying the data passed as the default
412    row from DX will change DX's default row.
413
414    Returns true if successful, false if I/O on SX or DX fails or
415    if CB returns false.  On failure, the contents of DX are
416    undefined. */
417 bool
418 sparse_xarray_copy (const struct sparse_xarray *sx, struct sparse_xarray *dx,
419                     bool (*cb) (const void *src, void *dst, void *aux),
420                     void *aux)
421 {
422   bool success = true;
423
424   if (!cb (sx->default_row, dx->default_row, aux))
425     return false;
426
427   if (sx == dx)
428     {
429       if (sx->memory)
430         {
431           unsigned long int idx;
432           uint8_t **row;
433
434           for (row = sparse_array_first (sx->memory, &idx); row != NULL;
435                row = sparse_array_next (sx->memory, idx, &idx))
436             {
437               success = cb (*row, *row, aux);
438               if (!success)
439                 break;
440             }
441         }
442       else if (sx->disk)
443         {
444           const struct range_set_node *node;
445           void *tmp = xmalloc (sx->n_bytes);
446
447           RANGE_SET_FOR_EACH (node, sx->disk_rows)
448             {
449               unsigned long int start = range_set_node_get_start (node);
450               unsigned long int end = range_set_node_get_end (node);
451               unsigned long int row;
452
453               for (row = start; row < end; row++)
454                 {
455                   off_t offset = (off_t) row * sx->n_bytes;
456                   success = (ext_array_read (sx->disk, offset, sx->n_bytes,
457                                              tmp)
458                              && cb (tmp, tmp, aux)
459                              && ext_array_write (dx->disk, offset,
460                                                  dx->n_bytes, tmp));
461                   if (!success)
462                     break;
463                 }
464             }
465
466           free (tmp);
467         }
468     }
469   else
470     {
471       unsigned long int src_idx = scan_first (sx);
472       unsigned long int dst_idx = scan_first (dx);
473
474       uint8_t *tmp_src_row = xmalloc (sx->n_bytes);
475       uint8_t *tmp_dst_row = xmalloc (dx->n_bytes);
476
477       for (;;)
478         {
479           unsigned long int idx;
480           const uint8_t *src_row;
481           uint8_t *dst_row;
482
483           /* Determine the index of the row to process.  If
484              src_idx == dst_idx, then the row has been written in
485              both SX and DX.  Otherwise, it has been written in
486              only the sparse_xarray corresponding to the smaller
487              index, and has the default contents in the other. */
488           idx = MIN (src_idx, dst_idx);
489           if (idx == ULONG_MAX)
490             break;
491
492           /* Obtain a copy of the source row as src_row. */
493           if (idx == src_idx)
494             src_row = get_row (sx, idx, tmp_src_row);
495           else
496             src_row = sx->default_row;
497
498           /* Obtain the destination row as dst_row. */
499           if (idx == dst_idx)
500             dst_row = get_row (dx, idx, tmp_dst_row);
501           else if (dx->memory
502                    && sparse_array_count (dx->memory) < dx->max_memory_rows)
503             {
504               uint8_t **p = sparse_array_insert (dx->memory, idx);
505               dst_row = *p = xmemdup (dx->default_row, dx->n_bytes);
506             }
507           else
508             {
509               memcpy (tmp_dst_row, dx->default_row, dx->n_bytes);
510               dst_row = tmp_dst_row;
511             }
512
513           /* Run the callback. */
514           success = cb (src_row, dst_row, aux);
515           if (!success)
516             break;
517
518           /* Write back the destination row, if necessary. */
519           if (dst_row == tmp_dst_row)
520             {
521               success = sparse_xarray_write (dx, idx, 0, dx->n_bytes, dst_row);
522               if (!success)
523                 break;
524             }
525           else
526             {
527               /* Nothing to do: we modified the destination row in-place. */
528             }
529
530           /* Advance to the next row. */
531           if (src_idx == idx)
532             src_idx = scan_next (sx, src_idx);
533           if (dst_idx == idx)
534             dst_idx = scan_next (dx, dst_idx);
535         }
536
537       free (tmp_src_row);
538       free (tmp_dst_row);
539     }
540
541   return success;
542 }
543
544 /* Returns a hash value for SX suitable for use with the model
545    checker.  The value in BASIS is folded into the hash.
546
547    The returned hash value is *not* suitable for storage and
548    retrieval of sparse_xarrays that have identical contents,
549    because it will return different hash values for
550    sparse_xarrays that have the same contents (and it's slow).
551
552    We use MD4 because it is much faster than MD5 or SHA-1 but its
553    collision resistance is just as good. */
554 unsigned int
555 sparse_xarray_model_checker_hash (const struct sparse_xarray *sx,
556                                   unsigned int basis)
557 {
558   unsigned int hash[DIV_RND_UP (20, sizeof (unsigned int))];
559   struct md4_ctx ctx;
560
561   md4_init_ctx (&ctx);
562   md4_process_bytes (&basis, sizeof basis, &ctx);
563   md4_process_bytes (&sx->n_bytes, sizeof sx->n_bytes, &ctx);
564   md4_process_bytes (sx->default_row, sx->n_bytes, &ctx);
565
566   if (sx->memory)
567     {
568       unsigned long int idx;
569       uint8_t **row;
570
571       md4_process_bytes ("m", 1, &ctx);
572       md4_process_bytes (&sx->max_memory_rows, sizeof sx->max_memory_rows,
573                          &ctx);
574       for (row = sparse_array_first (sx->memory, &idx); row != NULL;
575            row = sparse_array_next (sx->memory, idx, &idx))
576         {
577           md4_process_bytes (&idx, sizeof idx, &ctx);
578           md4_process_bytes (*row, sx->n_bytes, &ctx);
579         }
580     }
581   else
582     {
583       const struct range_set_node *node;
584       void *tmp = xmalloc (sx->n_bytes);
585
586       md4_process_bytes ("d", 1, &ctx);
587       RANGE_SET_FOR_EACH (node, sx->disk_rows)
588         {
589           unsigned long int start = range_set_node_get_start (node);
590           unsigned long int end = range_set_node_get_end (node);
591           unsigned long int idx;
592
593           for (idx = start; idx < end; idx++)
594             {
595               off_t offset = (off_t) idx * sx->n_bytes;
596               if (!ext_array_read (sx->disk, offset, sx->n_bytes, tmp))
597                 NOT_REACHED ();
598               md4_process_bytes (&idx, sizeof idx, &ctx);
599               md4_process_bytes (tmp, sx->n_bytes, &ctx);
600             }
601         }
602       free (tmp);
603     }
604   md4_finish_ctx (&ctx, hash);
605   return hash[0];
606 }