de3ff977aa86af822e78cee258e69ea8d7dbabe2
[pspp] / src / sort.c
1 /* PSPP - computes sample statistics.
2    Copyright (C) 1997-9, 2000 Free Software Foundation, Inc.
3    Written by Ben Pfaff <blp@gnu.org>.
4
5    This program is free software; you can redistribute it and/or
6    modify it under the terms of the GNU General Public License as
7    published by the Free Software Foundation; either version 2 of the
8    License, or (at your option) any later version.
9
10    This program is distributed in the hope that it will be useful, but
11    WITHOUT ANY WARRANTY; without even the implied warranty of
12    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13    General Public License for more details.
14
15    You should have received a copy of the GNU General Public License
16    along with this program; if not, write to the Free Software
17    Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA
18    02111-1307, USA. */
19
20 #include <config.h>
21 #include "sort.h"
22 #include <assert.h>
23 #include <stdio.h>
24 #include <stdlib.h>
25 #include <errno.h>
26 #include "alloc.h"
27 #include "approx.h"
28 #include "command.h"
29 #include "error.h"
30 #include "expr.h"
31 #include "heap.h"
32 #include "lexer.h"
33 #include "misc.h"
34 #include "str.h"
35 #include "var.h"
36 #include "vfm.h"
37 #include "vfmP.h"
38
39 #if HAVE_UNISTD_H
40 #include <unistd.h>
41 #endif
42
43 #if HAVE_SYS_TYPES_H
44 #include <sys/types.h>
45 #endif
46
47 #if HAVE_SYS_STAT_H
48 #include <sys/stat.h>
49 #endif
50
51 #include "debug-print.h"
52
53 /* Variables to sort. */
54 struct variable **v_sort;
55 int nv_sort;
56
57 /* Used when internal-sorting to a separate file. */
58 static struct case_list **separate_case_tab;
59
60 /* Other prototypes. */
61 static int compare_case_lists (const void *, const void *);
62 static int do_internal_sort (int separate);
63 static int do_external_sort (int separate);
64 int parse_sort_variables (void);
65 void read_sort_output (int (*write_case) (void));
66
67 /* Performs the SORT CASES procedures. */
68 int
69 cmd_sort_cases (void)
70 {
71   /* First, just parse the command. */
72   lex_match_id ("SORT");
73   lex_match_id ("CASES");
74   lex_match (T_BY);
75
76   if (!parse_sort_variables ())
77     return CMD_FAILURE;
78       
79   cancel_temporary ();
80
81   /* Then it's time to do the actual work.  There are two cases:
82
83      (internal sort) All the data is in memory.  In this case, we
84      perform an EXECUTE to get the data into the desired form, then
85      sort the cases in place, if it is still all in memory.
86
87      (external sort) The data is not in memory.  It may be coming from
88      a system file or other data file, etc.  In any case, it is now
89      time to perform an external sort.  This is better explained in
90      do_external_sort(). */
91
92   /* Do all this dirty work. */
93   {
94     int success = sort_cases (0);
95     free (v_sort);
96     if (success)
97       return lex_end_of_command ();
98     else
99       return CMD_FAILURE;
100   }
101 }
102
103 /* Parses a list of sort variables into v_sort and nv_sort.  */
104 int
105 parse_sort_variables (void)
106 {
107   v_sort = NULL;
108   nv_sort = 0;
109   do
110     {
111       int prev_nv_sort = nv_sort;
112       int order = SRT_ASCEND;
113
114       if (!parse_variables (default_dict, &v_sort, &nv_sort,
115                             PV_NO_DUPLICATE | PV_APPEND | PV_NO_SCRATCH))
116         return 0;
117       if (lex_match ('('))
118         {
119           if (lex_match_id ("D") || lex_match_id ("DOWN"))
120             order = SRT_DESCEND;
121           else if (!lex_match_id ("A") && !lex_match_id ("UP"))
122             {
123               free (v_sort);
124               msg (SE, _("`A' or `D' expected inside parentheses."));
125               return 0;
126             }
127           if (!lex_match (')'))
128             {
129               free (v_sort);
130               msg (SE, _("`)' expected."));
131               return 0;
132             }
133         }
134       for (; prev_nv_sort < nv_sort; prev_nv_sort++)
135         v_sort[prev_nv_sort]->p.srt.order = order;
136     }
137   while (token != '.' && token != '/');
138   
139   return 1;
140 }
141
142 /* Sorts the active file based on the key variables specified in
143    global variables v_sort and nv_sort.  The output is either to the
144    active file, if SEPARATE is zero, or to a separate file, if
145    SEPARATE is nonzero.  In the latter case the output cases can be
146    read with a call to read_sort_output().  (In the former case the
147    output cases should be dealt with through the usual vfm interface.)
148
149    The caller is responsible for freeing v_sort[]. */
150 int
151 sort_cases (int separate)
152 {
153   assert (separate_case_tab == NULL);
154
155   /* Not sure this is necessary but it's good to be safe. */
156   if (separate && vfm_source == &sort_stream)
157     procedure (NULL, NULL, NULL);
158   
159   /* SORT CASES cancels PROCESS IF. */
160   expr_free (process_if_expr);
161   process_if_expr = NULL;
162
163   if (do_internal_sort (separate))
164     return 1;
165
166   page_to_disk ();
167   return do_external_sort (separate);
168 }
169
170 /* If a reasonable situation is set up, do an internal sort of the
171    data.  Return success. */
172 static int
173 do_internal_sort (int separate)
174 {
175   if (vfm_source != &vfm_disk_stream)
176     {
177       if (vfm_source != &vfm_memory_stream)
178         procedure (NULL, NULL, NULL);
179       if (vfm_source == &vfm_memory_stream)
180         {
181           struct case_list **case_tab = malloc (sizeof *case_tab
182                                          * (vfm_source_info.ncases + 1));
183           if (vfm_source_info.ncases == 0)
184             {
185               free (case_tab);
186               return 1;
187             }
188           if (case_tab != NULL)
189             {
190               struct case_list *clp = memory_source_cases;
191               struct case_list **ctp = case_tab;
192               int i;
193
194               for (; clp; clp = clp->next)
195                 *ctp++ = clp;
196               qsort (case_tab, vfm_source_info.ncases, sizeof *case_tab,
197                      compare_case_lists);
198
199               if (!separate)
200                 {
201                   memory_source_cases = case_tab[0];
202                   for (i = 1; i < vfm_source_info.ncases; i++)
203                     case_tab[i - 1]->next = case_tab[i];
204                   case_tab[vfm_source_info.ncases - 1]->next = NULL;
205                   free (case_tab);
206                 } else {
207                   case_tab[vfm_source_info.ncases] = NULL;
208                   separate_case_tab = case_tab;
209                 }
210               
211               return 1;
212             }
213         }
214     }
215   return 0;
216 }
217
218 /* Compares the NV_SORT variables in V_SORT[] between the
219    `case_list's at A and B, and returns a strcmp()-type
220    result. */
221 static int
222 compare_case_lists (const void *a_, const void *b_)
223 {
224   struct case_list *const *pa = a_;
225   struct case_list *const *pb = b_;
226   struct case_list *a = *pa;
227   struct case_list *b = *pb;
228   struct variable *v;
229   int result = 0;
230   int i;
231
232   for (i = 0; i < nv_sort; i++)
233     {
234       v = v_sort[i];
235       
236       if (v->type == NUMERIC)
237         {
238           double af = a->c.data[v->fv].f;
239           double bf = b->c.data[v->fv].f;
240
241           result = af < bf ? -1 : af > bf;
242         }
243       else
244         result = memcmp (a->c.data[v->fv].s, b->c.data[v->fv].s, v->width);
245
246       if (result != 0)
247         break;
248     }
249
250   if (v->p.srt.order == SRT_DESCEND)
251     result = -result;
252   return result;
253 }
254 \f
255 /* External sort. */
256
257 /* Maximum number of input + output file handles. */
258 #if defined FOPEN_MAX && (FOPEN_MAX - 5 < 18)
259 #define MAX_FILE_HANDLES        (FOPEN_MAX - 5)
260 #else
261 #define MAX_FILE_HANDLES        18
262 #endif
263
264 #if MAX_FILE_HANDLES < 3
265 #error At least 3 file handles must be available for sorting.
266 #endif
267
268 /* Number of input buffers. */
269 #define N_INPUT_BUFFERS         (MAX_FILE_HANDLES - 1)
270
271 /* Maximum order of merge.  This is the value suggested by Knuth;
272    specifically, he said to use tree selection, which we don't
273    implement, for larger orders of merge. */
274 #define MAX_MERGE_ORDER         7
275
276 /* Minimum total number of records for buffers. */
277 #define MIN_BUFFER_TOTAL_SIZE_RECS      64
278
279 /* Minimum single input or output buffer size, in bytes and records. */
280 #define MIN_BUFFER_SIZE_BYTES   4096
281 #define MIN_BUFFER_SIZE_RECS    16
282
283 /* Structure for replacement selection tree. */
284 struct repl_sel_tree
285   {
286     struct repl_sel_tree *loser;/* Loser associated w/this internal node. */
287     int rn;                     /* Run number of `loser'. */
288     struct repl_sel_tree *fe;   /* Internal node above this external node. */
289     struct repl_sel_tree *fi;   /* Internal node above this internal node. */
290     union value record[1];      /* The case proper. */
291   };
292
293 /* Static variables used for sorting. */
294 static struct repl_sel_tree **x; /* Buffers. */
295 static int x_max;               /* Size of buffers, in records. */
296 static int records_per_buffer;  /* Number of records in each buffer. */
297
298 /* In the merge phase, the first N_INPUT_BUFFERS handle[] elements are
299    input files and the last element is the output file.  Before that,
300    they're all used as output files, although the last one is
301    segregated. */
302 static FILE *handle[MAX_FILE_HANDLES];  /* File handles. */
303
304 /* Now, MAX_FILE_HANDLES is the maximum number of files we will *try*
305    to open.  But if we can't open that many, max_handles will be set
306    to the number we apparently can open. */
307 static int max_handles;         /* Maximum number of handles. */
308
309 /* When we create temporary files, they are all put in the same
310    directory and numbered sequentially from zero.  tmp_basename is the
311    drive/directory, etc., and tmp_extname can be sprintf() with "%08x"
312    to the file number, then tmp_basename used to open the file. */
313 static char *tmp_basename;      /* Temporary file basename. */
314 static char *tmp_extname;       /* Temporary file extension name. */
315
316 /* We use Huffman's method to determine the merge pattern.  This means
317    that we need to know which runs are the shortest at any given time.
318    Priority queues as implemented by heap.c are a natural for this
319    task (probably because I wrote the code specifically for it). */
320 static struct heap *huffman_queue;      /* Huffman priority queue. */
321
322 /* Prototypes for helper functions. */
323 static void sort_stream_write (void);
324 static int write_initial_runs (int separate);
325 static int allocate_cases (void);
326 static int allocate_file_handles (void);
327 static int merge (void);
328 static void rmdir_temp_dir (void);
329
330 /* Performs an external sort of the active file.  A description of the
331    procedure follows.  All page references refer to Knuth's _Art of
332    Computer Programming, Vol. 3: Sorting and Searching_, which is the
333    canonical resource for sorting.
334
335    1. The data is read and S initial runs are formed through the
336    action of algorithm 5.4.1R (replacement selection).
337
338    2. Huffman's method (p. 365-366) is used to determine the optimum
339    merge pattern.
340
341    3. If an OS that supports overlapped reading, writing, and
342    computing is being run, we should use 5.4.6F for forecasting.
343    Otherwise, buffers are filled only when they run out of data.
344    FIXME: Since the author of PSPP uses GNU/Linux, which does not
345    yet implement overlapped r/w/c, 5.4.6F is not used.
346
347    4. We perform P-way merges:
348
349    (a) The desired P is the smallest P such that ceil(ln(S)/ln(P))
350    is minimized.  (FIXME: Since I don't have an algorithm for
351    minimizing this, it's just set to MAX_MERGE_ORDER.)
352
353    (b) P is reduced if the selected value would make input buffers
354    less than 4096 bytes each, or 16 records, whichever is larger.
355
356    (c) P is reduced if we run out of available file handles or space
357    for file handles.
358
359    (d) P is reduced if we don't have space for one or two output
360    buffers, which have the same minimum size as input buffers.  (We
361    need two output buffers if 5.4.6F is in use for forecasting.)  */
362 static int
363 do_external_sort (int separate)
364 {
365   int success = 0;
366
367   assert (MAX_FILE_HANDLES >= 3);
368
369   x = NULL;
370   tmp_basename = NULL;
371
372   huffman_queue = heap_create (512);
373   if (huffman_queue == NULL)
374     return 0;
375
376   if (!allocate_cases ())
377     goto lossage;
378
379   if (!allocate_file_handles ())
380     goto lossage;
381
382   if (!write_initial_runs (separate))
383     goto lossage;
384
385   merge ();
386
387   success = 1;
388
389   /* Despite the name, flow of control comes here regardless of
390      whether or not the sort is successful. */
391 lossage:
392   heap_destroy (huffman_queue);
393
394   if (x)
395     {
396       int i;
397
398       for (i = 0; i <= x_max; i++)
399         free (x[i]);
400       free (x);
401     }
402
403   if (!success)
404     rmdir_temp_dir ();
405
406   return success;
407 }
408
409 #if !HAVE_GETPID
410 #define getpid() (0)
411 #endif
412
413 /* Sets up to open temporary files. */
414 /* PORTME: This creates a directory for temporary files.  Some OSes
415    might not have that concept... */
416 static int
417 allocate_file_handles (void)
418 {
419   const char *dir;              /* Directory prefix. */
420   char *buf;                    /* String buffer. */
421   char *cp;                     /* Pointer into buf. */
422
423   dir = getenv ("SPSSTMPDIR");
424   if (dir == NULL)
425     dir = getenv ("SPSSXTMPDIR");
426   if (dir == NULL)
427     dir = getenv ("TMPDIR");
428 #ifdef P_tmpdir
429   if (dir == NULL)
430     dir = P_tmpdir;
431 #endif
432 #ifdef unix
433   if (dir == NULL)
434     dir = "/tmp";
435 #elif defined (__MSDOS__)
436   if (dir == NULL)
437     dir = getenv ("TEMP");
438   if (dir == NULL)
439     dir = getenv ("TMP");
440   if (dir == NULL)
441     dir = "\\";
442 #else
443   dir = "";
444 #endif
445
446   buf = xmalloc (strlen (dir) + 1 + 4 + 8 + 4 + 1 + INT_DIGITS + 1);
447   cp = spprintf (buf, "%s%c%04lX%04lXpspp", dir, DIR_SEPARATOR,
448                  ((long) time (0)) & 0xffff, ((long) getpid ()) & 0xffff);
449 #ifndef __MSDOS__
450   if (-1 == mkdir (buf, S_IRWXU))
451 #else
452   if (-1 == mkdir (buf))
453 #endif
454     {
455       free (buf);
456       msg (SE, _("%s: Cannot create temporary directory: %s."),
457            buf, strerror (errno));
458       return 0;
459     }
460   *cp++ = DIR_SEPARATOR;
461
462   tmp_basename = buf;
463   tmp_extname = cp;
464
465   max_handles = MAX_FILE_HANDLES;
466
467   return 1;
468 }
469
470 /* Removes the directory created for temporary files, if one exists.
471    Also frees tmp_basename. */
472 static void
473 rmdir_temp_dir (void)
474 {
475   if (NULL == tmp_basename)
476     return;
477
478   tmp_extname[-1] = '\0';
479   if (rmdir (tmp_basename) == -1)
480     msg (SE, _("%s: Error removing directory for temporary files: %s."),
481          tmp_basename, strerror (errno));
482
483   free (tmp_basename);
484 }
485
486 /* Allocates room for lots of cases as a buffer. */
487 static int
488 allocate_cases (void)
489 {
490   /* This is the size of one case. */
491   const int case_size = (sizeof (struct repl_sel_tree)
492                          + (sizeof (union value)
493                             * (dict_get_value_cnt (default_dict) - 1))
494                          + sizeof (struct repl_sel_tree *));
495
496   x = NULL;
497
498   /* Allocate as many cases as we can, assuming a space of four
499      void pointers for malloc()'s internal bookkeeping. */
500   x_max = MAX_WORKSPACE / (case_size + 4 * sizeof (void *));
501   x = malloc (sizeof (struct repl_sel_tree *) * x_max);
502   if (x != NULL)
503     {
504       int i;
505
506       for (i = 0; i < x_max; i++)
507         {
508           x[i] = malloc (sizeof (struct repl_sel_tree)
509                          + (sizeof (union value)
510                             * (dict_get_value_cnt (default_dict) - 1)));
511           if (x[i] == NULL)
512             break;
513         }
514       x_max = i;
515     }
516   if (x == NULL || x_max < MIN_BUFFER_TOTAL_SIZE_RECS)
517     {
518       if (x != NULL)
519         {
520           int i;
521           
522           for (i = 0; i < x_max; i++)
523             free (x[i]);
524         }
525       free (x);
526       msg (SE, _("Out of memory.  Could not allocate room for minimum of %d "
527                  "cases of %d bytes each.  (PSPP workspace is currently "
528                  "restricted to a maximum of %d KB.)"),
529            MIN_BUFFER_TOTAL_SIZE_RECS, case_size, MAX_WORKSPACE / 1024);
530       x_max = 0;
531       x = NULL;
532       return 0;
533     }
534
535   /* The last element of the array is used to store lastkey. */
536   x_max--;
537
538   debug_printf ((_("allocated %d cases == %d bytes\n"),
539                  x_max, x_max * case_size));
540   return 1;
541 }
542 \f
543 /* Replacement selection. */
544
545 static int rmax, rc, rq;
546 static struct repl_sel_tree *q;
547 static union value *lastkey;
548 static int run_no, file_index;
549 static int deferred_abort;
550 static int run_length;
551
552 static int compare_record (union value *, union value *);
553
554 static inline void
555 output_record (union value *v)
556 {
557   union value *src_case;
558   
559   if (deferred_abort)
560     return;
561
562   if (compaction_necessary)
563     {
564       compact_case (compaction_case, (struct ccase *) v);
565       src_case = (union value *) compaction_case;
566     }
567   else
568     src_case = (union value *) v;
569
570   if ((int) fwrite (src_case, sizeof *src_case, compaction_nval,
571                     handle[file_index])
572       != compaction_nval)
573     {
574       deferred_abort = 1;
575       sprintf (tmp_extname, "%08x", run_no);
576       msg (SE, _("%s: Error writing temporary file: %s."),
577            tmp_basename, strerror (errno));
578       return;
579     }
580
581   run_length++;
582 }
583
584 static int
585 close_handle (int i)
586 {
587   int result = fclose (handle[i]);
588   msg (VM (2), _("SORT: Closing handle %d."), i);
589   
590   handle[i] = NULL;
591   if (EOF == result)
592     {
593       sprintf (tmp_extname, "%08x", i);
594       msg (SE, _("%s: Error closing temporary file: %s."),
595            tmp_basename, strerror (errno));
596       return 0;
597     }
598   return 1;
599 }
600
601 static int
602 close_handles (int beg, int end)
603 {
604   int success = 1;
605   int i;
606
607   for (i = beg; i < end; i++)
608     success &= close_handle (i);
609   return success;
610 }
611
612 static int
613 open_handle_w (int handle_no, int run_no)
614 {
615   sprintf (tmp_extname, "%08x", run_no);
616   msg (VM (1), _("SORT: %s: Opening for writing as run %d."),
617        tmp_basename, run_no);
618
619   /* The `x' modifier causes the GNU C library to insist on creating a
620      new file--if the file already exists, an error is signaled.  The
621      ANSI C standard says that other libraries should ignore anything
622      after the `w+b', so it shouldn't be a problem. */
623   return NULL != (handle[handle_no] = fopen (tmp_basename, "w+bx"));
624 }
625
626 static int
627 open_handle_r (int handle_no, int run_no)
628 {
629   FILE *f;
630
631   sprintf (tmp_extname, "%08x", run_no);
632   msg (VM (1), _("SORT: %s: Opening for writing as run %d."),
633        tmp_basename, run_no);
634   f = handle[handle_no] = fopen (tmp_basename, "rb");
635
636   if (f == NULL)
637     {
638       msg (SE, _("%s: Error opening temporary file for reading: %s."),
639            tmp_basename, strerror (errno));
640       return 0;
641     }
642   
643   return 1;
644 }
645
646 /* Begins a new initial run, specifically its output file. */
647 static void
648 begin_run (void)
649 {
650   /* Decide which handle[] to use.  If run_no is max_handles or
651      greater, then we've run out of handles so it's time to just do
652      one file at a time, which by default is handle 0. */
653   file_index = (run_no < max_handles ? run_no : 0);
654   run_length = 0;
655
656   /* Alright, now create the temporary file. */
657   if (open_handle_w (file_index, run_no) == 0)
658     {
659       /* Failure to create the temporary file.  Check if there are
660          unacceptably few files already open. */
661       if (file_index < 3)
662         {
663           deferred_abort = 1;
664           msg (SE, _("%s: Error creating temporary file: %s."),
665                tmp_basename, strerror (errno));
666           return;
667         }
668
669       /* Close all the open temporary files. */
670       if (!close_handles (0, file_index))
671         return;
672
673       /* Now try again to create the temporary file. */
674       max_handles = file_index;
675       file_index = 0;
676       if (open_handle_w (0, run_no) == 0)
677         {
678           /* It still failed, report it this time. */
679           deferred_abort = 1;
680           msg (SE, _("%s: Error creating temporary file: %s."),
681                tmp_basename, strerror (errno));
682           return;
683         }
684     }
685 }
686
687 /* Ends the current initial run.  Just increments run_no if no initial
688    run has been started yet. */
689 static void
690 end_run (void)
691 {
692   /* Close file handles if necessary. */
693   {
694     int result;
695
696     if (run_no == max_handles - 1)
697       result = close_handles (0, max_handles);
698     else if (run_no >= max_handles)
699       result = close_handle (0);
700     else
701       result = 1;
702     if (!result)
703       deferred_abort = 1;
704   }
705
706   /* Advance to next run. */
707   run_no++;
708   if (run_no)
709     heap_insert (huffman_queue, run_no - 1, run_length);
710 }
711
712 /* Performs 5.4.1R. */
713 static int
714 write_initial_runs (int separate)
715 {
716   run_no = -1;
717   deferred_abort = 0;
718
719   /* Steps R1, R2, R3. */
720   rmax = 0;
721   rc = 0;
722   lastkey = NULL;
723   q = x[0];
724   rq = 0;
725   {
726     int j;
727
728     for (j = 0; j < x_max; j++)
729       {
730         struct repl_sel_tree *J = x[j];
731
732         J->loser = J;
733         J->rn = 0;
734         J->fe = x[(x_max + j) / 2];
735         J->fi = x[j / 2];
736         memset (J->record, 0,
737                 dict_get_value_cnt (default_dict) * sizeof (union value));
738       }
739   }
740
741   /* Most of the iterations of steps R4, R5, R6, R7, R2, R3, ... */
742   if (!separate)
743     {
744       if (vfm_sink)
745         vfm_sink->destroy_sink ();
746       vfm_sink = &sort_stream;
747     }
748   procedure (NULL, NULL, NULL);
749
750   /* Final iterations of steps R4, R5, R6, R7, R2, R3, ... */
751   for (;;)
752     {
753       struct repl_sel_tree *t;
754
755       /* R4. */
756       rq = rmax + 1;
757
758       /* R5. */
759       t = q->fe;
760
761       /* R6 and R7. */
762       for (;;)
763         {
764           /* R6. */
765           if (t->rn < rq
766               || (t->rn == rq
767                   && compare_record (t->loser->record, q->record) < 0))
768             {
769               struct repl_sel_tree *temp_tree;
770               int temp_int;
771
772               temp_tree = t->loser;
773               t->loser = q;
774               q = temp_tree;
775
776               temp_int = t->rn;
777               t->rn = rq;
778               rq = temp_int;
779             }
780
781           /* R7. */
782           if (t == x[1])
783             break;
784           t = t->fi;
785         }
786
787       /* R2. */
788       if (rq != rc)
789         {
790           end_run ();
791           if (rq > rmax)
792             break;
793           begin_run ();
794           rc = rq;
795         }
796
797       /* R3. */
798       if (rq != 0)
799         {
800           output_record (q->record);
801           lastkey = x[x_max]->record;
802           memcpy (lastkey, q->record, sizeof (union value) * vfm_sink_info.nval);
803         }
804     }
805   assert (run_no == rmax);
806
807   /* If an unrecoverable error occurred somewhere in the above code,
808      then the `deferred_abort' flag would have been set.  */
809   if (deferred_abort)
810     {
811       int i;
812
813       for (i = 0; i < max_handles; i++)
814         if (handle[i] != NULL)
815           {
816             sprintf (tmp_extname, "%08x", i);
817
818             if (fclose (handle[i]) == EOF)
819               msg (SE, _("%s: Error closing temporary file: %s."),
820                    tmp_basename, strerror (errno));
821
822             if (remove (tmp_basename) != 0)
823               msg (SE, _("%s: Error removing temporary file: %s."),
824                    tmp_basename, strerror (errno));
825
826             handle[i] = NULL;
827           }
828       return 0;
829     }
830
831   return 1;
832 }
833
834 /* Compares the NV_SORT variables in V_SORT[] between the `value's at
835    A and B, and returns a strcmp()-type result. */
836 static int
837 compare_record (union value * a, union value * b)
838 {
839   int i;
840   int result = 0;
841   struct variable *v;
842
843   assert (a != NULL);
844   if (b == NULL)                /* Sort NULLs after everything else. */
845     return -1;
846
847   for (i = 0; i < nv_sort; i++)
848     {
849       v = v_sort[i];
850
851       if (v->type == NUMERIC)
852         {
853           if (approx_ne (a[v->fv].f, b[v->fv].f))
854             {
855               result = (a[v->fv].f > b[v->fv].f) ? 1 : -1;
856               break;
857             }
858         }
859       else
860         {
861           result = memcmp (a[v->fv].s, b[v->fv].s, v->width);
862           if (result != 0)
863             break;
864         }
865     }
866
867   if (v->p.srt.order == SRT_ASCEND)
868     return result;
869   else
870     {
871       assert (v->p.srt.order == SRT_DESCEND);
872       return -result;
873     }
874 }
875 \f
876 /* Merging. */
877
878 static int merge_once (int run_index[], int run_length[], int n_runs);
879
880 /* Modula function as defined by Knuth. */
881 static int
882 mod (int x, int y)
883 {
884   int result;
885
886   if (y == 0)
887     return x;
888   result = abs (x) % abs (y);
889   if (y < 0)
890     result = -result;
891   return result;
892 }
893
894 /* Performs a series of P-way merges of initial runs using Huffman's
895    method. */
896 static int
897 merge (void)
898 {
899   /* Order of merge. */
900   int order;
901
902   /* Idiot check. */
903   assert (MIN_BUFFER_SIZE_RECS * 2 <= MIN_BUFFER_TOTAL_SIZE_RECS - 1);
904
905   /* Close all the input files.  I hope that the boundary conditions
906      are correct on this but I'm not sure. */
907   if (run_no < max_handles)
908     {
909       int i;
910
911       for (i = 0; i < run_no; )
912         if (!close_handle (i++))
913           {
914             for (; i < run_no; i++)
915               close_handle (i);
916             return 0;
917           }
918     }
919
920   /* Determine order of merge. */
921   order = MAX_MERGE_ORDER;
922   if (x_max / order < MIN_BUFFER_SIZE_RECS)
923     order = x_max / MIN_BUFFER_SIZE_RECS;
924   else if (x_max / order * sizeof (union value) * dict_get_value_cnt (default_dict)
925            < MIN_BUFFER_SIZE_BYTES)
926     order = x_max / (MIN_BUFFER_SIZE_BYTES
927                      / (sizeof (union value)
928                         * (dict_get_value_cnt (default_dict) - 1)));
929
930   /* Make sure the order of merge is bounded. */
931   if (order < 2)
932     order = 2;
933   if (order > rmax)
934     order = rmax;
935   assert (x_max / order > 0);
936
937   /* Calculate number of records per buffer. */
938   records_per_buffer = x_max / order;
939
940   /* Add (1 - S) mod (P - 1) dummy runs of length 0. */
941   {
942     int n_dummy_runs = mod (1 - rmax, order - 1);
943     debug_printf (("rmax=%d, order=%d, n_dummy_runs=%d\n",
944                    rmax, order, n_dummy_runs));
945     assert (n_dummy_runs >= 0);
946     while (n_dummy_runs--)
947       {
948         heap_insert (huffman_queue, -2, 0);
949         rmax++;
950       }
951   }
952
953   /* Repeatedly merge the P shortest existing runs until only one run
954      is left. */
955   while (rmax > 1)
956     {
957       int run_index[MAX_MERGE_ORDER];
958       int run_length[MAX_MERGE_ORDER];
959       int total_run_length = 0;
960       int i;
961
962       assert (rmax >= order);
963
964       /* Find the shortest runs; put them in runs[] in reverse order
965          of length, to force dummy runs of length 0 to the end of the
966          list. */
967       debug_printf ((_("merging runs")));
968       for (i = order - 1; i >= 0; i--)
969         {
970           run_index[i] = heap_delete (huffman_queue, &run_length[i]);
971           assert (run_index[i] != -1);
972           total_run_length += run_length[i];
973           debug_printf ((" %d(%d)", run_index[i], run_length[i]));
974         }
975       debug_printf ((_(" into run %d(%d)\n"), run_no, total_run_length));
976
977       if (!merge_once (run_index, run_length, order))
978         {
979           int index;
980
981           while (-1 != (index = heap_delete (huffman_queue, NULL)))
982             {
983               sprintf (tmp_extname, "%08x", index);
984               if (remove (tmp_basename) != 0)
985                 msg (SE, _("%s: Error removing temporary file: %s."),
986                      tmp_basename, strerror (errno));
987             }
988
989           return 0;
990         }
991
992       if (!heap_insert (huffman_queue, run_no++, total_run_length))
993         {
994           msg (SE, _("Out of memory expanding Huffman priority queue."));
995           return 0;
996         }
997
998       rmax -= order - 1;
999     }
1000
1001   /* There should be exactly one element in the priority queue after
1002      all that merging.  This represents the entire sorted active file.
1003      So we could find a total case count by deleting this element from
1004      the queue. */
1005   assert (heap_size (huffman_queue) == 1);
1006
1007   return 1;
1008 }
1009
1010 /* Merges N_RUNS initial runs into a new run.  The jth run for 0 <= j
1011    < N_RUNS is taken from temporary file RUN_INDEX[j]; it is composed
1012    of RUN_LENGTH[j] cases. */
1013 static int
1014 merge_once (int run_index[], int run_length[], int n_runs)
1015 {
1016   /* For each run, the number of records remaining in its buffer. */
1017   int buffered[MAX_MERGE_ORDER];
1018
1019   /* For each run, the index of the next record in the buffer. */
1020   int buffer_ptr[MAX_MERGE_ORDER];
1021
1022   /* Open input files. */
1023   {
1024     int i;
1025
1026     for (i = 0; i < n_runs; i++)
1027       if (run_index[i] != -2 && !open_handle_r (i, run_index[i]))
1028         {
1029           /* Close and remove temporary files. */
1030           while (i--)
1031             {
1032               close_handle (i);
1033               sprintf (tmp_extname, "%08x", i);
1034               if (remove (tmp_basename) != 0)
1035                 msg (SE, _("%s: Error removing temporary file: %s."),
1036                      tmp_basename, strerror (errno));
1037             }
1038
1039           return 0;
1040         }
1041   }
1042
1043   /* Create output file. */
1044   if (!open_handle_w (N_INPUT_BUFFERS, run_no))
1045     {
1046       msg (SE, _("%s: Error creating temporary file for merge: %s."),
1047            tmp_basename, strerror (errno));
1048       goto lossage;
1049     }
1050
1051   /* Prime each buffer. */
1052   {
1053     int i;
1054
1055     for (i = 0; i < n_runs; i++)
1056       if (run_index[i] == -2)
1057         {
1058           n_runs = i;
1059           break;
1060         }
1061       else
1062         {
1063           int j;
1064           int ofs = records_per_buffer * i;
1065
1066           buffered[i] = min (records_per_buffer, run_length[i]);
1067           for (j = 0; j < buffered[i]; j++)
1068             if ((int) fread (x[j + ofs]->record, sizeof (union value),
1069                              dict_get_value_cnt (default_dict), handle[i])
1070                 != dict_get_value_cnt (default_dict))
1071               {
1072                 sprintf (tmp_extname, "%08x", run_index[i]);
1073                 if (ferror (handle[i]))
1074                   msg (SE, _("%s: Error reading temporary file in merge: %s."),
1075                        tmp_basename, strerror (errno));
1076                 else
1077                   msg (SE, _("%s: Unexpected end of temporary file in merge."),
1078                        tmp_basename);
1079                 goto lossage;
1080               }
1081           buffer_ptr[i] = ofs;
1082           run_length[i] -= buffered[i];
1083         }
1084   }
1085
1086   /* Perform the merge proper. */
1087   while (n_runs)                /* Loop while some data is left. */
1088     {
1089       int i;
1090       int min = 0;
1091
1092       for (i = 1; i < n_runs; i++)
1093         if (compare_record (x[buffer_ptr[min]]->record,
1094                             x[buffer_ptr[i]]->record) > 0)
1095           min = i;
1096
1097       if ((int) fwrite (x[buffer_ptr[min]]->record, sizeof (union value),
1098                         dict_get_value_cnt (default_dict),
1099                         handle[N_INPUT_BUFFERS])
1100           != dict_get_value_cnt (default_dict))
1101         {
1102           sprintf (tmp_extname, "%08x", run_index[i]);
1103           msg (SE, _("%s: Error writing temporary file in "
1104                "merge: %s."), tmp_basename, strerror (errno));
1105           goto lossage;
1106         }
1107
1108       /* Remove one case from the buffer for this input file. */
1109       if (--buffered[min] == 0)
1110         {
1111           /* The input buffer is empty.  Do any cases remain in the
1112              initial run on disk? */
1113           if (run_length[min])
1114             {
1115               /* Yes.  Read them in. */
1116
1117               int j;
1118               int ofs;
1119
1120               /* Reset the buffer pointer.  Note that we can't simply
1121                  set it to (i * records_per_buffer) since the run
1122                  order might have changed. */
1123               ofs = buffer_ptr[min] -= buffer_ptr[min] % records_per_buffer;
1124
1125               buffered[min] = min (records_per_buffer, run_length[min]);
1126               for (j = 0; j < buffered[min]; j++)
1127                 if ((int) fread (x[j + ofs]->record, sizeof (union value),
1128                                  dict_get_value_cnt (default_dict),
1129                                  handle[min])
1130                     != dict_get_value_cnt (default_dict))
1131                   {
1132                     sprintf (tmp_extname, "%08x", run_index[min]);
1133                     if (ferror (handle[min]))
1134                       msg (SE, _("%s: Error reading temporary file in "
1135                                  "merge: %s."),
1136                            tmp_basename, strerror (errno));
1137                     else
1138                       msg (SE, _("%s: Unexpected end of temporary file "
1139                                  "in merge."),
1140                            tmp_basename);
1141                     goto lossage;
1142                   }
1143               run_length[min] -= buffered[min];
1144             }
1145           else
1146             {
1147               /* No.  Delete this run. */
1148
1149               /* Close the file. */
1150               FILE *f = handle[min];
1151               handle[min] = NULL;
1152               sprintf (tmp_extname, "%08x", run_index[min]);
1153               if (fclose (f) == EOF)
1154                 msg (SE, _("%s: Error closing temporary file in merge: "
1155                      "%s."), tmp_basename, strerror (errno));
1156
1157               /* Delete the file. */
1158               if (remove (tmp_basename) != 0)
1159                 msg (SE, _("%s: Error removing temporary file in merge: "
1160                      "%s."), tmp_basename, strerror (errno));
1161
1162               n_runs--;
1163               if (min != n_runs)
1164                 {
1165                   /* Since this isn't the last run, we move the last
1166                      run into its spot to force all the runs to be
1167                      contiguous. */
1168                   run_index[min] = run_index[n_runs];
1169                   run_length[min] = run_length[n_runs];
1170                   buffer_ptr[min] = buffer_ptr[n_runs];
1171                   buffered[min] = buffered[n_runs];
1172                   handle[min] = handle[n_runs];
1173                 }
1174             }
1175         }
1176       else
1177         buffer_ptr[min]++;
1178     }
1179
1180   /* Close output file. */
1181   {
1182     FILE *f = handle[N_INPUT_BUFFERS];
1183     handle[N_INPUT_BUFFERS] = NULL;
1184     if (fclose (f) == EOF)
1185       {
1186         sprintf (tmp_extname, "%08x", run_no);
1187         msg (SE, _("%s: Error closing temporary file in merge: "
1188                    "%s."),
1189              tmp_basename, strerror (errno));
1190         return 0;
1191       }
1192   }
1193
1194   return 1;
1195
1196 lossage:
1197   /* Close all the input and output files. */
1198   {
1199     int i;
1200
1201     for (i = 0; i < n_runs; i++)
1202       if (run_length[i] != 0)
1203         {
1204           close_handle (i);
1205           sprintf (tmp_basename, "%08x", run_index[i]);
1206           if (remove (tmp_basename) != 0)
1207             msg (SE, _("%s: Error removing temporary file: %s."),
1208                  tmp_basename, strerror (errno));
1209         }
1210   }
1211   close_handle (N_INPUT_BUFFERS);
1212   sprintf (tmp_basename, "%08x", run_no);
1213   if (remove (tmp_basename) != 0)
1214     msg (SE, _("%s: Error removing temporary file: %s."),
1215          tmp_basename, strerror (errno));
1216   return 0;
1217 }
1218 \f
1219 /* External sort input program. */
1220
1221 /* Reads all the records from the source stream and passes them
1222    to write_case(). */
1223 static void
1224 sort_stream_read (void)
1225 {
1226   read_sort_output (write_case);
1227 }
1228
1229 /* Reads all the records from the output stream and passes them to the
1230    function provided, which must have an interface identical to
1231    write_case(). */
1232 void
1233 read_sort_output (int (*write_case) (void))
1234 {
1235   int i;
1236   FILE *f;
1237
1238   if (separate_case_tab)
1239     {
1240       struct ccase *save_temp_case = temp_case;
1241       struct case_list **p;
1242
1243       for (p = separate_case_tab; *p; p++)
1244         {
1245           temp_case = &(*p)->c;
1246           write_case ();
1247         }
1248       
1249       free (separate_case_tab);
1250       separate_case_tab = NULL;
1251             
1252       temp_case = save_temp_case;
1253     } else {
1254       sprintf (tmp_extname, "%08x", run_no - 1);
1255       f = fopen (tmp_basename, "rb");
1256       if (!f)
1257         {
1258           msg (ME, _("%s: Cannot open sort result file: %s."), tmp_basename,
1259                strerror (errno));
1260           err_failure ();
1261           return;
1262         }
1263
1264       for (i = 0; i < vfm_source_info.ncases; i++)
1265         {
1266           if (!fread (temp_case, vfm_source_info.case_size, 1, f))
1267             {
1268               if (ferror (f))
1269                 msg (ME, _("%s: Error reading sort result file: %s."),
1270                      tmp_basename, strerror (errno));
1271               else
1272                 msg (ME, _("%s: Unexpected end of sort result file: %s."),
1273                      tmp_basename, strerror (errno));
1274               err_failure ();
1275               break;
1276             }
1277
1278           if (!write_case ())
1279             break;
1280         }
1281
1282       if (fclose (f) == EOF)
1283         msg (ME, _("%s: Error closing sort result file: %s."), tmp_basename,
1284              strerror (errno));
1285
1286       if (remove (tmp_basename) != 0)
1287         msg (ME, _("%s: Error removing sort result file: %s."), tmp_basename,
1288              strerror (errno));
1289       else
1290         rmdir_temp_dir ();
1291     }
1292 }
1293
1294 #if 0 /* dead code */
1295 /* Alternate interface to sort_stream_write used for external sorting
1296    when SEPARATE is true. */
1297 static int
1298 write_separate (struct ccase *c)
1299 {
1300   assert (c == temp_case);
1301
1302   sort_stream_write ();
1303   return 1;
1304 }
1305 #endif
1306
1307 /* Performs one iteration of 5.4.1R steps R4, R5, R6, R7, R2, and
1308    R3. */
1309 static void
1310 sort_stream_write (void)
1311 {
1312   struct repl_sel_tree *t;
1313
1314   /* R4. */
1315   memcpy (q->record, temp_case->data, vfm_sink_info.case_size);
1316   if (compare_record (q->record, lastkey) < 0)
1317     if (++rq > rmax)
1318       rmax = rq;
1319
1320   /* R5. */
1321   t = q->fe;
1322
1323   /* R6 and R7. */
1324   for (;;)
1325     {
1326       /* R6. */
1327       if (t->rn < rq
1328           || (t->rn == rq && compare_record (t->loser->record, q->record) < 0))
1329         {
1330           struct repl_sel_tree *temp_tree;
1331           int temp_int;
1332
1333           temp_tree = t->loser;
1334           t->loser = q;
1335           q = temp_tree;
1336
1337           temp_int = t->rn;
1338           t->rn = rq;
1339           rq = temp_int;
1340         }
1341
1342       /* R7. */
1343       if (t == x[1])
1344         break;
1345       t = t->fi;
1346     }
1347
1348   /* R2. */
1349   if (rq != rc)
1350     {
1351       end_run ();
1352       begin_run ();
1353       assert (rq <= rmax);
1354       rc = rq;
1355     }
1356
1357   /* R3. */
1358   if (rq != 0)
1359     {
1360       output_record (q->record);
1361       lastkey = x[x_max]->record;
1362       memcpy (lastkey, q->record, vfm_sink_info.case_size);
1363     }
1364 }
1365
1366 /* Switches mode from sink to source. */
1367 static void
1368 sort_stream_mode (void)
1369 {
1370   /* If this is not done, then we get the following source/sink pairs:
1371      source=memory/disk/DATA LIST/etc., sink=SORT; source=SORT,
1372      sink=SORT; which is not good. */
1373   vfm_sink = NULL;
1374 }
1375
1376 struct case_stream sort_stream =
1377   {
1378     NULL,
1379     sort_stream_read,
1380     sort_stream_write,
1381     sort_stream_mode,
1382     NULL,
1383     NULL,
1384     "SORT",
1385   };