1 /* PSPP - computes sample statistics.
2 Copyright (C) 2004 Free Software Foundation, Inc.
3 Written by Ben Pfaff <blp@gnu.org>.
5 This program is free software; you can redistribute it and/or
6 modify it under the terms of the GNU General Public License as
7 published by the Free Software Foundation; either version 2 of the
8 License, or (at your option) any later version.
10 This program is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with this program; if not, write to the Free Software
17 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. */
29 #include <libpspp/alloc.h>
31 #include <libpspp/compiler.h>
32 #include <libpspp/message.h>
33 #include "full-read.h"
34 #include "full-write.h"
35 #include <libpspp/misc.h>
36 #include "make-file.h"
41 #define _(msgid) gettext (msgid)
43 #define IO_BUF_SIZE (8192 / sizeof (union value))
45 /* A casefile represents a sequentially accessible stream of cases.
48 If workspace allows, a casefile is maintained in memory. If
49 workspace overflows, then the casefile is pushed to disk. In
50 either case the interface presented to callers is kept the same.
53 The life cycle of a casefile consists of up to three phases:
55 1. Writing. The casefile initially contains no cases. In
56 this phase, any number of cases may be appended to the
57 end of a casefile. (Cases are never inserted in the
58 middle or before the beginning of a casefile.)
60 Use casefile_append() or casefile_append_xfer() to
61 append a case to a casefile.
63 2. Reading. The casefile may be read sequentially,
64 starting from the beginning, by "casereaders". Any
65 number of casereaders may be created, at any time,
66 during the reading phase. Each casereader has an
67 independent position in the casefile.
69 Casereaders may only move forward. They cannot move
70 backward to arbitrary records or seek randomly.
71 Cloning casereaders is possible, but it is not yet implemented.
74 Use casefile_get_reader() to create a casereader for
75 use in phase 2. This also transitions from phase 1 to
76 phase 2. Calling casefile_mode_reader() makes the same
77 transition, without creating a casereader.
79 Use casereader_read() or casereader_read_xfer() to read
80 a case from a casereader. Use casereader_destroy() to
81 discard a casereader when it is no longer needed.
83 3. Destruction. This phase is optional. The casefile is
84 also read with casereaders in this phase, but the
85 ability to create new casereaders is curtailed.
87 In this phase, casereaders could still be cloned (once
88 we eventually implement cloning).
90 To transition from phase 1 or 2 to phase 3 and create a
91 casereader, call casefile_get_destructive_reader().
92 The same functions apply to the casereader obtained
93 this way as apply to casereaders obtained in phase 2.
95 After casefile_get_destructive_reader() is called, no
96 more casereaders may be created with
97 casefile_get_reader() or
98 casefile_get_destructive_reader(). (If cloning of
99 casereaders were implemented, it would still be possible.)
102 The purpose of the limitations applied to casereaders
103 in phase 3 is to allow in-memory casefiles to fully
104 transfer ownership of cases to the casereaders,
105 avoiding the need for extra copies of case data. For
106 relatively static data sets with many variables, I
107 suspect (without evidence) that this may be a big performance boost.
110 When a casefile is no longer needed, it may be destroyed with
111 casefile_destroy(). This function will also destroy any
112 remaining casereaders. */
114 /* FIXME: should we implement compression? */
116 /* In-memory cases are arranged in an array of arrays. The top
117 level is variable size and the size of each bottom level array
118 is fixed at the number of cases defined here. */
119 #define CASES_PER_BLOCK 128
125 struct casefile *next, *prev; /* Next, prev in global list. */
126 size_t value_cnt; /* Case size in `union value's. */
127 size_t case_acct_size; /* Bytes charged to case_bytes per in-memory case. */
128 unsigned long case_cnt; /* Number of cases stored. */
129 enum { MEMORY, DISK } storage; /* Where cases are stored. */
130 enum { WRITE, READ } mode; /* Is writing or reading allowed? */
131 struct casereader *readers; /* Head of doubly linked list of our readers. */
132 bool being_destroyed; /* Does a destructive reader exist? */
133 bool ok; /* False after I/O error. */
135 /* Memory storage. */
136 struct ccase **cases; /* Array of blocks of CASES_PER_BLOCK cases each. */
/* Disk storage. */
139 int fd; /* File descriptor, -1 if none. */
140 char *file_name; /* Temporary file name, NULL if none. */
141 union value *buffer; /* I/O buffer, NULL if none. */
142 size_t buffer_used; /* Number of values used in buffer. */
143 size_t buffer_size; /* Buffer size in values. */
146 /* For reading out the cases in a casefile. */
149 struct casereader *next, *prev; /* Next, prev in casefile's list. */
150 struct casefile *cf; /* Our casefile. */
151 unsigned long case_idx; /* Case number of current case. */
152 bool destructive; /* Is this a destructive reader? */
153 bool random; /* Is this a random reader (may casereader_seek() be used)? */
/* Disk storage. */
156 int fd; /* File descriptor. */
157 off_t file_ofs; /* Current position in fd, -1 before the first seek. */
158 off_t buffer_ofs; /* File offset of buffer start, -1 before the first fill. */
159 union value *buffer; /* I/O buffer. */
160 size_t buffer_pos; /* Offset of the next case in buffer, in values. */
161 struct ccase c; /* Current case. */
164 /* Returns the case number of the current case of casereader R. */
166 casereader_cnum(const struct casereader *r)
171 /* Doubly linked list of all casefiles. */
172 static struct casefile *casefiles;
174 /* Number of bytes charged for cases held by in-memory casefiles:
   each in-memory case adds its casefile's case_acct_size.  Compared
   against get_workspace() when a case is appended. */
175 static size_t case_bytes;
/* Cleanup at program exit. */
177 static void register_atexit (void);
178 static void exit_handler (void);
/* Disk storage helpers for casefiles and their readers. */
180 static void reader_open_file (struct casereader *);
181 static void write_case_to_disk (struct casefile *, const struct ccase *);
182 static void flush_buffer (struct casefile *);
183 static void seek_and_fill_buffer (struct casereader *);
184 static bool fill_buffer (struct casereader *);
/* Error reporting and EINTR-safe wrappers for open() and close(). */
186 static void io_error (struct casefile *, const char *, ...)
187 PRINTF_FORMAT (2, 3);
188 static int safe_open (const char *file_name, int flags);
189 static int safe_close (int fd);
191 /* Creates and returns a casefile to store cases of VALUE_CNT
192 `union value's each.  The new casefile initially keeps its cases
   in memory and has no temporary file. */
194 casefile_create (size_t value_cnt)
196 struct casefile *cf = xmalloc (sizeof *cf);
/* The current head of the global list becomes CF's successor. */
197 cf->next = casefiles;
199 if (cf->next != NULL)
202 cf->value_cnt = value_cnt;
/* For workspace accounting, each case is charged for its values plus
   four more values' worth of bytes (presumably to cover per-case
   overhead -- TODO confirm). */
203 cf->case_acct_size = (cf->value_cnt + 4) * sizeof *cf->buffer;
205 cf->storage = MEMORY;
208 cf->being_destroyed = 0;
212 cf->file_name = NULL;
/* Size the I/O buffer, in values.  ROUND_UP presumably rounds
   VALUE_CNT up to a multiple of IO_BUF_SIZE.  If packing whole cases
   into that buffer would leave more than 64 values unused, use a
   buffer that holds exactly one case instead. */
214 cf->buffer_size = ROUND_UP (cf->value_cnt, IO_BUF_SIZE);
215 if (cf->value_cnt > 0 && cf->buffer_size % cf->value_cnt > 64)
216 cf->buffer_size = cf->value_cnt;
222 /* Destroys casefile CF, along with any readers still attached to
   it, its in-memory case blocks, and its temporary file, if any. */
224 casefile_destroy (struct casefile *cf)
/* Unlink CF from the global list of casefiles. */
228 if (cf->next != NULL)
229 cf->next->prev = cf->prev;
230 if (cf->prev != NULL)
231 cf->prev->next = cf->next;
233 casefiles = cf->next;
/* casereader_destroy() removes the reader from CF's list, so the
   list shrinks on every iteration. */
235 while (cf->readers != NULL)
236 casereader_destroy (cf->readers);
238 if (cf->cases != NULL)
240 size_t idx, block_cnt;
/* Give back the workspace charged for CF's in-memory cases. */
242 case_bytes -= cf->case_cnt * cf->case_acct_size;
243 for (idx = 0; idx < cf->case_cnt; idx++)
245 size_t block_idx = idx / CASES_PER_BLOCK;
246 size_t case_idx = idx % CASES_PER_BLOCK;
247 struct ccase *c = &cf->cases[block_idx][case_idx];
/* Free every block that holds at least one case. */
251 block_cnt = DIV_RND_UP (cf->case_cnt, CASES_PER_BLOCK);
252 for (idx = 0; idx < block_cnt; idx++)
253 free (cf->cases[idx]);
/* Delete the temporary file, if there is one, reporting a failure
   as an I/O error on CF. */
261 if (cf->file_name != NULL && remove (cf->file_name) == -1)
262 io_error (cf, _("%s: Removing temporary file: %s."),
263 cf->file_name, strerror (errno));
264 free (cf->file_name);
272 /* Returns true if an I/O error has occurred in casefile CF.
   (CF's `ok' member is documented as false after an I/O error.) */
274 casefile_error (const struct casefile *cf)
279 /* Returns true if casefile CF currently keeps its cases in memory,
280 false if they are stored on disk. */
282 casefile_in_core (const struct casefile *cf)
286 return cf->storage == MEMORY;
289 /* Puts a casefile to "sleep", that is, minimizes the resources
290 needed for it by closing its file descriptor and freeing its
291 buffer. This is useful if we need so many casefiles that we
292 might not have enough memory and file descriptors to go around.
295 For simplicity, this implementation always converts the
296 casefile to reader mode. If this turns out to be a problem,
297 with a little extra work we could also support sleeping
   casefiles that are still in writer mode.
300 Returns true if successful, false if an I/O error occurred. */
302 casefile_sleep (const struct casefile *cf_)
/* Cast away const so that CF's mode, storage, and buffer can be
   changed. */
304 struct casefile *cf = (struct casefile *) cf_;
/* Stop accepting new cases, then make sure the cases are on disk. */
307 casefile_mode_reader (cf);
308 casefile_to_disk (cf);
316 if (cf->buffer != NULL)
325 /* Returns the size of a case in CF, in `union value's, as given
   to casefile_create(). */
327 casefile_get_value_cnt (const struct casefile *cf)
331 return cf->value_cnt;
334 /* Returns the number of cases stored in casefile CF. */
336 casefile_get_case_cnt (const struct casefile *cf)
343 /* Appends a copy of case C to casefile CF. Not valid after any
344 reader for CF has been created.
   The case is kept in memory while the bytes charged for in-memory
   cases stay below get_workspace(); otherwise the whole casefile is
   moved to disk and the case is written there.
345 Returns true if successful, false if an I/O error occurred. */
347 casefile_append (struct casefile *cf, const struct ccase *c)
351 assert (cf->mode == WRITE);
353 /* Try memory first. */
354 if (cf->storage == MEMORY)
356 if (case_bytes < get_workspace ())
358 size_t block_idx = cf->case_cnt / CASES_PER_BLOCK;
359 size_t case_idx = cf->case_cnt % CASES_PER_BLOCK;
360 struct ccase new_case;
362 case_bytes += cf->case_acct_size;
363 case_clone (&new_case, c);
/* BLOCK_IDX is zero or a power of two: resize the top-level array
   to one slot, or to twice BLOCK_IDX slots. */
366 if ((block_idx & (block_idx - 1)) == 0)
368 size_t block_cap = block_idx == 0 ? 1 : block_idx * 2;
369 cf->cases = xnrealloc (cf->cases,
370 block_cap, sizeof *cf->cases);
373 cf->cases[block_idx] = xnmalloc (CASES_PER_BLOCK,
/* The casefile takes over the clone; NEW_CASE is moved, not copied
   again. */
377 case_move (&cf->cases[block_idx][case_idx], &new_case);
/* Workspace exhausted: move everything stored so far to disk, then
   write C there as well. */
381 casefile_to_disk (cf);
382 assert (cf->storage == DISK);
383 write_case_to_disk (cf, c);
/* Already on disk. */
387 write_case_to_disk (cf, c);
393 /* Appends case C to casefile CF, which takes over ownership of
394 C. Not valid after any reader for CF has been created.
395 Returns true if successful, false if an I/O error occurred. */
397 casefile_append_xfer (struct casefile *cf, struct ccase *c)
/* Built on casefile_append(), which stores a copy of C. */
399 casefile_append (cf, c);
404 /* Writes case C into casefile CF's disk buffer, then flushes the
   buffer to disk if it no longer has room for another case.
   The forward declaration makes this function `static void', so it
   reports nothing directly; see flush_buffer() for how write errors
   are handled. */
408 write_case_to_disk (struct casefile *cf, const struct ccase *c)
/* Store C's values at the first unused position in the buffer. */
413 case_to_values (c, cf->buffer + cf->buffer_used, cf->value_cnt);
414 cf->buffer_used += cf->value_cnt;
/* Would one more case overflow the buffer? */
415 if (cf->buffer_used + cf->value_cnt > cf->buffer_size)
419 /* If any bytes in CF's output buffer are used, flush them to disk. */
422 flush_buffer (struct casefile *cf)
/* Nothing is written once an I/O error has been recorded on CF, or
   while the buffer is empty. */
424 if (cf->ok && cf->buffer_used > 0)
/* The whole buffer (buffer_size values) is written regardless of
   buffer_used, so every block in the temporary file has the same
   size. */
426 if (!full_write (cf->fd, cf->buffer,
427 cf->buffer_size * sizeof *cf->buffer))
428 io_error (cf, _("Error writing temporary file: %s."),
434 /* If CF is currently stored in memory, writes it to disk. Readers, if any,
435 retain their current positions.
   The cases go to a newly created temporary file, and the in-memory
   case blocks are freed.
436 Returns true if successful, false if an I/O error occurred. */
438 casefile_to_disk (const struct casefile *cf_)
/* Cast away const so that CF's storage can be changed. */
440 struct casefile *cf = (struct casefile *) cf_;
441 struct casereader *reader;
445 if (cf->storage == MEMORY)
447 size_t idx, block_cnt;
/* An in-memory casefile must not own any disk resources yet. */
449 assert (cf->file_name == NULL);
450 assert (cf->fd == -1);
451 assert (cf->buffer_used == 0);
453 if (!make_temp_file (&cf->fd, &cf->file_name))
/* Allocate the output buffer and zero it.  flush_buffer() writes all
   buffer_size values, so zeroing presumably keeps any unused tail
   well defined -- TODO confirm. */
460 cf->buffer = xnmalloc (cf->buffer_size, sizeof *cf->buffer);
461 memset (cf->buffer, 0, cf->buffer_size * sizeof *cf->buffer);
/* The cases no longer count against the workspace; copy each one
   to the temporary file. */
463 case_bytes -= cf->case_cnt * cf->case_acct_size;
464 for (idx = 0; idx < cf->case_cnt; idx++)
466 size_t block_idx = idx / CASES_PER_BLOCK;
467 size_t case_idx = idx % CASES_PER_BLOCK;
468 struct ccase *c = &cf->cases[block_idx][case_idx];
469 write_case_to_disk (cf, c);
/* Free the blocks that held the in-memory cases. */
473 block_cnt = DIV_RND_UP (cf->case_cnt, CASES_PER_BLOCK);
474 for (idx = 0; idx < block_cnt; idx++)
475 free (cf->cases[idx]);
480 if (cf->mode == READ)
/* Switch the existing readers over to the temporary file;
   reader_open_file() positions each one according to its case_idx. */
483 for (reader = cf->readers; reader != NULL; reader = reader->next)
484 reader_open_file (reader);
489 /* Changes CF to reader mode, ensuring that no more cases may be
490 added (casefile_append() asserts that CF is still in writer
   mode). Creating a casereader for CF has the same effect. */
492 casefile_mode_reader (struct casefile *cf)
498 /* Creates and returns a casereader for CF. A casereader can be used to
499 sequentially read the cases in a casefile.
   The new reader starts at case 0 and is neither destructive nor
   random.  Must not be called once a destructive reader exists. */
501 casefile_get_reader (const struct casefile *cf_)
/* Cast away const: creating a reader modifies CF's reader list. */
503 struct casefile *cf = (struct casefile *) cf_;
504 struct casereader *reader;
507 assert (!cf->being_destroyed);
509 /* Flush the buffer to disk if it's not empty. */
510 if (cf->mode == WRITE && cf->storage == DISK)
/* Allocate the reader and push it onto the front of CF's list. */
515 reader = xmalloc (sizeof *reader);
516 reader->next = cf->readers;
517 if (cf->readers != NULL)
518 reader->next->prev = reader;
519 cf->readers = reader;
522 reader->case_idx = 0;
523 reader->destructive = 0;
524 reader->random = false;
/* No disk state yet; reader_open_file() sets it up when needed. */
526 reader->buffer = NULL;
527 reader->buffer_pos = 0;
528 case_nullify (&reader->c);
530 if (reader->cf->storage == DISK)
531 reader_open_file (reader);
536 /* Creates and returns a random casereader for CF. A random
537 casereader can be used to randomly read the cases in a casefile. */
540 casefile_get_random_reader (const struct casefile *cf)
/* A random reader is an ordinary reader with its `random' flag set;
   casereader_seek() asserts that flag. */
542 struct casereader *reader = casefile_get_reader (cf);
543 reader->random = true;
547 /* Creates and returns a destructive casereader for CF. Like a
548 normal casereader, a destructive casereader sequentially reads
549 the cases in a casefile. Unlike a normal casereader, a
550 destructive reader cannot operate concurrently with any other
551 reader. (This restriction could be relaxed in a few ways, but
552 it is so far unnecessary for other code.)
   CF must have no readers when this is called, and afterward
   casefile_get_reader() may no longer be called on it. */
554 casefile_get_destructive_reader (struct casefile *cf)
556 struct casereader *reader;
558 assert (cf->readers == NULL);
559 reader = casefile_get_reader (cf);
560 reader->destructive = 1;
/* From now on casefile_get_reader() fails its assertion on CF. */
561 cf->being_destroyed = 1;
565 /* Opens a disk file for READER and seeks to the current position as indicated
566 by case_idx. Normally the current position is the beginning of the file,
567 but casefile_to_disk may cause the file to be opened at a different position. */
570 reader_open_file (struct casereader *reader)
572 struct casefile *cf = reader->cf;
/* Check for an earlier I/O error on CF, or a reader that is already
   past the last case. */
573 if (!cf->ok || reader->case_idx >= cf->case_cnt)
/* Open the casefile's temporary file read-only. */
583 reader->fd = safe_open (cf->file_name, O_RDONLY);
585 io_error (cf, _("%s: Opening temporary file: %s."),
586 cf->file_name, strerror (errno));
/* Take over the casefile's buffer if it has one. */
589 if (cf->buffer != NULL)
591 reader->buffer = cf->buffer;
/* Allocate and zero a buffer of the casefile's buffer size. */
596 reader->buffer = xnmalloc (cf->buffer_size, sizeof *cf->buffer);
597 memset (reader->buffer, 0, cf->buffer_size * sizeof *cf->buffer);
600 case_create (&reader->c, cf->value_cnt);
/* -1 matches no real file offset, so seek_and_fill_buffer() both
   seeks and fills the buffer. */
602 reader->buffer_ofs = -1;
603 reader->file_ofs = -1;
604 seek_and_fill_buffer (reader);
607 /* Seeks the backing file for READER to the proper position and
608 refreshes the buffer contents.
   The proper position is the start of the buffer-sized block that
   contains the case numbered READER's case_idx. */
610 seek_and_fill_buffer (struct casereader *reader)
612 struct casefile *cf = reader->cf;
615 if (cf->value_cnt != 0)
/* Each block of buffer_size values holds BUFFER_CASE_CNT whole
   cases.  Compute the byte offset of the block containing case_idx
   and the position of that case within the block. */
617 size_t buffer_case_cnt = cf->buffer_size / cf->value_cnt;
618 new_ofs = ((off_t) reader->case_idx / buffer_case_cnt
619 * cf->buffer_size * sizeof *cf->buffer);
620 reader->buffer_pos = (reader->case_idx % buffer_case_cnt
/* Seek only if the file is not already at the wanted offset. */
625 if (new_ofs != reader->file_ofs)
627 if (lseek (reader->fd, new_ofs, SEEK_SET) != new_ofs)
628 io_error (cf, _("%s: Seeking temporary file: %s."),
629 cf->file_name, strerror (errno));
631 reader->file_ofs = new_ofs;
/* Read the block unless there is no data to read or the buffer
   already holds the block at NEW_OFS. */
634 if (cf->case_cnt > 0 && cf->value_cnt > 0 && reader->buffer_ofs != new_ofs)
635 fill_buffer (reader);
638 /* Fills READER's buffer by reading a block of the casefile's
   buffer size from READER's file descriptor.
   Returns the casefile's `ok' flag, which is documented as false
   after an I/O error. */
640 fill_buffer (struct casereader *reader)
/* NOTE(review): BYTES is an int but is compared below against a
   size_t expression -- confirm full_read()'s return type and how it
   reports errors. */
644 int bytes = full_read (reader->fd, reader->buffer,
645 reader->cf->buffer_size * sizeof *reader->buffer);
647 io_error (reader->cf, _("%s: Reading temporary file: %s."),
648 reader->cf->file_name, strerror (errno));
/* Anything other than a complete block is reported as an error. */
649 else if (bytes != reader->cf->buffer_size * sizeof *reader->buffer)
650 io_error (reader->cf, _("%s: Temporary file ended unexpectedly."),
651 reader->cf->file_name);
/* The buffer now holds the block that started at the old file
   position; the file position has advanced past it. */
654 reader->buffer_ofs = reader->file_ofs;
655 reader->file_ofs += bytes;
658 return reader->cf->ok;
661 /* Returns the casefile that READER reads.  READER must not be
   null. */
662 const struct casefile *
663 casereader_get_casefile (const struct casereader *reader)
665 assert (reader != NULL);
670 /* Reads a copy of the next case from READER into C.
671 Caller is responsible for destroying C.
672 Returns true if successful, false at end of file. */
674 casereader_read (struct casereader *reader, struct ccase *c)
676 assert (reader != NULL);
/* Check for an earlier I/O error on the casefile, or no cases left. */
678 if (!reader->cf->ok || reader->case_idx >= reader->cf->case_cnt)
681 if (reader->cf->storage == MEMORY)
/* In memory: clone the case stored at the reader's position. */
683 size_t block_idx = reader->case_idx / CASES_PER_BLOCK;
684 size_t case_idx = reader->case_idx % CASES_PER_BLOCK;
686 case_clone (c, &reader->cf->cases[block_idx][case_idx]);
/* On disk: if the buffer does not hold another whole case, read the
   next block and start again from its beginning. */
692 if (reader->buffer_pos + reader->cf->value_cnt > reader->cf->buffer_size)
694 if (!fill_buffer (reader))
696 reader->buffer_pos = 0;
/* Load the values into the reader's own case and advance. */
699 case_from_values (&reader->c, reader->buffer + reader->buffer_pos,
700 reader->cf->value_cnt);
701 reader->buffer_pos += reader->cf->value_cnt;
/* The caller gets a clone; the reader keeps its own case. */
704 case_clone (c, &reader->c);
709 /* Reads the next case from READER into C and transfers ownership
710 to the caller. Caller is responsible for destroying C.
711 Returns true if successful, false at end of file or on I/O error. */
714 casereader_read_xfer (struct casereader *reader, struct ccase *c)
716 assert (reader != NULL);
/* Only a destructive reader of an in-memory casefile with cases
   left can take the stored case itself.  Every other situation is
   handled by casereader_read(), which hands out a copy. */
718 if (reader->destructive == 0
719 || reader->case_idx >= reader->cf->case_cnt
720 || reader->cf->storage == DISK)
721 return casereader_read (reader, c);
724 size_t block_idx = reader->case_idx / CASES_PER_BLOCK;
725 size_t case_idx = reader->case_idx % CASES_PER_BLOCK;
726 struct ccase *read_case = &reader->cf->cases[block_idx][case_idx];
/* Move the stored case into C rather than cloning it. */
728 case_move (c, read_case);
734 /* Sets the next case to be read by READER to CASE_IDX,
735 which must be less than the number of cases in the casefile.
736 Allowed only for random readers. */
738 casereader_seek (struct casereader *reader, unsigned long case_idx)
740 assert (reader != NULL);
741 assert (reader->random);
742 assert (case_idx < reader->cf->case_cnt);
744 reader->case_idx = case_idx;
/* For a casefile on disk, also reposition the file and reload the
   buffer for the new position.  In memory, changing the index is
   enough. */
745 if (reader->cf->storage == DISK)
746 seek_and_fill_buffer (reader);
749 /* Destroys READER, removing it from its casefile's list of
   readers.  READER must not be null. */
751 casereader_destroy (struct casereader *reader)
753 assert (reader != NULL);
/* Unlink READER from the casefile's doubly linked reader list. */
755 if (reader->next != NULL)
756 reader->next->prev = reader->prev;
757 if (reader->prev != NULL)
758 reader->prev->next = reader->next;
759 if (reader->cf->readers == reader)
760 reader->cf->readers = reader->next;
/* A casefile that has no buffer inherits READER's. */
762 if (reader->cf->buffer == NULL)
763 reader->cf->buffer = reader->buffer;
765 free (reader->buffer);
767 if (reader->fd != -1)
/* Likewise, a casefile with no file descriptor inherits READER's. */
769 if (reader->cf->fd == -1)
770 reader->cf->fd = reader->fd;
772 safe_close (reader->fd);
775 case_destroy (&reader->c);
780 /* Marks CF as having encountered an I/O error.
781 If this is the first error on CF, reports FORMAT to the user,
782 doing printf()-style substitutions. */
784 io_error (struct casefile *cf, const char *format, ...)
/* A general error message with no source location: no file name,
   and a line number of -1. */
791 m.category = MSG_GENERAL;
792 m.severity = MSG_ERROR;
793 m.where.file_name = NULL;
794 m.where.line_number = -1;
/* Build the message text from FORMAT and the variable arguments. */
795 va_start (args, format);
796 m.text = xvasprintf (format, args);
804 /* Calls open(), passing FILE_NAME and FLAGS, repeating as necessary
805 to deal with interrupted calls. */
807 safe_open (const char *file_name, int flags)
813 fd = open (file_name, flags);
/* Try again only if open() failed with EINTR. */
815 while (fd == -1 && errno == EINTR);
820 /* Calls close(), passing FD, repeating as necessary to deal with
821 interrupted calls. */
822 static int safe_close (int fd)
/* Try again only if the call failed with EINTR. */
830 while (retval == -1 && errno == EINTR);
835 /* Registers our exit handler with atexit() if it has not already been registered. */
838 register_atexit (void)
/* Static, so the flag keeps its value from one call to the next. */
840 static bool registered = false;
844 atexit (exit_handler);
848 /* atexit() handler that closes and deletes our temporary files. */
/* casefile_destroy() unlinks the casefile it destroys from the
   global list, so the loop ends once every casefile is gone. */
853 while (casefiles != NULL)
854 casefile_destroy (casefiles);