1 /* PSPP - computes sample statistics.
2 Copyright (C) 1997-9, 2000 Free Software Foundation, Inc.
3 Written by Ben Pfaff <blp@gnu.org>.
5 This program is free software; you can redistribute it and/or
6 modify it under the terms of the GNU General Public License as
7 published by the Free Software Foundation; either version 2 of the
8 License, or (at your option) any later version.
10 This program is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with this program; if not, write to the Free Software
17 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
27 #include "algorithm.h"
34 #include "expressions/public.h"
44 #include "debug-print.h"
49 SRT_ASCEND, /* A, B, C, ..., X, Y, Z. */
50 SRT_DESCEND /* Z, Y, X, ..., C, B, A. */
53 /* A sort criterion. */
56 int fv; /* Variable data index. */
57 int width; /* 0=numeric, otherwise string widthe. */
58 enum sort_direction dir; /* Sort direction. */
61 /* A set of sort criteria. */
64 struct sort_criterion *crits;
68 /* These should only be changed for testing purposes. */
69 static int min_buffers = 64;
70 static int max_buffers = INT_MAX;
71 static bool allow_internal_sort = true;
73 static int compare_record (const struct ccase *, const struct ccase *,
74 const struct sort_criteria *);
75 static struct casefile *do_internal_sort (struct casereader *,
76 const struct sort_criteria *);
77 static struct casefile *do_external_sort (struct casereader *,
78 const struct sort_criteria *);
80 /* Performs the SORT CASES procedures. */
84 struct sort_criteria *criteria;
89 criteria = sort_parse_criteria (default_dict, NULL, NULL, NULL);
93 if (test_mode && lex_match ('/'))
95 if (!lex_force_match_id ("BUFFERS") || !lex_match ('=')
99 min_buffers = max_buffers = lex_integer ();
100 allow_internal_sort = false;
103 msg (SE, _("Buffer limit must be at least 2."));
110 success = sort_active_file_in_place (criteria);
114 max_buffers = INT_MAX;
115 allow_internal_sort = true;
117 sort_destroy_criteria (criteria);
118 return success ? lex_end_of_command () : CMD_FAILURE;
121 /* Gets ready to sort the active file, either in-place or to a
122 separate casefile. */
124 prepare_to_sort_active_file (void)
126 /* Cancel temporary transformations and PROCESS IF. */
129 expr_free (process_if_expr);
130 process_if_expr = NULL;
132 /* Make sure source cases are in a storage source. */
133 procedure (NULL, NULL);
134 assert (case_source_is_class (vfm_source, &storage_source_class));
137 /* Sorts the active file in-place according to CRITERIA.
138 Returns nonzero if successful. */
140 sort_active_file_in_place (const struct sort_criteria *criteria)
142 struct casefile *src, *dst;
144 prepare_to_sort_active_file ();
146 src = storage_source_get_casefile (vfm_source);
147 dst = sort_execute (casefile_get_destructive_reader (src), criteria);
148 free_case_source (vfm_source);
154 vfm_source = storage_source_create (dst);
158 /* Sorts the active file to a separate casefile. If successful,
159 returns the sorted casefile. Returns a null pointer on
162 sort_active_file_to_casefile (const struct sort_criteria *criteria)
164 struct casefile *src;
166 prepare_to_sort_active_file ();
168 src = storage_source_get_casefile (vfm_source);
169 return sort_execute (casefile_get_reader (src), criteria);
172 /* Parses a list of sort keys and returns a struct sort_criteria
173 based on it. Returns a null pointer on error.
174 If SAW_DIRECTION is nonnull, sets *SAW_DIRECTION to true if at
175 least one parenthesized sort direction was specified, false
177 struct sort_criteria *
178 sort_parse_criteria (const struct dictionary *dict,
179 struct variable ***vars, int *var_cnt,
182 struct sort_criteria *criteria;
183 struct variable **local_vars = NULL;
184 size_t local_var_cnt;
186 assert ((vars == NULL) == (var_cnt == NULL));
190 var_cnt = &local_var_cnt;
193 criteria = xmalloc (sizeof *criteria);
194 criteria->crits = NULL;
195 criteria->crit_cnt = 0;
199 if (saw_direction != NULL)
200 *saw_direction = false;
204 int prev_var_cnt = *var_cnt;
205 enum sort_direction direction;
208 if (!parse_variables (dict, vars, var_cnt,
209 PV_NO_DUPLICATE | PV_APPEND | PV_NO_SCRATCH))
212 /* Sort direction. */
215 if (lex_match_id ("D") || lex_match_id ("DOWN"))
216 direction = SRT_DESCEND;
217 else if (lex_match_id ("A") || lex_match_id ("UP"))
218 direction = SRT_ASCEND;
221 msg (SE, _("`A' or `D' expected inside parentheses."));
224 if (!lex_match (')'))
226 msg (SE, _("`)' expected."));
229 if (saw_direction != NULL)
230 *saw_direction = true;
233 direction = SRT_ASCEND;
235 criteria->crits = xrealloc (criteria->crits,
236 sizeof *criteria->crits * *var_cnt);
237 criteria->crit_cnt = *var_cnt;
238 for (; prev_var_cnt < criteria->crit_cnt; prev_var_cnt++)
240 struct sort_criterion *c = &criteria->crits[prev_var_cnt];
241 c->fv = (*vars)[prev_var_cnt]->fv;
242 c->width = (*vars)[prev_var_cnt]->width;
246 while (token != '.' && token != '/');
253 sort_destroy_criteria (criteria);
257 /* Destroys a SORT CASES program. */
259 sort_destroy_criteria (struct sort_criteria *criteria)
261 if (criteria != NULL)
263 free (criteria->crits);
268 /* Reads all the cases from READER, which is destroyed. Sorts
269 the cases according to CRITERIA. Returns the sorted cases in
270 a newly created casefile. */
272 sort_execute (struct casereader *reader, const struct sort_criteria *criteria)
274 struct casefile *output = do_internal_sort (reader, criteria);
276 output = do_external_sort (reader, criteria);
277 casereader_destroy (reader);
281 /* A case and its index. */
284 struct ccase c; /* Case. */
285 unsigned long idx; /* Index to allow for stable sorting. */
288 static int compare_indexed_cases (const void *, const void *, void *);
290 /* If the data is in memory, do an internal sort and return a new
291 casefile for the data. Otherwise, return a null pointer. */
292 static struct casefile *
293 do_internal_sort (struct casereader *reader,
294 const struct sort_criteria *criteria)
296 const struct casefile *src;
297 struct casefile *dst;
298 unsigned long case_cnt;
300 if (!allow_internal_sort)
303 src = casereader_get_casefile (reader);
304 if (casefile_get_case_cnt (src) > 1 && !casefile_in_core (src))
307 case_cnt = casefile_get_case_cnt (src);
308 dst = casefile_create (casefile_get_value_cnt (src));
311 struct indexed_case *cases = malloc (sizeof *cases * case_cnt);
316 for (i = 0; i < case_cnt; i++)
318 casereader_read_xfer_assert (reader, &cases[i].c);
322 sort (cases, case_cnt, sizeof *cases, compare_indexed_cases,
325 for (i = 0; i < case_cnt; i++)
326 casefile_append_xfer (dst, &cases[i].c);
333 casefile_destroy (dst);
341 /* Compares the variables specified by CRITERIA between the cases
342 at A and B, with a "last resort" comparison for stability, and
343 returns a strcmp()-type result. */
345 compare_indexed_cases (const void *a_, const void *b_, void *criteria_)
347 struct sort_criteria *criteria = criteria_;
348 const struct indexed_case *a = a_;
349 const struct indexed_case *b = b_;
350 int result = compare_record (&a->c, &b->c, criteria);
352 result = a->idx < b->idx ? -1 : a->idx > b->idx;
358 /* Maximum order of merge (external sort only). The maximum
359 reasonable value is about 7. Above that, it would be a good
360 idea to use a heap in merge_once() to select the minimum. */
361 #define MAX_MERGE_ORDER 7
363 /* Results of an external sort. */
366 const struct sort_criteria *criteria; /* Sort criteria. */
367 size_t value_cnt; /* Size of data in `union value's. */
368 struct casefile **runs; /* Array of initial runs. */
369 size_t run_cnt, run_cap; /* Number of runs, allocated capacity. */
372 /* Prototypes for helper functions. */
373 static int write_runs (struct external_sort *, struct casereader *);
374 static struct casefile *merge (struct external_sort *);
375 static void destroy_external_sort (struct external_sort *);
377 /* Performs a stable external sort of the active file according
378 to the specification in SCP. Forms initial runs using a heap
379 as a reservoir. Merges the initial runs according to a
380 pattern that assures stability. */
381 static struct casefile *
382 do_external_sort (struct casereader *reader,
383 const struct sort_criteria *criteria)
385 struct external_sort *xsrt;
387 casefile_to_disk (casereader_get_casefile (reader));
389 xsrt = xmalloc (sizeof *xsrt);
390 xsrt->criteria = criteria;
391 xsrt->value_cnt = casefile_get_value_cnt (casereader_get_casefile (reader));
394 xsrt->runs = xmalloc (sizeof *xsrt->runs * xsrt->run_cap);
395 if (write_runs (xsrt, reader))
397 struct casefile *output = merge (xsrt);
398 destroy_external_sort (xsrt);
403 destroy_external_sort (xsrt);
410 destroy_external_sort (struct external_sort *xsrt)
416 for (i = 0; i < xsrt->run_cnt; i++)
417 casefile_destroy (xsrt->runs[i]);
423 /* Replacement selection. */
425 /* Pairs a record with a run number. */
428 int run; /* Run number of case. */
429 struct ccase record; /* Case data. */
430 size_t idx; /* Case number (for stability). */
433 /* Represents a set of initial runs during an external sort. */
434 struct initial_run_state
436 struct external_sort *xsrt;
439 struct record_run *records; /* Records arranged as a heap. */
440 size_t record_cnt; /* Current number of records. */
441 size_t record_cap; /* Capacity for records. */
443 /* Run currently being output. */
444 int run; /* Run number. */
445 size_t case_cnt; /* Number of cases so far. */
446 struct casefile *casefile; /* Output file. */
447 struct ccase last_output; /* Record last output. */
449 int okay; /* Zero if an error has been encountered. */
452 static const struct case_sink_class sort_sink_class;
454 static void destroy_initial_run_state (struct initial_run_state *);
455 static void process_case (struct initial_run_state *, const struct ccase *,
457 static int allocate_cases (struct initial_run_state *);
458 static void output_record (struct initial_run_state *);
459 static void start_run (struct initial_run_state *);
460 static void end_run (struct initial_run_state *);
461 static int compare_record_run (const struct record_run *,
462 const struct record_run *,
463 struct initial_run_state *);
464 static int compare_record_run_minheap (const void *, const void *, void *);
466 /* Reads cases from READER and composes initial runs in XSRT. */
468 write_runs (struct external_sort *xsrt, struct casereader *reader)
470 struct initial_run_state *irs;
475 /* Allocate memory for cases. */
476 irs = xmalloc (sizeof *irs);
479 irs->record_cnt = irs->record_cap = 0;
482 irs->casefile = NULL;
483 case_nullify (&irs->last_output);
485 if (!allocate_cases (irs))
488 /* Create initial runs. */
490 for (; irs->okay && casereader_read (reader, &c); case_destroy (&c))
491 process_case (irs, &c, idx++);
492 while (irs->okay && irs->record_cnt > 0)
499 destroy_initial_run_state (irs);
504 /* Add a single case to an initial run. */
506 process_case (struct initial_run_state *irs, const struct ccase *c, size_t idx)
508 struct record_run *rr;
510 /* Compose record_run for this run and add to heap. */
511 assert (irs->record_cnt < irs->record_cap - 1);
512 rr = irs->records + irs->record_cnt++;
513 case_copy (&rr->record, 0, c, 0, irs->xsrt->value_cnt);
516 if (!case_is_null (&irs->last_output)
517 && compare_record (c, &irs->last_output, irs->xsrt->criteria) < 0)
518 rr->run = irs->run + 1;
519 push_heap (irs->records, irs->record_cnt, sizeof *irs->records,
520 compare_record_run_minheap, irs);
522 /* Output a record if the reservoir is full. */
523 if (irs->record_cnt == irs->record_cap - 1 && irs->okay)
527 /* Destroys the initial run state represented by IRS. */
529 destroy_initial_run_state (struct initial_run_state *irs)
536 for (i = 0; i < irs->record_cap; i++)
537 case_destroy (&irs->records[i].record);
540 if (irs->casefile != NULL)
541 casefile_sleep (irs->casefile);
546 /* Allocates room for lots of cases as a buffer. */
548 allocate_cases (struct initial_run_state *irs)
550 int approx_case_cost; /* Approximate memory cost of one case in bytes. */
551 int max_cases; /* Maximum number of cases to allocate. */
554 /* Allocate as many cases as we can within the workspace
556 approx_case_cost = (sizeof *irs->records
557 + irs->xsrt->value_cnt * sizeof (union value)
558 + 4 * sizeof (void *));
559 max_cases = get_max_workspace() / approx_case_cost;
560 if (max_cases > max_buffers)
561 max_cases = max_buffers;
562 irs->records = malloc (sizeof *irs->records * max_cases);
563 for (i = 0; i < max_cases; i++)
564 if (!case_try_create (&irs->records[i].record, irs->xsrt->value_cnt))
569 irs->record_cap = max_cases;
571 /* Fail if we didn't allocate an acceptable number of cases. */
572 if (irs->records == NULL || max_cases < min_buffers)
574 msg (SE, _("Out of memory. Could not allocate room for minimum of %d "
575 "cases of %d bytes each. (PSPP workspace is currently "
576 "restricted to a maximum of %d KB.)"),
577 min_buffers, approx_case_cost, get_max_workspace() / 1024);
583 /* Compares the VAR_CNT variables in VARS[] between the `value's at
584 A and B, and returns a strcmp()-type result. */
586 compare_record (const struct ccase *a, const struct ccase *b,
587 const struct sort_criteria *criteria)
594 for (i = 0; i < criteria->crit_cnt; i++)
596 const struct sort_criterion *c = &criteria->crits[i];
601 double af = case_num (a, c->fv);
602 double bf = case_num (b, c->fv);
604 result = af < bf ? -1 : af > bf;
607 result = memcmp (case_str (a, c->fv), case_str (b, c->fv), c->width);
610 return c->dir == SRT_ASCEND ? result : -result;
616 /* Compares record-run tuples A and B on run number first, then
617 on record, then on case index. */
619 compare_record_run (const struct record_run *a,
620 const struct record_run *b,
621 struct initial_run_state *irs)
623 int result = a->run < b->run ? -1 : a->run > b->run;
625 result = compare_record (&a->record, &b->record, irs->xsrt->criteria);
627 result = a->idx < b->idx ? -1 : a->idx > b->idx;
631 /* Compares record-run tuples A and B on run number first, then
632 on the current record according to SCP, but in descending
635 compare_record_run_minheap (const void *a, const void *b, void *irs)
637 return -compare_record_run (a, b, irs);
640 /* Begins a new initial run, specifically its output file. */
642 start_run (struct initial_run_state *irs)
646 irs->casefile = casefile_create (irs->xsrt->value_cnt);
647 casefile_to_disk (irs->casefile);
648 case_nullify (&irs->last_output);
651 /* Ends the current initial run. */
653 end_run (struct initial_run_state *irs)
655 struct external_sort *xsrt = irs->xsrt;
657 /* Record initial run. */
658 if (irs->casefile != NULL)
660 casefile_sleep (irs->casefile);
661 if (xsrt->run_cnt >= xsrt->run_cap)
664 xsrt->runs = xrealloc (xsrt->runs,
665 sizeof *xsrt->runs * xsrt->run_cap);
667 xsrt->runs[xsrt->run_cnt++] = irs->casefile;
668 irs->casefile = NULL;
672 /* Writes a record to the current initial run. */
674 output_record (struct initial_run_state *irs)
676 struct record_run *record_run;
677 struct ccase case_tmp;
679 /* Extract minimum case from heap. */
680 assert (irs->record_cnt > 0);
681 pop_heap (irs->records, irs->record_cnt--, sizeof *irs->records,
682 compare_record_run_minheap, irs);
683 record_run = irs->records + irs->record_cnt;
685 /* Bail if an error has occurred. */
689 /* Start new run if necessary. */
690 assert (record_run->run == irs->run
691 || record_run->run == irs->run + 1);
692 if (record_run->run != irs->run)
697 assert (record_run->run == irs->run);
701 if (irs->casefile != NULL)
702 casefile_append (irs->casefile, &record_run->record);
704 /* This record becomes last_output. */
705 irs->last_output = case_tmp = record_run->record;
706 record_run->record = irs->records[irs->record_cap - 1].record;
707 irs->records[irs->record_cap - 1].record = case_tmp;
712 static int choose_merge (struct casefile *runs[], int run_cnt, int order);
713 static struct casefile *merge_once (struct external_sort *,
714 struct casefile *[], size_t);
716 /* Repeatedly merges run until only one is left,
717 and returns the final casefile. */
718 static struct casefile *
719 merge (struct external_sort *xsrt)
721 while (xsrt->run_cnt > 1)
723 int order = min (MAX_MERGE_ORDER, xsrt->run_cnt);
724 int idx = choose_merge (xsrt->runs, xsrt->run_cnt, order);
725 xsrt->runs[idx] = merge_once (xsrt, xsrt->runs + idx, order);
726 remove_range (xsrt->runs, xsrt->run_cnt, sizeof *xsrt->runs,
728 xsrt->run_cnt -= order - 1;
730 assert (xsrt->run_cnt == 1);
732 return xsrt->runs[0];
735 /* Chooses ORDER runs out of the RUN_CNT runs in RUNS to merge,
736 and returns the index of the first one.
738 For stability, we must merge only consecutive runs. For
739 efficiency, we choose the shortest consecutive sequence of
742 choose_merge (struct casefile *runs[], int run_cnt, int order)
744 int min_idx, min_sum;
745 int cur_idx, cur_sum;
748 /* Sum up the length of the first ORDER runs. */
750 for (i = 0; i < order; i++)
751 cur_sum += casefile_get_case_cnt (runs[i]);
753 /* Find the shortest group of ORDER runs,
754 using a running total for efficiency. */
757 for (cur_idx = 1; cur_idx + order <= run_cnt; cur_idx++)
759 cur_sum -= casefile_get_case_cnt (runs[cur_idx - 1]);
760 cur_sum += casefile_get_case_cnt (runs[cur_idx + order - 1]);
761 if (cur_sum < min_sum)
771 /* Merges the RUN_CNT initial runs specified in INPUT_FILES into a
772 new run, and returns the new run. */
773 static struct casefile *
774 merge_once (struct external_sort *xsrt,
775 struct casefile **const input_files,
780 struct casefile *file;
781 struct casereader *reader;
786 struct casefile *output = NULL;
789 /* Open input files. */
790 runs = xmalloc (sizeof *runs * run_cnt);
791 for (i = 0; i < run_cnt; i++)
793 struct run *r = &runs[i];
794 r->file = input_files[i];
795 r->reader = casefile_get_destructive_reader (r->file);
796 if (!casereader_read_xfer (r->reader, &r->ccase))
803 /* Create output file. */
804 output = casefile_create (xsrt->value_cnt);
805 casefile_to_disk (output);
810 struct run *min_run, *run;
814 for (run = runs + 1; run < runs + run_cnt; run++)
815 if (compare_record (&run->ccase, &min_run->ccase, xsrt->criteria) < 0)
818 /* Write minimum to output file. */
819 casefile_append_xfer (output, &min_run->ccase);
821 /* Read another case from minimum run. */
822 if (!casereader_read_xfer (min_run->reader, &min_run->ccase))
824 casereader_destroy (min_run->reader);
825 casefile_destroy (min_run->file);
827 remove_element (runs, run_cnt, sizeof *runs, min_run - runs);
832 casefile_sleep (output);