Encapsulated the static data of procedure.[ch] into a single object, to be
[pspp-builds.git] / src / math / sort.c
1 /* PSPP - computes sample statistics.
2    Copyright (C) 1997-9, 2000 Free Software Foundation, Inc.
3    Written by Ben Pfaff <blp@gnu.org>.
4
5    This program is free software; you can redistribute it and/or
6    modify it under the terms of the GNU General Public License as
7    published by the Free Software Foundation; either version 2 of the
8    License, or (at your option) any later version.
9
10    This program is distributed in the hope that it will be useful, but
11    WITHOUT ANY WARRANTY; without even the implied warranty of
12    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13    General Public License for more details.
14
15    You should have received a copy of the GNU General Public License
16    along with this program; if not, write to the Free Software
17    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
18    02110-1301, USA. */
19
20 #include <config.h>
21
22 #include "sort.h"
23
24 #include <errno.h>
25 #include <limits.h>
26 #include <stdbool.h>
27 #include <stdio.h>
28 #include <stdlib.h>
29
30 #include <data/case-source.h>
31 #include <data/case.h>
32 #include <data/casefile.h>
33 #include <data/fastfile.h>
34 #include <data/procedure.h>
35 #include <data/settings.h>
36 #include <data/variable.h>
37 #include <data/storage-stream.h>
38 #include <language/expressions/public.h>
39 #include <libpspp/alloc.h>
40 #include <libpspp/array.h>
41 #include <libpspp/assertion.h>
42 #include <libpspp/message.h>
43 #include <libpspp/message.h>
44 #include <libpspp/misc.h>
45 #include <libpspp/str.h>
46
47 #include "gettext.h"
48 #define _(msgid) gettext (msgid)
49
50 /* These should only be changed for testing purposes. */
51 int min_buffers = 64;
52 int max_buffers = INT_MAX;
53 bool allow_internal_sort = true;
54
55 static int compare_record (const struct ccase *, const struct ccase *,
56                            const struct sort_criteria *);
57 static struct casefile *do_internal_sort (struct casereader *,
58                                           const struct sort_criteria *);
59 static struct casefile *do_external_sort (struct casereader *,
60                                           const struct sort_criteria *);
61
62 /* Get ready to sort the active file. */
63 static void
64 prepare_to_sort_active_file (void) 
65 {
66   proc_cancel_temporary_transformations (current_dataset); 
67 }
68
69 /* Sorts the active file in-place according to CRITERIA.
70    Returns true if successful. */
71 bool
72 sort_active_file_in_place (const struct sort_criteria *criteria) 
73 {
74   struct casefile *in, *out;
75
76   prepare_to_sort_active_file ();
77   if (!procedure (current_dataset,NULL, NULL))
78     return false;
79   
80   in = proc_capture_output (current_dataset);
81   out = sort_execute (casefile_get_destructive_reader (in), criteria);
82   if (out == NULL) 
83     return false;
84
85   proc_set_source (current_dataset, storage_source_create (out));
86   return true;
87 }
88
89 /* Data passed to sort_to_casefile_callback(). */
90 struct sort_to_casefile_cb_data 
91   {
92     const struct sort_criteria *criteria;
93     struct casefile *output;
94   };
95
96 /* Sorts casefile CF according to the criteria in CB_DATA. */
97 static bool
98 sort_to_casefile_callback (const struct casefile *cf, void *cb_data_) 
99 {
100   struct sort_to_casefile_cb_data *cb_data = cb_data_;
101   cb_data->output = sort_execute (casefile_get_reader (cf), cb_data->criteria);
102   return cb_data->output != NULL;
103 }
104
105 /* Sorts the active file to a separate casefile.  If successful,
106    returns the sorted casefile.  Returns a null pointer on
107    failure. */
108 struct casefile *
109 sort_active_file_to_casefile (const struct sort_criteria *criteria) 
110 {
111   struct sort_to_casefile_cb_data cb_data;
112   
113   prepare_to_sort_active_file ();
114
115   cb_data.criteria = criteria;
116   cb_data.output = NULL;
117   if (!multipass_procedure (current_dataset,sort_to_casefile_callback, &cb_data)) 
118     {
119       casefile_destroy (cb_data.output);
120       return NULL;
121     }
122   return cb_data.output;
123 }
124
125
126 /* Reads all the cases from READER, which is destroyed.  Sorts
127    the cases according to CRITERIA.  Returns the sorted cases in
128    a newly created casefile. */
129 struct casefile *
130 sort_execute (struct casereader *reader, const struct sort_criteria *criteria)
131 {
132   struct casefile *output = do_internal_sort (reader, criteria);
133   if (output == NULL)
134     output = do_external_sort (reader, criteria);
135   casereader_destroy (reader);
136   return output;
137 }
138 \f
139 /* A case and its index. */
140 struct indexed_case 
141   {
142     struct ccase c;     /* Case. */
143     unsigned long idx;  /* Index to allow for stable sorting. */
144   };
145
146 static int compare_indexed_cases (const void *, const void *, void *);
147
148 /* If the data is in memory, do an internal sort and return a new
149    casefile for the data.  Otherwise, return a null pointer. */
150 static struct casefile *
151 do_internal_sort (struct casereader *reader,
152                   const struct sort_criteria *criteria)
153 {
154   const struct casefile *src;
155   struct casefile *dst;
156   unsigned long case_cnt;
157
158   if (!allow_internal_sort)
159     return NULL;
160
161   src = casereader_get_casefile (reader);
162   if (casefile_get_case_cnt (src) > 1 && !casefile_in_core (src))
163     return NULL;
164       
165   case_cnt = casefile_get_case_cnt (src);
166   dst = fastfile_create (casefile_get_value_cnt (src));
167   if (case_cnt != 0) 
168     {
169       struct indexed_case *cases = nmalloc (sizeof *cases, case_cnt);
170       if (cases != NULL) 
171         {
172           unsigned long i;
173           
174           for (i = 0; i < case_cnt; i++)
175             {
176               bool ok = casereader_read_xfer (reader, &cases[i].c);
177               if (!ok)
178                 NOT_REACHED ();
179               cases[i].idx = i;
180             }
181
182           sort (cases, case_cnt, sizeof *cases, compare_indexed_cases,
183                 (void *) criteria);
184       
185           for (i = 0; i < case_cnt; i++)
186             casefile_append_xfer (dst, &cases[i].c);
187           if (casefile_error (dst))
188             NOT_REACHED ();
189
190           free (cases);
191         }
192       else 
193         {
194           /* Failure. */
195           casefile_destroy (dst);
196           dst = NULL;
197         }
198     }
199
200   return dst;
201 }
202
203 /* Compares the variables specified by CRITERIA between the cases
204    at A and B, with a "last resort" comparison for stability, and
205    returns a strcmp()-type result. */
206 static int
207 compare_indexed_cases (const void *a_, const void *b_, void *criteria_)
208 {
209   struct sort_criteria *criteria = criteria_;
210   const struct indexed_case *a = a_;
211   const struct indexed_case *b = b_;
212   int result = compare_record (&a->c, &b->c, criteria);
213   if (result == 0)
214     result = a->idx < b->idx ? -1 : a->idx > b->idx;
215   return result;
216 }
217 \f
218 /* External sort. */
219
220 /* Maximum order of merge (external sort only).  The maximum
221    reasonable value is about 7.  Above that, it would be a good
222    idea to use a heap in merge_once() to select the minimum. */
223 #define MAX_MERGE_ORDER 7
224
225 /* Results of an external sort. */
226 struct external_sort 
227   {
228     const struct sort_criteria *criteria; /* Sort criteria. */
229     size_t value_cnt;                 /* Size of data in `union value's. */
230     struct casefile **runs;           /* Array of initial runs. */
231     size_t run_cnt, run_cap;          /* Number of runs, allocated capacity. */
232   };
233
234 /* Prototypes for helper functions. */
235 static int write_runs (struct external_sort *, struct casereader *);
236 static struct casefile *merge (struct external_sort *);
237 static void destroy_external_sort (struct external_sort *);
238
239 /* Performs a stable external sort of the active file according
240    to the specification in SCP.  Forms initial runs using a heap
241    as a reservoir.  Merges the initial runs according to a
242    pattern that assures stability. */
243 static struct casefile *
244 do_external_sort (struct casereader *reader,
245                   const struct sort_criteria *criteria)
246 {
247   struct external_sort *xsrt;
248
249   if (!casefile_to_disk (casereader_get_casefile (reader)))
250     return NULL;
251
252   xsrt = xmalloc (sizeof *xsrt);
253   xsrt->criteria = criteria;
254   xsrt->value_cnt = casefile_get_value_cnt (casereader_get_casefile (reader));
255   xsrt->run_cap = 512;
256   xsrt->run_cnt = 0;
257   xsrt->runs = xnmalloc (xsrt->run_cap, sizeof *xsrt->runs);
258   if (write_runs (xsrt, reader))
259     {
260       struct casefile *output = merge (xsrt);
261       destroy_external_sort (xsrt);
262       return output;
263     }
264   else
265     {
266       destroy_external_sort (xsrt);
267       return NULL;
268     }
269 }
270
271 /* Destroys XSRT. */
272 static void
273 destroy_external_sort (struct external_sort *xsrt) 
274 {
275   if (xsrt != NULL) 
276     {
277       int i;
278       
279       for (i = 0; i < xsrt->run_cnt; i++)
280         casefile_destroy (xsrt->runs[i]);
281       free (xsrt->runs);
282       free (xsrt);
283     }
284 }
285 \f
286 /* Replacement selection. */
287
288 /* Pairs a record with a run number. */
289 struct record_run
290   {
291     int run;                    /* Run number of case. */
292     struct ccase record;        /* Case data. */
293     size_t idx;                 /* Case number (for stability). */
294   };
295
296 /* Represents a set of initial runs during an external sort. */
297 struct initial_run_state 
298   {
299     struct external_sort *xsrt;
300
301     /* Reservoir. */
302     struct record_run *records; /* Records arranged as a heap. */
303     size_t record_cnt;          /* Current number of records. */
304     size_t record_cap;          /* Capacity for records. */
305     
306     /* Run currently being output. */
307     int run;                    /* Run number. */
308     size_t case_cnt;            /* Number of cases so far. */
309     struct casefile *casefile;  /* Output file. */
310     struct ccase last_output;   /* Record last output. */
311
312     int okay;                   /* Zero if an error has been encountered. */
313   };
314
315 static bool destroy_initial_run_state (struct initial_run_state *);
316 static void process_case (struct initial_run_state *, const struct ccase *,
317                           size_t);
318 static int allocate_cases (struct initial_run_state *);
319 static void output_record (struct initial_run_state *);
320 static void start_run (struct initial_run_state *);
321 static void end_run (struct initial_run_state *);
322 static int compare_record_run (const struct record_run *,
323                                const struct record_run *,
324                                struct initial_run_state *);
325 static int compare_record_run_minheap (const void *, const void *, void *);
326
327 /* Reads cases from READER and composes initial runs in XSRT. */
328 static int
329 write_runs (struct external_sort *xsrt, struct casereader *reader)
330 {
331   struct initial_run_state *irs;
332   struct ccase c;
333   size_t idx = 0;
334   int success = 0;
335
336   /* Allocate memory for cases. */
337   irs = xmalloc (sizeof *irs);
338   irs->xsrt = xsrt;
339   irs->records = NULL;
340   irs->record_cnt = irs->record_cap = 0;
341   irs->run = 0;
342   irs->case_cnt = 0;
343   irs->casefile = NULL;
344   case_nullify (&irs->last_output);
345   irs->okay = 1;
346   if (!allocate_cases (irs)) 
347     goto done;
348
349   /* Create initial runs. */
350   start_run (irs);
351   for (; irs->okay && casereader_read (reader, &c); case_destroy (&c))
352     process_case (irs, &c, idx++);
353   while (irs->okay && irs->record_cnt > 0)
354     output_record (irs);
355   end_run (irs);
356
357   success = irs->okay;
358
359  done:
360   if (!destroy_initial_run_state (irs))
361     success = false;
362
363   return success;
364 }
365
366 /* Add a single case to an initial run. */
367 static void
368 process_case (struct initial_run_state *irs, const struct ccase *c, size_t idx)
369 {
370   struct record_run *rr;
371
372   /* Compose record_run for this run and add to heap. */
373   assert (irs->record_cnt < irs->record_cap - 1);
374   rr = irs->records + irs->record_cnt++;
375   case_copy (&rr->record, 0, c, 0, irs->xsrt->value_cnt);
376   rr->run = irs->run;
377   rr->idx = idx;
378   if (!case_is_null (&irs->last_output)
379       && compare_record (c, &irs->last_output, irs->xsrt->criteria) < 0)
380     rr->run = irs->run + 1;
381   push_heap (irs->records, irs->record_cnt, sizeof *irs->records,
382              compare_record_run_minheap, irs);
383
384   /* Output a record if the reservoir is full. */
385   if (irs->record_cnt == irs->record_cap - 1 && irs->okay)
386     output_record (irs);
387 }
388
389 /* Destroys the initial run state represented by IRS.
390    Returns true if successful, false if an I/O error occurred. */
391 static bool
392 destroy_initial_run_state (struct initial_run_state *irs) 
393 {
394   int i;
395   bool ok = true;
396
397   if (irs == NULL)
398     return true;
399
400   for (i = 0; i < irs->record_cap; i++)
401     case_destroy (&irs->records[i].record);
402   free (irs->records);
403
404   if (irs->casefile != NULL)
405     ok = casefile_sleep (irs->casefile);
406
407   free (irs);
408   return ok;
409 }
410
411 /* Allocates room for lots of cases as a buffer. */
412 static int
413 allocate_cases (struct initial_run_state *irs)
414 {
415   int approx_case_cost; /* Approximate memory cost of one case in bytes. */
416   int max_cases;        /* Maximum number of cases to allocate. */
417   int i;
418
419   /* Allocate as many cases as we can within the workspace
420      limit. */
421   approx_case_cost = (sizeof *irs->records
422                       + irs->xsrt->value_cnt * sizeof (union value)
423                       + 4 * sizeof (void *));
424   max_cases = get_workspace() / approx_case_cost;
425   if (max_cases > max_buffers)
426     max_cases = max_buffers;
427   irs->records = nmalloc (sizeof *irs->records, max_cases);
428   if (irs->records != NULL)
429     for (i = 0; i < max_cases; i++)
430       if (!case_try_create (&irs->records[i].record, irs->xsrt->value_cnt))
431         {
432           max_cases = i;
433           break;
434         }
435   irs->record_cap = max_cases;
436
437   /* Fail if we didn't allocate an acceptable number of cases. */
438   if (irs->records == NULL || max_cases < min_buffers)
439     {
440       msg (SE, _("Out of memory.  Could not allocate room for minimum of %d "
441                  "cases of %d bytes each.  (PSPP workspace is currently "
442                  "restricted to a maximum of %d KB.)"),
443            min_buffers, approx_case_cost, get_workspace() / 1024);
444       return 0;
445     }
446   return 1;
447 }
448
449 /* Compares the VAR_CNT variables in VARS[] between the `value's at
450    A and B, and returns a strcmp()-type result. */
451 static int
452 compare_record (const struct ccase *a, const struct ccase *b,
453                 const struct sort_criteria *criteria)
454 {
455   int i;
456
457   assert (a != NULL);
458   assert (b != NULL);
459   
460   for (i = 0; i < criteria->crit_cnt; i++)
461     {
462       const struct sort_criterion *c = &criteria->crits[i];
463       int result;
464       
465       if (c->width == 0)
466         {
467           double af = case_num (a, c->fv);
468           double bf = case_num (b, c->fv);
469           
470           result = af < bf ? -1 : af > bf;
471         }
472       else
473         result = memcmp (case_str (a, c->fv), case_str (b, c->fv), c->width);
474
475       if (result != 0)
476         return c->dir == SRT_ASCEND ? result : -result;
477     }
478
479   return 0;
480 }
481
482 /* Compares record-run tuples A and B on run number first, then
483    on record, then on case index. */
484 static int
485 compare_record_run (const struct record_run *a,
486                     const struct record_run *b,
487                     struct initial_run_state *irs)
488 {
489   int result = a->run < b->run ? -1 : a->run > b->run;
490   if (result == 0)
491     result = compare_record (&a->record, &b->record, irs->xsrt->criteria);
492   if (result == 0)
493     result = a->idx < b->idx ? -1 : a->idx > b->idx;
494   return result;
495 }
496
497 /* Compares record-run tuples A and B on run number first, then
498    on the current record according to SCP, but in descending
499    order. */
500 static int
501 compare_record_run_minheap (const void *a, const void *b, void *irs) 
502 {
503   return -compare_record_run (a, b, irs);
504 }
505
506 /* Begins a new initial run, specifically its output file. */
507 static void
508 start_run (struct initial_run_state *irs)
509 {
510   irs->run++;
511   irs->case_cnt = 0;
512   irs->casefile = fastfile_create (irs->xsrt->value_cnt);
513   casefile_to_disk (irs->casefile);
514   case_nullify (&irs->last_output); 
515 }
516
517 /* Ends the current initial run.  */
518 static void
519 end_run (struct initial_run_state *irs)
520 {
521   struct external_sort *xsrt = irs->xsrt;
522
523   /* Record initial run. */
524   if (irs->casefile != NULL) 
525     {
526       casefile_sleep (irs->casefile);
527       if (xsrt->run_cnt >= xsrt->run_cap) 
528         {
529           xsrt->run_cap *= 2;
530           xsrt->runs = xnrealloc (xsrt->runs,
531                                   xsrt->run_cap, sizeof *xsrt->runs);
532         }
533       xsrt->runs[xsrt->run_cnt++] = irs->casefile;
534       if (casefile_error (irs->casefile))
535         irs->okay = false;
536       irs->casefile = NULL; 
537     }
538 }
539
540 /* Writes a record to the current initial run. */
541 static void
542 output_record (struct initial_run_state *irs)
543 {
544   struct record_run *record_run;
545   struct ccase case_tmp;
546   
547   /* Extract minimum case from heap. */
548   assert (irs->record_cnt > 0);
549   pop_heap (irs->records, irs->record_cnt--, sizeof *irs->records,
550             compare_record_run_minheap, irs);
551   record_run = irs->records + irs->record_cnt;
552
553   /* Bail if an error has occurred. */
554   if (!irs->okay)
555     return;
556
557   /* Start new run if necessary. */
558   assert (record_run->run == irs->run
559           || record_run->run == irs->run + 1);
560   if (record_run->run != irs->run)
561     {
562       end_run (irs);
563       start_run (irs);
564     }
565   assert (record_run->run == irs->run);
566   irs->case_cnt++;
567
568   /* Write to disk. */
569   if (irs->casefile != NULL)
570     casefile_append (irs->casefile, &record_run->record);
571
572   /* This record becomes last_output. */
573   irs->last_output = case_tmp = record_run->record;
574   record_run->record = irs->records[irs->record_cap - 1].record;
575   irs->records[irs->record_cap - 1].record = case_tmp;
576 }
577 \f
578 /* Merging. */
579
580 static int choose_merge (struct casefile *runs[], int run_cnt, int order);
581 static struct casefile *merge_once (struct external_sort *,
582                                     struct casefile *[], size_t);
583
584 /* Repeatedly merges run until only one is left,
585    and returns the final casefile.
586    Returns a null pointer if an I/O error occurs. */
587 static struct casefile *
588 merge (struct external_sort *xsrt)
589 {
590   while (xsrt->run_cnt > 1)
591     {
592       int order = min (MAX_MERGE_ORDER, xsrt->run_cnt);
593       int idx = choose_merge (xsrt->runs, xsrt->run_cnt, order);
594       xsrt->runs[idx] = merge_once (xsrt, xsrt->runs + idx, order);
595       remove_range (xsrt->runs, xsrt->run_cnt, sizeof *xsrt->runs,
596                     idx + 1, order - 1);
597       xsrt->run_cnt -= order - 1;
598
599       if (xsrt->runs[idx] == NULL)
600         return NULL;
601     }
602   assert (xsrt->run_cnt == 1);
603   xsrt->run_cnt = 0;
604   return xsrt->runs[0];
605 }
606
607 /* Chooses ORDER runs out of the RUN_CNT runs in RUNS to merge,
608    and returns the index of the first one.
609
610    For stability, we must merge only consecutive runs.  For
611    efficiency, we choose the shortest consecutive sequence of
612    runs. */
613 static int
614 choose_merge (struct casefile *runs[], int run_cnt, int order) 
615 {
616   int min_idx, min_sum;
617   int cur_idx, cur_sum;
618   int i;
619
620   /* Sum up the length of the first ORDER runs. */
621   cur_sum = 0;
622   for (i = 0; i < order; i++)
623     cur_sum += casefile_get_case_cnt (runs[i]);
624
625   /* Find the shortest group of ORDER runs,
626      using a running total for efficiency. */
627   min_idx = 0;
628   min_sum = cur_sum;
629   for (cur_idx = 1; cur_idx + order <= run_cnt; cur_idx++)
630     {
631       cur_sum -= casefile_get_case_cnt (runs[cur_idx - 1]);
632       cur_sum += casefile_get_case_cnt (runs[cur_idx + order - 1]);
633       if (cur_sum < min_sum)
634         {
635           min_sum = cur_sum;
636           min_idx = cur_idx;
637         }
638     }
639
640   return min_idx;
641 }
642
643 /* Merges the RUN_CNT initial runs specified in INPUT_FILES into a
644    new run, and returns the new run.
645    Returns a null pointer if an I/O error occurs. */
646 static struct casefile *
647 merge_once (struct external_sort *xsrt,
648             struct casefile **const input_files,
649             size_t run_cnt)
650 {
651   struct run
652     {
653       struct casefile *file;
654       struct casereader *reader;
655       struct ccase ccase;
656     }
657   *runs;
658
659   struct casefile *output = NULL;
660   int i;
661
662   /* Open input files. */
663   runs = xnmalloc (run_cnt, sizeof *runs);
664   for (i = 0; i < run_cnt; i++) 
665     {
666       struct run *r = &runs[i];
667       r->file = input_files[i];
668       r->reader = casefile_get_destructive_reader (r->file);
669       if (!casereader_read_xfer (r->reader, &r->ccase))
670         {
671           run_cnt--;
672           i--;
673         }
674     }
675
676   /* Create output file. */
677   output = fastfile_create (xsrt->value_cnt);
678   casefile_to_disk (output);
679
680   /* Merge. */
681   while (run_cnt > 0) 
682     {
683       struct run *min_run, *run;
684       
685       /* Find minimum. */
686       min_run = runs;
687       for (run = runs + 1; run < runs + run_cnt; run++)
688         if (compare_record (&run->ccase, &min_run->ccase, xsrt->criteria) < 0)
689           min_run = run;
690
691       /* Write minimum to output file. */
692       casefile_append_xfer (output, &min_run->ccase);
693
694       /* Read another case from minimum run. */
695       if (!casereader_read_xfer (min_run->reader, &min_run->ccase))
696         {
697           if (casefile_error (min_run->file) || casefile_error (output))
698             goto error;
699           casereader_destroy (min_run->reader);
700           casefile_destroy (min_run->file);
701
702           remove_element (runs, run_cnt, sizeof *runs, min_run - runs);
703           run_cnt--;
704         } 
705     }
706
707   if (!casefile_sleep (output))
708     goto error;
709   free (runs);
710
711   return output;
712
713  error:
714   for (i = 0; i < run_cnt; i++) 
715     casefile_destroy (runs[i].file);
716   casefile_destroy (output);
717   free (runs);
718   return NULL;
719 }