/* PSPP - computes sample statistics.
   Copyright (C) 1997-9, 2000 Free Software Foundation, Inc.
   Written by Ben Pfaff <blp@gnu.org>.

   This program is free software; you can redistribute it and/or
   modify it under the terms of the GNU General Public License as
   published by the Free Software Foundation; either version 2 of the
   License, or (at your option) any later version.

   This program is distributed in the hope that it will be useful, but
   WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
   General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software
   Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
   02110-1301, USA. */
#include <config.h>

#include "sort.h"

#include <assert.h>
#include <errno.h>
#include <limits.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

#include <data/case-source.h>
#include <data/case.h>
#include <data/casefile.h>
#include <data/procedure.h>
#include <data/settings.h>
#include <data/variable.h>
#include <data/storage-stream.h>
#include <language/expressions/public.h>
#include <libpspp/alloc.h>
#include <libpspp/array.h>
#include <libpspp/message.h>
#include <libpspp/message.h>
#include <libpspp/misc.h>
#include <libpspp/str.h>

#include "gettext.h"
#define _(msgid) gettext (msgid)

/* Tuning knobs for the sorting code.  These should only be
   changed for testing purposes. */

/* Minimum number of cases the external sort's reservoir must
   hold; allocate_cases() fails if it cannot get at least this
   many. */
int min_buffers = 64;

/* Upper bound on the number of cases allocated for the external
   sort's reservoir. */
int max_buffers = INT_MAX;

/* When false, do_internal_sort() always declines, forcing the
   external sort path. */
bool allow_internal_sort = true;
/* Forward declarations for the internal and external sort
   drivers and the key comparison they share. */
static int compare_record (const struct ccase *, const struct ccase *,
                           const struct sort_criteria *);
static struct casefile *do_internal_sort (struct casereader *,
                                          const struct sort_criteria *);
static struct casefile *do_external_sort (struct casereader *,
                                          const struct sort_criteria *);
/* Get ready to sort the active file: cancels any temporary
   transformations before the active file is read for sorting. */
static void
prepare_to_sort_active_file (void)
{
  proc_cancel_temporary_transformations ();
}
/* Sorts the active file in-place according to CRITERIA.
   Returns nonzero if successful, zero if running the procedure
   or the sort itself fails.  On failure the active file's source
   is left unchanged. */
int
sort_active_file_in_place (const struct sort_criteria *criteria)
{
  struct casefile *in, *out;

  prepare_to_sort_active_file ();
  if (!procedure (NULL, NULL))
    return 0;

  in = proc_capture_output ();
  out = sort_execute (casefile_get_destructive_reader (in), criteria);
  /* sort_execute() returns a null pointer on failure; never hand
     that to storage_source_create(). */
  if (out == NULL)
    return 0;

  proc_set_source (storage_source_create (out));
  return 1;
}
/* Data passed to sort_to_casefile_callback(). */
struct sort_to_casefile_cb_data
  {
    const struct sort_criteria *criteria; /* Criteria to sort by. */
    struct casefile *output;              /* Sorted result, or null. */
  };
94 /* Sorts casefile CF according to the criteria in CB_DATA. */
96 sort_to_casefile_callback (const struct casefile *cf, void *cb_data_)
98 struct sort_to_casefile_cb_data *cb_data = cb_data_;
99 cb_data->output = sort_execute (casefile_get_reader (cf), cb_data->criteria);
100 return cb_data->output != NULL;
103 /* Sorts the active file to a separate casefile. If successful,
104 returns the sorted casefile. Returns a null pointer on
107 sort_active_file_to_casefile (const struct sort_criteria *criteria)
109 struct sort_to_casefile_cb_data cb_data;
111 prepare_to_sort_active_file ();
113 cb_data.criteria = criteria;
114 cb_data.output = NULL;
115 multipass_procedure (sort_to_casefile_callback, &cb_data);
117 return cb_data.output;
/* Reads all the cases from READER, which is destroyed.  Sorts
   the cases according to CRITERIA.  Returns the sorted cases in
   a newly created casefile, or a null pointer on failure.

   An in-memory sort is tried first; only if it declines (returns
   a null pointer) is the external sort used. */
struct casefile *
sort_execute (struct casereader *reader, const struct sort_criteria *criteria)
{
  struct casefile *output = do_internal_sort (reader, criteria);
  if (output == NULL)
    output = do_external_sort (reader, criteria);
  casereader_destroy (reader);
  return output;
}
134 /* A case and its index. */
137 struct ccase c; /* Case. */
138 unsigned long idx; /* Index to allow for stable sorting. */
141 static int compare_indexed_cases (const void *, const void *, void *);
143 /* If the data is in memory, do an internal sort and return a new
144 casefile for the data. Otherwise, return a null pointer. */
145 static struct casefile *
146 do_internal_sort (struct casereader *reader,
147 const struct sort_criteria *criteria)
149 const struct casefile *src;
150 struct casefile *dst;
151 unsigned long case_cnt;
153 if (!allow_internal_sort)
156 src = casereader_get_casefile (reader);
157 if (casefile_get_case_cnt (src) > 1 && !casefile_in_core (src))
160 case_cnt = casefile_get_case_cnt (src);
161 dst = casefile_create (casefile_get_value_cnt (src));
164 struct indexed_case *cases = nmalloc (sizeof *cases, case_cnt);
169 for (i = 0; i < case_cnt; i++)
171 bool ok = casereader_read_xfer (reader, &cases[i].c);
177 sort (cases, case_cnt, sizeof *cases, compare_indexed_cases,
180 for (i = 0; i < case_cnt; i++)
181 casefile_append_xfer (dst, &cases[i].c);
182 if (casefile_error (dst))
190 casefile_destroy (dst);
198 /* Compares the variables specified by CRITERIA between the cases
199 at A and B, with a "last resort" comparison for stability, and
200 returns a strcmp()-type result. */
202 compare_indexed_cases (const void *a_, const void *b_, void *criteria_)
204 struct sort_criteria *criteria = criteria_;
205 const struct indexed_case *a = a_;
206 const struct indexed_case *b = b_;
207 int result = compare_record (&a->c, &b->c, criteria);
209 result = a->idx < b->idx ? -1 : a->idx > b->idx;
/* External sort. */

/* Maximum order of merge (external sort only).  The maximum
   reasonable value is about 7.  Above that, it would be a good
   idea to use a heap in merge_once() to select the minimum. */
#define MAX_MERGE_ORDER 7
/* Results of an external sort. */
struct external_sort
  {
    const struct sort_criteria *criteria; /* Sort criteria. */
    size_t value_cnt;                 /* Size of data in `union value's. */
    struct casefile **runs;           /* Array of initial runs. */
    size_t run_cnt, run_cap;          /* Number of runs, allocated capacity. */
  };
/* Prototypes for helper functions. */
static int write_runs (struct external_sort *, struct casereader *);
static struct casefile *merge (struct external_sort *);
static void destroy_external_sort (struct external_sort *);
234 /* Performs a stable external sort of the active file according
235 to the specification in SCP. Forms initial runs using a heap
236 as a reservoir. Merges the initial runs according to a
237 pattern that assures stability. */
238 static struct casefile *
239 do_external_sort (struct casereader *reader,
240 const struct sort_criteria *criteria)
242 struct external_sort *xsrt;
244 if (!casefile_to_disk (casereader_get_casefile (reader)))
247 xsrt = xmalloc (sizeof *xsrt);
248 xsrt->criteria = criteria;
249 xsrt->value_cnt = casefile_get_value_cnt (casereader_get_casefile (reader));
252 xsrt->runs = xnmalloc (xsrt->run_cap, sizeof *xsrt->runs);
253 if (write_runs (xsrt, reader))
255 struct casefile *output = merge (xsrt);
256 destroy_external_sort (xsrt);
261 destroy_external_sort (xsrt);
268 destroy_external_sort (struct external_sort *xsrt)
274 for (i = 0; i < xsrt->run_cnt; i++)
275 casefile_destroy (xsrt->runs[i]);
281 /* Replacement selection. */
283 /* Pairs a record with a run number. */
286 int run; /* Run number of case. */
287 struct ccase record; /* Case data. */
288 size_t idx; /* Case number (for stability). */
291 /* Represents a set of initial runs during an external sort. */
292 struct initial_run_state
294 struct external_sort *xsrt;
297 struct record_run *records; /* Records arranged as a heap. */
298 size_t record_cnt; /* Current number of records. */
299 size_t record_cap; /* Capacity for records. */
301 /* Run currently being output. */
302 int run; /* Run number. */
303 size_t case_cnt; /* Number of cases so far. */
304 struct casefile *casefile; /* Output file. */
305 struct ccase last_output; /* Record last output. */
307 int okay; /* Zero if an error has been encountered. */
static bool destroy_initial_run_state (struct initial_run_state *);
static void process_case (struct initial_run_state *, const struct ccase *,
                          size_t);
static int allocate_cases (struct initial_run_state *);
static void output_record (struct initial_run_state *);
static void start_run (struct initial_run_state *);
static void end_run (struct initial_run_state *);
static int compare_record_run (const struct record_run *,
                               const struct record_run *,
                               struct initial_run_state *);
static int compare_record_run_minheap (const void *, const void *, void *);
322 /* Reads cases from READER and composes initial runs in XSRT. */
324 write_runs (struct external_sort *xsrt, struct casereader *reader)
326 struct initial_run_state *irs;
331 /* Allocate memory for cases. */
332 irs = xmalloc (sizeof *irs);
335 irs->record_cnt = irs->record_cap = 0;
338 irs->casefile = NULL;
339 case_nullify (&irs->last_output);
341 if (!allocate_cases (irs))
344 /* Create initial runs. */
346 for (; irs->okay && casereader_read (reader, &c); case_destroy (&c))
347 process_case (irs, &c, idx++);
348 while (irs->okay && irs->record_cnt > 0)
355 if (!destroy_initial_run_state (irs))
361 /* Add a single case to an initial run. */
363 process_case (struct initial_run_state *irs, const struct ccase *c, size_t idx)
365 struct record_run *rr;
367 /* Compose record_run for this run and add to heap. */
368 assert (irs->record_cnt < irs->record_cap - 1);
369 rr = irs->records + irs->record_cnt++;
370 case_copy (&rr->record, 0, c, 0, irs->xsrt->value_cnt);
373 if (!case_is_null (&irs->last_output)
374 && compare_record (c, &irs->last_output, irs->xsrt->criteria) < 0)
375 rr->run = irs->run + 1;
376 push_heap (irs->records, irs->record_cnt, sizeof *irs->records,
377 compare_record_run_minheap, irs);
379 /* Output a record if the reservoir is full. */
380 if (irs->record_cnt == irs->record_cap - 1 && irs->okay)
384 /* Destroys the initial run state represented by IRS.
385 Returns true if successful, false if an I/O error occurred. */
387 destroy_initial_run_state (struct initial_run_state *irs)
395 for (i = 0; i < irs->record_cap; i++)
396 case_destroy (&irs->records[i].record);
399 if (irs->casefile != NULL)
400 ok = casefile_sleep (irs->casefile);
406 /* Allocates room for lots of cases as a buffer. */
408 allocate_cases (struct initial_run_state *irs)
410 int approx_case_cost; /* Approximate memory cost of one case in bytes. */
411 int max_cases; /* Maximum number of cases to allocate. */
414 /* Allocate as many cases as we can within the workspace
416 approx_case_cost = (sizeof *irs->records
417 + irs->xsrt->value_cnt * sizeof (union value)
418 + 4 * sizeof (void *));
419 max_cases = get_workspace() / approx_case_cost;
420 if (max_cases > max_buffers)
421 max_cases = max_buffers;
422 irs->records = nmalloc (sizeof *irs->records, max_cases);
423 if (irs->records != NULL)
424 for (i = 0; i < max_cases; i++)
425 if (!case_try_create (&irs->records[i].record, irs->xsrt->value_cnt))
430 irs->record_cap = max_cases;
432 /* Fail if we didn't allocate an acceptable number of cases. */
433 if (irs->records == NULL || max_cases < min_buffers)
435 msg (SE, _("Out of memory. Could not allocate room for minimum of %d "
436 "cases of %d bytes each. (PSPP workspace is currently "
437 "restricted to a maximum of %d KB.)"),
438 min_buffers, approx_case_cost, get_workspace() / 1024);
444 /* Compares the VAR_CNT variables in VARS[] between the `value's at
445 A and B, and returns a strcmp()-type result. */
447 compare_record (const struct ccase *a, const struct ccase *b,
448 const struct sort_criteria *criteria)
455 for (i = 0; i < criteria->crit_cnt; i++)
457 const struct sort_criterion *c = &criteria->crits[i];
462 double af = case_num (a, c->fv);
463 double bf = case_num (b, c->fv);
465 result = af < bf ? -1 : af > bf;
468 result = memcmp (case_str (a, c->fv), case_str (b, c->fv), c->width);
471 return c->dir == SRT_ASCEND ? result : -result;
477 /* Compares record-run tuples A and B on run number first, then
478 on record, then on case index. */
480 compare_record_run (const struct record_run *a,
481 const struct record_run *b,
482 struct initial_run_state *irs)
484 int result = a->run < b->run ? -1 : a->run > b->run;
486 result = compare_record (&a->record, &b->record, irs->xsrt->criteria);
488 result = a->idx < b->idx ? -1 : a->idx > b->idx;
/* Compares record-run tuples A and B exactly like
   compare_record_run(), but in descending order.  The reservoir's
   heap operations use this comparator so that the record-run that
   compare_record_run() ranks lowest is the one extracted first. */
static int
compare_record_run_minheap (const void *a, const void *b, void *irs)
{
  return -compare_record_run (a, b, irs);
}
501 /* Begins a new initial run, specifically its output file. */
503 start_run (struct initial_run_state *irs)
507 irs->casefile = casefile_create (irs->xsrt->value_cnt);
508 casefile_to_disk (irs->casefile);
509 case_nullify (&irs->last_output);
512 /* Ends the current initial run. */
514 end_run (struct initial_run_state *irs)
516 struct external_sort *xsrt = irs->xsrt;
518 /* Record initial run. */
519 if (irs->casefile != NULL)
521 casefile_sleep (irs->casefile);
522 if (xsrt->run_cnt >= xsrt->run_cap)
525 xsrt->runs = xnrealloc (xsrt->runs,
526 xsrt->run_cap, sizeof *xsrt->runs);
528 xsrt->runs[xsrt->run_cnt++] = irs->casefile;
529 if (casefile_error (irs->casefile))
531 irs->casefile = NULL;
535 /* Writes a record to the current initial run. */
537 output_record (struct initial_run_state *irs)
539 struct record_run *record_run;
540 struct ccase case_tmp;
542 /* Extract minimum case from heap. */
543 assert (irs->record_cnt > 0);
544 pop_heap (irs->records, irs->record_cnt--, sizeof *irs->records,
545 compare_record_run_minheap, irs);
546 record_run = irs->records + irs->record_cnt;
548 /* Bail if an error has occurred. */
552 /* Start new run if necessary. */
553 assert (record_run->run == irs->run
554 || record_run->run == irs->run + 1);
555 if (record_run->run != irs->run)
560 assert (record_run->run == irs->run);
564 if (irs->casefile != NULL)
565 casefile_append (irs->casefile, &record_run->record);
567 /* This record becomes last_output. */
568 irs->last_output = case_tmp = record_run->record;
569 record_run->record = irs->records[irs->record_cap - 1].record;
570 irs->records[irs->record_cap - 1].record = case_tmp;
/* Merging. */

static int choose_merge (struct casefile *runs[], int run_cnt, int order);
static struct casefile *merge_once (struct external_sort *,
                                    struct casefile *[], size_t);
579 /* Repeatedly merges run until only one is left,
580 and returns the final casefile.
581 Returns a null pointer if an I/O error occurs. */
582 static struct casefile *
583 merge (struct external_sort *xsrt)
585 while (xsrt->run_cnt > 1)
587 int order = min (MAX_MERGE_ORDER, xsrt->run_cnt);
588 int idx = choose_merge (xsrt->runs, xsrt->run_cnt, order);
589 xsrt->runs[idx] = merge_once (xsrt, xsrt->runs + idx, order);
590 remove_range (xsrt->runs, xsrt->run_cnt, sizeof *xsrt->runs,
592 xsrt->run_cnt -= order - 1;
594 if (xsrt->runs[idx] == NULL)
597 assert (xsrt->run_cnt == 1);
599 return xsrt->runs[0];
/* Chooses ORDER runs out of the RUN_CNT runs in RUNS to merge,
   and returns the index of the first one.

   For stability, we must merge only consecutive runs.  For
   efficiency, we choose the shortest consecutive sequence of
   runs.  Case counts are summed as unsigned long, the type
   casefile_get_case_cnt() results are stored in elsewhere in this
   file, to avoid overflowing an int. */
static int
choose_merge (struct casefile *runs[], int run_cnt, int order)
{
  unsigned long min_sum, cur_sum;
  int min_idx, cur_idx;
  int i;

  /* Sum up the length of the first ORDER runs. */
  cur_sum = 0;
  for (i = 0; i < order; i++)
    cur_sum += casefile_get_case_cnt (runs[i]);

  /* Find the shortest group of ORDER runs, using a running total
     for efficiency.  CUR_SUM always includes runs[cur_idx - 1]
     before it is subtracted, so it cannot wrap. */
  min_idx = 0;
  min_sum = cur_sum;
  for (cur_idx = 1; cur_idx + order <= run_cnt; cur_idx++)
    {
      cur_sum -= casefile_get_case_cnt (runs[cur_idx - 1]);
      cur_sum += casefile_get_case_cnt (runs[cur_idx + order - 1]);
      if (cur_sum < min_sum)
        {
          min_sum = cur_sum;
          min_idx = cur_idx;
        }
    }

  return min_idx;
}
638 /* Merges the RUN_CNT initial runs specified in INPUT_FILES into a
639 new run, and returns the new run.
640 Returns a null pointer if an I/O error occurs. */
641 static struct casefile *
642 merge_once (struct external_sort *xsrt,
643 struct casefile **const input_files,
648 struct casefile *file;
649 struct casereader *reader;
654 struct casefile *output = NULL;
657 /* Open input files. */
658 runs = xnmalloc (run_cnt, sizeof *runs);
659 for (i = 0; i < run_cnt; i++)
661 struct run *r = &runs[i];
662 r->file = input_files[i];
663 r->reader = casefile_get_destructive_reader (r->file);
664 if (!casereader_read_xfer (r->reader, &r->ccase))
671 /* Create output file. */
672 output = casefile_create (xsrt->value_cnt);
673 casefile_to_disk (output);
678 struct run *min_run, *run;
682 for (run = runs + 1; run < runs + run_cnt; run++)
683 if (compare_record (&run->ccase, &min_run->ccase, xsrt->criteria) < 0)
686 /* Write minimum to output file. */
687 casefile_append_xfer (output, &min_run->ccase);
689 /* Read another case from minimum run. */
690 if (!casereader_read_xfer (min_run->reader, &min_run->ccase))
692 if (casefile_error (min_run->file) || casefile_error (output))
694 casereader_destroy (min_run->reader);
695 casefile_destroy (min_run->file);
697 remove_element (runs, run_cnt, sizeof *runs, min_run - runs);
702 if (!casefile_sleep (output))
709 for (i = 0; i < run_cnt; i++)
710 casefile_destroy (runs[i].file);
711 casefile_destroy (output);