Revise.
[pspp] / src / sort.c
1 /* PSPP - computes sample statistics.
2    Copyright (C) 1997-9, 2000 Free Software Foundation, Inc.
3    Written by Ben Pfaff <blp@gnu.org>.
4
5    This program is free software; you can redistribute it and/or
6    modify it under the terms of the GNU General Public License as
7    published by the Free Software Foundation; either version 2 of the
8    License, or (at your option) any later version.
9
10    This program is distributed in the hope that it will be useful, but
11    WITHOUT ANY WARRANTY; without even the implied warranty of
12    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13    General Public License for more details.
14
15    You should have received a copy of the GNU General Public License
16    along with this program; if not, write to the Free Software
17    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
18    02110-1301, USA. */
19
20 #include <config.h>
21 #include "sort.h"
22 #include "error.h"
23 #include <limits.h>
24 #include <stdio.h>
25 #include <stdlib.h>
26 #include <errno.h>
27 #include "algorithm.h"
28 #include "alloc.h"
29 #include <stdbool.h>
30 #include "case.h"
31 #include "casefile.h"
32 #include "command.h"
33 #include "error.h"
34 #include "expressions/public.h"
35 #include "glob.h"
36 #include "lexer.h"
37 #include "misc.h"
38 #include "settings.h"
39 #include "sort-prs.h"
40 #include "str.h"
41 #include "var.h"
42 #include "vfm.h"
43 #include "vfmP.h"
44
45 #include "gettext.h"
46 #define _(msgid) gettext (msgid)
47
48 #include "debug-print.h"
49
50 /* These should only be changed for testing purposes. */
51 static int min_buffers = 64;
52 static int max_buffers = INT_MAX;
53 static bool allow_internal_sort = true;
54
55 static int compare_record (const struct ccase *, const struct ccase *,
56                            const struct sort_criteria *);
57 static struct casefile *do_internal_sort (struct casereader *,
58                                           const struct sort_criteria *);
59 static struct casefile *do_external_sort (struct casereader *,
60                                           const struct sort_criteria *);
61
62 /* Performs the SORT CASES procedures. */
63 int
64 cmd_sort_cases (void)
65 {
66   struct sort_criteria *criteria;
67   bool success = false;
68
69   lex_match (T_BY);
70
71   criteria = sort_parse_criteria (default_dict, NULL, NULL, NULL, NULL);
72   if (criteria == NULL)
73     return CMD_FAILURE;
74
75   if (test_mode && lex_match ('/')) 
76     {
77       if (!lex_force_match_id ("BUFFERS") || !lex_match ('=')
78           || !lex_force_int ())
79         goto done;
80
81       min_buffers = max_buffers = lex_integer ();
82       allow_internal_sort = false;
83       if (max_buffers < 2) 
84         {
85           msg (SE, _("Buffer limit must be at least 2."));
86           goto done;
87         }
88
89       lex_get ();
90     }
91
92   success = sort_active_file_in_place (criteria);
93
94  done:
95   min_buffers = 64;
96   max_buffers = INT_MAX;
97   allow_internal_sort = true;
98   
99   sort_destroy_criteria (criteria);
100   return success ? lex_end_of_command () : CMD_FAILURE;
101 }
102
103 /* Gets ready to sort the active file, either in-place or to a
104    separate casefile. */
105 static void
106 prepare_to_sort_active_file (void) 
107 {
108   /* Cancel temporary transformations and PROCESS IF. */
109   if (temporary != 0)
110     cancel_temporary (); 
111   expr_free (process_if_expr);
112   process_if_expr = NULL;
113
114   /* Make sure source cases are in a storage source. */
115   procedure (NULL, NULL);
116   assert (case_source_is_class (vfm_source, &storage_source_class));
117 }
118
119 /* Sorts the active file in-place according to CRITERIA.
120    Returns nonzero if successful. */
121 int
122 sort_active_file_in_place (const struct sort_criteria *criteria) 
123 {
124   struct casefile *src, *dst;
125   
126   prepare_to_sort_active_file ();
127
128   src = storage_source_get_casefile (vfm_source);
129   dst = sort_execute (casefile_get_destructive_reader (src), criteria);
130   free_case_source (vfm_source);
131   vfm_source = NULL;
132
133   if (dst == NULL) 
134     return 0;
135
136   vfm_source = storage_source_create (dst);
137   return 1;
138 }
139
140 /* Sorts the active file to a separate casefile.  If successful,
141    returns the sorted casefile.  Returns a null pointer on
142    failure. */
143 struct casefile *
144 sort_active_file_to_casefile (const struct sort_criteria *criteria) 
145 {
146   struct casefile *src;
147   
148   prepare_to_sort_active_file ();
149
150   src = storage_source_get_casefile (vfm_source);
151   return sort_execute (casefile_get_reader (src), criteria);
152 }
153
154
155 /* Reads all the cases from READER, which is destroyed.  Sorts
156    the cases according to CRITERIA.  Returns the sorted cases in
157    a newly created casefile. */
158 struct casefile *
159 sort_execute (struct casereader *reader, const struct sort_criteria *criteria)
160 {
161   struct casefile *output = do_internal_sort (reader, criteria);
162   if (output == NULL)
163     output = do_external_sort (reader, criteria);
164   casereader_destroy (reader);
165   return output;
166 }
167 \f
168 /* A case and its index. */
169 struct indexed_case 
170   {
171     struct ccase c;     /* Case. */
172     unsigned long idx;  /* Index to allow for stable sorting. */
173   };
174
175 static int compare_indexed_cases (const void *, const void *, void *);
176
177 /* If the data is in memory, do an internal sort and return a new
178    casefile for the data.  Otherwise, return a null pointer. */
179 static struct casefile *
180 do_internal_sort (struct casereader *reader,
181                   const struct sort_criteria *criteria)
182 {
183   const struct casefile *src;
184   struct casefile *dst;
185   unsigned long case_cnt;
186
187   if (!allow_internal_sort)
188     return NULL;
189
190   src = casereader_get_casefile (reader);
191   if (casefile_get_case_cnt (src) > 1 && !casefile_in_core (src))
192     return NULL;
193       
194   case_cnt = casefile_get_case_cnt (src);
195   dst = casefile_create (casefile_get_value_cnt (src));
196   if (case_cnt != 0) 
197     {
198       struct indexed_case *cases = malloc (sizeof *cases * case_cnt);
199       if (cases != NULL) 
200         {
201           unsigned long i;
202           
203           for (i = 0; i < case_cnt; i++)
204             {
205               casereader_read_xfer_assert (reader, &cases[i].c);
206               cases[i].idx = i;
207             }
208
209           sort (cases, case_cnt, sizeof *cases, compare_indexed_cases,
210                 (void *) criteria);
211       
212           for (i = 0; i < case_cnt; i++)
213             casefile_append_xfer (dst, &cases[i].c);
214
215           free (cases);
216         }
217       else 
218         {
219           /* Failure. */
220           casefile_destroy (dst);
221           dst = NULL;
222         }
223     }
224
225   return dst;
226 }
227
228 /* Compares the variables specified by CRITERIA between the cases
229    at A and B, with a "last resort" comparison for stability, and
230    returns a strcmp()-type result. */
231 static int
232 compare_indexed_cases (const void *a_, const void *b_, void *criteria_)
233 {
234   struct sort_criteria *criteria = criteria_;
235   const struct indexed_case *a = a_;
236   const struct indexed_case *b = b_;
237   int result = compare_record (&a->c, &b->c, criteria);
238   if (result == 0)
239     result = a->idx < b->idx ? -1 : a->idx > b->idx;
240   return result;
241 }
242 \f
243 /* External sort. */
244
245 /* Maximum order of merge (external sort only).  The maximum
246    reasonable value is about 7.  Above that, it would be a good
247    idea to use a heap in merge_once() to select the minimum. */
248 #define MAX_MERGE_ORDER 7
249
250 /* Results of an external sort. */
251 struct external_sort 
252   {
253     const struct sort_criteria *criteria; /* Sort criteria. */
254     size_t value_cnt;                 /* Size of data in `union value's. */
255     struct casefile **runs;           /* Array of initial runs. */
256     size_t run_cnt, run_cap;          /* Number of runs, allocated capacity. */
257   };
258
259 /* Prototypes for helper functions. */
260 static int write_runs (struct external_sort *, struct casereader *);
261 static struct casefile *merge (struct external_sort *);
262 static void destroy_external_sort (struct external_sort *);
263
264 /* Performs a stable external sort of the active file according
265    to the specification in SCP.  Forms initial runs using a heap
266    as a reservoir.  Merges the initial runs according to a
267    pattern that assures stability. */
268 static struct casefile *
269 do_external_sort (struct casereader *reader,
270                   const struct sort_criteria *criteria)
271 {
272   struct external_sort *xsrt;
273
274   casefile_to_disk (casereader_get_casefile (reader));
275
276   xsrt = xmalloc (sizeof *xsrt);
277   xsrt->criteria = criteria;
278   xsrt->value_cnt = casefile_get_value_cnt (casereader_get_casefile (reader));
279   xsrt->run_cap = 512;
280   xsrt->run_cnt = 0;
281   xsrt->runs = xmalloc (sizeof *xsrt->runs * xsrt->run_cap);
282   if (write_runs (xsrt, reader))
283     {
284       struct casefile *output = merge (xsrt);
285       destroy_external_sort (xsrt);
286       return output;
287     }
288   else
289     {
290       destroy_external_sort (xsrt);
291       return NULL;
292     }
293 }
294
295 /* Destroys XSRT. */
296 static void
297 destroy_external_sort (struct external_sort *xsrt) 
298 {
299   if (xsrt != NULL) 
300     {
301       int i;
302       
303       for (i = 0; i < xsrt->run_cnt; i++)
304         casefile_destroy (xsrt->runs[i]);
305       free (xsrt->runs);
306       free (xsrt);
307     }
308 }
309 \f
310 /* Replacement selection. */
311
312 /* Pairs a record with a run number. */
313 struct record_run
314   {
315     int run;                    /* Run number of case. */
316     struct ccase record;        /* Case data. */
317     size_t idx;                 /* Case number (for stability). */
318   };
319
320 /* Represents a set of initial runs during an external sort. */
321 struct initial_run_state 
322   {
323     struct external_sort *xsrt;
324
325     /* Reservoir. */
326     struct record_run *records; /* Records arranged as a heap. */
327     size_t record_cnt;          /* Current number of records. */
328     size_t record_cap;          /* Capacity for records. */
329     
330     /* Run currently being output. */
331     int run;                    /* Run number. */
332     size_t case_cnt;            /* Number of cases so far. */
333     struct casefile *casefile;  /* Output file. */
334     struct ccase last_output;   /* Record last output. */
335
336     int okay;                   /* Zero if an error has been encountered. */
337   };
338
339 static const struct case_sink_class sort_sink_class;
340
341 static void destroy_initial_run_state (struct initial_run_state *);
342 static void process_case (struct initial_run_state *, const struct ccase *,
343                           size_t);
344 static int allocate_cases (struct initial_run_state *);
345 static void output_record (struct initial_run_state *);
346 static void start_run (struct initial_run_state *);
347 static void end_run (struct initial_run_state *);
348 static int compare_record_run (const struct record_run *,
349                                const struct record_run *,
350                                struct initial_run_state *);
351 static int compare_record_run_minheap (const void *, const void *, void *);
352
353 /* Reads cases from READER and composes initial runs in XSRT. */
354 static int
355 write_runs (struct external_sort *xsrt, struct casereader *reader)
356 {
357   struct initial_run_state *irs;
358   struct ccase c;
359   size_t idx = 0;
360   int success = 0;
361
362   /* Allocate memory for cases. */
363   irs = xmalloc (sizeof *irs);
364   irs->xsrt = xsrt;
365   irs->records = NULL;
366   irs->record_cnt = irs->record_cap = 0;
367   irs->run = 0;
368   irs->case_cnt = 0;
369   irs->casefile = NULL;
370   case_nullify (&irs->last_output);
371   irs->okay = 1;
372   if (!allocate_cases (irs)) 
373     goto done;
374
375   /* Create initial runs. */
376   start_run (irs);
377   for (; irs->okay && casereader_read (reader, &c); case_destroy (&c))
378     process_case (irs, &c, idx++);
379   while (irs->okay && irs->record_cnt > 0)
380     output_record (irs);
381   end_run (irs);
382
383   success = irs->okay;
384
385  done:
386   destroy_initial_run_state (irs);
387
388   return success;
389 }
390
391 /* Add a single case to an initial run. */
392 static void
393 process_case (struct initial_run_state *irs, const struct ccase *c, size_t idx)
394 {
395   struct record_run *rr;
396
397   /* Compose record_run for this run and add to heap. */
398   assert (irs->record_cnt < irs->record_cap - 1);
399   rr = irs->records + irs->record_cnt++;
400   case_copy (&rr->record, 0, c, 0, irs->xsrt->value_cnt);
401   rr->run = irs->run;
402   rr->idx = idx;
403   if (!case_is_null (&irs->last_output)
404       && compare_record (c, &irs->last_output, irs->xsrt->criteria) < 0)
405     rr->run = irs->run + 1;
406   push_heap (irs->records, irs->record_cnt, sizeof *irs->records,
407              compare_record_run_minheap, irs);
408
409   /* Output a record if the reservoir is full. */
410   if (irs->record_cnt == irs->record_cap - 1 && irs->okay)
411     output_record (irs);
412 }
413
414 /* Destroys the initial run state represented by IRS. */
415 static void
416 destroy_initial_run_state (struct initial_run_state *irs) 
417 {
418   int i;
419
420   if (irs == NULL)
421     return;
422
423   for (i = 0; i < irs->record_cap; i++)
424     case_destroy (&irs->records[i].record);
425   free (irs->records);
426
427   if (irs->casefile != NULL)
428     casefile_sleep (irs->casefile);
429
430   free (irs);
431 }
432
433 /* Allocates room for lots of cases as a buffer. */
434 static int
435 allocate_cases (struct initial_run_state *irs)
436 {
437   int approx_case_cost; /* Approximate memory cost of one case in bytes. */
438   int max_cases;        /* Maximum number of cases to allocate. */
439   int i;
440
441   /* Allocate as many cases as we can within the workspace
442      limit. */
443   approx_case_cost = (sizeof *irs->records
444                       + irs->xsrt->value_cnt * sizeof (union value)
445                       + 4 * sizeof (void *));
446   max_cases = get_max_workspace() / approx_case_cost;
447   if (max_cases > max_buffers)
448     max_cases = max_buffers;
449   irs->records = malloc (sizeof *irs->records * max_cases);
450   for (i = 0; i < max_cases; i++)
451     if (!case_try_create (&irs->records[i].record, irs->xsrt->value_cnt))
452       {
453         max_cases = i;
454         break;
455       }
456   irs->record_cap = max_cases;
457
458   /* Fail if we didn't allocate an acceptable number of cases. */
459   if (irs->records == NULL || max_cases < min_buffers)
460     {
461       msg (SE, _("Out of memory.  Could not allocate room for minimum of %d "
462                  "cases of %d bytes each.  (PSPP workspace is currently "
463                  "restricted to a maximum of %d KB.)"),
464            min_buffers, approx_case_cost, get_max_workspace() / 1024);
465       return 0;
466     }
467   return 1;
468 }
469
470 /* Compares the VAR_CNT variables in VARS[] between the `value's at
471    A and B, and returns a strcmp()-type result. */
472 static int
473 compare_record (const struct ccase *a, const struct ccase *b,
474                 const struct sort_criteria *criteria)
475 {
476   int i;
477
478   assert (a != NULL);
479   assert (b != NULL);
480   
481   for (i = 0; i < criteria->crit_cnt; i++)
482     {
483       const struct sort_criterion *c = &criteria->crits[i];
484       int result;
485       
486       if (c->width == 0)
487         {
488           double af = case_num (a, c->fv);
489           double bf = case_num (b, c->fv);
490           
491           result = af < bf ? -1 : af > bf;
492         }
493       else
494         result = memcmp (case_str (a, c->fv), case_str (b, c->fv), c->width);
495
496       if (result != 0)
497         return c->dir == SRT_ASCEND ? result : -result;
498     }
499
500   return 0;
501 }
502
503 /* Compares record-run tuples A and B on run number first, then
504    on record, then on case index. */
505 static int
506 compare_record_run (const struct record_run *a,
507                     const struct record_run *b,
508                     struct initial_run_state *irs)
509 {
510   int result = a->run < b->run ? -1 : a->run > b->run;
511   if (result == 0)
512     result = compare_record (&a->record, &b->record, irs->xsrt->criteria);
513   if (result == 0)
514     result = a->idx < b->idx ? -1 : a->idx > b->idx;
515   return result;
516 }
517
518 /* Compares record-run tuples A and B on run number first, then
519    on the current record according to SCP, but in descending
520    order. */
521 static int
522 compare_record_run_minheap (const void *a, const void *b, void *irs) 
523 {
524   return -compare_record_run (a, b, irs);
525 }
526
527 /* Begins a new initial run, specifically its output file. */
528 static void
529 start_run (struct initial_run_state *irs)
530 {
531   irs->run++;
532   irs->case_cnt = 0;
533   irs->casefile = casefile_create (irs->xsrt->value_cnt);
534   casefile_to_disk (irs->casefile);
535   case_nullify (&irs->last_output); 
536 }
537
538 /* Ends the current initial run.  */
539 static void
540 end_run (struct initial_run_state *irs)
541 {
542   struct external_sort *xsrt = irs->xsrt;
543
544   /* Record initial run. */
545   if (irs->casefile != NULL) 
546     {
547       casefile_sleep (irs->casefile);
548       if (xsrt->run_cnt >= xsrt->run_cap) 
549         {
550           xsrt->run_cap *= 2;
551           xsrt->runs = xrealloc (xsrt->runs,
552                                  sizeof *xsrt->runs * xsrt->run_cap);
553         }
554       xsrt->runs[xsrt->run_cnt++] = irs->casefile;
555       irs->casefile = NULL; 
556     }
557 }
558
559 /* Writes a record to the current initial run. */
560 static void
561 output_record (struct initial_run_state *irs)
562 {
563   struct record_run *record_run;
564   struct ccase case_tmp;
565   
566   /* Extract minimum case from heap. */
567   assert (irs->record_cnt > 0);
568   pop_heap (irs->records, irs->record_cnt--, sizeof *irs->records,
569             compare_record_run_minheap, irs);
570   record_run = irs->records + irs->record_cnt;
571
572   /* Bail if an error has occurred. */
573   if (!irs->okay)
574     return;
575
576   /* Start new run if necessary. */
577   assert (record_run->run == irs->run
578           || record_run->run == irs->run + 1);
579   if (record_run->run != irs->run)
580     {
581       end_run (irs);
582       start_run (irs);
583     }
584   assert (record_run->run == irs->run);
585   irs->case_cnt++;
586
587   /* Write to disk. */
588   if (irs->casefile != NULL)
589     casefile_append (irs->casefile, &record_run->record);
590
591   /* This record becomes last_output. */
592   irs->last_output = case_tmp = record_run->record;
593   record_run->record = irs->records[irs->record_cap - 1].record;
594   irs->records[irs->record_cap - 1].record = case_tmp;
595 }
596 \f
597 /* Merging. */
598
599 static int choose_merge (struct casefile *runs[], int run_cnt, int order);
600 static struct casefile *merge_once (struct external_sort *,
601                                     struct casefile *[], size_t);
602
603 /* Repeatedly merges run until only one is left,
604    and returns the final casefile.  */
605 static struct casefile *
606 merge (struct external_sort *xsrt)
607 {
608   while (xsrt->run_cnt > 1)
609     {
610       int order = min (MAX_MERGE_ORDER, xsrt->run_cnt);
611       int idx = choose_merge (xsrt->runs, xsrt->run_cnt, order);
612       xsrt->runs[idx] = merge_once (xsrt, xsrt->runs + idx, order);
613       remove_range (xsrt->runs, xsrt->run_cnt, sizeof *xsrt->runs,
614                     idx + 1, order - 1);
615       xsrt->run_cnt -= order - 1;
616     }
617   assert (xsrt->run_cnt == 1);
618   xsrt->run_cnt = 0;
619   return xsrt->runs[0];
620 }
621
622 /* Chooses ORDER runs out of the RUN_CNT runs in RUNS to merge,
623    and returns the index of the first one.
624
625    For stability, we must merge only consecutive runs.  For
626    efficiency, we choose the shortest consecutive sequence of
627    runs. */
628 static int
629 choose_merge (struct casefile *runs[], int run_cnt, int order) 
630 {
631   int min_idx, min_sum;
632   int cur_idx, cur_sum;
633   int i;
634
635   /* Sum up the length of the first ORDER runs. */
636   cur_sum = 0;
637   for (i = 0; i < order; i++)
638     cur_sum += casefile_get_case_cnt (runs[i]);
639
640   /* Find the shortest group of ORDER runs,
641      using a running total for efficiency. */
642   min_idx = 0;
643   min_sum = cur_sum;
644   for (cur_idx = 1; cur_idx + order <= run_cnt; cur_idx++)
645     {
646       cur_sum -= casefile_get_case_cnt (runs[cur_idx - 1]);
647       cur_sum += casefile_get_case_cnt (runs[cur_idx + order - 1]);
648       if (cur_sum < min_sum)
649         {
650           min_sum = cur_sum;
651           min_idx = cur_idx;
652         }
653     }
654
655   return min_idx;
656 }
657
658 /* Merges the RUN_CNT initial runs specified in INPUT_FILES into a
659    new run, and returns the new run. */
660 static struct casefile *
661 merge_once (struct external_sort *xsrt,
662             struct casefile **const input_files,
663             size_t run_cnt)
664 {
665   struct run
666     {
667       struct casefile *file;
668       struct casereader *reader;
669       struct ccase ccase;
670     }
671   *runs;
672
673   struct casefile *output = NULL;
674   int i;
675
676   /* Open input files. */
677   runs = xmalloc (sizeof *runs * run_cnt);
678   for (i = 0; i < run_cnt; i++) 
679     {
680       struct run *r = &runs[i];
681       r->file = input_files[i];
682       r->reader = casefile_get_destructive_reader (r->file);
683       if (!casereader_read_xfer (r->reader, &r->ccase))
684         {
685           run_cnt--;
686           i--;
687         }
688     }
689
690   /* Create output file. */
691   output = casefile_create (xsrt->value_cnt);
692   casefile_to_disk (output);
693
694   /* Merge. */
695   while (run_cnt > 0) 
696     {
697       struct run *min_run, *run;
698       
699       /* Find minimum. */
700       min_run = runs;
701       for (run = runs + 1; run < runs + run_cnt; run++)
702         if (compare_record (&run->ccase, &min_run->ccase, xsrt->criteria) < 0)
703           min_run = run;
704
705       /* Write minimum to output file. */
706       casefile_append_xfer (output, &min_run->ccase);
707
708       /* Read another case from minimum run. */
709       if (!casereader_read_xfer (min_run->reader, &min_run->ccase))
710         {
711           casereader_destroy (min_run->reader);
712           casefile_destroy (min_run->file);
713
714           remove_element (runs, run_cnt, sizeof *runs, min_run - runs);
715           run_cnt--;
716         } 
717     }
718
719   casefile_sleep (output);
720   free (runs);
721
722   return output;
723 }