1 /* PSPP - computes sample statistics.
2 Copyright (C) 1997-9, 2000 Free Software Foundation, Inc.
3 Written by Ben Pfaff <blp@gnu.org>.
5 This program is free software; you can redistribute it and/or
6 modify it under the terms of the GNU General Public License as
7 published by the Free Software Foundation; either version 2 of the
8 License, or (at your option) any later version.
10 This program is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with this program; if not, write to the Free Software
17 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. */
27 #include "algorithm.h"
34 #include "expressions/public.h"
44 #include "debug-print.h"
49 SRT_ASCEND, /* Ascending order: A, B, C, ..., X, Y, Z. */
50 SRT_DESCEND /* Descending order: Z, Y, X, ..., C, B, A. */
53 /* A sort criterion: one sort key and the direction in which it
   is ordered. */
56 int fv; /* Variable data index. */
57 int width; /* 0=numeric, otherwise string width. */
58 enum sort_direction dir; /* Sort direction. */
61 /* A set of sort criteria. */
64 struct sort_criterion *crits; /* Array of crit_cnt criteria. */
68 /* These should only be changed for testing purposes. */
69 static int min_buffers = 64; /* Fewest case buffers allocate_cases() accepts. */
70 static int max_buffers = INT_MAX; /* Cap on the case buffers allocate_cases() requests. */
71 static bool allow_internal_sort = true; /* Tested by do_internal_sort() before it sorts. */
/* Forward declarations of helpers defined later in this file. */
73 static int compare_record (const struct ccase *, const struct ccase *,
74 const struct sort_criteria *);
75 static struct casefile *do_internal_sort (struct casereader *,
76 const struct sort_criteria *);
77 static struct casefile *do_external_sort (struct casereader *,
78 const struct sort_criteria *);
80 /* Performs the SORT CASES procedure: parses the sort criteria,
   sorts the active file in place, and then destroys the criteria.
   Returns the result of lex_end_of_command() if the sort
   succeeded, otherwise CMD_FAILURE. */
84 struct sort_criteria *criteria;
89 criteria = sort_parse_criteria (default_dict, NULL, NULL, NULL);
/* In test mode only, a slash followed by BUFFERS= is accepted; it
   pins both buffer limits to the given count and rules out the
   internal sort. */
93 if (test_mode && lex_match ('/'))
95 if (!lex_force_match_id ("BUFFERS") || !lex_match ('=')
99 min_buffers = max_buffers = lex_integer ();
100 allow_internal_sort = false;
103 msg (SE, _("Buffer limit must be at least 2."));
110 success = sort_active_file_in_place (criteria);
/* Put the testing knobs back to their initial values. */
114 max_buffers = INT_MAX;
115 allow_internal_sort = true;
117 sort_destroy_criteria (criteria);
118 return success ? lex_end_of_command () : CMD_FAILURE;
121 /* Gets ready to sort the active file, either in-place or to a
122 separate casefile. Frees the PROCESS IF expression and runs a
   procedure so that the active file ends up in a storage source,
   which is then asserted. */
124 prepare_to_sort_active_file (void)
126 /* Cancel temporary transformations and PROCESS IF. */
129 expr_free (process_if_expr);
130 process_if_expr = NULL;
132 /* Make sure source cases are in a storage source. */
133 procedure (NULL, NULL);
134 assert (case_source_is_class (vfm_source, &storage_source_class));
137 /* Sorts the active file in-place according to CRITERIA.
138 Returns nonzero if successful. The old case source is freed
   and a storage source created from the sorted casefile takes
   its place. */
140 sort_active_file_in_place (const struct sort_criteria *criteria)
142 struct casefile *src, *dst;
144 prepare_to_sort_active_file ();
/* Sort the storage source's casefile through a destructive reader,
   then release the old source. */
146 src = storage_source_get_casefile (vfm_source);
147 dst = sort_execute (casefile_get_destructive_reader (src), criteria);
148 free_case_source (vfm_source);
/* The sorted casefile becomes the new active file source. */
154 vfm_source = storage_source_create (dst);
158 /* Sorts the active file to a separate casefile. If successful,
159 returns the sorted casefile. Returns a null pointer on failure. */
162 sort_active_file_to_casefile (const struct sort_criteria *criteria)
164 struct casefile *src;
166 prepare_to_sort_active_file ();
/* Reads through casefile_get_reader() rather than the destructive
   reader that sort_active_file_in_place() uses. */
168 src = storage_source_get_casefile (vfm_source);
169 return sort_execute (casefile_get_reader (src), criteria);
172 /* Parses a list of sort keys and returns a struct sort_criteria
173 based on it. Returns a null pointer on error.
174 If SAW_DIRECTION is nonnull, sets *SAW_DIRECTION to true if at
175 least one parenthesized sort direction was specified, false otherwise. */
177 struct sort_criteria *
178 sort_parse_criteria (const struct dictionary *dict,
179 struct variable ***vars, int *var_cnt,
182 struct sort_criteria *criteria;
183 struct variable **local_vars = NULL;
184 size_t local_var_cnt;
/* VARS and VAR_CNT must be both null or both nonnull. */
186 assert ((vars == NULL) == (var_cnt == NULL));
/* NOTE(review): var_cnt is declared as int * but local_var_cnt is a
   size_t, so this assignment mixes pointer types -- confirm which
   type is intended. */
190 var_cnt = &local_var_cnt;
/* Start from an empty set of criteria. */
193 criteria = xmalloc (sizeof *criteria);
194 criteria->crits = NULL;
195 criteria->crit_cnt = 0;
199 if (saw_direction != NULL)
200 *saw_direction = false;
/* Remember how many variables were already parsed so that only the
   newly appended ones are turned into criteria below. */
204 int prev_var_cnt = *var_cnt;
205 enum sort_direction direction;
208 if (!parse_variables (dict, vars, var_cnt,
209 PV_NO_DUPLICATE | PV_APPEND | PV_NO_SCRATCH))
212 /* Sort direction: D or DOWN selects descending order, A or UP
   selects ascending order. */
215 if (lex_match_id ("D") || lex_match_id ("DOWN"))
216 direction = SRT_DESCEND;
217 else if (lex_match_id ("A") || lex_match_id ("UP"))
218 direction = SRT_ASCEND;
221 msg (SE, _("`A' or `D' expected inside parentheses."));
224 if (!lex_match (')'))
226 msg (SE, _("`)' expected."));
/* NOTE(review): the reset to false above is guarded against a null
   SAW_DIRECTION; no such guard is visible for this store -- confirm
   that a null SAW_DIRECTION cannot reach it. */
229 *saw_direction = true;
/* Presumably the default when no parenthesized direction follows
   the variables. */
232 direction = SRT_ASCEND;
/* Grow the criteria array to cover every variable parsed so far and
   fill in the entries for the new ones. */
234 criteria->crits = xrealloc (criteria->crits,
235 sizeof *criteria->crits * *var_cnt);
236 criteria->crit_cnt = *var_cnt;
237 for (; prev_var_cnt < criteria->crit_cnt; prev_var_cnt++)
239 struct sort_criterion *c = &criteria->crits[prev_var_cnt];
240 c->fv = (*vars)[prev_var_cnt]->fv;
241 c->width = (*vars)[prev_var_cnt]->width;
/* Parsing stops at a period or a slash token. */
245 while (token != '.' && token != '/');
/* Presumably the error exit: discard what was built so far. */
252 sort_destroy_criteria (criteria);
256 /* Destroys CRITERIA, releasing its array of criteria. A null
   CRITERIA is skipped by the check below. */
258 sort_destroy_criteria (struct sort_criteria *criteria)
260 if (criteria != NULL)
262 free (criteria->crits);
267 /* Reads all the cases from READER, which is destroyed. Sorts
268 the cases according to CRITERIA. Returns the sorted cases in
269 a newly created casefile. */
271 sort_execute (struct casereader *reader, const struct sort_criteria *criteria)
/* Try the in-memory sort first; do_internal_sort() returns a null
   pointer when it does not apply. */
273 struct casefile *output = do_internal_sort (reader, criteria);
/* External sort, presumably only reached when the internal sort
   returned a null pointer. */
275 output = do_external_sort (reader, criteria);
276 casereader_destroy (reader);
280 /* A case and its index, as collected by do_internal_sort(). */
283 struct ccase c; /* Case. */
284 unsigned long idx; /* Index to allow for stable sorting. */
/* Comparison callback that do_internal_sort() passes to sort(). */
287 static int compare_indexed_cases (const void *, const void *, void *);
289 /* If the data is in memory, do an internal sort and return a new
290 casefile for the data. Otherwise, return a null pointer.
   The cases are read from READER and sorted according to
   CRITERIA. */
291 static struct casefile *
292 do_internal_sort (struct casereader *reader,
293 const struct sort_criteria *criteria)
295 const struct casefile *src;
296 struct casefile *dst;
297 unsigned long case_cnt;
/* Internal sorting can be switched off through allow_internal_sort. */
299 if (!allow_internal_sort)
302 src = casereader_get_casefile (reader);
/* A casefile of more than one case that is not in core is not
   handled here. */
303 if (casefile_get_case_cnt (src) > 1 && !casefile_in_core (src))
306 case_cnt = casefile_get_case_cnt (src);
307 dst = casefile_create (casefile_get_value_cnt (src));
/* Gather every case into one array so that it can be sorted. */
310 struct indexed_case *cases = malloc (sizeof *cases * case_cnt);
315 for (i = 0; i < case_cnt; i++)
317 casereader_read_xfer_assert (reader, &cases[i].c);
/* compare_indexed_cases() is the comparison callback. */
321 sort (cases, case_cnt, sizeof *cases, compare_indexed_cases,
/* Transfer the sorted cases to the output casefile. */
324 for (i = 0; i < case_cnt; i++)
325 casefile_append_xfer (dst, &cases[i].c);
/* NOTE(review): presumably a failure path that discards the
   unfinished output -- confirm. */
332 casefile_destroy (dst);
340 /* Compares the variables specified by CRITERIA between the cases
341 at A and B, with a "last resort" comparison for stability, and
342 returns a strcmp()-type result. A_ and B_ point to struct
   indexed_case objects and CRITERIA_ to a struct sort_criteria. */
344 compare_indexed_cases (const void *a_, const void *b_, void *criteria_)
346 struct sort_criteria *criteria = criteria_;
347 const struct indexed_case *a = a_;
348 const struct indexed_case *b = b_;
349 int result = compare_record (&a->c, &b->c, criteria);
/* Last resort: order by the index stored with each case. */
351 result = a->idx < b->idx ? -1 : a->idx > b->idx;
357 /* Maximum order of merge (external sort only). The maximum
358 reasonable value is about 7. Above that, it would be a good
359 idea to use a heap in merge_once() to select the minimum.
   merge() never asks merge_once() for more runs than this. */
360 #define MAX_MERGE_ORDER 7
362 /* Results of an external sort. */
365 const struct sort_criteria *criteria; /* Sort criteria. */
366 size_t value_cnt; /* Size of data in `union value's. */
367 struct casefile **runs; /* Array of initial runs; merge() reduces it to one. */
368 size_t run_cnt, run_cap; /* Number of runs, allocated capacity. */
371 /* Prototypes for helper functions used by do_external_sort(). */
372 static int write_runs (struct external_sort *, struct casereader *);
373 static struct casefile *merge (struct external_sort *);
374 static void destroy_external_sort (struct external_sort *);
376 /* Performs a stable external sort of the cases from READER according
377 to CRITERIA. Forms initial runs using a heap
378 as a reservoir. Merges the initial runs according to a
379 pattern that assures stability. */
380 static struct casefile *
381 do_external_sort (struct casereader *reader,
382 const struct sort_criteria *criteria)
384 struct external_sort *xsrt;
/* The input casefile is sent to disk first, presumably to leave
   memory for the sort buffers. */
386 casefile_to_disk (casereader_get_casefile (reader));
388 xsrt = xmalloc (sizeof *xsrt);
389 xsrt->criteria = criteria;
390 xsrt->value_cnt = casefile_get_value_cnt (casereader_get_casefile (reader));
393 xsrt->runs = xmalloc (sizeof *xsrt->runs * xsrt->run_cap);
/* First write the initial runs; if that reports success, merge
   them into the output casefile. */
394 if (write_runs (xsrt, reader))
396 struct casefile *output = merge (xsrt);
397 destroy_external_sort (xsrt);
/* Presumably the failure path: only clean up. */
402 destroy_external_sort (xsrt);
/* Destroys the run casefiles that XSRT still holds. */
409 destroy_external_sort (struct external_sort *xsrt)
415 for (i = 0; i < xsrt->run_cnt; i++)
416 casefile_destroy (xsrt->runs[i]);
422 /* Replacement selection. */
424 /* Pairs a record with a run number and a case number. The heap
   kept in struct initial_run_state is made of these. */
427 int run; /* Run number of case. */
428 struct ccase record; /* Case data. */
429 size_t idx; /* Case number (for stability). */
432 /* Represents a set of initial runs during an external sort. */
433 struct initial_run_state
435 struct external_sort *xsrt; /* Sort whose runs array receives finished runs. */
438 struct record_run *records; /* Records arranged as a heap. */
439 size_t record_cnt; /* Current number of records. */
440 size_t record_cap; /* Capacity for records; process_case() keeps the
                      final slot out of the heap. */
442 /* Run currently being output. */
443 int run; /* Run number. */
444 size_t case_cnt; /* Number of cases so far. */
445 struct casefile *casefile; /* Output file. */
446 struct ccase last_output; /* Record last output. */
448 int okay; /* Zero if an error has been encountered. */
451 static const struct case_sink_class sort_sink_class;
/* Forward declarations of the initial-run helpers defined below. */
453 static void destroy_initial_run_state (struct initial_run_state *);
454 static void process_case (struct initial_run_state *, const struct ccase *,
456 static int allocate_cases (struct initial_run_state *);
457 static void output_record (struct initial_run_state *);
458 static void start_run (struct initial_run_state *);
459 static void end_run (struct initial_run_state *);
460 static int compare_record_run (const struct record_run *,
461 const struct record_run *,
462 struct initial_run_state *);
463 static int compare_record_run_minheap (const void *, const void *, void *);
465 /* Reads cases from READER and composes initial runs in XSRT.
   do_external_sort() treats a nonzero return value as success. */
467 write_runs (struct external_sort *xsrt, struct casereader *reader)
469 struct initial_run_state *irs;
474 /* Allocate memory for cases. */
475 irs = xmalloc (sizeof *irs);
478 irs->record_cnt = irs->record_cap = 0;
481 irs->casefile = NULL;
482 case_nullify (&irs->last_output);
484 if (!allocate_cases (irs))
487 /* Create initial runs. */
/* Every case read is handed to process_case() together with its
   position in the input. */
489 for (; irs->okay && casereader_read (reader, &c); case_destroy (&c))
490 process_case (irs, &c, idx++);
/* Input exhausted: loop until the heap is empty or an error is
   flagged. */
491 while (irs->okay && irs->record_cnt > 0)
498 destroy_initial_run_state (irs);
503 /* Add a single case to an initial run. C is the case and IDX
   its position in the input; a copy of C goes into the heap
   held by IRS. */
505 process_case (struct initial_run_state *irs, const struct ccase *c, size_t idx)
507 struct record_run *rr;
509 /* Compose record_run for this run and add to heap. */
510 assert (irs->record_cnt < irs->record_cap - 1);
511 rr = irs->records + irs->record_cnt++;
512 case_copy (&rr->record, 0, c, 0, irs->xsrt->value_cnt);
/* A case that sorts before the record last output cannot extend
   the current run, so it is assigned to the next one. */
515 if (!case_is_null (&irs->last_output)
516 && compare_record (c, &irs->last_output, irs->xsrt->criteria) < 0)
517 rr->run = irs->run + 1;
518 push_heap (irs->records, irs->record_cnt, sizeof *irs->records,
519 compare_record_run_minheap, irs);
521 /* Output a record if the reservoir is full. */
522 if (irs->record_cnt == irs->record_cap - 1 && irs->okay)
526 /* Destroys the initial run state represented by IRS. */
528 destroy_initial_run_state (struct initial_run_state *irs)
/* Every one of the record_cap slots is destroyed, not only the
   record_cnt that are currently in the heap. */
535 for (i = 0; i < irs->record_cap; i++)
536 case_destroy (&irs->records[i].record);
/* A run casefile that is still attached is put to sleep. */
539 if (irs->casefile != NULL)
540 casefile_sleep (irs->casefile);
545 /* Allocates room for lots of cases as a buffer. The number of
   cases is derived from the workspace limit and an estimated
   per-case cost, capped at max_buffers; an out-of-memory error is
   reported if the array is null or fewer than min_buffers cases
   were obtained.
   NOTE(review): irs->records is only compared against NULL after
   the case_try_create() loop has already indexed it -- confirm
   that a failed malloc() cannot reach that loop. */
547 allocate_cases (struct initial_run_state *irs)
549 int approx_case_cost; /* Approximate memory cost of one case in bytes. */
550 int max_cases; /* Maximum number of cases to allocate. */
553 /* Allocate as many cases as we can within the workspace
555 approx_case_cost = (sizeof *irs->records
556 + irs->xsrt->value_cnt * sizeof (union value)
557 + 4 * sizeof (void *));
558 max_cases = get_max_workspace() / approx_case_cost;
559 if (max_cases > max_buffers)
560 max_cases = max_buffers;
561 irs->records = malloc (sizeof *irs->records * max_cases);
562 for (i = 0; i < max_cases; i++)
563 if (!case_try_create (&irs->records[i].record, irs->xsrt->value_cnt))
568 irs->record_cap = max_cases;
570 /* Fail if we didn't allocate an acceptable number of cases. */
571 if (irs->records == NULL || max_cases < min_buffers)
573 msg (SE, _("Out of memory. Could not allocate room for minimum of %d "
574 "cases of %d bytes each. (PSPP workspace is currently "
575 "restricted to a maximum of %d KB.)"),
576 min_buffers, approx_case_cost, get_max_workspace() / 1024);
582 /* Compares the variables specified by CRITERIA between the cases at
583 A and B, and returns a strcmp()-type result. */
585 compare_record (const struct ccase *a, const struct ccase *b,
586 const struct sort_criteria *criteria)
593 for (i = 0; i < criteria->crit_cnt; i++)
595 const struct sort_criterion *c = &criteria->crits[i];
/* Numeric values are compared as doubles. */
600 double af = case_num (a, c->fv);
601 double bf = case_num (b, c->fv);
603 result = af < bf ? -1 : af > bf;
/* Strings are compared bytewise over the width of the criterion. */
606 result = memcmp (case_str (a, c->fv), case_str (b, c->fv), c->width);
/* A descending criterion flips the sign of the result. */
609 return c->dir == SRT_ASCEND ? result : -result;
615 /* Compares record-run tuples A and B on run number first, then
616 on record, then on case index. The records are compared with
   compare_record() using the criteria reached through IRS. */
618 compare_record_run (const struct record_run *a,
619 const struct record_run *b,
620 struct initial_run_state *irs)
622 int result = a->run < b->run ? -1 : a->run > b->run;
624 result = compare_record (&a->record, &b->record, irs->xsrt->criteria);
626 result = a->idx < b->idx ? -1 : a->idx > b->idx;
630 /* Compares record-run tuples A and B on run number first, then
631 on the current record according to the criteria in IRS, but in descending order. */
634 compare_record_run_minheap (const void *a, const void *b, void *irs)
/* Negating compare_record_run() reverses the ordering. */
636 return -compare_record_run (a, b, irs);
639 /* Begins a new initial run, specifically its output file. */
641 start_run (struct initial_run_state *irs)
/* The run gets its own casefile, which is sent to disk at once. */
645 irs->casefile = casefile_create (irs->xsrt->value_cnt);
646 casefile_to_disk (irs->casefile);
/* A fresh run has no last-output record yet. */
647 case_nullify (&irs->last_output);
650 /* Ends the current initial run. If IRS has an output file, it is
   put to sleep and appended to the array of runs of the external
   sort, and IRS gives up its pointer to it. */
652 end_run (struct initial_run_state *irs)
654 struct external_sort *xsrt = irs->xsrt;
656 /* Record initial run. */
657 if (irs->casefile != NULL)
659 casefile_sleep (irs->casefile);
/* Grow the array of runs when it is full. */
660 if (xsrt->run_cnt >= xsrt->run_cap)
663 xsrt->runs = xrealloc (xsrt->runs,
664 sizeof *xsrt->runs * xsrt->run_cap);
666 xsrt->runs[xsrt->run_cnt++] = irs->casefile;
667 irs->casefile = NULL;
671 /* Writes a record to the current initial run. The record is the
   one that pop_heap() removes from the heap in IRS. */
673 output_record (struct initial_run_state *irs)
675 struct record_run *record_run;
676 struct ccase case_tmp;
678 /* Extract minimum case from heap. */
679 assert (irs->record_cnt > 0);
680 pop_heap (irs->records, irs->record_cnt--, sizeof *irs->records,
681 compare_record_run_minheap, irs);
/* The extracted record is taken from the slot just past the
   shrunken heap. */
682 record_run = irs->records + irs->record_cnt;
684 /* Bail if an error has occurred. */
688 /* Start new run if necessary. */
689 assert (record_run->run == irs->run
690 || record_run->run == irs->run + 1);
691 if (record_run->run != irs->run)
696 assert (record_run->run == irs->run);
700 if (irs->casefile != NULL)
701 casefile_append (irs->casefile, &record_run->record);
703 /* This record becomes last_output. Its case is swapped with the
   one held in the final slot of the records array. */
704 irs->last_output = case_tmp = record_run->record;
705 record_run->record = irs->records[irs->record_cap - 1].record;
706 irs->records[irs->record_cap - 1].record = case_tmp;
/* Forward declarations for the merge phase. */
711 static int choose_merge (struct casefile *runs[], int run_cnt, int order);
712 static struct casefile *merge_once (struct external_sort *,
713 struct casefile *[], size_t);
715 /* Repeatedly merges runs until only one is left,
716 and returns the final casefile.
   NOTE(review): the assertion below requires exactly one run at the
   end, so at least one initial run is assumed -- confirm for empty
   input. */
717 static struct casefile *
718 merge (struct external_sort *xsrt)
720 while (xsrt->run_cnt > 1)
/* Merge ORDER consecutive runs starting at IDX into a single run
   stored at IDX; the run count therefore drops by ORDER - 1. */
722 int order = min (MAX_MERGE_ORDER, xsrt->run_cnt);
723 int idx = choose_merge (xsrt->runs, xsrt->run_cnt, order);
724 xsrt->runs[idx] = merge_once (xsrt, xsrt->runs + idx, order);
725 remove_range (xsrt->runs, xsrt->run_cnt, sizeof *xsrt->runs,
727 xsrt->run_cnt -= order - 1;
729 assert (xsrt->run_cnt == 1);
731 return xsrt->runs[0];
734 /* Chooses ORDER runs out of the RUN_CNT runs in RUNS to merge,
735 and returns the index of the first one.
737 For stability, we must merge only consecutive runs. For
738 efficiency, we choose the shortest consecutive sequence of runs. */
741 choose_merge (struct casefile *runs[], int run_cnt, int order)
/* NOTE(review): the sums below are plain ints, while
   do_internal_sort() stores casefile_get_case_cnt() in an unsigned
   long -- confirm that they cannot overflow. */
743 int min_idx, min_sum;
744 int cur_idx, cur_sum;
747 /* Sum up the length of the first ORDER runs. */
749 for (i = 0; i < order; i++)
750 cur_sum += casefile_get_case_cnt (runs[i]);
752 /* Find the shortest group of ORDER runs,
753 using a running total for efficiency. */
756 for (cur_idx = 1; cur_idx + order <= run_cnt; cur_idx++)
/* Slide the window one run to the right: drop the run that leaves
   on the left and add the one that enters on the right. */
758 cur_sum -= casefile_get_case_cnt (runs[cur_idx - 1]);
759 cur_sum += casefile_get_case_cnt (runs[cur_idx + order - 1]);
/* The strict comparison keeps the earliest window among ties. */
760 if (cur_sum < min_sum)
770 /* Merges the RUN_CNT initial runs specified in INPUT_FILES into a
771 new run, and returns the new run. */
772 static struct casefile *
773 merge_once (struct external_sort *xsrt,
774 struct casefile **const input_files,
779 struct casefile *file;
780 struct casereader *reader;
785 struct casefile *output = NULL;
788 /* Open input files. */
789 runs = xmalloc (sizeof *runs * run_cnt);
790 for (i = 0; i < run_cnt; i++)
792 struct run *r = &runs[i];
793 r->file = input_files[i];
794 r->reader = casefile_get_destructive_reader (r->file);
795 if (!casereader_read_xfer (r->reader, &r->ccase))
802 /* Create output file. */
803 output = casefile_create (xsrt->value_cnt);
804 casefile_to_disk (output);
809 struct run *min_run, *run;
813 for (run = runs + 1; run < runs + run_cnt; run++)
814 if (compare_record (&run->ccase, &min_run->ccase, xsrt->criteria) < 0)
817 /* Write minimum to output file. */
818 casefile_append_xfer (output, &min_run->ccase);
820 /* Read another case from minimum run. */
821 if (!casereader_read_xfer (min_run->reader, &min_run->ccase))
823 casereader_destroy (min_run->reader);
824 casefile_destroy (min_run->file);
826 remove_element (runs, run_cnt, sizeof *runs, min_run - runs);
831 casefile_sleep (output);