1 /* PSPP - computes sample statistics.
2 Copyright (C) 1997-9, 2000 Free Software Foundation, Inc.
3 Written by Ben Pfaff <blp@gnu.org>.
5 This program is free software; you can redistribute it and/or
6 modify it under the terms of the GNU General Public License as
7 published by the Free Software Foundation; either version 2 of the
8 License, or (at your option) any later version.
10 This program is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with this program; if not, write to the Free Software
17 Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA
26 #include "algorithm.h"
44 #include <sys/types.h>
51 #include "debug-print.h"
53 /* Other prototypes. */
54 static int compare_record (const union value *, const union value *,
55 const struct sort_cases_pgm *);
56 static int compare_case_lists (const void *, const void *, void *);
57 static struct internal_sort *do_internal_sort (struct sort_cases_pgm *,
59 static void destroy_internal_sort (struct internal_sort *);
60 static struct external_sort *do_external_sort (struct sort_cases_pgm *,
62 static void destroy_external_sort (struct external_sort *);
63 struct sort_cases_pgm *parse_sort (void);
65 /* Performs the SORT CASES procedures. */
69 struct sort_cases_pgm *scp;
72 lex_match_id ("SORT");
73 lex_match_id ("CASES");
82 success = sort_cases (scp, 0);
83 destroy_sort_cases_pgm (scp);
85 return lex_end_of_command ();
90 /* Parses a list of sort keys and returns a struct sort_cases_pgm
91 based on it. Returns a null pointer on error. */
92 struct sort_cases_pgm *
95 struct sort_cases_pgm *scp;
97 scp = xmalloc (sizeof *scp);
107 int prev_var_cnt = scp->var_cnt;
108 enum sort_direction direction = SRT_ASCEND;
111 if (!parse_variables (default_dict, &scp->vars, &scp->var_cnt,
112 PV_NO_DUPLICATE | PV_APPEND | PV_NO_SCRATCH))
115 /* Sort direction. */
118 if (lex_match_id ("D") || lex_match_id ("DOWN"))
119 direction = SRT_DESCEND;
120 else if (!lex_match_id ("A") && !lex_match_id ("UP"))
122 msg (SE, _("`A' or `D' expected inside parentheses."));
125 if (!lex_match (')'))
127 msg (SE, _("`)' expected."));
131 scp->dirs = xrealloc (scp->dirs, sizeof *scp->dirs * scp->var_cnt);
132 for (; prev_var_cnt < scp->var_cnt; prev_var_cnt++)
133 scp->dirs[prev_var_cnt] = direction;
135 while (token != '.' && token != '/');
140 destroy_sort_cases_pgm (scp);
145 destroy_sort_cases_pgm (struct sort_cases_pgm *scp)
149 assert (scp->ref_cnt > 0);
150 if (--scp->ref_cnt > 0)
155 destroy_internal_sort (scp->isrt);
156 destroy_external_sort (scp->xsrt);
161 /* Sorts the active file based on the key variables and sort directions
162 specified in SCP. The output is either to the
163 active file, if SEPARATE is zero, or to a separate file, if
164 SEPARATE is nonzero. In the latter case the output cases can be
165 read with a call to read_sort_output(). (In the former case the
166 output cases should be dealt with through the usual vfm interface.)
168 The caller is responsible for freeing vars[]. */
170 sort_cases (struct sort_cases_pgm *scp, int separate)
172 /* Not sure this is necessary but it's good to be safe. */
173 if (separate && case_source_is_class (vfm_source, &sort_source_class))
174 procedure (NULL, NULL, NULL, NULL);
176 /* SORT CASES cancels PROCESS IF. */
177 expr_free (process_if_expr);
178 process_if_expr = NULL;
180 /* Try an internal sort first. */
181 scp->isrt = do_internal_sort (scp, separate);
182 if (scp->isrt != NULL)
185 /* Fall back to an external sort. */
186 write_active_file_to_disk ();
187 scp->xsrt = do_external_sort (scp, separate);
188 if (scp->xsrt != NULL)
191 destroy_sort_cases_pgm (scp);
197 struct case_list **results;
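200 /* If a reasonable situation is set up, do an internal sort of the
201 data. Returns the sort state on success, or a null pointer if an internal sort is not possible, in which case sort_cases() falls back to an external sort. */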
202 static struct internal_sort *
203 do_internal_sort (struct sort_cases_pgm *scp, int separate)
205 struct internal_sort *isrt;
207 isrt = xmalloc (sizeof *isrt);
208 isrt->results = NULL;
210 if (!case_source_is_class (vfm_source, &disk_source_class))
212 if (!case_source_is_class (vfm_source, &memory_source_class))
213 procedure (NULL, NULL, NULL, NULL);
215 if (case_source_is_class (vfm_source, &memory_source_class))
217 struct case_list *case_list;
218 struct case_list **case_array;
222 case_cnt = vfm_source->class->count (vfm_source);
226 if (case_cnt > set_max_workspace / sizeof *case_array)
229 case_list = memory_source_get_cases (vfm_source);
230 case_array = malloc (sizeof *case_array * (case_cnt + 1));
231 if (case_array == NULL)
234 for (i = 0; case_list != NULL; i++)
236 case_array[i] = case_list;
237 case_list = case_list->next;
239 assert (i == case_cnt);
240 case_array[case_cnt] = NULL;
242 sort (case_array, case_cnt, sizeof *case_array,
243 compare_case_lists, scp);
247 memory_source_set_cases (vfm_source, case_array[0]);
248 for (i = 1; i <= case_cnt; i++)
249 case_array[i - 1]->next = case_array[i];
253 isrt->results = case_array;
265 destroy_internal_sort (struct internal_sort *isrt)
269 free (isrt->results);
274 /* Compares the VAR_CNT variables in VARS[] between the
275 `case_list's at A and B, and returns a strcmp()-type
278 compare_case_lists (const void *a_, const void *b_, void *scp_)
280 struct sort_cases_pgm *scp = scp_;
281 struct case_list *const *pa = a_;
282 struct case_list *const *pb = b_;
283 struct case_list *a = *pa;
284 struct case_list *b = *pb;
286 return compare_record (a->c.data, b->c.data, scp);
291 /* Maximum order of merge. If you increase this, then you should
292 use a heap for comparing cases during merge. */
293 #define MAX_MERGE_ORDER 7
295 /* Minimum total number of records for buffers. */
296 #define MIN_BUFFER_TOTAL_SIZE_RECS 64
298 /* Minimum single input buffer size, in bytes and records. */
299 #define MIN_BUFFER_SIZE_BYTES 4096
300 #define MIN_BUFFER_SIZE_RECS 16
302 #if MIN_BUFFER_SIZE_RECS * 2 + 16 > MIN_BUFFER_TOTAL_SIZE_RECS
303 #error MIN_BUFFER_SIZE_RECS and MIN_BUFFER_TOTAL_SIZE_RECS do not make sense.
306 /* An initial run and its length. */
309 int file_idx; /* File index. */
310 size_t case_cnt; /* Number of cases. */
313 /* Sorts initial runs A and B in descending order by length; the max-heap functions used by merge() therefore yield the shortest run first. */
315 compare_initial_runs (const void *a_, const void *b_, void *aux UNUSED)
317 const struct initial_run *a = a_;
318 const struct initial_run *b = b_;
320 return a->case_cnt > b->case_cnt ? -1 : a->case_cnt <b->case_cnt;
325 struct sort_cases_pgm *scp; /* SORT CASES info. */
326 struct initial_run *initial_runs; /* Array of initial runs. */
327 size_t run_cnt, run_cap; /* Number of runs, allocated capacity. */
328 char *temp_dir; /* Temporary file directory name. */
329 int next_file_idx; /* Lowest unused file index. */
330 size_t case_size; /* Number of bytes in case. */
333 /* Prototypes for helper functions. */
334 static void sort_sink_write (struct case_sink *, struct ccase *);
335 static int write_initial_runs (struct external_sort *, int separate);
336 static int init_external_sort (struct external_sort *);
337 static int merge (struct external_sort *);
338 static void rmdir_temp_dir (struct external_sort *);
339 static void remove_temp_file (struct external_sort *xsrt, int file_idx);
341 /* Performs an external sort of the active file according to the
342 specification in SCP. Forms initial runs using a heap as a
343 reservoir. Determines the optimum merge pattern via Huffman's
344 method (see Knuth vol. 3, 2nd edition, p. 365-366), and merges
345 according to that pattern. */
346 static struct external_sort *
347 do_external_sort (struct sort_cases_pgm *scp, int separate)
349 struct external_sort *xsrt;
352 xsrt = xmalloc (sizeof *xsrt);
354 xsrt->case_size = sizeof (union value) * compaction_nval;
355 if (!init_external_sort (xsrt))
357 if (!write_initial_runs (xsrt, separate))
367 /* Don't destroy anything because we'll need it for reading
373 destroy_external_sort (xsrt);
380 destroy_external_sort (struct external_sort *xsrt)
386 for (i = 0; i < xsrt->run_cnt; i++)
387 remove_temp_file (xsrt, xsrt->initial_runs[i].file_idx);
388 rmdir_temp_dir (xsrt);
389 free (xsrt->initial_runs);
395 /* Creates and returns the name of a temporary directory. */
399 const char *parent_dir;
402 if (getenv ("TMPDIR") != NULL)
403 parent_dir = getenv ("TMPDIR");
405 parent_dir = P_tmpdir;
407 temp_dir = xmalloc (strlen (parent_dir) + 32);
408 sprintf (temp_dir, "%s%cpsppXXXXXX", parent_dir, DIR_SEPARATOR);
409 if (mkdtemp (temp_dir) == NULL)
411 msg (SE, _("%s: Creating temporary directory: %s."),
412 temp_dir, strerror (errno));
419 #else /* !HAVE_MKDTEMP */
420 /* Creates directory DIR. */
422 do_mkdir (const char *dir)
425 return mkdir (dir, S_IRWXU);
431 /* Creates and returns the name of a temporary directory. */
437 for (i = 0; i < 100; i++)
439 char temp_dir[L_tmpnam + 1];
440 if (tmpnam (temp_dir) == NULL)
442 msg (SE, _("Generating temporary directory name failed: %s."),
446 else if (do_mkdir (temp_dir) == 0)
447 return xstrdup (temp_dir);
450 msg (SE, _("Creating temporary directory failed: %s."), strerror (errno));
453 #endif /* !HAVE_MKDTEMP */
455 /* Sets up to open temporary files. */
457 init_external_sort (struct external_sort *xsrt)
460 xsrt->temp_dir = NULL;
461 xsrt->next_file_idx = 0;
466 xsrt->initial_runs = xmalloc (sizeof *xsrt->initial_runs * xsrt->run_cap);
468 /* Temporary directory. */
469 xsrt->temp_dir = make_temp_dir ();
470 if (xsrt->temp_dir == NULL)
478 simulate_error (void)
480 static int op_err_cnt = -1;
483 if (op_err_cnt == -1 || op_cnt++ < op_err_cnt)
492 /* Removes the directory created for temporary files, if one
495 rmdir_temp_dir (struct external_sort *xsrt)
497 if (xsrt->temp_dir != NULL && rmdir (xsrt->temp_dir) == -1)
499 msg (SE, _("%s: Error removing directory for temporary files: %s."),
500 xsrt->temp_dir, strerror (errno));
501 xsrt->temp_dir = NULL;
505 #define TEMP_FILE_NAME_SIZE (L_tmpnam + 32)
507 get_temp_file_name (struct external_sort *xsrt, int file_idx,
508 char filename[TEMP_FILE_NAME_SIZE])
510 assert (xsrt->temp_dir != NULL);
511 { int len = snprintf (filename, TEMP_FILE_NAME_SIZE, "%s%c%04d", xsrt->temp_dir, DIR_SEPARATOR, file_idx); /* temp_dir derives from $TMPDIR and is unbounded; never overflow FILENAME, and never truncate it either, since a truncated name could alias another run's file. */ if (len < 0 || len >= TEMP_FILE_NAME_SIZE) abort (); }
515 open_temp_file (struct external_sort *xsrt, int file_idx, const char *mode)
517 char temp_file[TEMP_FILE_NAME_SIZE];
520 get_temp_file_name (xsrt, file_idx, temp_file);
522 file = fopen (temp_file, mode);
523 if (simulate_error () || file == NULL)
524 msg (SE, _("%s: Error opening temporary file for %s: %s."),
525 temp_file, mode[0] == 'r' ? "reading" : "writing",
532 close_temp_file (struct external_sort *xsrt, int file_idx, FILE *file)
536 char temp_file[TEMP_FILE_NAME_SIZE];
537 get_temp_file_name (xsrt, file_idx, temp_file);
538 if (simulate_error () || fclose (file) == EOF)
540 msg (SE, _("%s: Error closing temporary file: %s."),
541 temp_file, strerror (errno));
549 remove_temp_file (struct external_sort *xsrt, int file_idx)
553 char temp_file[TEMP_FILE_NAME_SIZE];
554 get_temp_file_name (xsrt, file_idx, temp_file);
555 if (simulate_error () || remove (temp_file) != 0)
556 msg (SE, _("%s: Error removing temporary file: %s."),
557 temp_file, strerror (errno));
562 write_temp_file (struct external_sort *xsrt, int file_idx,
563 FILE *file, const void *data, size_t size)
565 if (!simulate_error () && fwrite (data, size, 1, file) == 1)
569 char temp_file[TEMP_FILE_NAME_SIZE];
570 get_temp_file_name (xsrt, file_idx, temp_file);
571 msg (SE, _("%s: Error writing temporary file: %s."),
572 temp_file, strerror (errno));
578 read_temp_file (struct external_sort *xsrt, int file_idx,
579 FILE *file, void *data, size_t size)
581 if (!simulate_error () && fread (data, size, 1, file) == 1)
585 char temp_file[TEMP_FILE_NAME_SIZE];
586 get_temp_file_name (xsrt, file_idx, temp_file);
588 msg (SE, _("%s: Error reading temporary file: %s."),
589 temp_file, strerror (errno));
591 msg (SE, _("%s: Unexpected end of temporary file."),
597 /* Replacement selection. */
599 /* Pairs a record with a run number. */
602 int run; /* Run number of case. */
603 struct case_list *record; /* Case data. */
606 struct initial_run_state
608 struct external_sort *xsrt;
611 struct record_run *records; /* Records arranged as a heap. */
612 size_t record_cnt; /* Current number of records. */
613 size_t record_cap; /* Capacity for records. */
614 struct case_list *free_list;/* Cases not in heap. */
616 /* Run currently being output. */
617 int file_idx; /* Temporary file number. */
618 size_t case_cnt; /* Number of cases so far. */
619 size_t case_size; /* Number of bytes in a case. */
620 FILE *output_file; /* Output file. */
621 struct case_list *last_output;/* Record last output. */
623 int okay; /* Zero if an error has been encountered. */
626 static void destroy_initial_run_state (struct initial_run_state *irs);
627 static int allocate_cases (struct initial_run_state *);
628 static struct case_list *grab_case (struct initial_run_state *);
629 static void release_case (struct initial_run_state *, struct case_list *);
630 static void output_record (struct initial_run_state *irs);
631 static void start_run (struct initial_run_state *irs);
632 static void end_run (struct initial_run_state *irs);
633 static int compare_record_run (const struct record_run *,
634 const struct record_run *,
635 struct sort_cases_pgm *);
636 static int compare_record_run_minheap (const void *, const void *, void *);
639 write_initial_runs (struct external_sort *xsrt, int separate)
641 struct initial_run_state *irs;
644 /* Allocate memory for cases. */
645 irs = xmalloc (sizeof *irs);
648 irs->record_cnt = irs->record_cap = 0;
649 irs->free_list = NULL;
650 irs->output_file = NULL;
651 irs->last_output = NULL;
654 irs->case_size = dict_get_case_size (default_dict);
656 if (!allocate_cases (irs))
659 /* Create case sink. */
663 vfm_sink->class->destroy (vfm_sink);
664 vfm_sink = create_case_sink (&sort_sink_class, irs);
665 xsrt->scp->ref_cnt++;
668 /* Create initial runs. */
670 procedure (NULL, NULL, NULL, NULL);
671 while (irs->record_cnt > 0 && irs->okay)
678 destroy_initial_run_state (irs);
683 /* Add a single case to an initial run. */
685 sort_sink_write (struct case_sink *sink, struct ccase *c)
687 struct initial_run_state *irs = sink->aux;
688 struct record_run *new_record_run;
693 /* Compose record_run for this run and add to heap. */
694 assert (irs->record_cnt < irs->record_cap);
695 new_record_run = irs->records + irs->record_cnt++;
696 new_record_run->record = grab_case (irs);
697 memcpy (new_record_run->record->c.data, c->data, irs->case_size);
698 new_record_run->run = irs->file_idx;
699 if (irs->last_output != NULL
700 && compare_record (c->data, irs->last_output->c.data,
702 new_record_run->run = irs->xsrt->next_file_idx;
703 push_heap (irs->records, irs->record_cnt, sizeof *irs->records,
704 compare_record_run_minheap, irs->xsrt->scp);
706 /* Output a record if the reservoir is full. */
707 if (irs->record_cnt == irs->record_cap && irs->okay)
712 destroy_initial_run_state (struct initial_run_state *irs)
714 struct case_list *iter, *next;
720 /* Release cases to free list. */
721 for (i = 0; i < irs->record_cnt; i++)
722 release_case (irs, irs->records[i].record);
723 if (irs->last_output != NULL)
724 release_case (irs, irs->last_output);
726 /* Free cases in free list. */
727 for (iter = irs->free_list; iter != NULL; iter = next)
736 if (irs->output_file != NULL)
737 close_temp_file (irs->xsrt, irs->file_idx, irs->output_file);
740 /* Allocates room for lots of cases as a buffer. */
742 allocate_cases (struct initial_run_state *irs)
744 size_t case_size; /* Size of one case, in bytes. */
745 int approx_case_cost; /* Approximate memory cost of one case in bytes. */
746 int max_cases; /* Maximum number of cases to allocate. */
749 /* Allocate as many cases as we can within the workspace
751 case_size = dict_get_case_size (default_dict);
752 approx_case_cost = (sizeof *irs->records
753 + sizeof *irs->free_list
755 + 4 * sizeof (void *));
756 max_cases = set_max_workspace / approx_case_cost;
757 irs->records = malloc (sizeof *irs->records * max_cases);
758 for (i = 0; i < max_cases; i++)
761 c = malloc (sizeof *c + case_size - sizeof (union value));
767 release_case (irs, c);
770 /* irs->records gets all but one of the allocated cases.
771 The extra is used for last_output. */
772 irs->record_cap = max_cases - 1;
774 /* Fail if we didn't allocate an acceptable number of cases. */
775 if (irs->records == NULL || max_cases < MIN_BUFFER_TOTAL_SIZE_RECS)
777 msg (SE, _("Out of memory. Could not allocate room for minimum of %d "
778 "cases of %d bytes each. (PSPP workspace is currently "
779 "restricted to a maximum of %d KB.)"),
780 MIN_BUFFER_TOTAL_SIZE_RECS, approx_case_cost, set_max_workspace / 1024);
786 /* Compares the SCP->var_cnt variables in SCP->vars[] between the `value's at
787 A and B, and returns a strcmp()-type result. */
789 compare_record (const union value *a, const union value *b,
790 const struct sort_cases_pgm *scp)
797 for (i = 0; i < scp->var_cnt; i++)
799 struct variable *v = scp->vars[i];
803 if (v->type == NUMERIC)
808 result = af < bf ? -1 : af > bf;
811 result = memcmp (a[fv].s, b[fv].s, v->width);
815 if (scp->dirs[i] == SRT_DESCEND)
825 compare_record_run (const struct record_run *a,
826 const struct record_run *b,
827 struct sort_cases_pgm *scp)
829 if (a->run != b->run)
830 return a->run > b->run ? 1 : -1;
832 return compare_record (a->record->c.data, b->record->c.data, scp);
836 compare_record_run_minheap (const void *a, const void *b, void *scp)
838 return -compare_record_run (a, b, scp);
841 /* Begins a new initial run, specifically its output file. */
843 start_run (struct initial_run_state *irs)
845 irs->file_idx = irs->xsrt->next_file_idx++;
847 irs->output_file = open_temp_file (irs->xsrt, irs->file_idx, "wb");
848 if (irs->output_file == NULL)
850 if (irs->last_output != NULL)
852 release_case (irs, irs->last_output);
853 irs->last_output = NULL;
857 /* Ends the current initial run. */
859 end_run (struct initial_run_state *irs)
861 struct external_sort *xsrt = irs->xsrt;
863 /* Record initial run. */
864 if (xsrt->run_cnt >= xsrt->run_cap)
868 = xrealloc (xsrt->initial_runs,
869 sizeof *xsrt->initial_runs * xsrt->run_cap);
871 xsrt->initial_runs[xsrt->run_cnt].file_idx = irs->file_idx;
872 xsrt->initial_runs[xsrt->run_cnt].case_cnt = irs->case_cnt;
875 /* Close file handle. */
876 if (irs->output_file != NULL
877 && !close_temp_file (irs->xsrt, irs->file_idx, irs->output_file))
879 irs->output_file = NULL;
883 output_record (struct initial_run_state *irs)
885 struct record_run *record_run;
886 struct ccase *out_case;
888 /* Extract minimum case from heap. */
889 assert (irs->record_cnt > 0);
890 pop_heap (irs->records, irs->record_cnt--, sizeof *irs->records,
891 compare_record_run_minheap, irs->xsrt->scp);
892 record_run = irs->records + irs->record_cnt;
894 /* Bail if an error has occurred. */
898 /* Obtain case data to write to disk. */
899 out_case = &record_run->record->c;
900 if (compaction_necessary)
902 compact_case (compaction_case, out_case);
903 out_case = compaction_case;
906 /* Start new run if necessary. */
907 assert (record_run->run == irs->file_idx
908 || record_run->run == irs->xsrt->next_file_idx);
909 if (record_run->run != irs->file_idx)
914 assert (record_run->run == irs->file_idx);
918 if (irs->output_file != NULL
919 && !write_temp_file (irs->xsrt, irs->file_idx, irs->output_file,
921 sizeof *out_case->data * compaction_nval))
924 /* This record becomes last_output. */
925 if (irs->last_output != NULL)
926 release_case (irs, irs->last_output);
927 irs->last_output = record_run->record;
930 static struct case_list *
931 grab_case (struct initial_run_state *irs)
935 assert (irs != NULL);
936 assert (irs->free_list != NULL);
939 irs->free_list = c->next;
944 release_case (struct initial_run_state *irs, struct case_list *c)
946 assert (irs != NULL);
949 c->next = irs->free_list;
957 struct external_sort *xsrt; /* External sort state. */
958 struct ccase **cases; /* Buffers. */
959 size_t case_cnt; /* Number of buffers. */
963 static int merge_once (struct merge_state *,
964 const struct initial_run[], size_t,
965 struct initial_run *);
966 static int fill_run_buffer (struct merge_state *, struct run *);
967 static int mod (int, int);
969 /* Performs a series of P-way merges of initial runs
972 merge (struct external_sort *xsrt)
974 struct merge_state mrg; /* State of merge. */
975 size_t case_size; /* Size of one case, in bytes. */
976 size_t approx_case_cost; /* Approximate memory cost of one case. */
977 int max_order; /* Maximum order of merge. */
978 size_t dummy_run_cnt; /* Number of dummy runs to insert. */
984 /* Allocate as many cases as possible into cases. */
985 case_size = dict_get_case_size (default_dict);
986 approx_case_cost = sizeof *mrg.cases + case_size + 4 * sizeof (void *);
987 mrg.case_cnt = set_max_workspace / approx_case_cost;
988 mrg.cases = malloc (sizeof *mrg.cases * mrg.case_cnt);
989 if (mrg.cases == NULL)
991 for (i = 0; i < mrg.case_cnt; i++)
993 mrg.cases[i] = malloc (case_size);
994 if (mrg.cases[i] == NULL)
1000 if (mrg.case_cnt < MIN_BUFFER_TOTAL_SIZE_RECS)
1002 msg (SE, _("Out of memory. Could not allocate room for minimum of %d "
1003 "cases of %d bytes each. (PSPP workspace is currently "
1004 "restricted to a maximum of %d KB.)"),
1005 MIN_BUFFER_TOTAL_SIZE_RECS, (int) approx_case_cost, set_max_workspace / 1024);
1009 /* Determine maximum order of merge. */
1010 max_order = MAX_MERGE_ORDER;
1011 if (mrg.case_cnt / max_order < MIN_BUFFER_SIZE_RECS)
1012 max_order = mrg.case_cnt / MIN_BUFFER_SIZE_RECS;
1013 else if (mrg.case_cnt / max_order * case_size < MIN_BUFFER_SIZE_BYTES)
1014 max_order = mrg.case_cnt / (MIN_BUFFER_SIZE_BYTES / case_size);
1017 if (max_order > xsrt->run_cnt)
1018 max_order = xsrt->run_cnt;
1020 /* Repeatedly merge the P shortest existing runs until only one run
1022 make_heap (xsrt->initial_runs, xsrt->run_cnt, sizeof *xsrt->initial_runs,
1023 compare_initial_runs, NULL);
1024 dummy_run_cnt = mod (1 - (int) xsrt->run_cnt, max_order - 1);
1025 assert (max_order == 1
1026 || (xsrt->run_cnt + dummy_run_cnt) % (max_order - 1) == 1);
1027 while (xsrt->run_cnt > 1)
1029 struct initial_run output_run;
1033 /* Choose order of merge (max_order after first merge). */
1034 order = max_order - dummy_run_cnt;
1037 /* Choose runs to merge. */
1038 assert (xsrt->run_cnt >= order);
1039 for (i = 0; i < order; i++)
1040 pop_heap (xsrt->initial_runs, xsrt->run_cnt--,
1041 sizeof *xsrt->initial_runs,
1042 compare_initial_runs, NULL);
1045 if (!merge_once (&mrg, xsrt->initial_runs + xsrt->run_cnt, order,
1049 /* Add output run to heap. */
1050 xsrt->initial_runs[xsrt->run_cnt++] = output_run;
1051 push_heap (xsrt->initial_runs, xsrt->run_cnt, sizeof *xsrt->initial_runs,
1052 compare_initial_runs, NULL);
1055 /* Exactly one run is left, which contains the entire sorted
1056 file. We could use it to find a total case count. */
1057 assert (xsrt->run_cnt == 1);
1062 for (i = 0; i < mrg.case_cnt; i++)
1063 free (mrg.cases[i]);
1069 /* Modulo function as defined by Knuth. */
1077 else if (x > 0 && y > 0)
1079 else if (x < 0 && y > 0)
1080 return y - (-x) % y;
1085 /* A run of data for use in merging. */
1088 FILE *file; /* File that contains run. */
1089 int file_idx; /* Index of file that contains run. */
1090 struct ccase **buffer; /* Case buffer. */
1091 struct ccase **buffer_head; /* First unconsumed case in buffer. */
1092 struct ccase **buffer_tail; /* One past last unconsumed case in buffer. */
1093 size_t buffer_cap; /* Number of cases buffer can hold. */
1094 size_t unread_case_cnt; /* Number of cases not yet read. */
1097 /* Merges the RUN_CNT initial runs specified in INPUT_RUNS into a
1098 new run. Returns nonzero only if successful. Fills in
1099 *OUTPUT_RUN with the new run's file index and case count; merge()
1100 then adds it to MRG->xsrt->initial_runs. Always deletes all the input
1103 merge_once (struct merge_state *mrg,
1104 const struct initial_run input_runs[],
1106 struct initial_run *output_run)
1108 struct run runs[MAX_MERGE_ORDER];
1109 FILE *output_file = NULL;
1114 /* Initialize runs[]. */
1115 for (i = 0; i < run_cnt; i++)
1117 runs[i].file = NULL;
1118 runs[i].file_idx = input_runs[i].file_idx;
1119 runs[i].buffer = mrg->cases + mrg->case_cnt / run_cnt * i;
1120 runs[i].buffer_head = runs[i].buffer;
1121 runs[i].buffer_tail = runs[i].buffer;
1122 runs[i].buffer_cap = mrg->case_cnt / run_cnt;
1123 runs[i].unread_case_cnt = input_runs[i].case_cnt;
1126 /* Open input files. */
1127 for (i = 0; i < run_cnt; i++)
1129 runs[i].file = open_temp_file (mrg->xsrt, runs[i].file_idx, "rb");
1130 if (runs[i].file == NULL)
1134 /* Create output file and count cases to be output. */
1135 output_run->file_idx = mrg->xsrt->next_file_idx++;
1136 output_run->case_cnt = 0;
1137 for (i = 0; i < run_cnt; i++)
1138 output_run->case_cnt += input_runs[i].case_cnt;
1139 output_file = open_temp_file (mrg->xsrt, output_run->file_idx, "wb");
1140 if (output_file == NULL)
1143 /* Prime buffers. */
1144 for (i = 0; i < run_cnt; i++)
1145 if (!fill_run_buffer (mrg, runs + i))
1149 case_size = dict_get_case_size (default_dict);
1152 struct run *min_run;
1156 for (i = 1; i < run_cnt; i++)
1157 if (compare_record ((*runs[i].buffer_head)->data,
1158 (*min_run->buffer_head)->data,
1159 mrg->xsrt->scp) < 0)
1162 /* Write minimum to output file. */
1163 if (!write_temp_file (mrg->xsrt, output_run->file_idx, output_file,
1164 (*min_run->buffer_head)->data, case_size))
1167 /* Remove case from buffer. */
1168 if (++min_run->buffer_head >= min_run->buffer_tail)
1170 /* Buffer is empty. Fill from file. */
1171 if (!fill_run_buffer (mrg, min_run))
1174 /* If buffer is still empty, delete its run. */
1175 if (min_run->buffer_head >= min_run->buffer_tail)
1177 close_temp_file (mrg->xsrt, min_run->file_idx, min_run->file);
1178 remove_temp_file (mrg->xsrt, min_run->file_idx);
1179 *min_run = runs[--run_cnt];
1181 /* We could donate the now-unused buffer space to
1187 /* Close output file. */
1188 close_temp_file (mrg->xsrt, output_run->file_idx, output_file);
1193 /* Close and remove output file. */
1194 if (output_file != NULL)
1196 close_temp_file (mrg->xsrt, output_run->file_idx, output_file);
1197 remove_temp_file (mrg->xsrt, output_run->file_idx);
1200 /* Close and remove any remaining input runs. */
1201 for (i = 0; i < run_cnt; i++)
1203 close_temp_file (mrg->xsrt, runs[i].file_idx, runs[i].file);
1204 remove_temp_file (mrg->xsrt, runs[i].file_idx);
1210 /* Reads as many cases as possible into RUN's buffer.
1211 Returns nonzero unless a disk error occurs. */
1213 fill_run_buffer (struct merge_state *mrg, struct run *run)
1215 run->buffer_head = run->buffer_tail = run->buffer;
1216 while (run->unread_case_cnt > 0
1217 && run->buffer_tail < run->buffer + run->buffer_cap)
1219 if (!read_temp_file (mrg->xsrt, run->file_idx, run->file,
1220 (*run->buffer_tail)->data,
1221 dict_get_case_size (default_dict)))
1224 run->unread_case_cnt--;
1232 sort_sink_destroy (struct case_sink *sink UNUSED)
1237 static struct case_source *
1238 sort_sink_make_source (struct case_sink *sink)
1240 struct initial_run_state *irs = sink->aux;
1242 return create_case_source (&sort_source_class, irs->xsrt->scp);
1245 const struct case_sink_class sort_sink_class =
1251 sort_sink_make_source,
1254 /* Reads all the records from the source stream and passes them
1257 sort_source_read (struct case_source *source,
1258 write_case_func *write_case, write_case_data wc_data)
1260 struct sort_cases_pgm *scp = source->aux;
1262 read_sort_output (scp, write_case, wc_data);
1265 void read_internal_sort_output (struct internal_sort *isrt,
1266 write_case_func *write_case,
1267 write_case_data wc_data);
1268 void read_external_sort_output (struct external_sort *xsrt,
1269 write_case_func *write_case,
1270 write_case_data wc_data);
1272 /* Reads all the records from the output stream and passes them to the
1273 function provided, which must have an interface identical to
1276 read_sort_output (struct sort_cases_pgm *scp,
1277 write_case_func *write_case, write_case_data wc_data)
1279 assert ((scp->isrt != NULL) + (scp->xsrt != NULL) <= 1);
1280 if (scp->isrt != NULL)
1281 read_internal_sort_output (scp->isrt, write_case, wc_data);
1282 else if (scp->xsrt != NULL)
1283 read_external_sort_output (scp->xsrt, write_case, wc_data);
1286 /* No results. Probably an external sort that failed. */
1291 read_internal_sort_output (struct internal_sort *isrt,
1292 write_case_func *write_case,
1293 write_case_data wc_data)
1295 struct ccase *save_temp_case = temp_case;
1296 struct case_list **p;
1298 for (p = isrt->results; *p; p++)
1300 temp_case = &(*p)->c;
1301 write_case (wc_data);
1303 free (isrt->results);
1305 temp_case = save_temp_case;
1309 read_external_sort_output (struct external_sort *xsrt,
1310 write_case_func *write_case,
1311 write_case_data wc_data)
1317 assert (xsrt->run_cnt == 1);
1318 file_idx = xsrt->initial_runs[0].file_idx;
1320 file = open_temp_file (xsrt, file_idx, "rb");
1327 for (i = 0; i < xsrt->initial_runs[0].case_cnt; i++)
1329 if (!read_temp_file (xsrt, file_idx, file,
1330 temp_case, xsrt->case_size))
1336 if (!write_case (wc_data))
1342 sort_source_destroy (struct case_source *source)
1344 struct sort_cases_pgm *scp = source->aux;
1346 destroy_sort_cases_pgm (scp);
1349 const struct case_source_class sort_source_class =
1354 sort_source_destroy,