Continue reforming procedure execution. In this phase, remove PROCESS
[pspp-builds.git] / src / math / sort.c
1 /* PSPP - computes sample statistics.
2    Copyright (C) 1997-9, 2000 Free Software Foundation, Inc.
3    Written by Ben Pfaff <blp@gnu.org>.
4
5    This program is free software; you can redistribute it and/or
6    modify it under the terms of the GNU General Public License as
7    published by the Free Software Foundation; either version 2 of the
8    License, or (at your option) any later version.
9
10    This program is distributed in the hope that it will be useful, but
11    WITHOUT ANY WARRANTY; without even the implied warranty of
12    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13    General Public License for more details.
14
15    You should have received a copy of the GNU General Public License
16    along with this program; if not, write to the Free Software
17    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
18    02110-1301, USA. */
19
20 #include <config.h>
21
22 #include "sort.h"
23
24 #include <errno.h>
25 #include <limits.h>
26 #include <stdbool.h>
27 #include <stdio.h>
28 #include <stdlib.h>
29
30 #include <data/case-source.h>
31 #include <data/case.h>
32 #include <data/casefile.h>
33 #include <data/procedure.h>
34 #include <data/settings.h>
35 #include <data/variable.h>
36 #include <data/storage-stream.h>
37 #include <language/expressions/public.h>
38 #include <libpspp/alloc.h>
39 #include <libpspp/array.h>
40 #include <libpspp/message.h>
41 #include <libpspp/message.h>
42 #include <libpspp/misc.h>
43 #include <libpspp/str.h>
44
45 #include "gettext.h"
46 #define _(msgid) gettext (msgid)
47
48 /* These should only be changed for testing purposes. */
49 int min_buffers = 64;
50 int max_buffers = INT_MAX;
51 bool allow_internal_sort = true;
52
53 static int compare_record (const struct ccase *, const struct ccase *,
54                            const struct sort_criteria *);
55 static struct casefile *do_internal_sort (struct casereader *,
56                                           const struct sort_criteria *);
57 static struct casefile *do_external_sort (struct casereader *,
58                                           const struct sort_criteria *);
59
60 /* Get ready to sort the active file. */
61 static void
62 prepare_to_sort_active_file (void) 
63 {
64   proc_cancel_temporary_transformations (); 
65 }
66
67 /* Sorts the active file in-place according to CRITERIA.
68    Returns nonzero if successful. */
69 int
70 sort_active_file_in_place (const struct sort_criteria *criteria) 
71 {
72   struct casefile *in, *out;
73
74   prepare_to_sort_active_file ();
75   if (!procedure (NULL, NULL))
76     return 0;
77   
78   in = proc_capture_output ();
79   out = sort_execute (casefile_get_destructive_reader (in), criteria);
80   if (out == NULL) 
81     return 0;
82
83   proc_set_source (storage_source_create (out));
84   return 1;
85 }
86
87 /* Data passed to sort_to_casefile_callback(). */
88 struct sort_to_casefile_cb_data 
89   {
90     const struct sort_criteria *criteria;
91     struct casefile *output;
92   };
93
94 /* Sorts casefile CF according to the criteria in CB_DATA. */
95 static bool
96 sort_to_casefile_callback (const struct casefile *cf, void *cb_data_) 
97 {
98   struct sort_to_casefile_cb_data *cb_data = cb_data_;
99   cb_data->output = sort_execute (casefile_get_reader (cf), cb_data->criteria);
100   return cb_data->output != NULL;
101 }
102
103 /* Sorts the active file to a separate casefile.  If successful,
104    returns the sorted casefile.  Returns a null pointer on
105    failure. */
106 struct casefile *
107 sort_active_file_to_casefile (const struct sort_criteria *criteria) 
108 {
109   struct sort_to_casefile_cb_data cb_data;
110   
111   prepare_to_sort_active_file ();
112
113   cb_data.criteria = criteria;
114   cb_data.output = NULL;
115   multipass_procedure (sort_to_casefile_callback, &cb_data);
116
117   return cb_data.output;
118 }
119
120
121 /* Reads all the cases from READER, which is destroyed.  Sorts
122    the cases according to CRITERIA.  Returns the sorted cases in
123    a newly created casefile. */
124 struct casefile *
125 sort_execute (struct casereader *reader, const struct sort_criteria *criteria)
126 {
127   struct casefile *output = do_internal_sort (reader, criteria);
128   if (output == NULL)
129     output = do_external_sort (reader, criteria);
130   casereader_destroy (reader);
131   return output;
132 }
133 \f
134 /* A case and its index. */
135 struct indexed_case 
136   {
137     struct ccase c;     /* Case. */
138     unsigned long idx;  /* Index to allow for stable sorting. */
139   };
140
141 static int compare_indexed_cases (const void *, const void *, void *);
142
143 /* If the data is in memory, do an internal sort and return a new
144    casefile for the data.  Otherwise, return a null pointer. */
145 static struct casefile *
146 do_internal_sort (struct casereader *reader,
147                   const struct sort_criteria *criteria)
148 {
149   const struct casefile *src;
150   struct casefile *dst;
151   unsigned long case_cnt;
152
153   if (!allow_internal_sort)
154     return NULL;
155
156   src = casereader_get_casefile (reader);
157   if (casefile_get_case_cnt (src) > 1 && !casefile_in_core (src))
158     return NULL;
159       
160   case_cnt = casefile_get_case_cnt (src);
161   dst = casefile_create (casefile_get_value_cnt (src));
162   if (case_cnt != 0) 
163     {
164       struct indexed_case *cases = nmalloc (sizeof *cases, case_cnt);
165       if (cases != NULL) 
166         {
167           unsigned long i;
168           
169           for (i = 0; i < case_cnt; i++)
170             {
171               bool ok = casereader_read_xfer (reader, &cases[i].c);
172               if (!ok)
173                 abort ();
174               cases[i].idx = i;
175             }
176
177           sort (cases, case_cnt, sizeof *cases, compare_indexed_cases,
178                 (void *) criteria);
179       
180           for (i = 0; i < case_cnt; i++)
181             casefile_append_xfer (dst, &cases[i].c);
182           if (casefile_error (dst))
183             abort ();
184
185           free (cases);
186         }
187       else 
188         {
189           /* Failure. */
190           casefile_destroy (dst);
191           dst = NULL;
192         }
193     }
194
195   return dst;
196 }
197
198 /* Compares the variables specified by CRITERIA between the cases
199    at A and B, with a "last resort" comparison for stability, and
200    returns a strcmp()-type result. */
201 static int
202 compare_indexed_cases (const void *a_, const void *b_, void *criteria_)
203 {
204   struct sort_criteria *criteria = criteria_;
205   const struct indexed_case *a = a_;
206   const struct indexed_case *b = b_;
207   int result = compare_record (&a->c, &b->c, criteria);
208   if (result == 0)
209     result = a->idx < b->idx ? -1 : a->idx > b->idx;
210   return result;
211 }
212 \f
213 /* External sort. */
214
215 /* Maximum order of merge (external sort only).  The maximum
216    reasonable value is about 7.  Above that, it would be a good
217    idea to use a heap in merge_once() to select the minimum. */
218 #define MAX_MERGE_ORDER 7
219
220 /* Results of an external sort. */
221 struct external_sort 
222   {
223     const struct sort_criteria *criteria; /* Sort criteria. */
224     size_t value_cnt;                 /* Size of data in `union value's. */
225     struct casefile **runs;           /* Array of initial runs. */
226     size_t run_cnt, run_cap;          /* Number of runs, allocated capacity. */
227   };
228
229 /* Prototypes for helper functions. */
230 static int write_runs (struct external_sort *, struct casereader *);
231 static struct casefile *merge (struct external_sort *);
232 static void destroy_external_sort (struct external_sort *);
233
234 /* Performs a stable external sort of the active file according
235    to the specification in SCP.  Forms initial runs using a heap
236    as a reservoir.  Merges the initial runs according to a
237    pattern that assures stability. */
238 static struct casefile *
239 do_external_sort (struct casereader *reader,
240                   const struct sort_criteria *criteria)
241 {
242   struct external_sort *xsrt;
243
244   if (!casefile_to_disk (casereader_get_casefile (reader)))
245     return NULL;
246
247   xsrt = xmalloc (sizeof *xsrt);
248   xsrt->criteria = criteria;
249   xsrt->value_cnt = casefile_get_value_cnt (casereader_get_casefile (reader));
250   xsrt->run_cap = 512;
251   xsrt->run_cnt = 0;
252   xsrt->runs = xnmalloc (xsrt->run_cap, sizeof *xsrt->runs);
253   if (write_runs (xsrt, reader))
254     {
255       struct casefile *output = merge (xsrt);
256       destroy_external_sort (xsrt);
257       return output;
258     }
259   else
260     {
261       destroy_external_sort (xsrt);
262       return NULL;
263     }
264 }
265
266 /* Destroys XSRT. */
267 static void
268 destroy_external_sort (struct external_sort *xsrt) 
269 {
270   if (xsrt != NULL) 
271     {
272       int i;
273       
274       for (i = 0; i < xsrt->run_cnt; i++)
275         casefile_destroy (xsrt->runs[i]);
276       free (xsrt->runs);
277       free (xsrt);
278     }
279 }
280 \f
281 /* Replacement selection. */
282
283 /* Pairs a record with a run number. */
284 struct record_run
285   {
286     int run;                    /* Run number of case. */
287     struct ccase record;        /* Case data. */
288     size_t idx;                 /* Case number (for stability). */
289   };
290
291 /* Represents a set of initial runs during an external sort. */
292 struct initial_run_state 
293   {
294     struct external_sort *xsrt;
295
296     /* Reservoir. */
297     struct record_run *records; /* Records arranged as a heap. */
298     size_t record_cnt;          /* Current number of records. */
299     size_t record_cap;          /* Capacity for records. */
300     
301     /* Run currently being output. */
302     int run;                    /* Run number. */
303     size_t case_cnt;            /* Number of cases so far. */
304     struct casefile *casefile;  /* Output file. */
305     struct ccase last_output;   /* Record last output. */
306
307     int okay;                   /* Zero if an error has been encountered. */
308   };
309
310 static bool destroy_initial_run_state (struct initial_run_state *);
311 static void process_case (struct initial_run_state *, const struct ccase *,
312                           size_t);
313 static int allocate_cases (struct initial_run_state *);
314 static void output_record (struct initial_run_state *);
315 static void start_run (struct initial_run_state *);
316 static void end_run (struct initial_run_state *);
317 static int compare_record_run (const struct record_run *,
318                                const struct record_run *,
319                                struct initial_run_state *);
320 static int compare_record_run_minheap (const void *, const void *, void *);
321
322 /* Reads cases from READER and composes initial runs in XSRT. */
323 static int
324 write_runs (struct external_sort *xsrt, struct casereader *reader)
325 {
326   struct initial_run_state *irs;
327   struct ccase c;
328   size_t idx = 0;
329   int success = 0;
330
331   /* Allocate memory for cases. */
332   irs = xmalloc (sizeof *irs);
333   irs->xsrt = xsrt;
334   irs->records = NULL;
335   irs->record_cnt = irs->record_cap = 0;
336   irs->run = 0;
337   irs->case_cnt = 0;
338   irs->casefile = NULL;
339   case_nullify (&irs->last_output);
340   irs->okay = 1;
341   if (!allocate_cases (irs)) 
342     goto done;
343
344   /* Create initial runs. */
345   start_run (irs);
346   for (; irs->okay && casereader_read (reader, &c); case_destroy (&c))
347     process_case (irs, &c, idx++);
348   while (irs->okay && irs->record_cnt > 0)
349     output_record (irs);
350   end_run (irs);
351
352   success = irs->okay;
353
354  done:
355   if (!destroy_initial_run_state (irs))
356     success = false;
357
358   return success;
359 }
360
361 /* Add a single case to an initial run. */
362 static void
363 process_case (struct initial_run_state *irs, const struct ccase *c, size_t idx)
364 {
365   struct record_run *rr;
366
367   /* Compose record_run for this run and add to heap. */
368   assert (irs->record_cnt < irs->record_cap - 1);
369   rr = irs->records + irs->record_cnt++;
370   case_copy (&rr->record, 0, c, 0, irs->xsrt->value_cnt);
371   rr->run = irs->run;
372   rr->idx = idx;
373   if (!case_is_null (&irs->last_output)
374       && compare_record (c, &irs->last_output, irs->xsrt->criteria) < 0)
375     rr->run = irs->run + 1;
376   push_heap (irs->records, irs->record_cnt, sizeof *irs->records,
377              compare_record_run_minheap, irs);
378
379   /* Output a record if the reservoir is full. */
380   if (irs->record_cnt == irs->record_cap - 1 && irs->okay)
381     output_record (irs);
382 }
383
384 /* Destroys the initial run state represented by IRS.
385    Returns true if successful, false if an I/O error occurred. */
386 static bool
387 destroy_initial_run_state (struct initial_run_state *irs) 
388 {
389   int i;
390   bool ok = true;
391
392   if (irs == NULL)
393     return true;
394
395   for (i = 0; i < irs->record_cap; i++)
396     case_destroy (&irs->records[i].record);
397   free (irs->records);
398
399   if (irs->casefile != NULL)
400     ok = casefile_sleep (irs->casefile);
401
402   free (irs);
403   return ok;
404 }
405
406 /* Allocates room for lots of cases as a buffer. */
407 static int
408 allocate_cases (struct initial_run_state *irs)
409 {
410   int approx_case_cost; /* Approximate memory cost of one case in bytes. */
411   int max_cases;        /* Maximum number of cases to allocate. */
412   int i;
413
414   /* Allocate as many cases as we can within the workspace
415      limit. */
416   approx_case_cost = (sizeof *irs->records
417                       + irs->xsrt->value_cnt * sizeof (union value)
418                       + 4 * sizeof (void *));
419   max_cases = get_workspace() / approx_case_cost;
420   if (max_cases > max_buffers)
421     max_cases = max_buffers;
422   irs->records = nmalloc (sizeof *irs->records, max_cases);
423   if (irs->records != NULL)
424     for (i = 0; i < max_cases; i++)
425       if (!case_try_create (&irs->records[i].record, irs->xsrt->value_cnt))
426         {
427           max_cases = i;
428           break;
429         }
430   irs->record_cap = max_cases;
431
432   /* Fail if we didn't allocate an acceptable number of cases. */
433   if (irs->records == NULL || max_cases < min_buffers)
434     {
435       msg (SE, _("Out of memory.  Could not allocate room for minimum of %d "
436                  "cases of %d bytes each.  (PSPP workspace is currently "
437                  "restricted to a maximum of %d KB.)"),
438            min_buffers, approx_case_cost, get_workspace() / 1024);
439       return 0;
440     }
441   return 1;
442 }
443
444 /* Compares the VAR_CNT variables in VARS[] between the `value's at
445    A and B, and returns a strcmp()-type result. */
446 static int
447 compare_record (const struct ccase *a, const struct ccase *b,
448                 const struct sort_criteria *criteria)
449 {
450   int i;
451
452   assert (a != NULL);
453   assert (b != NULL);
454   
455   for (i = 0; i < criteria->crit_cnt; i++)
456     {
457       const struct sort_criterion *c = &criteria->crits[i];
458       int result;
459       
460       if (c->width == 0)
461         {
462           double af = case_num (a, c->fv);
463           double bf = case_num (b, c->fv);
464           
465           result = af < bf ? -1 : af > bf;
466         }
467       else
468         result = memcmp (case_str (a, c->fv), case_str (b, c->fv), c->width);
469
470       if (result != 0)
471         return c->dir == SRT_ASCEND ? result : -result;
472     }
473
474   return 0;
475 }
476
477 /* Compares record-run tuples A and B on run number first, then
478    on record, then on case index. */
479 static int
480 compare_record_run (const struct record_run *a,
481                     const struct record_run *b,
482                     struct initial_run_state *irs)
483 {
484   int result = a->run < b->run ? -1 : a->run > b->run;
485   if (result == 0)
486     result = compare_record (&a->record, &b->record, irs->xsrt->criteria);
487   if (result == 0)
488     result = a->idx < b->idx ? -1 : a->idx > b->idx;
489   return result;
490 }
491
492 /* Compares record-run tuples A and B on run number first, then
493    on the current record according to SCP, but in descending
494    order. */
495 static int
496 compare_record_run_minheap (const void *a, const void *b, void *irs) 
497 {
498   return -compare_record_run (a, b, irs);
499 }
500
501 /* Begins a new initial run, specifically its output file. */
502 static void
503 start_run (struct initial_run_state *irs)
504 {
505   irs->run++;
506   irs->case_cnt = 0;
507   irs->casefile = casefile_create (irs->xsrt->value_cnt);
508   casefile_to_disk (irs->casefile);
509   case_nullify (&irs->last_output); 
510 }
511
512 /* Ends the current initial run.  */
513 static void
514 end_run (struct initial_run_state *irs)
515 {
516   struct external_sort *xsrt = irs->xsrt;
517
518   /* Record initial run. */
519   if (irs->casefile != NULL) 
520     {
521       casefile_sleep (irs->casefile);
522       if (xsrt->run_cnt >= xsrt->run_cap) 
523         {
524           xsrt->run_cap *= 2;
525           xsrt->runs = xnrealloc (xsrt->runs,
526                                   xsrt->run_cap, sizeof *xsrt->runs);
527         }
528       xsrt->runs[xsrt->run_cnt++] = irs->casefile;
529       if (casefile_error (irs->casefile))
530         irs->okay = false;
531       irs->casefile = NULL; 
532     }
533 }
534
535 /* Writes a record to the current initial run. */
536 static void
537 output_record (struct initial_run_state *irs)
538 {
539   struct record_run *record_run;
540   struct ccase case_tmp;
541   
542   /* Extract minimum case from heap. */
543   assert (irs->record_cnt > 0);
544   pop_heap (irs->records, irs->record_cnt--, sizeof *irs->records,
545             compare_record_run_minheap, irs);
546   record_run = irs->records + irs->record_cnt;
547
548   /* Bail if an error has occurred. */
549   if (!irs->okay)
550     return;
551
552   /* Start new run if necessary. */
553   assert (record_run->run == irs->run
554           || record_run->run == irs->run + 1);
555   if (record_run->run != irs->run)
556     {
557       end_run (irs);
558       start_run (irs);
559     }
560   assert (record_run->run == irs->run);
561   irs->case_cnt++;
562
563   /* Write to disk. */
564   if (irs->casefile != NULL)
565     casefile_append (irs->casefile, &record_run->record);
566
567   /* This record becomes last_output. */
568   irs->last_output = case_tmp = record_run->record;
569   record_run->record = irs->records[irs->record_cap - 1].record;
570   irs->records[irs->record_cap - 1].record = case_tmp;
571 }
572 \f
573 /* Merging. */
574
575 static int choose_merge (struct casefile *runs[], int run_cnt, int order);
576 static struct casefile *merge_once (struct external_sort *,
577                                     struct casefile *[], size_t);
578
579 /* Repeatedly merges run until only one is left,
580    and returns the final casefile.
581    Returns a null pointer if an I/O error occurs. */
582 static struct casefile *
583 merge (struct external_sort *xsrt)
584 {
585   while (xsrt->run_cnt > 1)
586     {
587       int order = min (MAX_MERGE_ORDER, xsrt->run_cnt);
588       int idx = choose_merge (xsrt->runs, xsrt->run_cnt, order);
589       xsrt->runs[idx] = merge_once (xsrt, xsrt->runs + idx, order);
590       remove_range (xsrt->runs, xsrt->run_cnt, sizeof *xsrt->runs,
591                     idx + 1, order - 1);
592       xsrt->run_cnt -= order - 1;
593
594       if (xsrt->runs[idx] == NULL)
595         return NULL;
596     }
597   assert (xsrt->run_cnt == 1);
598   xsrt->run_cnt = 0;
599   return xsrt->runs[0];
600 }
601
602 /* Chooses ORDER runs out of the RUN_CNT runs in RUNS to merge,
603    and returns the index of the first one.
604
605    For stability, we must merge only consecutive runs.  For
606    efficiency, we choose the shortest consecutive sequence of
607    runs. */
608 static int
609 choose_merge (struct casefile *runs[], int run_cnt, int order) 
610 {
611   int min_idx, min_sum;
612   int cur_idx, cur_sum;
613   int i;
614
615   /* Sum up the length of the first ORDER runs. */
616   cur_sum = 0;
617   for (i = 0; i < order; i++)
618     cur_sum += casefile_get_case_cnt (runs[i]);
619
620   /* Find the shortest group of ORDER runs,
621      using a running total for efficiency. */
622   min_idx = 0;
623   min_sum = cur_sum;
624   for (cur_idx = 1; cur_idx + order <= run_cnt; cur_idx++)
625     {
626       cur_sum -= casefile_get_case_cnt (runs[cur_idx - 1]);
627       cur_sum += casefile_get_case_cnt (runs[cur_idx + order - 1]);
628       if (cur_sum < min_sum)
629         {
630           min_sum = cur_sum;
631           min_idx = cur_idx;
632         }
633     }
634
635   return min_idx;
636 }
637
638 /* Merges the RUN_CNT initial runs specified in INPUT_FILES into a
639    new run, and returns the new run.
640    Returns a null pointer if an I/O error occurs. */
641 static struct casefile *
642 merge_once (struct external_sort *xsrt,
643             struct casefile **const input_files,
644             size_t run_cnt)
645 {
646   struct run
647     {
648       struct casefile *file;
649       struct casereader *reader;
650       struct ccase ccase;
651     }
652   *runs;
653
654   struct casefile *output = NULL;
655   int i;
656
657   /* Open input files. */
658   runs = xnmalloc (run_cnt, sizeof *runs);
659   for (i = 0; i < run_cnt; i++) 
660     {
661       struct run *r = &runs[i];
662       r->file = input_files[i];
663       r->reader = casefile_get_destructive_reader (r->file);
664       if (!casereader_read_xfer (r->reader, &r->ccase))
665         {
666           run_cnt--;
667           i--;
668         }
669     }
670
671   /* Create output file. */
672   output = casefile_create (xsrt->value_cnt);
673   casefile_to_disk (output);
674
675   /* Merge. */
676   while (run_cnt > 0) 
677     {
678       struct run *min_run, *run;
679       
680       /* Find minimum. */
681       min_run = runs;
682       for (run = runs + 1; run < runs + run_cnt; run++)
683         if (compare_record (&run->ccase, &min_run->ccase, xsrt->criteria) < 0)
684           min_run = run;
685
686       /* Write minimum to output file. */
687       casefile_append_xfer (output, &min_run->ccase);
688
689       /* Read another case from minimum run. */
690       if (!casereader_read_xfer (min_run->reader, &min_run->ccase))
691         {
692           if (casefile_error (min_run->file) || casefile_error (output))
693             goto error;
694           casereader_destroy (min_run->reader);
695           casefile_destroy (min_run->file);
696
697           remove_element (runs, run_cnt, sizeof *runs, min_run - runs);
698           run_cnt--;
699         } 
700     }
701
702   if (!casefile_sleep (output))
703     goto error;
704   free (runs);
705
706   return output;
707
708  error:
709   for (i = 0; i < run_cnt; i++) 
710     casefile_destroy (runs[i].file);
711   casefile_destroy (output);
712   free (runs);
713   return NULL;
714 }