Adopt use of gnulib for portability.
[pspp-builds.git] / src / casefile.c
1 /* PSPP - computes sample statistics.
2    Copyright (C) 2004 Free Software Foundation, Inc.
3    Written by Ben Pfaff <blp@gnu.org>.
4
5    This program is free software; you can redistribute it and/or
6    modify it under the terms of the GNU General Public License as
7    published by the Free Software Foundation; either version 2 of the
8    License, or (at your option) any later version.
9
10    This program is distributed in the hope that it will be useful, but
11    WITHOUT ANY WARRANTY; without even the implied warranty of
12    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13    General Public License for more details.
14
15    You should have received a copy of the GNU General Public License
16    along with this program; if not, write to the Free Software
17    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
18    02110-1301, USA. */
19
20 #include <config.h>
21 #include "casefile.h"
22 #include <assert.h>
23 #include <errno.h>
24 #include <fcntl.h>
25 #include <stdio.h>
26 #include <stdlib.h>
27 #include <string.h>
28 #include <unistd.h>
29 #include "alloc.h"
30 #include "case.h"
31 #include "error.h"
32 #include "full-read.h"
33 #include "full-write.h"
34 #include "misc.h"
35 #include "mkfile.h"
36 #include "settings.h"
37 #include "var.h"
38
39 #include "gettext.h"
40 #define _(msgid) gettext (msgid)
41
42 #define IO_BUF_SIZE (8192 / sizeof (union value))
43
44 /* A casefile represents a sequentially accessible stream of
45    immutable cases.
46
47    If workspace allows, a casefile is maintained in memory.  If
48    workspace overflows, then the casefile is pushed to disk.  In
49    either case the interface presented to callers is kept the
50    same.
51
52    The life cycle of a casefile consists of up to three phases:
53
54        1. Writing.  The casefile initially contains no cases.  In
55           this phase, any number of cases may be appended to the
56           end of a casefile.  (Cases are never inserted in the
57           middle or before the beginning of a casefile.)
58
59           Use casefile_append() or casefile_append_xfer() to
60           append a case to a casefile.
61
62        2. Reading.  The casefile may be read sequentially,
63           starting from the beginning, by "casereaders".  Any
64           number of casereaders may be created, at any time,
65           during the reading phase.  Each casereader has an
66           independent position in the casefile.
67
68           Casereaders may only move forward.  They cannot move
69           backward to arbitrary records or seek randomly.
70           Cloning casereaders is possible, but it is not yet
71           implemented.
72
73           Use casefile_get_reader() to create a casereader for
74           use in phase 2.  This also transitions from phase 1 to
75           phase 2.  Calling casefile_mode_reader() makes the same
76           transition, without creating a casereader.
77
78           Use casereader_read(), casereader_read_xfer(), or
79           casereader_read_xfer_assert() to read a case from a
80           casereader.  Use casereader_destroy() to discard a
81           casereader when it is no longer needed.
82
83        3. Destruction.  This phase is optional.  The casefile is
84           also read with casereaders in this phase, but the
85           ability to create new casereaders is curtailed.
86
87           In this phase, casereaders could still be cloned (once
88           we eventually implement cloning).
89
90           To transition from phase 1 or 2 to phase 3 and create a
91           casereader, call casefile_get_destructive_reader().
92           The same functions apply to the casereader obtained
93           this way as apply to casereaders obtained in phase 2.
94           
95           After casefile_get_destructive_reader() is called, no
96           more casereaders may be created with
97           casefile_get_reader() or
98           casefile_get_destructive_reader().  (If cloning of
99           casereaders were implemented, it would still be
100           possible.)
101
102           The purpose of the limitations applied to casereaders
103           in phase 3 is to allow in-memory casefiles to fully
104           transfer ownership of cases to the casereaders,
105           avoiding the need for extra copies of case data.  For
106           relatively static data sets with many variables, I
107           suspect (without evidence) that this may be a big
108           performance boost.
109
110    When a casefile is no longer needed, it may be destroyed with
111    casefile_destroy().  This function will also destroy any
112    remaining casereaders. */
113
114 /* In-memory cases are arranged in an array of arrays.  The top
115    level is variable size and the size of each bottom level array
116    is fixed at the number of cases defined here.  */
117 #define CASES_PER_BLOCK 128             
118
119 /* A casefile. */
120 struct casefile 
121   {
122     /* Basic data. */
123     struct casefile *next, *prev;       /* Next, prev in global list. */
124     size_t value_cnt;                   /* Case size in `union value's. */
125     size_t case_acct_size;              /* Case size for accounting. */
126     unsigned long case_cnt;             /* Number of cases stored. */
127     enum { MEMORY, DISK } storage;      /* Where cases are stored. */
128     enum { WRITE, READ } mode;          /* Is writing or reading allowed? */
129     struct casereader *readers;         /* List of our readers. */
130     int being_destroyed;                /* Does a destructive reader exist? */
131
132     /* Memory storage. */
133     struct ccase **cases;               /* Pointer to array of cases. */
134
135     /* Disk storage. */
136     int fd;                             /* File descriptor, -1 if none. */
137     char *filename;                     /* Filename. */
138     union value *buffer;                /* I/O buffer, NULL if none. */
139     size_t buffer_used;                 /* Number of values used in buffer. */
140     size_t buffer_size;                 /* Buffer size in values. */
141   };
142
143 /* For reading out the cases in a casefile. */
144 struct casereader 
145   {
146     struct casereader *next, *prev;     /* Next, prev in casefile's list. */
147     struct casefile *cf;                /* Our casefile. */
148     unsigned long case_idx;             /* Case number of current case. */
149     int destructive;                    /* Is this a destructive reader? */
150
151     /* Disk storage. */
152     int fd;                             /* File descriptor. */
153     union value *buffer;                /* I/O buffer. */
154     size_t buffer_pos;                  /* Offset of buffer position. */
155     struct ccase c;                     /* Current case. */
156   };
157
158 /* Return the case number of the current case */
159 unsigned long
160 casereader_cnum(const struct casereader *r)
161 {
162   return r->case_idx;
163 }
164
165 /* Doubly linked list of all casefiles. */
166 static struct casefile *casefiles;
167
168 /* Number of bytes of case allocated in in-memory casefiles. */
169 static size_t case_bytes;
170
171 static void register_atexit (void);
172 static void exit_handler (void);
173
174 static void reader_open_file (struct casereader *reader);
175 static void write_case_to_disk (struct casefile *cf, const struct ccase *c);
176 static void flush_buffer (struct casefile *cf);
177 static void fill_buffer (struct casereader *reader);
178
179 static int safe_open (const char *filename, int flags);
180 static int safe_close (int fd);
181
182 /* Creates and returns a casefile to store cases of VALUE_CNT
183    `union value's each. */
184 struct casefile *
185 casefile_create (size_t value_cnt) 
186 {
187   struct casefile *cf = xmalloc (sizeof *cf);
188   cf->next = casefiles;
189   cf->prev = NULL;
190   if (cf->next != NULL)
191     cf->next->prev = cf;
192   casefiles = cf;
193   cf->value_cnt = value_cnt;
194   cf->case_acct_size = (cf->value_cnt + 4) * sizeof *cf->buffer;
195   cf->case_cnt = 0;
196   cf->storage = MEMORY;
197   cf->mode = WRITE;
198   cf->readers = NULL;
199   cf->being_destroyed = 0;
200   cf->cases = NULL;
201   cf->fd = -1;
202   cf->filename = NULL;
203   cf->buffer = NULL;
204   cf->buffer_size = ROUND_UP (cf->value_cnt, IO_BUF_SIZE);
205   if (cf->value_cnt > 0 && cf->buffer_size % cf->value_cnt > 64)
206     cf->buffer_size = cf->value_cnt;
207   cf->buffer_used = 0;
208   register_atexit ();
209   return cf;
210 }
211
212 /* Destroys casefile CF. */
213 void
214 casefile_destroy (struct casefile *cf) 
215 {
216   if (cf != NULL) 
217     {
218       if (cf->next != NULL)
219         cf->next->prev = cf->prev;
220       if (cf->prev != NULL)
221         cf->prev->next = cf->next;
222       if (casefiles == cf)
223         casefiles = cf->next;
224
225       while (cf->readers != NULL) 
226         casereader_destroy (cf->readers);
227
228       if (cf->cases != NULL) 
229         {
230           size_t idx, block_cnt;
231
232           case_bytes -= cf->case_cnt * cf->case_acct_size;
233           for (idx = 0; idx < cf->case_cnt; idx++)
234             {
235               size_t block_idx = idx / CASES_PER_BLOCK;
236               size_t case_idx = idx % CASES_PER_BLOCK;
237               struct ccase *c = &cf->cases[block_idx][case_idx];
238               case_destroy (c);
239             }
240
241           block_cnt = DIV_RND_UP (cf->case_cnt, CASES_PER_BLOCK);
242           for (idx = 0; idx < block_cnt; idx++)
243             free (cf->cases[idx]);
244
245           free (cf->cases);
246         }
247
248       if (cf->fd != -1)
249         safe_close (cf->fd);
250           
251       if (cf->filename != NULL && remove (cf->filename) == -1) 
252         msg (ME, _("%s: Removing temporary file: %s."),
253              cf->filename, strerror (errno));
254       free (cf->filename);
255
256       free (cf->buffer);
257
258       free (cf);
259     }
260 }
261
262 /* Returns nonzero only if casefile CF is stored in memory (instead of on
263    disk). */
264 int
265 casefile_in_core (const struct casefile *cf) 
266 {
267   assert (cf != NULL);
268
269   return cf->storage == MEMORY;
270 }
271
272 /* Puts a casefile to "sleep", that is, minimizes the resources
273    needed for it by closing its file descriptor and freeing its
274    buffer.  This is useful if we need so many casefiles that we
275    might not have enough memory and file descriptors to go
276    around.
277
278    For simplicity, this implementation always converts the
279    casefile to reader mode.  If this turns out to be a problem,
280    with a little extra work we could also support sleeping
281    writers. */
282 void
283 casefile_sleep (const struct casefile *cf_) 
284 {
285   struct casefile *cf = (struct casefile *) cf_;
286   assert (cf != NULL);
287
288   casefile_mode_reader (cf);
289   casefile_to_disk (cf);
290   flush_buffer (cf);
291
292   if (cf->fd != -1) 
293     {
294       safe_close (cf->fd);
295       cf->fd = -1;
296     }
297   if (cf->buffer != NULL) 
298     {
299       free (cf->buffer);
300       cf->buffer = NULL;
301     }
302 }
303
304 /* Returns the number of `union value's in a case for CF. */
305 size_t
306 casefile_get_value_cnt (const struct casefile *cf) 
307 {
308   assert (cf != NULL);
309
310   return cf->value_cnt;
311 }
312
313 /* Returns the number of cases in casefile CF. */
314 unsigned long
315 casefile_get_case_cnt (const struct casefile *cf) 
316 {
317   assert (cf != NULL);
318
319   return cf->case_cnt;
320 }
321
322 /* Appends a copy of case C to casefile CF.  Not valid after any
323    reader for CF has been created. */
324 void
325 casefile_append (struct casefile *cf, const struct ccase *c) 
326 {
327   assert (cf != NULL);
328   assert (c != NULL);
329   assert (cf->mode == WRITE);
330
331   /* Try memory first. */
332   if (cf->storage == MEMORY) 
333     {
334       if (case_bytes < get_max_workspace ())
335         {
336           size_t block_idx = cf->case_cnt / CASES_PER_BLOCK;
337           size_t case_idx = cf->case_cnt % CASES_PER_BLOCK;
338           struct ccase new_case;
339
340           case_bytes += cf->case_acct_size;
341           case_clone (&new_case, c);
342           if (case_idx == 0) 
343             {
344               if ((block_idx & (block_idx - 1)) == 0) 
345                 {
346                   size_t block_cap = block_idx == 0 ? 1 : block_idx * 2;
347                   cf->cases = xrealloc (cf->cases,
348                                         sizeof *cf->cases * block_cap);
349                 }
350
351               cf->cases[block_idx] = xmalloc (sizeof **cf->cases
352                                               * CASES_PER_BLOCK);
353             }
354
355           case_move (&cf->cases[block_idx][case_idx], &new_case);
356         }
357       else
358         {
359           casefile_to_disk (cf);
360           assert (cf->storage == DISK);
361           write_case_to_disk (cf, c);
362         }
363     }
364   else
365     write_case_to_disk (cf, c);
366
367   cf->case_cnt++;
368 }
369
370 /* Appends case C to casefile CF, which takes over ownership of
371    C.  Not valid after any reader for CF has been created. */
372 void
373 casefile_append_xfer (struct casefile *cf, struct ccase *c) 
374 {
375   casefile_append (cf, c);
376   case_destroy (c);
377 }
378
379 /* Writes case C to casefile CF's disk buffer, first flushing the buffer to
380    disk if it would otherwise overflow. */
381 static void
382 write_case_to_disk (struct casefile *cf, const struct ccase *c) 
383 {
384   case_to_values (c, cf->buffer + cf->buffer_used, cf->value_cnt);
385   cf->buffer_used += cf->value_cnt;
386   if (cf->buffer_used + cf->value_cnt > cf->buffer_size)
387     flush_buffer (cf);
388 }
389
390 /* If any bytes in CF's output buffer are used, flush them to
391    disk. */
392 static void
393 flush_buffer (struct casefile *cf) 
394 {
395   if (cf->buffer_used > 0) 
396     {
397       if (!full_write (cf->fd, cf->buffer,
398                        cf->buffer_size * sizeof *cf->buffer)) 
399         msg (FE, _("Error writing temporary file: %s."), strerror (errno));
400
401       cf->buffer_used = 0;
402     } 
403 }
404
405
406 /* If CF is currently stored in memory, writes it to disk.  Readers, if any,
407    retain their current positions. */
408 void
409 casefile_to_disk (const struct casefile *cf_) 
410 {
411   struct casefile *cf = (struct casefile *) cf_;
412   struct casereader *reader;
413   
414   assert (cf != NULL);
415
416   if (cf->storage == MEMORY)
417     {
418       size_t idx, block_cnt;
419       
420       assert (cf->filename == NULL);
421       assert (cf->fd == -1);
422       assert (cf->buffer_used == 0);
423
424       cf->storage = DISK;
425       if (!make_temp_file (&cf->fd, &cf->filename))
426         err_failure ();
427       cf->buffer = xmalloc (cf->buffer_size * sizeof *cf->buffer);
428       memset (cf->buffer, 0, cf->buffer_size * sizeof *cf->buffer);
429
430       case_bytes -= cf->case_cnt * cf->case_acct_size;
431       for (idx = 0; idx < cf->case_cnt; idx++)
432         {
433           size_t block_idx = idx / CASES_PER_BLOCK;
434           size_t case_idx = idx % CASES_PER_BLOCK;
435           struct ccase *c = &cf->cases[block_idx][case_idx];
436           write_case_to_disk (cf, c);
437           case_destroy (c);
438         }
439
440       block_cnt = DIV_RND_UP (cf->case_cnt, CASES_PER_BLOCK);
441       for (idx = 0; idx < block_cnt; idx++)
442         free (cf->cases[idx]);
443
444       free (cf->cases);
445       cf->cases = NULL;
446
447       if (cf->mode == READ)
448         flush_buffer (cf);
449
450       for (reader = cf->readers; reader != NULL; reader = reader->next)
451         reader_open_file (reader);
452     }
453 }
454
455 /* Changes CF to reader mode, ensuring that no more cases may be
456    added.  Creating a casereader for CF has the same effect. */
457 void
458 casefile_mode_reader (struct casefile *cf) 
459 {
460   assert (cf != NULL);
461   cf->mode = READ;
462 }
463
464 /* Creates and returns a casereader for CF.  A casereader can be used to
465    sequentially read the cases in a casefile. */
466 struct casereader *
467 casefile_get_reader (const struct casefile *cf_) 
468 {
469   struct casefile *cf = (struct casefile *) cf_;
470   struct casereader *reader;
471
472   assert (cf != NULL);
473   assert (!cf->being_destroyed);
474
475   /* Flush the buffer to disk if it's not empty. */
476   if (cf->mode == WRITE && cf->storage == DISK)
477     flush_buffer (cf);
478   
479   cf->mode = READ;
480
481   reader = xmalloc (sizeof *reader);
482   reader->next = cf->readers;
483   if (cf->readers != NULL)
484     reader->next->prev = reader;
485   cf->readers = reader;
486   reader->prev = NULL;
487   reader->cf = cf;
488   reader->case_idx = 0;
489   reader->destructive = 0;
490   reader->fd = -1;
491   reader->buffer = NULL;
492   reader->buffer_pos = 0;
493   case_nullify (&reader->c);
494
495   if (reader->cf->storage == DISK) 
496     reader_open_file (reader);
497
498   return reader;
499 }
500
501 /* Creates and returns a destructive casereader for CF.  Like a
502    normal casereader, a destructive casereader sequentially reads
503    the cases in a casefile.  Unlike a normal casereader, a
504    destructive reader cannot operate concurrently with any other
505    reader.  (This restriction could be relaxed in a few ways, but
506    it is so far unnecessary for other code.) */
507 struct casereader *
508 casefile_get_destructive_reader (struct casefile *cf) 
509 {
510   struct casereader *reader;
511   
512   assert (cf->readers == NULL);
513   reader = casefile_get_reader (cf);
514   reader->destructive = 1;
515   cf->being_destroyed = 1;
516   return reader;
517 }
518
519 /* Opens a disk file for READER and seeks to the current position as indicated
520    by case_idx.  Normally the current position is the beginning of the file,
521    but casefile_to_disk may cause the file to be opened at a different
522    position. */
523 static void
524 reader_open_file (struct casereader *reader) 
525 {
526   struct casefile *cf = reader->cf;
527   off_t file_ofs;
528
529   if (reader->case_idx >= cf->case_cnt)
530     return;
531
532   if (cf->fd != -1) 
533     {
534       reader->fd = cf->fd;
535       cf->fd = -1;
536     }
537   else 
538     {
539       reader->fd = safe_open (cf->filename, O_RDONLY);
540       if (reader->fd < 0)
541         msg (FE, _("%s: Opening temporary file: %s."),
542              cf->filename, strerror (errno));
543     }
544
545   if (cf->buffer != NULL) 
546     {
547       reader->buffer = cf->buffer;
548       cf->buffer = NULL; 
549     }
550   else 
551     {
552       reader->buffer = xmalloc (cf->buffer_size * sizeof *cf->buffer);
553       memset (reader->buffer, 0, cf->buffer_size * sizeof *cf->buffer); 
554     }
555
556   if (cf->value_cnt != 0) 
557     {
558       size_t buffer_case_cnt = cf->buffer_size / cf->value_cnt;
559       file_ofs = ((off_t) reader->case_idx / buffer_case_cnt
560                   * cf->buffer_size * sizeof *cf->buffer);
561       reader->buffer_pos = (reader->case_idx % buffer_case_cnt
562                             * cf->value_cnt);
563     }
564   else 
565     file_ofs = 0;
566   if (lseek (reader->fd, file_ofs, SEEK_SET) != file_ofs)
567     msg (FE, _("%s: Seeking temporary file: %s."),
568          cf->filename, strerror (errno));
569
570   if (cf->case_cnt > 0 && cf->value_cnt > 0)
571     fill_buffer (reader);
572
573   case_create (&reader->c, cf->value_cnt);
574 }
575
576 /* Fills READER's buffer by reading a block from disk. */
577 static void
578 fill_buffer (struct casereader *reader)
579 {
580   int retval = full_read (reader->fd, reader->buffer,
581                           reader->cf->buffer_size * sizeof *reader->buffer);
582   if (retval < 0)
583     msg (FE, _("%s: Reading temporary file: %s."),
584          reader->cf->filename, strerror (errno));
585   else if (retval != reader->cf->buffer_size * sizeof *reader->buffer)
586     msg (FE, _("%s: Temporary file ended unexpectedly."),
587          reader->cf->filename); 
588 }
589
590 /* Returns the casefile that READER reads. */
591 const struct casefile *
592 casereader_get_casefile (const struct casereader *reader) 
593 {
594   assert (reader != NULL);
595   
596   return reader->cf;
597 }
598
599 /* Reads a copy of the next case from READER into C.
600    Caller is responsible for destroying C.
601    Returns true if successful, false at end of file. */
602 int
603 casereader_read (struct casereader *reader, struct ccase *c) 
604 {
605   assert (reader != NULL);
606   
607   if (reader->case_idx >= reader->cf->case_cnt) 
608     return 0;
609
610   if (reader->cf->storage == MEMORY) 
611     {
612       size_t block_idx = reader->case_idx / CASES_PER_BLOCK;
613       size_t case_idx = reader->case_idx % CASES_PER_BLOCK;
614
615       case_clone (c, &reader->cf->cases[block_idx][case_idx]);
616       reader->case_idx++;
617       return 1;
618     }
619   else 
620     {
621       if (reader->buffer_pos + reader->cf->value_cnt > reader->cf->buffer_size)
622         {
623           fill_buffer (reader);
624           reader->buffer_pos = 0;
625         }
626
627       case_from_values (&reader->c, reader->buffer + reader->buffer_pos,
628                         reader->cf->value_cnt);
629       reader->buffer_pos += reader->cf->value_cnt;
630       reader->case_idx++;
631
632       case_clone (c, &reader->c);
633       return 1;
634     }
635 }
636
637 /* Reads the next case from READER into C and transfers ownership
638    to the caller.  Caller is responsible for destroying C.
639    Returns true if successful, false at end of file. */
640 int
641 casereader_read_xfer (struct casereader *reader, struct ccase *c)
642 {
643   assert (reader != NULL);
644
645   if (reader->destructive == 0
646       || reader->case_idx >= reader->cf->case_cnt
647       || reader->cf->storage == DISK) 
648     return casereader_read (reader, c);
649   else 
650     {
651       size_t block_idx = reader->case_idx / CASES_PER_BLOCK;
652       size_t case_idx = reader->case_idx % CASES_PER_BLOCK;
653       struct ccase *read_case = &reader->cf->cases[block_idx][case_idx];
654
655       case_move (c, read_case);
656       reader->case_idx++;
657       return 1;
658     }
659 }
660
661 /* Reads the next case from READER into C and transfers ownership
662    to the caller.  Caller is responsible for destroying C.
663    Assert-fails at end of file. */
664 void
665 casereader_read_xfer_assert (struct casereader *reader, struct ccase *c) 
666 {
667   bool success = casereader_read_xfer (reader, c);
668   assert (success);
669 }
670
671 /* Destroys READER. */
672 void
673 casereader_destroy (struct casereader *reader)
674 {
675   assert (reader != NULL);
676
677   if (reader->next != NULL)
678     reader->next->prev = reader->prev;
679   if (reader->prev != NULL)
680     reader->prev->next = reader->next;
681   if (reader->cf->readers == reader)
682     reader->cf->readers = reader->next;
683
684   if (reader->cf->buffer == NULL)
685     reader->cf->buffer = reader->buffer;
686   else
687     free (reader->buffer);
688
689   if (reader->fd != -1) 
690     {
691       if (reader->cf->fd == -1)
692         reader->cf->fd = reader->fd;
693       else
694         safe_close (reader->fd);
695     }
696   
697   case_destroy (&reader->c);
698
699   free (reader);
700 }
701
702 /* Calls open(), passing FILENAME and FLAGS, repeating as necessary
703    to deal with interrupted calls. */
704 static int
705 safe_open (const char *filename, int flags) 
706 {
707   int fd;
708
709   do 
710     {
711       fd = open (filename, flags);
712     }
713   while (fd == -1 && errno == EINTR);
714
715   return fd;
716 }
717
718 /* Calls close(), passing FD, repeating as necessary to deal with
719    interrupted calls. */
720 static int safe_close (int fd) 
721 {
722   int retval;
723
724   do 
725     {
726       retval = close (fd);
727     }
728   while (retval == -1 && errno == EINTR);
729
730   return retval;
731 }
732
733 /* Registers our exit handler with atexit() if it has not already
734    been registered. */
735 static void
736 register_atexit (void) 
737 {
738   static int registered = 0;
739   if (!registered) 
740     {
741       registered = 1;
742       atexit (exit_handler);
743     }
744 }
745
746
747
748 /* atexit() handler that closes and deletes our temporary
749    files. */
750 static void
751 exit_handler (void) 
752 {
753   while (casefiles != NULL)
754     casefile_destroy (casefiles);
755 }
756 \f
757 #include <gsl/gsl_rng.h>
758 #include <stdarg.h>
759 #include "command.h"
760 #include "lexer.h"
761
762 static void test_casefile (int pattern, size_t value_cnt, size_t case_cnt);
763 static void get_random_case (struct ccase *, size_t value_cnt,
764                              size_t case_idx);
765 static void write_random_case (struct casefile *cf, size_t case_idx);
766 static void read_and_verify_random_case (struct casefile *cf,
767                                          struct casereader *reader,
768                                          size_t case_idx);
769 static void fail_test (const char *message, ...);
770
771 int
772 cmd_debug_casefile (void) 
773 {
774   static const size_t sizes[] =
775     {
776       1, 2, 3, 4, 5, 6, 7, 14, 15, 16, 17, 31, 55, 73,
777       100, 137, 257, 521, 1031, 2053
778     };
779   int size_max;
780   int case_max;
781   int pattern;
782
783   size_max = sizeof sizes / sizeof *sizes;
784   if (lex_match_id ("SMALL")) 
785     {
786       size_max -= 4;
787       case_max = 511; 
788     }
789   else
790     case_max = 4095;
791   if (token != '.')
792     return lex_end_of_command ();
793     
794   for (pattern = 0; pattern < 6; pattern++) 
795     {
796       const size_t *size;
797
798       for (size = sizes; size < sizes + size_max; size++) 
799         {
800           size_t case_cnt;
801
802           for (case_cnt = 0; case_cnt <= case_max;
803                case_cnt = (case_cnt * 2) + 1)
804             test_casefile (pattern, *size, case_cnt);
805         }
806     }
807   printf ("Casefile tests succeeded.\n");
808   return CMD_SUCCESS;
809 }
810
811 static void
812 test_casefile (int pattern, size_t value_cnt, size_t case_cnt) 
813 {
814   struct casefile *cf;
815   struct casereader *r1, *r2;
816   struct ccase c;
817   gsl_rng *rng;
818   size_t i, j;
819
820   rng = gsl_rng_alloc (gsl_rng_mt19937);
821   cf = casefile_create (value_cnt);
822   if (pattern == 5)
823     casefile_to_disk (cf);
824   for (i = 0; i < case_cnt; i++)
825     write_random_case (cf, i);
826   if (pattern == 5)
827     casefile_sleep (cf);
828   r1 = casefile_get_reader (cf);
829   r2 = casefile_get_reader (cf);
830   switch (pattern) 
831     {
832     case 0:
833     case 5:
834       for (i = 0; i < case_cnt; i++) 
835         {
836           read_and_verify_random_case (cf, r1, i);
837           read_and_verify_random_case (cf, r2, i);
838         } 
839       break;
840     case 1:
841       for (i = 0; i < case_cnt; i++)
842         read_and_verify_random_case (cf, r1, i);
843       for (i = 0; i < case_cnt; i++) 
844         read_and_verify_random_case (cf, r2, i);
845       break;
846     case 2:
847     case 3:
848     case 4:
849       for (i = j = 0; i < case_cnt; i++) 
850         {
851           read_and_verify_random_case (cf, r1, i);
852           if (gsl_rng_get (rng) % pattern == 0) 
853             read_and_verify_random_case (cf, r2, j++); 
854           if (i == case_cnt / 2)
855             casefile_to_disk (cf);
856         }
857       for (; j < case_cnt; j++) 
858         read_and_verify_random_case (cf, r2, j);
859       break;
860     }
861   if (casereader_read (r1, &c))
862     fail_test ("Casereader 1 not at end of file.");
863   if (casereader_read (r2, &c))
864     fail_test ("Casereader 2 not at end of file.");
865   if (pattern != 1)
866     casereader_destroy (r1);
867   if (pattern != 2)
868     casereader_destroy (r2);
869   if (pattern > 2) 
870     {
871       r1 = casefile_get_destructive_reader (cf);
872       for (i = 0; i < case_cnt; i++) 
873         {
874           struct ccase read_case, expected_case;
875           
876           get_random_case (&expected_case, value_cnt, i);
877           if (!casereader_read_xfer (r1, &read_case)) 
878             fail_test ("Premature end of casefile.");
879           for (j = 0; j < value_cnt; j++) 
880             {
881               double a = case_num (&read_case, j);
882               double b = case_num (&expected_case, j);
883               if (a != b)
884                 fail_test ("Case %lu fails comparison.", (unsigned long) i); 
885             }
886           case_destroy (&expected_case);
887           case_destroy (&read_case);
888         }
889       casereader_destroy (r1);
890     }
891   casefile_destroy (cf);
892   gsl_rng_free (rng);
893 }
894
895 static void
896 get_random_case (struct ccase *c, size_t value_cnt, size_t case_idx) 
897 {
898   int i;
899   case_create (c, value_cnt);
900   for (i = 0; i < value_cnt; i++)
901     case_data_rw (c, i)->f = case_idx % 257 + i;
902 }
903
904 static void
905 write_random_case (struct casefile *cf, size_t case_idx) 
906 {
907   struct ccase c;
908   get_random_case (&c, casefile_get_value_cnt (cf), case_idx);
909   casefile_append_xfer (cf, &c);
910 }
911
912 static void
913 read_and_verify_random_case (struct casefile *cf,
914                              struct casereader *reader, size_t case_idx) 
915 {
916   struct ccase read_case, expected_case;
917   size_t value_cnt;
918   size_t i;
919   
920   value_cnt = casefile_get_value_cnt (cf);
921   get_random_case (&expected_case, value_cnt, case_idx);
922   if (!casereader_read (reader, &read_case)) 
923     fail_test ("Premature end of casefile.");
924   for (i = 0; i < value_cnt; i++) 
925     {
926       double a = case_num (&read_case, i);
927       double b = case_num (&expected_case, i);
928       if (a != b)
929         fail_test ("Case %lu fails comparison.", (unsigned long) case_idx); 
930     }
931   case_destroy (&read_case);
932   case_destroy (&expected_case);
933 }
934
935 static void
936 fail_test (const char *message, ...) 
937 {
938   va_list args;
939
940   va_start (args, message);
941   vprintf (message, args);
942   putchar ('\n');
943   va_end (args);
944   
945   exit (1);
946 }