1 /* PSPP - computes sample statistics.
2 Copyright (C) 1997-9, 2000 Free Software Foundation, Inc.
3 Written by Ben Pfaff <blp@gnu.org>.
5 This program is free software; you can redistribute it and/or
6 modify it under the terms of the GNU General Public License as
7 published by the Free Software Foundation; either version 2 of the
8 License, or (at your option) any later version.
10 This program is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with this program; if not, write to the Free Software
17 Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA
44 #include <sys/types.h>
51 #include "debug-print.h"
53 /* Variables to sort. */
54 struct variable **v_sort;
57 /* Used when internal-sorting to a separate file. */
58 static struct case_list **separate_case_tab;
60 /* Other prototypes. */
61 static int compare_case_lists (const void *, const void *);
62 static int do_internal_sort (int separate);
63 static int do_external_sort (int separate);
64 int parse_sort_variables (void);
65 void read_sort_output (int (*write_case) (void));
67 /* Performs the SORT CASES procedures. */
71 /* First, just parse the command. */
72 lex_match_id ("SORT");
73 lex_match_id ("CASES");
76 if (!parse_sort_variables ())
81 /* Then it's time to do the actual work. There are two cases:
83 (internal sort) All the data is in memory. In this case, we
84 perform an EXECUTE to get the data into the desired form, then
85 sort the cases in place, if it is still all in memory.
87 (external sort) The data is not in memory. It may be coming from
88 a system file or other data file, etc. In any case, it is now
89 time to perform an external sort. This is better explained in
90 do_external_sort(). */
92 /* Do all this dirty work. */
94 int success = sort_cases (0);
97 return lex_end_of_command ();
103 /* Parses a list of sort variables into v_sort and nv_sort. */
105 parse_sort_variables (void)
111 int prev_nv_sort = nv_sort;
112 int order = SRT_ASCEND;
114 if (!parse_variables (default_dict, &v_sort, &nv_sort,
115 PV_NO_DUPLICATE | PV_APPEND | PV_NO_SCRATCH))
119 if (lex_match_id ("D") || lex_match_id ("DOWN"))
121 else if (!lex_match_id ("A") && !lex_match_id ("UP"))
124 msg (SE, _("`A' or `D' expected inside parentheses."));
127 if (!lex_match (')'))
130 msg (SE, _("`)' expected."));
134 for (; prev_nv_sort < nv_sort; prev_nv_sort++)
135 v_sort[prev_nv_sort]->p.srt.order = order;
137 while (token != '.' && token != '/');
142 /* Sorts the active file based on the key variables specified in
143 global variables v_sort and nv_sort. The output is either to the
144 active file, if SEPARATE is zero, or to a separate file, if
145 SEPARATE is nonzero. In the latter case the output cases can be
146 read with a call to read_sort_output(). (In the former case the
147 output cases should be dealt with through the usual vfm interface.)
149 The caller is responsible for freeing v_sort[]. */
151 sort_cases (int separate)
153 assert (separate_case_tab == NULL);
155 /* Not sure this is necessary but it's good to be safe. */
156 if (separate && vfm_source == &sort_stream)
157 procedure (NULL, NULL, NULL);
159 /* SORT CASES cancels PROCESS IF. */
160 expr_free (process_if_expr);
161 process_if_expr = NULL;
163 if (do_internal_sort (separate))
167 return do_external_sort (separate);
170 /* If a reasonable situation is set up, do an internal sort of the
171 data. Return success. */
173 do_internal_sort (int separate)
175 if (vfm_source != &vfm_disk_stream)
177 if (vfm_source != &vfm_memory_stream)
178 procedure (NULL, NULL, NULL);
179 if (vfm_source == &vfm_memory_stream)
181 struct case_list **case_tab = malloc (sizeof *case_tab
182 * (vfm_source_info.ncases + 1));
183 if (vfm_source_info.ncases == 0)
188 if (case_tab != NULL)
190 struct case_list *clp = memory_source_cases;
191 struct case_list **ctp = case_tab;
194 for (; clp; clp = clp->next)
196 qsort (case_tab, vfm_source_info.ncases, sizeof *case_tab,
201 memory_source_cases = case_tab[0];
202 for (i = 1; i < vfm_source_info.ncases; i++)
203 case_tab[i - 1]->next = case_tab[i];
204 case_tab[vfm_source_info.ncases - 1]->next = NULL;
207 case_tab[vfm_source_info.ncases] = NULL;
208 separate_case_tab = case_tab;
218 /* Compares the NV_SORT variables in V_SORT[] between the
219 `case_list's at A and B, and returns a strcmp()-type
222 compare_case_lists (const void *a_, const void *b_)
224 struct case_list *const *pa = a_;
225 struct case_list *const *pb = b_;
226 struct case_list *a = *pa;
227 struct case_list *b = *pb;
232 for (i = 0; i < nv_sort; i++)
236 if (v->type == NUMERIC)
238 double af = a->c.data[v->fv].f;
239 double bf = b->c.data[v->fv].f;
241 result = af < bf ? -1 : af > bf;
244 result = memcmp (a->c.data[v->fv].s, b->c.data[v->fv].s, v->width);
250 if (v->p.srt.order == SRT_DESCEND)
257 /* Maximum number of input + output file handles. */
258 #if defined FOPEN_MAX && (FOPEN_MAX - 5 < 18)
259 #define MAX_FILE_HANDLES (FOPEN_MAX - 5)
261 #define MAX_FILE_HANDLES 18
264 #if MAX_FILE_HANDLES < 3
265 #error At least 3 file handles must be available for sorting.
268 /* Number of input buffers. */
269 #define N_INPUT_BUFFERS (MAX_FILE_HANDLES - 1)
271 /* Maximum order of merge. This is the value suggested by Knuth;
272 specifically, he said to use tree selection, which we don't
273 implement, for larger orders of merge. */
274 #define MAX_MERGE_ORDER 7
276 /* Minimum total number of records for buffers. */
277 #define MIN_BUFFER_TOTAL_SIZE_RECS 64
279 /* Minimum single input or output buffer size, in bytes and records. */
280 #define MIN_BUFFER_SIZE_BYTES 4096
281 #define MIN_BUFFER_SIZE_RECS 16
283 /* Structure for replacement selection tree. */
286 struct repl_sel_tree *loser;/* Loser associated w/this internal node. */
287 int rn; /* Run number of `loser'. */
288 struct repl_sel_tree *fe; /* Internal node above this external node. */
289 struct repl_sel_tree *fi; /* Internal node above this internal node. */
290 union value record[1]; /* The case proper. */
293 /* Static variables used for sorting. */
294 static struct repl_sel_tree **x; /* Buffers. */
295 static int x_max; /* Size of buffers, in records. */
296 static int records_per_buffer; /* Number of records in each buffer. */
298 /* In the merge phase, the first N_INPUT_BUFFERS handle[] elements are
299 input files and the last element is the output file. Before that,
300 they're all used as output files, although the last one is
302 static FILE *handle[MAX_FILE_HANDLES]; /* File handles. */
304 /* Now, MAX_FILE_HANDLES is the maximum number of files we will *try*
305 to open. But if we can't open that many, max_handles will be set
306 to the number we apparently can open. */
307 static int max_handles; /* Maximum number of handles. */
309 /* When we create temporary files, they are all put in the same
310 directory and numbered sequentially from zero. tmp_basename is the
311 drive/directory, etc., and tmp_extname can be sprintf() with "%08x"
312 to the file number, then tmp_basename used to open the file. */
313 static char *tmp_basename; /* Temporary file basename. */
314 static char *tmp_extname; /* Temporary file extension name. */
316 /* We use Huffman's method to determine the merge pattern. This means
317 that we need to know which runs are the shortest at any given time.
318 Priority queues as implemented by heap.c are a natural for this
319 task (probably because I wrote the code specifically for it). */
320 static struct heap *huffman_queue; /* Huffman priority queue. */
322 /* Prototypes for helper functions. */
323 static void sort_stream_write (void);
324 static int write_initial_runs (int separate);
325 static int allocate_cases (void);
326 static int allocate_file_handles (void);
327 static int merge (void);
328 static void rmdir_temp_dir (void);
330 /* Performs an external sort of the active file. A description of the
331 procedure follows. All page references refer to Knuth's _Art of
332 Computer Programming, Vol. 3: Sorting and Searching_, which is the
333 canonical resource for sorting.
335 1. The data is read and S initial runs are formed through the
336 action of algorithm 5.4.1R (replacement selection).
338 2. Huffman's method (p. 365-366) is used to determine the optimum
341 3. If an OS that supports overlapped reading, writing, and
342 computing is being run, we should use 5.4.6F for forecasting.
343 Otherwise, buffers are filled only when they run out of data.
344 FIXME: Since the author of PSPP uses GNU/Linux, which does not
345 yet implement overlapped r/w/c, 5.4.6F is not used.
347 4. We perform P-way merges:
349 (a) The desired P is the smallest P such that ceil(ln(S)/ln(P))
350 is minimized. (FIXME: Since I don't have an algorithm for
351 minimizing this, it's just set to MAX_MERGE_ORDER.)
353 (b) P is reduced if the selected value would make input buffers
354 less than 4096 bytes each, or 16 records, whichever is larger.
356 (c) P is reduced if we run out of available file handles or space
359 (d) P is reduced if we don't have space for one or two output
360 buffers, which have the same minimum size as input buffers. (We
361 need two output buffers if 5.4.6F is in use for forecasting.) */
363 do_external_sort (int separate)
367 assert (MAX_FILE_HANDLES >= 3);
372 huffman_queue = heap_create (512);
373 if (huffman_queue == NULL)
376 if (!allocate_cases ())
379 if (!allocate_file_handles ())
382 if (!write_initial_runs (separate))
389 /* Despite the name, flow of control comes here regardless of
390 whether or not the sort is successful. */
392 heap_destroy (huffman_queue);
398 for (i = 0; i <= x_max; i++)
413 /* Sets up to open temporary files. */
414 /* PORTME: This creates a directory for temporary files. Some OSes
415 might not have that concept... */
417 allocate_file_handles (void)
419 const char *dir; /* Directory prefix. */
420 char *buf; /* String buffer. */
421 char *cp; /* Pointer into buf. */
423 dir = getenv ("SPSSTMPDIR");
425 dir = getenv ("SPSSXTMPDIR");
427 dir = getenv ("TMPDIR");
435 #elif defined (__MSDOS__)
437 dir = getenv ("TEMP");
439 dir = getenv ("TMP");
446 buf = xmalloc (strlen (dir) + 1 + 4 + 8 + 4 + 1 + INT_DIGITS + 1);
447 cp = spprintf (buf, "%s%c%04lX%04lXpspp", dir, DIR_SEPARATOR,
448 ((long) time (0)) & 0xffff, ((long) getpid ()) & 0xffff);
450 if (-1 == mkdir (buf, S_IRWXU))
452 if (-1 == mkdir (buf))
456 msg (SE, _("%s: Cannot create temporary directory: %s."),
457 buf, strerror (errno));
460 *cp++ = DIR_SEPARATOR;
465 max_handles = MAX_FILE_HANDLES;
470 /* Removes the directory created for temporary files, if one exists.
471 Also frees tmp_basename. */
473 rmdir_temp_dir (void)
475 if (NULL == tmp_basename)
478 tmp_extname[-1] = '\0';
479 if (rmdir (tmp_basename) == -1)
480 msg (SE, _("%s: Error removing directory for temporary files: %s."),
481 tmp_basename, strerror (errno));
486 /* Allocates room for lots of cases as a buffer. */
488 allocate_cases (void)
490 /* This is the size of one case. */
491 const int case_size = (sizeof (struct repl_sel_tree)
492 + (sizeof (union value)
493 * (dict_get_value_cnt (default_dict) - 1))
494 + sizeof (struct repl_sel_tree *));
498 /* Allocate as many cases as we can, assuming a space of four
499 void pointers for malloc()'s internal bookkeeping. */
500 x_max = MAX_WORKSPACE / (case_size + 4 * sizeof (void *));
501 x = malloc (sizeof (struct repl_sel_tree *) * x_max);
506 for (i = 0; i < x_max; i++)
508 x[i] = malloc (sizeof (struct repl_sel_tree)
509 + (sizeof (union value)
510 * (dict_get_value_cnt (default_dict) - 1)));
516 if (x == NULL || x_max < MIN_BUFFER_TOTAL_SIZE_RECS)
522 for (i = 0; i < x_max; i++)
526 msg (SE, _("Out of memory. Could not allocate room for minimum of %d "
527 "cases of %d bytes each. (PSPP workspace is currently "
528 "restricted to a maximum of %d KB.)"),
529 MIN_BUFFER_TOTAL_SIZE_RECS, case_size, MAX_WORKSPACE / 1024);
535 /* The last element of the array is used to store lastkey. */
538 debug_printf ((_("allocated %d cases == %d bytes\n"),
539 x_max, x_max * case_size));
543 /* Replacement selection. */
545 static int rmax, rc, rq;
546 static struct repl_sel_tree *q;
547 static union value *lastkey;
548 static int run_no, file_index;
549 static int deferred_abort;
550 static int run_length;
552 static int compare_record (union value *, union value *);
555 output_record (union value *v)
557 union value *src_case;
562 if (compaction_necessary)
564 compact_case (compaction_case, (struct ccase *) v);
565 src_case = (union value *) compaction_case;
568 src_case = (union value *) v;
570 if ((int) fwrite (src_case, sizeof *src_case, compaction_nval,
575 sprintf (tmp_extname, "%08x", run_no);
576 msg (SE, _("%s: Error writing temporary file: %s."),
577 tmp_basename, strerror (errno));
587 int result = fclose (handle[i]);
588 msg (VM (2), _("SORT: Closing handle %d."), i);
593 sprintf (tmp_extname, "%08x", i);
594 msg (SE, _("%s: Error closing temporary file: %s."),
595 tmp_basename, strerror (errno));
602 close_handles (int beg, int end)
607 for (i = beg; i < end; i++)
608 success &= close_handle (i);
613 open_handle_w (int handle_no, int run_no)
615 sprintf (tmp_extname, "%08x", run_no);
616 msg (VM (1), _("SORT: %s: Opening for writing as run %d."),
617 tmp_basename, run_no);
619 /* The `x' modifier causes the GNU C library to insist on creating a
620 new file--if the file already exists, an error is signaled. The
621 ANSI C standard says that other libraries should ignore anything
622 after the `w+b', so it shouldn't be a problem. */
623 return NULL != (handle[handle_no] = fopen (tmp_basename, "w+bx"));
627 open_handle_r (int handle_no, int run_no)
631 sprintf (tmp_extname, "%08x", run_no);
632 msg (VM (1), _("SORT: %s: Opening for writing as run %d."),
633 tmp_basename, run_no);
634 f = handle[handle_no] = fopen (tmp_basename, "rb");
638 msg (SE, _("%s: Error opening temporary file for reading: %s."),
639 tmp_basename, strerror (errno));
646 /* Begins a new initial run, specifically its output file. */
650 /* Decide which handle[] to use. If run_no is max_handles or
651 greater, then we've run out of handles so it's time to just do
652 one file at a time, which by default is handle 0. */
653 file_index = (run_no < max_handles ? run_no : 0);
656 /* Alright, now create the temporary file. */
657 if (open_handle_w (file_index, run_no) == 0)
659 /* Failure to create the temporary file. Check if there are
660 unacceptably few files already open. */
664 msg (SE, _("%s: Error creating temporary file: %s."),
665 tmp_basename, strerror (errno));
669 /* Close all the open temporary files. */
670 if (!close_handles (0, file_index))
673 /* Now try again to create the temporary file. */
674 max_handles = file_index;
676 if (open_handle_w (0, run_no) == 0)
678 /* It still failed, report it this time. */
680 msg (SE, _("%s: Error creating temporary file: %s."),
681 tmp_basename, strerror (errno));
687 /* Ends the current initial run. Just increments run_no if no initial
688 run has been started yet. */
692 /* Close file handles if necessary. */
696 if (run_no == max_handles - 1)
697 result = close_handles (0, max_handles);
698 else if (run_no >= max_handles)
699 result = close_handle (0);
706 /* Advance to next run. */
709 heap_insert (huffman_queue, run_no - 1, run_length);
712 /* Performs 5.4.1R. */
714 write_initial_runs (int separate)
719 /* Steps R1, R2, R3. */
728 for (j = 0; j < x_max; j++)
730 struct repl_sel_tree *J = x[j];
734 J->fe = x[(x_max + j) / 2];
736 memset (J->record, 0,
737 dict_get_value_cnt (default_dict) * sizeof (union value));
741 /* Most of the iterations of steps R4, R5, R6, R7, R2, R3, ... */
745 vfm_sink->destroy_sink ();
746 vfm_sink = &sort_stream;
748 procedure (NULL, NULL, NULL);
750 /* Final iterations of steps R4, R5, R6, R7, R2, R3, ... */
753 struct repl_sel_tree *t;
767 && compare_record (t->loser->record, q->record) < 0))
769 struct repl_sel_tree *temp_tree;
772 temp_tree = t->loser;
800 output_record (q->record);
801 lastkey = x[x_max]->record;
802 memcpy (lastkey, q->record, sizeof (union value) * vfm_sink_info.nval);
805 assert (run_no == rmax);
807 /* If an unrecoverable error occurred somewhere in the above code,
808 then the `deferred_abort' flag would have been set. */
813 for (i = 0; i < max_handles; i++)
814 if (handle[i] != NULL)
816 sprintf (tmp_extname, "%08x", i);
818 if (fclose (handle[i]) == EOF)
819 msg (SE, _("%s: Error closing temporary file: %s."),
820 tmp_basename, strerror (errno));
822 if (remove (tmp_basename) != 0)
823 msg (SE, _("%s: Error removing temporary file: %s."),
824 tmp_basename, strerror (errno));
834 /* Compares the NV_SORT variables in V_SORT[] between the `value's at
835 A and B, and returns a strcmp()-type result. */
837 compare_record (union value * a, union value * b)
844 if (b == NULL) /* Sort NULLs after everything else. */
847 for (i = 0; i < nv_sort; i++)
851 if (v->type == NUMERIC)
853 if (approx_ne (a[v->fv].f, b[v->fv].f))
855 result = (a[v->fv].f > b[v->fv].f) ? 1 : -1;
861 result = memcmp (a[v->fv].s, b[v->fv].s, v->width);
867 if (v->p.srt.order == SRT_ASCEND)
871 assert (v->p.srt.order == SRT_DESCEND);
878 static int merge_once (int run_index[], int run_length[], int n_runs);
880 /* Modula function as defined by Knuth. */
888 result = abs (x) % abs (y);
894 /* Performs a series of P-way merges of initial runs using Huffman's
899 /* Order of merge. */
903 assert (MIN_BUFFER_SIZE_RECS * 2 <= MIN_BUFFER_TOTAL_SIZE_RECS - 1);
905 /* Close all the input files. I hope that the boundary conditions
906 are correct on this but I'm not sure. */
907 if (run_no < max_handles)
911 for (i = 0; i < run_no; )
912 if (!close_handle (i++))
914 for (; i < run_no; i++)
920 /* Determine order of merge. */
921 order = MAX_MERGE_ORDER;
922 if (x_max / order < MIN_BUFFER_SIZE_RECS)
923 order = x_max / MIN_BUFFER_SIZE_RECS;
924 else if (x_max / order * sizeof (union value) * dict_get_value_cnt (default_dict)
925 < MIN_BUFFER_SIZE_BYTES)
926 order = x_max / (MIN_BUFFER_SIZE_BYTES
927 / (sizeof (union value)
928 * (dict_get_value_cnt (default_dict) - 1)));
930 /* Make sure the order of merge is bounded. */
935 assert (x_max / order > 0);
937 /* Calculate number of records per buffer. */
938 records_per_buffer = x_max / order;
940 /* Add (1 - S) mod (P - 1) dummy runs of length 0. */
942 int n_dummy_runs = mod (1 - rmax, order - 1);
943 debug_printf (("rmax=%d, order=%d, n_dummy_runs=%d\n",
944 rmax, order, n_dummy_runs));
945 assert (n_dummy_runs >= 0);
946 while (n_dummy_runs--)
948 heap_insert (huffman_queue, -2, 0);
953 /* Repeatedly merge the P shortest existing runs until only one run
957 int run_index[MAX_MERGE_ORDER];
958 int run_length[MAX_MERGE_ORDER];
959 int total_run_length = 0;
962 assert (rmax >= order);
964 /* Find the shortest runs; put them in runs[] in reverse order
965 of length, to force dummy runs of length 0 to the end of the
967 debug_printf ((_("merging runs")));
968 for (i = order - 1; i >= 0; i--)
970 run_index[i] = heap_delete (huffman_queue, &run_length[i]);
971 assert (run_index[i] != -1);
972 total_run_length += run_length[i];
973 debug_printf ((" %d(%d)", run_index[i], run_length[i]));
975 debug_printf ((_(" into run %d(%d)\n"), run_no, total_run_length));
977 if (!merge_once (run_index, run_length, order))
981 while (-1 != (index = heap_delete (huffman_queue, NULL)))
983 sprintf (tmp_extname, "%08x", index);
984 if (remove (tmp_basename) != 0)
985 msg (SE, _("%s: Error removing temporary file: %s."),
986 tmp_basename, strerror (errno));
992 if (!heap_insert (huffman_queue, run_no++, total_run_length))
994 msg (SE, _("Out of memory expanding Huffman priority queue."));
1001 /* There should be exactly one element in the priority queue after
1002 all that merging. This represents the entire sorted active file.
1003 So we could find a total case count by deleting this element from
1005 assert (heap_size (huffman_queue) == 1);
1010 /* Merges N_RUNS initial runs into a new run. The jth run for 0 <= j
1011 < N_RUNS is taken from temporary file RUN_INDEX[j]; it is composed
1012 of RUN_LENGTH[j] cases. */
1014 merge_once (int run_index[], int run_length[], int n_runs)
1016 /* For each run, the number of records remaining in its buffer. */
1017 int buffered[MAX_MERGE_ORDER];
1019 /* For each run, the index of the next record in the buffer. */
1020 int buffer_ptr[MAX_MERGE_ORDER];
1022 /* Open input files. */
1026 for (i = 0; i < n_runs; i++)
1027 if (run_index[i] != -2 && !open_handle_r (i, run_index[i]))
1029 /* Close and remove temporary files. */
1033 sprintf (tmp_extname, "%08x", i);
1034 if (remove (tmp_basename) != 0)
1035 msg (SE, _("%s: Error removing temporary file: %s."),
1036 tmp_basename, strerror (errno));
1043 /* Create output file. */
1044 if (!open_handle_w (N_INPUT_BUFFERS, run_no))
1046 msg (SE, _("%s: Error creating temporary file for merge: %s."),
1047 tmp_basename, strerror (errno));
1051 /* Prime each buffer. */
1055 for (i = 0; i < n_runs; i++)
1056 if (run_index[i] == -2)
1064 int ofs = records_per_buffer * i;
1066 buffered[i] = min (records_per_buffer, run_length[i]);
1067 for (j = 0; j < buffered[i]; j++)
1068 if ((int) fread (x[j + ofs]->record, sizeof (union value),
1069 dict_get_value_cnt (default_dict), handle[i])
1070 != dict_get_value_cnt (default_dict))
1072 sprintf (tmp_extname, "%08x", run_index[i]);
1073 if (ferror (handle[i]))
1074 msg (SE, _("%s: Error reading temporary file in merge: %s."),
1075 tmp_basename, strerror (errno));
1077 msg (SE, _("%s: Unexpected end of temporary file in merge."),
1081 buffer_ptr[i] = ofs;
1082 run_length[i] -= buffered[i];
1086 /* Perform the merge proper. */
1087 while (n_runs) /* Loop while some data is left. */
1092 for (i = 1; i < n_runs; i++)
1093 if (compare_record (x[buffer_ptr[min]]->record,
1094 x[buffer_ptr[i]]->record) > 0)
1097 if ((int) fwrite (x[buffer_ptr[min]]->record, sizeof (union value),
1098 dict_get_value_cnt (default_dict),
1099 handle[N_INPUT_BUFFERS])
1100 != dict_get_value_cnt (default_dict))
1102 sprintf (tmp_extname, "%08x", run_index[i]);
1103 msg (SE, _("%s: Error writing temporary file in "
1104 "merge: %s."), tmp_basename, strerror (errno));
1108 /* Remove one case from the buffer for this input file. */
1109 if (--buffered[min] == 0)
1111 /* The input buffer is empty. Do any cases remain in the
1112 initial run on disk? */
1113 if (run_length[min])
1115 /* Yes. Read them in. */
1120 /* Reset the buffer pointer. Note that we can't simply
1121 set it to (i * records_per_buffer) since the run
1122 order might have changed. */
1123 ofs = buffer_ptr[min] -= buffer_ptr[min] % records_per_buffer;
1125 buffered[min] = min (records_per_buffer, run_length[min]);
1126 for (j = 0; j < buffered[min]; j++)
1127 if ((int) fread (x[j + ofs]->record, sizeof (union value),
1128 dict_get_value_cnt (default_dict),
1130 != dict_get_value_cnt (default_dict))
1132 sprintf (tmp_extname, "%08x", run_index[min]);
1133 if (ferror (handle[min]))
1134 msg (SE, _("%s: Error reading temporary file in "
1136 tmp_basename, strerror (errno));
1138 msg (SE, _("%s: Unexpected end of temporary file "
1143 run_length[min] -= buffered[min];
1147 /* No. Delete this run. */
1149 /* Close the file. */
1150 FILE *f = handle[min];
1152 sprintf (tmp_extname, "%08x", run_index[min]);
1153 if (fclose (f) == EOF)
1154 msg (SE, _("%s: Error closing temporary file in merge: "
1155 "%s."), tmp_basename, strerror (errno));
1157 /* Delete the file. */
1158 if (remove (tmp_basename) != 0)
1159 msg (SE, _("%s: Error removing temporary file in merge: "
1160 "%s."), tmp_basename, strerror (errno));
1165 /* Since this isn't the last run, we move the last
1166 run into its spot to force all the runs to be
1168 run_index[min] = run_index[n_runs];
1169 run_length[min] = run_length[n_runs];
1170 buffer_ptr[min] = buffer_ptr[n_runs];
1171 buffered[min] = buffered[n_runs];
1172 handle[min] = handle[n_runs];
1180 /* Close output file. */
1182 FILE *f = handle[N_INPUT_BUFFERS];
1183 handle[N_INPUT_BUFFERS] = NULL;
1184 if (fclose (f) == EOF)
1186 sprintf (tmp_extname, "%08x", run_no);
1187 msg (SE, _("%s: Error closing temporary file in merge: "
1189 tmp_basename, strerror (errno));
1197 /* Close all the input and output files. */
1201 for (i = 0; i < n_runs; i++)
1202 if (run_length[i] != 0)
1205 sprintf (tmp_basename, "%08x", run_index[i]);
1206 if (remove (tmp_basename) != 0)
1207 msg (SE, _("%s: Error removing temporary file: %s."),
1208 tmp_basename, strerror (errno));
1211 close_handle (N_INPUT_BUFFERS);
1212 sprintf (tmp_basename, "%08x", run_no);
1213 if (remove (tmp_basename) != 0)
1214 msg (SE, _("%s: Error removing temporary file: %s."),
1215 tmp_basename, strerror (errno));
1219 /* External sort input program. */
1221 /* Reads all the records from the source stream and passes them
1224 sort_stream_read (void)
1226 read_sort_output (write_case);
1229 /* Reads all the records from the output stream and passes them to the
1230 function provided, which must have an interface identical to
1233 read_sort_output (int (*write_case) (void))
1238 if (separate_case_tab)
1240 struct ccase *save_temp_case = temp_case;
1241 struct case_list **p;
1243 for (p = separate_case_tab; *p; p++)
1245 temp_case = &(*p)->c;
1249 free (separate_case_tab);
1250 separate_case_tab = NULL;
1252 temp_case = save_temp_case;
1254 sprintf (tmp_extname, "%08x", run_no - 1);
1255 f = fopen (tmp_basename, "rb");
1258 msg (ME, _("%s: Cannot open sort result file: %s."), tmp_basename,
1264 for (i = 0; i < vfm_source_info.ncases; i++)
1266 if (!fread (temp_case, vfm_source_info.case_size, 1, f))
1269 msg (ME, _("%s: Error reading sort result file: %s."),
1270 tmp_basename, strerror (errno));
1272 msg (ME, _("%s: Unexpected end of sort result file: %s."),
1273 tmp_basename, strerror (errno));
1282 if (fclose (f) == EOF)
1283 msg (ME, _("%s: Error closing sort result file: %s."), tmp_basename,
1286 if (remove (tmp_basename) != 0)
1287 msg (ME, _("%s: Error removing sort result file: %s."), tmp_basename,
1294 #if 0 /* dead code */
1295 /* Alternate interface to sort_stream_write used for external sorting
1296 when SEPARATE is true. */
1298 write_separate (struct ccase *c)
1300 assert (c == temp_case);
1302 sort_stream_write ();
1307 /* Performs one iteration of 5.4.1R steps R4, R5, R6, R7, R2, and
1310 sort_stream_write (void)
1312 struct repl_sel_tree *t;
1315 memcpy (q->record, temp_case->data, vfm_sink_info.case_size);
1316 if (compare_record (q->record, lastkey) < 0)
1328 || (t->rn == rq && compare_record (t->loser->record, q->record) < 0))
1330 struct repl_sel_tree *temp_tree;
1333 temp_tree = t->loser;
1353 assert (rq <= rmax);
1360 output_record (q->record);
1361 lastkey = x[x_max]->record;
1362 memcpy (lastkey, q->record, vfm_sink_info.case_size);
1366 /* Switches mode from sink to source. */
1368 sort_stream_mode (void)
1370 /* If this is not done, then we get the following source/sink pairs:
1371 source=memory/disk/DATA LIST/etc., sink=SORT; source=SORT,
1372 sink=SORT; which is not good. */
1376 struct case_stream sort_stream =