Patch #5209
[pspp-builds.git] / src / data / fastfile.c
1 /* PSPP - computes sample statistics.
2    Copyright (C) 2004, 2006 Free Software Foundation, Inc.
3    Written by Ben Pfaff <blp@gnu.org>.
4
5    This program is free software; you can redistribute it and/or
6    modify it under the terms of the GNU General Public License as
7    published by the Free Software Foundation; either version 2 of the
8    License, or (at your option) any later version.
9
10    This program is distributed in the hope that it will be useful, but
11    WITHOUT ANY WARRANTY; without even the implied warranty of
12    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13    General Public License for more details.
14
15    You should have received a copy of the GNU General Public License
16    along with this program; if not, write to the Free Software
17    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
18    02110-1301, USA. */
19
20 #include <config.h>
21 #include "casefile.h"
22 #include "casefile-private.h"
23 #include "fastfile.h"
24 #include <assert.h>
25 #include <errno.h>
26 #include <fcntl.h>
27 #include <stdio.h>
28 #include <stdlib.h>
29 #include <string.h>
30 #include <unistd.h>
31 #include <libpspp/alloc.h>
32 #include "case.h"
33 #include <libpspp/compiler.h>
34 #include <libpspp/message.h>
35 #include "full-read.h"
36 #include "full-write.h"
37 #include <libpspp/misc.h>
38 #include "make-file.h"
39 #include "settings.h"
40 #include "variable.h"
41
42 #include "gettext.h"
43 #define _(msgid) gettext (msgid)
44
45 #define IO_BUF_SIZE (8192 / sizeof (union value))
46
47 /* A fastfile represents a sequentially accessible stream of
48    immutable cases.
49
50    If workspace allows, a fastfile is maintained in memory.  If
51    workspace overflows, then the fastfile is pushed to disk.  In
52    either case the interface presented to callers is kept the
53    same.
54
55    The life cycle of a fastfile consists of up to three phases:
56
57        1. Writing.  The fastfile initially contains no cases.  In
58           this phase, any number of cases may be appended to the
59           end of a fastfile.  (Cases are never inserted in the
60           middle or before the beginning of a fastfile.)
61
62           Use casefile_append or casefile_append_xfer to
63           append a case to a fastfile.
64
65        2. Reading.  The fastfile may be read sequentially,
66           starting from the beginning, by "casereaders".  Any
67           number of casereaders may be created, at any time,
68           during the reading phase.  Each casereader has an
69           independent position in the fastfile.
70
71           Ordinary casereaders may only move forward.  They
72           cannot move backward to arbitrary records or seek
73           randomly.  Cloning casereaders is possible, but it is
74           not yet implemented.
75
76           Use casefile_get_reader to create a casereader for
77           use in phase 2.  This also transitions from phase 1 to
78           phase 2.  Calling fastfile_mode_reader makes the same
79           transition, without creating a casereader.
80
81           Use casereader_read or casereader_read_xfer to read
82           a case from a casereader.  Use casereader_destroy to
83           discard a casereader when it is no longer needed.
84
85        3. Destruction.  This phase is optional.  The fastfile is
86           also read with casereaders in this phase, but the
87           ability to create new casereaders is curtailed.
88
89           In this phase, casereaders could still be cloned (once
90           we eventually implement cloning).
91
92           To transition from phase 1 or 2 to phase 3 and create a
93           casereader, call casefile_get_destructive_reader().
94           The same functions apply to the casereader obtained
95           this way as apply to casereaders obtained in phase 2.
96           
97           After casefile_get_destructive_reader is called, no
98           more casereaders may be created.  (If cloning of
99           casereaders were implemented, it would still be
100           possible.)
101
102           The purpose of the limitations applied to casereaders
103           in phase 3 is to allow in-memory fastfiles to fully
104           transfer ownership of cases to the casereaders,
105           avoiding the need for extra copies of case data.  For
106           relatively static data sets with many variables, I
107           suspect (without evidence) that this may be a big
108           performance boost.
109
110    When a fastfile is no longer needed, it may be destroyed with
111    casefile_destroy.  This function will also destroy any
112    remaining casereaders. */
113
114 /* FIXME: should we implement compression? */
115
116 /* In-memory cases are arranged in an array of arrays.  The top
117    level is variable size and the size of each bottom level array
118    is fixed at the number of cases defined here.  */
119 #define CASES_PER_BLOCK 128
120
121 static const struct class_casefile class;
122
123 /* A fastfile. */
124 struct fastfile
125 {
126   struct casefile cf;           /* Parent */
127
128   size_t value_cnt;             /* Case size in `union value's. */
129   size_t case_acct_size;        /* Case size for accounting. */
130   unsigned long case_cnt;       /* Number of cases stored. */
131   enum { MEMORY, DISK } storage;        /* Where cases are stored. */
132   enum { WRITE, READ } mode;            /* Is writing or reading allowed? */
133
134   bool ok;                      /* False after I/O error. */
135
136   /* Memory storage. */
137   struct ccase **cases;         /* Pointer to array of cases. */
138
139   /* Disk storage. */
140   int fd;                       /* File descriptor, -1 if none. */
141   char *file_name;              /* File name. */
142   union value *buffer;          /* I/O buffer, NULL if none. */
143   size_t buffer_used;           /* Number of values used in buffer. */
144   size_t buffer_size;           /* Buffer size in values. */
145 };
146
147
148 static const struct class_casereader class_reader;
149
150 /* For reading out the cases in a fastfile. */
151 struct fastfilereader
152 {
153   struct casereader cr;         /* Parent */
154
155   unsigned long case_idx;       /* Case number of current case. */
156
157   /* Disk storage. */
158   int fd;                       /* File descriptor. */
159   off_t file_ofs;               /* Current position in fd. */
160   off_t buffer_ofs;             /* File offset of buffer start. */
161   union value *buffer;          /* I/O buffer. */
162   size_t buffer_pos;            /* Offset of buffer position. */
163   struct ccase c;               /* Current case. */
164 };
165
166
167 static void io_error (struct fastfile *, const char *, ...) 
168      PRINTF_FORMAT (2, 3);
169 static int safe_open (const char *file_name, int flags);
170 static int safe_close (int fd);
171 static void write_case_to_disk (struct fastfile *, const struct ccase *);
172 static void flush_buffer (struct fastfile *);
173
174 static void reader_open_file (struct fastfilereader *);
175
176 static void seek_and_fill_buffer (struct fastfilereader *);
177 static bool fill_buffer (struct fastfilereader *);
178
179
180 /* Number of bytes of case allocated in in-memory fastfiles. */
181 static size_t case_bytes;
182
183 /* Destroys READER. */
184 static void fastfilereader_destroy (struct casereader *cr)
185 {
186   struct fastfilereader *reader = (struct fastfilereader *) cr;
187   struct fastfile *ff = (struct fastfile *) casereader_get_casefile (cr);
188
189   if (ff->buffer == NULL)
190     ff->buffer = reader->buffer;
191   else
192     free (reader->buffer);
193
194   if (reader->fd != -1)
195     {
196       if (ff->fd == -1)
197         ff->fd = reader->fd;
198       else
199         safe_close (reader->fd);
200     }
201
202   case_destroy (&reader->c);
203
204   free (reader);
205 }
206
207
208
209 /* Return the case number of the current case */
210 static unsigned long
211 fastfilereader_cnum (const struct casereader *cr)
212 {
213   const struct fastfilereader *ffr = (const struct fastfilereader *) cr;
214   return ffr->case_idx;
215 }
216
217
218 /* Returns the next case pointed to by FFR and increments
219    FFR's pointer.  Returns NULL if FFR points beyond the last case.
220 */
221 static struct ccase *
222 fastfilereader_get_next_case (struct casereader *cr)
223 {
224   struct fastfile *ff = (struct fastfile *) casereader_get_casefile (cr);
225   struct fastfilereader *ffr = (struct fastfilereader *) cr;
226   struct ccase *read_case = NULL ;
227
228   if ( ffr->case_idx >= ff->case_cnt  ) 
229     return NULL ;
230
231   if (ff->storage == MEMORY )
232     {
233       size_t block_idx = ffr->case_idx / CASES_PER_BLOCK;
234       size_t case_idx = ffr->case_idx % CASES_PER_BLOCK;
235       read_case = &ff->cases[block_idx][case_idx];
236     }
237   else
238     {
239       if (ffr->buffer_pos + ff->value_cnt > ff->buffer_size)
240         {
241           if (!fill_buffer (ffr))
242             return NULL;
243           ffr->buffer_pos = 0;
244         }
245
246       case_from_values (&ffr->c, ffr->buffer + ffr->buffer_pos,
247                         ff->value_cnt);
248       ffr->buffer_pos += ff->value_cnt;
249       
250       read_case = &ffr->c;
251     }
252   ffr->case_idx++;
253
254   return read_case;
255 }
256
257 /* Creates and returns a casereader for CF.  A casereader can be used to
258    sequentially read the cases in a fastfile. */
259 static struct casereader *
260 fastfile_get_reader (const struct casefile *cf_)
261 {
262   struct casefile *cf = (struct casefile *) cf_;
263   struct fastfilereader *ffr = xzalloc (sizeof *ffr);
264   struct casereader *reader = (struct casereader *) ffr;
265   struct fastfile *ff = (struct fastfile *) cf;
266
267   assert (!cf->being_destroyed);
268
269   /* Flush the buffer to disk if it's not empty. */
270   if (ff->mode == WRITE && ff->storage == DISK)
271     flush_buffer (ff);
272
273   ff->mode = READ;
274
275   casereader_register (cf, reader, &class_reader);
276
277   ffr->case_idx = 0;
278   reader->destructive = 0;
279   ffr->fd = -1;
280   ffr->buffer = NULL;
281   ffr->buffer_pos = 0;
282   case_nullify (&ffr->c);
283
284   if (ff->storage == DISK)
285     reader_open_file (ffr);
286
287   return reader;
288 }
289
290 /* Returns the number of `union value's in a case for CF. */
291 static size_t
292 fastfile_get_value_cnt (const struct casefile *cf)
293 {
294   const struct fastfile *ff = (const struct fastfile *) cf;
295   return ff->value_cnt;
296 }
297
298 /* Appends a copy of case C to fastfile CF.  Not valid after any
299    reader for CF has been created.
300    Returns true if successful, false if an I/O error occurred. */
301 static bool
302 fastfile_append (struct casefile *cf, const struct ccase *c)
303 {
304   struct fastfile *ff = (struct fastfile *) cf;
305   assert (ff->mode == WRITE);
306   assert (c != NULL);
307
308   /* Try memory first. */
309   if (ff->storage == MEMORY)
310     {
311       if (case_bytes < get_workspace ())
312         {
313           size_t block_idx = ff->case_cnt / CASES_PER_BLOCK;
314           size_t case_idx = ff->case_cnt % CASES_PER_BLOCK;
315           struct ccase new_case;
316
317           case_bytes += ff->case_acct_size;
318           case_clone (&new_case, c);
319           if (case_idx == 0)
320             {
321               if ((block_idx & (block_idx - 1)) == 0)
322                 {
323                   size_t block_cap = block_idx == 0 ? 1 : block_idx * 2;
324                   ff->cases = xnrealloc (ff->cases,
325                                          block_cap, sizeof *ff->cases);
326                 }
327
328               ff->cases[block_idx] = xnmalloc (CASES_PER_BLOCK,
329                                                sizeof **ff->cases);
330             }
331
332           case_move (&ff->cases[block_idx][case_idx], &new_case);
333         }
334       else
335         {
336           casefile_to_disk (cf);
337           assert (ff->storage == DISK);
338           write_case_to_disk (ff, c);
339         }
340     }
341   else
342     write_case_to_disk (ff, c);
343
344   ff->case_cnt++;
345   return ff->ok;
346 }
347
348
349 /* Returns the number of cases in fastfile CF. */
350 static unsigned long
351 fastfile_get_case_cnt (const struct casefile *cf)
352 {
353   const struct fastfile *ff = (const struct fastfile *) cf;
354   return ff->case_cnt;
355 }
356
357
358 /* Returns true only if fastfile CF is stored in memory (instead of on
359    disk), false otherwise. */
360 static bool
361 fastfile_in_core (const struct casefile *cf)
362 {
363   const struct fastfile *ff = (const struct fastfile *) cf;
364   return (ff->storage == MEMORY);
365 }
366
367
368 /* If CF is currently stored in memory, writes it to disk.  Readers, if any,
369    retain their current positions.
370    Returns true if successful, false if an I/O error occurred. */
371 static bool
372 fastfile_to_disk (const struct casefile *cf_)
373 {
374   struct fastfile *ff = (struct fastfile *) cf_;
375   struct casefile *cf = &ff->cf;
376
377   if (ff->storage == MEMORY)
378     {
379       size_t idx, block_cnt;
380       struct casereader *reader;
381
382       assert (ff->file_name == NULL);
383       assert (ff->fd == -1);
384       assert (ff->buffer_used == 0);
385
386       if (!make_temp_file (&ff->fd, &ff->file_name))
387         {
388           ff->ok = false;
389           return false;
390         }
391       ff->storage = DISK;
392
393       ff->buffer = xnmalloc (ff->buffer_size, sizeof *ff->buffer);
394       memset (ff->buffer, 0, ff->buffer_size * sizeof *ff->buffer);
395
396       case_bytes -= ff->case_cnt * ff->case_acct_size;
397       for (idx = 0; idx < ff->case_cnt; idx++)
398         {
399           size_t block_idx = idx / CASES_PER_BLOCK;
400           size_t case_idx = idx % CASES_PER_BLOCK;
401           struct ccase *c = &ff->cases[block_idx][case_idx];
402           write_case_to_disk (ff, c);
403           case_destroy (c);
404         }
405
406       block_cnt = DIV_RND_UP (ff->case_cnt, CASES_PER_BLOCK);
407       for (idx = 0; idx < block_cnt; idx++)
408         free (ff->cases[idx]);
409
410       free (ff->cases);
411       ff->cases = NULL;
412
413       if (ff->mode == READ)
414         flush_buffer (ff);
415
416       ll_for_each (reader, struct casereader, ll, &cf->reader_list)
417         reader_open_file ((struct fastfilereader *) reader);
418
419     }
420   return ff->ok;
421 }
422
423 /* Puts a fastfile to "sleep", that is, minimizes the resources
424    needed for it by closing its file descriptor and freeing its
425    buffer.  This is useful if we need so many fastfiles that we
426    might not have enough memory and file descriptors to go
427    around.
428
429    For simplicity, this implementation always converts the
430    fastfile to reader mode.  If this turns out to be a problem,
431    with a little extra work we could also support sleeping
432    writers.
433
434    Returns true if successful, false if an I/O error occurred. */
435 static bool
436 fastfile_sleep (const struct casefile *cf_)
437 {
438   struct fastfile *ff = (struct fastfile *) cf_;
439   struct casefile *cf = &ff->cf;
440
441   fastfile_to_disk (cf);
442   flush_buffer (ff);
443
444   if (ff->fd != -1)
445     {
446       safe_close (ff->fd);
447       ff->fd = -1;
448     }
449   if (ff->buffer != NULL)
450     {
451       free (ff->buffer);
452       ff->buffer = NULL;
453     }
454
455   return ff->ok;
456 }
457
458
459 /* Returns true if an I/O error has occurred in fastfile CF. */
460 static bool
461 fastfile_error (const struct casefile *cf)
462 {
463   const struct fastfile *ff = (const struct fastfile *) cf;
464   return !ff->ok;
465 }
466
467 /* Destroys fastfile CF. */
468 static void
469 fastfile_destroy (struct casefile *cf)
470 {
471   struct fastfile *ff = (struct fastfile *) cf;
472
473   if (cf != NULL)
474     {
475       if (ff->cases != NULL)
476         {
477           size_t idx, block_cnt;
478
479           case_bytes -= ff->case_cnt * ff->case_acct_size;
480           for (idx = 0; idx < ff->case_cnt; idx++)
481             {
482               size_t block_idx = idx / CASES_PER_BLOCK;
483               size_t case_idx = idx % CASES_PER_BLOCK;
484               struct ccase *c = &ff->cases[block_idx][case_idx];
485               case_destroy (c);
486             }
487
488           block_cnt = DIV_RND_UP (ff->case_cnt, CASES_PER_BLOCK);
489           for (idx = 0; idx < block_cnt; idx++)
490             free (ff->cases[idx]);
491
492           free (ff->cases);
493         }
494
495       if (ff->fd != -1)
496         safe_close (ff->fd);
497
498       if (ff->file_name != NULL && remove (ff->file_name) == -1)
499         io_error (ff, _("%s: Removing temporary file: %s."),
500                   ff->file_name, strerror (errno));
501       free (ff->file_name);
502
503       free (ff->buffer);
504
505       free (ff);
506     }
507 }
508
509
510 /* Creates and returns a fastfile to store cases of VALUE_CNT
511    `union value's each. */
512 struct casefile *
513 fastfile_create (size_t value_cnt)
514 {
515   struct fastfile *ff = xzalloc (sizeof *ff);
516   struct casefile *cf = &ff->cf;
517
518   casefile_register (cf, &class);
519
520   ff->value_cnt = value_cnt;
521   ff->case_acct_size = (ff->value_cnt + 4) * sizeof *ff->buffer;
522   ff->case_cnt = 0;
523   ff->storage = MEMORY;
524   ff->mode = WRITE;
525   cf->being_destroyed = false;
526   ff->ok = true;
527   ff->cases = NULL;
528   ff->fd = -1;
529   ff->file_name = NULL;
530   ff->buffer = NULL;
531   ff->buffer_size = ROUND_UP (ff->value_cnt, IO_BUF_SIZE);
532   if (ff->value_cnt > 0 && ff->buffer_size % ff->value_cnt > 64)
533     ff->buffer_size = ff->value_cnt;
534   ff->buffer_used = 0;
535
536   return cf;
537 }
538
539
540
541 /* Marks FF as having encountered an I/O error.
542    If this is the first error on CF, reports FORMAT to the user,
543    doing printf()-style substitutions. */
544 static void
545 io_error (struct fastfile *ff, const char *format, ...)
546 {
547   if (ff->ok)
548     {
549       struct msg m;
550       va_list args;
551
552       m.category = MSG_GENERAL;
553       m.severity = MSG_ERROR;
554       m.where.file_name = NULL;
555       m.where.line_number = -1;
556       va_start (args, format);
557       m.text = xvasprintf (format, args);
558       va_end (args);
559
560       msg_emit (&m);
561     }
562   ff->ok = false;
563 }
564
565 /* Calls open(), passing FILE_NAME and FLAGS, repeating as necessary
566    to deal with interrupted calls. */
567 static int
568 safe_open (const char *file_name, int flags)
569 {
570   int fd;
571
572   do
573     {
574       fd = open (file_name, flags);
575     }
576   while (fd == -1 && errno == EINTR);
577
578   return fd;
579 }
580
581 /* Calls close(), passing FD, repeating as necessary to deal with
582    interrupted calls. */
583 static int
584 safe_close (int fd)
585 {
586   int retval;
587
588   do
589     {
590       retval = close (fd);
591     }
592   while (retval == -1 && errno == EINTR);
593
594   return retval;
595 }
596
597
598 /* Writes case C to fastfile CF's disk buffer, first flushing the buffer to
599    disk if it would otherwise overflow.
600    Returns true if successful, false if an I/O error occurred. */
601 static void
602 write_case_to_disk (struct fastfile *ff, const struct ccase *c)
603 {
604   if (!ff->ok)
605     return;
606
607   case_to_values (c, ff->buffer + ff->buffer_used, ff->value_cnt);
608   ff->buffer_used += ff->value_cnt;
609   if (ff->buffer_used + ff->value_cnt > ff->buffer_size)
610     flush_buffer (ff);
611 }
612
613
614 /* If any bytes in FF's output buffer are used, flush them to
615    disk. */
616 static void
617 flush_buffer (struct fastfile *ff)
618 {
619   if (ff->ok && ff->buffer_used > 0)
620     {
621       if (!full_write (ff->fd, ff->buffer,
622                        ff->buffer_size * sizeof *ff->buffer))
623         io_error (ff, _("Error writing temporary file: %s."),
624                   strerror (errno));
625       ff->buffer_used = 0;
626     }
627 }
628
629
630 /* Opens a disk file for READER and seeks to the current position as indicated
631    by case_idx.  Normally the current position is the beginning of the file,
632    but fastfile_to_disk may cause the file to be opened at a different
633    position. */
634 static void
635 reader_open_file (struct fastfilereader *reader)
636 {
637   struct casefile *cf = casereader_get_casefile(&reader->cr);
638   struct fastfile *ff = (struct fastfile *) cf;
639   if (!ff->ok || reader->case_idx >= ff->case_cnt)
640     return;
641
642   if (ff->fd != -1)
643     {
644       reader->fd = ff->fd;
645       ff->fd = -1;
646     }
647   else
648     {
649       reader->fd = safe_open (ff->file_name, O_RDONLY);
650       if (reader->fd < 0)
651         io_error (ff, _("%s: Opening temporary file: %s."),
652                   ff->file_name, strerror (errno));
653     }
654
655   if (ff->buffer != NULL)
656     {
657       reader->buffer = ff->buffer;
658       ff->buffer = NULL;
659     }
660   else
661     {
662       reader->buffer = xnmalloc (ff->buffer_size, sizeof *ff->buffer);
663       memset (reader->buffer, 0, ff->buffer_size * sizeof *ff->buffer);
664     }
665
666   case_create (&reader->c, ff->value_cnt);
667
668   reader->buffer_ofs = -1;
669   reader->file_ofs = -1;
670   seek_and_fill_buffer (reader);
671 }
672
673 /* Seeks the backing file for READER to the proper position and
674    refreshes the buffer contents. */
675 static void
676 seek_and_fill_buffer (struct fastfilereader *reader)
677 {
678   struct casefile *cf = casereader_get_casefile(&reader->cr);
679   struct fastfile *ff = (struct fastfile *) cf;
680   off_t new_ofs;
681
682   if (ff->value_cnt != 0)
683     {
684       size_t buffer_case_cnt = ff->buffer_size / ff->value_cnt;
685       new_ofs = ((off_t) reader->case_idx / buffer_case_cnt
686                  * ff->buffer_size * sizeof *ff->buffer);
687       reader->buffer_pos = (reader->case_idx % buffer_case_cnt
688                             * ff->value_cnt);
689     }
690   else
691     new_ofs = 0;
692   if (new_ofs != reader->file_ofs)
693     {
694       if (lseek (reader->fd, new_ofs, SEEK_SET) != new_ofs)
695         io_error (ff, _("%s: Seeking temporary file: %s."),
696                   ff->file_name, strerror (errno));
697       else
698         reader->file_ofs = new_ofs;
699     }
700
701   if (ff->case_cnt > 0 && ff->value_cnt > 0 && reader->buffer_ofs != new_ofs)
702     fill_buffer (reader);
703 }
704
705 /* Fills READER's buffer by reading a block from disk. */
706 static bool
707 fill_buffer (struct fastfilereader *reader)
708 {
709   struct casefile *cf = casereader_get_casefile(&reader->cr);
710   struct fastfile *ff = (struct fastfile *) cf;
711   if (ff->ok)
712     {
713       int bytes = full_read (reader->fd, reader->buffer,
714                              ff->buffer_size *
715                              sizeof *reader->buffer);
716       if (bytes < 0)
717         io_error (ff, _("%s: Reading temporary file: %s."),
718                   ff->file_name, strerror (errno));
719       else if (bytes != ff->buffer_size * sizeof *reader->buffer)
720         io_error (ff, _("%s: Temporary file ended unexpectedly."),
721                   ff->file_name);
722       else
723         {
724           reader->buffer_ofs = reader->file_ofs;
725           reader->file_ofs += bytes;
726         }
727     }
728   return ff->ok;
729 }
730
731 static const struct class_casefile class = 
732   {
733     fastfile_destroy,
734     fastfile_error,
735     fastfile_get_value_cnt,
736     fastfile_get_case_cnt,
737     fastfile_get_reader,
738     fastfile_append,
739
740
741     fastfile_in_core,
742     fastfile_to_disk,
743     fastfile_sleep,
744   };
745
746 static const struct class_casereader class_reader = 
747   {
748     fastfilereader_get_next_case,
749     fastfilereader_cnum,
750     fastfilereader_destroy,
751   };