1 /* PSPP - computes sample statistics.
2 Copyright (C) 1997-9, 2000 Free Software Foundation, Inc.
3 Written by Ben Pfaff <blp@gnu.org>.
5 This program is free software; you can redistribute it and/or
6 modify it under the terms of the GNU General Public License as
7 published by the Free Software Foundation; either version 2 of the
8 License, or (at your option) any later version.
10 This program is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with this program; if not, write to the Free Software
17 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
27 #include "algorithm.h"
34 #include "expressions/public.h"
45 #include "debug-print.h"
47 /* These should only be changed for testing purposes. */
48 static int min_buffers = 64;
49 static int max_buffers = INT_MAX;
50 static bool allow_internal_sort = true;
52 static int compare_record (const struct ccase *, const struct ccase *,
53 const struct sort_criteria *);
54 static struct casefile *do_internal_sort (struct casereader *,
55 const struct sort_criteria *);
56 static struct casefile *do_external_sort (struct casereader *,
57 const struct sort_criteria *);
59 /* Performs the SORT CASES procedures. */
63 struct sort_criteria *criteria;
68 criteria = sort_parse_criteria (default_dict, NULL, NULL, NULL, NULL);
72 if (test_mode && lex_match ('/'))
74 if (!lex_force_match_id ("BUFFERS") || !lex_match ('=')
78 min_buffers = max_buffers = lex_integer ();
79 allow_internal_sort = false;
82 msg (SE, _("Buffer limit must be at least 2."));
89 success = sort_active_file_in_place (criteria);
93 max_buffers = INT_MAX;
94 allow_internal_sort = true;
96 sort_destroy_criteria (criteria);
97 return success ? lex_end_of_command () : CMD_FAILURE;
100 /* Gets ready to sort the active file, either in-place or to a
101 separate casefile. */
103 prepare_to_sort_active_file (void)
105 /* Cancel temporary transformations and PROCESS IF. */
108 expr_free (process_if_expr);
109 process_if_expr = NULL;
111 /* Make sure source cases are in a storage source. */
112 procedure (NULL, NULL);
113 assert (case_source_is_class (vfm_source, &storage_source_class));
116 /* Sorts the active file in-place according to CRITERIA.
117 Returns nonzero if successful. */
119 sort_active_file_in_place (const struct sort_criteria *criteria)
121 struct casefile *src, *dst;
123 prepare_to_sort_active_file ();
125 src = storage_source_get_casefile (vfm_source);
126 dst = sort_execute (casefile_get_destructive_reader (src), criteria);
127 free_case_source (vfm_source);
133 vfm_source = storage_source_create (dst);
137 /* Sorts the active file to a separate casefile. If successful,
138 returns the sorted casefile. Returns a null pointer on
141 sort_active_file_to_casefile (const struct sort_criteria *criteria)
143 struct casefile *src;
145 prepare_to_sort_active_file ();
147 src = storage_source_get_casefile (vfm_source);
148 return sort_execute (casefile_get_reader (src), criteria);
152 /* Reads all the cases from READER, which is destroyed. Sorts
153 the cases according to CRITERIA. Returns the sorted cases in
154 a newly created casefile. */
156 sort_execute (struct casereader *reader, const struct sort_criteria *criteria)
158 struct casefile *output = do_internal_sort (reader, criteria);
160 output = do_external_sort (reader, criteria);
161 casereader_destroy (reader);
165 /* A case and its index. */
168 struct ccase c; /* Case. */
169 unsigned long idx; /* Index to allow for stable sorting. */
172 static int compare_indexed_cases (const void *, const void *, void *);
174 /* If the data is in memory, do an internal sort and return a new
175 casefile for the data. Otherwise, return a null pointer. */
176 static struct casefile *
177 do_internal_sort (struct casereader *reader,
178 const struct sort_criteria *criteria)
180 const struct casefile *src;
181 struct casefile *dst;
182 unsigned long case_cnt;
184 if (!allow_internal_sort)
187 src = casereader_get_casefile (reader);
188 if (casefile_get_case_cnt (src) > 1 && !casefile_in_core (src))
191 case_cnt = casefile_get_case_cnt (src);
192 dst = casefile_create (casefile_get_value_cnt (src));
195 struct indexed_case *cases = malloc (sizeof *cases * case_cnt);
200 for (i = 0; i < case_cnt; i++)
202 casereader_read_xfer_assert (reader, &cases[i].c);
206 sort (cases, case_cnt, sizeof *cases, compare_indexed_cases,
209 for (i = 0; i < case_cnt; i++)
210 casefile_append_xfer (dst, &cases[i].c);
217 casefile_destroy (dst);
225 /* Compares the variables specified by CRITERIA between the cases
226 at A and B, with a "last resort" comparison for stability, and
227 returns a strcmp()-type result. */
229 compare_indexed_cases (const void *a_, const void *b_, void *criteria_)
231 struct sort_criteria *criteria = criteria_;
232 const struct indexed_case *a = a_;
233 const struct indexed_case *b = b_;
234 int result = compare_record (&a->c, &b->c, criteria);
236 result = a->idx < b->idx ? -1 : a->idx > b->idx;
242 /* Maximum order of merge (external sort only). The maximum
243 reasonable value is about 7. Above that, it would be a good
244 idea to use a heap in merge_once() to select the minimum. */
245 #define MAX_MERGE_ORDER 7
247 /* Results of an external sort. */
250 const struct sort_criteria *criteria; /* Sort criteria. */
251 size_t value_cnt; /* Size of data in `union value's. */
252 struct casefile **runs; /* Array of initial runs. */
253 size_t run_cnt, run_cap; /* Number of runs, allocated capacity. */
256 /* Prototypes for helper functions. */
257 static int write_runs (struct external_sort *, struct casereader *);
258 static struct casefile *merge (struct external_sort *);
259 static void destroy_external_sort (struct external_sort *);
261 /* Performs a stable external sort of the active file according
262 to the specification in SCP. Forms initial runs using a heap
263 as a reservoir. Merges the initial runs according to a
264 pattern that assures stability. */
265 static struct casefile *
266 do_external_sort (struct casereader *reader,
267 const struct sort_criteria *criteria)
269 struct external_sort *xsrt;
271 casefile_to_disk (casereader_get_casefile (reader));
273 xsrt = xmalloc (sizeof *xsrt);
274 xsrt->criteria = criteria;
275 xsrt->value_cnt = casefile_get_value_cnt (casereader_get_casefile (reader));
278 xsrt->runs = xmalloc (sizeof *xsrt->runs * xsrt->run_cap);
279 if (write_runs (xsrt, reader))
281 struct casefile *output = merge (xsrt);
282 destroy_external_sort (xsrt);
287 destroy_external_sort (xsrt);
294 destroy_external_sort (struct external_sort *xsrt)
300 for (i = 0; i < xsrt->run_cnt; i++)
301 casefile_destroy (xsrt->runs[i]);
307 /* Replacement selection. */
309 /* Pairs a record with a run number. */
312 int run; /* Run number of case. */
313 struct ccase record; /* Case data. */
314 size_t idx; /* Case number (for stability). */
317 /* Represents a set of initial runs during an external sort. */
318 struct initial_run_state
320 struct external_sort *xsrt;
323 struct record_run *records; /* Records arranged as a heap. */
324 size_t record_cnt; /* Current number of records. */
325 size_t record_cap; /* Capacity for records. */
327 /* Run currently being output. */
328 int run; /* Run number. */
329 size_t case_cnt; /* Number of cases so far. */
330 struct casefile *casefile; /* Output file. */
331 struct ccase last_output; /* Record last output. */
333 int okay; /* Zero if an error has been encountered. */
336 static const struct case_sink_class sort_sink_class;
338 static void destroy_initial_run_state (struct initial_run_state *);
339 static void process_case (struct initial_run_state *, const struct ccase *,
341 static int allocate_cases (struct initial_run_state *);
342 static void output_record (struct initial_run_state *);
343 static void start_run (struct initial_run_state *);
344 static void end_run (struct initial_run_state *);
345 static int compare_record_run (const struct record_run *,
346 const struct record_run *,
347 struct initial_run_state *);
348 static int compare_record_run_minheap (const void *, const void *, void *);
350 /* Reads cases from READER and composes initial runs in XSRT. */
352 write_runs (struct external_sort *xsrt, struct casereader *reader)
354 struct initial_run_state *irs;
359 /* Allocate memory for cases. */
360 irs = xmalloc (sizeof *irs);
363 irs->record_cnt = irs->record_cap = 0;
366 irs->casefile = NULL;
367 case_nullify (&irs->last_output);
369 if (!allocate_cases (irs))
372 /* Create initial runs. */
374 for (; irs->okay && casereader_read (reader, &c); case_destroy (&c))
375 process_case (irs, &c, idx++);
376 while (irs->okay && irs->record_cnt > 0)
383 destroy_initial_run_state (irs);
388 /* Add a single case to an initial run. */
390 process_case (struct initial_run_state *irs, const struct ccase *c, size_t idx)
392 struct record_run *rr;
394 /* Compose record_run for this run and add to heap. */
395 assert (irs->record_cnt < irs->record_cap - 1);
396 rr = irs->records + irs->record_cnt++;
397 case_copy (&rr->record, 0, c, 0, irs->xsrt->value_cnt);
400 if (!case_is_null (&irs->last_output)
401 && compare_record (c, &irs->last_output, irs->xsrt->criteria) < 0)
402 rr->run = irs->run + 1;
403 push_heap (irs->records, irs->record_cnt, sizeof *irs->records,
404 compare_record_run_minheap, irs);
406 /* Output a record if the reservoir is full. */
407 if (irs->record_cnt == irs->record_cap - 1 && irs->okay)
411 /* Destroys the initial run state represented by IRS. */
413 destroy_initial_run_state (struct initial_run_state *irs)
420 for (i = 0; i < irs->record_cap; i++)
421 case_destroy (&irs->records[i].record);
424 if (irs->casefile != NULL)
425 casefile_sleep (irs->casefile);
430 /* Allocates room for lots of cases as a buffer. */
432 allocate_cases (struct initial_run_state *irs)
434 int approx_case_cost; /* Approximate memory cost of one case in bytes. */
435 int max_cases; /* Maximum number of cases to allocate. */
438 /* Allocate as many cases as we can within the workspace
440 approx_case_cost = (sizeof *irs->records
441 + irs->xsrt->value_cnt * sizeof (union value)
442 + 4 * sizeof (void *));
443 max_cases = get_max_workspace() / approx_case_cost;
444 if (max_cases > max_buffers)
445 max_cases = max_buffers;
446 irs->records = malloc (sizeof *irs->records * max_cases);
447 for (i = 0; i < max_cases; i++)
448 if (!case_try_create (&irs->records[i].record, irs->xsrt->value_cnt))
453 irs->record_cap = max_cases;
455 /* Fail if we didn't allocate an acceptable number of cases. */
456 if (irs->records == NULL || max_cases < min_buffers)
458 msg (SE, _("Out of memory. Could not allocate room for minimum of %d "
459 "cases of %d bytes each. (PSPP workspace is currently "
460 "restricted to a maximum of %d KB.)"),
461 min_buffers, approx_case_cost, get_max_workspace() / 1024);
467 /* Compares the VAR_CNT variables in VARS[] between the `value's at
468 A and B, and returns a strcmp()-type result. */
470 compare_record (const struct ccase *a, const struct ccase *b,
471 const struct sort_criteria *criteria)
478 for (i = 0; i < criteria->crit_cnt; i++)
480 const struct sort_criterion *c = &criteria->crits[i];
485 double af = case_num (a, c->fv);
486 double bf = case_num (b, c->fv);
488 result = af < bf ? -1 : af > bf;
491 result = memcmp (case_str (a, c->fv), case_str (b, c->fv), c->width);
494 return c->dir == SRT_ASCEND ? result : -result;
500 /* Compares record-run tuples A and B on run number first, then
501 on record, then on case index. */
503 compare_record_run (const struct record_run *a,
504 const struct record_run *b,
505 struct initial_run_state *irs)
507 int result = a->run < b->run ? -1 : a->run > b->run;
509 result = compare_record (&a->record, &b->record, irs->xsrt->criteria);
511 result = a->idx < b->idx ? -1 : a->idx > b->idx;
515 /* Compares record-run tuples A and B on run number first, then
516 on the current record according to SCP, but in descending
519 compare_record_run_minheap (const void *a, const void *b, void *irs)
521 return -compare_record_run (a, b, irs);
524 /* Begins a new initial run, specifically its output file. */
526 start_run (struct initial_run_state *irs)
530 irs->casefile = casefile_create (irs->xsrt->value_cnt);
531 casefile_to_disk (irs->casefile);
532 case_nullify (&irs->last_output);
535 /* Ends the current initial run. */
537 end_run (struct initial_run_state *irs)
539 struct external_sort *xsrt = irs->xsrt;
541 /* Record initial run. */
542 if (irs->casefile != NULL)
544 casefile_sleep (irs->casefile);
545 if (xsrt->run_cnt >= xsrt->run_cap)
548 xsrt->runs = xrealloc (xsrt->runs,
549 sizeof *xsrt->runs * xsrt->run_cap);
551 xsrt->runs[xsrt->run_cnt++] = irs->casefile;
552 irs->casefile = NULL;
556 /* Writes a record to the current initial run. */
558 output_record (struct initial_run_state *irs)
560 struct record_run *record_run;
561 struct ccase case_tmp;
563 /* Extract minimum case from heap. */
564 assert (irs->record_cnt > 0);
565 pop_heap (irs->records, irs->record_cnt--, sizeof *irs->records,
566 compare_record_run_minheap, irs);
567 record_run = irs->records + irs->record_cnt;
569 /* Bail if an error has occurred. */
573 /* Start new run if necessary. */
574 assert (record_run->run == irs->run
575 || record_run->run == irs->run + 1);
576 if (record_run->run != irs->run)
581 assert (record_run->run == irs->run);
585 if (irs->casefile != NULL)
586 casefile_append (irs->casefile, &record_run->record);
588 /* This record becomes last_output. */
589 irs->last_output = case_tmp = record_run->record;
590 record_run->record = irs->records[irs->record_cap - 1].record;
591 irs->records[irs->record_cap - 1].record = case_tmp;
596 static int choose_merge (struct casefile *runs[], int run_cnt, int order);
597 static struct casefile *merge_once (struct external_sort *,
598 struct casefile *[], size_t);
600 /* Repeatedly merges run until only one is left,
601 and returns the final casefile. */
602 static struct casefile *
603 merge (struct external_sort *xsrt)
605 while (xsrt->run_cnt > 1)
607 int order = min (MAX_MERGE_ORDER, xsrt->run_cnt);
608 int idx = choose_merge (xsrt->runs, xsrt->run_cnt, order);
609 xsrt->runs[idx] = merge_once (xsrt, xsrt->runs + idx, order);
610 remove_range (xsrt->runs, xsrt->run_cnt, sizeof *xsrt->runs,
612 xsrt->run_cnt -= order - 1;
614 assert (xsrt->run_cnt == 1);
616 return xsrt->runs[0];
619 /* Chooses ORDER runs out of the RUN_CNT runs in RUNS to merge,
620 and returns the index of the first one.
622 For stability, we must merge only consecutive runs. For
623 efficiency, we choose the shortest consecutive sequence of
626 choose_merge (struct casefile *runs[], int run_cnt, int order)
628 int min_idx, min_sum;
629 int cur_idx, cur_sum;
632 /* Sum up the length of the first ORDER runs. */
634 for (i = 0; i < order; i++)
635 cur_sum += casefile_get_case_cnt (runs[i]);
637 /* Find the shortest group of ORDER runs,
638 using a running total for efficiency. */
641 for (cur_idx = 1; cur_idx + order <= run_cnt; cur_idx++)
643 cur_sum -= casefile_get_case_cnt (runs[cur_idx - 1]);
644 cur_sum += casefile_get_case_cnt (runs[cur_idx + order - 1]);
645 if (cur_sum < min_sum)
655 /* Merges the RUN_CNT initial runs specified in INPUT_FILES into a
656 new run, and returns the new run. */
657 static struct casefile *
658 merge_once (struct external_sort *xsrt,
659 struct casefile **const input_files,
664 struct casefile *file;
665 struct casereader *reader;
670 struct casefile *output = NULL;
673 /* Open input files. */
674 runs = xmalloc (sizeof *runs * run_cnt);
675 for (i = 0; i < run_cnt; i++)
677 struct run *r = &runs[i];
678 r->file = input_files[i];
679 r->reader = casefile_get_destructive_reader (r->file);
680 if (!casereader_read_xfer (r->reader, &r->ccase))
687 /* Create output file. */
688 output = casefile_create (xsrt->value_cnt);
689 casefile_to_disk (output);
694 struct run *min_run, *run;
698 for (run = runs + 1; run < runs + run_cnt; run++)
699 if (compare_record (&run->ccase, &min_run->ccase, xsrt->criteria) < 0)
702 /* Write minimum to output file. */
703 casefile_append_xfer (output, &min_run->ccase);
705 /* Read another case from minimum run. */
706 if (!casereader_read_xfer (min_run->reader, &min_run->ccase))
708 casereader_destroy (min_run->reader);
709 casefile_destroy (min_run->file);
711 remove_element (runs, run_cnt, sizeof *runs, min_run - runs);
716 casefile_sleep (output);