X-Git-Url: https://pintos-os.org/cgi-bin/gitweb.cgi?a=blobdiff_plain;f=src%2Fsort.c;h=11139868e3708f991e93ee57d8991cb99a90285b;hb=1195caf0c998e80d3e7195a0452a14e7b4194077;hp=cc0a833f696348459585c48ec56458baad1a8ec4;hpb=74a57f26f1458b28a0fddbb9f46004ac8f4d9c30;p=pspp-builds.git diff --git a/src/sort.c b/src/sort.c index cc0a833f..11139868 100644 --- a/src/sort.c +++ b/src/sort.c @@ -14,20 +14,25 @@ You should have received a copy of the GNU General Public License along with this program; if not, write to the Free Software - Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA - 02111-1307, USA. */ + Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA + 02110-1301, USA. */ #include #include "sort.h" -#include +#include "error.h" +#include #include #include #include #include "algorithm.h" #include "alloc.h" +#include "bool.h" +#include "case.h" +#include "casefile.h" #include "command.h" #include "error.h" -#include "expr.h" +#include "expressions/public.h" +#include "glob.h" #include "lexer.h" #include "misc.h" #include "settings.h" @@ -36,82 +41,171 @@ #include "vfm.h" #include "vfmP.h" -#if HAVE_UNISTD_H -#include -#endif +#include "debug-print.h" + +/* Sort direction. */ +enum sort_direction + { + SRT_ASCEND, /* A, B, C, ..., X, Y, Z. */ + SRT_DESCEND /* Z, Y, X, ..., C, B, A. */ + }; -#if HAVE_SYS_TYPES_H -#include -#endif +/* A sort criterion. */ +struct sort_criterion + { + int fv; /* Variable data index. */ + int width; /* 0=numeric, otherwise string widthe. */ + enum sort_direction dir; /* Sort direction. */ + }; -#if HAVE_SYS_STAT_H -#include -#endif +/* A set of sort criteria. */ +struct sort_criteria + { + struct sort_criterion *crits; + size_t crit_cnt; + }; -#include "debug-print.h" +/* These should only be changed for testing purposes. */ +static int min_buffers = 64; +static int max_buffers = INT_MAX; +static bool allow_internal_sort = true; -/* Other prototypes. */ -static int compare_record (const union value *, const union value *, - const struct sort_cases_pgm *, int *idx_to_fv); -static int compare_case_lists (const void *, const void *, void *); -static struct internal_sort *do_internal_sort (struct sort_cases_pgm *, - int separate); -static void destroy_internal_sort (struct internal_sort *); -static struct external_sort *do_external_sort (struct sort_cases_pgm *, - int separate); -static void destroy_external_sort (struct external_sort *); -struct sort_cases_pgm *parse_sort (void); +static int compare_record (const struct ccase *, const struct ccase *, + const struct sort_criteria *); +static struct casefile *do_internal_sort (struct casereader *, + const struct sort_criteria *); +static struct casefile *do_external_sort (struct casereader *, + const struct sort_criteria *); /* Performs the SORT CASES procedures. */ int cmd_sort_cases (void) { - struct sort_cases_pgm *scp; - int success; + struct sort_criteria *criteria; + bool success = false; lex_match (T_BY); - scp = parse_sort (); - if (scp == NULL) + criteria = sort_parse_criteria (default_dict, NULL, NULL, NULL); + if (criteria == NULL) return CMD_FAILURE; - if (temporary != 0) + if (test_mode && lex_match ('/')) { - msg (SE, _("SORT CASES may not be used after TEMPORARY. " - "Temporary transformations will be made permanent.")); - cancel_temporary (); + if (!lex_force_match_id ("BUFFERS") || !lex_match ('=') + || !lex_force_int ()) + goto done; + + min_buffers = max_buffers = lex_integer (); + allow_internal_sort = false; + if (max_buffers < 2) + { + msg (SE, _("Buffer limit must be at least 2.")); + goto done; + } + + lex_get (); } - success = sort_cases (scp, 0); - destroy_sort_cases_pgm (scp); - if (success) - return lex_end_of_command (); - else - return CMD_FAILURE; + success = sort_active_file_in_place (criteria); + + done: + min_buffers = 64; + max_buffers = INT_MAX; + allow_internal_sort = true; + + sort_destroy_criteria (criteria); + return success ? lex_end_of_command () : CMD_FAILURE; +} + +/* Gets ready to sort the active file, either in-place or to a + separate casefile. */ +static void +prepare_to_sort_active_file (void) +{ + /* Cancel temporary transformations and PROCESS IF. */ + if (temporary != 0) + cancel_temporary (); + expr_free (process_if_expr); + process_if_expr = NULL; + + /* Make sure source cases are in a storage source. */ + procedure (NULL, NULL); + assert (case_source_is_class (vfm_source, &storage_source_class)); +} + +/* Sorts the active file in-place according to CRITERIA. + Returns nonzero if successful. */ +int +sort_active_file_in_place (const struct sort_criteria *criteria) +{ + struct casefile *src, *dst; + + prepare_to_sort_active_file (); + + src = storage_source_get_casefile (vfm_source); + dst = sort_execute (casefile_get_destructive_reader (src), criteria); + free_case_source (vfm_source); + vfm_source = NULL; + + if (dst == NULL) + return 0; + + vfm_source = storage_source_create (dst); + return 1; +} + +/* Sorts the active file to a separate casefile. If successful, + returns the sorted casefile. Returns a null pointer on + failure. */ +struct casefile * +sort_active_file_to_casefile (const struct sort_criteria *criteria) +{ + struct casefile *src; + + prepare_to_sort_active_file (); + + src = storage_source_get_casefile (vfm_source); + return sort_execute (casefile_get_reader (src), criteria); } -/* Parses a list of sort keys and returns a struct sort_cases_pgm - based on it. Returns a null pointer on error. */ -struct sort_cases_pgm * -parse_sort (void) +/* Parses a list of sort keys and returns a struct sort_criteria + based on it. Returns a null pointer on error. + If SAW_DIRECTION is nonnull, sets *SAW_DIRECTION to true if at + least one parenthesized sort direction was specified, false + otherwise. */ +struct sort_criteria * +sort_parse_criteria (const struct dictionary *dict, + struct variable ***vars, int *var_cnt, + bool *saw_direction) { - struct sort_cases_pgm *scp; + struct sort_criteria *criteria; + struct variable **local_vars = NULL; + size_t local_var_cnt; + + assert ((vars == NULL) == (var_cnt == NULL)); + if (vars == NULL) + { + vars = &local_vars; + var_cnt = &local_var_cnt; + } + + criteria = xmalloc (sizeof *criteria); + criteria->crits = NULL; + criteria->crit_cnt = 0; - scp = xmalloc (sizeof *scp); - scp->ref_cnt = 1; - scp->vars = NULL; - scp->dirs = NULL; - scp->var_cnt = 0; - scp->isrt = NULL; - scp->xsrt = NULL; + *vars = NULL; + *var_cnt = 0; + if (saw_direction != NULL) + *saw_direction = false; do { - int prev_var_cnt = scp->var_cnt; - enum sort_direction direction = SRT_ASCEND; + int prev_var_cnt = *var_cnt; + enum sort_direction direction; /* Variables. */ - if (!parse_variables (default_dict, &scp->vars, &scp->var_cnt, + if (!parse_variables (dict, vars, var_cnt, PV_NO_DUPLICATE | PV_APPEND | PV_NO_SCRATCH)) goto error; @@ -120,7 +214,9 @@ parse_sort (void) { if (lex_match_id ("D") || lex_match_id ("DOWN")) direction = SRT_DESCEND; - else if (!lex_match_id ("A") && !lex_match_id ("UP")) + else if (lex_match_id ("A") || lex_match_id ("UP")) + direction = SRT_ASCEND; + else { msg (SE, _("`A' or `D' expected inside parentheses.")); goto error; @@ -130,249 +226,176 @@ parse_sort (void) msg (SE, _("`)' expected.")); goto error; } + *saw_direction = true; } - scp->dirs = xrealloc (scp->dirs, sizeof *scp->dirs * scp->var_cnt); - for (; prev_var_cnt < scp->var_cnt; prev_var_cnt++) - scp->dirs[prev_var_cnt] = direction; + else + direction = SRT_ASCEND; + + criteria->crits = xrealloc (criteria->crits, + sizeof *criteria->crits * *var_cnt); + criteria->crit_cnt = *var_cnt; + for (; prev_var_cnt < criteria->crit_cnt; prev_var_cnt++) + { + struct sort_criterion *c = &criteria->crits[prev_var_cnt]; + c->fv = (*vars)[prev_var_cnt]->fv; + c->width = (*vars)[prev_var_cnt]->width; + c->dir = direction; + } } while (token != '.' && token != '/'); - - return scp; + + free (local_vars); + return criteria; error: - destroy_sort_cases_pgm (scp); + free (local_vars); + sort_destroy_criteria (criteria); return NULL; } /* Destroys a SORT CASES program. */ void -destroy_sort_cases_pgm (struct sort_cases_pgm *scp) +sort_destroy_criteria (struct sort_criteria *criteria) { - if (scp != NULL) + if (criteria != NULL) { - assert (scp->ref_cnt > 0); - if (--scp->ref_cnt > 0) - return; - - free (scp->vars); - free (scp->dirs); - destroy_internal_sort (scp->isrt); - destroy_external_sort (scp->xsrt); - free (scp); + free (criteria->crits); + free (criteria); } } -/* Sorts the active file based on the key variables specified in - global variables vars and var_cnt. The output is either to the - active file, if SEPARATE is zero, or to a separate file, if - SEPARATE is nonzero. In the latter case the output cases can be - read with a call to read_sort_output(). (In the former case the - output cases should be dealt with through the usual vfm interface.) - - The caller is responsible for freeing vars[]. */ -int -sort_cases (struct sort_cases_pgm *scp, int separate) +/* Reads all the cases from READER, which is destroyed. Sorts + the cases according to CRITERIA. Returns the sorted cases in + a newly created casefile. */ +struct casefile * +sort_execute (struct casereader *reader, const struct sort_criteria *criteria) { - scp->case_size - = sizeof (union value) * dict_get_compacted_value_cnt (default_dict); - - /* Not sure this is necessary but it's good to be safe. */ - if (separate && case_source_is_class (vfm_source, &sort_source_class)) - procedure (NULL, NULL); - - /* SORT CASES cancels PROCESS IF. */ - expr_free (process_if_expr); - process_if_expr = NULL; - - /* Try an internal sort first. */ - scp->isrt = do_internal_sort (scp, separate); - if (scp->isrt != NULL) - return 1; - - /* Fall back to an external sort. */ - if (vfm_source != NULL - && case_source_is_class (vfm_source, &storage_source_class)) - storage_source_to_disk (vfm_source); - scp->xsrt = do_external_sort (scp, separate); - if (scp->xsrt != NULL) - return 1; - - destroy_sort_cases_pgm (scp); - return 0; + struct casefile *output = do_internal_sort (reader, criteria); + if (output == NULL) + output = do_external_sort (reader, criteria); + casereader_destroy (reader); + return output; } -/* Results of an internal sort. */ -struct internal_sort +/* A case and its index. */ +struct indexed_case { - struct case_list **results; + struct ccase c; /* Case. */ + unsigned long idx; /* Index to allow for stable sorting. */ }; -/* If the data is in memory, do an internal sort. Return - success. */ -static struct internal_sort * -do_internal_sort (struct sort_cases_pgm *scp, int separate) +static int compare_indexed_cases (const void *, const void *, void *); + +/* If the data is in memory, do an internal sort and return a new + casefile for the data. Otherwise, return a null pointer. */ +static struct casefile * +do_internal_sort (struct casereader *reader, + const struct sort_criteria *criteria) { - struct internal_sort *isrt; + const struct casefile *src; + struct casefile *dst; + unsigned long case_cnt; - isrt = xmalloc (sizeof *isrt); - isrt->results = NULL; + if (!allow_internal_sort) + return NULL; - if (case_source_is_class (vfm_source, &storage_source_class) - && !storage_source_on_disk (vfm_source)) + src = casereader_get_casefile (reader); + if (casefile_get_case_cnt (src) > 1 && !casefile_in_core (src)) + return NULL; + + case_cnt = casefile_get_case_cnt (src); + dst = casefile_create (casefile_get_value_cnt (src)); + if (case_cnt != 0) { - struct case_list *case_list; - struct case_list **case_array; - int case_cnt; - int i; - - case_cnt = vfm_source->class->count (vfm_source); - if (case_cnt <= 0) - return isrt; - - if (case_cnt > get_max_workspace() / sizeof *case_array) - goto error; - - case_list = storage_source_get_cases (vfm_source); - case_array = malloc (sizeof *case_array * (case_cnt + 1)); - if (case_array == NULL) - goto error; - - for (i = 0; case_list != NULL; i++) + struct indexed_case *cases = malloc (sizeof *cases * case_cnt); + if (cases != NULL) { - case_array[i] = case_list; - case_list = case_list->next; - } - assert (i == case_cnt); - case_array[case_cnt] = NULL; + unsigned long i; + + for (i = 0; i < case_cnt; i++) + { + casereader_read_xfer_assert (reader, &cases[i].c); + cases[i].idx = i; + } - sort (case_array, case_cnt, sizeof *case_array, - compare_case_lists, scp); + sort (cases, case_cnt, sizeof *cases, compare_indexed_cases, + (void *) criteria); + + for (i = 0; i < case_cnt; i++) + casefile_append_xfer (dst, &cases[i].c); - if (!separate) - { - storage_source_set_cases (vfm_source, case_array[0]); - for (i = 1; i <= case_cnt; i++) - case_array[i - 1]->next = case_array[i]; - free (case_array); + free (cases); } else - isrt->results = case_array; - - return isrt; + { + /* Failure. */ + casefile_destroy (dst); + dst = NULL; + } } - error: - free (isrt); - return NULL; -} - -/* Destroys an internal sort result. */ -static void -destroy_internal_sort (struct internal_sort *isrt) -{ - if (isrt != NULL) - { - free (isrt->results); - free (isrt); - } + return dst; } -/* Compares the VAR_CNT variables in VARS[] between the - `case_list's at A and B, and returns a strcmp()-type - result. */ +/* Compares the variables specified by CRITERIA between the cases + at A and B, with a "last resort" comparison for stability, and + returns a strcmp()-type result. */ static int -compare_case_lists (const void *a_, const void *b_, void *scp_) +compare_indexed_cases (const void *a_, const void *b_, void *criteria_) { - struct sort_cases_pgm *scp = scp_; - struct case_list *const *pa = a_; - struct case_list *const *pb = b_; - struct case_list *a = *pa; - struct case_list *b = *pb; - - return compare_record (a->c.data, b->c.data, scp, NULL); + struct sort_criteria *criteria = criteria_; + const struct indexed_case *a = a_; + const struct indexed_case *b = b_; + int result = compare_record (&a->c, &b->c, criteria); + if (result == 0) + result = a->idx < b->idx ? -1 : a->idx > b->idx; + return result; } /* External sort. */ -/* Maximum order of merge. If you increase this, then you should - use a heap for comparing cases during merge. */ -#define MAX_MERGE_ORDER 7 - -/* Minimum total number of records for buffers. */ -#define MIN_BUFFER_TOTAL_SIZE_RECS 64 - -/* Minimum single input buffer size, in bytes and records. */ -#define MIN_BUFFER_SIZE_BYTES 4096 -#define MIN_BUFFER_SIZE_RECS 16 - -#if MIN_BUFFER_SIZE_RECS * 2 + 16 > MIN_BUFFER_TOTAL_SIZE_RECS -#error MIN_BUFFER_SIZE_RECS and MIN_BUFFER_TOTAL_SIZE_RECS do not make sense. -#endif - -/* An initial run and its length. */ -struct initial_run - { - int file_idx; /* File index. */ - size_t case_cnt; /* Number of cases. */ - }; - -/* Sorts initial runs A and B in decending order by length. */ -static int -compare_initial_runs (const void *a_, const void *b_, void *aux UNUSED) -{ - const struct initial_run *a = a_; - const struct initial_run *b = b_; - - return a->case_cnt > b->case_cnt ? -1 : a->case_cnt case_cnt; -} +/* Maximum order of merge (external sort only). The maximum + reasonable value is about 7. Above that, it would be a good + idea to use a heap in merge_once() to select the minimum. */ +#define MAX_MERGE_ORDER 7 /* Results of an external sort. */ struct external_sort { - struct sort_cases_pgm *scp; /* SORT CASES info. */ - struct initial_run *initial_runs; /* Array of initial runs. */ + const struct sort_criteria *criteria; /* Sort criteria. */ + size_t value_cnt; /* Size of data in `union value's. */ + struct casefile **runs; /* Array of initial runs. */ size_t run_cnt, run_cap; /* Number of runs, allocated capacity. */ - char *temp_dir; /* Temporary file directory name. */ - char *temp_name; /* Name of a temporary file. */ - int next_file_idx; /* Lowest unused file index. */ }; /* Prototypes for helper functions. */ -static void sort_sink_write (struct case_sink *, const struct ccase *); -static int write_initial_runs (struct external_sort *, int separate); -static int init_external_sort (struct external_sort *); -static int merge (struct external_sort *); -static void rmdir_temp_dir (struct external_sort *); -static void remove_temp_file (struct external_sort *xsrt, int file_idx); - -/* Performs an external sort of the active file according to the - specification in SCP. Forms initial runs using a heap as a - reservoir. Determines the optimum merge pattern via Huffman's - method (see Knuth vol. 3, 2nd edition, p. 365-366), and merges - according to that pattern. */ -static struct external_sort * -do_external_sort (struct sort_cases_pgm *scp, int separate) +static int write_runs (struct external_sort *, struct casereader *); +static struct casefile *merge (struct external_sort *); +static void destroy_external_sort (struct external_sort *); + +/* Performs a stable external sort of the active file according + to the specification in SCP. Forms initial runs using a heap + as a reservoir. Merges the initial runs according to a + pattern that assures stability. */ +static struct casefile * +do_external_sort (struct casereader *reader, + const struct sort_criteria *criteria) { struct external_sort *xsrt; - int success = 0; - - xsrt = xmalloc (sizeof *xsrt); - xsrt->scp = scp; - if (!init_external_sort (xsrt)) - goto done; - if (!write_initial_runs (xsrt, separate)) - goto done; - if (!merge (xsrt)) - goto done; - success = 1; + casefile_to_disk (casereader_get_casefile (reader)); - done: - if (success) + xsrt = xmalloc (sizeof *xsrt); + xsrt->criteria = criteria; + xsrt->value_cnt = casefile_get_value_cnt (casereader_get_casefile (reader)); + xsrt->run_cap = 512; + xsrt->run_cnt = 0; + xsrt->runs = xmalloc (sizeof *xsrt->runs * xsrt->run_cap); + if (write_runs (xsrt, reader)) { - /* Don't destroy anything because we'll need it for reading - the output. */ - return xsrt; + struct casefile *output = merge (xsrt); + destroy_external_sort (xsrt); + return output; } else { @@ -390,227 +413,11 @@ destroy_external_sort (struct external_sort *xsrt) int i; for (i = 0; i < xsrt->run_cnt; i++) - remove_temp_file (xsrt, xsrt->initial_runs[i].file_idx); - rmdir_temp_dir (xsrt); - free (xsrt->temp_dir); - free (xsrt->temp_name); - free (xsrt->initial_runs); + casefile_destroy (xsrt->runs[i]); + free (xsrt->runs); free (xsrt); } } - -#ifdef HAVE_MKDTEMP -/* Creates and returns the name of a temporary directory. */ -static char * -make_temp_dir (void) -{ - const char *parent_dir; - char *temp_dir; - - if (getenv ("TMPDIR") != NULL) - parent_dir = getenv ("TMPDIR"); - else - parent_dir = P_tmpdir; - - temp_dir = xmalloc (strlen (parent_dir) + 32); - sprintf (temp_dir, "%s%cpsppXXXXXX", parent_dir, DIR_SEPARATOR); - if (mkdtemp (temp_dir) == NULL) - { - msg (SE, _("%s: Creating temporary directory: %s."), - temp_dir, strerror (errno)); - free (temp_dir); - return NULL; - } - else - return temp_dir; -} -#else /* !HAVE_MKDTEMP */ -/* Creates directory DIR. */ -static int -do_mkdir (const char *dir) -{ -#ifndef __MSDOS__ - return mkdir (dir, S_IRWXU); -#else - return mkdir (dir); -#endif -} - -/* Creates and returns the name of a temporary directory. */ -static char * -make_temp_dir (void) -{ - int i; - - for (i = 0; i < 100; i++) - { - char temp_dir[L_tmpnam + 1]; - if (tmpnam (temp_dir) == NULL) - { - msg (SE, _("Generating temporary directory name failed: %s."), - strerror (errno)); - return NULL; - } - else if (do_mkdir (temp_dir) == 0) - return xstrdup (temp_dir); - } - - msg (SE, _("Creating temporary directory failed: %s."), strerror (errno)); - return NULL; -} -#endif /* !HAVE_MKDTEMP */ - -/* Sets up to open temporary files. */ -static int -init_external_sort (struct external_sort *xsrt) -{ - /* Zero. */ - xsrt->temp_dir = NULL; - xsrt->next_file_idx = 0; - - /* Huffman queue. */ - xsrt->run_cap = 512; - xsrt->run_cnt = 0; - xsrt->initial_runs = xmalloc (sizeof *xsrt->initial_runs * xsrt->run_cap); - - /* Temporary directory. */ - xsrt->temp_dir = make_temp_dir (); - xsrt->temp_name = NULL; - if (xsrt->temp_dir == NULL) - return 0; - xsrt->temp_name = xmalloc (strlen (xsrt->temp_dir) + 64); - - return 1; -} - -/* Returns nonzero if we should return an error even though the - operation succeeded. Useful for testing. */ -static int -simulate_error (void) -{ - static int op_err_cnt = -1; - static int op_cnt; - - if (op_err_cnt == -1 || op_cnt++ < op_err_cnt) - return 0; - else - { - errno = 0; - return 1; - } -} - -/* Removes the directory created for temporary files, if one - exists. */ -static void -rmdir_temp_dir (struct external_sort *xsrt) -{ - if (xsrt->temp_dir != NULL && rmdir (xsrt->temp_dir) == -1) - { - msg (SE, _("%s: Error removing directory for temporary files: %s."), - xsrt->temp_dir, strerror (errno)); - xsrt->temp_dir = NULL; - } -} - -/* Returns the name of temporary file number FILE_IDX for XSRT. - The name is written into a static buffer, so be careful. */ -static char * -get_temp_file_name (struct external_sort *xsrt, int file_idx) -{ - assert (xsrt->temp_dir != NULL); - sprintf (xsrt->temp_name, "%s%c%04d", - xsrt->temp_dir, DIR_SEPARATOR, file_idx); - return xsrt->temp_name; -} - -/* Opens temporary file numbered FILE_IDX for XSRT with mode MODE - and returns the FILE *. */ -static FILE * -open_temp_file (struct external_sort *xsrt, int file_idx, const char *mode) -{ - char *temp_file; - FILE *file; - - temp_file = get_temp_file_name (xsrt, file_idx); - - file = fopen (temp_file, mode); - if (simulate_error () || file == NULL) - msg (SE, _("%s: Error opening temporary file for %s: %s."), - temp_file, mode[0] == 'r' ? "reading" : "writing", - strerror (errno)); - - return file; -} - -/* Closes FILE, which is the temporary file numbered FILE_IDX - under XSRT. Returns nonzero only if successful. */ -static int -close_temp_file (struct external_sort *xsrt, int file_idx, FILE *file) -{ - if (file != NULL) - { - char *temp_file = get_temp_file_name (xsrt, file_idx); - if (simulate_error () || fclose (file) == EOF) - { - msg (SE, _("%s: Error closing temporary file: %s."), - temp_file, strerror (errno)); - return 0; - } - } - return 1; -} - -/* Delete temporary file numbered FILE_IDX for XSRT. */ -static void -remove_temp_file (struct external_sort *xsrt, int file_idx) -{ - if (file_idx != -1) - { - char *temp_file = get_temp_file_name (xsrt, file_idx); - if (simulate_error () || remove (temp_file) != 0) - msg (SE, _("%s: Error removing temporary file: %s."), - temp_file, strerror (errno)); - } -} - -/* Writes SIZE bytes from buffer DATA into FILE, which is - temporary file numbered FILE_IDX from XSRT. */ -static int -write_temp_file (struct external_sort *xsrt, int file_idx, - FILE *file, const void *data, size_t size) -{ - if (!simulate_error () && fwrite (data, size, 1, file) == 1) - return 1; - else - { - char *temp_file = get_temp_file_name (xsrt, file_idx); - msg (SE, _("%s: Error writing temporary file: %s."), - temp_file, strerror (errno)); - return 0; - } -} - -/* Reads SIZE bytes into buffer DATA into FILE, which is - temporary file numbered FILE_IDX from XSRT. */ -static int -read_temp_file (struct external_sort *xsrt, int file_idx, - FILE *file, void *data, size_t size) -{ - if (!simulate_error () && fread (data, size, 1, file) == 1) - return 1; - else - { - char *temp_file = get_temp_file_name (xsrt, file_idx); - if (ferror (file)) - msg (SE, _("%s: Error reading temporary file: %s."), - temp_file, strerror (errno)); - else - msg (SE, _("%s: Unexpected end of temporary file."), - temp_file); - return 0; - } -} /* Replacement selection. */ @@ -618,7 +425,8 @@ read_temp_file (struct external_sort *xsrt, int file_idx, struct record_run { int run; /* Run number of case. */ - struct case_list *record; /* Case data. */ + struct ccase record; /* Case data. */ + size_t idx; /* Case number (for stability). */ }; /* Represents a set of initial runs during an external sort. */ @@ -626,43 +434,41 @@ struct initial_run_state { struct external_sort *xsrt; - int *idx_to_fv; /* Translation table copied from sink. */ - /* Reservoir. */ struct record_run *records; /* Records arranged as a heap. */ size_t record_cnt; /* Current number of records. */ size_t record_cap; /* Capacity for records. */ - struct case_list *free_list;/* Cases not in heap. */ /* Run currently being output. */ - int file_idx; /* Temporary file number. */ + int run; /* Run number. */ size_t case_cnt; /* Number of cases so far. */ - FILE *output_file; /* Output file. */ - struct case_list *last_output;/* Record last output. */ + struct casefile *casefile; /* Output file. */ + struct ccase last_output; /* Record last output. */ int okay; /* Zero if an error has been encountered. */ }; static const struct case_sink_class sort_sink_class; -static void destroy_initial_run_state (struct initial_run_state *irs); +static void destroy_initial_run_state (struct initial_run_state *); +static void process_case (struct initial_run_state *, const struct ccase *, + size_t); static int allocate_cases (struct initial_run_state *); -static struct case_list *grab_case (struct initial_run_state *); -static void release_case (struct initial_run_state *, struct case_list *); -static void output_record (struct initial_run_state *irs); -static void start_run (struct initial_run_state *irs); -static void end_run (struct initial_run_state *irs); +static void output_record (struct initial_run_state *); +static void start_run (struct initial_run_state *); +static void end_run (struct initial_run_state *); static int compare_record_run (const struct record_run *, const struct record_run *, struct initial_run_state *); static int compare_record_run_minheap (const void *, const void *, void *); -/* Writes initial runs for XSRT, sending them to a separate file - if SEPARATE is nonzero. */ +/* Reads cases from READER and composes initial runs in XSRT. */ static int -write_initial_runs (struct external_sort *xsrt, int separate) +write_runs (struct external_sort *xsrt, struct casereader *reader) { struct initial_run_state *irs; + struct ccase c; + size_t idx = 0; int success = 0; /* Allocate memory for cases. */ @@ -670,29 +476,19 @@ write_initial_runs (struct external_sort *xsrt, int separate) irs->xsrt = xsrt; irs->records = NULL; irs->record_cnt = irs->record_cap = 0; - irs->free_list = NULL; - irs->output_file = NULL; - irs->last_output = NULL; - irs->file_idx = 0; + irs->run = 0; irs->case_cnt = 0; + irs->casefile = NULL; + case_nullify (&irs->last_output); irs->okay = 1; if (!allocate_cases (irs)) goto done; - /* Create case sink. */ - if (!separate) - { - if (vfm_sink != NULL && vfm_sink->class->destroy != NULL) - vfm_sink->class->destroy (vfm_sink); - vfm_sink = create_case_sink (&sort_sink_class, default_dict, irs); - xsrt->scp->ref_cnt++; - } - /* Create initial runs. */ start_run (irs); - procedure (NULL, NULL); - irs->idx_to_fv = NULL; - while (irs->record_cnt > 0 && irs->okay) + for (; irs->okay && casereader_read (reader, &c); case_destroy (&c)) + process_case (irs, &c, idx++); + while (irs->okay && irs->record_cnt > 0) output_record (irs); end_run (irs); @@ -706,31 +502,24 @@ write_initial_runs (struct external_sort *xsrt, int separate) /* Add a single case to an initial run. */ static void -sort_sink_write (struct case_sink *sink, const struct ccase *c) +process_case (struct initial_run_state *irs, const struct ccase *c, size_t idx) { - struct initial_run_state *irs = sink->aux; - struct record_run *new_record_run; - - if (!irs->okay) - return; - - irs->idx_to_fv = sink->idx_to_fv; + struct record_run *rr; /* Compose record_run for this run and add to heap. */ - assert (irs->record_cnt < irs->record_cap); - new_record_run = irs->records + irs->record_cnt++; - new_record_run->record = grab_case (irs); - memcpy (new_record_run->record->c.data, c->data, irs->xsrt->scp->case_size); - new_record_run->run = irs->file_idx; - if (irs->last_output != NULL - && compare_record (c->data, irs->last_output->c.data, - irs->xsrt->scp, sink->idx_to_fv) < 0) - new_record_run->run = irs->xsrt->next_file_idx; + assert (irs->record_cnt < irs->record_cap - 1); + rr = irs->records + irs->record_cnt++; + case_copy (&rr->record, 0, c, 0, irs->xsrt->value_cnt); + rr->run = irs->run; + rr->idx = idx; + if (!case_is_null (&irs->last_output) + && compare_record (c, &irs->last_output, irs->xsrt->criteria) < 0) + rr->run = irs->run + 1; push_heap (irs->records, irs->record_cnt, sizeof *irs->records, compare_record_run_minheap, irs); /* Output a record if the reservoir is full. */ - if (irs->record_cnt == irs->record_cap && irs->okay) + if (irs->record_cnt == irs->record_cap - 1 && irs->okay) output_record (irs); } @@ -738,28 +527,17 @@ sort_sink_write (struct case_sink *sink, const struct ccase *c) static void destroy_initial_run_state (struct initial_run_state *irs) { - struct case_list *iter, *next; int i; if (irs == NULL) return; - /* Release cases to free list. */ - for (i = 0; i < irs->record_cnt; i++) - release_case (irs, irs->records[i].record); - if (irs->last_output != NULL) - release_case (irs, irs->last_output); - - /* Free cases in free list. */ - for (iter = irs->free_list; iter != NULL; iter = next) - { - next = iter->next; - free (iter); - } - + for (i = 0; i < irs->record_cap; i++) + case_destroy (&irs->records[i].record); free (irs->records); - if (irs->output_file != NULL) - close_temp_file (irs->xsrt, irs->file_idx, irs->output_file); + + if (irs->casefile != NULL) + casefile_sleep (irs->casefile); free (irs); } @@ -775,36 +553,27 @@ allocate_cases (struct initial_run_state *irs) /* Allocate as many cases as we can within the workspace limit. */ approx_case_cost = (sizeof *irs->records - + sizeof *irs->free_list - + irs->xsrt->scp->case_size + + irs->xsrt->value_cnt * sizeof (union value) + 4 * sizeof (void *)); max_cases = get_max_workspace() / approx_case_cost; + if (max_cases > max_buffers) + max_cases = max_buffers; irs->records = malloc (sizeof *irs->records * max_cases); for (i = 0; i < max_cases; i++) - { - struct case_list *c; - c = malloc (sizeof *c - + irs->xsrt->scp->case_size - - sizeof (union value)); - if (c == NULL) - { - max_cases = i; - break; - } - release_case (irs, c); - } - - /* irs->records gets all but one of the allocated cases. - The extra is used for last_output. */ - irs->record_cap = max_cases - 1; + if (!case_try_create (&irs->records[i].record, irs->xsrt->value_cnt)) + { + max_cases = i; + break; + } + irs->record_cap = max_cases; /* Fail if we didn't allocate an acceptable number of cases. */ - if (irs->records == NULL || max_cases < MIN_BUFFER_TOTAL_SIZE_RECS) + if (irs->records == NULL || max_cases < min_buffers) { msg (SE, _("Out of memory. Could not allocate room for minimum of %d " "cases of %d bytes each. (PSPP workspace is currently " "restricted to a maximum of %d KB.)"), - MIN_BUFFER_TOTAL_SIZE_RECS, approx_case_cost, get_max_workspace() / 1024); + min_buffers, approx_case_cost, get_max_workspace() / 1024); return 0; } return 1; @@ -813,59 +582,49 @@ allocate_cases (struct initial_run_state *irs) /* Compares the VAR_CNT variables in VARS[] between the `value's at A and B, and returns a strcmp()-type result. */ static int -compare_record (const union value *a, const union value *b, - const struct sort_cases_pgm *scp, - int *idx_to_fv) +compare_record (const struct ccase *a, const struct ccase *b, + const struct sort_criteria *criteria) { int i; assert (a != NULL); assert (b != NULL); - for (i = 0; i < scp->var_cnt; i++) + for (i = 0; i < criteria->crit_cnt; i++) { - struct variable *v = scp->vars[i]; - int fv; + const struct sort_criterion *c = &criteria->crits[i]; int result; - - if (idx_to_fv != NULL) - fv = idx_to_fv[v->index]; - else - fv = v->fv; - if (v->type == NUMERIC) + if (c->width == 0) { - double af = a[fv].f; - double bf = b[fv].f; + double af = case_num (a, c->fv); + double bf = case_num (b, c->fv); result = af < bf ? -1 : af > bf; } else - result = memcmp (a[fv].s, b[fv].s, v->width); + result = memcmp (case_str (a, c->fv), case_str (b, c->fv), c->width); - if (result != 0) - { - if (scp->dirs[i] == SRT_DESCEND) - result = -result; - return result; - } + if (result != 0) + return c->dir == SRT_ASCEND ? result : -result; } return 0; } /* Compares record-run tuples A and B on run number first, then - on the current record according to SCP. */ + on record, then on case index. */ static int compare_record_run (const struct record_run *a, const struct record_run *b, struct initial_run_state *irs) { - if (a->run != b->run) - return a->run > b->run ? 1 : -1; - else - return compare_record (a->record->c.data, b->record->c.data, - irs->xsrt->scp, irs->idx_to_fv); + int result = a->run < b->run ? -1 : a->run > b->run; + if (result == 0) + result = compare_record (&a->record, &b->record, irs->xsrt->criteria); + if (result == 0) + result = a->idx < b->idx ? -1 : a->idx > b->idx; + return result; } /* Compares record-run tuples A and B on run number first, then @@ -881,16 +640,11 @@ compare_record_run_minheap (const void *a, const void *b, void *irs) static void start_run (struct initial_run_state *irs) { - irs->file_idx = irs->xsrt->next_file_idx++; + irs->run++; irs->case_cnt = 0; - irs->output_file = open_temp_file (irs->xsrt, irs->file_idx, "wb"); - if (irs->output_file == NULL) - irs->okay = 0; - if (irs->last_output != NULL) - { - release_case (irs, irs->last_output); - irs->last_output = NULL; - } + irs->casefile = casefile_create (irs->xsrt->value_cnt); + casefile_to_disk (irs->casefile); + case_nullify (&irs->last_output); } /* Ends the current initial run. */ @@ -898,24 +652,20 @@ static void end_run (struct initial_run_state *irs) { struct external_sort *xsrt = irs->xsrt; - + /* Record initial run. */ - if (xsrt->run_cnt >= xsrt->run_cap) + if (irs->casefile != NULL) { - xsrt->run_cap *= 2; - xsrt->initial_runs - = xrealloc (xsrt->initial_runs, - sizeof *xsrt->initial_runs * xsrt->run_cap); + casefile_sleep (irs->casefile); + if (xsrt->run_cnt >= xsrt->run_cap) + { + xsrt->run_cap *= 2; + xsrt->runs = xrealloc (xsrt->runs, + sizeof *xsrt->runs * xsrt->run_cap); + } + xsrt->runs[xsrt->run_cnt++] = irs->casefile; + irs->casefile = NULL; } - xsrt->initial_runs[xsrt->run_cnt].file_idx = irs->file_idx; - xsrt->initial_runs[xsrt->run_cnt].case_cnt = irs->case_cnt; - xsrt->run_cnt++; - - /* Close file handle. */ - if (irs->output_file != NULL - && !close_temp_file (irs->xsrt, irs->file_idx, irs->output_file)) - irs->okay = 0; - irs->output_file = NULL; } /* Writes a record to the current initial run. */ @@ -923,6 +673,7 @@ static void output_record (struct initial_run_state *irs) { struct record_run *record_run; + struct ccase case_tmp; /* Extract minimum case from heap. */ assert (irs->record_cnt > 0); @@ -935,468 +686,150 @@ output_record (struct initial_run_state *irs) return; /* Start new run if necessary. */ - assert (record_run->run == irs->file_idx - || record_run->run == irs->xsrt->next_file_idx); - if (record_run->run != irs->file_idx) + assert (record_run->run == irs->run + || record_run->run == irs->run + 1); + if (record_run->run != irs->run) { end_run (irs); start_run (irs); } - assert (record_run->run == irs->file_idx); + assert (record_run->run == irs->run); irs->case_cnt++; /* Write to disk. */ - if (irs->output_file != NULL - && !write_temp_file (irs->xsrt, irs->file_idx, irs->output_file, - &record_run->record->c, irs->xsrt->scp->case_size)) - irs->okay = 0; + if (irs->casefile != NULL) + casefile_append (irs->casefile, &record_run->record); /* This record becomes last_output. */ - if (irs->last_output != NULL) - release_case (irs, irs->last_output); - irs->last_output = record_run->record; + irs->last_output = case_tmp = record_run->record; + record_run->record = irs->records[irs->record_cap - 1].record; + irs->records[irs->record_cap - 1].record = case_tmp; } + +/* Merging. */ -/* Gets a case from the free list in IRS. It is an error to call - this function if the free list is empty. */ -static struct case_list * -grab_case (struct initial_run_state *irs) -{ - struct case_list *c; - - assert (irs != NULL); - assert (irs->free_list != NULL); - - c = irs->free_list; - irs->free_list = c->next; - return c; -} +static int choose_merge (struct casefile *runs[], int run_cnt, int order); +static struct casefile *merge_once (struct external_sort *, + struct casefile *[], size_t); -/* Returns C to the free list in IRS. */ -static void -release_case (struct initial_run_state *irs, struct case_list *c) +/* Repeatedly merges run until only one is left, + and returns the final casefile. */ +static struct casefile * +merge (struct external_sort *xsrt) { - assert (irs != NULL); - assert (c != NULL); - - c->next = irs->free_list; - irs->free_list = c; + while (xsrt->run_cnt > 1) + { + int order = min (MAX_MERGE_ORDER, xsrt->run_cnt); + int idx = choose_merge (xsrt->runs, xsrt->run_cnt, order); + xsrt->runs[idx] = merge_once (xsrt, xsrt->runs + idx, order); + remove_range (xsrt->runs, xsrt->run_cnt, sizeof *xsrt->runs, + idx + 1, order - 1); + xsrt->run_cnt -= order - 1; + } + assert (xsrt->run_cnt == 1); + xsrt->run_cnt = 0; + return xsrt->runs[0]; } - -/* Merging. */ -/* State of merging initial runs. */ -struct merge_state - { - struct external_sort *xsrt; /* External sort state. */ - struct ccase **cases; /* Buffers. */ - size_t case_cnt; /* Number of buffers. */ - }; - -struct run; -static int merge_once (struct merge_state *, - const struct initial_run[], size_t, - struct initial_run *); -static int fill_run_buffer (struct merge_state *, struct run *); -static int mod (int, int); +/* Chooses ORDER runs out of the RUN_CNT runs in RUNS to merge, + and returns the index of the first one. -/* Performs a series of P-way merges of initial runs - method. */ + For stability, we must merge only consecutive runs. For + efficiency, we choose the shortest consecutive sequence of + runs. */ static int -merge (struct external_sort *xsrt) +choose_merge (struct casefile *runs[], int run_cnt, int order) { - struct merge_state mrg; /* State of merge. */ - size_t approx_case_cost; /* Approximate memory cost of one case. */ - int max_order; /* Maximum order of merge. */ - size_t dummy_run_cnt; /* Number of dummy runs to insert. */ - int success = 0; + int min_idx, min_sum; + int cur_idx, cur_sum; int i; - mrg.xsrt = xsrt; + /* Sum up the length of the first ORDER runs. */ + cur_sum = 0; + for (i = 0; i < order; i++) + cur_sum += casefile_get_case_cnt (runs[i]); - /* Allocate as many cases as possible into cases. */ - approx_case_cost = (sizeof *mrg.cases - + xsrt->scp->case_size + 4 * sizeof (void *)); - mrg.case_cnt = get_max_workspace() / approx_case_cost; - mrg.cases = malloc (sizeof *mrg.cases * mrg.case_cnt); - if (mrg.cases == NULL) - goto done; - for (i = 0; i < mrg.case_cnt; i++) + /* Find the shortest group of ORDER runs, + using a running total for efficiency. */ + min_idx = 0; + min_sum = cur_sum; + for (cur_idx = 1; cur_idx + order <= run_cnt; cur_idx++) { - mrg.cases[i] = malloc (xsrt->scp->case_size); - if (mrg.cases[i] == NULL) + cur_sum -= casefile_get_case_cnt (runs[cur_idx - 1]); + cur_sum += casefile_get_case_cnt (runs[cur_idx + order - 1]); + if (cur_sum < min_sum) { - mrg.case_cnt = i; - break; + min_sum = cur_sum; + min_idx = cur_idx; } } - if (mrg.case_cnt < MIN_BUFFER_TOTAL_SIZE_RECS) - { - msg (SE, _("Out of memory. Could not allocate room for minimum of %d " - "cases of %d bytes each. (PSPP workspace is currently " - "restricted to a maximum of %d KB.)"), - MIN_BUFFER_TOTAL_SIZE_RECS, approx_case_cost, get_max_workspace() / 1024); - return 0; - } - /* Determine maximum order of merge. */ - max_order = MAX_MERGE_ORDER; - if (mrg.case_cnt / max_order < MIN_BUFFER_SIZE_RECS) - max_order = mrg.case_cnt / MIN_BUFFER_SIZE_RECS; - else if (mrg.case_cnt / max_order * xsrt->scp->case_size - < MIN_BUFFER_SIZE_BYTES) - max_order = mrg.case_cnt / (MIN_BUFFER_SIZE_BYTES / xsrt->scp->case_size); - if (max_order < 2) - max_order = 2; - if (max_order > xsrt->run_cnt) - max_order = xsrt->run_cnt; - - /* Repeatedly merge the P shortest existing runs until only one run - is left. */ - make_heap (xsrt->initial_runs, xsrt->run_cnt, sizeof *xsrt->initial_runs, - compare_initial_runs, NULL); - dummy_run_cnt = mod (1 - (int) xsrt->run_cnt, max_order - 1); - assert (max_order == 1 - || (xsrt->run_cnt + dummy_run_cnt) % (max_order - 1) == 1); - while (xsrt->run_cnt > 1) - { - struct initial_run output_run; - int order; - int i; - - /* Choose order of merge (max_order after first merge). */ - order = max_order - dummy_run_cnt; - dummy_run_cnt = 0; - - /* Choose runs to merge. */ - assert (xsrt->run_cnt >= order); - for (i = 0; i < order; i++) - pop_heap (xsrt->initial_runs, xsrt->run_cnt--, - sizeof *xsrt->initial_runs, - compare_initial_runs, NULL); - - /* Merge runs. */ - if (!merge_once (&mrg, xsrt->initial_runs + xsrt->run_cnt, order, - &output_run)) - goto done; - - /* Add output run to heap. */ - xsrt->initial_runs[xsrt->run_cnt++] = output_run; - push_heap (xsrt->initial_runs, xsrt->run_cnt, sizeof *xsrt->initial_runs, - compare_initial_runs, NULL); - } - - /* Exactly one run is left, which contains the entire sorted - file. We could use it to find a total case count. */ - assert (xsrt->run_cnt == 1); - - success = 1; - - done: - for (i = 0; i < mrg.case_cnt; i++) - free (mrg.cases[i]); - free (mrg.cases); - - return success; -} - -/* Modulo function as defined by Knuth. */ -static int -mod (int x, int y) -{ - if (y == 0) - return x; - else if (x == 0) - return 0; - else if (x > 0 && y > 0) - return x % y; - else if (x < 0 && y > 0) - return y - (-x) % y; - - assert (0); + return min_idx; } -/* A run of data for use in merging. */ -struct run - { - FILE *file; /* File that contains run. */ - int file_idx; /* Index of file that contains run. */ - struct ccase **buffer; /* Case buffer. */ - struct ccase **buffer_head; /* First unconsumed case in buffer. */ - struct ccase **buffer_tail; /* One past last unconsumed case in buffer. */ - size_t buffer_cap; /* Number of cases buffer can hold. */ - size_t unread_case_cnt; /* Number of cases not yet read. */ - }; - -/* Merges the RUN_CNT initial runs specified in INPUT_RUNS into a - new run. Returns nonzero only if successful. Adds an entry - to MRG->xsrt->runs for the output file if and only if the - output file is actually created. Always deletes all the input - files. */ -static int -merge_once (struct merge_state *mrg, - const struct initial_run input_runs[], - size_t run_cnt, - struct initial_run *output_run) +/* Merges the RUN_CNT initial runs specified in INPUT_FILES into a + new run, and returns the new run. */ +static struct casefile * +merge_once (struct external_sort *xsrt, + struct casefile **const input_files, + size_t run_cnt) { - struct run runs[MAX_MERGE_ORDER]; - FILE *output_file = NULL; - int success = 0; - int i; - - /* Initialize runs[]. */ - for (i = 0; i < run_cnt; i++) + struct run { - runs[i].file = NULL; - runs[i].file_idx = input_runs[i].file_idx; - runs[i].buffer = mrg->cases + mrg->case_cnt / run_cnt * i; - runs[i].buffer_head = runs[i].buffer; - runs[i].buffer_tail = runs[i].buffer; - runs[i].buffer_cap = mrg->case_cnt / run_cnt; - runs[i].unread_case_cnt = input_runs[i].case_cnt; + struct casefile *file; + struct casereader *reader; + struct ccase ccase; } + *runs; + + struct casefile *output = NULL; + int i; /* Open input files. */ + runs = xmalloc (sizeof *runs * run_cnt); for (i = 0; i < run_cnt; i++) { - runs[i].file = open_temp_file (mrg->xsrt, runs[i].file_idx, "rb"); - if (runs[i].file == NULL) - goto error; + struct run *r = &runs[i]; + r->file = input_files[i]; + r->reader = casefile_get_destructive_reader (r->file); + if (!casereader_read_xfer (r->reader, &r->ccase)) + { + run_cnt--; + i--; + } } - - /* Create output file and count cases to be output. */ - output_run->file_idx = mrg->xsrt->next_file_idx++; - output_run->case_cnt = 0; - for (i = 0; i < run_cnt; i++) - output_run->case_cnt += input_runs[i].case_cnt; - output_file = open_temp_file (mrg->xsrt, output_run->file_idx, "wb"); - if (output_file == NULL) - goto error; - - /* Prime buffers. */ - for (i = 0; i < run_cnt; i++) - if (!fill_run_buffer (mrg, runs + i)) - goto error; + + /* Create output file. */ + output = casefile_create (xsrt->value_cnt); + casefile_to_disk (output); /* Merge. */ while (run_cnt > 0) { - struct run *min_run; - + struct run *min_run, *run; + /* Find minimum. */ min_run = runs; - for (i = 1; i < run_cnt; i++) - if (compare_record ((*runs[i].buffer_head)->data, - (*min_run->buffer_head)->data, - mrg->xsrt->scp, NULL) < 0) - min_run = runs + i; + for (run = runs + 1; run < runs + run_cnt; run++) + if (compare_record (&run->ccase, &min_run->ccase, xsrt->criteria) < 0) + min_run = run; /* Write minimum to output file. */ - if (!write_temp_file (mrg->xsrt, min_run->file_idx, output_file, - (*min_run->buffer_head)->data, - mrg->xsrt->scp->case_size)) - goto error; + casefile_append_xfer (output, &min_run->ccase); - /* Remove case from buffer. */ - if (++min_run->buffer_head >= min_run->buffer_tail) + /* Read another case from minimum run. */ + if (!casereader_read_xfer (min_run->reader, &min_run->ccase)) { - /* Buffer is empty. Fill from file. */ - if (!fill_run_buffer (mrg, min_run)) - goto error; - - /* If buffer is still empty, delete its run. */ - if (min_run->buffer_head >= min_run->buffer_tail) - { - close_temp_file (mrg->xsrt, min_run->file_idx, min_run->file); - remove_temp_file (mrg->xsrt, min_run->file_idx); - *min_run = runs[--run_cnt]; + casereader_destroy (min_run->reader); + casefile_destroy (min_run->file); - /* We could donate the now-unused buffer space to - other runs. */ - } + remove_element (runs, run_cnt, sizeof *runs, min_run - runs); + run_cnt--; } } - /* Close output file. */ - close_temp_file (mrg->xsrt, output_run->file_idx, output_file); - - return 1; - - error: - /* Close and remove output file. */ - if (output_file != NULL) - { - close_temp_file (mrg->xsrt, output_run->file_idx, output_file); - remove_temp_file (mrg->xsrt, output_run->file_idx); - } - - /* Close and remove any remaining input runs. */ - for (i = 0; i < run_cnt; i++) - { - close_temp_file (mrg->xsrt, runs[i].file_idx, runs[i].file); - remove_temp_file (mrg->xsrt, runs[i].file_idx); - } - - return success; -} - -/* Reads as many cases as possible into RUN's buffer. - Reads nonzero unless a disk error occurs. */ -static int -fill_run_buffer (struct merge_state *mrg, struct run *run) -{ - run->buffer_head = run->buffer_tail = run->buffer; - while (run->unread_case_cnt > 0 - && run->buffer_tail < run->buffer + run->buffer_cap) - { - if (!read_temp_file (mrg->xsrt, run->file_idx, run->file, - (*run->buffer_tail)->data, - mrg->xsrt->scp->case_size)) - return 0; - - run->unread_case_cnt--; - run->buffer_tail++; - } - - return 1; -} - -static struct case_source * -sort_sink_make_source (struct case_sink *sink) -{ - struct initial_run_state *irs = sink->aux; - - return create_case_source (&sort_source_class, default_dict, - irs->xsrt->scp); -} - -static const struct case_sink_class sort_sink_class = - { - "SORT CASES", - NULL, - sort_sink_write, - NULL, - sort_sink_make_source, - }; - -struct sort_source_aux - { - struct sort_cases_pgm *scp; - struct ccase *dst; - write_case_func *write_case; - write_case_data wc_data; - }; - -/* Passes C to the write_case function. */ -static int -sort_source_read_helper (const struct ccase *src, void *aux_) -{ - struct sort_source_aux *aux = aux_; - - memcpy (aux->dst, src, aux->scp->case_size); - return aux->write_case (aux->wc_data); -} - -/* Reads all the records from the source stream and passes them - to write_case(). */ -static void -sort_source_read (struct case_source *source, - struct ccase *c, - write_case_func *write_case, write_case_data wc_data) -{ - struct sort_cases_pgm *scp = source->aux; - struct sort_source_aux aux; - - aux.scp = scp; - aux.dst = c; - aux.write_case = write_case; - aux.wc_data = wc_data; - - read_sort_output (scp, sort_source_read_helper, &aux); -} - -static void read_internal_sort_output (struct internal_sort *isrt, - read_sort_output_func *, void *aux); -static void read_external_sort_output (struct external_sort *xsrt, - read_sort_output_func *, void *aux); - -/* Reads all the records from the output stream and passes them to the - function provided, which must have an interface identical to - write_case(). */ -void -read_sort_output (struct sort_cases_pgm *scp, - read_sort_output_func *output_func, void *aux) -{ - assert ((scp->isrt != NULL) + (scp->xsrt != NULL) <= 1); - if (scp->isrt != NULL) - read_internal_sort_output (scp->isrt, output_func, aux); - else if (scp->xsrt != NULL) - read_external_sort_output (scp->xsrt, output_func, aux); - else - { - /* No results. Probably an external sort that failed. */ - } -} - -static void -read_internal_sort_output (struct internal_sort *isrt, - read_sort_output_func *output_func, - void *aux) -{ - struct case_list **p; - - for (p = isrt->results; *p; p++) - if (!output_func (&(*p)->c, aux)) - break; - free (isrt->results); -} - -static void -read_external_sort_output (struct external_sort *xsrt, - read_sort_output_func *output_func, void *aux) -{ - FILE *file; - int file_idx; - size_t i; - struct ccase *c; - - assert (xsrt->run_cnt == 1); - file_idx = xsrt->initial_runs[0].file_idx; - - file = open_temp_file (xsrt, file_idx, "rb"); - if (file == NULL) - { - err_failure (); - return; - } - - c = xmalloc (xsrt->scp->case_size); - for (i = 0; i < xsrt->initial_runs[0].case_cnt; i++) - { - if (!read_temp_file (xsrt, file_idx, file, c, xsrt->scp->case_size)) - { - err_failure (); - break; - } - - if (!output_func (c, aux)) - break; - } - free (c); -} + casefile_sleep (output); + free (runs); -static void -sort_source_destroy (struct case_source *source) -{ - struct sort_cases_pgm *scp = source->aux; - - destroy_sort_cases_pgm (scp); + return output; } - -const struct case_source_class sort_source_class = - { - "SORT CASES", - NULL, /* FIXME */ - sort_source_read, - sort_source_destroy, - };