Continue reforming procedure execution. In this phase, break
[pspp] / src / math / sort.c
1 /* PSPP - computes sample statistics.
2    Copyright (C) 1997-9, 2000 Free Software Foundation, Inc.
3    Written by Ben Pfaff <blp@gnu.org>.
4
5    This program is free software; you can redistribute it and/or
6    modify it under the terms of the GNU General Public License as
7    published by the Free Software Foundation; either version 2 of the
8    License, or (at your option) any later version.
9
10    This program is distributed in the hope that it will be useful, but
11    WITHOUT ANY WARRANTY; without even the implied warranty of
12    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13    General Public License for more details.
14
15    You should have received a copy of the GNU General Public License
16    along with this program; if not, write to the Free Software
17    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
18    02110-1301, USA. */
19
20 #include <config.h>
21
22 #include "sort.h"
23
24 #include <errno.h>
25 #include <limits.h>
26 #include <stdbool.h>
27 #include <stdio.h>
28 #include <stdlib.h>
29
30 #include <data/case-source.h>
31 #include <data/case.h>
32 #include <data/casefile.h>
33 #include <data/settings.h>
34 #include <data/variable.h>
35 #include <data/storage-stream.h>
36 #include <language/expressions/public.h>
37 #include <libpspp/alloc.h>
38 #include <libpspp/array.h>
39 #include <libpspp/message.h>
40 #include <libpspp/message.h>
41 #include <libpspp/misc.h>
42 #include <libpspp/str.h>
43 #include <procedure.h>
44
45 #include "gettext.h"
46 #define _(msgid) gettext (msgid)
47
48 /* These should only be changed for testing purposes. */
49 int min_buffers = 64;
50 int max_buffers = INT_MAX;
51 bool allow_internal_sort = true;
52
53 static int compare_record (const struct ccase *, const struct ccase *,
54                            const struct sort_criteria *);
55 static struct casefile *do_internal_sort (struct casereader *,
56                                           const struct sort_criteria *);
57 static struct casefile *do_external_sort (struct casereader *,
58                                           const struct sort_criteria *);
59
60 /* Gets ready to sort the active file, either in-place or to a
61    separate casefile. */
62 static bool
63 prepare_to_sort_active_file (void) 
64 {
65   bool ok;
66   
67   /* Cancel temporary transformations and PROCESS IF. */
68   if (temporary != 0)
69     cancel_temporary (); 
70   expr_free (process_if_expr);
71   process_if_expr = NULL;
72
73   /* Make sure source cases are in a storage source. */
74   ok = procedure (NULL, NULL);
75   assert (case_source_is_class (vfm_source, &storage_source_class));
76
77   return ok;
78 }
79
80 /* Sorts the active file in-place according to CRITERIA.
81    Returns nonzero if successful. */
82 int
83 sort_active_file_in_place (const struct sort_criteria *criteria) 
84 {
85   struct casefile *src, *dst;
86   
87   if (!prepare_to_sort_active_file ())
88     return 0;
89
90   src = storage_source_get_casefile (vfm_source);
91   dst = sort_execute (casefile_get_destructive_reader (src), criteria);
92   free_case_source (vfm_source);
93   vfm_source = NULL;
94
95   if (dst == NULL) 
96     return 0;
97
98   vfm_source = storage_source_create (dst);
99   return 1;
100 }
101
102 /* Sorts the active file to a separate casefile.  If successful,
103    returns the sorted casefile.  Returns a null pointer on
104    failure. */
105 struct casefile *
106 sort_active_file_to_casefile (const struct sort_criteria *criteria) 
107 {
108   struct casefile *src;
109   
110   if (!prepare_to_sort_active_file ())
111     return NULL;
112
113   src = storage_source_get_casefile (vfm_source);
114   return sort_execute (casefile_get_reader (src), criteria);
115 }
116
117
118 /* Reads all the cases from READER, which is destroyed.  Sorts
119    the cases according to CRITERIA.  Returns the sorted cases in
120    a newly created casefile. */
121 struct casefile *
122 sort_execute (struct casereader *reader, const struct sort_criteria *criteria)
123 {
124   struct casefile *output = do_internal_sort (reader, criteria);
125   if (output == NULL)
126     output = do_external_sort (reader, criteria);
127   casereader_destroy (reader);
128   return output;
129 }
130 \f
131 /* A case and its index. */
132 struct indexed_case 
133   {
134     struct ccase c;     /* Case. */
135     unsigned long idx;  /* Index to allow for stable sorting. */
136   };
137
138 static int compare_indexed_cases (const void *, const void *, void *);
139
140 /* If the data is in memory, do an internal sort and return a new
141    casefile for the data.  Otherwise, return a null pointer. */
142 static struct casefile *
143 do_internal_sort (struct casereader *reader,
144                   const struct sort_criteria *criteria)
145 {
146   const struct casefile *src;
147   struct casefile *dst;
148   unsigned long case_cnt;
149
150   if (!allow_internal_sort)
151     return NULL;
152
153   src = casereader_get_casefile (reader);
154   if (casefile_get_case_cnt (src) > 1 && !casefile_in_core (src))
155     return NULL;
156       
157   case_cnt = casefile_get_case_cnt (src);
158   dst = casefile_create (casefile_get_value_cnt (src));
159   if (case_cnt != 0) 
160     {
161       struct indexed_case *cases = nmalloc (sizeof *cases, case_cnt);
162       if (cases != NULL) 
163         {
164           unsigned long i;
165           
166           for (i = 0; i < case_cnt; i++)
167             {
168               bool ok = casereader_read_xfer (reader, &cases[i].c);
169               if (!ok)
170                 abort ();
171               cases[i].idx = i;
172             }
173
174           sort (cases, case_cnt, sizeof *cases, compare_indexed_cases,
175                 (void *) criteria);
176       
177           for (i = 0; i < case_cnt; i++)
178             casefile_append_xfer (dst, &cases[i].c);
179           if (casefile_error (dst))
180             abort ();
181
182           free (cases);
183         }
184       else 
185         {
186           /* Failure. */
187           casefile_destroy (dst);
188           dst = NULL;
189         }
190     }
191
192   return dst;
193 }
194
195 /* Compares the variables specified by CRITERIA between the cases
196    at A and B, with a "last resort" comparison for stability, and
197    returns a strcmp()-type result. */
198 static int
199 compare_indexed_cases (const void *a_, const void *b_, void *criteria_)
200 {
201   struct sort_criteria *criteria = criteria_;
202   const struct indexed_case *a = a_;
203   const struct indexed_case *b = b_;
204   int result = compare_record (&a->c, &b->c, criteria);
205   if (result == 0)
206     result = a->idx < b->idx ? -1 : a->idx > b->idx;
207   return result;
208 }
209 \f
210 /* External sort. */
211
212 /* Maximum order of merge (external sort only).  The maximum
213    reasonable value is about 7.  Above that, it would be a good
214    idea to use a heap in merge_once() to select the minimum. */
215 #define MAX_MERGE_ORDER 7
216
217 /* Results of an external sort. */
218 struct external_sort 
219   {
220     const struct sort_criteria *criteria; /* Sort criteria. */
221     size_t value_cnt;                 /* Size of data in `union value's. */
222     struct casefile **runs;           /* Array of initial runs. */
223     size_t run_cnt, run_cap;          /* Number of runs, allocated capacity. */
224   };
225
226 /* Prototypes for helper functions. */
227 static int write_runs (struct external_sort *, struct casereader *);
228 static struct casefile *merge (struct external_sort *);
229 static void destroy_external_sort (struct external_sort *);
230
231 /* Performs a stable external sort of the active file according
232    to the specification in SCP.  Forms initial runs using a heap
233    as a reservoir.  Merges the initial runs according to a
234    pattern that assures stability. */
235 static struct casefile *
236 do_external_sort (struct casereader *reader,
237                   const struct sort_criteria *criteria)
238 {
239   struct external_sort *xsrt;
240
241   if (!casefile_to_disk (casereader_get_casefile (reader)))
242     return NULL;
243
244   xsrt = xmalloc (sizeof *xsrt);
245   xsrt->criteria = criteria;
246   xsrt->value_cnt = casefile_get_value_cnt (casereader_get_casefile (reader));
247   xsrt->run_cap = 512;
248   xsrt->run_cnt = 0;
249   xsrt->runs = xnmalloc (xsrt->run_cap, sizeof *xsrt->runs);
250   if (write_runs (xsrt, reader))
251     {
252       struct casefile *output = merge (xsrt);
253       destroy_external_sort (xsrt);
254       return output;
255     }
256   else
257     {
258       destroy_external_sort (xsrt);
259       return NULL;
260     }
261 }
262
263 /* Destroys XSRT. */
264 static void
265 destroy_external_sort (struct external_sort *xsrt) 
266 {
267   if (xsrt != NULL) 
268     {
269       int i;
270       
271       for (i = 0; i < xsrt->run_cnt; i++)
272         casefile_destroy (xsrt->runs[i]);
273       free (xsrt->runs);
274       free (xsrt);
275     }
276 }
277 \f
278 /* Replacement selection. */
279
280 /* Pairs a record with a run number. */
281 struct record_run
282   {
283     int run;                    /* Run number of case. */
284     struct ccase record;        /* Case data. */
285     size_t idx;                 /* Case number (for stability). */
286   };
287
288 /* Represents a set of initial runs during an external sort. */
289 struct initial_run_state 
290   {
291     struct external_sort *xsrt;
292
293     /* Reservoir. */
294     struct record_run *records; /* Records arranged as a heap. */
295     size_t record_cnt;          /* Current number of records. */
296     size_t record_cap;          /* Capacity for records. */
297     
298     /* Run currently being output. */
299     int run;                    /* Run number. */
300     size_t case_cnt;            /* Number of cases so far. */
301     struct casefile *casefile;  /* Output file. */
302     struct ccase last_output;   /* Record last output. */
303
304     int okay;                   /* Zero if an error has been encountered. */
305   };
306
307 static const struct case_sink_class sort_sink_class;
308
309 static bool destroy_initial_run_state (struct initial_run_state *);
310 static void process_case (struct initial_run_state *, const struct ccase *,
311                           size_t);
312 static int allocate_cases (struct initial_run_state *);
313 static void output_record (struct initial_run_state *);
314 static void start_run (struct initial_run_state *);
315 static void end_run (struct initial_run_state *);
316 static int compare_record_run (const struct record_run *,
317                                const struct record_run *,
318                                struct initial_run_state *);
319 static int compare_record_run_minheap (const void *, const void *, void *);
320
321 /* Reads cases from READER and composes initial runs in XSRT. */
322 static int
323 write_runs (struct external_sort *xsrt, struct casereader *reader)
324 {
325   struct initial_run_state *irs;
326   struct ccase c;
327   size_t idx = 0;
328   int success = 0;
329
330   /* Allocate memory for cases. */
331   irs = xmalloc (sizeof *irs);
332   irs->xsrt = xsrt;
333   irs->records = NULL;
334   irs->record_cnt = irs->record_cap = 0;
335   irs->run = 0;
336   irs->case_cnt = 0;
337   irs->casefile = NULL;
338   case_nullify (&irs->last_output);
339   irs->okay = 1;
340   if (!allocate_cases (irs)) 
341     goto done;
342
343   /* Create initial runs. */
344   start_run (irs);
345   for (; irs->okay && casereader_read (reader, &c); case_destroy (&c))
346     process_case (irs, &c, idx++);
347   while (irs->okay && irs->record_cnt > 0)
348     output_record (irs);
349   end_run (irs);
350
351   success = irs->okay;
352
353  done:
354   if (!destroy_initial_run_state (irs))
355     success = false;
356
357   return success;
358 }
359
360 /* Add a single case to an initial run. */
361 static void
362 process_case (struct initial_run_state *irs, const struct ccase *c, size_t idx)
363 {
364   struct record_run *rr;
365
366   /* Compose record_run for this run and add to heap. */
367   assert (irs->record_cnt < irs->record_cap - 1);
368   rr = irs->records + irs->record_cnt++;
369   case_copy (&rr->record, 0, c, 0, irs->xsrt->value_cnt);
370   rr->run = irs->run;
371   rr->idx = idx;
372   if (!case_is_null (&irs->last_output)
373       && compare_record (c, &irs->last_output, irs->xsrt->criteria) < 0)
374     rr->run = irs->run + 1;
375   push_heap (irs->records, irs->record_cnt, sizeof *irs->records,
376              compare_record_run_minheap, irs);
377
378   /* Output a record if the reservoir is full. */
379   if (irs->record_cnt == irs->record_cap - 1 && irs->okay)
380     output_record (irs);
381 }
382
383 /* Destroys the initial run state represented by IRS.
384    Returns true if successful, false if an I/O error occurred. */
385 static bool
386 destroy_initial_run_state (struct initial_run_state *irs) 
387 {
388   int i;
389   bool ok = true;
390
391   if (irs == NULL)
392     return true;
393
394   for (i = 0; i < irs->record_cap; i++)
395     case_destroy (&irs->records[i].record);
396   free (irs->records);
397
398   if (irs->casefile != NULL)
399     ok = casefile_sleep (irs->casefile);
400
401   free (irs);
402   return ok;
403 }
404
405 /* Allocates room for lots of cases as a buffer. */
406 static int
407 allocate_cases (struct initial_run_state *irs)
408 {
409   int approx_case_cost; /* Approximate memory cost of one case in bytes. */
410   int max_cases;        /* Maximum number of cases to allocate. */
411   int i;
412
413   /* Allocate as many cases as we can within the workspace
414      limit. */
415   approx_case_cost = (sizeof *irs->records
416                       + irs->xsrt->value_cnt * sizeof (union value)
417                       + 4 * sizeof (void *));
418   max_cases = get_workspace() / approx_case_cost;
419   if (max_cases > max_buffers)
420     max_cases = max_buffers;
421   irs->records = nmalloc (sizeof *irs->records, max_cases);
422   if (irs->records != NULL)
423     for (i = 0; i < max_cases; i++)
424       if (!case_try_create (&irs->records[i].record, irs->xsrt->value_cnt))
425         {
426           max_cases = i;
427           break;
428         }
429   irs->record_cap = max_cases;
430
431   /* Fail if we didn't allocate an acceptable number of cases. */
432   if (irs->records == NULL || max_cases < min_buffers)
433     {
434       msg (SE, _("Out of memory.  Could not allocate room for minimum of %d "
435                  "cases of %d bytes each.  (PSPP workspace is currently "
436                  "restricted to a maximum of %d KB.)"),
437            min_buffers, approx_case_cost, get_workspace() / 1024);
438       return 0;
439     }
440   return 1;
441 }
442
443 /* Compares the VAR_CNT variables in VARS[] between the `value's at
444    A and B, and returns a strcmp()-type result. */
445 static int
446 compare_record (const struct ccase *a, const struct ccase *b,
447                 const struct sort_criteria *criteria)
448 {
449   int i;
450
451   assert (a != NULL);
452   assert (b != NULL);
453   
454   for (i = 0; i < criteria->crit_cnt; i++)
455     {
456       const struct sort_criterion *c = &criteria->crits[i];
457       int result;
458       
459       if (c->width == 0)
460         {
461           double af = case_num (a, c->fv);
462           double bf = case_num (b, c->fv);
463           
464           result = af < bf ? -1 : af > bf;
465         }
466       else
467         result = memcmp (case_str (a, c->fv), case_str (b, c->fv), c->width);
468
469       if (result != 0)
470         return c->dir == SRT_ASCEND ? result : -result;
471     }
472
473   return 0;
474 }
475
476 /* Compares record-run tuples A and B on run number first, then
477    on record, then on case index. */
478 static int
479 compare_record_run (const struct record_run *a,
480                     const struct record_run *b,
481                     struct initial_run_state *irs)
482 {
483   int result = a->run < b->run ? -1 : a->run > b->run;
484   if (result == 0)
485     result = compare_record (&a->record, &b->record, irs->xsrt->criteria);
486   if (result == 0)
487     result = a->idx < b->idx ? -1 : a->idx > b->idx;
488   return result;
489 }
490
491 /* Compares record-run tuples A and B on run number first, then
492    on the current record according to SCP, but in descending
493    order. */
494 static int
495 compare_record_run_minheap (const void *a, const void *b, void *irs) 
496 {
497   return -compare_record_run (a, b, irs);
498 }
499
500 /* Begins a new initial run, specifically its output file. */
501 static void
502 start_run (struct initial_run_state *irs)
503 {
504   irs->run++;
505   irs->case_cnt = 0;
506   irs->casefile = casefile_create (irs->xsrt->value_cnt);
507   casefile_to_disk (irs->casefile);
508   case_nullify (&irs->last_output); 
509 }
510
511 /* Ends the current initial run.  */
512 static void
513 end_run (struct initial_run_state *irs)
514 {
515   struct external_sort *xsrt = irs->xsrt;
516
517   /* Record initial run. */
518   if (irs->casefile != NULL) 
519     {
520       casefile_sleep (irs->casefile);
521       if (xsrt->run_cnt >= xsrt->run_cap) 
522         {
523           xsrt->run_cap *= 2;
524           xsrt->runs = xnrealloc (xsrt->runs,
525                                   xsrt->run_cap, sizeof *xsrt->runs);
526         }
527       xsrt->runs[xsrt->run_cnt++] = irs->casefile;
528       if (casefile_error (irs->casefile))
529         irs->okay = false;
530       irs->casefile = NULL; 
531     }
532 }
533
534 /* Writes a record to the current initial run. */
535 static void
536 output_record (struct initial_run_state *irs)
537 {
538   struct record_run *record_run;
539   struct ccase case_tmp;
540   
541   /* Extract minimum case from heap. */
542   assert (irs->record_cnt > 0);
543   pop_heap (irs->records, irs->record_cnt--, sizeof *irs->records,
544             compare_record_run_minheap, irs);
545   record_run = irs->records + irs->record_cnt;
546
547   /* Bail if an error has occurred. */
548   if (!irs->okay)
549     return;
550
551   /* Start new run if necessary. */
552   assert (record_run->run == irs->run
553           || record_run->run == irs->run + 1);
554   if (record_run->run != irs->run)
555     {
556       end_run (irs);
557       start_run (irs);
558     }
559   assert (record_run->run == irs->run);
560   irs->case_cnt++;
561
562   /* Write to disk. */
563   if (irs->casefile != NULL)
564     casefile_append (irs->casefile, &record_run->record);
565
566   /* This record becomes last_output. */
567   irs->last_output = case_tmp = record_run->record;
568   record_run->record = irs->records[irs->record_cap - 1].record;
569   irs->records[irs->record_cap - 1].record = case_tmp;
570 }
571 \f
572 /* Merging. */
573
574 static int choose_merge (struct casefile *runs[], int run_cnt, int order);
575 static struct casefile *merge_once (struct external_sort *,
576                                     struct casefile *[], size_t);
577
578 /* Repeatedly merges run until only one is left,
579    and returns the final casefile.
580    Returns a null pointer if an I/O error occurs. */
581 static struct casefile *
582 merge (struct external_sort *xsrt)
583 {
584   while (xsrt->run_cnt > 1)
585     {
586       int order = min (MAX_MERGE_ORDER, xsrt->run_cnt);
587       int idx = choose_merge (xsrt->runs, xsrt->run_cnt, order);
588       xsrt->runs[idx] = merge_once (xsrt, xsrt->runs + idx, order);
589       remove_range (xsrt->runs, xsrt->run_cnt, sizeof *xsrt->runs,
590                     idx + 1, order - 1);
591       xsrt->run_cnt -= order - 1;
592
593       if (xsrt->runs[idx] == NULL)
594         return NULL;
595     }
596   assert (xsrt->run_cnt == 1);
597   xsrt->run_cnt = 0;
598   return xsrt->runs[0];
599 }
600
601 /* Chooses ORDER runs out of the RUN_CNT runs in RUNS to merge,
602    and returns the index of the first one.
603
604    For stability, we must merge only consecutive runs.  For
605    efficiency, we choose the shortest consecutive sequence of
606    runs. */
607 static int
608 choose_merge (struct casefile *runs[], int run_cnt, int order) 
609 {
610   int min_idx, min_sum;
611   int cur_idx, cur_sum;
612   int i;
613
614   /* Sum up the length of the first ORDER runs. */
615   cur_sum = 0;
616   for (i = 0; i < order; i++)
617     cur_sum += casefile_get_case_cnt (runs[i]);
618
619   /* Find the shortest group of ORDER runs,
620      using a running total for efficiency. */
621   min_idx = 0;
622   min_sum = cur_sum;
623   for (cur_idx = 1; cur_idx + order <= run_cnt; cur_idx++)
624     {
625       cur_sum -= casefile_get_case_cnt (runs[cur_idx - 1]);
626       cur_sum += casefile_get_case_cnt (runs[cur_idx + order - 1]);
627       if (cur_sum < min_sum)
628         {
629           min_sum = cur_sum;
630           min_idx = cur_idx;
631         }
632     }
633
634   return min_idx;
635 }
636
637 /* Merges the RUN_CNT initial runs specified in INPUT_FILES into a
638    new run, and returns the new run.
639    Returns a null pointer if an I/O error occurs. */
640 static struct casefile *
641 merge_once (struct external_sort *xsrt,
642             struct casefile **const input_files,
643             size_t run_cnt)
644 {
645   struct run
646     {
647       struct casefile *file;
648       struct casereader *reader;
649       struct ccase ccase;
650     }
651   *runs;
652
653   struct casefile *output = NULL;
654   int i;
655
656   /* Open input files. */
657   runs = xnmalloc (run_cnt, sizeof *runs);
658   for (i = 0; i < run_cnt; i++) 
659     {
660       struct run *r = &runs[i];
661       r->file = input_files[i];
662       r->reader = casefile_get_destructive_reader (r->file);
663       if (!casereader_read_xfer (r->reader, &r->ccase))
664         {
665           run_cnt--;
666           i--;
667         }
668     }
669
670   /* Create output file. */
671   output = casefile_create (xsrt->value_cnt);
672   casefile_to_disk (output);
673
674   /* Merge. */
675   while (run_cnt > 0) 
676     {
677       struct run *min_run, *run;
678       
679       /* Find minimum. */
680       min_run = runs;
681       for (run = runs + 1; run < runs + run_cnt; run++)
682         if (compare_record (&run->ccase, &min_run->ccase, xsrt->criteria) < 0)
683           min_run = run;
684
685       /* Write minimum to output file. */
686       casefile_append_xfer (output, &min_run->ccase);
687
688       /* Read another case from minimum run. */
689       if (!casereader_read_xfer (min_run->reader, &min_run->ccase))
690         {
691           if (casefile_error (min_run->file) || casefile_error (output))
692             goto error;
693           casereader_destroy (min_run->reader);
694           casefile_destroy (min_run->file);
695
696           remove_element (runs, run_cnt, sizeof *runs, min_run - runs);
697           run_cnt--;
698         } 
699     }
700
701   if (!casefile_sleep (output))
702     goto error;
703   free (runs);
704
705   return output;
706
707  error:
708   for (i = 0; i < run_cnt; i++) 
709     casefile_destroy (runs[i].file);
710   casefile_destroy (output);
711   free (runs);
712   return NULL;
713 }