Fri Dec 19 16:44:22 2003 Ben Pfaff <blp@gnu.org>
[pspp-builds.git] / src / sort.c
1 /* PSPP - computes sample statistics.
2    Copyright (C) 1997-9, 2000 Free Software Foundation, Inc.
3    Written by Ben Pfaff <blp@gnu.org>.
4
5    This program is free software; you can redistribute it and/or
6    modify it under the terms of the GNU General Public License as
7    published by the Free Software Foundation; either version 2 of the
8    License, or (at your option) any later version.
9
10    This program is distributed in the hope that it will be useful, but
11    WITHOUT ANY WARRANTY; without even the implied warranty of
12    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13    General Public License for more details.
14
15    You should have received a copy of the GNU General Public License
16    along with this program; if not, write to the Free Software
17    Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA
18    02111-1307, USA. */
19
20 #include <config.h>
21 #include <assert.h>
22 #include <stdio.h>
23 #include <stdlib.h>
24 #include <errno.h>
25 #include "alloc.h"
26 #include "approx.h"
27 #include "command.h"
28 #include "error.h"
29 #include "expr.h"
30 #include "heap.h"
31 #include "lexer.h"
32 #include "misc.h"
33 #include "sort.h"
34 #include "str.h"
35 #include "var.h"
36 #include "vfm.h"
37 #include "vfmP.h"
38
39 #if HAVE_UNISTD_H
40 #include <unistd.h>
41 #endif
42
43 #if HAVE_SYS_TYPES_H
44 #include <sys/types.h>
45 #endif
46
47 #if HAVE_SYS_STAT_H
48 #include <sys/stat.h>
49 #endif
50
51 #include "debug-print.h"
52
53 /* Variables to sort. */
54 struct variable **v_sort;
55 int nv_sort;
56
57 /* Used when internal-sorting to a separate file. */
58 static struct case_list **separate_case_tab;
59
60 /* Other prototypes. */
61 static int compare_case_lists (const void *, const void *);
62 static int do_internal_sort (int separate);
63 static int do_external_sort (int separate);
64 int parse_sort_variables (void);
65 void read_sort_output (int (*write_case) (void));
66
67 /* Performs the SORT CASES procedures. */
68 int
69 cmd_sort_cases (void)
70 {
71   /* First, just parse the command. */
72   lex_match_id ("SORT");
73   lex_match_id ("CASES");
74   lex_match (T_BY);
75
76   if (!parse_sort_variables ())
77     return CMD_FAILURE;
78       
79   cancel_temporary ();
80
81   /* Then it's time to do the actual work.  There are two cases:
82
83      (internal sort) All the data is in memory.  In this case, we
84      perform an EXECUTE to get the data into the desired form, then
85      sort the cases in place, if it is still all in memory.
86
87      (external sort) The data is not in memory.  It may be coming from
88      a system file or other data file, etc.  In any case, it is now
89      time to perform an external sort.  This is better explained in
90      do_external_sort(). */
91
92   /* Do all this dirty work. */
93   {
94     int success = sort_cases (0);
95     free (v_sort);
96     if (success)
97       return lex_end_of_command ();
98     else
99       return CMD_FAILURE;
100   }
101 }
102
103 /* Parses a list of sort variables into v_sort and nv_sort.  */
104 int
105 parse_sort_variables (void)
106 {
107   v_sort = NULL;
108   nv_sort = 0;
109   do
110     {
111       int prev_nv_sort = nv_sort;
112       int order = SRT_ASCEND;
113
114       if (!parse_variables (&default_dict, &v_sort, &nv_sort,
115                             PV_NO_DUPLICATE | PV_APPEND | PV_NO_SCRATCH))
116         return 0;
117       if (lex_match ('('))
118         {
119           if (lex_match_id ("D") || lex_match_id ("DOWN"))
120             order = SRT_DESCEND;
121           else if (!lex_match_id ("A") && !lex_match_id ("UP"))
122             {
123               free (v_sort);
124               msg (SE, _("`A' or `D' expected inside parentheses."));
125               return 0;
126             }
127           if (!lex_match (')'))
128             {
129               free (v_sort);
130               msg (SE, _("`)' expected."));
131               return 0;
132             }
133         }
134       for (; prev_nv_sort < nv_sort; prev_nv_sort++)
135         v_sort[prev_nv_sort]->p.srt.order = order;
136     }
137   while (token != '.' && token != '/');
138   
139   return 1;
140 }
141
142 /* Sorts the active file based on the key variables specified in
143    global variables v_sort and nv_sort.  The output is either to the
144    active file, if SEPARATE is zero, or to a separate file, if
145    SEPARATE is nonzero.  In the latter case the output cases can be
146    read with a call to read_sort_output().  (In the former case the
147    output cases should be dealt with through the usual vfm interface.)
148
149    The caller is responsible for freeing v_sort[]. */
150 int
151 sort_cases (int separate)
152 {
153   assert (separate_case_tab == NULL);
154
155   /* Not sure this is necessary but it's good to be safe. */
156   if (separate && vfm_source == &sort_stream)
157     procedure (NULL, NULL, NULL);
158   
159   /* SORT CASES cancels PROCESS IF. */
160   expr_free (process_if_expr);
161   process_if_expr = NULL;
162
163   if (do_internal_sort (separate))
164     return 1;
165
166   page_to_disk ();
167   return do_external_sort (separate);
168 }
169
170 /* If a reasonable situation is set up, do an internal sort of the
171    data.  Return success. */
172 static int
173 do_internal_sort (int separate)
174 {
175   if (vfm_source != &vfm_disk_stream)
176     {
177       if (vfm_source != &vfm_memory_stream)
178         procedure (NULL, NULL, NULL);
179       if (vfm_source == &vfm_memory_stream)
180         {
181           struct case_list **case_tab = malloc (sizeof *case_tab
182                                          * (vfm_source_info.ncases + 1));
183           if (vfm_source_info.ncases == 0)
184             {
185               free (case_tab);
186               return 1;
187             }
188           if (case_tab != NULL)
189             {
190               struct case_list *clp = memory_source_cases;
191               struct case_list **ctp = case_tab;
192               int i;
193
194               for (; clp; clp = clp->next)
195                 *ctp++ = clp;
196               qsort (case_tab, vfm_source_info.ncases, sizeof *case_tab,
197                      compare_case_lists);
198
199               if (!separate)
200                 {
201                   memory_source_cases = case_tab[0];
202                   for (i = 1; i < vfm_source_info.ncases; i++)
203                     case_tab[i - 1]->next = case_tab[i];
204                   case_tab[vfm_source_info.ncases - 1]->next = NULL;
205                   free (case_tab);
206                 } else {
207                   case_tab[vfm_source_info.ncases] = NULL;
208                   separate_case_tab = case_tab;
209                 }
210               
211               return 1;
212             }
213         }
214     }
215   return 0;
216 }
217
218 /* Compares the NV_SORT variables in V_SORT[] between the `case_list's
219    at _A and _B, and returns a strcmp()-type result. */
220 static int
221 compare_case_lists (const void *pa, const void *pb)
222 {
223   struct case_list *a = *(struct case_list **) pa;
224   struct case_list *b = *(struct case_list **) pb;
225   struct variable *v;
226   int result = 0;
227   int i;
228
229   for (i = 0; i < nv_sort; i++)
230     {
231       v = v_sort[i];
232       
233       if (v->type == NUMERIC)
234         {
235           if (approx_ne (a->c.data[v->fv].f, b->c.data[v->fv].f))
236             {
237               result = (a->c.data[v->fv].f > b->c.data[v->fv].f) ? 1 : -1;
238               break;
239             }
240         }
241       else
242         {
243           result = memcmp (a->c.data[v->fv].s, b->c.data[v->fv].s, v->width);
244           if (result != 0)
245             break;
246         }
247     }
248
249   if (v->p.srt.order == SRT_ASCEND)
250     return result;
251   else
252     {
253       assert (v->p.srt.order == SRT_DESCEND);
254       return -result;
255     }
256 }
257 \f
258 /* External sort. */
259
260 /* Maximum number of input + output file handles. */
261 #if defined FOPEN_MAX && (FOPEN_MAX - 5 < 18)
262 #define MAX_FILE_HANDLES        (FOPEN_MAX - 5)
263 #else
264 #define MAX_FILE_HANDLES        18
265 #endif
266
267 #if MAX_FILE_HANDLES < 3
268 #error At least 3 file handles must be available for sorting.
269 #endif
270
271 /* Number of input buffers. */
272 #define N_INPUT_BUFFERS         (MAX_FILE_HANDLES - 1)
273
274 /* Maximum order of merge.  This is the value suggested by Knuth;
275    specifically, he said to use tree selection, which we don't
276    implement, for larger orders of merge. */
277 #define MAX_MERGE_ORDER         7
278
279 /* Minimum total number of records for buffers. */
280 #define MIN_BUFFER_TOTAL_SIZE_RECS      64
281
282 /* Minimum single input or output buffer size, in bytes and records. */
283 #define MIN_BUFFER_SIZE_BYTES   4096
284 #define MIN_BUFFER_SIZE_RECS    16
285
286 /* Structure for replacement selection tree. */
287 struct repl_sel_tree
288   {
289     struct repl_sel_tree *loser;/* Loser associated w/this internal node. */
290     int rn;                     /* Run number of `loser'. */
291     struct repl_sel_tree *fe;   /* Internal node above this external node. */
292     struct repl_sel_tree *fi;   /* Internal node above this internal node. */
293     union value record[1];      /* The case proper. */
294   };
295
296 /* Static variables used for sorting. */
297 static struct repl_sel_tree **x; /* Buffers. */
298 static int x_max;               /* Size of buffers, in records. */
299 static int records_per_buffer;  /* Number of records in each buffer. */
300
301 /* In the merge phase, the first N_INPUT_BUFFERS handle[] elements are
302    input files and the last element is the output file.  Before that,
303    they're all used as output files, although the last one is
304    segregated. */
305 static FILE *handle[MAX_FILE_HANDLES];  /* File handles. */
306
307 /* Now, MAX_FILE_HANDLES is the maximum number of files we will *try*
308    to open.  But if we can't open that many, max_handles will be set
309    to the number we apparently can open. */
310 static int max_handles;         /* Maximum number of handles. */
311
312 /* When we create temporary files, they are all put in the same
313    directory and numbered sequentially from zero.  tmp_basename is the
314    drive/directory, etc., and tmp_extname can be sprintf() with "%08x"
315    to the file number, then tmp_basename used to open the file. */
316 static char *tmp_basename;      /* Temporary file basename. */
317 static char *tmp_extname;       /* Temporary file extension name. */
318
319 /* We use Huffman's method to determine the merge pattern.  This means
320    that we need to know which runs are the shortest at any given time.
321    Priority queues as implemented by heap.c are a natural for this
322    task (probably because I wrote the code specifically for it). */
323 static struct heap *huffman_queue;      /* Huffman priority queue. */
324
325 /* Prototypes for helper functions. */
326 static void sort_stream_write (void);
327 static int write_initial_runs (int separate);
328 static int allocate_cases (void);
329 static int allocate_file_handles (void);
330 static int merge (void);
331 static void rmdir_temp_dir (void);
332
333 /* Performs an external sort of the active file.  A description of the
334    procedure follows.  All page references refer to Knuth's _Art of
335    Computer Programming, Vol. 3: Sorting and Searching_, which is the
336    canonical resource for sorting.
337
338    1. The data is read and S initial runs are formed through the
339    action of algorithm 5.4.1R (replacement selection).
340
341    2. Huffman's method (p. 365-366) is used to determine the optimum
342    merge pattern.
343
344    3. If an OS that supports overlapped reading, writing, and
345    computing is being run, we should use 5.4.6F for forecasting.
346    Otherwise, buffers are filled only when they run out of data.
347    FIXME: Since the author of PSPP uses GNU/Linux, which does not
348    yet implement overlapped r/w/c, 5.4.6F is not used.
349
350    4. We perform P-way merges:
351
352    (a) The desired P is the smallest P such that ceil(ln(S)/ln(P))
353    is minimized.  (FIXME: Since I don't have an algorithm for
354    minimizing this, it's just set to MAX_MERGE_ORDER.)
355
356    (b) P is reduced if the selected value would make input buffers
357    less than 4096 bytes each, or 16 records, whichever is larger.
358
359    (c) P is reduced if we run out of available file handles or space
360    for file handles.
361
362    (d) P is reduced if we don't have space for one or two output
363    buffers, which have the same minimum size as input buffers.  (We
364    need two output buffers if 5.4.6F is in use for forecasting.)  */
365 static int
366 do_external_sort (int separate)
367 {
368   int success = 0;
369
370   assert (MAX_FILE_HANDLES >= 3);
371
372   x = NULL;
373   tmp_basename = NULL;
374
375   huffman_queue = heap_create (512);
376   if (huffman_queue == NULL)
377     return 0;
378
379   if (!allocate_cases ())
380     goto lossage;
381
382   if (!allocate_file_handles ())
383     goto lossage;
384
385   if (!write_initial_runs (separate))
386     goto lossage;
387
388   merge ();
389
390   success = 1;
391
392   /* Despite the name, flow of control comes here regardless of
393      whether or not the sort is successful. */
394 lossage:
395   heap_destroy (huffman_queue);
396
397   if (x)
398     {
399       int i;
400
401       for (i = 0; i <= x_max; i++)
402         free (x[i]);
403       free (x);
404     }
405
406   if (!success)
407     rmdir_temp_dir ();
408
409   return success;
410 }
411
412 #if !HAVE_GETPID
413 #define getpid() (0)
414 #endif
415
416 /* Sets up to open temporary files. */
417 /* PORTME: This creates a directory for temporary files.  Some OSes
418    might not have that concept... */
419 static int
420 allocate_file_handles (void)
421 {
422   const char *dir;              /* Directory prefix. */
423   char *buf;                    /* String buffer. */
424   char *cp;                     /* Pointer into buf. */
425
426   dir = getenv ("SPSSTMPDIR");
427   if (dir == NULL)
428     dir = getenv ("SPSSXTMPDIR");
429   if (dir == NULL)
430     dir = getenv ("TMPDIR");
431 #ifdef P_tmpdir
432   if (dir == NULL)
433     dir = P_tmpdir;
434 #endif
435 #if __unix__
436   if (dir == NULL)
437     dir = "/tmp";
438 #elif __MSDOS__
439   if (dir == NULL)
440     dir = getenv ("TEMP");
441   if (dir == NULL)
442     dir = getenv ("TMP");
443   if (dir == NULL)
444     dir = "\\";
445 #else
446   dir = "";
447 #endif
448
449   buf = xmalloc (strlen (dir) + 1 + 4 + 8 + 4 + 1 + INT_DIGITS + 1);
450   cp = spprintf (buf, "%s%c%04lX%04lXpspp", dir, DIR_SEPARATOR,
451                  ((long) time (0)) & 0xffff, ((long) getpid ()) & 0xffff);
452   if (-1 == mkdir (buf, S_IRWXU))
453     {
454       free (buf);
455       msg (SE, _("%s: Cannot create temporary directory: %s."),
456            buf, strerror (errno));
457       return 0;
458     }
459   *cp++ = DIR_SEPARATOR;
460
461   tmp_basename = buf;
462   tmp_extname = cp;
463
464   max_handles = MAX_FILE_HANDLES;
465
466   return 1;
467 }
468
469 /* Removes the directory created for temporary files, if one exists.
470    Also frees tmp_basename. */
471 static void
472 rmdir_temp_dir (void)
473 {
474   if (NULL == tmp_basename)
475     return;
476
477   tmp_extname[-1] = '\0';
478   if (rmdir (tmp_basename) == -1)
479     msg (SE, _("%s: Error removing directory for temporary files: %s."),
480          tmp_basename, strerror (errno));
481
482   free (tmp_basename);
483 }
484
485 /* Allocates room for lots of cases as a buffer. */
486 static int
487 allocate_cases (void)
488 {
489   /* This is the size of one case. */
490   const int case_size = (sizeof (struct repl_sel_tree)
491                          + sizeof (union value) * (default_dict.nval - 1)
492                          + sizeof (struct repl_sel_tree *));
493
494   x = NULL;
495
496   /* Allocate as many cases as we can, assuming a space of four
497      void pointers for malloc()'s internal bookkeeping. */
498   x_max = MAX_WORKSPACE / (case_size + 4 * sizeof (void *));
499   x = malloc (sizeof (struct repl_sel_tree *) * x_max);
500   if (x != NULL)
501     {
502       int i;
503
504       for (i = 0; i < x_max; i++)
505         {
506           x[i] = malloc (sizeof (struct repl_sel_tree)
507                          + sizeof (union value) * (default_dict.nval - 1));
508           if (x[i] == NULL)
509             break;
510         }
511       x_max = i;
512     }
513   if (x == NULL || x_max < MIN_BUFFER_TOTAL_SIZE_RECS)
514     {
515       if (x != NULL)
516         {
517           int i;
518           
519           for (i = 0; i < x_max; i++)
520             free (x[i]);
521         }
522       free (x);
523       msg (SE, _("Out of memory.  Could not allocate room for minimum of %d "
524                  "cases of %d bytes each.  (PSPP workspace is currently "
525                  "restricted to a maximum of %d KB.)"),
526            MIN_BUFFER_TOTAL_SIZE_RECS, case_size, MAX_WORKSPACE / 1024);
527       x_max = 0;
528       x = NULL;
529       return 0;
530     }
531
532   /* The last element of the array is used to store lastkey. */
533   x_max--;
534
535   debug_printf ((_("allocated %d cases == %d bytes\n"),
536                  x_max, x_max * case_size));
537   return 1;
538 }
539 \f
540 /* Replacement selection. */
541
542 static int rmax, rc, rq;
543 static struct repl_sel_tree *q;
544 static union value *lastkey;
545 static int run_no, file_index;
546 static int deferred_abort;
547 static int run_length;
548
549 static int compare_record (union value *, union value *);
550
551 static inline void
552 output_record (union value *v)
553 {
554   union value *src_case;
555   
556   if (deferred_abort)
557     return;
558
559   if (compaction_necessary)
560     {
561       compact_case (compaction_case, (struct ccase *) v);
562       src_case = (union value *) compaction_case;
563     }
564   else
565     src_case = (union value *) v;
566
567   if ((int) fwrite (src_case, sizeof *src_case, compaction_nval,
568                     handle[file_index])
569       != compaction_nval)
570     {
571       deferred_abort = 1;
572       sprintf (tmp_extname, "%08x", run_no);
573       msg (SE, _("%s: Error writing temporary file: %s."),
574            tmp_basename, strerror (errno));
575       return;
576     }
577
578   run_length++;
579 }
580
581 static int
582 close_handle (int i)
583 {
584   int result = fclose (handle[i]);
585   msg (VM (2), _("SORT: Closing handle %d."), i);
586   
587   handle[i] = NULL;
588   if (EOF == result)
589     {
590       sprintf (tmp_extname, "%08x", i);
591       msg (SE, _("%s: Error closing temporary file: %s."),
592            tmp_basename, strerror (errno));
593       return 0;
594     }
595   return 1;
596 }
597
598 static int
599 close_handles (int beg, int end)
600 {
601   int success = 1;
602   int i;
603
604   for (i = beg; i < end; i++)
605     success &= close_handle (i);
606   return success;
607 }
608
609 static int
610 open_handle_w (int handle_no, int run_no)
611 {
612   sprintf (tmp_extname, "%08x", run_no);
613   msg (VM (1), _("SORT: %s: Opening for writing as run %d."),
614        tmp_basename, run_no);
615
616   /* The `x' modifier causes the GNU C library to insist on creating a
617      new file--if the file already exists, an error is signaled.  The
618      ANSI C standard says that other libraries should ignore anything
619      after the `w+b', so it shouldn't be a problem. */
620   return NULL != (handle[handle_no] = fopen (tmp_basename, "w+bx"));
621 }
622
623 static int
624 open_handle_r (int handle_no, int run_no)
625 {
626   FILE *f;
627
628   sprintf (tmp_extname, "%08x", run_no);
629   msg (VM (1), _("SORT: %s: Opening for writing as run %d."),
630        tmp_basename, run_no);
631   f = handle[handle_no] = fopen (tmp_basename, "rb");
632
633   if (f == NULL)
634     {
635       msg (SE, _("%s: Error opening temporary file for reading: %s."),
636            tmp_basename, strerror (errno));
637       return 0;
638     }
639   
640   return 1;
641 }
642
643 /* Begins a new initial run, specifically its output file. */
644 static void
645 begin_run (void)
646 {
647   /* Decide which handle[] to use.  If run_no is max_handles or
648      greater, then we've run out of handles so it's time to just do
649      one file at a time, which by default is handle 0. */
650   file_index = (run_no < max_handles ? run_no : 0);
651   run_length = 0;
652
653   /* Alright, now create the temporary file. */
654   if (open_handle_w (file_index, run_no) == 0)
655     {
656       /* Failure to create the temporary file.  Check if there are
657          unacceptably few files already open. */
658       if (file_index < 3)
659         {
660           deferred_abort = 1;
661           msg (SE, _("%s: Error creating temporary file: %s."),
662                tmp_basename, strerror (errno));
663           return;
664         }
665
666       /* Close all the open temporary files. */
667       if (!close_handles (0, file_index))
668         return;
669
670       /* Now try again to create the temporary file. */
671       max_handles = file_index;
672       file_index = 0;
673       if (open_handle_w (0, run_no) == 0)
674         {
675           /* It still failed, report it this time. */
676           deferred_abort = 1;
677           msg (SE, _("%s: Error creating temporary file: %s."),
678                tmp_basename, strerror (errno));
679           return;
680         }
681     }
682 }
683
684 /* Ends the current initial run.  Just increments run_no if no initial
685    run has been started yet. */
686 static void
687 end_run (void)
688 {
689   /* Close file handles if necessary. */
690   {
691     int result;
692
693     if (run_no == max_handles - 1)
694       result = close_handles (0, max_handles);
695     else if (run_no >= max_handles)
696       result = close_handle (0);
697     else
698       result = 1;
699     if (!result)
700       deferred_abort = 1;
701   }
702
703   /* Advance to next run. */
704   run_no++;
705   if (run_no)
706     heap_insert (huffman_queue, run_no - 1, run_length);
707 }
708
709 /* Performs 5.4.1R. */
710 static int
711 write_initial_runs (int separate)
712 {
713   run_no = -1;
714   deferred_abort = 0;
715
716   /* Steps R1, R2, R3. */
717   rmax = 0;
718   rc = 0;
719   lastkey = NULL;
720   q = x[0];
721   rq = 0;
722   {
723     int j;
724
725     for (j = 0; j < x_max; j++)
726       {
727         struct repl_sel_tree *J = x[j];
728
729         J->loser = J;
730         J->rn = 0;
731         J->fe = x[(x_max + j) / 2];
732         J->fi = x[j / 2];
733         memset (J->record, 0, default_dict.nval * sizeof (union value));
734       }
735   }
736
737   /* Most of the iterations of steps R4, R5, R6, R7, R2, R3, ... */
738   if (!separate)
739     {
740       if (vfm_sink)
741         vfm_sink->destroy_sink ();
742       vfm_sink = &sort_stream;
743     }
744   procedure (NULL, NULL, NULL);
745
746   /* Final iterations of steps R4, R5, R6, R7, R2, R3, ... */
747   for (;;)
748     {
749       struct repl_sel_tree *t;
750
751       /* R4. */
752       rq = rmax + 1;
753
754       /* R5. */
755       t = q->fe;
756
757       /* R6 and R7. */
758       for (;;)
759         {
760           /* R6. */
761           if (t->rn < rq
762               || (t->rn == rq
763                   && compare_record (t->loser->record, q->record) < 0))
764             {
765               struct repl_sel_tree *temp_tree;
766               int temp_int;
767
768               temp_tree = t->loser;
769               t->loser = q;
770               q = temp_tree;
771
772               temp_int = t->rn;
773               t->rn = rq;
774               rq = temp_int;
775             }
776
777           /* R7. */
778           if (t == x[1])
779             break;
780           t = t->fi;
781         }
782
783       /* R2. */
784       if (rq != rc)
785         {
786           end_run ();
787           if (rq > rmax)
788             break;
789           begin_run ();
790           rc = rq;
791         }
792
793       /* R3. */
794       if (rq != 0)
795         {
796           output_record (q->record);
797           lastkey = x[x_max]->record;
798           memcpy (lastkey, q->record, sizeof (union value) * vfm_sink_info.nval);
799         }
800     }
801   assert (run_no == rmax);
802
803   /* If an unrecoverable error occurred somewhere in the above code,
804      then the `deferred_abort' flag would have been set.  */
805   if (deferred_abort)
806     {
807       int i;
808
809       for (i = 0; i < max_handles; i++)
810         if (handle[i] != NULL)
811           {
812             sprintf (tmp_extname, "%08x", i);
813
814             if (fclose (handle[i]) == EOF)
815               msg (SE, _("%s: Error closing temporary file: %s."),
816                    tmp_basename, strerror (errno));
817
818             if (remove (tmp_basename) != 0)
819               msg (SE, _("%s: Error removing temporary file: %s."),
820                    tmp_basename, strerror (errno));
821
822             handle[i] = NULL;
823           }
824       return 0;
825     }
826
827   return 1;
828 }
829
830 /* Compares the NV_SORT variables in V_SORT[] between the `value's at
831    A and B, and returns a strcmp()-type result. */
832 static int
833 compare_record (union value * a, union value * b)
834 {
835   int i;
836   int result = 0;
837   struct variable *v;
838
839   assert (a != NULL);
840   if (b == NULL)                /* Sort NULLs after everything else. */
841     return -1;
842
843   for (i = 0; i < nv_sort; i++)
844     {
845       v = v_sort[i];
846
847       if (v->type == NUMERIC)
848         {
849           if (approx_ne (a[v->fv].f, b[v->fv].f))
850             {
851               result = (a[v->fv].f > b[v->fv].f) ? 1 : -1;
852               break;
853             }
854         }
855       else
856         {
857           result = memcmp (a[v->fv].s, b[v->fv].s, v->width);
858           if (result != 0)
859             break;
860         }
861     }
862
863   if (v->p.srt.order == SRT_ASCEND)
864     return result;
865   else
866     {
867       assert (v->p.srt.order == SRT_DESCEND);
868       return -result;
869     }
870 }
871 \f
872 /* Merging. */
873
874 static int merge_once (int run_index[], int run_length[], int n_runs);
875
876 /* Modula function as defined by Knuth. */
877 static int
878 mod (int x, int y)
879 {
880   int result;
881
882   if (y == 0)
883     return x;
884   result = abs (x) % abs (y);
885   if (y < 0)
886     result = -result;
887   return result;
888 }
889
890 /* Performs a series of P-way merges of initial runs using Huffman's
891    method. */
892 static int
893 merge (void)
894 {
895   /* Order of merge. */
896   int order;
897
898   /* Idiot check. */
899   assert (MIN_BUFFER_SIZE_RECS * 2 <= MIN_BUFFER_TOTAL_SIZE_RECS - 1);
900
901   /* Close all the input files.  I hope that the boundary conditions
902      are correct on this but I'm not sure. */
903   if (run_no < max_handles)
904     {
905       int i;
906
907       for (i = 0; i < run_no; )
908         if (!close_handle (i++))
909           {
910             for (; i < run_no; i++)
911               close_handle (i);
912             return 0;
913           }
914     }
915
916   /* Determine order of merge. */
917   order = MAX_MERGE_ORDER;
918   if (x_max / order < MIN_BUFFER_SIZE_RECS)
919     order = x_max / MIN_BUFFER_SIZE_RECS;
920   else if (x_max / order * sizeof (union value) * default_dict.nval
921            < MIN_BUFFER_SIZE_BYTES)
922     order = x_max / (MIN_BUFFER_SIZE_BYTES
923                      / (sizeof (union value) * (default_dict.nval - 1)));
924
925   /* Make sure the order of merge is bounded. */
926   if (order < 2)
927     order = 2;
928   if (order > rmax)
929     order = rmax;
930   assert (x_max / order > 0);
931
932   /* Calculate number of records per buffer. */
933   records_per_buffer = x_max / order;
934
935   /* Add (1 - S) mod (P - 1) dummy runs of length 0. */
936   {
937     int n_dummy_runs = mod (1 - rmax, order - 1);
938     debug_printf (("rmax=%d, order=%d, n_dummy_runs=%d\n",
939                    rmax, order, n_dummy_runs));
940     assert (n_dummy_runs >= 0);
941     while (n_dummy_runs--)
942       {
943         heap_insert (huffman_queue, -2, 0);
944         rmax++;
945       }
946   }
947
948   /* Repeatedly merge the P shortest existing runs until only one run
949      is left. */
950   while (rmax > 1)
951     {
952       int run_index[MAX_MERGE_ORDER];
953       int run_length[MAX_MERGE_ORDER];
954       int total_run_length = 0;
955       int i;
956
957       assert (rmax >= order);
958
959       /* Find the shortest runs; put them in runs[] in reverse order
960          of length, to force dummy runs of length 0 to the end of the
961          list. */
962       debug_printf ((_("merging runs")));
963       for (i = order - 1; i >= 0; i--)
964         {
965           run_index[i] = heap_delete (huffman_queue, &run_length[i]);
966           assert (run_index[i] != -1);
967           total_run_length += run_length[i];
968           debug_printf ((" %d(%d)", run_index[i], run_length[i]));
969         }
970       debug_printf ((_(" into run %d(%d)\n"), run_no, total_run_length));
971
972       if (!merge_once (run_index, run_length, order))
973         {
974           int index;
975
976           while (-1 != (index = heap_delete (huffman_queue, NULL)))
977             {
978               sprintf (tmp_extname, "%08x", index);
979               if (remove (tmp_basename) != 0)
980                 msg (SE, _("%s: Error removing temporary file: %s."),
981                      tmp_basename, strerror (errno));
982             }
983
984           return 0;
985         }
986
987       if (!heap_insert (huffman_queue, run_no++, total_run_length))
988         {
989           msg (SE, _("Out of memory expanding Huffman priority queue."));
990           return 0;
991         }
992
993       rmax -= order - 1;
994     }
995
996   /* There should be exactly one element in the priority queue after
997      all that merging.  This represents the entire sorted active file.
998      So we could find a total case count by deleting this element from
999      the queue. */
1000   assert (heap_size (huffman_queue) == 1);
1001
1002   return 1;
1003 }
1004
1005 /* Merges N_RUNS initial runs into a new run.  The jth run for 0 <= j
1006    < N_RUNS is taken from temporary file RUN_INDEX[j]; it is composed
1007    of RUN_LENGTH[j] cases. */
1008 static int
1009 merge_once (int run_index[], int run_length[], int n_runs)
1010 {
1011   /* For each run, the number of records remaining in its buffer. */
1012   int buffered[MAX_MERGE_ORDER];
1013
1014   /* For each run, the index of the next record in the buffer. */
1015   int buffer_ptr[MAX_MERGE_ORDER];
1016
1017   /* Open input files. */
1018   {
1019     int i;
1020
1021     for (i = 0; i < n_runs; i++)
1022       if (run_index[i] != -2 && !open_handle_r (i, run_index[i]))
1023         {
1024           /* Close and remove temporary files. */
1025           while (i--)
1026             {
1027               close_handle (i);
1028               sprintf (tmp_extname, "%08x", i);
1029               if (remove (tmp_basename) != 0)
1030                 msg (SE, _("%s: Error removing temporary file: %s."),
1031                      tmp_basename, strerror (errno));
1032             }
1033
1034           return 0;
1035         }
1036   }
1037
1038   /* Create output file. */
1039   if (!open_handle_w (N_INPUT_BUFFERS, run_no))
1040     {
1041       msg (SE, _("%s: Error creating temporary file for merge: %s."),
1042            tmp_basename, strerror (errno));
1043       goto lossage;
1044     }
1045
1046   /* Prime each buffer. */
1047   {
1048     int i;
1049
1050     for (i = 0; i < n_runs; i++)
1051       if (run_index[i] == -2)
1052         {
1053           n_runs = i;
1054           break;
1055         }
1056       else
1057         {
1058           int j;
1059           int ofs = records_per_buffer * i;
1060
1061           buffered[i] = min (records_per_buffer, run_length[i]);
1062           for (j = 0; j < buffered[i]; j++)
1063             if ((int) fread (x[j + ofs]->record, sizeof (union value),
1064                              default_dict.nval, handle[i])
1065                 != default_dict.nval)
1066               {
1067                 sprintf (tmp_extname, "%08x", run_index[i]);
1068                 if (ferror (handle[i]))
1069                   msg (SE, _("%s: Error reading temporary file in merge: %s."),
1070                        tmp_basename, strerror (errno));
1071                 else
1072                   msg (SE, _("%s: Unexpected end of temporary file in merge."),
1073                        tmp_basename);
1074                 goto lossage;
1075               }
1076           buffer_ptr[i] = ofs;
1077           run_length[i] -= buffered[i];
1078         }
1079   }
1080
1081   /* Perform the merge proper. */
1082   while (n_runs)                /* Loop while some data is left. */
1083     {
1084       int i;
1085       int min = 0;
1086
1087       for (i = 1; i < n_runs; i++)
1088         if (compare_record (x[buffer_ptr[min]]->record,
1089                             x[buffer_ptr[i]]->record) > 0)
1090           min = i;
1091
1092       if ((int) fwrite (x[buffer_ptr[min]]->record, sizeof (union value),
1093                         default_dict.nval, handle[N_INPUT_BUFFERS])
1094           != default_dict.nval)
1095         {
1096           sprintf (tmp_extname, "%08x", run_index[i]);
1097           msg (SE, _("%s: Error writing temporary file in "
1098                "merge: %s."), tmp_basename, strerror (errno));
1099           goto lossage;
1100         }
1101
1102       /* Remove one case from the buffer for this input file. */
1103       if (--buffered[min] == 0)
1104         {
1105           /* The input buffer is empty.  Do any cases remain in the
1106              initial run on disk? */
1107           if (run_length[min])
1108             {
1109               /* Yes.  Read them in. */
1110
1111               int j;
1112               int ofs;
1113
1114               /* Reset the buffer pointer.  Note that we can't simply
1115                  set it to (i * records_per_buffer) since the run
1116                  order might have changed. */
1117               ofs = buffer_ptr[min] -= buffer_ptr[min] % records_per_buffer;
1118
1119               buffered[min] = min (records_per_buffer, run_length[min]);
1120               for (j = 0; j < buffered[min]; j++)
1121                 if ((int) fread (x[j + ofs]->record, sizeof (union value),
1122                                  default_dict.nval, handle[min])
1123                     != default_dict.nval)
1124                   {
1125                     sprintf (tmp_extname, "%08x", run_index[min]);
1126                     if (ferror (handle[min]))
1127                       msg (SE, _("%s: Error reading temporary file in "
1128                                  "merge: %s."),
1129                            tmp_basename, strerror (errno));
1130                     else
1131                       msg (SE, _("%s: Unexpected end of temporary file "
1132                                  "in merge."),
1133                            tmp_basename);
1134                     goto lossage;
1135                   }
1136               run_length[min] -= buffered[min];
1137             }
1138           else
1139             {
1140               /* No.  Delete this run. */
1141
1142               /* Close the file. */
1143               FILE *f = handle[min];
1144               handle[min] = NULL;
1145               sprintf (tmp_extname, "%08x", run_index[min]);
1146               if (fclose (f) == EOF)
1147                 msg (SE, _("%s: Error closing temporary file in merge: "
1148                      "%s."), tmp_basename, strerror (errno));
1149
1150               /* Delete the file. */
1151               if (remove (tmp_basename) != 0)
1152                 msg (SE, _("%s: Error removing temporary file in merge: "
1153                      "%s."), tmp_basename, strerror (errno));
1154
1155               n_runs--;
1156               if (min != n_runs)
1157                 {
1158                   /* Since this isn't the last run, we move the last
1159                      run into its spot to force all the runs to be
1160                      contiguous. */
1161                   run_index[min] = run_index[n_runs];
1162                   run_length[min] = run_length[n_runs];
1163                   buffer_ptr[min] = buffer_ptr[n_runs];
1164                   buffered[min] = buffered[n_runs];
1165                   handle[min] = handle[n_runs];
1166                 }
1167             }
1168         }
1169       else
1170         buffer_ptr[min]++;
1171     }
1172
1173   /* Close output file. */
1174   {
1175     FILE *f = handle[N_INPUT_BUFFERS];
1176     handle[N_INPUT_BUFFERS] = NULL;
1177     if (fclose (f) == EOF)
1178       {
1179         sprintf (tmp_extname, "%08x", run_no);
1180         msg (SE, _("%s: Error closing temporary file in merge: "
1181                    "%s."),
1182              tmp_basename, strerror (errno));
1183         return 0;
1184       }
1185   }
1186
1187   return 1;
1188
1189 lossage:
1190   /* Close all the input and output files. */
1191   {
1192     int i;
1193
1194     for (i = 0; i < n_runs; i++)
1195       if (run_length[i] != 0)
1196         {
1197           close_handle (i);
1198           sprintf (tmp_basename, "%08x", run_index[i]);
1199           if (remove (tmp_basename) != 0)
1200             msg (SE, _("%s: Error removing temporary file: %s."),
1201                  tmp_basename, strerror (errno));
1202         }
1203   }
1204   close_handle (N_INPUT_BUFFERS);
1205   sprintf (tmp_basename, "%08x", run_no);
1206   if (remove (tmp_basename) != 0)
1207     msg (SE, _("%s: Error removing temporary file: %s."),
1208          tmp_basename, strerror (errno));
1209   return 0;
1210 }
1211 \f
1212 /* External sort input program. */
1213
1214 /* Reads all the records from the source stream and passes them
1215    to write_case(). */
1216 void
1217 sort_stream_read (void)
1218 {
1219   read_sort_output (write_case);
1220 }
1221
1222 /* Reads all the records from the output stream and passes them to the
1223    function provided, which must have an interface identical to
1224    write_case(). */
1225 void
1226 read_sort_output (int (*write_case) (void))
1227 {
1228   int i;
1229   FILE *f;
1230
1231   if (separate_case_tab)
1232     {
1233       struct ccase *save_temp_case = temp_case;
1234       struct case_list **p;
1235
1236       for (p = separate_case_tab; *p; p++)
1237         {
1238           temp_case = &(*p)->c;
1239           write_case ();
1240         }
1241       
1242       free (separate_case_tab);
1243       separate_case_tab = NULL;
1244             
1245       temp_case = save_temp_case;
1246     } else {
1247       sprintf (tmp_extname, "%08x", run_no - 1);
1248       f = fopen (tmp_basename, "rb");
1249       if (!f)
1250         {
1251           msg (ME, _("%s: Cannot open sort result file: %s."), tmp_basename,
1252                strerror (errno));
1253           err_failure ();
1254           return;
1255         }
1256
1257       for (i = 0; i < vfm_source_info.ncases; i++)
1258         {
1259           if (!fread (temp_case, vfm_source_info.case_size, 1, f))
1260             {
1261               if (ferror (f))
1262                 msg (ME, _("%s: Error reading sort result file: %s."),
1263                      tmp_basename, strerror (errno));
1264               else
1265                 msg (ME, _("%s: Unexpected end of sort result file: %s."),
1266                      tmp_basename, strerror (errno));
1267               err_failure ();
1268               break;
1269             }
1270
1271           if (!write_case ())
1272             break;
1273         }
1274
1275       if (fclose (f) == EOF)
1276         msg (ME, _("%s: Error closing sort result file: %s."), tmp_basename,
1277              strerror (errno));
1278
1279       if (remove (tmp_basename) != 0)
1280         msg (ME, _("%s: Error removing sort result file: %s."), tmp_basename,
1281              strerror (errno));
1282       else
1283         rmdir_temp_dir ();
1284     }
1285 }
1286
1287 #if 0 /* dead code */
1288 /* Alternate interface to sort_stream_write used for external sorting
1289    when SEPARATE is true. */
1290 static int
1291 write_separate (struct ccase *c)
1292 {
1293   assert (c == temp_case);
1294
1295   sort_stream_write ();
1296   return 1;
1297 }
1298 #endif
1299
1300 /* Performs one iteration of 5.4.1R steps R4, R5, R6, R7, R2, and
1301    R3. */
1302 static void
1303 sort_stream_write (void)
1304 {
1305   struct repl_sel_tree *t;
1306
1307   /* R4. */
1308   memcpy (q->record, temp_case->data, vfm_sink_info.case_size);
1309   if (compare_record (q->record, lastkey) < 0)
1310     if (++rq > rmax)
1311       rmax = rq;
1312
1313   /* R5. */
1314   t = q->fe;
1315
1316   /* R6 and R7. */
1317   for (;;)
1318     {
1319       /* R6. */
1320       if (t->rn < rq
1321           || (t->rn == rq && compare_record (t->loser->record, q->record) < 0))
1322         {
1323           struct repl_sel_tree *temp_tree;
1324           int temp_int;
1325
1326           temp_tree = t->loser;
1327           t->loser = q;
1328           q = temp_tree;
1329
1330           temp_int = t->rn;
1331           t->rn = rq;
1332           rq = temp_int;
1333         }
1334
1335       /* R7. */
1336       if (t == x[1])
1337         break;
1338       t = t->fi;
1339     }
1340
1341   /* R2. */
1342   if (rq != rc)
1343     {
1344       end_run ();
1345       begin_run ();
1346       assert (rq <= rmax);
1347       rc = rq;
1348     }
1349
1350   /* R3. */
1351   if (rq != 0)
1352     {
1353       output_record (q->record);
1354       lastkey = x[x_max]->record;
1355       memcpy (lastkey, q->record, vfm_sink_info.case_size);
1356     }
1357 }
1358
1359 /* Switches mode from sink to source. */
1360 void
1361 sort_stream_mode (void)
1362 {
1363   /* If this is not done, then we get the following source/sink pairs:
1364      source=memory/disk/DATA LIST/etc., sink=SORT; source=SORT,
1365      sink=SORT; which is not good. */
1366   vfm_sink = NULL;
1367 }
1368
1369 struct case_stream sort_stream =
1370   {
1371     NULL,
1372     sort_stream_read,
1373     sort_stream_write,
1374     sort_stream_mode,
1375     NULL,
1376     NULL,
1377     "SORT",
1378   };