1 /* PSPP - computes sample statistics.
2 Copyright (C) 1997-9, 2000 Free Software Foundation, Inc.
3 Written by Ben Pfaff <blp@gnu.org>.
5 This program is free software; you can redistribute it and/or
6 modify it under the terms of the GNU General Public License as
7 published by the Free Software Foundation; either version 2 of the
8 License, or (at your option) any later version.
10 This program is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with this program; if not, write to the Free Software
17 Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA
44 #include <sys/types.h>
52 /*#define DEBUGGING 1*/
53 #include "debug-print.h"
55 /* Variables to sort. */
56 struct variable **v_sort;
59 /* Used when internal-sorting to a separate file. */
60 static struct case_list **separate_case_tab;
62 /* Exported by qsort.c. */
63 void blp_quicksort (void *pbase, size_t total_elems, size_t size,
64 int (*cmp) (const void *, const void *),
67 /* Other prototypes. */
68 static int compare_case_lists (const void *, const void *);
69 static int do_internal_sort (int separate);
70 static int do_external_sort (int separate);
71 int parse_sort_variables (void);
72 void read_sort_output (int (*write_case) (void));
74 /* Performs the SORT CASES procedures. */
78 /* First, just parse the command. */
79 lex_match_id ("SORT");
80 lex_match_id ("CASES");
83 if (!parse_sort_variables ())
88 /* Then it's time to do the actual work. There are two cases:
90 (internal sort) All the data is in memory. In this case, we
91 perform an EXECUTE to get the data into the desired form, then
92 sort the cases in place, if it is still all in memory.
94 (external sort) The data is not in memory. It may be coming from
95 a system file or other data file, etc. In any case, it is now
96 time to perform an external sort. This is better explained in
97 do_external_sort(). */
99 /* Do all this dirty work. */
101 int success = sort_cases (0);
104 return lex_end_of_command ();
110 /* Parses a list of sort variables into v_sort and nv_sort. */
112 parse_sort_variables (void)
118 int prev_nv_sort = nv_sort;
119 int order = SRT_ASCEND;
121 if (!parse_variables (&default_dict, &v_sort, &nv_sort,
122 PV_NO_DUPLICATE | PV_APPEND | PV_NO_SCRATCH))
126 if (lex_match_id ("D") || lex_match_id ("DOWN"))
128 else if (!lex_match_id ("A") && !lex_match_id ("UP"))
131 msg (SE, _("`A' or `D' expected inside parentheses."));
134 if (!lex_match (')'))
137 msg (SE, _("`)' expected."));
141 for (; prev_nv_sort < nv_sort; prev_nv_sort++)
142 v_sort[prev_nv_sort]->p.srt.order = order;
144 while (token != '.' && token != '/');
149 /* Sorts the active file based on the key variables specified in
150 global variables v_sort and nv_sort. The output is either to the
151 active file, if SEPARATE is zero, or to a separate file, if
152 SEPARATE is nonzero. In the latter case the output cases can be
153 read with a call to read_sort_output(). (In the former case the
154 output cases should be dealt with through the usual vfm interface.)
156 The caller is responsible for freeing v_sort[]. */
158 sort_cases (int separate)
160 assert (separate_case_tab == NULL);
162 /* Not sure this is necessary but it's good to be safe. */
163 if (separate && vfm_source == &sort_stream)
164 procedure (NULL, NULL, NULL);
166 /* SORT CASES cancels PROCESS IF. */
167 expr_free (process_if_expr);
168 process_if_expr = NULL;
170 if (do_internal_sort (separate))
174 return do_external_sort (separate);
177 /* If a reasonable situation is set up, do an internal sort of the
178 data. Return success. */
180 do_internal_sort (int separate)
182 if (vfm_source != &vfm_disk_stream)
184 if (vfm_source != &vfm_memory_stream)
185 procedure (NULL, NULL, NULL);
186 if (vfm_source == &vfm_memory_stream)
188 struct case_list **case_tab = malloc (sizeof *case_tab
189 * (vfm_source_info.ncases + 1));
190 if (vfm_source_info.ncases == 0)
195 if (case_tab != NULL)
197 struct case_list *clp = memory_source_cases;
198 struct case_list **ctp = case_tab;
201 for (; clp; clp = clp->next)
203 qsort (case_tab, vfm_source_info.ncases, sizeof *case_tab,
208 memory_source_cases = case_tab[0];
209 for (i = 1; i < vfm_source_info.ncases; i++)
210 case_tab[i - 1]->next = case_tab[i];
211 case_tab[vfm_source_info.ncases - 1]->next = NULL;
214 case_tab[vfm_source_info.ncases] = NULL;
215 separate_case_tab = case_tab;
225 /* Compares the NV_SORT variables in V_SORT[] between the `case_list's
226 at _A and _B, and returns a strcmp()-type result. */
228 compare_case_lists (const void *pa, const void *pb)
230 struct case_list *a = *(struct case_list **) pa;
231 struct case_list *b = *(struct case_list **) pb;
236 for (i = 0; i < nv_sort; i++)
240 if (v->type == NUMERIC)
242 if (approx_ne (a->c.data[v->fv].f, b->c.data[v->fv].f))
244 result = (a->c.data[v->fv].f > b->c.data[v->fv].f) ? 1 : -1;
250 result = memcmp (a->c.data[v->fv].s, b->c.data[v->fv].s, v->width);
256 if (v->p.srt.order == SRT_ASCEND)
260 assert (v->p.srt.order == SRT_DESCEND);
267 /* Maximum number of input + output file handles. */
268 #if defined FOPEN_MAX && (FOPEN_MAX - 5 < 18)
269 #define MAX_FILE_HANDLES (FOPEN_MAX - 5)
271 #define MAX_FILE_HANDLES 18
274 #if MAX_FILE_HANDLES < 3
275 #error At least 3 file handles must be available for sorting.
278 /* Number of input buffers. */
279 #define N_INPUT_BUFFERS (MAX_FILE_HANDLES - 1)
281 /* Maximum order of merge. This is the value suggested by Knuth;
282 specifically, he said to use tree selection, which we don't
283 implement, for larger orders of merge. */
284 #define MAX_MERGE_ORDER 7
286 /* Minimum total number of records for buffers. */
287 #define MIN_BUFFER_TOTAL_SIZE_RECS 64
289 /* Minimum single input or output buffer size, in bytes and records. */
290 #define MIN_BUFFER_SIZE_BYTES 4096
291 #define MIN_BUFFER_SIZE_RECS 16
293 /* Structure for replacement selection tree. */
296 struct repl_sel_tree *loser;/* Loser associated w/this internal node. */
297 int rn; /* Run number of `loser'. */
298 struct repl_sel_tree *fe; /* Internal node above this external node. */
299 struct repl_sel_tree *fi; /* Internal node above this internal node. */
300 union value record[1]; /* The case proper. */
303 /* Static variables used for sorting. */
304 static struct repl_sel_tree **x; /* Buffers. */
305 static int x_max; /* Size of buffers, in records. */
306 static int records_per_buffer; /* Number of records in each buffer. */
308 /* In the merge phase, the first N_INPUT_BUFFERS handle[] elements are
309 input files and the last element is the output file. Before that,
310 they're all used as output files, although the last one is
312 static FILE *handle[MAX_FILE_HANDLES]; /* File handles. */
314 /* Now, MAX_FILE_HANDLES is the maximum number of files we will *try*
315 to open. But if we can't open that many, max_handles will be set
316 to the number we apparently can open. */
317 static int max_handles; /* Maximum number of handles. */
319 /* When we create temporary files, they are all put in the same
320 directory and numbered sequentially from zero. tmp_basename is the
321 drive/directory, etc., and tmp_extname can be sprintf() with "%08x"
322 to the file number, then tmp_basename used to open the file. */
323 static char *tmp_basename; /* Temporary file basename. */
324 static char *tmp_extname; /* Temporary file extension name. */
326 /* We use Huffman's method to determine the merge pattern. This means
327 that we need to know which runs are the shortest at any given time.
328 Priority queues as implemented by heap.c are a natural for this
329 task (probably because I wrote the code specifically for it). */
330 static struct heap *huffman_queue; /* Huffman priority queue. */
332 /* Prototypes for helper functions. */
333 static void sort_stream_write (void);
334 static int write_initial_runs (int separate);
335 static int allocate_cases (void);
336 static int allocate_file_handles (void);
337 static int merge (void);
338 static void rmdir_temp_dir (void);
340 /* Performs an external sort of the active file. A description of the
341 procedure follows. All page references refer to Knuth's _Art of
342 Computer Programming, Vol. 3: Sorting and Searching_, which is the
343 canonical resource for sorting.
345 1. The data is read and S initial runs are formed through the
346 action of algorithm 5.4.1R (replacement selection).
348 2. Huffman's method (p. 365-366) is used to determine the optimum
351 3. If an OS that supports overlapped reading, writing, and
352 computing is being run, we should use 5.4.6F for forecasting.
353 Otherwise, buffers are filled only when they run out of data.
354 FIXME: Since the author of PSPP uses GNU/Linux, which does not
355 yet implement overlapped r/w/c, 5.4.6F is not used.
357 4. We perform P-way merges:
359 (a) The desired P is the smallest P such that ceil(ln(S)/ln(P))
360 is minimized. (FIXME: Since I don't have an algorithm for
361 minimizing this, it's just set to MAX_MERGE_ORDER.)
363 (b) P is reduced if the selected value would make input buffers
364 less than 4096 bytes each, or 16 records, whichever is larger.
366 (c) P is reduced if we run out of available file handles or space
369 (d) P is reduced if we don't have space for one or two output
370 buffers, which have the same minimum size as input buffers. (We
371 need two output buffers if 5.4.6F is in use for forecasting.) */
373 do_external_sort (int separate)
377 assert (MAX_FILE_HANDLES >= 3);
382 huffman_queue = heap_create (512);
383 if (huffman_queue == NULL)
386 if (!allocate_cases ())
389 if (!allocate_file_handles ())
392 if (!write_initial_runs (separate))
399 /* Despite the name, flow of control comes here regardless of
400 whether or not the sort is successful. */
402 heap_destroy (huffman_queue);
408 for (i = 0; i <= x_max; i++)
423 /* Sets up to open temporary files. */
424 /* PORTME: This creates a directory for temporary files. Some OSes
425 might not have that concept... */
427 allocate_file_handles (void)
429 const char *dir; /* Directory prefix. */
430 char *buf; /* String buffer. */
431 char *cp; /* Pointer into buf. */
433 dir = getenv ("SPSSTMPDIR");
435 dir = getenv ("SPSSXTMPDIR");
437 dir = getenv ("TMPDIR");
447 dir = getenv ("TEMP");
449 dir = getenv ("TMP");
456 buf = xmalloc (strlen (dir) + 1 + 4 + 8 + 4 + 1 + INT_DIGITS + 1);
457 cp = spprintf (buf, "%s%c%04lX%04lXpspp", dir, DIR_SEPARATOR,
458 ((long) time (0)) & 0xffff, ((long) getpid ()) & 0xffff);
459 if (-1 == mkdir (buf, S_IRWXU))
462 msg (SE, _("%s: Cannot create temporary directory: %s."),
463 buf, strerror (errno));
466 *cp++ = DIR_SEPARATOR;
471 max_handles = MAX_FILE_HANDLES;
476 /* Removes the directory created for temporary files, if one exists.
477 Also frees tmp_basename. */
479 rmdir_temp_dir (void)
481 if (NULL == tmp_basename)
484 tmp_extname[-1] = '\0';
485 if (rmdir (tmp_basename) == -1)
486 msg (SE, _("%s: Error removing directory for temporary files: %s."),
487 tmp_basename, strerror (errno));
492 /* Allocates room for lots of cases as a buffer. */
494 allocate_cases (void)
496 /* This is the size of one case. */
497 const int case_size = (sizeof (struct repl_sel_tree)
498 + sizeof (union value) * (default_dict.nval - 1)
499 + sizeof (struct repl_sel_tree *));
503 /* Allocate as many cases as we can, assuming a space of four
504 void pointers for malloc()'s internal bookkeeping. */
505 x_max = MAX_WORKSPACE / (case_size + 4 * sizeof (void *));
506 x = malloc (sizeof (struct repl_sel_tree *) * x_max);
511 for (i = 0; i < x_max; i++)
513 x[i] = malloc (sizeof (struct repl_sel_tree)
514 + sizeof (union value) * (default_dict.nval - 1));
520 if (x == NULL || x_max < MIN_BUFFER_TOTAL_SIZE_RECS)
526 for (i = 0; i < x_max; i++)
530 msg (SE, _("Out of memory. Could not allocate room for minimum of %d "
531 "cases of %d bytes each. (PSPP workspace is currently "
532 "restricted to a maximum of %d KB.)"),
533 MIN_BUFFER_TOTAL_SIZE_RECS, case_size, MAX_WORKSPACE / 1024);
539 /* The last element of the array is used to store lastkey. */
542 debug_printf ((_("allocated %d cases == %d bytes\n"),
543 x_max, x_max * case_size));
547 /* Replacement selection. */
549 static int rmax, rc, rq;
550 static struct repl_sel_tree *q;
551 static union value *lastkey;
552 static int run_no, file_index;
553 static int deferred_abort;
554 static int run_length;
556 static int compare_record (union value *, union value *);
559 output_record (union value *v)
561 union value *src_case;
566 if (compaction_necessary)
568 compact_case (compaction_case, (struct ccase *) v);
569 src_case = (union value *) compaction_case;
572 src_case = (union value *) v;
574 if ((int) fwrite (src_case, sizeof *src_case, compaction_nval,
579 sprintf (tmp_extname, "%08x", run_no);
580 msg (SE, _("%s: Error writing temporary file: %s."),
581 tmp_basename, strerror (errno));
591 int result = fclose (handle[i]);
592 msg (VM (2), _("SORT: Closing handle %d."), i);
597 sprintf (tmp_extname, "%08x", i);
598 msg (SE, _("%s: Error closing temporary file: %s."),
599 tmp_basename, strerror (errno));
606 close_handles (int beg, int end)
611 for (i = beg; i < end; i++)
612 success &= close_handle (i);
617 open_handle_w (int handle_no, int run_no)
619 sprintf (tmp_extname, "%08x", run_no);
620 msg (VM (1), _("SORT: %s: Opening for writing as run %d."),
621 tmp_basename, run_no);
623 /* The `x' modifier causes the GNU C library to insist on creating a
624 new file--if the file already exists, an error is signaled. The
625 ANSI C standard says that other libraries should ignore anything
626 after the `w+b', so it shouldn't be a problem. */
627 return NULL != (handle[handle_no] = fopen (tmp_basename, "w+bx"));
631 open_handle_r (int handle_no, int run_no)
635 sprintf (tmp_extname, "%08x", run_no);
636 msg (VM (1), _("SORT: %s: Opening for writing as run %d."),
637 tmp_basename, run_no);
638 f = handle[handle_no] = fopen (tmp_basename, "rb");
642 msg (SE, _("%s: Error opening temporary file for reading: %s."),
643 tmp_basename, strerror (errno));
650 /* Begins a new initial run, specifically its output file. */
654 /* Decide which handle[] to use. If run_no is max_handles or
655 greater, then we've run out of handles so it's time to just do
656 one file at a time, which by default is handle 0. */
657 file_index = (run_no < max_handles ? run_no : 0);
660 /* Alright, now create the temporary file. */
661 if (open_handle_w (file_index, run_no) == 0)
663 /* Failure to create the temporary file. Check if there are
664 unacceptably few files already open. */
668 msg (SE, _("%s: Error creating temporary file: %s."),
669 tmp_basename, strerror (errno));
673 /* Close all the open temporary files. */
674 if (!close_handles (0, file_index))
677 /* Now try again to create the temporary file. */
678 max_handles = file_index;
680 if (open_handle_w (0, run_no) == 0)
682 /* It still failed, report it this time. */
684 msg (SE, _("%s: Error creating temporary file: %s."),
685 tmp_basename, strerror (errno));
691 /* Ends the current initial run. Just increments run_no if no initial
692 run has been started yet. */
696 /* Close file handles if necessary. */
700 if (run_no == max_handles - 1)
701 result = close_handles (0, max_handles);
702 else if (run_no >= max_handles)
703 result = close_handle (0);
710 /* Advance to next run. */
713 heap_insert (huffman_queue, run_no - 1, run_length);
716 /* Performs 5.4.1R. */
718 write_initial_runs (int separate)
723 /* Steps R1, R2, R3. */
732 for (j = 0; j < x_max; j++)
734 struct repl_sel_tree *J = x[j];
738 J->fe = x[(x_max + j) / 2];
740 memset (J->record, 0, default_dict.nval * sizeof (union value));
744 /* Most of the iterations of steps R4, R5, R6, R7, R2, R3, ... */
748 vfm_sink->destroy_sink ();
749 vfm_sink = &sort_stream;
751 procedure (NULL, NULL, NULL);
753 /* Final iterations of steps R4, R5, R6, R7, R2, R3, ... */
756 struct repl_sel_tree *t;
770 && compare_record (t->loser->record, q->record) < 0))
772 struct repl_sel_tree *temp_tree;
775 temp_tree = t->loser;
803 output_record (q->record);
804 lastkey = x[x_max]->record;
805 memcpy (lastkey, q->record, sizeof (union value) * vfm_sink_info.nval);
808 assert (run_no == rmax);
810 /* If an unrecoverable error occurred somewhere in the above code,
811 then the `deferred_abort' flag would have been set. */
816 for (i = 0; i < max_handles; i++)
817 if (handle[i] != NULL)
819 sprintf (tmp_extname, "%08x", i);
821 if (fclose (handle[i]) == EOF)
822 msg (SE, _("%s: Error closing temporary file: %s."),
823 tmp_basename, strerror (errno));
825 if (remove (tmp_basename) != 0)
826 msg (SE, _("%s: Error removing temporary file: %s."),
827 tmp_basename, strerror (errno));
837 /* Compares the NV_SORT variables in V_SORT[] between the `value's at
838 A and B, and returns a strcmp()-type result. */
840 compare_record (union value * a, union value * b)
847 if (b == NULL) /* Sort NULLs after everything else. */
850 for (i = 0; i < nv_sort; i++)
854 if (v->type == NUMERIC)
856 if (approx_ne (a[v->fv].f, b[v->fv].f))
858 result = (a[v->fv].f > b[v->fv].f) ? 1 : -1;
864 result = memcmp (a[v->fv].s, b[v->fv].s, v->width);
870 if (v->p.srt.order == SRT_ASCEND)
874 assert (v->p.srt.order == SRT_DESCEND);
881 static int merge_once (int run_index[], int run_length[], int n_runs);
883 /* Modula function as defined by Knuth. */
891 result = abs (x) % abs (y);
897 /* Performs a series of P-way merges of initial runs using Huffman's
902 /* Order of merge. */
906 assert (MIN_BUFFER_SIZE_RECS * 2 <= MIN_BUFFER_TOTAL_SIZE_RECS - 1);
908 /* Close all the input files. I hope that the boundary conditions
909 are correct on this but I'm not sure. */
910 if (run_no < max_handles)
914 for (i = 0; i < run_no; )
915 if (!close_handle (i++))
917 for (; i < run_no; i++)
923 /* Determine order of merge. */
924 order = MAX_MERGE_ORDER;
925 if (x_max / order < MIN_BUFFER_SIZE_RECS)
926 order = x_max / MIN_BUFFER_SIZE_RECS;
927 else if (x_max / order * sizeof (union value) * default_dict.nval
928 < MIN_BUFFER_SIZE_BYTES)
929 order = x_max / (MIN_BUFFER_SIZE_BYTES
930 / (sizeof (union value) * (default_dict.nval - 1)));
932 /* Make sure the order of merge is bounded. */
937 assert (x_max / order > 0);
939 /* Calculate number of records per buffer. */
940 records_per_buffer = x_max / order;
942 /* Add (1 - S) mod (P - 1) dummy runs of length 0. */
944 int n_dummy_runs = mod (1 - rmax, order - 1);
945 debug_printf (("rmax=%d, order=%d, n_dummy_runs=%d\n",
946 rmax, order, n_dummy_runs));
947 assert (n_dummy_runs >= 0);
948 while (n_dummy_runs--)
950 heap_insert (huffman_queue, -2, 0);
955 /* Repeatedly merge the P shortest existing runs until only one run
959 int run_index[MAX_MERGE_ORDER];
960 int run_length[MAX_MERGE_ORDER];
961 int total_run_length = 0;
964 assert (rmax >= order);
966 /* Find the shortest runs; put them in runs[] in reverse order
967 of length, to force dummy runs of length 0 to the end of the
969 debug_printf ((_("merging runs")));
970 for (i = order - 1; i >= 0; i--)
972 run_index[i] = heap_delete (huffman_queue, &run_length[i]);
973 assert (run_index[i] != -1);
974 total_run_length += run_length[i];
975 debug_printf ((" %d(%d)", run_index[i], run_length[i]));
977 debug_printf ((_(" into run %d(%d)\n"), run_no, total_run_length));
979 if (!merge_once (run_index, run_length, order))
983 while (-1 != (index = heap_delete (huffman_queue, NULL)))
985 sprintf (tmp_extname, "%08x", index);
986 if (remove (tmp_basename) != 0)
987 msg (SE, _("%s: Error removing temporary file: %s."),
988 tmp_basename, strerror (errno));
994 if (!heap_insert (huffman_queue, run_no++, total_run_length))
996 msg (SE, _("Out of memory expanding Huffman priority queue."));
1003 /* There should be exactly one element in the priority queue after
1004 all that merging. This represents the entire sorted active file.
1005 So we could find a total case count by deleting this element from
1007 assert (heap_size (huffman_queue) == 1);
1012 /* Merges N_RUNS initial runs into a new run. The jth run for 0 <= j
1013 < N_RUNS is taken from temporary file RUN_INDEX[j]; it is composed
1014 of RUN_LENGTH[j] cases. */
1016 merge_once (int run_index[], int run_length[], int n_runs)
1018 /* For each run, the number of records remaining in its buffer. */
1019 int buffered[MAX_MERGE_ORDER];
1021 /* For each run, the index of the next record in the buffer. */
1022 int buffer_ptr[MAX_MERGE_ORDER];
1024 /* Open input files. */
1028 for (i = 0; i < n_runs; i++)
1029 if (run_index[i] != -2 && !open_handle_r (i, run_index[i]))
1031 /* Close and remove temporary files. */
1035 sprintf (tmp_extname, "%08x", i);
1036 if (remove (tmp_basename) != 0)
1037 msg (SE, _("%s: Error removing temporary file: %s."),
1038 tmp_basename, strerror (errno));
1045 /* Create output file. */
1046 if (!open_handle_w (N_INPUT_BUFFERS, run_no))
1048 msg (SE, _("%s: Error creating temporary file for merge: %s."),
1049 tmp_basename, strerror (errno));
1053 /* Prime each buffer. */
1057 for (i = 0; i < n_runs; i++)
1058 if (run_index[i] == -2)
1066 int ofs = records_per_buffer * i;
1068 buffered[i] = min (records_per_buffer, run_length[i]);
1069 for (j = 0; j < buffered[i]; j++)
1070 if ((int) fread (x[j + ofs]->record, sizeof (union value),
1071 default_dict.nval, handle[i])
1072 != default_dict.nval)
1074 sprintf (tmp_extname, "%08x", run_index[i]);
1075 if (ferror (handle[i]))
1076 msg (SE, _("%s: Error reading temporary file in merge: %s."),
1077 tmp_basename, strerror (errno));
1079 msg (SE, _("%s: Unexpected end of temporary file in merge."),
1083 buffer_ptr[i] = ofs;
1084 run_length[i] -= buffered[i];
1088 /* Perform the merge proper. */
1089 while (n_runs) /* Loop while some data is left. */
1094 for (i = 1; i < n_runs; i++)
1095 if (compare_record (x[buffer_ptr[min]]->record,
1096 x[buffer_ptr[i]]->record) > 0)
1099 if ((int) fwrite (x[buffer_ptr[min]]->record, sizeof (union value),
1100 default_dict.nval, handle[N_INPUT_BUFFERS])
1101 != default_dict.nval)
1103 sprintf (tmp_extname, "%08x", run_index[i]);
1104 msg (SE, _("%s: Error writing temporary file in "
1105 "merge: %s."), tmp_basename, strerror (errno));
1109 /* Remove one case from the buffer for this input file. */
1110 if (--buffered[min] == 0)
1112 /* The input buffer is empty. Do any cases remain in the
1113 initial run on disk? */
1114 if (run_length[min])
1116 /* Yes. Read them in. */
1121 /* Reset the buffer pointer. Note that we can't simply
1122 set it to (i * records_per_buffer) since the run
1123 order might have changed. */
1124 ofs = buffer_ptr[min] -= buffer_ptr[min] % records_per_buffer;
1126 buffered[min] = min (records_per_buffer, run_length[min]);
1127 for (j = 0; j < buffered[min]; j++)
1128 if ((int) fread (x[j + ofs]->record, sizeof (union value),
1129 default_dict.nval, handle[min])
1130 != default_dict.nval)
1132 sprintf (tmp_extname, "%08x", run_index[min]);
1133 if (ferror (handle[min]))
1134 msg (SE, _("%s: Error reading temporary file in "
1136 tmp_basename, strerror (errno));
1138 msg (SE, _("%s: Unexpected end of temporary file "
1143 run_length[min] -= buffered[min];
1147 /* No. Delete this run. */
1149 /* Close the file. */
1150 FILE *f = handle[min];
1152 sprintf (tmp_extname, "%08x", run_index[min]);
1153 if (fclose (f) == EOF)
1154 msg (SE, _("%s: Error closing temporary file in merge: "
1155 "%s."), tmp_basename, strerror (errno));
1157 /* Delete the file. */
1158 if (remove (tmp_basename) != 0)
1159 msg (SE, _("%s: Error removing temporary file in merge: "
1160 "%s."), tmp_basename, strerror (errno));
1165 /* Since this isn't the last run, we move the last
1166 run into its spot to force all the runs to be
1168 run_index[min] = run_index[n_runs];
1169 run_length[min] = run_length[n_runs];
1170 buffer_ptr[min] = buffer_ptr[n_runs];
1171 buffered[min] = buffered[n_runs];
1172 handle[min] = handle[n_runs];
1180 /* Close output file. */
1182 FILE *f = handle[N_INPUT_BUFFERS];
1183 handle[N_INPUT_BUFFERS] = NULL;
1184 if (fclose (f) == EOF)
1186 sprintf (tmp_extname, "%08x", run_no);
1187 msg (SE, _("%s: Error closing temporary file in merge: "
1189 tmp_basename, strerror (errno));
1197 /* Close all the input and output files. */
1201 for (i = 0; i < n_runs; i++)
1202 if (run_length[i] != 0)
1205 sprintf (tmp_basename, "%08x", run_index[i]);
1206 if (remove (tmp_basename) != 0)
1207 msg (SE, _("%s: Error removing temporary file: %s."),
1208 tmp_basename, strerror (errno));
1211 close_handle (N_INPUT_BUFFERS);
1212 sprintf (tmp_basename, "%08x", run_no);
1213 if (remove (tmp_basename) != 0)
1214 msg (SE, _("%s: Error removing temporary file: %s."),
1215 tmp_basename, strerror (errno));
1219 /* External sort input program. */
1221 /* Reads all the records from the source stream and passes them
1224 sort_stream_read (void)
1226 read_sort_output (write_case);
1229 /* Reads all the records from the output stream and passes them to the
1230 function provided, which must have an interface identical to
1233 read_sort_output (int (*write_case) (void))
1238 if (separate_case_tab)
1240 struct ccase *save_temp_case = temp_case;
1241 struct case_list **p;
1243 for (p = separate_case_tab; *p; p++)
1245 temp_case = &(*p)->c;
1249 free (separate_case_tab);
1250 separate_case_tab = NULL;
1252 temp_case = save_temp_case;
1254 sprintf (tmp_extname, "%08x", run_no - 1);
1255 f = fopen (tmp_basename, "rb");
1258 msg (ME, _("%s: Cannot open sort result file: %s."), tmp_basename,
1264 for (i = 0; i < vfm_source_info.ncases; i++)
1266 if (!fread (temp_case, vfm_source_info.case_size, 1, f))
1269 msg (ME, _("%s: Error reading sort result file: %s."),
1270 tmp_basename, strerror (errno));
1272 msg (ME, _("%s: Unexpected end of sort result file: %s."),
1273 tmp_basename, strerror (errno));
1282 if (fclose (f) == EOF)
1283 msg (ME, _("%s: Error closing sort result file: %s."), tmp_basename,
1286 if (remove (tmp_basename) != 0)
1287 msg (ME, _("%s: Error removing sort result file: %s."), tmp_basename,
1294 #if 0 /* dead code */
1295 /* Alternate interface to sort_stream_write used for external sorting
1296 when SEPARATE is true. */
1298 write_separate (struct ccase *c)
1300 assert (c == temp_case);
1302 sort_stream_write ();
1307 /* Performs one iteration of 5.4.1R steps R4, R5, R6, R7, R2, and
1310 sort_stream_write (void)
1312 struct repl_sel_tree *t;
1315 memcpy (q->record, temp_case->data, vfm_sink_info.case_size);
1316 if (compare_record (q->record, lastkey) < 0)
1328 || (t->rn == rq && compare_record (t->loser->record, q->record) < 0))
1330 struct repl_sel_tree *temp_tree;
1333 temp_tree = t->loser;
1353 assert (rq <= rmax);
1360 output_record (q->record);
1361 lastkey = x[x_max]->record;
1362 memcpy (lastkey, q->record, vfm_sink_info.case_size);
1366 /* Switches mode from sink to source. */
1368 sort_stream_mode (void)
1370 /* If this is not done, then we get the following source/sink pairs:
1371 source=memory/disk/DATA LIST/etc., sink=SORT; source=SORT,
1372 sink=SORT; which is not good. */
1376 struct case_stream sort_stream =