1 /* PSPP - computes sample statistics.
2 Copyright (C) 2004 Free Software Foundation, Inc.
3 Written by Ben Pfaff <blp@gnu.org>.
5 This program is free software; you can redistribute it and/or
6 modify it under the terms of the GNU General Public License as
7 published by the Free Software Foundation; either version 2 of the
8 License, or (at your option) any later version.
10 This program is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with this program; if not, write to the Free Software
   Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
   02110-1301, USA. */
29 #include <libpspp/alloc.h>
31 #include <libpspp/compiler.h>
32 #include <libpspp/message.h>
33 #include "full-read.h"
34 #include "full-write.h"
35 #include <libpspp/misc.h>
36 #include "make-file.h"
41 #define _(msgid) gettext (msgid)
43 #define IO_BUF_SIZE (8192 / sizeof (union value))
/* A casefile represents a sequentially accessible stream of
   cases.
   If workspace allows, a casefile is maintained in memory.  If
   workspace overflows, then the casefile is pushed to disk.  In
   either case the interface presented to callers is kept the
   same.
53 The life cycle of a casefile consists of up to three phases:
55 1. Writing. The casefile initially contains no cases. In
56 this phase, any number of cases may be appended to the
57 end of a casefile. (Cases are never inserted in the
58 middle or before the beginning of a casefile.)
60 Use casefile_append() or casefile_append_xfer() to
61 append a case to a casefile.
63 2. Reading. The casefile may be read sequentially,
64 starting from the beginning, by "casereaders". Any
65 number of casereaders may be created, at any time,
66 during the reading phase. Each casereader has an
67 independent position in the casefile.
      Ordinary casereaders may only move forward.  They
      cannot move backward to arbitrary records or seek
      randomly.  Cloning casereaders is possible, but it is
      not yet implemented.
74 Use casefile_get_reader() to create a casereader for
75 use in phase 2. This also transitions from phase 1 to
76 phase 2. Calling casefile_mode_reader() makes the same
77 transition, without creating a casereader.
79 Use casereader_read() or casereader_read_xfer() to read
80 a case from a casereader. Use casereader_destroy() to
81 discard a casereader when it is no longer needed.
83 "Random" casereaders, which support a seek operation,
84 may also be created. These should not, generally, be
85 used for statistical procedures, because random access
86 is much slower than sequential access. They are
87 intended for use by the GUI.
89 3. Destruction. This phase is optional. The casefile is
90 also read with casereaders in this phase, but the
91 ability to create new casereaders is curtailed.
93 In this phase, casereaders could still be cloned (once
94 we eventually implement cloning).
96 To transition from phase 1 or 2 to phase 3 and create a
97 casereader, call casefile_get_destructive_reader().
98 The same functions apply to the casereader obtained
99 this way as apply to casereaders obtained in phase 2.
      After casefile_get_destructive_reader() is called, no
      more casereaders may be created with
      casefile_get_reader() or
      casefile_get_destructive_reader().  (If cloning of
      casereaders were implemented, it would still be
      possible.)
      The purpose of the limitations applied to casereaders
      in phase 3 is to allow in-memory casefiles to fully
      transfer ownership of cases to the casereaders,
      avoiding the need for extra copies of case data.  For
      relatively static data sets with many variables, I
      suspect (without evidence) that this may be a big
      performance boost.
116 When a casefile is no longer needed, it may be destroyed with
117 casefile_destroy(). This function will also destroy any
118 remaining casereaders. */
120 /* FIXME: should we implement compression? */
122 /* In-memory cases are arranged in an array of arrays. The top
123 level is variable size and the size of each bottom level array
124 is fixed at the number of cases defined here. */
125 #define CASES_PER_BLOCK 128
131 struct casefile *next, *prev; /* Next, prev in global list. */
132 size_t value_cnt; /* Case size in `union value's. */
133 size_t case_acct_size; /* Case size for accounting. */
134 unsigned long case_cnt; /* Number of cases stored. */
135 enum { MEMORY, DISK } storage; /* Where cases are stored. */
136 enum { WRITE, READ } mode; /* Is writing or reading allowed? */
137 struct casereader *readers; /* List of our readers. */
138 bool being_destroyed; /* Does a destructive reader exist? */
139 bool ok; /* False after I/O error. */
141 /* Memory storage. */
142 struct ccase **cases; /* Pointer to array of cases. */
145 int fd; /* File descriptor, -1 if none. */
146 char *file_name; /* File name. */
147 union value *buffer; /* I/O buffer, NULL if none. */
148 size_t buffer_used; /* Number of values used in buffer. */
149 size_t buffer_size; /* Buffer size in values. */
152 /* For reading out the cases in a casefile. */
155 struct casereader *next, *prev; /* Next, prev in casefile's list. */
156 struct casefile *cf; /* Our casefile. */
157 unsigned long case_idx; /* Case number of current case. */
158 bool destructive; /* Is this a destructive reader? */
159 bool random; /* Is this a random reader? */
162 int fd; /* File descriptor. */
163 off_t file_ofs; /* Current position in fd. */
164 off_t buffer_ofs; /* File offset of buffer start. */
165 union value *buffer; /* I/O buffer. */
166 size_t buffer_pos; /* Offset of buffer position. */
167 struct ccase c; /* Current case. */
170 /* Return the case number of the current case */
172 casereader_cnum(const struct casereader *r)
177 /* Doubly linked list of all casefiles. */
178 static struct casefile *casefiles;
180 /* Number of bytes of case allocated in in-memory casefiles. */
181 static size_t case_bytes;
183 static void register_atexit (void);
184 static void exit_handler (void);
186 static void reader_open_file (struct casereader *);
187 static void write_case_to_disk (struct casefile *, const struct ccase *);
188 static void flush_buffer (struct casefile *);
189 static void seek_and_fill_buffer (struct casereader *);
190 static bool fill_buffer (struct casereader *);
192 static void io_error (struct casefile *, const char *, ...)
193 PRINTF_FORMAT (2, 3);
194 static int safe_open (const char *file_name, int flags);
195 static int safe_close (int fd);
197 /* Creates and returns a casefile to store cases of VALUE_CNT
198 `union value's each. */
200 casefile_create (size_t value_cnt)
202 struct casefile *cf = xmalloc (sizeof *cf);
203 cf->next = casefiles;
205 if (cf->next != NULL)
208 cf->value_cnt = value_cnt;
209 cf->case_acct_size = (cf->value_cnt + 4) * sizeof *cf->buffer;
211 cf->storage = MEMORY;
214 cf->being_destroyed = 0;
218 cf->file_name = NULL;
220 cf->buffer_size = ROUND_UP (cf->value_cnt, IO_BUF_SIZE);
221 if (cf->value_cnt > 0 && cf->buffer_size % cf->value_cnt > 64)
222 cf->buffer_size = cf->value_cnt;
228 /* Destroys casefile CF. */
230 casefile_destroy (struct casefile *cf)
234 if (cf->next != NULL)
235 cf->next->prev = cf->prev;
236 if (cf->prev != NULL)
237 cf->prev->next = cf->next;
239 casefiles = cf->next;
241 while (cf->readers != NULL)
242 casereader_destroy (cf->readers);
244 if (cf->cases != NULL)
246 size_t idx, block_cnt;
248 case_bytes -= cf->case_cnt * cf->case_acct_size;
249 for (idx = 0; idx < cf->case_cnt; idx++)
251 size_t block_idx = idx / CASES_PER_BLOCK;
252 size_t case_idx = idx % CASES_PER_BLOCK;
253 struct ccase *c = &cf->cases[block_idx][case_idx];
257 block_cnt = DIV_RND_UP (cf->case_cnt, CASES_PER_BLOCK);
258 for (idx = 0; idx < block_cnt; idx++)
259 free (cf->cases[idx]);
267 if (cf->file_name != NULL && remove (cf->file_name) == -1)
268 io_error (cf, _("%s: Removing temporary file: %s."),
269 cf->file_name, strerror (errno));
270 free (cf->file_name);
278 /* Returns true if an I/O error has occurred in casefile CF. */
280 casefile_error (const struct casefile *cf)
285 /* Returns true only if casefile CF is stored in memory (instead of on
286 disk), false otherwise. */
288 casefile_in_core (const struct casefile *cf)
292 return cf->storage == MEMORY;
/* Puts a casefile to "sleep", that is, minimizes the resources
   needed for it by closing its file descriptor and freeing its
   buffer.  This is useful if we need so many casefiles that we
   might not have enough memory and file descriptors to go
   around.
   For simplicity, this implementation always converts the
   casefile to reader mode.  If this turns out to be a problem,
   with a little extra work we could also support sleeping
   writers.
306 Returns true if successful, false if an I/O error occurred. */
308 casefile_sleep (const struct casefile *cf_)
310 struct casefile *cf = (struct casefile *) cf_;
313 casefile_mode_reader (cf);
314 casefile_to_disk (cf);
322 if (cf->buffer != NULL)
331 /* Returns the number of `union value's in a case for CF. */
333 casefile_get_value_cnt (const struct casefile *cf)
337 return cf->value_cnt;
340 /* Returns the number of cases in casefile CF. */
342 casefile_get_case_cnt (const struct casefile *cf)
349 /* Appends a copy of case C to casefile CF. Not valid after any
350 reader for CF has been created.
351 Returns true if successful, false if an I/O error occurred. */
353 casefile_append (struct casefile *cf, const struct ccase *c)
357 assert (cf->mode == WRITE);
359 /* Try memory first. */
360 if (cf->storage == MEMORY)
362 if (case_bytes < get_workspace ())
364 size_t block_idx = cf->case_cnt / CASES_PER_BLOCK;
365 size_t case_idx = cf->case_cnt % CASES_PER_BLOCK;
366 struct ccase new_case;
368 case_bytes += cf->case_acct_size;
369 case_clone (&new_case, c);
372 if ((block_idx & (block_idx - 1)) == 0)
374 size_t block_cap = block_idx == 0 ? 1 : block_idx * 2;
375 cf->cases = xnrealloc (cf->cases,
376 block_cap, sizeof *cf->cases);
379 cf->cases[block_idx] = xnmalloc (CASES_PER_BLOCK,
383 case_move (&cf->cases[block_idx][case_idx], &new_case);
387 casefile_to_disk (cf);
388 assert (cf->storage == DISK);
389 write_case_to_disk (cf, c);
393 write_case_to_disk (cf, c);
399 /* Appends case C to casefile CF, which takes over ownership of
400 C. Not valid after any reader for CF has been created.
401 Returns true if successful, false if an I/O error occurred. */
403 casefile_append_xfer (struct casefile *cf, struct ccase *c)
405 casefile_append (cf, c);
410 /* Writes case C to casefile CF's disk buffer, first flushing the buffer to
411 disk if it would otherwise overflow.
412 Returns true if successful, false if an I/O error occurred. */
414 write_case_to_disk (struct casefile *cf, const struct ccase *c)
419 case_to_values (c, cf->buffer + cf->buffer_used, cf->value_cnt);
420 cf->buffer_used += cf->value_cnt;
421 if (cf->buffer_used + cf->value_cnt > cf->buffer_size)
425 /* If any bytes in CF's output buffer are used, flush them to
428 flush_buffer (struct casefile *cf)
430 if (cf->ok && cf->buffer_used > 0)
432 if (!full_write (cf->fd, cf->buffer,
433 cf->buffer_size * sizeof *cf->buffer))
434 io_error (cf, _("Error writing temporary file: %s."),
440 /* If CF is currently stored in memory, writes it to disk. Readers, if any,
441 retain their current positions.
442 Returns true if successful, false if an I/O error occurred. */
444 casefile_to_disk (const struct casefile *cf_)
446 struct casefile *cf = (struct casefile *) cf_;
447 struct casereader *reader;
451 if (cf->storage == MEMORY)
453 size_t idx, block_cnt;
455 assert (cf->file_name == NULL);
456 assert (cf->fd == -1);
457 assert (cf->buffer_used == 0);
459 if (!make_temp_file (&cf->fd, &cf->file_name))
466 cf->buffer = xnmalloc (cf->buffer_size, sizeof *cf->buffer);
467 memset (cf->buffer, 0, cf->buffer_size * sizeof *cf->buffer);
469 case_bytes -= cf->case_cnt * cf->case_acct_size;
470 for (idx = 0; idx < cf->case_cnt; idx++)
472 size_t block_idx = idx / CASES_PER_BLOCK;
473 size_t case_idx = idx % CASES_PER_BLOCK;
474 struct ccase *c = &cf->cases[block_idx][case_idx];
475 write_case_to_disk (cf, c);
479 block_cnt = DIV_RND_UP (cf->case_cnt, CASES_PER_BLOCK);
480 for (idx = 0; idx < block_cnt; idx++)
481 free (cf->cases[idx]);
486 if (cf->mode == READ)
489 for (reader = cf->readers; reader != NULL; reader = reader->next)
490 reader_open_file (reader);
495 /* Changes CF to reader mode, ensuring that no more cases may be
496 added. Creating a casereader for CF has the same effect. */
498 casefile_mode_reader (struct casefile *cf)
504 /* Creates and returns a casereader for CF. A casereader can be used to
505 sequentially read the cases in a casefile. */
507 casefile_get_reader (const struct casefile *cf_)
509 struct casefile *cf = (struct casefile *) cf_;
510 struct casereader *reader;
513 assert (!cf->being_destroyed);
515 /* Flush the buffer to disk if it's not empty. */
516 if (cf->mode == WRITE && cf->storage == DISK)
521 reader = xmalloc (sizeof *reader);
522 reader->next = cf->readers;
523 if (cf->readers != NULL)
524 reader->next->prev = reader;
525 cf->readers = reader;
528 reader->case_idx = 0;
529 reader->destructive = 0;
530 reader->random = false;
532 reader->buffer = NULL;
533 reader->buffer_pos = 0;
534 case_nullify (&reader->c);
536 if (reader->cf->storage == DISK)
537 reader_open_file (reader);
542 /* Creates and returns a random casereader for CF. A random
543 casereader can be used to randomly read the cases in a
546 casefile_get_random_reader (const struct casefile *cf)
548 struct casereader *reader = casefile_get_reader (cf);
549 reader->random = true;
553 /* Creates and returns a destructive casereader for CF. Like a
554 normal casereader, a destructive casereader sequentially reads
555 the cases in a casefile. Unlike a normal casereader, a
556 destructive reader cannot operate concurrently with any other
557 reader. (This restriction could be relaxed in a few ways, but
558 it is so far unnecessary for other code.) */
560 casefile_get_destructive_reader (struct casefile *cf)
562 struct casereader *reader;
564 assert (cf->readers == NULL);
565 reader = casefile_get_reader (cf);
566 reader->destructive = 1;
567 cf->being_destroyed = 1;
/* Opens a disk file for READER and seeks to the current position as indicated
   by case_idx.  Normally the current position is the beginning of the file,
   but casefile_to_disk may cause the file to be opened at a different
   position. */
576 reader_open_file (struct casereader *reader)
578 struct casefile *cf = reader->cf;
579 if (!cf->ok || reader->case_idx >= cf->case_cnt)
589 reader->fd = safe_open (cf->file_name, O_RDONLY);
591 io_error (cf, _("%s: Opening temporary file: %s."),
592 cf->file_name, strerror (errno));
595 if (cf->buffer != NULL)
597 reader->buffer = cf->buffer;
602 reader->buffer = xnmalloc (cf->buffer_size, sizeof *cf->buffer);
603 memset (reader->buffer, 0, cf->buffer_size * sizeof *cf->buffer);
606 case_create (&reader->c, cf->value_cnt);
608 reader->buffer_ofs = -1;
609 reader->file_ofs = -1;
610 seek_and_fill_buffer (reader);
613 /* Seeks the backing file for READER to the proper position and
614 refreshes the buffer contents. */
616 seek_and_fill_buffer (struct casereader *reader)
618 struct casefile *cf = reader->cf;
621 if (cf->value_cnt != 0)
623 size_t buffer_case_cnt = cf->buffer_size / cf->value_cnt;
624 new_ofs = ((off_t) reader->case_idx / buffer_case_cnt
625 * cf->buffer_size * sizeof *cf->buffer);
626 reader->buffer_pos = (reader->case_idx % buffer_case_cnt
631 if (new_ofs != reader->file_ofs)
633 if (lseek (reader->fd, new_ofs, SEEK_SET) != new_ofs)
634 io_error (cf, _("%s: Seeking temporary file: %s."),
635 cf->file_name, strerror (errno));
637 reader->file_ofs = new_ofs;
640 if (cf->case_cnt > 0 && cf->value_cnt > 0 && reader->buffer_ofs != new_ofs)
641 fill_buffer (reader);
644 /* Fills READER's buffer by reading a block from disk. */
646 fill_buffer (struct casereader *reader)
650 int bytes = full_read (reader->fd, reader->buffer,
651 reader->cf->buffer_size * sizeof *reader->buffer);
653 io_error (reader->cf, _("%s: Reading temporary file: %s."),
654 reader->cf->file_name, strerror (errno));
655 else if (bytes != reader->cf->buffer_size * sizeof *reader->buffer)
656 io_error (reader->cf, _("%s: Temporary file ended unexpectedly."),
657 reader->cf->file_name);
660 reader->buffer_ofs = reader->file_ofs;
661 reader->file_ofs += bytes;
664 return reader->cf->ok;
667 /* Returns the casefile that READER reads. */
668 const struct casefile *
669 casereader_get_casefile (const struct casereader *reader)
671 assert (reader != NULL);
676 /* Reads a copy of the next case from READER into C.
677 Caller is responsible for destroying C.
678 Returns true if successful, false at end of file. */
680 casereader_read (struct casereader *reader, struct ccase *c)
682 assert (reader != NULL);
684 if (!reader->cf->ok || reader->case_idx >= reader->cf->case_cnt)
687 if (reader->cf->storage == MEMORY)
689 size_t block_idx = reader->case_idx / CASES_PER_BLOCK;
690 size_t case_idx = reader->case_idx % CASES_PER_BLOCK;
692 case_clone (c, &reader->cf->cases[block_idx][case_idx]);
698 if (reader->buffer_pos + reader->cf->value_cnt > reader->cf->buffer_size)
700 if (!fill_buffer (reader))
702 reader->buffer_pos = 0;
705 case_from_values (&reader->c, reader->buffer + reader->buffer_pos,
706 reader->cf->value_cnt);
707 reader->buffer_pos += reader->cf->value_cnt;
710 case_clone (c, &reader->c);
/* Reads the next case from READER into C and transfers ownership
   to the caller.  Caller is responsible for destroying C.
   Returns true if successful, false at end of file or on I/O
   error. */
720 casereader_read_xfer (struct casereader *reader, struct ccase *c)
722 assert (reader != NULL);
724 if (reader->destructive == 0
725 || reader->case_idx >= reader->cf->case_cnt
726 || reader->cf->storage == DISK)
727 return casereader_read (reader, c);
730 size_t block_idx = reader->case_idx / CASES_PER_BLOCK;
731 size_t case_idx = reader->case_idx % CASES_PER_BLOCK;
732 struct ccase *read_case = &reader->cf->cases[block_idx][case_idx];
734 case_move (c, read_case);
740 /* Sets the next case to be read by READER to CASE_IDX,
741 which must be less than the number of cases in the casefile.
742 Allowed only for random readers. */
744 casereader_seek (struct casereader *reader, unsigned long case_idx)
746 assert (reader != NULL);
747 assert (reader->random);
748 assert (case_idx < reader->cf->case_cnt);
750 reader->case_idx = case_idx;
751 if (reader->cf->storage == DISK)
752 seek_and_fill_buffer (reader);
755 /* Destroys READER. */
757 casereader_destroy (struct casereader *reader)
759 assert (reader != NULL);
761 if (reader->next != NULL)
762 reader->next->prev = reader->prev;
763 if (reader->prev != NULL)
764 reader->prev->next = reader->next;
765 if (reader->cf->readers == reader)
766 reader->cf->readers = reader->next;
768 if (reader->cf->buffer == NULL)
769 reader->cf->buffer = reader->buffer;
771 free (reader->buffer);
773 if (reader->fd != -1)
775 if (reader->cf->fd == -1)
776 reader->cf->fd = reader->fd;
778 safe_close (reader->fd);
781 case_destroy (&reader->c);
786 /* Marks CF as having encountered an I/O error.
787 If this is the first error on CF, reports FORMAT to the user,
788 doing printf()-style substitutions. */
790 io_error (struct casefile *cf, const char *format, ...)
797 m.category = MSG_GENERAL;
798 m.severity = MSG_ERROR;
799 m.where.file_name = NULL;
800 m.where.line_number = -1;
801 va_start (args, format);
802 m.text = xvasprintf (format, args);
/* Opens FILE_NAME with open(), passing FLAGS, and retries for as
   long as the call fails with EINTR.
   Returns the file descriptor from open(), or -1 (with errno set
   by open()) on any failure other than an interrupted call. */
static int
safe_open (const char *file_name, int flags)
{
  for (;;)
    {
      int fd = open (file_name, flags);

      if (fd != -1 || errno != EINTR)
        return fd;
    }
}
/* Closes FD with close(), retrying for as long as the call fails
   with EINTR.
   Returns the final result of close(): 0 on success, -1 (with
   errno set by close()) on failure.

   NOTE(review): POSIX leaves the state of FD unspecified when
   close() fails with EINTR, so retrying may not be appropriate
   on every platform -- confirm for the supported targets. */
static int
safe_close (int fd)
{
  int status = close (fd);

  while (status == -1 && errno == EINTR)
    status = close (fd);

  return status;
}
841 /* Registers our exit handler with atexit() if it has not already
844 register_atexit (void)
846 static bool registered = false;
850 atexit (exit_handler);
/* atexit() handler that closes and deletes our temporary
   files. */
859 while (casefiles != NULL)
860 casefile_destroy (casefiles);