Implemented data-store using a casefile instead of an array of cases.
[pspp] / src / data / casefile.c
1 /* PSPP - computes sample statistics.
2    Copyright (C) 2004 Free Software Foundation, Inc.
3    Written by Ben Pfaff <blp@gnu.org>.
4
5    This program is free software; you can redistribute it and/or
6    modify it under the terms of the GNU General Public License as
7    published by the Free Software Foundation; either version 2 of the
8    License, or (at your option) any later version.
9
10    This program is distributed in the hope that it will be useful, but
11    WITHOUT ANY WARRANTY; without even the implied warranty of
12    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13    General Public License for more details.
14
15    You should have received a copy of the GNU General Public License
16    along with this program; if not, write to the Free Software
17    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
18    02110-1301, USA. */
19
20 #include <config.h>
21 #include "casefile.h"
22 #include <assert.h>
23 #include <errno.h>
24 #include <fcntl.h>
25 #include <stdio.h>
26 #include <stdlib.h>
27 #include <string.h>
28 #include <unistd.h>
29 #include <libpspp/alloc.h>
30 #include "case.h"
31 #include <libpspp/compiler.h>
32 #include <libpspp/message.h>
33 #include "full-read.h"
34 #include "full-write.h"
35 #include <libpspp/misc.h>
36 #include "make-file.h"
37 #include "settings.h"
38 #include "variable.h"
39
40 #include "gettext.h"
41 #define _(msgid) gettext (msgid)
42
43 #define IO_BUF_SIZE (8192 / sizeof (union value))
44
45 /* A casefile represents a sequentially accessible stream of
46    immutable cases.
47
48    If workspace allows, a casefile is maintained in memory.  If
49    workspace overflows, then the casefile is pushed to disk.  In
50    either case the interface presented to callers is kept the
51    same.
52
53    The life cycle of a casefile consists of up to three phases:
54
55        1. Writing.  The casefile initially contains no cases.  In
56           this phase, any number of cases may be appended to the
57           end of a casefile.  (Cases are never inserted in the
58           middle or before the beginning of a casefile.)
59
60           Use casefile_append() or casefile_append_xfer() to
61           append a case to a casefile.
62
63        2. Reading.  The casefile may be read sequentially,
64           starting from the beginning, by "casereaders".  Any
65           number of casereaders may be created, at any time,
66           during the reading phase.  Each casereader has an
67           independent position in the casefile.
68
69           Ordinary casereaders may only move forward.  They
70           cannot move backward to arbitrary records or seek
71           randomly.  Cloning casereaders is possible, but it is
72           not yet implemented.
73
74           Use casefile_get_reader() to create a casereader for
75           use in phase 2.  This also transitions from phase 1 to
76           phase 2.  Calling casefile_mode_reader() makes the same
77           transition, without creating a casereader.
78
79           Use casereader_read() or casereader_read_xfer() to read
80           a case from a casereader.  Use casereader_destroy() to
81           discard a casereader when it is no longer needed.
82
83           "Random" casereaders, which support a seek operation,
84           may also be created.  These should not, generally, be
85           used for statistical procedures, because random access
86           is much slower than sequential access.  They are
87           intended for use by the GUI.
88
89        3. Destruction.  This phase is optional.  The casefile is
90           also read with casereaders in this phase, but the
91           ability to create new casereaders is curtailed.
92
93           In this phase, casereaders could still be cloned (once
94           we eventually implement cloning).
95
96           To transition from phase 1 or 2 to phase 3 and create a
97           casereader, call casefile_get_destructive_reader().
98           The same functions apply to the casereader obtained
99           this way as apply to casereaders obtained in phase 2.
100           
101           After casefile_get_destructive_reader() is called, no
102           more casereaders may be created with
103           casefile_get_reader() or
104           casefile_get_destructive_reader().  (If cloning of
105           casereaders were implemented, it would still be
106           possible.)
107
108           The purpose of the limitations applied to casereaders
109           in phase 3 is to allow in-memory casefiles to fully
110           transfer ownership of cases to the casereaders,
111           avoiding the need for extra copies of case data.  For
112           relatively static data sets with many variables, I
113           suspect (without evidence) that this may be a big
114           performance boost.
115
116    When a casefile is no longer needed, it may be destroyed with
117    casefile_destroy().  This function will also destroy any
118    remaining casereaders. */
119
120 /* FIXME: should we implement compression? */
121
122 /* In-memory cases are arranged in an array of arrays.  The top
123    level is variable size and the size of each bottom level array
124    is fixed at the number of cases defined here.  */
125 #define CASES_PER_BLOCK 128             
126
127 /* A casefile. */
128 struct casefile 
129   {
130     /* Basic data. */
131     struct casefile *next, *prev;       /* Next, prev in global list. */
132     size_t value_cnt;                   /* Case size in `union value's. */
133     size_t case_acct_size;              /* Case size for accounting. */
134     unsigned long case_cnt;             /* Number of cases stored. */
135     enum { MEMORY, DISK } storage;      /* Where cases are stored. */
136     enum { WRITE, READ } mode;          /* Is writing or reading allowed? */
137     struct casereader *readers;         /* List of our readers. */
138     bool being_destroyed;               /* Does a destructive reader exist? */
139     bool ok;                            /* False after I/O error. */
140
141     /* Memory storage. */
142     struct ccase **cases;               /* Pointer to array of cases. */
143
144     /* Disk storage. */
145     int fd;                             /* File descriptor, -1 if none. */
146     char *file_name;                    /* File name. */
147     union value *buffer;                /* I/O buffer, NULL if none. */
148     size_t buffer_used;                 /* Number of values used in buffer. */
149     size_t buffer_size;                 /* Buffer size in values. */
150   };
151
152 /* For reading out the cases in a casefile. */
153 struct casereader 
154   {
155     struct casereader *next, *prev;     /* Next, prev in casefile's list. */
156     struct casefile *cf;                /* Our casefile. */
157     unsigned long case_idx;             /* Case number of current case. */
158     bool destructive;                   /* Is this a destructive reader? */
159     bool random;                        /* Is this a random reader? */
160
161     /* Disk storage. */
162     int fd;                             /* File descriptor. */
163     off_t file_ofs;                     /* Current position in fd. */
164     off_t buffer_ofs;                   /* File offset of buffer start. */
165     union value *buffer;                /* I/O buffer. */
166     size_t buffer_pos;                  /* Offset of buffer position. */
167     struct ccase c;                     /* Current case. */
168   };
169
170 /* Return the case number of the current case */
171 unsigned long
172 casereader_cnum(const struct casereader *r)
173 {
174   return r->case_idx;
175 }
176
177 /* Doubly linked list of all casefiles. */
178 static struct casefile *casefiles;
179
180 /* Number of bytes of case allocated in in-memory casefiles. */
181 static size_t case_bytes;
182
183 static void register_atexit (void);
184 static void exit_handler (void);
185
186 static void reader_open_file (struct casereader *);
187 static void write_case_to_disk (struct casefile *, const struct ccase *);
188 static void flush_buffer (struct casefile *);
189 static void seek_and_fill_buffer (struct casereader *);
190 static bool fill_buffer (struct casereader *);
191
192 static void io_error (struct casefile *, const char *, ...)
193      PRINTF_FORMAT (2, 3);
194 static int safe_open (const char *file_name, int flags);
195 static int safe_close (int fd);
196
197 /* Creates and returns a casefile to store cases of VALUE_CNT
198    `union value's each. */
199 struct casefile *
200 casefile_create (size_t value_cnt) 
201 {
202   struct casefile *cf = xmalloc (sizeof *cf);
203   cf->next = casefiles;
204   cf->prev = NULL;
205   if (cf->next != NULL)
206     cf->next->prev = cf;
207   casefiles = cf;
208   cf->value_cnt = value_cnt;
209   cf->case_acct_size = (cf->value_cnt + 4) * sizeof *cf->buffer;
210   cf->case_cnt = 0;
211   cf->storage = MEMORY;
212   cf->mode = WRITE;
213   cf->readers = NULL;
214   cf->being_destroyed = 0;
215   cf->ok = true;
216   cf->cases = NULL;
217   cf->fd = -1;
218   cf->file_name = NULL;
219   cf->buffer = NULL;
220   cf->buffer_size = ROUND_UP (cf->value_cnt, IO_BUF_SIZE);
221   if (cf->value_cnt > 0 && cf->buffer_size % cf->value_cnt > 64)
222     cf->buffer_size = cf->value_cnt;
223   cf->buffer_used = 0;
224   register_atexit ();
225   return cf;
226 }
227
228 /* Destroys casefile CF. */
229 void
230 casefile_destroy (struct casefile *cf) 
231 {
232   if (cf != NULL) 
233     {
234       if (cf->next != NULL)
235         cf->next->prev = cf->prev;
236       if (cf->prev != NULL)
237         cf->prev->next = cf->next;
238       if (casefiles == cf)
239         casefiles = cf->next;
240
241       while (cf->readers != NULL) 
242         casereader_destroy (cf->readers);
243
244       if (cf->cases != NULL) 
245         {
246           size_t idx, block_cnt;
247
248           case_bytes -= cf->case_cnt * cf->case_acct_size;
249           for (idx = 0; idx < cf->case_cnt; idx++)
250             {
251               size_t block_idx = idx / CASES_PER_BLOCK;
252               size_t case_idx = idx % CASES_PER_BLOCK;
253               struct ccase *c = &cf->cases[block_idx][case_idx];
254               case_destroy (c);
255             }
256
257           block_cnt = DIV_RND_UP (cf->case_cnt, CASES_PER_BLOCK);
258           for (idx = 0; idx < block_cnt; idx++)
259             free (cf->cases[idx]);
260
261           free (cf->cases);
262         }
263
264       if (cf->fd != -1)
265         safe_close (cf->fd);
266           
267       if (cf->file_name != NULL && remove (cf->file_name) == -1) 
268         io_error (cf, _("%s: Removing temporary file: %s."),
269                   cf->file_name, strerror (errno));
270       free (cf->file_name);
271
272       free (cf->buffer);
273
274       free (cf);
275     }
276 }
277
278 /* Returns true if an I/O error has occurred in casefile CF. */
279 bool
280 casefile_error (const struct casefile *cf) 
281 {
282   return !cf->ok;
283 }
284
285 /* Returns true only if casefile CF is stored in memory (instead of on
286    disk), false otherwise. */
287 bool
288 casefile_in_core (const struct casefile *cf) 
289 {
290   assert (cf != NULL);
291
292   return cf->storage == MEMORY;
293 }
294
295 /* Puts a casefile to "sleep", that is, minimizes the resources
296    needed for it by closing its file descriptor and freeing its
297    buffer.  This is useful if we need so many casefiles that we
298    might not have enough memory and file descriptors to go
299    around.
300
301    For simplicity, this implementation always converts the
302    casefile to reader mode.  If this turns out to be a problem,
303    with a little extra work we could also support sleeping
304    writers.
305
306    Returns true if successful, false if an I/O error occurred. */
307 bool
308 casefile_sleep (const struct casefile *cf_) 
309 {
310   struct casefile *cf = (struct casefile *) cf_;
311   assert (cf != NULL);
312
313   casefile_mode_reader (cf);
314   casefile_to_disk (cf);
315   flush_buffer (cf);
316
317   if (cf->fd != -1) 
318     {
319       safe_close (cf->fd);
320       cf->fd = -1;
321     }
322   if (cf->buffer != NULL) 
323     {
324       free (cf->buffer);
325       cf->buffer = NULL;
326     }
327
328   return cf->ok;
329 }
330
331 /* Returns the number of `union value's in a case for CF. */
332 size_t
333 casefile_get_value_cnt (const struct casefile *cf) 
334 {
335   assert (cf != NULL);
336
337   return cf->value_cnt;
338 }
339
340 /* Returns the number of cases in casefile CF. */
341 unsigned long
342 casefile_get_case_cnt (const struct casefile *cf) 
343 {
344   assert (cf != NULL);
345
346   return cf->case_cnt;
347 }
348
349 /* Appends a copy of case C to casefile CF.  Not valid after any
350    reader for CF has been created.
351    Returns true if successful, false if an I/O error occurred. */
352 bool
353 casefile_append (struct casefile *cf, const struct ccase *c) 
354 {
355   assert (cf != NULL);
356   assert (c != NULL);
357   assert (cf->mode == WRITE);
358
359   /* Try memory first. */
360   if (cf->storage == MEMORY) 
361     {
362       if (case_bytes < get_workspace ())
363         {
364           size_t block_idx = cf->case_cnt / CASES_PER_BLOCK;
365           size_t case_idx = cf->case_cnt % CASES_PER_BLOCK;
366           struct ccase new_case;
367
368           case_bytes += cf->case_acct_size;
369           case_clone (&new_case, c);
370           if (case_idx == 0) 
371             {
372               if ((block_idx & (block_idx - 1)) == 0) 
373                 {
374                   size_t block_cap = block_idx == 0 ? 1 : block_idx * 2;
375                   cf->cases = xnrealloc (cf->cases,
376                                          block_cap, sizeof *cf->cases);
377                 }
378
379               cf->cases[block_idx] = xnmalloc (CASES_PER_BLOCK,
380                                                sizeof **cf->cases);
381             }
382
383           case_move (&cf->cases[block_idx][case_idx], &new_case);
384         }
385       else
386         {
387           casefile_to_disk (cf);
388           assert (cf->storage == DISK);
389           write_case_to_disk (cf, c);
390         }
391     }
392   else
393     write_case_to_disk (cf, c);
394
395   cf->case_cnt++;
396   return cf->ok;
397 }
398
399 /* Appends case C to casefile CF, which takes over ownership of
400    C.  Not valid after any reader for CF has been created.
401    Returns true if successful, false if an I/O error occurred. */
402 bool
403 casefile_append_xfer (struct casefile *cf, struct ccase *c) 
404 {
405   casefile_append (cf, c);
406   case_destroy (c);
407   return cf->ok;
408 }
409
410 /* Writes case C to casefile CF's disk buffer, first flushing the buffer to
411    disk if it would otherwise overflow.
412    Returns true if successful, false if an I/O error occurred. */
413 static void
414 write_case_to_disk (struct casefile *cf, const struct ccase *c) 
415 {
416   if (!cf->ok)
417     return;
418   
419   case_to_values (c, cf->buffer + cf->buffer_used, cf->value_cnt);
420   cf->buffer_used += cf->value_cnt;
421   if (cf->buffer_used + cf->value_cnt > cf->buffer_size)
422     flush_buffer (cf);
423 }
424
425 /* If any bytes in CF's output buffer are used, flush them to
426    disk. */
427 static void
428 flush_buffer (struct casefile *cf) 
429 {
430   if (cf->ok && cf->buffer_used > 0) 
431     {
432       if (!full_write (cf->fd, cf->buffer,
433                        cf->buffer_size * sizeof *cf->buffer))
434         io_error (cf, _("Error writing temporary file: %s."),
435                   strerror (errno));
436       cf->buffer_used = 0;
437     }
438 }
439
440 /* If CF is currently stored in memory, writes it to disk.  Readers, if any,
441    retain their current positions.
442    Returns true if successful, false if an I/O error occurred. */
443 bool
444 casefile_to_disk (const struct casefile *cf_) 
445 {
446   struct casefile *cf = (struct casefile *) cf_;
447   struct casereader *reader;
448   
449   assert (cf != NULL);
450
451   if (cf->storage == MEMORY)
452     {
453       size_t idx, block_cnt;
454       
455       assert (cf->file_name == NULL);
456       assert (cf->fd == -1);
457       assert (cf->buffer_used == 0);
458
459       if (!make_temp_file (&cf->fd, &cf->file_name))
460         {
461           cf->ok = false;
462           return false;
463         }
464       cf->storage = DISK;
465
466       cf->buffer = xnmalloc (cf->buffer_size, sizeof *cf->buffer);
467       memset (cf->buffer, 0, cf->buffer_size * sizeof *cf->buffer);
468
469       case_bytes -= cf->case_cnt * cf->case_acct_size;
470       for (idx = 0; idx < cf->case_cnt; idx++)
471         {
472           size_t block_idx = idx / CASES_PER_BLOCK;
473           size_t case_idx = idx % CASES_PER_BLOCK;
474           struct ccase *c = &cf->cases[block_idx][case_idx];
475           write_case_to_disk (cf, c);
476           case_destroy (c);
477         }
478
479       block_cnt = DIV_RND_UP (cf->case_cnt, CASES_PER_BLOCK);
480       for (idx = 0; idx < block_cnt; idx++)
481         free (cf->cases[idx]);
482
483       free (cf->cases);
484       cf->cases = NULL;
485
486       if (cf->mode == READ)
487         flush_buffer (cf);
488
489       for (reader = cf->readers; reader != NULL; reader = reader->next)
490         reader_open_file (reader);
491     }
492   return cf->ok;
493 }
494
495 /* Changes CF to reader mode, ensuring that no more cases may be
496    added.  Creating a casereader for CF has the same effect. */
497 void
498 casefile_mode_reader (struct casefile *cf) 
499 {
500   assert (cf != NULL);
501   cf->mode = READ;
502 }
503
504 /* Creates and returns a casereader for CF.  A casereader can be used to
505    sequentially read the cases in a casefile. */
506 struct casereader *
507 casefile_get_reader (const struct casefile *cf_) 
508 {
509   struct casefile *cf = (struct casefile *) cf_;
510   struct casereader *reader;
511
512   assert (cf != NULL);
513   assert (!cf->being_destroyed);
514
515   /* Flush the buffer to disk if it's not empty. */
516   if (cf->mode == WRITE && cf->storage == DISK)
517     flush_buffer (cf);
518   
519   cf->mode = READ;
520
521   reader = xmalloc (sizeof *reader);
522   reader->next = cf->readers;
523   if (cf->readers != NULL)
524     reader->next->prev = reader;
525   cf->readers = reader;
526   reader->prev = NULL;
527   reader->cf = cf;
528   reader->case_idx = 0;
529   reader->destructive = 0;
530   reader->random = false;
531   reader->fd = -1;
532   reader->buffer = NULL;
533   reader->buffer_pos = 0;
534   case_nullify (&reader->c);
535
536   if (reader->cf->storage == DISK) 
537     reader_open_file (reader);
538
539   return reader;
540 }
541
542 /* Creates and returns a random casereader for CF.  A random
543    casereader can be used to randomly read the cases in a
544    casefile. */
545 struct casereader *
546 casefile_get_random_reader (const struct casefile *cf) 
547 {
548   struct casefile  *mutable_casefile = (struct casefile*) cf;
549   struct casereader *reader;
550
551   enum { WRITE, READ } mode = cf->mode ;
552   reader = casefile_get_reader (cf);
553   reader->random = true;
554   mutable_casefile->mode = mode;
555   
556   return reader;
557 }
558
559 /* Creates and returns a destructive casereader for CF.  Like a
560    normal casereader, a destructive casereader sequentially reads
561    the cases in a casefile.  Unlike a normal casereader, a
562    destructive reader cannot operate concurrently with any other
563    reader.  (This restriction could be relaxed in a few ways, but
564    it is so far unnecessary for other code.) */
565 struct casereader *
566 casefile_get_destructive_reader (struct casefile *cf) 
567 {
568   struct casereader *reader;
569   
570   assert (cf->readers == NULL);
571   reader = casefile_get_reader (cf);
572   reader->destructive = 1;
573   cf->being_destroyed = 1;
574   return reader;
575 }
576
577 /* Opens a disk file for READER and seeks to the current position as indicated
578    by case_idx.  Normally the current position is the beginning of the file,
579    but casefile_to_disk may cause the file to be opened at a different
580    position. */
581 static void
582 reader_open_file (struct casereader *reader) 
583 {
584   struct casefile *cf = reader->cf;
585   if (!cf->ok || reader->case_idx >= cf->case_cnt)
586     return;
587
588   if (cf->fd != -1) 
589     {
590       reader->fd = cf->fd;
591       cf->fd = -1;
592     }
593   else 
594     {
595       reader->fd = safe_open (cf->file_name, O_RDONLY);
596       if (reader->fd < 0)
597         io_error (cf, _("%s: Opening temporary file: %s."),
598                   cf->file_name, strerror (errno));
599     }
600
601   if (cf->buffer != NULL) 
602     {
603       reader->buffer = cf->buffer;
604       cf->buffer = NULL; 
605     }
606   else 
607     {
608       reader->buffer = xnmalloc (cf->buffer_size, sizeof *cf->buffer);
609       memset (reader->buffer, 0, cf->buffer_size * sizeof *cf->buffer); 
610     }
611
612   case_create (&reader->c, cf->value_cnt);
613
614   reader->buffer_ofs = -1;
615   reader->file_ofs = -1;
616   seek_and_fill_buffer (reader);
617 }
618
619 /* Seeks the backing file for READER to the proper position and
620    refreshes the buffer contents. */
621 static void
622 seek_and_fill_buffer (struct casereader *reader) 
623 {
624   struct casefile *cf = reader->cf;
625   off_t new_ofs;
626
627   if (cf->value_cnt != 0) 
628     {
629       size_t buffer_case_cnt = cf->buffer_size / cf->value_cnt;
630       new_ofs = ((off_t) reader->case_idx / buffer_case_cnt
631                   * cf->buffer_size * sizeof *cf->buffer);
632       reader->buffer_pos = (reader->case_idx % buffer_case_cnt
633                             * cf->value_cnt);
634     }
635   else 
636     new_ofs = 0;
637   if (new_ofs != reader->file_ofs) 
638     {
639       if (lseek (reader->fd, new_ofs, SEEK_SET) != new_ofs)
640         io_error (cf, _("%s: Seeking temporary file: %s."),
641                   cf->file_name, strerror (errno));
642       else
643         reader->file_ofs = new_ofs;
644     }
645
646   if (cf->case_cnt > 0 && cf->value_cnt > 0 && reader->buffer_ofs != new_ofs)
647     fill_buffer (reader);
648 }
649
650 /* Fills READER's buffer by reading a block from disk. */
651 static bool
652 fill_buffer (struct casereader *reader)
653 {
654   if (reader->cf->ok) 
655     {
656       int bytes = full_read (reader->fd, reader->buffer,
657                              reader->cf->buffer_size * sizeof *reader->buffer);
658       if (bytes < 0) 
659         io_error (reader->cf, _("%s: Reading temporary file: %s."),
660                   reader->cf->file_name, strerror (errno));
661       else if (bytes != reader->cf->buffer_size * sizeof *reader->buffer) 
662         io_error (reader->cf, _("%s: Temporary file ended unexpectedly."),
663                   reader->cf->file_name);
664       else 
665         {
666           reader->buffer_ofs = reader->file_ofs;
667           reader->file_ofs += bytes; 
668         }
669     }
670   return reader->cf->ok;
671 }
672
673 /* Returns the casefile that READER reads. */
674 const struct casefile *
675 casereader_get_casefile (const struct casereader *reader) 
676 {
677   assert (reader != NULL);
678   
679   return reader->cf;
680 }
681
682 /* Reads a copy of the next case from READER into C.
683    Caller is responsible for destroying C.
684    Returns true if successful, false at end of file. */
685 bool
686 casereader_read (struct casereader *reader, struct ccase *c) 
687 {
688   assert (reader != NULL);
689   
690   if (!reader->cf->ok || reader->case_idx >= reader->cf->case_cnt) 
691     return false;
692
693   if (reader->cf->storage == MEMORY) 
694     {
695       size_t block_idx = reader->case_idx / CASES_PER_BLOCK;
696       size_t case_idx = reader->case_idx % CASES_PER_BLOCK;
697
698       case_clone (c, &reader->cf->cases[block_idx][case_idx]);
699       reader->case_idx++;
700       return true;
701     }
702   else 
703     {
704       if (reader->buffer_pos + reader->cf->value_cnt > reader->cf->buffer_size)
705         {
706           if (!fill_buffer (reader))
707             return false;
708           reader->buffer_pos = 0;
709         }
710
711       case_from_values (&reader->c, reader->buffer + reader->buffer_pos,
712                         reader->cf->value_cnt);
713       reader->buffer_pos += reader->cf->value_cnt;
714       reader->case_idx++;
715
716       case_clone (c, &reader->c);
717       return true;
718     }
719 }
720
721 /* Reads the next case from READER into C and transfers ownership
722    to the caller.  Caller is responsible for destroying C.
723    Returns true if successful, false at end of file or on I/O
724    error. */
725 bool
726 casereader_read_xfer (struct casereader *reader, struct ccase *c)
727 {
728   assert (reader != NULL);
729
730   if (reader->destructive == 0
731       || reader->case_idx >= reader->cf->case_cnt
732       || reader->cf->storage == DISK) 
733     return casereader_read (reader, c);
734   else 
735     {
736       size_t block_idx = reader->case_idx / CASES_PER_BLOCK;
737       size_t case_idx = reader->case_idx % CASES_PER_BLOCK;
738       struct ccase *read_case = &reader->cf->cases[block_idx][case_idx];
739
740       case_move (c, read_case);
741       reader->case_idx++;
742       return true;
743     }
744 }
745
746 /* Sets the next case to be read by READER to CASE_IDX,
747    which must be less than the number of cases in the casefile.
748    Allowed only for random readers. */
749 void
750 casereader_seek (struct casereader *reader, unsigned long case_idx) 
751 {
752   assert (reader != NULL);
753   assert (reader->random);
754   assert (case_idx < reader->cf->case_cnt);
755
756   reader->case_idx = case_idx;
757   if (reader->cf->storage == DISK)
758     seek_and_fill_buffer (reader);
759 }
760
761 /* Destroys READER. */
762 void
763 casereader_destroy (struct casereader *reader)
764 {
765   assert (reader != NULL);
766
767   if (reader->next != NULL)
768     reader->next->prev = reader->prev;
769   if (reader->prev != NULL)
770     reader->prev->next = reader->next;
771   if (reader->cf->readers == reader)
772     reader->cf->readers = reader->next;
773
774   if (reader->cf->buffer == NULL)
775     reader->cf->buffer = reader->buffer;
776   else
777     free (reader->buffer);
778
779   if (reader->fd != -1) 
780     {
781       if (reader->cf->fd == -1)
782         reader->cf->fd = reader->fd;
783       else
784         safe_close (reader->fd);
785     }
786   
787   case_destroy (&reader->c);
788
789   free (reader);
790 }
791
792 /* Marks CF as having encountered an I/O error.
793    If this is the first error on CF, reports FORMAT to the user,
794    doing printf()-style substitutions. */
795 static void
796 io_error (struct casefile *cf, const char *format, ...)
797 {
798   if (cf->ok) 
799     {
800       struct msg m;
801       va_list args;
802
803       m.category = MSG_GENERAL;
804       m.severity = MSG_ERROR;
805       m.where.file_name = NULL;
806       m.where.line_number = -1;
807       va_start (args, format);
808       m.text = xvasprintf (format, args);
809       va_end (args);
810       
811       msg_emit (&m);
812     }
813   cf->ok = false;
814 }
815
816 /* Calls open(), passing FILE_NAME and FLAGS, repeating as necessary
817    to deal with interrupted calls. */
818 static int
819 safe_open (const char *file_name, int flags) 
820 {
821   int fd;
822
823   do 
824     {
825       fd = open (file_name, flags);
826     }
827   while (fd == -1 && errno == EINTR);
828
829   return fd;
830 }
831
832 /* Calls close(), passing FD, repeating as necessary to deal with
833    interrupted calls. */
834 static int safe_close (int fd) 
835 {
836   int retval;
837
838   do 
839     {
840       retval = close (fd);
841     }
842   while (retval == -1 && errno == EINTR);
843
844   return retval;
845 }
846
847 /* Registers our exit handler with atexit() if it has not already
848    been registered. */
849 static void
850 register_atexit (void) 
851 {
852   static bool registered = false;
853   if (!registered) 
854     {
855       registered = true;
856       atexit (exit_handler);
857     }
858 }
859
860 /* atexit() handler that closes and deletes our temporary
861    files. */
862 static void
863 exit_handler (void) 
864 {
865   while (casefiles != NULL)
866     casefile_destroy (casefiles);
867 }