1 /* PSPP - computes sample statistics.
2 Copyright (C) 1997-9, 2000 Free Software Foundation, Inc.
3 Written by Ben Pfaff <blp@gnu.org>.
5 This program is free software; you can redistribute it and/or
6 modify it under the terms of the GNU General Public License as
7 published by the Free Software Foundation; either version 2 of the
8 License, or (at your option) any later version.
10 This program is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with this program; if not, write to the Free Software
17 Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA
44 #include <sys/types.h>
51 #include "debug-print.h"
53 /* Variables to sort. */
54 struct variable **v_sort;
57 /* Used when internal-sorting to a separate file. */
58 static struct case_list **separate_case_tab;
60 /* Exported by qsort.c. */
61 void blp_quicksort (void *pbase, size_t total_elems, size_t size,
62 int (*cmp) (const void *, const void *),
65 /* Other prototypes. */
66 static int compare_case_lists (const void *, const void *);
67 static int do_internal_sort (int separate);
68 static int do_external_sort (int separate);
69 int parse_sort_variables (void);
70 void read_sort_output (int (*write_case) (void));
72 /* Performs the SORT CASES procedures. */
76 /* First, just parse the command. */
77 lex_match_id ("SORT");
78 lex_match_id ("CASES");
81 if (!parse_sort_variables ())
86 /* Then it's time to do the actual work. There are two cases:
88 (internal sort) All the data is in memory. In this case, we
89 perform an EXECUTE to get the data into the desired form, then
90 sort the cases in place, if it is still all in memory.
92 (external sort) The data is not in memory. It may be coming from
93 a system file or other data file, etc. In any case, it is now
94 time to perform an external sort. This is better explained in
95 do_external_sort(). */
97 /* Do all this dirty work. */
99 int success = sort_cases (0);
102 return lex_end_of_command ();
108 /* Parses a list of sort variables into v_sort and nv_sort. */
110 parse_sort_variables (void)
116 int prev_nv_sort = nv_sort;
117 int order = SRT_ASCEND;
119 if (!parse_variables (&default_dict, &v_sort, &nv_sort,
120 PV_NO_DUPLICATE | PV_APPEND | PV_NO_SCRATCH))
124 if (lex_match_id ("D") || lex_match_id ("DOWN"))
126 else if (!lex_match_id ("A") && !lex_match_id ("UP"))
129 msg (SE, _("`A' or `D' expected inside parentheses."));
132 if (!lex_match (')'))
135 msg (SE, _("`)' expected."));
139 for (; prev_nv_sort < nv_sort; prev_nv_sort++)
140 v_sort[prev_nv_sort]->p.srt.order = order;
142 while (token != '.' && token != '/');
147 /* Sorts the active file based on the key variables specified in
148 global variables v_sort and nv_sort. The output is either to the
149 active file, if SEPARATE is zero, or to a separate file, if
150 SEPARATE is nonzero. In the latter case the output cases can be
151 read with a call to read_sort_output(). (In the former case the
152 output cases should be dealt with through the usual vfm interface.)
154 The caller is responsible for freeing v_sort[]. */
156 sort_cases (int separate)
158 assert (separate_case_tab == NULL);
160 /* Not sure this is necessary but it's good to be safe. */
161 if (separate && vfm_source == &sort_stream)
162 procedure (NULL, NULL, NULL);
164 /* SORT CASES cancels PROCESS IF. */
165 expr_free (process_if_expr);
166 process_if_expr = NULL;
168 if (do_internal_sort (separate))
172 return do_external_sort (separate);
175 /* If a reasonable situation is set up, do an internal sort of the
176 data. Return success. */
178 do_internal_sort (int separate)
180 if (vfm_source != &vfm_disk_stream)
182 if (vfm_source != &vfm_memory_stream)
183 procedure (NULL, NULL, NULL);
184 if (vfm_source == &vfm_memory_stream)
186 struct case_list **case_tab = malloc (sizeof *case_tab
187 * (vfm_source_info.ncases + 1));
188 if (vfm_source_info.ncases == 0)
193 if (case_tab != NULL)
195 struct case_list *clp = memory_source_cases;
196 struct case_list **ctp = case_tab;
199 for (; clp; clp = clp->next)
201 qsort (case_tab, vfm_source_info.ncases, sizeof *case_tab,
206 memory_source_cases = case_tab[0];
207 for (i = 1; i < vfm_source_info.ncases; i++)
208 case_tab[i - 1]->next = case_tab[i];
209 case_tab[vfm_source_info.ncases - 1]->next = NULL;
212 case_tab[vfm_source_info.ncases] = NULL;
213 separate_case_tab = case_tab;
223 /* Compares the NV_SORT variables in V_SORT[] between the `case_list's
224 at _A and _B, and returns a strcmp()-type result. */
226 compare_case_lists (const void *pa, const void *pb)
228 struct case_list *a = *(struct case_list **) pa;
229 struct case_list *b = *(struct case_list **) pb;
234 for (i = 0; i < nv_sort; i++)
238 if (v->type == NUMERIC)
240 if (approx_ne (a->c.data[v->fv].f, b->c.data[v->fv].f))
242 result = (a->c.data[v->fv].f > b->c.data[v->fv].f) ? 1 : -1;
248 result = memcmp (a->c.data[v->fv].s, b->c.data[v->fv].s, v->width);
254 if (v->p.srt.order == SRT_ASCEND)
258 assert (v->p.srt.order == SRT_DESCEND);
265 /* Maximum number of input + output file handles. */
266 #if defined FOPEN_MAX && (FOPEN_MAX - 5 < 18)
267 #define MAX_FILE_HANDLES (FOPEN_MAX - 5)
269 #define MAX_FILE_HANDLES 18
272 #if MAX_FILE_HANDLES < 3
273 #error At least 3 file handles must be available for sorting.
276 /* Number of input buffers. */
277 #define N_INPUT_BUFFERS (MAX_FILE_HANDLES - 1)
279 /* Maximum order of merge. This is the value suggested by Knuth;
280 specifically, he said to use tree selection, which we don't
281 implement, for larger orders of merge. */
282 #define MAX_MERGE_ORDER 7
284 /* Minimum total number of records for buffers. */
285 #define MIN_BUFFER_TOTAL_SIZE_RECS 64
287 /* Minimum single input or output buffer size, in bytes and records. */
288 #define MIN_BUFFER_SIZE_BYTES 4096
289 #define MIN_BUFFER_SIZE_RECS 16
291 /* Structure for replacement selection tree. */
294 struct repl_sel_tree *loser;/* Loser associated w/this internal node. */
295 int rn; /* Run number of `loser'. */
296 struct repl_sel_tree *fe; /* Internal node above this external node. */
297 struct repl_sel_tree *fi; /* Internal node above this internal node. */
298 union value record[1]; /* The case proper. */
301 /* Static variables used for sorting. */
302 static struct repl_sel_tree **x; /* Buffers. */
303 static int x_max; /* Size of buffers, in records. */
304 static int records_per_buffer; /* Number of records in each buffer. */
306 /* In the merge phase, the first N_INPUT_BUFFERS handle[] elements are
307 input files and the last element is the output file. Before that,
308 they're all used as output files, although the last one is
310 static FILE *handle[MAX_FILE_HANDLES]; /* File handles. */
312 /* Now, MAX_FILE_HANDLES is the maximum number of files we will *try*
313 to open. But if we can't open that many, max_handles will be set
314 to the number we apparently can open. */
315 static int max_handles; /* Maximum number of handles. */
317 /* When we create temporary files, they are all put in the same
318 directory and numbered sequentially from zero. tmp_basename is the
319 drive/directory, etc., and tmp_extname can be sprintf() with "%08x"
320 to the file number, then tmp_basename used to open the file. */
321 static char *tmp_basename; /* Temporary file basename. */
322 static char *tmp_extname; /* Temporary file extension name. */
324 /* We use Huffman's method to determine the merge pattern. This means
325 that we need to know which runs are the shortest at any given time.
326 Priority queues as implemented by heap.c are a natural for this
327 task (probably because I wrote the code specifically for it). */
328 static struct heap *huffman_queue; /* Huffman priority queue. */
330 /* Prototypes for helper functions. */
331 static void sort_stream_write (void);
332 static int write_initial_runs (int separate);
333 static int allocate_cases (void);
334 static int allocate_file_handles (void);
335 static int merge (void);
336 static void rmdir_temp_dir (void);
338 /* Performs an external sort of the active file. A description of the
339 procedure follows. All page references refer to Knuth's _Art of
340 Computer Programming, Vol. 3: Sorting and Searching_, which is the
341 canonical resource for sorting.
343 1. The data is read and S initial runs are formed through the
344 action of algorithm 5.4.1R (replacement selection).
346 2. Huffman's method (p. 365-366) is used to determine the optimum
349 3. If an OS that supports overlapped reading, writing, and
350 computing is being run, we should use 5.4.6F for forecasting.
351 Otherwise, buffers are filled only when they run out of data.
352 FIXME: Since the author of PSPP uses GNU/Linux, which does not
353 yet implement overlapped r/w/c, 5.4.6F is not used.
355 4. We perform P-way merges:
357 (a) The desired P is the smallest P such that ceil(ln(S)/ln(P))
358 is minimized. (FIXME: Since I don't have an algorithm for
359 minimizing this, it's just set to MAX_MERGE_ORDER.)
361 (b) P is reduced if the selected value would make input buffers
362 less than 4096 bytes each, or 16 records, whichever is larger.
364 (c) P is reduced if we run out of available file handles or space
367 (d) P is reduced if we don't have space for one or two output
368 buffers, which have the same minimum size as input buffers. (We
369 need two output buffers if 5.4.6F is in use for forecasting.) */
371 do_external_sort (int separate)
375 assert (MAX_FILE_HANDLES >= 3);
380 huffman_queue = heap_create (512);
381 if (huffman_queue == NULL)
384 if (!allocate_cases ())
387 if (!allocate_file_handles ())
390 if (!write_initial_runs (separate))
397 /* Despite the name, flow of control comes here regardless of
398 whether or not the sort is successful. */
400 heap_destroy (huffman_queue);
406 for (i = 0; i <= x_max; i++)
421 /* Sets up to open temporary files. */
422 /* PORTME: This creates a directory for temporary files. Some OSes
423 might not have that concept... */
425 allocate_file_handles (void)
427 const char *dir; /* Directory prefix. */
428 char *buf; /* String buffer. */
429 char *cp; /* Pointer into buf. */
431 dir = getenv ("SPSSTMPDIR");
433 dir = getenv ("SPSSXTMPDIR");
435 dir = getenv ("TMPDIR");
445 dir = getenv ("TEMP");
447 dir = getenv ("TMP");
454 buf = xmalloc (strlen (dir) + 1 + 4 + 8 + 4 + 1 + INT_DIGITS + 1);
455 cp = spprintf (buf, "%s%c%04lX%04lXpspp", dir, DIR_SEPARATOR,
456 ((long) time (0)) & 0xffff, ((long) getpid ()) & 0xffff);
457 if (-1 == mkdir (buf, S_IRWXU))
460 msg (SE, _("%s: Cannot create temporary directory: %s."),
461 buf, strerror (errno));
464 *cp++ = DIR_SEPARATOR;
469 max_handles = MAX_FILE_HANDLES;
474 /* Removes the directory created for temporary files, if one exists.
475 Also frees tmp_basename. */
477 rmdir_temp_dir (void)
479 if (NULL == tmp_basename)
482 tmp_extname[-1] = '\0';
483 if (rmdir (tmp_basename) == -1)
484 msg (SE, _("%s: Error removing directory for temporary files: %s."),
485 tmp_basename, strerror (errno));
490 /* Allocates room for lots of cases as a buffer. */
492 allocate_cases (void)
494 /* This is the size of one case. */
495 const int case_size = (sizeof (struct repl_sel_tree)
496 + sizeof (union value) * (default_dict.nval - 1)
497 + sizeof (struct repl_sel_tree *));
501 /* Allocate as many cases as we can, assuming a space of four
502 void pointers for malloc()'s internal bookkeeping. */
503 x_max = MAX_WORKSPACE / (case_size + 4 * sizeof (void *));
504 x = malloc (sizeof (struct repl_sel_tree *) * x_max);
509 for (i = 0; i < x_max; i++)
511 x[i] = malloc (sizeof (struct repl_sel_tree)
512 + sizeof (union value) * (default_dict.nval - 1));
518 if (x == NULL || x_max < MIN_BUFFER_TOTAL_SIZE_RECS)
524 for (i = 0; i < x_max; i++)
528 msg (SE, _("Out of memory. Could not allocate room for minimum of %d "
529 "cases of %d bytes each. (PSPP workspace is currently "
530 "restricted to a maximum of %d KB.)"),
531 MIN_BUFFER_TOTAL_SIZE_RECS, case_size, MAX_WORKSPACE / 1024);
537 /* The last element of the array is used to store lastkey. */
540 debug_printf ((_("allocated %d cases == %d bytes\n"),
541 x_max, x_max * case_size));
545 /* Replacement selection. */
547 static int rmax, rc, rq;
548 static struct repl_sel_tree *q;
549 static union value *lastkey;
550 static int run_no, file_index;
551 static int deferred_abort;
552 static int run_length;
554 static int compare_record (union value *, union value *);
557 output_record (union value *v)
559 union value *src_case;
564 if (compaction_necessary)
566 compact_case (compaction_case, (struct ccase *) v);
567 src_case = (union value *) compaction_case;
570 src_case = (union value *) v;
572 if ((int) fwrite (src_case, sizeof *src_case, compaction_nval,
577 sprintf (tmp_extname, "%08x", run_no);
578 msg (SE, _("%s: Error writing temporary file: %s."),
579 tmp_basename, strerror (errno));
589 int result = fclose (handle[i]);
590 msg (VM (2), _("SORT: Closing handle %d."), i);
595 sprintf (tmp_extname, "%08x", i);
596 msg (SE, _("%s: Error closing temporary file: %s."),
597 tmp_basename, strerror (errno));
604 close_handles (int beg, int end)
609 for (i = beg; i < end; i++)
610 success &= close_handle (i);
615 open_handle_w (int handle_no, int run_no)
617 sprintf (tmp_extname, "%08x", run_no);
618 msg (VM (1), _("SORT: %s: Opening for writing as run %d."),
619 tmp_basename, run_no);
621 /* The `x' modifier causes the GNU C library to insist on creating a
622 new file--if the file already exists, an error is signaled. The
623 ANSI C standard says that other libraries should ignore anything
624 after the `w+b', so it shouldn't be a problem. */
625 return NULL != (handle[handle_no] = fopen (tmp_basename, "w+bx"));
629 open_handle_r (int handle_no, int run_no)
633 sprintf (tmp_extname, "%08x", run_no);
634 msg (VM (1), _("SORT: %s: Opening for writing as run %d."),
635 tmp_basename, run_no);
636 f = handle[handle_no] = fopen (tmp_basename, "rb");
640 msg (SE, _("%s: Error opening temporary file for reading: %s."),
641 tmp_basename, strerror (errno));
648 /* Begins a new initial run, specifically its output file. */
652 /* Decide which handle[] to use. If run_no is max_handles or
653 greater, then we've run out of handles so it's time to just do
654 one file at a time, which by default is handle 0. */
655 file_index = (run_no < max_handles ? run_no : 0);
658 /* Alright, now create the temporary file. */
659 if (open_handle_w (file_index, run_no) == 0)
661 /* Failure to create the temporary file. Check if there are
662 unacceptably few files already open. */
666 msg (SE, _("%s: Error creating temporary file: %s."),
667 tmp_basename, strerror (errno));
671 /* Close all the open temporary files. */
672 if (!close_handles (0, file_index))
675 /* Now try again to create the temporary file. */
676 max_handles = file_index;
678 if (open_handle_w (0, run_no) == 0)
680 /* It still failed, report it this time. */
682 msg (SE, _("%s: Error creating temporary file: %s."),
683 tmp_basename, strerror (errno));
689 /* Ends the current initial run. Just increments run_no if no initial
690 run has been started yet. */
694 /* Close file handles if necessary. */
698 if (run_no == max_handles - 1)
699 result = close_handles (0, max_handles);
700 else if (run_no >= max_handles)
701 result = close_handle (0);
708 /* Advance to next run. */
711 heap_insert (huffman_queue, run_no - 1, run_length);
714 /* Performs 5.4.1R. */
716 write_initial_runs (int separate)
721 /* Steps R1, R2, R3. */
730 for (j = 0; j < x_max; j++)
732 struct repl_sel_tree *J = x[j];
736 J->fe = x[(x_max + j) / 2];
738 memset (J->record, 0, default_dict.nval * sizeof (union value));
742 /* Most of the iterations of steps R4, R5, R6, R7, R2, R3, ... */
746 vfm_sink->destroy_sink ();
747 vfm_sink = &sort_stream;
749 procedure (NULL, NULL, NULL);
751 /* Final iterations of steps R4, R5, R6, R7, R2, R3, ... */
754 struct repl_sel_tree *t;
768 && compare_record (t->loser->record, q->record) < 0))
770 struct repl_sel_tree *temp_tree;
773 temp_tree = t->loser;
801 output_record (q->record);
802 lastkey = x[x_max]->record;
803 memcpy (lastkey, q->record, sizeof (union value) * vfm_sink_info.nval);
806 assert (run_no == rmax);
808 /* If an unrecoverable error occurred somewhere in the above code,
809 then the `deferred_abort' flag would have been set. */
814 for (i = 0; i < max_handles; i++)
815 if (handle[i] != NULL)
817 sprintf (tmp_extname, "%08x", i);
819 if (fclose (handle[i]) == EOF)
820 msg (SE, _("%s: Error closing temporary file: %s."),
821 tmp_basename, strerror (errno));
823 if (remove (tmp_basename) != 0)
824 msg (SE, _("%s: Error removing temporary file: %s."),
825 tmp_basename, strerror (errno));
835 /* Compares the NV_SORT variables in V_SORT[] between the `value's at
836 A and B, and returns a strcmp()-type result. */
838 compare_record (union value * a, union value * b)
845 if (b == NULL) /* Sort NULLs after everything else. */
848 for (i = 0; i < nv_sort; i++)
852 if (v->type == NUMERIC)
854 if (approx_ne (a[v->fv].f, b[v->fv].f))
856 result = (a[v->fv].f > b[v->fv].f) ? 1 : -1;
862 result = memcmp (a[v->fv].s, b[v->fv].s, v->width);
868 if (v->p.srt.order == SRT_ASCEND)
872 assert (v->p.srt.order == SRT_DESCEND);
879 static int merge_once (int run_index[], int run_length[], int n_runs);
881 /* Modula function as defined by Knuth. */
889 result = abs (x) % abs (y);
895 /* Performs a series of P-way merges of initial runs using Huffman's
900 /* Order of merge. */
904 assert (MIN_BUFFER_SIZE_RECS * 2 <= MIN_BUFFER_TOTAL_SIZE_RECS - 1);
906 /* Close all the input files. I hope that the boundary conditions
907 are correct on this but I'm not sure. */
908 if (run_no < max_handles)
912 for (i = 0; i < run_no; )
913 if (!close_handle (i++))
915 for (; i < run_no; i++)
921 /* Determine order of merge. */
922 order = MAX_MERGE_ORDER;
923 if (x_max / order < MIN_BUFFER_SIZE_RECS)
924 order = x_max / MIN_BUFFER_SIZE_RECS;
925 else if (x_max / order * sizeof (union value) * default_dict.nval
926 < MIN_BUFFER_SIZE_BYTES)
927 order = x_max / (MIN_BUFFER_SIZE_BYTES
928 / (sizeof (union value) * (default_dict.nval - 1)));
930 /* Make sure the order of merge is bounded. */
935 assert (x_max / order > 0);
937 /* Calculate number of records per buffer. */
938 records_per_buffer = x_max / order;
940 /* Add (1 - S) mod (P - 1) dummy runs of length 0. */
942 int n_dummy_runs = mod (1 - rmax, order - 1);
943 debug_printf (("rmax=%d, order=%d, n_dummy_runs=%d\n",
944 rmax, order, n_dummy_runs));
945 assert (n_dummy_runs >= 0);
946 while (n_dummy_runs--)
948 heap_insert (huffman_queue, -2, 0);
953 /* Repeatedly merge the P shortest existing runs until only one run
957 int run_index[MAX_MERGE_ORDER];
958 int run_length[MAX_MERGE_ORDER];
959 int total_run_length = 0;
962 assert (rmax >= order);
964 /* Find the shortest runs; put them in runs[] in reverse order
965 of length, to force dummy runs of length 0 to the end of the
967 debug_printf ((_("merging runs")));
968 for (i = order - 1; i >= 0; i--)
970 run_index[i] = heap_delete (huffman_queue, &run_length[i]);
971 assert (run_index[i] != -1);
972 total_run_length += run_length[i];
973 debug_printf ((" %d(%d)", run_index[i], run_length[i]));
975 debug_printf ((_(" into run %d(%d)\n"), run_no, total_run_length));
977 if (!merge_once (run_index, run_length, order))
981 while (-1 != (index = heap_delete (huffman_queue, NULL)))
983 sprintf (tmp_extname, "%08x", index);
984 if (remove (tmp_basename) != 0)
985 msg (SE, _("%s: Error removing temporary file: %s."),
986 tmp_basename, strerror (errno));
992 if (!heap_insert (huffman_queue, run_no++, total_run_length))
994 msg (SE, _("Out of memory expanding Huffman priority queue."));
1001 /* There should be exactly one element in the priority queue after
1002 all that merging. This represents the entire sorted active file.
1003 So we could find a total case count by deleting this element from
1005 assert (heap_size (huffman_queue) == 1);
1010 /* Merges N_RUNS initial runs into a new run. The jth run for 0 <= j
1011 < N_RUNS is taken from temporary file RUN_INDEX[j]; it is composed
1012 of RUN_LENGTH[j] cases. */
1014 merge_once (int run_index[], int run_length[], int n_runs)
1016 /* For each run, the number of records remaining in its buffer. */
1017 int buffered[MAX_MERGE_ORDER];
1019 /* For each run, the index of the next record in the buffer. */
1020 int buffer_ptr[MAX_MERGE_ORDER];
1022 /* Open input files. */
1026 for (i = 0; i < n_runs; i++)
1027 if (run_index[i] != -2 && !open_handle_r (i, run_index[i]))
1029 /* Close and remove temporary files. */
1033 sprintf (tmp_extname, "%08x", i);
1034 if (remove (tmp_basename) != 0)
1035 msg (SE, _("%s: Error removing temporary file: %s."),
1036 tmp_basename, strerror (errno));
1043 /* Create output file. */
1044 if (!open_handle_w (N_INPUT_BUFFERS, run_no))
1046 msg (SE, _("%s: Error creating temporary file for merge: %s."),
1047 tmp_basename, strerror (errno));
1051 /* Prime each buffer. */
1055 for (i = 0; i < n_runs; i++)
1056 if (run_index[i] == -2)
1064 int ofs = records_per_buffer * i;
1066 buffered[i] = min (records_per_buffer, run_length[i]);
1067 for (j = 0; j < buffered[i]; j++)
1068 if ((int) fread (x[j + ofs]->record, sizeof (union value),
1069 default_dict.nval, handle[i])
1070 != default_dict.nval)
1072 sprintf (tmp_extname, "%08x", run_index[i]);
1073 if (ferror (handle[i]))
1074 msg (SE, _("%s: Error reading temporary file in merge: %s."),
1075 tmp_basename, strerror (errno));
1077 msg (SE, _("%s: Unexpected end of temporary file in merge."),
1081 buffer_ptr[i] = ofs;
1082 run_length[i] -= buffered[i];
1086 /* Perform the merge proper. */
1087 while (n_runs) /* Loop while some data is left. */
1092 for (i = 1; i < n_runs; i++)
1093 if (compare_record (x[buffer_ptr[min]]->record,
1094 x[buffer_ptr[i]]->record) > 0)
1097 if ((int) fwrite (x[buffer_ptr[min]]->record, sizeof (union value),
1098 default_dict.nval, handle[N_INPUT_BUFFERS])
1099 != default_dict.nval)
1101 sprintf (tmp_extname, "%08x", run_index[i]);
1102 msg (SE, _("%s: Error writing temporary file in "
1103 "merge: %s."), tmp_basename, strerror (errno));
1107 /* Remove one case from the buffer for this input file. */
1108 if (--buffered[min] == 0)
1110 /* The input buffer is empty. Do any cases remain in the
1111 initial run on disk? */
1112 if (run_length[min])
1114 /* Yes. Read them in. */
1119 /* Reset the buffer pointer. Note that we can't simply
1120 set it to (i * records_per_buffer) since the run
1121 order might have changed. */
1122 ofs = buffer_ptr[min] -= buffer_ptr[min] % records_per_buffer;
1124 buffered[min] = min (records_per_buffer, run_length[min]);
1125 for (j = 0; j < buffered[min]; j++)
1126 if ((int) fread (x[j + ofs]->record, sizeof (union value),
1127 default_dict.nval, handle[min])
1128 != default_dict.nval)
1130 sprintf (tmp_extname, "%08x", run_index[min]);
1131 if (ferror (handle[min]))
1132 msg (SE, _("%s: Error reading temporary file in "
1134 tmp_basename, strerror (errno));
1136 msg (SE, _("%s: Unexpected end of temporary file "
1141 run_length[min] -= buffered[min];
1145 /* No. Delete this run. */
1147 /* Close the file. */
1148 FILE *f = handle[min];
1150 sprintf (tmp_extname, "%08x", run_index[min]);
1151 if (fclose (f) == EOF)
1152 msg (SE, _("%s: Error closing temporary file in merge: "
1153 "%s."), tmp_basename, strerror (errno));
1155 /* Delete the file. */
1156 if (remove (tmp_basename) != 0)
1157 msg (SE, _("%s: Error removing temporary file in merge: "
1158 "%s."), tmp_basename, strerror (errno));
1163 /* Since this isn't the last run, we move the last
1164 run into its spot to force all the runs to be
1166 run_index[min] = run_index[n_runs];
1167 run_length[min] = run_length[n_runs];
1168 buffer_ptr[min] = buffer_ptr[n_runs];
1169 buffered[min] = buffered[n_runs];
1170 handle[min] = handle[n_runs];
1178 /* Close output file. */
1180 FILE *f = handle[N_INPUT_BUFFERS];
1181 handle[N_INPUT_BUFFERS] = NULL;
1182 if (fclose (f) == EOF)
1184 sprintf (tmp_extname, "%08x", run_no);
1185 msg (SE, _("%s: Error closing temporary file in merge: "
1187 tmp_basename, strerror (errno));
1195 /* Close all the input and output files. */
1199 for (i = 0; i < n_runs; i++)
1200 if (run_length[i] != 0)
1203 sprintf (tmp_basename, "%08x", run_index[i]);
1204 if (remove (tmp_basename) != 0)
1205 msg (SE, _("%s: Error removing temporary file: %s."),
1206 tmp_basename, strerror (errno));
1209 close_handle (N_INPUT_BUFFERS);
1210 sprintf (tmp_basename, "%08x", run_no);
1211 if (remove (tmp_basename) != 0)
1212 msg (SE, _("%s: Error removing temporary file: %s."),
1213 tmp_basename, strerror (errno));
1217 /* External sort input program. */
1219 /* Reads all the records from the source stream and passes them
1222 sort_stream_read (void)
1224 read_sort_output (write_case);
1227 /* Reads all the records from the output stream and passes them to the
1228 function provided, which must have an interface identical to
1231 read_sort_output (int (*write_case) (void))
1236 if (separate_case_tab)
1238 struct ccase *save_temp_case = temp_case;
1239 struct case_list **p;
1241 for (p = separate_case_tab; *p; p++)
1243 temp_case = &(*p)->c;
1247 free (separate_case_tab);
1248 separate_case_tab = NULL;
1250 temp_case = save_temp_case;
1252 sprintf (tmp_extname, "%08x", run_no - 1);
1253 f = fopen (tmp_basename, "rb");
1256 msg (ME, _("%s: Cannot open sort result file: %s."), tmp_basename,
1262 for (i = 0; i < vfm_source_info.ncases; i++)
1264 if (!fread (temp_case, vfm_source_info.case_size, 1, f))
1267 msg (ME, _("%s: Error reading sort result file: %s."),
1268 tmp_basename, strerror (errno));
1270 msg (ME, _("%s: Unexpected end of sort result file: %s."),
1271 tmp_basename, strerror (errno));
1280 if (fclose (f) == EOF)
1281 msg (ME, _("%s: Error closing sort result file: %s."), tmp_basename,
1284 if (remove (tmp_basename) != 0)
1285 msg (ME, _("%s: Error removing sort result file: %s."), tmp_basename,
1292 #if 0 /* dead code */
1293 /* Alternate interface to sort_stream_write used for external sorting
1294 when SEPARATE is true. */
1296 write_separate (struct ccase *c)
1298 assert (c == temp_case);
1300 sort_stream_write ();
1305 /* Performs one iteration of 5.4.1R steps R4, R5, R6, R7, R2, and
1308 sort_stream_write (void)
1310 struct repl_sel_tree *t;
1313 memcpy (q->record, temp_case->data, vfm_sink_info.case_size);
1314 if (compare_record (q->record, lastkey) < 0)
1326 || (t->rn == rq && compare_record (t->loser->record, q->record) < 0))
1328 struct repl_sel_tree *temp_tree;
1331 temp_tree = t->loser;
1351 assert (rq <= rmax);
1358 output_record (q->record);
1359 lastkey = x[x_max]->record;
1360 memcpy (lastkey, q->record, vfm_sink_info.case_size);
1364 /* Switches mode from sink to source. */
1366 sort_stream_mode (void)
1368 /* If this is not done, then we get the following source/sink pairs:
1369 source=memory/disk/DATA LIST/etc., sink=SORT; source=SORT,
1370 sink=SORT; which is not good. */
1374 struct case_stream sort_stream =