Changed all the licence notices in all the files.
[pspp-builds.git] / src / sort.c
1 /* PSPP - computes sample statistics.
2    Copyright (C) 1997-9, 2000 Free Software Foundation, Inc.
3    Written by Ben Pfaff <blp@gnu.org>.
4
5    This program is free software; you can redistribute it and/or
6    modify it under the terms of the GNU General Public License as
7    published by the Free Software Foundation; either version 2 of the
8    License, or (at your option) any later version.
9
10    This program is distributed in the hope that it will be useful, but
11    WITHOUT ANY WARRANTY; without even the implied warranty of
12    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13    General Public License for more details.
14
15    You should have received a copy of the GNU General Public License
16    along with this program; if not, write to the Free Software
17    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
18    02110-1301, USA. */
19
20 #include <config.h>
21 #include "sort.h"
22 #include "error.h"
23 #include <limits.h>
24 #include <stdio.h>
25 #include <stdlib.h>
26 #include <errno.h>
27 #include "algorithm.h"
28 #include "alloc.h"
29 #include "bool.h"
30 #include "case.h"
31 #include "casefile.h"
32 #include "command.h"
33 #include "error.h"
34 #include "expressions/public.h"
35 #include "glob.h"
36 #include "lexer.h"
37 #include "misc.h"
38 #include "settings.h"
39 #include "str.h"
40 #include "var.h"
41 #include "vfm.h"
42 #include "vfmP.h"
43
44 #include "debug-print.h"
45
46 /* Sort direction. */
47 enum sort_direction
48   {
49     SRT_ASCEND,                 /* A, B, C, ..., X, Y, Z. */
50     SRT_DESCEND                 /* Z, Y, X, ..., C, B, A. */
51   };
52
53 /* A sort criterion. */
54 struct sort_criterion
55   {
56     int fv;                     /* Variable data index. */
57     int width;                  /* 0=numeric, otherwise string widthe. */
58     enum sort_direction dir;    /* Sort direction. */
59   };
60
61 /* A set of sort criteria. */
62 struct sort_criteria 
63   {
64     struct sort_criterion *crits;
65     size_t crit_cnt;
66   };
67
68 /* These should only be changed for testing purposes. */
69 static int min_buffers = 64;
70 static int max_buffers = INT_MAX;
71 static bool allow_internal_sort = true;
72
73 static int compare_record (const struct ccase *, const struct ccase *,
74                            const struct sort_criteria *);
75 static struct casefile *do_internal_sort (struct casereader *,
76                                           const struct sort_criteria *);
77 static struct casefile *do_external_sort (struct casereader *,
78                                           const struct sort_criteria *);
79
80 /* Performs the SORT CASES procedures. */
81 int
82 cmd_sort_cases (void)
83 {
84   struct sort_criteria *criteria;
85   bool success = false;
86
87   lex_match (T_BY);
88
89   criteria = sort_parse_criteria (default_dict, NULL, NULL, NULL);
90   if (criteria == NULL)
91     return CMD_FAILURE;
92
93   if (test_mode && lex_match ('/')) 
94     {
95       if (!lex_force_match_id ("BUFFERS") || !lex_match ('=')
96           || !lex_force_int ())
97         goto done;
98
99       min_buffers = max_buffers = lex_integer ();
100       allow_internal_sort = false;
101       if (max_buffers < 2) 
102         {
103           msg (SE, _("Buffer limit must be at least 2."));
104           goto done;
105         }
106
107       lex_get ();
108     }
109
110   success = sort_active_file_in_place (criteria);
111
112  done:
113   min_buffers = 64;
114   max_buffers = INT_MAX;
115   allow_internal_sort = true;
116   
117   sort_destroy_criteria (criteria);
118   return success ? lex_end_of_command () : CMD_FAILURE;
119 }
120
121 /* Gets ready to sort the active file, either in-place or to a
122    separate casefile. */
123 static void
124 prepare_to_sort_active_file (void) 
125 {
126   /* Cancel temporary transformations and PROCESS IF. */
127   if (temporary != 0)
128     cancel_temporary (); 
129   expr_free (process_if_expr);
130   process_if_expr = NULL;
131
132   /* Make sure source cases are in a storage source. */
133   procedure (NULL, NULL);
134   assert (case_source_is_class (vfm_source, &storage_source_class));
135 }
136
137 /* Sorts the active file in-place according to CRITERIA.
138    Returns nonzero if successful. */
139 int
140 sort_active_file_in_place (const struct sort_criteria *criteria) 
141 {
142   struct casefile *src, *dst;
143   
144   prepare_to_sort_active_file ();
145
146   src = storage_source_get_casefile (vfm_source);
147   dst = sort_execute (casefile_get_destructive_reader (src), criteria);
148   free_case_source (vfm_source);
149   vfm_source = NULL;
150
151   if (dst == NULL) 
152     return 0;
153
154   vfm_source = storage_source_create (dst);
155   return 1;
156 }
157
158 /* Sorts the active file to a separate casefile.  If successful,
159    returns the sorted casefile.  Returns a null pointer on
160    failure. */
161 struct casefile *
162 sort_active_file_to_casefile (const struct sort_criteria *criteria) 
163 {
164   struct casefile *src;
165   
166   prepare_to_sort_active_file ();
167
168   src = storage_source_get_casefile (vfm_source);
169   return sort_execute (casefile_get_reader (src), criteria);
170 }
171
172 /* Parses a list of sort keys and returns a struct sort_criteria
173    based on it.  Returns a null pointer on error.
174    If SAW_DIRECTION is nonnull, sets *SAW_DIRECTION to true if at
175    least one parenthesized sort direction was specified, false
176    otherwise. */
177 struct sort_criteria *
178 sort_parse_criteria (const struct dictionary *dict,
179                      struct variable ***vars, int *var_cnt,
180                      bool *saw_direction)
181 {
182   struct sort_criteria *criteria;
183   struct variable **local_vars = NULL;
184   size_t local_var_cnt;
185
186   assert ((vars == NULL) == (var_cnt == NULL));
187   if (vars == NULL) 
188     {
189       vars = &local_vars;
190       var_cnt = &local_var_cnt;
191     }
192
193   criteria = xmalloc (sizeof *criteria);
194   criteria->crits = NULL;
195   criteria->crit_cnt = 0;
196
197   *vars = NULL;
198   *var_cnt = 0;
199   if (saw_direction != NULL)
200     *saw_direction = false;
201
202   do
203     {
204       int prev_var_cnt = *var_cnt;
205       enum sort_direction direction;
206
207       /* Variables. */
208       if (!parse_variables (dict, vars, var_cnt,
209                             PV_NO_DUPLICATE | PV_APPEND | PV_NO_SCRATCH))
210         goto error;
211
212       /* Sort direction. */
213       if (lex_match ('('))
214         {
215           if (lex_match_id ("D") || lex_match_id ("DOWN"))
216             direction = SRT_DESCEND;
217           else if (lex_match_id ("A") || lex_match_id ("UP"))
218             direction = SRT_ASCEND;
219           else
220             {
221               msg (SE, _("`A' or `D' expected inside parentheses."));
222               goto error;
223             }
224           if (!lex_match (')'))
225             {
226               msg (SE, _("`)' expected."));
227               goto error;
228             }
229           *saw_direction = true;
230         }
231       else
232         direction = SRT_ASCEND;
233
234       criteria->crits = xrealloc (criteria->crits,
235                                   sizeof *criteria->crits * *var_cnt);
236       criteria->crit_cnt = *var_cnt;
237       for (; prev_var_cnt < criteria->crit_cnt; prev_var_cnt++) 
238         {
239           struct sort_criterion *c = &criteria->crits[prev_var_cnt];
240           c->fv = (*vars)[prev_var_cnt]->fv;
241           c->width = (*vars)[prev_var_cnt]->width;
242           c->dir = direction;
243         }
244     }
245   while (token != '.' && token != '/');
246
247   free (local_vars);
248   return criteria;
249
250  error:
251   free (local_vars);
252   sort_destroy_criteria (criteria);
253   return NULL;
254 }
255
256 /* Destroys a SORT CASES program. */
257 void
258 sort_destroy_criteria (struct sort_criteria *criteria) 
259 {
260   if (criteria != NULL) 
261     {
262       free (criteria->crits);
263       free (criteria);
264     }
265 }
266
267 /* Reads all the cases from READER, which is destroyed.  Sorts
268    the cases according to CRITERIA.  Returns the sorted cases in
269    a newly created casefile. */
270 struct casefile *
271 sort_execute (struct casereader *reader, const struct sort_criteria *criteria)
272 {
273   struct casefile *output = do_internal_sort (reader, criteria);
274   if (output == NULL)
275     output = do_external_sort (reader, criteria);
276   casereader_destroy (reader);
277   return output;
278 }
279 \f
280 /* A case and its index. */
281 struct indexed_case 
282   {
283     struct ccase c;     /* Case. */
284     unsigned long idx;  /* Index to allow for stable sorting. */
285   };
286
287 static int compare_indexed_cases (const void *, const void *, void *);
288
289 /* If the data is in memory, do an internal sort and return a new
290    casefile for the data.  Otherwise, return a null pointer. */
291 static struct casefile *
292 do_internal_sort (struct casereader *reader,
293                   const struct sort_criteria *criteria)
294 {
295   const struct casefile *src;
296   struct casefile *dst;
297   unsigned long case_cnt;
298
299   if (!allow_internal_sort)
300     return NULL;
301
302   src = casereader_get_casefile (reader);
303   if (casefile_get_case_cnt (src) > 1 && !casefile_in_core (src))
304     return NULL;
305       
306   case_cnt = casefile_get_case_cnt (src);
307   dst = casefile_create (casefile_get_value_cnt (src));
308   if (case_cnt != 0) 
309     {
310       struct indexed_case *cases = malloc (sizeof *cases * case_cnt);
311       if (cases != NULL) 
312         {
313           unsigned long i;
314           
315           for (i = 0; i < case_cnt; i++)
316             {
317               casereader_read_xfer_assert (reader, &cases[i].c);
318               cases[i].idx = i;
319             }
320
321           sort (cases, case_cnt, sizeof *cases, compare_indexed_cases,
322                 (void *) criteria);
323       
324           for (i = 0; i < case_cnt; i++)
325             casefile_append_xfer (dst, &cases[i].c);
326
327           free (cases);
328         }
329       else 
330         {
331           /* Failure. */
332           casefile_destroy (dst);
333           dst = NULL;
334         }
335     }
336
337   return dst;
338 }
339
340 /* Compares the variables specified by CRITERIA between the cases
341    at A and B, with a "last resort" comparison for stability, and
342    returns a strcmp()-type result. */
343 static int
344 compare_indexed_cases (const void *a_, const void *b_, void *criteria_)
345 {
346   struct sort_criteria *criteria = criteria_;
347   const struct indexed_case *a = a_;
348   const struct indexed_case *b = b_;
349   int result = compare_record (&a->c, &b->c, criteria);
350   if (result == 0)
351     result = a->idx < b->idx ? -1 : a->idx > b->idx;
352   return result;
353 }
354 \f
355 /* External sort. */
356
357 /* Maximum order of merge (external sort only).  The maximum
358    reasonable value is about 7.  Above that, it would be a good
359    idea to use a heap in merge_once() to select the minimum. */
360 #define MAX_MERGE_ORDER 7
361
362 /* Results of an external sort. */
363 struct external_sort 
364   {
365     const struct sort_criteria *criteria; /* Sort criteria. */
366     size_t value_cnt;                 /* Size of data in `union value's. */
367     struct casefile **runs;           /* Array of initial runs. */
368     size_t run_cnt, run_cap;          /* Number of runs, allocated capacity. */
369   };
370
371 /* Prototypes for helper functions. */
372 static int write_runs (struct external_sort *, struct casereader *);
373 static struct casefile *merge (struct external_sort *);
374 static void destroy_external_sort (struct external_sort *);
375
376 /* Performs a stable external sort of the active file according
377    to the specification in SCP.  Forms initial runs using a heap
378    as a reservoir.  Merges the initial runs according to a
379    pattern that assures stability. */
380 static struct casefile *
381 do_external_sort (struct casereader *reader,
382                   const struct sort_criteria *criteria)
383 {
384   struct external_sort *xsrt;
385
386   casefile_to_disk (casereader_get_casefile (reader));
387
388   xsrt = xmalloc (sizeof *xsrt);
389   xsrt->criteria = criteria;
390   xsrt->value_cnt = casefile_get_value_cnt (casereader_get_casefile (reader));
391   xsrt->run_cap = 512;
392   xsrt->run_cnt = 0;
393   xsrt->runs = xmalloc (sizeof *xsrt->runs * xsrt->run_cap);
394   if (write_runs (xsrt, reader))
395     {
396       struct casefile *output = merge (xsrt);
397       destroy_external_sort (xsrt);
398       return output;
399     }
400   else
401     {
402       destroy_external_sort (xsrt);
403       return NULL;
404     }
405 }
406
407 /* Destroys XSRT. */
408 static void
409 destroy_external_sort (struct external_sort *xsrt) 
410 {
411   if (xsrt != NULL) 
412     {
413       int i;
414       
415       for (i = 0; i < xsrt->run_cnt; i++)
416         casefile_destroy (xsrt->runs[i]);
417       free (xsrt->runs);
418       free (xsrt);
419     }
420 }
421 \f
422 /* Replacement selection. */
423
424 /* Pairs a record with a run number. */
425 struct record_run
426   {
427     int run;                    /* Run number of case. */
428     struct ccase record;        /* Case data. */
429     size_t idx;                 /* Case number (for stability). */
430   };
431
432 /* Represents a set of initial runs during an external sort. */
433 struct initial_run_state 
434   {
435     struct external_sort *xsrt;
436
437     /* Reservoir. */
438     struct record_run *records; /* Records arranged as a heap. */
439     size_t record_cnt;          /* Current number of records. */
440     size_t record_cap;          /* Capacity for records. */
441     
442     /* Run currently being output. */
443     int run;                    /* Run number. */
444     size_t case_cnt;            /* Number of cases so far. */
445     struct casefile *casefile;  /* Output file. */
446     struct ccase last_output;   /* Record last output. */
447
448     int okay;                   /* Zero if an error has been encountered. */
449   };
450
451 static const struct case_sink_class sort_sink_class;
452
453 static void destroy_initial_run_state (struct initial_run_state *);
454 static void process_case (struct initial_run_state *, const struct ccase *,
455                           size_t);
456 static int allocate_cases (struct initial_run_state *);
457 static void output_record (struct initial_run_state *);
458 static void start_run (struct initial_run_state *);
459 static void end_run (struct initial_run_state *);
460 static int compare_record_run (const struct record_run *,
461                                const struct record_run *,
462                                struct initial_run_state *);
463 static int compare_record_run_minheap (const void *, const void *, void *);
464
465 /* Reads cases from READER and composes initial runs in XSRT. */
466 static int
467 write_runs (struct external_sort *xsrt, struct casereader *reader)
468 {
469   struct initial_run_state *irs;
470   struct ccase c;
471   size_t idx = 0;
472   int success = 0;
473
474   /* Allocate memory for cases. */
475   irs = xmalloc (sizeof *irs);
476   irs->xsrt = xsrt;
477   irs->records = NULL;
478   irs->record_cnt = irs->record_cap = 0;
479   irs->run = 0;
480   irs->case_cnt = 0;
481   irs->casefile = NULL;
482   case_nullify (&irs->last_output);
483   irs->okay = 1;
484   if (!allocate_cases (irs)) 
485     goto done;
486
487   /* Create initial runs. */
488   start_run (irs);
489   for (; irs->okay && casereader_read (reader, &c); case_destroy (&c))
490     process_case (irs, &c, idx++);
491   while (irs->okay && irs->record_cnt > 0)
492     output_record (irs);
493   end_run (irs);
494
495   success = irs->okay;
496
497  done:
498   destroy_initial_run_state (irs);
499
500   return success;
501 }
502
503 /* Add a single case to an initial run. */
504 static void
505 process_case (struct initial_run_state *irs, const struct ccase *c, size_t idx)
506 {
507   struct record_run *rr;
508
509   /* Compose record_run for this run and add to heap. */
510   assert (irs->record_cnt < irs->record_cap - 1);
511   rr = irs->records + irs->record_cnt++;
512   case_copy (&rr->record, 0, c, 0, irs->xsrt->value_cnt);
513   rr->run = irs->run;
514   rr->idx = idx;
515   if (!case_is_null (&irs->last_output)
516       && compare_record (c, &irs->last_output, irs->xsrt->criteria) < 0)
517     rr->run = irs->run + 1;
518   push_heap (irs->records, irs->record_cnt, sizeof *irs->records,
519              compare_record_run_minheap, irs);
520
521   /* Output a record if the reservoir is full. */
522   if (irs->record_cnt == irs->record_cap - 1 && irs->okay)
523     output_record (irs);
524 }
525
526 /* Destroys the initial run state represented by IRS. */
527 static void
528 destroy_initial_run_state (struct initial_run_state *irs) 
529 {
530   int i;
531
532   if (irs == NULL)
533     return;
534
535   for (i = 0; i < irs->record_cap; i++)
536     case_destroy (&irs->records[i].record);
537   free (irs->records);
538
539   if (irs->casefile != NULL)
540     casefile_sleep (irs->casefile);
541
542   free (irs);
543 }
544
545 /* Allocates room for lots of cases as a buffer. */
546 static int
547 allocate_cases (struct initial_run_state *irs)
548 {
549   int approx_case_cost; /* Approximate memory cost of one case in bytes. */
550   int max_cases;        /* Maximum number of cases to allocate. */
551   int i;
552
553   /* Allocate as many cases as we can within the workspace
554      limit. */
555   approx_case_cost = (sizeof *irs->records
556                       + irs->xsrt->value_cnt * sizeof (union value)
557                       + 4 * sizeof (void *));
558   max_cases = get_max_workspace() / approx_case_cost;
559   if (max_cases > max_buffers)
560     max_cases = max_buffers;
561   irs->records = malloc (sizeof *irs->records * max_cases);
562   for (i = 0; i < max_cases; i++)
563     if (!case_try_create (&irs->records[i].record, irs->xsrt->value_cnt))
564       {
565         max_cases = i;
566         break;
567       }
568   irs->record_cap = max_cases;
569
570   /* Fail if we didn't allocate an acceptable number of cases. */
571   if (irs->records == NULL || max_cases < min_buffers)
572     {
573       msg (SE, _("Out of memory.  Could not allocate room for minimum of %d "
574                  "cases of %d bytes each.  (PSPP workspace is currently "
575                  "restricted to a maximum of %d KB.)"),
576            min_buffers, approx_case_cost, get_max_workspace() / 1024);
577       return 0;
578     }
579   return 1;
580 }
581
582 /* Compares the VAR_CNT variables in VARS[] between the `value's at
583    A and B, and returns a strcmp()-type result. */
584 static int
585 compare_record (const struct ccase *a, const struct ccase *b,
586                 const struct sort_criteria *criteria)
587 {
588   int i;
589
590   assert (a != NULL);
591   assert (b != NULL);
592   
593   for (i = 0; i < criteria->crit_cnt; i++)
594     {
595       const struct sort_criterion *c = &criteria->crits[i];
596       int result;
597       
598       if (c->width == 0)
599         {
600           double af = case_num (a, c->fv);
601           double bf = case_num (b, c->fv);
602           
603           result = af < bf ? -1 : af > bf;
604         }
605       else
606         result = memcmp (case_str (a, c->fv), case_str (b, c->fv), c->width);
607
608       if (result != 0)
609         return c->dir == SRT_ASCEND ? result : -result;
610     }
611
612   return 0;
613 }
614
615 /* Compares record-run tuples A and B on run number first, then
616    on record, then on case index. */
617 static int
618 compare_record_run (const struct record_run *a,
619                     const struct record_run *b,
620                     struct initial_run_state *irs)
621 {
622   int result = a->run < b->run ? -1 : a->run > b->run;
623   if (result == 0)
624     result = compare_record (&a->record, &b->record, irs->xsrt->criteria);
625   if (result == 0)
626     result = a->idx < b->idx ? -1 : a->idx > b->idx;
627   return result;
628 }
629
630 /* Compares record-run tuples A and B on run number first, then
631    on the current record according to SCP, but in descending
632    order. */
633 static int
634 compare_record_run_minheap (const void *a, const void *b, void *irs) 
635 {
636   return -compare_record_run (a, b, irs);
637 }
638
639 /* Begins a new initial run, specifically its output file. */
640 static void
641 start_run (struct initial_run_state *irs)
642 {
643   irs->run++;
644   irs->case_cnt = 0;
645   irs->casefile = casefile_create (irs->xsrt->value_cnt);
646   casefile_to_disk (irs->casefile);
647   case_nullify (&irs->last_output); 
648 }
649
650 /* Ends the current initial run.  */
651 static void
652 end_run (struct initial_run_state *irs)
653 {
654   struct external_sort *xsrt = irs->xsrt;
655
656   /* Record initial run. */
657   if (irs->casefile != NULL) 
658     {
659       casefile_sleep (irs->casefile);
660       if (xsrt->run_cnt >= xsrt->run_cap) 
661         {
662           xsrt->run_cap *= 2;
663           xsrt->runs = xrealloc (xsrt->runs,
664                                  sizeof *xsrt->runs * xsrt->run_cap);
665         }
666       xsrt->runs[xsrt->run_cnt++] = irs->casefile;
667       irs->casefile = NULL; 
668     }
669 }
670
671 /* Writes a record to the current initial run. */
672 static void
673 output_record (struct initial_run_state *irs)
674 {
675   struct record_run *record_run;
676   struct ccase case_tmp;
677   
678   /* Extract minimum case from heap. */
679   assert (irs->record_cnt > 0);
680   pop_heap (irs->records, irs->record_cnt--, sizeof *irs->records,
681             compare_record_run_minheap, irs);
682   record_run = irs->records + irs->record_cnt;
683
684   /* Bail if an error has occurred. */
685   if (!irs->okay)
686     return;
687
688   /* Start new run if necessary. */
689   assert (record_run->run == irs->run
690           || record_run->run == irs->run + 1);
691   if (record_run->run != irs->run)
692     {
693       end_run (irs);
694       start_run (irs);
695     }
696   assert (record_run->run == irs->run);
697   irs->case_cnt++;
698
699   /* Write to disk. */
700   if (irs->casefile != NULL)
701     casefile_append (irs->casefile, &record_run->record);
702
703   /* This record becomes last_output. */
704   irs->last_output = case_tmp = record_run->record;
705   record_run->record = irs->records[irs->record_cap - 1].record;
706   irs->records[irs->record_cap - 1].record = case_tmp;
707 }
708 \f
709 /* Merging. */
710
711 static int choose_merge (struct casefile *runs[], int run_cnt, int order);
712 static struct casefile *merge_once (struct external_sort *,
713                                     struct casefile *[], size_t);
714
715 /* Repeatedly merges run until only one is left,
716    and returns the final casefile.  */
717 static struct casefile *
718 merge (struct external_sort *xsrt)
719 {
720   while (xsrt->run_cnt > 1)
721     {
722       int order = min (MAX_MERGE_ORDER, xsrt->run_cnt);
723       int idx = choose_merge (xsrt->runs, xsrt->run_cnt, order);
724       xsrt->runs[idx] = merge_once (xsrt, xsrt->runs + idx, order);
725       remove_range (xsrt->runs, xsrt->run_cnt, sizeof *xsrt->runs,
726                     idx + 1, order - 1);
727       xsrt->run_cnt -= order - 1;
728     }
729   assert (xsrt->run_cnt == 1);
730   xsrt->run_cnt = 0;
731   return xsrt->runs[0];
732 }
733
734 /* Chooses ORDER runs out of the RUN_CNT runs in RUNS to merge,
735    and returns the index of the first one.
736
737    For stability, we must merge only consecutive runs.  For
738    efficiency, we choose the shortest consecutive sequence of
739    runs. */
740 static int
741 choose_merge (struct casefile *runs[], int run_cnt, int order) 
742 {
743   int min_idx, min_sum;
744   int cur_idx, cur_sum;
745   int i;
746
747   /* Sum up the length of the first ORDER runs. */
748   cur_sum = 0;
749   for (i = 0; i < order; i++)
750     cur_sum += casefile_get_case_cnt (runs[i]);
751
752   /* Find the shortest group of ORDER runs,
753      using a running total for efficiency. */
754   min_idx = 0;
755   min_sum = cur_sum;
756   for (cur_idx = 1; cur_idx + order <= run_cnt; cur_idx++)
757     {
758       cur_sum -= casefile_get_case_cnt (runs[cur_idx - 1]);
759       cur_sum += casefile_get_case_cnt (runs[cur_idx + order - 1]);
760       if (cur_sum < min_sum)
761         {
762           min_sum = cur_sum;
763           min_idx = cur_idx;
764         }
765     }
766
767   return min_idx;
768 }
769
770 /* Merges the RUN_CNT initial runs specified in INPUT_FILES into a
771    new run, and returns the new run. */
772 static struct casefile *
773 merge_once (struct external_sort *xsrt,
774             struct casefile **const input_files,
775             size_t run_cnt)
776 {
777   struct run
778     {
779       struct casefile *file;
780       struct casereader *reader;
781       struct ccase ccase;
782     }
783   *runs;
784
785   struct casefile *output = NULL;
786   int i;
787
788   /* Open input files. */
789   runs = xmalloc (sizeof *runs * run_cnt);
790   for (i = 0; i < run_cnt; i++) 
791     {
792       struct run *r = &runs[i];
793       r->file = input_files[i];
794       r->reader = casefile_get_destructive_reader (r->file);
795       if (!casereader_read_xfer (r->reader, &r->ccase))
796         {
797           run_cnt--;
798           i--;
799         }
800     }
801
802   /* Create output file. */
803   output = casefile_create (xsrt->value_cnt);
804   casefile_to_disk (output);
805
806   /* Merge. */
807   while (run_cnt > 0) 
808     {
809       struct run *min_run, *run;
810       
811       /* Find minimum. */
812       min_run = runs;
813       for (run = runs + 1; run < runs + run_cnt; run++)
814         if (compare_record (&run->ccase, &min_run->ccase, xsrt->criteria) < 0)
815           min_run = run;
816
817       /* Write minimum to output file. */
818       casefile_append_xfer (output, &min_run->ccase);
819
820       /* Read another case from minimum run. */
821       if (!casereader_read_xfer (min_run->reader, &min_run->ccase))
822         {
823           casereader_destroy (min_run->reader);
824           casefile_destroy (min_run->file);
825
826           remove_element (runs, run_cnt, sizeof *runs, min_run - runs);
827           run_cnt--;
828         } 
829     }
830
831   casefile_sleep (output);
832   free (runs);
833
834   return output;
835 }