1 /* PSPP - computes sample statistics.
2 Copyright (C) 2004 Free Software Foundation, Inc.
3 Written by Ben Pfaff <blp@gnu.org>.
5 This program is free software; you can redistribute it and/or
6 modify it under the terms of the GNU General Public License as
7 published by the Free Software Foundation; either version 2 of the
8 License, or (at your option) any later version.
10 This program is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with this program; if not, write to the Free Software
17 Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA
37 #ifdef HAVE_VALGRIND_VALGRIND_H
38 #include <valgrind/valgrind.h>
41 #define IO_BUF_SIZE (8192 / sizeof (union value))
43 /* A casefile is a sequentially accessible array of immutable
44 cases. It may be stored in memory or on disk as workspace
45 allows. Cases may be appended to the end of the file. Cases
46 may be read sequentially starting from the beginning of the
47 file. Once any cases have been read, no more cases may be
48 appended. The entire file is discarded at once. */
50 /* In-memory cases are arranged in an array of arrays. The top
51 level is variable size and the size of each bottom level array
52 is fixed at the number of cases defined here. */
53 #define CASES_PER_BLOCK 128
59 struct casefile *next, *prev; /* Next, prev in global list. */
60 size_t value_cnt; /* Case size in `union value's. */
61 size_t case_acct_size; /* Case size for accounting. */
62 unsigned long case_cnt; /* Number of cases stored. */
63 enum { MEMORY, DISK } storage; /* Where cases are stored. */
64 enum { WRITE, READ } mode; /* Is writing or reading allowed? */
65 struct casereader *readers; /* List of our readers. */
66 int being_destroyed; /* Does a destructive reader exist? */
69 struct ccase **cases; /* Pointer to array of cases. */
72 int fd; /* File descriptor, -1 if none. */
73 char *filename; /* Filename. */
74 union value *buffer; /* I/O buffer, NULL if none. */
75 size_t buffer_used; /* Number of values used in buffer. */
76 size_t buffer_size; /* Buffer size in values. */
79 /* For reading out the cases in a casefile. */
82 struct casereader *next, *prev; /* Next, prev in casefile's list. */
83 struct casefile *cf; /* Our casefile. */
84 unsigned long case_idx; /* Case number of current case. */
85 int destructive; /* Is this a destructive reader? */
88 int fd; /* File descriptor. */
89 union value *buffer; /* I/O buffer. */
90 size_t buffer_pos; /* Offset of buffer position. */
91 struct ccase c; /* Current case. */
94 /* Return the case number of the current case */
96 casereader_cnum(const struct casereader *r)
101 /* Doubly linked list of all casefiles. */
102 static struct casefile *casefiles;
104 /* Number of bytes of case allocated in in-memory casefiles. */
105 static size_t case_bytes;
107 static void register_atexit (void);
108 static void exit_handler (void);
110 static void reader_open_file (struct casereader *reader);
111 static void write_case_to_disk (struct casefile *cf, const struct ccase *c);
112 static void flush_buffer (struct casefile *cf);
113 static void fill_buffer (struct casereader *reader);
115 static int safe_open (const char *filename, int flags);
116 static int safe_close (int fd);
117 static int full_read (int fd, void *buffer, size_t size);
118 static int full_write (int fd, const void *buffer, size_t size);
120 /* Creates and returns a casefile to store cases of VALUE_CNT
121 `union value's each. */
123 casefile_create (size_t value_cnt)
125 struct casefile *cf = xmalloc (sizeof *cf);
126 cf->next = casefiles;
128 if (cf->next != NULL)
131 cf->value_cnt = value_cnt;
132 cf->case_acct_size = (cf->value_cnt + 4) * sizeof *cf->buffer;
134 cf->storage = MEMORY;
137 cf->being_destroyed = 0;
142 cf->buffer_size = ROUND_UP (cf->value_cnt, IO_BUF_SIZE);
143 if (cf->value_cnt > 0 && cf->buffer_size % cf->value_cnt > 64)
144 cf->buffer_size = cf->value_cnt;
150 /* Destroys casefile CF. */
152 casefile_destroy (struct casefile *cf)
156 if (cf->next != NULL)
157 cf->next->prev = cf->prev;
158 if (cf->prev != NULL)
159 cf->prev->next = cf->next;
161 casefiles = cf->next;
163 while (cf->readers != NULL)
164 casereader_destroy (cf->readers);
166 if (cf->cases != NULL)
168 size_t idx, block_cnt;
170 case_bytes -= cf->case_cnt * cf->case_acct_size;
171 for (idx = 0; idx < cf->case_cnt; idx++)
173 size_t block_idx = idx / CASES_PER_BLOCK;
174 size_t case_idx = idx % CASES_PER_BLOCK;
175 struct ccase *c = &cf->cases[block_idx][case_idx];
179 block_cnt = DIV_RND_UP (cf->case_cnt, CASES_PER_BLOCK);
180 for (idx = 0; idx < block_cnt; idx++)
181 free (cf->cases[idx]);
189 if (cf->filename != NULL && remove (cf->filename) == -1)
190 msg (ME, _("%s: Removing temporary file: %s."),
191 cf->filename, strerror (errno));
200 /* Returns nonzero only if casefile CF is stored in memory (instead of on disk). */
203 casefile_in_core (const struct casefile *cf)
207 return cf->storage == MEMORY;
210 /* Puts a casefile to "sleep", that is, minimizes the resources
211 needed for it by closing its file descriptor and freeing its
212 buffer. This is useful if we need so many casefiles that we
213 might not have enough memory and file descriptors to go around.
216 For simplicity, this implementation always converts the
217 casefile to reader mode. If this turns out to be a problem,
218 with a little extra work we could also support sleeping writers. */
221 casefile_sleep (const struct casefile *cf_)
223 struct casefile *cf = (struct casefile *) cf_;
226 casefile_mode_reader (cf);
227 casefile_to_disk (cf);
235 if (cf->buffer != NULL)
242 /* Returns the number of `union value's in a case for CF. */
244 casefile_get_value_cnt (const struct casefile *cf)
248 return cf->value_cnt;
251 /* Returns the number of cases in casefile CF. */
253 casefile_get_case_cnt (const struct casefile *cf)
260 /* Appends a copy of case C to casefile CF. Not valid after any
261 reader for CF has been created. */
263 casefile_append (struct casefile *cf, const struct ccase *c)
267 assert (cf->mode == WRITE);
269 /* Try memory first. */
270 if (cf->storage == MEMORY)
272 if (case_bytes < get_max_workspace ())
274 size_t block_idx = cf->case_cnt / CASES_PER_BLOCK;
275 size_t case_idx = cf->case_cnt % CASES_PER_BLOCK;
276 struct ccase new_case;
278 case_bytes += cf->case_acct_size;
279 case_clone (&new_case, c);
282 if ((block_idx & (block_idx - 1)) == 0)
284 size_t block_cap = block_idx == 0 ? 1 : block_idx * 2;
285 cf->cases = xrealloc (cf->cases,
286 sizeof *cf->cases * block_cap);
289 cf->cases[block_idx] = xmalloc (sizeof **cf->cases
293 case_move (&cf->cases[block_idx][case_idx], &new_case);
297 casefile_to_disk (cf);
298 assert (cf->storage == DISK);
299 write_case_to_disk (cf, c);
303 write_case_to_disk (cf, c);
308 /* Appends case C to casefile CF, which takes over ownership of
309 C. Not valid after any reader for CF has been created. */
311 casefile_append_xfer (struct casefile *cf, struct ccase *c)
313 casefile_append (cf, c);
317 /* Writes case C to casefile CF's disk buffer, first flushing the buffer to
318 disk if it would otherwise overflow. */
320 write_case_to_disk (struct casefile *cf, const struct ccase *c)
322 case_to_values (c, cf->buffer + cf->buffer_used, cf->value_cnt);
323 cf->buffer_used += cf->value_cnt;
324 if (cf->buffer_used + cf->value_cnt > cf->buffer_size)
328 /* If any bytes in CF's output buffer are used, flush them to disk. */
331 flush_buffer (struct casefile *cf)
333 if (cf->buffer_used > 0)
335 if (!full_write (cf->fd, cf->buffer,
336 cf->buffer_size * sizeof *cf->buffer))
337 msg (FE, _("Error writing temporary file: %s."), strerror (errno));
343 /* Creates a temporary file and stores its name in *FILENAME and
344 a file descriptor for it in *FD. Returns success. Caller is
345 responsible for freeing *FILENAME. */
347 make_temp_file (int *fd, char **filename)
349 const char *parent_dir;
351 assert (filename != NULL);
354 if (getenv ("TMPDIR") != NULL)
355 parent_dir = getenv ("TMPDIR");
357 parent_dir = P_tmpdir;
359 *filename = xmalloc (strlen (parent_dir) + 32);
360 sprintf (*filename, "%s%cpsppXXXXXX", parent_dir, DIR_SEPARATOR);
361 *fd = mkstemp (*filename);
364 msg (FE, _("%s: Creating temporary file: %s."),
365 *filename, strerror (errno));
373 /* If CF is currently stored in memory, writes it to disk. Readers, if any,
374 retain their current positions. */
376 casefile_to_disk (const struct casefile *cf_)
378 struct casefile *cf = (struct casefile *) cf_;
379 struct casereader *reader;
383 if (cf->storage == MEMORY)
385 size_t idx, block_cnt;
387 assert (cf->filename == NULL);
388 assert (cf->fd == -1);
389 assert (cf->buffer_used == 0);
392 if (!make_temp_file (&cf->fd, &cf->filename))
394 cf->buffer = xmalloc (cf->buffer_size * sizeof *cf->buffer);
395 memset (cf->buffer, 0, cf->buffer_size * sizeof *cf->buffer);
397 case_bytes -= cf->case_cnt * cf->case_acct_size;
398 for (idx = 0; idx < cf->case_cnt; idx++)
400 size_t block_idx = idx / CASES_PER_BLOCK;
401 size_t case_idx = idx % CASES_PER_BLOCK;
402 struct ccase *c = &cf->cases[block_idx][case_idx];
403 write_case_to_disk (cf, c);
407 block_cnt = DIV_RND_UP (cf->case_cnt, CASES_PER_BLOCK);
408 for (idx = 0; idx < block_cnt; idx++)
409 free (cf->cases[idx]);
414 if (cf->mode == READ)
417 for (reader = cf->readers; reader != NULL; reader = reader->next)
418 reader_open_file (reader);
422 /* Changes CF to reader mode, ensuring that no more cases may be
423 added. Creating a casereader for CF has the same effect. */
425 casefile_mode_reader (struct casefile *cf)
431 /* Creates and returns a casereader for CF. A casereader can be used to
432 sequentially read the cases in a casefile. */
434 casefile_get_reader (const struct casefile *cf_)
436 struct casefile *cf = (struct casefile *) cf_;
437 struct casereader *reader;
440 assert (!cf->being_destroyed);
442 /* Flush the buffer to disk if it's not empty. */
443 if (cf->mode == WRITE && cf->storage == DISK)
448 reader = xmalloc (sizeof *reader);
449 reader->next = cf->readers;
450 if (cf->readers != NULL)
451 reader->next->prev = reader;
452 cf->readers = reader;
455 reader->case_idx = 0;
456 reader->destructive = 0;
458 reader->buffer = NULL;
459 reader->buffer_pos = 0;
460 case_nullify (&reader->c);
462 if (reader->cf->storage == DISK)
463 reader_open_file (reader);
468 /* Creates and returns a destructive casereader for CF. Like a
469 normal casereader, a destructive casereader sequentially reads
470 the cases in a casefile. Unlike a normal casereader, a
471 destructive reader cannot operate concurrently with any other
472 reader. (This restriction could be relaxed in a few ways, but
473 it is so far unnecessary for other code.) */
475 casefile_get_destructive_reader (struct casefile *cf)
477 struct casereader *reader;
479 assert (cf->readers == NULL);
480 reader = casefile_get_reader (cf);
481 reader->destructive = 1;
482 cf->being_destroyed = 1;
486 /* Opens a disk file for READER and seeks to the current position as indicated
487 by case_idx. Normally the current position is the beginning of the file,
488 but casefile_to_disk may cause the file to be opened at a different position. */
491 reader_open_file (struct casereader *reader)
493 struct casefile *cf = reader->cf;
496 if (reader->case_idx >= cf->case_cnt)
506 reader->fd = safe_open (cf->filename, O_RDONLY);
508 msg (FE, _("%s: Opening temporary file: %s."),
509 cf->filename, strerror (errno));
512 if (cf->buffer != NULL)
514 reader->buffer = cf->buffer;
519 reader->buffer = xmalloc (cf->buffer_size * sizeof *cf->buffer);
520 memset (reader->buffer, 0, cf->buffer_size * sizeof *cf->buffer);
523 if (cf->value_cnt != 0)
525 size_t buffer_case_cnt = cf->buffer_size / cf->value_cnt;
526 file_ofs = ((off_t) reader->case_idx / buffer_case_cnt
527 * cf->buffer_size * sizeof *cf->buffer);
528 reader->buffer_pos = (reader->case_idx % buffer_case_cnt
533 if (lseek (reader->fd, file_ofs, SEEK_SET) != file_ofs)
534 msg (FE, _("%s: Seeking temporary file: %s."),
535 cf->filename, strerror (errno));
537 if (cf->case_cnt > 0 && cf->value_cnt > 0)
538 fill_buffer (reader);
540 case_create (&reader->c, cf->value_cnt);
543 /* Fills READER's buffer by reading a block from disk. */
545 fill_buffer (struct casereader *reader)
547 int retval = full_read (reader->fd, reader->buffer,
548 reader->cf->buffer_size * sizeof *reader->buffer);
550 msg (FE, _("%s: Reading temporary file: %s."),
551 reader->cf->filename, strerror (errno));
552 else if (retval != reader->cf->buffer_size * sizeof *reader->buffer)
553 msg (FE, _("%s: Temporary file ended unexpectedly."),
554 reader->cf->filename);
557 /* Returns the casefile that READER reads. */
558 const struct casefile *
559 casereader_get_casefile (const struct casereader *reader)
561 assert (reader != NULL);
566 /* Reads a copy of the next case from READER into C.
567 Caller is responsible for destroying C. */
569 casereader_read (struct casereader *reader, struct ccase *c)
571 assert (reader != NULL);
573 if (reader->case_idx >= reader->cf->case_cnt)
576 if (reader->cf->storage == MEMORY)
578 size_t block_idx = reader->case_idx / CASES_PER_BLOCK;
579 size_t case_idx = reader->case_idx % CASES_PER_BLOCK;
581 case_clone (c, &reader->cf->cases[block_idx][case_idx]);
587 if (reader->buffer_pos + reader->cf->value_cnt > reader->cf->buffer_size)
589 fill_buffer (reader);
590 reader->buffer_pos = 0;
593 case_from_values (&reader->c, reader->buffer + reader->buffer_pos,
594 reader->cf->value_cnt);
595 reader->buffer_pos += reader->cf->value_cnt;
598 case_clone (c, &reader->c);
603 /* Reads the next case from READER into C and transfers ownership
604 to the caller. Caller is responsible for destroying C. */
606 casereader_read_xfer (struct casereader *reader, struct ccase *c)
608 assert (reader != NULL);
610 if (reader->destructive == 0
611 || reader->case_idx >= reader->cf->case_cnt
612 || reader->cf->storage == DISK)
613 return casereader_read (reader, c);
616 size_t block_idx = reader->case_idx / CASES_PER_BLOCK;
617 size_t case_idx = reader->case_idx % CASES_PER_BLOCK;
618 struct ccase *read_case = &reader->cf->cases[block_idx][case_idx];
620 case_move (c, read_case);
626 /* Destroys READER. */
628 casereader_destroy (struct casereader *reader)
630 assert (reader != NULL);
632 if (reader->next != NULL)
633 reader->next->prev = reader->prev;
634 if (reader->prev != NULL)
635 reader->prev->next = reader->next;
636 if (reader->cf->readers == reader)
637 reader->cf->readers = reader->next;
639 if (reader->cf->buffer == NULL)
640 reader->cf->buffer = reader->buffer;
642 free (reader->buffer);
644 if (reader->fd != -1)
646 if (reader->cf->fd == -1)
647 reader->cf->fd = reader->fd;
649 safe_close (reader->fd);
652 case_destroy (&reader->c);
657 /* Calls open(), passing FILENAME and FLAGS, repeating as necessary
658 to deal with interrupted calls. */
660 safe_open (const char *filename, int flags)
666 fd = open (filename, flags);
668 while (fd == -1 && errno == EINTR);
673 /* Calls close(), passing FD, repeating as necessary to deal with
674 interrupted calls. */
675 static int safe_close (int fd)
683 while (retval == -1 && errno == EINTR);
/* Reads up to SIZE bytes from FD into BUFFER, calling read()
   repeatedly to cope with short reads and retrying calls that
   were interrupted by a signal (EINTR).

   Returns the number of bytes actually read.  This is less than
   SIZE only if end of file was reached first.  Returns -1, with
   errno as set by read(), if a read fails for any reason other
   than EINTR.

   The byte count is returned as an int, so SIZE is assumed to
   fit in one. */
static int
full_read (int fd, void *buffer_, size_t size)
{
  char *buffer = buffer_;
  size_t bytes_read = 0;

  while (bytes_read < size)
    {
      /* read() returns ssize_t; keep its full width instead of
         narrowing the result to int. */
      ssize_t retval = read (fd, buffer + bytes_read, size - bytes_read);

      if (retval > 0)
        bytes_read += (size_t) retval;
      else if (retval == 0)
        break;                  /* End of file: report a short count. */
      else if (errno != EINTR)
        return -1;              /* Real error; errno describes it. */
      /* Otherwise the call was interrupted: just try again. */
    }

  return (int) bytes_read;
}
/* Writes all SIZE bytes of BUFFER to FD, calling write()
   repeatedly to cope with short writes and retrying calls that
   were interrupted by a signal (EINTR).

   Returns the number of bytes written, which is SIZE, on
   success.  Returns 0 on failure, so that callers can test the
   result with `!full_write (...)'.  When write() itself fails,
   errno is left as write() set it.

   The byte count is returned as an int, so SIZE is assumed to
   fit in one. */
static int
full_write (int fd, const void *buffer_, size_t size)
{
  const char *buffer = buffer_;
  size_t bytes_written = 0;

  while (bytes_written < size)
    {
      /* write() returns ssize_t; keep its full width instead of
         narrowing the result to int. */
      ssize_t retval = write (fd, buffer + bytes_written,
                              size - bytes_written);

      if (retval > 0)
        bytes_written += (size_t) retval;
      else if (retval == 0 || errno != EINTR)
        {
          /* Either a real error, or write() made no progress
             even though bytes remain.  Treat the latter as a
             failure too, rather than spinning forever. */
          return 0;
        }
      /* Otherwise the call was interrupted: just try again. */
    }

  return (int) bytes_written;
}
730 /* Registers our exit handler with atexit() if it has not already
733 register_atexit (void)
735 static int registered = 0;
739 signal (SIGQUIT, (sighandler_t) exit_handler);
740 signal (SIGINT, (sighandler_t) exit_handler);
741 atexit (exit_handler);
745 /* atexit() handler that closes and deletes our temporary files. */
750 while (casefiles != NULL)
751 casefile_destroy (casefiles);
754 #include <gsl/gsl_rng.h>
759 static void test_casefile (int pattern, size_t value_cnt, size_t case_cnt);
760 static void get_random_case (struct ccase *, size_t value_cnt,
762 static void write_random_case (struct casefile *cf, size_t case_idx);
763 static void read_and_verify_random_case (struct casefile *cf,
764 struct casereader *reader,
766 static void fail_test (const char *message, ...);
769 cmd_debug_casefile (void)
771 static const size_t sizes[] =
773 1, 2, 3, 4, 5, 6, 7, 14, 15, 16, 17, 31, 55, 73,
774 100, 137, 257, 521, 1031, 2053
780 size_max = sizeof sizes / sizeof *sizes;
781 if (lex_match_id ("SMALL"))
789 return lex_end_of_command ();
791 for (pattern = 0; pattern < 6; pattern++)
795 for (size = sizes; size < sizes + size_max; size++)
799 for (case_cnt = 0; case_cnt <= case_max;
800 case_cnt = (case_cnt * 2) + 1)
801 test_casefile (pattern, *size, case_cnt);
804 printf ("Casefile tests succeeded.\n");
809 test_casefile (int pattern, size_t value_cnt, size_t case_cnt)
812 struct casereader *r1, *r2;
817 rng = gsl_rng_alloc (gsl_rng_mt19937);
818 cf = casefile_create (value_cnt);
820 casefile_to_disk (cf);
821 for (i = 0; i < case_cnt; i++)
822 write_random_case (cf, i);
825 r1 = casefile_get_reader (cf);
826 r2 = casefile_get_reader (cf);
831 for (i = 0; i < case_cnt; i++)
833 read_and_verify_random_case (cf, r1, i);
834 read_and_verify_random_case (cf, r2, i);
838 for (i = 0; i < case_cnt; i++)
839 read_and_verify_random_case (cf, r1, i);
840 for (i = 0; i < case_cnt; i++)
841 read_and_verify_random_case (cf, r2, i);
846 for (i = j = 0; i < case_cnt; i++)
848 read_and_verify_random_case (cf, r1, i);
849 if (gsl_rng_get (rng) % pattern == 0)
850 read_and_verify_random_case (cf, r2, j++);
851 if (i == case_cnt / 2)
852 casefile_to_disk (cf);
854 for (; j < case_cnt; j++)
855 read_and_verify_random_case (cf, r2, j);
858 if (casereader_read (r1, &c))
859 fail_test ("Casereader 1 not at end of file.");
860 if (casereader_read (r2, &c))
861 fail_test ("Casereader 2 not at end of file.");
863 casereader_destroy (r1);
865 casereader_destroy (r2);
868 r1 = casefile_get_destructive_reader (cf);
869 for (i = 0; i < case_cnt; i++)
871 struct ccase read_case, expected_case;
873 get_random_case (&expected_case, value_cnt, i);
874 if (!casereader_read_xfer (r1, &read_case))
875 fail_test ("Premature end of casefile.");
876 for (j = 0; j < value_cnt; j++)
878 double a = case_num (&read_case, j);
879 double b = case_num (&expected_case, j);
881 fail_test ("Case %lu fails comparison.", (unsigned long) i);
883 case_destroy (&expected_case);
884 case_destroy (&read_case);
886 casereader_destroy (r1);
888 casefile_destroy (cf);
893 get_random_case (struct ccase *c, size_t value_cnt, size_t case_idx)
896 case_create (c, value_cnt);
897 for (i = 0; i < value_cnt; i++)
898 case_data_rw (c, i)->f = case_idx % 257 + i;
902 write_random_case (struct casefile *cf, size_t case_idx)
905 get_random_case (&c, casefile_get_value_cnt (cf), case_idx);
906 casefile_append_xfer (cf, &c);
910 read_and_verify_random_case (struct casefile *cf,
911 struct casereader *reader, size_t case_idx)
913 struct ccase read_case, expected_case;
917 value_cnt = casefile_get_value_cnt (cf);
918 get_random_case (&expected_case, value_cnt, case_idx);
919 if (!casereader_read (reader, &read_case))
920 fail_test ("Premature end of casefile.");
921 for (i = 0; i < value_cnt; i++)
923 double a = case_num (&read_case, i);
924 double b = case_num (&expected_case, i);
926 fail_test ("Case %lu fails comparison.", (unsigned long) case_idx);
928 case_destroy (&read_case);
929 case_destroy (&expected_case);
933 fail_test (const char *message, ...)
937 va_start (args, message);
938 vprintf (message, args);