X-Git-Url: https://pintos-os.org/cgi-bin/gitweb.cgi?a=blobdiff_plain;f=src%2Fmath%2Fsort.c;h=8719d877e096fcc9fc0d0f911d462d7bd0566725;hb=5c3291dc396b795696e94f47780308fd7ace6fc4;hp=827c2314f2e6e54ceab3a3cb597fbc8287993e31;hpb=19d0debdc5b72e1bb6c79956403a4d3bc054f300;p=pspp-builds.git diff --git a/src/math/sort.c b/src/math/sort.c index 827c2314..8719d877 100644 --- a/src/math/sort.c +++ b/src/math/sort.c @@ -1,50 +1,36 @@ -/* PSPP - computes sample statistics. - Copyright (C) 1997-9, 2000 Free Software Foundation, Inc. - Written by Ben Pfaff . +/* PSPP - a program for statistical analysis. + Copyright (C) 1997-9, 2000, 2006, 2009 Free Software Foundation, Inc. - This program is free software; you can redistribute it and/or - modify it under the terms of the GNU General Public License as - published by the Free Software Foundation; either version 2 of the - License, or (at your option) any later version. + This program is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. - This program is distributed in the hope that it will be useful, but - WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - General Public License for more details. + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. You should have received a copy of the GNU General Public License - along with this program; if not, write to the Free Software - Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA - 02110-1301, USA. */ + along with this program. If not, see . */ #include #include "sort.h" -#include -#include -#include #include -#include -#include #include -#include -#include -#include +#include +#include +#include #include -#include -#include -#include -#include +#include #include #include -#include -#include -#include -#include +#include -#include "minmax.h" +#include "xalloc.h" #include "gettext.h" #define _(msgid) gettext (msgid) @@ -52,668 +38,273 @@ /* These should only be changed for testing purposes. */ int min_buffers = 64; int max_buffers = INT_MAX; -bool allow_internal_sort = true; -static int compare_record (const struct ccase *, const struct ccase *, - const struct sort_criteria *); -static struct casefile *do_internal_sort (struct casereader *, - const struct sort_criteria *); -static struct casefile *do_external_sort (struct casereader *, - const struct sort_criteria *); +struct sort_writer + { + struct caseproto *proto; + struct subcase ordering; + struct merge *merge; + struct pqueue *pqueue; + + struct casewriter *run; + casenumber run_id; + struct ccase *run_end; + }; +static struct casewriter_class sort_casewriter_class; -/* Sorts the active file in-place according to CRITERIA. - Returns true if successful. */ -bool -sort_active_file_in_place (struct dataset *ds, - const struct sort_criteria *criteria) -{ - struct casefile *in, *out; - - proc_cancel_temporary_transformations (ds); - if (!procedure (ds, NULL, NULL)) - return false; - - in = proc_capture_output (ds); - out = sort_execute (casefile_get_destructive_reader (in), criteria); - if (out == NULL) - return false; - - proc_set_source (ds, storage_source_create (out)); - return true; -} +static struct pqueue *pqueue_create (const struct subcase *, + const struct caseproto *); +static void pqueue_destroy (struct pqueue *); +static bool pqueue_is_full (const struct pqueue *); +static bool pqueue_is_empty (const struct pqueue *); +static void pqueue_push (struct pqueue *, struct ccase *, casenumber); +static struct ccase *pqueue_pop (struct pqueue *, casenumber *); -/* Data passed to sort_to_casefile_callback(). */ -struct sort_to_casefile_cb_data - { - const struct sort_criteria *criteria; - struct casefile *output; - }; +static void output_record (struct sort_writer *); -/* Sorts casefile CF according to the criteria in CB_DATA. */ -static bool -sort_to_casefile_callback (const struct casefile *cf, void *cb_data_) +struct casewriter * +sort_create_writer (const struct subcase *ordering, + const struct caseproto *proto) { - struct sort_to_casefile_cb_data *cb_data = cb_data_; - cb_data->output = sort_execute (casefile_get_reader (cf, NULL), cb_data->criteria); - return cb_data->output != NULL; + struct sort_writer *sort; + + sort = xmalloc (sizeof *sort); + sort->proto = caseproto_ref (proto); + subcase_clone (&sort->ordering, ordering); + sort->merge = merge_create (ordering, proto); + sort->pqueue = pqueue_create (ordering, proto); + sort->run = NULL; + sort->run_id = 0; + sort->run_end = NULL; + + return casewriter_create (proto, &sort_casewriter_class, sort); } -/* Sorts the active file to a separate casefile. If successful, - returns the sorted casefile. Returns a null pointer on - failure. */ -struct casefile * -sort_active_file_to_casefile (struct dataset *ds, - const struct sort_criteria *criteria) +static void +sort_casewriter_write (struct casewriter *writer UNUSED, void *sort_, + struct ccase *c) { - struct sort_to_casefile_cb_data cb_data; - - proc_cancel_temporary_transformations (ds); + struct sort_writer *sort = sort_; + bool next_run; - cb_data.criteria = criteria; - cb_data.output = NULL; - if (!multipass_procedure (ds, sort_to_casefile_callback, &cb_data)) - { - casefile_destroy (cb_data.output); - return NULL; - } - return cb_data.output; -} + if (pqueue_is_full (sort->pqueue)) + output_record (sort); + next_run = (sort->run_end == NULL + || subcase_compare_3way (&sort->ordering, c, + &sort->ordering, sort->run_end) < 0); + pqueue_push (sort->pqueue, c, sort->run_id + (next_run ? 1 : 0)); +} -/* Reads all the cases from READER, which is destroyed. Sorts - the cases according to CRITERIA. Returns the sorted cases in - a newly created casefile. */ -struct casefile * -sort_execute (struct casereader *reader, const struct sort_criteria *criteria) +static void +sort_casewriter_destroy (struct casewriter *writer UNUSED, void *sort_) { - struct casefile *output = do_internal_sort (reader, criteria); - if (output == NULL) - output = do_external_sort (reader, criteria); - casereader_destroy (reader); - return output; + struct sort_writer *sort = sort_; + + subcase_destroy (&sort->ordering); + merge_destroy (sort->merge); + pqueue_destroy (sort->pqueue); + casewriter_destroy (sort->run); + case_unref (sort->run_end); + caseproto_unref (sort->proto); + free (sort); } - -/* A case and its index. */ -struct indexed_case - { - struct ccase c; /* Case. */ - unsigned long idx; /* Index to allow for stable sorting. */ - }; - -static int compare_indexed_cases (const void *, const void *, const void *); -/* If the data is in memory, do an internal sort and return a new - casefile for the data. Otherwise, return a null pointer. */ -static struct casefile * -do_internal_sort (struct casereader *reader, - const struct sort_criteria *criteria) +static struct casereader * +sort_casewriter_convert_to_reader (struct casewriter *writer, void *sort_) { - const struct casefile *src; - struct casefile *dst; - unsigned long case_cnt; - - if (!allow_internal_sort) - return NULL; - - src = casereader_get_casefile (reader); - if (casefile_get_case_cnt (src) > 1 && !casefile_in_core (src)) - return NULL; - - case_cnt = casefile_get_case_cnt (src); - dst = fastfile_create (casefile_get_value_cnt (src)); - if (case_cnt != 0) + struct sort_writer *sort = sort_; + struct casereader *output; + + if (sort->run == NULL && sort->run_id == 0) { - struct indexed_case *cases = nmalloc (sizeof *cases, case_cnt); - if (cases != NULL) - { - unsigned long i; - - for (i = 0; i < case_cnt; i++) - { - bool ok = casereader_read_xfer (reader, &cases[i].c); - if (!ok) - NOT_REACHED (); - cases[i].idx = i; - } - - sort (cases, case_cnt, sizeof *cases, compare_indexed_cases, - (void *) criteria); - - for (i = 0; i < case_cnt; i++) - casefile_append_xfer (dst, &cases[i].c); - if (casefile_error (dst)) - NOT_REACHED (); - - free (cases); - } - else - { - /* Failure. */ - casefile_destroy (dst); - dst = NULL; - } + /* In-core sort. */ + sort->run = mem_writer_create (sort->proto); + sort->run_id = 1; } + while (!pqueue_is_empty (sort->pqueue)) + output_record (sort); - return dst; -} + merge_append (sort->merge, casewriter_make_reader (sort->run)); + sort->run = NULL; -/* Compares the variables specified by CRITERIA between the cases - at A and B, with a "last resort" comparison for stability, and - returns a strcmp()-type result. */ -static int -compare_indexed_cases (const void *a_, const void *b_, const void *criteria_) -{ - const struct sort_criteria *criteria = criteria_; - const struct indexed_case *a = a_; - const struct indexed_case *b = b_; - int result = compare_record (&a->c, &b->c, criteria); - if (result == 0) - result = a->idx < b->idx ? -1 : a->idx > b->idx; - return result; + output = merge_make_reader (sort->merge); + sort_casewriter_destroy (writer, sort); + return output; } - -/* External sort. */ -/* Maximum order of merge (external sort only). The maximum - reasonable value is about 7. Above that, it would be a good - idea to use a heap in merge_once() to select the minimum. */ -#define MAX_MERGE_ORDER 7 +static void +output_record (struct sort_writer *sort) +{ + struct ccase *min_case; + casenumber min_run_id; -/* Results of an external sort. */ -struct external_sort - { - const struct sort_criteria *criteria; /* Sort criteria. */ - size_t value_cnt; /* Size of data in `union value's. */ - struct casefile **runs; /* Array of initial runs. */ - size_t run_cnt, run_cap; /* Number of runs, allocated capacity. */ - }; + min_case = pqueue_pop (sort->pqueue, &min_run_id); +#if 0 + printf ("\toutput: %f to run %d\n", case_num_idx (min_case, 0), min_run_id); +#endif -/* Prototypes for helper functions. */ -static int write_runs (struct external_sort *, struct casereader *); -static struct casefile *merge (struct external_sort *); -static void destroy_external_sort (struct external_sort *); - -/* Performs a stable external sort of the active file according - to the specification in SCP. Forms initial runs using a heap - as a reservoir. Merges the initial runs according to a - pattern that assures stability. */ -static struct casefile * -do_external_sort (struct casereader *reader, - const struct sort_criteria *criteria) -{ - struct external_sort *xsrt; - - if (!casefile_to_disk (casereader_get_casefile (reader))) - return NULL; - - xsrt = xmalloc (sizeof *xsrt); - xsrt->criteria = criteria; - xsrt->value_cnt = casefile_get_value_cnt (casereader_get_casefile (reader)); - xsrt->run_cap = 512; - xsrt->run_cnt = 0; - xsrt->runs = xnmalloc (xsrt->run_cap, sizeof *xsrt->runs); - if (write_runs (xsrt, reader)) + if (sort->run_id != min_run_id && sort->run != NULL) { - struct casefile *output = merge (xsrt); - destroy_external_sort (xsrt); - return output; + merge_append (sort->merge, casewriter_make_reader (sort->run)); + sort->run = NULL; } - else + if (sort->run == NULL) { - destroy_external_sort (xsrt); - return NULL; + sort->run = tmpfile_writer_create (sort->proto); + sort->run_id = min_run_id; } -} -/* Destroys XSRT. */ -static void -destroy_external_sort (struct external_sort *xsrt) -{ - if (xsrt != NULL) - { - int i; - - for (i = 0; i < xsrt->run_cnt; i++) - casefile_destroy (xsrt->runs[i]); - free (xsrt->runs); - free (xsrt); - } + case_unref (sort->run_end); + sort->run_end = case_ref (min_case); + casewriter_write (sort->run, min_case); } - -/* Replacement selection. */ - -/* Pairs a record with a run number. */ -struct record_run - { - int run; /* Run number of case. */ - struct ccase record; /* Case data. */ - size_t idx; /* Case number (for stability). */ - }; -/* Represents a set of initial runs during an external sort. */ -struct initial_run_state +static struct casewriter_class sort_casewriter_class = { - struct external_sort *xsrt; - - /* Reservoir. */ - struct record_run *records; /* Records arranged as a heap. */ - size_t record_cnt; /* Current number of records. */ - size_t record_cap; /* Capacity for records. */ - - /* Run currently being output. */ - int run; /* Run number. */ - size_t case_cnt; /* Number of cases so far. */ - struct casefile *casefile; /* Output file. */ - struct ccase last_output; /* Record last output. */ - - int okay; /* Zero if an error has been encountered. */ + sort_casewriter_write, + sort_casewriter_destroy, + sort_casewriter_convert_to_reader, }; - -static bool destroy_initial_run_state (struct initial_run_state *); -static void process_case (struct initial_run_state *, - const struct ccase *, size_t); -static int allocate_cases (struct initial_run_state *); -static void output_record (struct initial_run_state *); -static void start_run (struct initial_run_state *); -static void end_run (struct initial_run_state *); -static int compare_record_run (const struct record_run *, - const struct record_run *, - const struct initial_run_state *); -static int compare_record_run_minheap (const void *, const void *, - const void *); - -/* Reads cases from READER and composes initial runs in XSRT. */ -static int -write_runs (struct external_sort *xsrt, struct casereader *reader) -{ - struct initial_run_state *irs; - struct ccase c; - size_t idx = 0; - int success = 0; - - /* Allocate memory for cases. */ - irs = xmalloc (sizeof *irs); - irs->xsrt = xsrt; - irs->records = NULL; - irs->record_cnt = irs->record_cap = 0; - irs->run = 0; - irs->case_cnt = 0; - irs->casefile = NULL; - case_nullify (&irs->last_output); - irs->okay = 1; - if (!allocate_cases (irs)) - goto done; - - /* Create initial runs. */ - start_run (irs); - for (; irs->okay && casereader_read (reader, &c); case_destroy (&c)) - process_case (irs, &c, idx++); - while (irs->okay && irs->record_cnt > 0) - output_record (irs); - end_run (irs); - - success = irs->okay; - - done: - if (!destroy_initial_run_state (irs)) - success = false; - - return success; -} - -/* Add a single case to an initial run. */ -static void -process_case (struct initial_run_state *irs, const struct ccase *c, - size_t idx) + +/* Reads all the cases from INPUT. Sorts the cases according to + ORDERING. Returns the sorted cases in a new casereader. */ +struct casereader * +sort_execute (struct casereader *input, const struct subcase *ordering) { - struct record_run *rr; - - /* Compose record_run for this run and add to heap. */ - assert (irs->record_cnt < irs->record_cap - 1); - rr = irs->records + irs->record_cnt++; - case_copy (&rr->record, 0, c, 0, irs->xsrt->value_cnt); - rr->run = irs->run; - rr->idx = idx; - if (!case_is_null (&irs->last_output) - && compare_record (c, &irs->last_output, irs->xsrt->criteria) < 0) - rr->run = irs->run + 1; - push_heap (irs->records, irs->record_cnt, sizeof *irs->records, - compare_record_run_minheap, irs); - - /* Output a record if the reservoir is full. */ - if (irs->record_cnt == irs->record_cap - 1 && irs->okay) - output_record (irs); + struct casewriter *output = + sort_create_writer (ordering, casereader_get_proto (input)); + casereader_transfer (input, output); + return casewriter_make_reader (output); } -/* Destroys the initial run state represented by IRS. - Returns true if successful, false if an I/O error occurred. */ -static bool -destroy_initial_run_state (struct initial_run_state *irs) +/* Reads all the cases from INPUT. Sorts the cases in ascending + order according to VARIABLE. Returns the sorted cases in a + new casereader. */ +struct casereader * +sort_execute_1var (struct casereader *input, const struct variable *var) { - int i; - bool ok = true; - - if (irs == NULL) - return true; + struct subcase sc; + struct casereader *reader; - for (i = 0; i < irs->record_cap; i++) - case_destroy (&irs->records[i].record); - free (irs->records); + subcase_init_var (&sc, var, SC_ASCEND); + reader = sort_execute (input, &sc); + subcase_destroy (&sc); + return reader; +} + +struct pqueue + { + struct subcase ordering; + struct pqueue_record *records; + size_t record_cnt; + size_t record_cap; + casenumber idx; + }; - if (irs->casefile != NULL) - ok = casefile_sleep (irs->casefile); +struct pqueue_record + { + casenumber id; + struct ccase *c; + casenumber idx; + }; - free (irs); - return ok; -} +static int compare_pqueue_records_minheap (const void *a, const void *b, + const void *pq_); -/* Allocates room for lots of cases as a buffer. */ -static int -allocate_cases (struct initial_run_state *irs) +static struct pqueue * +pqueue_create (const struct subcase *ordering, const struct caseproto *proto) { - int approx_case_cost; /* Approximate memory cost of one case in bytes. */ - int max_cases; /* Maximum number of cases to allocate. */ - int i; - - /* Allocate as many cases as we can within the workspace - limit. */ - approx_case_cost = (sizeof *irs->records - + irs->xsrt->value_cnt * sizeof (union value) - + 4 * sizeof (void *)); - max_cases = get_workspace() / approx_case_cost; - if (max_cases > max_buffers) - max_cases = max_buffers; - irs->records = nmalloc (sizeof *irs->records, max_cases); - if (irs->records != NULL) - for (i = 0; i < max_cases; i++) - if (!case_try_create (&irs->records[i].record, irs->xsrt->value_cnt)) - { - max_cases = i; - break; - } - irs->record_cap = max_cases; - - /* Fail if we didn't allocate an acceptable number of cases. */ - if (irs->records == NULL || max_cases < min_buffers) - { - msg (SE, _("Out of memory. Could not allocate room for minimum of %d " - "cases of %d bytes each. (PSPP workspace is currently " - "restricted to a maximum of %d KB.)"), - min_buffers, approx_case_cost, get_workspace() / 1024); - return 0; - } - return 1; + struct pqueue *pq; + + pq = xmalloc (sizeof *pq); + subcase_clone (&pq->ordering, ordering); + pq->record_cap = settings_get_workspace_cases (proto); + if (pq->record_cap > max_buffers) + pq->record_cap = max_buffers; + else if (pq->record_cap < min_buffers) + pq->record_cap = min_buffers; + pq->record_cnt = 0; + pq->records = xnmalloc (pq->record_cap, sizeof *pq->records); + pq->idx = 0; + + return pq; } -/* Compares the VAR_CNT variables in VARS[] between the `value's at - A and B, and returns a strcmp()-type result. */ -static int -compare_record (const struct ccase *a, const struct ccase *b, - const struct sort_criteria *criteria) +static void +pqueue_destroy (struct pqueue *pq) { - int i; - - assert (a != NULL); - assert (b != NULL); - - for (i = 0; i < criteria->crit_cnt; i++) + if (pq != NULL) { - const struct sort_criterion *c = &criteria->crits[i]; - int result; - - if (c->width == 0) + while (!pqueue_is_empty (pq)) { - double af = case_num (a, c->fv); - double bf = case_num (b, c->fv); - - result = af < bf ? -1 : af > bf; + casenumber id; + struct ccase *c = pqueue_pop (pq, &id); + case_unref (c); } - else - result = memcmp (case_str (a, c->fv), case_str (b, c->fv), c->width); - - if (result != 0) - return c->dir == SRT_ASCEND ? result : -result; + subcase_destroy (&pq->ordering); + free (pq->records); + free (pq); } - - return 0; -} - -/* Compares record-run tuples A and B on run number first, then - on record, then on case index. */ -static int -compare_record_run (const struct record_run *a, - const struct record_run *b, - const struct initial_run_state *irs) -{ - int result = a->run < b->run ? -1 : a->run > b->run; - if (result == 0) - result = compare_record (&a->record, &b->record, irs->xsrt->criteria); - if (result == 0) - result = a->idx < b->idx ? -1 : a->idx > b->idx; - return result; } -/* Compares record-run tuples A and B on run number first, then - on the current record according to SCP, but in descending - order. */ -static int -compare_record_run_minheap (const void *a, const void *b, const void *irs) +static bool +pqueue_is_full (const struct pqueue *pq) { - return -compare_record_run (a, b, irs); + return pq->record_cnt >= pq->record_cap; } -/* Begins a new initial run, specifically its output file. */ -static void -start_run (struct initial_run_state *irs) +static bool +pqueue_is_empty (const struct pqueue *pq) { - irs->run++; - irs->case_cnt = 0; - irs->casefile = fastfile_create (irs->xsrt->value_cnt); - casefile_to_disk (irs->casefile); - case_nullify (&irs->last_output); + return pq->record_cnt == 0; } -/* Ends the current initial run. */ static void -end_run (struct initial_run_state *irs) +pqueue_push (struct pqueue *pq, struct ccase *c, casenumber id) { - struct external_sort *xsrt = irs->xsrt; + struct pqueue_record *r; - /* Record initial run. */ - if (irs->casefile != NULL) - { - casefile_sleep (irs->casefile); - if (xsrt->run_cnt >= xsrt->run_cap) - { - xsrt->run_cap *= 2; - xsrt->runs = xnrealloc (xsrt->runs, - xsrt->run_cap, sizeof *xsrt->runs); - } - xsrt->runs[xsrt->run_cnt++] = irs->casefile; - if (casefile_error (irs->casefile)) - irs->okay = false; - irs->casefile = NULL; - } -} + assert (!pqueue_is_full (pq)); -/* Writes a record to the current initial run. */ -static void -output_record (struct initial_run_state *irs) -{ - struct record_run *record_run; - struct ccase case_tmp; - - /* Extract minimum case from heap. */ - assert (irs->record_cnt > 0); - pop_heap (irs->records, irs->record_cnt--, sizeof *irs->records, - compare_record_run_minheap, irs); - record_run = irs->records + irs->record_cnt; - - /* Bail if an error has occurred. */ - if (!irs->okay) - return; - - /* Start new run if necessary. */ - assert (record_run->run == irs->run - || record_run->run == irs->run + 1); - if (record_run->run != irs->run) - { - end_run (irs); - start_run (irs); - } - assert (record_run->run == irs->run); - irs->case_cnt++; - - /* Write to disk. */ - if (irs->casefile != NULL) - casefile_append (irs->casefile, &record_run->record); + r = &pq->records[pq->record_cnt++]; + r->id = id; + r->c = c; + r->idx = pq->idx++; - /* This record becomes last_output. */ - irs->last_output = case_tmp = record_run->record; - record_run->record = irs->records[irs->record_cap - 1].record; - irs->records[irs->record_cap - 1].record = case_tmp; + push_heap (pq->records, pq->record_cnt, sizeof *pq->records, + compare_pqueue_records_minheap, pq); } - -/* Merging. */ -static int choose_merge (struct casefile *runs[], int run_cnt, int order); -static struct casefile *merge_once (struct external_sort *, - struct casefile *[], size_t); - -/* Repeatedly merges run until only one is left, - and returns the final casefile. - Returns a null pointer if an I/O error occurs. */ -static struct casefile * -merge (struct external_sort *xsrt) +static struct ccase * +pqueue_pop (struct pqueue *pq, casenumber *id) { - while (xsrt->run_cnt > 1) - { - int order = MIN (MAX_MERGE_ORDER, xsrt->run_cnt); - int idx = choose_merge (xsrt->runs, xsrt->run_cnt, order); - xsrt->runs[idx] = merge_once (xsrt, xsrt->runs + idx, order); - remove_range (xsrt->runs, xsrt->run_cnt, sizeof *xsrt->runs, - idx + 1, order - 1); - xsrt->run_cnt -= order - 1; - - if (xsrt->runs[idx] == NULL) - return NULL; - } - assert (xsrt->run_cnt == 1); - xsrt->run_cnt = 0; - return xsrt->runs[0]; -} + struct pqueue_record *r; -/* Chooses ORDER runs out of the RUN_CNT runs in RUNS to merge, - and returns the index of the first one. + assert (!pqueue_is_empty (pq)); - For stability, we must merge only consecutive runs. For - efficiency, we choose the shortest consecutive sequence of - runs. */ -static int -choose_merge (struct casefile *runs[], int run_cnt, int order) -{ - int min_idx, min_sum; - int cur_idx, cur_sum; - int i; - - /* Sum up the length of the first ORDER runs. */ - cur_sum = 0; - for (i = 0; i < order; i++) - cur_sum += casefile_get_case_cnt (runs[i]); - - /* Find the shortest group of ORDER runs, - using a running total for efficiency. */ - min_idx = 0; - min_sum = cur_sum; - for (cur_idx = 1; cur_idx + order <= run_cnt; cur_idx++) - { - cur_sum -= casefile_get_case_cnt (runs[cur_idx - 1]); - cur_sum += casefile_get_case_cnt (runs[cur_idx + order - 1]); - if (cur_sum < min_sum) - { - min_sum = cur_sum; - min_idx = cur_idx; - } - } + pop_heap (pq->records, pq->record_cnt--, sizeof *pq->records, + compare_pqueue_records_minheap, pq); - return min_idx; + r = &pq->records[pq->record_cnt]; + *id = r->id; + return r->c; } -/* Merges the RUN_CNT initial runs specified in INPUT_FILES into a - new run, and returns the new run. - Returns a null pointer if an I/O error occurs. */ -static struct casefile * -merge_once (struct external_sort *xsrt, - struct casefile **const input_files, - size_t run_cnt) +/* Compares record-run tuples A and B on id, then on case data, + then on insertion order, in descending order. */ +static int +compare_pqueue_records_minheap (const void *a_, const void *b_, + const void *pq_) { - struct run - { - struct casefile *file; - struct casereader *reader; - struct ccase ccase; - } - *runs; - - struct casefile *output = NULL; - int i; - - /* Open input files. */ - runs = xnmalloc (run_cnt, sizeof *runs); - for (i = 0; i < run_cnt; i++) - { - struct run *r = &runs[i]; - r->file = input_files[i]; - r->reader = casefile_get_destructive_reader (r->file); - if (!casereader_read_xfer (r->reader, &r->ccase)) - { - run_cnt--; - i--; - } - } - - /* Create output file. */ - output = fastfile_create (xsrt->value_cnt); - casefile_to_disk (output); - - /* Merge. */ - while (run_cnt > 0) - { - struct run *min_run, *run; - - /* Find minimum. */ - min_run = runs; - for (run = runs + 1; run < runs + run_cnt; run++) - if (compare_record (&run->ccase, &min_run->ccase, xsrt->criteria) < 0) - min_run = run; - - /* Write minimum to output file. */ - casefile_append_xfer (output, &min_run->ccase); - - /* Read another case from minimum run. */ - if (!casereader_read_xfer (min_run->reader, &min_run->ccase)) - { - if (casefile_error (min_run->file) || casefile_error (output)) - goto error; - casereader_destroy (min_run->reader); - casefile_destroy (min_run->file); - - remove_element (runs, run_cnt, sizeof *runs, min_run - runs); - run_cnt--; - } - } - - if (!casefile_sleep (output)) - goto error; - free (runs); - - return output; - - error: - for (i = 0; i < run_cnt; i++) - casefile_destroy (runs[i].file); - casefile_destroy (output); - free (runs); - return NULL; + const struct pqueue_record *a = a_; + const struct pqueue_record *b = b_; + const struct pqueue *pq = pq_; + int result = a->id < b->id ? -1 : a->id > b->id; + if (result == 0) + result = subcase_compare_3way (&pq->ordering, a->c, &pq->ordering, b->c); + if (result == 0) + result = a->idx < b->idx ? -1 : a->idx > b->idx; + return -result; }