1 /* PSPP - computes sample statistics.
2 Copyright (C) 1997-9, 2000 Free Software Foundation, Inc.
3 Written by Ben Pfaff <blp@gnu.org>.
5 This program is free software; you can redistribute it and/or
6 modify it under the terms of the GNU General Public License as
7 published by the Free Software Foundation; either version 2 of the
8 License, or (at your option) any later version.
10 This program is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with this program; if not, write to the Free Software
17 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. */
30 #include <data/case-source.h>
31 #include <data/case.h>
32 #include <data/casefile.h>
33 #include <data/settings.h>
34 #include <data/variable.h>
35 #include <data/storage-stream.h>
36 #include <language/expressions/public.h>
37 #include <libpspp/alloc.h>
38 #include <libpspp/array.h>
39 #include <libpspp/message.h>
40 #include <libpspp/message.h>
41 #include <libpspp/misc.h>
42 #include <libpspp/str.h>
43 #include <procedure.h>
/* Shorthand for gettext(); wraps user-visible message text, as in
   the msg() call made by allocate_cases(). */
46 #define _(msgid) gettext (msgid)
48 /* These should only be changed for testing purposes. */
/* Upper bound on the number of case buffers that allocate_cases()
   tries to allocate for the external sort. */
50 int max_buffers = INT_MAX;
/* Tested first by do_internal_sort(); presumably the internal sort
   is skipped when this is false (the statement under that test is
   not visible in this excerpt). */
51 bool allow_internal_sort = true;
/* Forward declarations: the record comparison routine and the two
   sorting strategies that sort_execute() calls. */
53 static int compare_record (const struct ccase *, const struct ccase *,
54 const struct sort_criteria *);
55 static struct casefile *do_internal_sort (struct casereader *,
56 const struct sort_criteria *);
57 static struct casefile *do_external_sort (struct casereader *,
58 const struct sort_criteria *);
60 /* Gets ready to sort the active file, either in-place or to a separate casefile. */
/* Shared first step of sort_active_file_in_place() and
   sort_active_file_to_casefile(); both test the result for truth
   before touching vfm_source. */
63 prepare_to_sort_active_file (void)
67 /* Cancel temporary transformations and PROCESS IF. */
/* Free the PROCESS IF expression and clear the pointer to it. */
70 expr_free (process_if_expr);
71 process_if_expr = NULL;
73 /* Make sure source cases are in a storage source. */
/* procedure() is run with null arguments and its result kept in OK
   (presumably the value returned to the caller).  Afterwards
   vfm_source must belong to storage_source_class, which the
   assertion enforces. */
74 ok = procedure (NULL, NULL);
75 assert (case_source_is_class (vfm_source, &storage_source_class));
80 /* Sorts the active file in-place according to CRITERIA.
81 Returns nonzero if successful. */
83 sort_active_file_in_place (const struct sort_criteria *criteria)
85 struct casefile *src, *dst;
/* Nothing is sorted unless the active file could be prepared. */
87 if (!prepare_to_sort_active_file ())
/* Sort the casefile behind the current source.  A destructive reader
   is requested; the old source is freed right after sorting. */
90 src = storage_source_get_casefile (vfm_source);
91 dst = sort_execute (casefile_get_destructive_reader (src), criteria);
92 free_case_source (vfm_source);
/* Install the sorted casefile as the new active file source.
   NOTE(review): the sorting helpers can yield a null pointer;
   confirm that the lines not visible here handle a null DST. */
98 vfm_source = storage_source_create (dst);
102 /* Sorts the active file to a separate casefile. If successful,
103 returns the sorted casefile. Returns a null pointer on failure. */
/* Unlike sort_active_file_in_place(), this reads the active file
   through casefile_get_reader() rather than a destructive reader and
   does not replace vfm_source.  CRITERIA gives the sort keys; the
   result is whatever sort_execute() returns. */
106 sort_active_file_to_casefile (const struct sort_criteria *criteria)
108 struct casefile *src;
/* Nothing is sorted unless the active file could be prepared. */
110 if (!prepare_to_sort_active_file ())
113 src = storage_source_get_casefile (vfm_source);
114 return sort_execute (casefile_get_reader (src), criteria);
118 /* Reads all the cases from READER, which is destroyed. Sorts
119 the cases according to CRITERIA. Returns the sorted cases in
120 a newly created casefile. */
122 sort_execute (struct casereader *reader, const struct sort_criteria *criteria)
/* Try the in-memory sort first; do_internal_sort() yields a null
   pointer when it cannot be used. */
124 struct casefile *output = do_internal_sort (reader, criteria);
/* Fall back to the external sort.  The condition guarding this
   assignment is not visible in this excerpt (presumably a null
   OUTPUT). */
126 output = do_external_sort (reader, criteria);
/* READER is released here, as the header comment promises. */
127 casereader_destroy (reader);
131 /* A case and its index. */
/* Element type of the array that do_internal_sort() fills and sorts;
   compare_indexed_cases() falls back on IDX as its last key. */
134 struct ccase c; /* Case. */
135 unsigned long idx; /* Index to allow for stable sorting. */
/* Comparison callback that do_internal_sort() hands to sort(). */
138 static int compare_indexed_cases (const void *, const void *, void *);
140 /* If the data is in memory, do an internal sort and return a new
141 casefile for the data. Otherwise, return a null pointer. */
142 static struct casefile *
143 do_internal_sort (struct casereader *reader,
144 const struct sort_criteria *criteria)
146 const struct casefile *src;
147 struct casefile *dst;
148 unsigned long case_cnt;
/* allow_internal_sort is a testing knob that can rule out the
   internal sort altogether. */
150 if (!allow_internal_sort)
/* A casefile holding more than one case that is not in core is not
   eligible for the internal sort. */
153 src = casereader_get_casefile (reader);
154 if (casefile_get_case_cnt (src) > 1 && !casefile_in_core (src))
/* The output casefile is created with the same value count as the
   input casefile. */
157 case_cnt = casefile_get_case_cnt (src);
158 dst = casefile_create (casefile_get_value_cnt (src));
/* One struct indexed_case per input case. */
161 struct indexed_case *cases = nmalloc (sizeof *cases, case_cnt);
/* Read every case from READER into the array. */
166 for (i = 0; i < case_cnt; i++)
168 bool ok = casereader_read_xfer (reader, &cases[i].c);
/* Sort the array, comparing elements with compare_indexed_cases(). */
174 sort (cases, case_cnt, sizeof *cases, compare_indexed_cases,
/* Append the cases to DST in their sorted order, then check DST for
   errors. */
177 for (i = 0; i < case_cnt; i++)
178 casefile_append_xfer (dst, &cases[i].c);
179 if (casefile_error (dst))
/* DST is destroyed on what is presumably the failure path; the
   branch structure is not visible in this excerpt. */
187 casefile_destroy (dst);
195 /* Compares the variables specified by CRITERIA between the cases
196 at A and B, with a "last resort" comparison for stability, and
197 returns a strcmp()-type result. */
/* A_ and B_ point to struct indexed_case elements and CRITERIA_ to
   the struct sort_criteria; all three arrive as untyped pointers
   because sort() invokes this function as a callback. */
199 compare_indexed_cases (const void *a_, const void *b_, void *criteria_)
201 struct sort_criteria *criteria = criteria_;
202 const struct indexed_case *a = a_;
203 const struct indexed_case *b = b_;
204 int result = compare_record (&a->c, &b->c, criteria);
/* Last resort: order by index, giving -1, 0 or 1.  Presumably this
   runs only when the records compare equal; the guarding test is
   not visible in this excerpt. */
206 result = a->idx < b->idx ? -1 : a->idx > b->idx;
212 /* Maximum order of merge (external sort only). The maximum
213 reasonable value is about 7. Above that, it would be a good
214 idea to use a heap in merge_once() to select the minimum. */
/* merge() merges at most this many runs in one pass. */
215 #define MAX_MERGE_ORDER 7
217 /* Results of an external sort. */
/* do_external_sort() sets these members up, end_run() appends each
   finished run to RUNS, and merge() reduces RUNS to one casefile. */
220 const struct sort_criteria *criteria; /* Sort criteria. */
221 size_t value_cnt; /* Size of data in `union value's. */
222 struct casefile **runs; /* Array of initial runs. */
223 size_t run_cnt, run_cap; /* Number of runs, allocated capacity. */
226 /* Prototypes for helper functions. */
/* write_runs() forms the initial runs, merge() combines them, and
   destroy_external_sort() destroys the run casefiles. */
227 static int write_runs (struct external_sort *, struct casereader *);
228 static struct casefile *merge (struct external_sort *);
229 static void destroy_external_sort (struct external_sort *);
231 /* Performs a stable external sort of the active file according
232 to the specification in CRITERIA. Forms initial runs using a heap
233 as a reservoir. Merges the initial runs according to a
234 pattern that assures stability. */
235 static struct casefile *
236 do_external_sort (struct casereader *reader,
237 const struct sort_criteria *criteria)
239 struct external_sort *xsrt;
/* casefile_to_disk() must succeed on the input casefile before the
   sort goes ahead. */
241 if (!casefile_to_disk (casereader_get_casefile (reader)))
/* Set up the sort state.  NOTE(review): run_cap is read by the
   xnmalloc() call below, but the statements that initialize it and
   run_cnt are not visible in this excerpt. */
244 xsrt = xmalloc (sizeof *xsrt);
245 xsrt->criteria = criteria;
246 xsrt->value_cnt = casefile_get_value_cnt (casereader_get_casefile (reader));
249 xsrt->runs = xnmalloc (xsrt->run_cap, sizeof *xsrt->runs);
/* Form the initial runs and, if that worked, merge them.  XSRT is
   destroyed both after merging and on the other path. */
250 if (write_runs (xsrt, reader))
252 struct casefile *output = merge (xsrt);
253 destroy_external_sort (xsrt);
258 destroy_external_sort (xsrt);
/* Destroys every casefile listed in XSRT's run array, up to
   RUN_CNT.  Any freeing of the array or of XSRT itself is not
   visible in this excerpt. */
265 destroy_external_sort (struct external_sort *xsrt)
271 for (i = 0; i < xsrt->run_cnt; i++)
272 casefile_destroy (xsrt->runs[i]);
278 /* Replacement selection. */
280 /* Pairs a record with a run number. */
/* Element type of the RECORDS heap in struct initial_run_state.
   compare_record_run() orders elements by RUN, then RECORD, then
   IDX. */
283 int run; /* Run number of case. */
284 struct ccase record; /* Case data. */
285 size_t idx; /* Case number (for stability). */
288 /* Represents a set of initial runs during an external sort. */
289 struct initial_run_state
/* The sort that the runs belong to. */
291 struct external_sort *xsrt;
/* Reservoir.  The heap never uses the last slot of RECORDS:
   process_case() emits a record once RECORD_CNT reaches
   RECORD_CAP - 1, and output_record() swaps the emitted case into
   records[record_cap - 1]. */
294 struct record_run *records; /* Records arranged as a heap. */
295 size_t record_cnt; /* Current number of records. */
296 size_t record_cap; /* Capacity for records. */
298 /* Run currently being output. */
299 int run; /* Run number. */
300 size_t case_cnt; /* Number of cases so far. */
301 struct casefile *casefile; /* Output file. */
302 struct ccase last_output; /* Record last output. */
/* Tested by write_runs() and process_case() before they do further
   work. */
304 int okay; /* Zero if an error has been encountered. */
/* Forward declarations of the helpers that form the initial runs. */
307 static bool destroy_initial_run_state (struct initial_run_state *);
308 static void process_case (struct initial_run_state *, const struct ccase *,
310 static int allocate_cases (struct initial_run_state *);
311 static void output_record (struct initial_run_state *);
312 static void start_run (struct initial_run_state *);
313 static void end_run (struct initial_run_state *);
314 static int compare_record_run (const struct record_run *,
315 const struct record_run *,
316 struct initial_run_state *);
317 static int compare_record_run_minheap (const void *, const void *, void *);
319 /* Reads cases from READER and composes initial runs in XSRT. */
/* do_external_sort() tests the result for truth and merges the runs
   only when it is nonzero. */
321 write_runs (struct external_sort *xsrt, struct casereader *reader)
323 struct initial_run_state *irs;
328 /* Allocate memory for cases. */
329 irs = xmalloc (sizeof *irs);
/* Start with an empty reservoir, no output casefile and no record
   output so far. */
332 irs->record_cnt = irs->record_cap = 0;
335 irs->casefile = NULL;
336 case_nullify (&irs->last_output);
338 if (!allocate_cases (irs))
341 /* Create initial runs. */
/* Feed each case read from READER to process_case(), which copies
   it; the case just read is destroyed at the end of every
   iteration.  IDX numbers the cases in the order they are read. */
343 for (; irs->okay && casereader_read (reader, &c); case_destroy (&c))
344 process_case (irs, &c, idx++);
/* Input exhausted: keep going while records remain in the reservoir
   (the loop body is not visible in this excerpt). */
345 while (irs->okay && irs->record_cnt > 0)
/* Tear down IRS; a false result signals an I/O error. */
352 if (!destroy_initial_run_state (irs))
358 /* Add a single case to an initial run. */
/* C is copied into the reservoir, so the caller still has to dispose
   of C itself.  IDX is the case number supplied by write_runs(). */
360 process_case (struct initial_run_state *irs, const struct ccase *c, size_t idx)
362 struct record_run *rr;
364 /* Compose record_run for this run and add to heap. */
/* The last slot of RECORDS is never part of the heap, hence the
   bound of RECORD_CAP - 1. */
365 assert (irs->record_cnt < irs->record_cap - 1);
366 rr = irs->records + irs->record_cnt++;
367 case_copy (&rr->record, 0, c, 0, irs->xsrt->value_cnt);
/* A case that compares less than the record most recently output is
   tagged for the run after the current one. */
370 if (!case_is_null (&irs->last_output)
371 && compare_record (c, &irs->last_output, irs->xsrt->criteria) < 0)
372 rr->run = irs->run + 1;
/* The heap is ordered by compare_record_run_minheap(), the negation
   of compare_record_run(). */
373 push_heap (irs->records, irs->record_cnt, sizeof *irs->records,
374 compare_record_run_minheap, irs);
376 /* Output a record if the reservoir is full. */
377 if (irs->record_cnt == irs->record_cap - 1 && irs->okay)
381 /* Destroys the initial run state represented by IRS.
382 Returns true if successful, false if an I/O error occurred. */
384 destroy_initial_run_state (struct initial_run_state *irs)
/* Every one of the RECORD_CAP slots holds a case made by
   allocate_cases(), not only the RECORD_CNT slots in the heap. */
392 for (i = 0; i < irs->record_cap; i++)
393 case_destroy (&irs->records[i].record);
/* A run whose casefile is still attached to IRS is put to sleep; OK
   takes the result of casefile_sleep(). */
396 if (irs->casefile != NULL)
397 ok = casefile_sleep (irs->casefile);
403 /* Allocates room for lots of cases as a buffer. */
/* Points IRS->records at an array of record_run slots and creates a
   case with IRS->xsrt->value_cnt values in each slot.  The number of
   slots tried is the workspace size divided by an estimated per-case
   cost, capped at max_buffers.  IRS->record_cap receives the count
   in MAX_CASES after the creation loop.  An error is reported with
   msg() if the array could not be allocated or MAX_CASES ends up
   below min_buffers.  write_runs() tests the result for truth.

   NOTE(review): the quotient of get_workspace() and the cost
   estimate is stored in an int, and get_workspace() / 1024 is
   printed with %d; confirm that the return type of get_workspace()
   makes both safe. */
405 allocate_cases (struct initial_run_state *irs)
407 int approx_case_cost; /* Approximate memory cost of one case in bytes. */
408 int max_cases; /* Maximum number of cases to allocate. */
411 /* Allocate as many cases as we can within the workspace
413 approx_case_cost = (sizeof *irs->records
414 + irs->xsrt->value_cnt * sizeof (union value)
415 + 4 * sizeof (void *));
416 max_cases = get_workspace() / approx_case_cost;
417 if (max_cases > max_buffers)
418 max_cases = max_buffers;
419 irs->records = nmalloc (sizeof *irs->records, max_cases);
420 if (irs->records != NULL)
421 for (i = 0; i < max_cases; i++)
422 if (!case_try_create (&irs->records[i].record, irs->xsrt->value_cnt))
427 irs->record_cap = max_cases;
429 /* Fail if we didn't allocate an acceptable number of cases. */
430 if (irs->records == NULL || max_cases < min_buffers)
/* The message gives the minimum buffer count, the estimated cost of
   one case in bytes, and the workspace size divided by 1024. */
432 msg (SE, _("Out of memory. Could not allocate room for minimum of %d "
433 "cases of %d bytes each. (PSPP workspace is currently "
434 "restricted to a maximum of %d KB.)"),
435 min_buffers, approx_case_cost, get_workspace() / 1024);
441 /* Compares cases A and B on the criteria listed in CRITERIA, taken
442 in order, and returns a strcmp()-type result. */
444 compare_record (const struct ccase *a, const struct ccase *b,
445 const struct sort_criteria *criteria)
452 for (i = 0; i < criteria->crit_cnt; i++)
454 const struct sort_criterion *c = &criteria->crits[i];
/* Numeric comparison giving -1, 0 or 1.  The test that chooses
   between this and the string comparison below is not visible in
   this excerpt. */
459 double af = case_num (a, c->fv);
460 double bf = case_num (b, c->fv);
462 result = af < bf ? -1 : af > bf;
/* Byte-wise comparison of C->width bytes of string data. */
465 result = memcmp (case_str (a, c->fv), case_str (b, c->fv), c->width);
/* Any direction other than SRT_ASCEND flips the sign of RESULT. */
468 return c->dir == SRT_ASCEND ? result : -result;
474 /* Compares record-run tuples A and B on run number first, then
475 on record, then on case index. */
/* IRS supplies the sort criteria used for the record comparison. */
477 compare_record_run (const struct record_run *a,
478 const struct record_run *b,
479 struct initial_run_state *irs)
481 int result = a->run < b->run ? -1 : a->run > b->run;
/* The later keys are presumably consulted only while RESULT is still
   zero; the guarding tests are not visible in this excerpt. */
483 result = compare_record (&a->record, &b->record, irs->xsrt->criteria);
485 result = a->idx < b->idx ? -1 : a->idx > b->idx;
489 /* Compares record-run tuples A and B on run number first, then
490 on the current record according to SCP, but in descending order. */
/* Heap comparison callback passed to push_heap() by process_case()
   and to pop_heap() by output_record().  A and B are handed on to
   compare_record_run() as struct record_run pointers and IRS as the
   struct initial_run_state; the result is that function's result
   with its sign reversed. */
493 compare_record_run_minheap (const void *a, const void *b, void *irs)
495 return -compare_record_run (a, b, irs);
498 /* Begins a new initial run, specifically its output file. */
500 start_run (struct initial_run_state *irs)
/* The run's casefile is created with the value count of the sort. */
504 irs->casefile = casefile_create (irs->xsrt->value_cnt);
/* NOTE(review): the result of casefile_to_disk() is ignored here,
   whereas do_external_sort() tests it; confirm this is intended. */
505 casefile_to_disk (irs->casefile);
/* Nothing has been output in the new run yet, so process_case()
   skips its comparison against LAST_OUTPUT until a record is
   written. */
506 case_nullify (&irs->last_output);
509 /* Ends the current initial run. */
511 end_run (struct initial_run_state *irs)
513 struct external_sort *xsrt = irs->xsrt;
515 /* Record initial run. */
516 if (irs->casefile != NULL)
/* The result of casefile_sleep() is not examined here; the casefile
   is checked with casefile_error() further down instead. */
518 casefile_sleep (irs->casefile);
/* Make room in the run array when it is full.  The statement that
   enlarges RUN_CAP is not visible in this excerpt. */
519 if (xsrt->run_cnt >= xsrt->run_cap)
522 xsrt->runs = xnrealloc (xsrt->runs,
523 xsrt->run_cap, sizeof *xsrt->runs);
/* The run array now refers to the casefile; IRS drops its own
   pointer below. */
525 xsrt->runs[xsrt->run_cnt++] = irs->casefile;
526 if (casefile_error (irs->casefile))
528 irs->casefile = NULL;
532 /* Writes a record to the current initial run. */
534 output_record (struct initial_run_state *irs)
536 struct record_run *record_run;
537 struct ccase case_tmp;
539 /* Extract minimum case from heap. */
540 assert (irs->record_cnt > 0);
541 pop_heap (irs->records, irs->record_cnt--, sizeof *irs->records,
542 compare_record_run_minheap, irs);
/* The extracted element is taken from index RECORD_CNT, just past
   the heap now that the count has been decremented. */
543 record_run = irs->records + irs->record_cnt;
545 /* Bail if an error has occurred. */
549 /* Start new run if necessary. */
/* process_case() tags a record with either the current run or the
   one after it, so no other run number can turn up here. */
550 assert (record_run->run == irs->run
551 || record_run->run == irs->run + 1);
552 if (record_run->run != irs->run)
/* By this point IRS->run must match the record's run; the
   statements that switch runs are not visible in this excerpt. */
557 assert (record_run->run == irs->run);
561 if (irs->casefile != NULL)
562 casefile_append (irs->casefile, &record_run->record);
564 /* This record becomes last_output. */
/* Swap the struct ccase in the emitted slot with the one in the
   spare last slot of RECORDS: LAST_OUTPUT keeps the emitted case,
   and the emitted slot takes over the spare case. */
565 irs->last_output = case_tmp = record_run->record;
566 record_run->record = irs->records[irs->record_cap - 1].record;
567 irs->records[irs->record_cap - 1].record = case_tmp;
/* Forward declarations for the merge phase, both called by merge(). */
572 static int choose_merge (struct casefile *runs[], int run_cnt, int order);
573 static struct casefile *merge_once (struct external_sort *,
574 struct casefile *[], size_t);
576 /* Repeatedly merges runs until only one is left,
577 and returns the final casefile.
578 Returns a null pointer if an I/O error occurs. */
579 static struct casefile *
580 merge (struct external_sort *xsrt)
582 while (xsrt->run_cnt > 1)
/* Merge at most MAX_MERGE_ORDER runs, starting at the index that
   choose_merge() picks. */
584 int order = min (MAX_MERGE_ORDER, xsrt->run_cnt);
585 int idx = choose_merge (xsrt->runs, xsrt->run_cnt, order);
/* The merged run is stored in place of the first input run, and the
   run count drops by ORDER - 1. */
586 xsrt->runs[idx] = merge_once (xsrt, xsrt->runs + idx, order);
587 remove_range (xsrt->runs, xsrt->run_cnt, sizeof *xsrt->runs,
589 xsrt->run_cnt -= order - 1;
/* merge_once() yields a null pointer on an I/O error. */
591 if (xsrt->runs[idx] == NULL)
/* NOTE(review): this assertion fails if RUN_CNT is 0 on entry, that
   is, if no run was formed; confirm that callers rule this out. */
594 assert (xsrt->run_cnt == 1);
596 return xsrt->runs[0];
599 /* Chooses ORDER runs out of the RUN_CNT runs in RUNS to merge,
600 and returns the index of the first one.
602 For stability, we must merge only consecutive runs. For
603 efficiency, we choose the shortest consecutive sequence of runs. */
/* RUNS holds RUN_CNT casefiles and ORDER is the number of adjacent
   runs to merge; merge() passes an ORDER no larger than RUN_CNT.

   NOTE(review): the case counts are summed in int variables,
   although do_internal_sort() keeps the result of
   casefile_get_case_cnt() in an unsigned long; confirm that the sums
   cannot overflow for very long runs. */
606 choose_merge (struct casefile *runs[], int run_cnt, int order)
608 int min_idx, min_sum;
609 int cur_idx, cur_sum;
612 /* Sum up the length of the first ORDER runs. */
614 for (i = 0; i < order; i++)
615 cur_sum += casefile_get_case_cnt (runs[i]);
617 /* Find the shortest group of ORDER runs,
618 using a running total for efficiency. */
621 for (cur_idx = 1; cur_idx + order <= run_cnt; cur_idx++)
/* Slide the window one run to the right: subtract the run that
   leaves on the left, add the run that enters on the right. */
623 cur_sum -= casefile_get_case_cnt (runs[cur_idx - 1]);
624 cur_sum += casefile_get_case_cnt (runs[cur_idx + order - 1]);
/* Strict comparison; presumably the branch not visible here records
   CUR_IDX and CUR_SUM as the new minimum, so the earliest of several
   equally short groups would be kept. */
625 if (cur_sum < min_sum)
635 /* Merges the RUN_CNT initial runs specified in INPUT_FILES into a
636 new run, and returns the new run.
637 Returns a null pointer if an I/O error occurs. */
638 static struct casefile *
639 merge_once (struct external_sort *xsrt,
640 struct casefile **const input_files,
/* Per-input bookkeeping, accessed below through struct run pointers:
   the run's casefile and the reader used to read it. */
645 struct casefile *file;
646 struct casereader *reader;
651 struct casefile *output = NULL;
654 /* Open input files. */
655 runs = xnmalloc (run_cnt, sizeof *runs);
656 for (i = 0; i < run_cnt; i++)
/* Each input gets a destructive reader, and its first case is read
   right away. */
658 struct run *r = &runs[i];
659 r->file = input_files[i];
660 r->reader = casefile_get_destructive_reader (r->file);
661 if (!casereader_read_xfer (r->reader, &r->ccase))
668 /* Create output file. */
/* As in start_run(), the result of casefile_to_disk() is not
   examined. */
669 output = casefile_create (xsrt->value_cnt);
670 casefile_to_disk (output);
675 struct run *min_run, *run;
/* Linear scan over the remaining runs for the smallest current case.
   MIN_RUN is presumably initialized to the first run by a statement
   not visible here; with the strict comparison, the earlier run
   would then win ties. */
679 for (run = runs + 1; run < runs + run_cnt; run++)
680 if (compare_record (&run->ccase, &min_run->ccase, xsrt->criteria) < 0)
683 /* Write minimum to output file. */
684 casefile_append_xfer (output, &min_run->ccase);
686 /* Read another case from minimum run. */
/* When no further case can be read, casefile_error() is checked on
   the run and on the output to detect an I/O error. */
687 if (!casereader_read_xfer (min_run->reader, &min_run->ccase))
689 if (casefile_error (min_run->file) || casefile_error (output))
/* The finished run's reader and casefile are destroyed and its entry
   is removed from the local array. */
691 casereader_destroy (min_run->reader);
692 casefile_destroy (min_run->file);
694 remove_element (runs, run_cnt, sizeof *runs, min_run - runs);
/* The finished output is put to sleep, and the result is tested. */
699 if (!casefile_sleep (output))
/* Cleanup that destroys the remaining input casefiles and the
   output; presumably the error path, since the header comment
   promises a null pointer on I/O error. */
706 for (i = 0; i < run_cnt; i++)
707 casefile_destroy (runs[i].file);
708 casefile_destroy (output);