3ce5da5707cb2ab1d3508eadf3c48a7ad5b6f0a2
[pspp-builds.git] / src / math / sort.c
1 /* PSPP - computes sample statistics.
2    Copyright (C) 1997-9, 2000 Free Software Foundation, Inc.
3    Written by Ben Pfaff <blp@gnu.org>.
4
5    This program is free software; you can redistribute it and/or
6    modify it under the terms of the GNU General Public License as
7    published by the Free Software Foundation; either version 2 of the
8    License, or (at your option) any later version.
9
10    This program is distributed in the hope that it will be useful, but
11    WITHOUT ANY WARRANTY; without even the implied warranty of
12    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13    General Public License for more details.
14
15    You should have received a copy of the GNU General Public License
16    along with this program; if not, write to the Free Software
17    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
18    02110-1301, USA. */
19
20 #include <config.h>
21
22 #include "sort.h"
23
24 #include <errno.h>
25 #include <limits.h>
26 #include <stdbool.h>
27 #include <stdio.h>
28 #include <stdlib.h>
29
30 #include <data/case-source.h>
31 #include <data/case.h>
32 #include <data/casefile.h>
33 #include <data/procedure.h>
34 #include <data/settings.h>
35 #include <data/variable.h>
36 #include <data/storage-stream.h>
37 #include <language/expressions/public.h>
38 #include <libpspp/alloc.h>
39 #include <libpspp/array.h>
40 #include <libpspp/assertion.h>
41 #include <libpspp/message.h>
42 #include <libpspp/message.h>
43 #include <libpspp/misc.h>
44 #include <libpspp/str.h>
45
46 #include "gettext.h"
47 #define _(msgid) gettext (msgid)
48
49 /* These should only be changed for testing purposes. */
50 int min_buffers = 64;
51 int max_buffers = INT_MAX;
52 bool allow_internal_sort = true;
53
54 static int compare_record (const struct ccase *, const struct ccase *,
55                            const struct sort_criteria *);
56 static struct casefile *do_internal_sort (struct casereader *,
57                                           const struct sort_criteria *);
58 static struct casefile *do_external_sort (struct casereader *,
59                                           const struct sort_criteria *);
60
61 /* Get ready to sort the active file. */
62 static void
63 prepare_to_sort_active_file (void) 
64 {
65   proc_cancel_temporary_transformations (); 
66 }
67
68 /* Sorts the active file in-place according to CRITERIA.
69    Returns nonzero if successful. */
70 int
71 sort_active_file_in_place (const struct sort_criteria *criteria) 
72 {
73   struct casefile *in, *out;
74
75   prepare_to_sort_active_file ();
76   if (!procedure (NULL, NULL))
77     return 0;
78   
79   in = proc_capture_output ();
80   out = sort_execute (casefile_get_destructive_reader (in), criteria);
81   if (out == NULL) 
82     return 0;
83
84   proc_set_source (storage_source_create (out));
85   return 1;
86 }
87
88 /* Data passed to sort_to_casefile_callback(). */
89 struct sort_to_casefile_cb_data 
90   {
91     const struct sort_criteria *criteria;
92     struct casefile *output;
93   };
94
95 /* Sorts casefile CF according to the criteria in CB_DATA. */
96 static bool
97 sort_to_casefile_callback (const struct casefile *cf, void *cb_data_) 
98 {
99   struct sort_to_casefile_cb_data *cb_data = cb_data_;
100   cb_data->output = sort_execute (casefile_get_reader (cf), cb_data->criteria);
101   return cb_data->output != NULL;
102 }
103
104 /* Sorts the active file to a separate casefile.  If successful,
105    returns the sorted casefile.  Returns a null pointer on
106    failure. */
107 struct casefile *
108 sort_active_file_to_casefile (const struct sort_criteria *criteria) 
109 {
110   struct sort_to_casefile_cb_data cb_data;
111   
112   prepare_to_sort_active_file ();
113
114   cb_data.criteria = criteria;
115   cb_data.output = NULL;
116   if (!multipass_procedure (sort_to_casefile_callback, &cb_data)) 
117     {
118       casefile_destroy (cb_data.output);
119       return NULL;
120     }
121   return cb_data.output;
122 }
123
124
125 /* Reads all the cases from READER, which is destroyed.  Sorts
126    the cases according to CRITERIA.  Returns the sorted cases in
127    a newly created casefile. */
128 struct casefile *
129 sort_execute (struct casereader *reader, const struct sort_criteria *criteria)
130 {
131   struct casefile *output = do_internal_sort (reader, criteria);
132   if (output == NULL)
133     output = do_external_sort (reader, criteria);
134   casereader_destroy (reader);
135   return output;
136 }
137 \f
138 /* A case and its index. */
139 struct indexed_case 
140   {
141     struct ccase c;     /* Case. */
142     unsigned long idx;  /* Index to allow for stable sorting. */
143   };
144
145 static int compare_indexed_cases (const void *, const void *, void *);
146
147 /* If the data is in memory, do an internal sort and return a new
148    casefile for the data.  Otherwise, return a null pointer. */
149 static struct casefile *
150 do_internal_sort (struct casereader *reader,
151                   const struct sort_criteria *criteria)
152 {
153   const struct casefile *src;
154   struct casefile *dst;
155   unsigned long case_cnt;
156
157   if (!allow_internal_sort)
158     return NULL;
159
160   src = casereader_get_casefile (reader);
161   if (casefile_get_case_cnt (src) > 1 && !casefile_in_core (src))
162     return NULL;
163       
164   case_cnt = casefile_get_case_cnt (src);
165   dst = casefile_create (casefile_get_value_cnt (src));
166   if (case_cnt != 0) 
167     {
168       struct indexed_case *cases = nmalloc (sizeof *cases, case_cnt);
169       if (cases != NULL) 
170         {
171           unsigned long i;
172           
173           for (i = 0; i < case_cnt; i++)
174             {
175               bool ok = casereader_read_xfer (reader, &cases[i].c);
176               if (!ok)
177                 NOT_REACHED ();
178               cases[i].idx = i;
179             }
180
181           sort (cases, case_cnt, sizeof *cases, compare_indexed_cases,
182                 (void *) criteria);
183       
184           for (i = 0; i < case_cnt; i++)
185             casefile_append_xfer (dst, &cases[i].c);
186           if (casefile_error (dst))
187             NOT_REACHED ();
188
189           free (cases);
190         }
191       else 
192         {
193           /* Failure. */
194           casefile_destroy (dst);
195           dst = NULL;
196         }
197     }
198
199   return dst;
200 }
201
202 /* Compares the variables specified by CRITERIA between the cases
203    at A and B, with a "last resort" comparison for stability, and
204    returns a strcmp()-type result. */
205 static int
206 compare_indexed_cases (const void *a_, const void *b_, void *criteria_)
207 {
208   struct sort_criteria *criteria = criteria_;
209   const struct indexed_case *a = a_;
210   const struct indexed_case *b = b_;
211   int result = compare_record (&a->c, &b->c, criteria);
212   if (result == 0)
213     result = a->idx < b->idx ? -1 : a->idx > b->idx;
214   return result;
215 }
216 \f
217 /* External sort. */
218
219 /* Maximum order of merge (external sort only).  The maximum
220    reasonable value is about 7.  Above that, it would be a good
221    idea to use a heap in merge_once() to select the minimum. */
222 #define MAX_MERGE_ORDER 7
223
224 /* Results of an external sort. */
225 struct external_sort 
226   {
227     const struct sort_criteria *criteria; /* Sort criteria. */
228     size_t value_cnt;                 /* Size of data in `union value's. */
229     struct casefile **runs;           /* Array of initial runs. */
230     size_t run_cnt, run_cap;          /* Number of runs, allocated capacity. */
231   };
232
233 /* Prototypes for helper functions. */
234 static int write_runs (struct external_sort *, struct casereader *);
235 static struct casefile *merge (struct external_sort *);
236 static void destroy_external_sort (struct external_sort *);
237
238 /* Performs a stable external sort of the active file according
239    to the specification in SCP.  Forms initial runs using a heap
240    as a reservoir.  Merges the initial runs according to a
241    pattern that assures stability. */
242 static struct casefile *
243 do_external_sort (struct casereader *reader,
244                   const struct sort_criteria *criteria)
245 {
246   struct external_sort *xsrt;
247
248   if (!casefile_to_disk (casereader_get_casefile (reader)))
249     return NULL;
250
251   xsrt = xmalloc (sizeof *xsrt);
252   xsrt->criteria = criteria;
253   xsrt->value_cnt = casefile_get_value_cnt (casereader_get_casefile (reader));
254   xsrt->run_cap = 512;
255   xsrt->run_cnt = 0;
256   xsrt->runs = xnmalloc (xsrt->run_cap, sizeof *xsrt->runs);
257   if (write_runs (xsrt, reader))
258     {
259       struct casefile *output = merge (xsrt);
260       destroy_external_sort (xsrt);
261       return output;
262     }
263   else
264     {
265       destroy_external_sort (xsrt);
266       return NULL;
267     }
268 }
269
270 /* Destroys XSRT. */
271 static void
272 destroy_external_sort (struct external_sort *xsrt) 
273 {
274   if (xsrt != NULL) 
275     {
276       int i;
277       
278       for (i = 0; i < xsrt->run_cnt; i++)
279         casefile_destroy (xsrt->runs[i]);
280       free (xsrt->runs);
281       free (xsrt);
282     }
283 }
284 \f
285 /* Replacement selection. */
286
287 /* Pairs a record with a run number. */
288 struct record_run
289   {
290     int run;                    /* Run number of case. */
291     struct ccase record;        /* Case data. */
292     size_t idx;                 /* Case number (for stability). */
293   };
294
295 /* Represents a set of initial runs during an external sort. */
296 struct initial_run_state 
297   {
298     struct external_sort *xsrt;
299
300     /* Reservoir. */
301     struct record_run *records; /* Records arranged as a heap. */
302     size_t record_cnt;          /* Current number of records. */
303     size_t record_cap;          /* Capacity for records. */
304     
305     /* Run currently being output. */
306     int run;                    /* Run number. */
307     size_t case_cnt;            /* Number of cases so far. */
308     struct casefile *casefile;  /* Output file. */
309     struct ccase last_output;   /* Record last output. */
310
311     int okay;                   /* Zero if an error has been encountered. */
312   };
313
314 static bool destroy_initial_run_state (struct initial_run_state *);
315 static void process_case (struct initial_run_state *, const struct ccase *,
316                           size_t);
317 static int allocate_cases (struct initial_run_state *);
318 static void output_record (struct initial_run_state *);
319 static void start_run (struct initial_run_state *);
320 static void end_run (struct initial_run_state *);
321 static int compare_record_run (const struct record_run *,
322                                const struct record_run *,
323                                struct initial_run_state *);
324 static int compare_record_run_minheap (const void *, const void *, void *);
325
326 /* Reads cases from READER and composes initial runs in XSRT. */
327 static int
328 write_runs (struct external_sort *xsrt, struct casereader *reader)
329 {
330   struct initial_run_state *irs;
331   struct ccase c;
332   size_t idx = 0;
333   int success = 0;
334
335   /* Allocate memory for cases. */
336   irs = xmalloc (sizeof *irs);
337   irs->xsrt = xsrt;
338   irs->records = NULL;
339   irs->record_cnt = irs->record_cap = 0;
340   irs->run = 0;
341   irs->case_cnt = 0;
342   irs->casefile = NULL;
343   case_nullify (&irs->last_output);
344   irs->okay = 1;
345   if (!allocate_cases (irs)) 
346     goto done;
347
348   /* Create initial runs. */
349   start_run (irs);
350   for (; irs->okay && casereader_read (reader, &c); case_destroy (&c))
351     process_case (irs, &c, idx++);
352   while (irs->okay && irs->record_cnt > 0)
353     output_record (irs);
354   end_run (irs);
355
356   success = irs->okay;
357
358  done:
359   if (!destroy_initial_run_state (irs))
360     success = false;
361
362   return success;
363 }
364
365 /* Add a single case to an initial run. */
366 static void
367 process_case (struct initial_run_state *irs, const struct ccase *c, size_t idx)
368 {
369   struct record_run *rr;
370
371   /* Compose record_run for this run and add to heap. */
372   assert (irs->record_cnt < irs->record_cap - 1);
373   rr = irs->records + irs->record_cnt++;
374   case_copy (&rr->record, 0, c, 0, irs->xsrt->value_cnt);
375   rr->run = irs->run;
376   rr->idx = idx;
377   if (!case_is_null (&irs->last_output)
378       && compare_record (c, &irs->last_output, irs->xsrt->criteria) < 0)
379     rr->run = irs->run + 1;
380   push_heap (irs->records, irs->record_cnt, sizeof *irs->records,
381              compare_record_run_minheap, irs);
382
383   /* Output a record if the reservoir is full. */
384   if (irs->record_cnt == irs->record_cap - 1 && irs->okay)
385     output_record (irs);
386 }
387
388 /* Destroys the initial run state represented by IRS.
389    Returns true if successful, false if an I/O error occurred. */
390 static bool
391 destroy_initial_run_state (struct initial_run_state *irs) 
392 {
393   int i;
394   bool ok = true;
395
396   if (irs == NULL)
397     return true;
398
399   for (i = 0; i < irs->record_cap; i++)
400     case_destroy (&irs->records[i].record);
401   free (irs->records);
402
403   if (irs->casefile != NULL)
404     ok = casefile_sleep (irs->casefile);
405
406   free (irs);
407   return ok;
408 }
409
410 /* Allocates room for lots of cases as a buffer. */
411 static int
412 allocate_cases (struct initial_run_state *irs)
413 {
414   int approx_case_cost; /* Approximate memory cost of one case in bytes. */
415   int max_cases;        /* Maximum number of cases to allocate. */
416   int i;
417
418   /* Allocate as many cases as we can within the workspace
419      limit. */
420   approx_case_cost = (sizeof *irs->records
421                       + irs->xsrt->value_cnt * sizeof (union value)
422                       + 4 * sizeof (void *));
423   max_cases = get_workspace() / approx_case_cost;
424   if (max_cases > max_buffers)
425     max_cases = max_buffers;
426   irs->records = nmalloc (sizeof *irs->records, max_cases);
427   if (irs->records != NULL)
428     for (i = 0; i < max_cases; i++)
429       if (!case_try_create (&irs->records[i].record, irs->xsrt->value_cnt))
430         {
431           max_cases = i;
432           break;
433         }
434   irs->record_cap = max_cases;
435
436   /* Fail if we didn't allocate an acceptable number of cases. */
437   if (irs->records == NULL || max_cases < min_buffers)
438     {
439       msg (SE, _("Out of memory.  Could not allocate room for minimum of %d "
440                  "cases of %d bytes each.  (PSPP workspace is currently "
441                  "restricted to a maximum of %d KB.)"),
442            min_buffers, approx_case_cost, get_workspace() / 1024);
443       return 0;
444     }
445   return 1;
446 }
447
448 /* Compares the VAR_CNT variables in VARS[] between the `value's at
449    A and B, and returns a strcmp()-type result. */
450 static int
451 compare_record (const struct ccase *a, const struct ccase *b,
452                 const struct sort_criteria *criteria)
453 {
454   int i;
455
456   assert (a != NULL);
457   assert (b != NULL);
458   
459   for (i = 0; i < criteria->crit_cnt; i++)
460     {
461       const struct sort_criterion *c = &criteria->crits[i];
462       int result;
463       
464       if (c->width == 0)
465         {
466           double af = case_num (a, c->fv);
467           double bf = case_num (b, c->fv);
468           
469           result = af < bf ? -1 : af > bf;
470         }
471       else
472         result = memcmp (case_str (a, c->fv), case_str (b, c->fv), c->width);
473
474       if (result != 0)
475         return c->dir == SRT_ASCEND ? result : -result;
476     }
477
478   return 0;
479 }
480
481 /* Compares record-run tuples A and B on run number first, then
482    on record, then on case index. */
483 static int
484 compare_record_run (const struct record_run *a,
485                     const struct record_run *b,
486                     struct initial_run_state *irs)
487 {
488   int result = a->run < b->run ? -1 : a->run > b->run;
489   if (result == 0)
490     result = compare_record (&a->record, &b->record, irs->xsrt->criteria);
491   if (result == 0)
492     result = a->idx < b->idx ? -1 : a->idx > b->idx;
493   return result;
494 }
495
496 /* Compares record-run tuples A and B on run number first, then
497    on the current record according to SCP, but in descending
498    order. */
499 static int
500 compare_record_run_minheap (const void *a, const void *b, void *irs) 
501 {
502   return -compare_record_run (a, b, irs);
503 }
504
505 /* Begins a new initial run, specifically its output file. */
506 static void
507 start_run (struct initial_run_state *irs)
508 {
509   irs->run++;
510   irs->case_cnt = 0;
511   irs->casefile = casefile_create (irs->xsrt->value_cnt);
512   casefile_to_disk (irs->casefile);
513   case_nullify (&irs->last_output); 
514 }
515
516 /* Ends the current initial run.  */
517 static void
518 end_run (struct initial_run_state *irs)
519 {
520   struct external_sort *xsrt = irs->xsrt;
521
522   /* Record initial run. */
523   if (irs->casefile != NULL) 
524     {
525       casefile_sleep (irs->casefile);
526       if (xsrt->run_cnt >= xsrt->run_cap) 
527         {
528           xsrt->run_cap *= 2;
529           xsrt->runs = xnrealloc (xsrt->runs,
530                                   xsrt->run_cap, sizeof *xsrt->runs);
531         }
532       xsrt->runs[xsrt->run_cnt++] = irs->casefile;
533       if (casefile_error (irs->casefile))
534         irs->okay = false;
535       irs->casefile = NULL; 
536     }
537 }
538
539 /* Writes a record to the current initial run. */
540 static void
541 output_record (struct initial_run_state *irs)
542 {
543   struct record_run *record_run;
544   struct ccase case_tmp;
545   
546   /* Extract minimum case from heap. */
547   assert (irs->record_cnt > 0);
548   pop_heap (irs->records, irs->record_cnt--, sizeof *irs->records,
549             compare_record_run_minheap, irs);
550   record_run = irs->records + irs->record_cnt;
551
552   /* Bail if an error has occurred. */
553   if (!irs->okay)
554     return;
555
556   /* Start new run if necessary. */
557   assert (record_run->run == irs->run
558           || record_run->run == irs->run + 1);
559   if (record_run->run != irs->run)
560     {
561       end_run (irs);
562       start_run (irs);
563     }
564   assert (record_run->run == irs->run);
565   irs->case_cnt++;
566
567   /* Write to disk. */
568   if (irs->casefile != NULL)
569     casefile_append (irs->casefile, &record_run->record);
570
571   /* This record becomes last_output. */
572   irs->last_output = case_tmp = record_run->record;
573   record_run->record = irs->records[irs->record_cap - 1].record;
574   irs->records[irs->record_cap - 1].record = case_tmp;
575 }
576 \f
577 /* Merging. */
578
579 static int choose_merge (struct casefile *runs[], int run_cnt, int order);
580 static struct casefile *merge_once (struct external_sort *,
581                                     struct casefile *[], size_t);
582
583 /* Repeatedly merges run until only one is left,
584    and returns the final casefile.
585    Returns a null pointer if an I/O error occurs. */
586 static struct casefile *
587 merge (struct external_sort *xsrt)
588 {
589   while (xsrt->run_cnt > 1)
590     {
591       int order = min (MAX_MERGE_ORDER, xsrt->run_cnt);
592       int idx = choose_merge (xsrt->runs, xsrt->run_cnt, order);
593       xsrt->runs[idx] = merge_once (xsrt, xsrt->runs + idx, order);
594       remove_range (xsrt->runs, xsrt->run_cnt, sizeof *xsrt->runs,
595                     idx + 1, order - 1);
596       xsrt->run_cnt -= order - 1;
597
598       if (xsrt->runs[idx] == NULL)
599         return NULL;
600     }
601   assert (xsrt->run_cnt == 1);
602   xsrt->run_cnt = 0;
603   return xsrt->runs[0];
604 }
605
606 /* Chooses ORDER runs out of the RUN_CNT runs in RUNS to merge,
607    and returns the index of the first one.
608
609    For stability, we must merge only consecutive runs.  For
610    efficiency, we choose the shortest consecutive sequence of
611    runs. */
612 static int
613 choose_merge (struct casefile *runs[], int run_cnt, int order) 
614 {
615   int min_idx, min_sum;
616   int cur_idx, cur_sum;
617   int i;
618
619   /* Sum up the length of the first ORDER runs. */
620   cur_sum = 0;
621   for (i = 0; i < order; i++)
622     cur_sum += casefile_get_case_cnt (runs[i]);
623
624   /* Find the shortest group of ORDER runs,
625      using a running total for efficiency. */
626   min_idx = 0;
627   min_sum = cur_sum;
628   for (cur_idx = 1; cur_idx + order <= run_cnt; cur_idx++)
629     {
630       cur_sum -= casefile_get_case_cnt (runs[cur_idx - 1]);
631       cur_sum += casefile_get_case_cnt (runs[cur_idx + order - 1]);
632       if (cur_sum < min_sum)
633         {
634           min_sum = cur_sum;
635           min_idx = cur_idx;
636         }
637     }
638
639   return min_idx;
640 }
641
642 /* Merges the RUN_CNT initial runs specified in INPUT_FILES into a
643    new run, and returns the new run.
644    Returns a null pointer if an I/O error occurs. */
645 static struct casefile *
646 merge_once (struct external_sort *xsrt,
647             struct casefile **const input_files,
648             size_t run_cnt)
649 {
650   struct run
651     {
652       struct casefile *file;
653       struct casereader *reader;
654       struct ccase ccase;
655     }
656   *runs;
657
658   struct casefile *output = NULL;
659   int i;
660
661   /* Open input files. */
662   runs = xnmalloc (run_cnt, sizeof *runs);
663   for (i = 0; i < run_cnt; i++) 
664     {
665       struct run *r = &runs[i];
666       r->file = input_files[i];
667       r->reader = casefile_get_destructive_reader (r->file);
668       if (!casereader_read_xfer (r->reader, &r->ccase))
669         {
670           run_cnt--;
671           i--;
672         }
673     }
674
675   /* Create output file. */
676   output = casefile_create (xsrt->value_cnt);
677   casefile_to_disk (output);
678
679   /* Merge. */
680   while (run_cnt > 0) 
681     {
682       struct run *min_run, *run;
683       
684       /* Find minimum. */
685       min_run = runs;
686       for (run = runs + 1; run < runs + run_cnt; run++)
687         if (compare_record (&run->ccase, &min_run->ccase, xsrt->criteria) < 0)
688           min_run = run;
689
690       /* Write minimum to output file. */
691       casefile_append_xfer (output, &min_run->ccase);
692
693       /* Read another case from minimum run. */
694       if (!casereader_read_xfer (min_run->reader, &min_run->ccase))
695         {
696           if (casefile_error (min_run->file) || casefile_error (output))
697             goto error;
698           casereader_destroy (min_run->reader);
699           casefile_destroy (min_run->file);
700
701           remove_element (runs, run_cnt, sizeof *runs, min_run - runs);
702           run_cnt--;
703         } 
704     }
705
706   if (!casefile_sleep (output))
707     goto error;
708   free (runs);
709
710   return output;
711
712  error:
713   for (i = 0; i < run_cnt; i++) 
714     casefile_destroy (runs[i].file);
715   casefile_destroy (output);
716   free (runs);
717   return NULL;
718 }