1d2257b246ded9d0f33f1aa3d42ad5ea40592ef5
[pspp-builds.git] / src / math / sort.c
1 /* PSPP - computes sample statistics.
2    Copyright (C) 1997-9, 2000 Free Software Foundation, Inc.
3    Written by Ben Pfaff <blp@gnu.org>.
4
5    This program is free software; you can redistribute it and/or
6    modify it under the terms of the GNU General Public License as
7    published by the Free Software Foundation; either version 2 of the
8    License, or (at your option) any later version.
9
10    This program is distributed in the hope that it will be useful, but
11    WITHOUT ANY WARRANTY; without even the implied warranty of
12    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13    General Public License for more details.
14
15    You should have received a copy of the GNU General Public License
16    along with this program; if not, write to the Free Software
17    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
18    02110-1301, USA. */
19
20 #include <config.h>
21
22 #include "sort.h"
23
24 #include <errno.h>
25 #include <limits.h>
26 #include <stdbool.h>
27 #include <stdio.h>
28 #include <stdlib.h>
29
30 #include <data/case-source.h>
31 #include <data/case.h>
32 #include <data/casefile.h>
33 #include <data/procedure.h>
34 #include <data/settings.h>
35 #include <data/variable.h>
36 #include <data/storage-stream.h>
37 #include <language/expressions/public.h>
38 #include <libpspp/alloc.h>
39 #include <libpspp/array.h>
40 #include <libpspp/message.h>
41 #include <libpspp/message.h>
42 #include <libpspp/misc.h>
43 #include <libpspp/str.h>
44
45 #include "gettext.h"
46 #define _(msgid) gettext (msgid)
47
48 /* These should only be changed for testing purposes. */
49 int min_buffers = 64;
50 int max_buffers = INT_MAX;
51 bool allow_internal_sort = true;
52
53 static int compare_record (const struct ccase *, const struct ccase *,
54                            const struct sort_criteria *);
55 static struct casefile *do_internal_sort (struct casereader *,
56                                           const struct sort_criteria *);
57 static struct casefile *do_external_sort (struct casereader *,
58                                           const struct sort_criteria *);
59
60 /* Get ready to sort the active file. */
61 static void
62 prepare_to_sort_active_file (void) 
63 {
64   proc_cancel_temporary_transformations (); 
65   expr_free (process_if_expr);
66   process_if_expr = NULL;
67 }
68
69 /* Sorts the active file in-place according to CRITERIA.
70    Returns nonzero if successful. */
71 int
72 sort_active_file_in_place (const struct sort_criteria *criteria) 
73 {
74   struct casefile *in, *out;
75
76   prepare_to_sort_active_file ();
77   if (!procedure (NULL, NULL))
78     return 0;
79   
80   in = proc_capture_output ();
81   out = sort_execute (casefile_get_destructive_reader (in), criteria);
82   if (out == NULL) 
83     return 0;
84
85   proc_set_source (storage_source_create (out));
86   return 1;
87 }
88
89 /* Data passed to sort_to_casefile_callback(). */
90 struct sort_to_casefile_cb_data 
91   {
92     const struct sort_criteria *criteria;
93     struct casefile *output;
94   };
95
96 /* Sorts casefile CF according to the criteria in CB_DATA. */
97 static bool
98 sort_to_casefile_callback (const struct casefile *cf, void *cb_data_) 
99 {
100   struct sort_to_casefile_cb_data *cb_data = cb_data_;
101   cb_data->output = sort_execute (casefile_get_reader (cf), cb_data->criteria);
102   return cb_data->output != NULL;
103 }
104
105 /* Sorts the active file to a separate casefile.  If successful,
106    returns the sorted casefile.  Returns a null pointer on
107    failure. */
108 struct casefile *
109 sort_active_file_to_casefile (const struct sort_criteria *criteria) 
110 {
111   struct sort_to_casefile_cb_data cb_data;
112   
113   prepare_to_sort_active_file ();
114
115   cb_data.criteria = criteria;
116   cb_data.output = NULL;
117   multipass_procedure (sort_to_casefile_callback, &cb_data);
118
119   return cb_data.output;
120 }
121
122
123 /* Reads all the cases from READER, which is destroyed.  Sorts
124    the cases according to CRITERIA.  Returns the sorted cases in
125    a newly created casefile. */
126 struct casefile *
127 sort_execute (struct casereader *reader, const struct sort_criteria *criteria)
128 {
129   struct casefile *output = do_internal_sort (reader, criteria);
130   if (output == NULL)
131     output = do_external_sort (reader, criteria);
132   casereader_destroy (reader);
133   return output;
134 }
135 \f
136 /* A case and its index. */
137 struct indexed_case 
138   {
139     struct ccase c;     /* Case. */
140     unsigned long idx;  /* Index to allow for stable sorting. */
141   };
142
143 static int compare_indexed_cases (const void *, const void *, void *);
144
145 /* If the data is in memory, do an internal sort and return a new
146    casefile for the data.  Otherwise, return a null pointer. */
147 static struct casefile *
148 do_internal_sort (struct casereader *reader,
149                   const struct sort_criteria *criteria)
150 {
151   const struct casefile *src;
152   struct casefile *dst;
153   unsigned long case_cnt;
154
155   if (!allow_internal_sort)
156     return NULL;
157
158   src = casereader_get_casefile (reader);
159   if (casefile_get_case_cnt (src) > 1 && !casefile_in_core (src))
160     return NULL;
161       
162   case_cnt = casefile_get_case_cnt (src);
163   dst = casefile_create (casefile_get_value_cnt (src));
164   if (case_cnt != 0) 
165     {
166       struct indexed_case *cases = nmalloc (sizeof *cases, case_cnt);
167       if (cases != NULL) 
168         {
169           unsigned long i;
170           
171           for (i = 0; i < case_cnt; i++)
172             {
173               bool ok = casereader_read_xfer (reader, &cases[i].c);
174               if (!ok)
175                 abort ();
176               cases[i].idx = i;
177             }
178
179           sort (cases, case_cnt, sizeof *cases, compare_indexed_cases,
180                 (void *) criteria);
181       
182           for (i = 0; i < case_cnt; i++)
183             casefile_append_xfer (dst, &cases[i].c);
184           if (casefile_error (dst))
185             abort ();
186
187           free (cases);
188         }
189       else 
190         {
191           /* Failure. */
192           casefile_destroy (dst);
193           dst = NULL;
194         }
195     }
196
197   return dst;
198 }
199
200 /* Compares the variables specified by CRITERIA between the cases
201    at A and B, with a "last resort" comparison for stability, and
202    returns a strcmp()-type result. */
203 static int
204 compare_indexed_cases (const void *a_, const void *b_, void *criteria_)
205 {
206   struct sort_criteria *criteria = criteria_;
207   const struct indexed_case *a = a_;
208   const struct indexed_case *b = b_;
209   int result = compare_record (&a->c, &b->c, criteria);
210   if (result == 0)
211     result = a->idx < b->idx ? -1 : a->idx > b->idx;
212   return result;
213 }
214 \f
215 /* External sort. */
216
217 /* Maximum order of merge (external sort only).  The maximum
218    reasonable value is about 7.  Above that, it would be a good
219    idea to use a heap in merge_once() to select the minimum. */
220 #define MAX_MERGE_ORDER 7
221
222 /* Results of an external sort. */
223 struct external_sort 
224   {
225     const struct sort_criteria *criteria; /* Sort criteria. */
226     size_t value_cnt;                 /* Size of data in `union value's. */
227     struct casefile **runs;           /* Array of initial runs. */
228     size_t run_cnt, run_cap;          /* Number of runs, allocated capacity. */
229   };
230
231 /* Prototypes for helper functions. */
232 static int write_runs (struct external_sort *, struct casereader *);
233 static struct casefile *merge (struct external_sort *);
234 static void destroy_external_sort (struct external_sort *);
235
236 /* Performs a stable external sort of the active file according
237    to the specification in SCP.  Forms initial runs using a heap
238    as a reservoir.  Merges the initial runs according to a
239    pattern that assures stability. */
240 static struct casefile *
241 do_external_sort (struct casereader *reader,
242                   const struct sort_criteria *criteria)
243 {
244   struct external_sort *xsrt;
245
246   if (!casefile_to_disk (casereader_get_casefile (reader)))
247     return NULL;
248
249   xsrt = xmalloc (sizeof *xsrt);
250   xsrt->criteria = criteria;
251   xsrt->value_cnt = casefile_get_value_cnt (casereader_get_casefile (reader));
252   xsrt->run_cap = 512;
253   xsrt->run_cnt = 0;
254   xsrt->runs = xnmalloc (xsrt->run_cap, sizeof *xsrt->runs);
255   if (write_runs (xsrt, reader))
256     {
257       struct casefile *output = merge (xsrt);
258       destroy_external_sort (xsrt);
259       return output;
260     }
261   else
262     {
263       destroy_external_sort (xsrt);
264       return NULL;
265     }
266 }
267
268 /* Destroys XSRT. */
269 static void
270 destroy_external_sort (struct external_sort *xsrt) 
271 {
272   if (xsrt != NULL) 
273     {
274       int i;
275       
276       for (i = 0; i < xsrt->run_cnt; i++)
277         casefile_destroy (xsrt->runs[i]);
278       free (xsrt->runs);
279       free (xsrt);
280     }
281 }
282 \f
283 /* Replacement selection. */
284
285 /* Pairs a record with a run number. */
286 struct record_run
287   {
288     int run;                    /* Run number of case. */
289     struct ccase record;        /* Case data. */
290     size_t idx;                 /* Case number (for stability). */
291   };
292
293 /* Represents a set of initial runs during an external sort. */
294 struct initial_run_state 
295   {
296     struct external_sort *xsrt;
297
298     /* Reservoir. */
299     struct record_run *records; /* Records arranged as a heap. */
300     size_t record_cnt;          /* Current number of records. */
301     size_t record_cap;          /* Capacity for records. */
302     
303     /* Run currently being output. */
304     int run;                    /* Run number. */
305     size_t case_cnt;            /* Number of cases so far. */
306     struct casefile *casefile;  /* Output file. */
307     struct ccase last_output;   /* Record last output. */
308
309     int okay;                   /* Zero if an error has been encountered. */
310   };
311
312 static bool destroy_initial_run_state (struct initial_run_state *);
313 static void process_case (struct initial_run_state *, const struct ccase *,
314                           size_t);
315 static int allocate_cases (struct initial_run_state *);
316 static void output_record (struct initial_run_state *);
317 static void start_run (struct initial_run_state *);
318 static void end_run (struct initial_run_state *);
319 static int compare_record_run (const struct record_run *,
320                                const struct record_run *,
321                                struct initial_run_state *);
322 static int compare_record_run_minheap (const void *, const void *, void *);
323
324 /* Reads cases from READER and composes initial runs in XSRT. */
325 static int
326 write_runs (struct external_sort *xsrt, struct casereader *reader)
327 {
328   struct initial_run_state *irs;
329   struct ccase c;
330   size_t idx = 0;
331   int success = 0;
332
333   /* Allocate memory for cases. */
334   irs = xmalloc (sizeof *irs);
335   irs->xsrt = xsrt;
336   irs->records = NULL;
337   irs->record_cnt = irs->record_cap = 0;
338   irs->run = 0;
339   irs->case_cnt = 0;
340   irs->casefile = NULL;
341   case_nullify (&irs->last_output);
342   irs->okay = 1;
343   if (!allocate_cases (irs)) 
344     goto done;
345
346   /* Create initial runs. */
347   start_run (irs);
348   for (; irs->okay && casereader_read (reader, &c); case_destroy (&c))
349     process_case (irs, &c, idx++);
350   while (irs->okay && irs->record_cnt > 0)
351     output_record (irs);
352   end_run (irs);
353
354   success = irs->okay;
355
356  done:
357   if (!destroy_initial_run_state (irs))
358     success = false;
359
360   return success;
361 }
362
363 /* Add a single case to an initial run. */
364 static void
365 process_case (struct initial_run_state *irs, const struct ccase *c, size_t idx)
366 {
367   struct record_run *rr;
368
369   /* Compose record_run for this run and add to heap. */
370   assert (irs->record_cnt < irs->record_cap - 1);
371   rr = irs->records + irs->record_cnt++;
372   case_copy (&rr->record, 0, c, 0, irs->xsrt->value_cnt);
373   rr->run = irs->run;
374   rr->idx = idx;
375   if (!case_is_null (&irs->last_output)
376       && compare_record (c, &irs->last_output, irs->xsrt->criteria) < 0)
377     rr->run = irs->run + 1;
378   push_heap (irs->records, irs->record_cnt, sizeof *irs->records,
379              compare_record_run_minheap, irs);
380
381   /* Output a record if the reservoir is full. */
382   if (irs->record_cnt == irs->record_cap - 1 && irs->okay)
383     output_record (irs);
384 }
385
386 /* Destroys the initial run state represented by IRS.
387    Returns true if successful, false if an I/O error occurred. */
388 static bool
389 destroy_initial_run_state (struct initial_run_state *irs) 
390 {
391   int i;
392   bool ok = true;
393
394   if (irs == NULL)
395     return true;
396
397   for (i = 0; i < irs->record_cap; i++)
398     case_destroy (&irs->records[i].record);
399   free (irs->records);
400
401   if (irs->casefile != NULL)
402     ok = casefile_sleep (irs->casefile);
403
404   free (irs);
405   return ok;
406 }
407
408 /* Allocates room for lots of cases as a buffer. */
409 static int
410 allocate_cases (struct initial_run_state *irs)
411 {
412   int approx_case_cost; /* Approximate memory cost of one case in bytes. */
413   int max_cases;        /* Maximum number of cases to allocate. */
414   int i;
415
416   /* Allocate as many cases as we can within the workspace
417      limit. */
418   approx_case_cost = (sizeof *irs->records
419                       + irs->xsrt->value_cnt * sizeof (union value)
420                       + 4 * sizeof (void *));
421   max_cases = get_workspace() / approx_case_cost;
422   if (max_cases > max_buffers)
423     max_cases = max_buffers;
424   irs->records = nmalloc (sizeof *irs->records, max_cases);
425   if (irs->records != NULL)
426     for (i = 0; i < max_cases; i++)
427       if (!case_try_create (&irs->records[i].record, irs->xsrt->value_cnt))
428         {
429           max_cases = i;
430           break;
431         }
432   irs->record_cap = max_cases;
433
434   /* Fail if we didn't allocate an acceptable number of cases. */
435   if (irs->records == NULL || max_cases < min_buffers)
436     {
437       msg (SE, _("Out of memory.  Could not allocate room for minimum of %d "
438                  "cases of %d bytes each.  (PSPP workspace is currently "
439                  "restricted to a maximum of %d KB.)"),
440            min_buffers, approx_case_cost, get_workspace() / 1024);
441       return 0;
442     }
443   return 1;
444 }
445
446 /* Compares the VAR_CNT variables in VARS[] between the `value's at
447    A and B, and returns a strcmp()-type result. */
448 static int
449 compare_record (const struct ccase *a, const struct ccase *b,
450                 const struct sort_criteria *criteria)
451 {
452   int i;
453
454   assert (a != NULL);
455   assert (b != NULL);
456   
457   for (i = 0; i < criteria->crit_cnt; i++)
458     {
459       const struct sort_criterion *c = &criteria->crits[i];
460       int result;
461       
462       if (c->width == 0)
463         {
464           double af = case_num (a, c->fv);
465           double bf = case_num (b, c->fv);
466           
467           result = af < bf ? -1 : af > bf;
468         }
469       else
470         result = memcmp (case_str (a, c->fv), case_str (b, c->fv), c->width);
471
472       if (result != 0)
473         return c->dir == SRT_ASCEND ? result : -result;
474     }
475
476   return 0;
477 }
478
479 /* Compares record-run tuples A and B on run number first, then
480    on record, then on case index. */
481 static int
482 compare_record_run (const struct record_run *a,
483                     const struct record_run *b,
484                     struct initial_run_state *irs)
485 {
486   int result = a->run < b->run ? -1 : a->run > b->run;
487   if (result == 0)
488     result = compare_record (&a->record, &b->record, irs->xsrt->criteria);
489   if (result == 0)
490     result = a->idx < b->idx ? -1 : a->idx > b->idx;
491   return result;
492 }
493
494 /* Compares record-run tuples A and B on run number first, then
495    on the current record according to SCP, but in descending
496    order. */
497 static int
498 compare_record_run_minheap (const void *a, const void *b, void *irs) 
499 {
500   return -compare_record_run (a, b, irs);
501 }
502
503 /* Begins a new initial run, specifically its output file. */
504 static void
505 start_run (struct initial_run_state *irs)
506 {
507   irs->run++;
508   irs->case_cnt = 0;
509   irs->casefile = casefile_create (irs->xsrt->value_cnt);
510   casefile_to_disk (irs->casefile);
511   case_nullify (&irs->last_output); 
512 }
513
514 /* Ends the current initial run.  */
515 static void
516 end_run (struct initial_run_state *irs)
517 {
518   struct external_sort *xsrt = irs->xsrt;
519
520   /* Record initial run. */
521   if (irs->casefile != NULL) 
522     {
523       casefile_sleep (irs->casefile);
524       if (xsrt->run_cnt >= xsrt->run_cap) 
525         {
526           xsrt->run_cap *= 2;
527           xsrt->runs = xnrealloc (xsrt->runs,
528                                   xsrt->run_cap, sizeof *xsrt->runs);
529         }
530       xsrt->runs[xsrt->run_cnt++] = irs->casefile;
531       if (casefile_error (irs->casefile))
532         irs->okay = false;
533       irs->casefile = NULL; 
534     }
535 }
536
537 /* Writes a record to the current initial run. */
538 static void
539 output_record (struct initial_run_state *irs)
540 {
541   struct record_run *record_run;
542   struct ccase case_tmp;
543   
544   /* Extract minimum case from heap. */
545   assert (irs->record_cnt > 0);
546   pop_heap (irs->records, irs->record_cnt--, sizeof *irs->records,
547             compare_record_run_minheap, irs);
548   record_run = irs->records + irs->record_cnt;
549
550   /* Bail if an error has occurred. */
551   if (!irs->okay)
552     return;
553
554   /* Start new run if necessary. */
555   assert (record_run->run == irs->run
556           || record_run->run == irs->run + 1);
557   if (record_run->run != irs->run)
558     {
559       end_run (irs);
560       start_run (irs);
561     }
562   assert (record_run->run == irs->run);
563   irs->case_cnt++;
564
565   /* Write to disk. */
566   if (irs->casefile != NULL)
567     casefile_append (irs->casefile, &record_run->record);
568
569   /* This record becomes last_output. */
570   irs->last_output = case_tmp = record_run->record;
571   record_run->record = irs->records[irs->record_cap - 1].record;
572   irs->records[irs->record_cap - 1].record = case_tmp;
573 }
574 \f
575 /* Merging. */
576
577 static int choose_merge (struct casefile *runs[], int run_cnt, int order);
578 static struct casefile *merge_once (struct external_sort *,
579                                     struct casefile *[], size_t);
580
581 /* Repeatedly merges run until only one is left,
582    and returns the final casefile.
583    Returns a null pointer if an I/O error occurs. */
584 static struct casefile *
585 merge (struct external_sort *xsrt)
586 {
587   while (xsrt->run_cnt > 1)
588     {
589       int order = min (MAX_MERGE_ORDER, xsrt->run_cnt);
590       int idx = choose_merge (xsrt->runs, xsrt->run_cnt, order);
591       xsrt->runs[idx] = merge_once (xsrt, xsrt->runs + idx, order);
592       remove_range (xsrt->runs, xsrt->run_cnt, sizeof *xsrt->runs,
593                     idx + 1, order - 1);
594       xsrt->run_cnt -= order - 1;
595
596       if (xsrt->runs[idx] == NULL)
597         return NULL;
598     }
599   assert (xsrt->run_cnt == 1);
600   xsrt->run_cnt = 0;
601   return xsrt->runs[0];
602 }
603
604 /* Chooses ORDER runs out of the RUN_CNT runs in RUNS to merge,
605    and returns the index of the first one.
606
607    For stability, we must merge only consecutive runs.  For
608    efficiency, we choose the shortest consecutive sequence of
609    runs. */
610 static int
611 choose_merge (struct casefile *runs[], int run_cnt, int order) 
612 {
613   int min_idx, min_sum;
614   int cur_idx, cur_sum;
615   int i;
616
617   /* Sum up the length of the first ORDER runs. */
618   cur_sum = 0;
619   for (i = 0; i < order; i++)
620     cur_sum += casefile_get_case_cnt (runs[i]);
621
622   /* Find the shortest group of ORDER runs,
623      using a running total for efficiency. */
624   min_idx = 0;
625   min_sum = cur_sum;
626   for (cur_idx = 1; cur_idx + order <= run_cnt; cur_idx++)
627     {
628       cur_sum -= casefile_get_case_cnt (runs[cur_idx - 1]);
629       cur_sum += casefile_get_case_cnt (runs[cur_idx + order - 1]);
630       if (cur_sum < min_sum)
631         {
632           min_sum = cur_sum;
633           min_idx = cur_idx;
634         }
635     }
636
637   return min_idx;
638 }
639
640 /* Merges the RUN_CNT initial runs specified in INPUT_FILES into a
641    new run, and returns the new run.
642    Returns a null pointer if an I/O error occurs. */
643 static struct casefile *
644 merge_once (struct external_sort *xsrt,
645             struct casefile **const input_files,
646             size_t run_cnt)
647 {
648   struct run
649     {
650       struct casefile *file;
651       struct casereader *reader;
652       struct ccase ccase;
653     }
654   *runs;
655
656   struct casefile *output = NULL;
657   int i;
658
659   /* Open input files. */
660   runs = xnmalloc (run_cnt, sizeof *runs);
661   for (i = 0; i < run_cnt; i++) 
662     {
663       struct run *r = &runs[i];
664       r->file = input_files[i];
665       r->reader = casefile_get_destructive_reader (r->file);
666       if (!casereader_read_xfer (r->reader, &r->ccase))
667         {
668           run_cnt--;
669           i--;
670         }
671     }
672
673   /* Create output file. */
674   output = casefile_create (xsrt->value_cnt);
675   casefile_to_disk (output);
676
677   /* Merge. */
678   while (run_cnt > 0) 
679     {
680       struct run *min_run, *run;
681       
682       /* Find minimum. */
683       min_run = runs;
684       for (run = runs + 1; run < runs + run_cnt; run++)
685         if (compare_record (&run->ccase, &min_run->ccase, xsrt->criteria) < 0)
686           min_run = run;
687
688       /* Write minimum to output file. */
689       casefile_append_xfer (output, &min_run->ccase);
690
691       /* Read another case from minimum run. */
692       if (!casereader_read_xfer (min_run->reader, &min_run->ccase))
693         {
694           if (casefile_error (min_run->file) || casefile_error (output))
695             goto error;
696           casereader_destroy (min_run->reader);
697           casefile_destroy (min_run->file);
698
699           remove_element (runs, run_cnt, sizeof *runs, min_run - runs);
700           run_cnt--;
701         } 
702     }
703
704   if (!casefile_sleep (output))
705     goto error;
706   free (runs);
707
708   return output;
709
710  error:
711   for (i = 0; i < run_cnt; i++) 
712     casefile_destroy (runs[i].file);
713   casefile_destroy (output);
714   free (runs);
715   return NULL;
716 }