1 /* PSPP - computes sample statistics.
2 Copyright (C) 1997-9, 2000 Free Software Foundation, Inc.
3 Written by Ben Pfaff <blp@gnu.org>.
5 This program is free software; you can redistribute it and/or
6 modify it under the terms of the GNU General Public License as
7 published by the Free Software Foundation; either version 2 of the
8 License, or (at your option) any later version.
10 This program is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with this program; if not, write to the Free Software
17 Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA
44 #include <sys/types.h>
51 #include "debug-print.h"
53 /* Variables to sort. */
54 struct variable **v_sort;
57 /* Used when internal-sorting to a separate file. */
58 static struct case_list **separate_case_tab;
60 /* Other prototypes. */
61 static int compare_case_lists (const void *, const void *);
62 static int do_internal_sort (int separate);
63 static int do_external_sort (int separate);
64 int parse_sort_variables (void);
65 void read_sort_output (write_case_func *write_case, write_case_data wc_data);
67 /* Performs the SORT CASES procedure. */
71 /* First, just parse the command. */
72 lex_match_id ("SORT");
73 lex_match_id ("CASES");
76 if (!parse_sort_variables ())
81 /* Then it's time to do the actual work. There are two cases:
83 (internal sort) All the data is in memory. In this case, we
84 perform an EXECUTE to get the data into the desired form, then
85 sort the cases in place, if it is still all in memory.
87 (external sort) The data is not in memory. It may be coming from
88 a system file or other data file, etc. In any case, it is now
89 time to perform an external sort. This is better explained in
90 do_external_sort(). */
92 /* Do all this dirty work. */
94 int success = sort_cases (0);
97 return lex_end_of_command ();
103 /* Parses a list of sort variables into v_sort and nv_sort. */
105 parse_sort_variables (void)
111 int prev_nv_sort = nv_sort;
112 int order = SRT_ASCEND;
114 if (!parse_variables (default_dict, &v_sort, &nv_sort,
115 PV_NO_DUPLICATE | PV_APPEND | PV_NO_SCRATCH))
119 if (lex_match_id ("D") || lex_match_id ("DOWN"))
121 else if (!lex_match_id ("A") && !lex_match_id ("UP"))
124 msg (SE, _("`A' or `D' expected inside parentheses."));
127 if (!lex_match (')'))
130 msg (SE, _("`)' expected."));
134 for (; prev_nv_sort < nv_sort; prev_nv_sort++)
135 v_sort[prev_nv_sort]->p.srt.order = order;
137 while (token != '.' && token != '/');
142 /* Sorts the active file based on the key variables specified in
143 global variables v_sort and nv_sort. The output is either to the
144 active file, if SEPARATE is zero, or to a separate file, if
145 SEPARATE is nonzero. In the latter case the output cases can be
146 read with a call to read_sort_output(). (In the former case the
147 output cases should be dealt with through the usual vfm interface.)
149 The caller is responsible for freeing v_sort[]. */
151 sort_cases (int separate)
153 assert (separate_case_tab == NULL);
155 /* Not sure this is necessary but it's good to be safe. */
156 if (separate && vfm_source == &sort_stream)
157 procedure (NULL, NULL, NULL, NULL);
159 /* SORT CASES cancels PROCESS IF. */
160 expr_free (process_if_expr);
161 process_if_expr = NULL;
163 if (do_internal_sort (separate))
167 return do_external_sort (separate);
170 /* If the active file is, or after running the procedure ends up, entirely
171 in memory, do an internal sort of the data. Return success. */
173 do_internal_sort (int separate)
175 if (vfm_source != &vfm_disk_stream)
177 if (vfm_source != &vfm_memory_stream)
178 procedure (NULL, NULL, NULL, NULL);
179 if (vfm_source == &vfm_memory_stream)
181 struct case_list **case_tab = malloc (sizeof *case_tab
182 * (vfm_source_info.ncases + 1));
183 if (vfm_source_info.ncases == 0)
188 if (case_tab != NULL)
190 struct case_list *clp = memory_source_cases;
191 struct case_list **ctp = case_tab;
194 for (; clp; clp = clp->next)
196 qsort (case_tab, vfm_source_info.ncases, sizeof *case_tab,
201 memory_source_cases = case_tab[0];
202 for (i = 1; i < vfm_source_info.ncases; i++)
203 case_tab[i - 1]->next = case_tab[i];
204 case_tab[vfm_source_info.ncases - 1]->next = NULL;
207 case_tab[vfm_source_info.ncases] = NULL;
208 separate_case_tab = case_tab;
218 /* Compares the NV_SORT variables in V_SORT[] between the
219 `case_list's at A and B, and returns a strcmp()-type
222 compare_case_lists (const void *a_, const void *b_)
224 struct case_list *const *pa = a_;
225 struct case_list *const *pb = b_;
226 struct case_list *a = *pa;
227 struct case_list *b = *pb;
232 for (i = 0; i < nv_sort; i++)
236 if (v->type == NUMERIC)
238 double af = a->c.data[v->fv].f;
239 double bf = b->c.data[v->fv].f;
241 result = af < bf ? -1 : af > bf;
244 result = memcmp (a->c.data[v->fv].s, b->c.data[v->fv].s, v->width);
250 if (v->p.srt.order == SRT_DESCEND)
257 /* Maximum number of input + output file handles. */
258 #if defined FOPEN_MAX && (FOPEN_MAX - 5 < 18)
259 #define MAX_FILE_HANDLES (FOPEN_MAX - 5)
261 #define MAX_FILE_HANDLES 18
264 #if MAX_FILE_HANDLES < 3
265 #error At least 3 file handles must be available for sorting.
268 /* Number of input buffers. */
269 #define N_INPUT_BUFFERS (MAX_FILE_HANDLES - 1)
271 /* Maximum order of merge. This is the value suggested by Knuth;
272 specifically, he said to use tree selection, which we don't
273 implement, for larger orders of merge. */
274 #define MAX_MERGE_ORDER 7
276 /* Minimum total number of records for buffers. */
277 #define MIN_BUFFER_TOTAL_SIZE_RECS 64
279 /* Minimum single input or output buffer size, in bytes and records. */
280 #define MIN_BUFFER_SIZE_BYTES 4096
281 #define MIN_BUFFER_SIZE_RECS 16
283 /* Structure for replacement selection tree. */
286 struct repl_sel_tree *loser;/* Loser associated w/this internal node. */
287 int rn; /* Run number of `loser'. */
288 struct repl_sel_tree *fe; /* Internal node above this external node. */
289 struct repl_sel_tree *fi; /* Internal node above this internal node. */
290 union value record[1]; /* The case proper. */
293 /* Static variables used for sorting. */
294 static struct repl_sel_tree **x; /* Buffers. */
295 static int x_max; /* Size of buffers, in records. */
296 static int records_per_buffer; /* Number of records in each buffer. */
298 /* In the merge phase, the first N_INPUT_BUFFERS handle[] elements are
299 input files and the last element is the output file. Before that,
300 they're all used as output files, although the last one is
302 static FILE *handle[MAX_FILE_HANDLES]; /* File handles. */
304 /* Now, MAX_FILE_HANDLES is the maximum number of files we will *try*
305 to open. But if we can't open that many, max_handles will be set
306 to the number we apparently can open. */
307 static int max_handles; /* Maximum number of handles. */
309 /* When we create temporary files, they are all put in the same
310 directory and numbered sequentially from zero. tmp_basename holds the
311 full path (drive/directory, etc.); tmp_extname points at its file name
312 part: sprintf() the file number there with "%08x", then open tmp_basename. */
313 static char *tmp_basename; /* Temporary file basename. */
314 static char *tmp_extname; /* Temporary file extension name. */
316 /* We use Huffman's method to determine the merge pattern. This means
317 that we need to know which runs are the shortest at any given time.
318 Priority queues as implemented by heap.c are a natural for this
319 task (probably because I wrote the code specifically for it). */
320 static struct heap *huffman_queue; /* Huffman priority queue. */
322 /* Prototypes for helper functions. */
323 static void sort_stream_write (void);
324 static int write_initial_runs (int separate);
325 static int allocate_cases (void);
326 static int allocate_file_handles (void);
327 static int merge (void);
328 static void rmdir_temp_dir (void);
330 /* Performs an external sort of the active file. A description of the
331 procedure follows. All page references refer to Knuth's _Art of
332 Computer Programming, Vol. 3: Sorting and Searching_, which is the
333 canonical resource for sorting.
335 1. The data is read and S initial runs are formed through the
336 action of algorithm 5.4.1R (replacement selection).
338 2. Huffman's method (p. 365-366) is used to determine the optimum
341 3. If an OS that supports overlapped reading, writing, and
342 computing is being run, we should use 5.4.6F for forecasting.
343 Otherwise, buffers are filled only when they run out of data.
344 FIXME: Since the author of PSPP uses GNU/Linux, which does not
345 yet implement overlapped r/w/c, 5.4.6F is not used.
347 4. We perform P-way merges:
349 (a) The desired P is the smallest P such that ceil(ln(S)/ln(P))
350 is minimized. (FIXME: Since I don't have an algorithm for
351 minimizing this, it's just set to MAX_MERGE_ORDER.)
353 (b) P is reduced if the selected value would make input buffers
354 less than 4096 bytes each, or 16 records, whichever is larger.
356 (c) P is reduced if we run out of available file handles or space
359 (d) P is reduced if we don't have space for one or two output
360 buffers, which have the same minimum size as input buffers. (We
361 need two output buffers if 5.4.6F is in use for forecasting.) */
363 do_external_sort (int separate)
367 assert (MAX_FILE_HANDLES >= 3);
372 huffman_queue = heap_create (512);
373 if (huffman_queue == NULL)
376 if (!allocate_cases ())
379 if (!allocate_file_handles ())
382 if (!write_initial_runs (separate))
389 /* Despite the name, flow of control comes here regardless of
390 whether or not the sort is successful. */
392 heap_destroy (huffman_queue);
398 for (i = 0; i <= x_max; i++)
413 /* Sets up to open temporary files. */
414 /* PORTME: This creates a directory for temporary files. Some OSes
415 might not have that concept... */
417 allocate_file_handles (void)
419 const char *dir; /* Directory prefix. */
420 char *buf; /* String buffer. */
421 char *cp; /* Pointer into buf. */
423 dir = getenv ("SPSSTMPDIR");
425 dir = getenv ("SPSSXTMPDIR");
427 dir = getenv ("TMPDIR");
435 #elif defined (__MSDOS__)
437 dir = getenv ("TEMP");
439 dir = getenv ("TMP");
446 buf = xmalloc (strlen (dir) + 1 + 4 + 8 + 4 + 1 + INT_DIGITS + 1);
447 cp = spprintf (buf, "%s%c%04lX%04lXpspp", dir, DIR_SEPARATOR,
448 ((long) time (0)) & 0xffff, ((long) getpid ()) & 0xffff);
450 if (-1 == mkdir (buf, S_IRWXU))
452 if (-1 == mkdir (buf))
456 msg (SE, _("%s: Cannot create temporary directory: %s."),
457 buf, strerror (errno));
460 *cp++ = DIR_SEPARATOR;
465 max_handles = MAX_FILE_HANDLES;
470 /* Removes the directory created for temporary files, if one exists.
471 Also frees tmp_basename. */
473 rmdir_temp_dir (void)
475 if (NULL == tmp_basename)
478 tmp_extname[-1] = '\0';
479 if (rmdir (tmp_basename) == -1)
480 msg (SE, _("%s: Error removing directory for temporary files: %s."),
481 tmp_basename, strerror (errno));
486 /* Allocates room for lots of cases as a buffer. */
488 allocate_cases (void)
490 /* This is the size of one case. */
491 const int case_size = (sizeof (struct repl_sel_tree)
492 + dict_get_case_size (default_dict)
493 - sizeof (union value)
494 + sizeof (struct repl_sel_tree *));
498 /* Allocate as many cases as we can, assuming a space of four
499 void pointers for malloc()'s internal bookkeeping. */
500 x_max = MAX_WORKSPACE / (case_size + 4 * sizeof (void *));
501 x = malloc (sizeof (struct repl_sel_tree *) * x_max);
506 for (i = 0; i < x_max; i++)
508 x[i] = malloc (sizeof (struct repl_sel_tree)
509 + dict_get_case_size (default_dict)
510 - sizeof (union value));
516 if (x == NULL || x_max < MIN_BUFFER_TOTAL_SIZE_RECS)
522 for (i = 0; i < x_max; i++)
526 msg (SE, _("Out of memory. Could not allocate room for minimum of %d "
527 "cases of %d bytes each. (PSPP workspace is currently "
528 "restricted to a maximum of %d KB.)"),
529 MIN_BUFFER_TOTAL_SIZE_RECS, case_size, MAX_WORKSPACE / 1024);
535 /* The last element of the array is used to store lastkey. */
538 debug_printf ((_("allocated %d cases == %d bytes\n"),
539 x_max, x_max * case_size));
543 /* Replacement selection. */
545 static int rmax, rc, rq; /* rmax: asserted equal to run_no once all initial runs are written, and used as the run count S when adding dummy runs; rq: run number compared against tree nodes' `rn'; rc: presumably the current run number -- TODO confirm. */
546 static struct repl_sel_tree *q; /* Node whose record is filled from the incoming case and then passed to output_record(). */
547 static union value *lastkey; /* Copy of the most recently output record, stored in x[x_max]->record; may be NULL, which compare_record() accepts as its second argument. */
548 static int run_no, file_index; /* run_no: number of the run being written, also formatted into the temporary file name; file_index: handle[] slot chosen for that run. */
549 static int deferred_abort; /* Flag set when an unrecoverable error occurs while writing initial runs; checked after the runs are written. */
550 static int run_length; /* Length of the current run (presumably in cases); inserted into huffman_queue when the run ends. */
552 static int compare_record (union value *, union value *); /* strcmp()-type comparison of two records on the sort keys; defined later in this file. */
555 output_record (union value *v)
557 union value *src_case;
562 if (compaction_necessary)
564 compact_case (compaction_case, (struct ccase *) v);
565 src_case = (union value *) compaction_case;
568 src_case = (union value *) v;
570 if ((int) fwrite (src_case, sizeof *src_case, compaction_nval,
575 sprintf (tmp_extname, "%08x", run_no);
576 msg (SE, _("%s: Error writing temporary file: %s."),
577 tmp_basename, strerror (errno));
587 int result = fclose (handle[i]);
588 msg (VM (2), _("SORT: Closing handle %d."), i);
593 sprintf (tmp_extname, "%08x", i);
594 msg (SE, _("%s: Error closing temporary file: %s."),
595 tmp_basename, strerror (errno));
602 close_handles (int beg, int end)
607 for (i = beg; i < end; i++)
608 success &= close_handle (i);
613 open_handle_w (int handle_no, int run_no)
615 sprintf (tmp_extname, "%08x", run_no);
616 msg (VM (1), _("SORT: %s: Opening for writing as run %d."),
617 tmp_basename, run_no);
619 /* The `x' modifier causes the GNU C library to insist on creating a
620 new file--if the file already exists, an error is signaled. The
621 ANSI C standard says that other libraries should ignore anything
622 after the `w+b', so it shouldn't be a problem. */
623 return NULL != (handle[handle_no] = fopen (tmp_basename, "w+bx"));
627 open_handle_r (int handle_no, int run_no)
631 sprintf (tmp_extname, "%08x", run_no);
632 msg (VM (1), _("SORT: %s: Opening for writing as run %d."),
633 tmp_basename, run_no);
634 f = handle[handle_no] = fopen (tmp_basename, "rb");
638 msg (SE, _("%s: Error opening temporary file for reading: %s."),
639 tmp_basename, strerror (errno));
646 /* Begins a new initial run, specifically its output file. */
650 /* Decide which handle[] to use. If run_no is max_handles or
651 greater, then we've run out of handles so it's time to just do
652 one file at a time, which by default is handle 0. */
653 file_index = (run_no < max_handles ? run_no : 0);
656 /* Alright, now create the temporary file. */
657 if (open_handle_w (file_index, run_no) == 0)
659 /* Failure to create the temporary file. Check if there are
660 unacceptably few files already open. */
664 msg (SE, _("%s: Error creating temporary file: %s."),
665 tmp_basename, strerror (errno));
669 /* Close all the open temporary files. */
670 if (!close_handles (0, file_index))
673 /* Now try again to create the temporary file. */
674 max_handles = file_index;
676 if (open_handle_w (0, run_no) == 0)
678 /* It still failed, report it this time. */
680 msg (SE, _("%s: Error creating temporary file: %s."),
681 tmp_basename, strerror (errno));
687 /* Ends the current initial run. Just increments run_no if no initial
688 run has been started yet. */
692 /* Close file handles if necessary. */
696 if (run_no == max_handles - 1)
697 result = close_handles (0, max_handles);
698 else if (run_no >= max_handles)
699 result = close_handle (0);
706 /* Advance to next run. */
709 heap_insert (huffman_queue, run_no - 1, run_length);
712 /* Performs 5.4.1R. */
714 write_initial_runs (int separate)
719 /* Steps R1, R2, R3. */
728 for (j = 0; j < x_max; j++)
730 struct repl_sel_tree *J = x[j];
734 J->fe = x[(x_max + j) / 2];
736 memset (J->record, 0, dict_get_case_size (default_dict));
740 /* Most of the iterations of steps R4, R5, R6, R7, R2, R3, ... */
744 vfm_sink->destroy_sink ();
745 vfm_sink = &sort_stream;
747 procedure (NULL, NULL, NULL, NULL);
749 /* Final iterations of steps R4, R5, R6, R7, R2, R3, ... */
752 struct repl_sel_tree *t;
766 && compare_record (t->loser->record, q->record) < 0))
768 struct repl_sel_tree *temp_tree;
771 temp_tree = t->loser;
799 output_record (q->record);
800 lastkey = x[x_max]->record;
801 memcpy (lastkey, q->record, sizeof (union value) * vfm_sink_info.nval);
804 assert (run_no == rmax);
806 /* If an unrecoverable error occurred somewhere in the above code,
807 then the `deferred_abort' flag would have been set. */
812 for (i = 0; i < max_handles; i++)
813 if (handle[i] != NULL)
815 sprintf (tmp_extname, "%08x", i);
817 if (fclose (handle[i]) == EOF)
818 msg (SE, _("%s: Error closing temporary file: %s."),
819 tmp_basename, strerror (errno));
821 if (remove (tmp_basename) != 0)
822 msg (SE, _("%s: Error removing temporary file: %s."),
823 tmp_basename, strerror (errno));
833 /* Compares the NV_SORT variables in V_SORT[] between the `value's at
834 A and B, and returns a strcmp()-type result. */
836 compare_record (union value * a, union value * b)
843 if (b == NULL) /* Sort NULLs after everything else. */
846 for (i = 0; i < nv_sort; i++)
850 if (v->type == NUMERIC)
852 if (approx_ne (a[v->fv].f, b[v->fv].f))
854 result = (a[v->fv].f > b[v->fv].f) ? 1 : -1;
860 result = memcmp (a[v->fv].s, b[v->fv].s, v->width);
866 if (v->p.srt.order == SRT_ASCEND)
870 assert (v->p.srt.order == SRT_DESCEND);
877 static int merge_once (int run_index[], int run_length[], int n_runs);
879 /* Modulo function as defined by Knuth. */
887 result = abs (x) % abs (y);
893 /* Performs a series of P-way merges of initial runs using Huffman's
898 /* Order of merge. */
902 assert (MIN_BUFFER_SIZE_RECS * 2 <= MIN_BUFFER_TOTAL_SIZE_RECS - 1);
904 /* Close all the input files. I hope that the boundary conditions
905 are correct on this but I'm not sure. */
906 if (run_no < max_handles)
910 for (i = 0; i < run_no; )
911 if (!close_handle (i++))
913 for (; i < run_no; i++)
919 /* Determine order of merge. */
920 order = MAX_MERGE_ORDER;
921 if (x_max / order < MIN_BUFFER_SIZE_RECS)
922 order = x_max / MIN_BUFFER_SIZE_RECS;
923 else if (x_max / order * dict_get_case_size (default_dict)
924 < MIN_BUFFER_SIZE_BYTES)
925 order = x_max / (MIN_BUFFER_SIZE_BYTES
926 / dict_get_case_size (default_dict));
928 /* Make sure the order of merge is bounded. */
933 assert (x_max / order > 0);
935 /* Calculate number of records per buffer. */
936 records_per_buffer = x_max / order;
938 /* Add (1 - S) mod (P - 1) dummy runs of length 0. */
940 int n_dummy_runs = mod (1 - rmax, order - 1);
941 debug_printf (("rmax=%d, order=%d, n_dummy_runs=%d\n",
942 rmax, order, n_dummy_runs));
943 assert (n_dummy_runs >= 0);
944 while (n_dummy_runs--)
946 heap_insert (huffman_queue, -2, 0);
951 /* Repeatedly merge the P shortest existing runs until only one run
955 int run_index[MAX_MERGE_ORDER];
956 int run_length[MAX_MERGE_ORDER];
957 int total_run_length = 0;
960 assert (rmax >= order);
962 /* Find the shortest runs; put them in run_index[] and run_length[]
963 in reverse order of length, to force dummy runs of length 0 to the end of the
965 debug_printf ((_("merging runs")));
966 for (i = order - 1; i >= 0; i--)
968 run_index[i] = heap_delete (huffman_queue, &run_length[i]);
969 assert (run_index[i] != -1);
970 total_run_length += run_length[i];
971 debug_printf ((" %d(%d)", run_index[i], run_length[i]));
973 debug_printf ((_(" into run %d(%d)\n"), run_no, total_run_length));
975 if (!merge_once (run_index, run_length, order))
979 while (-1 != (index = heap_delete (huffman_queue, NULL)))
981 sprintf (tmp_extname, "%08x", index);
982 if (remove (tmp_basename) != 0)
983 msg (SE, _("%s: Error removing temporary file: %s."),
984 tmp_basename, strerror (errno));
990 if (!heap_insert (huffman_queue, run_no++, total_run_length))
992 msg (SE, _("Out of memory expanding Huffman priority queue."));
999 /* There should be exactly one element in the priority queue after
1000 all that merging. This represents the entire sorted active file.
1001 So we could find a total case count by deleting this element from
1003 assert (heap_size (huffman_queue) == 1);
1008 /* Merges N_RUNS initial runs into a new run. The jth run for 0 <= j
1009 < N_RUNS is taken from temporary file RUN_INDEX[j]; it is composed
1010 of RUN_LENGTH[j] cases. */
1012 merge_once (int run_index[], int run_length[], int n_runs)
1014 /* For each run, the number of records remaining in its buffer. */
1015 int buffered[MAX_MERGE_ORDER];
1017 /* For each run, the index of the next record in the buffer. */
1018 int buffer_ptr[MAX_MERGE_ORDER];
1020 /* Open input files. */
1024 for (i = 0; i < n_runs; i++)
1025 if (run_index[i] != -2 && !open_handle_r (i, run_index[i]))
1027 /* Close and remove temporary files. */
1031 sprintf (tmp_extname, "%08x", i);
1032 if (remove (tmp_basename) != 0)
1033 msg (SE, _("%s: Error removing temporary file: %s."),
1034 tmp_basename, strerror (errno));
1041 /* Create output file. */
1042 if (!open_handle_w (N_INPUT_BUFFERS, run_no))
1044 msg (SE, _("%s: Error creating temporary file for merge: %s."),
1045 tmp_basename, strerror (errno));
1049 /* Prime each buffer. */
1053 for (i = 0; i < n_runs; i++)
1054 if (run_index[i] == -2)
1062 int ofs = records_per_buffer * i;
1064 buffered[i] = min (records_per_buffer, run_length[i]);
1065 for (j = 0; j < buffered[i]; j++)
1066 if (fread (x[j + ofs]->record,
1067 dict_get_case_size (default_dict), 1, handle[i]) != 1)
1069 sprintf (tmp_extname, "%08x", run_index[i]);
1070 if (ferror (handle[i]))
1071 msg (SE, _("%s: Error reading temporary file in merge: %s."),
1072 tmp_basename, strerror (errno));
1074 msg (SE, _("%s: Unexpected end of temporary file in merge."),
1078 buffer_ptr[i] = ofs;
1079 run_length[i] -= buffered[i];
1083 /* Perform the merge proper. */
1084 while (n_runs) /* Loop while some data is left. */
1089 for (i = 1; i < n_runs; i++)
1090 if (compare_record (x[buffer_ptr[min]]->record,
1091 x[buffer_ptr[i]]->record) > 0)
1094 if (fwrite (x[buffer_ptr[min]]->record,
1095 dict_get_case_size (default_dict), 1,
1096 handle[N_INPUT_BUFFERS]) != 1)
1098 sprintf (tmp_extname, "%08x", run_index[i]);
1099 msg (SE, _("%s: Error writing temporary file in "
1100 "merge: %s."), tmp_basename, strerror (errno));
1104 /* Remove one case from the buffer for this input file. */
1105 if (--buffered[min] == 0)
1107 /* The input buffer is empty. Do any cases remain in the
1108 initial run on disk? */
1109 if (run_length[min])
1111 /* Yes. Read them in. */
1116 /* Reset the buffer pointer. Note that we can't simply
1117 set it to (i * records_per_buffer) since the run
1118 order might have changed. */
1119 ofs = buffer_ptr[min] -= buffer_ptr[min] % records_per_buffer;
1121 buffered[min] = min (records_per_buffer, run_length[min]);
1122 for (j = 0; j < buffered[min]; j++)
1123 if (fread (x[j + ofs]->record,
1124 dict_get_case_size (default_dict), 1, handle[min])
1127 sprintf (tmp_extname, "%08x", run_index[min]);
1128 if (ferror (handle[min]))
1129 msg (SE, _("%s: Error reading temporary file in "
1131 tmp_basename, strerror (errno));
1133 msg (SE, _("%s: Unexpected end of temporary file "
1138 run_length[min] -= buffered[min];
1142 /* No. Delete this run. */
1144 /* Close the file. */
1145 FILE *f = handle[min];
1147 sprintf (tmp_extname, "%08x", run_index[min]);
1148 if (fclose (f) == EOF)
1149 msg (SE, _("%s: Error closing temporary file in merge: "
1150 "%s."), tmp_basename, strerror (errno));
1152 /* Delete the file. */
1153 if (remove (tmp_basename) != 0)
1154 msg (SE, _("%s: Error removing temporary file in merge: "
1155 "%s."), tmp_basename, strerror (errno));
1160 /* Since this isn't the last run, we move the last
1161 run into its spot to force all the runs to be
1163 run_index[min] = run_index[n_runs];
1164 run_length[min] = run_length[n_runs];
1165 buffer_ptr[min] = buffer_ptr[n_runs];
1166 buffered[min] = buffered[n_runs];
1167 handle[min] = handle[n_runs];
1175 /* Close output file. */
1177 FILE *f = handle[N_INPUT_BUFFERS];
1178 handle[N_INPUT_BUFFERS] = NULL;
1179 if (fclose (f) == EOF)
1181 sprintf (tmp_extname, "%08x", run_no);
1182 msg (SE, _("%s: Error closing temporary file in merge: "
1184 tmp_basename, strerror (errno));
1192 /* Close all the input and output files. */
1196 for (i = 0; i < n_runs; i++)
1197 if (run_length[i] != 0)
1200 sprintf (tmp_basename, "%08x", run_index[i]);
1201 if (remove (tmp_basename) != 0)
1202 msg (SE, _("%s: Error removing temporary file: %s."),
1203 tmp_basename, strerror (errno));
1206 close_handle (N_INPUT_BUFFERS);
1207 sprintf (tmp_basename, "%08x", run_no);
1208 if (remove (tmp_basename) != 0)
1209 msg (SE, _("%s: Error removing temporary file: %s."),
1210 tmp_basename, strerror (errno));
1214 /* External sort input program. */
1216 /* Reads all the records from the source stream and passes them
1219 sort_stream_read (write_case_func *write_case, write_case_data wc_data)
1221 read_sort_output (write_case, wc_data);
1224 /* Reads all the records from the output stream and passes them to the
1225 function provided, which must have an interface identical to
1228 read_sort_output (write_case_func *write_case, write_case_data wc_data)
1233 if (separate_case_tab)
1235 struct ccase *save_temp_case = temp_case;
1236 struct case_list **p;
1238 for (p = separate_case_tab; *p; p++)
1240 temp_case = &(*p)->c;
1241 write_case (wc_data);
1244 free (separate_case_tab);
1245 separate_case_tab = NULL;
1247 temp_case = save_temp_case;
1249 sprintf (tmp_extname, "%08x", run_no - 1);
1250 f = fopen (tmp_basename, "rb");
1253 msg (ME, _("%s: Cannot open sort result file: %s."), tmp_basename,
1259 for (i = 0; i < vfm_source_info.ncases; i++)
1261 if (!fread (temp_case, vfm_source_info.case_size, 1, f))
1264 msg (ME, _("%s: Error reading sort result file: %s."),
1265 tmp_basename, strerror (errno));
1267 msg (ME, _("%s: Unexpected end of sort result file: %s."),
1268 tmp_basename, strerror (errno));
1273 if (!write_case (wc_data))
1277 if (fclose (f) == EOF)
1278 msg (ME, _("%s: Error closing sort result file: %s."), tmp_basename,
1281 if (remove (tmp_basename) != 0)
1282 msg (ME, _("%s: Error removing sort result file: %s."), tmp_basename,
1289 #if 0 /* dead code */
1290 /* Alternate interface to sort_stream_write used for external sorting
1291 when SEPARATE is true. */
1293 write_separate (struct ccase *c)
1295 assert (c == temp_case);
1297 sort_stream_write ();
1302 /* Performs one iteration of 5.4.1R steps R4, R5, R6, R7, R2, and
1305 sort_stream_write (void)
1307 struct repl_sel_tree *t;
1310 memcpy (q->record, temp_case->data, vfm_sink_info.case_size);
1311 if (compare_record (q->record, lastkey) < 0)
1323 || (t->rn == rq && compare_record (t->loser->record, q->record) < 0))
1325 struct repl_sel_tree *temp_tree;
1328 temp_tree = t->loser;
1348 assert (rq <= rmax);
1355 output_record (q->record);
1356 lastkey = x[x_max]->record;
1357 memcpy (lastkey, q->record, vfm_sink_info.case_size);
1361 /* Switches mode from sink to source. */
1363 sort_stream_mode (void)
1365 /* If this is not done, then we get the following source/sink pairs:
1366 source=memory/disk/DATA LIST/etc., sink=SORT; source=SORT,
1367 sink=SORT; which is not good. */
1371 struct case_stream sort_stream =