1 /* PSPP - computes sample statistics.
2 Copyright (C) 2004 Free Software Foundation, Inc.
3 Written by Ben Pfaff <blp@gnu.org>.
5 This program is free software; you can redistribute it and/or
6 modify it under the terms of the GNU General Public License as
7 published by the Free Software Foundation; either version 2 of the
8 License, or (at your option) any later version.
10 This program is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with this program; if not, write to the Free Software
17 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. */
29 #include <libpspp/alloc.h>
31 #include <libpspp/compiler.h>
32 #include <libpspp/message.h>
33 #include "full-read.h"
34 #include "full-write.h"
35 #include <libpspp/misc.h>
36 #include "make-file.h"
41 #define _(msgid) gettext (msgid)
43 #define IO_BUF_SIZE (8192 / sizeof (union value))
45 /* A casefile represents a sequentially accessible stream of cases.
48 If workspace allows, a casefile is maintained in memory. If
49 workspace overflows, then the casefile is pushed to disk. In
50 either case the interface presented to callers is kept the same.
53 The life cycle of a casefile consists of up to three phases:
55 1. Writing. The casefile initially contains no cases. In
56 this phase, any number of cases may be appended to the
57 end of a casefile. (Cases are never inserted in the
58 middle or before the beginning of a casefile.)
60 Use casefile_append() or casefile_append_xfer() to
61 append a case to a casefile.
63 2. Reading. The casefile may be read sequentially,
64 starting from the beginning, by "casereaders". Any
65 number of casereaders may be created, at any time,
66 during the reading phase. Each casereader has an
67 independent position in the casefile.
69 Ordinary casereaders may only move forward. They
70 cannot move backward to arbitrary records or seek
71 randomly. Cloning casereaders is possible, but it is not yet implemented.
74 Use casefile_get_reader() to create a casereader for
75 use in phase 2. This also transitions from phase 1 to
76 phase 2. Calling casefile_mode_reader() makes the same
77 transition, without creating a casereader.
79 Use casereader_read() or casereader_read_xfer() to read
80 a case from a casereader. Use casereader_destroy() to
81 discard a casereader when it is no longer needed.
83 "Random" casereaders, which support a seek operation,
84 may also be created. These should not, generally, be
85 used for statistical procedures, because random access
86 is much slower than sequential access. They are
87 intended for use by the GUI.
89 3. Destruction. This phase is optional. The casefile is
90 also read with casereaders in this phase, but the
91 ability to create new casereaders is curtailed.
93 In this phase, casereaders could still be cloned (once
94 we eventually implement cloning).
96 To transition from phase 1 or 2 to phase 3 and create a
97 casereader, call casefile_get_destructive_reader().
98 The same functions apply to the casereader obtained
99 this way as apply to casereaders obtained in phase 2.
101 After casefile_get_destructive_reader() is called, no
102 more casereaders may be created with
103 casefile_get_reader() or
104 casefile_get_destructive_reader(). (If cloning of
105 casereaders were implemented, it would still be possible.)
108 The purpose of the limitations applied to casereaders
109 in phase 3 is to allow in-memory casefiles to fully
110 transfer ownership of cases to the casereaders,
111 avoiding the need for extra copies of case data. For
112 relatively static data sets with many variables, I
113 suspect (without evidence) that this may be a big performance win.
116 When a casefile is no longer needed, it may be destroyed with
117 casefile_destroy(). This function will also destroy any
118 remaining casereaders. */
120 /* FIXME: should we implement compression? */
122 /* In-memory cases are arranged in an array of arrays. The top
123 level is variable size and the size of each bottom level array
124 is fixed at the number of cases defined here. */
125 #define CASES_PER_BLOCK 128
/* A casefile: a sequence of cases of VALUE_CNT values each, kept
   either in memory or in a temporary file on disk. */
struct casefile
  {
    /* Basic data. */
    struct casefile *next, *prev;       /* Next, prev in global list. */
    size_t value_cnt;                   /* Case size in `union value's. */
    size_t case_acct_size;              /* Case size for accounting. */
    unsigned long case_cnt;             /* Number of cases stored. */
    enum { MEMORY, DISK } storage;      /* Where cases are stored. */
    enum { WRITE, READ } mode;          /* Is writing or reading allowed? */
    struct casereader *readers;         /* List of our readers. */
    bool being_destroyed;               /* Does a destructive reader exist? */
    bool ok;                            /* False after I/O error. */

    /* Memory storage. */
    struct ccase **cases;               /* Array of blocks, each holding
                                           CASES_PER_BLOCK cases. */

    /* Disk storage. */
    int fd;                             /* File descriptor, -1 if none. */
    char *file_name;                    /* File name, NULL if none. */
    union value *buffer;                /* I/O buffer, NULL if none. */
    size_t buffer_used;                 /* Number of values used in buffer. */
    size_t buffer_size;                 /* Buffer size in values. */
  };
152 /* For reading out the cases in a casefile. */
155 struct casereader *next, *prev; /* Next, prev in casefile's list. */
156 struct casefile *cf; /* Our casefile. */
157 unsigned long case_idx; /* Case number of current case. */
158 bool destructive; /* Is this a destructive reader? */
159 bool random; /* Is this a random reader? */
162 int fd; /* File descriptor. */
163 off_t file_ofs; /* Current position in fd. */
164 off_t buffer_ofs; /* File offset of buffer start. */
165 union value *buffer; /* I/O buffer. */
166 size_t buffer_pos; /* Offset of buffer position. */
167 struct ccase c; /* Current case. */
170 /* Return the case number of the current case */
172 casereader_cnum(const struct casereader *r)
177 /* Doubly linked list of all casefiles. */
178 static struct casefile *casefiles;
180 /* Number of bytes of case allocated in in-memory casefiles. */
181 static size_t case_bytes;
183 static void register_atexit (void);
184 static void exit_handler (void);
186 static void reader_open_file (struct casereader *);
187 static void write_case_to_disk (struct casefile *, const struct ccase *);
188 static void flush_buffer (struct casefile *);
189 static void seek_and_fill_buffer (struct casereader *);
190 static bool fill_buffer (struct casereader *);
192 static void io_error (struct casefile *, const char *, ...)
193 PRINTF_FORMAT (2, 3);
194 static int safe_open (const char *file_name, int flags);
195 static int safe_close (int fd);
197 /* Creates and returns a casefile to store cases of VALUE_CNT
198 `union value's each. */
200 casefile_create (size_t value_cnt)
202 struct casefile *cf = xmalloc (sizeof *cf);
/* The new casefile goes in front of the current head of the global
   list.  NOTE(review): the statements that finish the insertion are
   not visible in this listing. */
203 cf->next = casefiles;
205 if (cf->next != NULL)
208 cf->value_cnt = value_cnt;
/* Size charged against the workspace for each in-memory case: the
   case's values plus 4 more `union value's (presumably an allowance
   for per-case overhead -- confirm). */
209 cf->case_acct_size = (cf->value_cnt + 4) * sizeof *cf->buffer;
211 cf->storage = MEMORY;
214 cf->being_destroyed = 0;
218 cf->file_name = NULL;
/* The I/O buffer size is counted in `union value's.  Start from
   VALUE_CNT rounded up with ROUND_UP to IO_BUF_SIZE (presumably to a
   multiple of it).  buffer_size % value_cnt is the tail of the buffer
   left over after packing whole cases; if more than 64 values would
   be wasted that way, use a buffer of exactly one case instead. */
220 cf->buffer_size = ROUND_UP (cf->value_cnt, IO_BUF_SIZE);
221 if (cf->value_cnt > 0 && cf->buffer_size % cf->value_cnt > 64)
222 cf->buffer_size = cf->value_cnt;
228 /* Destroys casefile CF. */
230 casefile_destroy (struct casefile *cf)
/* Unlink CF from the global doubly linked list of casefiles. */
234 if (cf->next != NULL)
235 cf->next->prev = cf->prev;
236 if (cf->prev != NULL)
237 cf->prev->next = cf->next;
/* NOTE(review): presumably the `else' branch of the test above (CF
   was the list head); the `else' line is not visible in this
   listing. */
239 casefiles = cf->next;
/* Destroy every remaining reader.  casereader_destroy() unlinks the
   reader from cf->readers, so the list shrinks on each pass. */
241 while (cf->readers != NULL)
242 casereader_destroy (cf->readers);
244 if (cf->cases != NULL)
246 size_t idx, block_cnt;
/* Give back the workspace accounted to CF's in-memory cases. */
248 case_bytes -= cf->case_cnt * cf->case_acct_size;
/* Visit each stored case: block_idx selects the block and case_idx
   the slot within it.  NOTE(review): the statement applied to each
   case is not visible in this listing. */
249 for (idx = 0; idx < cf->case_cnt; idx++)
251 size_t block_idx = idx / CASES_PER_BLOCK;
252 size_t case_idx = idx % CASES_PER_BLOCK;
253 struct ccase *c = &cf->cases[block_idx][case_idx];
/* Free every block that holds at least one case. */
257 block_cnt = DIV_RND_UP (cf->case_cnt, CASES_PER_BLOCK);
258 for (idx = 0; idx < block_cnt; idx++)
259 free (cf->cases[idx]);
/* Delete the temporary file, if one exists, reporting any failure. */
267 if (cf->file_name != NULL && remove (cf->file_name) == -1)
268 io_error (cf, _("%s: Removing temporary file: %s."),
269 cf->file_name, strerror (errno));
270 free (cf->file_name);
278 /* Returns true if an I/O error has occurred in casefile CF. */
280 casefile_error (const struct casefile *cf)
285 /* Returns true only if casefile CF is stored in memory (instead of on
286 disk), false otherwise. */
288 casefile_in_core (const struct casefile *cf)
292 return cf->storage == MEMORY;
295 /* Puts a casefile to "sleep", that is, minimizes the resources
296 needed for it by closing its file descriptor and freeing its
297 buffer. This is useful if we need so many casefiles that we
298 might not have enough memory and file descriptors to go around.
301 For simplicity, this implementation always converts the
302 casefile to reader mode. If this turns out to be a problem,
303 with a little extra work we could also support sleeping writers.
306 Returns true if successful, false if an I/O error occurred. */
308 casefile_sleep (const struct casefile *cf_)
/* The interface takes a const casefile, but sleeping changes its
   state, so constness is cast away here. */
310 struct casefile *cf = (struct casefile *) cf_;
/* Switch to reader mode and make sure the cases are on disk before
   giving up the in-core resources. */
313 casefile_mode_reader (cf);
314 casefile_to_disk (cf);
/* NOTE(review): the statements that close the descriptor and release
   the buffer are not visible in this listing. */
322 if (cf->buffer != NULL)
331 /* Returns the number of `union value's in a case for CF. */
333 casefile_get_value_cnt (const struct casefile *cf)
337 return cf->value_cnt;
340 /* Returns the number of cases in casefile CF. */
342 casefile_get_case_cnt (const struct casefile *cf)
349 /* Appends a copy of case C to casefile CF. Not valid after any
350 reader for CF has been created.
351 Returns true if successful, false if an I/O error occurred. */
353 casefile_append (struct casefile *cf, const struct ccase *c)
/* Appending is only legal while CF is still in writing mode. */
357 assert (cf->mode == WRITE);
/* C must carry at least as many values as CF stores per case. */
359 assert ( cf->value_cnt <= c->case_data->value_cnt );
361 /* Try memory first. */
362 if (cf->storage == MEMORY)
/* Stay in memory only while the bytes charged to in-memory casefiles
   are below the workspace limit. */
364 if (case_bytes < get_workspace ())
366 size_t block_idx = cf->case_cnt / CASES_PER_BLOCK;
367 size_t case_idx = cf->case_cnt % CASES_PER_BLOCK;
368 struct ccase new_case;
370 case_bytes += cf->case_acct_size;
371 case_clone (&new_case, c);
/* The top-level array is regrown whenever block_idx is 0 or a power
   of two: its capacity becomes 1, then 2, 4, 8, ... blocks, so it
   always has room for entry block_idx. */
374 if ((block_idx & (block_idx - 1)) == 0)
376 size_t block_cap = block_idx == 0 ? 1 : block_idx * 2;
377 cf->cases = xnrealloc (cf->cases,
378 block_cap, sizeof *cf->cases);
/* Allocate the bottom-level block for this case.  NOTE(review): the
   condition guarding these allocations (presumably the first case of
   a new block) is not visible in this listing. */
381 cf->cases[block_idx] = xnmalloc (CASES_PER_BLOCK,
385 case_move (&cf->cases[block_idx][case_idx], &new_case);
/* Workspace exhausted: move everything stored so far to disk, then
   append C there as well. */
389 casefile_to_disk (cf);
390 assert (cf->storage == DISK);
391 write_case_to_disk (cf, c);
/* CF was already on disk. */
395 write_case_to_disk (cf, c);
401 /* Appends case C to casefile CF, which takes over ownership of
402 C. Not valid after any reader for CF has been created.
403 Returns true if successful, false if an I/O error occurred. */
405 casefile_append_xfer (struct casefile *cf, struct ccase *c)
407 casefile_append (cf, c);
412 /* Writes case C to casefile CF's disk buffer, first flushing the buffer to
413 disk if it would otherwise overflow.
414 Returns nothing: the prototype earlier in this file declares the function
void.  Errors raised while flushing are reported through io_error(). */
416 write_case_to_disk (struct casefile *cf, const struct ccase *c)
/* Copy the case's values to the end of the used part of the buffer. */
421 case_to_values (c, cf->buffer + cf->buffer_used, cf->value_cnt);
422 cf->buffer_used += cf->value_cnt;
/* Flush as soon as one more case would no longer fit, so that a case
   is never split across two buffers.  NOTE(review): the statement
   executed when this test is true is not visible in this listing. */
423 if (cf->buffer_used + cf->value_cnt > cf->buffer_size)
427 /* If any bytes in CF's output buffer are used, flush them to disk. */
430 flush_buffer (struct casefile *cf)
/* Nothing is written once an error has been recorded or while the
   buffer is empty. */
432 if (cf->ok && cf->buffer_used > 0)
/* The whole buffer (buffer_size values) is written, not just the
   buffer_used part, so every block in the temporary file has the same
   size; seek_and_fill_buffer() computes block offsets from that.
   NOTE(review): if full_write() is gnulib's, it returns the number of
   bytes written, so this test would only catch a write of zero bytes
   and miss a short write -- confirm. */
434 if (!full_write (cf->fd, cf->buffer,
435 cf->buffer_size * sizeof *cf->buffer))
436 io_error (cf, _("Error writing temporary file: %s."),
442 /* If CF is currently stored in memory, writes it to disk. Readers, if any,
443 retain their current positions.
444 Returns true if successful, false if an I/O error occurred. */
446 casefile_to_disk (const struct casefile *cf_)
/* The interface takes a const casefile; constness is cast away
   because moving to disk changes CF's state. */
448 struct casefile *cf = (struct casefile *) cf_;
449 struct casereader *reader;
453 if (cf->storage == MEMORY)
455 size_t idx, block_cnt;
/* An in-memory casefile must not have any disk state yet. */
457 assert (cf->file_name == NULL);
458 assert (cf->fd == -1);
459 assert (cf->buffer_used == 0);
/* NOTE(review): the handling of a make_temp_file() failure is not
   visible in this listing. */
461 if (!make_temp_file (&cf->fd, &cf->file_name))
/* Allocate the output buffer, zeroed so that the unused tail of a
   block written to disk has defined contents. */
468 cf->buffer = xnmalloc (cf->buffer_size, sizeof *cf->buffer);
469 memset (cf->buffer, 0, cf->buffer_size * sizeof *cf->buffer);
/* The cases stop counting against the workspace; write each one out
   in order. */
471 case_bytes -= cf->case_cnt * cf->case_acct_size;
472 for (idx = 0; idx < cf->case_cnt; idx++)
474 size_t block_idx = idx / CASES_PER_BLOCK;
475 size_t case_idx = idx % CASES_PER_BLOCK;
476 struct ccase *c = &cf->cases[block_idx][case_idx];
477 write_case_to_disk (cf, c);
/* Free the in-memory blocks now that their contents are on disk. */
481 block_cnt = DIV_RND_UP (cf->case_cnt, CASES_PER_BLOCK);
482 for (idx = 0; idx < block_cnt; idx++)
483 free (cf->cases[idx]);
/* NOTE(review): the statement guarded by this test is not visible in
   this listing. */
488 if (cf->mode == READ)
/* Existing readers were reading from memory; give each a file to read
   from, positioned at its current case. */
491 for (reader = cf->readers; reader != NULL; reader = reader->next)
492 reader_open_file (reader);
497 /* Changes CF to reader mode, ensuring that no more cases may be
498 added. Creating a casereader for CF has the same effect. */
500 casefile_mode_reader (struct casefile *cf)
506 /* Creates and returns a casereader for CF. A casereader can be used to
507 sequentially read the cases in a casefile. */
509 casefile_get_reader (const struct casefile *cf_)
/* The interface takes a const casefile; constness is cast away
   because the new reader is linked into CF. */
511 struct casefile *cf = (struct casefile *) cf_;
512 struct casereader *reader;
/* No new readers once a destructive reader exists. */
515 assert (!cf->being_destroyed);
517 /* Flush the buffer to disk if it's not empty. */
518 if (cf->mode == WRITE && cf->storage == DISK)
/* Allocate the reader and push it onto the front of CF's reader list. */
523 reader = xmalloc (sizeof *reader);
524 reader->next = cf->readers;
525 if (cf->readers != NULL)
526 reader->next->prev = reader;
527 cf->readers = reader;
/* A new reader starts at the first case, is neither destructive nor
   random, and has no buffer or current case yet. */
530 reader->case_idx = 0;
531 reader->destructive = 0;
532 reader->random = false;
534 reader->buffer = NULL;
535 reader->buffer_pos = 0;
536 case_nullify (&reader->c);
/* A disk-backed casefile needs the reader's file state set up now. */
538 if (reader->cf->storage == DISK)
539 reader_open_file (reader);
544 /* Creates and returns a random casereader for CF. A random
545 casereader can be used to randomly read the cases in a
548 casefile_get_random_reader (const struct casefile *cf)
550 struct casefile *mutable_casefile = (struct casefile*) cf;
551 struct casereader *reader;
553 enum { WRITE, READ } mode = cf->mode ;
554 reader = casefile_get_reader (cf);
555 reader->random = true;
556 mutable_casefile->mode = mode;
561 /* Creates and returns a destructive casereader for CF. Like a
562 normal casereader, a destructive casereader sequentially reads
563 the cases in a casefile. Unlike a normal casereader, a
564 destructive reader cannot operate concurrently with any other
565 reader. (This restriction could be relaxed in a few ways, but
566 it is so far unnecessary for other code.) */
568 casefile_get_destructive_reader (struct casefile *cf)
570 struct casereader *reader;
572 assert (cf->readers == NULL);
573 reader = casefile_get_reader (cf);
574 reader->destructive = 1;
575 cf->being_destroyed = 1;
579 /* Opens a disk file for READER and seeks to the current position as indicated
580 by case_idx. Normally the current position is the beginning of the file,
581 but casefile_to_disk may cause the file to be opened at a different position. */
584 reader_open_file (struct casereader *reader)
586 struct casefile *cf = reader->cf;
/* Nothing to set up after an I/O error or when READER is already past
   the last case.  NOTE(review): the statement taken in that case is
   not visible in this listing. */
587 if (!cf->ok || reader->case_idx >= cf->case_cnt)
/* NOTE(review): any reuse of the casefile's own descriptor, and the
   test for safe_open() failing, are not visible in this listing. */
597 reader->fd = safe_open (cf->file_name, O_RDONLY);
599 io_error (cf, _("%s: Opening temporary file: %s."),
600 cf->file_name, strerror (errno));
/* Take over the casefile's buffer when it has one; otherwise allocate
   a zeroed buffer of the same size.  NOTE(review): the `else' and any
   clearing of cf->buffer are not visible in this listing. */
603 if (cf->buffer != NULL)
605 reader->buffer = cf->buffer;
610 reader->buffer = xnmalloc (cf->buffer_size, sizeof *cf->buffer);
611 memset (reader->buffer, 0, cf->buffer_size * sizeof *cf->buffer);
614 case_create (&reader->c, cf->value_cnt);
/* -1 matches no real file offset, so seek_and_fill_buffer() is forced
   to seek and to load the buffer. */
616 reader->buffer_ofs = -1;
617 reader->file_ofs = -1;
618 seek_and_fill_buffer (reader);
621 /* Seeks the backing file for READER to the proper position and
622 refreshes the buffer contents. */
624 seek_and_fill_buffer (struct casereader *reader)
626 struct casefile *cf = reader->cf;
/* Guard against dividing by a zero case size. */
629 if (cf->value_cnt != 0)
/* Each block of buffer_size values holds buffer_case_cnt whole cases,
   so the block containing case_idx starts
   (case_idx / buffer_case_cnt) blocks into the file.  The cast to
   off_t applies to case_idx before the division, so the arithmetic is
   done in off_t. */
631 size_t buffer_case_cnt = cf->buffer_size / cf->value_cnt;
632 new_ofs = ((off_t) reader->case_idx / buffer_case_cnt
633 * cf->buffer_size * sizeof *cf->buffer);
/* Position of the case inside its block.  NOTE(review): the rest of
   this expression is not visible in this listing. */
634 reader->buffer_pos = (reader->case_idx % buffer_case_cnt
/* Skip the lseek() when the descriptor is already at the block. */
639 if (new_ofs != reader->file_ofs)
641 if (lseek (reader->fd, new_ofs, SEEK_SET) != new_ofs)
642 io_error (cf, _("%s: Seeking temporary file: %s."),
643 cf->file_name, strerror (errno));
645 reader->file_ofs = new_ofs;
/* Re-read only when there is data to read and the buffer does not
   already hold the block that starts at new_ofs. */
648 if (cf->case_cnt > 0 && cf->value_cnt > 0 && reader->buffer_ofs != new_ofs)
649 fill_buffer (reader);
652 /* Fills READER's buffer by reading a block from disk.
Returns the casefile's `ok' flag, that is, false once an I/O error
has been recorded. */
654 fill_buffer (struct casereader *reader)
/* Always request one full block of buffer_size values.
   NOTE(review): if full_read() is gnulib's, it returns a size_t byte
   count and signals failure through a short count plus errno; storing
   it in an int and testing it for a negative value (presumably on a
   line not visible here) would then never fire -- confirm. */
658 int bytes = full_read (reader->fd, reader->buffer,
659 reader->cf->buffer_size * sizeof *reader->buffer);
661 io_error (reader->cf, _("%s: Reading temporary file: %s."),
662 reader->cf->file_name, strerror (errno));
/* A short read means the file holds less than a whole block.  Note
   that this compares a signed int against an unsigned size. */
663 else if (bytes != reader->cf->buffer_size * sizeof *reader->buffer)
664 io_error (reader->cf, _("%s: Temporary file ended unexpectedly."),
665 reader->cf->file_name);
/* The buffer now holds the block that started at the old file
   position; the descriptor has advanced past it. */
668 reader->buffer_ofs = reader->file_ofs;
669 reader->file_ofs += bytes;
672 return reader->cf->ok;
675 /* Returns the casefile that READER reads. */
676 const struct casefile *
677 casereader_get_casefile (const struct casereader *reader)
679 assert (reader != NULL);
684 /* Reads a copy of the next case from READER into C.
685 Caller is responsible for destroying C.
686 Returns true if successful, false at end of file or once an I/O error
has been recorded on the casefile. */
688 casereader_read (struct casereader *reader, struct ccase *c)
690 assert (reader != NULL);
/* Stop after an I/O error or when every case has been read. */
692 if (!reader->cf->ok || reader->case_idx >= reader->cf->case_cnt)
695 if (reader->cf->storage == MEMORY)
/* In memory: locate the block and the slot within it, then clone. */
697 size_t block_idx = reader->case_idx / CASES_PER_BLOCK;
698 size_t case_idx = reader->case_idx % CASES_PER_BLOCK;
700 case_clone (c, &reader->cf->cases[block_idx][case_idx]);
/* On disk: when the next case would run past the end of the buffer,
   the current block is used up, so load the next one and restart at
   its beginning. */
706 if (reader->buffer_pos + reader->cf->value_cnt > reader->cf->buffer_size)
708 if (!fill_buffer (reader))
710 reader->buffer_pos = 0;
/* Decode the case from the buffer into the reader's own case, advance
   within the buffer, and hand the caller a clone. */
713 case_from_values (&reader->c, reader->buffer + reader->buffer_pos,
714 reader->cf->value_cnt);
715 reader->buffer_pos += reader->cf->value_cnt;
718 case_clone (c, &reader->c);
723 /* Reads the next case from READER into C and transfers ownership
724 to the caller. Caller is responsible for destroying C.
725 Returns true if successful, false at end of file or on I/O error. */
728 casereader_read_xfer (struct casereader *reader, struct ccase *c)
730 assert (reader != NULL);
/* Ownership can be handed over without a copy only when the reader
   is destructive, a case remains to be read, and the cases are in
   memory.  In every other situation fall back on the copying read. */
732 if (reader->destructive == 0
733 || reader->case_idx >= reader->cf->case_cnt
734 || reader->cf->storage == DISK)
735 return casereader_read (reader, c);
/* Locate the stored case and move it, rather than clone it, into C.
   NOTE(review): the statements that advance the reader and return are
   not visible in this listing. */
738 size_t block_idx = reader->case_idx / CASES_PER_BLOCK;
739 size_t case_idx = reader->case_idx % CASES_PER_BLOCK;
740 struct ccase *read_case = &reader->cf->cases[block_idx][case_idx];
742 case_move (c, read_case);
748 /* Sets the next case to be read by READER to CASE_IDX,
749 which must be less than the number of cases in the casefile.
750 Allowed only for random readers. */
752 casereader_seek (struct casereader *reader, unsigned long case_idx)
754 assert (reader != NULL);
755 assert (reader->random);
756 assert (case_idx < reader->cf->case_cnt);
758 reader->case_idx = case_idx;
759 if (reader->cf->storage == DISK)
760 seek_and_fill_buffer (reader);
763 /* Destroys READER. */
765 casereader_destroy (struct casereader *reader)
767 assert (reader != NULL);
/* Unlink READER from its casefile's doubly linked reader list,
   updating the list head if READER was first. */
769 if (reader->next != NULL)
770 reader->next->prev = reader->prev;
771 if (reader->prev != NULL)
772 reader->prev->next = reader->next;
773 if (reader->cf->readers == reader)
774 reader->cf->readers = reader->next;
/* Hand the I/O buffer to the casefile when it has none of its own;
   NOTE(review): the free() below is presumably the `else' branch,
   whose `else' line is not visible in this listing. */
776 if (reader->cf->buffer == NULL)
777 reader->cf->buffer = reader->buffer;
779 free (reader->buffer);
/* Likewise the descriptor: hand it to the casefile when it has none;
   NOTE(review): the safe_close() below is presumably the `else'
   branch. */
781 if (reader->fd != -1)
783 if (reader->cf->fd == -1)
784 reader->cf->fd = reader->fd;
786 safe_close (reader->fd);
789 case_destroy (&reader->c);
794 /* Marks CF as having encountered an I/O error.
795 If this is the first error on CF, reports FORMAT to the user,
796 doing printf()-style substitutions. */
798 io_error (struct casefile *cf, const char *format, ...)
/* Build a general error message with no source location attached.
   NOTE(review): the first-error test, the emission of the message and
   the update of cf->ok are not visible in this listing. */
805 m.category = MSG_GENERAL;
806 m.severity = MSG_ERROR;
807 m.where.file_name = NULL;
808 m.where.line_number = -1;
/* Format the message text from FORMAT and the variable arguments. */
809 va_start (args, format);
810 m.text = xvasprintf (format, args);
/* Calls open(), passing FILE_NAME and FLAGS, repeating as necessary
   to deal with interrupted calls.
   Returns the new file descriptor, or -1 with errno set if open()
   fails for any reason other than EINTR. */
static int
safe_open (const char *file_name, int flags)
{
  int fd;

  do
    {
      fd = open (file_name, flags);
    }
  while (fd == -1 && errno == EINTR);

  return fd;
}
/* Calls close(), passing FD, repeating as necessary to deal with
   interrupted calls.
   Returns 0 if successful, -1 with errno set on any failure other
   than EINTR.
   NOTE(review): POSIX leaves the state of FD unspecified after
   close() fails with EINTR, and on some systems the descriptor is
   already released at that point, so the retry may itself fail with
   EBADF.  The retry is kept here to preserve the documented
   behavior. */
static int
safe_close (int fd)
{
  int retval;

  do
    {
      retval = close (fd);
    }
  while (retval == -1 && errno == EINTR);

  return retval;
}
849 /* Registers our exit handler with atexit() if it has not already
852 register_atexit (void)
854 static bool registered = false;
858 atexit (exit_handler);
862 /* atexit() handler that closes and deletes our temporary
867 while (casefiles != NULL)
868 casefile_destroy (casefiles);