X-Git-Url: https://pintos-os.org/cgi-bin/gitweb.cgi?a=blobdiff_plain;f=src%2Fsort.c;h=5f9cdb42f50e307a68980d214eabca331b0ea917;hb=b9e28aa5614a079548c616bcf97aa804024ad647;hp=e5feef94c208d545a1679d664a63c372353a42f3;hpb=4944c86a9318bc5b5578ab145a95c116ffd2c9fd;p=pspp diff --git a/src/sort.c b/src/sort.c index e5feef94c2..5f9cdb42f5 100644 --- a/src/sort.c +++ b/src/sort.c @@ -18,23 +18,25 @@ 02111-1307, USA. */ #include -#include +#include "sort.h" +#include "error.h" #include #include #include +#include "algorithm.h" #include "alloc.h" -#include "approx.h" +#include "casefile.h" #include "command.h" #include "error.h" #include "expr.h" -#include "heap.h" #include "lexer.h" #include "misc.h" -#include "sort.h" +#include "settings.h" #include "str.h" #include "var.h" #include "vfm.h" #include "vfmP.h" +#include "workspace.h" #if HAVE_UNISTD_H #include @@ -48,1338 +50,1374 @@ #include #endif -#undef DEBUGGING -/*#define DEBUGGING 1*/ #include "debug-print.h" -/* Variables to sort. */ -struct variable **v_sort; -int nv_sort; - -/* Used when internal-sorting to a separate file. */ -static struct case_list **separate_case_tab; - -/* Exported by qsort.c. */ -void blp_quicksort (void *pbase, size_t total_elems, size_t size, - int (*cmp) (const void *, const void *), - void *temp_buf); - /* Other prototypes. */ -static int compare_case_lists (const void *, const void *); -static int do_internal_sort (int separate); -static int do_external_sort (int separate); -int parse_sort_variables (void); -void read_sort_output (int (*write_case) (void)); +static int compare_record (const union value *, const union value *, + const struct sort_cases_pgm *, int *idx_to_fv); +static int compare_cases (const struct ccase *, const struct ccase *, void *); +static int compare_case_dblptrs (const void *, const void *, void *); +static struct internal_sort *do_internal_sort (struct sort_cases_pgm *, + int separate); +static void destroy_internal_sort (struct internal_sort *); +static struct external_sort *do_external_sort (struct sort_cases_pgm *, + int separate); +static void destroy_external_sort (struct external_sort *); +struct sort_cases_pgm *parse_sort (void); /* Performs the SORT CASES procedures. */ int cmd_sort_cases (void) { - /* First, just parse the command. */ - lex_match_id ("SORT"); - lex_match_id ("CASES"); + struct sort_cases_pgm *scp; + int success; + lex_match (T_BY); - if (!parse_sort_variables ()) + scp = parse_sort (); + if (scp == NULL) return CMD_FAILURE; - - cancel_temporary (); - /* Then it's time to do the actual work. There are two cases: - - (internal sort) All the data is in memory. In this case, we - perform an EXECUTE to get the data into the desired form, then - sort the cases in place, if it is still all in memory. - - (external sort) The data is not in memory. It may be coming from - a system file or other data file, etc. In any case, it is now - time to perform an external sort. This is better explained in - do_external_sort(). */ + if (temporary != 0) + { + msg (SE, _("SORT CASES may not be used after TEMPORARY. " + "Temporary transformations will be made permanent.")); + cancel_temporary (); + } - /* Do all this dirty work. */ - { - int success = sort_cases (0); - free (v_sort); - if (success) - return lex_end_of_command (); - else - return CMD_FAILURE; - } + success = sort_cases (scp, 0); + destroy_sort_cases_pgm (scp); + if (success) + return lex_end_of_command (); + else + return CMD_FAILURE; } -/* Parses a list of sort variables into v_sort and nv_sort. */ -int -parse_sort_variables (void) +/* Parses a list of sort keys and returns a struct sort_cases_pgm + based on it. Returns a null pointer on error. */ +struct sort_cases_pgm * +parse_sort (void) { - v_sort = NULL; - nv_sort = 0; + struct sort_cases_pgm *scp; + + scp = xmalloc (sizeof *scp); + scp->ref_cnt = 1; + scp->vars = NULL; + scp->dirs = NULL; + scp->var_cnt = 0; + scp->isrt = NULL; + scp->xsrt = NULL; + do { - int prev_nv_sort = nv_sort; - int order = SRT_ASCEND; + int prev_var_cnt = scp->var_cnt; + enum sort_direction direction = SRT_ASCEND; - if (!parse_variables (&default_dict, &v_sort, &nv_sort, + /* Variables. */ + if (!parse_variables (default_dict, &scp->vars, &scp->var_cnt, PV_NO_DUPLICATE | PV_APPEND | PV_NO_SCRATCH)) - return 0; + goto error; + + /* Sort direction. */ if (lex_match ('(')) { if (lex_match_id ("D") || lex_match_id ("DOWN")) - order = SRT_DESCEND; + direction = SRT_DESCEND; else if (!lex_match_id ("A") && !lex_match_id ("UP")) { - free (v_sort); msg (SE, _("`A' or `D' expected inside parentheses.")); - return 0; + goto error; } if (!lex_match (')')) { - free (v_sort); msg (SE, _("`)' expected.")); - return 0; + goto error; } } - for (; prev_nv_sort < nv_sort; prev_nv_sort++) - v_sort[prev_nv_sort]->p.srt.order = order; + scp->dirs = xrealloc (scp->dirs, sizeof *scp->dirs * scp->var_cnt); + for (; prev_var_cnt < scp->var_cnt; prev_var_cnt++) + scp->dirs[prev_var_cnt] = direction; } while (token != '.' && token != '/'); - return 1; + return scp; + + error: + destroy_sort_cases_pgm (scp); + return NULL; +} + +/* Destroys a SORT CASES program. */ +void +destroy_sort_cases_pgm (struct sort_cases_pgm *scp) +{ + if (scp != NULL) + { + assert (scp->ref_cnt > 0); + if (--scp->ref_cnt > 0) + return; + + free (scp->vars); + free (scp->dirs); + destroy_internal_sort (scp->isrt); + destroy_external_sort (scp->xsrt); + free (scp); + } } /* Sorts the active file based on the key variables specified in - global variables v_sort and nv_sort. The output is either to the - active file, if SEPARATE is zero, or to a separate file, if - SEPARATE is nonzero. In the latter case the output cases can be - read with a call to read_sort_output(). (In the former case the - output cases should be dealt with through the usual vfm interface.) + global variables vars and var_cnt. + + If SEPARATE is zero, then output goes to the active file. The + output cases can be read through the usual VFM interfaces. + + If SEPARATE is nonzero, then output goes to a separate file. + The output cases can be read with a call to + read_sort_output(). - The caller is responsible for freeing v_sort[]. */ + The caller is responsible for freeing SCP. */ int -sort_cases (int separate) +sort_cases (struct sort_cases_pgm *scp, int separate) { - assert (separate_case_tab == NULL); + scp->case_size + = sizeof (union value) * dict_get_compacted_value_cnt (default_dict); /* Not sure this is necessary but it's good to be safe. */ - if (separate && vfm_source == &sort_stream) - procedure (NULL, NULL, NULL); + if (separate && case_source_is_class (vfm_source, &sort_source_class)) + procedure (NULL, NULL); /* SORT CASES cancels PROCESS IF. */ expr_free (process_if_expr); process_if_expr = NULL; - if (do_internal_sort (separate)) - return 1; + /* Try an internal sort first. */ + scp->isrt = do_internal_sort (scp, separate); + if (scp->isrt != NULL) + return 1; - page_to_disk (); - return do_external_sort (separate); -} + /* Fall back to an external sort. */ + scp->xsrt = do_external_sort (scp, separate); + if (scp->xsrt != NULL) + return 1; -/* If a reasonable situation is set up, do an internal sort of the - data. Return success. */ -static int -do_internal_sort (int separate) -{ - if (vfm_source != &vfm_disk_stream) - { - if (vfm_source != &vfm_memory_stream) - procedure (NULL, NULL, NULL); - if (vfm_source == &vfm_memory_stream) - { - struct case_list **case_tab = malloc (sizeof *case_tab - * (vfm_source_info.ncases + 1)); - if (vfm_source_info.ncases == 0) - { - free (case_tab); - return 1; - } - if (case_tab != NULL) - { - struct case_list *clp = memory_source_cases; - struct case_list **ctp = case_tab; - int i; - - for (; clp; clp = clp->next) - *ctp++ = clp; - qsort (case_tab, vfm_source_info.ncases, sizeof *case_tab, - compare_case_lists); - - if (!separate) - { - memory_source_cases = case_tab[0]; - for (i = 1; i < vfm_source_info.ncases; i++) - case_tab[i - 1]->next = case_tab[i]; - case_tab[vfm_source_info.ncases - 1]->next = NULL; - free (case_tab); - } else { - case_tab[vfm_source_info.ncases] = NULL; - separate_case_tab = case_tab; - } - - return 1; - } - } - } + destroy_sort_cases_pgm (scp); return 0; } + +/* Results of an internal sort. + Used only for sorting to a separate file. */ +struct internal_sort + { + const struct ccase **cases; + size_t case_cnt; + }; -/* Compares the NV_SORT variables in V_SORT[] between the `case_list's - at _A and _B, and returns a strcmp()-type result. */ -static int -compare_case_lists (const void *pa, const void *pb) +/* If the data is in memory, do an internal sort. Return + success. */ +static struct internal_sort * +do_internal_sort (struct sort_cases_pgm *scp, int separate) { - struct case_list *a = *(struct case_list **) pa; - struct case_list *b = *(struct case_list **) pb; - struct variable *v; - int result = 0; - int i; + struct internal_sort *isrt; - for (i = 0; i < nv_sort; i++) + isrt = xmalloc (sizeof *isrt); + isrt->cases = NULL; + isrt->case_cnt = 0; + + if (case_source_is_class (vfm_source, &storage_source_class)) { - v = v_sort[i]; - - if (v->type == NUMERIC) - { - if (approx_ne (a->c.data[v->fv].f, b->c.data[v->fv].f)) - { - result = (a->c.data[v->fv].f > b->c.data[v->fv].f) ? 1 : -1; - break; - } - } - else - { - result = memcmp (a->c.data[v->fv].s, b->c.data[v->fv].s, v->width); - if (result != 0) - break; - } + struct casefile *casefile = storage_source_get_casefile (vfm_source); + + if (!separate) + { + if (!casefile_sort (casefile, compare_cases, scp)) + goto error; + } + else + { + /* FIXME FIXME FIXME. + This is crap because the casefile could get flushed + to disk between the time we sort it and we use it + later, causing invalid pointer accesses. + The right solution is probably to extend casefiles + to support duplication. */ + struct casereader *reader; + size_t case_idx; + + if (!casefile_in_core (casefile)) + goto error; + + isrt->case_cnt = casefile_get_case_cnt (casefile); + isrt->cases = workspace_malloc (sizeof *isrt->cases + * isrt->case_cnt); + if (isrt->cases == NULL) + goto error; + + reader = casefile_get_reader (casefile); + for (case_idx = 0; case_idx < isrt->case_cnt; case_idx++) + { + casereader_read (reader, &isrt->cases[case_idx]); + assert (isrt->cases[case_idx] != NULL); + } + casereader_destroy (reader); + + sort (isrt->cases, isrt->case_cnt, casefile_get_case_size (casefile), + compare_case_dblptrs, scp); + } + + return isrt; } + + error: + free (isrt); + return NULL; +} - if (v->p.srt.order == SRT_ASCEND) - return result; - else +/* Destroys an internal sort result. */ +static void +destroy_internal_sort (struct internal_sort *isrt) +{ + if (isrt != NULL) { - assert (v->p.srt.order == SRT_DESCEND); - return -result; + workspace_free (isrt->cases, sizeof *isrt->cases * isrt->case_cnt); + free (isrt); } } - -/* External sort. */ -/* Maximum number of input + output file handles. */ -#if defined FOPEN_MAX && (FOPEN_MAX - 5 < 18) -#define MAX_FILE_HANDLES (FOPEN_MAX - 5) -#else -#define MAX_FILE_HANDLES 18 -#endif +/* Compares the variables specified by SCP between the cases at A + and B, and returns a strcmp()-type result. */ +static int +compare_cases (const struct ccase *a, const struct ccase *b, + void *scp_) +{ + struct sort_cases_pgm *scp = scp_; -#if MAX_FILE_HANDLES < 3 -#error At least 3 file handles must be available for sorting. -#endif + return compare_record (a->data, b->data, scp, NULL); +} + +/* Compares the variables specified by SCP between the cases at A + and B, and returns a strcmp()-type result. */ +static int +compare_case_dblptrs (const void *a_, const void *b_, void *scp_) +{ + struct sort_cases_pgm *scp = scp_; + struct ccase *const *pa = a_; + struct ccase *const *pb = b_; + struct ccase *a = *pa; + struct ccase *b = *pb; -/* Number of input buffers. */ -#define N_INPUT_BUFFERS (MAX_FILE_HANDLES - 1) + return compare_record (a->data, b->data, scp, NULL); +} + +/* External sort. */ -/* Maximum order of merge. This is the value suggested by Knuth; - specifically, he said to use tree selection, which we don't - implement, for larger orders of merge. */ +/* Maximum order of merge. If you increase this, then you should + use a heap for comparing cases during merge. */ #define MAX_MERGE_ORDER 7 /* Minimum total number of records for buffers. */ #define MIN_BUFFER_TOTAL_SIZE_RECS 64 -/* Minimum single input or output buffer size, in bytes and records. */ +/* Minimum single input buffer size, in bytes and records. */ #define MIN_BUFFER_SIZE_BYTES 4096 #define MIN_BUFFER_SIZE_RECS 16 -/* Structure for replacement selection tree. */ -struct repl_sel_tree +#if MIN_BUFFER_SIZE_RECS * 2 + 16 > MIN_BUFFER_TOTAL_SIZE_RECS +#error MIN_BUFFER_SIZE_RECS and MIN_BUFFER_TOTAL_SIZE_RECS do not make sense. +#endif + +/* An initial run and its length. */ +struct initial_run { - struct repl_sel_tree *loser;/* Loser associated w/this internal node. */ - int rn; /* Run number of `loser'. */ - struct repl_sel_tree *fe; /* Internal node above this external node. */ - struct repl_sel_tree *fi; /* Internal node above this internal node. */ - union value record[1]; /* The case proper. */ + int file_idx; /* File index. */ + size_t case_cnt; /* Number of cases. */ }; -/* Static variables used for sorting. */ -static struct repl_sel_tree **x; /* Buffers. */ -static int x_max; /* Size of buffers, in records. */ -static int records_per_buffer; /* Number of records in each buffer. */ - -/* In the merge phase, the first N_INPUT_BUFFERS handle[] elements are - input files and the last element is the output file. Before that, - they're all used as output files, although the last one is - segregated. */ -static FILE *handle[MAX_FILE_HANDLES]; /* File handles. */ - -/* Now, MAX_FILE_HANDLES is the maximum number of files we will *try* - to open. But if we can't open that many, max_handles will be set - to the number we apparently can open. */ -static int max_handles; /* Maximum number of handles. */ - -/* When we create temporary files, they are all put in the same - directory and numbered sequentially from zero. tmp_basename is the - drive/directory, etc., and tmp_extname can be sprintf() with "%08x" - to the file number, then tmp_basename used to open the file. */ -static char *tmp_basename; /* Temporary file basename. */ -static char *tmp_extname; /* Temporary file extension name. */ - -/* We use Huffman's method to determine the merge pattern. This means - that we need to know which runs are the shortest at any given time. - Priority queues as implemented by heap.c are a natural for this - task (probably because I wrote the code specifically for it). */ -static struct heap *huffman_queue; /* Huffman priority queue. */ - -/* Prototypes for helper functions. */ -static void sort_stream_write (void); -static int write_initial_runs (int separate); -static int allocate_cases (void); -static int allocate_file_handles (void); -static int merge (void); -static void rmdir_temp_dir (void); - -/* Performs an external sort of the active file. A description of the - procedure follows. All page references refer to Knuth's _Art of - Computer Programming, Vol. 3: Sorting and Searching_, which is the - canonical resource for sorting. - - 1. The data is read and S initial runs are formed through the - action of algorithm 5.4.1R (replacement selection). - - 2. Huffman's method (p. 365-366) is used to determine the optimum - merge pattern. - - 3. If an OS that supports overlapped reading, writing, and - computing is being run, we should use 5.4.6F for forecasting. - Otherwise, buffers are filled only when they run out of data. - FIXME: Since the author of PSPP uses GNU/Linux, which does not - yet implement overlapped r/w/c, 5.4.6F is not used. - - 4. We perform P-way merges: - - (a) The desired P is the smallest P such that ceil(ln(S)/ln(P)) - is minimized. (FIXME: Since I don't have an algorithm for - minimizing this, it's just set to MAX_MERGE_ORDER.) - - (b) P is reduced if the selected value would make input buffers - less than 4096 bytes each, or 16 records, whichever is larger. - - (c) P is reduced if we run out of available file handles or space - for file handles. - - (d) P is reduced if we don't have space for one or two output - buffers, which have the same minimum size as input buffers. (We - need two output buffers if 5.4.6F is in use for forecasting.) */ +/* Sorts initial runs A and B in decending order by length. */ static int -do_external_sort (int separate) +compare_initial_runs (const void *a_, const void *b_, void *aux UNUSED) { - int success = 0; - - assert (MAX_FILE_HANDLES >= 3); - - x = NULL; - tmp_basename = NULL; - - huffman_queue = heap_create (512); - if (huffman_queue == NULL) - return 0; + const struct initial_run *a = a_; + const struct initial_run *b = b_; + + return a->case_cnt > b->case_cnt ? -1 : a->case_cnt case_cnt; +} - if (!allocate_cases ()) - goto lossage; +/* Results of an external sort. */ +struct external_sort + { + struct sort_cases_pgm *scp; /* SORT CASES info. */ + struct initial_run *initial_runs; /* Array of initial runs. */ + size_t run_cnt, run_cap; /* Number of runs, allocated capacity. */ + char *temp_dir; /* Temporary file directory name. */ + char *temp_name; /* Name of a temporary file. */ + int next_file_idx; /* Lowest unused file index. */ + }; - if (!allocate_file_handles ()) - goto lossage; +/* Prototypes for helper functions. */ +static void sort_sink_write (struct case_sink *, const struct ccase *); +static int write_initial_runs (struct external_sort *, int separate); +static int init_external_sort (struct external_sort *); +static int merge (struct external_sort *); +static void rmdir_temp_dir (struct external_sort *); +static void remove_temp_file (struct external_sort *xsrt, int file_idx); + +/* Performs an external sort of the active file according to the + specification in SCP. Forms initial runs using a heap as a + reservoir. Determines the optimum merge pattern via Huffman's + method (see Knuth vol. 3, 2nd edition, p. 365-366), and merges + according to that pattern. */ +static struct external_sort * +do_external_sort (struct sort_cases_pgm *scp, int separate) +{ + struct external_sort *xsrt; + int success = 0; - if (!write_initial_runs (separate)) - goto lossage; + if (vfm_source != NULL + && case_source_is_class (vfm_source, &storage_source_class)) + casefile_to_disk (storage_source_get_casefile (vfm_source)); - merge (); + xsrt = xmalloc (sizeof *xsrt); + xsrt->scp = scp; + if (!init_external_sort (xsrt)) + goto done; + if (!write_initial_runs (xsrt, separate)) + goto done; + if (!merge (xsrt)) + goto done; success = 1; - /* Despite the name, flow of control comes here regardless of - whether or not the sort is successful. */ -lossage: - heap_destroy (huffman_queue); - - if (x) + done: + if (success) { - int i; - - for (i = 0; i <= x_max; i++) - free (x[i]); - free (x); + /* Don't destroy anything because we'll need it for reading + the output. */ + return xsrt; + } + else + { + destroy_external_sort (xsrt); + return NULL; } - - if (!success) - rmdir_temp_dir (); - - return success; } -#if !HAVE_GETPID -#define getpid() (0) -#endif - -/* Sets up to open temporary files. */ -/* PORTME: This creates a directory for temporary files. Some OSes - might not have that concept... */ -static int -allocate_file_handles (void) +/* Destroys XSRT. */ +static void +destroy_external_sort (struct external_sort *xsrt) { - const char *dir; /* Directory prefix. */ - char *buf; /* String buffer. */ - char *cp; /* Pointer into buf. */ - - dir = getenv ("SPSSTMPDIR"); - if (dir == NULL) - dir = getenv ("SPSSXTMPDIR"); - if (dir == NULL) - dir = getenv ("TMPDIR"); -#ifdef P_tmpdir - if (dir == NULL) - dir = P_tmpdir; -#endif -#if __unix__ - if (dir == NULL) - dir = "/tmp"; -#elif __MSDOS__ - if (dir == NULL) - dir = getenv ("TEMP"); - if (dir == NULL) - dir = getenv ("TMP"); - if (dir == NULL) - dir = "\\"; -#else - dir = ""; -#endif - - buf = xmalloc (strlen (dir) + 1 + 4 + 8 + 4 + 1 + INT_DIGITS + 1); - cp = spprintf (buf, "%s%c%04lX%04lXpspp", dir, DIR_SEPARATOR, - ((long) time (0)) & 0xffff, ((long) getpid ()) & 0xffff); - if (-1 == mkdir (buf, S_IRWXU)) + if (xsrt != NULL) { - free (buf); - msg (SE, _("%s: Cannot create temporary directory: %s."), - buf, strerror (errno)); - return 0; + int i; + + for (i = 0; i < xsrt->run_cnt; i++) + remove_temp_file (xsrt, xsrt->initial_runs[i].file_idx); + rmdir_temp_dir (xsrt); + free (xsrt->temp_dir); + free (xsrt->temp_name); + free (xsrt->initial_runs); + free (xsrt); } - *cp++ = DIR_SEPARATOR; - - tmp_basename = buf; - tmp_extname = cp; - - max_handles = MAX_FILE_HANDLES; - - return 1; } -/* Removes the directory created for temporary files, if one exists. - Also frees tmp_basename. */ -static void -rmdir_temp_dir (void) +#ifdef HAVE_MKDTEMP +/* Creates and returns the name of a temporary directory. */ +static char * +make_temp_dir (void) { - if (NULL == tmp_basename) - return; + const char *parent_dir; + char *temp_dir; - tmp_extname[-1] = '\0'; - if (rmdir (tmp_basename) == -1) - msg (SE, _("%s: Error removing directory for temporary files: %s."), - tmp_basename, strerror (errno)); + if (getenv ("TMPDIR") != NULL) + parent_dir = getenv ("TMPDIR"); + else + parent_dir = P_tmpdir; - free (tmp_basename); + temp_dir = xmalloc (strlen (parent_dir) + 32); + sprintf (temp_dir, "%s%cpsppXXXXXX", parent_dir, DIR_SEPARATOR); + if (mkdtemp (temp_dir) == NULL) + { + msg (SE, _("%s: Creating temporary directory: %s."), + temp_dir, strerror (errno)); + free (temp_dir); + return NULL; + } + else + return temp_dir; } - -/* Allocates room for lots of cases as a buffer. */ +#else /* !HAVE_MKDTEMP */ +/* Creates directory DIR. */ static int -allocate_cases (void) +do_mkdir (const char *dir) { - /* This is the size of one case. */ - const int case_size = (sizeof (struct repl_sel_tree) - + sizeof (union value) * (default_dict.nval - 1) - + sizeof (struct repl_sel_tree *)); - - x = NULL; - - /* Allocate as many cases as we can, assuming a space of four - void pointers for malloc()'s internal bookkeeping. */ - x_max = MAX_WORKSPACE / (case_size + 4 * sizeof (void *)); - x = malloc (sizeof (struct repl_sel_tree *) * x_max); - if (x != NULL) - { - int i; +#ifndef __MSDOS__ + return mkdir (dir, S_IRWXU); +#else + return mkdir (dir); +#endif +} - for (i = 0; i < x_max; i++) - { - x[i] = malloc (sizeof (struct repl_sel_tree) - + sizeof (union value) * (default_dict.nval - 1)); - if (x[i] == NULL) - break; - } - x_max = i; - } - if (x == NULL || x_max < MIN_BUFFER_TOTAL_SIZE_RECS) +/* Creates and returns the name of a temporary directory. */ +static char * +make_temp_dir (void) +{ + int i; + + for (i = 0; i < 100; i++) { - if (x != NULL) - { - int i; - - for (i = 0; i < x_max; i++) - free (x[i]); - } - free (x); - msg (SE, _("Out of memory. Could not allocate room for minimum of %d " - "cases of %d bytes each. (PSPP workspace is currently " - "restricted to a maximum of %d KB.)"), - MIN_BUFFER_TOTAL_SIZE_RECS, case_size, MAX_WORKSPACE / 1024); - x_max = 0; - x = NULL; - return 0; + char temp_dir[L_tmpnam + 1]; + if (tmpnam (temp_dir) == NULL) + { + msg (SE, _("Generating temporary directory name failed: %s."), + strerror (errno)); + return NULL; + } + else if (do_mkdir (temp_dir) == 0) + return xstrdup (temp_dir); } + + msg (SE, _("Creating temporary directory failed: %s."), strerror (errno)); + return NULL; +} +#endif /* !HAVE_MKDTEMP */ - /* The last element of the array is used to store lastkey. */ - x_max--; +/* Sets up to open temporary files. */ +static int +init_external_sort (struct external_sort *xsrt) +{ + /* Zero. */ + xsrt->temp_dir = NULL; + xsrt->next_file_idx = 0; + + /* Huffman queue. */ + xsrt->run_cap = 512; + xsrt->run_cnt = 0; + xsrt->initial_runs = xmalloc (sizeof *xsrt->initial_runs * xsrt->run_cap); + + /* Temporary directory. */ + xsrt->temp_dir = make_temp_dir (); + xsrt->temp_name = NULL; + if (xsrt->temp_dir == NULL) + return 0; + xsrt->temp_name = xmalloc (strlen (xsrt->temp_dir) + 64); - debug_printf ((_("allocated %d cases == %d bytes\n"), - x_max, x_max * case_size)); return 1; } - -/* Replacement selection. */ -static int rmax, rc, rq; -static struct repl_sel_tree *q; -static union value *lastkey; -static int run_no, file_index; -static int deferred_abort; -static int run_length; - -static int compare_record (union value *, union value *); - -static inline void -output_record (union value *v) +/* Returns nonzero if we should return an error even though the + operation succeeded. Useful for testing. */ +static int +simulate_error (void) { - union value *src_case; + static int op_err_cnt = -1; + static int op_cnt; - if (deferred_abort) - return; - - if (compaction_necessary) - { - compact_case (compaction_case, (struct ccase *) v); - src_case = (union value *) compaction_case; - } + if (op_err_cnt == -1 || op_cnt++ < op_err_cnt) + return 0; else - src_case = (union value *) v; - - if ((int) fwrite (src_case, sizeof *src_case, compaction_nval, - handle[file_index]) - != compaction_nval) { - deferred_abort = 1; - sprintf (tmp_extname, "%08x", run_no); - msg (SE, _("%s: Error writing temporary file: %s."), - tmp_basename, strerror (errno)); - return; + errno = 0; + return 1; } - - run_length++; } -static int -close_handle (int i) +/* Removes the directory created for temporary files, if one + exists. */ +static void +rmdir_temp_dir (struct external_sort *xsrt) { - int result = fclose (handle[i]); - msg (VM (2), _("SORT: Closing handle %d."), i); - - handle[i] = NULL; - if (EOF == result) + if (xsrt->temp_dir != NULL && rmdir (xsrt->temp_dir) == -1) { - sprintf (tmp_extname, "%08x", i); - msg (SE, _("%s: Error closing temporary file: %s."), - tmp_basename, strerror (errno)); - return 0; + msg (SW, _("%s: Error removing directory for temporary files: %s."), + xsrt->temp_dir, strerror (errno)); + xsrt->temp_dir = NULL; } - return 1; } -static int -close_handles (int beg, int end) +/* Returns the name of temporary file number FILE_IDX for XSRT. + The name is written into a static buffer, so be careful. */ +static char * +get_temp_file_name (struct external_sort *xsrt, int file_idx) { - int success = 1; - int i; - - for (i = beg; i < end; i++) - success &= close_handle (i); - return success; + assert (xsrt->temp_dir != NULL); + sprintf (xsrt->temp_name, "%s%c%04d", + xsrt->temp_dir, DIR_SEPARATOR, file_idx); + return xsrt->temp_name; } -static int -open_handle_w (int handle_no, int run_no) +/* Opens temporary file numbered FILE_IDX for XSRT with mode MODE + and returns the FILE *. */ +static FILE * +open_temp_file (struct external_sort *xsrt, int file_idx, const char *mode) { - sprintf (tmp_extname, "%08x", run_no); - msg (VM (1), _("SORT: %s: Opening for writing as run %d."), - tmp_basename, run_no); - - /* The `x' modifier causes the GNU C library to insist on creating a - new file--if the file already exists, an error is signaled. The - ANSI C standard says that other libraries should ignore anything - after the `w+b', so it shouldn't be a problem. */ - return NULL != (handle[handle_no] = fopen (tmp_basename, "w+bx")); + char *temp_file; + FILE *file; + + temp_file = get_temp_file_name (xsrt, file_idx); + + file = fopen (temp_file, mode); + if (simulate_error () || file == NULL) + msg (SE, _("%s: Error opening temporary file for %s: %s."), + temp_file, mode[0] == 'r' ? "reading" : "writing", + strerror (errno)); + + return file; } +/* Closes FILE, which is the temporary file numbered FILE_IDX + under XSRT. Returns nonzero only if successful. */ static int -open_handle_r (int handle_no, int run_no) +close_temp_file (struct external_sort *xsrt, int file_idx, FILE *file) { - FILE *f; - - sprintf (tmp_extname, "%08x", run_no); - msg (VM (1), _("SORT: %s: Opening for writing as run %d."), - tmp_basename, run_no); - f = handle[handle_no] = fopen (tmp_basename, "rb"); - - if (f == NULL) + if (file != NULL) { - msg (SE, _("%s: Error opening temporary file for reading: %s."), - tmp_basename, strerror (errno)); - return 0; + char *temp_file = get_temp_file_name (xsrt, file_idx); + if (simulate_error () || fclose (file) == EOF) + { + msg (SE, _("%s: Error closing temporary file: %s."), + temp_file, strerror (errno)); + return 0; + } } - return 1; } -/* Begins a new initial run, specifically its output file. */ +/* Delete temporary file numbered FILE_IDX for XSRT. */ static void -begin_run (void) +remove_temp_file (struct external_sort *xsrt, int file_idx) { - /* Decide which handle[] to use. If run_no is max_handles or - greater, then we've run out of handles so it's time to just do - one file at a time, which by default is handle 0. */ - file_index = (run_no < max_handles ? run_no : 0); - run_length = 0; - - /* Alright, now create the temporary file. */ - if (open_handle_w (file_index, run_no) == 0) + if (file_idx != -1) { - /* Failure to create the temporary file. Check if there are - unacceptably few files already open. */ - if (file_index < 3) - { - deferred_abort = 1; - msg (SE, _("%s: Error creating temporary file: %s."), - tmp_basename, strerror (errno)); - return; - } - - /* Close all the open temporary files. */ - if (!close_handles (0, file_index)) - return; - - /* Now try again to create the temporary file. */ - max_handles = file_index; - file_index = 0; - if (open_handle_w (0, run_no) == 0) - { - /* It still failed, report it this time. */ - deferred_abort = 1; - msg (SE, _("%s: Error creating temporary file: %s."), - tmp_basename, strerror (errno)); - return; - } + char *temp_file = get_temp_file_name (xsrt, file_idx); + if (simulate_error () || remove (temp_file) != 0) + msg (SW, _("%s: Error removing temporary file: %s."), + temp_file, strerror (errno)); } } -/* Ends the current initial run. Just increments run_no if no initial - run has been started yet. */ -static void -end_run (void) +/* Writes SIZE bytes from buffer DATA into FILE, which is + temporary file numbered FILE_IDX from XSRT. */ +static int +write_temp_file (struct external_sort *xsrt, int file_idx, + FILE *file, const void *data, size_t size) { - /* Close file handles if necessary. */ - { - int result; - - if (run_no == max_handles - 1) - result = close_handles (0, max_handles); - else if (run_no >= max_handles) - result = close_handle (0); - else - result = 1; - if (!result) - deferred_abort = 1; - } - - /* Advance to next run. */ - run_no++; - if (run_no) - heap_insert (huffman_queue, run_no - 1, run_length); + if (!simulate_error () && fwrite (data, size, 1, file) == 1) + return 1; + else + { + char *temp_file = get_temp_file_name (xsrt, file_idx); + msg (SE, _("%s: Error writing temporary file: %s."), + temp_file, strerror (errno)); + return 0; + } } -/* Performs 5.4.1R. */ +/* Reads SIZE bytes into buffer DATA into FILE, which is + temporary file numbered FILE_IDX from XSRT. */ static int -write_initial_runs (int separate) +read_temp_file (struct external_sort *xsrt, int file_idx, + FILE *file, void *data, size_t size) { - run_no = -1; - deferred_abort = 0; - - /* Steps R1, R2, R3. */ - rmax = 0; - rc = 0; - lastkey = NULL; - q = x[0]; - rq = 0; + if (!simulate_error () && fread (data, size, 1, file) == 1) + return 1; + else + { + char *temp_file = get_temp_file_name (xsrt, file_idx); + if (ferror (file)) + msg (SE, _("%s: Error reading temporary file: %s."), + temp_file, strerror (errno)); + else + msg (SE, _("%s: Unexpected end of temporary file."), + temp_file); + return 0; + } +} + +/* Replacement selection. */ + +/* Pairs a record with a run number. */ +struct record_run { - int j; + int run; /* Run number of case. */ + struct case_list *record; /* Case data. */ + }; - for (j = 0; j < x_max; j++) - { - struct repl_sel_tree *J = x[j]; +/* Represents a set of initial runs during an external sort. */ +struct initial_run_state + { + struct external_sort *xsrt; + + int *idx_to_fv; /* Translation table copied from sink. */ + + /* Reservoir. */ + struct record_run *records; /* Records arranged as a heap. */ + size_t record_cnt; /* Current number of records. */ + size_t record_cap; /* Capacity for records. */ + struct case_list *free_list;/* Cases not in heap. */ + + /* Run currently being output. */ + int file_idx; /* Temporary file number. */ + size_t case_cnt; /* Number of cases so far. */ + FILE *output_file; /* Output file. */ + struct case_list *last_output;/* Record last output. */ + + int okay; /* Zero if an error has been encountered. */ + }; - J->loser = J; - J->rn = 0; - J->fe = x[(x_max + j) / 2]; - J->fi = x[j / 2]; - memset (J->record, 0, default_dict.nval * sizeof (union value)); - } - } +static const struct case_sink_class sort_sink_class; + +static void destroy_initial_run_state (struct initial_run_state *irs); +static int allocate_cases (struct initial_run_state *); +static struct case_list *grab_case (struct initial_run_state *); +static void release_case (struct initial_run_state *, struct case_list *); +static void output_record (struct initial_run_state *irs); +static void start_run (struct initial_run_state *irs); +static void end_run (struct initial_run_state *irs); +static int compare_record_run (const struct record_run *, + const struct record_run *, + struct initial_run_state *); +static int compare_record_run_minheap (const void *, const void *, void *); + +/* Writes initial runs for XSRT, sending them to a separate file + if SEPARATE is nonzero. */ +static int +write_initial_runs (struct external_sort *xsrt, int separate) +{ + struct initial_run_state *irs; + int success = 0; - /* Most of the iterations of steps R4, R5, R6, R7, R2, R3, ... */ + /* Allocate memory for cases. */ + irs = xmalloc (sizeof *irs); + irs->xsrt = xsrt; + irs->records = NULL; + irs->record_cnt = irs->record_cap = 0; + irs->free_list = NULL; + irs->output_file = NULL; + irs->last_output = NULL; + irs->file_idx = 0; + irs->case_cnt = 0; + irs->okay = 1; + if (!allocate_cases (irs)) + goto done; + + /* Create case sink. */ if (!separate) { - if (vfm_sink) - vfm_sink->destroy_sink (); - vfm_sink = &sort_stream; + if (vfm_sink != NULL && vfm_sink->class->destroy != NULL) + vfm_sink->class->destroy (vfm_sink); + vfm_sink = create_case_sink (&sort_sink_class, default_dict, irs); + xsrt->scp->ref_cnt++; } - procedure (NULL, NULL, NULL); - /* Final iterations of steps R4, R5, R6, R7, R2, R3, ... */ - for (;;) - { - struct repl_sel_tree *t; + /* Create initial runs. */ + start_run (irs); + procedure (NULL, NULL); + irs->idx_to_fv = NULL; + while (irs->record_cnt > 0 && irs->okay) + output_record (irs); + end_run (irs); - /* R4. */ - rq = rmax + 1; + success = irs->okay; - /* R5. */ - t = q->fe; + done: + destroy_initial_run_state (irs); - /* R6 and R7. */ - for (;;) - { - /* R6. */ - if (t->rn < rq - || (t->rn == rq - && compare_record (t->loser->record, q->record) < 0)) - { - struct repl_sel_tree *temp_tree; - int temp_int; + return success; +} - temp_tree = t->loser; - t->loser = q; - q = temp_tree; +/* Add a single case to an initial run. */ +static void +sort_sink_write (struct case_sink *sink, const struct ccase *c) +{ + struct initial_run_state *irs = sink->aux; + struct record_run *new_record_run; - temp_int = t->rn; - t->rn = rq; - rq = temp_int; - } + if (!irs->okay) + return; - /* R7. */ - if (t == x[1]) - break; - t = t->fi; - } + irs->idx_to_fv = sink->idx_to_fv; + + /* Compose record_run for this run and add to heap. */ + assert (irs->record_cnt < irs->record_cap); + new_record_run = irs->records + irs->record_cnt++; + new_record_run->record = grab_case (irs); + memcpy (new_record_run->record->c.data, c->data, irs->xsrt->scp->case_size); + new_record_run->run = irs->file_idx; + if (irs->last_output != NULL + && compare_record (c->data, irs->last_output->c.data, + irs->xsrt->scp, sink->idx_to_fv) < 0) + new_record_run->run = irs->xsrt->next_file_idx; + push_heap (irs->records, irs->record_cnt, sizeof *irs->records, + compare_record_run_minheap, irs); + + /* Output a record if the reservoir is full. */ + if (irs->record_cnt == irs->record_cap && irs->okay) + output_record (irs); +} - /* R2. */ - if (rq != rc) - { - end_run (); - if (rq > rmax) - break; - begin_run (); - rc = rq; - } +/* Destroys the initial run state represented by IRS. */ +static void +destroy_initial_run_state (struct initial_run_state *irs) +{ + struct case_list *iter, *next; + int i; - /* R3. */ - if (rq != 0) - { - output_record (q->record); - lastkey = x[x_max]->record; - memcpy (lastkey, q->record, sizeof (union value) * vfm_sink_info.nval); - } - } - assert (run_no == rmax); + if (irs == NULL) + return; - /* If an unrecoverable error occurred somewhere in the above code, - then the `deferred_abort' flag would have been set. */ - if (deferred_abort) + /* Release cases to free list. */ + for (i = 0; i < irs->record_cnt; i++) + release_case (irs, irs->records[i].record); + if (irs->last_output != NULL) + release_case (irs, irs->last_output); + + /* Free cases in free list. */ + for (iter = irs->free_list; iter != NULL; iter = next) { - int i; + next = iter->next; + free (iter); + } - for (i = 0; i < max_handles; i++) - if (handle[i] != NULL) - { - sprintf (tmp_extname, "%08x", i); + free (irs->records); + if (irs->output_file != NULL) + close_temp_file (irs->xsrt, irs->file_idx, irs->output_file); - if (fclose (handle[i]) == EOF) - msg (SE, _("%s: Error closing temporary file: %s."), - tmp_basename, strerror (errno)); + free (irs); +} - if (remove (tmp_basename) != 0) - msg (SE, _("%s: Error removing temporary file: %s."), - tmp_basename, strerror (errno)); +/* Allocates room for lots of cases as a buffer. */ +static int +allocate_cases (struct initial_run_state *irs) +{ + int approx_case_cost; /* Approximate memory cost of one case in bytes. */ + int max_cases; /* Maximum number of cases to allocate. */ + int i; - handle[i] = NULL; - } - return 0; + /* Allocate as many cases as we can within the workspace + limit. */ + approx_case_cost = (sizeof *irs->records + + sizeof *irs->free_list + + irs->xsrt->scp->case_size + + 4 * sizeof (void *)); + max_cases = get_max_workspace() / approx_case_cost; + irs->records = malloc (sizeof *irs->records * max_cases); + for (i = 0; i < max_cases; i++) + { + struct case_list *c; + c = malloc (sizeof *c + + irs->xsrt->scp->case_size + - sizeof (union value)); + if (c == NULL) + { + max_cases = i; + break; + } + release_case (irs, c); } + /* irs->records gets all but one of the allocated cases. + The extra is used for last_output. */ + irs->record_cap = max_cases - 1; + + /* Fail if we didn't allocate an acceptable number of cases. */ + if (irs->records == NULL || max_cases < MIN_BUFFER_TOTAL_SIZE_RECS) + { + msg (SE, _("Out of memory. Could not allocate room for minimum of %d " + "cases of %d bytes each. (PSPP workspace is currently " + "restricted to a maximum of %d KB.)"), + MIN_BUFFER_TOTAL_SIZE_RECS, approx_case_cost, get_max_workspace() / 1024); + return 0; + } return 1; } -/* Compares the NV_SORT variables in V_SORT[] between the `value's at +/* Compares the VAR_CNT variables in VARS[] between the `value's at A and B, and returns a strcmp()-type result. */ static int -compare_record (union value * a, union value * b) +compare_record (const union value *a, const union value *b, + const struct sort_cases_pgm *scp, + int *idx_to_fv) { int i; - int result = 0; - struct variable *v; assert (a != NULL); - if (b == NULL) /* Sort NULLs after everything else. */ - return -1; - - for (i = 0; i < nv_sort; i++) + assert (b != NULL); + + for (i = 0; i < scp->var_cnt; i++) { - v = v_sort[i]; + struct variable *v = scp->vars[i]; + int fv; + int result; + if (idx_to_fv != NULL) + fv = idx_to_fv[v->index]; + else + fv = v->fv; + if (v->type == NUMERIC) - { - if (approx_ne (a[v->fv].f, b[v->fv].f)) - { - result = (a[v->fv].f > b[v->fv].f) ? 1 : -1; - break; - } - } + { + double af = a[fv].f; + double bf = b[fv].f; + + result = af < bf ? -1 : af > bf; + } else - { - result = memcmp (a[v->fv].s, b[v->fv].s, v->width); - if (result != 0) - break; - } + result = memcmp (a[fv].s, b[fv].s, v->width); + + if (result != 0) + { + if (scp->dirs[i] == SRT_DESCEND) + result = -result; + return result; + } } - if (v->p.srt.order == SRT_ASCEND) - return result; + return 0; +} + +/* Compares record-run tuples A and B on run number first, then + on the current record according to SCP. */ +static int +compare_record_run (const struct record_run *a, + const struct record_run *b, + struct initial_run_state *irs) +{ + if (a->run != b->run) + return a->run > b->run ? 1 : -1; else + return compare_record (a->record->c.data, b->record->c.data, + irs->xsrt->scp, irs->idx_to_fv); +} + +/* Compares record-run tuples A and B on run number first, then + on the current record according to SCP, but in descending + order. */ +static int +compare_record_run_minheap (const void *a, const void *b, void *irs) +{ + return -compare_record_run (a, b, irs); +} + +/* Begins a new initial run, specifically its output file. */ +static void +start_run (struct initial_run_state *irs) +{ + irs->file_idx = irs->xsrt->next_file_idx++; + irs->case_cnt = 0; + irs->output_file = open_temp_file (irs->xsrt, irs->file_idx, "wb"); + if (irs->output_file == NULL) + irs->okay = 0; + if (irs->last_output != NULL) { - assert (v->p.srt.order == SRT_DESCEND); - return -result; + release_case (irs, irs->last_output); + irs->last_output = NULL; } } - -/* Merging. */ -static int merge_once (int run_index[], int run_length[], int n_runs); +/* Ends the current initial run. */ +static void +end_run (struct initial_run_state *irs) +{ + struct external_sort *xsrt = irs->xsrt; + + /* Record initial run. */ + if (xsrt->run_cnt >= xsrt->run_cap) + { + xsrt->run_cap *= 2; + xsrt->initial_runs + = xrealloc (xsrt->initial_runs, + sizeof *xsrt->initial_runs * xsrt->run_cap); + } + xsrt->initial_runs[xsrt->run_cnt].file_idx = irs->file_idx; + xsrt->initial_runs[xsrt->run_cnt].case_cnt = irs->case_cnt; + xsrt->run_cnt++; + + /* Close file handle. */ + if (irs->output_file != NULL + && !close_temp_file (irs->xsrt, irs->file_idx, irs->output_file)) + irs->okay = 0; + irs->output_file = NULL; +} -/* Modula function as defined by Knuth. */ -static int -mod (int x, int y) +/* Writes a record to the current initial run. */ +static void +output_record (struct initial_run_state *irs) { - int result; + struct record_run *record_run; + + /* Extract minimum case from heap. */ + assert (irs->record_cnt > 0); + pop_heap (irs->records, irs->record_cnt--, sizeof *irs->records, + compare_record_run_minheap, irs); + record_run = irs->records + irs->record_cnt; + + /* Bail if an error has occurred. */ + if (!irs->okay) + return; - if (y == 0) - return x; - result = abs (x) % abs (y); - if (y < 0) - result = -result; - return result; + /* Start new run if necessary. */ + assert (record_run->run == irs->file_idx + || record_run->run == irs->xsrt->next_file_idx); + if (record_run->run != irs->file_idx) + { + end_run (irs); + start_run (irs); + } + assert (record_run->run == irs->file_idx); + irs->case_cnt++; + + /* Write to disk. */ + if (irs->output_file != NULL + && !write_temp_file (irs->xsrt, irs->file_idx, irs->output_file, + &record_run->record->c, irs->xsrt->scp->case_size)) + irs->okay = 0; + + /* This record becomes last_output. */ + if (irs->last_output != NULL) + release_case (irs, irs->last_output); + irs->last_output = record_run->record; +} + +/* Gets a case from the free list in IRS. It is an error to call + this function if the free list is empty. */ +static struct case_list * +grab_case (struct initial_run_state *irs) +{ + struct case_list *c; + + assert (irs != NULL); + assert (irs->free_list != NULL); + + c = irs->free_list; + irs->free_list = c->next; + return c; } -/* Performs a series of P-way merges of initial runs using Huffman's +/* Returns C to the free list in IRS. */ +static void +release_case (struct initial_run_state *irs, struct case_list *c) +{ + assert (irs != NULL); + assert (c != NULL); + + c->next = irs->free_list; + irs->free_list = c; +} + +/* Merging. */ + +/* State of merging initial runs. */ +struct merge_state + { + struct external_sort *xsrt; /* External sort state. */ + struct ccase **cases; /* Buffers. */ + size_t case_cnt; /* Number of buffers. */ + }; + +struct run; +static int merge_once (struct merge_state *, + const struct initial_run[], size_t, + struct initial_run *); +static int fill_run_buffer (struct merge_state *, struct run *); +static int mod (int, int); + +/* Performs a series of P-way merges of initial runs method. */ static int -merge (void) +merge (struct external_sort *xsrt) { - /* Order of merge. */ - int order; + struct merge_state mrg; /* State of merge. */ + size_t approx_case_cost; /* Approximate memory cost of one case. */ + int max_order; /* Maximum order of merge. */ + size_t dummy_run_cnt; /* Number of dummy runs to insert. */ + int success = 0; + int i; - /* Idiot check. */ - assert (MIN_BUFFER_SIZE_RECS * 2 <= MIN_BUFFER_TOTAL_SIZE_RECS - 1); + mrg.xsrt = xsrt; - /* Close all the input files. I hope that the boundary conditions - are correct on this but I'm not sure. */ - if (run_no < max_handles) + /* Allocate as many cases as possible into cases. */ + approx_case_cost = (sizeof *mrg.cases + + xsrt->scp->case_size + 4 * sizeof (void *)); + mrg.case_cnt = get_max_workspace() / approx_case_cost; + mrg.cases = malloc (sizeof *mrg.cases * mrg.case_cnt); + if (mrg.cases == NULL) + goto done; + for (i = 0; i < mrg.case_cnt; i++) { - int i; - - for (i = 0; i < run_no; ) - if (!close_handle (i++)) - { - for (; i < run_no; i++) - close_handle (i); - return 0; - } + mrg.cases[i] = malloc (xsrt->scp->case_size); + if (mrg.cases[i] == NULL) + { + mrg.case_cnt = i; + break; + } + } + if (mrg.case_cnt < MIN_BUFFER_TOTAL_SIZE_RECS) + { + msg (SE, _("Out of memory. Could not allocate room for minimum of %d " + "cases of %d bytes each. (PSPP workspace is currently " + "restricted to a maximum of %d KB.)"), + MIN_BUFFER_TOTAL_SIZE_RECS, approx_case_cost, get_max_workspace() / 1024); + return 0; } - /* Determine order of merge. */ - order = MAX_MERGE_ORDER; - if (x_max / order < MIN_BUFFER_SIZE_RECS) - order = x_max / MIN_BUFFER_SIZE_RECS; - else if (x_max / order * sizeof (union value) * default_dict.nval - < MIN_BUFFER_SIZE_BYTES) - order = x_max / (MIN_BUFFER_SIZE_BYTES - / (sizeof (union value) * (default_dict.nval - 1))); - - /* Make sure the order of merge is bounded. */ - if (order < 2) - order = 2; - if (order > rmax) - order = rmax; - assert (x_max / order > 0); - - /* Calculate number of records per buffer. */ - records_per_buffer = x_max / order; - - /* Add (1 - S) mod (P - 1) dummy runs of length 0. */ - { - int n_dummy_runs = mod (1 - rmax, order - 1); - debug_printf (("rmax=%d, order=%d, n_dummy_runs=%d\n", - rmax, order, n_dummy_runs)); - assert (n_dummy_runs >= 0); - while (n_dummy_runs--) - { - heap_insert (huffman_queue, -2, 0); - rmax++; - } - } + /* Determine maximum order of merge. */ + max_order = MAX_MERGE_ORDER; + if (mrg.case_cnt / max_order < MIN_BUFFER_SIZE_RECS) + max_order = mrg.case_cnt / MIN_BUFFER_SIZE_RECS; + else if (mrg.case_cnt / max_order * xsrt->scp->case_size + < MIN_BUFFER_SIZE_BYTES) + max_order = mrg.case_cnt / (MIN_BUFFER_SIZE_BYTES / xsrt->scp->case_size); + if (max_order < 2) + max_order = 2; + if (max_order > xsrt->run_cnt) + max_order = xsrt->run_cnt; /* Repeatedly merge the P shortest existing runs until only one run is left. */ - while (rmax > 1) + make_heap (xsrt->initial_runs, xsrt->run_cnt, sizeof *xsrt->initial_runs, + compare_initial_runs, NULL); + dummy_run_cnt = mod (1 - (int) xsrt->run_cnt, max_order - 1); + assert (max_order == 1 + || (xsrt->run_cnt + dummy_run_cnt) % (max_order - 1) == 1); + while (xsrt->run_cnt > 1) { - int run_index[MAX_MERGE_ORDER]; - int run_length[MAX_MERGE_ORDER]; - int total_run_length = 0; + struct initial_run output_run; + int order; int i; - assert (rmax >= order); - - /* Find the shortest runs; put them in runs[] in reverse order - of length, to force dummy runs of length 0 to the end of the - list. */ - debug_printf ((_("merging runs"))); - for (i = order - 1; i >= 0; i--) - { - run_index[i] = heap_delete (huffman_queue, &run_length[i]); - assert (run_index[i] != -1); - total_run_length += run_length[i]; - debug_printf ((" %d(%d)", run_index[i], run_length[i])); - } - debug_printf ((_(" into run %d(%d)\n"), run_no, total_run_length)); - - if (!merge_once (run_index, run_length, order)) - { - int index; - - while (-1 != (index = heap_delete (huffman_queue, NULL))) - { - sprintf (tmp_extname, "%08x", index); - if (remove (tmp_basename) != 0) - msg (SE, _("%s: Error removing temporary file: %s."), - tmp_basename, strerror (errno)); - } + /* Choose order of merge (max_order after first merge). */ + order = max_order - dummy_run_cnt; + dummy_run_cnt = 0; + + /* Choose runs to merge. */ + assert (xsrt->run_cnt >= order); + for (i = 0; i < order; i++) + pop_heap (xsrt->initial_runs, xsrt->run_cnt--, + sizeof *xsrt->initial_runs, + compare_initial_runs, NULL); + + /* Merge runs. */ + if (!merge_once (&mrg, xsrt->initial_runs + xsrt->run_cnt, order, + &output_run)) + goto done; + + /* Add output run to heap. */ + xsrt->initial_runs[xsrt->run_cnt++] = output_run; + push_heap (xsrt->initial_runs, xsrt->run_cnt, sizeof *xsrt->initial_runs, + compare_initial_runs, NULL); + } - return 0; - } + /* Exactly one run is left, which contains the entire sorted + file. We could use it to find a total case count. */ + assert (xsrt->run_cnt == 1); - if (!heap_insert (huffman_queue, run_no++, total_run_length)) - { - msg (SE, _("Out of memory expanding Huffman priority queue.")); - return 0; - } - - rmax -= order - 1; - } + success = 1; - /* There should be exactly one element in the priority queue after - all that merging. This represents the entire sorted active file. - So we could find a total case count by deleting this element from - the queue. */ - assert (heap_size (huffman_queue) == 1); + done: + for (i = 0; i < mrg.case_cnt; i++) + free (mrg.cases[i]); + free (mrg.cases); - return 1; + return success; } -/* Merges N_RUNS initial runs into a new run. The jth run for 0 <= j - < N_RUNS is taken from temporary file RUN_INDEX[j]; it is composed - of RUN_LENGTH[j] cases. */ +/* Modulo function as defined by Knuth. */ static int -merge_once (int run_index[], int run_length[], int n_runs) +mod (int x, int y) { - /* For each run, the number of records remaining in its buffer. */ - int buffered[MAX_MERGE_ORDER]; + if (y == 0) + return x; + else if (x == 0) + return 0; + else if (x > 0 && y > 0) + return x % y; + else if (x < 0 && y > 0) + return y - (-x) % y; - /* For each run, the index of the next record in the buffer. */ - int buffer_ptr[MAX_MERGE_ORDER]; + assert (0); + abort (); +} - /* Open input files. */ +/* A run of data for use in merging. */ +struct run { - int i; + FILE *file; /* File that contains run. */ + int file_idx; /* Index of file that contains run. */ + struct ccase **buffer; /* Case buffer. */ + struct ccase **buffer_head; /* First unconsumed case in buffer. */ + struct ccase **buffer_tail; /* One past last unconsumed case in buffer. */ + size_t buffer_cap; /* Number of cases buffer can hold. */ + size_t unread_case_cnt; /* Number of cases not yet read. */ + }; - for (i = 0; i < n_runs; i++) - if (run_index[i] != -2 && !open_handle_r (i, run_index[i])) - { - /* Close and remove temporary files. */ - while (i--) - { - close_handle (i); - sprintf (tmp_extname, "%08x", i); - if (remove (tmp_basename) != 0) - msg (SE, _("%s: Error removing temporary file: %s."), - tmp_basename, strerror (errno)); - } +/* Merges the RUN_CNT initial runs specified in INPUT_RUNS into a + new run. Returns nonzero only if successful. Adds an entry + to MRG->xsrt->runs for the output file if and only if the + output file is actually created. Always deletes all the input + files. */ +static int +merge_once (struct merge_state *mrg, + const struct initial_run input_runs[], + size_t run_cnt, + struct initial_run *output_run) +{ + struct run runs[MAX_MERGE_ORDER]; + FILE *output_file = NULL; + int success = 0; + int i; - return 0; - } - } + /* Initialize runs[]. */ + for (i = 0; i < run_cnt; i++) + { + runs[i].file = NULL; + runs[i].file_idx = input_runs[i].file_idx; + runs[i].buffer = mrg->cases + mrg->case_cnt / run_cnt * i; + runs[i].buffer_head = runs[i].buffer; + runs[i].buffer_tail = runs[i].buffer; + runs[i].buffer_cap = mrg->case_cnt / run_cnt; + runs[i].unread_case_cnt = input_runs[i].case_cnt; + } - /* Create output file. */ - if (!open_handle_w (N_INPUT_BUFFERS, run_no)) + /* Open input files. */ + for (i = 0; i < run_cnt; i++) { - msg (SE, _("%s: Error creating temporary file for merge: %s."), - tmp_basename, strerror (errno)); - goto lossage; + runs[i].file = open_temp_file (mrg->xsrt, runs[i].file_idx, "rb"); + if (runs[i].file == NULL) + goto error; + } + + /* Create output file and count cases to be output. */ + output_run->file_idx = mrg->xsrt->next_file_idx++; + output_run->case_cnt = 0; + for (i = 0; i < run_cnt; i++) + output_run->case_cnt += input_runs[i].case_cnt; + output_file = open_temp_file (mrg->xsrt, output_run->file_idx, "wb"); + if (output_file == NULL) + goto error; + + /* Prime buffers. */ + for (i = 0; i < run_cnt; i++) + if (!fill_run_buffer (mrg, runs + i)) + goto error; + + /* Merge. */ + while (run_cnt > 0) + { + struct run *min_run; + + /* Find minimum. */ + min_run = runs; + for (i = 1; i < run_cnt; i++) + if (compare_record ((*runs[i].buffer_head)->data, + (*min_run->buffer_head)->data, + mrg->xsrt->scp, NULL) < 0) + min_run = runs + i; + + /* Write minimum to output file. */ + if (!write_temp_file (mrg->xsrt, min_run->file_idx, output_file, + (*min_run->buffer_head)->data, + mrg->xsrt->scp->case_size)) + goto error; + + /* Remove case from buffer. */ + if (++min_run->buffer_head >= min_run->buffer_tail) + { + /* Buffer is empty. Fill from file. */ + if (!fill_run_buffer (mrg, min_run)) + goto error; + + /* If buffer is still empty, delete its run. */ + if (min_run->buffer_head >= min_run->buffer_tail) + { + close_temp_file (mrg->xsrt, min_run->file_idx, min_run->file); + remove_temp_file (mrg->xsrt, min_run->file_idx); + *min_run = runs[--run_cnt]; + + /* We could donate the now-unused buffer space to + other runs. */ + } + } } - /* Prime each buffer. */ - { - int i; + /* Close output file. */ + close_temp_file (mrg->xsrt, output_run->file_idx, output_file); - for (i = 0; i < n_runs; i++) - if (run_index[i] == -2) - { - n_runs = i; - break; - } - else - { - int j; - int ofs = records_per_buffer * i; - - buffered[i] = min (records_per_buffer, run_length[i]); - for (j = 0; j < buffered[i]; j++) - if ((int) fread (x[j + ofs]->record, sizeof (union value), - default_dict.nval, handle[i]) - != default_dict.nval) - { - sprintf (tmp_extname, "%08x", run_index[i]); - if (ferror (handle[i])) - msg (SE, _("%s: Error reading temporary file in merge: %s."), - tmp_basename, strerror (errno)); - else - msg (SE, _("%s: Unexpected end of temporary file in merge."), - tmp_basename); - goto lossage; - } - buffer_ptr[i] = ofs; - run_length[i] -= buffered[i]; - } - } + return 1; - /* Perform the merge proper. */ - while (n_runs) /* Loop while some data is left. */ + error: + /* Close and remove output file. */ + if (output_file != NULL) { - int i; - int min = 0; + close_temp_file (mrg->xsrt, output_run->file_idx, output_file); + remove_temp_file (mrg->xsrt, output_run->file_idx); + } + + /* Close and remove any remaining input runs. */ + for (i = 0; i < run_cnt; i++) + { + close_temp_file (mrg->xsrt, runs[i].file_idx, runs[i].file); + remove_temp_file (mrg->xsrt, runs[i].file_idx); + } - for (i = 1; i < n_runs; i++) - if (compare_record (x[buffer_ptr[min]]->record, - x[buffer_ptr[i]]->record) > 0) - min = i; + return success; +} - if ((int) fwrite (x[buffer_ptr[min]]->record, sizeof (union value), - default_dict.nval, handle[N_INPUT_BUFFERS]) - != default_dict.nval) - { - sprintf (tmp_extname, "%08x", run_index[i]); - msg (SE, _("%s: Error writing temporary file in " - "merge: %s."), tmp_basename, strerror (errno)); - goto lossage; - } +/* Reads as many cases as possible into RUN's buffer. + Reads nonzero unless a disk error occurs. */ +static int +fill_run_buffer (struct merge_state *mrg, struct run *run) +{ + run->buffer_head = run->buffer_tail = run->buffer; + while (run->unread_case_cnt > 0 + && run->buffer_tail < run->buffer + run->buffer_cap) + { + if (!read_temp_file (mrg->xsrt, run->file_idx, run->file, + (*run->buffer_tail)->data, + mrg->xsrt->scp->case_size)) + return 0; - /* Remove one case from the buffer for this input file. */ - if (--buffered[min] == 0) - { - /* The input buffer is empty. Do any cases remain in the - initial run on disk? */ - if (run_length[min]) - { - /* Yes. Read them in. */ - - int j; - int ofs; - - /* Reset the buffer pointer. Note that we can't simply - set it to (i * records_per_buffer) since the run - order might have changed. */ - ofs = buffer_ptr[min] -= buffer_ptr[min] % records_per_buffer; - - buffered[min] = min (records_per_buffer, run_length[min]); - for (j = 0; j < buffered[min]; j++) - if ((int) fread (x[j + ofs]->record, sizeof (union value), - default_dict.nval, handle[min]) - != default_dict.nval) - { - sprintf (tmp_extname, "%08x", run_index[min]); - if (ferror (handle[min])) - msg (SE, _("%s: Error reading temporary file in " - "merge: %s."), - tmp_basename, strerror (errno)); - else - msg (SE, _("%s: Unexpected end of temporary file " - "in merge."), - tmp_basename); - goto lossage; - } - run_length[min] -= buffered[min]; - } - else - { - /* No. Delete this run. */ - - /* Close the file. */ - FILE *f = handle[min]; - handle[min] = NULL; - sprintf (tmp_extname, "%08x", run_index[min]); - if (fclose (f) == EOF) - msg (SE, _("%s: Error closing temporary file in merge: " - "%s."), tmp_basename, strerror (errno)); - - /* Delete the file. */ - if (remove (tmp_basename) != 0) - msg (SE, _("%s: Error removing temporary file in merge: " - "%s."), tmp_basename, strerror (errno)); - - n_runs--; - if (min != n_runs) - { - /* Since this isn't the last run, we move the last - run into its spot to force all the runs to be - contiguous. */ - run_index[min] = run_index[n_runs]; - run_length[min] = run_length[n_runs]; - buffer_ptr[min] = buffer_ptr[n_runs]; - buffered[min] = buffered[n_runs]; - handle[min] = handle[n_runs]; - } - } - } - else - buffer_ptr[min]++; + run->unread_case_cnt--; + run->buffer_tail++; } - /* Close output file. */ - { - FILE *f = handle[N_INPUT_BUFFERS]; - handle[N_INPUT_BUFFERS] = NULL; - if (fclose (f) == EOF) - { - sprintf (tmp_extname, "%08x", run_no); - msg (SE, _("%s: Error closing temporary file in merge: " - "%s."), - tmp_basename, strerror (errno)); - return 0; - } - } - return 1; +} + +static struct case_source * +sort_sink_make_source (struct case_sink *sink) +{ + struct initial_run_state *irs = sink->aux; + + return create_case_source (&sort_source_class, default_dict, + irs->xsrt->scp); +} -lossage: - /* Close all the input and output files. */ +static const struct case_sink_class sort_sink_class = { - int i; + "SORT CASES", + NULL, + sort_sink_write, + NULL, + sort_sink_make_source, + }; + +struct sort_source_aux + { + struct sort_cases_pgm *scp; + struct ccase *dst; + write_case_func *write_case; + write_case_data wc_data; + }; - for (i = 0; i < n_runs; i++) - if (run_length[i] != 0) - { - close_handle (i); - sprintf (tmp_basename, "%08x", run_index[i]); - if (remove (tmp_basename) != 0) - msg (SE, _("%s: Error removing temporary file: %s."), - tmp_basename, strerror (errno)); - } - } - close_handle (N_INPUT_BUFFERS); - sprintf (tmp_basename, "%08x", run_no); - if (remove (tmp_basename) != 0) - msg (SE, _("%s: Error removing temporary file: %s."), - tmp_basename, strerror (errno)); - return 0; +/* Passes C to the write_case function. */ +static int +sort_source_read_helper (const struct ccase *src, void *aux_) +{ + struct sort_source_aux *aux = aux_; + + memcpy (aux->dst, src, aux->scp->case_size); + return aux->write_case (aux->wc_data); } - -/* External sort input program. */ /* Reads all the records from the source stream and passes them to write_case(). */ -void -sort_stream_read (void) +static void +sort_source_read (struct case_source *source, + struct ccase *c, + write_case_func *write_case, write_case_data wc_data) { - read_sort_output (write_case); + struct sort_cases_pgm *scp = source->aux; + struct sort_source_aux aux; + + aux.scp = scp; + aux.dst = c; + aux.write_case = write_case; + aux.wc_data = wc_data; + + read_sort_output (scp, sort_source_read_helper, &aux); } +static void read_internal_sort_output (struct internal_sort *isrt, + read_sort_output_func *, void *aux); +static void read_external_sort_output (struct external_sort *xsrt, + read_sort_output_func *, void *aux); + /* Reads all the records from the output stream and passes them to the function provided, which must have an interface identical to write_case(). */ void -read_sort_output (int (*write_case) (void)) +read_sort_output (struct sort_cases_pgm *scp, + read_sort_output_func *output_func, void *aux) { - int i; - FILE *f; - - if (separate_case_tab) + assert ((scp->isrt != NULL) + (scp->xsrt != NULL) <= 1); + if (scp->isrt != NULL) + read_internal_sort_output (scp->isrt, output_func, aux); + else if (scp->xsrt != NULL) + read_external_sort_output (scp->xsrt, output_func, aux); + else { - struct ccase *save_temp_case = temp_case; - struct case_list **p; - - for (p = separate_case_tab; *p; p++) - { - temp_case = &(*p)->c; - write_case (); - } - - free (separate_case_tab); - separate_case_tab = NULL; - - temp_case = save_temp_case; - } else { - sprintf (tmp_extname, "%08x", run_no - 1); - f = fopen (tmp_basename, "rb"); - if (!f) - { - msg (ME, _("%s: Cannot open sort result file: %s."), tmp_basename, - strerror (errno)); - err_failure (); - return; - } - - for (i = 0; i < vfm_source_info.ncases; i++) - { - if (!fread (temp_case, vfm_source_info.case_size, 1, f)) - { - if (ferror (f)) - msg (ME, _("%s: Error reading sort result file: %s."), - tmp_basename, strerror (errno)); - else - msg (ME, _("%s: Unexpected end of sort result file: %s."), - tmp_basename, strerror (errno)); - err_failure (); - break; - } - - if (!write_case ()) - break; - } - - if (fclose (f) == EOF) - msg (ME, _("%s: Error closing sort result file: %s."), tmp_basename, - strerror (errno)); - - if (remove (tmp_basename) != 0) - msg (ME, _("%s: Error removing sort result file: %s."), tmp_basename, - strerror (errno)); - else - rmdir_temp_dir (); + /* No results. Probably an external sort that failed. */ } } -#if 0 /* dead code */ -/* Alternate interface to sort_stream_write used for external sorting - when SEPARATE is true. */ -static int -write_separate (struct ccase *c) +static void +read_internal_sort_output (struct internal_sort *isrt, + read_sort_output_func *output_func, + void *aux) { - assert (c == temp_case); + size_t case_idx; - sort_stream_write (); - return 1; + for (case_idx = 0; case_idx < isrt->case_cnt; case_idx++) + if (!output_func (isrt->cases[case_idx], aux)) + break; } -#endif -/* Performs one iteration of 5.4.1R steps R4, R5, R6, R7, R2, and - R3. */ static void -sort_stream_write (void) +read_external_sort_output (struct external_sort *xsrt, + read_sort_output_func *output_func, void *aux) { - struct repl_sel_tree *t; - - /* R4. */ - memcpy (q->record, temp_case->data, vfm_sink_info.case_size); - if (compare_record (q->record, lastkey) < 0) - if (++rq > rmax) - rmax = rq; + FILE *file; + int file_idx; + size_t i; + struct ccase *c; - /* R5. */ - t = q->fe; + assert (xsrt->run_cnt == 1); + file_idx = xsrt->initial_runs[0].file_idx; - /* R6 and R7. */ - for (;;) + file = open_temp_file (xsrt, file_idx, "rb"); + if (file == NULL) { - /* R6. */ - if (t->rn < rq - || (t->rn == rq && compare_record (t->loser->record, q->record) < 0)) - { - struct repl_sel_tree *temp_tree; - int temp_int; - - temp_tree = t->loser; - t->loser = q; - q = temp_tree; - - temp_int = t->rn; - t->rn = rq; - rq = temp_int; - } - - /* R7. */ - if (t == x[1]) - break; - t = t->fi; + err_failure (); + return; } - /* R2. */ - if (rq != rc) + c = xmalloc (xsrt->scp->case_size); + for (i = 0; i < xsrt->initial_runs[0].case_cnt; i++) { - end_run (); - begin_run (); - assert (rq <= rmax); - rc = rq; - } + if (!read_temp_file (xsrt, file_idx, file, c, xsrt->scp->case_size)) + { + err_failure (); + break; + } - /* R3. */ - if (rq != 0) - { - output_record (q->record); - lastkey = x[x_max]->record; - memcpy (lastkey, q->record, vfm_sink_info.case_size); + if (!output_func (c, aux)) + break; } + free (c); + close_temp_file (xsrt, file_idx, file); } -/* Switches mode from sink to source. */ -void -sort_stream_mode (void) +static void +sort_source_destroy (struct case_source *source) { - /* If this is not done, then we get the following source/sink pairs: - source=memory/disk/DATA LIST/etc., sink=SORT; source=SORT, - sink=SORT; which is not good. */ - vfm_sink = NULL; + struct sort_cases_pgm *scp = source->aux; + + destroy_sort_cases_pgm (scp); } -struct case_stream sort_stream = +const struct case_source_class sort_source_class = { - NULL, - sort_stream_read, - sort_stream_write, - sort_stream_mode, - NULL, - NULL, - "SORT", + "SORT CASES", + NULL, /* FIXME */ + sort_source_read, + sort_source_destroy, };