6d74e8e286ee3a8451b90db6951fb2f7599789b1
[pspp-builds.git] / src / casefile.c
1 /* PSPP - computes sample statistics.
2    Copyright (C) 2004 Free Software Foundation, Inc.
3    Written by Ben Pfaff <blp@gnu.org>.
4
5    This program is free software; you can redistribute it and/or
6    modify it under the terms of the GNU General Public License as
7    published by the Free Software Foundation; either version 2 of the
8    License, or (at your option) any later version.
9
10    This program is distributed in the hope that it will be useful, but
11    WITHOUT ANY WARRANTY; without even the implied warranty of
12    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13    General Public License for more details.
14
15    You should have received a copy of the GNU General Public License
16    along with this program; if not, write to the Free Software
17    Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA
18    02111-1307, USA. */
19
20 #include <config.h>
21 #include "casefile.h"
22 #include <assert.h>
23 #include <errno.h>
24 #include <fcntl.h>
25 #include <stdio.h>
26 #include <stdlib.h>
27 #include <string.h>
28 #include <unistd.h>
29 #include "alloc.h"
30 #include "case.h"
31 #include "error.h"
32 #include "misc.h"
33 #include "settings.h"
34 #include "var.h"
35
36 #ifdef HAVE_VALGRIND_VALGRIND_H
37 #include <valgrind/valgrind.h>
38 #endif
39
40 #define IO_BUF_SIZE (8192 / sizeof (union value))
41
42 /* A casefile is a sequentially accessible array of immutable
43    cases.  It may be stored in memory or on disk as workspace
44    allows.  Cases may be appended to the end of the file.  Cases
45    may be read sequentially starting from the beginning of the
46    file.  Once any cases have been read, no more cases may be
47    appended.  The entire file is discarded at once. */
48
49 /* In-memory cases are arranged in an array of arrays.  The top
50    level is variable size and the size of each bottom level array
51    is fixed at the number of cases defined here.  */
52 #define CASES_PER_BLOCK 128             
53
54 /* A casefile. */
55 struct casefile 
56   {
57     /* Basic data. */
58     struct casefile *next, *prev;       /* Next, prev in global list. */
59     size_t value_cnt;                   /* Case size in `union value's. */
60     size_t case_acct_size;              /* Case size for accounting. */
61     unsigned long case_cnt;             /* Number of cases stored. */
62     enum { MEMORY, DISK } storage;      /* Where cases are stored. */
63     enum { WRITE, READ } mode;          /* Is writing or reading allowed? */
64     struct casereader *readers;         /* List of our readers. */
65     int being_destroyed;                /* Does a destructive reader exist? */
66
67     /* Memory storage. */
68     struct ccase **cases;               /* Pointer to array of cases. */
69
70     /* Disk storage. */
71     int fd;                             /* File descriptor, -1 if none. */
72     char *filename;                     /* Filename. */
73     union value *buffer;                /* I/O buffer, NULL if none. */
74     size_t buffer_used;                 /* Number of values used in buffer. */
75     size_t buffer_size;                 /* Buffer size in values. */
76   };
77
78 /* For reading out the cases in a casefile. */
79 struct casereader 
80   {
81     struct casereader *next, *prev;     /* Next, prev in casefile's list. */
82     struct casefile *cf;                /* Our casefile. */
83     unsigned long case_idx;             /* Case number of current case. */
84     int destructive;                    /* Is this a destructive reader? */
85
86     /* Disk storage. */
87     int fd;                             /* File descriptor. */
88     union value *buffer;                /* I/O buffer. */
89     size_t buffer_pos;                  /* Offset of buffer position. */
90     struct ccase c;                     /* Current case. */
91   };
92
93 /* Doubly linked list of all casefiles. */
94 static struct casefile *casefiles;
95
96 /* Number of bytes of case allocated in in-memory casefiles. */
97 static size_t case_bytes;
98
99 static void register_atexit (void);
100 static void exit_handler (void);
101
102 static void reader_open_file (struct casereader *reader);
103 static void write_case_to_disk (struct casefile *cf, const struct ccase *c);
104 static void flush_buffer (struct casefile *cf);
105 static void fill_buffer (struct casereader *reader);
106
107 static int safe_open (const char *filename, int flags);
108 static int safe_close (int fd);
109 static int full_read (int fd, void *buffer, size_t size);
110 static int full_write (int fd, const void *buffer, size_t size);
111
112 /* Creates and returns a casefile to store cases of VALUE_CNT
113    `union value's each. */
114 struct casefile *
115 casefile_create (size_t value_cnt) 
116 {
117   struct casefile *cf = xmalloc (sizeof *cf);
118   cf->next = casefiles;
119   cf->prev = NULL;
120   if (cf->next != NULL)
121     cf->next->prev = cf;
122   casefiles = cf;
123   cf->value_cnt = value_cnt;
124   cf->case_acct_size = (cf->value_cnt + 4) * sizeof *cf->buffer;
125   cf->case_cnt = 0;
126   cf->storage = MEMORY;
127   cf->mode = WRITE;
128   cf->readers = NULL;
129   cf->being_destroyed = 0;
130   cf->cases = NULL;
131   cf->fd = -1;
132   cf->filename = NULL;
133   cf->buffer = NULL;
134   cf->buffer_size = ROUND_UP (cf->value_cnt, IO_BUF_SIZE);
135   if (cf->value_cnt > 0 && cf->buffer_size % cf->value_cnt > 64)
136     cf->buffer_size = cf->value_cnt;
137   cf->buffer_used = 0;
138   register_atexit ();
139   return cf;
140 }
141
142 /* Destroys casefile CF. */
143 void
144 casefile_destroy (struct casefile *cf) 
145 {
146   if (cf != NULL) 
147     {
148       if (cf->next != NULL)
149         cf->next->prev = cf->prev;
150       if (cf->prev != NULL)
151         cf->prev->next = cf->next;
152       if (casefiles == cf)
153         casefiles = cf->next;
154
155       while (cf->readers != NULL) 
156         casereader_destroy (cf->readers);
157
158       if (cf->cases != NULL) 
159         {
160           size_t idx, block_cnt;
161
162           case_bytes -= cf->case_cnt * cf->case_acct_size;
163           for (idx = 0; idx < cf->case_cnt; idx++)
164             {
165               size_t block_idx = idx / CASES_PER_BLOCK;
166               size_t case_idx = idx % CASES_PER_BLOCK;
167               struct ccase *c = &cf->cases[block_idx][case_idx];
168               case_destroy (c);
169             }
170
171           block_cnt = DIV_RND_UP (cf->case_cnt, CASES_PER_BLOCK);
172           for (idx = 0; idx < block_cnt; idx++)
173             free (cf->cases[idx]);
174
175           free (cf->cases);
176         }
177
178       if (cf->fd != -1)
179         safe_close (cf->fd);
180           
181       if (cf->filename != NULL && remove (cf->filename) == -1) 
182         msg (ME, _("%s: Removing temporary file: %s."),
183              cf->filename, strerror (errno));
184       free (cf->filename);
185
186       free (cf->buffer);
187
188       free (cf);
189     }
190 }
191
192 /* Returns nonzero only if casefile CF is stored in memory (instead of on
193    disk). */
194 int
195 casefile_in_core (const struct casefile *cf) 
196 {
197   assert (cf != NULL);
198
199   return cf->storage == MEMORY;
200 }
201
202 /* Puts a casefile to "sleep", that is, minimizes the resources
203    needed for it by closing its file descriptor and freeing its
204    buffer.  This is useful if we need so many casefiles that we
205    might not have enough memory and file descriptors to go
206    around.
207
208    For simplicity, this implementation always converts the
209    casefile to reader mode.  If this turns out to be a problem,
210    with a little extra work we could also support sleeping
211    writers. */
212 void
213 casefile_sleep (const struct casefile *cf_) 
214 {
215   struct casefile *cf = (struct casefile *) cf_;
216   assert (cf != NULL);
217
218   casefile_mode_reader (cf);
219   casefile_to_disk (cf);
220
221   if (cf->fd != -1) 
222     {
223       safe_close (cf->fd);
224       cf->fd = -1;
225     }
226   if (cf->buffer != NULL) 
227     {
228       free (cf->buffer);
229       cf->buffer = NULL;
230     }
231 }
232
233 /* Returns the number of `union value's in a case for CF. */
234 size_t
235 casefile_get_value_cnt (const struct casefile *cf) 
236 {
237   assert (cf != NULL);
238
239   return cf->value_cnt;
240 }
241
242 /* Returns the number of cases in casefile CF. */
243 unsigned long
244 casefile_get_case_cnt (const struct casefile *cf) 
245 {
246   assert (cf != NULL);
247
248   return cf->case_cnt;
249 }
250
251 /* Appends a copy of case C to casefile CF.  Not valid after any
252    reader for CF has been created. */
253 void
254 casefile_append (struct casefile *cf, const struct ccase *c) 
255 {
256   assert (cf != NULL);
257   assert (c != NULL);
258   assert (cf->mode == WRITE);
259
260   /* Try memory first. */
261   if (cf->storage == MEMORY) 
262     {
263       if (case_bytes < get_max_workspace ())
264         {
265           size_t block_idx = cf->case_cnt / CASES_PER_BLOCK;
266           size_t case_idx = cf->case_cnt % CASES_PER_BLOCK;
267           struct ccase new_case;
268
269           case_bytes += cf->case_acct_size;
270           case_clone (&new_case, c);
271           if (case_idx == 0) 
272             {
273               if ((block_idx & (block_idx - 1)) == 0) 
274                 {
275                   size_t block_cap = block_idx == 0 ? 1 : block_idx * 2;
276                   cf->cases = xrealloc (cf->cases,
277                                         sizeof *cf->cases * block_cap);
278                 }
279
280               cf->cases[block_idx] = xmalloc (sizeof **cf->cases
281                                               * CASES_PER_BLOCK);
282             }
283
284           case_move (&cf->cases[block_idx][case_idx], &new_case);
285         }
286       else
287         {
288           casefile_to_disk (cf);
289           assert (cf->storage == DISK);
290           write_case_to_disk (cf, c);
291         }
292     }
293   else
294     write_case_to_disk (cf, c);
295
296   cf->case_cnt++;
297 }
298
299 /* Appends case C to casefile CF, which takes over ownership of
300    C.  Not valid after any reader for CF has been created. */
301 void
302 casefile_append_xfer (struct casefile *cf, struct ccase *c) 
303 {
304   casefile_append (cf, c);
305   case_destroy (c);
306 }
307
308 /* Writes case C to casefile CF's disk buffer, first flushing the buffer to
309    disk if it would otherwise overflow. */
310 static void
311 write_case_to_disk (struct casefile *cf, const struct ccase *c) 
312 {
313   case_to_values (c, cf->buffer + cf->buffer_used, cf->value_cnt);
314   cf->buffer_used += cf->value_cnt;
315   if (cf->buffer_used + cf->value_cnt > cf->buffer_size)
316     flush_buffer (cf);
317 }
318
319 /* If any bytes in CF's output buffer are used, flush them to
320    disk. */
321 static void
322 flush_buffer (struct casefile *cf) 
323 {
324   if (cf->buffer_used > 0) 
325     {
326       if (!full_write (cf->fd, cf->buffer,
327                        cf->buffer_size * sizeof *cf->buffer)) 
328         msg (FE, _("Error writing temporary file: %s."), strerror (errno));
329
330       cf->buffer_used = 0;
331     } 
332 }
333
334 /* Creates a temporary file and stores its name in *FILENAME and
335    a file descriptor for it in *FD.  Returns success.  Caller is
336    responsible for freeing *FILENAME. */
337 static int
338 make_temp_file (int *fd, char **filename)
339 {
340   const char *parent_dir;
341
342   assert (filename != NULL);
343   assert (fd != NULL);
344
345   if (getenv ("TMPDIR") != NULL)
346     parent_dir = getenv ("TMPDIR");
347   else
348     parent_dir = P_tmpdir;
349
350   *filename = xmalloc (strlen (parent_dir) + 32);
351   sprintf (*filename, "%s%cpsppXXXXXX", parent_dir, DIR_SEPARATOR);
352   *fd = mkstemp (*filename);
353   if (*fd < 0)
354     {
355       msg (FE, _("%s: Creating temporary file: %s."),
356            *filename, strerror (errno));
357       free (*filename);
358       *filename = NULL;
359       return 0;
360     }
361   return 1;
362 }
363
364 /* If CF is currently stored in memory, writes it to disk.  Readers, if any,
365    retain their current positions. */
366 void
367 casefile_to_disk (const struct casefile *cf_) 
368 {
369   struct casefile *cf = (struct casefile *) cf_;
370   struct casereader *reader;
371   
372   assert (cf != NULL);
373
374   if (cf->storage == MEMORY)
375     {
376       size_t idx, block_cnt;
377       
378       assert (cf->filename == NULL);
379       assert (cf->fd == -1);
380       assert (cf->buffer_used == 0);
381
382       cf->storage = DISK;
383       if (!make_temp_file (&cf->fd, &cf->filename))
384         err_failure ();
385       cf->buffer = xmalloc (cf->buffer_size * sizeof *cf->buffer);
386       memset (cf->buffer, 0, cf->buffer_size * sizeof *cf->buffer);
387
388       case_bytes -= cf->case_cnt * cf->case_acct_size;
389       for (idx = 0; idx < cf->case_cnt; idx++)
390         {
391           size_t block_idx = idx / CASES_PER_BLOCK;
392           size_t case_idx = idx % CASES_PER_BLOCK;
393           struct ccase *c = &cf->cases[block_idx][case_idx];
394           write_case_to_disk (cf, c);
395           case_destroy (c);
396         }
397
398       block_cnt = DIV_RND_UP (cf->case_cnt, CASES_PER_BLOCK);
399       for (idx = 0; idx < block_cnt; idx++)
400         free (cf->cases[idx]);
401
402       free (cf->cases);
403       cf->cases = NULL;
404
405       if (cf->mode == READ)
406         flush_buffer (cf);
407
408       for (reader = cf->readers; reader != NULL; reader = reader->next)
409         reader_open_file (reader);
410     }
411 }
412
413 /* Changes CF to reader mode, ensuring that no more cases may be
414    added.  Creating a casereader for CF has the same effect. */
415 void
416 casefile_mode_reader (struct casefile *cf) 
417 {
418   assert (cf != NULL);
419   cf->mode = READ;
420 }
421
422 /* Creates and returns a casereader for CF.  A casereader can be used to
423    sequentially read the cases in a casefile. */
424 struct casereader *
425 casefile_get_reader (const struct casefile *cf_) 
426 {
427   struct casefile *cf = (struct casefile *) cf_;
428   struct casereader *reader;
429
430   assert (cf != NULL);
431   assert (!cf->being_destroyed);
432
433   /* Flush the buffer to disk if it's not empty. */
434   if (cf->mode == WRITE && cf->storage == DISK)
435     flush_buffer (cf);
436   
437   cf->mode = READ;
438
439   reader = xmalloc (sizeof *reader);
440   reader->cf = cf;
441   reader->next = cf->readers;
442   if (cf->readers != NULL)
443     reader->next->prev = reader;
444   reader->prev = NULL;
445   cf->readers = reader;
446   reader->case_idx = 0;
447   reader->fd = -1;
448   reader->buffer = NULL;
449   reader->buffer_pos = 0;
450   case_nullify (&reader->c);
451
452   if (reader->cf->storage == DISK) 
453     reader_open_file (reader);
454
455   return reader;
456 }
457
458 /* Creates and returns a destructive casereader for CF.  Like a
459    normal casereader, a destructive casereader sequentially reads
460    the cases in a casefile.  Unlike a normal casereader, a
461    destructive reader cannot operate concurrently with any other
462    reader.  (This restriction could be relaxed in a few ways, but
463    it is so far unnecessary for other code.) */
464 struct casereader *
465 casefile_get_destructive_reader (struct casefile *cf) 
466 {
467   struct casereader *reader;
468   
469   assert (cf->readers == NULL);
470   reader = casefile_get_reader (cf);
471   reader->destructive = 1;
472   cf->being_destroyed = 1;
473   return reader;
474 }
475
476 /* Opens a disk file for READER and seeks to the current position as indicated
477    by case_idx.  Normally the current position is the beginning of the file,
478    but casefile_to_disk may cause the file to be opened at a different
479    position. */
480 static void
481 reader_open_file (struct casereader *reader) 
482 {
483   struct casefile *cf = reader->cf;
484   off_t file_ofs;
485
486   if (reader->case_idx >= cf->case_cnt)
487     return;
488
489   if (cf->fd != -1) 
490     {
491       reader->fd = cf->fd;
492       cf->fd = -1;
493     }
494   else 
495     {
496       reader->fd = safe_open (cf->filename, O_RDONLY);
497       if (reader->fd < 0)
498         msg (FE, _("%s: Opening temporary file: %s."),
499              cf->filename, strerror (errno));
500     }
501
502   if (cf->buffer != NULL) 
503     {
504       reader->buffer = cf->buffer;
505       cf->buffer = NULL; 
506     }
507   else 
508     {
509       reader->buffer = xmalloc (cf->buffer_size * sizeof *cf->buffer);
510       memset (reader->buffer, 0, cf->buffer_size * sizeof *cf->buffer); 
511     }
512
513   if (cf->value_cnt != 0) 
514     {
515       size_t buffer_case_cnt = cf->buffer_size / cf->value_cnt;
516       file_ofs = ((off_t) reader->case_idx / buffer_case_cnt
517                   * cf->buffer_size * sizeof *cf->buffer);
518       reader->buffer_pos = (reader->case_idx % buffer_case_cnt
519                             * cf->value_cnt);
520     }
521   else 
522     file_ofs = 0;
523   if (lseek (reader->fd, file_ofs, SEEK_SET) != file_ofs)
524     msg (FE, _("%s: Seeking temporary file: %s."),
525          cf->filename, strerror (errno));
526
527   if (cf->case_cnt > 0 && cf->value_cnt > 0)
528     fill_buffer (reader);
529
530   case_create (&reader->c, cf->value_cnt);
531 }
532
533 /* Fills READER's buffer by reading a block from disk. */
534 static void
535 fill_buffer (struct casereader *reader)
536 {
537   int retval = full_read (reader->fd, reader->buffer,
538                           reader->cf->buffer_size * sizeof *reader->buffer);
539   if (retval < 0)
540     msg (FE, _("%s: Reading temporary file: %s."),
541          reader->cf->filename, strerror (errno));
542   else if (retval != reader->cf->buffer_size * sizeof *reader->buffer)
543     msg (FE, _("%s: Temporary file ended unexpectedly."),
544          reader->cf->filename); 
545 }
546
547 /* Returns the casefile that READER reads. */
548 const struct casefile *
549 casereader_get_casefile (const struct casereader *reader) 
550 {
551   assert (reader != NULL);
552   
553   return reader->cf;
554 }
555
556 /* Reads a copy of the next case from READER into C.
557    Caller is responsible for destroying C. */
558 int
559 casereader_read (struct casereader *reader, struct ccase *c) 
560 {
561   assert (reader != NULL);
562   
563   if (reader->case_idx >= reader->cf->case_cnt) 
564     return 0;
565
566   if (reader->cf->storage == MEMORY) 
567     {
568       size_t block_idx = reader->case_idx / CASES_PER_BLOCK;
569       size_t case_idx = reader->case_idx % CASES_PER_BLOCK;
570
571       case_clone (c, &reader->cf->cases[block_idx][case_idx]);
572       reader->case_idx++;
573       return 1;
574     }
575   else 
576     {
577       if (reader->buffer_pos + reader->cf->value_cnt > reader->cf->buffer_size)
578         {
579           fill_buffer (reader);
580           reader->buffer_pos = 0;
581         }
582
583       case_from_values (&reader->c, reader->buffer + reader->buffer_pos,
584                         reader->cf->value_cnt);
585       reader->buffer_pos += reader->cf->value_cnt;
586       reader->case_idx++;
587
588       case_clone (c, &reader->c);
589       return 1;
590     }
591 }
592
593 /* Reads the next case from READER into C and transfers ownership
594    to the caller.  Caller is responsible for destroying C. */
595 int
596 casereader_read_xfer (struct casereader *reader, struct ccase *c)
597 {
598   assert (reader != NULL);
599
600   if (reader->destructive == 0
601       || reader->case_idx >= reader->cf->case_cnt
602       || reader->cf->storage == DISK) 
603     return casereader_read (reader, c);
604   else 
605     {
606       size_t block_idx = reader->case_idx / CASES_PER_BLOCK;
607       size_t case_idx = reader->case_idx % CASES_PER_BLOCK;
608       struct ccase *read_case = &reader->cf->cases[block_idx][case_idx];
609
610       case_move (c, read_case);
611       reader->case_idx++;
612       return 1;
613     }
614 }
615
616 /* Destroys READER. */
617 void
618 casereader_destroy (struct casereader *reader)
619 {
620   assert (reader != NULL);
621
622   if (reader->next != NULL)
623     reader->next->prev = reader->prev;
624   if (reader->prev != NULL)
625     reader->prev->next = reader->next;
626   if (reader->cf->readers == reader)
627     reader->cf->readers = reader->next;
628
629   if (reader->cf->buffer == NULL)
630     reader->cf->buffer = reader->buffer;
631   else
632     free (reader->buffer);
633
634   if (reader->fd != -1) 
635     {
636       if (reader->cf->fd == -1)
637         reader->cf->fd = reader->fd;
638       else
639         safe_close (reader->fd);
640     }
641   
642   case_destroy (&reader->c);
643
644   free (reader);
645 }
646
647 /* Calls open(), passing FILENAME and FLAGS, repeating as necessary
648    to deal with interrupted calls. */
649 static int
650 safe_open (const char *filename, int flags) 
651 {
652   int fd;
653
654   do 
655     {
656       fd = open (filename, flags);
657     }
658   while (fd == -1 && errno == EINTR);
659
660   return fd;
661 }
662
663 /* Calls close(), passing FD, repeating as necessary to deal with
664    interrupted calls. */
665 static int safe_close (int fd) 
666 {
667   int retval;
668
669   do 
670     {
671       retval = close (fd);
672     }
673   while (retval == -1 && errno == EINTR);
674
675   return retval;
676 }
677
678 /* Calls read(), passing FD, BUFFER, and SIZE, repeating as
679    necessary to deal with interrupted calls. */
680 static int
681 full_read (int fd, void *buffer_, size_t size) 
682 {
683   char *buffer = buffer_;
684   size_t bytes_read = 0;
685   
686   while (bytes_read < size)
687     {
688       int retval = read (fd, buffer + bytes_read, size - bytes_read);
689       if (retval > 0) 
690         bytes_read += retval; 
691       else if (retval == 0) 
692         return bytes_read;
693       else if (errno != EINTR)
694         return -1;
695     }
696
697   return bytes_read;
698 }
699
700 /* Calls write(), passing FD, BUFFER, and SIZE, repeating as
701    necessary to deal with interrupted calls. */
702 static int
703 full_write (int fd, const void *buffer_, size_t size) 
704 {
705   const char *buffer = buffer_;
706   size_t bytes_written = 0;
707   
708   while (bytes_written < size)
709     {
710       int retval = write (fd, buffer + bytes_written, size - bytes_written);
711       if (retval >= 0) 
712         bytes_written += retval; 
713       else if (errno != EINTR)
714         return -1;
715     }
716
717   return bytes_written;
718 }
719
720 /* Registers our exit handler with atexit() if it has not already
721    been registered. */
722 static void
723 register_atexit (void) 
724 {
725   static int registered = 0;
726   if (!registered) 
727     {
728       registered = 1;
729       atexit (exit_handler);
730     }
731 }
732
733 /* atexit() handler that closes and deletes our temporary
734    files. */
735 static void
736 exit_handler (void) 
737 {
738   while (casefiles != NULL)
739     casefile_destroy (casefiles);
740 }
741 \f
742 #include <gsl/gsl_rng.h>
743 #include <stdarg.h>
744 #include "command.h"
745 #include "lexer.h"
746
747 static void test_casefile (int pattern, size_t value_cnt, size_t case_cnt);
748 static void get_random_case (struct ccase *, size_t value_cnt,
749                              size_t case_idx);
750 static void write_random_case (struct casefile *cf, size_t case_idx);
751 static void read_and_verify_random_case (struct casefile *cf,
752                                          struct casereader *reader,
753                                          size_t case_idx);
754 static void fail_test (const char *message, ...);
755
756 int
757 cmd_debug_casefile (void) 
758 {
759   static const size_t sizes[] =
760     {
761       1, 2, 3, 4, 5, 6, 7, 14, 15, 16, 17, 31, 55, 73,
762       100, 137, 257, 521, 1031, 2053
763     };
764   int size_max;
765   int case_max;
766   int pattern;
767
768   size_max = sizeof sizes / sizeof *sizes;
769   if (lex_match_id ("SMALL")) 
770     {
771       size_max -= 4;
772       case_max = 511; 
773     }
774   else
775     case_max = 4095;
776   if (token != '.')
777     return lex_end_of_command ();
778     
779   for (pattern = 0; pattern < 5; pattern++) 
780     {
781       const size_t *size;
782
783       for (size = sizes; size < sizes + size_max; size++) 
784         {
785           size_t case_cnt;
786
787           for (case_cnt = 0; case_cnt <= case_max;
788                case_cnt = (case_cnt * 2) + 1)
789             test_casefile (pattern, *size, case_cnt);
790         }
791     }
792   printf ("Casefile tests succeeded.\n");
793   return CMD_SUCCESS;
794 }
795
796 static void
797 test_casefile (int pattern, size_t value_cnt, size_t case_cnt) 
798 {
799   int zero = 0;
800   struct casefile *cf;
801   struct casereader *r1, *r2;
802   struct ccase c;
803   gsl_rng *rng;
804   size_t i, j;
805
806   rng = gsl_rng_alloc (gsl_rng_mt19937);
807   cf = casefile_create (value_cnt);
808   for (i = 0; i < case_cnt; i++)
809     write_random_case (cf, i);
810   r1 = casefile_get_reader (cf);
811   r2 = casefile_get_reader (cf);
812   switch (pattern) 
813     {
814     case 0:
815       for (i = 0; i < case_cnt; i++) 
816         {
817           read_and_verify_random_case (cf, r1, i);
818           read_and_verify_random_case (cf, r2, i);
819         } 
820       break;
821     case 1:
822       for (i = 0; i < case_cnt; i++)
823         read_and_verify_random_case (cf, r1, i);
824       for (i = 0; i < case_cnt; i++) 
825         read_and_verify_random_case (cf, r2, i);
826       break;
827     case 2:
828     case 3:
829     case 4:
830       for (i = j = 0; i < case_cnt; i++) 
831         {
832           read_and_verify_random_case (cf, r1, i);
833           if (gsl_rng_get (rng) % pattern == 0) 
834             read_and_verify_random_case (cf, r2, j++); 
835           if (i == case_cnt / 2)
836             casefile_to_disk (cf);
837         }
838       for (; j < case_cnt; j++) 
839         read_and_verify_random_case (cf, r2, j);
840       break;
841     }
842   if (casereader_read (r1, &c))
843     fail_test ("Casereader 1 not at end of file.");
844   if (casereader_read (r2, &c))
845     fail_test ("Casereader 2 not at end of file.");
846   if (pattern != 1)
847     casereader_destroy (r1);
848   if (pattern != 2)
849     casereader_destroy (r2);
850   if (pattern > 2) 
851     {
852       r1 = casefile_get_destructive_reader (cf);
853       for (i = 0; i < case_cnt; i++) 
854         {
855           struct ccase read_case, expected_case;
856           
857           get_random_case (&expected_case, value_cnt, i);
858           if (!casereader_read_xfer (r1, &read_case)) 
859             fail_test ("Premature end of casefile.");
860           for (j = 0; j < value_cnt; j++) 
861             {
862               double a = case_num (&read_case, j);
863               double b = case_num (&expected_case, j);
864               if (a != b)
865                 fail_test ("Case %lu fails comparison.", (unsigned long) i); 
866             }
867           case_destroy (&expected_case);
868           case_destroy (&read_case);
869         }
870       casereader_destroy (r1);
871     }
872   casefile_destroy (cf);
873   gsl_rng_free (rng);
874 }
875
876 static void
877 get_random_case (struct ccase *c, size_t value_cnt, size_t case_idx) 
878 {
879   int i;
880   case_create (c, value_cnt);
881   for (i = 0; i < value_cnt; i++)
882     case_data_rw (c, i)->f = case_idx % 257 + i;
883 }
884
885 static void
886 write_random_case (struct casefile *cf, size_t case_idx) 
887 {
888   struct ccase c;
889   get_random_case (&c, casefile_get_value_cnt (cf), case_idx);
890   casefile_append_xfer (cf, &c);
891 }
892
893 static void
894 read_and_verify_random_case (struct casefile *cf,
895                              struct casereader *reader, size_t case_idx) 
896 {
897   struct ccase read_case, expected_case;
898   size_t value_cnt;
899   size_t i;
900   
901   value_cnt = casefile_get_value_cnt (cf);
902   get_random_case (&expected_case, value_cnt, case_idx);
903   if (!casereader_read (reader, &read_case)) 
904     fail_test ("Premature end of casefile.");
905   for (i = 0; i < value_cnt; i++) 
906     {
907       double a = case_num (&read_case, i);
908       double b = case_num (&expected_case, i);
909       if (a != b)
910         fail_test ("Case %lu fails comparison.", (unsigned long) case_idx); 
911     }
912   case_destroy (&read_case);
913   case_destroy (&expected_case);
914 }
915
916 static void
917 fail_test (const char *message, ...) 
918 {
919   va_list args;
920
921   va_start (args, message);
922   vprintf (message, args);
923   putchar ('\n');
924   va_end (args);
925   
926   exit (1);
927 }