1 /* PSPP - computes sample statistics.
2 Copyright (C) 1997-9, 2000 Free Software Foundation, Inc.
3 Written by Ben Pfaff <blp@gnu.org>.
5 This program is free software; you can redistribute it and/or
6 modify it under the terms of the GNU General Public License as
7 published by the Free Software Foundation; either version 2 of the
8 License, or (at your option) any later version.
10 This program is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with this program; if not, write to the Free Software
17 Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA
44 #include <sys/types.h>
51 #include "debug-print.h"
53 /* Variables to sort. */
54 struct variable **v_sort;
57 /* Used when internal-sorting to a separate file. */
58 static struct case_list **separate_case_tab;
60 /* Other prototypes. */
61 static int compare_case_lists (const void *, const void *);
62 static int do_internal_sort (int separate);
63 static int do_external_sort (int separate);
64 int parse_sort_variables (void);
65 void read_sort_output (int (*write_case) (void));
67 /* Performs the SORT CASES procedures. */
71 /* First, just parse the command. */
72 lex_match_id ("SORT");
73 lex_match_id ("CASES");
76 if (!parse_sort_variables ())
81 /* Then it's time to do the actual work. There are two cases:
83 (internal sort) All the data is in memory. In this case, we
84 perform an EXECUTE to get the data into the desired form, then
85 sort the cases in place, if it is still all in memory.
87 (external sort) The data is not in memory. It may be coming from
88 a system file or other data file, etc. In any case, it is now
89 time to perform an external sort. This is better explained in
90 do_external_sort(). */
92 /* Do all this dirty work. */
94 int success = sort_cases (0);
97 return lex_end_of_command ();
103 /* Parses a list of sort variables into v_sort and nv_sort. */
105 parse_sort_variables (void)
111 int prev_nv_sort = nv_sort;
112 int order = SRT_ASCEND;
114 if (!parse_variables (&default_dict, &v_sort, &nv_sort,
115 PV_NO_DUPLICATE | PV_APPEND | PV_NO_SCRATCH))
119 if (lex_match_id ("D") || lex_match_id ("DOWN"))
121 else if (!lex_match_id ("A") && !lex_match_id ("UP"))
124 msg (SE, _("`A' or `D' expected inside parentheses."));
127 if (!lex_match (')'))
130 msg (SE, _("`)' expected."));
134 for (; prev_nv_sort < nv_sort; prev_nv_sort++)
135 v_sort[prev_nv_sort]->p.srt.order = order;
137 while (token != '.' && token != '/');
142 /* Sorts the active file based on the key variables specified in
143 global variables v_sort and nv_sort. The output is either to the
144 active file, if SEPARATE is zero, or to a separate file, if
145 SEPARATE is nonzero. In the latter case the output cases can be
146 read with a call to read_sort_output(). (In the former case the
147 output cases should be dealt with through the usual vfm interface.)
149 The caller is responsible for freeing v_sort[].

The in-memory sort (do_internal_sort()) is tried first; the result
of do_external_sort() is returned on the path that follows it. */
151 sort_cases (int separate)
/* separate_case_tab is set by a separate internal sort and cleared
   again by read_sort_output(), so it must be NULL before a new sort
   starts. */
153 assert (separate_case_tab == NULL);
155 /* Not sure this is necessary but it's good to be safe. */
156 if (separate && vfm_source == &sort_stream)
157 procedure (NULL, NULL, NULL);
159 /* SORT CASES cancels PROCESS IF. */
160 expr_free (process_if_expr);
161 process_if_expr = NULL;
/* NOTE(review): the statement guarded by this test is not visible in
   this excerpt; presumably it reports success without going to disk. */
163 if (do_internal_sort (separate))
167 return do_external_sort (separate);
170 /* If a reasonable situation is set up, do an internal sort of the
171 data. Return success.

When the active file is held by vfm_memory_stream, the cases are
sorted through an array of pointers with qsort() and then either
relinked in sorted order or handed over through separate_case_tab. */
173 do_internal_sort (int separate)
/* The work below is only attempted when the source is not the disk
   stream. */
175 if (vfm_source != &vfm_disk_stream)
/* Any other non-memory source is run through procedure() first; the
   test that follows checks whether the cases are now in memory. */
177 if (vfm_source != &vfm_memory_stream)
178 procedure (NULL, NULL, NULL);
179 if (vfm_source == &vfm_memory_stream)
/* One slot per case plus one extra, used below for a NULL terminator. */
181 struct case_list **case_tab = malloc (sizeof *case_tab
182 * (vfm_source_info.ncases + 1));
183 if (vfm_source_info.ncases == 0)
188 if (case_tab != NULL)
190 struct case_list *clp = memory_source_cases;
191 struct case_list **ctp = case_tab;
/* Walk the linked list of cases (the loop body is not shown in this
   excerpt; presumably it stores each node through ctp). */
194 for (; clp; clp = clp->next)
196 qsort (case_tab, vfm_source_info.ncases, sizeof *case_tab,
/* Rebuild the linked list so that it follows the sorted array. */
201 memory_source_cases = case_tab[0];
202 for (i = 1; i < vfm_source_info.ncases; i++)
203 case_tab[i - 1]->next = case_tab[i];
204 case_tab[vfm_source_info.ncases - 1]->next = NULL;
/* NULL-terminate the array and publish it; read_sort_output() walks
   it up to the NULL entry and then frees it. */
207 case_tab[vfm_source_info.ncases] = NULL;
208 separate_case_tab = case_tab;
218 /* Compares the NV_SORT variables in V_SORT[] between the
219 `case_list's at A and B, and returns a strcmp()-type
result. */
222 compare_case_lists (const void *a_, const void *b_)
224 struct case_list *const *pa = a_;
225 struct case_list *const *pb = b_;
226 struct case_list *a = *pa;
227 struct case_list *b = *pb;
232 for (i = 0; i < nv_sort; i++)
236 if (v->type == NUMERIC)
238 double af = a->c.data[v->fv].f;
239 double bf = b->c.data[v->fv].f;
241 result = af < bf ? -1 : af > bf;
244 result = memcmp (a->c.data[v->fv].s, b->c.data[v->fv].s, v->width);
250 if (v->p.srt.order == SRT_DESCEND)
257 /* Maximum number of input + output file handles. */
258 #if defined FOPEN_MAX && (FOPEN_MAX - 5 < 18)
259 #define MAX_FILE_HANDLES (FOPEN_MAX - 5)
261 #define MAX_FILE_HANDLES 18
264 #if MAX_FILE_HANDLES < 3
265 #error At least 3 file handles must be available for sorting.
268 /* Number of input buffers. */
269 #define N_INPUT_BUFFERS (MAX_FILE_HANDLES - 1)
271 /* Maximum order of merge. This is the value suggested by Knuth;
272 specifically, he said to use tree selection, which we don't
273 implement, for larger orders of merge. */
274 #define MAX_MERGE_ORDER 7
276 /* Minimum total number of records for buffers. */
277 #define MIN_BUFFER_TOTAL_SIZE_RECS 64
279 /* Minimum single input or output buffer size, in bytes and records. */
280 #define MIN_BUFFER_SIZE_BYTES 4096
281 #define MIN_BUFFER_SIZE_RECS 16
283 /* Structure for replacement selection tree. */
286 struct repl_sel_tree *loser;/* Loser associated w/this internal node. */
287 int rn; /* Run number of `loser'. */
288 struct repl_sel_tree *fe; /* Internal node above this external node. */
289 struct repl_sel_tree *fi; /* Internal node above this internal node. */
290 union value record[1]; /* The case proper. */
293 /* Static variables used for sorting. */
294 static struct repl_sel_tree **x; /* Buffers. */
295 static int x_max; /* Size of buffers, in records. */
296 static int records_per_buffer; /* Number of records in each buffer. */
298 /* In the merge phase, the first N_INPUT_BUFFERS handle[] elements are
299 input files and the last element is the output file. Before that,
300 they're all used as output files, although the last one is
302 static FILE *handle[MAX_FILE_HANDLES]; /* File handles. */
304 /* Now, MAX_FILE_HANDLES is the maximum number of files we will *try*
305 to open. But if we can't open that many, max_handles will be set
306 to the number we apparently can open. */
307 static int max_handles; /* Maximum number of handles. */
309 /* When we create temporary files, they are all put in the same
310 directory and numbered sequentially from zero. tmp_basename is the
311 drive/directory, etc., and tmp_extname can be sprintf() with "%08x"
312 to the file number, then tmp_basename used to open the file. */
313 static char *tmp_basename; /* Temporary file basename. */
314 static char *tmp_extname; /* Temporary file extension name. */
316 /* We use Huffman's method to determine the merge pattern. This means
317 that we need to know which runs are the shortest at any given time.
318 Priority queues as implemented by heap.c are a natural for this
319 task (probably because I wrote the code specifically for it). */
320 static struct heap *huffman_queue; /* Huffman priority queue. */
322 /* Prototypes for helper functions. */
323 static void sort_stream_write (void);
324 static int write_initial_runs (int separate);
325 static int allocate_cases (void);
326 static int allocate_file_handles (void);
327 static int merge (void);
328 static void rmdir_temp_dir (void);
330 /* Performs an external sort of the active file. A description of the
331 procedure follows. All page references refer to Knuth's _Art of
332 Computer Programming, Vol. 3: Sorting and Searching_, which is the
333 canonical resource for sorting.
335 1. The data is read and S initial runs are formed through the
336 action of algorithm 5.4.1R (replacement selection).
338 2. Huffman's method (p. 365-366) is used to determine the optimum
341 3. If an OS that supports overlapped reading, writing, and
342 computing is being run, we should use 5.4.6F for forecasting.
343 Otherwise, buffers are filled only when they run out of data.
344 FIXME: Since the author of PSPP uses GNU/Linux, which does not
345 yet implement overlapped r/w/c, 5.4.6F is not used.
347 4. We perform P-way merges:
349 (a) The desired P is the smallest P such that ceil(ln(S)/ln(P))
350 is minimized. (FIXME: Since I don't have an algorithm for
351 minimizing this, it's just set to MAX_MERGE_ORDER.)
353 (b) P is reduced if the selected value would make input buffers
354 less than 4096 bytes each, or 16 records, whichever is larger.
356 (c) P is reduced if we run out of available file handles or space
359 (d) P is reduced if we don't have space for one or two output
360 buffers, which have the same minimum size as input buffers. (We
361 need two output buffers if 5.4.6F is in use for forecasting.) */
363 do_external_sort (int separate)
367 assert (MAX_FILE_HANDLES >= 3);
372 huffman_queue = heap_create (512);
373 if (huffman_queue == NULL)
376 if (!allocate_cases ())
379 if (!allocate_file_handles ())
382 if (!write_initial_runs (separate))
389 /* Despite the name, flow of control comes here regardless of
390 whether or not the sort is successful. */
392 heap_destroy (huffman_queue);
398 for (i = 0; i <= x_max; i++)
413 /* Sets up to open temporary files. */
414 /* PORTME: This creates a directory for temporary files. Some OSes
415 might not have that concept... */
417 allocate_file_handles (void)
419 const char *dir; /* Directory prefix. */
420 char *buf; /* String buffer. */
421 char *cp; /* Pointer into buf. */
423 dir = getenv ("SPSSTMPDIR");
425 dir = getenv ("SPSSXTMPDIR");
427 dir = getenv ("TMPDIR");
437 dir = getenv ("TEMP");
439 dir = getenv ("TMP");
446 buf = xmalloc (strlen (dir) + 1 + 4 + 8 + 4 + 1 + INT_DIGITS + 1);
447 cp = spprintf (buf, "%s%c%04lX%04lXpspp", dir, DIR_SEPARATOR,
448 ((long) time (0)) & 0xffff, ((long) getpid ()) & 0xffff);
449 if (-1 == mkdir (buf, S_IRWXU))
452 msg (SE, _("%s: Cannot create temporary directory: %s."),
453 buf, strerror (errno));
456 *cp++ = DIR_SEPARATOR;
461 max_handles = MAX_FILE_HANDLES;
466 /* Removes the directory created for temporary files, if one exists.
467 Also frees tmp_basename.

A NULL tmp_basename is tested for first.  A failed rmdir() is
reported through msg(). */
469 rmdir_temp_dir (void)
471 if (NULL == tmp_basename)
/* Cut the path off just before the per-file name so that tmp_basename
   names the directory itself.  Presumably tmp_extname points right
   after the DIR_SEPARATOR appended in allocate_file_handles() -- TODO
   confirm, the assignment is not visible in this excerpt. */
474 tmp_extname[-1] = '\0';
475 if (rmdir (tmp_basename) == -1)
476 msg (SE, _("%s: Error removing directory for temporary files: %s."),
477 tmp_basename, strerror (errno));
482 /* Allocates room for lots of cases as a buffer.

Fills the global array x[] with individually malloc()'d
`struct repl_sel_tree' records and sets x_max.  The failure branch
is taken when x is NULL or fewer than MIN_BUFFER_TOTAL_SIZE_RECS
records are available, and reports the problem through msg(). */
484 allocate_cases (void)
486 /* This is the size of one case. */
/* The struct already holds one `union value' (record[1]), hence the
   nval - 1 extra values; the trailing pointer accounts for the
   record's slot in x[]. */
487 const int case_size = (sizeof (struct repl_sel_tree)
488 + sizeof (union value) * (default_dict.nval - 1)
489 + sizeof (struct repl_sel_tree *));
493 /* Allocate as many cases as we can, assuming a space of four
494 void pointers for malloc()'s internal bookkeeping. */
495 x_max = MAX_WORKSPACE / (case_size + 4 * sizeof (void *));
496 x = malloc (sizeof (struct repl_sel_tree *) * x_max);
/* NOTE(review): the lines between this malloc() and the loop are not
   visible here; confirm x is checked for NULL before x[i] is written. */
501 for (i = 0; i < x_max; i++)
503 x[i] = malloc (sizeof (struct repl_sel_tree)
504 + sizeof (union value) * (default_dict.nval - 1));
510 if (x == NULL || x_max < MIN_BUFFER_TOTAL_SIZE_RECS)
/* Failure path: walk the records obtained so far (presumably to
   release them; the loop body is not shown), then report. */
516 for (i = 0; i < x_max; i++)
520 msg (SE, _("Out of memory. Could not allocate room for minimum of %d "
521 "cases of %d bytes each. (PSPP workspace is currently "
522 "restricted to a maximum of %d KB.)"),
523 MIN_BUFFER_TOTAL_SIZE_RECS, case_size, MAX_WORKSPACE / 1024);
529 /* The last element of the array is used to store lastkey. */
532 debug_printf ((_("allocated %d cases == %d bytes\n"),
533 x_max, x_max * case_size));
537 /* Replacement selection. */
539 static int rmax, rc, rq;
540 static struct repl_sel_tree *q;
541 static union value *lastkey;
542 static int run_no, file_index;
543 static int deferred_abort;
544 static int run_length;
546 static int compare_record (union value *, union value *);
549 output_record (union value *v)
551 union value *src_case;
556 if (compaction_necessary)
558 compact_case (compaction_case, (struct ccase *) v);
559 src_case = (union value *) compaction_case;
562 src_case = (union value *) v;
564 if ((int) fwrite (src_case, sizeof *src_case, compaction_nval,
569 sprintf (tmp_extname, "%08x", run_no);
570 msg (SE, _("%s: Error writing temporary file: %s."),
571 tmp_basename, strerror (errno));
581 int result = fclose (handle[i]);
582 msg (VM (2), _("SORT: Closing handle %d."), i);
587 sprintf (tmp_extname, "%08x", i);
588 msg (SE, _("%s: Error closing temporary file: %s."),
589 tmp_basename, strerror (errno));
596 close_handles (int beg, int end)
601 for (i = beg; i < end; i++)
602 success &= close_handle (i);
607 open_handle_w (int handle_no, int run_no)
609 sprintf (tmp_extname, "%08x", run_no);
610 msg (VM (1), _("SORT: %s: Opening for writing as run %d."),
611 tmp_basename, run_no);
613 /* The `x' modifier causes the GNU C library to insist on creating a
614 new file--if the file already exists, an error is signaled. The
615 ANSI C standard says that other libraries should ignore anything
616 after the `w+b', so it shouldn't be a problem. */
617 return NULL != (handle[handle_no] = fopen (tmp_basename, "w+bx"));
/* Opens temporary file number RUN_NO for reading and stores the
   stream in handle[HANDLE_NO].  The file name is built by printing
   RUN_NO as "%08x" into tmp_extname, the tail of tmp_basename.  A
   failed fopen() is reported through msg(). */
621 open_handle_r (int handle_no, int run_no)
625 sprintf (tmp_extname, "%08x", run_no);
/* This is the read-side opener ("rb" below), so the verbose message
   must say "reading"; it used to repeat open_handle_w()'s wording. */
626 msg (VM (1), _("SORT: %s: Opening for reading as run %d."),
627 tmp_basename, run_no);
628 f = handle[handle_no] = fopen (tmp_basename, "rb");
632 msg (SE, _("%s: Error opening temporary file for reading: %s."),
633 tmp_basename, strerror (errno));
640 /* Begins a new initial run, specifically its output file. */
644 /* Decide which handle[] to use. If run_no is max_handles or
645 greater, then we've run out of handles so it's time to just do
646 one file at a time, which by default is handle 0. */
647 file_index = (run_no < max_handles ? run_no : 0);
650 /* Alright, now create the temporary file. */
651 if (open_handle_w (file_index, run_no) == 0)
653 /* Failure to create the temporary file. Check if there are
654 unacceptably few files already open. */
658 msg (SE, _("%s: Error creating temporary file: %s."),
659 tmp_basename, strerror (errno));
663 /* Close all the open temporary files. */
664 if (!close_handles (0, file_index))
667 /* Now try again to create the temporary file. */
668 max_handles = file_index;
670 if (open_handle_w (0, run_no) == 0)
672 /* It still failed, report it this time. */
674 msg (SE, _("%s: Error creating temporary file: %s."),
675 tmp_basename, strerror (errno));
681 /* Ends the current initial run. Just increments run_no if no initial
682 run has been started yet. */
686 /* Close file handles if necessary. */
690 if (run_no == max_handles - 1)
691 result = close_handles (0, max_handles);
692 else if (run_no >= max_handles)
693 result = close_handle (0);
700 /* Advance to next run. */
703 heap_insert (huffman_queue, run_no - 1, run_length);
706 /* Performs 5.4.1R. */
708 write_initial_runs (int separate)
713 /* Steps R1, R2, R3. */
722 for (j = 0; j < x_max; j++)
724 struct repl_sel_tree *J = x[j];
728 J->fe = x[(x_max + j) / 2];
730 memset (J->record, 0, default_dict.nval * sizeof (union value));
734 /* Most of the iterations of steps R4, R5, R6, R7, R2, R3, ... */
738 vfm_sink->destroy_sink ();
739 vfm_sink = &sort_stream;
741 procedure (NULL, NULL, NULL);
743 /* Final iterations of steps R4, R5, R6, R7, R2, R3, ... */
746 struct repl_sel_tree *t;
760 && compare_record (t->loser->record, q->record) < 0))
762 struct repl_sel_tree *temp_tree;
765 temp_tree = t->loser;
793 output_record (q->record);
794 lastkey = x[x_max]->record;
795 memcpy (lastkey, q->record, sizeof (union value) * vfm_sink_info.nval);
798 assert (run_no == rmax);
800 /* If an unrecoverable error occurred somewhere in the above code,
801 then the `deferred_abort' flag would have been set. */
806 for (i = 0; i < max_handles; i++)
807 if (handle[i] != NULL)
809 sprintf (tmp_extname, "%08x", i);
811 if (fclose (handle[i]) == EOF)
812 msg (SE, _("%s: Error closing temporary file: %s."),
813 tmp_basename, strerror (errno));
815 if (remove (tmp_basename) != 0)
816 msg (SE, _("%s: Error removing temporary file: %s."),
817 tmp_basename, strerror (errno));
827 /* Compares the NV_SORT variables in V_SORT[] between the `value's at
828 A and B, and returns a strcmp()-type result.

Numeric keys are tested with approx_ne() before being ordered;
string keys are compared with memcmp() over the variable's width.
NOTE(review): compare_case_lists(), used by the in-memory sort,
compares numeric keys with plain < and >, so the two sort paths can
disagree on values that approx_ne() does not report as different. */
830 compare_record (union value * a, union value * b)
837 if (b == NULL) /* Sort NULLs after everything else. */
840 for (i = 0; i < nv_sort; i++)
844 if (v->type == NUMERIC)
846 if (approx_ne (a[v->fv].f, b[v->fv].f))
848 result = (a[v->fv].f > b[v->fv].f) ? 1 : -1;
854 result = memcmp (a[v->fv].s, b[v->fv].s, v->width);
/* The per-variable direction stored in p.srt.order decides how
   `result' is used; the branch bodies are not shown in this
   excerpt. */
860 if (v->p.srt.order == SRT_ASCEND)
864 assert (v->p.srt.order == SRT_DESCEND);
871 static int merge_once (int run_index[], int run_length[], int n_runs);
873 /* Modulo function as defined by Knuth. */
881 result = abs (x) % abs (y);
887 /* Performs a series of P-way merges of initial runs using Huffman's
892 /* Order of merge. */
896 assert (MIN_BUFFER_SIZE_RECS * 2 <= MIN_BUFFER_TOTAL_SIZE_RECS - 1);
898 /* Close all the input files. I hope that the boundary conditions
899 are correct on this but I'm not sure. */
900 if (run_no < max_handles)
904 for (i = 0; i < run_no; )
905 if (!close_handle (i++))
907 for (; i < run_no; i++)
913 /* Determine order of merge. */
914 order = MAX_MERGE_ORDER;
915 if (x_max / order < MIN_BUFFER_SIZE_RECS)
916 order = x_max / MIN_BUFFER_SIZE_RECS;
917 else if (x_max / order * sizeof (union value) * default_dict.nval
918 < MIN_BUFFER_SIZE_BYTES)
919 order = x_max / (MIN_BUFFER_SIZE_BYTES
920 / (sizeof (union value) * (default_dict.nval - 1)));
922 /* Make sure the order of merge is bounded. */
927 assert (x_max / order > 0);
929 /* Calculate number of records per buffer. */
930 records_per_buffer = x_max / order;
932 /* Add (1 - S) mod (P - 1) dummy runs of length 0. */
934 int n_dummy_runs = mod (1 - rmax, order - 1);
935 debug_printf (("rmax=%d, order=%d, n_dummy_runs=%d\n",
936 rmax, order, n_dummy_runs));
937 assert (n_dummy_runs >= 0);
938 while (n_dummy_runs--)
940 heap_insert (huffman_queue, -2, 0);
945 /* Repeatedly merge the P shortest existing runs until only one run
949 int run_index[MAX_MERGE_ORDER];
950 int run_length[MAX_MERGE_ORDER];
951 int total_run_length = 0;
954 assert (rmax >= order);
956 /* Find the shortest runs; put them in runs[] in reverse order
957 of length, to force dummy runs of length 0 to the end of the
959 debug_printf ((_("merging runs")));
960 for (i = order - 1; i >= 0; i--)
962 run_index[i] = heap_delete (huffman_queue, &run_length[i]);
963 assert (run_index[i] != -1);
964 total_run_length += run_length[i];
965 debug_printf ((" %d(%d)", run_index[i], run_length[i]));
967 debug_printf ((_(" into run %d(%d)\n"), run_no, total_run_length));
969 if (!merge_once (run_index, run_length, order))
973 while (-1 != (index = heap_delete (huffman_queue, NULL)))
975 sprintf (tmp_extname, "%08x", index);
976 if (remove (tmp_basename) != 0)
977 msg (SE, _("%s: Error removing temporary file: %s."),
978 tmp_basename, strerror (errno));
984 if (!heap_insert (huffman_queue, run_no++, total_run_length))
986 msg (SE, _("Out of memory expanding Huffman priority queue."));
993 /* There should be exactly one element in the priority queue after
994 all that merging. This represents the entire sorted active file.
995 So we could find a total case count by deleting this element from
997 assert (heap_size (huffman_queue) == 1);
1002 /* Merges N_RUNS initial runs into a new run. The jth run for 0 <= j
1003 < N_RUNS is taken from temporary file RUN_INDEX[j]; it is composed
1004 of RUN_LENGTH[j] cases.

A RUN_INDEX[j] of -2 marks a dummy run with no file behind it.  The
merged output goes to temporary file number run_no through
handle[N_INPUT_BUFFERS].  Input run j starts out with the
records_per_buffer slots of x[] beginning at records_per_buffer * j
as its read buffer. */
1006 merge_once (int run_index[], int run_length[], int n_runs)
1008 /* For each run, the number of records remaining in its buffer. */
1009 int buffered[MAX_MERGE_ORDER];
1011 /* For each run, the index of the next record in the buffer. */
1012 int buffer_ptr[MAX_MERGE_ORDER];
1014 /* Open input files. */
1018 for (i = 0; i < n_runs; i++)
1019 if (run_index[i] != -2 && !open_handle_r (i, run_index[i]))
1021 /* Close and remove temporary files. */
/* NOTE(review): files are named after the run number, but the name
   built here comes from i; if i is a handle index at this point the
   wrong file is removed.  The enclosing loop is not visible in this
   excerpt -- confirm whether run_index[i] was intended. */
1025 sprintf (tmp_extname, "%08x", i);
1026 if (remove (tmp_basename) != 0)
1027 msg (SE, _("%s: Error removing temporary file: %s."),
1028 tmp_basename, strerror (errno));
1035 /* Create output file. */
1036 if (!open_handle_w (N_INPUT_BUFFERS, run_no))
1038 msg (SE, _("%s: Error creating temporary file for merge: %s."),
1039 tmp_basename, strerror (errno));
1043 /* Prime each buffer. */
1047 for (i = 0; i < n_runs; i++)
1048 if (run_index[i] == -2)
1056 int ofs = records_per_buffer * i;
1058 buffered[i] = min (records_per_buffer, run_length[i]);
1059 for (j = 0; j < buffered[i]; j++)
1060 if ((int) fread (x[j + ofs]->record, sizeof (union value),
1061 default_dict.nval, handle[i])
1062 != default_dict.nval)
1064 sprintf (tmp_extname, "%08x", run_index[i]);
1065 if (ferror (handle[i]))
1066 msg (SE, _("%s: Error reading temporary file in merge: %s."),
1067 tmp_basename, strerror (errno));
1069 msg (SE, _("%s: Unexpected end of temporary file in merge."),
1073 buffer_ptr[i] = ofs;
1074 run_length[i] -= buffered[i];
1078 /* Perform the merge proper. */
1079 while (n_runs) /* Loop while some data is left. */
1084 for (i = 1; i < n_runs; i++)
1085 if (compare_record (x[buffer_ptr[min]]->record,
1086 x[buffer_ptr[i]]->record) > 0)
1089 if ((int) fwrite (x[buffer_ptr[min]]->record, sizeof (union value),
1090 default_dict.nval, handle[N_INPUT_BUFFERS])
1091 != default_dict.nval)
/* The stream that failed is the output run, run_no.  The selection
   loop above leaves i equal to n_runs, so run_index[i] was both the
   wrong file and an index past the live entries. */
1093 sprintf (tmp_extname, "%08x", run_no);
1094 msg (SE, _("%s: Error writing temporary file in "
1095 "merge: %s."), tmp_basename, strerror (errno));
1099 /* Remove one case from the buffer for this input file. */
1100 if (--buffered[min] == 0)
1102 /* The input buffer is empty. Do any cases remain in the
1103 initial run on disk? */
1104 if (run_length[min])
1106 /* Yes. Read them in. */
1111 /* Reset the buffer pointer. Note that we can't simply
1112 set it to (i * records_per_buffer) since the run
1113 order might have changed. */
1114 ofs = buffer_ptr[min] -= buffer_ptr[min] % records_per_buffer;
1116 buffered[min] = min (records_per_buffer, run_length[min]);
1117 for (j = 0; j < buffered[min]; j++)
1118 if ((int) fread (x[j + ofs]->record, sizeof (union value),
1119 default_dict.nval, handle[min])
1120 != default_dict.nval)
1122 sprintf (tmp_extname, "%08x", run_index[min]);
1123 if (ferror (handle[min]))
1124 msg (SE, _("%s: Error reading temporary file in "
1126 tmp_basename, strerror (errno));
1128 msg (SE, _("%s: Unexpected end of temporary file "
1133 run_length[min] -= buffered[min];
1137 /* No. Delete this run. */
1139 /* Close the file. */
1140 FILE *f = handle[min];
1142 sprintf (tmp_extname, "%08x", run_index[min]);
1143 if (fclose (f) == EOF)
1144 msg (SE, _("%s: Error closing temporary file in merge: "
1145 "%s."), tmp_basename, strerror (errno));
1147 /* Delete the file. */
1148 if (remove (tmp_basename) != 0)
1149 msg (SE, _("%s: Error removing temporary file in merge: "
1150 "%s."), tmp_basename, strerror (errno));
/* NOTE(review): the moves below index with n_runs itself, which is
   only a live entry if n_runs was decremented on a line not shown in
   this excerpt -- confirm. */
1155 /* Since this isn't the last run, we move the last
1156 run into its spot to force all the runs to be
contiguous. */
1158 run_index[min] = run_index[n_runs];
1159 run_length[min] = run_length[n_runs];
1160 buffer_ptr[min] = buffer_ptr[n_runs];
1161 buffered[min] = buffered[n_runs];
1162 handle[min] = handle[n_runs];
1170 /* Close output file. */
1172 FILE *f = handle[N_INPUT_BUFFERS];
1173 handle[N_INPUT_BUFFERS] = NULL;
1174 if (fclose (f) == EOF)
1176 sprintf (tmp_extname, "%08x", run_no);
1177 msg (SE, _("%s: Error closing temporary file in merge: "
1179 tmp_basename, strerror (errno));
1187 /* Close all the input and output files. */
1191 for (i = 0; i < n_runs; i++)
1192 if (run_length[i] != 0)
/* Only the file-name tail (tmp_extname) may be overwritten.  Printing
   into tmp_basename replaced the whole path, so remove() was aimed at
   a bare number in the current directory and every later temporary
   file name was corrupted as well. */
1195 sprintf (tmp_extname, "%08x", run_index[i]);
1196 if (remove (tmp_basename) != 0)
1197 msg (SE, _("%s: Error removing temporary file: %s."),
1198 tmp_basename, strerror (errno));
1201 close_handle (N_INPUT_BUFFERS);
/* Same rule for the partially written output run. */
1202 sprintf (tmp_extname, "%08x", run_no);
1203 if (remove (tmp_basename) != 0)
1204 msg (SE, _("%s: Error removing temporary file: %s."),
1205 tmp_basename, strerror (errno));
1209 /* External sort input program. */
1211 /* Reads all the records from the source stream and passes them
1214 sort_stream_read (void)
1216 read_sort_output (write_case);
1219 /* Reads all the records from the output stream and passes them to the
1220 function provided, which must have an interface identical to
1223 read_sort_output (int (*write_case) (void))
1228 if (separate_case_tab)
1230 struct ccase *save_temp_case = temp_case;
1231 struct case_list **p;
1233 for (p = separate_case_tab; *p; p++)
1235 temp_case = &(*p)->c;
1239 free (separate_case_tab);
1240 separate_case_tab = NULL;
1242 temp_case = save_temp_case;
1244 sprintf (tmp_extname, "%08x", run_no - 1);
1245 f = fopen (tmp_basename, "rb");
1248 msg (ME, _("%s: Cannot open sort result file: %s."), tmp_basename,
1254 for (i = 0; i < vfm_source_info.ncases; i++)
1256 if (!fread (temp_case, vfm_source_info.case_size, 1, f))
1259 msg (ME, _("%s: Error reading sort result file: %s."),
1260 tmp_basename, strerror (errno));
1262 msg (ME, _("%s: Unexpected end of sort result file: %s."),
1263 tmp_basename, strerror (errno));
1272 if (fclose (f) == EOF)
1273 msg (ME, _("%s: Error closing sort result file: %s."), tmp_basename,
1276 if (remove (tmp_basename) != 0)
1277 msg (ME, _("%s: Error removing sort result file: %s."), tmp_basename,
1284 #if 0 /* dead code */
1285 /* Alternate interface to sort_stream_write used for external sorting
1286 when SEPARATE is true. */
1288 write_separate (struct ccase *c)
1290 assert (c == temp_case);
1292 sort_stream_write ();
1297 /* Performs one iteration of 5.4.1R steps R4, R5, R6, R7, R2, and
1300 sort_stream_write (void)
1302 struct repl_sel_tree *t;
1305 memcpy (q->record, temp_case->data, vfm_sink_info.case_size);
1306 if (compare_record (q->record, lastkey) < 0)
1318 || (t->rn == rq && compare_record (t->loser->record, q->record) < 0))
1320 struct repl_sel_tree *temp_tree;
1323 temp_tree = t->loser;
1343 assert (rq <= rmax);
1350 output_record (q->record);
1351 lastkey = x[x_max]->record;
1352 memcpy (lastkey, q->record, vfm_sink_info.case_size);
1356 /* Switches mode from sink to source. */
1358 sort_stream_mode (void)
1360 /* If this is not done, then we get the following source/sink pairs:
1361 source=memory/disk/DATA LIST/etc., sink=SORT; source=SORT,
1362 sink=SORT; which is not good. */
1366 struct case_stream sort_stream =