9725da80a1a9896149636905599319f9e5d9c229
[pspp-builds.git] / src / math / sort.c
1 /* PSPP - computes sample statistics.
2    Copyright (C) 1997-9, 2000 Free Software Foundation, Inc.
3    Written by Ben Pfaff <blp@gnu.org>.
4
5    This program is free software; you can redistribute it and/or
6    modify it under the terms of the GNU General Public License as
7    published by the Free Software Foundation; either version 2 of the
8    License, or (at your option) any later version.
9
10    This program is distributed in the hope that it will be useful, but
11    WITHOUT ANY WARRANTY; without even the implied warranty of
12    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13    General Public License for more details.
14
15    You should have received a copy of the GNU General Public License
16    along with this program; if not, write to the Free Software
17    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
18    02110-1301, USA. */
19
20 #include <config.h>
21
22 #include "sort.h"
23
24 #include <errno.h>
25 #include <limits.h>
26 #include <stdbool.h>
27 #include <stdio.h>
28 #include <stdlib.h>
29
30 #include <data/case-source.h>
31 #include <data/case.h>
32 #include <data/casefile.h>
33 #include <data/settings.h>
34 #include <data/variable.h>
35 #include <data/storage-stream.h>
36 #include <language/expressions/public.h>
37 #include <libpspp/alloc.h>
38 #include <libpspp/array.h>
39 #include <libpspp/message.h>
40 #include <libpspp/message.h>
41 #include <libpspp/misc.h>
42 #include <libpspp/str.h>
43 #include <procedure.h>
44
45 #include "gettext.h"
46 #define _(msgid) gettext (msgid)
47
48 /* These should only be changed for testing purposes. */
49 int min_buffers = 64;
50 int max_buffers = INT_MAX;
51 bool allow_internal_sort = true;
52
53 static int compare_record (const struct ccase *, const struct ccase *,
54                            const struct sort_criteria *);
55 static struct casefile *do_internal_sort (struct casereader *,
56                                           const struct sort_criteria *);
57 static struct casefile *do_external_sort (struct casereader *,
58                                           const struct sort_criteria *);
59
60 /* Gets ready to sort the active file, either in-place or to a
61    separate casefile. */
62 static bool
63 prepare_to_sort_active_file (void) 
64 {
65   bool ok;
66   
67   /* Cancel temporary transformations and PROCESS IF. */
68   if (temporary != 0)
69     cancel_temporary (); 
70   expr_free (process_if_expr);
71   process_if_expr = NULL;
72
73   /* Make sure source cases are in a storage source. */
74   ok = procedure (NULL, NULL);
75   assert (case_source_is_class (vfm_source, &storage_source_class));
76
77   return ok;
78 }
79
80 /* Sorts the active file in-place according to CRITERIA.
81    Returns nonzero if successful. */
82 int
83 sort_active_file_in_place (const struct sort_criteria *criteria) 
84 {
85   struct casefile *src, *dst;
86   
87   if (!prepare_to_sort_active_file ())
88     return 0;
89
90   src = storage_source_get_casefile (vfm_source);
91   dst = sort_execute (casefile_get_destructive_reader (src), criteria);
92   free_case_source (vfm_source);
93   vfm_source = NULL;
94
95   if (dst == NULL) 
96     return 0;
97
98   vfm_source = storage_source_create (dst);
99   return 1;
100 }
101
102 /* Sorts the active file to a separate casefile.  If successful,
103    returns the sorted casefile.  Returns a null pointer on
104    failure. */
105 struct casefile *
106 sort_active_file_to_casefile (const struct sort_criteria *criteria) 
107 {
108   struct casefile *src;
109   
110   if (!prepare_to_sort_active_file ())
111     return NULL;
112
113   src = storage_source_get_casefile (vfm_source);
114   return sort_execute (casefile_get_reader (src), criteria);
115 }
116
117
118 /* Reads all the cases from READER, which is destroyed.  Sorts
119    the cases according to CRITERIA.  Returns the sorted cases in
120    a newly created casefile. */
121 struct casefile *
122 sort_execute (struct casereader *reader, const struct sort_criteria *criteria)
123 {
124   struct casefile *output = do_internal_sort (reader, criteria);
125   if (output == NULL)
126     output = do_external_sort (reader, criteria);
127   casereader_destroy (reader);
128   return output;
129 }
130 \f
131 /* A case and its index. */
132 struct indexed_case 
133   {
134     struct ccase c;     /* Case. */
135     unsigned long idx;  /* Index to allow for stable sorting. */
136   };
137
138 static int compare_indexed_cases (const void *, const void *, void *);
139
140 /* If the data is in memory, do an internal sort and return a new
141    casefile for the data.  Otherwise, return a null pointer. */
142 static struct casefile *
143 do_internal_sort (struct casereader *reader,
144                   const struct sort_criteria *criteria)
145 {
146   const struct casefile *src;
147   struct casefile *dst;
148   unsigned long case_cnt;
149
150   if (!allow_internal_sort)
151     return NULL;
152
153   src = casereader_get_casefile (reader);
154   if (casefile_get_case_cnt (src) > 1 && !casefile_in_core (src))
155     return NULL;
156       
157   case_cnt = casefile_get_case_cnt (src);
158   dst = casefile_create (casefile_get_value_cnt (src));
159   if (case_cnt != 0) 
160     {
161       struct indexed_case *cases = nmalloc (sizeof *cases, case_cnt);
162       if (cases != NULL) 
163         {
164           unsigned long i;
165           
166           for (i = 0; i < case_cnt; i++)
167             {
168               bool ok = casereader_read_xfer (reader, &cases[i].c);
169               if (!ok)
170                 abort ();
171               cases[i].idx = i;
172             }
173
174           sort (cases, case_cnt, sizeof *cases, compare_indexed_cases,
175                 (void *) criteria);
176       
177           for (i = 0; i < case_cnt; i++)
178             casefile_append_xfer (dst, &cases[i].c);
179           if (casefile_error (dst))
180             abort ();
181
182           free (cases);
183         }
184       else 
185         {
186           /* Failure. */
187           casefile_destroy (dst);
188           dst = NULL;
189         }
190     }
191
192   return dst;
193 }
194
195 /* Compares the variables specified by CRITERIA between the cases
196    at A and B, with a "last resort" comparison for stability, and
197    returns a strcmp()-type result. */
198 static int
199 compare_indexed_cases (const void *a_, const void *b_, void *criteria_)
200 {
201   struct sort_criteria *criteria = criteria_;
202   const struct indexed_case *a = a_;
203   const struct indexed_case *b = b_;
204   int result = compare_record (&a->c, &b->c, criteria);
205   if (result == 0)
206     result = a->idx < b->idx ? -1 : a->idx > b->idx;
207   return result;
208 }
209 \f
210 /* External sort. */
211
212 /* Maximum order of merge (external sort only).  The maximum
213    reasonable value is about 7.  Above that, it would be a good
214    idea to use a heap in merge_once() to select the minimum. */
215 #define MAX_MERGE_ORDER 7
216
217 /* Results of an external sort. */
218 struct external_sort 
219   {
220     const struct sort_criteria *criteria; /* Sort criteria. */
221     size_t value_cnt;                 /* Size of data in `union value's. */
222     struct casefile **runs;           /* Array of initial runs. */
223     size_t run_cnt, run_cap;          /* Number of runs, allocated capacity. */
224   };
225
226 /* Prototypes for helper functions. */
227 static int write_runs (struct external_sort *, struct casereader *);
228 static struct casefile *merge (struct external_sort *);
229 static void destroy_external_sort (struct external_sort *);
230
231 /* Performs a stable external sort of the active file according
232    to the specification in SCP.  Forms initial runs using a heap
233    as a reservoir.  Merges the initial runs according to a
234    pattern that assures stability. */
235 static struct casefile *
236 do_external_sort (struct casereader *reader,
237                   const struct sort_criteria *criteria)
238 {
239   struct external_sort *xsrt;
240
241   if (!casefile_to_disk (casereader_get_casefile (reader)))
242     return NULL;
243
244   xsrt = xmalloc (sizeof *xsrt);
245   xsrt->criteria = criteria;
246   xsrt->value_cnt = casefile_get_value_cnt (casereader_get_casefile (reader));
247   xsrt->run_cap = 512;
248   xsrt->run_cnt = 0;
249   xsrt->runs = xnmalloc (xsrt->run_cap, sizeof *xsrt->runs);
250   if (write_runs (xsrt, reader))
251     {
252       struct casefile *output = merge (xsrt);
253       destroy_external_sort (xsrt);
254       return output;
255     }
256   else
257     {
258       destroy_external_sort (xsrt);
259       return NULL;
260     }
261 }
262
263 /* Destroys XSRT. */
264 static void
265 destroy_external_sort (struct external_sort *xsrt) 
266 {
267   if (xsrt != NULL) 
268     {
269       int i;
270       
271       for (i = 0; i < xsrt->run_cnt; i++)
272         casefile_destroy (xsrt->runs[i]);
273       free (xsrt->runs);
274       free (xsrt);
275     }
276 }
277 \f
278 /* Replacement selection. */
279
280 /* Pairs a record with a run number. */
281 struct record_run
282   {
283     int run;                    /* Run number of case. */
284     struct ccase record;        /* Case data. */
285     size_t idx;                 /* Case number (for stability). */
286   };
287
288 /* Represents a set of initial runs during an external sort. */
289 struct initial_run_state 
290   {
291     struct external_sort *xsrt;
292
293     /* Reservoir. */
294     struct record_run *records; /* Records arranged as a heap. */
295     size_t record_cnt;          /* Current number of records. */
296     size_t record_cap;          /* Capacity for records. */
297     
298     /* Run currently being output. */
299     int run;                    /* Run number. */
300     size_t case_cnt;            /* Number of cases so far. */
301     struct casefile *casefile;  /* Output file. */
302     struct ccase last_output;   /* Record last output. */
303
304     int okay;                   /* Zero if an error has been encountered. */
305   };
306
307 static bool destroy_initial_run_state (struct initial_run_state *);
308 static void process_case (struct initial_run_state *, const struct ccase *,
309                           size_t);
310 static int allocate_cases (struct initial_run_state *);
311 static void output_record (struct initial_run_state *);
312 static void start_run (struct initial_run_state *);
313 static void end_run (struct initial_run_state *);
314 static int compare_record_run (const struct record_run *,
315                                const struct record_run *,
316                                struct initial_run_state *);
317 static int compare_record_run_minheap (const void *, const void *, void *);
318
319 /* Reads cases from READER and composes initial runs in XSRT. */
320 static int
321 write_runs (struct external_sort *xsrt, struct casereader *reader)
322 {
323   struct initial_run_state *irs;
324   struct ccase c;
325   size_t idx = 0;
326   int success = 0;
327
328   /* Allocate memory for cases. */
329   irs = xmalloc (sizeof *irs);
330   irs->xsrt = xsrt;
331   irs->records = NULL;
332   irs->record_cnt = irs->record_cap = 0;
333   irs->run = 0;
334   irs->case_cnt = 0;
335   irs->casefile = NULL;
336   case_nullify (&irs->last_output);
337   irs->okay = 1;
338   if (!allocate_cases (irs)) 
339     goto done;
340
341   /* Create initial runs. */
342   start_run (irs);
343   for (; irs->okay && casereader_read (reader, &c); case_destroy (&c))
344     process_case (irs, &c, idx++);
345   while (irs->okay && irs->record_cnt > 0)
346     output_record (irs);
347   end_run (irs);
348
349   success = irs->okay;
350
351  done:
352   if (!destroy_initial_run_state (irs))
353     success = false;
354
355   return success;
356 }
357
358 /* Add a single case to an initial run. */
359 static void
360 process_case (struct initial_run_state *irs, const struct ccase *c, size_t idx)
361 {
362   struct record_run *rr;
363
364   /* Compose record_run for this run and add to heap. */
365   assert (irs->record_cnt < irs->record_cap - 1);
366   rr = irs->records + irs->record_cnt++;
367   case_copy (&rr->record, 0, c, 0, irs->xsrt->value_cnt);
368   rr->run = irs->run;
369   rr->idx = idx;
370   if (!case_is_null (&irs->last_output)
371       && compare_record (c, &irs->last_output, irs->xsrt->criteria) < 0)
372     rr->run = irs->run + 1;
373   push_heap (irs->records, irs->record_cnt, sizeof *irs->records,
374              compare_record_run_minheap, irs);
375
376   /* Output a record if the reservoir is full. */
377   if (irs->record_cnt == irs->record_cap - 1 && irs->okay)
378     output_record (irs);
379 }
380
381 /* Destroys the initial run state represented by IRS.
382    Returns true if successful, false if an I/O error occurred. */
383 static bool
384 destroy_initial_run_state (struct initial_run_state *irs) 
385 {
386   int i;
387   bool ok = true;
388
389   if (irs == NULL)
390     return true;
391
392   for (i = 0; i < irs->record_cap; i++)
393     case_destroy (&irs->records[i].record);
394   free (irs->records);
395
396   if (irs->casefile != NULL)
397     ok = casefile_sleep (irs->casefile);
398
399   free (irs);
400   return ok;
401 }
402
403 /* Allocates room for lots of cases as a buffer. */
404 static int
405 allocate_cases (struct initial_run_state *irs)
406 {
407   int approx_case_cost; /* Approximate memory cost of one case in bytes. */
408   int max_cases;        /* Maximum number of cases to allocate. */
409   int i;
410
411   /* Allocate as many cases as we can within the workspace
412      limit. */
413   approx_case_cost = (sizeof *irs->records
414                       + irs->xsrt->value_cnt * sizeof (union value)
415                       + 4 * sizeof (void *));
416   max_cases = get_workspace() / approx_case_cost;
417   if (max_cases > max_buffers)
418     max_cases = max_buffers;
419   irs->records = nmalloc (sizeof *irs->records, max_cases);
420   if (irs->records != NULL)
421     for (i = 0; i < max_cases; i++)
422       if (!case_try_create (&irs->records[i].record, irs->xsrt->value_cnt))
423         {
424           max_cases = i;
425           break;
426         }
427   irs->record_cap = max_cases;
428
429   /* Fail if we didn't allocate an acceptable number of cases. */
430   if (irs->records == NULL || max_cases < min_buffers)
431     {
432       msg (SE, _("Out of memory.  Could not allocate room for minimum of %d "
433                  "cases of %d bytes each.  (PSPP workspace is currently "
434                  "restricted to a maximum of %d KB.)"),
435            min_buffers, approx_case_cost, get_workspace() / 1024);
436       return 0;
437     }
438   return 1;
439 }
440
441 /* Compares the VAR_CNT variables in VARS[] between the `value's at
442    A and B, and returns a strcmp()-type result. */
443 static int
444 compare_record (const struct ccase *a, const struct ccase *b,
445                 const struct sort_criteria *criteria)
446 {
447   int i;
448
449   assert (a != NULL);
450   assert (b != NULL);
451   
452   for (i = 0; i < criteria->crit_cnt; i++)
453     {
454       const struct sort_criterion *c = &criteria->crits[i];
455       int result;
456       
457       if (c->width == 0)
458         {
459           double af = case_num (a, c->fv);
460           double bf = case_num (b, c->fv);
461           
462           result = af < bf ? -1 : af > bf;
463         }
464       else
465         result = memcmp (case_str (a, c->fv), case_str (b, c->fv), c->width);
466
467       if (result != 0)
468         return c->dir == SRT_ASCEND ? result : -result;
469     }
470
471   return 0;
472 }
473
474 /* Compares record-run tuples A and B on run number first, then
475    on record, then on case index. */
476 static int
477 compare_record_run (const struct record_run *a,
478                     const struct record_run *b,
479                     struct initial_run_state *irs)
480 {
481   int result = a->run < b->run ? -1 : a->run > b->run;
482   if (result == 0)
483     result = compare_record (&a->record, &b->record, irs->xsrt->criteria);
484   if (result == 0)
485     result = a->idx < b->idx ? -1 : a->idx > b->idx;
486   return result;
487 }
488
489 /* Compares record-run tuples A and B on run number first, then
490    on the current record according to SCP, but in descending
491    order. */
492 static int
493 compare_record_run_minheap (const void *a, const void *b, void *irs) 
494 {
495   return -compare_record_run (a, b, irs);
496 }
497
498 /* Begins a new initial run, specifically its output file. */
499 static void
500 start_run (struct initial_run_state *irs)
501 {
502   irs->run++;
503   irs->case_cnt = 0;
504   irs->casefile = casefile_create (irs->xsrt->value_cnt);
505   casefile_to_disk (irs->casefile);
506   case_nullify (&irs->last_output); 
507 }
508
509 /* Ends the current initial run.  */
510 static void
511 end_run (struct initial_run_state *irs)
512 {
513   struct external_sort *xsrt = irs->xsrt;
514
515   /* Record initial run. */
516   if (irs->casefile != NULL) 
517     {
518       casefile_sleep (irs->casefile);
519       if (xsrt->run_cnt >= xsrt->run_cap) 
520         {
521           xsrt->run_cap *= 2;
522           xsrt->runs = xnrealloc (xsrt->runs,
523                                   xsrt->run_cap, sizeof *xsrt->runs);
524         }
525       xsrt->runs[xsrt->run_cnt++] = irs->casefile;
526       if (casefile_error (irs->casefile))
527         irs->okay = false;
528       irs->casefile = NULL; 
529     }
530 }
531
532 /* Writes a record to the current initial run. */
533 static void
534 output_record (struct initial_run_state *irs)
535 {
536   struct record_run *record_run;
537   struct ccase case_tmp;
538   
539   /* Extract minimum case from heap. */
540   assert (irs->record_cnt > 0);
541   pop_heap (irs->records, irs->record_cnt--, sizeof *irs->records,
542             compare_record_run_minheap, irs);
543   record_run = irs->records + irs->record_cnt;
544
545   /* Bail if an error has occurred. */
546   if (!irs->okay)
547     return;
548
549   /* Start new run if necessary. */
550   assert (record_run->run == irs->run
551           || record_run->run == irs->run + 1);
552   if (record_run->run != irs->run)
553     {
554       end_run (irs);
555       start_run (irs);
556     }
557   assert (record_run->run == irs->run);
558   irs->case_cnt++;
559
560   /* Write to disk. */
561   if (irs->casefile != NULL)
562     casefile_append (irs->casefile, &record_run->record);
563
564   /* This record becomes last_output. */
565   irs->last_output = case_tmp = record_run->record;
566   record_run->record = irs->records[irs->record_cap - 1].record;
567   irs->records[irs->record_cap - 1].record = case_tmp;
568 }
569 \f
570 /* Merging. */
571
572 static int choose_merge (struct casefile *runs[], int run_cnt, int order);
573 static struct casefile *merge_once (struct external_sort *,
574                                     struct casefile *[], size_t);
575
576 /* Repeatedly merges run until only one is left,
577    and returns the final casefile.
578    Returns a null pointer if an I/O error occurs. */
579 static struct casefile *
580 merge (struct external_sort *xsrt)
581 {
582   while (xsrt->run_cnt > 1)
583     {
584       int order = min (MAX_MERGE_ORDER, xsrt->run_cnt);
585       int idx = choose_merge (xsrt->runs, xsrt->run_cnt, order);
586       xsrt->runs[idx] = merge_once (xsrt, xsrt->runs + idx, order);
587       remove_range (xsrt->runs, xsrt->run_cnt, sizeof *xsrt->runs,
588                     idx + 1, order - 1);
589       xsrt->run_cnt -= order - 1;
590
591       if (xsrt->runs[idx] == NULL)
592         return NULL;
593     }
594   assert (xsrt->run_cnt == 1);
595   xsrt->run_cnt = 0;
596   return xsrt->runs[0];
597 }
598
599 /* Chooses ORDER runs out of the RUN_CNT runs in RUNS to merge,
600    and returns the index of the first one.
601
602    For stability, we must merge only consecutive runs.  For
603    efficiency, we choose the shortest consecutive sequence of
604    runs. */
605 static int
606 choose_merge (struct casefile *runs[], int run_cnt, int order) 
607 {
608   int min_idx, min_sum;
609   int cur_idx, cur_sum;
610   int i;
611
612   /* Sum up the length of the first ORDER runs. */
613   cur_sum = 0;
614   for (i = 0; i < order; i++)
615     cur_sum += casefile_get_case_cnt (runs[i]);
616
617   /* Find the shortest group of ORDER runs,
618      using a running total for efficiency. */
619   min_idx = 0;
620   min_sum = cur_sum;
621   for (cur_idx = 1; cur_idx + order <= run_cnt; cur_idx++)
622     {
623       cur_sum -= casefile_get_case_cnt (runs[cur_idx - 1]);
624       cur_sum += casefile_get_case_cnt (runs[cur_idx + order - 1]);
625       if (cur_sum < min_sum)
626         {
627           min_sum = cur_sum;
628           min_idx = cur_idx;
629         }
630     }
631
632   return min_idx;
633 }
634
635 /* Merges the RUN_CNT initial runs specified in INPUT_FILES into a
636    new run, and returns the new run.
637    Returns a null pointer if an I/O error occurs. */
638 static struct casefile *
639 merge_once (struct external_sort *xsrt,
640             struct casefile **const input_files,
641             size_t run_cnt)
642 {
643   struct run
644     {
645       struct casefile *file;
646       struct casereader *reader;
647       struct ccase ccase;
648     }
649   *runs;
650
651   struct casefile *output = NULL;
652   int i;
653
654   /* Open input files. */
655   runs = xnmalloc (run_cnt, sizeof *runs);
656   for (i = 0; i < run_cnt; i++) 
657     {
658       struct run *r = &runs[i];
659       r->file = input_files[i];
660       r->reader = casefile_get_destructive_reader (r->file);
661       if (!casereader_read_xfer (r->reader, &r->ccase))
662         {
663           run_cnt--;
664           i--;
665         }
666     }
667
668   /* Create output file. */
669   output = casefile_create (xsrt->value_cnt);
670   casefile_to_disk (output);
671
672   /* Merge. */
673   while (run_cnt > 0) 
674     {
675       struct run *min_run, *run;
676       
677       /* Find minimum. */
678       min_run = runs;
679       for (run = runs + 1; run < runs + run_cnt; run++)
680         if (compare_record (&run->ccase, &min_run->ccase, xsrt->criteria) < 0)
681           min_run = run;
682
683       /* Write minimum to output file. */
684       casefile_append_xfer (output, &min_run->ccase);
685
686       /* Read another case from minimum run. */
687       if (!casereader_read_xfer (min_run->reader, &min_run->ccase))
688         {
689           if (casefile_error (min_run->file) || casefile_error (output))
690             goto error;
691           casereader_destroy (min_run->reader);
692           casefile_destroy (min_run->file);
693
694           remove_element (runs, run_cnt, sizeof *runs, min_run - runs);
695           run_cnt--;
696         } 
697     }
698
699   if (!casefile_sleep (output))
700     goto error;
701   free (runs);
702
703   return output;
704
705  error:
706   for (i = 0; i < run_cnt; i++) 
707     casefile_destroy (runs[i].file);
708   casefile_destroy (output);
709   free (runs);
710   return NULL;
711 }