706a6b93916dc727e5e5711bdf579a730e69a852
[pspp-builds.git] / src / casefile.c
1 /* PSPP - computes sample statistics.
2    Copyright (C) 2004 Free Software Foundation, Inc.
3    Written by Ben Pfaff <blp@gnu.org>.
4
5    This program is free software; you can redistribute it and/or
6    modify it under the terms of the GNU General Public License as
7    published by the Free Software Foundation; either version 2 of the
8    License, or (at your option) any later version.
9
10    This program is distributed in the hope that it will be useful, but
11    WITHOUT ANY WARRANTY; without even the implied warranty of
12    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13    General Public License for more details.
14
15    You should have received a copy of the GNU General Public License
16    along with this program; if not, write to the Free Software
17    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
18    02110-1301, USA. */
19
20 #include <config.h>
21 #include "casefile.h"
22 #include <assert.h>
23 #include <errno.h>
24 #include <fcntl.h>
25 #include <stdio.h>
26 #include <stdlib.h>
27 #include <string.h>
28 #include <unistd.h>
29 #include "alloc.h"
30 #include "case.h"
31 #include "error.h"
32 #include "misc.h"
33 #include "mkfile.h"
34 #include "settings.h"
35 #include "var.h"
36
37 #define IO_BUF_SIZE (8192 / sizeof (union value))
38
39 /* A casefile represents a sequentially accessible stream of
40    immutable cases.
41
42    If workspace allows, a casefile is maintained in memory.  If
43    workspace overflows, then the casefile is pushed to disk.  In
44    either case the interface presented to callers is kept the
45    same.
46
47    The life cycle of a casefile consists of up to three phases:
48
49        1. Writing.  The casefile initially contains no cases.  In
50           this phase, any number of cases may be appended to the
51           end of a casefile.  (Cases are never inserted in the
52           middle or before the beginning of a casefile.)
53
54           Use casefile_append() or casefile_append_xfer() to
55           append a case to a casefile.
56
57        2. Reading.  The casefile may be read sequentially,
58           starting from the beginning, by "casereaders".  Any
59           number of casereaders may be created, at any time,
60           during the reading phase.  Each casereader has an
61           independent position in the casefile.
62
63           Casereaders may only move forward.  They cannot move
64           backward to arbitrary records or seek randomly.
65           Cloning casereaders is possible, but it is not yet
66           implemented.
67
68           Use casefile_get_reader() to create a casereader for
69           use in phase 2.  This also transitions from phase 1 to
70           phase 2.  Calling casefile_mode_reader() makes the same
71           transition, without creating a casereader.
72
73           Use casereader_read(), casereader_read_xfer(), or
74           casereader_read_xfer_assert() to read a case from a
75           casereader.  Use casereader_destroy() to discard a
76           casereader when it is no longer needed.
77
78        3. Destruction.  This phase is optional.  The casefile is
79           also read with casereaders in this phase, but the
80           ability to create new casereaders is curtailed.
81
82           In this phase, casereaders could still be cloned (once
83           we eventually implement cloning).
84
85           To transition from phase 1 or 2 to phase 3 and create a
86           casereader, call casefile_get_destructive_reader().
87           The same functions apply to the casereader obtained
88           this way as apply to casereaders obtained in phase 2.
89           
90           After casefile_get_destructive_reader() is called, no
91           more casereaders may be created with
92           casefile_get_reader() or
93           casefile_get_destructive_reader().  (If cloning of
94           casereaders were implemented, it would still be
95           possible.)
96
97           The purpose of the limitations applied to casereaders
98           in phase 3 is to allow in-memory casefiles to fully
99           transfer ownership of cases to the casereaders,
100           avoiding the need for extra copies of case data.  For
101           relatively static data sets with many variables, I
102           suspect (without evidence) that this may be a big
103           performance boost.
104
105    When a casefile is no longer needed, it may be destroyed with
106    casefile_destroy().  This function will also destroy any
107    remaining casereaders. */
108
109 /* In-memory cases are arranged in an array of arrays.  The top
110    level is variable size and the size of each bottom level array
111    is fixed at the number of cases defined here.  */
112 #define CASES_PER_BLOCK 128             
113
114 /* A casefile. */
115 struct casefile 
116   {
117     /* Basic data. */
118     struct casefile *next, *prev;       /* Next, prev in global list. */
119     size_t value_cnt;                   /* Case size in `union value's. */
120     size_t case_acct_size;              /* Case size for accounting. */
121     unsigned long case_cnt;             /* Number of cases stored. */
122     enum { MEMORY, DISK } storage;      /* Where cases are stored. */
123     enum { WRITE, READ } mode;          /* Is writing or reading allowed? */
124     struct casereader *readers;         /* List of our readers. */
125     int being_destroyed;                /* Does a destructive reader exist? */
126
127     /* Memory storage. */
128     struct ccase **cases;               /* Pointer to array of cases. */
129
130     /* Disk storage. */
131     int fd;                             /* File descriptor, -1 if none. */
132     char *filename;                     /* Filename. */
133     union value *buffer;                /* I/O buffer, NULL if none. */
134     size_t buffer_used;                 /* Number of values used in buffer. */
135     size_t buffer_size;                 /* Buffer size in values. */
136   };
137
138 /* For reading out the cases in a casefile. */
139 struct casereader 
140   {
141     struct casereader *next, *prev;     /* Next, prev in casefile's list. */
142     struct casefile *cf;                /* Our casefile. */
143     unsigned long case_idx;             /* Case number of current case. */
144     int destructive;                    /* Is this a destructive reader? */
145
146     /* Disk storage. */
147     int fd;                             /* File descriptor. */
148     union value *buffer;                /* I/O buffer. */
149     size_t buffer_pos;                  /* Offset of buffer position. */
150     struct ccase c;                     /* Current case. */
151   };
152
153 /* Return the case number of the current case */
154 unsigned long
155 casereader_cnum(const struct casereader *r)
156 {
157   return r->case_idx;
158 }
159
160 /* Doubly linked list of all casefiles. */
161 static struct casefile *casefiles;
162
163 /* Number of bytes of case allocated in in-memory casefiles. */
164 static size_t case_bytes;
165
166 static void register_atexit (void);
167 static void exit_handler (void);
168
169 static void reader_open_file (struct casereader *reader);
170 static void write_case_to_disk (struct casefile *cf, const struct ccase *c);
171 static void flush_buffer (struct casefile *cf);
172 static void fill_buffer (struct casereader *reader);
173
174 static int safe_open (const char *filename, int flags);
175 static int safe_close (int fd);
176 static int full_read (int fd, void *buffer, size_t size);
177 static int full_write (int fd, const void *buffer, size_t size);
178
179 /* Creates and returns a casefile to store cases of VALUE_CNT
180    `union value's each. */
181 struct casefile *
182 casefile_create (size_t value_cnt) 
183 {
184   struct casefile *cf = xmalloc (sizeof *cf);
185   cf->next = casefiles;
186   cf->prev = NULL;
187   if (cf->next != NULL)
188     cf->next->prev = cf;
189   casefiles = cf;
190   cf->value_cnt = value_cnt;
191   cf->case_acct_size = (cf->value_cnt + 4) * sizeof *cf->buffer;
192   cf->case_cnt = 0;
193   cf->storage = MEMORY;
194   cf->mode = WRITE;
195   cf->readers = NULL;
196   cf->being_destroyed = 0;
197   cf->cases = NULL;
198   cf->fd = -1;
199   cf->filename = NULL;
200   cf->buffer = NULL;
201   cf->buffer_size = ROUND_UP (cf->value_cnt, IO_BUF_SIZE);
202   if (cf->value_cnt > 0 && cf->buffer_size % cf->value_cnt > 64)
203     cf->buffer_size = cf->value_cnt;
204   cf->buffer_used = 0;
205   register_atexit ();
206   return cf;
207 }
208
209 /* Destroys casefile CF. */
210 void
211 casefile_destroy (struct casefile *cf) 
212 {
213   if (cf != NULL) 
214     {
215       if (cf->next != NULL)
216         cf->next->prev = cf->prev;
217       if (cf->prev != NULL)
218         cf->prev->next = cf->next;
219       if (casefiles == cf)
220         casefiles = cf->next;
221
222       while (cf->readers != NULL) 
223         casereader_destroy (cf->readers);
224
225       if (cf->cases != NULL) 
226         {
227           size_t idx, block_cnt;
228
229           case_bytes -= cf->case_cnt * cf->case_acct_size;
230           for (idx = 0; idx < cf->case_cnt; idx++)
231             {
232               size_t block_idx = idx / CASES_PER_BLOCK;
233               size_t case_idx = idx % CASES_PER_BLOCK;
234               struct ccase *c = &cf->cases[block_idx][case_idx];
235               case_destroy (c);
236             }
237
238           block_cnt = DIV_RND_UP (cf->case_cnt, CASES_PER_BLOCK);
239           for (idx = 0; idx < block_cnt; idx++)
240             free (cf->cases[idx]);
241
242           free (cf->cases);
243         }
244
245       if (cf->fd != -1)
246         safe_close (cf->fd);
247           
248       if (cf->filename != NULL && remove (cf->filename) == -1) 
249         msg (ME, _("%s: Removing temporary file: %s."),
250              cf->filename, strerror (errno));
251       free (cf->filename);
252
253       free (cf->buffer);
254
255       free (cf);
256     }
257 }
258
259 /* Returns nonzero only if casefile CF is stored in memory (instead of on
260    disk). */
261 int
262 casefile_in_core (const struct casefile *cf) 
263 {
264   assert (cf != NULL);
265
266   return cf->storage == MEMORY;
267 }
268
269 /* Puts a casefile to "sleep", that is, minimizes the resources
270    needed for it by closing its file descriptor and freeing its
271    buffer.  This is useful if we need so many casefiles that we
272    might not have enough memory and file descriptors to go
273    around.
274
275    For simplicity, this implementation always converts the
276    casefile to reader mode.  If this turns out to be a problem,
277    with a little extra work we could also support sleeping
278    writers. */
279 void
280 casefile_sleep (const struct casefile *cf_) 
281 {
282   struct casefile *cf = (struct casefile *) cf_;
283   assert (cf != NULL);
284
285   casefile_mode_reader (cf);
286   casefile_to_disk (cf);
287   flush_buffer (cf);
288
289   if (cf->fd != -1) 
290     {
291       safe_close (cf->fd);
292       cf->fd = -1;
293     }
294   if (cf->buffer != NULL) 
295     {
296       free (cf->buffer);
297       cf->buffer = NULL;
298     }
299 }
300
301 /* Returns the number of `union value's in a case for CF. */
302 size_t
303 casefile_get_value_cnt (const struct casefile *cf) 
304 {
305   assert (cf != NULL);
306
307   return cf->value_cnt;
308 }
309
310 /* Returns the number of cases in casefile CF. */
311 unsigned long
312 casefile_get_case_cnt (const struct casefile *cf) 
313 {
314   assert (cf != NULL);
315
316   return cf->case_cnt;
317 }
318
319 /* Appends a copy of case C to casefile CF.  Not valid after any
320    reader for CF has been created. */
321 void
322 casefile_append (struct casefile *cf, const struct ccase *c) 
323 {
324   assert (cf != NULL);
325   assert (c != NULL);
326   assert (cf->mode == WRITE);
327
328   /* Try memory first. */
329   if (cf->storage == MEMORY) 
330     {
331       if (case_bytes < get_max_workspace ())
332         {
333           size_t block_idx = cf->case_cnt / CASES_PER_BLOCK;
334           size_t case_idx = cf->case_cnt % CASES_PER_BLOCK;
335           struct ccase new_case;
336
337           case_bytes += cf->case_acct_size;
338           case_clone (&new_case, c);
339           if (case_idx == 0) 
340             {
341               if ((block_idx & (block_idx - 1)) == 0) 
342                 {
343                   size_t block_cap = block_idx == 0 ? 1 : block_idx * 2;
344                   cf->cases = xrealloc (cf->cases,
345                                         sizeof *cf->cases * block_cap);
346                 }
347
348               cf->cases[block_idx] = xmalloc (sizeof **cf->cases
349                                               * CASES_PER_BLOCK);
350             }
351
352           case_move (&cf->cases[block_idx][case_idx], &new_case);
353         }
354       else
355         {
356           casefile_to_disk (cf);
357           assert (cf->storage == DISK);
358           write_case_to_disk (cf, c);
359         }
360     }
361   else
362     write_case_to_disk (cf, c);
363
364   cf->case_cnt++;
365 }
366
367 /* Appends case C to casefile CF, which takes over ownership of
368    C.  Not valid after any reader for CF has been created. */
369 void
370 casefile_append_xfer (struct casefile *cf, struct ccase *c) 
371 {
372   casefile_append (cf, c);
373   case_destroy (c);
374 }
375
376 /* Writes case C to casefile CF's disk buffer, first flushing the buffer to
377    disk if it would otherwise overflow. */
378 static void
379 write_case_to_disk (struct casefile *cf, const struct ccase *c) 
380 {
381   case_to_values (c, cf->buffer + cf->buffer_used, cf->value_cnt);
382   cf->buffer_used += cf->value_cnt;
383   if (cf->buffer_used + cf->value_cnt > cf->buffer_size)
384     flush_buffer (cf);
385 }
386
387 /* If any bytes in CF's output buffer are used, flush them to
388    disk. */
389 static void
390 flush_buffer (struct casefile *cf) 
391 {
392   if (cf->buffer_used > 0) 
393     {
394       if (!full_write (cf->fd, cf->buffer,
395                        cf->buffer_size * sizeof *cf->buffer)) 
396         msg (FE, _("Error writing temporary file: %s."), strerror (errno));
397
398       cf->buffer_used = 0;
399     } 
400 }
401
402
403 /* If CF is currently stored in memory, writes it to disk.  Readers, if any,
404    retain their current positions. */
405 void
406 casefile_to_disk (const struct casefile *cf_) 
407 {
408   struct casefile *cf = (struct casefile *) cf_;
409   struct casereader *reader;
410   
411   assert (cf != NULL);
412
413   if (cf->storage == MEMORY)
414     {
415       size_t idx, block_cnt;
416       
417       assert (cf->filename == NULL);
418       assert (cf->fd == -1);
419       assert (cf->buffer_used == 0);
420
421       cf->storage = DISK;
422       if (!make_temp_file (&cf->fd, &cf->filename))
423         err_failure ();
424       cf->buffer = xmalloc (cf->buffer_size * sizeof *cf->buffer);
425       memset (cf->buffer, 0, cf->buffer_size * sizeof *cf->buffer);
426
427       case_bytes -= cf->case_cnt * cf->case_acct_size;
428       for (idx = 0; idx < cf->case_cnt; idx++)
429         {
430           size_t block_idx = idx / CASES_PER_BLOCK;
431           size_t case_idx = idx % CASES_PER_BLOCK;
432           struct ccase *c = &cf->cases[block_idx][case_idx];
433           write_case_to_disk (cf, c);
434           case_destroy (c);
435         }
436
437       block_cnt = DIV_RND_UP (cf->case_cnt, CASES_PER_BLOCK);
438       for (idx = 0; idx < block_cnt; idx++)
439         free (cf->cases[idx]);
440
441       free (cf->cases);
442       cf->cases = NULL;
443
444       if (cf->mode == READ)
445         flush_buffer (cf);
446
447       for (reader = cf->readers; reader != NULL; reader = reader->next)
448         reader_open_file (reader);
449     }
450 }
451
452 /* Changes CF to reader mode, ensuring that no more cases may be
453    added.  Creating a casereader for CF has the same effect. */
454 void
455 casefile_mode_reader (struct casefile *cf) 
456 {
457   assert (cf != NULL);
458   cf->mode = READ;
459 }
460
461 /* Creates and returns a casereader for CF.  A casereader can be used to
462    sequentially read the cases in a casefile. */
463 struct casereader *
464 casefile_get_reader (const struct casefile *cf_) 
465 {
466   struct casefile *cf = (struct casefile *) cf_;
467   struct casereader *reader;
468
469   assert (cf != NULL);
470   assert (!cf->being_destroyed);
471
472   /* Flush the buffer to disk if it's not empty. */
473   if (cf->mode == WRITE && cf->storage == DISK)
474     flush_buffer (cf);
475   
476   cf->mode = READ;
477
478   reader = xmalloc (sizeof *reader);
479   reader->next = cf->readers;
480   if (cf->readers != NULL)
481     reader->next->prev = reader;
482   cf->readers = reader;
483   reader->prev = NULL;
484   reader->cf = cf;
485   reader->case_idx = 0;
486   reader->destructive = 0;
487   reader->fd = -1;
488   reader->buffer = NULL;
489   reader->buffer_pos = 0;
490   case_nullify (&reader->c);
491
492   if (reader->cf->storage == DISK) 
493     reader_open_file (reader);
494
495   return reader;
496 }
497
498 /* Creates and returns a destructive casereader for CF.  Like a
499    normal casereader, a destructive casereader sequentially reads
500    the cases in a casefile.  Unlike a normal casereader, a
501    destructive reader cannot operate concurrently with any other
502    reader.  (This restriction could be relaxed in a few ways, but
503    it is so far unnecessary for other code.) */
504 struct casereader *
505 casefile_get_destructive_reader (struct casefile *cf) 
506 {
507   struct casereader *reader;
508   
509   assert (cf->readers == NULL);
510   reader = casefile_get_reader (cf);
511   reader->destructive = 1;
512   cf->being_destroyed = 1;
513   return reader;
514 }
515
516 /* Opens a disk file for READER and seeks to the current position as indicated
517    by case_idx.  Normally the current position is the beginning of the file,
518    but casefile_to_disk may cause the file to be opened at a different
519    position. */
520 static void
521 reader_open_file (struct casereader *reader) 
522 {
523   struct casefile *cf = reader->cf;
524   off_t file_ofs;
525
526   if (reader->case_idx >= cf->case_cnt)
527     return;
528
529   if (cf->fd != -1) 
530     {
531       reader->fd = cf->fd;
532       cf->fd = -1;
533     }
534   else 
535     {
536       reader->fd = safe_open (cf->filename, O_RDONLY);
537       if (reader->fd < 0)
538         msg (FE, _("%s: Opening temporary file: %s."),
539              cf->filename, strerror (errno));
540     }
541
542   if (cf->buffer != NULL) 
543     {
544       reader->buffer = cf->buffer;
545       cf->buffer = NULL; 
546     }
547   else 
548     {
549       reader->buffer = xmalloc (cf->buffer_size * sizeof *cf->buffer);
550       memset (reader->buffer, 0, cf->buffer_size * sizeof *cf->buffer); 
551     }
552
553   if (cf->value_cnt != 0) 
554     {
555       size_t buffer_case_cnt = cf->buffer_size / cf->value_cnt;
556       file_ofs = ((off_t) reader->case_idx / buffer_case_cnt
557                   * cf->buffer_size * sizeof *cf->buffer);
558       reader->buffer_pos = (reader->case_idx % buffer_case_cnt
559                             * cf->value_cnt);
560     }
561   else 
562     file_ofs = 0;
563   if (lseek (reader->fd, file_ofs, SEEK_SET) != file_ofs)
564     msg (FE, _("%s: Seeking temporary file: %s."),
565          cf->filename, strerror (errno));
566
567   if (cf->case_cnt > 0 && cf->value_cnt > 0)
568     fill_buffer (reader);
569
570   case_create (&reader->c, cf->value_cnt);
571 }
572
573 /* Fills READER's buffer by reading a block from disk. */
574 static void
575 fill_buffer (struct casereader *reader)
576 {
577   int retval = full_read (reader->fd, reader->buffer,
578                           reader->cf->buffer_size * sizeof *reader->buffer);
579   if (retval < 0)
580     msg (FE, _("%s: Reading temporary file: %s."),
581          reader->cf->filename, strerror (errno));
582   else if (retval != reader->cf->buffer_size * sizeof *reader->buffer)
583     msg (FE, _("%s: Temporary file ended unexpectedly."),
584          reader->cf->filename); 
585 }
586
587 /* Returns the casefile that READER reads. */
588 const struct casefile *
589 casereader_get_casefile (const struct casereader *reader) 
590 {
591   assert (reader != NULL);
592   
593   return reader->cf;
594 }
595
596 /* Reads a copy of the next case from READER into C.
597    Caller is responsible for destroying C.
598    Returns true if successful, false at end of file. */
599 int
600 casereader_read (struct casereader *reader, struct ccase *c) 
601 {
602   assert (reader != NULL);
603   
604   if (reader->case_idx >= reader->cf->case_cnt) 
605     return 0;
606
607   if (reader->cf->storage == MEMORY) 
608     {
609       size_t block_idx = reader->case_idx / CASES_PER_BLOCK;
610       size_t case_idx = reader->case_idx % CASES_PER_BLOCK;
611
612       case_clone (c, &reader->cf->cases[block_idx][case_idx]);
613       reader->case_idx++;
614       return 1;
615     }
616   else 
617     {
618       if (reader->buffer_pos + reader->cf->value_cnt > reader->cf->buffer_size)
619         {
620           fill_buffer (reader);
621           reader->buffer_pos = 0;
622         }
623
624       case_from_values (&reader->c, reader->buffer + reader->buffer_pos,
625                         reader->cf->value_cnt);
626       reader->buffer_pos += reader->cf->value_cnt;
627       reader->case_idx++;
628
629       case_clone (c, &reader->c);
630       return 1;
631     }
632 }
633
634 /* Reads the next case from READER into C and transfers ownership
635    to the caller.  Caller is responsible for destroying C.
636    Returns true if successful, false at end of file. */
637 int
638 casereader_read_xfer (struct casereader *reader, struct ccase *c)
639 {
640   assert (reader != NULL);
641
642   if (reader->destructive == 0
643       || reader->case_idx >= reader->cf->case_cnt
644       || reader->cf->storage == DISK) 
645     return casereader_read (reader, c);
646   else 
647     {
648       size_t block_idx = reader->case_idx / CASES_PER_BLOCK;
649       size_t case_idx = reader->case_idx % CASES_PER_BLOCK;
650       struct ccase *read_case = &reader->cf->cases[block_idx][case_idx];
651
652       case_move (c, read_case);
653       reader->case_idx++;
654       return 1;
655     }
656 }
657
658 /* Reads the next case from READER into C and transfers ownership
659    to the caller.  Caller is responsible for destroying C.
660    Assert-fails at end of file. */
661 void
662 casereader_read_xfer_assert (struct casereader *reader, struct ccase *c) 
663 {
664   bool success = casereader_read_xfer (reader, c);
665   assert (success);
666 }
667
668 /* Destroys READER. */
669 void
670 casereader_destroy (struct casereader *reader)
671 {
672   assert (reader != NULL);
673
674   if (reader->next != NULL)
675     reader->next->prev = reader->prev;
676   if (reader->prev != NULL)
677     reader->prev->next = reader->next;
678   if (reader->cf->readers == reader)
679     reader->cf->readers = reader->next;
680
681   if (reader->cf->buffer == NULL)
682     reader->cf->buffer = reader->buffer;
683   else
684     free (reader->buffer);
685
686   if (reader->fd != -1) 
687     {
688       if (reader->cf->fd == -1)
689         reader->cf->fd = reader->fd;
690       else
691         safe_close (reader->fd);
692     }
693   
694   case_destroy (&reader->c);
695
696   free (reader);
697 }
698
699 /* Calls open(), passing FILENAME and FLAGS, repeating as necessary
700    to deal with interrupted calls. */
701 static int
702 safe_open (const char *filename, int flags) 
703 {
704   int fd;
705
706   do 
707     {
708       fd = open (filename, flags);
709     }
710   while (fd == -1 && errno == EINTR);
711
712   return fd;
713 }
714
715 /* Calls close(), passing FD, repeating as necessary to deal with
716    interrupted calls. */
717 static int safe_close (int fd) 
718 {
719   int retval;
720
721   do 
722     {
723       retval = close (fd);
724     }
725   while (retval == -1 && errno == EINTR);
726
727   return retval;
728 }
729
730 /* Calls read(), passing FD, BUFFER, and SIZE, repeating as
731    necessary to deal with interrupted calls. */
732 static int
733 full_read (int fd, void *buffer_, size_t size) 
734 {
735   char *buffer = buffer_;
736   size_t bytes_read = 0;
737   
738   while (bytes_read < size)
739     {
740       int retval = read (fd, buffer + bytes_read, size - bytes_read);
741       if (retval > 0) 
742         bytes_read += retval; 
743       else if (retval == 0) 
744         return bytes_read;
745       else if (errno != EINTR)
746         return -1;
747     }
748
749   return bytes_read;
750 }
751
752 /* Calls write(), passing FD, BUFFER, and SIZE, repeating as
753    necessary to deal with interrupted calls. */
754 static int
755 full_write (int fd, const void *buffer_, size_t size) 
756 {
757   const char *buffer = buffer_;
758   size_t bytes_written = 0;
759   
760   while (bytes_written < size)
761     {
762       int retval = write (fd, buffer + bytes_written, size - bytes_written);
763       if (retval >= 0) 
764         bytes_written += retval; 
765       else if (errno != EINTR)
766         return -1;
767     }
768
769   return bytes_written;
770 }
771
772
773 /* Registers our exit handler with atexit() if it has not already
774    been registered. */
775 static void
776 register_atexit (void) 
777 {
778   static int registered = 0;
779   if (!registered) 
780     {
781       registered = 1;
782       atexit (exit_handler);
783     }
784 }
785
786
787
788 /* atexit() handler that closes and deletes our temporary
789    files. */
790 static void
791 exit_handler (void) 
792 {
793   while (casefiles != NULL)
794     casefile_destroy (casefiles);
795 }
796 \f
797 #include <gsl/gsl_rng.h>
798 #include <stdarg.h>
799 #include "command.h"
800 #include "lexer.h"
801
802 static void test_casefile (int pattern, size_t value_cnt, size_t case_cnt);
803 static void get_random_case (struct ccase *, size_t value_cnt,
804                              size_t case_idx);
805 static void write_random_case (struct casefile *cf, size_t case_idx);
806 static void read_and_verify_random_case (struct casefile *cf,
807                                          struct casereader *reader,
808                                          size_t case_idx);
809 static void fail_test (const char *message, ...);
810
811 int
812 cmd_debug_casefile (void) 
813 {
814   static const size_t sizes[] =
815     {
816       1, 2, 3, 4, 5, 6, 7, 14, 15, 16, 17, 31, 55, 73,
817       100, 137, 257, 521, 1031, 2053
818     };
819   int size_max;
820   int case_max;
821   int pattern;
822
823   size_max = sizeof sizes / sizeof *sizes;
824   if (lex_match_id ("SMALL")) 
825     {
826       size_max -= 4;
827       case_max = 511; 
828     }
829   else
830     case_max = 4095;
831   if (token != '.')
832     return lex_end_of_command ();
833     
834   for (pattern = 0; pattern < 6; pattern++) 
835     {
836       const size_t *size;
837
838       for (size = sizes; size < sizes + size_max; size++) 
839         {
840           size_t case_cnt;
841
842           for (case_cnt = 0; case_cnt <= case_max;
843                case_cnt = (case_cnt * 2) + 1)
844             test_casefile (pattern, *size, case_cnt);
845         }
846     }
847   printf ("Casefile tests succeeded.\n");
848   return CMD_SUCCESS;
849 }
850
851 static void
852 test_casefile (int pattern, size_t value_cnt, size_t case_cnt) 
853 {
854   struct casefile *cf;
855   struct casereader *r1, *r2;
856   struct ccase c;
857   gsl_rng *rng;
858   size_t i, j;
859
860   rng = gsl_rng_alloc (gsl_rng_mt19937);
861   cf = casefile_create (value_cnt);
862   if (pattern == 5)
863     casefile_to_disk (cf);
864   for (i = 0; i < case_cnt; i++)
865     write_random_case (cf, i);
866   if (pattern == 5)
867     casefile_sleep (cf);
868   r1 = casefile_get_reader (cf);
869   r2 = casefile_get_reader (cf);
870   switch (pattern) 
871     {
872     case 0:
873     case 5:
874       for (i = 0; i < case_cnt; i++) 
875         {
876           read_and_verify_random_case (cf, r1, i);
877           read_and_verify_random_case (cf, r2, i);
878         } 
879       break;
880     case 1:
881       for (i = 0; i < case_cnt; i++)
882         read_and_verify_random_case (cf, r1, i);
883       for (i = 0; i < case_cnt; i++) 
884         read_and_verify_random_case (cf, r2, i);
885       break;
886     case 2:
887     case 3:
888     case 4:
889       for (i = j = 0; i < case_cnt; i++) 
890         {
891           read_and_verify_random_case (cf, r1, i);
892           if (gsl_rng_get (rng) % pattern == 0) 
893             read_and_verify_random_case (cf, r2, j++); 
894           if (i == case_cnt / 2)
895             casefile_to_disk (cf);
896         }
897       for (; j < case_cnt; j++) 
898         read_and_verify_random_case (cf, r2, j);
899       break;
900     }
901   if (casereader_read (r1, &c))
902     fail_test ("Casereader 1 not at end of file.");
903   if (casereader_read (r2, &c))
904     fail_test ("Casereader 2 not at end of file.");
905   if (pattern != 1)
906     casereader_destroy (r1);
907   if (pattern != 2)
908     casereader_destroy (r2);
909   if (pattern > 2) 
910     {
911       r1 = casefile_get_destructive_reader (cf);
912       for (i = 0; i < case_cnt; i++) 
913         {
914           struct ccase read_case, expected_case;
915           
916           get_random_case (&expected_case, value_cnt, i);
917           if (!casereader_read_xfer (r1, &read_case)) 
918             fail_test ("Premature end of casefile.");
919           for (j = 0; j < value_cnt; j++) 
920             {
921               double a = case_num (&read_case, j);
922               double b = case_num (&expected_case, j);
923               if (a != b)
924                 fail_test ("Case %lu fails comparison.", (unsigned long) i); 
925             }
926           case_destroy (&expected_case);
927           case_destroy (&read_case);
928         }
929       casereader_destroy (r1);
930     }
931   casefile_destroy (cf);
932   gsl_rng_free (rng);
933 }
934
935 static void
936 get_random_case (struct ccase *c, size_t value_cnt, size_t case_idx) 
937 {
938   int i;
939   case_create (c, value_cnt);
940   for (i = 0; i < value_cnt; i++)
941     case_data_rw (c, i)->f = case_idx % 257 + i;
942 }
943
944 static void
945 write_random_case (struct casefile *cf, size_t case_idx) 
946 {
947   struct ccase c;
948   get_random_case (&c, casefile_get_value_cnt (cf), case_idx);
949   casefile_append_xfer (cf, &c);
950 }
951
952 static void
953 read_and_verify_random_case (struct casefile *cf,
954                              struct casereader *reader, size_t case_idx) 
955 {
956   struct ccase read_case, expected_case;
957   size_t value_cnt;
958   size_t i;
959   
960   value_cnt = casefile_get_value_cnt (cf);
961   get_random_case (&expected_case, value_cnt, case_idx);
962   if (!casereader_read (reader, &read_case)) 
963     fail_test ("Premature end of casefile.");
964   for (i = 0; i < value_cnt; i++) 
965     {
966       double a = case_num (&read_case, i);
967       double b = case_num (&expected_case, i);
968       if (a != b)
969         fail_test ("Case %lu fails comparison.", (unsigned long) case_idx); 
970     }
971   case_destroy (&read_case);
972   case_destroy (&expected_case);
973 }
974
975 static void
976 fail_test (const char *message, ...) 
977 {
978   va_list args;
979
980   va_start (args, message);
981   vprintf (message, args);
982   putchar ('\n');
983   va_end (args);
984   
985   exit (1);
986 }