1 /* PSPP - computes sample statistics.
2 Copyright (C) 1997-9, 2000 Free Software Foundation, Inc.
3 Written by Ben Pfaff <blp@gnu.org>.
5 This program is free software; you can redistribute it and/or
6 modify it under the terms of the GNU General Public License as
7 published by the Free Software Foundation; either version 2 of the
8 License, or (at your option) any later version.
10 This program is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with this program; if not, write to the Free Software
17 Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA
43 #include <sys/types.h>
50 #include "debug-print.h"
52 /* Key variables to sort by; parse_sort_variables() appends to this array. */
53 struct variable **v_sort;
56 /* Used when internal-sorting to a separate file: NULL-terminated table of sorted cases, read and freed by read_sort_output(). */
57 static struct case_list **separate_case_tab;
59 /* Prototypes for functions defined later in this file. */
60 static int compare_case_lists (const void *, const void *);
61 static int do_internal_sort (int separate);
62 static int do_external_sort (int separate);
63 int parse_sort_variables (void);
64 void read_sort_output (write_case_func *write_case, write_case_data wc_data);
66 /* Performs the SORT CASES procedures. */
70 /* First, just parse the command. */
71 lex_match_id ("SORT");
72 lex_match_id ("CASES");
75 if (!parse_sort_variables ())
80 /* Then it's time to do the actual work. There are two cases:
82 (internal sort) All the data is in memory. In this case, we
83 perform an EXECUTE to get the data into the desired form, then
84 sort the cases in place, if it is still all in memory.
86 (external sort) The data is not in memory. It may be coming from
87 a system file or other data file, etc. In any case, it is now
88 time to perform an external sort. This is better explained in
89 do_external_sort(). */
91 /* Do all this dirty work. */
93 int success = sort_cases (0);
96 return lex_end_of_command ();
102 /* Parses a list of sort variables into v_sort and nv_sort. */
104 parse_sort_variables (void)
110 int prev_nv_sort = nv_sort;
111 int order = SRT_ASCEND;
113 if (!parse_variables (default_dict, &v_sort, &nv_sort,
114 PV_NO_DUPLICATE | PV_APPEND | PV_NO_SCRATCH))
118 if (lex_match_id ("D") || lex_match_id ("DOWN"))
120 else if (!lex_match_id ("A") && !lex_match_id ("UP"))
123 msg (SE, _("`A' or `D' expected inside parentheses."));
126 if (!lex_match (')'))
129 msg (SE, _("`)' expected."));
133 for (; prev_nv_sort < nv_sort; prev_nv_sort++)
134 v_sort[prev_nv_sort]->p.srt.order = order;
136 while (token != '.' && token != '/');
141 /* Sorts the active file based on the key variables specified in
142 global variables v_sort and nv_sort. The output is either to the
143 active file, if SEPARATE is zero, or to a separate file, if
144 SEPARATE is nonzero. In the latter case the output cases can be
145 read with a call to read_sort_output(). (In the former case the
146 output cases should be dealt with through the usual vfm interface.)
148 The caller is responsible for freeing v_sort[]. */
150 sort_cases (int separate)
152 assert (separate_case_tab == NULL);
154 /* Not sure this is necessary but it's good to be safe. */
155 if (separate && vfm_source == &sort_stream)
156 procedure (NULL, NULL, NULL, NULL);
158 /* SORT CASES cancels PROCESS IF. */
159 expr_free (process_if_expr);
160 process_if_expr = NULL;
162 if (do_internal_sort (separate))
166 return do_external_sort (separate);
169 /* If the active file is, or can be read, entirely in memory, do an
170 internal sort of the data. Returns nonzero on success. */
172 do_internal_sort (int separate)
174 if (vfm_source != &vfm_disk_stream)
176 if (vfm_source != &vfm_memory_stream)
177 procedure (NULL, NULL, NULL, NULL);
178 if (vfm_source == &vfm_memory_stream)
180 struct case_list **case_tab = malloc (sizeof *case_tab
181 * (vfm_source_info.ncases + 1));
182 if (vfm_source_info.ncases == 0)
187 if (case_tab != NULL)
189 struct case_list *clp = memory_source_cases;
190 struct case_list **ctp = case_tab;
193 for (; clp; clp = clp->next)
195 qsort (case_tab, vfm_source_info.ncases, sizeof *case_tab,
200 memory_source_cases = case_tab[0];
201 for (i = 1; i < vfm_source_info.ncases; i++)
202 case_tab[i - 1]->next = case_tab[i];
203 case_tab[vfm_source_info.ncases - 1]->next = NULL;
206 case_tab[vfm_source_info.ncases] = NULL;
207 separate_case_tab = case_tab;
217 /* Compares the NV_SORT variables in V_SORT[] between the
218 `case_list's at A and B, and returns a strcmp()-type
221 compare_case_lists (const void *a_, const void *b_)
223 struct case_list *const *pa = a_;
224 struct case_list *const *pb = b_;
225 struct case_list *a = *pa;
226 struct case_list *b = *pb;
231 for (i = 0; i < nv_sort; i++)
235 if (v->type == NUMERIC)
237 double af = a->c.data[v->fv].f;
238 double bf = b->c.data[v->fv].f;
240 result = af < bf ? -1 : af > bf;
243 result = memcmp (a->c.data[v->fv].s, b->c.data[v->fv].s, v->width);
249 if (v->p.srt.order == SRT_DESCEND)
256 /* Maximum number of input + output file handles. */
257 #if defined FOPEN_MAX && (FOPEN_MAX - 5 < 18)
258 #define MAX_FILE_HANDLES (FOPEN_MAX - 5)
260 #define MAX_FILE_HANDLES 18
263 #if MAX_FILE_HANDLES < 3
264 #error At least 3 file handles must be available for sorting.
267 /* Number of input buffers; handle[N_INPUT_BUFFERS] is the merge output file. */
268 #define N_INPUT_BUFFERS (MAX_FILE_HANDLES - 1)
270 /* Maximum order of merge, which also sizes the per-run arrays used
271 while merging. This is the value suggested by Knuth; he said to use
272 tree selection, which we don't implement, for larger orders. */
273 #define MAX_MERGE_ORDER 7
275 /* Minimum total number of records for buffers; allocate_cases() checks x_max against this. */
276 #define MIN_BUFFER_TOTAL_SIZE_RECS 64
278 /* Minimum size of a single input or output buffer, in bytes and in records; the merge order is lowered to respect both. */
279 #define MIN_BUFFER_SIZE_BYTES 4096
280 #define MIN_BUFFER_SIZE_RECS 16
282 /* Structure for replacement selection tree. */
285 struct repl_sel_tree *loser;/* Loser associated w/this internal node. */
286 int rn; /* Run number of `loser'. */
287 struct repl_sel_tree *fe; /* Internal node above this external node. */
288 struct repl_sel_tree *fi; /* Internal node above this internal node. */
289 union value record[1]; /* The case proper. */
292 /* Static variables used for external sorting. */
293 static struct repl_sel_tree **x; /* Array of pointers to separately allocated record buffers. */
294 static int x_max; /* Size of buffers, in records; x[x_max] itself holds lastkey. */
295 static int records_per_buffer; /* Number of records in each merge buffer (x_max / order). */
297 /* In the merge phase, the first N_INPUT_BUFFERS handle[] elements are
298 input files and the last element is the output file. Before that,
299 they're all used as output files, although the last one is
301 static FILE *handle[MAX_FILE_HANDLES]; /* File handles. */
303 /* Now, MAX_FILE_HANDLES is the maximum number of files we will *try*
304 to open. But if we can't open that many, max_handles will be set
305 to the number we apparently can open. */
306 static int max_handles; /* Number of handles we can actually use. */
308 /* When we create temporary files, they are all put in the same
309 directory and numbered sequentially from zero. tmp_basename holds the
310 drive/directory, etc.; tmp_extname points at the spot in that string
311 where sprintf() "%08x" writes the file number before it is opened. */
312 static char *tmp_basename; /* Full temporary file name, starting with the directory. */
313 static char *tmp_extname; /* Where the file number goes within tmp_basename. */
315 /* We use Huffman's method to determine the merge pattern. This means
316 that we need to know which runs are the shortest at any given time.
317 Priority queues as implemented by heap.c are a natural fit for this
318 task (probably because I wrote the code specifically for it). */
319 static struct heap *huffman_queue; /* Queue of (run number, run length) entries. */
321 /* Prototypes for helper functions. */
322 static void sort_stream_write (void);
323 static int write_initial_runs (int separate);
324 static int allocate_cases (void);
325 static int allocate_file_handles (void);
326 static int merge (void);
327 static void rmdir_temp_dir (void);
329 /* Performs an external sort of the active file. A description of the
330 procedure follows. All page references refer to Knuth's _Art of
331 Computer Programming, Vol. 3: Sorting and Searching_, which is the
332 canonical resource for sorting.
334 1. The data is read and S initial runs are formed through the
335 action of algorithm 5.4.1R (replacement selection).
337 2. Huffman's method (p. 365-366) is used to determine the optimum
340 3. If an OS that supports overlapped reading, writing, and
341 computing is being run, we should use 5.4.6F for forecasting.
342 Otherwise, buffers are filled only when they run out of data.
343 FIXME: Since the author of PSPP uses GNU/Linux, which does not
344 yet implement overlapped r/w/c, 5.4.6F is not used.
346 4. We perform P-way merges:
348 (a) The desired P is the smallest P such that ceil(ln(S)/ln(P))
349 is minimized. (FIXME: Since I don't have an algorithm for
350 minimizing this, it's just set to MAX_MERGE_ORDER.)
352 (b) P is reduced if the selected value would make input buffers
353 less than 4096 bytes each, or 16 records, whichever is larger.
355 (c) P is reduced if we run out of available file handles or space
358 (d) P is reduced if we don't have space for one or two output
359 buffers, which have the same minimum size as input buffers. (We
360 need two output buffers if 5.4.6F is in use for forecasting.) */
362 do_external_sort (int separate)
366 assert (MAX_FILE_HANDLES >= 3);
371 huffman_queue = heap_create (512);
372 if (huffman_queue == NULL)
375 if (!allocate_cases ())
378 if (!allocate_file_handles ())
381 if (!write_initial_runs (separate))
388 /* Despite the name, flow of control comes here regardless of
389 whether or not the sort is successful. */
391 heap_destroy (huffman_queue);
397 for (i = 0; i <= x_max; i++)
412 /* Sets up to open temporary files. */
413 /* PORTME: This creates a directory for temporary files. Some OSes
414 might not have that concept... */
416 allocate_file_handles (void)
418 const char *dir; /* Directory prefix. */
419 char *buf; /* String buffer. */
420 char *cp; /* Pointer into buf. */
422 dir = getenv ("SPSSTMPDIR");
424 dir = getenv ("SPSSXTMPDIR");
426 dir = getenv ("TMPDIR");
434 #elif defined (__MSDOS__)
436 dir = getenv ("TEMP");
438 dir = getenv ("TMP");
445 buf = xmalloc (strlen (dir) + 1 + 4 + 8 + 4 + 1 + INT_DIGITS + 1);
446 cp = spprintf (buf, "%s%c%04lX%04lXpspp", dir, DIR_SEPARATOR,
447 ((long) time (0)) & 0xffff, ((long) getpid ()) & 0xffff);
449 if (-1 == mkdir (buf, S_IRWXU))
451 if (-1 == mkdir (buf))
455 msg (SE, _("%s: Cannot create temporary directory: %s."),
456 buf, strerror (errno));
459 *cp++ = DIR_SEPARATOR;
464 max_handles = MAX_FILE_HANDLES;
469 /* Removes the directory created for temporary files, if one exists.
470 Also frees tmp_basename. */
472 rmdir_temp_dir (void)
474 if (NULL == tmp_basename)
477 tmp_extname[-1] = '\0';
478 if (rmdir (tmp_basename) == -1)
479 msg (SE, _("%s: Error removing directory for temporary files: %s."),
480 tmp_basename, strerror (errno));
485 /* Allocates room for lots of cases as a buffer. */
487 allocate_cases (void)
489 /* This is the size of one case. */
490 const int case_size = (sizeof (struct repl_sel_tree)
491 + dict_get_case_size (default_dict)
492 - sizeof (union value)
493 + sizeof (struct repl_sel_tree *));
497 /* Allocate as many cases as we can, assuming a space of four
498 void pointers for malloc()'s internal bookkeeping. */
499 x_max = MAX_WORKSPACE / (case_size + 4 * sizeof (void *));
500 x = malloc (sizeof (struct repl_sel_tree *) * x_max);
505 for (i = 0; i < x_max; i++)
507 x[i] = malloc (sizeof (struct repl_sel_tree)
508 + dict_get_case_size (default_dict)
509 - sizeof (union value));
515 if (x == NULL || x_max < MIN_BUFFER_TOTAL_SIZE_RECS)
521 for (i = 0; i < x_max; i++)
525 msg (SE, _("Out of memory. Could not allocate room for minimum of %d "
526 "cases of %d bytes each. (PSPP workspace is currently "
527 "restricted to a maximum of %d KB.)"),
528 MIN_BUFFER_TOTAL_SIZE_RECS, case_size, MAX_WORKSPACE / 1024);
534 /* The last element of the array is used to store lastkey. */
537 debug_printf ((_("allocated %d cases == %d bytes\n"),
538 x_max, x_max * case_size));
542 /* State for replacement selection (algorithm 5.4.1R). */
544 static int rmax, rc, rq; /* NOTE(review): presumably Knuth's RMAX, RC, RQ run numbers -- confirm. */
545 static struct repl_sel_tree *q; /* Node whose record is currently being read in or written out. */
546 static union value *lastkey; /* Copy of the record most recently output, kept in x[x_max]. */
547 static int run_no, file_index; /* Current run number; index into handle[] of its output file. */
548 static int deferred_abort; /* Set when an unrecoverable error has occurred. */
549 static int run_length; /* Length of the current run, queued in huffman_queue when the run ends. */
551 static int compare_record (union value *, union value *);
554 output_record (union value *v)
556 union value *src_case;
561 if (compaction_necessary)
563 compact_case (compaction_case, (struct ccase *) v);
564 src_case = (union value *) compaction_case;
567 src_case = (union value *) v;
569 if ((int) fwrite (src_case, sizeof *src_case, compaction_nval,
574 sprintf (tmp_extname, "%08x", run_no);
575 msg (SE, _("%s: Error writing temporary file: %s."),
576 tmp_basename, strerror (errno));
586 int result = fclose (handle[i]);
587 msg (VM (2), _("SORT: Closing handle %d."), i);
592 sprintf (tmp_extname, "%08x", i);
593 msg (SE, _("%s: Error closing temporary file: %s."),
594 tmp_basename, strerror (errno));
601 close_handles (int beg, int end)
606 for (i = beg; i < end; i++)
607 success &= close_handle (i);
612 open_handle_w (int handle_no, int run_no)
614 sprintf (tmp_extname, "%08x", run_no);
615 msg (VM (1), _("SORT: %s: Opening for writing as run %d."),
616 tmp_basename, run_no);
618 /* The `x' modifier causes the GNU C library to insist on creating a
619 new file--if the file already exists, an error is signaled. The
620 ANSI C standard says that other libraries should ignore anything
621 after the `w+b', so it shouldn't be a problem. */
622 return NULL != (handle[handle_no] = fopen (tmp_basename, "w+bx"));
626 open_handle_r (int handle_no, int run_no)
630 sprintf (tmp_extname, "%08x", run_no);
631 msg (VM (1), _("SORT: %s: Opening for writing as run %d."),
632 tmp_basename, run_no);
633 f = handle[handle_no] = fopen (tmp_basename, "rb");
637 msg (SE, _("%s: Error opening temporary file for reading: %s."),
638 tmp_basename, strerror (errno));
645 /* Begins a new initial run, specifically its output file. */
649 /* Decide which handle[] to use. If run_no is max_handles or
650 greater, then we've run out of handles so it's time to just do
651 one file at a time, which by default is handle 0. */
652 file_index = (run_no < max_handles ? run_no : 0);
655 /* Alright, now create the temporary file. */
656 if (open_handle_w (file_index, run_no) == 0)
658 /* Failure to create the temporary file. Check if there are
659 unacceptably few files already open. */
663 msg (SE, _("%s: Error creating temporary file: %s."),
664 tmp_basename, strerror (errno));
668 /* Close all the open temporary files. */
669 if (!close_handles (0, file_index))
672 /* Now try again to create the temporary file. */
673 max_handles = file_index;
675 if (open_handle_w (0, run_no) == 0)
677 /* It still failed, report it this time. */
679 msg (SE, _("%s: Error creating temporary file: %s."),
680 tmp_basename, strerror (errno));
686 /* Ends the current initial run. Just increments run_no if no initial
687 run has been started yet. */
691 /* Close file handles if necessary. */
695 if (run_no == max_handles - 1)
696 result = close_handles (0, max_handles);
697 else if (run_no >= max_handles)
698 result = close_handle (0);
705 /* Advance to next run. */
708 heap_insert (huffman_queue, run_no - 1, run_length);
711 /* Performs 5.4.1R. */
713 write_initial_runs (int separate)
718 /* Steps R1, R2, R3. */
727 for (j = 0; j < x_max; j++)
729 struct repl_sel_tree *J = x[j];
733 J->fe = x[(x_max + j) / 2];
735 memset (J->record, 0, dict_get_case_size (default_dict));
739 /* Most of the iterations of steps R4, R5, R6, R7, R2, R3, ... */
743 vfm_sink->destroy_sink ();
744 vfm_sink = &sort_stream;
746 procedure (NULL, NULL, NULL, NULL);
748 /* Final iterations of steps R4, R5, R6, R7, R2, R3, ... */
751 struct repl_sel_tree *t;
765 && compare_record (t->loser->record, q->record) < 0))
767 struct repl_sel_tree *temp_tree;
770 temp_tree = t->loser;
798 output_record (q->record);
799 lastkey = x[x_max]->record;
800 memcpy (lastkey, q->record, sizeof (union value) * vfm_sink_info.nval);
803 assert (run_no == rmax);
805 /* If an unrecoverable error occurred somewhere in the above code,
806 then the `deferred_abort' flag would have been set. */
811 for (i = 0; i < max_handles; i++)
812 if (handle[i] != NULL)
814 sprintf (tmp_extname, "%08x", i);
816 if (fclose (handle[i]) == EOF)
817 msg (SE, _("%s: Error closing temporary file: %s."),
818 tmp_basename, strerror (errno));
820 if (remove (tmp_basename) != 0)
821 msg (SE, _("%s: Error removing temporary file: %s."),
822 tmp_basename, strerror (errno));
832 /* Compares the NV_SORT variables in V_SORT[] between the `value's at
833 A and B, and returns a strcmp()-type result. */
835 compare_record (union value * a, union value * b)
842 if (b == NULL) /* Sort NULLs after everything else. */
845 for (i = 0; i < nv_sort; i++)
849 if (v->type == NUMERIC)
851 if (a[v->fv].f != b[v->fv].f)
853 result = (a[v->fv].f > b[v->fv].f) ? 1 : -1;
859 result = memcmp (a[v->fv].s, b[v->fv].s, v->width);
865 if (v->p.srt.order == SRT_ASCEND)
869 assert (v->p.srt.order == SRT_DESCEND);
876 static int merge_once (int run_index[], int run_length[], int n_runs);
878 /* Modulo function as defined by Knuth. */
886 result = abs (x) % abs (y);
892 /* Performs a series of P-way merges of initial runs using Huffman's
897 /* Order of merge. */
901 assert (MIN_BUFFER_SIZE_RECS * 2 <= MIN_BUFFER_TOTAL_SIZE_RECS - 1);
903 /* Close all the input files. I hope that the boundary conditions
904 are correct on this but I'm not sure. */
905 if (run_no < max_handles)
909 for (i = 0; i < run_no; )
910 if (!close_handle (i++))
912 for (; i < run_no; i++)
918 /* Determine order of merge. */
919 order = MAX_MERGE_ORDER;
920 if (x_max / order < MIN_BUFFER_SIZE_RECS)
921 order = x_max / MIN_BUFFER_SIZE_RECS;
922 else if (x_max / order * dict_get_case_size (default_dict)
923 < MIN_BUFFER_SIZE_BYTES)
924 order = x_max / (MIN_BUFFER_SIZE_BYTES
925 / dict_get_case_size (default_dict));
927 /* Make sure the order of merge is bounded. */
932 assert (x_max / order > 0);
934 /* Calculate number of records per buffer. */
935 records_per_buffer = x_max / order;
937 /* Add (1 - S) mod (P - 1) dummy runs of length 0. */
939 int n_dummy_runs = mod (1 - rmax, order - 1);
940 debug_printf (("rmax=%d, order=%d, n_dummy_runs=%d\n",
941 rmax, order, n_dummy_runs));
942 assert (n_dummy_runs >= 0);
943 while (n_dummy_runs--)
945 heap_insert (huffman_queue, -2, 0);
950 /* Repeatedly merge the P shortest existing runs until only one run
954 int run_index[MAX_MERGE_ORDER];
955 int run_length[MAX_MERGE_ORDER];
956 int total_run_length = 0;
959 assert (rmax >= order);
961 /* Find the shortest runs; put them in run_index[] and run_length[] in
962 reverse order of length, to force dummy runs of length 0 to the end of the
964 debug_printf ((_("merging runs")));
965 for (i = order - 1; i >= 0; i--)
967 run_index[i] = heap_delete (huffman_queue, &run_length[i]);
968 assert (run_index[i] != -1);
969 total_run_length += run_length[i];
970 debug_printf ((" %d(%d)", run_index[i], run_length[i]));
972 debug_printf ((_(" into run %d(%d)\n"), run_no, total_run_length));
974 if (!merge_once (run_index, run_length, order))
978 while (-1 != (index = heap_delete (huffman_queue, NULL)))
980 sprintf (tmp_extname, "%08x", index);
981 if (remove (tmp_basename) != 0)
982 msg (SE, _("%s: Error removing temporary file: %s."),
983 tmp_basename, strerror (errno));
989 if (!heap_insert (huffman_queue, run_no++, total_run_length))
991 msg (SE, _("Out of memory expanding Huffman priority queue."));
998 /* There should be exactly one element in the priority queue after
999 all that merging. This represents the entire sorted active file.
1000 So we could find a total case count by deleting this element from
1002 assert (heap_size (huffman_queue) == 1);
1007 /* Merges N_RUNS initial runs into a new run. The jth run for 0 <= j
1008 < N_RUNS is taken from temporary file RUN_INDEX[j]; it is composed
1009 of RUN_LENGTH[j] cases. */
1011 merge_once (int run_index[], int run_length[], int n_runs)
1013 /* For each run, the number of records remaining in its buffer. */
1014 int buffered[MAX_MERGE_ORDER];
1016 /* For each run, the index of the next record in the buffer. */
1017 int buffer_ptr[MAX_MERGE_ORDER];
1019 /* Open input files. */
1023 for (i = 0; i < n_runs; i++)
1024 if (run_index[i] != -2 && !open_handle_r (i, run_index[i]))
1026 /* Close and remove temporary files. */
1030 sprintf (tmp_extname, "%08x", i);
1031 if (remove (tmp_basename) != 0)
1032 msg (SE, _("%s: Error removing temporary file: %s."),
1033 tmp_basename, strerror (errno));
1040 /* Create output file. */
1041 if (!open_handle_w (N_INPUT_BUFFERS, run_no))
1043 msg (SE, _("%s: Error creating temporary file for merge: %s."),
1044 tmp_basename, strerror (errno));
1048 /* Prime each buffer. */
1052 for (i = 0; i < n_runs; i++)
1053 if (run_index[i] == -2)
1061 int ofs = records_per_buffer * i;
1063 buffered[i] = min (records_per_buffer, run_length[i]);
1064 for (j = 0; j < buffered[i]; j++)
1065 if (fread (x[j + ofs]->record,
1066 dict_get_case_size (default_dict), 1, handle[i]) != 1)
1068 sprintf (tmp_extname, "%08x", run_index[i]);
1069 if (ferror (handle[i]))
1070 msg (SE, _("%s: Error reading temporary file in merge: %s."),
1071 tmp_basename, strerror (errno));
1073 msg (SE, _("%s: Unexpected end of temporary file in merge."),
1077 buffer_ptr[i] = ofs;
1078 run_length[i] -= buffered[i];
1082 /* Perform the merge proper. */
1083 while (n_runs) /* Loop while some data is left. */
1088 for (i = 1; i < n_runs; i++)
1089 if (compare_record (x[buffer_ptr[min]]->record,
1090 x[buffer_ptr[i]]->record) > 0)
1093 if (fwrite (x[buffer_ptr[min]]->record,
1094 dict_get_case_size (default_dict), 1,
1095 handle[N_INPUT_BUFFERS]) != 1)
1097 sprintf (tmp_extname, "%08x", run_index[i]);
1098 msg (SE, _("%s: Error writing temporary file in "
1099 "merge: %s."), tmp_basename, strerror (errno));
1103 /* Remove one case from the buffer for this input file. */
1104 if (--buffered[min] == 0)
1106 /* The input buffer is empty. Do any cases remain in the
1107 initial run on disk? */
1108 if (run_length[min])
1110 /* Yes. Read them in. */
1115 /* Reset the buffer pointer. Note that we can't simply
1116 set it to (i * records_per_buffer) since the run
1117 order might have changed. */
1118 ofs = buffer_ptr[min] -= buffer_ptr[min] % records_per_buffer;
1120 buffered[min] = min (records_per_buffer, run_length[min]);
1121 for (j = 0; j < buffered[min]; j++)
1122 if (fread (x[j + ofs]->record,
1123 dict_get_case_size (default_dict), 1, handle[min])
1126 sprintf (tmp_extname, "%08x", run_index[min]);
1127 if (ferror (handle[min]))
1128 msg (SE, _("%s: Error reading temporary file in "
1130 tmp_basename, strerror (errno));
1132 msg (SE, _("%s: Unexpected end of temporary file "
1137 run_length[min] -= buffered[min];
1141 /* No. Delete this run. */
1143 /* Close the file. */
1144 FILE *f = handle[min];
1146 sprintf (tmp_extname, "%08x", run_index[min]);
1147 if (fclose (f) == EOF)
1148 msg (SE, _("%s: Error closing temporary file in merge: "
1149 "%s."), tmp_basename, strerror (errno));
1151 /* Delete the file. */
1152 if (remove (tmp_basename) != 0)
1153 msg (SE, _("%s: Error removing temporary file in merge: "
1154 "%s."), tmp_basename, strerror (errno));
1159 /* Since this isn't the last run, we move the last
1160 run into its spot to force all the runs to be
1162 run_index[min] = run_index[n_runs];
1163 run_length[min] = run_length[n_runs];
1164 buffer_ptr[min] = buffer_ptr[n_runs];
1165 buffered[min] = buffered[n_runs];
1166 handle[min] = handle[n_runs];
1174 /* Close output file. */
1176 FILE *f = handle[N_INPUT_BUFFERS];
1177 handle[N_INPUT_BUFFERS] = NULL;
1178 if (fclose (f) == EOF)
1180 sprintf (tmp_extname, "%08x", run_no);
1181 msg (SE, _("%s: Error closing temporary file in merge: "
1183 tmp_basename, strerror (errno));
1191 /* Close all the input and output files. */
1195 for (i = 0; i < n_runs; i++)
1196 if (run_length[i] != 0)
1199 sprintf (tmp_basename, "%08x", run_index[i]);
1200 if (remove (tmp_basename) != 0)
1201 msg (SE, _("%s: Error removing temporary file: %s."),
1202 tmp_basename, strerror (errno));
1205 close_handle (N_INPUT_BUFFERS);
1206 sprintf (tmp_basename, "%08x", run_no);
1207 if (remove (tmp_basename) != 0)
1208 msg (SE, _("%s: Error removing temporary file: %s."),
1209 tmp_basename, strerror (errno));
1213 /* External sort input program. */
1215 /* Reads all the records from the source stream and passes them
1218 sort_stream_read (write_case_func *write_case, write_case_data wc_data)
1220 read_sort_output (write_case, wc_data);
1223 /* Reads all the records from the output stream and passes them to the
1224 function provided, which must have an interface identical to
1227 read_sort_output (write_case_func *write_case, write_case_data wc_data)
1232 if (separate_case_tab)
1234 struct ccase *save_temp_case = temp_case;
1235 struct case_list **p;
1237 for (p = separate_case_tab; *p; p++)
1239 temp_case = &(*p)->c;
1240 write_case (wc_data);
1243 free (separate_case_tab);
1244 separate_case_tab = NULL;
1246 temp_case = save_temp_case;
1248 sprintf (tmp_extname, "%08x", run_no - 1);
1249 f = fopen (tmp_basename, "rb");
1252 msg (ME, _("%s: Cannot open sort result file: %s."), tmp_basename,
1258 for (i = 0; i < vfm_source_info.ncases; i++)
1260 if (!fread (temp_case, vfm_source_info.case_size, 1, f))
1263 msg (ME, _("%s: Error reading sort result file: %s."),
1264 tmp_basename, strerror (errno));
1266 msg (ME, _("%s: Unexpected end of sort result file: %s."),
1267 tmp_basename, strerror (errno));
1272 if (!write_case (wc_data))
1276 if (fclose (f) == EOF)
1277 msg (ME, _("%s: Error closing sort result file: %s."), tmp_basename,
1280 if (remove (tmp_basename) != 0)
1281 msg (ME, _("%s: Error removing sort result file: %s."), tmp_basename,
1288 #if 0 /* dead code */
1289 /* Alternate interface to sort_stream_write used for external sorting
1290 when SEPARATE is true. */
1292 write_separate (struct ccase *c)
1294 assert (c == temp_case);
1296 sort_stream_write ();
1301 /* Performs one iteration of 5.4.1R steps R4, R5, R6, R7, R2, and
1304 sort_stream_write (void)
1306 struct repl_sel_tree *t;
1309 memcpy (q->record, temp_case->data, vfm_sink_info.case_size);
1310 if (compare_record (q->record, lastkey) < 0)
1322 || (t->rn == rq && compare_record (t->loser->record, q->record) < 0))
1324 struct repl_sel_tree *temp_tree;
1327 temp_tree = t->loser;
1347 assert (rq <= rmax);
1354 output_record (q->record);
1355 lastkey = x[x_max]->record;
1356 memcpy (lastkey, q->record, vfm_sink_info.case_size);
1360 /* Switches mode from sink to source. */
1362 sort_stream_mode (void)
1364 /* If this is not done, then we get the following source/sink pairs:
1365 source=memory/disk/DATA LIST/etc., sink=SORT; source=SORT,
1366 sink=SORT; which is not good. */
1370 struct case_stream sort_stream =