1 /* PSPP - computes sample statistics.
2 Copyright (C) 1997-9, 2000 Free Software Foundation, Inc.
3 Written by Ben Pfaff <blp@gnu.org>.
5 This program is free software; you can redistribute it and/or
6 modify it under the terms of the GNU General Public License as
7 published by the Free Software Foundation; either version 2 of the
8 License, or (at your option) any later version.
10 This program is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with this program; if not, write to the Free Software
17 Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. */
26 #include "algorithm.h"
33 #include "expressions/public.h"
47 #include <sys/types.h>
54 #include "debug-print.h"
/* Sort directions. These two enumerators belong to the type that the
   rest of the file uses as enum sort_direction. */
59 SRT_ASCEND, /* A, B, C, ..., X, Y, Z. */
60 SRT_DESCEND /* Z, Y, X, ..., C, B, A. */
63 /* A sort criterion: one sort key together with its direction. */
66 int fv; /* Variable data index. */
67 int width; /* 0=numeric, otherwise string width (compare_record passes it to memcmp as the length). */
68 enum sort_direction dir; /* Sort direction. */
71 /* A set of sort criteria. */
74 struct sort_criterion *crits; /* Array of criteria; sort_parse_criteria sizes it to one entry per parsed variable. */
/* Comparison and sorting helpers that are defined later in this file. */
78 static int compare_case_dblptrs (const void *, const void *, void *);
79 static int compare_record (const struct ccase *, const struct ccase *,
80 const struct sort_criteria *);
81 static struct casefile *do_internal_sort (struct casereader *,
82 const struct sort_criteria *);
83 static struct casefile *do_external_sort (struct casereader *,
84 const struct sort_criteria *);
86 /* Performs the SORT CASES procedure: parses the sort criteria,
   sorts the active file in place, and reports the outcome. */
90 struct sort_criteria *criteria;
/* Only the criteria are wanted, so null pointers are passed for the
   optional VARS, VAR_CNT and SAW_DIRECTION outputs.
   NOTE(review): sort_parse_criteria stores through SAW_DIRECTION
   after a parenthesized direction, and that store looks unguarded
   in this listing, so a command such as BY X (D) may dereference
   this null pointer. Confirm against the complete source. */
95 criteria = sort_parse_criteria (default_dict, NULL, NULL, NULL);
99 success = sort_active_file_in_place (criteria);
/* The criteria are not needed once the sort has been attempted. */
100 sort_destroy_criteria (criteria);
/* After a successful sort the result is whatever lex_end_of_command
   returns; a failed sort yields CMD_FAILURE. */
101 return success ? lex_end_of_command () : CMD_FAILURE;
104 /* Gets ready to sort the active file, either in-place or to a
105 separate casefile. Both sort_active_file_in_place and
   sort_active_file_to_casefile call this before touching vfm_source. */
107 prepare_to_sort_active_file (void)
109 /* Cancel temporary transformations and PROCESS IF. */
112 expr_free (process_if_expr);
/* Clear the pointer so the freed expression cannot be used again. */
113 process_if_expr = NULL;
115 /* Make sure source cases are in a storage source. */
116 procedure (NULL, NULL);
/* The callers fetch the casefile out of vfm_source with
   storage_source_get_casefile right after this returns, hence the
   check on the class of the source. */
117 assert (case_source_is_class (vfm_source, &storage_source_class));
120 /* Sorts the active file in-place according to CRITERIA.
121 Returns nonzero if successful. */
123 sort_active_file_in_place (const struct sort_criteria *criteria)
125 struct casefile *src, *dst;
127 prepare_to_sort_active_file ();
/* SRC is read through a destructive reader, which sort_execute
   destroys once it has finished with it. */
129 src = storage_source_get_casefile (vfm_source);
130 dst = sort_execute (casefile_get_destructive_reader (src), criteria);
/* Drop the old source and make the sorted casefile the active file.
   NOTE(review): the listing jumps from line 131 to line 137 here;
   presumably the missing lines deal with a null DST. Confirm against
   the complete source. */
131 free_case_source (vfm_source);
137 vfm_source = storage_source_create (dst, default_dict);
141 /* Sorts the active file to a separate casefile. If successful,
142 returns the sorted casefile. Returns a null pointer on failure. */
145 sort_active_file_to_casefile (const struct sort_criteria *criteria)
147 struct casefile *src;
149 prepare_to_sort_active_file ();
151 src = storage_source_get_casefile (vfm_source);
/* A plain reader is used here, where sort_active_file_in_place uses a
   destructive one, and vfm_source is not replaced by this function.
   The return value is passed straight through from sort_execute. */
152 return sort_execute (casefile_get_reader (src), criteria);
155 /* Parses a list of sort keys and returns a struct sort_criteria
156 based on it. Returns a null pointer on error.
157 If SAW_DIRECTION is nonnull, sets *SAW_DIRECTION to true if at
158 least one parenthesized sort direction was specified, false otherwise. */
160 struct sort_criteria *
161 sort_parse_criteria (const struct dictionary *dict,
162 struct variable ***vars, int *var_cnt,
165 struct sort_criteria *criteria;
166 struct variable **local_vars = NULL;
/* NOTE(review): local_var_cnt is a size_t, yet its address is stored
   in VAR_CNT below, which is declared as int *. The pointer types do
   not match; confirm whether VAR_CNT or this local has the wrong type. */
167 size_t local_var_cnt;
/* VARS and VAR_CNT must be supplied together or not at all. */
169 assert ((vars == NULL) == (var_cnt == NULL));
/* Presumably reached only when the caller passed no VAR_CNT, so that
   the count is kept in a local instead (the enclosing test is missing
   from this listing). */
173 var_cnt = &local_var_cnt;
/* Start with an empty set of criteria. */
176 criteria = xmalloc (sizeof *criteria);
177 criteria->crits = NULL;
178 criteria->crit_cnt = 0;
182 if (saw_direction != NULL)
183 *saw_direction = false;
/* Remember how many variables were already parsed, so that only the
   variables added by this group get new criteria entries below. */
187 int prev_var_cnt = *var_cnt;
188 enum sort_direction direction;
/* Presumably PV_APPEND makes parse_variables add to *VARS and *VAR_CNT
   instead of replacing them; the loop below relies on *VAR_CNT being a
   running total. */
191 if (!parse_variables (dict, vars, var_cnt,
192 PV_NO_DUPLICATE | PV_APPEND | PV_NO_SCRATCH))
195 /* Sort direction. */
198 if (lex_match_id ("D") || lex_match_id ("DOWN"))
199 direction = SRT_DESCEND;
200 else if (lex_match_id ("A") || lex_match_id ("UP"))
201 direction = SRT_ASCEND;
204 msg (SE, _("`A' or `D' expected inside parentheses."));
207 if (!lex_match (')'))
209 msg (SE, _("`)' expected."));
/* NOTE(review): unlike the store of false above, this store does not
   appear to be guarded by a null check on SAW_DIRECTION, and the
   SORT CASES command passes a null pointer for it. Confirm against
   the complete source; if so, this is a null pointer dereference. */
212 *saw_direction = true;
/* Presumably the default taken when no parenthesized direction follows
   the variables. */
215 direction = SRT_ASCEND;
/* Grow the criteria array to one entry per parsed variable, then fill
   in the entries for the variables that this group added. */
217 criteria->crits = xrealloc (criteria->crits,
218 sizeof *criteria->crits * *var_cnt);
219 criteria->crit_cnt = *var_cnt;
220 for (; prev_var_cnt < criteria->crit_cnt; prev_var_cnt++)
222 struct sort_criterion *c = &criteria->crits[prev_var_cnt];
223 c->fv = (*vars)[prev_var_cnt]->fv;
224 c->width = (*vars)[prev_var_cnt]->width;
/* Keep parsing groups until TOKEN is a period or a slash. */
228 while (token != '.' && token != '/');
/* Presumably part of the error path, since a successful parse has to
   hand CRITERIA back to the caller; the label and the return
   statements are missing from this listing. */
235 sort_destroy_criteria (criteria);
239 /* Destroys CRITERIA, a set of sort criteria such as the one that
   sort_parse_criteria builds. A null CRITERIA is accepted and skipped. */
241 sort_destroy_criteria (struct sort_criteria *criteria)
243 if (criteria != NULL)
/* Release the array of individual criteria. */
245 free (criteria->crits);
250 /* Reads all the cases from READER, which is destroyed. Sorts
251 the cases according to CRITERIA. Returns the sorted cases in
252 a newly created casefile. */
254 sort_execute (struct casereader *reader, const struct sort_criteria *criteria)
256 struct casefile *output;
/* Try the in-memory sort first. */
258 output = do_internal_sort (reader, criteria);
/* Fall back to the external sort. NOTE(review): the condition that
   guards this fallback is missing from this listing; presumably it
   tests OUTPUT for null. */
260 output = do_external_sort (reader, criteria);
/* READER is consumed by this function whichever sort was used. */
261 casereader_destroy (reader);
265 /* If the data is in memory, do an internal sort and return a new
266 casefile for the data. The cases are pulled out of READER into an
   array, an array of pointers to them is sorted, and the cases are
   appended to the new casefile in sorted order. */
267 static struct casefile *
268 do_internal_sort (struct casereader *reader,
269 const struct sort_criteria *criteria)
271 const struct casefile *src;
272 struct casefile *dst;
273 struct ccase *cases, **case_ptrs;
274 unsigned long case_cnt;
276 src = casereader_get_casefile (reader);
/* A casefile holding more than one case that is not in core is not
   sorted here. The statement controlled by this test is missing from
   the listing; presumably it returns a null pointer so that
   sort_execute can use the external sort instead. */
277 if (casefile_get_case_cnt (src) > 1 && !casefile_in_core (src))
/* CASES holds the case data and CASE_PTRS holds one pointer per case;
   it is the pointers that get sorted. */
280 case_cnt = casefile_get_case_cnt (src);
281 cases = malloc (sizeof *cases * case_cnt);
282 case_ptrs = malloc (sizeof *case_ptrs * case_cnt);
/* malloc may return a null pointer for a zero-byte request, so an
   empty casefile is not treated as an allocation failure. */
283 if ((cases != NULL && case_ptrs != NULL) || case_cnt == 0)
285 unsigned long case_idx;
/* Move every case out of the reader and point CASE_PTRS at it. */
287 for (case_idx = 0; case_idx < case_cnt; case_idx++)
289 int success = casereader_read_xfer (reader, &cases[case_idx]);
291 case_ptrs[case_idx] = &cases[case_idx];
/* Sort the pointers; compare_case_dblptrs dereferences them and
   compares the cases they point to. */
294 sort (case_ptrs, case_cnt, sizeof *case_ptrs, compare_case_dblptrs,
/* Build the output casefile by walking the sorted pointer array. */
297 dst = casefile_create (casefile_get_value_cnt (src));
298 for (case_idx = 0; case_idx < case_cnt; case_idx++)
299 casefile_append_xfer (dst, case_ptrs[case_idx]);
310 /* Compares the variables specified by CRITERIA between the cases
311 at A and B, and returns a strcmp()-type result. A_ and B_ each point
   to a pointer to a case (the element type of the array sorted by
   do_internal_sort), and CRITERIA_ is the struct sort_criteria to use. */
313 compare_case_dblptrs (const void *a_, const void *b_, void *criteria_)
315 struct sort_criteria *criteria = criteria_;
316 struct ccase *const *pa = a_;
317 struct ccase *const *pb = b_;
/* Strip one level of indirection to reach the cases themselves. */
318 struct ccase *a = *pa;
319 struct ccase *b = *pb;
321 return compare_record (a, b, criteria);
326 /* Maximum order of merge. If you increase this, then you should
327 use a heap for comparing cases during merge. merge_once sizes its
   reader and case arrays with this constant and finds the minimum case
   with a linear scan over the inputs. */
328 #define MAX_MERGE_ORDER 7
330 /* Minimum total number of records for buffers. */
331 #define MIN_BUFFER_TOTAL_SIZE_RECS 64
333 /* Minimum single input buffer size, in bytes and records. */
334 #define MIN_BUFFER_SIZE_BYTES 4096
335 #define MIN_BUFFER_SIZE_RECS 16
/* Compile-time sanity check: two input buffers of the minimum size,
   plus 16 further records, must fit within the minimum total. */
337 #if MIN_BUFFER_SIZE_RECS * 2 + 16 > MIN_BUFFER_TOTAL_SIZE_RECS
338 #error MIN_BUFFER_SIZE_RECS and MIN_BUFFER_TOTAL_SIZE_RECS do not make sense.
341 /* Compares initial runs A_ and B_ so that they order in descending
   order by length: returns -1 if A_ holds more cases than B_, 1 if it
   holds fewer, and 0 if the counts are equal. A_ and B_ point to
   elements of an array of casefile pointers; AUX is unused. */
343 compare_initial_runs (const void *a_, const void *b_, void *aux UNUSED)
345 struct casefile *const *a = a_;
346 struct casefile *const *b = b_;
347 unsigned long a_case_cnt = casefile_get_case_cnt (*a);
348 unsigned long b_case_cnt = casefile_get_case_cnt (*b);
/* The comparison a_case_cnt < b_case_cnt supplies the 1 or 0 result. */
350 return a_case_cnt > b_case_cnt ? -1 : a_case_cnt < b_case_cnt;
353 /* Results of an external sort. These are the members of the type
   used below as struct external_sort. */
356 const struct sort_criteria *criteria; /* Sort criteria. */
357 size_t value_cnt; /* Size of data in `union value's. */
358 struct casefile **initial_runs; /* Array of initial runs. */
359 size_t run_cnt, run_cap; /* Number of runs, allocated capacity. */
362 /* Prototypes for helper functions. do_external_sort treats a nonzero
   result from write_initial_runs and merge as success. */
363 static int write_initial_runs (struct external_sort *, struct casereader *);
364 static int merge (struct external_sort *);
365 static void destroy_external_sort (struct external_sort *);
367 /* Performs an external sort of the cases from READER according to
368 CRITERIA. Forms initial runs using a heap as a
369 reservoir. Determines the optimum merge pattern via Huffman's
370 method (see Knuth vol. 3, 2nd edition, p. 365-366), and merges
371 according to that pattern. */
372 static struct casefile *
373 do_external_sort (struct casereader *reader,
374 const struct sort_criteria *criteria)
376 struct external_sort *xsrt;
/* The input casefile is sent to disk before sorting starts; presumably
   this frees memory for the sort buffers. */
378 casefile_to_disk (casereader_get_casefile (reader));
380 xsrt = xmalloc (sizeof *xsrt);
381 xsrt->criteria = criteria;
382 xsrt->value_cnt = casefile_get_value_cnt (casereader_get_casefile (reader));
/* NOTE(review): run_cap is used here, but the lines that initialize
   run_cnt and run_cap are missing from this listing. */
385 xsrt->initial_runs = xmalloc (sizeof *xsrt->initial_runs * xsrt->run_cap);
386 if (write_initial_runs (xsrt, reader) && merge (xsrt))
/* merge leaves a single run in slot 0. Take it as the result and clear
   the slot before the sort state is destroyed, presumably so that
   destroy_external_sort does not destroy the result as well. */
388 struct casefile *output = xsrt->initial_runs[0];
389 xsrt->initial_runs[0] = NULL;
390 destroy_external_sort (xsrt);
/* Presumably the failure path: tear everything down. */
395 destroy_external_sort (xsrt);
/* Destroys the casefiles recorded in the first run_cnt slots of the run
   array of XSRT, then frees the array itself. Whether XSRT is freed too
   cannot be told from this listing. */
402 destroy_external_sort (struct external_sort *xsrt)
408 for (i = 0; i < xsrt->run_cnt; i++)
409 casefile_destroy (xsrt->initial_runs[i]);
410 free (xsrt->initial_runs);
415 /* Replacement selection. */
417 /* Pairs a record with a run number. */
420 int run; /* Run number of case. */
421 struct ccase record; /* Case data. */
424 /* Represents a set of initial runs during an external sort. */
425 struct initial_run_state
427 struct external_sort *xsrt; /* External sort being served; supplies value_cnt and criteria. */
/* Reservoir. process_case keeps record_cnt below record_cap - 1, and
   output_record uses the slot at index record_cap - 1 as a spare that
   holds the case most recently written. */
430 struct record_run *records; /* Records arranged as a heap. */
431 size_t record_cnt; /* Current number of records. */
432 size_t record_cap; /* Capacity for records. */
434 /* Run currently being output. */
435 int run; /* Run number. */
436 size_t case_cnt; /* Number of cases so far. */
437 struct casefile *casefile; /* Output file. */
438 struct ccase last_output; /* Record last output (nullified by start_run). */
440 int okay; /* Zero if an error has been encountered. */
/* NOTE(review): sort_sink_class is declared here but is not referenced
   anywhere in the visible part of this file. */
443 static const struct case_sink_class sort_sink_class;
445 static void destroy_initial_run_state (struct initial_run_state *);
446 static void process_case (struct initial_run_state *, const struct ccase *);
447 static int allocate_cases (struct initial_run_state *);
448 static void output_record (struct initial_run_state *);
449 static void start_run (struct initial_run_state *);
450 static void end_run (struct initial_run_state *);
451 static int compare_record_run (const struct record_run *,
452 const struct record_run *,
453 struct initial_run_state *);
454 static int compare_record_run_minheap (const void *, const void *, void *);
456 /* Reads cases from READER and composes initial runs in XSRT.
   Declared above as returning int; do_external_sort treats a nonzero
   result as success. */
458 write_initial_runs (struct external_sort *xsrt, struct casereader *reader)
460 struct initial_run_state *irs;
464 /* Allocate memory for cases. */
465 irs = xmalloc (sizeof *irs);
/* Start with an empty reservoir, no output file and no last record. */
468 irs->record_cnt = irs->record_cap = 0;
471 irs->casefile = NULL;
472 case_nullify (&irs->last_output);
474 if (!allocate_cases (irs))
477 /* Create initial runs. */
/* Feed every case from READER into the reservoir, stopping early if an
   error clears irs->okay. Each case is destroyed after process_case
   has copied it. */
479 for (; irs->okay && casereader_read (reader, &c); case_destroy (&c))
480 process_case (irs, &c);
/* Drain whatever is still in the reservoir. The loop body is missing
   from this listing; presumably it calls output_record. */
481 while (irs->okay && irs->record_cnt > 0)
488 destroy_initial_run_state (irs);
493 /* Add a single case to an initial run. C is copied into the
   reservoir heap of IRS and tagged with the run it can belong to. */
495 process_case (struct initial_run_state *irs, const struct ccase *c)
497 struct record_run *new_record_run;
499 /* Compose record_run for this run and add to heap. */
/* The last slot of the records array is never part of the heap;
   output_record uses it as a spare. */
500 assert (irs->record_cnt < irs->record_cap - 1);
501 new_record_run = irs->records + irs->record_cnt++;
502 case_copy (&new_record_run->record, 0, c, 0, irs->xsrt->value_cnt);
503 new_record_run->run = irs->run;
/* A case that sorts before the record most recently written cannot be
   appended to the current run without breaking its order, so it is
   tagged for the next run instead. */
504 if (!case_is_null (&irs->last_output)
505 && compare_record (c, &irs->last_output, irs->xsrt->criteria) < 0)
506 new_record_run->run = irs->run + 1;
507 push_heap (irs->records, irs->record_cnt, sizeof *irs->records,
508 compare_record_run_minheap, irs);
510 /* Output a record if the reservoir is full. */
511 if (irs->record_cnt == irs->record_cap - 1 && irs->okay)
515 /* Destroys the initial run state represented by IRS. */
517 destroy_initial_run_state (struct initial_run_state *irs)
/* Every slot up to record_cap holds a case created by allocate_cases,
   including the spare last slot, so all of them are destroyed. */
524 for (i = 0; i < irs->record_cap; i++)
525 case_destroy (&irs->records[i].record);
/* An output file that is still attached to IRS is put to sleep, not
   destroyed. */
528 if (irs->casefile != NULL)
529 casefile_sleep (irs->casefile);
534 /* Allocates room for lots of cases as a buffer. Fills in
   irs->records and irs->record_cap. Declared above as returning int;
   write_initial_runs treats a zero result as failure. */
536 allocate_cases (struct initial_run_state *irs)
538 int approx_case_cost; /* Approximate memory cost of one case in bytes. */
539 int max_cases; /* Maximum number of cases to allocate. */
/* NOTE(review): the loop below passes &irs->records[i].record to
   case_try_create before irs->records has been compared against a null
   pointer; the null check only comes after the loop. If malloc fails
   while max_cases is positive this looks like a null pointer
   dereference. Confirm against the complete source. */
542 /* Allocate as many cases as we can within the workspace
544 approx_case_cost = (sizeof *irs->records
545 + irs->xsrt->value_cnt * sizeof (union value)
546 + 4 * sizeof (void *));
547 max_cases = get_max_workspace() / approx_case_cost;
548 irs->records = malloc (sizeof *irs->records * max_cases);
549 for (i = 0; i < max_cases; i++)
550 if (!case_try_create (&irs->records[i].record, irs->xsrt->value_cnt))
555 irs->record_cap = max_cases;
557 /* Fail if we didn't allocate an acceptable number of cases. */
558 if (irs->records == NULL || max_cases < MIN_BUFFER_TOTAL_SIZE_RECS)
/* NOTE(review): all three conversions in the message are %d. The
   return type of get_max_workspace is not visible here; confirm that
   the last argument really is an int. */
560 msg (SE, _("Out of memory. Could not allocate room for minimum of %d "
561 "cases of %d bytes each. (PSPP workspace is currently "
562 "restricted to a maximum of %d KB.)"),
563 MIN_BUFFER_TOTAL_SIZE_RECS, approx_case_cost, get_max_workspace() / 1024);
569 /* Compares cases A and B on the variables listed in CRITERIA,
570 taking the criteria in array order, and returns a strcmp()-type result. */
572 compare_record (const struct ccase *a, const struct ccase *b,
573 const struct sort_criteria *criteria)
580 for (i = 0; i < criteria->crit_cnt; i++)
582 const struct sort_criterion *c = &criteria->crits[i];
/* Numeric comparison. The test that chooses between this and the
   string comparison is missing from the listing; presumably it looks
   at c->width, which is 0 for numeric variables. */
587 double af = case_num (a, c->fv);
588 double bf = case_num (b, c->fv);
590 result = af < bf ? -1 : af > bf;
/* String comparison over the full width of the variable. */
593 result = memcmp (case_str (a, c->fv), case_str (b, c->fv), c->width);
/* A descending criterion reverses the sign of the result. */
596 return c->dir == SRT_ASCEND ? result : -result;
602 /* Compares record-run tuples A and B on run number first, then
603 on the current record according to the sort criteria reached
   through IRS. Returns a strcmp()-type result. */
605 compare_record_run (const struct record_run *a,
606 const struct record_run *b,
607 struct initial_run_state *irs)
/* Records of an earlier run always sort before those of a later run. */
609 if (a->run != b->run)
610 return a->run > b->run ? 1 : -1;
/* Same run: fall back to comparing the case data. */
612 return compare_record (&a->record, &b->record, irs->xsrt->criteria);
615 /* Compares record-run tuples A and B on run number first, then
616 on the current record according to the sort criteria reached through IRS, but in descending order. */
619 compare_record_run_minheap (const void *a, const void *b, void *irs)
/* Negating the result reverses the ordering. process_case and
   output_record pass this function to push_heap and pop_heap, and
   output_record expects pop_heap to hand back the minimum record. */
621 return -compare_record_run (a, b, irs);
624 /* Begins a new initial run, specifically its output file. */
626 start_run (struct initial_run_state *irs)
/* The output file for the run is created and sent straight to disk. */
630 irs->casefile = casefile_create (irs->xsrt->value_cnt);
631 casefile_to_disk (irs->casefile);
/* Nothing has been written to this run yet, so there is no last
   record for process_case to compare against. */
632 case_nullify (&irs->last_output);
635 /* Ends the current initial run. If a run file is open, it is added
   to the run array of the external sort and detached from IRS. */
637 end_run (struct initial_run_state *irs)
639 struct external_sort *xsrt = irs->xsrt;
641 /* Record initial run. */
642 if (irs->casefile != NULL)
/* Grow the run array when it is full. The line that computes the new
   run_cap is missing from this listing. */
644 if (xsrt->run_cnt >= xsrt->run_cap)
648 = xrealloc (xsrt->initial_runs,
649 sizeof *xsrt->initial_runs * xsrt->run_cap);
/* The run array now refers to the file, so IRS lets go of it. */
651 xsrt->initial_runs[xsrt->run_cnt++] = irs->casefile;
652 irs->casefile = NULL;
656 /* Writes a record to the current initial run. The minimum record is
   taken off the reservoir heap, appended to the output file of the
   run, and kept as last_output. */
658 output_record (struct initial_run_state *irs)
660 struct record_run *record_run;
661 struct ccase case_tmp;
663 /* Extract minimum case from heap. */
664 assert (irs->record_cnt > 0);
665 pop_heap (irs->records, irs->record_cnt--, sizeof *irs->records,
666 compare_record_run_minheap, irs);
/* After the decrement, index record_cnt is the slot just past the
   heap, which is where the extracted record is read from. */
667 record_run = irs->records + irs->record_cnt;
669 /* Bail if an error has occurred. */
673 /* Start new run if necessary. */
/* The extracted record belongs either to the current run or to the
   one right after it. */
674 assert (record_run->run == irs->run
675 || record_run->run == irs->run + 1);
676 if (record_run->run != irs->run)
681 assert (record_run->run == irs->run);
685 if (irs->casefile != NULL)
686 casefile_append (irs->casefile, &record_run->record);
688 /* This record becomes last_output. */
/* The case just written is exchanged with the one in the spare last
   slot: last_output and the spare slot now refer to the written case,
   and the vacated heap slot receives the case that the spare slot
   held before. */
689 irs->last_output = case_tmp = record_run->record;
690 record_run->record = irs->records[irs->record_cap - 1].record;
691 irs->records[irs->record_cap - 1].record = case_tmp;
696 /* State of merging initial runs. These are the members of the type
   used below as struct merge_state. */
699 struct external_sort *xsrt; /* External sort state. */
700 struct ccase *cases; /* Buffers. */
701 size_t case_cnt; /* Number of buffers. */
/* merge_once merges one group of runs into a new casefile; mod is the
   modulo helper that merge uses to count dummy runs. */
705 static struct casefile *merge_once (struct merge_state *,
706 struct casefile *[], size_t);
707 static int mod (int, int);
709 /* Performs a series of P-way merges of initial runs. On success
   exactly one run is left in the run array of XSRT. Declared above as
   returning int; do_external_sort treats a nonzero result as success. */
711 merge (struct external_sort *xsrt)
713 struct merge_state mrg; /* State of merge. */
714 size_t approx_case_cost; /* Approximate memory cost of one case. */
715 int max_order; /* Maximum order of merge. */
716 size_t dummy_run_cnt; /* Number of dummy runs to insert. */
722 /* Allocate as many cases as possible into cases. */
723 approx_case_cost = (sizeof *mrg.cases
724 + xsrt->value_cnt * sizeof (union value)
725 + 4 * sizeof (void *));
726 mrg.case_cnt = get_max_workspace() / approx_case_cost;
727 mrg.cases = malloc (sizeof *mrg.cases * mrg.case_cnt);
728 if (mrg.cases == NULL)
730 for (i = 0; i < mrg.case_cnt; i++)
731 if (!case_try_create (&mrg.cases[i], xsrt->value_cnt))
/* NOTE(review): approx_case_cost is a size_t here, yet the message
   below formats it with %d; in allocate_cases the same value is an
   int. The argument type should match the conversion; confirm and fix
   in the complete source. */
736 if (mrg.case_cnt < MIN_BUFFER_TOTAL_SIZE_RECS)
738 msg (SE, _("Out of memory. Could not allocate room for minimum of %d "
739 "cases of %d bytes each. (PSPP workspace is currently "
740 "restricted to a maximum of %d KB.)"),
741 MIN_BUFFER_TOTAL_SIZE_RECS, approx_case_cost, get_max_workspace() / 1024);
745 /* Determine maximum order of merge. Treating case_cnt / max_order
   as the share of the buffers available to each input, the order is
   lowered when a share would hold fewer than MIN_BUFFER_SIZE_RECS
   records or fewer than MIN_BUFFER_SIZE_BYTES bytes of values.
   NOTE(review): the second adjustment divides by
   xsrt->value_cnt * sizeof (union value); a value_cnt of 0 would make
   the inner divisor zero. Confirm that value_cnt is always positive. */
746 max_order = MAX_MERGE_ORDER;
747 if (mrg.case_cnt / max_order < MIN_BUFFER_SIZE_RECS)
748 max_order = mrg.case_cnt / MIN_BUFFER_SIZE_RECS;
749 else if (mrg.case_cnt / max_order * xsrt->value_cnt * sizeof (union value)
750 < MIN_BUFFER_SIZE_BYTES)
751 max_order = mrg.case_cnt / (MIN_BUFFER_SIZE_BYTES
752 / (xsrt->value_cnt * sizeof (union value)));
/* A merge never needs more inputs than there are runs. */
755 if (max_order > xsrt->run_cnt)
756 max_order = xsrt->run_cnt;
/* NOTE(review): further down, max_order - 1 is handed to mod as the
   modulus before max_order > 0 is asserted, and it is 0 when only one
   run exists. Confirm that mod copes with a zero modulus. */
758 /* Repeatedly merge the P shortest existing runs until only one run
760 make_heap (xsrt->initial_runs, xsrt->run_cnt, sizeof *xsrt->initial_runs,
761 compare_initial_runs, NULL);
762 dummy_run_cnt = mod (1 - (int) xsrt->run_cnt, max_order - 1);
764 assert (max_order > 0);
765 assert (max_order <= 2
766 || (xsrt->run_cnt + dummy_run_cnt) % (max_order - 1) == 1);
767 while (xsrt->run_cnt > 1)
769 struct casefile *output_run;
773 /* Choose order of merge (max_order after first merge). */
774 order = max_order - dummy_run_cnt;
777 /* Choose runs to merge. */
/* Each pop_heap call shrinks the heap by one, so after ORDER calls the
   chosen runs occupy the ORDER slots starting at index run_cnt. */
778 assert (xsrt->run_cnt >= order);
779 for (i = 0; i < order; i++)
780 pop_heap (xsrt->initial_runs, xsrt->run_cnt--,
781 sizeof *xsrt->initial_runs,
782 compare_initial_runs, NULL);
/* Merge the chosen runs; a null result means the merge failed. */
785 output_run = merge_once (&mrg,
786 xsrt->initial_runs + xsrt->run_cnt, order);
787 if (output_run == NULL)
790 /* Add output run to heap. */
791 xsrt->initial_runs[xsrt->run_cnt++] = output_run;
792 push_heap (xsrt->initial_runs, xsrt->run_cnt, sizeof *xsrt->initial_runs,
793 compare_initial_runs, NULL);
796 /* Exactly one run is left, which contains the entire sorted
797 file. We could use it to find a total case count. */
798 assert (xsrt->run_cnt == 1);
/* Release the case buffers allocated at the top of the function. */
803 for (i = 0; i < mrg.case_cnt; i++)
804 case_destroy (&mrg.cases[i]);
810 /* Modulo function as defined by Knuth. Declared above as taking two
   ints and returning an int; merge uses it to compute the number of
   dummy runs. Only two branches survive in this listing, one for X and
   Y both positive and one for negative X with positive Y; the
   statements they control and the earlier branches are missing. */
818 else if (x > 0 && y > 0)
820 else if (x < 0 && y > 0)
826 /* Merges the RUN_CNT initial runs specified in INPUT_RUNS into a
827 new run. Returns the new run as a casefile, or a null pointer
828 on failure; the caller (merge) is what records the returned run
829 in MRG->xsrt->initial_runs. Always deletes all the input files. */
831 static struct casefile *
832 merge_once (struct merge_state *mrg,
833 struct casefile *input_runs[],
/* One reader and one current case per input run. The arrays are sized
   by MAX_MERGE_ORDER, and merge never asks for a higher order. */
836 struct casereader *input_readers[MAX_MERGE_ORDER];
837 struct ccase input_cases[MAX_MERGE_ORDER];
/* Stays null until the output file has been created, so that null is
   what the caller sees if that point is never reached. */
838 struct casefile *output_casefile = NULL;
/* Open each input run destructively and read its first case. */
841 for (i = 0; i < run_cnt; i++)
843 input_readers[i] = casefile_get_destructive_reader (input_runs[i]);
844 if (!casereader_read_xfer (input_readers[i], &input_cases[i]))
/* The merged output is written to a casefile kept on disk. */
851 output_casefile = casefile_create (mrg->xsrt->value_cnt);
852 casefile_to_disk (output_casefile);
/* Linear scan for the input whose current case sorts first. */
861 for (i = 1; i < run_cnt; i++)
862 if (compare_record (&input_cases[i], &input_cases[min_idx],
863 mrg->xsrt->criteria) < 0)
866 /* Write minimum to output file. */
867 casefile_append_xfer (output_casefile, &input_cases[min_idx]);
/* Refill from the same input. Once that input is exhausted, its reader
   and file are destroyed and the entry at index run_cnt is moved into
   the vacated slot. Presumably run_cnt is decremented on a line that
   is missing from this listing. */
869 if (!casereader_read_xfer (input_readers[min_idx],
870 &input_cases[min_idx]))
872 casereader_destroy (input_readers[min_idx]);
873 casefile_destroy (input_runs[min_idx]);
876 input_runs[min_idx] = input_runs[run_cnt];
877 input_readers[min_idx] = input_readers[run_cnt];
878 input_cases[min_idx] = input_cases[run_cnt];
/* The output file is put to sleep before it is handed back. */
882 casefile_sleep (output_casefile);
884 return output_casefile;