1 /* PSPP - computes sample statistics.
2 Copyright (C) 2004 Free Software Foundation, Inc.
3 Written by Ben Pfaff <blp@gnu.org>.
5 This program is free software; you can redistribute it and/or
6 modify it under the terms of the GNU General Public License as
7 published by the Free Software Foundation; either version 2 of the
8 License, or (at your option) any later version.
10 This program is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with this program; if not, write to the Free Software
17 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
29 #include <libpspp/alloc.h>
31 #include <libpspp/compiler.h>
32 #include <libpspp/message.h>
33 #include "full-read.h"
34 #include "full-write.h"
35 #include <libpspp/misc.h>
36 #include "make-file.h"
41 #define _(msgid) gettext (msgid)
43 #define IO_BUF_SIZE (8192 / sizeof (union value))
45 /* A casefile represents a sequentially accessible stream of
48 If workspace allows, a casefile is maintained in memory. If
49 workspace overflows, then the casefile is pushed to disk. In
50 either case the interface presented to callers is kept the
53 The life cycle of a casefile consists of up to three phases:
55 1. Writing. The casefile initially contains no cases. In
56 this phase, any number of cases may be appended to the
57 end of a casefile. (Cases are never inserted in the
58 middle or before the beginning of a casefile.)
60 Use casefile_append() or casefile_append_xfer() to
61 append a case to a casefile.
63 2. Reading. The casefile may be read sequentially,
64 starting from the beginning, by "casereaders". Any
65 number of casereaders may be created, at any time,
66 during the reading phase. Each casereader has an
67 independent position in the casefile.
69 Casereaders may only move forward. They cannot move
70 backward to arbitrary records or seek randomly.
71 Cloning casereaders is possible, but it is not yet
74 Use casefile_get_reader() to create a casereader for
75 use in phase 2. This also transitions from phase 1 to
76 phase 2. Calling casefile_mode_reader() makes the same
77 transition, without creating a casereader.
79 Use casereader_read() or casereader_read_xfer() to read
80 a case from a casereader. Use casereader_destroy() to
81 discard a casereader when it is no longer needed.
83 3. Destruction. This phase is optional. The casefile is
84 also read with casereaders in this phase, but the
85 ability to create new casereaders is curtailed.
87 In this phase, casereaders could still be cloned (once
88 we eventually implement cloning).
90 To transition from phase 1 or 2 to phase 3 and create a
91 casereader, call casefile_get_destructive_reader().
92 The same functions apply to the casereader obtained
93 this way as apply to casereaders obtained in phase 2.
95 After casefile_get_destructive_reader() is called, no
96 more casereaders may be created with
97 casefile_get_reader() or
98 casefile_get_destructive_reader(). (If cloning of
99 casereaders were implemented, it would still be
102 The purpose of the limitations applied to casereaders
103 in phase 3 is to allow in-memory casefiles to fully
104 transfer ownership of cases to the casereaders,
105 avoiding the need for extra copies of case data. For
106 relatively static data sets with many variables, I
107 suspect (without evidence) that this may be a big
110 When a casefile is no longer needed, it may be destroyed with
111 casefile_destroy(). This function will also destroy any
112 remaining casereaders. */
114 /* In-memory cases are arranged in an array of arrays. The top
115 level is variable size and the size of each bottom level array
116 is fixed at the number of cases defined here. */
117 #define CASES_PER_BLOCK 128
123 struct casefile *next, *prev; /* Next, prev in global list. */
124 size_t value_cnt; /* Case size in `union value's. */
125 size_t case_acct_size; /* Case size for accounting. */
126 unsigned long case_cnt; /* Number of cases stored. */
127 enum { MEMORY, DISK } storage; /* Where cases are stored. */
128 enum { WRITE, READ } mode; /* Is writing or reading allowed? */
129 struct casereader *readers; /* List of our readers. */
130 int being_destroyed; /* Does a destructive reader exist? */
131 bool ok; /* False after I/O error. */
133 /* Memory storage. */
134 struct ccase **cases; /* Pointer to array of cases. */
137 int fd; /* File descriptor, -1 if none. */
138 char *file_name; /* File name. */
139 union value *buffer; /* I/O buffer, NULL if none. */
140 size_t buffer_used; /* Number of values used in buffer. */
141 size_t buffer_size; /* Buffer size in values. */
144 /* For reading out the cases in a casefile. */
147 struct casereader *next, *prev; /* Next, prev in casefile's list. */
148 struct casefile *cf; /* Our casefile. */
149 unsigned long case_idx; /* Case number of current case. */
150 int destructive; /* Is this a destructive reader? */
153 int fd; /* File descriptor. */
154 union value *buffer; /* I/O buffer. */
155 size_t buffer_pos; /* Offset of buffer position. */
156 struct ccase c; /* Current case. */
159 /* Returns the case number of the current case. */
161 casereader_cnum(const struct casereader *r)
166 /* Doubly linked list of all casefiles. */
167 static struct casefile *casefiles;
169 /* Sum of case_acct_size over all cases held in in-memory casefiles; compared against get_workspace() to decide when a casefile must move to disk. */
170 static size_t case_bytes;
172 static void register_atexit (void);
173 static void exit_handler (void);
175 static void reader_open_file (struct casereader *reader);
176 static void write_case_to_disk (struct casefile *cf, const struct ccase *c);
177 static void flush_buffer (struct casefile *cf);
178 static bool fill_buffer (struct casereader *reader);
180 static void io_error (struct casefile *, const char *, ...)
181 PRINTF_FORMAT (2, 3);
182 static int safe_open (const char *file_name, int flags);
183 static int safe_close (int fd);
185 /* Creates and returns a casefile to store cases of VALUE_CNT
186 `union value's each. */
188 casefile_create (size_t value_cnt)
190 struct casefile *cf = xmalloc (sizeof *cf);
191 cf->next = casefiles;
193 if (cf->next != NULL)
196 cf->value_cnt = value_cnt;
197 cf->case_acct_size = (cf->value_cnt + 4) * sizeof *cf->buffer;
199 cf->storage = MEMORY;
202 cf->being_destroyed = 0;
206 cf->file_name = NULL;
208 cf->buffer_size = ROUND_UP (cf->value_cnt, IO_BUF_SIZE);
209 if (cf->value_cnt > 0 && cf->buffer_size % cf->value_cnt > 64)
210 cf->buffer_size = cf->value_cnt;
216 /* Destroys casefile CF. */
218 casefile_destroy (struct casefile *cf)
222 if (cf->next != NULL)
223 cf->next->prev = cf->prev;
224 if (cf->prev != NULL)
225 cf->prev->next = cf->next;
227 casefiles = cf->next;
229 while (cf->readers != NULL)
230 casereader_destroy (cf->readers);
232 if (cf->cases != NULL)
234 size_t idx, block_cnt;
236 case_bytes -= cf->case_cnt * cf->case_acct_size;
237 for (idx = 0; idx < cf->case_cnt; idx++)
239 size_t block_idx = idx / CASES_PER_BLOCK;
240 size_t case_idx = idx % CASES_PER_BLOCK;
241 struct ccase *c = &cf->cases[block_idx][case_idx];
245 block_cnt = DIV_RND_UP (cf->case_cnt, CASES_PER_BLOCK);
246 for (idx = 0; idx < block_cnt; idx++)
247 free (cf->cases[idx]);
255 if (cf->file_name != NULL && remove (cf->file_name) == -1)
256 io_error (cf, _("%s: Removing temporary file: %s."),
257 cf->file_name, strerror (errno));
258 free (cf->file_name);
266 /* Returns true if an I/O error has occurred in casefile CF. */
268 casefile_error (const struct casefile *cf)
273 /* Returns nonzero only if casefile CF is stored in memory (instead of on
276 casefile_in_core (const struct casefile *cf)
280 return cf->storage == MEMORY;
283 /* Puts a casefile to "sleep", that is, minimizes the resources
284 needed for it by closing its file descriptor and freeing its
285 buffer. This is useful if we need so many casefiles that we
286 might not have enough memory and file descriptors to go
289 For simplicity, this implementation always converts the
290 casefile to reader mode. If this turns out to be a problem,
291 with a little extra work we could also support sleeping
294 Returns true if successful, false if an I/O error occurred. */
296 casefile_sleep (const struct casefile *cf_)
298 struct casefile *cf = (struct casefile *) cf_;
301 casefile_mode_reader (cf);
302 casefile_to_disk (cf);
310 if (cf->buffer != NULL)
319 /* Returns the number of `union value's in a case for CF. */
321 casefile_get_value_cnt (const struct casefile *cf)
325 return cf->value_cnt;
328 /* Returns the number of cases in casefile CF. */
330 casefile_get_case_cnt (const struct casefile *cf)
337 /* Appends a copy of case C to casefile CF. Not valid after any
338 reader for CF has been created.
339 Returns true if successful, false if an I/O error occurred. */
341 casefile_append (struct casefile *cf, const struct ccase *c)
345 assert (cf->mode == WRITE);
347 /* Try memory first. */
348 if (cf->storage == MEMORY)
350 if (case_bytes < get_workspace ())
352 size_t block_idx = cf->case_cnt / CASES_PER_BLOCK;
353 size_t case_idx = cf->case_cnt % CASES_PER_BLOCK;
354 struct ccase new_case;
356 case_bytes += cf->case_acct_size;
357 case_clone (&new_case, c);
360 if ((block_idx & (block_idx - 1)) == 0)
362 size_t block_cap = block_idx == 0 ? 1 : block_idx * 2;
363 cf->cases = xnrealloc (cf->cases,
364 block_cap, sizeof *cf->cases);
367 cf->cases[block_idx] = xnmalloc (CASES_PER_BLOCK,
371 case_move (&cf->cases[block_idx][case_idx], &new_case);
375 casefile_to_disk (cf);
376 assert (cf->storage == DISK);
377 write_case_to_disk (cf, c);
381 write_case_to_disk (cf, c);
387 /* Appends case C to casefile CF, which takes over ownership of
388 C. Not valid after any reader for CF has been created.
389 Returns true if successful, false if an I/O error occurred. */
391 casefile_append_xfer (struct casefile *cf, struct ccase *c)
393 casefile_append (cf, c);
398 /* Writes case C into casefile CF's disk buffer, then flushes the buffer to
399 disk if it no longer has room for another case.
400 Returns no value; an I/O error is recorded in CF (see casefile_error()). */
402 write_case_to_disk (struct casefile *cf, const struct ccase *c)
407 case_to_values (c, cf->buffer + cf->buffer_used, cf->value_cnt);
408 cf->buffer_used += cf->value_cnt;
409 if (cf->buffer_used + cf->value_cnt > cf->buffer_size)
413 /* If any bytes in CF's output buffer are used, flush them to
416 flush_buffer (struct casefile *cf)
418 if (cf->ok && cf->buffer_used > 0)
420 if (!full_write (cf->fd, cf->buffer,
421 cf->buffer_size * sizeof *cf->buffer))
422 io_error (cf, _("Error writing temporary file: %s."),
430 /* If CF is currently stored in memory, writes it to disk. Readers, if any,
431 retain their current positions.
432 Returns true if successful, false if an I/O error occurred. */
434 casefile_to_disk (const struct casefile *cf_)
436 struct casefile *cf = (struct casefile *) cf_;
437 struct casereader *reader;
441 if (cf->storage == MEMORY)
443 size_t idx, block_cnt;
445 assert (cf->file_name == NULL);
446 assert (cf->fd == -1);
447 assert (cf->buffer_used == 0);
449 if (!make_temp_file (&cf->fd, &cf->file_name))
456 cf->buffer = xnmalloc (cf->buffer_size, sizeof *cf->buffer);
457 memset (cf->buffer, 0, cf->buffer_size * sizeof *cf->buffer);
459 case_bytes -= cf->case_cnt * cf->case_acct_size;
460 for (idx = 0; idx < cf->case_cnt; idx++)
462 size_t block_idx = idx / CASES_PER_BLOCK;
463 size_t case_idx = idx % CASES_PER_BLOCK;
464 struct ccase *c = &cf->cases[block_idx][case_idx];
465 write_case_to_disk (cf, c);
469 block_cnt = DIV_RND_UP (cf->case_cnt, CASES_PER_BLOCK);
470 for (idx = 0; idx < block_cnt; idx++)
471 free (cf->cases[idx]);
476 if (cf->mode == READ)
479 for (reader = cf->readers; reader != NULL; reader = reader->next)
480 reader_open_file (reader);
485 /* Changes CF to reader mode, ensuring that no more cases may be
486 added. Creating a casereader for CF has the same effect. */
488 casefile_mode_reader (struct casefile *cf)
494 /* Creates and returns a casereader for CF. A casereader can be used to
495 sequentially read the cases in a casefile. */
497 casefile_get_reader (const struct casefile *cf_)
499 struct casefile *cf = (struct casefile *) cf_;
500 struct casereader *reader;
503 assert (!cf->being_destroyed);
505 /* Flush the buffer to disk if it's not empty. */
506 if (cf->mode == WRITE && cf->storage == DISK)
511 reader = xmalloc (sizeof *reader);
512 reader->next = cf->readers;
513 if (cf->readers != NULL)
514 reader->next->prev = reader;
515 cf->readers = reader;
518 reader->case_idx = 0;
519 reader->destructive = 0;
521 reader->buffer = NULL;
522 reader->buffer_pos = 0;
523 case_nullify (&reader->c);
525 if (reader->cf->storage == DISK)
526 reader_open_file (reader);
531 /* Creates and returns a destructive casereader for CF. Like a
532 normal casereader, a destructive casereader sequentially reads
533 the cases in a casefile. Unlike a normal casereader, a
534 destructive reader cannot operate concurrently with any other
535 reader. (This restriction could be relaxed in a few ways, but
536 it is so far unnecessary for other code.) */
538 casefile_get_destructive_reader (struct casefile *cf)
540 struct casereader *reader;
542 assert (cf->readers == NULL);
543 reader = casefile_get_reader (cf);
544 reader->destructive = 1;
545 cf->being_destroyed = 1;
549 /* Opens a disk file for READER and seeks to the current position as indicated
550 by case_idx. Normally the current position is the beginning of the file,
551 but casefile_to_disk may cause the file to be opened at a different
554 reader_open_file (struct casereader *reader)
556 struct casefile *cf = reader->cf;
559 if (!cf->ok || reader->case_idx >= cf->case_cnt)
569 reader->fd = safe_open (cf->file_name, O_RDONLY);
571 io_error (cf, _("%s: Opening temporary file: %s."),
572 cf->file_name, strerror (errno));
575 if (cf->buffer != NULL)
577 reader->buffer = cf->buffer;
582 reader->buffer = xnmalloc (cf->buffer_size, sizeof *cf->buffer);
583 memset (reader->buffer, 0, cf->buffer_size * sizeof *cf->buffer);
586 if (cf->value_cnt != 0)
588 size_t buffer_case_cnt = cf->buffer_size / cf->value_cnt;
589 file_ofs = ((off_t) reader->case_idx / buffer_case_cnt
590 * cf->buffer_size * sizeof *cf->buffer);
591 reader->buffer_pos = (reader->case_idx % buffer_case_cnt
596 if (lseek (reader->fd, file_ofs, SEEK_SET) != file_ofs)
597 io_error (cf, _("%s: Seeking temporary file: %s."),
598 cf->file_name, strerror (errno));
600 if (cf->case_cnt > 0 && cf->value_cnt > 0)
601 fill_buffer (reader);
603 case_create (&reader->c, cf->value_cnt);
606 /* Fills READER's buffer by reading a block from disk. Returns true if successful, false if an I/O error has occurred on READER's casefile. */
608 fill_buffer (struct casereader *reader)
612 int bytes = full_read (reader->fd, reader->buffer,
613 reader->cf->buffer_size * sizeof *reader->buffer);
615 io_error (reader->cf, _("%s: Reading temporary file: %s."),
616 reader->cf->file_name, strerror (errno));
617 else if (bytes != reader->cf->buffer_size * sizeof *reader->buffer)
618 io_error (reader->cf, _("%s: Temporary file ended unexpectedly."),
619 reader->cf->file_name);
621 return reader->cf->ok;
624 /* Returns the casefile that READER reads. */
625 const struct casefile *
626 casereader_get_casefile (const struct casereader *reader)
628 assert (reader != NULL);
633 /* Reads a copy of the next case from READER into C.
634 Caller is responsible for destroying C.
635 Returns true if successful, false at end of file or on I/O error.
637 casereader_read (struct casereader *reader, struct ccase *c)
639 assert (reader != NULL);
641 if (!reader->cf->ok || reader->case_idx >= reader->cf->case_cnt)
644 if (reader->cf->storage == MEMORY)
646 size_t block_idx = reader->case_idx / CASES_PER_BLOCK;
647 size_t case_idx = reader->case_idx % CASES_PER_BLOCK;
649 case_clone (c, &reader->cf->cases[block_idx][case_idx]);
655 if (reader->buffer_pos + reader->cf->value_cnt > reader->cf->buffer_size)
657 if (!fill_buffer (reader))
659 reader->buffer_pos = 0;
662 case_from_values (&reader->c, reader->buffer + reader->buffer_pos,
663 reader->cf->value_cnt);
664 reader->buffer_pos += reader->cf->value_cnt;
667 case_clone (c, &reader->c);
672 /* Reads the next case from READER into C and transfers ownership
673 to the caller. Caller is responsible for destroying C.
674 Returns true if successful, false at end of file or on I/O
677 casereader_read_xfer (struct casereader *reader, struct ccase *c)
679 assert (reader != NULL);
681 if (reader->destructive == 0
682 || reader->case_idx >= reader->cf->case_cnt
683 || reader->cf->storage == DISK)
684 return casereader_read (reader, c);
687 size_t block_idx = reader->case_idx / CASES_PER_BLOCK;
688 size_t case_idx = reader->case_idx % CASES_PER_BLOCK;
689 struct ccase *read_case = &reader->cf->cases[block_idx][case_idx];
691 case_move (c, read_case);
697 /* Destroys READER. */
699 casereader_destroy (struct casereader *reader)
701 assert (reader != NULL);
703 if (reader->next != NULL)
704 reader->next->prev = reader->prev;
705 if (reader->prev != NULL)
706 reader->prev->next = reader->next;
707 if (reader->cf->readers == reader)
708 reader->cf->readers = reader->next;
710 if (reader->cf->buffer == NULL)
711 reader->cf->buffer = reader->buffer;
713 free (reader->buffer);
715 if (reader->fd != -1)
717 if (reader->cf->fd == -1)
718 reader->cf->fd = reader->fd;
720 safe_close (reader->fd);
723 case_destroy (&reader->c);
728 /* Marks CF as having encountered an I/O error.
729 If this is the first error on CF, reports FORMAT to the user,
730 doing printf()-style substitutions. */
732 io_error (struct casefile *cf, const char *format, ...)
740 e.where.file_name = NULL;
741 e.where.line_number = -1;
744 va_start (args, format);
745 err_vmsg (&e, format, args);
751 /* Calls open(), passing FILE_NAME and FLAGS, repeating as necessary
752 to deal with interrupted calls. */
754 safe_open (const char *file_name, int flags)
760 fd = open (file_name, flags);
762 while (fd == -1 && errno == EINTR);
767 /* Calls close(), passing FD, repeating as necessary to deal with
768 interrupted calls. */
769 static int safe_close (int fd)
777 while (retval == -1 && errno == EINTR);
782 /* Registers our exit handler with atexit() if it has not already
785 register_atexit (void)
787 static int registered = 0;
791 atexit (exit_handler);
797 /* atexit() handler that closes and deletes our temporary
802 while (casefiles != NULL)
803 casefile_destroy (casefiles);