#include <config.h>
#include "sort.h"
#include "error.h"
+#include <limits.h>
#include <stdio.h>
#include <stdlib.h>
#include <errno.h>
#include "command.h"
#include "error.h"
#include "expressions/public.h"
+#include "glob.h"
#include "lexer.h"
#include "misc.h"
#include "settings.h"
#include "vfm.h"
#include "vfmP.h"
-#if HAVE_UNISTD_H
-#include <unistd.h>
-#endif
-
-#if HAVE_SYS_TYPES_H
-#include <sys/types.h>
-#endif
-
-#if HAVE_SYS_STAT_H
-#include <sys/stat.h>
-#endif
-
#include "debug-print.h"
/* Sort direction. */
size_t crit_cnt;
};
+/* These should only be changed for testing purposes. */
+static int min_buffers = 64;
+static int max_buffers = INT_MAX;
+static bool allow_internal_sort = true;
+
static int compare_record (const struct ccase *, const struct ccase *,
const struct sort_criteria *);
static struct casefile *do_internal_sort (struct casereader *,
cmd_sort_cases (void)
{
struct sort_criteria *criteria;
- int success;
+ bool success = false;
lex_match (T_BY);
if (criteria == NULL)
return CMD_FAILURE;
+ if (test_mode && lex_match ('/'))
+ {
+ if (!lex_force_match_id ("BUFFERS") || !lex_match ('=')
+ || !lex_force_int ())
+ goto done;
+
+ min_buffers = max_buffers = lex_integer ();
+ allow_internal_sort = false;
+ if (max_buffers < 2)
+ {
+ msg (SE, _("Buffer limit must be at least 2."));
+ goto done;
+ }
+
+ lex_get ();
+ }
+
success = sort_active_file_in_place (criteria);
+
+ done:
+ min_buffers = 64;
+ max_buffers = INT_MAX;
+ allow_internal_sort = true;
+
sort_destroy_criteria (criteria);
return success ? lex_end_of_command () : CMD_FAILURE;
}
struct casefile *
sort_execute (struct casereader *reader, const struct sort_criteria *criteria)
{
- struct casefile *output;
-
- output = do_internal_sort (reader, criteria);
+ struct casefile *output = do_internal_sort (reader, criteria);
if (output == NULL)
output = do_external_sort (reader, criteria);
casereader_destroy (reader);
static int compare_indexed_cases (const void *, const void *, void *);
/* If the data is in memory, do an internal sort and return a new
- casefile for the data. */
+ casefile for the data. Otherwise, return a null pointer. */
static struct casefile *
do_internal_sort (struct casereader *reader,
const struct sort_criteria *criteria)
struct casefile *dst;
unsigned long case_cnt;
+ if (!allow_internal_sort)
+ return NULL;
+
src = casereader_get_casefile (reader);
if (casefile_get_case_cnt (src) > 1 && !casefile_in_core (src))
return NULL;
\f
/* External sort. */
-/* Maximum order of merge. If you increase this, then you should
- use a heap for comparing cases during merge. */
-#define MAX_MERGE_ORDER 7
-
-/* Minimum total number of records for buffers. */
-#define MIN_BUFFER_TOTAL_SIZE_RECS 64
-
-/* Minimum single input buffer size, in bytes and records. */
-#define MIN_BUFFER_SIZE_BYTES 4096
-#define MIN_BUFFER_SIZE_RECS 16
-
-#if MIN_BUFFER_SIZE_RECS * 2 + 16 > MIN_BUFFER_TOTAL_SIZE_RECS
-#error MIN_BUFFER_SIZE_RECS and MIN_BUFFER_TOTAL_SIZE_RECS do not make sense.
-#endif
-
-/* Sorts initial runs A and B in decending order by length. */
-static int
-compare_initial_runs (const void *a_, const void *b_, void *aux UNUSED)
-{
- struct casefile *const *a = a_;
- struct casefile *const *b = b_;
- unsigned long a_case_cnt = casefile_get_case_cnt (*a);
- unsigned long b_case_cnt = casefile_get_case_cnt (*b);
-
- return a_case_cnt > b_case_cnt ? -1 : a_case_cnt < b_case_cnt;
-}
+/* Maximum order of merge (external sort only). The maximum
+ reasonable value is about 7. Above that, it would be a good
+ idea to use a heap in merge_once() to select the minimum. */
+#define MAX_MERGE_ORDER 7
/* Results of an external sort. */
struct external_sort
{
const struct sort_criteria *criteria; /* Sort criteria. */
size_t value_cnt; /* Size of data in `union value's. */
- struct casefile **initial_runs; /* Array of initial runs. */
+ struct casefile **runs; /* Array of initial runs. */
size_t run_cnt, run_cap; /* Number of runs, allocated capacity. */
};
/* Prototypes for helper functions. */
-static int write_initial_runs (struct external_sort *, struct casereader *);
-static int merge (struct external_sort *);
+static int write_runs (struct external_sort *, struct casereader *);
+static struct casefile *merge (struct external_sort *);
static void destroy_external_sort (struct external_sort *);
-/* Performs an external sort of the active file according to the
- specification in SCP. Forms initial runs using a heap as a
- reservoir. Determines the optimum merge pattern via Huffman's
- method (see Knuth vol. 3, 2nd edition, p. 365-366), and merges
- according to that pattern. */
+/* Performs a stable external sort of the active file according
+ to the specification in SCP. Forms initial runs using a heap
+ as a reservoir. Merges the initial runs according to a
+ pattern that assures stability. */
static struct casefile *
do_external_sort (struct casereader *reader,
const struct sort_criteria *criteria)
xsrt->value_cnt = casefile_get_value_cnt (casereader_get_casefile (reader));
xsrt->run_cap = 512;
xsrt->run_cnt = 0;
- xsrt->initial_runs = xmalloc (sizeof *xsrt->initial_runs * xsrt->run_cap);
- if (write_initial_runs (xsrt, reader) && merge (xsrt))
+ xsrt->runs = xmalloc (sizeof *xsrt->runs * xsrt->run_cap);
+ if (write_runs (xsrt, reader))
{
- struct casefile *output = xsrt->initial_runs[0];
- xsrt->initial_runs[0] = NULL;
+ struct casefile *output = merge (xsrt);
destroy_external_sort (xsrt);
return output;
}
int i;
for (i = 0; i < xsrt->run_cnt; i++)
- casefile_destroy (xsrt->initial_runs[i]);
- free (xsrt->initial_runs);
+ casefile_destroy (xsrt->runs[i]);
+ free (xsrt->runs);
free (xsrt);
}
}
{
int run; /* Run number of case. */
struct ccase record; /* Case data. */
+ size_t idx; /* Case number (for stability). */
};
/* Represents a set of initial runs during an external sort. */
static const struct case_sink_class sort_sink_class;
static void destroy_initial_run_state (struct initial_run_state *);
-static void process_case (struct initial_run_state *, const struct ccase *);
+static void process_case (struct initial_run_state *, const struct ccase *,
+ size_t);
static int allocate_cases (struct initial_run_state *);
static void output_record (struct initial_run_state *);
static void start_run (struct initial_run_state *);
/* Reads cases from READER and composes initial runs in XSRT. */
static int
-write_initial_runs (struct external_sort *xsrt, struct casereader *reader)
+write_runs (struct external_sort *xsrt, struct casereader *reader)
{
struct initial_run_state *irs;
struct ccase c;
+ size_t idx = 0;
int success = 0;
/* Allocate memory for cases. */
/* Create initial runs. */
start_run (irs);
for (; irs->okay && casereader_read (reader, &c); case_destroy (&c))
- process_case (irs, &c);
+ process_case (irs, &c, idx++);
while (irs->okay && irs->record_cnt > 0)
output_record (irs);
end_run (irs);
/* Add a single case to an initial run. */
static void
-process_case (struct initial_run_state *irs, const struct ccase *c)
+process_case (struct initial_run_state *irs, const struct ccase *c, size_t idx)
{
- struct record_run *new_record_run;
+ struct record_run *rr;
/* Compose record_run for this run and add to heap. */
assert (irs->record_cnt < irs->record_cap - 1);
- new_record_run = irs->records + irs->record_cnt++;
- case_copy (&new_record_run->record, 0, c, 0, irs->xsrt->value_cnt);
- new_record_run->run = irs->run;
+ rr = irs->records + irs->record_cnt++;
+ case_copy (&rr->record, 0, c, 0, irs->xsrt->value_cnt);
+ rr->run = irs->run;
+ rr->idx = idx;
if (!case_is_null (&irs->last_output)
&& compare_record (c, &irs->last_output, irs->xsrt->criteria) < 0)
- new_record_run->run = irs->run + 1;
+ rr->run = irs->run + 1;
push_heap (irs->records, irs->record_cnt, sizeof *irs->records,
compare_record_run_minheap, irs);
+ irs->xsrt->value_cnt * sizeof (union value)
+ 4 * sizeof (void *));
max_cases = get_max_workspace() / approx_case_cost;
+ if (max_cases > max_buffers)
+ max_cases = max_buffers;
irs->records = malloc (sizeof *irs->records * max_cases);
for (i = 0; i < max_cases; i++)
if (!case_try_create (&irs->records[i].record, irs->xsrt->value_cnt))
irs->record_cap = max_cases;
/* Fail if we didn't allocate an acceptable number of cases. */
- if (irs->records == NULL || max_cases < MIN_BUFFER_TOTAL_SIZE_RECS)
+ if (irs->records == NULL || max_cases < min_buffers)
{
msg (SE, _("Out of memory. Could not allocate room for minimum of %d "
"cases of %d bytes each. (PSPP workspace is currently "
"restricted to a maximum of %d KB.)"),
- MIN_BUFFER_TOTAL_SIZE_RECS, approx_case_cost, get_max_workspace() / 1024);
+ min_buffers, approx_case_cost, get_max_workspace() / 1024);
return 0;
}
return 1;
}
/* Compares record-run tuples A and B on run number first, then
- on the current record according to SCP. */
+ on record, then on case index. */
static int
compare_record_run (const struct record_run *a,
const struct record_run *b,
struct initial_run_state *irs)
{
- if (a->run != b->run)
- return a->run > b->run ? 1 : -1;
- else
- return compare_record (&a->record, &b->record, irs->xsrt->criteria);
+ int result = a->run < b->run ? -1 : a->run > b->run;
+ if (result == 0)
+ result = compare_record (&a->record, &b->record, irs->xsrt->criteria);
+ if (result == 0)
+ result = a->idx < b->idx ? -1 : a->idx > b->idx;
+ return result;
}
/* Compares record-run tuples A and B on run number first, then
/* Record initial run. */
if (irs->casefile != NULL)
{
+ casefile_sleep (irs->casefile);
if (xsrt->run_cnt >= xsrt->run_cap)
{
xsrt->run_cap *= 2;
- xsrt->initial_runs
- = xrealloc (xsrt->initial_runs,
- sizeof *xsrt->initial_runs * xsrt->run_cap);
+ xsrt->runs = xrealloc (xsrt->runs,
+ sizeof *xsrt->runs * xsrt->run_cap);
}
- xsrt->initial_runs[xsrt->run_cnt++] = irs->casefile;
+ xsrt->runs[xsrt->run_cnt++] = irs->casefile;
irs->casefile = NULL;
}
}
\f
/* Merging. */
-/* State of merging initial runs. */
-struct merge_state
- {
- struct external_sort *xsrt; /* External sort state. */
- struct ccase *cases; /* Buffers. */
- size_t case_cnt; /* Number of buffers. */
- };
-
-struct run;
-static struct casefile *merge_once (struct merge_state *,
+static int choose_merge (struct casefile *runs[], int run_cnt, int order);
+static struct casefile *merge_once (struct external_sort *,
struct casefile *[], size_t);
-static int mod (int, int);
-/* Performs a series of P-way merges of initial runs. */
-static int
+/* Repeatedly merges run until only one is left,
+ and returns the final casefile. */
+static struct casefile *
merge (struct external_sort *xsrt)
{
- struct merge_state mrg; /* State of merge. */
- size_t approx_case_cost; /* Approximate memory cost of one case. */
- int max_order; /* Maximum order of merge. */
- size_t dummy_run_cnt; /* Number of dummy runs to insert. */
- int success = 0;
- int i;
-
- mrg.xsrt = xsrt;
-
- /* Allocate as many cases as possible into cases. */
- approx_case_cost = (sizeof *mrg.cases
- + xsrt->value_cnt * sizeof (union value)
- + 4 * sizeof (void *));
- mrg.case_cnt = get_max_workspace() / approx_case_cost;
- mrg.cases = malloc (sizeof *mrg.cases * mrg.case_cnt);
- if (mrg.cases == NULL)
- goto done;
- for (i = 0; i < mrg.case_cnt; i++)
- if (!case_try_create (&mrg.cases[i], xsrt->value_cnt))
- {
- mrg.case_cnt = i;
- break;
- }
- if (mrg.case_cnt < MIN_BUFFER_TOTAL_SIZE_RECS)
- {
- msg (SE, _("Out of memory. Could not allocate room for minimum of %d "
- "cases of %d bytes each. (PSPP workspace is currently "
- "restricted to a maximum of %d KB.)"),
- MIN_BUFFER_TOTAL_SIZE_RECS, approx_case_cost, get_max_workspace() / 1024);
- return 0;
- }
-
- /* Determine maximum order of merge. */
- max_order = MAX_MERGE_ORDER;
- if (mrg.case_cnt / max_order < MIN_BUFFER_SIZE_RECS)
- max_order = mrg.case_cnt / MIN_BUFFER_SIZE_RECS;
- else if (mrg.case_cnt / max_order * xsrt->value_cnt * sizeof (union value)
- < MIN_BUFFER_SIZE_BYTES)
- max_order = mrg.case_cnt / (MIN_BUFFER_SIZE_BYTES
- / (xsrt->value_cnt * sizeof (union value)));
- if (max_order < 2)
- max_order = 2;
- if (max_order > xsrt->run_cnt)
- max_order = xsrt->run_cnt;
-
- /* Repeatedly merge the P shortest existing runs until only one run
- is left. */
- make_heap (xsrt->initial_runs, xsrt->run_cnt, sizeof *xsrt->initial_runs,
- compare_initial_runs, NULL);
- dummy_run_cnt = mod (1 - (int) xsrt->run_cnt, max_order - 1);
-
- assert (max_order > 0);
- assert (max_order <= 2
- || (xsrt->run_cnt + dummy_run_cnt) % (max_order - 1) == 1);
while (xsrt->run_cnt > 1)
{
- struct casefile *output_run;
- int order;
- int i;
-
- /* Choose order of merge (max_order after first merge). */
- order = max_order - dummy_run_cnt;
- dummy_run_cnt = 0;
-
- /* Choose runs to merge. */
- assert (xsrt->run_cnt >= order);
- for (i = 0; i < order; i++)
- pop_heap (xsrt->initial_runs, xsrt->run_cnt--,
- sizeof *xsrt->initial_runs,
- compare_initial_runs, NULL);
-
- /* Merge runs. */
- output_run = merge_once (&mrg,
- xsrt->initial_runs + xsrt->run_cnt, order);
- if (output_run == NULL)
- goto done;
-
- /* Add output run to heap. */
- xsrt->initial_runs[xsrt->run_cnt++] = output_run;
- push_heap (xsrt->initial_runs, xsrt->run_cnt, sizeof *xsrt->initial_runs,
- compare_initial_runs, NULL);
+ int order = min (MAX_MERGE_ORDER, xsrt->run_cnt);
+ int idx = choose_merge (xsrt->runs, xsrt->run_cnt, order);
+ xsrt->runs[idx] = merge_once (xsrt, xsrt->runs + idx, order);
+ remove_range (xsrt->runs, xsrt->run_cnt, sizeof *xsrt->runs,
+ idx + 1, order - 1);
+ xsrt->run_cnt -= order - 1;
}
-
- /* Exactly one run is left, which contains the entire sorted
- file. We could use it to find a total case count. */
assert (xsrt->run_cnt == 1);
-
- success = 1;
-
- done:
- for (i = 0; i < mrg.case_cnt; i++)
- case_destroy (&mrg.cases[i]);
- free (mrg.cases);
-
- return success;
+ xsrt->run_cnt = 0;
+ return xsrt->runs[0];
}
-/* Modulo function as defined by Knuth. */
+/* Chooses ORDER runs out of the RUN_CNT runs in RUNS to merge,
+ and returns the index of the first one.
+
+ For stability, we must merge only consecutive runs. For
+ efficiency, we choose the shortest consecutive sequence of
+ runs. */
static int
-mod (int x, int y)
+choose_merge (struct casefile *runs[], int run_cnt, int order)
{
- if (y == 0)
- return x;
- else if (x == 0)
- return 0;
- else if (x > 0 && y > 0)
- return x % y;
- else if (x < 0 && y > 0)
- return y - (-x) % y;
+ int min_idx, min_sum;
+ int cur_idx, cur_sum;
+ int i;
+
+ /* Sum up the length of the first ORDER runs. */
+ cur_sum = 0;
+ for (i = 0; i < order; i++)
+ cur_sum += casefile_get_case_cnt (runs[i]);
+
+ /* Find the shortest group of ORDER runs,
+ using a running total for efficiency. */
+ min_idx = 0;
+ min_sum = cur_sum;
+ for (cur_idx = 1; cur_idx + order <= run_cnt; cur_idx++)
+ {
+ cur_sum -= casefile_get_case_cnt (runs[cur_idx - 1]);
+ cur_sum += casefile_get_case_cnt (runs[cur_idx + order - 1]);
+ if (cur_sum < min_sum)
+ {
+ min_sum = cur_sum;
+ min_idx = cur_idx;
+ }
+ }
- abort ();
+ return min_idx;
}
-/* Merges the RUN_CNT initial runs specified in INPUT_RUNS into a
- new run. Returns nonzero only if successful. Adds an entry
- to MRG->xsrt->runs for the output file if and only if the
- output file is actually created. Always deletes all the input
- files. */
+/* Merges the RUN_CNT initial runs specified in INPUT_FILES into a
+ new run, and returns the new run. */
static struct casefile *
-merge_once (struct merge_state *mrg,
- struct casefile *input_runs[],
+merge_once (struct external_sort *xsrt,
+ struct casefile **const input_files,
size_t run_cnt)
{
- struct casereader *input_readers[MAX_MERGE_ORDER];
- struct ccase input_cases[MAX_MERGE_ORDER];
- struct casefile *output_casefile = NULL;
+ struct run
+ {
+ struct casefile *file;
+ struct casereader *reader;
+ struct ccase ccase;
+ }
+ *runs;
+
+ struct casefile *output = NULL;
int i;
+ /* Open input files. */
+ runs = xmalloc (sizeof *runs * run_cnt);
for (i = 0; i < run_cnt; i++)
{
- input_readers[i] = casefile_get_destructive_reader (input_runs[i]);
- if (!casereader_read_xfer (input_readers[i], &input_cases[i]))
+ struct run *r = &runs[i];
+ r->file = input_files[i];
+ r->reader = casefile_get_destructive_reader (r->file);
+ if (!casereader_read_xfer (r->reader, &r->ccase))
{
run_cnt--;
i--;
}
}
-
- output_casefile = casefile_create (mrg->xsrt->value_cnt);
- casefile_to_disk (output_casefile);
+
+ /* Create output file. */
+ output = casefile_create (xsrt->value_cnt);
+ casefile_to_disk (output);
/* Merge. */
while (run_cnt > 0)
{
- size_t min_idx;
-
+ struct run *min_run, *run;
+
/* Find minimum. */
- min_idx = 0;
- for (i = 1; i < run_cnt; i++)
- if (compare_record (&input_cases[i], &input_cases[min_idx],
- mrg->xsrt->criteria) < 0)
- min_idx = i;
+ min_run = runs;
+ for (run = runs + 1; run < runs + run_cnt; run++)
+ if (compare_record (&run->ccase, &min_run->ccase, xsrt->criteria) < 0)
+ min_run = run;
/* Write minimum to output file. */
- casefile_append_xfer (output_casefile, &input_cases[min_idx]);
+ casefile_append_xfer (output, &min_run->ccase);
- if (!casereader_read_xfer (input_readers[min_idx],
- &input_cases[min_idx]))
+ /* Read another case from minimum run. */
+ if (!casereader_read_xfer (min_run->reader, &min_run->ccase))
{
- casereader_destroy (input_readers[min_idx]);
- casefile_destroy (input_runs[min_idx]);
+ casereader_destroy (min_run->reader);
+ casefile_destroy (min_run->file);
+ remove_element (runs, run_cnt, sizeof *runs, min_run - runs);
run_cnt--;
- input_runs[min_idx] = input_runs[run_cnt];
- input_readers[min_idx] = input_readers[run_cnt];
- input_cases[min_idx] = input_cases[run_cnt];
}
}
- casefile_sleep (output_casefile);
+ casefile_sleep (output);
+ free (runs);
- return output_casefile;
+ return output;
}