Fixed blank replacement
[pspp-builds.git] / src / sort.c
1 /* PSPP - computes sample statistics.
2    Copyright (C) 1997-9, 2000 Free Software Foundation, Inc.
3    Written by Ben Pfaff <blp@gnu.org>.
4
5    This program is free software; you can redistribute it and/or
6    modify it under the terms of the GNU General Public License as
7    published by the Free Software Foundation; either version 2 of the
8    License, or (at your option) any later version.
9
10    This program is distributed in the hope that it will be useful, but
11    WITHOUT ANY WARRANTY; without even the implied warranty of
12    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13    General Public License for more details.
14
15    You should have received a copy of the GNU General Public License
16    along with this program; if not, write to the Free Software
17    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
18    02110-1301, USA. */
19
20 #include <config.h>
21 #include "sort.h"
22 #include "error.h"
23 #include <limits.h>
24 #include <stdio.h>
25 #include <stdlib.h>
26 #include <errno.h>
27 #include "algorithm.h"
28 #include "alloc.h"
29 #include <stdbool.h>
30 #include "case.h"
31 #include "casefile.h"
32 #include "command.h"
33 #include "error.h"
34 #include "expressions/public.h"
35 #include "glob.h"
36 #include "lexer.h"
37 #include "misc.h"
38 #include "settings.h"
39 #include "sort-prs.h"
40 #include "str.h"
41 #include "var.h"
42 #include "vfm.h"
43 #include "vfmP.h"
44
45 #include "gettext.h"
46 #define _(msgid) gettext (msgid)
47
48 #include "debug-print.h"
49
50 /* These should only be changed for testing purposes. */
51 static int min_buffers = 64;
52 static int max_buffers = INT_MAX;
53 static bool allow_internal_sort = true;
54
55 static int compare_record (const struct ccase *, const struct ccase *,
56                            const struct sort_criteria *);
57 static struct casefile *do_internal_sort (struct casereader *,
58                                           const struct sort_criteria *);
59 static struct casefile *do_external_sort (struct casereader *,
60                                           const struct sort_criteria *);
61
62 /* Performs the SORT CASES procedures. */
63 int
64 cmd_sort_cases (void)
65 {
66   struct sort_criteria *criteria;
67   bool success = false;
68
69   lex_match (T_BY);
70
71   criteria = sort_parse_criteria (default_dict, NULL, NULL, NULL, NULL);
72   if (criteria == NULL)
73     return CMD_FAILURE;
74
75   if (test_mode && lex_match ('/')) 
76     {
77       if (!lex_force_match_id ("BUFFERS") || !lex_match ('=')
78           || !lex_force_int ())
79         goto done;
80
81       min_buffers = max_buffers = lex_integer ();
82       allow_internal_sort = false;
83       if (max_buffers < 2) 
84         {
85           msg (SE, _("Buffer limit must be at least 2."));
86           goto done;
87         }
88
89       lex_get ();
90     }
91
92   success = sort_active_file_in_place (criteria);
93
94  done:
95   min_buffers = 64;
96   max_buffers = INT_MAX;
97   allow_internal_sort = true;
98   
99   sort_destroy_criteria (criteria);
100   return success ? lex_end_of_command () : CMD_FAILURE;
101 }
102
103 /* Gets ready to sort the active file, either in-place or to a
104    separate casefile. */
105 static void
106 prepare_to_sort_active_file (void) 
107 {
108   /* Cancel temporary transformations and PROCESS IF. */
109   if (temporary != 0)
110     cancel_temporary (); 
111   expr_free (process_if_expr);
112   process_if_expr = NULL;
113
114   /* Make sure source cases are in a storage source. */
115   procedure (NULL, NULL);
116   assert (case_source_is_class (vfm_source, &storage_source_class));
117 }
118
119 /* Sorts the active file in-place according to CRITERIA.
120    Returns nonzero if successful. */
121 int
122 sort_active_file_in_place (const struct sort_criteria *criteria) 
123 {
124   struct casefile *src, *dst;
125   
126   prepare_to_sort_active_file ();
127
128   src = storage_source_get_casefile (vfm_source);
129   dst = sort_execute (casefile_get_destructive_reader (src), criteria);
130   free_case_source (vfm_source);
131   vfm_source = NULL;
132
133   if (dst == NULL) 
134     return 0;
135
136   vfm_source = storage_source_create (dst);
137   return 1;
138 }
139
140 /* Sorts the active file to a separate casefile.  If successful,
141    returns the sorted casefile.  Returns a null pointer on
142    failure. */
143 struct casefile *
144 sort_active_file_to_casefile (const struct sort_criteria *criteria) 
145 {
146   struct casefile *src;
147   
148   prepare_to_sort_active_file ();
149
150   src = storage_source_get_casefile (vfm_source);
151   return sort_execute (casefile_get_reader (src), criteria);
152 }
153
154
155 /* Reads all the cases from READER, which is destroyed.  Sorts
156    the cases according to CRITERIA.  Returns the sorted cases in
157    a newly created casefile. */
158 struct casefile *
159 sort_execute (struct casereader *reader, const struct sort_criteria *criteria)
160 {
161   struct casefile *output = do_internal_sort (reader, criteria);
162   if (output == NULL)
163     output = do_external_sort (reader, criteria);
164   casereader_destroy (reader);
165   return output;
166 }
167 \f
168 /* A case and its index. */
169 struct indexed_case 
170   {
171     struct ccase c;     /* Case. */
172     unsigned long idx;  /* Index to allow for stable sorting. */
173   };
174
175 static int compare_indexed_cases (const void *, const void *, void *);
176
177 /* If the data is in memory, do an internal sort and return a new
178    casefile for the data.  Otherwise, return a null pointer. */
179 static struct casefile *
180 do_internal_sort (struct casereader *reader,
181                   const struct sort_criteria *criteria)
182 {
183   const struct casefile *src;
184   struct casefile *dst;
185   unsigned long case_cnt;
186
187   if (!allow_internal_sort)
188     return NULL;
189
190   src = casereader_get_casefile (reader);
191   if (casefile_get_case_cnt (src) > 1 && !casefile_in_core (src))
192     return NULL;
193       
194   case_cnt = casefile_get_case_cnt (src);
195   dst = casefile_create (casefile_get_value_cnt (src));
196   if (case_cnt != 0) 
197     {
198       struct indexed_case *cases = nmalloc (sizeof *cases, case_cnt);
199       if (cases != NULL) 
200         {
201           unsigned long i;
202           
203           for (i = 0; i < case_cnt; i++)
204             {
205               casereader_read_xfer_assert (reader, &cases[i].c);
206               cases[i].idx = i;
207             }
208
209           sort (cases, case_cnt, sizeof *cases, compare_indexed_cases,
210                 (void *) criteria);
211       
212           for (i = 0; i < case_cnt; i++)
213             casefile_append_xfer (dst, &cases[i].c);
214
215           free (cases);
216         }
217       else 
218         {
219           /* Failure. */
220           casefile_destroy (dst);
221           dst = NULL;
222         }
223     }
224
225   return dst;
226 }
227
228 /* Compares the variables specified by CRITERIA between the cases
229    at A and B, with a "last resort" comparison for stability, and
230    returns a strcmp()-type result. */
231 static int
232 compare_indexed_cases (const void *a_, const void *b_, void *criteria_)
233 {
234   struct sort_criteria *criteria = criteria_;
235   const struct indexed_case *a = a_;
236   const struct indexed_case *b = b_;
237   int result = compare_record (&a->c, &b->c, criteria);
238   if (result == 0)
239     result = a->idx < b->idx ? -1 : a->idx > b->idx;
240   return result;
241 }
242 \f
243 /* External sort. */
244
245 /* Maximum order of merge (external sort only).  The maximum
246    reasonable value is about 7.  Above that, it would be a good
247    idea to use a heap in merge_once() to select the minimum. */
248 #define MAX_MERGE_ORDER 7
249
250 /* Results of an external sort. */
251 struct external_sort 
252   {
253     const struct sort_criteria *criteria; /* Sort criteria. */
254     size_t value_cnt;                 /* Size of data in `union value's. */
255     struct casefile **runs;           /* Array of initial runs. */
256     size_t run_cnt, run_cap;          /* Number of runs, allocated capacity. */
257   };
258
259 /* Prototypes for helper functions. */
260 static int write_runs (struct external_sort *, struct casereader *);
261 static struct casefile *merge (struct external_sort *);
262 static void destroy_external_sort (struct external_sort *);
263
264 /* Performs a stable external sort of the active file according
265    to the specification in SCP.  Forms initial runs using a heap
266    as a reservoir.  Merges the initial runs according to a
267    pattern that assures stability. */
268 static struct casefile *
269 do_external_sort (struct casereader *reader,
270                   const struct sort_criteria *criteria)
271 {
272   struct external_sort *xsrt;
273
274   casefile_to_disk (casereader_get_casefile (reader));
275
276   xsrt = xmalloc (sizeof *xsrt);
277   xsrt->criteria = criteria;
278   xsrt->value_cnt = casefile_get_value_cnt (casereader_get_casefile (reader));
279   xsrt->run_cap = 512;
280   xsrt->run_cnt = 0;
281   xsrt->runs = xnmalloc (xsrt->run_cap, sizeof *xsrt->runs);
282   if (write_runs (xsrt, reader))
283     {
284       struct casefile *output = merge (xsrt);
285       destroy_external_sort (xsrt);
286       return output;
287     }
288   else
289     {
290       destroy_external_sort (xsrt);
291       return NULL;
292     }
293 }
294
295 /* Destroys XSRT. */
296 static void
297 destroy_external_sort (struct external_sort *xsrt) 
298 {
299   if (xsrt != NULL) 
300     {
301       int i;
302       
303       for (i = 0; i < xsrt->run_cnt; i++)
304         casefile_destroy (xsrt->runs[i]);
305       free (xsrt->runs);
306       free (xsrt);
307     }
308 }
309 \f
310 /* Replacement selection. */
311
312 /* Pairs a record with a run number. */
313 struct record_run
314   {
315     int run;                    /* Run number of case. */
316     struct ccase record;        /* Case data. */
317     size_t idx;                 /* Case number (for stability). */
318   };
319
320 /* Represents a set of initial runs during an external sort. */
321 struct initial_run_state 
322   {
323     struct external_sort *xsrt;
324
325     /* Reservoir. */
326     struct record_run *records; /* Records arranged as a heap. */
327     size_t record_cnt;          /* Current number of records. */
328     size_t record_cap;          /* Capacity for records. */
329     
330     /* Run currently being output. */
331     int run;                    /* Run number. */
332     size_t case_cnt;            /* Number of cases so far. */
333     struct casefile *casefile;  /* Output file. */
334     struct ccase last_output;   /* Record last output. */
335
336     int okay;                   /* Zero if an error has been encountered. */
337   };
338
339 static const struct case_sink_class sort_sink_class;
340
341 static void destroy_initial_run_state (struct initial_run_state *);
342 static void process_case (struct initial_run_state *, const struct ccase *,
343                           size_t);
344 static int allocate_cases (struct initial_run_state *);
345 static void output_record (struct initial_run_state *);
346 static void start_run (struct initial_run_state *);
347 static void end_run (struct initial_run_state *);
348 static int compare_record_run (const struct record_run *,
349                                const struct record_run *,
350                                struct initial_run_state *);
351 static int compare_record_run_minheap (const void *, const void *, void *);
352
353 /* Reads cases from READER and composes initial runs in XSRT. */
354 static int
355 write_runs (struct external_sort *xsrt, struct casereader *reader)
356 {
357   struct initial_run_state *irs;
358   struct ccase c;
359   size_t idx = 0;
360   int success = 0;
361
362   /* Allocate memory for cases. */
363   irs = xmalloc (sizeof *irs);
364   irs->xsrt = xsrt;
365   irs->records = NULL;
366   irs->record_cnt = irs->record_cap = 0;
367   irs->run = 0;
368   irs->case_cnt = 0;
369   irs->casefile = NULL;
370   case_nullify (&irs->last_output);
371   irs->okay = 1;
372   if (!allocate_cases (irs)) 
373     goto done;
374
375   /* Create initial runs. */
376   start_run (irs);
377   for (; irs->okay && casereader_read (reader, &c); case_destroy (&c))
378     process_case (irs, &c, idx++);
379   while (irs->okay && irs->record_cnt > 0)
380     output_record (irs);
381   end_run (irs);
382
383   success = irs->okay;
384
385  done:
386   destroy_initial_run_state (irs);
387
388   return success;
389 }
390
391 /* Add a single case to an initial run. */
392 static void
393 process_case (struct initial_run_state *irs, const struct ccase *c, size_t idx)
394 {
395   struct record_run *rr;
396
397   /* Compose record_run for this run and add to heap. */
398   assert (irs->record_cnt < irs->record_cap - 1);
399   rr = irs->records + irs->record_cnt++;
400   case_copy (&rr->record, 0, c, 0, irs->xsrt->value_cnt);
401   rr->run = irs->run;
402   rr->idx = idx;
403   if (!case_is_null (&irs->last_output)
404       && compare_record (c, &irs->last_output, irs->xsrt->criteria) < 0)
405     rr->run = irs->run + 1;
406   push_heap (irs->records, irs->record_cnt, sizeof *irs->records,
407              compare_record_run_minheap, irs);
408
409   /* Output a record if the reservoir is full. */
410   if (irs->record_cnt == irs->record_cap - 1 && irs->okay)
411     output_record (irs);
412 }
413
414 /* Destroys the initial run state represented by IRS. */
415 static void
416 destroy_initial_run_state (struct initial_run_state *irs) 
417 {
418   int i;
419
420   if (irs == NULL)
421     return;
422
423   for (i = 0; i < irs->record_cap; i++)
424     case_destroy (&irs->records[i].record);
425   free (irs->records);
426
427   if (irs->casefile != NULL)
428     casefile_sleep (irs->casefile);
429
430   free (irs);
431 }
432
433 /* Allocates room for lots of cases as a buffer. */
434 static int
435 allocate_cases (struct initial_run_state *irs)
436 {
437   int approx_case_cost; /* Approximate memory cost of one case in bytes. */
438   int max_cases;        /* Maximum number of cases to allocate. */
439   int i;
440
441   /* Allocate as many cases as we can within the workspace
442      limit. */
443   approx_case_cost = (sizeof *irs->records
444                       + irs->xsrt->value_cnt * sizeof (union value)
445                       + 4 * sizeof (void *));
446   max_cases = get_max_workspace() / approx_case_cost;
447   if (max_cases > max_buffers)
448     max_cases = max_buffers;
449   irs->records = nmalloc (sizeof *irs->records, max_cases);
450   if (irs->records != NULL)
451     for (i = 0; i < max_cases; i++)
452       if (!case_try_create (&irs->records[i].record, irs->xsrt->value_cnt))
453         {
454           max_cases = i;
455           break;
456         }
457   irs->record_cap = max_cases;
458
459   /* Fail if we didn't allocate an acceptable number of cases. */
460   if (irs->records == NULL || max_cases < min_buffers)
461     {
462       msg (SE, _("Out of memory.  Could not allocate room for minimum of %d "
463                  "cases of %d bytes each.  (PSPP workspace is currently "
464                  "restricted to a maximum of %d KB.)"),
465            min_buffers, approx_case_cost, get_max_workspace() / 1024);
466       return 0;
467     }
468   return 1;
469 }
470
471 /* Compares the VAR_CNT variables in VARS[] between the `value's at
472    A and B, and returns a strcmp()-type result. */
473 static int
474 compare_record (const struct ccase *a, const struct ccase *b,
475                 const struct sort_criteria *criteria)
476 {
477   int i;
478
479   assert (a != NULL);
480   assert (b != NULL);
481   
482   for (i = 0; i < criteria->crit_cnt; i++)
483     {
484       const struct sort_criterion *c = &criteria->crits[i];
485       int result;
486       
487       if (c->width == 0)
488         {
489           double af = case_num (a, c->fv);
490           double bf = case_num (b, c->fv);
491           
492           result = af < bf ? -1 : af > bf;
493         }
494       else
495         result = memcmp (case_str (a, c->fv), case_str (b, c->fv), c->width);
496
497       if (result != 0)
498         return c->dir == SRT_ASCEND ? result : -result;
499     }
500
501   return 0;
502 }
503
504 /* Compares record-run tuples A and B on run number first, then
505    on record, then on case index. */
506 static int
507 compare_record_run (const struct record_run *a,
508                     const struct record_run *b,
509                     struct initial_run_state *irs)
510 {
511   int result = a->run < b->run ? -1 : a->run > b->run;
512   if (result == 0)
513     result = compare_record (&a->record, &b->record, irs->xsrt->criteria);
514   if (result == 0)
515     result = a->idx < b->idx ? -1 : a->idx > b->idx;
516   return result;
517 }
518
519 /* Compares record-run tuples A and B on run number first, then
520    on the current record according to SCP, but in descending
521    order. */
522 static int
523 compare_record_run_minheap (const void *a, const void *b, void *irs) 
524 {
525   return -compare_record_run (a, b, irs);
526 }
527
528 /* Begins a new initial run, specifically its output file. */
529 static void
530 start_run (struct initial_run_state *irs)
531 {
532   irs->run++;
533   irs->case_cnt = 0;
534   irs->casefile = casefile_create (irs->xsrt->value_cnt);
535   casefile_to_disk (irs->casefile);
536   case_nullify (&irs->last_output); 
537 }
538
539 /* Ends the current initial run.  */
540 static void
541 end_run (struct initial_run_state *irs)
542 {
543   struct external_sort *xsrt = irs->xsrt;
544
545   /* Record initial run. */
546   if (irs->casefile != NULL) 
547     {
548       casefile_sleep (irs->casefile);
549       if (xsrt->run_cnt >= xsrt->run_cap) 
550         {
551           xsrt->run_cap *= 2;
552           xsrt->runs = xnrealloc (xsrt->runs,
553                                   xsrt->run_cap, sizeof *xsrt->runs);
554         }
555       xsrt->runs[xsrt->run_cnt++] = irs->casefile;
556       irs->casefile = NULL; 
557     }
558 }
559
560 /* Writes a record to the current initial run. */
561 static void
562 output_record (struct initial_run_state *irs)
563 {
564   struct record_run *record_run;
565   struct ccase case_tmp;
566   
567   /* Extract minimum case from heap. */
568   assert (irs->record_cnt > 0);
569   pop_heap (irs->records, irs->record_cnt--, sizeof *irs->records,
570             compare_record_run_minheap, irs);
571   record_run = irs->records + irs->record_cnt;
572
573   /* Bail if an error has occurred. */
574   if (!irs->okay)
575     return;
576
577   /* Start new run if necessary. */
578   assert (record_run->run == irs->run
579           || record_run->run == irs->run + 1);
580   if (record_run->run != irs->run)
581     {
582       end_run (irs);
583       start_run (irs);
584     }
585   assert (record_run->run == irs->run);
586   irs->case_cnt++;
587
588   /* Write to disk. */
589   if (irs->casefile != NULL)
590     casefile_append (irs->casefile, &record_run->record);
591
592   /* This record becomes last_output. */
593   irs->last_output = case_tmp = record_run->record;
594   record_run->record = irs->records[irs->record_cap - 1].record;
595   irs->records[irs->record_cap - 1].record = case_tmp;
596 }
597 \f
598 /* Merging. */
599
600 static int choose_merge (struct casefile *runs[], int run_cnt, int order);
601 static struct casefile *merge_once (struct external_sort *,
602                                     struct casefile *[], size_t);
603
604 /* Repeatedly merges run until only one is left,
605    and returns the final casefile.  */
606 static struct casefile *
607 merge (struct external_sort *xsrt)
608 {
609   while (xsrt->run_cnt > 1)
610     {
611       int order = min (MAX_MERGE_ORDER, xsrt->run_cnt);
612       int idx = choose_merge (xsrt->runs, xsrt->run_cnt, order);
613       xsrt->runs[idx] = merge_once (xsrt, xsrt->runs + idx, order);
614       remove_range (xsrt->runs, xsrt->run_cnt, sizeof *xsrt->runs,
615                     idx + 1, order - 1);
616       xsrt->run_cnt -= order - 1;
617     }
618   assert (xsrt->run_cnt == 1);
619   xsrt->run_cnt = 0;
620   return xsrt->runs[0];
621 }
622
623 /* Chooses ORDER runs out of the RUN_CNT runs in RUNS to merge,
624    and returns the index of the first one.
625
626    For stability, we must merge only consecutive runs.  For
627    efficiency, we choose the shortest consecutive sequence of
628    runs. */
629 static int
630 choose_merge (struct casefile *runs[], int run_cnt, int order) 
631 {
632   int min_idx, min_sum;
633   int cur_idx, cur_sum;
634   int i;
635
636   /* Sum up the length of the first ORDER runs. */
637   cur_sum = 0;
638   for (i = 0; i < order; i++)
639     cur_sum += casefile_get_case_cnt (runs[i]);
640
641   /* Find the shortest group of ORDER runs,
642      using a running total for efficiency. */
643   min_idx = 0;
644   min_sum = cur_sum;
645   for (cur_idx = 1; cur_idx + order <= run_cnt; cur_idx++)
646     {
647       cur_sum -= casefile_get_case_cnt (runs[cur_idx - 1]);
648       cur_sum += casefile_get_case_cnt (runs[cur_idx + order - 1]);
649       if (cur_sum < min_sum)
650         {
651           min_sum = cur_sum;
652           min_idx = cur_idx;
653         }
654     }
655
656   return min_idx;
657 }
658
659 /* Merges the RUN_CNT initial runs specified in INPUT_FILES into a
660    new run, and returns the new run. */
661 static struct casefile *
662 merge_once (struct external_sort *xsrt,
663             struct casefile **const input_files,
664             size_t run_cnt)
665 {
666   struct run
667     {
668       struct casefile *file;
669       struct casereader *reader;
670       struct ccase ccase;
671     }
672   *runs;
673
674   struct casefile *output = NULL;
675   int i;
676
677   /* Open input files. */
678   runs = xnmalloc (run_cnt, sizeof *runs);
679   for (i = 0; i < run_cnt; i++) 
680     {
681       struct run *r = &runs[i];
682       r->file = input_files[i];
683       r->reader = casefile_get_destructive_reader (r->file);
684       if (!casereader_read_xfer (r->reader, &r->ccase))
685         {
686           run_cnt--;
687           i--;
688         }
689     }
690
691   /* Create output file. */
692   output = casefile_create (xsrt->value_cnt);
693   casefile_to_disk (output);
694
695   /* Merge. */
696   while (run_cnt > 0) 
697     {
698       struct run *min_run, *run;
699       
700       /* Find minimum. */
701       min_run = runs;
702       for (run = runs + 1; run < runs + run_cnt; run++)
703         if (compare_record (&run->ccase, &min_run->ccase, xsrt->criteria) < 0)
704           min_run = run;
705
706       /* Write minimum to output file. */
707       casefile_append_xfer (output, &min_run->ccase);
708
709       /* Read another case from minimum run. */
710       if (!casereader_read_xfer (min_run->reader, &min_run->ccase))
711         {
712           casereader_destroy (min_run->reader);
713           casefile_destroy (min_run->file);
714
715           remove_element (runs, run_cnt, sizeof *runs, min_run - runs);
716           run_cnt--;
717         } 
718     }
719
720   casefile_sleep (output);
721   free (runs);
722
723   return output;
724 }