a16055f971725953ade58241b1f8f8ea67e30bb9
[pspp-builds.git] / src / sort.c
1 /* PSPP - computes sample statistics.
2    Copyright (C) 1997-9, 2000 Free Software Foundation, Inc.
3    Written by Ben Pfaff <blp@gnu.org>.
4
5    This program is free software; you can redistribute it and/or
6    modify it under the terms of the GNU General Public License as
7    published by the Free Software Foundation; either version 2 of the
8    License, or (at your option) any later version.
9
10    This program is distributed in the hope that it will be useful, but
11    WITHOUT ANY WARRANTY; without even the implied warranty of
12    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13    General Public License for more details.
14
15    You should have received a copy of the GNU General Public License
16    along with this program; if not, write to the Free Software
17    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
18    02110-1301, USA. */
19
20 #include <config.h>
21 #include "sort.h"
22 #include "error.h"
23 #include <limits.h>
24 #include <stdio.h>
25 #include <stdlib.h>
26 #include <errno.h>
27 #include "algorithm.h"
28 #include "alloc.h"
29 #include "bool.h"
30 #include "case.h"
31 #include "casefile.h"
32 #include "command.h"
33 #include "error.h"
34 #include "expressions/public.h"
35 #include "glob.h"
36 #include "lexer.h"
37 #include "misc.h"
38 #include "settings.h"
39 #include "str.h"
40 #include "var.h"
41 #include "vfm.h"
42 #include "vfmP.h"
43
44 #include "debug-print.h"
45
46 /* Sort direction. */
47 enum sort_direction
48   {
49     SRT_ASCEND,                 /* A, B, C, ..., X, Y, Z. */
50     SRT_DESCEND                 /* Z, Y, X, ..., C, B, A. */
51   };
52
53 /* A sort criterion. */
54 struct sort_criterion
55   {
56     int fv;                     /* Variable data index. */
57     int width;                  /* 0=numeric, otherwise string widthe. */
58     enum sort_direction dir;    /* Sort direction. */
59   };
60
61 /* A set of sort criteria. */
62 struct sort_criteria 
63   {
64     struct sort_criterion *crits;
65     size_t crit_cnt;
66   };
67
68 /* These should only be changed for testing purposes. */
69 static int min_buffers = 64;
70 static int max_buffers = INT_MAX;
71 static bool allow_internal_sort = true;
72
73 static int compare_record (const struct ccase *, const struct ccase *,
74                            const struct sort_criteria *);
75 static struct casefile *do_internal_sort (struct casereader *,
76                                           const struct sort_criteria *);
77 static struct casefile *do_external_sort (struct casereader *,
78                                           const struct sort_criteria *);
79
80 /* Performs the SORT CASES procedures. */
81 int
82 cmd_sort_cases (void)
83 {
84   struct sort_criteria *criteria;
85   bool success = false;
86
87   lex_match (T_BY);
88
89   criteria = sort_parse_criteria (default_dict, NULL, NULL, NULL);
90   if (criteria == NULL)
91     return CMD_FAILURE;
92
93   if (test_mode && lex_match ('/')) 
94     {
95       if (!lex_force_match_id ("BUFFERS") || !lex_match ('=')
96           || !lex_force_int ())
97         goto done;
98
99       min_buffers = max_buffers = lex_integer ();
100       allow_internal_sort = false;
101       if (max_buffers < 2) 
102         {
103           msg (SE, _("Buffer limit must be at least 2."));
104           goto done;
105         }
106
107       lex_get ();
108     }
109
110   success = sort_active_file_in_place (criteria);
111
112  done:
113   min_buffers = 64;
114   max_buffers = INT_MAX;
115   allow_internal_sort = true;
116   
117   sort_destroy_criteria (criteria);
118   return success ? lex_end_of_command () : CMD_FAILURE;
119 }
120
121 /* Gets ready to sort the active file, either in-place or to a
122    separate casefile. */
123 static void
124 prepare_to_sort_active_file (void) 
125 {
126   /* Cancel temporary transformations and PROCESS IF. */
127   if (temporary != 0)
128     cancel_temporary (); 
129   expr_free (process_if_expr);
130   process_if_expr = NULL;
131
132   /* Make sure source cases are in a storage source. */
133   procedure (NULL, NULL);
134   assert (case_source_is_class (vfm_source, &storage_source_class));
135 }
136
137 /* Sorts the active file in-place according to CRITERIA.
138    Returns nonzero if successful. */
139 int
140 sort_active_file_in_place (const struct sort_criteria *criteria) 
141 {
142   struct casefile *src, *dst;
143   
144   prepare_to_sort_active_file ();
145
146   src = storage_source_get_casefile (vfm_source);
147   dst = sort_execute (casefile_get_destructive_reader (src), criteria);
148   free_case_source (vfm_source);
149   vfm_source = NULL;
150
151   if (dst == NULL) 
152     return 0;
153
154   vfm_source = storage_source_create (dst);
155   return 1;
156 }
157
158 /* Sorts the active file to a separate casefile.  If successful,
159    returns the sorted casefile.  Returns a null pointer on
160    failure. */
161 struct casefile *
162 sort_active_file_to_casefile (const struct sort_criteria *criteria) 
163 {
164   struct casefile *src;
165   
166   prepare_to_sort_active_file ();
167
168   src = storage_source_get_casefile (vfm_source);
169   return sort_execute (casefile_get_reader (src), criteria);
170 }
171
172 /* Parses a list of sort keys and returns a struct sort_criteria
173    based on it.  Returns a null pointer on error.
174    If SAW_DIRECTION is nonnull, sets *SAW_DIRECTION to true if at
175    least one parenthesized sort direction was specified, false
176    otherwise. */
177 struct sort_criteria *
178 sort_parse_criteria (const struct dictionary *dict,
179                      struct variable ***vars, int *var_cnt,
180                      bool *saw_direction)
181 {
182   struct sort_criteria *criteria;
183   struct variable **local_vars = NULL;
184   size_t local_var_cnt;
185
186   assert ((vars == NULL) == (var_cnt == NULL));
187   if (vars == NULL) 
188     {
189       vars = &local_vars;
190       var_cnt = &local_var_cnt;
191     }
192
193   criteria = xmalloc (sizeof *criteria);
194   criteria->crits = NULL;
195   criteria->crit_cnt = 0;
196
197   *vars = NULL;
198   *var_cnt = 0;
199   if (saw_direction != NULL)
200     *saw_direction = false;
201
202   do
203     {
204       int prev_var_cnt = *var_cnt;
205       enum sort_direction direction;
206
207       /* Variables. */
208       if (!parse_variables (dict, vars, var_cnt,
209                             PV_NO_DUPLICATE | PV_APPEND | PV_NO_SCRATCH))
210         goto error;
211
212       /* Sort direction. */
213       if (lex_match ('('))
214         {
215           if (lex_match_id ("D") || lex_match_id ("DOWN"))
216             direction = SRT_DESCEND;
217           else if (lex_match_id ("A") || lex_match_id ("UP"))
218             direction = SRT_ASCEND;
219           else
220             {
221               msg (SE, _("`A' or `D' expected inside parentheses."));
222               goto error;
223             }
224           if (!lex_match (')'))
225             {
226               msg (SE, _("`)' expected."));
227               goto error;
228             }
229           if (saw_direction != NULL)
230             *saw_direction = true;
231         }
232       else
233         direction = SRT_ASCEND;
234
235       criteria->crits = xrealloc (criteria->crits,
236                                   sizeof *criteria->crits * *var_cnt);
237       criteria->crit_cnt = *var_cnt;
238       for (; prev_var_cnt < criteria->crit_cnt; prev_var_cnt++) 
239         {
240           struct sort_criterion *c = &criteria->crits[prev_var_cnt];
241           c->fv = (*vars)[prev_var_cnt]->fv;
242           c->width = (*vars)[prev_var_cnt]->width;
243           c->dir = direction;
244         }
245     }
246   while (token != '.' && token != '/');
247
248   free (local_vars);
249   return criteria;
250
251  error:
252   free (local_vars);
253   sort_destroy_criteria (criteria);
254   return NULL;
255 }
256
257 /* Destroys a SORT CASES program. */
258 void
259 sort_destroy_criteria (struct sort_criteria *criteria) 
260 {
261   if (criteria != NULL) 
262     {
263       free (criteria->crits);
264       free (criteria);
265     }
266 }
267
268 /* Reads all the cases from READER, which is destroyed.  Sorts
269    the cases according to CRITERIA.  Returns the sorted cases in
270    a newly created casefile. */
271 struct casefile *
272 sort_execute (struct casereader *reader, const struct sort_criteria *criteria)
273 {
274   struct casefile *output = do_internal_sort (reader, criteria);
275   if (output == NULL)
276     output = do_external_sort (reader, criteria);
277   casereader_destroy (reader);
278   return output;
279 }
280 \f
281 /* A case and its index. */
282 struct indexed_case 
283   {
284     struct ccase c;     /* Case. */
285     unsigned long idx;  /* Index to allow for stable sorting. */
286   };
287
288 static int compare_indexed_cases (const void *, const void *, void *);
289
290 /* If the data is in memory, do an internal sort and return a new
291    casefile for the data.  Otherwise, return a null pointer. */
292 static struct casefile *
293 do_internal_sort (struct casereader *reader,
294                   const struct sort_criteria *criteria)
295 {
296   const struct casefile *src;
297   struct casefile *dst;
298   unsigned long case_cnt;
299
300   if (!allow_internal_sort)
301     return NULL;
302
303   src = casereader_get_casefile (reader);
304   if (casefile_get_case_cnt (src) > 1 && !casefile_in_core (src))
305     return NULL;
306       
307   case_cnt = casefile_get_case_cnt (src);
308   dst = casefile_create (casefile_get_value_cnt (src));
309   if (case_cnt != 0) 
310     {
311       struct indexed_case *cases = malloc (sizeof *cases * case_cnt);
312       if (cases != NULL) 
313         {
314           unsigned long i;
315           
316           for (i = 0; i < case_cnt; i++)
317             {
318               casereader_read_xfer_assert (reader, &cases[i].c);
319               cases[i].idx = i;
320             }
321
322           sort (cases, case_cnt, sizeof *cases, compare_indexed_cases,
323                 (void *) criteria);
324       
325           for (i = 0; i < case_cnt; i++)
326             casefile_append_xfer (dst, &cases[i].c);
327
328           free (cases);
329         }
330       else 
331         {
332           /* Failure. */
333           casefile_destroy (dst);
334           dst = NULL;
335         }
336     }
337
338   return dst;
339 }
340
341 /* Compares the variables specified by CRITERIA between the cases
342    at A and B, with a "last resort" comparison for stability, and
343    returns a strcmp()-type result. */
344 static int
345 compare_indexed_cases (const void *a_, const void *b_, void *criteria_)
346 {
347   struct sort_criteria *criteria = criteria_;
348   const struct indexed_case *a = a_;
349   const struct indexed_case *b = b_;
350   int result = compare_record (&a->c, &b->c, criteria);
351   if (result == 0)
352     result = a->idx < b->idx ? -1 : a->idx > b->idx;
353   return result;
354 }
355 \f
356 /* External sort. */
357
358 /* Maximum order of merge (external sort only).  The maximum
359    reasonable value is about 7.  Above that, it would be a good
360    idea to use a heap in merge_once() to select the minimum. */
361 #define MAX_MERGE_ORDER 7
362
363 /* Results of an external sort. */
364 struct external_sort 
365   {
366     const struct sort_criteria *criteria; /* Sort criteria. */
367     size_t value_cnt;                 /* Size of data in `union value's. */
368     struct casefile **runs;           /* Array of initial runs. */
369     size_t run_cnt, run_cap;          /* Number of runs, allocated capacity. */
370   };
371
372 /* Prototypes for helper functions. */
373 static int write_runs (struct external_sort *, struct casereader *);
374 static struct casefile *merge (struct external_sort *);
375 static void destroy_external_sort (struct external_sort *);
376
377 /* Performs a stable external sort of the active file according
378    to the specification in SCP.  Forms initial runs using a heap
379    as a reservoir.  Merges the initial runs according to a
380    pattern that assures stability. */
381 static struct casefile *
382 do_external_sort (struct casereader *reader,
383                   const struct sort_criteria *criteria)
384 {
385   struct external_sort *xsrt;
386
387   casefile_to_disk (casereader_get_casefile (reader));
388
389   xsrt = xmalloc (sizeof *xsrt);
390   xsrt->criteria = criteria;
391   xsrt->value_cnt = casefile_get_value_cnt (casereader_get_casefile (reader));
392   xsrt->run_cap = 512;
393   xsrt->run_cnt = 0;
394   xsrt->runs = xmalloc (sizeof *xsrt->runs * xsrt->run_cap);
395   if (write_runs (xsrt, reader))
396     {
397       struct casefile *output = merge (xsrt);
398       destroy_external_sort (xsrt);
399       return output;
400     }
401   else
402     {
403       destroy_external_sort (xsrt);
404       return NULL;
405     }
406 }
407
408 /* Destroys XSRT. */
409 static void
410 destroy_external_sort (struct external_sort *xsrt) 
411 {
412   if (xsrt != NULL) 
413     {
414       int i;
415       
416       for (i = 0; i < xsrt->run_cnt; i++)
417         casefile_destroy (xsrt->runs[i]);
418       free (xsrt->runs);
419       free (xsrt);
420     }
421 }
422 \f
423 /* Replacement selection. */
424
425 /* Pairs a record with a run number. */
426 struct record_run
427   {
428     int run;                    /* Run number of case. */
429     struct ccase record;        /* Case data. */
430     size_t idx;                 /* Case number (for stability). */
431   };
432
433 /* Represents a set of initial runs during an external sort. */
434 struct initial_run_state 
435   {
436     struct external_sort *xsrt;
437
438     /* Reservoir. */
439     struct record_run *records; /* Records arranged as a heap. */
440     size_t record_cnt;          /* Current number of records. */
441     size_t record_cap;          /* Capacity for records. */
442     
443     /* Run currently being output. */
444     int run;                    /* Run number. */
445     size_t case_cnt;            /* Number of cases so far. */
446     struct casefile *casefile;  /* Output file. */
447     struct ccase last_output;   /* Record last output. */
448
449     int okay;                   /* Zero if an error has been encountered. */
450   };
451
452 static const struct case_sink_class sort_sink_class;
453
454 static void destroy_initial_run_state (struct initial_run_state *);
455 static void process_case (struct initial_run_state *, const struct ccase *,
456                           size_t);
457 static int allocate_cases (struct initial_run_state *);
458 static void output_record (struct initial_run_state *);
459 static void start_run (struct initial_run_state *);
460 static void end_run (struct initial_run_state *);
461 static int compare_record_run (const struct record_run *,
462                                const struct record_run *,
463                                struct initial_run_state *);
464 static int compare_record_run_minheap (const void *, const void *, void *);
465
466 /* Reads cases from READER and composes initial runs in XSRT. */
467 static int
468 write_runs (struct external_sort *xsrt, struct casereader *reader)
469 {
470   struct initial_run_state *irs;
471   struct ccase c;
472   size_t idx = 0;
473   int success = 0;
474
475   /* Allocate memory for cases. */
476   irs = xmalloc (sizeof *irs);
477   irs->xsrt = xsrt;
478   irs->records = NULL;
479   irs->record_cnt = irs->record_cap = 0;
480   irs->run = 0;
481   irs->case_cnt = 0;
482   irs->casefile = NULL;
483   case_nullify (&irs->last_output);
484   irs->okay = 1;
485   if (!allocate_cases (irs)) 
486     goto done;
487
488   /* Create initial runs. */
489   start_run (irs);
490   for (; irs->okay && casereader_read (reader, &c); case_destroy (&c))
491     process_case (irs, &c, idx++);
492   while (irs->okay && irs->record_cnt > 0)
493     output_record (irs);
494   end_run (irs);
495
496   success = irs->okay;
497
498  done:
499   destroy_initial_run_state (irs);
500
501   return success;
502 }
503
504 /* Add a single case to an initial run. */
505 static void
506 process_case (struct initial_run_state *irs, const struct ccase *c, size_t idx)
507 {
508   struct record_run *rr;
509
510   /* Compose record_run for this run and add to heap. */
511   assert (irs->record_cnt < irs->record_cap - 1);
512   rr = irs->records + irs->record_cnt++;
513   case_copy (&rr->record, 0, c, 0, irs->xsrt->value_cnt);
514   rr->run = irs->run;
515   rr->idx = idx;
516   if (!case_is_null (&irs->last_output)
517       && compare_record (c, &irs->last_output, irs->xsrt->criteria) < 0)
518     rr->run = irs->run + 1;
519   push_heap (irs->records, irs->record_cnt, sizeof *irs->records,
520              compare_record_run_minheap, irs);
521
522   /* Output a record if the reservoir is full. */
523   if (irs->record_cnt == irs->record_cap - 1 && irs->okay)
524     output_record (irs);
525 }
526
527 /* Destroys the initial run state represented by IRS. */
528 static void
529 destroy_initial_run_state (struct initial_run_state *irs) 
530 {
531   int i;
532
533   if (irs == NULL)
534     return;
535
536   for (i = 0; i < irs->record_cap; i++)
537     case_destroy (&irs->records[i].record);
538   free (irs->records);
539
540   if (irs->casefile != NULL)
541     casefile_sleep (irs->casefile);
542
543   free (irs);
544 }
545
546 /* Allocates room for lots of cases as a buffer. */
547 static int
548 allocate_cases (struct initial_run_state *irs)
549 {
550   int approx_case_cost; /* Approximate memory cost of one case in bytes. */
551   int max_cases;        /* Maximum number of cases to allocate. */
552   int i;
553
554   /* Allocate as many cases as we can within the workspace
555      limit. */
556   approx_case_cost = (sizeof *irs->records
557                       + irs->xsrt->value_cnt * sizeof (union value)
558                       + 4 * sizeof (void *));
559   max_cases = get_max_workspace() / approx_case_cost;
560   if (max_cases > max_buffers)
561     max_cases = max_buffers;
562   irs->records = malloc (sizeof *irs->records * max_cases);
563   for (i = 0; i < max_cases; i++)
564     if (!case_try_create (&irs->records[i].record, irs->xsrt->value_cnt))
565       {
566         max_cases = i;
567         break;
568       }
569   irs->record_cap = max_cases;
570
571   /* Fail if we didn't allocate an acceptable number of cases. */
572   if (irs->records == NULL || max_cases < min_buffers)
573     {
574       msg (SE, _("Out of memory.  Could not allocate room for minimum of %d "
575                  "cases of %d bytes each.  (PSPP workspace is currently "
576                  "restricted to a maximum of %d KB.)"),
577            min_buffers, approx_case_cost, get_max_workspace() / 1024);
578       return 0;
579     }
580   return 1;
581 }
582
583 /* Compares the VAR_CNT variables in VARS[] between the `value's at
584    A and B, and returns a strcmp()-type result. */
585 static int
586 compare_record (const struct ccase *a, const struct ccase *b,
587                 const struct sort_criteria *criteria)
588 {
589   int i;
590
591   assert (a != NULL);
592   assert (b != NULL);
593   
594   for (i = 0; i < criteria->crit_cnt; i++)
595     {
596       const struct sort_criterion *c = &criteria->crits[i];
597       int result;
598       
599       if (c->width == 0)
600         {
601           double af = case_num (a, c->fv);
602           double bf = case_num (b, c->fv);
603           
604           result = af < bf ? -1 : af > bf;
605         }
606       else
607         result = memcmp (case_str (a, c->fv), case_str (b, c->fv), c->width);
608
609       if (result != 0)
610         return c->dir == SRT_ASCEND ? result : -result;
611     }
612
613   return 0;
614 }
615
616 /* Compares record-run tuples A and B on run number first, then
617    on record, then on case index. */
618 static int
619 compare_record_run (const struct record_run *a,
620                     const struct record_run *b,
621                     struct initial_run_state *irs)
622 {
623   int result = a->run < b->run ? -1 : a->run > b->run;
624   if (result == 0)
625     result = compare_record (&a->record, &b->record, irs->xsrt->criteria);
626   if (result == 0)
627     result = a->idx < b->idx ? -1 : a->idx > b->idx;
628   return result;
629 }
630
631 /* Compares record-run tuples A and B on run number first, then
632    on the current record according to SCP, but in descending
633    order. */
634 static int
635 compare_record_run_minheap (const void *a, const void *b, void *irs) 
636 {
637   return -compare_record_run (a, b, irs);
638 }
639
640 /* Begins a new initial run, specifically its output file. */
641 static void
642 start_run (struct initial_run_state *irs)
643 {
644   irs->run++;
645   irs->case_cnt = 0;
646   irs->casefile = casefile_create (irs->xsrt->value_cnt);
647   casefile_to_disk (irs->casefile);
648   case_nullify (&irs->last_output); 
649 }
650
651 /* Ends the current initial run.  */
652 static void
653 end_run (struct initial_run_state *irs)
654 {
655   struct external_sort *xsrt = irs->xsrt;
656
657   /* Record initial run. */
658   if (irs->casefile != NULL) 
659     {
660       casefile_sleep (irs->casefile);
661       if (xsrt->run_cnt >= xsrt->run_cap) 
662         {
663           xsrt->run_cap *= 2;
664           xsrt->runs = xrealloc (xsrt->runs,
665                                  sizeof *xsrt->runs * xsrt->run_cap);
666         }
667       xsrt->runs[xsrt->run_cnt++] = irs->casefile;
668       irs->casefile = NULL; 
669     }
670 }
671
672 /* Writes a record to the current initial run. */
673 static void
674 output_record (struct initial_run_state *irs)
675 {
676   struct record_run *record_run;
677   struct ccase case_tmp;
678   
679   /* Extract minimum case from heap. */
680   assert (irs->record_cnt > 0);
681   pop_heap (irs->records, irs->record_cnt--, sizeof *irs->records,
682             compare_record_run_minheap, irs);
683   record_run = irs->records + irs->record_cnt;
684
685   /* Bail if an error has occurred. */
686   if (!irs->okay)
687     return;
688
689   /* Start new run if necessary. */
690   assert (record_run->run == irs->run
691           || record_run->run == irs->run + 1);
692   if (record_run->run != irs->run)
693     {
694       end_run (irs);
695       start_run (irs);
696     }
697   assert (record_run->run == irs->run);
698   irs->case_cnt++;
699
700   /* Write to disk. */
701   if (irs->casefile != NULL)
702     casefile_append (irs->casefile, &record_run->record);
703
704   /* This record becomes last_output. */
705   irs->last_output = case_tmp = record_run->record;
706   record_run->record = irs->records[irs->record_cap - 1].record;
707   irs->records[irs->record_cap - 1].record = case_tmp;
708 }
709 \f
710 /* Merging. */
711
712 static int choose_merge (struct casefile *runs[], int run_cnt, int order);
713 static struct casefile *merge_once (struct external_sort *,
714                                     struct casefile *[], size_t);
715
716 /* Repeatedly merges run until only one is left,
717    and returns the final casefile.  */
718 static struct casefile *
719 merge (struct external_sort *xsrt)
720 {
721   while (xsrt->run_cnt > 1)
722     {
723       int order = min (MAX_MERGE_ORDER, xsrt->run_cnt);
724       int idx = choose_merge (xsrt->runs, xsrt->run_cnt, order);
725       xsrt->runs[idx] = merge_once (xsrt, xsrt->runs + idx, order);
726       remove_range (xsrt->runs, xsrt->run_cnt, sizeof *xsrt->runs,
727                     idx + 1, order - 1);
728       xsrt->run_cnt -= order - 1;
729     }
730   assert (xsrt->run_cnt == 1);
731   xsrt->run_cnt = 0;
732   return xsrt->runs[0];
733 }
734
735 /* Chooses ORDER runs out of the RUN_CNT runs in RUNS to merge,
736    and returns the index of the first one.
737
738    For stability, we must merge only consecutive runs.  For
739    efficiency, we choose the shortest consecutive sequence of
740    runs. */
741 static int
742 choose_merge (struct casefile *runs[], int run_cnt, int order) 
743 {
744   int min_idx, min_sum;
745   int cur_idx, cur_sum;
746   int i;
747
748   /* Sum up the length of the first ORDER runs. */
749   cur_sum = 0;
750   for (i = 0; i < order; i++)
751     cur_sum += casefile_get_case_cnt (runs[i]);
752
753   /* Find the shortest group of ORDER runs,
754      using a running total for efficiency. */
755   min_idx = 0;
756   min_sum = cur_sum;
757   for (cur_idx = 1; cur_idx + order <= run_cnt; cur_idx++)
758     {
759       cur_sum -= casefile_get_case_cnt (runs[cur_idx - 1]);
760       cur_sum += casefile_get_case_cnt (runs[cur_idx + order - 1]);
761       if (cur_sum < min_sum)
762         {
763           min_sum = cur_sum;
764           min_idx = cur_idx;
765         }
766     }
767
768   return min_idx;
769 }
770
771 /* Merges the RUN_CNT initial runs specified in INPUT_FILES into a
772    new run, and returns the new run. */
773 static struct casefile *
774 merge_once (struct external_sort *xsrt,
775             struct casefile **const input_files,
776             size_t run_cnt)
777 {
778   struct run
779     {
780       struct casefile *file;
781       struct casereader *reader;
782       struct ccase ccase;
783     }
784   *runs;
785
786   struct casefile *output = NULL;
787   int i;
788
789   /* Open input files. */
790   runs = xmalloc (sizeof *runs * run_cnt);
791   for (i = 0; i < run_cnt; i++) 
792     {
793       struct run *r = &runs[i];
794       r->file = input_files[i];
795       r->reader = casefile_get_destructive_reader (r->file);
796       if (!casereader_read_xfer (r->reader, &r->ccase))
797         {
798           run_cnt--;
799           i--;
800         }
801     }
802
803   /* Create output file. */
804   output = casefile_create (xsrt->value_cnt);
805   casefile_to_disk (output);
806
807   /* Merge. */
808   while (run_cnt > 0) 
809     {
810       struct run *min_run, *run;
811       
812       /* Find minimum. */
813       min_run = runs;
814       for (run = runs + 1; run < runs + run_cnt; run++)
815         if (compare_record (&run->ccase, &min_run->ccase, xsrt->criteria) < 0)
816           min_run = run;
817
818       /* Write minimum to output file. */
819       casefile_append_xfer (output, &min_run->ccase);
820
821       /* Read another case from minimum run. */
822       if (!casereader_read_xfer (min_run->reader, &min_run->ccase))
823         {
824           casereader_destroy (min_run->reader);
825           casefile_destroy (min_run->file);
826
827           remove_element (runs, run_cnt, sizeof *runs, min_run - runs);
828           run_cnt--;
829         } 
830     }
831
832   casefile_sleep (output);
833   free (runs);
834
835   return output;
836 }