1 /* PSPP - computes sample statistics.
2 Copyright (C) 1997-9, 2000 Free Software Foundation, Inc.
3 Written by Ben Pfaff <blp@gnu.org>.
5 This program is free software; you can redistribute it and/or
6 modify it under the terms of the GNU General Public License as
7 published by the Free Software Foundation; either version 2 of the
8 License, or (at your option) any later version.
10 This program is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with this program; if not, write to the Free Software
17 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 #include <libpspp/message.h>
23 #include <libpspp/alloc.h>
28 #include <libpspp/array.h>
30 #include <data/case.h>
31 #include <data/casefile.h>
32 #include <libpspp/message.h>
33 #include <language/expressions/public.h>
35 #include <libpspp/misc.h>
36 #include <data/settings.h>
37 #include <libpspp/str.h>
38 #include <data/variable.h>
39 #include <procedure.h>
42 #define _(msgid) gettext (msgid)
44 /* These should only be changed for testing purposes. */
/* MAX_BUFFERS caps the number of cases that allocate_cases() tries to
   buffer during an external sort.  When ALLOW_INTERNAL_SORT is false,
   do_internal_sort() takes its early-exit path instead of sorting in
   memory (the statement on that path is not visible in this view;
   presumably it returns a null pointer so that sort_execute() falls
   back to the external sort -- confirm). */
46 int max_buffers = INT_MAX;
47 bool allow_internal_sort = true;
/* Forward declarations of helpers defined later in this file. */
49 static int compare_record (const struct ccase *, const struct ccase *,
50 const struct sort_criteria *);
51 static struct casefile *do_internal_sort (struct casereader *,
52 const struct sort_criteria *);
53 static struct casefile *do_external_sort (struct casereader *,
54 const struct sort_criteria *);
56 /* Gets ready to sort the active file, either in-place or to a
   separate casefile; sort_active_file_in_place() and
   sort_active_file_to_casefile() both call this first and check its
   result.
59 prepare_to_sort_active_file (void)
63 /* Cancel temporary transformations and PROCESS IF. */
/* Freeing and clearing PROCESS_IF_EXPR cancels PROCESS IF before the
   procedure() call below. */
66 expr_free (process_if_expr);
67 process_if_expr = NULL;
69 /* Make sure source cases are in a storage source. */
/* OK records whether procedure() succeeded.  Afterward VFM_SOURCE must
   be a storage source: the callers immediately pass it to
   storage_source_get_casefile(). */
70 ok = procedure (NULL, NULL);
71 assert (case_source_is_class (vfm_source, &storage_source_class));
76 /* Sorts the active file in-place according to CRITERIA.
   The cases are read from the current storage source with a
   destructive reader, the old source is freed, and VFM_SOURCE is
   replaced by a storage source over the sorted casefile.
77 Returns nonzero if successful. */
79 sort_active_file_in_place (const struct sort_criteria *criteria)
81 struct casefile *src, *dst;
83 if (!prepare_to_sort_active_file ())
86 src = storage_source_get_casefile (vfm_source);
87 dst = sort_execute (casefile_get_destructive_reader (src), criteria);
/* The old source is released once its cases have been handed to
   sort_execute().  NOTE(review): the lines between here and the
   storage_source_create() call are not visible in this view;
   presumably they handle a null DST (sort failure) -- confirm. */
88 free_case_source (vfm_source);
94 vfm_source = storage_source_create (dst);
98 /* Sorts the active file to a separate casefile. If successful,
99 returns the sorted casefile. Returns a null pointer on
   failure.  Unlike sort_active_file_in_place(), the source casefile
   is read with casefile_get_reader() rather than
   casefile_get_destructive_reader(), and VFM_SOURCE is not replaced.
102 sort_active_file_to_casefile (const struct sort_criteria *criteria)
104 struct casefile *src;
106 if (!prepare_to_sort_active_file ())
109 src = storage_source_get_casefile (vfm_source);
110 return sort_execute (casefile_get_reader (src), criteria);
114 /* Reads all the cases from READER, which is destroyed. Sorts
115 the cases according to CRITERIA. Returns the sorted cases in
116 a newly created casefile. */
118 sort_execute (struct casereader *reader, const struct sort_criteria *criteria)
/* The in-memory sort is tried first; do_internal_sort() returns a
   null pointer when it declines. */
120 struct casefile *output = do_internal_sort (reader, criteria);
/* Fall back to the external sort.  NOTE(review): the condition
   guarding this call is not visible in this view; presumably it is
   OUTPUT == NULL -- confirm. */
122 output = do_external_sort (reader, criteria);
123 casereader_destroy (reader);
127 /* A case and its index. */
/* IDX is the tie-breaker used by compare_indexed_cases() after the
   sort keys, so that equal cases keep their relative input order and
   the internal sort is stable. */
130 struct ccase c; /* Case. */
131 unsigned long idx; /* Index to allow for stable sorting. */
134 static int compare_indexed_cases (const void *, const void *, void *);
136 /* If the data is in memory, do an internal sort and return a new
137 casefile for the data. Otherwise, return a null pointer.
   "In memory" here means the source casefile is in core or holds at
   most one case; the internal sort is also skipped when
   ALLOW_INTERNAL_SORT is false.  Cases are transferred out of READER,
   sorted with compare_indexed_cases() (stable, via the index
   tie-breaker), and appended to the new casefile. */
138 static struct casefile *
139 do_internal_sort (struct casereader *reader,
140 const struct sort_criteria *criteria)
142 const struct casefile *src;
143 struct casefile *dst;
144 unsigned long case_cnt;
146 if (!allow_internal_sort)
149 src = casereader_get_casefile (reader);
/* Only a casefile with more than one case that is not in core is
   rejected here; zero or one case needs no real sorting. */
150 if (casefile_get_case_cnt (src) > 1 && !casefile_in_core (src))
153 case_cnt = casefile_get_case_cnt (src);
154 dst = casefile_create (casefile_get_value_cnt (src));
/* NOTE(review): nmalloc() can return a null pointer (allocate_cases()
   tests for that); confirm that this result is checked on the lines
   not visible in this view. */
157 struct indexed_case *cases = nmalloc (sizeof *cases, case_cnt);
162 for (i = 0; i < case_cnt; i++)
164 bool ok = casereader_read_xfer (reader, &cases[i].c);
170 sort (cases, case_cnt, sizeof *cases, compare_indexed_cases,
173 for (i = 0; i < case_cnt; i++)
174 casefile_append_xfer (dst, &cases[i].c);
/* A write error on DST is checked once, after every case has been
   appended. */
175 if (casefile_error (dst))
183 casefile_destroy (dst);
191 /* Compares the variables specified by CRITERIA between the cases
192 at A and B, with a "last resort" comparison for stability, and
193 returns a strcmp()-type result.
   A_ and B_ point to struct indexed_case; CRITERIA_ is converted to
   the struct sort_criteria * used for the key comparison. */
195 compare_indexed_cases (const void *a_, const void *b_, void *criteria_)
197 struct sort_criteria *criteria = criteria_;
198 const struct indexed_case *a = a_;
199 const struct indexed_case *b = b_;
200 int result = compare_record (&a->c, &b->c, criteria);
/* Last resort: order by input index (presumably applied only when
   the keys compare equal; that guard is not visible here). */
202 result = a->idx < b->idx ? -1 : a->idx > b->idx;
208 /* Maximum order of merge (external sort only). The maximum
209 reasonable value is about 7. Above that, it would be a good
210 idea to use a heap in merge_once() to select the minimum.
   (merge_once() currently finds the minimum with a linear scan over
   the open runs, so each output case costs up to ORDER - 1
   comparisons.) */
211 #define MAX_MERGE_ORDER 7
213 /* State of an external sort: the criteria it sorts by, the width of
   its cases, and the runs produced so far. */
216 const struct sort_criteria *criteria; /* Sort criteria. */
217 size_t value_cnt; /* Size of data in `union value's. */
218 struct casefile **runs; /* Array of runs: initial, then merged. */
219 size_t run_cnt, run_cap; /* Number of runs, allocated capacity. */
222 /* Prototypes for helper functions. */
223 static int write_runs (struct external_sort *, struct casereader *);
224 static struct casefile *merge (struct external_sort *);
225 static void destroy_external_sort (struct external_sort *);
227 /* Performs a stable external sort of the cases read from READER
228 according to CRITERIA. Forms initial runs using a heap
229 as a reservoir. Merges the initial runs according to a
230 pattern that assures stability.
   Returns the sorted casefile; the failure paths are only partly
   visible in this view (presumably they return a null pointer). */
231 static struct casefile *
232 do_external_sort (struct casereader *reader,
233 const struct sort_criteria *criteria)
235 struct external_sort *xsrt;
/* NOTE(review): the body of this check is not visible here;
   presumably the sort is abandoned when casefile_to_disk() fails. */
237 if (!casefile_to_disk (casereader_get_casefile (reader)))
240 xsrt = xmalloc (sizeof *xsrt);
241 xsrt->criteria = criteria;
242 xsrt->value_cnt = casefile_get_value_cnt (casereader_get_casefile (reader));
/* RUNS starts with RUN_CAP entries (RUN_CNT and RUN_CAP are set on
   lines not visible here); end_run() grows it as runs are added. */
245 xsrt->runs = xnmalloc (xsrt->run_cap, sizeof *xsrt->runs);
246 if (write_runs (xsrt, reader))
/* NOTE(review): destroy_external_sort() destroys every run still
   counted in XSRT->RUN_CNT, while merge() returns XSRT->RUNS[0];
   this is safe only if merge() stops counting that run before
   returning -- confirm. */
248 struct casefile *output = merge (xsrt);
249 destroy_external_sort (xsrt);
254 destroy_external_sort (xsrt);
/* Destroys XSRT, including every casefile still counted in
   XSRT->RUN_CNT.  (The remaining cleanup, presumably freeing RUNS and
   XSRT itself, is on lines not visible here.) */
261 destroy_external_sort (struct external_sort *xsrt)
267 for (i = 0; i < xsrt->run_cnt; i++)
268 casefile_destroy (xsrt->runs[i]);
274 /* Replacement selection. */
/* Initial runs are formed by replacement selection: incoming cases
   go into a heap, the smallest is written to the current run, and a
   case that sorts before the last one written is held back for the
   next run (see process_case() and output_record()). */
276 /* Pairs a record with a run number. */
279 int run; /* Run number of case. */
280 struct ccase record; /* Case data. */
281 size_t idx; /* Case number (for stability). */
284 /* Represents a set of initial runs during an external sort. */
285 struct initial_run_state
287 struct external_sort *xsrt;
290 struct record_run *records; /* Records arranged as a heap. */
291 size_t record_cnt; /* Current number of records. */
292 size_t record_cap; /* Capacity for records. */
/* The heap never holds more than RECORD_CAP - 1 records.  The last
   slot of RECORDS is a spare that, once output has begun, holds the
   case most recently written, to which LAST_OUTPUT refers (see
   output_record()). */
294 /* Run currently being output. */
295 int run; /* Run number. */
296 size_t case_cnt; /* Number of cases so far. */
297 struct casefile *casefile; /* Output file. */
298 struct ccase last_output; /* Record last output. */
300 int okay; /* Zero if an error has been encountered. */
303 static const struct case_sink_class sort_sink_class;
305 static bool destroy_initial_run_state (struct initial_run_state *);
306 static void process_case (struct initial_run_state *, const struct ccase *,
308 static int allocate_cases (struct initial_run_state *);
309 static void output_record (struct initial_run_state *);
310 static void start_run (struct initial_run_state *);
311 static void end_run (struct initial_run_state *);
312 static int compare_record_run (const struct record_run *,
313 const struct record_run *,
314 struct initial_run_state *);
315 static int compare_record_run_minheap (const void *, const void *, void *);
317 /* Reads cases from READER and composes initial runs in XSRT.
   Returns nonzero on success, as do_external_sort() expects (the
   return statements are not visible in this view). */
319 write_runs (struct external_sort *xsrt, struct casereader *reader)
321 struct initial_run_state *irs;
326 /* Allocate memory for cases. */
327 irs = xmalloc (sizeof *irs);
330 irs->record_cnt = irs->record_cap = 0;
333 irs->casefile = NULL;
334 case_nullify (&irs->last_output);
336 if (!allocate_cases (irs))
339 /* Create initial runs. */
/* Each case read is copied into the heap by process_case() and then
   destroyed by the loop's step expression; IDX numbers the cases in
   input order for stability.  Reading stops as soon as IRS->OKAY is
   cleared, since it is tested before each read. */
341 for (; irs->okay && casereader_read (reader, &c); case_destroy (&c))
342 process_case (irs, &c, idx++);
/* Once input is exhausted, the records still in the heap are written
   out (the loop body is not visible in this view). */
343 while (irs->okay && irs->record_cnt > 0)
/* destroy_initial_run_state() reports a failure to put the open
   run's casefile to sleep. */
350 if (!destroy_initial_run_state (irs))
356 /* Add a single case to an initial run.
   C is copied into the heap, so the caller keeps ownership of it.
   IDX is C's position in the input (write_runs() passes a running
   count), used to break ties. */
358 process_case (struct initial_run_state *irs, const struct ccase *c, size_t idx)
360 struct record_run *rr;
362 /* Compose record_run for this run and add to heap. */
/* One slot of RECORDS is always kept free as the spare used by
   output_record(). */
363 assert (irs->record_cnt < irs->record_cap - 1);
364 rr = irs->records + irs->record_cnt++;
365 case_copy (&rr->record, 0, c, 0, irs->xsrt->value_cnt);
/* A case that sorts before the last case already written cannot
   join the current run, so it is assigned to the next one. */
368 if (!case_is_null (&irs->last_output)
369 && compare_record (c, &irs->last_output, irs->xsrt->criteria) < 0)
370 rr->run = irs->run + 1;
371 push_heap (irs->records, irs->record_cnt, sizeof *irs->records,
372 compare_record_run_minheap, irs);
374 /* Output a record if the reservoir is full. */
375 if (irs->record_cnt == irs->record_cap - 1 && irs->okay)
379 /* Destroys the initial run state represented by IRS.
380 Returns true if successful, false if an I/O error occurred. */
382 destroy_initial_run_state (struct initial_run_state *irs)
/* All RECORD_CAP cases in RECORDS are destroyed, including the spare
   slot.  NOTE(review): allocate_cases() sets RECORD_CAP even when
   nmalloc() returned a null RECORDS; if IRS can reach here in that
   state, this loop dereferences a null pointer -- confirm. */
390 for (i = 0; i < irs->record_cap; i++)
391 case_destroy (&irs->records[i].record);
/* A run that is still open has its casefile put to sleep; a failure
   there is the I/O error reported to the caller. */
394 if (irs->casefile != NULL)
395 ok = casefile_sleep (irs->casefile);
401 /* Allocates room for lots of cases as a buffer.
   The number of cases is chosen so that their approximate cost fits
   within get_workspace() bytes, capped at MAX_BUFFERS.  Fails, with an
   error message, if RECORDS could not be allocated or MAX_CASES ends
   up below MIN_BUFFERS. */
403 allocate_cases (struct initial_run_state *irs)
405 int approx_case_cost; /* Approximate memory cost of one case in bytes. */
406 int max_cases; /* Maximum number of cases to allocate. */
409 /* Allocate as many cases as we can within the workspace
411 approx_case_cost = (sizeof *irs->records
412 + irs->xsrt->value_cnt * sizeof (union value)
413 + 4 * sizeof (void *));
414 max_cases = get_workspace() / approx_case_cost;
415 if (max_cases > max_buffers)
416 max_cases = max_buffers;
417 irs->records = nmalloc (sizeof *irs->records, max_cases);
418 if (irs->records != NULL)
419 for (i = 0; i < max_cases; i++)
420 if (!case_try_create (&irs->records[i].record, irs->xsrt->value_cnt))
425 irs->record_cap = max_cases;
427 /* Fail if we didn't allocate an acceptable number of cases. */
/* NOTE(review): RECORD_CAP is set above even when RECORDS is NULL,
   and destroy_initial_run_state() indexes RECORDS up to RECORD_CAP;
   clearing RECORD_CAP on that path would be safer -- confirm how the
   caller cleans up after a failure here. */
428 if (irs->records == NULL || max_cases < min_buffers)
430 msg (SE, _("Out of memory. Could not allocate room for minimum of %d "
431 "cases of %d bytes each. (PSPP workspace is currently "
432 "restricted to a maximum of %d KB.)"),
433 min_buffers, approx_case_cost, get_workspace() / 1024);
439 /* Compares cases A and B on each criterion in CRITERIA in turn and
440 returns a strcmp()-type result, negated for a descending
   criterion.  Numeric keys are compared as doubles; string keys are
   compared bytewise over the criterion's full width. */
442 compare_record (const struct ccase *a, const struct ccase *b,
443 const struct sort_criteria *criteria)
450 for (i = 0; i < criteria->crit_cnt; i++)
452 const struct sort_criterion *c = &criteria->crits[i];
457 double af = case_num (a, c->fv);
458 double bf = case_num (b, c->fv);
/* -1, 0, or 1.  If either value is a NaN, both comparisons are false
   and the values compare equal. */
460 result = af < bf ? -1 : af > bf;
463 result = memcmp (case_str (a, c->fv), case_str (b, c->fv), c->width);
/* NOTE(review): the guard on this return is not visible here;
   presumably it returns at the first criterion that differs. */
466 return c->dir == SRT_ASCEND ? result : -result;
472 /* Compares record-run tuples A and B on run number first, then
473 on record, then on case index.  Ordering by run first places every
   record held back for the next run after all records of the
   current run. */
475 compare_record_run (const struct record_run *a,
476 const struct record_run *b,
477 struct initial_run_state *irs)
479 int result = a->run < b->run ? -1 : a->run > b->run;
481 result = compare_record (&a->record, &b->record, irs->xsrt->criteria);
483 result = a->idx < b->idx ? -1 : a->idx > b->idx;
487 /* Compares record-run tuples A and B on run number first, then
488 on the current record according to the sort criteria, but in descending
   order: the result of compare_record_run() is negated, so that
   pop_heap() called with this comparator extracts the minimum record
   (by run, record, and case index), as output_record() expects.
491 compare_record_run_minheap (const void *a, const void *b, void *irs)
493 return -compare_record_run (a, b, irs);
496 /* Begins a new initial run, specifically its output file.
   LAST_OUTPUT is nulled, so process_case() will not defer any case to
   the following run until something has been written to this one.
   NOTE(review): casefile_to_disk()'s result is ignored here, whereas
   do_external_sort() checks it. */
498 start_run (struct initial_run_state *irs)
502 irs->casefile = casefile_create (irs->xsrt->value_cnt);
503 casefile_to_disk (irs->casefile);
504 case_nullify (&irs->last_output);
507 /* Ends the current initial run.
   If a run is open, its casefile is put to sleep and appended to
   XSRT->RUNS, after which IRS no longer refers to it.  A casefile
   error is presumably recorded in IRS->OKAY (that line is not
   visible here). */
509 end_run (struct initial_run_state *irs)
511 struct external_sort *xsrt = irs->xsrt;
513 /* Record initial run. */
514 if (irs->casefile != NULL)
/* NOTE(review): casefile_sleep()'s return value is ignored here,
   unlike in destroy_initial_run_state(); whether the casefile_error()
   check below covers that failure is not visible here -- confirm. */
516 casefile_sleep (irs->casefile);
/* Grow RUNS when it is full (RUN_CAP is updated on a line not
   visible here). */
517 if (xsrt->run_cnt >= xsrt->run_cap)
520 xsrt->runs = xnrealloc (xsrt->runs,
521 xsrt->run_cap, sizeof *xsrt->runs);
523 xsrt->runs[xsrt->run_cnt++] = irs->casefile;
524 if (casefile_error (irs->casefile))
526 irs->casefile = NULL;
530 /* Writes a record to the current initial run.
   The minimum record is popped from the heap.  If it belongs to the
   next run, a new run is started first (the switch is on lines not
   visible here).  The written case then becomes LAST_OUTPUT. */
532 output_record (struct initial_run_state *irs)
534 struct record_run *record_run;
535 struct ccase case_tmp;
537 /* Extract minimum case from heap. */
538 assert (irs->record_cnt > 0);
539 pop_heap (irs->records, irs->record_cnt--, sizeof *irs->records,
540 compare_record_run_minheap, irs);
/* pop_heap() leaves the extracted record just past the shrunken
   heap. */
541 record_run = irs->records + irs->record_cnt;
543 /* Bail if an error has occurred. */
547 /* Start new run if necessary. */
548 assert (record_run->run == irs->run
549 || record_run->run == irs->run + 1);
550 if (record_run->run != irs->run)
555 assert (record_run->run == irs->run);
559 if (irs->casefile != NULL)
560 casefile_append (irs->casefile, &record_run->record);
562 /* This record becomes last_output. */
/* The written case is swapped into the spare slot at RECORD_CAP - 1,
   which the heap never uses.  LAST_OUTPUT therefore stays valid, and
   the heap slot just vacated takes over the previous spare case. */
563 irs->last_output = case_tmp = record_run->record;
564 record_run->record = irs->records[irs->record_cap - 1].record;
565 irs->records[irs->record_cap - 1].record = case_tmp;
570 static int choose_merge (struct casefile *runs[], int run_cnt, int order);
571 static struct casefile *merge_once (struct external_sort *,
572 struct casefile *[], size_t);
574 /* Repeatedly merges runs until only one is left,
575 and returns the final casefile.
576 Returns a null pointer if an I/O error occurs. */
577 static struct casefile *
578 merge (struct external_sort *xsrt)
580 while (xsrt->run_cnt > 1)
/* Merge up to MAX_MERGE_ORDER consecutive runs into one, stored back
   at IDX.  The other ORDER - 1 slots, whose casefiles merge_once()
   has consumed, are removed from RUNS. */
582 int order = min (MAX_MERGE_ORDER, xsrt->run_cnt);
583 int idx = choose_merge (xsrt->runs, xsrt->run_cnt, order);
584 xsrt->runs[idx] = merge_once (xsrt, xsrt->runs + idx, order);
585 remove_range (xsrt->runs, xsrt->run_cnt, sizeof *xsrt->runs,
587 xsrt->run_cnt -= order - 1;
/* merge_once() returns a null pointer on I/O error. */
589 if (xsrt->runs[idx] == NULL)
592 assert (xsrt->run_cnt == 1);
/* NOTE(review): destroy_external_sort() destroys every run still
   counted in RUN_CNT; presumably the line not visible here stops
   counting RUNS[0] so that the returned casefile survives -- confirm. */
594 return xsrt->runs[0];
597 /* Chooses ORDER runs out of the RUN_CNT runs in RUNS to merge,
598 and returns the index of the first one.
600 For stability, we must merge only consecutive runs. For
601 efficiency, we choose the shortest consecutive sequence of
   ORDER runs, measured by total number of cases.
604 choose_merge (struct casefile *runs[], int run_cnt, int order)
606 int min_idx, min_sum;
607 int cur_idx, cur_sum;
610 /* Sum up the length of the first ORDER runs. */
612 for (i = 0; i < order; i++)
613 cur_sum += casefile_get_case_cnt (runs[i]);
615 /* Find the shortest group of ORDER runs,
616 using a running total for efficiency. */
/* Each step drops the run leaving the window on the left and adds the
   run entering on the right.  The strict < keeps the leftmost of
   equally short windows.  NOTE(review): the sums are int, while
   do_internal_sort() stores a case count in unsigned long; very large
   runs could overflow these sums. */
619 for (cur_idx = 1; cur_idx + order <= run_cnt; cur_idx++)
621 cur_sum -= casefile_get_case_cnt (runs[cur_idx - 1]);
622 cur_sum += casefile_get_case_cnt (runs[cur_idx + order - 1]);
623 if (cur_sum < min_sum)
633 /* Merges the RUN_CNT runs specified in INPUT_FILES into a
634 new run, and returns the new run.
635 Returns a null pointer if an I/O error occurs.
   The input casefiles are consumed: each is destroyed once it is
   exhausted, or on the error path.  Ties between runs go to the
   earlier run (see the scan below), which, since the runs are
   consecutive, keeps the merge stable. */
636 static struct casefile *
637 merge_once (struct external_sort *xsrt,
638 struct casefile **const input_files,
643 struct casefile *file;
644 struct casereader *reader;
649 struct casefile *output = NULL;
652 /* Open input files. */
653 runs = xnmalloc (run_cnt, sizeof *runs);
654 for (i = 0; i < run_cnt; i++)
656 struct run *r = &runs[i];
657 r->file = input_files[i];
658 r->reader = casefile_get_destructive_reader (r->file);
659 if (!casereader_read_xfer (r->reader, &r->ccase))
666 /* Create output file. */
/* NOTE(review): casefile_to_disk()'s result is ignored here. */
667 output = casefile_create (xsrt->value_cnt);
668 casefile_to_disk (output);
673 struct run *min_run, *run;
/* Linear scan for the minimum.  Because of the strict <, an equal
   record in a later run does not displace the current choice, so
   ties go to the earlier run (assuming MIN_RUN starts at RUNS[0], on
   a line not visible here). */
677 for (run = runs + 1; run < runs + run_cnt; run++)
678 if (compare_record (&run->ccase, &min_run->ccase, xsrt->criteria) < 0)
681 /* Write minimum to output file. */
682 casefile_append_xfer (output, &min_run->ccase);
684 /* Read another case from minimum run. */
685 if (!casereader_read_xfer (min_run->reader, &min_run->ccase))
/* The run is exhausted: check for errors, then destroy its reader
   and casefile and drop it from RUNS. */
687 if (casefile_error (min_run->file) || casefile_error (output))
689 casereader_destroy (min_run->reader);
690 casefile_destroy (min_run->file);
692 remove_element (runs, run_cnt, sizeof *runs, min_run - runs);
697 if (!casefile_sleep (output))
/* Presumably the error path: destroy the remaining inputs and the
   partial output. */
704 for (i = 0; i < run_cnt; i++)
705 casefile_destroy (runs[i].file);
706 casefile_destroy (output);