Added new files resulting from directory restructuring.
[pspp-builds.git] / src / math / sort.c
1 /* PSPP - computes sample statistics.
2    Copyright (C) 1997-9, 2000 Free Software Foundation, Inc.
3    Written by Ben Pfaff <blp@gnu.org>.
4
5    This program is free software; you can redistribute it and/or
6    modify it under the terms of the GNU General Public License as
7    published by the Free Software Foundation; either version 2 of the
8    License, or (at your option) any later version.
9
10    This program is distributed in the hope that it will be useful, but
11    WITHOUT ANY WARRANTY; without even the implied warranty of
12    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13    General Public License for more details.
14
15    You should have received a copy of the GNU General Public License
16    along with this program; if not, write to the Free Software
17    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
18    02110-1301, USA. */
19
20 #include <config.h>
21 #include "sort.h"
22 #include "message.h"
23 #include "alloc.h"
24 #include <limits.h>
25 #include <stdio.h>
26 #include <stdlib.h>
27 #include <errno.h>
28 #include "array.h"
29 #include <stdbool.h>
30 #include "case.h"
31 #include "casefile.h"
32 #include "message.h"
33
34 #include "misc.h"
35 #include "settings.h"
36 #include "str.h"
37 #include "variable.h"
38 #include "procedure.h"
39
40 #include "gettext.h"
41 #define _(msgid) gettext (msgid)
42
43 #include "debug-print.h"
44
45 /* These should only be changed for testing purposes. */
46 int min_buffers = 64;
47 int max_buffers = INT_MAX;
48 bool allow_internal_sort = true;
49
50 static int compare_record (const struct ccase *, const struct ccase *,
51                            const struct sort_criteria *);
52 static struct casefile *do_internal_sort (struct casereader *,
53                                           const struct sort_criteria *);
54 static struct casefile *do_external_sort (struct casereader *,
55                                           const struct sort_criteria *);
56
57 /* Gets ready to sort the active file, either in-place or to a
58    separate casefile. */
59 static bool
60 prepare_to_sort_active_file (void) 
61 {
62   bool ok;
63   
64   /* Cancel temporary transformations and PROCESS IF. */
65   if (temporary != 0)
66     cancel_temporary (); 
67   expr_free (process_if_expr);
68   process_if_expr = NULL;
69
70   /* Make sure source cases are in a storage source. */
71   ok = procedure (NULL, NULL);
72   assert (case_source_is_class (vfm_source, &storage_source_class));
73
74   return ok;
75 }
76
77 /* Sorts the active file in-place according to CRITERIA.
78    Returns nonzero if successful. */
79 int
80 sort_active_file_in_place (const struct sort_criteria *criteria) 
81 {
82   struct casefile *src, *dst;
83   
84   if (!prepare_to_sort_active_file ())
85     return 0;
86
87   src = storage_source_get_casefile (vfm_source);
88   dst = sort_execute (casefile_get_destructive_reader (src), criteria);
89   free_case_source (vfm_source);
90   vfm_source = NULL;
91
92   if (dst == NULL) 
93     return 0;
94
95   vfm_source = storage_source_create (dst);
96   return 1;
97 }
98
99 /* Sorts the active file to a separate casefile.  If successful,
100    returns the sorted casefile.  Returns a null pointer on
101    failure. */
102 struct casefile *
103 sort_active_file_to_casefile (const struct sort_criteria *criteria) 
104 {
105   struct casefile *src;
106   
107   if (!prepare_to_sort_active_file ())
108     return NULL;
109
110   src = storage_source_get_casefile (vfm_source);
111   return sort_execute (casefile_get_reader (src), criteria);
112 }
113
114
115 /* Reads all the cases from READER, which is destroyed.  Sorts
116    the cases according to CRITERIA.  Returns the sorted cases in
117    a newly created casefile. */
118 struct casefile *
119 sort_execute (struct casereader *reader, const struct sort_criteria *criteria)
120 {
121   struct casefile *output = do_internal_sort (reader, criteria);
122   if (output == NULL)
123     output = do_external_sort (reader, criteria);
124   casereader_destroy (reader);
125   return output;
126 }
127 \f
128 /* A case and its index. */
129 struct indexed_case 
130   {
131     struct ccase c;     /* Case. */
132     unsigned long idx;  /* Index to allow for stable sorting. */
133   };
134
135 static int compare_indexed_cases (const void *, const void *, void *);
136
137 /* If the data is in memory, do an internal sort and return a new
138    casefile for the data.  Otherwise, return a null pointer. */
139 static struct casefile *
140 do_internal_sort (struct casereader *reader,
141                   const struct sort_criteria *criteria)
142 {
143   const struct casefile *src;
144   struct casefile *dst;
145   unsigned long case_cnt;
146
147   if (!allow_internal_sort)
148     return NULL;
149
150   src = casereader_get_casefile (reader);
151   if (casefile_get_case_cnt (src) > 1 && !casefile_in_core (src))
152     return NULL;
153       
154   case_cnt = casefile_get_case_cnt (src);
155   dst = casefile_create (casefile_get_value_cnt (src));
156   if (case_cnt != 0) 
157     {
158       struct indexed_case *cases = nmalloc (sizeof *cases, case_cnt);
159       if (cases != NULL) 
160         {
161           unsigned long i;
162           
163           for (i = 0; i < case_cnt; i++)
164             {
165               bool ok = casereader_read_xfer (reader, &cases[i].c);
166               if (!ok)
167                 abort ();
168               cases[i].idx = i;
169             }
170
171           sort (cases, case_cnt, sizeof *cases, compare_indexed_cases,
172                 (void *) criteria);
173       
174           for (i = 0; i < case_cnt; i++)
175             casefile_append_xfer (dst, &cases[i].c);
176           if (casefile_error (dst))
177             abort ();
178
179           free (cases);
180         }
181       else 
182         {
183           /* Failure. */
184           casefile_destroy (dst);
185           dst = NULL;
186         }
187     }
188
189   return dst;
190 }
191
192 /* Compares the variables specified by CRITERIA between the cases
193    at A and B, with a "last resort" comparison for stability, and
194    returns a strcmp()-type result. */
195 static int
196 compare_indexed_cases (const void *a_, const void *b_, void *criteria_)
197 {
198   struct sort_criteria *criteria = criteria_;
199   const struct indexed_case *a = a_;
200   const struct indexed_case *b = b_;
201   int result = compare_record (&a->c, &b->c, criteria);
202   if (result == 0)
203     result = a->idx < b->idx ? -1 : a->idx > b->idx;
204   return result;
205 }
206 \f
207 /* External sort. */
208
209 /* Maximum order of merge (external sort only).  The maximum
210    reasonable value is about 7.  Above that, it would be a good
211    idea to use a heap in merge_once() to select the minimum. */
212 #define MAX_MERGE_ORDER 7
213
214 /* Results of an external sort. */
215 struct external_sort 
216   {
217     const struct sort_criteria *criteria; /* Sort criteria. */
218     size_t value_cnt;                 /* Size of data in `union value's. */
219     struct casefile **runs;           /* Array of initial runs. */
220     size_t run_cnt, run_cap;          /* Number of runs, allocated capacity. */
221   };
222
223 /* Prototypes for helper functions. */
224 static int write_runs (struct external_sort *, struct casereader *);
225 static struct casefile *merge (struct external_sort *);
226 static void destroy_external_sort (struct external_sort *);
227
228 /* Performs a stable external sort of the active file according
229    to the specification in SCP.  Forms initial runs using a heap
230    as a reservoir.  Merges the initial runs according to a
231    pattern that assures stability. */
232 static struct casefile *
233 do_external_sort (struct casereader *reader,
234                   const struct sort_criteria *criteria)
235 {
236   struct external_sort *xsrt;
237
238   if (!casefile_to_disk (casereader_get_casefile (reader)))
239     return NULL;
240
241   xsrt = xmalloc (sizeof *xsrt);
242   xsrt->criteria = criteria;
243   xsrt->value_cnt = casefile_get_value_cnt (casereader_get_casefile (reader));
244   xsrt->run_cap = 512;
245   xsrt->run_cnt = 0;
246   xsrt->runs = xnmalloc (xsrt->run_cap, sizeof *xsrt->runs);
247   if (write_runs (xsrt, reader))
248     {
249       struct casefile *output = merge (xsrt);
250       destroy_external_sort (xsrt);
251       return output;
252     }
253   else
254     {
255       destroy_external_sort (xsrt);
256       return NULL;
257     }
258 }
259
260 /* Destroys XSRT. */
261 static void
262 destroy_external_sort (struct external_sort *xsrt) 
263 {
264   if (xsrt != NULL) 
265     {
266       int i;
267       
268       for (i = 0; i < xsrt->run_cnt; i++)
269         casefile_destroy (xsrt->runs[i]);
270       free (xsrt->runs);
271       free (xsrt);
272     }
273 }
274 \f
275 /* Replacement selection. */
276
277 /* Pairs a record with a run number. */
278 struct record_run
279   {
280     int run;                    /* Run number of case. */
281     struct ccase record;        /* Case data. */
282     size_t idx;                 /* Case number (for stability). */
283   };
284
285 /* Represents a set of initial runs during an external sort. */
286 struct initial_run_state 
287   {
288     struct external_sort *xsrt;
289
290     /* Reservoir. */
291     struct record_run *records; /* Records arranged as a heap. */
292     size_t record_cnt;          /* Current number of records. */
293     size_t record_cap;          /* Capacity for records. */
294     
295     /* Run currently being output. */
296     int run;                    /* Run number. */
297     size_t case_cnt;            /* Number of cases so far. */
298     struct casefile *casefile;  /* Output file. */
299     struct ccase last_output;   /* Record last output. */
300
301     int okay;                   /* Zero if an error has been encountered. */
302   };
303
304 static const struct case_sink_class sort_sink_class;
305
306 static bool destroy_initial_run_state (struct initial_run_state *);
307 static void process_case (struct initial_run_state *, const struct ccase *,
308                           size_t);
309 static int allocate_cases (struct initial_run_state *);
310 static void output_record (struct initial_run_state *);
311 static void start_run (struct initial_run_state *);
312 static void end_run (struct initial_run_state *);
313 static int compare_record_run (const struct record_run *,
314                                const struct record_run *,
315                                struct initial_run_state *);
316 static int compare_record_run_minheap (const void *, const void *, void *);
317
318 /* Reads cases from READER and composes initial runs in XSRT. */
319 static int
320 write_runs (struct external_sort *xsrt, struct casereader *reader)
321 {
322   struct initial_run_state *irs;
323   struct ccase c;
324   size_t idx = 0;
325   int success = 0;
326
327   /* Allocate memory for cases. */
328   irs = xmalloc (sizeof *irs);
329   irs->xsrt = xsrt;
330   irs->records = NULL;
331   irs->record_cnt = irs->record_cap = 0;
332   irs->run = 0;
333   irs->case_cnt = 0;
334   irs->casefile = NULL;
335   case_nullify (&irs->last_output);
336   irs->okay = 1;
337   if (!allocate_cases (irs)) 
338     goto done;
339
340   /* Create initial runs. */
341   start_run (irs);
342   for (; irs->okay && casereader_read (reader, &c); case_destroy (&c))
343     process_case (irs, &c, idx++);
344   while (irs->okay && irs->record_cnt > 0)
345     output_record (irs);
346   end_run (irs);
347
348   success = irs->okay;
349
350  done:
351   if (!destroy_initial_run_state (irs))
352     success = false;
353
354   return success;
355 }
356
357 /* Add a single case to an initial run. */
358 static void
359 process_case (struct initial_run_state *irs, const struct ccase *c, size_t idx)
360 {
361   struct record_run *rr;
362
363   /* Compose record_run for this run and add to heap. */
364   assert (irs->record_cnt < irs->record_cap - 1);
365   rr = irs->records + irs->record_cnt++;
366   case_copy (&rr->record, 0, c, 0, irs->xsrt->value_cnt);
367   rr->run = irs->run;
368   rr->idx = idx;
369   if (!case_is_null (&irs->last_output)
370       && compare_record (c, &irs->last_output, irs->xsrt->criteria) < 0)
371     rr->run = irs->run + 1;
372   push_heap (irs->records, irs->record_cnt, sizeof *irs->records,
373              compare_record_run_minheap, irs);
374
375   /* Output a record if the reservoir is full. */
376   if (irs->record_cnt == irs->record_cap - 1 && irs->okay)
377     output_record (irs);
378 }
379
380 /* Destroys the initial run state represented by IRS.
381    Returns true if successful, false if an I/O error occurred. */
382 static bool
383 destroy_initial_run_state (struct initial_run_state *irs) 
384 {
385   int i;
386   bool ok = true;
387
388   if (irs == NULL)
389     return true;
390
391   for (i = 0; i < irs->record_cap; i++)
392     case_destroy (&irs->records[i].record);
393   free (irs->records);
394
395   if (irs->casefile != NULL)
396     ok = casefile_sleep (irs->casefile);
397
398   free (irs);
399   return ok;
400 }
401
402 /* Allocates room for lots of cases as a buffer. */
403 static int
404 allocate_cases (struct initial_run_state *irs)
405 {
406   int approx_case_cost; /* Approximate memory cost of one case in bytes. */
407   int max_cases;        /* Maximum number of cases to allocate. */
408   int i;
409
410   /* Allocate as many cases as we can within the workspace
411      limit. */
412   approx_case_cost = (sizeof *irs->records
413                       + irs->xsrt->value_cnt * sizeof (union value)
414                       + 4 * sizeof (void *));
415   max_cases = get_workspace() / approx_case_cost;
416   if (max_cases > max_buffers)
417     max_cases = max_buffers;
418   irs->records = nmalloc (sizeof *irs->records, max_cases);
419   if (irs->records != NULL)
420     for (i = 0; i < max_cases; i++)
421       if (!case_try_create (&irs->records[i].record, irs->xsrt->value_cnt))
422         {
423           max_cases = i;
424           break;
425         }
426   irs->record_cap = max_cases;
427
428   /* Fail if we didn't allocate an acceptable number of cases. */
429   if (irs->records == NULL || max_cases < min_buffers)
430     {
431       msg (SE, _("Out of memory.  Could not allocate room for minimum of %d "
432                  "cases of %d bytes each.  (PSPP workspace is currently "
433                  "restricted to a maximum of %d KB.)"),
434            min_buffers, approx_case_cost, get_workspace() / 1024);
435       return 0;
436     }
437   return 1;
438 }
439
440 /* Compares the VAR_CNT variables in VARS[] between the `value's at
441    A and B, and returns a strcmp()-type result. */
442 static int
443 compare_record (const struct ccase *a, const struct ccase *b,
444                 const struct sort_criteria *criteria)
445 {
446   int i;
447
448   assert (a != NULL);
449   assert (b != NULL);
450   
451   for (i = 0; i < criteria->crit_cnt; i++)
452     {
453       const struct sort_criterion *c = &criteria->crits[i];
454       int result;
455       
456       if (c->width == 0)
457         {
458           double af = case_num (a, c->fv);
459           double bf = case_num (b, c->fv);
460           
461           result = af < bf ? -1 : af > bf;
462         }
463       else
464         result = memcmp (case_str (a, c->fv), case_str (b, c->fv), c->width);
465
466       if (result != 0)
467         return c->dir == SRT_ASCEND ? result : -result;
468     }
469
470   return 0;
471 }
472
473 /* Compares record-run tuples A and B on run number first, then
474    on record, then on case index. */
475 static int
476 compare_record_run (const struct record_run *a,
477                     const struct record_run *b,
478                     struct initial_run_state *irs)
479 {
480   int result = a->run < b->run ? -1 : a->run > b->run;
481   if (result == 0)
482     result = compare_record (&a->record, &b->record, irs->xsrt->criteria);
483   if (result == 0)
484     result = a->idx < b->idx ? -1 : a->idx > b->idx;
485   return result;
486 }
487
488 /* Compares record-run tuples A and B on run number first, then
489    on the current record according to SCP, but in descending
490    order. */
491 static int
492 compare_record_run_minheap (const void *a, const void *b, void *irs) 
493 {
494   return -compare_record_run (a, b, irs);
495 }
496
497 /* Begins a new initial run, specifically its output file. */
498 static void
499 start_run (struct initial_run_state *irs)
500 {
501   irs->run++;
502   irs->case_cnt = 0;
503   irs->casefile = casefile_create (irs->xsrt->value_cnt);
504   casefile_to_disk (irs->casefile);
505   case_nullify (&irs->last_output); 
506 }
507
508 /* Ends the current initial run.  */
509 static void
510 end_run (struct initial_run_state *irs)
511 {
512   struct external_sort *xsrt = irs->xsrt;
513
514   /* Record initial run. */
515   if (irs->casefile != NULL) 
516     {
517       casefile_sleep (irs->casefile);
518       if (xsrt->run_cnt >= xsrt->run_cap) 
519         {
520           xsrt->run_cap *= 2;
521           xsrt->runs = xnrealloc (xsrt->runs,
522                                   xsrt->run_cap, sizeof *xsrt->runs);
523         }
524       xsrt->runs[xsrt->run_cnt++] = irs->casefile;
525       if (casefile_error (irs->casefile))
526         irs->okay = false;
527       irs->casefile = NULL; 
528     }
529 }
530
531 /* Writes a record to the current initial run. */
532 static void
533 output_record (struct initial_run_state *irs)
534 {
535   struct record_run *record_run;
536   struct ccase case_tmp;
537   
538   /* Extract minimum case from heap. */
539   assert (irs->record_cnt > 0);
540   pop_heap (irs->records, irs->record_cnt--, sizeof *irs->records,
541             compare_record_run_minheap, irs);
542   record_run = irs->records + irs->record_cnt;
543
544   /* Bail if an error has occurred. */
545   if (!irs->okay)
546     return;
547
548   /* Start new run if necessary. */
549   assert (record_run->run == irs->run
550           || record_run->run == irs->run + 1);
551   if (record_run->run != irs->run)
552     {
553       end_run (irs);
554       start_run (irs);
555     }
556   assert (record_run->run == irs->run);
557   irs->case_cnt++;
558
559   /* Write to disk. */
560   if (irs->casefile != NULL)
561     casefile_append (irs->casefile, &record_run->record);
562
563   /* This record becomes last_output. */
564   irs->last_output = case_tmp = record_run->record;
565   record_run->record = irs->records[irs->record_cap - 1].record;
566   irs->records[irs->record_cap - 1].record = case_tmp;
567 }
568 \f
569 /* Merging. */
570
571 static int choose_merge (struct casefile *runs[], int run_cnt, int order);
572 static struct casefile *merge_once (struct external_sort *,
573                                     struct casefile *[], size_t);
574
575 /* Repeatedly merges run until only one is left,
576    and returns the final casefile.
577    Returns a null pointer if an I/O error occurs. */
578 static struct casefile *
579 merge (struct external_sort *xsrt)
580 {
581   while (xsrt->run_cnt > 1)
582     {
583       int order = min (MAX_MERGE_ORDER, xsrt->run_cnt);
584       int idx = choose_merge (xsrt->runs, xsrt->run_cnt, order);
585       xsrt->runs[idx] = merge_once (xsrt, xsrt->runs + idx, order);
586       remove_range (xsrt->runs, xsrt->run_cnt, sizeof *xsrt->runs,
587                     idx + 1, order - 1);
588       xsrt->run_cnt -= order - 1;
589
590       if (xsrt->runs[idx] == NULL)
591         return NULL;
592     }
593   assert (xsrt->run_cnt == 1);
594   xsrt->run_cnt = 0;
595   return xsrt->runs[0];
596 }
597
598 /* Chooses ORDER runs out of the RUN_CNT runs in RUNS to merge,
599    and returns the index of the first one.
600
601    For stability, we must merge only consecutive runs.  For
602    efficiency, we choose the shortest consecutive sequence of
603    runs. */
604 static int
605 choose_merge (struct casefile *runs[], int run_cnt, int order) 
606 {
607   int min_idx, min_sum;
608   int cur_idx, cur_sum;
609   int i;
610
611   /* Sum up the length of the first ORDER runs. */
612   cur_sum = 0;
613   for (i = 0; i < order; i++)
614     cur_sum += casefile_get_case_cnt (runs[i]);
615
616   /* Find the shortest group of ORDER runs,
617      using a running total for efficiency. */
618   min_idx = 0;
619   min_sum = cur_sum;
620   for (cur_idx = 1; cur_idx + order <= run_cnt; cur_idx++)
621     {
622       cur_sum -= casefile_get_case_cnt (runs[cur_idx - 1]);
623       cur_sum += casefile_get_case_cnt (runs[cur_idx + order - 1]);
624       if (cur_sum < min_sum)
625         {
626           min_sum = cur_sum;
627           min_idx = cur_idx;
628         }
629     }
630
631   return min_idx;
632 }
633
634 /* Merges the RUN_CNT initial runs specified in INPUT_FILES into a
635    new run, and returns the new run.
636    Returns a null pointer if an I/O error occurs. */
637 static struct casefile *
638 merge_once (struct external_sort *xsrt,
639             struct casefile **const input_files,
640             size_t run_cnt)
641 {
642   struct run
643     {
644       struct casefile *file;
645       struct casereader *reader;
646       struct ccase ccase;
647     }
648   *runs;
649
650   struct casefile *output = NULL;
651   int i;
652
653   /* Open input files. */
654   runs = xnmalloc (run_cnt, sizeof *runs);
655   for (i = 0; i < run_cnt; i++) 
656     {
657       struct run *r = &runs[i];
658       r->file = input_files[i];
659       r->reader = casefile_get_destructive_reader (r->file);
660       if (!casereader_read_xfer (r->reader, &r->ccase))
661         {
662           run_cnt--;
663           i--;
664         }
665     }
666
667   /* Create output file. */
668   output = casefile_create (xsrt->value_cnt);
669   casefile_to_disk (output);
670
671   /* Merge. */
672   while (run_cnt > 0) 
673     {
674       struct run *min_run, *run;
675       
676       /* Find minimum. */
677       min_run = runs;
678       for (run = runs + 1; run < runs + run_cnt; run++)
679         if (compare_record (&run->ccase, &min_run->ccase, xsrt->criteria) < 0)
680           min_run = run;
681
682       /* Write minimum to output file. */
683       casefile_append_xfer (output, &min_run->ccase);
684
685       /* Read another case from minimum run. */
686       if (!casereader_read_xfer (min_run->reader, &min_run->ccase))
687         {
688           if (casefile_error (min_run->file) || casefile_error (output))
689             goto error;
690           casereader_destroy (min_run->reader);
691           casefile_destroy (min_run->file);
692
693           remove_element (runs, run_cnt, sizeof *runs, min_run - runs);
694           run_cnt--;
695         } 
696     }
697
698   if (!casefile_sleep (output))
699     goto error;
700   free (runs);
701
702   return output;
703
704  error:
705   for (i = 0; i < run_cnt; i++) 
706     casefile_destroy (runs[i].file);
707   casefile_destroy (output);
708   free (runs);
709   return NULL;
710 }