f7d171f766be4402fdd9a104b8ca827ae7b59023
[pspp-builds.git] / src / casefile.c
1 /* PSPP - computes sample statistics.
2    Copyright (C) 2004 Free Software Foundation, Inc.
3    Written by Ben Pfaff <blp@gnu.org>.
4
5    This program is free software; you can redistribute it and/or
6    modify it under the terms of the GNU General Public License as
7    published by the Free Software Foundation; either version 2 of the
8    License, or (at your option) any later version.
9
10    This program is distributed in the hope that it will be useful, but
11    WITHOUT ANY WARRANTY; without even the implied warranty of
12    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13    General Public License for more details.
14
15    You should have received a copy of the GNU General Public License
16    along with this program; if not, write to the Free Software
17    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
18    02110-1301, USA. */
19
20 #include <config.h>
21 #include "casefile.h"
22 #include <assert.h>
23 #include <errno.h>
24 #include <fcntl.h>
25 #include <stdio.h>
26 #include <stdlib.h>
27 #include <string.h>
28 #include <unistd.h>
29 #include "alloc.h"
30 #include "case.h"
31 #include "error.h"
32 #include "misc.h"
33 #include "mkfile.h"
34 #include "settings.h"
35 #include "var.h"
36
37 #define IO_BUF_SIZE (8192 / sizeof (union value))
38
39 /* A casefile represents a sequentially accessible stream of
40    immutable cases.
41
42    If workspace allows, a casefile is maintained in memory.  If
43    workspace overflows, then the casefile is pushed to disk.  In
44    either case the interface presented to callers is kept the
45    same.
46
47    The life cycle of a casefile consists of up to three phases:
48
49        1. Writing.  The casefile initially contains no cases.  In
50           this phase, any number of cases may be appended to the
51           end of a casefile.  (Cases are never inserted in the
52           middle or before the beginning of a casefile.)
53
54           Use casefile_append() or casefile_append_xfer() to
55           append a case to a casefile.
56
57        2. Reading.  The casefile may be read sequentially,
58           starting from the beginning, by "casereaders".  Any
59           number of casereaders may be created, at any time,
60           during the reading phase.  Each casereader has an
61           independent position in the casefile.
62
63           Casereaders may only move forward.  They cannot move
64           backward to arbitrary records or seek randomly.  (In the
65           future, the concept of a "casemark" will be introduced
66           to allow a limited form of backward seek, but this has
67           not yet been implemented.)
68
69           Cloning casereaders is possible, but no one has had a
70           need for it yet, so it is not implemented.
71
72           Use casefile_get_reader() to create a casereader for
73           use in phase 2.  This also transitions from phase 2 to
74           phase 3.  Calling casefile_mode_reader() makes the same
75           transition, without creating a casereader.
76
77           Use casereader_read(), casereader_read_xfer(), or
78           casereader_read_xfer_assert() to read a case from a
79           casereader.  Use casereader_destroy() to discard a
80           casereader when it is no longer needed.
81
82        3. Destruction.  This phase is optional.  The casefile is
83           also read with casereaders in this phase, but the
84           ability to create new casereaders is curtailed.
85
86           In this phase, casereaders could still be cloned, and
87           casemarks could still be used to seek backward (given
88           that we eventually implement these functionalities).
89
90           To transition from phase 1 or 2 to phase 3 and create a
91           casereader, call casefile_get_destructive_reader().
92           The same functions apply to the casereader obtained
93           this way as apply to casereaders obtained in phase 2.
94           
95           After casefile_get_destructive_reader() is called, no
96           more casereaders may be created with
97           casefile_get_reader() or
98           casefile_get_destructive_reader().  (If cloning of
99           casereaders or casemarks were implemented, they would
100           still be possible.)
101
102           The purpose of the limitations applied to casereaders
103           in phase 3 is to allow in-memory casefiles to fully
104           transfer ownership of cases to the casereaders,
105           avoiding the need for extra copies of case data.  For
106           relatively static data sets with many variables, I
107           suspect (without evidence) that this may be a big
108           performance boost.
109
110    When a casefile is no longer needed, it may be destroyed with
111    casefile_destroy().  This function will also destroy any
112    remaining casereaders. */
113
114 /* In-memory cases are arranged in an array of arrays.  The top
115    level is variable size and the size of each bottom level array
116    is fixed at the number of cases defined here.  */
117 #define CASES_PER_BLOCK 128             
118
119 /* A casefile. */
120 struct casefile 
121   {
122     /* Basic data. */
123     struct casefile *next, *prev;       /* Next, prev in global list. */
124     size_t value_cnt;                   /* Case size in `union value's. */
125     size_t case_acct_size;              /* Case size for accounting. */
126     unsigned long case_cnt;             /* Number of cases stored. */
127     enum { MEMORY, DISK } storage;      /* Where cases are stored. */
128     enum { WRITE, READ } mode;          /* Is writing or reading allowed? */
129     struct casereader *readers;         /* List of our readers. */
130     int being_destroyed;                /* Does a destructive reader exist? */
131
132     /* Memory storage. */
133     struct ccase **cases;               /* Pointer to array of cases. */
134
135     /* Disk storage. */
136     int fd;                             /* File descriptor, -1 if none. */
137     char *filename;                     /* Filename. */
138     union value *buffer;                /* I/O buffer, NULL if none. */
139     size_t buffer_used;                 /* Number of values used in buffer. */
140     size_t buffer_size;                 /* Buffer size in values. */
141   };
142
143 /* For reading out the cases in a casefile. */
144 struct casereader 
145   {
146     struct casereader *next, *prev;     /* Next, prev in casefile's list. */
147     struct casefile *cf;                /* Our casefile. */
148     unsigned long case_idx;             /* Case number of current case. */
149     int destructive;                    /* Is this a destructive reader? */
150
151     /* Disk storage. */
152     int fd;                             /* File descriptor. */
153     union value *buffer;                /* I/O buffer. */
154     size_t buffer_pos;                  /* Offset of buffer position. */
155     struct ccase c;                     /* Current case. */
156   };
157
158 /* Return the case number of the current case */
159 unsigned long
160 casereader_cnum(const struct casereader *r)
161 {
162   return r->case_idx;
163 }
164
165 /* Doubly linked list of all casefiles. */
166 static struct casefile *casefiles;
167
168 /* Number of bytes of case allocated in in-memory casefiles. */
169 static size_t case_bytes;
170
171 static void register_atexit (void);
172 static void exit_handler (void);
173
174 static void reader_open_file (struct casereader *reader);
175 static void write_case_to_disk (struct casefile *cf, const struct ccase *c);
176 static void flush_buffer (struct casefile *cf);
177 static void fill_buffer (struct casereader *reader);
178
179 static int safe_open (const char *filename, int flags);
180 static int safe_close (int fd);
181 static int full_read (int fd, void *buffer, size_t size);
182 static int full_write (int fd, const void *buffer, size_t size);
183
184 /* Creates and returns a casefile to store cases of VALUE_CNT
185    `union value's each. */
186 struct casefile *
187 casefile_create (size_t value_cnt) 
188 {
189   struct casefile *cf = xmalloc (sizeof *cf);
190   cf->next = casefiles;
191   cf->prev = NULL;
192   if (cf->next != NULL)
193     cf->next->prev = cf;
194   casefiles = cf;
195   cf->value_cnt = value_cnt;
196   cf->case_acct_size = (cf->value_cnt + 4) * sizeof *cf->buffer;
197   cf->case_cnt = 0;
198   cf->storage = MEMORY;
199   cf->mode = WRITE;
200   cf->readers = NULL;
201   cf->being_destroyed = 0;
202   cf->cases = NULL;
203   cf->fd = -1;
204   cf->filename = NULL;
205   cf->buffer = NULL;
206   cf->buffer_size = ROUND_UP (cf->value_cnt, IO_BUF_SIZE);
207   if (cf->value_cnt > 0 && cf->buffer_size % cf->value_cnt > 64)
208     cf->buffer_size = cf->value_cnt;
209   cf->buffer_used = 0;
210   register_atexit ();
211   return cf;
212 }
213
214 /* Destroys casefile CF. */
215 void
216 casefile_destroy (struct casefile *cf) 
217 {
218   if (cf != NULL) 
219     {
220       if (cf->next != NULL)
221         cf->next->prev = cf->prev;
222       if (cf->prev != NULL)
223         cf->prev->next = cf->next;
224       if (casefiles == cf)
225         casefiles = cf->next;
226
227       while (cf->readers != NULL) 
228         casereader_destroy (cf->readers);
229
230       if (cf->cases != NULL) 
231         {
232           size_t idx, block_cnt;
233
234           case_bytes -= cf->case_cnt * cf->case_acct_size;
235           for (idx = 0; idx < cf->case_cnt; idx++)
236             {
237               size_t block_idx = idx / CASES_PER_BLOCK;
238               size_t case_idx = idx % CASES_PER_BLOCK;
239               struct ccase *c = &cf->cases[block_idx][case_idx];
240               case_destroy (c);
241             }
242
243           block_cnt = DIV_RND_UP (cf->case_cnt, CASES_PER_BLOCK);
244           for (idx = 0; idx < block_cnt; idx++)
245             free (cf->cases[idx]);
246
247           free (cf->cases);
248         }
249
250       if (cf->fd != -1)
251         safe_close (cf->fd);
252           
253       if (cf->filename != NULL && remove (cf->filename) == -1) 
254         msg (ME, _("%s: Removing temporary file: %s."),
255              cf->filename, strerror (errno));
256       free (cf->filename);
257
258       free (cf->buffer);
259
260       free (cf);
261     }
262 }
263
264 /* Returns nonzero only if casefile CF is stored in memory (instead of on
265    disk). */
266 int
267 casefile_in_core (const struct casefile *cf) 
268 {
269   assert (cf != NULL);
270
271   return cf->storage == MEMORY;
272 }
273
274 /* Puts a casefile to "sleep", that is, minimizes the resources
275    needed for it by closing its file descriptor and freeing its
276    buffer.  This is useful if we need so many casefiles that we
277    might not have enough memory and file descriptors to go
278    around.
279
280    For simplicity, this implementation always converts the
281    casefile to reader mode.  If this turns out to be a problem,
282    with a little extra work we could also support sleeping
283    writers. */
284 void
285 casefile_sleep (const struct casefile *cf_) 
286 {
287   struct casefile *cf = (struct casefile *) cf_;
288   assert (cf != NULL);
289
290   casefile_mode_reader (cf);
291   casefile_to_disk (cf);
292   flush_buffer (cf);
293
294   if (cf->fd != -1) 
295     {
296       safe_close (cf->fd);
297       cf->fd = -1;
298     }
299   if (cf->buffer != NULL) 
300     {
301       free (cf->buffer);
302       cf->buffer = NULL;
303     }
304 }
305
306 /* Returns the number of `union value's in a case for CF. */
307 size_t
308 casefile_get_value_cnt (const struct casefile *cf) 
309 {
310   assert (cf != NULL);
311
312   return cf->value_cnt;
313 }
314
315 /* Returns the number of cases in casefile CF. */
316 unsigned long
317 casefile_get_case_cnt (const struct casefile *cf) 
318 {
319   assert (cf != NULL);
320
321   return cf->case_cnt;
322 }
323
324 /* Appends a copy of case C to casefile CF.  Not valid after any
325    reader for CF has been created. */
326 void
327 casefile_append (struct casefile *cf, const struct ccase *c) 
328 {
329   assert (cf != NULL);
330   assert (c != NULL);
331   assert (cf->mode == WRITE);
332
333   /* Try memory first. */
334   if (cf->storage == MEMORY) 
335     {
336       if (case_bytes < get_max_workspace ())
337         {
338           size_t block_idx = cf->case_cnt / CASES_PER_BLOCK;
339           size_t case_idx = cf->case_cnt % CASES_PER_BLOCK;
340           struct ccase new_case;
341
342           case_bytes += cf->case_acct_size;
343           case_clone (&new_case, c);
344           if (case_idx == 0) 
345             {
346               if ((block_idx & (block_idx - 1)) == 0) 
347                 {
348                   size_t block_cap = block_idx == 0 ? 1 : block_idx * 2;
349                   cf->cases = xrealloc (cf->cases,
350                                         sizeof *cf->cases * block_cap);
351                 }
352
353               cf->cases[block_idx] = xmalloc (sizeof **cf->cases
354                                               * CASES_PER_BLOCK);
355             }
356
357           case_move (&cf->cases[block_idx][case_idx], &new_case);
358         }
359       else
360         {
361           casefile_to_disk (cf);
362           assert (cf->storage == DISK);
363           write_case_to_disk (cf, c);
364         }
365     }
366   else
367     write_case_to_disk (cf, c);
368
369   cf->case_cnt++;
370 }
371
372 /* Appends case C to casefile CF, which takes over ownership of
373    C.  Not valid after any reader for CF has been created. */
374 void
375 casefile_append_xfer (struct casefile *cf, struct ccase *c) 
376 {
377   casefile_append (cf, c);
378   case_destroy (c);
379 }
380
381 /* Writes case C to casefile CF's disk buffer, first flushing the buffer to
382    disk if it would otherwise overflow. */
383 static void
384 write_case_to_disk (struct casefile *cf, const struct ccase *c) 
385 {
386   case_to_values (c, cf->buffer + cf->buffer_used, cf->value_cnt);
387   cf->buffer_used += cf->value_cnt;
388   if (cf->buffer_used + cf->value_cnt > cf->buffer_size)
389     flush_buffer (cf);
390 }
391
392 /* If any bytes in CF's output buffer are used, flush them to
393    disk. */
394 static void
395 flush_buffer (struct casefile *cf) 
396 {
397   if (cf->buffer_used > 0) 
398     {
399       if (!full_write (cf->fd, cf->buffer,
400                        cf->buffer_size * sizeof *cf->buffer)) 
401         msg (FE, _("Error writing temporary file: %s."), strerror (errno));
402
403       cf->buffer_used = 0;
404     } 
405 }
406
407
408 /* If CF is currently stored in memory, writes it to disk.  Readers, if any,
409    retain their current positions. */
410 void
411 casefile_to_disk (const struct casefile *cf_) 
412 {
413   struct casefile *cf = (struct casefile *) cf_;
414   struct casereader *reader;
415   
416   assert (cf != NULL);
417
418   if (cf->storage == MEMORY)
419     {
420       size_t idx, block_cnt;
421       
422       assert (cf->filename == NULL);
423       assert (cf->fd == -1);
424       assert (cf->buffer_used == 0);
425
426       cf->storage = DISK;
427       if (!make_temp_file (&cf->fd, &cf->filename))
428         err_failure ();
429       cf->buffer = xmalloc (cf->buffer_size * sizeof *cf->buffer);
430       memset (cf->buffer, 0, cf->buffer_size * sizeof *cf->buffer);
431
432       case_bytes -= cf->case_cnt * cf->case_acct_size;
433       for (idx = 0; idx < cf->case_cnt; idx++)
434         {
435           size_t block_idx = idx / CASES_PER_BLOCK;
436           size_t case_idx = idx % CASES_PER_BLOCK;
437           struct ccase *c = &cf->cases[block_idx][case_idx];
438           write_case_to_disk (cf, c);
439           case_destroy (c);
440         }
441
442       block_cnt = DIV_RND_UP (cf->case_cnt, CASES_PER_BLOCK);
443       for (idx = 0; idx < block_cnt; idx++)
444         free (cf->cases[idx]);
445
446       free (cf->cases);
447       cf->cases = NULL;
448
449       if (cf->mode == READ)
450         flush_buffer (cf);
451
452       for (reader = cf->readers; reader != NULL; reader = reader->next)
453         reader_open_file (reader);
454     }
455 }
456
457 /* Changes CF to reader mode, ensuring that no more cases may be
458    added.  Creating a casereader for CF has the same effect. */
459 void
460 casefile_mode_reader (struct casefile *cf) 
461 {
462   assert (cf != NULL);
463   cf->mode = READ;
464 }
465
466 /* Creates and returns a casereader for CF.  A casereader can be used to
467    sequentially read the cases in a casefile. */
468 struct casereader *
469 casefile_get_reader (const struct casefile *cf_) 
470 {
471   struct casefile *cf = (struct casefile *) cf_;
472   struct casereader *reader;
473
474   assert (cf != NULL);
475   assert (!cf->being_destroyed);
476
477   /* Flush the buffer to disk if it's not empty. */
478   if (cf->mode == WRITE && cf->storage == DISK)
479     flush_buffer (cf);
480   
481   cf->mode = READ;
482
483   reader = xmalloc (sizeof *reader);
484   reader->next = cf->readers;
485   if (cf->readers != NULL)
486     reader->next->prev = reader;
487   cf->readers = reader;
488   reader->prev = NULL;
489   reader->cf = cf;
490   reader->case_idx = 0;
491   reader->destructive = 0;
492   reader->fd = -1;
493   reader->buffer = NULL;
494   reader->buffer_pos = 0;
495   case_nullify (&reader->c);
496
497   if (reader->cf->storage == DISK) 
498     reader_open_file (reader);
499
500   return reader;
501 }
502
503 /* Creates and returns a destructive casereader for CF.  Like a
504    normal casereader, a destructive casereader sequentially reads
505    the cases in a casefile.  Unlike a normal casereader, a
506    destructive reader cannot operate concurrently with any other
507    reader.  (This restriction could be relaxed in a few ways, but
508    it is so far unnecessary for other code.) */
509 struct casereader *
510 casefile_get_destructive_reader (struct casefile *cf) 
511 {
512   struct casereader *reader;
513   
514   assert (cf->readers == NULL);
515   reader = casefile_get_reader (cf);
516   reader->destructive = 1;
517   cf->being_destroyed = 1;
518   return reader;
519 }
520
521 /* Opens a disk file for READER and seeks to the current position as indicated
522    by case_idx.  Normally the current position is the beginning of the file,
523    but casefile_to_disk may cause the file to be opened at a different
524    position. */
525 static void
526 reader_open_file (struct casereader *reader) 
527 {
528   struct casefile *cf = reader->cf;
529   off_t file_ofs;
530
531   if (reader->case_idx >= cf->case_cnt)
532     return;
533
534   if (cf->fd != -1) 
535     {
536       reader->fd = cf->fd;
537       cf->fd = -1;
538     }
539   else 
540     {
541       reader->fd = safe_open (cf->filename, O_RDONLY);
542       if (reader->fd < 0)
543         msg (FE, _("%s: Opening temporary file: %s."),
544              cf->filename, strerror (errno));
545     }
546
547   if (cf->buffer != NULL) 
548     {
549       reader->buffer = cf->buffer;
550       cf->buffer = NULL; 
551     }
552   else 
553     {
554       reader->buffer = xmalloc (cf->buffer_size * sizeof *cf->buffer);
555       memset (reader->buffer, 0, cf->buffer_size * sizeof *cf->buffer); 
556     }
557
558   if (cf->value_cnt != 0) 
559     {
560       size_t buffer_case_cnt = cf->buffer_size / cf->value_cnt;
561       file_ofs = ((off_t) reader->case_idx / buffer_case_cnt
562                   * cf->buffer_size * sizeof *cf->buffer);
563       reader->buffer_pos = (reader->case_idx % buffer_case_cnt
564                             * cf->value_cnt);
565     }
566   else 
567     file_ofs = 0;
568   if (lseek (reader->fd, file_ofs, SEEK_SET) != file_ofs)
569     msg (FE, _("%s: Seeking temporary file: %s."),
570          cf->filename, strerror (errno));
571
572   if (cf->case_cnt > 0 && cf->value_cnt > 0)
573     fill_buffer (reader);
574
575   case_create (&reader->c, cf->value_cnt);
576 }
577
578 /* Fills READER's buffer by reading a block from disk. */
579 static void
580 fill_buffer (struct casereader *reader)
581 {
582   int retval = full_read (reader->fd, reader->buffer,
583                           reader->cf->buffer_size * sizeof *reader->buffer);
584   if (retval < 0)
585     msg (FE, _("%s: Reading temporary file: %s."),
586          reader->cf->filename, strerror (errno));
587   else if (retval != reader->cf->buffer_size * sizeof *reader->buffer)
588     msg (FE, _("%s: Temporary file ended unexpectedly."),
589          reader->cf->filename); 
590 }
591
592 /* Returns the casefile that READER reads. */
593 const struct casefile *
594 casereader_get_casefile (const struct casereader *reader) 
595 {
596   assert (reader != NULL);
597   
598   return reader->cf;
599 }
600
601 /* Reads a copy of the next case from READER into C.
602    Caller is responsible for destroying C.
603    Returns true if successful, false at end of file. */
604 int
605 casereader_read (struct casereader *reader, struct ccase *c) 
606 {
607   assert (reader != NULL);
608   
609   if (reader->case_idx >= reader->cf->case_cnt) 
610     return 0;
611
612   if (reader->cf->storage == MEMORY) 
613     {
614       size_t block_idx = reader->case_idx / CASES_PER_BLOCK;
615       size_t case_idx = reader->case_idx % CASES_PER_BLOCK;
616
617       case_clone (c, &reader->cf->cases[block_idx][case_idx]);
618       reader->case_idx++;
619       return 1;
620     }
621   else 
622     {
623       if (reader->buffer_pos + reader->cf->value_cnt > reader->cf->buffer_size)
624         {
625           fill_buffer (reader);
626           reader->buffer_pos = 0;
627         }
628
629       case_from_values (&reader->c, reader->buffer + reader->buffer_pos,
630                         reader->cf->value_cnt);
631       reader->buffer_pos += reader->cf->value_cnt;
632       reader->case_idx++;
633
634       case_clone (c, &reader->c);
635       return 1;
636     }
637 }
638
639 /* Reads the next case from READER into C and transfers ownership
640    to the caller.  Caller is responsible for destroying C.
641    Returns true if successful, false at end of file. */
642 int
643 casereader_read_xfer (struct casereader *reader, struct ccase *c)
644 {
645   assert (reader != NULL);
646
647   if (reader->destructive == 0
648       || reader->case_idx >= reader->cf->case_cnt
649       || reader->cf->storage == DISK) 
650     return casereader_read (reader, c);
651   else 
652     {
653       size_t block_idx = reader->case_idx / CASES_PER_BLOCK;
654       size_t case_idx = reader->case_idx % CASES_PER_BLOCK;
655       struct ccase *read_case = &reader->cf->cases[block_idx][case_idx];
656
657       case_move (c, read_case);
658       reader->case_idx++;
659       return 1;
660     }
661 }
662
663 /* Reads the next case from READER into C and transfers ownership
664    to the caller.  Caller is responsible for destroying C.
665    Assert-fails at end of file. */
666 void
667 casereader_read_xfer_assert (struct casereader *reader, struct ccase *c) 
668 {
669   bool success = casereader_read_xfer (reader, c);
670   assert (success);
671 }
672
673 /* Destroys READER. */
674 void
675 casereader_destroy (struct casereader *reader)
676 {
677   assert (reader != NULL);
678
679   if (reader->next != NULL)
680     reader->next->prev = reader->prev;
681   if (reader->prev != NULL)
682     reader->prev->next = reader->next;
683   if (reader->cf->readers == reader)
684     reader->cf->readers = reader->next;
685
686   if (reader->cf->buffer == NULL)
687     reader->cf->buffer = reader->buffer;
688   else
689     free (reader->buffer);
690
691   if (reader->fd != -1) 
692     {
693       if (reader->cf->fd == -1)
694         reader->cf->fd = reader->fd;
695       else
696         safe_close (reader->fd);
697     }
698   
699   case_destroy (&reader->c);
700
701   free (reader);
702 }
703
704 /* Calls open(), passing FILENAME and FLAGS, repeating as necessary
705    to deal with interrupted calls. */
706 static int
707 safe_open (const char *filename, int flags) 
708 {
709   int fd;
710
711   do 
712     {
713       fd = open (filename, flags);
714     }
715   while (fd == -1 && errno == EINTR);
716
717   return fd;
718 }
719
720 /* Calls close(), passing FD, repeating as necessary to deal with
721    interrupted calls. */
722 static int safe_close (int fd) 
723 {
724   int retval;
725
726   do 
727     {
728       retval = close (fd);
729     }
730   while (retval == -1 && errno == EINTR);
731
732   return retval;
733 }
734
735 /* Calls read(), passing FD, BUFFER, and SIZE, repeating as
736    necessary to deal with interrupted calls. */
737 static int
738 full_read (int fd, void *buffer_, size_t size) 
739 {
740   char *buffer = buffer_;
741   size_t bytes_read = 0;
742   
743   while (bytes_read < size)
744     {
745       int retval = read (fd, buffer + bytes_read, size - bytes_read);
746       if (retval > 0) 
747         bytes_read += retval; 
748       else if (retval == 0) 
749         return bytes_read;
750       else if (errno != EINTR)
751         return -1;
752     }
753
754   return bytes_read;
755 }
756
757 /* Calls write(), passing FD, BUFFER, and SIZE, repeating as
758    necessary to deal with interrupted calls. */
759 static int
760 full_write (int fd, const void *buffer_, size_t size) 
761 {
762   const char *buffer = buffer_;
763   size_t bytes_written = 0;
764   
765   while (bytes_written < size)
766     {
767       int retval = write (fd, buffer + bytes_written, size - bytes_written);
768       if (retval >= 0) 
769         bytes_written += retval; 
770       else if (errno != EINTR)
771         return -1;
772     }
773
774   return bytes_written;
775 }
776
777
778 /* Registers our exit handler with atexit() if it has not already
779    been registered. */
780 static void
781 register_atexit (void) 
782 {
783   static int registered = 0;
784   if (!registered) 
785     {
786       registered = 1;
787       atexit (exit_handler);
788     }
789 }
790
791
792
793 /* atexit() handler that closes and deletes our temporary
794    files. */
795 static void
796 exit_handler (void) 
797 {
798   while (casefiles != NULL)
799     casefile_destroy (casefiles);
800 }
801 \f
802 #include <gsl/gsl_rng.h>
803 #include <stdarg.h>
804 #include "command.h"
805 #include "lexer.h"
806
807 static void test_casefile (int pattern, size_t value_cnt, size_t case_cnt);
808 static void get_random_case (struct ccase *, size_t value_cnt,
809                              size_t case_idx);
810 static void write_random_case (struct casefile *cf, size_t case_idx);
811 static void read_and_verify_random_case (struct casefile *cf,
812                                          struct casereader *reader,
813                                          size_t case_idx);
814 static void fail_test (const char *message, ...);
815
816 int
817 cmd_debug_casefile (void) 
818 {
819   static const size_t sizes[] =
820     {
821       1, 2, 3, 4, 5, 6, 7, 14, 15, 16, 17, 31, 55, 73,
822       100, 137, 257, 521, 1031, 2053
823     };
824   int size_max;
825   int case_max;
826   int pattern;
827
828   size_max = sizeof sizes / sizeof *sizes;
829   if (lex_match_id ("SMALL")) 
830     {
831       size_max -= 4;
832       case_max = 511; 
833     }
834   else
835     case_max = 4095;
836   if (token != '.')
837     return lex_end_of_command ();
838     
839   for (pattern = 0; pattern < 6; pattern++) 
840     {
841       const size_t *size;
842
843       for (size = sizes; size < sizes + size_max; size++) 
844         {
845           size_t case_cnt;
846
847           for (case_cnt = 0; case_cnt <= case_max;
848                case_cnt = (case_cnt * 2) + 1)
849             test_casefile (pattern, *size, case_cnt);
850         }
851     }
852   printf ("Casefile tests succeeded.\n");
853   return CMD_SUCCESS;
854 }
855
856 static void
857 test_casefile (int pattern, size_t value_cnt, size_t case_cnt) 
858 {
859   struct casefile *cf;
860   struct casereader *r1, *r2;
861   struct ccase c;
862   gsl_rng *rng;
863   size_t i, j;
864
865   rng = gsl_rng_alloc (gsl_rng_mt19937);
866   cf = casefile_create (value_cnt);
867   if (pattern == 5)
868     casefile_to_disk (cf);
869   for (i = 0; i < case_cnt; i++)
870     write_random_case (cf, i);
871   if (pattern == 5)
872     casefile_sleep (cf);
873   r1 = casefile_get_reader (cf);
874   r2 = casefile_get_reader (cf);
875   switch (pattern) 
876     {
877     case 0:
878     case 5:
879       for (i = 0; i < case_cnt; i++) 
880         {
881           read_and_verify_random_case (cf, r1, i);
882           read_and_verify_random_case (cf, r2, i);
883         } 
884       break;
885     case 1:
886       for (i = 0; i < case_cnt; i++)
887         read_and_verify_random_case (cf, r1, i);
888       for (i = 0; i < case_cnt; i++) 
889         read_and_verify_random_case (cf, r2, i);
890       break;
891     case 2:
892     case 3:
893     case 4:
894       for (i = j = 0; i < case_cnt; i++) 
895         {
896           read_and_verify_random_case (cf, r1, i);
897           if (gsl_rng_get (rng) % pattern == 0) 
898             read_and_verify_random_case (cf, r2, j++); 
899           if (i == case_cnt / 2)
900             casefile_to_disk (cf);
901         }
902       for (; j < case_cnt; j++) 
903         read_and_verify_random_case (cf, r2, j);
904       break;
905     }
906   if (casereader_read (r1, &c))
907     fail_test ("Casereader 1 not at end of file.");
908   if (casereader_read (r2, &c))
909     fail_test ("Casereader 2 not at end of file.");
910   if (pattern != 1)
911     casereader_destroy (r1);
912   if (pattern != 2)
913     casereader_destroy (r2);
914   if (pattern > 2) 
915     {
916       r1 = casefile_get_destructive_reader (cf);
917       for (i = 0; i < case_cnt; i++) 
918         {
919           struct ccase read_case, expected_case;
920           
921           get_random_case (&expected_case, value_cnt, i);
922           if (!casereader_read_xfer (r1, &read_case)) 
923             fail_test ("Premature end of casefile.");
924           for (j = 0; j < value_cnt; j++) 
925             {
926               double a = case_num (&read_case, j);
927               double b = case_num (&expected_case, j);
928               if (a != b)
929                 fail_test ("Case %lu fails comparison.", (unsigned long) i); 
930             }
931           case_destroy (&expected_case);
932           case_destroy (&read_case);
933         }
934       casereader_destroy (r1);
935     }
936   casefile_destroy (cf);
937   gsl_rng_free (rng);
938 }
939
940 static void
941 get_random_case (struct ccase *c, size_t value_cnt, size_t case_idx) 
942 {
943   int i;
944   case_create (c, value_cnt);
945   for (i = 0; i < value_cnt; i++)
946     case_data_rw (c, i)->f = case_idx % 257 + i;
947 }
948
949 static void
950 write_random_case (struct casefile *cf, size_t case_idx) 
951 {
952   struct ccase c;
953   get_random_case (&c, casefile_get_value_cnt (cf), case_idx);
954   casefile_append_xfer (cf, &c);
955 }
956
957 static void
958 read_and_verify_random_case (struct casefile *cf,
959                              struct casereader *reader, size_t case_idx) 
960 {
961   struct ccase read_case, expected_case;
962   size_t value_cnt;
963   size_t i;
964   
965   value_cnt = casefile_get_value_cnt (cf);
966   get_random_case (&expected_case, value_cnt, case_idx);
967   if (!casereader_read (reader, &read_case)) 
968     fail_test ("Premature end of casefile.");
969   for (i = 0; i < value_cnt; i++) 
970     {
971       double a = case_num (&read_case, i);
972       double b = case_num (&expected_case, i);
973       if (a != b)
974         fail_test ("Case %lu fails comparison.", (unsigned long) case_idx); 
975     }
976   case_destroy (&read_case);
977   case_destroy (&expected_case);
978 }
979
980 static void
981 fail_test (const char *message, ...) 
982 {
983   va_list args;
984
985   va_start (args, message);
986   vprintf (message, args);
987   putchar ('\n');
988   va_end (args);
989   
990   exit (1);
991 }