Actually implement the new procedure code and adapt all of its clients
[pspp-builds.git] / src / math / sort.c
index 46da0ec0dc4406b16d4b9b3c6c993b96cdf40623..aa7d2071d2724434478e85cf0af32b8b7053ffd9 100644 (file)
 
 #include "sort.h"
 
-#include <errno.h>
-#include <limits.h>
-#include <stdbool.h>
 #include <stdio.h>
-#include <stdlib.h>
 
-#include <data/case-source.h>
+#include <data/case-ordering.h>
 #include <data/case.h>
-#include <data/casefile.h>
-#include <data/fastfile.h>
-#include <data/casefile-factory.h>
-#include <data/fastfile-factory.h>
-#include <data/procedure.h>
+#include <data/casereader.h>
+#include <data/casewriter.h>
+#include <data/casewriter-provider.h>
 #include <data/settings.h>
-#include <data/variable.h>
-#include <data/storage-stream.h>
 #include <libpspp/alloc.h>
 #include <libpspp/array.h>
 #include <libpspp/assertion.h>
-#include <libpspp/message.h>
-#include <libpspp/message.h>
-#include <libpspp/misc.h>
-#include <libpspp/str.h>
-
-#include "minmax.h"
+#include <math/merge.h>
 
 #include "gettext.h"
 #define _(msgid) gettext (msgid)
 /* These should only be changed for testing purposes. */
 int min_buffers = 64;
 int max_buffers = INT_MAX;
-bool allow_internal_sort = true;
-
-static int compare_record (const struct ccase *, const struct ccase *,
-                           const struct sort_criteria *);
-static struct casefile *do_internal_sort (struct casereader *,
-                                          const struct sort_criteria *,
-                                         struct casefile_factory *
-                                         );
-static struct casefile *do_external_sort (struct casereader *,
-                                          const struct sort_criteria *,
-                                         struct casefile_factory *
-                                         );
-
-
-/* Sorts the active file in-place according to CRITERIA.
-   Returns true if successful. */
-bool
-sort_active_file_in_place (struct dataset *ds, 
-                          const struct sort_criteria *criteria) 
-{
-  struct casefile *in, *out;
-
-  proc_cancel_temporary_transformations (ds);
-  if (!procedure (ds, NULL, NULL))
-    return false;
-  
-  in = proc_capture_output (ds);
-  out = sort_execute (casefile_get_destructive_reader (in), criteria, 
-                     dataset_get_casefile_factory (ds));
-  if (out == NULL) 
-    return false;
-
-  proc_set_source (ds, storage_source_create (out));
-  return true;
-}
 
-/* Data passed to sort_to_casefile_callback(). */
-struct sort_to_casefile_cb_data 
+struct sort_writer 
   {
-    const struct sort_criteria *criteria;
-    struct casefile *output;
-    struct casefile_factory *factory ;
+    struct case_ordering *ordering;
+    struct merge *merge;
+    struct pqueue *pqueue;
+
+    struct casewriter *run;
+    casenumber run_id;
+    struct ccase run_end;
   };
 
-/* Sorts casefile CF according to the criteria in CB_DATA. */
-static bool
-sort_to_casefile_callback (const struct casefile *cf, void *cb_data_) 
-{
-  struct sort_to_casefile_cb_data *cb_data = cb_data_;
-  cb_data->output = sort_execute (casefile_get_reader (cf, NULL), 
-                                 cb_data->criteria,
-                                 cb_data->factory
-                                 );
-  return cb_data->output != NULL;
-}
+static struct casewriter_class sort_casewriter_class;
 
-/* Sorts the active file to a separate casefile.  If successful,
-   returns the sorted casefile.  Returns a null pointer on
-   failure. */
-struct casefile *
-sort_active_file_to_casefile (struct dataset *ds, 
-                             const struct sort_criteria *criteria) 
+static struct pqueue *pqueue_create (const struct case_ordering *);
+static void pqueue_destroy (struct pqueue *);
+static bool pqueue_is_full (const struct pqueue *);
+static bool pqueue_is_empty (const struct pqueue *);
+static void pqueue_push (struct pqueue *, struct ccase *, casenumber);
+static void pqueue_pop (struct pqueue *, struct ccase *, casenumber *);
+
+static void output_record (struct sort_writer *);
+
+struct casewriter *
+sort_create_writer (struct case_ordering *ordering) 
 {
-  struct sort_to_casefile_cb_data cb_data;
-  
-  proc_cancel_temporary_transformations (ds);
+  struct sort_writer *sort;
 
-  cb_data.criteria = criteria;
-  cb_data.output = NULL;
-  cb_data.factory = dataset_get_casefile_factory (ds);
-  if (!multipass_procedure (ds, sort_to_casefile_callback, &cb_data)) 
-    {
-      casefile_destroy (cb_data.output);
-      return NULL;
-    }
-  return cb_data.output;
-}
+  sort = xmalloc (sizeof *sort);
+  sort->ordering = case_ordering_clone (ordering);
+  sort->merge = merge_create (ordering);
+  sort->pqueue = pqueue_create (ordering);
+  sort->run = NULL;
+  sort->run_id = 0;
+  case_nullify (&sort->run_end);
 
+  case_ordering_destroy (ordering);
 
-/* Reads all the cases from READER, which is destroyed.  Sorts
-   the cases according to CRITERIA.  Returns the sorted cases in
-   a newly created casefile, which will be created by FACTORY.
-   If FACTORY is NULL, then a local fastfile_factory will be used.
-*/
-struct casefile *
-sort_execute (struct casereader *reader,
-             const struct sort_criteria *criteria,
-             struct casefile_factory *factory
-             )
-{
-  struct casefile_factory *local_factory = NULL;
-  struct casefile *output ;
-  if ( factory == NULL )
-    factory = local_factory = fastfile_factory_create ();
+  return casewriter_create (&sort_casewriter_class, sort);
+}
 
-  output = do_internal_sort (reader, criteria, factory);
-  if (output == NULL)
-    output = do_external_sort (reader, criteria, factory);
-  casereader_destroy (reader);
+static void
+sort_casewriter_write (struct casewriter *writer UNUSED, void *sort_,
+                       struct ccase *c)
+{
+  struct sort_writer *sort = sort_;
+  bool next_run;
 
-  fastfile_factory_destroy (local_factory);
+  if (pqueue_is_full (sort->pqueue)) 
+    output_record (sort); 
 
-  return output;
+  next_run = (case_is_null (&sort->run_end)
+              || case_ordering_compare_cases (c, &sort->run_end,
+                                              sort->ordering) < 0);
+  pqueue_push (sort->pqueue, c, sort->run_id + (next_run ? 1 : 0));
 }
-\f
-/* A case and its index. */
-struct indexed_case 
-  {
-    struct ccase c;     /* Case. */
-    unsigned long idx;  /* Index to allow for stable sorting. */
-  };
 
-static int compare_indexed_cases (const void *, const void *, const void *);
+static void
+sort_casewriter_destroy (struct casewriter *writer UNUSED, void *sort_) 
+{
+  struct sort_writer *sort = sort_;
+  
+  case_ordering_destroy (sort->ordering);
+  merge_destroy (sort->merge);
+  pqueue_destroy (sort->pqueue);
+  casewriter_destroy (sort->run);
+  case_destroy (&sort->run_end);
+  free (sort);
+}
 
-/* If the data is in memory, do an internal sort and return a new
-   casefile for the data.  Otherwise, return a null pointer. */
-static struct casefile *
-do_internal_sort (struct casereader *reader,
-                  const struct sort_criteria *criteria, 
-                 struct casefile_factory *factory)
+static struct casereader *
+sort_casewriter_convert_to_reader (struct casewriter *writer, void *sort_)
 {
-  const struct casefile *src;
-  struct casefile *dst;
-  unsigned long case_cnt;
-
-  if (!allow_internal_sort)
-    return NULL;
-
-  src = casereader_get_casefile (reader);
-  if (casefile_get_case_cnt (src) > 1 && !casefile_in_core (src))
-    return NULL;
-      
-  case_cnt = casefile_get_case_cnt (src);
-  dst = factory->create_casefile (factory, casefile_get_value_cnt (src));
-  if (case_cnt != 0) 
+  struct sort_writer *sort = sort_;
+  struct casereader *output;
+
+  if (sort->run == NULL && sort->run_id == 0) 
     {
-      struct indexed_case *cases = nmalloc (sizeof *cases, case_cnt);
-      if (cases != NULL) 
-        {
-          unsigned long i;
-          
-          for (i = 0; i < case_cnt; i++)
-            {
-              bool ok = casereader_read_xfer (reader, &cases[i].c);
-              if (!ok)
-                NOT_REACHED ();
-              cases[i].idx = i;
-            }
-
-          sort (cases, case_cnt, sizeof *cases, compare_indexed_cases,
-                (void *) criteria);
-      
-          for (i = 0; i < case_cnt; i++)
-            casefile_append_xfer (dst, &cases[i].c);
-          if (casefile_error (dst))
-            NOT_REACHED ();
-
-          free (cases);
-        }
-      else 
-        {
-          /* Failure. */
-          casefile_destroy (dst);
-          dst = NULL;
-        }
+      /* In-core sort. */
+      sort->run = mem_writer_create (case_ordering_get_value_cnt (
+                                       sort->ordering));
+      sort->run_id = 1; 
     }
+  while (!pqueue_is_empty (sort->pqueue))
+    output_record (sort);
 
-  return dst;
-}
+  merge_append (sort->merge, casewriter_make_reader (sort->run));
+  sort->run = NULL;
 
-/* Compares the variables specified by CRITERIA between the cases
-   at A and B, with a "last resort" comparison for stability, and
-   returns a strcmp()-type result. */
-static int
-compare_indexed_cases (const void *a_, const void *b_, const void *criteria_)
-{
-  const struct sort_criteria *criteria = criteria_;
-  const struct indexed_case *a = a_;
-  const struct indexed_case *b = b_;
-  int result = compare_record (&a->c, &b->c, criteria);
-  if (result == 0)
-    result = a->idx < b->idx ? -1 : a->idx > b->idx;
-  return result;
+  output = merge_make_reader (sort->merge);
+  sort_casewriter_destroy (writer, sort);
+  return output;
 }
-\f
-/* External sort. */
 
-/* Maximum order of merge (external sort only).  The maximum
-   reasonable value is about 7.  Above that, it would be a good
-   idea to use a heap in merge_once() to select the minimum. */
-#define MAX_MERGE_ORDER 7
+static void
+output_record (struct sort_writer *sort)
+{
+  struct ccase min_case;
+  casenumber min_run_id;
 
-/* Results of an external sort. */
-struct external_sort 
-  {
-    const struct sort_criteria *criteria; /* Sort criteria. */
-    size_t value_cnt;                 /* Size of data in `union value's. */
-    struct casefile **runs;           /* Array of initial runs. */
-    size_t run_cnt, run_cap;          /* Number of runs, allocated capacity. */
-    struct casefile_factory *factory; /* Factory used to  create the result */
-  };
+  pqueue_pop (sort->pqueue, &min_case, &min_run_id);
+#if 0
+  printf ("\toutput: %f to run %d\n", case_num_idx (&min_case, 0), min_run_id);
+#endif
 
-/* Prototypes for helper functions. */
-static int write_runs (struct external_sort *, struct casereader *);
-static struct casefile *merge (struct external_sort *);
-static void destroy_external_sort (struct external_sort *);
-
-/* Performs a stable external sort of the active file according
-   to the specification in SCP.  Forms initial runs using a heap
-   as a reservoir.  Merges the initial runs according to a
-   pattern that assures stability. */
-static struct casefile *
-do_external_sort (struct casereader *reader,
-                  const struct sort_criteria *criteria,
-                 struct casefile_factory *factory
-                 )
-{
-  struct external_sort *xsrt;
-
-  if (!casefile_to_disk (casereader_get_casefile (reader)))
-    return NULL;
-
-  xsrt = xmalloc (sizeof *xsrt);
-  xsrt->criteria = criteria;
-  xsrt->value_cnt = casefile_get_value_cnt (casereader_get_casefile (reader));
-  xsrt->run_cap = 512;
-  xsrt->run_cnt = 0;
-  xsrt->runs = xnmalloc (xsrt->run_cap, sizeof *xsrt->runs);
-  xsrt->factory = factory;
-  if (write_runs (xsrt, reader))
+  if (sort->run_id != min_run_id && sort->run != NULL) 
     {
-      struct casefile *output = merge (xsrt);
-      destroy_external_sort (xsrt);
-      return output;
+      merge_append (sort->merge, casewriter_make_reader (sort->run));
+      sort->run = NULL; 
     }
-  else
+  if (sort->run == NULL) 
     {
-      destroy_external_sort (xsrt);
-      return NULL;
+      sort->run = tmpfile_writer_create (case_ordering_get_value_cnt (
+                                           sort->ordering));
+      sort->run_id = min_run_id;
     }
+
+  case_destroy (&sort->run_end);
+  case_clone (&sort->run_end, &min_case);
+  
+  casewriter_write (sort->run, &min_case);
 }
 
-/* Destroys XSRT. */
-static void
-destroy_external_sort (struct external_sort *xsrt) 
+static struct casewriter_class sort_casewriter_class = 
+  {
+    sort_casewriter_write,
+    sort_casewriter_destroy,
+    sort_casewriter_convert_to_reader,
+  };
+\f
+/* Reads all the cases from INPUT.  Sorts the cases according to
+   ORDERING.  Returns the sorted cases in a new casereader, or a
+   null pointer if an I/O error occurs.  Both INPUT and ORDERING
+   are destroyed upon return, regardless of success. */
+struct casereader *
+sort_execute (struct casereader *input, struct case_ordering *ordering)
 {
-  if (xsrt != NULL) 
-    {
-      int i;
-      
-      for (i = 0; i < xsrt->run_cnt; i++)
-        casefile_destroy (xsrt->runs[i]);
-      free (xsrt->runs);
-      free (xsrt);
-    }
+  struct casewriter *output = sort_create_writer (ordering);
+  casereader_transfer (input, output);
+  return casewriter_make_reader (output);
 }
 \f
-/* Replacement selection. */
-
-/* Pairs a record with a run number. */
-struct record_run
+struct pqueue 
   {
-    int run;                    /* Run number of case. */
-    struct ccase record;        /* Case data. */
-    size_t idx;                 /* Case number (for stability). */
+    struct case_ordering *ordering;
+    struct pqueue_record *records;
+    size_t record_cnt;
+    size_t record_cap;
+    casenumber idx;
   };
 
-/* Represents a set of initial runs during an external sort. */
-struct initial_run_state 
+struct pqueue_record
   {
-    struct external_sort *xsrt;
-
-    /* Reservoir. */
-    struct record_run *records; /* Records arranged as a heap. */
-    size_t record_cnt;          /* Current number of records. */
-    size_t record_cap;          /* Capacity for records. */
-    
-    /* Run currently being output. */
-    int run;                    /* Run number. */
-    size_t case_cnt;            /* Number of cases so far. */
-    struct casefile *casefile;  /* Output file. */
-    struct ccase last_output;   /* Record last output. */
-
-    int okay;                   /* Zero if an error has been encountered. */
+    casenumber id;
+    struct ccase c;
+    casenumber idx;
   };
 
-static bool destroy_initial_run_state (struct initial_run_state *);
-static void process_case (struct initial_run_state *, 
-                         const struct ccase *, size_t);
-static int allocate_cases (struct initial_run_state *);
-static void output_record (struct initial_run_state *);
-static void start_run (struct initial_run_state *);
-static void end_run (struct initial_run_state *);
-static int compare_record_run (const struct record_run *,
-                               const struct record_run *,
-                               const struct initial_run_state *);
-static int compare_record_run_minheap (const void *, const void *, 
-                                      const void *);
-
-/* Reads cases from READER and composes initial runs in XSRT. */
-static int
-write_runs (struct external_sort *xsrt, struct casereader *reader)
-{
-  struct initial_run_state *irs;
-  struct ccase c;
-  size_t idx = 0;
-  int success = 0;
-
-  /* Allocate memory for cases. */
-  irs = xmalloc (sizeof *irs);
-  irs->xsrt = xsrt;
-  irs->records = NULL;
-  irs->record_cnt = irs->record_cap = 0;
-  irs->run = 0;
-  irs->case_cnt = 0;
-  irs->casefile = NULL;
-  case_nullify (&irs->last_output);
-  irs->okay = 1;
-  if (!allocate_cases (irs)) 
-    goto done;
-
-  /* Create initial runs. */
-  start_run (irs);
-  for (; irs->okay && casereader_read (reader, &c); case_destroy (&c))
-    process_case (irs, &c, idx++);
-  while (irs->okay && irs->record_cnt > 0)
-    output_record (irs);
-  end_run (irs);
-
-  success = irs->okay;
-
- done:
-  if (!destroy_initial_run_state (irs))
-    success = false;
-
-  return success;
-}
-
-/* Add a single case to an initial run. */
-static void
-process_case (struct initial_run_state *irs, const struct ccase *c, 
-             size_t idx)
-{
-  struct record_run *rr;
-
-  /* Compose record_run for this run and add to heap. */
-  assert (irs->record_cnt < irs->record_cap - 1);
-  rr = irs->records + irs->record_cnt++;
-  case_copy (&rr->record, 0, c, 0, irs->xsrt->value_cnt);
-  rr->run = irs->run;
-  rr->idx = idx;
-  if (!case_is_null (&irs->last_output)
-      && compare_record (c, &irs->last_output, irs->xsrt->criteria) < 0)
-    rr->run = irs->run + 1;
-  push_heap (irs->records, irs->record_cnt, sizeof *irs->records,
-             compare_record_run_minheap, irs);
-
-  /* Output a record if the reservoir is full. */
-  if (irs->record_cnt == irs->record_cap - 1 && irs->okay)
-    output_record (irs);
-}
+static int compare_pqueue_records_minheap (const void *a, const void *b,
+                                           const void *pq_);
 
-/* Destroys the initial run state represented by IRS.
-   Returns true if successful, false if an I/O error occurred. */
-static bool
-destroy_initial_run_state (struct initial_run_state *irs) 
+static struct pqueue *
+pqueue_create (const struct case_ordering *ordering) 
 {
-  int i;
-  bool ok = true;
-
-  if (irs == NULL)
-    return true;
-
-  for (i = 0; i < irs->record_cap; i++)
-    case_destroy (&irs->records[i].record);
-  free (irs->records);
-
-  if (irs->casefile != NULL)
-    ok = casefile_sleep (irs->casefile);
-
-  free (irs);
-  return ok;
+  struct pqueue *pq;
+
+  pq = xmalloc (sizeof *pq);
+  pq->ordering = case_ordering_clone (ordering);
+  pq->record_cap
+    = get_workspace_cases (case_ordering_get_value_cnt (ordering));
+  if (pq->record_cap > max_buffers)
+    pq->record_cap = max_buffers;
+  else if (pq->record_cap < min_buffers)
+    pq->record_cap = min_buffers;
+  pq->record_cnt = 0;
+  pq->records = xnmalloc (pq->record_cap, sizeof *pq->records);
+  pq->idx = 0;
+
+  return pq; 
 }
 
-/* Allocates room for lots of cases as a buffer. */
-static int
-allocate_cases (struct initial_run_state *irs)
-{
-  int approx_case_cost; /* Approximate memory cost of one case in bytes. */
-  int max_cases;        /* Maximum number of cases to allocate. */
-  int i;
-
-  /* Allocate as many cases as we can within the workspace
-     limit. */
-  approx_case_cost = (sizeof *irs->records
-                      + irs->xsrt->value_cnt * sizeof (union value)
-                      + 4 * sizeof (void *));
-  max_cases = get_workspace() / approx_case_cost;
-  if (max_cases > max_buffers)
-    max_cases = max_buffers;
-  irs->records = nmalloc (sizeof *irs->records, max_cases);
-  if (irs->records != NULL)
-    for (i = 0; i < max_cases; i++)
-      if (!case_try_create (&irs->records[i].record, irs->xsrt->value_cnt))
-        {
-          max_cases = i;
-          break;
-        }
-  irs->record_cap = max_cases;
-
-  /* Fail if we didn't allocate an acceptable number of cases. */
-  if (irs->records == NULL || max_cases < min_buffers)
-    {
-      msg (SE, _("Out of memory.  Could not allocate room for minimum of %d "
-                "cases of %d bytes each.  (PSPP workspace is currently "
-                "restricted to a maximum of %lu KB.)"),
-          min_buffers, approx_case_cost,
-           (unsigned long int) (get_workspace() / 1024));
-      return 0;
-    }
-  return 1;
-}
-
-/* Compares the VAR_CNT variables in VARS[] between the `value's at
-   A and B, and returns a strcmp()-type result. */
-static int
-compare_record (const struct ccase *a, const struct ccase *b,
-                const struct sort_criteria *criteria)
+static void
+pqueue_destroy (struct pqueue *pq) 
 {
-  int i;
-
-  assert (a != NULL);
-  assert (b != NULL);
-  
-  for (i = 0; i < criteria->crit_cnt; i++)
+  if (pq != NULL) 
     {
-      const struct sort_criterion *c = &criteria->crits[i];
-      int result;
-      
-      if (c->width == 0)
+      while (!pqueue_is_empty (pq)) 
         {
-          double af = case_num_idx (a, c->fv);
-          double bf = case_num_idx (b, c->fv);
-          
-          result = af < bf ? -1 : af > bf;
+          struct ccase c;
+          casenumber id;
+          pqueue_pop (pq, &c, &id);
+          case_destroy (&c);
         }
-      else
-        result = memcmp (case_str_idx (a, c->fv),
-                         case_str_idx (b, c->fv), c->width);
-
-      if (result != 0)
-        return c->dir == SRT_ASCEND ? result : -result;
+      case_ordering_destroy (pq->ordering);
+      free (pq->records);
+      free (pq);
     }
-
-  return 0;
 }
 
-/* Compares record-run tuples A and B on run number first, then
-   on record, then on case index. */
-static int
-compare_record_run (const struct record_run *a,
-                    const struct record_run *b,
-                    const struct initial_run_state *irs)
+static bool
+pqueue_is_full (const struct pqueue *pq) 
 {
-  int result = a->run < b->run ? -1 : a->run > b->run;
-  if (result == 0)
-    result = compare_record (&a->record, &b->record, irs->xsrt->criteria);
-  if (result == 0)
-    result = a->idx < b->idx ? -1 : a->idx > b->idx;
-  return result;
+  return pq->record_cnt >= pq->record_cap;
 }
 
-/* Compares record-run tuples A and B on run number first, then
-   on the current record according to SCP, but in descending
-   order. */
-static int
-compare_record_run_minheap (const void *a, const void *b, const void *irs) 
+static bool
+pqueue_is_empty (const struct pqueue *pq) 
 {
-  return -compare_record_run (a, b, irs);
+  return pq->record_cnt == 0;
 }
 
-/* Begins a new initial run, specifically its output file. */
 static void
-start_run (struct initial_run_state *irs)
+pqueue_push (struct pqueue *pq, struct ccase *c, casenumber id) 
 {
-  irs->run++;
-  irs->case_cnt = 0;
-
-  /* This casefile is internal to the sort, so don't use the factory
-     to create it. */
-  irs->casefile = fastfile_create (irs->xsrt->value_cnt);
-  casefile_to_disk (irs->casefile);
-  case_nullify (&irs->last_output); 
-}
+  struct pqueue_record *r;
+  
+  assert (!pqueue_is_full (pq));
 
-/* Ends the current initial run.  */
-static void
-end_run (struct initial_run_state *irs)
-{
-  struct external_sort *xsrt = irs->xsrt;
+  r = &pq->records[pq->record_cnt++];
+  r->id = id;
+  case_move (&r->c, c);
+  r->idx = pq->idx++;
 
-  /* Record initial run. */
-  if (irs->casefile != NULL) 
-    {
-      casefile_sleep (irs->casefile);
-      if (xsrt->run_cnt >= xsrt->run_cap) 
-        {
-          xsrt->run_cap *= 2;
-          xsrt->runs = xnrealloc (xsrt->runs,
-                                  xsrt->run_cap, sizeof *xsrt->runs);
-        }
-      xsrt->runs[xsrt->run_cnt++] = irs->casefile;
-      if (casefile_error (irs->casefile))
-        irs->okay = false;
-      irs->casefile = NULL; 
-    }
+  push_heap (pq->records, pq->record_cnt, sizeof *pq->records,
+             compare_pqueue_records_minheap, pq);
 }
 
-/* Writes a record to the current initial run. */
 static void
-output_record (struct initial_run_state *irs)
+pqueue_pop (struct pqueue *pq, struct ccase *c, casenumber *id) 
 {
-  struct record_run *record_run;
-  struct ccase case_tmp;
-  
-  /* Extract minimum case from heap. */
-  assert (irs->record_cnt > 0);
-  pop_heap (irs->records, irs->record_cnt--, sizeof *irs->records,
-            compare_record_run_minheap, irs);
-  record_run = irs->records + irs->record_cnt;
-
-  /* Bail if an error has occurred. */
-  if (!irs->okay)
-    return;
-
-  /* Start new run if necessary. */
-  assert (record_run->run == irs->run
-          || record_run->run == irs->run + 1);
-  if (record_run->run != irs->run)
-    {
-      end_run (irs);
-      start_run (irs);
-    }
-  assert (record_run->run == irs->run);
-  irs->case_cnt++;
+  struct pqueue_record *r;
 
-  /* Write to disk. */
-  if (irs->casefile != NULL)
-    casefile_append (irs->casefile, &record_run->record);
-
-  /* This record becomes last_output. */
-  irs->last_output = case_tmp = record_run->record;
-  record_run->record = irs->records[irs->record_cap - 1].record;
-  irs->records[irs->record_cap - 1].record = case_tmp;
-}
-\f
-/* Merging. */
+  assert (!pqueue_is_empty (pq));
 
-static int choose_merge (struct casefile *runs[], int run_cnt, int order);
-static struct casefile *merge_once (struct external_sort *,
-                                    struct casefile *[], size_t);
+  pop_heap (pq->records, pq->record_cnt--, sizeof *pq->records,
+            compare_pqueue_records_minheap, pq);
 
-/* Repeatedly merges run until only one is left,
-   and returns the final casefile.
-   Returns a null pointer if an I/O error occurs. */
-static struct casefile *
-merge (struct external_sort *xsrt)
-{
-  while (xsrt->run_cnt > 1)
-    {
-      int order = MIN (MAX_MERGE_ORDER, xsrt->run_cnt);
-      int idx = choose_merge (xsrt->runs, xsrt->run_cnt, order);
-      xsrt->runs[idx] = merge_once (xsrt, xsrt->runs + idx, order);
-      remove_range (xsrt->runs, xsrt->run_cnt, sizeof *xsrt->runs,
-                    idx + 1, order - 1);
-      xsrt->run_cnt -= order - 1;
-
-      if (xsrt->runs[idx] == NULL)
-        return NULL;
-    }
-  assert (xsrt->run_cnt == 1);
-  xsrt->run_cnt = 0;
-  return xsrt->runs[0];
+  r = &pq->records[pq->record_cnt];
+  *id = r->id;
+  case_move (c, &r->c);
 }
 
-/* Chooses ORDER runs out of the RUN_CNT runs in RUNS to merge,
-   and returns the index of the first one.
-
-   For stability, we must merge only consecutive runs.  For
-   efficiency, we choose the shortest consecutive sequence of
-   runs. */
+/* Compares record-run tuples A and B on id, then on case data,
+   then on insertion order, in descending order. */
 static int
-choose_merge (struct casefile *runs[], int run_cnt, int order) 
+compare_pqueue_records_minheap (const void *a_, const void *b_,
+                                const void *pq_) 
 {
-  int min_idx, min_sum;
-  int cur_idx, cur_sum;
-  int i;
-
-  /* Sum up the length of the first ORDER runs. */
-  cur_sum = 0;
-  for (i = 0; i < order; i++)
-    cur_sum += casefile_get_case_cnt (runs[i]);
-
-  /* Find the shortest group of ORDER runs,
-     using a running total for efficiency. */
-  min_idx = 0;
-  min_sum = cur_sum;
-  for (cur_idx = 1; cur_idx + order <= run_cnt; cur_idx++)
-    {
-      cur_sum -= casefile_get_case_cnt (runs[cur_idx - 1]);
-      cur_sum += casefile_get_case_cnt (runs[cur_idx + order - 1]);
-      if (cur_sum < min_sum)
-        {
-          min_sum = cur_sum;
-          min_idx = cur_idx;
-        }
-    }
-
-  return min_idx;
-}
-
-/* Merges the RUN_CNT initial runs specified in INPUT_FILES into a
-   new run, and returns the new run.
-   Returns a null pointer if an I/O error occurs. */
-static struct casefile *
-merge_once (struct external_sort *xsrt,
-            struct casefile **const input_files,
-            size_t run_cnt)
-{
-  struct run
-    {
-      struct casefile *file;
-      struct casereader *reader;
-      struct ccase ccase;
-    }
-  *runs;
-
-  struct casefile *output = NULL;
-  int i;
-
-  /* Open input files. */
-  runs = xnmalloc (run_cnt, sizeof *runs);
-  for (i = 0; i < run_cnt; i++) 
-    {
-      struct run *r = &runs[i];
-      r->file = input_files[i];
-      r->reader = casefile_get_destructive_reader (r->file);
-      if (!casereader_read_xfer (r->reader, &r->ccase))
-        {
-          run_cnt--;
-          i--;
-        }
-    }
-
-  /* Create output file. */
-  output = xsrt->factory->create_casefile (xsrt->factory, xsrt->value_cnt);
-  casefile_to_disk (output);
-
-  /* Merge. */
-  while (run_cnt > 0) 
-    {
-      struct run *min_run, *run;
-      
-      /* Find minimum. */
-      min_run = runs;
-      for (run = runs + 1; run < runs + run_cnt; run++)
-       if (compare_record (&run->ccase, &min_run->ccase, xsrt->criteria) < 0)
-          min_run = run;
-
-      /* Write minimum to output file. */
-      casefile_append_xfer (output, &min_run->ccase);
-
-      /* Read another case from minimum run. */
-      if (!casereader_read_xfer (min_run->reader, &min_run->ccase))
-        {
-          if (casefile_error (min_run->file) || casefile_error (output))
-            goto error;
-          casereader_destroy (min_run->reader);
-          casefile_destroy (min_run->file);
-
-          remove_element (runs, run_cnt, sizeof *runs, min_run - runs);
-          run_cnt--;
-        } 
-    }
-
-  if (!casefile_sleep (output))
-    goto error;
-  free (runs);
-
-  return output;
-
- error:
-  for (i = 0; i < run_cnt; i++) 
-    casefile_destroy (runs[i].file);
-  casefile_destroy (output);
-  free (runs);
-  return NULL;
+  const struct pqueue_record *a = a_;
+  const struct pqueue_record *b = b_;
+  const struct pqueue *pq = pq_;
+  int result = a->id < b->id ? -1 : a->id > b->id;
+  if (result == 0)
+    result = case_ordering_compare_cases (&a->c, &b->c, pq->ordering);
+  if (result == 0)
+    result = a->idx < b->idx ? -1 : a->idx > b->idx;
+  return -result;
 }