Redo VFM interface. Get rid of compaction_necessary, compaction_nval,
[pspp-builds.git] / src / sort.c
1 /* PSPP - computes sample statistics.
2    Copyright (C) 1997-9, 2000 Free Software Foundation, Inc.
3    Written by Ben Pfaff <blp@gnu.org>.
4
5    This program is free software; you can redistribute it and/or
6    modify it under the terms of the GNU General Public License as
7    published by the Free Software Foundation; either version 2 of the
8    License, or (at your option) any later version.
9
10    This program is distributed in the hope that it will be useful, but
11    WITHOUT ANY WARRANTY; without even the implied warranty of
12    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13    General Public License for more details.
14
15    You should have received a copy of the GNU General Public License
16    along with this program; if not, write to the Free Software
17    Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA
18    02111-1307, USA. */
19
20 #include <config.h>
21 #include "sort.h"
22 #include <assert.h>
23 #include <stdio.h>
24 #include <stdlib.h>
25 #include <errno.h>
26 #include "algorithm.h"
27 #include "alloc.h"
28 #include "command.h"
29 #include "error.h"
30 #include "expr.h"
31 #include "lexer.h"
32 #include "misc.h"
33 #include "settings.h"
34 #include "str.h"
35 #include "var.h"
36 #include "vfm.h"
37 #include "vfmP.h"
38
39 #if HAVE_UNISTD_H
40 #include <unistd.h>
41 #endif
42
43 #if HAVE_SYS_TYPES_H
44 #include <sys/types.h>
45 #endif
46
47 #if HAVE_SYS_STAT_H
48 #include <sys/stat.h>
49 #endif
50
51 #include "debug-print.h"
52
53 /* Other prototypes. */
54 static int compare_record (const union value *, const union value *,
55                            const struct sort_cases_pgm *, int *idx_to_fv);
56 static int compare_case_lists (const void *, const void *, void *);
57 static struct internal_sort *do_internal_sort (struct sort_cases_pgm *,
58                                                int separate);
59 static void destroy_internal_sort (struct internal_sort *);
60 static struct external_sort *do_external_sort (struct sort_cases_pgm *,
61                                                int separate);
62 static void destroy_external_sort (struct external_sort *);
63 struct sort_cases_pgm *parse_sort (void);
64
65 /* Performs the SORT CASES procedures. */
66 int
67 cmd_sort_cases (void)
68 {
69   struct sort_cases_pgm *scp;
70   int success;
71
72   lex_match_id ("SORT");
73   lex_match_id ("CASES");
74   lex_match (T_BY);
75
76   scp = parse_sort ();
77   if (scp == NULL)
78     return CMD_FAILURE;
79
80   if (temporary != 0)
81     {
82       msg (SE, _("SORT CASES may not be used after TEMPORARY.  "
83                  "Temporary transformations will be made permanent."));
84       cancel_temporary (); 
85     }
86
87   success = sort_cases (scp, 0);
88   destroy_sort_cases_pgm (scp);
89   if (success)
90     return lex_end_of_command ();
91   else
92     return CMD_FAILURE;
93 }
94
95 /* Parses a list of sort keys and returns a struct sort_cases_pgm
96    based on it.  Returns a null pointer on error. */
97 struct sort_cases_pgm *
98 parse_sort (void)
99 {
100   struct sort_cases_pgm *scp;
101
102   scp = xmalloc (sizeof *scp);
103   scp->ref_cnt = 1;
104   scp->vars = NULL;
105   scp->dirs = NULL;
106   scp->var_cnt = 0;
107   scp->isrt = NULL;
108   scp->xsrt = NULL;
109
110   do
111     {
112       int prev_var_cnt = scp->var_cnt;
113       enum sort_direction direction = SRT_ASCEND;
114
115       /* Variables. */
116       if (!parse_variables (default_dict, &scp->vars, &scp->var_cnt,
117                             PV_NO_DUPLICATE | PV_APPEND | PV_NO_SCRATCH))
118         goto error;
119
120       /* Sort direction. */
121       if (lex_match ('('))
122         {
123           if (lex_match_id ("D") || lex_match_id ("DOWN"))
124             direction = SRT_DESCEND;
125           else if (!lex_match_id ("A") && !lex_match_id ("UP"))
126             {
127               msg (SE, _("`A' or `D' expected inside parentheses."));
128               goto error;
129             }
130           if (!lex_match (')'))
131             {
132               msg (SE, _("`)' expected."));
133               goto error;
134             }
135         }
136       scp->dirs = xrealloc (scp->dirs, sizeof *scp->dirs * scp->var_cnt);
137       for (; prev_var_cnt < scp->var_cnt; prev_var_cnt++)
138         scp->dirs[prev_var_cnt] = direction;
139     }
140   while (token != '.' && token != '/');
141   
142   return scp;
143
144  error:
145   destroy_sort_cases_pgm (scp);
146   return NULL;
147 }
148
149 /* Destroys a SORT CASES program. */
150 void
151 destroy_sort_cases_pgm (struct sort_cases_pgm *scp) 
152 {
153   if (scp != NULL) 
154     {
155       assert (scp->ref_cnt > 0);
156       if (--scp->ref_cnt > 0)
157         return;
158
159       free (scp->vars);
160       free (scp->dirs);
161       destroy_internal_sort (scp->isrt);
162       destroy_external_sort (scp->xsrt);
163       free (scp);
164     }
165 }
166
167 /* Sorts the active file based on the key variables specified in
168    global variables vars and var_cnt.  The output is either to the
169    active file, if SEPARATE is zero, or to a separate file, if
170    SEPARATE is nonzero.  In the latter case the output cases can be
171    read with a call to read_sort_output().  (In the former case the
172    output cases should be dealt with through the usual vfm interface.)
173
174    The caller is responsible for freeing vars[]. */
175 int
176 sort_cases (struct sort_cases_pgm *scp, int separate)
177 {
178   scp->case_size
179     = sizeof (union value) * dict_get_compacted_value_cnt (default_dict);
180
181   /* Not sure this is necessary but it's good to be safe. */
182   if (separate && case_source_is_class (vfm_source, &sort_source_class))
183     procedure (NULL, NULL);
184   
185   /* SORT CASES cancels PROCESS IF. */
186   expr_free (process_if_expr);
187   process_if_expr = NULL;
188
189   /* Try an internal sort first. */
190   scp->isrt = do_internal_sort (scp, separate);
191   if (scp->isrt != NULL) 
192     return 1; 
193
194   /* Fall back to an external sort. */
195   if (vfm_source != NULL
196       && case_source_is_class (vfm_source, &storage_source_class))
197     storage_source_to_disk (vfm_source);
198   scp->xsrt = do_external_sort (scp, separate);
199   if (scp->xsrt != NULL) 
200     return 1;
201
202   destroy_sort_cases_pgm (scp);
203   return 0;
204 }
205 \f
206 /* Results of an internal sort. */
207 struct internal_sort 
208   {
209     struct case_list **results;
210   };
211
212 /* If the data is in memory, do an internal sort.  Return
213    success. */
214 static struct internal_sort *
215 do_internal_sort (struct sort_cases_pgm *scp, int separate)
216 {
217   struct internal_sort *isrt;
218
219   isrt = xmalloc (sizeof *isrt);
220   isrt->results = NULL;
221
222   if (case_source_is_class (vfm_source, &storage_source_class)
223       && !storage_source_on_disk (vfm_source))
224     {
225       struct case_list *case_list;
226       struct case_list **case_array;
227       int case_cnt;
228       int i;
229
230       case_cnt = vfm_source->class->count (vfm_source);
231       if (case_cnt <= 0)
232         return isrt;
233
234       if (case_cnt > set_max_workspace / sizeof *case_array)
235         goto error;
236
237       case_list = storage_source_get_cases (vfm_source);
238       case_array = malloc (sizeof *case_array * (case_cnt + 1));
239       if (case_array == NULL)
240         goto error;
241
242       for (i = 0; case_list != NULL; i++) 
243         {
244           case_array[i] = case_list;
245           case_list = case_list->next;
246         }
247       assert (i == case_cnt);
248       case_array[case_cnt] = NULL;
249
250       sort (case_array, case_cnt, sizeof *case_array,
251             compare_case_lists, scp);
252
253       if (!separate) 
254         {
255           storage_source_set_cases (vfm_source, case_array[0]);
256           for (i = 1; i <= case_cnt; i++)
257             case_array[i - 1]->next = case_array[i]; 
258           free (case_array);
259         }
260       else 
261         isrt->results = case_array;
262                       
263       return isrt;
264     }
265
266  error:
267   free (isrt);
268   return NULL;
269 }
270
271 /* Destroys an internal sort result. */
272 static void
273 destroy_internal_sort (struct internal_sort *isrt) 
274 {
275   if (isrt != NULL) 
276     {
277       free (isrt->results);
278       free (isrt);
279     }
280 }
281
282 /* Compares the VAR_CNT variables in VARS[] between the
283    `case_list's at A and B, and returns a strcmp()-type
284    result. */
285 static int
286 compare_case_lists (const void *a_, const void *b_, void *scp_)
287 {
288   struct sort_cases_pgm *scp = scp_;
289   struct case_list *const *pa = a_;
290   struct case_list *const *pb = b_;
291   struct case_list *a = *pa;
292   struct case_list *b = *pb;
293
294   return compare_record (a->c.data, b->c.data, scp, NULL);
295 }
296 \f
297 /* External sort. */
298
299 /* Maximum order of merge.  If you increase this, then you should
300    use a heap for comparing cases during merge.  */
301 #define MAX_MERGE_ORDER         7
302
303 /* Minimum total number of records for buffers. */
304 #define MIN_BUFFER_TOTAL_SIZE_RECS      64
305
306 /* Minimum single input buffer size, in bytes and records. */
307 #define MIN_BUFFER_SIZE_BYTES   4096
308 #define MIN_BUFFER_SIZE_RECS    16
309
310 #if MIN_BUFFER_SIZE_RECS * 2 + 16 > MIN_BUFFER_TOTAL_SIZE_RECS
311 #error MIN_BUFFER_SIZE_RECS and MIN_BUFFER_TOTAL_SIZE_RECS do not make sense.
312 #endif
313
314 /* An initial run and its length. */
315 struct initial_run 
316   {
317     int file_idx;                     /* File index. */
318     size_t case_cnt;                  /* Number of cases. */
319   };
320
321 /* Sorts initial runs A and B in decending order by length. */
322 static int
323 compare_initial_runs (const void *a_, const void *b_, void *aux UNUSED) 
324 {
325   const struct initial_run *a = a_;
326   const struct initial_run *b = b_;
327   
328   return a->case_cnt > b->case_cnt ? -1 : a->case_cnt <b->case_cnt;
329 }
330
331 /* Results of an external sort. */
332 struct external_sort 
333   {
334     struct sort_cases_pgm *scp;       /* SORT CASES info. */
335     struct initial_run *initial_runs; /* Array of initial runs. */
336     size_t run_cnt, run_cap;          /* Number of runs, allocated capacity. */
337     char *temp_dir;                   /* Temporary file directory name. */
338     char *temp_name;                  /* Name of a temporary file. */
339     int next_file_idx;                /* Lowest unused file index. */
340   };
341
342 /* Prototypes for helper functions. */
343 static void sort_sink_write (struct case_sink *, const struct ccase *);
344 static int write_initial_runs (struct external_sort *, int separate);
345 static int init_external_sort (struct external_sort *);
346 static int merge (struct external_sort *);
347 static void rmdir_temp_dir (struct external_sort *);
348 static void remove_temp_file (struct external_sort *xsrt, int file_idx);
349
350 /* Performs an external sort of the active file according to the
351    specification in SCP.  Forms initial runs using a heap as a
352    reservoir.  Determines the optimum merge pattern via Huffman's
353    method (see Knuth vol. 3, 2nd edition, p. 365-366), and merges
354    according to that pattern. */
355 static struct external_sort *
356 do_external_sort (struct sort_cases_pgm *scp, int separate)
357 {
358   struct external_sort *xsrt;
359   int success = 0;
360
361   xsrt = xmalloc (sizeof *xsrt);
362   xsrt->scp = scp;
363   if (!init_external_sort (xsrt))
364     goto done;
365   if (!write_initial_runs (xsrt, separate))
366     goto done;
367   if (!merge (xsrt))
368     goto done;
369
370   success = 1;
371
372  done:
373   if (success)
374     {
375       /* Don't destroy anything because we'll need it for reading
376          the output. */
377       return xsrt;
378     }
379   else
380     {
381       destroy_external_sort (xsrt);
382       return NULL;
383     }
384 }
385
386 /* Destroys XSRT. */
387 static void
388 destroy_external_sort (struct external_sort *xsrt) 
389 {
390   if (xsrt != NULL) 
391     {
392       int i;
393       
394       for (i = 0; i < xsrt->run_cnt; i++)
395         remove_temp_file (xsrt, xsrt->initial_runs[i].file_idx);
396       rmdir_temp_dir (xsrt);
397       free (xsrt->temp_dir);
398       free (xsrt->temp_name);
399       free (xsrt->initial_runs);
400       free (xsrt);
401     }
402 }
403
404 #ifdef HAVE_MKDTEMP
405 /* Creates and returns the name of a temporary directory. */
406 static char *
407 make_temp_dir (void) 
408 {
409   const char *parent_dir;
410   char *temp_dir;
411
412   if (getenv ("TMPDIR") != NULL)
413     parent_dir = getenv ("TMPDIR");
414   else
415     parent_dir = P_tmpdir;
416
417   temp_dir = xmalloc (strlen (parent_dir) + 32);
418   sprintf (temp_dir, "%s%cpsppXXXXXX", parent_dir, DIR_SEPARATOR);
419   if (mkdtemp (temp_dir) == NULL) 
420     {
421       msg (SE, _("%s: Creating temporary directory: %s."),
422            temp_dir, strerror (errno));
423       free (temp_dir);
424       return NULL;
425     }
426   else
427     return temp_dir;
428 }
429 #else /* !HAVE_MKDTEMP */
430 /* Creates directory DIR. */
431 static int
432 do_mkdir (const char *dir) 
433 {
434 #ifndef __MSDOS__
435   return mkdir (dir, S_IRWXU);
436 #else
437   return mkdir (dir);
438 #endif
439 }
440
441 /* Creates and returns the name of a temporary directory. */
442 static char *
443 make_temp_dir (void) 
444 {
445   int i;
446   
447   for (i = 0; i < 100; i++)
448     {
449       char temp_dir[L_tmpnam + 1];
450       if (tmpnam (temp_dir) == NULL) 
451         {
452           msg (SE, _("Generating temporary directory name failed: %s."),
453                strerror (errno));
454           return NULL; 
455         }
456       else if (do_mkdir (temp_dir) == 0)
457         return xstrdup (temp_dir);
458     }
459   
460   msg (SE, _("Creating temporary directory failed: %s."), strerror (errno));
461   return NULL;
462 }
463 #endif /* !HAVE_MKDTEMP */
464
465 /* Sets up to open temporary files. */
466 static int
467 init_external_sort (struct external_sort *xsrt)
468 {
469   /* Zero. */
470   xsrt->temp_dir = NULL;
471   xsrt->next_file_idx = 0;
472
473   /* Huffman queue. */
474   xsrt->run_cap = 512;
475   xsrt->run_cnt = 0;
476   xsrt->initial_runs = xmalloc (sizeof *xsrt->initial_runs * xsrt->run_cap);
477
478   /* Temporary directory. */
479   xsrt->temp_dir = make_temp_dir ();
480   xsrt->temp_name = NULL;
481   if (xsrt->temp_dir == NULL)
482     return 0;
483   xsrt->temp_name = xmalloc (strlen (xsrt->temp_dir) + 64);
484
485   return 1;
486 }
487
488 /* Returns nonzero if we should return an error even though the
489    operation succeeded.  Useful for testing. */
490 static int
491 simulate_error (void) 
492 {
493   static int op_err_cnt = -1;
494   static int op_cnt;
495   
496   if (op_err_cnt == -1 || op_cnt++ < op_err_cnt)
497     return 0;
498   else
499     {
500       errno = 0;
501       return 1;
502     }
503 }
504
505 /* Removes the directory created for temporary files, if one
506    exists. */
507 static void
508 rmdir_temp_dir (struct external_sort *xsrt)
509 {
510   if (xsrt->temp_dir != NULL && rmdir (xsrt->temp_dir) == -1) 
511     {
512       msg (SE, _("%s: Error removing directory for temporary files: %s."),
513            xsrt->temp_dir, strerror (errno));
514       xsrt->temp_dir = NULL; 
515     }
516 }
517
518 /* Returns the name of temporary file number FILE_IDX for XSRT.
519    The name is written into a static buffer, so be careful.  */
520 static char *
521 get_temp_file_name (struct external_sort *xsrt, int file_idx)
522 {
523   assert (xsrt->temp_dir != NULL);
524   sprintf (xsrt->temp_name, "%s%c%04d",
525            xsrt->temp_dir, DIR_SEPARATOR, file_idx);
526   return xsrt->temp_name;
527 }
528
529 /* Opens temporary file numbered FILE_IDX for XSRT with mode MODE
530    and returns the FILE *. */
531 static FILE *
532 open_temp_file (struct external_sort *xsrt, int file_idx, const char *mode)
533 {
534   char *temp_file;
535   FILE *file;
536
537   temp_file = get_temp_file_name (xsrt, file_idx);
538
539   file = fopen (temp_file, mode);
540   if (simulate_error () || file == NULL) 
541     msg (SE, _("%s: Error opening temporary file for %s: %s."),
542          temp_file, mode[0] == 'r' ? "reading" : "writing",
543          strerror (errno));
544
545   return file;
546 }
547
548 /* Closes FILE, which is the temporary file numbered FILE_IDX
549    under XSRT.  Returns nonzero only if successful.  */
550 static int
551 close_temp_file (struct external_sort *xsrt, int file_idx, FILE *file)
552 {
553   if (file != NULL) 
554     {
555       char *temp_file = get_temp_file_name (xsrt, file_idx);
556       if (simulate_error () || fclose (file) == EOF) 
557         {
558           msg (SE, _("%s: Error closing temporary file: %s."),
559                temp_file, strerror (errno));
560           return 0;
561         }
562     }
563   return 1;
564 }
565
566 /* Delete temporary file numbered FILE_IDX for XSRT. */
567 static void
568 remove_temp_file (struct external_sort *xsrt, int file_idx) 
569 {
570   if (file_idx != -1)
571     {
572       char *temp_file = get_temp_file_name (xsrt, file_idx);
573       if (simulate_error () || remove (temp_file) != 0)
574         msg (SE, _("%s: Error removing temporary file: %s."),
575              temp_file, strerror (errno));
576     }
577 }
578
579 /* Writes SIZE bytes from buffer DATA into FILE, which is
580    temporary file numbered FILE_IDX from XSRT. */
581 static int
582 write_temp_file (struct external_sort *xsrt, int file_idx,
583                  FILE *file, const void *data, size_t size) 
584 {
585   if (!simulate_error () && fwrite (data, size, 1, file) == 1)
586     return 1;
587   else
588     {
589       char *temp_file = get_temp_file_name (xsrt, file_idx);
590       msg (SE, _("%s: Error writing temporary file: %s."),
591            temp_file, strerror (errno));
592       return 0;
593     }
594 }
595
596 /* Reads SIZE bytes into buffer DATA into FILE, which is
597    temporary file numbered FILE_IDX from XSRT. */
598 static int
599 read_temp_file (struct external_sort *xsrt, int file_idx,
600                 FILE *file, void *data, size_t size) 
601 {
602   if (!simulate_error () && fread (data, size, 1, file) == 1)
603     return 1;
604   else 
605     {
606       char *temp_file = get_temp_file_name (xsrt, file_idx);
607       if (ferror (file))
608         msg (SE, _("%s: Error reading temporary file: %s."),
609              temp_file, strerror (errno));
610       else
611         msg (SE, _("%s: Unexpected end of temporary file."),
612              temp_file);
613       return 0;
614     }
615 }
616 \f
617 /* Replacement selection. */
618
619 /* Pairs a record with a run number. */
620 struct record_run
621   {
622     int run;                    /* Run number of case. */
623     struct case_list *record;   /* Case data. */
624   };
625
626 /* Represents a set of initial runs during an external sort. */
627 struct initial_run_state 
628   {
629     struct external_sort *xsrt;
630
631     int *idx_to_fv;             /* Translation table copied from sink. */
632
633     /* Reservoir. */
634     struct record_run *records; /* Records arranged as a heap. */
635     size_t record_cnt;          /* Current number of records. */
636     size_t record_cap;          /* Capacity for records. */
637     struct case_list *free_list;/* Cases not in heap. */
638     
639     /* Run currently being output. */
640     int file_idx;               /* Temporary file number. */
641     size_t case_cnt;            /* Number of cases so far. */
642     FILE *output_file;          /* Output file. */
643     struct case_list *last_output;/* Record last output. */
644
645     int okay;                   /* Zero if an error has been encountered. */
646   };
647
648 static const struct case_sink_class sort_sink_class;
649
650 static void destroy_initial_run_state (struct initial_run_state *irs);
651 static int allocate_cases (struct initial_run_state *);
652 static struct case_list *grab_case (struct initial_run_state *);
653 static void release_case (struct initial_run_state *, struct case_list *);
654 static void output_record (struct initial_run_state *irs);
655 static void start_run (struct initial_run_state *irs);
656 static void end_run (struct initial_run_state *irs);
657 static int compare_record_run (const struct record_run *,
658                                const struct record_run *,
659                                struct initial_run_state *);
660 static int compare_record_run_minheap (const void *, const void *, void *);
661
662 /* Writes initial runs for XSRT, sending them to a separate file
663    if SEPARATE is nonzero. */
664 static int
665 write_initial_runs (struct external_sort *xsrt, int separate)
666 {
667   struct initial_run_state *irs;
668   int success = 0;
669
670   /* Allocate memory for cases. */
671   irs = xmalloc (sizeof *irs);
672   irs->xsrt = xsrt;
673   irs->records = NULL;
674   irs->record_cnt = irs->record_cap = 0;
675   irs->free_list = NULL;
676   irs->output_file = NULL;
677   irs->last_output = NULL;
678   irs->file_idx = 0;
679   irs->case_cnt = 0;
680   irs->okay = 1;
681   if (!allocate_cases (irs)) 
682     goto done;
683
684   /* Create case sink. */
685   if (!separate)
686     {
687       if (vfm_sink != NULL && vfm_sink->class->destroy != NULL)
688         vfm_sink->class->destroy (vfm_sink);
689       vfm_sink = create_case_sink (&sort_sink_class, default_dict, irs);
690       xsrt->scp->ref_cnt++;
691     }
692
693   /* Create initial runs. */
694   start_run (irs);
695   procedure (NULL, NULL);
696   irs->idx_to_fv = NULL;
697   while (irs->record_cnt > 0 && irs->okay)
698     output_record (irs);
699   end_run (irs);
700
701   success = irs->okay;
702
703  done:
704   destroy_initial_run_state (irs);
705
706   return success;
707 }
708
709 /* Add a single case to an initial run. */
710 static void
711 sort_sink_write (struct case_sink *sink, const struct ccase *c)
712 {
713   struct initial_run_state *irs = sink->aux;
714   struct record_run *new_record_run;
715
716   if (!irs->okay)
717     return;
718
719   irs->idx_to_fv = sink->idx_to_fv;
720
721   /* Compose record_run for this run and add to heap. */
722   assert (irs->record_cnt < irs->record_cap);
723   new_record_run = irs->records + irs->record_cnt++;
724   new_record_run->record = grab_case (irs);
725   memcpy (new_record_run->record->c.data, c->data, irs->xsrt->scp->case_size);
726   new_record_run->run = irs->file_idx;
727   if (irs->last_output != NULL
728       && compare_record (c->data, irs->last_output->c.data,
729                          irs->xsrt->scp, sink->idx_to_fv) < 0)
730     new_record_run->run = irs->xsrt->next_file_idx;
731   push_heap (irs->records, irs->record_cnt, sizeof *irs->records,
732              compare_record_run_minheap, irs);
733
734   /* Output a record if the reservoir is full. */
735   if (irs->record_cnt == irs->record_cap && irs->okay)
736     output_record (irs);
737 }
738
739 /* Destroys the initial run state represented by IRS. */
740 static void
741 destroy_initial_run_state (struct initial_run_state *irs) 
742 {
743   struct case_list *iter, *next;
744   int i;
745
746   if (irs == NULL)
747     return;
748
749   /* Release cases to free list. */
750   for (i = 0; i < irs->record_cnt; i++)
751     release_case (irs, irs->records[i].record);
752   if (irs->last_output != NULL)
753     release_case (irs, irs->last_output);
754
755   /* Free cases in free list. */
756   for (iter = irs->free_list; iter != NULL; iter = next) 
757     {
758       next = iter->next;
759       free (iter);
760     }
761
762   free (irs->records);
763   if (irs->output_file != NULL)
764     close_temp_file (irs->xsrt, irs->file_idx, irs->output_file);
765
766   free (irs);
767 }
768
769 /* Allocates room for lots of cases as a buffer. */
770 static int
771 allocate_cases (struct initial_run_state *irs)
772 {
773   int approx_case_cost; /* Approximate memory cost of one case in bytes. */
774   int max_cases;        /* Maximum number of cases to allocate. */
775   int i;
776
777   /* Allocate as many cases as we can within the workspace
778      limit. */
779   approx_case_cost = (sizeof *irs->records
780                       + sizeof *irs->free_list
781                       + irs->xsrt->scp->case_size
782                       + 4 * sizeof (void *));
783   max_cases = set_max_workspace / approx_case_cost;
784   irs->records = malloc (sizeof *irs->records * max_cases);
785   for (i = 0; i < max_cases; i++)
786     {
787       struct case_list *c;
788       c = malloc (sizeof *c
789                   + irs->xsrt->scp->case_size
790                   - sizeof (union value));
791       if (c == NULL) 
792         {
793           max_cases = i;
794           break;
795         }
796       release_case (irs, c);
797     }
798
799   /* irs->records gets all but one of the allocated cases.
800      The extra is used for last_output. */
801   irs->record_cap = max_cases - 1;
802
803   /* Fail if we didn't allocate an acceptable number of cases. */
804   if (irs->records == NULL || max_cases < MIN_BUFFER_TOTAL_SIZE_RECS)
805     {
806       msg (SE, _("Out of memory.  Could not allocate room for minimum of %d "
807                  "cases of %d bytes each.  (PSPP workspace is currently "
808                  "restricted to a maximum of %d KB.)"),
809            MIN_BUFFER_TOTAL_SIZE_RECS, approx_case_cost, set_max_workspace / 1024);
810       return 0;
811     }
812   return 1;
813 }
814
815 /* Compares the VAR_CNT variables in VARS[] between the `value's at
816    A and B, and returns a strcmp()-type result. */
817 static int
818 compare_record (const union value *a, const union value *b,
819                 const struct sort_cases_pgm *scp,
820                 int *idx_to_fv)
821 {
822   int i;
823
824   assert (a != NULL);
825   assert (b != NULL);
826   
827   for (i = 0; i < scp->var_cnt; i++)
828     {
829       struct variable *v = scp->vars[i];
830       int fv;
831       int result;
832
833       if (idx_to_fv != NULL)
834         fv = idx_to_fv[v->index];
835       else
836         fv = v->fv;
837       
838       if (v->type == NUMERIC)
839         {
840           double af = a[fv].f;
841           double bf = b[fv].f;
842           
843           result = af < bf ? -1 : af > bf;
844         }
845       else
846         result = memcmp (a[fv].s, b[fv].s, v->width);
847
848       if (result != 0) 
849         {
850           if (scp->dirs[i] == SRT_DESCEND)
851             result = -result;
852           return result;
853         }
854     }
855
856   return 0;
857 }
858
859 /* Compares record-run tuples A and B on run number first, then
860    on the current record according to SCP. */
861 static int
862 compare_record_run (const struct record_run *a,
863                     const struct record_run *b,
864                     struct initial_run_state *irs)
865 {
866   if (a->run != b->run)
867     return a->run > b->run ? 1 : -1;
868   else
869     return compare_record (a->record->c.data, b->record->c.data,
870                            irs->xsrt->scp, irs->idx_to_fv);
871 }
872
873 /* Compares record-run tuples A and B on run number first, then
874    on the current record according to SCP, but in descending
875    order. */
876 static int
877 compare_record_run_minheap (const void *a, const void *b, void *irs) 
878 {
879   return -compare_record_run (a, b, irs);
880 }
881
882 /* Begins a new initial run, specifically its output file. */
883 static void
884 start_run (struct initial_run_state *irs)
885 {
886   irs->file_idx = irs->xsrt->next_file_idx++;
887   irs->case_cnt = 0;
888   irs->output_file = open_temp_file (irs->xsrt, irs->file_idx, "wb");
889   if (irs->output_file == NULL) 
890     irs->okay = 0;
891   if (irs->last_output != NULL) 
892     {
893       release_case (irs, irs->last_output);
894       irs->last_output = NULL; 
895     }
896 }
897
898 /* Ends the current initial run.  */
899 static void
900 end_run (struct initial_run_state *irs)
901 {
902   struct external_sort *xsrt = irs->xsrt;
903   
904   /* Record initial run. */
905   if (xsrt->run_cnt >= xsrt->run_cap) 
906     {
907       xsrt->run_cap *= 2;
908       xsrt->initial_runs
909         = xrealloc (xsrt->initial_runs,
910                     sizeof *xsrt->initial_runs * xsrt->run_cap);
911     }
912   xsrt->initial_runs[xsrt->run_cnt].file_idx = irs->file_idx;
913   xsrt->initial_runs[xsrt->run_cnt].case_cnt = irs->case_cnt;
914   xsrt->run_cnt++;
915
916   /* Close file handle. */
917   if (irs->output_file != NULL
918       && !close_temp_file (irs->xsrt, irs->file_idx, irs->output_file)) 
919     irs->okay = 0;
920   irs->output_file = NULL;
921 }
922
923 /* Writes a record to the current initial run. */
924 static void
925 output_record (struct initial_run_state *irs)
926 {
927   struct record_run *record_run;
928   
929   /* Extract minimum case from heap. */
930   assert (irs->record_cnt > 0);
931   pop_heap (irs->records, irs->record_cnt--, sizeof *irs->records,
932             compare_record_run_minheap, irs);
933   record_run = irs->records + irs->record_cnt;
934
935   /* Bail if an error has occurred. */
936   if (!irs->okay)
937     return;
938
939   /* Start new run if necessary. */
940   assert (record_run->run == irs->file_idx
941           || record_run->run == irs->xsrt->next_file_idx);
942   if (record_run->run != irs->file_idx)
943     {
944       end_run (irs);
945       start_run (irs);
946     }
947   assert (record_run->run == irs->file_idx);
948   irs->case_cnt++;
949
950   /* Write to disk. */
951   if (irs->output_file != NULL
952       && !write_temp_file (irs->xsrt, irs->file_idx, irs->output_file,
953                            &record_run->record->c, irs->xsrt->scp->case_size))
954     irs->okay = 0;
955
956   /* This record becomes last_output. */
957   if (irs->last_output != NULL)
958     release_case (irs, irs->last_output);
959   irs->last_output = record_run->record;
960 }
961
962 /* Gets a case from the free list in IRS.  It is an error to call
963    this function if the free list is empty. */
964 static struct case_list *
965 grab_case (struct initial_run_state *irs)
966 {
967   struct case_list *c;
968   
969   assert (irs != NULL);
970   assert (irs->free_list != NULL);
971
972   c = irs->free_list;
973   irs->free_list = c->next;
974   return c;
975 }
976
977 /* Returns C to the free list in IRS. */
978 static void 
979 release_case (struct initial_run_state *irs, struct case_list *c) 
980 {
981   assert (irs != NULL);
982   assert (c != NULL);
983
984   c->next = irs->free_list;
985   irs->free_list = c;
986 }
987 \f
988 /* Merging. */
989
990 /* State of merging initial runs. */
991 struct merge_state 
992   {
993     struct external_sort *xsrt; /* External sort state. */
994     struct ccase **cases;       /* Buffers. */
995     size_t case_cnt;            /* Number of buffers. */
996   };
997
998 struct run;
999 static int merge_once (struct merge_state *,
1000                        const struct initial_run[], size_t,
1001                        struct initial_run *);
1002 static int fill_run_buffer (struct merge_state *, struct run *);
1003 static int mod (int, int);
1004
1005 /* Performs a series of P-way merges of initial runs
1006    method. */
1007 static int
1008 merge (struct external_sort *xsrt)
1009 {
1010   struct merge_state mrg;       /* State of merge. */
1011   size_t approx_case_cost;      /* Approximate memory cost of one case. */
1012   int max_order;                /* Maximum order of merge. */
1013   size_t dummy_run_cnt;         /* Number of dummy runs to insert. */
1014   int success = 0;
1015   int i;
1016
1017   mrg.xsrt = xsrt;
1018
1019   /* Allocate as many cases as possible into cases. */
1020   approx_case_cost = (sizeof *mrg.cases
1021                       + xsrt->scp->case_size + 4 * sizeof (void *));
1022   mrg.case_cnt = set_max_workspace / approx_case_cost;
1023   mrg.cases = malloc (sizeof *mrg.cases * mrg.case_cnt);
1024   if (mrg.cases == NULL)
1025     goto done;
1026   for (i = 0; i < mrg.case_cnt; i++) 
1027     {
1028       mrg.cases[i] = malloc (xsrt->scp->case_size);
1029       if (mrg.cases[i] == NULL) 
1030         {
1031           mrg.case_cnt = i;
1032           break;
1033         }
1034     }
1035   if (mrg.case_cnt < MIN_BUFFER_TOTAL_SIZE_RECS)
1036     {
1037       msg (SE, _("Out of memory.  Could not allocate room for minimum of %d "
1038                  "cases of %d bytes each.  (PSPP workspace is currently "
1039                  "restricted to a maximum of %d KB.)"),
1040            MIN_BUFFER_TOTAL_SIZE_RECS, approx_case_cost, set_max_workspace / 1024);
1041       return 0;
1042     }
1043
1044   /* Determine maximum order of merge. */
1045   max_order = MAX_MERGE_ORDER;
1046   if (mrg.case_cnt / max_order < MIN_BUFFER_SIZE_RECS)
1047     max_order = mrg.case_cnt / MIN_BUFFER_SIZE_RECS;
1048   else if (mrg.case_cnt / max_order * xsrt->scp->case_size
1049            < MIN_BUFFER_SIZE_BYTES)
1050     max_order = mrg.case_cnt / (MIN_BUFFER_SIZE_BYTES / xsrt->scp->case_size);
1051   if (max_order < 2)
1052     max_order = 2;
1053   if (max_order > xsrt->run_cnt)
1054     max_order = xsrt->run_cnt;
1055
1056   /* Repeatedly merge the P shortest existing runs until only one run
1057      is left. */
1058   make_heap (xsrt->initial_runs, xsrt->run_cnt, sizeof *xsrt->initial_runs,
1059              compare_initial_runs, NULL);
1060   dummy_run_cnt = mod (1 - (int) xsrt->run_cnt, max_order - 1);
1061   assert (max_order == 1
1062           || (xsrt->run_cnt + dummy_run_cnt) % (max_order - 1) == 1);
1063   while (xsrt->run_cnt > 1)
1064     {
1065       struct initial_run output_run;
1066       int order;
1067       int i;
1068
1069       /* Choose order of merge (max_order after first merge). */
1070       order = max_order - dummy_run_cnt;
1071       dummy_run_cnt = 0;
1072
1073       /* Choose runs to merge. */
1074       assert (xsrt->run_cnt >= order);
1075       for (i = 0; i < order; i++) 
1076         pop_heap (xsrt->initial_runs, xsrt->run_cnt--,
1077                   sizeof *xsrt->initial_runs,
1078                   compare_initial_runs, NULL); 
1079           
1080       /* Merge runs. */
1081       if (!merge_once (&mrg, xsrt->initial_runs + xsrt->run_cnt, order,
1082                        &output_run))
1083         goto done;
1084
1085       /* Add output run to heap. */
1086       xsrt->initial_runs[xsrt->run_cnt++] = output_run;
1087       push_heap (xsrt->initial_runs, xsrt->run_cnt, sizeof *xsrt->initial_runs,
1088                  compare_initial_runs, NULL);
1089     }
1090
1091   /* Exactly one run is left, which contains the entire sorted
1092      file.  We could use it to find a total case count. */
1093   assert (xsrt->run_cnt == 1);
1094
1095   success = 1;
1096
1097  done:
1098   for (i = 0; i < mrg.case_cnt; i++)
1099     free (mrg.cases[i]);
1100   free (mrg.cases);
1101
1102   return success;
1103 }
1104
1105 /* Modulo function as defined by Knuth. */
1106 static int
1107 mod (int x, int y)
1108 {
1109   if (y == 0)
1110     return x;
1111   else if (x == 0)
1112     return 0;
1113   else if (x > 0 && y > 0)
1114     return x % y;
1115   else if (x < 0 && y > 0)
1116     return y - (-x) % y;
1117
1118   assert (0);
1119 }
1120
1121 /* A run of data for use in merging. */
1122 struct run 
1123   {
1124     FILE *file;                 /* File that contains run. */
1125     int file_idx;               /* Index of file that contains run. */
1126     struct ccase **buffer;      /* Case buffer. */
1127     struct ccase **buffer_head; /* First unconsumed case in buffer. */
1128     struct ccase **buffer_tail; /* One past last unconsumed case in buffer. */
1129     size_t buffer_cap;          /* Number of cases buffer can hold. */
1130     size_t unread_case_cnt;     /* Number of cases not yet read. */
1131   };
1132
1133 /* Merges the RUN_CNT initial runs specified in INPUT_RUNS into a
1134    new run.  Returns nonzero only if successful.  Adds an entry
1135    to MRG->xsrt->runs for the output file if and only if the
1136    output file is actually created.  Always deletes all the input
1137    files. */
1138 static int
1139 merge_once (struct merge_state *mrg,
1140             const struct initial_run input_runs[],
1141             size_t run_cnt,
1142             struct initial_run *output_run)
1143 {
1144   struct run runs[MAX_MERGE_ORDER];
1145   FILE *output_file = NULL;
1146   int success = 0;
1147   int i;
1148
1149   /* Initialize runs[]. */
1150   for (i = 0; i < run_cnt; i++) 
1151     {
1152       runs[i].file = NULL;
1153       runs[i].file_idx = input_runs[i].file_idx;
1154       runs[i].buffer = mrg->cases + mrg->case_cnt / run_cnt * i;
1155       runs[i].buffer_head = runs[i].buffer;
1156       runs[i].buffer_tail = runs[i].buffer;
1157       runs[i].buffer_cap = mrg->case_cnt / run_cnt;
1158       runs[i].unread_case_cnt = input_runs[i].case_cnt;
1159     }
1160
1161   /* Open input files. */
1162   for (i = 0; i < run_cnt; i++) 
1163     {
1164       runs[i].file = open_temp_file (mrg->xsrt, runs[i].file_idx, "rb");
1165       if (runs[i].file == NULL)
1166         goto error;
1167     }
1168   
1169   /* Create output file and count cases to be output. */
1170   output_run->file_idx = mrg->xsrt->next_file_idx++;
1171   output_run->case_cnt = 0;
1172   for (i = 0; i < run_cnt; i++)
1173     output_run->case_cnt += input_runs[i].case_cnt;
1174   output_file = open_temp_file (mrg->xsrt, output_run->file_idx, "wb");
1175   if (output_file == NULL) 
1176     goto error;
1177
1178   /* Prime buffers. */
1179   for (i = 0; i < run_cnt; i++)
1180     if (!fill_run_buffer (mrg, runs + i))
1181       goto error;
1182
1183   /* Merge. */
1184   while (run_cnt > 0) 
1185     {
1186       struct run *min_run;
1187
1188       /* Find minimum. */
1189       min_run = runs;
1190       for (i = 1; i < run_cnt; i++)
1191         if (compare_record ((*runs[i].buffer_head)->data,
1192                             (*min_run->buffer_head)->data,
1193                             mrg->xsrt->scp, NULL) < 0)
1194           min_run = runs + i;
1195
1196       /* Write minimum to output file. */
1197       if (!write_temp_file (mrg->xsrt, min_run->file_idx, output_file,
1198                             (*min_run->buffer_head)->data,
1199                             mrg->xsrt->scp->case_size))
1200         goto error;
1201
1202       /* Remove case from buffer. */
1203       if (++min_run->buffer_head >= min_run->buffer_tail)
1204         {
1205           /* Buffer is empty.  Fill from file. */
1206           if (!fill_run_buffer (mrg, min_run))
1207             goto error;
1208
1209           /* If buffer is still empty, delete its run. */
1210           if (min_run->buffer_head >= min_run->buffer_tail)
1211             {
1212               close_temp_file (mrg->xsrt, min_run->file_idx, min_run->file);
1213               remove_temp_file (mrg->xsrt, min_run->file_idx);
1214               *min_run = runs[--run_cnt];
1215
1216               /* We could donate the now-unused buffer space to
1217                  other runs. */
1218             }
1219         } 
1220     }
1221
1222   /* Close output file.  */
1223   close_temp_file (mrg->xsrt, output_run->file_idx, output_file);
1224
1225   return 1;
1226
1227  error:
1228   /* Close and remove output file.  */
1229   if (output_file != NULL) 
1230     {
1231       close_temp_file (mrg->xsrt, output_run->file_idx, output_file);
1232       remove_temp_file (mrg->xsrt, output_run->file_idx);
1233     }
1234   
1235   /* Close and remove any remaining input runs. */
1236   for (i = 0; i < run_cnt; i++) 
1237     {
1238       close_temp_file (mrg->xsrt, runs[i].file_idx, runs[i].file);
1239       remove_temp_file (mrg->xsrt, runs[i].file_idx);
1240     }
1241
1242   return success;
1243 }
1244
1245 /* Reads as many cases as possible into RUN's buffer.
1246    Reads nonzero unless a disk error occurs. */
1247 static int
1248 fill_run_buffer (struct merge_state *mrg, struct run *run) 
1249 {
1250   run->buffer_head = run->buffer_tail = run->buffer;
1251   while (run->unread_case_cnt > 0
1252          && run->buffer_tail < run->buffer + run->buffer_cap)
1253     {
1254       if (!read_temp_file (mrg->xsrt, run->file_idx, run->file,
1255                            (*run->buffer_tail)->data,
1256                            mrg->xsrt->scp->case_size))
1257         return 0;
1258
1259       run->unread_case_cnt--;
1260       run->buffer_tail++;
1261     }
1262
1263   return 1;
1264 }
1265 \f
1266 static struct case_source *
1267 sort_sink_make_source (struct case_sink *sink) 
1268 {
1269   struct initial_run_state *irs = sink->aux;
1270
1271   return create_case_source (&sort_source_class, default_dict,
1272                              irs->xsrt->scp);
1273 }
1274
1275 static const struct case_sink_class sort_sink_class = 
1276   {
1277     "SORT CASES",
1278     NULL,
1279     sort_sink_write,
1280     NULL,
1281     sort_sink_make_source,
1282   };
1283 \f
1284 struct sort_source_aux 
1285   {
1286     struct sort_cases_pgm *scp;
1287     struct ccase *dst;
1288     write_case_func *write_case;
1289     write_case_data wc_data;
1290   };
1291
1292 /* Passes C to the write_case function. */
1293 static int
1294 sort_source_read_helper (const struct ccase *src, void *aux_) 
1295 {
1296   struct sort_source_aux *aux = aux_;
1297
1298   memcpy (aux->dst, src, aux->scp->case_size);
1299   return aux->write_case (aux->wc_data);
1300 }
1301
1302 /* Reads all the records from the source stream and passes them
1303    to write_case(). */
1304 static void
1305 sort_source_read (struct case_source *source,
1306                   struct ccase *c,
1307                   write_case_func *write_case, write_case_data wc_data)
1308 {
1309   struct sort_cases_pgm *scp = source->aux;
1310   struct sort_source_aux aux;
1311
1312   aux.scp = scp;
1313   aux.dst = c;
1314   aux.write_case = write_case;
1315   aux.wc_data = wc_data;
1316   
1317   read_sort_output (scp, sort_source_read_helper, &aux);
1318 }
1319
1320 static void read_internal_sort_output (struct internal_sort *isrt,
1321                                        read_sort_output_func *, void *aux);
1322 static void read_external_sort_output (struct external_sort *xsrt,
1323                                        read_sort_output_func *, void *aux);
1324
1325 /* Reads all the records from the output stream and passes them to the
1326    function provided, which must have an interface identical to
1327    write_case(). */
1328 void
1329 read_sort_output (struct sort_cases_pgm *scp,
1330                   read_sort_output_func *output_func, void *aux)
1331 {
1332   assert ((scp->isrt != NULL) + (scp->xsrt != NULL) <= 1);
1333   if (scp->isrt != NULL)
1334     read_internal_sort_output (scp->isrt, output_func, aux);
1335   else if (scp->xsrt != NULL)
1336     read_external_sort_output (scp->xsrt, output_func, aux);
1337   else 
1338     {
1339       /* No results.  Probably an external sort that failed. */
1340     }
1341 }
1342
1343 static void
1344 read_internal_sort_output (struct internal_sort *isrt,
1345                            read_sort_output_func *output_func,
1346                            void *aux)
1347 {
1348   struct case_list **p;
1349
1350   for (p = isrt->results; *p; p++)
1351     if (!output_func (&(*p)->c, aux))
1352       break;
1353   free (isrt->results);
1354 }
1355
1356 static void
1357 read_external_sort_output (struct external_sort *xsrt,
1358                            read_sort_output_func *output_func, void *aux)
1359 {
1360   FILE *file;
1361   int file_idx;
1362   size_t i;
1363   struct ccase *c;
1364
1365   assert (xsrt->run_cnt == 1);
1366   file_idx = xsrt->initial_runs[0].file_idx;
1367
1368   file = open_temp_file (xsrt, file_idx, "rb");
1369   if (file == NULL)
1370     {
1371       err_failure ();
1372       return;
1373     }
1374
1375   c = xmalloc (xsrt->scp->case_size);
1376   for (i = 0; i < xsrt->initial_runs[0].case_cnt; i++)
1377     {
1378       if (!read_temp_file (xsrt, file_idx, file, c, xsrt->scp->case_size))
1379         {
1380           err_failure ();
1381           break;
1382         }
1383
1384       if (!output_func (c, aux))
1385         break;
1386     }
1387   free (c);
1388 }
1389
1390 static void
1391 sort_source_destroy (struct case_source *source) 
1392 {
1393   struct sort_cases_pgm *scp = source->aux;
1394   
1395   destroy_sort_cases_pgm (scp);
1396 }
1397
1398 const struct case_source_class sort_source_class =
1399   {
1400     "SORT CASES",
1401     NULL, /* FIXME */
1402     sort_source_read,
1403     sort_source_destroy,
1404   };