Clean up struct dictionary's value_cnt usage.
[pspp-builds.git] / src / sort.c
1 /* PSPP - computes sample statistics.
2    Copyright (C) 1997-9, 2000 Free Software Foundation, Inc.
3    Written by Ben Pfaff <blp@gnu.org>.
4
5    This program is free software; you can redistribute it and/or
6    modify it under the terms of the GNU General Public License as
7    published by the Free Software Foundation; either version 2 of the
8    License, or (at your option) any later version.
9
10    This program is distributed in the hope that it will be useful, but
11    WITHOUT ANY WARRANTY; without even the implied warranty of
12    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13    General Public License for more details.
14
15    You should have received a copy of the GNU General Public License
16    along with this program; if not, write to the Free Software
17    Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA
18    02111-1307, USA. */
19
20 #include <config.h>
21 #include "sort.h"
22 #include <assert.h>
23 #include <stdio.h>
24 #include <stdlib.h>
25 #include <errno.h>
26 #include "alloc.h"
27 #include "approx.h"
28 #include "command.h"
29 #include "error.h"
30 #include "expr.h"
31 #include "heap.h"
32 #include "lexer.h"
33 #include "misc.h"
34 #include "str.h"
35 #include "var.h"
36 #include "vfm.h"
37 #include "vfmP.h"
38
39 #if HAVE_UNISTD_H
40 #include <unistd.h>
41 #endif
42
43 #if HAVE_SYS_TYPES_H
44 #include <sys/types.h>
45 #endif
46
47 #if HAVE_SYS_STAT_H
48 #include <sys/stat.h>
49 #endif
50
51 #include "debug-print.h"
52
53 /* Variables to sort. */
54 struct variable **v_sort;
55 int nv_sort;
56
57 /* Used when internal-sorting to a separate file. */
58 static struct case_list **separate_case_tab;
59
60 /* Other prototypes. */
61 static int compare_case_lists (const void *, const void *);
62 static int do_internal_sort (int separate);
63 static int do_external_sort (int separate);
64 int parse_sort_variables (void);
65 void read_sort_output (write_case_func *write_case, write_case_data wc_data);
66
67 /* Performs the SORT CASES procedures. */
68 int
69 cmd_sort_cases (void)
70 {
71   /* First, just parse the command. */
72   lex_match_id ("SORT");
73   lex_match_id ("CASES");
74   lex_match (T_BY);
75
76   if (!parse_sort_variables ())
77     return CMD_FAILURE;
78       
79   cancel_temporary ();
80
81   /* Then it's time to do the actual work.  There are two cases:
82
83      (internal sort) All the data is in memory.  In this case, we
84      perform an EXECUTE to get the data into the desired form, then
85      sort the cases in place, if it is still all in memory.
86
87      (external sort) The data is not in memory.  It may be coming from
88      a system file or other data file, etc.  In any case, it is now
89      time to perform an external sort.  This is better explained in
90      do_external_sort(). */
91
92   /* Do all this dirty work. */
93   {
94     int success = sort_cases (0);
95     free (v_sort);
96     if (success)
97       return lex_end_of_command ();
98     else
99       return CMD_FAILURE;
100   }
101 }
102
103 /* Parses a list of sort variables into v_sort and nv_sort.  */
104 int
105 parse_sort_variables (void)
106 {
107   v_sort = NULL;
108   nv_sort = 0;
109   do
110     {
111       int prev_nv_sort = nv_sort;
112       int order = SRT_ASCEND;
113
114       if (!parse_variables (default_dict, &v_sort, &nv_sort,
115                             PV_NO_DUPLICATE | PV_APPEND | PV_NO_SCRATCH))
116         return 0;
117       if (lex_match ('('))
118         {
119           if (lex_match_id ("D") || lex_match_id ("DOWN"))
120             order = SRT_DESCEND;
121           else if (!lex_match_id ("A") && !lex_match_id ("UP"))
122             {
123               free (v_sort);
124               msg (SE, _("`A' or `D' expected inside parentheses."));
125               return 0;
126             }
127           if (!lex_match (')'))
128             {
129               free (v_sort);
130               msg (SE, _("`)' expected."));
131               return 0;
132             }
133         }
134       for (; prev_nv_sort < nv_sort; prev_nv_sort++)
135         v_sort[prev_nv_sort]->p.srt.order = order;
136     }
137   while (token != '.' && token != '/');
138   
139   return 1;
140 }
141
142 /* Sorts the active file based on the key variables specified in
143    global variables v_sort and nv_sort.  The output is either to the
144    active file, if SEPARATE is zero, or to a separate file, if
145    SEPARATE is nonzero.  In the latter case the output cases can be
146    read with a call to read_sort_output().  (In the former case the
147    output cases should be dealt with through the usual vfm interface.)
148
149    The caller is responsible for freeing v_sort[]. */
150 int
151 sort_cases (int separate)
152 {
153   assert (separate_case_tab == NULL);
154
155   /* Not sure this is necessary but it's good to be safe. */
156   if (separate && vfm_source == &sort_stream)
157     procedure (NULL, NULL, NULL, NULL);
158   
159   /* SORT CASES cancels PROCESS IF. */
160   expr_free (process_if_expr);
161   process_if_expr = NULL;
162
163   if (do_internal_sort (separate))
164     return 1;
165
166   page_to_disk ();
167   return do_external_sort (separate);
168 }
169
170 /* If a reasonable situation is set up, do an internal sort of the
171    data.  Return success. */
172 static int
173 do_internal_sort (int separate)
174 {
175   if (vfm_source != &vfm_disk_stream)
176     {
177       if (vfm_source != &vfm_memory_stream)
178         procedure (NULL, NULL, NULL, NULL);
179       if (vfm_source == &vfm_memory_stream)
180         {
181           struct case_list **case_tab = malloc (sizeof *case_tab
182                                          * (vfm_source_info.ncases + 1));
183           if (vfm_source_info.ncases == 0)
184             {
185               free (case_tab);
186               return 1;
187             }
188           if (case_tab != NULL)
189             {
190               struct case_list *clp = memory_source_cases;
191               struct case_list **ctp = case_tab;
192               int i;
193
194               for (; clp; clp = clp->next)
195                 *ctp++ = clp;
196               qsort (case_tab, vfm_source_info.ncases, sizeof *case_tab,
197                      compare_case_lists);
198
199               if (!separate)
200                 {
201                   memory_source_cases = case_tab[0];
202                   for (i = 1; i < vfm_source_info.ncases; i++)
203                     case_tab[i - 1]->next = case_tab[i];
204                   case_tab[vfm_source_info.ncases - 1]->next = NULL;
205                   free (case_tab);
206                 } else {
207                   case_tab[vfm_source_info.ncases] = NULL;
208                   separate_case_tab = case_tab;
209                 }
210               
211               return 1;
212             }
213         }
214     }
215   return 0;
216 }
217
218 /* Compares the NV_SORT variables in V_SORT[] between the
219    `case_list's at A and B, and returns a strcmp()-type
220    result. */
221 static int
222 compare_case_lists (const void *a_, const void *b_)
223 {
224   struct case_list *const *pa = a_;
225   struct case_list *const *pb = b_;
226   struct case_list *a = *pa;
227   struct case_list *b = *pb;
228   struct variable *v;
229   int result = 0;
230   int i;
231
232   for (i = 0; i < nv_sort; i++)
233     {
234       v = v_sort[i];
235       
236       if (v->type == NUMERIC)
237         {
238           double af = a->c.data[v->fv].f;
239           double bf = b->c.data[v->fv].f;
240
241           result = af < bf ? -1 : af > bf;
242         }
243       else
244         result = memcmp (a->c.data[v->fv].s, b->c.data[v->fv].s, v->width);
245
246       if (result != 0)
247         break;
248     }
249
250   if (v->p.srt.order == SRT_DESCEND)
251     result = -result;
252   return result;
253 }
254 \f
255 /* External sort. */
256
257 /* Maximum number of input + output file handles. */
258 #if defined FOPEN_MAX && (FOPEN_MAX - 5 < 18)
259 #define MAX_FILE_HANDLES        (FOPEN_MAX - 5)
260 #else
261 #define MAX_FILE_HANDLES        18
262 #endif
263
264 #if MAX_FILE_HANDLES < 3
265 #error At least 3 file handles must be available for sorting.
266 #endif
267
268 /* Number of input buffers. */
269 #define N_INPUT_BUFFERS         (MAX_FILE_HANDLES - 1)
270
271 /* Maximum order of merge.  This is the value suggested by Knuth;
272    specifically, he said to use tree selection, which we don't
273    implement, for larger orders of merge. */
274 #define MAX_MERGE_ORDER         7
275
276 /* Minimum total number of records for buffers. */
277 #define MIN_BUFFER_TOTAL_SIZE_RECS      64
278
279 /* Minimum single input or output buffer size, in bytes and records. */
280 #define MIN_BUFFER_SIZE_BYTES   4096
281 #define MIN_BUFFER_SIZE_RECS    16
282
283 /* Structure for replacement selection tree. */
284 struct repl_sel_tree
285   {
286     struct repl_sel_tree *loser;/* Loser associated w/this internal node. */
287     int rn;                     /* Run number of `loser'. */
288     struct repl_sel_tree *fe;   /* Internal node above this external node. */
289     struct repl_sel_tree *fi;   /* Internal node above this internal node. */
290     union value record[1];      /* The case proper. */
291   };
292
293 /* Static variables used for sorting. */
294 static struct repl_sel_tree **x; /* Buffers. */
295 static int x_max;               /* Size of buffers, in records. */
296 static int records_per_buffer;  /* Number of records in each buffer. */
297
298 /* In the merge phase, the first N_INPUT_BUFFERS handle[] elements are
299    input files and the last element is the output file.  Before that,
300    they're all used as output files, although the last one is
301    segregated. */
302 static FILE *handle[MAX_FILE_HANDLES];  /* File handles. */
303
304 /* Now, MAX_FILE_HANDLES is the maximum number of files we will *try*
305    to open.  But if we can't open that many, max_handles will be set
306    to the number we apparently can open. */
307 static int max_handles;         /* Maximum number of handles. */
308
309 /* When we create temporary files, they are all put in the same
310    directory and numbered sequentially from zero.  tmp_basename is the
311    drive/directory, etc., and tmp_extname can be sprintf() with "%08x"
312    to the file number, then tmp_basename used to open the file. */
313 static char *tmp_basename;      /* Temporary file basename. */
314 static char *tmp_extname;       /* Temporary file extension name. */
315
316 /* We use Huffman's method to determine the merge pattern.  This means
317    that we need to know which runs are the shortest at any given time.
318    Priority queues as implemented by heap.c are a natural for this
319    task (probably because I wrote the code specifically for it). */
320 static struct heap *huffman_queue;      /* Huffman priority queue. */
321
322 /* Prototypes for helper functions. */
323 static void sort_stream_write (void);
324 static int write_initial_runs (int separate);
325 static int allocate_cases (void);
326 static int allocate_file_handles (void);
327 static int merge (void);
328 static void rmdir_temp_dir (void);
329
330 /* Performs an external sort of the active file.  A description of the
331    procedure follows.  All page references refer to Knuth's _Art of
332    Computer Programming, Vol. 3: Sorting and Searching_, which is the
333    canonical resource for sorting.
334
335    1. The data is read and S initial runs are formed through the
336    action of algorithm 5.4.1R (replacement selection).
337
338    2. Huffman's method (p. 365-366) is used to determine the optimum
339    merge pattern.
340
341    3. If an OS that supports overlapped reading, writing, and
342    computing is being run, we should use 5.4.6F for forecasting.
343    Otherwise, buffers are filled only when they run out of data.
344    FIXME: Since the author of PSPP uses GNU/Linux, which does not
345    yet implement overlapped r/w/c, 5.4.6F is not used.
346
347    4. We perform P-way merges:
348
349    (a) The desired P is the smallest P such that ceil(ln(S)/ln(P))
350    is minimized.  (FIXME: Since I don't have an algorithm for
351    minimizing this, it's just set to MAX_MERGE_ORDER.)
352
353    (b) P is reduced if the selected value would make input buffers
354    less than 4096 bytes each, or 16 records, whichever is larger.
355
356    (c) P is reduced if we run out of available file handles or space
357    for file handles.
358
359    (d) P is reduced if we don't have space for one or two output
360    buffers, which have the same minimum size as input buffers.  (We
361    need two output buffers if 5.4.6F is in use for forecasting.)  */
362 static int
363 do_external_sort (int separate)
364 {
365   int success = 0;
366
367   assert (MAX_FILE_HANDLES >= 3);
368
369   x = NULL;
370   tmp_basename = NULL;
371
372   huffman_queue = heap_create (512);
373   if (huffman_queue == NULL)
374     return 0;
375
376   if (!allocate_cases ())
377     goto lossage;
378
379   if (!allocate_file_handles ())
380     goto lossage;
381
382   if (!write_initial_runs (separate))
383     goto lossage;
384
385   merge ();
386
387   success = 1;
388
389   /* Despite the name, flow of control comes here regardless of
390      whether or not the sort is successful. */
391 lossage:
392   heap_destroy (huffman_queue);
393
394   if (x)
395     {
396       int i;
397
398       for (i = 0; i <= x_max; i++)
399         free (x[i]);
400       free (x);
401     }
402
403   if (!success)
404     rmdir_temp_dir ();
405
406   return success;
407 }
408
409 #if !HAVE_GETPID
410 #define getpid() (0)
411 #endif
412
413 /* Sets up to open temporary files. */
414 /* PORTME: This creates a directory for temporary files.  Some OSes
415    might not have that concept... */
416 static int
417 allocate_file_handles (void)
418 {
419   const char *dir;              /* Directory prefix. */
420   char *buf;                    /* String buffer. */
421   char *cp;                     /* Pointer into buf. */
422
423   dir = getenv ("SPSSTMPDIR");
424   if (dir == NULL)
425     dir = getenv ("SPSSXTMPDIR");
426   if (dir == NULL)
427     dir = getenv ("TMPDIR");
428 #ifdef P_tmpdir
429   if (dir == NULL)
430     dir = P_tmpdir;
431 #endif
432 #ifdef unix
433   if (dir == NULL)
434     dir = "/tmp";
435 #elif defined (__MSDOS__)
436   if (dir == NULL)
437     dir = getenv ("TEMP");
438   if (dir == NULL)
439     dir = getenv ("TMP");
440   if (dir == NULL)
441     dir = "\\";
442 #else
443   dir = "";
444 #endif
445
446   buf = xmalloc (strlen (dir) + 1 + 4 + 8 + 4 + 1 + INT_DIGITS + 1);
447   cp = spprintf (buf, "%s%c%04lX%04lXpspp", dir, DIR_SEPARATOR,
448                  ((long) time (0)) & 0xffff, ((long) getpid ()) & 0xffff);
449 #ifndef __MSDOS__
450   if (-1 == mkdir (buf, S_IRWXU))
451 #else
452   if (-1 == mkdir (buf))
453 #endif
454     {
455       free (buf);
456       msg (SE, _("%s: Cannot create temporary directory: %s."),
457            buf, strerror (errno));
458       return 0;
459     }
460   *cp++ = DIR_SEPARATOR;
461
462   tmp_basename = buf;
463   tmp_extname = cp;
464
465   max_handles = MAX_FILE_HANDLES;
466
467   return 1;
468 }
469
470 /* Removes the directory created for temporary files, if one exists.
471    Also frees tmp_basename. */
472 static void
473 rmdir_temp_dir (void)
474 {
475   if (NULL == tmp_basename)
476     return;
477
478   tmp_extname[-1] = '\0';
479   if (rmdir (tmp_basename) == -1)
480     msg (SE, _("%s: Error removing directory for temporary files: %s."),
481          tmp_basename, strerror (errno));
482
483   free (tmp_basename);
484 }
485
486 /* Allocates room for lots of cases as a buffer. */
487 static int
488 allocate_cases (void)
489 {
490   /* This is the size of one case. */
491   const int case_size = (sizeof (struct repl_sel_tree)
492                          + dict_get_case_size (default_dict)
493                          - sizeof (union value)
494                          + sizeof (struct repl_sel_tree *));
495
496   x = NULL;
497
498   /* Allocate as many cases as we can, assuming a space of four
499      void pointers for malloc()'s internal bookkeeping. */
500   x_max = MAX_WORKSPACE / (case_size + 4 * sizeof (void *));
501   x = malloc (sizeof (struct repl_sel_tree *) * x_max);
502   if (x != NULL)
503     {
504       int i;
505
506       for (i = 0; i < x_max; i++)
507         {
508           x[i] = malloc (sizeof (struct repl_sel_tree)
509                          + dict_get_case_size (default_dict)
510                          - sizeof (union value));
511           if (x[i] == NULL)
512             break;
513         }
514       x_max = i;
515     }
516   if (x == NULL || x_max < MIN_BUFFER_TOTAL_SIZE_RECS)
517     {
518       if (x != NULL)
519         {
520           int i;
521           
522           for (i = 0; i < x_max; i++)
523             free (x[i]);
524         }
525       free (x);
526       msg (SE, _("Out of memory.  Could not allocate room for minimum of %d "
527                  "cases of %d bytes each.  (PSPP workspace is currently "
528                  "restricted to a maximum of %d KB.)"),
529            MIN_BUFFER_TOTAL_SIZE_RECS, case_size, MAX_WORKSPACE / 1024);
530       x_max = 0;
531       x = NULL;
532       return 0;
533     }
534
535   /* The last element of the array is used to store lastkey. */
536   x_max--;
537
538   debug_printf ((_("allocated %d cases == %d bytes\n"),
539                  x_max, x_max * case_size));
540   return 1;
541 }
542 \f
543 /* Replacement selection. */
544
545 static int rmax, rc, rq;
546 static struct repl_sel_tree *q;
547 static union value *lastkey;
548 static int run_no, file_index;
549 static int deferred_abort;
550 static int run_length;
551
552 static int compare_record (union value *, union value *);
553
554 static inline void
555 output_record (union value *v)
556 {
557   union value *src_case;
558   
559   if (deferred_abort)
560     return;
561
562   if (compaction_necessary)
563     {
564       compact_case (compaction_case, (struct ccase *) v);
565       src_case = (union value *) compaction_case;
566     }
567   else
568     src_case = (union value *) v;
569
570   if ((int) fwrite (src_case, sizeof *src_case, compaction_nval,
571                     handle[file_index])
572       != compaction_nval)
573     {
574       deferred_abort = 1;
575       sprintf (tmp_extname, "%08x", run_no);
576       msg (SE, _("%s: Error writing temporary file: %s."),
577            tmp_basename, strerror (errno));
578       return;
579     }
580
581   run_length++;
582 }
583
584 static int
585 close_handle (int i)
586 {
587   int result = fclose (handle[i]);
588   msg (VM (2), _("SORT: Closing handle %d."), i);
589   
590   handle[i] = NULL;
591   if (EOF == result)
592     {
593       sprintf (tmp_extname, "%08x", i);
594       msg (SE, _("%s: Error closing temporary file: %s."),
595            tmp_basename, strerror (errno));
596       return 0;
597     }
598   return 1;
599 }
600
601 static int
602 close_handles (int beg, int end)
603 {
604   int success = 1;
605   int i;
606
607   for (i = beg; i < end; i++)
608     success &= close_handle (i);
609   return success;
610 }
611
612 static int
613 open_handle_w (int handle_no, int run_no)
614 {
615   sprintf (tmp_extname, "%08x", run_no);
616   msg (VM (1), _("SORT: %s: Opening for writing as run %d."),
617        tmp_basename, run_no);
618
619   /* The `x' modifier causes the GNU C library to insist on creating a
620      new file--if the file already exists, an error is signaled.  The
621      ANSI C standard says that other libraries should ignore anything
622      after the `w+b', so it shouldn't be a problem. */
623   return NULL != (handle[handle_no] = fopen (tmp_basename, "w+bx"));
624 }
625
626 static int
627 open_handle_r (int handle_no, int run_no)
628 {
629   FILE *f;
630
631   sprintf (tmp_extname, "%08x", run_no);
632   msg (VM (1), _("SORT: %s: Opening for writing as run %d."),
633        tmp_basename, run_no);
634   f = handle[handle_no] = fopen (tmp_basename, "rb");
635
636   if (f == NULL)
637     {
638       msg (SE, _("%s: Error opening temporary file for reading: %s."),
639            tmp_basename, strerror (errno));
640       return 0;
641     }
642   
643   return 1;
644 }
645
646 /* Begins a new initial run, specifically its output file. */
647 static void
648 begin_run (void)
649 {
650   /* Decide which handle[] to use.  If run_no is max_handles or
651      greater, then we've run out of handles so it's time to just do
652      one file at a time, which by default is handle 0. */
653   file_index = (run_no < max_handles ? run_no : 0);
654   run_length = 0;
655
656   /* Alright, now create the temporary file. */
657   if (open_handle_w (file_index, run_no) == 0)
658     {
659       /* Failure to create the temporary file.  Check if there are
660          unacceptably few files already open. */
661       if (file_index < 3)
662         {
663           deferred_abort = 1;
664           msg (SE, _("%s: Error creating temporary file: %s."),
665                tmp_basename, strerror (errno));
666           return;
667         }
668
669       /* Close all the open temporary files. */
670       if (!close_handles (0, file_index))
671         return;
672
673       /* Now try again to create the temporary file. */
674       max_handles = file_index;
675       file_index = 0;
676       if (open_handle_w (0, run_no) == 0)
677         {
678           /* It still failed, report it this time. */
679           deferred_abort = 1;
680           msg (SE, _("%s: Error creating temporary file: %s."),
681                tmp_basename, strerror (errno));
682           return;
683         }
684     }
685 }
686
687 /* Ends the current initial run.  Just increments run_no if no initial
688    run has been started yet. */
689 static void
690 end_run (void)
691 {
692   /* Close file handles if necessary. */
693   {
694     int result;
695
696     if (run_no == max_handles - 1)
697       result = close_handles (0, max_handles);
698     else if (run_no >= max_handles)
699       result = close_handle (0);
700     else
701       result = 1;
702     if (!result)
703       deferred_abort = 1;
704   }
705
706   /* Advance to next run. */
707   run_no++;
708   if (run_no)
709     heap_insert (huffman_queue, run_no - 1, run_length);
710 }
711
712 /* Performs 5.4.1R. */
713 static int
714 write_initial_runs (int separate)
715 {
716   run_no = -1;
717   deferred_abort = 0;
718
719   /* Steps R1, R2, R3. */
720   rmax = 0;
721   rc = 0;
722   lastkey = NULL;
723   q = x[0];
724   rq = 0;
725   {
726     int j;
727
728     for (j = 0; j < x_max; j++)
729       {
730         struct repl_sel_tree *J = x[j];
731
732         J->loser = J;
733         J->rn = 0;
734         J->fe = x[(x_max + j) / 2];
735         J->fi = x[j / 2];
736         memset (J->record, 0, dict_get_case_size (default_dict));
737       }
738   }
739
740   /* Most of the iterations of steps R4, R5, R6, R7, R2, R3, ... */
741   if (!separate)
742     {
743       if (vfm_sink)
744         vfm_sink->destroy_sink ();
745       vfm_sink = &sort_stream;
746     }
747   procedure (NULL, NULL, NULL, NULL);
748
749   /* Final iterations of steps R4, R5, R6, R7, R2, R3, ... */
750   for (;;)
751     {
752       struct repl_sel_tree *t;
753
754       /* R4. */
755       rq = rmax + 1;
756
757       /* R5. */
758       t = q->fe;
759
760       /* R6 and R7. */
761       for (;;)
762         {
763           /* R6. */
764           if (t->rn < rq
765               || (t->rn == rq
766                   && compare_record (t->loser->record, q->record) < 0))
767             {
768               struct repl_sel_tree *temp_tree;
769               int temp_int;
770
771               temp_tree = t->loser;
772               t->loser = q;
773               q = temp_tree;
774
775               temp_int = t->rn;
776               t->rn = rq;
777               rq = temp_int;
778             }
779
780           /* R7. */
781           if (t == x[1])
782             break;
783           t = t->fi;
784         }
785
786       /* R2. */
787       if (rq != rc)
788         {
789           end_run ();
790           if (rq > rmax)
791             break;
792           begin_run ();
793           rc = rq;
794         }
795
796       /* R3. */
797       if (rq != 0)
798         {
799           output_record (q->record);
800           lastkey = x[x_max]->record;
801           memcpy (lastkey, q->record, sizeof (union value) * vfm_sink_info.nval);
802         }
803     }
804   assert (run_no == rmax);
805
806   /* If an unrecoverable error occurred somewhere in the above code,
807      then the `deferred_abort' flag would have been set.  */
808   if (deferred_abort)
809     {
810       int i;
811
812       for (i = 0; i < max_handles; i++)
813         if (handle[i] != NULL)
814           {
815             sprintf (tmp_extname, "%08x", i);
816
817             if (fclose (handle[i]) == EOF)
818               msg (SE, _("%s: Error closing temporary file: %s."),
819                    tmp_basename, strerror (errno));
820
821             if (remove (tmp_basename) != 0)
822               msg (SE, _("%s: Error removing temporary file: %s."),
823                    tmp_basename, strerror (errno));
824
825             handle[i] = NULL;
826           }
827       return 0;
828     }
829
830   return 1;
831 }
832
833 /* Compares the NV_SORT variables in V_SORT[] between the `value's at
834    A and B, and returns a strcmp()-type result. */
835 static int
836 compare_record (union value * a, union value * b)
837 {
838   int i;
839   int result = 0;
840   struct variable *v;
841
842   assert (a != NULL);
843   if (b == NULL)                /* Sort NULLs after everything else. */
844     return -1;
845
846   for (i = 0; i < nv_sort; i++)
847     {
848       v = v_sort[i];
849
850       if (v->type == NUMERIC)
851         {
852           if (approx_ne (a[v->fv].f, b[v->fv].f))
853             {
854               result = (a[v->fv].f > b[v->fv].f) ? 1 : -1;
855               break;
856             }
857         }
858       else
859         {
860           result = memcmp (a[v->fv].s, b[v->fv].s, v->width);
861           if (result != 0)
862             break;
863         }
864     }
865
866   if (v->p.srt.order == SRT_ASCEND)
867     return result;
868   else
869     {
870       assert (v->p.srt.order == SRT_DESCEND);
871       return -result;
872     }
873 }
874 \f
875 /* Merging. */
876
877 static int merge_once (int run_index[], int run_length[], int n_runs);
878
879 /* Modula function as defined by Knuth. */
880 static int
881 mod (int x, int y)
882 {
883   int result;
884
885   if (y == 0)
886     return x;
887   result = abs (x) % abs (y);
888   if (y < 0)
889     result = -result;
890   return result;
891 }
892
893 /* Performs a series of P-way merges of initial runs using Huffman's
894    method. */
895 static int
896 merge (void)
897 {
898   /* Order of merge. */
899   int order;
900
901   /* Idiot check. */
902   assert (MIN_BUFFER_SIZE_RECS * 2 <= MIN_BUFFER_TOTAL_SIZE_RECS - 1);
903
904   /* Close all the input files.  I hope that the boundary conditions
905      are correct on this but I'm not sure. */
906   if (run_no < max_handles)
907     {
908       int i;
909
910       for (i = 0; i < run_no; )
911         if (!close_handle (i++))
912           {
913             for (; i < run_no; i++)
914               close_handle (i);
915             return 0;
916           }
917     }
918
919   /* Determine order of merge. */
920   order = MAX_MERGE_ORDER;
921   if (x_max / order < MIN_BUFFER_SIZE_RECS)
922     order = x_max / MIN_BUFFER_SIZE_RECS;
923   else if (x_max / order * dict_get_case_size (default_dict)
924            < MIN_BUFFER_SIZE_BYTES)
925     order = x_max / (MIN_BUFFER_SIZE_BYTES
926                      / dict_get_case_size (default_dict));
927
928   /* Make sure the order of merge is bounded. */
929   if (order < 2)
930     order = 2;
931   if (order > rmax)
932     order = rmax;
933   assert (x_max / order > 0);
934
935   /* Calculate number of records per buffer. */
936   records_per_buffer = x_max / order;
937
938   /* Add (1 - S) mod (P - 1) dummy runs of length 0. */
939   {
940     int n_dummy_runs = mod (1 - rmax, order - 1);
941     debug_printf (("rmax=%d, order=%d, n_dummy_runs=%d\n",
942                    rmax, order, n_dummy_runs));
943     assert (n_dummy_runs >= 0);
944     while (n_dummy_runs--)
945       {
946         heap_insert (huffman_queue, -2, 0);
947         rmax++;
948       }
949   }
950
951   /* Repeatedly merge the P shortest existing runs until only one run
952      is left. */
953   while (rmax > 1)
954     {
955       int run_index[MAX_MERGE_ORDER];
956       int run_length[MAX_MERGE_ORDER];
957       int total_run_length = 0;
958       int i;
959
960       assert (rmax >= order);
961
962       /* Find the shortest runs; put them in runs[] in reverse order
963          of length, to force dummy runs of length 0 to the end of the
964          list. */
965       debug_printf ((_("merging runs")));
966       for (i = order - 1; i >= 0; i--)
967         {
968           run_index[i] = heap_delete (huffman_queue, &run_length[i]);
969           assert (run_index[i] != -1);
970           total_run_length += run_length[i];
971           debug_printf ((" %d(%d)", run_index[i], run_length[i]));
972         }
973       debug_printf ((_(" into run %d(%d)\n"), run_no, total_run_length));
974
975       if (!merge_once (run_index, run_length, order))
976         {
977           int index;
978
979           while (-1 != (index = heap_delete (huffman_queue, NULL)))
980             {
981               sprintf (tmp_extname, "%08x", index);
982               if (remove (tmp_basename) != 0)
983                 msg (SE, _("%s: Error removing temporary file: %s."),
984                      tmp_basename, strerror (errno));
985             }
986
987           return 0;
988         }
989
990       if (!heap_insert (huffman_queue, run_no++, total_run_length))
991         {
992           msg (SE, _("Out of memory expanding Huffman priority queue."));
993           return 0;
994         }
995
996       rmax -= order - 1;
997     }
998
999   /* There should be exactly one element in the priority queue after
1000      all that merging.  This represents the entire sorted active file.
1001      So we could find a total case count by deleting this element from
1002      the queue. */
1003   assert (heap_size (huffman_queue) == 1);
1004
1005   return 1;
1006 }
1007
1008 /* Merges N_RUNS initial runs into a new run.  The jth run for 0 <= j
1009    < N_RUNS is taken from temporary file RUN_INDEX[j]; it is composed
1010    of RUN_LENGTH[j] cases. */
1011 static int
1012 merge_once (int run_index[], int run_length[], int n_runs)
1013 {
1014   /* For each run, the number of records remaining in its buffer. */
1015   int buffered[MAX_MERGE_ORDER];
1016
1017   /* For each run, the index of the next record in the buffer. */
1018   int buffer_ptr[MAX_MERGE_ORDER];
1019
1020   /* Open input files. */
1021   {
1022     int i;
1023
1024     for (i = 0; i < n_runs; i++)
1025       if (run_index[i] != -2 && !open_handle_r (i, run_index[i]))
1026         {
1027           /* Close and remove temporary files. */
1028           while (i--)
1029             {
1030               close_handle (i);
1031               sprintf (tmp_extname, "%08x", i);
1032               if (remove (tmp_basename) != 0)
1033                 msg (SE, _("%s: Error removing temporary file: %s."),
1034                      tmp_basename, strerror (errno));
1035             }
1036
1037           return 0;
1038         }
1039   }
1040
1041   /* Create output file. */
1042   if (!open_handle_w (N_INPUT_BUFFERS, run_no))
1043     {
1044       msg (SE, _("%s: Error creating temporary file for merge: %s."),
1045            tmp_basename, strerror (errno));
1046       goto lossage;
1047     }
1048
1049   /* Prime each buffer. */
1050   {
1051     int i;
1052
1053     for (i = 0; i < n_runs; i++)
1054       if (run_index[i] == -2)
1055         {
1056           n_runs = i;
1057           break;
1058         }
1059       else
1060         {
1061           int j;
1062           int ofs = records_per_buffer * i;
1063
1064           buffered[i] = min (records_per_buffer, run_length[i]);
1065           for (j = 0; j < buffered[i]; j++)
1066             if (fread (x[j + ofs]->record,
1067                        dict_get_case_size (default_dict), 1, handle[i]) != 1)
1068               {
1069                 sprintf (tmp_extname, "%08x", run_index[i]);
1070                 if (ferror (handle[i]))
1071                   msg (SE, _("%s: Error reading temporary file in merge: %s."),
1072                        tmp_basename, strerror (errno));
1073                 else
1074                   msg (SE, _("%s: Unexpected end of temporary file in merge."),
1075                        tmp_basename);
1076                 goto lossage;
1077               }
1078           buffer_ptr[i] = ofs;
1079           run_length[i] -= buffered[i];
1080         }
1081   }
1082
1083   /* Perform the merge proper. */
1084   while (n_runs)                /* Loop while some data is left. */
1085     {
1086       int i;
1087       int min = 0;
1088
1089       for (i = 1; i < n_runs; i++)
1090         if (compare_record (x[buffer_ptr[min]]->record,
1091                             x[buffer_ptr[i]]->record) > 0)
1092           min = i;
1093
1094       if (fwrite (x[buffer_ptr[min]]->record,
1095                   dict_get_case_size (default_dict), 1,
1096                   handle[N_INPUT_BUFFERS]) != 1)
1097         {
1098           sprintf (tmp_extname, "%08x", run_index[i]);
1099           msg (SE, _("%s: Error writing temporary file in "
1100                "merge: %s."), tmp_basename, strerror (errno));
1101           goto lossage;
1102         }
1103
1104       /* Remove one case from the buffer for this input file. */
1105       if (--buffered[min] == 0)
1106         {
1107           /* The input buffer is empty.  Do any cases remain in the
1108              initial run on disk? */
1109           if (run_length[min])
1110             {
1111               /* Yes.  Read them in. */
1112
1113               int j;
1114               int ofs;
1115
1116               /* Reset the buffer pointer.  Note that we can't simply
1117                  set it to (i * records_per_buffer) since the run
1118                  order might have changed. */
1119               ofs = buffer_ptr[min] -= buffer_ptr[min] % records_per_buffer;
1120
1121               buffered[min] = min (records_per_buffer, run_length[min]);
1122               for (j = 0; j < buffered[min]; j++)
1123                 if (fread (x[j + ofs]->record,
1124                            dict_get_case_size (default_dict), 1, handle[min])
1125                     != 1)
1126                   {
1127                     sprintf (tmp_extname, "%08x", run_index[min]);
1128                     if (ferror (handle[min]))
1129                       msg (SE, _("%s: Error reading temporary file in "
1130                                  "merge: %s."),
1131                            tmp_basename, strerror (errno));
1132                     else
1133                       msg (SE, _("%s: Unexpected end of temporary file "
1134                                  "in merge."),
1135                            tmp_basename);
1136                     goto lossage;
1137                   }
1138               run_length[min] -= buffered[min];
1139             }
1140           else
1141             {
1142               /* No.  Delete this run. */
1143
1144               /* Close the file. */
1145               FILE *f = handle[min];
1146               handle[min] = NULL;
1147               sprintf (tmp_extname, "%08x", run_index[min]);
1148               if (fclose (f) == EOF)
1149                 msg (SE, _("%s: Error closing temporary file in merge: "
1150                      "%s."), tmp_basename, strerror (errno));
1151
1152               /* Delete the file. */
1153               if (remove (tmp_basename) != 0)
1154                 msg (SE, _("%s: Error removing temporary file in merge: "
1155                      "%s."), tmp_basename, strerror (errno));
1156
1157               n_runs--;
1158               if (min != n_runs)
1159                 {
1160                   /* Since this isn't the last run, we move the last
1161                      run into its spot to force all the runs to be
1162                      contiguous. */
1163                   run_index[min] = run_index[n_runs];
1164                   run_length[min] = run_length[n_runs];
1165                   buffer_ptr[min] = buffer_ptr[n_runs];
1166                   buffered[min] = buffered[n_runs];
1167                   handle[min] = handle[n_runs];
1168                 }
1169             }
1170         }
1171       else
1172         buffer_ptr[min]++;
1173     }
1174
1175   /* Close output file. */
1176   {
1177     FILE *f = handle[N_INPUT_BUFFERS];
1178     handle[N_INPUT_BUFFERS] = NULL;
1179     if (fclose (f) == EOF)
1180       {
1181         sprintf (tmp_extname, "%08x", run_no);
1182         msg (SE, _("%s: Error closing temporary file in merge: "
1183                    "%s."),
1184              tmp_basename, strerror (errno));
1185         return 0;
1186       }
1187   }
1188
1189   return 1;
1190
1191 lossage:
1192   /* Close all the input and output files. */
1193   {
1194     int i;
1195
1196     for (i = 0; i < n_runs; i++)
1197       if (run_length[i] != 0)
1198         {
1199           close_handle (i);
1200           sprintf (tmp_basename, "%08x", run_index[i]);
1201           if (remove (tmp_basename) != 0)
1202             msg (SE, _("%s: Error removing temporary file: %s."),
1203                  tmp_basename, strerror (errno));
1204         }
1205   }
1206   close_handle (N_INPUT_BUFFERS);
1207   sprintf (tmp_basename, "%08x", run_no);
1208   if (remove (tmp_basename) != 0)
1209     msg (SE, _("%s: Error removing temporary file: %s."),
1210          tmp_basename, strerror (errno));
1211   return 0;
1212 }
1213 \f
1214 /* External sort input program. */
1215
1216 /* Reads all the records from the source stream and passes them
1217    to write_case(). */
1218 static void
1219 sort_stream_read (write_case_func *write_case, write_case_data wc_data)
1220 {
1221   read_sort_output (write_case, wc_data);
1222 }
1223
1224 /* Reads all the records from the output stream and passes them to the
1225    function provided, which must have an interface identical to
1226    write_case(). */
1227 void
1228 read_sort_output (write_case_func *write_case, write_case_data wc_data)
1229 {
1230   int i;
1231   FILE *f;
1232
1233   if (separate_case_tab)
1234     {
1235       struct ccase *save_temp_case = temp_case;
1236       struct case_list **p;
1237
1238       for (p = separate_case_tab; *p; p++)
1239         {
1240           temp_case = &(*p)->c;
1241           write_case (wc_data);
1242         }
1243       
1244       free (separate_case_tab);
1245       separate_case_tab = NULL;
1246             
1247       temp_case = save_temp_case;
1248     } else {
1249       sprintf (tmp_extname, "%08x", run_no - 1);
1250       f = fopen (tmp_basename, "rb");
1251       if (!f)
1252         {
1253           msg (ME, _("%s: Cannot open sort result file: %s."), tmp_basename,
1254                strerror (errno));
1255           err_failure ();
1256           return;
1257         }
1258
1259       for (i = 0; i < vfm_source_info.ncases; i++)
1260         {
1261           if (!fread (temp_case, vfm_source_info.case_size, 1, f))
1262             {
1263               if (ferror (f))
1264                 msg (ME, _("%s: Error reading sort result file: %s."),
1265                      tmp_basename, strerror (errno));
1266               else
1267                 msg (ME, _("%s: Unexpected end of sort result file: %s."),
1268                      tmp_basename, strerror (errno));
1269               err_failure ();
1270               break;
1271             }
1272
1273           if (!write_case (wc_data))
1274             break;
1275         }
1276
1277       if (fclose (f) == EOF)
1278         msg (ME, _("%s: Error closing sort result file: %s."), tmp_basename,
1279              strerror (errno));
1280
1281       if (remove (tmp_basename) != 0)
1282         msg (ME, _("%s: Error removing sort result file: %s."), tmp_basename,
1283              strerror (errno));
1284       else
1285         rmdir_temp_dir ();
1286     }
1287 }
1288
1289 #if 0 /* dead code */
1290 /* Alternate interface to sort_stream_write used for external sorting
1291    when SEPARATE is true. */
1292 static int
1293 write_separate (struct ccase *c)
1294 {
1295   assert (c == temp_case);
1296
1297   sort_stream_write ();
1298   return 1;
1299 }
1300 #endif
1301
1302 /* Performs one iteration of 5.4.1R steps R4, R5, R6, R7, R2, and
1303    R3. */
1304 static void
1305 sort_stream_write (void)
1306 {
1307   struct repl_sel_tree *t;
1308
1309   /* R4. */
1310   memcpy (q->record, temp_case->data, vfm_sink_info.case_size);
1311   if (compare_record (q->record, lastkey) < 0)
1312     if (++rq > rmax)
1313       rmax = rq;
1314
1315   /* R5. */
1316   t = q->fe;
1317
1318   /* R6 and R7. */
1319   for (;;)
1320     {
1321       /* R6. */
1322       if (t->rn < rq
1323           || (t->rn == rq && compare_record (t->loser->record, q->record) < 0))
1324         {
1325           struct repl_sel_tree *temp_tree;
1326           int temp_int;
1327
1328           temp_tree = t->loser;
1329           t->loser = q;
1330           q = temp_tree;
1331
1332           temp_int = t->rn;
1333           t->rn = rq;
1334           rq = temp_int;
1335         }
1336
1337       /* R7. */
1338       if (t == x[1])
1339         break;
1340       t = t->fi;
1341     }
1342
1343   /* R2. */
1344   if (rq != rc)
1345     {
1346       end_run ();
1347       begin_run ();
1348       assert (rq <= rmax);
1349       rc = rq;
1350     }
1351
1352   /* R3. */
1353   if (rq != 0)
1354     {
1355       output_record (q->record);
1356       lastkey = x[x_max]->record;
1357       memcpy (lastkey, q->record, vfm_sink_info.case_size);
1358     }
1359 }
1360
1361 /* Switches mode from sink to source. */
1362 static void
1363 sort_stream_mode (void)
1364 {
1365   /* If this is not done, then we get the following source/sink pairs:
1366      source=memory/disk/DATA LIST/etc., sink=SORT; source=SORT,
1367      sink=SORT; which is not good. */
1368   vfm_sink = NULL;
1369 }
1370
1371 struct case_stream sort_stream =
1372   {
1373     NULL,
1374     sort_stream_read,
1375     sort_stream_write,
1376     sort_stream_mode,
1377     NULL,
1378     NULL,
1379     "SORT",
1380   };