Minor cleanup and spot fixes.
[pspp] / src / data / casefile.c
1 /* PSPP - computes sample statistics.
2    Copyright (C) 2004 Free Software Foundation, Inc.
3    Written by Ben Pfaff <blp@gnu.org>.
4
5    This program is free software; you can redistribute it and/or
6    modify it under the terms of the GNU General Public License as
7    published by the Free Software Foundation; either version 2 of the
8    License, or (at your option) any later version.
9
10    This program is distributed in the hope that it will be useful, but
11    WITHOUT ANY WARRANTY; without even the implied warranty of
12    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13    General Public License for more details.
14
15    You should have received a copy of the GNU General Public License
16    along with this program; if not, write to the Free Software
17    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
18    02110-1301, USA. */
19
20 #include <config.h>
21 #include "casefile.h"
22 #include <assert.h>
23 #include <errno.h>
24 #include <fcntl.h>
25 #include <stdio.h>
26 #include <stdlib.h>
27 #include <string.h>
28 #include <unistd.h>
29 #include <libpspp/alloc.h>
30 #include "case.h"
31 #include <libpspp/compiler.h>
32 #include <libpspp/message.h>
33 #include "full-read.h"
34 #include "full-write.h"
35 #include <libpspp/misc.h>
36 #include "make-file.h"
37 #include "settings.h"
38 #include "variable.h"
39
40 #include "gettext.h"
41 #define _(msgid) gettext (msgid)
42
43 #define IO_BUF_SIZE (8192 / sizeof (union value))
44
45 /* A casefile represents a sequentially accessible stream of
46    immutable cases.
47
48    If workspace allows, a casefile is maintained in memory.  If
49    workspace overflows, then the casefile is pushed to disk.  In
50    either case the interface presented to callers is kept the
51    same.
52
53    The life cycle of a casefile consists of up to three phases:
54
55        1. Writing.  The casefile initially contains no cases.  In
56           this phase, any number of cases may be appended to the
57           end of a casefile.  (Cases are never inserted in the
58           middle or before the beginning of a casefile.)
59
60           Use casefile_append() or casefile_append_xfer() to
61           append a case to a casefile.
62
63        2. Reading.  The casefile may be read sequentially,
64           starting from the beginning, by "casereaders".  Any
65           number of casereaders may be created, at any time,
66           during the reading phase.  Each casereader has an
67           independent position in the casefile.
68
69           Ordinary casereaders may only move forward.  They
70           cannot move backward to arbitrary records or seek
71           randomly.  Cloning casereaders is possible, but it is
72           not yet implemented.
73
74           Use casefile_get_reader() to create a casereader for
75           use in phase 2.  This also transitions from phase 1 to
76           phase 2.  Calling casefile_mode_reader() makes the same
77           transition, without creating a casereader.
78
79           Use casereader_read() or casereader_read_xfer() to read
80           a case from a casereader.  Use casereader_destroy() to
81           discard a casereader when it is no longer needed.
82
83           "Random" casereaders, which support a seek operation,
84           may also be created.  These should not, generally, be
85           used for statistical procedures, because random access
86           is much slower than sequential access.  They are
87           intended for use by the GUI.
88
89        3. Destruction.  This phase is optional.  The casefile is
90           also read with casereaders in this phase, but the
91           ability to create new casereaders is curtailed.
92
93           In this phase, casereaders could still be cloned (once
94           we eventually implement cloning).
95
96           To transition from phase 1 or 2 to phase 3 and create a
97           casereader, call casefile_get_destructive_reader().
98           The same functions apply to the casereader obtained
99           this way as apply to casereaders obtained in phase 2.
100           
101           After casefile_get_destructive_reader() is called, no
102           more casereaders may be created with
103           casefile_get_reader() or
104           casefile_get_destructive_reader().  (If cloning of
105           casereaders were implemented, it would still be
106           possible.)
107
108           The purpose of the limitations applied to casereaders
109           in phase 3 is to allow in-memory casefiles to fully
110           transfer ownership of cases to the casereaders,
111           avoiding the need for extra copies of case data.  For
112           relatively static data sets with many variables, I
113           suspect (without evidence) that this may be a big
114           performance boost.
115
116    When a casefile is no longer needed, it may be destroyed with
117    casefile_destroy().  This function will also destroy any
118    remaining casereaders. */
119
120 /* FIXME: should we implement compression? */
121
122 /* In-memory cases are arranged in an array of arrays.  The top
123    level is variable size and the size of each bottom level array
124    is fixed at the number of cases defined here.  */
125 #define CASES_PER_BLOCK 128             
126
127 /* A casefile. */
128 struct casefile 
129   {
130     /* Basic data. */
131     struct casefile *next, *prev;       /* Next, prev in global list. */
132     size_t value_cnt;                   /* Case size in `union value's. */
133     size_t case_acct_size;              /* Case size for accounting. */
134     unsigned long case_cnt;             /* Number of cases stored. */
135     enum { MEMORY, DISK } storage;      /* Where cases are stored. */
136     enum { WRITE, READ } mode;          /* Is writing or reading allowed? */
137     struct casereader *readers;         /* List of our readers. */
138     bool being_destroyed;               /* Does a destructive reader exist? */
139     bool ok;                            /* False after I/O error. */
140
141     /* Memory storage. */
142     struct ccase **cases;               /* Pointer to array of cases. */
143
144     /* Disk storage. */
145     int fd;                             /* File descriptor, -1 if none. */
146     char *file_name;                    /* File name. */
147     union value *buffer;                /* I/O buffer, NULL if none. */
148     size_t buffer_used;                 /* Number of values used in buffer. */
149     size_t buffer_size;                 /* Buffer size in values. */
150   };
151
152 /* For reading out the cases in a casefile. */
153 struct casereader 
154   {
155     struct casereader *next, *prev;     /* Next, prev in casefile's list. */
156     struct casefile *cf;                /* Our casefile. */
157     unsigned long case_idx;             /* Case number of current case. */
158     bool destructive;                   /* Is this a destructive reader? */
159     bool random;                        /* Is this a random reader? */
160
161     /* Disk storage. */
162     int fd;                             /* File descriptor. */
163     off_t file_ofs;                     /* Current position in fd. */
164     off_t buffer_ofs;                   /* File offset of buffer start. */
165     union value *buffer;                /* I/O buffer. */
166     size_t buffer_pos;                  /* Offset of buffer position. */
167     struct ccase c;                     /* Current case. */
168   };
169
170 /* Return the case number of the current case */
171 unsigned long
172 casereader_cnum(const struct casereader *r)
173 {
174   return r->case_idx;
175 }
176
177 /* Doubly linked list of all casefiles. */
178 static struct casefile *casefiles;
179
180 /* Number of bytes of case allocated in in-memory casefiles. */
181 static size_t case_bytes;
182
183 static void register_atexit (void);
184 static void exit_handler (void);
185
186 static void reader_open_file (struct casereader *);
187 static void write_case_to_disk (struct casefile *, const struct ccase *);
188 static void flush_buffer (struct casefile *);
189 static void seek_and_fill_buffer (struct casereader *);
190 static bool fill_buffer (struct casereader *);
191
192 static void io_error (struct casefile *, const char *, ...)
193      PRINTF_FORMAT (2, 3);
194 static int safe_open (const char *file_name, int flags);
195 static int safe_close (int fd);
196
197 /* Creates and returns a casefile to store cases of VALUE_CNT
198    `union value's each. */
199 struct casefile *
200 casefile_create (size_t value_cnt) 
201 {
202   struct casefile *cf = xmalloc (sizeof *cf);
203   cf->next = casefiles;
204   cf->prev = NULL;
205   if (cf->next != NULL)
206     cf->next->prev = cf;
207   casefiles = cf;
208   cf->value_cnt = value_cnt;
209   cf->case_acct_size = (cf->value_cnt + 4) * sizeof *cf->buffer;
210   cf->case_cnt = 0;
211   cf->storage = MEMORY;
212   cf->mode = WRITE;
213   cf->readers = NULL;
214   cf->being_destroyed = 0;
215   cf->ok = true;
216   cf->cases = NULL;
217   cf->fd = -1;
218   cf->file_name = NULL;
219   cf->buffer = NULL;
220   cf->buffer_size = ROUND_UP (cf->value_cnt, IO_BUF_SIZE);
221   if (cf->value_cnt > 0 && cf->buffer_size % cf->value_cnt > 64)
222     cf->buffer_size = cf->value_cnt;
223   cf->buffer_used = 0;
224   register_atexit ();
225   return cf;
226 }
227
228 /* Destroys casefile CF. */
229 void
230 casefile_destroy (struct casefile *cf) 
231 {
232   if (cf != NULL) 
233     {
234       if (cf->next != NULL)
235         cf->next->prev = cf->prev;
236       if (cf->prev != NULL)
237         cf->prev->next = cf->next;
238       if (casefiles == cf)
239         casefiles = cf->next;
240
241       while (cf->readers != NULL) 
242         casereader_destroy (cf->readers);
243
244       if (cf->cases != NULL) 
245         {
246           size_t idx, block_cnt;
247
248           case_bytes -= cf->case_cnt * cf->case_acct_size;
249           for (idx = 0; idx < cf->case_cnt; idx++)
250             {
251               size_t block_idx = idx / CASES_PER_BLOCK;
252               size_t case_idx = idx % CASES_PER_BLOCK;
253               struct ccase *c = &cf->cases[block_idx][case_idx];
254               case_destroy (c);
255             }
256
257           block_cnt = DIV_RND_UP (cf->case_cnt, CASES_PER_BLOCK);
258           for (idx = 0; idx < block_cnt; idx++)
259             free (cf->cases[idx]);
260
261           free (cf->cases);
262         }
263
264       if (cf->fd != -1)
265         safe_close (cf->fd);
266           
267       if (cf->file_name != NULL && remove (cf->file_name) == -1) 
268         io_error (cf, _("%s: Removing temporary file: %s."),
269                   cf->file_name, strerror (errno));
270       free (cf->file_name);
271
272       free (cf->buffer);
273
274       free (cf);
275     }
276 }
277
278 /* Returns true if an I/O error has occurred in casefile CF. */
279 bool
280 casefile_error (const struct casefile *cf) 
281 {
282   return !cf->ok;
283 }
284
285 /* Returns true only if casefile CF is stored in memory (instead of on
286    disk), false otherwise. */
287 bool
288 casefile_in_core (const struct casefile *cf) 
289 {
290   assert (cf != NULL);
291
292   return cf->storage == MEMORY;
293 }
294
295 /* Puts a casefile to "sleep", that is, minimizes the resources
296    needed for it by closing its file descriptor and freeing its
297    buffer.  This is useful if we need so many casefiles that we
298    might not have enough memory and file descriptors to go
299    around.
300
301    For simplicity, this implementation always converts the
302    casefile to reader mode.  If this turns out to be a problem,
303    with a little extra work we could also support sleeping
304    writers.
305
306    Returns true if successful, false if an I/O error occurred. */
307 bool
308 casefile_sleep (const struct casefile *cf_) 
309 {
310   struct casefile *cf = (struct casefile *) cf_;
311   assert (cf != NULL);
312
313   casefile_mode_reader (cf);
314   casefile_to_disk (cf);
315   flush_buffer (cf);
316
317   if (cf->fd != -1) 
318     {
319       safe_close (cf->fd);
320       cf->fd = -1;
321     }
322   if (cf->buffer != NULL) 
323     {
324       free (cf->buffer);
325       cf->buffer = NULL;
326     }
327
328   return cf->ok;
329 }
330
331 /* Returns the number of `union value's in a case for CF. */
332 size_t
333 casefile_get_value_cnt (const struct casefile *cf) 
334 {
335   assert (cf != NULL);
336
337   return cf->value_cnt;
338 }
339
340 /* Returns the number of cases in casefile CF. */
341 unsigned long
342 casefile_get_case_cnt (const struct casefile *cf) 
343 {
344   assert (cf != NULL);
345
346   return cf->case_cnt;
347 }
348
349 /* Appends a copy of case C to casefile CF.  Not valid after any
350    reader for CF has been created.
351    Returns true if successful, false if an I/O error occurred. */
352 bool
353 casefile_append (struct casefile *cf, const struct ccase *c) 
354 {
355   assert (cf != NULL);
356   assert (c != NULL);
357   assert (cf->mode == WRITE);
358
359   assert ( cf->value_cnt <= c->case_data->value_cnt );
360
361   /* Try memory first. */
362   if (cf->storage == MEMORY) 
363     {
364       if (case_bytes < get_workspace ())
365         {
366           size_t block_idx = cf->case_cnt / CASES_PER_BLOCK;
367           size_t case_idx = cf->case_cnt % CASES_PER_BLOCK;
368           struct ccase new_case;
369
370           case_bytes += cf->case_acct_size;
371           case_clone (&new_case, c);
372           if (case_idx == 0) 
373             {
374               if ((block_idx & (block_idx - 1)) == 0) 
375                 {
376                   size_t block_cap = block_idx == 0 ? 1 : block_idx * 2;
377                   cf->cases = xnrealloc (cf->cases,
378                                          block_cap, sizeof *cf->cases);
379                 }
380
381               cf->cases[block_idx] = xnmalloc (CASES_PER_BLOCK,
382                                                sizeof **cf->cases);
383             }
384
385           case_move (&cf->cases[block_idx][case_idx], &new_case);
386         }
387       else
388         {
389           casefile_to_disk (cf);
390           assert (cf->storage == DISK);
391           write_case_to_disk (cf, c);
392         }
393     }
394   else
395     write_case_to_disk (cf, c);
396
397   cf->case_cnt++;
398   return cf->ok;
399 }
400
401 /* Appends case C to casefile CF, which takes over ownership of
402    C.  Not valid after any reader for CF has been created.
403    Returns true if successful, false if an I/O error occurred. */
404 bool
405 casefile_append_xfer (struct casefile *cf, struct ccase *c) 
406 {
407   casefile_append (cf, c);
408   case_destroy (c);
409   return cf->ok;
410 }
411
412 /* Writes case C to casefile CF's disk buffer, first flushing the buffer to
413    disk if it would otherwise overflow.
414    Returns true if successful, false if an I/O error occurred. */
415 static void
416 write_case_to_disk (struct casefile *cf, const struct ccase *c) 
417 {
418   if (!cf->ok)
419     return;
420   
421   case_to_values (c, cf->buffer + cf->buffer_used, cf->value_cnt);
422   cf->buffer_used += cf->value_cnt;
423   if (cf->buffer_used + cf->value_cnt > cf->buffer_size)
424     flush_buffer (cf);
425 }
426
427 /* If any bytes in CF's output buffer are used, flush them to
428    disk. */
429 static void
430 flush_buffer (struct casefile *cf) 
431 {
432   if (cf->ok && cf->buffer_used > 0) 
433     {
434       if (!full_write (cf->fd, cf->buffer,
435                        cf->buffer_size * sizeof *cf->buffer))
436         io_error (cf, _("Error writing temporary file: %s."),
437                   strerror (errno));
438       cf->buffer_used = 0;
439     }
440 }
441
442 /* If CF is currently stored in memory, writes it to disk.  Readers, if any,
443    retain their current positions.
444    Returns true if successful, false if an I/O error occurred. */
445 bool
446 casefile_to_disk (const struct casefile *cf_) 
447 {
448   struct casefile *cf = (struct casefile *) cf_;
449   struct casereader *reader;
450   
451   assert (cf != NULL);
452
453   if (cf->storage == MEMORY)
454     {
455       size_t idx, block_cnt;
456       
457       assert (cf->file_name == NULL);
458       assert (cf->fd == -1);
459       assert (cf->buffer_used == 0);
460
461       if (!make_temp_file (&cf->fd, &cf->file_name))
462         {
463           cf->ok = false;
464           return false;
465         }
466       cf->storage = DISK;
467
468       cf->buffer = xnmalloc (cf->buffer_size, sizeof *cf->buffer);
469       memset (cf->buffer, 0, cf->buffer_size * sizeof *cf->buffer);
470
471       case_bytes -= cf->case_cnt * cf->case_acct_size;
472       for (idx = 0; idx < cf->case_cnt; idx++)
473         {
474           size_t block_idx = idx / CASES_PER_BLOCK;
475           size_t case_idx = idx % CASES_PER_BLOCK;
476           struct ccase *c = &cf->cases[block_idx][case_idx];
477           write_case_to_disk (cf, c);
478           case_destroy (c);
479         }
480
481       block_cnt = DIV_RND_UP (cf->case_cnt, CASES_PER_BLOCK);
482       for (idx = 0; idx < block_cnt; idx++)
483         free (cf->cases[idx]);
484
485       free (cf->cases);
486       cf->cases = NULL;
487
488       if (cf->mode == READ)
489         flush_buffer (cf);
490
491       for (reader = cf->readers; reader != NULL; reader = reader->next)
492         reader_open_file (reader);
493     }
494   return cf->ok;
495 }
496
497 /* Changes CF to reader mode, ensuring that no more cases may be
498    added.  Creating a casereader for CF has the same effect. */
499 void
500 casefile_mode_reader (struct casefile *cf) 
501 {
502   assert (cf != NULL);
503   cf->mode = READ;
504 }
505
506 /* Creates and returns a casereader for CF.  A casereader can be used to
507    sequentially read the cases in a casefile. */
508 struct casereader *
509 casefile_get_reader (const struct casefile *cf_) 
510 {
511   struct casefile *cf = (struct casefile *) cf_;
512   struct casereader *reader;
513
514   assert (cf != NULL);
515   assert (!cf->being_destroyed);
516
517   /* Flush the buffer to disk if it's not empty. */
518   if (cf->mode == WRITE && cf->storage == DISK)
519     flush_buffer (cf);
520   
521   cf->mode = READ;
522
523   reader = xmalloc (sizeof *reader);
524   reader->next = cf->readers;
525   if (cf->readers != NULL)
526     reader->next->prev = reader;
527   cf->readers = reader;
528   reader->prev = NULL;
529   reader->cf = cf;
530   reader->case_idx = 0;
531   reader->destructive = 0;
532   reader->random = false;
533   reader->fd = -1;
534   reader->buffer = NULL;
535   reader->buffer_pos = 0;
536   case_nullify (&reader->c);
537
538   if (reader->cf->storage == DISK) 
539     reader_open_file (reader);
540
541   return reader;
542 }
543
544 /* Creates and returns a random casereader for CF.  A random
545    casereader can be used to randomly read the cases in a
546    casefile. */
547 struct casereader *
548 casefile_get_random_reader (const struct casefile *cf) 
549 {
550   struct casefile  *mutable_casefile = (struct casefile*) cf;
551   struct casereader *reader;
552
553   enum { WRITE, READ } mode = cf->mode ;
554   reader = casefile_get_reader (cf);
555   reader->random = true;
556   mutable_casefile->mode = mode;
557   
558   return reader;
559 }
560
561 /* Creates and returns a destructive casereader for CF.  Like a
562    normal casereader, a destructive casereader sequentially reads
563    the cases in a casefile.  Unlike a normal casereader, a
564    destructive reader cannot operate concurrently with any other
565    reader.  (This restriction could be relaxed in a few ways, but
566    it is so far unnecessary for other code.) */
567 struct casereader *
568 casefile_get_destructive_reader (struct casefile *cf) 
569 {
570   struct casereader *reader;
571   
572   assert (cf->readers == NULL);
573   reader = casefile_get_reader (cf);
574   reader->destructive = 1;
575   cf->being_destroyed = 1;
576   return reader;
577 }
578
579 /* Opens a disk file for READER and seeks to the current position as indicated
580    by case_idx.  Normally the current position is the beginning of the file,
581    but casefile_to_disk may cause the file to be opened at a different
582    position. */
583 static void
584 reader_open_file (struct casereader *reader) 
585 {
586   struct casefile *cf = reader->cf;
587   if (!cf->ok || reader->case_idx >= cf->case_cnt)
588     return;
589
590   if (cf->fd != -1) 
591     {
592       reader->fd = cf->fd;
593       cf->fd = -1;
594     }
595   else 
596     {
597       reader->fd = safe_open (cf->file_name, O_RDONLY);
598       if (reader->fd < 0)
599         io_error (cf, _("%s: Opening temporary file: %s."),
600                   cf->file_name, strerror (errno));
601     }
602
603   if (cf->buffer != NULL) 
604     {
605       reader->buffer = cf->buffer;
606       cf->buffer = NULL; 
607     }
608   else 
609     {
610       reader->buffer = xnmalloc (cf->buffer_size, sizeof *cf->buffer);
611       memset (reader->buffer, 0, cf->buffer_size * sizeof *cf->buffer); 
612     }
613
614   case_create (&reader->c, cf->value_cnt);
615
616   reader->buffer_ofs = -1;
617   reader->file_ofs = -1;
618   seek_and_fill_buffer (reader);
619 }
620
621 /* Seeks the backing file for READER to the proper position and
622    refreshes the buffer contents. */
623 static void
624 seek_and_fill_buffer (struct casereader *reader) 
625 {
626   struct casefile *cf = reader->cf;
627   off_t new_ofs;
628
629   if (cf->value_cnt != 0) 
630     {
631       size_t buffer_case_cnt = cf->buffer_size / cf->value_cnt;
632       new_ofs = ((off_t) reader->case_idx / buffer_case_cnt
633                   * cf->buffer_size * sizeof *cf->buffer);
634       reader->buffer_pos = (reader->case_idx % buffer_case_cnt
635                             * cf->value_cnt);
636     }
637   else 
638     new_ofs = 0;
639   if (new_ofs != reader->file_ofs) 
640     {
641       if (lseek (reader->fd, new_ofs, SEEK_SET) != new_ofs)
642         io_error (cf, _("%s: Seeking temporary file: %s."),
643                   cf->file_name, strerror (errno));
644       else
645         reader->file_ofs = new_ofs;
646     }
647
648   if (cf->case_cnt > 0 && cf->value_cnt > 0 && reader->buffer_ofs != new_ofs)
649     fill_buffer (reader);
650 }
651
652 /* Fills READER's buffer by reading a block from disk. */
653 static bool
654 fill_buffer (struct casereader *reader)
655 {
656   if (reader->cf->ok) 
657     {
658       int bytes = full_read (reader->fd, reader->buffer,
659                              reader->cf->buffer_size * sizeof *reader->buffer);
660       if (bytes < 0) 
661         io_error (reader->cf, _("%s: Reading temporary file: %s."),
662                   reader->cf->file_name, strerror (errno));
663       else if (bytes != reader->cf->buffer_size * sizeof *reader->buffer) 
664         io_error (reader->cf, _("%s: Temporary file ended unexpectedly."),
665                   reader->cf->file_name);
666       else 
667         {
668           reader->buffer_ofs = reader->file_ofs;
669           reader->file_ofs += bytes; 
670         }
671     }
672   return reader->cf->ok;
673 }
674
675 /* Returns the casefile that READER reads. */
676 const struct casefile *
677 casereader_get_casefile (const struct casereader *reader) 
678 {
679   assert (reader != NULL);
680   
681   return reader->cf;
682 }
683
684 /* Reads a copy of the next case from READER into C.
685    Caller is responsible for destroying C.
686    Returns true if successful, false at end of file. */
687 bool
688 casereader_read (struct casereader *reader, struct ccase *c) 
689 {
690   assert (reader != NULL);
691   
692   if (!reader->cf->ok || reader->case_idx >= reader->cf->case_cnt) 
693     return false;
694
695   if (reader->cf->storage == MEMORY) 
696     {
697       size_t block_idx = reader->case_idx / CASES_PER_BLOCK;
698       size_t case_idx = reader->case_idx % CASES_PER_BLOCK;
699
700       case_clone (c, &reader->cf->cases[block_idx][case_idx]);
701       reader->case_idx++;
702       return true;
703     }
704   else 
705     {
706       if (reader->buffer_pos + reader->cf->value_cnt > reader->cf->buffer_size)
707         {
708           if (!fill_buffer (reader))
709             return false;
710           reader->buffer_pos = 0;
711         }
712
713       case_from_values (&reader->c, reader->buffer + reader->buffer_pos,
714                         reader->cf->value_cnt);
715       reader->buffer_pos += reader->cf->value_cnt;
716       reader->case_idx++;
717
718       case_clone (c, &reader->c);
719       return true;
720     }
721 }
722
723 /* Reads the next case from READER into C and transfers ownership
724    to the caller.  Caller is responsible for destroying C.
725    Returns true if successful, false at end of file or on I/O
726    error. */
727 bool
728 casereader_read_xfer (struct casereader *reader, struct ccase *c)
729 {
730   assert (reader != NULL);
731
732   if (reader->destructive == 0
733       || reader->case_idx >= reader->cf->case_cnt
734       || reader->cf->storage == DISK) 
735     return casereader_read (reader, c);
736   else 
737     {
738       size_t block_idx = reader->case_idx / CASES_PER_BLOCK;
739       size_t case_idx = reader->case_idx % CASES_PER_BLOCK;
740       struct ccase *read_case = &reader->cf->cases[block_idx][case_idx];
741
742       case_move (c, read_case);
743       reader->case_idx++;
744       return true;
745     }
746 }
747
748 /* Sets the next case to be read by READER to CASE_IDX,
749    which must be less than the number of cases in the casefile.
750    Allowed only for random readers. */
751 void
752 casereader_seek (struct casereader *reader, unsigned long case_idx) 
753 {
754   assert (reader != NULL);
755   assert (reader->random);
756   assert (case_idx < reader->cf->case_cnt);
757
758   reader->case_idx = case_idx;
759   if (reader->cf->storage == DISK)
760     seek_and_fill_buffer (reader);
761 }
762
763 /* Destroys READER. */
764 void
765 casereader_destroy (struct casereader *reader)
766 {
767   assert (reader != NULL);
768
769   if (reader->next != NULL)
770     reader->next->prev = reader->prev;
771   if (reader->prev != NULL)
772     reader->prev->next = reader->next;
773   if (reader->cf->readers == reader)
774     reader->cf->readers = reader->next;
775
776   if (reader->cf->buffer == NULL)
777     reader->cf->buffer = reader->buffer;
778   else
779     free (reader->buffer);
780
781   if (reader->fd != -1) 
782     {
783       if (reader->cf->fd == -1)
784         reader->cf->fd = reader->fd;
785       else
786         safe_close (reader->fd);
787     }
788   
789   case_destroy (&reader->c);
790
791   free (reader);
792 }
793
794 /* Marks CF as having encountered an I/O error.
795    If this is the first error on CF, reports FORMAT to the user,
796    doing printf()-style substitutions. */
797 static void
798 io_error (struct casefile *cf, const char *format, ...)
799 {
800   if (cf->ok) 
801     {
802       struct msg m;
803       va_list args;
804
805       m.category = MSG_GENERAL;
806       m.severity = MSG_ERROR;
807       m.where.file_name = NULL;
808       m.where.line_number = -1;
809       va_start (args, format);
810       m.text = xvasprintf (format, args);
811       va_end (args);
812       
813       msg_emit (&m);
814     }
815   cf->ok = false;
816 }
817
818 /* Calls open(), passing FILE_NAME and FLAGS, repeating as necessary
819    to deal with interrupted calls. */
820 static int
821 safe_open (const char *file_name, int flags) 
822 {
823   int fd;
824
825   do 
826     {
827       fd = open (file_name, flags);
828     }
829   while (fd == -1 && errno == EINTR);
830
831   return fd;
832 }
833
834 /* Calls close(), passing FD, repeating as necessary to deal with
835    interrupted calls. */
836 static int safe_close (int fd) 
837 {
838   int retval;
839
840   do 
841     {
842       retval = close (fd);
843     }
844   while (retval == -1 && errno == EINTR);
845
846   return retval;
847 }
848
849 /* Registers our exit handler with atexit() if it has not already
850    been registered. */
851 static void
852 register_atexit (void) 
853 {
854   static bool registered = false;
855   if (!registered) 
856     {
857       registered = true;
858       atexit (exit_handler);
859     }
860 }
861
862 /* atexit() handler that closes and deletes our temporary
863    files. */
864 static void
865 exit_handler (void) 
866 {
867   while (casefiles != NULL)
868     casefile_destroy (casefiles);
869 }