5f9cdb42f50e307a68980d214eabca331b0ea917
[pspp-builds.git] / src / sort.c
1 /* PSPP - computes sample statistics.
2    Copyright (C) 1997-9, 2000 Free Software Foundation, Inc.
3    Written by Ben Pfaff <blp@gnu.org>.
4
5    This program is free software; you can redistribute it and/or
6    modify it under the terms of the GNU General Public License as
7    published by the Free Software Foundation; either version 2 of the
8    License, or (at your option) any later version.
9
10    This program is distributed in the hope that it will be useful, but
11    WITHOUT ANY WARRANTY; without even the implied warranty of
12    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13    General Public License for more details.
14
15    You should have received a copy of the GNU General Public License
16    along with this program; if not, write to the Free Software
17    Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA
18    02111-1307, USA. */
19
20 #include <config.h>
21 #include "sort.h"
22 #include "error.h"
23 #include <stdio.h>
24 #include <stdlib.h>
25 #include <errno.h>
26 #include "algorithm.h"
27 #include "alloc.h"
28 #include "casefile.h"
29 #include "command.h"
30 #include "error.h"
31 #include "expr.h"
32 #include "lexer.h"
33 #include "misc.h"
34 #include "settings.h"
35 #include "str.h"
36 #include "var.h"
37 #include "vfm.h"
38 #include "vfmP.h"
39 #include "workspace.h"
40
41 #if HAVE_UNISTD_H
42 #include <unistd.h>
43 #endif
44
45 #if HAVE_SYS_TYPES_H
46 #include <sys/types.h>
47 #endif
48
49 #if HAVE_SYS_STAT_H
50 #include <sys/stat.h>
51 #endif
52
53 #include "debug-print.h"
54
55 /* Other prototypes. */
56 static int compare_record (const union value *, const union value *,
57                            const struct sort_cases_pgm *, int *idx_to_fv);
58 static int compare_cases (const struct ccase *, const struct ccase *, void *);
59 static int compare_case_dblptrs (const void *, const void *, void *);
60 static struct internal_sort *do_internal_sort (struct sort_cases_pgm *,
61                                                int separate);
62 static void destroy_internal_sort (struct internal_sort *);
63 static struct external_sort *do_external_sort (struct sort_cases_pgm *,
64                                                int separate);
65 static void destroy_external_sort (struct external_sort *);
66 struct sort_cases_pgm *parse_sort (void);
67
68 /* Performs the SORT CASES procedures. */
69 int
70 cmd_sort_cases (void)
71 {
72   struct sort_cases_pgm *scp;
73   int success;
74
75   lex_match (T_BY);
76
77   scp = parse_sort ();
78   if (scp == NULL)
79     return CMD_FAILURE;
80
81   if (temporary != 0)
82     {
83       msg (SE, _("SORT CASES may not be used after TEMPORARY.  "
84                  "Temporary transformations will be made permanent."));
85       cancel_temporary (); 
86     }
87
88   success = sort_cases (scp, 0);
89   destroy_sort_cases_pgm (scp);
90   if (success)
91     return lex_end_of_command ();
92   else
93     return CMD_FAILURE;
94 }
95
96 /* Parses a list of sort keys and returns a struct sort_cases_pgm
97    based on it.  Returns a null pointer on error. */
98 struct sort_cases_pgm *
99 parse_sort (void)
100 {
101   struct sort_cases_pgm *scp;
102
103   scp = xmalloc (sizeof *scp);
104   scp->ref_cnt = 1;
105   scp->vars = NULL;
106   scp->dirs = NULL;
107   scp->var_cnt = 0;
108   scp->isrt = NULL;
109   scp->xsrt = NULL;
110
111   do
112     {
113       int prev_var_cnt = scp->var_cnt;
114       enum sort_direction direction = SRT_ASCEND;
115
116       /* Variables. */
117       if (!parse_variables (default_dict, &scp->vars, &scp->var_cnt,
118                             PV_NO_DUPLICATE | PV_APPEND | PV_NO_SCRATCH))
119         goto error;
120
121       /* Sort direction. */
122       if (lex_match ('('))
123         {
124           if (lex_match_id ("D") || lex_match_id ("DOWN"))
125             direction = SRT_DESCEND;
126           else if (!lex_match_id ("A") && !lex_match_id ("UP"))
127             {
128               msg (SE, _("`A' or `D' expected inside parentheses."));
129               goto error;
130             }
131           if (!lex_match (')'))
132             {
133               msg (SE, _("`)' expected."));
134               goto error;
135             }
136         }
137       scp->dirs = xrealloc (scp->dirs, sizeof *scp->dirs * scp->var_cnt);
138       for (; prev_var_cnt < scp->var_cnt; prev_var_cnt++)
139         scp->dirs[prev_var_cnt] = direction;
140     }
141   while (token != '.' && token != '/');
142   
143   return scp;
144
145  error:
146   destroy_sort_cases_pgm (scp);
147   return NULL;
148 }
149
150 /* Destroys a SORT CASES program. */
151 void
152 destroy_sort_cases_pgm (struct sort_cases_pgm *scp) 
153 {
154   if (scp != NULL) 
155     {
156       assert (scp->ref_cnt > 0);
157       if (--scp->ref_cnt > 0)
158         return;
159
160       free (scp->vars);
161       free (scp->dirs);
162       destroy_internal_sort (scp->isrt);
163       destroy_external_sort (scp->xsrt);
164       free (scp);
165     }
166 }
167
168 /* Sorts the active file based on the key variables specified in
169    global variables vars and var_cnt.
170
171    If SEPARATE is zero, then output goes to the active file.  The
172    output cases can be read through the usual VFM interfaces.
173
174    If SEPARATE is nonzero, then output goes to a separate file.
175    The output cases can be read with a call to
176    read_sort_output().
177
178    The caller is responsible for freeing SCP. */
179 int
180 sort_cases (struct sort_cases_pgm *scp, int separate)
181 {
182   scp->case_size
183     = sizeof (union value) * dict_get_compacted_value_cnt (default_dict);
184
185   /* Not sure this is necessary but it's good to be safe. */
186   if (separate && case_source_is_class (vfm_source, &sort_source_class))
187     procedure (NULL, NULL);
188   
189   /* SORT CASES cancels PROCESS IF. */
190   expr_free (process_if_expr);
191   process_if_expr = NULL;
192
193   /* Try an internal sort first. */
194   scp->isrt = do_internal_sort (scp, separate);
195   if (scp->isrt != NULL) 
196     return 1; 
197
198   /* Fall back to an external sort. */
199   scp->xsrt = do_external_sort (scp, separate);
200   if (scp->xsrt != NULL) 
201     return 1;
202
203   destroy_sort_cases_pgm (scp);
204   return 0;
205 }
206 \f
207 /* Results of an internal sort.
208    Used only for sorting to a separate file. */
209 struct internal_sort 
210   {
211     const struct ccase **cases;
212     size_t case_cnt;
213   };
214
215 /* If the data is in memory, do an internal sort.  Return
216    success. */
217 static struct internal_sort *
218 do_internal_sort (struct sort_cases_pgm *scp, int separate)
219 {
220   struct internal_sort *isrt;
221
222   isrt = xmalloc (sizeof *isrt);
223   isrt->cases = NULL;
224   isrt->case_cnt = 0;
225
226   if (case_source_is_class (vfm_source, &storage_source_class)) 
227     {
228       struct casefile *casefile = storage_source_get_casefile (vfm_source);
229
230       if (!separate)
231         {
232           if (!casefile_sort (casefile, compare_cases, scp))
233             goto error;
234         }
235       else 
236         {
237           /* FIXME FIXME FIXME.
238              This is crap because the casefile could get flushed
239              to disk between the time we sort it and we use it
240              later, causing invalid pointer accesses.
241              The right solution is probably to extend casefiles
242              to support duplication. */
243           struct casereader *reader;
244           size_t case_idx;
245
246           if (!casefile_in_core (casefile))
247             goto error;
248           
249           isrt->case_cnt = casefile_get_case_cnt (casefile);
250           isrt->cases = workspace_malloc (sizeof *isrt->cases
251                                           * isrt->case_cnt);
252           if (isrt->cases == NULL)
253             goto error;
254
255           reader = casefile_get_reader (casefile);
256           for (case_idx = 0; case_idx < isrt->case_cnt; case_idx++) 
257             {
258               casereader_read (reader, &isrt->cases[case_idx]);
259               assert (isrt->cases[case_idx] != NULL);
260             }
261           casereader_destroy (reader);
262
263           sort (isrt->cases, isrt->case_cnt, casefile_get_case_size (casefile),
264                 compare_case_dblptrs, scp);
265         }
266
267       return isrt;
268     }
269   
270  error:
271   free (isrt);
272   return NULL;
273 }
274
275 /* Destroys an internal sort result. */
276 static void
277 destroy_internal_sort (struct internal_sort *isrt) 
278 {
279   if (isrt != NULL) 
280     {
281       workspace_free (isrt->cases, sizeof *isrt->cases * isrt->case_cnt);
282       free (isrt);
283     }
284 }
285
286 /* Compares the variables specified by SCP between the cases at A
287    and B, and returns a strcmp()-type result. */
288 static int
289 compare_cases (const struct ccase *a, const struct ccase *b,
290                void *scp_)
291 {
292   struct sort_cases_pgm *scp = scp_;
293
294   return compare_record (a->data, b->data, scp, NULL);
295 }
296
297 /* Compares the variables specified by SCP between the cases at A
298    and B, and returns a strcmp()-type result. */
299 static int
300 compare_case_dblptrs (const void *a_, const void *b_, void *scp_)
301 {
302   struct sort_cases_pgm *scp = scp_;
303   struct ccase *const *pa = a_;
304   struct ccase *const *pb = b_;
305   struct ccase *a = *pa;
306   struct ccase *b = *pb;
307
308   return compare_record (a->data, b->data, scp, NULL);
309 }
310 \f
311 /* External sort. */
312
313 /* Maximum order of merge.  If you increase this, then you should
314    use a heap for comparing cases during merge.  */
315 #define MAX_MERGE_ORDER         7
316
317 /* Minimum total number of records for buffers. */
318 #define MIN_BUFFER_TOTAL_SIZE_RECS      64
319
320 /* Minimum single input buffer size, in bytes and records. */
321 #define MIN_BUFFER_SIZE_BYTES   4096
322 #define MIN_BUFFER_SIZE_RECS    16
323
324 #if MIN_BUFFER_SIZE_RECS * 2 + 16 > MIN_BUFFER_TOTAL_SIZE_RECS
325 #error MIN_BUFFER_SIZE_RECS and MIN_BUFFER_TOTAL_SIZE_RECS do not make sense.
326 #endif
327
328 /* An initial run and its length. */
329 struct initial_run 
330   {
331     int file_idx;                     /* File index. */
332     size_t case_cnt;                  /* Number of cases. */
333   };
334
335 /* Sorts initial runs A and B in decending order by length. */
336 static int
337 compare_initial_runs (const void *a_, const void *b_, void *aux UNUSED) 
338 {
339   const struct initial_run *a = a_;
340   const struct initial_run *b = b_;
341   
342   return a->case_cnt > b->case_cnt ? -1 : a->case_cnt <b->case_cnt;
343 }
344
345 /* Results of an external sort. */
346 struct external_sort 
347   {
348     struct sort_cases_pgm *scp;       /* SORT CASES info. */
349     struct initial_run *initial_runs; /* Array of initial runs. */
350     size_t run_cnt, run_cap;          /* Number of runs, allocated capacity. */
351     char *temp_dir;                   /* Temporary file directory name. */
352     char *temp_name;                  /* Name of a temporary file. */
353     int next_file_idx;                /* Lowest unused file index. */
354   };
355
356 /* Prototypes for helper functions. */
357 static void sort_sink_write (struct case_sink *, const struct ccase *);
358 static int write_initial_runs (struct external_sort *, int separate);
359 static int init_external_sort (struct external_sort *);
360 static int merge (struct external_sort *);
361 static void rmdir_temp_dir (struct external_sort *);
362 static void remove_temp_file (struct external_sort *xsrt, int file_idx);
363
364 /* Performs an external sort of the active file according to the
365    specification in SCP.  Forms initial runs using a heap as a
366    reservoir.  Determines the optimum merge pattern via Huffman's
367    method (see Knuth vol. 3, 2nd edition, p. 365-366), and merges
368    according to that pattern. */
369 static struct external_sort *
370 do_external_sort (struct sort_cases_pgm *scp, int separate)
371 {
372   struct external_sort *xsrt;
373   int success = 0;
374
375   if (vfm_source != NULL
376       && case_source_is_class (vfm_source, &storage_source_class)) 
377     casefile_to_disk (storage_source_get_casefile (vfm_source));
378
379   xsrt = xmalloc (sizeof *xsrt);
380   xsrt->scp = scp;
381   if (!init_external_sort (xsrt))
382     goto done;
383   if (!write_initial_runs (xsrt, separate))
384     goto done;
385   if (!merge (xsrt))
386     goto done;
387
388   success = 1;
389
390  done:
391   if (success)
392     {
393       /* Don't destroy anything because we'll need it for reading
394          the output. */
395       return xsrt;
396     }
397   else
398     {
399       destroy_external_sort (xsrt);
400       return NULL;
401     }
402 }
403
404 /* Destroys XSRT. */
405 static void
406 destroy_external_sort (struct external_sort *xsrt) 
407 {
408   if (xsrt != NULL) 
409     {
410       int i;
411       
412       for (i = 0; i < xsrt->run_cnt; i++)
413         remove_temp_file (xsrt, xsrt->initial_runs[i].file_idx);
414       rmdir_temp_dir (xsrt);
415       free (xsrt->temp_dir);
416       free (xsrt->temp_name);
417       free (xsrt->initial_runs);
418       free (xsrt);
419     }
420 }
421
422 #ifdef HAVE_MKDTEMP
423 /* Creates and returns the name of a temporary directory. */
424 static char *
425 make_temp_dir (void) 
426 {
427   const char *parent_dir;
428   char *temp_dir;
429
430   if (getenv ("TMPDIR") != NULL)
431     parent_dir = getenv ("TMPDIR");
432   else
433     parent_dir = P_tmpdir;
434
435   temp_dir = xmalloc (strlen (parent_dir) + 32);
436   sprintf (temp_dir, "%s%cpsppXXXXXX", parent_dir, DIR_SEPARATOR);
437   if (mkdtemp (temp_dir) == NULL) 
438     {
439       msg (SE, _("%s: Creating temporary directory: %s."),
440            temp_dir, strerror (errno));
441       free (temp_dir);
442       return NULL;
443     }
444   else
445     return temp_dir;
446 }
447 #else /* !HAVE_MKDTEMP */
448 /* Creates directory DIR. */
449 static int
450 do_mkdir (const char *dir) 
451 {
452 #ifndef __MSDOS__
453   return mkdir (dir, S_IRWXU);
454 #else
455   return mkdir (dir);
456 #endif
457 }
458
459 /* Creates and returns the name of a temporary directory. */
460 static char *
461 make_temp_dir (void) 
462 {
463   int i;
464   
465   for (i = 0; i < 100; i++)
466     {
467       char temp_dir[L_tmpnam + 1];
468       if (tmpnam (temp_dir) == NULL) 
469         {
470           msg (SE, _("Generating temporary directory name failed: %s."),
471                strerror (errno));
472           return NULL; 
473         }
474       else if (do_mkdir (temp_dir) == 0)
475         return xstrdup (temp_dir);
476     }
477   
478   msg (SE, _("Creating temporary directory failed: %s."), strerror (errno));
479   return NULL;
480 }
481 #endif /* !HAVE_MKDTEMP */
482
483 /* Sets up to open temporary files. */
484 static int
485 init_external_sort (struct external_sort *xsrt)
486 {
487   /* Zero. */
488   xsrt->temp_dir = NULL;
489   xsrt->next_file_idx = 0;
490
491   /* Huffman queue. */
492   xsrt->run_cap = 512;
493   xsrt->run_cnt = 0;
494   xsrt->initial_runs = xmalloc (sizeof *xsrt->initial_runs * xsrt->run_cap);
495
496   /* Temporary directory. */
497   xsrt->temp_dir = make_temp_dir ();
498   xsrt->temp_name = NULL;
499   if (xsrt->temp_dir == NULL)
500     return 0;
501   xsrt->temp_name = xmalloc (strlen (xsrt->temp_dir) + 64);
502
503   return 1;
504 }
505
506 /* Returns nonzero if we should return an error even though the
507    operation succeeded.  Useful for testing. */
508 static int
509 simulate_error (void) 
510 {
511   static int op_err_cnt = -1;
512   static int op_cnt;
513   
514   if (op_err_cnt == -1 || op_cnt++ < op_err_cnt)
515     return 0;
516   else
517     {
518       errno = 0;
519       return 1;
520     }
521 }
522
523 /* Removes the directory created for temporary files, if one
524    exists. */
525 static void
526 rmdir_temp_dir (struct external_sort *xsrt)
527 {
528   if (xsrt->temp_dir != NULL && rmdir (xsrt->temp_dir) == -1) 
529     {
530       msg (SW, _("%s: Error removing directory for temporary files: %s."),
531            xsrt->temp_dir, strerror (errno));
532       xsrt->temp_dir = NULL; 
533     }
534 }
535
536 /* Returns the name of temporary file number FILE_IDX for XSRT.
537    The name is written into a static buffer, so be careful.  */
538 static char *
539 get_temp_file_name (struct external_sort *xsrt, int file_idx)
540 {
541   assert (xsrt->temp_dir != NULL);
542   sprintf (xsrt->temp_name, "%s%c%04d",
543            xsrt->temp_dir, DIR_SEPARATOR, file_idx);
544   return xsrt->temp_name;
545 }
546
547 /* Opens temporary file numbered FILE_IDX for XSRT with mode MODE
548    and returns the FILE *. */
549 static FILE *
550 open_temp_file (struct external_sort *xsrt, int file_idx, const char *mode)
551 {
552   char *temp_file;
553   FILE *file;
554
555   temp_file = get_temp_file_name (xsrt, file_idx);
556
557   file = fopen (temp_file, mode);
558   if (simulate_error () || file == NULL) 
559     msg (SE, _("%s: Error opening temporary file for %s: %s."),
560          temp_file, mode[0] == 'r' ? "reading" : "writing",
561          strerror (errno));
562
563   return file;
564 }
565
566 /* Closes FILE, which is the temporary file numbered FILE_IDX
567    under XSRT.  Returns nonzero only if successful.  */
568 static int
569 close_temp_file (struct external_sort *xsrt, int file_idx, FILE *file)
570 {
571   if (file != NULL) 
572     {
573       char *temp_file = get_temp_file_name (xsrt, file_idx);
574       if (simulate_error () || fclose (file) == EOF) 
575         {
576           msg (SE, _("%s: Error closing temporary file: %s."),
577                temp_file, strerror (errno));
578           return 0;
579         }
580     }
581   return 1;
582 }
583
584 /* Delete temporary file numbered FILE_IDX for XSRT. */
585 static void
586 remove_temp_file (struct external_sort *xsrt, int file_idx) 
587 {
588   if (file_idx != -1)
589     {
590       char *temp_file = get_temp_file_name (xsrt, file_idx);
591       if (simulate_error () || remove (temp_file) != 0)
592         msg (SW, _("%s: Error removing temporary file: %s."),
593              temp_file, strerror (errno));
594     }
595 }
596
597 /* Writes SIZE bytes from buffer DATA into FILE, which is
598    temporary file numbered FILE_IDX from XSRT. */
599 static int
600 write_temp_file (struct external_sort *xsrt, int file_idx,
601                  FILE *file, const void *data, size_t size) 
602 {
603   if (!simulate_error () && fwrite (data, size, 1, file) == 1)
604     return 1;
605   else
606     {
607       char *temp_file = get_temp_file_name (xsrt, file_idx);
608       msg (SE, _("%s: Error writing temporary file: %s."),
609            temp_file, strerror (errno));
610       return 0;
611     }
612 }
613
614 /* Reads SIZE bytes into buffer DATA into FILE, which is
615    temporary file numbered FILE_IDX from XSRT. */
616 static int
617 read_temp_file (struct external_sort *xsrt, int file_idx,
618                 FILE *file, void *data, size_t size) 
619 {
620   if (!simulate_error () && fread (data, size, 1, file) == 1)
621     return 1;
622   else 
623     {
624       char *temp_file = get_temp_file_name (xsrt, file_idx);
625       if (ferror (file))
626         msg (SE, _("%s: Error reading temporary file: %s."),
627              temp_file, strerror (errno));
628       else
629         msg (SE, _("%s: Unexpected end of temporary file."),
630              temp_file);
631       return 0;
632     }
633 }
634 \f
635 /* Replacement selection. */
636
637 /* Pairs a record with a run number. */
638 struct record_run
639   {
640     int run;                    /* Run number of case. */
641     struct case_list *record;   /* Case data. */
642   };
643
644 /* Represents a set of initial runs during an external sort. */
645 struct initial_run_state 
646   {
647     struct external_sort *xsrt;
648
649     int *idx_to_fv;             /* Translation table copied from sink. */
650
651     /* Reservoir. */
652     struct record_run *records; /* Records arranged as a heap. */
653     size_t record_cnt;          /* Current number of records. */
654     size_t record_cap;          /* Capacity for records. */
655     struct case_list *free_list;/* Cases not in heap. */
656     
657     /* Run currently being output. */
658     int file_idx;               /* Temporary file number. */
659     size_t case_cnt;            /* Number of cases so far. */
660     FILE *output_file;          /* Output file. */
661     struct case_list *last_output;/* Record last output. */
662
663     int okay;                   /* Zero if an error has been encountered. */
664   };
665
666 static const struct case_sink_class sort_sink_class;
667
668 static void destroy_initial_run_state (struct initial_run_state *irs);
669 static int allocate_cases (struct initial_run_state *);
670 static struct case_list *grab_case (struct initial_run_state *);
671 static void release_case (struct initial_run_state *, struct case_list *);
672 static void output_record (struct initial_run_state *irs);
673 static void start_run (struct initial_run_state *irs);
674 static void end_run (struct initial_run_state *irs);
675 static int compare_record_run (const struct record_run *,
676                                const struct record_run *,
677                                struct initial_run_state *);
678 static int compare_record_run_minheap (const void *, const void *, void *);
679
680 /* Writes initial runs for XSRT, sending them to a separate file
681    if SEPARATE is nonzero. */
682 static int
683 write_initial_runs (struct external_sort *xsrt, int separate)
684 {
685   struct initial_run_state *irs;
686   int success = 0;
687
688   /* Allocate memory for cases. */
689   irs = xmalloc (sizeof *irs);
690   irs->xsrt = xsrt;
691   irs->records = NULL;
692   irs->record_cnt = irs->record_cap = 0;
693   irs->free_list = NULL;
694   irs->output_file = NULL;
695   irs->last_output = NULL;
696   irs->file_idx = 0;
697   irs->case_cnt = 0;
698   irs->okay = 1;
699   if (!allocate_cases (irs)) 
700     goto done;
701
702   /* Create case sink. */
703   if (!separate)
704     {
705       if (vfm_sink != NULL && vfm_sink->class->destroy != NULL)
706         vfm_sink->class->destroy (vfm_sink);
707       vfm_sink = create_case_sink (&sort_sink_class, default_dict, irs);
708       xsrt->scp->ref_cnt++;
709     }
710
711   /* Create initial runs. */
712   start_run (irs);
713   procedure (NULL, NULL);
714   irs->idx_to_fv = NULL;
715   while (irs->record_cnt > 0 && irs->okay)
716     output_record (irs);
717   end_run (irs);
718
719   success = irs->okay;
720
721  done:
722   destroy_initial_run_state (irs);
723
724   return success;
725 }
726
727 /* Add a single case to an initial run. */
728 static void
729 sort_sink_write (struct case_sink *sink, const struct ccase *c)
730 {
731   struct initial_run_state *irs = sink->aux;
732   struct record_run *new_record_run;
733
734   if (!irs->okay)
735     return;
736
737   irs->idx_to_fv = sink->idx_to_fv;
738
739   /* Compose record_run for this run and add to heap. */
740   assert (irs->record_cnt < irs->record_cap);
741   new_record_run = irs->records + irs->record_cnt++;
742   new_record_run->record = grab_case (irs);
743   memcpy (new_record_run->record->c.data, c->data, irs->xsrt->scp->case_size);
744   new_record_run->run = irs->file_idx;
745   if (irs->last_output != NULL
746       && compare_record (c->data, irs->last_output->c.data,
747                          irs->xsrt->scp, sink->idx_to_fv) < 0)
748     new_record_run->run = irs->xsrt->next_file_idx;
749   push_heap (irs->records, irs->record_cnt, sizeof *irs->records,
750              compare_record_run_minheap, irs);
751
752   /* Output a record if the reservoir is full. */
753   if (irs->record_cnt == irs->record_cap && irs->okay)
754     output_record (irs);
755 }
756
757 /* Destroys the initial run state represented by IRS. */
758 static void
759 destroy_initial_run_state (struct initial_run_state *irs) 
760 {
761   struct case_list *iter, *next;
762   int i;
763
764   if (irs == NULL)
765     return;
766
767   /* Release cases to free list. */
768   for (i = 0; i < irs->record_cnt; i++)
769     release_case (irs, irs->records[i].record);
770   if (irs->last_output != NULL)
771     release_case (irs, irs->last_output);
772
773   /* Free cases in free list. */
774   for (iter = irs->free_list; iter != NULL; iter = next) 
775     {
776       next = iter->next;
777       free (iter);
778     }
779
780   free (irs->records);
781   if (irs->output_file != NULL)
782     close_temp_file (irs->xsrt, irs->file_idx, irs->output_file);
783
784   free (irs);
785 }
786
787 /* Allocates room for lots of cases as a buffer. */
788 static int
789 allocate_cases (struct initial_run_state *irs)
790 {
791   int approx_case_cost; /* Approximate memory cost of one case in bytes. */
792   int max_cases;        /* Maximum number of cases to allocate. */
793   int i;
794
795   /* Allocate as many cases as we can within the workspace
796      limit. */
797   approx_case_cost = (sizeof *irs->records
798                       + sizeof *irs->free_list
799                       + irs->xsrt->scp->case_size
800                       + 4 * sizeof (void *));
801   max_cases = get_max_workspace() / approx_case_cost;
802   irs->records = malloc (sizeof *irs->records * max_cases);
803   for (i = 0; i < max_cases; i++)
804     {
805       struct case_list *c;
806       c = malloc (sizeof *c
807                   + irs->xsrt->scp->case_size
808                   - sizeof (union value));
809       if (c == NULL) 
810         {
811           max_cases = i;
812           break;
813         }
814       release_case (irs, c);
815     }
816
817   /* irs->records gets all but one of the allocated cases.
818      The extra is used for last_output. */
819   irs->record_cap = max_cases - 1;
820
821   /* Fail if we didn't allocate an acceptable number of cases. */
822   if (irs->records == NULL || max_cases < MIN_BUFFER_TOTAL_SIZE_RECS)
823     {
824       msg (SE, _("Out of memory.  Could not allocate room for minimum of %d "
825                  "cases of %d bytes each.  (PSPP workspace is currently "
826                  "restricted to a maximum of %d KB.)"),
827            MIN_BUFFER_TOTAL_SIZE_RECS, approx_case_cost, get_max_workspace() / 1024);
828       return 0;
829     }
830   return 1;
831 }
832
833 /* Compares the VAR_CNT variables in VARS[] between the `value's at
834    A and B, and returns a strcmp()-type result. */
835 static int
836 compare_record (const union value *a, const union value *b,
837                 const struct sort_cases_pgm *scp,
838                 int *idx_to_fv)
839 {
840   int i;
841
842   assert (a != NULL);
843   assert (b != NULL);
844   
845   for (i = 0; i < scp->var_cnt; i++)
846     {
847       struct variable *v = scp->vars[i];
848       int fv;
849       int result;
850
851       if (idx_to_fv != NULL)
852         fv = idx_to_fv[v->index];
853       else
854         fv = v->fv;
855       
856       if (v->type == NUMERIC)
857         {
858           double af = a[fv].f;
859           double bf = b[fv].f;
860           
861           result = af < bf ? -1 : af > bf;
862         }
863       else
864         result = memcmp (a[fv].s, b[fv].s, v->width);
865
866       if (result != 0) 
867         {
868           if (scp->dirs[i] == SRT_DESCEND)
869             result = -result;
870           return result;
871         }
872     }
873
874   return 0;
875 }
876
877 /* Compares record-run tuples A and B on run number first, then
878    on the current record according to SCP. */
879 static int
880 compare_record_run (const struct record_run *a,
881                     const struct record_run *b,
882                     struct initial_run_state *irs)
883 {
884   if (a->run != b->run)
885     return a->run > b->run ? 1 : -1;
886   else
887     return compare_record (a->record->c.data, b->record->c.data,
888                            irs->xsrt->scp, irs->idx_to_fv);
889 }
890
891 /* Compares record-run tuples A and B on run number first, then
892    on the current record according to SCP, but in descending
893    order. */
894 static int
895 compare_record_run_minheap (const void *a, const void *b, void *irs) 
896 {
897   return -compare_record_run (a, b, irs);
898 }
899
900 /* Begins a new initial run, specifically its output file. */
901 static void
902 start_run (struct initial_run_state *irs)
903 {
904   irs->file_idx = irs->xsrt->next_file_idx++;
905   irs->case_cnt = 0;
906   irs->output_file = open_temp_file (irs->xsrt, irs->file_idx, "wb");
907   if (irs->output_file == NULL) 
908     irs->okay = 0;
909   if (irs->last_output != NULL) 
910     {
911       release_case (irs, irs->last_output);
912       irs->last_output = NULL; 
913     }
914 }
915
916 /* Ends the current initial run.  */
917 static void
918 end_run (struct initial_run_state *irs)
919 {
920   struct external_sort *xsrt = irs->xsrt;
921   
922   /* Record initial run. */
923   if (xsrt->run_cnt >= xsrt->run_cap) 
924     {
925       xsrt->run_cap *= 2;
926       xsrt->initial_runs
927         = xrealloc (xsrt->initial_runs,
928                     sizeof *xsrt->initial_runs * xsrt->run_cap);
929     }
930   xsrt->initial_runs[xsrt->run_cnt].file_idx = irs->file_idx;
931   xsrt->initial_runs[xsrt->run_cnt].case_cnt = irs->case_cnt;
932   xsrt->run_cnt++;
933
934   /* Close file handle. */
935   if (irs->output_file != NULL
936       && !close_temp_file (irs->xsrt, irs->file_idx, irs->output_file)) 
937     irs->okay = 0;
938   irs->output_file = NULL;
939 }
940
941 /* Writes a record to the current initial run. */
942 static void
943 output_record (struct initial_run_state *irs)
944 {
945   struct record_run *record_run;
946   
947   /* Extract minimum case from heap. */
948   assert (irs->record_cnt > 0);
949   pop_heap (irs->records, irs->record_cnt--, sizeof *irs->records,
950             compare_record_run_minheap, irs);
951   record_run = irs->records + irs->record_cnt;
952
953   /* Bail if an error has occurred. */
954   if (!irs->okay)
955     return;
956
957   /* Start new run if necessary. */
958   assert (record_run->run == irs->file_idx
959           || record_run->run == irs->xsrt->next_file_idx);
960   if (record_run->run != irs->file_idx)
961     {
962       end_run (irs);
963       start_run (irs);
964     }
965   assert (record_run->run == irs->file_idx);
966   irs->case_cnt++;
967
968   /* Write to disk. */
969   if (irs->output_file != NULL
970       && !write_temp_file (irs->xsrt, irs->file_idx, irs->output_file,
971                            &record_run->record->c, irs->xsrt->scp->case_size))
972     irs->okay = 0;
973
974   /* This record becomes last_output. */
975   if (irs->last_output != NULL)
976     release_case (irs, irs->last_output);
977   irs->last_output = record_run->record;
978 }
979
980 /* Gets a case from the free list in IRS.  It is an error to call
981    this function if the free list is empty. */
982 static struct case_list *
983 grab_case (struct initial_run_state *irs)
984 {
985   struct case_list *c;
986   
987   assert (irs != NULL);
988   assert (irs->free_list != NULL);
989
990   c = irs->free_list;
991   irs->free_list = c->next;
992   return c;
993 }
994
995 /* Returns C to the free list in IRS. */
996 static void 
997 release_case (struct initial_run_state *irs, struct case_list *c) 
998 {
999   assert (irs != NULL);
1000   assert (c != NULL);
1001
1002   c->next = irs->free_list;
1003   irs->free_list = c;
1004 }
1005 \f
1006 /* Merging. */
1007
1008 /* State of merging initial runs. */
1009 struct merge_state 
1010   {
1011     struct external_sort *xsrt; /* External sort state. */
1012     struct ccase **cases;       /* Buffers. */
1013     size_t case_cnt;            /* Number of buffers. */
1014   };
1015
1016 struct run;
1017 static int merge_once (struct merge_state *,
1018                        const struct initial_run[], size_t,
1019                        struct initial_run *);
1020 static int fill_run_buffer (struct merge_state *, struct run *);
1021 static int mod (int, int);
1022
1023 /* Performs a series of P-way merges of initial runs
1024    method. */
1025 static int
1026 merge (struct external_sort *xsrt)
1027 {
1028   struct merge_state mrg;       /* State of merge. */
1029   size_t approx_case_cost;      /* Approximate memory cost of one case. */
1030   int max_order;                /* Maximum order of merge. */
1031   size_t dummy_run_cnt;         /* Number of dummy runs to insert. */
1032   int success = 0;
1033   int i;
1034
1035   mrg.xsrt = xsrt;
1036
1037   /* Allocate as many cases as possible into cases. */
1038   approx_case_cost = (sizeof *mrg.cases
1039                       + xsrt->scp->case_size + 4 * sizeof (void *));
1040   mrg.case_cnt = get_max_workspace() / approx_case_cost;
1041   mrg.cases = malloc (sizeof *mrg.cases * mrg.case_cnt);
1042   if (mrg.cases == NULL)
1043     goto done;
1044   for (i = 0; i < mrg.case_cnt; i++) 
1045     {
1046       mrg.cases[i] = malloc (xsrt->scp->case_size);
1047       if (mrg.cases[i] == NULL) 
1048         {
1049           mrg.case_cnt = i;
1050           break;
1051         }
1052     }
1053   if (mrg.case_cnt < MIN_BUFFER_TOTAL_SIZE_RECS)
1054     {
1055       msg (SE, _("Out of memory.  Could not allocate room for minimum of %d "
1056                  "cases of %d bytes each.  (PSPP workspace is currently "
1057                  "restricted to a maximum of %d KB.)"),
1058            MIN_BUFFER_TOTAL_SIZE_RECS, approx_case_cost, get_max_workspace() / 1024);
1059       return 0;
1060     }
1061
1062   /* Determine maximum order of merge. */
1063   max_order = MAX_MERGE_ORDER;
1064   if (mrg.case_cnt / max_order < MIN_BUFFER_SIZE_RECS)
1065     max_order = mrg.case_cnt / MIN_BUFFER_SIZE_RECS;
1066   else if (mrg.case_cnt / max_order * xsrt->scp->case_size
1067            < MIN_BUFFER_SIZE_BYTES)
1068     max_order = mrg.case_cnt / (MIN_BUFFER_SIZE_BYTES / xsrt->scp->case_size);
1069   if (max_order < 2)
1070     max_order = 2;
1071   if (max_order > xsrt->run_cnt)
1072     max_order = xsrt->run_cnt;
1073
1074   /* Repeatedly merge the P shortest existing runs until only one run
1075      is left. */
1076   make_heap (xsrt->initial_runs, xsrt->run_cnt, sizeof *xsrt->initial_runs,
1077              compare_initial_runs, NULL);
1078   dummy_run_cnt = mod (1 - (int) xsrt->run_cnt, max_order - 1);
1079   assert (max_order == 1
1080           || (xsrt->run_cnt + dummy_run_cnt) % (max_order - 1) == 1);
1081   while (xsrt->run_cnt > 1)
1082     {
1083       struct initial_run output_run;
1084       int order;
1085       int i;
1086
1087       /* Choose order of merge (max_order after first merge). */
1088       order = max_order - dummy_run_cnt;
1089       dummy_run_cnt = 0;
1090
1091       /* Choose runs to merge. */
1092       assert (xsrt->run_cnt >= order);
1093       for (i = 0; i < order; i++) 
1094         pop_heap (xsrt->initial_runs, xsrt->run_cnt--,
1095                   sizeof *xsrt->initial_runs,
1096                   compare_initial_runs, NULL); 
1097           
1098       /* Merge runs. */
1099       if (!merge_once (&mrg, xsrt->initial_runs + xsrt->run_cnt, order,
1100                        &output_run))
1101         goto done;
1102
1103       /* Add output run to heap. */
1104       xsrt->initial_runs[xsrt->run_cnt++] = output_run;
1105       push_heap (xsrt->initial_runs, xsrt->run_cnt, sizeof *xsrt->initial_runs,
1106                  compare_initial_runs, NULL);
1107     }
1108
1109   /* Exactly one run is left, which contains the entire sorted
1110      file.  We could use it to find a total case count. */
1111   assert (xsrt->run_cnt == 1);
1112
1113   success = 1;
1114
1115  done:
1116   for (i = 0; i < mrg.case_cnt; i++)
1117     free (mrg.cases[i]);
1118   free (mrg.cases);
1119
1120   return success;
1121 }
1122
1123 /* Modulo function as defined by Knuth. */
1124 static int
1125 mod (int x, int y)
1126 {
1127   if (y == 0)
1128     return x;
1129   else if (x == 0)
1130     return 0;
1131   else if (x > 0 && y > 0)
1132     return x % y;
1133   else if (x < 0 && y > 0)
1134     return y - (-x) % y;
1135
1136   assert (0);
1137   abort ();
1138 }
1139
1140 /* A run of data for use in merging. */
1141 struct run 
1142   {
1143     FILE *file;                 /* File that contains run. */
1144     int file_idx;               /* Index of file that contains run. */
1145     struct ccase **buffer;      /* Case buffer. */
1146     struct ccase **buffer_head; /* First unconsumed case in buffer. */
1147     struct ccase **buffer_tail; /* One past last unconsumed case in buffer. */
1148     size_t buffer_cap;          /* Number of cases buffer can hold. */
1149     size_t unread_case_cnt;     /* Number of cases not yet read. */
1150   };
1151
1152 /* Merges the RUN_CNT initial runs specified in INPUT_RUNS into a
1153    new run.  Returns nonzero only if successful.  Adds an entry
1154    to MRG->xsrt->runs for the output file if and only if the
1155    output file is actually created.  Always deletes all the input
1156    files. */
1157 static int
1158 merge_once (struct merge_state *mrg,
1159             const struct initial_run input_runs[],
1160             size_t run_cnt,
1161             struct initial_run *output_run)
1162 {
1163   struct run runs[MAX_MERGE_ORDER];
1164   FILE *output_file = NULL;
1165   int success = 0;
1166   int i;
1167
1168   /* Initialize runs[]. */
1169   for (i = 0; i < run_cnt; i++) 
1170     {
1171       runs[i].file = NULL;
1172       runs[i].file_idx = input_runs[i].file_idx;
1173       runs[i].buffer = mrg->cases + mrg->case_cnt / run_cnt * i;
1174       runs[i].buffer_head = runs[i].buffer;
1175       runs[i].buffer_tail = runs[i].buffer;
1176       runs[i].buffer_cap = mrg->case_cnt / run_cnt;
1177       runs[i].unread_case_cnt = input_runs[i].case_cnt;
1178     }
1179
1180   /* Open input files. */
1181   for (i = 0; i < run_cnt; i++) 
1182     {
1183       runs[i].file = open_temp_file (mrg->xsrt, runs[i].file_idx, "rb");
1184       if (runs[i].file == NULL)
1185         goto error;
1186     }
1187   
1188   /* Create output file and count cases to be output. */
1189   output_run->file_idx = mrg->xsrt->next_file_idx++;
1190   output_run->case_cnt = 0;
1191   for (i = 0; i < run_cnt; i++)
1192     output_run->case_cnt += input_runs[i].case_cnt;
1193   output_file = open_temp_file (mrg->xsrt, output_run->file_idx, "wb");
1194   if (output_file == NULL) 
1195     goto error;
1196
1197   /* Prime buffers. */
1198   for (i = 0; i < run_cnt; i++)
1199     if (!fill_run_buffer (mrg, runs + i))
1200       goto error;
1201
1202   /* Merge. */
1203   while (run_cnt > 0) 
1204     {
1205       struct run *min_run;
1206
1207       /* Find minimum. */
1208       min_run = runs;
1209       for (i = 1; i < run_cnt; i++)
1210         if (compare_record ((*runs[i].buffer_head)->data,
1211                             (*min_run->buffer_head)->data,
1212                             mrg->xsrt->scp, NULL) < 0)
1213           min_run = runs + i;
1214
1215       /* Write minimum to output file. */
1216       if (!write_temp_file (mrg->xsrt, min_run->file_idx, output_file,
1217                             (*min_run->buffer_head)->data,
1218                             mrg->xsrt->scp->case_size))
1219         goto error;
1220
1221       /* Remove case from buffer. */
1222       if (++min_run->buffer_head >= min_run->buffer_tail)
1223         {
1224           /* Buffer is empty.  Fill from file. */
1225           if (!fill_run_buffer (mrg, min_run))
1226             goto error;
1227
1228           /* If buffer is still empty, delete its run. */
1229           if (min_run->buffer_head >= min_run->buffer_tail)
1230             {
1231               close_temp_file (mrg->xsrt, min_run->file_idx, min_run->file);
1232               remove_temp_file (mrg->xsrt, min_run->file_idx);
1233               *min_run = runs[--run_cnt];
1234
1235               /* We could donate the now-unused buffer space to
1236                  other runs. */
1237             }
1238         } 
1239     }
1240
1241   /* Close output file.  */
1242   close_temp_file (mrg->xsrt, output_run->file_idx, output_file);
1243
1244   return 1;
1245
1246  error:
1247   /* Close and remove output file.  */
1248   if (output_file != NULL) 
1249     {
1250       close_temp_file (mrg->xsrt, output_run->file_idx, output_file);
1251       remove_temp_file (mrg->xsrt, output_run->file_idx);
1252     }
1253   
1254   /* Close and remove any remaining input runs. */
1255   for (i = 0; i < run_cnt; i++) 
1256     {
1257       close_temp_file (mrg->xsrt, runs[i].file_idx, runs[i].file);
1258       remove_temp_file (mrg->xsrt, runs[i].file_idx);
1259     }
1260
1261   return success;
1262 }
1263
1264 /* Reads as many cases as possible into RUN's buffer.
1265    Reads nonzero unless a disk error occurs. */
1266 static int
1267 fill_run_buffer (struct merge_state *mrg, struct run *run) 
1268 {
1269   run->buffer_head = run->buffer_tail = run->buffer;
1270   while (run->unread_case_cnt > 0
1271          && run->buffer_tail < run->buffer + run->buffer_cap)
1272     {
1273       if (!read_temp_file (mrg->xsrt, run->file_idx, run->file,
1274                            (*run->buffer_tail)->data,
1275                            mrg->xsrt->scp->case_size))
1276         return 0;
1277
1278       run->unread_case_cnt--;
1279       run->buffer_tail++;
1280     }
1281
1282   return 1;
1283 }
1284 \f
1285 static struct case_source *
1286 sort_sink_make_source (struct case_sink *sink) 
1287 {
1288   struct initial_run_state *irs = sink->aux;
1289
1290   return create_case_source (&sort_source_class, default_dict,
1291                              irs->xsrt->scp);
1292 }
1293
1294 static const struct case_sink_class sort_sink_class = 
1295   {
1296     "SORT CASES",
1297     NULL,
1298     sort_sink_write,
1299     NULL,
1300     sort_sink_make_source,
1301   };
1302 \f
1303 struct sort_source_aux 
1304   {
1305     struct sort_cases_pgm *scp;
1306     struct ccase *dst;
1307     write_case_func *write_case;
1308     write_case_data wc_data;
1309   };
1310
1311 /* Passes C to the write_case function. */
1312 static int
1313 sort_source_read_helper (const struct ccase *src, void *aux_) 
1314 {
1315   struct sort_source_aux *aux = aux_;
1316
1317   memcpy (aux->dst, src, aux->scp->case_size);
1318   return aux->write_case (aux->wc_data);
1319 }
1320
1321 /* Reads all the records from the source stream and passes them
1322    to write_case(). */
1323 static void
1324 sort_source_read (struct case_source *source,
1325                   struct ccase *c,
1326                   write_case_func *write_case, write_case_data wc_data)
1327 {
1328   struct sort_cases_pgm *scp = source->aux;
1329   struct sort_source_aux aux;
1330
1331   aux.scp = scp;
1332   aux.dst = c;
1333   aux.write_case = write_case;
1334   aux.wc_data = wc_data;
1335   
1336   read_sort_output (scp, sort_source_read_helper, &aux);
1337 }
1338
1339 static void read_internal_sort_output (struct internal_sort *isrt,
1340                                        read_sort_output_func *, void *aux);
1341 static void read_external_sort_output (struct external_sort *xsrt,
1342                                        read_sort_output_func *, void *aux);
1343
1344 /* Reads all the records from the output stream and passes them to the
1345    function provided, which must have an interface identical to
1346    write_case(). */
1347 void
1348 read_sort_output (struct sort_cases_pgm *scp,
1349                   read_sort_output_func *output_func, void *aux)
1350 {
1351   assert ((scp->isrt != NULL) + (scp->xsrt != NULL) <= 1);
1352   if (scp->isrt != NULL)
1353     read_internal_sort_output (scp->isrt, output_func, aux);
1354   else if (scp->xsrt != NULL)
1355     read_external_sort_output (scp->xsrt, output_func, aux);
1356   else 
1357     {
1358       /* No results.  Probably an external sort that failed. */
1359     }
1360 }
1361
1362 static void
1363 read_internal_sort_output (struct internal_sort *isrt,
1364                            read_sort_output_func *output_func,
1365                            void *aux)
1366 {
1367   size_t case_idx;
1368
1369   for (case_idx = 0; case_idx < isrt->case_cnt; case_idx++)
1370     if (!output_func (isrt->cases[case_idx], aux))
1371       break;
1372 }
1373
1374 static void
1375 read_external_sort_output (struct external_sort *xsrt,
1376                            read_sort_output_func *output_func, void *aux)
1377 {
1378   FILE *file;
1379   int file_idx;
1380   size_t i;
1381   struct ccase *c;
1382
1383   assert (xsrt->run_cnt == 1);
1384   file_idx = xsrt->initial_runs[0].file_idx;
1385
1386   file = open_temp_file (xsrt, file_idx, "rb");
1387   if (file == NULL)
1388     {
1389       err_failure ();
1390       return;
1391     }
1392
1393   c = xmalloc (xsrt->scp->case_size);
1394   for (i = 0; i < xsrt->initial_runs[0].case_cnt; i++)
1395     {
1396       if (!read_temp_file (xsrt, file_idx, file, c, xsrt->scp->case_size))
1397         {
1398           err_failure ();
1399           break;
1400         }
1401
1402       if (!output_func (c, aux))
1403         break;
1404     }
1405   free (c);
1406   close_temp_file (xsrt, file_idx, file);
1407 }
1408
1409 static void
1410 sort_source_destroy (struct case_source *source) 
1411 {
1412   struct sort_cases_pgm *scp = source->aux;
1413   
1414   destroy_sort_cases_pgm (scp);
1415 }
1416
1417 const struct case_source_class sort_source_class =
1418   {
1419     "SORT CASES",
1420     NULL, /* FIXME */
1421     sort_source_read,
1422     sort_source_destroy,
1423   };