fdf033cbae672c7ae29135dd7d2cb62c8f0031a2
[pspp-builds.git] / src / casefile.c
1 /* PSPP - computes sample statistics.
2    Copyright (C) 2004 Free Software Foundation, Inc.
3    Written by Ben Pfaff <blp@gnu.org>.
4
5    This program is free software; you can redistribute it and/or
6    modify it under the terms of the GNU General Public License as
7    published by the Free Software Foundation; either version 2 of the
8    License, or (at your option) any later version.
9
10    This program is distributed in the hope that it will be useful, but
11    WITHOUT ANY WARRANTY; without even the implied warranty of
12    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13    General Public License for more details.
14
15    You should have received a copy of the GNU General Public License
16    along with this program; if not, write to the Free Software
17    Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA
18    02111-1307, USA. */
19
20 #include <config.h>
21 #include "casefile.h"
22 #include <assert.h>
23 #include <errno.h>
24 #include <fcntl.h>
25 #include <stdio.h>
26 #include <stdlib.h>
27 #include <string.h>
28 #include <unistd.h>
29 #include "alloc.h"
30 #include "case.h"
31 #include "error.h"
32 #include "misc.h"
33 #include "settings.h"
34 #include "var.h"
35
36 #ifdef HAVE_VALGRIND_VALGRIND_H
37 #include <valgrind/valgrind.h>
38 #endif
39
40 #define IO_BUF_SIZE (8192 / sizeof (union value))
41
42 /* A casefile is a sequentially accessible array of immutable
43    cases.  It may be stored in memory or on disk as workspace
44    allows.  Cases may be appended to the end of the file.  Cases
45    may be read sequentially starting from the beginning of the
46    file.  Once any cases have been read, no more cases may be
47    appended.  The entire file is discarded at once. */
48
49 /* In-memory cases are arranged in an array of arrays.  The top
50    level is variable size and the size of each bottom level array
51    is fixed at the number of cases defined here.  */
52 #define CASES_PER_BLOCK 128             
53
54 /* A casefile. */
55 struct casefile 
56   {
57     /* Basic data. */
58     struct casefile *next, *prev;       /* Next, prev in global list. */
59     size_t value_cnt;                   /* Case size in `union value's. */
60     size_t case_acct_size;              /* Case size for accounting. */
61     unsigned long case_cnt;             /* Number of cases stored. */
62     enum { MEMORY, DISK } storage;      /* Where cases are stored. */
63     enum { WRITE, READ } mode;          /* Is writing or reading allowed? */
64     struct casereader *readers;         /* List of our readers. */
65     int being_destroyed;                /* Does a destructive reader exist? */
66
67     /* Memory storage. */
68     struct ccase **cases;               /* Pointer to array of cases. */
69
70     /* Disk storage. */
71     int fd;                             /* File descriptor, -1 if none. */
72     char *filename;                     /* Filename. */
73     union value *buffer;                /* I/O buffer, NULL if none. */
74     size_t buffer_used;                 /* Number of values used in buffer. */
75     size_t buffer_size;                 /* Buffer size in values. */
76   };
77
78 /* For reading out the cases in a casefile. */
79 struct casereader 
80   {
81     struct casereader *next, *prev;     /* Next, prev in casefile's list. */
82     struct casefile *cf;                /* Our casefile. */
83     unsigned long case_idx;             /* Case number of current case. */
84     int destructive;                    /* Is this a destructive reader? */
85
86     /* Disk storage. */
87     int fd;                             /* File descriptor. */
88     union value *buffer;                /* I/O buffer. */
89     size_t buffer_pos;                  /* Offset of buffer position. */
90     struct ccase c;                     /* Current case. */
91   };
92
93 /* Return the case number of the current case */
94 unsigned long
95 casereader_cnum(const struct casereader *r)
96 {
97   return r->case_idx;
98 }
99
100 /* Doubly linked list of all casefiles. */
101 static struct casefile *casefiles;
102
103 /* Number of bytes of case allocated in in-memory casefiles. */
104 static size_t case_bytes;
105
106 static void register_atexit (void);
107 static void exit_handler (void);
108
109 static void reader_open_file (struct casereader *reader);
110 static void write_case_to_disk (struct casefile *cf, const struct ccase *c);
111 static void flush_buffer (struct casefile *cf);
112 static void fill_buffer (struct casereader *reader);
113
114 static int safe_open (const char *filename, int flags);
115 static int safe_close (int fd);
116 static int full_read (int fd, void *buffer, size_t size);
117 static int full_write (int fd, const void *buffer, size_t size);
118
119 /* Creates and returns a casefile to store cases of VALUE_CNT
120    `union value's each. */
121 struct casefile *
122 casefile_create (size_t value_cnt) 
123 {
124   struct casefile *cf = xmalloc (sizeof *cf);
125   cf->next = casefiles;
126   cf->prev = NULL;
127   if (cf->next != NULL)
128     cf->next->prev = cf;
129   casefiles = cf;
130   cf->value_cnt = value_cnt;
131   cf->case_acct_size = (cf->value_cnt + 4) * sizeof *cf->buffer;
132   cf->case_cnt = 0;
133   cf->storage = MEMORY;
134   cf->mode = WRITE;
135   cf->readers = NULL;
136   cf->being_destroyed = 0;
137   cf->cases = NULL;
138   cf->fd = -1;
139   cf->filename = NULL;
140   cf->buffer = NULL;
141   cf->buffer_size = ROUND_UP (cf->value_cnt, IO_BUF_SIZE);
142   if (cf->value_cnt > 0 && cf->buffer_size % cf->value_cnt > 64)
143     cf->buffer_size = cf->value_cnt;
144   cf->buffer_used = 0;
145   register_atexit ();
146   return cf;
147 }
148
149 /* Destroys casefile CF. */
150 void
151 casefile_destroy (struct casefile *cf) 
152 {
153   if (cf != NULL) 
154     {
155       if (cf->next != NULL)
156         cf->next->prev = cf->prev;
157       if (cf->prev != NULL)
158         cf->prev->next = cf->next;
159       if (casefiles == cf)
160         casefiles = cf->next;
161
162       while (cf->readers != NULL) 
163         casereader_destroy (cf->readers);
164
165       if (cf->cases != NULL) 
166         {
167           size_t idx, block_cnt;
168
169           case_bytes -= cf->case_cnt * cf->case_acct_size;
170           for (idx = 0; idx < cf->case_cnt; idx++)
171             {
172               size_t block_idx = idx / CASES_PER_BLOCK;
173               size_t case_idx = idx % CASES_PER_BLOCK;
174               struct ccase *c = &cf->cases[block_idx][case_idx];
175               case_destroy (c);
176             }
177
178           block_cnt = DIV_RND_UP (cf->case_cnt, CASES_PER_BLOCK);
179           for (idx = 0; idx < block_cnt; idx++)
180             free (cf->cases[idx]);
181
182           free (cf->cases);
183         }
184
185       if (cf->fd != -1)
186         safe_close (cf->fd);
187           
188       if (cf->filename != NULL && remove (cf->filename) == -1) 
189         msg (ME, _("%s: Removing temporary file: %s."),
190              cf->filename, strerror (errno));
191       free (cf->filename);
192
193       free (cf->buffer);
194
195       free (cf);
196     }
197 }
198
199 /* Returns nonzero only if casefile CF is stored in memory (instead of on
200    disk). */
201 int
202 casefile_in_core (const struct casefile *cf) 
203 {
204   assert (cf != NULL);
205
206   return cf->storage == MEMORY;
207 }
208
209 /* Puts a casefile to "sleep", that is, minimizes the resources
210    needed for it by closing its file descriptor and freeing its
211    buffer.  This is useful if we need so many casefiles that we
212    might not have enough memory and file descriptors to go
213    around.
214
215    For simplicity, this implementation always converts the
216    casefile to reader mode.  If this turns out to be a problem,
217    with a little extra work we could also support sleeping
218    writers. */
219 void
220 casefile_sleep (const struct casefile *cf_) 
221 {
222   struct casefile *cf = (struct casefile *) cf_;
223   assert (cf != NULL);
224
225   casefile_mode_reader (cf);
226   casefile_to_disk (cf);
227   flush_buffer (cf);
228
229   if (cf->fd != -1) 
230     {
231       safe_close (cf->fd);
232       cf->fd = -1;
233     }
234   if (cf->buffer != NULL) 
235     {
236       free (cf->buffer);
237       cf->buffer = NULL;
238     }
239 }
240
241 /* Returns the number of `union value's in a case for CF. */
242 size_t
243 casefile_get_value_cnt (const struct casefile *cf) 
244 {
245   assert (cf != NULL);
246
247   return cf->value_cnt;
248 }
249
250 /* Returns the number of cases in casefile CF. */
251 unsigned long
252 casefile_get_case_cnt (const struct casefile *cf) 
253 {
254   assert (cf != NULL);
255
256   return cf->case_cnt;
257 }
258
259 /* Appends a copy of case C to casefile CF.  Not valid after any
260    reader for CF has been created. */
261 void
262 casefile_append (struct casefile *cf, const struct ccase *c) 
263 {
264   assert (cf != NULL);
265   assert (c != NULL);
266   assert (cf->mode == WRITE);
267
268   /* Try memory first. */
269   if (cf->storage == MEMORY) 
270     {
271       if (case_bytes < get_max_workspace ())
272         {
273           size_t block_idx = cf->case_cnt / CASES_PER_BLOCK;
274           size_t case_idx = cf->case_cnt % CASES_PER_BLOCK;
275           struct ccase new_case;
276
277           case_bytes += cf->case_acct_size;
278           case_clone (&new_case, c);
279           if (case_idx == 0) 
280             {
281               if ((block_idx & (block_idx - 1)) == 0) 
282                 {
283                   size_t block_cap = block_idx == 0 ? 1 : block_idx * 2;
284                   cf->cases = xrealloc (cf->cases,
285                                         sizeof *cf->cases * block_cap);
286                 }
287
288               cf->cases[block_idx] = xmalloc (sizeof **cf->cases
289                                               * CASES_PER_BLOCK);
290             }
291
292           case_move (&cf->cases[block_idx][case_idx], &new_case);
293         }
294       else
295         {
296           casefile_to_disk (cf);
297           assert (cf->storage == DISK);
298           write_case_to_disk (cf, c);
299         }
300     }
301   else
302     write_case_to_disk (cf, c);
303
304   cf->case_cnt++;
305 }
306
307 /* Appends case C to casefile CF, which takes over ownership of
308    C.  Not valid after any reader for CF has been created. */
309 void
310 casefile_append_xfer (struct casefile *cf, struct ccase *c) 
311 {
312   casefile_append (cf, c);
313   case_destroy (c);
314 }
315
316 /* Writes case C to casefile CF's disk buffer, first flushing the buffer to
317    disk if it would otherwise overflow. */
318 static void
319 write_case_to_disk (struct casefile *cf, const struct ccase *c) 
320 {
321   case_to_values (c, cf->buffer + cf->buffer_used, cf->value_cnt);
322   cf->buffer_used += cf->value_cnt;
323   if (cf->buffer_used + cf->value_cnt > cf->buffer_size)
324     flush_buffer (cf);
325 }
326
327 /* If any bytes in CF's output buffer are used, flush them to
328    disk. */
329 static void
330 flush_buffer (struct casefile *cf) 
331 {
332   if (cf->buffer_used > 0) 
333     {
334       if (!full_write (cf->fd, cf->buffer,
335                        cf->buffer_size * sizeof *cf->buffer)) 
336         msg (FE, _("Error writing temporary file: %s."), strerror (errno));
337
338       cf->buffer_used = 0;
339     } 
340 }
341
342
343 /* If CF is currently stored in memory, writes it to disk.  Readers, if any,
344    retain their current positions. */
345 void
346 casefile_to_disk (const struct casefile *cf_) 
347 {
348   struct casefile *cf = (struct casefile *) cf_;
349   struct casereader *reader;
350   
351   assert (cf != NULL);
352
353   if (cf->storage == MEMORY)
354     {
355       size_t idx, block_cnt;
356       
357       assert (cf->filename == NULL);
358       assert (cf->fd == -1);
359       assert (cf->buffer_used == 0);
360
361       cf->storage = DISK;
362       if (!make_temp_file (&cf->fd, &cf->filename))
363         err_failure ();
364       cf->buffer = xmalloc (cf->buffer_size * sizeof *cf->buffer);
365       memset (cf->buffer, 0, cf->buffer_size * sizeof *cf->buffer);
366
367       case_bytes -= cf->case_cnt * cf->case_acct_size;
368       for (idx = 0; idx < cf->case_cnt; idx++)
369         {
370           size_t block_idx = idx / CASES_PER_BLOCK;
371           size_t case_idx = idx % CASES_PER_BLOCK;
372           struct ccase *c = &cf->cases[block_idx][case_idx];
373           write_case_to_disk (cf, c);
374           case_destroy (c);
375         }
376
377       block_cnt = DIV_RND_UP (cf->case_cnt, CASES_PER_BLOCK);
378       for (idx = 0; idx < block_cnt; idx++)
379         free (cf->cases[idx]);
380
381       free (cf->cases);
382       cf->cases = NULL;
383
384       if (cf->mode == READ)
385         flush_buffer (cf);
386
387       for (reader = cf->readers; reader != NULL; reader = reader->next)
388         reader_open_file (reader);
389     }
390 }
391
392 /* Changes CF to reader mode, ensuring that no more cases may be
393    added.  Creating a casereader for CF has the same effect. */
394 void
395 casefile_mode_reader (struct casefile *cf) 
396 {
397   assert (cf != NULL);
398   cf->mode = READ;
399 }
400
401 /* Creates and returns a casereader for CF.  A casereader can be used to
402    sequentially read the cases in a casefile. */
403 struct casereader *
404 casefile_get_reader (const struct casefile *cf_) 
405 {
406   struct casefile *cf = (struct casefile *) cf_;
407   struct casereader *reader;
408
409   assert (cf != NULL);
410   assert (!cf->being_destroyed);
411
412   /* Flush the buffer to disk if it's not empty. */
413   if (cf->mode == WRITE && cf->storage == DISK)
414     flush_buffer (cf);
415   
416   cf->mode = READ;
417
418   reader = xmalloc (sizeof *reader);
419   reader->next = cf->readers;
420   if (cf->readers != NULL)
421     reader->next->prev = reader;
422   cf->readers = reader;
423   reader->prev = NULL;
424   reader->cf = cf;
425   reader->case_idx = 0;
426   reader->destructive = 0;
427   reader->fd = -1;
428   reader->buffer = NULL;
429   reader->buffer_pos = 0;
430   case_nullify (&reader->c);
431
432   if (reader->cf->storage == DISK) 
433     reader_open_file (reader);
434
435   return reader;
436 }
437
438 /* Creates and returns a destructive casereader for CF.  Like a
439    normal casereader, a destructive casereader sequentially reads
440    the cases in a casefile.  Unlike a normal casereader, a
441    destructive reader cannot operate concurrently with any other
442    reader.  (This restriction could be relaxed in a few ways, but
443    it is so far unnecessary for other code.) */
444 struct casereader *
445 casefile_get_destructive_reader (struct casefile *cf) 
446 {
447   struct casereader *reader;
448   
449   assert (cf->readers == NULL);
450   reader = casefile_get_reader (cf);
451   reader->destructive = 1;
452   cf->being_destroyed = 1;
453   return reader;
454 }
455
456 /* Opens a disk file for READER and seeks to the current position as indicated
457    by case_idx.  Normally the current position is the beginning of the file,
458    but casefile_to_disk may cause the file to be opened at a different
459    position. */
460 static void
461 reader_open_file (struct casereader *reader) 
462 {
463   struct casefile *cf = reader->cf;
464   off_t file_ofs;
465
466   if (reader->case_idx >= cf->case_cnt)
467     return;
468
469   if (cf->fd != -1) 
470     {
471       reader->fd = cf->fd;
472       cf->fd = -1;
473     }
474   else 
475     {
476       reader->fd = safe_open (cf->filename, O_RDONLY);
477       if (reader->fd < 0)
478         msg (FE, _("%s: Opening temporary file: %s."),
479              cf->filename, strerror (errno));
480     }
481
482   if (cf->buffer != NULL) 
483     {
484       reader->buffer = cf->buffer;
485       cf->buffer = NULL; 
486     }
487   else 
488     {
489       reader->buffer = xmalloc (cf->buffer_size * sizeof *cf->buffer);
490       memset (reader->buffer, 0, cf->buffer_size * sizeof *cf->buffer); 
491     }
492
493   if (cf->value_cnt != 0) 
494     {
495       size_t buffer_case_cnt = cf->buffer_size / cf->value_cnt;
496       file_ofs = ((off_t) reader->case_idx / buffer_case_cnt
497                   * cf->buffer_size * sizeof *cf->buffer);
498       reader->buffer_pos = (reader->case_idx % buffer_case_cnt
499                             * cf->value_cnt);
500     }
501   else 
502     file_ofs = 0;
503   if (lseek (reader->fd, file_ofs, SEEK_SET) != file_ofs)
504     msg (FE, _("%s: Seeking temporary file: %s."),
505          cf->filename, strerror (errno));
506
507   if (cf->case_cnt > 0 && cf->value_cnt > 0)
508     fill_buffer (reader);
509
510   case_create (&reader->c, cf->value_cnt);
511 }
512
513 /* Fills READER's buffer by reading a block from disk. */
514 static void
515 fill_buffer (struct casereader *reader)
516 {
517   int retval = full_read (reader->fd, reader->buffer,
518                           reader->cf->buffer_size * sizeof *reader->buffer);
519   if (retval < 0)
520     msg (FE, _("%s: Reading temporary file: %s."),
521          reader->cf->filename, strerror (errno));
522   else if (retval != reader->cf->buffer_size * sizeof *reader->buffer)
523     msg (FE, _("%s: Temporary file ended unexpectedly."),
524          reader->cf->filename); 
525 }
526
527 /* Returns the casefile that READER reads. */
528 const struct casefile *
529 casereader_get_casefile (const struct casereader *reader) 
530 {
531   assert (reader != NULL);
532   
533   return reader->cf;
534 }
535
536 /* Reads a copy of the next case from READER into C.
537    Caller is responsible for destroying C. */
538 int
539 casereader_read (struct casereader *reader, struct ccase *c) 
540 {
541   assert (reader != NULL);
542   
543   if (reader->case_idx >= reader->cf->case_cnt) 
544     return 0;
545
546   if (reader->cf->storage == MEMORY) 
547     {
548       size_t block_idx = reader->case_idx / CASES_PER_BLOCK;
549       size_t case_idx = reader->case_idx % CASES_PER_BLOCK;
550
551       case_clone (c, &reader->cf->cases[block_idx][case_idx]);
552       reader->case_idx++;
553       return 1;
554     }
555   else 
556     {
557       if (reader->buffer_pos + reader->cf->value_cnt > reader->cf->buffer_size)
558         {
559           fill_buffer (reader);
560           reader->buffer_pos = 0;
561         }
562
563       case_from_values (&reader->c, reader->buffer + reader->buffer_pos,
564                         reader->cf->value_cnt);
565       reader->buffer_pos += reader->cf->value_cnt;
566       reader->case_idx++;
567
568       case_clone (c, &reader->c);
569       return 1;
570     }
571 }
572
573 /* Reads the next case from READER into C and transfers ownership
574    to the caller.  Caller is responsible for destroying C. */
575 int
576 casereader_read_xfer (struct casereader *reader, struct ccase *c)
577 {
578   assert (reader != NULL);
579
580   if (reader->destructive == 0
581       || reader->case_idx >= reader->cf->case_cnt
582       || reader->cf->storage == DISK) 
583     return casereader_read (reader, c);
584   else 
585     {
586       size_t block_idx = reader->case_idx / CASES_PER_BLOCK;
587       size_t case_idx = reader->case_idx % CASES_PER_BLOCK;
588       struct ccase *read_case = &reader->cf->cases[block_idx][case_idx];
589
590       case_move (c, read_case);
591       reader->case_idx++;
592       return 1;
593     }
594 }
595
596 /* Destroys READER. */
597 void
598 casereader_destroy (struct casereader *reader)
599 {
600   assert (reader != NULL);
601
602   if (reader->next != NULL)
603     reader->next->prev = reader->prev;
604   if (reader->prev != NULL)
605     reader->prev->next = reader->next;
606   if (reader->cf->readers == reader)
607     reader->cf->readers = reader->next;
608
609   if (reader->cf->buffer == NULL)
610     reader->cf->buffer = reader->buffer;
611   else
612     free (reader->buffer);
613
614   if (reader->fd != -1) 
615     {
616       if (reader->cf->fd == -1)
617         reader->cf->fd = reader->fd;
618       else
619         safe_close (reader->fd);
620     }
621   
622   case_destroy (&reader->c);
623
624   free (reader);
625 }
626
627 /* Calls open(), passing FILENAME and FLAGS, repeating as necessary
628    to deal with interrupted calls. */
629 static int
630 safe_open (const char *filename, int flags) 
631 {
632   int fd;
633
634   do 
635     {
636       fd = open (filename, flags);
637     }
638   while (fd == -1 && errno == EINTR);
639
640   return fd;
641 }
642
643 /* Calls close(), passing FD, repeating as necessary to deal with
644    interrupted calls. */
645 static int safe_close (int fd) 
646 {
647   int retval;
648
649   do 
650     {
651       retval = close (fd);
652     }
653   while (retval == -1 && errno == EINTR);
654
655   return retval;
656 }
657
658 /* Calls read(), passing FD, BUFFER, and SIZE, repeating as
659    necessary to deal with interrupted calls. */
660 static int
661 full_read (int fd, void *buffer_, size_t size) 
662 {
663   char *buffer = buffer_;
664   size_t bytes_read = 0;
665   
666   while (bytes_read < size)
667     {
668       int retval = read (fd, buffer + bytes_read, size - bytes_read);
669       if (retval > 0) 
670         bytes_read += retval; 
671       else if (retval == 0) 
672         return bytes_read;
673       else if (errno != EINTR)
674         return -1;
675     }
676
677   return bytes_read;
678 }
679
680 /* Calls write(), passing FD, BUFFER, and SIZE, repeating as
681    necessary to deal with interrupted calls. */
682 static int
683 full_write (int fd, const void *buffer_, size_t size) 
684 {
685   const char *buffer = buffer_;
686   size_t bytes_written = 0;
687   
688   while (bytes_written < size)
689     {
690       int retval = write (fd, buffer + bytes_written, size - bytes_written);
691       if (retval >= 0) 
692         bytes_written += retval; 
693       else if (errno != EINTR)
694         return -1;
695     }
696
697   return bytes_written;
698 }
699
700
701 /* Registers our exit handler with atexit() if it has not already
702    been registered. */
703 static void
704 register_atexit (void) 
705 {
706   static int registered = 0;
707   if (!registered) 
708     {
709       registered = 1;
710       atexit (exit_handler);
711     }
712 }
713
714
715
716 /* atexit() handler that closes and deletes our temporary
717    files. */
718 static void
719 exit_handler (void) 
720 {
721   while (casefiles != NULL)
722     casefile_destroy (casefiles);
723 }
724 \f
725 #include <gsl/gsl_rng.h>
726 #include <stdarg.h>
727 #include "command.h"
728 #include "lexer.h"
729
730 static void test_casefile (int pattern, size_t value_cnt, size_t case_cnt);
731 static void get_random_case (struct ccase *, size_t value_cnt,
732                              size_t case_idx);
733 static void write_random_case (struct casefile *cf, size_t case_idx);
734 static void read_and_verify_random_case (struct casefile *cf,
735                                          struct casereader *reader,
736                                          size_t case_idx);
737 static void fail_test (const char *message, ...);
738
739 int
740 cmd_debug_casefile (void) 
741 {
742   static const size_t sizes[] =
743     {
744       1, 2, 3, 4, 5, 6, 7, 14, 15, 16, 17, 31, 55, 73,
745       100, 137, 257, 521, 1031, 2053
746     };
747   int size_max;
748   int case_max;
749   int pattern;
750
751   size_max = sizeof sizes / sizeof *sizes;
752   if (lex_match_id ("SMALL")) 
753     {
754       size_max -= 4;
755       case_max = 511; 
756     }
757   else
758     case_max = 4095;
759   if (token != '.')
760     return lex_end_of_command ();
761     
762   for (pattern = 0; pattern < 6; pattern++) 
763     {
764       const size_t *size;
765
766       for (size = sizes; size < sizes + size_max; size++) 
767         {
768           size_t case_cnt;
769
770           for (case_cnt = 0; case_cnt <= case_max;
771                case_cnt = (case_cnt * 2) + 1)
772             test_casefile (pattern, *size, case_cnt);
773         }
774     }
775   printf ("Casefile tests succeeded.\n");
776   return CMD_SUCCESS;
777 }
778
779 static void
780 test_casefile (int pattern, size_t value_cnt, size_t case_cnt) 
781 {
782   struct casefile *cf;
783   struct casereader *r1, *r2;
784   struct ccase c;
785   gsl_rng *rng;
786   size_t i, j;
787
788   rng = gsl_rng_alloc (gsl_rng_mt19937);
789   cf = casefile_create (value_cnt);
790   if (pattern == 5)
791     casefile_to_disk (cf);
792   for (i = 0; i < case_cnt; i++)
793     write_random_case (cf, i);
794   if (pattern == 5)
795     casefile_sleep (cf);
796   r1 = casefile_get_reader (cf);
797   r2 = casefile_get_reader (cf);
798   switch (pattern) 
799     {
800     case 0:
801     case 5:
802       for (i = 0; i < case_cnt; i++) 
803         {
804           read_and_verify_random_case (cf, r1, i);
805           read_and_verify_random_case (cf, r2, i);
806         } 
807       break;
808     case 1:
809       for (i = 0; i < case_cnt; i++)
810         read_and_verify_random_case (cf, r1, i);
811       for (i = 0; i < case_cnt; i++) 
812         read_and_verify_random_case (cf, r2, i);
813       break;
814     case 2:
815     case 3:
816     case 4:
817       for (i = j = 0; i < case_cnt; i++) 
818         {
819           read_and_verify_random_case (cf, r1, i);
820           if (gsl_rng_get (rng) % pattern == 0) 
821             read_and_verify_random_case (cf, r2, j++); 
822           if (i == case_cnt / 2)
823             casefile_to_disk (cf);
824         }
825       for (; j < case_cnt; j++) 
826         read_and_verify_random_case (cf, r2, j);
827       break;
828     }
829   if (casereader_read (r1, &c))
830     fail_test ("Casereader 1 not at end of file.");
831   if (casereader_read (r2, &c))
832     fail_test ("Casereader 2 not at end of file.");
833   if (pattern != 1)
834     casereader_destroy (r1);
835   if (pattern != 2)
836     casereader_destroy (r2);
837   if (pattern > 2) 
838     {
839       r1 = casefile_get_destructive_reader (cf);
840       for (i = 0; i < case_cnt; i++) 
841         {
842           struct ccase read_case, expected_case;
843           
844           get_random_case (&expected_case, value_cnt, i);
845           if (!casereader_read_xfer (r1, &read_case)) 
846             fail_test ("Premature end of casefile.");
847           for (j = 0; j < value_cnt; j++) 
848             {
849               double a = case_num (&read_case, j);
850               double b = case_num (&expected_case, j);
851               if (a != b)
852                 fail_test ("Case %lu fails comparison.", (unsigned long) i); 
853             }
854           case_destroy (&expected_case);
855           case_destroy (&read_case);
856         }
857       casereader_destroy (r1);
858     }
859   casefile_destroy (cf);
860   gsl_rng_free (rng);
861 }
862
863 static void
864 get_random_case (struct ccase *c, size_t value_cnt, size_t case_idx) 
865 {
866   int i;
867   case_create (c, value_cnt);
868   for (i = 0; i < value_cnt; i++)
869     case_data_rw (c, i)->f = case_idx % 257 + i;
870 }
871
872 static void
873 write_random_case (struct casefile *cf, size_t case_idx) 
874 {
875   struct ccase c;
876   get_random_case (&c, casefile_get_value_cnt (cf), case_idx);
877   casefile_append_xfer (cf, &c);
878 }
879
880 static void
881 read_and_verify_random_case (struct casefile *cf,
882                              struct casereader *reader, size_t case_idx) 
883 {
884   struct ccase read_case, expected_case;
885   size_t value_cnt;
886   size_t i;
887   
888   value_cnt = casefile_get_value_cnt (cf);
889   get_random_case (&expected_case, value_cnt, case_idx);
890   if (!casereader_read (reader, &read_case)) 
891     fail_test ("Premature end of casefile.");
892   for (i = 0; i < value_cnt; i++) 
893     {
894       double a = case_num (&read_case, i);
895       double b = case_num (&expected_case, i);
896       if (a != b)
897         fail_test ("Case %lu fails comparison.", (unsigned long) case_idx); 
898     }
899   case_destroy (&read_case);
900   case_destroy (&expected_case);
901 }
902
903 static void
904 fail_test (const char *message, ...) 
905 {
906   va_list args;
907
908   va_start (args, message);
909   vprintf (message, args);
910   putchar ('\n');
911   va_end (args);
912   
913   exit (1);
914 }