99d850ad9345c354de71b39b3f33d75b857e3a9a
[pspp-builds.git] / src / sort.c
1 /* PSPP - computes sample statistics.
2    Copyright (C) 1997-9, 2000 Free Software Foundation, Inc.
3    Written by Ben Pfaff <blp@gnu.org>.
4
5    This program is free software; you can redistribute it and/or
6    modify it under the terms of the GNU General Public License as
7    published by the Free Software Foundation; either version 2 of the
8    License, or (at your option) any later version.
9
10    This program is distributed in the hope that it will be useful, but
11    WITHOUT ANY WARRANTY; without even the implied warranty of
12    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13    General Public License for more details.
14
15    You should have received a copy of the GNU General Public License
16    along with this program; if not, write to the Free Software
17    Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA
18    02111-1307, USA. */
19
20 #include <config.h>
21 #include "sort.h"
22 #include <assert.h>
23 #include <stdio.h>
24 #include <stdlib.h>
25 #include <errno.h>
26 #include "algorithm.h"
27 #include "alloc.h"
28 #include "command.h"
29 #include "error.h"
30 #include "expr.h"
31 #include "lexer.h"
32 #include "misc.h"
33 #include "settings.h"
34 #include "str.h"
35 #include "var.h"
36 #include "vfm.h"
37 #include "vfmP.h"
38
39 #if HAVE_UNISTD_H
40 #include <unistd.h>
41 #endif
42
43 #if HAVE_SYS_TYPES_H
44 #include <sys/types.h>
45 #endif
46
47 #if HAVE_SYS_STAT_H
48 #include <sys/stat.h>
49 #endif
50
51 #include "debug-print.h"
52
53 /* Other prototypes. */
54 static int compare_record (const union value *, const union value *,
55                            const struct sort_cases_pgm *);
56 static int compare_case_lists (const void *, const void *, void *);
57 static struct internal_sort *do_internal_sort (struct sort_cases_pgm *,
58                                                int separate);
59 static void destroy_internal_sort (struct internal_sort *);
60 static struct external_sort *do_external_sort (struct sort_cases_pgm *,
61                                                int separate);
62 static void destroy_external_sort (struct external_sort *);
63 struct sort_cases_pgm *parse_sort (void);
64
65 /* Performs the SORT CASES procedures. */
66 int
67 cmd_sort_cases (void)
68 {
69   struct sort_cases_pgm *scp;
70   int success;
71
72   lex_match_id ("SORT");
73   lex_match_id ("CASES");
74   lex_match (T_BY);
75
76   scp = parse_sort ();
77   if (scp == NULL)
78     return CMD_FAILURE;
79       
80   cancel_temporary ();
81
82   success = sort_cases (scp, 0);
83   destroy_sort_cases_pgm (scp);
84   if (success)
85     return lex_end_of_command ();
86   else
87     return CMD_FAILURE;
88 }
89
90 /* Parses a list of sort keys and returns a struct sort_cases_pgm
91    based on it.  Returns a null pointer on error. */
92 struct sort_cases_pgm *
93 parse_sort (void)
94 {
95   struct sort_cases_pgm *scp;
96
97   scp = xmalloc (sizeof *scp);
98   scp->ref_cnt = 1;
99   scp->vars = NULL;
100   scp->dirs = NULL;
101   scp->var_cnt = 0;
102   scp->isrt = NULL;
103   scp->xsrt = NULL;
104
105   do
106     {
107       int prev_var_cnt = scp->var_cnt;
108       enum sort_direction direction = SRT_ASCEND;
109
110       /* Variables. */
111       if (!parse_variables (default_dict, &scp->vars, &scp->var_cnt,
112                             PV_NO_DUPLICATE | PV_APPEND | PV_NO_SCRATCH))
113         goto error;
114
115       /* Sort direction. */
116       if (lex_match ('('))
117         {
118           if (lex_match_id ("D") || lex_match_id ("DOWN"))
119             direction = SRT_DESCEND;
120           else if (!lex_match_id ("A") && !lex_match_id ("UP"))
121             {
122               msg (SE, _("`A' or `D' expected inside parentheses."));
123               goto error;
124             }
125           if (!lex_match (')'))
126             {
127               msg (SE, _("`)' expected."));
128               goto error;
129             }
130         }
131       scp->dirs = xrealloc (scp->dirs, sizeof *scp->dirs * scp->var_cnt);
132       for (; prev_var_cnt < scp->var_cnt; prev_var_cnt++)
133         scp->dirs[prev_var_cnt] = direction;
134     }
135   while (token != '.' && token != '/');
136   
137   return scp;
138
139  error:
140   destroy_sort_cases_pgm (scp);
141   return NULL;
142 }
143
144 void
145 destroy_sort_cases_pgm (struct sort_cases_pgm *scp) 
146 {
147   if (scp != NULL) 
148     {
149       assert (scp->ref_cnt > 0);
150       if (--scp->ref_cnt > 0)
151         return;
152
153       free (scp->vars);
154       free (scp->dirs);
155       destroy_internal_sort (scp->isrt);
156       destroy_external_sort (scp->xsrt);
157       free (scp);
158     }
159 }
160
161 /* Sorts the active file based on the key variables specified in
162    global variables vars and var_cnt.  The output is either to the
163    active file, if SEPARATE is zero, or to a separate file, if
164    SEPARATE is nonzero.  In the latter case the output cases can be
165    read with a call to read_sort_output().  (In the former case the
166    output cases should be dealt with through the usual vfm interface.)
167
168    The caller is responsible for freeing vars[]. */
169 int
170 sort_cases (struct sort_cases_pgm *scp, int separate)
171 {
172   /* Not sure this is necessary but it's good to be safe. */
173   if (separate && case_source_is_class (vfm_source, &sort_source_class))
174     procedure (NULL, NULL, NULL, NULL);
175   
176   /* SORT CASES cancels PROCESS IF. */
177   expr_free (process_if_expr);
178   process_if_expr = NULL;
179
180   /* Try an internal sort first. */
181   scp->isrt = do_internal_sort (scp, separate);
182   if (scp->isrt != NULL) 
183     return 1; 
184
185   /* Fall back to an external sort. */
186   write_active_file_to_disk ();
187   scp->xsrt = do_external_sort (scp, separate);
188   if (scp->xsrt != NULL) 
189     return 1;
190
191   destroy_sort_cases_pgm (scp);
192   return 0;
193 }
194 \f
195 struct internal_sort 
196   {
197     struct case_list **results;
198   };
199
200 /* If a reasonable situation is set up, do an internal sort of the
201    data.  Return success. */
202 static struct internal_sort *
203 do_internal_sort (struct sort_cases_pgm *scp, int separate)
204 {
205   struct internal_sort *isrt;
206
207   isrt = xmalloc (sizeof *isrt);
208   isrt->results = NULL;
209
210   if (!case_source_is_class (vfm_source, &disk_source_class))
211     {
212       if (!case_source_is_class (vfm_source, &memory_source_class))
213         procedure (NULL, NULL, NULL, NULL);
214
215       if (case_source_is_class (vfm_source, &memory_source_class))
216         {
217           struct case_list *case_list;
218           struct case_list **case_array;
219           int case_cnt;
220           int i;
221
222           case_cnt = vfm_source->class->count (vfm_source);
223           if (case_cnt <= 0)
224             return isrt;
225
226           if (case_cnt > set_max_workspace / sizeof *case_array)
227             goto error;
228
229           case_list = memory_source_get_cases (vfm_source);
230           case_array = malloc (sizeof *case_array * (case_cnt + 1));
231           if (case_array == NULL)
232             goto error;
233
234           for (i = 0; case_list != NULL; i++) 
235             {
236               case_array[i] = case_list;
237               case_list = case_list->next;
238             }
239           assert (i == case_cnt);
240           case_array[case_cnt] = NULL;
241
242           sort (case_array, case_cnt, sizeof *case_array,
243                 compare_case_lists, scp);
244
245           if (!separate) 
246             {
247               memory_source_set_cases (vfm_source, case_array[0]);
248               for (i = 1; i <= case_cnt; i++)
249                 case_array[i - 1]->next = case_array[i]; 
250               free (case_array);
251             }
252           else 
253             isrt->results = case_array;
254                       
255           return isrt;
256         }
257     }
258
259  error:
260   free (isrt);
261   return NULL;
262 }
263
264 static void
265 destroy_internal_sort (struct internal_sort *isrt) 
266 {
267   if (isrt != NULL) 
268     {
269       free (isrt->results);
270       free (isrt);
271     }
272 }
273
274 /* Compares the VAR_CNT variables in VARS[] between the
275    `case_list's at A and B, and returns a strcmp()-type
276    result. */
277 static int
278 compare_case_lists (const void *a_, const void *b_, void *scp_)
279 {
280   struct sort_cases_pgm *scp = scp_;
281   struct case_list *const *pa = a_;
282   struct case_list *const *pb = b_;
283   struct case_list *a = *pa;
284   struct case_list *b = *pb;
285
286   return compare_record (a->c.data, b->c.data, scp);
287 }
288 \f
289 /* External sort. */
290
291 /* Maximum order of merge.  If you increase this, then you should
292    use a heap for comparing cases during merge.  */
293 #define MAX_MERGE_ORDER         7
294
295 /* Minimum total number of records for buffers. */
296 #define MIN_BUFFER_TOTAL_SIZE_RECS      64
297
298 /* Minimum single input buffer size, in bytes and records. */
299 #define MIN_BUFFER_SIZE_BYTES   4096
300 #define MIN_BUFFER_SIZE_RECS    16
301
302 #if MIN_BUFFER_SIZE_RECS * 2 + 16 > MIN_BUFFER_TOTAL_SIZE_RECS
303 #error MIN_BUFFER_SIZE_RECS and MIN_BUFFER_TOTAL_SIZE_RECS do not make sense.
304 #endif
305
306 /* An initial run and its length. */
307 struct initial_run 
308   {
309     int file_idx;                     /* File index. */
310     size_t case_cnt;                  /* Number of cases. */
311   };
312
313 /* Sorts initial runs A and B in decending order by length. */
314 static int
315 compare_initial_runs (const void *a_, const void *b_, void *aux UNUSED) 
316 {
317   const struct initial_run *a = a_;
318   const struct initial_run *b = b_;
319   
320   return a->case_cnt > b->case_cnt ? -1 : a->case_cnt <b->case_cnt;
321 }
322
323 struct external_sort 
324   {
325     struct sort_cases_pgm *scp;       /* SORT CASES info. */
326     struct initial_run *initial_runs; /* Array of initial runs. */
327     size_t run_cnt, run_cap;          /* Number of runs, allocated capacity. */
328     char *temp_dir;                   /* Temporary file directory name. */
329     int next_file_idx;                /* Lowest unused file index. */
330     size_t case_size;                 /* Number of bytes in case. */
331   };
332
333 /* Prototypes for helper functions. */
334 static void sort_sink_write (struct case_sink *, struct ccase *);
335 static int write_initial_runs (struct external_sort *, int separate);
336 static int init_external_sort (struct external_sort *);
337 static int merge (struct external_sort *);
338 static void rmdir_temp_dir (struct external_sort *);
339 static void remove_temp_file (struct external_sort *xsrt, int file_idx);
340
341 /* Performs an external sort of the active file according to the
342    specification in SCP.  Forms initial runs using a heap as a
343    reservoir.  Determines the optimum merge pattern via Huffman's
344    method (see Knuth vol. 3, 2nd edition, p. 365-366), and merges
345    according to that pattern. */
346 static struct external_sort *
347 do_external_sort (struct sort_cases_pgm *scp, int separate)
348 {
349   struct external_sort *xsrt;
350   int success = 0;
351
352   xsrt = xmalloc (sizeof *xsrt);
353   xsrt->scp = scp;
354   xsrt->case_size = sizeof (union value) * compaction_nval;
355   if (!init_external_sort (xsrt))
356     goto done;
357   if (!write_initial_runs (xsrt, separate))
358     goto done;
359   if (!merge (xsrt))
360     goto done;
361
362   success = 1;
363
364  done:
365   if (success)
366     {
367       /* Don't destroy anything because we'll need it for reading
368          the output. */
369       return xsrt;
370     }
371   else
372     {
373       destroy_external_sort (xsrt);
374       return NULL;
375     }
376 }
377
378 /* Destroys XSRT. */
379 static void
380 destroy_external_sort (struct external_sort *xsrt) 
381 {
382   if (xsrt != NULL) 
383     {
384       int i;
385       
386       for (i = 0; i < xsrt->run_cnt; i++)
387         remove_temp_file (xsrt, xsrt->initial_runs[i].file_idx);
388       rmdir_temp_dir (xsrt);
389       free (xsrt->initial_runs);
390       free (xsrt);
391     }
392 }
393
394 #ifdef HAVE_MKDTEMP
395 /* Creates and returns the name of a temporary directory. */
396 static char *
397 make_temp_dir (void) 
398 {
399   const char *parent_dir;
400   char *temp_dir;
401
402   if (getenv ("TMPDIR") != NULL)
403     parent_dir = getenv ("TMPDIR");
404   else
405     parent_dir = P_tmpdir;
406
407   temp_dir = xmalloc (strlen (parent_dir) + 32);
408   sprintf (temp_dir, "%s%cpsppXXXXXX", parent_dir, DIR_SEPARATOR);
409   if (mkdtemp (temp_dir) == NULL) 
410     {
411       msg (SE, _("%s: Creating temporary directory: %s."),
412            temp_dir, strerror (errno));
413       free (temp_dir);
414       return NULL;
415     }
416   else
417     return temp_dir;
418 }
419 #else /* !HAVE_MKDTEMP */
420 /* Creates directory DIR. */
421 static int
422 do_mkdir (const char *dir) 
423 {
424 #ifndef __MSDOS__
425   return mkdir (dir, S_IRWXU);
426 #else
427   return mkdir (dir);
428 #endif
429 }
430
431 /* Creates and returns the name of a temporary directory. */
432 static char *
433 make_temp_dir (void) 
434 {
435   int i;
436   
437   for (i = 0; i < 100; i++)
438     {
439       char temp_dir[L_tmpnam + 1];
440       if (tmpnam (temp_dir) == NULL) 
441         {
442           msg (SE, _("Generating temporary directory name failed: %s."),
443                strerror (errno));
444           return NULL; 
445         }
446       else if (do_mkdir (temp_dir) == 0)
447         return xstrdup (temp_dir);
448     }
449   
450   msg (SE, _("Creating temporary directory failed: %s."), strerror (errno));
451   return NULL;
452 }
453 #endif /* !HAVE_MKDTEMP */
454
455 /* Sets up to open temporary files. */
456 static int
457 init_external_sort (struct external_sort *xsrt)
458 {
459   /* Zero. */
460   xsrt->temp_dir = NULL;
461   xsrt->next_file_idx = 0;
462
463   /* Huffman queue. */
464   xsrt->run_cap = 512;
465   xsrt->run_cnt = 0;
466   xsrt->initial_runs = xmalloc (sizeof *xsrt->initial_runs * xsrt->run_cap);
467
468   /* Temporary directory. */
469   xsrt->temp_dir = make_temp_dir ();
470   if (xsrt->temp_dir == NULL)
471     return 0;
472
473   return 1;
474 }
475
476
477 static int
478 simulate_error (void) 
479 {
480   static int op_err_cnt = -1;
481   static int op_cnt;
482   
483   if (op_err_cnt == -1 || op_cnt++ < op_err_cnt)
484     return 0;
485   else
486     {
487       errno = 0;
488       return 1;
489     }
490 }
491
492 /* Removes the directory created for temporary files, if one
493    exists. */
494 static void
495 rmdir_temp_dir (struct external_sort *xsrt)
496 {
497   if (xsrt->temp_dir != NULL && rmdir (xsrt->temp_dir) == -1) 
498     {
499       msg (SE, _("%s: Error removing directory for temporary files: %s."),
500            xsrt->temp_dir, strerror (errno));
501       xsrt->temp_dir = NULL; 
502     }
503 }
504
505 #define TEMP_FILE_NAME_SIZE (L_tmpnam + 32)
506 static void
507 get_temp_file_name (struct external_sort *xsrt, int file_idx,
508                     char filename[TEMP_FILE_NAME_SIZE]) 
509 {
510   assert (xsrt->temp_dir != NULL);
511   sprintf (filename, "%s%c%04d", xsrt->temp_dir, DIR_SEPARATOR, file_idx);
512 }
513
514 static FILE *
515 open_temp_file (struct external_sort *xsrt, int file_idx, const char *mode)
516 {
517   char temp_file[TEMP_FILE_NAME_SIZE];
518   FILE *file;
519
520   get_temp_file_name (xsrt, file_idx, temp_file);
521
522   file = fopen (temp_file, mode);
523   if (simulate_error () || file == NULL) 
524     msg (SE, _("%s: Error opening temporary file for %s: %s."),
525          temp_file, mode[0] == 'r' ? "reading" : "writing",
526          strerror (errno));
527
528   return file;
529 }
530
531 static int
532 close_temp_file (struct external_sort *xsrt, int file_idx, FILE *file)
533 {
534   if (file != NULL) 
535     {
536       char temp_file[TEMP_FILE_NAME_SIZE];
537       get_temp_file_name (xsrt, file_idx, temp_file);
538       if (simulate_error () || fclose (file) == EOF) 
539         {
540           msg (SE, _("%s: Error closing temporary file: %s."),
541                temp_file, strerror (errno));
542           return 0;
543         }
544     }
545   return 1;
546 }
547
548 static void
549 remove_temp_file (struct external_sort *xsrt, int file_idx) 
550 {
551   if (file_idx != -1)
552     {
553       char temp_file[TEMP_FILE_NAME_SIZE];
554       get_temp_file_name (xsrt, file_idx, temp_file);
555       if (simulate_error () || remove (temp_file) != 0)
556         msg (SE, _("%s: Error removing temporary file: %s."),
557              temp_file, strerror (errno));
558     }
559 }
560
561 static int
562 write_temp_file (struct external_sort *xsrt, int file_idx,
563                  FILE *file, const void *data, size_t size) 
564 {
565   if (!simulate_error () && fwrite (data, size, 1, file) == 1)
566     return 1;
567   else
568     {
569       char temp_file[TEMP_FILE_NAME_SIZE];
570       get_temp_file_name (xsrt, file_idx, temp_file);
571       msg (SE, _("%s: Error writing temporary file: %s."),
572            temp_file, strerror (errno));
573       return 0;
574     }
575 }
576
577 static int
578 read_temp_file (struct external_sort *xsrt, int file_idx,
579                 FILE *file, void *data, size_t size) 
580 {
581   if (!simulate_error () && fread (data, size, 1, file) == 1)
582     return 1;
583   else 
584     {
585       char temp_file[TEMP_FILE_NAME_SIZE];
586       get_temp_file_name (xsrt, file_idx, temp_file);
587       if (ferror (file))
588         msg (SE, _("%s: Error reading temporary file: %s."),
589              temp_file, strerror (errno));
590       else
591         msg (SE, _("%s: Unexpected end of temporary file."),
592              temp_file);
593       return 0;
594     }
595 }
596 \f
597 /* Replacement selection. */
598
599 /* Pairs a record with a run number. */
600 struct record_run
601   {
602     int run;                    /* Run number of case. */
603     struct case_list *record;   /* Case data. */
604   };
605
606 struct initial_run_state 
607   {
608     struct external_sort *xsrt;
609
610     /* Reservoir. */
611     struct record_run *records; /* Records arranged as a heap. */
612     size_t record_cnt;          /* Current number of records. */
613     size_t record_cap;          /* Capacity for records. */
614     struct case_list *free_list;/* Cases not in heap. */
615     
616     /* Run currently being output. */
617     int file_idx;               /* Temporary file number. */
618     size_t case_cnt;            /* Number of cases so far. */
619     size_t case_size;           /* Number of bytes in a case. */
620     FILE *output_file;          /* Output file. */
621     struct case_list *last_output;/* Record last output. */
622
623     int okay;                   /* Zero if an error has been encountered. */
624   };
625
626 static void destroy_initial_run_state (struct initial_run_state *irs);
627 static int allocate_cases (struct initial_run_state *);
628 static struct case_list *grab_case (struct initial_run_state *);
629 static void release_case (struct initial_run_state *, struct case_list *);
630 static void output_record (struct initial_run_state *irs);
631 static void start_run (struct initial_run_state *irs);
632 static void end_run (struct initial_run_state *irs);
633 static int compare_record_run (const struct record_run *,
634                                const struct record_run *,
635                                struct sort_cases_pgm *);
636 static int compare_record_run_minheap (const void *, const void *, void *);
637
638 static int
639 write_initial_runs (struct external_sort *xsrt, int separate)
640 {
641   struct initial_run_state *irs;
642   int success = 0;
643
644   /* Allocate memory for cases. */
645   irs = xmalloc (sizeof *irs);
646   irs->xsrt = xsrt;
647   irs->records = NULL;
648   irs->record_cnt = irs->record_cap = 0;
649   irs->free_list = NULL;
650   irs->output_file = NULL;
651   irs->last_output = NULL;
652   irs->file_idx = 0;
653   irs->case_cnt = 0;
654   irs->case_size = dict_get_case_size (default_dict);
655   irs->okay = 1;
656   if (!allocate_cases (irs)) 
657     goto done;
658
659   /* Create case sink. */
660   if (!separate)
661     {
662       if (vfm_sink)
663         vfm_sink->class->destroy (vfm_sink);
664       vfm_sink = create_case_sink (&sort_sink_class, irs);
665       xsrt->scp->ref_cnt++;
666     }
667
668   /* Create initial runs. */
669   start_run (irs);
670   procedure (NULL, NULL, NULL, NULL);
671   while (irs->record_cnt > 0 && irs->okay)
672     output_record (irs);
673   end_run (irs);
674
675   success = irs->okay;
676
677  done:
678   destroy_initial_run_state (irs);
679
680   return success;
681 }
682
683 /* Add a single case to an initial run. */
684 static void
685 sort_sink_write (struct case_sink *sink, struct ccase *c)
686 {
687   struct initial_run_state *irs = sink->aux;
688   struct record_run *new_record_run;
689
690   if (!irs->okay)
691     return;
692
693   /* Compose record_run for this run and add to heap. */
694   assert (irs->record_cnt < irs->record_cap);
695   new_record_run = irs->records + irs->record_cnt++;
696   new_record_run->record = grab_case (irs);
697   memcpy (new_record_run->record->c.data, c->data, irs->case_size);
698   new_record_run->run = irs->file_idx;
699   if (irs->last_output != NULL
700       && compare_record (c->data, irs->last_output->c.data,
701                          irs->xsrt->scp) < 0)
702     new_record_run->run = irs->xsrt->next_file_idx;
703   push_heap (irs->records, irs->record_cnt, sizeof *irs->records,
704              compare_record_run_minheap, irs->xsrt->scp);
705
706   /* Output a record if the reservoir is full. */
707   if (irs->record_cnt == irs->record_cap && irs->okay)
708     output_record (irs);
709 }
710
711 static void
712 destroy_initial_run_state (struct initial_run_state *irs) 
713 {
714   struct case_list *iter, *next;
715   int i;
716
717   if (irs == NULL)
718     return;
719
720   /* Release cases to free list. */
721   for (i = 0; i < irs->record_cnt; i++)
722     release_case (irs, irs->records[i].record);
723   if (irs->last_output != NULL)
724     release_case (irs, irs->last_output);
725
726   /* Free cases in free list. */
727   for (iter = irs->free_list; iter != NULL; iter = next) 
728     {
729       next = iter->next;
730       free (iter);
731     }
732
733   free (irs->records);
734   free (irs);
735
736   if (irs->output_file != NULL)
737     close_temp_file (irs->xsrt, irs->file_idx, irs->output_file);
738 }
739
740 /* Allocates room for lots of cases as a buffer. */
741 static int
742 allocate_cases (struct initial_run_state *irs)
743 {
744   size_t case_size;     /* Size of one case, in bytes. */
745   int approx_case_cost; /* Approximate memory cost of one case in bytes. */
746   int max_cases;        /* Maximum number of cases to allocate. */
747   int i;
748
749   /* Allocate as many cases as we can within the workspace
750      limit. */
751   case_size = dict_get_case_size (default_dict);
752   approx_case_cost = (sizeof *irs->records
753                       + sizeof *irs->free_list
754                       + case_size
755                       + 4 * sizeof (void *));
756   max_cases = set_max_workspace / approx_case_cost;
757   irs->records = malloc (sizeof *irs->records * max_cases);
758   for (i = 0; i < max_cases; i++)
759     {
760       struct case_list *c;
761       c = malloc (sizeof *c + case_size - sizeof (union value));
762       if (c == NULL) 
763         {
764           max_cases = i;
765           break;
766         }
767       release_case (irs, c);
768     }
769
770   /* irs->records gets all but one of the allocated cases.
771      The extra is used for last_output. */
772   irs->record_cap = max_cases - 1;
773
774   /* Fail if we didn't allocate an acceptable number of cases. */
775   if (irs->records == NULL || max_cases < MIN_BUFFER_TOTAL_SIZE_RECS)
776     {
777       msg (SE, _("Out of memory.  Could not allocate room for minimum of %d "
778                  "cases of %d bytes each.  (PSPP workspace is currently "
779                  "restricted to a maximum of %d KB.)"),
780            MIN_BUFFER_TOTAL_SIZE_RECS, approx_case_cost, set_max_workspace / 1024);
781       return 0;
782     }
783   return 1;
784 }
785
786 /* Compares the VAR_CNT variables in VARS[] between the `value's at
787    A and B, and returns a strcmp()-type result. */
788 static int
789 compare_record (const union value *a, const union value *b,
790                 const struct sort_cases_pgm *scp)
791 {
792   int i;
793
794   assert (a != NULL);
795   assert (b != NULL);
796   
797   for (i = 0; i < scp->var_cnt; i++)
798     {
799       struct variable *v = scp->vars[i];
800       int fv = v->fv;
801       int result;
802
803       if (v->type == NUMERIC)
804         {
805           double af = a[fv].f;
806           double bf = b[fv].f;
807           
808           result = af < bf ? -1 : af > bf;
809         }
810       else
811         result = memcmp (a[fv].s, b[fv].s, v->width);
812
813       if (result != 0) 
814         {
815           if (scp->dirs[i] == SRT_DESCEND)
816             result = -result;
817           return result;
818         }
819     }
820
821   return 0;
822 }
823
824 static int
825 compare_record_run (const struct record_run *a,
826                     const struct record_run *b,
827                     struct sort_cases_pgm *scp) 
828 {
829   if (a->run != b->run)
830     return a->run > b->run ? 1 : -1;
831   else
832     return compare_record (a->record->c.data, b->record->c.data, scp);
833 }
834
835 static int
836 compare_record_run_minheap (const void *a, const void *b, void *scp) 
837 {
838   return -compare_record_run (a, b, scp);
839 }
840
841 /* Begins a new initial run, specifically its output file. */
842 static void
843 start_run (struct initial_run_state *irs)
844 {
845   irs->file_idx = irs->xsrt->next_file_idx++;
846   irs->case_cnt = 0;
847   irs->output_file = open_temp_file (irs->xsrt, irs->file_idx, "wb");
848   if (irs->output_file == NULL) 
849     irs->okay = 0;
850   if (irs->last_output != NULL) 
851     {
852       release_case (irs, irs->last_output);
853       irs->last_output = NULL; 
854     }
855 }
856
857 /* Ends the current initial run.  */
858 static void
859 end_run (struct initial_run_state *irs)
860 {
861   struct external_sort *xsrt = irs->xsrt;
862   
863   /* Record initial run. */
864   if (xsrt->run_cnt >= xsrt->run_cap) 
865     {
866       xsrt->run_cap *= 2;
867       xsrt->initial_runs
868         = xrealloc (xsrt->initial_runs,
869                     sizeof *xsrt->initial_runs * xsrt->run_cap);
870     }
871   xsrt->initial_runs[xsrt->run_cnt].file_idx = irs->file_idx;
872   xsrt->initial_runs[xsrt->run_cnt].case_cnt = irs->case_cnt;
873   xsrt->run_cnt++;
874
875   /* Close file handle. */
876   if (irs->output_file != NULL
877       && !close_temp_file (irs->xsrt, irs->file_idx, irs->output_file)) 
878     irs->okay = 0;
879   irs->output_file = NULL;
880 }
881
882 static void
883 output_record (struct initial_run_state *irs)
884 {
885   struct record_run *record_run;
886   struct ccase *out_case;
887   
888   /* Extract minimum case from heap. */
889   assert (irs->record_cnt > 0);
890   pop_heap (irs->records, irs->record_cnt--, sizeof *irs->records,
891             compare_record_run_minheap, irs->xsrt->scp);
892   record_run = irs->records + irs->record_cnt;
893
894   /* Bail if an error has occurred. */
895   if (!irs->okay)
896     return;
897
898   /* Obtain case data to write to disk. */
899   out_case = &record_run->record->c;
900   if (compaction_necessary)
901     {
902       compact_case (compaction_case, out_case);
903       out_case = compaction_case;
904     }
905
906   /* Start new run if necessary. */
907   assert (record_run->run == irs->file_idx
908           || record_run->run == irs->xsrt->next_file_idx);
909   if (record_run->run != irs->file_idx)
910     {
911       end_run (irs);
912       start_run (irs);
913     }
914   assert (record_run->run == irs->file_idx);
915   irs->case_cnt++;
916
917   /* Write to disk. */
918   if (irs->output_file != NULL
919       && !write_temp_file (irs->xsrt, irs->file_idx, irs->output_file,
920                            out_case->data,
921                            sizeof *out_case->data * compaction_nval))
922     irs->okay = 0;
923
924   /* This record becomes last_output. */
925   if (irs->last_output != NULL)
926     release_case (irs, irs->last_output);
927   irs->last_output = record_run->record;
928 }
929
930 static struct case_list *
931 grab_case (struct initial_run_state *irs)
932 {
933   struct case_list *c;
934   
935   assert (irs != NULL);
936   assert (irs->free_list != NULL);
937
938   c = irs->free_list;
939   irs->free_list = c->next;
940   return c;
941 }
942
943 static void 
944 release_case (struct initial_run_state *irs, struct case_list *c) 
945 {
946   assert (irs != NULL);
947   assert (c != NULL);
948
949   c->next = irs->free_list;
950   irs->free_list = c;
951 }
952 \f
953 /* Merging. */
954
955 struct merge_state 
956   {
957     struct external_sort *xsrt; /* External sort state. */
958     struct ccase **cases;       /* Buffers. */
959     size_t case_cnt;            /* Number of buffers. */
960   };
961
962 struct run;
963 static int merge_once (struct merge_state *,
964                        const struct initial_run[], size_t,
965                        struct initial_run *);
966 static int fill_run_buffer (struct merge_state *, struct run *);
967 static int mod (int, int);
968
969 /* Performs a series of P-way merges of initial runs
970    method. */
971 static int
972 merge (struct external_sort *xsrt)
973 {
974   struct merge_state mrg;       /* State of merge. */
975   size_t case_size;             /* Size of one case, in bytes. */
976   size_t approx_case_cost;      /* Approximate memory cost of one case. */
977   int max_order;                /* Maximum order of merge. */
978   size_t dummy_run_cnt;         /* Number of dummy runs to insert. */
979   int success = 0;
980   int i;
981
982   mrg.xsrt = xsrt;
983
984   /* Allocate as many cases as possible into cases. */
985   case_size = dict_get_case_size (default_dict);
986   approx_case_cost = sizeof *mrg.cases + case_size + 4 * sizeof (void *);
987   mrg.case_cnt = set_max_workspace / approx_case_cost;
988   mrg.cases = malloc (sizeof *mrg.cases * mrg.case_cnt);
989   if (mrg.cases == NULL)
990     goto done;
991   for (i = 0; i < mrg.case_cnt; i++) 
992     {
993       mrg.cases[i] = malloc (case_size);
994       if (mrg.cases[i] == NULL) 
995         {
996           mrg.case_cnt = i;
997           break;
998         }
999     }
1000   if (mrg.case_cnt < MIN_BUFFER_TOTAL_SIZE_RECS)
1001     {
1002       msg (SE, _("Out of memory.  Could not allocate room for minimum of %d "
1003                  "cases of %d bytes each.  (PSPP workspace is currently "
1004                  "restricted to a maximum of %d KB.)"),
1005            MIN_BUFFER_TOTAL_SIZE_RECS, approx_case_cost, set_max_workspace / 1024);
1006       return 0;
1007     }
1008
1009   /* Determine maximum order of merge. */
1010   max_order = MAX_MERGE_ORDER;
1011   if (mrg.case_cnt / max_order < MIN_BUFFER_SIZE_RECS)
1012     max_order = mrg.case_cnt / MIN_BUFFER_SIZE_RECS;
1013   else if (mrg.case_cnt / max_order * case_size < MIN_BUFFER_SIZE_BYTES)
1014     max_order = mrg.case_cnt / (MIN_BUFFER_SIZE_BYTES / case_size);
1015   if (max_order < 2)
1016     max_order = 2;
1017   if (max_order > xsrt->run_cnt)
1018     max_order = xsrt->run_cnt;
1019
1020   /* Repeatedly merge the P shortest existing runs until only one run
1021      is left. */
1022   make_heap (xsrt->initial_runs, xsrt->run_cnt, sizeof *xsrt->initial_runs,
1023              compare_initial_runs, NULL);
1024   dummy_run_cnt = mod (1 - (int) xsrt->run_cnt, max_order - 1);
1025   assert (max_order == 1
1026           || (xsrt->run_cnt + dummy_run_cnt) % (max_order - 1) == 1);
1027   while (xsrt->run_cnt > 1)
1028     {
1029       struct initial_run output_run;
1030       int order;
1031       int i;
1032
1033       /* Choose order of merge (max_order after first merge). */
1034       order = max_order - dummy_run_cnt;
1035       dummy_run_cnt = 0;
1036
1037       /* Choose runs to merge. */
1038       assert (xsrt->run_cnt >= order);
1039       for (i = 0; i < order; i++) 
1040         pop_heap (xsrt->initial_runs, xsrt->run_cnt--,
1041                   sizeof *xsrt->initial_runs,
1042                   compare_initial_runs, NULL); 
1043           
1044       /* Merge runs. */
1045       if (!merge_once (&mrg, xsrt->initial_runs + xsrt->run_cnt, order,
1046                        &output_run))
1047         goto done;
1048
1049       /* Add output run to heap. */
1050       xsrt->initial_runs[xsrt->run_cnt++] = output_run;
1051       push_heap (xsrt->initial_runs, xsrt->run_cnt, sizeof *xsrt->initial_runs,
1052                  compare_initial_runs, NULL);
1053     }
1054
1055   /* Exactly one run is left, which contains the entire sorted
1056      file.  We could use it to find a total case count. */
1057   assert (xsrt->run_cnt == 1);
1058
1059   success = 1;
1060
1061  done:
1062   for (i = 0; i < mrg.case_cnt; i++)
1063     free (mrg.cases[i]);
1064   free (mrg.cases);
1065
1066   return success;
1067 }
1068
1069 /* Modulo function as defined by Knuth. */
1070 static int
1071 mod (int x, int y)
1072 {
1073   if (y == 0)
1074     return x;
1075   else if (x == 0)
1076     return 0;
1077   else if (x > 0 && y > 0)
1078     return x % y;
1079   else if (x < 0 && y > 0)
1080     return y - (-x) % y;
1081
1082   assert (0);
1083 }
1084
1085 /* A run of data for use in merging. */
1086 struct run 
1087   {
1088     FILE *file;                 /* File that contains run. */
1089     int file_idx;               /* Index of file that contains run. */
1090     struct ccase **buffer;      /* Case buffer. */
1091     struct ccase **buffer_head; /* First unconsumed case in buffer. */
1092     struct ccase **buffer_tail; /* One past last unconsumed case in buffer. */
1093     size_t buffer_cap;          /* Number of cases buffer can hold. */
1094     size_t unread_case_cnt;     /* Number of cases not yet read. */
1095   };
1096
1097 /* Merges the RUN_CNT initial runs specified in INPUT_RUNS into a
1098    new run.  Returns nonzero only if successful.  Adds an entry
1099    to MRG->xsrt->runs for the output file if and only if the
1100    output file is actually created.  Always deletes all the input
1101    files. */
1102 static int
1103 merge_once (struct merge_state *mrg,
1104             const struct initial_run input_runs[],
1105             size_t run_cnt,
1106             struct initial_run *output_run)
1107 {
1108   struct run runs[MAX_MERGE_ORDER];
1109   FILE *output_file = NULL;
1110   size_t case_size;
1111   int success = 0;
1112   int i;
1113
1114   /* Initialize runs[]. */
1115   for (i = 0; i < run_cnt; i++) 
1116     {
1117       runs[i].file = NULL;
1118       runs[i].file_idx = input_runs[i].file_idx;
1119       runs[i].buffer = mrg->cases + mrg->case_cnt / run_cnt * i;
1120       runs[i].buffer_head = runs[i].buffer;
1121       runs[i].buffer_tail = runs[i].buffer;
1122       runs[i].buffer_cap = mrg->case_cnt / run_cnt;
1123       runs[i].unread_case_cnt = input_runs[i].case_cnt;
1124     }
1125
1126   /* Open input files. */
1127   for (i = 0; i < run_cnt; i++) 
1128     {
1129       runs[i].file = open_temp_file (mrg->xsrt, runs[i].file_idx, "rb");
1130       if (runs[i].file == NULL)
1131         goto error;
1132     }
1133   
1134   /* Create output file and count cases to be output. */
1135   output_run->file_idx = mrg->xsrt->next_file_idx++;
1136   output_run->case_cnt = 0;
1137   for (i = 0; i < run_cnt; i++)
1138     output_run->case_cnt += input_runs[i].case_cnt;
1139   output_file = open_temp_file (mrg->xsrt, output_run->file_idx, "wb");
1140   if (output_file == NULL) 
1141     goto error;
1142
1143   /* Prime buffers. */
1144   for (i = 0; i < run_cnt; i++)
1145     if (!fill_run_buffer (mrg, runs + i))
1146       goto error;
1147
1148   /* Merge. */
1149   case_size = dict_get_case_size (default_dict);
1150   while (run_cnt > 0) 
1151     {
1152       struct run *min_run;
1153
1154       /* Find minimum. */
1155       min_run = runs;
1156       for (i = 1; i < run_cnt; i++)
1157         if (compare_record ((*runs[i].buffer_head)->data,
1158                             (*min_run->buffer_head)->data,
1159                             mrg->xsrt->scp) < 0)
1160           min_run = runs + i;
1161
1162       /* Write minimum to output file. */
1163       if (!write_temp_file (mrg->xsrt, min_run->file_idx, output_file,
1164                             (*min_run->buffer_head)->data, case_size))
1165         goto error;
1166
1167       /* Remove case from buffer. */
1168       if (++min_run->buffer_head >= min_run->buffer_tail)
1169         {
1170           /* Buffer is empty.  Fill from file. */
1171           if (!fill_run_buffer (mrg, min_run))
1172             goto error;
1173
1174           /* If buffer is still empty, delete its run. */
1175           if (min_run->buffer_head >= min_run->buffer_tail)
1176             {
1177               close_temp_file (mrg->xsrt, min_run->file_idx, min_run->file);
1178               remove_temp_file (mrg->xsrt, min_run->file_idx);
1179               *min_run = runs[--run_cnt];
1180
1181               /* We could donate the now-unused buffer space to
1182                  other runs. */
1183             }
1184         } 
1185     }
1186
1187   /* Close output file.  */
1188   close_temp_file (mrg->xsrt, output_run->file_idx, output_file);
1189
1190   return 1;
1191
1192  error:
1193   /* Close and remove output file.  */
1194   if (output_file != NULL) 
1195     {
1196       close_temp_file (mrg->xsrt, output_run->file_idx, output_file);
1197       remove_temp_file (mrg->xsrt, output_run->file_idx);
1198     }
1199   
1200   /* Close and remove any remaining input runs. */
1201   for (i = 0; i < run_cnt; i++) 
1202     {
1203       close_temp_file (mrg->xsrt, runs[i].file_idx, runs[i].file);
1204       remove_temp_file (mrg->xsrt, runs[i].file_idx);
1205     }
1206
1207   return success;
1208 }
1209
1210 /* Reads as many cases as possible into RUN's buffer.
1211    Reads nonzero unless a disk error occurs. */
1212 static int
1213 fill_run_buffer (struct merge_state *mrg, struct run *run) 
1214 {
1215   run->buffer_head = run->buffer_tail = run->buffer;
1216   while (run->unread_case_cnt > 0
1217          && run->buffer_tail < run->buffer + run->buffer_cap)
1218     {
1219       if (!read_temp_file (mrg->xsrt, run->file_idx, run->file,
1220                            (*run->buffer_tail)->data,
1221                            dict_get_case_size (default_dict)))
1222         return 0;
1223
1224       run->unread_case_cnt--;
1225       run->buffer_tail++;
1226     }
1227
1228   return 1;
1229 }
1230 \f
1231 static void
1232 sort_sink_destroy (struct case_sink *sink UNUSED) 
1233 {
1234   assert (0);
1235 }
1236
1237 static struct case_source *
1238 sort_sink_make_source (struct case_sink *sink) 
1239 {
1240   struct initial_run_state *irs = sink->aux;
1241
1242   return create_case_source (&sort_source_class, irs->xsrt->scp);
1243 }
1244
1245 const struct case_sink_class sort_sink_class = 
1246   {
1247     "SORT CASES",
1248     NULL,
1249     sort_sink_write,
1250     sort_sink_destroy,
1251     sort_sink_make_source,
1252   };
1253 \f
1254 /* Reads all the records from the source stream and passes them
1255    to write_case(). */
1256 static void
1257 sort_source_read (struct case_source *source,
1258                   write_case_func *write_case, write_case_data wc_data)
1259 {
1260   struct sort_cases_pgm *scp = source->aux;
1261   
1262   read_sort_output (scp, write_case, wc_data);
1263 }
1264
1265 void read_internal_sort_output (struct internal_sort *isrt,
1266                                 write_case_func *write_case,
1267                                 write_case_data wc_data);
1268 void read_external_sort_output (struct external_sort *xsrt,
1269                                 write_case_func *write_case,
1270                                 write_case_data wc_data);
1271
1272 /* Reads all the records from the output stream and passes them to the
1273    function provided, which must have an interface identical to
1274    write_case(). */
1275 void
1276 read_sort_output (struct sort_cases_pgm *scp,
1277                   write_case_func *write_case, write_case_data wc_data)
1278 {
1279   assert ((scp->isrt != NULL) + (scp->xsrt != NULL) <= 1);
1280   if (scp->isrt != NULL)
1281     read_internal_sort_output (scp->isrt, write_case, wc_data);
1282   else if (scp->xsrt != NULL)
1283     read_external_sort_output (scp->xsrt, write_case, wc_data);
1284   else 
1285     {
1286       /* No results.  Probably an external sort that failed. */
1287     }
1288 }
1289
1290 void
1291 read_internal_sort_output (struct internal_sort *isrt,
1292                            write_case_func *write_case,
1293                            write_case_data wc_data)
1294 {
1295   struct ccase *save_temp_case = temp_case;
1296   struct case_list **p;
1297
1298   for (p = isrt->results; *p; p++) 
1299     {
1300       temp_case = &(*p)->c;
1301       write_case (wc_data);
1302     }
1303   free (isrt->results);
1304             
1305   temp_case = save_temp_case;
1306 }
1307
1308 void
1309 read_external_sort_output (struct external_sort *xsrt,
1310                            write_case_func *write_case,
1311                            write_case_data wc_data)
1312 {
1313   FILE *file;
1314   int file_idx;
1315   size_t i;
1316
1317   assert (xsrt->run_cnt == 1);
1318   file_idx = xsrt->initial_runs[0].file_idx;
1319
1320   file = open_temp_file (xsrt, file_idx, "rb");
1321   if (file == NULL)
1322     {
1323       err_failure ();
1324       return;
1325     }
1326
1327   for (i = 0; i < xsrt->initial_runs[0].case_cnt; i++)
1328     {
1329       if (!read_temp_file (xsrt, file_idx, file,
1330                           temp_case, xsrt->case_size))
1331         {
1332           err_failure ();
1333           break;
1334         }
1335
1336       if (!write_case (wc_data))
1337         break;
1338     }
1339 }
1340
1341 static void
1342 sort_source_destroy (struct case_source *source) 
1343 {
1344   struct sort_cases_pgm *scp = source->aux;
1345   
1346   destroy_sort_cases_pgm (scp);
1347 }
1348
1349 const struct case_source_class sort_source_class =
1350   {
1351     "SORT CASES",
1352     NULL, /* FIXME */
1353     sort_source_read,
1354     sort_source_destroy,
1355   };