Slightly generalize case_to_values and case_from_values functions, and
[pspp-builds.git] / src / data / fastfile.c
1 /* PSPP - computes sample statistics.
2    Copyright (C) 2004, 2006, 2007 Free Software Foundation, Inc.
3
4    This program is free software; you can redistribute it and/or
5    modify it under the terms of the GNU General Public License as
6    published by the Free Software Foundation; either version 2 of the
7    License, or (at your option) any later version.
8
9    This program is distributed in the hope that it will be useful, but
10    WITHOUT ANY WARRANTY; without even the implied warranty of
11    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12    General Public License for more details.
13
14    You should have received a copy of the GNU General Public License
15    along with this program; if not, write to the Free Software
16    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
17    02110-1301, USA. */
18
19 #include <config.h>
20
21 #include "casefile.h"
22 #include "casefile-private.h"
23 #include "fastfile.h"
24
25 #include <assert.h>
26 #include <errno.h>
27 #include <fcntl.h>
28 #include <stdio.h>
29 #include <stdlib.h>
30 #include <string.h>
31 #include <unistd.h>
32
33 #include <data/case.h>
34 #include <data/make-file.h>
35 #include <data/settings.h>
36 #include <data/variable.h>
37 #include <libpspp/alloc.h>
38 #include <libpspp/compiler.h>
39 #include <libpspp/message.h>
40 #include <libpspp/misc.h>
41 #include <libpspp/str.h>
42
43 #include "full-read.h"
44 #include "full-write.h"
45
46 #include "gettext.h"
47 #define _(msgid) gettext (msgid)
48
49 #define IO_BUF_SIZE (8192 / sizeof (union value))
50
51 /* A fastfile represents a sequentially accessible stream of
52    immutable cases.
53
54    If workspace allows, a fastfile is maintained in memory.  If
55    workspace overflows, then the fastfile is pushed to disk.  In
56    either case the interface presented to callers is kept the
57    same.
58
59    The life cycle of a fastfile consists of up to three phases:
60
61        1. Writing.  The fastfile initially contains no cases.  In
62           this phase, any number of cases may be appended to the
63           end of a fastfile.  (Cases are never inserted in the
64           middle or before the beginning of a fastfile.)
65
66           Use casefile_append or casefile_append_xfer to
67           append a case to a fastfile.
68
69        2. Reading.  The fastfile may be read sequentially,
70           starting from the beginning, by "casereaders".  Any
71           number of casereaders may be created, at any time,
72           during the reading phase.  Each casereader has an
73           independent position in the fastfile.
74
75           Ordinary casereaders may only move forward.  They
76           cannot move backward to arbitrary records or seek
77           randomly.  Cloning casereaders is possible, but it is
78           not yet implemented.
79
80           Use casefile_get_reader to create a casereader for
81           use in phase 2.  This also transitions from phase 1 to
82           phase 2.  Calling fastfile_mode_reader makes the same
83           transition, without creating a casereader.
84
85           Use casereader_read or casereader_read_xfer to read
86           a case from a casereader.  Use casereader_destroy to
87           discard a casereader when it is no longer needed.
88
89        3. Destruction.  This phase is optional.  The fastfile is
90           also read with casereaders in this phase, but the
91           ability to create new casereaders is curtailed.
92
93           In this phase, casereaders could still be cloned.
94
95           To transition from phase 1 or 2 to phase 3 and create a
96           casereader, call casefile_get_destructive_reader().
97           The same functions apply to the casereader obtained
98           this way as apply to casereaders obtained in phase 2.
99           
100           After casefile_get_destructive_reader is called, no
101           more casereaders may be created.  (If cloning of
102           casereaders were implemented, it would still be
103           possible.)
104
105           The purpose of the limitations applied to casereaders
106           in phase 3 is to allow in-memory fastfiles to fully
107           transfer ownership of cases to the casereaders,
108           avoiding the need for extra copies of case data.  For
109           relatively static data sets with many variables, I
110           suspect (without evidence) that this may be a big
111           performance boost.
112
113    When a fastfile is no longer needed, it may be destroyed with
114    casefile_destroy.  This function will also destroy any
115    remaining casereaders. */
116
117 /* FIXME: should we implement compression? */
118
119 /* In-memory cases are arranged in an array of arrays.  The top
120    level is variable size and the size of each bottom level array
121    is fixed at the number of cases defined here.  */
122 #define CASES_PER_BLOCK 128
123
124 static const struct class_casefile class;
125
126 /* A fastfile. */
127 struct fastfile
128 {
129   struct casefile cf;           /* Parent */
130
131   size_t value_cnt;             /* Case size in `union value's. */
132   size_t case_acct_size;        /* Case size for accounting. */
133   unsigned long case_cnt;       /* Number of cases stored. */
134   enum { MEMORY, DISK } storage;        /* Where cases are stored. */
135   enum { WRITE, READ } mode;            /* Is writing or reading allowed? */
136
137   bool ok;                      /* False after I/O error. */
138
139   /* Memory storage. */
140   struct ccase **cases;         /* Pointer to array of cases. */
141
142   /* Disk storage. */
143   int fd;                       /* File descriptor, -1 if none. */
144   char *file_name;              /* File name. */
145   union value *buffer;          /* I/O buffer, NULL if none. */
146   size_t buffer_used;           /* Number of values used in buffer. */
147   size_t buffer_size;           /* Buffer size in values. */
148 };
149
150
151 static const struct class_casereader class_reader;
152
153 /* For reading out the cases in a fastfile. */
154 struct fastfilereader
155 {
156   struct casereader cr;         /* Parent */
157
158   unsigned long case_idx;       /* Case number of current case. */
159
160   /* Disk storage. */
161   int fd;                       /* File descriptor. */
162   off_t file_ofs;               /* Current position in fd. */
163   off_t buffer_ofs;             /* File offset of buffer start. */
164   union value *buffer;          /* I/O buffer. */
165   size_t buffer_pos;            /* Offset of buffer position. */
166   struct ccase c;               /* Current case. */
167 };
168
169
170 static void io_error (struct fastfile *, const char *, ...) 
171      PRINTF_FORMAT (2, 3);
172 static int safe_open (const char *file_name, int flags);
173 static int safe_close (int fd);
174 static void write_case_to_disk (struct fastfile *, const struct ccase *);
175 static void flush_buffer (struct fastfile *);
176
177 static void reader_open_file (struct fastfilereader *);
178
179 static void seek_and_fill_buffer (struct fastfilereader *);
180 static bool fill_buffer (struct fastfilereader *);
181
182
183 /* Number of bytes of case allocated in in-memory fastfiles. */
184 static size_t case_bytes;
185
186 /* Destroys READER. */
187 static void fastfilereader_destroy (struct casereader *cr)
188 {
189   struct fastfilereader *reader = (struct fastfilereader *) cr;
190   struct fastfile *ff = (struct fastfile *) casereader_get_casefile (cr);
191
192   if (ff->buffer == NULL)
193     ff->buffer = reader->buffer;
194   else
195     free (reader->buffer);
196
197   if (reader->fd != -1)
198     {
199       if (ff->fd == -1)
200         ff->fd = reader->fd;
201       else
202         safe_close (reader->fd);
203     }
204
205   case_destroy (&reader->c);
206
207   free (reader);
208 }
209
210
211
212 /* Return the case number of the current case */
213 static unsigned long
214 fastfilereader_cnum (const struct casereader *cr)
215 {
216   const struct fastfilereader *ffr = (const struct fastfilereader *) cr;
217   return ffr->case_idx;
218 }
219
220
221 /* Returns the next case pointed to by FFR and increments
222    FFR's pointer.  Returns NULL if FFR points beyond the last case.
223 */
224 static struct ccase *
225 fastfilereader_get_next_case (struct casereader *cr)
226 {
227   struct fastfile *ff = (struct fastfile *) casereader_get_casefile (cr);
228   struct fastfilereader *ffr = (struct fastfilereader *) cr;
229   struct ccase *read_case = NULL ;
230
231   if ( ffr->case_idx >= ff->case_cnt  ) 
232     return NULL ;
233
234   if (ff->storage == MEMORY )
235     {
236       size_t block_idx = ffr->case_idx / CASES_PER_BLOCK;
237       size_t case_idx = ffr->case_idx % CASES_PER_BLOCK;
238       read_case = &ff->cases[block_idx][case_idx];
239     }
240   else
241     {
242       if (ffr->buffer_pos + ff->value_cnt > ff->buffer_size)
243         {
244           if (!fill_buffer (ffr))
245             return NULL;
246           ffr->buffer_pos = 0;
247         }
248
249       case_copy_in (&ffr->c, 0, ffr->buffer + ffr->buffer_pos, ff->value_cnt);
250       ffr->buffer_pos += ff->value_cnt;
251       
252       read_case = &ffr->c;
253     }
254   ffr->case_idx++;
255
256   return read_case;
257 }
258
259 /* Creates and returns a casereader for CF.  A casereader can be used to
260    sequentially read the cases in a fastfile. */
261 static struct casereader *
262 fastfile_get_reader (const struct casefile *cf_)
263 {
264   struct casefile *cf = (struct casefile *) cf_;
265   struct fastfilereader *ffr = xzalloc (sizeof *ffr);
266   struct casereader *reader = (struct casereader *) ffr;
267   struct fastfile *ff = (struct fastfile *) cf;
268
269   assert (!cf->being_destroyed);
270
271   /* Flush the buffer to disk if it's not empty. */
272   if (ff->mode == WRITE && ff->storage == DISK)
273     flush_buffer (ff);
274
275   ff->mode = READ;
276
277   casereader_register (cf, reader, &class_reader);
278
279   ffr->case_idx = 0;
280   reader->destructive = 0;
281   ffr->fd = -1;
282   ffr->buffer = NULL;
283   ffr->buffer_pos = 0;
284   case_nullify (&ffr->c);
285
286   if (ff->storage == DISK)
287     reader_open_file (ffr);
288
289   return reader;
290 }
291
292
293 /* Creates a copy of the casereader CR, and returns it */
294 static struct casereader *
295 fastfilereader_clone (const struct casereader *cr)
296 {
297   const struct fastfilereader *ffr = (const struct fastfilereader *) cr ;
298   struct fastfilereader *new_ffr = xzalloc (sizeof *new_ffr);
299
300   struct casereader *new_reader = (struct casereader *) new_ffr;
301
302   struct casefile *cf =  casereader_get_casefile (cr);
303   struct fastfile *ff = (struct fastfile *) cf;
304
305   assert (!cf->being_destroyed);
306
307   /* Flush the buffer to disk if it's not empty. */
308   if (ff->mode == WRITE && ff->storage == DISK)
309     flush_buffer (ff);
310
311   ff->mode = READ;
312
313   casereader_register (cf, new_reader, &class_reader);
314
315   new_ffr->case_idx = ffr->case_idx ;
316   new_reader->destructive = cr->destructive;
317   new_ffr->fd = ffr->fd ;
318   new_ffr->buffer = ffr->buffer ;
319   new_ffr->buffer_pos = ffr->buffer_pos;
320
321   if (ff->storage == DISK)
322     reader_open_file (new_ffr);
323
324   return new_reader;
325 }
326
327
328
329
330 /* Returns the number of `union value's in a case for CF. */
331 static size_t
332 fastfile_get_value_cnt (const struct casefile *cf)
333 {
334   const struct fastfile *ff = (const struct fastfile *) cf;
335   return ff->value_cnt;
336 }
337
338 /* Appends a copy of case C to fastfile CF.  Not valid after any
339    reader for CF has been created.
340    Returns true if successful, false if an I/O error occurred. */
341 static bool
342 fastfile_append (struct casefile *cf, const struct ccase *c)
343 {
344   struct fastfile *ff = (struct fastfile *) cf;
345   assert (ff->mode == WRITE);
346   assert (c != NULL);
347
348   /* Try memory first. */
349   if (ff->storage == MEMORY)
350     {
351       if (case_bytes < get_workspace ())
352         {
353           size_t block_idx = ff->case_cnt / CASES_PER_BLOCK;
354           size_t case_idx = ff->case_cnt % CASES_PER_BLOCK;
355           struct ccase new_case;
356
357           case_bytes += ff->case_acct_size;
358           case_clone (&new_case, c);
359           if (case_idx == 0)
360             {
361               if ((block_idx & (block_idx - 1)) == 0)
362                 {
363                   size_t block_cap = block_idx == 0 ? 1 : block_idx * 2;
364                   ff->cases = xnrealloc (ff->cases,
365                                          block_cap, sizeof *ff->cases);
366                 }
367
368               ff->cases[block_idx] = xnmalloc (CASES_PER_BLOCK,
369                                                sizeof **ff->cases);
370             }
371
372           case_move (&ff->cases[block_idx][case_idx], &new_case);
373         }
374       else
375         {
376           casefile_to_disk (cf);
377           assert (ff->storage == DISK);
378           write_case_to_disk (ff, c);
379         }
380     }
381   else
382     write_case_to_disk (ff, c);
383
384   ff->case_cnt++;
385   return ff->ok;
386 }
387
388
389 /* Returns the number of cases in fastfile CF. */
390 static unsigned long
391 fastfile_get_case_cnt (const struct casefile *cf)
392 {
393   const struct fastfile *ff = (const struct fastfile *) cf;
394   return ff->case_cnt;
395 }
396
397
398 /* Returns true only if fastfile CF is stored in memory (instead of on
399    disk), false otherwise. */
400 static bool
401 fastfile_in_core (const struct casefile *cf)
402 {
403   const struct fastfile *ff = (const struct fastfile *) cf;
404   return (ff->storage == MEMORY);
405 }
406
407
408 /* If CF is currently stored in memory, writes it to disk.  Readers, if any,
409    retain their current positions.
410    Returns true if successful, false if an I/O error occurred. */
411 static bool
412 fastfile_to_disk (const struct casefile *cf_)
413 {
414   struct fastfile *ff = (struct fastfile *) cf_;
415   struct casefile *cf = &ff->cf;
416
417   if (ff->storage == MEMORY)
418     {
419       size_t idx, block_cnt;
420       struct casereader *reader;
421
422       assert (ff->file_name == NULL);
423       assert (ff->fd == -1);
424       assert (ff->buffer_used == 0);
425
426       if (!make_temp_file (&ff->fd, &ff->file_name))
427         {
428           ff->ok = false;
429           return false;
430         }
431       ff->storage = DISK;
432
433       ff->buffer = xnmalloc (ff->buffer_size, sizeof *ff->buffer);
434       memset (ff->buffer, 0, ff->buffer_size * sizeof *ff->buffer);
435
436       case_bytes -= ff->case_cnt * ff->case_acct_size;
437       for (idx = 0; idx < ff->case_cnt; idx++)
438         {
439           size_t block_idx = idx / CASES_PER_BLOCK;
440           size_t case_idx = idx % CASES_PER_BLOCK;
441           struct ccase *c = &ff->cases[block_idx][case_idx];
442           write_case_to_disk (ff, c);
443           case_destroy (c);
444         }
445
446       block_cnt = DIV_RND_UP (ff->case_cnt, CASES_PER_BLOCK);
447       for (idx = 0; idx < block_cnt; idx++)
448         free (ff->cases[idx]);
449
450       free (ff->cases);
451       ff->cases = NULL;
452
453       if (ff->mode == READ)
454         flush_buffer (ff);
455
456       ll_for_each (reader, struct casereader, ll, &cf->reader_list)
457         reader_open_file ((struct fastfilereader *) reader);
458
459     }
460   return ff->ok;
461 }
462
463 /* Puts a fastfile to "sleep", that is, minimizes the resources
464    needed for it by closing its file descriptor and freeing its
465    buffer.  This is useful if we need so many fastfiles that we
466    might not have enough memory and file descriptors to go
467    around.
468
469    For simplicity, this implementation always converts the
470    fastfile to reader mode.  If this turns out to be a problem,
471    with a little extra work we could also support sleeping
472    writers.
473
474    Returns true if successful, false if an I/O error occurred. */
475 static bool
476 fastfile_sleep (const struct casefile *cf_)
477 {
478   struct fastfile *ff = (struct fastfile *) cf_;
479   struct casefile *cf = &ff->cf;
480
481   fastfile_to_disk (cf);
482   flush_buffer (ff);
483
484   if (ff->fd != -1)
485     {
486       safe_close (ff->fd);
487       ff->fd = -1;
488     }
489   if (ff->buffer != NULL)
490     {
491       free (ff->buffer);
492       ff->buffer = NULL;
493     }
494
495   return ff->ok;
496 }
497
498
499 /* Returns true if an I/O error has occurred in fastfile CF. */
500 static bool
501 fastfile_error (const struct casefile *cf)
502 {
503   const struct fastfile *ff = (const struct fastfile *) cf;
504   return !ff->ok;
505 }
506
507 /* Destroys fastfile CF. */
508 static void
509 fastfile_destroy (struct casefile *cf)
510 {
511   struct fastfile *ff = (struct fastfile *) cf;
512
513   if (cf != NULL)
514     {
515       if (ff->cases != NULL)
516         {
517           size_t idx, block_cnt;
518
519           case_bytes -= ff->case_cnt * ff->case_acct_size;
520           for (idx = 0; idx < ff->case_cnt; idx++)
521             {
522               size_t block_idx = idx / CASES_PER_BLOCK;
523               size_t case_idx = idx % CASES_PER_BLOCK;
524               struct ccase *c = &ff->cases[block_idx][case_idx];
525               case_destroy (c);
526             }
527
528           block_cnt = DIV_RND_UP (ff->case_cnt, CASES_PER_BLOCK);
529           for (idx = 0; idx < block_cnt; idx++)
530             free (ff->cases[idx]);
531
532           free (ff->cases);
533         }
534
535       if (ff->fd != -1)
536         safe_close (ff->fd);
537
538       if (ff->file_name != NULL && remove (ff->file_name) == -1)
539         io_error (ff, _("%s: Removing temporary file: %s."),
540                   ff->file_name, strerror (errno));
541       free (ff->file_name);
542
543       free (ff->buffer);
544
545       free (ff);
546     }
547 }
548
549
550 /* Creates and returns a fastfile to store cases of VALUE_CNT
551    `union value's each. */
552 struct casefile *
553 fastfile_create (size_t value_cnt)
554 {
555   struct fastfile *ff = xzalloc (sizeof *ff);
556   struct casefile *cf = &ff->cf;
557
558   casefile_register (cf, &class);
559
560   ff->value_cnt = value_cnt;
561   ff->case_acct_size = (ff->value_cnt + 4) * sizeof *ff->buffer;
562   ff->case_cnt = 0;
563   ff->storage = MEMORY;
564   ff->mode = WRITE;
565   cf->being_destroyed = false;
566   ff->ok = true;
567   ff->cases = NULL;
568   ff->fd = -1;
569   ff->file_name = NULL;
570   ff->buffer = NULL;
571   ff->buffer_size = ROUND_UP (ff->value_cnt, IO_BUF_SIZE);
572   if (ff->value_cnt > 0 && ff->buffer_size % ff->value_cnt > 64)
573     ff->buffer_size = ff->value_cnt;
574   ff->buffer_used = 0;
575
576   return cf;
577 }
578
579
580
581 /* Marks FF as having encountered an I/O error.
582    If this is the first error on CF, reports FORMAT to the user,
583    doing printf()-style substitutions. */
584 static void
585 io_error (struct fastfile *ff, const char *format, ...)
586 {
587   if (ff->ok)
588     {
589       struct msg m;
590       va_list args;
591
592       m.category = MSG_GENERAL;
593       m.severity = MSG_ERROR;
594       m.where.file_name = NULL;
595       m.where.line_number = -1;
596       va_start (args, format);
597       m.text = xvasprintf (format, args);
598       va_end (args);
599
600       msg_emit (&m);
601     }
602   ff->ok = false;
603 }
604
605 /* Calls open(), passing FILE_NAME and FLAGS, repeating as necessary
606    to deal with interrupted calls. */
607 static int
608 safe_open (const char *file_name, int flags)
609 {
610   int fd;
611
612   do
613     {
614       fd = open (file_name, flags);
615     }
616   while (fd == -1 && errno == EINTR);
617
618   return fd;
619 }
620
621 /* Calls close(), passing FD, repeating as necessary to deal with
622    interrupted calls. */
623 static int
624 safe_close (int fd)
625 {
626   int retval;
627
628   do
629     {
630       retval = close (fd);
631     }
632   while (retval == -1 && errno == EINTR);
633
634   return retval;
635 }
636
637
638 /* Writes case C to fastfile CF's disk buffer, first flushing the buffer to
639    disk if it would otherwise overflow.
640    Returns true if successful, false if an I/O error occurred. */
641 static void
642 write_case_to_disk (struct fastfile *ff, const struct ccase *c)
643 {
644   if (!ff->ok)
645     return;
646
647   case_copy_out (c, 0, ff->buffer + ff->buffer_used, ff->value_cnt);
648   ff->buffer_used += ff->value_cnt;
649   if (ff->buffer_used + ff->value_cnt > ff->buffer_size)
650     flush_buffer (ff);
651 }
652
653
654 /* If any bytes in FF's output buffer are used, flush them to
655    disk. */
656 static void
657 flush_buffer (struct fastfile *ff)
658 {
659   if (ff->ok && ff->buffer_used > 0)
660     {
661       if (!full_write (ff->fd, ff->buffer,
662                        ff->buffer_size * sizeof *ff->buffer))
663         io_error (ff, _("Error writing temporary file: %s."),
664                   strerror (errno));
665       ff->buffer_used = 0;
666     }
667 }
668
669
670 /* Opens a disk file for READER and seeks to the current position as indicated
671    by case_idx.  Normally the current position is the beginning of the file,
672    but fastfile_to_disk may cause the file to be opened at a different
673    position. */
674 static void
675 reader_open_file (struct fastfilereader *reader)
676 {
677   struct casefile *cf = casereader_get_casefile(&reader->cr);
678   struct fastfile *ff = (struct fastfile *) cf;
679   if (!ff->ok || reader->case_idx >= ff->case_cnt)
680     return;
681
682   if (ff->fd != -1)
683     {
684       reader->fd = ff->fd;
685       ff->fd = -1;
686     }
687   else
688     {
689       reader->fd = safe_open (ff->file_name, O_RDONLY);
690       if (reader->fd < 0)
691         io_error (ff, _("%s: Opening temporary file: %s."),
692                   ff->file_name, strerror (errno));
693     }
694
695   if (ff->buffer != NULL)
696     {
697       reader->buffer = ff->buffer;
698       ff->buffer = NULL;
699     }
700   else
701     {
702       reader->buffer = xnmalloc (ff->buffer_size, sizeof *ff->buffer);
703       memset (reader->buffer, 0, ff->buffer_size * sizeof *ff->buffer);
704     }
705
706   case_create (&reader->c, ff->value_cnt);
707
708   reader->buffer_ofs = -1;
709   reader->file_ofs = -1;
710   seek_and_fill_buffer (reader);
711 }
712
713 /* Seeks the backing file for READER to the proper position and
714    refreshes the buffer contents. */
715 static void
716 seek_and_fill_buffer (struct fastfilereader *reader)
717 {
718   struct casefile *cf = casereader_get_casefile(&reader->cr);
719   struct fastfile *ff = (struct fastfile *) cf;
720   off_t new_ofs;
721
722   if (ff->value_cnt != 0)
723     {
724       size_t buffer_case_cnt = ff->buffer_size / ff->value_cnt;
725       new_ofs = ((off_t) reader->case_idx / buffer_case_cnt
726                  * ff->buffer_size * sizeof *ff->buffer);
727       reader->buffer_pos = (reader->case_idx % buffer_case_cnt
728                             * ff->value_cnt);
729     }
730   else
731     new_ofs = 0;
732   if (new_ofs != reader->file_ofs)
733     {
734       if (lseek (reader->fd, new_ofs, SEEK_SET) != new_ofs)
735         io_error (ff, _("%s: Seeking temporary file: %s."),
736                   ff->file_name, strerror (errno));
737       else
738         reader->file_ofs = new_ofs;
739     }
740
741   if (ff->case_cnt > 0 && ff->value_cnt > 0 && reader->buffer_ofs != new_ofs)
742     fill_buffer (reader);
743 }
744
745 /* Fills READER's buffer by reading a block from disk. */
746 static bool
747 fill_buffer (struct fastfilereader *reader)
748 {
749   struct casefile *cf = casereader_get_casefile(&reader->cr);
750   struct fastfile *ff = (struct fastfile *) cf;
751   if (ff->ok)
752     {
753       int bytes = full_read (reader->fd, reader->buffer,
754                              ff->buffer_size *
755                              sizeof *reader->buffer);
756       if (bytes < 0)
757         io_error (ff, _("%s: Reading temporary file: %s."),
758                   ff->file_name, strerror (errno));
759       else if (bytes != ff->buffer_size * sizeof *reader->buffer)
760         io_error (ff, _("%s: Temporary file ended unexpectedly."),
761                   ff->file_name);
762       else
763         {
764           reader->buffer_ofs = reader->file_ofs;
765           reader->file_ofs += bytes;
766         }
767     }
768   return ff->ok;
769 }
770
771 static const struct class_casefile class = 
772   {
773     fastfile_destroy,
774     fastfile_error,
775     fastfile_get_value_cnt,
776     fastfile_get_case_cnt,
777     fastfile_get_reader,
778     fastfile_append,
779
780
781     fastfile_in_core,
782     fastfile_to_disk,
783     fastfile_sleep,
784   };
785
786 static const struct class_casereader class_reader = 
787   {
788     fastfilereader_get_next_case,
789     fastfilereader_cnum,
790     fastfilereader_destroy,
791     fastfilereader_clone,
792   };