Make the expression code a little nicer and fix bugs found
[pspp-builds.git] / src / sort.c
1 /* PSPP - computes sample statistics.
2    Copyright (C) 1997-9, 2000 Free Software Foundation, Inc.
3    Written by Ben Pfaff <blp@gnu.org>.
4
5    This program is free software; you can redistribute it and/or
6    modify it under the terms of the GNU General Public License as
7    published by the Free Software Foundation; either version 2 of the
8    License, or (at your option) any later version.
9
10    This program is distributed in the hope that it will be useful, but
11    WITHOUT ANY WARRANTY; without even the implied warranty of
12    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13    General Public License for more details.
14
15    You should have received a copy of the GNU General Public License
16    along with this program; if not, write to the Free Software
17    Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA
18    02111-1307, USA. */
19
20 #include <config.h>
21 #include "sort.h"
22 #include "error.h"
23 #include <stdio.h>
24 #include <stdlib.h>
25 #include <errno.h>
26 #include "algorithm.h"
27 #include "alloc.h"
28 #include "command.h"
29 #include "error.h"
30 #include "expr.h"
31 #include "lexer.h"
32 #include "misc.h"
33 #include "settings.h"
34 #include "str.h"
35 #include "var.h"
36 #include "vfm.h"
37 #include "vfmP.h"
38
39 #if HAVE_UNISTD_H
40 #include <unistd.h>
41 #endif
42
43 #if HAVE_SYS_TYPES_H
44 #include <sys/types.h>
45 #endif
46
47 #if HAVE_SYS_STAT_H
48 #include <sys/stat.h>
49 #endif
50
51 #include "debug-print.h"
52
53 /* Other prototypes. */
54 static int compare_record (const union value *, const union value *,
55                            const struct sort_cases_pgm *, int *idx_to_fv);
56 static int compare_case_lists (const void *, const void *, void *);
57 static struct internal_sort *do_internal_sort (struct sort_cases_pgm *,
58                                                int separate);
59 static void destroy_internal_sort (struct internal_sort *);
60 static struct external_sort *do_external_sort (struct sort_cases_pgm *,
61                                                int separate);
62 static void destroy_external_sort (struct external_sort *);
63 struct sort_cases_pgm *parse_sort (void);
64
65 /* Performs the SORT CASES procedures. */
66 int
67 cmd_sort_cases (void)
68 {
69   struct sort_cases_pgm *scp;
70   int success;
71
72   lex_match (T_BY);
73
74   scp = parse_sort ();
75   if (scp == NULL)
76     return CMD_FAILURE;
77
78   if (temporary != 0)
79     {
80       msg (SE, _("SORT CASES may not be used after TEMPORARY.  "
81                  "Temporary transformations will be made permanent."));
82       cancel_temporary (); 
83     }
84
85   success = sort_cases (scp, 0);
86   destroy_sort_cases_pgm (scp);
87   if (success)
88     return lex_end_of_command ();
89   else
90     return CMD_FAILURE;
91 }
92
93 /* Parses a list of sort keys and returns a struct sort_cases_pgm
94    based on it.  Returns a null pointer on error. */
95 struct sort_cases_pgm *
96 parse_sort (void)
97 {
98   struct sort_cases_pgm *scp;
99
100   scp = xmalloc (sizeof *scp);
101   scp->ref_cnt = 1;
102   scp->vars = NULL;
103   scp->dirs = NULL;
104   scp->var_cnt = 0;
105   scp->isrt = NULL;
106   scp->xsrt = NULL;
107
108   do
109     {
110       int prev_var_cnt = scp->var_cnt;
111       enum sort_direction direction = SRT_ASCEND;
112
113       /* Variables. */
114       if (!parse_variables (default_dict, &scp->vars, &scp->var_cnt,
115                             PV_NO_DUPLICATE | PV_APPEND | PV_NO_SCRATCH))
116         goto error;
117
118       /* Sort direction. */
119       if (lex_match ('('))
120         {
121           if (lex_match_id ("D") || lex_match_id ("DOWN"))
122             direction = SRT_DESCEND;
123           else if (!lex_match_id ("A") && !lex_match_id ("UP"))
124             {
125               msg (SE, _("`A' or `D' expected inside parentheses."));
126               goto error;
127             }
128           if (!lex_match (')'))
129             {
130               msg (SE, _("`)' expected."));
131               goto error;
132             }
133         }
134       scp->dirs = xrealloc (scp->dirs, sizeof *scp->dirs * scp->var_cnt);
135       for (; prev_var_cnt < scp->var_cnt; prev_var_cnt++)
136         scp->dirs[prev_var_cnt] = direction;
137     }
138   while (token != '.' && token != '/');
139   
140   return scp;
141
142  error:
143   destroy_sort_cases_pgm (scp);
144   return NULL;
145 }
146
147 /* Destroys a SORT CASES program. */
148 void
149 destroy_sort_cases_pgm (struct sort_cases_pgm *scp) 
150 {
151   if (scp != NULL) 
152     {
153       assert (scp->ref_cnt > 0);
154       if (--scp->ref_cnt > 0)
155         return;
156
157       free (scp->vars);
158       free (scp->dirs);
159       destroy_internal_sort (scp->isrt);
160       destroy_external_sort (scp->xsrt);
161       free (scp);
162     }
163 }
164
165 /* Sorts the active file based on the key variables specified in
166    global variables vars and var_cnt.  The output is either to the
167    active file, if SEPARATE is zero, or to a separate file, if
168    SEPARATE is nonzero.  In the latter case the output cases can be
169    read with a call to read_sort_output().  (In the former case the
170    output cases should be dealt with through the usual vfm interface.)
171
172    The caller is responsible for freeing vars[]. */
173 int
174 sort_cases (struct sort_cases_pgm *scp, int separate)
175 {
176   scp->case_size
177     = sizeof (union value) * dict_get_compacted_value_cnt (default_dict);
178
179   /* Not sure this is necessary but it's good to be safe. */
180   if (separate && case_source_is_class (vfm_source, &sort_source_class))
181     procedure (NULL, NULL);
182   
183   /* SORT CASES cancels PROCESS IF. */
184   expr_free (process_if_expr);
185   process_if_expr = NULL;
186
187   /* Try an internal sort first. */
188   scp->isrt = do_internal_sort (scp, separate);
189   if (scp->isrt != NULL) 
190     return 1; 
191
192   /* Fall back to an external sort. */
193   if (vfm_source != NULL
194       && case_source_is_class (vfm_source, &storage_source_class))
195     storage_source_to_disk (vfm_source);
196   scp->xsrt = do_external_sort (scp, separate);
197   if (scp->xsrt != NULL) 
198     return 1;
199
200   destroy_sort_cases_pgm (scp);
201   return 0;
202 }
203 \f
204 /* Results of an internal sort. */
205 struct internal_sort 
206   {
207     struct case_list **results;
208   };
209
210 /* If the data is in memory, do an internal sort.  Return
211    success. */
212 static struct internal_sort *
213 do_internal_sort (struct sort_cases_pgm *scp, int separate)
214 {
215   struct internal_sort *isrt;
216
217   isrt = xmalloc (sizeof *isrt);
218   isrt->results = NULL;
219
220   if (case_source_is_class (vfm_source, &storage_source_class)
221       && !storage_source_on_disk (vfm_source))
222     {
223       struct case_list *case_list;
224       struct case_list **case_array;
225       int case_cnt;
226       int i;
227
228       case_cnt = vfm_source->class->count (vfm_source);
229       if (case_cnt <= 0)
230         return isrt;
231
232       if (case_cnt > get_max_workspace() / sizeof *case_array)
233         goto error;
234
235       case_list = storage_source_get_cases (vfm_source);
236       case_array = malloc (sizeof *case_array * (case_cnt + 1));
237       if (case_array == NULL)
238         goto error;
239
240       for (i = 0; case_list != NULL; i++) 
241         {
242           case_array[i] = case_list;
243           case_list = case_list->next;
244         }
245       assert (i == case_cnt);
246       case_array[case_cnt] = NULL;
247
248       sort (case_array, case_cnt, sizeof *case_array,
249             compare_case_lists, scp);
250
251       if (!separate) 
252         {
253           storage_source_set_cases (vfm_source, case_array[0]);
254           for (i = 1; i <= case_cnt; i++)
255             case_array[i - 1]->next = case_array[i]; 
256           free (case_array);
257         }
258       else 
259         isrt->results = case_array;
260                       
261       return isrt;
262     }
263
264  error:
265   free (isrt);
266   return NULL;
267 }
268
269 /* Destroys an internal sort result. */
270 static void
271 destroy_internal_sort (struct internal_sort *isrt) 
272 {
273   if (isrt != NULL) 
274     {
275       free (isrt->results);
276       free (isrt);
277     }
278 }
279
280 /* Compares the VAR_CNT variables in VARS[] between the
281    `case_list's at A and B, and returns a strcmp()-type
282    result. */
283 static int
284 compare_case_lists (const void *a_, const void *b_, void *scp_)
285 {
286   struct sort_cases_pgm *scp = scp_;
287   struct case_list *const *pa = a_;
288   struct case_list *const *pb = b_;
289   struct case_list *a = *pa;
290   struct case_list *b = *pb;
291
292   return compare_record (a->c.data, b->c.data, scp, NULL);
293 }
294 \f
295 /* External sort. */
296
297 /* Maximum order of merge.  If you increase this, then you should
298    use a heap for comparing cases during merge.  */
299 #define MAX_MERGE_ORDER         7
300
301 /* Minimum total number of records for buffers. */
302 #define MIN_BUFFER_TOTAL_SIZE_RECS      64
303
304 /* Minimum single input buffer size, in bytes and records. */
305 #define MIN_BUFFER_SIZE_BYTES   4096
306 #define MIN_BUFFER_SIZE_RECS    16
307
308 #if MIN_BUFFER_SIZE_RECS * 2 + 16 > MIN_BUFFER_TOTAL_SIZE_RECS
309 #error MIN_BUFFER_SIZE_RECS and MIN_BUFFER_TOTAL_SIZE_RECS do not make sense.
310 #endif
311
312 /* An initial run and its length. */
313 struct initial_run 
314   {
315     int file_idx;                     /* File index. */
316     size_t case_cnt;                  /* Number of cases. */
317   };
318
319 /* Sorts initial runs A and B in decending order by length. */
320 static int
321 compare_initial_runs (const void *a_, const void *b_, void *aux UNUSED) 
322 {
323   const struct initial_run *a = a_;
324   const struct initial_run *b = b_;
325   
326   return a->case_cnt > b->case_cnt ? -1 : a->case_cnt <b->case_cnt;
327 }
328
329 /* Results of an external sort. */
330 struct external_sort 
331   {
332     struct sort_cases_pgm *scp;       /* SORT CASES info. */
333     struct initial_run *initial_runs; /* Array of initial runs. */
334     size_t run_cnt, run_cap;          /* Number of runs, allocated capacity. */
335     char *temp_dir;                   /* Temporary file directory name. */
336     char *temp_name;                  /* Name of a temporary file. */
337     int next_file_idx;                /* Lowest unused file index. */
338   };
339
340 /* Prototypes for helper functions. */
341 static void sort_sink_write (struct case_sink *, const struct ccase *);
342 static int write_initial_runs (struct external_sort *, int separate);
343 static int init_external_sort (struct external_sort *);
344 static int merge (struct external_sort *);
345 static void rmdir_temp_dir (struct external_sort *);
346 static void remove_temp_file (struct external_sort *xsrt, int file_idx);
347
348 /* Performs an external sort of the active file according to the
349    specification in SCP.  Forms initial runs using a heap as a
350    reservoir.  Determines the optimum merge pattern via Huffman's
351    method (see Knuth vol. 3, 2nd edition, p. 365-366), and merges
352    according to that pattern. */
353 static struct external_sort *
354 do_external_sort (struct sort_cases_pgm *scp, int separate)
355 {
356   struct external_sort *xsrt;
357   int success = 0;
358
359   xsrt = xmalloc (sizeof *xsrt);
360   xsrt->scp = scp;
361   if (!init_external_sort (xsrt))
362     goto done;
363   if (!write_initial_runs (xsrt, separate))
364     goto done;
365   if (!merge (xsrt))
366     goto done;
367
368   success = 1;
369
370  done:
371   if (success)
372     {
373       /* Don't destroy anything because we'll need it for reading
374          the output. */
375       return xsrt;
376     }
377   else
378     {
379       destroy_external_sort (xsrt);
380       return NULL;
381     }
382 }
383
384 /* Destroys XSRT. */
385 static void
386 destroy_external_sort (struct external_sort *xsrt) 
387 {
388   if (xsrt != NULL) 
389     {
390       int i;
391       
392       for (i = 0; i < xsrt->run_cnt; i++)
393         remove_temp_file (xsrt, xsrt->initial_runs[i].file_idx);
394       rmdir_temp_dir (xsrt);
395       free (xsrt->temp_dir);
396       free (xsrt->temp_name);
397       free (xsrt->initial_runs);
398       free (xsrt);
399     }
400 }
401
402 #ifdef HAVE_MKDTEMP
403 /* Creates and returns the name of a temporary directory. */
404 static char *
405 make_temp_dir (void) 
406 {
407   const char *parent_dir;
408   char *temp_dir;
409
410   if (getenv ("TMPDIR") != NULL)
411     parent_dir = getenv ("TMPDIR");
412   else
413     parent_dir = P_tmpdir;
414
415   temp_dir = xmalloc (strlen (parent_dir) + 32);
416   sprintf (temp_dir, "%s%cpsppXXXXXX", parent_dir, DIR_SEPARATOR);
417   if (mkdtemp (temp_dir) == NULL) 
418     {
419       msg (SE, _("%s: Creating temporary directory: %s."),
420            temp_dir, strerror (errno));
421       free (temp_dir);
422       return NULL;
423     }
424   else
425     return temp_dir;
426 }
427 #else /* !HAVE_MKDTEMP */
428 /* Creates directory DIR. */
429 static int
430 do_mkdir (const char *dir) 
431 {
432 #ifndef __MSDOS__
433   return mkdir (dir, S_IRWXU);
434 #else
435   return mkdir (dir);
436 #endif
437 }
438
439 /* Creates and returns the name of a temporary directory. */
440 static char *
441 make_temp_dir (void) 
442 {
443   int i;
444   
445   for (i = 0; i < 100; i++)
446     {
447       char temp_dir[L_tmpnam + 1];
448       if (tmpnam (temp_dir) == NULL) 
449         {
450           msg (SE, _("Generating temporary directory name failed: %s."),
451                strerror (errno));
452           return NULL; 
453         }
454       else if (do_mkdir (temp_dir) == 0)
455         return xstrdup (temp_dir);
456     }
457   
458   msg (SE, _("Creating temporary directory failed: %s."), strerror (errno));
459   return NULL;
460 }
461 #endif /* !HAVE_MKDTEMP */
462
463 /* Sets up to open temporary files. */
464 static int
465 init_external_sort (struct external_sort *xsrt)
466 {
467   /* Zero. */
468   xsrt->temp_dir = NULL;
469   xsrt->next_file_idx = 0;
470
471   /* Huffman queue. */
472   xsrt->run_cap = 512;
473   xsrt->run_cnt = 0;
474   xsrt->initial_runs = xmalloc (sizeof *xsrt->initial_runs * xsrt->run_cap);
475
476   /* Temporary directory. */
477   xsrt->temp_dir = make_temp_dir ();
478   xsrt->temp_name = NULL;
479   if (xsrt->temp_dir == NULL)
480     return 0;
481   xsrt->temp_name = xmalloc (strlen (xsrt->temp_dir) + 64);
482
483   return 1;
484 }
485
486 /* Returns nonzero if we should return an error even though the
487    operation succeeded.  Useful for testing. */
488 static int
489 simulate_error (void) 
490 {
491   static int op_err_cnt = -1;
492   static int op_cnt;
493   
494   if (op_err_cnt == -1 || op_cnt++ < op_err_cnt)
495     return 0;
496   else
497     {
498       errno = 0;
499       return 1;
500     }
501 }
502
503 /* Removes the directory created for temporary files, if one
504    exists. */
505 static void
506 rmdir_temp_dir (struct external_sort *xsrt)
507 {
508   if (xsrt->temp_dir != NULL && rmdir (xsrt->temp_dir) == -1) 
509     {
510       msg (SW, _("%s: Error removing directory for temporary files: %s."),
511            xsrt->temp_dir, strerror (errno));
512       xsrt->temp_dir = NULL; 
513     }
514 }
515
516 /* Returns the name of temporary file number FILE_IDX for XSRT.
517    The name is written into a static buffer, so be careful.  */
518 static char *
519 get_temp_file_name (struct external_sort *xsrt, int file_idx)
520 {
521   assert (xsrt->temp_dir != NULL);
522   sprintf (xsrt->temp_name, "%s%c%04d",
523            xsrt->temp_dir, DIR_SEPARATOR, file_idx);
524   return xsrt->temp_name;
525 }
526
527 /* Opens temporary file numbered FILE_IDX for XSRT with mode MODE
528    and returns the FILE *. */
529 static FILE *
530 open_temp_file (struct external_sort *xsrt, int file_idx, const char *mode)
531 {
532   char *temp_file;
533   FILE *file;
534
535   temp_file = get_temp_file_name (xsrt, file_idx);
536
537   file = fopen (temp_file, mode);
538   if (simulate_error () || file == NULL) 
539     msg (SE, _("%s: Error opening temporary file for %s: %s."),
540          temp_file, mode[0] == 'r' ? "reading" : "writing",
541          strerror (errno));
542
543   return file;
544 }
545
546 /* Closes FILE, which is the temporary file numbered FILE_IDX
547    under XSRT.  Returns nonzero only if successful.  */
548 static int
549 close_temp_file (struct external_sort *xsrt, int file_idx, FILE *file)
550 {
551   if (file != NULL) 
552     {
553       char *temp_file = get_temp_file_name (xsrt, file_idx);
554       if (simulate_error () || fclose (file) == EOF) 
555         {
556           msg (SE, _("%s: Error closing temporary file: %s."),
557                temp_file, strerror (errno));
558           return 0;
559         }
560     }
561   return 1;
562 }
563
564 /* Delete temporary file numbered FILE_IDX for XSRT. */
565 static void
566 remove_temp_file (struct external_sort *xsrt, int file_idx) 
567 {
568   if (file_idx != -1)
569     {
570       char *temp_file = get_temp_file_name (xsrt, file_idx);
571       if (simulate_error () || remove (temp_file) != 0)
572         msg (SW, _("%s: Error removing temporary file: %s."),
573              temp_file, strerror (errno));
574     }
575 }
576
577 /* Writes SIZE bytes from buffer DATA into FILE, which is
578    temporary file numbered FILE_IDX from XSRT. */
579 static int
580 write_temp_file (struct external_sort *xsrt, int file_idx,
581                  FILE *file, const void *data, size_t size) 
582 {
583   if (!simulate_error () && fwrite (data, size, 1, file) == 1)
584     return 1;
585   else
586     {
587       char *temp_file = get_temp_file_name (xsrt, file_idx);
588       msg (SE, _("%s: Error writing temporary file: %s."),
589            temp_file, strerror (errno));
590       return 0;
591     }
592 }
593
594 /* Reads SIZE bytes into buffer DATA into FILE, which is
595    temporary file numbered FILE_IDX from XSRT. */
596 static int
597 read_temp_file (struct external_sort *xsrt, int file_idx,
598                 FILE *file, void *data, size_t size) 
599 {
600   if (!simulate_error () && fread (data, size, 1, file) == 1)
601     return 1;
602   else 
603     {
604       char *temp_file = get_temp_file_name (xsrt, file_idx);
605       if (ferror (file))
606         msg (SE, _("%s: Error reading temporary file: %s."),
607              temp_file, strerror (errno));
608       else
609         msg (SE, _("%s: Unexpected end of temporary file."),
610              temp_file);
611       return 0;
612     }
613 }
614 \f
615 /* Replacement selection. */
616
617 /* Pairs a record with a run number. */
618 struct record_run
619   {
620     int run;                    /* Run number of case. */
621     struct case_list *record;   /* Case data. */
622   };
623
624 /* Represents a set of initial runs during an external sort. */
625 struct initial_run_state 
626   {
627     struct external_sort *xsrt;
628
629     int *idx_to_fv;             /* Translation table copied from sink. */
630
631     /* Reservoir. */
632     struct record_run *records; /* Records arranged as a heap. */
633     size_t record_cnt;          /* Current number of records. */
634     size_t record_cap;          /* Capacity for records. */
635     struct case_list *free_list;/* Cases not in heap. */
636     
637     /* Run currently being output. */
638     int file_idx;               /* Temporary file number. */
639     size_t case_cnt;            /* Number of cases so far. */
640     FILE *output_file;          /* Output file. */
641     struct case_list *last_output;/* Record last output. */
642
643     int okay;                   /* Zero if an error has been encountered. */
644   };
645
646 static const struct case_sink_class sort_sink_class;
647
648 static void destroy_initial_run_state (struct initial_run_state *irs);
649 static int allocate_cases (struct initial_run_state *);
650 static struct case_list *grab_case (struct initial_run_state *);
651 static void release_case (struct initial_run_state *, struct case_list *);
652 static void output_record (struct initial_run_state *irs);
653 static void start_run (struct initial_run_state *irs);
654 static void end_run (struct initial_run_state *irs);
655 static int compare_record_run (const struct record_run *,
656                                const struct record_run *,
657                                struct initial_run_state *);
658 static int compare_record_run_minheap (const void *, const void *, void *);
659
660 /* Writes initial runs for XSRT, sending them to a separate file
661    if SEPARATE is nonzero. */
662 static int
663 write_initial_runs (struct external_sort *xsrt, int separate)
664 {
665   struct initial_run_state *irs;
666   int success = 0;
667
668   /* Allocate memory for cases. */
669   irs = xmalloc (sizeof *irs);
670   irs->xsrt = xsrt;
671   irs->records = NULL;
672   irs->record_cnt = irs->record_cap = 0;
673   irs->free_list = NULL;
674   irs->output_file = NULL;
675   irs->last_output = NULL;
676   irs->file_idx = 0;
677   irs->case_cnt = 0;
678   irs->okay = 1;
679   if (!allocate_cases (irs)) 
680     goto done;
681
682   /* Create case sink. */
683   if (!separate)
684     {
685       if (vfm_sink != NULL && vfm_sink->class->destroy != NULL)
686         vfm_sink->class->destroy (vfm_sink);
687       vfm_sink = create_case_sink (&sort_sink_class, default_dict, irs);
688       xsrt->scp->ref_cnt++;
689     }
690
691   /* Create initial runs. */
692   start_run (irs);
693   procedure (NULL, NULL);
694   irs->idx_to_fv = NULL;
695   while (irs->record_cnt > 0 && irs->okay)
696     output_record (irs);
697   end_run (irs);
698
699   success = irs->okay;
700
701  done:
702   destroy_initial_run_state (irs);
703
704   return success;
705 }
706
707 /* Add a single case to an initial run. */
708 static void
709 sort_sink_write (struct case_sink *sink, const struct ccase *c)
710 {
711   struct initial_run_state *irs = sink->aux;
712   struct record_run *new_record_run;
713
714   if (!irs->okay)
715     return;
716
717   irs->idx_to_fv = sink->idx_to_fv;
718
719   /* Compose record_run for this run and add to heap. */
720   assert (irs->record_cnt < irs->record_cap);
721   new_record_run = irs->records + irs->record_cnt++;
722   new_record_run->record = grab_case (irs);
723   memcpy (new_record_run->record->c.data, c->data, irs->xsrt->scp->case_size);
724   new_record_run->run = irs->file_idx;
725   if (irs->last_output != NULL
726       && compare_record (c->data, irs->last_output->c.data,
727                          irs->xsrt->scp, sink->idx_to_fv) < 0)
728     new_record_run->run = irs->xsrt->next_file_idx;
729   push_heap (irs->records, irs->record_cnt, sizeof *irs->records,
730              compare_record_run_minheap, irs);
731
732   /* Output a record if the reservoir is full. */
733   if (irs->record_cnt == irs->record_cap && irs->okay)
734     output_record (irs);
735 }
736
737 /* Destroys the initial run state represented by IRS. */
738 static void
739 destroy_initial_run_state (struct initial_run_state *irs) 
740 {
741   struct case_list *iter, *next;
742   int i;
743
744   if (irs == NULL)
745     return;
746
747   /* Release cases to free list. */
748   for (i = 0; i < irs->record_cnt; i++)
749     release_case (irs, irs->records[i].record);
750   if (irs->last_output != NULL)
751     release_case (irs, irs->last_output);
752
753   /* Free cases in free list. */
754   for (iter = irs->free_list; iter != NULL; iter = next) 
755     {
756       next = iter->next;
757       free (iter);
758     }
759
760   free (irs->records);
761   if (irs->output_file != NULL)
762     close_temp_file (irs->xsrt, irs->file_idx, irs->output_file);
763
764   free (irs);
765 }
766
767 /* Allocates room for lots of cases as a buffer. */
768 static int
769 allocate_cases (struct initial_run_state *irs)
770 {
771   int approx_case_cost; /* Approximate memory cost of one case in bytes. */
772   int max_cases;        /* Maximum number of cases to allocate. */
773   int i;
774
775   /* Allocate as many cases as we can within the workspace
776      limit. */
777   approx_case_cost = (sizeof *irs->records
778                       + sizeof *irs->free_list
779                       + irs->xsrt->scp->case_size
780                       + 4 * sizeof (void *));
781   max_cases = get_max_workspace() / approx_case_cost;
782   irs->records = malloc (sizeof *irs->records * max_cases);
783   for (i = 0; i < max_cases; i++)
784     {
785       struct case_list *c;
786       c = malloc (sizeof *c
787                   + irs->xsrt->scp->case_size
788                   - sizeof (union value));
789       if (c == NULL) 
790         {
791           max_cases = i;
792           break;
793         }
794       release_case (irs, c);
795     }
796
797   /* irs->records gets all but one of the allocated cases.
798      The extra is used for last_output. */
799   irs->record_cap = max_cases - 1;
800
801   /* Fail if we didn't allocate an acceptable number of cases. */
802   if (irs->records == NULL || max_cases < MIN_BUFFER_TOTAL_SIZE_RECS)
803     {
804       msg (SE, _("Out of memory.  Could not allocate room for minimum of %d "
805                  "cases of %d bytes each.  (PSPP workspace is currently "
806                  "restricted to a maximum of %d KB.)"),
807            MIN_BUFFER_TOTAL_SIZE_RECS, approx_case_cost, get_max_workspace() / 1024);
808       return 0;
809     }
810   return 1;
811 }
812
813 /* Compares the VAR_CNT variables in VARS[] between the `value's at
814    A and B, and returns a strcmp()-type result. */
815 static int
816 compare_record (const union value *a, const union value *b,
817                 const struct sort_cases_pgm *scp,
818                 int *idx_to_fv)
819 {
820   int i;
821
822   assert (a != NULL);
823   assert (b != NULL);
824   
825   for (i = 0; i < scp->var_cnt; i++)
826     {
827       struct variable *v = scp->vars[i];
828       int fv;
829       int result;
830
831       if (idx_to_fv != NULL)
832         fv = idx_to_fv[v->index];
833       else
834         fv = v->fv;
835       
836       if (v->type == NUMERIC)
837         {
838           double af = a[fv].f;
839           double bf = b[fv].f;
840           
841           result = af < bf ? -1 : af > bf;
842         }
843       else
844         result = memcmp (a[fv].s, b[fv].s, v->width);
845
846       if (result != 0) 
847         {
848           if (scp->dirs[i] == SRT_DESCEND)
849             result = -result;
850           return result;
851         }
852     }
853
854   return 0;
855 }
856
857 /* Compares record-run tuples A and B on run number first, then
858    on the current record according to SCP. */
859 static int
860 compare_record_run (const struct record_run *a,
861                     const struct record_run *b,
862                     struct initial_run_state *irs)
863 {
864   if (a->run != b->run)
865     return a->run > b->run ? 1 : -1;
866   else
867     return compare_record (a->record->c.data, b->record->c.data,
868                            irs->xsrt->scp, irs->idx_to_fv);
869 }
870
871 /* Compares record-run tuples A and B on run number first, then
872    on the current record according to SCP, but in descending
873    order. */
874 static int
875 compare_record_run_minheap (const void *a, const void *b, void *irs) 
876 {
877   return -compare_record_run (a, b, irs);
878 }
879
880 /* Begins a new initial run, specifically its output file. */
881 static void
882 start_run (struct initial_run_state *irs)
883 {
884   irs->file_idx = irs->xsrt->next_file_idx++;
885   irs->case_cnt = 0;
886   irs->output_file = open_temp_file (irs->xsrt, irs->file_idx, "wb");
887   if (irs->output_file == NULL) 
888     irs->okay = 0;
889   if (irs->last_output != NULL) 
890     {
891       release_case (irs, irs->last_output);
892       irs->last_output = NULL; 
893     }
894 }
895
896 /* Ends the current initial run.  */
897 static void
898 end_run (struct initial_run_state *irs)
899 {
900   struct external_sort *xsrt = irs->xsrt;
901   
902   /* Record initial run. */
903   if (xsrt->run_cnt >= xsrt->run_cap) 
904     {
905       xsrt->run_cap *= 2;
906       xsrt->initial_runs
907         = xrealloc (xsrt->initial_runs,
908                     sizeof *xsrt->initial_runs * xsrt->run_cap);
909     }
910   xsrt->initial_runs[xsrt->run_cnt].file_idx = irs->file_idx;
911   xsrt->initial_runs[xsrt->run_cnt].case_cnt = irs->case_cnt;
912   xsrt->run_cnt++;
913
914   /* Close file handle. */
915   if (irs->output_file != NULL
916       && !close_temp_file (irs->xsrt, irs->file_idx, irs->output_file)) 
917     irs->okay = 0;
918   irs->output_file = NULL;
919 }
920
921 /* Writes a record to the current initial run. */
922 static void
923 output_record (struct initial_run_state *irs)
924 {
925   struct record_run *record_run;
926   
927   /* Extract minimum case from heap. */
928   assert (irs->record_cnt > 0);
929   pop_heap (irs->records, irs->record_cnt--, sizeof *irs->records,
930             compare_record_run_minheap, irs);
931   record_run = irs->records + irs->record_cnt;
932
933   /* Bail if an error has occurred. */
934   if (!irs->okay)
935     return;
936
937   /* Start new run if necessary. */
938   assert (record_run->run == irs->file_idx
939           || record_run->run == irs->xsrt->next_file_idx);
940   if (record_run->run != irs->file_idx)
941     {
942       end_run (irs);
943       start_run (irs);
944     }
945   assert (record_run->run == irs->file_idx);
946   irs->case_cnt++;
947
948   /* Write to disk. */
949   if (irs->output_file != NULL
950       && !write_temp_file (irs->xsrt, irs->file_idx, irs->output_file,
951                            &record_run->record->c, irs->xsrt->scp->case_size))
952     irs->okay = 0;
953
954   /* This record becomes last_output. */
955   if (irs->last_output != NULL)
956     release_case (irs, irs->last_output);
957   irs->last_output = record_run->record;
958 }
959
960 /* Gets a case from the free list in IRS.  It is an error to call
961    this function if the free list is empty. */
962 static struct case_list *
963 grab_case (struct initial_run_state *irs)
964 {
965   struct case_list *c;
966   
967   assert (irs != NULL);
968   assert (irs->free_list != NULL);
969
970   c = irs->free_list;
971   irs->free_list = c->next;
972   return c;
973 }
974
975 /* Returns C to the free list in IRS. */
976 static void 
977 release_case (struct initial_run_state *irs, struct case_list *c) 
978 {
979   assert (irs != NULL);
980   assert (c != NULL);
981
982   c->next = irs->free_list;
983   irs->free_list = c;
984 }
985 \f
986 /* Merging. */
987
988 /* State of merging initial runs. */
989 struct merge_state 
990   {
991     struct external_sort *xsrt; /* External sort state. */
992     struct ccase **cases;       /* Buffers. */
993     size_t case_cnt;            /* Number of buffers. */
994   };
995
996 struct run;
997 static int merge_once (struct merge_state *,
998                        const struct initial_run[], size_t,
999                        struct initial_run *);
1000 static int fill_run_buffer (struct merge_state *, struct run *);
1001 static int mod (int, int);
1002
1003 /* Performs a series of P-way merges of initial runs
1004    method. */
1005 static int
1006 merge (struct external_sort *xsrt)
1007 {
1008   struct merge_state mrg;       /* State of merge. */
1009   size_t approx_case_cost;      /* Approximate memory cost of one case. */
1010   int max_order;                /* Maximum order of merge. */
1011   size_t dummy_run_cnt;         /* Number of dummy runs to insert. */
1012   int success = 0;
1013   int i;
1014
1015   mrg.xsrt = xsrt;
1016
1017   /* Allocate as many cases as possible into cases. */
1018   approx_case_cost = (sizeof *mrg.cases
1019                       + xsrt->scp->case_size + 4 * sizeof (void *));
1020   mrg.case_cnt = get_max_workspace() / approx_case_cost;
1021   mrg.cases = malloc (sizeof *mrg.cases * mrg.case_cnt);
1022   if (mrg.cases == NULL)
1023     goto done;
1024   for (i = 0; i < mrg.case_cnt; i++) 
1025     {
1026       mrg.cases[i] = malloc (xsrt->scp->case_size);
1027       if (mrg.cases[i] == NULL) 
1028         {
1029           mrg.case_cnt = i;
1030           break;
1031         }
1032     }
1033   if (mrg.case_cnt < MIN_BUFFER_TOTAL_SIZE_RECS)
1034     {
1035       msg (SE, _("Out of memory.  Could not allocate room for minimum of %d "
1036                  "cases of %d bytes each.  (PSPP workspace is currently "
1037                  "restricted to a maximum of %d KB.)"),
1038            MIN_BUFFER_TOTAL_SIZE_RECS, approx_case_cost, get_max_workspace() / 1024);
1039       return 0;
1040     }
1041
1042   /* Determine maximum order of merge. */
1043   max_order = MAX_MERGE_ORDER;
1044   if (mrg.case_cnt / max_order < MIN_BUFFER_SIZE_RECS)
1045     max_order = mrg.case_cnt / MIN_BUFFER_SIZE_RECS;
1046   else if (mrg.case_cnt / max_order * xsrt->scp->case_size
1047            < MIN_BUFFER_SIZE_BYTES)
1048     max_order = mrg.case_cnt / (MIN_BUFFER_SIZE_BYTES / xsrt->scp->case_size);
1049   if (max_order < 2)
1050     max_order = 2;
1051   if (max_order > xsrt->run_cnt)
1052     max_order = xsrt->run_cnt;
1053
1054   /* Repeatedly merge the P shortest existing runs until only one run
1055      is left. */
1056   make_heap (xsrt->initial_runs, xsrt->run_cnt, sizeof *xsrt->initial_runs,
1057              compare_initial_runs, NULL);
1058   dummy_run_cnt = mod (1 - (int) xsrt->run_cnt, max_order - 1);
1059   assert (max_order == 1
1060           || (xsrt->run_cnt + dummy_run_cnt) % (max_order - 1) == 1);
1061   while (xsrt->run_cnt > 1)
1062     {
1063       struct initial_run output_run;
1064       int order;
1065       int i;
1066
1067       /* Choose order of merge (max_order after first merge). */
1068       order = max_order - dummy_run_cnt;
1069       dummy_run_cnt = 0;
1070
1071       /* Choose runs to merge. */
1072       assert (xsrt->run_cnt >= order);
1073       for (i = 0; i < order; i++) 
1074         pop_heap (xsrt->initial_runs, xsrt->run_cnt--,
1075                   sizeof *xsrt->initial_runs,
1076                   compare_initial_runs, NULL); 
1077           
1078       /* Merge runs. */
1079       if (!merge_once (&mrg, xsrt->initial_runs + xsrt->run_cnt, order,
1080                        &output_run))
1081         goto done;
1082
1083       /* Add output run to heap. */
1084       xsrt->initial_runs[xsrt->run_cnt++] = output_run;
1085       push_heap (xsrt->initial_runs, xsrt->run_cnt, sizeof *xsrt->initial_runs,
1086                  compare_initial_runs, NULL);
1087     }
1088
1089   /* Exactly one run is left, which contains the entire sorted
1090      file.  We could use it to find a total case count. */
1091   assert (xsrt->run_cnt == 1);
1092
1093   success = 1;
1094
1095  done:
1096   for (i = 0; i < mrg.case_cnt; i++)
1097     free (mrg.cases[i]);
1098   free (mrg.cases);
1099
1100   return success;
1101 }
1102
1103 /* Modulo function as defined by Knuth. */
1104 static int
1105 mod (int x, int y)
1106 {
1107   if (y == 0)
1108     return x;
1109   else if (x == 0)
1110     return 0;
1111   else if (x > 0 && y > 0)
1112     return x % y;
1113   else if (x < 0 && y > 0)
1114     return y - (-x) % y;
1115
1116   assert (0);
1117   abort ();
1118 }
1119
1120 /* A run of data for use in merging. */
1121 struct run 
1122   {
1123     FILE *file;                 /* File that contains run. */
1124     int file_idx;               /* Index of file that contains run. */
1125     struct ccase **buffer;      /* Case buffer. */
1126     struct ccase **buffer_head; /* First unconsumed case in buffer. */
1127     struct ccase **buffer_tail; /* One past last unconsumed case in buffer. */
1128     size_t buffer_cap;          /* Number of cases buffer can hold. */
1129     size_t unread_case_cnt;     /* Number of cases not yet read. */
1130   };
1131
1132 /* Merges the RUN_CNT initial runs specified in INPUT_RUNS into a
1133    new run.  Returns nonzero only if successful.  Adds an entry
1134    to MRG->xsrt->runs for the output file if and only if the
1135    output file is actually created.  Always deletes all the input
1136    files. */
1137 static int
1138 merge_once (struct merge_state *mrg,
1139             const struct initial_run input_runs[],
1140             size_t run_cnt,
1141             struct initial_run *output_run)
1142 {
1143   struct run runs[MAX_MERGE_ORDER];
1144   FILE *output_file = NULL;
1145   int success = 0;
1146   int i;
1147
1148   /* Initialize runs[]. */
1149   for (i = 0; i < run_cnt; i++) 
1150     {
1151       runs[i].file = NULL;
1152       runs[i].file_idx = input_runs[i].file_idx;
1153       runs[i].buffer = mrg->cases + mrg->case_cnt / run_cnt * i;
1154       runs[i].buffer_head = runs[i].buffer;
1155       runs[i].buffer_tail = runs[i].buffer;
1156       runs[i].buffer_cap = mrg->case_cnt / run_cnt;
1157       runs[i].unread_case_cnt = input_runs[i].case_cnt;
1158     }
1159
1160   /* Open input files. */
1161   for (i = 0; i < run_cnt; i++) 
1162     {
1163       runs[i].file = open_temp_file (mrg->xsrt, runs[i].file_idx, "rb");
1164       if (runs[i].file == NULL)
1165         goto error;
1166     }
1167   
1168   /* Create output file and count cases to be output. */
1169   output_run->file_idx = mrg->xsrt->next_file_idx++;
1170   output_run->case_cnt = 0;
1171   for (i = 0; i < run_cnt; i++)
1172     output_run->case_cnt += input_runs[i].case_cnt;
1173   output_file = open_temp_file (mrg->xsrt, output_run->file_idx, "wb");
1174   if (output_file == NULL) 
1175     goto error;
1176
1177   /* Prime buffers. */
1178   for (i = 0; i < run_cnt; i++)
1179     if (!fill_run_buffer (mrg, runs + i))
1180       goto error;
1181
1182   /* Merge. */
1183   while (run_cnt > 0) 
1184     {
1185       struct run *min_run;
1186
1187       /* Find minimum. */
1188       min_run = runs;
1189       for (i = 1; i < run_cnt; i++)
1190         if (compare_record ((*runs[i].buffer_head)->data,
1191                             (*min_run->buffer_head)->data,
1192                             mrg->xsrt->scp, NULL) < 0)
1193           min_run = runs + i;
1194
1195       /* Write minimum to output file. */
1196       if (!write_temp_file (mrg->xsrt, min_run->file_idx, output_file,
1197                             (*min_run->buffer_head)->data,
1198                             mrg->xsrt->scp->case_size))
1199         goto error;
1200
1201       /* Remove case from buffer. */
1202       if (++min_run->buffer_head >= min_run->buffer_tail)
1203         {
1204           /* Buffer is empty.  Fill from file. */
1205           if (!fill_run_buffer (mrg, min_run))
1206             goto error;
1207
1208           /* If buffer is still empty, delete its run. */
1209           if (min_run->buffer_head >= min_run->buffer_tail)
1210             {
1211               close_temp_file (mrg->xsrt, min_run->file_idx, min_run->file);
1212               remove_temp_file (mrg->xsrt, min_run->file_idx);
1213               *min_run = runs[--run_cnt];
1214
1215               /* We could donate the now-unused buffer space to
1216                  other runs. */
1217             }
1218         } 
1219     }
1220
1221   /* Close output file.  */
1222   close_temp_file (mrg->xsrt, output_run->file_idx, output_file);
1223
1224   return 1;
1225
1226  error:
1227   /* Close and remove output file.  */
1228   if (output_file != NULL) 
1229     {
1230       close_temp_file (mrg->xsrt, output_run->file_idx, output_file);
1231       remove_temp_file (mrg->xsrt, output_run->file_idx);
1232     }
1233   
1234   /* Close and remove any remaining input runs. */
1235   for (i = 0; i < run_cnt; i++) 
1236     {
1237       close_temp_file (mrg->xsrt, runs[i].file_idx, runs[i].file);
1238       remove_temp_file (mrg->xsrt, runs[i].file_idx);
1239     }
1240
1241   return success;
1242 }
1243
1244 /* Reads as many cases as possible into RUN's buffer.
1245    Reads nonzero unless a disk error occurs. */
1246 static int
1247 fill_run_buffer (struct merge_state *mrg, struct run *run) 
1248 {
1249   run->buffer_head = run->buffer_tail = run->buffer;
1250   while (run->unread_case_cnt > 0
1251          && run->buffer_tail < run->buffer + run->buffer_cap)
1252     {
1253       if (!read_temp_file (mrg->xsrt, run->file_idx, run->file,
1254                            (*run->buffer_tail)->data,
1255                            mrg->xsrt->scp->case_size))
1256         return 0;
1257
1258       run->unread_case_cnt--;
1259       run->buffer_tail++;
1260     }
1261
1262   return 1;
1263 }
1264 \f
1265 static struct case_source *
1266 sort_sink_make_source (struct case_sink *sink) 
1267 {
1268   struct initial_run_state *irs = sink->aux;
1269
1270   return create_case_source (&sort_source_class, default_dict,
1271                              irs->xsrt->scp);
1272 }
1273
1274 static const struct case_sink_class sort_sink_class = 
1275   {
1276     "SORT CASES",
1277     NULL,
1278     sort_sink_write,
1279     NULL,
1280     sort_sink_make_source,
1281   };
1282 \f
1283 struct sort_source_aux 
1284   {
1285     struct sort_cases_pgm *scp;
1286     struct ccase *dst;
1287     write_case_func *write_case;
1288     write_case_data wc_data;
1289   };
1290
1291 /* Passes C to the write_case function. */
1292 static int
1293 sort_source_read_helper (const struct ccase *src, void *aux_) 
1294 {
1295   struct sort_source_aux *aux = aux_;
1296
1297   memcpy (aux->dst, src, aux->scp->case_size);
1298   return aux->write_case (aux->wc_data);
1299 }
1300
1301 /* Reads all the records from the source stream and passes them
1302    to write_case(). */
1303 static void
1304 sort_source_read (struct case_source *source,
1305                   struct ccase *c,
1306                   write_case_func *write_case, write_case_data wc_data)
1307 {
1308   struct sort_cases_pgm *scp = source->aux;
1309   struct sort_source_aux aux;
1310
1311   aux.scp = scp;
1312   aux.dst = c;
1313   aux.write_case = write_case;
1314   aux.wc_data = wc_data;
1315   
1316   read_sort_output (scp, sort_source_read_helper, &aux);
1317 }
1318
1319 static void read_internal_sort_output (struct internal_sort *isrt,
1320                                        read_sort_output_func *, void *aux);
1321 static void read_external_sort_output (struct external_sort *xsrt,
1322                                        read_sort_output_func *, void *aux);
1323
1324 /* Reads all the records from the output stream and passes them to the
1325    function provided, which must have an interface identical to
1326    write_case(). */
1327 void
1328 read_sort_output (struct sort_cases_pgm *scp,
1329                   read_sort_output_func *output_func, void *aux)
1330 {
1331   assert ((scp->isrt != NULL) + (scp->xsrt != NULL) <= 1);
1332   if (scp->isrt != NULL)
1333     read_internal_sort_output (scp->isrt, output_func, aux);
1334   else if (scp->xsrt != NULL)
1335     read_external_sort_output (scp->xsrt, output_func, aux);
1336   else 
1337     {
1338       /* No results.  Probably an external sort that failed. */
1339     }
1340 }
1341
1342 static void
1343 read_internal_sort_output (struct internal_sort *isrt,
1344                            read_sort_output_func *output_func,
1345                            void *aux)
1346 {
1347   struct case_list **p;
1348
1349   for (p = isrt->results; *p; p++)
1350     if (!output_func (&(*p)->c, aux))
1351       break;
1352   free (isrt->results);
1353 }
1354
1355 static void
1356 read_external_sort_output (struct external_sort *xsrt,
1357                            read_sort_output_func *output_func, void *aux)
1358 {
1359   FILE *file;
1360   int file_idx;
1361   size_t i;
1362   struct ccase *c;
1363
1364   assert (xsrt->run_cnt == 1);
1365   file_idx = xsrt->initial_runs[0].file_idx;
1366
1367   file = open_temp_file (xsrt, file_idx, "rb");
1368   if (file == NULL)
1369     {
1370       err_failure ();
1371       return;
1372     }
1373
1374   c = xmalloc (xsrt->scp->case_size);
1375   for (i = 0; i < xsrt->initial_runs[0].case_cnt; i++)
1376     {
1377       if (!read_temp_file (xsrt, file_idx, file, c, xsrt->scp->case_size))
1378         {
1379           err_failure ();
1380           break;
1381         }
1382
1383       if (!output_func (c, aux))
1384         break;
1385     }
1386   free (c);
1387   close_temp_file (xsrt, file_idx, file);
1388 }
1389
1390 static void
1391 sort_source_destroy (struct case_source *source) 
1392 {
1393   struct sort_cases_pgm *scp = source->aux;
1394   
1395   destroy_sort_cases_pgm (scp);
1396 }
1397
1398 const struct case_source_class sort_source_class =
1399   {
1400     "SORT CASES",
1401     NULL, /* FIXME */
1402     sort_source_read,
1403     sort_source_destroy,
1404   };