1 /* PSPP - computes sample statistics.
2 Copyright (C) 1997-9, 2000 Free Software Foundation, Inc.
3 Written by Ben Pfaff <blp@gnu.org>.
5 This program is free software; you can redistribute it and/or
6 modify it under the terms of the GNU General Public License as
7 published by the Free Software Foundation; either version 2 of the
8 License, or (at your option) any later version.
10 This program is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with this program; if not, write to the Free Software
17 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
38 #include "procedure.h"
41 #define _(msgid) gettext (msgid)
43 #include "debug-print.h"
45 /* These should only be changed for testing purposes. */
47 int max_buffers = INT_MAX;
48 bool allow_internal_sort = true;
50 static int compare_record (const struct ccase *, const struct ccase *,
51 const struct sort_criteria *);
52 static struct casefile *do_internal_sort (struct casereader *,
53 const struct sort_criteria *);
54 static struct casefile *do_external_sort (struct casereader *,
55 const struct sort_criteria *);
57 /* Gets ready to sort the active file, either in-place or to a
60 prepare_to_sort_active_file (void)
64 /* Cancel temporary transformations and PROCESS IF. */
67 expr_free (process_if_expr);
68 process_if_expr = NULL;
70 /* Make sure source cases are in a storage source. */
71 ok = procedure (NULL, NULL);
72 assert (case_source_is_class (vfm_source, &storage_source_class));
77 /* Sorts the active file in-place according to CRITERIA.
78 Returns nonzero if successful. */
80 sort_active_file_in_place (const struct sort_criteria *criteria)
82 struct casefile *src, *dst;
84 if (!prepare_to_sort_active_file ())
/* Sort the casefile that backs the current source.  The reader comes
   from casefile_get_destructive_reader(), so presumably SRC's
   contents are not preserved by the sort -- confirm. */
87 src = storage_source_get_casefile (vfm_source);
88 dst = sort_execute (casefile_get_destructive_reader (src), criteria);
/* The old source is freed before the sorted casefile DST is wrapped
   in a new storage source. */
89 free_case_source (vfm_source);
95 vfm_source = storage_source_create (dst);
99 /* Sorts the active file to a separate casefile. If successful,
100 returns the sorted casefile. Returns a null pointer on
103 sort_active_file_to_casefile (const struct sort_criteria *criteria)
105 struct casefile *src;
107 if (!prepare_to_sort_active_file ())
110 src = storage_source_get_casefile (vfm_source);
111 return sort_execute (casefile_get_reader (src), criteria);
115 /* Reads all the cases from READER, which is destroyed. Sorts
116 the cases according to CRITERIA. Returns the sorted cases in
117 a newly created casefile. */
119 sort_execute (struct casereader *reader, const struct sort_criteria *criteria)
121 struct casefile *output = do_internal_sort (reader, criteria);
123 output = do_external_sort (reader, criteria);
124 casereader_destroy (reader);
128 /* A case and its index. */
131 struct ccase c; /* Case. */
132 unsigned long idx; /* Index to allow for stable sorting. */
135 static int compare_indexed_cases (const void *, const void *, void *);
137 /* If the data is in memory, do an internal sort and return a new
138 casefile for the data. Otherwise, return a null pointer. */
139 static struct casefile *
140 do_internal_sort (struct casereader *reader,
141 const struct sort_criteria *criteria)
143 const struct casefile *src;
144 struct casefile *dst;
145 unsigned long case_cnt;
/* Internal sorting can be switched off through the
   allow_internal_sort global. */
147 if (!allow_internal_sort)
/* The in-core requirement is only tested when SRC holds more than
   one case. */
150 src = casereader_get_casefile (reader);
151 if (casefile_get_case_cnt (src) > 1 && !casefile_in_core (src))
/* DST is created with the same value count as SRC. */
154 case_cnt = casefile_get_case_cnt (src);
155 dst = casefile_create (casefile_get_value_cnt (src));
/* One indexed_case slot per case in SRC. */
158 struct indexed_case *cases = nmalloc (sizeof *cases, case_cnt);
/* Fill the array from READER, one case per slot. */
163 for (i = 0; i < case_cnt; i++)
165 bool ok = casereader_read_xfer (reader, &cases[i].c);
/* Sort the array using compare_indexed_cases, then hand the cases
   over to DST in array order. */
171 sort (cases, case_cnt, sizeof *cases, compare_indexed_cases,
174 for (i = 0; i < case_cnt; i++)
175 casefile_append_xfer (dst, &cases[i].c);
176 if (casefile_error (dst))
184 casefile_destroy (dst);
192 /* Compares the variables specified by CRITERIA between the cases
193 at A and B, with a "last resort" comparison for stability, and
194 returns a strcmp()-type result. */
196 compare_indexed_cases (const void *a_, const void *b_, void *criteria_)
/* The arguments arrive as untyped pointers; give them their real
   types.  CRITERIA_ carries the sort criteria. */
198 struct sort_criteria *criteria = criteria_;
199 const struct indexed_case *a = a_;
200 const struct indexed_case *b = b_;
/* Compare the cases themselves according to the sort criteria. */
201 int result = compare_record (&a->c, &b->c, criteria);
/* Three-way comparison of the case indexes, yielding -1, 0 or 1. */
203 result = a->idx < b->idx ? -1 : a->idx > b->idx;
209 /* Maximum order of merge (external sort only). The maximum
210 reasonable value is about 7. Above that, it would be a good
211 idea to use a heap in merge_once() to select the minimum. */
212 #define MAX_MERGE_ORDER 7
214 /* Results of an external sort. */
217 const struct sort_criteria *criteria; /* Sort criteria. */
218 size_t value_cnt; /* Size of data in `union value's. */
219 struct casefile **runs; /* Array of initial runs. */
220 size_t run_cnt, run_cap; /* Number of runs, allocated capacity. */
223 /* Prototypes for helper functions. */
224 static int write_runs (struct external_sort *, struct casereader *);
225 static struct casefile *merge (struct external_sort *);
226 static void destroy_external_sort (struct external_sort *);
228 /* Performs a stable external sort of the cases read from READER
229 according to CRITERIA. Forms initial runs using a heap
230 as a reservoir. Merges the initial runs according to a
231 pattern that assures stability. */
232 static struct casefile *
233 do_external_sort (struct casereader *reader,
234 const struct sort_criteria *criteria)
236 struct external_sort *xsrt;
/* NOTE(review): presumably this moves the input casefile out to disk
   before sorting -- confirm against casefile_to_disk(). */
238 if (!casefile_to_disk (casereader_get_casefile (reader)))
/* Set up the sort state.  The value count is taken from the casefile
   behind READER. */
241 xsrt = xmalloc (sizeof *xsrt);
242 xsrt->criteria = criteria;
243 xsrt->value_cnt = casefile_get_value_cnt (casereader_get_casefile (reader));
246 xsrt->runs = xnmalloc (xsrt->run_cap, sizeof *xsrt->runs);
/* Form the initial runs from READER, then merge them.  The sort
   state is destroyed on both of the paths below. */
247 if (write_runs (xsrt, reader))
249 struct casefile *output = merge (xsrt);
250 destroy_external_sort (xsrt);
255 destroy_external_sort (xsrt);
262 destroy_external_sort (struct external_sort *xsrt)
268 for (i = 0; i < xsrt->run_cnt; i++)
269 casefile_destroy (xsrt->runs[i]);
275 /* Replacement selection. */
277 /* Pairs a record with a run number. */
280 int run; /* Run number of case. */
281 struct ccase record; /* Case data. */
282 size_t idx; /* Case number (for stability). */
285 /* Represents a set of initial runs during an external sort. */
286 struct initial_run_state
288 struct external_sort *xsrt;
291 struct record_run *records; /* Records arranged as a heap. */
292 size_t record_cnt; /* Current number of records. */
293 size_t record_cap; /* Capacity for records. */
295 /* Run currently being output. */
296 int run; /* Run number. */
297 size_t case_cnt; /* Number of cases so far. */
298 struct casefile *casefile; /* Output file. */
299 struct ccase last_output; /* Record last output. */
301 int okay; /* Zero if an error has been encountered. */
304 static const struct case_sink_class sort_sink_class;
306 static bool destroy_initial_run_state (struct initial_run_state *);
307 static void process_case (struct initial_run_state *, const struct ccase *,
309 static int allocate_cases (struct initial_run_state *);
310 static void output_record (struct initial_run_state *);
311 static void start_run (struct initial_run_state *);
312 static void end_run (struct initial_run_state *);
313 static int compare_record_run (const struct record_run *,
314 const struct record_run *,
315 struct initial_run_state *);
316 static int compare_record_run_minheap (const void *, const void *, void *);
318 /* Reads cases from READER and composes initial runs in XSRT. */
320 write_runs (struct external_sort *xsrt, struct casereader *reader)
322 struct initial_run_state *irs;
327 /* Allocate memory for cases. */
328 irs = xmalloc (sizeof *irs);
331 irs->record_cnt = irs->record_cap = 0;
334 irs->casefile = NULL;
335 case_nullify (&irs->last_output);
337 if (!allocate_cases (irs))
340 /* Create initial runs. */
342 for (; irs->okay && casereader_read (reader, &c); case_destroy (&c))
343 process_case (irs, &c, idx++);
344 while (irs->okay && irs->record_cnt > 0)
351 if (!destroy_initial_run_state (irs))
357 /* Add a single case to an initial run. */
359 process_case (struct initial_run_state *irs, const struct ccase *c, size_t idx)
361 struct record_run *rr;
363 /* Compose record_run for this run and add to heap. */
/* Heap entries never occupy the last slot (index record_cap - 1);
   output_record() swaps cases through that slot. */
364 assert (irs->record_cnt < irs->record_cap - 1);
365 rr = irs->records + irs->record_cnt++;
366 case_copy (&rr->record, 0, c, 0, irs->xsrt->value_cnt);
/* A case that compares below the last case output is assigned to the
   run after the current one. */
369 if (!case_is_null (&irs->last_output)
370 && compare_record (c, &irs->last_output, irs->xsrt->criteria) < 0)
371 rr->run = irs->run + 1;
372 push_heap (irs->records, irs->record_cnt, sizeof *irs->records,
373 compare_record_run_minheap, irs);
375 /* Output a record if the reservoir is full. */
376 if (irs->record_cnt == irs->record_cap - 1 && irs->okay)
380 /* Destroys the initial run state represented by IRS.
381 Returns true if successful, false if an I/O error occurred. */
383 destroy_initial_run_state (struct initial_run_state *irs)
391 for (i = 0; i < irs->record_cap; i++)
392 case_destroy (&irs->records[i].record);
395 if (irs->casefile != NULL)
396 ok = casefile_sleep (irs->casefile);
402 /* Allocates room for lots of cases as a buffer. */
404 allocate_cases (struct initial_run_state *irs)
406 int approx_case_cost; /* Approximate memory cost of one case in bytes. */
407 int max_cases; /* Maximum number of cases to allocate. */
410 /* Allocate as many cases as we can within the workspace
412 approx_case_cost = (sizeof *irs->records
413 + irs->xsrt->value_cnt * sizeof (union value)
414 + 4 * sizeof (void *));
415 max_cases = get_workspace() / approx_case_cost;
416 if (max_cases > max_buffers)
417 max_cases = max_buffers;
418 irs->records = nmalloc (sizeof *irs->records, max_cases);
419 if (irs->records != NULL)
420 for (i = 0; i < max_cases; i++)
421 if (!case_try_create (&irs->records[i].record, irs->xsrt->value_cnt))
426 irs->record_cap = max_cases;
428 /* Fail if we didn't allocate an acceptable number of cases. */
429 if (irs->records == NULL || max_cases < min_buffers)
431 msg (SE, _("Out of memory. Could not allocate room for minimum of %d "
432 "cases of %d bytes each. (PSPP workspace is currently "
433 "restricted to a maximum of %d KB.)"),
434 min_buffers, approx_case_cost, get_workspace() / 1024);
440 /* Compares the variables specified by CRITERIA between the cases at
441 A and B, and returns a strcmp()-type result. */
443 compare_record (const struct ccase *a, const struct ccase *b,
444 const struct sort_criteria *criteria)
/* Walk the criteria in the order they are stored. */
451 for (i = 0; i < criteria->crit_cnt; i++)
453 const struct sort_criterion *c = &criteria->crits[i];
/* Numeric values: three-way comparison yielding -1, 0 or 1. */
458 double af = case_num (a, c->fv);
459 double bf = case_num (b, c->fv);
461 result = af < bf ? -1 : af > bf;
/* String values: bytewise comparison over c->width bytes. */
464 result = memcmp (case_str (a, c->fv), case_str (b, c->fv), c->width);
/* The sign is flipped for any direction other than SRT_ASCEND. */
467 return c->dir == SRT_ASCEND ? result : -result;
473 /* Compares record-run tuples A and B on run number first, then
474 on record, then on case index. */
476 compare_record_run (const struct record_run *a,
477 const struct record_run *b,
478 struct initial_run_state *irs)
480 int result = a->run < b->run ? -1 : a->run > b->run;
482 result = compare_record (&a->record, &b->record, irs->xsrt->criteria);
484 result = a->idx < b->idx ? -1 : a->idx > b->idx;
488 /* Compares record-run tuples A and B on run number first, then
489 on the current record according to the sort criteria, but in descending
492 compare_record_run_minheap (const void *a, const void *b, void *irs)
494 return -compare_record_run (a, b, irs);
497 /* Begins a new initial run, specifically its output file. */
499 start_run (struct initial_run_state *irs)
503 irs->casefile = casefile_create (irs->xsrt->value_cnt);
504 casefile_to_disk (irs->casefile);
505 case_nullify (&irs->last_output);
508 /* Ends the current initial run. */
510 end_run (struct initial_run_state *irs)
512 struct external_sort *xsrt = irs->xsrt;
514 /* Record initial run. */
515 if (irs->casefile != NULL)
517 casefile_sleep (irs->casefile);
/* The run array is reallocated once run_cnt has reached run_cap. */
518 if (xsrt->run_cnt >= xsrt->run_cap)
521 xsrt->runs = xnrealloc (xsrt->runs,
522 xsrt->run_cap, sizeof *xsrt->runs);
/* Append the finished casefile to the sort's run list, then check it
   for errors. */
524 xsrt->runs[xsrt->run_cnt++] = irs->casefile;
525 if (casefile_error (irs->casefile))
/* The run list holds the pointer now, so the state's own pointer is
   reset. */
527 irs->casefile = NULL;
531 /* Writes a record to the current initial run. */
533 output_record (struct initial_run_state *irs)
535 struct record_run *record_run;
536 struct ccase case_tmp;
538 /* Extract minimum case from heap. */
539 assert (irs->record_cnt > 0);
/* pop_heap() is given the count before the decrement; the extracted
   entry is then addressed at index record_cnt. */
540 pop_heap (irs->records, irs->record_cnt--, sizeof *irs->records,
541 compare_record_run_minheap, irs);
542 record_run = irs->records + irs->record_cnt;
544 /* Bail if an error has occurred. */
548 /* Start new run if necessary. */
/* The extracted record belongs either to the current run or to the
   one immediately after it. */
549 assert (record_run->run == irs->run
550 || record_run->run == irs->run + 1);
551 if (record_run->run != irs->run)
556 assert (record_run->run == irs->run);
/* Nothing is appended when there is no output casefile. */
560 if (irs->casefile != NULL)
561 casefile_append (irs->casefile, &record_run->record);
563 /* This record becomes last_output. */
/* The case just handled is exchanged with the case in the last slot
   (index record_cap - 1), with case_tmp as the temporary;
   last_output receives a copy of the same struct ccase. */
564 irs->last_output = case_tmp = record_run->record;
565 record_run->record = irs->records[irs->record_cap - 1].record;
566 irs->records[irs->record_cap - 1].record = case_tmp;
571 static int choose_merge (struct casefile *runs[], int run_cnt, int order);
572 static struct casefile *merge_once (struct external_sort *,
573 struct casefile *[], size_t);
575 /* Repeatedly merges runs until only one is left,
576 and returns the final casefile.
577 Returns a null pointer if an I/O error occurs. */
578 static struct casefile *
579 merge (struct external_sort *xsrt)
581 while (xsrt->run_cnt > 1)
/* Merge at most MAX_MERGE_ORDER runs per pass, starting at the index
   picked by choose_merge(). */
583 int order = min (MAX_MERGE_ORDER, xsrt->run_cnt);
584 int idx = choose_merge (xsrt->runs, xsrt->run_cnt, order);
/* The merged casefile is stored in the slot of the first run merged.
   ORDER runs are replaced by one, so the run count drops by
   ORDER - 1. */
585 xsrt->runs[idx] = merge_once (xsrt, xsrt->runs + idx, order);
586 remove_range (xsrt->runs, xsrt->run_cnt, sizeof *xsrt->runs,
588 xsrt->run_cnt -= order - 1;
590 if (xsrt->runs[idx] == NULL)
593 assert (xsrt->run_cnt == 1);
595 return xsrt->runs[0];
598 /* Chooses ORDER runs out of the RUN_CNT runs in RUNS to merge,
599 and returns the index of the first one.
601 For stability, we must merge only consecutive runs. For
602 efficiency, we choose the shortest consecutive sequence of
605 choose_merge (struct casefile *runs[], int run_cnt, int order)
607 int min_idx, min_sum;
608 int cur_idx, cur_sum;
611 /* Sum up the length of the first ORDER runs. */
613 for (i = 0; i < order; i++)
614 cur_sum += casefile_get_case_cnt (runs[i]);
616 /* Find the shortest group of ORDER runs,
617 using a running total for efficiency. */
620 for (cur_idx = 1; cur_idx + order <= run_cnt; cur_idx++)
622 cur_sum -= casefile_get_case_cnt (runs[cur_idx - 1]);
623 cur_sum += casefile_get_case_cnt (runs[cur_idx + order - 1]);
624 if (cur_sum < min_sum)
634 /* Merges the RUN_CNT initial runs specified in INPUT_FILES into a
635 new run, and returns the new run.
636 Returns a null pointer if an I/O error occurs. */
637 static struct casefile *
638 merge_once (struct external_sort *xsrt,
639 struct casefile **const input_files,
644 struct casefile *file;
645 struct casereader *reader;
650 struct casefile *output = NULL;
653 /* Open input files. */
654 runs = xnmalloc (run_cnt, sizeof *runs);
655 for (i = 0; i < run_cnt; i++)
657 struct run *r = &runs[i];
658 r->file = input_files[i];
659 r->reader = casefile_get_destructive_reader (r->file);
660 if (!casereader_read_xfer (r->reader, &r->ccase))
667 /* Create output file. */
668 output = casefile_create (xsrt->value_cnt);
669 casefile_to_disk (output);
674 struct run *min_run, *run;
678 for (run = runs + 1; run < runs + run_cnt; run++)
679 if (compare_record (&run->ccase, &min_run->ccase, xsrt->criteria) < 0)
682 /* Write minimum to output file. */
683 casefile_append_xfer (output, &min_run->ccase);
685 /* Read another case from minimum run. */
686 if (!casereader_read_xfer (min_run->reader, &min_run->ccase))
688 if (casefile_error (min_run->file) || casefile_error (output))
690 casereader_destroy (min_run->reader);
691 casefile_destroy (min_run->file);
693 remove_element (runs, run_cnt, sizeof *runs, min_run - runs);
698 if (!casefile_sleep (output))
705 for (i = 0; i < run_cnt; i++)
706 casefile_destroy (runs[i].file);
707 casefile_destroy (output);