Remove "Written by Ben Pfaff <blp@gnu.org>" lines everywhere.
[pspp-builds.git] / src / data / fastfile.c
1 /* PSPP - computes sample statistics.
2    Copyright (C) 2004, 2006 Free Software Foundation, Inc.
3
4    This program is free software; you can redistribute it and/or
5    modify it under the terms of the GNU General Public License as
6    published by the Free Software Foundation; either version 2 of the
7    License, or (at your option) any later version.
8
9    This program is distributed in the hope that it will be useful, but
10    WITHOUT ANY WARRANTY; without even the implied warranty of
11    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12    General Public License for more details.
13
14    You should have received a copy of the GNU General Public License
15    along with this program; if not, write to the Free Software
16    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
17    02110-1301, USA. */
18
19 #include <config.h>
20 #include "casefile.h"
21 #include "casefile-private.h"
22 #include "fastfile.h"
23 #include <assert.h>
24 #include <errno.h>
25 #include <fcntl.h>
26 #include <stdio.h>
27 #include <stdlib.h>
28 #include <string.h>
29 #include <unistd.h>
30 #include <libpspp/alloc.h>
31 #include "case.h"
32 #include <libpspp/compiler.h>
33 #include <libpspp/message.h>
34 #include "full-read.h"
35 #include "full-write.h"
36 #include <libpspp/misc.h>
37 #include "make-file.h"
38 #include "settings.h"
39 #include "variable.h"
40
41 #include "gettext.h"
42 #define _(msgid) gettext (msgid)
43
44 #define IO_BUF_SIZE (8192 / sizeof (union value))
45
46 /* A fastfile represents a sequentially accessible stream of
47    immutable cases.
48
49    If workspace allows, a fastfile is maintained in memory.  If
50    workspace overflows, then the fastfile is pushed to disk.  In
51    either case the interface presented to callers is kept the
52    same.
53
54    The life cycle of a fastfile consists of up to three phases:
55
56        1. Writing.  The fastfile initially contains no cases.  In
57           this phase, any number of cases may be appended to the
58           end of a fastfile.  (Cases are never inserted in the
59           middle or before the beginning of a fastfile.)
60
61           Use casefile_append or casefile_append_xfer to
62           append a case to a fastfile.
63
64        2. Reading.  The fastfile may be read sequentially,
65           starting from the beginning, by "casereaders".  Any
66           number of casereaders may be created, at any time,
67           during the reading phase.  Each casereader has an
68           independent position in the fastfile.
69
70           Ordinary casereaders may only move forward.  They
71           cannot move backward to arbitrary records or seek
72           randomly.  Cloning casereaders is possible, but it is
73           not yet implemented.
74
75           Use casefile_get_reader to create a casereader for
76           use in phase 2.  This also transitions from phase 1 to
77           phase 2.  Calling fastfile_mode_reader makes the same
78           transition, without creating a casereader.
79
80           Use casereader_read or casereader_read_xfer to read
81           a case from a casereader.  Use casereader_destroy to
82           discard a casereader when it is no longer needed.
83
84        3. Destruction.  This phase is optional.  The fastfile is
85           also read with casereaders in this phase, but the
86           ability to create new casereaders is curtailed.
87
88           In this phase, casereaders could still be cloned.
89
90           To transition from phase 1 or 2 to phase 3 and create a
91           casereader, call casefile_get_destructive_reader().
92           The same functions apply to the casereader obtained
93           this way as apply to casereaders obtained in phase 2.
94           
95           After casefile_get_destructive_reader is called, no
96           more casereaders may be created.  (If cloning of
97           casereaders were implemented, it would still be
98           possible.)
99
100           The purpose of the limitations applied to casereaders
101           in phase 3 is to allow in-memory fastfiles to fully
102           transfer ownership of cases to the casereaders,
103           avoiding the need for extra copies of case data.  For
104           relatively static data sets with many variables, I
105           suspect (without evidence) that this may be a big
106           performance boost.
107
108    When a fastfile is no longer needed, it may be destroyed with
109    casefile_destroy.  This function will also destroy any
110    remaining casereaders. */
111
112 /* FIXME: should we implement compression? */
113
114 /* In-memory cases are arranged in an array of arrays.  The top
115    level is variable size and the size of each bottom level array
116    is fixed at the number of cases defined here.  */
117 #define CASES_PER_BLOCK 128
118
119 static const struct class_casefile class;
120
121 /* A fastfile. */
122 struct fastfile
123 {
124   struct casefile cf;           /* Parent */
125
126   size_t value_cnt;             /* Case size in `union value's. */
127   size_t case_acct_size;        /* Case size for accounting. */
128   unsigned long case_cnt;       /* Number of cases stored. */
129   enum { MEMORY, DISK } storage;        /* Where cases are stored. */
130   enum { WRITE, READ } mode;            /* Is writing or reading allowed? */
131
132   bool ok;                      /* False after I/O error. */
133
134   /* Memory storage. */
135   struct ccase **cases;         /* Pointer to array of cases. */
136
137   /* Disk storage. */
138   int fd;                       /* File descriptor, -1 if none. */
139   char *file_name;              /* File name. */
140   union value *buffer;          /* I/O buffer, NULL if none. */
141   size_t buffer_used;           /* Number of values used in buffer. */
142   size_t buffer_size;           /* Buffer size in values. */
143 };
144
145
146 static const struct class_casereader class_reader;
147
148 /* For reading out the cases in a fastfile. */
149 struct fastfilereader
150 {
151   struct casereader cr;         /* Parent */
152
153   unsigned long case_idx;       /* Case number of current case. */
154
155   /* Disk storage. */
156   int fd;                       /* File descriptor. */
157   off_t file_ofs;               /* Current position in fd. */
158   off_t buffer_ofs;             /* File offset of buffer start. */
159   union value *buffer;          /* I/O buffer. */
160   size_t buffer_pos;            /* Offset of buffer position. */
161   struct ccase c;               /* Current case. */
162 };
163
164
165 static void io_error (struct fastfile *, const char *, ...) 
166      PRINTF_FORMAT (2, 3);
167 static int safe_open (const char *file_name, int flags);
168 static int safe_close (int fd);
169 static void write_case_to_disk (struct fastfile *, const struct ccase *);
170 static void flush_buffer (struct fastfile *);
171
172 static void reader_open_file (struct fastfilereader *);
173
174 static void seek_and_fill_buffer (struct fastfilereader *);
175 static bool fill_buffer (struct fastfilereader *);
176
177
178 /* Number of bytes of case allocated in in-memory fastfiles. */
179 static size_t case_bytes;
180
181 /* Destroys READER. */
182 static void fastfilereader_destroy (struct casereader *cr)
183 {
184   struct fastfilereader *reader = (struct fastfilereader *) cr;
185   struct fastfile *ff = (struct fastfile *) casereader_get_casefile (cr);
186
187   if (ff->buffer == NULL)
188     ff->buffer = reader->buffer;
189   else
190     free (reader->buffer);
191
192   if (reader->fd != -1)
193     {
194       if (ff->fd == -1)
195         ff->fd = reader->fd;
196       else
197         safe_close (reader->fd);
198     }
199
200   case_destroy (&reader->c);
201
202   free (reader);
203 }
204
205
206
207 /* Return the case number of the current case */
208 static unsigned long
209 fastfilereader_cnum (const struct casereader *cr)
210 {
211   const struct fastfilereader *ffr = (const struct fastfilereader *) cr;
212   return ffr->case_idx;
213 }
214
215
216 /* Returns the next case pointed to by FFR and increments
217    FFR's pointer.  Returns NULL if FFR points beyond the last case.
218 */
219 static struct ccase *
220 fastfilereader_get_next_case (struct casereader *cr)
221 {
222   struct fastfile *ff = (struct fastfile *) casereader_get_casefile (cr);
223   struct fastfilereader *ffr = (struct fastfilereader *) cr;
224   struct ccase *read_case = NULL ;
225
226   if ( ffr->case_idx >= ff->case_cnt  ) 
227     return NULL ;
228
229   if (ff->storage == MEMORY )
230     {
231       size_t block_idx = ffr->case_idx / CASES_PER_BLOCK;
232       size_t case_idx = ffr->case_idx % CASES_PER_BLOCK;
233       read_case = &ff->cases[block_idx][case_idx];
234     }
235   else
236     {
237       if (ffr->buffer_pos + ff->value_cnt > ff->buffer_size)
238         {
239           if (!fill_buffer (ffr))
240             return NULL;
241           ffr->buffer_pos = 0;
242         }
243
244       case_from_values (&ffr->c, ffr->buffer + ffr->buffer_pos,
245                         ff->value_cnt);
246       ffr->buffer_pos += ff->value_cnt;
247       
248       read_case = &ffr->c;
249     }
250   ffr->case_idx++;
251
252   return read_case;
253 }
254
255 /* Creates and returns a casereader for CF.  A casereader can be used to
256    sequentially read the cases in a fastfile. */
257 static struct casereader *
258 fastfile_get_reader (const struct casefile *cf_)
259 {
260   struct casefile *cf = (struct casefile *) cf_;
261   struct fastfilereader *ffr = xzalloc (sizeof *ffr);
262   struct casereader *reader = (struct casereader *) ffr;
263   struct fastfile *ff = (struct fastfile *) cf;
264
265   assert (!cf->being_destroyed);
266
267   /* Flush the buffer to disk if it's not empty. */
268   if (ff->mode == WRITE && ff->storage == DISK)
269     flush_buffer (ff);
270
271   ff->mode = READ;
272
273   casereader_register (cf, reader, &class_reader);
274
275   ffr->case_idx = 0;
276   reader->destructive = 0;
277   ffr->fd = -1;
278   ffr->buffer = NULL;
279   ffr->buffer_pos = 0;
280   case_nullify (&ffr->c);
281
282   if (ff->storage == DISK)
283     reader_open_file (ffr);
284
285   return reader;
286 }
287
288
289 /* Creates a copy of the casereader CR, and returns it */
290 static struct casereader *
291 fastfilereader_clone (const struct casereader *cr)
292 {
293   const struct fastfilereader *ffr = (const struct fastfilereader *) cr ;
294   struct fastfilereader *new_ffr = xzalloc (sizeof *new_ffr);
295
296   struct casereader *new_reader = (struct casereader *) new_ffr;
297
298   struct casefile *cf =  casereader_get_casefile (cr);
299   struct fastfile *ff = (struct fastfile *) cf;
300
301   assert (!cf->being_destroyed);
302
303   /* Flush the buffer to disk if it's not empty. */
304   if (ff->mode == WRITE && ff->storage == DISK)
305     flush_buffer (ff);
306
307   ff->mode = READ;
308
309   casereader_register (cf, new_reader, &class_reader);
310
311   new_ffr->case_idx = ffr->case_idx ;
312   new_reader->destructive = cr->destructive;
313   new_ffr->fd = ffr->fd ;
314   new_ffr->buffer = ffr->buffer ;
315   new_ffr->buffer_pos = ffr->buffer_pos;
316
317   if (ff->storage == DISK)
318     reader_open_file (new_ffr);
319
320   return new_reader;
321 }
322
323
324
325
326 /* Returns the number of `union value's in a case for CF. */
327 static size_t
328 fastfile_get_value_cnt (const struct casefile *cf)
329 {
330   const struct fastfile *ff = (const struct fastfile *) cf;
331   return ff->value_cnt;
332 }
333
334 /* Appends a copy of case C to fastfile CF.  Not valid after any
335    reader for CF has been created.
336    Returns true if successful, false if an I/O error occurred. */
337 static bool
338 fastfile_append (struct casefile *cf, const struct ccase *c)
339 {
340   struct fastfile *ff = (struct fastfile *) cf;
341   assert (ff->mode == WRITE);
342   assert (c != NULL);
343
344   /* Try memory first. */
345   if (ff->storage == MEMORY)
346     {
347       if (case_bytes < get_workspace ())
348         {
349           size_t block_idx = ff->case_cnt / CASES_PER_BLOCK;
350           size_t case_idx = ff->case_cnt % CASES_PER_BLOCK;
351           struct ccase new_case;
352
353           case_bytes += ff->case_acct_size;
354           case_clone (&new_case, c);
355           if (case_idx == 0)
356             {
357               if ((block_idx & (block_idx - 1)) == 0)
358                 {
359                   size_t block_cap = block_idx == 0 ? 1 : block_idx * 2;
360                   ff->cases = xnrealloc (ff->cases,
361                                          block_cap, sizeof *ff->cases);
362                 }
363
364               ff->cases[block_idx] = xnmalloc (CASES_PER_BLOCK,
365                                                sizeof **ff->cases);
366             }
367
368           case_move (&ff->cases[block_idx][case_idx], &new_case);
369         }
370       else
371         {
372           casefile_to_disk (cf);
373           assert (ff->storage == DISK);
374           write_case_to_disk (ff, c);
375         }
376     }
377   else
378     write_case_to_disk (ff, c);
379
380   ff->case_cnt++;
381   return ff->ok;
382 }
383
384
385 /* Returns the number of cases in fastfile CF. */
386 static unsigned long
387 fastfile_get_case_cnt (const struct casefile *cf)
388 {
389   const struct fastfile *ff = (const struct fastfile *) cf;
390   return ff->case_cnt;
391 }
392
393
394 /* Returns true only if fastfile CF is stored in memory (instead of on
395    disk), false otherwise. */
396 static bool
397 fastfile_in_core (const struct casefile *cf)
398 {
399   const struct fastfile *ff = (const struct fastfile *) cf;
400   return (ff->storage == MEMORY);
401 }
402
403
404 /* If CF is currently stored in memory, writes it to disk.  Readers, if any,
405    retain their current positions.
406    Returns true if successful, false if an I/O error occurred. */
407 static bool
408 fastfile_to_disk (const struct casefile *cf_)
409 {
410   struct fastfile *ff = (struct fastfile *) cf_;
411   struct casefile *cf = &ff->cf;
412
413   if (ff->storage == MEMORY)
414     {
415       size_t idx, block_cnt;
416       struct casereader *reader;
417
418       assert (ff->file_name == NULL);
419       assert (ff->fd == -1);
420       assert (ff->buffer_used == 0);
421
422       if (!make_temp_file (&ff->fd, &ff->file_name))
423         {
424           ff->ok = false;
425           return false;
426         }
427       ff->storage = DISK;
428
429       ff->buffer = xnmalloc (ff->buffer_size, sizeof *ff->buffer);
430       memset (ff->buffer, 0, ff->buffer_size * sizeof *ff->buffer);
431
432       case_bytes -= ff->case_cnt * ff->case_acct_size;
433       for (idx = 0; idx < ff->case_cnt; idx++)
434         {
435           size_t block_idx = idx / CASES_PER_BLOCK;
436           size_t case_idx = idx % CASES_PER_BLOCK;
437           struct ccase *c = &ff->cases[block_idx][case_idx];
438           write_case_to_disk (ff, c);
439           case_destroy (c);
440         }
441
442       block_cnt = DIV_RND_UP (ff->case_cnt, CASES_PER_BLOCK);
443       for (idx = 0; idx < block_cnt; idx++)
444         free (ff->cases[idx]);
445
446       free (ff->cases);
447       ff->cases = NULL;
448
449       if (ff->mode == READ)
450         flush_buffer (ff);
451
452       ll_for_each (reader, struct casereader, ll, &cf->reader_list)
453         reader_open_file ((struct fastfilereader *) reader);
454
455     }
456   return ff->ok;
457 }
458
459 /* Puts a fastfile to "sleep", that is, minimizes the resources
460    needed for it by closing its file descriptor and freeing its
461    buffer.  This is useful if we need so many fastfiles that we
462    might not have enough memory and file descriptors to go
463    around.
464
465    For simplicity, this implementation always converts the
466    fastfile to reader mode.  If this turns out to be a problem,
467    with a little extra work we could also support sleeping
468    writers.
469
470    Returns true if successful, false if an I/O error occurred. */
471 static bool
472 fastfile_sleep (const struct casefile *cf_)
473 {
474   struct fastfile *ff = (struct fastfile *) cf_;
475   struct casefile *cf = &ff->cf;
476
477   fastfile_to_disk (cf);
478   flush_buffer (ff);
479
480   if (ff->fd != -1)
481     {
482       safe_close (ff->fd);
483       ff->fd = -1;
484     }
485   if (ff->buffer != NULL)
486     {
487       free (ff->buffer);
488       ff->buffer = NULL;
489     }
490
491   return ff->ok;
492 }
493
494
495 /* Returns true if an I/O error has occurred in fastfile CF. */
496 static bool
497 fastfile_error (const struct casefile *cf)
498 {
499   const struct fastfile *ff = (const struct fastfile *) cf;
500   return !ff->ok;
501 }
502
503 /* Destroys fastfile CF. */
504 static void
505 fastfile_destroy (struct casefile *cf)
506 {
507   struct fastfile *ff = (struct fastfile *) cf;
508
509   if (cf != NULL)
510     {
511       if (ff->cases != NULL)
512         {
513           size_t idx, block_cnt;
514
515           case_bytes -= ff->case_cnt * ff->case_acct_size;
516           for (idx = 0; idx < ff->case_cnt; idx++)
517             {
518               size_t block_idx = idx / CASES_PER_BLOCK;
519               size_t case_idx = idx % CASES_PER_BLOCK;
520               struct ccase *c = &ff->cases[block_idx][case_idx];
521               case_destroy (c);
522             }
523
524           block_cnt = DIV_RND_UP (ff->case_cnt, CASES_PER_BLOCK);
525           for (idx = 0; idx < block_cnt; idx++)
526             free (ff->cases[idx]);
527
528           free (ff->cases);
529         }
530
531       if (ff->fd != -1)
532         safe_close (ff->fd);
533
534       if (ff->file_name != NULL && remove (ff->file_name) == -1)
535         io_error (ff, _("%s: Removing temporary file: %s."),
536                   ff->file_name, strerror (errno));
537       free (ff->file_name);
538
539       free (ff->buffer);
540
541       free (ff);
542     }
543 }
544
545
546 /* Creates and returns a fastfile to store cases of VALUE_CNT
547    `union value's each. */
548 struct casefile *
549 fastfile_create (size_t value_cnt)
550 {
551   struct fastfile *ff = xzalloc (sizeof *ff);
552   struct casefile *cf = &ff->cf;
553
554   casefile_register (cf, &class);
555
556   ff->value_cnt = value_cnt;
557   ff->case_acct_size = (ff->value_cnt + 4) * sizeof *ff->buffer;
558   ff->case_cnt = 0;
559   ff->storage = MEMORY;
560   ff->mode = WRITE;
561   cf->being_destroyed = false;
562   ff->ok = true;
563   ff->cases = NULL;
564   ff->fd = -1;
565   ff->file_name = NULL;
566   ff->buffer = NULL;
567   ff->buffer_size = ROUND_UP (ff->value_cnt, IO_BUF_SIZE);
568   if (ff->value_cnt > 0 && ff->buffer_size % ff->value_cnt > 64)
569     ff->buffer_size = ff->value_cnt;
570   ff->buffer_used = 0;
571
572   return cf;
573 }
574
575
576
577 /* Marks FF as having encountered an I/O error.
578    If this is the first error on CF, reports FORMAT to the user,
579    doing printf()-style substitutions. */
580 static void
581 io_error (struct fastfile *ff, const char *format, ...)
582 {
583   if (ff->ok)
584     {
585       struct msg m;
586       va_list args;
587
588       m.category = MSG_GENERAL;
589       m.severity = MSG_ERROR;
590       m.where.file_name = NULL;
591       m.where.line_number = -1;
592       va_start (args, format);
593       m.text = xvasprintf (format, args);
594       va_end (args);
595
596       msg_emit (&m);
597     }
598   ff->ok = false;
599 }
600
601 /* Calls open(), passing FILE_NAME and FLAGS, repeating as necessary
602    to deal with interrupted calls. */
603 static int
604 safe_open (const char *file_name, int flags)
605 {
606   int fd;
607
608   do
609     {
610       fd = open (file_name, flags);
611     }
612   while (fd == -1 && errno == EINTR);
613
614   return fd;
615 }
616
617 /* Calls close(), passing FD, repeating as necessary to deal with
618    interrupted calls. */
619 static int
620 safe_close (int fd)
621 {
622   int retval;
623
624   do
625     {
626       retval = close (fd);
627     }
628   while (retval == -1 && errno == EINTR);
629
630   return retval;
631 }
632
633
634 /* Writes case C to fastfile CF's disk buffer, first flushing the buffer to
635    disk if it would otherwise overflow.
636    Returns true if successful, false if an I/O error occurred. */
637 static void
638 write_case_to_disk (struct fastfile *ff, const struct ccase *c)
639 {
640   if (!ff->ok)
641     return;
642
643   case_to_values (c, ff->buffer + ff->buffer_used, ff->value_cnt);
644   ff->buffer_used += ff->value_cnt;
645   if (ff->buffer_used + ff->value_cnt > ff->buffer_size)
646     flush_buffer (ff);
647 }
648
649
650 /* If any bytes in FF's output buffer are used, flush them to
651    disk. */
652 static void
653 flush_buffer (struct fastfile *ff)
654 {
655   if (ff->ok && ff->buffer_used > 0)
656     {
657       if (!full_write (ff->fd, ff->buffer,
658                        ff->buffer_size * sizeof *ff->buffer))
659         io_error (ff, _("Error writing temporary file: %s."),
660                   strerror (errno));
661       ff->buffer_used = 0;
662     }
663 }
664
665
666 /* Opens a disk file for READER and seeks to the current position as indicated
667    by case_idx.  Normally the current position is the beginning of the file,
668    but fastfile_to_disk may cause the file to be opened at a different
669    position. */
670 static void
671 reader_open_file (struct fastfilereader *reader)
672 {
673   struct casefile *cf = casereader_get_casefile(&reader->cr);
674   struct fastfile *ff = (struct fastfile *) cf;
675   if (!ff->ok || reader->case_idx >= ff->case_cnt)
676     return;
677
678   if (ff->fd != -1)
679     {
680       reader->fd = ff->fd;
681       ff->fd = -1;
682     }
683   else
684     {
685       reader->fd = safe_open (ff->file_name, O_RDONLY);
686       if (reader->fd < 0)
687         io_error (ff, _("%s: Opening temporary file: %s."),
688                   ff->file_name, strerror (errno));
689     }
690
691   if (ff->buffer != NULL)
692     {
693       reader->buffer = ff->buffer;
694       ff->buffer = NULL;
695     }
696   else
697     {
698       reader->buffer = xnmalloc (ff->buffer_size, sizeof *ff->buffer);
699       memset (reader->buffer, 0, ff->buffer_size * sizeof *ff->buffer);
700     }
701
702   case_create (&reader->c, ff->value_cnt);
703
704   reader->buffer_ofs = -1;
705   reader->file_ofs = -1;
706   seek_and_fill_buffer (reader);
707 }
708
709 /* Seeks the backing file for READER to the proper position and
710    refreshes the buffer contents. */
711 static void
712 seek_and_fill_buffer (struct fastfilereader *reader)
713 {
714   struct casefile *cf = casereader_get_casefile(&reader->cr);
715   struct fastfile *ff = (struct fastfile *) cf;
716   off_t new_ofs;
717
718   if (ff->value_cnt != 0)
719     {
720       size_t buffer_case_cnt = ff->buffer_size / ff->value_cnt;
721       new_ofs = ((off_t) reader->case_idx / buffer_case_cnt
722                  * ff->buffer_size * sizeof *ff->buffer);
723       reader->buffer_pos = (reader->case_idx % buffer_case_cnt
724                             * ff->value_cnt);
725     }
726   else
727     new_ofs = 0;
728   if (new_ofs != reader->file_ofs)
729     {
730       if (lseek (reader->fd, new_ofs, SEEK_SET) != new_ofs)
731         io_error (ff, _("%s: Seeking temporary file: %s."),
732                   ff->file_name, strerror (errno));
733       else
734         reader->file_ofs = new_ofs;
735     }
736
737   if (ff->case_cnt > 0 && ff->value_cnt > 0 && reader->buffer_ofs != new_ofs)
738     fill_buffer (reader);
739 }
740
741 /* Fills READER's buffer by reading a block from disk. */
742 static bool
743 fill_buffer (struct fastfilereader *reader)
744 {
745   struct casefile *cf = casereader_get_casefile(&reader->cr);
746   struct fastfile *ff = (struct fastfile *) cf;
747   if (ff->ok)
748     {
749       int bytes = full_read (reader->fd, reader->buffer,
750                              ff->buffer_size *
751                              sizeof *reader->buffer);
752       if (bytes < 0)
753         io_error (ff, _("%s: Reading temporary file: %s."),
754                   ff->file_name, strerror (errno));
755       else if (bytes != ff->buffer_size * sizeof *reader->buffer)
756         io_error (ff, _("%s: Temporary file ended unexpectedly."),
757                   ff->file_name);
758       else
759         {
760           reader->buffer_ofs = reader->file_ofs;
761           reader->file_ofs += bytes;
762         }
763     }
764   return ff->ok;
765 }
766
767 static const struct class_casefile class = 
768   {
769     fastfile_destroy,
770     fastfile_error,
771     fastfile_get_value_cnt,
772     fastfile_get_case_cnt,
773     fastfile_get_reader,
774     fastfile_append,
775
776
777     fastfile_in_core,
778     fastfile_to_disk,
779     fastfile_sleep,
780   };
781
782 static const struct class_casereader class_reader = 
783   {
784     fastfilereader_get_next_case,
785     fastfilereader_cnum,
786     fastfilereader_destroy,
787     fastfilereader_clone,
788   };