Update GPG key for signing flatpaks.
[pspp] / data / casereader.c
1 /* PSPP - a program for statistical analysis.
2    Copyright (C) 2007 Free Software Foundation, Inc.
3
4    This program is free software: you can redistribute it and/or modify
5    it under the terms of the GNU General Public License as published by
6    the Free Software Foundation, either version 3 of the License, or
7    (at your option) any later version.
8
9    This program is distributed in the hope that it will be useful,
10    but WITHOUT ANY WARRANTY; without even the implied warranty of
11    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12    GNU General Public License for more details.
13
14    You should have received a copy of the GNU General Public License
15    along with this program.  If not, see <http://www.gnu.org/licenses/>. */
16
17 #include <config.h>
18
19 #include <data/casereader.h>
20 #include <data/casereader-provider.h>
21
22 #include <stdlib.h>
23
24 #include <data/casewindow.h>
25 #include <data/casewriter.h>
26 #include <data/settings.h>
27 #include <libpspp/assertion.h>
28 #include <libpspp/heap.h>
29 #include <libpspp/taint.h>
30
31 #include "xalloc.h"
32
33 /* A casereader. */
34 struct casereader
35   {
36     struct taint *taint;                  /* Corrupted? */
37     size_t value_cnt;                     /* Values per case. */
38     casenumber case_cnt;                  /* Number of cases,
39                                              CASENUMBER_MAX if unknown. */
40     const struct casereader_class *class; /* Class. */
41     void *aux;                            /* Auxiliary data for class. */
42   };
43
44 static void insert_shim (struct casereader *);
45
46 /* Creates a new case in C and reads the next case from READER
47    into it.  The caller owns C and must destroy C when its data
48    is no longer needed.  Return true if successful, false when
49    cases have been exhausted or upon detection of an I/O error.
50    In the latter case, C is set to the null case.
51
52    The case returned is effectively consumed: it can never be
53    read again through READER.  If this is inconvenient, READER
54    may be cloned in advance with casereader_clone, or
55    casereader_peek may be used instead. */
56 bool
57 casereader_read (struct casereader *reader, struct ccase *c)
58 {
59   if (reader->case_cnt != 0)
60     {
61       /* ->read may use casereader_swap to replace itself by
62          another reader and then delegate to that reader by
63          recursively calling casereader_read.  Currently only
64          lazy_casereader does this and, with luck, nothing else
65          ever will.
66
67          To allow this to work, however, we must decrement
68          case_cnt before calling ->read.  If we decremented
69          case_cnt after calling ->read, then this would actually
70          drop two cases from case_cnt instead of one, and we'd
71          lose the last case in the casereader. */
72       if (reader->case_cnt != CASENUMBER_MAX)
73         reader->case_cnt--;
74       if (reader->class->read (reader, reader->aux, c))
75         {
76           assert (case_get_value_cnt (c) >= reader->value_cnt);
77           return true;
78         }
79     }
80   reader->case_cnt = 0;
81   case_nullify (c);
82   return false;
83 }
84
85 /* Destroys READER.
86    Returns false if an I/O error was detected on READER, true
87    otherwise. */
88 bool
89 casereader_destroy (struct casereader *reader)
90 {
91   bool ok = true;
92   if (reader != NULL)
93     {
94       reader->class->destroy (reader, reader->aux);
95       ok = taint_destroy (reader->taint);
96       free (reader);
97     }
98   return ok;
99 }
100
101 /* Returns a clone of READER.  READER and its clone may be used
102    to read the same sequence of cases in the same order, barring
103    I/O errors. */
104 struct casereader *
105 casereader_clone (const struct casereader *reader_)
106 {
107   struct casereader *reader = (struct casereader *) reader_;
108   struct casereader *clone;
109   if ( reader == NULL ) 
110     return NULL;
111
112   if (reader->class->clone == NULL)
113     insert_shim (reader);
114   clone = reader->class->clone (reader, reader->aux);
115   assert (clone != NULL);
116   assert (clone != reader);
117   return clone;
118 }
119
120 /* Makes a copy of ORIGINAL into *NEW1 (if NEW1 is non-null) and
121    *NEW2 (if NEW2 is non-null), then destroys ORIGINAL. */
122 void
123 casereader_split (struct casereader *original,
124                   struct casereader **new1, struct casereader **new2)
125 {
126   if (new1 != NULL && new2 != NULL)
127     {
128       *new1 = casereader_rename (original);
129       *new2 = casereader_clone (*new1);
130     }
131   else if (new1 != NULL)
132     *new1 = casereader_rename (original);
133   else if (new2 != NULL)
134     *new2 = casereader_rename (original);
135   else
136     casereader_destroy (original);
137 }
138
139 /* Returns a copy of READER, which is itself destroyed.
140    Useful for taking over ownership of a casereader, to enforce
141    preventing the original owner from accessing the casereader
142    again. */
143 struct casereader *
144 casereader_rename (struct casereader *reader)
145 {
146   struct casereader *new = xmemdup (reader, sizeof *reader);
147   free (reader);
148   return new;
149 }
150
151 /* Exchanges the casereaders referred to by A and B. */
152 void
153 casereader_swap (struct casereader *a, struct casereader *b)
154 {
155   if (a != b)
156     {
157       struct casereader tmp = *a;
158       *a = *b;
159       *b = tmp;
160     }
161 }
162
163 /* Creates a new case in C and reads the (IDX + 1)'th case from
164    READER into it.  The caller owns C and must destroy C when its
165    data is no longer needed.  Return true if successful, false
166    when cases have been exhausted or upon detection of an I/O
167    error.  In the latter case, C is set to the null case. */
168 bool
169 casereader_peek (struct casereader *reader, casenumber idx, struct ccase *c)
170 {
171   if (idx < reader->case_cnt)
172     {
173       if (reader->class->peek == NULL)
174         insert_shim (reader);
175       if (reader->class->peek (reader, reader->aux, idx, c))
176         return true;
177       else if (casereader_error (reader))
178         reader->case_cnt = 0;
179     }
180   if (reader->case_cnt > idx)
181     reader->case_cnt = idx;
182   case_nullify (c);
183   return false;
184 }
185
186 /* Returns true if no cases remain to be read from READER, or if
187    an error has occurred on READER.  (A return value of false
188    does *not* mean that the next call to casereader_peek or
189    casereader_read will return true, because an error can occur
190    in the meantime.) */
191 bool
192 casereader_is_empty (struct casereader *reader)
193 {
194   struct ccase c;
195   if (reader->case_cnt == 0 || !casereader_peek (reader, 0, &c))
196     return true;
197   else
198     {
199       case_destroy (&c);
200       return false;
201     }
202 }
203
204 /* Returns true if an I/O error or another hard error has
205    occurred on READER, a clone of READER, or on some object on
206    which READER's data has a dependency, false otherwise. */
207 bool
208 casereader_error (const struct casereader *reader)
209 {
210   return taint_is_tainted (reader->taint);
211 }
212
213 /* Marks READER as having encountered an error.
214
215    Ordinarily, this function should be called by the
216    implementation of a casereader, not by the casereader's
217    client.  Instead, casereader clients should usually ensure
218    that a casereader's error state is correct by using
219    taint_propagate to propagate to the casereader's taint
220    structure, which may be obtained via casereader_get_taint. */
221 void
222 casereader_force_error (struct casereader *reader)
223 {
224   taint_set_taint (reader->taint);
225 }
226
227 /* Returns READER's associate taint object, for use with
228    taint_propagate and other taint functions. */
229 const struct taint *
230 casereader_get_taint (const struct casereader *reader)
231 {
232   return reader->taint;
233 }
234
235 /* Returns the number of cases that will be read by successive
236    calls to casereader_read for READER, assuming that no errors
237    occur.  Upon an error condition, the case count drops to 0, so
238    that no more cases can be obtained.
239
240    Not all casereaders can predict the number of cases that they
241    will produce without actually reading all of them.  In that
242    case, this function returns CASENUMBER_MAX.  To obtain the
243    actual number of cases in such a casereader, use
244    casereader_count_cases. */
245 casenumber
246 casereader_get_case_cnt (struct casereader *reader)
247 {
248   return reader->case_cnt;
249 }
250
251 /* Returns the number of cases that will be read by successive
252    calls to casereader_read for READER, assuming that no errors
253    occur.  Upon an error condition, the case count drops to 0, so
254    that no more cases can be obtained.
255
256    For a casereader that cannot predict the number of cases it
257    will produce, this function actually reads (and discards) all
258    of the contents of a clone of READER.  Thus, the return value
259    is always correct in the absence of I/O errors. */
260 casenumber
261 casereader_count_cases (struct casereader *reader)
262 {
263   if (reader->case_cnt == CASENUMBER_MAX)
264     {
265       casenumber n_cases = 0;
266       struct ccase c;
267
268       struct casereader *clone = casereader_clone (reader);
269
270       for (; casereader_read (clone, &c); case_destroy (&c))
271         n_cases++;
272
273       casereader_destroy (clone);
274       reader->case_cnt = n_cases;
275     }
276
277   return reader->case_cnt;
278 }
279
280 /* Returns the number of struct values in each case in READER. */
281 size_t
282 casereader_get_value_cnt (struct casereader *reader)
283 {
284   return reader->value_cnt;
285 }
286
287 /* Copies all the cases in READER to WRITER, propagating errors
288    appropriately. */
289 void
290 casereader_transfer (struct casereader *reader, struct casewriter *writer)
291 {
292   struct ccase c;
293
294   taint_propagate (casereader_get_taint (reader),
295                    casewriter_get_taint (writer));
296   while (casereader_read (reader, &c))
297     casewriter_write (writer, &c);
298   casereader_destroy (reader);
299 }
300
301 /* Creates and returns a new casereader.  This function is
302    intended for use by casereader implementations, not by
303    casereader clients.
304
305    This function is most suited for creating a casereader for a
306    data source that is naturally sequential.
307    casereader_create_random may be more appropriate for a data
308    source that supports random access.
309
310    Ordinarily, specify a null pointer for TAINT, in which case
311    the new casereader will have a new, unique taint object.  If
312    the new casereader should have a clone of an existing taint
313    object, specify that object as TAINT.  (This is most commonly
314    useful in an implementation of the "clone" casereader_class
315    function, in which case the cloned casereader should have the
316    same taint object as the original casereader.)
317
318    VALUE_CNT must be the number of struct values per case read
319    from the casereader.
320
321    CASE_CNT is an upper limit on the number of cases that
322    casereader_read will return from the casereader in successive
323    calls.  Ordinarily, this is the actual number of cases in the
324    data source or CASENUMBER_MAX if the number of cases cannot be
325    predicted in advance.
326
327    CLASS and AUX are a set of casereader implementation-specific
328    member functions and auxiliary data to pass to those member
329    functions, respectively. */
330 struct casereader *
331 casereader_create_sequential (const struct taint *taint,
332                               size_t value_cnt, casenumber case_cnt,
333                               const struct casereader_class *class, void *aux)
334 {
335   struct casereader *reader = xmalloc (sizeof *reader);
336   reader->taint = taint != NULL ? taint_clone (taint) : taint_create ();
337   reader->value_cnt = value_cnt;
338   reader->case_cnt = case_cnt;
339   reader->class = class;
340   reader->aux = aux;
341   return reader;
342 }
343
344 /* If READER is a casereader of the given CLASS, returns its
345    associated auxiliary data; otherwise, returns a null pointer.
346
347    This function is intended for use from casereader
348    implementations, not by casereader users.  Even within
349    casereader implementations, its usefulness is quite limited,
350    for at least two reasons.  First, every casereader member
351    function already receives a pointer to the casereader's
352    auxiliary data.  Second, a casereader's class can change
353    (through a call to casereader_swap) and this is in practice
354    quite common (e.g. any call to casereader_clone on a
355    casereader that does not directly support clone will cause the
356    casereader to be replaced by a shim caseader). */
357 void *
358 casereader_dynamic_cast (struct casereader *reader,
359                          const struct casereader_class *class)
360 {
361   return reader->class == class ? reader->aux : NULL;
362 }
363 \f
364 /* Random-access casereader implementation.
365
366    This is a set of wrappers around casereader_create_sequential
367    and struct casereader_class to make it easy to create
368    efficient casereaders for data sources that natively support
369    random access. */
370
371 /* One clone of a random reader. */
372 struct random_reader
373   {
374     struct random_reader_shared *shared; /* Data shared among clones. */
375     struct heap_node heap_node; /* Node in shared data's heap of readers. */
376     casenumber offset;          /* Number of cases already read. */
377   };
378
379 /* Returns the random_reader in which the given heap_node is
380    embedded. */
381 static struct random_reader *
382 random_reader_from_heap_node (const struct heap_node *node)
383 {
384   return heap_data (node, struct random_reader, heap_node);
385 }
386
387 /* Data shared among clones of a random reader. */
388 struct random_reader_shared
389   {
390     struct heap *readers;       /* Heap of struct random_readers. */
391     casenumber min_offset;      /* Smallest offset of any random_reader. */
392     const struct casereader_random_class *class;
393     void *aux;
394   };
395
396 static const struct casereader_class random_reader_casereader_class;
397
398 /* Creates and returns a new random_reader with the given SHARED
399    data and OFFSET.  Inserts the new random reader into the
400    shared heap. */
401 static struct random_reader *
402 make_random_reader (struct random_reader_shared *shared, casenumber offset)
403 {
404   struct random_reader *br = xmalloc (sizeof *br);
405   br->offset = offset;
406   br->shared = shared;
407   heap_insert (shared->readers, &br->heap_node);
408   return br;
409 }
410
411 /* Compares random_readers A and B by offset and returns a
412    strcmp()-like result. */
413 static int
414 compare_random_readers_by_offset (const struct heap_node *a_,
415                                   const struct heap_node *b_,
416                                   const void *aux UNUSED)
417 {
418   const struct random_reader *a = random_reader_from_heap_node (a_);
419   const struct random_reader *b = random_reader_from_heap_node (b_);
420   return a->offset < b->offset ? -1 : a->offset > b->offset;
421 }
422
423 /* Creates and returns a new casereader.  This function is
424    intended for use by casereader implementations, not by
425    casereader clients.
426
427    This function is most suited for creating a casereader for a
428    data source that supports random access.
429    casereader_create_sequential is more appropriate for a data
430    source that is naturally sequential.
431
432    VALUE_CNT must be the number of struct values per case read
433    from the casereader.
434
435    CASE_CNT is an upper limit on the number of cases that
436    casereader_read will return from the casereader in successive
437    calls.  Ordinarily, this is the actual number of cases in the
438    data source or CASENUMBER_MAX if the number of cases cannot be
439    predicted in advance.
440
441    CLASS and AUX are a set of casereader implementation-specific
442    member functions and auxiliary data to pass to those member
443    functions, respectively. */
444 struct casereader *
445 casereader_create_random (size_t value_cnt, casenumber case_cnt,
446                           const struct casereader_random_class *class,
447                           void *aux)
448 {
449   struct random_reader_shared *shared = xmalloc (sizeof *shared);
450   shared->readers = heap_create (compare_random_readers_by_offset, NULL);
451   shared->class = class;
452   shared->aux = aux;
453   shared->min_offset = 0;
454   return casereader_create_sequential (NULL, value_cnt, case_cnt,
455                                        &random_reader_casereader_class,
456                                        make_random_reader (shared, 0));
457 }
458
459 /* Reassesses the min_offset in SHARED based on the minimum
460    offset in the heap.   */
461 static void
462 advance_random_reader (struct casereader *reader,
463                        struct random_reader_shared *shared)
464 {
465   casenumber old, new;
466
467   old = shared->min_offset;
468   new = random_reader_from_heap_node (heap_minimum (shared->readers))->offset;
469   assert (new >= old);
470   if (new > old)
471     {
472       shared->min_offset = new;
473       shared->class->advance (reader, shared->aux, new - old);
474     }
475 }
476
477 /* struct casereader_class "read" function for random reader. */
478 static bool
479 random_reader_read (struct casereader *reader, void *br_, struct ccase *c)
480 {
481   struct random_reader *br = br_;
482   struct random_reader_shared *shared = br->shared;
483
484   if (shared->class->read (reader, shared->aux,
485                            br->offset - shared->min_offset, c))
486     {
487       br->offset++;
488       heap_changed (shared->readers, &br->heap_node);
489       advance_random_reader (reader, shared);
490       return true;
491     }
492   else
493     return false;
494 }
495
496 /* struct casereader_class "destroy" function for random
497    reader. */
498 static void
499 random_reader_destroy (struct casereader *reader, void *br_)
500 {
501   struct random_reader *br = br_;
502   struct random_reader_shared *shared = br->shared;
503
504   heap_delete (shared->readers, &br->heap_node);
505   if (heap_is_empty (shared->readers))
506     {
507       heap_destroy (shared->readers);
508       shared->class->destroy (reader, shared->aux);
509       free (shared);
510     }
511   else
512     advance_random_reader (reader, shared);
513
514   free (br);
515 }
516
517 /* struct casereader_class "clone" function for random reader. */
518 static struct casereader *
519 random_reader_clone (struct casereader *reader, void *br_)
520 {
521   struct random_reader *br = br_;
522   struct random_reader_shared *shared = br->shared;
523   return casereader_create_sequential (casereader_get_taint (reader),
524                                        casereader_get_value_cnt (reader),
525                                        casereader_get_case_cnt (reader),
526                                        &random_reader_casereader_class,
527                                        make_random_reader (shared,
528                                                            br->offset));
529 }
530
531 /* struct casereader_class "peek" function for random reader. */
532 static bool
533 random_reader_peek (struct casereader *reader, void *br_,
534                     casenumber idx, struct ccase *c)
535 {
536   struct random_reader *br = br_;
537   struct random_reader_shared *shared = br->shared;
538
539   return shared->class->read (reader, shared->aux,
540                               br->offset - shared->min_offset + idx, c);
541 }
542
543 /* Casereader class for random reader. */
544 static const struct casereader_class random_reader_casereader_class =
545   {
546     random_reader_read,
547     random_reader_destroy,
548     random_reader_clone,
549     random_reader_peek,
550   };
551 \f
552 /* Buffering shim for implementing clone and peek operations.
553
554    The "clone" and "peek" operations aren't implemented by all
555    types of casereaders, but we have to expose a uniform
556    interface anyhow.  We do this by interposing a buffering
557    casereader on top of the existing casereader on the first call
558    to "clone" or "peek".  The buffering casereader maintains a
559    window of cases that spans the positions of the original
560    casereader and all of its clones (the "clone set"), from the
561    position of the casereader that has read the fewest cases to
562    the position of the casereader that has read the most.
563
564    Thus, if all of the casereaders in the clone set are at
565    approximately the same position, only a few cases are buffered
566    and there is little inefficiency.  If, on the other hand, one
567    casereader is not used to read any cases at all, but another
568    one is used to read all of the cases, the entire contents of
569    the casereader is copied into the buffer.  This still might
570    not be so inefficient, given that case data in memory is
571    shared across multiple identical copies, but in the worst case
572    the window implementation will write cases to disk instead of
573    maintaining them in-memory. */
574
575 /* A buffering shim for a non-clonable or non-peekable
576    casereader. */
577 struct shim
578   {
579     struct casewindow *window;          /* Window of buffered cases. */
580     struct casereader *subreader;       /* Subordinate casereader. */
581   };
582
583 static const struct casereader_random_class shim_class;
584
585 /* Interposes a buffering shim atop READER. */
586 static void
587 insert_shim (struct casereader *reader)
588 {
589   size_t value_cnt = casereader_get_value_cnt (reader);
590   casenumber case_cnt = casereader_get_case_cnt (reader);
591   struct shim *b = xmalloc (sizeof *b);
592   b->window = casewindow_create (value_cnt, settings_get_workspace_cases (value_cnt));
593   b->subreader = casereader_create_random (value_cnt, case_cnt,
594                                            &shim_class, b);
595   casereader_swap (reader, b->subreader);
596   taint_propagate (casewindow_get_taint (b->window),
597                    casereader_get_taint (reader));
598   taint_propagate (casereader_get_taint (b->subreader),
599                    casereader_get_taint (reader));
600 }
601
602 /* Ensures that B's window contains at least CASE_CNT cases.
603    Return true if successful, false upon reaching the end of B's
604    subreader or an I/O error. */
605 static bool
606 prime_buffer (struct shim *b, casenumber case_cnt)
607 {
608   while (casewindow_get_case_cnt (b->window) < case_cnt)
609     {
610       struct ccase tmp;
611       if (!casereader_read (b->subreader, &tmp))
612         return false;
613       casewindow_push_head (b->window, &tmp);
614     }
615   return true;
616 }
617
618 /* Reads the case at the given 0-based OFFSET from the front of
619    the window into C.  Returns true if successful, false if
620    OFFSET is beyond the end of file or upon I/O error. */
621 static bool
622 shim_read (struct casereader *reader UNUSED, void *b_,
623            casenumber offset, struct ccase *c)
624 {
625   struct shim *b = b_;
626   return (prime_buffer (b, offset + 1)
627           && casewindow_get_case (b->window, offset, c));
628 }
629
630 /* Destroys B. */
631 static void
632 shim_destroy (struct casereader *reader UNUSED, void *b_)
633 {
634   struct shim *b = b_;
635   casewindow_destroy (b->window);
636   casereader_destroy (b->subreader);
637   free (b);
638 }
639
640 /* Discards CNT cases from the front of B's window. */
641 static void
642 shim_advance (struct casereader *reader UNUSED, void *b_, casenumber case_cnt)
643 {
644   struct shim *b = b_;
645   casewindow_pop_tail (b->window, case_cnt);
646 }
647
648 /* Class for the buffered reader. */
649 static const struct casereader_random_class shim_class =
650   {
651     shim_read,
652     shim_destroy,
653     shim_advance,
654   };