1 /* PSPP - computes sample statistics.
2 Copyright (C) 1997-9, 2000 Free Software Foundation, Inc.
3 Written by Ben Pfaff <blp@gnu.org>.
5 This program is free software; you can redistribute it and/or
6 modify it under the terms of the GNU General Public License as
7 published by the Free Software Foundation; either version 2 of the
8 License, or (at your option) any later version.
10 This program is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with this program; if not, write to the Free Software
17 Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA
02111-1307, USA. */
26 #include "algorithm.h"
46 #include <sys/types.h>
53 #include "debug-print.h"
/* Sort direction enumerators.  compare_record() negates its result
   for any criterion whose direction is not SRT_ASCEND. */
58 SRT_ASCEND, /* A, B, C, ..., X, Y, Z. */
59 SRT_DESCEND /* Z, Y, X, ..., C, B, A. */
62 /* A sort criterion: one key variable plus the direction in which
   it is ordered. */
65 int fv; /* Variable data index. */
66 int width; /* 0=numeric, otherwise string width. */
67 enum sort_direction dir; /* Sort direction. */
70 /* A set of sort criteria. */
73 struct sort_criterion *crits; /* Array of criteria, examined in index order by compare_record(). */
/* Forward declarations of helpers defined later in this file. */
77 static int compare_case_dblptrs (const void *, const void *, void *);
78 static int compare_record (const struct ccase *, const struct ccase *,
79 const struct sort_criteria *);
80 static struct casefile *do_internal_sort (struct casereader *,
81 const struct sort_criteria *);
82 static struct casefile *do_external_sort (struct casereader *,
83 const struct sort_criteria *);
85 /* Performs the SORT CASES procedure.
   Parses the sort criteria against default_dict, sorts the active
   file in place, and destroys the criteria.  Returns the result of
   lex_end_of_command() if the sort succeeded, CMD_FAILURE otherwise. */
89 struct sort_criteria *criteria;
94 criteria = sort_parse_criteria (default_dict, NULL, NULL);
98 success = sort_active_file_in_place (criteria);
99 sort_destroy_criteria (criteria);
100 return success ? lex_end_of_command () : CMD_FAILURE;
103 /* Gets ready to sort the active file, either in-place or to a
   separate casefile.  Frees and clears process_if_expr, runs
   procedure() with null arguments, and asserts that vfm_source is
   then of class storage_source_class. */
106 prepare_to_sort_active_file (void)
108 /* Cancel temporary transformations and PROCESS IF. */
111 expr_free (process_if_expr);
112 process_if_expr = NULL;
114 /* Make sure source cases are in a storage source. */
115 procedure (NULL, NULL);
116 assert (case_source_is_class (vfm_source, &storage_source_class));
119 /* Sorts the active file in-place according to CRITERIA.
   Returns nonzero if successful.

   The current casefile is handed to sort_execute() through a
   destructive reader, the old vfm_source is freed, and a new storage
   source is created around the sorted casefile. */
122 sort_active_file_in_place (const struct sort_criteria *criteria)
124 struct casefile *src, *dst;
126 prepare_to_sort_active_file ();
128 src = storage_source_get_casefile (vfm_source);
129 dst = sort_execute (casefile_get_destructive_reader (src), criteria);
130 free_case_source (vfm_source);
/* Install the sorted data as the new active-file source. */
136 vfm_source = storage_source_create (dst, default_dict);
140 /* Sorts the active file to a separate casefile. If successful,
   returns the sorted casefile. Returns a null pointer on
   failure.  The source is read through casefile_get_reader() rather
   than casefile_get_destructive_reader(), unlike the in-place
   variant. */
144 sort_active_file_to_casefile (const struct sort_criteria *criteria)
146 struct casefile *src;
148 prepare_to_sort_active_file ();
150 src = storage_source_get_casefile (vfm_source);
151 return sort_execute (casefile_get_reader (src), criteria);
154 /* Parses a list of sort keys and returns a newly allocated
   struct sort_criteria based on it. Returns a null pointer on error.

   DICT is passed to parse_variables() to resolve variable names.
   VARS and VAR_CNT must be both null or both non-null (asserted).
   Each group of variables may be followed by a parenthesized
   direction: D or DOWN for descending, A or UP for ascending. */
156 struct sort_criteria *
157 sort_parse_criteria (const struct dictionary *dict,
158 struct variable ***vars, int *var_cnt)
160 struct sort_criteria *criteria;
161 struct variable **local_vars = NULL;
162 size_t local_var_cnt;
164 assert ((vars == NULL) == (var_cnt == NULL));
/* NOTE(review): local_var_cnt is a size_t but VAR_CNT is an int *, so
   this assignment mixes pointer types -- confirm the intended type. */
168 var_cnt = &local_var_cnt;
171 criteria = xmalloc (sizeof *criteria);
172 criteria->crits = NULL;
173 criteria->crit_cnt = 0;
/* Remember how many variables were already parsed so that only the
   newly appended ones receive criteria below. */
180 int prev_var_cnt = *var_cnt;
181 enum sort_direction direction;
184 if (!parse_variables (dict, vars, var_cnt,
185 PV_NO_DUPLICATE | PV_APPEND | PV_NO_SCRATCH))
188 /* Sort direction. */
191 if (lex_match_id ("D") || lex_match_id ("DOWN"))
192 direction = SRT_DESCEND;
193 else if (lex_match_id ("A") || lex_match_id ("UP"))
194 direction = SRT_ASCEND;
197 msg (SE, _("`A' or `D' expected inside parentheses."));
200 if (!lex_match (')'))
202 msg (SE, _("`)' expected."));
207 direction = SRT_ASCEND;
/* Grow the criteria array to one entry per variable parsed so far. */
209 criteria->crits = xrealloc (criteria->crits,
210 sizeof *criteria->crits * *var_cnt);
211 criteria->crit_cnt = *var_cnt;
212 for (; prev_var_cnt < criteria->crit_cnt; prev_var_cnt++)
214 struct sort_criterion *c = &criteria->crits[prev_var_cnt];
215 c->fv = (*vars)[prev_var_cnt]->fv;
216 c->width = (*vars)[prev_var_cnt]->width;
/* Keep parsing key groups until the command terminator or a slash. */
220 while (token != '.' && token != '/');
227 sort_destroy_criteria (criteria);
231 /* Destroys a set of sort criteria, releasing the criterion array
   owned by CRITERIA.  A null CRITERIA is ignored. */
233 sort_destroy_criteria (struct sort_criteria *criteria)
235 if (criteria != NULL)
237 free (criteria->crits);
242 /* Reads all the cases from READER, which is destroyed. Sorts
   the cases according to CRITERIA. Returns the sorted cases in
   a newly created casefile.  do_internal_sort() is tried first;
   do_external_sort() is the alternative. */
246 sort_execute (struct casereader *reader, const struct sort_criteria *criteria)
248 struct casefile *output;
250 output = do_internal_sort (reader, criteria);
/* NOTE(review): the condition guarding this fallback is not visible
   here; presumably it runs only when the internal sort returned
   null -- confirm. */
252 output = do_external_sort (reader, criteria);
253 casereader_destroy (reader);
257 /* If the data is in memory, do an internal sort and return a new
   casefile for the data.  Cases are transferred out of READER into
   a temporary array, an array of pointers to them is sorted with
   compare_case_dblptrs(), and the cases are then transferred into
   the new casefile in sorted order. */
259 static struct casefile *
260 do_internal_sort (struct casereader *reader,
261 const struct sort_criteria *criteria)
263 const struct casefile *src;
264 struct casefile *dst;
265 struct ccase *cases, **case_ptrs;
266 unsigned long case_cnt;
268 src = casereader_get_casefile (reader);
/* More than one case and not held in core: not a candidate for the
   internal sort. */
269 if (casefile_get_case_cnt (src) > 1 && !casefile_in_core (src))
272 case_cnt = casefile_get_case_cnt (src);
273 cases = malloc (sizeof *cases * case_cnt);
274 case_ptrs = malloc (sizeof *case_ptrs * case_cnt);
/* Proceed if both arrays were obtained, or if there is nothing to
   sort (malloc may legitimately return null for a zero size). */
275 if ((cases != NULL && case_ptrs != NULL) || case_cnt == 0)
277 unsigned long case_idx;
279 for (case_idx = 0; case_idx < case_cnt; case_idx++)
281 int success = casereader_read_xfer (reader, &cases[case_idx]);
283 case_ptrs[case_idx] = &cases[case_idx];
/* Sort the pointers, not the cases themselves. */
286 sort (case_ptrs, case_cnt, sizeof *case_ptrs, compare_case_dblptrs,
289 dst = casefile_create (casefile_get_value_cnt (src));
290 for (case_idx = 0; case_idx < case_cnt; case_idx++)
291 casefile_append_xfer (dst, case_ptrs[case_idx]);
302 /* Compares the variables specified by CRITERIA between the cases
   at A and B, and returns a strcmp()-type result.  A_ and B_ point
   to `struct ccase *' array elements, so each is dereferenced once
   before being passed to compare_record().  CRITERIA_ is the
   struct sort_criteria to apply. */
305 compare_case_dblptrs (const void *a_, const void *b_, void *criteria_)
307 struct sort_criteria *criteria = criteria_;
308 struct ccase *const *pa = a_;
309 struct ccase *const *pb = b_;
310 struct ccase *a = *pa;
311 struct ccase *b = *pb;
313 return compare_record (a, b, criteria);
318 /* Maximum order of merge. If you increase this, then you should
319 use a heap for comparing cases during merge.  merge_once() sizes
   its reader and case arrays with this constant. */
320 #define MAX_MERGE_ORDER 7
322 /* Minimum total number of records for buffers. */
323 #define MIN_BUFFER_TOTAL_SIZE_RECS 64
325 /* Minimum single input buffer size, in bytes and records. */
326 #define MIN_BUFFER_SIZE_BYTES 4096
327 #define MIN_BUFFER_SIZE_RECS 16
/* Compile-time sanity check relating the per-buffer and total
   minimum record counts. */
329 #if MIN_BUFFER_SIZE_RECS * 2 + 16 > MIN_BUFFER_TOTAL_SIZE_RECS
330 #error MIN_BUFFER_SIZE_RECS and MIN_BUFFER_TOTAL_SIZE_RECS do not make sense.
333 /* Sorts initial runs A and B in decending order by length. */
335 compare_initial_runs (const void *a_, const void *b_, void *aux UNUSED)
337 const struct casefile *a = a_;
338 const struct casefile *b = b_;
339 unsigned long a_case_cnt = casefile_get_case_cnt (a);
340 unsigned long b_case_cnt = casefile_get_case_cnt (b);
342 return a_case_cnt > b_case_cnt ? -1 : a_case_cnt < b_case_cnt;
345 /* Results of an external sort.  initial_runs is grown by end_run()
   and reduced to a single run by merge(). */
348 const struct sort_criteria *criteria; /* Sort criteria. */
349 size_t value_cnt; /* Size of data in `union value's. */
350 struct casefile **initial_runs; /* Array of initial runs. */
351 size_t run_cnt, run_cap; /* Number of runs, allocated capacity. */
354 /* Prototypes for helper functions used by do_external_sort(). */
355 static int write_initial_runs (struct external_sort *, struct casereader *);
356 static int merge (struct external_sort *);
357 static void destroy_external_sort (struct external_sort *);
359 /* Performs an external sort of the cases in READER according to
   CRITERIA. Forms initial runs using a heap as a
   reservoir. Determines the optimum merge pattern via Huffman's
   method (see Knuth vol. 3, 2nd edition, p. 365-366), and merges
   according to that pattern.

   On success the single remaining run is detached from the sort
   state and becomes the result. */
364 static struct casefile *
365 do_external_sort (struct casereader *reader,
366 const struct sort_criteria *criteria)
368 struct external_sort *xsrt;
370 casefile_to_disk (casereader_get_casefile (reader));
372 xsrt = xmalloc (sizeof *xsrt);
373 xsrt->criteria = criteria;
374 xsrt->value_cnt = casefile_get_value_cnt (casereader_get_casefile (reader));
377 xsrt->initial_runs = xmalloc (sizeof *xsrt->initial_runs * xsrt->run_cap);
378 if (write_initial_runs (xsrt, reader) && merge (xsrt))
380 struct casefile *output = xsrt->initial_runs[0];
/* Clear the slot so destroy_external_sort() is not handed the
   output casefile. */
381 xsrt->initial_runs[0] = NULL;
382 destroy_external_sort (xsrt);
387 destroy_external_sort (xsrt);
/* Destroys XSRT: destroys each of its run_cnt initial runs and frees
   the array that held them. */
394 destroy_external_sort (struct external_sort *xsrt)
400 for (i = 0; i < xsrt->run_cnt; i++)
401 casefile_destroy (xsrt->initial_runs[i]);
402 free (xsrt->initial_runs);
407 /* Replacement selection. */
409 /* Pairs a record with a run number.  compare_record_run() orders
   these by run number first, then by record contents. */
412 int run; /* Run number of case. */
413 struct ccase record; /* Case data. */
416 /* Represents a set of initial runs during an external sort. */
417 struct initial_run_state
419 struct external_sort *xsrt; /* Sort that the runs belong to. */
422 struct record_run *records; /* Records arranged as a heap. */
423 size_t record_cnt; /* Current number of records. */
424 size_t record_cap; /* Capacity for records. */
426 /* Run currently being output. */
427 int run; /* Run number. */
428 size_t case_cnt; /* Number of cases so far. */
429 struct casefile *casefile; /* Output file. */
430 struct ccase last_output; /* Record last output. */
432 int okay; /* Zero if an error has been encountered. */
/* Forward declarations for the initial-run (replacement selection)
   helpers. */
435 static const struct case_sink_class sort_sink_class;
437 static void destroy_initial_run_state (struct initial_run_state *);
438 static void process_case (struct initial_run_state *, const struct ccase *);
439 static int allocate_cases (struct initial_run_state *);
440 static void output_record (struct initial_run_state *);
441 static void start_run (struct initial_run_state *);
442 static void end_run (struct initial_run_state *);
443 static int compare_record_run (const struct record_run *,
444 const struct record_run *,
445 struct initial_run_state *);
446 static int compare_record_run_minheap (const void *, const void *, void *);
448 /* Reads cases from READER and composes initial runs in XSRT.
   Each case read is fed to process_case(); once input is exhausted
   the records remaining in the reservoir are drained.  Both loops
   stop early if irs->okay becomes zero. */
450 write_initial_runs (struct external_sort *xsrt, struct casereader *reader)
452 struct initial_run_state *irs;
456 /* Allocate memory for cases. */
457 irs = xmalloc (sizeof *irs);
460 irs->record_cnt = irs->record_cap = 0;
463 irs->casefile = NULL;
464 case_nullify (&irs->last_output);
466 if (!allocate_cases (irs))
469 /* Create initial runs. */
471 for (; irs->okay && casereader_read (reader, &c); case_destroy (&c))
472 process_case (irs, &c);
473 while (irs->okay && irs->record_cnt > 0)
480 destroy_initial_run_state (irs);
485 /* Add a single case to an initial run.
   C is copied into the next free reservoir slot and tagged with the
   current run number, or with the next run's number when it compares
   less than the record last output.  The slot is then pushed onto
   the heap. */
487 process_case (struct initial_run_state *irs, const struct ccase *c)
489 struct record_run *new_record_run;
491 /* Compose record_run for this run and add to heap. */
/* The final slot, records[record_cap - 1], is never part of the heap;
   output_record() swaps the most recently output record into it. */
492 assert (irs->record_cnt < irs->record_cap - 1);
493 new_record_run = irs->records + irs->record_cnt++;
494 case_copy (&new_record_run->record, 0, c, 0, irs->xsrt->value_cnt);
495 new_record_run->run = irs->run;
496 if (!case_is_null (&irs->last_output)
497 && compare_record (c, &irs->last_output, irs->xsrt->criteria) < 0)
498 new_record_run->run = irs->run + 1;
499 push_heap (irs->records, irs->record_cnt, sizeof *irs->records,
500 compare_record_run_minheap, irs);
502 /* Output a record if the reservoir is full. */
503 if (irs->record_cnt == irs->record_cap - 1 && irs->okay)
507 /* Destroys the initial run state represented by IRS.
   Destroys all record_cap reservoir cases and, if a run's output
   casefile is still attached, calls casefile_sleep() on it. */
509 destroy_initial_run_state (struct initial_run_state *irs)
516 for (i = 0; i < irs->record_cap; i++)
517 case_destroy (&irs->records[i].record);
520 if (irs->casefile != NULL)
521 casefile_sleep (irs->casefile);
526 /* Allocates room for lots of cases as a buffer. */
528 allocate_cases (struct initial_run_state *irs)
530 int approx_case_cost; /* Approximate memory cost of one case in bytes. */
531 int max_cases; /* Maximum number of cases to allocate. */
534 /* Allocate as many cases as we can within the workspace
536 approx_case_cost = (sizeof *irs->records
537 + irs->xsrt->value_cnt * sizeof (union value)
538 + 4 * sizeof (void *));
539 max_cases = get_max_workspace() / approx_case_cost;
540 irs->records = malloc (sizeof *irs->records * max_cases);
541 for (i = 0; i < max_cases; i++)
542 if (!case_try_create (&irs->records[i].record, irs->xsrt->value_cnt))
547 irs->record_cap = max_cases;
549 /* Fail if we didn't allocate an acceptable number of cases. */
550 if (irs->records == NULL || max_cases < MIN_BUFFER_TOTAL_SIZE_RECS)
552 msg (SE, _("Out of memory. Could not allocate room for minimum of %d "
553 "cases of %d bytes each. (PSPP workspace is currently "
554 "restricted to a maximum of %d KB.)"),
555 MIN_BUFFER_TOTAL_SIZE_RECS, approx_case_cost, get_max_workspace() / 1024);
561 /* Compares the variables named by CRITERIA between cases A and B,
   and returns a strcmp()-type result.  Criteria are examined in
   index order.  A criterion with zero width is compared as a number
   via case_num(); otherwise its first WIDTH bytes are compared with
   memcmp().  The result is negated for any criterion whose direction
   is not SRT_ASCEND. */
564 compare_record (const struct ccase *a, const struct ccase *b,
565 const struct sort_criteria *criteria)
572 for (i = 0; i < criteria->crit_cnt; i++)
574 const struct sort_criterion *c = &criteria->crits[i];
579 double af = case_num (a, c->fv);
580 double bf = case_num (b, c->fv);
582 result = af < bf ? -1 : af > bf;
585 result = memcmp (case_str (a, c->fv), case_str (b, c->fv), c->width);
588 return c->dir == SRT_ASCEND ? result : -result;
594 /* Compares record-run tuples A and B on run number first, then
   on the current record according to the sort criteria reached
   through IRS.  Returns a strcmp()-type result. */
597 compare_record_run (const struct record_run *a,
598 const struct record_run *b,
599 struct initial_run_state *irs)
601 if (a->run != b->run)
602 return a->run > b->run ? 1 : -1;
604 return compare_record (&a->record, &b->record, irs->xsrt->criteria);
607 /* Compares record-run tuples A and B on run number first, then
   on the current record according to the sort criteria reached
   through IRS, but in descending order: the result of
   compare_record_run() is negated.  This is the comparison callback
   handed to push_heap() and pop_heap() for the reservoir. */
611 compare_record_run_minheap (const void *a, const void *b, void *irs)
613 return -compare_record_run (a, b, irs);
616 /* Begins a new initial run, specifically its output file.
   Creates a casefile sized for the sort's value count, moves it to
   disk, and resets last_output to the null case. */
618 start_run (struct initial_run_state *irs)
622 irs->casefile = casefile_create (irs->xsrt->value_cnt);
623 casefile_to_disk (irs->casefile);
624 case_nullify (&irs->last_output);
627 /* Ends the current initial run.  If the run has an output casefile
   it is appended to XSRT's array of initial runs, which is enlarged
   when full, and detached from IRS. */
629 end_run (struct initial_run_state *irs)
631 struct external_sort *xsrt = irs->xsrt;
633 /* Record initial run. */
634 if (irs->casefile != NULL)
636 if (xsrt->run_cnt >= xsrt->run_cap)
640 = xrealloc (xsrt->initial_runs,
641 sizeof *xsrt->initial_runs * xsrt->run_cap);
643 xsrt->initial_runs[xsrt->run_cnt++] = irs->casefile;
644 irs->casefile = NULL;
648 /* Writes a record to the current initial run.
   Pops the minimum record off the heap, appends it to the run's
   casefile, and remembers it as last_output so that process_case()
   can decide which run later cases belong to. */
650 output_record (struct initial_run_state *irs)
652 struct record_run *record_run;
653 struct ccase case_tmp;
655 /* Extract minimum case from heap. */
656 assert (irs->record_cnt > 0);
657 pop_heap (irs->records, irs->record_cnt--, sizeof *irs->records,
658 compare_record_run_minheap, irs);
/* The popped record now sits just past the end of the shrunken heap. */
659 record_run = irs->records + irs->record_cnt;
661 /* Bail if an error has occurred. */
665 /* Start new run if necessary. */
666 assert (record_run->run == irs->run
667 || record_run->run == irs->run + 1);
668 if (record_run->run != irs->run)
673 assert (record_run->run == irs->run);
677 if (irs->casefile != NULL)
678 casefile_append (irs->casefile, &record_run->record);
680 /* This record becomes last_output. */
/* Swap the output record with the one in the reserved final slot, so
   the vacated heap slot holds a different case than last_output. */
681 irs->last_output = case_tmp = record_run->record;
682 record_run->record = irs->records[irs->record_cap - 1].record;
683 irs->records[irs->record_cap - 1].record = case_tmp;
688 /* State of merging initial runs.  The buffers are allocated and
   destroyed by merge(). */
691 struct external_sort *xsrt; /* External sort state. */
692 struct ccase *cases; /* Buffers. */
693 size_t case_cnt; /* Number of buffers. */
/* Forward declarations of the merge helpers. */
697 static struct casefile *merge_once (struct merge_state *,
698 struct casefile *[], size_t);
699 static int mod (int, int);
701 /* Performs a series of P-way merges of initial runs.
   Allocates case buffers within the workspace limit, derives the
   merge order from the buffer count and the MIN_BUFFER_* limits, then
   repeatedly pops runs off a heap of XSRT's initial runs, merges them
   with merge_once(), and pushes the result back until one run
   remains. */
703 merge (struct external_sort *xsrt)
705 struct merge_state mrg; /* State of merge. */
706 size_t approx_case_cost; /* Approximate memory cost of one case. */
707 int max_order; /* Maximum order of merge. */
708 size_t dummy_run_cnt; /* Number of dummy runs to insert. */
714 /* Allocate as many cases as possible into cases. */
715 approx_case_cost = (sizeof *mrg.cases
716 + xsrt->value_cnt * sizeof (union value)
717 + 4 * sizeof (void *));
718 mrg.case_cnt = get_max_workspace() / approx_case_cost;
719 mrg.cases = malloc (sizeof *mrg.cases * mrg.case_cnt);
720 if (mrg.cases == NULL)
722 for (i = 0; i < mrg.case_cnt; i++)
723 if (!case_try_create (&mrg.cases[i], xsrt->value_cnt))
728 if (mrg.case_cnt < MIN_BUFFER_TOTAL_SIZE_RECS)
/* NOTE(review): approx_case_cost is a size_t but is formatted with %d
   below -- confirm and cast or change the conversion. */
730 msg (SE, _("Out of memory. Could not allocate room for minimum of %d "
731 "cases of %d bytes each. (PSPP workspace is currently "
732 "restricted to a maximum of %d KB.)"),
733 MIN_BUFFER_TOTAL_SIZE_RECS, approx_case_cost, get_max_workspace() / 1024);
737 /* Determine maximum order of merge. */
738 max_order = MAX_MERGE_ORDER;
739 if (mrg.case_cnt / max_order < MIN_BUFFER_SIZE_RECS)
740 max_order = mrg.case_cnt / MIN_BUFFER_SIZE_RECS;
741 else if (mrg.case_cnt / max_order * xsrt->value_cnt * sizeof (union value)
742 < MIN_BUFFER_SIZE_BYTES)
743 max_order = mrg.case_cnt / (MIN_BUFFER_SIZE_BYTES
744 / (xsrt->value_cnt * sizeof (union value)));
/* Never merge more runs at once than actually exist. */
747 if (max_order > xsrt->run_cnt)
748 max_order = xsrt->run_cnt;
750 /* Repeatedly merge the P shortest existing runs until only one run
   is left. */
752 make_heap (xsrt->initial_runs, xsrt->run_cnt, sizeof *xsrt->initial_runs,
753 compare_initial_runs, NULL);
/* Number of dummy runs by which the first merge's order is reduced. */
754 dummy_run_cnt = mod (1 - (int) xsrt->run_cnt, max_order - 1);
755 assert (max_order == 2
756 || (xsrt->run_cnt + dummy_run_cnt) % (max_order - 1) == 1);
757 while (xsrt->run_cnt > 1)
759 struct casefile *output_run;
763 /* Choose order of merge (max_order after first merge). */
764 order = max_order - dummy_run_cnt;
767 /* Choose runs to merge. */
768 assert (xsrt->run_cnt >= order);
769 for (i = 0; i < order; i++)
770 pop_heap (xsrt->initial_runs, xsrt->run_cnt--,
771 sizeof *xsrt->initial_runs,
772 compare_initial_runs, NULL);
/* The popped runs now occupy the ORDER slots just past the heap. */
775 output_run = merge_once (&mrg,
776 xsrt->initial_runs + xsrt->run_cnt, order);
777 if (output_run == NULL)
780 /* Add output run to heap. */
781 xsrt->initial_runs[xsrt->run_cnt++] = output_run;
782 push_heap (xsrt->initial_runs, xsrt->run_cnt, sizeof *xsrt->initial_runs,
783 compare_initial_runs, NULL);
786 /* Exactly one run is left, which contains the entire sorted
787 file. We could use it to find a total case count. */
788 assert (xsrt->run_cnt == 1);
/* Release the merge buffers. */
793 for (i = 0; i < mrg.case_cnt; i++)
794 case_destroy (&mrg.cases[i]);
800 /* Modulo function as defined by Knuth.  The result is chosen by
   cases on the signs of X and Y; merge() calls it with a
   non-positive X to count dummy runs. */
808 else if (x > 0 && y > 0)
810 else if (x < 0 && y > 0)
816 /* Merges the RUN_CNT initial runs specified in INPUT_RUNS into a
817 new run. Returns nonzero only if successful. Adds an entry
818 to MRG->xsrt->runs for the output file if and only if the
819 output file is actually created. Always deletes all the input
821 static struct casefile *
822 merge_once (struct merge_state *mrg,
823 struct casefile *input_runs[],
826 struct casereader *input_readers[MAX_MERGE_ORDER];
827 struct ccase input_cases[MAX_MERGE_ORDER];
828 struct casefile *output_casefile = NULL;
831 for (i = 0; i < run_cnt; i++)
833 input_readers[i] = casefile_get_reader (input_runs[i]);
834 if (!casereader_read_xfer (input_readers[i], &input_cases[i]))
841 output_casefile = casefile_create (mrg->xsrt->value_cnt);
842 casefile_to_disk (output_casefile);
851 for (i = 1; i < run_cnt; i++)
852 if (compare_record (&input_cases[i], &input_cases[min_idx],
853 mrg->xsrt->criteria) < 0)
856 /* Write minimum to output file. */
857 casefile_append_xfer (output_casefile, &input_cases[i]);
859 if (!casereader_read_xfer (input_readers[i], &input_cases[i]))
861 casereader_destroy (input_readers[i]);
862 casefile_destroy (input_runs[i]);
865 input_runs[i] = input_runs[run_cnt--];
866 input_readers[i] = input_readers[run_cnt--];
867 input_cases[i] = input_cases[run_cnt--];
871 casefile_sleep (output_casefile);
873 return output_casefile;