Add random access to casefiles, for use in GUI.
[pspp-builds.git] / src / data / casefile.c
1 /* PSPP - computes sample statistics.
2    Copyright (C) 2004 Free Software Foundation, Inc.
3    Written by Ben Pfaff <blp@gnu.org>.
4
5    This program is free software; you can redistribute it and/or
6    modify it under the terms of the GNU General Public License as
7    published by the Free Software Foundation; either version 2 of the
8    License, or (at your option) any later version.
9
10    This program is distributed in the hope that it will be useful, but
11    WITHOUT ANY WARRANTY; without even the implied warranty of
12    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13    General Public License for more details.
14
15    You should have received a copy of the GNU General Public License
16    along with this program; if not, write to the Free Software
17    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
18    02110-1301, USA. */
19
20 #include <config.h>
21 #include "casefile.h"
22 #include <assert.h>
23 #include <errno.h>
24 #include <fcntl.h>
25 #include <stdio.h>
26 #include <stdlib.h>
27 #include <string.h>
28 #include <unistd.h>
29 #include <libpspp/alloc.h>
30 #include "case.h"
31 #include <libpspp/compiler.h>
32 #include <libpspp/message.h>
33 #include "full-read.h"
34 #include "full-write.h"
35 #include <libpspp/misc.h>
36 #include "make-file.h"
37 #include "settings.h"
38 #include "variable.h"
39
40 #include "gettext.h"
41 #define _(msgid) gettext (msgid)
42
43 #define IO_BUF_SIZE (8192 / sizeof (union value))
44
45 /* A casefile represents a sequentially accessible stream of
46    immutable cases.
47
48    If workspace allows, a casefile is maintained in memory.  If
49    workspace overflows, then the casefile is pushed to disk.  In
50    either case the interface presented to callers is kept the
51    same.
52
53    The life cycle of a casefile consists of up to three phases:
54
55        1. Writing.  The casefile initially contains no cases.  In
56           this phase, any number of cases may be appended to the
57           end of a casefile.  (Cases are never inserted in the
58           middle or before the beginning of a casefile.)
59
60           Use casefile_append() or casefile_append_xfer() to
61           append a case to a casefile.
62
63        2. Reading.  The casefile may be read sequentially,
64           starting from the beginning, by "casereaders".  Any
65           number of casereaders may be created, at any time,
66           during the reading phase.  Each casereader has an
67           independent position in the casefile.
68
69           Casereaders may only move forward.  They cannot move
70           backward to arbitrary records or seek randomly.
71           Cloning casereaders is possible, but it is not yet
72           implemented.
73
74           Use casefile_get_reader() to create a casereader for
75           use in phase 2.  This also transitions from phase 1 to
76           phase 2.  Calling casefile_mode_reader() makes the same
77           transition, without creating a casereader.
78
79           Use casereader_read() or casereader_read_xfer() to read
80           a case from a casereader.  Use casereader_destroy() to
81           discard a casereader when it is no longer needed.
82
83        3. Destruction.  This phase is optional.  The casefile is
84           also read with casereaders in this phase, but the
85           ability to create new casereaders is curtailed.
86
87           In this phase, casereaders could still be cloned (once
88           we eventually implement cloning).
89
90           To transition from phase 1 or 2 to phase 3 and create a
91           casereader, call casefile_get_destructive_reader().
92           The same functions apply to the casereader obtained
93           this way as apply to casereaders obtained in phase 2.
94           
95           After casefile_get_destructive_reader() is called, no
96           more casereaders may be created with
97           casefile_get_reader() or
98           casefile_get_destructive_reader().  (If cloning of
99           casereaders were implemented, it would still be
100           possible.)
101
102           The purpose of the limitations applied to casereaders
103           in phase 3 is to allow in-memory casefiles to fully
104           transfer ownership of cases to the casereaders,
105           avoiding the need for extra copies of case data.  For
106           relatively static data sets with many variables, I
107           suspect (without evidence) that this may be a big
108           performance boost.
109
110    When a casefile is no longer needed, it may be destroyed with
111    casefile_destroy().  This function will also destroy any
112    remaining casereaders. */
113
114 /* FIXME: should we implement compression? */
115
116 /* In-memory cases are arranged in an array of arrays.  The top
117    level is variable size and the size of each bottom level array
118    is fixed at the number of cases defined here.  */
119 #define CASES_PER_BLOCK 128             
120
121 /* A casefile. */
122 struct casefile 
123   {
124     /* Basic data. */
125     struct casefile *next, *prev;       /* Next, prev in global list. */
126     size_t value_cnt;                   /* Case size in `union value's. */
127     size_t case_acct_size;              /* Case size for accounting. */
128     unsigned long case_cnt;             /* Number of cases stored. */
129     enum { MEMORY, DISK } storage;      /* Where cases are stored. */
130     enum { WRITE, READ } mode;          /* Is writing or reading allowed? */
131     struct casereader *readers;         /* List of our readers. */
132     bool being_destroyed;               /* Does a destructive reader exist? */
133     bool ok;                            /* False after I/O error. */
134
135     /* Memory storage. */
136     struct ccase **cases;               /* Pointer to array of cases. */
137
138     /* Disk storage. */
139     int fd;                             /* File descriptor, -1 if none. */
140     char *file_name;                    /* File name. */
141     union value *buffer;                /* I/O buffer, NULL if none. */
142     size_t buffer_used;                 /* Number of values used in buffer. */
143     size_t buffer_size;                 /* Buffer size in values. */
144   };
145
146 /* For reading out the cases in a casefile. */
147 struct casereader 
148   {
149     struct casereader *next, *prev;     /* Next, prev in casefile's list. */
150     struct casefile *cf;                /* Our casefile. */
151     unsigned long case_idx;             /* Case number of current case. */
152     bool destructive;                   /* Is this a destructive reader? */
153     bool random;                        /* Is this a random reader? */
154
155     /* Disk storage. */
156     int fd;                             /* File descriptor. */
157     off_t file_ofs;                     /* Current position in fd. */
158     off_t buffer_ofs;                   /* File offset of buffer start. */
159     union value *buffer;                /* I/O buffer. */
160     size_t buffer_pos;                  /* Offset of buffer position. */
161     struct ccase c;                     /* Current case. */
162   };
163
164 /* Return the case number of the current case */
165 unsigned long
166 casereader_cnum(const struct casereader *r)
167 {
168   return r->case_idx;
169 }
170
171 /* Doubly linked list of all casefiles. */
172 static struct casefile *casefiles;
173
174 /* Number of bytes of case allocated in in-memory casefiles. */
175 static size_t case_bytes;
176
177 static void register_atexit (void);
178 static void exit_handler (void);
179
180 static void reader_open_file (struct casereader *);
181 static void write_case_to_disk (struct casefile *, const struct ccase *);
182 static void flush_buffer (struct casefile *);
183 static void seek_and_fill_buffer (struct casereader *);
184 static bool fill_buffer (struct casereader *);
185
186 static void io_error (struct casefile *, const char *, ...)
187      PRINTF_FORMAT (2, 3);
188 static int safe_open (const char *file_name, int flags);
189 static int safe_close (int fd);
190
191 /* Creates and returns a casefile to store cases of VALUE_CNT
192    `union value's each. */
193 struct casefile *
194 casefile_create (size_t value_cnt) 
195 {
196   struct casefile *cf = xmalloc (sizeof *cf);
197   cf->next = casefiles;
198   cf->prev = NULL;
199   if (cf->next != NULL)
200     cf->next->prev = cf;
201   casefiles = cf;
202   cf->value_cnt = value_cnt;
203   cf->case_acct_size = (cf->value_cnt + 4) * sizeof *cf->buffer;
204   cf->case_cnt = 0;
205   cf->storage = MEMORY;
206   cf->mode = WRITE;
207   cf->readers = NULL;
208   cf->being_destroyed = 0;
209   cf->ok = true;
210   cf->cases = NULL;
211   cf->fd = -1;
212   cf->file_name = NULL;
213   cf->buffer = NULL;
214   cf->buffer_size = ROUND_UP (cf->value_cnt, IO_BUF_SIZE);
215   if (cf->value_cnt > 0 && cf->buffer_size % cf->value_cnt > 64)
216     cf->buffer_size = cf->value_cnt;
217   cf->buffer_used = 0;
218   register_atexit ();
219   return cf;
220 }
221
222 /* Destroys casefile CF. */
223 void
224 casefile_destroy (struct casefile *cf) 
225 {
226   if (cf != NULL) 
227     {
228       if (cf->next != NULL)
229         cf->next->prev = cf->prev;
230       if (cf->prev != NULL)
231         cf->prev->next = cf->next;
232       if (casefiles == cf)
233         casefiles = cf->next;
234
235       while (cf->readers != NULL) 
236         casereader_destroy (cf->readers);
237
238       if (cf->cases != NULL) 
239         {
240           size_t idx, block_cnt;
241
242           case_bytes -= cf->case_cnt * cf->case_acct_size;
243           for (idx = 0; idx < cf->case_cnt; idx++)
244             {
245               size_t block_idx = idx / CASES_PER_BLOCK;
246               size_t case_idx = idx % CASES_PER_BLOCK;
247               struct ccase *c = &cf->cases[block_idx][case_idx];
248               case_destroy (c);
249             }
250
251           block_cnt = DIV_RND_UP (cf->case_cnt, CASES_PER_BLOCK);
252           for (idx = 0; idx < block_cnt; idx++)
253             free (cf->cases[idx]);
254
255           free (cf->cases);
256         }
257
258       if (cf->fd != -1)
259         safe_close (cf->fd);
260           
261       if (cf->file_name != NULL && remove (cf->file_name) == -1) 
262         io_error (cf, _("%s: Removing temporary file: %s."),
263                   cf->file_name, strerror (errno));
264       free (cf->file_name);
265
266       free (cf->buffer);
267
268       free (cf);
269     }
270 }
271
272 /* Returns true if an I/O error has occurred in casefile CF. */
273 bool
274 casefile_error (const struct casefile *cf) 
275 {
276   return !cf->ok;
277 }
278
279 /* Returns true only if casefile CF is stored in memory (instead of on
280    disk), false otherwise. */
281 bool
282 casefile_in_core (const struct casefile *cf) 
283 {
284   assert (cf != NULL);
285
286   return cf->storage == MEMORY;
287 }
288
289 /* Puts a casefile to "sleep", that is, minimizes the resources
290    needed for it by closing its file descriptor and freeing its
291    buffer.  This is useful if we need so many casefiles that we
292    might not have enough memory and file descriptors to go
293    around.
294
295    For simplicity, this implementation always converts the
296    casefile to reader mode.  If this turns out to be a problem,
297    with a little extra work we could also support sleeping
298    writers.
299
300    Returns true if successful, false if an I/O error occurred. */
301 bool
302 casefile_sleep (const struct casefile *cf_) 
303 {
304   struct casefile *cf = (struct casefile *) cf_;
305   assert (cf != NULL);
306
307   casefile_mode_reader (cf);
308   casefile_to_disk (cf);
309   flush_buffer (cf);
310
311   if (cf->fd != -1) 
312     {
313       safe_close (cf->fd);
314       cf->fd = -1;
315     }
316   if (cf->buffer != NULL) 
317     {
318       free (cf->buffer);
319       cf->buffer = NULL;
320     }
321
322   return cf->ok;
323 }
324
325 /* Returns the number of `union value's in a case for CF. */
326 size_t
327 casefile_get_value_cnt (const struct casefile *cf) 
328 {
329   assert (cf != NULL);
330
331   return cf->value_cnt;
332 }
333
334 /* Returns the number of cases in casefile CF. */
335 unsigned long
336 casefile_get_case_cnt (const struct casefile *cf) 
337 {
338   assert (cf != NULL);
339
340   return cf->case_cnt;
341 }
342
343 /* Appends a copy of case C to casefile CF.  Not valid after any
344    reader for CF has been created.
345    Returns true if successful, false if an I/O error occurred. */
346 bool
347 casefile_append (struct casefile *cf, const struct ccase *c) 
348 {
349   assert (cf != NULL);
350   assert (c != NULL);
351   assert (cf->mode == WRITE);
352
353   /* Try memory first. */
354   if (cf->storage == MEMORY) 
355     {
356       if (case_bytes < get_workspace ())
357         {
358           size_t block_idx = cf->case_cnt / CASES_PER_BLOCK;
359           size_t case_idx = cf->case_cnt % CASES_PER_BLOCK;
360           struct ccase new_case;
361
362           case_bytes += cf->case_acct_size;
363           case_clone (&new_case, c);
364           if (case_idx == 0) 
365             {
366               if ((block_idx & (block_idx - 1)) == 0) 
367                 {
368                   size_t block_cap = block_idx == 0 ? 1 : block_idx * 2;
369                   cf->cases = xnrealloc (cf->cases,
370                                          block_cap, sizeof *cf->cases);
371                 }
372
373               cf->cases[block_idx] = xnmalloc (CASES_PER_BLOCK,
374                                                sizeof **cf->cases);
375             }
376
377           case_move (&cf->cases[block_idx][case_idx], &new_case);
378         }
379       else
380         {
381           casefile_to_disk (cf);
382           assert (cf->storage == DISK);
383           write_case_to_disk (cf, c);
384         }
385     }
386   else
387     write_case_to_disk (cf, c);
388
389   cf->case_cnt++;
390   return cf->ok;
391 }
392
393 /* Appends case C to casefile CF, which takes over ownership of
394    C.  Not valid after any reader for CF has been created.
395    Returns true if successful, false if an I/O error occurred. */
396 bool
397 casefile_append_xfer (struct casefile *cf, struct ccase *c) 
398 {
399   casefile_append (cf, c);
400   case_destroy (c);
401   return cf->ok;
402 }
403
404 /* Writes case C to casefile CF's disk buffer, first flushing the buffer to
405    disk if it would otherwise overflow.
406    Returns true if successful, false if an I/O error occurred. */
407 static void
408 write_case_to_disk (struct casefile *cf, const struct ccase *c) 
409 {
410   if (!cf->ok)
411     return;
412   
413   case_to_values (c, cf->buffer + cf->buffer_used, cf->value_cnt);
414   cf->buffer_used += cf->value_cnt;
415   if (cf->buffer_used + cf->value_cnt > cf->buffer_size)
416     flush_buffer (cf);
417 }
418
419 /* If any bytes in CF's output buffer are used, flush them to
420    disk. */
421 static void
422 flush_buffer (struct casefile *cf) 
423 {
424   if (cf->ok && cf->buffer_used > 0) 
425     {
426       if (!full_write (cf->fd, cf->buffer,
427                        cf->buffer_size * sizeof *cf->buffer))
428         io_error (cf, _("Error writing temporary file: %s."),
429                   strerror (errno));
430       cf->buffer_used = 0;
431     }
432 }
433
434 /* If CF is currently stored in memory, writes it to disk.  Readers, if any,
435    retain their current positions.
436    Returns true if successful, false if an I/O error occurred. */
437 bool
438 casefile_to_disk (const struct casefile *cf_) 
439 {
440   struct casefile *cf = (struct casefile *) cf_;
441   struct casereader *reader;
442   
443   assert (cf != NULL);
444
445   if (cf->storage == MEMORY)
446     {
447       size_t idx, block_cnt;
448       
449       assert (cf->file_name == NULL);
450       assert (cf->fd == -1);
451       assert (cf->buffer_used == 0);
452
453       if (!make_temp_file (&cf->fd, &cf->file_name))
454         {
455           cf->ok = false;
456           return false;
457         }
458       cf->storage = DISK;
459
460       cf->buffer = xnmalloc (cf->buffer_size, sizeof *cf->buffer);
461       memset (cf->buffer, 0, cf->buffer_size * sizeof *cf->buffer);
462
463       case_bytes -= cf->case_cnt * cf->case_acct_size;
464       for (idx = 0; idx < cf->case_cnt; idx++)
465         {
466           size_t block_idx = idx / CASES_PER_BLOCK;
467           size_t case_idx = idx % CASES_PER_BLOCK;
468           struct ccase *c = &cf->cases[block_idx][case_idx];
469           write_case_to_disk (cf, c);
470           case_destroy (c);
471         }
472
473       block_cnt = DIV_RND_UP (cf->case_cnt, CASES_PER_BLOCK);
474       for (idx = 0; idx < block_cnt; idx++)
475         free (cf->cases[idx]);
476
477       free (cf->cases);
478       cf->cases = NULL;
479
480       if (cf->mode == READ)
481         flush_buffer (cf);
482
483       for (reader = cf->readers; reader != NULL; reader = reader->next)
484         reader_open_file (reader);
485     }
486   return cf->ok;
487 }
488
489 /* Changes CF to reader mode, ensuring that no more cases may be
490    added.  Creating a casereader for CF has the same effect. */
491 void
492 casefile_mode_reader (struct casefile *cf) 
493 {
494   assert (cf != NULL);
495   cf->mode = READ;
496 }
497
498 /* Creates and returns a casereader for CF.  A casereader can be used to
499    sequentially read the cases in a casefile. */
500 struct casereader *
501 casefile_get_reader (const struct casefile *cf_) 
502 {
503   struct casefile *cf = (struct casefile *) cf_;
504   struct casereader *reader;
505
506   assert (cf != NULL);
507   assert (!cf->being_destroyed);
508
509   /* Flush the buffer to disk if it's not empty. */
510   if (cf->mode == WRITE && cf->storage == DISK)
511     flush_buffer (cf);
512   
513   cf->mode = READ;
514
515   reader = xmalloc (sizeof *reader);
516   reader->next = cf->readers;
517   if (cf->readers != NULL)
518     reader->next->prev = reader;
519   cf->readers = reader;
520   reader->prev = NULL;
521   reader->cf = cf;
522   reader->case_idx = 0;
523   reader->destructive = 0;
524   reader->random = false;
525   reader->fd = -1;
526   reader->buffer = NULL;
527   reader->buffer_pos = 0;
528   case_nullify (&reader->c);
529
530   if (reader->cf->storage == DISK) 
531     reader_open_file (reader);
532
533   return reader;
534 }
535
536 /* Creates and returns a random casereader for CF.  A random
537    casereader can be used to randomly read the cases in a
538    casefile. */
539 struct casereader *
540 casefile_get_random_reader (const struct casefile *cf) 
541 {
542   struct casereader *reader = casefile_get_reader (cf);
543   reader->random = true;
544   return reader;
545 }
546
547 /* Creates and returns a destructive casereader for CF.  Like a
548    normal casereader, a destructive casereader sequentially reads
549    the cases in a casefile.  Unlike a normal casereader, a
550    destructive reader cannot operate concurrently with any other
551    reader.  (This restriction could be relaxed in a few ways, but
552    it is so far unnecessary for other code.) */
553 struct casereader *
554 casefile_get_destructive_reader (struct casefile *cf) 
555 {
556   struct casereader *reader;
557   
558   assert (cf->readers == NULL);
559   reader = casefile_get_reader (cf);
560   reader->destructive = 1;
561   cf->being_destroyed = 1;
562   return reader;
563 }
564
565 /* Opens a disk file for READER and seeks to the current position as indicated
566    by case_idx.  Normally the current position is the beginning of the file,
567    but casefile_to_disk may cause the file to be opened at a different
568    position. */
569 static void
570 reader_open_file (struct casereader *reader) 
571 {
572   struct casefile *cf = reader->cf;
573   if (!cf->ok || reader->case_idx >= cf->case_cnt)
574     return;
575
576   if (cf->fd != -1) 
577     {
578       reader->fd = cf->fd;
579       cf->fd = -1;
580     }
581   else 
582     {
583       reader->fd = safe_open (cf->file_name, O_RDONLY);
584       if (reader->fd < 0)
585         io_error (cf, _("%s: Opening temporary file: %s."),
586                   cf->file_name, strerror (errno));
587     }
588
589   if (cf->buffer != NULL) 
590     {
591       reader->buffer = cf->buffer;
592       cf->buffer = NULL; 
593     }
594   else 
595     {
596       reader->buffer = xnmalloc (cf->buffer_size, sizeof *cf->buffer);
597       memset (reader->buffer, 0, cf->buffer_size * sizeof *cf->buffer); 
598     }
599
600   case_create (&reader->c, cf->value_cnt);
601
602   reader->buffer_ofs = -1;
603   reader->file_ofs = -1;
604   seek_and_fill_buffer (reader);
605 }
606
607 /* Seeks the backing file for READER to the proper position and
608    refreshes the buffer contents. */
609 static void
610 seek_and_fill_buffer (struct casereader *reader) 
611 {
612   struct casefile *cf = reader->cf;
613   off_t new_ofs;
614
615   if (cf->value_cnt != 0) 
616     {
617       size_t buffer_case_cnt = cf->buffer_size / cf->value_cnt;
618       new_ofs = ((off_t) reader->case_idx / buffer_case_cnt
619                   * cf->buffer_size * sizeof *cf->buffer);
620       reader->buffer_pos = (reader->case_idx % buffer_case_cnt
621                             * cf->value_cnt);
622     }
623   else 
624     new_ofs = 0;
625   if (new_ofs != reader->file_ofs) 
626     {
627       if (lseek (reader->fd, new_ofs, SEEK_SET) != new_ofs)
628         io_error (cf, _("%s: Seeking temporary file: %s."),
629                   cf->file_name, strerror (errno));
630       else
631         reader->file_ofs = new_ofs;
632     }
633
634   if (cf->case_cnt > 0 && cf->value_cnt > 0 && reader->buffer_ofs != new_ofs)
635     fill_buffer (reader);
636 }
637
638 /* Fills READER's buffer by reading a block from disk. */
639 static bool
640 fill_buffer (struct casereader *reader)
641 {
642   if (reader->cf->ok) 
643     {
644       int bytes = full_read (reader->fd, reader->buffer,
645                              reader->cf->buffer_size * sizeof *reader->buffer);
646       if (bytes < 0) 
647         io_error (reader->cf, _("%s: Reading temporary file: %s."),
648                   reader->cf->file_name, strerror (errno));
649       else if (bytes != reader->cf->buffer_size * sizeof *reader->buffer) 
650         io_error (reader->cf, _("%s: Temporary file ended unexpectedly."),
651                   reader->cf->file_name);
652       else 
653         {
654           reader->buffer_ofs = reader->file_ofs;
655           reader->file_ofs += bytes; 
656         }
657     }
658   return reader->cf->ok;
659 }
660
661 /* Returns the casefile that READER reads. */
662 const struct casefile *
663 casereader_get_casefile (const struct casereader *reader) 
664 {
665   assert (reader != NULL);
666   
667   return reader->cf;
668 }
669
670 /* Reads a copy of the next case from READER into C.
671    Caller is responsible for destroying C.
672    Returns true if successful, false at end of file. */
673 bool
674 casereader_read (struct casereader *reader, struct ccase *c) 
675 {
676   assert (reader != NULL);
677   
678   if (!reader->cf->ok || reader->case_idx >= reader->cf->case_cnt) 
679     return false;
680
681   if (reader->cf->storage == MEMORY) 
682     {
683       size_t block_idx = reader->case_idx / CASES_PER_BLOCK;
684       size_t case_idx = reader->case_idx % CASES_PER_BLOCK;
685
686       case_clone (c, &reader->cf->cases[block_idx][case_idx]);
687       reader->case_idx++;
688       return true;
689     }
690   else 
691     {
692       if (reader->buffer_pos + reader->cf->value_cnt > reader->cf->buffer_size)
693         {
694           if (!fill_buffer (reader))
695             return false;
696           reader->buffer_pos = 0;
697         }
698
699       case_from_values (&reader->c, reader->buffer + reader->buffer_pos,
700                         reader->cf->value_cnt);
701       reader->buffer_pos += reader->cf->value_cnt;
702       reader->case_idx++;
703
704       case_clone (c, &reader->c);
705       return true;
706     }
707 }
708
709 /* Reads the next case from READER into C and transfers ownership
710    to the caller.  Caller is responsible for destroying C.
711    Returns true if successful, false at end of file or on I/O
712    error. */
713 bool
714 casereader_read_xfer (struct casereader *reader, struct ccase *c)
715 {
716   assert (reader != NULL);
717
718   if (reader->destructive == 0
719       || reader->case_idx >= reader->cf->case_cnt
720       || reader->cf->storage == DISK) 
721     return casereader_read (reader, c);
722   else 
723     {
724       size_t block_idx = reader->case_idx / CASES_PER_BLOCK;
725       size_t case_idx = reader->case_idx % CASES_PER_BLOCK;
726       struct ccase *read_case = &reader->cf->cases[block_idx][case_idx];
727
728       case_move (c, read_case);
729       reader->case_idx++;
730       return true;
731     }
732 }
733
734 /* Sets the next case to be read by READER to CASE_IDX,
735    which must be less than the number of cases in the casefile.
736    Allowed only for random readers. */
737 void
738 casereader_seek (struct casereader *reader, unsigned long case_idx) 
739 {
740   assert (reader != NULL);
741   assert (reader->random);
742   assert (case_idx < reader->cf->case_cnt);
743
744   reader->case_idx = case_idx;
745   if (reader->cf->storage == DISK)
746     seek_and_fill_buffer (reader);
747 }
748
749 /* Destroys READER. */
750 void
751 casereader_destroy (struct casereader *reader)
752 {
753   assert (reader != NULL);
754
755   if (reader->next != NULL)
756     reader->next->prev = reader->prev;
757   if (reader->prev != NULL)
758     reader->prev->next = reader->next;
759   if (reader->cf->readers == reader)
760     reader->cf->readers = reader->next;
761
762   if (reader->cf->buffer == NULL)
763     reader->cf->buffer = reader->buffer;
764   else
765     free (reader->buffer);
766
767   if (reader->fd != -1) 
768     {
769       if (reader->cf->fd == -1)
770         reader->cf->fd = reader->fd;
771       else
772         safe_close (reader->fd);
773     }
774   
775   case_destroy (&reader->c);
776
777   free (reader);
778 }
779
780 /* Marks CF as having encountered an I/O error.
781    If this is the first error on CF, reports FORMAT to the user,
782    doing printf()-style substitutions. */
783 static void
784 io_error (struct casefile *cf, const char *format, ...)
785 {
786   if (cf->ok) 
787     {
788       struct msg m;
789       va_list args;
790
791       m.category = MSG_GENERAL;
792       m.severity = MSG_ERROR;
793       m.where.file_name = NULL;
794       m.where.line_number = -1;
795       va_start (args, format);
796       m.text = xvasprintf (format, args);
797       va_end (args);
798       
799       msg_emit (&m);
800     }
801   cf->ok = false;
802 }
803
804 /* Calls open(), passing FILE_NAME and FLAGS, repeating as necessary
805    to deal with interrupted calls. */
806 static int
807 safe_open (const char *file_name, int flags) 
808 {
809   int fd;
810
811   do 
812     {
813       fd = open (file_name, flags);
814     }
815   while (fd == -1 && errno == EINTR);
816
817   return fd;
818 }
819
820 /* Calls close(), passing FD, repeating as necessary to deal with
821    interrupted calls. */
822 static int safe_close (int fd) 
823 {
824   int retval;
825
826   do 
827     {
828       retval = close (fd);
829     }
830   while (retval == -1 && errno == EINTR);
831
832   return retval;
833 }
834
835 /* Registers our exit handler with atexit() if it has not already
836    been registered. */
837 static void
838 register_atexit (void) 
839 {
840   static bool registered = false;
841   if (!registered) 
842     {
843       registered = true;
844       atexit (exit_handler);
845     }
846 }
847
848 /* atexit() handler that closes and deletes our temporary
849    files. */
850 static void
851 exit_handler (void) 
852 {
853   while (casefiles != NULL)
854     casefile_destroy (casefiles);
855 }