Added CONFIGUREFLAGS option to Smake.
[pspp-builds.git] / src / casefile.c
1 /* PSPP - computes sample statistics.
2    Copyright (C) 2004 Free Software Foundation, Inc.
3    Written by Ben Pfaff <blp@gnu.org>.
4
5    This program is free software; you can redistribute it and/or
6    modify it under the terms of the GNU General Public License as
7    published by the Free Software Foundation; either version 2 of the
8    License, or (at your option) any later version.
9
10    This program is distributed in the hope that it will be useful, but
11    WITHOUT ANY WARRANTY; without even the implied warranty of
12    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13    General Public License for more details.
14
15    You should have received a copy of the GNU General Public License
16    along with this program; if not, write to the Free Software
17    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
18    02110-1301, USA. */
19
20 #include <config.h>
21 #include "casefile.h"
22 #include <assert.h>
23 #include <errno.h>
24 #include <fcntl.h>
25 #include <stdio.h>
26 #include <stdlib.h>
27 #include <string.h>
28 #include <unistd.h>
29 #include "alloc.h"
30 #include "case.h"
31 #include "error.h"
32 #include "misc.h"
33 #include "mkfile.h"
34 #include "settings.h"
35 #include "var.h"
36
37 #define IO_BUF_SIZE (8192 / sizeof (union value))
38
39 /* A casefile is a sequentially accessible array of immutable
40    cases.  It may be stored in memory or on disk as workspace
41    allows.  Cases may be appended to the end of the file.  Cases
42    may be read sequentially starting from the beginning of the
43    file.  Once any cases have been read, no more cases may be
44    appended.  The entire file is discarded at once. */
45
46 /* In-memory cases are arranged in an array of arrays.  The top
47    level is variable size and the size of each bottom level array
48    is fixed at the number of cases defined here.  */
49 #define CASES_PER_BLOCK 128             
50
51 /* A casefile. */
52 struct casefile 
53   {
54     /* Basic data. */
55     struct casefile *next, *prev;       /* Next, prev in global list. */
56     size_t value_cnt;                   /* Case size in `union value's. */
57     size_t case_acct_size;              /* Case size for accounting. */
58     unsigned long case_cnt;             /* Number of cases stored. */
59     enum { MEMORY, DISK } storage;      /* Where cases are stored. */
60     enum { WRITE, READ } mode;          /* Is writing or reading allowed? */
61     struct casereader *readers;         /* List of our readers. */
62     int being_destroyed;                /* Does a destructive reader exist? */
63
64     /* Memory storage. */
65     struct ccase **cases;               /* Pointer to array of cases. */
66
67     /* Disk storage. */
68     int fd;                             /* File descriptor, -1 if none. */
69     char *filename;                     /* Filename. */
70     union value *buffer;                /* I/O buffer, NULL if none. */
71     size_t buffer_used;                 /* Number of values used in buffer. */
72     size_t buffer_size;                 /* Buffer size in values. */
73   };
74
75 /* For reading out the cases in a casefile. */
76 struct casereader 
77   {
78     struct casereader *next, *prev;     /* Next, prev in casefile's list. */
79     struct casefile *cf;                /* Our casefile. */
80     unsigned long case_idx;             /* Case number of current case. */
81     int destructive;                    /* Is this a destructive reader? */
82
83     /* Disk storage. */
84     int fd;                             /* File descriptor. */
85     union value *buffer;                /* I/O buffer. */
86     size_t buffer_pos;                  /* Offset of buffer position. */
87     struct ccase c;                     /* Current case. */
88   };
89
90 /* Return the case number of the current case */
91 unsigned long
92 casereader_cnum(const struct casereader *r)
93 {
94   return r->case_idx;
95 }
96
97 /* Doubly linked list of all casefiles. */
98 static struct casefile *casefiles;
99
100 /* Number of bytes of case allocated in in-memory casefiles. */
101 static size_t case_bytes;
102
103 static void register_atexit (void);
104 static void exit_handler (void);
105
106 static void reader_open_file (struct casereader *reader);
107 static void write_case_to_disk (struct casefile *cf, const struct ccase *c);
108 static void flush_buffer (struct casefile *cf);
109 static void fill_buffer (struct casereader *reader);
110
111 static int safe_open (const char *filename, int flags);
112 static int safe_close (int fd);
113 static int full_read (int fd, void *buffer, size_t size);
114 static int full_write (int fd, const void *buffer, size_t size);
115
116 /* Creates and returns a casefile to store cases of VALUE_CNT
117    `union value's each. */
118 struct casefile *
119 casefile_create (size_t value_cnt) 
120 {
121   struct casefile *cf = xmalloc (sizeof *cf);
122   cf->next = casefiles;
123   cf->prev = NULL;
124   if (cf->next != NULL)
125     cf->next->prev = cf;
126   casefiles = cf;
127   cf->value_cnt = value_cnt;
128   cf->case_acct_size = (cf->value_cnt + 4) * sizeof *cf->buffer;
129   cf->case_cnt = 0;
130   cf->storage = MEMORY;
131   cf->mode = WRITE;
132   cf->readers = NULL;
133   cf->being_destroyed = 0;
134   cf->cases = NULL;
135   cf->fd = -1;
136   cf->filename = NULL;
137   cf->buffer = NULL;
138   cf->buffer_size = ROUND_UP (cf->value_cnt, IO_BUF_SIZE);
139   if (cf->value_cnt > 0 && cf->buffer_size % cf->value_cnt > 64)
140     cf->buffer_size = cf->value_cnt;
141   cf->buffer_used = 0;
142   register_atexit ();
143   return cf;
144 }
145
146 /* Destroys casefile CF. */
147 void
148 casefile_destroy (struct casefile *cf) 
149 {
150   if (cf != NULL) 
151     {
152       if (cf->next != NULL)
153         cf->next->prev = cf->prev;
154       if (cf->prev != NULL)
155         cf->prev->next = cf->next;
156       if (casefiles == cf)
157         casefiles = cf->next;
158
159       while (cf->readers != NULL) 
160         casereader_destroy (cf->readers);
161
162       if (cf->cases != NULL) 
163         {
164           size_t idx, block_cnt;
165
166           case_bytes -= cf->case_cnt * cf->case_acct_size;
167           for (idx = 0; idx < cf->case_cnt; idx++)
168             {
169               size_t block_idx = idx / CASES_PER_BLOCK;
170               size_t case_idx = idx % CASES_PER_BLOCK;
171               struct ccase *c = &cf->cases[block_idx][case_idx];
172               case_destroy (c);
173             }
174
175           block_cnt = DIV_RND_UP (cf->case_cnt, CASES_PER_BLOCK);
176           for (idx = 0; idx < block_cnt; idx++)
177             free (cf->cases[idx]);
178
179           free (cf->cases);
180         }
181
182       if (cf->fd != -1)
183         safe_close (cf->fd);
184           
185       if (cf->filename != NULL && remove (cf->filename) == -1) 
186         msg (ME, _("%s: Removing temporary file: %s."),
187              cf->filename, strerror (errno));
188       free (cf->filename);
189
190       free (cf->buffer);
191
192       free (cf);
193     }
194 }
195
196 /* Returns nonzero only if casefile CF is stored in memory (instead of on
197    disk). */
198 int
199 casefile_in_core (const struct casefile *cf) 
200 {
201   assert (cf != NULL);
202
203   return cf->storage == MEMORY;
204 }
205
206 /* Puts a casefile to "sleep", that is, minimizes the resources
207    needed for it by closing its file descriptor and freeing its
208    buffer.  This is useful if we need so many casefiles that we
209    might not have enough memory and file descriptors to go
210    around.
211
212    For simplicity, this implementation always converts the
213    casefile to reader mode.  If this turns out to be a problem,
214    with a little extra work we could also support sleeping
215    writers. */
216 void
217 casefile_sleep (const struct casefile *cf_) 
218 {
219   struct casefile *cf = (struct casefile *) cf_;
220   assert (cf != NULL);
221
222   casefile_mode_reader (cf);
223   casefile_to_disk (cf);
224   flush_buffer (cf);
225
226   if (cf->fd != -1) 
227     {
228       safe_close (cf->fd);
229       cf->fd = -1;
230     }
231   if (cf->buffer != NULL) 
232     {
233       free (cf->buffer);
234       cf->buffer = NULL;
235     }
236 }
237
238 /* Returns the number of `union value's in a case for CF. */
239 size_t
240 casefile_get_value_cnt (const struct casefile *cf) 
241 {
242   assert (cf != NULL);
243
244   return cf->value_cnt;
245 }
246
247 /* Returns the number of cases in casefile CF. */
248 unsigned long
249 casefile_get_case_cnt (const struct casefile *cf) 
250 {
251   assert (cf != NULL);
252
253   return cf->case_cnt;
254 }
255
256 /* Appends a copy of case C to casefile CF.  Not valid after any
257    reader for CF has been created. */
258 void
259 casefile_append (struct casefile *cf, const struct ccase *c) 
260 {
261   assert (cf != NULL);
262   assert (c != NULL);
263   assert (cf->mode == WRITE);
264
265   /* Try memory first. */
266   if (cf->storage == MEMORY) 
267     {
268       if (case_bytes < get_max_workspace ())
269         {
270           size_t block_idx = cf->case_cnt / CASES_PER_BLOCK;
271           size_t case_idx = cf->case_cnt % CASES_PER_BLOCK;
272           struct ccase new_case;
273
274           case_bytes += cf->case_acct_size;
275           case_clone (&new_case, c);
276           if (case_idx == 0) 
277             {
278               if ((block_idx & (block_idx - 1)) == 0) 
279                 {
280                   size_t block_cap = block_idx == 0 ? 1 : block_idx * 2;
281                   cf->cases = xrealloc (cf->cases,
282                                         sizeof *cf->cases * block_cap);
283                 }
284
285               cf->cases[block_idx] = xmalloc (sizeof **cf->cases
286                                               * CASES_PER_BLOCK);
287             }
288
289           case_move (&cf->cases[block_idx][case_idx], &new_case);
290         }
291       else
292         {
293           casefile_to_disk (cf);
294           assert (cf->storage == DISK);
295           write_case_to_disk (cf, c);
296         }
297     }
298   else
299     write_case_to_disk (cf, c);
300
301   cf->case_cnt++;
302 }
303
304 /* Appends case C to casefile CF, which takes over ownership of
305    C.  Not valid after any reader for CF has been created. */
306 void
307 casefile_append_xfer (struct casefile *cf, struct ccase *c) 
308 {
309   casefile_append (cf, c);
310   case_destroy (c);
311 }
312
313 /* Writes case C to casefile CF's disk buffer, first flushing the buffer to
314    disk if it would otherwise overflow. */
315 static void
316 write_case_to_disk (struct casefile *cf, const struct ccase *c) 
317 {
318   case_to_values (c, cf->buffer + cf->buffer_used, cf->value_cnt);
319   cf->buffer_used += cf->value_cnt;
320   if (cf->buffer_used + cf->value_cnt > cf->buffer_size)
321     flush_buffer (cf);
322 }
323
324 /* If any bytes in CF's output buffer are used, flush them to
325    disk. */
326 static void
327 flush_buffer (struct casefile *cf) 
328 {
329   if (cf->buffer_used > 0) 
330     {
331       if (!full_write (cf->fd, cf->buffer,
332                        cf->buffer_size * sizeof *cf->buffer)) 
333         msg (FE, _("Error writing temporary file: %s."), strerror (errno));
334
335       cf->buffer_used = 0;
336     } 
337 }
338
339
340 /* If CF is currently stored in memory, writes it to disk.  Readers, if any,
341    retain their current positions. */
342 void
343 casefile_to_disk (const struct casefile *cf_) 
344 {
345   struct casefile *cf = (struct casefile *) cf_;
346   struct casereader *reader;
347   
348   assert (cf != NULL);
349
350   if (cf->storage == MEMORY)
351     {
352       size_t idx, block_cnt;
353       
354       assert (cf->filename == NULL);
355       assert (cf->fd == -1);
356       assert (cf->buffer_used == 0);
357
358       cf->storage = DISK;
359       if (!make_temp_file (&cf->fd, &cf->filename))
360         err_failure ();
361       cf->buffer = xmalloc (cf->buffer_size * sizeof *cf->buffer);
362       memset (cf->buffer, 0, cf->buffer_size * sizeof *cf->buffer);
363
364       case_bytes -= cf->case_cnt * cf->case_acct_size;
365       for (idx = 0; idx < cf->case_cnt; idx++)
366         {
367           size_t block_idx = idx / CASES_PER_BLOCK;
368           size_t case_idx = idx % CASES_PER_BLOCK;
369           struct ccase *c = &cf->cases[block_idx][case_idx];
370           write_case_to_disk (cf, c);
371           case_destroy (c);
372         }
373
374       block_cnt = DIV_RND_UP (cf->case_cnt, CASES_PER_BLOCK);
375       for (idx = 0; idx < block_cnt; idx++)
376         free (cf->cases[idx]);
377
378       free (cf->cases);
379       cf->cases = NULL;
380
381       if (cf->mode == READ)
382         flush_buffer (cf);
383
384       for (reader = cf->readers; reader != NULL; reader = reader->next)
385         reader_open_file (reader);
386     }
387 }
388
389 /* Changes CF to reader mode, ensuring that no more cases may be
390    added.  Creating a casereader for CF has the same effect. */
391 void
392 casefile_mode_reader (struct casefile *cf) 
393 {
394   assert (cf != NULL);
395   cf->mode = READ;
396 }
397
398 /* Creates and returns a casereader for CF.  A casereader can be used to
399    sequentially read the cases in a casefile. */
400 struct casereader *
401 casefile_get_reader (const struct casefile *cf_) 
402 {
403   struct casefile *cf = (struct casefile *) cf_;
404   struct casereader *reader;
405
406   assert (cf != NULL);
407   assert (!cf->being_destroyed);
408
409   /* Flush the buffer to disk if it's not empty. */
410   if (cf->mode == WRITE && cf->storage == DISK)
411     flush_buffer (cf);
412   
413   cf->mode = READ;
414
415   reader = xmalloc (sizeof *reader);
416   reader->next = cf->readers;
417   if (cf->readers != NULL)
418     reader->next->prev = reader;
419   cf->readers = reader;
420   reader->prev = NULL;
421   reader->cf = cf;
422   reader->case_idx = 0;
423   reader->destructive = 0;
424   reader->fd = -1;
425   reader->buffer = NULL;
426   reader->buffer_pos = 0;
427   case_nullify (&reader->c);
428
429   if (reader->cf->storage == DISK) 
430     reader_open_file (reader);
431
432   return reader;
433 }
434
435 /* Creates and returns a destructive casereader for CF.  Like a
436    normal casereader, a destructive casereader sequentially reads
437    the cases in a casefile.  Unlike a normal casereader, a
438    destructive reader cannot operate concurrently with any other
439    reader.  (This restriction could be relaxed in a few ways, but
440    it is so far unnecessary for other code.) */
441 struct casereader *
442 casefile_get_destructive_reader (struct casefile *cf) 
443 {
444   struct casereader *reader;
445   
446   assert (cf->readers == NULL);
447   reader = casefile_get_reader (cf);
448   reader->destructive = 1;
449   cf->being_destroyed = 1;
450   return reader;
451 }
452
453 /* Opens a disk file for READER and seeks to the current position as indicated
454    by case_idx.  Normally the current position is the beginning of the file,
455    but casefile_to_disk may cause the file to be opened at a different
456    position. */
457 static void
458 reader_open_file (struct casereader *reader) 
459 {
460   struct casefile *cf = reader->cf;
461   off_t file_ofs;
462
463   if (reader->case_idx >= cf->case_cnt)
464     return;
465
466   if (cf->fd != -1) 
467     {
468       reader->fd = cf->fd;
469       cf->fd = -1;
470     }
471   else 
472     {
473       reader->fd = safe_open (cf->filename, O_RDONLY);
474       if (reader->fd < 0)
475         msg (FE, _("%s: Opening temporary file: %s."),
476              cf->filename, strerror (errno));
477     }
478
479   if (cf->buffer != NULL) 
480     {
481       reader->buffer = cf->buffer;
482       cf->buffer = NULL; 
483     }
484   else 
485     {
486       reader->buffer = xmalloc (cf->buffer_size * sizeof *cf->buffer);
487       memset (reader->buffer, 0, cf->buffer_size * sizeof *cf->buffer); 
488     }
489
490   if (cf->value_cnt != 0) 
491     {
492       size_t buffer_case_cnt = cf->buffer_size / cf->value_cnt;
493       file_ofs = ((off_t) reader->case_idx / buffer_case_cnt
494                   * cf->buffer_size * sizeof *cf->buffer);
495       reader->buffer_pos = (reader->case_idx % buffer_case_cnt
496                             * cf->value_cnt);
497     }
498   else 
499     file_ofs = 0;
500   if (lseek (reader->fd, file_ofs, SEEK_SET) != file_ofs)
501     msg (FE, _("%s: Seeking temporary file: %s."),
502          cf->filename, strerror (errno));
503
504   if (cf->case_cnt > 0 && cf->value_cnt > 0)
505     fill_buffer (reader);
506
507   case_create (&reader->c, cf->value_cnt);
508 }
509
510 /* Fills READER's buffer by reading a block from disk. */
511 static void
512 fill_buffer (struct casereader *reader)
513 {
514   int retval = full_read (reader->fd, reader->buffer,
515                           reader->cf->buffer_size * sizeof *reader->buffer);
516   if (retval < 0)
517     msg (FE, _("%s: Reading temporary file: %s."),
518          reader->cf->filename, strerror (errno));
519   else if (retval != reader->cf->buffer_size * sizeof *reader->buffer)
520     msg (FE, _("%s: Temporary file ended unexpectedly."),
521          reader->cf->filename); 
522 }
523
524 /* Returns the casefile that READER reads. */
525 const struct casefile *
526 casereader_get_casefile (const struct casereader *reader) 
527 {
528   assert (reader != NULL);
529   
530   return reader->cf;
531 }
532
533 /* Reads a copy of the next case from READER into C.
534    Caller is responsible for destroying C.
535    Returns true if successful, false at end of file. */
536 int
537 casereader_read (struct casereader *reader, struct ccase *c) 
538 {
539   assert (reader != NULL);
540   
541   if (reader->case_idx >= reader->cf->case_cnt) 
542     return 0;
543
544   if (reader->cf->storage == MEMORY) 
545     {
546       size_t block_idx = reader->case_idx / CASES_PER_BLOCK;
547       size_t case_idx = reader->case_idx % CASES_PER_BLOCK;
548
549       case_clone (c, &reader->cf->cases[block_idx][case_idx]);
550       reader->case_idx++;
551       return 1;
552     }
553   else 
554     {
555       if (reader->buffer_pos + reader->cf->value_cnt > reader->cf->buffer_size)
556         {
557           fill_buffer (reader);
558           reader->buffer_pos = 0;
559         }
560
561       case_from_values (&reader->c, reader->buffer + reader->buffer_pos,
562                         reader->cf->value_cnt);
563       reader->buffer_pos += reader->cf->value_cnt;
564       reader->case_idx++;
565
566       case_clone (c, &reader->c);
567       return 1;
568     }
569 }
570
571 /* Reads the next case from READER into C and transfers ownership
572    to the caller.  Caller is responsible for destroying C.
573    Returns true if successful, false at end of file. */
574 int
575 casereader_read_xfer (struct casereader *reader, struct ccase *c)
576 {
577   assert (reader != NULL);
578
579   if (reader->destructive == 0
580       || reader->case_idx >= reader->cf->case_cnt
581       || reader->cf->storage == DISK) 
582     return casereader_read (reader, c);
583   else 
584     {
585       size_t block_idx = reader->case_idx / CASES_PER_BLOCK;
586       size_t case_idx = reader->case_idx % CASES_PER_BLOCK;
587       struct ccase *read_case = &reader->cf->cases[block_idx][case_idx];
588
589       case_move (c, read_case);
590       reader->case_idx++;
591       return 1;
592     }
593 }
594
595 /* Reads the next case from READER into C and transfers ownership
596    to the caller.  Caller is responsible for destroying C.
597    Assert-fails at end of file. */
598 void
599 casereader_read_xfer_assert (struct casereader *reader, struct ccase *c) 
600 {
601   bool success = casereader_read_xfer (reader, c);
602   assert (success);
603 }
604
605 /* Destroys READER. */
606 void
607 casereader_destroy (struct casereader *reader)
608 {
609   assert (reader != NULL);
610
611   if (reader->next != NULL)
612     reader->next->prev = reader->prev;
613   if (reader->prev != NULL)
614     reader->prev->next = reader->next;
615   if (reader->cf->readers == reader)
616     reader->cf->readers = reader->next;
617
618   if (reader->cf->buffer == NULL)
619     reader->cf->buffer = reader->buffer;
620   else
621     free (reader->buffer);
622
623   if (reader->fd != -1) 
624     {
625       if (reader->cf->fd == -1)
626         reader->cf->fd = reader->fd;
627       else
628         safe_close (reader->fd);
629     }
630   
631   case_destroy (&reader->c);
632
633   free (reader);
634 }
635
636 /* Calls open(), passing FILENAME and FLAGS, repeating as necessary
637    to deal with interrupted calls. */
638 static int
639 safe_open (const char *filename, int flags) 
640 {
641   int fd;
642
643   do 
644     {
645       fd = open (filename, flags);
646     }
647   while (fd == -1 && errno == EINTR);
648
649   return fd;
650 }
651
652 /* Calls close(), passing FD, repeating as necessary to deal with
653    interrupted calls. */
654 static int safe_close (int fd) 
655 {
656   int retval;
657
658   do 
659     {
660       retval = close (fd);
661     }
662   while (retval == -1 && errno == EINTR);
663
664   return retval;
665 }
666
667 /* Calls read(), passing FD, BUFFER, and SIZE, repeating as
668    necessary to deal with interrupted calls. */
669 static int
670 full_read (int fd, void *buffer_, size_t size) 
671 {
672   char *buffer = buffer_;
673   size_t bytes_read = 0;
674   
675   while (bytes_read < size)
676     {
677       int retval = read (fd, buffer + bytes_read, size - bytes_read);
678       if (retval > 0) 
679         bytes_read += retval; 
680       else if (retval == 0) 
681         return bytes_read;
682       else if (errno != EINTR)
683         return -1;
684     }
685
686   return bytes_read;
687 }
688
689 /* Calls write(), passing FD, BUFFER, and SIZE, repeating as
690    necessary to deal with interrupted calls. */
691 static int
692 full_write (int fd, const void *buffer_, size_t size) 
693 {
694   const char *buffer = buffer_;
695   size_t bytes_written = 0;
696   
697   while (bytes_written < size)
698     {
699       int retval = write (fd, buffer + bytes_written, size - bytes_written);
700       if (retval >= 0) 
701         bytes_written += retval; 
702       else if (errno != EINTR)
703         return -1;
704     }
705
706   return bytes_written;
707 }
708
709
710 /* Registers our exit handler with atexit() if it has not already
711    been registered. */
712 static void
713 register_atexit (void) 
714 {
715   static int registered = 0;
716   if (!registered) 
717     {
718       registered = 1;
719       atexit (exit_handler);
720     }
721 }
722
723
724
725 /* atexit() handler that closes and deletes our temporary
726    files. */
727 static void
728 exit_handler (void) 
729 {
730   while (casefiles != NULL)
731     casefile_destroy (casefiles);
732 }
733 \f
734 #include <gsl/gsl_rng.h>
735 #include <stdarg.h>
736 #include "command.h"
737 #include "lexer.h"
738
739 static void test_casefile (int pattern, size_t value_cnt, size_t case_cnt);
740 static void get_random_case (struct ccase *, size_t value_cnt,
741                              size_t case_idx);
742 static void write_random_case (struct casefile *cf, size_t case_idx);
743 static void read_and_verify_random_case (struct casefile *cf,
744                                          struct casereader *reader,
745                                          size_t case_idx);
746 static void fail_test (const char *message, ...);
747
748 int
749 cmd_debug_casefile (void) 
750 {
751   static const size_t sizes[] =
752     {
753       1, 2, 3, 4, 5, 6, 7, 14, 15, 16, 17, 31, 55, 73,
754       100, 137, 257, 521, 1031, 2053
755     };
756   int size_max;
757   int case_max;
758   int pattern;
759
760   size_max = sizeof sizes / sizeof *sizes;
761   if (lex_match_id ("SMALL")) 
762     {
763       size_max -= 4;
764       case_max = 511; 
765     }
766   else
767     case_max = 4095;
768   if (token != '.')
769     return lex_end_of_command ();
770     
771   for (pattern = 0; pattern < 6; pattern++) 
772     {
773       const size_t *size;
774
775       for (size = sizes; size < sizes + size_max; size++) 
776         {
777           size_t case_cnt;
778
779           for (case_cnt = 0; case_cnt <= case_max;
780                case_cnt = (case_cnt * 2) + 1)
781             test_casefile (pattern, *size, case_cnt);
782         }
783     }
784   printf ("Casefile tests succeeded.\n");
785   return CMD_SUCCESS;
786 }
787
788 static void
789 test_casefile (int pattern, size_t value_cnt, size_t case_cnt) 
790 {
791   struct casefile *cf;
792   struct casereader *r1, *r2;
793   struct ccase c;
794   gsl_rng *rng;
795   size_t i, j;
796
797   rng = gsl_rng_alloc (gsl_rng_mt19937);
798   cf = casefile_create (value_cnt);
799   if (pattern == 5)
800     casefile_to_disk (cf);
801   for (i = 0; i < case_cnt; i++)
802     write_random_case (cf, i);
803   if (pattern == 5)
804     casefile_sleep (cf);
805   r1 = casefile_get_reader (cf);
806   r2 = casefile_get_reader (cf);
807   switch (pattern) 
808     {
809     case 0:
810     case 5:
811       for (i = 0; i < case_cnt; i++) 
812         {
813           read_and_verify_random_case (cf, r1, i);
814           read_and_verify_random_case (cf, r2, i);
815         } 
816       break;
817     case 1:
818       for (i = 0; i < case_cnt; i++)
819         read_and_verify_random_case (cf, r1, i);
820       for (i = 0; i < case_cnt; i++) 
821         read_and_verify_random_case (cf, r2, i);
822       break;
823     case 2:
824     case 3:
825     case 4:
826       for (i = j = 0; i < case_cnt; i++) 
827         {
828           read_and_verify_random_case (cf, r1, i);
829           if (gsl_rng_get (rng) % pattern == 0) 
830             read_and_verify_random_case (cf, r2, j++); 
831           if (i == case_cnt / 2)
832             casefile_to_disk (cf);
833         }
834       for (; j < case_cnt; j++) 
835         read_and_verify_random_case (cf, r2, j);
836       break;
837     }
838   if (casereader_read (r1, &c))
839     fail_test ("Casereader 1 not at end of file.");
840   if (casereader_read (r2, &c))
841     fail_test ("Casereader 2 not at end of file.");
842   if (pattern != 1)
843     casereader_destroy (r1);
844   if (pattern != 2)
845     casereader_destroy (r2);
846   if (pattern > 2) 
847     {
848       r1 = casefile_get_destructive_reader (cf);
849       for (i = 0; i < case_cnt; i++) 
850         {
851           struct ccase read_case, expected_case;
852           
853           get_random_case (&expected_case, value_cnt, i);
854           if (!casereader_read_xfer (r1, &read_case)) 
855             fail_test ("Premature end of casefile.");
856           for (j = 0; j < value_cnt; j++) 
857             {
858               double a = case_num (&read_case, j);
859               double b = case_num (&expected_case, j);
860               if (a != b)
861                 fail_test ("Case %lu fails comparison.", (unsigned long) i); 
862             }
863           case_destroy (&expected_case);
864           case_destroy (&read_case);
865         }
866       casereader_destroy (r1);
867     }
868   casefile_destroy (cf);
869   gsl_rng_free (rng);
870 }
871
872 static void
873 get_random_case (struct ccase *c, size_t value_cnt, size_t case_idx) 
874 {
875   int i;
876   case_create (c, value_cnt);
877   for (i = 0; i < value_cnt; i++)
878     case_data_rw (c, i)->f = case_idx % 257 + i;
879 }
880
881 static void
882 write_random_case (struct casefile *cf, size_t case_idx) 
883 {
884   struct ccase c;
885   get_random_case (&c, casefile_get_value_cnt (cf), case_idx);
886   casefile_append_xfer (cf, &c);
887 }
888
889 static void
890 read_and_verify_random_case (struct casefile *cf,
891                              struct casereader *reader, size_t case_idx) 
892 {
893   struct ccase read_case, expected_case;
894   size_t value_cnt;
895   size_t i;
896   
897   value_cnt = casefile_get_value_cnt (cf);
898   get_random_case (&expected_case, value_cnt, case_idx);
899   if (!casereader_read (reader, &read_case)) 
900     fail_test ("Premature end of casefile.");
901   for (i = 0; i < value_cnt; i++) 
902     {
903       double a = case_num (&read_case, i);
904       double b = case_num (&expected_case, i);
905       if (a != b)
906         fail_test ("Case %lu fails comparison.", (unsigned long) case_idx); 
907     }
908   case_destroy (&read_case);
909   case_destroy (&expected_case);
910 }
911
912 static void
913 fail_test (const char *message, ...) 
914 {
915   va_list args;
916
917   va_start (args, message);
918   vprintf (message, args);
919   putchar ('\n');
920   va_end (args);
921   
922   exit (1);
923 }