1 /* PSPP - computes sample statistics.
2 Copyright (C) 1997-9, 2000 Free Software Foundation, Inc.
3 Written by Ben Pfaff <blp@gnu.org>.
5 This program is free software; you can redistribute it and/or
6 modify it under the terms of the GNU General Public License as
7 published by the Free Software Foundation; either version 2 of the
8 License, or (at your option) any later version.
10 This program is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with this program; if not, write to the Free Software
17 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. */
27 #include "algorithm.h"
34 #include "expressions/public.h"
46 #define _(msgid) gettext (msgid)
48 #include "debug-print.h"
50 /* These should only be changed for testing purposes. */
/* min_buffers: allocate_cases() reports failure when it ends up with
   fewer record buffers than this.
   max_buffers: upper bound that allocate_cases() applies to the number
   of record buffers it tries to allocate. */
51 static int min_buffers = 64;
52 static int max_buffers = INT_MAX;
/* do_internal_sort() tests this flag before doing any other work.  The
   BUFFERS handling in the SORT CASES command sets it to false. */
53 static bool allow_internal_sort = true;
/* Forward declarations for file-local helpers that are defined later in
   this file: the criteria-driven case comparison and the two sorting
   strategies that sort_execute() calls. */
55 static int compare_record (const struct ccase *, const struct ccase *,
56 const struct sort_criteria *);
57 static struct casefile *do_internal_sort (struct casereader *,
58 const struct sort_criteria *);
59 static struct casefile *do_external_sort (struct casereader *,
60 const struct sort_criteria *);
62 /* Performs the SORT CASES procedures. */
/* Outline of the visible steps: parse the sort criteria against
   default_dict; when test_mode is set and a '/' follows, accept a
   BUFFERS=<integer> setting that pins min_buffers and max_buffers to
   that integer and turns off the internal sort; sort the active file in
   place; destroy the criteria.  Returns lex_end_of_command () when the
   sort succeeded and CMD_FAILURE otherwise.
   NOTE(review): the function's signature line and several statements
   are not visible in this copy, so this outline covers only what can
   be read below. */
66 struct sort_criteria *criteria;
71 criteria = sort_parse_criteria (default_dict, NULL, NULL, NULL, NULL);
75 if (test_mode && lex_match ('/'))
77 if (!lex_force_match_id ("BUFFERS") || !lex_match ('=')
81 min_buffers = max_buffers = lex_integer ();
82 allow_internal_sort = false;
85 msg (SE, _("Buffer limit must be at least 2."));
92 success = sort_active_file_in_place (criteria);
/* Put the testing overrides back to their initial values so that a
   BUFFERS setting does not leak into later sorts. */
96 max_buffers = INT_MAX;
97 allow_internal_sort = true;
99 sort_destroy_criteria (criteria);
100 return success ? lex_end_of_command () : CMD_FAILURE;
103 /* Gets ready to sort the active file, either in-place or to a
104 separate casefile. */
/* Takes no arguments.  Shared by sort_active_file_in_place() and
   sort_active_file_to_casefile(), both of which call it before fetching
   the casefile from vfm_source. */
106 prepare_to_sort_active_file (void)
108 /* Cancel temporary transformations and PROCESS IF. */
111 expr_free (process_if_expr);
112 process_if_expr = NULL;
114 /* Make sure source cases are in a storage source. */
/* NOTE(review): presumably running the procedure with two null
   arguments is what moves the cases into a storage source; confirm
   against procedure().  The assert below checks the outcome. */
115 procedure (NULL, NULL);
116 assert (case_source_is_class (vfm_source, &storage_source_class));
119 /* Sorts the active file in-place according to CRITERIA.
120 Returns nonzero if successful. */
/* The source casefile is consumed through a destructive reader, the old
   vfm_source is freed, and vfm_source is then replaced by a new storage
   source built around the sorted casefile.
   NOTE(review): what happens when sort_execute() yields a null pointer
   is on lines not visible in this copy. */
122 sort_active_file_in_place (const struct sort_criteria *criteria)
124 struct casefile *src, *dst;
126 prepare_to_sort_active_file ();
128 src = storage_source_get_casefile (vfm_source);
129 dst = sort_execute (casefile_get_destructive_reader (src), criteria);
130 free_case_source (vfm_source);
136 vfm_source = storage_source_create (dst);
140 /* Sorts the active file to a separate casefile. If successful,
141 returns the sorted casefile. Returns a null pointer on failure. */
/* CRITERIA is passed straight through to sort_execute(), whose result
   is returned unchanged. */
144 sort_active_file_to_casefile (const struct sort_criteria *criteria)
146 struct casefile *src;
148 prepare_to_sort_active_file ();
150 src = storage_source_get_casefile (vfm_source);
/* This path obtains its reader with casefile_get_reader(), whereas
   sort_active_file_in_place() uses casefile_get_destructive_reader();
   vfm_source is not reassigned here. */
151 return sort_execute (casefile_get_reader (src), criteria);
155 /* Reads all the cases from READER, which is destroyed. Sorts
156 the cases according to CRITERIA. Returns the sorted cases in
157 a newly created casefile. */
/* The internal sort is attempted first and the external sort is the
   fallback.  NOTE(review): the condition guarding the fallback is on a
   line not visible here; presumably it is a null result from
   do_internal_sort(), which documents a null return when it declines. */
159 sort_execute (struct casereader *reader, const struct sort_criteria *criteria)
161 struct casefile *output = do_internal_sort (reader, criteria);
163 output = do_external_sort (reader, criteria);
/* READER is destroyed here whichever strategy produced OUTPUT. */
164 casereader_destroy (reader);
168 /* A case and its index. */
/* do_internal_sort() allocates an array of these, one per case, and
   compare_indexed_cases() uses IDX to order two entries. */
171 struct ccase c; /* Case. */
172 unsigned long idx; /* Index to allow for stable sorting. */
/* Comparison callback that do_internal_sort() hands to sort(). */
175 static int compare_indexed_cases (const void *, const void *, void *);
177 /* If the data is in memory, do an internal sort and return a new
178 casefile for the data. Otherwise, return a null pointer. */
/* READER supplies the cases and identifies the source casefile;
   CRITERIA is the sort specification used by compare_indexed_cases.
   Visible steps: check allow_internal_sort, check that the source is
   in core, create DST with the source's value count, move every case
   from READER into a malloc'd array of struct indexed_case, sort the
   array, then move the cases into DST in sorted order.
   NOTE(review): the checks on the malloc() result and the conditions
   under which DST is destroyed are on lines not visible in this copy. */
179 static struct casefile *
180 do_internal_sort (struct casereader *reader,
181 const struct sort_criteria *criteria)
183 const struct casefile *src;
184 struct casefile *dst;
185 unsigned long case_cnt;
187 if (!allow_internal_sort)
190 src = casereader_get_casefile (reader);
/* A casefile with at most one case passes this test even when it is
   not in core. */
191 if (casefile_get_case_cnt (src) > 1 && !casefile_in_core (src))
194 case_cnt = casefile_get_case_cnt (src);
195 dst = casefile_create (casefile_get_value_cnt (src));
198 struct indexed_case *cases = malloc (sizeof *cases * case_cnt);
203 for (i = 0; i < case_cnt; i++)
205 casereader_read_xfer_assert (reader, &cases[i].c);
209 sort (cases, case_cnt, sizeof *cases, compare_indexed_cases,
212 for (i = 0; i < case_cnt; i++)
213 casefile_append_xfer (dst, &cases[i].c);
220 casefile_destroy (dst);
228 /* Compares the variables specified by CRITERIA between the cases
229 at A and B, with a "last resort" comparison for stability, and
230 returns a strcmp()-type result. */
/* A_ and B_ point to struct indexed_case objects and CRITERIA_ points
   to the struct sort_criteria; all three arrive as untyped pointers
   because this is the callback given to sort(). */
232 compare_indexed_cases (const void *a_, const void *b_, void *criteria_)
234 struct sort_criteria *criteria = criteria_;
235 const struct indexed_case *a = a_;
236 const struct indexed_case *b = b_;
237 int result = compare_record (&a->c, &b->c, criteria);
/* Last resort: three-way comparison of the stored indexes, giving -1,
   0, or 1. */
239 result = a->idx < b->idx ? -1 : a->idx > b->idx;
245 /* Maximum order of merge (external sort only). The maximum
246 reasonable value is about 7. Above that, it would be a good
247 idea to use a heap in merge_once() to select the minimum. */
/* merge() merges min (MAX_MERGE_ORDER, run_cnt) runs per pass. */
248 #define MAX_MERGE_ORDER 7
250 /* Results of an external sort. */
/* Lifecycle of RUNS as visible in this file: do_external_sort()
   allocates it, end_run() appends each finished run (growing the array
   with xrealloc() when run_cnt reaches run_cap), merge() replaces
   groups of runs by their merged run, and destroy_external_sort()
   destroys the first run_cnt entries. */
253 const struct sort_criteria *criteria; /* Sort criteria. */
254 size_t value_cnt; /* Size of data in `union value's. */
255 struct casefile **runs; /* Array of initial runs. */
256 size_t run_cnt, run_cap; /* Number of runs, allocated capacity. */
259 /* Prototypes for helper functions. */
/* do_external_sort() calls these in order: write_runs() to form the
   initial runs, merge() to combine them when write_runs() returns
   nonzero, and destroy_external_sort() to clean up. */
260 static int write_runs (struct external_sort *, struct casereader *);
261 static struct casefile *merge (struct external_sort *);
262 static void destroy_external_sort (struct external_sort *);
264 /* Performs a stable external sort of the active file according
265 to the specification in SCP. Forms initial runs using a heap
266 as a reservoir. Merges the initial runs according to a
267 pattern that assures stability. */
/* NOTE: "SCP" in the comment above corresponds to the CRITERIA
   parameter, and the cases sorted are those supplied by READER.
   Returns a casefile pointer; on the path where write_runs() returns
   nonzero that casefile comes from merge().  NOTE(review): the value
   returned on the other path is on a line not visible in this copy. */
268 static struct casefile *
269 do_external_sort (struct casereader *reader,
270 const struct sort_criteria *criteria)
272 struct external_sort *xsrt;
/* Send READER's casefile to disk before any sort state is allocated. */
274 casefile_to_disk (casereader_get_casefile (reader));
276 xsrt = xmalloc (sizeof *xsrt);
277 xsrt->criteria = criteria;
278 xsrt->value_cnt = casefile_get_value_cnt (casereader_get_casefile (reader));
281 xsrt->runs = xmalloc (sizeof *xsrt->runs * xsrt->run_cap);
282 if (write_runs (xsrt, reader))
284 struct casefile *output = merge (xsrt);
285 destroy_external_sort (xsrt);
/* XSRT is destroyed on this path as well. */
290 destroy_external_sort (xsrt);
/* Destroys each of the XSRT->run_cnt casefiles held in XSRT->runs.
   NOTE(review): any further cleanup of XSRT itself is on lines not
   visible in this copy. */
297 destroy_external_sort (struct external_sort *xsrt)
303 for (i = 0; i < xsrt->run_cnt; i++)
304 casefile_destroy (xsrt->runs[i]);
310 /* Replacement selection. */
312 /* Pairs a record with a run number. */
/* compare_record_run() orders these by RUN first, then by RECORD under
   the sort criteria, then by IDX. */
315 int run; /* Run number of case. */
316 struct ccase record; /* Case data. */
317 size_t idx; /* Case number (for stability). */
320 /* Represents a set of initial runs during an external sort. */
/* One element of RECORDS is always kept outside the heap:
   process_case() asserts record_cnt < record_cap - 1, and
   output_record() swaps the case it has just written into
   records[record_cap - 1].
   CASEFILE is a null pointer whenever no run is open: write_runs()
   starts it as NULL and end_run() resets it to NULL after handing the
   file to the external_sort. */
321 struct initial_run_state
323 struct external_sort *xsrt;
326 struct record_run *records; /* Records arranged as a heap. */
327 size_t record_cnt; /* Current number of records. */
328 size_t record_cap; /* Capacity for records. */
330 /* Run currently being output. */
331 int run; /* Run number. */
332 size_t case_cnt; /* Number of cases so far. */
333 struct casefile *casefile; /* Output file. */
334 struct ccase last_output; /* Record last output. */
336 int okay; /* Zero if an error has been encountered. */
/* Forward declarations for the run-formation code that follows.
   NOTE(review): sort_sink_class is declared here but is not referenced
   anywhere else in the visible part of this file. */
339 static const struct case_sink_class sort_sink_class;
341 static void destroy_initial_run_state (struct initial_run_state *);
342 static void process_case (struct initial_run_state *, const struct ccase *,
344 static int allocate_cases (struct initial_run_state *);
345 static void output_record (struct initial_run_state *);
346 static void start_run (struct initial_run_state *);
347 static void end_run (struct initial_run_state *);
348 static int compare_record_run (const struct record_run *,
349 const struct record_run *,
350 struct initial_run_state *);
351 static int compare_record_run_minheap (const void *, const void *, void *);
353 /* Reads cases from READER and composes initial runs in XSRT. */
/* Returns an int that do_external_sort() treats as true on success.
   The run state IRS is allocated here and destroyed before returning. */
355 write_runs (struct external_sort *xsrt, struct casereader *reader)
357 struct initial_run_state *irs;
362 /* Allocate memory for cases. */
363 irs = xmalloc (sizeof *irs);
366 irs->record_cnt = irs->record_cap = 0;
369 irs->casefile = NULL;
370 case_nullify (&irs->last_output);
372 if (!allocate_cases (irs))
375 /* Create initial runs. */
/* Each case read is passed to process_case() together with a running
   index, then destroyed by the loop's step expression.  The loop stops
   early once irs->okay becomes zero. */
377 for (; irs->okay && casereader_read (reader, &c); case_destroy (&c))
378 process_case (irs, &c, idx++);
/* Input exhausted: keep going until no records remain in the heap.
   NOTE(review): the loop body is not visible here; presumably it calls
   output_record(). */
379 while (irs->okay && irs->record_cnt > 0)
386 destroy_initial_run_state (irs);
391 /* Add a single case to an initial run. */
/* IRS is the run-formation state, C the incoming case (copied, not
   taken over), and IDX the case's position in the input, supplied by
   write_runs(). */
393 process_case (struct initial_run_state *irs, const struct ccase *c, size_t idx)
395 struct record_run *rr;
397 /* Compose record_run for this run and add to heap. */
/* The last slot of RECORDS is reserved, hence the "- 1". */
398 assert (irs->record_cnt < irs->record_cap - 1);
399 rr = irs->records + irs->record_cnt++;
400 case_copy (&rr->record, 0, c, 0, irs->xsrt->value_cnt);
/* A case that compares less than the record most recently output is
   assigned to the following run; the comparison is skipped while
   last_output is null. */
403 if (!case_is_null (&irs->last_output)
404 && compare_record (c, &irs->last_output, irs->xsrt->criteria) < 0)
405 rr->run = irs->run + 1;
406 push_heap (irs->records, irs->record_cnt, sizeof *irs->records,
407 compare_record_run_minheap, irs);
409 /* Output a record if the reservoir is full. */
410 if (irs->record_cnt == irs->record_cap - 1 && irs->okay)
414 /* Destroys the initial run state represented by IRS. */
/* Destroys the case in every one of the record_cap slots of RECORDS
   (not only the record_cnt slots currently in the heap), then calls
   casefile_sleep() on IRS->casefile if it is non-null.
   NOTE(review): freeing of RECORDS and of IRS itself is on lines not
   visible in this copy. */
416 destroy_initial_run_state (struct initial_run_state *irs)
423 for (i = 0; i < irs->record_cap; i++)
424 case_destroy (&irs->records[i].record);
427 if (irs->casefile != NULL)
428 casefile_sleep (irs->casefile);
433 /* Allocates room for lots of cases as a buffer. */
/* Sizes the buffer as get_max_workspace() divided by an approximate
   per-case cost, capped at max_buffers, stores the array in
   IRS->records and the resulting count in IRS->record_cap, and reports
   an error when the array is null or fewer than min_buffers cases
   were obtained.  write_runs() treats a zero return as failure.

   NOTE(review): the pointer returned by malloc() is compared against
   NULL only after the loop that passes the address of
   irs->records[i].record to case_try_create().  If malloc() fails,
   that loop works on addresses derived from a null pointer.  The NULL
   test should come before the loop.

   NOTE(review): approx_case_cost and max_cases are int, while the
   cost expression is built from sizeof values and value_cnt (both
   size_t) -- confirm the narrowing is safe for very wide cases. */
435 allocate_cases (struct initial_run_state *irs)
437 int approx_case_cost; /* Approximate memory cost of one case in bytes. */
438 int max_cases; /* Maximum number of cases to allocate. */
441 /* Allocate as many cases as we can within the workspace
443 approx_case_cost = (sizeof *irs->records
444 + irs->xsrt->value_cnt * sizeof (union value)
445 + 4 * sizeof (void *));
446 max_cases = get_max_workspace() / approx_case_cost;
447 if (max_cases > max_buffers)
448 max_cases = max_buffers;
449 irs->records = malloc (sizeof *irs->records * max_cases);
450 for (i = 0; i < max_cases; i++)
451 if (!case_try_create (&irs->records[i].record, irs->xsrt->value_cnt))
456 irs->record_cap = max_cases;
458 /* Fail if we didn't allocate an acceptable number of cases. */
459 if (irs->records == NULL || max_cases < min_buffers)
461 msg (SE, _("Out of memory. Could not allocate room for minimum of %d "
462 "cases of %d bytes each. (PSPP workspace is currently "
463 "restricted to a maximum of %d KB.)"),
464 min_buffers, approx_case_cost, get_max_workspace() / 1024);
470 /* Compares the VAR_CNT variables in VARS[] between the `value's at
471 A and B, and returns a strcmp()-type result. */
/* NOTE: there are no VAR_CNT or VARS[] parameters any more.  The
   function takes cases A and B plus CRITERIA and walks
   CRITERIA->crits[0] through crits[crit_cnt - 1] in order.
   Values read with case_num() are compared as doubles; values read
   with case_str() are compared with memcmp() over the criterion's
   width.  The result of a criterion is negated when its direction is
   anything other than SRT_ASCEND.
   NOTE(review): the test that selects between the numeric and string
   branches, and the test that decides when to return, are on lines
   not visible in this copy. */
473 compare_record (const struct ccase *a, const struct ccase *b,
474 const struct sort_criteria *criteria)
481 for (i = 0; i < criteria->crit_cnt; i++)
483 const struct sort_criterion *c = &criteria->crits[i];
488 double af = case_num (a, c->fv);
489 double bf = case_num (b, c->fv);
491 result = af < bf ? -1 : af > bf;
494 result = memcmp (case_str (a, c->fv), case_str (b, c->fv), c->width);
497 return c->dir == SRT_ASCEND ? result : -result;
503 /* Compares record-run tuples A and B on run number first, then
504 on record, then on case index. */
/* IRS is needed only for the sort criteria, reached through
   irs->xsrt->criteria.  The run-number and case-index stages each
   yield -1, 0, or 1.  NOTE(review): the conditions that chain one
   stage to the next are on lines not visible in this copy. */
506 compare_record_run (const struct record_run *a,
507 const struct record_run *b,
508 struct initial_run_state *irs)
510 int result = a->run < b->run ? -1 : a->run > b->run;
512 result = compare_record (&a->record, &b->record, irs->xsrt->criteria);
514 result = a->idx < b->idx ? -1 : a->idx > b->idx;
518 /* Compares record-run tuples A and B on run number first, then
519 on the current record according to the sort criteria, but in descending order. */
/* Comparison callback given to push_heap() and pop_heap() in this
   file.  A and B point to struct record_run objects and IRS to the
   struct initial_run_state; the result is compare_record_run()'s with
   the sign flipped, which is what lets output_record() extract the
   minimum record from the heap. */
522 compare_record_run_minheap (const void *a, const void *b, void *irs)
524 return -compare_record_run (a, b, irs);
527 /* Begins a new initial run, specifically its output file. */
/* Creates the run's casefile with the sort's value count, sends it to
   disk immediately, and nullifies last_output.  While last_output is
   null, process_case() skips its comparison against the previously
   output record. */
529 start_run (struct initial_run_state *irs)
533 irs->casefile = casefile_create (irs->xsrt->value_cnt);
534 casefile_to_disk (irs->casefile);
535 case_nullify (&irs->last_output);
538 /* Ends the current initial run. */
/* The visible steps are all guarded by the test for an open casefile.
   When one is open: put it to sleep, grow XSRT->runs with xrealloc()
   if run_cnt has reached run_cap, append the casefile to XSRT->runs,
   and reset IRS->casefile to NULL.  From then on the casefile is
   reachable only through XSRT->runs. */
540 end_run (struct initial_run_state *irs)
542 struct external_sort *xsrt = irs->xsrt;
544 /* Record initial run. */
545 if (irs->casefile != NULL)
547 casefile_sleep (irs->casefile);
548 if (xsrt->run_cnt >= xsrt->run_cap)
551 xsrt->runs = xrealloc (xsrt->runs,
552 sizeof *xsrt->runs * xsrt->run_cap);
554 xsrt->runs[xsrt->run_cnt++] = irs->casefile;
555 irs->casefile = NULL;
559 /* Writes a record to the current initial run. */
/* Pops the minimum record off the heap in IRS, switches to the next
   run when that record belongs to it, appends the record's case to the
   run's casefile, and remembers the case as last_output. */
561 output_record (struct initial_run_state *irs)
563 struct record_run *record_run;
564 struct ccase case_tmp;
566 /* Extract minimum case from heap. */
567 assert (irs->record_cnt > 0);
568 pop_heap (irs->records, irs->record_cnt--, sizeof *irs->records,
569 compare_record_run_minheap, irs);
/* record_cnt has already been decremented, so this addresses the
   element just past the shrunken heap, which is where the extracted
   record is taken from. */
570 record_run = irs->records + irs->record_cnt;
572 /* Bail if an error has occurred. */
576 /* Start new run if necessary. */
/* A record can only belong to the current run or to the next one. */
577 assert (record_run->run == irs->run
578 || record_run->run == irs->run + 1);
579 if (record_run->run != irs->run)
584 assert (record_run->run == irs->run);
588 if (irs->casefile != NULL)
589 casefile_append (irs->casefile, &record_run->record);
591 /* This record becomes last_output. */
/* last_output receives a plain struct assignment of the case, and the
   case is then swapped with the one in the reserved final slot
   records[record_cap - 1]. */
592 irs->last_output = case_tmp = record_run->record;
593 record_run->record = irs->records[irs->record_cap - 1].record;
594 irs->records[irs->record_cap - 1].record = case_tmp;
/* Forward declarations for the two helpers that merge() calls on every
   pass: one picks which consecutive runs to merge, the other merges
   them into a single run. */
599 static int choose_merge (struct casefile *runs[], int run_cnt, int order);
600 static struct casefile *merge_once (struct external_sort *,
601 struct casefile *[], size_t);
603 /* Repeatedly merges run until only one is left,
604 and returns the final casefile. */
/* Each pass merges ORDER consecutive runs, where ORDER is the smaller
   of MAX_MERGE_ORDER and the number of runs left.  choose_merge()
   picks the starting index, the merged run is stored in that slot, and
   run_cnt shrinks by ORDER - 1.
   NOTE(review): when run_cnt is 0 on entry the loop is skipped and the
   assert below fails; with asserts compiled out runs[0] would be read
   regardless.  Confirm that write_runs() always leaves at least one
   run, including for empty input. */
605 static struct casefile *
606 merge (struct external_sort *xsrt)
608 while (xsrt->run_cnt > 1)
610 int order = min (MAX_MERGE_ORDER, xsrt->run_cnt);
611 int idx = choose_merge (xsrt->runs, xsrt->run_cnt, order);
612 xsrt->runs[idx] = merge_once (xsrt, xsrt->runs + idx, order);
613 remove_range (xsrt->runs, xsrt->run_cnt, sizeof *xsrt->runs,
615 xsrt->run_cnt -= order - 1;
617 assert (xsrt->run_cnt == 1);
619 return xsrt->runs[0];
622 /* Chooses ORDER runs out of the RUN_CNT runs in RUNS to merge,
623 and returns the index of the first one.
625 For stability, we must merge only consecutive runs. For
626 efficiency, we choose the shortest consecutive sequence of runs. */
/* "Shortest" is measured as the total of casefile_get_case_cnt() over
   the ORDER runs in a window.
   NOTE(review): the totals are kept in int, while do_internal_sort()
   stores the same kind of count in an unsigned long -- confirm that
   very large runs cannot overflow these sums. */
629 choose_merge (struct casefile *runs[], int run_cnt, int order)
631 int min_idx, min_sum;
632 int cur_idx, cur_sum;
635 /* Sum up the length of the first ORDER runs. */
637 for (i = 0; i < order; i++)
638 cur_sum += casefile_get_case_cnt (runs[i]);
640 /* Find the shortest group of ORDER runs,
641 using a running total for efficiency. */
/* Slide the window one run to the right per iteration: subtract the
   run that leaves on the left, add the run that enters on the right.
   The strict "<" means an earlier window is kept when totals tie. */
644 for (cur_idx = 1; cur_idx + order <= run_cnt; cur_idx++)
646 cur_sum -= casefile_get_case_cnt (runs[cur_idx - 1]);
647 cur_sum += casefile_get_case_cnt (runs[cur_idx + order - 1]);
648 if (cur_sum < min_sum)
658 /* Merges the RUN_CNT initial runs specified in INPUT_FILES into a
659 new run, and returns the new run. */
660 static struct casefile *
661 merge_once (struct external_sort *xsrt,
662 struct casefile **const input_files,
667 struct casefile *file;
668 struct casereader *reader;
673 struct casefile *output = NULL;
676 /* Open input files. */
677 runs = xmalloc (sizeof *runs * run_cnt);
678 for (i = 0; i < run_cnt; i++)
680 struct run *r = &runs[i];
681 r->file = input_files[i];
682 r->reader = casefile_get_destructive_reader (r->file);
683 if (!casereader_read_xfer (r->reader, &r->ccase))
690 /* Create output file. */
691 output = casefile_create (xsrt->value_cnt);
692 casefile_to_disk (output);
697 struct run *min_run, *run;
701 for (run = runs + 1; run < runs + run_cnt; run++)
702 if (compare_record (&run->ccase, &min_run->ccase, xsrt->criteria) < 0)
705 /* Write minimum to output file. */
706 casefile_append_xfer (output, &min_run->ccase);
708 /* Read another case from minimum run. */
709 if (!casereader_read_xfer (min_run->reader, &min_run->ccase))
711 casereader_destroy (min_run->reader);
712 casefile_destroy (min_run->file);
714 remove_element (runs, run_cnt, sizeof *runs, min_run - runs);
719 casefile_sleep (output);