Eliminated global variable current_dataset.
[pspp-builds.git] / src / math / sort.c
1 /* PSPP - computes sample statistics.
2    Copyright (C) 1997-9, 2000 Free Software Foundation, Inc.
3    Written by Ben Pfaff <blp@gnu.org>.
4
5    This program is free software; you can redistribute it and/or
6    modify it under the terms of the GNU General Public License as
7    published by the Free Software Foundation; either version 2 of the
8    License, or (at your option) any later version.
9
10    This program is distributed in the hope that it will be useful, but
11    WITHOUT ANY WARRANTY; without even the implied warranty of
12    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13    General Public License for more details.
14
15    You should have received a copy of the GNU General Public License
16    along with this program; if not, write to the Free Software
17    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
18    02110-1301, USA. */
19
20 #include <config.h>
21
22 #include "sort.h"
23
24 #include <errno.h>
25 #include <limits.h>
26 #include <stdbool.h>
27 #include <stdio.h>
28 #include <stdlib.h>
29
30 #include <data/case-source.h>
31 #include <data/case.h>
32 #include <data/casefile.h>
33 #include <data/fastfile.h>
34 #include <data/procedure.h>
35 #include <data/settings.h>
36 #include <data/variable.h>
37 #include <data/storage-stream.h>
38 #include <language/expressions/public.h>
39 #include <libpspp/alloc.h>
40 #include <libpspp/array.h>
41 #include <libpspp/assertion.h>
42 #include <libpspp/message.h>
43 #include <libpspp/message.h>
44 #include <libpspp/misc.h>
45 #include <libpspp/str.h>
46
47 #include "gettext.h"
48 #define _(msgid) gettext (msgid)
49
50 /* These should only be changed for testing purposes. */
51 int min_buffers = 64;
52 int max_buffers = INT_MAX;
53 bool allow_internal_sort = true;
54
55 static int compare_record (const struct ccase *, const struct ccase *,
56                            const struct sort_criteria *);
57 static struct casefile *do_internal_sort (struct casereader *,
58                                           const struct sort_criteria *);
59 static struct casefile *do_external_sort (struct casereader *,
60                                           const struct sort_criteria *);
61
62
63 /* Sorts the active file in-place according to CRITERIA.
64    Returns true if successful. */
65 bool
66 sort_active_file_in_place (struct dataset *ds, 
67                            const struct sort_criteria *criteria) 
68 {
69   struct casefile *in, *out;
70
71   proc_cancel_temporary_transformations (ds);
72   if (!procedure (ds, NULL, NULL))
73     return false;
74   
75   in = proc_capture_output (ds);
76   out = sort_execute (casefile_get_destructive_reader (in), criteria);
77   if (out == NULL) 
78     return false;
79
80   proc_set_source (ds, storage_source_create (out));
81   return true;
82 }
83
84 /* Data passed to sort_to_casefile_callback(). */
85 struct sort_to_casefile_cb_data 
86   {
87     const struct sort_criteria *criteria;
88     struct casefile *output;
89   };
90
91 /* Sorts casefile CF according to the criteria in CB_DATA. */
92 static bool
93 sort_to_casefile_callback (const struct casefile *cf, void *cb_data_) 
94 {
95   struct sort_to_casefile_cb_data *cb_data = cb_data_;
96   cb_data->output = sort_execute (casefile_get_reader (cf), cb_data->criteria);
97   return cb_data->output != NULL;
98 }
99
100 /* Sorts the active file to a separate casefile.  If successful,
101    returns the sorted casefile.  Returns a null pointer on
102    failure. */
103 struct casefile *
104 sort_active_file_to_casefile (struct dataset *ds, 
105                               const struct sort_criteria *criteria) 
106 {
107   struct sort_to_casefile_cb_data cb_data;
108   
109   proc_cancel_temporary_transformations (ds);
110
111   cb_data.criteria = criteria;
112   cb_data.output = NULL;
113   if (!multipass_procedure (ds, sort_to_casefile_callback, &cb_data)) 
114     {
115       casefile_destroy (cb_data.output);
116       return NULL;
117     }
118   return cb_data.output;
119 }
120
121
122 /* Reads all the cases from READER, which is destroyed.  Sorts
123    the cases according to CRITERIA.  Returns the sorted cases in
124    a newly created casefile. */
125 struct casefile *
126 sort_execute (struct casereader *reader, const struct sort_criteria *criteria)
127 {
128   struct casefile *output = do_internal_sort (reader, criteria);
129   if (output == NULL)
130     output = do_external_sort (reader, criteria);
131   casereader_destroy (reader);
132   return output;
133 }
134 \f
135 /* A case and its index. */
136 struct indexed_case 
137   {
138     struct ccase c;     /* Case. */
139     unsigned long idx;  /* Index to allow for stable sorting. */
140   };
141
142 static int compare_indexed_cases (const void *, const void *, void *);
143
144 /* If the data is in memory, do an internal sort and return a new
145    casefile for the data.  Otherwise, return a null pointer. */
146 static struct casefile *
147 do_internal_sort (struct casereader *reader,
148                   const struct sort_criteria *criteria)
149 {
150   const struct casefile *src;
151   struct casefile *dst;
152   unsigned long case_cnt;
153
154   if (!allow_internal_sort)
155     return NULL;
156
157   src = casereader_get_casefile (reader);
158   if (casefile_get_case_cnt (src) > 1 && !casefile_in_core (src))
159     return NULL;
160       
161   case_cnt = casefile_get_case_cnt (src);
162   dst = fastfile_create (casefile_get_value_cnt (src));
163   if (case_cnt != 0) 
164     {
165       struct indexed_case *cases = nmalloc (sizeof *cases, case_cnt);
166       if (cases != NULL) 
167         {
168           unsigned long i;
169           
170           for (i = 0; i < case_cnt; i++)
171             {
172               bool ok = casereader_read_xfer (reader, &cases[i].c);
173               if (!ok)
174                 NOT_REACHED ();
175               cases[i].idx = i;
176             }
177
178           sort (cases, case_cnt, sizeof *cases, compare_indexed_cases,
179                 (void *) criteria);
180       
181           for (i = 0; i < case_cnt; i++)
182             casefile_append_xfer (dst, &cases[i].c);
183           if (casefile_error (dst))
184             NOT_REACHED ();
185
186           free (cases);
187         }
188       else 
189         {
190           /* Failure. */
191           casefile_destroy (dst);
192           dst = NULL;
193         }
194     }
195
196   return dst;
197 }
198
199 /* Compares the variables specified by CRITERIA between the cases
200    at A and B, with a "last resort" comparison for stability, and
201    returns a strcmp()-type result. */
202 static int
203 compare_indexed_cases (const void *a_, const void *b_, void *criteria_)
204 {
205   struct sort_criteria *criteria = criteria_;
206   const struct indexed_case *a = a_;
207   const struct indexed_case *b = b_;
208   int result = compare_record (&a->c, &b->c, criteria);
209   if (result == 0)
210     result = a->idx < b->idx ? -1 : a->idx > b->idx;
211   return result;
212 }
213 \f
214 /* External sort. */
215
216 /* Maximum order of merge (external sort only).  The maximum
217    reasonable value is about 7.  Above that, it would be a good
218    idea to use a heap in merge_once() to select the minimum. */
219 #define MAX_MERGE_ORDER 7
220
221 /* Results of an external sort. */
222 struct external_sort 
223   {
224     const struct sort_criteria *criteria; /* Sort criteria. */
225     size_t value_cnt;                 /* Size of data in `union value's. */
226     struct casefile **runs;           /* Array of initial runs. */
227     size_t run_cnt, run_cap;          /* Number of runs, allocated capacity. */
228   };
229
230 /* Prototypes for helper functions. */
231 static int write_runs (struct external_sort *, struct casereader *);
232 static struct casefile *merge (struct external_sort *);
233 static void destroy_external_sort (struct external_sort *);
234
235 /* Performs a stable external sort of the active file according
236    to the specification in SCP.  Forms initial runs using a heap
237    as a reservoir.  Merges the initial runs according to a
238    pattern that assures stability. */
239 static struct casefile *
240 do_external_sort (struct casereader *reader,
241                   const struct sort_criteria *criteria)
242 {
243   struct external_sort *xsrt;
244
245   if (!casefile_to_disk (casereader_get_casefile (reader)))
246     return NULL;
247
248   xsrt = xmalloc (sizeof *xsrt);
249   xsrt->criteria = criteria;
250   xsrt->value_cnt = casefile_get_value_cnt (casereader_get_casefile (reader));
251   xsrt->run_cap = 512;
252   xsrt->run_cnt = 0;
253   xsrt->runs = xnmalloc (xsrt->run_cap, sizeof *xsrt->runs);
254   if (write_runs (xsrt, reader))
255     {
256       struct casefile *output = merge (xsrt);
257       destroy_external_sort (xsrt);
258       return output;
259     }
260   else
261     {
262       destroy_external_sort (xsrt);
263       return NULL;
264     }
265 }
266
267 /* Destroys XSRT. */
268 static void
269 destroy_external_sort (struct external_sort *xsrt) 
270 {
271   if (xsrt != NULL) 
272     {
273       int i;
274       
275       for (i = 0; i < xsrt->run_cnt; i++)
276         casefile_destroy (xsrt->runs[i]);
277       free (xsrt->runs);
278       free (xsrt);
279     }
280 }
281 \f
282 /* Replacement selection. */
283
284 /* Pairs a record with a run number. */
285 struct record_run
286   {
287     int run;                    /* Run number of case. */
288     struct ccase record;        /* Case data. */
289     size_t idx;                 /* Case number (for stability). */
290   };
291
292 /* Represents a set of initial runs during an external sort. */
293 struct initial_run_state 
294   {
295     struct external_sort *xsrt;
296
297     /* Reservoir. */
298     struct record_run *records; /* Records arranged as a heap. */
299     size_t record_cnt;          /* Current number of records. */
300     size_t record_cap;          /* Capacity for records. */
301     
302     /* Run currently being output. */
303     int run;                    /* Run number. */
304     size_t case_cnt;            /* Number of cases so far. */
305     struct casefile *casefile;  /* Output file. */
306     struct ccase last_output;   /* Record last output. */
307
308     int okay;                   /* Zero if an error has been encountered. */
309   };
310
311 static bool destroy_initial_run_state (struct initial_run_state *);
312 static void process_case (struct initial_run_state *, const struct ccase *,
313                           size_t);
314 static int allocate_cases (struct initial_run_state *);
315 static void output_record (struct initial_run_state *);
316 static void start_run (struct initial_run_state *);
317 static void end_run (struct initial_run_state *);
318 static int compare_record_run (const struct record_run *,
319                                const struct record_run *,
320                                struct initial_run_state *);
321 static int compare_record_run_minheap (const void *, const void *, void *);
322
323 /* Reads cases from READER and composes initial runs in XSRT. */
324 static int
325 write_runs (struct external_sort *xsrt, struct casereader *reader)
326 {
327   struct initial_run_state *irs;
328   struct ccase c;
329   size_t idx = 0;
330   int success = 0;
331
332   /* Allocate memory for cases. */
333   irs = xmalloc (sizeof *irs);
334   irs->xsrt = xsrt;
335   irs->records = NULL;
336   irs->record_cnt = irs->record_cap = 0;
337   irs->run = 0;
338   irs->case_cnt = 0;
339   irs->casefile = NULL;
340   case_nullify (&irs->last_output);
341   irs->okay = 1;
342   if (!allocate_cases (irs)) 
343     goto done;
344
345   /* Create initial runs. */
346   start_run (irs);
347   for (; irs->okay && casereader_read (reader, &c); case_destroy (&c))
348     process_case (irs, &c, idx++);
349   while (irs->okay && irs->record_cnt > 0)
350     output_record (irs);
351   end_run (irs);
352
353   success = irs->okay;
354
355  done:
356   if (!destroy_initial_run_state (irs))
357     success = false;
358
359   return success;
360 }
361
362 /* Add a single case to an initial run. */
363 static void
364 process_case (struct initial_run_state *irs, const struct ccase *c, size_t idx)
365 {
366   struct record_run *rr;
367
368   /* Compose record_run for this run and add to heap. */
369   assert (irs->record_cnt < irs->record_cap - 1);
370   rr = irs->records + irs->record_cnt++;
371   case_copy (&rr->record, 0, c, 0, irs->xsrt->value_cnt);
372   rr->run = irs->run;
373   rr->idx = idx;
374   if (!case_is_null (&irs->last_output)
375       && compare_record (c, &irs->last_output, irs->xsrt->criteria) < 0)
376     rr->run = irs->run + 1;
377   push_heap (irs->records, irs->record_cnt, sizeof *irs->records,
378              compare_record_run_minheap, irs);
379
380   /* Output a record if the reservoir is full. */
381   if (irs->record_cnt == irs->record_cap - 1 && irs->okay)
382     output_record (irs);
383 }
384
385 /* Destroys the initial run state represented by IRS.
386    Returns true if successful, false if an I/O error occurred. */
387 static bool
388 destroy_initial_run_state (struct initial_run_state *irs) 
389 {
390   int i;
391   bool ok = true;
392
393   if (irs == NULL)
394     return true;
395
396   for (i = 0; i < irs->record_cap; i++)
397     case_destroy (&irs->records[i].record);
398   free (irs->records);
399
400   if (irs->casefile != NULL)
401     ok = casefile_sleep (irs->casefile);
402
403   free (irs);
404   return ok;
405 }
406
407 /* Allocates room for lots of cases as a buffer. */
408 static int
409 allocate_cases (struct initial_run_state *irs)
410 {
411   int approx_case_cost; /* Approximate memory cost of one case in bytes. */
412   int max_cases;        /* Maximum number of cases to allocate. */
413   int i;
414
415   /* Allocate as many cases as we can within the workspace
416      limit. */
417   approx_case_cost = (sizeof *irs->records
418                       + irs->xsrt->value_cnt * sizeof (union value)
419                       + 4 * sizeof (void *));
420   max_cases = get_workspace() / approx_case_cost;
421   if (max_cases > max_buffers)
422     max_cases = max_buffers;
423   irs->records = nmalloc (sizeof *irs->records, max_cases);
424   if (irs->records != NULL)
425     for (i = 0; i < max_cases; i++)
426       if (!case_try_create (&irs->records[i].record, irs->xsrt->value_cnt))
427         {
428           max_cases = i;
429           break;
430         }
431   irs->record_cap = max_cases;
432
433   /* Fail if we didn't allocate an acceptable number of cases. */
434   if (irs->records == NULL || max_cases < min_buffers)
435     {
436       msg (SE, _("Out of memory.  Could not allocate room for minimum of %d "
437                  "cases of %d bytes each.  (PSPP workspace is currently "
438                  "restricted to a maximum of %d KB.)"),
439            min_buffers, approx_case_cost, get_workspace() / 1024);
440       return 0;
441     }
442   return 1;
443 }
444
445 /* Compares the VAR_CNT variables in VARS[] between the `value's at
446    A and B, and returns a strcmp()-type result. */
447 static int
448 compare_record (const struct ccase *a, const struct ccase *b,
449                 const struct sort_criteria *criteria)
450 {
451   int i;
452
453   assert (a != NULL);
454   assert (b != NULL);
455   
456   for (i = 0; i < criteria->crit_cnt; i++)
457     {
458       const struct sort_criterion *c = &criteria->crits[i];
459       int result;
460       
461       if (c->width == 0)
462         {
463           double af = case_num (a, c->fv);
464           double bf = case_num (b, c->fv);
465           
466           result = af < bf ? -1 : af > bf;
467         }
468       else
469         result = memcmp (case_str (a, c->fv), case_str (b, c->fv), c->width);
470
471       if (result != 0)
472         return c->dir == SRT_ASCEND ? result : -result;
473     }
474
475   return 0;
476 }
477
478 /* Compares record-run tuples A and B on run number first, then
479    on record, then on case index. */
480 static int
481 compare_record_run (const struct record_run *a,
482                     const struct record_run *b,
483                     struct initial_run_state *irs)
484 {
485   int result = a->run < b->run ? -1 : a->run > b->run;
486   if (result == 0)
487     result = compare_record (&a->record, &b->record, irs->xsrt->criteria);
488   if (result == 0)
489     result = a->idx < b->idx ? -1 : a->idx > b->idx;
490   return result;
491 }
492
493 /* Compares record-run tuples A and B on run number first, then
494    on the current record according to SCP, but in descending
495    order. */
496 static int
497 compare_record_run_minheap (const void *a, const void *b, void *irs) 
498 {
499   return -compare_record_run (a, b, irs);
500 }
501
502 /* Begins a new initial run, specifically its output file. */
503 static void
504 start_run (struct initial_run_state *irs)
505 {
506   irs->run++;
507   irs->case_cnt = 0;
508   irs->casefile = fastfile_create (irs->xsrt->value_cnt);
509   casefile_to_disk (irs->casefile);
510   case_nullify (&irs->last_output); 
511 }
512
513 /* Ends the current initial run.  */
514 static void
515 end_run (struct initial_run_state *irs)
516 {
517   struct external_sort *xsrt = irs->xsrt;
518
519   /* Record initial run. */
520   if (irs->casefile != NULL) 
521     {
522       casefile_sleep (irs->casefile);
523       if (xsrt->run_cnt >= xsrt->run_cap) 
524         {
525           xsrt->run_cap *= 2;
526           xsrt->runs = xnrealloc (xsrt->runs,
527                                   xsrt->run_cap, sizeof *xsrt->runs);
528         }
529       xsrt->runs[xsrt->run_cnt++] = irs->casefile;
530       if (casefile_error (irs->casefile))
531         irs->okay = false;
532       irs->casefile = NULL; 
533     }
534 }
535
536 /* Writes a record to the current initial run. */
537 static void
538 output_record (struct initial_run_state *irs)
539 {
540   struct record_run *record_run;
541   struct ccase case_tmp;
542   
543   /* Extract minimum case from heap. */
544   assert (irs->record_cnt > 0);
545   pop_heap (irs->records, irs->record_cnt--, sizeof *irs->records,
546             compare_record_run_minheap, irs);
547   record_run = irs->records + irs->record_cnt;
548
549   /* Bail if an error has occurred. */
550   if (!irs->okay)
551     return;
552
553   /* Start new run if necessary. */
554   assert (record_run->run == irs->run
555           || record_run->run == irs->run + 1);
556   if (record_run->run != irs->run)
557     {
558       end_run (irs);
559       start_run (irs);
560     }
561   assert (record_run->run == irs->run);
562   irs->case_cnt++;
563
564   /* Write to disk. */
565   if (irs->casefile != NULL)
566     casefile_append (irs->casefile, &record_run->record);
567
568   /* This record becomes last_output. */
569   irs->last_output = case_tmp = record_run->record;
570   record_run->record = irs->records[irs->record_cap - 1].record;
571   irs->records[irs->record_cap - 1].record = case_tmp;
572 }
573 \f
574 /* Merging. */
575
576 static int choose_merge (struct casefile *runs[], int run_cnt, int order);
577 static struct casefile *merge_once (struct external_sort *,
578                                     struct casefile *[], size_t);
579
580 /* Repeatedly merges run until only one is left,
581    and returns the final casefile.
582    Returns a null pointer if an I/O error occurs. */
583 static struct casefile *
584 merge (struct external_sort *xsrt)
585 {
586   while (xsrt->run_cnt > 1)
587     {
588       int order = min (MAX_MERGE_ORDER, xsrt->run_cnt);
589       int idx = choose_merge (xsrt->runs, xsrt->run_cnt, order);
590       xsrt->runs[idx] = merge_once (xsrt, xsrt->runs + idx, order);
591       remove_range (xsrt->runs, xsrt->run_cnt, sizeof *xsrt->runs,
592                     idx + 1, order - 1);
593       xsrt->run_cnt -= order - 1;
594
595       if (xsrt->runs[idx] == NULL)
596         return NULL;
597     }
598   assert (xsrt->run_cnt == 1);
599   xsrt->run_cnt = 0;
600   return xsrt->runs[0];
601 }
602
603 /* Chooses ORDER runs out of the RUN_CNT runs in RUNS to merge,
604    and returns the index of the first one.
605
606    For stability, we must merge only consecutive runs.  For
607    efficiency, we choose the shortest consecutive sequence of
608    runs. */
609 static int
610 choose_merge (struct casefile *runs[], int run_cnt, int order) 
611 {
612   int min_idx, min_sum;
613   int cur_idx, cur_sum;
614   int i;
615
616   /* Sum up the length of the first ORDER runs. */
617   cur_sum = 0;
618   for (i = 0; i < order; i++)
619     cur_sum += casefile_get_case_cnt (runs[i]);
620
621   /* Find the shortest group of ORDER runs,
622      using a running total for efficiency. */
623   min_idx = 0;
624   min_sum = cur_sum;
625   for (cur_idx = 1; cur_idx + order <= run_cnt; cur_idx++)
626     {
627       cur_sum -= casefile_get_case_cnt (runs[cur_idx - 1]);
628       cur_sum += casefile_get_case_cnt (runs[cur_idx + order - 1]);
629       if (cur_sum < min_sum)
630         {
631           min_sum = cur_sum;
632           min_idx = cur_idx;
633         }
634     }
635
636   return min_idx;
637 }
638
639 /* Merges the RUN_CNT initial runs specified in INPUT_FILES into a
640    new run, and returns the new run.
641    Returns a null pointer if an I/O error occurs. */
642 static struct casefile *
643 merge_once (struct external_sort *xsrt,
644             struct casefile **const input_files,
645             size_t run_cnt)
646 {
647   struct run
648     {
649       struct casefile *file;
650       struct casereader *reader;
651       struct ccase ccase;
652     }
653   *runs;
654
655   struct casefile *output = NULL;
656   int i;
657
658   /* Open input files. */
659   runs = xnmalloc (run_cnt, sizeof *runs);
660   for (i = 0; i < run_cnt; i++) 
661     {
662       struct run *r = &runs[i];
663       r->file = input_files[i];
664       r->reader = casefile_get_destructive_reader (r->file);
665       if (!casereader_read_xfer (r->reader, &r->ccase))
666         {
667           run_cnt--;
668           i--;
669         }
670     }
671
672   /* Create output file. */
673   output = fastfile_create (xsrt->value_cnt);
674   casefile_to_disk (output);
675
676   /* Merge. */
677   while (run_cnt > 0) 
678     {
679       struct run *min_run, *run;
680       
681       /* Find minimum. */
682       min_run = runs;
683       for (run = runs + 1; run < runs + run_cnt; run++)
684         if (compare_record (&run->ccase, &min_run->ccase, xsrt->criteria) < 0)
685           min_run = run;
686
687       /* Write minimum to output file. */
688       casefile_append_xfer (output, &min_run->ccase);
689
690       /* Read another case from minimum run. */
691       if (!casereader_read_xfer (min_run->reader, &min_run->ccase))
692         {
693           if (casefile_error (min_run->file) || casefile_error (output))
694             goto error;
695           casereader_destroy (min_run->reader);
696           casefile_destroy (min_run->file);
697
698           remove_element (runs, run_cnt, sizeof *runs, min_run - runs);
699           run_cnt--;
700         } 
701     }
702
703   if (!casefile_sleep (output))
704     goto error;
705   free (runs);
706
707   return output;
708
709  error:
710   for (i = 0; i < run_cnt; i++) 
711     casefile_destroy (runs[i].file);
712   casefile_destroy (output);
713   free (runs);
714   return NULL;
715 }