c77d84626887ab735c4c39c6183ccaaef7c7c8c1
[pspp-builds.git] / src / sort.c
1 /* PSPP - computes sample statistics.
2    Copyright (C) 1997-9, 2000 Free Software Foundation, Inc.
3    Written by Ben Pfaff <blp@gnu.org>.
4
5    This program is free software; you can redistribute it and/or
6    modify it under the terms of the GNU General Public License as
7    published by the Free Software Foundation; either version 2 of the
8    License, or (at your option) any later version.
9
10    This program is distributed in the hope that it will be useful, but
11    WITHOUT ANY WARRANTY; without even the implied warranty of
12    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13    General Public License for more details.
14
15    You should have received a copy of the GNU General Public License
16    along with this program; if not, write to the Free Software
17    Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA
18    02111-1307, USA. */
19
20 #include <config.h>
21 #include <assert.h>
22 #include <stdio.h>
23 #include <stdlib.h>
24 #include <errno.h>
25 #include "alloc.h"
26 #include "approx.h"
27 #include "command.h"
28 #include "error.h"
29 #include "expr.h"
30 #include "heap.h"
31 #include "lexer.h"
32 #include "misc.h"
33 #include "sort.h"
34 #include "str.h"
35 #include "var.h"
36 #include "vfm.h"
37 #include "vfmP.h"
38
39 #if HAVE_UNISTD_H
40 #include <unistd.h>
41 #endif
42
43 #if HAVE_SYS_TYPES_H
44 #include <sys/types.h>
45 #endif
46
47 #if HAVE_SYS_STAT_H
48 #include <sys/stat.h>
49 #endif
50
51 #include "debug-print.h"
52
53 /* Variables to sort. */
54 struct variable **v_sort;
55 int nv_sort;
56
57 /* Used when internal-sorting to a separate file. */
58 static struct case_list **separate_case_tab;
59
60 /* Other prototypes. */
61 static int compare_case_lists (const void *, const void *);
62 static int do_internal_sort (int separate);
63 static int do_external_sort (int separate);
64 int parse_sort_variables (void);
65 void read_sort_output (int (*write_case) (void));
66
67 /* Performs the SORT CASES procedures. */
68 int
69 cmd_sort_cases (void)
70 {
71   /* First, just parse the command. */
72   lex_match_id ("SORT");
73   lex_match_id ("CASES");
74   lex_match (T_BY);
75
76   if (!parse_sort_variables ())
77     return CMD_FAILURE;
78       
79   cancel_temporary ();
80
81   /* Then it's time to do the actual work.  There are two cases:
82
83      (internal sort) All the data is in memory.  In this case, we
84      perform an EXECUTE to get the data into the desired form, then
85      sort the cases in place, if it is still all in memory.
86
87      (external sort) The data is not in memory.  It may be coming from
88      a system file or other data file, etc.  In any case, it is now
89      time to perform an external sort.  This is better explained in
90      do_external_sort(). */
91
92   /* Do all this dirty work. */
93   {
94     int success = sort_cases (0);
95     free (v_sort);
96     if (success)
97       return lex_end_of_command ();
98     else
99       return CMD_FAILURE;
100   }
101 }
102
103 /* Parses a list of sort variables into v_sort and nv_sort.  */
104 int
105 parse_sort_variables (void)
106 {
107   v_sort = NULL;
108   nv_sort = 0;
109   do
110     {
111       int prev_nv_sort = nv_sort;
112       int order = SRT_ASCEND;
113
114       if (!parse_variables (&default_dict, &v_sort, &nv_sort,
115                             PV_NO_DUPLICATE | PV_APPEND | PV_NO_SCRATCH))
116         return 0;
117       if (lex_match ('('))
118         {
119           if (lex_match_id ("D") || lex_match_id ("DOWN"))
120             order = SRT_DESCEND;
121           else if (!lex_match_id ("A") && !lex_match_id ("UP"))
122             {
123               free (v_sort);
124               msg (SE, _("`A' or `D' expected inside parentheses."));
125               return 0;
126             }
127           if (!lex_match (')'))
128             {
129               free (v_sort);
130               msg (SE, _("`)' expected."));
131               return 0;
132             }
133         }
134       for (; prev_nv_sort < nv_sort; prev_nv_sort++)
135         v_sort[prev_nv_sort]->p.srt.order = order;
136     }
137   while (token != '.' && token != '/');
138   
139   return 1;
140 }
141
142 /* Sorts the active file based on the key variables specified in
143    global variables v_sort and nv_sort.  The output is either to the
144    active file, if SEPARATE is zero, or to a separate file, if
145    SEPARATE is nonzero.  In the latter case the output cases can be
146    read with a call to read_sort_output().  (In the former case the
147    output cases should be dealt with through the usual vfm interface.)
148
149    The caller is responsible for freeing v_sort[]. */
150 int
151 sort_cases (int separate)
152 {
153   assert (separate_case_tab == NULL);
154
155   /* Not sure this is necessary but it's good to be safe. */
156   if (separate && vfm_source == &sort_stream)
157     procedure (NULL, NULL, NULL);
158   
159   /* SORT CASES cancels PROCESS IF. */
160   expr_free (process_if_expr);
161   process_if_expr = NULL;
162
163   if (do_internal_sort (separate))
164     return 1;
165
166   page_to_disk ();
167   return do_external_sort (separate);
168 }
169
170 /* If a reasonable situation is set up, do an internal sort of the
171    data.  Return success. */
172 static int
173 do_internal_sort (int separate)
174 {
175   if (vfm_source != &vfm_disk_stream)
176     {
177       if (vfm_source != &vfm_memory_stream)
178         procedure (NULL, NULL, NULL);
179       if (vfm_source == &vfm_memory_stream)
180         {
181           struct case_list **case_tab = malloc (sizeof *case_tab
182                                          * (vfm_source_info.ncases + 1));
183           if (vfm_source_info.ncases == 0)
184             {
185               free (case_tab);
186               return 1;
187             }
188           if (case_tab != NULL)
189             {
190               struct case_list *clp = memory_source_cases;
191               struct case_list **ctp = case_tab;
192               int i;
193
194               for (; clp; clp = clp->next)
195                 *ctp++ = clp;
196               qsort (case_tab, vfm_source_info.ncases, sizeof *case_tab,
197                      compare_case_lists);
198
199               if (!separate)
200                 {
201                   memory_source_cases = case_tab[0];
202                   for (i = 1; i < vfm_source_info.ncases; i++)
203                     case_tab[i - 1]->next = case_tab[i];
204                   case_tab[vfm_source_info.ncases - 1]->next = NULL;
205                   free (case_tab);
206                 } else {
207                   case_tab[vfm_source_info.ncases] = NULL;
208                   separate_case_tab = case_tab;
209                 }
210               
211               return 1;
212             }
213         }
214     }
215   return 0;
216 }
217
218 /* Compares the NV_SORT variables in V_SORT[] between the
219    `case_list's at A and B, and returns a strcmp()-type
220    result. */
221 static int
222 compare_case_lists (const void *a_, const void *b_)
223 {
224   struct case_list *const *pa = a_;
225   struct case_list *const *pb = b_;
226   struct case_list *a = *pa;
227   struct case_list *b = *pb;
228   struct variable *v;
229   int result = 0;
230   int i;
231
232   for (i = 0; i < nv_sort; i++)
233     {
234       v = v_sort[i];
235       
236       if (v->type == NUMERIC)
237         {
238           double af = a->c.data[v->fv].f;
239           double bf = b->c.data[v->fv].f;
240
241           result = af < bf ? -1 : af > bf;
242         }
243       else
244         result = memcmp (a->c.data[v->fv].s, b->c.data[v->fv].s, v->width);
245
246       if (result != 0)
247         break;
248     }
249
250   if (v->p.srt.order == SRT_DESCEND)
251     result = -result;
252   return result;
253 }
254 \f
255 /* External sort. */
256
257 /* Maximum number of input + output file handles. */
258 #if defined FOPEN_MAX && (FOPEN_MAX - 5 < 18)
259 #define MAX_FILE_HANDLES        (FOPEN_MAX - 5)
260 #else
261 #define MAX_FILE_HANDLES        18
262 #endif
263
264 #if MAX_FILE_HANDLES < 3
265 #error At least 3 file handles must be available for sorting.
266 #endif
267
268 /* Number of input buffers. */
269 #define N_INPUT_BUFFERS         (MAX_FILE_HANDLES - 1)
270
271 /* Maximum order of merge.  This is the value suggested by Knuth;
272    specifically, he said to use tree selection, which we don't
273    implement, for larger orders of merge. */
274 #define MAX_MERGE_ORDER         7
275
276 /* Minimum total number of records for buffers. */
277 #define MIN_BUFFER_TOTAL_SIZE_RECS      64
278
279 /* Minimum single input or output buffer size, in bytes and records. */
280 #define MIN_BUFFER_SIZE_BYTES   4096
281 #define MIN_BUFFER_SIZE_RECS    16
282
283 /* Structure for replacement selection tree. */
284 struct repl_sel_tree
285   {
286     struct repl_sel_tree *loser;/* Loser associated w/this internal node. */
287     int rn;                     /* Run number of `loser'. */
288     struct repl_sel_tree *fe;   /* Internal node above this external node. */
289     struct repl_sel_tree *fi;   /* Internal node above this internal node. */
290     union value record[1];      /* The case proper. */
291   };
292
293 /* Static variables used for sorting. */
294 static struct repl_sel_tree **x; /* Buffers. */
295 static int x_max;               /* Size of buffers, in records. */
296 static int records_per_buffer;  /* Number of records in each buffer. */
297
298 /* In the merge phase, the first N_INPUT_BUFFERS handle[] elements are
299    input files and the last element is the output file.  Before that,
300    they're all used as output files, although the last one is
301    segregated. */
302 static FILE *handle[MAX_FILE_HANDLES];  /* File handles. */
303
304 /* Now, MAX_FILE_HANDLES is the maximum number of files we will *try*
305    to open.  But if we can't open that many, max_handles will be set
306    to the number we apparently can open. */
307 static int max_handles;         /* Maximum number of handles. */
308
309 /* When we create temporary files, they are all put in the same
310    directory and numbered sequentially from zero.  tmp_basename is the
311    drive/directory, etc., and tmp_extname can be sprintf() with "%08x"
312    to the file number, then tmp_basename used to open the file. */
313 static char *tmp_basename;      /* Temporary file basename. */
314 static char *tmp_extname;       /* Temporary file extension name. */
315
316 /* We use Huffman's method to determine the merge pattern.  This means
317    that we need to know which runs are the shortest at any given time.
318    Priority queues as implemented by heap.c are a natural for this
319    task (probably because I wrote the code specifically for it). */
320 static struct heap *huffman_queue;      /* Huffman priority queue. */
321
322 /* Prototypes for helper functions. */
323 static void sort_stream_write (void);
324 static int write_initial_runs (int separate);
325 static int allocate_cases (void);
326 static int allocate_file_handles (void);
327 static int merge (void);
328 static void rmdir_temp_dir (void);
329
330 /* Performs an external sort of the active file.  A description of the
331    procedure follows.  All page references refer to Knuth's _Art of
332    Computer Programming, Vol. 3: Sorting and Searching_, which is the
333    canonical resource for sorting.
334
335    1. The data is read and S initial runs are formed through the
336    action of algorithm 5.4.1R (replacement selection).
337
338    2. Huffman's method (p. 365-366) is used to determine the optimum
339    merge pattern.
340
341    3. If an OS that supports overlapped reading, writing, and
342    computing is being run, we should use 5.4.6F for forecasting.
343    Otherwise, buffers are filled only when they run out of data.
344    FIXME: Since the author of PSPP uses GNU/Linux, which does not
345    yet implement overlapped r/w/c, 5.4.6F is not used.
346
347    4. We perform P-way merges:
348
349    (a) The desired P is the smallest P such that ceil(ln(S)/ln(P))
350    is minimized.  (FIXME: Since I don't have an algorithm for
351    minimizing this, it's just set to MAX_MERGE_ORDER.)
352
353    (b) P is reduced if the selected value would make input buffers
354    less than 4096 bytes each, or 16 records, whichever is larger.
355
356    (c) P is reduced if we run out of available file handles or space
357    for file handles.
358
359    (d) P is reduced if we don't have space for one or two output
360    buffers, which have the same minimum size as input buffers.  (We
361    need two output buffers if 5.4.6F is in use for forecasting.)  */
362 static int
363 do_external_sort (int separate)
364 {
365   int success = 0;
366
367   assert (MAX_FILE_HANDLES >= 3);
368
369   x = NULL;
370   tmp_basename = NULL;
371
372   huffman_queue = heap_create (512);
373   if (huffman_queue == NULL)
374     return 0;
375
376   if (!allocate_cases ())
377     goto lossage;
378
379   if (!allocate_file_handles ())
380     goto lossage;
381
382   if (!write_initial_runs (separate))
383     goto lossage;
384
385   merge ();
386
387   success = 1;
388
389   /* Despite the name, flow of control comes here regardless of
390      whether or not the sort is successful. */
391 lossage:
392   heap_destroy (huffman_queue);
393
394   if (x)
395     {
396       int i;
397
398       for (i = 0; i <= x_max; i++)
399         free (x[i]);
400       free (x);
401     }
402
403   if (!success)
404     rmdir_temp_dir ();
405
406   return success;
407 }
408
409 #if !HAVE_GETPID
410 #define getpid() (0)
411 #endif
412
413 /* Sets up to open temporary files. */
414 /* PORTME: This creates a directory for temporary files.  Some OSes
415    might not have that concept... */
416 static int
417 allocate_file_handles (void)
418 {
419   const char *dir;              /* Directory prefix. */
420   char *buf;                    /* String buffer. */
421   char *cp;                     /* Pointer into buf. */
422
423   dir = getenv ("SPSSTMPDIR");
424   if (dir == NULL)
425     dir = getenv ("SPSSXTMPDIR");
426   if (dir == NULL)
427     dir = getenv ("TMPDIR");
428 #ifdef P_tmpdir
429   if (dir == NULL)
430     dir = P_tmpdir;
431 #endif
432 #if __unix__
433   if (dir == NULL)
434     dir = "/tmp";
435 #elif __MSDOS__
436   if (dir == NULL)
437     dir = getenv ("TEMP");
438   if (dir == NULL)
439     dir = getenv ("TMP");
440   if (dir == NULL)
441     dir = "\\";
442 #else
443   dir = "";
444 #endif
445
446   buf = xmalloc (strlen (dir) + 1 + 4 + 8 + 4 + 1 + INT_DIGITS + 1);
447   cp = spprintf (buf, "%s%c%04lX%04lXpspp", dir, DIR_SEPARATOR,
448                  ((long) time (0)) & 0xffff, ((long) getpid ()) & 0xffff);
449   if (-1 == mkdir (buf, S_IRWXU))
450     {
451       free (buf);
452       msg (SE, _("%s: Cannot create temporary directory: %s."),
453            buf, strerror (errno));
454       return 0;
455     }
456   *cp++ = DIR_SEPARATOR;
457
458   tmp_basename = buf;
459   tmp_extname = cp;
460
461   max_handles = MAX_FILE_HANDLES;
462
463   return 1;
464 }
465
466 /* Removes the directory created for temporary files, if one exists.
467    Also frees tmp_basename. */
468 static void
469 rmdir_temp_dir (void)
470 {
471   if (NULL == tmp_basename)
472     return;
473
474   tmp_extname[-1] = '\0';
475   if (rmdir (tmp_basename) == -1)
476     msg (SE, _("%s: Error removing directory for temporary files: %s."),
477          tmp_basename, strerror (errno));
478
479   free (tmp_basename);
480 }
481
482 /* Allocates room for lots of cases as a buffer. */
483 static int
484 allocate_cases (void)
485 {
486   /* This is the size of one case. */
487   const int case_size = (sizeof (struct repl_sel_tree)
488                          + sizeof (union value) * (default_dict.nval - 1)
489                          + sizeof (struct repl_sel_tree *));
490
491   x = NULL;
492
493   /* Allocate as many cases as we can, assuming a space of four
494      void pointers for malloc()'s internal bookkeeping. */
495   x_max = MAX_WORKSPACE / (case_size + 4 * sizeof (void *));
496   x = malloc (sizeof (struct repl_sel_tree *) * x_max);
497   if (x != NULL)
498     {
499       int i;
500
501       for (i = 0; i < x_max; i++)
502         {
503           x[i] = malloc (sizeof (struct repl_sel_tree)
504                          + sizeof (union value) * (default_dict.nval - 1));
505           if (x[i] == NULL)
506             break;
507         }
508       x_max = i;
509     }
510   if (x == NULL || x_max < MIN_BUFFER_TOTAL_SIZE_RECS)
511     {
512       if (x != NULL)
513         {
514           int i;
515           
516           for (i = 0; i < x_max; i++)
517             free (x[i]);
518         }
519       free (x);
520       msg (SE, _("Out of memory.  Could not allocate room for minimum of %d "
521                  "cases of %d bytes each.  (PSPP workspace is currently "
522                  "restricted to a maximum of %d KB.)"),
523            MIN_BUFFER_TOTAL_SIZE_RECS, case_size, MAX_WORKSPACE / 1024);
524       x_max = 0;
525       x = NULL;
526       return 0;
527     }
528
529   /* The last element of the array is used to store lastkey. */
530   x_max--;
531
532   debug_printf ((_("allocated %d cases == %d bytes\n"),
533                  x_max, x_max * case_size));
534   return 1;
535 }
536 \f
537 /* Replacement selection. */
538
539 static int rmax, rc, rq;
540 static struct repl_sel_tree *q;
541 static union value *lastkey;
542 static int run_no, file_index;
543 static int deferred_abort;
544 static int run_length;
545
546 static int compare_record (union value *, union value *);
547
548 static inline void
549 output_record (union value *v)
550 {
551   union value *src_case;
552   
553   if (deferred_abort)
554     return;
555
556   if (compaction_necessary)
557     {
558       compact_case (compaction_case, (struct ccase *) v);
559       src_case = (union value *) compaction_case;
560     }
561   else
562     src_case = (union value *) v;
563
564   if ((int) fwrite (src_case, sizeof *src_case, compaction_nval,
565                     handle[file_index])
566       != compaction_nval)
567     {
568       deferred_abort = 1;
569       sprintf (tmp_extname, "%08x", run_no);
570       msg (SE, _("%s: Error writing temporary file: %s."),
571            tmp_basename, strerror (errno));
572       return;
573     }
574
575   run_length++;
576 }
577
578 static int
579 close_handle (int i)
580 {
581   int result = fclose (handle[i]);
582   msg (VM (2), _("SORT: Closing handle %d."), i);
583   
584   handle[i] = NULL;
585   if (EOF == result)
586     {
587       sprintf (tmp_extname, "%08x", i);
588       msg (SE, _("%s: Error closing temporary file: %s."),
589            tmp_basename, strerror (errno));
590       return 0;
591     }
592   return 1;
593 }
594
595 static int
596 close_handles (int beg, int end)
597 {
598   int success = 1;
599   int i;
600
601   for (i = beg; i < end; i++)
602     success &= close_handle (i);
603   return success;
604 }
605
606 static int
607 open_handle_w (int handle_no, int run_no)
608 {
609   sprintf (tmp_extname, "%08x", run_no);
610   msg (VM (1), _("SORT: %s: Opening for writing as run %d."),
611        tmp_basename, run_no);
612
613   /* The `x' modifier causes the GNU C library to insist on creating a
614      new file--if the file already exists, an error is signaled.  The
615      ANSI C standard says that other libraries should ignore anything
616      after the `w+b', so it shouldn't be a problem. */
617   return NULL != (handle[handle_no] = fopen (tmp_basename, "w+bx"));
618 }
619
620 static int
621 open_handle_r (int handle_no, int run_no)
622 {
623   FILE *f;
624
625   sprintf (tmp_extname, "%08x", run_no);
626   msg (VM (1), _("SORT: %s: Opening for writing as run %d."),
627        tmp_basename, run_no);
628   f = handle[handle_no] = fopen (tmp_basename, "rb");
629
630   if (f == NULL)
631     {
632       msg (SE, _("%s: Error opening temporary file for reading: %s."),
633            tmp_basename, strerror (errno));
634       return 0;
635     }
636   
637   return 1;
638 }
639
640 /* Begins a new initial run, specifically its output file. */
641 static void
642 begin_run (void)
643 {
644   /* Decide which handle[] to use.  If run_no is max_handles or
645      greater, then we've run out of handles so it's time to just do
646      one file at a time, which by default is handle 0. */
647   file_index = (run_no < max_handles ? run_no : 0);
648   run_length = 0;
649
650   /* Alright, now create the temporary file. */
651   if (open_handle_w (file_index, run_no) == 0)
652     {
653       /* Failure to create the temporary file.  Check if there are
654          unacceptably few files already open. */
655       if (file_index < 3)
656         {
657           deferred_abort = 1;
658           msg (SE, _("%s: Error creating temporary file: %s."),
659                tmp_basename, strerror (errno));
660           return;
661         }
662
663       /* Close all the open temporary files. */
664       if (!close_handles (0, file_index))
665         return;
666
667       /* Now try again to create the temporary file. */
668       max_handles = file_index;
669       file_index = 0;
670       if (open_handle_w (0, run_no) == 0)
671         {
672           /* It still failed, report it this time. */
673           deferred_abort = 1;
674           msg (SE, _("%s: Error creating temporary file: %s."),
675                tmp_basename, strerror (errno));
676           return;
677         }
678     }
679 }
680
681 /* Ends the current initial run.  Just increments run_no if no initial
682    run has been started yet. */
683 static void
684 end_run (void)
685 {
686   /* Close file handles if necessary. */
687   {
688     int result;
689
690     if (run_no == max_handles - 1)
691       result = close_handles (0, max_handles);
692     else if (run_no >= max_handles)
693       result = close_handle (0);
694     else
695       result = 1;
696     if (!result)
697       deferred_abort = 1;
698   }
699
700   /* Advance to next run. */
701   run_no++;
702   if (run_no)
703     heap_insert (huffman_queue, run_no - 1, run_length);
704 }
705
706 /* Performs 5.4.1R. */
707 static int
708 write_initial_runs (int separate)
709 {
710   run_no = -1;
711   deferred_abort = 0;
712
713   /* Steps R1, R2, R3. */
714   rmax = 0;
715   rc = 0;
716   lastkey = NULL;
717   q = x[0];
718   rq = 0;
719   {
720     int j;
721
722     for (j = 0; j < x_max; j++)
723       {
724         struct repl_sel_tree *J = x[j];
725
726         J->loser = J;
727         J->rn = 0;
728         J->fe = x[(x_max + j) / 2];
729         J->fi = x[j / 2];
730         memset (J->record, 0, default_dict.nval * sizeof (union value));
731       }
732   }
733
734   /* Most of the iterations of steps R4, R5, R6, R7, R2, R3, ... */
735   if (!separate)
736     {
737       if (vfm_sink)
738         vfm_sink->destroy_sink ();
739       vfm_sink = &sort_stream;
740     }
741   procedure (NULL, NULL, NULL);
742
743   /* Final iterations of steps R4, R5, R6, R7, R2, R3, ... */
744   for (;;)
745     {
746       struct repl_sel_tree *t;
747
748       /* R4. */
749       rq = rmax + 1;
750
751       /* R5. */
752       t = q->fe;
753
754       /* R6 and R7. */
755       for (;;)
756         {
757           /* R6. */
758           if (t->rn < rq
759               || (t->rn == rq
760                   && compare_record (t->loser->record, q->record) < 0))
761             {
762               struct repl_sel_tree *temp_tree;
763               int temp_int;
764
765               temp_tree = t->loser;
766               t->loser = q;
767               q = temp_tree;
768
769               temp_int = t->rn;
770               t->rn = rq;
771               rq = temp_int;
772             }
773
774           /* R7. */
775           if (t == x[1])
776             break;
777           t = t->fi;
778         }
779
780       /* R2. */
781       if (rq != rc)
782         {
783           end_run ();
784           if (rq > rmax)
785             break;
786           begin_run ();
787           rc = rq;
788         }
789
790       /* R3. */
791       if (rq != 0)
792         {
793           output_record (q->record);
794           lastkey = x[x_max]->record;
795           memcpy (lastkey, q->record, sizeof (union value) * vfm_sink_info.nval);
796         }
797     }
798   assert (run_no == rmax);
799
800   /* If an unrecoverable error occurred somewhere in the above code,
801      then the `deferred_abort' flag would have been set.  */
802   if (deferred_abort)
803     {
804       int i;
805
806       for (i = 0; i < max_handles; i++)
807         if (handle[i] != NULL)
808           {
809             sprintf (tmp_extname, "%08x", i);
810
811             if (fclose (handle[i]) == EOF)
812               msg (SE, _("%s: Error closing temporary file: %s."),
813                    tmp_basename, strerror (errno));
814
815             if (remove (tmp_basename) != 0)
816               msg (SE, _("%s: Error removing temporary file: %s."),
817                    tmp_basename, strerror (errno));
818
819             handle[i] = NULL;
820           }
821       return 0;
822     }
823
824   return 1;
825 }
826
827 /* Compares the NV_SORT variables in V_SORT[] between the `value's at
828    A and B, and returns a strcmp()-type result. */
829 static int
830 compare_record (union value * a, union value * b)
831 {
832   int i;
833   int result = 0;
834   struct variable *v;
835
836   assert (a != NULL);
837   if (b == NULL)                /* Sort NULLs after everything else. */
838     return -1;
839
840   for (i = 0; i < nv_sort; i++)
841     {
842       v = v_sort[i];
843
844       if (v->type == NUMERIC)
845         {
846           if (approx_ne (a[v->fv].f, b[v->fv].f))
847             {
848               result = (a[v->fv].f > b[v->fv].f) ? 1 : -1;
849               break;
850             }
851         }
852       else
853         {
854           result = memcmp (a[v->fv].s, b[v->fv].s, v->width);
855           if (result != 0)
856             break;
857         }
858     }
859
860   if (v->p.srt.order == SRT_ASCEND)
861     return result;
862   else
863     {
864       assert (v->p.srt.order == SRT_DESCEND);
865       return -result;
866     }
867 }
868 \f
869 /* Merging. */
870
871 static int merge_once (int run_index[], int run_length[], int n_runs);
872
873 /* Modula function as defined by Knuth. */
874 static int
875 mod (int x, int y)
876 {
877   int result;
878
879   if (y == 0)
880     return x;
881   result = abs (x) % abs (y);
882   if (y < 0)
883     result = -result;
884   return result;
885 }
886
887 /* Performs a series of P-way merges of initial runs using Huffman's
888    method. */
889 static int
890 merge (void)
891 {
892   /* Order of merge. */
893   int order;
894
895   /* Idiot check. */
896   assert (MIN_BUFFER_SIZE_RECS * 2 <= MIN_BUFFER_TOTAL_SIZE_RECS - 1);
897
898   /* Close all the input files.  I hope that the boundary conditions
899      are correct on this but I'm not sure. */
900   if (run_no < max_handles)
901     {
902       int i;
903
904       for (i = 0; i < run_no; )
905         if (!close_handle (i++))
906           {
907             for (; i < run_no; i++)
908               close_handle (i);
909             return 0;
910           }
911     }
912
913   /* Determine order of merge. */
914   order = MAX_MERGE_ORDER;
915   if (x_max / order < MIN_BUFFER_SIZE_RECS)
916     order = x_max / MIN_BUFFER_SIZE_RECS;
917   else if (x_max / order * sizeof (union value) * default_dict.nval
918            < MIN_BUFFER_SIZE_BYTES)
919     order = x_max / (MIN_BUFFER_SIZE_BYTES
920                      / (sizeof (union value) * (default_dict.nval - 1)));
921
922   /* Make sure the order of merge is bounded. */
923   if (order < 2)
924     order = 2;
925   if (order > rmax)
926     order = rmax;
927   assert (x_max / order > 0);
928
929   /* Calculate number of records per buffer. */
930   records_per_buffer = x_max / order;
931
932   /* Add (1 - S) mod (P - 1) dummy runs of length 0. */
933   {
934     int n_dummy_runs = mod (1 - rmax, order - 1);
935     debug_printf (("rmax=%d, order=%d, n_dummy_runs=%d\n",
936                    rmax, order, n_dummy_runs));
937     assert (n_dummy_runs >= 0);
938     while (n_dummy_runs--)
939       {
940         heap_insert (huffman_queue, -2, 0);
941         rmax++;
942       }
943   }
944
945   /* Repeatedly merge the P shortest existing runs until only one run
946      is left. */
947   while (rmax > 1)
948     {
949       int run_index[MAX_MERGE_ORDER];
950       int run_length[MAX_MERGE_ORDER];
951       int total_run_length = 0;
952       int i;
953
954       assert (rmax >= order);
955
956       /* Find the shortest runs; put them in runs[] in reverse order
957          of length, to force dummy runs of length 0 to the end of the
958          list. */
959       debug_printf ((_("merging runs")));
960       for (i = order - 1; i >= 0; i--)
961         {
962           run_index[i] = heap_delete (huffman_queue, &run_length[i]);
963           assert (run_index[i] != -1);
964           total_run_length += run_length[i];
965           debug_printf ((" %d(%d)", run_index[i], run_length[i]));
966         }
967       debug_printf ((_(" into run %d(%d)\n"), run_no, total_run_length));
968
969       if (!merge_once (run_index, run_length, order))
970         {
971           int index;
972
973           while (-1 != (index = heap_delete (huffman_queue, NULL)))
974             {
975               sprintf (tmp_extname, "%08x", index);
976               if (remove (tmp_basename) != 0)
977                 msg (SE, _("%s: Error removing temporary file: %s."),
978                      tmp_basename, strerror (errno));
979             }
980
981           return 0;
982         }
983
984       if (!heap_insert (huffman_queue, run_no++, total_run_length))
985         {
986           msg (SE, _("Out of memory expanding Huffman priority queue."));
987           return 0;
988         }
989
990       rmax -= order - 1;
991     }
992
993   /* There should be exactly one element in the priority queue after
994      all that merging.  This represents the entire sorted active file.
995      So we could find a total case count by deleting this element from
996      the queue. */
997   assert (heap_size (huffman_queue) == 1);
998
999   return 1;
1000 }
1001
1002 /* Merges N_RUNS initial runs into a new run.  The jth run for 0 <= j
1003    < N_RUNS is taken from temporary file RUN_INDEX[j]; it is composed
1004    of RUN_LENGTH[j] cases. */
1005 static int
1006 merge_once (int run_index[], int run_length[], int n_runs)
1007 {
1008   /* For each run, the number of records remaining in its buffer. */
1009   int buffered[MAX_MERGE_ORDER];
1010
1011   /* For each run, the index of the next record in the buffer. */
1012   int buffer_ptr[MAX_MERGE_ORDER];
1013
1014   /* Open input files. */
1015   {
1016     int i;
1017
1018     for (i = 0; i < n_runs; i++)
1019       if (run_index[i] != -2 && !open_handle_r (i, run_index[i]))
1020         {
1021           /* Close and remove temporary files. */
1022           while (i--)
1023             {
1024               close_handle (i);
1025               sprintf (tmp_extname, "%08x", i);
1026               if (remove (tmp_basename) != 0)
1027                 msg (SE, _("%s: Error removing temporary file: %s."),
1028                      tmp_basename, strerror (errno));
1029             }
1030
1031           return 0;
1032         }
1033   }
1034
1035   /* Create output file. */
1036   if (!open_handle_w (N_INPUT_BUFFERS, run_no))
1037     {
1038       msg (SE, _("%s: Error creating temporary file for merge: %s."),
1039            tmp_basename, strerror (errno));
1040       goto lossage;
1041     }
1042
1043   /* Prime each buffer. */
1044   {
1045     int i;
1046
1047     for (i = 0; i < n_runs; i++)
1048       if (run_index[i] == -2)
1049         {
1050           n_runs = i;
1051           break;
1052         }
1053       else
1054         {
1055           int j;
1056           int ofs = records_per_buffer * i;
1057
1058           buffered[i] = min (records_per_buffer, run_length[i]);
1059           for (j = 0; j < buffered[i]; j++)
1060             if ((int) fread (x[j + ofs]->record, sizeof (union value),
1061                              default_dict.nval, handle[i])
1062                 != default_dict.nval)
1063               {
1064                 sprintf (tmp_extname, "%08x", run_index[i]);
1065                 if (ferror (handle[i]))
1066                   msg (SE, _("%s: Error reading temporary file in merge: %s."),
1067                        tmp_basename, strerror (errno));
1068                 else
1069                   msg (SE, _("%s: Unexpected end of temporary file in merge."),
1070                        tmp_basename);
1071                 goto lossage;
1072               }
1073           buffer_ptr[i] = ofs;
1074           run_length[i] -= buffered[i];
1075         }
1076   }
1077
1078   /* Perform the merge proper. */
1079   while (n_runs)                /* Loop while some data is left. */
1080     {
1081       int i;
1082       int min = 0;
1083
1084       for (i = 1; i < n_runs; i++)
1085         if (compare_record (x[buffer_ptr[min]]->record,
1086                             x[buffer_ptr[i]]->record) > 0)
1087           min = i;
1088
1089       if ((int) fwrite (x[buffer_ptr[min]]->record, sizeof (union value),
1090                         default_dict.nval, handle[N_INPUT_BUFFERS])
1091           != default_dict.nval)
1092         {
1093           sprintf (tmp_extname, "%08x", run_index[i]);
1094           msg (SE, _("%s: Error writing temporary file in "
1095                "merge: %s."), tmp_basename, strerror (errno));
1096           goto lossage;
1097         }
1098
1099       /* Remove one case from the buffer for this input file. */
1100       if (--buffered[min] == 0)
1101         {
1102           /* The input buffer is empty.  Do any cases remain in the
1103              initial run on disk? */
1104           if (run_length[min])
1105             {
1106               /* Yes.  Read them in. */
1107
1108               int j;
1109               int ofs;
1110
1111               /* Reset the buffer pointer.  Note that we can't simply
1112                  set it to (i * records_per_buffer) since the run
1113                  order might have changed. */
1114               ofs = buffer_ptr[min] -= buffer_ptr[min] % records_per_buffer;
1115
1116               buffered[min] = min (records_per_buffer, run_length[min]);
1117               for (j = 0; j < buffered[min]; j++)
1118                 if ((int) fread (x[j + ofs]->record, sizeof (union value),
1119                                  default_dict.nval, handle[min])
1120                     != default_dict.nval)
1121                   {
1122                     sprintf (tmp_extname, "%08x", run_index[min]);
1123                     if (ferror (handle[min]))
1124                       msg (SE, _("%s: Error reading temporary file in "
1125                                  "merge: %s."),
1126                            tmp_basename, strerror (errno));
1127                     else
1128                       msg (SE, _("%s: Unexpected end of temporary file "
1129                                  "in merge."),
1130                            tmp_basename);
1131                     goto lossage;
1132                   }
1133               run_length[min] -= buffered[min];
1134             }
1135           else
1136             {
1137               /* No.  Delete this run. */
1138
1139               /* Close the file. */
1140               FILE *f = handle[min];
1141               handle[min] = NULL;
1142               sprintf (tmp_extname, "%08x", run_index[min]);
1143               if (fclose (f) == EOF)
1144                 msg (SE, _("%s: Error closing temporary file in merge: "
1145                      "%s."), tmp_basename, strerror (errno));
1146
1147               /* Delete the file. */
1148               if (remove (tmp_basename) != 0)
1149                 msg (SE, _("%s: Error removing temporary file in merge: "
1150                      "%s."), tmp_basename, strerror (errno));
1151
1152               n_runs--;
1153               if (min != n_runs)
1154                 {
1155                   /* Since this isn't the last run, we move the last
1156                      run into its spot to force all the runs to be
1157                      contiguous. */
1158                   run_index[min] = run_index[n_runs];
1159                   run_length[min] = run_length[n_runs];
1160                   buffer_ptr[min] = buffer_ptr[n_runs];
1161                   buffered[min] = buffered[n_runs];
1162                   handle[min] = handle[n_runs];
1163                 }
1164             }
1165         }
1166       else
1167         buffer_ptr[min]++;
1168     }
1169
1170   /* Close output file. */
1171   {
1172     FILE *f = handle[N_INPUT_BUFFERS];
1173     handle[N_INPUT_BUFFERS] = NULL;
1174     if (fclose (f) == EOF)
1175       {
1176         sprintf (tmp_extname, "%08x", run_no);
1177         msg (SE, _("%s: Error closing temporary file in merge: "
1178                    "%s."),
1179              tmp_basename, strerror (errno));
1180         return 0;
1181       }
1182   }
1183
1184   return 1;
1185
1186 lossage:
1187   /* Close all the input and output files. */
1188   {
1189     int i;
1190
1191     for (i = 0; i < n_runs; i++)
1192       if (run_length[i] != 0)
1193         {
1194           close_handle (i);
1195           sprintf (tmp_basename, "%08x", run_index[i]);
1196           if (remove (tmp_basename) != 0)
1197             msg (SE, _("%s: Error removing temporary file: %s."),
1198                  tmp_basename, strerror (errno));
1199         }
1200   }
1201   close_handle (N_INPUT_BUFFERS);
1202   sprintf (tmp_basename, "%08x", run_no);
1203   if (remove (tmp_basename) != 0)
1204     msg (SE, _("%s: Error removing temporary file: %s."),
1205          tmp_basename, strerror (errno));
1206   return 0;
1207 }
1208 \f
1209 /* External sort input program. */
1210
1211 /* Reads all the records from the source stream and passes them
1212    to write_case(). */
1213 void
1214 sort_stream_read (void)
1215 {
1216   read_sort_output (write_case);
1217 }
1218
1219 /* Reads all the records from the output stream and passes them to the
1220    function provided, which must have an interface identical to
1221    write_case(). */
1222 void
1223 read_sort_output (int (*write_case) (void))
1224 {
1225   int i;
1226   FILE *f;
1227
1228   if (separate_case_tab)
1229     {
1230       struct ccase *save_temp_case = temp_case;
1231       struct case_list **p;
1232
1233       for (p = separate_case_tab; *p; p++)
1234         {
1235           temp_case = &(*p)->c;
1236           write_case ();
1237         }
1238       
1239       free (separate_case_tab);
1240       separate_case_tab = NULL;
1241             
1242       temp_case = save_temp_case;
1243     } else {
1244       sprintf (tmp_extname, "%08x", run_no - 1);
1245       f = fopen (tmp_basename, "rb");
1246       if (!f)
1247         {
1248           msg (ME, _("%s: Cannot open sort result file: %s."), tmp_basename,
1249                strerror (errno));
1250           err_failure ();
1251           return;
1252         }
1253
1254       for (i = 0; i < vfm_source_info.ncases; i++)
1255         {
1256           if (!fread (temp_case, vfm_source_info.case_size, 1, f))
1257             {
1258               if (ferror (f))
1259                 msg (ME, _("%s: Error reading sort result file: %s."),
1260                      tmp_basename, strerror (errno));
1261               else
1262                 msg (ME, _("%s: Unexpected end of sort result file: %s."),
1263                      tmp_basename, strerror (errno));
1264               err_failure ();
1265               break;
1266             }
1267
1268           if (!write_case ())
1269             break;
1270         }
1271
1272       if (fclose (f) == EOF)
1273         msg (ME, _("%s: Error closing sort result file: %s."), tmp_basename,
1274              strerror (errno));
1275
1276       if (remove (tmp_basename) != 0)
1277         msg (ME, _("%s: Error removing sort result file: %s."), tmp_basename,
1278              strerror (errno));
1279       else
1280         rmdir_temp_dir ();
1281     }
1282 }
1283
1284 #if 0 /* dead code */
1285 /* Alternate interface to sort_stream_write used for external sorting
1286    when SEPARATE is true. */
1287 static int
1288 write_separate (struct ccase *c)
1289 {
1290   assert (c == temp_case);
1291
1292   sort_stream_write ();
1293   return 1;
1294 }
1295 #endif
1296
1297 /* Performs one iteration of 5.4.1R steps R4, R5, R6, R7, R2, and
1298    R3. */
1299 static void
1300 sort_stream_write (void)
1301 {
1302   struct repl_sel_tree *t;
1303
1304   /* R4. */
1305   memcpy (q->record, temp_case->data, vfm_sink_info.case_size);
1306   if (compare_record (q->record, lastkey) < 0)
1307     if (++rq > rmax)
1308       rmax = rq;
1309
1310   /* R5. */
1311   t = q->fe;
1312
1313   /* R6 and R7. */
1314   for (;;)
1315     {
1316       /* R6. */
1317       if (t->rn < rq
1318           || (t->rn == rq && compare_record (t->loser->record, q->record) < 0))
1319         {
1320           struct repl_sel_tree *temp_tree;
1321           int temp_int;
1322
1323           temp_tree = t->loser;
1324           t->loser = q;
1325           q = temp_tree;
1326
1327           temp_int = t->rn;
1328           t->rn = rq;
1329           rq = temp_int;
1330         }
1331
1332       /* R7. */
1333       if (t == x[1])
1334         break;
1335       t = t->fi;
1336     }
1337
1338   /* R2. */
1339   if (rq != rc)
1340     {
1341       end_run ();
1342       begin_run ();
1343       assert (rq <= rmax);
1344       rc = rq;
1345     }
1346
1347   /* R3. */
1348   if (rq != 0)
1349     {
1350       output_record (q->record);
1351       lastkey = x[x_max]->record;
1352       memcpy (lastkey, q->record, vfm_sink_info.case_size);
1353     }
1354 }
1355
1356 /* Switches mode from sink to source. */
1357 void
1358 sort_stream_mode (void)
1359 {
1360   /* If this is not done, then we get the following source/sink pairs:
1361      source=memory/disk/DATA LIST/etc., sink=SORT; source=SORT,
1362      sink=SORT; which is not good. */
1363   vfm_sink = NULL;
1364 }
1365
1366 struct case_stream sort_stream =
1367   {
1368     NULL,
1369     sort_stream_read,
1370     sort_stream_write,
1371     sort_stream_mode,
1372     NULL,
1373     NULL,
1374     "SORT",
1375   };