d3e4446d075bc8be8a92cd003c62649e83b021f6
[pspp-builds.git] / src / math / sort.c
1 /* PSPP - computes sample statistics.
2    Copyright (C) 1997-9, 2000, 2006 Free Software Foundation, Inc.
3
4    This program is free software; you can redistribute it and/or
5    modify it under the terms of the GNU General Public License as
6    published by the Free Software Foundation; either version 2 of the
7    License, or (at your option) any later version.
8
9    This program is distributed in the hope that it will be useful, but
10    WITHOUT ANY WARRANTY; without even the implied warranty of
11    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12    General Public License for more details.
13
14    You should have received a copy of the GNU General Public License
15    along with this program; if not, write to the Free Software
16    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
17    02110-1301, USA. */
18
19 #include <config.h>
20
21 #include "sort.h"
22
23 #include <errno.h>
24 #include <limits.h>
25 #include <stdbool.h>
26 #include <stdio.h>
27 #include <stdlib.h>
28
29 #include <data/case-source.h>
30 #include <data/case.h>
31 #include <data/casefile.h>
32 #include <data/fastfile.h>
33 #include <data/casefile-factory.h>
34 #include <data/fastfile-factory.h>
35 #include <data/procedure.h>
36 #include <data/settings.h>
37 #include <data/variable.h>
38 #include <data/storage-stream.h>
39 #include <libpspp/alloc.h>
40 #include <libpspp/array.h>
41 #include <libpspp/assertion.h>
42 #include <libpspp/message.h>
43 #include <libpspp/message.h>
44 #include <libpspp/misc.h>
45 #include <libpspp/str.h>
46
47 #include "minmax.h"
48
49 #include "gettext.h"
50 #define _(msgid) gettext (msgid)
51
52 /* These should only be changed for testing purposes. */
53 int min_buffers = 64;
54 int max_buffers = INT_MAX;
55 bool allow_internal_sort = true;
56
57 static int compare_record (const struct ccase *, const struct ccase *,
58                            const struct sort_criteria *);
59 static struct casefile *do_internal_sort (struct casereader *,
60                                           const struct sort_criteria *,
61                                           struct casefile_factory *
62                                           );
63 static struct casefile *do_external_sort (struct casereader *,
64                                           const struct sort_criteria *,
65                                           struct casefile_factory *
66                                           );
67
68
69 /* Sorts the active file in-place according to CRITERIA.
70    Returns true if successful. */
71 bool
72 sort_active_file_in_place (struct dataset *ds, 
73                            const struct sort_criteria *criteria) 
74 {
75   struct casefile *in, *out;
76
77   proc_cancel_temporary_transformations (ds);
78   if (!procedure (ds, NULL, NULL))
79     return false;
80   
81   in = proc_capture_output (ds);
82   out = sort_execute (casefile_get_destructive_reader (in), criteria, 
83                       dataset_get_casefile_factory (ds));
84   if (out == NULL) 
85     return false;
86
87   proc_set_source (ds, storage_source_create (out));
88   return true;
89 }
90
91 /* Data passed to sort_to_casefile_callback(). */
92 struct sort_to_casefile_cb_data 
93   {
94     const struct sort_criteria *criteria;
95     struct casefile *output;
96     struct casefile_factory *factory ;
97   };
98
99 /* Sorts casefile CF according to the criteria in CB_DATA. */
100 static bool
101 sort_to_casefile_callback (const struct casefile *cf, void *cb_data_) 
102 {
103   struct sort_to_casefile_cb_data *cb_data = cb_data_;
104   cb_data->output = sort_execute (casefile_get_reader (cf, NULL), 
105                                   cb_data->criteria,
106                                   cb_data->factory
107                                   );
108   return cb_data->output != NULL;
109 }
110
111 /* Sorts the active file to a separate casefile.  If successful,
112    returns the sorted casefile.  Returns a null pointer on
113    failure. */
114 struct casefile *
115 sort_active_file_to_casefile (struct dataset *ds, 
116                               const struct sort_criteria *criteria) 
117 {
118   struct sort_to_casefile_cb_data cb_data;
119   
120   proc_cancel_temporary_transformations (ds);
121
122   cb_data.criteria = criteria;
123   cb_data.output = NULL;
124   cb_data.factory = dataset_get_casefile_factory (ds);
125   if (!multipass_procedure (ds, sort_to_casefile_callback, &cb_data)) 
126     {
127       casefile_destroy (cb_data.output);
128       return NULL;
129     }
130   return cb_data.output;
131 }
132
133
134 /* Reads all the cases from READER, which is destroyed.  Sorts
135    the cases according to CRITERIA.  Returns the sorted cases in
136    a newly created casefile, which will be created by FACTORY.
137    If FACTORY is NULL, then a local fastfile_factory will be used.
138 */
139 struct casefile *
140 sort_execute (struct casereader *reader,
141               const struct sort_criteria *criteria,
142               struct casefile_factory *factory
143               )
144 {
145   struct casefile_factory *local_factory = NULL;
146   struct casefile *output ;
147   if ( factory == NULL )
148     factory = local_factory = fastfile_factory_create ();
149
150   output = do_internal_sort (reader, criteria, factory);
151   if (output == NULL)
152     output = do_external_sort (reader, criteria, factory);
153   casereader_destroy (reader);
154
155   fastfile_factory_destroy (local_factory);
156
157   return output;
158 }
159 \f
160 /* A case and its index. */
161 struct indexed_case 
162   {
163     struct ccase c;     /* Case. */
164     unsigned long idx;  /* Index to allow for stable sorting. */
165   };
166
167 static int compare_indexed_cases (const void *, const void *, const void *);
168
169 /* If the data is in memory, do an internal sort and return a new
170    casefile for the data.  Otherwise, return a null pointer. */
171 static struct casefile *
172 do_internal_sort (struct casereader *reader,
173                   const struct sort_criteria *criteria, 
174                   struct casefile_factory *factory)
175 {
176   const struct casefile *src;
177   struct casefile *dst;
178   unsigned long case_cnt;
179
180   if (!allow_internal_sort)
181     return NULL;
182
183   src = casereader_get_casefile (reader);
184   if (casefile_get_case_cnt (src) > 1 && !casefile_in_core (src))
185     return NULL;
186       
187   case_cnt = casefile_get_case_cnt (src);
188   dst = factory->create_casefile (factory, casefile_get_value_cnt (src));
189   if (case_cnt != 0) 
190     {
191       struct indexed_case *cases = nmalloc (sizeof *cases, case_cnt);
192       if (cases != NULL) 
193         {
194           unsigned long i;
195           
196           for (i = 0; i < case_cnt; i++)
197             {
198               bool ok = casereader_read_xfer (reader, &cases[i].c);
199               if (!ok)
200                 NOT_REACHED ();
201               cases[i].idx = i;
202             }
203
204           sort (cases, case_cnt, sizeof *cases, compare_indexed_cases,
205                 (void *) criteria);
206       
207           for (i = 0; i < case_cnt; i++)
208             casefile_append_xfer (dst, &cases[i].c);
209           if (casefile_error (dst))
210             NOT_REACHED ();
211
212           free (cases);
213         }
214       else 
215         {
216           /* Failure. */
217           casefile_destroy (dst);
218           dst = NULL;
219         }
220     }
221
222   return dst;
223 }
224
225 /* Compares the variables specified by CRITERIA between the cases
226    at A and B, with a "last resort" comparison for stability, and
227    returns a strcmp()-type result. */
228 static int
229 compare_indexed_cases (const void *a_, const void *b_, const void *criteria_)
230 {
231   const struct sort_criteria *criteria = criteria_;
232   const struct indexed_case *a = a_;
233   const struct indexed_case *b = b_;
234   int result = compare_record (&a->c, &b->c, criteria);
235   if (result == 0)
236     result = a->idx < b->idx ? -1 : a->idx > b->idx;
237   return result;
238 }
239 \f
240 /* External sort. */
241
242 /* Maximum order of merge (external sort only).  The maximum
243    reasonable value is about 7.  Above that, it would be a good
244    idea to use a heap in merge_once() to select the minimum. */
245 #define MAX_MERGE_ORDER 7
246
247 /* Results of an external sort. */
248 struct external_sort 
249   {
250     const struct sort_criteria *criteria; /* Sort criteria. */
251     size_t value_cnt;                 /* Size of data in `union value's. */
252     struct casefile **runs;           /* Array of initial runs. */
253     size_t run_cnt, run_cap;          /* Number of runs, allocated capacity. */
254     struct casefile_factory *factory; /* Factory used to  create the result */
255   };
256
257 /* Prototypes for helper functions. */
258 static int write_runs (struct external_sort *, struct casereader *);
259 static struct casefile *merge (struct external_sort *);
260 static void destroy_external_sort (struct external_sort *);
261
262 /* Performs a stable external sort of the active file according
263    to the specification in SCP.  Forms initial runs using a heap
264    as a reservoir.  Merges the initial runs according to a
265    pattern that assures stability. */
266 static struct casefile *
267 do_external_sort (struct casereader *reader,
268                   const struct sort_criteria *criteria,
269                   struct casefile_factory *factory
270                   )
271 {
272   struct external_sort *xsrt;
273
274   if (!casefile_to_disk (casereader_get_casefile (reader)))
275     return NULL;
276
277   xsrt = xmalloc (sizeof *xsrt);
278   xsrt->criteria = criteria;
279   xsrt->value_cnt = casefile_get_value_cnt (casereader_get_casefile (reader));
280   xsrt->run_cap = 512;
281   xsrt->run_cnt = 0;
282   xsrt->runs = xnmalloc (xsrt->run_cap, sizeof *xsrt->runs);
283   xsrt->factory = factory;
284   if (write_runs (xsrt, reader))
285     {
286       struct casefile *output = merge (xsrt);
287       destroy_external_sort (xsrt);
288       return output;
289     }
290   else
291     {
292       destroy_external_sort (xsrt);
293       return NULL;
294     }
295 }
296
297 /* Destroys XSRT. */
298 static void
299 destroy_external_sort (struct external_sort *xsrt) 
300 {
301   if (xsrt != NULL) 
302     {
303       int i;
304       
305       for (i = 0; i < xsrt->run_cnt; i++)
306         casefile_destroy (xsrt->runs[i]);
307       free (xsrt->runs);
308       free (xsrt);
309     }
310 }
311 \f
312 /* Replacement selection. */
313
314 /* Pairs a record with a run number. */
315 struct record_run
316   {
317     int run;                    /* Run number of case. */
318     struct ccase record;        /* Case data. */
319     size_t idx;                 /* Case number (for stability). */
320   };
321
322 /* Represents a set of initial runs during an external sort. */
323 struct initial_run_state 
324   {
325     struct external_sort *xsrt;
326
327     /* Reservoir. */
328     struct record_run *records; /* Records arranged as a heap. */
329     size_t record_cnt;          /* Current number of records. */
330     size_t record_cap;          /* Capacity for records. */
331     
332     /* Run currently being output. */
333     int run;                    /* Run number. */
334     size_t case_cnt;            /* Number of cases so far. */
335     struct casefile *casefile;  /* Output file. */
336     struct ccase last_output;   /* Record last output. */
337
338     int okay;                   /* Zero if an error has been encountered. */
339   };
340
341 static bool destroy_initial_run_state (struct initial_run_state *);
342 static void process_case (struct initial_run_state *, 
343                           const struct ccase *, size_t);
344 static int allocate_cases (struct initial_run_state *);
345 static void output_record (struct initial_run_state *);
346 static void start_run (struct initial_run_state *);
347 static void end_run (struct initial_run_state *);
348 static int compare_record_run (const struct record_run *,
349                                const struct record_run *,
350                                const struct initial_run_state *);
351 static int compare_record_run_minheap (const void *, const void *, 
352                                        const void *);
353
354 /* Reads cases from READER and composes initial runs in XSRT. */
355 static int
356 write_runs (struct external_sort *xsrt, struct casereader *reader)
357 {
358   struct initial_run_state *irs;
359   struct ccase c;
360   size_t idx = 0;
361   int success = 0;
362
363   /* Allocate memory for cases. */
364   irs = xmalloc (sizeof *irs);
365   irs->xsrt = xsrt;
366   irs->records = NULL;
367   irs->record_cnt = irs->record_cap = 0;
368   irs->run = 0;
369   irs->case_cnt = 0;
370   irs->casefile = NULL;
371   case_nullify (&irs->last_output);
372   irs->okay = 1;
373   if (!allocate_cases (irs)) 
374     goto done;
375
376   /* Create initial runs. */
377   start_run (irs);
378   for (; irs->okay && casereader_read (reader, &c); case_destroy (&c))
379     process_case (irs, &c, idx++);
380   while (irs->okay && irs->record_cnt > 0)
381     output_record (irs);
382   end_run (irs);
383
384   success = irs->okay;
385
386  done:
387   if (!destroy_initial_run_state (irs))
388     success = false;
389
390   return success;
391 }
392
393 /* Add a single case to an initial run. */
394 static void
395 process_case (struct initial_run_state *irs, const struct ccase *c, 
396               size_t idx)
397 {
398   struct record_run *rr;
399
400   /* Compose record_run for this run and add to heap. */
401   assert (irs->record_cnt < irs->record_cap - 1);
402   rr = irs->records + irs->record_cnt++;
403   case_copy (&rr->record, 0, c, 0, irs->xsrt->value_cnt);
404   rr->run = irs->run;
405   rr->idx = idx;
406   if (!case_is_null (&irs->last_output)
407       && compare_record (c, &irs->last_output, irs->xsrt->criteria) < 0)
408     rr->run = irs->run + 1;
409   push_heap (irs->records, irs->record_cnt, sizeof *irs->records,
410              compare_record_run_minheap, irs);
411
412   /* Output a record if the reservoir is full. */
413   if (irs->record_cnt == irs->record_cap - 1 && irs->okay)
414     output_record (irs);
415 }
416
417 /* Destroys the initial run state represented by IRS.
418    Returns true if successful, false if an I/O error occurred. */
419 static bool
420 destroy_initial_run_state (struct initial_run_state *irs) 
421 {
422   int i;
423   bool ok = true;
424
425   if (irs == NULL)
426     return true;
427
428   for (i = 0; i < irs->record_cap; i++)
429     case_destroy (&irs->records[i].record);
430   free (irs->records);
431
432   if (irs->casefile != NULL)
433     ok = casefile_sleep (irs->casefile);
434
435   free (irs);
436   return ok;
437 }
438
439 /* Allocates room for lots of cases as a buffer. */
440 static int
441 allocate_cases (struct initial_run_state *irs)
442 {
443   int approx_case_cost; /* Approximate memory cost of one case in bytes. */
444   int max_cases;        /* Maximum number of cases to allocate. */
445   int i;
446
447   /* Allocate as many cases as we can within the workspace
448      limit. */
449   approx_case_cost = (sizeof *irs->records
450                       + irs->xsrt->value_cnt * sizeof (union value)
451                       + 4 * sizeof (void *));
452   max_cases = get_workspace() / approx_case_cost;
453   if (max_cases > max_buffers)
454     max_cases = max_buffers;
455   irs->records = nmalloc (sizeof *irs->records, max_cases);
456   if (irs->records != NULL)
457     for (i = 0; i < max_cases; i++)
458       if (!case_try_create (&irs->records[i].record, irs->xsrt->value_cnt))
459         {
460           max_cases = i;
461           break;
462         }
463   irs->record_cap = max_cases;
464
465   /* Fail if we didn't allocate an acceptable number of cases. */
466   if (irs->records == NULL || max_cases < min_buffers)
467     {
468       msg (SE, _("Out of memory.  Could not allocate room for minimum of %d "
469                  "cases of %d bytes each.  (PSPP workspace is currently "
470                  "restricted to a maximum of %d KB.)"),
471            min_buffers, approx_case_cost, get_workspace() / 1024);
472       return 0;
473     }
474   return 1;
475 }
476
477 /* Compares the VAR_CNT variables in VARS[] between the `value's at
478    A and B, and returns a strcmp()-type result. */
479 static int
480 compare_record (const struct ccase *a, const struct ccase *b,
481                 const struct sort_criteria *criteria)
482 {
483   int i;
484
485   assert (a != NULL);
486   assert (b != NULL);
487   
488   for (i = 0; i < criteria->crit_cnt; i++)
489     {
490       const struct sort_criterion *c = &criteria->crits[i];
491       int result;
492       
493       if (c->width == 0)
494         {
495           double af = case_num_idx (a, c->fv);
496           double bf = case_num_idx (b, c->fv);
497           
498           result = af < bf ? -1 : af > bf;
499         }
500       else
501         result = memcmp (case_str_idx (a, c->fv),
502                          case_str_idx (b, c->fv), c->width);
503
504       if (result != 0)
505         return c->dir == SRT_ASCEND ? result : -result;
506     }
507
508   return 0;
509 }
510
511 /* Compares record-run tuples A and B on run number first, then
512    on record, then on case index. */
513 static int
514 compare_record_run (const struct record_run *a,
515                     const struct record_run *b,
516                     const struct initial_run_state *irs)
517 {
518   int result = a->run < b->run ? -1 : a->run > b->run;
519   if (result == 0)
520     result = compare_record (&a->record, &b->record, irs->xsrt->criteria);
521   if (result == 0)
522     result = a->idx < b->idx ? -1 : a->idx > b->idx;
523   return result;
524 }
525
526 /* Compares record-run tuples A and B on run number first, then
527    on the current record according to SCP, but in descending
528    order. */
529 static int
530 compare_record_run_minheap (const void *a, const void *b, const void *irs) 
531 {
532   return -compare_record_run (a, b, irs);
533 }
534
535 /* Begins a new initial run, specifically its output file. */
536 static void
537 start_run (struct initial_run_state *irs)
538 {
539   irs->run++;
540   irs->case_cnt = 0;
541
542   /* This casefile is internal to the sort, so don't use the factory
543      to create it. */
544   irs->casefile = fastfile_create (irs->xsrt->value_cnt);
545   casefile_to_disk (irs->casefile);
546   case_nullify (&irs->last_output); 
547 }
548
549 /* Ends the current initial run.  */
550 static void
551 end_run (struct initial_run_state *irs)
552 {
553   struct external_sort *xsrt = irs->xsrt;
554
555   /* Record initial run. */
556   if (irs->casefile != NULL) 
557     {
558       casefile_sleep (irs->casefile);
559       if (xsrt->run_cnt >= xsrt->run_cap) 
560         {
561           xsrt->run_cap *= 2;
562           xsrt->runs = xnrealloc (xsrt->runs,
563                                   xsrt->run_cap, sizeof *xsrt->runs);
564         }
565       xsrt->runs[xsrt->run_cnt++] = irs->casefile;
566       if (casefile_error (irs->casefile))
567         irs->okay = false;
568       irs->casefile = NULL; 
569     }
570 }
571
572 /* Writes a record to the current initial run. */
573 static void
574 output_record (struct initial_run_state *irs)
575 {
576   struct record_run *record_run;
577   struct ccase case_tmp;
578   
579   /* Extract minimum case from heap. */
580   assert (irs->record_cnt > 0);
581   pop_heap (irs->records, irs->record_cnt--, sizeof *irs->records,
582             compare_record_run_minheap, irs);
583   record_run = irs->records + irs->record_cnt;
584
585   /* Bail if an error has occurred. */
586   if (!irs->okay)
587     return;
588
589   /* Start new run if necessary. */
590   assert (record_run->run == irs->run
591           || record_run->run == irs->run + 1);
592   if (record_run->run != irs->run)
593     {
594       end_run (irs);
595       start_run (irs);
596     }
597   assert (record_run->run == irs->run);
598   irs->case_cnt++;
599
600   /* Write to disk. */
601   if (irs->casefile != NULL)
602     casefile_append (irs->casefile, &record_run->record);
603
604   /* This record becomes last_output. */
605   irs->last_output = case_tmp = record_run->record;
606   record_run->record = irs->records[irs->record_cap - 1].record;
607   irs->records[irs->record_cap - 1].record = case_tmp;
608 }
609 \f
610 /* Merging. */
611
612 static int choose_merge (struct casefile *runs[], int run_cnt, int order);
613 static struct casefile *merge_once (struct external_sort *,
614                                     struct casefile *[], size_t);
615
616 /* Repeatedly merges run until only one is left,
617    and returns the final casefile.
618    Returns a null pointer if an I/O error occurs. */
619 static struct casefile *
620 merge (struct external_sort *xsrt)
621 {
622   while (xsrt->run_cnt > 1)
623     {
624       int order = MIN (MAX_MERGE_ORDER, xsrt->run_cnt);
625       int idx = choose_merge (xsrt->runs, xsrt->run_cnt, order);
626       xsrt->runs[idx] = merge_once (xsrt, xsrt->runs + idx, order);
627       remove_range (xsrt->runs, xsrt->run_cnt, sizeof *xsrt->runs,
628                     idx + 1, order - 1);
629       xsrt->run_cnt -= order - 1;
630
631       if (xsrt->runs[idx] == NULL)
632         return NULL;
633     }
634   assert (xsrt->run_cnt == 1);
635   xsrt->run_cnt = 0;
636   return xsrt->runs[0];
637 }
638
639 /* Chooses ORDER runs out of the RUN_CNT runs in RUNS to merge,
640    and returns the index of the first one.
641
642    For stability, we must merge only consecutive runs.  For
643    efficiency, we choose the shortest consecutive sequence of
644    runs. */
645 static int
646 choose_merge (struct casefile *runs[], int run_cnt, int order) 
647 {
648   int min_idx, min_sum;
649   int cur_idx, cur_sum;
650   int i;
651
652   /* Sum up the length of the first ORDER runs. */
653   cur_sum = 0;
654   for (i = 0; i < order; i++)
655     cur_sum += casefile_get_case_cnt (runs[i]);
656
657   /* Find the shortest group of ORDER runs,
658      using a running total for efficiency. */
659   min_idx = 0;
660   min_sum = cur_sum;
661   for (cur_idx = 1; cur_idx + order <= run_cnt; cur_idx++)
662     {
663       cur_sum -= casefile_get_case_cnt (runs[cur_idx - 1]);
664       cur_sum += casefile_get_case_cnt (runs[cur_idx + order - 1]);
665       if (cur_sum < min_sum)
666         {
667           min_sum = cur_sum;
668           min_idx = cur_idx;
669         }
670     }
671
672   return min_idx;
673 }
674
675 /* Merges the RUN_CNT initial runs specified in INPUT_FILES into a
676    new run, and returns the new run.
677    Returns a null pointer if an I/O error occurs. */
678 static struct casefile *
679 merge_once (struct external_sort *xsrt,
680             struct casefile **const input_files,
681             size_t run_cnt)
682 {
683   struct run
684     {
685       struct casefile *file;
686       struct casereader *reader;
687       struct ccase ccase;
688     }
689   *runs;
690
691   struct casefile *output = NULL;
692   int i;
693
694   /* Open input files. */
695   runs = xnmalloc (run_cnt, sizeof *runs);
696   for (i = 0; i < run_cnt; i++) 
697     {
698       struct run *r = &runs[i];
699       r->file = input_files[i];
700       r->reader = casefile_get_destructive_reader (r->file);
701       if (!casereader_read_xfer (r->reader, &r->ccase))
702         {
703           run_cnt--;
704           i--;
705         }
706     }
707
708   /* Create output file. */
709   output = xsrt->factory->create_casefile (xsrt->factory, xsrt->value_cnt);
710   casefile_to_disk (output);
711
712   /* Merge. */
713   while (run_cnt > 0) 
714     {
715       struct run *min_run, *run;
716       
717       /* Find minimum. */
718       min_run = runs;
719       for (run = runs + 1; run < runs + run_cnt; run++)
720         if (compare_record (&run->ccase, &min_run->ccase, xsrt->criteria) < 0)
721           min_run = run;
722
723       /* Write minimum to output file. */
724       casefile_append_xfer (output, &min_run->ccase);
725
726       /* Read another case from minimum run. */
727       if (!casereader_read_xfer (min_run->reader, &min_run->ccase))
728         {
729           if (casefile_error (min_run->file) || casefile_error (output))
730             goto error;
731           casereader_destroy (min_run->reader);
732           casefile_destroy (min_run->file);
733
734           remove_element (runs, run_cnt, sizeof *runs, min_run - runs);
735           run_cnt--;
736         } 
737     }
738
739   if (!casefile_sleep (output))
740     goto error;
741   free (runs);
742
743   return output;
744
745  error:
746   for (i = 0; i < run_cnt; i++) 
747     casefile_destroy (runs[i].file);
748   casefile_destroy (output);
749   free (runs);
750   return NULL;
751 }