1 /* PSPP - computes sample statistics.
2 Copyright (C) 2004 Free Software Foundation, Inc.
3 Written by Ben Pfaff <blp@gnu.org>.
5 This program is free software; you can redistribute it and/or
6 modify it under the terms of the GNU General Public License as
7 published by the Free Software Foundation; either version 2 of the
8 License, or (at your option) any later version.
10 This program is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with this program; if not, write to the Free Software
17 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
37 #define IO_BUF_SIZE (8192 / sizeof (union value))
39 /* A casefile represents a sequentially accessible stream of
42 If workspace allows, a casefile is maintained in memory. If
43 workspace overflows, then the casefile is pushed to disk. In
44 either case the interface presented to callers is kept the
47 The life cycle of a casefile consists of up to three phases:
49 1. Writing. The casefile initially contains no cases. In
50 this phase, any number of cases may be appended to the
51 end of a casefile. (Cases are never inserted in the
52 middle or before the beginning of a casefile.)
54 Use casefile_append() or casefile_append_xfer() to
55 append a case to a casefile.
57 2. Reading. The casefile may be read sequentially,
58 starting from the beginning, by "casereaders". Any
59 number of casereaders may be created, at any time,
60 during the reading phase. Each casereader has an
61 independent position in the casefile.
63 Casereaders may only move forward. They cannot move
64 backward to arbitrary records or seek randomly.
65 Cloning casereaders is possible, but it is not yet
68 Use casefile_get_reader() to create a casereader for
69 use in phase 2. This also transitions from phase 1 to
70 phase 2. Calling casefile_mode_reader() makes the same
71 transition, without creating a casereader.
73 Use casereader_read(), casereader_read_xfer(), or
74 casereader_read_xfer_assert() to read a case from a
75 casereader. Use casereader_destroy() to discard a
76 casereader when it is no longer needed.
78 3. Destruction. This phase is optional. The casefile is
79 also read with casereaders in this phase, but the
80 ability to create new casereaders is curtailed.
82 In this phase, casereaders could still be cloned (once
83 we eventually implement cloning).
85 To transition from phase 1 or 2 to phase 3 and create a
86 casereader, call casefile_get_destructive_reader().
87 The same functions apply to the casereader obtained
88 this way as apply to casereaders obtained in phase 2.
90 After casefile_get_destructive_reader() is called, no
91 more casereaders may be created with
92 casefile_get_reader() or
93 casefile_get_destructive_reader(). (If cloning of
94 casereaders were implemented, it would still be
97 The purpose of the limitations applied to casereaders
98 in phase 3 is to allow in-memory casefiles to fully
99 transfer ownership of cases to the casereaders,
100 avoiding the need for extra copies of case data. For
101 relatively static data sets with many variables, I
102 suspect (without evidence) that this may be a big
105 When a casefile is no longer needed, it may be destroyed with
106 casefile_destroy(). This function will also destroy any
107 remaining casereaders. */
109 /* In-memory cases are arranged in an array of arrays. The top
110 level is variable size and the size of each bottom level array
111 is fixed at the number of cases defined here. */
112 #define CASES_PER_BLOCK 128
118 struct casefile *next, *prev; /* Next, prev in global list. */
119 size_t value_cnt; /* Case size in `union value's. */
120 size_t case_acct_size; /* Case size for accounting. */
121 unsigned long case_cnt; /* Number of cases stored. */
122 enum { MEMORY, DISK } storage; /* Where cases are stored. */
123 enum { WRITE, READ } mode; /* Is writing or reading allowed? */
124 struct casereader *readers; /* List of our readers. */
125 int being_destroyed; /* Does a destructive reader exist? */
127 /* Memory storage. */
128 struct ccase **cases; /* Pointer to array of cases. */
131 int fd; /* File descriptor, -1 if none. */
132 char *filename; /* Filename. */
133 union value *buffer; /* I/O buffer, NULL if none. */
134 size_t buffer_used; /* Number of values used in buffer. */
135 size_t buffer_size; /* Buffer size in values. */
138 /* For reading out the cases in a casefile. */
141 struct casereader *next, *prev; /* Next, prev in casefile's list. */
142 struct casefile *cf; /* Our casefile. */
143 unsigned long case_idx; /* Case number of current case. */
144 int destructive; /* Is this a destructive reader? */
147 int fd; /* File descriptor. */
148 union value *buffer; /* I/O buffer. */
149 size_t buffer_pos; /* Offset of buffer position. */
150 struct ccase c; /* Current case. */
153 /* Return the case number of the current case */
155 casereader_cnum(const struct casereader *r)
160 /* Doubly linked list of all casefiles. */
161 static struct casefile *casefiles;
163 /* Number of bytes of case allocated in in-memory casefiles. */
164 static size_t case_bytes;
166 static void register_atexit (void);
167 static void exit_handler (void);
169 static void reader_open_file (struct casereader *reader);
170 static void write_case_to_disk (struct casefile *cf, const struct ccase *c);
171 static void flush_buffer (struct casefile *cf);
172 static void fill_buffer (struct casereader *reader);
174 static int safe_open (const char *filename, int flags);
175 static int safe_close (int fd);
176 static int full_read (int fd, void *buffer, size_t size);
177 static int full_write (int fd, const void *buffer, size_t size);
179 /* Creates and returns a casefile to store cases of VALUE_CNT
180 `union value's each. */
182 casefile_create (size_t value_cnt)
184 struct casefile *cf = xmalloc (sizeof *cf);
185 cf->next = casefiles;
187 if (cf->next != NULL)
190 cf->value_cnt = value_cnt;
191 cf->case_acct_size = (cf->value_cnt + 4) * sizeof *cf->buffer;
193 cf->storage = MEMORY;
196 cf->being_destroyed = 0;
201 cf->buffer_size = ROUND_UP (cf->value_cnt, IO_BUF_SIZE);
202 if (cf->value_cnt > 0 && cf->buffer_size % cf->value_cnt > 64)
203 cf->buffer_size = cf->value_cnt;
209 /* Destroys casefile CF. */
211 casefile_destroy (struct casefile *cf)
215 if (cf->next != NULL)
216 cf->next->prev = cf->prev;
217 if (cf->prev != NULL)
218 cf->prev->next = cf->next;
220 casefiles = cf->next;
222 while (cf->readers != NULL)
223 casereader_destroy (cf->readers);
225 if (cf->cases != NULL)
227 size_t idx, block_cnt;
229 case_bytes -= cf->case_cnt * cf->case_acct_size;
230 for (idx = 0; idx < cf->case_cnt; idx++)
232 size_t block_idx = idx / CASES_PER_BLOCK;
233 size_t case_idx = idx % CASES_PER_BLOCK;
234 struct ccase *c = &cf->cases[block_idx][case_idx];
238 block_cnt = DIV_RND_UP (cf->case_cnt, CASES_PER_BLOCK);
239 for (idx = 0; idx < block_cnt; idx++)
240 free (cf->cases[idx]);
248 if (cf->filename != NULL && remove (cf->filename) == -1)
249 msg (ME, _("%s: Removing temporary file: %s."),
250 cf->filename, strerror (errno));
259 /* Returns nonzero only if casefile CF is stored in memory (instead of on
262 casefile_in_core (const struct casefile *cf)
266 return cf->storage == MEMORY;
269 /* Puts a casefile to "sleep", that is, minimizes the resources
270 needed for it by closing its file descriptor and freeing its
271 buffer. This is useful if we need so many casefiles that we
272 might not have enough memory and file descriptors to go
275 For simplicity, this implementation always converts the
276 casefile to reader mode. If this turns out to be a problem,
277 with a little extra work we could also support sleeping
280 casefile_sleep (const struct casefile *cf_)
282 struct casefile *cf = (struct casefile *) cf_;
285 casefile_mode_reader (cf);
286 casefile_to_disk (cf);
294 if (cf->buffer != NULL)
301 /* Returns the number of `union value's in a case for CF. */
303 casefile_get_value_cnt (const struct casefile *cf)
307 return cf->value_cnt;
310 /* Returns the number of cases in casefile CF. */
312 casefile_get_case_cnt (const struct casefile *cf)
319 /* Appends a copy of case C to casefile CF. Not valid after any
320 reader for CF has been created. */
322 casefile_append (struct casefile *cf, const struct ccase *c)
326 assert (cf->mode == WRITE);
328 /* Try memory first. */
329 if (cf->storage == MEMORY)
331 if (case_bytes < get_max_workspace ())
333 size_t block_idx = cf->case_cnt / CASES_PER_BLOCK;
334 size_t case_idx = cf->case_cnt % CASES_PER_BLOCK;
335 struct ccase new_case;
337 case_bytes += cf->case_acct_size;
338 case_clone (&new_case, c);
341 if ((block_idx & (block_idx - 1)) == 0)
343 size_t block_cap = block_idx == 0 ? 1 : block_idx * 2;
344 cf->cases = xrealloc (cf->cases,
345 sizeof *cf->cases * block_cap);
348 cf->cases[block_idx] = xmalloc (sizeof **cf->cases
352 case_move (&cf->cases[block_idx][case_idx], &new_case);
356 casefile_to_disk (cf);
357 assert (cf->storage == DISK);
358 write_case_to_disk (cf, c);
362 write_case_to_disk (cf, c);
367 /* Appends case C to casefile CF, which takes over ownership of
368 C. Not valid after any reader for CF has been created. */
370 casefile_append_xfer (struct casefile *cf, struct ccase *c)
372 casefile_append (cf, c);
376 /* Writes case C to casefile CF's disk buffer, first flushing the buffer to
377 disk if it would otherwise overflow. */
379 write_case_to_disk (struct casefile *cf, const struct ccase *c)
381 case_to_values (c, cf->buffer + cf->buffer_used, cf->value_cnt);
382 cf->buffer_used += cf->value_cnt;
383 if (cf->buffer_used + cf->value_cnt > cf->buffer_size)
387 /* If any bytes in CF's output buffer are used, flush them to
390 flush_buffer (struct casefile *cf)
392 if (cf->buffer_used > 0)
394 if (!full_write (cf->fd, cf->buffer,
395 cf->buffer_size * sizeof *cf->buffer))
396 msg (FE, _("Error writing temporary file: %s."), strerror (errno));
403 /* If CF is currently stored in memory, writes it to disk. Readers, if any,
404 retain their current positions. */
406 casefile_to_disk (const struct casefile *cf_)
408 struct casefile *cf = (struct casefile *) cf_;
409 struct casereader *reader;
413 if (cf->storage == MEMORY)
415 size_t idx, block_cnt;
417 assert (cf->filename == NULL);
418 assert (cf->fd == -1);
419 assert (cf->buffer_used == 0);
422 if (!make_temp_file (&cf->fd, &cf->filename))
424 cf->buffer = xmalloc (cf->buffer_size * sizeof *cf->buffer);
425 memset (cf->buffer, 0, cf->buffer_size * sizeof *cf->buffer);
427 case_bytes -= cf->case_cnt * cf->case_acct_size;
428 for (idx = 0; idx < cf->case_cnt; idx++)
430 size_t block_idx = idx / CASES_PER_BLOCK;
431 size_t case_idx = idx % CASES_PER_BLOCK;
432 struct ccase *c = &cf->cases[block_idx][case_idx];
433 write_case_to_disk (cf, c);
437 block_cnt = DIV_RND_UP (cf->case_cnt, CASES_PER_BLOCK);
438 for (idx = 0; idx < block_cnt; idx++)
439 free (cf->cases[idx]);
444 if (cf->mode == READ)
447 for (reader = cf->readers; reader != NULL; reader = reader->next)
448 reader_open_file (reader);
452 /* Changes CF to reader mode, ensuring that no more cases may be
453 added. Creating a casereader for CF has the same effect. */
455 casefile_mode_reader (struct casefile *cf)
461 /* Creates and returns a casereader for CF. A casereader can be used to
462 sequentially read the cases in a casefile. */
464 casefile_get_reader (const struct casefile *cf_)
466 struct casefile *cf = (struct casefile *) cf_;
467 struct casereader *reader;
470 assert (!cf->being_destroyed);
472 /* Flush the buffer to disk if it's not empty. */
473 if (cf->mode == WRITE && cf->storage == DISK)
478 reader = xmalloc (sizeof *reader);
479 reader->next = cf->readers;
480 if (cf->readers != NULL)
481 reader->next->prev = reader;
482 cf->readers = reader;
485 reader->case_idx = 0;
486 reader->destructive = 0;
488 reader->buffer = NULL;
489 reader->buffer_pos = 0;
490 case_nullify (&reader->c);
492 if (reader->cf->storage == DISK)
493 reader_open_file (reader);
498 /* Creates and returns a destructive casereader for CF. Like a
499 normal casereader, a destructive casereader sequentially reads
500 the cases in a casefile. Unlike a normal casereader, a
501 destructive reader cannot operate concurrently with any other
502 reader. (This restriction could be relaxed in a few ways, but
503 it is so far unnecessary for other code.) */
505 casefile_get_destructive_reader (struct casefile *cf)
507 struct casereader *reader;
509 assert (cf->readers == NULL);
510 reader = casefile_get_reader (cf);
511 reader->destructive = 1;
512 cf->being_destroyed = 1;
516 /* Opens a disk file for READER and seeks to the current position as indicated
517 by case_idx. Normally the current position is the beginning of the file,
518 but casefile_to_disk may cause the file to be opened at a different
521 reader_open_file (struct casereader *reader)
523 struct casefile *cf = reader->cf;
526 if (reader->case_idx >= cf->case_cnt)
536 reader->fd = safe_open (cf->filename, O_RDONLY);
538 msg (FE, _("%s: Opening temporary file: %s."),
539 cf->filename, strerror (errno));
542 if (cf->buffer != NULL)
544 reader->buffer = cf->buffer;
549 reader->buffer = xmalloc (cf->buffer_size * sizeof *cf->buffer);
550 memset (reader->buffer, 0, cf->buffer_size * sizeof *cf->buffer);
553 if (cf->value_cnt != 0)
555 size_t buffer_case_cnt = cf->buffer_size / cf->value_cnt;
556 file_ofs = ((off_t) reader->case_idx / buffer_case_cnt
557 * cf->buffer_size * sizeof *cf->buffer);
558 reader->buffer_pos = (reader->case_idx % buffer_case_cnt
563 if (lseek (reader->fd, file_ofs, SEEK_SET) != file_ofs)
564 msg (FE, _("%s: Seeking temporary file: %s."),
565 cf->filename, strerror (errno));
567 if (cf->case_cnt > 0 && cf->value_cnt > 0)
568 fill_buffer (reader);
570 case_create (&reader->c, cf->value_cnt);
573 /* Fills READER's buffer by reading a block from disk. */
575 fill_buffer (struct casereader *reader)
577 int retval = full_read (reader->fd, reader->buffer,
578 reader->cf->buffer_size * sizeof *reader->buffer);
580 msg (FE, _("%s: Reading temporary file: %s."),
581 reader->cf->filename, strerror (errno));
582 else if (retval != reader->cf->buffer_size * sizeof *reader->buffer)
583 msg (FE, _("%s: Temporary file ended unexpectedly."),
584 reader->cf->filename);
587 /* Returns the casefile that READER reads. */
588 const struct casefile *
589 casereader_get_casefile (const struct casereader *reader)
591 assert (reader != NULL);
596 /* Reads a copy of the next case from READER into C.
597 Caller is responsible for destroying C.
598 Returns true if successful, false at end of file. */
600 casereader_read (struct casereader *reader, struct ccase *c)
602 assert (reader != NULL);
604 if (reader->case_idx >= reader->cf->case_cnt)
607 if (reader->cf->storage == MEMORY)
609 size_t block_idx = reader->case_idx / CASES_PER_BLOCK;
610 size_t case_idx = reader->case_idx % CASES_PER_BLOCK;
612 case_clone (c, &reader->cf->cases[block_idx][case_idx]);
618 if (reader->buffer_pos + reader->cf->value_cnt > reader->cf->buffer_size)
620 fill_buffer (reader);
621 reader->buffer_pos = 0;
624 case_from_values (&reader->c, reader->buffer + reader->buffer_pos,
625 reader->cf->value_cnt);
626 reader->buffer_pos += reader->cf->value_cnt;
629 case_clone (c, &reader->c);
634 /* Reads the next case from READER into C and transfers ownership
635 to the caller. Caller is responsible for destroying C.
636 Returns true if successful, false at end of file. */
638 casereader_read_xfer (struct casereader *reader, struct ccase *c)
640 assert (reader != NULL);
642 if (reader->destructive == 0
643 || reader->case_idx >= reader->cf->case_cnt
644 || reader->cf->storage == DISK)
645 return casereader_read (reader, c);
648 size_t block_idx = reader->case_idx / CASES_PER_BLOCK;
649 size_t case_idx = reader->case_idx % CASES_PER_BLOCK;
650 struct ccase *read_case = &reader->cf->cases[block_idx][case_idx];
652 case_move (c, read_case);
658 /* Reads the next case from READER into C and transfers ownership
659 to the caller. Caller is responsible for destroying C.
660 Assert-fails at end of file. */
662 casereader_read_xfer_assert (struct casereader *reader, struct ccase *c)
664 bool success = casereader_read_xfer (reader, c);
668 /* Destroys READER. */
670 casereader_destroy (struct casereader *reader)
672 assert (reader != NULL);
674 if (reader->next != NULL)
675 reader->next->prev = reader->prev;
676 if (reader->prev != NULL)
677 reader->prev->next = reader->next;
678 if (reader->cf->readers == reader)
679 reader->cf->readers = reader->next;
681 if (reader->cf->buffer == NULL)
682 reader->cf->buffer = reader->buffer;
684 free (reader->buffer);
686 if (reader->fd != -1)
688 if (reader->cf->fd == -1)
689 reader->cf->fd = reader->fd;
691 safe_close (reader->fd);
694 case_destroy (&reader->c);
699 /* Calls open(), passing FILENAME and FLAGS, repeating as necessary
700 to deal with interrupted calls. */
702 safe_open (const char *filename, int flags)
708 fd = open (filename, flags);
710 while (fd == -1 && errno == EINTR);
715 /* Calls close(), passing FD, repeating as necessary to deal with
716 interrupted calls. */
717 static int safe_close (int fd)
725 while (retval == -1 && errno == EINTR);
730 /* Calls read(), passing FD, BUFFER, and SIZE, repeating as
731 necessary to deal with interrupted calls. */
733 full_read (int fd, void *buffer_, size_t size)
735 char *buffer = buffer_;
736 size_t bytes_read = 0;
738 while (bytes_read < size)
740 int retval = read (fd, buffer + bytes_read, size - bytes_read);
742 bytes_read += retval;
743 else if (retval == 0)
745 else if (errno != EINTR)
752 /* Calls write(), passing FD, BUFFER, and SIZE, repeating as
753 necessary to deal with interrupted calls. */
755 full_write (int fd, const void *buffer_, size_t size)
757 const char *buffer = buffer_;
758 size_t bytes_written = 0;
760 while (bytes_written < size)
762 int retval = write (fd, buffer + bytes_written, size - bytes_written);
764 bytes_written += retval;
765 else if (errno != EINTR)
769 return bytes_written;
773 /* Registers our exit handler with atexit() if it has not already
776 register_atexit (void)
778 static int registered = 0;
782 atexit (exit_handler);
788 /* atexit() handler that closes and deletes our temporary
793 while (casefiles != NULL)
794 casefile_destroy (casefiles);
797 #include <gsl/gsl_rng.h>
802 static void test_casefile (int pattern, size_t value_cnt, size_t case_cnt);
803 static void get_random_case (struct ccase *, size_t value_cnt,
805 static void write_random_case (struct casefile *cf, size_t case_idx);
806 static void read_and_verify_random_case (struct casefile *cf,
807 struct casereader *reader,
809 static void fail_test (const char *message, ...);
812 cmd_debug_casefile (void)
814 static const size_t sizes[] =
816 1, 2, 3, 4, 5, 6, 7, 14, 15, 16, 17, 31, 55, 73,
817 100, 137, 257, 521, 1031, 2053
823 size_max = sizeof sizes / sizeof *sizes;
824 if (lex_match_id ("SMALL"))
832 return lex_end_of_command ();
834 for (pattern = 0; pattern < 6; pattern++)
838 for (size = sizes; size < sizes + size_max; size++)
842 for (case_cnt = 0; case_cnt <= case_max;
843 case_cnt = (case_cnt * 2) + 1)
844 test_casefile (pattern, *size, case_cnt);
847 printf ("Casefile tests succeeded.\n");
852 test_casefile (int pattern, size_t value_cnt, size_t case_cnt)
855 struct casereader *r1, *r2;
860 rng = gsl_rng_alloc (gsl_rng_mt19937);
861 cf = casefile_create (value_cnt);
863 casefile_to_disk (cf);
864 for (i = 0; i < case_cnt; i++)
865 write_random_case (cf, i);
868 r1 = casefile_get_reader (cf);
869 r2 = casefile_get_reader (cf);
874 for (i = 0; i < case_cnt; i++)
876 read_and_verify_random_case (cf, r1, i);
877 read_and_verify_random_case (cf, r2, i);
881 for (i = 0; i < case_cnt; i++)
882 read_and_verify_random_case (cf, r1, i);
883 for (i = 0; i < case_cnt; i++)
884 read_and_verify_random_case (cf, r2, i);
889 for (i = j = 0; i < case_cnt; i++)
891 read_and_verify_random_case (cf, r1, i);
892 if (gsl_rng_get (rng) % pattern == 0)
893 read_and_verify_random_case (cf, r2, j++);
894 if (i == case_cnt / 2)
895 casefile_to_disk (cf);
897 for (; j < case_cnt; j++)
898 read_and_verify_random_case (cf, r2, j);
901 if (casereader_read (r1, &c))
902 fail_test ("Casereader 1 not at end of file.");
903 if (casereader_read (r2, &c))
904 fail_test ("Casereader 2 not at end of file.");
906 casereader_destroy (r1);
908 casereader_destroy (r2);
911 r1 = casefile_get_destructive_reader (cf);
912 for (i = 0; i < case_cnt; i++)
914 struct ccase read_case, expected_case;
916 get_random_case (&expected_case, value_cnt, i);
917 if (!casereader_read_xfer (r1, &read_case))
918 fail_test ("Premature end of casefile.");
919 for (j = 0; j < value_cnt; j++)
921 double a = case_num (&read_case, j);
922 double b = case_num (&expected_case, j);
924 fail_test ("Case %lu fails comparison.", (unsigned long) i);
926 case_destroy (&expected_case);
927 case_destroy (&read_case);
929 casereader_destroy (r1);
931 casefile_destroy (cf);
936 get_random_case (struct ccase *c, size_t value_cnt, size_t case_idx)
939 case_create (c, value_cnt);
940 for (i = 0; i < value_cnt; i++)
941 case_data_rw (c, i)->f = case_idx % 257 + i;
945 write_random_case (struct casefile *cf, size_t case_idx)
948 get_random_case (&c, casefile_get_value_cnt (cf), case_idx);
949 casefile_append_xfer (cf, &c);
953 read_and_verify_random_case (struct casefile *cf,
954 struct casereader *reader, size_t case_idx)
956 struct ccase read_case, expected_case;
960 value_cnt = casefile_get_value_cnt (cf);
961 get_random_case (&expected_case, value_cnt, case_idx);
962 if (!casereader_read (reader, &read_case))
963 fail_test ("Premature end of casefile.");
964 for (i = 0; i < value_cnt; i++)
966 double a = case_num (&read_case, i);
967 double b = case_num (&expected_case, i);
969 fail_test ("Case %lu fails comparison.", (unsigned long) case_idx);
971 case_destroy (&read_case);
972 case_destroy (&expected_case);
976 fail_test (const char *message, ...)
980 va_start (args, message);
981 vprintf (message, args);