b00c492fe6c88f59b55d2d98435bdd071213b9c0
[pspp-builds.git] / src / sort.c
1 /* PSPP - computes sample statistics.
2    Copyright (C) 1997-9, 2000 Free Software Foundation, Inc.
3    Written by Ben Pfaff <blp@gnu.org>.
4
5    This program is free software; you can redistribute it and/or
6    modify it under the terms of the GNU General Public License as
7    published by the Free Software Foundation; either version 2 of the
8    License, or (at your option) any later version.
9
10    This program is distributed in the hope that it will be useful, but
11    WITHOUT ANY WARRANTY; without even the implied warranty of
12    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13    General Public License for more details.
14
15    You should have received a copy of the GNU General Public License
16    along with this program; if not, write to the Free Software
17    Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA
18    02111-1307, USA. */
19
20 #include <config.h>
21 #include "sort.h"
22 #include <assert.h>
23 #include <stdio.h>
24 #include <stdlib.h>
25 #include <errno.h>
26 #include "algorithm.h"
27 #include "alloc.h"
28 #include "command.h"
29 #include "error.h"
30 #include "expr.h"
31 #include "lexer.h"
32 #include "misc.h"
33 #include "settings.h"
34 #include "str.h"
35 #include "var.h"
36 #include "vfm.h"
37 #include "vfmP.h"
38
39 #if HAVE_UNISTD_H
40 #include <unistd.h>
41 #endif
42
43 #if HAVE_SYS_TYPES_H
44 #include <sys/types.h>
45 #endif
46
47 #if HAVE_SYS_STAT_H
48 #include <sys/stat.h>
49 #endif
50
51 #include "debug-print.h"
52
53 /* Other prototypes. */
54 static int compare_record (const union value *, const union value *,
55                            const struct sort_cases_pgm *);
56 static int compare_case_lists (const void *, const void *, void *);
57 static struct internal_sort *do_internal_sort (struct sort_cases_pgm *,
58                                                int separate);
59 static void destroy_internal_sort (struct internal_sort *);
60 static struct external_sort *do_external_sort (struct sort_cases_pgm *,
61                                                int separate);
62 static void destroy_external_sort (struct external_sort *);
63 struct sort_cases_pgm *parse_sort (void);
64
65 /* Performs the SORT CASES procedures. */
66 int
67 cmd_sort_cases (void)
68 {
69   struct sort_cases_pgm *scp;
70   int success;
71
72   lex_match_id ("SORT");
73   lex_match_id ("CASES");
74   lex_match (T_BY);
75
76   scp = parse_sort ();
77   if (scp == NULL)
78     return CMD_FAILURE;
79       
80   cancel_temporary ();
81
82   success = sort_cases (scp, 0);
83   destroy_sort_cases_pgm (scp);
84   if (success)
85     return lex_end_of_command ();
86   else
87     return CMD_FAILURE;
88 }
89
90 /* Parses a list of sort keys and returns a struct sort_cases_pgm
91    based on it.  Returns a null pointer on error. */
92 struct sort_cases_pgm *
93 parse_sort (void)
94 {
95   struct sort_cases_pgm *scp;
96
97   scp = xmalloc (sizeof *scp);
98   scp->ref_cnt = 1;
99   scp->vars = NULL;
100   scp->dirs = NULL;
101   scp->var_cnt = 0;
102   scp->isrt = NULL;
103   scp->xsrt = NULL;
104
105   do
106     {
107       int prev_var_cnt = scp->var_cnt;
108       enum sort_direction direction = SRT_ASCEND;
109
110       /* Variables. */
111       if (!parse_variables (default_dict, &scp->vars, &scp->var_cnt,
112                             PV_NO_DUPLICATE | PV_APPEND | PV_NO_SCRATCH))
113         goto error;
114
115       /* Sort direction. */
116       if (lex_match ('('))
117         {
118           if (lex_match_id ("D") || lex_match_id ("DOWN"))
119             direction = SRT_DESCEND;
120           else if (!lex_match_id ("A") && !lex_match_id ("UP"))
121             {
122               msg (SE, _("`A' or `D' expected inside parentheses."));
123               goto error;
124             }
125           if (!lex_match (')'))
126             {
127               msg (SE, _("`)' expected."));
128               goto error;
129             }
130         }
131       scp->dirs = xrealloc (scp->dirs, sizeof *scp->dirs * scp->var_cnt);
132       for (; prev_var_cnt < scp->var_cnt; prev_var_cnt++)
133         scp->dirs[prev_var_cnt] = direction;
134     }
135   while (token != '.' && token != '/');
136   
137   return scp;
138
139  error:
140   destroy_sort_cases_pgm (scp);
141   return NULL;
142 }
143
144 /* Destroys a SORT CASES program. */
145 void
146 destroy_sort_cases_pgm (struct sort_cases_pgm *scp) 
147 {
148   if (scp != NULL) 
149     {
150       assert (scp->ref_cnt > 0);
151       if (--scp->ref_cnt > 0)
152         return;
153
154       free (scp->vars);
155       free (scp->dirs);
156       destroy_internal_sort (scp->isrt);
157       destroy_external_sort (scp->xsrt);
158       free (scp);
159     }
160 }
161
162 /* Sorts the active file based on the key variables specified in
163    global variables vars and var_cnt.  The output is either to the
164    active file, if SEPARATE is zero, or to a separate file, if
165    SEPARATE is nonzero.  In the latter case the output cases can be
166    read with a call to read_sort_output().  (In the former case the
167    output cases should be dealt with through the usual vfm interface.)
168
169    The caller is responsible for freeing vars[]. */
170 int
171 sort_cases (struct sort_cases_pgm *scp, int separate)
172 {
173   scp->case_size = sizeof (union value) * compaction_nval;
174
175   /* Not sure this is necessary but it's good to be safe. */
176   if (separate && case_source_is_class (vfm_source, &sort_source_class))
177     procedure (NULL, NULL, NULL, NULL);
178   
179   /* SORT CASES cancels PROCESS IF. */
180   expr_free (process_if_expr);
181   process_if_expr = NULL;
182
183   /* Try an internal sort first. */
184   scp->isrt = do_internal_sort (scp, separate);
185   if (scp->isrt != NULL) 
186     return 1; 
187
188   /* Fall back to an external sort. */
189   write_active_file_to_disk ();
190   scp->xsrt = do_external_sort (scp, separate);
191   if (scp->xsrt != NULL) 
192     return 1;
193
194   destroy_sort_cases_pgm (scp);
195   return 0;
196 }
197 \f
198 /* Results of an internal sort. */
199 struct internal_sort 
200   {
201     struct case_list **results;
202   };
203
204 /* If a reasonable situation is set up, do an internal sort of the
205    data.  Return success. */
206 static struct internal_sort *
207 do_internal_sort (struct sort_cases_pgm *scp, int separate)
208 {
209   struct internal_sort *isrt;
210
211   isrt = xmalloc (sizeof *isrt);
212   isrt->results = NULL;
213
214   if (!case_source_is_class (vfm_source, &disk_source_class))
215     {
216       if (!case_source_is_class (vfm_source, &memory_source_class))
217         procedure (NULL, NULL, NULL, NULL);
218
219       if (case_source_is_class (vfm_source, &memory_source_class))
220         {
221           struct case_list *case_list;
222           struct case_list **case_array;
223           int case_cnt;
224           int i;
225
226           case_cnt = vfm_source->class->count (vfm_source);
227           if (case_cnt <= 0)
228             return isrt;
229
230           if (case_cnt > set_max_workspace / sizeof *case_array)
231             goto error;
232
233           case_list = memory_source_get_cases (vfm_source);
234           case_array = malloc (sizeof *case_array * (case_cnt + 1));
235           if (case_array == NULL)
236             goto error;
237
238           for (i = 0; case_list != NULL; i++) 
239             {
240               case_array[i] = case_list;
241               case_list = case_list->next;
242             }
243           assert (i == case_cnt);
244           case_array[case_cnt] = NULL;
245
246           sort (case_array, case_cnt, sizeof *case_array,
247                 compare_case_lists, scp);
248
249           if (!separate) 
250             {
251               memory_source_set_cases (vfm_source, case_array[0]);
252               for (i = 1; i <= case_cnt; i++)
253                 case_array[i - 1]->next = case_array[i]; 
254               free (case_array);
255             }
256           else 
257             isrt->results = case_array;
258                       
259           return isrt;
260         }
261     }
262
263  error:
264   free (isrt);
265   return NULL;
266 }
267
268 /* Destroys an internal sort result. */
269 static void
270 destroy_internal_sort (struct internal_sort *isrt) 
271 {
272   if (isrt != NULL) 
273     {
274       free (isrt->results);
275       free (isrt);
276     }
277 }
278
279 /* Compares the VAR_CNT variables in VARS[] between the
280    `case_list's at A and B, and returns a strcmp()-type
281    result. */
282 static int
283 compare_case_lists (const void *a_, const void *b_, void *scp_)
284 {
285   struct sort_cases_pgm *scp = scp_;
286   struct case_list *const *pa = a_;
287   struct case_list *const *pb = b_;
288   struct case_list *a = *pa;
289   struct case_list *b = *pb;
290
291   return compare_record (a->c.data, b->c.data, scp);
292 }
293 \f
294 /* External sort. */
295
296 /* Maximum order of merge.  If you increase this, then you should
297    use a heap for comparing cases during merge.  */
298 #define MAX_MERGE_ORDER         7
299
300 /* Minimum total number of records for buffers. */
301 #define MIN_BUFFER_TOTAL_SIZE_RECS      64
302
303 /* Minimum single input buffer size, in bytes and records. */
304 #define MIN_BUFFER_SIZE_BYTES   4096
305 #define MIN_BUFFER_SIZE_RECS    16
306
307 #if MIN_BUFFER_SIZE_RECS * 2 + 16 > MIN_BUFFER_TOTAL_SIZE_RECS
308 #error MIN_BUFFER_SIZE_RECS and MIN_BUFFER_TOTAL_SIZE_RECS do not make sense.
309 #endif
310
311 /* An initial run and its length. */
312 struct initial_run 
313   {
314     int file_idx;                     /* File index. */
315     size_t case_cnt;                  /* Number of cases. */
316   };
317
318 /* Sorts initial runs A and B in decending order by length. */
319 static int
320 compare_initial_runs (const void *a_, const void *b_, void *aux UNUSED) 
321 {
322   const struct initial_run *a = a_;
323   const struct initial_run *b = b_;
324   
325   return a->case_cnt > b->case_cnt ? -1 : a->case_cnt <b->case_cnt;
326 }
327
328 /* Results of an external sort. */
329 struct external_sort 
330   {
331     struct sort_cases_pgm *scp;       /* SORT CASES info. */
332     struct initial_run *initial_runs; /* Array of initial runs. */
333     size_t run_cnt, run_cap;          /* Number of runs, allocated capacity. */
334     char *temp_dir;                   /* Temporary file directory name. */
335     char *temp_name;                  /* Name of a temporary file. */
336     int next_file_idx;                /* Lowest unused file index. */
337   };
338
339 /* Prototypes for helper functions. */
340 static void sort_sink_write (struct case_sink *, const struct ccase *);
341 static int write_initial_runs (struct external_sort *, int separate);
342 static int init_external_sort (struct external_sort *);
343 static int merge (struct external_sort *);
344 static void rmdir_temp_dir (struct external_sort *);
345 static void remove_temp_file (struct external_sort *xsrt, int file_idx);
346
347 /* Performs an external sort of the active file according to the
348    specification in SCP.  Forms initial runs using a heap as a
349    reservoir.  Determines the optimum merge pattern via Huffman's
350    method (see Knuth vol. 3, 2nd edition, p. 365-366), and merges
351    according to that pattern. */
352 static struct external_sort *
353 do_external_sort (struct sort_cases_pgm *scp, int separate)
354 {
355   struct external_sort *xsrt;
356   int success = 0;
357
358   xsrt = xmalloc (sizeof *xsrt);
359   xsrt->scp = scp;
360   if (!init_external_sort (xsrt))
361     goto done;
362   if (!write_initial_runs (xsrt, separate))
363     goto done;
364   if (!merge (xsrt))
365     goto done;
366
367   success = 1;
368
369  done:
370   if (success)
371     {
372       /* Don't destroy anything because we'll need it for reading
373          the output. */
374       return xsrt;
375     }
376   else
377     {
378       destroy_external_sort (xsrt);
379       return NULL;
380     }
381 }
382
383 /* Destroys XSRT. */
384 static void
385 destroy_external_sort (struct external_sort *xsrt) 
386 {
387   if (xsrt != NULL) 
388     {
389       int i;
390       
391       for (i = 0; i < xsrt->run_cnt; i++)
392         remove_temp_file (xsrt, xsrt->initial_runs[i].file_idx);
393       rmdir_temp_dir (xsrt);
394       free (xsrt->temp_dir);
395       free (xsrt->temp_name);
396       free (xsrt->initial_runs);
397       free (xsrt);
398     }
399 }
400
401 #ifdef HAVE_MKDTEMP
402 /* Creates and returns the name of a temporary directory. */
403 static char *
404 make_temp_dir (void) 
405 {
406   const char *parent_dir;
407   char *temp_dir;
408
409   if (getenv ("TMPDIR") != NULL)
410     parent_dir = getenv ("TMPDIR");
411   else
412     parent_dir = P_tmpdir;
413
414   temp_dir = xmalloc (strlen (parent_dir) + 32);
415   sprintf (temp_dir, "%s%cpsppXXXXXX", parent_dir, DIR_SEPARATOR);
416   if (mkdtemp (temp_dir) == NULL) 
417     {
418       msg (SE, _("%s: Creating temporary directory: %s."),
419            temp_dir, strerror (errno));
420       free (temp_dir);
421       return NULL;
422     }
423   else
424     return temp_dir;
425 }
426 #else /* !HAVE_MKDTEMP */
427 /* Creates directory DIR. */
428 static int
429 do_mkdir (const char *dir) 
430 {
431 #ifndef __MSDOS__
432   return mkdir (dir, S_IRWXU);
433 #else
434   return mkdir (dir);
435 #endif
436 }
437
438 /* Creates and returns the name of a temporary directory. */
439 static char *
440 make_temp_dir (void) 
441 {
442   int i;
443   
444   for (i = 0; i < 100; i++)
445     {
446       char temp_dir[L_tmpnam + 1];
447       if (tmpnam (temp_dir) == NULL) 
448         {
449           msg (SE, _("Generating temporary directory name failed: %s."),
450                strerror (errno));
451           return NULL; 
452         }
453       else if (do_mkdir (temp_dir) == 0)
454         return xstrdup (temp_dir);
455     }
456   
457   msg (SE, _("Creating temporary directory failed: %s."), strerror (errno));
458   return NULL;
459 }
460 #endif /* !HAVE_MKDTEMP */
461
462 /* Sets up to open temporary files. */
463 static int
464 init_external_sort (struct external_sort *xsrt)
465 {
466   /* Zero. */
467   xsrt->temp_dir = NULL;
468   xsrt->next_file_idx = 0;
469
470   /* Huffman queue. */
471   xsrt->run_cap = 512;
472   xsrt->run_cnt = 0;
473   xsrt->initial_runs = xmalloc (sizeof *xsrt->initial_runs * xsrt->run_cap);
474
475   /* Temporary directory. */
476   xsrt->temp_dir = make_temp_dir ();
477   xsrt->temp_name = NULL;
478   if (xsrt->temp_dir == NULL)
479     return 0;
480   xsrt->temp_name = xmalloc (strlen (xsrt->temp_dir) + 64);
481
482   return 1;
483 }
484
485 /* Returns nonzero if we should return an error even though the
486    operation succeeded.  Useful for testing. */
487 static int
488 simulate_error (void) 
489 {
490   static int op_err_cnt = -1;
491   static int op_cnt;
492   
493   if (op_err_cnt == -1 || op_cnt++ < op_err_cnt)
494     return 0;
495   else
496     {
497       errno = 0;
498       return 1;
499     }
500 }
501
502 /* Removes the directory created for temporary files, if one
503    exists. */
504 static void
505 rmdir_temp_dir (struct external_sort *xsrt)
506 {
507   if (xsrt->temp_dir != NULL && rmdir (xsrt->temp_dir) == -1) 
508     {
509       msg (SE, _("%s: Error removing directory for temporary files: %s."),
510            xsrt->temp_dir, strerror (errno));
511       xsrt->temp_dir = NULL; 
512     }
513 }
514
515 /* Returns the name of temporary file number FILE_IDX for XSRT.
516    The name is written into a static buffer, so be careful.  */
517 static char *
518 get_temp_file_name (struct external_sort *xsrt, int file_idx)
519 {
520   assert (xsrt->temp_dir != NULL);
521   sprintf (xsrt->temp_name, "%s%c%04d",
522            xsrt->temp_dir, DIR_SEPARATOR, file_idx);
523   return xsrt->temp_name;
524 }
525
526 /* Opens temporary file numbered FILE_IDX for XSRT with mode MODE
527    and returns the FILE *. */
528 static FILE *
529 open_temp_file (struct external_sort *xsrt, int file_idx, const char *mode)
530 {
531   char *temp_file;
532   FILE *file;
533
534   temp_file = get_temp_file_name (xsrt, file_idx);
535
536   file = fopen (temp_file, mode);
537   if (simulate_error () || file == NULL) 
538     msg (SE, _("%s: Error opening temporary file for %s: %s."),
539          temp_file, mode[0] == 'r' ? "reading" : "writing",
540          strerror (errno));
541
542   return file;
543 }
544
545 /* Closes FILE, which is the temporary file numbered FILE_IDX
546    under XSRT.  Returns nonzero only if successful.  */
547 static int
548 close_temp_file (struct external_sort *xsrt, int file_idx, FILE *file)
549 {
550   if (file != NULL) 
551     {
552       char *temp_file = get_temp_file_name (xsrt, file_idx);
553       if (simulate_error () || fclose (file) == EOF) 
554         {
555           msg (SE, _("%s: Error closing temporary file: %s."),
556                temp_file, strerror (errno));
557           return 0;
558         }
559     }
560   return 1;
561 }
562
563 /* Delete temporary file numbered FILE_IDX for XSRT. */
564 static void
565 remove_temp_file (struct external_sort *xsrt, int file_idx) 
566 {
567   if (file_idx != -1)
568     {
569       char *temp_file = get_temp_file_name (xsrt, file_idx);
570       if (simulate_error () || remove (temp_file) != 0)
571         msg (SE, _("%s: Error removing temporary file: %s."),
572              temp_file, strerror (errno));
573     }
574 }
575
576 /* Writes SIZE bytes from buffer DATA into FILE, which is
577    temporary file numbered FILE_IDX from XSRT. */
578 static int
579 write_temp_file (struct external_sort *xsrt, int file_idx,
580                  FILE *file, const void *data, size_t size) 
581 {
582   if (!simulate_error () && fwrite (data, size, 1, file) == 1)
583     return 1;
584   else
585     {
586       char *temp_file = get_temp_file_name (xsrt, file_idx);
587       msg (SE, _("%s: Error writing temporary file: %s."),
588            temp_file, strerror (errno));
589       return 0;
590     }
591 }
592
593 /* Reads SIZE bytes into buffer DATA into FILE, which is
594    temporary file numbered FILE_IDX from XSRT. */
595 static int
596 read_temp_file (struct external_sort *xsrt, int file_idx,
597                 FILE *file, void *data, size_t size) 
598 {
599   if (!simulate_error () && fread (data, size, 1, file) == 1)
600     return 1;
601   else 
602     {
603       char *temp_file = get_temp_file_name (xsrt, file_idx);
604       if (ferror (file))
605         msg (SE, _("%s: Error reading temporary file: %s."),
606              temp_file, strerror (errno));
607       else
608         msg (SE, _("%s: Unexpected end of temporary file."),
609              temp_file);
610       return 0;
611     }
612 }
613 \f
614 /* Replacement selection. */
615
616 /* Pairs a record with a run number. */
617 struct record_run
618   {
619     int run;                    /* Run number of case. */
620     struct case_list *record;   /* Case data. */
621   };
622
623 /* Represents a set of initial runs during an external sort. */
624 struct initial_run_state 
625   {
626     struct external_sort *xsrt;
627
628     /* Reservoir. */
629     struct record_run *records; /* Records arranged as a heap. */
630     size_t record_cnt;          /* Current number of records. */
631     size_t record_cap;          /* Capacity for records. */
632     struct case_list *free_list;/* Cases not in heap. */
633     
634     /* Run currently being output. */
635     int file_idx;               /* Temporary file number. */
636     size_t case_cnt;            /* Number of cases so far. */
637     size_t case_size;           /* Number of bytes in a case. */
638     FILE *output_file;          /* Output file. */
639     struct case_list *last_output;/* Record last output. */
640
641     int okay;                   /* Zero if an error has been encountered. */
642   };
643
644 static void destroy_initial_run_state (struct initial_run_state *irs);
645 static int allocate_cases (struct initial_run_state *);
646 static struct case_list *grab_case (struct initial_run_state *);
647 static void release_case (struct initial_run_state *, struct case_list *);
648 static void output_record (struct initial_run_state *irs);
649 static void start_run (struct initial_run_state *irs);
650 static void end_run (struct initial_run_state *irs);
651 static int compare_record_run (const struct record_run *,
652                                const struct record_run *,
653                                struct sort_cases_pgm *);
654 static int compare_record_run_minheap (const void *, const void *, void *);
655
656 /* Writes initial runs for XSRT, sending them to a separate file
657    if SEPARATE is nonzero. */
658 static int
659 write_initial_runs (struct external_sort *xsrt, int separate)
660 {
661   struct initial_run_state *irs;
662   int success = 0;
663
664   /* Allocate memory for cases. */
665   irs = xmalloc (sizeof *irs);
666   irs->xsrt = xsrt;
667   irs->records = NULL;
668   irs->record_cnt = irs->record_cap = 0;
669   irs->free_list = NULL;
670   irs->output_file = NULL;
671   irs->last_output = NULL;
672   irs->file_idx = 0;
673   irs->case_cnt = 0;
674   irs->case_size = dict_get_case_size (default_dict);
675   irs->okay = 1;
676   if (!allocate_cases (irs)) 
677     goto done;
678
679   /* Create case sink. */
680   if (!separate)
681     {
682       if (vfm_sink != NULL && vfm_sink->class->destroy != NULL)
683         vfm_sink->class->destroy (vfm_sink);
684       vfm_sink = create_case_sink (&sort_sink_class, irs);
685       xsrt->scp->ref_cnt++;
686     }
687
688   /* Create initial runs. */
689   start_run (irs);
690   procedure (NULL, NULL, NULL, NULL);
691   while (irs->record_cnt > 0 && irs->okay)
692     output_record (irs);
693   end_run (irs);
694
695   success = irs->okay;
696
697  done:
698   destroy_initial_run_state (irs);
699
700   return success;
701 }
702
703 /* Add a single case to an initial run. */
704 static void
705 sort_sink_write (struct case_sink *sink, const struct ccase *c)
706 {
707   struct initial_run_state *irs = sink->aux;
708   struct record_run *new_record_run;
709
710   if (!irs->okay)
711     return;
712
713   /* Compose record_run for this run and add to heap. */
714   assert (irs->record_cnt < irs->record_cap);
715   new_record_run = irs->records + irs->record_cnt++;
716   new_record_run->record = grab_case (irs);
717   memcpy (new_record_run->record->c.data, c->data, irs->case_size);
718   new_record_run->run = irs->file_idx;
719   if (irs->last_output != NULL
720       && compare_record (c->data, irs->last_output->c.data,
721                          irs->xsrt->scp) < 0)
722     new_record_run->run = irs->xsrt->next_file_idx;
723   push_heap (irs->records, irs->record_cnt, sizeof *irs->records,
724              compare_record_run_minheap, irs->xsrt->scp);
725
726   /* Output a record if the reservoir is full. */
727   if (irs->record_cnt == irs->record_cap && irs->okay)
728     output_record (irs);
729 }
730
731 /* Destroys the initial run state represented by IRS. */
732 static void
733 destroy_initial_run_state (struct initial_run_state *irs) 
734 {
735   struct case_list *iter, *next;
736   int i;
737
738   if (irs == NULL)
739     return;
740
741   /* Release cases to free list. */
742   for (i = 0; i < irs->record_cnt; i++)
743     release_case (irs, irs->records[i].record);
744   if (irs->last_output != NULL)
745     release_case (irs, irs->last_output);
746
747   /* Free cases in free list. */
748   for (iter = irs->free_list; iter != NULL; iter = next) 
749     {
750       next = iter->next;
751       free (iter);
752     }
753
754   free (irs->records);
755   free (irs);
756
757   if (irs->output_file != NULL)
758     close_temp_file (irs->xsrt, irs->file_idx, irs->output_file);
759 }
760
761 /* Allocates room for lots of cases as a buffer. */
762 static int
763 allocate_cases (struct initial_run_state *irs)
764 {
765   size_t case_size;     /* Size of one case, in bytes. */
766   int approx_case_cost; /* Approximate memory cost of one case in bytes. */
767   int max_cases;        /* Maximum number of cases to allocate. */
768   int i;
769
770   /* Allocate as many cases as we can within the workspace
771      limit. */
772   case_size = dict_get_case_size (default_dict);
773   approx_case_cost = (sizeof *irs->records
774                       + sizeof *irs->free_list
775                       + case_size
776                       + 4 * sizeof (void *));
777   max_cases = set_max_workspace / approx_case_cost;
778   irs->records = malloc (sizeof *irs->records * max_cases);
779   for (i = 0; i < max_cases; i++)
780     {
781       struct case_list *c;
782       c = malloc (sizeof *c + case_size - sizeof (union value));
783       if (c == NULL) 
784         {
785           max_cases = i;
786           break;
787         }
788       release_case (irs, c);
789     }
790
791   /* irs->records gets all but one of the allocated cases.
792      The extra is used for last_output. */
793   irs->record_cap = max_cases - 1;
794
795   /* Fail if we didn't allocate an acceptable number of cases. */
796   if (irs->records == NULL || max_cases < MIN_BUFFER_TOTAL_SIZE_RECS)
797     {
798       msg (SE, _("Out of memory.  Could not allocate room for minimum of %d "
799                  "cases of %d bytes each.  (PSPP workspace is currently "
800                  "restricted to a maximum of %d KB.)"),
801            MIN_BUFFER_TOTAL_SIZE_RECS, approx_case_cost, set_max_workspace / 1024);
802       return 0;
803     }
804   return 1;
805 }
806
807 /* Compares the VAR_CNT variables in VARS[] between the `value's at
808    A and B, and returns a strcmp()-type result. */
809 static int
810 compare_record (const union value *a, const union value *b,
811                 const struct sort_cases_pgm *scp)
812 {
813   int i;
814
815   assert (a != NULL);
816   assert (b != NULL);
817   
818   for (i = 0; i < scp->var_cnt; i++)
819     {
820       struct variable *v = scp->vars[i];
821       int fv = v->fv;
822       int result;
823
824       if (v->type == NUMERIC)
825         {
826           double af = a[fv].f;
827           double bf = b[fv].f;
828           
829           result = af < bf ? -1 : af > bf;
830         }
831       else
832         result = memcmp (a[fv].s, b[fv].s, v->width);
833
834       if (result != 0) 
835         {
836           if (scp->dirs[i] == SRT_DESCEND)
837             result = -result;
838           return result;
839         }
840     }
841
842   return 0;
843 }
844
845 /* Compares record-run tuples A and B on run number first, then
846    on the current record according to SCP. */
847 static int
848 compare_record_run (const struct record_run *a,
849                     const struct record_run *b,
850                     struct sort_cases_pgm *scp) 
851 {
852   if (a->run != b->run)
853     return a->run > b->run ? 1 : -1;
854   else
855     return compare_record (a->record->c.data, b->record->c.data, scp);
856 }
857
858 /* Compares record-run tuples A and B on run number first, then
859    on the current record according to SCP, but in descending
860    order. */
861 static int
862 compare_record_run_minheap (const void *a, const void *b, void *scp) 
863 {
864   return -compare_record_run (a, b, scp);
865 }
866
867 /* Begins a new initial run, specifically its output file. */
868 static void
869 start_run (struct initial_run_state *irs)
870 {
871   irs->file_idx = irs->xsrt->next_file_idx++;
872   irs->case_cnt = 0;
873   irs->output_file = open_temp_file (irs->xsrt, irs->file_idx, "wb");
874   if (irs->output_file == NULL) 
875     irs->okay = 0;
876   if (irs->last_output != NULL) 
877     {
878       release_case (irs, irs->last_output);
879       irs->last_output = NULL; 
880     }
881 }
882
883 /* Ends the current initial run.  */
884 static void
885 end_run (struct initial_run_state *irs)
886 {
887   struct external_sort *xsrt = irs->xsrt;
888   
889   /* Record initial run. */
890   if (xsrt->run_cnt >= xsrt->run_cap) 
891     {
892       xsrt->run_cap *= 2;
893       xsrt->initial_runs
894         = xrealloc (xsrt->initial_runs,
895                     sizeof *xsrt->initial_runs * xsrt->run_cap);
896     }
897   xsrt->initial_runs[xsrt->run_cnt].file_idx = irs->file_idx;
898   xsrt->initial_runs[xsrt->run_cnt].case_cnt = irs->case_cnt;
899   xsrt->run_cnt++;
900
901   /* Close file handle. */
902   if (irs->output_file != NULL
903       && !close_temp_file (irs->xsrt, irs->file_idx, irs->output_file)) 
904     irs->okay = 0;
905   irs->output_file = NULL;
906 }
907
908 /* Writes a record to the current initial run. */
909 static void
910 output_record (struct initial_run_state *irs)
911 {
912   struct record_run *record_run;
913   struct ccase *out_case;
914   
915   /* Extract minimum case from heap. */
916   assert (irs->record_cnt > 0);
917   pop_heap (irs->records, irs->record_cnt--, sizeof *irs->records,
918             compare_record_run_minheap, irs->xsrt->scp);
919   record_run = irs->records + irs->record_cnt;
920
921   /* Bail if an error has occurred. */
922   if (!irs->okay)
923     return;
924
925   /* Obtain case data to write to disk. */
926   out_case = &record_run->record->c;
927   if (compaction_necessary)
928     {
929       compact_case (compaction_case, out_case);
930       out_case = compaction_case;
931     }
932
933   /* Start new run if necessary. */
934   assert (record_run->run == irs->file_idx
935           || record_run->run == irs->xsrt->next_file_idx);
936   if (record_run->run != irs->file_idx)
937     {
938       end_run (irs);
939       start_run (irs);
940     }
941   assert (record_run->run == irs->file_idx);
942   irs->case_cnt++;
943
944   /* Write to disk. */
945   if (irs->output_file != NULL
946       && !write_temp_file (irs->xsrt, irs->file_idx, irs->output_file,
947                            out_case->data,
948                            sizeof *out_case->data * compaction_nval))
949     irs->okay = 0;
950
951   /* This record becomes last_output. */
952   if (irs->last_output != NULL)
953     release_case (irs, irs->last_output);
954   irs->last_output = record_run->record;
955 }
956
957 /* Gets a case from the free list in IRS.  It is an error to call
958    this function if the free list is empty. */
959 static struct case_list *
960 grab_case (struct initial_run_state *irs)
961 {
962   struct case_list *c;
963   
964   assert (irs != NULL);
965   assert (irs->free_list != NULL);
966
967   c = irs->free_list;
968   irs->free_list = c->next;
969   return c;
970 }
971
972 /* Returns C to the free list in IRS. */
973 static void 
974 release_case (struct initial_run_state *irs, struct case_list *c) 
975 {
976   assert (irs != NULL);
977   assert (c != NULL);
978
979   c->next = irs->free_list;
980   irs->free_list = c;
981 }
982 \f
983 /* Merging. */
984
985 /* State of merging initial runs. */
986 struct merge_state 
987   {
988     struct external_sort *xsrt; /* External sort state. */
989     struct ccase **cases;       /* Buffers. */
990     size_t case_cnt;            /* Number of buffers. */
991   };
992
993 struct run;
994 static int merge_once (struct merge_state *,
995                        const struct initial_run[], size_t,
996                        struct initial_run *);
997 static int fill_run_buffer (struct merge_state *, struct run *);
998 static int mod (int, int);
999
1000 /* Performs a series of P-way merges of initial runs
1001    method. */
1002 static int
1003 merge (struct external_sort *xsrt)
1004 {
1005   struct merge_state mrg;       /* State of merge. */
1006   size_t case_size;             /* Size of one case, in bytes. */
1007   size_t approx_case_cost;      /* Approximate memory cost of one case. */
1008   int max_order;                /* Maximum order of merge. */
1009   size_t dummy_run_cnt;         /* Number of dummy runs to insert. */
1010   int success = 0;
1011   int i;
1012
1013   mrg.xsrt = xsrt;
1014
1015   /* Allocate as many cases as possible into cases. */
1016   case_size = dict_get_case_size (default_dict);
1017   approx_case_cost = sizeof *mrg.cases + case_size + 4 * sizeof (void *);
1018   mrg.case_cnt = set_max_workspace / approx_case_cost;
1019   mrg.cases = malloc (sizeof *mrg.cases * mrg.case_cnt);
1020   if (mrg.cases == NULL)
1021     goto done;
1022   for (i = 0; i < mrg.case_cnt; i++) 
1023     {
1024       mrg.cases[i] = malloc (case_size);
1025       if (mrg.cases[i] == NULL) 
1026         {
1027           mrg.case_cnt = i;
1028           break;
1029         }
1030     }
1031   if (mrg.case_cnt < MIN_BUFFER_TOTAL_SIZE_RECS)
1032     {
1033       msg (SE, _("Out of memory.  Could not allocate room for minimum of %d "
1034                  "cases of %d bytes each.  (PSPP workspace is currently "
1035                  "restricted to a maximum of %d KB.)"),
1036            MIN_BUFFER_TOTAL_SIZE_RECS, approx_case_cost, set_max_workspace / 1024);
1037       return 0;
1038     }
1039
1040   /* Determine maximum order of merge. */
1041   max_order = MAX_MERGE_ORDER;
1042   if (mrg.case_cnt / max_order < MIN_BUFFER_SIZE_RECS)
1043     max_order = mrg.case_cnt / MIN_BUFFER_SIZE_RECS;
1044   else if (mrg.case_cnt / max_order * case_size < MIN_BUFFER_SIZE_BYTES)
1045     max_order = mrg.case_cnt / (MIN_BUFFER_SIZE_BYTES / case_size);
1046   if (max_order < 2)
1047     max_order = 2;
1048   if (max_order > xsrt->run_cnt)
1049     max_order = xsrt->run_cnt;
1050
1051   /* Repeatedly merge the P shortest existing runs until only one run
1052      is left. */
1053   make_heap (xsrt->initial_runs, xsrt->run_cnt, sizeof *xsrt->initial_runs,
1054              compare_initial_runs, NULL);
1055   dummy_run_cnt = mod (1 - (int) xsrt->run_cnt, max_order - 1);
1056   assert (max_order == 1
1057           || (xsrt->run_cnt + dummy_run_cnt) % (max_order - 1) == 1);
1058   while (xsrt->run_cnt > 1)
1059     {
1060       struct initial_run output_run;
1061       int order;
1062       int i;
1063
1064       /* Choose order of merge (max_order after first merge). */
1065       order = max_order - dummy_run_cnt;
1066       dummy_run_cnt = 0;
1067
1068       /* Choose runs to merge. */
1069       assert (xsrt->run_cnt >= order);
1070       for (i = 0; i < order; i++) 
1071         pop_heap (xsrt->initial_runs, xsrt->run_cnt--,
1072                   sizeof *xsrt->initial_runs,
1073                   compare_initial_runs, NULL); 
1074           
1075       /* Merge runs. */
1076       if (!merge_once (&mrg, xsrt->initial_runs + xsrt->run_cnt, order,
1077                        &output_run))
1078         goto done;
1079
1080       /* Add output run to heap. */
1081       xsrt->initial_runs[xsrt->run_cnt++] = output_run;
1082       push_heap (xsrt->initial_runs, xsrt->run_cnt, sizeof *xsrt->initial_runs,
1083                  compare_initial_runs, NULL);
1084     }
1085
1086   /* Exactly one run is left, which contains the entire sorted
1087      file.  We could use it to find a total case count. */
1088   assert (xsrt->run_cnt == 1);
1089
1090   success = 1;
1091
1092  done:
1093   for (i = 0; i < mrg.case_cnt; i++)
1094     free (mrg.cases[i]);
1095   free (mrg.cases);
1096
1097   return success;
1098 }
1099
1100 /* Modulo function as defined by Knuth. */
1101 static int
1102 mod (int x, int y)
1103 {
1104   if (y == 0)
1105     return x;
1106   else if (x == 0)
1107     return 0;
1108   else if (x > 0 && y > 0)
1109     return x % y;
1110   else if (x < 0 && y > 0)
1111     return y - (-x) % y;
1112
1113   assert (0);
1114 }
1115
1116 /* A run of data for use in merging. */
1117 struct run 
1118   {
1119     FILE *file;                 /* File that contains run. */
1120     int file_idx;               /* Index of file that contains run. */
1121     struct ccase **buffer;      /* Case buffer. */
1122     struct ccase **buffer_head; /* First unconsumed case in buffer. */
1123     struct ccase **buffer_tail; /* One past last unconsumed case in buffer. */
1124     size_t buffer_cap;          /* Number of cases buffer can hold. */
1125     size_t unread_case_cnt;     /* Number of cases not yet read. */
1126   };
1127
1128 /* Merges the RUN_CNT initial runs specified in INPUT_RUNS into a
1129    new run.  Returns nonzero only if successful.  Adds an entry
1130    to MRG->xsrt->runs for the output file if and only if the
1131    output file is actually created.  Always deletes all the input
1132    files. */
1133 static int
1134 merge_once (struct merge_state *mrg,
1135             const struct initial_run input_runs[],
1136             size_t run_cnt,
1137             struct initial_run *output_run)
1138 {
1139   struct run runs[MAX_MERGE_ORDER];
1140   FILE *output_file = NULL;
1141   size_t case_size;
1142   int success = 0;
1143   int i;
1144
1145   /* Initialize runs[]. */
1146   for (i = 0; i < run_cnt; i++) 
1147     {
1148       runs[i].file = NULL;
1149       runs[i].file_idx = input_runs[i].file_idx;
1150       runs[i].buffer = mrg->cases + mrg->case_cnt / run_cnt * i;
1151       runs[i].buffer_head = runs[i].buffer;
1152       runs[i].buffer_tail = runs[i].buffer;
1153       runs[i].buffer_cap = mrg->case_cnt / run_cnt;
1154       runs[i].unread_case_cnt = input_runs[i].case_cnt;
1155     }
1156
1157   /* Open input files. */
1158   for (i = 0; i < run_cnt; i++) 
1159     {
1160       runs[i].file = open_temp_file (mrg->xsrt, runs[i].file_idx, "rb");
1161       if (runs[i].file == NULL)
1162         goto error;
1163     }
1164   
1165   /* Create output file and count cases to be output. */
1166   output_run->file_idx = mrg->xsrt->next_file_idx++;
1167   output_run->case_cnt = 0;
1168   for (i = 0; i < run_cnt; i++)
1169     output_run->case_cnt += input_runs[i].case_cnt;
1170   output_file = open_temp_file (mrg->xsrt, output_run->file_idx, "wb");
1171   if (output_file == NULL) 
1172     goto error;
1173
1174   /* Prime buffers. */
1175   for (i = 0; i < run_cnt; i++)
1176     if (!fill_run_buffer (mrg, runs + i))
1177       goto error;
1178
1179   /* Merge. */
1180   case_size = dict_get_case_size (default_dict);
1181   while (run_cnt > 0) 
1182     {
1183       struct run *min_run;
1184
1185       /* Find minimum. */
1186       min_run = runs;
1187       for (i = 1; i < run_cnt; i++)
1188         if (compare_record ((*runs[i].buffer_head)->data,
1189                             (*min_run->buffer_head)->data,
1190                             mrg->xsrt->scp) < 0)
1191           min_run = runs + i;
1192
1193       /* Write minimum to output file. */
1194       if (!write_temp_file (mrg->xsrt, min_run->file_idx, output_file,
1195                             (*min_run->buffer_head)->data, case_size))
1196         goto error;
1197
1198       /* Remove case from buffer. */
1199       if (++min_run->buffer_head >= min_run->buffer_tail)
1200         {
1201           /* Buffer is empty.  Fill from file. */
1202           if (!fill_run_buffer (mrg, min_run))
1203             goto error;
1204
1205           /* If buffer is still empty, delete its run. */
1206           if (min_run->buffer_head >= min_run->buffer_tail)
1207             {
1208               close_temp_file (mrg->xsrt, min_run->file_idx, min_run->file);
1209               remove_temp_file (mrg->xsrt, min_run->file_idx);
1210               *min_run = runs[--run_cnt];
1211
1212               /* We could donate the now-unused buffer space to
1213                  other runs. */
1214             }
1215         } 
1216     }
1217
1218   /* Close output file.  */
1219   close_temp_file (mrg->xsrt, output_run->file_idx, output_file);
1220
1221   return 1;
1222
1223  error:
1224   /* Close and remove output file.  */
1225   if (output_file != NULL) 
1226     {
1227       close_temp_file (mrg->xsrt, output_run->file_idx, output_file);
1228       remove_temp_file (mrg->xsrt, output_run->file_idx);
1229     }
1230   
1231   /* Close and remove any remaining input runs. */
1232   for (i = 0; i < run_cnt; i++) 
1233     {
1234       close_temp_file (mrg->xsrt, runs[i].file_idx, runs[i].file);
1235       remove_temp_file (mrg->xsrt, runs[i].file_idx);
1236     }
1237
1238   return success;
1239 }
1240
1241 /* Reads as many cases as possible into RUN's buffer.
1242    Reads nonzero unless a disk error occurs. */
1243 static int
1244 fill_run_buffer (struct merge_state *mrg, struct run *run) 
1245 {
1246   run->buffer_head = run->buffer_tail = run->buffer;
1247   while (run->unread_case_cnt > 0
1248          && run->buffer_tail < run->buffer + run->buffer_cap)
1249     {
1250       if (!read_temp_file (mrg->xsrt, run->file_idx, run->file,
1251                            (*run->buffer_tail)->data,
1252                            dict_get_case_size (default_dict)))
1253         return 0;
1254
1255       run->unread_case_cnt--;
1256       run->buffer_tail++;
1257     }
1258
1259   return 1;
1260 }
1261 \f
1262 static struct case_source *
1263 sort_sink_make_source (struct case_sink *sink) 
1264 {
1265   struct initial_run_state *irs = sink->aux;
1266
1267   return create_case_source (&sort_source_class, default_dict,
1268                              irs->xsrt->scp);
1269 }
1270
1271 const struct case_sink_class sort_sink_class = 
1272   {
1273     "SORT CASES",
1274     NULL,
1275     sort_sink_write,
1276     NULL,
1277     sort_sink_make_source,
1278   };
1279 \f
1280 struct sort_source_aux 
1281   {
1282     struct sort_cases_pgm *scp;
1283     struct ccase *dst;
1284     write_case_func *write_case;
1285     write_case_data wc_data;
1286   };
1287
1288 /* Passes C to the write_case function. */
1289 static int
1290 sort_source_read_helper (const struct ccase *src, void *aux_) 
1291 {
1292   struct sort_source_aux *aux = aux_;
1293
1294   memcpy (aux->dst, src, aux->scp->case_size);
1295   return aux->write_case (aux->wc_data);
1296 }
1297
1298 /* Reads all the records from the source stream and passes them
1299    to write_case(). */
1300 static void
1301 sort_source_read (struct case_source *source,
1302                   struct ccase *c,
1303                   write_case_func *write_case, write_case_data wc_data)
1304 {
1305   struct sort_cases_pgm *scp = source->aux;
1306   struct sort_source_aux aux;
1307
1308   aux.scp = scp;
1309   aux.dst = c;
1310   aux.write_case = write_case;
1311   aux.wc_data = wc_data;
1312   
1313   read_sort_output (scp, sort_source_read_helper, &aux);
1314 }
1315
1316 static void read_internal_sort_output (struct internal_sort *isrt,
1317                                        read_sort_output_func *, void *aux);
1318 static void read_external_sort_output (struct external_sort *xsrt,
1319                                        read_sort_output_func *, void *aux);
1320
1321 /* Reads all the records from the output stream and passes them to the
1322    function provided, which must have an interface identical to
1323    write_case(). */
1324 void
1325 read_sort_output (struct sort_cases_pgm *scp,
1326                   read_sort_output_func *output_func, void *aux)
1327 {
1328   assert ((scp->isrt != NULL) + (scp->xsrt != NULL) <= 1);
1329   if (scp->isrt != NULL)
1330     read_internal_sort_output (scp->isrt, output_func, aux);
1331   else if (scp->xsrt != NULL)
1332     read_external_sort_output (scp->xsrt, output_func, aux);
1333   else 
1334     {
1335       /* No results.  Probably an external sort that failed. */
1336     }
1337 }
1338
1339 static void
1340 read_internal_sort_output (struct internal_sort *isrt,
1341                            read_sort_output_func *output_func,
1342                            void *aux)
1343 {
1344   struct case_list **p;
1345
1346   for (p = isrt->results; *p; p++)
1347     if (!output_func (&(*p)->c, aux))
1348       break;
1349   free (isrt->results);
1350 }
1351
1352 static void
1353 read_external_sort_output (struct external_sort *xsrt,
1354                            read_sort_output_func *output_func, void *aux)
1355 {
1356   FILE *file;
1357   int file_idx;
1358   size_t i;
1359   struct ccase *c;
1360
1361   assert (xsrt->run_cnt == 1);
1362   file_idx = xsrt->initial_runs[0].file_idx;
1363
1364   file = open_temp_file (xsrt, file_idx, "rb");
1365   if (file == NULL)
1366     {
1367       err_failure ();
1368       return;
1369     }
1370
1371   c = xmalloc (xsrt->scp->case_size);
1372   for (i = 0; i < xsrt->initial_runs[0].case_cnt; i++)
1373     {
1374       if (!read_temp_file (xsrt, file_idx, file, c, xsrt->scp->case_size))
1375         {
1376           err_failure ();
1377           break;
1378         }
1379
1380       if (!output_func (c, aux))
1381         break;
1382     }
1383   free (c);
1384 }
1385
1386 static void
1387 sort_source_destroy (struct case_source *source) 
1388 {
1389   struct sort_cases_pgm *scp = source->aux;
1390   
1391   destroy_sort_cases_pgm (scp);
1392 }
1393
1394 const struct case_source_class sort_source_class =
1395   {
1396     "SORT CASES",
1397     NULL, /* FIXME */
1398     sort_source_read,
1399     sort_source_destroy,
1400   };