Get rid of src/libpspp/debug-print.h and all its users. (There were
[pspp-builds.git] / src / math / sort.c
1 /* PSPP - computes sample statistics.
2    Copyright (C) 1997-9, 2000 Free Software Foundation, Inc.
3    Written by Ben Pfaff <blp@gnu.org>.
4
5    This program is free software; you can redistribute it and/or
6    modify it under the terms of the GNU General Public License as
7    published by the Free Software Foundation; either version 2 of the
8    License, or (at your option) any later version.
9
10    This program is distributed in the hope that it will be useful, but
11    WITHOUT ANY WARRANTY; without even the implied warranty of
12    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13    General Public License for more details.
14
15    You should have received a copy of the GNU General Public License
16    along with this program; if not, write to the Free Software
17    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
18    02110-1301, USA. */
19
20 #include <config.h>
21 #include "sort.h"
22 #include <libpspp/message.h>
23 #include <libpspp/alloc.h>
24 #include <limits.h>
25 #include <stdio.h>
26 #include <stdlib.h>
27 #include <errno.h>
28 #include <libpspp/array.h>
29 #include <stdbool.h>
30 #include <data/case.h>
31 #include <data/casefile.h>
32 #include <libpspp/message.h>
33 #include <language/expressions/public.h>
34
35 #include <libpspp/misc.h>
36 #include <data/settings.h>
37 #include <libpspp/str.h>
38 #include <data/variable.h>
39 #include <procedure.h>
40
41 #include "gettext.h"
42 #define _(msgid) gettext (msgid)
43
44 /* These should only be changed for testing purposes. */
45 int min_buffers = 64;
46 int max_buffers = INT_MAX;
47 bool allow_internal_sort = true;
48
49 static int compare_record (const struct ccase *, const struct ccase *,
50                            const struct sort_criteria *);
51 static struct casefile *do_internal_sort (struct casereader *,
52                                           const struct sort_criteria *);
53 static struct casefile *do_external_sort (struct casereader *,
54                                           const struct sort_criteria *);
55
56 /* Gets ready to sort the active file, either in-place or to a
57    separate casefile. */
58 static bool
59 prepare_to_sort_active_file (void) 
60 {
61   bool ok;
62   
63   /* Cancel temporary transformations and PROCESS IF. */
64   if (temporary != 0)
65     cancel_temporary (); 
66   expr_free (process_if_expr);
67   process_if_expr = NULL;
68
69   /* Make sure source cases are in a storage source. */
70   ok = procedure (NULL, NULL);
71   assert (case_source_is_class (vfm_source, &storage_source_class));
72
73   return ok;
74 }
75
76 /* Sorts the active file in-place according to CRITERIA.
77    Returns nonzero if successful. */
78 int
79 sort_active_file_in_place (const struct sort_criteria *criteria) 
80 {
81   struct casefile *src, *dst;
82   
83   if (!prepare_to_sort_active_file ())
84     return 0;
85
86   src = storage_source_get_casefile (vfm_source);
87   dst = sort_execute (casefile_get_destructive_reader (src), criteria);
88   free_case_source (vfm_source);
89   vfm_source = NULL;
90
91   if (dst == NULL) 
92     return 0;
93
94   vfm_source = storage_source_create (dst);
95   return 1;
96 }
97
98 /* Sorts the active file to a separate casefile.  If successful,
99    returns the sorted casefile.  Returns a null pointer on
100    failure. */
101 struct casefile *
102 sort_active_file_to_casefile (const struct sort_criteria *criteria) 
103 {
104   struct casefile *src;
105   
106   if (!prepare_to_sort_active_file ())
107     return NULL;
108
109   src = storage_source_get_casefile (vfm_source);
110   return sort_execute (casefile_get_reader (src), criteria);
111 }
112
113
114 /* Reads all the cases from READER, which is destroyed.  Sorts
115    the cases according to CRITERIA.  Returns the sorted cases in
116    a newly created casefile. */
117 struct casefile *
118 sort_execute (struct casereader *reader, const struct sort_criteria *criteria)
119 {
120   struct casefile *output = do_internal_sort (reader, criteria);
121   if (output == NULL)
122     output = do_external_sort (reader, criteria);
123   casereader_destroy (reader);
124   return output;
125 }
126 \f
127 /* A case and its index. */
128 struct indexed_case 
129   {
130     struct ccase c;     /* Case. */
131     unsigned long idx;  /* Index to allow for stable sorting. */
132   };
133
134 static int compare_indexed_cases (const void *, const void *, void *);
135
136 /* If the data is in memory, do an internal sort and return a new
137    casefile for the data.  Otherwise, return a null pointer. */
138 static struct casefile *
139 do_internal_sort (struct casereader *reader,
140                   const struct sort_criteria *criteria)
141 {
142   const struct casefile *src;
143   struct casefile *dst;
144   unsigned long case_cnt;
145
146   if (!allow_internal_sort)
147     return NULL;
148
149   src = casereader_get_casefile (reader);
150   if (casefile_get_case_cnt (src) > 1 && !casefile_in_core (src))
151     return NULL;
152       
153   case_cnt = casefile_get_case_cnt (src);
154   dst = casefile_create (casefile_get_value_cnt (src));
155   if (case_cnt != 0) 
156     {
157       struct indexed_case *cases = nmalloc (sizeof *cases, case_cnt);
158       if (cases != NULL) 
159         {
160           unsigned long i;
161           
162           for (i = 0; i < case_cnt; i++)
163             {
164               bool ok = casereader_read_xfer (reader, &cases[i].c);
165               if (!ok)
166                 abort ();
167               cases[i].idx = i;
168             }
169
170           sort (cases, case_cnt, sizeof *cases, compare_indexed_cases,
171                 (void *) criteria);
172       
173           for (i = 0; i < case_cnt; i++)
174             casefile_append_xfer (dst, &cases[i].c);
175           if (casefile_error (dst))
176             abort ();
177
178           free (cases);
179         }
180       else 
181         {
182           /* Failure. */
183           casefile_destroy (dst);
184           dst = NULL;
185         }
186     }
187
188   return dst;
189 }
190
191 /* Compares the variables specified by CRITERIA between the cases
192    at A and B, with a "last resort" comparison for stability, and
193    returns a strcmp()-type result. */
194 static int
195 compare_indexed_cases (const void *a_, const void *b_, void *criteria_)
196 {
197   struct sort_criteria *criteria = criteria_;
198   const struct indexed_case *a = a_;
199   const struct indexed_case *b = b_;
200   int result = compare_record (&a->c, &b->c, criteria);
201   if (result == 0)
202     result = a->idx < b->idx ? -1 : a->idx > b->idx;
203   return result;
204 }
205 \f
206 /* External sort. */
207
208 /* Maximum order of merge (external sort only).  The maximum
209    reasonable value is about 7.  Above that, it would be a good
210    idea to use a heap in merge_once() to select the minimum. */
211 #define MAX_MERGE_ORDER 7
212
213 /* Results of an external sort. */
214 struct external_sort 
215   {
216     const struct sort_criteria *criteria; /* Sort criteria. */
217     size_t value_cnt;                 /* Size of data in `union value's. */
218     struct casefile **runs;           /* Array of initial runs. */
219     size_t run_cnt, run_cap;          /* Number of runs, allocated capacity. */
220   };
221
222 /* Prototypes for helper functions. */
223 static int write_runs (struct external_sort *, struct casereader *);
224 static struct casefile *merge (struct external_sort *);
225 static void destroy_external_sort (struct external_sort *);
226
227 /* Performs a stable external sort of the active file according
228    to the specification in SCP.  Forms initial runs using a heap
229    as a reservoir.  Merges the initial runs according to a
230    pattern that assures stability. */
231 static struct casefile *
232 do_external_sort (struct casereader *reader,
233                   const struct sort_criteria *criteria)
234 {
235   struct external_sort *xsrt;
236
237   if (!casefile_to_disk (casereader_get_casefile (reader)))
238     return NULL;
239
240   xsrt = xmalloc (sizeof *xsrt);
241   xsrt->criteria = criteria;
242   xsrt->value_cnt = casefile_get_value_cnt (casereader_get_casefile (reader));
243   xsrt->run_cap = 512;
244   xsrt->run_cnt = 0;
245   xsrt->runs = xnmalloc (xsrt->run_cap, sizeof *xsrt->runs);
246   if (write_runs (xsrt, reader))
247     {
248       struct casefile *output = merge (xsrt);
249       destroy_external_sort (xsrt);
250       return output;
251     }
252   else
253     {
254       destroy_external_sort (xsrt);
255       return NULL;
256     }
257 }
258
259 /* Destroys XSRT. */
260 static void
261 destroy_external_sort (struct external_sort *xsrt) 
262 {
263   if (xsrt != NULL) 
264     {
265       int i;
266       
267       for (i = 0; i < xsrt->run_cnt; i++)
268         casefile_destroy (xsrt->runs[i]);
269       free (xsrt->runs);
270       free (xsrt);
271     }
272 }
273 \f
274 /* Replacement selection. */
275
276 /* Pairs a record with a run number. */
277 struct record_run
278   {
279     int run;                    /* Run number of case. */
280     struct ccase record;        /* Case data. */
281     size_t idx;                 /* Case number (for stability). */
282   };
283
284 /* Represents a set of initial runs during an external sort. */
285 struct initial_run_state 
286   {
287     struct external_sort *xsrt;
288
289     /* Reservoir. */
290     struct record_run *records; /* Records arranged as a heap. */
291     size_t record_cnt;          /* Current number of records. */
292     size_t record_cap;          /* Capacity for records. */
293     
294     /* Run currently being output. */
295     int run;                    /* Run number. */
296     size_t case_cnt;            /* Number of cases so far. */
297     struct casefile *casefile;  /* Output file. */
298     struct ccase last_output;   /* Record last output. */
299
300     int okay;                   /* Zero if an error has been encountered. */
301   };
302
303 static const struct case_sink_class sort_sink_class;
304
305 static bool destroy_initial_run_state (struct initial_run_state *);
306 static void process_case (struct initial_run_state *, const struct ccase *,
307                           size_t);
308 static int allocate_cases (struct initial_run_state *);
309 static void output_record (struct initial_run_state *);
310 static void start_run (struct initial_run_state *);
311 static void end_run (struct initial_run_state *);
312 static int compare_record_run (const struct record_run *,
313                                const struct record_run *,
314                                struct initial_run_state *);
315 static int compare_record_run_minheap (const void *, const void *, void *);
316
317 /* Reads cases from READER and composes initial runs in XSRT. */
318 static int
319 write_runs (struct external_sort *xsrt, struct casereader *reader)
320 {
321   struct initial_run_state *irs;
322   struct ccase c;
323   size_t idx = 0;
324   int success = 0;
325
326   /* Allocate memory for cases. */
327   irs = xmalloc (sizeof *irs);
328   irs->xsrt = xsrt;
329   irs->records = NULL;
330   irs->record_cnt = irs->record_cap = 0;
331   irs->run = 0;
332   irs->case_cnt = 0;
333   irs->casefile = NULL;
334   case_nullify (&irs->last_output);
335   irs->okay = 1;
336   if (!allocate_cases (irs)) 
337     goto done;
338
339   /* Create initial runs. */
340   start_run (irs);
341   for (; irs->okay && casereader_read (reader, &c); case_destroy (&c))
342     process_case (irs, &c, idx++);
343   while (irs->okay && irs->record_cnt > 0)
344     output_record (irs);
345   end_run (irs);
346
347   success = irs->okay;
348
349  done:
350   if (!destroy_initial_run_state (irs))
351     success = false;
352
353   return success;
354 }
355
356 /* Add a single case to an initial run. */
357 static void
358 process_case (struct initial_run_state *irs, const struct ccase *c, size_t idx)
359 {
360   struct record_run *rr;
361
362   /* Compose record_run for this run and add to heap. */
363   assert (irs->record_cnt < irs->record_cap - 1);
364   rr = irs->records + irs->record_cnt++;
365   case_copy (&rr->record, 0, c, 0, irs->xsrt->value_cnt);
366   rr->run = irs->run;
367   rr->idx = idx;
368   if (!case_is_null (&irs->last_output)
369       && compare_record (c, &irs->last_output, irs->xsrt->criteria) < 0)
370     rr->run = irs->run + 1;
371   push_heap (irs->records, irs->record_cnt, sizeof *irs->records,
372              compare_record_run_minheap, irs);
373
374   /* Output a record if the reservoir is full. */
375   if (irs->record_cnt == irs->record_cap - 1 && irs->okay)
376     output_record (irs);
377 }
378
379 /* Destroys the initial run state represented by IRS.
380    Returns true if successful, false if an I/O error occurred. */
381 static bool
382 destroy_initial_run_state (struct initial_run_state *irs) 
383 {
384   int i;
385   bool ok = true;
386
387   if (irs == NULL)
388     return true;
389
390   for (i = 0; i < irs->record_cap; i++)
391     case_destroy (&irs->records[i].record);
392   free (irs->records);
393
394   if (irs->casefile != NULL)
395     ok = casefile_sleep (irs->casefile);
396
397   free (irs);
398   return ok;
399 }
400
401 /* Allocates room for lots of cases as a buffer. */
402 static int
403 allocate_cases (struct initial_run_state *irs)
404 {
405   int approx_case_cost; /* Approximate memory cost of one case in bytes. */
406   int max_cases;        /* Maximum number of cases to allocate. */
407   int i;
408
409   /* Allocate as many cases as we can within the workspace
410      limit. */
411   approx_case_cost = (sizeof *irs->records
412                       + irs->xsrt->value_cnt * sizeof (union value)
413                       + 4 * sizeof (void *));
414   max_cases = get_workspace() / approx_case_cost;
415   if (max_cases > max_buffers)
416     max_cases = max_buffers;
417   irs->records = nmalloc (sizeof *irs->records, max_cases);
418   if (irs->records != NULL)
419     for (i = 0; i < max_cases; i++)
420       if (!case_try_create (&irs->records[i].record, irs->xsrt->value_cnt))
421         {
422           max_cases = i;
423           break;
424         }
425   irs->record_cap = max_cases;
426
427   /* Fail if we didn't allocate an acceptable number of cases. */
428   if (irs->records == NULL || max_cases < min_buffers)
429     {
430       msg (SE, _("Out of memory.  Could not allocate room for minimum of %d "
431                  "cases of %d bytes each.  (PSPP workspace is currently "
432                  "restricted to a maximum of %d KB.)"),
433            min_buffers, approx_case_cost, get_workspace() / 1024);
434       return 0;
435     }
436   return 1;
437 }
438
439 /* Compares the VAR_CNT variables in VARS[] between the `value's at
440    A and B, and returns a strcmp()-type result. */
441 static int
442 compare_record (const struct ccase *a, const struct ccase *b,
443                 const struct sort_criteria *criteria)
444 {
445   int i;
446
447   assert (a != NULL);
448   assert (b != NULL);
449   
450   for (i = 0; i < criteria->crit_cnt; i++)
451     {
452       const struct sort_criterion *c = &criteria->crits[i];
453       int result;
454       
455       if (c->width == 0)
456         {
457           double af = case_num (a, c->fv);
458           double bf = case_num (b, c->fv);
459           
460           result = af < bf ? -1 : af > bf;
461         }
462       else
463         result = memcmp (case_str (a, c->fv), case_str (b, c->fv), c->width);
464
465       if (result != 0)
466         return c->dir == SRT_ASCEND ? result : -result;
467     }
468
469   return 0;
470 }
471
472 /* Compares record-run tuples A and B on run number first, then
473    on record, then on case index. */
474 static int
475 compare_record_run (const struct record_run *a,
476                     const struct record_run *b,
477                     struct initial_run_state *irs)
478 {
479   int result = a->run < b->run ? -1 : a->run > b->run;
480   if (result == 0)
481     result = compare_record (&a->record, &b->record, irs->xsrt->criteria);
482   if (result == 0)
483     result = a->idx < b->idx ? -1 : a->idx > b->idx;
484   return result;
485 }
486
487 /* Compares record-run tuples A and B on run number first, then
488    on the current record according to SCP, but in descending
489    order. */
490 static int
491 compare_record_run_minheap (const void *a, const void *b, void *irs) 
492 {
493   return -compare_record_run (a, b, irs);
494 }
495
496 /* Begins a new initial run, specifically its output file. */
497 static void
498 start_run (struct initial_run_state *irs)
499 {
500   irs->run++;
501   irs->case_cnt = 0;
502   irs->casefile = casefile_create (irs->xsrt->value_cnt);
503   casefile_to_disk (irs->casefile);
504   case_nullify (&irs->last_output); 
505 }
506
507 /* Ends the current initial run.  */
508 static void
509 end_run (struct initial_run_state *irs)
510 {
511   struct external_sort *xsrt = irs->xsrt;
512
513   /* Record initial run. */
514   if (irs->casefile != NULL) 
515     {
516       casefile_sleep (irs->casefile);
517       if (xsrt->run_cnt >= xsrt->run_cap) 
518         {
519           xsrt->run_cap *= 2;
520           xsrt->runs = xnrealloc (xsrt->runs,
521                                   xsrt->run_cap, sizeof *xsrt->runs);
522         }
523       xsrt->runs[xsrt->run_cnt++] = irs->casefile;
524       if (casefile_error (irs->casefile))
525         irs->okay = false;
526       irs->casefile = NULL; 
527     }
528 }
529
530 /* Writes a record to the current initial run. */
531 static void
532 output_record (struct initial_run_state *irs)
533 {
534   struct record_run *record_run;
535   struct ccase case_tmp;
536   
537   /* Extract minimum case from heap. */
538   assert (irs->record_cnt > 0);
539   pop_heap (irs->records, irs->record_cnt--, sizeof *irs->records,
540             compare_record_run_minheap, irs);
541   record_run = irs->records + irs->record_cnt;
542
543   /* Bail if an error has occurred. */
544   if (!irs->okay)
545     return;
546
547   /* Start new run if necessary. */
548   assert (record_run->run == irs->run
549           || record_run->run == irs->run + 1);
550   if (record_run->run != irs->run)
551     {
552       end_run (irs);
553       start_run (irs);
554     }
555   assert (record_run->run == irs->run);
556   irs->case_cnt++;
557
558   /* Write to disk. */
559   if (irs->casefile != NULL)
560     casefile_append (irs->casefile, &record_run->record);
561
562   /* This record becomes last_output. */
563   irs->last_output = case_tmp = record_run->record;
564   record_run->record = irs->records[irs->record_cap - 1].record;
565   irs->records[irs->record_cap - 1].record = case_tmp;
566 }
567 \f
568 /* Merging. */
569
570 static int choose_merge (struct casefile *runs[], int run_cnt, int order);
571 static struct casefile *merge_once (struct external_sort *,
572                                     struct casefile *[], size_t);
573
574 /* Repeatedly merges run until only one is left,
575    and returns the final casefile.
576    Returns a null pointer if an I/O error occurs. */
577 static struct casefile *
578 merge (struct external_sort *xsrt)
579 {
580   while (xsrt->run_cnt > 1)
581     {
582       int order = min (MAX_MERGE_ORDER, xsrt->run_cnt);
583       int idx = choose_merge (xsrt->runs, xsrt->run_cnt, order);
584       xsrt->runs[idx] = merge_once (xsrt, xsrt->runs + idx, order);
585       remove_range (xsrt->runs, xsrt->run_cnt, sizeof *xsrt->runs,
586                     idx + 1, order - 1);
587       xsrt->run_cnt -= order - 1;
588
589       if (xsrt->runs[idx] == NULL)
590         return NULL;
591     }
592   assert (xsrt->run_cnt == 1);
593   xsrt->run_cnt = 0;
594   return xsrt->runs[0];
595 }
596
597 /* Chooses ORDER runs out of the RUN_CNT runs in RUNS to merge,
598    and returns the index of the first one.
599
600    For stability, we must merge only consecutive runs.  For
601    efficiency, we choose the shortest consecutive sequence of
602    runs. */
603 static int
604 choose_merge (struct casefile *runs[], int run_cnt, int order) 
605 {
606   int min_idx, min_sum;
607   int cur_idx, cur_sum;
608   int i;
609
610   /* Sum up the length of the first ORDER runs. */
611   cur_sum = 0;
612   for (i = 0; i < order; i++)
613     cur_sum += casefile_get_case_cnt (runs[i]);
614
615   /* Find the shortest group of ORDER runs,
616      using a running total for efficiency. */
617   min_idx = 0;
618   min_sum = cur_sum;
619   for (cur_idx = 1; cur_idx + order <= run_cnt; cur_idx++)
620     {
621       cur_sum -= casefile_get_case_cnt (runs[cur_idx - 1]);
622       cur_sum += casefile_get_case_cnt (runs[cur_idx + order - 1]);
623       if (cur_sum < min_sum)
624         {
625           min_sum = cur_sum;
626           min_idx = cur_idx;
627         }
628     }
629
630   return min_idx;
631 }
632
633 /* Merges the RUN_CNT initial runs specified in INPUT_FILES into a
634    new run, and returns the new run.
635    Returns a null pointer if an I/O error occurs. */
636 static struct casefile *
637 merge_once (struct external_sort *xsrt,
638             struct casefile **const input_files,
639             size_t run_cnt)
640 {
641   struct run
642     {
643       struct casefile *file;
644       struct casereader *reader;
645       struct ccase ccase;
646     }
647   *runs;
648
649   struct casefile *output = NULL;
650   int i;
651
652   /* Open input files. */
653   runs = xnmalloc (run_cnt, sizeof *runs);
654   for (i = 0; i < run_cnt; i++) 
655     {
656       struct run *r = &runs[i];
657       r->file = input_files[i];
658       r->reader = casefile_get_destructive_reader (r->file);
659       if (!casereader_read_xfer (r->reader, &r->ccase))
660         {
661           run_cnt--;
662           i--;
663         }
664     }
665
666   /* Create output file. */
667   output = casefile_create (xsrt->value_cnt);
668   casefile_to_disk (output);
669
670   /* Merge. */
671   while (run_cnt > 0) 
672     {
673       struct run *min_run, *run;
674       
675       /* Find minimum. */
676       min_run = runs;
677       for (run = runs + 1; run < runs + run_cnt; run++)
678         if (compare_record (&run->ccase, &min_run->ccase, xsrt->criteria) < 0)
679           min_run = run;
680
681       /* Write minimum to output file. */
682       casefile_append_xfer (output, &min_run->ccase);
683
684       /* Read another case from minimum run. */
685       if (!casereader_read_xfer (min_run->reader, &min_run->ccase))
686         {
687           if (casefile_error (min_run->file) || casefile_error (output))
688             goto error;
689           casereader_destroy (min_run->reader);
690           casefile_destroy (min_run->file);
691
692           remove_element (runs, run_cnt, sizeof *runs, min_run - runs);
693           run_cnt--;
694         } 
695     }
696
697   if (!casefile_sleep (output))
698     goto error;
699   free (runs);
700
701   return output;
702
703  error:
704   for (i = 0; i < run_cnt; i++) 
705     casefile_destroy (runs[i].file);
706   casefile_destroy (output);
707   free (runs);
708   return NULL;
709 }