bb4220b75943a231d6bdd5395d6615bf3acef70a
[pspp-builds.git] / src / sort.c
1 /* PSPP - computes sample statistics.
2    Copyright (C) 1997-9, 2000 Free Software Foundation, Inc.
3    Written by Ben Pfaff <blp@gnu.org>.
4
5    This program is free software; you can redistribute it and/or
6    modify it under the terms of the GNU General Public License as
7    published by the Free Software Foundation; either version 2 of the
8    License, or (at your option) any later version.
9
10    This program is distributed in the hope that it will be useful, but
11    WITHOUT ANY WARRANTY; without even the implied warranty of
12    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13    General Public License for more details.
14
15    You should have received a copy of the GNU General Public License
16    along with this program; if not, write to the Free Software
17    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
18    02110-1301, USA. */
19
20 #include <config.h>
21 #include "sort.h"
22 #include "error.h"
23 #include <limits.h>
24 #include <stdio.h>
25 #include <stdlib.h>
26 #include <errno.h>
27 #include "algorithm.h"
28 #include "alloc.h"
29 #include "bool.h"
30 #include "case.h"
31 #include "casefile.h"
32 #include "command.h"
33 #include "error.h"
34 #include "expressions/public.h"
35 #include "glob.h"
36 #include "lexer.h"
37 #include "misc.h"
38 #include "settings.h"
39 #include "sort-prs.h"
40 #include "str.h"
41 #include "var.h"
42 #include "vfm.h"
43 #include "vfmP.h"
44
45 #include "debug-print.h"
46
47 /* These should only be changed for testing purposes. */
48 static int min_buffers = 64;
49 static int max_buffers = INT_MAX;
50 static bool allow_internal_sort = true;
51
52 static int compare_record (const struct ccase *, const struct ccase *,
53                            const struct sort_criteria *);
54 static struct casefile *do_internal_sort (struct casereader *,
55                                           const struct sort_criteria *);
56 static struct casefile *do_external_sort (struct casereader *,
57                                           const struct sort_criteria *);
58
59 /* Performs the SORT CASES procedures. */
60 int
61 cmd_sort_cases (void)
62 {
63   struct sort_criteria *criteria;
64   bool success = false;
65
66   lex_match (T_BY);
67
68   criteria = sort_parse_criteria (default_dict, NULL, NULL, NULL, NULL);
69   if (criteria == NULL)
70     return CMD_FAILURE;
71
72   if (test_mode && lex_match ('/')) 
73     {
74       if (!lex_force_match_id ("BUFFERS") || !lex_match ('=')
75           || !lex_force_int ())
76         goto done;
77
78       min_buffers = max_buffers = lex_integer ();
79       allow_internal_sort = false;
80       if (max_buffers < 2) 
81         {
82           msg (SE, _("Buffer limit must be at least 2."));
83           goto done;
84         }
85
86       lex_get ();
87     }
88
89   success = sort_active_file_in_place (criteria);
90
91  done:
92   min_buffers = 64;
93   max_buffers = INT_MAX;
94   allow_internal_sort = true;
95   
96   sort_destroy_criteria (criteria);
97   return success ? lex_end_of_command () : CMD_FAILURE;
98 }
99
100 /* Gets ready to sort the active file, either in-place or to a
101    separate casefile. */
102 static void
103 prepare_to_sort_active_file (void) 
104 {
105   /* Cancel temporary transformations and PROCESS IF. */
106   if (temporary != 0)
107     cancel_temporary (); 
108   expr_free (process_if_expr);
109   process_if_expr = NULL;
110
111   /* Make sure source cases are in a storage source. */
112   procedure (NULL, NULL);
113   assert (case_source_is_class (vfm_source, &storage_source_class));
114 }
115
116 /* Sorts the active file in-place according to CRITERIA.
117    Returns nonzero if successful. */
118 int
119 sort_active_file_in_place (const struct sort_criteria *criteria) 
120 {
121   struct casefile *src, *dst;
122   
123   prepare_to_sort_active_file ();
124
125   src = storage_source_get_casefile (vfm_source);
126   dst = sort_execute (casefile_get_destructive_reader (src), criteria);
127   free_case_source (vfm_source);
128   vfm_source = NULL;
129
130   if (dst == NULL) 
131     return 0;
132
133   vfm_source = storage_source_create (dst);
134   return 1;
135 }
136
137 /* Sorts the active file to a separate casefile.  If successful,
138    returns the sorted casefile.  Returns a null pointer on
139    failure. */
140 struct casefile *
141 sort_active_file_to_casefile (const struct sort_criteria *criteria) 
142 {
143   struct casefile *src;
144   
145   prepare_to_sort_active_file ();
146
147   src = storage_source_get_casefile (vfm_source);
148   return sort_execute (casefile_get_reader (src), criteria);
149 }
150
151
152 /* Reads all the cases from READER, which is destroyed.  Sorts
153    the cases according to CRITERIA.  Returns the sorted cases in
154    a newly created casefile. */
155 struct casefile *
156 sort_execute (struct casereader *reader, const struct sort_criteria *criteria)
157 {
158   struct casefile *output = do_internal_sort (reader, criteria);
159   if (output == NULL)
160     output = do_external_sort (reader, criteria);
161   casereader_destroy (reader);
162   return output;
163 }
164 \f
165 /* A case and its index. */
166 struct indexed_case 
167   {
168     struct ccase c;     /* Case. */
169     unsigned long idx;  /* Index to allow for stable sorting. */
170   };
171
172 static int compare_indexed_cases (const void *, const void *, void *);
173
174 /* If the data is in memory, do an internal sort and return a new
175    casefile for the data.  Otherwise, return a null pointer. */
176 static struct casefile *
177 do_internal_sort (struct casereader *reader,
178                   const struct sort_criteria *criteria)
179 {
180   const struct casefile *src;
181   struct casefile *dst;
182   unsigned long case_cnt;
183
184   if (!allow_internal_sort)
185     return NULL;
186
187   src = casereader_get_casefile (reader);
188   if (casefile_get_case_cnt (src) > 1 && !casefile_in_core (src))
189     return NULL;
190       
191   case_cnt = casefile_get_case_cnt (src);
192   dst = casefile_create (casefile_get_value_cnt (src));
193   if (case_cnt != 0) 
194     {
195       struct indexed_case *cases = malloc (sizeof *cases * case_cnt);
196       if (cases != NULL) 
197         {
198           unsigned long i;
199           
200           for (i = 0; i < case_cnt; i++)
201             {
202               casereader_read_xfer_assert (reader, &cases[i].c);
203               cases[i].idx = i;
204             }
205
206           sort (cases, case_cnt, sizeof *cases, compare_indexed_cases,
207                 (void *) criteria);
208       
209           for (i = 0; i < case_cnt; i++)
210             casefile_append_xfer (dst, &cases[i].c);
211
212           free (cases);
213         }
214       else 
215         {
216           /* Failure. */
217           casefile_destroy (dst);
218           dst = NULL;
219         }
220     }
221
222   return dst;
223 }
224
225 /* Compares the variables specified by CRITERIA between the cases
226    at A and B, with a "last resort" comparison for stability, and
227    returns a strcmp()-type result. */
228 static int
229 compare_indexed_cases (const void *a_, const void *b_, void *criteria_)
230 {
231   struct sort_criteria *criteria = criteria_;
232   const struct indexed_case *a = a_;
233   const struct indexed_case *b = b_;
234   int result = compare_record (&a->c, &b->c, criteria);
235   if (result == 0)
236     result = a->idx < b->idx ? -1 : a->idx > b->idx;
237   return result;
238 }
239 \f
240 /* External sort. */
241
242 /* Maximum order of merge (external sort only).  The maximum
243    reasonable value is about 7.  Above that, it would be a good
244    idea to use a heap in merge_once() to select the minimum. */
245 #define MAX_MERGE_ORDER 7
246
247 /* Results of an external sort. */
248 struct external_sort 
249   {
250     const struct sort_criteria *criteria; /* Sort criteria. */
251     size_t value_cnt;                 /* Size of data in `union value's. */
252     struct casefile **runs;           /* Array of initial runs. */
253     size_t run_cnt, run_cap;          /* Number of runs, allocated capacity. */
254   };
255
256 /* Prototypes for helper functions. */
257 static int write_runs (struct external_sort *, struct casereader *);
258 static struct casefile *merge (struct external_sort *);
259 static void destroy_external_sort (struct external_sort *);
260
261 /* Performs a stable external sort of the active file according
262    to the specification in SCP.  Forms initial runs using a heap
263    as a reservoir.  Merges the initial runs according to a
264    pattern that assures stability. */
265 static struct casefile *
266 do_external_sort (struct casereader *reader,
267                   const struct sort_criteria *criteria)
268 {
269   struct external_sort *xsrt;
270
271   casefile_to_disk (casereader_get_casefile (reader));
272
273   xsrt = xmalloc (sizeof *xsrt);
274   xsrt->criteria = criteria;
275   xsrt->value_cnt = casefile_get_value_cnt (casereader_get_casefile (reader));
276   xsrt->run_cap = 512;
277   xsrt->run_cnt = 0;
278   xsrt->runs = xmalloc (sizeof *xsrt->runs * xsrt->run_cap);
279   if (write_runs (xsrt, reader))
280     {
281       struct casefile *output = merge (xsrt);
282       destroy_external_sort (xsrt);
283       return output;
284     }
285   else
286     {
287       destroy_external_sort (xsrt);
288       return NULL;
289     }
290 }
291
292 /* Destroys XSRT. */
293 static void
294 destroy_external_sort (struct external_sort *xsrt) 
295 {
296   if (xsrt != NULL) 
297     {
298       int i;
299       
300       for (i = 0; i < xsrt->run_cnt; i++)
301         casefile_destroy (xsrt->runs[i]);
302       free (xsrt->runs);
303       free (xsrt);
304     }
305 }
306 \f
307 /* Replacement selection. */
308
309 /* Pairs a record with a run number. */
310 struct record_run
311   {
312     int run;                    /* Run number of case. */
313     struct ccase record;        /* Case data. */
314     size_t idx;                 /* Case number (for stability). */
315   };
316
317 /* Represents a set of initial runs during an external sort. */
318 struct initial_run_state 
319   {
320     struct external_sort *xsrt;
321
322     /* Reservoir. */
323     struct record_run *records; /* Records arranged as a heap. */
324     size_t record_cnt;          /* Current number of records. */
325     size_t record_cap;          /* Capacity for records. */
326     
327     /* Run currently being output. */
328     int run;                    /* Run number. */
329     size_t case_cnt;            /* Number of cases so far. */
330     struct casefile *casefile;  /* Output file. */
331     struct ccase last_output;   /* Record last output. */
332
333     int okay;                   /* Zero if an error has been encountered. */
334   };
335
336 static const struct case_sink_class sort_sink_class;
337
338 static void destroy_initial_run_state (struct initial_run_state *);
339 static void process_case (struct initial_run_state *, const struct ccase *,
340                           size_t);
341 static int allocate_cases (struct initial_run_state *);
342 static void output_record (struct initial_run_state *);
343 static void start_run (struct initial_run_state *);
344 static void end_run (struct initial_run_state *);
345 static int compare_record_run (const struct record_run *,
346                                const struct record_run *,
347                                struct initial_run_state *);
348 static int compare_record_run_minheap (const void *, const void *, void *);
349
350 /* Reads cases from READER and composes initial runs in XSRT. */
351 static int
352 write_runs (struct external_sort *xsrt, struct casereader *reader)
353 {
354   struct initial_run_state *irs;
355   struct ccase c;
356   size_t idx = 0;
357   int success = 0;
358
359   /* Allocate memory for cases. */
360   irs = xmalloc (sizeof *irs);
361   irs->xsrt = xsrt;
362   irs->records = NULL;
363   irs->record_cnt = irs->record_cap = 0;
364   irs->run = 0;
365   irs->case_cnt = 0;
366   irs->casefile = NULL;
367   case_nullify (&irs->last_output);
368   irs->okay = 1;
369   if (!allocate_cases (irs)) 
370     goto done;
371
372   /* Create initial runs. */
373   start_run (irs);
374   for (; irs->okay && casereader_read (reader, &c); case_destroy (&c))
375     process_case (irs, &c, idx++);
376   while (irs->okay && irs->record_cnt > 0)
377     output_record (irs);
378   end_run (irs);
379
380   success = irs->okay;
381
382  done:
383   destroy_initial_run_state (irs);
384
385   return success;
386 }
387
388 /* Add a single case to an initial run. */
389 static void
390 process_case (struct initial_run_state *irs, const struct ccase *c, size_t idx)
391 {
392   struct record_run *rr;
393
394   /* Compose record_run for this run and add to heap. */
395   assert (irs->record_cnt < irs->record_cap - 1);
396   rr = irs->records + irs->record_cnt++;
397   case_copy (&rr->record, 0, c, 0, irs->xsrt->value_cnt);
398   rr->run = irs->run;
399   rr->idx = idx;
400   if (!case_is_null (&irs->last_output)
401       && compare_record (c, &irs->last_output, irs->xsrt->criteria) < 0)
402     rr->run = irs->run + 1;
403   push_heap (irs->records, irs->record_cnt, sizeof *irs->records,
404              compare_record_run_minheap, irs);
405
406   /* Output a record if the reservoir is full. */
407   if (irs->record_cnt == irs->record_cap - 1 && irs->okay)
408     output_record (irs);
409 }
410
411 /* Destroys the initial run state represented by IRS. */
412 static void
413 destroy_initial_run_state (struct initial_run_state *irs) 
414 {
415   int i;
416
417   if (irs == NULL)
418     return;
419
420   for (i = 0; i < irs->record_cap; i++)
421     case_destroy (&irs->records[i].record);
422   free (irs->records);
423
424   if (irs->casefile != NULL)
425     casefile_sleep (irs->casefile);
426
427   free (irs);
428 }
429
430 /* Allocates room for lots of cases as a buffer. */
431 static int
432 allocate_cases (struct initial_run_state *irs)
433 {
434   int approx_case_cost; /* Approximate memory cost of one case in bytes. */
435   int max_cases;        /* Maximum number of cases to allocate. */
436   int i;
437
438   /* Allocate as many cases as we can within the workspace
439      limit. */
440   approx_case_cost = (sizeof *irs->records
441                       + irs->xsrt->value_cnt * sizeof (union value)
442                       + 4 * sizeof (void *));
443   max_cases = get_max_workspace() / approx_case_cost;
444   if (max_cases > max_buffers)
445     max_cases = max_buffers;
446   irs->records = malloc (sizeof *irs->records * max_cases);
447   for (i = 0; i < max_cases; i++)
448     if (!case_try_create (&irs->records[i].record, irs->xsrt->value_cnt))
449       {
450         max_cases = i;
451         break;
452       }
453   irs->record_cap = max_cases;
454
455   /* Fail if we didn't allocate an acceptable number of cases. */
456   if (irs->records == NULL || max_cases < min_buffers)
457     {
458       msg (SE, _("Out of memory.  Could not allocate room for minimum of %d "
459                  "cases of %d bytes each.  (PSPP workspace is currently "
460                  "restricted to a maximum of %d KB.)"),
461            min_buffers, approx_case_cost, get_max_workspace() / 1024);
462       return 0;
463     }
464   return 1;
465 }
466
467 /* Compares the VAR_CNT variables in VARS[] between the `value's at
468    A and B, and returns a strcmp()-type result. */
469 static int
470 compare_record (const struct ccase *a, const struct ccase *b,
471                 const struct sort_criteria *criteria)
472 {
473   int i;
474
475   assert (a != NULL);
476   assert (b != NULL);
477   
478   for (i = 0; i < criteria->crit_cnt; i++)
479     {
480       const struct sort_criterion *c = &criteria->crits[i];
481       int result;
482       
483       if (c->width == 0)
484         {
485           double af = case_num (a, c->fv);
486           double bf = case_num (b, c->fv);
487           
488           result = af < bf ? -1 : af > bf;
489         }
490       else
491         result = memcmp (case_str (a, c->fv), case_str (b, c->fv), c->width);
492
493       if (result != 0)
494         return c->dir == SRT_ASCEND ? result : -result;
495     }
496
497   return 0;
498 }
499
500 /* Compares record-run tuples A and B on run number first, then
501    on record, then on case index. */
502 static int
503 compare_record_run (const struct record_run *a,
504                     const struct record_run *b,
505                     struct initial_run_state *irs)
506 {
507   int result = a->run < b->run ? -1 : a->run > b->run;
508   if (result == 0)
509     result = compare_record (&a->record, &b->record, irs->xsrt->criteria);
510   if (result == 0)
511     result = a->idx < b->idx ? -1 : a->idx > b->idx;
512   return result;
513 }
514
515 /* Compares record-run tuples A and B on run number first, then
516    on the current record according to SCP, but in descending
517    order. */
518 static int
519 compare_record_run_minheap (const void *a, const void *b, void *irs) 
520 {
521   return -compare_record_run (a, b, irs);
522 }
523
524 /* Begins a new initial run, specifically its output file. */
525 static void
526 start_run (struct initial_run_state *irs)
527 {
528   irs->run++;
529   irs->case_cnt = 0;
530   irs->casefile = casefile_create (irs->xsrt->value_cnt);
531   casefile_to_disk (irs->casefile);
532   case_nullify (&irs->last_output); 
533 }
534
535 /* Ends the current initial run.  */
536 static void
537 end_run (struct initial_run_state *irs)
538 {
539   struct external_sort *xsrt = irs->xsrt;
540
541   /* Record initial run. */
542   if (irs->casefile != NULL) 
543     {
544       casefile_sleep (irs->casefile);
545       if (xsrt->run_cnt >= xsrt->run_cap) 
546         {
547           xsrt->run_cap *= 2;
548           xsrt->runs = xrealloc (xsrt->runs,
549                                  sizeof *xsrt->runs * xsrt->run_cap);
550         }
551       xsrt->runs[xsrt->run_cnt++] = irs->casefile;
552       irs->casefile = NULL; 
553     }
554 }
555
556 /* Writes a record to the current initial run. */
557 static void
558 output_record (struct initial_run_state *irs)
559 {
560   struct record_run *record_run;
561   struct ccase case_tmp;
562   
563   /* Extract minimum case from heap. */
564   assert (irs->record_cnt > 0);
565   pop_heap (irs->records, irs->record_cnt--, sizeof *irs->records,
566             compare_record_run_minheap, irs);
567   record_run = irs->records + irs->record_cnt;
568
569   /* Bail if an error has occurred. */
570   if (!irs->okay)
571     return;
572
573   /* Start new run if necessary. */
574   assert (record_run->run == irs->run
575           || record_run->run == irs->run + 1);
576   if (record_run->run != irs->run)
577     {
578       end_run (irs);
579       start_run (irs);
580     }
581   assert (record_run->run == irs->run);
582   irs->case_cnt++;
583
584   /* Write to disk. */
585   if (irs->casefile != NULL)
586     casefile_append (irs->casefile, &record_run->record);
587
588   /* This record becomes last_output. */
589   irs->last_output = case_tmp = record_run->record;
590   record_run->record = irs->records[irs->record_cap - 1].record;
591   irs->records[irs->record_cap - 1].record = case_tmp;
592 }
593 \f
594 /* Merging. */
595
596 static int choose_merge (struct casefile *runs[], int run_cnt, int order);
597 static struct casefile *merge_once (struct external_sort *,
598                                     struct casefile *[], size_t);
599
600 /* Repeatedly merges run until only one is left,
601    and returns the final casefile.  */
602 static struct casefile *
603 merge (struct external_sort *xsrt)
604 {
605   while (xsrt->run_cnt > 1)
606     {
607       int order = min (MAX_MERGE_ORDER, xsrt->run_cnt);
608       int idx = choose_merge (xsrt->runs, xsrt->run_cnt, order);
609       xsrt->runs[idx] = merge_once (xsrt, xsrt->runs + idx, order);
610       remove_range (xsrt->runs, xsrt->run_cnt, sizeof *xsrt->runs,
611                     idx + 1, order - 1);
612       xsrt->run_cnt -= order - 1;
613     }
614   assert (xsrt->run_cnt == 1);
615   xsrt->run_cnt = 0;
616   return xsrt->runs[0];
617 }
618
619 /* Chooses ORDER runs out of the RUN_CNT runs in RUNS to merge,
620    and returns the index of the first one.
621
622    For stability, we must merge only consecutive runs.  For
623    efficiency, we choose the shortest consecutive sequence of
624    runs. */
625 static int
626 choose_merge (struct casefile *runs[], int run_cnt, int order) 
627 {
628   int min_idx, min_sum;
629   int cur_idx, cur_sum;
630   int i;
631
632   /* Sum up the length of the first ORDER runs. */
633   cur_sum = 0;
634   for (i = 0; i < order; i++)
635     cur_sum += casefile_get_case_cnt (runs[i]);
636
637   /* Find the shortest group of ORDER runs,
638      using a running total for efficiency. */
639   min_idx = 0;
640   min_sum = cur_sum;
641   for (cur_idx = 1; cur_idx + order <= run_cnt; cur_idx++)
642     {
643       cur_sum -= casefile_get_case_cnt (runs[cur_idx - 1]);
644       cur_sum += casefile_get_case_cnt (runs[cur_idx + order - 1]);
645       if (cur_sum < min_sum)
646         {
647           min_sum = cur_sum;
648           min_idx = cur_idx;
649         }
650     }
651
652   return min_idx;
653 }
654
655 /* Merges the RUN_CNT initial runs specified in INPUT_FILES into a
656    new run, and returns the new run. */
657 static struct casefile *
658 merge_once (struct external_sort *xsrt,
659             struct casefile **const input_files,
660             size_t run_cnt)
661 {
662   struct run
663     {
664       struct casefile *file;
665       struct casereader *reader;
666       struct ccase ccase;
667     }
668   *runs;
669
670   struct casefile *output = NULL;
671   int i;
672
673   /* Open input files. */
674   runs = xmalloc (sizeof *runs * run_cnt);
675   for (i = 0; i < run_cnt; i++) 
676     {
677       struct run *r = &runs[i];
678       r->file = input_files[i];
679       r->reader = casefile_get_destructive_reader (r->file);
680       if (!casereader_read_xfer (r->reader, &r->ccase))
681         {
682           run_cnt--;
683           i--;
684         }
685     }
686
687   /* Create output file. */
688   output = casefile_create (xsrt->value_cnt);
689   casefile_to_disk (output);
690
691   /* Merge. */
692   while (run_cnt > 0) 
693     {
694       struct run *min_run, *run;
695       
696       /* Find minimum. */
697       min_run = runs;
698       for (run = runs + 1; run < runs + run_cnt; run++)
699         if (compare_record (&run->ccase, &min_run->ccase, xsrt->criteria) < 0)
700           min_run = run;
701
702       /* Write minimum to output file. */
703       casefile_append_xfer (output, &min_run->ccase);
704
705       /* Read another case from minimum run. */
706       if (!casereader_read_xfer (min_run->reader, &min_run->ccase))
707         {
708           casereader_destroy (min_run->reader);
709           casefile_destroy (min_run->file);
710
711           remove_element (runs, run_cnt, sizeof *runs, min_run - runs);
712           run_cnt--;
713         } 
714     }
715
716   casefile_sleep (output);
717   free (runs);
718
719   return output;
720 }