1 /* PSPP - computes sample statistics.
2 Copyright (C) 1997-9, 2000 Free Software Foundation, Inc.
3 Written by Ben Pfaff <blp@gnu.org>.
5 This program is free software; you can redistribute it and/or
6 modify it under the terms of the GNU General Public License as
7 published by the Free Software Foundation; either version 2 of the
8 License, or (at your option) any later version.
10 This program is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with this program; if not, write to the Free Software
17 Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA
44 #include <sys/types.h>
51 #include "debug-print.h"
53 /* Variables to sort. */
54 struct variable **v_sort;
57 /* Used when internal-sorting to a separate file. */
58 static struct case_list **separate_case_tab;
60 /* Other prototypes. */
61 static int compare_case_lists (const void *, const void *);
62 static int do_internal_sort (int separate);
63 static int do_external_sort (int separate);
64 int parse_sort_variables (void);
65 void read_sort_output (int (*write_case) (void));
67 /* Performs the SORT CASES procedure. */
71 /* First, just parse the command. */
72 lex_match_id ("SORT");
73 lex_match_id ("CASES");
76 if (!parse_sort_variables ())
81 /* Then it's time to do the actual work. There are two cases:
83 (internal sort) All the data is in memory. In this case, we
84 perform an EXECUTE to get the data into the desired form, then
85 sort the cases in place, if it is still all in memory.
87 (external sort) The data is not in memory. It may be coming from
88 a system file or other data file, etc. In any case, it is now
89 time to perform an external sort. This is better explained in
90 do_external_sort(). */
92 /* Do all this dirty work. */
94 int success = sort_cases (0);
97 return lex_end_of_command ();
103 /* Parses a list of sort variables into v_sort and nv_sort. */
105 parse_sort_variables (void)
111 int prev_nv_sort = nv_sort;
112 int order = SRT_ASCEND;
114 if (!parse_variables (default_dict, &v_sort, &nv_sort,
115 PV_NO_DUPLICATE | PV_APPEND | PV_NO_SCRATCH))
119 if (lex_match_id ("D") || lex_match_id ("DOWN"))
121 else if (!lex_match_id ("A") && !lex_match_id ("UP"))
124 msg (SE, _("`A' or `D' expected inside parentheses."));
127 if (!lex_match (')'))
130 msg (SE, _("`)' expected."));
134 for (; prev_nv_sort < nv_sort; prev_nv_sort++)
135 v_sort[prev_nv_sort]->p.srt.order = order;
137 while (token != '.' && token != '/');
142 /* Sorts the active file based on the key variables specified in
143 global variables v_sort and nv_sort. The output is either to the
144 active file, if SEPARATE is zero, or to a separate file, if
145 SEPARATE is nonzero. In the latter case the output cases can be
146 read with a call to read_sort_output(). (In the former case the
147 output cases should be dealt with through the usual vfm interface.)
149 The caller is responsible for freeing v_sort[]. */
151 sort_cases (int separate)
153 assert (separate_case_tab == NULL);
155 /* Not sure this is necessary but it's good to be safe. */
156 if (separate && vfm_source == &sort_stream)
157 procedure (NULL, NULL, NULL);
159 /* SORT CASES cancels PROCESS IF. */
160 expr_free (process_if_expr);
161 process_if_expr = NULL;
163 if (do_internal_sort (separate))
167 return do_external_sort (separate);
170 /* If the active file is in memory, or ends up there once procedure()
171 has run, sorts it in memory with qsort(). Return success. */
173 do_internal_sort (int separate)
175 if (vfm_source != &vfm_disk_stream)
177 if (vfm_source != &vfm_memory_stream)
178 procedure (NULL, NULL, NULL);
179 if (vfm_source == &vfm_memory_stream)
181 struct case_list **case_tab = malloc (sizeof *case_tab
182 * (vfm_source_info.ncases + 1));
183 if (vfm_source_info.ncases == 0)
188 if (case_tab != NULL)
190 struct case_list *clp = memory_source_cases;
191 struct case_list **ctp = case_tab;
194 for (; clp; clp = clp->next)
196 qsort (case_tab, vfm_source_info.ncases, sizeof *case_tab,
201 memory_source_cases = case_tab[0];
202 for (i = 1; i < vfm_source_info.ncases; i++)
203 case_tab[i - 1]->next = case_tab[i];
204 case_tab[vfm_source_info.ncases - 1]->next = NULL;
207 case_tab[vfm_source_info.ncases] = NULL;
208 separate_case_tab = case_tab;
218 /* Compares the NV_SORT variables in V_SORT[] between the
219 `case_list's at A and B, and returns a strcmp()-type
222 compare_case_lists (const void *a_, const void *b_)
224 struct case_list *const *pa = a_;
225 struct case_list *const *pb = b_;
226 struct case_list *a = *pa;
227 struct case_list *b = *pb;
232 for (i = 0; i < nv_sort; i++)
236 if (v->type == NUMERIC)
238 double af = a->c.data[v->fv].f;
239 double bf = b->c.data[v->fv].f;
241 result = af < bf ? -1 : af > bf;
244 result = memcmp (a->c.data[v->fv].s, b->c.data[v->fv].s, v->width);
250 if (v->p.srt.order == SRT_DESCEND)
257 /* Maximum number of input + output file handles. */
258 #if defined FOPEN_MAX && (FOPEN_MAX - 5 < 18)
259 #define MAX_FILE_HANDLES (FOPEN_MAX - 5)
261 #define MAX_FILE_HANDLES 18
264 #if MAX_FILE_HANDLES < 3
265 #error At least 3 file handles must be available for sorting.
268 /* Number of input buffers. */
269 #define N_INPUT_BUFFERS (MAX_FILE_HANDLES - 1)
271 /* Maximum order of merge. This is the value suggested by Knuth;
272 specifically, he said to use tree selection, which we don't
273 implement, for larger orders of merge. */
274 #define MAX_MERGE_ORDER 7
276 /* Minimum total number of records for buffers. */
277 #define MIN_BUFFER_TOTAL_SIZE_RECS 64
279 /* Minimum single input or output buffer size, in bytes and records. */
280 #define MIN_BUFFER_SIZE_BYTES 4096
281 #define MIN_BUFFER_SIZE_RECS 16
283 /* Structure for replacement selection tree. */
286 struct repl_sel_tree *loser;/* Loser associated w/this internal node. */
287 int rn; /* Run number of `loser'. */
288 struct repl_sel_tree *fe; /* Internal node above this external node. */
289 struct repl_sel_tree *fi; /* Internal node above this internal node. */
290 union value record[1]; /* The case proper. */
293 /* Static variables used for sorting. */
294 static struct repl_sel_tree **x; /* Buffers. */
295 static int x_max; /* Size of buffers, in records. */
296 static int records_per_buffer; /* Number of records in each buffer. */
298 /* In the merge phase, the first N_INPUT_BUFFERS handle[] elements are
299 input files and the last element is the output file. Before that,
300 they're all used as output files, although the last one is
302 static FILE *handle[MAX_FILE_HANDLES]; /* File handles. */
304 /* Now, MAX_FILE_HANDLES is the maximum number of files we will *try*
305 to open. But if we can't open that many, max_handles will be set
306 to the number we apparently can open. */
307 static int max_handles; /* Maximum number of handles. */
309 /* When we create temporary files, they are all put in the same
310 directory and numbered sequentially from zero. tmp_basename holds the
311 full path (drive/directory, etc.) and tmp_extname points at the part of it
312 that names the file: sprintf() the file number there with "%08x", then open tmp_basename. */
313 static char *tmp_basename; /* Temporary file basename. */
314 static char *tmp_extname; /* Temporary file extension name. */
316 /* We use Huffman's method to determine the merge pattern. This means
317 that we need to know which runs are the shortest at any given time.
318 Priority queues as implemented by heap.c are a natural for this
319 task (probably because I wrote the code specifically for it). */
320 static struct heap *huffman_queue; /* Huffman priority queue. */
322 /* Prototypes for helper functions. */
323 static void sort_stream_write (void);
324 static int write_initial_runs (int separate);
325 static int allocate_cases (void);
326 static int allocate_file_handles (void);
327 static int merge (void);
328 static void rmdir_temp_dir (void);
330 /* Performs an external sort of the active file. A description of the
331 procedure follows. All page references refer to Knuth's _Art of
332 Computer Programming, Vol. 3: Sorting and Searching_, which is the
333 canonical resource for sorting.
335 1. The data is read and S initial runs are formed through the
336 action of algorithm 5.4.1R (replacement selection).
338 2. Huffman's method (p. 365-366) is used to determine the optimum
341 3. If an OS that supports overlapped reading, writing, and
342 computing is being run, we should use 5.4.6F for forecasting.
343 Otherwise, buffers are filled only when they run out of data.
344 FIXME: Since the author of PSPP uses GNU/Linux, which does not
345 yet implement overlapped r/w/c, 5.4.6F is not used.
347 4. We perform P-way merges:
349 (a) The desired P is the smallest P such that ceil(ln(S)/ln(P))
350 is minimized. (FIXME: Since I don't have an algorithm for
351 minimizing this, it's just set to MAX_MERGE_ORDER.)
353 (b) P is reduced if the selected value would make input buffers
354 less than 4096 bytes each, or 16 records, whichever is larger.
356 (c) P is reduced if we run out of available file handles or space
359 (d) P is reduced if we don't have space for one or two output
360 buffers, which have the same minimum size as input buffers. (We
361 need two output buffers if 5.4.6F is in use for forecasting.) */
363 do_external_sort (int separate)
367 assert (MAX_FILE_HANDLES >= 3);
372 huffman_queue = heap_create (512);
373 if (huffman_queue == NULL)
376 if (!allocate_cases ())
379 if (!allocate_file_handles ())
382 if (!write_initial_runs (separate))
389 /* Despite the name, flow of control comes here regardless of
390 whether or not the sort is successful. */
392 heap_destroy (huffman_queue);
398 for (i = 0; i <= x_max; i++)
413 /* Sets up to open temporary files. */
414 /* PORTME: This creates a directory for temporary files. Some OSes
415 might not have that concept... */
417 allocate_file_handles (void)
419 const char *dir; /* Directory prefix. */
420 char *buf; /* String buffer. */
421 char *cp; /* Pointer into buf. */
423 dir = getenv ("SPSSTMPDIR");
425 dir = getenv ("SPSSXTMPDIR");
427 dir = getenv ("TMPDIR");
437 dir = getenv ("TEMP");
439 dir = getenv ("TMP");
446 buf = xmalloc (strlen (dir) + 1 + 4 + 8 + 4 + 1 + INT_DIGITS + 1);
447 cp = spprintf (buf, "%s%c%04lX%04lXpspp", dir, DIR_SEPARATOR,
448 ((long) time (0)) & 0xffff, ((long) getpid ()) & 0xffff);
449 if (-1 == mkdir (buf, S_IRWXU))
452 msg (SE, _("%s: Cannot create temporary directory: %s."),
453 buf, strerror (errno));
456 *cp++ = DIR_SEPARATOR;
461 max_handles = MAX_FILE_HANDLES;
466 /* Removes the directory created for temporary files, if one exists.
467 Also frees tmp_basename. */
469 rmdir_temp_dir (void)
471 if (NULL == tmp_basename)
474 tmp_extname[-1] = '\0';
475 if (rmdir (tmp_basename) == -1)
476 msg (SE, _("%s: Error removing directory for temporary files: %s."),
477 tmp_basename, strerror (errno));
482 /* Allocates room for lots of cases as a buffer. */
484 allocate_cases (void)
486 /* This is the size of one case. */
487 const int case_size = (sizeof (struct repl_sel_tree)
488 + (sizeof (union value)
489 * (dict_get_value_cnt (default_dict) - 1))
490 + sizeof (struct repl_sel_tree *));
494 /* Allocate as many cases as we can, assuming a space of four
495 void pointers for malloc()'s internal bookkeeping. */
496 x_max = MAX_WORKSPACE / (case_size + 4 * sizeof (void *));
497 x = malloc (sizeof (struct repl_sel_tree *) * x_max);
502 for (i = 0; i < x_max; i++)
504 x[i] = malloc (sizeof (struct repl_sel_tree)
505 + (sizeof (union value)
506 * (dict_get_value_cnt (default_dict) - 1)));
512 if (x == NULL || x_max < MIN_BUFFER_TOTAL_SIZE_RECS)
518 for (i = 0; i < x_max; i++)
522 msg (SE, _("Out of memory. Could not allocate room for minimum of %d "
523 "cases of %d bytes each. (PSPP workspace is currently "
524 "restricted to a maximum of %d KB.)"),
525 MIN_BUFFER_TOTAL_SIZE_RECS, case_size, MAX_WORKSPACE / 1024);
531 /* The last element of the array is used to store lastkey. */
534 debug_printf ((_("allocated %d cases == %d bytes\n"),
535 x_max, x_max * case_size));
539 /* Replacement selection. */
541 static int rmax, rc, rq;
542 static struct repl_sel_tree *q;
543 static union value *lastkey;
544 static int run_no, file_index;
545 static int deferred_abort;
546 static int run_length;
548 static int compare_record (union value *, union value *);
551 output_record (union value *v)
553 union value *src_case;
558 if (compaction_necessary)
560 compact_case (compaction_case, (struct ccase *) v);
561 src_case = (union value *) compaction_case;
564 src_case = (union value *) v;
566 if ((int) fwrite (src_case, sizeof *src_case, compaction_nval,
571 sprintf (tmp_extname, "%08x", run_no);
572 msg (SE, _("%s: Error writing temporary file: %s."),
573 tmp_basename, strerror (errno));
583 int result = fclose (handle[i]);
584 msg (VM (2), _("SORT: Closing handle %d."), i);
589 sprintf (tmp_extname, "%08x", i);
590 msg (SE, _("%s: Error closing temporary file: %s."),
591 tmp_basename, strerror (errno));
598 close_handles (int beg, int end)
603 for (i = beg; i < end; i++)
604 success &= close_handle (i);
609 open_handle_w (int handle_no, int run_no)
611 sprintf (tmp_extname, "%08x", run_no);
612 msg (VM (1), _("SORT: %s: Opening for writing as run %d."),
613 tmp_basename, run_no);
615 /* The `x' modifier causes the GNU C library to insist on creating a
616 new file--if the file already exists, an error is signaled. The
617 ANSI C standard says that other libraries should ignore anything
618 after the `w+b', so it shouldn't be a problem. */
619 return NULL != (handle[handle_no] = fopen (tmp_basename, "w+bx"));
623 open_handle_r (int handle_no, int run_no)
627 sprintf (tmp_extname, "%08x", run_no);
628 msg (VM (1), _("SORT: %s: Opening for writing as run %d."),
629 tmp_basename, run_no);
630 f = handle[handle_no] = fopen (tmp_basename, "rb");
634 msg (SE, _("%s: Error opening temporary file for reading: %s."),
635 tmp_basename, strerror (errno));
642 /* Begins a new initial run, specifically its output file. */
646 /* Decide which handle[] to use. If run_no is max_handles or
647 greater, then we've run out of handles so it's time to just do
648 one file at a time, which by default is handle 0. */
649 file_index = (run_no < max_handles ? run_no : 0);
652 /* Alright, now create the temporary file. */
653 if (open_handle_w (file_index, run_no) == 0)
655 /* Failure to create the temporary file. Check if there are
656 unacceptably few files already open. */
660 msg (SE, _("%s: Error creating temporary file: %s."),
661 tmp_basename, strerror (errno));
665 /* Close all the open temporary files. */
666 if (!close_handles (0, file_index))
669 /* Now try again to create the temporary file. */
670 max_handles = file_index;
672 if (open_handle_w (0, run_no) == 0)
674 /* It still failed, report it this time. */
676 msg (SE, _("%s: Error creating temporary file: %s."),
677 tmp_basename, strerror (errno));
683 /* Ends the current initial run. Just increments run_no if no initial
684 run has been started yet. */
688 /* Close file handles if necessary. */
692 if (run_no == max_handles - 1)
693 result = close_handles (0, max_handles);
694 else if (run_no >= max_handles)
695 result = close_handle (0);
702 /* Advance to next run. */
705 heap_insert (huffman_queue, run_no - 1, run_length);
708 /* Performs 5.4.1R. */
710 write_initial_runs (int separate)
715 /* Steps R1, R2, R3. */
724 for (j = 0; j < x_max; j++)
726 struct repl_sel_tree *J = x[j];
730 J->fe = x[(x_max + j) / 2];
732 memset (J->record, 0,
733 dict_get_value_cnt (default_dict) * sizeof (union value));
737 /* Most of the iterations of steps R4, R5, R6, R7, R2, R3, ... */
741 vfm_sink->destroy_sink ();
742 vfm_sink = &sort_stream;
744 procedure (NULL, NULL, NULL);
746 /* Final iterations of steps R4, R5, R6, R7, R2, R3, ... */
749 struct repl_sel_tree *t;
763 && compare_record (t->loser->record, q->record) < 0))
765 struct repl_sel_tree *temp_tree;
768 temp_tree = t->loser;
796 output_record (q->record);
797 lastkey = x[x_max]->record;
798 memcpy (lastkey, q->record, sizeof (union value) * vfm_sink_info.nval);
801 assert (run_no == rmax);
803 /* If an unrecoverable error occurred somewhere in the above code,
804 then the `deferred_abort' flag would have been set. */
809 for (i = 0; i < max_handles; i++)
810 if (handle[i] != NULL)
812 sprintf (tmp_extname, "%08x", i);
814 if (fclose (handle[i]) == EOF)
815 msg (SE, _("%s: Error closing temporary file: %s."),
816 tmp_basename, strerror (errno));
818 if (remove (tmp_basename) != 0)
819 msg (SE, _("%s: Error removing temporary file: %s."),
820 tmp_basename, strerror (errno));
830 /* Compares the NV_SORT variables in V_SORT[] between the `value's at
831 A and B, and returns a strcmp()-type result. */
833 compare_record (union value * a, union value * b)
840 if (b == NULL) /* Sort NULLs after everything else. */
843 for (i = 0; i < nv_sort; i++)
847 if (v->type == NUMERIC)
849 if (approx_ne (a[v->fv].f, b[v->fv].f))
851 result = (a[v->fv].f > b[v->fv].f) ? 1 : -1;
857 result = memcmp (a[v->fv].s, b[v->fv].s, v->width);
863 if (v->p.srt.order == SRT_ASCEND)
867 assert (v->p.srt.order == SRT_DESCEND);
874 static int merge_once (int run_index[], int run_length[], int n_runs);
876 /* Modulo function as defined by Knuth. */
884 result = abs (x) % abs (y);
890 /* Performs a series of P-way merges of initial runs using Huffman's
895 /* Order of merge. */
899 assert (MIN_BUFFER_SIZE_RECS * 2 <= MIN_BUFFER_TOTAL_SIZE_RECS - 1);
901 /* Close all the input files. I hope that the boundary conditions
902 are correct on this but I'm not sure. */
903 if (run_no < max_handles)
907 for (i = 0; i < run_no; )
908 if (!close_handle (i++))
910 for (; i < run_no; i++)
916 /* Determine order of merge. */
917 order = MAX_MERGE_ORDER;
918 if (x_max / order < MIN_BUFFER_SIZE_RECS)
919 order = x_max / MIN_BUFFER_SIZE_RECS;
920 else if (x_max / order * sizeof (union value) * dict_get_value_cnt (default_dict)
921 < MIN_BUFFER_SIZE_BYTES)
922 order = x_max / (MIN_BUFFER_SIZE_BYTES
923 / (sizeof (union value)
924 * (dict_get_value_cnt (default_dict) - 1)));
926 /* Make sure the order of merge is bounded. */
931 assert (x_max / order > 0);
933 /* Calculate number of records per buffer. */
934 records_per_buffer = x_max / order;
936 /* Add (1 - S) mod (P - 1) dummy runs of length 0. */
938 int n_dummy_runs = mod (1 - rmax, order - 1);
939 debug_printf (("rmax=%d, order=%d, n_dummy_runs=%d\n",
940 rmax, order, n_dummy_runs));
941 assert (n_dummy_runs >= 0);
942 while (n_dummy_runs--)
944 heap_insert (huffman_queue, -2, 0);
949 /* Repeatedly merge the P shortest existing runs until only one run
953 int run_index[MAX_MERGE_ORDER];
954 int run_length[MAX_MERGE_ORDER];
955 int total_run_length = 0;
958 assert (rmax >= order);
960 /* Find the shortest runs; put them in runs[] in reverse order
961 of length, to force dummy runs of length 0 to the end of the
963 debug_printf ((_("merging runs")));
964 for (i = order - 1; i >= 0; i--)
966 run_index[i] = heap_delete (huffman_queue, &run_length[i]);
967 assert (run_index[i] != -1);
968 total_run_length += run_length[i];
969 debug_printf ((" %d(%d)", run_index[i], run_length[i]));
971 debug_printf ((_(" into run %d(%d)\n"), run_no, total_run_length));
973 if (!merge_once (run_index, run_length, order))
977 while (-1 != (index = heap_delete (huffman_queue, NULL)))
979 sprintf (tmp_extname, "%08x", index);
980 if (remove (tmp_basename) != 0)
981 msg (SE, _("%s: Error removing temporary file: %s."),
982 tmp_basename, strerror (errno));
988 if (!heap_insert (huffman_queue, run_no++, total_run_length))
990 msg (SE, _("Out of memory expanding Huffman priority queue."));
997 /* There should be exactly one element in the priority queue after
998 all that merging. This represents the entire sorted active file.
999 So we could find a total case count by deleting this element from
1001 assert (heap_size (huffman_queue) == 1);
1006 /* Merges N_RUNS initial runs into a new run. The jth run for 0 <= j
1007 < N_RUNS is taken from temporary file RUN_INDEX[j]; it is composed
1008 of RUN_LENGTH[j] cases. */
1010 merge_once (int run_index[], int run_length[], int n_runs)
1012 /* For each run, the number of records remaining in its buffer. */
1013 int buffered[MAX_MERGE_ORDER];
1015 /* For each run, the index of the next record in the buffer. */
1016 int buffer_ptr[MAX_MERGE_ORDER];
1018 /* Open input files. */
1022 for (i = 0; i < n_runs; i++)
1023 if (run_index[i] != -2 && !open_handle_r (i, run_index[i]))
1025 /* Close and remove temporary files. */
1029 sprintf (tmp_extname, "%08x", i);
1030 if (remove (tmp_basename) != 0)
1031 msg (SE, _("%s: Error removing temporary file: %s."),
1032 tmp_basename, strerror (errno));
1039 /* Create output file. */
1040 if (!open_handle_w (N_INPUT_BUFFERS, run_no))
1042 msg (SE, _("%s: Error creating temporary file for merge: %s."),
1043 tmp_basename, strerror (errno));
1047 /* Prime each buffer. */
1051 for (i = 0; i < n_runs; i++)
1052 if (run_index[i] == -2)
1060 int ofs = records_per_buffer * i;
1062 buffered[i] = min (records_per_buffer, run_length[i]);
1063 for (j = 0; j < buffered[i]; j++)
1064 if ((int) fread (x[j + ofs]->record, sizeof (union value),
1065 dict_get_value_cnt (default_dict), handle[i])
1066 != dict_get_value_cnt (default_dict))
1068 sprintf (tmp_extname, "%08x", run_index[i]);
1069 if (ferror (handle[i]))
1070 msg (SE, _("%s: Error reading temporary file in merge: %s."),
1071 tmp_basename, strerror (errno));
1073 msg (SE, _("%s: Unexpected end of temporary file in merge."),
1077 buffer_ptr[i] = ofs;
1078 run_length[i] -= buffered[i];
1082 /* Perform the merge proper. */
1083 while (n_runs) /* Loop while some data is left. */
1088 for (i = 1; i < n_runs; i++)
1089 if (compare_record (x[buffer_ptr[min]]->record,
1090 x[buffer_ptr[i]]->record) > 0)
1093 if ((int) fwrite (x[buffer_ptr[min]]->record, sizeof (union value),
1094 dict_get_value_cnt (default_dict),
1095 handle[N_INPUT_BUFFERS])
1096 != dict_get_value_cnt (default_dict))
1098 sprintf (tmp_extname, "%08x", run_index[i]);
1099 msg (SE, _("%s: Error writing temporary file in "
1100 "merge: %s."), tmp_basename, strerror (errno));
1104 /* Remove one case from the buffer for this input file. */
1105 if (--buffered[min] == 0)
1107 /* The input buffer is empty. Do any cases remain in the
1108 initial run on disk? */
1109 if (run_length[min])
1111 /* Yes. Read them in. */
1116 /* Reset the buffer pointer. Note that we can't simply
1117 set it to (i * records_per_buffer) since the run
1118 order might have changed. */
1119 ofs = buffer_ptr[min] -= buffer_ptr[min] % records_per_buffer;
1121 buffered[min] = min (records_per_buffer, run_length[min]);
1122 for (j = 0; j < buffered[min]; j++)
1123 if ((int) fread (x[j + ofs]->record, sizeof (union value),
1124 dict_get_value_cnt (default_dict),
1126 != dict_get_value_cnt (default_dict))
1128 sprintf (tmp_extname, "%08x", run_index[min]);
1129 if (ferror (handle[min]))
1130 msg (SE, _("%s: Error reading temporary file in "
1132 tmp_basename, strerror (errno));
1134 msg (SE, _("%s: Unexpected end of temporary file "
1139 run_length[min] -= buffered[min];
1143 /* No. Delete this run. */
1145 /* Close the file. */
1146 FILE *f = handle[min];
1148 sprintf (tmp_extname, "%08x", run_index[min]);
1149 if (fclose (f) == EOF)
1150 msg (SE, _("%s: Error closing temporary file in merge: "
1151 "%s."), tmp_basename, strerror (errno));
1153 /* Delete the file. */
1154 if (remove (tmp_basename) != 0)
1155 msg (SE, _("%s: Error removing temporary file in merge: "
1156 "%s."), tmp_basename, strerror (errno));
1161 /* Since this isn't the last run, we move the last
1162 run into its spot to force all the runs to be
1164 run_index[min] = run_index[n_runs];
1165 run_length[min] = run_length[n_runs];
1166 buffer_ptr[min] = buffer_ptr[n_runs];
1167 buffered[min] = buffered[n_runs];
1168 handle[min] = handle[n_runs];
1176 /* Close output file. */
1178 FILE *f = handle[N_INPUT_BUFFERS];
1179 handle[N_INPUT_BUFFERS] = NULL;
1180 if (fclose (f) == EOF)
1182 sprintf (tmp_extname, "%08x", run_no);
1183 msg (SE, _("%s: Error closing temporary file in merge: "
1185 tmp_basename, strerror (errno));
1193 /* Close all the input and output files. */
1197 for (i = 0; i < n_runs; i++)
1198 if (run_length[i] != 0)
1201 sprintf (tmp_basename, "%08x", run_index[i]);
1202 if (remove (tmp_basename) != 0)
1203 msg (SE, _("%s: Error removing temporary file: %s."),
1204 tmp_basename, strerror (errno));
1207 close_handle (N_INPUT_BUFFERS);
1208 sprintf (tmp_basename, "%08x", run_no);
1209 if (remove (tmp_basename) != 0)
1210 msg (SE, _("%s: Error removing temporary file: %s."),
1211 tmp_basename, strerror (errno));
1215 /* External sort input program. */
1217 /* Reads all the records from the source stream and passes them
1220 sort_stream_read (void)
1222 read_sort_output (write_case);
1225 /* Reads all the records from the output stream and passes them to the
1226 function provided, which must have an interface identical to
1229 read_sort_output (int (*write_case) (void))
1234 if (separate_case_tab)
1236 struct ccase *save_temp_case = temp_case;
1237 struct case_list **p;
1239 for (p = separate_case_tab; *p; p++)
1241 temp_case = &(*p)->c;
1245 free (separate_case_tab);
1246 separate_case_tab = NULL;
1248 temp_case = save_temp_case;
1250 sprintf (tmp_extname, "%08x", run_no - 1);
1251 f = fopen (tmp_basename, "rb");
1254 msg (ME, _("%s: Cannot open sort result file: %s."), tmp_basename,
1260 for (i = 0; i < vfm_source_info.ncases; i++)
1262 if (!fread (temp_case, vfm_source_info.case_size, 1, f))
1265 msg (ME, _("%s: Error reading sort result file: %s."),
1266 tmp_basename, strerror (errno));
1268 msg (ME, _("%s: Unexpected end of sort result file: %s."),
1269 tmp_basename, strerror (errno));
1278 if (fclose (f) == EOF)
1279 msg (ME, _("%s: Error closing sort result file: %s."), tmp_basename,
1282 if (remove (tmp_basename) != 0)
1283 msg (ME, _("%s: Error removing sort result file: %s."), tmp_basename,
1290 #if 0 /* dead code */
1291 /* Alternate interface to sort_stream_write used for external sorting
1292 when SEPARATE is true. */
1294 write_separate (struct ccase *c)
1296 assert (c == temp_case);
1298 sort_stream_write ();
1303 /* Performs one iteration of 5.4.1R steps R4, R5, R6, R7, R2, and
1306 sort_stream_write (void)
1308 struct repl_sel_tree *t;
1311 memcpy (q->record, temp_case->data, vfm_sink_info.case_size);
1312 if (compare_record (q->record, lastkey) < 0)
1324 || (t->rn == rq && compare_record (t->loser->record, q->record) < 0))
1326 struct repl_sel_tree *temp_tree;
1329 temp_tree = t->loser;
1349 assert (rq <= rmax);
1356 output_record (q->record);
1357 lastkey = x[x_max]->record;
1358 memcpy (lastkey, q->record, vfm_sink_info.case_size);
1362 /* Switches mode from sink to source. */
1364 sort_stream_mode (void)
1366 /* If this is not done, then we get the following source/sink pairs:
1367 source=memory/disk/DATA LIST/etc., sink=SORT; source=SORT,
1368 sink=SORT; which is not good. */
1372 struct case_stream sort_stream =