7949ea5e24c303562ab1397d7bb2920c8dd2b33f
[pspp-builds.git] / src / sort.c
1 /* PSPP - computes sample statistics.
2    Copyright (C) 1997-9, 2000 Free Software Foundation, Inc.
3    Written by Ben Pfaff <blp@gnu.org>.
4
5    This program is free software; you can redistribute it and/or
6    modify it under the terms of the GNU General Public License as
7    published by the Free Software Foundation; either version 2 of the
8    License, or (at your option) any later version.
9
10    This program is distributed in the hope that it will be useful, but
11    WITHOUT ANY WARRANTY; without even the implied warranty of
12    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13    General Public License for more details.
14
15    You should have received a copy of the GNU General Public License
16    along with this program; if not, write to the Free Software
17    Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA
18    02111-1307, USA. */
19
20 #include <config.h>
21 #include "sort.h"
22 #include <assert.h>
23 #include <stdio.h>
24 #include <stdlib.h>
25 #include <errno.h>
26 #include "alloc.h"
27 #include "command.h"
28 #include "error.h"
29 #include "expr.h"
30 #include "heap.h"
31 #include "lexer.h"
32 #include "misc.h"
33 #include "str.h"
34 #include "var.h"
35 #include "vfm.h"
36 #include "vfmP.h"
37
38 #if HAVE_UNISTD_H
39 #include <unistd.h>
40 #endif
41
42 #if HAVE_SYS_TYPES_H
43 #include <sys/types.h>
44 #endif
45
46 #if HAVE_SYS_STAT_H
47 #include <sys/stat.h>
48 #endif
49
50 #include "debug-print.h"
51
52 /* Variables to sort. */
53 struct variable **v_sort;
54 int nv_sort;
55
56 /* Used when internal-sorting to a separate file. */
57 static struct case_list **separate_case_tab;
58
59 /* Other prototypes. */
60 static int compare_case_lists (const void *, const void *);
61 static int do_internal_sort (int separate);
62 static int do_external_sort (int separate);
63 int parse_sort_variables (void);
64 void read_sort_output (write_case_func *write_case, write_case_data wc_data);
65
66 /* Performs the SORT CASES procedures. */
67 int
68 cmd_sort_cases (void)
69 {
70   /* First, just parse the command. */
71   lex_match_id ("SORT");
72   lex_match_id ("CASES");
73   lex_match (T_BY);
74
75   if (!parse_sort_variables ())
76     return CMD_FAILURE;
77       
78   cancel_temporary ();
79
80   /* Then it's time to do the actual work.  There are two cases:
81
82      (internal sort) All the data is in memory.  In this case, we
83      perform an EXECUTE to get the data into the desired form, then
84      sort the cases in place, if it is still all in memory.
85
86      (external sort) The data is not in memory.  It may be coming from
87      a system file or other data file, etc.  In any case, it is now
88      time to perform an external sort.  This is better explained in
89      do_external_sort(). */
90
91   /* Do all this dirty work. */
92   {
93     int success = sort_cases (0);
94     free (v_sort);
95     if (success)
96       return lex_end_of_command ();
97     else
98       return CMD_FAILURE;
99   }
100 }
101
102 /* Parses a list of sort variables into v_sort and nv_sort.  */
103 int
104 parse_sort_variables (void)
105 {
106   v_sort = NULL;
107   nv_sort = 0;
108   do
109     {
110       int prev_nv_sort = nv_sort;
111       int order = SRT_ASCEND;
112
113       if (!parse_variables (default_dict, &v_sort, &nv_sort,
114                             PV_NO_DUPLICATE | PV_APPEND | PV_NO_SCRATCH))
115         return 0;
116       if (lex_match ('('))
117         {
118           if (lex_match_id ("D") || lex_match_id ("DOWN"))
119             order = SRT_DESCEND;
120           else if (!lex_match_id ("A") && !lex_match_id ("UP"))
121             {
122               free (v_sort);
123               msg (SE, _("`A' or `D' expected inside parentheses."));
124               return 0;
125             }
126           if (!lex_match (')'))
127             {
128               free (v_sort);
129               msg (SE, _("`)' expected."));
130               return 0;
131             }
132         }
133       for (; prev_nv_sort < nv_sort; prev_nv_sort++)
134         v_sort[prev_nv_sort]->p.srt.order = order;
135     }
136   while (token != '.' && token != '/');
137   
138   return 1;
139 }
140
141 /* Sorts the active file based on the key variables specified in
142    global variables v_sort and nv_sort.  The output is either to the
143    active file, if SEPARATE is zero, or to a separate file, if
144    SEPARATE is nonzero.  In the latter case the output cases can be
145    read with a call to read_sort_output().  (In the former case the
146    output cases should be dealt with through the usual vfm interface.)
147
148    The caller is responsible for freeing v_sort[]. */
149 int
150 sort_cases (int separate)
151 {
152   assert (separate_case_tab == NULL);
153
154   /* Not sure this is necessary but it's good to be safe. */
155   if (separate && vfm_source == &sort_stream)
156     procedure (NULL, NULL, NULL, NULL);
157   
158   /* SORT CASES cancels PROCESS IF. */
159   expr_free (process_if_expr);
160   process_if_expr = NULL;
161
162   if (do_internal_sort (separate))
163     return 1;
164
165   page_to_disk ();
166   return do_external_sort (separate);
167 }
168
169 /* If a reasonable situation is set up, do an internal sort of the
170    data.  Return success. */
171 static int
172 do_internal_sort (int separate)
173 {
174   if (vfm_source != &vfm_disk_stream)
175     {
176       if (vfm_source != &vfm_memory_stream)
177         procedure (NULL, NULL, NULL, NULL);
178       if (vfm_source == &vfm_memory_stream)
179         {
180           struct case_list **case_tab = malloc (sizeof *case_tab
181                                          * (vfm_source_info.ncases + 1));
182           if (vfm_source_info.ncases == 0)
183             {
184               free (case_tab);
185               return 1;
186             }
187           if (case_tab != NULL)
188             {
189               struct case_list *clp = memory_source_cases;
190               struct case_list **ctp = case_tab;
191               int i;
192
193               for (; clp; clp = clp->next)
194                 *ctp++ = clp;
195               qsort (case_tab, vfm_source_info.ncases, sizeof *case_tab,
196                      compare_case_lists);
197
198               if (!separate)
199                 {
200                   memory_source_cases = case_tab[0];
201                   for (i = 1; i < vfm_source_info.ncases; i++)
202                     case_tab[i - 1]->next = case_tab[i];
203                   case_tab[vfm_source_info.ncases - 1]->next = NULL;
204                   free (case_tab);
205                 } else {
206                   case_tab[vfm_source_info.ncases] = NULL;
207                   separate_case_tab = case_tab;
208                 }
209               
210               return 1;
211             }
212         }
213     }
214   return 0;
215 }
216
217 /* Compares the NV_SORT variables in V_SORT[] between the
218    `case_list's at A and B, and returns a strcmp()-type
219    result. */
220 static int
221 compare_case_lists (const void *a_, const void *b_)
222 {
223   struct case_list *const *pa = a_;
224   struct case_list *const *pb = b_;
225   struct case_list *a = *pa;
226   struct case_list *b = *pb;
227   struct variable *v;
228   int result = 0;
229   int i;
230
231   for (i = 0; i < nv_sort; i++)
232     {
233       v = v_sort[i];
234       
235       if (v->type == NUMERIC)
236         {
237           double af = a->c.data[v->fv].f;
238           double bf = b->c.data[v->fv].f;
239
240           result = af < bf ? -1 : af > bf;
241         }
242       else
243         result = memcmp (a->c.data[v->fv].s, b->c.data[v->fv].s, v->width);
244
245       if (result != 0)
246         break;
247     }
248
249   if (v->p.srt.order == SRT_DESCEND)
250     result = -result;
251   return result;
252 }
253 \f
254 /* External sort. */
255
256 /* Maximum number of input + output file handles. */
257 #if defined FOPEN_MAX && (FOPEN_MAX - 5 < 18)
258 #define MAX_FILE_HANDLES        (FOPEN_MAX - 5)
259 #else
260 #define MAX_FILE_HANDLES        18
261 #endif
262
263 #if MAX_FILE_HANDLES < 3
264 #error At least 3 file handles must be available for sorting.
265 #endif
266
267 /* Number of input buffers. */
268 #define N_INPUT_BUFFERS         (MAX_FILE_HANDLES - 1)
269
270 /* Maximum order of merge.  This is the value suggested by Knuth;
271    specifically, he said to use tree selection, which we don't
272    implement, for larger orders of merge. */
273 #define MAX_MERGE_ORDER         7
274
275 /* Minimum total number of records for buffers. */
276 #define MIN_BUFFER_TOTAL_SIZE_RECS      64
277
278 /* Minimum single input or output buffer size, in bytes and records. */
279 #define MIN_BUFFER_SIZE_BYTES   4096
280 #define MIN_BUFFER_SIZE_RECS    16
281
282 /* Structure for replacement selection tree. */
283 struct repl_sel_tree
284   {
285     struct repl_sel_tree *loser;/* Loser associated w/this internal node. */
286     int rn;                     /* Run number of `loser'. */
287     struct repl_sel_tree *fe;   /* Internal node above this external node. */
288     struct repl_sel_tree *fi;   /* Internal node above this internal node. */
289     union value record[1];      /* The case proper. */
290   };
291
292 /* Static variables used for sorting. */
293 static struct repl_sel_tree **x; /* Buffers. */
294 static int x_max;               /* Size of buffers, in records. */
295 static int records_per_buffer;  /* Number of records in each buffer. */
296
297 /* In the merge phase, the first N_INPUT_BUFFERS handle[] elements are
298    input files and the last element is the output file.  Before that,
299    they're all used as output files, although the last one is
300    segregated. */
301 static FILE *handle[MAX_FILE_HANDLES];  /* File handles. */
302
303 /* Now, MAX_FILE_HANDLES is the maximum number of files we will *try*
304    to open.  But if we can't open that many, max_handles will be set
305    to the number we apparently can open. */
306 static int max_handles;         /* Maximum number of handles. */
307
308 /* When we create temporary files, they are all put in the same
309    directory and numbered sequentially from zero.  tmp_basename is the
310    drive/directory, etc., and tmp_extname can be sprintf() with "%08x"
311    to the file number, then tmp_basename used to open the file. */
312 static char *tmp_basename;      /* Temporary file basename. */
313 static char *tmp_extname;       /* Temporary file extension name. */
314
315 /* We use Huffman's method to determine the merge pattern.  This means
316    that we need to know which runs are the shortest at any given time.
317    Priority queues as implemented by heap.c are a natural for this
318    task (probably because I wrote the code specifically for it). */
319 static struct heap *huffman_queue;      /* Huffman priority queue. */
320
321 /* Prototypes for helper functions. */
322 static void sort_stream_write (void);
323 static int write_initial_runs (int separate);
324 static int allocate_cases (void);
325 static int allocate_file_handles (void);
326 static int merge (void);
327 static void rmdir_temp_dir (void);
328
329 /* Performs an external sort of the active file.  A description of the
330    procedure follows.  All page references refer to Knuth's _Art of
331    Computer Programming, Vol. 3: Sorting and Searching_, which is the
332    canonical resource for sorting.
333
334    1. The data is read and S initial runs are formed through the
335    action of algorithm 5.4.1R (replacement selection).
336
337    2. Huffman's method (p. 365-366) is used to determine the optimum
338    merge pattern.
339
340    3. If an OS that supports overlapped reading, writing, and
341    computing is being run, we should use 5.4.6F for forecasting.
342    Otherwise, buffers are filled only when they run out of data.
343    FIXME: Since the author of PSPP uses GNU/Linux, which does not
344    yet implement overlapped r/w/c, 5.4.6F is not used.
345
346    4. We perform P-way merges:
347
348    (a) The desired P is the smallest P such that ceil(ln(S)/ln(P))
349    is minimized.  (FIXME: Since I don't have an algorithm for
350    minimizing this, it's just set to MAX_MERGE_ORDER.)
351
352    (b) P is reduced if the selected value would make input buffers
353    less than 4096 bytes each, or 16 records, whichever is larger.
354
355    (c) P is reduced if we run out of available file handles or space
356    for file handles.
357
358    (d) P is reduced if we don't have space for one or two output
359    buffers, which have the same minimum size as input buffers.  (We
360    need two output buffers if 5.4.6F is in use for forecasting.)  */
361 static int
362 do_external_sort (int separate)
363 {
364   int success = 0;
365
366   assert (MAX_FILE_HANDLES >= 3);
367
368   x = NULL;
369   tmp_basename = NULL;
370
371   huffman_queue = heap_create (512);
372   if (huffman_queue == NULL)
373     return 0;
374
375   if (!allocate_cases ())
376     goto lossage;
377
378   if (!allocate_file_handles ())
379     goto lossage;
380
381   if (!write_initial_runs (separate))
382     goto lossage;
383
384   merge ();
385
386   success = 1;
387
388   /* Despite the name, flow of control comes here regardless of
389      whether or not the sort is successful. */
390 lossage:
391   heap_destroy (huffman_queue);
392
393   if (x)
394     {
395       int i;
396
397       for (i = 0; i <= x_max; i++)
398         free (x[i]);
399       free (x);
400     }
401
402   if (!success)
403     rmdir_temp_dir ();
404
405   return success;
406 }
407
408 #if !HAVE_GETPID
409 #define getpid() (0)
410 #endif
411
412 /* Sets up to open temporary files. */
413 /* PORTME: This creates a directory for temporary files.  Some OSes
414    might not have that concept... */
415 static int
416 allocate_file_handles (void)
417 {
418   const char *dir;              /* Directory prefix. */
419   char *buf;                    /* String buffer. */
420   char *cp;                     /* Pointer into buf. */
421
422   dir = getenv ("SPSSTMPDIR");
423   if (dir == NULL)
424     dir = getenv ("SPSSXTMPDIR");
425   if (dir == NULL)
426     dir = getenv ("TMPDIR");
427 #ifdef P_tmpdir
428   if (dir == NULL)
429     dir = P_tmpdir;
430 #endif
431 #ifdef unix
432   if (dir == NULL)
433     dir = "/tmp";
434 #elif defined (__MSDOS__)
435   if (dir == NULL)
436     dir = getenv ("TEMP");
437   if (dir == NULL)
438     dir = getenv ("TMP");
439   if (dir == NULL)
440     dir = "\\";
441 #else
442   dir = "";
443 #endif
444
445   buf = xmalloc (strlen (dir) + 1 + 4 + 8 + 4 + 1 + INT_DIGITS + 1);
446   cp = spprintf (buf, "%s%c%04lX%04lXpspp", dir, DIR_SEPARATOR,
447                  ((long) time (0)) & 0xffff, ((long) getpid ()) & 0xffff);
448 #ifndef __MSDOS__
449   if (-1 == mkdir (buf, S_IRWXU))
450 #else
451   if (-1 == mkdir (buf))
452 #endif
453     {
454       free (buf);
455       msg (SE, _("%s: Cannot create temporary directory: %s."),
456            buf, strerror (errno));
457       return 0;
458     }
459   *cp++ = DIR_SEPARATOR;
460
461   tmp_basename = buf;
462   tmp_extname = cp;
463
464   max_handles = MAX_FILE_HANDLES;
465
466   return 1;
467 }
468
469 /* Removes the directory created for temporary files, if one exists.
470    Also frees tmp_basename. */
471 static void
472 rmdir_temp_dir (void)
473 {
474   if (NULL == tmp_basename)
475     return;
476
477   tmp_extname[-1] = '\0';
478   if (rmdir (tmp_basename) == -1)
479     msg (SE, _("%s: Error removing directory for temporary files: %s."),
480          tmp_basename, strerror (errno));
481
482   free (tmp_basename);
483 }
484
485 /* Allocates room for lots of cases as a buffer. */
486 static int
487 allocate_cases (void)
488 {
489   /* This is the size of one case. */
490   const int case_size = (sizeof (struct repl_sel_tree)
491                          + dict_get_case_size (default_dict)
492                          - sizeof (union value)
493                          + sizeof (struct repl_sel_tree *));
494
495   x = NULL;
496
497   /* Allocate as many cases as we can, assuming a space of four
498      void pointers for malloc()'s internal bookkeeping. */
499   x_max = MAX_WORKSPACE / (case_size + 4 * sizeof (void *));
500   x = malloc (sizeof (struct repl_sel_tree *) * x_max);
501   if (x != NULL)
502     {
503       int i;
504
505       for (i = 0; i < x_max; i++)
506         {
507           x[i] = malloc (sizeof (struct repl_sel_tree)
508                          + dict_get_case_size (default_dict)
509                          - sizeof (union value));
510           if (x[i] == NULL)
511             break;
512         }
513       x_max = i;
514     }
515   if (x == NULL || x_max < MIN_BUFFER_TOTAL_SIZE_RECS)
516     {
517       if (x != NULL)
518         {
519           int i;
520           
521           for (i = 0; i < x_max; i++)
522             free (x[i]);
523         }
524       free (x);
525       msg (SE, _("Out of memory.  Could not allocate room for minimum of %d "
526                  "cases of %d bytes each.  (PSPP workspace is currently "
527                  "restricted to a maximum of %d KB.)"),
528            MIN_BUFFER_TOTAL_SIZE_RECS, case_size, MAX_WORKSPACE / 1024);
529       x_max = 0;
530       x = NULL;
531       return 0;
532     }
533
534   /* The last element of the array is used to store lastkey. */
535   x_max--;
536
537   debug_printf ((_("allocated %d cases == %d bytes\n"),
538                  x_max, x_max * case_size));
539   return 1;
540 }
541 \f
542 /* Replacement selection. */
543
544 static int rmax, rc, rq;
545 static struct repl_sel_tree *q;
546 static union value *lastkey;
547 static int run_no, file_index;
548 static int deferred_abort;
549 static int run_length;
550
551 static int compare_record (union value *, union value *);
552
553 static inline void
554 output_record (union value *v)
555 {
556   union value *src_case;
557   
558   if (deferred_abort)
559     return;
560
561   if (compaction_necessary)
562     {
563       compact_case (compaction_case, (struct ccase *) v);
564       src_case = (union value *) compaction_case;
565     }
566   else
567     src_case = (union value *) v;
568
569   if ((int) fwrite (src_case, sizeof *src_case, compaction_nval,
570                     handle[file_index])
571       != compaction_nval)
572     {
573       deferred_abort = 1;
574       sprintf (tmp_extname, "%08x", run_no);
575       msg (SE, _("%s: Error writing temporary file: %s."),
576            tmp_basename, strerror (errno));
577       return;
578     }
579
580   run_length++;
581 }
582
583 static int
584 close_handle (int i)
585 {
586   int result = fclose (handle[i]);
587   msg (VM (2), _("SORT: Closing handle %d."), i);
588   
589   handle[i] = NULL;
590   if (EOF == result)
591     {
592       sprintf (tmp_extname, "%08x", i);
593       msg (SE, _("%s: Error closing temporary file: %s."),
594            tmp_basename, strerror (errno));
595       return 0;
596     }
597   return 1;
598 }
599
600 static int
601 close_handles (int beg, int end)
602 {
603   int success = 1;
604   int i;
605
606   for (i = beg; i < end; i++)
607     success &= close_handle (i);
608   return success;
609 }
610
611 static int
612 open_handle_w (int handle_no, int run_no)
613 {
614   sprintf (tmp_extname, "%08x", run_no);
615   msg (VM (1), _("SORT: %s: Opening for writing as run %d."),
616        tmp_basename, run_no);
617
618   /* The `x' modifier causes the GNU C library to insist on creating a
619      new file--if the file already exists, an error is signaled.  The
620      ANSI C standard says that other libraries should ignore anything
621      after the `w+b', so it shouldn't be a problem. */
622   return NULL != (handle[handle_no] = fopen (tmp_basename, "w+bx"));
623 }
624
625 static int
626 open_handle_r (int handle_no, int run_no)
627 {
628   FILE *f;
629
630   sprintf (tmp_extname, "%08x", run_no);
631   msg (VM (1), _("SORT: %s: Opening for writing as run %d."),
632        tmp_basename, run_no);
633   f = handle[handle_no] = fopen (tmp_basename, "rb");
634
635   if (f == NULL)
636     {
637       msg (SE, _("%s: Error opening temporary file for reading: %s."),
638            tmp_basename, strerror (errno));
639       return 0;
640     }
641   
642   return 1;
643 }
644
645 /* Begins a new initial run, specifically its output file. */
646 static void
647 begin_run (void)
648 {
649   /* Decide which handle[] to use.  If run_no is max_handles or
650      greater, then we've run out of handles so it's time to just do
651      one file at a time, which by default is handle 0. */
652   file_index = (run_no < max_handles ? run_no : 0);
653   run_length = 0;
654
655   /* Alright, now create the temporary file. */
656   if (open_handle_w (file_index, run_no) == 0)
657     {
658       /* Failure to create the temporary file.  Check if there are
659          unacceptably few files already open. */
660       if (file_index < 3)
661         {
662           deferred_abort = 1;
663           msg (SE, _("%s: Error creating temporary file: %s."),
664                tmp_basename, strerror (errno));
665           return;
666         }
667
668       /* Close all the open temporary files. */
669       if (!close_handles (0, file_index))
670         return;
671
672       /* Now try again to create the temporary file. */
673       max_handles = file_index;
674       file_index = 0;
675       if (open_handle_w (0, run_no) == 0)
676         {
677           /* It still failed, report it this time. */
678           deferred_abort = 1;
679           msg (SE, _("%s: Error creating temporary file: %s."),
680                tmp_basename, strerror (errno));
681           return;
682         }
683     }
684 }
685
686 /* Ends the current initial run.  Just increments run_no if no initial
687    run has been started yet. */
688 static void
689 end_run (void)
690 {
691   /* Close file handles if necessary. */
692   {
693     int result;
694
695     if (run_no == max_handles - 1)
696       result = close_handles (0, max_handles);
697     else if (run_no >= max_handles)
698       result = close_handle (0);
699     else
700       result = 1;
701     if (!result)
702       deferred_abort = 1;
703   }
704
705   /* Advance to next run. */
706   run_no++;
707   if (run_no)
708     heap_insert (huffman_queue, run_no - 1, run_length);
709 }
710
711 /* Performs 5.4.1R. */
712 static int
713 write_initial_runs (int separate)
714 {
715   run_no = -1;
716   deferred_abort = 0;
717
718   /* Steps R1, R2, R3. */
719   rmax = 0;
720   rc = 0;
721   lastkey = NULL;
722   q = x[0];
723   rq = 0;
724   {
725     int j;
726
727     for (j = 0; j < x_max; j++)
728       {
729         struct repl_sel_tree *J = x[j];
730
731         J->loser = J;
732         J->rn = 0;
733         J->fe = x[(x_max + j) / 2];
734         J->fi = x[j / 2];
735         memset (J->record, 0, dict_get_case_size (default_dict));
736       }
737   }
738
739   /* Most of the iterations of steps R4, R5, R6, R7, R2, R3, ... */
740   if (!separate)
741     {
742       if (vfm_sink)
743         vfm_sink->destroy_sink ();
744       vfm_sink = &sort_stream;
745     }
746   procedure (NULL, NULL, NULL, NULL);
747
748   /* Final iterations of steps R4, R5, R6, R7, R2, R3, ... */
749   for (;;)
750     {
751       struct repl_sel_tree *t;
752
753       /* R4. */
754       rq = rmax + 1;
755
756       /* R5. */
757       t = q->fe;
758
759       /* R6 and R7. */
760       for (;;)
761         {
762           /* R6. */
763           if (t->rn < rq
764               || (t->rn == rq
765                   && compare_record (t->loser->record, q->record) < 0))
766             {
767               struct repl_sel_tree *temp_tree;
768               int temp_int;
769
770               temp_tree = t->loser;
771               t->loser = q;
772               q = temp_tree;
773
774               temp_int = t->rn;
775               t->rn = rq;
776               rq = temp_int;
777             }
778
779           /* R7. */
780           if (t == x[1])
781             break;
782           t = t->fi;
783         }
784
785       /* R2. */
786       if (rq != rc)
787         {
788           end_run ();
789           if (rq > rmax)
790             break;
791           begin_run ();
792           rc = rq;
793         }
794
795       /* R3. */
796       if (rq != 0)
797         {
798           output_record (q->record);
799           lastkey = x[x_max]->record;
800           memcpy (lastkey, q->record, sizeof (union value) * vfm_sink_info.nval);
801         }
802     }
803   assert (run_no == rmax);
804
805   /* If an unrecoverable error occurred somewhere in the above code,
806      then the `deferred_abort' flag would have been set.  */
807   if (deferred_abort)
808     {
809       int i;
810
811       for (i = 0; i < max_handles; i++)
812         if (handle[i] != NULL)
813           {
814             sprintf (tmp_extname, "%08x", i);
815
816             if (fclose (handle[i]) == EOF)
817               msg (SE, _("%s: Error closing temporary file: %s."),
818                    tmp_basename, strerror (errno));
819
820             if (remove (tmp_basename) != 0)
821               msg (SE, _("%s: Error removing temporary file: %s."),
822                    tmp_basename, strerror (errno));
823
824             handle[i] = NULL;
825           }
826       return 0;
827     }
828
829   return 1;
830 }
831
832 /* Compares the NV_SORT variables in V_SORT[] between the `value's at
833    A and B, and returns a strcmp()-type result. */
834 static int
835 compare_record (union value * a, union value * b)
836 {
837   int i;
838   int result = 0;
839   struct variable *v;
840
841   assert (a != NULL);
842   if (b == NULL)                /* Sort NULLs after everything else. */
843     return -1;
844
845   for (i = 0; i < nv_sort; i++)
846     {
847       v = v_sort[i];
848
849       if (v->type == NUMERIC)
850         {
851           if (a[v->fv].f != b[v->fv].f)
852             {
853               result = (a[v->fv].f > b[v->fv].f) ? 1 : -1;
854               break;
855             }
856         }
857       else
858         {
859           result = memcmp (a[v->fv].s, b[v->fv].s, v->width);
860           if (result != 0)
861             break;
862         }
863     }
864
865   if (v->p.srt.order == SRT_ASCEND)
866     return result;
867   else
868     {
869       assert (v->p.srt.order == SRT_DESCEND);
870       return -result;
871     }
872 }
873 \f
874 /* Merging. */
875
876 static int merge_once (int run_index[], int run_length[], int n_runs);
877
878 /* Modula function as defined by Knuth. */
879 static int
880 mod (int x, int y)
881 {
882   int result;
883
884   if (y == 0)
885     return x;
886   result = abs (x) % abs (y);
887   if (y < 0)
888     result = -result;
889   return result;
890 }
891
892 /* Performs a series of P-way merges of initial runs using Huffman's
893    method. */
894 static int
895 merge (void)
896 {
897   /* Order of merge. */
898   int order;
899
900   /* Idiot check. */
901   assert (MIN_BUFFER_SIZE_RECS * 2 <= MIN_BUFFER_TOTAL_SIZE_RECS - 1);
902
903   /* Close all the input files.  I hope that the boundary conditions
904      are correct on this but I'm not sure. */
905   if (run_no < max_handles)
906     {
907       int i;
908
909       for (i = 0; i < run_no; )
910         if (!close_handle (i++))
911           {
912             for (; i < run_no; i++)
913               close_handle (i);
914             return 0;
915           }
916     }
917
918   /* Determine order of merge. */
919   order = MAX_MERGE_ORDER;
920   if (x_max / order < MIN_BUFFER_SIZE_RECS)
921     order = x_max / MIN_BUFFER_SIZE_RECS;
922   else if (x_max / order * dict_get_case_size (default_dict)
923            < MIN_BUFFER_SIZE_BYTES)
924     order = x_max / (MIN_BUFFER_SIZE_BYTES
925                      / dict_get_case_size (default_dict));
926
927   /* Make sure the order of merge is bounded. */
928   if (order < 2)
929     order = 2;
930   if (order > rmax)
931     order = rmax;
932   assert (x_max / order > 0);
933
934   /* Calculate number of records per buffer. */
935   records_per_buffer = x_max / order;
936
937   /* Add (1 - S) mod (P - 1) dummy runs of length 0. */
938   {
939     int n_dummy_runs = mod (1 - rmax, order - 1);
940     debug_printf (("rmax=%d, order=%d, n_dummy_runs=%d\n",
941                    rmax, order, n_dummy_runs));
942     assert (n_dummy_runs >= 0);
943     while (n_dummy_runs--)
944       {
945         heap_insert (huffman_queue, -2, 0);
946         rmax++;
947       }
948   }
949
950   /* Repeatedly merge the P shortest existing runs until only one run
951      is left. */
952   while (rmax > 1)
953     {
954       int run_index[MAX_MERGE_ORDER];
955       int run_length[MAX_MERGE_ORDER];
956       int total_run_length = 0;
957       int i;
958
959       assert (rmax >= order);
960
961       /* Find the shortest runs; put them in runs[] in reverse order
962          of length, to force dummy runs of length 0 to the end of the
963          list. */
964       debug_printf ((_("merging runs")));
965       for (i = order - 1; i >= 0; i--)
966         {
967           run_index[i] = heap_delete (huffman_queue, &run_length[i]);
968           assert (run_index[i] != -1);
969           total_run_length += run_length[i];
970           debug_printf ((" %d(%d)", run_index[i], run_length[i]));
971         }
972       debug_printf ((_(" into run %d(%d)\n"), run_no, total_run_length));
973
974       if (!merge_once (run_index, run_length, order))
975         {
976           int index;
977
978           while (-1 != (index = heap_delete (huffman_queue, NULL)))
979             {
980               sprintf (tmp_extname, "%08x", index);
981               if (remove (tmp_basename) != 0)
982                 msg (SE, _("%s: Error removing temporary file: %s."),
983                      tmp_basename, strerror (errno));
984             }
985
986           return 0;
987         }
988
989       if (!heap_insert (huffman_queue, run_no++, total_run_length))
990         {
991           msg (SE, _("Out of memory expanding Huffman priority queue."));
992           return 0;
993         }
994
995       rmax -= order - 1;
996     }
997
998   /* There should be exactly one element in the priority queue after
999      all that merging.  This represents the entire sorted active file.
1000      So we could find a total case count by deleting this element from
1001      the queue. */
1002   assert (heap_size (huffman_queue) == 1);
1003
1004   return 1;
1005 }
1006
1007 /* Merges N_RUNS initial runs into a new run.  The jth run for 0 <= j
1008    < N_RUNS is taken from temporary file RUN_INDEX[j]; it is composed
1009    of RUN_LENGTH[j] cases. */
1010 static int
1011 merge_once (int run_index[], int run_length[], int n_runs)
1012 {
1013   /* For each run, the number of records remaining in its buffer. */
1014   int buffered[MAX_MERGE_ORDER];
1015
1016   /* For each run, the index of the next record in the buffer. */
1017   int buffer_ptr[MAX_MERGE_ORDER];
1018
1019   /* Open input files. */
1020   {
1021     int i;
1022
1023     for (i = 0; i < n_runs; i++)
1024       if (run_index[i] != -2 && !open_handle_r (i, run_index[i]))
1025         {
1026           /* Close and remove temporary files. */
1027           while (i--)
1028             {
1029               close_handle (i);
1030               sprintf (tmp_extname, "%08x", i);
1031               if (remove (tmp_basename) != 0)
1032                 msg (SE, _("%s: Error removing temporary file: %s."),
1033                      tmp_basename, strerror (errno));
1034             }
1035
1036           return 0;
1037         }
1038   }
1039
1040   /* Create output file. */
1041   if (!open_handle_w (N_INPUT_BUFFERS, run_no))
1042     {
1043       msg (SE, _("%s: Error creating temporary file for merge: %s."),
1044            tmp_basename, strerror (errno));
1045       goto lossage;
1046     }
1047
1048   /* Prime each buffer. */
1049   {
1050     int i;
1051
1052     for (i = 0; i < n_runs; i++)
1053       if (run_index[i] == -2)
1054         {
1055           n_runs = i;
1056           break;
1057         }
1058       else
1059         {
1060           int j;
1061           int ofs = records_per_buffer * i;
1062
1063           buffered[i] = min (records_per_buffer, run_length[i]);
1064           for (j = 0; j < buffered[i]; j++)
1065             if (fread (x[j + ofs]->record,
1066                        dict_get_case_size (default_dict), 1, handle[i]) != 1)
1067               {
1068                 sprintf (tmp_extname, "%08x", run_index[i]);
1069                 if (ferror (handle[i]))
1070                   msg (SE, _("%s: Error reading temporary file in merge: %s."),
1071                        tmp_basename, strerror (errno));
1072                 else
1073                   msg (SE, _("%s: Unexpected end of temporary file in merge."),
1074                        tmp_basename);
1075                 goto lossage;
1076               }
1077           buffer_ptr[i] = ofs;
1078           run_length[i] -= buffered[i];
1079         }
1080   }
1081
1082   /* Perform the merge proper. */
1083   while (n_runs)                /* Loop while some data is left. */
1084     {
1085       int i;
1086       int min = 0;
1087
1088       for (i = 1; i < n_runs; i++)
1089         if (compare_record (x[buffer_ptr[min]]->record,
1090                             x[buffer_ptr[i]]->record) > 0)
1091           min = i;
1092
1093       if (fwrite (x[buffer_ptr[min]]->record,
1094                   dict_get_case_size (default_dict), 1,
1095                   handle[N_INPUT_BUFFERS]) != 1)
1096         {
1097           sprintf (tmp_extname, "%08x", run_index[i]);
1098           msg (SE, _("%s: Error writing temporary file in "
1099                "merge: %s."), tmp_basename, strerror (errno));
1100           goto lossage;
1101         }
1102
1103       /* Remove one case from the buffer for this input file. */
1104       if (--buffered[min] == 0)
1105         {
1106           /* The input buffer is empty.  Do any cases remain in the
1107              initial run on disk? */
1108           if (run_length[min])
1109             {
1110               /* Yes.  Read them in. */
1111
1112               int j;
1113               int ofs;
1114
1115               /* Reset the buffer pointer.  Note that we can't simply
1116                  set it to (i * records_per_buffer) since the run
1117                  order might have changed. */
1118               ofs = buffer_ptr[min] -= buffer_ptr[min] % records_per_buffer;
1119
1120               buffered[min] = min (records_per_buffer, run_length[min]);
1121               for (j = 0; j < buffered[min]; j++)
1122                 if (fread (x[j + ofs]->record,
1123                            dict_get_case_size (default_dict), 1, handle[min])
1124                     != 1)
1125                   {
1126                     sprintf (tmp_extname, "%08x", run_index[min]);
1127                     if (ferror (handle[min]))
1128                       msg (SE, _("%s: Error reading temporary file in "
1129                                  "merge: %s."),
1130                            tmp_basename, strerror (errno));
1131                     else
1132                       msg (SE, _("%s: Unexpected end of temporary file "
1133                                  "in merge."),
1134                            tmp_basename);
1135                     goto lossage;
1136                   }
1137               run_length[min] -= buffered[min];
1138             }
1139           else
1140             {
1141               /* No.  Delete this run. */
1142
1143               /* Close the file. */
1144               FILE *f = handle[min];
1145               handle[min] = NULL;
1146               sprintf (tmp_extname, "%08x", run_index[min]);
1147               if (fclose (f) == EOF)
1148                 msg (SE, _("%s: Error closing temporary file in merge: "
1149                      "%s."), tmp_basename, strerror (errno));
1150
1151               /* Delete the file. */
1152               if (remove (tmp_basename) != 0)
1153                 msg (SE, _("%s: Error removing temporary file in merge: "
1154                      "%s."), tmp_basename, strerror (errno));
1155
1156               n_runs--;
1157               if (min != n_runs)
1158                 {
1159                   /* Since this isn't the last run, we move the last
1160                      run into its spot to force all the runs to be
1161                      contiguous. */
1162                   run_index[min] = run_index[n_runs];
1163                   run_length[min] = run_length[n_runs];
1164                   buffer_ptr[min] = buffer_ptr[n_runs];
1165                   buffered[min] = buffered[n_runs];
1166                   handle[min] = handle[n_runs];
1167                 }
1168             }
1169         }
1170       else
1171         buffer_ptr[min]++;
1172     }
1173
1174   /* Close output file. */
1175   {
1176     FILE *f = handle[N_INPUT_BUFFERS];
1177     handle[N_INPUT_BUFFERS] = NULL;
1178     if (fclose (f) == EOF)
1179       {
1180         sprintf (tmp_extname, "%08x", run_no);
1181         msg (SE, _("%s: Error closing temporary file in merge: "
1182                    "%s."),
1183              tmp_basename, strerror (errno));
1184         return 0;
1185       }
1186   }
1187
1188   return 1;
1189
1190 lossage:
1191   /* Close all the input and output files. */
1192   {
1193     int i;
1194
1195     for (i = 0; i < n_runs; i++)
1196       if (run_length[i] != 0)
1197         {
1198           close_handle (i);
1199           sprintf (tmp_basename, "%08x", run_index[i]);
1200           if (remove (tmp_basename) != 0)
1201             msg (SE, _("%s: Error removing temporary file: %s."),
1202                  tmp_basename, strerror (errno));
1203         }
1204   }
1205   close_handle (N_INPUT_BUFFERS);
1206   sprintf (tmp_basename, "%08x", run_no);
1207   if (remove (tmp_basename) != 0)
1208     msg (SE, _("%s: Error removing temporary file: %s."),
1209          tmp_basename, strerror (errno));
1210   return 0;
1211 }
1212 \f
1213 /* External sort input program. */
1214
1215 /* Reads all the records from the source stream and passes them
1216    to write_case(). */
1217 static void
1218 sort_stream_read (write_case_func *write_case, write_case_data wc_data)
1219 {
1220   read_sort_output (write_case, wc_data);
1221 }
1222
1223 /* Reads all the records from the output stream and passes them to the
1224    function provided, which must have an interface identical to
1225    write_case(). */
1226 void
1227 read_sort_output (write_case_func *write_case, write_case_data wc_data)
1228 {
1229   int i;
1230   FILE *f;
1231
1232   if (separate_case_tab)
1233     {
1234       struct ccase *save_temp_case = temp_case;
1235       struct case_list **p;
1236
1237       for (p = separate_case_tab; *p; p++)
1238         {
1239           temp_case = &(*p)->c;
1240           write_case (wc_data);
1241         }
1242       
1243       free (separate_case_tab);
1244       separate_case_tab = NULL;
1245             
1246       temp_case = save_temp_case;
1247     } else {
1248       sprintf (tmp_extname, "%08x", run_no - 1);
1249       f = fopen (tmp_basename, "rb");
1250       if (!f)
1251         {
1252           msg (ME, _("%s: Cannot open sort result file: %s."), tmp_basename,
1253                strerror (errno));
1254           err_failure ();
1255           return;
1256         }
1257
1258       for (i = 0; i < vfm_source_info.ncases; i++)
1259         {
1260           if (!fread (temp_case, vfm_source_info.case_size, 1, f))
1261             {
1262               if (ferror (f))
1263                 msg (ME, _("%s: Error reading sort result file: %s."),
1264                      tmp_basename, strerror (errno));
1265               else
1266                 msg (ME, _("%s: Unexpected end of sort result file: %s."),
1267                      tmp_basename, strerror (errno));
1268               err_failure ();
1269               break;
1270             }
1271
1272           if (!write_case (wc_data))
1273             break;
1274         }
1275
1276       if (fclose (f) == EOF)
1277         msg (ME, _("%s: Error closing sort result file: %s."), tmp_basename,
1278              strerror (errno));
1279
1280       if (remove (tmp_basename) != 0)
1281         msg (ME, _("%s: Error removing sort result file: %s."), tmp_basename,
1282              strerror (errno));
1283       else
1284         rmdir_temp_dir ();
1285     }
1286 }
1287
1288 #if 0 /* dead code */
1289 /* Alternate interface to sort_stream_write used for external sorting
1290    when SEPARATE is true. */
1291 static int
1292 write_separate (struct ccase *c)
1293 {
1294   assert (c == temp_case);
1295
1296   sort_stream_write ();
1297   return 1;
1298 }
1299 #endif
1300
1301 /* Performs one iteration of 5.4.1R steps R4, R5, R6, R7, R2, and
1302    R3. */
1303 static void
1304 sort_stream_write (void)
1305 {
1306   struct repl_sel_tree *t;
1307
1308   /* R4. */
1309   memcpy (q->record, temp_case->data, vfm_sink_info.case_size);
1310   if (compare_record (q->record, lastkey) < 0)
1311     if (++rq > rmax)
1312       rmax = rq;
1313
1314   /* R5. */
1315   t = q->fe;
1316
1317   /* R6 and R7. */
1318   for (;;)
1319     {
1320       /* R6. */
1321       if (t->rn < rq
1322           || (t->rn == rq && compare_record (t->loser->record, q->record) < 0))
1323         {
1324           struct repl_sel_tree *temp_tree;
1325           int temp_int;
1326
1327           temp_tree = t->loser;
1328           t->loser = q;
1329           q = temp_tree;
1330
1331           temp_int = t->rn;
1332           t->rn = rq;
1333           rq = temp_int;
1334         }
1335
1336       /* R7. */
1337       if (t == x[1])
1338         break;
1339       t = t->fi;
1340     }
1341
1342   /* R2. */
1343   if (rq != rc)
1344     {
1345       end_run ();
1346       begin_run ();
1347       assert (rq <= rmax);
1348       rc = rq;
1349     }
1350
1351   /* R3. */
1352   if (rq != 0)
1353     {
1354       output_record (q->record);
1355       lastkey = x[x_max]->record;
1356       memcpy (lastkey, q->record, vfm_sink_info.case_size);
1357     }
1358 }
1359
1360 /* Switches mode from sink to source. */
1361 static void
1362 sort_stream_mode (void)
1363 {
1364   /* If this is not done, then we get the following source/sink pairs:
1365      source=memory/disk/DATA LIST/etc., sink=SORT; source=SORT,
1366      sink=SORT; which is not good. */
1367   vfm_sink = NULL;
1368 }
1369
1370 struct case_stream sort_stream =
1371   {
1372     NULL,
1373     sort_stream_read,
1374     sort_stream_write,
1375     sort_stream_mode,
1376     NULL,
1377     NULL,
1378     "SORT",
1379   };