You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
- Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA
- 02111-1307, USA. */
+ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+ 02110-1301, USA. */
#include <config.h>
#include "sort.h"
-#include <assert.h>
+#include "error.h"
+#include <limits.h>
#include <stdio.h>
#include <stdlib.h>
#include <errno.h>
#include "algorithm.h"
#include "alloc.h"
+#include <stdbool.h>
+#include "case.h"
+#include "casefile.h"
#include "command.h"
#include "error.h"
-#include "expr.h"
+#include "expressions/public.h"
+#include "glob.h"
#include "lexer.h"
#include "misc.h"
#include "settings.h"
+#include "sort-prs.h"
#include "str.h"
#include "var.h"
#include "vfm.h"
#include "vfmP.h"
-#if HAVE_UNISTD_H
-#include <unistd.h>
-#endif
-
-#if HAVE_SYS_TYPES_H
-#include <sys/types.h>
-#endif
-
-#if HAVE_SYS_STAT_H
-#include <sys/stat.h>
-#endif
+#include "gettext.h"
+#define _(msgid) gettext (msgid)
#include "debug-print.h"
-/* Other prototypes. */
-static int compare_record (const union value *, const union value *,
- const struct sort_cases_pgm *);
-static int compare_case_lists (const void *, const void *, void *);
-static struct internal_sort *do_internal_sort (struct sort_cases_pgm *,
- int separate);
-static void destroy_internal_sort (struct internal_sort *);
-static struct external_sort *do_external_sort (struct sort_cases_pgm *,
- int separate);
-static void destroy_external_sort (struct external_sort *);
-struct sort_cases_pgm *parse_sort (void);
+/* These should only be changed for testing purposes. */
+static int min_buffers = 64;
+static int max_buffers = INT_MAX;
+static bool allow_internal_sort = true;
+
+static int compare_record (const struct ccase *, const struct ccase *,
+ const struct sort_criteria *);
+static struct casefile *do_internal_sort (struct casereader *,
+ const struct sort_criteria *);
+static struct casefile *do_external_sort (struct casereader *,
+ const struct sort_criteria *);
/* Performs the SORT CASES procedures. */
int
cmd_sort_cases (void)
{
- struct sort_cases_pgm *scp;
- int success;
+ struct sort_criteria *criteria;
+ bool success = false;
- lex_match_id ("SORT");
- lex_match_id ("CASES");
lex_match (T_BY);
- scp = parse_sort ();
- if (scp == NULL)
+ criteria = sort_parse_criteria (default_dict, NULL, NULL, NULL, NULL);
+ if (criteria == NULL)
return CMD_FAILURE;
-
- cancel_temporary ();
- success = sort_cases (scp, 0);
- destroy_sort_cases_pgm (scp);
- if (success)
- return lex_end_of_command ();
- else
- return CMD_FAILURE;
-}
-
-/* Parses a list of sort keys and returns a struct sort_cases_pgm
- based on it. Returns a null pointer on error. */
-struct sort_cases_pgm *
-parse_sort (void)
-{
- struct sort_cases_pgm *scp;
+ if (get_testing_mode () && lex_match ('/'))
+ {
+ if (!lex_force_match_id ("BUFFERS") || !lex_match ('=')
+ || !lex_force_int ())
+ goto done;
- scp = xmalloc (sizeof *scp);
- scp->ref_cnt = 1;
- scp->vars = NULL;
- scp->dirs = NULL;
- scp->var_cnt = 0;
- scp->isrt = NULL;
- scp->xsrt = NULL;
+ min_buffers = max_buffers = lex_integer ();
+ allow_internal_sort = false;
+ if (max_buffers < 2)
+ {
+ msg (SE, _("Buffer limit must be at least 2."));
+ goto done;
+ }
- do
- {
- int prev_var_cnt = scp->var_cnt;
- enum sort_direction direction = SRT_ASCEND;
-
- /* Variables. */
- if (!parse_variables (default_dict, &scp->vars, &scp->var_cnt,
- PV_NO_DUPLICATE | PV_APPEND | PV_NO_SCRATCH))
- goto error;
-
- /* Sort direction. */
- if (lex_match ('('))
- {
- if (lex_match_id ("D") || lex_match_id ("DOWN"))
- direction = SRT_DESCEND;
- else if (!lex_match_id ("A") && !lex_match_id ("UP"))
- {
- msg (SE, _("`A' or `D' expected inside parentheses."));
- goto error;
- }
- if (!lex_match (')'))
- {
- msg (SE, _("`)' expected."));
- goto error;
- }
- }
- scp->dirs = xrealloc (scp->dirs, sizeof *scp->dirs * scp->var_cnt);
- for (; prev_var_cnt < scp->var_cnt; prev_var_cnt++)
- scp->dirs[prev_var_cnt] = direction;
+ lex_get ();
}
- while (token != '.' && token != '/');
-
- return scp;
- error:
- destroy_sort_cases_pgm (scp);
- return NULL;
+ success = sort_active_file_in_place (criteria);
+
+ done:
+ min_buffers = 64;
+ max_buffers = INT_MAX;
+ allow_internal_sort = true;
+
+ sort_destroy_criteria (criteria);
+ return success ? lex_end_of_command () : CMD_FAILURE;
}
-void
-destroy_sort_cases_pgm (struct sort_cases_pgm *scp)
+/* Gets ready to sort the active file, either in-place or to a
+ separate casefile. */
+static void
+prepare_to_sort_active_file (void)
{
- if (scp != NULL)
- {
- assert (scp->ref_cnt > 0);
- if (--scp->ref_cnt > 0)
- return;
-
- free (scp->vars);
- free (scp->dirs);
- destroy_internal_sort (scp->isrt);
- destroy_external_sort (scp->xsrt);
- free (scp);
- }
-}
+ /* Cancel temporary transformations and PROCESS IF. */
+ if (temporary != 0)
+ cancel_temporary ();
+ expr_free (process_if_expr);
+ process_if_expr = NULL;
-/* Sorts the active file based on the key variables specified in
- global variables vars and var_cnt. The output is either to the
- active file, if SEPARATE is zero, or to a separate file, if
- SEPARATE is nonzero. In the latter case the output cases can be
- read with a call to read_sort_output(). (In the former case the
- output cases should be dealt with through the usual vfm interface.)
+ /* Make sure source cases are in a storage source. */
+ procedure (NULL, NULL);
+ assert (case_source_is_class (vfm_source, &storage_source_class));
+}
- The caller is responsible for freeing vars[]. */
+/* Sorts the active file in-place according to CRITERIA.
+ Returns nonzero if successful. */
int
-sort_cases (struct sort_cases_pgm *scp, int separate)
+sort_active_file_in_place (const struct sort_criteria *criteria)
{
- /* Not sure this is necessary but it's good to be safe. */
- if (separate && case_source_is_class (vfm_source, &sort_source_class))
- procedure (NULL, NULL, NULL, NULL);
+ struct casefile *src, *dst;
- /* SORT CASES cancels PROCESS IF. */
- expr_free (process_if_expr);
- process_if_expr = NULL;
+ prepare_to_sort_active_file ();
- /* Try an internal sort first. */
- scp->isrt = do_internal_sort (scp, separate);
- if (scp->isrt != NULL)
- return 1;
+ src = storage_source_get_casefile (vfm_source);
+ dst = sort_execute (casefile_get_destructive_reader (src), criteria);
+ free_case_source (vfm_source);
+ vfm_source = NULL;
- /* Fall back to an external sort. */
- write_active_file_to_disk ();
- scp->xsrt = do_external_sort (scp, separate);
- if (scp->xsrt != NULL)
- return 1;
+ if (dst == NULL)
+ return 0;
- destroy_sort_cases_pgm (scp);
- return 0;
+ vfm_source = storage_source_create (dst);
+ return 1;
}
-\f
-struct internal_sort
- {
- struct case_list **results;
- };
-/* If a reasonable situation is set up, do an internal sort of the
- data. Return success. */
-static struct internal_sort *
-do_internal_sort (struct sort_cases_pgm *scp, int separate)
+/* Sorts the active file to a separate casefile. If successful,
+ returns the sorted casefile. Returns a null pointer on
+ failure. */
+struct casefile *
+sort_active_file_to_casefile (const struct sort_criteria *criteria)
{
- struct internal_sort *isrt;
+ struct casefile *src;
+
+ prepare_to_sort_active_file ();
- isrt = xmalloc (sizeof *isrt);
- isrt->results = NULL;
+ src = storage_source_get_casefile (vfm_source);
+ return sort_execute (casefile_get_reader (src), criteria);
+}
- if (!case_source_is_class (vfm_source, &disk_source_class))
- {
- if (!case_source_is_class (vfm_source, &memory_source_class))
- procedure (NULL, NULL, NULL, NULL);
- if (case_source_is_class (vfm_source, &memory_source_class))
- {
- struct case_list *case_list;
- struct case_list **case_array;
- size_t case_cnt;
- int i;
+/* Reads all the cases from READER, which is destroyed. Sorts
+ the cases according to CRITERIA. Returns the sorted cases in
+ a newly created casefile. */
+struct casefile *
+sort_execute (struct casereader *reader, const struct sort_criteria *criteria)
+{
+ struct casefile *output = do_internal_sort (reader, criteria);
+ if (output == NULL)
+ output = do_external_sort (reader, criteria);
+ casereader_destroy (reader);
+ return output;
+}
+\f
+/* A case and its index. */
+struct indexed_case
+ {
+ struct ccase c; /* Case. */
+ unsigned long idx; /* Index to allow for stable sorting. */
+ };
- case_cnt = vfm_source_info.ncases;
- if (case_cnt == 0)
- return isrt;
+static int compare_indexed_cases (const void *, const void *, void *);
- if (case_cnt > set_max_workspace / sizeof *case_array)
- goto error;
+/* If the data is in memory, do an internal sort and return a new
+ casefile for the data. Otherwise, return a null pointer. */
+static struct casefile *
+do_internal_sort (struct casereader *reader,
+ const struct sort_criteria *criteria)
+{
+ const struct casefile *src;
+ struct casefile *dst;
+ unsigned long case_cnt;
- case_list = memory_source_get_cases (vfm_source);
- case_array = malloc (sizeof *case_array * (case_cnt + 1));
- if (case_array == NULL)
- goto error;
+ if (!allow_internal_sort)
+ return NULL;
- for (i = 0; case_list != NULL; i++)
+ src = casereader_get_casefile (reader);
+ if (casefile_get_case_cnt (src) > 1 && !casefile_in_core (src))
+ return NULL;
+
+ case_cnt = casefile_get_case_cnt (src);
+ dst = casefile_create (casefile_get_value_cnt (src));
+ if (case_cnt != 0)
+ {
+ struct indexed_case *cases = nmalloc (sizeof *cases, case_cnt);
+ if (cases != NULL)
+ {
+ unsigned long i;
+
+ for (i = 0; i < case_cnt; i++)
{
- case_array[i] = case_list;
- case_list = case_list->next;
+ casereader_read_xfer_assert (reader, &cases[i].c);
+ cases[i].idx = i;
}
- assert (i == case_cnt);
- case_array[case_cnt] = NULL;
- sort (case_array, case_cnt, sizeof *case_array,
- compare_case_lists, scp);
+ sort (cases, case_cnt, sizeof *cases, compare_indexed_cases,
+ (void *) criteria);
+
+ for (i = 0; i < case_cnt; i++)
+ casefile_append_xfer (dst, &cases[i].c);
- if (!separate)
- {
- memory_source_set_cases (vfm_source, case_array[0]);
- for (i = 1; i <= case_cnt; i++)
- case_array[i - 1]->next = case_array[i];
- free (case_array);
- }
- else
- isrt->results = case_array;
-
- return isrt;
- }
+ free (cases);
+ }
+ else
+ {
+ /* Failure. */
+ casefile_destroy (dst);
+ dst = NULL;
+ }
}
- error:
- free (isrt);
- return NULL;
+ return dst;
}
-static void
-destroy_internal_sort (struct internal_sort *isrt)
-{
- if (isrt != NULL)
- {
- free (isrt->results);
- free (isrt);
- }
-}
-
-/* Compares the VAR_CNT variables in VARS[] between the
- `case_list's at A and B, and returns a strcmp()-type
- result. */
+/* Compares the variables specified by CRITERIA between the cases
+ at A and B, with a "last resort" comparison for stability, and
+ returns a strcmp()-type result. */
static int
-compare_case_lists (const void *a_, const void *b_, void *scp_)
+compare_indexed_cases (const void *a_, const void *b_, void *criteria_)
{
- struct sort_cases_pgm *scp = scp_;
- struct case_list *const *pa = a_;
- struct case_list *const *pb = b_;
- struct case_list *a = *pa;
- struct case_list *b = *pb;
-
- return compare_record (a->c.data, b->c.data, scp);
+ struct sort_criteria *criteria = criteria_;
+ const struct indexed_case *a = a_;
+ const struct indexed_case *b = b_;
+ int result = compare_record (&a->c, &b->c, criteria);
+ if (result == 0)
+ result = a->idx < b->idx ? -1 : a->idx > b->idx;
+ return result;
}
\f
/* External sort. */
-/* Maximum order of merge. If you increase this, then you should
- use a heap for comparing cases during merge. */
-#define MAX_MERGE_ORDER 7
-
-/* Minimum total number of records for buffers. */
-#define MIN_BUFFER_TOTAL_SIZE_RECS 64
-
-/* Minimum single input buffer size, in bytes and records. */
-#define MIN_BUFFER_SIZE_BYTES 4096
-#define MIN_BUFFER_SIZE_RECS 16
-
-#if MIN_BUFFER_SIZE_RECS * 2 + 16 > MIN_BUFFER_TOTAL_SIZE_RECS
-#error MIN_BUFFER_SIZE_RECS and MIN_BUFFER_TOTAL_SIZE_RECS do not make sense.
-#endif
-
-/* An initial run and its length. */
-struct initial_run
- {
- int file_idx; /* File index. */
- size_t case_cnt; /* Number of cases. */
- };
-
-/* Sorts initial runs A and B in decending order by length. */
-static int
-compare_initial_runs (const void *a_, const void *b_, void *aux UNUSED)
-{
- const struct initial_run *a = a_;
- const struct initial_run *b = b_;
-
- return a->case_cnt > b->case_cnt ? -1 : a->case_cnt <b->case_cnt;
-}
+/* Maximum order of merge (external sort only). The maximum
+ reasonable value is about 7. Above that, it would be a good
+ idea to use a heap in merge_once() to select the minimum. */
+#define MAX_MERGE_ORDER 7
+/* Results of an external sort. */
struct external_sort
{
- struct sort_cases_pgm *scp; /* SORT CASES info. */
- struct initial_run *initial_runs; /* Array of initial runs. */
+ const struct sort_criteria *criteria; /* Sort criteria. */
+ size_t value_cnt; /* Size of data in `union value's. */
+ struct casefile **runs; /* Array of initial runs. */
size_t run_cnt, run_cap; /* Number of runs, allocated capacity. */
- char *temp_dir; /* Temporary file directory name. */
- int next_file_idx; /* Lowest unused file index. */
};
/* Prototypes for helper functions. */
-static void sort_sink_write (struct case_sink *, struct ccase *);
-static int write_initial_runs (struct external_sort *, int separate);
-static int init_external_sort (struct external_sort *);
-static int merge (struct external_sort *);
-static void rmdir_temp_dir (struct external_sort *);
-static void remove_temp_file (struct external_sort *xsrt, int file_idx);
-
-/* Performs an external sort of the active file according to the
- specification in SCP. Forms initial runs using a heap as a
- reservoir. Determines the optimum merge pattern via Huffman's
- method (see Knuth vol. 3, 2nd edition, p. 365-366), and merges
- according to that pattern. */
-static struct external_sort *
-do_external_sort (struct sort_cases_pgm *scp, int separate)
+static int write_runs (struct external_sort *, struct casereader *);
+static struct casefile *merge (struct external_sort *);
+static void destroy_external_sort (struct external_sort *);
+
+/* Performs a stable external sort of the active file according
+ to the specification in SCP. Forms initial runs using a heap
+ as a reservoir. Merges the initial runs according to a
+ pattern that assures stability. */
+static struct casefile *
+do_external_sort (struct casereader *reader,
+ const struct sort_criteria *criteria)
{
struct external_sort *xsrt;
- int success = 0;
- xsrt = xmalloc (sizeof *xsrt);
- xsrt->scp = scp;
- if (!init_external_sort (xsrt))
- goto done;
- if (!write_initial_runs (xsrt, separate))
- goto done;
- if (!merge (xsrt))
- goto done;
+ casefile_to_disk (casereader_get_casefile (reader));
- success = 1;
-
- done:
- if (success)
+ xsrt = xmalloc (sizeof *xsrt);
+ xsrt->criteria = criteria;
+ xsrt->value_cnt = casefile_get_value_cnt (casereader_get_casefile (reader));
+ xsrt->run_cap = 512;
+ xsrt->run_cnt = 0;
+ xsrt->runs = xnmalloc (xsrt->run_cap, sizeof *xsrt->runs);
+ if (write_runs (xsrt, reader))
{
- /* Don't destroy anything because we'll need it for reading
- the output. */
- return xsrt;
+ struct casefile *output = merge (xsrt);
+ destroy_external_sort (xsrt);
+ return output;
}
else
{
int i;
for (i = 0; i < xsrt->run_cnt; i++)
- remove_temp_file (xsrt, xsrt->initial_runs[i].file_idx);
- rmdir_temp_dir (xsrt);
- free (xsrt->initial_runs);
+ casefile_destroy (xsrt->runs[i]);
+ free (xsrt->runs);
free (xsrt);
}
}
-
-#ifdef HAVE_MKDTEMP
-/* Creates and returns the name of a temporary directory. */
-static char *
-make_temp_dir (void)
-{
- const char *parent_dir;
- char *temp_dir;
-
- if (getenv ("TMPDIR") != NULL)
- parent_dir = getenv ("TMPDIR");
- else
- parent_dir = P_tmpdir;
-
- temp_dir = xmalloc (strlen (parent_dir) + 32);
- sprintf (temp_dir, "%s%cpsppXXXXXX", parent_dir, DIR_SEPARATOR);
- if (mkdtemp (temp_dir) == NULL)
- {
- msg (SE, _("%s: Creating temporary directory: %s."),
- temp_dir, strerror (errno));
- free (temp_dir);
- return NULL;
- }
- else
- return temp_dir;
-}
-#else /* !HAVE_MKDTEMP */
-/* Creates directory DIR. */
-static int
-do_mkdir (const char *dir)
-{
-#ifndef __MSDOS__
- return mkdir (dir, S_IRWXU);
-#else
- return mkdir (dir);
-#endif
-}
-
-/* Creates and returns the name of a temporary directory. */
-static char *
-make_temp_dir (void)
-{
- int i;
-
- for (i = 0; i < 100; i++)
- {
- char temp_dir[L_tmpnam + 1];
- if (tmpnam (temp_dir) == NULL)
- {
- msg (SE, _("Generating temporary directory name failed: %s."),
- strerror (errno));
- return NULL;
- }
- else if (do_mkdir (temp_dir) == 0)
- return xstrdup (temp_dir);
- }
-
- msg (SE, _("Creating temporary directory failed: %s."), strerror (errno));
- return NULL;
-}
-#endif /* !HAVE_MKDTEMP */
-
-/* Sets up to open temporary files. */
-static int
-init_external_sort (struct external_sort *xsrt)
-{
- /* Zero. */
- xsrt->temp_dir = NULL;
- xsrt->next_file_idx = 0;
-
- /* Huffman queue. */
- xsrt->run_cap = 512;
- xsrt->run_cnt = 0;
- xsrt->initial_runs = xmalloc (sizeof *xsrt->initial_runs * xsrt->run_cap);
-
- /* Temporary directory. */
- xsrt->temp_dir = make_temp_dir ();
- if (xsrt->temp_dir == NULL)
- return 0;
-
- return 1;
-}
-
-
-static int
-simulate_error (void)
-{
- static int op_err_cnt = -1;
- static int op_cnt;
-
- if (op_err_cnt == -1 || op_cnt++ < op_err_cnt)
- return 0;
- else
- {
- errno = 0;
- return 1;
- }
-}
-
-/* Removes the directory created for temporary files, if one
- exists. */
-static void
-rmdir_temp_dir (struct external_sort *xsrt)
-{
- if (xsrt->temp_dir != NULL && rmdir (xsrt->temp_dir) == -1)
- {
- msg (SE, _("%s: Error removing directory for temporary files: %s."),
- xsrt->temp_dir, strerror (errno));
- xsrt->temp_dir = NULL;
- }
-}
-
-#define TEMP_FILE_NAME_SIZE (L_tmpnam + 32)
-static void
-get_temp_file_name (struct external_sort *xsrt, int file_idx,
- char filename[TEMP_FILE_NAME_SIZE])
-{
- assert (xsrt->temp_dir != NULL);
- sprintf (filename, "%s%c%04d", xsrt->temp_dir, DIR_SEPARATOR, file_idx);
-}
-
-static FILE *
-open_temp_file (struct external_sort *xsrt, int file_idx, const char *mode)
-{
- char temp_file[TEMP_FILE_NAME_SIZE];
- FILE *file;
-
- get_temp_file_name (xsrt, file_idx, temp_file);
-
- file = fopen (temp_file, mode);
- if (simulate_error () || file == NULL)
- msg (SE, _("%s: Error opening temporary file for %s: %s."),
- temp_file, mode[0] == 'r' ? "reading" : "writing",
- strerror (errno));
-
- return file;
-}
-
-static int
-close_temp_file (struct external_sort *xsrt, int file_idx, FILE *file)
-{
- if (file != NULL)
- {
- char temp_file[TEMP_FILE_NAME_SIZE];
- get_temp_file_name (xsrt, file_idx, temp_file);
- if (simulate_error () || fclose (file) == EOF)
- {
- msg (SE, _("%s: Error closing temporary file: %s."),
- temp_file, strerror (errno));
- return 0;
- }
- }
- return 1;
-}
-
-static void
-remove_temp_file (struct external_sort *xsrt, int file_idx)
-{
- if (file_idx != -1)
- {
- char temp_file[TEMP_FILE_NAME_SIZE];
- get_temp_file_name (xsrt, file_idx, temp_file);
- if (simulate_error () || remove (temp_file) != 0)
- msg (SE, _("%s: Error removing temporary file: %s."),
- temp_file, strerror (errno));
- }
-}
-
-static int
-write_temp_file (struct external_sort *xsrt, int file_idx,
- FILE *file, const void *data, size_t size)
-{
- if (!simulate_error () && fwrite (data, size, 1, file) == 1)
- return 1;
- else
- {
- char temp_file[TEMP_FILE_NAME_SIZE];
- get_temp_file_name (xsrt, file_idx, temp_file);
- msg (SE, _("%s: Error writing temporary file: %s."),
- temp_file, strerror (errno));
- return 0;
- }
-}
-
-static int
-read_temp_file (struct external_sort *xsrt, int file_idx,
- FILE *file, void *data, size_t size)
-{
- if (!simulate_error () && fread (data, size, 1, file) == 1)
- return 1;
- else
- {
- char temp_file[TEMP_FILE_NAME_SIZE];
- get_temp_file_name (xsrt, file_idx, temp_file);
- if (ferror (file))
- msg (SE, _("%s: Error reading temporary file: %s."),
- temp_file, strerror (errno));
- else
- msg (SE, _("%s: Unexpected end of temporary file."),
- temp_file);
- return 0;
- }
-}
\f
/* Replacement selection. */
struct record_run
{
int run; /* Run number of case. */
- struct case_list *record; /* Case data. */
+ struct ccase record; /* Case data. */
+ size_t idx; /* Case number (for stability). */
};
+/* Represents a set of initial runs during an external sort. */
struct initial_run_state
{
struct external_sort *xsrt;
struct record_run *records; /* Records arranged as a heap. */
size_t record_cnt; /* Current number of records. */
size_t record_cap; /* Capacity for records. */
- struct case_list *free_list;/* Cases not in heap. */
/* Run currently being output. */
- int file_idx; /* Temporary file number. */
+ int run; /* Run number. */
size_t case_cnt; /* Number of cases so far. */
- FILE *output_file; /* Output file. */
- struct case_list *last_output;/* Record last output. */
+ struct casefile *casefile; /* Output file. */
+ struct ccase last_output; /* Record last output. */
int okay; /* Zero if an error has been encountered. */
};
-static void destroy_initial_run_state (struct initial_run_state *irs);
+static const struct case_sink_class sort_sink_class;
+
+static void destroy_initial_run_state (struct initial_run_state *);
+static void process_case (struct initial_run_state *, const struct ccase *,
+ size_t);
static int allocate_cases (struct initial_run_state *);
-static struct case_list *grab_case (struct initial_run_state *);
-static void release_case (struct initial_run_state *, struct case_list *);
-static void output_record (struct initial_run_state *irs);
-static void start_run (struct initial_run_state *irs);
-static void end_run (struct initial_run_state *irs);
+static void output_record (struct initial_run_state *);
+static void start_run (struct initial_run_state *);
+static void end_run (struct initial_run_state *);
static int compare_record_run (const struct record_run *,
const struct record_run *,
- struct sort_cases_pgm *);
+ struct initial_run_state *);
static int compare_record_run_minheap (const void *, const void *, void *);
+/* Reads cases from READER and composes initial runs in XSRT. */
static int
-write_initial_runs (struct external_sort *xsrt, int separate)
+write_runs (struct external_sort *xsrt, struct casereader *reader)
{
struct initial_run_state *irs;
+ struct ccase c;
+ size_t idx = 0;
int success = 0;
/* Allocate memory for cases. */
irs->xsrt = xsrt;
irs->records = NULL;
irs->record_cnt = irs->record_cap = 0;
- irs->free_list = NULL;
- irs->output_file = NULL;
- irs->last_output = NULL;
- irs->file_idx = 0;
+ irs->run = 0;
irs->case_cnt = 0;
+ irs->casefile = NULL;
+ case_nullify (&irs->last_output);
irs->okay = 1;
if (!allocate_cases (irs))
goto done;
- /* Create case sink. */
- if (!separate)
- {
- if (vfm_sink)
- vfm_sink->class->destroy (vfm_sink);
- vfm_sink = create_case_sink (&sort_sink_class, irs);
- xsrt->scp->ref_cnt++;
- }
-
/* Create initial runs. */
start_run (irs);
- procedure (NULL, NULL, NULL, NULL);
- while (irs->record_cnt > 0 && irs->okay)
+ for (; irs->okay && casereader_read (reader, &c); case_destroy (&c))
+ process_case (irs, &c, idx++);
+ while (irs->okay && irs->record_cnt > 0)
output_record (irs);
end_run (irs);
/* Add a single case to an initial run. */
static void
-sort_sink_write (struct case_sink *sink, struct ccase *c)
+process_case (struct initial_run_state *irs, const struct ccase *c, size_t idx)
{
- struct initial_run_state *irs = sink->aux;
- struct record_run *new_record_run;
-
- if (!irs->okay)
- return;
+ struct record_run *rr;
/* Compose record_run for this run and add to heap. */
- assert (irs->record_cnt < irs->record_cap);
- new_record_run = irs->records + irs->record_cnt++;
- new_record_run->record = grab_case (irs);
- memcpy (new_record_run->record->c.data, c->data, vfm_sink_info.case_size);
- new_record_run->run = irs->file_idx;
- if (irs->last_output != NULL
- && compare_record (c->data, irs->last_output->c.data,
- irs->xsrt->scp) < 0)
- new_record_run->run = irs->xsrt->next_file_idx;
+ assert (irs->record_cnt < irs->record_cap - 1);
+ rr = irs->records + irs->record_cnt++;
+ case_copy (&rr->record, 0, c, 0, irs->xsrt->value_cnt);
+ rr->run = irs->run;
+ rr->idx = idx;
+ if (!case_is_null (&irs->last_output)
+ && compare_record (c, &irs->last_output, irs->xsrt->criteria) < 0)
+ rr->run = irs->run + 1;
push_heap (irs->records, irs->record_cnt, sizeof *irs->records,
- compare_record_run_minheap, irs->xsrt->scp);
+ compare_record_run_minheap, irs);
/* Output a record if the reservoir is full. */
- if (irs->record_cnt == irs->record_cap && irs->okay)
+ if (irs->record_cnt == irs->record_cap - 1 && irs->okay)
output_record (irs);
}
+/* Destroys the initial run state represented by IRS. */
static void
destroy_initial_run_state (struct initial_run_state *irs)
{
- struct case_list *iter, *next;
int i;
if (irs == NULL)
return;
- /* Release cases to free list. */
- for (i = 0; i < irs->record_cnt; i++)
- release_case (irs, irs->records[i].record);
- if (irs->last_output != NULL)
- release_case (irs, irs->last_output);
+ for (i = 0; i < irs->record_cap; i++)
+ case_destroy (&irs->records[i].record);
+ free (irs->records);
- /* Free cases in free list. */
- for (iter = irs->free_list; iter != NULL; iter = next)
- {
- next = iter->next;
- free (iter);
- }
+ if (irs->casefile != NULL)
+ casefile_sleep (irs->casefile);
- free (irs->records);
free (irs);
-
- if (irs->output_file != NULL)
- close_temp_file (irs->xsrt, irs->file_idx, irs->output_file);
}
/* Allocates room for lots of cases as a buffer. */
static int
allocate_cases (struct initial_run_state *irs)
{
- size_t case_size; /* Size of one case, in bytes. */
int approx_case_cost; /* Approximate memory cost of one case in bytes. */
int max_cases; /* Maximum number of cases to allocate. */
int i;
/* Allocate as many cases as we can within the workspace
limit. */
- case_size = dict_get_case_size (default_dict);
approx_case_cost = (sizeof *irs->records
- + sizeof *irs->free_list
- + case_size
+ + irs->xsrt->value_cnt * sizeof (union value)
+ 4 * sizeof (void *));
- max_cases = set_max_workspace / approx_case_cost;
- irs->records = malloc (sizeof *irs->records * max_cases);
- for (i = 0; i < max_cases; i++)
- {
- struct case_list *c;
- c = malloc (sizeof *c + case_size - sizeof (union value));
- if (c == NULL)
+ max_cases = get_workspace() / approx_case_cost;
+ if (max_cases > max_buffers)
+ max_cases = max_buffers;
+ irs->records = nmalloc (sizeof *irs->records, max_cases);
+ if (irs->records != NULL)
+ for (i = 0; i < max_cases; i++)
+ if (!case_try_create (&irs->records[i].record, irs->xsrt->value_cnt))
{
max_cases = i;
break;
}
- release_case (irs, c);
- }
-
- /* irs->records gets all but one of the allocated cases.
- The extra is used for last_output. */
- irs->record_cap = max_cases - 1;
+ irs->record_cap = max_cases;
/* Fail if we didn't allocate an acceptable number of cases. */
- if (irs->records == NULL || max_cases < MIN_BUFFER_TOTAL_SIZE_RECS)
+ if (irs->records == NULL || max_cases < min_buffers)
{
msg (SE, _("Out of memory. Could not allocate room for minimum of %d "
"cases of %d bytes each. (PSPP workspace is currently "
"restricted to a maximum of %d KB.)"),
- MIN_BUFFER_TOTAL_SIZE_RECS, approx_case_cost, set_max_workspace / 1024);
+ min_buffers, approx_case_cost, get_workspace() / 1024);
return 0;
}
return 1;
/* Compares the VAR_CNT variables in VARS[] between the `value's at
A and B, and returns a strcmp()-type result. */
static int
-compare_record (const union value *a, const union value *b,
- const struct sort_cases_pgm *scp)
+compare_record (const struct ccase *a, const struct ccase *b,
+ const struct sort_criteria *criteria)
{
int i;
assert (a != NULL);
assert (b != NULL);
- for (i = 0; i < scp->var_cnt; i++)
+ for (i = 0; i < criteria->crit_cnt; i++)
{
- struct variable *v = scp->vars[i];
- int fv = v->fv;
+ const struct sort_criterion *c = &criteria->crits[i];
int result;
-
- if (v->type == NUMERIC)
+
+ if (c->width == 0)
{
- double af = a[fv].f;
- double bf = b[fv].f;
+ double af = case_num (a, c->fv);
+ double bf = case_num (b, c->fv);
result = af < bf ? -1 : af > bf;
}
else
- result = memcmp (a[fv].s, b[fv].s, v->width);
+ result = memcmp (case_str (a, c->fv), case_str (b, c->fv), c->width);
- if (result != 0)
- {
- if (scp->dirs[i] == SRT_DESCEND)
- result = -result;
- return result;
- }
+ if (result != 0)
+ return c->dir == SRT_ASCEND ? result : -result;
}
return 0;
}
+/* Compares record-run tuples A and B on run number first, then
+ on record, then on case index. */
static int
compare_record_run (const struct record_run *a,
const struct record_run *b,
- struct sort_cases_pgm *scp)
+ struct initial_run_state *irs)
{
- if (a->run != b->run)
- return a->run > b->run ? 1 : -1;
- else
- return compare_record (a->record->c.data, b->record->c.data, scp);
+ int result = a->run < b->run ? -1 : a->run > b->run;
+ if (result == 0)
+ result = compare_record (&a->record, &b->record, irs->xsrt->criteria);
+ if (result == 0)
+ result = a->idx < b->idx ? -1 : a->idx > b->idx;
+ return result;
}
+/* Compares record-run tuples A and B on run number first, then
+ on the current record according to SCP, but in descending
+ order. */
static int
-compare_record_run_minheap (const void *a, const void *b, void *scp)
+compare_record_run_minheap (const void *a, const void *b, void *irs)
{
- return -compare_record_run (a, b, scp);
+ return -compare_record_run (a, b, irs);
}
/* Begins a new initial run, specifically its output file. */
static void
start_run (struct initial_run_state *irs)
{
- irs->file_idx = irs->xsrt->next_file_idx++;
+ irs->run++;
irs->case_cnt = 0;
- irs->output_file = open_temp_file (irs->xsrt, irs->file_idx, "wb");
- if (irs->output_file == NULL)
- irs->okay = 0;
- if (irs->last_output != NULL)
- {
- release_case (irs, irs->last_output);
- irs->last_output = NULL;
- }
+ irs->casefile = casefile_create (irs->xsrt->value_cnt);
+ casefile_to_disk (irs->casefile);
+ case_nullify (&irs->last_output);
}
/* Ends the current initial run. */
end_run (struct initial_run_state *irs)
{
struct external_sort *xsrt = irs->xsrt;
-
+
/* Record initial run. */
- if (xsrt->run_cnt >= xsrt->run_cap)
+ if (irs->casefile != NULL)
{
- xsrt->run_cap *= 2;
- xsrt->initial_runs
- = xrealloc (xsrt->initial_runs,
- sizeof *xsrt->initial_runs * xsrt->run_cap);
+ casefile_sleep (irs->casefile);
+ if (xsrt->run_cnt >= xsrt->run_cap)
+ {
+ xsrt->run_cap *= 2;
+ xsrt->runs = xnrealloc (xsrt->runs,
+ xsrt->run_cap, sizeof *xsrt->runs);
+ }
+ xsrt->runs[xsrt->run_cnt++] = irs->casefile;
+ irs->casefile = NULL;
}
- xsrt->initial_runs[xsrt->run_cnt].file_idx = irs->file_idx;
- xsrt->initial_runs[xsrt->run_cnt].case_cnt = irs->case_cnt;
- xsrt->run_cnt++;
-
- /* Close file handle. */
- if (irs->output_file != NULL
- && !close_temp_file (irs->xsrt, irs->file_idx, irs->output_file))
- irs->okay = 0;
- irs->output_file = NULL;
}
+/* Writes a record to the current initial run. */
static void
output_record (struct initial_run_state *irs)
{
struct record_run *record_run;
- struct ccase *out_case;
+ struct ccase case_tmp;
/* Extract minimum case from heap. */
assert (irs->record_cnt > 0);
pop_heap (irs->records, irs->record_cnt--, sizeof *irs->records,
- compare_record_run_minheap, irs->xsrt->scp);
+ compare_record_run_minheap, irs);
record_run = irs->records + irs->record_cnt;
/* Bail if an error has occurred. */
if (!irs->okay)
return;
- /* Obtain case data to write to disk. */
- out_case = &record_run->record->c;
- if (compaction_necessary)
- {
- compact_case (compaction_case, out_case);
- out_case = compaction_case;
- }
-
/* Start new run if necessary. */
- assert (record_run->run == irs->file_idx
- || record_run->run == irs->xsrt->next_file_idx);
- if (record_run->run != irs->file_idx)
+ assert (record_run->run == irs->run
+ || record_run->run == irs->run + 1);
+ if (record_run->run != irs->run)
{
end_run (irs);
start_run (irs);
}
- assert (record_run->run == irs->file_idx);
+ assert (record_run->run == irs->run);
irs->case_cnt++;
/* Write to disk. */
- if (irs->output_file != NULL
- && !write_temp_file (irs->xsrt, irs->file_idx, irs->output_file,
- out_case->data,
- sizeof *out_case->data * compaction_nval))
- irs->okay = 0;
+ if (irs->casefile != NULL)
+ casefile_append (irs->casefile, &record_run->record);
/* This record becomes last_output. */
- if (irs->last_output != NULL)
- release_case (irs, irs->last_output);
- irs->last_output = record_run->record;
+ irs->last_output = case_tmp = record_run->record;
+ record_run->record = irs->records[irs->record_cap - 1].record;
+ irs->records[irs->record_cap - 1].record = case_tmp;
}
+\f
+/* Merging. */
-static struct case_list *
-grab_case (struct initial_run_state *irs)
-{
- struct case_list *c;
-
- assert (irs != NULL);
- assert (irs->free_list != NULL);
-
- c = irs->free_list;
- irs->free_list = c->next;
- return c;
-}
+static int choose_merge (struct casefile *runs[], int run_cnt, int order);
+static struct casefile *merge_once (struct external_sort *,
+ struct casefile *[], size_t);
-static void
-release_case (struct initial_run_state *irs, struct case_list *c)
+/* Repeatedly merges run until only one is left,
+ and returns the final casefile. */
+static struct casefile *
+merge (struct external_sort *xsrt)
{
- assert (irs != NULL);
- assert (c != NULL);
-
- c->next = irs->free_list;
- irs->free_list = c;
+ while (xsrt->run_cnt > 1)
+ {
+ int order = min (MAX_MERGE_ORDER, xsrt->run_cnt);
+ int idx = choose_merge (xsrt->runs, xsrt->run_cnt, order);
+ xsrt->runs[idx] = merge_once (xsrt, xsrt->runs + idx, order);
+ remove_range (xsrt->runs, xsrt->run_cnt, sizeof *xsrt->runs,
+ idx + 1, order - 1);
+ xsrt->run_cnt -= order - 1;
+ }
+ assert (xsrt->run_cnt == 1);
+ xsrt->run_cnt = 0;
+ return xsrt->runs[0];
}
-\f
-/* Merging. */
-struct merge_state
- {
- struct external_sort *xsrt; /* External sort state. */
- struct ccase **cases; /* Buffers. */
- size_t case_cnt; /* Number of buffers. */
- };
-
-struct run;
-static int merge_once (struct merge_state *,
- const struct initial_run[], size_t,
- struct initial_run *);
-static int fill_run_buffer (struct merge_state *, struct run *);
-static int mod (int, int);
+/* Chooses ORDER runs out of the RUN_CNT runs in RUNS to merge,
+ and returns the index of the first one.
-/* Performs a series of P-way merges of initial runs
- method. */
+ For stability, we must merge only consecutive runs. For
+ efficiency, we choose the shortest consecutive sequence of
+ runs. */
static int
-merge (struct external_sort *xsrt)
+choose_merge (struct casefile *runs[], int run_cnt, int order)
{
- struct merge_state mrg; /* State of merge. */
- size_t case_size; /* Size of one case, in bytes. */
- size_t approx_case_cost; /* Approximate memory cost of one case. */
- int max_order; /* Maximum order of merge. */
- size_t dummy_run_cnt; /* Number of dummy runs to insert. */
- int success = 0;
+ int min_idx, min_sum;
+ int cur_idx, cur_sum;
int i;
- mrg.xsrt = xsrt;
+ /* Sum up the length of the first ORDER runs. */
+ cur_sum = 0;
+ for (i = 0; i < order; i++)
+ cur_sum += casefile_get_case_cnt (runs[i]);
- /* Allocate as many cases as possible into cases. */
- case_size = dict_get_case_size (default_dict);
- approx_case_cost = sizeof *mrg.cases + case_size + 4 * sizeof (void *);
- mrg.case_cnt = set_max_workspace / approx_case_cost;
- mrg.cases = malloc (sizeof *mrg.cases * mrg.case_cnt);
- if (mrg.cases == NULL)
- goto done;
- for (i = 0; i < mrg.case_cnt; i++)
+ /* Find the shortest group of ORDER runs,
+ using a running total for efficiency. */
+ min_idx = 0;
+ min_sum = cur_sum;
+ for (cur_idx = 1; cur_idx + order <= run_cnt; cur_idx++)
{
- mrg.cases[i] = malloc (case_size);
- if (mrg.cases[i] == NULL)
+ cur_sum -= casefile_get_case_cnt (runs[cur_idx - 1]);
+ cur_sum += casefile_get_case_cnt (runs[cur_idx + order - 1]);
+ if (cur_sum < min_sum)
{
- mrg.case_cnt = i;
- break;
+ min_sum = cur_sum;
+ min_idx = cur_idx;
}
}
- if (mrg.case_cnt < MIN_BUFFER_TOTAL_SIZE_RECS)
- {
- msg (SE, _("Out of memory. Could not allocate room for minimum of %d "
- "cases of %d bytes each. (PSPP workspace is currently "
- "restricted to a maximum of %d KB.)"),
- MIN_BUFFER_TOTAL_SIZE_RECS, approx_case_cost, set_max_workspace / 1024);
- return 0;
- }
-
- /* Determine maximum order of merge. */
- max_order = MAX_MERGE_ORDER;
- if (mrg.case_cnt / max_order < MIN_BUFFER_SIZE_RECS)
- max_order = mrg.case_cnt / MIN_BUFFER_SIZE_RECS;
- else if (mrg.case_cnt / max_order * case_size < MIN_BUFFER_SIZE_BYTES)
- max_order = mrg.case_cnt / (MIN_BUFFER_SIZE_BYTES / case_size);
- if (max_order < 2)
- max_order = 2;
- if (max_order > xsrt->run_cnt)
- max_order = xsrt->run_cnt;
-
- /* Repeatedly merge the P shortest existing runs until only one run
- is left. */
- make_heap (xsrt->initial_runs, xsrt->run_cnt, sizeof *xsrt->initial_runs,
- compare_initial_runs, NULL);
- dummy_run_cnt = mod (1 - (int) xsrt->run_cnt, max_order - 1);
- assert (max_order == 1
- || (xsrt->run_cnt + dummy_run_cnt) % (max_order - 1) == 1);
- while (xsrt->run_cnt > 1)
- {
- struct initial_run output_run;
- int order;
- int i;
- /* Choose order of merge (max_order after first merge). */
- order = max_order - dummy_run_cnt;
- dummy_run_cnt = 0;
-
- /* Choose runs to merge. */
- assert (xsrt->run_cnt >= order);
- for (i = 0; i < order; i++)
- pop_heap (xsrt->initial_runs, xsrt->run_cnt--,
- sizeof *xsrt->initial_runs,
- compare_initial_runs, NULL);
-
- /* Merge runs. */
- if (!merge_once (&mrg, xsrt->initial_runs + xsrt->run_cnt, order,
- &output_run))
- goto done;
-
- /* Add output run to heap. */
- xsrt->initial_runs[xsrt->run_cnt++] = output_run;
- push_heap (xsrt->initial_runs, xsrt->run_cnt, sizeof *xsrt->initial_runs,
- compare_initial_runs, NULL);
- }
-
- /* Exactly one run is left, which contains the entire sorted
- file. We could use it to find a total case count. */
- assert (xsrt->run_cnt == 1);
-
- success = 1;
-
- done:
- for (i = 0; i < mrg.case_cnt; i++)
- free (mrg.cases[i]);
- free (mrg.cases);
-
- return success;
+ return min_idx;
}
-/* Modulo function as defined by Knuth. */
-static int
-mod (int x, int y)
+/* Merges the RUN_CNT initial runs specified in INPUT_FILES into a
+ new run, and returns the new run. */
+static struct casefile *
+merge_once (struct external_sort *xsrt,
+ struct casefile **const input_files,
+ size_t run_cnt)
{
- if (y == 0)
- return x;
- else if (x == 0)
- return 0;
- else if (x > 0 && y > 0)
- return x % y;
- else if (x < 0 && y > 0)
- return y - (-x) % y;
-
- assert (0);
-}
-
-/* A run of data for use in merging. */
-struct run
- {
- FILE *file; /* File that contains run. */
- int file_idx; /* Index of file that contains run. */
- struct ccase **buffer; /* Case buffer. */
- struct ccase **buffer_head; /* First unconsumed case in buffer. */
- struct ccase **buffer_tail; /* One past last unconsumed case in buffer. */
- size_t buffer_cap; /* Number of cases buffer can hold. */
- size_t unread_case_cnt; /* Number of cases not yet read. */
- };
-
-/* Merges the RUN_CNT initial runs specified in INPUT_RUNS into a
- new run. Returns nonzero only if successful. Adds an entry
- to MRG->xsrt->runs for the output file if and only if the
- output file is actually created. Always deletes all the input
- files. */
-static int
-merge_once (struct merge_state *mrg,
- const struct initial_run input_runs[],
- size_t run_cnt,
- struct initial_run *output_run)
-{
- struct run runs[MAX_MERGE_ORDER];
- FILE *output_file = NULL;
- size_t case_size;
- int success = 0;
- int i;
-
- /* Initialize runs[]. */
- for (i = 0; i < run_cnt; i++)
+ struct run
{
- runs[i].file = NULL;
- runs[i].file_idx = input_runs[i].file_idx;
- runs[i].buffer = mrg->cases + mrg->case_cnt / run_cnt * i;
- runs[i].buffer_head = runs[i].buffer;
- runs[i].buffer_tail = runs[i].buffer;
- runs[i].buffer_cap = mrg->case_cnt / run_cnt;
- runs[i].unread_case_cnt = input_runs[i].case_cnt;
+ struct casefile *file;
+ struct casereader *reader;
+ struct ccase ccase;
}
+ *runs;
+
+ struct casefile *output = NULL;
+ int i;
/* Open input files. */
+ runs = xnmalloc (run_cnt, sizeof *runs);
for (i = 0; i < run_cnt; i++)
{
- runs[i].file = open_temp_file (mrg->xsrt, runs[i].file_idx, "rb");
- if (runs[i].file == NULL)
- goto error;
+ struct run *r = &runs[i];
+ r->file = input_files[i];
+ r->reader = casefile_get_destructive_reader (r->file);
+ if (!casereader_read_xfer (r->reader, &r->ccase))
+ {
+ run_cnt--;
+ i--;
+ }
}
-
- /* Create output file and count cases to be output. */
- output_run->file_idx = mrg->xsrt->next_file_idx++;
- output_run->case_cnt = 0;
- for (i = 0; i < run_cnt; i++)
- output_run->case_cnt += input_runs[i].case_cnt;
- output_file = open_temp_file (mrg->xsrt, output_run->file_idx, "wb");
- if (output_file == NULL)
- goto error;
-
- /* Prime buffers. */
- for (i = 0; i < run_cnt; i++)
- if (!fill_run_buffer (mrg, runs + i))
- goto error;
+
+ /* Create output file. */
+ output = casefile_create (xsrt->value_cnt);
+ casefile_to_disk (output);
/* Merge. */
- case_size = dict_get_case_size (default_dict);
while (run_cnt > 0)
{
- struct run *min_run;
-
+ struct run *min_run, *run;
+
/* Find minimum. */
min_run = runs;
- for (i = 1; i < run_cnt; i++)
- if (compare_record ((*runs[i].buffer_head)->data,
- (*min_run->buffer_head)->data,
- mrg->xsrt->scp) < 0)
- min_run = runs + i;
+ for (run = runs + 1; run < runs + run_cnt; run++)
+ if (compare_record (&run->ccase, &min_run->ccase, xsrt->criteria) < 0)
+ min_run = run;
/* Write minimum to output file. */
- if (!write_temp_file (mrg->xsrt, min_run->file_idx, output_file,
- (*min_run->buffer_head)->data, case_size))
- goto error;
+ casefile_append_xfer (output, &min_run->ccase);
- /* Remove case from buffer. */
- if (++min_run->buffer_head >= min_run->buffer_tail)
+ /* Read another case from minimum run. */
+ if (!casereader_read_xfer (min_run->reader, &min_run->ccase))
{
- /* Buffer is empty. Fill from file. */
- if (!fill_run_buffer (mrg, min_run))
- goto error;
-
- /* If buffer is still empty, delete its run. */
- if (min_run->buffer_head >= min_run->buffer_tail)
- {
- close_temp_file (mrg->xsrt, min_run->file_idx, min_run->file);
- remove_temp_file (mrg->xsrt, min_run->file_idx);
- *min_run = runs[--run_cnt];
+ casereader_destroy (min_run->reader);
+ casefile_destroy (min_run->file);
- /* We could donate the now-unused buffer space to
- other runs. */
- }
+ remove_element (runs, run_cnt, sizeof *runs, min_run - runs);
+ run_cnt--;
}
}
- /* Close output file. */
- close_temp_file (mrg->xsrt, output_run->file_idx, output_file);
-
- return 1;
-
- error:
- /* Close and remove output file. */
- if (output_file != NULL)
- {
- close_temp_file (mrg->xsrt, output_run->file_idx, output_file);
- remove_temp_file (mrg->xsrt, output_run->file_idx);
- }
-
- /* Close and remove any remaining input runs. */
- for (i = 0; i < run_cnt; i++)
- {
- close_temp_file (mrg->xsrt, runs[i].file_idx, runs[i].file);
- remove_temp_file (mrg->xsrt, runs[i].file_idx);
- }
-
- return success;
-}
-
-/* Reads as many cases as possible into RUN's buffer.
- Reads nonzero unless a disk error occurs. */
-static int
-fill_run_buffer (struct merge_state *mrg, struct run *run)
-{
- run->buffer_head = run->buffer_tail = run->buffer;
- while (run->unread_case_cnt > 0
- && run->buffer_tail < run->buffer + run->buffer_cap)
- {
- if (!read_temp_file (mrg->xsrt, run->file_idx, run->file,
- (*run->buffer_tail)->data,
- dict_get_case_size (default_dict)))
- return 0;
-
- run->unread_case_cnt--;
- run->buffer_tail++;
- }
-
- return 1;
-}
-\f
-static void
-sort_sink_destroy (struct case_sink *sink UNUSED)
-{
- assert (0);
-}
-
-static struct case_source *
-sort_sink_make_source (struct case_sink *sink)
-{
- struct initial_run_state *irs = sink->aux;
+ casefile_sleep (output);
+ free (runs);
- return create_case_source (&sort_source_class, irs->xsrt->scp);
+ return output;
}
-
-const struct case_sink_class sort_sink_class =
- {
- "SORT CASES",
- NULL,
- sort_sink_write,
- sort_sink_destroy,
- sort_sink_make_source,
- };
-\f
-/* Reads all the records from the source stream and passes them
- to write_case(). */
-static void
-sort_source_read (struct case_source *source,
- write_case_func *write_case, write_case_data wc_data)
-{
- struct sort_cases_pgm *scp = source->aux;
-
- read_sort_output (scp, write_case, wc_data);
-}
-
-void read_internal_sort_output (struct internal_sort *isrt,
- write_case_func *write_case,
- write_case_data wc_data);
-void read_external_sort_output (struct external_sort *xsrt,
- write_case_func *write_case,
- write_case_data wc_data);
-
-/* Reads all the records from the output stream and passes them to the
- function provided, which must have an interface identical to
- write_case(). */
-void
-read_sort_output (struct sort_cases_pgm *scp,
- write_case_func *write_case, write_case_data wc_data)
-{
- assert ((scp->isrt != NULL) + (scp->xsrt != NULL) <= 1);
- if (scp->isrt != NULL)
- read_internal_sort_output (scp->isrt, write_case, wc_data);
- else if (scp->xsrt != NULL)
- read_external_sort_output (scp->xsrt, write_case, wc_data);
- else
- {
- /* No results. Probably an external sort that failed. */
- }
-}
-
-void
-read_internal_sort_output (struct internal_sort *isrt,
- write_case_func *write_case,
- write_case_data wc_data)
-{
- struct ccase *save_temp_case = temp_case;
- struct case_list **p;
-
- for (p = isrt->results; *p; p++)
- {
- temp_case = &(*p)->c;
- write_case (wc_data);
- }
- free (isrt->results);
-
- temp_case = save_temp_case;
-}
-
-void
-read_external_sort_output (struct external_sort *xsrt,
- write_case_func *write_case,
- write_case_data wc_data)
-{
- FILE *file;
- int file_idx;
- int i;
-
- assert (xsrt->run_cnt == 1);
- file_idx = xsrt->initial_runs[0].file_idx;
-
- file = open_temp_file (xsrt, file_idx, "rb");
- if (file == NULL)
- {
- err_failure ();
- return;
- }
-
- for (i = 0; i < vfm_source_info.ncases; i++)
- {
- if (!read_temp_file (xsrt, file_idx, file,
- temp_case, vfm_source_info.case_size))
- {
- err_failure ();
- break;
- }
-
- if (!write_case (wc_data))
- break;
- }
-}
-
-static void
-sort_source_destroy (struct case_source *source)
-{
- struct sort_cases_pgm *scp = source->aux;
-
- destroy_sort_cases_pgm (scp);
-}
-
-const struct case_source_class sort_source_class =
- {
- "SORT CASES",
- sort_source_read,
- sort_source_destroy,
- };