X-Git-Url: https://pintos-os.org/cgi-bin/gitweb.cgi?a=blobdiff_plain;f=src%2Fsort.c;h=ec90b558f06af65c1b856f9b52a55e4396e3a51c;hb=2981ebc0eeb14a4e0c91a39c454bfe3c4dfce1a4;hp=99d850ad9345c354de71b39b3f33d75b857e3a9a;hpb=bc963dae9be291ea0a7cccf189d13e00d3797cfd;p=pspp-builds.git diff --git a/src/sort.c b/src/sort.c index 99d850ad..ec90b558 100644 --- a/src/sort.c +++ b/src/sort.c @@ -19,7 +19,7 @@ #include #include "sort.h" -#include +#include "error.h" #include #include #include @@ -52,7 +52,7 @@ /* Other prototypes. */ static int compare_record (const union value *, const union value *, - const struct sort_cases_pgm *); + const struct sort_cases_pgm *, int *idx_to_fv); static int compare_case_lists (const void *, const void *, void *); static struct internal_sort *do_internal_sort (struct sort_cases_pgm *, int separate); @@ -69,15 +69,18 @@ cmd_sort_cases (void) struct sort_cases_pgm *scp; int success; - lex_match_id ("SORT"); - lex_match_id ("CASES"); lex_match (T_BY); scp = parse_sort (); if (scp == NULL) return CMD_FAILURE; - - cancel_temporary (); + + if (temporary != 0) + { + msg (SE, _("SORT CASES may not be used after TEMPORARY. " + "Temporary transformations will be made permanent.")); + cancel_temporary (); + } success = sort_cases (scp, 0); destroy_sort_cases_pgm (scp); @@ -141,6 +144,7 @@ parse_sort (void) return NULL; } +/* Destroys a SORT CASES program. */ void destroy_sort_cases_pgm (struct sort_cases_pgm *scp) { @@ -169,9 +173,12 @@ destroy_sort_cases_pgm (struct sort_cases_pgm *scp) int sort_cases (struct sort_cases_pgm *scp, int separate) { + scp->case_size + = sizeof (union value) * dict_get_compacted_value_cnt (default_dict); + /* Not sure this is necessary but it's good to be safe. */ if (separate && case_source_is_class (vfm_source, &sort_source_class)) - procedure (NULL, NULL, NULL, NULL); + procedure (NULL, NULL); /* SORT CASES cancels PROCESS IF. */ expr_free (process_if_expr); @@ -183,7 +190,9 @@ sort_cases (struct sort_cases_pgm *scp, int separate) return 1; /* Fall back to an external sort. */ - write_active_file_to_disk (); + if (vfm_source != NULL + && case_source_is_class (vfm_source, &storage_source_class)) + storage_source_to_disk (vfm_source); scp->xsrt = do_external_sort (scp, separate); if (scp->xsrt != NULL) return 1; @@ -192,13 +201,14 @@ sort_cases (struct sort_cases_pgm *scp, int separate) return 0; } +/* Results of an internal sort. */ struct internal_sort { struct case_list **results; }; -/* If a reasonable situation is set up, do an internal sort of the - data. Return success. */ +/* If the data is in memory, do an internal sort. Return + success. */ static struct internal_sort * do_internal_sort (struct sort_cases_pgm *scp, int separate) { @@ -207,53 +217,48 @@ do_internal_sort (struct sort_cases_pgm *scp, int separate) isrt = xmalloc (sizeof *isrt); isrt->results = NULL; - if (!case_source_is_class (vfm_source, &disk_source_class)) + if (case_source_is_class (vfm_source, &storage_source_class) + && !storage_source_on_disk (vfm_source)) { - if (!case_source_is_class (vfm_source, &memory_source_class)) - procedure (NULL, NULL, NULL, NULL); - - if (case_source_is_class (vfm_source, &memory_source_class)) - { - struct case_list *case_list; - struct case_list **case_array; - int case_cnt; - int i; + struct case_list *case_list; + struct case_list **case_array; + int case_cnt; + int i; - case_cnt = vfm_source->class->count (vfm_source); - if (case_cnt <= 0) - return isrt; + case_cnt = vfm_source->class->count (vfm_source); + if (case_cnt <= 0) + return isrt; - if (case_cnt > set_max_workspace / sizeof *case_array) - goto error; + if (case_cnt > get_max_workspace() / sizeof *case_array) + goto error; - case_list = memory_source_get_cases (vfm_source); - case_array = malloc (sizeof *case_array * (case_cnt + 1)); - if (case_array == NULL) - goto error; + case_list = storage_source_get_cases (vfm_source); + case_array = malloc (sizeof *case_array * (case_cnt + 1)); + if (case_array == NULL) + goto error; - for (i = 0; case_list != NULL; i++) - { - case_array[i] = case_list; - case_list = case_list->next; - } - assert (i == case_cnt); - case_array[case_cnt] = NULL; + for (i = 0; case_list != NULL; i++) + { + case_array[i] = case_list; + case_list = case_list->next; + } + assert (i == case_cnt); + case_array[case_cnt] = NULL; - sort (case_array, case_cnt, sizeof *case_array, - compare_case_lists, scp); + sort (case_array, case_cnt, sizeof *case_array, + compare_case_lists, scp); - if (!separate) - { - memory_source_set_cases (vfm_source, case_array[0]); - for (i = 1; i <= case_cnt; i++) - case_array[i - 1]->next = case_array[i]; - free (case_array); - } - else - isrt->results = case_array; + if (!separate) + { + storage_source_set_cases (vfm_source, case_array[0]); + for (i = 1; i <= case_cnt; i++) + case_array[i - 1]->next = case_array[i]; + free (case_array); + } + else + isrt->results = case_array; - return isrt; - } + return isrt; } error: @@ -261,6 +266,7 @@ do_internal_sort (struct sort_cases_pgm *scp, int separate) return NULL; } +/* Destroys an internal sort result. */ static void destroy_internal_sort (struct internal_sort *isrt) { @@ -283,7 +289,7 @@ compare_case_lists (const void *a_, const void *b_, void *scp_) struct case_list *a = *pa; struct case_list *b = *pb; - return compare_record (a->c.data, b->c.data, scp); + return compare_record (a->c.data, b->c.data, scp, NULL); } /* External sort. */ @@ -320,18 +326,19 @@ compare_initial_runs (const void *a_, const void *b_, void *aux UNUSED) return a->case_cnt > b->case_cnt ? -1 : a->case_cnt case_cnt; } +/* Results of an external sort. */ struct external_sort { struct sort_cases_pgm *scp; /* SORT CASES info. */ struct initial_run *initial_runs; /* Array of initial runs. */ size_t run_cnt, run_cap; /* Number of runs, allocated capacity. */ char *temp_dir; /* Temporary file directory name. */ + char *temp_name; /* Name of a temporary file. */ int next_file_idx; /* Lowest unused file index. */ - size_t case_size; /* Number of bytes in case. */ }; /* Prototypes for helper functions. */ -static void sort_sink_write (struct case_sink *, struct ccase *); +static void sort_sink_write (struct case_sink *, const struct ccase *); static int write_initial_runs (struct external_sort *, int separate); static int init_external_sort (struct external_sort *); static int merge (struct external_sort *); @@ -351,7 +358,6 @@ do_external_sort (struct sort_cases_pgm *scp, int separate) xsrt = xmalloc (sizeof *xsrt); xsrt->scp = scp; - xsrt->case_size = sizeof (union value) * compaction_nval; if (!init_external_sort (xsrt)) goto done; if (!write_initial_runs (xsrt, separate)) @@ -386,6 +392,8 @@ destroy_external_sort (struct external_sort *xsrt) for (i = 0; i < xsrt->run_cnt; i++) remove_temp_file (xsrt, xsrt->initial_runs[i].file_idx); rmdir_temp_dir (xsrt); + free (xsrt->temp_dir); + free (xsrt->temp_name); free (xsrt->initial_runs); free (xsrt); } @@ -467,13 +475,16 @@ init_external_sort (struct external_sort *xsrt) /* Temporary directory. */ xsrt->temp_dir = make_temp_dir (); + xsrt->temp_name = NULL; if (xsrt->temp_dir == NULL) return 0; + xsrt->temp_name = xmalloc (strlen (xsrt->temp_dir) + 64); return 1; } - +/* Returns nonzero if we should return an error even though the + operation succeeded. Useful for testing. */ static int simulate_error (void) { @@ -496,28 +507,32 @@ rmdir_temp_dir (struct external_sort *xsrt) { if (xsrt->temp_dir != NULL && rmdir (xsrt->temp_dir) == -1) { - msg (SE, _("%s: Error removing directory for temporary files: %s."), + msg (SW, _("%s: Error removing directory for temporary files: %s."), xsrt->temp_dir, strerror (errno)); xsrt->temp_dir = NULL; } } -#define TEMP_FILE_NAME_SIZE (L_tmpnam + 32) -static void -get_temp_file_name (struct external_sort *xsrt, int file_idx, - char filename[TEMP_FILE_NAME_SIZE]) +/* Returns the name of temporary file number FILE_IDX for XSRT. + The name is written into a static buffer, so be careful. */ +static char * +get_temp_file_name (struct external_sort *xsrt, int file_idx) { assert (xsrt->temp_dir != NULL); - sprintf (filename, "%s%c%04d", xsrt->temp_dir, DIR_SEPARATOR, file_idx); + sprintf (xsrt->temp_name, "%s%c%04d", + xsrt->temp_dir, DIR_SEPARATOR, file_idx); + return xsrt->temp_name; } +/* Opens temporary file numbered FILE_IDX for XSRT with mode MODE + and returns the FILE *. */ static FILE * open_temp_file (struct external_sort *xsrt, int file_idx, const char *mode) { - char temp_file[TEMP_FILE_NAME_SIZE]; + char *temp_file; FILE *file; - get_temp_file_name (xsrt, file_idx, temp_file); + temp_file = get_temp_file_name (xsrt, file_idx); file = fopen (temp_file, mode); if (simulate_error () || file == NULL) @@ -528,13 +543,14 @@ open_temp_file (struct external_sort *xsrt, int file_idx, const char *mode) return file; } +/* Closes FILE, which is the temporary file numbered FILE_IDX + under XSRT. Returns nonzero only if successful. */ static int close_temp_file (struct external_sort *xsrt, int file_idx, FILE *file) { if (file != NULL) { - char temp_file[TEMP_FILE_NAME_SIZE]; - get_temp_file_name (xsrt, file_idx, temp_file); + char *temp_file = get_temp_file_name (xsrt, file_idx); if (simulate_error () || fclose (file) == EOF) { msg (SE, _("%s: Error closing temporary file: %s."), @@ -545,19 +561,21 @@ close_temp_file (struct external_sort *xsrt, int file_idx, FILE *file) return 1; } +/* Delete temporary file numbered FILE_IDX for XSRT. */ static void remove_temp_file (struct external_sort *xsrt, int file_idx) { if (file_idx != -1) { - char temp_file[TEMP_FILE_NAME_SIZE]; - get_temp_file_name (xsrt, file_idx, temp_file); + char *temp_file = get_temp_file_name (xsrt, file_idx); if (simulate_error () || remove (temp_file) != 0) - msg (SE, _("%s: Error removing temporary file: %s."), + msg (SW, _("%s: Error removing temporary file: %s."), temp_file, strerror (errno)); } } +/* Writes SIZE bytes from buffer DATA into FILE, which is + temporary file numbered FILE_IDX from XSRT. */ static int write_temp_file (struct external_sort *xsrt, int file_idx, FILE *file, const void *data, size_t size) @@ -566,14 +584,15 @@ write_temp_file (struct external_sort *xsrt, int file_idx, return 1; else { - char temp_file[TEMP_FILE_NAME_SIZE]; - get_temp_file_name (xsrt, file_idx, temp_file); + char *temp_file = get_temp_file_name (xsrt, file_idx); msg (SE, _("%s: Error writing temporary file: %s."), temp_file, strerror (errno)); return 0; } } +/* Reads SIZE bytes into buffer DATA into FILE, which is + temporary file numbered FILE_IDX from XSRT. */ static int read_temp_file (struct external_sort *xsrt, int file_idx, FILE *file, void *data, size_t size) @@ -582,8 +601,7 @@ read_temp_file (struct external_sort *xsrt, int file_idx, return 1; else { - char temp_file[TEMP_FILE_NAME_SIZE]; - get_temp_file_name (xsrt, file_idx, temp_file); + char *temp_file = get_temp_file_name (xsrt, file_idx); if (ferror (file)) msg (SE, _("%s: Error reading temporary file: %s."), temp_file, strerror (errno)); @@ -603,10 +621,13 @@ struct record_run struct case_list *record; /* Case data. */ }; +/* Represents a set of initial runs during an external sort. */ struct initial_run_state { struct external_sort *xsrt; + int *idx_to_fv; /* Translation table copied from sink. */ + /* Reservoir. */ struct record_run *records; /* Records arranged as a heap. */ size_t record_cnt; /* Current number of records. */ @@ -616,13 +637,14 @@ struct initial_run_state /* Run currently being output. */ int file_idx; /* Temporary file number. */ size_t case_cnt; /* Number of cases so far. */ - size_t case_size; /* Number of bytes in a case. */ FILE *output_file; /* Output file. */ struct case_list *last_output;/* Record last output. */ int okay; /* Zero if an error has been encountered. */ }; +static const struct case_sink_class sort_sink_class; + static void destroy_initial_run_state (struct initial_run_state *irs); static int allocate_cases (struct initial_run_state *); static struct case_list *grab_case (struct initial_run_state *); @@ -632,9 +654,11 @@ static void start_run (struct initial_run_state *irs); static void end_run (struct initial_run_state *irs); static int compare_record_run (const struct record_run *, const struct record_run *, - struct sort_cases_pgm *); + struct initial_run_state *); static int compare_record_run_minheap (const void *, const void *, void *); +/* Writes initial runs for XSRT, sending them to a separate file + if SEPARATE is nonzero. */ static int write_initial_runs (struct external_sort *xsrt, int separate) { @@ -651,7 +675,6 @@ write_initial_runs (struct external_sort *xsrt, int separate) irs->last_output = NULL; irs->file_idx = 0; irs->case_cnt = 0; - irs->case_size = dict_get_case_size (default_dict); irs->okay = 1; if (!allocate_cases (irs)) goto done; @@ -659,15 +682,16 @@ write_initial_runs (struct external_sort *xsrt, int separate) /* Create case sink. */ if (!separate) { - if (vfm_sink) + if (vfm_sink != NULL && vfm_sink->class->destroy != NULL) vfm_sink->class->destroy (vfm_sink); - vfm_sink = create_case_sink (&sort_sink_class, irs); + vfm_sink = create_case_sink (&sort_sink_class, default_dict, irs); xsrt->scp->ref_cnt++; } /* Create initial runs. */ start_run (irs); - procedure (NULL, NULL, NULL, NULL); + procedure (NULL, NULL); + irs->idx_to_fv = NULL; while (irs->record_cnt > 0 && irs->okay) output_record (irs); end_run (irs); @@ -682,7 +706,7 @@ write_initial_runs (struct external_sort *xsrt, int separate) /* Add a single case to an initial run. */ static void -sort_sink_write (struct case_sink *sink, struct ccase *c) +sort_sink_write (struct case_sink *sink, const struct ccase *c) { struct initial_run_state *irs = sink->aux; struct record_run *new_record_run; @@ -690,24 +714,27 @@ sort_sink_write (struct case_sink *sink, struct ccase *c) if (!irs->okay) return; + irs->idx_to_fv = sink->idx_to_fv; + /* Compose record_run for this run and add to heap. */ assert (irs->record_cnt < irs->record_cap); new_record_run = irs->records + irs->record_cnt++; new_record_run->record = grab_case (irs); - memcpy (new_record_run->record->c.data, c->data, irs->case_size); + memcpy (new_record_run->record->c.data, c->data, irs->xsrt->scp->case_size); new_record_run->run = irs->file_idx; if (irs->last_output != NULL && compare_record (c->data, irs->last_output->c.data, - irs->xsrt->scp) < 0) + irs->xsrt->scp, sink->idx_to_fv) < 0) new_record_run->run = irs->xsrt->next_file_idx; push_heap (irs->records, irs->record_cnt, sizeof *irs->records, - compare_record_run_minheap, irs->xsrt->scp); + compare_record_run_minheap, irs); /* Output a record if the reservoir is full. */ if (irs->record_cnt == irs->record_cap && irs->okay) output_record (irs); } +/* Destroys the initial run state represented by IRS. */ static void destroy_initial_run_state (struct initial_run_state *irs) { @@ -731,34 +758,34 @@ destroy_initial_run_state (struct initial_run_state *irs) } free (irs->records); - free (irs); - if (irs->output_file != NULL) close_temp_file (irs->xsrt, irs->file_idx, irs->output_file); + + free (irs); } /* Allocates room for lots of cases as a buffer. */ static int allocate_cases (struct initial_run_state *irs) { - size_t case_size; /* Size of one case, in bytes. */ int approx_case_cost; /* Approximate memory cost of one case in bytes. */ int max_cases; /* Maximum number of cases to allocate. */ int i; /* Allocate as many cases as we can within the workspace limit. */ - case_size = dict_get_case_size (default_dict); approx_case_cost = (sizeof *irs->records + sizeof *irs->free_list - + case_size + + irs->xsrt->scp->case_size + 4 * sizeof (void *)); - max_cases = set_max_workspace / approx_case_cost; + max_cases = get_max_workspace() / approx_case_cost; irs->records = malloc (sizeof *irs->records * max_cases); for (i = 0; i < max_cases; i++) { struct case_list *c; - c = malloc (sizeof *c + case_size - sizeof (union value)); + c = malloc (sizeof *c + + irs->xsrt->scp->case_size + - sizeof (union value)); if (c == NULL) { max_cases = i; @@ -777,7 +804,7 @@ allocate_cases (struct initial_run_state *irs) msg (SE, _("Out of memory. Could not allocate room for minimum of %d " "cases of %d bytes each. (PSPP workspace is currently " "restricted to a maximum of %d KB.)"), - MIN_BUFFER_TOTAL_SIZE_RECS, approx_case_cost, set_max_workspace / 1024); + MIN_BUFFER_TOTAL_SIZE_RECS, approx_case_cost, get_max_workspace() / 1024); return 0; } return 1; @@ -787,7 +814,8 @@ allocate_cases (struct initial_run_state *irs) A and B, and returns a strcmp()-type result. */ static int compare_record (const union value *a, const union value *b, - const struct sort_cases_pgm *scp) + const struct sort_cases_pgm *scp, + int *idx_to_fv) { int i; @@ -797,9 +825,14 @@ compare_record (const union value *a, const union value *b, for (i = 0; i < scp->var_cnt; i++) { struct variable *v = scp->vars[i]; - int fv = v->fv; + int fv; int result; + if (idx_to_fv != NULL) + fv = idx_to_fv[v->index]; + else + fv = v->fv; + if (v->type == NUMERIC) { double af = a[fv].f; @@ -821,21 +854,27 @@ compare_record (const union value *a, const union value *b, return 0; } +/* Compares record-run tuples A and B on run number first, then + on the current record according to SCP. */ static int compare_record_run (const struct record_run *a, const struct record_run *b, - struct sort_cases_pgm *scp) + struct initial_run_state *irs) { if (a->run != b->run) return a->run > b->run ? 1 : -1; else - return compare_record (a->record->c.data, b->record->c.data, scp); + return compare_record (a->record->c.data, b->record->c.data, + irs->xsrt->scp, irs->idx_to_fv); } +/* Compares record-run tuples A and B on run number first, then + on the current record according to SCP, but in descending + order. */ static int -compare_record_run_minheap (const void *a, const void *b, void *scp) +compare_record_run_minheap (const void *a, const void *b, void *irs) { - return -compare_record_run (a, b, scp); + return -compare_record_run (a, b, irs); } /* Begins a new initial run, specifically its output file. */ @@ -879,30 +918,22 @@ end_run (struct initial_run_state *irs) irs->output_file = NULL; } +/* Writes a record to the current initial run. */ static void output_record (struct initial_run_state *irs) { struct record_run *record_run; - struct ccase *out_case; /* Extract minimum case from heap. */ assert (irs->record_cnt > 0); pop_heap (irs->records, irs->record_cnt--, sizeof *irs->records, - compare_record_run_minheap, irs->xsrt->scp); + compare_record_run_minheap, irs); record_run = irs->records + irs->record_cnt; /* Bail if an error has occurred. */ if (!irs->okay) return; - /* Obtain case data to write to disk. */ - out_case = &record_run->record->c; - if (compaction_necessary) - { - compact_case (compaction_case, out_case); - out_case = compaction_case; - } - /* Start new run if necessary. */ assert (record_run->run == irs->file_idx || record_run->run == irs->xsrt->next_file_idx); @@ -917,8 +948,7 @@ output_record (struct initial_run_state *irs) /* Write to disk. */ if (irs->output_file != NULL && !write_temp_file (irs->xsrt, irs->file_idx, irs->output_file, - out_case->data, - sizeof *out_case->data * compaction_nval)) + &record_run->record->c, irs->xsrt->scp->case_size)) irs->okay = 0; /* This record becomes last_output. */ @@ -927,6 +957,8 @@ output_record (struct initial_run_state *irs) irs->last_output = record_run->record; } +/* Gets a case from the free list in IRS. It is an error to call + this function if the free list is empty. */ static struct case_list * grab_case (struct initial_run_state *irs) { @@ -940,6 +972,7 @@ grab_case (struct initial_run_state *irs) return c; } +/* Returns C to the free list in IRS. */ static void release_case (struct initial_run_state *irs, struct case_list *c) { @@ -952,6 +985,7 @@ release_case (struct initial_run_state *irs, struct case_list *c) /* Merging. */ +/* State of merging initial runs. */ struct merge_state { struct external_sort *xsrt; /* External sort state. */ @@ -972,7 +1006,6 @@ static int merge (struct external_sort *xsrt) { struct merge_state mrg; /* State of merge. */ - size_t case_size; /* Size of one case, in bytes. */ size_t approx_case_cost; /* Approximate memory cost of one case. */ int max_order; /* Maximum order of merge. */ size_t dummy_run_cnt; /* Number of dummy runs to insert. */ @@ -982,15 +1015,15 @@ merge (struct external_sort *xsrt) mrg.xsrt = xsrt; /* Allocate as many cases as possible into cases. */ - case_size = dict_get_case_size (default_dict); - approx_case_cost = sizeof *mrg.cases + case_size + 4 * sizeof (void *); - mrg.case_cnt = set_max_workspace / approx_case_cost; + approx_case_cost = (sizeof *mrg.cases + + xsrt->scp->case_size + 4 * sizeof (void *)); + mrg.case_cnt = get_max_workspace() / approx_case_cost; mrg.cases = malloc (sizeof *mrg.cases * mrg.case_cnt); if (mrg.cases == NULL) goto done; for (i = 0; i < mrg.case_cnt; i++) { - mrg.cases[i] = malloc (case_size); + mrg.cases[i] = malloc (xsrt->scp->case_size); if (mrg.cases[i] == NULL) { mrg.case_cnt = i; @@ -1002,7 +1035,7 @@ merge (struct external_sort *xsrt) msg (SE, _("Out of memory. Could not allocate room for minimum of %d " "cases of %d bytes each. (PSPP workspace is currently " "restricted to a maximum of %d KB.)"), - MIN_BUFFER_TOTAL_SIZE_RECS, approx_case_cost, set_max_workspace / 1024); + MIN_BUFFER_TOTAL_SIZE_RECS, approx_case_cost, get_max_workspace() / 1024); return 0; } @@ -1010,8 +1043,9 @@ merge (struct external_sort *xsrt) max_order = MAX_MERGE_ORDER; if (mrg.case_cnt / max_order < MIN_BUFFER_SIZE_RECS) max_order = mrg.case_cnt / MIN_BUFFER_SIZE_RECS; - else if (mrg.case_cnt / max_order * case_size < MIN_BUFFER_SIZE_BYTES) - max_order = mrg.case_cnt / (MIN_BUFFER_SIZE_BYTES / case_size); + else if (mrg.case_cnt / max_order * xsrt->scp->case_size + < MIN_BUFFER_SIZE_BYTES) + max_order = mrg.case_cnt / (MIN_BUFFER_SIZE_BYTES / xsrt->scp->case_size); if (max_order < 2) max_order = 2; if (max_order > xsrt->run_cnt) @@ -1107,7 +1141,6 @@ merge_once (struct merge_state *mrg, { struct run runs[MAX_MERGE_ORDER]; FILE *output_file = NULL; - size_t case_size; int success = 0; int i; @@ -1146,7 +1179,6 @@ merge_once (struct merge_state *mrg, goto error; /* Merge. */ - case_size = dict_get_case_size (default_dict); while (run_cnt > 0) { struct run *min_run; @@ -1156,12 +1188,13 @@ merge_once (struct merge_state *mrg, for (i = 1; i < run_cnt; i++) if (compare_record ((*runs[i].buffer_head)->data, (*min_run->buffer_head)->data, - mrg->xsrt->scp) < 0) + mrg->xsrt->scp, NULL) < 0) min_run = runs + i; /* Write minimum to output file. */ if (!write_temp_file (mrg->xsrt, min_run->file_idx, output_file, - (*min_run->buffer_head)->data, case_size)) + (*min_run->buffer_head)->data, + mrg->xsrt->scp->case_size)) goto error; /* Remove case from buffer. */ @@ -1218,7 +1251,7 @@ fill_run_buffer (struct merge_state *mrg, struct run *run) { if (!read_temp_file (mrg->xsrt, run->file_idx, run->file, (*run->buffer_tail)->data, - dict_get_case_size (default_dict))) + mrg->xsrt->scp->case_size)) return 0; run->unread_case_cnt--; @@ -1228,91 +1261,104 @@ fill_run_buffer (struct merge_state *mrg, struct run *run) return 1; } -static void -sort_sink_destroy (struct case_sink *sink UNUSED) -{ - assert (0); -} - static struct case_source * sort_sink_make_source (struct case_sink *sink) { struct initial_run_state *irs = sink->aux; - return create_case_source (&sort_source_class, irs->xsrt->scp); + return create_case_source (&sort_source_class, default_dict, + irs->xsrt->scp); } -const struct case_sink_class sort_sink_class = +static const struct case_sink_class sort_sink_class = { "SORT CASES", NULL, sort_sink_write, - sort_sink_destroy, + NULL, sort_sink_make_source, }; +struct sort_source_aux + { + struct sort_cases_pgm *scp; + struct ccase *dst; + write_case_func *write_case; + write_case_data wc_data; + }; + +/* Passes C to the write_case function. */ +static int +sort_source_read_helper (const struct ccase *src, void *aux_) +{ + struct sort_source_aux *aux = aux_; + + memcpy (aux->dst, src, aux->scp->case_size); + return aux->write_case (aux->wc_data); +} + /* Reads all the records from the source stream and passes them to write_case(). */ static void sort_source_read (struct case_source *source, + struct ccase *c, write_case_func *write_case, write_case_data wc_data) { struct sort_cases_pgm *scp = source->aux; + struct sort_source_aux aux; + + aux.scp = scp; + aux.dst = c; + aux.write_case = write_case; + aux.wc_data = wc_data; - read_sort_output (scp, write_case, wc_data); + read_sort_output (scp, sort_source_read_helper, &aux); } -void read_internal_sort_output (struct internal_sort *isrt, - write_case_func *write_case, - write_case_data wc_data); -void read_external_sort_output (struct external_sort *xsrt, - write_case_func *write_case, - write_case_data wc_data); +static void read_internal_sort_output (struct internal_sort *isrt, + read_sort_output_func *, void *aux); +static void read_external_sort_output (struct external_sort *xsrt, + read_sort_output_func *, void *aux); /* Reads all the records from the output stream and passes them to the function provided, which must have an interface identical to write_case(). */ void read_sort_output (struct sort_cases_pgm *scp, - write_case_func *write_case, write_case_data wc_data) + read_sort_output_func *output_func, void *aux) { assert ((scp->isrt != NULL) + (scp->xsrt != NULL) <= 1); if (scp->isrt != NULL) - read_internal_sort_output (scp->isrt, write_case, wc_data); + read_internal_sort_output (scp->isrt, output_func, aux); else if (scp->xsrt != NULL) - read_external_sort_output (scp->xsrt, write_case, wc_data); + read_external_sort_output (scp->xsrt, output_func, aux); else { /* No results. Probably an external sort that failed. */ } } -void +static void read_internal_sort_output (struct internal_sort *isrt, - write_case_func *write_case, - write_case_data wc_data) + read_sort_output_func *output_func, + void *aux) { - struct ccase *save_temp_case = temp_case; struct case_list **p; - for (p = isrt->results; *p; p++) - { - temp_case = &(*p)->c; - write_case (wc_data); - } + for (p = isrt->results; *p; p++) + if (!output_func (&(*p)->c, aux)) + break; free (isrt->results); - - temp_case = save_temp_case; } -void +static void read_external_sort_output (struct external_sort *xsrt, - write_case_func *write_case, - write_case_data wc_data) + read_sort_output_func *output_func, void *aux) { FILE *file; int file_idx; size_t i; + struct ccase *c; assert (xsrt->run_cnt == 1); file_idx = xsrt->initial_runs[0].file_idx; @@ -1324,18 +1370,20 @@ read_external_sort_output (struct external_sort *xsrt, return; } + c = xmalloc (xsrt->scp->case_size); for (i = 0; i < xsrt->initial_runs[0].case_cnt; i++) { - if (!read_temp_file (xsrt, file_idx, file, - temp_case, xsrt->case_size)) + if (!read_temp_file (xsrt, file_idx, file, c, xsrt->scp->case_size)) { err_failure (); break; } - if (!write_case (wc_data)) + if (!output_func (c, aux)) break; } + free (c); + close_temp_file (xsrt, file_idx, file); } static void