X-Git-Url: https://pintos-os.org/cgi-bin/gitweb.cgi?a=blobdiff_plain;f=src%2Fsort.c;h=a77c5a8298f22d387e628be51850ba00627acf19;hb=3bd4593ef9a57f23062c61ec465389f749ba3089;hp=d17075ade2a132091a40d97ce64a0bfaf0aa03dd;hpb=5906e30c29662d12594199e1652ba3a7e5670944;p=pspp-builds.git diff --git a/src/sort.c b/src/sort.c index d17075ad..a77c5a82 100644 --- a/src/sort.c +++ b/src/sort.c @@ -19,18 +19,20 @@ #include #include "sort.h" -#include +#include "error.h" #include #include #include +#include "algorithm.h" #include "alloc.h" -#include "approx.h" +#include "case.h" +#include "casefile.h" #include "command.h" #include "error.h" -#include "expr.h" -#include "heap.h" +#include "expressions/public.h" #include "lexer.h" #include "misc.h" +#include "settings.h" #include "str.h" #include "var.h" #include "vfm.h" @@ -50,1336 +52,826 @@ #include "debug-print.h" -/* Variables to sort. */ -struct variable **v_sort; -int nv_sort; +/* Sort direction. */ +enum sort_direction + { + SRT_ASCEND, /* A, B, C, ..., X, Y, Z. */ + SRT_DESCEND /* Z, Y, X, ..., C, B, A. */ + }; -/* Used when internal-sorting to a separate file. */ -static struct case_list **separate_case_tab; +/* A sort criterion. */ +struct sort_criterion + { + int fv; /* Variable data index. */ + int width; /* 0=numeric, otherwise string widthe. */ + enum sort_direction dir; /* Sort direction. */ + }; -/* Other prototypes. */ -static int compare_case_lists (const void *, const void *); -static int do_internal_sort (int separate); -static int do_external_sort (int separate); -int parse_sort_variables (void); -void read_sort_output (write_case_func *write_case, write_case_data wc_data); +/* A set of sort criteria. */ +struct sort_criteria + { + struct sort_criterion *crits; + size_t crit_cnt; + }; + +static int compare_case_dblptrs (const void *, const void *, void *); +static int compare_record (const struct ccase *, const struct ccase *, + const struct sort_criteria *); +static struct casefile *do_internal_sort (struct casereader *, + const struct sort_criteria *); +static struct casefile *do_external_sort (struct casereader *, + const struct sort_criteria *); /* Performs the SORT CASES procedures. */ int cmd_sort_cases (void) { - /* First, just parse the command. */ - lex_match_id ("SORT"); - lex_match_id ("CASES"); + struct sort_criteria *criteria; + int success; + lex_match (T_BY); - if (!parse_sort_variables ()) + criteria = sort_parse_criteria (default_dict, NULL, NULL); + if (criteria == NULL) return CMD_FAILURE; - - cancel_temporary (); - /* Then it's time to do the actual work. There are two cases: - - (internal sort) All the data is in memory. In this case, we - perform an EXECUTE to get the data into the desired form, then - sort the cases in place, if it is still all in memory. + success = sort_active_file_in_place (criteria); + sort_destroy_criteria (criteria); + return success ? lex_end_of_command () : CMD_FAILURE; +} - (external sort) The data is not in memory. It may be coming from - a system file or other data file, etc. In any case, it is now - time to perform an external sort. This is better explained in - do_external_sort(). */ +/* Gets ready to sort the active file, either in-place or to a + separate casefile. */ +static void +prepare_to_sort_active_file (void) +{ + /* Cancel temporary transformations and PROCESS IF. */ + if (temporary != 0) + cancel_temporary (); + expr_free (process_if_expr); + process_if_expr = NULL; - /* Do all this dirty work. */ - { - int success = sort_cases (0); - free (v_sort); - if (success) - return lex_end_of_command (); - else - return CMD_FAILURE; - } + /* Make sure source cases are in a storage source. */ + procedure (NULL, NULL); + assert (case_source_is_class (vfm_source, &storage_source_class)); } -/* Parses a list of sort variables into v_sort and nv_sort. */ +/* Sorts the active file in-place according to CRITERIA. + Returns nonzero if successful. */ int -parse_sort_variables (void) +sort_active_file_in_place (const struct sort_criteria *criteria) +{ + struct casefile *src, *dst; + + prepare_to_sort_active_file (); + + src = storage_source_get_casefile (vfm_source); + dst = sort_execute (casefile_get_destructive_reader (src), criteria); + free_case_source (vfm_source); + vfm_source = NULL; + + if (dst == NULL) + return 0; + + vfm_source = storage_source_create (dst, default_dict); + return 1; +} + +/* Sorts the active file to a separate casefile. If successful, + returns the sorted casefile. Returns a null pointer on + failure. */ +struct casefile * +sort_active_file_to_casefile (const struct sort_criteria *criteria) { - v_sort = NULL; - nv_sort = 0; + struct casefile *src; + + prepare_to_sort_active_file (); + + src = storage_source_get_casefile (vfm_source); + return sort_execute (casefile_get_reader (src), criteria); +} + +/* Parses a list of sort keys and returns a struct sort_cases_pgm + based on it. Returns a null pointer on error. */ +struct sort_criteria * +sort_parse_criteria (const struct dictionary *dict, + struct variable ***vars, int *var_cnt) +{ + struct sort_criteria *criteria; + struct variable **local_vars = NULL; + size_t local_var_cnt; + + assert ((vars == NULL) == (var_cnt == NULL)); + if (vars == NULL) + { + vars = &local_vars; + var_cnt = &local_var_cnt; + } + + criteria = xmalloc (sizeof *criteria); + criteria->crits = NULL; + criteria->crit_cnt = 0; + + *vars = NULL; + *var_cnt = 0; + do { - int prev_nv_sort = nv_sort; - int order = SRT_ASCEND; + int prev_var_cnt = *var_cnt; + enum sort_direction direction; - if (!parse_variables (default_dict, &v_sort, &nv_sort, + /* Variables. */ + if (!parse_variables (dict, vars, var_cnt, PV_NO_DUPLICATE | PV_APPEND | PV_NO_SCRATCH)) - return 0; + goto error; + + /* Sort direction. */ if (lex_match ('(')) { if (lex_match_id ("D") || lex_match_id ("DOWN")) - order = SRT_DESCEND; - else if (!lex_match_id ("A") && !lex_match_id ("UP")) + direction = SRT_DESCEND; + else if (lex_match_id ("A") || lex_match_id ("UP")) + direction = SRT_ASCEND; + else { - free (v_sort); msg (SE, _("`A' or `D' expected inside parentheses.")); - return 0; + goto error; } if (!lex_match (')')) { - free (v_sort); msg (SE, _("`)' expected.")); - return 0; + goto error; } } - for (; prev_nv_sort < nv_sort; prev_nv_sort++) - v_sort[prev_nv_sort]->p.srt.order = order; + else + direction = SRT_ASCEND; + + criteria->crits = xrealloc (criteria->crits, + sizeof *criteria->crits * *var_cnt); + criteria->crit_cnt = *var_cnt; + for (; prev_var_cnt < criteria->crit_cnt; prev_var_cnt++) + { + struct sort_criterion *c = &criteria->crits[prev_var_cnt]; + c->fv = (*vars)[prev_var_cnt]->fv; + c->width = (*vars)[prev_var_cnt]->width; + c->dir = direction; + } } while (token != '.' && token != '/'); - - return 1; -} -/* Sorts the active file based on the key variables specified in - global variables v_sort and nv_sort. The output is either to the - active file, if SEPARATE is zero, or to a separate file, if - SEPARATE is nonzero. In the latter case the output cases can be - read with a call to read_sort_output(). (In the former case the - output cases should be dealt with through the usual vfm interface.) + free (local_vars); + return criteria; - The caller is responsible for freeing v_sort[]. */ -int -sort_cases (int separate) -{ - assert (separate_case_tab == NULL); - - /* Not sure this is necessary but it's good to be safe. */ - if (separate && vfm_source == &sort_stream) - procedure (NULL, NULL, NULL, NULL); - - /* SORT CASES cancels PROCESS IF. */ - expr_free (process_if_expr); - process_if_expr = NULL; - - if (do_internal_sort (separate)) - return 1; - - page_to_disk (); - return do_external_sort (separate); + error: + free (local_vars); + sort_destroy_criteria (criteria); + return NULL; } -/* If a reasonable situation is set up, do an internal sort of the - data. Return success. */ -static int -do_internal_sort (int separate) +/* Destroys a SORT CASES program. */ +void +sort_destroy_criteria (struct sort_criteria *criteria) { - if (vfm_source != &vfm_disk_stream) + if (criteria != NULL) { - if (vfm_source != &vfm_memory_stream) - procedure (NULL, NULL, NULL, NULL); - if (vfm_source == &vfm_memory_stream) - { - struct case_list **case_tab = malloc (sizeof *case_tab - * (vfm_source_info.ncases + 1)); - if (vfm_source_info.ncases == 0) - { - free (case_tab); - return 1; - } - if (case_tab != NULL) - { - struct case_list *clp = memory_source_cases; - struct case_list **ctp = case_tab; - int i; - - for (; clp; clp = clp->next) - *ctp++ = clp; - qsort (case_tab, vfm_source_info.ncases, sizeof *case_tab, - compare_case_lists); - - if (!separate) - { - memory_source_cases = case_tab[0]; - for (i = 1; i < vfm_source_info.ncases; i++) - case_tab[i - 1]->next = case_tab[i]; - case_tab[vfm_source_info.ncases - 1]->next = NULL; - free (case_tab); - } else { - case_tab[vfm_source_info.ncases] = NULL; - separate_case_tab = case_tab; - } - - return 1; - } - } + free (criteria->crits); + free (criteria); } - return 0; } -/* Compares the NV_SORT variables in V_SORT[] between the - `case_list's at A and B, and returns a strcmp()-type - result. */ -static int -compare_case_lists (const void *a_, const void *b_) +/* Reads all the cases from READER, which is destroyed. Sorts + the cases according to CRITERIA. Returns the sorted cases in + a newly created casefile. */ +struct casefile * +sort_execute (struct casereader *reader, const struct sort_criteria *criteria) { - struct case_list *const *pa = a_; - struct case_list *const *pb = b_; - struct case_list *a = *pa; - struct case_list *b = *pb; - struct variable *v; - int result = 0; - int i; + struct casefile *output; - for (i = 0; i < nv_sort; i++) + output = do_internal_sort (reader, criteria); + if (output == NULL) + output = do_external_sort (reader, criteria); + casereader_destroy (reader); + return output; +} + +/* If the data is in memory, do an internal sort and return a new + casefile for the data. */ +static struct casefile * +do_internal_sort (struct casereader *reader, + const struct sort_criteria *criteria) +{ + const struct casefile *src; + struct casefile *dst; + struct ccase *cases, **case_ptrs; + unsigned long case_cnt; + + src = casereader_get_casefile (reader); + if (casefile_get_case_cnt (src) > 1 && !casefile_in_core (src)) + return NULL; + + case_cnt = casefile_get_case_cnt (src); + cases = malloc (sizeof *cases * case_cnt); + case_ptrs = malloc (sizeof *case_ptrs * case_cnt); + if ((cases != NULL && case_ptrs != NULL) || case_cnt == 0) { - v = v_sort[i]; + unsigned long case_idx; - if (v->type == NUMERIC) - { - double af = a->c.data[v->fv].f; - double bf = b->c.data[v->fv].f; - - result = af < bf ? -1 : af > bf; - } - else - result = memcmp (a->c.data[v->fv].s, b->c.data[v->fv].s, v->width); - - if (result != 0) - break; + for (case_idx = 0; case_idx < case_cnt; case_idx++) + { + int success = casereader_read_xfer (reader, &cases[case_idx]); + assert (success); + case_ptrs[case_idx] = &cases[case_idx]; + } + + sort (case_ptrs, case_cnt, sizeof *case_ptrs, compare_case_dblptrs, + (void *) criteria); + + dst = casefile_create (casefile_get_value_cnt (src)); + for (case_idx = 0; case_idx < case_cnt; case_idx++) + casefile_append_xfer (dst, case_ptrs[case_idx]); } + else + dst = NULL; + + free (case_ptrs); + free (cases); - if (v->p.srt.order == SRT_DESCEND) - result = -result; - return result; + return dst; +} + +/* Compares the variables specified by CRITERIA between the cases + at A and B, and returns a strcmp()-type result. */ +static int +compare_case_dblptrs (const void *a_, const void *b_, void *criteria_) +{ + struct sort_criteria *criteria = criteria_; + struct ccase *const *pa = a_; + struct ccase *const *pb = b_; + struct ccase *a = *pa; + struct ccase *b = *pb; + + return compare_record (a, b, criteria); } /* External sort. */ -/* Maximum number of input + output file handles. */ -#if defined FOPEN_MAX && (FOPEN_MAX - 5 < 18) -#define MAX_FILE_HANDLES (FOPEN_MAX - 5) -#else -#define MAX_FILE_HANDLES 18 -#endif - -#if MAX_FILE_HANDLES < 3 -#error At least 3 file handles must be available for sorting. -#endif - -/* Number of input buffers. */ -#define N_INPUT_BUFFERS (MAX_FILE_HANDLES - 1) - -/* Maximum order of merge. This is the value suggested by Knuth; - specifically, he said to use tree selection, which we don't - implement, for larger orders of merge. */ +/* Maximum order of merge. If you increase this, then you should + use a heap for comparing cases during merge. */ #define MAX_MERGE_ORDER 7 /* Minimum total number of records for buffers. */ #define MIN_BUFFER_TOTAL_SIZE_RECS 64 -/* Minimum single input or output buffer size, in bytes and records. */ +/* Minimum single input buffer size, in bytes and records. */ #define MIN_BUFFER_SIZE_BYTES 4096 #define MIN_BUFFER_SIZE_RECS 16 -/* Structure for replacement selection tree. */ -struct repl_sel_tree - { - struct repl_sel_tree *loser;/* Loser associated w/this internal node. */ - int rn; /* Run number of `loser'. */ - struct repl_sel_tree *fe; /* Internal node above this external node. */ - struct repl_sel_tree *fi; /* Internal node above this internal node. */ - union value record[1]; /* The case proper. */ - }; - -/* Static variables used for sorting. */ -static struct repl_sel_tree **x; /* Buffers. */ -static int x_max; /* Size of buffers, in records. */ -static int records_per_buffer; /* Number of records in each buffer. */ - -/* In the merge phase, the first N_INPUT_BUFFERS handle[] elements are - input files and the last element is the output file. Before that, - they're all used as output files, although the last one is - segregated. */ -static FILE *handle[MAX_FILE_HANDLES]; /* File handles. */ - -/* Now, MAX_FILE_HANDLES is the maximum number of files we will *try* - to open. But if we can't open that many, max_handles will be set - to the number we apparently can open. */ -static int max_handles; /* Maximum number of handles. */ - -/* When we create temporary files, they are all put in the same - directory and numbered sequentially from zero. tmp_basename is the - drive/directory, etc., and tmp_extname can be sprintf() with "%08x" - to the file number, then tmp_basename used to open the file. */ -static char *tmp_basename; /* Temporary file basename. */ -static char *tmp_extname; /* Temporary file extension name. */ - -/* We use Huffman's method to determine the merge pattern. This means - that we need to know which runs are the shortest at any given time. - Priority queues as implemented by heap.c are a natural for this - task (probably because I wrote the code specifically for it). */ -static struct heap *huffman_queue; /* Huffman priority queue. */ - -/* Prototypes for helper functions. */ -static void sort_stream_write (void); -static int write_initial_runs (int separate); -static int allocate_cases (void); -static int allocate_file_handles (void); -static int merge (void); -static void rmdir_temp_dir (void); - -/* Performs an external sort of the active file. A description of the - procedure follows. All page references refer to Knuth's _Art of - Computer Programming, Vol. 3: Sorting and Searching_, which is the - canonical resource for sorting. - - 1. The data is read and S initial runs are formed through the - action of algorithm 5.4.1R (replacement selection). - - 2. Huffman's method (p. 365-366) is used to determine the optimum - merge pattern. - - 3. If an OS that supports overlapped reading, writing, and - computing is being run, we should use 5.4.6F for forecasting. - Otherwise, buffers are filled only when they run out of data. - FIXME: Since the author of PSPP uses GNU/Linux, which does not - yet implement overlapped r/w/c, 5.4.6F is not used. - - 4. We perform P-way merges: - - (a) The desired P is the smallest P such that ceil(ln(S)/ln(P)) - is minimized. (FIXME: Since I don't have an algorithm for - minimizing this, it's just set to MAX_MERGE_ORDER.) - - (b) P is reduced if the selected value would make input buffers - less than 4096 bytes each, or 16 records, whichever is larger. - - (c) P is reduced if we run out of available file handles or space - for file handles. +#if MIN_BUFFER_SIZE_RECS * 2 + 16 > MIN_BUFFER_TOTAL_SIZE_RECS +#error MIN_BUFFER_SIZE_RECS and MIN_BUFFER_TOTAL_SIZE_RECS do not make sense. +#endif - (d) P is reduced if we don't have space for one or two output - buffers, which have the same minimum size as input buffers. (We - need two output buffers if 5.4.6F is in use for forecasting.) */ +/* Sorts initial runs A and B in decending order by length. */ static int -do_external_sort (int separate) +compare_initial_runs (const void *a_, const void *b_, void *aux UNUSED) { - int success = 0; - - assert (MAX_FILE_HANDLES >= 3); - - x = NULL; - tmp_basename = NULL; - - huffman_queue = heap_create (512); - if (huffman_queue == NULL) - return 0; - - if (!allocate_cases ()) - goto lossage; - - if (!allocate_file_handles ()) - goto lossage; - - if (!write_initial_runs (separate)) - goto lossage; + struct casefile *const *a = a_; + struct casefile *const *b = b_; + unsigned long a_case_cnt = casefile_get_case_cnt (*a); + unsigned long b_case_cnt = casefile_get_case_cnt (*b); + + return a_case_cnt > b_case_cnt ? -1 : a_case_cnt < b_case_cnt; +} - merge (); +/* Results of an external sort. */ +struct external_sort + { + const struct sort_criteria *criteria; /* Sort criteria. */ + size_t value_cnt; /* Size of data in `union value's. */ + struct casefile **initial_runs; /* Array of initial runs. */ + size_t run_cnt, run_cap; /* Number of runs, allocated capacity. */ + }; - success = 1; +/* Prototypes for helper functions. */ +static int write_initial_runs (struct external_sort *, struct casereader *); +static int merge (struct external_sort *); +static void destroy_external_sort (struct external_sort *); + +/* Performs an external sort of the active file according to the + specification in SCP. Forms initial runs using a heap as a + reservoir. Determines the optimum merge pattern via Huffman's + method (see Knuth vol. 3, 2nd edition, p. 365-366), and merges + according to that pattern. */ +static struct casefile * +do_external_sort (struct casereader *reader, + const struct sort_criteria *criteria) +{ + struct external_sort *xsrt; - /* Despite the name, flow of control comes here regardless of - whether or not the sort is successful. */ -lossage: - heap_destroy (huffman_queue); + casefile_to_disk (casereader_get_casefile (reader)); - if (x) + xsrt = xmalloc (sizeof *xsrt); + xsrt->criteria = criteria; + xsrt->value_cnt = casefile_get_value_cnt (casereader_get_casefile (reader)); + xsrt->run_cap = 512; + xsrt->run_cnt = 0; + xsrt->initial_runs = xmalloc (sizeof *xsrt->initial_runs * xsrt->run_cap); + if (write_initial_runs (xsrt, reader) && merge (xsrt)) { - int i; - - for (i = 0; i <= x_max; i++) - free (x[i]); - free (x); + struct casefile *output = xsrt->initial_runs[0]; + xsrt->initial_runs[0] = NULL; + destroy_external_sort (xsrt); + return output; } - - if (!success) - rmdir_temp_dir (); - - return success; -} - -#if !HAVE_GETPID -#define getpid() (0) -#endif - -/* Sets up to open temporary files. */ -/* PORTME: This creates a directory for temporary files. Some OSes - might not have that concept... */ -static int -allocate_file_handles (void) -{ - const char *dir; /* Directory prefix. */ - char *buf; /* String buffer. */ - char *cp; /* Pointer into buf. */ - - dir = getenv ("SPSSTMPDIR"); - if (dir == NULL) - dir = getenv ("SPSSXTMPDIR"); - if (dir == NULL) - dir = getenv ("TMPDIR"); -#ifdef P_tmpdir - if (dir == NULL) - dir = P_tmpdir; -#endif -#ifdef unix - if (dir == NULL) - dir = "/tmp"; -#elif defined (__MSDOS__) - if (dir == NULL) - dir = getenv ("TEMP"); - if (dir == NULL) - dir = getenv ("TMP"); - if (dir == NULL) - dir = "\\"; -#else - dir = ""; -#endif - - buf = xmalloc (strlen (dir) + 1 + 4 + 8 + 4 + 1 + INT_DIGITS + 1); - cp = spprintf (buf, "%s%c%04lX%04lXpspp", dir, DIR_SEPARATOR, - ((long) time (0)) & 0xffff, ((long) getpid ()) & 0xffff); -#ifndef __MSDOS__ - if (-1 == mkdir (buf, S_IRWXU)) -#else - if (-1 == mkdir (buf)) -#endif + else { - free (buf); - msg (SE, _("%s: Cannot create temporary directory: %s."), - buf, strerror (errno)); - return 0; + destroy_external_sort (xsrt); + return NULL; } - *cp++ = DIR_SEPARATOR; - - tmp_basename = buf; - tmp_extname = cp; - - max_handles = MAX_FILE_HANDLES; - - return 1; } -/* Removes the directory created for temporary files, if one exists. - Also frees tmp_basename. */ +/* Destroys XSRT. */ static void -rmdir_temp_dir (void) -{ - if (NULL == tmp_basename) - return; - - tmp_extname[-1] = '\0'; - if (rmdir (tmp_basename) == -1) - msg (SE, _("%s: Error removing directory for temporary files: %s."), - tmp_basename, strerror (errno)); - - free (tmp_basename); -} - -/* Allocates room for lots of cases as a buffer. */ -static int -allocate_cases (void) +destroy_external_sort (struct external_sort *xsrt) { - /* This is the size of one case. */ - const int case_size = (sizeof (struct repl_sel_tree) - + (sizeof (union value) - * (dict_get_value_cnt (default_dict) - 1)) - + sizeof (struct repl_sel_tree *)); - - x = NULL; - - /* Allocate as many cases as we can, assuming a space of four - void pointers for malloc()'s internal bookkeeping. */ - x_max = MAX_WORKSPACE / (case_size + 4 * sizeof (void *)); - x = malloc (sizeof (struct repl_sel_tree *) * x_max); - if (x != NULL) + if (xsrt != NULL) { int i; - - for (i = 0; i < x_max; i++) - { - x[i] = malloc (sizeof (struct repl_sel_tree) - + (sizeof (union value) - * (dict_get_value_cnt (default_dict) - 1))); - if (x[i] == NULL) - break; - } - x_max = i; - } - if (x == NULL || x_max < MIN_BUFFER_TOTAL_SIZE_RECS) - { - if (x != NULL) - { - int i; - - for (i = 0; i < x_max; i++) - free (x[i]); - } - free (x); - msg (SE, _("Out of memory. Could not allocate room for minimum of %d " - "cases of %d bytes each. (PSPP workspace is currently " - "restricted to a maximum of %d KB.)"), - MIN_BUFFER_TOTAL_SIZE_RECS, case_size, MAX_WORKSPACE / 1024); - x_max = 0; - x = NULL; - return 0; + + for (i = 0; i < xsrt->run_cnt; i++) + casefile_destroy (xsrt->initial_runs[i]); + free (xsrt->initial_runs); + free (xsrt); } - - /* The last element of the array is used to store lastkey. */ - x_max--; - - debug_printf ((_("allocated %d cases == %d bytes\n"), - x_max, x_max * case_size)); - return 1; } /* Replacement selection. */ -static int rmax, rc, rq; -static struct repl_sel_tree *q; -static union value *lastkey; -static int run_no, file_index; -static int deferred_abort; -static int run_length; - -static int compare_record (union value *, union value *); - -static inline void -output_record (union value *v) -{ - union value *src_case; - - if (deferred_abort) - return; +/* Pairs a record with a run number. */ +struct record_run + { + int run; /* Run number of case. */ + struct ccase record; /* Case data. */ + }; - if (compaction_necessary) - { - compact_case (compaction_case, (struct ccase *) v); - src_case = (union value *) compaction_case; - } - else - src_case = (union value *) v; +/* Represents a set of initial runs during an external sort. */ +struct initial_run_state + { + struct external_sort *xsrt; + + /* Reservoir. */ + struct record_run *records; /* Records arranged as a heap. */ + size_t record_cnt; /* Current number of records. */ + size_t record_cap; /* Capacity for records. */ + + /* Run currently being output. */ + int run; /* Run number. */ + size_t case_cnt; /* Number of cases so far. */ + struct casefile *casefile; /* Output file. */ + struct ccase last_output; /* Record last output. */ + + int okay; /* Zero if an error has been encountered. */ + }; - if ((int) fwrite (src_case, sizeof *src_case, compaction_nval, - handle[file_index]) - != compaction_nval) - { - deferred_abort = 1; - sprintf (tmp_extname, "%08x", run_no); - msg (SE, _("%s: Error writing temporary file: %s."), - tmp_basename, strerror (errno)); - return; - } +static const struct case_sink_class sort_sink_class; - run_length++; -} +static void destroy_initial_run_state (struct initial_run_state *); +static void process_case (struct initial_run_state *, const struct ccase *); +static int allocate_cases (struct initial_run_state *); +static void output_record (struct initial_run_state *); +static void start_run (struct initial_run_state *); +static void end_run (struct initial_run_state *); +static int compare_record_run (const struct record_run *, + const struct record_run *, + struct initial_run_state *); +static int compare_record_run_minheap (const void *, const void *, void *); +/* Reads cases from READER and composes initial runs in XSRT. */ static int -close_handle (int i) +write_initial_runs (struct external_sort *xsrt, struct casereader *reader) { - int result = fclose (handle[i]); - msg (VM (2), _("SORT: Closing handle %d."), i); - - handle[i] = NULL; - if (EOF == result) - { - sprintf (tmp_extname, "%08x", i); - msg (SE, _("%s: Error closing temporary file: %s."), - tmp_basename, strerror (errno)); - return 0; - } - return 1; -} + struct initial_run_state *irs; + struct ccase c; + int success = 0; -static int -close_handles (int beg, int end) -{ - int success = 1; - int i; + /* Allocate memory for cases. */ + irs = xmalloc (sizeof *irs); + irs->xsrt = xsrt; + irs->records = NULL; + irs->record_cnt = irs->record_cap = 0; + irs->run = 0; + irs->case_cnt = 0; + irs->casefile = NULL; + case_nullify (&irs->last_output); + irs->okay = 1; + if (!allocate_cases (irs)) + goto done; + + /* Create initial runs. */ + start_run (irs); + for (; irs->okay && casereader_read (reader, &c); case_destroy (&c)) + process_case (irs, &c); + while (irs->okay && irs->record_cnt > 0) + output_record (irs); + end_run (irs); + + success = irs->okay; + + done: + destroy_initial_run_state (irs); - for (i = beg; i < end; i++) - success &= close_handle (i); return success; } -static int -open_handle_w (int handle_no, int run_no) +/* Add a single case to an initial run. */ +static void +process_case (struct initial_run_state *irs, const struct ccase *c) { - sprintf (tmp_extname, "%08x", run_no); - msg (VM (1), _("SORT: %s: Opening for writing as run %d."), - tmp_basename, run_no); - - /* The `x' modifier causes the GNU C library to insist on creating a - new file--if the file already exists, an error is signaled. The - ANSI C standard says that other libraries should ignore anything - after the `w+b', so it shouldn't be a problem. */ - return NULL != (handle[handle_no] = fopen (tmp_basename, "w+bx")); + struct record_run *new_record_run; + + /* Compose record_run for this run and add to heap. */ + assert (irs->record_cnt < irs->record_cap - 1); + new_record_run = irs->records + irs->record_cnt++; + case_copy (&new_record_run->record, 0, c, 0, irs->xsrt->value_cnt); + new_record_run->run = irs->run; + if (!case_is_null (&irs->last_output) + && compare_record (c, &irs->last_output, irs->xsrt->criteria) < 0) + new_record_run->run = irs->run + 1; + push_heap (irs->records, irs->record_cnt, sizeof *irs->records, + compare_record_run_minheap, irs); + + /* Output a record if the reservoir is full. */ + if (irs->record_cnt == irs->record_cap - 1 && irs->okay) + output_record (irs); } -static int -open_handle_r (int handle_no, int run_no) -{ - FILE *f; - - sprintf (tmp_extname, "%08x", run_no); - msg (VM (1), _("SORT: %s: Opening for writing as run %d."), - tmp_basename, run_no); - f = handle[handle_no] = fopen (tmp_basename, "rb"); - - if (f == NULL) - { - msg (SE, _("%s: Error opening temporary file for reading: %s."), - tmp_basename, strerror (errno)); - return 0; - } - - return 1; -} - -/* Begins a new initial run, specifically its output file. */ +/* Destroys the initial run state represented by IRS. */ static void -begin_run (void) +destroy_initial_run_state (struct initial_run_state *irs) { - /* Decide which handle[] to use. If run_no is max_handles or - greater, then we've run out of handles so it's time to just do - one file at a time, which by default is handle 0. */ - file_index = (run_no < max_handles ? run_no : 0); - run_length = 0; - - /* Alright, now create the temporary file. */ - if (open_handle_w (file_index, run_no) == 0) - { - /* Failure to create the temporary file. Check if there are - unacceptably few files already open. */ - if (file_index < 3) - { - deferred_abort = 1; - msg (SE, _("%s: Error creating temporary file: %s."), - tmp_basename, strerror (errno)); - return; - } + int i; - /* Close all the open temporary files. */ - if (!close_handles (0, file_index)) - return; + if (irs == NULL) + return; - /* Now try again to create the temporary file. */ - max_handles = file_index; - file_index = 0; - if (open_handle_w (0, run_no) == 0) - { - /* It still failed, report it this time. */ - deferred_abort = 1; - msg (SE, _("%s: Error creating temporary file: %s."), - tmp_basename, strerror (errno)); - return; - } - } -} + for (i = 0; i < irs->record_cap; i++) + case_destroy (&irs->records[i].record); + free (irs->records); -/* Ends the current initial run. Just increments run_no if no initial - run has been started yet. */ -static void -end_run (void) -{ - /* Close file handles if necessary. */ - { - int result; - - if (run_no == max_handles - 1) - result = close_handles (0, max_handles); - else if (run_no >= max_handles) - result = close_handle (0); - else - result = 1; - if (!result) - deferred_abort = 1; - } - - /* Advance to next run. */ - run_no++; - if (run_no) - heap_insert (huffman_queue, run_no - 1, run_length); + if (irs->casefile != NULL) + casefile_sleep (irs->casefile); + + free (irs); } -/* Performs 5.4.1R. */ +/* Allocates room for lots of cases as a buffer. */ static int -write_initial_runs (int separate) +allocate_cases (struct initial_run_state *irs) { - run_no = -1; - deferred_abort = 0; - - /* Steps R1, R2, R3. */ - rmax = 0; - rc = 0; - lastkey = NULL; - q = x[0]; - rq = 0; - { - int j; + int approx_case_cost; /* Approximate memory cost of one case in bytes. */ + int max_cases; /* Maximum number of cases to allocate. */ + int i; - for (j = 0; j < x_max; j++) + /* Allocate as many cases as we can within the workspace + limit. */ + approx_case_cost = (sizeof *irs->records + + irs->xsrt->value_cnt * sizeof (union value) + + 4 * sizeof (void *)); + max_cases = get_max_workspace() / approx_case_cost; + irs->records = malloc (sizeof *irs->records * max_cases); + for (i = 0; i < max_cases; i++) + if (!case_try_create (&irs->records[i].record, irs->xsrt->value_cnt)) { - struct repl_sel_tree *J = x[j]; - - J->loser = J; - J->rn = 0; - J->fe = x[(x_max + j) / 2]; - J->fi = x[j / 2]; - memset (J->record, 0, - dict_get_value_cnt (default_dict) * sizeof (union value)); + max_cases = i; + break; } - } - - /* Most of the iterations of steps R4, R5, R6, R7, R2, R3, ... */ - if (!separate) - { - if (vfm_sink) - vfm_sink->destroy_sink (); - vfm_sink = &sort_stream; - } - procedure (NULL, NULL, NULL, NULL); - - /* Final iterations of steps R4, R5, R6, R7, R2, R3, ... */ - for (;;) - { - struct repl_sel_tree *t; - - /* R4. */ - rq = rmax + 1; - - /* R5. */ - t = q->fe; - - /* R6 and R7. */ - for (;;) - { - /* R6. */ - if (t->rn < rq - || (t->rn == rq - && compare_record (t->loser->record, q->record) < 0)) - { - struct repl_sel_tree *temp_tree; - int temp_int; - - temp_tree = t->loser; - t->loser = q; - q = temp_tree; - - temp_int = t->rn; - t->rn = rq; - rq = temp_int; - } - - /* R7. */ - if (t == x[1]) - break; - t = t->fi; - } - - /* R2. */ - if (rq != rc) - { - end_run (); - if (rq > rmax) - break; - begin_run (); - rc = rq; - } + irs->record_cap = max_cases; - /* R3. */ - if (rq != 0) - { - output_record (q->record); - lastkey = x[x_max]->record; - memcpy (lastkey, q->record, sizeof (union value) * vfm_sink_info.nval); - } - } - assert (run_no == rmax); - - /* If an unrecoverable error occurred somewhere in the above code, - then the `deferred_abort' flag would have been set. */ - if (deferred_abort) + /* Fail if we didn't allocate an acceptable number of cases. */ + if (irs->records == NULL || max_cases < MIN_BUFFER_TOTAL_SIZE_RECS) { - int i; - - for (i = 0; i < max_handles; i++) - if (handle[i] != NULL) - { - sprintf (tmp_extname, "%08x", i); - - if (fclose (handle[i]) == EOF) - msg (SE, _("%s: Error closing temporary file: %s."), - tmp_basename, strerror (errno)); - - if (remove (tmp_basename) != 0) - msg (SE, _("%s: Error removing temporary file: %s."), - tmp_basename, strerror (errno)); - - handle[i] = NULL; - } + msg (SE, _("Out of memory. Could not allocate room for minimum of %d " + "cases of %d bytes each. (PSPP workspace is currently " + "restricted to a maximum of %d KB.)"), + MIN_BUFFER_TOTAL_SIZE_RECS, approx_case_cost, get_max_workspace() / 1024); return 0; } - return 1; } -/* Compares the NV_SORT variables in V_SORT[] between the `value's at +/* Compares the VAR_CNT variables in VARS[] between the `value's at A and B, and returns a strcmp()-type result. */ static int -compare_record (union value * a, union value * b) +compare_record (const struct ccase *a, const struct ccase *b, + const struct sort_criteria *criteria) { int i; - int result = 0; - struct variable *v; assert (a != NULL); - if (b == NULL) /* Sort NULLs after everything else. */ - return -1; - - for (i = 0; i < nv_sort; i++) + assert (b != NULL); + + for (i = 0; i < criteria->crit_cnt; i++) { - v = v_sort[i]; - - if (v->type == NUMERIC) - { - if (approx_ne (a[v->fv].f, b[v->fv].f)) - { - result = (a[v->fv].f > b[v->fv].f) ? 1 : -1; - break; - } - } + const struct sort_criterion *c = &criteria->crits[i]; + int result; + + if (c->width == 0) + { + double af = case_num (a, c->fv); + double bf = case_num (b, c->fv); + + result = af < bf ? -1 : af > bf; + } else - { - result = memcmp (a[v->fv].s, b[v->fv].s, v->width); - if (result != 0) - break; - } - } + result = memcmp (case_str (a, c->fv), case_str (b, c->fv), c->width); - if (v->p.srt.order == SRT_ASCEND) - return result; - else - { - assert (v->p.srt.order == SRT_DESCEND); - return -result; + if (result != 0) + return c->dir == SRT_ASCEND ? result : -result; } -} - -/* Merging. */ -static int merge_once (int run_index[], int run_length[], int n_runs); + return 0; +} -/* Modula function as defined by Knuth. */ +/* Compares record-run tuples A and B on run number first, then + on the current record according to SCP. */ static int -mod (int x, int y) +compare_record_run (const struct record_run *a, + const struct record_run *b, + struct initial_run_state *irs) { - int result; - - if (y == 0) - return x; - result = abs (x) % abs (y); - if (y < 0) - result = -result; - return result; + if (a->run != b->run) + return a->run > b->run ? 1 : -1; + else + return compare_record (&a->record, &b->record, irs->xsrt->criteria); } -/* Performs a series of P-way merges of initial runs using Huffman's - method. */ +/* Compares record-run tuples A and B on run number first, then + on the current record according to SCP, but in descending + order. */ static int -merge (void) +compare_record_run_minheap (const void *a, const void *b, void *irs) { - /* Order of merge. */ - int order; - - /* Idiot check. */ - assert (MIN_BUFFER_SIZE_RECS * 2 <= MIN_BUFFER_TOTAL_SIZE_RECS - 1); - - /* Close all the input files. I hope that the boundary conditions - are correct on this but I'm not sure. */ - if (run_no < max_handles) - { - int i; + return -compare_record_run (a, b, irs); +} - for (i = 0; i < run_no; ) - if (!close_handle (i++)) - { - for (; i < run_no; i++) - close_handle (i); - return 0; - } - } +/* Begins a new initial run, specifically its output file. */ +static void +start_run (struct initial_run_state *irs) +{ + irs->run++; + irs->case_cnt = 0; + irs->casefile = casefile_create (irs->xsrt->value_cnt); + casefile_to_disk (irs->casefile); + case_nullify (&irs->last_output); +} - /* Determine order of merge. */ - order = MAX_MERGE_ORDER; - if (x_max / order < MIN_BUFFER_SIZE_RECS) - order = x_max / MIN_BUFFER_SIZE_RECS; - else if (x_max / order * sizeof (union value) * dict_get_value_cnt (default_dict) - < MIN_BUFFER_SIZE_BYTES) - order = x_max / (MIN_BUFFER_SIZE_BYTES - / (sizeof (union value) - * (dict_get_value_cnt (default_dict) - 1))); - - /* Make sure the order of merge is bounded. */ - if (order < 2) - order = 2; - if (order > rmax) - order = rmax; - assert (x_max / order > 0); - - /* Calculate number of records per buffer. */ - records_per_buffer = x_max / order; - - /* Add (1 - S) mod (P - 1) dummy runs of length 0. */ - { - int n_dummy_runs = mod (1 - rmax, order - 1); - debug_printf (("rmax=%d, order=%d, n_dummy_runs=%d\n", - rmax, order, n_dummy_runs)); - assert (n_dummy_runs >= 0); - while (n_dummy_runs--) - { - heap_insert (huffman_queue, -2, 0); - rmax++; - } - } +/* Ends the current initial run. */ +static void +end_run (struct initial_run_state *irs) +{ + struct external_sort *xsrt = irs->xsrt; - /* Repeatedly merge the P shortest existing runs until only one run - is left. */ - while (rmax > 1) + /* Record initial run. */ + if (irs->casefile != NULL) { - int run_index[MAX_MERGE_ORDER]; - int run_length[MAX_MERGE_ORDER]; - int total_run_length = 0; - int i; - - assert (rmax >= order); - - /* Find the shortest runs; put them in runs[] in reverse order - of length, to force dummy runs of length 0 to the end of the - list. */ - debug_printf ((_("merging runs"))); - for (i = order - 1; i >= 0; i--) - { - run_index[i] = heap_delete (huffman_queue, &run_length[i]); - assert (run_index[i] != -1); - total_run_length += run_length[i]; - debug_printf ((" %d(%d)", run_index[i], run_length[i])); - } - debug_printf ((_(" into run %d(%d)\n"), run_no, total_run_length)); - - if (!merge_once (run_index, run_length, order)) - { - int index; - - while (-1 != (index = heap_delete (huffman_queue, NULL))) - { - sprintf (tmp_extname, "%08x", index); - if (remove (tmp_basename) != 0) - msg (SE, _("%s: Error removing temporary file: %s."), - tmp_basename, strerror (errno)); - } - - return 0; - } - - if (!heap_insert (huffman_queue, run_no++, total_run_length)) - { - msg (SE, _("Out of memory expanding Huffman priority queue.")); - return 0; - } - - rmax -= order - 1; + if (xsrt->run_cnt >= xsrt->run_cap) + { + xsrt->run_cap *= 2; + xsrt->initial_runs + = xrealloc (xsrt->initial_runs, + sizeof *xsrt->initial_runs * xsrt->run_cap); + } + xsrt->initial_runs[xsrt->run_cnt++] = irs->casefile; + irs->casefile = NULL; } - - /* There should be exactly one element in the priority queue after - all that merging. This represents the entire sorted active file. - So we could find a total case count by deleting this element from - the queue. */ - assert (heap_size (huffman_queue) == 1); - - return 1; } -/* Merges N_RUNS initial runs into a new run. The jth run for 0 <= j - < N_RUNS is taken from temporary file RUN_INDEX[j]; it is composed - of RUN_LENGTH[j] cases. */ -static int -merge_once (int run_index[], int run_length[], int n_runs) +/* Writes a record to the current initial run. */ +static void +output_record (struct initial_run_state *irs) { - /* For each run, the number of records remaining in its buffer. */ - int buffered[MAX_MERGE_ORDER]; - - /* For each run, the index of the next record in the buffer. */ - int buffer_ptr[MAX_MERGE_ORDER]; - - /* Open input files. */ - { - int i; - - for (i = 0; i < n_runs; i++) - if (run_index[i] != -2 && !open_handle_r (i, run_index[i])) - { - /* Close and remove temporary files. */ - while (i--) - { - close_handle (i); - sprintf (tmp_extname, "%08x", i); - if (remove (tmp_basename) != 0) - msg (SE, _("%s: Error removing temporary file: %s."), - tmp_basename, strerror (errno)); - } - - return 0; - } - } + struct record_run *record_run; + struct ccase case_tmp; + + /* Extract minimum case from heap. */ + assert (irs->record_cnt > 0); + pop_heap (irs->records, irs->record_cnt--, sizeof *irs->records, + compare_record_run_minheap, irs); + record_run = irs->records + irs->record_cnt; + + /* Bail if an error has occurred. */ + if (!irs->okay) + return; - /* Create output file. */ - if (!open_handle_w (N_INPUT_BUFFERS, run_no)) + /* Start new run if necessary. */ + assert (record_run->run == irs->run + || record_run->run == irs->run + 1); + if (record_run->run != irs->run) { - msg (SE, _("%s: Error creating temporary file for merge: %s."), - tmp_basename, strerror (errno)); - goto lossage; + end_run (irs); + start_run (irs); } + assert (record_run->run == irs->run); + irs->case_cnt++; - /* Prime each buffer. */ - { - int i; - - for (i = 0; i < n_runs; i++) - if (run_index[i] == -2) - { - n_runs = i; - break; - } - else - { - int j; - int ofs = records_per_buffer * i; - - buffered[i] = min (records_per_buffer, run_length[i]); - for (j = 0; j < buffered[i]; j++) - if ((int) fread (x[j + ofs]->record, sizeof (union value), - dict_get_value_cnt (default_dict), handle[i]) - != dict_get_value_cnt (default_dict)) - { - sprintf (tmp_extname, "%08x", run_index[i]); - if (ferror (handle[i])) - msg (SE, _("%s: Error reading temporary file in merge: %s."), - tmp_basename, strerror (errno)); - else - msg (SE, _("%s: Unexpected end of temporary file in merge."), - tmp_basename); - goto lossage; - } - buffer_ptr[i] = ofs; - run_length[i] -= buffered[i]; - } - } + /* Write to disk. */ + if (irs->casefile != NULL) + casefile_append (irs->casefile, &record_run->record); - /* Perform the merge proper. */ - while (n_runs) /* Loop while some data is left. */ - { - int i; - int min = 0; + /* This record becomes last_output. */ + irs->last_output = case_tmp = record_run->record; + record_run->record = irs->records[irs->record_cap - 1].record; + irs->records[irs->record_cap - 1].record = case_tmp; +} + +/* Merging. */ - for (i = 1; i < n_runs; i++) - if (compare_record (x[buffer_ptr[min]]->record, - x[buffer_ptr[i]]->record) > 0) - min = i; +/* State of merging initial runs. */ +struct merge_state + { + struct external_sort *xsrt; /* External sort state. */ + struct ccase *cases; /* Buffers. */ + size_t case_cnt; /* Number of buffers. */ + }; - if ((int) fwrite (x[buffer_ptr[min]]->record, sizeof (union value), - dict_get_value_cnt (default_dict), - handle[N_INPUT_BUFFERS]) - != dict_get_value_cnt (default_dict)) - { - sprintf (tmp_extname, "%08x", run_index[i]); - msg (SE, _("%s: Error writing temporary file in " - "merge: %s."), tmp_basename, strerror (errno)); - goto lossage; - } +struct run; +static struct casefile *merge_once (struct merge_state *, + struct casefile *[], size_t); +static int mod (int, int); - /* Remove one case from the buffer for this input file. */ - if (--buffered[min] == 0) - { - /* The input buffer is empty. Do any cases remain in the - initial run on disk? */ - if (run_length[min]) - { - /* Yes. Read them in. */ - - int j; - int ofs; - - /* Reset the buffer pointer. Note that we can't simply - set it to (i * records_per_buffer) since the run - order might have changed. */ - ofs = buffer_ptr[min] -= buffer_ptr[min] % records_per_buffer; - - buffered[min] = min (records_per_buffer, run_length[min]); - for (j = 0; j < buffered[min]; j++) - if ((int) fread (x[j + ofs]->record, sizeof (union value), - dict_get_value_cnt (default_dict), - handle[min]) - != dict_get_value_cnt (default_dict)) - { - sprintf (tmp_extname, "%08x", run_index[min]); - if (ferror (handle[min])) - msg (SE, _("%s: Error reading temporary file in " - "merge: %s."), - tmp_basename, strerror (errno)); - else - msg (SE, _("%s: Unexpected end of temporary file " - "in merge."), - tmp_basename); - goto lossage; - } - run_length[min] -= buffered[min]; - } - else - { - /* No. Delete this run. */ - - /* Close the file. */ - FILE *f = handle[min]; - handle[min] = NULL; - sprintf (tmp_extname, "%08x", run_index[min]); - if (fclose (f) == EOF) - msg (SE, _("%s: Error closing temporary file in merge: " - "%s."), tmp_basename, strerror (errno)); - - /* Delete the file. */ - if (remove (tmp_basename) != 0) - msg (SE, _("%s: Error removing temporary file in merge: " - "%s."), tmp_basename, strerror (errno)); - - n_runs--; - if (min != n_runs) - { - /* Since this isn't the last run, we move the last - run into its spot to force all the runs to be - contiguous. */ - run_index[min] = run_index[n_runs]; - run_length[min] = run_length[n_runs]; - buffer_ptr[min] = buffer_ptr[n_runs]; - buffered[min] = buffered[n_runs]; - handle[min] = handle[n_runs]; - } - } - } - else - buffer_ptr[min]++; - } +/* Performs a series of P-way merges of initial runs. */ +static int +merge (struct external_sort *xsrt) +{ + struct merge_state mrg; /* State of merge. */ + size_t approx_case_cost; /* Approximate memory cost of one case. */ + int max_order; /* Maximum order of merge. */ + size_t dummy_run_cnt; /* Number of dummy runs to insert. */ + int success = 0; + int i; - /* Close output file. */ - { - FILE *f = handle[N_INPUT_BUFFERS]; - handle[N_INPUT_BUFFERS] = NULL; - if (fclose (f) == EOF) + mrg.xsrt = xsrt; + + /* Allocate as many cases as possible into cases. */ + approx_case_cost = (sizeof *mrg.cases + + xsrt->value_cnt * sizeof (union value) + + 4 * sizeof (void *)); + mrg.case_cnt = get_max_workspace() / approx_case_cost; + mrg.cases = malloc (sizeof *mrg.cases * mrg.case_cnt); + if (mrg.cases == NULL) + goto done; + for (i = 0; i < mrg.case_cnt; i++) + if (!case_try_create (&mrg.cases[i], xsrt->value_cnt)) { - sprintf (tmp_extname, "%08x", run_no); - msg (SE, _("%s: Error closing temporary file in merge: " - "%s."), - tmp_basename, strerror (errno)); - return 0; + mrg.case_cnt = i; + break; } - } - - return 1; - -lossage: - /* Close all the input and output files. */ - { - int i; - - for (i = 0; i < n_runs; i++) - if (run_length[i] != 0) - { - close_handle (i); - sprintf (tmp_basename, "%08x", run_index[i]); - if (remove (tmp_basename) != 0) - msg (SE, _("%s: Error removing temporary file: %s."), - tmp_basename, strerror (errno)); - } - } - close_handle (N_INPUT_BUFFERS); - sprintf (tmp_basename, "%08x", run_no); - if (remove (tmp_basename) != 0) - msg (SE, _("%s: Error removing temporary file: %s."), - tmp_basename, strerror (errno)); - return 0; -} - -/* External sort input program. */ - -/* Reads all the records from the source stream and passes them - to write_case(). */ -static void -sort_stream_read (write_case_func *write_case, write_case_data wc_data) -{ - read_sort_output (write_case, wc_data); -} + if (mrg.case_cnt < MIN_BUFFER_TOTAL_SIZE_RECS) + { + msg (SE, _("Out of memory. Could not allocate room for minimum of %d " + "cases of %d bytes each. (PSPP workspace is currently " + "restricted to a maximum of %d KB.)"), + MIN_BUFFER_TOTAL_SIZE_RECS, approx_case_cost, get_max_workspace() / 1024); + return 0; + } -/* Reads all the records from the output stream and passes them to the - function provided, which must have an interface identical to - write_case(). */ -void -read_sort_output (write_case_func *write_case, write_case_data wc_data) -{ - int i; - FILE *f; + /* Determine maximum order of merge. */ + max_order = MAX_MERGE_ORDER; + if (mrg.case_cnt / max_order < MIN_BUFFER_SIZE_RECS) + max_order = mrg.case_cnt / MIN_BUFFER_SIZE_RECS; + else if (mrg.case_cnt / max_order * xsrt->value_cnt * sizeof (union value) + < MIN_BUFFER_SIZE_BYTES) + max_order = mrg.case_cnt / (MIN_BUFFER_SIZE_BYTES + / (xsrt->value_cnt * sizeof (union value))); + if (max_order < 2) + max_order = 2; + if (max_order > xsrt->run_cnt) + max_order = xsrt->run_cnt; - if (separate_case_tab) + /* Repeatedly merge the P shortest existing runs until only one run + is left. */ + make_heap (xsrt->initial_runs, xsrt->run_cnt, sizeof *xsrt->initial_runs, + compare_initial_runs, NULL); + dummy_run_cnt = mod (1 - (int) xsrt->run_cnt, max_order - 1); + + assert (max_order > 0); + assert (max_order <= 2 + || (xsrt->run_cnt + dummy_run_cnt) % (max_order - 1) == 1); + while (xsrt->run_cnt > 1) { - struct ccase *save_temp_case = temp_case; - struct case_list **p; + struct casefile *output_run; + int order; + int i; - for (p = separate_case_tab; *p; p++) - { - temp_case = &(*p)->c; - write_case (wc_data); - } + /* Choose order of merge (max_order after first merge). */ + order = max_order - dummy_run_cnt; + dummy_run_cnt = 0; + + /* Choose runs to merge. */ + assert (xsrt->run_cnt >= order); + for (i = 0; i < order; i++) + pop_heap (xsrt->initial_runs, xsrt->run_cnt--, + sizeof *xsrt->initial_runs, + compare_initial_runs, NULL); + + /* Merge runs. */ + output_run = merge_once (&mrg, + xsrt->initial_runs + xsrt->run_cnt, order); + if (output_run == NULL) + goto done; - free (separate_case_tab); - separate_case_tab = NULL; - - temp_case = save_temp_case; - } else { - sprintf (tmp_extname, "%08x", run_no - 1); - f = fopen (tmp_basename, "rb"); - if (!f) - { - msg (ME, _("%s: Cannot open sort result file: %s."), tmp_basename, - strerror (errno)); - err_failure (); - return; - } + /* Add output run to heap. */ + xsrt->initial_runs[xsrt->run_cnt++] = output_run; + push_heap (xsrt->initial_runs, xsrt->run_cnt, sizeof *xsrt->initial_runs, + compare_initial_runs, NULL); + } - for (i = 0; i < vfm_source_info.ncases; i++) - { - if (!fread (temp_case, vfm_source_info.case_size, 1, f)) - { - if (ferror (f)) - msg (ME, _("%s: Error reading sort result file: %s."), - tmp_basename, strerror (errno)); - else - msg (ME, _("%s: Unexpected end of sort result file: %s."), - tmp_basename, strerror (errno)); - err_failure (); - break; - } + /* Exactly one run is left, which contains the entire sorted + file. We could use it to find a total case count. */ + assert (xsrt->run_cnt == 1); - if (!write_case (wc_data)) - break; - } + success = 1; - if (fclose (f) == EOF) - msg (ME, _("%s: Error closing sort result file: %s."), tmp_basename, - strerror (errno)); + done: + for (i = 0; i < mrg.case_cnt; i++) + case_destroy (&mrg.cases[i]); + free (mrg.cases); - if (remove (tmp_basename) != 0) - msg (ME, _("%s: Error removing sort result file: %s."), tmp_basename, - strerror (errno)); - else - rmdir_temp_dir (); - } + return success; } -#if 0 /* dead code */ -/* Alternate interface to sort_stream_write used for external sorting - when SEPARATE is true. */ +/* Modulo function as defined by Knuth. */ static int -write_separate (struct ccase *c) +mod (int x, int y) { - assert (c == temp_case); + if (y == 0) + return x; + else if (x == 0) + return 0; + else if (x > 0 && y > 0) + return x % y; + else if (x < 0 && y > 0) + return y - (-x) % y; - sort_stream_write (); - return 1; + abort (); } -#endif -/* Performs one iteration of 5.4.1R steps R4, R5, R6, R7, R2, and - R3. */ -static void -sort_stream_write (void) +/* Merges the RUN_CNT initial runs specified in INPUT_RUNS into a + new run. Returns nonzero only if successful. Adds an entry + to MRG->xsrt->runs for the output file if and only if the + output file is actually created. Always deletes all the input + files. */ +static struct casefile * +merge_once (struct merge_state *mrg, + struct casefile *input_runs[], + size_t run_cnt) { - struct repl_sel_tree *t; - - /* R4. */ - memcpy (q->record, temp_case->data, vfm_sink_info.case_size); - if (compare_record (q->record, lastkey) < 0) - if (++rq > rmax) - rmax = rq; - - /* R5. */ - t = q->fe; + struct casereader *input_readers[MAX_MERGE_ORDER]; + struct ccase input_cases[MAX_MERGE_ORDER]; + struct casefile *output_casefile = NULL; + int i; - /* R6 and R7. */ - for (;;) + for (i = 0; i < run_cnt; i++) { - /* R6. */ - if (t->rn < rq - || (t->rn == rq && compare_record (t->loser->record, q->record) < 0)) - { - struct repl_sel_tree *temp_tree; - int temp_int; - - temp_tree = t->loser; - t->loser = q; - q = temp_tree; - - temp_int = t->rn; - t->rn = rq; - rq = temp_int; - } - - /* R7. */ - if (t == x[1]) - break; - t = t->fi; + input_readers[i] = casefile_get_destructive_reader (input_runs[i]); + if (!casereader_read_xfer (input_readers[i], &input_cases[i])) + { + run_cnt--; + i--; + } } + + output_casefile = casefile_create (mrg->xsrt->value_cnt); + casefile_to_disk (output_casefile); - /* R2. */ - if (rq != rc) + /* Merge. */ + while (run_cnt > 0) { - end_run (); - begin_run (); - assert (rq <= rmax); - rc = rq; + size_t min_idx; + + /* Find minimum. */ + min_idx = 0; + for (i = 1; i < run_cnt; i++) + if (compare_record (&input_cases[i], &input_cases[min_idx], + mrg->xsrt->criteria) < 0) + min_idx = i; + + /* Write minimum to output file. */ + casefile_append_xfer (output_casefile, &input_cases[min_idx]); + + if (!casereader_read_xfer (input_readers[min_idx], + &input_cases[min_idx])) + { + casereader_destroy (input_readers[min_idx]); + casefile_destroy (input_runs[min_idx]); + + run_cnt--; + input_runs[min_idx] = input_runs[run_cnt]; + input_readers[min_idx] = input_readers[run_cnt]; + input_cases[min_idx] = input_cases[run_cnt]; + } } - /* R3. */ - if (rq != 0) - { - output_record (q->record); - lastkey = x[x_max]->record; - memcpy (lastkey, q->record, vfm_sink_info.case_size); - } -} + casefile_sleep (output_casefile); -/* Switches mode from sink to source. */ -static void -sort_stream_mode (void) -{ - /* If this is not done, then we get the following source/sink pairs: - source=memory/disk/DATA LIST/etc., sink=SORT; source=SORT, - sink=SORT; which is not good. */ - vfm_sink = NULL; + return output_casefile; } - -struct case_stream sort_stream = - { - NULL, - sort_stream_read, - sort_stream_write, - sort_stream_mode, - NULL, - NULL, - "SORT", - };