1 /* PSPP - computes sample statistics.
2 Copyright (C) 2004 Free Software Foundation, Inc.
3 Written by Ben Pfaff <blp@gnu.org>.
5 This program is free software; you can redistribute it and/or
6 modify it under the terms of the GNU General Public License as
7 published by the Free Software Foundation; either version 2 of the
8 License, or (at your option) any later version.
10 This program is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with this program; if not, write to the Free Software
17 Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA
02111-1307, USA. */
36 #ifdef HAVE_VALGRIND_VALGRIND_H
37 #include <valgrind/valgrind.h>
/* Granularity used when sizing a casefile's disk I/O buffer,
   measured in `union value's: the number of values that fit in
   8192 bytes.  casefile_create passes it to ROUND_UP together with
   the case size to obtain its initial buffer_size. */
40 #define IO_BUF_SIZE (8192 / sizeof (union value))
42 /* A casefile is a sequentially accessible array of immutable
43 cases. It may be stored in memory or on disk as workspace
44 allows. Cases may be appended to the end of the file. Cases
45 may be read sequentially starting from the beginning of the
46 file. Once any cases have been read, no more cases may be
47 appended. The entire file is discarded at once. */
49 /* In-memory cases are arranged in an array of arrays. The top
50 level is variable size and the size of each bottom level array
51 is fixed at the number of cases defined here. */
52 #define CASES_PER_BLOCK 128
58 struct casefile *next, *prev; /* Next, prev in global list. */
59 size_t value_cnt; /* Case size in `union value's. */
60 size_t case_acct_size; /* Case size for accounting. */
61 unsigned long case_cnt; /* Number of cases stored. */
62 enum { MEMORY, DISK } storage; /* Where cases are stored. */
63 enum { WRITE, READ } mode; /* Is writing or reading allowed? */
64 struct casereader *readers; /* List of our readers. */
65 int being_destroyed; /* Does a destructive reader exist? */
68 struct ccase **cases; /* Pointer to array of cases. */
71 int fd; /* File descriptor, -1 if none. */
72 char *filename; /* Filename. */
73 union value *buffer; /* I/O buffer, NULL if none. */
74 size_t buffer_used; /* Number of values used in buffer. */
75 size_t buffer_size; /* Buffer size in values. */
78 /* For reading out the cases in a casefile. */
81 struct casereader *next, *prev; /* Next, prev in casefile's list. */
82 struct casefile *cf; /* Our casefile. */
83 unsigned long case_idx; /* Case number of current case. */
84 int destructive; /* Is this a destructive reader? */
87 int fd; /* File descriptor. */
88 union value *buffer; /* I/O buffer. */
89 size_t buffer_pos; /* Offset of buffer position. */
90 struct ccase c; /* Current case. */
93 /* Returns the case number of R's current case. */
95 casereader_cnum(const struct casereader *r)
100 /* Doubly linked list of all casefiles. */
101 static struct casefile *casefiles;
103 /* Number of bytes currently charged to cases held by in-memory
casefiles.  Each stored case counts as its casefile's case_acct_size. */
104 static size_t case_bytes;
/* Exit-time cleanup of the casefiles on the global list. */
106 static void register_atexit (void);
107 static void exit_handler (void);
/* Disk-backed storage: opening a reader's file, buffered output of
   cases, and refilling a reader's input buffer. */
109 static void reader_open_file (struct casereader *reader);
110 static void write_case_to_disk (struct casefile *cf, const struct ccase *c);
111 static void flush_buffer (struct casefile *cf);
112 static void fill_buffer (struct casereader *reader);
/* Wrappers for open(), close(), read(), and write() that repeat
   the call as necessary when it is interrupted. */
114 static int safe_open (const char *filename, int flags);
115 static int safe_close (int fd);
116 static int full_read (int fd, void *buffer, size_t size);
117 static int full_write (int fd, const void *buffer, size_t size);
119 /* Creates and returns a casefile to store cases of VALUE_CNT
120 `union value's each. */
122 casefile_create (size_t value_cnt)
124 struct casefile *cf = xmalloc (sizeof *cf);
/* The current head of the global casefile list becomes the new
   casefile's successor. */
125 cf->next = casefiles;
127 if (cf->next != NULL)
130 cf->value_cnt = value_cnt;
/* For workspace accounting, each case is charged as its own
   values plus four extra values' worth of bytes. */
131 cf->case_acct_size = (cf->value_cnt + 4) * sizeof *cf->buffer;
/* A new casefile starts out in memory. */
133 cf->storage = MEMORY;
136 cf->being_destroyed = 0;
/* Derive the I/O buffer size from the case size and IO_BUF_SIZE.
   If more than 64 values would be left over after the last whole
   case in such a buffer, use a buffer of exactly one case instead.
   The value_cnt > 0 test keeps the modulus from dividing by zero. */
141 cf->buffer_size = ROUND_UP (cf->value_cnt, IO_BUF_SIZE);
142 if (cf->value_cnt > 0 && cf->buffer_size % cf->value_cnt > 64)
143 cf->buffer_size = cf->value_cnt;
149 /* Destroys casefile CF. */
151 casefile_destroy (struct casefile *cf)
/* Unlink CF from the global doubly linked list of casefiles. */
155 if (cf->next != NULL)
156 cf->next->prev = cf->prev;
157 if (cf->prev != NULL)
158 cf->prev->next = cf->next;
160 casefiles = cf->next;
/* casereader_destroy removes the reader it is given from
   cf->readers, so this loop ends once every reader is gone. */
162 while (cf->readers != NULL)
163 casereader_destroy (cf->readers);
/* In-memory storage: give the accounted bytes back to the global
   total, visit each stored case, then free every block of cases. */
165 if (cf->cases != NULL)
167 size_t idx, block_cnt;
169 case_bytes -= cf->case_cnt * cf->case_acct_size;
170 for (idx = 0; idx < cf->case_cnt; idx++)
172 size_t block_idx = idx / CASES_PER_BLOCK;
173 size_t case_idx = idx % CASES_PER_BLOCK;
174 struct ccase *c = &cf->cases[block_idx][case_idx];
178 block_cnt = DIV_RND_UP (cf->case_cnt, CASES_PER_BLOCK);
179 for (idx = 0; idx < block_cnt; idx++)
180 free (cf->cases[idx]);
/* Delete the temporary file, if the casefile has one, and report
   a failed removal through msg. */
188 if (cf->filename != NULL && remove (cf->filename) == -1)
189 msg (ME, _("%s: Removing temporary file: %s."),
190 cf->filename, strerror (errno));
199 /* Returns nonzero only if casefile CF is stored in memory (instead of on disk). */
202 casefile_in_core (const struct casefile *cf)
206 return cf->storage == MEMORY;
209 /* Puts a casefile to "sleep", that is, minimizes the resources
210 needed for it by closing its file descriptor and freeing its
211 buffer. This is useful if we need so many casefiles that we
212 might not have enough memory and file descriptors to go around.
215 For simplicity, this implementation always converts the
216 casefile to reader mode. If this turns out to be a problem,
217 with a little extra work we could also support sleeping writers. */
220 casefile_sleep (const struct casefile *cf_)
/* The cast drops the const qualifier so that CF can be modified
   by the calls below. */
222 struct casefile *cf = (struct casefile *) cf_;
/* Switch CF to reader mode, then move any in-memory cases to disk. */
225 casefile_mode_reader (cf);
226 casefile_to_disk (cf);
234 if (cf->buffer != NULL)
241 /* Returns the number of `union value's in a case for CF. */
243 casefile_get_value_cnt (const struct casefile *cf)
247 return cf->value_cnt;
250 /* Returns the number of cases in casefile CF. */
252 casefile_get_case_cnt (const struct casefile *cf)
259 /* Appends a copy of case C to casefile CF. Not valid after any
260 reader for CF has been created. */
262 casefile_append (struct casefile *cf, const struct ccase *c)
/* Appending is only legal while the casefile is in write mode. */
266 assert (cf->mode == WRITE);
268 /* Try memory first. */
269 if (cf->storage == MEMORY)
/* Stay in memory only while the total charged to all in-memory
   casefiles is below the workspace limit. */
271 if (case_bytes < get_max_workspace ())
/* Locate the block, and the slot within that block, where the new
   case goes. */
273 size_t block_idx = cf->case_cnt / CASES_PER_BLOCK;
274 size_t case_idx = cf->case_cnt % CASES_PER_BLOCK;
275 struct ccase new_case;
277 case_bytes += cf->case_acct_size;
278 case_clone (&new_case, c);
/* The test is true when block_idx is zero or a power of two.  The
   array of block pointers is then resized to hold twice block_idx
   entries (one entry when block_idx is zero). */
281 if ((block_idx & (block_idx - 1)) == 0)
283 size_t block_cap = block_idx == 0 ? 1 : block_idx * 2;
284 cf->cases = xrealloc (cf->cases,
285 sizeof *cf->cases * block_cap);
288 cf->cases[block_idx] = xmalloc (sizeof **cf->cases
292 case_move (&cf->cases[block_idx][case_idx], &new_case);
/* Memory path not taken: convert the casefile to disk storage and
   write the case there. */
296 casefile_to_disk (cf);
297 assert (cf->storage == DISK);
298 write_case_to_disk (cf, c);
/* NOTE(review): presumably the branch for a casefile that was
   already on disk; the `else' line is not visible here. */
302 write_case_to_disk (cf, c);
307 /* Appends case C to casefile CF, which takes over ownership of
308 C. Not valid after any reader for CF has been created. */
310 casefile_append_xfer (struct casefile *cf, struct ccase *c)
312 casefile_append (cf, c);
316 /* Writes case C to casefile CF's disk buffer, first flushing the buffer to
317 disk if it would otherwise overflow. */
319 write_case_to_disk (struct casefile *cf, const struct ccase *c)
/* Copy the case's values into the buffer at the current fill
   point and advance the fill point by one case. */
321 case_to_values (c, cf->buffer + cf->buffer_used, cf->value_cnt);
322 cf->buffer_used += cf->value_cnt;
/* True when one more case would no longer fit in the buffer. */
323 if (cf->buffer_used + cf->value_cnt > cf->buffer_size)
327 /* If any bytes in CF's output buffer are used, flush them to disk. */
330 flush_buffer (struct casefile *cf)
332 if (cf->buffer_used > 0)
/* The whole buffer (buffer_size values) is written, not only the
   used part.  reader_open_file and fill_buffer likewise address
   the file in units of buffer_size values.  A zero result from
   full_write is reported as a write error. */
334 if (!full_write (cf->fd, cf->buffer,
335 cf->buffer_size * sizeof *cf->buffer))
336 msg (FE, _("Error writing temporary file: %s."), strerror (errno));
343 /* If CF is currently stored in memory, writes it to disk. Readers, if any,
344 retain their current positions. */
346 casefile_to_disk (const struct casefile *cf_)
/* The cast drops the const qualifier so that CF can be modified. */
348 struct casefile *cf = (struct casefile *) cf_;
349 struct casereader *reader;
/* Nothing is done unless the casefile is currently in memory. */
353 if (cf->storage == MEMORY)
355 size_t idx, block_cnt;
/* An in-memory casefile must not yet have a file name, a file
   descriptor, or buffered output. */
357 assert (cf->filename == NULL);
358 assert (cf->fd == -1);
359 assert (cf->buffer_used == 0);
/* Create the temporary file; make_temp_file fills in the
   descriptor and the file name. */
362 if (!make_temp_file (&cf->fd, &cf->filename))
/* Allocate a zero-filled output buffer of buffer_size values. */
364 cf->buffer = xmalloc (cf->buffer_size * sizeof *cf->buffer);
365 memset (cf->buffer, 0, cf->buffer_size * sizeof *cf->buffer);
/* The cases stop counting against the in-memory total, and each
   one is written out through the disk buffer. */
367 case_bytes -= cf->case_cnt * cf->case_acct_size;
368 for (idx = 0; idx < cf->case_cnt; idx++)
370 size_t block_idx = idx / CASES_PER_BLOCK;
371 size_t case_idx = idx % CASES_PER_BLOCK;
372 struct ccase *c = &cf->cases[block_idx][case_idx];
373 write_case_to_disk (cf, c);
/* Free every block of in-memory cases. */
377 block_cnt = DIV_RND_UP (cf->case_cnt, CASES_PER_BLOCK);
378 for (idx = 0; idx < block_cnt; idx++)
379 free (cf->cases[idx]);
384 if (cf->mode == READ)
/* Give every existing reader a disk file; reader_open_file
   positions it according to the reader's current case_idx. */
387 for (reader = cf->readers; reader != NULL; reader = reader->next)
388 reader_open_file (reader);
392 /* Changes CF to reader mode, ensuring that no more cases may be
393 added. Creating a casereader for CF has the same effect. */
395 casefile_mode_reader (struct casefile *cf)
401 /* Creates and returns a casereader for CF. A casereader can be used to
402 sequentially read the cases in a casefile. */
404 casefile_get_reader (const struct casefile *cf_)
/* The cast drops the const qualifier so that CF can be modified. */
406 struct casefile *cf = (struct casefile *) cf_;
407 struct casereader *reader;
/* being_destroyed is set by casefile_get_destructive_reader, so no
   further reader may be created once a destructive one exists. */
410 assert (!cf->being_destroyed);
412 /* Flush the buffer to disk if it's not empty. */
413 if (cf->mode == WRITE && cf->storage == DISK)
/* Allocate the reader and insert it at the head of CF's list of
   readers. */
418 reader = xmalloc (sizeof *reader);
419 reader->next = cf->readers;
420 if (cf->readers != NULL)
421 reader->next->prev = reader;
422 cf->readers = reader;
/* The reader starts at case 0, is not destructive, and has no
   buffer and no current case yet. */
425 reader->case_idx = 0;
426 reader->destructive = 0;
428 reader->buffer = NULL;
429 reader->buffer_pos = 0;
430 case_nullify (&reader->c);
/* A disk-based casefile needs the reader's file opened right away. */
432 if (reader->cf->storage == DISK)
433 reader_open_file (reader);
438 /* Creates and returns a destructive casereader for CF. Like a
439 normal casereader, a destructive casereader sequentially reads
440 the cases in a casefile. Unlike a normal casereader, a
441 destructive reader cannot operate concurrently with any other
442 reader. (This restriction could be relaxed in a few ways, but
443 it is so far unnecessary for other code.) */
445 casefile_get_destructive_reader (struct casefile *cf)
447 struct casereader *reader;
/* No other reader may exist when a destructive reader is created. */
449 assert (cf->readers == NULL);
/* Start from an ordinary reader, then mark both the reader and
   its casefile.  The casefile's flag makes casefile_get_reader's
   assertion reject any later request for a reader. */
450 reader = casefile_get_reader (cf);
451 reader->destructive = 1;
452 cf->being_destroyed = 1;
456 /* Opens a disk file for READER and seeks to the current position as indicated
457 by case_idx. Normally the current position is the beginning of the file,
458 but casefile_to_disk may cause the file to be opened at a different position. */
461 reader_open_file (struct casereader *reader)
463 struct casefile *cf = reader->cf;
466 if (reader->case_idx >= cf->case_cnt)
/* Open the casefile's temporary file read-only; a failure is
   reported through msg. */
476 reader->fd = safe_open (cf->filename, O_RDONLY);
478 msg (FE, _("%s: Opening temporary file: %s."),
479 cf->filename, strerror (errno));
/* Use the casefile's I/O buffer when it has one; the zero-filled
   allocation further down is the alternative. */
482 if (cf->buffer != NULL)
484 reader->buffer = cf->buffer;
489 reader->buffer = xmalloc (cf->buffer_size * sizeof *cf->buffer);
490 memset (reader->buffer, 0, cf->buffer_size * sizeof *cf->buffer);
/* With a nonzero case size, compute how many whole cases fit in
   one buffer, the byte offset of the buffer-sized block that holds
   case number case_idx, and the reader's starting position inside
   that block. */
493 if (cf->value_cnt != 0)
495 size_t buffer_case_cnt = cf->buffer_size / cf->value_cnt;
496 file_ofs = ((off_t) reader->case_idx / buffer_case_cnt
497 * cf->buffer_size * sizeof *cf->buffer);
498 reader->buffer_pos = (reader->case_idx % buffer_case_cnt
/* Position the descriptor at the start of that block. */
503 if (lseek (reader->fd, file_ofs, SEEK_SET) != file_ofs)
504 msg (FE, _("%s: Seeking temporary file: %s."),
505 cf->filename, strerror (errno));
/* Preload the block when the casefile holds any data to read. */
507 if (cf->case_cnt > 0 && cf->value_cnt > 0)
508 fill_buffer (reader);
/* Create the reader's own case, sized for this casefile;
   casereader_read decodes buffered values into it. */
510 case_create (&reader->c, cf->value_cnt);
513 /* Fills READER's buffer by reading a block from disk. */
515 fill_buffer (struct casereader *reader)
/* Always request one full buffer (buffer_size values).  A read
   error is reported with the system error text.  A result that
   differs from the requested byte count is reported as an
   unexpected end of the temporary file. */
517 int retval = full_read (reader->fd, reader->buffer,
518 reader->cf->buffer_size * sizeof *reader->buffer);
520 msg (FE, _("%s: Reading temporary file: %s."),
521 reader->cf->filename, strerror (errno));
522 else if (retval != reader->cf->buffer_size * sizeof *reader->buffer)
523 msg (FE, _("%s: Temporary file ended unexpectedly."),
524 reader->cf->filename);
527 /* Returns the casefile that READER reads. */
528 const struct casefile *
529 casereader_get_casefile (const struct casereader *reader)
531 assert (reader != NULL);
536 /* Reads a copy of the next case from READER into C.
537 Caller is responsible for destroying C. */
539 casereader_read (struct casereader *reader, struct ccase *c)
541 assert (reader != NULL);
/* Every case has already been read.  Callers in this file treat a
   zero result from this function as end of file. */
543 if (reader->case_idx >= reader->cf->case_cnt)
/* In-memory casefile: clone the case straight out of its block. */
546 if (reader->cf->storage == MEMORY)
548 size_t block_idx = reader->case_idx / CASES_PER_BLOCK;
549 size_t case_idx = reader->case_idx % CASES_PER_BLOCK;
551 case_clone (c, &reader->cf->cases[block_idx][case_idx]);
/* Disk casefile: when the rest of the buffer cannot hold a whole
   case, read the next block and restart at its beginning. */
557 if (reader->buffer_pos + reader->cf->value_cnt > reader->cf->buffer_size)
559 fill_buffer (reader);
560 reader->buffer_pos = 0;
/* Decode the buffered values into the reader's own case, step
   past them, and hand the caller a clone of that case. */
563 case_from_values (&reader->c, reader->buffer + reader->buffer_pos,
564 reader->cf->value_cnt);
565 reader->buffer_pos += reader->cf->value_cnt;
568 case_clone (c, &reader->c);
573 /* Reads the next case from READER into C and transfers ownership
574 to the caller. Caller is responsible for destroying C. */
576 casereader_read_xfer (struct casereader *reader, struct ccase *c)
578 assert (reader != NULL);
/* Fall back to the copying read unless this is a destructive
   reader that has not reached the end of an in-memory casefile. */
580 if (reader->destructive == 0
581 || reader->case_idx >= reader->cf->case_cnt
582 || reader->cf->storage == DISK)
583 return casereader_read (reader, c);
/* Otherwise the stored case itself is moved out of the casefile's
   block into C rather than cloned. */
586 size_t block_idx = reader->case_idx / CASES_PER_BLOCK;
587 size_t case_idx = reader->case_idx % CASES_PER_BLOCK;
588 struct ccase *read_case = &reader->cf->cases[block_idx][case_idx];
590 case_move (c, read_case);
596 /* Destroys READER. */
598 casereader_destroy (struct casereader *reader)
600 assert (reader != NULL);
/* Unlink READER from its casefile's list of readers, updating the
   list head if READER was first. */
602 if (reader->next != NULL)
603 reader->next->prev = reader->prev;
604 if (reader->prev != NULL)
605 reader->prev->next = reader->next;
606 if (reader->cf->readers == reader)
607 reader->cf->readers = reader->next;
/* A casefile without an I/O buffer is given the reader's buffer;
   free is the other way the buffer is disposed of. */
609 if (reader->cf->buffer == NULL)
610 reader->cf->buffer = reader->buffer;
612 free (reader->buffer);
/* An open descriptor is handled the same way: the casefile is
   given it when it has none of its own, and safe_close is the
   other way the descriptor is disposed of. */
614 if (reader->fd != -1)
616 if (reader->cf->fd == -1)
617 reader->cf->fd = reader->fd;
619 safe_close (reader->fd);
/* Release the reader's own case. */
622 case_destroy (&reader->c);
627 /* Calls open(), passing FILENAME and FLAGS, repeating as necessary
628 to deal with interrupted calls. */
630 safe_open (const char *filename, int flags)
636 fd = open (filename, flags);
638 while (fd == -1 && errno == EINTR);
643 /* Calls close(), passing FD, repeating as necessary to deal with
644 interrupted calls. */
645 static int safe_close (int fd)
653 while (retval == -1 && errno == EINTR);
658 /* Calls read(), passing FD, BUFFER, and SIZE, repeating as
659 necessary to deal with interrupted calls. */
661 full_read (int fd, void *buffer_, size_t size)
/* Byte-typed alias of the caller's buffer, for pointer arithmetic. */
663 char *buffer = buffer_;
664 size_t bytes_read = 0;
/* Loop until SIZE bytes have arrived, each time asking only for
   the part that is still missing. */
666 while (bytes_read < size)
/* NOTE(review): read() returns ssize_t; the result is kept in an
   int here. */
668 int retval = read (fd, buffer + bytes_read, size - bytes_read);
670 bytes_read += retval;
/* A zero result from read() and an error other than EINTR each
   have their own branch; an EINTR error matches neither, so the
   loop simply calls read() again. */
671 else if (retval == 0)
673 else if (errno != EINTR)
680 /* Calls write(), passing FD, BUFFER, and SIZE, repeating as
681 necessary to deal with interrupted calls. */
683 full_write (int fd, const void *buffer_, size_t size)
/* Byte-typed alias of the caller's buffer, for pointer arithmetic. */
685 const char *buffer = buffer_;
686 size_t bytes_written = 0;
/* Loop until SIZE bytes have been written, each time offering
   only the part that is still unwritten. */
688 while (bytes_written < size)
/* NOTE(review): write() returns ssize_t; the result is kept in an
   int here. */
690 int retval = write (fd, buffer + bytes_written, size - bytes_written);
692 bytes_written += retval;
/* An error other than EINTR has its own branch; after EINTR the
   loop simply calls write() again. */
693 else if (errno != EINTR)
/* NOTE(review): the size_t count is returned through the
   function's int return type. */
697 return bytes_written;
701 /* Registers our exit handler with atexit() if it has not already been registered. */
704 register_atexit (void)
706 static int registered = 0;
710 atexit (exit_handler);
716 /* atexit() handler that closes and deletes our temporary files. */
721 while (casefiles != NULL)
722 casefile_destroy (casefiles);
725 #include <gsl/gsl_rng.h>
730 static void test_casefile (int pattern, size_t value_cnt, size_t case_cnt);
731 static void get_random_case (struct ccase *, size_t value_cnt,
733 static void write_random_case (struct casefile *cf, size_t case_idx);
734 static void read_and_verify_random_case (struct casefile *cf,
735 struct casereader *reader,
737 static void fail_test (const char *message, ...);
740 cmd_debug_casefile (void)
742 static const size_t sizes[] =
744 1, 2, 3, 4, 5, 6, 7, 14, 15, 16, 17, 31, 55, 73,
745 100, 137, 257, 521, 1031, 2053
751 size_max = sizeof sizes / sizeof *sizes;
752 if (lex_match_id ("SMALL"))
760 return lex_end_of_command ();
762 for (pattern = 0; pattern < 6; pattern++)
766 for (size = sizes; size < sizes + size_max; size++)
770 for (case_cnt = 0; case_cnt <= case_max;
771 case_cnt = (case_cnt * 2) + 1)
772 test_casefile (pattern, *size, case_cnt);
775 printf ("Casefile tests succeeded.\n");
780 test_casefile (int pattern, size_t value_cnt, size_t case_cnt)
783 struct casereader *r1, *r2;
/* Set up a random number generator and an empty casefile whose
   cases hold VALUE_CNT values. */
788 rng = gsl_rng_alloc (gsl_rng_mt19937);
789 cf = casefile_create (value_cnt);
791 casefile_to_disk (cf);
/* Append CASE_CNT cases whose contents can be regenerated from
   their index. */
792 for (i = 0; i < case_cnt; i++)
793 write_random_case (cf, i);
/* Two independent readers over the same casefile. */
796 r1 = casefile_get_reader (cf);
797 r2 = casefile_get_reader (cf);
/* Reading order: both readers advance in lockstep. */
802 for (i = 0; i < case_cnt; i++)
804 read_and_verify_random_case (cf, r1, i);
805 read_and_verify_random_case (cf, r2, i);
/* Reading order: r1 reads every case, then r2 reads every case. */
809 for (i = 0; i < case_cnt; i++)
810 read_and_verify_random_case (cf, r1, i);
811 for (i = 0; i < case_cnt; i++)
812 read_and_verify_random_case (cf, r2, i);
/* Reading order: r1 leads while r2 advances only when a random
   draw is a multiple of PATTERN, and the casefile is forced to
   disk halfway through; r2 then catches up.
   NOTE(review): `% pattern' needs a nonzero PATTERN; the code that
   selects this loop is not visible here. */
817 for (i = j = 0; i < case_cnt; i++)
819 read_and_verify_random_case (cf, r1, i);
820 if (gsl_rng_get (rng) % pattern == 0)
821 read_and_verify_random_case (cf, r2, j++);
822 if (i == case_cnt / 2)
823 casefile_to_disk (cf);
825 for (; j < case_cnt; j++)
826 read_and_verify_random_case (cf, r2, j);
/* Both readers must now be at end of file: a further read that
   yields a case is a failure. */
829 if (casereader_read (r1, &c))
830 fail_test ("Casereader 1 not at end of file.");
831 if (casereader_read (r2, &c))
832 fail_test ("Casereader 2 not at end of file.");
/* casefile_get_destructive_reader asserts that the casefile has
   no readers, so both are destroyed before it is called. */
834 casereader_destroy (r1);
836 casereader_destroy (r2);
/* Final pass: read every case through a destructive reader and
   compare each value with the regenerated expected case. */
839 r1 = casefile_get_destructive_reader (cf);
840 for (i = 0; i < case_cnt; i++)
842 struct ccase read_case, expected_case;
844 get_random_case (&expected_case, value_cnt, i);
845 if (!casereader_read_xfer (r1, &read_case))
846 fail_test ("Premature end of casefile.");
847 for (j = 0; j < value_cnt; j++)
849 double a = case_num (&read_case, j);
850 double b = case_num (&expected_case, j);
852 fail_test ("Case %lu fails comparison.", (unsigned long) i);
854 case_destroy (&expected_case);
855 case_destroy (&read_case);
857 casereader_destroy (r1);
859 casefile_destroy (cf);
/* Creates case C with VALUE_CNT values.  Despite the name, the
   contents are a fixed function of the arguments: value I is set
   to CASE_IDX % 257 + I, so the same case can be regenerated later
   for comparison. */
864 get_random_case (struct ccase *c, size_t value_cnt, size_t case_idx)
867 case_create (c, value_cnt);
868 for (i = 0; i < value_cnt; i++)
869 case_data_rw (c, i)->f = case_idx % 257 + i;
873 write_random_case (struct casefile *cf, size_t case_idx)
876 get_random_case (&c, casefile_get_value_cnt (cf), case_idx);
877 casefile_append_xfer (cf, &c);
/* Reads the next case from READER and checks it against the case
   that get_random_case generates for CASE_IDX, calling fail_test
   when the read yields no case or the comparison fails. */
881 read_and_verify_random_case (struct casefile *cf,
882 struct casereader *reader, size_t case_idx)
884 struct ccase read_case, expected_case;
/* Regenerate the case that was written at this index. */
888 value_cnt = casefile_get_value_cnt (cf);
889 get_random_case (&expected_case, value_cnt, case_idx);
890 if (!casereader_read (reader, &read_case))
891 fail_test ("Premature end of casefile.");
/* Fetch the numeric values at each position of both cases. */
892 for (i = 0; i < value_cnt; i++)
894 double a = case_num (&read_case, i);
895 double b = case_num (&expected_case, i);
897 fail_test ("Case %lu fails comparison.", (unsigned long) case_idx);
/* Both the case read and the regenerated case are destroyed here. */
899 case_destroy (&read_case);
900 case_destroy (&expected_case);
904 fail_test (const char *message, ...)
908 va_start (args, message);
909 vprintf (message, args);