Rewrite expression code.
[pspp-builds.git] / src / sort.c
1 /* PSPP - computes sample statistics.
2    Copyright (C) 1997-9, 2000 Free Software Foundation, Inc.
3    Written by Ben Pfaff <blp@gnu.org>.
4
5    This program is free software; you can redistribute it and/or
6    modify it under the terms of the GNU General Public License as
7    published by the Free Software Foundation; either version 2 of the
8    License, or (at your option) any later version.
9
10    This program is distributed in the hope that it will be useful, but
11    WITHOUT ANY WARRANTY; without even the implied warranty of
12    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13    General Public License for more details.
14
15    You should have received a copy of the GNU General Public License
16    along with this program; if not, write to the Free Software
17    Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA
18    02111-1307, USA. */
19
20 #include <config.h>
21 #include "sort.h"
22 #include "error.h"
23 #include <stdio.h>
24 #include <stdlib.h>
25 #include <errno.h>
26 #include "algorithm.h"
27 #include "alloc.h"
28 #include "case.h"
29 #include "casefile.h"
30 #include "command.h"
31 #include "error.h"
32 #include "expressions/public.h"
33 #include "lexer.h"
34 #include "misc.h"
35 #include "settings.h"
36 #include "str.h"
37 #include "var.h"
38 #include "vfm.h"
39 #include "vfmP.h"
40
41 #if HAVE_UNISTD_H
42 #include <unistd.h>
43 #endif
44
45 #if HAVE_SYS_TYPES_H
46 #include <sys/types.h>
47 #endif
48
49 #if HAVE_SYS_STAT_H
50 #include <sys/stat.h>
51 #endif
52
53 #include "debug-print.h"
54
55 /* Sort direction. */
56 enum sort_direction
57   {
58     SRT_ASCEND,                 /* A, B, C, ..., X, Y, Z. */
59     SRT_DESCEND                 /* Z, Y, X, ..., C, B, A. */
60   };
61
62 /* A sort criterion. */
63 struct sort_criterion
64   {
65     int fv;                     /* Variable data index. */
66     int width;                  /* 0=numeric, otherwise string widthe. */
67     enum sort_direction dir;    /* Sort direction. */
68   };
69
70 /* A set of sort criteria. */
71 struct sort_criteria 
72   {
73     struct sort_criterion *crits;
74     size_t crit_cnt;
75   };
76
77 static int compare_case_dblptrs (const void *, const void *, void *);
78 static int compare_record (const struct ccase *, const struct ccase *,
79                            const struct sort_criteria *);
80 static struct casefile *do_internal_sort (struct casereader *,
81                                           const struct sort_criteria *);
82 static struct casefile *do_external_sort (struct casereader *,
83                                           const struct sort_criteria *);
84
85 /* Performs the SORT CASES procedures. */
86 int
87 cmd_sort_cases (void)
88 {
89   struct sort_criteria *criteria;
90   int success;
91
92   lex_match (T_BY);
93
94   criteria = sort_parse_criteria (default_dict, NULL, NULL);
95   if (criteria == NULL)
96     return CMD_FAILURE;
97
98   success = sort_active_file_in_place (criteria);
99   sort_destroy_criteria (criteria);
100   return success ? lex_end_of_command () : CMD_FAILURE;
101 }
102
103 /* Gets ready to sort the active file, either in-place or to a
104    separate casefile. */
105 static void
106 prepare_to_sort_active_file (void) 
107 {
108   /* Cancel temporary transformations and PROCESS IF. */
109   if (temporary != 0)
110     cancel_temporary (); 
111   expr_free (process_if_expr);
112   process_if_expr = NULL;
113
114   /* Make sure source cases are in a storage source. */
115   procedure (NULL, NULL);
116   assert (case_source_is_class (vfm_source, &storage_source_class));
117 }
118
119 /* Sorts the active file in-place according to CRITERIA.
120    Returns nonzero if successful. */
121 int
122 sort_active_file_in_place (const struct sort_criteria *criteria) 
123 {
124   struct casefile *src, *dst;
125   
126   prepare_to_sort_active_file ();
127
128   src = storage_source_get_casefile (vfm_source);
129   dst = sort_execute (casefile_get_destructive_reader (src), criteria);
130   free_case_source (vfm_source);
131   vfm_source = NULL;
132
133   if (dst == NULL) 
134     return 0;
135
136   vfm_source = storage_source_create (dst, default_dict);
137   return 1;
138 }
139
140 /* Sorts the active file to a separate casefile.  If successful,
141    returns the sorted casefile.  Returns a null pointer on
142    failure. */
143 struct casefile *
144 sort_active_file_to_casefile (const struct sort_criteria *criteria) 
145 {
146   struct casefile *src;
147   
148   prepare_to_sort_active_file ();
149
150   src = storage_source_get_casefile (vfm_source);
151   return sort_execute (casefile_get_reader (src), criteria);
152 }
153
154 /* Parses a list of sort keys and returns a struct sort_cases_pgm
155    based on it.  Returns a null pointer on error. */
156 struct sort_criteria *
157 sort_parse_criteria (const struct dictionary *dict,
158                      struct variable ***vars, int *var_cnt)
159 {
160   struct sort_criteria *criteria;
161   struct variable **local_vars = NULL;
162   size_t local_var_cnt;
163
164   assert ((vars == NULL) == (var_cnt == NULL));
165   if (vars == NULL) 
166     {
167       vars = &local_vars;
168       var_cnt = &local_var_cnt;
169     }
170
171   criteria = xmalloc (sizeof *criteria);
172   criteria->crits = NULL;
173   criteria->crit_cnt = 0;
174
175   *vars = NULL;
176   *var_cnt = 0;
177
178   do
179     {
180       int prev_var_cnt = *var_cnt;
181       enum sort_direction direction;
182
183       /* Variables. */
184       if (!parse_variables (dict, vars, var_cnt,
185                             PV_NO_DUPLICATE | PV_APPEND | PV_NO_SCRATCH))
186         goto error;
187
188       /* Sort direction. */
189       if (lex_match ('('))
190         {
191           if (lex_match_id ("D") || lex_match_id ("DOWN"))
192             direction = SRT_DESCEND;
193           else if (lex_match_id ("A") || lex_match_id ("UP"))
194             direction = SRT_ASCEND;
195           else
196             {
197               msg (SE, _("`A' or `D' expected inside parentheses."));
198               goto error;
199             }
200           if (!lex_match (')'))
201             {
202               msg (SE, _("`)' expected."));
203               goto error;
204             }
205         }
206       else
207         direction = SRT_ASCEND;
208
209       criteria->crits = xrealloc (criteria->crits,
210                                   sizeof *criteria->crits * *var_cnt);
211       criteria->crit_cnt = *var_cnt;
212       for (; prev_var_cnt < criteria->crit_cnt; prev_var_cnt++) 
213         {
214           struct sort_criterion *c = &criteria->crits[prev_var_cnt];
215           c->fv = (*vars)[prev_var_cnt]->fv;
216           c->width = (*vars)[prev_var_cnt]->width;
217           c->dir = direction;
218         }
219     }
220   while (token != '.' && token != '/');
221
222   free (local_vars);
223   return criteria;
224
225  error:
226   free (local_vars);
227   sort_destroy_criteria (criteria);
228   return NULL;
229 }
230
231 /* Destroys a SORT CASES program. */
232 void
233 sort_destroy_criteria (struct sort_criteria *criteria) 
234 {
235   if (criteria != NULL) 
236     {
237       free (criteria->crits);
238       free (criteria);
239     }
240 }
241
242 /* Reads all the cases from READER, which is destroyed.  Sorts
243    the cases according to CRITERIA.  Returns the sorted cases in
244    a newly created casefile. */
245 struct casefile *
246 sort_execute (struct casereader *reader, const struct sort_criteria *criteria)
247 {
248   struct casefile *output;
249
250   output = do_internal_sort (reader, criteria);
251   if (output == NULL)
252     output = do_external_sort (reader, criteria);
253   casereader_destroy (reader);
254   return output;
255 }
256 \f
257 /* If the data is in memory, do an internal sort and return a new
258    casefile for the data. */
259 static struct casefile *
260 do_internal_sort (struct casereader *reader,
261                   const struct sort_criteria *criteria)
262 {
263   const struct casefile *src;
264   struct casefile *dst;
265   struct ccase *cases, **case_ptrs;
266   unsigned long case_cnt;
267
268   src = casereader_get_casefile (reader);
269   if (casefile_get_case_cnt (src) > 1 && !casefile_in_core (src))
270     return NULL;
271       
272   case_cnt = casefile_get_case_cnt (src);
273   cases = malloc (sizeof *cases * case_cnt);
274   case_ptrs = malloc (sizeof *case_ptrs * case_cnt);
275   if ((cases != NULL && case_ptrs != NULL) || case_cnt == 0) 
276     {
277       unsigned long case_idx;
278       
279       for (case_idx = 0; case_idx < case_cnt; case_idx++) 
280         {
281           int success = casereader_read_xfer (reader, &cases[case_idx]);
282           assert (success);
283           case_ptrs[case_idx] = &cases[case_idx];
284         }
285
286       sort (case_ptrs, case_cnt, sizeof *case_ptrs, compare_case_dblptrs,
287             (void *) criteria);
288       
289       dst = casefile_create (casefile_get_value_cnt (src));
290       for (case_idx = 0; case_idx < case_cnt; case_idx++) 
291         casefile_append_xfer (dst, case_ptrs[case_idx]);
292     }
293   else
294     dst = NULL;
295   
296   free (case_ptrs);
297   free (cases);
298
299   return dst;
300 }
301
302 /* Compares the variables specified by CRITERIA between the cases
303    at A and B, and returns a strcmp()-type result. */
304 static int
305 compare_case_dblptrs (const void *a_, const void *b_, void *criteria_)
306 {
307   struct sort_criteria *criteria = criteria_;
308   struct ccase *const *pa = a_;
309   struct ccase *const *pb = b_;
310   struct ccase *a = *pa;
311   struct ccase *b = *pb;
312  
313   return compare_record (a, b, criteria);
314 }
315 \f
316 /* External sort. */
317
318 /* Maximum order of merge.  If you increase this, then you should
319    use a heap for comparing cases during merge.  */
320 #define MAX_MERGE_ORDER         7
321
322 /* Minimum total number of records for buffers. */
323 #define MIN_BUFFER_TOTAL_SIZE_RECS      64
324
325 /* Minimum single input buffer size, in bytes and records. */
326 #define MIN_BUFFER_SIZE_BYTES   4096
327 #define MIN_BUFFER_SIZE_RECS    16
328
329 #if MIN_BUFFER_SIZE_RECS * 2 + 16 > MIN_BUFFER_TOTAL_SIZE_RECS
330 #error MIN_BUFFER_SIZE_RECS and MIN_BUFFER_TOTAL_SIZE_RECS do not make sense.
331 #endif
332
333 /* Sorts initial runs A and B in decending order by length. */
334 static int
335 compare_initial_runs (const void *a_, const void *b_, void *aux UNUSED) 
336 {
337   struct casefile *const *a = a_;
338   struct casefile *const *b = b_;
339   unsigned long a_case_cnt = casefile_get_case_cnt (*a);
340   unsigned long b_case_cnt = casefile_get_case_cnt (*b);
341   
342   return a_case_cnt > b_case_cnt ? -1 : a_case_cnt < b_case_cnt;
343 }
344
345 /* Results of an external sort. */
346 struct external_sort 
347   {
348     const struct sort_criteria *criteria; /* Sort criteria. */
349     size_t value_cnt;                 /* Size of data in `union value's. */
350     struct casefile **initial_runs;   /* Array of initial runs. */
351     size_t run_cnt, run_cap;          /* Number of runs, allocated capacity. */
352   };
353
354 /* Prototypes for helper functions. */
355 static int write_initial_runs (struct external_sort *, struct casereader *);
356 static int merge (struct external_sort *);
357 static void destroy_external_sort (struct external_sort *);
358
359 /* Performs an external sort of the active file according to the
360    specification in SCP.  Forms initial runs using a heap as a
361    reservoir.  Determines the optimum merge pattern via Huffman's
362    method (see Knuth vol. 3, 2nd edition, p. 365-366), and merges
363    according to that pattern. */
364 static struct casefile *
365 do_external_sort (struct casereader *reader,
366                   const struct sort_criteria *criteria)
367 {
368   struct external_sort *xsrt;
369
370   casefile_to_disk (casereader_get_casefile (reader));
371
372   xsrt = xmalloc (sizeof *xsrt);
373   xsrt->criteria = criteria;
374   xsrt->value_cnt = casefile_get_value_cnt (casereader_get_casefile (reader));
375   xsrt->run_cap = 512;
376   xsrt->run_cnt = 0;
377   xsrt->initial_runs = xmalloc (sizeof *xsrt->initial_runs * xsrt->run_cap);
378   if (write_initial_runs (xsrt, reader) && merge (xsrt))
379     {
380       struct casefile *output = xsrt->initial_runs[0];
381       xsrt->initial_runs[0] = NULL;
382       destroy_external_sort (xsrt);
383       return output;
384     }
385   else
386     {
387       destroy_external_sort (xsrt);
388       return NULL;
389     }
390 }
391
392 /* Destroys XSRT. */
393 static void
394 destroy_external_sort (struct external_sort *xsrt) 
395 {
396   if (xsrt != NULL) 
397     {
398       int i;
399       
400       for (i = 0; i < xsrt->run_cnt; i++)
401         casefile_destroy (xsrt->initial_runs[i]);
402       free (xsrt->initial_runs);
403       free (xsrt);
404     }
405 }
406 \f
407 /* Replacement selection. */
408
409 /* Pairs a record with a run number. */
410 struct record_run
411   {
412     int run;                    /* Run number of case. */
413     struct ccase record;        /* Case data. */
414   };
415
416 /* Represents a set of initial runs during an external sort. */
417 struct initial_run_state 
418   {
419     struct external_sort *xsrt;
420
421     /* Reservoir. */
422     struct record_run *records; /* Records arranged as a heap. */
423     size_t record_cnt;          /* Current number of records. */
424     size_t record_cap;          /* Capacity for records. */
425     
426     /* Run currently being output. */
427     int run;                    /* Run number. */
428     size_t case_cnt;            /* Number of cases so far. */
429     struct casefile *casefile;  /* Output file. */
430     struct ccase last_output;   /* Record last output. */
431
432     int okay;                   /* Zero if an error has been encountered. */
433   };
434
435 static const struct case_sink_class sort_sink_class;
436
437 static void destroy_initial_run_state (struct initial_run_state *);
438 static void process_case (struct initial_run_state *, const struct ccase *);
439 static int allocate_cases (struct initial_run_state *);
440 static void output_record (struct initial_run_state *);
441 static void start_run (struct initial_run_state *);
442 static void end_run (struct initial_run_state *);
443 static int compare_record_run (const struct record_run *,
444                                const struct record_run *,
445                                struct initial_run_state *);
446 static int compare_record_run_minheap (const void *, const void *, void *);
447
448 /* Reads cases from READER and composes initial runs in XSRT. */
449 static int
450 write_initial_runs (struct external_sort *xsrt, struct casereader *reader)
451 {
452   struct initial_run_state *irs;
453   struct ccase c;
454   int success = 0;
455
456   /* Allocate memory for cases. */
457   irs = xmalloc (sizeof *irs);
458   irs->xsrt = xsrt;
459   irs->records = NULL;
460   irs->record_cnt = irs->record_cap = 0;
461   irs->run = 0;
462   irs->case_cnt = 0;
463   irs->casefile = NULL;
464   case_nullify (&irs->last_output);
465   irs->okay = 1;
466   if (!allocate_cases (irs)) 
467     goto done;
468
469   /* Create initial runs. */
470   start_run (irs);
471   for (; irs->okay && casereader_read (reader, &c); case_destroy (&c))
472     process_case (irs, &c);
473   while (irs->okay && irs->record_cnt > 0)
474     output_record (irs);
475   end_run (irs);
476
477   success = irs->okay;
478
479  done:
480   destroy_initial_run_state (irs);
481
482   return success;
483 }
484
485 /* Add a single case to an initial run. */
486 static void
487 process_case (struct initial_run_state *irs, const struct ccase *c)
488 {
489   struct record_run *new_record_run;
490
491   /* Compose record_run for this run and add to heap. */
492   assert (irs->record_cnt < irs->record_cap - 1);
493   new_record_run = irs->records + irs->record_cnt++;
494   case_copy (&new_record_run->record, 0, c, 0, irs->xsrt->value_cnt);
495   new_record_run->run = irs->run;
496   if (!case_is_null (&irs->last_output)
497       && compare_record (c, &irs->last_output, irs->xsrt->criteria) < 0)
498     new_record_run->run = irs->run + 1;
499   push_heap (irs->records, irs->record_cnt, sizeof *irs->records,
500              compare_record_run_minheap, irs);
501
502   /* Output a record if the reservoir is full. */
503   if (irs->record_cnt == irs->record_cap - 1 && irs->okay)
504     output_record (irs);
505 }
506
507 /* Destroys the initial run state represented by IRS. */
508 static void
509 destroy_initial_run_state (struct initial_run_state *irs) 
510 {
511   int i;
512
513   if (irs == NULL)
514     return;
515
516   for (i = 0; i < irs->record_cap; i++)
517     case_destroy (&irs->records[i].record);
518   free (irs->records);
519
520   if (irs->casefile != NULL)
521     casefile_sleep (irs->casefile);
522
523   free (irs);
524 }
525
526 /* Allocates room for lots of cases as a buffer. */
527 static int
528 allocate_cases (struct initial_run_state *irs)
529 {
530   int approx_case_cost; /* Approximate memory cost of one case in bytes. */
531   int max_cases;        /* Maximum number of cases to allocate. */
532   int i;
533
534   /* Allocate as many cases as we can within the workspace
535      limit. */
536   approx_case_cost = (sizeof *irs->records
537                       + irs->xsrt->value_cnt * sizeof (union value)
538                       + 4 * sizeof (void *));
539   max_cases = get_max_workspace() / approx_case_cost;
540   irs->records = malloc (sizeof *irs->records * max_cases);
541   for (i = 0; i < max_cases; i++)
542     if (!case_try_create (&irs->records[i].record, irs->xsrt->value_cnt))
543       {
544         max_cases = i;
545         break;
546       }
547   irs->record_cap = max_cases;
548
549   /* Fail if we didn't allocate an acceptable number of cases. */
550   if (irs->records == NULL || max_cases < MIN_BUFFER_TOTAL_SIZE_RECS)
551     {
552       msg (SE, _("Out of memory.  Could not allocate room for minimum of %d "
553                  "cases of %d bytes each.  (PSPP workspace is currently "
554                  "restricted to a maximum of %d KB.)"),
555            MIN_BUFFER_TOTAL_SIZE_RECS, approx_case_cost, get_max_workspace() / 1024);
556       return 0;
557     }
558   return 1;
559 }
560
561 /* Compares the VAR_CNT variables in VARS[] between the `value's at
562    A and B, and returns a strcmp()-type result. */
563 static int
564 compare_record (const struct ccase *a, const struct ccase *b,
565                 const struct sort_criteria *criteria)
566 {
567   int i;
568
569   assert (a != NULL);
570   assert (b != NULL);
571   
572   for (i = 0; i < criteria->crit_cnt; i++)
573     {
574       const struct sort_criterion *c = &criteria->crits[i];
575       int result;
576       
577       if (c->width == 0)
578         {
579           double af = case_num (a, c->fv);
580           double bf = case_num (b, c->fv);
581           
582           result = af < bf ? -1 : af > bf;
583         }
584       else
585         result = memcmp (case_str (a, c->fv), case_str (b, c->fv), c->width);
586
587       if (result != 0)
588         return c->dir == SRT_ASCEND ? result : -result;
589     }
590
591   return 0;
592 }
593
594 /* Compares record-run tuples A and B on run number first, then
595    on the current record according to SCP. */
596 static int
597 compare_record_run (const struct record_run *a,
598                     const struct record_run *b,
599                     struct initial_run_state *irs)
600 {
601   if (a->run != b->run)
602     return a->run > b->run ? 1 : -1;
603   else
604     return compare_record (&a->record, &b->record, irs->xsrt->criteria);
605 }
606
607 /* Compares record-run tuples A and B on run number first, then
608    on the current record according to SCP, but in descending
609    order. */
610 static int
611 compare_record_run_minheap (const void *a, const void *b, void *irs) 
612 {
613   return -compare_record_run (a, b, irs);
614 }
615
616 /* Begins a new initial run, specifically its output file. */
617 static void
618 start_run (struct initial_run_state *irs)
619 {
620   irs->run++;
621   irs->case_cnt = 0;
622   irs->casefile = casefile_create (irs->xsrt->value_cnt);
623   casefile_to_disk (irs->casefile);
624   case_nullify (&irs->last_output); 
625 }
626
627 /* Ends the current initial run.  */
628 static void
629 end_run (struct initial_run_state *irs)
630 {
631   struct external_sort *xsrt = irs->xsrt;
632
633   /* Record initial run. */
634   if (irs->casefile != NULL) 
635     {
636       if (xsrt->run_cnt >= xsrt->run_cap) 
637         {
638           xsrt->run_cap *= 2;
639           xsrt->initial_runs
640             = xrealloc (xsrt->initial_runs,
641                         sizeof *xsrt->initial_runs * xsrt->run_cap);
642         }
643       xsrt->initial_runs[xsrt->run_cnt++] = irs->casefile;
644       irs->casefile = NULL; 
645     }
646 }
647
648 /* Writes a record to the current initial run. */
649 static void
650 output_record (struct initial_run_state *irs)
651 {
652   struct record_run *record_run;
653   struct ccase case_tmp;
654   
655   /* Extract minimum case from heap. */
656   assert (irs->record_cnt > 0);
657   pop_heap (irs->records, irs->record_cnt--, sizeof *irs->records,
658             compare_record_run_minheap, irs);
659   record_run = irs->records + irs->record_cnt;
660
661   /* Bail if an error has occurred. */
662   if (!irs->okay)
663     return;
664
665   /* Start new run if necessary. */
666   assert (record_run->run == irs->run
667           || record_run->run == irs->run + 1);
668   if (record_run->run != irs->run)
669     {
670       end_run (irs);
671       start_run (irs);
672     }
673   assert (record_run->run == irs->run);
674   irs->case_cnt++;
675
676   /* Write to disk. */
677   if (irs->casefile != NULL)
678     casefile_append (irs->casefile, &record_run->record);
679
680   /* This record becomes last_output. */
681   irs->last_output = case_tmp = record_run->record;
682   record_run->record = irs->records[irs->record_cap - 1].record;
683   irs->records[irs->record_cap - 1].record = case_tmp;
684 }
685 \f
686 /* Merging. */
687
688 /* State of merging initial runs. */
689 struct merge_state 
690   {
691     struct external_sort *xsrt; /* External sort state. */
692     struct ccase *cases;        /* Buffers. */
693     size_t case_cnt;            /* Number of buffers. */
694   };
695
696 struct run;
697 static struct casefile *merge_once (struct merge_state *,
698                                     struct casefile *[], size_t);
699 static int mod (int, int);
700
701 /* Performs a series of P-way merges of initial runs. */
702 static int
703 merge (struct external_sort *xsrt)
704 {
705   struct merge_state mrg;       /* State of merge. */
706   size_t approx_case_cost;      /* Approximate memory cost of one case. */
707   int max_order;                /* Maximum order of merge. */
708   size_t dummy_run_cnt;         /* Number of dummy runs to insert. */
709   int success = 0;
710   int i;
711
712   mrg.xsrt = xsrt;
713
714   /* Allocate as many cases as possible into cases. */
715   approx_case_cost = (sizeof *mrg.cases
716                       + xsrt->value_cnt * sizeof (union value)
717                       + 4 * sizeof (void *));
718   mrg.case_cnt = get_max_workspace() / approx_case_cost;
719   mrg.cases = malloc (sizeof *mrg.cases * mrg.case_cnt);
720   if (mrg.cases == NULL)
721     goto done;
722   for (i = 0; i < mrg.case_cnt; i++) 
723     if (!case_try_create (&mrg.cases[i], xsrt->value_cnt)) 
724       {
725         mrg.case_cnt = i;
726         break;
727       }
728   if (mrg.case_cnt < MIN_BUFFER_TOTAL_SIZE_RECS)
729     {
730       msg (SE, _("Out of memory.  Could not allocate room for minimum of %d "
731                  "cases of %d bytes each.  (PSPP workspace is currently "
732                  "restricted to a maximum of %d KB.)"),
733            MIN_BUFFER_TOTAL_SIZE_RECS, approx_case_cost, get_max_workspace() / 1024);
734       return 0;
735     }
736
737   /* Determine maximum order of merge. */
738   max_order = MAX_MERGE_ORDER;
739   if (mrg.case_cnt / max_order < MIN_BUFFER_SIZE_RECS)
740     max_order = mrg.case_cnt / MIN_BUFFER_SIZE_RECS;
741   else if (mrg.case_cnt / max_order * xsrt->value_cnt * sizeof (union value)
742            < MIN_BUFFER_SIZE_BYTES)
743     max_order = mrg.case_cnt / (MIN_BUFFER_SIZE_BYTES
744                                 / (xsrt->value_cnt * sizeof (union value)));
745   if (max_order < 2)
746     max_order = 2;
747   if (max_order > xsrt->run_cnt)
748     max_order = xsrt->run_cnt;
749
750   /* Repeatedly merge the P shortest existing runs until only one run
751      is left. */
752   make_heap (xsrt->initial_runs, xsrt->run_cnt, sizeof *xsrt->initial_runs,
753              compare_initial_runs, NULL);
754   dummy_run_cnt = mod (1 - (int) xsrt->run_cnt, max_order - 1);
755
756   assert (max_order > 0);
757   assert (max_order <= 2
758           || (xsrt->run_cnt + dummy_run_cnt) % (max_order - 1) == 1);
759   while (xsrt->run_cnt > 1)
760     {
761       struct casefile *output_run;
762       int order;
763       int i;
764
765       /* Choose order of merge (max_order after first merge). */
766       order = max_order - dummy_run_cnt;
767       dummy_run_cnt = 0;
768
769       /* Choose runs to merge. */
770       assert (xsrt->run_cnt >= order);
771       for (i = 0; i < order; i++) 
772         pop_heap (xsrt->initial_runs, xsrt->run_cnt--,
773                   sizeof *xsrt->initial_runs,
774                   compare_initial_runs, NULL); 
775           
776       /* Merge runs. */
777       output_run = merge_once (&mrg,
778                                xsrt->initial_runs + xsrt->run_cnt, order);
779       if (output_run == NULL)
780         goto done;
781       
782       /* Add output run to heap. */
783       xsrt->initial_runs[xsrt->run_cnt++] = output_run;
784       push_heap (xsrt->initial_runs, xsrt->run_cnt, sizeof *xsrt->initial_runs,
785                  compare_initial_runs, NULL);
786     }
787
788   /* Exactly one run is left, which contains the entire sorted
789      file.  We could use it to find a total case count. */
790   assert (xsrt->run_cnt == 1);
791
792   success = 1;
793
794  done:
795   for (i = 0; i < mrg.case_cnt; i++)
796     case_destroy (&mrg.cases[i]);
797   free (mrg.cases);
798
799   return success;
800 }
801
802 /* Modulo function as defined by Knuth. */
803 static int
804 mod (int x, int y)
805 {
806   if (y == 0)
807     return x;
808   else if (x == 0)
809     return 0;
810   else if (x > 0 && y > 0)
811     return x % y;
812   else if (x < 0 && y > 0)
813     return y - (-x) % y;
814
815   abort ();
816 }
817
818 /* Merges the RUN_CNT initial runs specified in INPUT_RUNS into a
819    new run.  Returns nonzero only if successful.  Adds an entry
820    to MRG->xsrt->runs for the output file if and only if the
821    output file is actually created.  Always deletes all the input
822    files. */
823 static struct casefile *
824 merge_once (struct merge_state *mrg,
825             struct casefile *input_runs[],
826             size_t run_cnt)
827 {
828   struct casereader *input_readers[MAX_MERGE_ORDER];
829   struct ccase input_cases[MAX_MERGE_ORDER];
830   struct casefile *output_casefile = NULL;
831   int i;
832
833   for (i = 0; i < run_cnt; i++) 
834     {
835       input_readers[i] = casefile_get_destructive_reader (input_runs[i]);
836       if (!casereader_read_xfer (input_readers[i], &input_cases[i]))
837         {
838           run_cnt--;
839           i--;
840         }
841     }
842   
843   output_casefile = casefile_create (mrg->xsrt->value_cnt);
844   casefile_to_disk (output_casefile);
845
846   /* Merge. */
847   while (run_cnt > 0) 
848     {
849       size_t min_idx;
850
851       /* Find minimum. */
852       min_idx = 0;
853       for (i = 1; i < run_cnt; i++)
854         if (compare_record (&input_cases[i], &input_cases[min_idx],
855                             mrg->xsrt->criteria) < 0)
856           min_idx = i;
857
858       /* Write minimum to output file. */
859       casefile_append_xfer (output_casefile, &input_cases[min_idx]);
860
861       if (!casereader_read_xfer (input_readers[min_idx],
862                                  &input_cases[min_idx]))
863         {
864           casereader_destroy (input_readers[min_idx]);
865           casefile_destroy (input_runs[min_idx]);
866
867           run_cnt--;
868           input_runs[min_idx] = input_runs[run_cnt];
869           input_readers[min_idx] = input_readers[run_cnt];
870           input_cases[min_idx] = input_cases[run_cnt];
871         } 
872     }
873
874   casefile_sleep (output_casefile);
875
876   return output_casefile;
877 }