1 /* PSPP - computes sample statistics.
2 Copyright (C) 2004 Free Software Foundation, Inc.
3 Written by Ben Pfaff <blp@gnu.org>.
5 This program is free software; you can redistribute it and/or
6 modify it under the terms of the GNU General Public License as
7 published by the Free Software Foundation; either version 2 of the
8 License, or (at your option) any later version.
10 This program is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with this program; if not, write to the Free Software
17 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
29 #include <libpspp/alloc.h>
31 #include <libpspp/compiler.h>
32 #include <libpspp/message.h>
33 #include "full-read.h"
34 #include "full-write.h"
35 #include <libpspp/misc.h>
36 #include "make-file.h"
// NOTE(review): in this copy the license comment that opens the file never
// gets its closing line, so these #define lines are lexically still inside
// that comment (it only ends at the close of the overview comment below).
// Restore the missing closing line from upstream.
// _(MSGID): looks up the translation of MSGID via gettext().
41 #define _(msgid) gettext (msgid)
// IO_BUF_SIZE: number of `union value's that fit in 8192 bytes; casefile_create()
// uses it as the unit when sizing each casefile's disk buffer.
43 #define IO_BUF_SIZE (8192 / sizeof (union value))
45 /* A casefile represents a sequentially accessible stream of cases.
48 If workspace allows, a casefile is maintained in memory. If
49 workspace overflows, then the casefile is pushed to disk. In
50 either case the interface presented to callers is kept the same.
53 The life cycle of a casefile consists of up to three phases:
55 1. Writing. The casefile initially contains no cases. In
56 this phase, any number of cases may be appended to the
57 end of a casefile. (Cases are never inserted in the
58 middle or before the beginning of a casefile.)
60 Use casefile_append() or casefile_append_xfer() to
61 append a case to a casefile.
63 2. Reading. The casefile may be read sequentially,
64 starting from the beginning, by "casereaders". Any
65 number of casereaders may be created, at any time,
66 during the reading phase. Each casereader has an
67 independent position in the casefile.
69 Casereaders may only move forward. They cannot move
70 backward to arbitrary records or seek randomly.
71 Cloning casereaders is possible, but it is not yet implemented.
74 Use casefile_get_reader() to create a casereader for
75 use in phase 2. This also transitions from phase 1 to
76 phase 2. Calling casefile_mode_reader() makes the same
77 transition, without creating a casereader.
79 Use casereader_read() or casereader_read_xfer() to read
80 a case from a casereader. Use casereader_destroy() to
81 discard a casereader when it is no longer needed.
83 3. Destruction. This phase is optional. The casefile is
84 also read with casereaders in this phase, but the
85 ability to create new casereaders is curtailed.
87 In this phase, casereaders could still be cloned (once
88 we eventually implement cloning).
90 To transition from phase 1 or 2 to phase 3 and create a
91 casereader, call casefile_get_destructive_reader().
92 The same functions apply to the casereader obtained
93 this way as apply to casereaders obtained in phase 2.
95 After casefile_get_destructive_reader() is called, no
96 more casereaders may be created with
97 casefile_get_reader() or
98 casefile_get_destructive_reader(). (If cloning of
99 casereaders were implemented, it would still be
102 The purpose of the limitations applied to casereaders
103 in phase 3 is to allow in-memory casefiles to fully
104 transfer ownership of cases to the casereaders,
105 avoiding the need for extra copies of case data. For
106 relatively static data sets with many variables, I
107 suspect (without evidence) that this may be a big performance advantage.
110 When a casefile is no longer needed, it may be destroyed with
111 casefile_destroy(). This function will also destroy any
112 remaining casereaders. */
114 /* FIXME: should we implement compression? */
116 /* In-memory cases are arranged in an array of arrays. The top
117 level is variable size and the size of each bottom level array
118 is fixed at the number of cases defined here. */
/* Case number N lives in block N / CASES_PER_BLOCK, at slot
   N % CASES_PER_BLOCK within that block.  casefile_append() grows the
   top-level array by doubling it. */
119 #define CASES_PER_BLOCK 128
/* Members of struct casefile.
   NOTE(review): this copy lacks the structure's opening line, its
   closing brace and a few lines in between (see the gaps in the
   embedded line numbers); restore them from upstream. */
125 struct casefile *next, *prev; /* Next, prev in global list. */
126 size_t value_cnt; /* Case size in `union value's. */
127 size_t case_acct_size; /* Bytes charged to case_bytes per in-memory case. */
128 unsigned long case_cnt; /* Number of cases stored. */
129 enum { MEMORY, DISK } storage; /* Where cases are stored. */
130 enum { WRITE, READ } mode; /* Is writing or reading allowed? */
131 struct casereader *readers; /* Head of our doubly linked list of readers. */
132 bool being_destroyed; /* Does a destructive reader exist? */
133 bool ok; /* False after I/O error. */
135 /* Memory storage. */
136 struct ccase **cases; /* Index of blocks of CASES_PER_BLOCK cases each. */
/* Disk storage. */
139 int fd; /* File descriptor, -1 if none. */
140 char *file_name; /* File name. */
141 union value *buffer; /* I/O buffer, NULL if none. */
142 size_t buffer_used; /* Number of values used in buffer. */
143 size_t buffer_size; /* Buffer size in values. */
146 /* For reading out the cases in a casefile. */
/* NOTE(review): this copy lacks the structure's opening line, its
   closing brace and the lines between the in-memory and disk members
   (see the gaps in the embedded line numbers). */
149 struct casereader *next, *prev; /* Next, prev in casefile's list. */
150 struct casefile *cf; /* Our casefile. */
151 unsigned long case_idx; /* Number of the case the next read returns. */
152 bool destructive; /* Is this a destructive reader? */
155 int fd; /* File descriptor, -1 if none. */
156 union value *buffer; /* I/O buffer; may be the casefile's own buffer, lent to us. */
157 size_t buffer_pos; /* Offset, in values, of the next case within BUFFER. */
158 struct ccase c; /* Scratch case that disk reads are decoded into. */
161 /* Returns the case number of the current case of reader R. */
/* NOTE(review): this copy omits the return type and the body (the
   embedded line numbers jump from 163 to 168); presumably the body
   returns R's case_idx member -- confirm against upstream. */
163 casereader_cnum(const struct casereader *r)
168 /* Doubly linked list of all casefiles.  casefile_create() links each
   new casefile in front of the existing entries, casefile_destroy()
   unlinks it, and exit_handler() destroys whatever is still listed. */
169 static struct casefile *casefiles;
171 /* Number of bytes of case allocated in in-memory casefiles: the sum
   of case_acct_size over every case currently held in memory.
   casefile_append() keeps new cases in memory only while this total is
   below get_workspace(). */
172 static size_t case_bytes;
/* Forward declarations of file-local helpers. */
174 static void register_atexit (void);
175 static void exit_handler (void);
177 static void reader_open_file (struct casereader *reader);
178 static void write_case_to_disk (struct casefile *cf, const struct ccase *c);
179 static void flush_buffer (struct casefile *cf);
180 static bool fill_buffer (struct casereader *reader);
/* PRINTF_FORMAT (from libpspp/compiler.h, not shown) presumably lets
   the compiler check FORMAT, argument 2, against the variadic
   arguments that start at argument 3. */
182 static void io_error (struct casefile *, const char *, ...)
183 PRINTF_FORMAT (2, 3);
184 static int safe_open (const char *file_name, int flags);
185 static int safe_close (int fd);
187 /* Creates and returns a casefile to store cases of VALUE_CNT
188 `union value's each.  The new casefile starts out stored in
   memory and is linked in front of the existing entries of the
   global casefile list. */
/* NOTE(review): this copy omits the return-type line, the braces, the
   return statement and several member initializations (see the gaps
   in the embedded line numbers); restore them before editing. */
190 casefile_create (size_t value_cnt)
192 struct casefile *cf = xmalloc (sizeof *cf);
193 cf->next = casefiles;
/* The statement guarded here (presumably the back-link update of the
   old list head) is not present in this copy. */
195 if (cf->next != NULL)
198 cf->value_cnt = value_cnt;
/* Each in-memory case is charged VALUE_CNT + 4 values against the
   workspace; the extra 4 presumably approximate per-case overhead --
   confirm. */
199 cf->case_acct_size = (cf->value_cnt + 4) * sizeof *cf->buffer;
201 cf->storage = MEMORY;
204 cf->being_destroyed = 0;
208 cf->file_name = NULL;
/* Size the disk buffer.  ROUND_UP (libpspp/misc.h, not shown)
   presumably rounds VALUE_CNT up to a multiple of IO_BUF_SIZE.  A
   buffer only ever holds whole cases (write_case_to_disk() flushes
   when another case would not fit), so BUFFER_SIZE % VALUE_CNT values
   of every buffer go unused; when that waste would exceed 64 values,
   a buffer of exactly one case is used instead. */
210 cf->buffer_size = ROUND_UP (cf->value_cnt, IO_BUF_SIZE);
211 if (cf->value_cnt > 0 && cf->buffer_size % cf->value_cnt > 64)
212 cf->buffer_size = cf->value_cnt;
218 /* Destroys casefile CF: unlinks it from the global casefile list,
   destroys its remaining readers, releases its in-memory cases and
   deletes its temporary file, if any. */
/* NOTE(review): this copy omits the return-type line, the braces and
   several statements (see the gaps in the embedded line numbers),
   including the release of the descriptor, buffer and CF itself. */
220 casefile_destroy (struct casefile *cf)
/* Unlink CF from the global list.  NOTE(review): the assignment to
   CASEFILES below is only correct when CF is the list head; the `else'
   that presumably guards it is not present in this copy. */
224 if (cf->next != NULL)
225 cf->next->prev = cf->prev;
226 if (cf->prev != NULL)
227 cf->prev->next = cf->next;
229 casefiles = cf->next;
/* casereader_destroy() unlinks the reader, so this loop ends once the
   list is empty. */
231 while (cf->readers != NULL)
232 casereader_destroy (cf->readers);
234 if (cf->cases != NULL)
236 size_t idx, block_cnt;
/* Return this casefile's in-memory cases to the workspace budget.
   The per-case release inside the loop below is not present in this
   copy. */
238 case_bytes -= cf->case_cnt * cf->case_acct_size;
239 for (idx = 0; idx < cf->case_cnt; idx++)
241 size_t block_idx = idx / CASES_PER_BLOCK;
242 size_t case_idx = idx % CASES_PER_BLOCK;
243 struct ccase *c = &cf->cases[block_idx][case_idx];
247 block_cnt = DIV_RND_UP (cf->case_cnt, CASES_PER_BLOCK);
248 for (idx = 0; idx < block_cnt; idx++)
249 free (cf->cases[idx]);
/* Delete the temporary file, if any; a failure is reported through
   io_error(). */
257 if (cf->file_name != NULL && remove (cf->file_name) == -1)
258 io_error (cf, _("%s: Removing temporary file: %s."),
259 cf->file_name, strerror (errno));
260 free (cf->file_name);
268 /* Returns true if an I/O error has occurred in casefile CF. */
/* NOTE(review): the return type and body are not present in this
   copy; presumably the body returns the negation of CF's ok member,
   which is documented as false after an I/O error -- confirm. */
270 casefile_error (const struct casefile *cf)
275 /* Returns true only if casefile CF is stored in memory (instead of on
276 disk), false otherwise. */
/* NOTE(review): the return-type line and braces are not present in
   this copy. */
278 casefile_in_core (const struct casefile *cf)
282 return cf->storage == MEMORY;
285 /* Puts a casefile to "sleep", that is, minimizes the resources
286 needed for it by closing its file descriptor and freeing its
287 buffer. This is useful if we need so many casefiles that we
288 might not have enough memory and file descriptors to go around.
291 For simplicity, this implementation always converts the
292 casefile to reader mode. If this turns out to be a problem,
293 with a little extra work we could also support sleeping casefiles in writer mode.
296 Returns true if successful, false if an I/O error occurred. */
/* NOTE(review): CF_ is const-qualified, but the casefile is switched to
   reader mode, moved to disk and stripped of resources below, so the
   cast deliberately discards const.  This copy omits the return-type
   line, the braces, the closing of the descriptor, the statement
   guarded by the buffer test and the return statement. */
298 casefile_sleep (const struct casefile *cf_)
300 struct casefile *cf = (struct casefile *) cf_;
303 casefile_mode_reader (cf);
304 casefile_to_disk (cf);
312 if (cf->buffer != NULL)
321 /* Returns the number of `union value's in a case for CF. */
/* NOTE(review): the return-type line and braces are not present in
   this copy. */
323 casefile_get_value_cnt (const struct casefile *cf)
327 return cf->value_cnt;
330 /* Returns the number of cases in casefile CF. */
/* NOTE(review): the return type and body are not present in this
   copy; presumably the body returns CF's case_cnt member -- confirm. */
332 casefile_get_case_cnt (const struct casefile *cf)
339 /* Appends a copy of case C to casefile CF. Not valid after any
340 reader for CF has been created.
341 Returns true if successful, false if an I/O error occurred. */
/* NOTE(review): this copy omits the return-type line, the braces, the
   `else' keywords between the branches, the case-count increment and
   the return statement (see the gaps in the embedded line numbers). */
343 casefile_append (struct casefile *cf, const struct ccase *c)
347 assert (cf->mode == WRITE);
349 /* Try memory first. */
350 if (cf->storage == MEMORY)
/* Cases stay in memory only while the total accounted size of all
   in-memory cases is below the workspace limit. */
352 if (case_bytes < get_workspace ())
354 size_t block_idx = cf->case_cnt / CASES_PER_BLOCK;
355 size_t case_idx = cf->case_cnt % CASES_PER_BLOCK;
356 struct ccase new_case;
358 case_bytes += cf->case_acct_size;
359 case_clone (&new_case, c);
/* When BLOCK_IDX is 0 or a power of two, the block index is full, so
   it is grown to twice its size (1 entry for the first block).
   NOTE(review): this growth and the block allocation below must run
   only when a new block is started; the guard that ensures this
   (presumably case_idx == 0) is not present in this copy. */
362 if ((block_idx & (block_idx - 1)) == 0)
364 size_t block_cap = block_idx == 0 ? 1 : block_idx * 2;
365 cf->cases = xnrealloc (cf->cases,
366 block_cap, sizeof *cf->cases);
/* The remaining argument of this call is not present in this copy. */
369 cf->cases[block_idx] = xnmalloc (CASES_PER_BLOCK,
373 case_move (&cf->cases[block_idx][case_idx], &new_case);
/* Workspace exhausted: move all of CF's cases to disk, then append C
   there. */
377 casefile_to_disk (cf);
378 assert (cf->storage == DISK);
379 write_case_to_disk (cf, c);
/* Presumably the branch for a casefile that is already on disk. */
383 write_case_to_disk (cf, c);
389 /* Appends case C to casefile CF, which takes over ownership of
390 C. Not valid after any reader for CF has been created.
391 Returns true if successful, false if an I/O error occurred. */
/* casefile_append() stores a copy of C: it clones C into memory or
   serializes it to disk.  The transfer of ownership therefore
   presumably consists of releasing C after the call.
   NOTE(review): that release, the return statement, the return-type
   line and the braces are not present in this copy. */
393 casefile_append_xfer (struct casefile *cf, struct ccase *c)
395 casefile_append (cf, c);
400 /* Writes case C into casefile CF's disk buffer.  Once C has been added,
401 the buffer is flushed to disk if it has no room left for another case.
402 NOTE(review): the forward declaration of this function returns void, so
   it cannot report success or failure; I/O errors are recorded in CF
   instead (flush_buffer() reports write failures through io_error()). */
/* NOTE(review): the return-type line, the braces and the lines between
   the signature and the copy are not present in this copy. */
404 write_case_to_disk (struct casefile *cf, const struct ccase *c)
409 case_to_values (c, cf->buffer + cf->buffer_used, cf->value_cnt);
410 cf->buffer_used += cf->value_cnt;
/* Flush once another case would not fit.  The flush call this condition
   guards is not present in this copy (presumably flush_buffer (cf)). */
411 if (cf->buffer_used + cf->value_cnt > cf->buffer_size)
// NOTE(review): in this copy the comment starting on the next line has lost
// its closing line, so it runs on until the end of the comment in front of
// casefile_to_disk() and swallows this whole definition. Restore the
// missing lines (comment end, return type, braces) from upstream.
415 /* If any bytes in CF's output buffer are used, flush them to
// flush_buffer(): if CF's buffer holds any values and no I/O error has
// occurred yet, writes the buffer to CF's temporary file, reporting a
// failure through io_error(). The reset of BUFFER_USED after the write and
// the remaining io_error() argument are not present in this copy.
418 flush_buffer (struct casefile *cf)
420 if (cf->ok && cf->buffer_used > 0)
// A full BUFFER_SIZE block is written even when only BUFFER_USED values are
// filled, because reader_open_file() locates a case by whole blocks and
// fill_buffer() expects every read to return a full block.
// NOTE(review): if full_write() follows gnulib's interface (it returns the
// number of bytes written, less than the request on error), this test only
// catches a write of zero bytes, and a short write goes unreported. Compare
// the result against the requested size instead -- confirm.
422 if (!full_write (cf->fd, cf->buffer,
423 cf->buffer_size * sizeof *cf->buffer))
424 io_error (cf, _("Error writing temporary file: %s."),
432 /* If CF is currently stored in memory, writes it to disk. Readers, if any,
433 retain their current positions.
434 Returns true if successful, false if an I/O error occurred. */
/* NOTE(review): CF_ is const-qualified, but this function changes CF's
   storage, so the cast below deliberately discards const.  This copy
   omits the return-type line, the braces, the handling of a failed
   make_temp_file() call, the switch of STORAGE to DISK and the return
   statement (see the gaps in the embedded line numbers). */
436 casefile_to_disk (const struct casefile *cf_)
438 struct casefile *cf = (struct casefile *) cf_;
439 struct casereader *reader;
443 if (cf->storage == MEMORY)
445 size_t idx, block_cnt;
/* An in-memory casefile has no temporary file, descriptor or buffered
   data yet. */
447 assert (cf->file_name == NULL);
448 assert (cf->fd == -1);
449 assert (cf->buffer_used == 0);
451 if (!make_temp_file (&cf->fd, &cf->file_name))
/* Zero the new buffer, presumably so that the unused tail of a block
   written by flush_buffer() is defined data rather than uninitialized
   memory. */
458 cf->buffer = xnmalloc (cf->buffer_size, sizeof *cf->buffer);
459 memset (cf->buffer, 0, cf->buffer_size * sizeof *cf->buffer);
/* Serialize every in-memory case to disk and return its space to the
   workspace budget.  The release of each case after it is written is
   presumably on lines not present in this copy. */
461 case_bytes -= cf->case_cnt * cf->case_acct_size;
462 for (idx = 0; idx < cf->case_cnt; idx++)
464 size_t block_idx = idx / CASES_PER_BLOCK;
465 size_t case_idx = idx % CASES_PER_BLOCK;
466 struct ccase *c = &cf->cases[block_idx][case_idx];
467 write_case_to_disk (cf, c);
471 block_cnt = DIV_RND_UP (cf->case_cnt, CASES_PER_BLOCK);
472 for (idx = 0; idx < block_cnt; idx++)
473 free (cf->cases[idx]);
/* If readers already exist, each one reopens the file at its current
   position; any preparatory flush is on lines not present in this
   copy. */
478 if (cf->mode == READ)
481 for (reader = cf->readers; reader != NULL; reader = reader->next)
482 reader_open_file (reader);
487 /* Changes CF to reader mode, ensuring that no more cases may be
488 added. Creating a casereader for CF has the same effect. */
/* NOTE(review): the return type and body are not present in this
   copy. */
490 casefile_mode_reader (struct casefile *cf)
496 /* Creates and returns a casereader for CF. A casereader can be used to
497 sequentially read the cases in a casefile. */
/* NOTE(review): CF_ is const-qualified, but this function modifies CF
   (its reader list, and presumably its mode), so the cast discards
   const deliberately.  This copy omits the return-type line, the
   braces, the body of the flush branch, the assignments of the
   reader's CF, PREV and FD members (reader->cf is read below) and the
   return statement. */
499 casefile_get_reader (const struct casefile *cf_)
501 struct casefile *cf = (struct casefile *) cf_;
502 struct casereader *reader;
/* No new readers once a destructive reader exists. */
505 assert (!cf->being_destroyed);
507 /* Flush the buffer to disk if it's not empty. */
508 if (cf->mode == WRITE && cf->storage == DISK)
/* Push the new reader onto the front of CF's reader list. */
513 reader = xmalloc (sizeof *reader);
514 reader->next = cf->readers;
515 if (cf->readers != NULL)
516 reader->next->prev = reader;
517 cf->readers = reader;
520 reader->case_idx = 0;
521 reader->destructive = 0;
523 reader->buffer = NULL;
524 reader->buffer_pos = 0;
525 case_nullify (&reader->c);
/* Disk-backed casefiles need a file descriptor and buffer per reader. */
527 if (reader->cf->storage == DISK)
528 reader_open_file (reader);
533 /* Creates and returns a destructive casereader for CF. Like a
534 normal casereader, a destructive casereader sequentially reads
535 the cases in a casefile. Unlike a normal casereader, a
536 destructive reader cannot operate concurrently with any other
537 reader. (This restriction could be relaxed in a few ways, but
538 it is so far unnecessary for other code.) */
/* Setting BEING_DESTROYED makes casefile_get_reader() refuse further
   readers, and DESTRUCTIVE lets casereader_read_xfer() move in-memory
   cases out of the casefile instead of copying them.
   NOTE(review): the return-type line, the braces and the return
   statement are not present in this copy. */
540 casefile_get_destructive_reader (struct casefile *cf)
542 struct casereader *reader;
544 assert (cf->readers == NULL);
545 reader = casefile_get_reader (cf);
546 reader->destructive = 1;
547 cf->being_destroyed = 1;
// NOTE(review): in this copy the comment starting on the next line has lost
// its closing line, so it runs on until the end of the comment in front of
// fill_buffer() and swallows this whole definition. Restore the missing
// lines from upstream.
551 /* Opens a disk file for READER and seeks to the current position as indicated
552 by case_idx. Normally the current position is the beginning of the file,
553 but casefile_to_disk may cause the file to be opened at a different
// Nothing to do after an I/O error, or when READER is already past the last
// case.
556 reader_open_file (struct casereader *reader)
558 struct casefile *cf = reader->cf;
561 if (!cf->ok || reader->case_idx >= cf->case_cnt)
// The failure test guarding this io_error() call, and the lines before the
// open, are not present in this copy.
571 reader->fd = safe_open (cf->file_name, O_RDONLY);
573 io_error (cf, _("%s: Opening temporary file: %s."),
574 cf->file_name, strerror (errno));
// Borrow CF's buffer when it has one (casereader_destroy() hands a buffer
// back to the casefile); otherwise allocate a zeroed buffer of our own. The
// lines that presumably clear CF's pointer after borrowing are not present.
577 if (cf->buffer != NULL)
579 reader->buffer = cf->buffer;
584 reader->buffer = xnmalloc (cf->buffer_size, sizeof *cf->buffer);
585 memset (reader->buffer, 0, cf->buffer_size * sizeof *cf->buffer);
// Each block on disk holds BUFFER_CASE_CNT whole cases: write_case_to_disk()
// flushes when another case would not fit, and flush_buffer() always writes
// a full block. So case CASE_IDX starts in block CASE_IDX / BUFFER_CASE_CNT,
// at value offset (CASE_IDX % BUFFER_CASE_CNT) times VALUE_CNT within it
// (the rest of that expression, and the declaration of FILE_OFS, are not
// present in this copy).
588 if (cf->value_cnt != 0)
590 size_t buffer_case_cnt = cf->buffer_size / cf->value_cnt;
591 file_ofs = ((off_t) reader->case_idx / buffer_case_cnt
592 * cf->buffer_size * sizeof *cf->buffer);
593 reader->buffer_pos = (reader->case_idx % buffer_case_cnt
598 if (lseek (reader->fd, file_ofs, SEEK_SET) != file_ofs)
599 io_error (cf, _("%s: Seeking temporary file: %s."),
600 cf->file_name, strerror (errno));
// Read the first block only if there is data to read.
602 if (cf->case_cnt > 0 && cf->value_cnt > 0)
603 fill_buffer (reader);
// Scratch case that casereader_read() decodes disk data into.
605 case_create (&reader->c, cf->value_cnt);
608 /* Fills READER's buffer by reading a block from disk. */
/* Returns the ok flag of READER's casefile: false if an I/O error has
   occurred on it, including one reported here (io_error() marks the
   casefile as failed), true otherwise.
   NOTE(review): BYTES is an int, and the condition guarding the first
   io_error() call is not present in this copy.  If full_read() follows
   gnulib's interface -- it returns a size_t count that falls short of
   the request on error (errno set) or at end of file (errno zero) --
   then a negative-value test can never succeed.  Read errors would
   then be reported by the second branch as an unexpected end of file,
   and the comparison on that branch mixes signed and unsigned
   operands.  Consider a size_t BYTES and using errno to tell the two
   cases apart.  The return-type line, the braces and the test of the
   ok flag before reading are also not present in this copy. */
610 fill_buffer (struct casereader *reader)
614 int bytes = full_read (reader->fd, reader->buffer,
615 reader->cf->buffer_size * sizeof *reader->buffer);
617 io_error (reader->cf, _("%s: Reading temporary file: %s."),
618 reader->cf->file_name, strerror (errno));
619 else if (bytes != reader->cf->buffer_size * sizeof *reader->buffer)
620 io_error (reader->cf, _("%s: Temporary file ended unexpectedly."),
621 reader->cf->file_name);
623 return reader->cf->ok;
626 /* Returns the casefile that READER reads. */
/* NOTE(review): the braces and the return statement are not present in
   this copy. */
627 const struct casefile *
628 casereader_get_casefile (const struct casereader *reader)
630 assert (reader != NULL);
635 /* Reads a copy of the next case from READER into C.
636 Caller is responsible for destroying C.
637 Returns true if successful, false at end of file or after an I/O error. */
/* NOTE(review): this copy omits the return-type line, the braces, the
   `else' separating the memory and disk paths, the advance of CASE_IDX
   and the return statements. */
639 casereader_read (struct casereader *reader, struct ccase *c)
641 assert (reader != NULL);
643 if (!reader->cf->ok || reader->case_idx >= reader->cf->case_cnt)
/* Memory storage: copy the stored case straight out of its block. */
646 if (reader->cf->storage == MEMORY)
648 size_t block_idx = reader->case_idx / CASES_PER_BLOCK;
649 size_t case_idx = reader->case_idx % CASES_PER_BLOCK;
651 case_clone (c, &reader->cf->cases[block_idx][case_idx]);
/* Disk storage: refill the buffer when the next case would run past its
   end, which mirrors the rule write_case_to_disk() uses when filling
   blocks.  Then decode the case into the reader's scratch case and copy
   it out to C. */
657 if (reader->buffer_pos + reader->cf->value_cnt > reader->cf->buffer_size)
659 if (!fill_buffer (reader))
661 reader->buffer_pos = 0;
664 case_from_values (&reader->c, reader->buffer + reader->buffer_pos,
665 reader->cf->value_cnt);
666 reader->buffer_pos += reader->cf->value_cnt;
669 case_clone (c, &reader->c);
// NOTE(review): in this copy the comment starting on the next line has lost
// its closing line, so it runs on until the comment in front of
// casereader_destroy() and swallows this whole definition. Restore the
// missing lines from upstream.
674 /* Reads the next case from READER into C and transfers ownership
675 to the caller. Caller is responsible for destroying C.
676 Returns true if successful, false at end of file or on I/O
// Only a destructive reader over an in-memory casefile can hand over the
// stored case itself; in every other situation, fall back to a copying read.
679 casereader_read_xfer (struct casereader *reader, struct ccase *c)
681 assert (reader != NULL);
683 if (reader->destructive == 0
684 || reader->case_idx >= reader->cf->case_cnt
685 || reader->cf->storage == DISK)
686 return casereader_read (reader, c);
// Moving rather than copying is safe only because a destructive reader is
// its casefile's sole reader: casefile_get_destructive_reader() asserts that
// no readers exist, and casefile_get_reader() refuses to create more once
// BEING_DESTROYED is set. The advance of CASE_IDX and the return statement
// are not present in this copy.
689 size_t block_idx = reader->case_idx / CASES_PER_BLOCK;
690 size_t case_idx = reader->case_idx % CASES_PER_BLOCK;
691 struct ccase *read_case = &reader->cf->cases[block_idx][case_idx];
693 case_move (c, read_case);
699 /* Destroys READER. */
/* Unlinks READER from its casefile's reader list and releases its
   resources.  If the casefile has no I/O buffer or descriptor of its
   own, READER's are handed back to it; reader_open_file() reuses a
   casefile's buffer for the next reader it opens.
   NOTE(review): as shown, the buffer just handed to the casefile is
   then freed, and the descriptor just handed over is then closed.  The
   `else' lines that presumably separate these statements, the
   return-type line, the braces and the final release of READER itself
   are not present in this copy. */
701 casereader_destroy (struct casereader *reader)
703 assert (reader != NULL);
705 if (reader->next != NULL)
706 reader->next->prev = reader->prev;
707 if (reader->prev != NULL)
708 reader->prev->next = reader->next;
709 if (reader->cf->readers == reader)
710 reader->cf->readers = reader->next;
712 if (reader->cf->buffer == NULL)
713 reader->cf->buffer = reader->buffer;
715 free (reader->buffer);
717 if (reader->fd != -1)
719 if (reader->cf->fd == -1)
720 reader->cf->fd = reader->fd;
722 safe_close (reader->fd);
725 case_destroy (&reader->c);
730 /* Marks CF as having encountered an I/O error.
731 If this is the first error on CF, reports FORMAT to the user,
732 doing printf()-style substitutions. */
/* The message is a general error with no associated source location.
   NOTE(review): the declarations of M and ARGS, the first-error test
   described above, the update of CF's ok flag, the va_end() that must
   pair with this va_start(), and the call that emits M are not present
   in this copy -- confirm that va_end() is reached on every path. */
734 io_error (struct casefile *cf, const char *format, ...)
741 m.category = MSG_GENERAL;
742 m.severity = MSG_ERROR;
743 m.where.file_name = NULL;
744 m.where.line_number = -1;
745 va_start (args, format);
746 m.text = xvasprintf (format, args);
754 /* Calls open(), passing FILE_NAME and FLAGS, repeating as necessary
755 to deal with interrupted calls. */
/* NOTE(review): the return-type line, the declaration of FD, the head
   of the do-while loop and the return statement are not present in
   this copy. */
757 safe_open (const char *file_name, int flags)
763 fd = open (file_name, flags);
765 while (fd == -1 && errno == EINTR);
770 /* Calls close(), passing FD, repeating as necessary to deal with
771 interrupted calls. */
/* NOTE(review): retrying close() after EINTR is not portable.  POSIX
   leaves the descriptor's state unspecified in that case, and on Linux
   the descriptor has already been released, so a retry can close a
   descriptor that was reused in the meantime.  Consider calling close()
   once.  The loop body and the return statement are not present in
   this copy. */
772 static int safe_close (int fd)
780 while (retval == -1 && errno == EINTR);
// NOTE(review): in this copy the comment starting on the next line has lost
// its closing line, so everything from here to the end of the file is
// lexically inside it. Restore the missing lines from upstream.
785 /* Registers our exit handler with atexit() if it has not already
// Presumably the static REGISTERED flag records that registration has
// happened; the test and update of the flag are not present in this copy.
788 register_atexit (void)
790 static bool registered = false;
794 atexit (exit_handler);
798 /* atexit() handler that closes and deletes our temporary
// exit_handler(): destroys every casefile still on the global list.
// casefile_destroy() unlinks each casefile from the list and removes its
// temporary file, so this loop ends when the list is empty. The rest of the
// comment, the signature and the braces are not present in this copy.
803 while (casefiles != NULL)
804 casefile_destroy (casefiles);