1 /* PSPP - computes sample statistics.
2 Copyright (C) 1997-9, 2000 Free Software Foundation, Inc.
3 Written by Ben Pfaff <blp@gnu.org>.
5 This program is free software; you can redistribute it and/or
6 modify it under the terms of the GNU General Public License as
7 published by the Free Software Foundation; either version 2 of the
8 License, or (at your option) any later version.
10 This program is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with this program; if not, write to the Free Software
17 Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA
44 #include <sys/types.h>
51 #include "debug-print.h"
53 /* Variables to sort. */
54 struct variable **v_sort;
57 /* Used when internal-sorting to a separate file. */
58 static struct case_list **separate_case_tab;
60 /* Other prototypes. */
61 static int compare_case_lists (const void *, const void *);
62 static int do_internal_sort (int separate);
63 static int do_external_sort (int separate);
64 int parse_sort_variables (void);
65 void read_sort_output (int (*write_case) (void));
67 /* Performs the SORT CASES procedures. */
71 /* First, just parse the command. */
72 lex_match_id ("SORT");
73 lex_match_id ("CASES");
76 if (!parse_sort_variables ())
81 /* Then it's time to do the actual work. There are two cases:
83 (internal sort) All the data is in memory. In this case, we
84 perform an EXECUTE to get the data into the desired form, then
85 sort the cases in place, if it is still all in memory.
87 (external sort) The data is not in memory. It may be coming from
88 a system file or other data file, etc. In any case, it is now
89 time to perform an external sort. This is better explained in
90 do_external_sort(). */
92 /* Do all this dirty work. */
94 int success = sort_cases (0);
97 return lex_end_of_command ();
103 /* Parses a list of sort variables into v_sort and nv_sort. */
105 parse_sort_variables (void)
111 int prev_nv_sort = nv_sort;
112 int order = SRT_ASCEND;
114 if (!parse_variables (&default_dict, &v_sort, &nv_sort,
115 PV_NO_DUPLICATE | PV_APPEND | PV_NO_SCRATCH))
119 if (lex_match_id ("D") || lex_match_id ("DOWN"))
121 else if (!lex_match_id ("A") && !lex_match_id ("UP"))
124 msg (SE, _("`A' or `D' expected inside parentheses."));
127 if (!lex_match (')'))
130 msg (SE, _("`)' expected."));
134 for (; prev_nv_sort < nv_sort; prev_nv_sort++)
135 v_sort[prev_nv_sort]->p.srt.order = order;
137 while (token != '.' && token != '/');
142 /* Sorts the active file based on the key variables specified in
143 global variables v_sort and nv_sort. The output is either to the
144 active file, if SEPARATE is zero, or to a separate file, if
145 SEPARATE is nonzero. In the latter case the output cases can be
146 read with a call to read_sort_output(). (In the former case the
147 output cases should be dealt with through the usual vfm interface.)
149 The caller is responsible for freeing v_sort[]. */
151 sort_cases (int separate)
153 assert (separate_case_tab == NULL);
155 /* Not sure this is necessary but it's good to be safe. */
156 if (separate && vfm_source == &sort_stream)
157 procedure (NULL, NULL, NULL);
159 /* SORT CASES cancels PROCESS IF. */
160 expr_free (process_if_expr);
161 process_if_expr = NULL;
163 if (do_internal_sort (separate))
167 return do_external_sort (separate);
170 /* If a reasonable situation is set up, do an internal sort of the
171 data. Return success. */
173 do_internal_sort (int separate)
175 if (vfm_source != &vfm_disk_stream)
177 if (vfm_source != &vfm_memory_stream)
178 procedure (NULL, NULL, NULL);
179 if (vfm_source == &vfm_memory_stream)
181 struct case_list **case_tab = malloc (sizeof *case_tab
182 * (vfm_source_info.ncases + 1));
183 if (vfm_source_info.ncases == 0)
188 if (case_tab != NULL)
190 struct case_list *clp = memory_source_cases;
191 struct case_list **ctp = case_tab;
194 for (; clp; clp = clp->next)
196 qsort (case_tab, vfm_source_info.ncases, sizeof *case_tab,
201 memory_source_cases = case_tab[0];
202 for (i = 1; i < vfm_source_info.ncases; i++)
203 case_tab[i - 1]->next = case_tab[i];
204 case_tab[vfm_source_info.ncases - 1]->next = NULL;
207 case_tab[vfm_source_info.ncases] = NULL;
208 separate_case_tab = case_tab;
218 /* Compares the NV_SORT variables in V_SORT[] between the `case_list's
219 at _A and _B, and returns a strcmp()-type result. */
221 compare_case_lists (const void *pa, const void *pb)
223 struct case_list *a = *(struct case_list **) pa;
224 struct case_list *b = *(struct case_list **) pb;
229 for (i = 0; i < nv_sort; i++)
233 if (v->type == NUMERIC)
235 if (approx_ne (a->c.data[v->fv].f, b->c.data[v->fv].f))
237 result = (a->c.data[v->fv].f > b->c.data[v->fv].f) ? 1 : -1;
243 result = memcmp (a->c.data[v->fv].s, b->c.data[v->fv].s, v->width);
249 if (v->p.srt.order == SRT_ASCEND)
253 assert (v->p.srt.order == SRT_DESCEND);
260 /* Maximum number of input + output file handles. */
261 #if defined FOPEN_MAX && (FOPEN_MAX - 5 < 18)
262 #define MAX_FILE_HANDLES (FOPEN_MAX - 5)
264 #define MAX_FILE_HANDLES 18
267 #if MAX_FILE_HANDLES < 3
268 #error At least 3 file handles must be available for sorting.
271 /* Number of input buffers. */
272 #define N_INPUT_BUFFERS (MAX_FILE_HANDLES - 1)
274 /* Maximum order of merge. This is the value suggested by Knuth;
275 specifically, he said to use tree selection, which we don't
276 implement, for larger orders of merge. */
277 #define MAX_MERGE_ORDER 7
279 /* Minimum total number of records for buffers. */
280 #define MIN_BUFFER_TOTAL_SIZE_RECS 64
282 /* Minimum single input or output buffer size, in bytes and records. */
283 #define MIN_BUFFER_SIZE_BYTES 4096
284 #define MIN_BUFFER_SIZE_RECS 16
286 /* Structure for replacement selection tree. */
289 struct repl_sel_tree *loser;/* Loser associated w/this internal node. */
290 int rn; /* Run number of `loser'. */
291 struct repl_sel_tree *fe; /* Internal node above this external node. */
292 struct repl_sel_tree *fi; /* Internal node above this internal node. */
293 union value record[1]; /* The case proper. */
296 /* Static variables used for sorting. */
297 static struct repl_sel_tree **x; /* Buffers. */
298 static int x_max; /* Size of buffers, in records. */
299 static int records_per_buffer; /* Number of records in each buffer. */
301 /* In the merge phase, the first N_INPUT_BUFFERS handle[] elements are
302 input files and the last element is the output file. Before that,
303 they're all used as output files, although the last one is
305 static FILE *handle[MAX_FILE_HANDLES]; /* File handles. */
307 /* Now, MAX_FILE_HANDLES is the maximum number of files we will *try*
308 to open. But if we can't open that many, max_handles will be set
309 to the number we apparently can open. */
310 static int max_handles; /* Maximum number of handles. */
312 /* When we create temporary files, they are all put in the same
313 directory and numbered sequentially from zero. tmp_basename is the
314 drive/directory, etc., and tmp_extname can be sprintf() with "%08x"
315 to the file number, then tmp_basename used to open the file. */
316 static char *tmp_basename; /* Temporary file basename. */
317 static char *tmp_extname; /* Temporary file extension name. */
319 /* We use Huffman's method to determine the merge pattern. This means
320 that we need to know which runs are the shortest at any given time.
321 Priority queues as implemented by heap.c are a natural for this
322 task (probably because I wrote the code specifically for it). */
323 static struct heap *huffman_queue; /* Huffman priority queue. */
325 /* Prototypes for helper functions. */
326 static void sort_stream_write (void);
327 static int write_initial_runs (int separate);
328 static int allocate_cases (void);
329 static int allocate_file_handles (void);
330 static int merge (void);
331 static void rmdir_temp_dir (void);
333 /* Performs an external sort of the active file. A description of the
334 procedure follows. All page references refer to Knuth's _Art of
335 Computer Programming, Vol. 3: Sorting and Searching_, which is the
336 canonical resource for sorting.
338 1. The data is read and S initial runs are formed through the
339 action of algorithm 5.4.1R (replacement selection).
341 2. Huffman's method (p. 365-366) is used to determine the optimum
344 3. If an OS that supports overlapped reading, writing, and
345 computing is being run, we should use 5.4.6F for forecasting.
346 Otherwise, buffers are filled only when they run out of data.
347 FIXME: Since the author of PSPP uses GNU/Linux, which does not
348 yet implement overlapped r/w/c, 5.4.6F is not used.
350 4. We perform P-way merges:
352 (a) The desired P is the smallest P such that ceil(ln(S)/ln(P))
353 is minimized. (FIXME: Since I don't have an algorithm for
354 minimizing this, it's just set to MAX_MERGE_ORDER.)
356 (b) P is reduced if the selected value would make input buffers
357 less than 4096 bytes each, or 16 records, whichever is larger.
359 (c) P is reduced if we run out of available file handles or space
362 (d) P is reduced if we don't have space for one or two output
363 buffers, which have the same minimum size as input buffers. (We
364 need two output buffers if 5.4.6F is in use for forecasting.) */
366 do_external_sort (int separate)
370 assert (MAX_FILE_HANDLES >= 3);
375 huffman_queue = heap_create (512);
376 if (huffman_queue == NULL)
379 if (!allocate_cases ())
382 if (!allocate_file_handles ())
385 if (!write_initial_runs (separate))
392 /* Despite the name, flow of control comes here regardless of
393 whether or not the sort is successful. */
395 heap_destroy (huffman_queue);
401 for (i = 0; i <= x_max; i++)
416 /* Sets up to open temporary files. */
417 /* PORTME: This creates a directory for temporary files. Some OSes
418 might not have that concept... */
420 allocate_file_handles (void)
422 const char *dir; /* Directory prefix. */
423 char *buf; /* String buffer. */
424 char *cp; /* Pointer into buf. */
426 dir = getenv ("SPSSTMPDIR");
428 dir = getenv ("SPSSXTMPDIR");
430 dir = getenv ("TMPDIR");
440 dir = getenv ("TEMP");
442 dir = getenv ("TMP");
449 buf = xmalloc (strlen (dir) + 1 + 4 + 8 + 4 + 1 + INT_DIGITS + 1);
450 cp = spprintf (buf, "%s%c%04lX%04lXpspp", dir, DIR_SEPARATOR,
451 ((long) time (0)) & 0xffff, ((long) getpid ()) & 0xffff);
452 if (-1 == mkdir (buf, S_IRWXU))
455 msg (SE, _("%s: Cannot create temporary directory: %s."),
456 buf, strerror (errno));
459 *cp++ = DIR_SEPARATOR;
464 max_handles = MAX_FILE_HANDLES;
469 /* Removes the directory created for temporary files, if one exists.
470 Also frees tmp_basename. */
472 rmdir_temp_dir (void)
474 if (NULL == tmp_basename)
477 tmp_extname[-1] = '\0';
478 if (rmdir (tmp_basename) == -1)
479 msg (SE, _("%s: Error removing directory for temporary files: %s."),
480 tmp_basename, strerror (errno));
485 /* Allocates room for lots of cases as a buffer. */
487 allocate_cases (void)
489 /* This is the size of one case. */
490 const int case_size = (sizeof (struct repl_sel_tree)
491 + sizeof (union value) * (default_dict.nval - 1)
492 + sizeof (struct repl_sel_tree *));
496 /* Allocate as many cases as we can, assuming a space of four
497 void pointers for malloc()'s internal bookkeeping. */
498 x_max = MAX_WORKSPACE / (case_size + 4 * sizeof (void *));
499 x = malloc (sizeof (struct repl_sel_tree *) * x_max);
504 for (i = 0; i < x_max; i++)
506 x[i] = malloc (sizeof (struct repl_sel_tree)
507 + sizeof (union value) * (default_dict.nval - 1));
513 if (x == NULL || x_max < MIN_BUFFER_TOTAL_SIZE_RECS)
519 for (i = 0; i < x_max; i++)
523 msg (SE, _("Out of memory. Could not allocate room for minimum of %d "
524 "cases of %d bytes each. (PSPP workspace is currently "
525 "restricted to a maximum of %d KB.)"),
526 MIN_BUFFER_TOTAL_SIZE_RECS, case_size, MAX_WORKSPACE / 1024);
532 /* The last element of the array is used to store lastkey. */
535 debug_printf ((_("allocated %d cases == %d bytes\n"),
536 x_max, x_max * case_size));
540 /* Replacement selection. */
542 static int rmax, rc, rq;
543 static struct repl_sel_tree *q;
544 static union value *lastkey;
545 static int run_no, file_index;
546 static int deferred_abort;
547 static int run_length;
549 static int compare_record (union value *, union value *);
552 output_record (union value *v)
554 union value *src_case;
559 if (compaction_necessary)
561 compact_case (compaction_case, (struct ccase *) v);
562 src_case = (union value *) compaction_case;
565 src_case = (union value *) v;
567 if ((int) fwrite (src_case, sizeof *src_case, compaction_nval,
572 sprintf (tmp_extname, "%08x", run_no);
573 msg (SE, _("%s: Error writing temporary file: %s."),
574 tmp_basename, strerror (errno));
584 int result = fclose (handle[i]);
585 msg (VM (2), _("SORT: Closing handle %d."), i);
590 sprintf (tmp_extname, "%08x", i);
591 msg (SE, _("%s: Error closing temporary file: %s."),
592 tmp_basename, strerror (errno));
599 close_handles (int beg, int end)
604 for (i = beg; i < end; i++)
605 success &= close_handle (i);
610 open_handle_w (int handle_no, int run_no)
612 sprintf (tmp_extname, "%08x", run_no);
613 msg (VM (1), _("SORT: %s: Opening for writing as run %d."),
614 tmp_basename, run_no);
616 /* The `x' modifier causes the GNU C library to insist on creating a
617 new file--if the file already exists, an error is signaled. The
618 ANSI C standard says that other libraries should ignore anything
619 after the `w+b', so it shouldn't be a problem. */
620 return NULL != (handle[handle_no] = fopen (tmp_basename, "w+bx"));
624 open_handle_r (int handle_no, int run_no)
628 sprintf (tmp_extname, "%08x", run_no);
629 msg (VM (1), _("SORT: %s: Opening for writing as run %d."),
630 tmp_basename, run_no);
631 f = handle[handle_no] = fopen (tmp_basename, "rb");
635 msg (SE, _("%s: Error opening temporary file for reading: %s."),
636 tmp_basename, strerror (errno));
643 /* Begins a new initial run, specifically its output file. */
647 /* Decide which handle[] to use. If run_no is max_handles or
648 greater, then we've run out of handles so it's time to just do
649 one file at a time, which by default is handle 0. */
650 file_index = (run_no < max_handles ? run_no : 0);
653 /* Alright, now create the temporary file. */
654 if (open_handle_w (file_index, run_no) == 0)
656 /* Failure to create the temporary file. Check if there are
657 unacceptably few files already open. */
661 msg (SE, _("%s: Error creating temporary file: %s."),
662 tmp_basename, strerror (errno));
666 /* Close all the open temporary files. */
667 if (!close_handles (0, file_index))
670 /* Now try again to create the temporary file. */
671 max_handles = file_index;
673 if (open_handle_w (0, run_no) == 0)
675 /* It still failed, report it this time. */
677 msg (SE, _("%s: Error creating temporary file: %s."),
678 tmp_basename, strerror (errno));
684 /* Ends the current initial run. Just increments run_no if no initial
685 run has been started yet. */
689 /* Close file handles if necessary. */
693 if (run_no == max_handles - 1)
694 result = close_handles (0, max_handles);
695 else if (run_no >= max_handles)
696 result = close_handle (0);
703 /* Advance to next run. */
706 heap_insert (huffman_queue, run_no - 1, run_length);
709 /* Performs 5.4.1R. */
711 write_initial_runs (int separate)
716 /* Steps R1, R2, R3. */
725 for (j = 0; j < x_max; j++)
727 struct repl_sel_tree *J = x[j];
731 J->fe = x[(x_max + j) / 2];
733 memset (J->record, 0, default_dict.nval * sizeof (union value));
737 /* Most of the iterations of steps R4, R5, R6, R7, R2, R3, ... */
741 vfm_sink->destroy_sink ();
742 vfm_sink = &sort_stream;
744 procedure (NULL, NULL, NULL);
746 /* Final iterations of steps R4, R5, R6, R7, R2, R3, ... */
749 struct repl_sel_tree *t;
763 && compare_record (t->loser->record, q->record) < 0))
765 struct repl_sel_tree *temp_tree;
768 temp_tree = t->loser;
796 output_record (q->record);
797 lastkey = x[x_max]->record;
798 memcpy (lastkey, q->record, sizeof (union value) * vfm_sink_info.nval);
801 assert (run_no == rmax);
803 /* If an unrecoverable error occurred somewhere in the above code,
804 then the `deferred_abort' flag would have been set. */
809 for (i = 0; i < max_handles; i++)
810 if (handle[i] != NULL)
812 sprintf (tmp_extname, "%08x", i);
814 if (fclose (handle[i]) == EOF)
815 msg (SE, _("%s: Error closing temporary file: %s."),
816 tmp_basename, strerror (errno));
818 if (remove (tmp_basename) != 0)
819 msg (SE, _("%s: Error removing temporary file: %s."),
820 tmp_basename, strerror (errno));
830 /* Compares the NV_SORT variables in V_SORT[] between the `value's at
831 A and B, and returns a strcmp()-type result. */
833 compare_record (union value * a, union value * b)
840 if (b == NULL) /* Sort NULLs after everything else. */
843 for (i = 0; i < nv_sort; i++)
847 if (v->type == NUMERIC)
849 if (approx_ne (a[v->fv].f, b[v->fv].f))
851 result = (a[v->fv].f > b[v->fv].f) ? 1 : -1;
857 result = memcmp (a[v->fv].s, b[v->fv].s, v->width);
863 if (v->p.srt.order == SRT_ASCEND)
867 assert (v->p.srt.order == SRT_DESCEND);
874 static int merge_once (int run_index[], int run_length[], int n_runs);
876 /* Modula function as defined by Knuth. */
884 result = abs (x) % abs (y);
890 /* Performs a series of P-way merges of initial runs using Huffman's
895 /* Order of merge. */
899 assert (MIN_BUFFER_SIZE_RECS * 2 <= MIN_BUFFER_TOTAL_SIZE_RECS - 1);
901 /* Close all the input files. I hope that the boundary conditions
902 are correct on this but I'm not sure. */
903 if (run_no < max_handles)
907 for (i = 0; i < run_no; )
908 if (!close_handle (i++))
910 for (; i < run_no; i++)
916 /* Determine order of merge. */
917 order = MAX_MERGE_ORDER;
918 if (x_max / order < MIN_BUFFER_SIZE_RECS)
919 order = x_max / MIN_BUFFER_SIZE_RECS;
920 else if (x_max / order * sizeof (union value) * default_dict.nval
921 < MIN_BUFFER_SIZE_BYTES)
922 order = x_max / (MIN_BUFFER_SIZE_BYTES
923 / (sizeof (union value) * (default_dict.nval - 1)));
925 /* Make sure the order of merge is bounded. */
930 assert (x_max / order > 0);
932 /* Calculate number of records per buffer. */
933 records_per_buffer = x_max / order;
935 /* Add (1 - S) mod (P - 1) dummy runs of length 0. */
937 int n_dummy_runs = mod (1 - rmax, order - 1);
938 debug_printf (("rmax=%d, order=%d, n_dummy_runs=%d\n",
939 rmax, order, n_dummy_runs));
940 assert (n_dummy_runs >= 0);
941 while (n_dummy_runs--)
943 heap_insert (huffman_queue, -2, 0);
948 /* Repeatedly merge the P shortest existing runs until only one run
952 int run_index[MAX_MERGE_ORDER];
953 int run_length[MAX_MERGE_ORDER];
954 int total_run_length = 0;
957 assert (rmax >= order);
959 /* Find the shortest runs; put them in runs[] in reverse order
960 of length, to force dummy runs of length 0 to the end of the
962 debug_printf ((_("merging runs")));
963 for (i = order - 1; i >= 0; i--)
965 run_index[i] = heap_delete (huffman_queue, &run_length[i]);
966 assert (run_index[i] != -1);
967 total_run_length += run_length[i];
968 debug_printf ((" %d(%d)", run_index[i], run_length[i]));
970 debug_printf ((_(" into run %d(%d)\n"), run_no, total_run_length));
972 if (!merge_once (run_index, run_length, order))
976 while (-1 != (index = heap_delete (huffman_queue, NULL)))
978 sprintf (tmp_extname, "%08x", index);
979 if (remove (tmp_basename) != 0)
980 msg (SE, _("%s: Error removing temporary file: %s."),
981 tmp_basename, strerror (errno));
987 if (!heap_insert (huffman_queue, run_no++, total_run_length))
989 msg (SE, _("Out of memory expanding Huffman priority queue."));
996 /* There should be exactly one element in the priority queue after
997 all that merging. This represents the entire sorted active file.
998 So we could find a total case count by deleting this element from
1000 assert (heap_size (huffman_queue) == 1);
1005 /* Merges N_RUNS initial runs into a new run. The jth run for 0 <= j
1006 < N_RUNS is taken from temporary file RUN_INDEX[j]; it is composed
1007 of RUN_LENGTH[j] cases. */
1009 merge_once (int run_index[], int run_length[], int n_runs)
1011 /* For each run, the number of records remaining in its buffer. */
1012 int buffered[MAX_MERGE_ORDER];
1014 /* For each run, the index of the next record in the buffer. */
1015 int buffer_ptr[MAX_MERGE_ORDER];
1017 /* Open input files. */
1021 for (i = 0; i < n_runs; i++)
1022 if (run_index[i] != -2 && !open_handle_r (i, run_index[i]))
1024 /* Close and remove temporary files. */
1028 sprintf (tmp_extname, "%08x", i);
1029 if (remove (tmp_basename) != 0)
1030 msg (SE, _("%s: Error removing temporary file: %s."),
1031 tmp_basename, strerror (errno));
1038 /* Create output file. */
1039 if (!open_handle_w (N_INPUT_BUFFERS, run_no))
1041 msg (SE, _("%s: Error creating temporary file for merge: %s."),
1042 tmp_basename, strerror (errno));
1046 /* Prime each buffer. */
1050 for (i = 0; i < n_runs; i++)
1051 if (run_index[i] == -2)
1059 int ofs = records_per_buffer * i;
1061 buffered[i] = min (records_per_buffer, run_length[i]);
1062 for (j = 0; j < buffered[i]; j++)
1063 if ((int) fread (x[j + ofs]->record, sizeof (union value),
1064 default_dict.nval, handle[i])
1065 != default_dict.nval)
1067 sprintf (tmp_extname, "%08x", run_index[i]);
1068 if (ferror (handle[i]))
1069 msg (SE, _("%s: Error reading temporary file in merge: %s."),
1070 tmp_basename, strerror (errno));
1072 msg (SE, _("%s: Unexpected end of temporary file in merge."),
1076 buffer_ptr[i] = ofs;
1077 run_length[i] -= buffered[i];
1081 /* Perform the merge proper. */
1082 while (n_runs) /* Loop while some data is left. */
1087 for (i = 1; i < n_runs; i++)
1088 if (compare_record (x[buffer_ptr[min]]->record,
1089 x[buffer_ptr[i]]->record) > 0)
1092 if ((int) fwrite (x[buffer_ptr[min]]->record, sizeof (union value),
1093 default_dict.nval, handle[N_INPUT_BUFFERS])
1094 != default_dict.nval)
1096 sprintf (tmp_extname, "%08x", run_index[i]);
1097 msg (SE, _("%s: Error writing temporary file in "
1098 "merge: %s."), tmp_basename, strerror (errno));
1102 /* Remove one case from the buffer for this input file. */
1103 if (--buffered[min] == 0)
1105 /* The input buffer is empty. Do any cases remain in the
1106 initial run on disk? */
1107 if (run_length[min])
1109 /* Yes. Read them in. */
1114 /* Reset the buffer pointer. Note that we can't simply
1115 set it to (i * records_per_buffer) since the run
1116 order might have changed. */
1117 ofs = buffer_ptr[min] -= buffer_ptr[min] % records_per_buffer;
1119 buffered[min] = min (records_per_buffer, run_length[min]);
1120 for (j = 0; j < buffered[min]; j++)
1121 if ((int) fread (x[j + ofs]->record, sizeof (union value),
1122 default_dict.nval, handle[min])
1123 != default_dict.nval)
1125 sprintf (tmp_extname, "%08x", run_index[min]);
1126 if (ferror (handle[min]))
1127 msg (SE, _("%s: Error reading temporary file in "
1129 tmp_basename, strerror (errno));
1131 msg (SE, _("%s: Unexpected end of temporary file "
1136 run_length[min] -= buffered[min];
1140 /* No. Delete this run. */
1142 /* Close the file. */
1143 FILE *f = handle[min];
1145 sprintf (tmp_extname, "%08x", run_index[min]);
1146 if (fclose (f) == EOF)
1147 msg (SE, _("%s: Error closing temporary file in merge: "
1148 "%s."), tmp_basename, strerror (errno));
1150 /* Delete the file. */
1151 if (remove (tmp_basename) != 0)
1152 msg (SE, _("%s: Error removing temporary file in merge: "
1153 "%s."), tmp_basename, strerror (errno));
1158 /* Since this isn't the last run, we move the last
1159 run into its spot to force all the runs to be
1161 run_index[min] = run_index[n_runs];
1162 run_length[min] = run_length[n_runs];
1163 buffer_ptr[min] = buffer_ptr[n_runs];
1164 buffered[min] = buffered[n_runs];
1165 handle[min] = handle[n_runs];
1173 /* Close output file. */
1175 FILE *f = handle[N_INPUT_BUFFERS];
1176 handle[N_INPUT_BUFFERS] = NULL;
1177 if (fclose (f) == EOF)
1179 sprintf (tmp_extname, "%08x", run_no);
1180 msg (SE, _("%s: Error closing temporary file in merge: "
1182 tmp_basename, strerror (errno));
1190 /* Close all the input and output files. */
1194 for (i = 0; i < n_runs; i++)
1195 if (run_length[i] != 0)
1198 sprintf (tmp_basename, "%08x", run_index[i]);
1199 if (remove (tmp_basename) != 0)
1200 msg (SE, _("%s: Error removing temporary file: %s."),
1201 tmp_basename, strerror (errno));
1204 close_handle (N_INPUT_BUFFERS);
1205 sprintf (tmp_basename, "%08x", run_no);
1206 if (remove (tmp_basename) != 0)
1207 msg (SE, _("%s: Error removing temporary file: %s."),
1208 tmp_basename, strerror (errno));
1212 /* External sort input program. */
1214 /* Reads all the records from the source stream and passes them
1217 sort_stream_read (void)
1219 read_sort_output (write_case);
1222 /* Reads all the records from the output stream and passes them to the
1223 function provided, which must have an interface identical to
1226 read_sort_output (int (*write_case) (void))
1231 if (separate_case_tab)
1233 struct ccase *save_temp_case = temp_case;
1234 struct case_list **p;
1236 for (p = separate_case_tab; *p; p++)
1238 temp_case = &(*p)->c;
1242 free (separate_case_tab);
1243 separate_case_tab = NULL;
1245 temp_case = save_temp_case;
1247 sprintf (tmp_extname, "%08x", run_no - 1);
1248 f = fopen (tmp_basename, "rb");
1251 msg (ME, _("%s: Cannot open sort result file: %s."), tmp_basename,
1257 for (i = 0; i < vfm_source_info.ncases; i++)
1259 if (!fread (temp_case, vfm_source_info.case_size, 1, f))
1262 msg (ME, _("%s: Error reading sort result file: %s."),
1263 tmp_basename, strerror (errno));
1265 msg (ME, _("%s: Unexpected end of sort result file: %s."),
1266 tmp_basename, strerror (errno));
1275 if (fclose (f) == EOF)
1276 msg (ME, _("%s: Error closing sort result file: %s."), tmp_basename,
1279 if (remove (tmp_basename) != 0)
1280 msg (ME, _("%s: Error removing sort result file: %s."), tmp_basename,
1287 #if 0 /* dead code */
1288 /* Alternate interface to sort_stream_write used for external sorting
1289 when SEPARATE is true. */
1291 write_separate (struct ccase *c)
1293 assert (c == temp_case);
1295 sort_stream_write ();
1300 /* Performs one iteration of 5.4.1R steps R4, R5, R6, R7, R2, and
1303 sort_stream_write (void)
1305 struct repl_sel_tree *t;
1308 memcpy (q->record, temp_case->data, vfm_sink_info.case_size);
1309 if (compare_record (q->record, lastkey) < 0)
1321 || (t->rn == rq && compare_record (t->loser->record, q->record) < 0))
1323 struct repl_sel_tree *temp_tree;
1326 temp_tree = t->loser;
1346 assert (rq <= rmax);
1353 output_record (q->record);
1354 lastkey = x[x_max]->record;
1355 memcpy (lastkey, q->record, vfm_sink_info.case_size);
1359 /* Switches mode from sink to source. */
1361 sort_stream_mode (void)
1363 /* If this is not done, then we get the following source/sink pairs:
1364 source=memory/disk/DATA LIST/etc., sink=SORT; source=SORT,
1365 sink=SORT; which is not good. */
1369 struct case_stream sort_stream =