sparse-xarray: Add missing #include <limits.h>.
[pspp-builds.git] / src / libpspp / sparse-xarray.c
1 /* PSPP - a program for statistical analysis.
2    Copyright (C) 2007, 2009 Free Software Foundation, Inc.
3
4    This program is free software: you can redistribute it and/or modify
5    it under the terms of the GNU General Public License as published by
6    the Free Software Foundation, either version 3 of the License, or
7    (at your option) any later version.
8
9    This program is distributed in the hope that it will be useful,
10    but WITHOUT ANY WARRANTY; without even the implied warranty of
11    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12    GNU General Public License for more details.
13
14    You should have received a copy of the GNU General Public License
15    along with this program.  If not, see <http://www.gnu.org/licenses/>. */
16
17 #include <config.h>
18
19 #include <libpspp/sparse-xarray.h>
20
21 #include <limits.h>
22 #include <stdint.h>
23 #include <stdlib.h>
24 #include <string.h>
25
26 #include <libpspp/assertion.h>
27 #include <libpspp/misc.h>
28 #include <libpspp/range-set.h>
29 #include <libpspp/sparse-array.h>
30 #include <libpspp/tmpfile.h>
31
32 #include "md4.h"
33 #include "minmax.h"
34 #include "xalloc.h"
35
36 /* A sparse array of arrays of bytes. */
37 struct sparse_xarray
38   {
39     size_t n_bytes;                     /* Number of bytes per row. */
40     uint8_t *default_row;               /* Defaults for unwritten rows. */
41     unsigned long int max_memory_rows;  /* Max rows before dumping to disk. */
42     struct sparse_array *memory;        /* Backing, if stored in memory. */
43     struct tmpfile *disk;               /* Backing, if stored on disk. */
44     struct range_set *disk_rows;        /* Allocated rows, if on disk. */
45   };
46
47 static bool UNUSED
48 range_is_valid (const struct sparse_xarray *sx, size_t ofs, size_t n)
49 {
50   return n <= sx->n_bytes && ofs <= sx->n_bytes && ofs + n <= sx->n_bytes;
51 }
52
53 /* Creates and returns a new sparse array of arrays of bytes.
54    Each row in the sparse array will consist of N_BYTES bytes.
55    If fewer than MAX_MEMORY_ROWS rows are added to the array,
56    then it will be kept in memory; if more than that are added,
57    then it will be stored on disk. */
58 struct sparse_xarray *
59 sparse_xarray_create (size_t n_bytes, size_t max_memory_rows)
60 {
61   struct sparse_xarray *sx = xmalloc (sizeof *sx);
62   sx->n_bytes = n_bytes;
63   sx->default_row = xzalloc (n_bytes);
64   sx->max_memory_rows = max_memory_rows;
65   sx->memory = sparse_array_create (sizeof (uint8_t *));
66   sx->disk = NULL;
67   sx->disk_rows = NULL;
68   return sx;
69 }
70
71 /* Creates and returns a new sparse array of rows that contains
72    the same data as OLD.  Returns a null pointer if cloning
73    fails. */
74 struct sparse_xarray *
75 sparse_xarray_clone (const struct sparse_xarray *old)
76 {
77   struct sparse_xarray *new = xmalloc (sizeof *new);
78
79   new->n_bytes = old->n_bytes;
80
81   new->default_row = xmemdup (old->default_row, old->n_bytes);
82
83   new->max_memory_rows = old->max_memory_rows;
84
85   if (old->memory != NULL)
86     {
87       unsigned long int idx;
88       uint8_t **old_row;
89
90       new->memory = sparse_array_create (sizeof (uint8_t *));
91       for (old_row = sparse_array_first (old->memory, &idx); old_row != NULL;
92            old_row = sparse_array_next (old->memory, idx, &idx))
93         {
94           uint8_t **new_row = sparse_array_insert (new->memory, idx);
95           *new_row = xmemdup (*old_row, new->n_bytes);
96         }
97     }
98   else
99     new->memory = NULL;
100
101   if (old->disk != NULL)
102     {
103       const struct range_set_node *node;
104       void *tmp = xmalloc (old->n_bytes);
105
106       new->disk = tmpfile_create ();
107       new->disk_rows = range_set_clone (old->disk_rows, NULL);
108       for (node = range_set_first (old->disk_rows); node != NULL;
109            node = range_set_next (old->disk_rows, node))
110         {
111           unsigned long int start = range_set_node_get_start (node);
112           unsigned long int end = range_set_node_get_end (node);
113           unsigned long int idx;
114
115           for (idx = start; idx < end; idx++)
116             {
117               off_t offset = (off_t) idx * old->n_bytes;
118               if (!tmpfile_read (old->disk, offset, old->n_bytes, tmp)
119                   || !tmpfile_write (new->disk, offset, old->n_bytes, tmp))
120                 {
121                   free (tmp);
122                   sparse_xarray_destroy (new);
123                   return NULL;
124                 }
125             }
126         }
127       free (tmp);
128     }
129   else
130     {
131       new->disk = NULL;
132       new->disk_rows = NULL;
133     }
134
135   return new;
136 }
137
138 /* Destroys sparse array of rows SX. */
139 void
140 sparse_xarray_destroy (struct sparse_xarray *sx)
141 {
142   if (sx != NULL)
143     {
144       free (sx->default_row);
145       if (sx->memory != NULL)
146         {
147           unsigned long int idx;
148           uint8_t **row;
149           for (row = sparse_array_first (sx->memory, &idx); row != NULL;
150                row = sparse_array_next (sx->memory, idx, &idx))
151             free (*row);
152           sparse_array_destroy (sx->memory);
153         }
154       tmpfile_destroy (sx->disk);
155       range_set_destroy (sx->disk_rows);
156       free (sx);
157     }
158 }
159
160 /* Returns the number of bytes in each row in SX. */
161 size_t
162 sparse_xarray_get_n_columns (const struct sparse_xarray *sx)
163 {
164   return sx->n_bytes;
165 }
166
167 /* Returns the number of rows in SX. */
168 size_t
169 sparse_xarray_get_n_rows (const struct sparse_xarray *sx)
170 {
171   if (sx->memory)
172     {
173       unsigned long int idx;
174       return sparse_array_last (sx->memory, &idx) != NULL ? idx + 1 : 0;
175     }
176   else
177     {
178       const struct range_set_node *last = range_set_last (sx->disk_rows);
179       return last != NULL ? range_set_node_get_end (last) : 0;
180     }
181 }
182
183 /* Dumps the rows in SX, which must currently be stored in
184    memory, to disk.  Returns true if successful, false on I/O
185    error. */
186 static bool
187 dump_sparse_xarray_to_disk (struct sparse_xarray *sx)
188 {
189   unsigned long int idx;
190   uint8_t **row;
191
192   assert (sx->memory != NULL);
193   assert (sx->disk == NULL);
194
195   sx->disk = tmpfile_create ();
196   sx->disk_rows = range_set_create ();
197
198   for (row = sparse_array_first (sx->memory, &idx); row != NULL;
199        row = sparse_array_next (sx->memory, idx, &idx))
200     {
201       if (!tmpfile_write (sx->disk, (off_t) idx * sx->n_bytes, sx->n_bytes,
202                           *row))
203         {
204           tmpfile_destroy (sx->disk);
205           sx->disk = NULL;
206           range_set_destroy (sx->disk_rows);
207           sx->disk_rows = NULL;
208           return false;
209         }
210       range_set_insert (sx->disk_rows, idx, 1);
211     }
212   sparse_array_destroy (sx->memory);
213   sx->memory = NULL;
214   return true;
215 }
216
217 /* Returns true if any data has ever been written to ROW in SX,
218    false otherwise. */
219 bool
220 sparse_xarray_contains_row (const struct sparse_xarray *sx,
221                             unsigned long int row)
222 {
223   return (sx->memory != NULL
224           ? sparse_array_get (sx->memory, row) != NULL
225           : range_set_contains (sx->disk_rows, row));
226 }
227
228 /* Reads columns COLUMNS...(COLUMNS + VALUE_CNT), exclusive, in
229    the given ROW in SX, into the VALUE_CNT values in VALUES.
230    Returns true if successful, false on I/O error. */
231 bool
232 sparse_xarray_read (const struct sparse_xarray *sx, unsigned long int row,
233                     size_t start, size_t n, void *data)
234 {
235   assert (range_is_valid (sx, start, n));
236
237   if (sx->memory != NULL)
238     {
239       uint8_t **p = sparse_array_get (sx->memory, row);
240       if (p != NULL)
241         {
242           memcpy (data, *p + start, n);
243           return true;
244         }
245     }
246   else
247     {
248       if (range_set_contains (sx->disk_rows, row))
249         return tmpfile_read (sx->disk, (off_t) row * sx->n_bytes + start,
250                              n, data);
251     }
252
253   memcpy (data, sx->default_row + start, n);
254   return true;
255 }
256
257 /* Implements sparse_xarray_write for an on-disk sparse_xarray. */
258 static bool
259 write_disk_row (struct sparse_xarray *sx, unsigned long int row,
260                 size_t start, size_t n, const void *data)
261 {
262   off_t ofs = (off_t) row * sx->n_bytes;
263   if (range_set_contains (sx->disk_rows, row))
264     return tmpfile_write (sx->disk, ofs + start, n, data);
265   else
266     {
267       range_set_insert (sx->disk_rows, row, 1);
268       return (tmpfile_write (sx->disk, ofs, start, sx->default_row)
269               && tmpfile_write (sx->disk, ofs + start, n, data)
270               && tmpfile_write (sx->disk, ofs + start + n,
271                                 sx->n_bytes - start - n,
272                                 sx->default_row + start + n));
273     }
274 }
275
276 /* Writes the VALUE_CNT values in VALUES into columns
277    COLUMNS...(COLUMNS + VALUE_CNT), exclusive, in the given ROW
278    in SX.
279    Returns true if successful, false on I/O error. */
280 bool
281 sparse_xarray_write (struct sparse_xarray *sx, unsigned long int row,
282                      size_t start, size_t n, const void *data)
283 {
284   assert (range_is_valid (sx, start, n));
285   if (sx->memory != NULL)
286     {
287       uint8_t **p = sparse_array_get (sx->memory, row);
288       if (p == NULL)
289         {
290           if (sparse_array_count (sx->memory) < sx->max_memory_rows)
291             {
292               p = sparse_array_insert (sx->memory, row);
293               *p = xmemdup (sx->default_row, sx->n_bytes);
294             }
295           else
296             {
297               if (!dump_sparse_xarray_to_disk (sx))
298                 return false;
299               return write_disk_row (sx, row, start, n, data);
300             }
301         }
302       memcpy (*p + start, data, n);
303       return true;
304     }
305   else
306     return write_disk_row (sx, row, start, n, data);
307 }
308
309 /* Writes the VALUE_CNT values in VALUES to columns
310    START_COLUMN...(START_COLUMN + VALUE_CNT), exclusive, in every
311    row in SX, even those rows that have not yet been written.
312    Returns true if successful, false on I/O error.
313
314    The runtime of this function is linear in the number of rows
315    in SX that have already been written. */
316 bool
317 sparse_xarray_write_columns (struct sparse_xarray *sx, size_t start,
318                              size_t n, const void *data)
319 {
320   assert (range_is_valid (sx, start, n));
321
322   /* Set defaults. */
323   memcpy (sx->default_row + start, data, n);
324
325   /* Set individual rows. */
326   if (sx->memory != NULL)
327     {
328       unsigned long int idx;
329       uint8_t **p;
330
331       for (p = sparse_array_first (sx->memory, &idx); p != NULL;
332            p = sparse_array_next (sx->memory, idx, &idx))
333         memcpy (*p + start, data, n);
334     }
335   else
336     {
337       const struct range_set_node *node;
338
339       for (node = range_set_first (sx->disk_rows); node != NULL;
340            node = range_set_next (sx->disk_rows, node))
341         {
342           unsigned long int start_row = range_set_node_get_start (node);
343           unsigned long int end_row = range_set_node_get_end (node);
344           unsigned long int row;
345
346           for (row = start_row; row < end_row; row++)
347             {
348               off_t offset = (off_t) row * sx->n_bytes;
349               if (!tmpfile_write (sx->disk, offset + start, n, data))
350                 break;
351             }
352         }
353
354       if (tmpfile_error (sx->disk))
355         return false;
356     }
357   return true;
358 }
359
360 static unsigned long int
361 scan_first (const struct sparse_xarray *sx)
362 {
363   if (sx->memory)
364     {
365       unsigned long int idx;
366       return sparse_array_first (sx->memory, &idx) ? idx : ULONG_MAX;
367     }
368   else
369     return range_set_scan (sx->disk_rows, 0);
370 }
371
372 static unsigned long int
373 scan_next (const struct sparse_xarray *sx, unsigned long int start)
374 {
375   if (sx->memory)
376     {
377       unsigned long int idx;
378       return sparse_array_next (sx->memory, start, &idx) ? idx : ULONG_MAX;
379     }
380   else
381     return range_set_scan (sx->disk_rows, start + 1);
382 }
383
384 /* Only works for rows for which sparse_xarray_contains_row()
385    would return true. */
386 static uint8_t *
387 get_row (const struct sparse_xarray *sx, unsigned long int idx,
388          uint8_t *buffer)
389 {
390   if (sx->memory)
391     {
392       uint8_t **p = sparse_array_get (sx->memory, idx);
393       return *p;
394     }
395   else if (tmpfile_read (sx->disk, (off_t) idx * sx->n_bytes,
396                          sx->n_bytes, buffer))
397     return buffer;
398   else
399     return NULL;
400 }
401
402 /* Iterates over all the rows in SX and DX, passing each pair of
403    rows with equal indexes to CB.  CB's modifications, if any, to
404    destination rows are written back to DX.
405
406    All rows that are actually in use in SX or in DX or both are
407    passed to CB.  If a row is in use in SX but not in DX, or vice
408    versa, then the "default" row (as set by
409    sparse_xarray_write_columns) is passed as the contents of the
410    other row.
411
412    CB is also called once with the default row from SX and the
413    default row from DX.  Modifying the data passed as the default
414    row from DX will change DX's default row.
415
416    Returns true if successful, false if I/O on SX or DX fails or
417    if CB returns false.  On failure, the contents of DX are
418    undefined. */
419 bool
420 sparse_xarray_copy (const struct sparse_xarray *sx, struct sparse_xarray *dx,
421                     bool (*cb) (const void *src, void *dst, void *aux),
422                     void *aux)
423 {
424   bool success = true;
425
426   if (!cb (sx->default_row, dx->default_row, aux))
427     return false;
428
429   if (sx == dx)
430     {
431       if (sx->memory)
432         {
433           unsigned long int idx;
434           uint8_t **row;
435
436           for (row = sparse_array_first (sx->memory, &idx); row != NULL;
437                row = sparse_array_next (sx->memory, idx, &idx))
438             {
439               success = cb (*row, *row, aux);
440               if (!success)
441                 break;
442             }
443         }
444       else if (sx->disk)
445         {
446           const struct range_set_node *node;
447           void *tmp = xmalloc (sx->n_bytes);
448
449           for (node = range_set_first (sx->disk_rows); node != NULL;
450                node = range_set_next (sx->disk_rows, node))
451             {
452               unsigned long int start = range_set_node_get_start (node);
453               unsigned long int end = range_set_node_get_end (node);
454               unsigned long int row;
455
456               for (row = start; row < end; row++)
457                 {
458                   off_t offset = (off_t) row * sx->n_bytes;
459                   success = (tmpfile_read (sx->disk, offset, sx->n_bytes, tmp)
460                              && cb (tmp, tmp, aux)
461                              && tmpfile_write (dx->disk, offset,
462                                                dx->n_bytes, tmp));
463                   if (!success)
464                     break;
465                 }
466             }
467
468           free (tmp);
469         }
470     }
471   else
472     {
473       unsigned long int src_idx = scan_first (sx);
474       unsigned long int dst_idx = scan_first (dx);
475
476       uint8_t *tmp_src_row = xmalloc (sx->n_bytes);
477       uint8_t *tmp_dst_row = xmalloc (dx->n_bytes);
478
479       for (;;)
480         {
481           unsigned long int idx;
482           const uint8_t *src_row;
483           uint8_t *dst_row;
484
485           /* Determine the index of the row to process.  If
486              src_idx == dst_idx, then the row has been written in
487              both SX and DX.  Otherwise, it has been written in
488              only the sparse_xarray corresponding to the smaller
489              index, and has the default contents in the other. */
490           idx = MIN (src_idx, dst_idx);
491           if (idx == ULONG_MAX)
492             break;
493
494           /* Obtain a copy of the source row as src_row. */
495           if (idx == src_idx)
496             src_row = get_row (sx, idx, tmp_src_row);
497           else
498             src_row = sx->default_row;
499
500           /* Obtain the destination row as dst_row. */
501           if (idx == dst_idx)
502             dst_row = get_row (dx, idx, tmp_dst_row);
503           else if (dx->memory
504                    && sparse_array_count (dx->memory) < dx->max_memory_rows)
505             {
506               uint8_t **p = sparse_array_insert (dx->memory, idx);
507               dst_row = *p = xmemdup (dx->default_row, dx->n_bytes);
508             }
509           else
510             {
511               memcpy (tmp_dst_row, dx->default_row, dx->n_bytes);
512               dst_row = tmp_dst_row;
513             }
514
515           /* Run the callback. */
516           success = cb (src_row, dst_row, aux);
517           if (!success)
518             break;
519
520           /* Write back the destination row, if necessary. */
521           if (dst_row == tmp_dst_row)
522             {
523               success = sparse_xarray_write (dx, idx, 0, dx->n_bytes, dst_row);
524               if (!success)
525                 break;
526             }
527           else
528             {
529               /* Nothing to do: we modified the destination row in-place. */
530             }
531
532           /* Advance to the next row. */
533           if (src_idx == idx)
534             src_idx = scan_next (sx, src_idx);
535           if (dst_idx == idx)
536             dst_idx = scan_next (dx, dst_idx);
537         }
538
539       free (tmp_src_row);
540       free (tmp_dst_row);
541     }
542
543   return success;
544 }
545
546 /* Returns a hash value for SX suitable for use with the model
547    checker.  The value in BASIS is folded into the hash.
548
549    The returned hash value is *not* suitable for storage and
550    retrieval of sparse_xarrays that have identical contents,
551    because it will return different hash values for
552    sparse_xarrays that have the same contents (and it's slow).
553
554    We use MD4 because it is much faster than MD5 or SHA-1 but its
555    collision resistance is just as good. */
556 unsigned int
557 sparse_xarray_model_checker_hash (const struct sparse_xarray *sx,
558                                   unsigned int basis)
559 {
560   unsigned int hash[DIV_RND_UP (20, sizeof (unsigned int))];
561   struct md4_ctx ctx;
562
563   md4_init_ctx (&ctx);
564   md4_process_bytes (&basis, sizeof basis, &ctx);
565   md4_process_bytes (&sx->n_bytes, sizeof sx->n_bytes, &ctx);
566   md4_process_bytes (sx->default_row, sx->n_bytes, &ctx);
567
568   if (sx->memory)
569     {
570       unsigned long int idx;
571       uint8_t **row;
572
573       md4_process_bytes ("m", 1, &ctx);
574       md4_process_bytes (&sx->max_memory_rows, sizeof sx->max_memory_rows,
575                          &ctx);
576       for (row = sparse_array_first (sx->memory, &idx); row != NULL;
577            row = sparse_array_next (sx->memory, idx, &idx))
578         {
579           md4_process_bytes (&idx, sizeof idx, &ctx);
580           md4_process_bytes (*row, sx->n_bytes, &ctx);
581         }
582     }
583   else
584     {
585       const struct range_set_node *node;
586       void *tmp = xmalloc (sx->n_bytes);
587
588       md4_process_bytes ("d", 1, &ctx);
589       for (node = range_set_first (sx->disk_rows); node != NULL;
590            node = range_set_next (sx->disk_rows, node))
591         {
592           unsigned long int start = range_set_node_get_start (node);
593           unsigned long int end = range_set_node_get_end (node);
594           unsigned long int idx;
595
596           for (idx = start; idx < end; idx++)
597             {
598               off_t offset = (off_t) idx * sx->n_bytes;
599               if (!tmpfile_read (sx->disk, offset, sx->n_bytes, tmp))
600                 NOT_REACHED ();
601               md4_process_bytes (&idx, sizeof idx, &ctx);
602               md4_process_bytes (tmp, sx->n_bytes, &ctx);
603             }
604         }
605       free (tmp);
606     }
607   md4_finish_ctx (&ctx, hash);
608   return hash[0];
609 }