1 /* PSPP - computes sample statistics.
2 Copyright (C) 1997-9, 2000 Free Software Foundation, Inc.
3 Written by Ben Pfaff <blp@gnu.org>.
5 This program is free software; you can redistribute it and/or
6 modify it under the terms of the GNU General Public License as
7 published by the Free Software Foundation; either version 2 of the
8 License, or (at your option) any later version.
10 This program is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with this program; if not, write to the Free Software
17 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
30 #include <data/case-source.h>
31 #include <data/case.h>
32 #include <data/casefile.h>
33 #include <data/procedure.h>
34 #include <data/settings.h>
35 #include <data/variable.h>
36 #include <data/storage-stream.h>
37 #include <language/expressions/public.h>
38 #include <libpspp/alloc.h>
39 #include <libpspp/array.h>
40 #include <libpspp/assertion.h>
41 #include <libpspp/message.h>
42 #include <libpspp/message.h>
43 #include <libpspp/misc.h>
44 #include <libpspp/str.h>
47 #define _(msgid) gettext (msgid)
49 /* These should only be changed for testing purposes. */
/* Upper limit on the number of cases allocate_cases() buffers for
   replacement selection; its computed count is clamped to this. */
51 int max_buffers = INT_MAX;
/* Tested at the start of do_internal_sort(); clearing it is intended to
   send every sort down the external path (the early return after that
   test is not visible in this extract). */
52 bool allow_internal_sort = true;
/* Forward declarations of the case comparison and of the two sorting
   strategies that sort_execute() chooses between. */
54 static int compare_record (const struct ccase *, const struct ccase *,
55 const struct sort_criteria *);
56 static struct casefile *do_internal_sort (struct casereader *,
57 const struct sort_criteria *);
58 static struct casefile *do_external_sort (struct casereader *,
59 const struct sort_criteria *);
61 /* Gets ready to sort the active file: cancels any temporary
   transformations via proc_cancel_temporary_transformations().
   Called by both sort_active_file_in_place() and
   sort_active_file_to_casefile() before they read the data. */
63 prepare_to_sort_active_file (void)
65 proc_cancel_temporary_transformations ();
68 /* Sorts the active file in-place according to CRITERIA.
69 Returns nonzero if successful.
   Runs the pending procedure, captures its output casefile, sorts it
   with sort_execute(), and installs the sorted casefile as the active
   file's new source. */
71 sort_active_file_in_place (const struct sort_criteria *criteria)
73 struct casefile *in, *out;
75 prepare_to_sort_active_file ();
76 if (!procedure (NULL, NULL))
/* NOTE(review): the failure return taken when procedure() fails is not
   visible in this extract. */
79 in = proc_capture_output ();
/* The destructive reader over IN is handed to sort_execute(), which
   destroys the reader it is given. */
80 out = sort_execute (casefile_get_destructive_reader (in), criteria);
/* NOTE(review): sort_execute() can return a null pointer (callers such
   as sort_to_casefile_callback() test for it); the check on OUT before
   this call is not visible in this extract -- confirm it exists. */
84 proc_set_source (storage_source_create (out));
88 /* Data passed to sort_to_casefile_callback(). */
89 struct sort_to_casefile_cb_data
91 const struct sort_criteria *criteria; /* Sort criteria (input). */
92 struct casefile *output; /* Sorted casefile, or NULL if none was produced. */
95 /* Sorts casefile CF according to the criteria in CB_DATA, which
   points to a struct sort_to_casefile_cb_data.  Used as the
   multipass_procedure() callback of sort_active_file_to_casefile().
   Stores the sorted casefile in CB_DATA->output and returns whether
   sorting succeeded (that is, whether the output is non-null). */
97 sort_to_casefile_callback (const struct casefile *cf, void *cb_data_)
99 struct sort_to_casefile_cb_data *cb_data = cb_data_;
/* CF is received as const, so a non-destructive reader is used. */
100 cb_data->output = sort_execute (casefile_get_reader (cf), cb_data->criteria);
101 return cb_data->output != NULL;
/* NOTE(review): the comment that follows is truncated in this extract
   (its closing line is missing).  From the visible code, this function
   runs multipass_procedure() with sort_to_casefile_callback(), destroys
   any partial output if the procedure fails, and otherwise returns the
   sorted casefile collected in CB_DATA.OUTPUT. */
104 /* Sorts the active file to a separate casefile. If successful,
105 returns the sorted casefile. Returns a null pointer on
108 sort_active_file_to_casefile (const struct sort_criteria *criteria)
110 struct sort_to_casefile_cb_data cb_data;
112 prepare_to_sort_active_file ();
114 cb_data.criteria = criteria;
115 cb_data.output = NULL;
116 if (!multipass_procedure (sort_to_casefile_callback, &cb_data))
118 casefile_destroy (cb_data.output);
121 return cb_data.output;
125 /* Reads all the cases from READER, which is destroyed. Sorts
126 the cases according to CRITERIA. Returns the sorted cases in
127 a newly created casefile, or a null pointer on failure. */
129 sort_execute (struct casereader *reader, const struct sort_criteria *criteria)
/* Try the in-memory sort first.  do_internal_sort() returns a null
   pointer when it declines, e.g. when the data is not in memory. */
131 struct casefile *output = do_internal_sort (reader, criteria);
/* Fall back to the external sort.  NOTE(review): the guard limiting
   this to the case where the internal sort declined is not visible in
   this extract. */
133 output = do_external_sort (reader, criteria);
134 casereader_destroy (reader);
138 /* A case and its index (struct indexed_case), the element type
   sorted by do_internal_sort(). */
141 struct ccase c; /* Case. */
142 unsigned long idx; /* Index to allow for stable sorting. */
/* sort() comparison function for struct indexed_case; its third
   argument is the struct sort_criteria to compare by. */
145 static int compare_indexed_cases (const void *, const void *, void *);
147 /* If the data is in memory, do an internal sort and return a new
148 casefile for the data. Otherwise, return a null pointer. */
149 static struct casefile *
150 do_internal_sort (struct casereader *reader,
151 const struct sort_criteria *criteria)
153 const struct casefile *src;
154 struct casefile *dst;
155 unsigned long case_cnt;
/* Testing switch that forces the external sort. */
157 if (!allow_internal_sort)
160 src = casereader_get_casefile (reader);
/* A casefile of more than one case that is not held in memory is left
   to the external sort. */
161 if (casefile_get_case_cnt (src) > 1 && !casefile_in_core (src))
164 case_cnt = casefile_get_case_cnt (src);
165 dst = casefile_create (casefile_get_value_cnt (src));
/* Pair each case with its input position so that
   compare_indexed_cases() can break ties and keep the sort stable.
   NOTE(review): nmalloc() may return a null pointer; the check on
   CASES, and the handling of a false OK from casereader_read_xfer(),
   are not visible in this extract -- confirm both exist. */
168 struct indexed_case *cases = nmalloc (sizeof *cases, case_cnt);
173 for (i = 0; i < case_cnt; i++)
175 bool ok = casereader_read_xfer (reader, &cases[i].c);
/* Sort by CRITERIA (then by input position), and transfer the sorted
   cases into the new casefile DST. */
181 sort (cases, case_cnt, sizeof *cases, compare_indexed_cases,
184 for (i = 0; i < case_cnt; i++)
185 casefile_append_xfer (dst, &cases[i].c);
186 if (casefile_error (dst))
194 casefile_destroy (dst);
202 /* Compares the variables specified by CRITERIA between the cases
203 at A and B, with a "last resort" comparison for stability, and
204 returns a strcmp()-type result. */
206 compare_indexed_cases (const void *a_, const void *b_, void *criteria_)
208 struct sort_criteria *criteria = criteria_;
209 const struct indexed_case *a = a_;
210 const struct indexed_case *b = b_;
211 int result = compare_record (&a->c, &b->c, criteria);
/* Last resort: order by original position (presumably applied only when
   RESULT is zero; that guard is not visible in this extract).  Written
   with comparisons rather than a subtraction, which could wrap for
   unsigned long. */
213 result = a->idx < b->idx ? -1 : a->idx > b->idx;
219 /* Maximum order of merge (external sort only). The maximum
220 reasonable value is about 7. Above that, it would be a good
221 idea to use a heap in merge_once() to select the minimum. */
/* merge() combines at most this many consecutive runs per pass, and
   merge_once() finds the minimum case with a linear scan of its runs. */
222 #define MAX_MERGE_ORDER 7
224 /* State of an external sort (struct external_sort). */
227 const struct sort_criteria *criteria; /* Sort criteria. */
228 size_t value_cnt; /* Size of data in `union value's. */
229 struct casefile **runs; /* Array of runs: initial runs, later merged ones. */
230 size_t run_cnt, run_cap; /* Number of runs, allocated capacity. */
233 /* Prototypes for helper functions. */
234 static int write_runs (struct external_sort *, struct casereader *);
235 static struct casefile *merge (struct external_sort *);
236 static void destroy_external_sort (struct external_sort *);
238 /* Performs a stable external sort of the cases read from READER,
239 according to CRITERIA. Forms initial runs using a heap
240 as a reservoir. Merges the initial runs according to a
241 pattern that assures stability.  Returns the sorted casefile,
   or a null pointer if the sort fails. */
242 static struct casefile *
243 do_external_sort (struct casereader *reader,
244 const struct sort_criteria *criteria)
246 struct external_sort *xsrt;
/* The source casefile is passed to casefile_to_disk() first; the branch
   taken when that fails is not visible in this extract. */
248 if (!casefile_to_disk (casereader_get_casefile (reader)))
251 xsrt = xmalloc (sizeof *xsrt);
252 xsrt->criteria = criteria;
253 xsrt->value_cnt = casefile_get_value_cnt (casereader_get_casefile (reader));
/* NOTE(review): RUN_CNT and RUN_CAP are initialized on lines not visible
   in this extract; the allocation below depends on RUN_CAP. */
256 xsrt->runs = xnmalloc (xsrt->run_cap, sizeof *xsrt->runs);
/* Form the initial runs, then merge them into a single casefile. */
257 if (write_runs (xsrt, reader))
259 struct casefile *output = merge (xsrt);
260 destroy_external_sort (xsrt);
265 destroy_external_sort (xsrt);
/* Destroys the run casefiles still recorded in XSRT->runs (entries 0
   through RUN_CNT - 1).  NOTE(review): the rest of this function's body
   (presumably freeing RUNS and XSRT) is not visible in this extract. */
272 destroy_external_sort (struct external_sort *xsrt)
278 for (i = 0; i < xsrt->run_cnt; i++)
279 casefile_destroy (xsrt->runs[i]);
285 /* Replacement selection: forming the initial runs of an external sort. */
287 /* Pairs a record with a run number (struct record_run); this is the
   element type of the replacement-selection heap. */
290 int run; /* Run number of case. */
291 struct ccase record; /* Case data. */
292 size_t idx; /* Case number (for stability). */
295 /* Represents a set of initial runs during an external sort. */
296 struct initial_run_state
298 struct external_sort *xsrt; /* Receives each finished run (see end_run()). */
/* The heap uses at most RECORD_CAP - 1 slots.  Slot RECORD_CAP - 1 is
   kept out of the heap and holds the case most recently written, which
   LAST_OUTPUT refers to (see output_record()). */
301 struct record_run *records; /* Records arranged as a heap. */
302 size_t record_cnt; /* Current number of records. */
303 size_t record_cap; /* Capacity for records. */
305 /* Run currently being output. */
306 int run; /* Run number. */
307 size_t case_cnt; /* Number of cases so far. */
308 struct casefile *casefile; /* Output file. */
309 struct ccase last_output; /* Record last output; null at the start of a run. */
311 int okay; /* Zero if an error has been encountered. */
314 static bool destroy_initial_run_state (struct initial_run_state *);
315 static void process_case (struct initial_run_state *, const struct ccase *,
317 static int allocate_cases (struct initial_run_state *);
318 static void output_record (struct initial_run_state *);
319 static void start_run (struct initial_run_state *);
320 static void end_run (struct initial_run_state *);
321 static int compare_record_run (const struct record_run *,
322 const struct record_run *,
323 struct initial_run_state *);
324 static int compare_record_run_minheap (const void *, const void *, void *);
326 /* Reads cases from READER and composes initial runs in XSRT.
   Returns nonzero if successful (do_external_sort() merges the runs
   only in that case). */
328 write_runs (struct external_sort *xsrt, struct casereader *reader)
330 struct initial_run_state *irs;
335 /* Allocate memory for cases. */
336 irs = xmalloc (sizeof *irs);
339 irs->record_cnt = irs->record_cap = 0;
342 irs->casefile = NULL;
343 case_nullify (&irs->last_output);
345 if (!allocate_cases (irs))
348 /* Create initial runs: feed every input case, with its input
   position, through the replacement-selection heap. */
350 for (; irs->okay && casereader_read (reader, &c); case_destroy (&c))
351 process_case (irs, &c, idx++);
/* Input is exhausted: drain the cases still held in the heap
   (presumably via output_record(); the loop body is not visible in
   this extract). */
352 while (irs->okay && irs->record_cnt > 0)
359 if (!destroy_initial_run_state (irs))
365 /* Adds case C, whose input position is IDX, to the
   replacement-selection heap of IRS.  Once the reservoir is full,
   a record is written out (see the test at the end). */
367 process_case (struct initial_run_state *irs, const struct ccase *c, size_t idx)
369 struct record_run *rr;
371 /* Compose record_run for this run and add to heap.  The reservoir
   is full at RECORD_CAP - 1 records; the last slot is reserved for
   the most recently output case. */
372 assert (irs->record_cnt < irs->record_cap - 1);
373 rr = irs->records + irs->record_cnt++;
374 case_copy (&rr->record, 0, c, 0, irs->xsrt->value_cnt);
/* A case that sorts before the last case written to the current run
   cannot extend that run, so it is deferred to the next run.
   NOTE(review): the default assignments of RR->run and RR->idx are not
   visible in this extract. */
377 if (!case_is_null (&irs->last_output)
378 && compare_record (c, &irs->last_output, irs->xsrt->criteria) < 0)
379 rr->run = irs->run + 1;
380 push_heap (irs->records, irs->record_cnt, sizeof *irs->records,
381 compare_record_run_minheap, irs);
383 /* Output a record if the reservoir is full. */
384 if (irs->record_cnt == irs->record_cap - 1 && irs->okay)
388 /* Destroys the initial run state represented by IRS.
389 Returns true if successful, false if an I/O error occurred. */
391 destroy_initial_run_state (struct initial_run_state *irs)
/* All RECORD_CAP slots, including the reserved last slot, hold cases. */
399 for (i = 0; i < irs->record_cap; i++)
400 case_destroy (&irs->records[i].record);
/* A run casefile still attached to IRS is put to sleep here; the result
   of casefile_sleep() feeds the status that is returned. */
403 if (irs->casefile != NULL)
404 ok = casefile_sleep (irs->casefile);
410 /* Allocates as many cases as the workspace limit (get_workspace())
   and max_buffers allow, stores them in IRS->records, and sets
   IRS->record_cap.  Returns nonzero if successful.  If fewer than
   min_buffers cases can be allocated, reports an error with msg(). */
412 allocate_cases (struct initial_run_state *irs)
414 int approx_case_cost; /* Approximate memory cost of one case in bytes. */
415 int max_cases; /* Maximum number of cases to allocate. */
418 /* Allocate as many cases as we can within the workspace
420 approx_case_cost = (sizeof *irs->records
421 + irs->xsrt->value_cnt * sizeof (union value)
422 + 4 * sizeof (void *));
423 max_cases = get_workspace() / approx_case_cost;
424 if (max_cases > max_buffers)
425 max_cases = max_buffers;
426 irs->records = nmalloc (sizeof *irs->records, max_cases);
427 if (irs->records != NULL)
428 for (i = 0; i < max_cases; i++)
429 if (!case_try_create (&irs->records[i].record, irs->xsrt->value_cnt))
434 irs->record_cap = max_cases;
436 /* Fail if we didn't allocate an acceptable number of cases. */
/* NOTE(review): min_buffers is not declared in the visible lines of this
   extract.  The %d conversions below assume get_workspace() returns int;
   confirm its type. */
437 if (irs->records == NULL || max_cases < min_buffers)
439 msg (SE, _("Out of memory. Could not allocate room for minimum of %d "
440 "cases of %d bytes each. (PSPP workspace is currently "
441 "restricted to a maximum of %d KB.)"),
442 min_buffers, approx_case_cost, get_workspace() / 1024);
448 /* Compares cases A and B on each criterion in CRITERIA in turn,
449 and returns a strcmp()-type result.  The result for a criterion
   whose direction is not SRT_ASCEND is negated. */
451 compare_record (const struct ccase *a, const struct ccase *b,
452 const struct sort_criteria *criteria)
459 for (i = 0; i < criteria->crit_cnt; i++)
461 const struct sort_criterion *c = &criteria->crits[i];
/* Numeric key (the test selecting this branch is not visible in this
   extract).  The comparisons yield -1, 0, or 1 without subtraction. */
466 double af = case_num (a, c->fv);
467 double bf = case_num (b, c->fv);
469 result = af < bf ? -1 : af > bf;
/* String key: compared bytewise over the criterion's width. */
472 result = memcmp (case_str (a, c->fv), case_str (b, c->fv), c->width);
/* Descending criteria invert the comparison.  NOTE(review): the guard
   that returns only on a nonzero RESULT, and the final return for equal
   cases, are not visible in this extract. */
475 return c->dir == SRT_ASCEND ? result : -result;
481 /* Compares record-run tuples A and B on run number first, then
482 on record (using the sort criteria of IRS), then on case index,
   and returns a strcmp()-type result. */
484 compare_record_run (const struct record_run *a,
485 const struct record_run *b,
486 struct initial_run_state *irs)
488 int result = a->run < b->run ? -1 : a->run > b->run;
/* The later comparisons presumably apply only while RESULT is still zero
   (those guards are not visible in this extract); the final tie-break on
   input position keeps equal cases in input order. */
490 result = compare_record (&a->record, &b->record, irs->xsrt->criteria);
492 result = a->idx < b->idx ? -1 : a->idx > b->idx;
/* Heap comparison function for the replacement-selection heap: the
   negation of compare_record_run(), passed to push_heap() and pop_heap()
   so that the smallest (run, record, index) tuple is the one extracted
   (see output_record()).  NOTE(review): the comment below is truncated
   in this extract (its closing line is missing). */
496 /* Compares record-run tuples A and B on run number first, then
497 on the current record according to the sort criteria, but in descending
500 compare_record_run_minheap (const void *a, const void *b, void *irs)
502 return -compare_record_run (a, b, irs);
505 /* Begins a new initial run, specifically its output file. */
/* Creates the run's output casefile, calls casefile_to_disk() on it, and
   clears LAST_OUTPUT so that process_case() defers no case to the next
   run until this run has written one.  NOTE(review): unlike
   do_external_sort(), this ignores the result of casefile_to_disk(). */
507 start_run (struct initial_run_state *irs)
511 irs->casefile = casefile_create (irs->xsrt->value_cnt);
512 casefile_to_disk (irs->casefile);
513 case_nullify (&irs->last_output);
516 /* Ends the current initial run: puts its casefile to sleep, appends
   it to XSRT->runs (growing the array when it is full), and detaches
   it from IRS.  NOTE(review): the result of casefile_sleep() is ignored
   here, whereas destroy_initial_run_state() uses it; errors are only
   detected through casefile_error() below.  The RUN_CAP update before
   xnrealloc() is not visible in this extract. */
518 end_run (struct initial_run_state *irs)
520 struct external_sort *xsrt = irs->xsrt;
522 /* Record initial run. */
523 if (irs->casefile != NULL)
525 casefile_sleep (irs->casefile);
526 if (xsrt->run_cnt >= xsrt->run_cap)
529 xsrt->runs = xnrealloc (xsrt->runs,
530 xsrt->run_cap, sizeof *xsrt->runs);
532 xsrt->runs[xsrt->run_cnt++] = irs->casefile;
533 if (casefile_error (irs->casefile))
535 irs->casefile = NULL;
539 /* Removes the minimum record from the heap of IRS and writes it to
   the current initial run, starting a new run first if the record
   belongs to the next one. */
541 output_record (struct initial_run_state *irs)
543 struct record_run *record_run;
544 struct ccase case_tmp;
546 /* Extract minimum case from heap. */
547 assert (irs->record_cnt > 0);
548 pop_heap (irs->records, irs->record_cnt--, sizeof *irs->records,
549 compare_record_run_minheap, irs);
/* The popped record now sits just past the end of the heap. */
550 record_run = irs->records + irs->record_cnt;
552 /* Bail if an error has occurred. */
556 /* Start new run if necessary.  A popped record belongs either to the
   current run or to the next one. */
557 assert (record_run->run == irs->run
558 || record_run->run == irs->run + 1);
559 if (record_run->run != irs->run)
564 assert (record_run->run == irs->run);
568 if (irs->casefile != NULL)
569 casefile_append (irs->casefile, &record_run->record);
571 /* This record becomes last_output.  It is swapped into the reserved
   slot RECORD_CAP - 1, outside the heap, because process_case() copies
   the next case into the slot just vacated; the vacated slot reuses the
   reserved slot's case instead of overwriting LAST_OUTPUT's data. */
572 irs->last_output = case_tmp = record_run->record;
573 record_run->record = irs->records[irs->record_cap - 1].record;
574 irs->records[irs->record_cap - 1].record = case_tmp;
/* Merging of runs (external sort only). */
579 static int choose_merge (struct casefile *runs[], int run_cnt, int order);
580 static struct casefile *merge_once (struct external_sort *,
581 struct casefile *[], size_t);
583 /* Repeatedly merges runs until only one is left,
584 and returns the final casefile.
585 Returns a null pointer if an I/O error occurs. */
586 static struct casefile *
587 merge (struct external_sort *xsrt)
589 while (xsrt->run_cnt > 1)
/* Replace ORDER consecutive runs, starting at the index chosen by
   choose_merge(), with their merge: RUNS[IDX] receives the merged
   casefile and the ORDER - 1 entries after it are removed. */
591 int order = min (MAX_MERGE_ORDER, xsrt->run_cnt);
592 int idx = choose_merge (xsrt->runs, xsrt->run_cnt, order);
593 xsrt->runs[idx] = merge_once (xsrt, xsrt->runs + idx, order);
594 remove_range (xsrt->runs, xsrt->run_cnt, sizeof *xsrt->runs,
596 xsrt->run_cnt -= order - 1;
/* merge_once() reports an I/O error with a null pointer. */
598 if (xsrt->runs[idx] == NULL)
601 assert (xsrt->run_cnt == 1);
/* NOTE(review): do_external_sort() calls destroy_external_sort() after
   this returns, and that destroys every run below RUN_CNT.  The step
   that keeps the returned casefile from being destroyed (presumably
   resetting RUN_CNT) is not visible in this extract; confirm it exists. */
603 return xsrt->runs[0];
/* NOTE(review): CUR_SUM and MIN_SUM are int, but they accumulate
   casefile_get_case_cnt() results, which do_internal_sort() stores in an
   unsigned long.  A group of very large runs can overflow int (undefined
   behavior); unsigned long sums would avoid this.  The comment below is
   truncated in this extract (its closing line is missing). */
606 /* Chooses ORDER runs out of the RUN_CNT runs in RUNS to merge,
607 and returns the index of the first one.
609 For stability, we must merge only consecutive runs. For
610 efficiency, we choose the shortest consecutive sequence of
613 choose_merge (struct casefile *runs[], int run_cnt, int order)
615 int min_idx, min_sum;
616 int cur_idx, cur_sum;
619 /* Sum up the length of the first ORDER runs. */
621 for (i = 0; i < order; i++)
622 cur_sum += casefile_get_case_cnt (runs[i]);
624 /* Find the shortest group of ORDER runs,
625 using a running total for efficiency. */
628 for (cur_idx = 1; cur_idx + order <= run_cnt; cur_idx++)
630 cur_sum -= casefile_get_case_cnt (runs[cur_idx - 1]);
631 cur_sum += casefile_get_case_cnt (runs[cur_idx + order - 1]);
/* Strict comparison keeps the earliest of equally short groups. */
632 if (cur_sum < min_sum)
642 /* Merges the RUN_CNT runs specified in INPUT_FILES into a
643 new run, and returns the new run.  The input runs are consumed.
644 Returns a null pointer if an I/O error occurs. */
645 static struct casefile *
646 merge_once (struct external_sort *xsrt,
647 struct casefile **const input_files,
652 struct casefile *file;
653 struct casereader *reader;
658 struct casefile *output = NULL;
661 /* Open input files. */
662 runs = xnmalloc (run_cnt, sizeof *runs);
663 for (i = 0; i < run_cnt; i++)
665 struct run *r = &runs[i];
666 r->file = input_files[i];
667 r->reader = casefile_get_destructive_reader (r->file);
668 if (!casereader_read_xfer (r->reader, &r->ccase))
675 /* Create output file.  NOTE(review): the result of
   casefile_to_disk() is ignored here, unlike in do_external_sort(). */
676 output = casefile_create (xsrt->value_cnt);
677 casefile_to_disk (output);
682 struct run *min_run, *run;
/* Linear scan for the run whose current case sorts first.  The strict
   comparison keeps the earliest run on ties, which preserves stability
   because choose_merge() only merges consecutive runs. */
686 for (run = runs + 1; run < runs + run_cnt; run++)
687 if (compare_record (&run->ccase, &min_run->ccase, xsrt->criteria) < 0)
690 /* Write minimum to output file. */
691 casefile_append_xfer (output, &min_run->ccase);
693 /* Read another case from minimum run. */
694 if (!casereader_read_xfer (min_run->reader, &min_run->ccase))
/* The run is exhausted: check for I/O errors, release its reader and
   casefile, and drop it from RUNS (the matching decrement of RUN_CNT
   is not visible in this extract). */
696 if (casefile_error (min_run->file) || casefile_error (output))
698 casereader_destroy (min_run->reader);
699 casefile_destroy (min_run->file);
701 remove_element (runs, run_cnt, sizeof *runs, min_run - runs);
706 if (!casefile_sleep (output))
/* Error cleanup (presumably reached on I/O failure): destroy the
   remaining input runs and the partial output. */
713 for (i = 0; i < run_cnt; i++)
714 casefile_destroy (runs[i].file);
715 casefile_destroy (output);