Clean up and improve case code.
[pspp-builds.git] / src / data / fastfile.c
1 /* PSPP - computes sample statistics.
2    Copyright (C) 2004, 2006, 2007 Free Software Foundation, Inc.
3
4    This program is free software; you can redistribute it and/or
5    modify it under the terms of the GNU General Public License as
6    published by the Free Software Foundation; either version 2 of the
7    License, or (at your option) any later version.
8
9    This program is distributed in the hope that it will be useful, but
10    WITHOUT ANY WARRANTY; without even the implied warranty of
11    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12    General Public License for more details.
13
14    You should have received a copy of the GNU General Public License
15    along with this program; if not, write to the Free Software
16    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
17    02110-1301, USA. */
18
19 #include <config.h>
20
21 #include "casefile.h"
22 #include "casefile-private.h"
23 #include "fastfile.h"
24
25 #include <assert.h>
26 #include <errno.h>
27 #include <fcntl.h>
28 #include <stdio.h>
29 #include <stdlib.h>
30 #include <string.h>
31 #include <unistd.h>
32
33 #include <data/case.h>
34 #include <data/make-file.h>
35 #include <data/settings.h>
36 #include <data/variable.h>
37 #include <libpspp/alloc.h>
38 #include <libpspp/compiler.h>
39 #include <libpspp/message.h>
40 #include <libpspp/misc.h>
41 #include <libpspp/str.h>
42
43 #include "full-read.h"
44 #include "full-write.h"
45
46 #include "gettext.h"
47 #define _(msgid) gettext (msgid)
48
49 #define IO_BUF_SIZE (8192 / sizeof (union value))
50
51 /* A fastfile represents a sequentially accessible stream of
52    immutable cases.
53
54    If workspace allows, a fastfile is maintained in memory.  If
55    workspace overflows, then the fastfile is pushed to disk.  In
56    either case the interface presented to callers is kept the
57    same.
58
59    The life cycle of a fastfile consists of up to three phases:
60
61        1. Writing.  The fastfile initially contains no cases.  In
62           this phase, any number of cases may be appended to the
63           end of a fastfile.  (Cases are never inserted in the
64           middle or before the beginning of a fastfile.)
65
66           Use casefile_append or casefile_append_xfer to
67           append a case to a fastfile.
68
69        2. Reading.  The fastfile may be read sequentially,
70           starting from the beginning, by "casereaders".  Any
71           number of casereaders may be created, at any time,
72           during the reading phase.  Each casereader has an
73           independent position in the fastfile.
74
75           Ordinary casereaders may only move forward.  They
76           cannot move backward to arbitrary records or seek
77           randomly.  Cloning casereaders is possible, but it is
78           not yet implemented.
79
80           Use casefile_get_reader to create a casereader for
81           use in phase 2.  This also transitions from phase 1 to
82           phase 2.  Calling fastfile_mode_reader makes the same
83           transition, without creating a casereader.
84
85           Use casereader_read or casereader_read_xfer to read
86           a case from a casereader.  Use casereader_destroy to
87           discard a casereader when it is no longer needed.
88
89        3. Destruction.  This phase is optional.  The fastfile is
90           also read with casereaders in this phase, but the
91           ability to create new casereaders is curtailed.
92
93           In this phase, casereaders could still be cloned.
94
95           To transition from phase 1 or 2 to phase 3 and create a
96           casereader, call casefile_get_destructive_reader().
97           The same functions apply to the casereader obtained
98           this way as apply to casereaders obtained in phase 2.
99           
100           After casefile_get_destructive_reader is called, no
101           more casereaders may be created.  (If cloning of
102           casereaders were implemented, it would still be
103           possible.)
104
105           The purpose of the limitations applied to casereaders
106           in phase 3 is to allow in-memory fastfiles to fully
107           transfer ownership of cases to the casereaders,
108           avoiding the need for extra copies of case data.  For
109           relatively static data sets with many variables, I
110           suspect (without evidence) that this may be a big
111           performance boost.
112
113    When a fastfile is no longer needed, it may be destroyed with
114    casefile_destroy.  This function will also destroy any
115    remaining casereaders. */
116
117 /* FIXME: should we implement compression? */
118
119 /* In-memory cases are arranged in an array of arrays.  The top
120    level is variable size and the size of each bottom level array
121    is fixed at the number of cases defined here.  */
122 #define CASES_PER_BLOCK 128
123
124 static const struct class_casefile class;
125
126 /* A fastfile. */
127 struct fastfile
128 {
129   struct casefile cf;           /* Parent */
130
131   size_t value_cnt;             /* Case size in `union value's. */
132   size_t case_acct_size;        /* Case size for accounting. */
133   unsigned long case_cnt;       /* Number of cases stored. */
134   enum { MEMORY, DISK } storage;        /* Where cases are stored. */
135   enum { WRITE, READ } mode;            /* Is writing or reading allowed? */
136
137   bool ok;                      /* False after I/O error. */
138
139   /* Memory storage. */
140   struct ccase **cases;         /* Pointer to array of cases. */
141
142   /* Disk storage. */
143   int fd;                       /* File descriptor, -1 if none. */
144   char *file_name;              /* File name. */
145   union value *buffer;          /* I/O buffer, NULL if none. */
146   size_t buffer_used;           /* Number of values used in buffer. */
147   size_t buffer_size;           /* Buffer size in values. */
148 };
149
150
151 static const struct class_casereader class_reader;
152
153 /* For reading out the cases in a fastfile. */
154 struct fastfilereader
155 {
156   struct casereader cr;         /* Parent */
157
158   unsigned long case_idx;       /* Case number of current case. */
159
160   /* Disk storage. */
161   int fd;                       /* File descriptor. */
162   off_t file_ofs;               /* Current position in fd. */
163   off_t buffer_ofs;             /* File offset of buffer start. */
164   union value *buffer;          /* I/O buffer. */
165   size_t buffer_pos;            /* Offset of buffer position. */
166   struct ccase c;               /* Current case. */
167 };
168
169
170 static void io_error (struct fastfile *, const char *, ...) 
171      PRINTF_FORMAT (2, 3);
172 static int safe_open (const char *file_name, int flags);
173 static int safe_close (int fd);
174 static void write_case_to_disk (struct fastfile *, const struct ccase *);
175 static void flush_buffer (struct fastfile *);
176
177 static void reader_open_file (struct fastfilereader *);
178
179 static void seek_and_fill_buffer (struct fastfilereader *);
180 static bool fill_buffer (struct fastfilereader *);
181
182
183 /* Number of bytes of case allocated in in-memory fastfiles. */
184 static size_t case_bytes;
185
186 /* Destroys READER. */
187 static void fastfilereader_destroy (struct casereader *cr)
188 {
189   struct fastfilereader *reader = (struct fastfilereader *) cr;
190   struct fastfile *ff = (struct fastfile *) casereader_get_casefile (cr);
191
192   if (ff->buffer == NULL)
193     ff->buffer = reader->buffer;
194   else
195     free (reader->buffer);
196
197   if (reader->fd != -1)
198     {
199       if (ff->fd == -1)
200         ff->fd = reader->fd;
201       else
202         safe_close (reader->fd);
203     }
204
205   case_destroy (&reader->c);
206
207   free (reader);
208 }
209
210
211
212 /* Return the case number of the current case */
213 static unsigned long
214 fastfilereader_cnum (const struct casereader *cr)
215 {
216   const struct fastfilereader *ffr = (const struct fastfilereader *) cr;
217   return ffr->case_idx;
218 }
219
220
221 /* Returns the next case pointed to by FFR and increments
222    FFR's pointer.  Returns NULL if FFR points beyond the last case.
223 */
224 static struct ccase *
225 fastfilereader_get_next_case (struct casereader *cr)
226 {
227   struct fastfile *ff = (struct fastfile *) casereader_get_casefile (cr);
228   struct fastfilereader *ffr = (struct fastfilereader *) cr;
229   struct ccase *read_case = NULL ;
230
231   if ( ffr->case_idx >= ff->case_cnt  ) 
232     return NULL ;
233
234   if (ff->storage == MEMORY )
235     {
236       size_t block_idx = ffr->case_idx / CASES_PER_BLOCK;
237       size_t case_idx = ffr->case_idx % CASES_PER_BLOCK;
238       read_case = &ff->cases[block_idx][case_idx];
239     }
240   else
241     {
242       if (ffr->buffer_pos + ff->value_cnt > ff->buffer_size)
243         {
244           if (!fill_buffer (ffr))
245             return NULL;
246           ffr->buffer_pos = 0;
247         }
248
249       case_from_values (&ffr->c, ffr->buffer + ffr->buffer_pos,
250                         ff->value_cnt);
251       ffr->buffer_pos += ff->value_cnt;
252       
253       read_case = &ffr->c;
254     }
255   ffr->case_idx++;
256
257   return read_case;
258 }
259
260 /* Creates and returns a casereader for CF.  A casereader can be used to
261    sequentially read the cases in a fastfile. */
262 static struct casereader *
263 fastfile_get_reader (const struct casefile *cf_)
264 {
265   struct casefile *cf = (struct casefile *) cf_;
266   struct fastfilereader *ffr = xzalloc (sizeof *ffr);
267   struct casereader *reader = (struct casereader *) ffr;
268   struct fastfile *ff = (struct fastfile *) cf;
269
270   assert (!cf->being_destroyed);
271
272   /* Flush the buffer to disk if it's not empty. */
273   if (ff->mode == WRITE && ff->storage == DISK)
274     flush_buffer (ff);
275
276   ff->mode = READ;
277
278   casereader_register (cf, reader, &class_reader);
279
280   ffr->case_idx = 0;
281   reader->destructive = 0;
282   ffr->fd = -1;
283   ffr->buffer = NULL;
284   ffr->buffer_pos = 0;
285   case_nullify (&ffr->c);
286
287   if (ff->storage == DISK)
288     reader_open_file (ffr);
289
290   return reader;
291 }
292
293
294 /* Creates a copy of the casereader CR, and returns it */
295 static struct casereader *
296 fastfilereader_clone (const struct casereader *cr)
297 {
298   const struct fastfilereader *ffr = (const struct fastfilereader *) cr ;
299   struct fastfilereader *new_ffr = xzalloc (sizeof *new_ffr);
300
301   struct casereader *new_reader = (struct casereader *) new_ffr;
302
303   struct casefile *cf =  casereader_get_casefile (cr);
304   struct fastfile *ff = (struct fastfile *) cf;
305
306   assert (!cf->being_destroyed);
307
308   /* Flush the buffer to disk if it's not empty. */
309   if (ff->mode == WRITE && ff->storage == DISK)
310     flush_buffer (ff);
311
312   ff->mode = READ;
313
314   casereader_register (cf, new_reader, &class_reader);
315
316   new_ffr->case_idx = ffr->case_idx ;
317   new_reader->destructive = cr->destructive;
318   new_ffr->fd = ffr->fd ;
319   new_ffr->buffer = ffr->buffer ;
320   new_ffr->buffer_pos = ffr->buffer_pos;
321
322   if (ff->storage == DISK)
323     reader_open_file (new_ffr);
324
325   return new_reader;
326 }
327
328
329
330
331 /* Returns the number of `union value's in a case for CF. */
332 static size_t
333 fastfile_get_value_cnt (const struct casefile *cf)
334 {
335   const struct fastfile *ff = (const struct fastfile *) cf;
336   return ff->value_cnt;
337 }
338
339 /* Appends a copy of case C to fastfile CF.  Not valid after any
340    reader for CF has been created.
341    Returns true if successful, false if an I/O error occurred. */
342 static bool
343 fastfile_append (struct casefile *cf, const struct ccase *c)
344 {
345   struct fastfile *ff = (struct fastfile *) cf;
346   assert (ff->mode == WRITE);
347   assert (c != NULL);
348
349   /* Try memory first. */
350   if (ff->storage == MEMORY)
351     {
352       if (case_bytes < get_workspace ())
353         {
354           size_t block_idx = ff->case_cnt / CASES_PER_BLOCK;
355           size_t case_idx = ff->case_cnt % CASES_PER_BLOCK;
356           struct ccase new_case;
357
358           case_bytes += ff->case_acct_size;
359           case_clone (&new_case, c);
360           if (case_idx == 0)
361             {
362               if ((block_idx & (block_idx - 1)) == 0)
363                 {
364                   size_t block_cap = block_idx == 0 ? 1 : block_idx * 2;
365                   ff->cases = xnrealloc (ff->cases,
366                                          block_cap, sizeof *ff->cases);
367                 }
368
369               ff->cases[block_idx] = xnmalloc (CASES_PER_BLOCK,
370                                                sizeof **ff->cases);
371             }
372
373           case_move (&ff->cases[block_idx][case_idx], &new_case);
374         }
375       else
376         {
377           casefile_to_disk (cf);
378           assert (ff->storage == DISK);
379           write_case_to_disk (ff, c);
380         }
381     }
382   else
383     write_case_to_disk (ff, c);
384
385   ff->case_cnt++;
386   return ff->ok;
387 }
388
389
390 /* Returns the number of cases in fastfile CF. */
391 static unsigned long
392 fastfile_get_case_cnt (const struct casefile *cf)
393 {
394   const struct fastfile *ff = (const struct fastfile *) cf;
395   return ff->case_cnt;
396 }
397
398
399 /* Returns true only if fastfile CF is stored in memory (instead of on
400    disk), false otherwise. */
401 static bool
402 fastfile_in_core (const struct casefile *cf)
403 {
404   const struct fastfile *ff = (const struct fastfile *) cf;
405   return (ff->storage == MEMORY);
406 }
407
408
409 /* If CF is currently stored in memory, writes it to disk.  Readers, if any,
410    retain their current positions.
411    Returns true if successful, false if an I/O error occurred. */
412 static bool
413 fastfile_to_disk (const struct casefile *cf_)
414 {
415   struct fastfile *ff = (struct fastfile *) cf_;
416   struct casefile *cf = &ff->cf;
417
418   if (ff->storage == MEMORY)
419     {
420       size_t idx, block_cnt;
421       struct casereader *reader;
422
423       assert (ff->file_name == NULL);
424       assert (ff->fd == -1);
425       assert (ff->buffer_used == 0);
426
427       if (!make_temp_file (&ff->fd, &ff->file_name))
428         {
429           ff->ok = false;
430           return false;
431         }
432       ff->storage = DISK;
433
434       ff->buffer = xnmalloc (ff->buffer_size, sizeof *ff->buffer);
435       memset (ff->buffer, 0, ff->buffer_size * sizeof *ff->buffer);
436
437       case_bytes -= ff->case_cnt * ff->case_acct_size;
438       for (idx = 0; idx < ff->case_cnt; idx++)
439         {
440           size_t block_idx = idx / CASES_PER_BLOCK;
441           size_t case_idx = idx % CASES_PER_BLOCK;
442           struct ccase *c = &ff->cases[block_idx][case_idx];
443           write_case_to_disk (ff, c);
444           case_destroy (c);
445         }
446
447       block_cnt = DIV_RND_UP (ff->case_cnt, CASES_PER_BLOCK);
448       for (idx = 0; idx < block_cnt; idx++)
449         free (ff->cases[idx]);
450
451       free (ff->cases);
452       ff->cases = NULL;
453
454       if (ff->mode == READ)
455         flush_buffer (ff);
456
457       ll_for_each (reader, struct casereader, ll, &cf->reader_list)
458         reader_open_file ((struct fastfilereader *) reader);
459
460     }
461   return ff->ok;
462 }
463
464 /* Puts a fastfile to "sleep", that is, minimizes the resources
465    needed for it by closing its file descriptor and freeing its
466    buffer.  This is useful if we need so many fastfiles that we
467    might not have enough memory and file descriptors to go
468    around.
469
470    For simplicity, this implementation always converts the
471    fastfile to reader mode.  If this turns out to be a problem,
472    with a little extra work we could also support sleeping
473    writers.
474
475    Returns true if successful, false if an I/O error occurred. */
476 static bool
477 fastfile_sleep (const struct casefile *cf_)
478 {
479   struct fastfile *ff = (struct fastfile *) cf_;
480   struct casefile *cf = &ff->cf;
481
482   fastfile_to_disk (cf);
483   flush_buffer (ff);
484
485   if (ff->fd != -1)
486     {
487       safe_close (ff->fd);
488       ff->fd = -1;
489     }
490   if (ff->buffer != NULL)
491     {
492       free (ff->buffer);
493       ff->buffer = NULL;
494     }
495
496   return ff->ok;
497 }
498
499
500 /* Returns true if an I/O error has occurred in fastfile CF. */
501 static bool
502 fastfile_error (const struct casefile *cf)
503 {
504   const struct fastfile *ff = (const struct fastfile *) cf;
505   return !ff->ok;
506 }
507
508 /* Destroys fastfile CF. */
509 static void
510 fastfile_destroy (struct casefile *cf)
511 {
512   struct fastfile *ff = (struct fastfile *) cf;
513
514   if (cf != NULL)
515     {
516       if (ff->cases != NULL)
517         {
518           size_t idx, block_cnt;
519
520           case_bytes -= ff->case_cnt * ff->case_acct_size;
521           for (idx = 0; idx < ff->case_cnt; idx++)
522             {
523               size_t block_idx = idx / CASES_PER_BLOCK;
524               size_t case_idx = idx % CASES_PER_BLOCK;
525               struct ccase *c = &ff->cases[block_idx][case_idx];
526               case_destroy (c);
527             }
528
529           block_cnt = DIV_RND_UP (ff->case_cnt, CASES_PER_BLOCK);
530           for (idx = 0; idx < block_cnt; idx++)
531             free (ff->cases[idx]);
532
533           free (ff->cases);
534         }
535
536       if (ff->fd != -1)
537         safe_close (ff->fd);
538
539       if (ff->file_name != NULL && remove (ff->file_name) == -1)
540         io_error (ff, _("%s: Removing temporary file: %s."),
541                   ff->file_name, strerror (errno));
542       free (ff->file_name);
543
544       free (ff->buffer);
545
546       free (ff);
547     }
548 }
549
550
551 /* Creates and returns a fastfile to store cases of VALUE_CNT
552    `union value's each. */
553 struct casefile *
554 fastfile_create (size_t value_cnt)
555 {
556   struct fastfile *ff = xzalloc (sizeof *ff);
557   struct casefile *cf = &ff->cf;
558
559   casefile_register (cf, &class);
560
561   ff->value_cnt = value_cnt;
562   ff->case_acct_size = (ff->value_cnt + 4) * sizeof *ff->buffer;
563   ff->case_cnt = 0;
564   ff->storage = MEMORY;
565   ff->mode = WRITE;
566   cf->being_destroyed = false;
567   ff->ok = true;
568   ff->cases = NULL;
569   ff->fd = -1;
570   ff->file_name = NULL;
571   ff->buffer = NULL;
572   ff->buffer_size = ROUND_UP (ff->value_cnt, IO_BUF_SIZE);
573   if (ff->value_cnt > 0 && ff->buffer_size % ff->value_cnt > 64)
574     ff->buffer_size = ff->value_cnt;
575   ff->buffer_used = 0;
576
577   return cf;
578 }
579
580
581
582 /* Marks FF as having encountered an I/O error.
583    If this is the first error on CF, reports FORMAT to the user,
584    doing printf()-style substitutions. */
585 static void
586 io_error (struct fastfile *ff, const char *format, ...)
587 {
588   if (ff->ok)
589     {
590       struct msg m;
591       va_list args;
592
593       m.category = MSG_GENERAL;
594       m.severity = MSG_ERROR;
595       m.where.file_name = NULL;
596       m.where.line_number = -1;
597       va_start (args, format);
598       m.text = xvasprintf (format, args);
599       va_end (args);
600
601       msg_emit (&m);
602     }
603   ff->ok = false;
604 }
605
606 /* Calls open(), passing FILE_NAME and FLAGS, repeating as necessary
607    to deal with interrupted calls. */
608 static int
609 safe_open (const char *file_name, int flags)
610 {
611   int fd;
612
613   do
614     {
615       fd = open (file_name, flags);
616     }
617   while (fd == -1 && errno == EINTR);
618
619   return fd;
620 }
621
622 /* Calls close(), passing FD, repeating as necessary to deal with
623    interrupted calls. */
624 static int
625 safe_close (int fd)
626 {
627   int retval;
628
629   do
630     {
631       retval = close (fd);
632     }
633   while (retval == -1 && errno == EINTR);
634
635   return retval;
636 }
637
638
639 /* Writes case C to fastfile CF's disk buffer, first flushing the buffer to
640    disk if it would otherwise overflow.
641    Returns true if successful, false if an I/O error occurred. */
642 static void
643 write_case_to_disk (struct fastfile *ff, const struct ccase *c)
644 {
645   if (!ff->ok)
646     return;
647
648   case_to_values (c, ff->buffer + ff->buffer_used, ff->value_cnt);
649   ff->buffer_used += ff->value_cnt;
650   if (ff->buffer_used + ff->value_cnt > ff->buffer_size)
651     flush_buffer (ff);
652 }
653
654
655 /* If any bytes in FF's output buffer are used, flush them to
656    disk. */
657 static void
658 flush_buffer (struct fastfile *ff)
659 {
660   if (ff->ok && ff->buffer_used > 0)
661     {
662       if (!full_write (ff->fd, ff->buffer,
663                        ff->buffer_size * sizeof *ff->buffer))
664         io_error (ff, _("Error writing temporary file: %s."),
665                   strerror (errno));
666       ff->buffer_used = 0;
667     }
668 }
669
670
671 /* Opens a disk file for READER and seeks to the current position as indicated
672    by case_idx.  Normally the current position is the beginning of the file,
673    but fastfile_to_disk may cause the file to be opened at a different
674    position. */
675 static void
676 reader_open_file (struct fastfilereader *reader)
677 {
678   struct casefile *cf = casereader_get_casefile(&reader->cr);
679   struct fastfile *ff = (struct fastfile *) cf;
680   if (!ff->ok || reader->case_idx >= ff->case_cnt)
681     return;
682
683   if (ff->fd != -1)
684     {
685       reader->fd = ff->fd;
686       ff->fd = -1;
687     }
688   else
689     {
690       reader->fd = safe_open (ff->file_name, O_RDONLY);
691       if (reader->fd < 0)
692         io_error (ff, _("%s: Opening temporary file: %s."),
693                   ff->file_name, strerror (errno));
694     }
695
696   if (ff->buffer != NULL)
697     {
698       reader->buffer = ff->buffer;
699       ff->buffer = NULL;
700     }
701   else
702     {
703       reader->buffer = xnmalloc (ff->buffer_size, sizeof *ff->buffer);
704       memset (reader->buffer, 0, ff->buffer_size * sizeof *ff->buffer);
705     }
706
707   case_create (&reader->c, ff->value_cnt);
708
709   reader->buffer_ofs = -1;
710   reader->file_ofs = -1;
711   seek_and_fill_buffer (reader);
712 }
713
714 /* Seeks the backing file for READER to the proper position and
715    refreshes the buffer contents. */
716 static void
717 seek_and_fill_buffer (struct fastfilereader *reader)
718 {
719   struct casefile *cf = casereader_get_casefile(&reader->cr);
720   struct fastfile *ff = (struct fastfile *) cf;
721   off_t new_ofs;
722
723   if (ff->value_cnt != 0)
724     {
725       size_t buffer_case_cnt = ff->buffer_size / ff->value_cnt;
726       new_ofs = ((off_t) reader->case_idx / buffer_case_cnt
727                  * ff->buffer_size * sizeof *ff->buffer);
728       reader->buffer_pos = (reader->case_idx % buffer_case_cnt
729                             * ff->value_cnt);
730     }
731   else
732     new_ofs = 0;
733   if (new_ofs != reader->file_ofs)
734     {
735       if (lseek (reader->fd, new_ofs, SEEK_SET) != new_ofs)
736         io_error (ff, _("%s: Seeking temporary file: %s."),
737                   ff->file_name, strerror (errno));
738       else
739         reader->file_ofs = new_ofs;
740     }
741
742   if (ff->case_cnt > 0 && ff->value_cnt > 0 && reader->buffer_ofs != new_ofs)
743     fill_buffer (reader);
744 }
745
746 /* Fills READER's buffer by reading a block from disk. */
747 static bool
748 fill_buffer (struct fastfilereader *reader)
749 {
750   struct casefile *cf = casereader_get_casefile(&reader->cr);
751   struct fastfile *ff = (struct fastfile *) cf;
752   if (ff->ok)
753     {
754       int bytes = full_read (reader->fd, reader->buffer,
755                              ff->buffer_size *
756                              sizeof *reader->buffer);
757       if (bytes < 0)
758         io_error (ff, _("%s: Reading temporary file: %s."),
759                   ff->file_name, strerror (errno));
760       else if (bytes != ff->buffer_size * sizeof *reader->buffer)
761         io_error (ff, _("%s: Temporary file ended unexpectedly."),
762                   ff->file_name);
763       else
764         {
765           reader->buffer_ofs = reader->file_ofs;
766           reader->file_ofs += bytes;
767         }
768     }
769   return ff->ok;
770 }
771
772 static const struct class_casefile class = 
773   {
774     fastfile_destroy,
775     fastfile_error,
776     fastfile_get_value_cnt,
777     fastfile_get_case_cnt,
778     fastfile_get_reader,
779     fastfile_append,
780
781
782     fastfile_in_core,
783     fastfile_to_disk,
784     fastfile_sleep,
785   };
786
787 static const struct class_casereader class_reader = 
788   {
789     fastfilereader_get_next_case,
790     fastfilereader_cnum,
791     fastfilereader_destroy,
792     fastfilereader_clone,
793   };