1 /* PSPP - computes sample statistics.
2 Copyright (C) 2004 Free Software Foundation, Inc.
3 Written by Ben Pfaff <blp@gnu.org>.
5 This program is free software; you can redistribute it and/or
6 modify it under the terms of the GNU General Public License as
7 published by the Free Software Foundation; either version 2 of the
8 License, or (at your option) any later version.
10 This program is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with this program; if not, write to the Free Software
17 Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. */
37 #ifdef HAVE_VALGRIND_VALGRIND_H
38 #include <valgrind/valgrind.h>
41 #define IO_BUF_SIZE (8192 / sizeof (union value))
43 /* A casefile is a sequentially accessible array of immutable
44 cases. It may be stored in memory or on disk as workspace
45 allows. Cases may be appended to the end of the file. Cases
46 may be read sequentially starting from the beginning of the
47 file. Once any cases have been read, no more cases may be
48 appended. The entire file is discarded at once. */
50 /* In-memory cases are arranged in an array of arrays. The top
51 level is variable size and the size of each bottom level array
52 is fixed at the number of cases defined here. */
53 #define CASES_PER_BLOCK 128
59 struct casefile *next, *prev; /* Next, prev in global list. */
60 size_t value_cnt; /* Case size in `union value's. */
61 size_t case_acct_size; /* Case size for accounting. */
62 unsigned long case_cnt; /* Number of cases stored. */
63 enum { MEMORY, DISK } storage; /* Where cases are stored. */
64 enum { WRITE, READ } mode; /* Is writing or reading allowed? */
65 struct casereader *readers; /* List of our readers. */
66 int being_destroyed; /* Does a destructive reader exist? */
69 struct ccase **cases; /* Pointer to array of cases. */
72 int fd; /* File descriptor, -1 if none. */
73 char *filename; /* Filename. */
74 union value *buffer; /* I/O buffer, NULL if none. */
75 size_t buffer_used; /* Number of values used in buffer. */
76 size_t buffer_size; /* Buffer size in values. */
79 /* For reading out the cases in a casefile. */
82 struct casereader *next, *prev; /* Next, prev in casefile's list. */
83 struct casefile *cf; /* Our casefile. */
84 unsigned long case_idx; /* Case number of current case. */
85 int destructive; /* Is this a destructive reader? */
88 int fd; /* File descriptor. */
89 union value *buffer; /* I/O buffer. */
90 size_t buffer_pos; /* Offset of buffer position. */
91 struct ccase c; /* Current case. */
94 /* Returns the case number of the current case. */
96 casereader_cnum(const struct casereader *r)
101 /* Doubly linked list of all casefiles. */
102 static struct casefile *casefiles;
104 /* Number of bytes of case allocated in in-memory casefiles. */
105 static size_t case_bytes;
107 static void register_atexit (void);
108 static void exit_handler (void);
110 static void reader_open_file (struct casereader *reader);
111 static void write_case_to_disk (struct casefile *cf, const struct ccase *c);
112 static void flush_buffer (struct casefile *cf);
113 static void fill_buffer (struct casereader *reader);
115 static int safe_open (const char *filename, int flags);
116 static int safe_close (int fd);
117 static int full_read (int fd, void *buffer, size_t size);
118 static int full_write (int fd, const void *buffer, size_t size);
120 /* Creates and returns a casefile to store cases of VALUE_CNT
121 `union value's each. */
123 casefile_create (size_t value_cnt)
125 struct casefile *cf = xmalloc (sizeof *cf);
126 cf->next = casefiles;
128 if (cf->next != NULL)
131 cf->value_cnt = value_cnt;
132 cf->case_acct_size = (cf->value_cnt + 4) * sizeof *cf->buffer;
134 cf->storage = MEMORY;
137 cf->being_destroyed = 0;
142 cf->buffer_size = ROUND_UP (cf->value_cnt, IO_BUF_SIZE);
143 if (cf->value_cnt > 0 && cf->buffer_size % cf->value_cnt > 64)
144 cf->buffer_size = cf->value_cnt;
150 /* Destroys casefile CF. */
152 casefile_destroy (struct casefile *cf)
156 if (cf->next != NULL)
157 cf->next->prev = cf->prev;
158 if (cf->prev != NULL)
159 cf->prev->next = cf->next;
161 casefiles = cf->next;
163 while (cf->readers != NULL)
164 casereader_destroy (cf->readers);
166 if (cf->cases != NULL)
168 size_t idx, block_cnt;
170 case_bytes -= cf->case_cnt * cf->case_acct_size;
171 for (idx = 0; idx < cf->case_cnt; idx++)
173 size_t block_idx = idx / CASES_PER_BLOCK;
174 size_t case_idx = idx % CASES_PER_BLOCK;
175 struct ccase *c = &cf->cases[block_idx][case_idx];
179 block_cnt = DIV_RND_UP (cf->case_cnt, CASES_PER_BLOCK);
180 for (idx = 0; idx < block_cnt; idx++)
181 free (cf->cases[idx]);
189 if (cf->filename != NULL && remove (cf->filename) == -1)
190 msg (ME, _("%s: Removing temporary file: %s."),
191 cf->filename, strerror (errno));
200 /* Returns nonzero only if casefile CF is stored in memory (instead of on disk). */
203 casefile_in_core (const struct casefile *cf)
207 return cf->storage == MEMORY;
210 /* Puts a casefile to "sleep", that is, minimizes the resources
211 needed for it by closing its file descriptor and freeing its
212 buffer. This is useful if we need so many casefiles that we
213 might not have enough memory and file descriptors to go around.
216 For simplicity, this implementation always converts the
217 casefile to reader mode. If this turns out to be a problem,
218 with a little extra work we could also support sleeping casefiles that are still in write mode. */
221 casefile_sleep (const struct casefile *cf_)
223 struct casefile *cf = (struct casefile *) cf_;
226 casefile_mode_reader (cf);
227 casefile_to_disk (cf);
235 if (cf->buffer != NULL)
242 /* Returns the number of `union value's in a case for CF. */
244 casefile_get_value_cnt (const struct casefile *cf)
248 return cf->value_cnt;
251 /* Returns the number of cases in casefile CF. */
253 casefile_get_case_cnt (const struct casefile *cf)
260 /* Appends a copy of case C to casefile CF. Not valid after any
261 reader for CF has been created. */
263 casefile_append (struct casefile *cf, const struct ccase *c)
267 assert (cf->mode == WRITE);
269 /* Try memory first. */
270 if (cf->storage == MEMORY)
272 if (case_bytes < get_max_workspace ())
274 size_t block_idx = cf->case_cnt / CASES_PER_BLOCK;
275 size_t case_idx = cf->case_cnt % CASES_PER_BLOCK;
276 struct ccase new_case;
278 case_bytes += cf->case_acct_size;
279 case_clone (&new_case, c);
282 if ((block_idx & (block_idx - 1)) == 0)
284 size_t block_cap = block_idx == 0 ? 1 : block_idx * 2;
285 cf->cases = xrealloc (cf->cases,
286 sizeof *cf->cases * block_cap);
289 cf->cases[block_idx] = xmalloc (sizeof **cf->cases
293 case_move (&cf->cases[block_idx][case_idx], &new_case);
297 casefile_to_disk (cf);
298 assert (cf->storage == DISK);
299 write_case_to_disk (cf, c);
303 write_case_to_disk (cf, c);
308 /* Appends case C to casefile CF, which takes over ownership of
309 C. Not valid after any reader for CF has been created. */
311 casefile_append_xfer (struct casefile *cf, struct ccase *c)
313 casefile_append (cf, c);
317 /* Writes case C to casefile CF's disk buffer, first flushing the buffer to
318 disk if it would otherwise overflow. */
320 write_case_to_disk (struct casefile *cf, const struct ccase *c)
322 case_to_values (c, cf->buffer + cf->buffer_used, cf->value_cnt);
323 cf->buffer_used += cf->value_cnt;
324 if (cf->buffer_used + cf->value_cnt > cf->buffer_size)
328 /* If any bytes in CF's output buffer are used, flush them to disk. */
331 flush_buffer (struct casefile *cf)
333 if (cf->buffer_used > 0)
335 if (!full_write (cf->fd, cf->buffer,
336 cf->buffer_size * sizeof *cf->buffer))
337 msg (FE, _("Error writing temporary file: %s."), strerror (errno));
344 /* If CF is currently stored in memory, writes it to disk. Readers, if any,
345 retain their current positions. */
347 casefile_to_disk (const struct casefile *cf_)
349 struct casefile *cf = (struct casefile *) cf_;
350 struct casereader *reader;
354 if (cf->storage == MEMORY)
356 size_t idx, block_cnt;
358 assert (cf->filename == NULL);
359 assert (cf->fd == -1);
360 assert (cf->buffer_used == 0);
363 if (!make_temp_file (&cf->fd, &cf->filename))
365 cf->buffer = xmalloc (cf->buffer_size * sizeof *cf->buffer);
366 memset (cf->buffer, 0, cf->buffer_size * sizeof *cf->buffer);
368 case_bytes -= cf->case_cnt * cf->case_acct_size;
369 for (idx = 0; idx < cf->case_cnt; idx++)
371 size_t block_idx = idx / CASES_PER_BLOCK;
372 size_t case_idx = idx % CASES_PER_BLOCK;
373 struct ccase *c = &cf->cases[block_idx][case_idx];
374 write_case_to_disk (cf, c);
378 block_cnt = DIV_RND_UP (cf->case_cnt, CASES_PER_BLOCK);
379 for (idx = 0; idx < block_cnt; idx++)
380 free (cf->cases[idx]);
385 if (cf->mode == READ)
388 for (reader = cf->readers; reader != NULL; reader = reader->next)
389 reader_open_file (reader);
393 /* Changes CF to reader mode, ensuring that no more cases may be
394 added. Creating a casereader for CF has the same effect. */
396 casefile_mode_reader (struct casefile *cf)
402 /* Creates and returns a casereader for CF. A casereader can be used to
403 sequentially read the cases in a casefile. */
405 casefile_get_reader (const struct casefile *cf_)
407 struct casefile *cf = (struct casefile *) cf_;
408 struct casereader *reader;
411 assert (!cf->being_destroyed);
413 /* Flush the buffer to disk if it's not empty. */
414 if (cf->mode == WRITE && cf->storage == DISK)
419 reader = xmalloc (sizeof *reader);
420 reader->next = cf->readers;
421 if (cf->readers != NULL)
422 reader->next->prev = reader;
423 cf->readers = reader;
426 reader->case_idx = 0;
427 reader->destructive = 0;
429 reader->buffer = NULL;
430 reader->buffer_pos = 0;
431 case_nullify (&reader->c);
433 if (reader->cf->storage == DISK)
434 reader_open_file (reader);
439 /* Creates and returns a destructive casereader for CF. Like a
440 normal casereader, a destructive casereader sequentially reads
441 the cases in a casefile. Unlike a normal casereader, a
442 destructive reader cannot operate concurrently with any other
443 reader. (This restriction could be relaxed in a few ways, but
444 it is so far unnecessary for other code.) */
446 casefile_get_destructive_reader (struct casefile *cf)
448 struct casereader *reader;
450 assert (cf->readers == NULL);
451 reader = casefile_get_reader (cf);
452 reader->destructive = 1;
453 cf->being_destroyed = 1;
457 /* Opens a disk file for READER and seeks to the current position as indicated
458 by case_idx. Normally the current position is the beginning of the file,
459 but casefile_to_disk may cause the file to be opened at a different position. */
462 reader_open_file (struct casereader *reader)
464 struct casefile *cf = reader->cf;
467 if (reader->case_idx >= cf->case_cnt)
477 reader->fd = safe_open (cf->filename, O_RDONLY);
479 msg (FE, _("%s: Opening temporary file: %s."),
480 cf->filename, strerror (errno));
483 if (cf->buffer != NULL)
485 reader->buffer = cf->buffer;
490 reader->buffer = xmalloc (cf->buffer_size * sizeof *cf->buffer);
491 memset (reader->buffer, 0, cf->buffer_size * sizeof *cf->buffer);
494 if (cf->value_cnt != 0)
496 size_t buffer_case_cnt = cf->buffer_size / cf->value_cnt;
497 file_ofs = ((off_t) reader->case_idx / buffer_case_cnt
498 * cf->buffer_size * sizeof *cf->buffer);
499 reader->buffer_pos = (reader->case_idx % buffer_case_cnt
504 if (lseek (reader->fd, file_ofs, SEEK_SET) != file_ofs)
505 msg (FE, _("%s: Seeking temporary file: %s."),
506 cf->filename, strerror (errno));
508 if (cf->case_cnt > 0 && cf->value_cnt > 0)
509 fill_buffer (reader);
511 case_create (&reader->c, cf->value_cnt);
514 /* Fills READER's buffer by reading a block from disk. */
516 fill_buffer (struct casereader *reader)
518 int retval = full_read (reader->fd, reader->buffer,
519 reader->cf->buffer_size * sizeof *reader->buffer);
521 msg (FE, _("%s: Reading temporary file: %s."),
522 reader->cf->filename, strerror (errno));
523 else if (retval != reader->cf->buffer_size * sizeof *reader->buffer)
524 msg (FE, _("%s: Temporary file ended unexpectedly."),
525 reader->cf->filename);
528 /* Returns the casefile that READER reads. */
529 const struct casefile *
530 casereader_get_casefile (const struct casereader *reader)
532 assert (reader != NULL);
537 /* Reads a copy of the next case from READER into C.
538 Caller is responsible for destroying C. */
540 casereader_read (struct casereader *reader, struct ccase *c)
542 assert (reader != NULL);
544 if (reader->case_idx >= reader->cf->case_cnt)
547 if (reader->cf->storage == MEMORY)
549 size_t block_idx = reader->case_idx / CASES_PER_BLOCK;
550 size_t case_idx = reader->case_idx % CASES_PER_BLOCK;
552 case_clone (c, &reader->cf->cases[block_idx][case_idx]);
558 if (reader->buffer_pos + reader->cf->value_cnt > reader->cf->buffer_size)
560 fill_buffer (reader);
561 reader->buffer_pos = 0;
564 case_from_values (&reader->c, reader->buffer + reader->buffer_pos,
565 reader->cf->value_cnt);
566 reader->buffer_pos += reader->cf->value_cnt;
569 case_clone (c, &reader->c);
574 /* Reads the next case from READER into C and transfers ownership
575 to the caller. Caller is responsible for destroying C. */
577 casereader_read_xfer (struct casereader *reader, struct ccase *c)
579 assert (reader != NULL);
581 if (reader->destructive == 0
582 || reader->case_idx >= reader->cf->case_cnt
583 || reader->cf->storage == DISK)
584 return casereader_read (reader, c);
587 size_t block_idx = reader->case_idx / CASES_PER_BLOCK;
588 size_t case_idx = reader->case_idx % CASES_PER_BLOCK;
589 struct ccase *read_case = &reader->cf->cases[block_idx][case_idx];
591 case_move (c, read_case);
597 /* Destroys READER. */
599 casereader_destroy (struct casereader *reader)
601 assert (reader != NULL);
603 if (reader->next != NULL)
604 reader->next->prev = reader->prev;
605 if (reader->prev != NULL)
606 reader->prev->next = reader->next;
607 if (reader->cf->readers == reader)
608 reader->cf->readers = reader->next;
610 if (reader->cf->buffer == NULL)
611 reader->cf->buffer = reader->buffer;
613 free (reader->buffer);
615 if (reader->fd != -1)
617 if (reader->cf->fd == -1)
618 reader->cf->fd = reader->fd;
620 safe_close (reader->fd);
623 case_destroy (&reader->c);
628 /* Calls open(), passing FILENAME and FLAGS, repeating as necessary
629 to deal with interrupted calls. */
631 safe_open (const char *filename, int flags)
637 fd = open (filename, flags);
639 while (fd == -1 && errno == EINTR);
644 /* Calls close(), passing FD, repeating as necessary to deal with
645 interrupted calls. */
646 static int safe_close (int fd)
654 while (retval == -1 && errno == EINTR);
659 /* Calls read(), passing FD, BUFFER, and SIZE, repeating as
660 necessary to deal with interrupted calls. */
662 full_read (int fd, void *buffer_, size_t size)
664 char *buffer = buffer_;
665 size_t bytes_read = 0;
667 while (bytes_read < size)
669 int retval = read (fd, buffer + bytes_read, size - bytes_read);
671 bytes_read += retval;
672 else if (retval == 0)
674 else if (errno != EINTR)
681 /* Calls write(), passing FD, BUFFER, and SIZE, repeating as
682 necessary to deal with interrupted calls. */
684 full_write (int fd, const void *buffer_, size_t size)
686 const char *buffer = buffer_;
687 size_t bytes_written = 0;
689 while (bytes_written < size)
691 int retval = write (fd, buffer + bytes_written, size - bytes_written);
693 bytes_written += retval;
694 else if (errno != EINTR)
698 return bytes_written;
702 /* Registers our exit handler with atexit() if it has not already been registered. */
705 register_atexit (void)
707 static int registered = 0;
711 atexit (exit_handler);
717 /* atexit() handler that closes and deletes our temporary files. */
722 while (casefiles != NULL)
723 casefile_destroy (casefiles);
726 #include <gsl/gsl_rng.h>
731 static void test_casefile (int pattern, size_t value_cnt, size_t case_cnt);
732 static void get_random_case (struct ccase *, size_t value_cnt,
734 static void write_random_case (struct casefile *cf, size_t case_idx);
735 static void read_and_verify_random_case (struct casefile *cf,
736 struct casereader *reader,
738 static void fail_test (const char *message, ...);
741 cmd_debug_casefile (void)
743 static const size_t sizes[] =
745 1, 2, 3, 4, 5, 6, 7, 14, 15, 16, 17, 31, 55, 73,
746 100, 137, 257, 521, 1031, 2053
752 size_max = sizeof sizes / sizeof *sizes;
753 if (lex_match_id ("SMALL"))
761 return lex_end_of_command ();
763 for (pattern = 0; pattern < 6; pattern++)
767 for (size = sizes; size < sizes + size_max; size++)
771 for (case_cnt = 0; case_cnt <= case_max;
772 case_cnt = (case_cnt * 2) + 1)
773 test_casefile (pattern, *size, case_cnt);
776 printf ("Casefile tests succeeded.\n");
781 test_casefile (int pattern, size_t value_cnt, size_t case_cnt)
784 struct casereader *r1, *r2;
789 rng = gsl_rng_alloc (gsl_rng_mt19937);
790 cf = casefile_create (value_cnt);
792 casefile_to_disk (cf);
793 for (i = 0; i < case_cnt; i++)
794 write_random_case (cf, i);
797 r1 = casefile_get_reader (cf);
798 r2 = casefile_get_reader (cf);
803 for (i = 0; i < case_cnt; i++)
805 read_and_verify_random_case (cf, r1, i);
806 read_and_verify_random_case (cf, r2, i);
810 for (i = 0; i < case_cnt; i++)
811 read_and_verify_random_case (cf, r1, i);
812 for (i = 0; i < case_cnt; i++)
813 read_and_verify_random_case (cf, r2, i);
818 for (i = j = 0; i < case_cnt; i++)
820 read_and_verify_random_case (cf, r1, i);
821 if (gsl_rng_get (rng) % pattern == 0)
822 read_and_verify_random_case (cf, r2, j++);
823 if (i == case_cnt / 2)
824 casefile_to_disk (cf);
826 for (; j < case_cnt; j++)
827 read_and_verify_random_case (cf, r2, j);
830 if (casereader_read (r1, &c))
831 fail_test ("Casereader 1 not at end of file.");
832 if (casereader_read (r2, &c))
833 fail_test ("Casereader 2 not at end of file.");
835 casereader_destroy (r1);
837 casereader_destroy (r2);
840 r1 = casefile_get_destructive_reader (cf);
841 for (i = 0; i < case_cnt; i++)
843 struct ccase read_case, expected_case;
845 get_random_case (&expected_case, value_cnt, i);
846 if (!casereader_read_xfer (r1, &read_case))
847 fail_test ("Premature end of casefile.");
848 for (j = 0; j < value_cnt; j++)
850 double a = case_num (&read_case, j);
851 double b = case_num (&expected_case, j);
853 fail_test ("Case %lu fails comparison.", (unsigned long) i);
855 case_destroy (&expected_case);
856 case_destroy (&read_case);
858 casereader_destroy (r1);
860 casefile_destroy (cf);
865 get_random_case (struct ccase *c, size_t value_cnt, size_t case_idx)
868 case_create (c, value_cnt);
869 for (i = 0; i < value_cnt; i++)
870 case_data_rw (c, i)->f = case_idx % 257 + i;
874 write_random_case (struct casefile *cf, size_t case_idx)
877 get_random_case (&c, casefile_get_value_cnt (cf), case_idx);
878 casefile_append_xfer (cf, &c);
882 read_and_verify_random_case (struct casefile *cf,
883 struct casereader *reader, size_t case_idx)
885 struct ccase read_case, expected_case;
889 value_cnt = casefile_get_value_cnt (cf);
890 get_random_case (&expected_case, value_cnt, case_idx);
891 if (!casereader_read (reader, &read_case))
892 fail_test ("Premature end of casefile.");
893 for (i = 0; i < value_cnt; i++)
895 double a = case_num (&read_case, i);
896 double b = case_num (&expected_case, i);
898 fail_test ("Case %lu fails comparison.", (unsigned long) case_idx);
900 case_destroy (&read_case);
901 case_destroy (&expected_case);
905 fail_test (const char *message, ...)
909 va_start (args, message);
910 vprintf (message, args);