Fix some warnings.
[pspp-builds.git] / src / math / sort.c
1 /* PSPP - computes sample statistics.
2    Copyright (C) 1997-9, 2000 Free Software Foundation, Inc.
3    Written by Ben Pfaff <blp@gnu.org>.
4
5    This program is free software; you can redistribute it and/or
6    modify it under the terms of the GNU General Public License as
7    published by the Free Software Foundation; either version 2 of the
8    License, or (at your option) any later version.
9
10    This program is distributed in the hope that it will be useful, but
11    WITHOUT ANY WARRANTY; without even the implied warranty of
12    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13    General Public License for more details.
14
15    You should have received a copy of the GNU General Public License
16    along with this program; if not, write to the Free Software
17    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
18    02110-1301, USA. */
19
20 #include <config.h>
21 #include "sort.h"
22 #include <libpspp/message.h>
23 #include <libpspp/alloc.h>
24 #include <limits.h>
25 #include <stdio.h>
26 #include <stdlib.h>
27 #include <errno.h>
28 #include <libpspp/array.h>
29 #include <stdbool.h>
30 #include <data/case.h>
31 #include <data/casefile.h>
32 #include <libpspp/message.h>
33 #include <language/expressions/public.h>
34
35 #include <libpspp/misc.h>
36 #include <data/settings.h>
37 #include <libpspp/str.h>
38 #include <data/variable.h>
39 #include <procedure.h>
40
41 #include "gettext.h"
42 #define _(msgid) gettext (msgid)
43
44 #include <libpspp/debug-print.h>
45
46 /* These should only be changed for testing purposes. */
47 int min_buffers = 64;
48 int max_buffers = INT_MAX;
49 bool allow_internal_sort = true;
50
51 static int compare_record (const struct ccase *, const struct ccase *,
52                            const struct sort_criteria *);
53 static struct casefile *do_internal_sort (struct casereader *,
54                                           const struct sort_criteria *);
55 static struct casefile *do_external_sort (struct casereader *,
56                                           const struct sort_criteria *);
57
58 /* Gets ready to sort the active file, either in-place or to a
59    separate casefile. */
60 static bool
61 prepare_to_sort_active_file (void) 
62 {
63   bool ok;
64   
65   /* Cancel temporary transformations and PROCESS IF. */
66   if (temporary != 0)
67     cancel_temporary (); 
68   expr_free (process_if_expr);
69   process_if_expr = NULL;
70
71   /* Make sure source cases are in a storage source. */
72   ok = procedure (NULL, NULL);
73   assert (case_source_is_class (vfm_source, &storage_source_class));
74
75   return ok;
76 }
77
78 /* Sorts the active file in-place according to CRITERIA.
79    Returns nonzero if successful. */
80 int
81 sort_active_file_in_place (const struct sort_criteria *criteria) 
82 {
83   struct casefile *src, *dst;
84   
85   if (!prepare_to_sort_active_file ())
86     return 0;
87
88   src = storage_source_get_casefile (vfm_source);
89   dst = sort_execute (casefile_get_destructive_reader (src), criteria);
90   free_case_source (vfm_source);
91   vfm_source = NULL;
92
93   if (dst == NULL) 
94     return 0;
95
96   vfm_source = storage_source_create (dst);
97   return 1;
98 }
99
100 /* Sorts the active file to a separate casefile.  If successful,
101    returns the sorted casefile.  Returns a null pointer on
102    failure. */
103 struct casefile *
104 sort_active_file_to_casefile (const struct sort_criteria *criteria) 
105 {
106   struct casefile *src;
107   
108   if (!prepare_to_sort_active_file ())
109     return NULL;
110
111   src = storage_source_get_casefile (vfm_source);
112   return sort_execute (casefile_get_reader (src), criteria);
113 }
114
115
116 /* Reads all the cases from READER, which is destroyed.  Sorts
117    the cases according to CRITERIA.  Returns the sorted cases in
118    a newly created casefile. */
119 struct casefile *
120 sort_execute (struct casereader *reader, const struct sort_criteria *criteria)
121 {
122   struct casefile *output = do_internal_sort (reader, criteria);
123   if (output == NULL)
124     output = do_external_sort (reader, criteria);
125   casereader_destroy (reader);
126   return output;
127 }
128 \f
129 /* A case and its index. */
130 struct indexed_case 
131   {
132     struct ccase c;     /* Case. */
133     unsigned long idx;  /* Index to allow for stable sorting. */
134   };
135
136 static int compare_indexed_cases (const void *, const void *, void *);
137
138 /* If the data is in memory, do an internal sort and return a new
139    casefile for the data.  Otherwise, return a null pointer. */
140 static struct casefile *
141 do_internal_sort (struct casereader *reader,
142                   const struct sort_criteria *criteria)
143 {
144   const struct casefile *src;
145   struct casefile *dst;
146   unsigned long case_cnt;
147
148   if (!allow_internal_sort)
149     return NULL;
150
151   src = casereader_get_casefile (reader);
152   if (casefile_get_case_cnt (src) > 1 && !casefile_in_core (src))
153     return NULL;
154       
155   case_cnt = casefile_get_case_cnt (src);
156   dst = casefile_create (casefile_get_value_cnt (src));
157   if (case_cnt != 0) 
158     {
159       struct indexed_case *cases = nmalloc (sizeof *cases, case_cnt);
160       if (cases != NULL) 
161         {
162           unsigned long i;
163           
164           for (i = 0; i < case_cnt; i++)
165             {
166               bool ok = casereader_read_xfer (reader, &cases[i].c);
167               if (!ok)
168                 abort ();
169               cases[i].idx = i;
170             }
171
172           sort (cases, case_cnt, sizeof *cases, compare_indexed_cases,
173                 (void *) criteria);
174       
175           for (i = 0; i < case_cnt; i++)
176             casefile_append_xfer (dst, &cases[i].c);
177           if (casefile_error (dst))
178             abort ();
179
180           free (cases);
181         }
182       else 
183         {
184           /* Failure. */
185           casefile_destroy (dst);
186           dst = NULL;
187         }
188     }
189
190   return dst;
191 }
192
193 /* Compares the variables specified by CRITERIA between the cases
194    at A and B, with a "last resort" comparison for stability, and
195    returns a strcmp()-type result. */
196 static int
197 compare_indexed_cases (const void *a_, const void *b_, void *criteria_)
198 {
199   struct sort_criteria *criteria = criteria_;
200   const struct indexed_case *a = a_;
201   const struct indexed_case *b = b_;
202   int result = compare_record (&a->c, &b->c, criteria);
203   if (result == 0)
204     result = a->idx < b->idx ? -1 : a->idx > b->idx;
205   return result;
206 }
207 \f
208 /* External sort. */
209
210 /* Maximum order of merge (external sort only).  The maximum
211    reasonable value is about 7.  Above that, it would be a good
212    idea to use a heap in merge_once() to select the minimum. */
213 #define MAX_MERGE_ORDER 7
214
215 /* Results of an external sort. */
216 struct external_sort 
217   {
218     const struct sort_criteria *criteria; /* Sort criteria. */
219     size_t value_cnt;                 /* Size of data in `union value's. */
220     struct casefile **runs;           /* Array of initial runs. */
221     size_t run_cnt, run_cap;          /* Number of runs, allocated capacity. */
222   };
223
224 /* Prototypes for helper functions. */
225 static int write_runs (struct external_sort *, struct casereader *);
226 static struct casefile *merge (struct external_sort *);
227 static void destroy_external_sort (struct external_sort *);
228
229 /* Performs a stable external sort of the active file according
230    to the specification in SCP.  Forms initial runs using a heap
231    as a reservoir.  Merges the initial runs according to a
232    pattern that assures stability. */
233 static struct casefile *
234 do_external_sort (struct casereader *reader,
235                   const struct sort_criteria *criteria)
236 {
237   struct external_sort *xsrt;
238
239   if (!casefile_to_disk (casereader_get_casefile (reader)))
240     return NULL;
241
242   xsrt = xmalloc (sizeof *xsrt);
243   xsrt->criteria = criteria;
244   xsrt->value_cnt = casefile_get_value_cnt (casereader_get_casefile (reader));
245   xsrt->run_cap = 512;
246   xsrt->run_cnt = 0;
247   xsrt->runs = xnmalloc (xsrt->run_cap, sizeof *xsrt->runs);
248   if (write_runs (xsrt, reader))
249     {
250       struct casefile *output = merge (xsrt);
251       destroy_external_sort (xsrt);
252       return output;
253     }
254   else
255     {
256       destroy_external_sort (xsrt);
257       return NULL;
258     }
259 }
260
261 /* Destroys XSRT. */
262 static void
263 destroy_external_sort (struct external_sort *xsrt) 
264 {
265   if (xsrt != NULL) 
266     {
267       int i;
268       
269       for (i = 0; i < xsrt->run_cnt; i++)
270         casefile_destroy (xsrt->runs[i]);
271       free (xsrt->runs);
272       free (xsrt);
273     }
274 }
275 \f
276 /* Replacement selection. */
277
278 /* Pairs a record with a run number. */
279 struct record_run
280   {
281     int run;                    /* Run number of case. */
282     struct ccase record;        /* Case data. */
283     size_t idx;                 /* Case number (for stability). */
284   };
285
286 /* Represents a set of initial runs during an external sort. */
287 struct initial_run_state 
288   {
289     struct external_sort *xsrt;
290
291     /* Reservoir. */
292     struct record_run *records; /* Records arranged as a heap. */
293     size_t record_cnt;          /* Current number of records. */
294     size_t record_cap;          /* Capacity for records. */
295     
296     /* Run currently being output. */
297     int run;                    /* Run number. */
298     size_t case_cnt;            /* Number of cases so far. */
299     struct casefile *casefile;  /* Output file. */
300     struct ccase last_output;   /* Record last output. */
301
302     int okay;                   /* Zero if an error has been encountered. */
303   };
304
305 static const struct case_sink_class sort_sink_class;
306
307 static bool destroy_initial_run_state (struct initial_run_state *);
308 static void process_case (struct initial_run_state *, const struct ccase *,
309                           size_t);
310 static int allocate_cases (struct initial_run_state *);
311 static void output_record (struct initial_run_state *);
312 static void start_run (struct initial_run_state *);
313 static void end_run (struct initial_run_state *);
314 static int compare_record_run (const struct record_run *,
315                                const struct record_run *,
316                                struct initial_run_state *);
317 static int compare_record_run_minheap (const void *, const void *, void *);
318
319 /* Reads cases from READER and composes initial runs in XSRT. */
320 static int
321 write_runs (struct external_sort *xsrt, struct casereader *reader)
322 {
323   struct initial_run_state *irs;
324   struct ccase c;
325   size_t idx = 0;
326   int success = 0;
327
328   /* Allocate memory for cases. */
329   irs = xmalloc (sizeof *irs);
330   irs->xsrt = xsrt;
331   irs->records = NULL;
332   irs->record_cnt = irs->record_cap = 0;
333   irs->run = 0;
334   irs->case_cnt = 0;
335   irs->casefile = NULL;
336   case_nullify (&irs->last_output);
337   irs->okay = 1;
338   if (!allocate_cases (irs)) 
339     goto done;
340
341   /* Create initial runs. */
342   start_run (irs);
343   for (; irs->okay && casereader_read (reader, &c); case_destroy (&c))
344     process_case (irs, &c, idx++);
345   while (irs->okay && irs->record_cnt > 0)
346     output_record (irs);
347   end_run (irs);
348
349   success = irs->okay;
350
351  done:
352   if (!destroy_initial_run_state (irs))
353     success = false;
354
355   return success;
356 }
357
358 /* Add a single case to an initial run. */
359 static void
360 process_case (struct initial_run_state *irs, const struct ccase *c, size_t idx)
361 {
362   struct record_run *rr;
363
364   /* Compose record_run for this run and add to heap. */
365   assert (irs->record_cnt < irs->record_cap - 1);
366   rr = irs->records + irs->record_cnt++;
367   case_copy (&rr->record, 0, c, 0, irs->xsrt->value_cnt);
368   rr->run = irs->run;
369   rr->idx = idx;
370   if (!case_is_null (&irs->last_output)
371       && compare_record (c, &irs->last_output, irs->xsrt->criteria) < 0)
372     rr->run = irs->run + 1;
373   push_heap (irs->records, irs->record_cnt, sizeof *irs->records,
374              compare_record_run_minheap, irs);
375
376   /* Output a record if the reservoir is full. */
377   if (irs->record_cnt == irs->record_cap - 1 && irs->okay)
378     output_record (irs);
379 }
380
381 /* Destroys the initial run state represented by IRS.
382    Returns true if successful, false if an I/O error occurred. */
383 static bool
384 destroy_initial_run_state (struct initial_run_state *irs) 
385 {
386   int i;
387   bool ok = true;
388
389   if (irs == NULL)
390     return true;
391
392   for (i = 0; i < irs->record_cap; i++)
393     case_destroy (&irs->records[i].record);
394   free (irs->records);
395
396   if (irs->casefile != NULL)
397     ok = casefile_sleep (irs->casefile);
398
399   free (irs);
400   return ok;
401 }
402
403 /* Allocates room for lots of cases as a buffer. */
404 static int
405 allocate_cases (struct initial_run_state *irs)
406 {
407   int approx_case_cost; /* Approximate memory cost of one case in bytes. */
408   int max_cases;        /* Maximum number of cases to allocate. */
409   int i;
410
411   /* Allocate as many cases as we can within the workspace
412      limit. */
413   approx_case_cost = (sizeof *irs->records
414                       + irs->xsrt->value_cnt * sizeof (union value)
415                       + 4 * sizeof (void *));
416   max_cases = get_workspace() / approx_case_cost;
417   if (max_cases > max_buffers)
418     max_cases = max_buffers;
419   irs->records = nmalloc (sizeof *irs->records, max_cases);
420   if (irs->records != NULL)
421     for (i = 0; i < max_cases; i++)
422       if (!case_try_create (&irs->records[i].record, irs->xsrt->value_cnt))
423         {
424           max_cases = i;
425           break;
426         }
427   irs->record_cap = max_cases;
428
429   /* Fail if we didn't allocate an acceptable number of cases. */
430   if (irs->records == NULL || max_cases < min_buffers)
431     {
432       msg (SE, _("Out of memory.  Could not allocate room for minimum of %d "
433                  "cases of %d bytes each.  (PSPP workspace is currently "
434                  "restricted to a maximum of %d KB.)"),
435            min_buffers, approx_case_cost, get_workspace() / 1024);
436       return 0;
437     }
438   return 1;
439 }
440
441 /* Compares the VAR_CNT variables in VARS[] between the `value's at
442    A and B, and returns a strcmp()-type result. */
443 static int
444 compare_record (const struct ccase *a, const struct ccase *b,
445                 const struct sort_criteria *criteria)
446 {
447   int i;
448
449   assert (a != NULL);
450   assert (b != NULL);
451   
452   for (i = 0; i < criteria->crit_cnt; i++)
453     {
454       const struct sort_criterion *c = &criteria->crits[i];
455       int result;
456       
457       if (c->width == 0)
458         {
459           double af = case_num (a, c->fv);
460           double bf = case_num (b, c->fv);
461           
462           result = af < bf ? -1 : af > bf;
463         }
464       else
465         result = memcmp (case_str (a, c->fv), case_str (b, c->fv), c->width);
466
467       if (result != 0)
468         return c->dir == SRT_ASCEND ? result : -result;
469     }
470
471   return 0;
472 }
473
474 /* Compares record-run tuples A and B on run number first, then
475    on record, then on case index. */
476 static int
477 compare_record_run (const struct record_run *a,
478                     const struct record_run *b,
479                     struct initial_run_state *irs)
480 {
481   int result = a->run < b->run ? -1 : a->run > b->run;
482   if (result == 0)
483     result = compare_record (&a->record, &b->record, irs->xsrt->criteria);
484   if (result == 0)
485     result = a->idx < b->idx ? -1 : a->idx > b->idx;
486   return result;
487 }
488
489 /* Compares record-run tuples A and B on run number first, then
490    on the current record according to SCP, but in descending
491    order. */
492 static int
493 compare_record_run_minheap (const void *a, const void *b, void *irs) 
494 {
495   return -compare_record_run (a, b, irs);
496 }
497
498 /* Begins a new initial run, specifically its output file. */
499 static void
500 start_run (struct initial_run_state *irs)
501 {
502   irs->run++;
503   irs->case_cnt = 0;
504   irs->casefile = casefile_create (irs->xsrt->value_cnt);
505   casefile_to_disk (irs->casefile);
506   case_nullify (&irs->last_output); 
507 }
508
509 /* Ends the current initial run.  */
510 static void
511 end_run (struct initial_run_state *irs)
512 {
513   struct external_sort *xsrt = irs->xsrt;
514
515   /* Record initial run. */
516   if (irs->casefile != NULL) 
517     {
518       casefile_sleep (irs->casefile);
519       if (xsrt->run_cnt >= xsrt->run_cap) 
520         {
521           xsrt->run_cap *= 2;
522           xsrt->runs = xnrealloc (xsrt->runs,
523                                   xsrt->run_cap, sizeof *xsrt->runs);
524         }
525       xsrt->runs[xsrt->run_cnt++] = irs->casefile;
526       if (casefile_error (irs->casefile))
527         irs->okay = false;
528       irs->casefile = NULL; 
529     }
530 }
531
532 /* Writes a record to the current initial run. */
533 static void
534 output_record (struct initial_run_state *irs)
535 {
536   struct record_run *record_run;
537   struct ccase case_tmp;
538   
539   /* Extract minimum case from heap. */
540   assert (irs->record_cnt > 0);
541   pop_heap (irs->records, irs->record_cnt--, sizeof *irs->records,
542             compare_record_run_minheap, irs);
543   record_run = irs->records + irs->record_cnt;
544
545   /* Bail if an error has occurred. */
546   if (!irs->okay)
547     return;
548
549   /* Start new run if necessary. */
550   assert (record_run->run == irs->run
551           || record_run->run == irs->run + 1);
552   if (record_run->run != irs->run)
553     {
554       end_run (irs);
555       start_run (irs);
556     }
557   assert (record_run->run == irs->run);
558   irs->case_cnt++;
559
560   /* Write to disk. */
561   if (irs->casefile != NULL)
562     casefile_append (irs->casefile, &record_run->record);
563
564   /* This record becomes last_output. */
565   irs->last_output = case_tmp = record_run->record;
566   record_run->record = irs->records[irs->record_cap - 1].record;
567   irs->records[irs->record_cap - 1].record = case_tmp;
568 }
569 \f
570 /* Merging. */
571
572 static int choose_merge (struct casefile *runs[], int run_cnt, int order);
573 static struct casefile *merge_once (struct external_sort *,
574                                     struct casefile *[], size_t);
575
576 /* Repeatedly merges run until only one is left,
577    and returns the final casefile.
578    Returns a null pointer if an I/O error occurs. */
579 static struct casefile *
580 merge (struct external_sort *xsrt)
581 {
582   while (xsrt->run_cnt > 1)
583     {
584       int order = min (MAX_MERGE_ORDER, xsrt->run_cnt);
585       int idx = choose_merge (xsrt->runs, xsrt->run_cnt, order);
586       xsrt->runs[idx] = merge_once (xsrt, xsrt->runs + idx, order);
587       remove_range (xsrt->runs, xsrt->run_cnt, sizeof *xsrt->runs,
588                     idx + 1, order - 1);
589       xsrt->run_cnt -= order - 1;
590
591       if (xsrt->runs[idx] == NULL)
592         return NULL;
593     }
594   assert (xsrt->run_cnt == 1);
595   xsrt->run_cnt = 0;
596   return xsrt->runs[0];
597 }
598
599 /* Chooses ORDER runs out of the RUN_CNT runs in RUNS to merge,
600    and returns the index of the first one.
601
602    For stability, we must merge only consecutive runs.  For
603    efficiency, we choose the shortest consecutive sequence of
604    runs. */
605 static int
606 choose_merge (struct casefile *runs[], int run_cnt, int order) 
607 {
608   int min_idx, min_sum;
609   int cur_idx, cur_sum;
610   int i;
611
612   /* Sum up the length of the first ORDER runs. */
613   cur_sum = 0;
614   for (i = 0; i < order; i++)
615     cur_sum += casefile_get_case_cnt (runs[i]);
616
617   /* Find the shortest group of ORDER runs,
618      using a running total for efficiency. */
619   min_idx = 0;
620   min_sum = cur_sum;
621   for (cur_idx = 1; cur_idx + order <= run_cnt; cur_idx++)
622     {
623       cur_sum -= casefile_get_case_cnt (runs[cur_idx - 1]);
624       cur_sum += casefile_get_case_cnt (runs[cur_idx + order - 1]);
625       if (cur_sum < min_sum)
626         {
627           min_sum = cur_sum;
628           min_idx = cur_idx;
629         }
630     }
631
632   return min_idx;
633 }
634
635 /* Merges the RUN_CNT initial runs specified in INPUT_FILES into a
636    new run, and returns the new run.
637    Returns a null pointer if an I/O error occurs. */
638 static struct casefile *
639 merge_once (struct external_sort *xsrt,
640             struct casefile **const input_files,
641             size_t run_cnt)
642 {
643   struct run
644     {
645       struct casefile *file;
646       struct casereader *reader;
647       struct ccase ccase;
648     }
649   *runs;
650
651   struct casefile *output = NULL;
652   int i;
653
654   /* Open input files. */
655   runs = xnmalloc (run_cnt, sizeof *runs);
656   for (i = 0; i < run_cnt; i++) 
657     {
658       struct run *r = &runs[i];
659       r->file = input_files[i];
660       r->reader = casefile_get_destructive_reader (r->file);
661       if (!casereader_read_xfer (r->reader, &r->ccase))
662         {
663           run_cnt--;
664           i--;
665         }
666     }
667
668   /* Create output file. */
669   output = casefile_create (xsrt->value_cnt);
670   casefile_to_disk (output);
671
672   /* Merge. */
673   while (run_cnt > 0) 
674     {
675       struct run *min_run, *run;
676       
677       /* Find minimum. */
678       min_run = runs;
679       for (run = runs + 1; run < runs + run_cnt; run++)
680         if (compare_record (&run->ccase, &min_run->ccase, xsrt->criteria) < 0)
681           min_run = run;
682
683       /* Write minimum to output file. */
684       casefile_append_xfer (output, &min_run->ccase);
685
686       /* Read another case from minimum run. */
687       if (!casereader_read_xfer (min_run->reader, &min_run->ccase))
688         {
689           if (casefile_error (min_run->file) || casefile_error (output))
690             goto error;
691           casereader_destroy (min_run->reader);
692           casefile_destroy (min_run->file);
693
694           remove_element (runs, run_cnt, sizeof *runs, min_run - runs);
695           run_cnt--;
696         } 
697     }
698
699   if (!casefile_sleep (output))
700     goto error;
701   free (runs);
702
703   return output;
704
705  error:
706   for (i = 0; i < run_cnt; i++) 
707     casefile_destroy (runs[i].file);
708   casefile_destroy (output);
709   free (runs);
710   return NULL;
711 }