checkin of 0.3.0
[pspp-builds.git] / src / sort.c
1 /* PSPP - computes sample statistics.
2    Copyright (C) 1997-9, 2000 Free Software Foundation, Inc.
3    Written by Ben Pfaff <blp@gnu.org>.
4
5    This program is free software; you can redistribute it and/or
6    modify it under the terms of the GNU General Public License as
7    published by the Free Software Foundation; either version 2 of the
8    License, or (at your option) any later version.
9
10    This program is distributed in the hope that it will be useful, but
11    WITHOUT ANY WARRANTY; without even the implied warranty of
12    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13    General Public License for more details.
14
15    You should have received a copy of the GNU General Public License
16    along with this program; if not, write to the Free Software
17    Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA
18    02111-1307, USA. */
19
20 #include <config.h>
21 #include <assert.h>
22 #include <stdio.h>
23 #include <stdlib.h>
24 #include <errno.h>
25 #include "alloc.h"
26 #include "approx.h"
27 #include "command.h"
28 #include "error.h"
29 #include "expr.h"
30 #include "heap.h"
31 #include "lexer.h"
32 #include "misc.h"
33 #include "sort.h"
34 #include "str.h"
35 #include "var.h"
36 #include "vfm.h"
37 #include "vfmP.h"
38
39 #if HAVE_UNISTD_H
40 #include <unistd.h>
41 #endif
42
43 #if HAVE_SYS_TYPES_H
44 #include <sys/types.h>
45 #endif
46
47 #if HAVE_SYS_STAT_H
48 #include <sys/stat.h>
49 #endif
50
51 #undef DEBUGGING
52 /*#define DEBUGGING 1*/
53 #include "debug-print.h"
54
55 /* Variables to sort. */
56 struct variable **v_sort;
57 int nv_sort;
58
59 /* Used when internal-sorting to a separate file. */
60 static struct case_list **separate_case_tab;
61
62 /* Exported by qsort.c. */
63 void blp_quicksort (void *pbase, size_t total_elems, size_t size,
64                     int (*cmp) (const void *, const void *),
65                     void *temp_buf);
66
67 /* Other prototypes. */
68 static int compare_case_lists (const void *, const void *);
69 static int do_internal_sort (int separate);
70 static int do_external_sort (int separate);
71 int parse_sort_variables (void);
72 void read_sort_output (int (*write_case) (void));
73
74 /* Performs the SORT CASES procedures. */
75 int
76 cmd_sort_cases (void)
77 {
78   /* First, just parse the command. */
79   lex_match_id ("SORT");
80   lex_match_id ("CASES");
81   lex_match (T_BY);
82
83   if (!parse_sort_variables ())
84     return CMD_FAILURE;
85       
86   cancel_temporary ();
87
88   /* Then it's time to do the actual work.  There are two cases:
89
90      (internal sort) All the data is in memory.  In this case, we
91      perform an EXECUTE to get the data into the desired form, then
92      sort the cases in place, if it is still all in memory.
93
94      (external sort) The data is not in memory.  It may be coming from
95      a system file or other data file, etc.  In any case, it is now
96      time to perform an external sort.  This is better explained in
97      do_external_sort(). */
98
99   /* Do all this dirty work. */
100   {
101     int success = sort_cases (0);
102     free (v_sort);
103     if (success)
104       return lex_end_of_command ();
105     else
106       return CMD_FAILURE;
107   }
108 }
109
110 /* Parses a list of sort variables into v_sort and nv_sort.  */
111 int
112 parse_sort_variables (void)
113 {
114   v_sort = NULL;
115   nv_sort = 0;
116   do
117     {
118       int prev_nv_sort = nv_sort;
119       int order = SRT_ASCEND;
120
121       if (!parse_variables (&default_dict, &v_sort, &nv_sort,
122                             PV_NO_DUPLICATE | PV_APPEND | PV_NO_SCRATCH))
123         return 0;
124       if (lex_match ('('))
125         {
126           if (lex_match_id ("D") || lex_match_id ("DOWN"))
127             order = SRT_DESCEND;
128           else if (!lex_match_id ("A") && !lex_match_id ("UP"))
129             {
130               free (v_sort);
131               msg (SE, _("`A' or `D' expected inside parentheses."));
132               return 0;
133             }
134           if (!lex_match (')'))
135             {
136               free (v_sort);
137               msg (SE, _("`)' expected."));
138               return 0;
139             }
140         }
141       for (; prev_nv_sort < nv_sort; prev_nv_sort++)
142         v_sort[prev_nv_sort]->p.srt.order = order;
143     }
144   while (token != '.' && token != '/');
145   
146   return 1;
147 }
148
149 /* Sorts the active file based on the key variables specified in
150    global variables v_sort and nv_sort.  The output is either to the
151    active file, if SEPARATE is zero, or to a separate file, if
152    SEPARATE is nonzero.  In the latter case the output cases can be
153    read with a call to read_sort_output().  (In the former case the
154    output cases should be dealt with through the usual vfm interface.)
155
156    The caller is responsible for freeing v_sort[]. */
157 int
158 sort_cases (int separate)
159 {
160   assert (separate_case_tab == NULL);
161
162   /* Not sure this is necessary but it's good to be safe. */
163   if (separate && vfm_source == &sort_stream)
164     procedure (NULL, NULL, NULL);
165   
166   /* SORT CASES cancels PROCESS IF. */
167   expr_free (process_if_expr);
168   process_if_expr = NULL;
169
170   if (do_internal_sort (separate))
171     return 1;
172
173   page_to_disk ();
174   return do_external_sort (separate);
175 }
176
177 /* If a reasonable situation is set up, do an internal sort of the
178    data.  Return success. */
179 static int
180 do_internal_sort (int separate)
181 {
182   if (vfm_source != &vfm_disk_stream)
183     {
184       if (vfm_source != &vfm_memory_stream)
185         procedure (NULL, NULL, NULL);
186       if (vfm_source == &vfm_memory_stream)
187         {
188           struct case_list **case_tab = malloc (sizeof *case_tab
189                                          * (vfm_source_info.ncases + 1));
190           if (vfm_source_info.ncases == 0)
191             {
192               free (case_tab);
193               return 1;
194             }
195           if (case_tab != NULL)
196             {
197               struct case_list *clp = memory_source_cases;
198               struct case_list **ctp = case_tab;
199               int i;
200
201               for (; clp; clp = clp->next)
202                 *ctp++ = clp;
203               qsort (case_tab, vfm_source_info.ncases, sizeof *case_tab,
204                      compare_case_lists);
205
206               if (!separate)
207                 {
208                   memory_source_cases = case_tab[0];
209                   for (i = 1; i < vfm_source_info.ncases; i++)
210                     case_tab[i - 1]->next = case_tab[i];
211                   case_tab[vfm_source_info.ncases - 1]->next = NULL;
212                   free (case_tab);
213                 } else {
214                   case_tab[vfm_source_info.ncases] = NULL;
215                   separate_case_tab = case_tab;
216                 }
217               
218               return 1;
219             }
220         }
221     }
222   return 0;
223 }
224
225 /* Compares the NV_SORT variables in V_SORT[] between the `case_list's
226    at _A and _B, and returns a strcmp()-type result. */
227 static int
228 compare_case_lists (const void *pa, const void *pb)
229 {
230   struct case_list *a = *(struct case_list **) pa;
231   struct case_list *b = *(struct case_list **) pb;
232   struct variable *v;
233   int result = 0;
234   int i;
235
236   for (i = 0; i < nv_sort; i++)
237     {
238       v = v_sort[i];
239       
240       if (v->type == NUMERIC)
241         {
242           if (approx_ne (a->c.data[v->fv].f, b->c.data[v->fv].f))
243             {
244               result = (a->c.data[v->fv].f > b->c.data[v->fv].f) ? 1 : -1;
245               break;
246             }
247         }
248       else
249         {
250           result = memcmp (a->c.data[v->fv].s, b->c.data[v->fv].s, v->width);
251           if (result != 0)
252             break;
253         }
254     }
255
256   if (v->p.srt.order == SRT_ASCEND)
257     return result;
258   else
259     {
260       assert (v->p.srt.order == SRT_DESCEND);
261       return -result;
262     }
263 }
264 \f
265 /* External sort. */
266
267 /* Maximum number of input + output file handles. */
268 #if defined FOPEN_MAX && (FOPEN_MAX - 5 < 18)
269 #define MAX_FILE_HANDLES        (FOPEN_MAX - 5)
270 #else
271 #define MAX_FILE_HANDLES        18
272 #endif
273
274 #if MAX_FILE_HANDLES < 3
275 #error At least 3 file handles must be available for sorting.
276 #endif
277
278 /* Number of input buffers. */
279 #define N_INPUT_BUFFERS         (MAX_FILE_HANDLES - 1)
280
281 /* Maximum order of merge.  This is the value suggested by Knuth;
282    specifically, he said to use tree selection, which we don't
283    implement, for larger orders of merge. */
284 #define MAX_MERGE_ORDER         7
285
286 /* Minimum total number of records for buffers. */
287 #define MIN_BUFFER_TOTAL_SIZE_RECS      64
288
289 /* Minimum single input or output buffer size, in bytes and records. */
290 #define MIN_BUFFER_SIZE_BYTES   4096
291 #define MIN_BUFFER_SIZE_RECS    16
292
293 /* Structure for replacement selection tree. */
294 struct repl_sel_tree
295   {
296     struct repl_sel_tree *loser;/* Loser associated w/this internal node. */
297     int rn;                     /* Run number of `loser'. */
298     struct repl_sel_tree *fe;   /* Internal node above this external node. */
299     struct repl_sel_tree *fi;   /* Internal node above this internal node. */
300     union value record[1];      /* The case proper. */
301   };
302
303 /* Static variables used for sorting. */
304 static struct repl_sel_tree **x; /* Buffers. */
305 static int x_max;               /* Size of buffers, in records. */
306 static int records_per_buffer;  /* Number of records in each buffer. */
307
308 /* In the merge phase, the first N_INPUT_BUFFERS handle[] elements are
309    input files and the last element is the output file.  Before that,
310    they're all used as output files, although the last one is
311    segregated. */
312 static FILE *handle[MAX_FILE_HANDLES];  /* File handles. */
313
314 /* Now, MAX_FILE_HANDLES is the maximum number of files we will *try*
315    to open.  But if we can't open that many, max_handles will be set
316    to the number we apparently can open. */
317 static int max_handles;         /* Maximum number of handles. */
318
319 /* When we create temporary files, they are all put in the same
320    directory and numbered sequentially from zero.  tmp_basename is the
321    drive/directory, etc., and tmp_extname can be sprintf() with "%08x"
322    to the file number, then tmp_basename used to open the file. */
323 static char *tmp_basename;      /* Temporary file basename. */
324 static char *tmp_extname;       /* Temporary file extension name. */
325
326 /* We use Huffman's method to determine the merge pattern.  This means
327    that we need to know which runs are the shortest at any given time.
328    Priority queues as implemented by heap.c are a natural for this
329    task (probably because I wrote the code specifically for it). */
330 static struct heap *huffman_queue;      /* Huffman priority queue. */
331
332 /* Prototypes for helper functions. */
333 static void sort_stream_write (void);
334 static int write_initial_runs (int separate);
335 static int allocate_cases (void);
336 static int allocate_file_handles (void);
337 static int merge (void);
338 static void rmdir_temp_dir (void);
339
340 /* Performs an external sort of the active file.  A description of the
341    procedure follows.  All page references refer to Knuth's _Art of
342    Computer Programming, Vol. 3: Sorting and Searching_, which is the
343    canonical resource for sorting.
344
345    1. The data is read and S initial runs are formed through the
346    action of algorithm 5.4.1R (replacement selection).
347
348    2. Huffman's method (p. 365-366) is used to determine the optimum
349    merge pattern.
350
351    3. If an OS that supports overlapped reading, writing, and
352    computing is being run, we should use 5.4.6F for forecasting.
353    Otherwise, buffers are filled only when they run out of data.
354    FIXME: Since the author of PSPP uses GNU/Linux, which does not
355    yet implement overlapped r/w/c, 5.4.6F is not used.
356
357    4. We perform P-way merges:
358
359    (a) The desired P is the smallest P such that ceil(ln(S)/ln(P))
360    is minimized.  (FIXME: Since I don't have an algorithm for
361    minimizing this, it's just set to MAX_MERGE_ORDER.)
362
363    (b) P is reduced if the selected value would make input buffers
364    less than 4096 bytes each, or 16 records, whichever is larger.
365
366    (c) P is reduced if we run out of available file handles or space
367    for file handles.
368
369    (d) P is reduced if we don't have space for one or two output
370    buffers, which have the same minimum size as input buffers.  (We
371    need two output buffers if 5.4.6F is in use for forecasting.)  */
372 static int
373 do_external_sort (int separate)
374 {
375   int success = 0;
376
377   assert (MAX_FILE_HANDLES >= 3);
378
379   x = NULL;
380   tmp_basename = NULL;
381
382   huffman_queue = heap_create (512);
383   if (huffman_queue == NULL)
384     return 0;
385
386   if (!allocate_cases ())
387     goto lossage;
388
389   if (!allocate_file_handles ())
390     goto lossage;
391
392   if (!write_initial_runs (separate))
393     goto lossage;
394
395   merge ();
396
397   success = 1;
398
399   /* Despite the name, flow of control comes here regardless of
400      whether or not the sort is successful. */
401 lossage:
402   heap_destroy (huffman_queue);
403
404   if (x)
405     {
406       int i;
407
408       for (i = 0; i <= x_max; i++)
409         free (x[i]);
410       free (x);
411     }
412
413   if (!success)
414     rmdir_temp_dir ();
415
416   return success;
417 }
418
419 #if !HAVE_GETPID
420 #define getpid() (0)
421 #endif
422
423 /* Sets up to open temporary files. */
424 /* PORTME: This creates a directory for temporary files.  Some OSes
425    might not have that concept... */
426 static int
427 allocate_file_handles (void)
428 {
429   const char *dir;              /* Directory prefix. */
430   char *buf;                    /* String buffer. */
431   char *cp;                     /* Pointer into buf. */
432
433   dir = getenv ("SPSSTMPDIR");
434   if (dir == NULL)
435     dir = getenv ("SPSSXTMPDIR");
436   if (dir == NULL)
437     dir = getenv ("TMPDIR");
438 #ifdef P_tmpdir
439   if (dir == NULL)
440     dir = P_tmpdir;
441 #endif
442 #if __unix__
443   if (dir == NULL)
444     dir = "/tmp";
445 #elif __MSDOS__
446   if (dir == NULL)
447     dir = getenv ("TEMP");
448   if (dir == NULL)
449     dir = getenv ("TMP");
450   if (dir == NULL)
451     dir = "\\";
452 #else
453   dir = "";
454 #endif
455
456   buf = xmalloc (strlen (dir) + 1 + 4 + 8 + 4 + 1 + INT_DIGITS + 1);
457   cp = spprintf (buf, "%s%c%04lX%04lXpspp", dir, DIR_SEPARATOR,
458                  ((long) time (0)) & 0xffff, ((long) getpid ()) & 0xffff);
459   if (-1 == mkdir (buf, S_IRWXU))
460     {
461       free (buf);
462       msg (SE, _("%s: Cannot create temporary directory: %s."),
463            buf, strerror (errno));
464       return 0;
465     }
466   *cp++ = DIR_SEPARATOR;
467
468   tmp_basename = buf;
469   tmp_extname = cp;
470
471   max_handles = MAX_FILE_HANDLES;
472
473   return 1;
474 }
475
476 /* Removes the directory created for temporary files, if one exists.
477    Also frees tmp_basename. */
478 static void
479 rmdir_temp_dir (void)
480 {
481   if (NULL == tmp_basename)
482     return;
483
484   tmp_extname[-1] = '\0';
485   if (rmdir (tmp_basename) == -1)
486     msg (SE, _("%s: Error removing directory for temporary files: %s."),
487          tmp_basename, strerror (errno));
488
489   free (tmp_basename);
490 }
491
492 /* Allocates room for lots of cases as a buffer. */
493 static int
494 allocate_cases (void)
495 {
496   /* This is the size of one case. */
497   const int case_size = (sizeof (struct repl_sel_tree)
498                          + sizeof (union value) * (default_dict.nval - 1)
499                          + sizeof (struct repl_sel_tree *));
500
501   x = NULL;
502
503   /* Allocate as many cases as we can, assuming a space of four
504      void pointers for malloc()'s internal bookkeeping. */
505   x_max = MAX_WORKSPACE / (case_size + 4 * sizeof (void *));
506   x = malloc (sizeof (struct repl_sel_tree *) * x_max);
507   if (x != NULL)
508     {
509       int i;
510
511       for (i = 0; i < x_max; i++)
512         {
513           x[i] = malloc (sizeof (struct repl_sel_tree)
514                          + sizeof (union value) * (default_dict.nval - 1));
515           if (x[i] == NULL)
516             break;
517         }
518       x_max = i;
519     }
520   if (x == NULL || x_max < MIN_BUFFER_TOTAL_SIZE_RECS)
521     {
522       if (x != NULL)
523         {
524           int i;
525           
526           for (i = 0; i < x_max; i++)
527             free (x[i]);
528         }
529       free (x);
530       msg (SE, _("Out of memory.  Could not allocate room for minimum of %d "
531                  "cases of %d bytes each.  (PSPP workspace is currently "
532                  "restricted to a maximum of %d KB.)"),
533            MIN_BUFFER_TOTAL_SIZE_RECS, case_size, MAX_WORKSPACE / 1024);
534       x_max = 0;
535       x = NULL;
536       return 0;
537     }
538
539   /* The last element of the array is used to store lastkey. */
540   x_max--;
541
542   debug_printf ((_("allocated %d cases == %d bytes\n"),
543                  x_max, x_max * case_size));
544   return 1;
545 }
546 \f
547 /* Replacement selection. */
548
549 static int rmax, rc, rq;
550 static struct repl_sel_tree *q;
551 static union value *lastkey;
552 static int run_no, file_index;
553 static int deferred_abort;
554 static int run_length;
555
556 static int compare_record (union value *, union value *);
557
558 static inline void
559 output_record (union value *v)
560 {
561   union value *src_case;
562   
563   if (deferred_abort)
564     return;
565
566   if (compaction_necessary)
567     {
568       compact_case (compaction_case, (struct ccase *) v);
569       src_case = (union value *) compaction_case;
570     }
571   else
572     src_case = (union value *) v;
573
574   if ((int) fwrite (src_case, sizeof *src_case, compaction_nval,
575                     handle[file_index])
576       != compaction_nval)
577     {
578       deferred_abort = 1;
579       sprintf (tmp_extname, "%08x", run_no);
580       msg (SE, _("%s: Error writing temporary file: %s."),
581            tmp_basename, strerror (errno));
582       return;
583     }
584
585   run_length++;
586 }
587
588 static int
589 close_handle (int i)
590 {
591   int result = fclose (handle[i]);
592   msg (VM (2), _("SORT: Closing handle %d."), i);
593   
594   handle[i] = NULL;
595   if (EOF == result)
596     {
597       sprintf (tmp_extname, "%08x", i);
598       msg (SE, _("%s: Error closing temporary file: %s."),
599            tmp_basename, strerror (errno));
600       return 0;
601     }
602   return 1;
603 }
604
605 static int
606 close_handles (int beg, int end)
607 {
608   int success = 1;
609   int i;
610
611   for (i = beg; i < end; i++)
612     success &= close_handle (i);
613   return success;
614 }
615
616 static int
617 open_handle_w (int handle_no, int run_no)
618 {
619   sprintf (tmp_extname, "%08x", run_no);
620   msg (VM (1), _("SORT: %s: Opening for writing as run %d."),
621        tmp_basename, run_no);
622
623   /* The `x' modifier causes the GNU C library to insist on creating a
624      new file--if the file already exists, an error is signaled.  The
625      ANSI C standard says that other libraries should ignore anything
626      after the `w+b', so it shouldn't be a problem. */
627   return NULL != (handle[handle_no] = fopen (tmp_basename, "w+bx"));
628 }
629
630 static int
631 open_handle_r (int handle_no, int run_no)
632 {
633   FILE *f;
634
635   sprintf (tmp_extname, "%08x", run_no);
636   msg (VM (1), _("SORT: %s: Opening for writing as run %d."),
637        tmp_basename, run_no);
638   f = handle[handle_no] = fopen (tmp_basename, "rb");
639
640   if (f == NULL)
641     {
642       msg (SE, _("%s: Error opening temporary file for reading: %s."),
643            tmp_basename, strerror (errno));
644       return 0;
645     }
646   
647   return 1;
648 }
649
650 /* Begins a new initial run, specifically its output file. */
651 static void
652 begin_run (void)
653 {
654   /* Decide which handle[] to use.  If run_no is max_handles or
655      greater, then we've run out of handles so it's time to just do
656      one file at a time, which by default is handle 0. */
657   file_index = (run_no < max_handles ? run_no : 0);
658   run_length = 0;
659
660   /* Alright, now create the temporary file. */
661   if (open_handle_w (file_index, run_no) == 0)
662     {
663       /* Failure to create the temporary file.  Check if there are
664          unacceptably few files already open. */
665       if (file_index < 3)
666         {
667           deferred_abort = 1;
668           msg (SE, _("%s: Error creating temporary file: %s."),
669                tmp_basename, strerror (errno));
670           return;
671         }
672
673       /* Close all the open temporary files. */
674       if (!close_handles (0, file_index))
675         return;
676
677       /* Now try again to create the temporary file. */
678       max_handles = file_index;
679       file_index = 0;
680       if (open_handle_w (0, run_no) == 0)
681         {
682           /* It still failed, report it this time. */
683           deferred_abort = 1;
684           msg (SE, _("%s: Error creating temporary file: %s."),
685                tmp_basename, strerror (errno));
686           return;
687         }
688     }
689 }
690
691 /* Ends the current initial run.  Just increments run_no if no initial
692    run has been started yet. */
693 static void
694 end_run (void)
695 {
696   /* Close file handles if necessary. */
697   {
698     int result;
699
700     if (run_no == max_handles - 1)
701       result = close_handles (0, max_handles);
702     else if (run_no >= max_handles)
703       result = close_handle (0);
704     else
705       result = 1;
706     if (!result)
707       deferred_abort = 1;
708   }
709
710   /* Advance to next run. */
711   run_no++;
712   if (run_no)
713     heap_insert (huffman_queue, run_no - 1, run_length);
714 }
715
716 /* Performs 5.4.1R. */
717 static int
718 write_initial_runs (int separate)
719 {
720   run_no = -1;
721   deferred_abort = 0;
722
723   /* Steps R1, R2, R3. */
724   rmax = 0;
725   rc = 0;
726   lastkey = NULL;
727   q = x[0];
728   rq = 0;
729   {
730     int j;
731
732     for (j = 0; j < x_max; j++)
733       {
734         struct repl_sel_tree *J = x[j];
735
736         J->loser = J;
737         J->rn = 0;
738         J->fe = x[(x_max + j) / 2];
739         J->fi = x[j / 2];
740         memset (J->record, 0, default_dict.nval * sizeof (union value));
741       }
742   }
743
744   /* Most of the iterations of steps R4, R5, R6, R7, R2, R3, ... */
745   if (!separate)
746     {
747       if (vfm_sink)
748         vfm_sink->destroy_sink ();
749       vfm_sink = &sort_stream;
750     }
751   procedure (NULL, NULL, NULL);
752
753   /* Final iterations of steps R4, R5, R6, R7, R2, R3, ... */
754   for (;;)
755     {
756       struct repl_sel_tree *t;
757
758       /* R4. */
759       rq = rmax + 1;
760
761       /* R5. */
762       t = q->fe;
763
764       /* R6 and R7. */
765       for (;;)
766         {
767           /* R6. */
768           if (t->rn < rq
769               || (t->rn == rq
770                   && compare_record (t->loser->record, q->record) < 0))
771             {
772               struct repl_sel_tree *temp_tree;
773               int temp_int;
774
775               temp_tree = t->loser;
776               t->loser = q;
777               q = temp_tree;
778
779               temp_int = t->rn;
780               t->rn = rq;
781               rq = temp_int;
782             }
783
784           /* R7. */
785           if (t == x[1])
786             break;
787           t = t->fi;
788         }
789
790       /* R2. */
791       if (rq != rc)
792         {
793           end_run ();
794           if (rq > rmax)
795             break;
796           begin_run ();
797           rc = rq;
798         }
799
800       /* R3. */
801       if (rq != 0)
802         {
803           output_record (q->record);
804           lastkey = x[x_max]->record;
805           memcpy (lastkey, q->record, sizeof (union value) * vfm_sink_info.nval);
806         }
807     }
808   assert (run_no == rmax);
809
810   /* If an unrecoverable error occurred somewhere in the above code,
811      then the `deferred_abort' flag would have been set.  */
812   if (deferred_abort)
813     {
814       int i;
815
816       for (i = 0; i < max_handles; i++)
817         if (handle[i] != NULL)
818           {
819             sprintf (tmp_extname, "%08x", i);
820
821             if (fclose (handle[i]) == EOF)
822               msg (SE, _("%s: Error closing temporary file: %s."),
823                    tmp_basename, strerror (errno));
824
825             if (remove (tmp_basename) != 0)
826               msg (SE, _("%s: Error removing temporary file: %s."),
827                    tmp_basename, strerror (errno));
828
829             handle[i] = NULL;
830           }
831       return 0;
832     }
833
834   return 1;
835 }
836
837 /* Compares the NV_SORT variables in V_SORT[] between the `value's at
838    A and B, and returns a strcmp()-type result. */
839 static int
840 compare_record (union value * a, union value * b)
841 {
842   int i;
843   int result = 0;
844   struct variable *v;
845
846   assert (a != NULL);
847   if (b == NULL)                /* Sort NULLs after everything else. */
848     return -1;
849
850   for (i = 0; i < nv_sort; i++)
851     {
852       v = v_sort[i];
853
854       if (v->type == NUMERIC)
855         {
856           if (approx_ne (a[v->fv].f, b[v->fv].f))
857             {
858               result = (a[v->fv].f > b[v->fv].f) ? 1 : -1;
859               break;
860             }
861         }
862       else
863         {
864           result = memcmp (a[v->fv].s, b[v->fv].s, v->width);
865           if (result != 0)
866             break;
867         }
868     }
869
870   if (v->p.srt.order == SRT_ASCEND)
871     return result;
872   else
873     {
874       assert (v->p.srt.order == SRT_DESCEND);
875       return -result;
876     }
877 }
878 \f
879 /* Merging. */
880
881 static int merge_once (int run_index[], int run_length[], int n_runs);
882
883 /* Modula function as defined by Knuth. */
884 static int
885 mod (int x, int y)
886 {
887   int result;
888
889   if (y == 0)
890     return x;
891   result = abs (x) % abs (y);
892   if (y < 0)
893     result = -result;
894   return result;
895 }
896
897 /* Performs a series of P-way merges of initial runs using Huffman's
898    method. */
899 static int
900 merge (void)
901 {
902   /* Order of merge. */
903   int order;
904
905   /* Idiot check. */
906   assert (MIN_BUFFER_SIZE_RECS * 2 <= MIN_BUFFER_TOTAL_SIZE_RECS - 1);
907
908   /* Close all the input files.  I hope that the boundary conditions
909      are correct on this but I'm not sure. */
910   if (run_no < max_handles)
911     {
912       int i;
913
914       for (i = 0; i < run_no; )
915         if (!close_handle (i++))
916           {
917             for (; i < run_no; i++)
918               close_handle (i);
919             return 0;
920           }
921     }
922
923   /* Determine order of merge. */
924   order = MAX_MERGE_ORDER;
925   if (x_max / order < MIN_BUFFER_SIZE_RECS)
926     order = x_max / MIN_BUFFER_SIZE_RECS;
927   else if (x_max / order * sizeof (union value) * default_dict.nval
928            < MIN_BUFFER_SIZE_BYTES)
929     order = x_max / (MIN_BUFFER_SIZE_BYTES
930                      / (sizeof (union value) * (default_dict.nval - 1)));
931
932   /* Make sure the order of merge is bounded. */
933   if (order < 2)
934     order = 2;
935   if (order > rmax)
936     order = rmax;
937   assert (x_max / order > 0);
938
939   /* Calculate number of records per buffer. */
940   records_per_buffer = x_max / order;
941
942   /* Add (1 - S) mod (P - 1) dummy runs of length 0. */
943   {
944     int n_dummy_runs = mod (1 - rmax, order - 1);
945     debug_printf (("rmax=%d, order=%d, n_dummy_runs=%d\n",
946                    rmax, order, n_dummy_runs));
947     assert (n_dummy_runs >= 0);
948     while (n_dummy_runs--)
949       {
950         heap_insert (huffman_queue, -2, 0);
951         rmax++;
952       }
953   }
954
955   /* Repeatedly merge the P shortest existing runs until only one run
956      is left. */
957   while (rmax > 1)
958     {
959       int run_index[MAX_MERGE_ORDER];
960       int run_length[MAX_MERGE_ORDER];
961       int total_run_length = 0;
962       int i;
963
964       assert (rmax >= order);
965
966       /* Find the shortest runs; put them in runs[] in reverse order
967          of length, to force dummy runs of length 0 to the end of the
968          list. */
969       debug_printf ((_("merging runs")));
970       for (i = order - 1; i >= 0; i--)
971         {
972           run_index[i] = heap_delete (huffman_queue, &run_length[i]);
973           assert (run_index[i] != -1);
974           total_run_length += run_length[i];
975           debug_printf ((" %d(%d)", run_index[i], run_length[i]));
976         }
977       debug_printf ((_(" into run %d(%d)\n"), run_no, total_run_length));
978
979       if (!merge_once (run_index, run_length, order))
980         {
981           int index;
982
983           while (-1 != (index = heap_delete (huffman_queue, NULL)))
984             {
985               sprintf (tmp_extname, "%08x", index);
986               if (remove (tmp_basename) != 0)
987                 msg (SE, _("%s: Error removing temporary file: %s."),
988                      tmp_basename, strerror (errno));
989             }
990
991           return 0;
992         }
993
994       if (!heap_insert (huffman_queue, run_no++, total_run_length))
995         {
996           msg (SE, _("Out of memory expanding Huffman priority queue."));
997           return 0;
998         }
999
1000       rmax -= order - 1;
1001     }
1002
1003   /* There should be exactly one element in the priority queue after
1004      all that merging.  This represents the entire sorted active file.
1005      So we could find a total case count by deleting this element from
1006      the queue. */
1007   assert (heap_size (huffman_queue) == 1);
1008
1009   return 1;
1010 }
1011
1012 /* Merges N_RUNS initial runs into a new run.  The jth run for 0 <= j
1013    < N_RUNS is taken from temporary file RUN_INDEX[j]; it is composed
1014    of RUN_LENGTH[j] cases. */
1015 static int
1016 merge_once (int run_index[], int run_length[], int n_runs)
1017 {
1018   /* For each run, the number of records remaining in its buffer. */
1019   int buffered[MAX_MERGE_ORDER];
1020
1021   /* For each run, the index of the next record in the buffer. */
1022   int buffer_ptr[MAX_MERGE_ORDER];
1023
1024   /* Open input files. */
1025   {
1026     int i;
1027
1028     for (i = 0; i < n_runs; i++)
1029       if (run_index[i] != -2 && !open_handle_r (i, run_index[i]))
1030         {
1031           /* Close and remove temporary files. */
1032           while (i--)
1033             {
1034               close_handle (i);
1035               sprintf (tmp_extname, "%08x", i);
1036               if (remove (tmp_basename) != 0)
1037                 msg (SE, _("%s: Error removing temporary file: %s."),
1038                      tmp_basename, strerror (errno));
1039             }
1040
1041           return 0;
1042         }
1043   }
1044
1045   /* Create output file. */
1046   if (!open_handle_w (N_INPUT_BUFFERS, run_no))
1047     {
1048       msg (SE, _("%s: Error creating temporary file for merge: %s."),
1049            tmp_basename, strerror (errno));
1050       goto lossage;
1051     }
1052
1053   /* Prime each buffer. */
1054   {
1055     int i;
1056
1057     for (i = 0; i < n_runs; i++)
1058       if (run_index[i] == -2)
1059         {
1060           n_runs = i;
1061           break;
1062         }
1063       else
1064         {
1065           int j;
1066           int ofs = records_per_buffer * i;
1067
1068           buffered[i] = min (records_per_buffer, run_length[i]);
1069           for (j = 0; j < buffered[i]; j++)
1070             if ((int) fread (x[j + ofs]->record, sizeof (union value),
1071                              default_dict.nval, handle[i])
1072                 != default_dict.nval)
1073               {
1074                 sprintf (tmp_extname, "%08x", run_index[i]);
1075                 if (ferror (handle[i]))
1076                   msg (SE, _("%s: Error reading temporary file in merge: %s."),
1077                        tmp_basename, strerror (errno));
1078                 else
1079                   msg (SE, _("%s: Unexpected end of temporary file in merge."),
1080                        tmp_basename);
1081                 goto lossage;
1082               }
1083           buffer_ptr[i] = ofs;
1084           run_length[i] -= buffered[i];
1085         }
1086   }
1087
1088   /* Perform the merge proper. */
1089   while (n_runs)                /* Loop while some data is left. */
1090     {
1091       int i;
1092       int min = 0;
1093
1094       for (i = 1; i < n_runs; i++)
1095         if (compare_record (x[buffer_ptr[min]]->record,
1096                             x[buffer_ptr[i]]->record) > 0)
1097           min = i;
1098
1099       if ((int) fwrite (x[buffer_ptr[min]]->record, sizeof (union value),
1100                         default_dict.nval, handle[N_INPUT_BUFFERS])
1101           != default_dict.nval)
1102         {
1103           sprintf (tmp_extname, "%08x", run_index[i]);
1104           msg (SE, _("%s: Error writing temporary file in "
1105                "merge: %s."), tmp_basename, strerror (errno));
1106           goto lossage;
1107         }
1108
1109       /* Remove one case from the buffer for this input file. */
1110       if (--buffered[min] == 0)
1111         {
1112           /* The input buffer is empty.  Do any cases remain in the
1113              initial run on disk? */
1114           if (run_length[min])
1115             {
1116               /* Yes.  Read them in. */
1117
1118               int j;
1119               int ofs;
1120
1121               /* Reset the buffer pointer.  Note that we can't simply
1122                  set it to (i * records_per_buffer) since the run
1123                  order might have changed. */
1124               ofs = buffer_ptr[min] -= buffer_ptr[min] % records_per_buffer;
1125
1126               buffered[min] = min (records_per_buffer, run_length[min]);
1127               for (j = 0; j < buffered[min]; j++)
1128                 if ((int) fread (x[j + ofs]->record, sizeof (union value),
1129                                  default_dict.nval, handle[min])
1130                     != default_dict.nval)
1131                   {
1132                     sprintf (tmp_extname, "%08x", run_index[min]);
1133                     if (ferror (handle[min]))
1134                       msg (SE, _("%s: Error reading temporary file in "
1135                                  "merge: %s."),
1136                            tmp_basename, strerror (errno));
1137                     else
1138                       msg (SE, _("%s: Unexpected end of temporary file "
1139                                  "in merge."),
1140                            tmp_basename);
1141                     goto lossage;
1142                   }
1143               run_length[min] -= buffered[min];
1144             }
1145           else
1146             {
1147               /* No.  Delete this run. */
1148
1149               /* Close the file. */
1150               FILE *f = handle[min];
1151               handle[min] = NULL;
1152               sprintf (tmp_extname, "%08x", run_index[min]);
1153               if (fclose (f) == EOF)
1154                 msg (SE, _("%s: Error closing temporary file in merge: "
1155                      "%s."), tmp_basename, strerror (errno));
1156
1157               /* Delete the file. */
1158               if (remove (tmp_basename) != 0)
1159                 msg (SE, _("%s: Error removing temporary file in merge: "
1160                      "%s."), tmp_basename, strerror (errno));
1161
1162               n_runs--;
1163               if (min != n_runs)
1164                 {
1165                   /* Since this isn't the last run, we move the last
1166                      run into its spot to force all the runs to be
1167                      contiguous. */
1168                   run_index[min] = run_index[n_runs];
1169                   run_length[min] = run_length[n_runs];
1170                   buffer_ptr[min] = buffer_ptr[n_runs];
1171                   buffered[min] = buffered[n_runs];
1172                   handle[min] = handle[n_runs];
1173                 }
1174             }
1175         }
1176       else
1177         buffer_ptr[min]++;
1178     }
1179
1180   /* Close output file. */
1181   {
1182     FILE *f = handle[N_INPUT_BUFFERS];
1183     handle[N_INPUT_BUFFERS] = NULL;
1184     if (fclose (f) == EOF)
1185       {
1186         sprintf (tmp_extname, "%08x", run_no);
1187         msg (SE, _("%s: Error closing temporary file in merge: "
1188                    "%s."),
1189              tmp_basename, strerror (errno));
1190         return 0;
1191       }
1192   }
1193
1194   return 1;
1195
1196 lossage:
1197   /* Close all the input and output files. */
1198   {
1199     int i;
1200
1201     for (i = 0; i < n_runs; i++)
1202       if (run_length[i] != 0)
1203         {
1204           close_handle (i);
1205           sprintf (tmp_basename, "%08x", run_index[i]);
1206           if (remove (tmp_basename) != 0)
1207             msg (SE, _("%s: Error removing temporary file: %s."),
1208                  tmp_basename, strerror (errno));
1209         }
1210   }
1211   close_handle (N_INPUT_BUFFERS);
1212   sprintf (tmp_basename, "%08x", run_no);
1213   if (remove (tmp_basename) != 0)
1214     msg (SE, _("%s: Error removing temporary file: %s."),
1215          tmp_basename, strerror (errno));
1216   return 0;
1217 }
1218 \f
1219 /* External sort input program. */
1220
1221 /* Reads all the records from the source stream and passes them
1222    to write_case(). */
1223 void
1224 sort_stream_read (void)
1225 {
1226   read_sort_output (write_case);
1227 }
1228
1229 /* Reads all the records from the output stream and passes them to the
1230    function provided, which must have an interface identical to
1231    write_case(). */
1232 void
1233 read_sort_output (int (*write_case) (void))
1234 {
1235   int i;
1236   FILE *f;
1237
1238   if (separate_case_tab)
1239     {
1240       struct ccase *save_temp_case = temp_case;
1241       struct case_list **p;
1242
1243       for (p = separate_case_tab; *p; p++)
1244         {
1245           temp_case = &(*p)->c;
1246           write_case ();
1247         }
1248       
1249       free (separate_case_tab);
1250       separate_case_tab = NULL;
1251             
1252       temp_case = save_temp_case;
1253     } else {
1254       sprintf (tmp_extname, "%08x", run_no - 1);
1255       f = fopen (tmp_basename, "rb");
1256       if (!f)
1257         {
1258           msg (ME, _("%s: Cannot open sort result file: %s."), tmp_basename,
1259                strerror (errno));
1260           err_failure ();
1261           return;
1262         }
1263
1264       for (i = 0; i < vfm_source_info.ncases; i++)
1265         {
1266           if (!fread (temp_case, vfm_source_info.case_size, 1, f))
1267             {
1268               if (ferror (f))
1269                 msg (ME, _("%s: Error reading sort result file: %s."),
1270                      tmp_basename, strerror (errno));
1271               else
1272                 msg (ME, _("%s: Unexpected end of sort result file: %s."),
1273                      tmp_basename, strerror (errno));
1274               err_failure ();
1275               break;
1276             }
1277
1278           if (!write_case ())
1279             break;
1280         }
1281
1282       if (fclose (f) == EOF)
1283         msg (ME, _("%s: Error closing sort result file: %s."), tmp_basename,
1284              strerror (errno));
1285
1286       if (remove (tmp_basename) != 0)
1287         msg (ME, _("%s: Error removing sort result file: %s."), tmp_basename,
1288              strerror (errno));
1289       else
1290         rmdir_temp_dir ();
1291     }
1292 }
1293
1294 #if 0 /* dead code */
1295 /* Alternate interface to sort_stream_write used for external sorting
1296    when SEPARATE is true. */
1297 static int
1298 write_separate (struct ccase *c)
1299 {
1300   assert (c == temp_case);
1301
1302   sort_stream_write ();
1303   return 1;
1304 }
1305 #endif
1306
1307 /* Performs one iteration of 5.4.1R steps R4, R5, R6, R7, R2, and
1308    R3. */
1309 static void
1310 sort_stream_write (void)
1311 {
1312   struct repl_sel_tree *t;
1313
1314   /* R4. */
1315   memcpy (q->record, temp_case->data, vfm_sink_info.case_size);
1316   if (compare_record (q->record, lastkey) < 0)
1317     if (++rq > rmax)
1318       rmax = rq;
1319
1320   /* R5. */
1321   t = q->fe;
1322
1323   /* R6 and R7. */
1324   for (;;)
1325     {
1326       /* R6. */
1327       if (t->rn < rq
1328           || (t->rn == rq && compare_record (t->loser->record, q->record) < 0))
1329         {
1330           struct repl_sel_tree *temp_tree;
1331           int temp_int;
1332
1333           temp_tree = t->loser;
1334           t->loser = q;
1335           q = temp_tree;
1336
1337           temp_int = t->rn;
1338           t->rn = rq;
1339           rq = temp_int;
1340         }
1341
1342       /* R7. */
1343       if (t == x[1])
1344         break;
1345       t = t->fi;
1346     }
1347
1348   /* R2. */
1349   if (rq != rc)
1350     {
1351       end_run ();
1352       begin_run ();
1353       assert (rq <= rmax);
1354       rc = rq;
1355     }
1356
1357   /* R3. */
1358   if (rq != 0)
1359     {
1360       output_record (q->record);
1361       lastkey = x[x_max]->record;
1362       memcpy (lastkey, q->record, vfm_sink_info.case_size);
1363     }
1364 }
1365
1366 /* Switches mode from sink to source. */
1367 void
1368 sort_stream_mode (void)
1369 {
1370   /* If this is not done, then we get the following source/sink pairs:
1371      source=memory/disk/DATA LIST/etc., sink=SORT; source=SORT,
1372      sink=SORT; which is not good. */
1373   vfm_sink = NULL;
1374 }
1375
1376 struct case_stream sort_stream =
1377   {
1378     NULL,
1379     sort_stream_read,
1380     sort_stream_write,
1381     sort_stream_mode,
1382     NULL,
1383     NULL,
1384     "SORT",
1385   };