Add WARN_UNUSED_RESULT to procedure function prototypes
[pspp-builds.git] / src / math / sort.c
1 /* PSPP - computes sample statistics.
2    Copyright (C) 1997-9, 2000 Free Software Foundation, Inc.
3    Written by Ben Pfaff <blp@gnu.org>.
4
5    This program is free software; you can redistribute it and/or
6    modify it under the terms of the GNU General Public License as
7    published by the Free Software Foundation; either version 2 of the
8    License, or (at your option) any later version.
9
10    This program is distributed in the hope that it will be useful, but
11    WITHOUT ANY WARRANTY; without even the implied warranty of
12    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13    General Public License for more details.
14
15    You should have received a copy of the GNU General Public License
16    along with this program; if not, write to the Free Software
17    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
18    02110-1301, USA. */
19
20 #include <config.h>
21
22 #include "sort.h"
23
24 #include <errno.h>
25 #include <limits.h>
26 #include <stdbool.h>
27 #include <stdio.h>
28 #include <stdlib.h>
29
30 #include <data/case-source.h>
31 #include <data/case.h>
32 #include <data/casefile.h>
33 #include <data/procedure.h>
34 #include <data/settings.h>
35 #include <data/variable.h>
36 #include <data/storage-stream.h>
37 #include <language/expressions/public.h>
38 #include <libpspp/alloc.h>
39 #include <libpspp/array.h>
40 #include <libpspp/message.h>
41 #include <libpspp/message.h>
42 #include <libpspp/misc.h>
43 #include <libpspp/str.h>
44
45 #include "gettext.h"
46 #define _(msgid) gettext (msgid)
47
48 /* These should only be changed for testing purposes. */
49 int min_buffers = 64;
50 int max_buffers = INT_MAX;
51 bool allow_internal_sort = true;
52
53 static int compare_record (const struct ccase *, const struct ccase *,
54                            const struct sort_criteria *);
55 static struct casefile *do_internal_sort (struct casereader *,
56                                           const struct sort_criteria *);
57 static struct casefile *do_external_sort (struct casereader *,
58                                           const struct sort_criteria *);
59
60 /* Get ready to sort the active file. */
61 static void
62 prepare_to_sort_active_file (void) 
63 {
64   proc_cancel_temporary_transformations (); 
65 }
66
67 /* Sorts the active file in-place according to CRITERIA.
68    Returns nonzero if successful. */
69 int
70 sort_active_file_in_place (const struct sort_criteria *criteria) 
71 {
72   struct casefile *in, *out;
73
74   prepare_to_sort_active_file ();
75   if (!procedure (NULL, NULL))
76     return 0;
77   
78   in = proc_capture_output ();
79   out = sort_execute (casefile_get_destructive_reader (in), criteria);
80   if (out == NULL) 
81     return 0;
82
83   proc_set_source (storage_source_create (out));
84   return 1;
85 }
86
87 /* Data passed to sort_to_casefile_callback(). */
88 struct sort_to_casefile_cb_data 
89   {
90     const struct sort_criteria *criteria;
91     struct casefile *output;
92   };
93
94 /* Sorts casefile CF according to the criteria in CB_DATA. */
95 static bool
96 sort_to_casefile_callback (const struct casefile *cf, void *cb_data_) 
97 {
98   struct sort_to_casefile_cb_data *cb_data = cb_data_;
99   cb_data->output = sort_execute (casefile_get_reader (cf), cb_data->criteria);
100   return cb_data->output != NULL;
101 }
102
103 /* Sorts the active file to a separate casefile.  If successful,
104    returns the sorted casefile.  Returns a null pointer on
105    failure. */
106 struct casefile *
107 sort_active_file_to_casefile (const struct sort_criteria *criteria) 
108 {
109   struct sort_to_casefile_cb_data cb_data;
110   
111   prepare_to_sort_active_file ();
112
113   cb_data.criteria = criteria;
114   cb_data.output = NULL;
115   if (!multipass_procedure (sort_to_casefile_callback, &cb_data)) 
116     {
117       casefile_destroy (cb_data.output);
118       return NULL;
119     }
120   return cb_data.output;
121 }
122
123
124 /* Reads all the cases from READER, which is destroyed.  Sorts
125    the cases according to CRITERIA.  Returns the sorted cases in
126    a newly created casefile. */
127 struct casefile *
128 sort_execute (struct casereader *reader, const struct sort_criteria *criteria)
129 {
130   struct casefile *output = do_internal_sort (reader, criteria);
131   if (output == NULL)
132     output = do_external_sort (reader, criteria);
133   casereader_destroy (reader);
134   return output;
135 }
136 \f
137 /* A case and its index. */
138 struct indexed_case 
139   {
140     struct ccase c;     /* Case. */
141     unsigned long idx;  /* Index to allow for stable sorting. */
142   };
143
144 static int compare_indexed_cases (const void *, const void *, void *);
145
146 /* If the data is in memory, do an internal sort and return a new
147    casefile for the data.  Otherwise, return a null pointer. */
148 static struct casefile *
149 do_internal_sort (struct casereader *reader,
150                   const struct sort_criteria *criteria)
151 {
152   const struct casefile *src;
153   struct casefile *dst;
154   unsigned long case_cnt;
155
156   if (!allow_internal_sort)
157     return NULL;
158
159   src = casereader_get_casefile (reader);
160   if (casefile_get_case_cnt (src) > 1 && !casefile_in_core (src))
161     return NULL;
162       
163   case_cnt = casefile_get_case_cnt (src);
164   dst = casefile_create (casefile_get_value_cnt (src));
165   if (case_cnt != 0) 
166     {
167       struct indexed_case *cases = nmalloc (sizeof *cases, case_cnt);
168       if (cases != NULL) 
169         {
170           unsigned long i;
171           
172           for (i = 0; i < case_cnt; i++)
173             {
174               bool ok = casereader_read_xfer (reader, &cases[i].c);
175               if (!ok)
176                 abort ();
177               cases[i].idx = i;
178             }
179
180           sort (cases, case_cnt, sizeof *cases, compare_indexed_cases,
181                 (void *) criteria);
182       
183           for (i = 0; i < case_cnt; i++)
184             casefile_append_xfer (dst, &cases[i].c);
185           if (casefile_error (dst))
186             abort ();
187
188           free (cases);
189         }
190       else 
191         {
192           /* Failure. */
193           casefile_destroy (dst);
194           dst = NULL;
195         }
196     }
197
198   return dst;
199 }
200
201 /* Compares the variables specified by CRITERIA between the cases
202    at A and B, with a "last resort" comparison for stability, and
203    returns a strcmp()-type result. */
204 static int
205 compare_indexed_cases (const void *a_, const void *b_, void *criteria_)
206 {
207   struct sort_criteria *criteria = criteria_;
208   const struct indexed_case *a = a_;
209   const struct indexed_case *b = b_;
210   int result = compare_record (&a->c, &b->c, criteria);
211   if (result == 0)
212     result = a->idx < b->idx ? -1 : a->idx > b->idx;
213   return result;
214 }
215 \f
216 /* External sort. */
217
218 /* Maximum order of merge (external sort only).  The maximum
219    reasonable value is about 7.  Above that, it would be a good
220    idea to use a heap in merge_once() to select the minimum. */
221 #define MAX_MERGE_ORDER 7
222
223 /* Results of an external sort. */
224 struct external_sort 
225   {
226     const struct sort_criteria *criteria; /* Sort criteria. */
227     size_t value_cnt;                 /* Size of data in `union value's. */
228     struct casefile **runs;           /* Array of initial runs. */
229     size_t run_cnt, run_cap;          /* Number of runs, allocated capacity. */
230   };
231
232 /* Prototypes for helper functions. */
233 static int write_runs (struct external_sort *, struct casereader *);
234 static struct casefile *merge (struct external_sort *);
235 static void destroy_external_sort (struct external_sort *);
236
237 /* Performs a stable external sort of the active file according
238    to the specification in SCP.  Forms initial runs using a heap
239    as a reservoir.  Merges the initial runs according to a
240    pattern that assures stability. */
241 static struct casefile *
242 do_external_sort (struct casereader *reader,
243                   const struct sort_criteria *criteria)
244 {
245   struct external_sort *xsrt;
246
247   if (!casefile_to_disk (casereader_get_casefile (reader)))
248     return NULL;
249
250   xsrt = xmalloc (sizeof *xsrt);
251   xsrt->criteria = criteria;
252   xsrt->value_cnt = casefile_get_value_cnt (casereader_get_casefile (reader));
253   xsrt->run_cap = 512;
254   xsrt->run_cnt = 0;
255   xsrt->runs = xnmalloc (xsrt->run_cap, sizeof *xsrt->runs);
256   if (write_runs (xsrt, reader))
257     {
258       struct casefile *output = merge (xsrt);
259       destroy_external_sort (xsrt);
260       return output;
261     }
262   else
263     {
264       destroy_external_sort (xsrt);
265       return NULL;
266     }
267 }
268
269 /* Destroys XSRT. */
270 static void
271 destroy_external_sort (struct external_sort *xsrt) 
272 {
273   if (xsrt != NULL) 
274     {
275       int i;
276       
277       for (i = 0; i < xsrt->run_cnt; i++)
278         casefile_destroy (xsrt->runs[i]);
279       free (xsrt->runs);
280       free (xsrt);
281     }
282 }
283 \f
284 /* Replacement selection. */
285
286 /* Pairs a record with a run number. */
287 struct record_run
288   {
289     int run;                    /* Run number of case. */
290     struct ccase record;        /* Case data. */
291     size_t idx;                 /* Case number (for stability). */
292   };
293
294 /* Represents a set of initial runs during an external sort. */
295 struct initial_run_state 
296   {
297     struct external_sort *xsrt;
298
299     /* Reservoir. */
300     struct record_run *records; /* Records arranged as a heap. */
301     size_t record_cnt;          /* Current number of records. */
302     size_t record_cap;          /* Capacity for records. */
303     
304     /* Run currently being output. */
305     int run;                    /* Run number. */
306     size_t case_cnt;            /* Number of cases so far. */
307     struct casefile *casefile;  /* Output file. */
308     struct ccase last_output;   /* Record last output. */
309
310     int okay;                   /* Zero if an error has been encountered. */
311   };
312
313 static bool destroy_initial_run_state (struct initial_run_state *);
314 static void process_case (struct initial_run_state *, const struct ccase *,
315                           size_t);
316 static int allocate_cases (struct initial_run_state *);
317 static void output_record (struct initial_run_state *);
318 static void start_run (struct initial_run_state *);
319 static void end_run (struct initial_run_state *);
320 static int compare_record_run (const struct record_run *,
321                                const struct record_run *,
322                                struct initial_run_state *);
323 static int compare_record_run_minheap (const void *, const void *, void *);
324
325 /* Reads cases from READER and composes initial runs in XSRT. */
326 static int
327 write_runs (struct external_sort *xsrt, struct casereader *reader)
328 {
329   struct initial_run_state *irs;
330   struct ccase c;
331   size_t idx = 0;
332   int success = 0;
333
334   /* Allocate memory for cases. */
335   irs = xmalloc (sizeof *irs);
336   irs->xsrt = xsrt;
337   irs->records = NULL;
338   irs->record_cnt = irs->record_cap = 0;
339   irs->run = 0;
340   irs->case_cnt = 0;
341   irs->casefile = NULL;
342   case_nullify (&irs->last_output);
343   irs->okay = 1;
344   if (!allocate_cases (irs)) 
345     goto done;
346
347   /* Create initial runs. */
348   start_run (irs);
349   for (; irs->okay && casereader_read (reader, &c); case_destroy (&c))
350     process_case (irs, &c, idx++);
351   while (irs->okay && irs->record_cnt > 0)
352     output_record (irs);
353   end_run (irs);
354
355   success = irs->okay;
356
357  done:
358   if (!destroy_initial_run_state (irs))
359     success = false;
360
361   return success;
362 }
363
364 /* Add a single case to an initial run. */
365 static void
366 process_case (struct initial_run_state *irs, const struct ccase *c, size_t idx)
367 {
368   struct record_run *rr;
369
370   /* Compose record_run for this run and add to heap. */
371   assert (irs->record_cnt < irs->record_cap - 1);
372   rr = irs->records + irs->record_cnt++;
373   case_copy (&rr->record, 0, c, 0, irs->xsrt->value_cnt);
374   rr->run = irs->run;
375   rr->idx = idx;
376   if (!case_is_null (&irs->last_output)
377       && compare_record (c, &irs->last_output, irs->xsrt->criteria) < 0)
378     rr->run = irs->run + 1;
379   push_heap (irs->records, irs->record_cnt, sizeof *irs->records,
380              compare_record_run_minheap, irs);
381
382   /* Output a record if the reservoir is full. */
383   if (irs->record_cnt == irs->record_cap - 1 && irs->okay)
384     output_record (irs);
385 }
386
387 /* Destroys the initial run state represented by IRS.
388    Returns true if successful, false if an I/O error occurred. */
389 static bool
390 destroy_initial_run_state (struct initial_run_state *irs) 
391 {
392   int i;
393   bool ok = true;
394
395   if (irs == NULL)
396     return true;
397
398   for (i = 0; i < irs->record_cap; i++)
399     case_destroy (&irs->records[i].record);
400   free (irs->records);
401
402   if (irs->casefile != NULL)
403     ok = casefile_sleep (irs->casefile);
404
405   free (irs);
406   return ok;
407 }
408
409 /* Allocates room for lots of cases as a buffer. */
410 static int
411 allocate_cases (struct initial_run_state *irs)
412 {
413   int approx_case_cost; /* Approximate memory cost of one case in bytes. */
414   int max_cases;        /* Maximum number of cases to allocate. */
415   int i;
416
417   /* Allocate as many cases as we can within the workspace
418      limit. */
419   approx_case_cost = (sizeof *irs->records
420                       + irs->xsrt->value_cnt * sizeof (union value)
421                       + 4 * sizeof (void *));
422   max_cases = get_workspace() / approx_case_cost;
423   if (max_cases > max_buffers)
424     max_cases = max_buffers;
425   irs->records = nmalloc (sizeof *irs->records, max_cases);
426   if (irs->records != NULL)
427     for (i = 0; i < max_cases; i++)
428       if (!case_try_create (&irs->records[i].record, irs->xsrt->value_cnt))
429         {
430           max_cases = i;
431           break;
432         }
433   irs->record_cap = max_cases;
434
435   /* Fail if we didn't allocate an acceptable number of cases. */
436   if (irs->records == NULL || max_cases < min_buffers)
437     {
438       msg (SE, _("Out of memory.  Could not allocate room for minimum of %d "
439                  "cases of %d bytes each.  (PSPP workspace is currently "
440                  "restricted to a maximum of %d KB.)"),
441            min_buffers, approx_case_cost, get_workspace() / 1024);
442       return 0;
443     }
444   return 1;
445 }
446
447 /* Compares the VAR_CNT variables in VARS[] between the `value's at
448    A and B, and returns a strcmp()-type result. */
449 static int
450 compare_record (const struct ccase *a, const struct ccase *b,
451                 const struct sort_criteria *criteria)
452 {
453   int i;
454
455   assert (a != NULL);
456   assert (b != NULL);
457   
458   for (i = 0; i < criteria->crit_cnt; i++)
459     {
460       const struct sort_criterion *c = &criteria->crits[i];
461       int result;
462       
463       if (c->width == 0)
464         {
465           double af = case_num (a, c->fv);
466           double bf = case_num (b, c->fv);
467           
468           result = af < bf ? -1 : af > bf;
469         }
470       else
471         result = memcmp (case_str (a, c->fv), case_str (b, c->fv), c->width);
472
473       if (result != 0)
474         return c->dir == SRT_ASCEND ? result : -result;
475     }
476
477   return 0;
478 }
479
480 /* Compares record-run tuples A and B on run number first, then
481    on record, then on case index. */
482 static int
483 compare_record_run (const struct record_run *a,
484                     const struct record_run *b,
485                     struct initial_run_state *irs)
486 {
487   int result = a->run < b->run ? -1 : a->run > b->run;
488   if (result == 0)
489     result = compare_record (&a->record, &b->record, irs->xsrt->criteria);
490   if (result == 0)
491     result = a->idx < b->idx ? -1 : a->idx > b->idx;
492   return result;
493 }
494
495 /* Compares record-run tuples A and B on run number first, then
496    on the current record according to SCP, but in descending
497    order. */
498 static int
499 compare_record_run_minheap (const void *a, const void *b, void *irs) 
500 {
501   return -compare_record_run (a, b, irs);
502 }
503
504 /* Begins a new initial run, specifically its output file. */
505 static void
506 start_run (struct initial_run_state *irs)
507 {
508   irs->run++;
509   irs->case_cnt = 0;
510   irs->casefile = casefile_create (irs->xsrt->value_cnt);
511   casefile_to_disk (irs->casefile);
512   case_nullify (&irs->last_output); 
513 }
514
515 /* Ends the current initial run.  */
516 static void
517 end_run (struct initial_run_state *irs)
518 {
519   struct external_sort *xsrt = irs->xsrt;
520
521   /* Record initial run. */
522   if (irs->casefile != NULL) 
523     {
524       casefile_sleep (irs->casefile);
525       if (xsrt->run_cnt >= xsrt->run_cap) 
526         {
527           xsrt->run_cap *= 2;
528           xsrt->runs = xnrealloc (xsrt->runs,
529                                   xsrt->run_cap, sizeof *xsrt->runs);
530         }
531       xsrt->runs[xsrt->run_cnt++] = irs->casefile;
532       if (casefile_error (irs->casefile))
533         irs->okay = false;
534       irs->casefile = NULL; 
535     }
536 }
537
538 /* Writes a record to the current initial run. */
539 static void
540 output_record (struct initial_run_state *irs)
541 {
542   struct record_run *record_run;
543   struct ccase case_tmp;
544   
545   /* Extract minimum case from heap. */
546   assert (irs->record_cnt > 0);
547   pop_heap (irs->records, irs->record_cnt--, sizeof *irs->records,
548             compare_record_run_minheap, irs);
549   record_run = irs->records + irs->record_cnt;
550
551   /* Bail if an error has occurred. */
552   if (!irs->okay)
553     return;
554
555   /* Start new run if necessary. */
556   assert (record_run->run == irs->run
557           || record_run->run == irs->run + 1);
558   if (record_run->run != irs->run)
559     {
560       end_run (irs);
561       start_run (irs);
562     }
563   assert (record_run->run == irs->run);
564   irs->case_cnt++;
565
566   /* Write to disk. */
567   if (irs->casefile != NULL)
568     casefile_append (irs->casefile, &record_run->record);
569
570   /* This record becomes last_output. */
571   irs->last_output = case_tmp = record_run->record;
572   record_run->record = irs->records[irs->record_cap - 1].record;
573   irs->records[irs->record_cap - 1].record = case_tmp;
574 }
575 \f
576 /* Merging. */
577
578 static int choose_merge (struct casefile *runs[], int run_cnt, int order);
579 static struct casefile *merge_once (struct external_sort *,
580                                     struct casefile *[], size_t);
581
582 /* Repeatedly merges run until only one is left,
583    and returns the final casefile.
584    Returns a null pointer if an I/O error occurs. */
585 static struct casefile *
586 merge (struct external_sort *xsrt)
587 {
588   while (xsrt->run_cnt > 1)
589     {
590       int order = min (MAX_MERGE_ORDER, xsrt->run_cnt);
591       int idx = choose_merge (xsrt->runs, xsrt->run_cnt, order);
592       xsrt->runs[idx] = merge_once (xsrt, xsrt->runs + idx, order);
593       remove_range (xsrt->runs, xsrt->run_cnt, sizeof *xsrt->runs,
594                     idx + 1, order - 1);
595       xsrt->run_cnt -= order - 1;
596
597       if (xsrt->runs[idx] == NULL)
598         return NULL;
599     }
600   assert (xsrt->run_cnt == 1);
601   xsrt->run_cnt = 0;
602   return xsrt->runs[0];
603 }
604
605 /* Chooses ORDER runs out of the RUN_CNT runs in RUNS to merge,
606    and returns the index of the first one.
607
608    For stability, we must merge only consecutive runs.  For
609    efficiency, we choose the shortest consecutive sequence of
610    runs. */
611 static int
612 choose_merge (struct casefile *runs[], int run_cnt, int order) 
613 {
614   int min_idx, min_sum;
615   int cur_idx, cur_sum;
616   int i;
617
618   /* Sum up the length of the first ORDER runs. */
619   cur_sum = 0;
620   for (i = 0; i < order; i++)
621     cur_sum += casefile_get_case_cnt (runs[i]);
622
623   /* Find the shortest group of ORDER runs,
624      using a running total for efficiency. */
625   min_idx = 0;
626   min_sum = cur_sum;
627   for (cur_idx = 1; cur_idx + order <= run_cnt; cur_idx++)
628     {
629       cur_sum -= casefile_get_case_cnt (runs[cur_idx - 1]);
630       cur_sum += casefile_get_case_cnt (runs[cur_idx + order - 1]);
631       if (cur_sum < min_sum)
632         {
633           min_sum = cur_sum;
634           min_idx = cur_idx;
635         }
636     }
637
638   return min_idx;
639 }
640
641 /* Merges the RUN_CNT initial runs specified in INPUT_FILES into a
642    new run, and returns the new run.
643    Returns a null pointer if an I/O error occurs. */
644 static struct casefile *
645 merge_once (struct external_sort *xsrt,
646             struct casefile **const input_files,
647             size_t run_cnt)
648 {
649   struct run
650     {
651       struct casefile *file;
652       struct casereader *reader;
653       struct ccase ccase;
654     }
655   *runs;
656
657   struct casefile *output = NULL;
658   int i;
659
660   /* Open input files. */
661   runs = xnmalloc (run_cnt, sizeof *runs);
662   for (i = 0; i < run_cnt; i++) 
663     {
664       struct run *r = &runs[i];
665       r->file = input_files[i];
666       r->reader = casefile_get_destructive_reader (r->file);
667       if (!casereader_read_xfer (r->reader, &r->ccase))
668         {
669           run_cnt--;
670           i--;
671         }
672     }
673
674   /* Create output file. */
675   output = casefile_create (xsrt->value_cnt);
676   casefile_to_disk (output);
677
678   /* Merge. */
679   while (run_cnt > 0) 
680     {
681       struct run *min_run, *run;
682       
683       /* Find minimum. */
684       min_run = runs;
685       for (run = runs + 1; run < runs + run_cnt; run++)
686         if (compare_record (&run->ccase, &min_run->ccase, xsrt->criteria) < 0)
687           min_run = run;
688
689       /* Write minimum to output file. */
690       casefile_append_xfer (output, &min_run->ccase);
691
692       /* Read another case from minimum run. */
693       if (!casereader_read_xfer (min_run->reader, &min_run->ccase))
694         {
695           if (casefile_error (min_run->file) || casefile_error (output))
696             goto error;
697           casereader_destroy (min_run->reader);
698           casefile_destroy (min_run->file);
699
700           remove_element (runs, run_cnt, sizeof *runs, min_run - runs);
701           run_cnt--;
702         } 
703     }
704
705   if (!casefile_sleep (output))
706     goto error;
707   free (runs);
708
709   return output;
710
711  error:
712   for (i = 0; i < run_cnt; i++) 
713     casefile_destroy (runs[i].file);
714   casefile_destroy (output);
715   free (runs);
716   return NULL;
717 }