1 /* PSPP - computes sample statistics.
2 Copyright (C) 2004, 2006 Free Software Foundation, Inc.
4 This program is free software; you can redistribute it and/or
5 modify it under the terms of the GNU General Public License as
6 published by the Free Software Foundation; either version 2 of the
7 License, or (at your option) any later version.
9 This program is distributed in the hope that it will be useful, but
10 WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12 General Public License for more details.
14 You should have received a copy of the GNU General Public License
15 along with this program; if not, write to the Free Software
16 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
21 #include "casefile-private.h"
30 #include <libpspp/alloc.h>
32 #include <libpspp/compiler.h>
33 #include <libpspp/message.h>
34 #include "full-read.h"
35 #include "full-write.h"
36 #include <libpspp/misc.h>
37 #include "make-file.h"
42 #define _(msgid) gettext (msgid)
44 #define IO_BUF_SIZE (8192 / sizeof (union value))
46 /* A fastfile represents a sequentially accessible stream of
49 If workspace allows, a fastfile is maintained in memory. If
50 workspace overflows, then the fastfile is pushed to disk. In
51 either case the interface presented to callers is kept the
54 The life cycle of a fastfile consists of up to three phases:
56 1. Writing. The fastfile initially contains no cases. In
57 this phase, any number of cases may be appended to the
58 end of a fastfile. (Cases are never inserted in the
59 middle or before the beginning of a fastfile.)
61 Use casefile_append or casefile_append_xfer to
62 append a case to a fastfile.
64 2. Reading. The fastfile may be read sequentially,
65 starting from the beginning, by "casereaders". Any
66 number of casereaders may be created, at any time,
67 during the reading phase. Each casereader has an
68 independent position in the fastfile.
70 Ordinary casereaders may only move forward. They
71 cannot move backward to arbitrary records or seek
72 randomly. Cloning casereaders is possible, but it is
75 Use casefile_get_reader to create a casereader for
76 use in phase 2. This also transitions from phase 1 to
77 phase 2. Calling fastfile_mode_reader makes the same
78 transition, without creating a casereader.
80 Use casereader_read or casereader_read_xfer to read
81 a case from a casereader. Use casereader_destroy to
82 discard a casereader when it is no longer needed.
84 3. Destruction. This phase is optional. The fastfile is
85 also read with casereaders in this phase, but the
86 ability to create new casereaders is curtailed.
88 In this phase, casereaders could still be cloned.
90 To transition from phase 1 or 2 to phase 3 and create a
91 casereader, call casefile_get_destructive_reader().
92 The same functions apply to the casereader obtained
93 this way as apply to casereaders obtained in phase 2.
95 After casefile_get_destructive_reader is called, no
96 more casereaders may be created. (If cloning of
97 casereaders were implemented, it would still be
100 The purpose of the limitations applied to casereaders
101 in phase 3 is to allow in-memory fastfiles to fully
102 transfer ownership of cases to the casereaders,
103 avoiding the need for extra copies of case data. For
104 relatively static data sets with many variables, I
105 suspect (without evidence) that this may be a big
108 When a fastfile is no longer needed, it may be destroyed with
109 casefile_destroy. This function will also destroy any
110 remaining casereaders. */
112 /* FIXME: should we implement compression? */
114 /* In-memory cases are arranged in an array of arrays. The top
115 level is variable size and the size of each bottom level array
116 is fixed at the number of cases defined here. */
117 #define CASES_PER_BLOCK 128
119 static const struct class_casefile class;
124 struct casefile cf; /* Parent */
126 size_t value_cnt; /* Case size in `union value's. */
127 size_t case_acct_size; /* Case size for accounting. */
128 unsigned long case_cnt; /* Number of cases stored. */
129 enum { MEMORY, DISK } storage; /* Where cases are stored. */
130 enum { WRITE, READ } mode; /* Is writing or reading allowed? */
132 bool ok; /* False after I/O error. */
134 /* Memory storage. */
135 struct ccase **cases; /* Pointer to array of cases. */
138 int fd; /* File descriptor, -1 if none. */
139 char *file_name; /* File name. */
140 union value *buffer; /* I/O buffer, NULL if none. */
141 size_t buffer_used; /* Number of values used in buffer. */
142 size_t buffer_size; /* Buffer size in values. */
146 static const struct class_casereader class_reader;
148 /* For reading out the cases in a fastfile. */
149 struct fastfilereader
151 struct casereader cr; /* Parent */
153 unsigned long case_idx; /* Case number of current case. */
156 int fd; /* File descriptor. */
157 off_t file_ofs; /* Current position in fd. */
158 off_t buffer_ofs; /* File offset of buffer start. */
159 union value *buffer; /* I/O buffer. */
160 size_t buffer_pos; /* Offset of buffer position. */
161 struct ccase c; /* Current case. */
165 static void io_error (struct fastfile *, const char *, ...)
166 PRINTF_FORMAT (2, 3);
167 static int safe_open (const char *file_name, int flags);
168 static int safe_close (int fd);
169 static void write_case_to_disk (struct fastfile *, const struct ccase *);
170 static void flush_buffer (struct fastfile *);
172 static void reader_open_file (struct fastfilereader *);
174 static void seek_and_fill_buffer (struct fastfilereader *);
175 static bool fill_buffer (struct fastfilereader *);
178 /* Number of bytes of case allocated in in-memory fastfiles. */
179 static size_t case_bytes;
181 /* Destroys READER. */
182 static void fastfilereader_destroy (struct casereader *cr)
184 struct fastfilereader *reader = (struct fastfilereader *) cr;
185 struct fastfile *ff = (struct fastfile *) casereader_get_casefile (cr);
187 if (ff->buffer == NULL)
188 ff->buffer = reader->buffer;
190 free (reader->buffer);
192 if (reader->fd != -1)
197 safe_close (reader->fd);
200 case_destroy (&reader->c);
207 /* Return the case number of the current case */
209 fastfilereader_cnum (const struct casereader *cr)
211 const struct fastfilereader *ffr = (const struct fastfilereader *) cr;
212 return ffr->case_idx;
216 /* Returns the next case pointed to by FFR and increments
217 FFR's pointer. Returns NULL if FFR points beyond the last case.
219 static struct ccase *
220 fastfilereader_get_next_case (struct casereader *cr)
222 struct fastfile *ff = (struct fastfile *) casereader_get_casefile (cr);
223 struct fastfilereader *ffr = (struct fastfilereader *) cr;
224 struct ccase *read_case = NULL ;
226 if ( ffr->case_idx >= ff->case_cnt )
229 if (ff->storage == MEMORY )
231 size_t block_idx = ffr->case_idx / CASES_PER_BLOCK;
232 size_t case_idx = ffr->case_idx % CASES_PER_BLOCK;
233 read_case = &ff->cases[block_idx][case_idx];
237 if (ffr->buffer_pos + ff->value_cnt > ff->buffer_size)
239 if (!fill_buffer (ffr))
244 case_from_values (&ffr->c, ffr->buffer + ffr->buffer_pos,
246 ffr->buffer_pos += ff->value_cnt;
255 /* Creates and returns a casereader for CF. A casereader can be used to
256 sequentially read the cases in a fastfile. */
257 static struct casereader *
258 fastfile_get_reader (const struct casefile *cf_)
260 struct casefile *cf = (struct casefile *) cf_;
261 struct fastfilereader *ffr = xzalloc (sizeof *ffr);
262 struct casereader *reader = (struct casereader *) ffr;
263 struct fastfile *ff = (struct fastfile *) cf;
265 assert (!cf->being_destroyed);
267 /* Flush the buffer to disk if it's not empty. */
268 if (ff->mode == WRITE && ff->storage == DISK)
273 casereader_register (cf, reader, &class_reader);
276 reader->destructive = 0;
280 case_nullify (&ffr->c);
282 if (ff->storage == DISK)
283 reader_open_file (ffr);
289 /* Creates a copy of the casereader CR, and returns it */
290 static struct casereader *
291 fastfilereader_clone (const struct casereader *cr)
293 const struct fastfilereader *ffr = (const struct fastfilereader *) cr ;
294 struct fastfilereader *new_ffr = xzalloc (sizeof *new_ffr);
296 struct casereader *new_reader = (struct casereader *) new_ffr;
298 struct casefile *cf = casereader_get_casefile (cr);
299 struct fastfile *ff = (struct fastfile *) cf;
301 assert (!cf->being_destroyed);
303 /* Flush the buffer to disk if it's not empty. */
304 if (ff->mode == WRITE && ff->storage == DISK)
309 casereader_register (cf, new_reader, &class_reader);
311 new_ffr->case_idx = ffr->case_idx ;
312 new_reader->destructive = cr->destructive;
313 new_ffr->fd = ffr->fd ;
314 new_ffr->buffer = ffr->buffer ;
315 new_ffr->buffer_pos = ffr->buffer_pos;
317 if (ff->storage == DISK)
318 reader_open_file (new_ffr);
326 /* Returns the number of `union value's in a case for CF. */
328 fastfile_get_value_cnt (const struct casefile *cf)
330 const struct fastfile *ff = (const struct fastfile *) cf;
331 return ff->value_cnt;
334 /* Appends a copy of case C to fastfile CF. Not valid after any
335 reader for CF has been created.
336 Returns true if successful, false if an I/O error occurred. */
338 fastfile_append (struct casefile *cf, const struct ccase *c)
340 struct fastfile *ff = (struct fastfile *) cf;
341 assert (ff->mode == WRITE);
344 /* Try memory first. */
345 if (ff->storage == MEMORY)
347 if (case_bytes < get_workspace ())
349 size_t block_idx = ff->case_cnt / CASES_PER_BLOCK;
350 size_t case_idx = ff->case_cnt % CASES_PER_BLOCK;
351 struct ccase new_case;
353 case_bytes += ff->case_acct_size;
354 case_clone (&new_case, c);
357 if ((block_idx & (block_idx - 1)) == 0)
359 size_t block_cap = block_idx == 0 ? 1 : block_idx * 2;
360 ff->cases = xnrealloc (ff->cases,
361 block_cap, sizeof *ff->cases);
364 ff->cases[block_idx] = xnmalloc (CASES_PER_BLOCK,
368 case_move (&ff->cases[block_idx][case_idx], &new_case);
372 casefile_to_disk (cf);
373 assert (ff->storage == DISK);
374 write_case_to_disk (ff, c);
378 write_case_to_disk (ff, c);
385 /* Returns the number of cases in fastfile CF. */
387 fastfile_get_case_cnt (const struct casefile *cf)
389 const struct fastfile *ff = (const struct fastfile *) cf;
394 /* Returns true only if fastfile CF is stored in memory (instead of on
395 disk), false otherwise. */
397 fastfile_in_core (const struct casefile *cf)
399 const struct fastfile *ff = (const struct fastfile *) cf;
400 return (ff->storage == MEMORY);
404 /* If CF is currently stored in memory, writes it to disk. Readers, if any,
405 retain their current positions.
406 Returns true if successful, false if an I/O error occurred. */
408 fastfile_to_disk (const struct casefile *cf_)
410 struct fastfile *ff = (struct fastfile *) cf_;
411 struct casefile *cf = &ff->cf;
413 if (ff->storage == MEMORY)
415 size_t idx, block_cnt;
416 struct casereader *reader;
418 assert (ff->file_name == NULL);
419 assert (ff->fd == -1);
420 assert (ff->buffer_used == 0);
422 if (!make_temp_file (&ff->fd, &ff->file_name))
429 ff->buffer = xnmalloc (ff->buffer_size, sizeof *ff->buffer);
430 memset (ff->buffer, 0, ff->buffer_size * sizeof *ff->buffer);
432 case_bytes -= ff->case_cnt * ff->case_acct_size;
433 for (idx = 0; idx < ff->case_cnt; idx++)
435 size_t block_idx = idx / CASES_PER_BLOCK;
436 size_t case_idx = idx % CASES_PER_BLOCK;
437 struct ccase *c = &ff->cases[block_idx][case_idx];
438 write_case_to_disk (ff, c);
442 block_cnt = DIV_RND_UP (ff->case_cnt, CASES_PER_BLOCK);
443 for (idx = 0; idx < block_cnt; idx++)
444 free (ff->cases[idx]);
449 if (ff->mode == READ)
452 ll_for_each (reader, struct casereader, ll, &cf->reader_list)
453 reader_open_file ((struct fastfilereader *) reader);
459 /* Puts a fastfile to "sleep", that is, minimizes the resources
460 needed for it by closing its file descriptor and freeing its
461 buffer. This is useful if we need so many fastfiles that we
462 might not have enough memory and file descriptors to go
465 For simplicity, this implementation always converts the
466 fastfile to reader mode. If this turns out to be a problem,
467 with a little extra work we could also support sleeping
470 Returns true if successful, false if an I/O error occurred. */
472 fastfile_sleep (const struct casefile *cf_)
474 struct fastfile *ff = (struct fastfile *) cf_;
475 struct casefile *cf = &ff->cf;
477 fastfile_to_disk (cf);
485 if (ff->buffer != NULL)
495 /* Returns true if an I/O error has occurred in fastfile CF. */
497 fastfile_error (const struct casefile *cf)
499 const struct fastfile *ff = (const struct fastfile *) cf;
503 /* Destroys fastfile CF. */
505 fastfile_destroy (struct casefile *cf)
507 struct fastfile *ff = (struct fastfile *) cf;
511 if (ff->cases != NULL)
513 size_t idx, block_cnt;
515 case_bytes -= ff->case_cnt * ff->case_acct_size;
516 for (idx = 0; idx < ff->case_cnt; idx++)
518 size_t block_idx = idx / CASES_PER_BLOCK;
519 size_t case_idx = idx % CASES_PER_BLOCK;
520 struct ccase *c = &ff->cases[block_idx][case_idx];
524 block_cnt = DIV_RND_UP (ff->case_cnt, CASES_PER_BLOCK);
525 for (idx = 0; idx < block_cnt; idx++)
526 free (ff->cases[idx]);
534 if (ff->file_name != NULL && remove (ff->file_name) == -1)
535 io_error (ff, _("%s: Removing temporary file: %s."),
536 ff->file_name, strerror (errno));
537 free (ff->file_name);
546 /* Creates and returns a fastfile to store cases of VALUE_CNT
547 `union value's each. */
549 fastfile_create (size_t value_cnt)
551 struct fastfile *ff = xzalloc (sizeof *ff);
552 struct casefile *cf = &ff->cf;
554 casefile_register (cf, &class);
556 ff->value_cnt = value_cnt;
557 ff->case_acct_size = (ff->value_cnt + 4) * sizeof *ff->buffer;
559 ff->storage = MEMORY;
561 cf->being_destroyed = false;
565 ff->file_name = NULL;
567 ff->buffer_size = ROUND_UP (ff->value_cnt, IO_BUF_SIZE);
568 if (ff->value_cnt > 0 && ff->buffer_size % ff->value_cnt > 64)
569 ff->buffer_size = ff->value_cnt;
577 /* Marks FF as having encountered an I/O error.
578 If this is the first error on CF, reports FORMAT to the user,
579 doing printf()-style substitutions. */
581 io_error (struct fastfile *ff, const char *format, ...)
588 m.category = MSG_GENERAL;
589 m.severity = MSG_ERROR;
590 m.where.file_name = NULL;
591 m.where.line_number = -1;
592 va_start (args, format);
593 m.text = xvasprintf (format, args);
601 /* Calls open(), passing FILE_NAME and FLAGS, repeating as necessary
602 to deal with interrupted calls. */
604 safe_open (const char *file_name, int flags)
610 fd = open (file_name, flags);
612 while (fd == -1 && errno == EINTR);
617 /* Calls close(), passing FD, repeating as necessary to deal with
618 interrupted calls. */
628 while (retval == -1 && errno == EINTR);
634 /* Writes case C to fastfile CF's disk buffer, first flushing the buffer to
635 disk if it would otherwise overflow.
636 Returns true if successful, false if an I/O error occurred. */
638 write_case_to_disk (struct fastfile *ff, const struct ccase *c)
643 case_to_values (c, ff->buffer + ff->buffer_used, ff->value_cnt);
644 ff->buffer_used += ff->value_cnt;
645 if (ff->buffer_used + ff->value_cnt > ff->buffer_size)
650 /* If any bytes in FF's output buffer are used, flush them to
653 flush_buffer (struct fastfile *ff)
655 if (ff->ok && ff->buffer_used > 0)
657 if (!full_write (ff->fd, ff->buffer,
658 ff->buffer_size * sizeof *ff->buffer))
659 io_error (ff, _("Error writing temporary file: %s."),
666 /* Opens a disk file for READER and seeks to the current position as indicated
667 by case_idx. Normally the current position is the beginning of the file,
668 but fastfile_to_disk may cause the file to be opened at a different
671 reader_open_file (struct fastfilereader *reader)
673 struct casefile *cf = casereader_get_casefile(&reader->cr);
674 struct fastfile *ff = (struct fastfile *) cf;
675 if (!ff->ok || reader->case_idx >= ff->case_cnt)
685 reader->fd = safe_open (ff->file_name, O_RDONLY);
687 io_error (ff, _("%s: Opening temporary file: %s."),
688 ff->file_name, strerror (errno));
691 if (ff->buffer != NULL)
693 reader->buffer = ff->buffer;
698 reader->buffer = xnmalloc (ff->buffer_size, sizeof *ff->buffer);
699 memset (reader->buffer, 0, ff->buffer_size * sizeof *ff->buffer);
702 case_create (&reader->c, ff->value_cnt);
704 reader->buffer_ofs = -1;
705 reader->file_ofs = -1;
706 seek_and_fill_buffer (reader);
709 /* Seeks the backing file for READER to the proper position and
710 refreshes the buffer contents. */
712 seek_and_fill_buffer (struct fastfilereader *reader)
714 struct casefile *cf = casereader_get_casefile(&reader->cr);
715 struct fastfile *ff = (struct fastfile *) cf;
718 if (ff->value_cnt != 0)
720 size_t buffer_case_cnt = ff->buffer_size / ff->value_cnt;
721 new_ofs = ((off_t) reader->case_idx / buffer_case_cnt
722 * ff->buffer_size * sizeof *ff->buffer);
723 reader->buffer_pos = (reader->case_idx % buffer_case_cnt
728 if (new_ofs != reader->file_ofs)
730 if (lseek (reader->fd, new_ofs, SEEK_SET) != new_ofs)
731 io_error (ff, _("%s: Seeking temporary file: %s."),
732 ff->file_name, strerror (errno));
734 reader->file_ofs = new_ofs;
737 if (ff->case_cnt > 0 && ff->value_cnt > 0 && reader->buffer_ofs != new_ofs)
738 fill_buffer (reader);
741 /* Fills READER's buffer by reading a block from disk. */
743 fill_buffer (struct fastfilereader *reader)
745 struct casefile *cf = casereader_get_casefile(&reader->cr);
746 struct fastfile *ff = (struct fastfile *) cf;
749 int bytes = full_read (reader->fd, reader->buffer,
751 sizeof *reader->buffer);
753 io_error (ff, _("%s: Reading temporary file: %s."),
754 ff->file_name, strerror (errno));
755 else if (bytes != ff->buffer_size * sizeof *reader->buffer)
756 io_error (ff, _("%s: Temporary file ended unexpectedly."),
760 reader->buffer_ofs = reader->file_ofs;
761 reader->file_ofs += bytes;
767 static const struct class_casefile class =
771 fastfile_get_value_cnt,
772 fastfile_get_case_cnt,
782 static const struct class_casereader class_reader =
784 fastfilereader_get_next_case,
786 fastfilereader_destroy,
787 fastfilereader_clone,