Remove "Written by Ben Pfaff <blp@gnu.org>" lines everywhere.
[pspp-builds.git] / src / math / sort.c
1 /* PSPP - computes sample statistics.
2    Copyright (C) 1997-9, 2000 Free Software Foundation, Inc.
3
4    This program is free software; you can redistribute it and/or
5    modify it under the terms of the GNU General Public License as
6    published by the Free Software Foundation; either version 2 of the
7    License, or (at your option) any later version.
8
9    This program is distributed in the hope that it will be useful, but
10    WITHOUT ANY WARRANTY; without even the implied warranty of
11    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12    General Public License for more details.
13
14    You should have received a copy of the GNU General Public License
15    along with this program; if not, write to the Free Software
16    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
17    02110-1301, USA. */
18
19 #include <config.h>
20
21 #include "sort.h"
22
23 #include <errno.h>
24 #include <limits.h>
25 #include <stdbool.h>
26 #include <stdio.h>
27 #include <stdlib.h>
28
29 #include <data/case-source.h>
30 #include <data/case.h>
31 #include <data/casefile.h>
32 #include <data/fastfile.h>
33 #include <data/procedure.h>
34 #include <data/settings.h>
35 #include <data/variable.h>
36 #include <data/storage-stream.h>
37 #include <libpspp/alloc.h>
38 #include <libpspp/array.h>
39 #include <libpspp/assertion.h>
40 #include <libpspp/message.h>
41 #include <libpspp/message.h>
42 #include <libpspp/misc.h>
43 #include <libpspp/str.h>
44
45 #include "minmax.h"
46
47 #include "gettext.h"
48 #define _(msgid) gettext (msgid)
49
50 /* These should only be changed for testing purposes. */
51 int min_buffers = 64;
52 int max_buffers = INT_MAX;
53 bool allow_internal_sort = true;
54
55 static int compare_record (const struct ccase *, const struct ccase *,
56                            const struct sort_criteria *);
57 static struct casefile *do_internal_sort (struct casereader *,
58                                           const struct sort_criteria *);
59 static struct casefile *do_external_sort (struct casereader *,
60                                           const struct sort_criteria *);
61
62
63 /* Sorts the active file in-place according to CRITERIA.
64    Returns true if successful. */
65 bool
66 sort_active_file_in_place (struct dataset *ds, 
67                            const struct sort_criteria *criteria) 
68 {
69   struct casefile *in, *out;
70
71   proc_cancel_temporary_transformations (ds);
72   if (!procedure (ds, NULL, NULL))
73     return false;
74   
75   in = proc_capture_output (ds);
76   out = sort_execute (casefile_get_destructive_reader (in), criteria);
77   if (out == NULL) 
78     return false;
79
80   proc_set_source (ds, storage_source_create (out));
81   return true;
82 }
83
84 /* Data passed to sort_to_casefile_callback(). */
85 struct sort_to_casefile_cb_data 
86   {
87     const struct sort_criteria *criteria;
88     struct casefile *output;
89   };
90
91 /* Sorts casefile CF according to the criteria in CB_DATA. */
92 static bool
93 sort_to_casefile_callback (const struct casefile *cf, void *cb_data_) 
94 {
95   struct sort_to_casefile_cb_data *cb_data = cb_data_;
96   cb_data->output = sort_execute (casefile_get_reader (cf, NULL), cb_data->criteria);
97   return cb_data->output != NULL;
98 }
99
100 /* Sorts the active file to a separate casefile.  If successful,
101    returns the sorted casefile.  Returns a null pointer on
102    failure. */
103 struct casefile *
104 sort_active_file_to_casefile (struct dataset *ds, 
105                               const struct sort_criteria *criteria) 
106 {
107   struct sort_to_casefile_cb_data cb_data;
108   
109   proc_cancel_temporary_transformations (ds);
110
111   cb_data.criteria = criteria;
112   cb_data.output = NULL;
113   if (!multipass_procedure (ds, sort_to_casefile_callback, &cb_data)) 
114     {
115       casefile_destroy (cb_data.output);
116       return NULL;
117     }
118   return cb_data.output;
119 }
120
121
122 /* Reads all the cases from READER, which is destroyed.  Sorts
123    the cases according to CRITERIA.  Returns the sorted cases in
124    a newly created casefile. */
125 struct casefile *
126 sort_execute (struct casereader *reader, const struct sort_criteria *criteria)
127 {
128   struct casefile *output = do_internal_sort (reader, criteria);
129   if (output == NULL)
130     output = do_external_sort (reader, criteria);
131   casereader_destroy (reader);
132   return output;
133 }
134 \f
135 /* A case and its index. */
136 struct indexed_case 
137   {
138     struct ccase c;     /* Case. */
139     unsigned long idx;  /* Index to allow for stable sorting. */
140   };
141
142 static int compare_indexed_cases (const void *, const void *, const void *);
143
144 /* If the data is in memory, do an internal sort and return a new
145    casefile for the data.  Otherwise, return a null pointer. */
146 static struct casefile *
147 do_internal_sort (struct casereader *reader,
148                   const struct sort_criteria *criteria)
149 {
150   const struct casefile *src;
151   struct casefile *dst;
152   unsigned long case_cnt;
153
154   if (!allow_internal_sort)
155     return NULL;
156
157   src = casereader_get_casefile (reader);
158   if (casefile_get_case_cnt (src) > 1 && !casefile_in_core (src))
159     return NULL;
160       
161   case_cnt = casefile_get_case_cnt (src);
162   dst = fastfile_create (casefile_get_value_cnt (src));
163   if (case_cnt != 0) 
164     {
165       struct indexed_case *cases = nmalloc (sizeof *cases, case_cnt);
166       if (cases != NULL) 
167         {
168           unsigned long i;
169           
170           for (i = 0; i < case_cnt; i++)
171             {
172               bool ok = casereader_read_xfer (reader, &cases[i].c);
173               if (!ok)
174                 NOT_REACHED ();
175               cases[i].idx = i;
176             }
177
178           sort (cases, case_cnt, sizeof *cases, compare_indexed_cases,
179                 (void *) criteria);
180       
181           for (i = 0; i < case_cnt; i++)
182             casefile_append_xfer (dst, &cases[i].c);
183           if (casefile_error (dst))
184             NOT_REACHED ();
185
186           free (cases);
187         }
188       else 
189         {
190           /* Failure. */
191           casefile_destroy (dst);
192           dst = NULL;
193         }
194     }
195
196   return dst;
197 }
198
199 /* Compares the variables specified by CRITERIA between the cases
200    at A and B, with a "last resort" comparison for stability, and
201    returns a strcmp()-type result. */
202 static int
203 compare_indexed_cases (const void *a_, const void *b_, const void *criteria_)
204 {
205   const struct sort_criteria *criteria = criteria_;
206   const struct indexed_case *a = a_;
207   const struct indexed_case *b = b_;
208   int result = compare_record (&a->c, &b->c, criteria);
209   if (result == 0)
210     result = a->idx < b->idx ? -1 : a->idx > b->idx;
211   return result;
212 }
213 \f
214 /* External sort. */
215
216 /* Maximum order of merge (external sort only).  The maximum
217    reasonable value is about 7.  Above that, it would be a good
218    idea to use a heap in merge_once() to select the minimum. */
219 #define MAX_MERGE_ORDER 7
220
221 /* Results of an external sort. */
222 struct external_sort 
223   {
224     const struct sort_criteria *criteria; /* Sort criteria. */
225     size_t value_cnt;                 /* Size of data in `union value's. */
226     struct casefile **runs;           /* Array of initial runs. */
227     size_t run_cnt, run_cap;          /* Number of runs, allocated capacity. */
228   };
229
230 /* Prototypes for helper functions. */
231 static int write_runs (struct external_sort *, struct casereader *);
232 static struct casefile *merge (struct external_sort *);
233 static void destroy_external_sort (struct external_sort *);
234
235 /* Performs a stable external sort of the active file according
236    to the specification in SCP.  Forms initial runs using a heap
237    as a reservoir.  Merges the initial runs according to a
238    pattern that assures stability. */
239 static struct casefile *
240 do_external_sort (struct casereader *reader,
241                   const struct sort_criteria *criteria)
242 {
243   struct external_sort *xsrt;
244
245   if (!casefile_to_disk (casereader_get_casefile (reader)))
246     return NULL;
247
248   xsrt = xmalloc (sizeof *xsrt);
249   xsrt->criteria = criteria;
250   xsrt->value_cnt = casefile_get_value_cnt (casereader_get_casefile (reader));
251   xsrt->run_cap = 512;
252   xsrt->run_cnt = 0;
253   xsrt->runs = xnmalloc (xsrt->run_cap, sizeof *xsrt->runs);
254   if (write_runs (xsrt, reader))
255     {
256       struct casefile *output = merge (xsrt);
257       destroy_external_sort (xsrt);
258       return output;
259     }
260   else
261     {
262       destroy_external_sort (xsrt);
263       return NULL;
264     }
265 }
266
267 /* Destroys XSRT. */
268 static void
269 destroy_external_sort (struct external_sort *xsrt) 
270 {
271   if (xsrt != NULL) 
272     {
273       int i;
274       
275       for (i = 0; i < xsrt->run_cnt; i++)
276         casefile_destroy (xsrt->runs[i]);
277       free (xsrt->runs);
278       free (xsrt);
279     }
280 }
281 \f
282 /* Replacement selection. */
283
284 /* Pairs a record with a run number. */
285 struct record_run
286   {
287     int run;                    /* Run number of case. */
288     struct ccase record;        /* Case data. */
289     size_t idx;                 /* Case number (for stability). */
290   };
291
292 /* Represents a set of initial runs during an external sort. */
293 struct initial_run_state 
294   {
295     struct external_sort *xsrt;
296
297     /* Reservoir. */
298     struct record_run *records; /* Records arranged as a heap. */
299     size_t record_cnt;          /* Current number of records. */
300     size_t record_cap;          /* Capacity for records. */
301     
302     /* Run currently being output. */
303     int run;                    /* Run number. */
304     size_t case_cnt;            /* Number of cases so far. */
305     struct casefile *casefile;  /* Output file. */
306     struct ccase last_output;   /* Record last output. */
307
308     int okay;                   /* Zero if an error has been encountered. */
309   };
310
311 static bool destroy_initial_run_state (struct initial_run_state *);
312 static void process_case (struct initial_run_state *, 
313                           const struct ccase *, size_t);
314 static int allocate_cases (struct initial_run_state *);
315 static void output_record (struct initial_run_state *);
316 static void start_run (struct initial_run_state *);
317 static void end_run (struct initial_run_state *);
318 static int compare_record_run (const struct record_run *,
319                                const struct record_run *,
320                                const struct initial_run_state *);
321 static int compare_record_run_minheap (const void *, const void *, 
322                                        const void *);
323
324 /* Reads cases from READER and composes initial runs in XSRT. */
325 static int
326 write_runs (struct external_sort *xsrt, struct casereader *reader)
327 {
328   struct initial_run_state *irs;
329   struct ccase c;
330   size_t idx = 0;
331   int success = 0;
332
333   /* Allocate memory for cases. */
334   irs = xmalloc (sizeof *irs);
335   irs->xsrt = xsrt;
336   irs->records = NULL;
337   irs->record_cnt = irs->record_cap = 0;
338   irs->run = 0;
339   irs->case_cnt = 0;
340   irs->casefile = NULL;
341   case_nullify (&irs->last_output);
342   irs->okay = 1;
343   if (!allocate_cases (irs)) 
344     goto done;
345
346   /* Create initial runs. */
347   start_run (irs);
348   for (; irs->okay && casereader_read (reader, &c); case_destroy (&c))
349     process_case (irs, &c, idx++);
350   while (irs->okay && irs->record_cnt > 0)
351     output_record (irs);
352   end_run (irs);
353
354   success = irs->okay;
355
356  done:
357   if (!destroy_initial_run_state (irs))
358     success = false;
359
360   return success;
361 }
362
363 /* Add a single case to an initial run. */
364 static void
365 process_case (struct initial_run_state *irs, const struct ccase *c, 
366               size_t idx)
367 {
368   struct record_run *rr;
369
370   /* Compose record_run for this run and add to heap. */
371   assert (irs->record_cnt < irs->record_cap - 1);
372   rr = irs->records + irs->record_cnt++;
373   case_copy (&rr->record, 0, c, 0, irs->xsrt->value_cnt);
374   rr->run = irs->run;
375   rr->idx = idx;
376   if (!case_is_null (&irs->last_output)
377       && compare_record (c, &irs->last_output, irs->xsrt->criteria) < 0)
378     rr->run = irs->run + 1;
379   push_heap (irs->records, irs->record_cnt, sizeof *irs->records,
380              compare_record_run_minheap, irs);
381
382   /* Output a record if the reservoir is full. */
383   if (irs->record_cnt == irs->record_cap - 1 && irs->okay)
384     output_record (irs);
385 }
386
387 /* Destroys the initial run state represented by IRS.
388    Returns true if successful, false if an I/O error occurred. */
389 static bool
390 destroy_initial_run_state (struct initial_run_state *irs) 
391 {
392   int i;
393   bool ok = true;
394
395   if (irs == NULL)
396     return true;
397
398   for (i = 0; i < irs->record_cap; i++)
399     case_destroy (&irs->records[i].record);
400   free (irs->records);
401
402   if (irs->casefile != NULL)
403     ok = casefile_sleep (irs->casefile);
404
405   free (irs);
406   return ok;
407 }
408
409 /* Allocates room for lots of cases as a buffer. */
410 static int
411 allocate_cases (struct initial_run_state *irs)
412 {
413   int approx_case_cost; /* Approximate memory cost of one case in bytes. */
414   int max_cases;        /* Maximum number of cases to allocate. */
415   int i;
416
417   /* Allocate as many cases as we can within the workspace
418      limit. */
419   approx_case_cost = (sizeof *irs->records
420                       + irs->xsrt->value_cnt * sizeof (union value)
421                       + 4 * sizeof (void *));
422   max_cases = get_workspace() / approx_case_cost;
423   if (max_cases > max_buffers)
424     max_cases = max_buffers;
425   irs->records = nmalloc (sizeof *irs->records, max_cases);
426   if (irs->records != NULL)
427     for (i = 0; i < max_cases; i++)
428       if (!case_try_create (&irs->records[i].record, irs->xsrt->value_cnt))
429         {
430           max_cases = i;
431           break;
432         }
433   irs->record_cap = max_cases;
434
435   /* Fail if we didn't allocate an acceptable number of cases. */
436   if (irs->records == NULL || max_cases < min_buffers)
437     {
438       msg (SE, _("Out of memory.  Could not allocate room for minimum of %d "
439                  "cases of %d bytes each.  (PSPP workspace is currently "
440                  "restricted to a maximum of %d KB.)"),
441            min_buffers, approx_case_cost, get_workspace() / 1024);
442       return 0;
443     }
444   return 1;
445 }
446
447 /* Compares the VAR_CNT variables in VARS[] between the `value's at
448    A and B, and returns a strcmp()-type result. */
449 static int
450 compare_record (const struct ccase *a, const struct ccase *b,
451                 const struct sort_criteria *criteria)
452 {
453   int i;
454
455   assert (a != NULL);
456   assert (b != NULL);
457   
458   for (i = 0; i < criteria->crit_cnt; i++)
459     {
460       const struct sort_criterion *c = &criteria->crits[i];
461       int result;
462       
463       if (c->width == 0)
464         {
465           double af = case_num_idx (a, c->fv);
466           double bf = case_num_idx (b, c->fv);
467           
468           result = af < bf ? -1 : af > bf;
469         }
470       else
471         result = memcmp (case_str_idx (a, c->fv),
472                          case_str_idx (b, c->fv), c->width);
473
474       if (result != 0)
475         return c->dir == SRT_ASCEND ? result : -result;
476     }
477
478   return 0;
479 }
480
481 /* Compares record-run tuples A and B on run number first, then
482    on record, then on case index. */
483 static int
484 compare_record_run (const struct record_run *a,
485                     const struct record_run *b,
486                     const struct initial_run_state *irs)
487 {
488   int result = a->run < b->run ? -1 : a->run > b->run;
489   if (result == 0)
490     result = compare_record (&a->record, &b->record, irs->xsrt->criteria);
491   if (result == 0)
492     result = a->idx < b->idx ? -1 : a->idx > b->idx;
493   return result;
494 }
495
496 /* Compares record-run tuples A and B on run number first, then
497    on the current record according to SCP, but in descending
498    order. */
499 static int
500 compare_record_run_minheap (const void *a, const void *b, const void *irs) 
501 {
502   return -compare_record_run (a, b, irs);
503 }
504
505 /* Begins a new initial run, specifically its output file. */
506 static void
507 start_run (struct initial_run_state *irs)
508 {
509   irs->run++;
510   irs->case_cnt = 0;
511   irs->casefile = fastfile_create (irs->xsrt->value_cnt);
512   casefile_to_disk (irs->casefile);
513   case_nullify (&irs->last_output); 
514 }
515
516 /* Ends the current initial run.  */
517 static void
518 end_run (struct initial_run_state *irs)
519 {
520   struct external_sort *xsrt = irs->xsrt;
521
522   /* Record initial run. */
523   if (irs->casefile != NULL) 
524     {
525       casefile_sleep (irs->casefile);
526       if (xsrt->run_cnt >= xsrt->run_cap) 
527         {
528           xsrt->run_cap *= 2;
529           xsrt->runs = xnrealloc (xsrt->runs,
530                                   xsrt->run_cap, sizeof *xsrt->runs);
531         }
532       xsrt->runs[xsrt->run_cnt++] = irs->casefile;
533       if (casefile_error (irs->casefile))
534         irs->okay = false;
535       irs->casefile = NULL; 
536     }
537 }
538
539 /* Writes a record to the current initial run. */
540 static void
541 output_record (struct initial_run_state *irs)
542 {
543   struct record_run *record_run;
544   struct ccase case_tmp;
545   
546   /* Extract minimum case from heap. */
547   assert (irs->record_cnt > 0);
548   pop_heap (irs->records, irs->record_cnt--, sizeof *irs->records,
549             compare_record_run_minheap, irs);
550   record_run = irs->records + irs->record_cnt;
551
552   /* Bail if an error has occurred. */
553   if (!irs->okay)
554     return;
555
556   /* Start new run if necessary. */
557   assert (record_run->run == irs->run
558           || record_run->run == irs->run + 1);
559   if (record_run->run != irs->run)
560     {
561       end_run (irs);
562       start_run (irs);
563     }
564   assert (record_run->run == irs->run);
565   irs->case_cnt++;
566
567   /* Write to disk. */
568   if (irs->casefile != NULL)
569     casefile_append (irs->casefile, &record_run->record);
570
571   /* This record becomes last_output. */
572   irs->last_output = case_tmp = record_run->record;
573   record_run->record = irs->records[irs->record_cap - 1].record;
574   irs->records[irs->record_cap - 1].record = case_tmp;
575 }
576 \f
577 /* Merging. */
578
579 static int choose_merge (struct casefile *runs[], int run_cnt, int order);
580 static struct casefile *merge_once (struct external_sort *,
581                                     struct casefile *[], size_t);
582
583 /* Repeatedly merges run until only one is left,
584    and returns the final casefile.
585    Returns a null pointer if an I/O error occurs. */
586 static struct casefile *
587 merge (struct external_sort *xsrt)
588 {
589   while (xsrt->run_cnt > 1)
590     {
591       int order = MIN (MAX_MERGE_ORDER, xsrt->run_cnt);
592       int idx = choose_merge (xsrt->runs, xsrt->run_cnt, order);
593       xsrt->runs[idx] = merge_once (xsrt, xsrt->runs + idx, order);
594       remove_range (xsrt->runs, xsrt->run_cnt, sizeof *xsrt->runs,
595                     idx + 1, order - 1);
596       xsrt->run_cnt -= order - 1;
597
598       if (xsrt->runs[idx] == NULL)
599         return NULL;
600     }
601   assert (xsrt->run_cnt == 1);
602   xsrt->run_cnt = 0;
603   return xsrt->runs[0];
604 }
605
606 /* Chooses ORDER runs out of the RUN_CNT runs in RUNS to merge,
607    and returns the index of the first one.
608
609    For stability, we must merge only consecutive runs.  For
610    efficiency, we choose the shortest consecutive sequence of
611    runs. */
612 static int
613 choose_merge (struct casefile *runs[], int run_cnt, int order) 
614 {
615   int min_idx, min_sum;
616   int cur_idx, cur_sum;
617   int i;
618
619   /* Sum up the length of the first ORDER runs. */
620   cur_sum = 0;
621   for (i = 0; i < order; i++)
622     cur_sum += casefile_get_case_cnt (runs[i]);
623
624   /* Find the shortest group of ORDER runs,
625      using a running total for efficiency. */
626   min_idx = 0;
627   min_sum = cur_sum;
628   for (cur_idx = 1; cur_idx + order <= run_cnt; cur_idx++)
629     {
630       cur_sum -= casefile_get_case_cnt (runs[cur_idx - 1]);
631       cur_sum += casefile_get_case_cnt (runs[cur_idx + order - 1]);
632       if (cur_sum < min_sum)
633         {
634           min_sum = cur_sum;
635           min_idx = cur_idx;
636         }
637     }
638
639   return min_idx;
640 }
641
642 /* Merges the RUN_CNT initial runs specified in INPUT_FILES into a
643    new run, and returns the new run.
644    Returns a null pointer if an I/O error occurs. */
645 static struct casefile *
646 merge_once (struct external_sort *xsrt,
647             struct casefile **const input_files,
648             size_t run_cnt)
649 {
650   struct run
651     {
652       struct casefile *file;
653       struct casereader *reader;
654       struct ccase ccase;
655     }
656   *runs;
657
658   struct casefile *output = NULL;
659   int i;
660
661   /* Open input files. */
662   runs = xnmalloc (run_cnt, sizeof *runs);
663   for (i = 0; i < run_cnt; i++) 
664     {
665       struct run *r = &runs[i];
666       r->file = input_files[i];
667       r->reader = casefile_get_destructive_reader (r->file);
668       if (!casereader_read_xfer (r->reader, &r->ccase))
669         {
670           run_cnt--;
671           i--;
672         }
673     }
674
675   /* Create output file. */
676   output = fastfile_create (xsrt->value_cnt);
677   casefile_to_disk (output);
678
679   /* Merge. */
680   while (run_cnt > 0) 
681     {
682       struct run *min_run, *run;
683       
684       /* Find minimum. */
685       min_run = runs;
686       for (run = runs + 1; run < runs + run_cnt; run++)
687         if (compare_record (&run->ccase, &min_run->ccase, xsrt->criteria) < 0)
688           min_run = run;
689
690       /* Write minimum to output file. */
691       casefile_append_xfer (output, &min_run->ccase);
692
693       /* Read another case from minimum run. */
694       if (!casereader_read_xfer (min_run->reader, &min_run->ccase))
695         {
696           if (casefile_error (min_run->file) || casefile_error (output))
697             goto error;
698           casereader_destroy (min_run->reader);
699           casefile_destroy (min_run->file);
700
701           remove_element (runs, run_cnt, sizeof *runs, min_run - runs);
702           run_cnt--;
703         } 
704     }
705
706   if (!casefile_sleep (output))
707     goto error;
708   free (runs);
709
710   return output;
711
712  error:
713   for (i = 0; i < run_cnt; i++) 
714     casefile_destroy (runs[i].file);
715   casefile_destroy (output);
716   free (runs);
717   return NULL;
718 }