1 /* PSPP - computes sample statistics.
2 Copyright (C) 1997-9, 2000 Free Software Foundation, Inc.
3 Written by Ben Pfaff <blp@gnu.org>.
5 This program is free software; you can redistribute it and/or
6 modify it under the terms of the GNU General Public License as
7 published by the Free Software Foundation; either version 2 of the
8 License, or (at your option) any later version.
10 This program is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with this program; if not, write to the Free Software
17 Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA
26 #include "algorithm.h"
33 #include "expressions/public.h"
47 #include <sys/types.h>
54 #include "debug-print.h"
59 SRT_ASCEND, /* A, B, C, ..., X, Y, Z. */
60 SRT_DESCEND /* Z, Y, X, ..., C, B, A. */
63 /* A sort criterion. */
66 int fv; /* Variable data index. */
67 int width; /* 0=numeric, otherwise string widthe. */
68 enum sort_direction dir; /* Sort direction. */
71 /* A set of sort criteria. */
74 struct sort_criterion *crits;
78 static int compare_record (const struct ccase *, const struct ccase *,
79 const struct sort_criteria *);
80 static struct casefile *do_internal_sort (struct casereader *,
81 const struct sort_criteria *);
82 static struct casefile *do_external_sort (struct casereader *,
83 const struct sort_criteria *);
85 /* Performs the SORT CASES procedures. */
89 struct sort_criteria *criteria;
94 criteria = sort_parse_criteria (default_dict, NULL, NULL, NULL);
98 success = sort_active_file_in_place (criteria);
99 sort_destroy_criteria (criteria);
100 return success ? lex_end_of_command () : CMD_FAILURE;
103 /* Gets ready to sort the active file, either in-place or to a
104 separate casefile. */
106 prepare_to_sort_active_file (void)
108 /* Cancel temporary transformations and PROCESS IF. */
111 expr_free (process_if_expr);
112 process_if_expr = NULL;
114 /* Make sure source cases are in a storage source. */
115 procedure (NULL, NULL);
116 assert (case_source_is_class (vfm_source, &storage_source_class));
119 /* Sorts the active file in-place according to CRITERIA.
120 Returns nonzero if successful. */
122 sort_active_file_in_place (const struct sort_criteria *criteria)
124 struct casefile *src, *dst;
126 prepare_to_sort_active_file ();
128 src = storage_source_get_casefile (vfm_source);
129 dst = sort_execute (casefile_get_destructive_reader (src), criteria);
130 free_case_source (vfm_source);
136 vfm_source = storage_source_create (dst);
140 /* Sorts the active file to a separate casefile. If successful,
141 returns the sorted casefile. Returns a null pointer on
144 sort_active_file_to_casefile (const struct sort_criteria *criteria)
146 struct casefile *src;
148 prepare_to_sort_active_file ();
150 src = storage_source_get_casefile (vfm_source);
151 return sort_execute (casefile_get_reader (src), criteria);
154 /* Parses a list of sort keys and returns a struct sort_criteria
155 based on it. Returns a null pointer on error.
156 If SAW_DIRECTION is nonnull, sets *SAW_DIRECTION to true if at
157 least one parenthesized sort direction was specified, false
159 struct sort_criteria *
160 sort_parse_criteria (const struct dictionary *dict,
161 struct variable ***vars, int *var_cnt,
164 struct sort_criteria *criteria;
165 struct variable **local_vars = NULL;
166 size_t local_var_cnt;
168 assert ((vars == NULL) == (var_cnt == NULL));
172 var_cnt = &local_var_cnt;
175 criteria = xmalloc (sizeof *criteria);
176 criteria->crits = NULL;
177 criteria->crit_cnt = 0;
181 if (saw_direction != NULL)
182 *saw_direction = false;
186 int prev_var_cnt = *var_cnt;
187 enum sort_direction direction;
190 if (!parse_variables (dict, vars, var_cnt,
191 PV_NO_DUPLICATE | PV_APPEND | PV_NO_SCRATCH))
194 /* Sort direction. */
197 if (lex_match_id ("D") || lex_match_id ("DOWN"))
198 direction = SRT_DESCEND;
199 else if (lex_match_id ("A") || lex_match_id ("UP"))
200 direction = SRT_ASCEND;
203 msg (SE, _("`A' or `D' expected inside parentheses."));
206 if (!lex_match (')'))
208 msg (SE, _("`)' expected."));
211 *saw_direction = true;
214 direction = SRT_ASCEND;
216 criteria->crits = xrealloc (criteria->crits,
217 sizeof *criteria->crits * *var_cnt);
218 criteria->crit_cnt = *var_cnt;
219 for (; prev_var_cnt < criteria->crit_cnt; prev_var_cnt++)
221 struct sort_criterion *c = &criteria->crits[prev_var_cnt];
222 c->fv = (*vars)[prev_var_cnt]->fv;
223 c->width = (*vars)[prev_var_cnt]->width;
227 while (token != '.' && token != '/');
234 sort_destroy_criteria (criteria);
238 /* Destroys a SORT CASES program. */
240 sort_destroy_criteria (struct sort_criteria *criteria)
242 if (criteria != NULL)
244 free (criteria->crits);
249 /* Reads all the cases from READER, which is destroyed. Sorts
250 the cases according to CRITERIA. Returns the sorted cases in
251 a newly created casefile. */
253 sort_execute (struct casereader *reader, const struct sort_criteria *criteria)
255 struct casefile *output;
257 output = do_internal_sort (reader, criteria);
259 output = do_external_sort (reader, criteria);
260 casereader_destroy (reader);
264 /* A case and its index. */
267 struct ccase c; /* Case. */
268 unsigned long idx; /* Index to allow for stable sorting. */
271 static int compare_indexed_cases (const void *, const void *, void *);
273 /* If the data is in memory, do an internal sort and return a new
274 casefile for the data. */
275 static struct casefile *
276 do_internal_sort (struct casereader *reader,
277 const struct sort_criteria *criteria)
279 const struct casefile *src;
280 struct casefile *dst;
281 unsigned long case_cnt;
283 src = casereader_get_casefile (reader);
284 if (casefile_get_case_cnt (src) > 1 && !casefile_in_core (src))
287 case_cnt = casefile_get_case_cnt (src);
288 dst = casefile_create (casefile_get_value_cnt (src));
291 struct indexed_case *cases = malloc (sizeof *cases * case_cnt);
296 for (i = 0; i < case_cnt; i++)
298 casereader_read_xfer_assert (reader, &cases[i].c);
302 sort (cases, case_cnt, sizeof *cases, compare_indexed_cases,
305 for (i = 0; i < case_cnt; i++)
306 casefile_append_xfer (dst, &cases[i].c);
313 casefile_destroy (dst);
321 /* Compares the variables specified by CRITERIA between the cases
322 at A and B, with a "last resort" comparison for stability, and
323 returns a strcmp()-type result. */
325 compare_indexed_cases (const void *a_, const void *b_, void *criteria_)
327 struct sort_criteria *criteria = criteria_;
328 const struct indexed_case *a = a_;
329 const struct indexed_case *b = b_;
330 int result = compare_record (&a->c, &b->c, criteria);
332 result = a->idx < b->idx ? -1 : a->idx > b->idx;
338 /* Maximum order of merge. If you increase this, then you should
339 use a heap for comparing cases during merge. */
340 #define MAX_MERGE_ORDER 7
342 /* Minimum total number of records for buffers. */
343 #define MIN_BUFFER_TOTAL_SIZE_RECS 64
345 /* Minimum single input buffer size, in bytes and records. */
346 #define MIN_BUFFER_SIZE_BYTES 4096
347 #define MIN_BUFFER_SIZE_RECS 16
349 #if MIN_BUFFER_SIZE_RECS * 2 + 16 > MIN_BUFFER_TOTAL_SIZE_RECS
350 #error MIN_BUFFER_SIZE_RECS and MIN_BUFFER_TOTAL_SIZE_RECS do not make sense.
353 /* Sorts initial runs A and B in decending order by length. */
355 compare_initial_runs (const void *a_, const void *b_, void *aux UNUSED)
357 struct casefile *const *a = a_;
358 struct casefile *const *b = b_;
359 unsigned long a_case_cnt = casefile_get_case_cnt (*a);
360 unsigned long b_case_cnt = casefile_get_case_cnt (*b);
362 return a_case_cnt > b_case_cnt ? -1 : a_case_cnt < b_case_cnt;
365 /* Results of an external sort. */
368 const struct sort_criteria *criteria; /* Sort criteria. */
369 size_t value_cnt; /* Size of data in `union value's. */
370 struct casefile **initial_runs; /* Array of initial runs. */
371 size_t run_cnt, run_cap; /* Number of runs, allocated capacity. */
374 /* Prototypes for helper functions. */
375 static int write_initial_runs (struct external_sort *, struct casereader *);
376 static int merge (struct external_sort *);
377 static void destroy_external_sort (struct external_sort *);
379 /* Performs an external sort of the active file according to the
380 specification in SCP. Forms initial runs using a heap as a
381 reservoir. Determines the optimum merge pattern via Huffman's
382 method (see Knuth vol. 3, 2nd edition, p. 365-366), and merges
383 according to that pattern. */
384 static struct casefile *
385 do_external_sort (struct casereader *reader,
386 const struct sort_criteria *criteria)
388 struct external_sort *xsrt;
390 casefile_to_disk (casereader_get_casefile (reader));
392 xsrt = xmalloc (sizeof *xsrt);
393 xsrt->criteria = criteria;
394 xsrt->value_cnt = casefile_get_value_cnt (casereader_get_casefile (reader));
397 xsrt->initial_runs = xmalloc (sizeof *xsrt->initial_runs * xsrt->run_cap);
398 if (write_initial_runs (xsrt, reader) && merge (xsrt))
400 struct casefile *output = xsrt->initial_runs[0];
401 xsrt->initial_runs[0] = NULL;
402 destroy_external_sort (xsrt);
407 destroy_external_sort (xsrt);
414 destroy_external_sort (struct external_sort *xsrt)
420 for (i = 0; i < xsrt->run_cnt; i++)
421 casefile_destroy (xsrt->initial_runs[i]);
422 free (xsrt->initial_runs);
427 /* Replacement selection. */
429 /* Pairs a record with a run number. */
432 int run; /* Run number of case. */
433 struct ccase record; /* Case data. */
436 /* Represents a set of initial runs during an external sort. */
437 struct initial_run_state
439 struct external_sort *xsrt;
442 struct record_run *records; /* Records arranged as a heap. */
443 size_t record_cnt; /* Current number of records. */
444 size_t record_cap; /* Capacity for records. */
446 /* Run currently being output. */
447 int run; /* Run number. */
448 size_t case_cnt; /* Number of cases so far. */
449 struct casefile *casefile; /* Output file. */
450 struct ccase last_output; /* Record last output. */
452 int okay; /* Zero if an error has been encountered. */
455 static const struct case_sink_class sort_sink_class;
457 static void destroy_initial_run_state (struct initial_run_state *);
458 static void process_case (struct initial_run_state *, const struct ccase *);
459 static int allocate_cases (struct initial_run_state *);
460 static void output_record (struct initial_run_state *);
461 static void start_run (struct initial_run_state *);
462 static void end_run (struct initial_run_state *);
463 static int compare_record_run (const struct record_run *,
464 const struct record_run *,
465 struct initial_run_state *);
466 static int compare_record_run_minheap (const void *, const void *, void *);
468 /* Reads cases from READER and composes initial runs in XSRT. */
470 write_initial_runs (struct external_sort *xsrt, struct casereader *reader)
472 struct initial_run_state *irs;
476 /* Allocate memory for cases. */
477 irs = xmalloc (sizeof *irs);
480 irs->record_cnt = irs->record_cap = 0;
483 irs->casefile = NULL;
484 case_nullify (&irs->last_output);
486 if (!allocate_cases (irs))
489 /* Create initial runs. */
491 for (; irs->okay && casereader_read (reader, &c); case_destroy (&c))
492 process_case (irs, &c);
493 while (irs->okay && irs->record_cnt > 0)
500 destroy_initial_run_state (irs);
505 /* Add a single case to an initial run. */
507 process_case (struct initial_run_state *irs, const struct ccase *c)
509 struct record_run *new_record_run;
511 /* Compose record_run for this run and add to heap. */
512 assert (irs->record_cnt < irs->record_cap - 1);
513 new_record_run = irs->records + irs->record_cnt++;
514 case_copy (&new_record_run->record, 0, c, 0, irs->xsrt->value_cnt);
515 new_record_run->run = irs->run;
516 if (!case_is_null (&irs->last_output)
517 && compare_record (c, &irs->last_output, irs->xsrt->criteria) < 0)
518 new_record_run->run = irs->run + 1;
519 push_heap (irs->records, irs->record_cnt, sizeof *irs->records,
520 compare_record_run_minheap, irs);
522 /* Output a record if the reservoir is full. */
523 if (irs->record_cnt == irs->record_cap - 1 && irs->okay)
527 /* Destroys the initial run state represented by IRS. */
529 destroy_initial_run_state (struct initial_run_state *irs)
536 for (i = 0; i < irs->record_cap; i++)
537 case_destroy (&irs->records[i].record);
540 if (irs->casefile != NULL)
541 casefile_sleep (irs->casefile);
546 /* Allocates room for lots of cases as a buffer. */
548 allocate_cases (struct initial_run_state *irs)
550 int approx_case_cost; /* Approximate memory cost of one case in bytes. */
551 int max_cases; /* Maximum number of cases to allocate. */
554 /* Allocate as many cases as we can within the workspace
556 approx_case_cost = (sizeof *irs->records
557 + irs->xsrt->value_cnt * sizeof (union value)
558 + 4 * sizeof (void *));
559 max_cases = get_max_workspace() / approx_case_cost;
560 irs->records = malloc (sizeof *irs->records * max_cases);
561 for (i = 0; i < max_cases; i++)
562 if (!case_try_create (&irs->records[i].record, irs->xsrt->value_cnt))
567 irs->record_cap = max_cases;
569 /* Fail if we didn't allocate an acceptable number of cases. */
570 if (irs->records == NULL || max_cases < MIN_BUFFER_TOTAL_SIZE_RECS)
572 msg (SE, _("Out of memory. Could not allocate room for minimum of %d "
573 "cases of %d bytes each. (PSPP workspace is currently "
574 "restricted to a maximum of %d KB.)"),
575 MIN_BUFFER_TOTAL_SIZE_RECS, approx_case_cost, get_max_workspace() / 1024);
581 /* Compares the VAR_CNT variables in VARS[] between the `value's at
582 A and B, and returns a strcmp()-type result. */
584 compare_record (const struct ccase *a, const struct ccase *b,
585 const struct sort_criteria *criteria)
592 for (i = 0; i < criteria->crit_cnt; i++)
594 const struct sort_criterion *c = &criteria->crits[i];
599 double af = case_num (a, c->fv);
600 double bf = case_num (b, c->fv);
602 result = af < bf ? -1 : af > bf;
605 result = memcmp (case_str (a, c->fv), case_str (b, c->fv), c->width);
608 return c->dir == SRT_ASCEND ? result : -result;
614 /* Compares record-run tuples A and B on run number first, then
615 on the current record according to SCP. */
617 compare_record_run (const struct record_run *a,
618 const struct record_run *b,
619 struct initial_run_state *irs)
621 if (a->run != b->run)
622 return a->run > b->run ? 1 : -1;
624 return compare_record (&a->record, &b->record, irs->xsrt->criteria);
627 /* Compares record-run tuples A and B on run number first, then
628 on the current record according to SCP, but in descending
631 compare_record_run_minheap (const void *a, const void *b, void *irs)
633 return -compare_record_run (a, b, irs);
636 /* Begins a new initial run, specifically its output file. */
638 start_run (struct initial_run_state *irs)
642 irs->casefile = casefile_create (irs->xsrt->value_cnt);
643 casefile_to_disk (irs->casefile);
644 case_nullify (&irs->last_output);
647 /* Ends the current initial run. */
649 end_run (struct initial_run_state *irs)
651 struct external_sort *xsrt = irs->xsrt;
653 /* Record initial run. */
654 if (irs->casefile != NULL)
656 if (xsrt->run_cnt >= xsrt->run_cap)
660 = xrealloc (xsrt->initial_runs,
661 sizeof *xsrt->initial_runs * xsrt->run_cap);
663 xsrt->initial_runs[xsrt->run_cnt++] = irs->casefile;
664 irs->casefile = NULL;
668 /* Writes a record to the current initial run. */
670 output_record (struct initial_run_state *irs)
672 struct record_run *record_run;
673 struct ccase case_tmp;
675 /* Extract minimum case from heap. */
676 assert (irs->record_cnt > 0);
677 pop_heap (irs->records, irs->record_cnt--, sizeof *irs->records,
678 compare_record_run_minheap, irs);
679 record_run = irs->records + irs->record_cnt;
681 /* Bail if an error has occurred. */
685 /* Start new run if necessary. */
686 assert (record_run->run == irs->run
687 || record_run->run == irs->run + 1);
688 if (record_run->run != irs->run)
693 assert (record_run->run == irs->run);
697 if (irs->casefile != NULL)
698 casefile_append (irs->casefile, &record_run->record);
700 /* This record becomes last_output. */
701 irs->last_output = case_tmp = record_run->record;
702 record_run->record = irs->records[irs->record_cap - 1].record;
703 irs->records[irs->record_cap - 1].record = case_tmp;
708 /* State of merging initial runs. */
711 struct external_sort *xsrt; /* External sort state. */
712 struct ccase *cases; /* Buffers. */
713 size_t case_cnt; /* Number of buffers. */
717 static struct casefile *merge_once (struct merge_state *,
718 struct casefile *[], size_t);
719 static int mod (int, int);
721 /* Performs a series of P-way merges of initial runs. */
723 merge (struct external_sort *xsrt)
725 struct merge_state mrg; /* State of merge. */
726 size_t approx_case_cost; /* Approximate memory cost of one case. */
727 int max_order; /* Maximum order of merge. */
728 size_t dummy_run_cnt; /* Number of dummy runs to insert. */
734 /* Allocate as many cases as possible into cases. */
735 approx_case_cost = (sizeof *mrg.cases
736 + xsrt->value_cnt * sizeof (union value)
737 + 4 * sizeof (void *));
738 mrg.case_cnt = get_max_workspace() / approx_case_cost;
739 mrg.cases = malloc (sizeof *mrg.cases * mrg.case_cnt);
740 if (mrg.cases == NULL)
742 for (i = 0; i < mrg.case_cnt; i++)
743 if (!case_try_create (&mrg.cases[i], xsrt->value_cnt))
748 if (mrg.case_cnt < MIN_BUFFER_TOTAL_SIZE_RECS)
750 msg (SE, _("Out of memory. Could not allocate room for minimum of %d "
751 "cases of %d bytes each. (PSPP workspace is currently "
752 "restricted to a maximum of %d KB.)"),
753 MIN_BUFFER_TOTAL_SIZE_RECS, approx_case_cost, get_max_workspace() / 1024);
757 /* Determine maximum order of merge. */
758 max_order = MAX_MERGE_ORDER;
759 if (mrg.case_cnt / max_order < MIN_BUFFER_SIZE_RECS)
760 max_order = mrg.case_cnt / MIN_BUFFER_SIZE_RECS;
761 else if (mrg.case_cnt / max_order * xsrt->value_cnt * sizeof (union value)
762 < MIN_BUFFER_SIZE_BYTES)
763 max_order = mrg.case_cnt / (MIN_BUFFER_SIZE_BYTES
764 / (xsrt->value_cnt * sizeof (union value)));
767 if (max_order > xsrt->run_cnt)
768 max_order = xsrt->run_cnt;
770 /* Repeatedly merge the P shortest existing runs until only one run
772 make_heap (xsrt->initial_runs, xsrt->run_cnt, sizeof *xsrt->initial_runs,
773 compare_initial_runs, NULL);
774 dummy_run_cnt = mod (1 - (int) xsrt->run_cnt, max_order - 1);
776 assert (max_order > 0);
777 assert (max_order <= 2
778 || (xsrt->run_cnt + dummy_run_cnt) % (max_order - 1) == 1);
779 while (xsrt->run_cnt > 1)
781 struct casefile *output_run;
785 /* Choose order of merge (max_order after first merge). */
786 order = max_order - dummy_run_cnt;
789 /* Choose runs to merge. */
790 assert (xsrt->run_cnt >= order);
791 for (i = 0; i < order; i++)
792 pop_heap (xsrt->initial_runs, xsrt->run_cnt--,
793 sizeof *xsrt->initial_runs,
794 compare_initial_runs, NULL);
797 output_run = merge_once (&mrg,
798 xsrt->initial_runs + xsrt->run_cnt, order);
799 if (output_run == NULL)
802 /* Add output run to heap. */
803 xsrt->initial_runs[xsrt->run_cnt++] = output_run;
804 push_heap (xsrt->initial_runs, xsrt->run_cnt, sizeof *xsrt->initial_runs,
805 compare_initial_runs, NULL);
808 /* Exactly one run is left, which contains the entire sorted
809 file. We could use it to find a total case count. */
810 assert (xsrt->run_cnt == 1);
815 for (i = 0; i < mrg.case_cnt; i++)
816 case_destroy (&mrg.cases[i]);
822 /* Modulo function as defined by Knuth. */
830 else if (x > 0 && y > 0)
832 else if (x < 0 && y > 0)
838 /* Merges the RUN_CNT initial runs specified in INPUT_RUNS into a
839 new run. Returns nonzero only if successful. Adds an entry
840 to MRG->xsrt->runs for the output file if and only if the
841 output file is actually created. Always deletes all the input
843 static struct casefile *
844 merge_once (struct merge_state *mrg,
845 struct casefile *input_runs[],
848 struct casereader *input_readers[MAX_MERGE_ORDER];
849 struct ccase input_cases[MAX_MERGE_ORDER];
850 struct casefile *output_casefile = NULL;
853 for (i = 0; i < run_cnt; i++)
855 input_readers[i] = casefile_get_destructive_reader (input_runs[i]);
856 if (!casereader_read_xfer (input_readers[i], &input_cases[i]))
863 output_casefile = casefile_create (mrg->xsrt->value_cnt);
864 casefile_to_disk (output_casefile);
873 for (i = 1; i < run_cnt; i++)
874 if (compare_record (&input_cases[i], &input_cases[min_idx],
875 mrg->xsrt->criteria) < 0)
878 /* Write minimum to output file. */
879 casefile_append_xfer (output_casefile, &input_cases[min_idx]);
881 if (!casereader_read_xfer (input_readers[min_idx],
882 &input_cases[min_idx]))
884 casereader_destroy (input_readers[min_idx]);
885 casefile_destroy (input_runs[min_idx]);
888 input_runs[min_idx] = input_runs[run_cnt];
889 input_readers[min_idx] = input_readers[run_cnt];
890 input_cases[min_idx] = input_cases[run_cnt];
894 casefile_sleep (output_casefile);
896 return output_casefile;