X-Git-Url: https://pintos-os.org/cgi-bin/gitweb.cgi?a=blobdiff_plain;f=src%2Fmath%2Fsort.c;h=752d5270007bcdc4d505e2eb4ad4ba887b27e8fd;hb=refs%2Fheads%2Fpivot-table2;hp=46da0ec0dc4406b16d4b9b3c6c993b96cdf40623;hpb=59d14e5581317e3d1e37c8b92b535ba197984776;p=pspp diff --git a/src/math/sort.c b/src/math/sort.c index 46da0ec0dc..752d527000 100644 --- a/src/math/sort.c +++ b/src/math/sort.c @@ -1,50 +1,37 @@ -/* PSPP - computes sample statistics. - Copyright (C) 1997-9, 2000, 2006 Free Software Foundation, Inc. +/* PSPP - a program for statistical analysis. + Copyright (C) 1997-9, 2000, 2006, 2009-12, 2014 Free Software Foundation, Inc. - This program is free software; you can redistribute it and/or - modify it under the terms of the GNU General Public License as - published by the Free Software Foundation; either version 2 of the - License, or (at your option) any later version. + This program is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. - This program is distributed in the hope that it will be useful, but - WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - General Public License for more details. + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. You should have received a copy of the GNU General Public License - along with this program; if not, write to the Free Software - Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA - 02110-1301, USA. */ + along with this program. If not, see . */ #include -#include "sort.h" +#include "math/sort.h" -#include -#include -#include #include -#include - -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include - -#include "minmax.h" + +#include "data/case.h" +#include "data/casereader.h" +#include "data/casewriter-provider.h" +#include "data/casewriter.h" +#include "data/settings.h" +#include "data/subcase.h" +#include "libpspp/array.h" +#include "libpspp/assertion.h" +#include "libpspp/bt.h" +#include "math/merge.h" + +#include "gl/xalloc.h" #include "gettext.h" #define _(msgid) gettext (msgid) @@ -52,701 +39,337 @@ /* These should only be changed for testing purposes. */ int min_buffers = 64; int max_buffers = INT_MAX; -bool allow_internal_sort = true; - -static int compare_record (const struct ccase *, const struct ccase *, - const struct sort_criteria *); -static struct casefile *do_internal_sort (struct casereader *, - const struct sort_criteria *, - struct casefile_factory * - ); -static struct casefile *do_external_sort (struct casereader *, - const struct sort_criteria *, - struct casefile_factory * - ); - - -/* Sorts the active file in-place according to CRITERIA. - Returns true if successful. */ -bool -sort_active_file_in_place (struct dataset *ds, - const struct sort_criteria *criteria) -{ - struct casefile *in, *out; - - proc_cancel_temporary_transformations (ds); - if (!procedure (ds, NULL, NULL)) - return false; - - in = proc_capture_output (ds); - out = sort_execute (casefile_get_destructive_reader (in), criteria, - dataset_get_casefile_factory (ds)); - if (out == NULL) - return false; - - proc_set_source (ds, storage_source_create (out)); - return true; -} -/* Data passed to sort_to_casefile_callback(). */ -struct sort_to_casefile_cb_data +struct sort_writer { - const struct sort_criteria *criteria; - struct casefile *output; - struct casefile_factory *factory ; + struct caseproto *proto; + struct subcase ordering; + struct merge *merge; + struct pqueue *pqueue; + + sort_distinct_combine_func *combine; + sort_distinct_destroy_func *destroy; + void *aux; + + struct casewriter *run; + casenumber run_id; + struct ccase *run_end; }; -/* Sorts casefile CF according to the criteria in CB_DATA. */ -static bool -sort_to_casefile_callback (const struct casefile *cf, void *cb_data_) -{ - struct sort_to_casefile_cb_data *cb_data = cb_data_; - cb_data->output = sort_execute (casefile_get_reader (cf, NULL), - cb_data->criteria, - cb_data->factory - ); - return cb_data->output != NULL; -} +static struct casewriter_class sort_casewriter_class; + +static struct pqueue *pqueue_create (const struct subcase *, + const struct caseproto *, + sort_distinct_combine_func *, void *aux); +static void pqueue_destroy (struct pqueue *); +static bool pqueue_is_full (const struct pqueue *); +static bool pqueue_is_empty (const struct pqueue *); +static void pqueue_push (struct pqueue *, struct ccase *, casenumber); +static struct ccase *pqueue_pop (struct pqueue *, casenumber *); + +static void output_record (struct sort_writer *); -/* Sorts the active file to a separate casefile. If successful, - returns the sorted casefile. Returns a null pointer on - failure. */ -struct casefile * -sort_active_file_to_casefile (struct dataset *ds, - const struct sort_criteria *criteria) +struct casewriter * +sort_create_writer (const struct subcase *ordering, + const struct caseproto *proto) { - struct sort_to_casefile_cb_data cb_data; - - proc_cancel_temporary_transformations (ds); - - cb_data.criteria = criteria; - cb_data.output = NULL; - cb_data.factory = dataset_get_casefile_factory (ds); - if (!multipass_procedure (ds, sort_to_casefile_callback, &cb_data)) - { - casefile_destroy (cb_data.output); - return NULL; - } - return cb_data.output; + return sort_distinct_create_writer (ordering, proto, NULL, NULL, NULL); } - -/* Reads all the cases from READER, which is destroyed. Sorts - the cases according to CRITERIA. Returns the sorted cases in - a newly created casefile, which will be created by FACTORY. - If FACTORY is NULL, then a local fastfile_factory will be used. -*/ -struct casefile * -sort_execute (struct casereader *reader, - const struct sort_criteria *criteria, - struct casefile_factory *factory - ) +struct casewriter * +sort_distinct_create_writer (const struct subcase *ordering, + const struct caseproto *proto, + sort_distinct_combine_func *combine, + sort_distinct_destroy_func *destroy, + void *aux) { - struct casefile_factory *local_factory = NULL; - struct casefile *output ; - if ( factory == NULL ) - factory = local_factory = fastfile_factory_create (); + struct sort_writer *sort; - output = do_internal_sort (reader, criteria, factory); - if (output == NULL) - output = do_external_sort (reader, criteria, factory); - casereader_destroy (reader); + sort = xmalloc (sizeof *sort); + sort->proto = caseproto_ref (proto); + subcase_clone (&sort->ordering, ordering); + sort->merge = merge_create (ordering, proto, combine, aux); + sort->pqueue = pqueue_create (ordering, proto, combine, aux); - fastfile_factory_destroy (local_factory); + sort->combine = combine; + sort->destroy = destroy; + sort->aux = aux; - return output; -} - -/* A case and its index. */ -struct indexed_case - { - struct ccase c; /* Case. */ - unsigned long idx; /* Index to allow for stable sorting. */ - }; + sort->run = NULL; + sort->run_id = 0; + sort->run_end = NULL; -static int compare_indexed_cases (const void *, const void *, const void *); + return casewriter_create (proto, &sort_casewriter_class, sort); +} -/* If the data is in memory, do an internal sort and return a new - casefile for the data. Otherwise, return a null pointer. */ -static struct casefile * -do_internal_sort (struct casereader *reader, - const struct sort_criteria *criteria, - struct casefile_factory *factory) +static void +sort_casewriter_write (struct casewriter *writer UNUSED, void *sort_, + struct ccase *c) { - const struct casefile *src; - struct casefile *dst; - unsigned long case_cnt; - - if (!allow_internal_sort) - return NULL; - - src = casereader_get_casefile (reader); - if (casefile_get_case_cnt (src) > 1 && !casefile_in_core (src)) - return NULL; - - case_cnt = casefile_get_case_cnt (src); - dst = factory->create_casefile (factory, casefile_get_value_cnt (src)); - if (case_cnt != 0) - { - struct indexed_case *cases = nmalloc (sizeof *cases, case_cnt); - if (cases != NULL) - { - unsigned long i; - - for (i = 0; i < case_cnt; i++) - { - bool ok = casereader_read_xfer (reader, &cases[i].c); - if (!ok) - NOT_REACHED (); - cases[i].idx = i; - } + struct sort_writer *sort = sort_; + bool next_run; - sort (cases, case_cnt, sizeof *cases, compare_indexed_cases, - (void *) criteria); - - for (i = 0; i < case_cnt; i++) - casefile_append_xfer (dst, &cases[i].c); - if (casefile_error (dst)) - NOT_REACHED (); - - free (cases); - } - else - { - /* Failure. */ - casefile_destroy (dst); - dst = NULL; - } - } + if (pqueue_is_full (sort->pqueue)) + output_record (sort); - return dst; + next_run = (sort->run_end == NULL + || subcase_compare_3way (&sort->ordering, c, + &sort->ordering, sort->run_end) < 0); + pqueue_push (sort->pqueue, c, sort->run_id + (next_run ? 1 : 0)); } -/* Compares the variables specified by CRITERIA between the cases - at A and B, with a "last resort" comparison for stability, and - returns a strcmp()-type result. */ -static int -compare_indexed_cases (const void *a_, const void *b_, const void *criteria_) +static void +sort_casewriter_destroy (struct casewriter *writer UNUSED, void *sort_) { - const struct sort_criteria *criteria = criteria_; - const struct indexed_case *a = a_; - const struct indexed_case *b = b_; - int result = compare_record (&a->c, &b->c, criteria); - if (result == 0) - result = a->idx < b->idx ? -1 : a->idx > b->idx; - return result; + struct sort_writer *sort = sort_; + + if (sort->destroy != NULL) + sort->destroy (sort->aux); + + subcase_destroy (&sort->ordering); + merge_destroy (sort->merge); + pqueue_destroy (sort->pqueue); + casewriter_destroy (sort->run); + case_unref (sort->run_end); + caseproto_unref (sort->proto); + free (sort); } - -/* External sort. */ - -/* Maximum order of merge (external sort only). The maximum - reasonable value is about 7. Above that, it would be a good - idea to use a heap in merge_once() to select the minimum. */ -#define MAX_MERGE_ORDER 7 - -/* Results of an external sort. */ -struct external_sort - { - const struct sort_criteria *criteria; /* Sort criteria. */ - size_t value_cnt; /* Size of data in `union value's. */ - struct casefile **runs; /* Array of initial runs. */ - size_t run_cnt, run_cap; /* Number of runs, allocated capacity. */ - struct casefile_factory *factory; /* Factory used to create the result */ - }; -/* Prototypes for helper functions. */ -static int write_runs (struct external_sort *, struct casereader *); -static struct casefile *merge (struct external_sort *); -static void destroy_external_sort (struct external_sort *); - -/* Performs a stable external sort of the active file according - to the specification in SCP. Forms initial runs using a heap - as a reservoir. Merges the initial runs according to a - pattern that assures stability. */ -static struct casefile * -do_external_sort (struct casereader *reader, - const struct sort_criteria *criteria, - struct casefile_factory *factory - ) +static struct casereader * +sort_casewriter_convert_to_reader (struct casewriter *writer, void *sort_) { - struct external_sort *xsrt; - - if (!casefile_to_disk (casereader_get_casefile (reader))) - return NULL; - - xsrt = xmalloc (sizeof *xsrt); - xsrt->criteria = criteria; - xsrt->value_cnt = casefile_get_value_cnt (casereader_get_casefile (reader)); - xsrt->run_cap = 512; - xsrt->run_cnt = 0; - xsrt->runs = xnmalloc (xsrt->run_cap, sizeof *xsrt->runs); - xsrt->factory = factory; - if (write_runs (xsrt, reader)) - { - struct casefile *output = merge (xsrt); - destroy_external_sort (xsrt); - return output; - } - else + struct sort_writer *sort = sort_; + struct casereader *output; + + if (sort->run == NULL && sort->run_id == 0) { - destroy_external_sort (xsrt); - return NULL; + /* In-core sort. */ + sort->run = mem_writer_create (sort->proto); + sort->run_id = 1; } + while (!pqueue_is_empty (sort->pqueue)) + output_record (sort); + + merge_append (sort->merge, casewriter_make_reader (sort->run)); + sort->run = NULL; + + output = merge_make_reader (sort->merge); + sort_casewriter_destroy (writer, sort); + return output; } -/* Destroys XSRT. */ static void -destroy_external_sort (struct external_sort *xsrt) +output_record (struct sort_writer *sort) { - if (xsrt != NULL) + struct ccase *min_case; + casenumber min_run_id; + + min_case = pqueue_pop (sort->pqueue, &min_run_id); +#if 0 + printf ("\toutput: %f to run %d\n", case_num_idx (min_case, 0), min_run_id); +#endif + + if (sort->run_id != min_run_id && sort->run != NULL) { - int i; - - for (i = 0; i < xsrt->run_cnt; i++) - casefile_destroy (xsrt->runs[i]); - free (xsrt->runs); - free (xsrt); + merge_append (sort->merge, casewriter_make_reader (sort->run)); + sort->run = NULL; + } + if (sort->run == NULL) + { + sort->run = tmpfile_writer_create (sort->proto); + sort->run_id = min_run_id; } -} - -/* Replacement selection. */ -/* Pairs a record with a run number. */ -struct record_run - { - int run; /* Run number of case. */ - struct ccase record; /* Case data. */ - size_t idx; /* Case number (for stability). */ - }; + case_unref (sort->run_end); + sort->run_end = case_ref (min_case); + casewriter_write (sort->run, min_case); +} -/* Represents a set of initial runs during an external sort. */ -struct initial_run_state +static struct casewriter_class sort_casewriter_class = { - struct external_sort *xsrt; - - /* Reservoir. */ - struct record_run *records; /* Records arranged as a heap. */ - size_t record_cnt; /* Current number of records. */ - size_t record_cap; /* Capacity for records. */ - - /* Run currently being output. */ - int run; /* Run number. */ - size_t case_cnt; /* Number of cases so far. */ - struct casefile *casefile; /* Output file. */ - struct ccase last_output; /* Record last output. */ - - int okay; /* Zero if an error has been encountered. */ + sort_casewriter_write, + sort_casewriter_destroy, + sort_casewriter_convert_to_reader, }; - -static bool destroy_initial_run_state (struct initial_run_state *); -static void process_case (struct initial_run_state *, - const struct ccase *, size_t); -static int allocate_cases (struct initial_run_state *); -static void output_record (struct initial_run_state *); -static void start_run (struct initial_run_state *); -static void end_run (struct initial_run_state *); -static int compare_record_run (const struct record_run *, - const struct record_run *, - const struct initial_run_state *); -static int compare_record_run_minheap (const void *, const void *, - const void *); - -/* Reads cases from READER and composes initial runs in XSRT. */ -static int -write_runs (struct external_sort *xsrt, struct casereader *reader) + +/* Reads all the cases from INPUT. Sorts the cases according to + ORDERING. Returns the sorted cases in a new casereader. + INPUT is destroyed by this function. + */ +struct casereader * +sort_execute (struct casereader *input, const struct subcase *ordering) { - struct initial_run_state *irs; - struct ccase c; - size_t idx = 0; - int success = 0; - - /* Allocate memory for cases. */ - irs = xmalloc (sizeof *irs); - irs->xsrt = xsrt; - irs->records = NULL; - irs->record_cnt = irs->record_cap = 0; - irs->run = 0; - irs->case_cnt = 0; - irs->casefile = NULL; - case_nullify (&irs->last_output); - irs->okay = 1; - if (!allocate_cases (irs)) - goto done; - - /* Create initial runs. */ - start_run (irs); - for (; irs->okay && casereader_read (reader, &c); case_destroy (&c)) - process_case (irs, &c, idx++); - while (irs->okay && irs->record_cnt > 0) - output_record (irs); - end_run (irs); - - success = irs->okay; - - done: - if (!destroy_initial_run_state (irs)) - success = false; - - return success; + struct casewriter *output = + sort_create_writer (ordering, casereader_get_proto (input)); + casereader_transfer (input, output); + return casewriter_make_reader (output); } -/* Add a single case to an initial run. */ -static void -process_case (struct initial_run_state *irs, const struct ccase *c, - size_t idx) +/* Reads all the cases from INPUT. Sorts the cases in ascending + order according to VARIABLE. Returns the sorted cases in a + new casereader. INPUT is destroyed by this function. */ +struct casereader * +sort_execute_1var (struct casereader *input, const struct variable *var) { - struct record_run *rr; - - /* Compose record_run for this run and add to heap. */ - assert (irs->record_cnt < irs->record_cap - 1); - rr = irs->records + irs->record_cnt++; - case_copy (&rr->record, 0, c, 0, irs->xsrt->value_cnt); - rr->run = irs->run; - rr->idx = idx; - if (!case_is_null (&irs->last_output) - && compare_record (c, &irs->last_output, irs->xsrt->criteria) < 0) - rr->run = irs->run + 1; - push_heap (irs->records, irs->record_cnt, sizeof *irs->records, - compare_record_run_minheap, irs); - - /* Output a record if the reservoir is full. */ - if (irs->record_cnt == irs->record_cap - 1 && irs->okay) - output_record (irs); + struct subcase sc; + struct casereader *reader; + + subcase_init_var (&sc, var, SC_ASCEND); + reader = sort_execute (input, &sc); + subcase_destroy (&sc); + return reader; } -/* Destroys the initial run state represented by IRS. - Returns true if successful, false if an I/O error occurred. */ -static bool -destroy_initial_run_state (struct initial_run_state *irs) +/* Reads all the cases from INPUT. Sorts the cases according to ORDERING, + combining cases that have the same ORDERING values using COMBINE. + Returns the sorted cases in a new casereader. */ +struct casereader * +sort_distinct_execute (struct casereader *input, + const struct subcase *ordering, + sort_distinct_combine_func *combine, + sort_distinct_destroy_func *destroy, + void *aux) { - int i; - bool ok = true; - - if (irs == NULL) - return true; + struct casewriter *output = + sort_distinct_create_writer (ordering, casereader_get_proto (input), + combine, destroy, aux); + casereader_transfer (input, output); + return casewriter_make_reader (output); +} + +struct pqueue + { + struct subcase ordering; + struct bt bt; + size_t record_max; + casenumber idx; - for (i = 0; i < irs->record_cap; i++) - case_destroy (&irs->records[i].record); - free (irs->records); + sort_distinct_combine_func *combine; + void *aux; + }; - if (irs->casefile != NULL) - ok = casefile_sleep (irs->casefile); +struct pqueue_record + { + struct bt_node bt_node; + casenumber id; + struct ccase *c; + casenumber idx; + }; - free (irs); - return ok; -} +static int compare_pqueue_records (const struct bt_node *a, + const struct bt_node *b, + const void *ordering); -/* Allocates room for lots of cases as a buffer. */ -static int -allocate_cases (struct initial_run_state *irs) +static struct pqueue * +pqueue_create (const struct subcase *ordering, const struct caseproto *proto, + sort_distinct_combine_func *combine, void *aux) { - int approx_case_cost; /* Approximate memory cost of one case in bytes. */ - int max_cases; /* Maximum number of cases to allocate. */ - int i; - - /* Allocate as many cases as we can within the workspace - limit. */ - approx_case_cost = (sizeof *irs->records - + irs->xsrt->value_cnt * sizeof (union value) - + 4 * sizeof (void *)); - max_cases = get_workspace() / approx_case_cost; - if (max_cases > max_buffers) - max_cases = max_buffers; - irs->records = nmalloc (sizeof *irs->records, max_cases); - if (irs->records != NULL) - for (i = 0; i < max_cases; i++) - if (!case_try_create (&irs->records[i].record, irs->xsrt->value_cnt)) - { - max_cases = i; - break; - } - irs->record_cap = max_cases; - - /* Fail if we didn't allocate an acceptable number of cases. */ - if (irs->records == NULL || max_cases < min_buffers) - { - msg (SE, _("Out of memory. Could not allocate room for minimum of %d " - "cases of %d bytes each. (PSPP workspace is currently " - "restricted to a maximum of %lu KB.)"), - min_buffers, approx_case_cost, - (unsigned long int) (get_workspace() / 1024)); - return 0; - } - return 1; + struct pqueue *pq; + + pq = xmalloc (sizeof *pq); + subcase_clone (&pq->ordering, ordering); + pq->record_max = settings_get_workspace_cases (proto); + if (pq->record_max > max_buffers) + pq->record_max = max_buffers; + else if (pq->record_max < min_buffers) + pq->record_max = min_buffers; + bt_init (&pq->bt, compare_pqueue_records, &pq->ordering); + pq->idx = 0; + + pq->combine = combine; + pq->aux = aux; + + return pq; } -/* Compares the VAR_CNT variables in VARS[] between the `value's at - A and B, and returns a strcmp()-type result. */ -static int -compare_record (const struct ccase *a, const struct ccase *b, - const struct sort_criteria *criteria) +static void +pqueue_destroy (struct pqueue *pq) { - int i; - - assert (a != NULL); - assert (b != NULL); - - for (i = 0; i < criteria->crit_cnt; i++) + if (pq != NULL) { - const struct sort_criterion *c = &criteria->crits[i]; - int result; - - if (c->width == 0) + while (!pqueue_is_empty (pq)) { - double af = case_num_idx (a, c->fv); - double bf = case_num_idx (b, c->fv); - - result = af < bf ? -1 : af > bf; + casenumber id; + struct ccase *c = pqueue_pop (pq, &id); + case_unref (c); } - else - result = memcmp (case_str_idx (a, c->fv), - case_str_idx (b, c->fv), c->width); - - if (result != 0) - return c->dir == SRT_ASCEND ? result : -result; + subcase_destroy (&pq->ordering); + free (pq); } - - return 0; } -/* Compares record-run tuples A and B on run number first, then - on record, then on case index. */ -static int -compare_record_run (const struct record_run *a, - const struct record_run *b, - const struct initial_run_state *irs) +static bool +pqueue_is_full (const struct pqueue *pq) { - int result = a->run < b->run ? -1 : a->run > b->run; - if (result == 0) - result = compare_record (&a->record, &b->record, irs->xsrt->criteria); - if (result == 0) - result = a->idx < b->idx ? -1 : a->idx > b->idx; - return result; + return bt_count (&pq->bt) >= pq->record_max; } -/* Compares record-run tuples A and B on run number first, then - on the current record according to SCP, but in descending - order. */ -static int -compare_record_run_minheap (const void *a, const void *b, const void *irs) +static bool +pqueue_is_empty (const struct pqueue *pq) { - return -compare_record_run (a, b, irs); + return bt_is_empty (&pq->bt); } -/* Begins a new initial run, specifically its output file. */ static void -start_run (struct initial_run_state *irs) +pqueue_push (struct pqueue *pq, struct ccase *c, casenumber id) { - irs->run++; - irs->case_cnt = 0; - - /* This casefile is internal to the sort, so don't use the factory - to create it. */ - irs->casefile = fastfile_create (irs->xsrt->value_cnt); - casefile_to_disk (irs->casefile); - case_nullify (&irs->last_output); -} + struct pqueue_record *r; -/* Ends the current initial run. */ -static void -end_run (struct initial_run_state *irs) -{ - struct external_sort *xsrt = irs->xsrt; + assert (!pqueue_is_full (pq)); - /* Record initial run. */ - if (irs->casefile != NULL) + r = xmalloc (sizeof *r); + r->id = id; + r->c = c; + r->idx = pq->idx++; + bt_insert (&pq->bt, &r->bt_node); + + if (pq->combine != NULL) { - casefile_sleep (irs->casefile); - if (xsrt->run_cnt >= xsrt->run_cap) + struct bt_node *q_ = bt_prev (&pq->bt, &r->bt_node); + if (q_ != NULL) { - xsrt->run_cap *= 2; - xsrt->runs = xnrealloc (xsrt->runs, - xsrt->run_cap, sizeof *xsrt->runs); + struct pqueue_record *q = bt_data (q_, struct pqueue_record, + bt_node); + if (q->id == r->id && subcase_equal (&pq->ordering, q->c, + &pq->ordering, r->c)) + { + bt_delete (&pq->bt, &r->bt_node); + q->c = pq->combine (q->c, r->c, pq->aux); + free (r); + } } - xsrt->runs[xsrt->run_cnt++] = irs->casefile; - if (casefile_error (irs->casefile)) - irs->okay = false; - irs->casefile = NULL; } } -/* Writes a record to the current initial run. */ -static void -output_record (struct initial_run_state *irs) +static struct ccase * +pqueue_pop (struct pqueue *pq, casenumber *id) { - struct record_run *record_run; - struct ccase case_tmp; - - /* Extract minimum case from heap. */ - assert (irs->record_cnt > 0); - pop_heap (irs->records, irs->record_cnt--, sizeof *irs->records, - compare_record_run_minheap, irs); - record_run = irs->records + irs->record_cnt; - - /* Bail if an error has occurred. */ - if (!irs->okay) - return; - - /* Start new run if necessary. */ - assert (record_run->run == irs->run - || record_run->run == irs->run + 1); - if (record_run->run != irs->run) - { - end_run (irs); - start_run (irs); - } - assert (record_run->run == irs->run); - irs->case_cnt++; - - /* Write to disk. */ - if (irs->casefile != NULL) - casefile_append (irs->casefile, &record_run->record); - - /* This record becomes last_output. */ - irs->last_output = case_tmp = record_run->record; - record_run->record = irs->records[irs->record_cap - 1].record; - irs->records[irs->record_cap - 1].record = case_tmp; -} - -/* Merging. */ + struct pqueue_record *r; + struct ccase *c; -static int choose_merge (struct casefile *runs[], int run_cnt, int order); -static struct casefile *merge_once (struct external_sort *, - struct casefile *[], size_t); + assert (!pqueue_is_empty (pq)); -/* Repeatedly merges run until only one is left, - and returns the final casefile. - Returns a null pointer if an I/O error occurs. */ -static struct casefile * -merge (struct external_sort *xsrt) -{ - while (xsrt->run_cnt > 1) - { - int order = MIN (MAX_MERGE_ORDER, xsrt->run_cnt); - int idx = choose_merge (xsrt->runs, xsrt->run_cnt, order); - xsrt->runs[idx] = merge_once (xsrt, xsrt->runs + idx, order); - remove_range (xsrt->runs, xsrt->run_cnt, sizeof *xsrt->runs, - idx + 1, order - 1); - xsrt->run_cnt -= order - 1; - - if (xsrt->runs[idx] == NULL) - return NULL; - } - assert (xsrt->run_cnt == 1); - xsrt->run_cnt = 0; - return xsrt->runs[0]; + r = bt_data (bt_first (&pq->bt), struct pqueue_record, bt_node); + bt_delete (&pq->bt, &r->bt_node); + *id = r->id; + c = r->c; + free (r); + return c; } -/* Chooses ORDER runs out of the RUN_CNT runs in RUNS to merge, - and returns the index of the first one. - - For stability, we must merge only consecutive runs. For - efficiency, we choose the shortest consecutive sequence of - runs. */ +/* Compares record-run tuples A and B on id, then on case data, + then on insertion order. */ static int -choose_merge (struct casefile *runs[], int run_cnt, int order) +compare_pqueue_records (const struct bt_node *a_, const struct bt_node *b_, + const void *ordering_) { - int min_idx, min_sum; - int cur_idx, cur_sum; - int i; - - /* Sum up the length of the first ORDER runs. */ - cur_sum = 0; - for (i = 0; i < order; i++) - cur_sum += casefile_get_case_cnt (runs[i]); - - /* Find the shortest group of ORDER runs, - using a running total for efficiency. */ - min_idx = 0; - min_sum = cur_sum; - for (cur_idx = 1; cur_idx + order <= run_cnt; cur_idx++) - { - cur_sum -= casefile_get_case_cnt (runs[cur_idx - 1]); - cur_sum += casefile_get_case_cnt (runs[cur_idx + order - 1]); - if (cur_sum < min_sum) - { - min_sum = cur_sum; - min_idx = cur_idx; - } - } - - return min_idx; -} - -/* Merges the RUN_CNT initial runs specified in INPUT_FILES into a - new run, and returns the new run. - Returns a null pointer if an I/O error occurs. */ -static struct casefile * -merge_once (struct external_sort *xsrt, - struct casefile **const input_files, - size_t run_cnt) -{ - struct run - { - struct casefile *file; - struct casereader *reader; - struct ccase ccase; - } - *runs; - - struct casefile *output = NULL; - int i; - - /* Open input files. */ - runs = xnmalloc (run_cnt, sizeof *runs); - for (i = 0; i < run_cnt; i++) - { - struct run *r = &runs[i]; - r->file = input_files[i]; - r->reader = casefile_get_destructive_reader (r->file); - if (!casereader_read_xfer (r->reader, &r->ccase)) - { - run_cnt--; - i--; - } - } - - /* Create output file. */ - output = xsrt->factory->create_casefile (xsrt->factory, xsrt->value_cnt); - casefile_to_disk (output); - - /* Merge. */ - while (run_cnt > 0) - { - struct run *min_run, *run; - - /* Find minimum. */ - min_run = runs; - for (run = runs + 1; run < runs + run_cnt; run++) - if (compare_record (&run->ccase, &min_run->ccase, xsrt->criteria) < 0) - min_run = run; - - /* Write minimum to output file. */ - casefile_append_xfer (output, &min_run->ccase); - - /* Read another case from minimum run. */ - if (!casereader_read_xfer (min_run->reader, &min_run->ccase)) - { - if (casefile_error (min_run->file) || casefile_error (output)) - goto error; - casereader_destroy (min_run->reader); - casefile_destroy (min_run->file); - - remove_element (runs, run_cnt, sizeof *runs, min_run - runs); - run_cnt--; - } - } - - if (!casefile_sleep (output)) - goto error; - free (runs); - - return output; - - error: - for (i = 0; i < run_cnt; i++) - casefile_destroy (runs[i].file); - casefile_destroy (output); - free (runs); - return NULL; + const struct pqueue_record *a = bt_data (a_, struct pqueue_record, bt_node); + const struct pqueue_record *b = bt_data (b_, struct pqueue_record, bt_node); + const struct subcase *ordering = ordering_; + int result = a->id < b->id ? -1 : a->id > b->id; + if (result == 0) + result = subcase_compare_3way (ordering, a->c, ordering, b->c); + if (result == 0) + result = a->idx < b->idx ? -1 : a->idx > b->idx; + return result; }