1 /* PSPP - computes sample statistics.
2 Copyright (C) 2004 Free Software Foundation, Inc.
3 Written by Ben Pfaff <blp@gnu.org>.
5 This program is free software; you can redistribute it and/or
6 modify it under the terms of the GNU General Public License as
7 published by the Free Software Foundation; either version 2 of the
8 License, or (at your option) any later version.
10 This program is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with this program; if not, write to the Free Software
17 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
37 #define IO_BUF_SIZE (8192 / sizeof (union value))
39 /* A casefile represents a sequentially accessible stream of
42 If workspace allows, a casefile is maintained in memory. If
43 workspace overflows, then the casefile is pushed to disk. In
44 either case the interface presented to callers is kept the
47 The life cycle of a casefile consists of up to three phases:
49 1. Writing. The casefile initially contains no cases. In
50 this phase, any number of cases may be appended to the
51 end of a casefile. (Cases are never inserted in the
52 middle or before the beginning of a casefile.)
54 Use casefile_append() or casefile_append_xfer() to
55 append a case to a casefile.
57 2. Reading. The casefile may be read sequentially,
58 starting from the beginning, by "casereaders". Any
59 number of casereaders may be created, at any time,
60 during the reading phase. Each casereader has an
61 independent position in the casefile.
63 Casereaders may only move forward. They cannot move
64 backward to arbitrary records or seek randomly. (In the
65 future, the concept of a "casemark" will be introduced
66 to allow a limited form of backward seek, but this has
67 not yet been implemented.)
69 Cloning casereaders is possible, but no one has had a
70 need for it yet, so it is not implemented.
72 Use casefile_get_reader() to create a casereader for
73 use in phase 2. This also transitions from phase 1 to
74 phase 2. Calling casefile_mode_reader() makes the same
75 transition, without creating a casereader.
77 Use casereader_read(), casereader_read_xfer(), or
78 casereader_read_xfer_assert() to read a case from a
79 casereader. Use casereader_destroy() to discard a
80 casereader when it is no longer needed.
82 3. Destruction. This phase is optional. The casefile is
83 also read with casereaders in this phase, but the
84 ability to create new casereaders is curtailed.
86 In this phase, casereaders could still be cloned, and
87 casemarks could still be used to seek backward (given
88 that we eventually implement these functionalities).
90 To transition from phase 1 or 2 to phase 3 and create a
91 casereader, call casefile_get_destructive_reader().
92 The same functions apply to the casereader obtained
93 this way as apply to casereaders obtained in phase 2.
95 After casefile_get_destructive_reader() is called, no
96 more casereaders may be created with
97 casefile_get_reader() or
98 casefile_get_destructive_reader(). (If cloning of
99 casereaders or casemarks were implemented, they would
102 The purpose of the limitations applied to casereaders
103 in phase 3 is to allow in-memory casefiles to fully
104 transfer ownership of cases to the casereaders,
105 avoiding the need for extra copies of case data. For
106 relatively static data sets with many variables, I
107 suspect (without evidence) that this may be a big
110 When a casefile is no longer needed, it may be destroyed with
111 casefile_destroy(). This function will also destroy any
112 remaining casereaders. */
114 /* In-memory cases are arranged in an array of arrays. The top
115 level is variable size and the size of each bottom level array
116 is fixed at the number of cases defined here. */
117 #define CASES_PER_BLOCK 128
123 struct casefile *next, *prev; /* Next, prev in global list. */
124 size_t value_cnt; /* Case size in `union value's. */
125 size_t case_acct_size; /* Case size for accounting. */
126 unsigned long case_cnt; /* Number of cases stored. */
127 enum { MEMORY, DISK } storage; /* Where cases are stored. */
128 enum { WRITE, READ } mode; /* Is writing or reading allowed? */
129 struct casereader *readers; /* List of our readers. */
130 int being_destroyed; /* Does a destructive reader exist? */
132 /* Memory storage. */
133 struct ccase **cases; /* Pointer to array of cases. */
136 int fd; /* File descriptor, -1 if none. */
137 char *filename; /* Filename. */
138 union value *buffer; /* I/O buffer, NULL if none. */
139 size_t buffer_used; /* Number of values used in buffer. */
140 size_t buffer_size; /* Buffer size in values. */
143 /* For reading out the cases in a casefile. */
146 struct casereader *next, *prev; /* Next, prev in casefile's list. */
147 struct casefile *cf; /* Our casefile. */
148 unsigned long case_idx; /* Case number of current case. */
149 int destructive; /* Is this a destructive reader? */
152 int fd; /* File descriptor. */
153 union value *buffer; /* I/O buffer. */
154 size_t buffer_pos; /* Offset of buffer position. */
155 struct ccase c; /* Current case. */
158 /* Return the case number of the current case */
160 casereader_cnum(const struct casereader *r)
165 /* Doubly linked list of all casefiles. */
166 static struct casefile *casefiles;
168 /* Number of bytes of case allocated in in-memory casefiles. */
169 static size_t case_bytes;
171 static void register_atexit (void);
172 static void exit_handler (void);
174 static void reader_open_file (struct casereader *reader);
175 static void write_case_to_disk (struct casefile *cf, const struct ccase *c);
176 static void flush_buffer (struct casefile *cf);
177 static void fill_buffer (struct casereader *reader);
179 static int safe_open (const char *filename, int flags);
180 static int safe_close (int fd);
181 static int full_read (int fd, void *buffer, size_t size);
182 static int full_write (int fd, const void *buffer, size_t size);
184 /* Creates and returns a casefile to store cases of VALUE_CNT
185 `union value's each. */
187 casefile_create (size_t value_cnt)
189 struct casefile *cf = xmalloc (sizeof *cf);
190 cf->next = casefiles;
192 if (cf->next != NULL)
195 cf->value_cnt = value_cnt;
196 cf->case_acct_size = (cf->value_cnt + 4) * sizeof *cf->buffer;
198 cf->storage = MEMORY;
201 cf->being_destroyed = 0;
206 cf->buffer_size = ROUND_UP (cf->value_cnt, IO_BUF_SIZE);
207 if (cf->value_cnt > 0 && cf->buffer_size % cf->value_cnt > 64)
208 cf->buffer_size = cf->value_cnt;
214 /* Destroys casefile CF. */
216 casefile_destroy (struct casefile *cf)
220 if (cf->next != NULL)
221 cf->next->prev = cf->prev;
222 if (cf->prev != NULL)
223 cf->prev->next = cf->next;
225 casefiles = cf->next;
227 while (cf->readers != NULL)
228 casereader_destroy (cf->readers);
230 if (cf->cases != NULL)
232 size_t idx, block_cnt;
234 case_bytes -= cf->case_cnt * cf->case_acct_size;
235 for (idx = 0; idx < cf->case_cnt; idx++)
237 size_t block_idx = idx / CASES_PER_BLOCK;
238 size_t case_idx = idx % CASES_PER_BLOCK;
239 struct ccase *c = &cf->cases[block_idx][case_idx];
243 block_cnt = DIV_RND_UP (cf->case_cnt, CASES_PER_BLOCK);
244 for (idx = 0; idx < block_cnt; idx++)
245 free (cf->cases[idx]);
253 if (cf->filename != NULL && remove (cf->filename) == -1)
254 msg (ME, _("%s: Removing temporary file: %s."),
255 cf->filename, strerror (errno));
264 /* Returns nonzero only if casefile CF is stored in memory (instead of on
267 casefile_in_core (const struct casefile *cf)
271 return cf->storage == MEMORY;
274 /* Puts a casefile to "sleep", that is, minimizes the resources
275 needed for it by closing its file descriptor and freeing its
276 buffer. This is useful if we need so many casefiles that we
277 might not have enough memory and file descriptors to go
280 For simplicity, this implementation always converts the
281 casefile to reader mode. If this turns out to be a problem,
282 with a little extra work we could also support sleeping
285 casefile_sleep (const struct casefile *cf_)
287 struct casefile *cf = (struct casefile *) cf_;
290 casefile_mode_reader (cf);
291 casefile_to_disk (cf);
299 if (cf->buffer != NULL)
306 /* Returns the number of `union value's in a case for CF. */
308 casefile_get_value_cnt (const struct casefile *cf)
312 return cf->value_cnt;
315 /* Returns the number of cases in casefile CF. */
317 casefile_get_case_cnt (const struct casefile *cf)
324 /* Appends a copy of case C to casefile CF. Not valid after any
325 reader for CF has been created. */
327 casefile_append (struct casefile *cf, const struct ccase *c)
331 assert (cf->mode == WRITE);
333 /* Try memory first. */
334 if (cf->storage == MEMORY)
336 if (case_bytes < get_max_workspace ())
338 size_t block_idx = cf->case_cnt / CASES_PER_BLOCK;
339 size_t case_idx = cf->case_cnt % CASES_PER_BLOCK;
340 struct ccase new_case;
342 case_bytes += cf->case_acct_size;
343 case_clone (&new_case, c);
346 if ((block_idx & (block_idx - 1)) == 0)
348 size_t block_cap = block_idx == 0 ? 1 : block_idx * 2;
349 cf->cases = xrealloc (cf->cases,
350 sizeof *cf->cases * block_cap);
353 cf->cases[block_idx] = xmalloc (sizeof **cf->cases
357 case_move (&cf->cases[block_idx][case_idx], &new_case);
361 casefile_to_disk (cf);
362 assert (cf->storage == DISK);
363 write_case_to_disk (cf, c);
367 write_case_to_disk (cf, c);
/* Appends case C to casefile CF and takes over ownership of C:
   the contents are copied into CF by casefile_append(), after
   which the caller's C is destroyed.  Not valid after any reader
   for CF has been created. */
void
casefile_append_xfer (struct casefile *cf, struct ccase *c)
{
  /* CF keeps its own copy... */
  casefile_append (cf, c);

  /* ...so the caller's case can be released. */
  case_destroy (c);
}
381 /* Writes case C to casefile CF's disk buffer, first flushing the buffer to
382 disk if it would otherwise overflow. */
384 write_case_to_disk (struct casefile *cf, const struct ccase *c)
386 case_to_values (c, cf->buffer + cf->buffer_used, cf->value_cnt);
387 cf->buffer_used += cf->value_cnt;
388 if (cf->buffer_used + cf->value_cnt > cf->buffer_size)
392 /* If any bytes in CF's output buffer are used, flush them to
395 flush_buffer (struct casefile *cf)
397 if (cf->buffer_used > 0)
399 if (!full_write (cf->fd, cf->buffer,
400 cf->buffer_size * sizeof *cf->buffer))
401 msg (FE, _("Error writing temporary file: %s."), strerror (errno));
408 /* If CF is currently stored in memory, writes it to disk. Readers, if any,
409 retain their current positions. */
411 casefile_to_disk (const struct casefile *cf_)
413 struct casefile *cf = (struct casefile *) cf_;
414 struct casereader *reader;
418 if (cf->storage == MEMORY)
420 size_t idx, block_cnt;
422 assert (cf->filename == NULL);
423 assert (cf->fd == -1);
424 assert (cf->buffer_used == 0);
427 if (!make_temp_file (&cf->fd, &cf->filename))
429 cf->buffer = xmalloc (cf->buffer_size * sizeof *cf->buffer);
430 memset (cf->buffer, 0, cf->buffer_size * sizeof *cf->buffer);
432 case_bytes -= cf->case_cnt * cf->case_acct_size;
433 for (idx = 0; idx < cf->case_cnt; idx++)
435 size_t block_idx = idx / CASES_PER_BLOCK;
436 size_t case_idx = idx % CASES_PER_BLOCK;
437 struct ccase *c = &cf->cases[block_idx][case_idx];
438 write_case_to_disk (cf, c);
442 block_cnt = DIV_RND_UP (cf->case_cnt, CASES_PER_BLOCK);
443 for (idx = 0; idx < block_cnt; idx++)
444 free (cf->cases[idx]);
449 if (cf->mode == READ)
452 for (reader = cf->readers; reader != NULL; reader = reader->next)
453 reader_open_file (reader);
457 /* Changes CF to reader mode, ensuring that no more cases may be
458 added. Creating a casereader for CF has the same effect. */
460 casefile_mode_reader (struct casefile *cf)
466 /* Creates and returns a casereader for CF. A casereader can be used to
467 sequentially read the cases in a casefile. */
469 casefile_get_reader (const struct casefile *cf_)
471 struct casefile *cf = (struct casefile *) cf_;
472 struct casereader *reader;
475 assert (!cf->being_destroyed);
477 /* Flush the buffer to disk if it's not empty. */
478 if (cf->mode == WRITE && cf->storage == DISK)
483 reader = xmalloc (sizeof *reader);
484 reader->next = cf->readers;
485 if (cf->readers != NULL)
486 reader->next->prev = reader;
487 cf->readers = reader;
490 reader->case_idx = 0;
491 reader->destructive = 0;
493 reader->buffer = NULL;
494 reader->buffer_pos = 0;
495 case_nullify (&reader->c);
497 if (reader->cf->storage == DISK)
498 reader_open_file (reader);
503 /* Creates and returns a destructive casereader for CF. Like a
504 normal casereader, a destructive casereader sequentially reads
505 the cases in a casefile. Unlike a normal casereader, a
506 destructive reader cannot operate concurrently with any other
507 reader. (This restriction could be relaxed in a few ways, but
508 it is so far unnecessary for other code.) */
510 casefile_get_destructive_reader (struct casefile *cf)
512 struct casereader *reader;
514 assert (cf->readers == NULL);
515 reader = casefile_get_reader (cf);
516 reader->destructive = 1;
517 cf->being_destroyed = 1;
521 /* Opens a disk file for READER and seeks to the current position as indicated
522 by case_idx. Normally the current position is the beginning of the file,
523 but casefile_to_disk may cause the file to be opened at a different
526 reader_open_file (struct casereader *reader)
528 struct casefile *cf = reader->cf;
531 if (reader->case_idx >= cf->case_cnt)
541 reader->fd = safe_open (cf->filename, O_RDONLY);
543 msg (FE, _("%s: Opening temporary file: %s."),
544 cf->filename, strerror (errno));
547 if (cf->buffer != NULL)
549 reader->buffer = cf->buffer;
554 reader->buffer = xmalloc (cf->buffer_size * sizeof *cf->buffer);
555 memset (reader->buffer, 0, cf->buffer_size * sizeof *cf->buffer);
558 if (cf->value_cnt != 0)
560 size_t buffer_case_cnt = cf->buffer_size / cf->value_cnt;
561 file_ofs = ((off_t) reader->case_idx / buffer_case_cnt
562 * cf->buffer_size * sizeof *cf->buffer);
563 reader->buffer_pos = (reader->case_idx % buffer_case_cnt
568 if (lseek (reader->fd, file_ofs, SEEK_SET) != file_ofs)
569 msg (FE, _("%s: Seeking temporary file: %s."),
570 cf->filename, strerror (errno));
572 if (cf->case_cnt > 0 && cf->value_cnt > 0)
573 fill_buffer (reader);
575 case_create (&reader->c, cf->value_cnt);
578 /* Fills READER's buffer by reading a block from disk. */
580 fill_buffer (struct casereader *reader)
582 int retval = full_read (reader->fd, reader->buffer,
583 reader->cf->buffer_size * sizeof *reader->buffer);
585 msg (FE, _("%s: Reading temporary file: %s."),
586 reader->cf->filename, strerror (errno));
587 else if (retval != reader->cf->buffer_size * sizeof *reader->buffer)
588 msg (FE, _("%s: Temporary file ended unexpectedly."),
589 reader->cf->filename);
592 /* Returns the casefile that READER reads. */
593 const struct casefile *
594 casereader_get_casefile (const struct casereader *reader)
596 assert (reader != NULL);
601 /* Reads a copy of the next case from READER into C.
602 Caller is responsible for destroying C.
603 Returns true if successful, false at end of file. */
605 casereader_read (struct casereader *reader, struct ccase *c)
607 assert (reader != NULL);
609 if (reader->case_idx >= reader->cf->case_cnt)
612 if (reader->cf->storage == MEMORY)
614 size_t block_idx = reader->case_idx / CASES_PER_BLOCK;
615 size_t case_idx = reader->case_idx % CASES_PER_BLOCK;
617 case_clone (c, &reader->cf->cases[block_idx][case_idx]);
623 if (reader->buffer_pos + reader->cf->value_cnt > reader->cf->buffer_size)
625 fill_buffer (reader);
626 reader->buffer_pos = 0;
629 case_from_values (&reader->c, reader->buffer + reader->buffer_pos,
630 reader->cf->value_cnt);
631 reader->buffer_pos += reader->cf->value_cnt;
634 case_clone (c, &reader->c);
639 /* Reads the next case from READER into C and transfers ownership
640 to the caller. Caller is responsible for destroying C.
641 Returns true if successful, false at end of file. */
643 casereader_read_xfer (struct casereader *reader, struct ccase *c)
645 assert (reader != NULL);
647 if (reader->destructive == 0
648 || reader->case_idx >= reader->cf->case_cnt
649 || reader->cf->storage == DISK)
650 return casereader_read (reader, c);
653 size_t block_idx = reader->case_idx / CASES_PER_BLOCK;
654 size_t case_idx = reader->case_idx % CASES_PER_BLOCK;
655 struct ccase *read_case = &reader->cf->cases[block_idx][case_idx];
657 case_move (c, read_case);
/* Like casereader_read_xfer(): reads the next case from READER
   into C and hands ownership to the caller, who must destroy C.
   Reaching end of file here is a programming error and trips an
   assertion. */
void
casereader_read_xfer_assert (struct casereader *reader, struct ccase *c)
{
  /* Keep the call outside assert() so that it still happens when
     assertions are compiled out. */
  bool got_case = casereader_read_xfer (reader, c);

  assert (got_case);
}
673 /* Destroys READER. */
675 casereader_destroy (struct casereader *reader)
677 assert (reader != NULL);
679 if (reader->next != NULL)
680 reader->next->prev = reader->prev;
681 if (reader->prev != NULL)
682 reader->prev->next = reader->next;
683 if (reader->cf->readers == reader)
684 reader->cf->readers = reader->next;
686 if (reader->cf->buffer == NULL)
687 reader->cf->buffer = reader->buffer;
689 free (reader->buffer);
691 if (reader->fd != -1)
693 if (reader->cf->fd == -1)
694 reader->cf->fd = reader->fd;
696 safe_close (reader->fd);
699 case_destroy (&reader->c);
/* Opens FILENAME with open(), passing FLAGS, and retries for as
   long as the call fails with EINTR.  Returns the descriptor, or
   -1 (with errno set by open()) on any other failure. */
static int
safe_open (const char *filename, int flags)
{
  for (;;)
    {
      int fd = open (filename, flags);

      /* Done on success or on any error except an interruption. */
      if (fd != -1 || errno != EINTR)
        return fd;
    }
}
/* Closes FD with close(), calling it again for as long as it
   fails with EINTR.  Returns the result of the last close()
   call.

   NOTE(review): POSIX leaves the state of FD unspecified after an
   interrupted close(); confirm that retrying is appropriate on
   the platforms this must run on. */
static int safe_close (int fd)
{
  int status = close (fd);

  while (status == -1 && errno == EINTR)
    status = close (fd);

  return status;
}
/* Reads up to SIZE bytes from FD into BUFFER_ by calling read()
   repeatedly, continuing after short reads and after calls
   interrupted by a signal (EINTR).

   Returns the number of bytes actually read, which is less than
   SIZE only if end of file was reached first, or -1 if read()
   failed for a reason other than EINTR (errno is then set by
   read()). */
static int
full_read (int fd, void *buffer_, size_t size)
{
  char *buffer = buffer_;
  size_t bytes_read = 0;

  while (bytes_read < size)
    {
      /* read() returns ssize_t; keeping it in an `int' would
         truncate the count of a large transfer. */
      ssize_t retval = read (fd, buffer + bytes_read, size - bytes_read);

      if (retval > 0)
        bytes_read += retval;
      else if (retval == 0)
        break;                  /* End of file. */
      else if (errno != EINTR)
        return -1;
    }

  return bytes_read;
}
/* Writes all SIZE bytes of BUFFER_ to FD by calling write()
   repeatedly, continuing after short writes and after calls
   interrupted by a signal (EINTR).

   Returns the number of bytes written, which equals SIZE on
   success, or -1 if write() failed for a reason other than EINTR
   (errno is then set by write()). */
static int
full_write (int fd, const void *buffer_, size_t size)
{
  const char *buffer = buffer_;
  size_t bytes_written = 0;

  while (bytes_written < size)
    {
      /* write() returns ssize_t; keeping it in an `int' would
         truncate the count of a large transfer. */
      ssize_t retval = write (fd, buffer + bytes_written,
                              size - bytes_written);

      if (retval >= 0)
        bytes_written += retval;
      else if (errno != EINTR)
        return -1;
    }

  return bytes_written;
}
778 /* Registers our exit handler with atexit() if it has not already
781 register_atexit (void)
783 static int registered = 0;
787 atexit (exit_handler);
793 /* atexit() handler that closes and deletes our temporary
798 while (casefiles != NULL)
799 casefile_destroy (casefiles);
802 #include <gsl/gsl_rng.h>
807 static void test_casefile (int pattern, size_t value_cnt, size_t case_cnt);
808 static void get_random_case (struct ccase *, size_t value_cnt,
810 static void write_random_case (struct casefile *cf, size_t case_idx);
811 static void read_and_verify_random_case (struct casefile *cf,
812 struct casereader *reader,
814 static void fail_test (const char *message, ...);
817 cmd_debug_casefile (void)
819 static const size_t sizes[] =
821 1, 2, 3, 4, 5, 6, 7, 14, 15, 16, 17, 31, 55, 73,
822 100, 137, 257, 521, 1031, 2053
828 size_max = sizeof sizes / sizeof *sizes;
829 if (lex_match_id ("SMALL"))
837 return lex_end_of_command ();
839 for (pattern = 0; pattern < 6; pattern++)
843 for (size = sizes; size < sizes + size_max; size++)
847 for (case_cnt = 0; case_cnt <= case_max;
848 case_cnt = (case_cnt * 2) + 1)
849 test_casefile (pattern, *size, case_cnt);
852 printf ("Casefile tests succeeded.\n");
857 test_casefile (int pattern, size_t value_cnt, size_t case_cnt)
860 struct casereader *r1, *r2;
865 rng = gsl_rng_alloc (gsl_rng_mt19937);
866 cf = casefile_create (value_cnt);
868 casefile_to_disk (cf);
869 for (i = 0; i < case_cnt; i++)
870 write_random_case (cf, i);
873 r1 = casefile_get_reader (cf);
874 r2 = casefile_get_reader (cf);
879 for (i = 0; i < case_cnt; i++)
881 read_and_verify_random_case (cf, r1, i);
882 read_and_verify_random_case (cf, r2, i);
886 for (i = 0; i < case_cnt; i++)
887 read_and_verify_random_case (cf, r1, i);
888 for (i = 0; i < case_cnt; i++)
889 read_and_verify_random_case (cf, r2, i);
894 for (i = j = 0; i < case_cnt; i++)
896 read_and_verify_random_case (cf, r1, i);
897 if (gsl_rng_get (rng) % pattern == 0)
898 read_and_verify_random_case (cf, r2, j++);
899 if (i == case_cnt / 2)
900 casefile_to_disk (cf);
902 for (; j < case_cnt; j++)
903 read_and_verify_random_case (cf, r2, j);
906 if (casereader_read (r1, &c))
907 fail_test ("Casereader 1 not at end of file.");
908 if (casereader_read (r2, &c))
909 fail_test ("Casereader 2 not at end of file.");
911 casereader_destroy (r1);
913 casereader_destroy (r2);
916 r1 = casefile_get_destructive_reader (cf);
917 for (i = 0; i < case_cnt; i++)
919 struct ccase read_case, expected_case;
921 get_random_case (&expected_case, value_cnt, i);
922 if (!casereader_read_xfer (r1, &read_case))
923 fail_test ("Premature end of casefile.");
924 for (j = 0; j < value_cnt; j++)
926 double a = case_num (&read_case, j);
927 double b = case_num (&expected_case, j);
929 fail_test ("Case %lu fails comparison.", (unsigned long) i);
931 case_destroy (&expected_case);
932 case_destroy (&read_case);
934 casereader_destroy (r1);
936 casefile_destroy (cf);
941 get_random_case (struct ccase *c, size_t value_cnt, size_t case_idx)
944 case_create (c, value_cnt);
945 for (i = 0; i < value_cnt; i++)
946 case_data_rw (c, i)->f = case_idx % 257 + i;
950 write_random_case (struct casefile *cf, size_t case_idx)
953 get_random_case (&c, casefile_get_value_cnt (cf), case_idx);
954 casefile_append_xfer (cf, &c);
958 read_and_verify_random_case (struct casefile *cf,
959 struct casereader *reader, size_t case_idx)
961 struct ccase read_case, expected_case;
965 value_cnt = casefile_get_value_cnt (cf);
966 get_random_case (&expected_case, value_cnt, case_idx);
967 if (!casereader_read (reader, &read_case))
968 fail_test ("Premature end of casefile.");
969 for (i = 0; i < value_cnt; i++)
971 double a = case_num (&read_case, i);
972 double b = case_num (&expected_case, i);
974 fail_test ("Case %lu fails comparison.", (unsigned long) case_idx);
976 case_destroy (&read_case);
977 case_destroy (&expected_case);
981 fail_test (const char *message, ...)
985 va_start (args, message);
986 vprintf (message, args);