1 /* PSPP - computes sample statistics.
2 Copyright (C) 2004 Free Software Foundation, Inc.
3 Written by Ben Pfaff <blp@gnu.org>.
5 This program is free software; you can redistribute it and/or
6 modify it under the terms of the GNU General Public License as
7 published by the Free Software Foundation; either version 2 of the
8 License, or (at your option) any later version.
10 This program is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with this program; if not, write to the Free Software
17 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
   02110-1301, USA. */
29 #include <libpspp/alloc.h>
31 #include <libpspp/compiler.h>
32 #include <libpspp/message.h>
33 #include "full-read.h"
34 #include "full-write.h"
35 #include <libpspp/misc.h>
36 #include <libpspp/str.h>
37 #include "make-file.h"
/* Shorthand for gettext (MSGID); the error messages passed to
   io_error() in this file are wrapped in it. */
42 #define _(msgid) gettext (msgid)
/* Number of `union value's that fit in 8192 bytes.
   casefile_create() passes this to ROUND_UP when it chooses a
   casefile's I/O buffer size. */
44 #define IO_BUF_SIZE (8192 / sizeof (union value))
46 /* A casefile represents a sequentially accessible stream of cases.
49 If workspace allows, a casefile is maintained in memory. If
50 workspace overflows, then the casefile is pushed to disk. In
51 either case the interface presented to callers is kept the same.
54 The life cycle of a casefile consists of up to three phases:
56 1. Writing. The casefile initially contains no cases. In
57 this phase, any number of cases may be appended to the
58 end of a casefile. (Cases are never inserted in the
59 middle or before the beginning of a casefile.)
61 Use casefile_append() or casefile_append_xfer() to
62 append a case to a casefile.
64 2. Reading. The casefile may be read sequentially,
65 starting from the beginning, by "casereaders". Any
66 number of casereaders may be created, at any time,
67 during the reading phase. Each casereader has an
68 independent position in the casefile.
70 Ordinary casereaders may only move forward. They
71 cannot move backward to arbitrary records or seek
72 randomly. Cloning casereaders is possible, but it is not yet implemented.
75 Use casefile_get_reader() to create a casereader for
76 use in phase 2. This also transitions from phase 1 to
77 phase 2. Calling casefile_mode_reader() makes the same
78 transition, without creating a casereader.
80 Use casereader_read() or casereader_read_xfer() to read
81 a case from a casereader. Use casereader_destroy() to
82 discard a casereader when it is no longer needed.
84 "Random" casereaders, which support a seek operation,
85 may also be created. These should not, generally, be
86 used for statistical procedures, because random access
87 is much slower than sequential access. They are
88 intended for use by the GUI.
90 3. Destruction. This phase is optional. The casefile is
91 also read with casereaders in this phase, but the
92 ability to create new casereaders is curtailed.
94 In this phase, casereaders could still be cloned (once
95 we eventually implement cloning).
97 To transition from phase 1 or 2 to phase 3 and create a
98 casereader, call casefile_get_destructive_reader().
99 The same functions apply to the casereader obtained
100 this way as apply to casereaders obtained in phase 2.
102 After casefile_get_destructive_reader() is called, no
103 more casereaders may be created with
104 casefile_get_reader() or
105 casefile_get_destructive_reader(). (If cloning of
106 casereaders were implemented, it would still be possible.)
109 The purpose of the limitations applied to casereaders
110 in phase 3 is to allow in-memory casefiles to fully
111 transfer ownership of cases to the casereaders,
112 avoiding the need for extra copies of case data. For
113 relatively static data sets with many variables, I
114 suspect (without evidence) that this may be a big win.
117 When a casefile is no longer needed, it may be destroyed with
118 casefile_destroy(). This function will also destroy any
119 remaining casereaders. */
121 /* FIXME: should we implement compression? */
123 /* In-memory cases are arranged in an array of arrays. The top
124 level is variable size and the size of each bottom level array
125 is fixed at the number of cases defined here.
   Case number IDX is therefore found in block IDX / CASES_PER_BLOCK
   at slot IDX % CASES_PER_BLOCK. */
126 #define CASES_PER_BLOCK 128
/* Per-casefile state: linkage in the global list, case geometry and
   count, where the cases live and which mode the casefile is in, the
   attached readers, and the in-memory and on-disk representations. */
132 struct casefile *next, *prev; /* Next, prev in global list. */
133 size_t value_cnt; /* Case size in `union value's. */
134 size_t case_acct_size; /* Bytes charged to case_bytes per in-memory case. */
135 unsigned long case_cnt; /* Number of cases stored. */
136 enum { MEMORY, DISK } storage; /* Where cases are stored. */
137 enum { WRITE, READ } mode; /* Is writing or reading allowed? */
138 struct casereader *readers; /* Head of the list of our readers. */
139 bool being_destroyed; /* Does a destructive reader exist? */
140 bool ok; /* False after I/O error. */
142 /* Memory storage. */
143 struct ccase **cases; /* Array of blocks of CASES_PER_BLOCK cases each. */
/* Disk storage: the temporary file and the buffer used to access it. */
146 int fd; /* File descriptor, -1 if none. */
147 char *file_name; /* Temporary file name, NULL if none. */
148 union value *buffer; /* I/O buffer, NULL if none. */
149 size_t buffer_used; /* Number of values used in buffer. */
150 size_t buffer_size; /* Buffer size in values. */
153 /* For reading out the cases in a casefile.  Each reader keeps its own
   position (case_idx) and, for a disk-backed casefile, its own file
   descriptor, file offsets and buffer. */
156 struct casereader *next, *prev; /* Next, prev in casefile's list. */
157 struct casefile *cf; /* Our casefile. */
158 unsigned long case_idx; /* Case number of current case. */
159 bool destructive; /* Is this a destructive reader? */
160 bool random; /* Is this a random reader (one that may seek)? */
/* Disk storage. */
163 int fd; /* File descriptor; casereader_destroy() treats -1 as none. */
164 off_t file_ofs; /* Current position in fd. */
165 off_t buffer_ofs; /* File offset of buffer start. */
166 union value *buffer; /* I/O buffer. */
167 size_t buffer_pos; /* Index in buffer, in values, of the next case. */
168 struct ccase c; /* Case that values from the buffer are unpacked into. */
171 /* Returns the case number of the current case of casereader R.
   NOTE(review): presumably this is R's case_idx member -- confirm
   against the function body. */
173 casereader_cnum(const struct casereader *r)
178 /* Doubly linked list of all casefiles.  casefile_create() links new
   casefiles at its head and casefile_destroy() unlinks them. */
179 static struct casefile *casefiles;
181 /* Number of bytes of case allocated in in-memory casefiles.
   Maintained in units of each casefile's case_acct_size and compared
   against get_workspace() when a case is appended. */
182 static size_t case_bytes;
/* Exit-time cleanup. */
184 static void register_atexit (void);
185 static void exit_handler (void);
/* Disk-backed reading and writing. */
187 static void reader_open_file (struct casereader *);
188 static void write_case_to_disk (struct casefile *, const struct ccase *);
189 static void flush_buffer (struct casefile *);
190 static void seek_and_fill_buffer (struct casereader *);
191 static bool fill_buffer (struct casereader *);
/* Error reporting and interruption-tolerant wrappers for open() and
   close().
   NOTE(review): PRINTF_FORMAT (2, 3) presumably marks argument 2 as a
   printf-style format whose arguments start at position 3 -- confirm
   against its definition. */
193 static void io_error (struct casefile *, const char *, ...)
194 PRINTF_FORMAT (2, 3);
195 static int safe_open (const char *file_name, int flags);
196 static int safe_close (int fd);
198 /* Creates and returns a casefile to store cases of VALUE_CNT
199 `union value's each.
   The new casefile is linked at the head of the global `casefiles'
   list, starts out with in-memory storage and no temporary file
   name, and has its I/O buffer size (in values) fixed here. */
201 casefile_create (size_t value_cnt)
203 struct casefile *cf = xmalloc (sizeof *cf);
/* Link CF in front of the current head of the global list. */
204 cf->next = casefiles;
206 if (cf->next != NULL)
209 cf->value_cnt = value_cnt;
/* Bytes charged to case_bytes for each in-memory case.
   NOTE(review): the extra 4 values presumably approximate per-case
   overhead -- confirm. */
210 cf->case_acct_size = (cf->value_cnt + 4) * sizeof *cf->buffer;
212 cf->storage = MEMORY;
215 cf->being_destroyed = 0;
219 cf->file_name = NULL;
/* Size the I/O buffer with ROUND_UP in units of IO_BUF_SIZE.  If more
   than 64 values at the end of such a buffer could not hold a whole
   case, use a buffer of exactly one case instead. */
221 cf->buffer_size = ROUND_UP (cf->value_cnt, IO_BUF_SIZE);
222 if (cf->value_cnt > 0 && cf->buffer_size % cf->value_cnt > 64)
223 cf->buffer_size = cf->value_cnt;
229 /* Destroys casefile CF.
   Unlinks CF from the global list, destroys every reader still
   attached to it, releases its in-memory case blocks, and removes
   its temporary file if it has one. */
231 casefile_destroy (struct casefile *cf)
/* Unlink CF from the doubly linked global list. */
235 if (cf->next != NULL)
236 cf->next->prev = cf->prev;
237 if (cf->prev != NULL)
238 cf->prev->next = cf->next;
240 casefiles = cf->next;
/* casereader_destroy() advances cf->readers when it removes the list
   head, so this loop ends once the list is empty. */
242 while (cf->readers != NULL)
243 casereader_destroy (cf->readers);
245 if (cf->cases != NULL)
247 size_t idx, block_cnt;
/* Give back the workspace charged for the in-memory cases. */
249 case_bytes -= cf->case_cnt * cf->case_acct_size;
/* Visit every stored case through its block and slot. */
250 for (idx = 0; idx < cf->case_cnt; idx++)
252 size_t block_idx = idx / CASES_PER_BLOCK;
253 size_t case_idx = idx % CASES_PER_BLOCK;
254 struct ccase *c = &cf->cases[block_idx][case_idx];
/* Free each block that was allocated to hold cases. */
258 block_cnt = DIV_RND_UP (cf->case_cnt, CASES_PER_BLOCK);
259 for (idx = 0; idx < block_cnt; idx++)
260 free (cf->cases[idx]);
/* Delete the temporary file, reporting a failure through io_error(). */
268 if (cf->file_name != NULL && remove (cf->file_name) == -1)
269 io_error (cf, _("%s: Removing temporary file: %s."),
270 cf->file_name, strerror (errno));
271 free (cf->file_name);
279 /* Returns true if an I/O error has occurred in casefile CF.
   NOTE(review): presumably derived from CF's `ok' member, which is
   documented as false after an I/O error -- confirm against the body. */
281 casefile_error (const struct casefile *cf)
286 /* Returns true only if casefile CF is stored in memory (instead of on
287 disk), false otherwise.  The answer is CF's `storage' member compared
   with MEMORY, so it changes once the casefile has moved to DISK
   storage. */
289 casefile_in_core (const struct casefile *cf)
293 return cf->storage == MEMORY;
296 /* Puts a casefile to "sleep", that is, minimizes the resources
297 needed for it by closing its file descriptor and freeing its
298 buffer. This is useful if we need so many casefiles that we
299 might not have enough memory and file descriptors to go around.
302 For simplicity, this implementation always converts the
303 casefile to reader mode. If this turns out to be a problem,
304 with a little extra work we could also support sleeping
   a casefile that is still in write mode.
307 Returns true if successful, false if an I/O error occurred. */
309 casefile_sleep (const struct casefile *cf_)
/* The interface takes a const pointer, but sleeping changes the
   casefile, so the qualifier is cast away. */
311 struct casefile *cf = (struct casefile *) cf_;
/* Switch to reader mode and make sure the cases are on disk before
   any resources are released. */
314 casefile_mode_reader (cf);
315 casefile_to_disk (cf);
323 if (cf->buffer != NULL)
332 /* Returns the number of `union value's in a case for CF, that is,
   the VALUE_CNT that casefile_create() stored in CF. */
334 casefile_get_value_cnt (const struct casefile *cf)
338 return cf->value_cnt;
341 /* Returns the number of cases in casefile CF.
   NOTE(review): presumably CF's case_cnt member -- confirm against
   the function body. */
343 casefile_get_case_cnt (const struct casefile *cf)
350 /* Appends a copy of case C to casefile CF. Not valid after any
351 reader for CF has been created.
352 Returns true if successful, false if an I/O error occurred.
   C must supply at least as many values as CF stores per case.
   While the accounted in-memory case bytes stay below
   get_workspace(), the copy is kept in memory; otherwise the
   casefile is moved to disk and the case is written there. */
354 casefile_append (struct casefile *cf, const struct ccase *c)
358 assert (cf->mode == WRITE);
360 assert ( cf->value_cnt <= c->case_data->value_cnt );
362 /* Try memory first. */
363 if (cf->storage == MEMORY)
365 if (case_bytes < get_workspace ())
/* Block and slot at which the new case will be stored. */
367 size_t block_idx = cf->case_cnt / CASES_PER_BLOCK;
368 size_t case_idx = cf->case_cnt % CASES_PER_BLOCK;
369 struct ccase new_case;
371 case_bytes += cf->case_acct_size;
372 case_clone (&new_case, c);
/* This test holds when block_idx is 0 or a power of two.  The
   top-level array is then reallocated to hold block_idx * 2 block
   pointers (1 for the first block), so its capacity doubles each
   time. */
375 if ((block_idx & (block_idx - 1)) == 0)
377 size_t block_cap = block_idx == 0 ? 1 : block_idx * 2;
378 cf->cases = xnrealloc (cf->cases,
379 block_cap, sizeof *cf->cases);
382 cf->cases[block_idx] = xnmalloc (CASES_PER_BLOCK,
386 case_move (&cf->cases[block_idx][case_idx], &new_case);
/* Workspace exhausted: convert the casefile to disk storage, then
   write this case to disk as well. */
390 casefile_to_disk (cf);
391 assert (cf->storage == DISK);
392 write_case_to_disk (cf, c);
/* The casefile was already on disk. */
396 write_case_to_disk (cf, c);
402 /* Appends case C to casefile CF, which takes over ownership of
403 C. Not valid after any reader for CF has been created.
404 Returns true if successful, false if an I/O error occurred.
   The work is delegated to casefile_append(), which stores a clone
   of C.
   NOTE(review): presumably C itself is then released here so the
   caller no longer owns it -- confirm against the full body. */
406 casefile_append_xfer (struct casefile *cf, struct ccase *c)
408 casefile_append (cf, c);
413 /* Writes case C to casefile CF's disk buffer, then flushes the buffer to
414 disk once another whole case would no longer fit in it.
415 Has no return value: the file-scope prototype declares this function
   void.  I/O failures are recorded on CF by flush_buffer() through
   io_error(). */
417 write_case_to_disk (struct casefile *cf, const struct ccase *c)
/* Copy C's values in after the values already buffered. */
422 case_to_values (c, cf->buffer + cf->buffer_used, cf->value_cnt);
423 cf->buffer_used += cf->value_cnt;
/* True when the buffer has no room left for one more case. */
424 if (cf->buffer_used + cf->value_cnt > cf->buffer_size)
428 /* If any bytes in CF's output buffer are used, flush them to
   CF's temporary file.  Does nothing once an I/O error has been
   recorded on CF.  A write failure is reported through io_error(). */
431 flush_buffer (struct casefile *cf)
433 if (cf->ok && cf->buffer_used > 0)
/* The whole buffer (buffer_size values), not only the used part, is
   written, so every block in the file has the same size;
   seek_and_fill_buffer() computes block offsets on that basis. */
/* NOTE(review): only a zero result from full_write() is treated as a
   failure.  If full_write() returns the number of bytes written (as
   gnulib's does), a short write goes unreported; comparing the result
   with the requested size would catch it -- confirm. */
435 if (!full_write (cf->fd, cf->buffer,
436 cf->buffer_size * sizeof *cf->buffer))
437 io_error (cf, _("Error writing temporary file: %s."),
443 /* If CF is currently stored in memory, writes it to disk. Readers, if any,
444 retain their current positions.
445 Returns true if successful, false if an I/O error occurred.
   The conversion creates a temporary file, allocates the I/O
   buffer, writes every in-memory case through it, frees the
   in-memory blocks, and finally opens the file for each existing
   reader. */
447 casefile_to_disk (const struct casefile *cf_)
/* The interface takes a const pointer, but the conversion changes the
   casefile, so the qualifier is cast away. */
449 struct casefile *cf = (struct casefile *) cf_;
450 struct casereader *reader;
454 if (cf->storage == MEMORY)
456 size_t idx, block_cnt;
/* An in-memory casefile must not yet own any disk resources. */
458 assert (cf->file_name == NULL);
459 assert (cf->fd == -1);
460 assert (cf->buffer_used == 0);
462 if (!make_temp_file (&cf->fd, &cf->file_name))
/* flush_buffer() writes the whole buffer, including any tail that no
   case occupies, so the buffer is zeroed rather than left
   uninitialized. */
469 cf->buffer = xnmalloc (cf->buffer_size, sizeof *cf->buffer);
470 memset (cf->buffer, 0, cf->buffer_size * sizeof *cf->buffer);
/* The cases no longer count against the in-memory workspace. */
472 case_bytes -= cf->case_cnt * cf->case_acct_size;
473 for (idx = 0; idx < cf->case_cnt; idx++)
475 size_t block_idx = idx / CASES_PER_BLOCK;
476 size_t case_idx = idx % CASES_PER_BLOCK;
477 struct ccase *c = &cf->cases[block_idx][case_idx];
478 write_case_to_disk (cf, c);
/* Free each block that held in-memory cases. */
482 block_cnt = DIV_RND_UP (cf->case_cnt, CASES_PER_BLOCK);
483 for (idx = 0; idx < block_cnt; idx++)
484 free (cf->cases[idx]);
489 if (cf->mode == READ)
/* reader_open_file() positions each reader according to its case_idx,
   which is how existing readers keep their place. */
492 for (reader = cf->readers; reader != NULL; reader = reader->next)
493 reader_open_file (reader);
498 /* Changes CF to reader mode, ensuring that no more cases may be
499 added. Creating a casereader for CF has the same effect.
   casefile_append() asserts that CF is still in write mode, so an
   append after this call trips that assertion. */
501 casefile_mode_reader (struct casefile *cf)
507 /* Creates and returns a casereader for CF. A casereader can be used to
508 sequentially read the cases in a casefile.
   Must not be called once a destructive reader exists for CF.  The
   new reader is linked at the head of CF's reader list and starts at
   case 0; if CF is stored on disk, the temporary file is opened for
   the reader right away. */
510 casefile_get_reader (const struct casefile *cf_)
/* The interface takes a const pointer, but attaching a reader changes
   the casefile, so the qualifier is cast away. */
512 struct casefile *cf = (struct casefile *) cf_;
513 struct casereader *reader;
516 assert (!cf->being_destroyed);
518 /* Flush the buffer to disk if it's not empty. */
519 if (cf->mode == WRITE && cf->storage == DISK)
/* Link the new reader in front of CF's existing readers. */
524 reader = xmalloc (sizeof *reader);
525 reader->next = cf->readers;
526 if (cf->readers != NULL)
527 reader->next->prev = reader;
528 cf->readers = reader;
/* A new reader is an ordinary sequential one positioned at the first
   case, with no buffer of its own yet. */
531 reader->case_idx = 0;
532 reader->destructive = 0;
533 reader->random = false;
535 reader->buffer = NULL;
536 reader->buffer_pos = 0;
537 case_nullify (&reader->c);
539 if (reader->cf->storage == DISK)
540 reader_open_file (reader);
545 /* Creates and returns a random casereader for CF. A random
546 casereader can be used to randomly read the cases in a
   casefile; it is the only kind of reader that casereader_seek()
   accepts.  Unlike casefile_get_reader(), this function leaves CF's
   mode as it was before the call. */
549 casefile_get_random_reader (const struct casefile *cf)
551 struct casefile *mutable_casefile = (struct casefile*) cf;
552 struct casereader *reader;
/* Remember the current mode; it is put back below, after the reader
   has been created. */
554 enum { WRITE, READ } mode = cf->mode ;
555 reader = casefile_get_reader (cf);
556 reader->random = true;
557 mutable_casefile->mode = mode;
562 /* Creates and returns a destructive casereader for CF. Like a
563 normal casereader, a destructive casereader sequentially reads
564 the cases in a casefile. Unlike a normal casereader, a
565 destructive reader cannot operate concurrently with any other
566 reader. (This restriction could be relaxed in a few ways, but
567 it is so far unnecessary for other code.)
   CF must have no readers when this is called.  Afterward CF is
   flagged as being destroyed, and casefile_get_reader() asserts
   that the flag is clear, so no further readers can be created. */
569 casefile_get_destructive_reader (struct casefile *cf)
571 struct casereader *reader;
573 assert (cf->readers == NULL);
574 reader = casefile_get_reader (cf);
575 reader->destructive = 1;
576 cf->being_destroyed = 1;
580 /* Opens a disk file for READER and seeks to the current position as indicated
581 by case_idx. Normally the current position is the beginning of the file,
582 but casefile_to_disk may cause the file to be opened at a different
   position.  Failure to open the file is reported through
   io_error(). */
585 reader_open_file (struct casereader *reader)
587 struct casefile *cf = reader->cf;
/* This guard is true when the casefile has already failed or the
   reader is positioned at or past the last case. */
588 if (!cf->ok || reader->case_idx >= cf->case_cnt)
598 reader->fd = safe_open (cf->file_name, O_RDONLY);
600 io_error (cf, _("%s: Opening temporary file: %s."),
601 cf->file_name, strerror (errno));
/* Reuse the casefile's buffer when it has one; otherwise the reader
   gets a freshly allocated, zeroed buffer of the same size. */
604 if (cf->buffer != NULL)
606 reader->buffer = cf->buffer;
611 reader->buffer = xnmalloc (cf->buffer_size, sizeof *cf->buffer);
612 memset (reader->buffer, 0, cf->buffer_size * sizeof *cf->buffer);
615 case_create (&reader->c, cf->value_cnt);
/* -1 matches no offset that seek_and_fill_buffer() computes, so its
   offset comparisons do not skip the seek or the buffer fill. */
617 reader->buffer_ofs = -1;
618 reader->file_ofs = -1;
619 seek_and_fill_buffer (reader);
622 /* Seeks the backing file for READER to the proper position and
623 refreshes the buffer contents.
   The position is derived from READER's case_idx.  The file consists
   of blocks of buffer_size values, each holding
   buffer_size / value_cnt cases.  A seek failure is reported through
   io_error(). */
625 seek_and_fill_buffer (struct casereader *reader)
627 struct casefile *cf = reader->cf;
/* Testing value_cnt first keeps the divisions below from dividing by
   zero. */
630 if (cf->value_cnt != 0)
/* new_ofs is the byte offset of the block that contains case_idx. */
632 size_t buffer_case_cnt = cf->buffer_size / cf->value_cnt;
633 new_ofs = ((off_t) reader->case_idx / buffer_case_cnt
634 * cf->buffer_size * sizeof *cf->buffer);
635 reader->buffer_pos = (reader->case_idx % buffer_case_cnt
/* Seek only when the file position is not already at that block. */
640 if (new_ofs != reader->file_ofs)
642 if (lseek (reader->fd, new_ofs, SEEK_SET) != new_ofs)
643 io_error (cf, _("%s: Seeking temporary file: %s."),
644 cf->file_name, strerror (errno));
646 reader->file_ofs = new_ofs;
/* Refill only when there is data to read and the buffer does not
   already hold the wanted block. */
649 if (cf->case_cnt > 0 && cf->value_cnt > 0 && reader->buffer_ofs != new_ofs)
650 fill_buffer (reader);
653 /* Fills READER's buffer by reading a block from disk.
   A block is buffer_size values long.  Read failures and short reads
   are reported through io_error().  Returns the `ok' flag of READER's
   casefile, which is documented as false after an I/O error. */
655 fill_buffer (struct casereader *reader)
/* NOTE(review): the result of full_read() is held in an int but is
   compared with an unsigned size expression below.  If full_read()
   returns an unsigned byte count (as gnulib's does), an error cannot
   be recognized by its sign -- confirm. */
659 int bytes = full_read (reader->fd, reader->buffer,
660 reader->cf->buffer_size * sizeof *reader->buffer);
662 io_error (reader->cf, _("%s: Reading temporary file: %s."),
663 reader->cf->file_name, strerror (errno));
664 else if (bytes != reader->cf->buffer_size * sizeof *reader->buffer)
665 io_error (reader->cf, _("%s: Temporary file ended unexpectedly."),
666 reader->cf->file_name);
/* The buffer now holds the block that started at the old file
   position, and the file position has moved past it. */
669 reader->buffer_ofs = reader->file_ofs;
670 reader->file_ofs += bytes;
673 return reader->cf->ok;
676 /* Returns the casefile that READER reads.  READER must not be null.
   NOTE(review): presumably READER's `cf' member -- confirm against
   the function body. */
677 const struct casefile *
678 casereader_get_casefile (const struct casereader *reader)
680 assert (reader != NULL);
685 /* Reads a copy of the next case from READER into C.
686 Caller is responsible for destroying C.
687 Returns true if successful, false at end of file.
   For an in-memory casefile the stored case is cloned.  For a
   disk-backed casefile the values are unpacked from READER's buffer
   into READER's own case, which is then cloned into C. */
689 casereader_read (struct casereader *reader, struct ccase *c)
691 assert (reader != NULL);
/* Besides end of file, this guard also covers a casefile on which an
   I/O error has been recorded. */
693 if (!reader->cf->ok || reader->case_idx >= reader->cf->case_cnt)
696 if (reader->cf->storage == MEMORY)
698 size_t block_idx = reader->case_idx / CASES_PER_BLOCK;
699 size_t case_idx = reader->case_idx % CASES_PER_BLOCK;
701 case_clone (c, &reader->cf->cases[block_idx][case_idx]);
/* When no whole case is left in the buffer, read the next block and
   start again from the beginning of the buffer. */
707 if (reader->buffer_pos + reader->cf->value_cnt > reader->cf->buffer_size)
709 if (!fill_buffer (reader))
711 reader->buffer_pos = 0;
714 case_from_values (&reader->c, reader->buffer + reader->buffer_pos,
715 reader->cf->value_cnt);
716 reader->buffer_pos += reader->cf->value_cnt;
719 case_clone (c, &reader->c);
724 /* Reads the next case from READER into C and transfers ownership
725 to the caller. Caller is responsible for destroying C.
726 Returns true if successful, false at end of file or on I/O
   error.
   Only a destructive reader of an in-memory casefile that still has
   cases to read moves the stored case itself into C; every other
   combination is handled by casereader_read(), which hands out a
   copy. */
729 casereader_read_xfer (struct casereader *reader, struct ccase *c)
731 assert (reader != NULL);
733 if (reader->destructive == 0
734 || reader->case_idx >= reader->cf->case_cnt
735 || reader->cf->storage == DISK)
736 return casereader_read (reader, c);
/* Move the stored case straight out of the casefile's block. */
739 size_t block_idx = reader->case_idx / CASES_PER_BLOCK;
740 size_t case_idx = reader->case_idx % CASES_PER_BLOCK;
741 struct ccase *read_case = &reader->cf->cases[block_idx][case_idx];
743 case_move (c, read_case);
749 /* Sets the next case to be read by READER to CASE_IDX,
750 which must be less than the number of cases in the casefile.
751 Allowed only for random readers.
   All three preconditions are checked with assertions.  For a
   disk-backed casefile the file position and buffer are brought in
   line with the new case number immediately. */
753 casereader_seek (struct casereader *reader, unsigned long case_idx)
755 assert (reader != NULL);
756 assert (reader->random);
757 assert (case_idx < reader->cf->case_cnt);
759 reader->case_idx = case_idx;
760 if (reader->cf->storage == DISK)
761 seek_and_fill_buffer (reader);
764 /* Destroys READER.
   Unlinks READER from its casefile's reader list, passes its buffer
   and file descriptor to the casefile when the casefile has none of
   its own, and destroys READER's case. */
766 casereader_destroy (struct casereader *reader)
768 assert (reader != NULL);
/* Unlink READER; if it was the list head, the casefile's list now
   starts at its successor. */
770 if (reader->next != NULL)
771 reader->next->prev = reader->prev;
772 if (reader->prev != NULL)
773 reader->prev->next = reader->next;
774 if (reader->cf->readers == reader)
775 reader->cf->readers = reader->next;
/* A casefile without a buffer inherits this reader's buffer.
   NOTE(review): presumably the free() below applies only when the
   buffer was not handed over -- confirm against the full body. */
777 if (reader->cf->buffer == NULL)
778 reader->cf->buffer = reader->buffer;
780 free (reader->buffer);
/* Likewise, a casefile without an open descriptor inherits this
   reader's descriptor.
   NOTE(review): presumably the close below applies only when the
   descriptor was not handed over -- confirm. */
782 if (reader->fd != -1)
784 if (reader->cf->fd == -1)
785 reader->cf->fd = reader->fd;
787 safe_close (reader->fd);
790 case_destroy (&reader->c);
795 /* Marks CF as having encountered an I/O error.
796 If this is the first error on CF, reports FORMAT to the user,
797 doing printf()-style substitutions.
   The report is built as a general-category message of error
   severity with no file name or line number attached. */
799 io_error (struct casefile *cf, const char *format, ...)
806 m.category = MSG_GENERAL;
807 m.severity = MSG_ERROR;
808 m.where.file_name = NULL;
809 m.where.line_number = -1;
/* Expand FORMAT and its arguments into the message text. */
810 va_start (args, format);
811 m.text = xvasprintf (format, args);
819 /* Calls open(), passing FILE_NAME and FLAGS, repeating as necessary
820 to deal with interrupted calls.  The call is repeated for as long
   as it fails with errno set to EINTR. */
822 safe_open (const char *file_name, int flags)
828 fd = open (file_name, flags);
830 while (fd == -1 && errno == EINTR);
835 /* Calls close(), passing FD, repeating as necessary to deal with
836 interrupted calls.  The call is repeated for as long as it fails
   with errno set to EINTR.
   NOTE(review): POSIX leaves the state of FD unspecified when close()
   fails with EINTR, and on some systems the descriptor is already
   closed at that point, so a retry could close an unrelated
   descriptor that has reused the number -- confirm that retrying is
   intended. */
837 static int safe_close (int fd)
845 while (retval == -1 && errno == EINTR);
850 /* Registers our exit handler with atexit() if it has not already
   been registered.  A function-local static flag remembers whether
   registration has happened. */
853 register_atexit (void)
855 static bool registered = false;
859 atexit (exit_handler);
863 /* atexit() handler that closes and deletes our temporary
   files, by destroying every casefile that is still on the global
   list. */
/* casefile_destroy() unlinks the casefile it is given from the global
   list, so this loop ends once the list is empty. */
868 while (casefiles != NULL)
869 casefile_destroy (casefiles);