Rewrite expression code.
[pspp-builds.git] / src / casefile.c
1 /* PSPP - computes sample statistics.
2    Copyright (C) 2004 Free Software Foundation, Inc.
3    Written by Ben Pfaff <blp@gnu.org>.
4
5    This program is free software; you can redistribute it and/or
6    modify it under the terms of the GNU General Public License as
7    published by the Free Software Foundation; either version 2 of the
8    License, or (at your option) any later version.
9
10    This program is distributed in the hope that it will be useful, but
11    WITHOUT ANY WARRANTY; without even the implied warranty of
12    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13    General Public License for more details.
14
15    You should have received a copy of the GNU General Public License
16    along with this program; if not, write to the Free Software
17    Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA
18    02111-1307, USA. */
19
20 #include <config.h>
21 #include "casefile.h"
22 #include <assert.h>
23 #include <errno.h>
24 #include <fcntl.h>
25 #include <stdio.h>
26 #include <stdlib.h>
27 #include <string.h>
28 #include <unistd.h>
29 #include "alloc.h"
30 #include "case.h"
31 #include "error.h"
32 #include "misc.h"
33 #include "mkfile.h"
34 #include "settings.h"
35 #include "var.h"
36
37 #ifdef HAVE_VALGRIND_VALGRIND_H
38 #include <valgrind/valgrind.h>
39 #endif
40
41 #define IO_BUF_SIZE (8192 / sizeof (union value))
42
43 /* A casefile is a sequentially accessible array of immutable
44    cases.  It may be stored in memory or on disk as workspace
45    allows.  Cases may be appended to the end of the file.  Cases
46    may be read sequentially starting from the beginning of the
47    file.  Once any cases have been read, no more cases may be
48    appended.  The entire file is discarded at once. */
49
50 /* In-memory cases are arranged in an array of arrays.  The top
51    level is variable size and the size of each bottom level array
52    is fixed at the number of cases defined here.  */
53 #define CASES_PER_BLOCK 128             
54
55 /* A casefile. */
56 struct casefile 
57   {
58     /* Basic data. */
59     struct casefile *next, *prev;       /* Next, prev in global list. */
60     size_t value_cnt;                   /* Case size in `union value's. */
61     size_t case_acct_size;              /* Case size for accounting. */
62     unsigned long case_cnt;             /* Number of cases stored. */
63     enum { MEMORY, DISK } storage;      /* Where cases are stored. */
64     enum { WRITE, READ } mode;          /* Is writing or reading allowed? */
65     struct casereader *readers;         /* List of our readers. */
66     int being_destroyed;                /* Does a destructive reader exist? */
67
68     /* Memory storage. */
69     struct ccase **cases;               /* Pointer to array of cases. */
70
71     /* Disk storage. */
72     int fd;                             /* File descriptor, -1 if none. */
73     char *filename;                     /* Filename. */
74     union value *buffer;                /* I/O buffer, NULL if none. */
75     size_t buffer_used;                 /* Number of values used in buffer. */
76     size_t buffer_size;                 /* Buffer size in values. */
77   };
78
79 /* For reading out the cases in a casefile. */
80 struct casereader 
81   {
82     struct casereader *next, *prev;     /* Next, prev in casefile's list. */
83     struct casefile *cf;                /* Our casefile. */
84     unsigned long case_idx;             /* Case number of current case. */
85     int destructive;                    /* Is this a destructive reader? */
86
87     /* Disk storage. */
88     int fd;                             /* File descriptor. */
89     union value *buffer;                /* I/O buffer. */
90     size_t buffer_pos;                  /* Offset of buffer position. */
91     struct ccase c;                     /* Current case. */
92   };
93
94 /* Return the case number of the current case */
95 unsigned long
96 casereader_cnum(const struct casereader *r)
97 {
98   return r->case_idx;
99 }
100
101 /* Doubly linked list of all casefiles. */
102 static struct casefile *casefiles;
103
104 /* Number of bytes of case allocated in in-memory casefiles. */
105 static size_t case_bytes;
106
107 static void register_atexit (void);
108 static void exit_handler (void);
109
110 static void reader_open_file (struct casereader *reader);
111 static void write_case_to_disk (struct casefile *cf, const struct ccase *c);
112 static void flush_buffer (struct casefile *cf);
113 static void fill_buffer (struct casereader *reader);
114
115 static int safe_open (const char *filename, int flags);
116 static int safe_close (int fd);
117 static int full_read (int fd, void *buffer, size_t size);
118 static int full_write (int fd, const void *buffer, size_t size);
119
120 /* Creates and returns a casefile to store cases of VALUE_CNT
121    `union value's each. */
122 struct casefile *
123 casefile_create (size_t value_cnt) 
124 {
125   struct casefile *cf = xmalloc (sizeof *cf);
126   cf->next = casefiles;
127   cf->prev = NULL;
128   if (cf->next != NULL)
129     cf->next->prev = cf;
130   casefiles = cf;
131   cf->value_cnt = value_cnt;
132   cf->case_acct_size = (cf->value_cnt + 4) * sizeof *cf->buffer;
133   cf->case_cnt = 0;
134   cf->storage = MEMORY;
135   cf->mode = WRITE;
136   cf->readers = NULL;
137   cf->being_destroyed = 0;
138   cf->cases = NULL;
139   cf->fd = -1;
140   cf->filename = NULL;
141   cf->buffer = NULL;
142   cf->buffer_size = ROUND_UP (cf->value_cnt, IO_BUF_SIZE);
143   if (cf->value_cnt > 0 && cf->buffer_size % cf->value_cnt > 64)
144     cf->buffer_size = cf->value_cnt;
145   cf->buffer_used = 0;
146   register_atexit ();
147   return cf;
148 }
149
150 /* Destroys casefile CF. */
151 void
152 casefile_destroy (struct casefile *cf) 
153 {
154   if (cf != NULL) 
155     {
156       if (cf->next != NULL)
157         cf->next->prev = cf->prev;
158       if (cf->prev != NULL)
159         cf->prev->next = cf->next;
160       if (casefiles == cf)
161         casefiles = cf->next;
162
163       while (cf->readers != NULL) 
164         casereader_destroy (cf->readers);
165
166       if (cf->cases != NULL) 
167         {
168           size_t idx, block_cnt;
169
170           case_bytes -= cf->case_cnt * cf->case_acct_size;
171           for (idx = 0; idx < cf->case_cnt; idx++)
172             {
173               size_t block_idx = idx / CASES_PER_BLOCK;
174               size_t case_idx = idx % CASES_PER_BLOCK;
175               struct ccase *c = &cf->cases[block_idx][case_idx];
176               case_destroy (c);
177             }
178
179           block_cnt = DIV_RND_UP (cf->case_cnt, CASES_PER_BLOCK);
180           for (idx = 0; idx < block_cnt; idx++)
181             free (cf->cases[idx]);
182
183           free (cf->cases);
184         }
185
186       if (cf->fd != -1)
187         safe_close (cf->fd);
188           
189       if (cf->filename != NULL && remove (cf->filename) == -1) 
190         msg (ME, _("%s: Removing temporary file: %s."),
191              cf->filename, strerror (errno));
192       free (cf->filename);
193
194       free (cf->buffer);
195
196       free (cf);
197     }
198 }
199
200 /* Returns nonzero only if casefile CF is stored in memory (instead of on
201    disk). */
202 int
203 casefile_in_core (const struct casefile *cf) 
204 {
205   assert (cf != NULL);
206
207   return cf->storage == MEMORY;
208 }
209
210 /* Puts a casefile to "sleep", that is, minimizes the resources
211    needed for it by closing its file descriptor and freeing its
212    buffer.  This is useful if we need so many casefiles that we
213    might not have enough memory and file descriptors to go
214    around.
215
216    For simplicity, this implementation always converts the
217    casefile to reader mode.  If this turns out to be a problem,
218    with a little extra work we could also support sleeping
219    writers. */
220 void
221 casefile_sleep (const struct casefile *cf_) 
222 {
223   struct casefile *cf = (struct casefile *) cf_;
224   assert (cf != NULL);
225
226   casefile_mode_reader (cf);
227   casefile_to_disk (cf);
228   flush_buffer (cf);
229
230   if (cf->fd != -1) 
231     {
232       safe_close (cf->fd);
233       cf->fd = -1;
234     }
235   if (cf->buffer != NULL) 
236     {
237       free (cf->buffer);
238       cf->buffer = NULL;
239     }
240 }
241
242 /* Returns the number of `union value's in a case for CF. */
243 size_t
244 casefile_get_value_cnt (const struct casefile *cf) 
245 {
246   assert (cf != NULL);
247
248   return cf->value_cnt;
249 }
250
251 /* Returns the number of cases in casefile CF. */
252 unsigned long
253 casefile_get_case_cnt (const struct casefile *cf) 
254 {
255   assert (cf != NULL);
256
257   return cf->case_cnt;
258 }
259
260 /* Appends a copy of case C to casefile CF.  Not valid after any
261    reader for CF has been created. */
262 void
263 casefile_append (struct casefile *cf, const struct ccase *c) 
264 {
265   assert (cf != NULL);
266   assert (c != NULL);
267   assert (cf->mode == WRITE);
268
269   /* Try memory first. */
270   if (cf->storage == MEMORY) 
271     {
272       if (case_bytes < get_max_workspace ())
273         {
274           size_t block_idx = cf->case_cnt / CASES_PER_BLOCK;
275           size_t case_idx = cf->case_cnt % CASES_PER_BLOCK;
276           struct ccase new_case;
277
278           case_bytes += cf->case_acct_size;
279           case_clone (&new_case, c);
280           if (case_idx == 0) 
281             {
282               if ((block_idx & (block_idx - 1)) == 0) 
283                 {
284                   size_t block_cap = block_idx == 0 ? 1 : block_idx * 2;
285                   cf->cases = xrealloc (cf->cases,
286                                         sizeof *cf->cases * block_cap);
287                 }
288
289               cf->cases[block_idx] = xmalloc (sizeof **cf->cases
290                                               * CASES_PER_BLOCK);
291             }
292
293           case_move (&cf->cases[block_idx][case_idx], &new_case);
294         }
295       else
296         {
297           casefile_to_disk (cf);
298           assert (cf->storage == DISK);
299           write_case_to_disk (cf, c);
300         }
301     }
302   else
303     write_case_to_disk (cf, c);
304
305   cf->case_cnt++;
306 }
307
308 /* Appends case C to casefile CF, which takes over ownership of
309    C.  Not valid after any reader for CF has been created. */
310 void
311 casefile_append_xfer (struct casefile *cf, struct ccase *c) 
312 {
313   casefile_append (cf, c);
314   case_destroy (c);
315 }
316
317 /* Writes case C to casefile CF's disk buffer, first flushing the buffer to
318    disk if it would otherwise overflow. */
319 static void
320 write_case_to_disk (struct casefile *cf, const struct ccase *c) 
321 {
322   case_to_values (c, cf->buffer + cf->buffer_used, cf->value_cnt);
323   cf->buffer_used += cf->value_cnt;
324   if (cf->buffer_used + cf->value_cnt > cf->buffer_size)
325     flush_buffer (cf);
326 }
327
328 /* If any bytes in CF's output buffer are used, flush them to
329    disk. */
330 static void
331 flush_buffer (struct casefile *cf) 
332 {
333   if (cf->buffer_used > 0) 
334     {
335       if (!full_write (cf->fd, cf->buffer,
336                        cf->buffer_size * sizeof *cf->buffer)) 
337         msg (FE, _("Error writing temporary file: %s."), strerror (errno));
338
339       cf->buffer_used = 0;
340     } 
341 }
342
343
344 /* If CF is currently stored in memory, writes it to disk.  Readers, if any,
345    retain their current positions. */
346 void
347 casefile_to_disk (const struct casefile *cf_) 
348 {
349   struct casefile *cf = (struct casefile *) cf_;
350   struct casereader *reader;
351   
352   assert (cf != NULL);
353
354   if (cf->storage == MEMORY)
355     {
356       size_t idx, block_cnt;
357       
358       assert (cf->filename == NULL);
359       assert (cf->fd == -1);
360       assert (cf->buffer_used == 0);
361
362       cf->storage = DISK;
363       if (!make_temp_file (&cf->fd, &cf->filename))
364         err_failure ();
365       cf->buffer = xmalloc (cf->buffer_size * sizeof *cf->buffer);
366       memset (cf->buffer, 0, cf->buffer_size * sizeof *cf->buffer);
367
368       case_bytes -= cf->case_cnt * cf->case_acct_size;
369       for (idx = 0; idx < cf->case_cnt; idx++)
370         {
371           size_t block_idx = idx / CASES_PER_BLOCK;
372           size_t case_idx = idx % CASES_PER_BLOCK;
373           struct ccase *c = &cf->cases[block_idx][case_idx];
374           write_case_to_disk (cf, c);
375           case_destroy (c);
376         }
377
378       block_cnt = DIV_RND_UP (cf->case_cnt, CASES_PER_BLOCK);
379       for (idx = 0; idx < block_cnt; idx++)
380         free (cf->cases[idx]);
381
382       free (cf->cases);
383       cf->cases = NULL;
384
385       if (cf->mode == READ)
386         flush_buffer (cf);
387
388       for (reader = cf->readers; reader != NULL; reader = reader->next)
389         reader_open_file (reader);
390     }
391 }
392
393 /* Changes CF to reader mode, ensuring that no more cases may be
394    added.  Creating a casereader for CF has the same effect. */
395 void
396 casefile_mode_reader (struct casefile *cf) 
397 {
398   assert (cf != NULL);
399   cf->mode = READ;
400 }
401
402 /* Creates and returns a casereader for CF.  A casereader can be used to
403    sequentially read the cases in a casefile. */
404 struct casereader *
405 casefile_get_reader (const struct casefile *cf_) 
406 {
407   struct casefile *cf = (struct casefile *) cf_;
408   struct casereader *reader;
409
410   assert (cf != NULL);
411   assert (!cf->being_destroyed);
412
413   /* Flush the buffer to disk if it's not empty. */
414   if (cf->mode == WRITE && cf->storage == DISK)
415     flush_buffer (cf);
416   
417   cf->mode = READ;
418
419   reader = xmalloc (sizeof *reader);
420   reader->next = cf->readers;
421   if (cf->readers != NULL)
422     reader->next->prev = reader;
423   cf->readers = reader;
424   reader->prev = NULL;
425   reader->cf = cf;
426   reader->case_idx = 0;
427   reader->destructive = 0;
428   reader->fd = -1;
429   reader->buffer = NULL;
430   reader->buffer_pos = 0;
431   case_nullify (&reader->c);
432
433   if (reader->cf->storage == DISK) 
434     reader_open_file (reader);
435
436   return reader;
437 }
438
439 /* Creates and returns a destructive casereader for CF.  Like a
440    normal casereader, a destructive casereader sequentially reads
441    the cases in a casefile.  Unlike a normal casereader, a
442    destructive reader cannot operate concurrently with any other
443    reader.  (This restriction could be relaxed in a few ways, but
444    it is so far unnecessary for other code.) */
445 struct casereader *
446 casefile_get_destructive_reader (struct casefile *cf) 
447 {
448   struct casereader *reader;
449   
450   assert (cf->readers == NULL);
451   reader = casefile_get_reader (cf);
452   reader->destructive = 1;
453   cf->being_destroyed = 1;
454   return reader;
455 }
456
457 /* Opens a disk file for READER and seeks to the current position as indicated
458    by case_idx.  Normally the current position is the beginning of the file,
459    but casefile_to_disk may cause the file to be opened at a different
460    position. */
461 static void
462 reader_open_file (struct casereader *reader) 
463 {
464   struct casefile *cf = reader->cf;
465   off_t file_ofs;
466
467   if (reader->case_idx >= cf->case_cnt)
468     return;
469
470   if (cf->fd != -1) 
471     {
472       reader->fd = cf->fd;
473       cf->fd = -1;
474     }
475   else 
476     {
477       reader->fd = safe_open (cf->filename, O_RDONLY);
478       if (reader->fd < 0)
479         msg (FE, _("%s: Opening temporary file: %s."),
480              cf->filename, strerror (errno));
481     }
482
483   if (cf->buffer != NULL) 
484     {
485       reader->buffer = cf->buffer;
486       cf->buffer = NULL; 
487     }
488   else 
489     {
490       reader->buffer = xmalloc (cf->buffer_size * sizeof *cf->buffer);
491       memset (reader->buffer, 0, cf->buffer_size * sizeof *cf->buffer); 
492     }
493
494   if (cf->value_cnt != 0) 
495     {
496       size_t buffer_case_cnt = cf->buffer_size / cf->value_cnt;
497       file_ofs = ((off_t) reader->case_idx / buffer_case_cnt
498                   * cf->buffer_size * sizeof *cf->buffer);
499       reader->buffer_pos = (reader->case_idx % buffer_case_cnt
500                             * cf->value_cnt);
501     }
502   else 
503     file_ofs = 0;
504   if (lseek (reader->fd, file_ofs, SEEK_SET) != file_ofs)
505     msg (FE, _("%s: Seeking temporary file: %s."),
506          cf->filename, strerror (errno));
507
508   if (cf->case_cnt > 0 && cf->value_cnt > 0)
509     fill_buffer (reader);
510
511   case_create (&reader->c, cf->value_cnt);
512 }
513
514 /* Fills READER's buffer by reading a block from disk. */
515 static void
516 fill_buffer (struct casereader *reader)
517 {
518   int retval = full_read (reader->fd, reader->buffer,
519                           reader->cf->buffer_size * sizeof *reader->buffer);
520   if (retval < 0)
521     msg (FE, _("%s: Reading temporary file: %s."),
522          reader->cf->filename, strerror (errno));
523   else if (retval != reader->cf->buffer_size * sizeof *reader->buffer)
524     msg (FE, _("%s: Temporary file ended unexpectedly."),
525          reader->cf->filename); 
526 }
527
528 /* Returns the casefile that READER reads. */
529 const struct casefile *
530 casereader_get_casefile (const struct casereader *reader) 
531 {
532   assert (reader != NULL);
533   
534   return reader->cf;
535 }
536
537 /* Reads a copy of the next case from READER into C.
538    Caller is responsible for destroying C. */
539 int
540 casereader_read (struct casereader *reader, struct ccase *c) 
541 {
542   assert (reader != NULL);
543   
544   if (reader->case_idx >= reader->cf->case_cnt) 
545     return 0;
546
547   if (reader->cf->storage == MEMORY) 
548     {
549       size_t block_idx = reader->case_idx / CASES_PER_BLOCK;
550       size_t case_idx = reader->case_idx % CASES_PER_BLOCK;
551
552       case_clone (c, &reader->cf->cases[block_idx][case_idx]);
553       reader->case_idx++;
554       return 1;
555     }
556   else 
557     {
558       if (reader->buffer_pos + reader->cf->value_cnt > reader->cf->buffer_size)
559         {
560           fill_buffer (reader);
561           reader->buffer_pos = 0;
562         }
563
564       case_from_values (&reader->c, reader->buffer + reader->buffer_pos,
565                         reader->cf->value_cnt);
566       reader->buffer_pos += reader->cf->value_cnt;
567       reader->case_idx++;
568
569       case_clone (c, &reader->c);
570       return 1;
571     }
572 }
573
574 /* Reads the next case from READER into C and transfers ownership
575    to the caller.  Caller is responsible for destroying C. */
576 int
577 casereader_read_xfer (struct casereader *reader, struct ccase *c)
578 {
579   assert (reader != NULL);
580
581   if (reader->destructive == 0
582       || reader->case_idx >= reader->cf->case_cnt
583       || reader->cf->storage == DISK) 
584     return casereader_read (reader, c);
585   else 
586     {
587       size_t block_idx = reader->case_idx / CASES_PER_BLOCK;
588       size_t case_idx = reader->case_idx % CASES_PER_BLOCK;
589       struct ccase *read_case = &reader->cf->cases[block_idx][case_idx];
590
591       case_move (c, read_case);
592       reader->case_idx++;
593       return 1;
594     }
595 }
596
597 /* Destroys READER. */
598 void
599 casereader_destroy (struct casereader *reader)
600 {
601   assert (reader != NULL);
602
603   if (reader->next != NULL)
604     reader->next->prev = reader->prev;
605   if (reader->prev != NULL)
606     reader->prev->next = reader->next;
607   if (reader->cf->readers == reader)
608     reader->cf->readers = reader->next;
609
610   if (reader->cf->buffer == NULL)
611     reader->cf->buffer = reader->buffer;
612   else
613     free (reader->buffer);
614
615   if (reader->fd != -1) 
616     {
617       if (reader->cf->fd == -1)
618         reader->cf->fd = reader->fd;
619       else
620         safe_close (reader->fd);
621     }
622   
623   case_destroy (&reader->c);
624
625   free (reader);
626 }
627
628 /* Calls open(), passing FILENAME and FLAGS, repeating as necessary
629    to deal with interrupted calls. */
630 static int
631 safe_open (const char *filename, int flags) 
632 {
633   int fd;
634
635   do 
636     {
637       fd = open (filename, flags);
638     }
639   while (fd == -1 && errno == EINTR);
640
641   return fd;
642 }
643
644 /* Calls close(), passing FD, repeating as necessary to deal with
645    interrupted calls. */
646 static int safe_close (int fd) 
647 {
648   int retval;
649
650   do 
651     {
652       retval = close (fd);
653     }
654   while (retval == -1 && errno == EINTR);
655
656   return retval;
657 }
658
659 /* Calls read(), passing FD, BUFFER, and SIZE, repeating as
660    necessary to deal with interrupted calls. */
661 static int
662 full_read (int fd, void *buffer_, size_t size) 
663 {
664   char *buffer = buffer_;
665   size_t bytes_read = 0;
666   
667   while (bytes_read < size)
668     {
669       int retval = read (fd, buffer + bytes_read, size - bytes_read);
670       if (retval > 0) 
671         bytes_read += retval; 
672       else if (retval == 0) 
673         return bytes_read;
674       else if (errno != EINTR)
675         return -1;
676     }
677
678   return bytes_read;
679 }
680
681 /* Calls write(), passing FD, BUFFER, and SIZE, repeating as
682    necessary to deal with interrupted calls. */
683 static int
684 full_write (int fd, const void *buffer_, size_t size) 
685 {
686   const char *buffer = buffer_;
687   size_t bytes_written = 0;
688   
689   while (bytes_written < size)
690     {
691       int retval = write (fd, buffer + bytes_written, size - bytes_written);
692       if (retval >= 0) 
693         bytes_written += retval; 
694       else if (errno != EINTR)
695         return -1;
696     }
697
698   return bytes_written;
699 }
700
701
702 /* Registers our exit handler with atexit() if it has not already
703    been registered. */
704 static void
705 register_atexit (void) 
706 {
707   static int registered = 0;
708   if (!registered) 
709     {
710       registered = 1;
711       atexit (exit_handler);
712     }
713 }
714
715
716
717 /* atexit() handler that closes and deletes our temporary
718    files. */
719 static void
720 exit_handler (void) 
721 {
722   while (casefiles != NULL)
723     casefile_destroy (casefiles);
724 }
725 \f
726 #include <gsl/gsl_rng.h>
727 #include <stdarg.h>
728 #include "command.h"
729 #include "lexer.h"
730
731 static void test_casefile (int pattern, size_t value_cnt, size_t case_cnt);
732 static void get_random_case (struct ccase *, size_t value_cnt,
733                              size_t case_idx);
734 static void write_random_case (struct casefile *cf, size_t case_idx);
735 static void read_and_verify_random_case (struct casefile *cf,
736                                          struct casereader *reader,
737                                          size_t case_idx);
738 static void fail_test (const char *message, ...);
739
740 int
741 cmd_debug_casefile (void) 
742 {
743   static const size_t sizes[] =
744     {
745       1, 2, 3, 4, 5, 6, 7, 14, 15, 16, 17, 31, 55, 73,
746       100, 137, 257, 521, 1031, 2053
747     };
748   int size_max;
749   int case_max;
750   int pattern;
751
752   size_max = sizeof sizes / sizeof *sizes;
753   if (lex_match_id ("SMALL")) 
754     {
755       size_max -= 4;
756       case_max = 511; 
757     }
758   else
759     case_max = 4095;
760   if (token != '.')
761     return lex_end_of_command ();
762     
763   for (pattern = 0; pattern < 6; pattern++) 
764     {
765       const size_t *size;
766
767       for (size = sizes; size < sizes + size_max; size++) 
768         {
769           size_t case_cnt;
770
771           for (case_cnt = 0; case_cnt <= case_max;
772                case_cnt = (case_cnt * 2) + 1)
773             test_casefile (pattern, *size, case_cnt);
774         }
775     }
776   printf ("Casefile tests succeeded.\n");
777   return CMD_SUCCESS;
778 }
779
780 static void
781 test_casefile (int pattern, size_t value_cnt, size_t case_cnt) 
782 {
783   struct casefile *cf;
784   struct casereader *r1, *r2;
785   struct ccase c;
786   gsl_rng *rng;
787   size_t i, j;
788
789   rng = gsl_rng_alloc (gsl_rng_mt19937);
790   cf = casefile_create (value_cnt);
791   if (pattern == 5)
792     casefile_to_disk (cf);
793   for (i = 0; i < case_cnt; i++)
794     write_random_case (cf, i);
795   if (pattern == 5)
796     casefile_sleep (cf);
797   r1 = casefile_get_reader (cf);
798   r2 = casefile_get_reader (cf);
799   switch (pattern) 
800     {
801     case 0:
802     case 5:
803       for (i = 0; i < case_cnt; i++) 
804         {
805           read_and_verify_random_case (cf, r1, i);
806           read_and_verify_random_case (cf, r2, i);
807         } 
808       break;
809     case 1:
810       for (i = 0; i < case_cnt; i++)
811         read_and_verify_random_case (cf, r1, i);
812       for (i = 0; i < case_cnt; i++) 
813         read_and_verify_random_case (cf, r2, i);
814       break;
815     case 2:
816     case 3:
817     case 4:
818       for (i = j = 0; i < case_cnt; i++) 
819         {
820           read_and_verify_random_case (cf, r1, i);
821           if (gsl_rng_get (rng) % pattern == 0) 
822             read_and_verify_random_case (cf, r2, j++); 
823           if (i == case_cnt / 2)
824             casefile_to_disk (cf);
825         }
826       for (; j < case_cnt; j++) 
827         read_and_verify_random_case (cf, r2, j);
828       break;
829     }
830   if (casereader_read (r1, &c))
831     fail_test ("Casereader 1 not at end of file.");
832   if (casereader_read (r2, &c))
833     fail_test ("Casereader 2 not at end of file.");
834   if (pattern != 1)
835     casereader_destroy (r1);
836   if (pattern != 2)
837     casereader_destroy (r2);
838   if (pattern > 2) 
839     {
840       r1 = casefile_get_destructive_reader (cf);
841       for (i = 0; i < case_cnt; i++) 
842         {
843           struct ccase read_case, expected_case;
844           
845           get_random_case (&expected_case, value_cnt, i);
846           if (!casereader_read_xfer (r1, &read_case)) 
847             fail_test ("Premature end of casefile.");
848           for (j = 0; j < value_cnt; j++) 
849             {
850               double a = case_num (&read_case, j);
851               double b = case_num (&expected_case, j);
852               if (a != b)
853                 fail_test ("Case %lu fails comparison.", (unsigned long) i); 
854             }
855           case_destroy (&expected_case);
856           case_destroy (&read_case);
857         }
858       casereader_destroy (r1);
859     }
860   casefile_destroy (cf);
861   gsl_rng_free (rng);
862 }
863
864 static void
865 get_random_case (struct ccase *c, size_t value_cnt, size_t case_idx) 
866 {
867   int i;
868   case_create (c, value_cnt);
869   for (i = 0; i < value_cnt; i++)
870     case_data_rw (c, i)->f = case_idx % 257 + i;
871 }
872
873 static void
874 write_random_case (struct casefile *cf, size_t case_idx) 
875 {
876   struct ccase c;
877   get_random_case (&c, casefile_get_value_cnt (cf), case_idx);
878   casefile_append_xfer (cf, &c);
879 }
880
881 static void
882 read_and_verify_random_case (struct casefile *cf,
883                              struct casereader *reader, size_t case_idx) 
884 {
885   struct ccase read_case, expected_case;
886   size_t value_cnt;
887   size_t i;
888   
889   value_cnt = casefile_get_value_cnt (cf);
890   get_random_case (&expected_case, value_cnt, case_idx);
891   if (!casereader_read (reader, &read_case)) 
892     fail_test ("Premature end of casefile.");
893   for (i = 0; i < value_cnt; i++) 
894     {
895       double a = case_num (&read_case, i);
896       double b = case_num (&expected_case, i);
897       if (a != b)
898         fail_test ("Case %lu fails comparison.", (unsigned long) case_idx); 
899     }
900   case_destroy (&read_case);
901   case_destroy (&expected_case);
902 }
903
904 static void
905 fail_test (const char *message, ...) 
906 {
907   va_list args;
908
909   va_start (args, message);
910   vprintf (message, args);
911   putchar ('\n');
912   va_end (args);
913   
914   exit (1);
915 }