First phase of making SORT CASES stable (PR 12035).
[pspp-builds.git] / src / sort.c
1 /* PSPP - computes sample statistics.
2    Copyright (C) 1997-9, 2000 Free Software Foundation, Inc.
3    Written by Ben Pfaff <blp@gnu.org>.
4
5    This program is free software; you can redistribute it and/or
6    modify it under the terms of the GNU General Public License as
7    published by the Free Software Foundation; either version 2 of the
8    License, or (at your option) any later version.
9
10    This program is distributed in the hope that it will be useful, but
11    WITHOUT ANY WARRANTY; without even the implied warranty of
12    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13    General Public License for more details.
14
15    You should have received a copy of the GNU General Public License
16    along with this program; if not, write to the Free Software
17    Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA
18    02111-1307, USA. */
19
20 #include <config.h>
21 #include "sort.h"
22 #include "error.h"
23 #include <stdio.h>
24 #include <stdlib.h>
25 #include <errno.h>
26 #include "algorithm.h"
27 #include "alloc.h"
28 #include "bool.h"
29 #include "case.h"
30 #include "casefile.h"
31 #include "command.h"
32 #include "error.h"
33 #include "expressions/public.h"
34 #include "lexer.h"
35 #include "misc.h"
36 #include "settings.h"
37 #include "str.h"
38 #include "var.h"
39 #include "vfm.h"
40 #include "vfmP.h"
41
42 #if HAVE_UNISTD_H
43 #include <unistd.h>
44 #endif
45
46 #if HAVE_SYS_TYPES_H
47 #include <sys/types.h>
48 #endif
49
50 #if HAVE_SYS_STAT_H
51 #include <sys/stat.h>
52 #endif
53
54 #include "debug-print.h"
55
56 /* Sort direction. */
57 enum sort_direction
58   {
59     SRT_ASCEND,                 /* A, B, C, ..., X, Y, Z. */
60     SRT_DESCEND                 /* Z, Y, X, ..., C, B, A. */
61   };
62
63 /* A sort criterion. */
64 struct sort_criterion
65   {
66     int fv;                     /* Variable data index. */
67     int width;                  /* 0=numeric, otherwise string widthe. */
68     enum sort_direction dir;    /* Sort direction. */
69   };
70
71 /* A set of sort criteria. */
72 struct sort_criteria 
73   {
74     struct sort_criterion *crits;
75     size_t crit_cnt;
76   };
77
78 static int compare_record (const struct ccase *, const struct ccase *,
79                            const struct sort_criteria *);
80 static struct casefile *do_internal_sort (struct casereader *,
81                                           const struct sort_criteria *);
82 static struct casefile *do_external_sort (struct casereader *,
83                                           const struct sort_criteria *);
84
85 /* Performs the SORT CASES procedures. */
86 int
87 cmd_sort_cases (void)
88 {
89   struct sort_criteria *criteria;
90   int success;
91
92   lex_match (T_BY);
93
94   criteria = sort_parse_criteria (default_dict, NULL, NULL, NULL);
95   if (criteria == NULL)
96     return CMD_FAILURE;
97
98   success = sort_active_file_in_place (criteria);
99   sort_destroy_criteria (criteria);
100   return success ? lex_end_of_command () : CMD_FAILURE;
101 }
102
103 /* Gets ready to sort the active file, either in-place or to a
104    separate casefile. */
105 static void
106 prepare_to_sort_active_file (void) 
107 {
108   /* Cancel temporary transformations and PROCESS IF. */
109   if (temporary != 0)
110     cancel_temporary (); 
111   expr_free (process_if_expr);
112   process_if_expr = NULL;
113
114   /* Make sure source cases are in a storage source. */
115   procedure (NULL, NULL);
116   assert (case_source_is_class (vfm_source, &storage_source_class));
117 }
118
119 /* Sorts the active file in-place according to CRITERIA.
120    Returns nonzero if successful. */
121 int
122 sort_active_file_in_place (const struct sort_criteria *criteria) 
123 {
124   struct casefile *src, *dst;
125   
126   prepare_to_sort_active_file ();
127
128   src = storage_source_get_casefile (vfm_source);
129   dst = sort_execute (casefile_get_destructive_reader (src), criteria);
130   free_case_source (vfm_source);
131   vfm_source = NULL;
132
133   if (dst == NULL) 
134     return 0;
135
136   vfm_source = storage_source_create (dst);
137   return 1;
138 }
139
140 /* Sorts the active file to a separate casefile.  If successful,
141    returns the sorted casefile.  Returns a null pointer on
142    failure. */
143 struct casefile *
144 sort_active_file_to_casefile (const struct sort_criteria *criteria) 
145 {
146   struct casefile *src;
147   
148   prepare_to_sort_active_file ();
149
150   src = storage_source_get_casefile (vfm_source);
151   return sort_execute (casefile_get_reader (src), criteria);
152 }
153
154 /* Parses a list of sort keys and returns a struct sort_criteria
155    based on it.  Returns a null pointer on error.
156    If SAW_DIRECTION is nonnull, sets *SAW_DIRECTION to true if at
157    least one parenthesized sort direction was specified, false
158    otherwise. */
159 struct sort_criteria *
160 sort_parse_criteria (const struct dictionary *dict,
161                      struct variable ***vars, int *var_cnt,
162                      bool *saw_direction)
163 {
164   struct sort_criteria *criteria;
165   struct variable **local_vars = NULL;
166   size_t local_var_cnt;
167
168   assert ((vars == NULL) == (var_cnt == NULL));
169   if (vars == NULL) 
170     {
171       vars = &local_vars;
172       var_cnt = &local_var_cnt;
173     }
174
175   criteria = xmalloc (sizeof *criteria);
176   criteria->crits = NULL;
177   criteria->crit_cnt = 0;
178
179   *vars = NULL;
180   *var_cnt = 0;
181   if (saw_direction != NULL)
182     *saw_direction = false;
183
184   do
185     {
186       int prev_var_cnt = *var_cnt;
187       enum sort_direction direction;
188
189       /* Variables. */
190       if (!parse_variables (dict, vars, var_cnt,
191                             PV_NO_DUPLICATE | PV_APPEND | PV_NO_SCRATCH))
192         goto error;
193
194       /* Sort direction. */
195       if (lex_match ('('))
196         {
197           if (lex_match_id ("D") || lex_match_id ("DOWN"))
198             direction = SRT_DESCEND;
199           else if (lex_match_id ("A") || lex_match_id ("UP"))
200             direction = SRT_ASCEND;
201           else
202             {
203               msg (SE, _("`A' or `D' expected inside parentheses."));
204               goto error;
205             }
206           if (!lex_match (')'))
207             {
208               msg (SE, _("`)' expected."));
209               goto error;
210             }
211           *saw_direction = true;
212         }
213       else
214         direction = SRT_ASCEND;
215
216       criteria->crits = xrealloc (criteria->crits,
217                                   sizeof *criteria->crits * *var_cnt);
218       criteria->crit_cnt = *var_cnt;
219       for (; prev_var_cnt < criteria->crit_cnt; prev_var_cnt++) 
220         {
221           struct sort_criterion *c = &criteria->crits[prev_var_cnt];
222           c->fv = (*vars)[prev_var_cnt]->fv;
223           c->width = (*vars)[prev_var_cnt]->width;
224           c->dir = direction;
225         }
226     }
227   while (token != '.' && token != '/');
228
229   free (local_vars);
230   return criteria;
231
232  error:
233   free (local_vars);
234   sort_destroy_criteria (criteria);
235   return NULL;
236 }
237
238 /* Destroys a SORT CASES program. */
239 void
240 sort_destroy_criteria (struct sort_criteria *criteria) 
241 {
242   if (criteria != NULL) 
243     {
244       free (criteria->crits);
245       free (criteria);
246     }
247 }
248
249 /* Reads all the cases from READER, which is destroyed.  Sorts
250    the cases according to CRITERIA.  Returns the sorted cases in
251    a newly created casefile. */
252 struct casefile *
253 sort_execute (struct casereader *reader, const struct sort_criteria *criteria)
254 {
255   struct casefile *output;
256
257   output = do_internal_sort (reader, criteria);
258   if (output == NULL)
259     output = do_external_sort (reader, criteria);
260   casereader_destroy (reader);
261   return output;
262 }
263 \f
264 /* A case and its index. */
265 struct indexed_case 
266   {
267     struct ccase c;     /* Case. */
268     unsigned long idx;  /* Index to allow for stable sorting. */
269   };
270
271 static int compare_indexed_cases (const void *, const void *, void *);
272
273 /* If the data is in memory, do an internal sort and return a new
274    casefile for the data. */
275 static struct casefile *
276 do_internal_sort (struct casereader *reader,
277                   const struct sort_criteria *criteria)
278 {
279   const struct casefile *src;
280   struct casefile *dst;
281   unsigned long case_cnt;
282
283   src = casereader_get_casefile (reader);
284   if (casefile_get_case_cnt (src) > 1 && !casefile_in_core (src))
285     return NULL;
286       
287   case_cnt = casefile_get_case_cnt (src);
288   dst = casefile_create (casefile_get_value_cnt (src));
289   if (case_cnt != 0) 
290     {
291       struct indexed_case *cases = malloc (sizeof *cases * case_cnt);
292       if (cases != NULL) 
293         {
294           unsigned long i;
295           
296           for (i = 0; i < case_cnt; i++)
297             {
298               casereader_read_xfer_assert (reader, &cases[i].c);
299               cases[i].idx = i;
300             }
301
302           sort (cases, case_cnt, sizeof *cases, compare_indexed_cases,
303                 (void *) criteria);
304       
305           for (i = 0; i < case_cnt; i++)
306             casefile_append_xfer (dst, &cases[i].c);
307
308           free (cases);
309         }
310       else 
311         {
312           /* Failure. */
313           casefile_destroy (dst);
314           dst = NULL;
315         }
316     }
317
318   return dst;
319 }
320
321 /* Compares the variables specified by CRITERIA between the cases
322    at A and B, with a "last resort" comparison for stability, and
323    returns a strcmp()-type result. */
324 static int
325 compare_indexed_cases (const void *a_, const void *b_, void *criteria_)
326 {
327   struct sort_criteria *criteria = criteria_;
328   const struct indexed_case *a = a_;
329   const struct indexed_case *b = b_;
330   int result = compare_record (&a->c, &b->c, criteria);
331   if (result == 0)
332     result = a->idx < b->idx ? -1 : a->idx > b->idx;
333   return result;
334 }
335 \f
336 /* External sort. */
337
338 /* Maximum order of merge.  If you increase this, then you should
339    use a heap for comparing cases during merge.  */
340 #define MAX_MERGE_ORDER         7
341
342 /* Minimum total number of records for buffers. */
343 #define MIN_BUFFER_TOTAL_SIZE_RECS      64
344
345 /* Minimum single input buffer size, in bytes and records. */
346 #define MIN_BUFFER_SIZE_BYTES   4096
347 #define MIN_BUFFER_SIZE_RECS    16
348
349 #if MIN_BUFFER_SIZE_RECS * 2 + 16 > MIN_BUFFER_TOTAL_SIZE_RECS
350 #error MIN_BUFFER_SIZE_RECS and MIN_BUFFER_TOTAL_SIZE_RECS do not make sense.
351 #endif
352
353 /* Sorts initial runs A and B in decending order by length. */
354 static int
355 compare_initial_runs (const void *a_, const void *b_, void *aux UNUSED) 
356 {
357   struct casefile *const *a = a_;
358   struct casefile *const *b = b_;
359   unsigned long a_case_cnt = casefile_get_case_cnt (*a);
360   unsigned long b_case_cnt = casefile_get_case_cnt (*b);
361   
362   return a_case_cnt > b_case_cnt ? -1 : a_case_cnt < b_case_cnt;
363 }
364
365 /* Results of an external sort. */
366 struct external_sort 
367   {
368     const struct sort_criteria *criteria; /* Sort criteria. */
369     size_t value_cnt;                 /* Size of data in `union value's. */
370     struct casefile **initial_runs;   /* Array of initial runs. */
371     size_t run_cnt, run_cap;          /* Number of runs, allocated capacity. */
372   };
373
374 /* Prototypes for helper functions. */
375 static int write_initial_runs (struct external_sort *, struct casereader *);
376 static int merge (struct external_sort *);
377 static void destroy_external_sort (struct external_sort *);
378
379 /* Performs an external sort of the active file according to the
380    specification in SCP.  Forms initial runs using a heap as a
381    reservoir.  Determines the optimum merge pattern via Huffman's
382    method (see Knuth vol. 3, 2nd edition, p. 365-366), and merges
383    according to that pattern. */
384 static struct casefile *
385 do_external_sort (struct casereader *reader,
386                   const struct sort_criteria *criteria)
387 {
388   struct external_sort *xsrt;
389
390   casefile_to_disk (casereader_get_casefile (reader));
391
392   xsrt = xmalloc (sizeof *xsrt);
393   xsrt->criteria = criteria;
394   xsrt->value_cnt = casefile_get_value_cnt (casereader_get_casefile (reader));
395   xsrt->run_cap = 512;
396   xsrt->run_cnt = 0;
397   xsrt->initial_runs = xmalloc (sizeof *xsrt->initial_runs * xsrt->run_cap);
398   if (write_initial_runs (xsrt, reader) && merge (xsrt))
399     {
400       struct casefile *output = xsrt->initial_runs[0];
401       xsrt->initial_runs[0] = NULL;
402       destroy_external_sort (xsrt);
403       return output;
404     }
405   else
406     {
407       destroy_external_sort (xsrt);
408       return NULL;
409     }
410 }
411
412 /* Destroys XSRT. */
413 static void
414 destroy_external_sort (struct external_sort *xsrt) 
415 {
416   if (xsrt != NULL) 
417     {
418       int i;
419       
420       for (i = 0; i < xsrt->run_cnt; i++)
421         casefile_destroy (xsrt->initial_runs[i]);
422       free (xsrt->initial_runs);
423       free (xsrt);
424     }
425 }
426 \f
427 /* Replacement selection. */
428
429 /* Pairs a record with a run number. */
430 struct record_run
431   {
432     int run;                    /* Run number of case. */
433     struct ccase record;        /* Case data. */
434   };
435
436 /* Represents a set of initial runs during an external sort. */
437 struct initial_run_state 
438   {
439     struct external_sort *xsrt;
440
441     /* Reservoir. */
442     struct record_run *records; /* Records arranged as a heap. */
443     size_t record_cnt;          /* Current number of records. */
444     size_t record_cap;          /* Capacity for records. */
445     
446     /* Run currently being output. */
447     int run;                    /* Run number. */
448     size_t case_cnt;            /* Number of cases so far. */
449     struct casefile *casefile;  /* Output file. */
450     struct ccase last_output;   /* Record last output. */
451
452     int okay;                   /* Zero if an error has been encountered. */
453   };
454
455 static const struct case_sink_class sort_sink_class;
456
457 static void destroy_initial_run_state (struct initial_run_state *);
458 static void process_case (struct initial_run_state *, const struct ccase *);
459 static int allocate_cases (struct initial_run_state *);
460 static void output_record (struct initial_run_state *);
461 static void start_run (struct initial_run_state *);
462 static void end_run (struct initial_run_state *);
463 static int compare_record_run (const struct record_run *,
464                                const struct record_run *,
465                                struct initial_run_state *);
466 static int compare_record_run_minheap (const void *, const void *, void *);
467
468 /* Reads cases from READER and composes initial runs in XSRT. */
469 static int
470 write_initial_runs (struct external_sort *xsrt, struct casereader *reader)
471 {
472   struct initial_run_state *irs;
473   struct ccase c;
474   int success = 0;
475
476   /* Allocate memory for cases. */
477   irs = xmalloc (sizeof *irs);
478   irs->xsrt = xsrt;
479   irs->records = NULL;
480   irs->record_cnt = irs->record_cap = 0;
481   irs->run = 0;
482   irs->case_cnt = 0;
483   irs->casefile = NULL;
484   case_nullify (&irs->last_output);
485   irs->okay = 1;
486   if (!allocate_cases (irs)) 
487     goto done;
488
489   /* Create initial runs. */
490   start_run (irs);
491   for (; irs->okay && casereader_read (reader, &c); case_destroy (&c))
492     process_case (irs, &c);
493   while (irs->okay && irs->record_cnt > 0)
494     output_record (irs);
495   end_run (irs);
496
497   success = irs->okay;
498
499  done:
500   destroy_initial_run_state (irs);
501
502   return success;
503 }
504
505 /* Add a single case to an initial run. */
506 static void
507 process_case (struct initial_run_state *irs, const struct ccase *c)
508 {
509   struct record_run *new_record_run;
510
511   /* Compose record_run for this run and add to heap. */
512   assert (irs->record_cnt < irs->record_cap - 1);
513   new_record_run = irs->records + irs->record_cnt++;
514   case_copy (&new_record_run->record, 0, c, 0, irs->xsrt->value_cnt);
515   new_record_run->run = irs->run;
516   if (!case_is_null (&irs->last_output)
517       && compare_record (c, &irs->last_output, irs->xsrt->criteria) < 0)
518     new_record_run->run = irs->run + 1;
519   push_heap (irs->records, irs->record_cnt, sizeof *irs->records,
520              compare_record_run_minheap, irs);
521
522   /* Output a record if the reservoir is full. */
523   if (irs->record_cnt == irs->record_cap - 1 && irs->okay)
524     output_record (irs);
525 }
526
527 /* Destroys the initial run state represented by IRS. */
528 static void
529 destroy_initial_run_state (struct initial_run_state *irs) 
530 {
531   int i;
532
533   if (irs == NULL)
534     return;
535
536   for (i = 0; i < irs->record_cap; i++)
537     case_destroy (&irs->records[i].record);
538   free (irs->records);
539
540   if (irs->casefile != NULL)
541     casefile_sleep (irs->casefile);
542
543   free (irs);
544 }
545
546 /* Allocates room for lots of cases as a buffer. */
547 static int
548 allocate_cases (struct initial_run_state *irs)
549 {
550   int approx_case_cost; /* Approximate memory cost of one case in bytes. */
551   int max_cases;        /* Maximum number of cases to allocate. */
552   int i;
553
554   /* Allocate as many cases as we can within the workspace
555      limit. */
556   approx_case_cost = (sizeof *irs->records
557                       + irs->xsrt->value_cnt * sizeof (union value)
558                       + 4 * sizeof (void *));
559   max_cases = get_max_workspace() / approx_case_cost;
560   irs->records = malloc (sizeof *irs->records * max_cases);
561   for (i = 0; i < max_cases; i++)
562     if (!case_try_create (&irs->records[i].record, irs->xsrt->value_cnt))
563       {
564         max_cases = i;
565         break;
566       }
567   irs->record_cap = max_cases;
568
569   /* Fail if we didn't allocate an acceptable number of cases. */
570   if (irs->records == NULL || max_cases < MIN_BUFFER_TOTAL_SIZE_RECS)
571     {
572       msg (SE, _("Out of memory.  Could not allocate room for minimum of %d "
573                  "cases of %d bytes each.  (PSPP workspace is currently "
574                  "restricted to a maximum of %d KB.)"),
575            MIN_BUFFER_TOTAL_SIZE_RECS, approx_case_cost, get_max_workspace() / 1024);
576       return 0;
577     }
578   return 1;
579 }
580
581 /* Compares the VAR_CNT variables in VARS[] between the `value's at
582    A and B, and returns a strcmp()-type result. */
583 static int
584 compare_record (const struct ccase *a, const struct ccase *b,
585                 const struct sort_criteria *criteria)
586 {
587   int i;
588
589   assert (a != NULL);
590   assert (b != NULL);
591   
592   for (i = 0; i < criteria->crit_cnt; i++)
593     {
594       const struct sort_criterion *c = &criteria->crits[i];
595       int result;
596       
597       if (c->width == 0)
598         {
599           double af = case_num (a, c->fv);
600           double bf = case_num (b, c->fv);
601           
602           result = af < bf ? -1 : af > bf;
603         }
604       else
605         result = memcmp (case_str (a, c->fv), case_str (b, c->fv), c->width);
606
607       if (result != 0)
608         return c->dir == SRT_ASCEND ? result : -result;
609     }
610
611   return 0;
612 }
613
614 /* Compares record-run tuples A and B on run number first, then
615    on the current record according to SCP. */
616 static int
617 compare_record_run (const struct record_run *a,
618                     const struct record_run *b,
619                     struct initial_run_state *irs)
620 {
621   if (a->run != b->run)
622     return a->run > b->run ? 1 : -1;
623   else
624     return compare_record (&a->record, &b->record, irs->xsrt->criteria);
625 }
626
627 /* Compares record-run tuples A and B on run number first, then
628    on the current record according to SCP, but in descending
629    order. */
630 static int
631 compare_record_run_minheap (const void *a, const void *b, void *irs) 
632 {
633   return -compare_record_run (a, b, irs);
634 }
635
636 /* Begins a new initial run, specifically its output file. */
637 static void
638 start_run (struct initial_run_state *irs)
639 {
640   irs->run++;
641   irs->case_cnt = 0;
642   irs->casefile = casefile_create (irs->xsrt->value_cnt);
643   casefile_to_disk (irs->casefile);
644   case_nullify (&irs->last_output); 
645 }
646
647 /* Ends the current initial run.  */
648 static void
649 end_run (struct initial_run_state *irs)
650 {
651   struct external_sort *xsrt = irs->xsrt;
652
653   /* Record initial run. */
654   if (irs->casefile != NULL) 
655     {
656       if (xsrt->run_cnt >= xsrt->run_cap) 
657         {
658           xsrt->run_cap *= 2;
659           xsrt->initial_runs
660             = xrealloc (xsrt->initial_runs,
661                         sizeof *xsrt->initial_runs * xsrt->run_cap);
662         }
663       xsrt->initial_runs[xsrt->run_cnt++] = irs->casefile;
664       irs->casefile = NULL; 
665     }
666 }
667
668 /* Writes a record to the current initial run. */
669 static void
670 output_record (struct initial_run_state *irs)
671 {
672   struct record_run *record_run;
673   struct ccase case_tmp;
674   
675   /* Extract minimum case from heap. */
676   assert (irs->record_cnt > 0);
677   pop_heap (irs->records, irs->record_cnt--, sizeof *irs->records,
678             compare_record_run_minheap, irs);
679   record_run = irs->records + irs->record_cnt;
680
681   /* Bail if an error has occurred. */
682   if (!irs->okay)
683     return;
684
685   /* Start new run if necessary. */
686   assert (record_run->run == irs->run
687           || record_run->run == irs->run + 1);
688   if (record_run->run != irs->run)
689     {
690       end_run (irs);
691       start_run (irs);
692     }
693   assert (record_run->run == irs->run);
694   irs->case_cnt++;
695
696   /* Write to disk. */
697   if (irs->casefile != NULL)
698     casefile_append (irs->casefile, &record_run->record);
699
700   /* This record becomes last_output. */
701   irs->last_output = case_tmp = record_run->record;
702   record_run->record = irs->records[irs->record_cap - 1].record;
703   irs->records[irs->record_cap - 1].record = case_tmp;
704 }
705 \f
706 /* Merging. */
707
708 /* State of merging initial runs. */
709 struct merge_state 
710   {
711     struct external_sort *xsrt; /* External sort state. */
712     struct ccase *cases;        /* Buffers. */
713     size_t case_cnt;            /* Number of buffers. */
714   };
715
716 struct run;
717 static struct casefile *merge_once (struct merge_state *,
718                                     struct casefile *[], size_t);
719 static int mod (int, int);
720
721 /* Performs a series of P-way merges of initial runs. */
722 static int
723 merge (struct external_sort *xsrt)
724 {
725   struct merge_state mrg;       /* State of merge. */
726   size_t approx_case_cost;      /* Approximate memory cost of one case. */
727   int max_order;                /* Maximum order of merge. */
728   size_t dummy_run_cnt;         /* Number of dummy runs to insert. */
729   int success = 0;
730   int i;
731
732   mrg.xsrt = xsrt;
733
734   /* Allocate as many cases as possible into cases. */
735   approx_case_cost = (sizeof *mrg.cases
736                       + xsrt->value_cnt * sizeof (union value)
737                       + 4 * sizeof (void *));
738   mrg.case_cnt = get_max_workspace() / approx_case_cost;
739   mrg.cases = malloc (sizeof *mrg.cases * mrg.case_cnt);
740   if (mrg.cases == NULL)
741     goto done;
742   for (i = 0; i < mrg.case_cnt; i++) 
743     if (!case_try_create (&mrg.cases[i], xsrt->value_cnt)) 
744       {
745         mrg.case_cnt = i;
746         break;
747       }
748   if (mrg.case_cnt < MIN_BUFFER_TOTAL_SIZE_RECS)
749     {
750       msg (SE, _("Out of memory.  Could not allocate room for minimum of %d "
751                  "cases of %d bytes each.  (PSPP workspace is currently "
752                  "restricted to a maximum of %d KB.)"),
753            MIN_BUFFER_TOTAL_SIZE_RECS, approx_case_cost, get_max_workspace() / 1024);
754       return 0;
755     }
756
757   /* Determine maximum order of merge. */
758   max_order = MAX_MERGE_ORDER;
759   if (mrg.case_cnt / max_order < MIN_BUFFER_SIZE_RECS)
760     max_order = mrg.case_cnt / MIN_BUFFER_SIZE_RECS;
761   else if (mrg.case_cnt / max_order * xsrt->value_cnt * sizeof (union value)
762            < MIN_BUFFER_SIZE_BYTES)
763     max_order = mrg.case_cnt / (MIN_BUFFER_SIZE_BYTES
764                                 / (xsrt->value_cnt * sizeof (union value)));
765   if (max_order < 2)
766     max_order = 2;
767   if (max_order > xsrt->run_cnt)
768     max_order = xsrt->run_cnt;
769
770   /* Repeatedly merge the P shortest existing runs until only one run
771      is left. */
772   make_heap (xsrt->initial_runs, xsrt->run_cnt, sizeof *xsrt->initial_runs,
773              compare_initial_runs, NULL);
774   dummy_run_cnt = mod (1 - (int) xsrt->run_cnt, max_order - 1);
775
776   assert (max_order > 0);
777   assert (max_order <= 2
778           || (xsrt->run_cnt + dummy_run_cnt) % (max_order - 1) == 1);
779   while (xsrt->run_cnt > 1)
780     {
781       struct casefile *output_run;
782       int order;
783       int i;
784
785       /* Choose order of merge (max_order after first merge). */
786       order = max_order - dummy_run_cnt;
787       dummy_run_cnt = 0;
788
789       /* Choose runs to merge. */
790       assert (xsrt->run_cnt >= order);
791       for (i = 0; i < order; i++) 
792         pop_heap (xsrt->initial_runs, xsrt->run_cnt--,
793                   sizeof *xsrt->initial_runs,
794                   compare_initial_runs, NULL); 
795           
796       /* Merge runs. */
797       output_run = merge_once (&mrg,
798                                xsrt->initial_runs + xsrt->run_cnt, order);
799       if (output_run == NULL)
800         goto done;
801       
802       /* Add output run to heap. */
803       xsrt->initial_runs[xsrt->run_cnt++] = output_run;
804       push_heap (xsrt->initial_runs, xsrt->run_cnt, sizeof *xsrt->initial_runs,
805                  compare_initial_runs, NULL);
806     }
807
808   /* Exactly one run is left, which contains the entire sorted
809      file.  We could use it to find a total case count. */
810   assert (xsrt->run_cnt == 1);
811
812   success = 1;
813
814  done:
815   for (i = 0; i < mrg.case_cnt; i++)
816     case_destroy (&mrg.cases[i]);
817   free (mrg.cases);
818
819   return success;
820 }
821
822 /* Modulo function as defined by Knuth. */
823 static int
824 mod (int x, int y)
825 {
826   if (y == 0)
827     return x;
828   else if (x == 0)
829     return 0;
830   else if (x > 0 && y > 0)
831     return x % y;
832   else if (x < 0 && y > 0)
833     return y - (-x) % y;
834
835   abort ();
836 }
837
838 /* Merges the RUN_CNT initial runs specified in INPUT_RUNS into a
839    new run.  Returns nonzero only if successful.  Adds an entry
840    to MRG->xsrt->runs for the output file if and only if the
841    output file is actually created.  Always deletes all the input
842    files. */
843 static struct casefile *
844 merge_once (struct merge_state *mrg,
845             struct casefile *input_runs[],
846             size_t run_cnt)
847 {
848   struct casereader *input_readers[MAX_MERGE_ORDER];
849   struct ccase input_cases[MAX_MERGE_ORDER];
850   struct casefile *output_casefile = NULL;
851   int i;
852
853   for (i = 0; i < run_cnt; i++) 
854     {
855       input_readers[i] = casefile_get_destructive_reader (input_runs[i]);
856       if (!casereader_read_xfer (input_readers[i], &input_cases[i]))
857         {
858           run_cnt--;
859           i--;
860         }
861     }
862   
863   output_casefile = casefile_create (mrg->xsrt->value_cnt);
864   casefile_to_disk (output_casefile);
865
866   /* Merge. */
867   while (run_cnt > 0) 
868     {
869       size_t min_idx;
870
871       /* Find minimum. */
872       min_idx = 0;
873       for (i = 1; i < run_cnt; i++)
874         if (compare_record (&input_cases[i], &input_cases[min_idx],
875                             mrg->xsrt->criteria) < 0)
876           min_idx = i;
877
878       /* Write minimum to output file. */
879       casefile_append_xfer (output_casefile, &input_cases[min_idx]);
880
881       if (!casereader_read_xfer (input_readers[min_idx],
882                                  &input_cases[min_idx]))
883         {
884           casereader_destroy (input_readers[min_idx]);
885           casefile_destroy (input_runs[min_idx]);
886
887           run_cnt--;
888           input_runs[min_idx] = input_runs[run_cnt];
889           input_readers[min_idx] = input_readers[run_cnt];
890           input_cases[min_idx] = input_cases[run_cnt];
891         } 
892     }
893
894   casefile_sleep (output_casefile);
895
896   return output_casefile;
897 }