525611bfa140fed8d2adfd09ae9ad70c5c0eabca
[pspp-builds.git] / src / data / casefile.c
1 /* PSPP - computes sample statistics.
2    Copyright (C) 2004 Free Software Foundation, Inc.
3    Written by Ben Pfaff <blp@gnu.org>.
4
5    This program is free software; you can redistribute it and/or
6    modify it under the terms of the GNU General Public License as
7    published by the Free Software Foundation; either version 2 of the
8    License, or (at your option) any later version.
9
10    This program is distributed in the hope that it will be useful, but
11    WITHOUT ANY WARRANTY; without even the implied warranty of
12    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13    General Public License for more details.
14
15    You should have received a copy of the GNU General Public License
16    along with this program; if not, write to the Free Software
17    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
18    02110-1301, USA. */
19
20 #include <config.h>
21 #include "casefile.h"
22 #include <assert.h>
23 #include <errno.h>
24 #include <fcntl.h>
25 #include <stdio.h>
26 #include <stdlib.h>
27 #include <string.h>
28 #include <unistd.h>
29 #include <libpspp/alloc.h>
30 #include "case.h"
31 #include <libpspp/compiler.h>
32 #include <libpspp/message.h>
33 #include "full-read.h"
34 #include "full-write.h"
35 #include <libpspp/misc.h>
36 #include <libpspp/str.h>
37 #include "make-file.h"
38 #include "settings.h"
39 #include "variable.h"
40
41 #include "gettext.h"
42 #define _(msgid) gettext (msgid)
43
44 #define IO_BUF_SIZE (8192 / sizeof (union value))
45
46 /* A casefile represents a sequentially accessible stream of
47    immutable cases.
48
49    If workspace allows, a casefile is maintained in memory.  If
50    workspace overflows, then the casefile is pushed to disk.  In
51    either case the interface presented to callers is kept the
52    same.
53
54    The life cycle of a casefile consists of up to three phases:
55
56        1. Writing.  The casefile initially contains no cases.  In
57           this phase, any number of cases may be appended to the
58           end of a casefile.  (Cases are never inserted in the
59           middle or before the beginning of a casefile.)
60
61           Use casefile_append() or casefile_append_xfer() to
62           append a case to a casefile.
63
64        2. Reading.  The casefile may be read sequentially,
65           starting from the beginning, by "casereaders".  Any
66           number of casereaders may be created, at any time,
67           during the reading phase.  Each casereader has an
68           independent position in the casefile.
69
70           Ordinary casereaders may only move forward.  They
71           cannot move backward to arbitrary records or seek
72           randomly.  Cloning casereaders is possible, but it is
73           not yet implemented.
74
75           Use casefile_get_reader() to create a casereader for
76           use in phase 2.  This also transitions from phase 1 to
77           phase 2.  Calling casefile_mode_reader() makes the same
78           transition, without creating a casereader.
79
80           Use casereader_read() or casereader_read_xfer() to read
81           a case from a casereader.  Use casereader_destroy() to
82           discard a casereader when it is no longer needed.
83
84           "Random" casereaders, which support a seek operation,
85           may also be created.  These should not, generally, be
86           used for statistical procedures, because random access
87           is much slower than sequential access.  They are
88           intended for use by the GUI.
89
90        3. Destruction.  This phase is optional.  The casefile is
91           also read with casereaders in this phase, but the
92           ability to create new casereaders is curtailed.
93
94           In this phase, casereaders could still be cloned (once
95           we eventually implement cloning).
96
97           To transition from phase 1 or 2 to phase 3 and create a
98           casereader, call casefile_get_destructive_reader().
99           The same functions apply to the casereader obtained
100           this way as apply to casereaders obtained in phase 2.
101           
102           After casefile_get_destructive_reader() is called, no
103           more casereaders may be created with
104           casefile_get_reader() or
105           casefile_get_destructive_reader().  (If cloning of
106           casereaders were implemented, it would still be
107           possible.)
108
109           The purpose of the limitations applied to casereaders
110           in phase 3 is to allow in-memory casefiles to fully
111           transfer ownership of cases to the casereaders,
112           avoiding the need for extra copies of case data.  For
113           relatively static data sets with many variables, I
114           suspect (without evidence) that this may be a big
115           performance boost.
116
117    When a casefile is no longer needed, it may be destroyed with
118    casefile_destroy().  This function will also destroy any
119    remaining casereaders. */
120
121 /* FIXME: should we implement compression? */
122
123 /* In-memory cases are arranged in an array of arrays.  The top
124    level is variable size and the size of each bottom level array
125    is fixed at the number of cases defined here.  */
126 #define CASES_PER_BLOCK 128             
127
128 /* A casefile. */
129 struct casefile 
130   {
131     /* Basic data. */
132     struct casefile *next, *prev;       /* Next, prev in global list. */
133     size_t value_cnt;                   /* Case size in `union value's. */
134     size_t case_acct_size;              /* Case size for accounting. */
135     unsigned long case_cnt;             /* Number of cases stored. */
136     enum { MEMORY, DISK } storage;      /* Where cases are stored. */
137     enum { WRITE, READ } mode;          /* Is writing or reading allowed? */
138     struct casereader *readers;         /* List of our readers. */
139     bool being_destroyed;               /* Does a destructive reader exist? */
140     bool ok;                            /* False after I/O error. */
141
142     /* Memory storage. */
143     struct ccase **cases;               /* Pointer to array of cases. */
144
145     /* Disk storage. */
146     int fd;                             /* File descriptor, -1 if none. */
147     char *file_name;                    /* File name. */
148     union value *buffer;                /* I/O buffer, NULL if none. */
149     size_t buffer_used;                 /* Number of values used in buffer. */
150     size_t buffer_size;                 /* Buffer size in values. */
151   };
152
153 /* For reading out the cases in a casefile. */
154 struct casereader 
155   {
156     struct casereader *next, *prev;     /* Next, prev in casefile's list. */
157     struct casefile *cf;                /* Our casefile. */
158     unsigned long case_idx;             /* Case number of current case. */
159     bool destructive;                   /* Is this a destructive reader? */
160     bool random;                        /* Is this a random reader? */
161
162     /* Disk storage. */
163     int fd;                             /* File descriptor. */
164     off_t file_ofs;                     /* Current position in fd. */
165     off_t buffer_ofs;                   /* File offset of buffer start. */
166     union value *buffer;                /* I/O buffer. */
167     size_t buffer_pos;                  /* Offset of buffer position. */
168     struct ccase c;                     /* Current case. */
169   };
170
171 /* Return the case number of the current case */
172 unsigned long
173 casereader_cnum(const struct casereader *r)
174 {
175   return r->case_idx;
176 }
177
178 /* Doubly linked list of all casefiles. */
179 static struct casefile *casefiles;
180
181 /* Number of bytes of case allocated in in-memory casefiles. */
182 static size_t case_bytes;
183
184 static void register_atexit (void);
185 static void exit_handler (void);
186
187 static void reader_open_file (struct casereader *);
188 static void write_case_to_disk (struct casefile *, const struct ccase *);
189 static void flush_buffer (struct casefile *);
190 static void seek_and_fill_buffer (struct casereader *);
191 static bool fill_buffer (struct casereader *);
192
193 static void io_error (struct casefile *, const char *, ...)
194      PRINTF_FORMAT (2, 3);
195 static int safe_open (const char *file_name, int flags);
196 static int safe_close (int fd);
197
198 /* Creates and returns a casefile to store cases of VALUE_CNT
199    `union value's each. */
200 struct casefile *
201 casefile_create (size_t value_cnt) 
202 {
203   struct casefile *cf = xmalloc (sizeof *cf);
204   cf->next = casefiles;
205   cf->prev = NULL;
206   if (cf->next != NULL)
207     cf->next->prev = cf;
208   casefiles = cf;
209   cf->value_cnt = value_cnt;
210   cf->case_acct_size = (cf->value_cnt + 4) * sizeof *cf->buffer;
211   cf->case_cnt = 0;
212   cf->storage = MEMORY;
213   cf->mode = WRITE;
214   cf->readers = NULL;
215   cf->being_destroyed = 0;
216   cf->ok = true;
217   cf->cases = NULL;
218   cf->fd = -1;
219   cf->file_name = NULL;
220   cf->buffer = NULL;
221   cf->buffer_size = ROUND_UP (cf->value_cnt, IO_BUF_SIZE);
222   if (cf->value_cnt > 0 && cf->buffer_size % cf->value_cnt > 64)
223     cf->buffer_size = cf->value_cnt;
224   cf->buffer_used = 0;
225   register_atexit ();
226   return cf;
227 }
228
229 /* Destroys casefile CF. */
230 void
231 casefile_destroy (struct casefile *cf) 
232 {
233   if (cf != NULL) 
234     {
235       if (cf->next != NULL)
236         cf->next->prev = cf->prev;
237       if (cf->prev != NULL)
238         cf->prev->next = cf->next;
239       if (casefiles == cf)
240         casefiles = cf->next;
241
242       while (cf->readers != NULL) 
243         casereader_destroy (cf->readers);
244
245       if (cf->cases != NULL) 
246         {
247           size_t idx, block_cnt;
248
249           case_bytes -= cf->case_cnt * cf->case_acct_size;
250           for (idx = 0; idx < cf->case_cnt; idx++)
251             {
252               size_t block_idx = idx / CASES_PER_BLOCK;
253               size_t case_idx = idx % CASES_PER_BLOCK;
254               struct ccase *c = &cf->cases[block_idx][case_idx];
255               case_destroy (c);
256             }
257
258           block_cnt = DIV_RND_UP (cf->case_cnt, CASES_PER_BLOCK);
259           for (idx = 0; idx < block_cnt; idx++)
260             free (cf->cases[idx]);
261
262           free (cf->cases);
263         }
264
265       if (cf->fd != -1)
266         safe_close (cf->fd);
267           
268       if (cf->file_name != NULL && remove (cf->file_name) == -1) 
269         io_error (cf, _("%s: Removing temporary file: %s."),
270                   cf->file_name, strerror (errno));
271       free (cf->file_name);
272
273       free (cf->buffer);
274
275       free (cf);
276     }
277 }
278
279 /* Returns true if an I/O error has occurred in casefile CF. */
280 bool
281 casefile_error (const struct casefile *cf) 
282 {
283   return !cf->ok;
284 }
285
286 /* Returns true only if casefile CF is stored in memory (instead of on
287    disk), false otherwise. */
288 bool
289 casefile_in_core (const struct casefile *cf) 
290 {
291   assert (cf != NULL);
292
293   return cf->storage == MEMORY;
294 }
295
296 /* Puts a casefile to "sleep", that is, minimizes the resources
297    needed for it by closing its file descriptor and freeing its
298    buffer.  This is useful if we need so many casefiles that we
299    might not have enough memory and file descriptors to go
300    around.
301
302    For simplicity, this implementation always converts the
303    casefile to reader mode.  If this turns out to be a problem,
304    with a little extra work we could also support sleeping
305    writers.
306
307    Returns true if successful, false if an I/O error occurred. */
308 bool
309 casefile_sleep (const struct casefile *cf_) 
310 {
311   struct casefile *cf = (struct casefile *) cf_;
312   assert (cf != NULL);
313
314   casefile_mode_reader (cf);
315   casefile_to_disk (cf);
316   flush_buffer (cf);
317
318   if (cf->fd != -1) 
319     {
320       safe_close (cf->fd);
321       cf->fd = -1;
322     }
323   if (cf->buffer != NULL) 
324     {
325       free (cf->buffer);
326       cf->buffer = NULL;
327     }
328
329   return cf->ok;
330 }
331
332 /* Returns the number of `union value's in a case for CF. */
333 size_t
334 casefile_get_value_cnt (const struct casefile *cf) 
335 {
336   assert (cf != NULL);
337
338   return cf->value_cnt;
339 }
340
341 /* Returns the number of cases in casefile CF. */
342 unsigned long
343 casefile_get_case_cnt (const struct casefile *cf) 
344 {
345   assert (cf != NULL);
346
347   return cf->case_cnt;
348 }
349
350 /* Appends a copy of case C to casefile CF.  Not valid after any
351    reader for CF has been created.
352    Returns true if successful, false if an I/O error occurred. */
353 bool
354 casefile_append (struct casefile *cf, const struct ccase *c) 
355 {
356   assert (cf != NULL);
357   assert (c != NULL);
358   assert (cf->mode == WRITE);
359
360   assert ( cf->value_cnt <= c->case_data->value_cnt );
361
362   /* Try memory first. */
363   if (cf->storage == MEMORY) 
364     {
365       if (case_bytes < get_workspace ())
366         {
367           size_t block_idx = cf->case_cnt / CASES_PER_BLOCK;
368           size_t case_idx = cf->case_cnt % CASES_PER_BLOCK;
369           struct ccase new_case;
370
371           case_bytes += cf->case_acct_size;
372           case_clone (&new_case, c);
373           if (case_idx == 0) 
374             {
375               if ((block_idx & (block_idx - 1)) == 0) 
376                 {
377                   size_t block_cap = block_idx == 0 ? 1 : block_idx * 2;
378                   cf->cases = xnrealloc (cf->cases,
379                                          block_cap, sizeof *cf->cases);
380                 }
381
382               cf->cases[block_idx] = xnmalloc (CASES_PER_BLOCK,
383                                                sizeof **cf->cases);
384             }
385
386           case_move (&cf->cases[block_idx][case_idx], &new_case);
387         }
388       else
389         {
390           casefile_to_disk (cf);
391           assert (cf->storage == DISK);
392           write_case_to_disk (cf, c);
393         }
394     }
395   else
396     write_case_to_disk (cf, c);
397
398   cf->case_cnt++;
399   return cf->ok;
400 }
401
402 /* Appends case C to casefile CF, which takes over ownership of
403    C.  Not valid after any reader for CF has been created.
404    Returns true if successful, false if an I/O error occurred. */
405 bool
406 casefile_append_xfer (struct casefile *cf, struct ccase *c) 
407 {
408   casefile_append (cf, c);
409   case_destroy (c);
410   return cf->ok;
411 }
412
413 /* Writes case C to casefile CF's disk buffer, first flushing the buffer to
414    disk if it would otherwise overflow.
415    Returns true if successful, false if an I/O error occurred. */
416 static void
417 write_case_to_disk (struct casefile *cf, const struct ccase *c) 
418 {
419   if (!cf->ok)
420     return;
421   
422   case_to_values (c, cf->buffer + cf->buffer_used, cf->value_cnt);
423   cf->buffer_used += cf->value_cnt;
424   if (cf->buffer_used + cf->value_cnt > cf->buffer_size)
425     flush_buffer (cf);
426 }
427
428 /* If any bytes in CF's output buffer are used, flush them to
429    disk. */
430 static void
431 flush_buffer (struct casefile *cf) 
432 {
433   if (cf->ok && cf->buffer_used > 0) 
434     {
435       if (!full_write (cf->fd, cf->buffer,
436                        cf->buffer_size * sizeof *cf->buffer))
437         io_error (cf, _("Error writing temporary file: %s."),
438                   strerror (errno));
439       cf->buffer_used = 0;
440     }
441 }
442
443 /* If CF is currently stored in memory, writes it to disk.  Readers, if any,
444    retain their current positions.
445    Returns true if successful, false if an I/O error occurred. */
446 bool
447 casefile_to_disk (const struct casefile *cf_) 
448 {
449   struct casefile *cf = (struct casefile *) cf_;
450   struct casereader *reader;
451   
452   assert (cf != NULL);
453
454   if (cf->storage == MEMORY)
455     {
456       size_t idx, block_cnt;
457       
458       assert (cf->file_name == NULL);
459       assert (cf->fd == -1);
460       assert (cf->buffer_used == 0);
461
462       if (!make_temp_file (&cf->fd, &cf->file_name))
463         {
464           cf->ok = false;
465           return false;
466         }
467       cf->storage = DISK;
468
469       cf->buffer = xnmalloc (cf->buffer_size, sizeof *cf->buffer);
470       memset (cf->buffer, 0, cf->buffer_size * sizeof *cf->buffer);
471
472       case_bytes -= cf->case_cnt * cf->case_acct_size;
473       for (idx = 0; idx < cf->case_cnt; idx++)
474         {
475           size_t block_idx = idx / CASES_PER_BLOCK;
476           size_t case_idx = idx % CASES_PER_BLOCK;
477           struct ccase *c = &cf->cases[block_idx][case_idx];
478           write_case_to_disk (cf, c);
479           case_destroy (c);
480         }
481
482       block_cnt = DIV_RND_UP (cf->case_cnt, CASES_PER_BLOCK);
483       for (idx = 0; idx < block_cnt; idx++)
484         free (cf->cases[idx]);
485
486       free (cf->cases);
487       cf->cases = NULL;
488
489       if (cf->mode == READ)
490         flush_buffer (cf);
491
492       for (reader = cf->readers; reader != NULL; reader = reader->next)
493         reader_open_file (reader);
494     }
495   return cf->ok;
496 }
497
498 /* Changes CF to reader mode, ensuring that no more cases may be
499    added.  Creating a casereader for CF has the same effect. */
500 void
501 casefile_mode_reader (struct casefile *cf) 
502 {
503   assert (cf != NULL);
504   cf->mode = READ;
505 }
506
507 /* Creates and returns a casereader for CF.  A casereader can be used to
508    sequentially read the cases in a casefile. */
509 struct casereader *
510 casefile_get_reader (const struct casefile *cf_) 
511 {
512   struct casefile *cf = (struct casefile *) cf_;
513   struct casereader *reader;
514
515   assert (cf != NULL);
516   assert (!cf->being_destroyed);
517
518   /* Flush the buffer to disk if it's not empty. */
519   if (cf->mode == WRITE && cf->storage == DISK)
520     flush_buffer (cf);
521   
522   cf->mode = READ;
523
524   reader = xmalloc (sizeof *reader);
525   reader->next = cf->readers;
526   if (cf->readers != NULL)
527     reader->next->prev = reader;
528   cf->readers = reader;
529   reader->prev = NULL;
530   reader->cf = cf;
531   reader->case_idx = 0;
532   reader->destructive = 0;
533   reader->random = false;
534   reader->fd = -1;
535   reader->buffer = NULL;
536   reader->buffer_pos = 0;
537   case_nullify (&reader->c);
538
539   if (reader->cf->storage == DISK) 
540     reader_open_file (reader);
541
542   return reader;
543 }
544
545 /* Creates and returns a random casereader for CF.  A random
546    casereader can be used to randomly read the cases in a
547    casefile. */
548 struct casereader *
549 casefile_get_random_reader (const struct casefile *cf) 
550 {
551   struct casefile  *mutable_casefile = (struct casefile*) cf;
552   struct casereader *reader;
553
554   enum { WRITE, READ } mode = cf->mode ;
555   reader = casefile_get_reader (cf);
556   reader->random = true;
557   mutable_casefile->mode = mode;
558   
559   return reader;
560 }
561
562 /* Creates and returns a destructive casereader for CF.  Like a
563    normal casereader, a destructive casereader sequentially reads
564    the cases in a casefile.  Unlike a normal casereader, a
565    destructive reader cannot operate concurrently with any other
566    reader.  (This restriction could be relaxed in a few ways, but
567    it is so far unnecessary for other code.) */
568 struct casereader *
569 casefile_get_destructive_reader (struct casefile *cf) 
570 {
571   struct casereader *reader;
572   
573   assert (cf->readers == NULL);
574   reader = casefile_get_reader (cf);
575   reader->destructive = 1;
576   cf->being_destroyed = 1;
577   return reader;
578 }
579
580 /* Opens a disk file for READER and seeks to the current position as indicated
581    by case_idx.  Normally the current position is the beginning of the file,
582    but casefile_to_disk may cause the file to be opened at a different
583    position. */
584 static void
585 reader_open_file (struct casereader *reader) 
586 {
587   struct casefile *cf = reader->cf;
588   if (!cf->ok || reader->case_idx >= cf->case_cnt)
589     return;
590
591   if (cf->fd != -1) 
592     {
593       reader->fd = cf->fd;
594       cf->fd = -1;
595     }
596   else 
597     {
598       reader->fd = safe_open (cf->file_name, O_RDONLY);
599       if (reader->fd < 0)
600         io_error (cf, _("%s: Opening temporary file: %s."),
601                   cf->file_name, strerror (errno));
602     }
603
604   if (cf->buffer != NULL) 
605     {
606       reader->buffer = cf->buffer;
607       cf->buffer = NULL; 
608     }
609   else 
610     {
611       reader->buffer = xnmalloc (cf->buffer_size, sizeof *cf->buffer);
612       memset (reader->buffer, 0, cf->buffer_size * sizeof *cf->buffer); 
613     }
614
615   case_create (&reader->c, cf->value_cnt);
616
617   reader->buffer_ofs = -1;
618   reader->file_ofs = -1;
619   seek_and_fill_buffer (reader);
620 }
621
622 /* Seeks the backing file for READER to the proper position and
623    refreshes the buffer contents. */
624 static void
625 seek_and_fill_buffer (struct casereader *reader) 
626 {
627   struct casefile *cf = reader->cf;
628   off_t new_ofs;
629
630   if (cf->value_cnt != 0) 
631     {
632       size_t buffer_case_cnt = cf->buffer_size / cf->value_cnt;
633       new_ofs = ((off_t) reader->case_idx / buffer_case_cnt
634                   * cf->buffer_size * sizeof *cf->buffer);
635       reader->buffer_pos = (reader->case_idx % buffer_case_cnt
636                             * cf->value_cnt);
637     }
638   else 
639     new_ofs = 0;
640   if (new_ofs != reader->file_ofs) 
641     {
642       if (lseek (reader->fd, new_ofs, SEEK_SET) != new_ofs)
643         io_error (cf, _("%s: Seeking temporary file: %s."),
644                   cf->file_name, strerror (errno));
645       else
646         reader->file_ofs = new_ofs;
647     }
648
649   if (cf->case_cnt > 0 && cf->value_cnt > 0 && reader->buffer_ofs != new_ofs)
650     fill_buffer (reader);
651 }
652
653 /* Fills READER's buffer by reading a block from disk. */
654 static bool
655 fill_buffer (struct casereader *reader)
656 {
657   if (reader->cf->ok) 
658     {
659       int bytes = full_read (reader->fd, reader->buffer,
660                              reader->cf->buffer_size * sizeof *reader->buffer);
661       if (bytes < 0) 
662         io_error (reader->cf, _("%s: Reading temporary file: %s."),
663                   reader->cf->file_name, strerror (errno));
664       else if (bytes != reader->cf->buffer_size * sizeof *reader->buffer) 
665         io_error (reader->cf, _("%s: Temporary file ended unexpectedly."),
666                   reader->cf->file_name);
667       else 
668         {
669           reader->buffer_ofs = reader->file_ofs;
670           reader->file_ofs += bytes; 
671         }
672     }
673   return reader->cf->ok;
674 }
675
676 /* Returns the casefile that READER reads. */
677 const struct casefile *
678 casereader_get_casefile (const struct casereader *reader) 
679 {
680   assert (reader != NULL);
681   
682   return reader->cf;
683 }
684
685 /* Reads a copy of the next case from READER into C.
686    Caller is responsible for destroying C.
687    Returns true if successful, false at end of file. */
688 bool
689 casereader_read (struct casereader *reader, struct ccase *c) 
690 {
691   assert (reader != NULL);
692   
693   if (!reader->cf->ok || reader->case_idx >= reader->cf->case_cnt) 
694     return false;
695
696   if (reader->cf->storage == MEMORY) 
697     {
698       size_t block_idx = reader->case_idx / CASES_PER_BLOCK;
699       size_t case_idx = reader->case_idx % CASES_PER_BLOCK;
700
701       case_clone (c, &reader->cf->cases[block_idx][case_idx]);
702       reader->case_idx++;
703       return true;
704     }
705   else 
706     {
707       if (reader->buffer_pos + reader->cf->value_cnt > reader->cf->buffer_size)
708         {
709           if (!fill_buffer (reader))
710             return false;
711           reader->buffer_pos = 0;
712         }
713
714       case_from_values (&reader->c, reader->buffer + reader->buffer_pos,
715                         reader->cf->value_cnt);
716       reader->buffer_pos += reader->cf->value_cnt;
717       reader->case_idx++;
718
719       case_clone (c, &reader->c);
720       return true;
721     }
722 }
723
724 /* Reads the next case from READER into C and transfers ownership
725    to the caller.  Caller is responsible for destroying C.
726    Returns true if successful, false at end of file or on I/O
727    error. */
728 bool
729 casereader_read_xfer (struct casereader *reader, struct ccase *c)
730 {
731   assert (reader != NULL);
732
733   if (reader->destructive == 0
734       || reader->case_idx >= reader->cf->case_cnt
735       || reader->cf->storage == DISK) 
736     return casereader_read (reader, c);
737   else 
738     {
739       size_t block_idx = reader->case_idx / CASES_PER_BLOCK;
740       size_t case_idx = reader->case_idx % CASES_PER_BLOCK;
741       struct ccase *read_case = &reader->cf->cases[block_idx][case_idx];
742
743       case_move (c, read_case);
744       reader->case_idx++;
745       return true;
746     }
747 }
748
749 /* Sets the next case to be read by READER to CASE_IDX,
750    which must be less than the number of cases in the casefile.
751    Allowed only for random readers. */
752 void
753 casereader_seek (struct casereader *reader, unsigned long case_idx) 
754 {
755   assert (reader != NULL);
756   assert (reader->random);
757   assert (case_idx < reader->cf->case_cnt);
758
759   reader->case_idx = case_idx;
760   if (reader->cf->storage == DISK)
761     seek_and_fill_buffer (reader);
762 }
763
764 /* Destroys READER. */
765 void
766 casereader_destroy (struct casereader *reader)
767 {
768   assert (reader != NULL);
769
770   if (reader->next != NULL)
771     reader->next->prev = reader->prev;
772   if (reader->prev != NULL)
773     reader->prev->next = reader->next;
774   if (reader->cf->readers == reader)
775     reader->cf->readers = reader->next;
776
777   if (reader->cf->buffer == NULL)
778     reader->cf->buffer = reader->buffer;
779   else
780     free (reader->buffer);
781
782   if (reader->fd != -1) 
783     {
784       if (reader->cf->fd == -1)
785         reader->cf->fd = reader->fd;
786       else
787         safe_close (reader->fd);
788     }
789   
790   case_destroy (&reader->c);
791
792   free (reader);
793 }
794
795 /* Marks CF as having encountered an I/O error.
796    If this is the first error on CF, reports FORMAT to the user,
797    doing printf()-style substitutions. */
798 static void
799 io_error (struct casefile *cf, const char *format, ...)
800 {
801   if (cf->ok) 
802     {
803       struct msg m;
804       va_list args;
805
806       m.category = MSG_GENERAL;
807       m.severity = MSG_ERROR;
808       m.where.file_name = NULL;
809       m.where.line_number = -1;
810       va_start (args, format);
811       m.text = xvasprintf (format, args);
812       va_end (args);
813       
814       msg_emit (&m);
815     }
816   cf->ok = false;
817 }
818
819 /* Calls open(), passing FILE_NAME and FLAGS, repeating as necessary
820    to deal with interrupted calls. */
821 static int
822 safe_open (const char *file_name, int flags) 
823 {
824   int fd;
825
826   do 
827     {
828       fd = open (file_name, flags);
829     }
830   while (fd == -1 && errno == EINTR);
831
832   return fd;
833 }
834
835 /* Calls close(), passing FD, repeating as necessary to deal with
836    interrupted calls. */
837 static int safe_close (int fd) 
838 {
839   int retval;
840
841   do 
842     {
843       retval = close (fd);
844     }
845   while (retval == -1 && errno == EINTR);
846
847   return retval;
848 }
849
850 /* Registers our exit handler with atexit() if it has not already
851    been registered. */
852 static void
853 register_atexit (void) 
854 {
855   static bool registered = false;
856   if (!registered) 
857     {
858       registered = true;
859       atexit (exit_handler);
860     }
861 }
862
863 /* atexit() handler that closes and deletes our temporary
864    files. */
865 static void
866 exit_handler (void) 
867 {
868   while (casefiles != NULL)
869     casefile_destroy (casefiles);
870 }