Start work on testing and debugging AGGREGATE.
[pspp-builds.git] / src / sort.c
1 /* PSPP - computes sample statistics.
2    Copyright (C) 1997-9, 2000 Free Software Foundation, Inc.
3    Written by Ben Pfaff <blp@gnu.org>.
4
5    This program is free software; you can redistribute it and/or
6    modify it under the terms of the GNU General Public License as
7    published by the Free Software Foundation; either version 2 of the
8    License, or (at your option) any later version.
9
10    This program is distributed in the hope that it will be useful, but
11    WITHOUT ANY WARRANTY; without even the implied warranty of
12    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13    General Public License for more details.
14
15    You should have received a copy of the GNU General Public License
16    along with this program; if not, write to the Free Software
17    Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA
18    02111-1307, USA. */
19
20 #include <config.h>
21 #include "sort.h"
22 #include "error.h"
23 #include <stdio.h>
24 #include <stdlib.h>
25 #include <errno.h>
26 #include "algorithm.h"
27 #include "alloc.h"
28 #include "bool.h"
29 #include "case.h"
30 #include "casefile.h"
31 #include "command.h"
32 #include "error.h"
33 #include "expressions/public.h"
34 #include "lexer.h"
35 #include "misc.h"
36 #include "settings.h"
37 #include "str.h"
38 #include "var.h"
39 #include "vfm.h"
40 #include "vfmP.h"
41
42 #if HAVE_UNISTD_H
43 #include <unistd.h>
44 #endif
45
46 #if HAVE_SYS_TYPES_H
47 #include <sys/types.h>
48 #endif
49
50 #if HAVE_SYS_STAT_H
51 #include <sys/stat.h>
52 #endif
53
54 #include "debug-print.h"
55
56 /* Sort direction. */
57 enum sort_direction
58   {
59     SRT_ASCEND,                 /* A, B, C, ..., X, Y, Z. */
60     SRT_DESCEND                 /* Z, Y, X, ..., C, B, A. */
61   };
62
63 /* A sort criterion. */
64 struct sort_criterion
65   {
66     int fv;                     /* Variable data index. */
67     int width;                  /* 0=numeric, otherwise string widthe. */
68     enum sort_direction dir;    /* Sort direction. */
69   };
70
71 /* A set of sort criteria. */
72 struct sort_criteria 
73   {
74     struct sort_criterion *crits;
75     size_t crit_cnt;
76   };
77
78 static int compare_case_dblptrs (const void *, const void *, void *);
79 static int compare_record (const struct ccase *, const struct ccase *,
80                            const struct sort_criteria *);
81 static struct casefile *do_internal_sort (struct casereader *,
82                                           const struct sort_criteria *);
83 static struct casefile *do_external_sort (struct casereader *,
84                                           const struct sort_criteria *);
85
86 /* Performs the SORT CASES procedures. */
87 int
88 cmd_sort_cases (void)
89 {
90   struct sort_criteria *criteria;
91   int success;
92
93   lex_match (T_BY);
94
95   criteria = sort_parse_criteria (default_dict, NULL, NULL, NULL);
96   if (criteria == NULL)
97     return CMD_FAILURE;
98
99   success = sort_active_file_in_place (criteria);
100   sort_destroy_criteria (criteria);
101   return success ? lex_end_of_command () : CMD_FAILURE;
102 }
103
104 /* Gets ready to sort the active file, either in-place or to a
105    separate casefile. */
106 static void
107 prepare_to_sort_active_file (void) 
108 {
109   /* Cancel temporary transformations and PROCESS IF. */
110   if (temporary != 0)
111     cancel_temporary (); 
112   expr_free (process_if_expr);
113   process_if_expr = NULL;
114
115   /* Make sure source cases are in a storage source. */
116   procedure (NULL, NULL);
117   assert (case_source_is_class (vfm_source, &storage_source_class));
118 }
119
120 /* Sorts the active file in-place according to CRITERIA.
121    Returns nonzero if successful. */
122 int
123 sort_active_file_in_place (const struct sort_criteria *criteria) 
124 {
125   struct casefile *src, *dst;
126   
127   prepare_to_sort_active_file ();
128
129   src = storage_source_get_casefile (vfm_source);
130   dst = sort_execute (casefile_get_destructive_reader (src), criteria);
131   free_case_source (vfm_source);
132   vfm_source = NULL;
133
134   if (dst == NULL) 
135     return 0;
136
137   vfm_source = storage_source_create (dst, default_dict);
138   return 1;
139 }
140
141 /* Sorts the active file to a separate casefile.  If successful,
142    returns the sorted casefile.  Returns a null pointer on
143    failure. */
144 struct casefile *
145 sort_active_file_to_casefile (const struct sort_criteria *criteria) 
146 {
147   struct casefile *src;
148   
149   prepare_to_sort_active_file ();
150
151   src = storage_source_get_casefile (vfm_source);
152   return sort_execute (casefile_get_reader (src), criteria);
153 }
154
155 /* Parses a list of sort keys and returns a struct sort_criteria
156    based on it.  Returns a null pointer on error.
157    If SAW_DIRECTION is nonnull, sets *SAW_DIRECTION to true if at
158    least one parenthesized sort direction was specified, false
159    otherwise. */
160 struct sort_criteria *
161 sort_parse_criteria (const struct dictionary *dict,
162                      struct variable ***vars, int *var_cnt,
163                      bool *saw_direction)
164 {
165   struct sort_criteria *criteria;
166   struct variable **local_vars = NULL;
167   size_t local_var_cnt;
168
169   assert ((vars == NULL) == (var_cnt == NULL));
170   if (vars == NULL) 
171     {
172       vars = &local_vars;
173       var_cnt = &local_var_cnt;
174     }
175
176   criteria = xmalloc (sizeof *criteria);
177   criteria->crits = NULL;
178   criteria->crit_cnt = 0;
179
180   *vars = NULL;
181   *var_cnt = 0;
182   if (saw_direction != NULL)
183     *saw_direction = false;
184
185   do
186     {
187       int prev_var_cnt = *var_cnt;
188       enum sort_direction direction;
189
190       /* Variables. */
191       if (!parse_variables (dict, vars, var_cnt,
192                             PV_NO_DUPLICATE | PV_APPEND | PV_NO_SCRATCH))
193         goto error;
194
195       /* Sort direction. */
196       if (lex_match ('('))
197         {
198           if (lex_match_id ("D") || lex_match_id ("DOWN"))
199             direction = SRT_DESCEND;
200           else if (lex_match_id ("A") || lex_match_id ("UP"))
201             direction = SRT_ASCEND;
202           else
203             {
204               msg (SE, _("`A' or `D' expected inside parentheses."));
205               goto error;
206             }
207           if (!lex_match (')'))
208             {
209               msg (SE, _("`)' expected."));
210               goto error;
211             }
212           *saw_direction = true;
213         }
214       else
215         direction = SRT_ASCEND;
216
217       criteria->crits = xrealloc (criteria->crits,
218                                   sizeof *criteria->crits * *var_cnt);
219       criteria->crit_cnt = *var_cnt;
220       for (; prev_var_cnt < criteria->crit_cnt; prev_var_cnt++) 
221         {
222           struct sort_criterion *c = &criteria->crits[prev_var_cnt];
223           c->fv = (*vars)[prev_var_cnt]->fv;
224           c->width = (*vars)[prev_var_cnt]->width;
225           c->dir = direction;
226         }
227     }
228   while (token != '.' && token != '/');
229
230   free (local_vars);
231   return criteria;
232
233  error:
234   free (local_vars);
235   sort_destroy_criteria (criteria);
236   return NULL;
237 }
238
239 /* Destroys a SORT CASES program. */
240 void
241 sort_destroy_criteria (struct sort_criteria *criteria) 
242 {
243   if (criteria != NULL) 
244     {
245       free (criteria->crits);
246       free (criteria);
247     }
248 }
249
250 /* Reads all the cases from READER, which is destroyed.  Sorts
251    the cases according to CRITERIA.  Returns the sorted cases in
252    a newly created casefile. */
253 struct casefile *
254 sort_execute (struct casereader *reader, const struct sort_criteria *criteria)
255 {
256   struct casefile *output;
257
258   output = do_internal_sort (reader, criteria);
259   if (output == NULL)
260     output = do_external_sort (reader, criteria);
261   casereader_destroy (reader);
262   return output;
263 }
264 \f
265 /* If the data is in memory, do an internal sort and return a new
266    casefile for the data. */
267 static struct casefile *
268 do_internal_sort (struct casereader *reader,
269                   const struct sort_criteria *criteria)
270 {
271   const struct casefile *src;
272   struct casefile *dst;
273   struct ccase *cases, **case_ptrs;
274   unsigned long case_cnt;
275
276   src = casereader_get_casefile (reader);
277   if (casefile_get_case_cnt (src) > 1 && !casefile_in_core (src))
278     return NULL;
279       
280   case_cnt = casefile_get_case_cnt (src);
281   cases = malloc (sizeof *cases * case_cnt);
282   case_ptrs = malloc (sizeof *case_ptrs * case_cnt);
283   if ((cases != NULL && case_ptrs != NULL) || case_cnt == 0) 
284     {
285       unsigned long case_idx;
286       
287       for (case_idx = 0; case_idx < case_cnt; case_idx++) 
288         {
289           int success = casereader_read_xfer (reader, &cases[case_idx]);
290           assert (success);
291           case_ptrs[case_idx] = &cases[case_idx];
292         }
293
294       sort (case_ptrs, case_cnt, sizeof *case_ptrs, compare_case_dblptrs,
295             (void *) criteria);
296       
297       dst = casefile_create (casefile_get_value_cnt (src));
298       for (case_idx = 0; case_idx < case_cnt; case_idx++) 
299         casefile_append_xfer (dst, case_ptrs[case_idx]);
300     }
301   else
302     dst = NULL;
303   
304   free (case_ptrs);
305   free (cases);
306
307   return dst;
308 }
309
310 /* Compares the variables specified by CRITERIA between the cases
311    at A and B, and returns a strcmp()-type result. */
312 static int
313 compare_case_dblptrs (const void *a_, const void *b_, void *criteria_)
314 {
315   struct sort_criteria *criteria = criteria_;
316   struct ccase *const *pa = a_;
317   struct ccase *const *pb = b_;
318   struct ccase *a = *pa;
319   struct ccase *b = *pb;
320  
321   return compare_record (a, b, criteria);
322 }
323 \f
324 /* External sort. */
325
326 /* Maximum order of merge.  If you increase this, then you should
327    use a heap for comparing cases during merge.  */
328 #define MAX_MERGE_ORDER         7
329
330 /* Minimum total number of records for buffers. */
331 #define MIN_BUFFER_TOTAL_SIZE_RECS      64
332
333 /* Minimum single input buffer size, in bytes and records. */
334 #define MIN_BUFFER_SIZE_BYTES   4096
335 #define MIN_BUFFER_SIZE_RECS    16
336
337 #if MIN_BUFFER_SIZE_RECS * 2 + 16 > MIN_BUFFER_TOTAL_SIZE_RECS
338 #error MIN_BUFFER_SIZE_RECS and MIN_BUFFER_TOTAL_SIZE_RECS do not make sense.
339 #endif
340
341 /* Sorts initial runs A and B in decending order by length. */
342 static int
343 compare_initial_runs (const void *a_, const void *b_, void *aux UNUSED) 
344 {
345   struct casefile *const *a = a_;
346   struct casefile *const *b = b_;
347   unsigned long a_case_cnt = casefile_get_case_cnt (*a);
348   unsigned long b_case_cnt = casefile_get_case_cnt (*b);
349   
350   return a_case_cnt > b_case_cnt ? -1 : a_case_cnt < b_case_cnt;
351 }
352
353 /* Results of an external sort. */
354 struct external_sort 
355   {
356     const struct sort_criteria *criteria; /* Sort criteria. */
357     size_t value_cnt;                 /* Size of data in `union value's. */
358     struct casefile **initial_runs;   /* Array of initial runs. */
359     size_t run_cnt, run_cap;          /* Number of runs, allocated capacity. */
360   };
361
362 /* Prototypes for helper functions. */
363 static int write_initial_runs (struct external_sort *, struct casereader *);
364 static int merge (struct external_sort *);
365 static void destroy_external_sort (struct external_sort *);
366
367 /* Performs an external sort of the active file according to the
368    specification in SCP.  Forms initial runs using a heap as a
369    reservoir.  Determines the optimum merge pattern via Huffman's
370    method (see Knuth vol. 3, 2nd edition, p. 365-366), and merges
371    according to that pattern. */
372 static struct casefile *
373 do_external_sort (struct casereader *reader,
374                   const struct sort_criteria *criteria)
375 {
376   struct external_sort *xsrt;
377
378   casefile_to_disk (casereader_get_casefile (reader));
379
380   xsrt = xmalloc (sizeof *xsrt);
381   xsrt->criteria = criteria;
382   xsrt->value_cnt = casefile_get_value_cnt (casereader_get_casefile (reader));
383   xsrt->run_cap = 512;
384   xsrt->run_cnt = 0;
385   xsrt->initial_runs = xmalloc (sizeof *xsrt->initial_runs * xsrt->run_cap);
386   if (write_initial_runs (xsrt, reader) && merge (xsrt))
387     {
388       struct casefile *output = xsrt->initial_runs[0];
389       xsrt->initial_runs[0] = NULL;
390       destroy_external_sort (xsrt);
391       return output;
392     }
393   else
394     {
395       destroy_external_sort (xsrt);
396       return NULL;
397     }
398 }
399
400 /* Destroys XSRT. */
401 static void
402 destroy_external_sort (struct external_sort *xsrt) 
403 {
404   if (xsrt != NULL) 
405     {
406       int i;
407       
408       for (i = 0; i < xsrt->run_cnt; i++)
409         casefile_destroy (xsrt->initial_runs[i]);
410       free (xsrt->initial_runs);
411       free (xsrt);
412     }
413 }
414 \f
415 /* Replacement selection. */
416
417 /* Pairs a record with a run number. */
418 struct record_run
419   {
420     int run;                    /* Run number of case. */
421     struct ccase record;        /* Case data. */
422   };
423
424 /* Represents a set of initial runs during an external sort. */
425 struct initial_run_state 
426   {
427     struct external_sort *xsrt;
428
429     /* Reservoir. */
430     struct record_run *records; /* Records arranged as a heap. */
431     size_t record_cnt;          /* Current number of records. */
432     size_t record_cap;          /* Capacity for records. */
433     
434     /* Run currently being output. */
435     int run;                    /* Run number. */
436     size_t case_cnt;            /* Number of cases so far. */
437     struct casefile *casefile;  /* Output file. */
438     struct ccase last_output;   /* Record last output. */
439
440     int okay;                   /* Zero if an error has been encountered. */
441   };
442
443 static const struct case_sink_class sort_sink_class;
444
445 static void destroy_initial_run_state (struct initial_run_state *);
446 static void process_case (struct initial_run_state *, const struct ccase *);
447 static int allocate_cases (struct initial_run_state *);
448 static void output_record (struct initial_run_state *);
449 static void start_run (struct initial_run_state *);
450 static void end_run (struct initial_run_state *);
451 static int compare_record_run (const struct record_run *,
452                                const struct record_run *,
453                                struct initial_run_state *);
454 static int compare_record_run_minheap (const void *, const void *, void *);
455
456 /* Reads cases from READER and composes initial runs in XSRT. */
457 static int
458 write_initial_runs (struct external_sort *xsrt, struct casereader *reader)
459 {
460   struct initial_run_state *irs;
461   struct ccase c;
462   int success = 0;
463
464   /* Allocate memory for cases. */
465   irs = xmalloc (sizeof *irs);
466   irs->xsrt = xsrt;
467   irs->records = NULL;
468   irs->record_cnt = irs->record_cap = 0;
469   irs->run = 0;
470   irs->case_cnt = 0;
471   irs->casefile = NULL;
472   case_nullify (&irs->last_output);
473   irs->okay = 1;
474   if (!allocate_cases (irs)) 
475     goto done;
476
477   /* Create initial runs. */
478   start_run (irs);
479   for (; irs->okay && casereader_read (reader, &c); case_destroy (&c))
480     process_case (irs, &c);
481   while (irs->okay && irs->record_cnt > 0)
482     output_record (irs);
483   end_run (irs);
484
485   success = irs->okay;
486
487  done:
488   destroy_initial_run_state (irs);
489
490   return success;
491 }
492
493 /* Add a single case to an initial run. */
494 static void
495 process_case (struct initial_run_state *irs, const struct ccase *c)
496 {
497   struct record_run *new_record_run;
498
499   /* Compose record_run for this run and add to heap. */
500   assert (irs->record_cnt < irs->record_cap - 1);
501   new_record_run = irs->records + irs->record_cnt++;
502   case_copy (&new_record_run->record, 0, c, 0, irs->xsrt->value_cnt);
503   new_record_run->run = irs->run;
504   if (!case_is_null (&irs->last_output)
505       && compare_record (c, &irs->last_output, irs->xsrt->criteria) < 0)
506     new_record_run->run = irs->run + 1;
507   push_heap (irs->records, irs->record_cnt, sizeof *irs->records,
508              compare_record_run_minheap, irs);
509
510   /* Output a record if the reservoir is full. */
511   if (irs->record_cnt == irs->record_cap - 1 && irs->okay)
512     output_record (irs);
513 }
514
515 /* Destroys the initial run state represented by IRS. */
516 static void
517 destroy_initial_run_state (struct initial_run_state *irs) 
518 {
519   int i;
520
521   if (irs == NULL)
522     return;
523
524   for (i = 0; i < irs->record_cap; i++)
525     case_destroy (&irs->records[i].record);
526   free (irs->records);
527
528   if (irs->casefile != NULL)
529     casefile_sleep (irs->casefile);
530
531   free (irs);
532 }
533
534 /* Allocates room for lots of cases as a buffer. */
535 static int
536 allocate_cases (struct initial_run_state *irs)
537 {
538   int approx_case_cost; /* Approximate memory cost of one case in bytes. */
539   int max_cases;        /* Maximum number of cases to allocate. */
540   int i;
541
542   /* Allocate as many cases as we can within the workspace
543      limit. */
544   approx_case_cost = (sizeof *irs->records
545                       + irs->xsrt->value_cnt * sizeof (union value)
546                       + 4 * sizeof (void *));
547   max_cases = get_max_workspace() / approx_case_cost;
548   irs->records = malloc (sizeof *irs->records * max_cases);
549   for (i = 0; i < max_cases; i++)
550     if (!case_try_create (&irs->records[i].record, irs->xsrt->value_cnt))
551       {
552         max_cases = i;
553         break;
554       }
555   irs->record_cap = max_cases;
556
557   /* Fail if we didn't allocate an acceptable number of cases. */
558   if (irs->records == NULL || max_cases < MIN_BUFFER_TOTAL_SIZE_RECS)
559     {
560       msg (SE, _("Out of memory.  Could not allocate room for minimum of %d "
561                  "cases of %d bytes each.  (PSPP workspace is currently "
562                  "restricted to a maximum of %d KB.)"),
563            MIN_BUFFER_TOTAL_SIZE_RECS, approx_case_cost, get_max_workspace() / 1024);
564       return 0;
565     }
566   return 1;
567 }
568
569 /* Compares the VAR_CNT variables in VARS[] between the `value's at
570    A and B, and returns a strcmp()-type result. */
571 static int
572 compare_record (const struct ccase *a, const struct ccase *b,
573                 const struct sort_criteria *criteria)
574 {
575   int i;
576
577   assert (a != NULL);
578   assert (b != NULL);
579   
580   for (i = 0; i < criteria->crit_cnt; i++)
581     {
582       const struct sort_criterion *c = &criteria->crits[i];
583       int result;
584       
585       if (c->width == 0)
586         {
587           double af = case_num (a, c->fv);
588           double bf = case_num (b, c->fv);
589           
590           result = af < bf ? -1 : af > bf;
591         }
592       else
593         result = memcmp (case_str (a, c->fv), case_str (b, c->fv), c->width);
594
595       if (result != 0)
596         return c->dir == SRT_ASCEND ? result : -result;
597     }
598
599   return 0;
600 }
601
602 /* Compares record-run tuples A and B on run number first, then
603    on the current record according to SCP. */
604 static int
605 compare_record_run (const struct record_run *a,
606                     const struct record_run *b,
607                     struct initial_run_state *irs)
608 {
609   if (a->run != b->run)
610     return a->run > b->run ? 1 : -1;
611   else
612     return compare_record (&a->record, &b->record, irs->xsrt->criteria);
613 }
614
615 /* Compares record-run tuples A and B on run number first, then
616    on the current record according to SCP, but in descending
617    order. */
618 static int
619 compare_record_run_minheap (const void *a, const void *b, void *irs) 
620 {
621   return -compare_record_run (a, b, irs);
622 }
623
624 /* Begins a new initial run, specifically its output file. */
625 static void
626 start_run (struct initial_run_state *irs)
627 {
628   irs->run++;
629   irs->case_cnt = 0;
630   irs->casefile = casefile_create (irs->xsrt->value_cnt);
631   casefile_to_disk (irs->casefile);
632   case_nullify (&irs->last_output); 
633 }
634
635 /* Ends the current initial run.  */
636 static void
637 end_run (struct initial_run_state *irs)
638 {
639   struct external_sort *xsrt = irs->xsrt;
640
641   /* Record initial run. */
642   if (irs->casefile != NULL) 
643     {
644       if (xsrt->run_cnt >= xsrt->run_cap) 
645         {
646           xsrt->run_cap *= 2;
647           xsrt->initial_runs
648             = xrealloc (xsrt->initial_runs,
649                         sizeof *xsrt->initial_runs * xsrt->run_cap);
650         }
651       xsrt->initial_runs[xsrt->run_cnt++] = irs->casefile;
652       irs->casefile = NULL; 
653     }
654 }
655
656 /* Writes a record to the current initial run. */
657 static void
658 output_record (struct initial_run_state *irs)
659 {
660   struct record_run *record_run;
661   struct ccase case_tmp;
662   
663   /* Extract minimum case from heap. */
664   assert (irs->record_cnt > 0);
665   pop_heap (irs->records, irs->record_cnt--, sizeof *irs->records,
666             compare_record_run_minheap, irs);
667   record_run = irs->records + irs->record_cnt;
668
669   /* Bail if an error has occurred. */
670   if (!irs->okay)
671     return;
672
673   /* Start new run if necessary. */
674   assert (record_run->run == irs->run
675           || record_run->run == irs->run + 1);
676   if (record_run->run != irs->run)
677     {
678       end_run (irs);
679       start_run (irs);
680     }
681   assert (record_run->run == irs->run);
682   irs->case_cnt++;
683
684   /* Write to disk. */
685   if (irs->casefile != NULL)
686     casefile_append (irs->casefile, &record_run->record);
687
688   /* This record becomes last_output. */
689   irs->last_output = case_tmp = record_run->record;
690   record_run->record = irs->records[irs->record_cap - 1].record;
691   irs->records[irs->record_cap - 1].record = case_tmp;
692 }
693 \f
694 /* Merging. */
695
696 /* State of merging initial runs. */
697 struct merge_state 
698   {
699     struct external_sort *xsrt; /* External sort state. */
700     struct ccase *cases;        /* Buffers. */
701     size_t case_cnt;            /* Number of buffers. */
702   };
703
704 struct run;
705 static struct casefile *merge_once (struct merge_state *,
706                                     struct casefile *[], size_t);
707 static int mod (int, int);
708
709 /* Performs a series of P-way merges of initial runs. */
710 static int
711 merge (struct external_sort *xsrt)
712 {
713   struct merge_state mrg;       /* State of merge. */
714   size_t approx_case_cost;      /* Approximate memory cost of one case. */
715   int max_order;                /* Maximum order of merge. */
716   size_t dummy_run_cnt;         /* Number of dummy runs to insert. */
717   int success = 0;
718   int i;
719
720   mrg.xsrt = xsrt;
721
722   /* Allocate as many cases as possible into cases. */
723   approx_case_cost = (sizeof *mrg.cases
724                       + xsrt->value_cnt * sizeof (union value)
725                       + 4 * sizeof (void *));
726   mrg.case_cnt = get_max_workspace() / approx_case_cost;
727   mrg.cases = malloc (sizeof *mrg.cases * mrg.case_cnt);
728   if (mrg.cases == NULL)
729     goto done;
730   for (i = 0; i < mrg.case_cnt; i++) 
731     if (!case_try_create (&mrg.cases[i], xsrt->value_cnt)) 
732       {
733         mrg.case_cnt = i;
734         break;
735       }
736   if (mrg.case_cnt < MIN_BUFFER_TOTAL_SIZE_RECS)
737     {
738       msg (SE, _("Out of memory.  Could not allocate room for minimum of %d "
739                  "cases of %d bytes each.  (PSPP workspace is currently "
740                  "restricted to a maximum of %d KB.)"),
741            MIN_BUFFER_TOTAL_SIZE_RECS, approx_case_cost, get_max_workspace() / 1024);
742       return 0;
743     }
744
745   /* Determine maximum order of merge. */
746   max_order = MAX_MERGE_ORDER;
747   if (mrg.case_cnt / max_order < MIN_BUFFER_SIZE_RECS)
748     max_order = mrg.case_cnt / MIN_BUFFER_SIZE_RECS;
749   else if (mrg.case_cnt / max_order * xsrt->value_cnt * sizeof (union value)
750            < MIN_BUFFER_SIZE_BYTES)
751     max_order = mrg.case_cnt / (MIN_BUFFER_SIZE_BYTES
752                                 / (xsrt->value_cnt * sizeof (union value)));
753   if (max_order < 2)
754     max_order = 2;
755   if (max_order > xsrt->run_cnt)
756     max_order = xsrt->run_cnt;
757
758   /* Repeatedly merge the P shortest existing runs until only one run
759      is left. */
760   make_heap (xsrt->initial_runs, xsrt->run_cnt, sizeof *xsrt->initial_runs,
761              compare_initial_runs, NULL);
762   dummy_run_cnt = mod (1 - (int) xsrt->run_cnt, max_order - 1);
763
764   assert (max_order > 0);
765   assert (max_order <= 2
766           || (xsrt->run_cnt + dummy_run_cnt) % (max_order - 1) == 1);
767   while (xsrt->run_cnt > 1)
768     {
769       struct casefile *output_run;
770       int order;
771       int i;
772
773       /* Choose order of merge (max_order after first merge). */
774       order = max_order - dummy_run_cnt;
775       dummy_run_cnt = 0;
776
777       /* Choose runs to merge. */
778       assert (xsrt->run_cnt >= order);
779       for (i = 0; i < order; i++) 
780         pop_heap (xsrt->initial_runs, xsrt->run_cnt--,
781                   sizeof *xsrt->initial_runs,
782                   compare_initial_runs, NULL); 
783           
784       /* Merge runs. */
785       output_run = merge_once (&mrg,
786                                xsrt->initial_runs + xsrt->run_cnt, order);
787       if (output_run == NULL)
788         goto done;
789       
790       /* Add output run to heap. */
791       xsrt->initial_runs[xsrt->run_cnt++] = output_run;
792       push_heap (xsrt->initial_runs, xsrt->run_cnt, sizeof *xsrt->initial_runs,
793                  compare_initial_runs, NULL);
794     }
795
796   /* Exactly one run is left, which contains the entire sorted
797      file.  We could use it to find a total case count. */
798   assert (xsrt->run_cnt == 1);
799
800   success = 1;
801
802  done:
803   for (i = 0; i < mrg.case_cnt; i++)
804     case_destroy (&mrg.cases[i]);
805   free (mrg.cases);
806
807   return success;
808 }
809
810 /* Modulo function as defined by Knuth. */
811 static int
812 mod (int x, int y)
813 {
814   if (y == 0)
815     return x;
816   else if (x == 0)
817     return 0;
818   else if (x > 0 && y > 0)
819     return x % y;
820   else if (x < 0 && y > 0)
821     return y - (-x) % y;
822
823   abort ();
824 }
825
826 /* Merges the RUN_CNT initial runs specified in INPUT_RUNS into a
827    new run.  Returns nonzero only if successful.  Adds an entry
828    to MRG->xsrt->runs for the output file if and only if the
829    output file is actually created.  Always deletes all the input
830    files. */
831 static struct casefile *
832 merge_once (struct merge_state *mrg,
833             struct casefile *input_runs[],
834             size_t run_cnt)
835 {
836   struct casereader *input_readers[MAX_MERGE_ORDER];
837   struct ccase input_cases[MAX_MERGE_ORDER];
838   struct casefile *output_casefile = NULL;
839   int i;
840
841   for (i = 0; i < run_cnt; i++) 
842     {
843       input_readers[i] = casefile_get_destructive_reader (input_runs[i]);
844       if (!casereader_read_xfer (input_readers[i], &input_cases[i]))
845         {
846           run_cnt--;
847           i--;
848         }
849     }
850   
851   output_casefile = casefile_create (mrg->xsrt->value_cnt);
852   casefile_to_disk (output_casefile);
853
854   /* Merge. */
855   while (run_cnt > 0) 
856     {
857       size_t min_idx;
858
859       /* Find minimum. */
860       min_idx = 0;
861       for (i = 1; i < run_cnt; i++)
862         if (compare_record (&input_cases[i], &input_cases[min_idx],
863                             mrg->xsrt->criteria) < 0)
864           min_idx = i;
865
866       /* Write minimum to output file. */
867       casefile_append_xfer (output_casefile, &input_cases[min_idx]);
868
869       if (!casereader_read_xfer (input_readers[min_idx],
870                                  &input_cases[min_idx]))
871         {
872           casereader_destroy (input_readers[min_idx]);
873           casefile_destroy (input_runs[min_idx]);
874
875           run_cnt--;
876           input_runs[min_idx] = input_runs[run_cnt];
877           input_readers[min_idx] = input_readers[run_cnt];
878           input_cases[min_idx] = input_cases[run_cnt];
879         } 
880     }
881
882   casefile_sleep (output_casefile);
883
884   return output_casefile;
885 }