1 /* PSPP - computes sample statistics.
2 Copyright (C) 2004, 2006 Free Software Foundation, Inc.
3 Written by Ben Pfaff <blp@gnu.org>.
5 This program is free software; you can redistribute it and/or
6 modify it under the terms of the GNU General Public License as
7 published by the Free Software Foundation; either version 2 of the
8 License, or (at your option) any later version.
10 This program is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with this program; if not, write to the Free Software
17 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 #include "casefile-private.h"
31 #include <libpspp/alloc.h>
33 #include <libpspp/compiler.h>
34 #include <libpspp/message.h>
35 #include "full-read.h"
36 #include "full-write.h"
37 #include <libpspp/misc.h>
38 #include "make-file.h"
43 #define _(msgid) gettext (msgid)
45 #define IO_BUF_SIZE (8192 / sizeof (union value))
47 /* A fastfile represents a sequentially accessible stream of
50 If workspace allows, a fastfile is maintained in memory. If
51 workspace overflows, then the fastfile is pushed to disk. In
52 either case the interface presented to callers is kept the
55 The life cycle of a fastfile consists of up to three phases:
57 1. Writing. The fastfile initially contains no cases. In
58 this phase, any number of cases may be appended to the
59 end of a fastfile. (Cases are never inserted in the
60 middle or before the beginning of a fastfile.)
62 Use casefile_append or casefile_append_xfer to
63 append a case to a fastfile.
65 2. Reading. The fastfile may be read sequentially,
66 starting from the beginning, by "casereaders". Any
67 number of casereaders may be created, at any time,
68 during the reading phase. Each casereader has an
69 independent position in the fastfile.
71 Ordinary casereaders may only move forward. They
72 cannot move backward to arbitrary records or seek
73 randomly. Cloning casereaders is possible, but it is
76 Use casefile_get_reader to create a casereader for
77 use in phase 2. This also transitions from phase 1 to
78 phase 2. Calling fastfile_mode_reader makes the same
79 transition, without creating a casereader.
81 Use casereader_read or casereader_read_xfer to read
82 a case from a casereader. Use casereader_destroy to
83 discard a casereader when it is no longer needed.
85 3. Destruction. This phase is optional. The fastfile is
86 also read with casereaders in this phase, but the
87 ability to create new casereaders is curtailed.
89 In this phase, casereaders could still be cloned.
91 To transition from phase 1 or 2 to phase 3 and create a
92 casereader, call casefile_get_destructive_reader().
93 The same functions apply to the casereader obtained
94 this way as apply to casereaders obtained in phase 2.
96 After casefile_get_destructive_reader is called, no
97 more casereaders may be created. (If cloning of
98 casereaders were implemented, it would still be
101 The purpose of the limitations applied to casereaders
102 in phase 3 is to allow in-memory fastfiles to fully
103 transfer ownership of cases to the casereaders,
104 avoiding the need for extra copies of case data. For
105 relatively static data sets with many variables, I
106 suspect (without evidence) that this may be a big
109 When a fastfile is no longer needed, it may be destroyed with
110 casefile_destroy. This function will also destroy any
111 remaining casereaders. */
113 /* FIXME: should we implement compression? */
115 /* In-memory cases are arranged in an array of arrays. The top
116 level is variable size and the size of each bottom level array
117 is fixed at the number of cases defined here. */
118 #define CASES_PER_BLOCK 128
120 static const struct class_casefile class;
125 struct casefile cf; /* Parent */
127 size_t value_cnt; /* Case size in `union value's. */
128 size_t case_acct_size; /* Case size for accounting. */
129 unsigned long case_cnt; /* Number of cases stored. */
130 enum { MEMORY, DISK } storage; /* Where cases are stored. */
131 enum { WRITE, READ } mode; /* Is writing or reading allowed? */
133 bool ok; /* False after I/O error. */
135 /* Memory storage. */
136 struct ccase **cases; /* Pointer to array of cases. */
139 int fd; /* File descriptor, -1 if none. */
140 char *file_name; /* File name. */
141 union value *buffer; /* I/O buffer, NULL if none. */
142 size_t buffer_used; /* Number of values used in buffer. */
143 size_t buffer_size; /* Buffer size in values. */
147 static const struct class_casereader class_reader;
149 /* For reading out the cases in a fastfile. */
150 struct fastfilereader
152 struct casereader cr; /* Parent */
154 unsigned long case_idx; /* Case number of current case. */
157 int fd; /* File descriptor. */
158 off_t file_ofs; /* Current position in fd. */
159 off_t buffer_ofs; /* File offset of buffer start. */
160 union value *buffer; /* I/O buffer. */
161 size_t buffer_pos; /* Offset of buffer position. */
162 struct ccase c; /* Current case. */
166 static void io_error (struct fastfile *, const char *, ...)
167 PRINTF_FORMAT (2, 3);
168 static int safe_open (const char *file_name, int flags);
169 static int safe_close (int fd);
170 static void write_case_to_disk (struct fastfile *, const struct ccase *);
171 static void flush_buffer (struct fastfile *);
173 static void reader_open_file (struct fastfilereader *);
175 static void seek_and_fill_buffer (struct fastfilereader *);
176 static bool fill_buffer (struct fastfilereader *);
179 /* Number of bytes of case allocated in in-memory fastfiles. */
180 static size_t case_bytes;
182 /* Destroys READER. */
183 static void fastfilereader_destroy (struct casereader *cr)
185 struct fastfilereader *reader = (struct fastfilereader *) cr;
186 struct fastfile *ff = (struct fastfile *) casereader_get_casefile (cr);
188 if (ff->buffer == NULL)
189 ff->buffer = reader->buffer;
191 free (reader->buffer);
193 if (reader->fd != -1)
198 safe_close (reader->fd);
201 case_destroy (&reader->c);
208 /* Return the case number of the current case */
210 fastfilereader_cnum (const struct casereader *cr)
212 const struct fastfilereader *ffr = (const struct fastfilereader *) cr;
213 return ffr->case_idx;
217 /* Returns the next case pointed to by FFR and increments
218 FFR's pointer. Returns NULL if FFR points beyond the last case.
220 static struct ccase *
221 fastfilereader_get_next_case (struct casereader *cr)
223 struct fastfile *ff = (struct fastfile *) casereader_get_casefile (cr);
224 struct fastfilereader *ffr = (struct fastfilereader *) cr;
225 struct ccase *read_case = NULL ;
227 if ( ffr->case_idx >= ff->case_cnt )
230 if (ff->storage == MEMORY )
232 size_t block_idx = ffr->case_idx / CASES_PER_BLOCK;
233 size_t case_idx = ffr->case_idx % CASES_PER_BLOCK;
234 read_case = &ff->cases[block_idx][case_idx];
238 if (ffr->buffer_pos + ff->value_cnt > ff->buffer_size)
240 if (!fill_buffer (ffr))
245 case_from_values (&ffr->c, ffr->buffer + ffr->buffer_pos,
247 ffr->buffer_pos += ff->value_cnt;
256 /* Creates and returns a casereader for CF. A casereader can be used to
257 sequentially read the cases in a fastfile. */
258 static struct casereader *
259 fastfile_get_reader (const struct casefile *cf_)
261 struct casefile *cf = (struct casefile *) cf_;
262 struct fastfilereader *ffr = xzalloc (sizeof *ffr);
263 struct casereader *reader = (struct casereader *) ffr;
264 struct fastfile *ff = (struct fastfile *) cf;
266 assert (!cf->being_destroyed);
268 /* Flush the buffer to disk if it's not empty. */
269 if (ff->mode == WRITE && ff->storage == DISK)
274 casereader_register (cf, reader, &class_reader);
277 reader->destructive = 0;
281 case_nullify (&ffr->c);
283 if (ff->storage == DISK)
284 reader_open_file (ffr);
290 /* Creates a copy of the casereader CR, and returns it */
291 static struct casereader *
292 fastfilereader_clone (const struct casereader *cr)
294 const struct fastfilereader *ffr = (const struct fastfilereader *) cr ;
295 struct fastfilereader *new_ffr = xzalloc (sizeof *new_ffr);
297 struct casereader *new_reader = (struct casereader *) new_ffr;
299 struct casefile *cf = casereader_get_casefile (cr);
300 struct fastfile *ff = (struct fastfile *) cf;
302 assert (!cf->being_destroyed);
304 /* Flush the buffer to disk if it's not empty. */
305 if (ff->mode == WRITE && ff->storage == DISK)
310 casereader_register (cf, new_reader, &class_reader);
312 new_ffr->case_idx = ffr->case_idx ;
313 new_reader->destructive = cr->destructive;
314 new_ffr->fd = ffr->fd ;
315 new_ffr->buffer = ffr->buffer ;
316 new_ffr->buffer_pos = ffr->buffer_pos;
318 if (ff->storage == DISK)
319 reader_open_file (new_ffr);
327 /* Returns the number of `union value's in a case for CF. */
329 fastfile_get_value_cnt (const struct casefile *cf)
331 const struct fastfile *ff = (const struct fastfile *) cf;
332 return ff->value_cnt;
335 /* Appends a copy of case C to fastfile CF. Not valid after any
336 reader for CF has been created.
337 Returns true if successful, false if an I/O error occurred. */
339 fastfile_append (struct casefile *cf, const struct ccase *c)
341 struct fastfile *ff = (struct fastfile *) cf;
342 assert (ff->mode == WRITE);
345 /* Try memory first. */
346 if (ff->storage == MEMORY)
348 if (case_bytes < get_workspace ())
350 size_t block_idx = ff->case_cnt / CASES_PER_BLOCK;
351 size_t case_idx = ff->case_cnt % CASES_PER_BLOCK;
352 struct ccase new_case;
354 case_bytes += ff->case_acct_size;
355 case_clone (&new_case, c);
358 if ((block_idx & (block_idx - 1)) == 0)
360 size_t block_cap = block_idx == 0 ? 1 : block_idx * 2;
361 ff->cases = xnrealloc (ff->cases,
362 block_cap, sizeof *ff->cases);
365 ff->cases[block_idx] = xnmalloc (CASES_PER_BLOCK,
369 case_move (&ff->cases[block_idx][case_idx], &new_case);
373 casefile_to_disk (cf);
374 assert (ff->storage == DISK);
375 write_case_to_disk (ff, c);
379 write_case_to_disk (ff, c);
386 /* Returns the number of cases in fastfile CF. */
388 fastfile_get_case_cnt (const struct casefile *cf)
390 const struct fastfile *ff = (const struct fastfile *) cf;
395 /* Returns true only if fastfile CF is stored in memory (instead of on
396 disk), false otherwise. */
398 fastfile_in_core (const struct casefile *cf)
400 const struct fastfile *ff = (const struct fastfile *) cf;
401 return (ff->storage == MEMORY);
405 /* If CF is currently stored in memory, writes it to disk. Readers, if any,
406 retain their current positions.
407 Returns true if successful, false if an I/O error occurred. */
409 fastfile_to_disk (const struct casefile *cf_)
411 struct fastfile *ff = (struct fastfile *) cf_;
412 struct casefile *cf = &ff->cf;
414 if (ff->storage == MEMORY)
416 size_t idx, block_cnt;
417 struct casereader *reader;
419 assert (ff->file_name == NULL);
420 assert (ff->fd == -1);
421 assert (ff->buffer_used == 0);
423 if (!make_temp_file (&ff->fd, &ff->file_name))
430 ff->buffer = xnmalloc (ff->buffer_size, sizeof *ff->buffer);
431 memset (ff->buffer, 0, ff->buffer_size * sizeof *ff->buffer);
433 case_bytes -= ff->case_cnt * ff->case_acct_size;
434 for (idx = 0; idx < ff->case_cnt; idx++)
436 size_t block_idx = idx / CASES_PER_BLOCK;
437 size_t case_idx = idx % CASES_PER_BLOCK;
438 struct ccase *c = &ff->cases[block_idx][case_idx];
439 write_case_to_disk (ff, c);
443 block_cnt = DIV_RND_UP (ff->case_cnt, CASES_PER_BLOCK);
444 for (idx = 0; idx < block_cnt; idx++)
445 free (ff->cases[idx]);
450 if (ff->mode == READ)
453 ll_for_each (reader, struct casereader, ll, &cf->reader_list)
454 reader_open_file ((struct fastfilereader *) reader);
460 /* Puts a fastfile to "sleep", that is, minimizes the resources
461 needed for it by closing its file descriptor and freeing its
462 buffer. This is useful if we need so many fastfiles that we
463 might not have enough memory and file descriptors to go
466 For simplicity, this implementation always converts the
467 fastfile to reader mode. If this turns out to be a problem,
468 with a little extra work we could also support sleeping
471 Returns true if successful, false if an I/O error occurred. */
473 fastfile_sleep (const struct casefile *cf_)
475 struct fastfile *ff = (struct fastfile *) cf_;
476 struct casefile *cf = &ff->cf;
478 fastfile_to_disk (cf);
486 if (ff->buffer != NULL)
496 /* Returns true if an I/O error has occurred in fastfile CF. */
498 fastfile_error (const struct casefile *cf)
500 const struct fastfile *ff = (const struct fastfile *) cf;
504 /* Destroys fastfile CF. */
506 fastfile_destroy (struct casefile *cf)
508 struct fastfile *ff = (struct fastfile *) cf;
512 if (ff->cases != NULL)
514 size_t idx, block_cnt;
516 case_bytes -= ff->case_cnt * ff->case_acct_size;
517 for (idx = 0; idx < ff->case_cnt; idx++)
519 size_t block_idx = idx / CASES_PER_BLOCK;
520 size_t case_idx = idx % CASES_PER_BLOCK;
521 struct ccase *c = &ff->cases[block_idx][case_idx];
525 block_cnt = DIV_RND_UP (ff->case_cnt, CASES_PER_BLOCK);
526 for (idx = 0; idx < block_cnt; idx++)
527 free (ff->cases[idx]);
535 if (ff->file_name != NULL && remove (ff->file_name) == -1)
536 io_error (ff, _("%s: Removing temporary file: %s."),
537 ff->file_name, strerror (errno));
538 free (ff->file_name);
547 /* Creates and returns a fastfile to store cases of VALUE_CNT
548 `union value's each. */
550 fastfile_create (size_t value_cnt)
552 struct fastfile *ff = xzalloc (sizeof *ff);
553 struct casefile *cf = &ff->cf;
555 casefile_register (cf, &class);
557 ff->value_cnt = value_cnt;
558 ff->case_acct_size = (ff->value_cnt + 4) * sizeof *ff->buffer;
560 ff->storage = MEMORY;
562 cf->being_destroyed = false;
566 ff->file_name = NULL;
568 ff->buffer_size = ROUND_UP (ff->value_cnt, IO_BUF_SIZE);
569 if (ff->value_cnt > 0 && ff->buffer_size % ff->value_cnt > 64)
570 ff->buffer_size = ff->value_cnt;
578 /* Marks FF as having encountered an I/O error.
579 If this is the first error on CF, reports FORMAT to the user,
580 doing printf()-style substitutions. */
582 io_error (struct fastfile *ff, const char *format, ...)
589 m.category = MSG_GENERAL;
590 m.severity = MSG_ERROR;
591 m.where.file_name = NULL;
592 m.where.line_number = -1;
593 va_start (args, format);
594 m.text = xvasprintf (format, args);
602 /* Calls open(), passing FILE_NAME and FLAGS, repeating as necessary
603 to deal with interrupted calls. */
605 safe_open (const char *file_name, int flags)
611 fd = open (file_name, flags);
613 while (fd == -1 && errno == EINTR);
618 /* Calls close(), passing FD, repeating as necessary to deal with
619 interrupted calls. */
629 while (retval == -1 && errno == EINTR);
635 /* Writes case C to fastfile CF's disk buffer, first flushing the buffer to
636 disk if it would otherwise overflow.
637 Returns true if successful, false if an I/O error occurred. */
639 write_case_to_disk (struct fastfile *ff, const struct ccase *c)
644 case_to_values (c, ff->buffer + ff->buffer_used, ff->value_cnt);
645 ff->buffer_used += ff->value_cnt;
646 if (ff->buffer_used + ff->value_cnt > ff->buffer_size)
651 /* If any bytes in FF's output buffer are used, flush them to
654 flush_buffer (struct fastfile *ff)
656 if (ff->ok && ff->buffer_used > 0)
658 if (!full_write (ff->fd, ff->buffer,
659 ff->buffer_size * sizeof *ff->buffer))
660 io_error (ff, _("Error writing temporary file: %s."),
667 /* Opens a disk file for READER and seeks to the current position as indicated
668 by case_idx. Normally the current position is the beginning of the file,
669 but fastfile_to_disk may cause the file to be opened at a different
672 reader_open_file (struct fastfilereader *reader)
674 struct casefile *cf = casereader_get_casefile(&reader->cr);
675 struct fastfile *ff = (struct fastfile *) cf;
676 if (!ff->ok || reader->case_idx >= ff->case_cnt)
686 reader->fd = safe_open (ff->file_name, O_RDONLY);
688 io_error (ff, _("%s: Opening temporary file: %s."),
689 ff->file_name, strerror (errno));
692 if (ff->buffer != NULL)
694 reader->buffer = ff->buffer;
699 reader->buffer = xnmalloc (ff->buffer_size, sizeof *ff->buffer);
700 memset (reader->buffer, 0, ff->buffer_size * sizeof *ff->buffer);
703 case_create (&reader->c, ff->value_cnt);
705 reader->buffer_ofs = -1;
706 reader->file_ofs = -1;
707 seek_and_fill_buffer (reader);
710 /* Seeks the backing file for READER to the proper position and
711 refreshes the buffer contents. */
713 seek_and_fill_buffer (struct fastfilereader *reader)
715 struct casefile *cf = casereader_get_casefile(&reader->cr);
716 struct fastfile *ff = (struct fastfile *) cf;
719 if (ff->value_cnt != 0)
721 size_t buffer_case_cnt = ff->buffer_size / ff->value_cnt;
722 new_ofs = ((off_t) reader->case_idx / buffer_case_cnt
723 * ff->buffer_size * sizeof *ff->buffer);
724 reader->buffer_pos = (reader->case_idx % buffer_case_cnt
729 if (new_ofs != reader->file_ofs)
731 if (lseek (reader->fd, new_ofs, SEEK_SET) != new_ofs)
732 io_error (ff, _("%s: Seeking temporary file: %s."),
733 ff->file_name, strerror (errno));
735 reader->file_ofs = new_ofs;
738 if (ff->case_cnt > 0 && ff->value_cnt > 0 && reader->buffer_ofs != new_ofs)
739 fill_buffer (reader);
742 /* Fills READER's buffer by reading a block from disk. */
744 fill_buffer (struct fastfilereader *reader)
746 struct casefile *cf = casereader_get_casefile(&reader->cr);
747 struct fastfile *ff = (struct fastfile *) cf;
750 int bytes = full_read (reader->fd, reader->buffer,
752 sizeof *reader->buffer);
754 io_error (ff, _("%s: Reading temporary file: %s."),
755 ff->file_name, strerror (errno));
756 else if (bytes != ff->buffer_size * sizeof *reader->buffer)
757 io_error (ff, _("%s: Temporary file ended unexpectedly."),
761 reader->buffer_ofs = reader->file_ofs;
762 reader->file_ofs += bytes;
768 static const struct class_casefile class =
772 fastfile_get_value_cnt,
773 fastfile_get_case_cnt,
783 static const struct class_casereader class_reader =
785 fastfilereader_get_next_case,
787 fastfilereader_destroy,
788 fastfilereader_clone,