Added a --enable-debug option to configure and
[pspp-builds.git] / src / sort.c
1 /* PSPP - computes sample statistics.
2    Copyright (C) 1997-9, 2000 Free Software Foundation, Inc.
3    Written by Ben Pfaff <blp@gnu.org>.
4
5    This program is free software; you can redistribute it and/or
6    modify it under the terms of the GNU General Public License as
7    published by the Free Software Foundation; either version 2 of the
8    License, or (at your option) any later version.
9
10    This program is distributed in the hope that it will be useful, but
11    WITHOUT ANY WARRANTY; without even the implied warranty of
12    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13    General Public License for more details.
14
15    You should have received a copy of the GNU General Public License
16    along with this program; if not, write to the Free Software
17    Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA
18    02111-1307, USA. */
19
20 #include <config.h>
21 #include <assert.h>
22 #include <stdio.h>
23 #include <stdlib.h>
24 #include <errno.h>
25 #include "alloc.h"
26 #include "approx.h"
27 #include "command.h"
28 #include "error.h"
29 #include "expr.h"
30 #include "heap.h"
31 #include "lexer.h"
32 #include "misc.h"
33 #include "sort.h"
34 #include "str.h"
35 #include "var.h"
36 #include "vfm.h"
37 #include "vfmP.h"
38
39 #if HAVE_UNISTD_H
40 #include <unistd.h>
41 #endif
42
43 #if HAVE_SYS_TYPES_H
44 #include <sys/types.h>
45 #endif
46
47 #if HAVE_SYS_STAT_H
48 #include <sys/stat.h>
49 #endif
50
51 #include "debug-print.h"
52
53 /* Variables to sort. */
54 struct variable **v_sort;
55 int nv_sort;
56
57 /* Used when internal-sorting to a separate file. */
58 static struct case_list **separate_case_tab;
59
60 /* Exported by qsort.c. */
61 void blp_quicksort (void *pbase, size_t total_elems, size_t size,
62                     int (*cmp) (const void *, const void *),
63                     void *temp_buf);
64
65 /* Other prototypes. */
66 static int compare_case_lists (const void *, const void *);
67 static int do_internal_sort (int separate);
68 static int do_external_sort (int separate);
69 int parse_sort_variables (void);
70 void read_sort_output (int (*write_case) (void));
71
72 /* Performs the SORT CASES procedures. */
73 int
74 cmd_sort_cases (void)
75 {
76   /* First, just parse the command. */
77   lex_match_id ("SORT");
78   lex_match_id ("CASES");
79   lex_match (T_BY);
80
81   if (!parse_sort_variables ())
82     return CMD_FAILURE;
83       
84   cancel_temporary ();
85
86   /* Then it's time to do the actual work.  There are two cases:
87
88      (internal sort) All the data is in memory.  In this case, we
89      perform an EXECUTE to get the data into the desired form, then
90      sort the cases in place, if it is still all in memory.
91
92      (external sort) The data is not in memory.  It may be coming from
93      a system file or other data file, etc.  In any case, it is now
94      time to perform an external sort.  This is better explained in
95      do_external_sort(). */
96
97   /* Do all this dirty work. */
98   {
99     int success = sort_cases (0);
100     free (v_sort);
101     if (success)
102       return lex_end_of_command ();
103     else
104       return CMD_FAILURE;
105   }
106 }
107
108 /* Parses a list of sort variables into v_sort and nv_sort.  */
109 int
110 parse_sort_variables (void)
111 {
112   v_sort = NULL;
113   nv_sort = 0;
114   do
115     {
116       int prev_nv_sort = nv_sort;
117       int order = SRT_ASCEND;
118
119       if (!parse_variables (&default_dict, &v_sort, &nv_sort,
120                             PV_NO_DUPLICATE | PV_APPEND | PV_NO_SCRATCH))
121         return 0;
122       if (lex_match ('('))
123         {
124           if (lex_match_id ("D") || lex_match_id ("DOWN"))
125             order = SRT_DESCEND;
126           else if (!lex_match_id ("A") && !lex_match_id ("UP"))
127             {
128               free (v_sort);
129               msg (SE, _("`A' or `D' expected inside parentheses."));
130               return 0;
131             }
132           if (!lex_match (')'))
133             {
134               free (v_sort);
135               msg (SE, _("`)' expected."));
136               return 0;
137             }
138         }
139       for (; prev_nv_sort < nv_sort; prev_nv_sort++)
140         v_sort[prev_nv_sort]->p.srt.order = order;
141     }
142   while (token != '.' && token != '/');
143   
144   return 1;
145 }
146
147 /* Sorts the active file based on the key variables specified in
148    global variables v_sort and nv_sort.  The output is either to the
149    active file, if SEPARATE is zero, or to a separate file, if
150    SEPARATE is nonzero.  In the latter case the output cases can be
151    read with a call to read_sort_output().  (In the former case the
152    output cases should be dealt with through the usual vfm interface.)
153
154    The caller is responsible for freeing v_sort[]. */
155 int
156 sort_cases (int separate)
157 {
158   assert (separate_case_tab == NULL);
159
160   /* Not sure this is necessary but it's good to be safe. */
161   if (separate && vfm_source == &sort_stream)
162     procedure (NULL, NULL, NULL);
163   
164   /* SORT CASES cancels PROCESS IF. */
165   expr_free (process_if_expr);
166   process_if_expr = NULL;
167
168   if (do_internal_sort (separate))
169     return 1;
170
171   page_to_disk ();
172   return do_external_sort (separate);
173 }
174
175 /* If a reasonable situation is set up, do an internal sort of the
176    data.  Return success. */
177 static int
178 do_internal_sort (int separate)
179 {
180   if (vfm_source != &vfm_disk_stream)
181     {
182       if (vfm_source != &vfm_memory_stream)
183         procedure (NULL, NULL, NULL);
184       if (vfm_source == &vfm_memory_stream)
185         {
186           struct case_list **case_tab = malloc (sizeof *case_tab
187                                          * (vfm_source_info.ncases + 1));
188           if (vfm_source_info.ncases == 0)
189             {
190               free (case_tab);
191               return 1;
192             }
193           if (case_tab != NULL)
194             {
195               struct case_list *clp = memory_source_cases;
196               struct case_list **ctp = case_tab;
197               int i;
198
199               for (; clp; clp = clp->next)
200                 *ctp++ = clp;
201               qsort (case_tab, vfm_source_info.ncases, sizeof *case_tab,
202                      compare_case_lists);
203
204               if (!separate)
205                 {
206                   memory_source_cases = case_tab[0];
207                   for (i = 1; i < vfm_source_info.ncases; i++)
208                     case_tab[i - 1]->next = case_tab[i];
209                   case_tab[vfm_source_info.ncases - 1]->next = NULL;
210                   free (case_tab);
211                 } else {
212                   case_tab[vfm_source_info.ncases] = NULL;
213                   separate_case_tab = case_tab;
214                 }
215               
216               return 1;
217             }
218         }
219     }
220   return 0;
221 }
222
223 /* Compares the NV_SORT variables in V_SORT[] between the `case_list's
224    at _A and _B, and returns a strcmp()-type result. */
225 static int
226 compare_case_lists (const void *pa, const void *pb)
227 {
228   struct case_list *a = *(struct case_list **) pa;
229   struct case_list *b = *(struct case_list **) pb;
230   struct variable *v;
231   int result = 0;
232   int i;
233
234   for (i = 0; i < nv_sort; i++)
235     {
236       v = v_sort[i];
237       
238       if (v->type == NUMERIC)
239         {
240           if (approx_ne (a->c.data[v->fv].f, b->c.data[v->fv].f))
241             {
242               result = (a->c.data[v->fv].f > b->c.data[v->fv].f) ? 1 : -1;
243               break;
244             }
245         }
246       else
247         {
248           result = memcmp (a->c.data[v->fv].s, b->c.data[v->fv].s, v->width);
249           if (result != 0)
250             break;
251         }
252     }
253
254   if (v->p.srt.order == SRT_ASCEND)
255     return result;
256   else
257     {
258       assert (v->p.srt.order == SRT_DESCEND);
259       return -result;
260     }
261 }
262 \f
263 /* External sort. */
264
265 /* Maximum number of input + output file handles. */
266 #if defined FOPEN_MAX && (FOPEN_MAX - 5 < 18)
267 #define MAX_FILE_HANDLES        (FOPEN_MAX - 5)
268 #else
269 #define MAX_FILE_HANDLES        18
270 #endif
271
272 #if MAX_FILE_HANDLES < 3
273 #error At least 3 file handles must be available for sorting.
274 #endif
275
276 /* Number of input buffers. */
277 #define N_INPUT_BUFFERS         (MAX_FILE_HANDLES - 1)
278
279 /* Maximum order of merge.  This is the value suggested by Knuth;
280    specifically, he said to use tree selection, which we don't
281    implement, for larger orders of merge. */
282 #define MAX_MERGE_ORDER         7
283
284 /* Minimum total number of records for buffers. */
285 #define MIN_BUFFER_TOTAL_SIZE_RECS      64
286
287 /* Minimum single input or output buffer size, in bytes and records. */
288 #define MIN_BUFFER_SIZE_BYTES   4096
289 #define MIN_BUFFER_SIZE_RECS    16
290
291 /* Structure for replacement selection tree. */
292 struct repl_sel_tree
293   {
294     struct repl_sel_tree *loser;/* Loser associated w/this internal node. */
295     int rn;                     /* Run number of `loser'. */
296     struct repl_sel_tree *fe;   /* Internal node above this external node. */
297     struct repl_sel_tree *fi;   /* Internal node above this internal node. */
298     union value record[1];      /* The case proper. */
299   };
300
301 /* Static variables used for sorting. */
302 static struct repl_sel_tree **x; /* Buffers. */
303 static int x_max;               /* Size of buffers, in records. */
304 static int records_per_buffer;  /* Number of records in each buffer. */
305
306 /* In the merge phase, the first N_INPUT_BUFFERS handle[] elements are
307    input files and the last element is the output file.  Before that,
308    they're all used as output files, although the last one is
309    segregated. */
310 static FILE *handle[MAX_FILE_HANDLES];  /* File handles. */
311
312 /* Now, MAX_FILE_HANDLES is the maximum number of files we will *try*
313    to open.  But if we can't open that many, max_handles will be set
314    to the number we apparently can open. */
315 static int max_handles;         /* Maximum number of handles. */
316
317 /* When we create temporary files, they are all put in the same
318    directory and numbered sequentially from zero.  tmp_basename is the
319    drive/directory, etc., and tmp_extname can be sprintf() with "%08x"
320    to the file number, then tmp_basename used to open the file. */
321 static char *tmp_basename;      /* Temporary file basename. */
322 static char *tmp_extname;       /* Temporary file extension name. */
323
324 /* We use Huffman's method to determine the merge pattern.  This means
325    that we need to know which runs are the shortest at any given time.
326    Priority queues as implemented by heap.c are a natural for this
327    task (probably because I wrote the code specifically for it). */
328 static struct heap *huffman_queue;      /* Huffman priority queue. */
329
330 /* Prototypes for helper functions. */
331 static void sort_stream_write (void);
332 static int write_initial_runs (int separate);
333 static int allocate_cases (void);
334 static int allocate_file_handles (void);
335 static int merge (void);
336 static void rmdir_temp_dir (void);
337
338 /* Performs an external sort of the active file.  A description of the
339    procedure follows.  All page references refer to Knuth's _Art of
340    Computer Programming, Vol. 3: Sorting and Searching_, which is the
341    canonical resource for sorting.
342
343    1. The data is read and S initial runs are formed through the
344    action of algorithm 5.4.1R (replacement selection).
345
346    2. Huffman's method (p. 365-366) is used to determine the optimum
347    merge pattern.
348
349    3. If an OS that supports overlapped reading, writing, and
350    computing is being run, we should use 5.4.6F for forecasting.
351    Otherwise, buffers are filled only when they run out of data.
352    FIXME: Since the author of PSPP uses GNU/Linux, which does not
353    yet implement overlapped r/w/c, 5.4.6F is not used.
354
355    4. We perform P-way merges:
356
357    (a) The desired P is the smallest P such that ceil(ln(S)/ln(P))
358    is minimized.  (FIXME: Since I don't have an algorithm for
359    minimizing this, it's just set to MAX_MERGE_ORDER.)
360
361    (b) P is reduced if the selected value would make input buffers
362    less than 4096 bytes each, or 16 records, whichever is larger.
363
364    (c) P is reduced if we run out of available file handles or space
365    for file handles.
366
367    (d) P is reduced if we don't have space for one or two output
368    buffers, which have the same minimum size as input buffers.  (We
369    need two output buffers if 5.4.6F is in use for forecasting.)  */
370 static int
371 do_external_sort (int separate)
372 {
373   int success = 0;
374
375   assert (MAX_FILE_HANDLES >= 3);
376
377   x = NULL;
378   tmp_basename = NULL;
379
380   huffman_queue = heap_create (512);
381   if (huffman_queue == NULL)
382     return 0;
383
384   if (!allocate_cases ())
385     goto lossage;
386
387   if (!allocate_file_handles ())
388     goto lossage;
389
390   if (!write_initial_runs (separate))
391     goto lossage;
392
393   merge ();
394
395   success = 1;
396
397   /* Despite the name, flow of control comes here regardless of
398      whether or not the sort is successful. */
399 lossage:
400   heap_destroy (huffman_queue);
401
402   if (x)
403     {
404       int i;
405
406       for (i = 0; i <= x_max; i++)
407         free (x[i]);
408       free (x);
409     }
410
411   if (!success)
412     rmdir_temp_dir ();
413
414   return success;
415 }
416
417 #if !HAVE_GETPID
418 #define getpid() (0)
419 #endif
420
421 /* Sets up to open temporary files. */
422 /* PORTME: This creates a directory for temporary files.  Some OSes
423    might not have that concept... */
424 static int
425 allocate_file_handles (void)
426 {
427   const char *dir;              /* Directory prefix. */
428   char *buf;                    /* String buffer. */
429   char *cp;                     /* Pointer into buf. */
430
431   dir = getenv ("SPSSTMPDIR");
432   if (dir == NULL)
433     dir = getenv ("SPSSXTMPDIR");
434   if (dir == NULL)
435     dir = getenv ("TMPDIR");
436 #ifdef P_tmpdir
437   if (dir == NULL)
438     dir = P_tmpdir;
439 #endif
440 #if __unix__
441   if (dir == NULL)
442     dir = "/tmp";
443 #elif __MSDOS__
444   if (dir == NULL)
445     dir = getenv ("TEMP");
446   if (dir == NULL)
447     dir = getenv ("TMP");
448   if (dir == NULL)
449     dir = "\\";
450 #else
451   dir = "";
452 #endif
453
454   buf = xmalloc (strlen (dir) + 1 + 4 + 8 + 4 + 1 + INT_DIGITS + 1);
455   cp = spprintf (buf, "%s%c%04lX%04lXpspp", dir, DIR_SEPARATOR,
456                  ((long) time (0)) & 0xffff, ((long) getpid ()) & 0xffff);
457   if (-1 == mkdir (buf, S_IRWXU))
458     {
459       free (buf);
460       msg (SE, _("%s: Cannot create temporary directory: %s."),
461            buf, strerror (errno));
462       return 0;
463     }
464   *cp++ = DIR_SEPARATOR;
465
466   tmp_basename = buf;
467   tmp_extname = cp;
468
469   max_handles = MAX_FILE_HANDLES;
470
471   return 1;
472 }
473
474 /* Removes the directory created for temporary files, if one exists.
475    Also frees tmp_basename. */
476 static void
477 rmdir_temp_dir (void)
478 {
479   if (NULL == tmp_basename)
480     return;
481
482   tmp_extname[-1] = '\0';
483   if (rmdir (tmp_basename) == -1)
484     msg (SE, _("%s: Error removing directory for temporary files: %s."),
485          tmp_basename, strerror (errno));
486
487   free (tmp_basename);
488 }
489
490 /* Allocates room for lots of cases as a buffer. */
491 static int
492 allocate_cases (void)
493 {
494   /* This is the size of one case. */
495   const int case_size = (sizeof (struct repl_sel_tree)
496                          + sizeof (union value) * (default_dict.nval - 1)
497                          + sizeof (struct repl_sel_tree *));
498
499   x = NULL;
500
501   /* Allocate as many cases as we can, assuming a space of four
502      void pointers for malloc()'s internal bookkeeping. */
503   x_max = MAX_WORKSPACE / (case_size + 4 * sizeof (void *));
504   x = malloc (sizeof (struct repl_sel_tree *) * x_max);
505   if (x != NULL)
506     {
507       int i;
508
509       for (i = 0; i < x_max; i++)
510         {
511           x[i] = malloc (sizeof (struct repl_sel_tree)
512                          + sizeof (union value) * (default_dict.nval - 1));
513           if (x[i] == NULL)
514             break;
515         }
516       x_max = i;
517     }
518   if (x == NULL || x_max < MIN_BUFFER_TOTAL_SIZE_RECS)
519     {
520       if (x != NULL)
521         {
522           int i;
523           
524           for (i = 0; i < x_max; i++)
525             free (x[i]);
526         }
527       free (x);
528       msg (SE, _("Out of memory.  Could not allocate room for minimum of %d "
529                  "cases of %d bytes each.  (PSPP workspace is currently "
530                  "restricted to a maximum of %d KB.)"),
531            MIN_BUFFER_TOTAL_SIZE_RECS, case_size, MAX_WORKSPACE / 1024);
532       x_max = 0;
533       x = NULL;
534       return 0;
535     }
536
537   /* The last element of the array is used to store lastkey. */
538   x_max--;
539
540   debug_printf ((_("allocated %d cases == %d bytes\n"),
541                  x_max, x_max * case_size));
542   return 1;
543 }
544 \f
545 /* Replacement selection. */
546
547 static int rmax, rc, rq;
548 static struct repl_sel_tree *q;
549 static union value *lastkey;
550 static int run_no, file_index;
551 static int deferred_abort;
552 static int run_length;
553
554 static int compare_record (union value *, union value *);
555
556 static inline void
557 output_record (union value *v)
558 {
559   union value *src_case;
560   
561   if (deferred_abort)
562     return;
563
564   if (compaction_necessary)
565     {
566       compact_case (compaction_case, (struct ccase *) v);
567       src_case = (union value *) compaction_case;
568     }
569   else
570     src_case = (union value *) v;
571
572   if ((int) fwrite (src_case, sizeof *src_case, compaction_nval,
573                     handle[file_index])
574       != compaction_nval)
575     {
576       deferred_abort = 1;
577       sprintf (tmp_extname, "%08x", run_no);
578       msg (SE, _("%s: Error writing temporary file: %s."),
579            tmp_basename, strerror (errno));
580       return;
581     }
582
583   run_length++;
584 }
585
586 static int
587 close_handle (int i)
588 {
589   int result = fclose (handle[i]);
590   msg (VM (2), _("SORT: Closing handle %d."), i);
591   
592   handle[i] = NULL;
593   if (EOF == result)
594     {
595       sprintf (tmp_extname, "%08x", i);
596       msg (SE, _("%s: Error closing temporary file: %s."),
597            tmp_basename, strerror (errno));
598       return 0;
599     }
600   return 1;
601 }
602
603 static int
604 close_handles (int beg, int end)
605 {
606   int success = 1;
607   int i;
608
609   for (i = beg; i < end; i++)
610     success &= close_handle (i);
611   return success;
612 }
613
614 static int
615 open_handle_w (int handle_no, int run_no)
616 {
617   sprintf (tmp_extname, "%08x", run_no);
618   msg (VM (1), _("SORT: %s: Opening for writing as run %d."),
619        tmp_basename, run_no);
620
621   /* The `x' modifier causes the GNU C library to insist on creating a
622      new file--if the file already exists, an error is signaled.  The
623      ANSI C standard says that other libraries should ignore anything
624      after the `w+b', so it shouldn't be a problem. */
625   return NULL != (handle[handle_no] = fopen (tmp_basename, "w+bx"));
626 }
627
628 static int
629 open_handle_r (int handle_no, int run_no)
630 {
631   FILE *f;
632
633   sprintf (tmp_extname, "%08x", run_no);
634   msg (VM (1), _("SORT: %s: Opening for writing as run %d."),
635        tmp_basename, run_no);
636   f = handle[handle_no] = fopen (tmp_basename, "rb");
637
638   if (f == NULL)
639     {
640       msg (SE, _("%s: Error opening temporary file for reading: %s."),
641            tmp_basename, strerror (errno));
642       return 0;
643     }
644   
645   return 1;
646 }
647
648 /* Begins a new initial run, specifically its output file. */
649 static void
650 begin_run (void)
651 {
652   /* Decide which handle[] to use.  If run_no is max_handles or
653      greater, then we've run out of handles so it's time to just do
654      one file at a time, which by default is handle 0. */
655   file_index = (run_no < max_handles ? run_no : 0);
656   run_length = 0;
657
658   /* Alright, now create the temporary file. */
659   if (open_handle_w (file_index, run_no) == 0)
660     {
661       /* Failure to create the temporary file.  Check if there are
662          unacceptably few files already open. */
663       if (file_index < 3)
664         {
665           deferred_abort = 1;
666           msg (SE, _("%s: Error creating temporary file: %s."),
667                tmp_basename, strerror (errno));
668           return;
669         }
670
671       /* Close all the open temporary files. */
672       if (!close_handles (0, file_index))
673         return;
674
675       /* Now try again to create the temporary file. */
676       max_handles = file_index;
677       file_index = 0;
678       if (open_handle_w (0, run_no) == 0)
679         {
680           /* It still failed, report it this time. */
681           deferred_abort = 1;
682           msg (SE, _("%s: Error creating temporary file: %s."),
683                tmp_basename, strerror (errno));
684           return;
685         }
686     }
687 }
688
689 /* Ends the current initial run.  Just increments run_no if no initial
690    run has been started yet. */
691 static void
692 end_run (void)
693 {
694   /* Close file handles if necessary. */
695   {
696     int result;
697
698     if (run_no == max_handles - 1)
699       result = close_handles (0, max_handles);
700     else if (run_no >= max_handles)
701       result = close_handle (0);
702     else
703       result = 1;
704     if (!result)
705       deferred_abort = 1;
706   }
707
708   /* Advance to next run. */
709   run_no++;
710   if (run_no)
711     heap_insert (huffman_queue, run_no - 1, run_length);
712 }
713
714 /* Performs 5.4.1R. */
715 static int
716 write_initial_runs (int separate)
717 {
718   run_no = -1;
719   deferred_abort = 0;
720
721   /* Steps R1, R2, R3. */
722   rmax = 0;
723   rc = 0;
724   lastkey = NULL;
725   q = x[0];
726   rq = 0;
727   {
728     int j;
729
730     for (j = 0; j < x_max; j++)
731       {
732         struct repl_sel_tree *J = x[j];
733
734         J->loser = J;
735         J->rn = 0;
736         J->fe = x[(x_max + j) / 2];
737         J->fi = x[j / 2];
738         memset (J->record, 0, default_dict.nval * sizeof (union value));
739       }
740   }
741
742   /* Most of the iterations of steps R4, R5, R6, R7, R2, R3, ... */
743   if (!separate)
744     {
745       if (vfm_sink)
746         vfm_sink->destroy_sink ();
747       vfm_sink = &sort_stream;
748     }
749   procedure (NULL, NULL, NULL);
750
751   /* Final iterations of steps R4, R5, R6, R7, R2, R3, ... */
752   for (;;)
753     {
754       struct repl_sel_tree *t;
755
756       /* R4. */
757       rq = rmax + 1;
758
759       /* R5. */
760       t = q->fe;
761
762       /* R6 and R7. */
763       for (;;)
764         {
765           /* R6. */
766           if (t->rn < rq
767               || (t->rn == rq
768                   && compare_record (t->loser->record, q->record) < 0))
769             {
770               struct repl_sel_tree *temp_tree;
771               int temp_int;
772
773               temp_tree = t->loser;
774               t->loser = q;
775               q = temp_tree;
776
777               temp_int = t->rn;
778               t->rn = rq;
779               rq = temp_int;
780             }
781
782           /* R7. */
783           if (t == x[1])
784             break;
785           t = t->fi;
786         }
787
788       /* R2. */
789       if (rq != rc)
790         {
791           end_run ();
792           if (rq > rmax)
793             break;
794           begin_run ();
795           rc = rq;
796         }
797
798       /* R3. */
799       if (rq != 0)
800         {
801           output_record (q->record);
802           lastkey = x[x_max]->record;
803           memcpy (lastkey, q->record, sizeof (union value) * vfm_sink_info.nval);
804         }
805     }
806   assert (run_no == rmax);
807
808   /* If an unrecoverable error occurred somewhere in the above code,
809      then the `deferred_abort' flag would have been set.  */
810   if (deferred_abort)
811     {
812       int i;
813
814       for (i = 0; i < max_handles; i++)
815         if (handle[i] != NULL)
816           {
817             sprintf (tmp_extname, "%08x", i);
818
819             if (fclose (handle[i]) == EOF)
820               msg (SE, _("%s: Error closing temporary file: %s."),
821                    tmp_basename, strerror (errno));
822
823             if (remove (tmp_basename) != 0)
824               msg (SE, _("%s: Error removing temporary file: %s."),
825                    tmp_basename, strerror (errno));
826
827             handle[i] = NULL;
828           }
829       return 0;
830     }
831
832   return 1;
833 }
834
835 /* Compares the NV_SORT variables in V_SORT[] between the `value's at
836    A and B, and returns a strcmp()-type result. */
837 static int
838 compare_record (union value * a, union value * b)
839 {
840   int i;
841   int result = 0;
842   struct variable *v;
843
844   assert (a != NULL);
845   if (b == NULL)                /* Sort NULLs after everything else. */
846     return -1;
847
848   for (i = 0; i < nv_sort; i++)
849     {
850       v = v_sort[i];
851
852       if (v->type == NUMERIC)
853         {
854           if (approx_ne (a[v->fv].f, b[v->fv].f))
855             {
856               result = (a[v->fv].f > b[v->fv].f) ? 1 : -1;
857               break;
858             }
859         }
860       else
861         {
862           result = memcmp (a[v->fv].s, b[v->fv].s, v->width);
863           if (result != 0)
864             break;
865         }
866     }
867
868   if (v->p.srt.order == SRT_ASCEND)
869     return result;
870   else
871     {
872       assert (v->p.srt.order == SRT_DESCEND);
873       return -result;
874     }
875 }
876 \f
877 /* Merging. */
878
879 static int merge_once (int run_index[], int run_length[], int n_runs);
880
881 /* Modula function as defined by Knuth. */
882 static int
883 mod (int x, int y)
884 {
885   int result;
886
887   if (y == 0)
888     return x;
889   result = abs (x) % abs (y);
890   if (y < 0)
891     result = -result;
892   return result;
893 }
894
895 /* Performs a series of P-way merges of initial runs using Huffman's
896    method. */
897 static int
898 merge (void)
899 {
900   /* Order of merge. */
901   int order;
902
903   /* Idiot check. */
904   assert (MIN_BUFFER_SIZE_RECS * 2 <= MIN_BUFFER_TOTAL_SIZE_RECS - 1);
905
906   /* Close all the input files.  I hope that the boundary conditions
907      are correct on this but I'm not sure. */
908   if (run_no < max_handles)
909     {
910       int i;
911
912       for (i = 0; i < run_no; )
913         if (!close_handle (i++))
914           {
915             for (; i < run_no; i++)
916               close_handle (i);
917             return 0;
918           }
919     }
920
921   /* Determine order of merge. */
922   order = MAX_MERGE_ORDER;
923   if (x_max / order < MIN_BUFFER_SIZE_RECS)
924     order = x_max / MIN_BUFFER_SIZE_RECS;
925   else if (x_max / order * sizeof (union value) * default_dict.nval
926            < MIN_BUFFER_SIZE_BYTES)
927     order = x_max / (MIN_BUFFER_SIZE_BYTES
928                      / (sizeof (union value) * (default_dict.nval - 1)));
929
930   /* Make sure the order of merge is bounded. */
931   if (order < 2)
932     order = 2;
933   if (order > rmax)
934     order = rmax;
935   assert (x_max / order > 0);
936
937   /* Calculate number of records per buffer. */
938   records_per_buffer = x_max / order;
939
940   /* Add (1 - S) mod (P - 1) dummy runs of length 0. */
941   {
942     int n_dummy_runs = mod (1 - rmax, order - 1);
943     debug_printf (("rmax=%d, order=%d, n_dummy_runs=%d\n",
944                    rmax, order, n_dummy_runs));
945     assert (n_dummy_runs >= 0);
946     while (n_dummy_runs--)
947       {
948         heap_insert (huffman_queue, -2, 0);
949         rmax++;
950       }
951   }
952
953   /* Repeatedly merge the P shortest existing runs until only one run
954      is left. */
955   while (rmax > 1)
956     {
957       int run_index[MAX_MERGE_ORDER];
958       int run_length[MAX_MERGE_ORDER];
959       int total_run_length = 0;
960       int i;
961
962       assert (rmax >= order);
963
964       /* Find the shortest runs; put them in runs[] in reverse order
965          of length, to force dummy runs of length 0 to the end of the
966          list. */
967       debug_printf ((_("merging runs")));
968       for (i = order - 1; i >= 0; i--)
969         {
970           run_index[i] = heap_delete (huffman_queue, &run_length[i]);
971           assert (run_index[i] != -1);
972           total_run_length += run_length[i];
973           debug_printf ((" %d(%d)", run_index[i], run_length[i]));
974         }
975       debug_printf ((_(" into run %d(%d)\n"), run_no, total_run_length));
976
977       if (!merge_once (run_index, run_length, order))
978         {
979           int index;
980
981           while (-1 != (index = heap_delete (huffman_queue, NULL)))
982             {
983               sprintf (tmp_extname, "%08x", index);
984               if (remove (tmp_basename) != 0)
985                 msg (SE, _("%s: Error removing temporary file: %s."),
986                      tmp_basename, strerror (errno));
987             }
988
989           return 0;
990         }
991
992       if (!heap_insert (huffman_queue, run_no++, total_run_length))
993         {
994           msg (SE, _("Out of memory expanding Huffman priority queue."));
995           return 0;
996         }
997
998       rmax -= order - 1;
999     }
1000
1001   /* There should be exactly one element in the priority queue after
1002      all that merging.  This represents the entire sorted active file.
1003      So we could find a total case count by deleting this element from
1004      the queue. */
1005   assert (heap_size (huffman_queue) == 1);
1006
1007   return 1;
1008 }
1009
1010 /* Merges N_RUNS initial runs into a new run.  The jth run for 0 <= j
1011    < N_RUNS is taken from temporary file RUN_INDEX[j]; it is composed
1012    of RUN_LENGTH[j] cases. */
1013 static int
1014 merge_once (int run_index[], int run_length[], int n_runs)
1015 {
1016   /* For each run, the number of records remaining in its buffer. */
1017   int buffered[MAX_MERGE_ORDER];
1018
1019   /* For each run, the index of the next record in the buffer. */
1020   int buffer_ptr[MAX_MERGE_ORDER];
1021
1022   /* Open input files. */
1023   {
1024     int i;
1025
1026     for (i = 0; i < n_runs; i++)
1027       if (run_index[i] != -2 && !open_handle_r (i, run_index[i]))
1028         {
1029           /* Close and remove temporary files. */
1030           while (i--)
1031             {
1032               close_handle (i);
1033               sprintf (tmp_extname, "%08x", i);
1034               if (remove (tmp_basename) != 0)
1035                 msg (SE, _("%s: Error removing temporary file: %s."),
1036                      tmp_basename, strerror (errno));
1037             }
1038
1039           return 0;
1040         }
1041   }
1042
1043   /* Create output file. */
1044   if (!open_handle_w (N_INPUT_BUFFERS, run_no))
1045     {
1046       msg (SE, _("%s: Error creating temporary file for merge: %s."),
1047            tmp_basename, strerror (errno));
1048       goto lossage;
1049     }
1050
1051   /* Prime each buffer. */
1052   {
1053     int i;
1054
1055     for (i = 0; i < n_runs; i++)
1056       if (run_index[i] == -2)
1057         {
1058           n_runs = i;
1059           break;
1060         }
1061       else
1062         {
1063           int j;
1064           int ofs = records_per_buffer * i;
1065
1066           buffered[i] = min (records_per_buffer, run_length[i]);
1067           for (j = 0; j < buffered[i]; j++)
1068             if ((int) fread (x[j + ofs]->record, sizeof (union value),
1069                              default_dict.nval, handle[i])
1070                 != default_dict.nval)
1071               {
1072                 sprintf (tmp_extname, "%08x", run_index[i]);
1073                 if (ferror (handle[i]))
1074                   msg (SE, _("%s: Error reading temporary file in merge: %s."),
1075                        tmp_basename, strerror (errno));
1076                 else
1077                   msg (SE, _("%s: Unexpected end of temporary file in merge."),
1078                        tmp_basename);
1079                 goto lossage;
1080               }
1081           buffer_ptr[i] = ofs;
1082           run_length[i] -= buffered[i];
1083         }
1084   }
1085
1086   /* Perform the merge proper. */
1087   while (n_runs)                /* Loop while some data is left. */
1088     {
1089       int i;
1090       int min = 0;
1091
1092       for (i = 1; i < n_runs; i++)
1093         if (compare_record (x[buffer_ptr[min]]->record,
1094                             x[buffer_ptr[i]]->record) > 0)
1095           min = i;
1096
1097       if ((int) fwrite (x[buffer_ptr[min]]->record, sizeof (union value),
1098                         default_dict.nval, handle[N_INPUT_BUFFERS])
1099           != default_dict.nval)
1100         {
1101           sprintf (tmp_extname, "%08x", run_index[i]);
1102           msg (SE, _("%s: Error writing temporary file in "
1103                "merge: %s."), tmp_basename, strerror (errno));
1104           goto lossage;
1105         }
1106
1107       /* Remove one case from the buffer for this input file. */
1108       if (--buffered[min] == 0)
1109         {
1110           /* The input buffer is empty.  Do any cases remain in the
1111              initial run on disk? */
1112           if (run_length[min])
1113             {
1114               /* Yes.  Read them in. */
1115
1116               int j;
1117               int ofs;
1118
1119               /* Reset the buffer pointer.  Note that we can't simply
1120                  set it to (i * records_per_buffer) since the run
1121                  order might have changed. */
1122               ofs = buffer_ptr[min] -= buffer_ptr[min] % records_per_buffer;
1123
1124               buffered[min] = min (records_per_buffer, run_length[min]);
1125               for (j = 0; j < buffered[min]; j++)
1126                 if ((int) fread (x[j + ofs]->record, sizeof (union value),
1127                                  default_dict.nval, handle[min])
1128                     != default_dict.nval)
1129                   {
1130                     sprintf (tmp_extname, "%08x", run_index[min]);
1131                     if (ferror (handle[min]))
1132                       msg (SE, _("%s: Error reading temporary file in "
1133                                  "merge: %s."),
1134                            tmp_basename, strerror (errno));
1135                     else
1136                       msg (SE, _("%s: Unexpected end of temporary file "
1137                                  "in merge."),
1138                            tmp_basename);
1139                     goto lossage;
1140                   }
1141               run_length[min] -= buffered[min];
1142             }
1143           else
1144             {
1145               /* No.  Delete this run. */
1146
1147               /* Close the file. */
1148               FILE *f = handle[min];
1149               handle[min] = NULL;
1150               sprintf (tmp_extname, "%08x", run_index[min]);
1151               if (fclose (f) == EOF)
1152                 msg (SE, _("%s: Error closing temporary file in merge: "
1153                      "%s."), tmp_basename, strerror (errno));
1154
1155               /* Delete the file. */
1156               if (remove (tmp_basename) != 0)
1157                 msg (SE, _("%s: Error removing temporary file in merge: "
1158                      "%s."), tmp_basename, strerror (errno));
1159
1160               n_runs--;
1161               if (min != n_runs)
1162                 {
1163                   /* Since this isn't the last run, we move the last
1164                      run into its spot to force all the runs to be
1165                      contiguous. */
1166                   run_index[min] = run_index[n_runs];
1167                   run_length[min] = run_length[n_runs];
1168                   buffer_ptr[min] = buffer_ptr[n_runs];
1169                   buffered[min] = buffered[n_runs];
1170                   handle[min] = handle[n_runs];
1171                 }
1172             }
1173         }
1174       else
1175         buffer_ptr[min]++;
1176     }
1177
1178   /* Close output file. */
1179   {
1180     FILE *f = handle[N_INPUT_BUFFERS];
1181     handle[N_INPUT_BUFFERS] = NULL;
1182     if (fclose (f) == EOF)
1183       {
1184         sprintf (tmp_extname, "%08x", run_no);
1185         msg (SE, _("%s: Error closing temporary file in merge: "
1186                    "%s."),
1187              tmp_basename, strerror (errno));
1188         return 0;
1189       }
1190   }
1191
1192   return 1;
1193
1194 lossage:
1195   /* Close all the input and output files. */
1196   {
1197     int i;
1198
1199     for (i = 0; i < n_runs; i++)
1200       if (run_length[i] != 0)
1201         {
1202           close_handle (i);
1203           sprintf (tmp_basename, "%08x", run_index[i]);
1204           if (remove (tmp_basename) != 0)
1205             msg (SE, _("%s: Error removing temporary file: %s."),
1206                  tmp_basename, strerror (errno));
1207         }
1208   }
1209   close_handle (N_INPUT_BUFFERS);
1210   sprintf (tmp_basename, "%08x", run_no);
1211   if (remove (tmp_basename) != 0)
1212     msg (SE, _("%s: Error removing temporary file: %s."),
1213          tmp_basename, strerror (errno));
1214   return 0;
1215 }
1216 \f
1217 /* External sort input program. */
1218
1219 /* Reads all the records from the source stream and passes them
1220    to write_case(). */
1221 void
1222 sort_stream_read (void)
1223 {
1224   read_sort_output (write_case);
1225 }
1226
1227 /* Reads all the records from the output stream and passes them to the
1228    function provided, which must have an interface identical to
1229    write_case(). */
1230 void
1231 read_sort_output (int (*write_case) (void))
1232 {
1233   int i;
1234   FILE *f;
1235
1236   if (separate_case_tab)
1237     {
1238       struct ccase *save_temp_case = temp_case;
1239       struct case_list **p;
1240
1241       for (p = separate_case_tab; *p; p++)
1242         {
1243           temp_case = &(*p)->c;
1244           write_case ();
1245         }
1246       
1247       free (separate_case_tab);
1248       separate_case_tab = NULL;
1249             
1250       temp_case = save_temp_case;
1251     } else {
1252       sprintf (tmp_extname, "%08x", run_no - 1);
1253       f = fopen (tmp_basename, "rb");
1254       if (!f)
1255         {
1256           msg (ME, _("%s: Cannot open sort result file: %s."), tmp_basename,
1257                strerror (errno));
1258           err_failure ();
1259           return;
1260         }
1261
1262       for (i = 0; i < vfm_source_info.ncases; i++)
1263         {
1264           if (!fread (temp_case, vfm_source_info.case_size, 1, f))
1265             {
1266               if (ferror (f))
1267                 msg (ME, _("%s: Error reading sort result file: %s."),
1268                      tmp_basename, strerror (errno));
1269               else
1270                 msg (ME, _("%s: Unexpected end of sort result file: %s."),
1271                      tmp_basename, strerror (errno));
1272               err_failure ();
1273               break;
1274             }
1275
1276           if (!write_case ())
1277             break;
1278         }
1279
1280       if (fclose (f) == EOF)
1281         msg (ME, _("%s: Error closing sort result file: %s."), tmp_basename,
1282              strerror (errno));
1283
1284       if (remove (tmp_basename) != 0)
1285         msg (ME, _("%s: Error removing sort result file: %s."), tmp_basename,
1286              strerror (errno));
1287       else
1288         rmdir_temp_dir ();
1289     }
1290 }
1291
1292 #if 0 /* dead code */
1293 /* Alternate interface to sort_stream_write used for external sorting
1294    when SEPARATE is true. */
1295 static int
1296 write_separate (struct ccase *c)
1297 {
1298   assert (c == temp_case);
1299
1300   sort_stream_write ();
1301   return 1;
1302 }
1303 #endif
1304
1305 /* Performs one iteration of 5.4.1R steps R4, R5, R6, R7, R2, and
1306    R3. */
1307 static void
1308 sort_stream_write (void)
1309 {
1310   struct repl_sel_tree *t;
1311
1312   /* R4. */
1313   memcpy (q->record, temp_case->data, vfm_sink_info.case_size);
1314   if (compare_record (q->record, lastkey) < 0)
1315     if (++rq > rmax)
1316       rmax = rq;
1317
1318   /* R5. */
1319   t = q->fe;
1320
1321   /* R6 and R7. */
1322   for (;;)
1323     {
1324       /* R6. */
1325       if (t->rn < rq
1326           || (t->rn == rq && compare_record (t->loser->record, q->record) < 0))
1327         {
1328           struct repl_sel_tree *temp_tree;
1329           int temp_int;
1330
1331           temp_tree = t->loser;
1332           t->loser = q;
1333           q = temp_tree;
1334
1335           temp_int = t->rn;
1336           t->rn = rq;
1337           rq = temp_int;
1338         }
1339
1340       /* R7. */
1341       if (t == x[1])
1342         break;
1343       t = t->fi;
1344     }
1345
1346   /* R2. */
1347   if (rq != rc)
1348     {
1349       end_run ();
1350       begin_run ();
1351       assert (rq <= rmax);
1352       rc = rq;
1353     }
1354
1355   /* R3. */
1356   if (rq != 0)
1357     {
1358       output_record (q->record);
1359       lastkey = x[x_max]->record;
1360       memcpy (lastkey, q->record, vfm_sink_info.case_size);
1361     }
1362 }
1363
1364 /* Switches mode from sink to source. */
1365 void
1366 sort_stream_mode (void)
1367 {
1368   /* If this is not done, then we get the following source/sink pairs:
1369      source=memory/disk/DATA LIST/etc., sink=SORT; source=SORT,
1370      sink=SORT; which is not good. */
1371   vfm_sink = NULL;
1372 }
1373
1374 struct case_stream sort_stream =
1375   {
1376     NULL,
1377     sort_stream_read,
1378     sort_stream_write,
1379     sort_stream_mode,
1380     NULL,
1381     NULL,
1382     "SORT",
1383   };