1 /* PSPP - computes sample statistics.
2 Copyright (C) 2004 Free Software Foundation, Inc.
3 Written by Ben Pfaff <blp@gnu.org>.
5 This program is free software; you can redistribute it and/or
6 modify it under the terms of the GNU General Public License as
7 published by the Free Software Foundation; either version 2 of the
8 License, or (at your option) any later version.
10 This program is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with this program; if not, write to the Free Software
17 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
32 #include "full-read.h"
33 #include "full-write.h"
40 #define _(msgid) gettext (msgid)
42 #define IO_BUF_SIZE (8192 / sizeof (union value))
/* A casefile represents a sequentially accessible stream of
   immutable cases.

   If workspace allows, a casefile is maintained in memory.  If
   workspace overflows, then the casefile is pushed to disk.  In
   either case the interface presented to callers is kept the
   same.
52 The life cycle of a casefile consists of up to three phases:
54 1. Writing. The casefile initially contains no cases. In
55 this phase, any number of cases may be appended to the
56 end of a casefile. (Cases are never inserted in the
57 middle or before the beginning of a casefile.)
59 Use casefile_append() or casefile_append_xfer() to
60 append a case to a casefile.
62 2. Reading. The casefile may be read sequentially,
63 starting from the beginning, by "casereaders". Any
64 number of casereaders may be created, at any time,
65 during the reading phase. Each casereader has an
66 independent position in the casefile.
      Casereaders may only move forward.  They cannot move
      backward to arbitrary records or seek randomly.
      Cloning casereaders is possible, but it is not yet
      implemented.
73 Use casefile_get_reader() to create a casereader for
74 use in phase 2. This also transitions from phase 1 to
75 phase 2. Calling casefile_mode_reader() makes the same
76 transition, without creating a casereader.
78 Use casereader_read(), casereader_read_xfer(), or
79 casereader_read_xfer_assert() to read a case from a
80 casereader. Use casereader_destroy() to discard a
81 casereader when it is no longer needed.
83 3. Destruction. This phase is optional. The casefile is
84 also read with casereaders in this phase, but the
85 ability to create new casereaders is curtailed.
87 In this phase, casereaders could still be cloned (once
88 we eventually implement cloning).
90 To transition from phase 1 or 2 to phase 3 and create a
91 casereader, call casefile_get_destructive_reader().
92 The same functions apply to the casereader obtained
93 this way as apply to casereaders obtained in phase 2.
      After casefile_get_destructive_reader() is called, no
      more casereaders may be created with
      casefile_get_reader() or
      casefile_get_destructive_reader().  (If cloning of
      casereaders were implemented, it would still be
      possible.)
      The purpose of the limitations applied to casereaders
      in phase 3 is to allow in-memory casefiles to fully
      transfer ownership of cases to the casereaders,
      avoiding the need for extra copies of case data.  For
      relatively static data sets with many variables, I
      suspect (without evidence) that this may be a big
      performance boost.
110 When a casefile is no longer needed, it may be destroyed with
111 casefile_destroy(). This function will also destroy any
112 remaining casereaders. */
114 /* In-memory cases are arranged in an array of arrays. The top
115 level is variable size and the size of each bottom level array
116 is fixed at the number of cases defined here. */
117 #define CASES_PER_BLOCK 128
123 struct casefile *next, *prev; /* Next, prev in global list. */
124 size_t value_cnt; /* Case size in `union value's. */
125 size_t case_acct_size; /* Case size for accounting. */
126 unsigned long case_cnt; /* Number of cases stored. */
127 enum { MEMORY, DISK } storage; /* Where cases are stored. */
128 enum { WRITE, READ } mode; /* Is writing or reading allowed? */
129 struct casereader *readers; /* List of our readers. */
130 int being_destroyed; /* Does a destructive reader exist? */
132 /* Memory storage. */
133 struct ccase **cases; /* Pointer to array of cases. */
136 int fd; /* File descriptor, -1 if none. */
137 char *filename; /* Filename. */
138 union value *buffer; /* I/O buffer, NULL if none. */
139 size_t buffer_used; /* Number of values used in buffer. */
140 size_t buffer_size; /* Buffer size in values. */
143 /* For reading out the cases in a casefile. */
146 struct casereader *next, *prev; /* Next, prev in casefile's list. */
147 struct casefile *cf; /* Our casefile. */
148 unsigned long case_idx; /* Case number of current case. */
149 int destructive; /* Is this a destructive reader? */
152 int fd; /* File descriptor. */
153 union value *buffer; /* I/O buffer. */
154 size_t buffer_pos; /* Offset of buffer position. */
155 struct ccase c; /* Current case. */
158 /* Return the case number of the current case */
160 casereader_cnum(const struct casereader *r)
165 /* Doubly linked list of all casefiles. */
166 static struct casefile *casefiles;
168 /* Number of bytes of case allocated in in-memory casefiles. */
169 static size_t case_bytes;
171 static void register_atexit (void);
172 static void exit_handler (void);
174 static void reader_open_file (struct casereader *reader);
175 static void write_case_to_disk (struct casefile *cf, const struct ccase *c);
176 static void flush_buffer (struct casefile *cf);
177 static void fill_buffer (struct casereader *reader);
179 static int safe_open (const char *filename, int flags);
180 static int safe_close (int fd);
182 /* Creates and returns a casefile to store cases of VALUE_CNT
183 `union value's each. */
185 casefile_create (size_t value_cnt)
187 struct casefile *cf = xmalloc (sizeof *cf);
188 cf->next = casefiles;
190 if (cf->next != NULL)
193 cf->value_cnt = value_cnt;
194 cf->case_acct_size = (cf->value_cnt + 4) * sizeof *cf->buffer;
196 cf->storage = MEMORY;
199 cf->being_destroyed = 0;
204 cf->buffer_size = ROUND_UP (cf->value_cnt, IO_BUF_SIZE);
205 if (cf->value_cnt > 0 && cf->buffer_size % cf->value_cnt > 64)
206 cf->buffer_size = cf->value_cnt;
212 /* Destroys casefile CF. */
214 casefile_destroy (struct casefile *cf)
218 if (cf->next != NULL)
219 cf->next->prev = cf->prev;
220 if (cf->prev != NULL)
221 cf->prev->next = cf->next;
223 casefiles = cf->next;
225 while (cf->readers != NULL)
226 casereader_destroy (cf->readers);
228 if (cf->cases != NULL)
230 size_t idx, block_cnt;
232 case_bytes -= cf->case_cnt * cf->case_acct_size;
233 for (idx = 0; idx < cf->case_cnt; idx++)
235 size_t block_idx = idx / CASES_PER_BLOCK;
236 size_t case_idx = idx % CASES_PER_BLOCK;
237 struct ccase *c = &cf->cases[block_idx][case_idx];
241 block_cnt = DIV_RND_UP (cf->case_cnt, CASES_PER_BLOCK);
242 for (idx = 0; idx < block_cnt; idx++)
243 free (cf->cases[idx]);
251 if (cf->filename != NULL && remove (cf->filename) == -1)
252 msg (ME, _("%s: Removing temporary file: %s."),
253 cf->filename, strerror (errno));
262 /* Returns nonzero only if casefile CF is stored in memory (instead of on
265 casefile_in_core (const struct casefile *cf)
269 return cf->storage == MEMORY;
272 /* Puts a casefile to "sleep", that is, minimizes the resources
273 needed for it by closing its file descriptor and freeing its
274 buffer. This is useful if we need so many casefiles that we
275 might not have enough memory and file descriptors to go
278 For simplicity, this implementation always converts the
279 casefile to reader mode. If this turns out to be a problem,
280 with a little extra work we could also support sleeping
283 casefile_sleep (const struct casefile *cf_)
285 struct casefile *cf = (struct casefile *) cf_;
288 casefile_mode_reader (cf);
289 casefile_to_disk (cf);
297 if (cf->buffer != NULL)
304 /* Returns the number of `union value's in a case for CF. */
306 casefile_get_value_cnt (const struct casefile *cf)
310 return cf->value_cnt;
313 /* Returns the number of cases in casefile CF. */
315 casefile_get_case_cnt (const struct casefile *cf)
322 /* Appends a copy of case C to casefile CF. Not valid after any
323 reader for CF has been created. */
325 casefile_append (struct casefile *cf, const struct ccase *c)
329 assert (cf->mode == WRITE);
331 /* Try memory first. */
332 if (cf->storage == MEMORY)
334 if (case_bytes < get_max_workspace ())
336 size_t block_idx = cf->case_cnt / CASES_PER_BLOCK;
337 size_t case_idx = cf->case_cnt % CASES_PER_BLOCK;
338 struct ccase new_case;
340 case_bytes += cf->case_acct_size;
341 case_clone (&new_case, c);
344 if ((block_idx & (block_idx - 1)) == 0)
346 size_t block_cap = block_idx == 0 ? 1 : block_idx * 2;
347 cf->cases = xrealloc (cf->cases,
348 sizeof *cf->cases * block_cap);
351 cf->cases[block_idx] = xmalloc (sizeof **cf->cases
355 case_move (&cf->cases[block_idx][case_idx], &new_case);
359 casefile_to_disk (cf);
360 assert (cf->storage == DISK);
361 write_case_to_disk (cf, c);
365 write_case_to_disk (cf, c);
370 /* Appends case C to casefile CF, which takes over ownership of
371 C. Not valid after any reader for CF has been created. */
373 casefile_append_xfer (struct casefile *cf, struct ccase *c)
375 casefile_append (cf, c);
379 /* Writes case C to casefile CF's disk buffer, first flushing the buffer to
380 disk if it would otherwise overflow. */
382 write_case_to_disk (struct casefile *cf, const struct ccase *c)
384 case_to_values (c, cf->buffer + cf->buffer_used, cf->value_cnt);
385 cf->buffer_used += cf->value_cnt;
386 if (cf->buffer_used + cf->value_cnt > cf->buffer_size)
390 /* If any bytes in CF's output buffer are used, flush them to
393 flush_buffer (struct casefile *cf)
395 if (cf->buffer_used > 0)
397 if (!full_write (cf->fd, cf->buffer,
398 cf->buffer_size * sizeof *cf->buffer))
399 msg (FE, _("Error writing temporary file: %s."), strerror (errno));
406 /* If CF is currently stored in memory, writes it to disk. Readers, if any,
407 retain their current positions. */
409 casefile_to_disk (const struct casefile *cf_)
411 struct casefile *cf = (struct casefile *) cf_;
412 struct casereader *reader;
416 if (cf->storage == MEMORY)
418 size_t idx, block_cnt;
420 assert (cf->filename == NULL);
421 assert (cf->fd == -1);
422 assert (cf->buffer_used == 0);
425 if (!make_temp_file (&cf->fd, &cf->filename))
427 cf->buffer = xmalloc (cf->buffer_size * sizeof *cf->buffer);
428 memset (cf->buffer, 0, cf->buffer_size * sizeof *cf->buffer);
430 case_bytes -= cf->case_cnt * cf->case_acct_size;
431 for (idx = 0; idx < cf->case_cnt; idx++)
433 size_t block_idx = idx / CASES_PER_BLOCK;
434 size_t case_idx = idx % CASES_PER_BLOCK;
435 struct ccase *c = &cf->cases[block_idx][case_idx];
436 write_case_to_disk (cf, c);
440 block_cnt = DIV_RND_UP (cf->case_cnt, CASES_PER_BLOCK);
441 for (idx = 0; idx < block_cnt; idx++)
442 free (cf->cases[idx]);
447 if (cf->mode == READ)
450 for (reader = cf->readers; reader != NULL; reader = reader->next)
451 reader_open_file (reader);
455 /* Changes CF to reader mode, ensuring that no more cases may be
456 added. Creating a casereader for CF has the same effect. */
458 casefile_mode_reader (struct casefile *cf)
464 /* Creates and returns a casereader for CF. A casereader can be used to
465 sequentially read the cases in a casefile. */
467 casefile_get_reader (const struct casefile *cf_)
469 struct casefile *cf = (struct casefile *) cf_;
470 struct casereader *reader;
473 assert (!cf->being_destroyed);
475 /* Flush the buffer to disk if it's not empty. */
476 if (cf->mode == WRITE && cf->storage == DISK)
481 reader = xmalloc (sizeof *reader);
482 reader->next = cf->readers;
483 if (cf->readers != NULL)
484 reader->next->prev = reader;
485 cf->readers = reader;
488 reader->case_idx = 0;
489 reader->destructive = 0;
491 reader->buffer = NULL;
492 reader->buffer_pos = 0;
493 case_nullify (&reader->c);
495 if (reader->cf->storage == DISK)
496 reader_open_file (reader);
501 /* Creates and returns a destructive casereader for CF. Like a
502 normal casereader, a destructive casereader sequentially reads
503 the cases in a casefile. Unlike a normal casereader, a
504 destructive reader cannot operate concurrently with any other
505 reader. (This restriction could be relaxed in a few ways, but
506 it is so far unnecessary for other code.) */
508 casefile_get_destructive_reader (struct casefile *cf)
510 struct casereader *reader;
512 assert (cf->readers == NULL);
513 reader = casefile_get_reader (cf);
514 reader->destructive = 1;
515 cf->being_destroyed = 1;
519 /* Opens a disk file for READER and seeks to the current position as indicated
520 by case_idx. Normally the current position is the beginning of the file,
521 but casefile_to_disk may cause the file to be opened at a different
524 reader_open_file (struct casereader *reader)
526 struct casefile *cf = reader->cf;
529 if (reader->case_idx >= cf->case_cnt)
539 reader->fd = safe_open (cf->filename, O_RDONLY);
541 msg (FE, _("%s: Opening temporary file: %s."),
542 cf->filename, strerror (errno));
545 if (cf->buffer != NULL)
547 reader->buffer = cf->buffer;
552 reader->buffer = xmalloc (cf->buffer_size * sizeof *cf->buffer);
553 memset (reader->buffer, 0, cf->buffer_size * sizeof *cf->buffer);
556 if (cf->value_cnt != 0)
558 size_t buffer_case_cnt = cf->buffer_size / cf->value_cnt;
559 file_ofs = ((off_t) reader->case_idx / buffer_case_cnt
560 * cf->buffer_size * sizeof *cf->buffer);
561 reader->buffer_pos = (reader->case_idx % buffer_case_cnt
566 if (lseek (reader->fd, file_ofs, SEEK_SET) != file_ofs)
567 msg (FE, _("%s: Seeking temporary file: %s."),
568 cf->filename, strerror (errno));
570 if (cf->case_cnt > 0 && cf->value_cnt > 0)
571 fill_buffer (reader);
573 case_create (&reader->c, cf->value_cnt);
576 /* Fills READER's buffer by reading a block from disk. */
578 fill_buffer (struct casereader *reader)
580 int retval = full_read (reader->fd, reader->buffer,
581 reader->cf->buffer_size * sizeof *reader->buffer);
583 msg (FE, _("%s: Reading temporary file: %s."),
584 reader->cf->filename, strerror (errno));
585 else if (retval != reader->cf->buffer_size * sizeof *reader->buffer)
586 msg (FE, _("%s: Temporary file ended unexpectedly."),
587 reader->cf->filename);
590 /* Returns the casefile that READER reads. */
591 const struct casefile *
592 casereader_get_casefile (const struct casereader *reader)
594 assert (reader != NULL);
599 /* Reads a copy of the next case from READER into C.
600 Caller is responsible for destroying C.
601 Returns true if successful, false at end of file. */
603 casereader_read (struct casereader *reader, struct ccase *c)
605 assert (reader != NULL);
607 if (reader->case_idx >= reader->cf->case_cnt)
610 if (reader->cf->storage == MEMORY)
612 size_t block_idx = reader->case_idx / CASES_PER_BLOCK;
613 size_t case_idx = reader->case_idx % CASES_PER_BLOCK;
615 case_clone (c, &reader->cf->cases[block_idx][case_idx]);
621 if (reader->buffer_pos + reader->cf->value_cnt > reader->cf->buffer_size)
623 fill_buffer (reader);
624 reader->buffer_pos = 0;
627 case_from_values (&reader->c, reader->buffer + reader->buffer_pos,
628 reader->cf->value_cnt);
629 reader->buffer_pos += reader->cf->value_cnt;
632 case_clone (c, &reader->c);
637 /* Reads the next case from READER into C and transfers ownership
638 to the caller. Caller is responsible for destroying C.
639 Returns true if successful, false at end of file. */
641 casereader_read_xfer (struct casereader *reader, struct ccase *c)
643 assert (reader != NULL);
645 if (reader->destructive == 0
646 || reader->case_idx >= reader->cf->case_cnt
647 || reader->cf->storage == DISK)
648 return casereader_read (reader, c);
651 size_t block_idx = reader->case_idx / CASES_PER_BLOCK;
652 size_t case_idx = reader->case_idx % CASES_PER_BLOCK;
653 struct ccase *read_case = &reader->cf->cases[block_idx][case_idx];
655 case_move (c, read_case);
661 /* Reads the next case from READER into C and transfers ownership
662 to the caller. Caller is responsible for destroying C.
663 Assert-fails at end of file. */
665 casereader_read_xfer_assert (struct casereader *reader, struct ccase *c)
667 bool success = casereader_read_xfer (reader, c);
671 /* Destroys READER. */
673 casereader_destroy (struct casereader *reader)
675 assert (reader != NULL);
677 if (reader->next != NULL)
678 reader->next->prev = reader->prev;
679 if (reader->prev != NULL)
680 reader->prev->next = reader->next;
681 if (reader->cf->readers == reader)
682 reader->cf->readers = reader->next;
684 if (reader->cf->buffer == NULL)
685 reader->cf->buffer = reader->buffer;
687 free (reader->buffer);
689 if (reader->fd != -1)
691 if (reader->cf->fd == -1)
692 reader->cf->fd = reader->fd;
694 safe_close (reader->fd);
697 case_destroy (&reader->c);
/* Opens FILENAME with FLAGS by calling open(), trying again for
   as long as the call fails with EINTR.  Returns whatever the
   final open() call returned: a file descriptor, or -1 with
   errno set for any failure other than an interruption. */
static int
safe_open (const char *filename, int flags)
{
  for (;;)
    {
      int fd = open (filename, flags);
      if (fd != -1 || errno != EINTR)
        return fd;
    }
}
/* Closes FD by calling close(), trying again for as long as the
   call fails with EINTR.  Returns the result of the final
   close() call. */
static int
safe_close (int fd)
{
  int status = close (fd);

  while (status == -1 && errno == EINTR)
    status = close (fd);

  return status;
}
733 /* Registers our exit handler with atexit() if it has not already
736 register_atexit (void)
738 static int registered = 0;
742 atexit (exit_handler);
748 /* atexit() handler that closes and deletes our temporary
753 while (casefiles != NULL)
754 casefile_destroy (casefiles);
757 #include <gsl/gsl_rng.h>
762 static void test_casefile (int pattern, size_t value_cnt, size_t case_cnt);
763 static void get_random_case (struct ccase *, size_t value_cnt,
765 static void write_random_case (struct casefile *cf, size_t case_idx);
766 static void read_and_verify_random_case (struct casefile *cf,
767 struct casereader *reader,
769 static void fail_test (const char *message, ...);
772 cmd_debug_casefile (void)
774 static const size_t sizes[] =
776 1, 2, 3, 4, 5, 6, 7, 14, 15, 16, 17, 31, 55, 73,
777 100, 137, 257, 521, 1031, 2053
783 size_max = sizeof sizes / sizeof *sizes;
784 if (lex_match_id ("SMALL"))
792 return lex_end_of_command ();
794 for (pattern = 0; pattern < 6; pattern++)
798 for (size = sizes; size < sizes + size_max; size++)
802 for (case_cnt = 0; case_cnt <= case_max;
803 case_cnt = (case_cnt * 2) + 1)
804 test_casefile (pattern, *size, case_cnt);
807 printf ("Casefile tests succeeded.\n");
812 test_casefile (int pattern, size_t value_cnt, size_t case_cnt)
815 struct casereader *r1, *r2;
820 rng = gsl_rng_alloc (gsl_rng_mt19937);
821 cf = casefile_create (value_cnt);
823 casefile_to_disk (cf);
824 for (i = 0; i < case_cnt; i++)
825 write_random_case (cf, i);
828 r1 = casefile_get_reader (cf);
829 r2 = casefile_get_reader (cf);
834 for (i = 0; i < case_cnt; i++)
836 read_and_verify_random_case (cf, r1, i);
837 read_and_verify_random_case (cf, r2, i);
841 for (i = 0; i < case_cnt; i++)
842 read_and_verify_random_case (cf, r1, i);
843 for (i = 0; i < case_cnt; i++)
844 read_and_verify_random_case (cf, r2, i);
849 for (i = j = 0; i < case_cnt; i++)
851 read_and_verify_random_case (cf, r1, i);
852 if (gsl_rng_get (rng) % pattern == 0)
853 read_and_verify_random_case (cf, r2, j++);
854 if (i == case_cnt / 2)
855 casefile_to_disk (cf);
857 for (; j < case_cnt; j++)
858 read_and_verify_random_case (cf, r2, j);
861 if (casereader_read (r1, &c))
862 fail_test ("Casereader 1 not at end of file.");
863 if (casereader_read (r2, &c))
864 fail_test ("Casereader 2 not at end of file.");
866 casereader_destroy (r1);
868 casereader_destroy (r2);
871 r1 = casefile_get_destructive_reader (cf);
872 for (i = 0; i < case_cnt; i++)
874 struct ccase read_case, expected_case;
876 get_random_case (&expected_case, value_cnt, i);
877 if (!casereader_read_xfer (r1, &read_case))
878 fail_test ("Premature end of casefile.");
879 for (j = 0; j < value_cnt; j++)
881 double a = case_num (&read_case, j);
882 double b = case_num (&expected_case, j);
884 fail_test ("Case %lu fails comparison.", (unsigned long) i);
886 case_destroy (&expected_case);
887 case_destroy (&read_case);
889 casereader_destroy (r1);
891 casefile_destroy (cf);
896 get_random_case (struct ccase *c, size_t value_cnt, size_t case_idx)
899 case_create (c, value_cnt);
900 for (i = 0; i < value_cnt; i++)
901 case_data_rw (c, i)->f = case_idx % 257 + i;
905 write_random_case (struct casefile *cf, size_t case_idx)
908 get_random_case (&c, casefile_get_value_cnt (cf), case_idx);
909 casefile_append_xfer (cf, &c);
913 read_and_verify_random_case (struct casefile *cf,
914 struct casereader *reader, size_t case_idx)
916 struct ccase read_case, expected_case;
920 value_cnt = casefile_get_value_cnt (cf);
921 get_random_case (&expected_case, value_cnt, case_idx);
922 if (!casereader_read (reader, &read_case))
923 fail_test ("Premature end of casefile.");
924 for (i = 0; i < value_cnt; i++)
926 double a = case_num (&read_case, i);
927 double b = case_num (&expected_case, i);
929 fail_test ("Case %lu fails comparison.", (unsigned long) case_idx);
931 case_destroy (&read_case);
932 case_destroy (&expected_case);
936 fail_test (const char *message, ...)
940 va_start (args, message);
941 vprintf (message, args);