Update comment to reflect availability of random casereaders.
[pspp-builds.git] / src / data / casefile.c
1 /* PSPP - computes sample statistics.
2    Copyright (C) 2004 Free Software Foundation, Inc.
3    Written by Ben Pfaff <blp@gnu.org>.
4
5    This program is free software; you can redistribute it and/or
6    modify it under the terms of the GNU General Public License as
7    published by the Free Software Foundation; either version 2 of the
8    License, or (at your option) any later version.
9
10    This program is distributed in the hope that it will be useful, but
11    WITHOUT ANY WARRANTY; without even the implied warranty of
12    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13    General Public License for more details.
14
15    You should have received a copy of the GNU General Public License
16    along with this program; if not, write to the Free Software
17    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
18    02110-1301, USA. */
19
20 #include <config.h>
21 #include "casefile.h"
22 #include <assert.h>
23 #include <errno.h>
24 #include <fcntl.h>
25 #include <stdio.h>
26 #include <stdlib.h>
27 #include <string.h>
28 #include <unistd.h>
29 #include <libpspp/alloc.h>
30 #include "case.h"
31 #include <libpspp/compiler.h>
32 #include <libpspp/message.h>
33 #include "full-read.h"
34 #include "full-write.h"
35 #include <libpspp/misc.h>
36 #include "make-file.h"
37 #include "settings.h"
38 #include "variable.h"
39
40 #include "gettext.h"
41 #define _(msgid) gettext (msgid)
42
43 #define IO_BUF_SIZE (8192 / sizeof (union value))
44
45 /* A casefile represents a sequentially accessible stream of
46    immutable cases.
47
48    If workspace allows, a casefile is maintained in memory.  If
49    workspace overflows, then the casefile is pushed to disk.  In
50    either case the interface presented to callers is kept the
51    same.
52
53    The life cycle of a casefile consists of up to three phases:
54
55        1. Writing.  The casefile initially contains no cases.  In
56           this phase, any number of cases may be appended to the
57           end of a casefile.  (Cases are never inserted in the
58           middle or before the beginning of a casefile.)
59
60           Use casefile_append() or casefile_append_xfer() to
61           append a case to a casefile.
62
63        2. Reading.  The casefile may be read sequentially,
64           starting from the beginning, by "casereaders".  Any
65           number of casereaders may be created, at any time,
66           during the reading phase.  Each casereader has an
67           independent position in the casefile.
68
69           Ordinary casereaders may only move forward.  They
70           cannot move backward to arbitrary records or seek
71           randomly.  Cloning casereaders is possible, but it is
72           not yet implemented.
73
74           Use casefile_get_reader() to create a casereader for
75           use in phase 2.  This also transitions from phase 1 to
76           phase 2.  Calling casefile_mode_reader() makes the same
77           transition, without creating a casereader.
78
79           Use casereader_read() or casereader_read_xfer() to read
80           a case from a casereader.  Use casereader_destroy() to
81           discard a casereader when it is no longer needed.
82
83           "Random" casereaders, which support a seek operation,
84           may also be created.  These should not, generally, be
85           used for statistical procedures, because random access
86           is much slower than sequential access.  They are
87           intended for use by the GUI.
88
89        3. Destruction.  This phase is optional.  The casefile is
90           also read with casereaders in this phase, but the
91           ability to create new casereaders is curtailed.
92
93           In this phase, casereaders could still be cloned (once
94           we eventually implement cloning).
95
96           To transition from phase 1 or 2 to phase 3 and create a
97           casereader, call casefile_get_destructive_reader().
98           The same functions apply to the casereader obtained
99           this way as apply to casereaders obtained in phase 2.
100           
101           After casefile_get_destructive_reader() is called, no
102           more casereaders may be created with
103           casefile_get_reader() or
104           casefile_get_destructive_reader().  (If cloning of
105           casereaders were implemented, it would still be
106           possible.)
107
108           The purpose of the limitations applied to casereaders
109           in phase 3 is to allow in-memory casefiles to fully
110           transfer ownership of cases to the casereaders,
111           avoiding the need for extra copies of case data.  For
112           relatively static data sets with many variables, I
113           suspect (without evidence) that this may be a big
114           performance boost.
115
116    When a casefile is no longer needed, it may be destroyed with
117    casefile_destroy().  This function will also destroy any
118    remaining casereaders. */
119
120 /* FIXME: should we implement compression? */
121
122 /* In-memory cases are arranged in an array of arrays.  The top
123    level is variable size and the size of each bottom level array
124    is fixed at the number of cases defined here.  */
125 #define CASES_PER_BLOCK 128             
126
127 /* A casefile. */
128 struct casefile 
129   {
130     /* Basic data. */
131     struct casefile *next, *prev;       /* Next, prev in global list. */
132     size_t value_cnt;                   /* Case size in `union value's. */
133     size_t case_acct_size;              /* Case size for accounting. */
134     unsigned long case_cnt;             /* Number of cases stored. */
135     enum { MEMORY, DISK } storage;      /* Where cases are stored. */
136     enum { WRITE, READ } mode;          /* Is writing or reading allowed? */
137     struct casereader *readers;         /* List of our readers. */
138     bool being_destroyed;               /* Does a destructive reader exist? */
139     bool ok;                            /* False after I/O error. */
140
141     /* Memory storage. */
142     struct ccase **cases;               /* Pointer to array of cases. */
143
144     /* Disk storage. */
145     int fd;                             /* File descriptor, -1 if none. */
146     char *file_name;                    /* File name. */
147     union value *buffer;                /* I/O buffer, NULL if none. */
148     size_t buffer_used;                 /* Number of values used in buffer. */
149     size_t buffer_size;                 /* Buffer size in values. */
150   };
151
152 /* For reading out the cases in a casefile. */
153 struct casereader 
154   {
155     struct casereader *next, *prev;     /* Next, prev in casefile's list. */
156     struct casefile *cf;                /* Our casefile. */
157     unsigned long case_idx;             /* Case number of current case. */
158     bool destructive;                   /* Is this a destructive reader? */
159     bool random;                        /* Is this a random reader? */
160
161     /* Disk storage. */
162     int fd;                             /* File descriptor. */
163     off_t file_ofs;                     /* Current position in fd. */
164     off_t buffer_ofs;                   /* File offset of buffer start. */
165     union value *buffer;                /* I/O buffer. */
166     size_t buffer_pos;                  /* Offset of buffer position. */
167     struct ccase c;                     /* Current case. */
168   };
169
170 /* Return the case number of the current case */
171 unsigned long
172 casereader_cnum(const struct casereader *r)
173 {
174   return r->case_idx;
175 }
176
177 /* Doubly linked list of all casefiles. */
178 static struct casefile *casefiles;
179
180 /* Number of bytes of case allocated in in-memory casefiles. */
181 static size_t case_bytes;
182
183 static void register_atexit (void);
184 static void exit_handler (void);
185
186 static void reader_open_file (struct casereader *);
187 static void write_case_to_disk (struct casefile *, const struct ccase *);
188 static void flush_buffer (struct casefile *);
189 static void seek_and_fill_buffer (struct casereader *);
190 static bool fill_buffer (struct casereader *);
191
192 static void io_error (struct casefile *, const char *, ...)
193      PRINTF_FORMAT (2, 3);
194 static int safe_open (const char *file_name, int flags);
195 static int safe_close (int fd);
196
197 /* Creates and returns a casefile to store cases of VALUE_CNT
198    `union value's each. */
199 struct casefile *
200 casefile_create (size_t value_cnt) 
201 {
202   struct casefile *cf = xmalloc (sizeof *cf);
203   cf->next = casefiles;
204   cf->prev = NULL;
205   if (cf->next != NULL)
206     cf->next->prev = cf;
207   casefiles = cf;
208   cf->value_cnt = value_cnt;
209   cf->case_acct_size = (cf->value_cnt + 4) * sizeof *cf->buffer;
210   cf->case_cnt = 0;
211   cf->storage = MEMORY;
212   cf->mode = WRITE;
213   cf->readers = NULL;
214   cf->being_destroyed = 0;
215   cf->ok = true;
216   cf->cases = NULL;
217   cf->fd = -1;
218   cf->file_name = NULL;
219   cf->buffer = NULL;
220   cf->buffer_size = ROUND_UP (cf->value_cnt, IO_BUF_SIZE);
221   if (cf->value_cnt > 0 && cf->buffer_size % cf->value_cnt > 64)
222     cf->buffer_size = cf->value_cnt;
223   cf->buffer_used = 0;
224   register_atexit ();
225   return cf;
226 }
227
228 /* Destroys casefile CF. */
229 void
230 casefile_destroy (struct casefile *cf) 
231 {
232   if (cf != NULL) 
233     {
234       if (cf->next != NULL)
235         cf->next->prev = cf->prev;
236       if (cf->prev != NULL)
237         cf->prev->next = cf->next;
238       if (casefiles == cf)
239         casefiles = cf->next;
240
241       while (cf->readers != NULL) 
242         casereader_destroy (cf->readers);
243
244       if (cf->cases != NULL) 
245         {
246           size_t idx, block_cnt;
247
248           case_bytes -= cf->case_cnt * cf->case_acct_size;
249           for (idx = 0; idx < cf->case_cnt; idx++)
250             {
251               size_t block_idx = idx / CASES_PER_BLOCK;
252               size_t case_idx = idx % CASES_PER_BLOCK;
253               struct ccase *c = &cf->cases[block_idx][case_idx];
254               case_destroy (c);
255             }
256
257           block_cnt = DIV_RND_UP (cf->case_cnt, CASES_PER_BLOCK);
258           for (idx = 0; idx < block_cnt; idx++)
259             free (cf->cases[idx]);
260
261           free (cf->cases);
262         }
263
264       if (cf->fd != -1)
265         safe_close (cf->fd);
266           
267       if (cf->file_name != NULL && remove (cf->file_name) == -1) 
268         io_error (cf, _("%s: Removing temporary file: %s."),
269                   cf->file_name, strerror (errno));
270       free (cf->file_name);
271
272       free (cf->buffer);
273
274       free (cf);
275     }
276 }
277
278 /* Returns true if an I/O error has occurred in casefile CF. */
279 bool
280 casefile_error (const struct casefile *cf) 
281 {
282   return !cf->ok;
283 }
284
285 /* Returns true only if casefile CF is stored in memory (instead of on
286    disk), false otherwise. */
287 bool
288 casefile_in_core (const struct casefile *cf) 
289 {
290   assert (cf != NULL);
291
292   return cf->storage == MEMORY;
293 }
294
295 /* Puts a casefile to "sleep", that is, minimizes the resources
296    needed for it by closing its file descriptor and freeing its
297    buffer.  This is useful if we need so many casefiles that we
298    might not have enough memory and file descriptors to go
299    around.
300
301    For simplicity, this implementation always converts the
302    casefile to reader mode.  If this turns out to be a problem,
303    with a little extra work we could also support sleeping
304    writers.
305
306    Returns true if successful, false if an I/O error occurred. */
307 bool
308 casefile_sleep (const struct casefile *cf_) 
309 {
310   struct casefile *cf = (struct casefile *) cf_;
311   assert (cf != NULL);
312
313   casefile_mode_reader (cf);
314   casefile_to_disk (cf);
315   flush_buffer (cf);
316
317   if (cf->fd != -1) 
318     {
319       safe_close (cf->fd);
320       cf->fd = -1;
321     }
322   if (cf->buffer != NULL) 
323     {
324       free (cf->buffer);
325       cf->buffer = NULL;
326     }
327
328   return cf->ok;
329 }
330
331 /* Returns the number of `union value's in a case for CF. */
332 size_t
333 casefile_get_value_cnt (const struct casefile *cf) 
334 {
335   assert (cf != NULL);
336
337   return cf->value_cnt;
338 }
339
340 /* Returns the number of cases in casefile CF. */
341 unsigned long
342 casefile_get_case_cnt (const struct casefile *cf) 
343 {
344   assert (cf != NULL);
345
346   return cf->case_cnt;
347 }
348
349 /* Appends a copy of case C to casefile CF.  Not valid after any
350    reader for CF has been created.
351    Returns true if successful, false if an I/O error occurred. */
352 bool
353 casefile_append (struct casefile *cf, const struct ccase *c) 
354 {
355   assert (cf != NULL);
356   assert (c != NULL);
357   assert (cf->mode == WRITE);
358
359   /* Try memory first. */
360   if (cf->storage == MEMORY) 
361     {
362       if (case_bytes < get_workspace ())
363         {
364           size_t block_idx = cf->case_cnt / CASES_PER_BLOCK;
365           size_t case_idx = cf->case_cnt % CASES_PER_BLOCK;
366           struct ccase new_case;
367
368           case_bytes += cf->case_acct_size;
369           case_clone (&new_case, c);
370           if (case_idx == 0) 
371             {
372               if ((block_idx & (block_idx - 1)) == 0) 
373                 {
374                   size_t block_cap = block_idx == 0 ? 1 : block_idx * 2;
375                   cf->cases = xnrealloc (cf->cases,
376                                          block_cap, sizeof *cf->cases);
377                 }
378
379               cf->cases[block_idx] = xnmalloc (CASES_PER_BLOCK,
380                                                sizeof **cf->cases);
381             }
382
383           case_move (&cf->cases[block_idx][case_idx], &new_case);
384         }
385       else
386         {
387           casefile_to_disk (cf);
388           assert (cf->storage == DISK);
389           write_case_to_disk (cf, c);
390         }
391     }
392   else
393     write_case_to_disk (cf, c);
394
395   cf->case_cnt++;
396   return cf->ok;
397 }
398
399 /* Appends case C to casefile CF, which takes over ownership of
400    C.  Not valid after any reader for CF has been created.
401    Returns true if successful, false if an I/O error occurred. */
402 bool
403 casefile_append_xfer (struct casefile *cf, struct ccase *c) 
404 {
405   casefile_append (cf, c);
406   case_destroy (c);
407   return cf->ok;
408 }
409
410 /* Writes case C to casefile CF's disk buffer, first flushing the buffer to
411    disk if it would otherwise overflow.
412    Returns true if successful, false if an I/O error occurred. */
413 static void
414 write_case_to_disk (struct casefile *cf, const struct ccase *c) 
415 {
416   if (!cf->ok)
417     return;
418   
419   case_to_values (c, cf->buffer + cf->buffer_used, cf->value_cnt);
420   cf->buffer_used += cf->value_cnt;
421   if (cf->buffer_used + cf->value_cnt > cf->buffer_size)
422     flush_buffer (cf);
423 }
424
425 /* If any bytes in CF's output buffer are used, flush them to
426    disk. */
427 static void
428 flush_buffer (struct casefile *cf) 
429 {
430   if (cf->ok && cf->buffer_used > 0) 
431     {
432       if (!full_write (cf->fd, cf->buffer,
433                        cf->buffer_size * sizeof *cf->buffer))
434         io_error (cf, _("Error writing temporary file: %s."),
435                   strerror (errno));
436       cf->buffer_used = 0;
437     }
438 }
439
440 /* If CF is currently stored in memory, writes it to disk.  Readers, if any,
441    retain their current positions.
442    Returns true if successful, false if an I/O error occurred. */
443 bool
444 casefile_to_disk (const struct casefile *cf_) 
445 {
446   struct casefile *cf = (struct casefile *) cf_;
447   struct casereader *reader;
448   
449   assert (cf != NULL);
450
451   if (cf->storage == MEMORY)
452     {
453       size_t idx, block_cnt;
454       
455       assert (cf->file_name == NULL);
456       assert (cf->fd == -1);
457       assert (cf->buffer_used == 0);
458
459       if (!make_temp_file (&cf->fd, &cf->file_name))
460         {
461           cf->ok = false;
462           return false;
463         }
464       cf->storage = DISK;
465
466       cf->buffer = xnmalloc (cf->buffer_size, sizeof *cf->buffer);
467       memset (cf->buffer, 0, cf->buffer_size * sizeof *cf->buffer);
468
469       case_bytes -= cf->case_cnt * cf->case_acct_size;
470       for (idx = 0; idx < cf->case_cnt; idx++)
471         {
472           size_t block_idx = idx / CASES_PER_BLOCK;
473           size_t case_idx = idx % CASES_PER_BLOCK;
474           struct ccase *c = &cf->cases[block_idx][case_idx];
475           write_case_to_disk (cf, c);
476           case_destroy (c);
477         }
478
479       block_cnt = DIV_RND_UP (cf->case_cnt, CASES_PER_BLOCK);
480       for (idx = 0; idx < block_cnt; idx++)
481         free (cf->cases[idx]);
482
483       free (cf->cases);
484       cf->cases = NULL;
485
486       if (cf->mode == READ)
487         flush_buffer (cf);
488
489       for (reader = cf->readers; reader != NULL; reader = reader->next)
490         reader_open_file (reader);
491     }
492   return cf->ok;
493 }
494
495 /* Changes CF to reader mode, ensuring that no more cases may be
496    added.  Creating a casereader for CF has the same effect. */
497 void
498 casefile_mode_reader (struct casefile *cf) 
499 {
500   assert (cf != NULL);
501   cf->mode = READ;
502 }
503
504 /* Creates and returns a casereader for CF.  A casereader can be used to
505    sequentially read the cases in a casefile. */
506 struct casereader *
507 casefile_get_reader (const struct casefile *cf_) 
508 {
509   struct casefile *cf = (struct casefile *) cf_;
510   struct casereader *reader;
511
512   assert (cf != NULL);
513   assert (!cf->being_destroyed);
514
515   /* Flush the buffer to disk if it's not empty. */
516   if (cf->mode == WRITE && cf->storage == DISK)
517     flush_buffer (cf);
518   
519   cf->mode = READ;
520
521   reader = xmalloc (sizeof *reader);
522   reader->next = cf->readers;
523   if (cf->readers != NULL)
524     reader->next->prev = reader;
525   cf->readers = reader;
526   reader->prev = NULL;
527   reader->cf = cf;
528   reader->case_idx = 0;
529   reader->destructive = 0;
530   reader->random = false;
531   reader->fd = -1;
532   reader->buffer = NULL;
533   reader->buffer_pos = 0;
534   case_nullify (&reader->c);
535
536   if (reader->cf->storage == DISK) 
537     reader_open_file (reader);
538
539   return reader;
540 }
541
542 /* Creates and returns a random casereader for CF.  A random
543    casereader can be used to randomly read the cases in a
544    casefile. */
545 struct casereader *
546 casefile_get_random_reader (const struct casefile *cf) 
547 {
548   struct casereader *reader = casefile_get_reader (cf);
549   reader->random = true;
550   return reader;
551 }
552
553 /* Creates and returns a destructive casereader for CF.  Like a
554    normal casereader, a destructive casereader sequentially reads
555    the cases in a casefile.  Unlike a normal casereader, a
556    destructive reader cannot operate concurrently with any other
557    reader.  (This restriction could be relaxed in a few ways, but
558    it is so far unnecessary for other code.) */
559 struct casereader *
560 casefile_get_destructive_reader (struct casefile *cf) 
561 {
562   struct casereader *reader;
563   
564   assert (cf->readers == NULL);
565   reader = casefile_get_reader (cf);
566   reader->destructive = 1;
567   cf->being_destroyed = 1;
568   return reader;
569 }
570
571 /* Opens a disk file for READER and seeks to the current position as indicated
572    by case_idx.  Normally the current position is the beginning of the file,
573    but casefile_to_disk may cause the file to be opened at a different
574    position. */
575 static void
576 reader_open_file (struct casereader *reader) 
577 {
578   struct casefile *cf = reader->cf;
579   if (!cf->ok || reader->case_idx >= cf->case_cnt)
580     return;
581
582   if (cf->fd != -1) 
583     {
584       reader->fd = cf->fd;
585       cf->fd = -1;
586     }
587   else 
588     {
589       reader->fd = safe_open (cf->file_name, O_RDONLY);
590       if (reader->fd < 0)
591         io_error (cf, _("%s: Opening temporary file: %s."),
592                   cf->file_name, strerror (errno));
593     }
594
595   if (cf->buffer != NULL) 
596     {
597       reader->buffer = cf->buffer;
598       cf->buffer = NULL; 
599     }
600   else 
601     {
602       reader->buffer = xnmalloc (cf->buffer_size, sizeof *cf->buffer);
603       memset (reader->buffer, 0, cf->buffer_size * sizeof *cf->buffer); 
604     }
605
606   case_create (&reader->c, cf->value_cnt);
607
608   reader->buffer_ofs = -1;
609   reader->file_ofs = -1;
610   seek_and_fill_buffer (reader);
611 }
612
613 /* Seeks the backing file for READER to the proper position and
614    refreshes the buffer contents. */
615 static void
616 seek_and_fill_buffer (struct casereader *reader) 
617 {
618   struct casefile *cf = reader->cf;
619   off_t new_ofs;
620
621   if (cf->value_cnt != 0) 
622     {
623       size_t buffer_case_cnt = cf->buffer_size / cf->value_cnt;
624       new_ofs = ((off_t) reader->case_idx / buffer_case_cnt
625                   * cf->buffer_size * sizeof *cf->buffer);
626       reader->buffer_pos = (reader->case_idx % buffer_case_cnt
627                             * cf->value_cnt);
628     }
629   else 
630     new_ofs = 0;
631   if (new_ofs != reader->file_ofs) 
632     {
633       if (lseek (reader->fd, new_ofs, SEEK_SET) != new_ofs)
634         io_error (cf, _("%s: Seeking temporary file: %s."),
635                   cf->file_name, strerror (errno));
636       else
637         reader->file_ofs = new_ofs;
638     }
639
640   if (cf->case_cnt > 0 && cf->value_cnt > 0 && reader->buffer_ofs != new_ofs)
641     fill_buffer (reader);
642 }
643
644 /* Fills READER's buffer by reading a block from disk. */
645 static bool
646 fill_buffer (struct casereader *reader)
647 {
648   if (reader->cf->ok) 
649     {
650       int bytes = full_read (reader->fd, reader->buffer,
651                              reader->cf->buffer_size * sizeof *reader->buffer);
652       if (bytes < 0) 
653         io_error (reader->cf, _("%s: Reading temporary file: %s."),
654                   reader->cf->file_name, strerror (errno));
655       else if (bytes != reader->cf->buffer_size * sizeof *reader->buffer) 
656         io_error (reader->cf, _("%s: Temporary file ended unexpectedly."),
657                   reader->cf->file_name);
658       else 
659         {
660           reader->buffer_ofs = reader->file_ofs;
661           reader->file_ofs += bytes; 
662         }
663     }
664   return reader->cf->ok;
665 }
666
667 /* Returns the casefile that READER reads. */
668 const struct casefile *
669 casereader_get_casefile (const struct casereader *reader) 
670 {
671   assert (reader != NULL);
672   
673   return reader->cf;
674 }
675
676 /* Reads a copy of the next case from READER into C.
677    Caller is responsible for destroying C.
678    Returns true if successful, false at end of file. */
679 bool
680 casereader_read (struct casereader *reader, struct ccase *c) 
681 {
682   assert (reader != NULL);
683   
684   if (!reader->cf->ok || reader->case_idx >= reader->cf->case_cnt) 
685     return false;
686
687   if (reader->cf->storage == MEMORY) 
688     {
689       size_t block_idx = reader->case_idx / CASES_PER_BLOCK;
690       size_t case_idx = reader->case_idx % CASES_PER_BLOCK;
691
692       case_clone (c, &reader->cf->cases[block_idx][case_idx]);
693       reader->case_idx++;
694       return true;
695     }
696   else 
697     {
698       if (reader->buffer_pos + reader->cf->value_cnt > reader->cf->buffer_size)
699         {
700           if (!fill_buffer (reader))
701             return false;
702           reader->buffer_pos = 0;
703         }
704
705       case_from_values (&reader->c, reader->buffer + reader->buffer_pos,
706                         reader->cf->value_cnt);
707       reader->buffer_pos += reader->cf->value_cnt;
708       reader->case_idx++;
709
710       case_clone (c, &reader->c);
711       return true;
712     }
713 }
714
715 /* Reads the next case from READER into C and transfers ownership
716    to the caller.  Caller is responsible for destroying C.
717    Returns true if successful, false at end of file or on I/O
718    error. */
719 bool
720 casereader_read_xfer (struct casereader *reader, struct ccase *c)
721 {
722   assert (reader != NULL);
723
724   if (reader->destructive == 0
725       || reader->case_idx >= reader->cf->case_cnt
726       || reader->cf->storage == DISK) 
727     return casereader_read (reader, c);
728   else 
729     {
730       size_t block_idx = reader->case_idx / CASES_PER_BLOCK;
731       size_t case_idx = reader->case_idx % CASES_PER_BLOCK;
732       struct ccase *read_case = &reader->cf->cases[block_idx][case_idx];
733
734       case_move (c, read_case);
735       reader->case_idx++;
736       return true;
737     }
738 }
739
740 /* Sets the next case to be read by READER to CASE_IDX,
741    which must be less than the number of cases in the casefile.
742    Allowed only for random readers. */
743 void
744 casereader_seek (struct casereader *reader, unsigned long case_idx) 
745 {
746   assert (reader != NULL);
747   assert (reader->random);
748   assert (case_idx < reader->cf->case_cnt);
749
750   reader->case_idx = case_idx;
751   if (reader->cf->storage == DISK)
752     seek_and_fill_buffer (reader);
753 }
754
755 /* Destroys READER. */
756 void
757 casereader_destroy (struct casereader *reader)
758 {
759   assert (reader != NULL);
760
761   if (reader->next != NULL)
762     reader->next->prev = reader->prev;
763   if (reader->prev != NULL)
764     reader->prev->next = reader->next;
765   if (reader->cf->readers == reader)
766     reader->cf->readers = reader->next;
767
768   if (reader->cf->buffer == NULL)
769     reader->cf->buffer = reader->buffer;
770   else
771     free (reader->buffer);
772
773   if (reader->fd != -1) 
774     {
775       if (reader->cf->fd == -1)
776         reader->cf->fd = reader->fd;
777       else
778         safe_close (reader->fd);
779     }
780   
781   case_destroy (&reader->c);
782
783   free (reader);
784 }
785
786 /* Marks CF as having encountered an I/O error.
787    If this is the first error on CF, reports FORMAT to the user,
788    doing printf()-style substitutions. */
789 static void
790 io_error (struct casefile *cf, const char *format, ...)
791 {
792   if (cf->ok) 
793     {
794       struct msg m;
795       va_list args;
796
797       m.category = MSG_GENERAL;
798       m.severity = MSG_ERROR;
799       m.where.file_name = NULL;
800       m.where.line_number = -1;
801       va_start (args, format);
802       m.text = xvasprintf (format, args);
803       va_end (args);
804       
805       msg_emit (&m);
806     }
807   cf->ok = false;
808 }
809
810 /* Calls open(), passing FILE_NAME and FLAGS, repeating as necessary
811    to deal with interrupted calls. */
812 static int
813 safe_open (const char *file_name, int flags) 
814 {
815   int fd;
816
817   do 
818     {
819       fd = open (file_name, flags);
820     }
821   while (fd == -1 && errno == EINTR);
822
823   return fd;
824 }
825
826 /* Calls close(), passing FD, repeating as necessary to deal with
827    interrupted calls. */
828 static int safe_close (int fd) 
829 {
830   int retval;
831
832   do 
833     {
834       retval = close (fd);
835     }
836   while (retval == -1 && errno == EINTR);
837
838   return retval;
839 }
840
841 /* Registers our exit handler with atexit() if it has not already
842    been registered. */
843 static void
844 register_atexit (void) 
845 {
846   static bool registered = false;
847   if (!registered) 
848     {
849       registered = true;
850       atexit (exit_handler);
851     }
852 }
853
854 /* atexit() handler that closes and deletes our temporary
855    files. */
856 static void
857 exit_handler (void) 
858 {
859   while (casefiles != NULL)
860     casefile_destroy (casefiles);
861 }