X-Git-Url: https://pintos-os.org/cgi-bin/gitweb.cgi?a=blobdiff_plain;f=src%2Fsort.c;h=8dad3566ccdcca1ad8e9fabf63ace81b42d7019f;hb=40271dcbfdecb01dfe808684741215eb2ddeb508;hp=6e518befabc34f2d6f9fdc6edcfe1065893a0709;hpb=1d985886f778e35f8d89c4e3c897b79fde8de6ed;p=pspp-builds.git diff --git a/src/sort.c b/src/sort.c index 6e518bef..8dad3566 100644 --- a/src/sort.c +++ b/src/sort.c @@ -14,68 +14,44 @@ You should have received a copy of the GNU General Public License along with this program; if not, write to the Free Software - Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA - 02111-1307, USA. */ + Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA + 02110-1301, USA. */ #include #include "sort.h" #include "error.h" +#include #include #include #include #include "algorithm.h" #include "alloc.h" -#include "bool.h" +#include #include "case.h" #include "casefile.h" #include "command.h" #include "error.h" #include "expressions/public.h" +#include "glob.h" #include "lexer.h" #include "misc.h" #include "settings.h" +#include "sort-prs.h" #include "str.h" #include "var.h" #include "vfm.h" #include "vfmP.h" -#if HAVE_UNISTD_H -#include -#endif - -#if HAVE_SYS_TYPES_H -#include -#endif - -#if HAVE_SYS_STAT_H -#include -#endif +#include "gettext.h" +#define _(msgid) gettext (msgid) #include "debug-print.h" -/* Sort direction. */ -enum sort_direction - { - SRT_ASCEND, /* A, B, C, ..., X, Y, Z. */ - SRT_DESCEND /* Z, Y, X, ..., C, B, A. */ - }; - -/* A sort criterion. */ -struct sort_criterion - { - int fv; /* Variable data index. */ - int width; /* 0=numeric, otherwise string widthe. */ - enum sort_direction dir; /* Sort direction. */ - }; +/* These should only be changed for testing purposes. */ +static int min_buffers = 64; +static int max_buffers = INT_MAX; +static bool allow_internal_sort = true; -/* A set of sort criteria. */ -struct sort_criteria - { - struct sort_criterion *crits; - size_t crit_cnt; - }; - -static int compare_case_dblptrs (const void *, const void *, void *); static int compare_record (const struct ccase *, const struct ccase *, const struct sort_criteria *); static struct casefile *do_internal_sort (struct casereader *, @@ -88,15 +64,38 @@ int cmd_sort_cases (void) { struct sort_criteria *criteria; - int success; + bool success = false; lex_match (T_BY); - criteria = sort_parse_criteria (default_dict, NULL, NULL, NULL); + criteria = sort_parse_criteria (default_dict, NULL, NULL, NULL, NULL); if (criteria == NULL) return CMD_FAILURE; + if (get_testing_mode () && lex_match ('/')) + { + if (!lex_force_match_id ("BUFFERS") || !lex_match ('=') + || !lex_force_int ()) + goto done; + + min_buffers = max_buffers = lex_integer (); + allow_internal_sort = false; + if (max_buffers < 2) + { + msg (SE, _("Buffer limit must be at least 2.")); + goto done; + } + + lex_get (); + } + success = sort_active_file_in_place (criteria); + + done: + min_buffers = 64; + max_buffers = INT_MAX; + allow_internal_sort = true; + sort_destroy_criteria (criteria); return success ? lex_end_of_command () : CMD_FAILURE; } @@ -134,7 +133,7 @@ sort_active_file_in_place (const struct sort_criteria *criteria) if (dst == NULL) return 0; - vfm_source = storage_source_create (dst, default_dict); + vfm_source = storage_source_create (dst); return 1; } @@ -152,100 +151,6 @@ sort_active_file_to_casefile (const struct sort_criteria *criteria) return sort_execute (casefile_get_reader (src), criteria); } -/* Parses a list of sort keys and returns a struct sort_criteria - based on it. Returns a null pointer on error. - If SAW_DIRECTION is nonnull, sets *SAW_DIRECTION to true if at - least one parenthesized sort direction was specified, false - otherwise. */ -struct sort_criteria * -sort_parse_criteria (const struct dictionary *dict, - struct variable ***vars, int *var_cnt, - bool *saw_direction) -{ - struct sort_criteria *criteria; - struct variable **local_vars = NULL; - size_t local_var_cnt; - - assert ((vars == NULL) == (var_cnt == NULL)); - if (vars == NULL) - { - vars = &local_vars; - var_cnt = &local_var_cnt; - } - - criteria = xmalloc (sizeof *criteria); - criteria->crits = NULL; - criteria->crit_cnt = 0; - - *vars = NULL; - *var_cnt = 0; - if (saw_direction != NULL) - *saw_direction = false; - - do - { - int prev_var_cnt = *var_cnt; - enum sort_direction direction; - - /* Variables. */ - if (!parse_variables (dict, vars, var_cnt, - PV_NO_DUPLICATE | PV_APPEND | PV_NO_SCRATCH)) - goto error; - - /* Sort direction. */ - if (lex_match ('(')) - { - if (lex_match_id ("D") || lex_match_id ("DOWN")) - direction = SRT_DESCEND; - else if (lex_match_id ("A") || lex_match_id ("UP")) - direction = SRT_ASCEND; - else - { - msg (SE, _("`A' or `D' expected inside parentheses.")); - goto error; - } - if (!lex_match (')')) - { - msg (SE, _("`)' expected.")); - goto error; - } - *saw_direction = true; - } - else - direction = SRT_ASCEND; - - criteria->crits = xrealloc (criteria->crits, - sizeof *criteria->crits * *var_cnt); - criteria->crit_cnt = *var_cnt; - for (; prev_var_cnt < criteria->crit_cnt; prev_var_cnt++) - { - struct sort_criterion *c = &criteria->crits[prev_var_cnt]; - c->fv = (*vars)[prev_var_cnt]->fv; - c->width = (*vars)[prev_var_cnt]->width; - c->dir = direction; - } - } - while (token != '.' && token != '/'); - - free (local_vars); - return criteria; - - error: - free (local_vars); - sort_destroy_criteria (criteria); - return NULL; -} - -/* Destroys a SORT CASES program. */ -void -sort_destroy_criteria (struct sort_criteria *criteria) -{ - if (criteria != NULL) - { - free (criteria->crits); - free (criteria); - } -} /* Reads all the cases from READER, which is destroyed. Sorts the cases according to CRITERIA. Returns the sorted cases in @@ -253,122 +158,113 @@ sort_destroy_criteria (struct sort_criteria *criteria) struct casefile * sort_execute (struct casereader *reader, const struct sort_criteria *criteria) { - struct casefile *output; - - output = do_internal_sort (reader, criteria); + struct casefile *output = do_internal_sort (reader, criteria); if (output == NULL) output = do_external_sort (reader, criteria); casereader_destroy (reader); return output; } +/* A case and its index. */ +struct indexed_case + { + struct ccase c; /* Case. */ + unsigned long idx; /* Index to allow for stable sorting. */ + }; + +static int compare_indexed_cases (const void *, const void *, void *); + /* If the data is in memory, do an internal sort and return a new - casefile for the data. */ + casefile for the data. Otherwise, return a null pointer. */ static struct casefile * do_internal_sort (struct casereader *reader, const struct sort_criteria *criteria) { const struct casefile *src; struct casefile *dst; - struct ccase *cases, **case_ptrs; unsigned long case_cnt; + if (!allow_internal_sort) + return NULL; + src = casereader_get_casefile (reader); if (casefile_get_case_cnt (src) > 1 && !casefile_in_core (src)) return NULL; case_cnt = casefile_get_case_cnt (src); - cases = malloc (sizeof *cases * case_cnt); - case_ptrs = malloc (sizeof *case_ptrs * case_cnt); - if ((cases != NULL && case_ptrs != NULL) || case_cnt == 0) + dst = casefile_create (casefile_get_value_cnt (src)); + if (case_cnt != 0) { - unsigned long case_idx; + struct indexed_case *cases = nmalloc (sizeof *cases, case_cnt); + if (cases != NULL) + { + unsigned long i; + + for (i = 0; i < case_cnt; i++) + { + casereader_read_xfer_assert (reader, &cases[i].c); + cases[i].idx = i; + } + + sort (cases, case_cnt, sizeof *cases, compare_indexed_cases, + (void *) criteria); - for (case_idx = 0; case_idx < case_cnt; case_idx++) + for (i = 0; i < case_cnt; i++) + casefile_append_xfer (dst, &cases[i].c); + + free (cases); + } + else { - int success = casereader_read_xfer (reader, &cases[case_idx]); - assert (success); - case_ptrs[case_idx] = &cases[case_idx]; + /* Failure. */ + casefile_destroy (dst); + dst = NULL; } - - sort (case_ptrs, case_cnt, sizeof *case_ptrs, compare_case_dblptrs, - (void *) criteria); - - dst = casefile_create (casefile_get_value_cnt (src)); - for (case_idx = 0; case_idx < case_cnt; case_idx++) - casefile_append_xfer (dst, case_ptrs[case_idx]); } - else - dst = NULL; - - free (case_ptrs); - free (cases); return dst; } /* Compares the variables specified by CRITERIA between the cases - at A and B, and returns a strcmp()-type result. */ + at A and B, with a "last resort" comparison for stability, and + returns a strcmp()-type result. */ static int -compare_case_dblptrs (const void *a_, const void *b_, void *criteria_) +compare_indexed_cases (const void *a_, const void *b_, void *criteria_) { struct sort_criteria *criteria = criteria_; - struct ccase *const *pa = a_; - struct ccase *const *pb = b_; - struct ccase *a = *pa; - struct ccase *b = *pb; - - return compare_record (a, b, criteria); + const struct indexed_case *a = a_; + const struct indexed_case *b = b_; + int result = compare_record (&a->c, &b->c, criteria); + if (result == 0) + result = a->idx < b->idx ? -1 : a->idx > b->idx; + return result; } /* External sort. */ -/* Maximum order of merge. If you increase this, then you should - use a heap for comparing cases during merge. */ -#define MAX_MERGE_ORDER 7 - -/* Minimum total number of records for buffers. */ -#define MIN_BUFFER_TOTAL_SIZE_RECS 64 - -/* Minimum single input buffer size, in bytes and records. */ -#define MIN_BUFFER_SIZE_BYTES 4096 -#define MIN_BUFFER_SIZE_RECS 16 - -#if MIN_BUFFER_SIZE_RECS * 2 + 16 > MIN_BUFFER_TOTAL_SIZE_RECS -#error MIN_BUFFER_SIZE_RECS and MIN_BUFFER_TOTAL_SIZE_RECS do not make sense. -#endif - -/* Sorts initial runs A and B in decending order by length. */ -static int -compare_initial_runs (const void *a_, const void *b_, void *aux UNUSED) -{ - struct casefile *const *a = a_; - struct casefile *const *b = b_; - unsigned long a_case_cnt = casefile_get_case_cnt (*a); - unsigned long b_case_cnt = casefile_get_case_cnt (*b); - - return a_case_cnt > b_case_cnt ? -1 : a_case_cnt < b_case_cnt; -} +/* Maximum order of merge (external sort only). The maximum + reasonable value is about 7. Above that, it would be a good + idea to use a heap in merge_once() to select the minimum. */ +#define MAX_MERGE_ORDER 7 /* Results of an external sort. */ struct external_sort { const struct sort_criteria *criteria; /* Sort criteria. */ size_t value_cnt; /* Size of data in `union value's. */ - struct casefile **initial_runs; /* Array of initial runs. */ + struct casefile **runs; /* Array of initial runs. */ size_t run_cnt, run_cap; /* Number of runs, allocated capacity. */ }; /* Prototypes for helper functions. */ -static int write_initial_runs (struct external_sort *, struct casereader *); -static int merge (struct external_sort *); +static int write_runs (struct external_sort *, struct casereader *); +static struct casefile *merge (struct external_sort *); static void destroy_external_sort (struct external_sort *); -/* Performs an external sort of the active file according to the - specification in SCP. Forms initial runs using a heap as a - reservoir. Determines the optimum merge pattern via Huffman's - method (see Knuth vol. 3, 2nd edition, p. 365-366), and merges - according to that pattern. */ +/* Performs a stable external sort of the active file according + to the specification in SCP. Forms initial runs using a heap + as a reservoir. Merges the initial runs according to a + pattern that assures stability. */ static struct casefile * do_external_sort (struct casereader *reader, const struct sort_criteria *criteria) @@ -382,11 +278,10 @@ do_external_sort (struct casereader *reader, xsrt->value_cnt = casefile_get_value_cnt (casereader_get_casefile (reader)); xsrt->run_cap = 512; xsrt->run_cnt = 0; - xsrt->initial_runs = xmalloc (sizeof *xsrt->initial_runs * xsrt->run_cap); - if (write_initial_runs (xsrt, reader) && merge (xsrt)) + xsrt->runs = xnmalloc (xsrt->run_cap, sizeof *xsrt->runs); + if (write_runs (xsrt, reader)) { - struct casefile *output = xsrt->initial_runs[0]; - xsrt->initial_runs[0] = NULL; + struct casefile *output = merge (xsrt); destroy_external_sort (xsrt); return output; } @@ -406,8 +301,8 @@ destroy_external_sort (struct external_sort *xsrt) int i; for (i = 0; i < xsrt->run_cnt; i++) - casefile_destroy (xsrt->initial_runs[i]); - free (xsrt->initial_runs); + casefile_destroy (xsrt->runs[i]); + free (xsrt->runs); free (xsrt); } } @@ -419,6 +314,7 @@ struct record_run { int run; /* Run number of case. */ struct ccase record; /* Case data. */ + size_t idx; /* Case number (for stability). */ }; /* Represents a set of initial runs during an external sort. */ @@ -443,7 +339,8 @@ struct initial_run_state static const struct case_sink_class sort_sink_class; static void destroy_initial_run_state (struct initial_run_state *); -static void process_case (struct initial_run_state *, const struct ccase *); +static void process_case (struct initial_run_state *, const struct ccase *, + size_t); static int allocate_cases (struct initial_run_state *); static void output_record (struct initial_run_state *); static void start_run (struct initial_run_state *); @@ -455,10 +352,11 @@ static int compare_record_run_minheap (const void *, const void *, void *); /* Reads cases from READER and composes initial runs in XSRT. */ static int -write_initial_runs (struct external_sort *xsrt, struct casereader *reader) +write_runs (struct external_sort *xsrt, struct casereader *reader) { struct initial_run_state *irs; struct ccase c; + size_t idx = 0; int success = 0; /* Allocate memory for cases. */ @@ -477,7 +375,7 @@ write_initial_runs (struct external_sort *xsrt, struct casereader *reader) /* Create initial runs. */ start_run (irs); for (; irs->okay && casereader_read (reader, &c); case_destroy (&c)) - process_case (irs, &c); + process_case (irs, &c, idx++); while (irs->okay && irs->record_cnt > 0) output_record (irs); end_run (irs); @@ -492,18 +390,19 @@ write_initial_runs (struct external_sort *xsrt, struct casereader *reader) /* Add a single case to an initial run. */ static void -process_case (struct initial_run_state *irs, const struct ccase *c) +process_case (struct initial_run_state *irs, const struct ccase *c, size_t idx) { - struct record_run *new_record_run; + struct record_run *rr; /* Compose record_run for this run and add to heap. */ assert (irs->record_cnt < irs->record_cap - 1); - new_record_run = irs->records + irs->record_cnt++; - case_copy (&new_record_run->record, 0, c, 0, irs->xsrt->value_cnt); - new_record_run->run = irs->run; + rr = irs->records + irs->record_cnt++; + case_copy (&rr->record, 0, c, 0, irs->xsrt->value_cnt); + rr->run = irs->run; + rr->idx = idx; if (!case_is_null (&irs->last_output) && compare_record (c, &irs->last_output, irs->xsrt->criteria) < 0) - new_record_run->run = irs->run + 1; + rr->run = irs->run + 1; push_heap (irs->records, irs->record_cnt, sizeof *irs->records, compare_record_run_minheap, irs); @@ -544,23 +443,26 @@ allocate_cases (struct initial_run_state *irs) approx_case_cost = (sizeof *irs->records + irs->xsrt->value_cnt * sizeof (union value) + 4 * sizeof (void *)); - max_cases = get_max_workspace() / approx_case_cost; - irs->records = malloc (sizeof *irs->records * max_cases); - for (i = 0; i < max_cases; i++) - if (!case_try_create (&irs->records[i].record, irs->xsrt->value_cnt)) - { - max_cases = i; - break; - } + max_cases = get_workspace() / approx_case_cost; + if (max_cases > max_buffers) + max_cases = max_buffers; + irs->records = nmalloc (sizeof *irs->records, max_cases); + if (irs->records != NULL) + for (i = 0; i < max_cases; i++) + if (!case_try_create (&irs->records[i].record, irs->xsrt->value_cnt)) + { + max_cases = i; + break; + } irs->record_cap = max_cases; /* Fail if we didn't allocate an acceptable number of cases. */ - if (irs->records == NULL || max_cases < MIN_BUFFER_TOTAL_SIZE_RECS) + if (irs->records == NULL || max_cases < min_buffers) { msg (SE, _("Out of memory. Could not allocate room for minimum of %d " "cases of %d bytes each. (PSPP workspace is currently " "restricted to a maximum of %d KB.)"), - MIN_BUFFER_TOTAL_SIZE_RECS, approx_case_cost, get_max_workspace() / 1024); + min_buffers, approx_case_cost, get_workspace() / 1024); return 0; } return 1; @@ -600,16 +502,18 @@ compare_record (const struct ccase *a, const struct ccase *b, } /* Compares record-run tuples A and B on run number first, then - on the current record according to SCP. */ + on record, then on case index. */ static int compare_record_run (const struct record_run *a, const struct record_run *b, struct initial_run_state *irs) { - if (a->run != b->run) - return a->run > b->run ? 1 : -1; - else - return compare_record (&a->record, &b->record, irs->xsrt->criteria); + int result = a->run < b->run ? -1 : a->run > b->run; + if (result == 0) + result = compare_record (&a->record, &b->record, irs->xsrt->criteria); + if (result == 0) + result = a->idx < b->idx ? -1 : a->idx > b->idx; + return result; } /* Compares record-run tuples A and B on run number first, then @@ -641,14 +545,14 @@ end_run (struct initial_run_state *irs) /* Record initial run. */ if (irs->casefile != NULL) { + casefile_sleep (irs->casefile); if (xsrt->run_cnt >= xsrt->run_cap) { xsrt->run_cap *= 2; - xsrt->initial_runs - = xrealloc (xsrt->initial_runs, - sizeof *xsrt->initial_runs * xsrt->run_cap); + xsrt->runs = xnrealloc (xsrt->runs, + xsrt->run_cap, sizeof *xsrt->runs); } - xsrt->initial_runs[xsrt->run_cnt++] = irs->casefile; + xsrt->runs[xsrt->run_cnt++] = irs->casefile; irs->casefile = NULL; } } @@ -693,193 +597,128 @@ output_record (struct initial_run_state *irs) /* Merging. */ -/* State of merging initial runs. */ -struct merge_state - { - struct external_sort *xsrt; /* External sort state. */ - struct ccase *cases; /* Buffers. */ - size_t case_cnt; /* Number of buffers. */ - }; - -struct run; -static struct casefile *merge_once (struct merge_state *, +static int choose_merge (struct casefile *runs[], int run_cnt, int order); +static struct casefile *merge_once (struct external_sort *, struct casefile *[], size_t); -static int mod (int, int); -/* Performs a series of P-way merges of initial runs. */ -static int +/* Repeatedly merges run until only one is left, + and returns the final casefile. */ +static struct casefile * merge (struct external_sort *xsrt) { - struct merge_state mrg; /* State of merge. */ - size_t approx_case_cost; /* Approximate memory cost of one case. */ - int max_order; /* Maximum order of merge. */ - size_t dummy_run_cnt; /* Number of dummy runs to insert. */ - int success = 0; - int i; - - mrg.xsrt = xsrt; - - /* Allocate as many cases as possible into cases. */ - approx_case_cost = (sizeof *mrg.cases - + xsrt->value_cnt * sizeof (union value) - + 4 * sizeof (void *)); - mrg.case_cnt = get_max_workspace() / approx_case_cost; - mrg.cases = malloc (sizeof *mrg.cases * mrg.case_cnt); - if (mrg.cases == NULL) - goto done; - for (i = 0; i < mrg.case_cnt; i++) - if (!case_try_create (&mrg.cases[i], xsrt->value_cnt)) - { - mrg.case_cnt = i; - break; - } - if (mrg.case_cnt < MIN_BUFFER_TOTAL_SIZE_RECS) - { - msg (SE, _("Out of memory. Could not allocate room for minimum of %d " - "cases of %d bytes each. (PSPP workspace is currently " - "restricted to a maximum of %d KB.)"), - MIN_BUFFER_TOTAL_SIZE_RECS, approx_case_cost, get_max_workspace() / 1024); - return 0; - } - - /* Determine maximum order of merge. */ - max_order = MAX_MERGE_ORDER; - if (mrg.case_cnt / max_order < MIN_BUFFER_SIZE_RECS) - max_order = mrg.case_cnt / MIN_BUFFER_SIZE_RECS; - else if (mrg.case_cnt / max_order * xsrt->value_cnt * sizeof (union value) - < MIN_BUFFER_SIZE_BYTES) - max_order = mrg.case_cnt / (MIN_BUFFER_SIZE_BYTES - / (xsrt->value_cnt * sizeof (union value))); - if (max_order < 2) - max_order = 2; - if (max_order > xsrt->run_cnt) - max_order = xsrt->run_cnt; - - /* Repeatedly merge the P shortest existing runs until only one run - is left. */ - make_heap (xsrt->initial_runs, xsrt->run_cnt, sizeof *xsrt->initial_runs, - compare_initial_runs, NULL); - dummy_run_cnt = mod (1 - (int) xsrt->run_cnt, max_order - 1); - - assert (max_order > 0); - assert (max_order <= 2 - || (xsrt->run_cnt + dummy_run_cnt) % (max_order - 1) == 1); while (xsrt->run_cnt > 1) { - struct casefile *output_run; - int order; - int i; - - /* Choose order of merge (max_order after first merge). */ - order = max_order - dummy_run_cnt; - dummy_run_cnt = 0; - - /* Choose runs to merge. */ - assert (xsrt->run_cnt >= order); - for (i = 0; i < order; i++) - pop_heap (xsrt->initial_runs, xsrt->run_cnt--, - sizeof *xsrt->initial_runs, - compare_initial_runs, NULL); - - /* Merge runs. */ - output_run = merge_once (&mrg, - xsrt->initial_runs + xsrt->run_cnt, order); - if (output_run == NULL) - goto done; - - /* Add output run to heap. */ - xsrt->initial_runs[xsrt->run_cnt++] = output_run; - push_heap (xsrt->initial_runs, xsrt->run_cnt, sizeof *xsrt->initial_runs, - compare_initial_runs, NULL); + int order = min (MAX_MERGE_ORDER, xsrt->run_cnt); + int idx = choose_merge (xsrt->runs, xsrt->run_cnt, order); + xsrt->runs[idx] = merge_once (xsrt, xsrt->runs + idx, order); + remove_range (xsrt->runs, xsrt->run_cnt, sizeof *xsrt->runs, + idx + 1, order - 1); + xsrt->run_cnt -= order - 1; } - - /* Exactly one run is left, which contains the entire sorted - file. We could use it to find a total case count. */ assert (xsrt->run_cnt == 1); - - success = 1; - - done: - for (i = 0; i < mrg.case_cnt; i++) - case_destroy (&mrg.cases[i]); - free (mrg.cases); - - return success; + xsrt->run_cnt = 0; + return xsrt->runs[0]; } -/* Modulo function as defined by Knuth. */ +/* Chooses ORDER runs out of the RUN_CNT runs in RUNS to merge, + and returns the index of the first one. + + For stability, we must merge only consecutive runs. For + efficiency, we choose the shortest consecutive sequence of + runs. */ static int -mod (int x, int y) +choose_merge (struct casefile *runs[], int run_cnt, int order) { - if (y == 0) - return x; - else if (x == 0) - return 0; - else if (x > 0 && y > 0) - return x % y; - else if (x < 0 && y > 0) - return y - (-x) % y; + int min_idx, min_sum; + int cur_idx, cur_sum; + int i; - abort (); + /* Sum up the length of the first ORDER runs. */ + cur_sum = 0; + for (i = 0; i < order; i++) + cur_sum += casefile_get_case_cnt (runs[i]); + + /* Find the shortest group of ORDER runs, + using a running total for efficiency. */ + min_idx = 0; + min_sum = cur_sum; + for (cur_idx = 1; cur_idx + order <= run_cnt; cur_idx++) + { + cur_sum -= casefile_get_case_cnt (runs[cur_idx - 1]); + cur_sum += casefile_get_case_cnt (runs[cur_idx + order - 1]); + if (cur_sum < min_sum) + { + min_sum = cur_sum; + min_idx = cur_idx; + } + } + + return min_idx; } -/* Merges the RUN_CNT initial runs specified in INPUT_RUNS into a - new run. Returns nonzero only if successful. Adds an entry - to MRG->xsrt->runs for the output file if and only if the - output file is actually created. Always deletes all the input - files. */ +/* Merges the RUN_CNT initial runs specified in INPUT_FILES into a + new run, and returns the new run. */ static struct casefile * -merge_once (struct merge_state *mrg, - struct casefile *input_runs[], +merge_once (struct external_sort *xsrt, + struct casefile **const input_files, size_t run_cnt) { - struct casereader *input_readers[MAX_MERGE_ORDER]; - struct ccase input_cases[MAX_MERGE_ORDER]; - struct casefile *output_casefile = NULL; + struct run + { + struct casefile *file; + struct casereader *reader; + struct ccase ccase; + } + *runs; + + struct casefile *output = NULL; int i; + /* Open input files. */ + runs = xnmalloc (run_cnt, sizeof *runs); for (i = 0; i < run_cnt; i++) { - input_readers[i] = casefile_get_destructive_reader (input_runs[i]); - if (!casereader_read_xfer (input_readers[i], &input_cases[i])) + struct run *r = &runs[i]; + r->file = input_files[i]; + r->reader = casefile_get_destructive_reader (r->file); + if (!casereader_read_xfer (r->reader, &r->ccase)) { run_cnt--; i--; } } - - output_casefile = casefile_create (mrg->xsrt->value_cnt); - casefile_to_disk (output_casefile); + + /* Create output file. */ + output = casefile_create (xsrt->value_cnt); + casefile_to_disk (output); /* Merge. */ while (run_cnt > 0) { - size_t min_idx; - + struct run *min_run, *run; + /* Find minimum. */ - min_idx = 0; - for (i = 1; i < run_cnt; i++) - if (compare_record (&input_cases[i], &input_cases[min_idx], - mrg->xsrt->criteria) < 0) - min_idx = i; + min_run = runs; + for (run = runs + 1; run < runs + run_cnt; run++) + if (compare_record (&run->ccase, &min_run->ccase, xsrt->criteria) < 0) + min_run = run; /* Write minimum to output file. */ - casefile_append_xfer (output_casefile, &input_cases[min_idx]); + casefile_append_xfer (output, &min_run->ccase); - if (!casereader_read_xfer (input_readers[min_idx], - &input_cases[min_idx])) + /* Read another case from minimum run. */ + if (!casereader_read_xfer (min_run->reader, &min_run->ccase)) { - casereader_destroy (input_readers[min_idx]); - casefile_destroy (input_runs[min_idx]); + casereader_destroy (min_run->reader); + casefile_destroy (min_run->file); + remove_element (runs, run_cnt, sizeof *runs, min_run - runs); run_cnt--; - input_runs[min_idx] = input_runs[run_cnt]; - input_readers[min_idx] = input_readers[run_cnt]; - input_cases[min_idx] = input_cases[run_cnt]; } } - casefile_sleep (output_casefile); + casefile_sleep (output); + free (runs); - return output_casefile; + return output; }