1 /* PSPP - computes sample statistics.
2 Copyright (C) 2004, 2006, 2007 Free Software Foundation, Inc.
4 This program is free software; you can redistribute it and/or
5 modify it under the terms of the GNU General Public License as
6 published by the Free Software Foundation; either version 2 of the
7 License, or (at your option) any later version.
9 This program is distributed in the hope that it will be useful, but
10 WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12 General Public License for more details.
14 You should have received a copy of the GNU General Public License
15 along with this program; if not, write to the Free Software
16 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. */
22 #include "casefile-private.h"
33 #include <data/case.h>
34 #include <data/make-file.h>
35 #include <data/settings.h>
36 #include <data/variable.h>
37 #include <libpspp/alloc.h>
38 #include <libpspp/compiler.h>
39 #include <libpspp/message.h>
40 #include <libpspp/misc.h>
41 #include <libpspp/str.h>
43 #include "full-read.h"
44 #include "full-write.h"
/* Translation shorthand: _("text") expands to gettext ("text"). */
47 #define _(msgid) gettext (msgid)
/* I/O buffer granularity, measured in `union value's rather than bytes:
   the number of values that fit in 8192 bytes.  fastfile_create rounds
   each fastfile's buffer size up to a multiple of this. */
49 #define IO_BUF_SIZE (8192 / sizeof (union value))
51 /* A fastfile represents a sequentially accessible stream of cases.
54 If workspace allows, a fastfile is maintained in memory. If
55 workspace overflows, then the fastfile is pushed to disk. In
56 either case the interface presented to callers is kept the same.
59 The life cycle of a fastfile consists of up to three phases:
61 1. Writing. The fastfile initially contains no cases. In
62 this phase, any number of cases may be appended to the
63 end of a fastfile. (Cases are never inserted in the
64 middle or before the beginning of a fastfile.)
66 Use casefile_append or casefile_append_xfer to
67 append a case to a fastfile.
69 2. Reading. The fastfile may be read sequentially,
70 starting from the beginning, by "casereaders". Any
71 number of casereaders may be created, at any time,
72 during the reading phase. Each casereader has an
73 independent position in the fastfile.
75 Ordinary casereaders may only move forward. They
76 cannot move backward to arbitrary records or seek
77 randomly. Cloning casereaders is possible, but it is
80 Use casefile_get_reader to create a casereader for
81 use in phase 2. This also transitions from phase 1 to
82 phase 2. Calling fastfile_mode_reader makes the same
83 transition, without creating a casereader.
85 Use casereader_read or casereader_read_xfer to read
86 a case from a casereader. Use casereader_destroy to
87 discard a casereader when it is no longer needed.
89 3. Destruction. This phase is optional. The fastfile is
90 also read with casereaders in this phase, but the
91 ability to create new casereaders is curtailed.
93 In this phase, casereaders could still be cloned.
95 To transition from phase 1 or 2 to phase 3 and create a
96 casereader, call casefile_get_destructive_reader().
97 The same functions apply to the casereader obtained
98 this way as apply to casereaders obtained in phase 2.
100 After casefile_get_destructive_reader is called, no
101 more casereaders may be created. (If cloning of
102 casereaders were implemented, it would still be
105 The purpose of the limitations applied to casereaders
106 in phase 3 is to allow in-memory fastfiles to fully
107 transfer ownership of cases to the casereaders,
108 avoiding the need for extra copies of case data. For
109 relatively static data sets with many variables, I
110 suspect (without evidence) that this may be a big
113 When a fastfile is no longer needed, it may be destroyed with
114 casefile_destroy. This function will also destroy any
115 remaining casereaders. */
117 /* FIXME: should we implement compression? */
119 /* In-memory cases are arranged in an array of arrays. The top
120 level is variable size and the size of each bottom level array
121 is fixed at the number of cases defined here. */
/* Case number N is therefore found at
   cases[N / CASES_PER_BLOCK][N % CASES_PER_BLOCK]. */
122 #define CASES_PER_BLOCK 128
/* Forward declaration of the casefile operations table for fastfiles; it
   is passed to casefile_register by fastfile_create and initialised near
   the end of the file. */
124 static const struct class_casefile class;
/* Members of the fastfile object.  NOTE(review): the line that opens this
   structure is not visible in this listing -- confirm against the full
   file. */
129 struct casefile cf; /* Parent */
131 size_t value_cnt; /* Case size in `union value's. */
132 size_t case_acct_size; /* Case size for accounting. */
133 unsigned long case_cnt; /* Number of cases stored. */
134 enum { MEMORY, DISK } storage; /* Where cases are stored. */
135 enum { WRITE, READ } mode; /* Is writing or reading allowed? */
137 bool ok; /* False after I/O error. */
139 /* Memory storage. */
/* Two-level array: case number N lives at
   cases[N / CASES_PER_BLOCK][N % CASES_PER_BLOCK]. */
140 struct ccase **cases; /* Pointer to array of cases. */
/* The members below describe the temporary file and the buffer used to
   write to it.  fastfile_to_disk asserts that fd is -1 and file_name is
   NULL while the fastfile is still in memory. */
143 int fd; /* File descriptor, -1 if none. */
144 char *file_name; /* File name. */
145 union value *buffer; /* I/O buffer, NULL if none. */
146 size_t buffer_used; /* Number of values used in buffer. */
147 size_t buffer_size; /* Buffer size in values. */
/* Forward declaration of the casereader operations table for fastfile
   readers; it is passed to casereader_register when a reader is created
   or cloned. */
151 static const struct class_casereader class_reader;
153 /* For reading out the cases in a fastfile. */
154 struct fastfilereader
156 struct casereader cr; /* Parent */
158 unsigned long case_idx; /* Case number of current case. */
/* The members below are used for disk-backed fastfiles.  reader_open_file
   sets file_ofs and buffer_ofs to -1 before the first seek. */
161 int fd; /* File descriptor. */
162 off_t file_ofs; /* Current position in fd. */
163 off_t buffer_ofs; /* File offset of buffer start. */
164 union value *buffer; /* I/O buffer. */
/* buffer_pos counts `union value's, not bytes: it is used as an index
   into BUFFER and advanced by the fastfile's value_cnt per case. */
165 size_t buffer_pos; /* Offset of buffer position. */
/* Receives the values copied out of BUFFER when reading from disk. */
166 struct ccase c; /* Current case. */
/* Forward declarations of the file-local helpers defined further down. */
/* NOTE(review): PRINTF_FORMAT (2, 3) presumably marks argument 2 as a
   printf-style format whose arguments start at position 3 -- confirm
   against libpspp/compiler.h. */
170 static void io_error (struct fastfile *, const char *, ...)
171 PRINTF_FORMAT (2, 3);
172 static int safe_open (const char *file_name, int flags);
173 static int safe_close (int fd);
/* Both writers are void: a failed write is reported through io_error
   rather than through a return value. */
174 static void write_case_to_disk (struct fastfile *, const struct ccase *);
175 static void flush_buffer (struct fastfile *);
177 static void reader_open_file (struct fastfilereader *);
179 static void seek_and_fill_buffer (struct fastfilereader *);
180 static bool fill_buffer (struct fastfilereader *);
183 /* Number of bytes of case allocated in in-memory fastfiles. */
/* A single file-scope total, not a per-fastfile one: fastfile_append adds
   case_acct_size for every case kept in memory and compares the total
   with get_workspace (); fastfile_to_disk and fastfile_destroy subtract
   case_cnt * case_acct_size again. */
184 static size_t case_bytes;
186 /* Destroys READER. */
/* CR must be a fastfile reader.  Its I/O buffer is either handed to the
   owning fastfile or freed, its file descriptor is dealt with if it has
   one, and its current-case storage is destroyed. */
187 static void fastfilereader_destroy (struct casereader *cr)
189 struct fastfilereader *reader = (struct fastfilereader *) cr;
190 struct fastfile *ff = (struct fastfile *) casereader_get_casefile (cr);
/* If the fastfile has no buffer of its own, keep this reader's buffer
   for it instead of discarding it; reader_open_file reuses ff->buffer
   when it is non-null. */
192 if (ff->buffer == NULL)
193 ff->buffer = reader->buffer;
/* NOTE(review): presumably the `else' arm of the test above -- the line
   between the two statements is not visible in this listing.  Without it
   the buffer just handed over would be freed here.  Confirm. */
195 free (reader->buffer);
197 if (reader->fd != -1)
/* NOTE(review): lines are missing from the listing between the test and
   this call, so the exact condition under which the descriptor is closed
   cannot be told from here -- confirm. */
202 safe_close (reader->fd);
205 case_destroy (&reader->c);
212 /* Returns the case number of the current case of reader CR, that is,
the value of its case_idx member. */
214 fastfilereader_cnum (const struct casereader *cr)
216 const struct fastfilereader *ffr = (const struct fastfilereader *) cr;
217 return ffr->case_idx;
221 /* Returns the next case pointed to by FFR and increments
222 FFR's pointer. Returns NULL if FFR points beyond the last case. */
/* Reads the next case for reader CR.  For an in-memory fastfile the
   result points directly into the fastfile's own case storage; for a
   disk-backed one the values are copied from the reader's I/O buffer
   into the reader's own case, after refilling the buffer from disk when
   the next case would not fit in what remains of it.
   NOTE(review): the return statements and the advance of case_idx are on
   lines not visible in this listing -- confirm against the full file. */
224 static struct ccase *
225 fastfilereader_get_next_case (struct casereader *cr)
227 struct fastfile *ff = (struct fastfile *) casereader_get_casefile (cr);
228 struct fastfilereader *ffr = (struct fastfilereader *) cr;
229 struct ccase *read_case = NULL ;
/* The reader is already past the last stored case. */
231 if ( ffr->case_idx >= ff->case_cnt )
234 if (ff->storage == MEMORY )
/* Locate the case in the two-level in-memory array. */
236 size_t block_idx = ffr->case_idx / CASES_PER_BLOCK;
237 size_t case_idx = ffr->case_idx % CASES_PER_BLOCK;
238 read_case = &ff->cases[block_idx][case_idx];
/* Disk path: a case never straddles two buffers, so a case that does not
   fit in the rest of this buffer starts the next one. */
242 if (ffr->buffer_pos + ff->value_cnt > ff->buffer_size)
244 if (!fill_buffer (ffr))
/* Copy value_cnt values starting at buffer_pos into the reader's case,
   then step past them. */
249 case_copy_in (&ffr->c, 0, ffr->buffer + ffr->buffer_pos, ff->value_cnt);
250 ffr->buffer_pos += ff->value_cnt;
259 /* Creates and returns a casereader for CF. A casereader can be used to
260 sequentially read the cases in a fastfile. */
/* CF_ must not be in the process of being destroyed (asserted below). */
261 static struct casereader *
262 fastfile_get_reader (const struct casefile *cf_)
/* CF_ is const in the interface; a non-const alias is needed because the
   casefile is handed to casereader_register below. */
264 struct casefile *cf = (struct casefile *) cf_;
/* NOTE(review): xzalloc presumably returns zero-filled memory, which
   would make the new reader start at case_idx 0 -- confirm. */
265 struct fastfilereader *ffr = xzalloc (sizeof *ffr);
266 struct casereader *reader = (struct casereader *) ffr;
267 struct fastfile *ff = (struct fastfile *) cf;
269 assert (!cf->being_destroyed);
271 /* Flush the buffer to disk if it's not empty. */
272 if (ff->mode == WRITE && ff->storage == DISK)
/* NOTE(review): the statement controlled by the test above, and any
   change of ff->mode, are on lines not visible in this listing --
   confirm. */
277 casereader_register (cf, reader, &class_reader);
/* Readers obtained through this function are marked non-destructive. */
280 reader->destructive = 0;
284 case_nullify (&ffr->c);
/* For a disk-backed fastfile, open the temporary file for this reader and
   give it an I/O buffer. */
286 if (ff->storage == DISK)
287 reader_open_file (ffr);
293 /* Creates a copy of the casereader CR, and returns it */
/* The clone is registered with the same casefile as CR and starts at the
   same case number, with the same `destructive' flag.  The casefile must
   not be in the process of being destroyed (asserted below). */
294 static struct casereader *
295 fastfilereader_clone (const struct casereader *cr)
297 const struct fastfilereader *ffr = (const struct fastfilereader *) cr ;
298 struct fastfilereader *new_ffr = xzalloc (sizeof *new_ffr);
300 struct casereader *new_reader = (struct casereader *) new_ffr;
302 struct casefile *cf = casereader_get_casefile (cr);
303 struct fastfile *ff = (struct fastfile *) cf;
305 assert (!cf->being_destroyed);
307 /* Flush the buffer to disk if it's not empty. */
308 if (ff->mode == WRITE && ff->storage == DISK)
/* NOTE(review): the statement controlled by the test above is on a line
   not visible in this listing -- confirm. */
313 casereader_register (cf, new_reader, &class_reader);
315 new_ffr->case_idx = ffr->case_idx ;
316 new_reader->destructive = cr->destructive;
/* NOTE(review): these copy the original reader's descriptor and buffer
   pointer, not fresh resources.  On the disk path reader_open_file
   replaces both, but it returns early when the fastfile has had an I/O
   error or the reader is already at the end; in that case the two
   readers appear to share one fd and one buffer, which
   fastfilereader_destroy would then release twice -- confirm. */
317 new_ffr->fd = ffr->fd ;
318 new_ffr->buffer = ffr->buffer ;
319 new_ffr->buffer_pos = ffr->buffer_pos;
/* For a disk-backed fastfile, open the temporary file for the clone and
   seek it to case_idx. */
321 if (ff->storage == DISK)
322 reader_open_file (new_ffr);
330 /* Returns the number of `union value's in a case for CF. */
/* CF must be a fastfile; the value is the value_cnt recorded when it was
   created. */
332 fastfile_get_value_cnt (const struct casefile *cf)
334 const struct fastfile *ff = (const struct fastfile *) cf;
335 return ff->value_cnt;
338 /* Appends a copy of case C to fastfile CF. Not valid after any
339 reader for CF has been created.
340 Returns true if successful, false if an I/O error occurred. */
/* NOTE(review): the return statement and the increment of case_cnt are on
   lines not visible in this listing -- confirm against the full file. */
342 fastfile_append (struct casefile *cf, const struct ccase *c)
344 struct fastfile *ff = (struct fastfile *) cf;
345 assert (ff->mode == WRITE);
348 /* Try memory first. */
349 if (ff->storage == MEMORY)
/* Keep the case in memory only while the total accounted for all
   in-memory fastfiles is below the configured workspace. */
351 if (case_bytes < get_workspace ())
/* Position of the new case in the two-level array. */
353 size_t block_idx = ff->case_cnt / CASES_PER_BLOCK;
354 size_t case_idx = ff->case_cnt % CASES_PER_BLOCK;
355 struct ccase new_case;
357 case_bytes += ff->case_acct_size;
358 case_clone (&new_case, c);
/* block_idx & (block_idx - 1) is zero when block_idx is 0 or a power of
   two.  At those points the top-level array is regrown to hold twice as
   many block pointers (one, for the very first block).
   NOTE(review): presumably this only runs when a new block is being
   started; the enclosing test is not visible in this listing. */
361 if ((block_idx & (block_idx - 1)) == 0)
363 size_t block_cap = block_idx == 0 ? 1 : block_idx * 2;
364 ff->cases = xnrealloc (ff->cases,
365 block_cap, sizeof *ff->cases);
/* Allocate the new bottom-level block of CASES_PER_BLOCK cases. */
368 ff->cases[block_idx] = xnmalloc (CASES_PER_BLOCK,
/* Hand the clone over to its slot in the array. */
372 case_move (&ff->cases[block_idx][case_idx], &new_case);
/* NOTE(review): presumably the branch taken when the workspace is
   exhausted: the whole fastfile is moved to disk, then C is written
   there. */
376 casefile_to_disk (cf);
377 assert (ff->storage == DISK);
378 write_case_to_disk (ff, c);
/* NOTE(review): presumably the branch for a fastfile that was already on
   disk. */
382 write_case_to_disk (ff, c);
389 /* Returns the number of cases in fastfile CF. */
/* NOTE(review): the return statement is not visible in this listing;
   presumably it returns the case_cnt member -- confirm. */
391 fastfile_get_case_cnt (const struct casefile *cf)
393 const struct fastfile *ff = (const struct fastfile *) cf;
398 /* Returns true only if fastfile CF is stored in memory (instead of on
399 disk), false otherwise. */
/* The answer is simply whether the storage member is still MEMORY. */
401 fastfile_in_core (const struct casefile *cf)
403 const struct fastfile *ff = (const struct fastfile *) cf;
404 return (ff->storage == MEMORY);
408 /* If CF is currently stored in memory, writes it to disk. Readers, if any,
409 retain their current positions.
410 Returns true if successful, false if an I/O error occurred. */
/* NOTE(review): the change of ff->storage to DISK, the error path after
   make_temp_file and the return statement are on lines not visible in
   this listing -- confirm against the full file. */
412 fastfile_to_disk (const struct casefile *cf_)
/* CF_ is const in the interface, but the fastfile is modified here. */
414 struct fastfile *ff = (struct fastfile *) cf_;
415 struct casefile *cf = &ff->cf;
417 if (ff->storage == MEMORY)
419 size_t idx, block_cnt;
420 struct casereader *reader;
/* An in-memory fastfile must not have any disk state yet. */
422 assert (ff->file_name == NULL);
423 assert (ff->fd == -1);
424 assert (ff->buffer_used == 0);
426 if (!make_temp_file (&ff->fd, &ff->file_name))
/* flush_buffer always writes the whole buffer, including any unused tail,
   so the buffer is zeroed to keep uninitialised memory out of the
   temporary file. */
433 ff->buffer = xnmalloc (ff->buffer_size, sizeof *ff->buffer);
434 memset (ff->buffer, 0, ff->buffer_size * sizeof *ff->buffer);
/* These cases no longer count against the in-memory workspace. */
436 case_bytes -= ff->case_cnt * ff->case_acct_size;
/* Write every stored case to the temporary file, in order. */
437 for (idx = 0; idx < ff->case_cnt; idx++)
439 size_t block_idx = idx / CASES_PER_BLOCK;
440 size_t case_idx = idx % CASES_PER_BLOCK;
441 struct ccase *c = &ff->cases[block_idx][case_idx];
442 write_case_to_disk (ff, c);
/* Release each bottom-level block that was in use. */
446 block_cnt = DIV_RND_UP (ff->case_cnt, CASES_PER_BLOCK);
447 for (idx = 0; idx < block_cnt; idx++)
448 free (ff->cases[idx]);
453 if (ff->mode == READ)
/* Give every reader already registered on this casefile a descriptor and
   buffer; reader_open_file seeks each one to its own case_idx. */
456 ll_for_each (reader, struct casereader, ll, &cf->reader_list)
457 reader_open_file ((struct fastfilereader *) reader);
463 /* Puts a fastfile to "sleep", that is, minimizes the resources
needed for it by closing its file descriptor and freeing its
buffer. This is useful if we need so many fastfiles that we
might not have enough memory and file descriptors to go around.

For simplicity, this implementation always converts the
fastfile to reader mode. If this turns out to be a problem,
with a little extra work we could also support sleeping a
fastfile that is still in write mode.

Returns true if successful, false if an I/O error occurred.

NOTE(review): only the move to disk and the buffer test are visible in
this listing; the mode change, descriptor close, buffer release and
return statement described above are on lines not shown here --
confirm against the full file. */
476 fastfile_sleep (const struct casefile *cf_)
478 struct fastfile *ff = (struct fastfile *) cf_;
479 struct casefile *cf = &ff->cf;
/* Make sure the cases are on disk before giving up resources. */
481 fastfile_to_disk (cf);
489 if (ff->buffer != NULL)
499 /* Returns true if an I/O error has occurred in fastfile CF. */
/* NOTE(review): the return statement is not visible in this listing;
   presumably the result is derived from the `ok' member, which is
   documented as false after an I/O error -- confirm. */
501 fastfile_error (const struct casefile *cf)
503 const struct fastfile *ff = (const struct fastfile *) cf;
507 /* Destroys fastfile CF. */
/* Releases the in-memory case blocks if there are any and removes the
   temporary file if one was created.
   NOTE(review): several lines of this function are not visible in this
   listing (for instance whatever is done with each case inside the first
   loop, and the release of the fastfile itself) -- confirm against the
   full file. */
509 fastfile_destroy (struct casefile *cf)
511 struct fastfile *ff = (struct fastfile *) cf;
515 if (ff->cases != NULL)
517 size_t idx, block_cnt;
/* These cases no longer count against the in-memory workspace. */
519 case_bytes -= ff->case_cnt * ff->case_acct_size;
/* Visit every stored case in the two-level array. */
520 for (idx = 0; idx < ff->case_cnt; idx++)
522 size_t block_idx = idx / CASES_PER_BLOCK;
523 size_t case_idx = idx % CASES_PER_BLOCK;
524 struct ccase *c = &ff->cases[block_idx][case_idx];
/* Release each bottom-level block that was in use. */
528 block_cnt = DIV_RND_UP (ff->case_cnt, CASES_PER_BLOCK);
529 for (idx = 0; idx < block_cnt; idx++)
530 free (ff->cases[idx]);
/* Delete the temporary file, reporting a failure through io_error. */
538 if (ff->file_name != NULL && remove (ff->file_name) == -1)
539 io_error (ff, _("%s: Removing temporary file: %s."),
540 ff->file_name, strerror (errno));
541 free (ff->file_name);
550 /* Creates and returns a fastfile to store cases of VALUE_CNT
551 `union value's each. */
/* The new fastfile starts out stored in memory with no temporary file.
   NOTE(review): the initialisation of several members and the return
   statement are on lines not visible in this listing -- confirm. */
553 fastfile_create (size_t value_cnt)
555 struct fastfile *ff = xzalloc (sizeof *ff);
556 struct casefile *cf = &ff->cf;
558 casefile_register (cf, &class);
560 ff->value_cnt = value_cnt;
/* Accounting size of one case: its values plus four more values' worth
   of bytes.  NOTE(review): the extra four presumably stand for per-case
   overhead -- confirm. */
561 ff->case_acct_size = (ff->value_cnt + 4) * sizeof *ff->buffer;
563 ff->storage = MEMORY;
565 cf->being_destroyed = false;
569 ff->file_name = NULL;
/* Buffer size in values: value_cnt rounded up to a multiple of
   IO_BUF_SIZE.  Cases never straddle two buffers, so
   buffer_size % value_cnt is the unused tail of each buffer; when that
   tail exceeds 64 values, use a buffer of exactly one case instead. */
571 ff->buffer_size = ROUND_UP (ff->value_cnt, IO_BUF_SIZE);
572 if (ff->value_cnt > 0 && ff->buffer_size % ff->value_cnt > 64)
573 ff->buffer_size = ff->value_cnt;
581 /* Marks FF as having encountered an I/O error.
582 If this is the first error on CF, reports FORMAT to the user,
583 doing printf()-style substitutions. */
/* NOTE(review): the declarations of M and ARGS, the first-error test, the
   update of ff->ok, the call that emits the message and the matching
   va_end are on lines not visible in this listing -- confirm. */
585 io_error (struct fastfile *ff, const char *format, ...)
/* A general error message that is not tied to any source file or line. */
592 m.category = MSG_GENERAL;
593 m.severity = MSG_ERROR;
594 m.where.file_name = NULL;
595 m.where.line_number = -1;
/* Format the message text from FORMAT and the variable arguments. */
596 va_start (args, format);
597 m.text = xvasprintf (format, args);
605 /* Calls open(), passing FILE_NAME and FLAGS, repeating as necessary
606 to deal with interrupted calls. */
/* NOTE(review): the `do' line and the return statement are not visible in
   this listing; presumably the resulting descriptor (or -1) is
   returned -- confirm. */
608 safe_open (const char *file_name, int flags)
614 fd = open (file_name, flags);
/* Retry only when the call failed because it was interrupted (EINTR). */
616 while (fd == -1 && errno == EINTR);
621 /* Calls close(), passing FD, repeating as necessary to deal with
622 interrupted calls. */
/* NOTE(review): the function header, the close() call and the return
   statement are not visible in this listing -- confirm. */
/* Retry only when the call failed because it was interrupted (EINTR). */
632 while (retval == -1 && errno == EINTR);
638 /* Copies the values of case C into fastfile FF's output buffer.
Afterwards, if one more case would no longer fit in the buffer, the
buffer is dealt with so that the next case starts in an empty one.

The forward declaration of this function is `static void', so failures
are not reported through a return value.

NOTE(review): the statement controlled by the final test is on a line
not visible in this listing; presumably it calls flush_buffer and
resets buffer_used -- confirm. */
642 write_case_to_disk (struct fastfile *ff, const struct ccase *c)
/* Append value_cnt values at the current fill position. */
647 case_copy_out (c, 0, ff->buffer + ff->buffer_used, ff->value_cnt);
648 ff->buffer_used += ff->value_cnt;
/* Cases never straddle two buffers. */
649 if (ff->buffer_used + ff->value_cnt > ff->buffer_size)
654 /* If any bytes in FF's output buffer are used, flush them to disk. */
/* Writes FF's output buffer to its temporary file, provided no I/O error
   has occurred so far and the buffer holds at least one value.  A failed
   write is reported through io_error.
   NOTE(review): the rest of the io_error call and any reset of
   buffer_used are on lines not visible in this listing -- confirm. */
657 flush_buffer (struct fastfile *ff)
659 if (ff->ok && ff->buffer_used > 0)
/* The whole buffer is written, not just the used part, so the file is a
   sequence of fixed-size buffer images; seek_and_fill_buffer computes
   file offsets in units of buffer_size on that basis. */
661 if (!full_write (ff->fd, ff->buffer,
662 ff->buffer_size * sizeof *ff->buffer))
663 io_error (ff, _("Error writing temporary file: %s."),
670 /* Opens a disk file for READER and seeks to the current position as indicated
671 by case_idx. Normally the current position is the beginning of the file,
672 but fastfile_to_disk may cause the file to be opened at a different position. */
/* Prepares READER for reading a disk-backed fastfile: opens the temporary
   file read-only, gives the reader an I/O buffer and a case to copy
   values into, and positions it at the reader's case_idx.
   NOTE(review): the statement controlled by the first test (presumably an
   early return) and the test guarding the io_error call are on lines not
   visible in this listing -- confirm. */
675 reader_open_file (struct fastfilereader *reader)
677 struct casefile *cf = casereader_get_casefile(&reader->cr);
678 struct fastfile *ff = (struct fastfile *) cf;
/* Nothing to open after an I/O error or when the reader is already at or
   past the last case. */
679 if (!ff->ok || reader->case_idx >= ff->case_cnt)
689 reader->fd = safe_open (ff->file_name, O_RDONLY);
691 io_error (ff, _("%s: Opening temporary file: %s."),
692 ff->file_name, strerror (errno));
/* Reuse the fastfile's own buffer if it has one; otherwise allocate a
   zeroed buffer of the same size for this reader.
   NOTE(review): whether ff->buffer is cleared after being taken over, and
   the `else' between the two arms, are not visible in this listing. */
695 if (ff->buffer != NULL)
697 reader->buffer = ff->buffer;
702 reader->buffer = xnmalloc (ff->buffer_size, sizeof *ff->buffer);
703 memset (reader->buffer, 0, ff->buffer_size * sizeof *ff->buffer);
706 case_create (&reader->c, ff->value_cnt);
/* -1 matches no real offset, so the call below always seeks and reads. */
708 reader->buffer_ofs = -1;
709 reader->file_ofs = -1;
710 seek_and_fill_buffer (reader);
713 /* Seeks the backing file for READER to the proper position and
714 refreshes the buffer contents. */
/* The proper position is the start of the buffer image that contains the
   reader's case_idx.
   NOTE(review): the declaration of new_ofs, the end of the buffer_pos
   expression and the branch for value_cnt == 0 are on lines not visible
   in this listing -- confirm. */
716 seek_and_fill_buffer (struct fastfilereader *reader)
718 struct casefile *cf = casereader_get_casefile(&reader->cr);
719 struct fastfile *ff = (struct fastfile *) cf;
/* The guard avoids dividing by zero for cases that hold no values. */
722 if (ff->value_cnt != 0)
/* Whole cases per buffer image. */
724 size_t buffer_case_cnt = ff->buffer_size / ff->value_cnt;
/* Byte offset of the buffer image holding case_idx: index of that image
   times the size of one image in bytes. */
725 new_ofs = ((off_t) reader->case_idx / buffer_case_cnt
726 * ff->buffer_size * sizeof *ff->buffer);
/* Position of the case inside that image. */
727 reader->buffer_pos = (reader->case_idx % buffer_case_cnt
/* Seek only when the descriptor is not already at the wanted offset. */
732 if (new_ofs != reader->file_ofs)
734 if (lseek (reader->fd, new_ofs, SEEK_SET) != new_ofs)
735 io_error (ff, _("%s: Seeking temporary file: %s."),
736 ff->file_name, strerror (errno));
738 reader->file_ofs = new_ofs;
/* Read the image unless there is nothing to read or the buffer already
   holds the image that starts at this offset. */
741 if (ff->case_cnt > 0 && ff->value_cnt > 0 && reader->buffer_ofs != new_ofs)
742 fill_buffer (reader);
745 /* Fills READER's buffer by reading a block from disk. */
/* Declared to return bool in the forward declaration.  Read failures and
   short reads are reported through io_error.
   NOTE(review): the test for a failed read, the reset of buffer_pos and
   the return statements are on lines not visible in this listing --
   confirm against the full file. */
747 fill_buffer (struct fastfilereader *reader)
749 struct casefile *cf = casereader_get_casefile(&reader->cr);
750 struct fastfile *ff = (struct fastfile *) cf;
/* NOTE(review): BYTES is an int that is later compared with a size_t
   product; a negative value would be converted to unsigned in that
   comparison -- confirm this is handled by the test not shown here. */
753 int bytes = full_read (reader->fd, reader->buffer,
755 sizeof *reader->buffer);
757 io_error (ff, _("%s: Reading temporary file: %s."),
758 ff->file_name, strerror (errno));
/* Anything other than one complete buffer image counts as an error. */
759 else if (bytes != ff->buffer_size * sizeof *reader->buffer)
760 io_error (ff, _("%s: Temporary file ended unexpectedly."),
/* The buffer now holds the image that started at the old file position;
   the file position moves past what was read. */
764 reader->buffer_ofs = reader->file_ofs;
765 reader->file_ofs += bytes;
/* Casefile operations table for fastfiles, registered by fastfile_create.
   NOTE(review): only two of the entries are visible in this listing; the
   order of entries must match the members of struct class_casefile --
   confirm against casefile-private.h. */
771 static const struct class_casefile class =
775 fastfile_get_value_cnt,
776 fastfile_get_case_cnt,
/* Casereader operations table for fastfile readers, passed to
   casereader_register by fastfile_get_reader and fastfilereader_clone.
   NOTE(review): some entries may be missing from this listing; the order
   of entries must match the members of struct class_casereader --
   confirm against casefile-private.h. */
786 static const struct class_casereader class_reader =
788 fastfilereader_get_next_case,
790 fastfilereader_destroy,
791 fastfilereader_clone,