Sat Dec 27 16:16:49 2003 Ben Pfaff <blp@gnu.org>
[pspp-builds.git] / src / sort.c
1 /* PSPP - computes sample statistics.
2    Copyright (C) 1997-9, 2000 Free Software Foundation, Inc.
3    Written by Ben Pfaff <blp@gnu.org>.
4
5    This program is free software; you can redistribute it and/or
6    modify it under the terms of the GNU General Public License as
7    published by the Free Software Foundation; either version 2 of the
8    License, or (at your option) any later version.
9
10    This program is distributed in the hope that it will be useful, but
11    WITHOUT ANY WARRANTY; without even the implied warranty of
12    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13    General Public License for more details.
14
15    You should have received a copy of the GNU General Public License
16    along with this program; if not, write to the Free Software
17    Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA
18    02111-1307, USA. */
19
20 #include <config.h>
21 #include "sort.h"
22 #include <assert.h>
23 #include <stdio.h>
24 #include <stdlib.h>
25 #include <errno.h>
26 #include "alloc.h"
27 #include "approx.h"
28 #include "command.h"
29 #include "error.h"
30 #include "expr.h"
31 #include "heap.h"
32 #include "lexer.h"
33 #include "misc.h"
34 #include "str.h"
35 #include "var.h"
36 #include "vfm.h"
37 #include "vfmP.h"
38
39 #if HAVE_UNISTD_H
40 #include <unistd.h>
41 #endif
42
43 #if HAVE_SYS_TYPES_H
44 #include <sys/types.h>
45 #endif
46
47 #if HAVE_SYS_STAT_H
48 #include <sys/stat.h>
49 #endif
50
51 #include "debug-print.h"
52
53 /* Variables to sort. */
54 struct variable **v_sort;
55 int nv_sort;
56
57 /* Used when internal-sorting to a separate file. */
58 static struct case_list **separate_case_tab;
59
60 /* Other prototypes. */
61 static int compare_case_lists (const void *, const void *);
62 static int do_internal_sort (int separate);
63 static int do_external_sort (int separate);
64 int parse_sort_variables (void);
65 void read_sort_output (int (*write_case) (void));
66
67 /* Performs the SORT CASES procedures. */
68 int
69 cmd_sort_cases (void)
70 {
71   /* First, just parse the command. */
72   lex_match_id ("SORT");
73   lex_match_id ("CASES");
74   lex_match (T_BY);
75
76   if (!parse_sort_variables ())
77     return CMD_FAILURE;
78       
79   cancel_temporary ();
80
81   /* Then it's time to do the actual work.  There are two cases:
82
83      (internal sort) All the data is in memory.  In this case, we
84      perform an EXECUTE to get the data into the desired form, then
85      sort the cases in place, if it is still all in memory.
86
87      (external sort) The data is not in memory.  It may be coming from
88      a system file or other data file, etc.  In any case, it is now
89      time to perform an external sort.  This is better explained in
90      do_external_sort(). */
91
92   /* Do all this dirty work. */
93   {
94     int success = sort_cases (0);
95     free (v_sort);
96     if (success)
97       return lex_end_of_command ();
98     else
99       return CMD_FAILURE;
100   }
101 }
102
103 /* Parses a list of sort variables into v_sort and nv_sort.  */
104 int
105 parse_sort_variables (void)
106 {
107   v_sort = NULL;
108   nv_sort = 0;
109   do
110     {
111       int prev_nv_sort = nv_sort;
112       int order = SRT_ASCEND;
113
114       if (!parse_variables (default_dict, &v_sort, &nv_sort,
115                             PV_NO_DUPLICATE | PV_APPEND | PV_NO_SCRATCH))
116         return 0;
117       if (lex_match ('('))
118         {
119           if (lex_match_id ("D") || lex_match_id ("DOWN"))
120             order = SRT_DESCEND;
121           else if (!lex_match_id ("A") && !lex_match_id ("UP"))
122             {
123               free (v_sort);
124               msg (SE, _("`A' or `D' expected inside parentheses."));
125               return 0;
126             }
127           if (!lex_match (')'))
128             {
129               free (v_sort);
130               msg (SE, _("`)' expected."));
131               return 0;
132             }
133         }
134       for (; prev_nv_sort < nv_sort; prev_nv_sort++)
135         v_sort[prev_nv_sort]->p.srt.order = order;
136     }
137   while (token != '.' && token != '/');
138   
139   return 1;
140 }
141
142 /* Sorts the active file based on the key variables specified in
143    global variables v_sort and nv_sort.  The output is either to the
144    active file, if SEPARATE is zero, or to a separate file, if
145    SEPARATE is nonzero.  In the latter case the output cases can be
146    read with a call to read_sort_output().  (In the former case the
147    output cases should be dealt with through the usual vfm interface.)
148
149    The caller is responsible for freeing v_sort[]. */
150 int
151 sort_cases (int separate)
152 {
153   assert (separate_case_tab == NULL);
154
155   /* Not sure this is necessary but it's good to be safe. */
156   if (separate && vfm_source == &sort_stream)
157     procedure (NULL, NULL, NULL);
158   
159   /* SORT CASES cancels PROCESS IF. */
160   expr_free (process_if_expr);
161   process_if_expr = NULL;
162
163   if (do_internal_sort (separate))
164     return 1;
165
166   page_to_disk ();
167   return do_external_sort (separate);
168 }
169
170 /* If a reasonable situation is set up, do an internal sort of the
171    data.  Return success. */
172 static int
173 do_internal_sort (int separate)
174 {
175   if (vfm_source != &vfm_disk_stream)
176     {
177       if (vfm_source != &vfm_memory_stream)
178         procedure (NULL, NULL, NULL);
179       if (vfm_source == &vfm_memory_stream)
180         {
181           struct case_list **case_tab = malloc (sizeof *case_tab
182                                          * (vfm_source_info.ncases + 1));
183           if (vfm_source_info.ncases == 0)
184             {
185               free (case_tab);
186               return 1;
187             }
188           if (case_tab != NULL)
189             {
190               struct case_list *clp = memory_source_cases;
191               struct case_list **ctp = case_tab;
192               int i;
193
194               for (; clp; clp = clp->next)
195                 *ctp++ = clp;
196               qsort (case_tab, vfm_source_info.ncases, sizeof *case_tab,
197                      compare_case_lists);
198
199               if (!separate)
200                 {
201                   memory_source_cases = case_tab[0];
202                   for (i = 1; i < vfm_source_info.ncases; i++)
203                     case_tab[i - 1]->next = case_tab[i];
204                   case_tab[vfm_source_info.ncases - 1]->next = NULL;
205                   free (case_tab);
206                 } else {
207                   case_tab[vfm_source_info.ncases] = NULL;
208                   separate_case_tab = case_tab;
209                 }
210               
211               return 1;
212             }
213         }
214     }
215   return 0;
216 }
217
218 /* Compares the NV_SORT variables in V_SORT[] between the
219    `case_list's at A and B, and returns a strcmp()-type
220    result. */
221 static int
222 compare_case_lists (const void *a_, const void *b_)
223 {
224   struct case_list *const *pa = a_;
225   struct case_list *const *pb = b_;
226   struct case_list *a = *pa;
227   struct case_list *b = *pb;
228   struct variable *v;
229   int result = 0;
230   int i;
231
232   for (i = 0; i < nv_sort; i++)
233     {
234       v = v_sort[i];
235       
236       if (v->type == NUMERIC)
237         {
238           double af = a->c.data[v->fv].f;
239           double bf = b->c.data[v->fv].f;
240
241           result = af < bf ? -1 : af > bf;
242         }
243       else
244         result = memcmp (a->c.data[v->fv].s, b->c.data[v->fv].s, v->width);
245
246       if (result != 0)
247         break;
248     }
249
250   if (v->p.srt.order == SRT_DESCEND)
251     result = -result;
252   return result;
253 }
254 \f
255 /* External sort. */
256
257 /* Maximum number of input + output file handles. */
258 #if defined FOPEN_MAX && (FOPEN_MAX - 5 < 18)
259 #define MAX_FILE_HANDLES        (FOPEN_MAX - 5)
260 #else
261 #define MAX_FILE_HANDLES        18
262 #endif
263
264 #if MAX_FILE_HANDLES < 3
265 #error At least 3 file handles must be available for sorting.
266 #endif
267
268 /* Number of input buffers. */
269 #define N_INPUT_BUFFERS         (MAX_FILE_HANDLES - 1)
270
271 /* Maximum order of merge.  This is the value suggested by Knuth;
272    specifically, he said to use tree selection, which we don't
273    implement, for larger orders of merge. */
274 #define MAX_MERGE_ORDER         7
275
276 /* Minimum total number of records for buffers. */
277 #define MIN_BUFFER_TOTAL_SIZE_RECS      64
278
279 /* Minimum single input or output buffer size, in bytes and records. */
280 #define MIN_BUFFER_SIZE_BYTES   4096
281 #define MIN_BUFFER_SIZE_RECS    16
282
283 /* Structure for replacement selection tree. */
284 struct repl_sel_tree
285   {
286     struct repl_sel_tree *loser;/* Loser associated w/this internal node. */
287     int rn;                     /* Run number of `loser'. */
288     struct repl_sel_tree *fe;   /* Internal node above this external node. */
289     struct repl_sel_tree *fi;   /* Internal node above this internal node. */
290     union value record[1];      /* The case proper. */
291   };
292
293 /* Static variables used for sorting. */
294 static struct repl_sel_tree **x; /* Buffers. */
295 static int x_max;               /* Size of buffers, in records. */
296 static int records_per_buffer;  /* Number of records in each buffer. */
297
298 /* In the merge phase, the first N_INPUT_BUFFERS handle[] elements are
299    input files and the last element is the output file.  Before that,
300    they're all used as output files, although the last one is
301    segregated. */
302 static FILE *handle[MAX_FILE_HANDLES];  /* File handles. */
303
304 /* Now, MAX_FILE_HANDLES is the maximum number of files we will *try*
305    to open.  But if we can't open that many, max_handles will be set
306    to the number we apparently can open. */
307 static int max_handles;         /* Maximum number of handles. */
308
309 /* When we create temporary files, they are all put in the same
310    directory and numbered sequentially from zero.  tmp_basename is the
311    drive/directory, etc., and tmp_extname can be sprintf() with "%08x"
312    to the file number, then tmp_basename used to open the file. */
313 static char *tmp_basename;      /* Temporary file basename. */
314 static char *tmp_extname;       /* Temporary file extension name. */
315
316 /* We use Huffman's method to determine the merge pattern.  This means
317    that we need to know which runs are the shortest at any given time.
318    Priority queues as implemented by heap.c are a natural for this
319    task (probably because I wrote the code specifically for it). */
320 static struct heap *huffman_queue;      /* Huffman priority queue. */
321
322 /* Prototypes for helper functions. */
323 static void sort_stream_write (void);
324 static int write_initial_runs (int separate);
325 static int allocate_cases (void);
326 static int allocate_file_handles (void);
327 static int merge (void);
328 static void rmdir_temp_dir (void);
329
330 /* Performs an external sort of the active file.  A description of the
331    procedure follows.  All page references refer to Knuth's _Art of
332    Computer Programming, Vol. 3: Sorting and Searching_, which is the
333    canonical resource for sorting.
334
335    1. The data is read and S initial runs are formed through the
336    action of algorithm 5.4.1R (replacement selection).
337
338    2. Huffman's method (p. 365-366) is used to determine the optimum
339    merge pattern.
340
341    3. If an OS that supports overlapped reading, writing, and
342    computing is being run, we should use 5.4.6F for forecasting.
343    Otherwise, buffers are filled only when they run out of data.
344    FIXME: Since the author of PSPP uses GNU/Linux, which does not
345    yet implement overlapped r/w/c, 5.4.6F is not used.
346
347    4. We perform P-way merges:
348
349    (a) The desired P is the smallest P such that ceil(ln(S)/ln(P))
350    is minimized.  (FIXME: Since I don't have an algorithm for
351    minimizing this, it's just set to MAX_MERGE_ORDER.)
352
353    (b) P is reduced if the selected value would make input buffers
354    less than 4096 bytes each, or 16 records, whichever is larger.
355
356    (c) P is reduced if we run out of available file handles or space
357    for file handles.
358
359    (d) P is reduced if we don't have space for one or two output
360    buffers, which have the same minimum size as input buffers.  (We
361    need two output buffers if 5.4.6F is in use for forecasting.)  */
362 static int
363 do_external_sort (int separate)
364 {
365   int success = 0;
366
367   assert (MAX_FILE_HANDLES >= 3);
368
369   x = NULL;
370   tmp_basename = NULL;
371
372   huffman_queue = heap_create (512);
373   if (huffman_queue == NULL)
374     return 0;
375
376   if (!allocate_cases ())
377     goto lossage;
378
379   if (!allocate_file_handles ())
380     goto lossage;
381
382   if (!write_initial_runs (separate))
383     goto lossage;
384
385   merge ();
386
387   success = 1;
388
389   /* Despite the name, flow of control comes here regardless of
390      whether or not the sort is successful. */
391 lossage:
392   heap_destroy (huffman_queue);
393
394   if (x)
395     {
396       int i;
397
398       for (i = 0; i <= x_max; i++)
399         free (x[i]);
400       free (x);
401     }
402
403   if (!success)
404     rmdir_temp_dir ();
405
406   return success;
407 }
408
409 #if !HAVE_GETPID
410 #define getpid() (0)
411 #endif
412
413 /* Sets up to open temporary files. */
414 /* PORTME: This creates a directory for temporary files.  Some OSes
415    might not have that concept... */
416 static int
417 allocate_file_handles (void)
418 {
419   const char *dir;              /* Directory prefix. */
420   char *buf;                    /* String buffer. */
421   char *cp;                     /* Pointer into buf. */
422
423   dir = getenv ("SPSSTMPDIR");
424   if (dir == NULL)
425     dir = getenv ("SPSSXTMPDIR");
426   if (dir == NULL)
427     dir = getenv ("TMPDIR");
428 #ifdef P_tmpdir
429   if (dir == NULL)
430     dir = P_tmpdir;
431 #endif
432 #if __unix__
433   if (dir == NULL)
434     dir = "/tmp";
435 #elif __MSDOS__
436   if (dir == NULL)
437     dir = getenv ("TEMP");
438   if (dir == NULL)
439     dir = getenv ("TMP");
440   if (dir == NULL)
441     dir = "\\";
442 #else
443   dir = "";
444 #endif
445
446   buf = xmalloc (strlen (dir) + 1 + 4 + 8 + 4 + 1 + INT_DIGITS + 1);
447   cp = spprintf (buf, "%s%c%04lX%04lXpspp", dir, DIR_SEPARATOR,
448                  ((long) time (0)) & 0xffff, ((long) getpid ()) & 0xffff);
449   if (-1 == mkdir (buf, S_IRWXU))
450     {
451       free (buf);
452       msg (SE, _("%s: Cannot create temporary directory: %s."),
453            buf, strerror (errno));
454       return 0;
455     }
456   *cp++ = DIR_SEPARATOR;
457
458   tmp_basename = buf;
459   tmp_extname = cp;
460
461   max_handles = MAX_FILE_HANDLES;
462
463   return 1;
464 }
465
466 /* Removes the directory created for temporary files, if one exists.
467    Also frees tmp_basename. */
468 static void
469 rmdir_temp_dir (void)
470 {
471   if (NULL == tmp_basename)
472     return;
473
474   tmp_extname[-1] = '\0';
475   if (rmdir (tmp_basename) == -1)
476     msg (SE, _("%s: Error removing directory for temporary files: %s."),
477          tmp_basename, strerror (errno));
478
479   free (tmp_basename);
480 }
481
482 /* Allocates room for lots of cases as a buffer. */
483 static int
484 allocate_cases (void)
485 {
486   /* This is the size of one case. */
487   const int case_size = (sizeof (struct repl_sel_tree)
488                          + (sizeof (union value)
489                             * (dict_get_value_cnt (default_dict) - 1))
490                          + sizeof (struct repl_sel_tree *));
491
492   x = NULL;
493
494   /* Allocate as many cases as we can, assuming a space of four
495      void pointers for malloc()'s internal bookkeeping. */
496   x_max = MAX_WORKSPACE / (case_size + 4 * sizeof (void *));
497   x = malloc (sizeof (struct repl_sel_tree *) * x_max);
498   if (x != NULL)
499     {
500       int i;
501
502       for (i = 0; i < x_max; i++)
503         {
504           x[i] = malloc (sizeof (struct repl_sel_tree)
505                          + (sizeof (union value)
506                             * (dict_get_value_cnt (default_dict) - 1)));
507           if (x[i] == NULL)
508             break;
509         }
510       x_max = i;
511     }
512   if (x == NULL || x_max < MIN_BUFFER_TOTAL_SIZE_RECS)
513     {
514       if (x != NULL)
515         {
516           int i;
517           
518           for (i = 0; i < x_max; i++)
519             free (x[i]);
520         }
521       free (x);
522       msg (SE, _("Out of memory.  Could not allocate room for minimum of %d "
523                  "cases of %d bytes each.  (PSPP workspace is currently "
524                  "restricted to a maximum of %d KB.)"),
525            MIN_BUFFER_TOTAL_SIZE_RECS, case_size, MAX_WORKSPACE / 1024);
526       x_max = 0;
527       x = NULL;
528       return 0;
529     }
530
531   /* The last element of the array is used to store lastkey. */
532   x_max--;
533
534   debug_printf ((_("allocated %d cases == %d bytes\n"),
535                  x_max, x_max * case_size));
536   return 1;
537 }
538 \f
539 /* Replacement selection. */
540
541 static int rmax, rc, rq;
542 static struct repl_sel_tree *q;
543 static union value *lastkey;
544 static int run_no, file_index;
545 static int deferred_abort;
546 static int run_length;
547
548 static int compare_record (union value *, union value *);
549
550 static inline void
551 output_record (union value *v)
552 {
553   union value *src_case;
554   
555   if (deferred_abort)
556     return;
557
558   if (compaction_necessary)
559     {
560       compact_case (compaction_case, (struct ccase *) v);
561       src_case = (union value *) compaction_case;
562     }
563   else
564     src_case = (union value *) v;
565
566   if ((int) fwrite (src_case, sizeof *src_case, compaction_nval,
567                     handle[file_index])
568       != compaction_nval)
569     {
570       deferred_abort = 1;
571       sprintf (tmp_extname, "%08x", run_no);
572       msg (SE, _("%s: Error writing temporary file: %s."),
573            tmp_basename, strerror (errno));
574       return;
575     }
576
577   run_length++;
578 }
579
580 static int
581 close_handle (int i)
582 {
583   int result = fclose (handle[i]);
584   msg (VM (2), _("SORT: Closing handle %d."), i);
585   
586   handle[i] = NULL;
587   if (EOF == result)
588     {
589       sprintf (tmp_extname, "%08x", i);
590       msg (SE, _("%s: Error closing temporary file: %s."),
591            tmp_basename, strerror (errno));
592       return 0;
593     }
594   return 1;
595 }
596
597 static int
598 close_handles (int beg, int end)
599 {
600   int success = 1;
601   int i;
602
603   for (i = beg; i < end; i++)
604     success &= close_handle (i);
605   return success;
606 }
607
608 static int
609 open_handle_w (int handle_no, int run_no)
610 {
611   sprintf (tmp_extname, "%08x", run_no);
612   msg (VM (1), _("SORT: %s: Opening for writing as run %d."),
613        tmp_basename, run_no);
614
615   /* The `x' modifier causes the GNU C library to insist on creating a
616      new file--if the file already exists, an error is signaled.  The
617      ANSI C standard says that other libraries should ignore anything
618      after the `w+b', so it shouldn't be a problem. */
619   return NULL != (handle[handle_no] = fopen (tmp_basename, "w+bx"));
620 }
621
622 static int
623 open_handle_r (int handle_no, int run_no)
624 {
625   FILE *f;
626
627   sprintf (tmp_extname, "%08x", run_no);
628   msg (VM (1), _("SORT: %s: Opening for writing as run %d."),
629        tmp_basename, run_no);
630   f = handle[handle_no] = fopen (tmp_basename, "rb");
631
632   if (f == NULL)
633     {
634       msg (SE, _("%s: Error opening temporary file for reading: %s."),
635            tmp_basename, strerror (errno));
636       return 0;
637     }
638   
639   return 1;
640 }
641
642 /* Begins a new initial run, specifically its output file. */
643 static void
644 begin_run (void)
645 {
646   /* Decide which handle[] to use.  If run_no is max_handles or
647      greater, then we've run out of handles so it's time to just do
648      one file at a time, which by default is handle 0. */
649   file_index = (run_no < max_handles ? run_no : 0);
650   run_length = 0;
651
652   /* Alright, now create the temporary file. */
653   if (open_handle_w (file_index, run_no) == 0)
654     {
655       /* Failure to create the temporary file.  Check if there are
656          unacceptably few files already open. */
657       if (file_index < 3)
658         {
659           deferred_abort = 1;
660           msg (SE, _("%s: Error creating temporary file: %s."),
661                tmp_basename, strerror (errno));
662           return;
663         }
664
665       /* Close all the open temporary files. */
666       if (!close_handles (0, file_index))
667         return;
668
669       /* Now try again to create the temporary file. */
670       max_handles = file_index;
671       file_index = 0;
672       if (open_handle_w (0, run_no) == 0)
673         {
674           /* It still failed, report it this time. */
675           deferred_abort = 1;
676           msg (SE, _("%s: Error creating temporary file: %s."),
677                tmp_basename, strerror (errno));
678           return;
679         }
680     }
681 }
682
683 /* Ends the current initial run.  Just increments run_no if no initial
684    run has been started yet. */
685 static void
686 end_run (void)
687 {
688   /* Close file handles if necessary. */
689   {
690     int result;
691
692     if (run_no == max_handles - 1)
693       result = close_handles (0, max_handles);
694     else if (run_no >= max_handles)
695       result = close_handle (0);
696     else
697       result = 1;
698     if (!result)
699       deferred_abort = 1;
700   }
701
702   /* Advance to next run. */
703   run_no++;
704   if (run_no)
705     heap_insert (huffman_queue, run_no - 1, run_length);
706 }
707
708 /* Performs 5.4.1R. */
709 static int
710 write_initial_runs (int separate)
711 {
712   run_no = -1;
713   deferred_abort = 0;
714
715   /* Steps R1, R2, R3. */
716   rmax = 0;
717   rc = 0;
718   lastkey = NULL;
719   q = x[0];
720   rq = 0;
721   {
722     int j;
723
724     for (j = 0; j < x_max; j++)
725       {
726         struct repl_sel_tree *J = x[j];
727
728         J->loser = J;
729         J->rn = 0;
730         J->fe = x[(x_max + j) / 2];
731         J->fi = x[j / 2];
732         memset (J->record, 0,
733                 dict_get_value_cnt (default_dict) * sizeof (union value));
734       }
735   }
736
737   /* Most of the iterations of steps R4, R5, R6, R7, R2, R3, ... */
738   if (!separate)
739     {
740       if (vfm_sink)
741         vfm_sink->destroy_sink ();
742       vfm_sink = &sort_stream;
743     }
744   procedure (NULL, NULL, NULL);
745
746   /* Final iterations of steps R4, R5, R6, R7, R2, R3, ... */
747   for (;;)
748     {
749       struct repl_sel_tree *t;
750
751       /* R4. */
752       rq = rmax + 1;
753
754       /* R5. */
755       t = q->fe;
756
757       /* R6 and R7. */
758       for (;;)
759         {
760           /* R6. */
761           if (t->rn < rq
762               || (t->rn == rq
763                   && compare_record (t->loser->record, q->record) < 0))
764             {
765               struct repl_sel_tree *temp_tree;
766               int temp_int;
767
768               temp_tree = t->loser;
769               t->loser = q;
770               q = temp_tree;
771
772               temp_int = t->rn;
773               t->rn = rq;
774               rq = temp_int;
775             }
776
777           /* R7. */
778           if (t == x[1])
779             break;
780           t = t->fi;
781         }
782
783       /* R2. */
784       if (rq != rc)
785         {
786           end_run ();
787           if (rq > rmax)
788             break;
789           begin_run ();
790           rc = rq;
791         }
792
793       /* R3. */
794       if (rq != 0)
795         {
796           output_record (q->record);
797           lastkey = x[x_max]->record;
798           memcpy (lastkey, q->record, sizeof (union value) * vfm_sink_info.nval);
799         }
800     }
801   assert (run_no == rmax);
802
803   /* If an unrecoverable error occurred somewhere in the above code,
804      then the `deferred_abort' flag would have been set.  */
805   if (deferred_abort)
806     {
807       int i;
808
809       for (i = 0; i < max_handles; i++)
810         if (handle[i] != NULL)
811           {
812             sprintf (tmp_extname, "%08x", i);
813
814             if (fclose (handle[i]) == EOF)
815               msg (SE, _("%s: Error closing temporary file: %s."),
816                    tmp_basename, strerror (errno));
817
818             if (remove (tmp_basename) != 0)
819               msg (SE, _("%s: Error removing temporary file: %s."),
820                    tmp_basename, strerror (errno));
821
822             handle[i] = NULL;
823           }
824       return 0;
825     }
826
827   return 1;
828 }
829
830 /* Compares the NV_SORT variables in V_SORT[] between the `value's at
831    A and B, and returns a strcmp()-type result. */
832 static int
833 compare_record (union value * a, union value * b)
834 {
835   int i;
836   int result = 0;
837   struct variable *v;
838
839   assert (a != NULL);
840   if (b == NULL)                /* Sort NULLs after everything else. */
841     return -1;
842
843   for (i = 0; i < nv_sort; i++)
844     {
845       v = v_sort[i];
846
847       if (v->type == NUMERIC)
848         {
849           if (approx_ne (a[v->fv].f, b[v->fv].f))
850             {
851               result = (a[v->fv].f > b[v->fv].f) ? 1 : -1;
852               break;
853             }
854         }
855       else
856         {
857           result = memcmp (a[v->fv].s, b[v->fv].s, v->width);
858           if (result != 0)
859             break;
860         }
861     }
862
863   if (v->p.srt.order == SRT_ASCEND)
864     return result;
865   else
866     {
867       assert (v->p.srt.order == SRT_DESCEND);
868       return -result;
869     }
870 }
871 \f
872 /* Merging. */
873
874 static int merge_once (int run_index[], int run_length[], int n_runs);
875
876 /* Modula function as defined by Knuth. */
877 static int
878 mod (int x, int y)
879 {
880   int result;
881
882   if (y == 0)
883     return x;
884   result = abs (x) % abs (y);
885   if (y < 0)
886     result = -result;
887   return result;
888 }
889
890 /* Performs a series of P-way merges of initial runs using Huffman's
891    method. */
892 static int
893 merge (void)
894 {
895   /* Order of merge. */
896   int order;
897
898   /* Idiot check. */
899   assert (MIN_BUFFER_SIZE_RECS * 2 <= MIN_BUFFER_TOTAL_SIZE_RECS - 1);
900
901   /* Close all the input files.  I hope that the boundary conditions
902      are correct on this but I'm not sure. */
903   if (run_no < max_handles)
904     {
905       int i;
906
907       for (i = 0; i < run_no; )
908         if (!close_handle (i++))
909           {
910             for (; i < run_no; i++)
911               close_handle (i);
912             return 0;
913           }
914     }
915
916   /* Determine order of merge. */
917   order = MAX_MERGE_ORDER;
918   if (x_max / order < MIN_BUFFER_SIZE_RECS)
919     order = x_max / MIN_BUFFER_SIZE_RECS;
920   else if (x_max / order * sizeof (union value) * dict_get_value_cnt (default_dict)
921            < MIN_BUFFER_SIZE_BYTES)
922     order = x_max / (MIN_BUFFER_SIZE_BYTES
923                      / (sizeof (union value)
924                         * (dict_get_value_cnt (default_dict) - 1)));
925
926   /* Make sure the order of merge is bounded. */
927   if (order < 2)
928     order = 2;
929   if (order > rmax)
930     order = rmax;
931   assert (x_max / order > 0);
932
933   /* Calculate number of records per buffer. */
934   records_per_buffer = x_max / order;
935
936   /* Add (1 - S) mod (P - 1) dummy runs of length 0. */
937   {
938     int n_dummy_runs = mod (1 - rmax, order - 1);
939     debug_printf (("rmax=%d, order=%d, n_dummy_runs=%d\n",
940                    rmax, order, n_dummy_runs));
941     assert (n_dummy_runs >= 0);
942     while (n_dummy_runs--)
943       {
944         heap_insert (huffman_queue, -2, 0);
945         rmax++;
946       }
947   }
948
949   /* Repeatedly merge the P shortest existing runs until only one run
950      is left. */
951   while (rmax > 1)
952     {
953       int run_index[MAX_MERGE_ORDER];
954       int run_length[MAX_MERGE_ORDER];
955       int total_run_length = 0;
956       int i;
957
958       assert (rmax >= order);
959
960       /* Find the shortest runs; put them in runs[] in reverse order
961          of length, to force dummy runs of length 0 to the end of the
962          list. */
963       debug_printf ((_("merging runs")));
964       for (i = order - 1; i >= 0; i--)
965         {
966           run_index[i] = heap_delete (huffman_queue, &run_length[i]);
967           assert (run_index[i] != -1);
968           total_run_length += run_length[i];
969           debug_printf ((" %d(%d)", run_index[i], run_length[i]));
970         }
971       debug_printf ((_(" into run %d(%d)\n"), run_no, total_run_length));
972
973       if (!merge_once (run_index, run_length, order))
974         {
975           int index;
976
977           while (-1 != (index = heap_delete (huffman_queue, NULL)))
978             {
979               sprintf (tmp_extname, "%08x", index);
980               if (remove (tmp_basename) != 0)
981                 msg (SE, _("%s: Error removing temporary file: %s."),
982                      tmp_basename, strerror (errno));
983             }
984
985           return 0;
986         }
987
988       if (!heap_insert (huffman_queue, run_no++, total_run_length))
989         {
990           msg (SE, _("Out of memory expanding Huffman priority queue."));
991           return 0;
992         }
993
994       rmax -= order - 1;
995     }
996
997   /* There should be exactly one element in the priority queue after
998      all that merging.  This represents the entire sorted active file.
999      So we could find a total case count by deleting this element from
1000      the queue. */
1001   assert (heap_size (huffman_queue) == 1);
1002
1003   return 1;
1004 }
1005
1006 /* Merges N_RUNS initial runs into a new run.  The jth run for 0 <= j
1007    < N_RUNS is taken from temporary file RUN_INDEX[j]; it is composed
1008    of RUN_LENGTH[j] cases. */
1009 static int
1010 merge_once (int run_index[], int run_length[], int n_runs)
1011 {
1012   /* For each run, the number of records remaining in its buffer. */
1013   int buffered[MAX_MERGE_ORDER];
1014
1015   /* For each run, the index of the next record in the buffer. */
1016   int buffer_ptr[MAX_MERGE_ORDER];
1017
1018   /* Open input files. */
1019   {
1020     int i;
1021
1022     for (i = 0; i < n_runs; i++)
1023       if (run_index[i] != -2 && !open_handle_r (i, run_index[i]))
1024         {
1025           /* Close and remove temporary files. */
1026           while (i--)
1027             {
1028               close_handle (i);
1029               sprintf (tmp_extname, "%08x", i);
1030               if (remove (tmp_basename) != 0)
1031                 msg (SE, _("%s: Error removing temporary file: %s."),
1032                      tmp_basename, strerror (errno));
1033             }
1034
1035           return 0;
1036         }
1037   }
1038
1039   /* Create output file. */
1040   if (!open_handle_w (N_INPUT_BUFFERS, run_no))
1041     {
1042       msg (SE, _("%s: Error creating temporary file for merge: %s."),
1043            tmp_basename, strerror (errno));
1044       goto lossage;
1045     }
1046
1047   /* Prime each buffer. */
1048   {
1049     int i;
1050
1051     for (i = 0; i < n_runs; i++)
1052       if (run_index[i] == -2)
1053         {
1054           n_runs = i;
1055           break;
1056         }
1057       else
1058         {
1059           int j;
1060           int ofs = records_per_buffer * i;
1061
1062           buffered[i] = min (records_per_buffer, run_length[i]);
1063           for (j = 0; j < buffered[i]; j++)
1064             if ((int) fread (x[j + ofs]->record, sizeof (union value),
1065                              dict_get_value_cnt (default_dict), handle[i])
1066                 != dict_get_value_cnt (default_dict))
1067               {
1068                 sprintf (tmp_extname, "%08x", run_index[i]);
1069                 if (ferror (handle[i]))
1070                   msg (SE, _("%s: Error reading temporary file in merge: %s."),
1071                        tmp_basename, strerror (errno));
1072                 else
1073                   msg (SE, _("%s: Unexpected end of temporary file in merge."),
1074                        tmp_basename);
1075                 goto lossage;
1076               }
1077           buffer_ptr[i] = ofs;
1078           run_length[i] -= buffered[i];
1079         }
1080   }
1081
1082   /* Perform the merge proper. */
1083   while (n_runs)                /* Loop while some data is left. */
1084     {
1085       int i;
1086       int min = 0;
1087
1088       for (i = 1; i < n_runs; i++)
1089         if (compare_record (x[buffer_ptr[min]]->record,
1090                             x[buffer_ptr[i]]->record) > 0)
1091           min = i;
1092
1093       if ((int) fwrite (x[buffer_ptr[min]]->record, sizeof (union value),
1094                         dict_get_value_cnt (default_dict),
1095                         handle[N_INPUT_BUFFERS])
1096           != dict_get_value_cnt (default_dict))
1097         {
1098           sprintf (tmp_extname, "%08x", run_index[i]);
1099           msg (SE, _("%s: Error writing temporary file in "
1100                "merge: %s."), tmp_basename, strerror (errno));
1101           goto lossage;
1102         }
1103
1104       /* Remove one case from the buffer for this input file. */
1105       if (--buffered[min] == 0)
1106         {
1107           /* The input buffer is empty.  Do any cases remain in the
1108              initial run on disk? */
1109           if (run_length[min])
1110             {
1111               /* Yes.  Read them in. */
1112
1113               int j;
1114               int ofs;
1115
1116               /* Reset the buffer pointer.  Note that we can't simply
1117                  set it to (i * records_per_buffer) since the run
1118                  order might have changed. */
1119               ofs = buffer_ptr[min] -= buffer_ptr[min] % records_per_buffer;
1120
1121               buffered[min] = min (records_per_buffer, run_length[min]);
1122               for (j = 0; j < buffered[min]; j++)
1123                 if ((int) fread (x[j + ofs]->record, sizeof (union value),
1124                                  dict_get_value_cnt (default_dict),
1125                                  handle[min])
1126                     != dict_get_value_cnt (default_dict))
1127                   {
1128                     sprintf (tmp_extname, "%08x", run_index[min]);
1129                     if (ferror (handle[min]))
1130                       msg (SE, _("%s: Error reading temporary file in "
1131                                  "merge: %s."),
1132                            tmp_basename, strerror (errno));
1133                     else
1134                       msg (SE, _("%s: Unexpected end of temporary file "
1135                                  "in merge."),
1136                            tmp_basename);
1137                     goto lossage;
1138                   }
1139               run_length[min] -= buffered[min];
1140             }
1141           else
1142             {
1143               /* No.  Delete this run. */
1144
1145               /* Close the file. */
1146               FILE *f = handle[min];
1147               handle[min] = NULL;
1148               sprintf (tmp_extname, "%08x", run_index[min]);
1149               if (fclose (f) == EOF)
1150                 msg (SE, _("%s: Error closing temporary file in merge: "
1151                      "%s."), tmp_basename, strerror (errno));
1152
1153               /* Delete the file. */
1154               if (remove (tmp_basename) != 0)
1155                 msg (SE, _("%s: Error removing temporary file in merge: "
1156                      "%s."), tmp_basename, strerror (errno));
1157
1158               n_runs--;
1159               if (min != n_runs)
1160                 {
1161                   /* Since this isn't the last run, we move the last
1162                      run into its spot to force all the runs to be
1163                      contiguous. */
1164                   run_index[min] = run_index[n_runs];
1165                   run_length[min] = run_length[n_runs];
1166                   buffer_ptr[min] = buffer_ptr[n_runs];
1167                   buffered[min] = buffered[n_runs];
1168                   handle[min] = handle[n_runs];
1169                 }
1170             }
1171         }
1172       else
1173         buffer_ptr[min]++;
1174     }
1175
1176   /* Close output file. */
1177   {
1178     FILE *f = handle[N_INPUT_BUFFERS];
1179     handle[N_INPUT_BUFFERS] = NULL;
1180     if (fclose (f) == EOF)
1181       {
1182         sprintf (tmp_extname, "%08x", run_no);
1183         msg (SE, _("%s: Error closing temporary file in merge: "
1184                    "%s."),
1185              tmp_basename, strerror (errno));
1186         return 0;
1187       }
1188   }
1189
1190   return 1;
1191
1192 lossage:
1193   /* Close all the input and output files. */
1194   {
1195     int i;
1196
1197     for (i = 0; i < n_runs; i++)
1198       if (run_length[i] != 0)
1199         {
1200           close_handle (i);
1201           sprintf (tmp_basename, "%08x", run_index[i]);
1202           if (remove (tmp_basename) != 0)
1203             msg (SE, _("%s: Error removing temporary file: %s."),
1204                  tmp_basename, strerror (errno));
1205         }
1206   }
1207   close_handle (N_INPUT_BUFFERS);
1208   sprintf (tmp_basename, "%08x", run_no);
1209   if (remove (tmp_basename) != 0)
1210     msg (SE, _("%s: Error removing temporary file: %s."),
1211          tmp_basename, strerror (errno));
1212   return 0;
1213 }
1214 \f
1215 /* External sort input program. */
1216
1217 /* Reads all the records from the source stream and passes them
1218    to write_case(). */
1219 static void
1220 sort_stream_read (void)
1221 {
1222   read_sort_output (write_case);
1223 }
1224
1225 /* Reads all the records from the output stream and passes them to the
1226    function provided, which must have an interface identical to
1227    write_case(). */
1228 void
1229 read_sort_output (int (*write_case) (void))
1230 {
1231   int i;
1232   FILE *f;
1233
1234   if (separate_case_tab)
1235     {
1236       struct ccase *save_temp_case = temp_case;
1237       struct case_list **p;
1238
1239       for (p = separate_case_tab; *p; p++)
1240         {
1241           temp_case = &(*p)->c;
1242           write_case ();
1243         }
1244       
1245       free (separate_case_tab);
1246       separate_case_tab = NULL;
1247             
1248       temp_case = save_temp_case;
1249     } else {
1250       sprintf (tmp_extname, "%08x", run_no - 1);
1251       f = fopen (tmp_basename, "rb");
1252       if (!f)
1253         {
1254           msg (ME, _("%s: Cannot open sort result file: %s."), tmp_basename,
1255                strerror (errno));
1256           err_failure ();
1257           return;
1258         }
1259
1260       for (i = 0; i < vfm_source_info.ncases; i++)
1261         {
1262           if (!fread (temp_case, vfm_source_info.case_size, 1, f))
1263             {
1264               if (ferror (f))
1265                 msg (ME, _("%s: Error reading sort result file: %s."),
1266                      tmp_basename, strerror (errno));
1267               else
1268                 msg (ME, _("%s: Unexpected end of sort result file: %s."),
1269                      tmp_basename, strerror (errno));
1270               err_failure ();
1271               break;
1272             }
1273
1274           if (!write_case ())
1275             break;
1276         }
1277
1278       if (fclose (f) == EOF)
1279         msg (ME, _("%s: Error closing sort result file: %s."), tmp_basename,
1280              strerror (errno));
1281
1282       if (remove (tmp_basename) != 0)
1283         msg (ME, _("%s: Error removing sort result file: %s."), tmp_basename,
1284              strerror (errno));
1285       else
1286         rmdir_temp_dir ();
1287     }
1288 }
1289
1290 #if 0 /* dead code */
1291 /* Alternate interface to sort_stream_write used for external sorting
1292    when SEPARATE is true. */
1293 static int
1294 write_separate (struct ccase *c)
1295 {
1296   assert (c == temp_case);
1297
1298   sort_stream_write ();
1299   return 1;
1300 }
1301 #endif
1302
1303 /* Performs one iteration of 5.4.1R steps R4, R5, R6, R7, R2, and
1304    R3. */
1305 static void
1306 sort_stream_write (void)
1307 {
1308   struct repl_sel_tree *t;
1309
1310   /* R4. */
1311   memcpy (q->record, temp_case->data, vfm_sink_info.case_size);
1312   if (compare_record (q->record, lastkey) < 0)
1313     if (++rq > rmax)
1314       rmax = rq;
1315
1316   /* R5. */
1317   t = q->fe;
1318
1319   /* R6 and R7. */
1320   for (;;)
1321     {
1322       /* R6. */
1323       if (t->rn < rq
1324           || (t->rn == rq && compare_record (t->loser->record, q->record) < 0))
1325         {
1326           struct repl_sel_tree *temp_tree;
1327           int temp_int;
1328
1329           temp_tree = t->loser;
1330           t->loser = q;
1331           q = temp_tree;
1332
1333           temp_int = t->rn;
1334           t->rn = rq;
1335           rq = temp_int;
1336         }
1337
1338       /* R7. */
1339       if (t == x[1])
1340         break;
1341       t = t->fi;
1342     }
1343
1344   /* R2. */
1345   if (rq != rc)
1346     {
1347       end_run ();
1348       begin_run ();
1349       assert (rq <= rmax);
1350       rc = rq;
1351     }
1352
1353   /* R3. */
1354   if (rq != 0)
1355     {
1356       output_record (q->record);
1357       lastkey = x[x_max]->record;
1358       memcpy (lastkey, q->record, vfm_sink_info.case_size);
1359     }
1360 }
1361
1362 /* Switches mode from sink to source. */
1363 static void
1364 sort_stream_mode (void)
1365 {
1366   /* If this is not done, then we get the following source/sink pairs:
1367      source=memory/disk/DATA LIST/etc., sink=SORT; source=SORT,
1368      sink=SORT; which is not good. */
1369   vfm_sink = NULL;
1370 }
1371
1372 struct case_stream sort_stream =
1373   {
1374     NULL,
1375     sort_stream_read,
1376     sort_stream_write,
1377     sort_stream_mode,
1378     NULL,
1379     NULL,
1380     "SORT",
1381   };