1 /* PSPP - computes sample statistics.
2 Copyright (C) 2004 Free Software Foundation, Inc.
3 Written by Ben Pfaff <blp@gnu.org>.
5 This program is free software; you can redistribute it and/or
6 modify it under the terms of the GNU General Public License as
7 published by the Free Software Foundation; either version 2 of the
8 License, or (at your option) any later version.
10 This program is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with this program; if not, write to the Free Software
17 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
32 #include "full-read.h"
33 #include "full-write.h"
35 #include "make-file.h"
40 #define _(msgid) gettext (msgid)
42 #define IO_BUF_SIZE (8192 / sizeof (union value))
44 /* A casefile represents a sequentially accessible stream of
47 If workspace allows, a casefile is maintained in memory. If
48 workspace overflows, then the casefile is pushed to disk. In
49 either case the interface presented to callers is kept the
52 The life cycle of a casefile consists of up to three phases:
54 1. Writing. The casefile initially contains no cases. In
55 this phase, any number of cases may be appended to the
56 end of a casefile. (Cases are never inserted in the
57 middle or before the beginning of a casefile.)
59 Use casefile_append() or casefile_append_xfer() to
60 append a case to a casefile.
62 2. Reading. The casefile may be read sequentially,
63 starting from the beginning, by "casereaders". Any
64 number of casereaders may be created, at any time,
65 during the reading phase. Each casereader has an
66 independent position in the casefile.
68 Casereaders may only move forward. They cannot move
69 backward to arbitrary records or seek randomly.
70 Cloning casereaders is possible, but it is not yet
73 Use casefile_get_reader() to create a casereader for
74 use in phase 2. This also transitions from phase 1 to
75 phase 2. Calling casefile_mode_reader() makes the same
76 transition, without creating a casereader.
78 Use casereader_read() or casereader_read_xfer() to read
79 a case from a casereader. Use casereader_destroy() to
80 discard a casereader when it is no longer needed.
82 3. Destruction. This phase is optional. The casefile is
83 also read with casereaders in this phase, but the
84 ability to create new casereaders is curtailed.
86 In this phase, casereaders could still be cloned (once
87 we eventually implement cloning).
89 To transition from phase 1 or 2 to phase 3 and create a
90 casereader, call casefile_get_destructive_reader().
91 The same functions apply to the casereader obtained
92 this way as apply to casereaders obtained in phase 2.
94 After casefile_get_destructive_reader() is called, no
95 more casereaders may be created with
96 casefile_get_reader() or
97 casefile_get_destructive_reader(). (If cloning of
98 casereaders were implemented, it would still be
101 The purpose of the limitations applied to casereaders
102 in phase 3 is to allow in-memory casefiles to fully
103 transfer ownership of cases to the casereaders,
104 avoiding the need for extra copies of case data. For
105 relatively static data sets with many variables, I
106 suspect (without evidence) that this may be a big
109 When a casefile is no longer needed, it may be destroyed with
110 casefile_destroy(). This function will also destroy any
111 remaining casereaders. */
113 /* In-memory cases are arranged in an array of arrays. The top
114 level is variable size and the size of each bottom level array
115 is fixed at the number of cases defined here. */
116 #define CASES_PER_BLOCK 128
122 struct casefile *next, *prev; /* Next, prev in global list. */
123 size_t value_cnt; /* Case size in `union value's. */
124 size_t case_acct_size; /* Case size for accounting. */
125 unsigned long case_cnt; /* Number of cases stored. */
126 enum { MEMORY, DISK } storage; /* Where cases are stored. */
127 enum { WRITE, READ } mode; /* Is writing or reading allowed? */
128 struct casereader *readers; /* List of our readers. */
129 int being_destroyed; /* Does a destructive reader exist? */
130 bool ok; /* False after I/O error. */
132 /* Memory storage. */
133 struct ccase **cases; /* Pointer to array of cases. */
136 int fd; /* File descriptor, -1 if none. */
137 char *filename; /* Filename. */
138 union value *buffer; /* I/O buffer, NULL if none. */
139 size_t buffer_used; /* Number of values used in buffer. */
140 size_t buffer_size; /* Buffer size in values. */
143 /* For reading out the cases in a casefile. */
146 struct casereader *next, *prev; /* Next, prev in casefile's list. */
147 struct casefile *cf; /* Our casefile. */
148 unsigned long case_idx; /* Case number of current case. */
149 int destructive; /* Is this a destructive reader? */
152 int fd; /* File descriptor. */
153 union value *buffer; /* I/O buffer. */
154 size_t buffer_pos; /* Offset of buffer position. */
155 struct ccase c; /* Current case. */
158 /* Return the case number of the current case */
160 casereader_cnum(const struct casereader *r)
165 /* Doubly linked list of all casefiles. */
166 static struct casefile *casefiles;
168 /* Number of bytes of case allocated in in-memory casefiles. */
169 static size_t case_bytes;
171 static void register_atexit (void);
172 static void exit_handler (void);
174 static void reader_open_file (struct casereader *reader);
175 static void write_case_to_disk (struct casefile *cf, const struct ccase *c);
176 static void flush_buffer (struct casefile *cf);
177 static bool fill_buffer (struct casereader *reader);
179 static void io_error (struct casefile *, const char *, ...)
180 PRINTF_FORMAT (2, 3);
181 static int safe_open (const char *filename, int flags);
182 static int safe_close (int fd);
184 /* Creates and returns a casefile to store cases of VALUE_CNT
185 `union value's each. */
187 casefile_create (size_t value_cnt)
189 struct casefile *cf = xmalloc (sizeof *cf);
190 cf->next = casefiles;
192 if (cf->next != NULL)
195 cf->value_cnt = value_cnt;
196 cf->case_acct_size = (cf->value_cnt + 4) * sizeof *cf->buffer;
198 cf->storage = MEMORY;
201 cf->being_destroyed = 0;
207 cf->buffer_size = ROUND_UP (cf->value_cnt, IO_BUF_SIZE);
208 if (cf->value_cnt > 0 && cf->buffer_size % cf->value_cnt > 64)
209 cf->buffer_size = cf->value_cnt;
215 /* Destroys casefile CF. */
217 casefile_destroy (struct casefile *cf)
221 if (cf->next != NULL)
222 cf->next->prev = cf->prev;
223 if (cf->prev != NULL)
224 cf->prev->next = cf->next;
226 casefiles = cf->next;
228 while (cf->readers != NULL)
229 casereader_destroy (cf->readers);
231 if (cf->cases != NULL)
233 size_t idx, block_cnt;
235 case_bytes -= cf->case_cnt * cf->case_acct_size;
236 for (idx = 0; idx < cf->case_cnt; idx++)
238 size_t block_idx = idx / CASES_PER_BLOCK;
239 size_t case_idx = idx % CASES_PER_BLOCK;
240 struct ccase *c = &cf->cases[block_idx][case_idx];
244 block_cnt = DIV_RND_UP (cf->case_cnt, CASES_PER_BLOCK);
245 for (idx = 0; idx < block_cnt; idx++)
246 free (cf->cases[idx]);
254 if (cf->filename != NULL && remove (cf->filename) == -1)
255 io_error (cf, _("%s: Removing temporary file: %s."),
256 cf->filename, strerror (errno));
265 /* Returns true if an I/O error has occurred in casefile CF. */
267 casefile_error (const struct casefile *cf)
272 /* Returns nonzero only if casefile CF is stored in memory (instead of on
275 casefile_in_core (const struct casefile *cf)
279 return cf->storage == MEMORY;
282 /* Puts a casefile to "sleep", that is, minimizes the resources
283 needed for it by closing its file descriptor and freeing its
284 buffer. This is useful if we need so many casefiles that we
285 might not have enough memory and file descriptors to go
288 For simplicity, this implementation always converts the
289 casefile to reader mode. If this turns out to be a problem,
290 with a little extra work we could also support sleeping
293 Returns true if successful, false if an I/O error occurred. */
295 casefile_sleep (const struct casefile *cf_)
297 struct casefile *cf = (struct casefile *) cf_;
300 casefile_mode_reader (cf);
301 casefile_to_disk (cf);
309 if (cf->buffer != NULL)
318 /* Returns the number of `union value's in a case for CF. */
320 casefile_get_value_cnt (const struct casefile *cf)
324 return cf->value_cnt;
327 /* Returns the number of cases in casefile CF. */
329 casefile_get_case_cnt (const struct casefile *cf)
336 /* Appends a copy of case C to casefile CF. Not valid after any
337 reader for CF has been created.
338 Returns true if successful, false if an I/O error occurred. */
340 casefile_append (struct casefile *cf, const struct ccase *c)
344 assert (cf->mode == WRITE);
346 /* Try memory first. */
347 if (cf->storage == MEMORY)
349 if (case_bytes < get_workspace ())
351 size_t block_idx = cf->case_cnt / CASES_PER_BLOCK;
352 size_t case_idx = cf->case_cnt % CASES_PER_BLOCK;
353 struct ccase new_case;
355 case_bytes += cf->case_acct_size;
356 case_clone (&new_case, c);
359 if ((block_idx & (block_idx - 1)) == 0)
361 size_t block_cap = block_idx == 0 ? 1 : block_idx * 2;
362 cf->cases = xnrealloc (cf->cases,
363 block_cap, sizeof *cf->cases);
366 cf->cases[block_idx] = xnmalloc (CASES_PER_BLOCK,
370 case_move (&cf->cases[block_idx][case_idx], &new_case);
374 casefile_to_disk (cf);
375 assert (cf->storage == DISK);
376 write_case_to_disk (cf, c);
380 write_case_to_disk (cf, c);
386 /* Appends case C to casefile CF, which takes over ownership of
387 C. Not valid after any reader for CF has been created.
388 Returns true if successful, false if an I/O error occurred. */
390 casefile_append_xfer (struct casefile *cf, struct ccase *c)
392 casefile_append (cf, c);
397 /* Writes case C to casefile CF's disk buffer, first flushing the buffer to
398 disk if it would otherwise overflow.
399 Returns true if successful, false if an I/O error occurred. */
401 write_case_to_disk (struct casefile *cf, const struct ccase *c)
406 case_to_values (c, cf->buffer + cf->buffer_used, cf->value_cnt);
407 cf->buffer_used += cf->value_cnt;
408 if (cf->buffer_used + cf->value_cnt > cf->buffer_size)
412 /* If any bytes in CF's output buffer are used, flush them to
415 flush_buffer (struct casefile *cf)
417 if (cf->ok && cf->buffer_used > 0)
419 if (!full_write (cf->fd, cf->buffer,
420 cf->buffer_size * sizeof *cf->buffer))
421 io_error (cf, _("Error writing temporary file: %s."),
429 /* If CF is currently stored in memory, writes it to disk. Readers, if any,
430 retain their current positions.
431 Returns true if successful, false if an I/O error occurred. */
433 casefile_to_disk (const struct casefile *cf_)
435 struct casefile *cf = (struct casefile *) cf_;
436 struct casereader *reader;
440 if (cf->storage == MEMORY)
442 size_t idx, block_cnt;
444 assert (cf->filename == NULL);
445 assert (cf->fd == -1);
446 assert (cf->buffer_used == 0);
448 if (!make_temp_file (&cf->fd, &cf->filename))
455 cf->buffer = xnmalloc (cf->buffer_size, sizeof *cf->buffer);
456 memset (cf->buffer, 0, cf->buffer_size * sizeof *cf->buffer);
458 case_bytes -= cf->case_cnt * cf->case_acct_size;
459 for (idx = 0; idx < cf->case_cnt; idx++)
461 size_t block_idx = idx / CASES_PER_BLOCK;
462 size_t case_idx = idx % CASES_PER_BLOCK;
463 struct ccase *c = &cf->cases[block_idx][case_idx];
464 write_case_to_disk (cf, c);
468 block_cnt = DIV_RND_UP (cf->case_cnt, CASES_PER_BLOCK);
469 for (idx = 0; idx < block_cnt; idx++)
470 free (cf->cases[idx]);
475 if (cf->mode == READ)
478 for (reader = cf->readers; reader != NULL; reader = reader->next)
479 reader_open_file (reader);
484 /* Changes CF to reader mode, ensuring that no more cases may be
485 added. Creating a casereader for CF has the same effect. */
487 casefile_mode_reader (struct casefile *cf)
493 /* Creates and returns a casereader for CF. A casereader can be used to
494 sequentially read the cases in a casefile. */
496 casefile_get_reader (const struct casefile *cf_)
498 struct casefile *cf = (struct casefile *) cf_;
499 struct casereader *reader;
502 assert (!cf->being_destroyed);
504 /* Flush the buffer to disk if it's not empty. */
505 if (cf->mode == WRITE && cf->storage == DISK)
510 reader = xmalloc (sizeof *reader);
511 reader->next = cf->readers;
512 if (cf->readers != NULL)
513 reader->next->prev = reader;
514 cf->readers = reader;
517 reader->case_idx = 0;
518 reader->destructive = 0;
520 reader->buffer = NULL;
521 reader->buffer_pos = 0;
522 case_nullify (&reader->c);
524 if (reader->cf->storage == DISK)
525 reader_open_file (reader);
530 /* Creates and returns a destructive casereader for CF. Like a
531 normal casereader, a destructive casereader sequentially reads
532 the cases in a casefile. Unlike a normal casereader, a
533 destructive reader cannot operate concurrently with any other
534 reader. (This restriction could be relaxed in a few ways, but
535 it is so far unnecessary for other code.) */
537 casefile_get_destructive_reader (struct casefile *cf)
539 struct casereader *reader;
541 assert (cf->readers == NULL);
542 reader = casefile_get_reader (cf);
543 reader->destructive = 1;
544 cf->being_destroyed = 1;
548 /* Opens a disk file for READER and seeks to the current position as indicated
549 by case_idx. Normally the current position is the beginning of the file,
550 but casefile_to_disk may cause the file to be opened at a different
553 reader_open_file (struct casereader *reader)
555 struct casefile *cf = reader->cf;
558 if (!cf->ok || reader->case_idx >= cf->case_cnt)
568 reader->fd = safe_open (cf->filename, O_RDONLY);
570 io_error (cf, _("%s: Opening temporary file: %s."),
571 cf->filename, strerror (errno));
574 if (cf->buffer != NULL)
576 reader->buffer = cf->buffer;
581 reader->buffer = xnmalloc (cf->buffer_size, sizeof *cf->buffer);
582 memset (reader->buffer, 0, cf->buffer_size * sizeof *cf->buffer);
585 if (cf->value_cnt != 0)
587 size_t buffer_case_cnt = cf->buffer_size / cf->value_cnt;
588 file_ofs = ((off_t) reader->case_idx / buffer_case_cnt
589 * cf->buffer_size * sizeof *cf->buffer);
590 reader->buffer_pos = (reader->case_idx % buffer_case_cnt
595 if (lseek (reader->fd, file_ofs, SEEK_SET) != file_ofs)
596 io_error (cf, _("%s: Seeking temporary file: %s."),
597 cf->filename, strerror (errno));
599 if (cf->case_cnt > 0 && cf->value_cnt > 0)
600 fill_buffer (reader);
602 case_create (&reader->c, cf->value_cnt);
605 /* Fills READER's buffer by reading a block from disk. */
607 fill_buffer (struct casereader *reader)
611 int bytes = full_read (reader->fd, reader->buffer,
612 reader->cf->buffer_size * sizeof *reader->buffer);
614 io_error (reader->cf, _("%s: Reading temporary file: %s."),
615 reader->cf->filename, strerror (errno));
616 else if (bytes != reader->cf->buffer_size * sizeof *reader->buffer)
617 io_error (reader->cf, _("%s: Temporary file ended unexpectedly."),
618 reader->cf->filename);
620 return reader->cf->ok;
623 /* Returns the casefile that READER reads. */
624 const struct casefile *
625 casereader_get_casefile (const struct casereader *reader)
627 assert (reader != NULL);
632 /* Reads a copy of the next case from READER into C.
633 Caller is responsible for destroying C.
634 Returns true if successful, false at end of file. */
636 casereader_read (struct casereader *reader, struct ccase *c)
638 assert (reader != NULL);
640 if (!reader->cf->ok || reader->case_idx >= reader->cf->case_cnt)
643 if (reader->cf->storage == MEMORY)
645 size_t block_idx = reader->case_idx / CASES_PER_BLOCK;
646 size_t case_idx = reader->case_idx % CASES_PER_BLOCK;
648 case_clone (c, &reader->cf->cases[block_idx][case_idx]);
654 if (reader->buffer_pos + reader->cf->value_cnt > reader->cf->buffer_size)
656 if (!fill_buffer (reader))
658 reader->buffer_pos = 0;
661 case_from_values (&reader->c, reader->buffer + reader->buffer_pos,
662 reader->cf->value_cnt);
663 reader->buffer_pos += reader->cf->value_cnt;
666 case_clone (c, &reader->c);
671 /* Reads the next case from READER into C and transfers ownership
672 to the caller. Caller is responsible for destroying C.
673 Returns true if successful, false at end of file or on I/O
676 casereader_read_xfer (struct casereader *reader, struct ccase *c)
678 assert (reader != NULL);
680 if (reader->destructive == 0
681 || reader->case_idx >= reader->cf->case_cnt
682 || reader->cf->storage == DISK)
683 return casereader_read (reader, c);
686 size_t block_idx = reader->case_idx / CASES_PER_BLOCK;
687 size_t case_idx = reader->case_idx % CASES_PER_BLOCK;
688 struct ccase *read_case = &reader->cf->cases[block_idx][case_idx];
690 case_move (c, read_case);
696 /* Destroys READER. */
698 casereader_destroy (struct casereader *reader)
700 assert (reader != NULL);
702 if (reader->next != NULL)
703 reader->next->prev = reader->prev;
704 if (reader->prev != NULL)
705 reader->prev->next = reader->next;
706 if (reader->cf->readers == reader)
707 reader->cf->readers = reader->next;
709 if (reader->cf->buffer == NULL)
710 reader->cf->buffer = reader->buffer;
712 free (reader->buffer);
714 if (reader->fd != -1)
716 if (reader->cf->fd == -1)
717 reader->cf->fd = reader->fd;
719 safe_close (reader->fd);
722 case_destroy (&reader->c);
727 /* Marks CF as having encountered an I/O error.
728 If this is the first error on CF, reports FORMAT to the user,
729 doing printf()-style substitutions. */
731 io_error (struct casefile *cf, const char *format, ...)
739 e.where.filename = NULL;
740 e.where.line_number = -1;
743 va_start (args, format);
744 err_vmsg (&e, format, args);
750 /* Calls open(), passing FILENAME and FLAGS, repeating as necessary
751 to deal with interrupted calls. */
753 safe_open (const char *filename, int flags)
759 fd = open (filename, flags);
761 while (fd == -1 && errno == EINTR);
766 /* Calls close(), passing FD, repeating as necessary to deal with
767 interrupted calls. */
768 static int safe_close (int fd)
776 while (retval == -1 && errno == EINTR);
781 /* Registers our exit handler with atexit() if it has not already
784 register_atexit (void)
786 static int registered = 0;
790 atexit (exit_handler);
796 /* atexit() handler that closes and deletes our temporary
801 while (casefiles != NULL)
802 casefile_destroy (casefiles);