Adopt use of gnulib for portability.
[pspp-builds.git] / src / sort.c
index a77c5a8298f22d387e628be51850ba00627acf19..80380e7d0ca41581396e94079e06d682b4d61b15 100644 (file)
 
    You should have received a copy of the GNU General Public License
    along with this program; if not, write to the Free Software
-   Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA
-   02111-1307, USA. */
+   Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+   02110-1301, USA. */
 
 #include <config.h>
 #include "sort.h"
 #include "error.h"
+#include <limits.h>
 #include <stdio.h>
 #include <stdlib.h>
 #include <errno.h>
 #include "algorithm.h"
 #include "alloc.h"
+#include <stdbool.h>
 #include "case.h"
 #include "casefile.h"
 #include "command.h"
 #include "error.h"
 #include "expressions/public.h"
+#include "glob.h"
 #include "lexer.h"
 #include "misc.h"
 #include "settings.h"
+#include "sort-prs.h"
 #include "str.h"
 #include "var.h"
 #include "vfm.h"
 #include "vfmP.h"
 
-#if HAVE_UNISTD_H
-#include <unistd.h>
-#endif
-
-#if HAVE_SYS_TYPES_H
-#include <sys/types.h>
-#endif
-
-#if HAVE_SYS_STAT_H
-#include <sys/stat.h>
-#endif
+#include "gettext.h"
+#define _(msgid) gettext (msgid)
 
 #include "debug-print.h"
 
-/* Sort direction. */
-enum sort_direction
-  {
-    SRT_ASCEND,                        /* A, B, C, ..., X, Y, Z. */
-    SRT_DESCEND                        /* Z, Y, X, ..., C, B, A. */
-  };
-
-/* A sort criterion. */
-struct sort_criterion
-  {
-    int fv;                     /* Variable data index. */
-    int width;                  /* 0=numeric, otherwise string widthe. */
-    enum sort_direction dir;    /* Sort direction. */
-  };
+/* These should only be changed for testing purposes. */
+static int min_buffers = 64;
+static int max_buffers = INT_MAX;
+static bool allow_internal_sort = true;
 
-/* A set of sort criteria. */
-struct sort_criteria 
-  {
-    struct sort_criterion *crits;
-    size_t crit_cnt;
-  };
-
-static int compare_case_dblptrs (const void *, const void *, void *);
 static int compare_record (const struct ccase *, const struct ccase *,
                            const struct sort_criteria *);
 static struct casefile *do_internal_sort (struct casereader *,
@@ -87,15 +64,38 @@ int
 cmd_sort_cases (void)
 {
   struct sort_criteria *criteria;
-  int success;
+  bool success = false;
 
   lex_match (T_BY);
 
-  criteria = sort_parse_criteria (default_dict, NULL, NULL);
+  criteria = sort_parse_criteria (default_dict, NULL, NULL, NULL, NULL);
   if (criteria == NULL)
     return CMD_FAILURE;
 
+  if (test_mode && lex_match ('/')) 
+    {
+      if (!lex_force_match_id ("BUFFERS") || !lex_match ('=')
+          || !lex_force_int ())
+        goto done;
+
+      min_buffers = max_buffers = lex_integer ();
+      allow_internal_sort = false;
+      if (max_buffers < 2) 
+        {
+          msg (SE, _("Buffer limit must be at least 2."));
+          goto done;
+        }
+
+      lex_get ();
+    }
+
   success = sort_active_file_in_place (criteria);
+
+ done:
+  min_buffers = 64;
+  max_buffers = INT_MAX;
+  allow_internal_sort = true;
+  
   sort_destroy_criteria (criteria);
   return success ? lex_end_of_command () : CMD_FAILURE;
 }
@@ -133,7 +133,7 @@ sort_active_file_in_place (const struct sort_criteria *criteria)
   if (dst == NULL) 
     return 0;
 
-  vfm_source = storage_source_create (dst, default_dict);
+  vfm_source = storage_source_create (dst);
   return 1;
 }
 
@@ -151,93 +151,6 @@ sort_active_file_to_casefile (const struct sort_criteria *criteria)
   return sort_execute (casefile_get_reader (src), criteria);
 }
 
-/* Parses a list of sort keys and returns a struct sort_cases_pgm
-   based on it.  Returns a null pointer on error. */
-struct sort_criteria *
-sort_parse_criteria (const struct dictionary *dict,
-                     struct variable ***vars, int *var_cnt)
-{
-  struct sort_criteria *criteria;
-  struct variable **local_vars = NULL;
-  size_t local_var_cnt;
-
-  assert ((vars == NULL) == (var_cnt == NULL));
-  if (vars == NULL) 
-    {
-      vars = &local_vars;
-      var_cnt = &local_var_cnt;
-    }
-
-  criteria = xmalloc (sizeof *criteria);
-  criteria->crits = NULL;
-  criteria->crit_cnt = 0;
-
-  *vars = NULL;
-  *var_cnt = 0;
-
-  do
-    {
-      int prev_var_cnt = *var_cnt;
-      enum sort_direction direction;
-
-      /* Variables. */
-      if (!parse_variables (dict, vars, var_cnt,
-                           PV_NO_DUPLICATE | PV_APPEND | PV_NO_SCRATCH))
-        goto error;
-
-      /* Sort direction. */
-      if (lex_match ('('))
-       {
-         if (lex_match_id ("D") || lex_match_id ("DOWN"))
-           direction = SRT_DESCEND;
-         else if (lex_match_id ("A") || lex_match_id ("UP"))
-            direction = SRT_ASCEND;
-          else
-           {
-             msg (SE, _("`A' or `D' expected inside parentheses."));
-              goto error;
-           }
-         if (!lex_match (')'))
-           {
-             msg (SE, _("`)' expected."));
-              goto error;
-           }
-       }
-      else
-        direction = SRT_ASCEND;
-
-      criteria->crits = xrealloc (criteria->crits,
-                                  sizeof *criteria->crits * *var_cnt);
-      criteria->crit_cnt = *var_cnt;
-      for (; prev_var_cnt < criteria->crit_cnt; prev_var_cnt++) 
-        {
-          struct sort_criterion *c = &criteria->crits[prev_var_cnt];
-          c->fv = (*vars)[prev_var_cnt]->fv;
-          c->width = (*vars)[prev_var_cnt]->width;
-          c->dir = direction;
-        }
-    }
-  while (token != '.' && token != '/');
-
-  free (local_vars);
-  return criteria;
-
- error:
-  free (local_vars);
-  sort_destroy_criteria (criteria);
-  return NULL;
-}
-
-/* Destroys a SORT CASES program. */
-void
-sort_destroy_criteria (struct sort_criteria *criteria) 
-{
-  if (criteria != NULL) 
-    {
-      free (criteria->crits);
-      free (criteria);
-    }
-}
 
 /* Reads all the cases from READER, which is destroyed.  Sorts
    the cases according to CRITERIA.  Returns the sorted cases in
@@ -245,122 +158,113 @@ sort_destroy_criteria (struct sort_criteria *criteria)
 struct casefile *
 sort_execute (struct casereader *reader, const struct sort_criteria *criteria)
 {
-  struct casefile *output;
-
-  output = do_internal_sort (reader, criteria);
+  struct casefile *output = do_internal_sort (reader, criteria);
   if (output == NULL)
     output = do_external_sort (reader, criteria);
   casereader_destroy (reader);
   return output;
 }
 \f
+/* A case and its index. */
+struct indexed_case 
+  {
+    struct ccase c;     /* Case. */
+    unsigned long idx;  /* Index to allow for stable sorting. */
+  };
+
+static int compare_indexed_cases (const void *, const void *, void *);
+
 /* If the data is in memory, do an internal sort and return a new
-   casefile for the data. */
+   casefile for the data.  Otherwise, return a null pointer. */
 static struct casefile *
 do_internal_sort (struct casereader *reader,
                   const struct sort_criteria *criteria)
 {
   const struct casefile *src;
   struct casefile *dst;
-  struct ccase *cases, **case_ptrs;
   unsigned long case_cnt;
 
+  if (!allow_internal_sort)
+    return NULL;
+
   src = casereader_get_casefile (reader);
   if (casefile_get_case_cnt (src) > 1 && !casefile_in_core (src))
     return NULL;
       
   case_cnt = casefile_get_case_cnt (src);
-  cases = malloc (sizeof *cases * case_cnt);
-  case_ptrs = malloc (sizeof *case_ptrs * case_cnt);
-  if ((cases != NULL && case_ptrs != NULL) || case_cnt == 0) 
+  dst = casefile_create (casefile_get_value_cnt (src));
+  if (case_cnt != 0) 
     {
-      unsigned long case_idx;
+      struct indexed_case *cases = malloc (sizeof *cases * case_cnt);
+      if (cases != NULL) 
+        {
+          unsigned long i;
+          
+          for (i = 0; i < case_cnt; i++)
+            {
+              casereader_read_xfer_assert (reader, &cases[i].c);
+              cases[i].idx = i;
+            }
+
+          sort (cases, case_cnt, sizeof *cases, compare_indexed_cases,
+                (void *) criteria);
       
-      for (case_idx = 0; case_idx < case_cnt; case_idx++) 
+          for (i = 0; i < case_cnt; i++)
+            casefile_append_xfer (dst, &cases[i].c);
+
+          free (cases);
+        }
+      else 
         {
-          int success = casereader_read_xfer (reader, &cases[case_idx]);
-          assert (success);
-          case_ptrs[case_idx] = &cases[case_idx];
+          /* Failure. */
+          casefile_destroy (dst);
+          dst = NULL;
         }
-
-      sort (case_ptrs, case_cnt, sizeof *case_ptrs, compare_case_dblptrs,
-            (void *) criteria);
-      
-      dst = casefile_create (casefile_get_value_cnt (src));
-      for (case_idx = 0; case_idx < case_cnt; case_idx++) 
-        casefile_append_xfer (dst, case_ptrs[case_idx]);
     }
-  else
-    dst = NULL;
-  
-  free (case_ptrs);
-  free (cases);
 
   return dst;
 }
 
 /* Compares the variables specified by CRITERIA between the cases
-   at A and B, and returns a strcmp()-type result. */
+   at A and B, with a "last resort" comparison for stability, and
+   returns a strcmp()-type result. */
 static int
-compare_case_dblptrs (const void *a_, const void *b_, void *criteria_)
+compare_indexed_cases (const void *a_, const void *b_, void *criteria_)
 {
   struct sort_criteria *criteria = criteria_;
-  struct ccase *const *pa = a_;
-  struct ccase *const *pb = b_;
-  struct ccase *a = *pa;
-  struct ccase *b = *pb;
-  return compare_record (a, b, criteria);
+  const struct indexed_case *a = a_;
+  const struct indexed_case *b = b_;
+  int result = compare_record (&a->c, &b->c, criteria);
+  if (result == 0)
+    result = a->idx < b->idx ? -1 : a->idx > b->idx;
+  return result;
 }
 \f
 /* External sort. */
 
-/* Maximum order of merge.  If you increase this, then you should
-   use a heap for comparing cases during merge.  */
-#define MAX_MERGE_ORDER                7
-
-/* Minimum total number of records for buffers. */
-#define MIN_BUFFER_TOTAL_SIZE_RECS     64
-
-/* Minimum single input buffer size, in bytes and records. */
-#define MIN_BUFFER_SIZE_BYTES  4096
-#define MIN_BUFFER_SIZE_RECS   16
-
-#if MIN_BUFFER_SIZE_RECS * 2 + 16 > MIN_BUFFER_TOTAL_SIZE_RECS
-#error MIN_BUFFER_SIZE_RECS and MIN_BUFFER_TOTAL_SIZE_RECS do not make sense.
-#endif
-
-/* Sorts initial runs A and B in decending order by length. */
-static int
-compare_initial_runs (const void *a_, const void *b_, void *aux UNUSED) 
-{
-  struct casefile *const *a = a_;
-  struct casefile *const *b = b_;
-  unsigned long a_case_cnt = casefile_get_case_cnt (*a);
-  unsigned long b_case_cnt = casefile_get_case_cnt (*b);
-  
-  return a_case_cnt > b_case_cnt ? -1 : a_case_cnt < b_case_cnt;
-}
+/* Maximum order of merge (external sort only).  The maximum
+   reasonable value is about 7.  Above that, it would be a good
+   idea to use a heap in merge_once() to select the minimum. */
+#define MAX_MERGE_ORDER 7
 
 /* Results of an external sort. */
 struct external_sort 
   {
     const struct sort_criteria *criteria; /* Sort criteria. */
     size_t value_cnt;                 /* Size of data in `union value's. */
-    struct casefile **initial_runs;   /* Array of initial runs. */
+    struct casefile **runs;           /* Array of initial runs. */
     size_t run_cnt, run_cap;          /* Number of runs, allocated capacity. */
   };
 
 /* Prototypes for helper functions. */
-static int write_initial_runs (struct external_sort *, struct casereader *);
-static int merge (struct external_sort *);
+static int write_runs (struct external_sort *, struct casereader *);
+static struct casefile *merge (struct external_sort *);
 static void destroy_external_sort (struct external_sort *);
 
-/* Performs an external sort of the active file according to the
-   specification in SCP.  Forms initial runs using a heap as a
-   reservoir.  Determines the optimum merge pattern via Huffman's
-   method (see Knuth vol. 3, 2nd edition, p. 365-366), and merges
-   according to that pattern. */
+/* Performs a stable external sort of the active file according
+   to the specification in SCP.  Forms initial runs using a heap
+   as a reservoir.  Merges the initial runs according to a
+   pattern that assures stability. */
 static struct casefile *
 do_external_sort (struct casereader *reader,
                   const struct sort_criteria *criteria)
@@ -374,11 +278,10 @@ do_external_sort (struct casereader *reader,
   xsrt->value_cnt = casefile_get_value_cnt (casereader_get_casefile (reader));
   xsrt->run_cap = 512;
   xsrt->run_cnt = 0;
-  xsrt->initial_runs = xmalloc (sizeof *xsrt->initial_runs * xsrt->run_cap);
-  if (write_initial_runs (xsrt, reader) && merge (xsrt))
+  xsrt->runs = xmalloc (sizeof *xsrt->runs * xsrt->run_cap);
+  if (write_runs (xsrt, reader))
     {
-      struct casefile *output = xsrt->initial_runs[0];
-      xsrt->initial_runs[0] = NULL;
+      struct casefile *output = merge (xsrt);
       destroy_external_sort (xsrt);
       return output;
     }
@@ -398,8 +301,8 @@ destroy_external_sort (struct external_sort *xsrt)
       int i;
       
       for (i = 0; i < xsrt->run_cnt; i++)
-        casefile_destroy (xsrt->initial_runs[i]);
-      free (xsrt->initial_runs);
+        casefile_destroy (xsrt->runs[i]);
+      free (xsrt->runs);
       free (xsrt);
     }
 }
@@ -411,6 +314,7 @@ struct record_run
   {
     int run;                    /* Run number of case. */
     struct ccase record;        /* Case data. */
+    size_t idx;                 /* Case number (for stability). */
   };
 
 /* Represents a set of initial runs during an external sort. */
@@ -435,7 +339,8 @@ struct initial_run_state
 static const struct case_sink_class sort_sink_class;
 
 static void destroy_initial_run_state (struct initial_run_state *);
-static void process_case (struct initial_run_state *, const struct ccase *);
+static void process_case (struct initial_run_state *, const struct ccase *,
+                          size_t);
 static int allocate_cases (struct initial_run_state *);
 static void output_record (struct initial_run_state *);
 static void start_run (struct initial_run_state *);
@@ -447,10 +352,11 @@ static int compare_record_run_minheap (const void *, const void *, void *);
 
 /* Reads cases from READER and composes initial runs in XSRT. */
 static int
-write_initial_runs (struct external_sort *xsrt, struct casereader *reader)
+write_runs (struct external_sort *xsrt, struct casereader *reader)
 {
   struct initial_run_state *irs;
   struct ccase c;
+  size_t idx = 0;
   int success = 0;
 
   /* Allocate memory for cases. */
@@ -469,7 +375,7 @@ write_initial_runs (struct external_sort *xsrt, struct casereader *reader)
   /* Create initial runs. */
   start_run (irs);
   for (; irs->okay && casereader_read (reader, &c); case_destroy (&c))
-    process_case (irs, &c);
+    process_case (irs, &c, idx++);
   while (irs->okay && irs->record_cnt > 0)
     output_record (irs);
   end_run (irs);
@@ -484,18 +390,19 @@ write_initial_runs (struct external_sort *xsrt, struct casereader *reader)
 
 /* Add a single case to an initial run. */
 static void
-process_case (struct initial_run_state *irs, const struct ccase *c)
+process_case (struct initial_run_state *irs, const struct ccase *c, size_t idx)
 {
-  struct record_run *new_record_run;
+  struct record_run *rr;
 
   /* Compose record_run for this run and add to heap. */
   assert (irs->record_cnt < irs->record_cap - 1);
-  new_record_run = irs->records + irs->record_cnt++;
-  case_copy (&new_record_run->record, 0, c, 0, irs->xsrt->value_cnt);
-  new_record_run->run = irs->run;
+  rr = irs->records + irs->record_cnt++;
+  case_copy (&rr->record, 0, c, 0, irs->xsrt->value_cnt);
+  rr->run = irs->run;
+  rr->idx = idx;
   if (!case_is_null (&irs->last_output)
       && compare_record (c, &irs->last_output, irs->xsrt->criteria) < 0)
-    new_record_run->run = irs->run + 1;
+    rr->run = irs->run + 1;
   push_heap (irs->records, irs->record_cnt, sizeof *irs->records,
              compare_record_run_minheap, irs);
 
@@ -537,6 +444,8 @@ allocate_cases (struct initial_run_state *irs)
                       + irs->xsrt->value_cnt * sizeof (union value)
                       + 4 * sizeof (void *));
   max_cases = get_max_workspace() / approx_case_cost;
+  if (max_cases > max_buffers)
+    max_cases = max_buffers;
   irs->records = malloc (sizeof *irs->records * max_cases);
   for (i = 0; i < max_cases; i++)
     if (!case_try_create (&irs->records[i].record, irs->xsrt->value_cnt))
@@ -547,12 +456,12 @@ allocate_cases (struct initial_run_state *irs)
   irs->record_cap = max_cases;
 
   /* Fail if we didn't allocate an acceptable number of cases. */
-  if (irs->records == NULL || max_cases < MIN_BUFFER_TOTAL_SIZE_RECS)
+  if (irs->records == NULL || max_cases < min_buffers)
     {
       msg (SE, _("Out of memory.  Could not allocate room for minimum of %d "
                 "cases of %d bytes each.  (PSPP workspace is currently "
                 "restricted to a maximum of %d KB.)"),
-          MIN_BUFFER_TOTAL_SIZE_RECS, approx_case_cost, get_max_workspace() / 1024);
+          min_buffers, approx_case_cost, get_max_workspace() / 1024);
       return 0;
     }
   return 1;
@@ -592,16 +501,18 @@ compare_record (const struct ccase *a, const struct ccase *b,
 }
 
 /* Compares record-run tuples A and B on run number first, then
-   on the current record according to SCP. */
+   on record, then on case index. */
 static int
 compare_record_run (const struct record_run *a,
                     const struct record_run *b,
                     struct initial_run_state *irs)
 {
-  if (a->run != b->run)
-    return a->run > b->run ? 1 : -1;
-  else
-    return compare_record (&a->record, &b->record, irs->xsrt->criteria);
+  int result = a->run < b->run ? -1 : a->run > b->run;
+  if (result == 0)
+    result = compare_record (&a->record, &b->record, irs->xsrt->criteria);
+  if (result == 0)
+    result = a->idx < b->idx ? -1 : a->idx > b->idx;
+  return result;
 }
 
 /* Compares record-run tuples A and B on run number first, then
@@ -633,14 +544,14 @@ end_run (struct initial_run_state *irs)
   /* Record initial run. */
   if (irs->casefile != NULL) 
     {
+      casefile_sleep (irs->casefile);
       if (xsrt->run_cnt >= xsrt->run_cap) 
         {
           xsrt->run_cap *= 2;
-          xsrt->initial_runs
-            = xrealloc (xsrt->initial_runs,
-                        sizeof *xsrt->initial_runs * xsrt->run_cap);
+          xsrt->runs = xrealloc (xsrt->runs,
+                                 sizeof *xsrt->runs * xsrt->run_cap);
         }
-      xsrt->initial_runs[xsrt->run_cnt++] = irs->casefile;
+      xsrt->runs[xsrt->run_cnt++] = irs->casefile;
       irs->casefile = NULL; 
     }
 }
@@ -685,193 +596,128 @@ output_record (struct initial_run_state *irs)
 \f
 /* Merging. */
 
-/* State of merging initial runs. */
-struct merge_state 
-  {
-    struct external_sort *xsrt; /* External sort state. */
-    struct ccase *cases;        /* Buffers. */
-    size_t case_cnt;            /* Number of buffers. */
-  };
-
-struct run;
-static struct casefile *merge_once (struct merge_state *,
+static int choose_merge (struct casefile *runs[], int run_cnt, int order);
+static struct casefile *merge_once (struct external_sort *,
                                     struct casefile *[], size_t);
-static int mod (int, int);
 
-/* Performs a series of P-way merges of initial runs. */
-static int
+/* Repeatedly merges run until only one is left,
+   and returns the final casefile.  */
+static struct casefile *
 merge (struct external_sort *xsrt)
 {
-  struct merge_state mrg;       /* State of merge. */
-  size_t approx_case_cost;      /* Approximate memory cost of one case. */
-  int max_order;                /* Maximum order of merge. */
-  size_t dummy_run_cnt;         /* Number of dummy runs to insert. */
-  int success = 0;
-  int i;
-
-  mrg.xsrt = xsrt;
-
-  /* Allocate as many cases as possible into cases. */
-  approx_case_cost = (sizeof *mrg.cases
-                      + xsrt->value_cnt * sizeof (union value)
-                      + 4 * sizeof (void *));
-  mrg.case_cnt = get_max_workspace() / approx_case_cost;
-  mrg.cases = malloc (sizeof *mrg.cases * mrg.case_cnt);
-  if (mrg.cases == NULL)
-    goto done;
-  for (i = 0; i < mrg.case_cnt; i++) 
-    if (!case_try_create (&mrg.cases[i], xsrt->value_cnt)) 
-      {
-        mrg.case_cnt = i;
-        break;
-      }
-  if (mrg.case_cnt < MIN_BUFFER_TOTAL_SIZE_RECS)
-    {
-      msg (SE, _("Out of memory.  Could not allocate room for minimum of %d "
-                "cases of %d bytes each.  (PSPP workspace is currently "
-                "restricted to a maximum of %d KB.)"),
-          MIN_BUFFER_TOTAL_SIZE_RECS, approx_case_cost, get_max_workspace() / 1024);
-      return 0;
-    }
-
-  /* Determine maximum order of merge. */
-  max_order = MAX_MERGE_ORDER;
-  if (mrg.case_cnt / max_order < MIN_BUFFER_SIZE_RECS)
-    max_order = mrg.case_cnt / MIN_BUFFER_SIZE_RECS;
-  else if (mrg.case_cnt / max_order * xsrt->value_cnt * sizeof (union value)
-           < MIN_BUFFER_SIZE_BYTES)
-    max_order = mrg.case_cnt / (MIN_BUFFER_SIZE_BYTES
-                                / (xsrt->value_cnt * sizeof (union value)));
-  if (max_order < 2)
-    max_order = 2;
-  if (max_order > xsrt->run_cnt)
-    max_order = xsrt->run_cnt;
-
-  /* Repeatedly merge the P shortest existing runs until only one run
-     is left. */
-  make_heap (xsrt->initial_runs, xsrt->run_cnt, sizeof *xsrt->initial_runs,
-             compare_initial_runs, NULL);
-  dummy_run_cnt = mod (1 - (int) xsrt->run_cnt, max_order - 1);
-
-  assert (max_order > 0);
-  assert (max_order <= 2
-          || (xsrt->run_cnt + dummy_run_cnt) % (max_order - 1) == 1);
   while (xsrt->run_cnt > 1)
     {
-      struct casefile *output_run;
-      int order;
-      int i;
-
-      /* Choose order of merge (max_order after first merge). */
-      order = max_order - dummy_run_cnt;
-      dummy_run_cnt = 0;
-
-      /* Choose runs to merge. */
-      assert (xsrt->run_cnt >= order);
-      for (i = 0; i < order; i++) 
-        pop_heap (xsrt->initial_runs, xsrt->run_cnt--,
-                  sizeof *xsrt->initial_runs,
-                  compare_initial_runs, NULL); 
-          
-      /* Merge runs. */
-      output_run = merge_once (&mrg,
-                               xsrt->initial_runs + xsrt->run_cnt, order);
-      if (output_run == NULL)
-        goto done;
-      
-      /* Add output run to heap. */
-      xsrt->initial_runs[xsrt->run_cnt++] = output_run;
-      push_heap (xsrt->initial_runs, xsrt->run_cnt, sizeof *xsrt->initial_runs,
-                 compare_initial_runs, NULL);
+      int order = min (MAX_MERGE_ORDER, xsrt->run_cnt);
+      int idx = choose_merge (xsrt->runs, xsrt->run_cnt, order);
+      xsrt->runs[idx] = merge_once (xsrt, xsrt->runs + idx, order);
+      remove_range (xsrt->runs, xsrt->run_cnt, sizeof *xsrt->runs,
+                    idx + 1, order - 1);
+      xsrt->run_cnt -= order - 1;
     }
-
-  /* Exactly one run is left, which contains the entire sorted
-     file.  We could use it to find a total case count. */
   assert (xsrt->run_cnt == 1);
-
-  success = 1;
-
- done:
-  for (i = 0; i < mrg.case_cnt; i++)
-    case_destroy (&mrg.cases[i]);
-  free (mrg.cases);
-
-  return success;
+  xsrt->run_cnt = 0;
+  return xsrt->runs[0];
 }
 
-/* Modulo function as defined by Knuth. */
+/* Chooses ORDER runs out of the RUN_CNT runs in RUNS to merge,
+   and returns the index of the first one.
+
+   For stability, we must merge only consecutive runs.  For
+   efficiency, we choose the shortest consecutive sequence of
+   runs. */
 static int
-mod (int x, int y)
+choose_merge (struct casefile *runs[], int run_cnt, int order) 
 {
-  if (y == 0)
-    return x;
-  else if (x == 0)
-    return 0;
-  else if (x > 0 && y > 0)
-    return x % y;
-  else if (x < 0 && y > 0)
-    return y - (-x) % y;
+  int min_idx, min_sum;
+  int cur_idx, cur_sum;
+  int i;
 
-  abort ();
+  /* Sum up the length of the first ORDER runs. */
+  cur_sum = 0;
+  for (i = 0; i < order; i++)
+    cur_sum += casefile_get_case_cnt (runs[i]);
+
+  /* Find the shortest group of ORDER runs,
+     using a running total for efficiency. */
+  min_idx = 0;
+  min_sum = cur_sum;
+  for (cur_idx = 1; cur_idx + order <= run_cnt; cur_idx++)
+    {
+      cur_sum -= casefile_get_case_cnt (runs[cur_idx - 1]);
+      cur_sum += casefile_get_case_cnt (runs[cur_idx + order - 1]);
+      if (cur_sum < min_sum)
+        {
+          min_sum = cur_sum;
+          min_idx = cur_idx;
+        }
+    }
+
+  return min_idx;
 }
 
-/* Merges the RUN_CNT initial runs specified in INPUT_RUNS into a
-   new run.  Returns nonzero only if successful.  Adds an entry
-   to MRG->xsrt->runs for the output file if and only if the
-   output file is actually created.  Always deletes all the input
-   files. */
+/* Merges the RUN_CNT initial runs specified in INPUT_FILES into a
+   new run, and returns the new run. */
 static struct casefile *
-merge_once (struct merge_state *mrg,
-            struct casefile *input_runs[],
+merge_once (struct external_sort *xsrt,
+            struct casefile **const input_files,
             size_t run_cnt)
 {
-  struct casereader *input_readers[MAX_MERGE_ORDER];
-  struct ccase input_cases[MAX_MERGE_ORDER];
-  struct casefile *output_casefile = NULL;
+  struct run
+    {
+      struct casefile *file;
+      struct casereader *reader;
+      struct ccase ccase;
+    }
+  *runs;
+
+  struct casefile *output = NULL;
   int i;
 
+  /* Open input files. */
+  runs = xmalloc (sizeof *runs * run_cnt);
   for (i = 0; i < run_cnt; i++) 
     {
-      input_readers[i] = casefile_get_destructive_reader (input_runs[i]);
-      if (!casereader_read_xfer (input_readers[i], &input_cases[i]))
+      struct run *r = &runs[i];
+      r->file = input_files[i];
+      r->reader = casefile_get_destructive_reader (r->file);
+      if (!casereader_read_xfer (r->reader, &r->ccase))
         {
           run_cnt--;
           i--;
         }
     }
-  
-  output_casefile = casefile_create (mrg->xsrt->value_cnt);
-  casefile_to_disk (output_casefile);
+
+  /* Create output file. */
+  output = casefile_create (xsrt->value_cnt);
+  casefile_to_disk (output);
 
   /* Merge. */
   while (run_cnt > 0) 
     {
-      size_t min_idx;
-
+      struct run *min_run, *run;
+      
       /* Find minimum. */
-      min_idx = 0;
-      for (i = 1; i < run_cnt; i++)
-       if (compare_record (&input_cases[i], &input_cases[min_idx],
-                            mrg->xsrt->criteria) < 0)
-          min_idx = i;
+      min_run = runs;
+      for (run = runs + 1; run < runs + run_cnt; run++)
+       if (compare_record (&run->ccase, &min_run->ccase, xsrt->criteria) < 0)
+          min_run = run;
 
       /* Write minimum to output file. */
-      casefile_append_xfer (output_casefile, &input_cases[min_idx]);
+      casefile_append_xfer (output, &min_run->ccase);
 
-      if (!casereader_read_xfer (input_readers[min_idx],
-                                 &input_cases[min_idx]))
+      /* Read another case from minimum run. */
+      if (!casereader_read_xfer (min_run->reader, &min_run->ccase))
         {
-          casereader_destroy (input_readers[min_idx]);
-          casefile_destroy (input_runs[min_idx]);
+          casereader_destroy (min_run->reader);
+          casefile_destroy (min_run->file);
 
+          remove_element (runs, run_cnt, sizeof *runs, min_run - runs);
           run_cnt--;
-          input_runs[min_idx] = input_runs[run_cnt];
-          input_readers[min_idx] = input_readers[run_cnt];
-          input_cases[min_idx] = input_cases[run_cnt];
         } 
     }
 
-  casefile_sleep (output_casefile);
+  casefile_sleep (output);
+  free (runs);
 
-  return output_casefile;
+  return output;
 }