689e2b1302cb3ed8b540e94f8ebe3a6b6809a1b1
[pspp-builds.git] / src / casefile.c
1 /* PSPP - computes sample statistics.
2    Copyright (C) 2004 Free Software Foundation, Inc.
3    Written by Ben Pfaff <blp@gnu.org>.
4
5    This program is free software; you can redistribute it and/or
6    modify it under the terms of the GNU General Public License as
7    published by the Free Software Foundation; either version 2 of the
8    License, or (at your option) any later version.
9
10    This program is distributed in the hope that it will be useful, but
11    WITHOUT ANY WARRANTY; without even the implied warranty of
12    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13    General Public License for more details.
14
15    You should have received a copy of the GNU General Public License
16    along with this program; if not, write to the Free Software
17    Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA
18    02111-1307, USA. */
19
20 #include <config.h>
21 #include "casefile.h"
22 #include <assert.h>
23 #include <errno.h>
24 #include <fcntl.h>
25 #include <stdio.h>
26 #include <stdlib.h>
27 #include <string.h>
28 #include <unistd.h>
29 #include "alloc.h"
30 #include "case.h"
31 #include "error.h"
32 #include "misc.h"
33 #include "mkfile.h"
34 #include "settings.h"
35 #include "var.h"
36
37 #ifdef HAVE_VALGRIND_VALGRIND_H
38 #include <valgrind/valgrind.h>
39 #endif
40
41 #define IO_BUF_SIZE (8192 / sizeof (union value))
42
43 /* A casefile is a sequentially accessible array of immutable
44    cases.  It may be stored in memory or on disk as workspace
45    allows.  Cases may be appended to the end of the file.  Cases
46    may be read sequentially starting from the beginning of the
47    file.  Once any cases have been read, no more cases may be
48    appended.  The entire file is discarded at once. */
49
50 /* In-memory cases are arranged in an array of arrays.  The top
51    level is variable size and the size of each bottom level array
52    is fixed at the number of cases defined here.  */
53 #define CASES_PER_BLOCK 128             
54
55 /* A casefile. */
56 struct casefile 
57   {
58     /* Basic data. */
59     struct casefile *next, *prev;       /* Next, prev in global list. */
60     size_t value_cnt;                   /* Case size in `union value's. */
61     size_t case_acct_size;              /* Case size for accounting. */
62     unsigned long case_cnt;             /* Number of cases stored. */
63     enum { MEMORY, DISK } storage;      /* Where cases are stored. */
64     enum { WRITE, READ } mode;          /* Is writing or reading allowed? */
65     struct casereader *readers;         /* List of our readers. */
66     int being_destroyed;                /* Does a destructive reader exist? */
67
68     /* Memory storage. */
69     struct ccase **cases;               /* Pointer to array of cases. */
70
71     /* Disk storage. */
72     int fd;                             /* File descriptor, -1 if none. */
73     char *filename;                     /* Filename. */
74     union value *buffer;                /* I/O buffer, NULL if none. */
75     size_t buffer_used;                 /* Number of values used in buffer. */
76     size_t buffer_size;                 /* Buffer size in values. */
77   };
78
79 /* For reading out the cases in a casefile. */
80 struct casereader 
81   {
82     struct casereader *next, *prev;     /* Next, prev in casefile's list. */
83     struct casefile *cf;                /* Our casefile. */
84     unsigned long case_idx;             /* Case number of current case. */
85     int destructive;                    /* Is this a destructive reader? */
86
87     /* Disk storage. */
88     int fd;                             /* File descriptor. */
89     union value *buffer;                /* I/O buffer. */
90     size_t buffer_pos;                  /* Offset of buffer position. */
91     struct ccase c;                     /* Current case. */
92   };
93
94 /* Return the case number of the current case */
95 unsigned long
96 casereader_cnum(const struct casereader *r)
97 {
98   return r->case_idx;
99 }
100
101 /* Doubly linked list of all casefiles. */
102 static struct casefile *casefiles;
103
104 /* Number of bytes of case allocated in in-memory casefiles. */
105 static size_t case_bytes;
106
107 static void register_atexit (void);
108 static void exit_handler (void);
109
110 static void reader_open_file (struct casereader *reader);
111 static void write_case_to_disk (struct casefile *cf, const struct ccase *c);
112 static void flush_buffer (struct casefile *cf);
113 static void fill_buffer (struct casereader *reader);
114
115 static int safe_open (const char *filename, int flags);
116 static int safe_close (int fd);
117 static int full_read (int fd, void *buffer, size_t size);
118 static int full_write (int fd, const void *buffer, size_t size);
119
120 /* Creates and returns a casefile to store cases of VALUE_CNT
121    `union value's each. */
122 struct casefile *
123 casefile_create (size_t value_cnt) 
124 {
125   struct casefile *cf = xmalloc (sizeof *cf);
126   cf->next = casefiles;
127   cf->prev = NULL;
128   if (cf->next != NULL)
129     cf->next->prev = cf;
130   casefiles = cf;
131   cf->value_cnt = value_cnt;
132   cf->case_acct_size = (cf->value_cnt + 4) * sizeof *cf->buffer;
133   cf->case_cnt = 0;
134   cf->storage = MEMORY;
135   cf->mode = WRITE;
136   cf->readers = NULL;
137   cf->being_destroyed = 0;
138   cf->cases = NULL;
139   cf->fd = -1;
140   cf->filename = NULL;
141   cf->buffer = NULL;
142   cf->buffer_size = ROUND_UP (cf->value_cnt, IO_BUF_SIZE);
143   if (cf->value_cnt > 0 && cf->buffer_size % cf->value_cnt > 64)
144     cf->buffer_size = cf->value_cnt;
145   cf->buffer_used = 0;
146   register_atexit ();
147   return cf;
148 }
149
150 /* Destroys casefile CF. */
151 void
152 casefile_destroy (struct casefile *cf) 
153 {
154   if (cf != NULL) 
155     {
156       if (cf->next != NULL)
157         cf->next->prev = cf->prev;
158       if (cf->prev != NULL)
159         cf->prev->next = cf->next;
160       if (casefiles == cf)
161         casefiles = cf->next;
162
163       while (cf->readers != NULL) 
164         casereader_destroy (cf->readers);
165
166       if (cf->cases != NULL) 
167         {
168           size_t idx, block_cnt;
169
170           case_bytes -= cf->case_cnt * cf->case_acct_size;
171           for (idx = 0; idx < cf->case_cnt; idx++)
172             {
173               size_t block_idx = idx / CASES_PER_BLOCK;
174               size_t case_idx = idx % CASES_PER_BLOCK;
175               struct ccase *c = &cf->cases[block_idx][case_idx];
176               case_destroy (c);
177             }
178
179           block_cnt = DIV_RND_UP (cf->case_cnt, CASES_PER_BLOCK);
180           for (idx = 0; idx < block_cnt; idx++)
181             free (cf->cases[idx]);
182
183           free (cf->cases);
184         }
185
186       if (cf->fd != -1)
187         safe_close (cf->fd);
188           
189       if (cf->filename != NULL && remove (cf->filename) == -1) 
190         msg (ME, _("%s: Removing temporary file: %s."),
191              cf->filename, strerror (errno));
192       free (cf->filename);
193
194       free (cf->buffer);
195
196       free (cf);
197     }
198 }
199
200 /* Returns nonzero only if casefile CF is stored in memory (instead of on
201    disk). */
202 int
203 casefile_in_core (const struct casefile *cf) 
204 {
205   assert (cf != NULL);
206
207   return cf->storage == MEMORY;
208 }
209
210 /* Puts a casefile to "sleep", that is, minimizes the resources
211    needed for it by closing its file descriptor and freeing its
212    buffer.  This is useful if we need so many casefiles that we
213    might not have enough memory and file descriptors to go
214    around.
215
216    For simplicity, this implementation always converts the
217    casefile to reader mode.  If this turns out to be a problem,
218    with a little extra work we could also support sleeping
219    writers. */
220 void
221 casefile_sleep (const struct casefile *cf_) 
222 {
223   struct casefile *cf = (struct casefile *) cf_;
224   assert (cf != NULL);
225
226   casefile_mode_reader (cf);
227   casefile_to_disk (cf);
228   flush_buffer (cf);
229
230   if (cf->fd != -1) 
231     {
232       safe_close (cf->fd);
233       cf->fd = -1;
234     }
235   if (cf->buffer != NULL) 
236     {
237       free (cf->buffer);
238       cf->buffer = NULL;
239     }
240 }
241
242 /* Returns the number of `union value's in a case for CF. */
243 size_t
244 casefile_get_value_cnt (const struct casefile *cf) 
245 {
246   assert (cf != NULL);
247
248   return cf->value_cnt;
249 }
250
251 /* Returns the number of cases in casefile CF. */
252 unsigned long
253 casefile_get_case_cnt (const struct casefile *cf) 
254 {
255   assert (cf != NULL);
256
257   return cf->case_cnt;
258 }
259
260 /* Appends a copy of case C to casefile CF.  Not valid after any
261    reader for CF has been created. */
262 void
263 casefile_append (struct casefile *cf, const struct ccase *c) 
264 {
265   assert (cf != NULL);
266   assert (c != NULL);
267   assert (cf->mode == WRITE);
268
269   /* Try memory first. */
270   if (cf->storage == MEMORY) 
271     {
272       if (case_bytes < get_max_workspace ())
273         {
274           size_t block_idx = cf->case_cnt / CASES_PER_BLOCK;
275           size_t case_idx = cf->case_cnt % CASES_PER_BLOCK;
276           struct ccase new_case;
277
278           case_bytes += cf->case_acct_size;
279           case_clone (&new_case, c);
280           if (case_idx == 0) 
281             {
282               if ((block_idx & (block_idx - 1)) == 0) 
283                 {
284                   size_t block_cap = block_idx == 0 ? 1 : block_idx * 2;
285                   cf->cases = xrealloc (cf->cases,
286                                         sizeof *cf->cases * block_cap);
287                 }
288
289               cf->cases[block_idx] = xmalloc (sizeof **cf->cases
290                                               * CASES_PER_BLOCK);
291             }
292
293           case_move (&cf->cases[block_idx][case_idx], &new_case);
294         }
295       else
296         {
297           casefile_to_disk (cf);
298           assert (cf->storage == DISK);
299           write_case_to_disk (cf, c);
300         }
301     }
302   else
303     write_case_to_disk (cf, c);
304
305   cf->case_cnt++;
306 }
307
308 /* Appends case C to casefile CF, which takes over ownership of
309    C.  Not valid after any reader for CF has been created. */
310 void
311 casefile_append_xfer (struct casefile *cf, struct ccase *c) 
312 {
313   casefile_append (cf, c);
314   case_destroy (c);
315 }
316
317 /* Writes case C to casefile CF's disk buffer, first flushing the buffer to
318    disk if it would otherwise overflow. */
319 static void
320 write_case_to_disk (struct casefile *cf, const struct ccase *c) 
321 {
322   case_to_values (c, cf->buffer + cf->buffer_used, cf->value_cnt);
323   cf->buffer_used += cf->value_cnt;
324   if (cf->buffer_used + cf->value_cnt > cf->buffer_size)
325     flush_buffer (cf);
326 }
327
328 /* If any bytes in CF's output buffer are used, flush them to
329    disk. */
330 static void
331 flush_buffer (struct casefile *cf) 
332 {
333   if (cf->buffer_used > 0) 
334     {
335       if (!full_write (cf->fd, cf->buffer,
336                        cf->buffer_size * sizeof *cf->buffer)) 
337         msg (FE, _("Error writing temporary file: %s."), strerror (errno));
338
339       cf->buffer_used = 0;
340     } 
341 }
342
343
344 /* If CF is currently stored in memory, writes it to disk.  Readers, if any,
345    retain their current positions. */
346 void
347 casefile_to_disk (const struct casefile *cf_) 
348 {
349   struct casefile *cf = (struct casefile *) cf_;
350   struct casereader *reader;
351   
352   assert (cf != NULL);
353
354   if (cf->storage == MEMORY)
355     {
356       size_t idx, block_cnt;
357       
358       assert (cf->filename == NULL);
359       assert (cf->fd == -1);
360       assert (cf->buffer_used == 0);
361
362       cf->storage = DISK;
363       if (!make_temp_file (&cf->fd, &cf->filename))
364         err_failure ();
365       cf->buffer = xmalloc (cf->buffer_size * sizeof *cf->buffer);
366       memset (cf->buffer, 0, cf->buffer_size * sizeof *cf->buffer);
367
368       case_bytes -= cf->case_cnt * cf->case_acct_size;
369       for (idx = 0; idx < cf->case_cnt; idx++)
370         {
371           size_t block_idx = idx / CASES_PER_BLOCK;
372           size_t case_idx = idx % CASES_PER_BLOCK;
373           struct ccase *c = &cf->cases[block_idx][case_idx];
374           write_case_to_disk (cf, c);
375           case_destroy (c);
376         }
377
378       block_cnt = DIV_RND_UP (cf->case_cnt, CASES_PER_BLOCK);
379       for (idx = 0; idx < block_cnt; idx++)
380         free (cf->cases[idx]);
381
382       free (cf->cases);
383       cf->cases = NULL;
384
385       if (cf->mode == READ)
386         flush_buffer (cf);
387
388       for (reader = cf->readers; reader != NULL; reader = reader->next)
389         reader_open_file (reader);
390     }
391 }
392
393 /* Changes CF to reader mode, ensuring that no more cases may be
394    added.  Creating a casereader for CF has the same effect. */
395 void
396 casefile_mode_reader (struct casefile *cf) 
397 {
398   assert (cf != NULL);
399   cf->mode = READ;
400 }
401
402 /* Creates and returns a casereader for CF.  A casereader can be used to
403    sequentially read the cases in a casefile. */
404 struct casereader *
405 casefile_get_reader (const struct casefile *cf_) 
406 {
407   struct casefile *cf = (struct casefile *) cf_;
408   struct casereader *reader;
409
410   assert (cf != NULL);
411   assert (!cf->being_destroyed);
412
413   /* Flush the buffer to disk if it's not empty. */
414   if (cf->mode == WRITE && cf->storage == DISK)
415     flush_buffer (cf);
416   
417   cf->mode = READ;
418
419   reader = xmalloc (sizeof *reader);
420   reader->next = cf->readers;
421   if (cf->readers != NULL)
422     reader->next->prev = reader;
423   cf->readers = reader;
424   reader->prev = NULL;
425   reader->cf = cf;
426   reader->case_idx = 0;
427   reader->destructive = 0;
428   reader->fd = -1;
429   reader->buffer = NULL;
430   reader->buffer_pos = 0;
431   case_nullify (&reader->c);
432
433   if (reader->cf->storage == DISK) 
434     reader_open_file (reader);
435
436   return reader;
437 }
438
439 /* Creates and returns a destructive casereader for CF.  Like a
440    normal casereader, a destructive casereader sequentially reads
441    the cases in a casefile.  Unlike a normal casereader, a
442    destructive reader cannot operate concurrently with any other
443    reader.  (This restriction could be relaxed in a few ways, but
444    it is so far unnecessary for other code.) */
445 struct casereader *
446 casefile_get_destructive_reader (struct casefile *cf) 
447 {
448   struct casereader *reader;
449   
450   assert (cf->readers == NULL);
451   reader = casefile_get_reader (cf);
452   reader->destructive = 1;
453   cf->being_destroyed = 1;
454   return reader;
455 }
456
457 /* Opens a disk file for READER and seeks to the current position as indicated
458    by case_idx.  Normally the current position is the beginning of the file,
459    but casefile_to_disk may cause the file to be opened at a different
460    position. */
461 static void
462 reader_open_file (struct casereader *reader) 
463 {
464   struct casefile *cf = reader->cf;
465   off_t file_ofs;
466
467   if (reader->case_idx >= cf->case_cnt)
468     return;
469
470   if (cf->fd != -1) 
471     {
472       reader->fd = cf->fd;
473       cf->fd = -1;
474     }
475   else 
476     {
477       reader->fd = safe_open (cf->filename, O_RDONLY);
478       if (reader->fd < 0)
479         msg (FE, _("%s: Opening temporary file: %s."),
480              cf->filename, strerror (errno));
481     }
482
483   if (cf->buffer != NULL) 
484     {
485       reader->buffer = cf->buffer;
486       cf->buffer = NULL; 
487     }
488   else 
489     {
490       reader->buffer = xmalloc (cf->buffer_size * sizeof *cf->buffer);
491       memset (reader->buffer, 0, cf->buffer_size * sizeof *cf->buffer); 
492     }
493
494   if (cf->value_cnt != 0) 
495     {
496       size_t buffer_case_cnt = cf->buffer_size / cf->value_cnt;
497       file_ofs = ((off_t) reader->case_idx / buffer_case_cnt
498                   * cf->buffer_size * sizeof *cf->buffer);
499       reader->buffer_pos = (reader->case_idx % buffer_case_cnt
500                             * cf->value_cnt);
501     }
502   else 
503     file_ofs = 0;
504   if (lseek (reader->fd, file_ofs, SEEK_SET) != file_ofs)
505     msg (FE, _("%s: Seeking temporary file: %s."),
506          cf->filename, strerror (errno));
507
508   if (cf->case_cnt > 0 && cf->value_cnt > 0)
509     fill_buffer (reader);
510
511   case_create (&reader->c, cf->value_cnt);
512 }
513
514 /* Fills READER's buffer by reading a block from disk. */
515 static void
516 fill_buffer (struct casereader *reader)
517 {
518   int retval = full_read (reader->fd, reader->buffer,
519                           reader->cf->buffer_size * sizeof *reader->buffer);
520   if (retval < 0)
521     msg (FE, _("%s: Reading temporary file: %s."),
522          reader->cf->filename, strerror (errno));
523   else if (retval != reader->cf->buffer_size * sizeof *reader->buffer)
524     msg (FE, _("%s: Temporary file ended unexpectedly."),
525          reader->cf->filename); 
526 }
527
528 /* Returns the casefile that READER reads. */
529 const struct casefile *
530 casereader_get_casefile (const struct casereader *reader) 
531 {
532   assert (reader != NULL);
533   
534   return reader->cf;
535 }
536
537 /* Reads a copy of the next case from READER into C.
538    Caller is responsible for destroying C.
539    Returns true if successful, false at end of file. */
540 int
541 casereader_read (struct casereader *reader, struct ccase *c) 
542 {
543   assert (reader != NULL);
544   
545   if (reader->case_idx >= reader->cf->case_cnt) 
546     return 0;
547
548   if (reader->cf->storage == MEMORY) 
549     {
550       size_t block_idx = reader->case_idx / CASES_PER_BLOCK;
551       size_t case_idx = reader->case_idx % CASES_PER_BLOCK;
552
553       case_clone (c, &reader->cf->cases[block_idx][case_idx]);
554       reader->case_idx++;
555       return 1;
556     }
557   else 
558     {
559       if (reader->buffer_pos + reader->cf->value_cnt > reader->cf->buffer_size)
560         {
561           fill_buffer (reader);
562           reader->buffer_pos = 0;
563         }
564
565       case_from_values (&reader->c, reader->buffer + reader->buffer_pos,
566                         reader->cf->value_cnt);
567       reader->buffer_pos += reader->cf->value_cnt;
568       reader->case_idx++;
569
570       case_clone (c, &reader->c);
571       return 1;
572     }
573 }
574
575 /* Reads the next case from READER into C and transfers ownership
576    to the caller.  Caller is responsible for destroying C.
577    Returns true if successful, false at end of file. */
578 int
579 casereader_read_xfer (struct casereader *reader, struct ccase *c)
580 {
581   assert (reader != NULL);
582
583   if (reader->destructive == 0
584       || reader->case_idx >= reader->cf->case_cnt
585       || reader->cf->storage == DISK) 
586     return casereader_read (reader, c);
587   else 
588     {
589       size_t block_idx = reader->case_idx / CASES_PER_BLOCK;
590       size_t case_idx = reader->case_idx % CASES_PER_BLOCK;
591       struct ccase *read_case = &reader->cf->cases[block_idx][case_idx];
592
593       case_move (c, read_case);
594       reader->case_idx++;
595       return 1;
596     }
597 }
598
599 /* Reads the next case from READER into C and transfers ownership
600    to the caller.  Caller is responsible for destroying C.
601    Assert-fails at end of file. */
602 void
603 casereader_read_xfer_assert (struct casereader *reader, struct ccase *c) 
604 {
605   bool success = casereader_read_xfer (reader, c);
606   assert (success);
607 }
608
609 /* Destroys READER. */
610 void
611 casereader_destroy (struct casereader *reader)
612 {
613   assert (reader != NULL);
614
615   if (reader->next != NULL)
616     reader->next->prev = reader->prev;
617   if (reader->prev != NULL)
618     reader->prev->next = reader->next;
619   if (reader->cf->readers == reader)
620     reader->cf->readers = reader->next;
621
622   if (reader->cf->buffer == NULL)
623     reader->cf->buffer = reader->buffer;
624   else
625     free (reader->buffer);
626
627   if (reader->fd != -1) 
628     {
629       if (reader->cf->fd == -1)
630         reader->cf->fd = reader->fd;
631       else
632         safe_close (reader->fd);
633     }
634   
635   case_destroy (&reader->c);
636
637   free (reader);
638 }
639
640 /* Calls open(), passing FILENAME and FLAGS, repeating as necessary
641    to deal with interrupted calls. */
642 static int
643 safe_open (const char *filename, int flags) 
644 {
645   int fd;
646
647   do 
648     {
649       fd = open (filename, flags);
650     }
651   while (fd == -1 && errno == EINTR);
652
653   return fd;
654 }
655
656 /* Calls close(), passing FD, repeating as necessary to deal with
657    interrupted calls. */
658 static int safe_close (int fd) 
659 {
660   int retval;
661
662   do 
663     {
664       retval = close (fd);
665     }
666   while (retval == -1 && errno == EINTR);
667
668   return retval;
669 }
670
671 /* Calls read(), passing FD, BUFFER, and SIZE, repeating as
672    necessary to deal with interrupted calls. */
673 static int
674 full_read (int fd, void *buffer_, size_t size) 
675 {
676   char *buffer = buffer_;
677   size_t bytes_read = 0;
678   
679   while (bytes_read < size)
680     {
681       int retval = read (fd, buffer + bytes_read, size - bytes_read);
682       if (retval > 0) 
683         bytes_read += retval; 
684       else if (retval == 0) 
685         return bytes_read;
686       else if (errno != EINTR)
687         return -1;
688     }
689
690   return bytes_read;
691 }
692
693 /* Calls write(), passing FD, BUFFER, and SIZE, repeating as
694    necessary to deal with interrupted calls. */
695 static int
696 full_write (int fd, const void *buffer_, size_t size) 
697 {
698   const char *buffer = buffer_;
699   size_t bytes_written = 0;
700   
701   while (bytes_written < size)
702     {
703       int retval = write (fd, buffer + bytes_written, size - bytes_written);
704       if (retval >= 0) 
705         bytes_written += retval; 
706       else if (errno != EINTR)
707         return -1;
708     }
709
710   return bytes_written;
711 }
712
713
714 /* Registers our exit handler with atexit() if it has not already
715    been registered. */
716 static void
717 register_atexit (void) 
718 {
719   static int registered = 0;
720   if (!registered) 
721     {
722       registered = 1;
723       atexit (exit_handler);
724     }
725 }
726
727
728
729 /* atexit() handler that closes and deletes our temporary
730    files. */
731 static void
732 exit_handler (void) 
733 {
734   while (casefiles != NULL)
735     casefile_destroy (casefiles);
736 }
737 \f
738 #include <gsl/gsl_rng.h>
739 #include <stdarg.h>
740 #include "command.h"
741 #include "lexer.h"
742
743 static void test_casefile (int pattern, size_t value_cnt, size_t case_cnt);
744 static void get_random_case (struct ccase *, size_t value_cnt,
745                              size_t case_idx);
746 static void write_random_case (struct casefile *cf, size_t case_idx);
747 static void read_and_verify_random_case (struct casefile *cf,
748                                          struct casereader *reader,
749                                          size_t case_idx);
750 static void fail_test (const char *message, ...);
751
752 int
753 cmd_debug_casefile (void) 
754 {
755   static const size_t sizes[] =
756     {
757       1, 2, 3, 4, 5, 6, 7, 14, 15, 16, 17, 31, 55, 73,
758       100, 137, 257, 521, 1031, 2053
759     };
760   int size_max;
761   int case_max;
762   int pattern;
763
764   size_max = sizeof sizes / sizeof *sizes;
765   if (lex_match_id ("SMALL")) 
766     {
767       size_max -= 4;
768       case_max = 511; 
769     }
770   else
771     case_max = 4095;
772   if (token != '.')
773     return lex_end_of_command ();
774     
775   for (pattern = 0; pattern < 6; pattern++) 
776     {
777       const size_t *size;
778
779       for (size = sizes; size < sizes + size_max; size++) 
780         {
781           size_t case_cnt;
782
783           for (case_cnt = 0; case_cnt <= case_max;
784                case_cnt = (case_cnt * 2) + 1)
785             test_casefile (pattern, *size, case_cnt);
786         }
787     }
788   printf ("Casefile tests succeeded.\n");
789   return CMD_SUCCESS;
790 }
791
792 static void
793 test_casefile (int pattern, size_t value_cnt, size_t case_cnt) 
794 {
795   struct casefile *cf;
796   struct casereader *r1, *r2;
797   struct ccase c;
798   gsl_rng *rng;
799   size_t i, j;
800
801   rng = gsl_rng_alloc (gsl_rng_mt19937);
802   cf = casefile_create (value_cnt);
803   if (pattern == 5)
804     casefile_to_disk (cf);
805   for (i = 0; i < case_cnt; i++)
806     write_random_case (cf, i);
807   if (pattern == 5)
808     casefile_sleep (cf);
809   r1 = casefile_get_reader (cf);
810   r2 = casefile_get_reader (cf);
811   switch (pattern) 
812     {
813     case 0:
814     case 5:
815       for (i = 0; i < case_cnt; i++) 
816         {
817           read_and_verify_random_case (cf, r1, i);
818           read_and_verify_random_case (cf, r2, i);
819         } 
820       break;
821     case 1:
822       for (i = 0; i < case_cnt; i++)
823         read_and_verify_random_case (cf, r1, i);
824       for (i = 0; i < case_cnt; i++) 
825         read_and_verify_random_case (cf, r2, i);
826       break;
827     case 2:
828     case 3:
829     case 4:
830       for (i = j = 0; i < case_cnt; i++) 
831         {
832           read_and_verify_random_case (cf, r1, i);
833           if (gsl_rng_get (rng) % pattern == 0) 
834             read_and_verify_random_case (cf, r2, j++); 
835           if (i == case_cnt / 2)
836             casefile_to_disk (cf);
837         }
838       for (; j < case_cnt; j++) 
839         read_and_verify_random_case (cf, r2, j);
840       break;
841     }
842   if (casereader_read (r1, &c))
843     fail_test ("Casereader 1 not at end of file.");
844   if (casereader_read (r2, &c))
845     fail_test ("Casereader 2 not at end of file.");
846   if (pattern != 1)
847     casereader_destroy (r1);
848   if (pattern != 2)
849     casereader_destroy (r2);
850   if (pattern > 2) 
851     {
852       r1 = casefile_get_destructive_reader (cf);
853       for (i = 0; i < case_cnt; i++) 
854         {
855           struct ccase read_case, expected_case;
856           
857           get_random_case (&expected_case, value_cnt, i);
858           if (!casereader_read_xfer (r1, &read_case)) 
859             fail_test ("Premature end of casefile.");
860           for (j = 0; j < value_cnt; j++) 
861             {
862               double a = case_num (&read_case, j);
863               double b = case_num (&expected_case, j);
864               if (a != b)
865                 fail_test ("Case %lu fails comparison.", (unsigned long) i); 
866             }
867           case_destroy (&expected_case);
868           case_destroy (&read_case);
869         }
870       casereader_destroy (r1);
871     }
872   casefile_destroy (cf);
873   gsl_rng_free (rng);
874 }
875
876 static void
877 get_random_case (struct ccase *c, size_t value_cnt, size_t case_idx) 
878 {
879   int i;
880   case_create (c, value_cnt);
881   for (i = 0; i < value_cnt; i++)
882     case_data_rw (c, i)->f = case_idx % 257 + i;
883 }
884
885 static void
886 write_random_case (struct casefile *cf, size_t case_idx) 
887 {
888   struct ccase c;
889   get_random_case (&c, casefile_get_value_cnt (cf), case_idx);
890   casefile_append_xfer (cf, &c);
891 }
892
893 static void
894 read_and_verify_random_case (struct casefile *cf,
895                              struct casereader *reader, size_t case_idx) 
896 {
897   struct ccase read_case, expected_case;
898   size_t value_cnt;
899   size_t i;
900   
901   value_cnt = casefile_get_value_cnt (cf);
902   get_random_case (&expected_case, value_cnt, case_idx);
903   if (!casereader_read (reader, &read_case)) 
904     fail_test ("Premature end of casefile.");
905   for (i = 0; i < value_cnt; i++) 
906     {
907       double a = case_num (&read_case, i);
908       double b = case_num (&expected_case, i);
909       if (a != b)
910         fail_test ("Case %lu fails comparison.", (unsigned long) case_idx); 
911     }
912   case_destroy (&read_case);
913   case_destroy (&expected_case);
914 }
915
916 static void
917 fail_test (const char *message, ...) 
918 {
919   va_list args;
920
921   va_start (args, message);
922   vprintf (message, args);
923   putchar ('\n');
924   va_end (args);
925   
926   exit (1);
927 }