1 /* PSPP - computes sample statistics.
2 Copyright (C) 1997-9, 2000 Free Software Foundation, Inc.
3 Written by Ben Pfaff <blp@gnu.org>.
5 This program is free software; you can redistribute it and/or
6 modify it under the terms of the GNU General Public License as
7 published by the Free Software Foundation; either version 2 of the
8 License, or (at your option) any later version.
10 This program is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with this program; if not, write to the Free Software
17 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
02110-1301, USA. */
30 #include <data/case-source.h>
31 #include <data/case.h>
32 #include <data/casefile.h>
33 #include <data/procedure.h>
34 #include <data/settings.h>
35 #include <data/variable.h>
36 #include <data/storage-stream.h>
37 #include <language/expressions/public.h>
38 #include <libpspp/alloc.h>
39 #include <libpspp/array.h>
40 #include <libpspp/message.h>
41 #include <libpspp/message.h>
42 #include <libpspp/misc.h>
43 #include <libpspp/str.h>
46 #define _(msgid) gettext (msgid)
48 /* These should only be changed for testing purposes. */
/* Upper bound on the number of case buffers that allocate_cases()
   reserves for the replacement-selection reservoir used by the
   external sort. */
50 int max_buffers = INT_MAX;
/* Checked at the top of do_internal_sort(); when false, that function
   does not attempt an in-memory sort, so sort_execute() falls back to
   the external sort.  NOTE(review): the early-return statement after
   the check is not visible in this listing -- confirm. */
51 bool allow_internal_sort = true;
/* Forward declarations for the comparison and sorting helpers
   defined later in this file. */
53 static int compare_record (const struct ccase *, const struct ccase *,
54 const struct sort_criteria *);
55 static struct casefile *do_internal_sort (struct casereader *,
56 const struct sort_criteria *);
57 static struct casefile *do_external_sort (struct casereader *,
58 const struct sort_criteria *);
60 /* Get ready to sort the active file.
   Cancels any temporary transformations currently in effect by
   calling proc_cancel_temporary_transformations().  Both
   sort_active_file_in_place() and sort_active_file_to_casefile()
   call this first. */
62 prepare_to_sort_active_file (void)
64 proc_cancel_temporary_transformations ();
67 /* Sorts the active file in-place according to CRITERIA.
68 Returns nonzero if successful.

   Steps: cancel temporary transformations, run procedure (NULL, NULL)
   and give up if it fails, capture the procedure's output as casefile
   IN, sort IN through a destructive reader, and install the sorted
   casefile OUT as the new active-file source.
   NOTE(review): sort_execute() can presumably yield a null pointer
   when the external sort fails; the test of OUT is not visible in
   this listing -- confirm it precedes proc_set_source(). */
70 sort_active_file_in_place (const struct sort_criteria *criteria)
72 struct casefile *in, *out;
74 prepare_to_sort_active_file ();
75 if (!procedure (NULL, NULL))
78 in = proc_capture_output ();
79 out = sort_execute (casefile_get_destructive_reader (in), criteria);
83 proc_set_source (storage_source_create (out));
87 /* Data passed to sort_to_casefile_callback(). */
88 struct sort_to_casefile_cb_data
90 const struct sort_criteria *criteria; /* Sort criteria to apply. */
91 struct casefile *output; /* Sorted result, or a null pointer if sorting has not run or failed. */
94 /* Sorts casefile CF according to the criteria in CB_DATA.
   Callback for multipass_procedure(), installed by
   sort_active_file_to_casefile().  CB_DATA_ points to a struct
   sort_to_casefile_cb_data; the sorted casefile is stored in its
   OUTPUT member.  CF is const and is read through
   casefile_get_reader(), not the destructive reader used by
   sort_active_file_in_place().  Returns whether sort_execute()
   produced a casefile. */
96 sort_to_casefile_callback (const struct casefile *cf, void *cb_data_)
98 struct sort_to_casefile_cb_data *cb_data = cb_data_;
99 cb_data->output = sort_execute (casefile_get_reader (cf), cb_data->criteria);
100 return cb_data->output != NULL;
103 /* Sorts the active file to a separate casefile. If successful,
104 returns the sorted casefile. Returns a null pointer on failure.
   The sort itself happens in sort_to_casefile_callback(), which
   multipass_procedure() invokes with CB_DATA. */
107 sort_active_file_to_casefile (const struct sort_criteria *criteria)
109 struct sort_to_casefile_cb_data cb_data;
111 prepare_to_sort_active_file ();
113 cb_data.criteria = criteria;
114 cb_data.output = NULL;
/* On failure, discard whatever the callback produced.  NOTE(review):
   OUTPUT may still be a null pointer here, so this assumes
   casefile_destroy() accepts one -- confirm. */
115 if (!multipass_procedure (sort_to_casefile_callback, &cb_data))
117 casefile_destroy (cb_data.output);
120 return cb_data.output;
124 /* Reads all the cases from READER, which is destroyed. Sorts
125 the cases according to CRITERIA. Returns the sorted cases in
126 a newly created casefile. */
/* The in-memory sort is tried first; do_internal_sort() returns a
   null pointer when it declines (cases not in core, or
   allow_internal_sort is false), and the external merge sort is used
   instead.  NOTE(review): the null test guarding do_external_sort()
   and the final return are not visible in this listing; the external
   sort can presumably also fail and yield a null pointer, which
   sort_to_casefile_callback() checks for. */
128 sort_execute (struct casereader *reader, const struct sort_criteria *criteria)
130 struct casefile *output = do_internal_sort (reader, criteria);
132 output = do_external_sort (reader, criteria);
133 casereader_destroy (reader);
137 /* A case and its index.  This is the element type (struct
   indexed_case) of the array sorted by do_internal_sort().  IDX
   breaks ties between cases that compare equal on all criteria,
   which makes the in-memory sort stable (presumably it is set to the
   case's input position; that assignment is not visible here). */
140 struct ccase c; /* Case. */
141 unsigned long idx; /* Index to allow for stable sorting. */
/* Comparison function passed to sort() by do_internal_sort(). */
144 static int compare_indexed_cases (const void *, const void *, void *);
146 /* If the data is in memory, do an internal sort and return a new
147 casefile for the data. Otherwise, return a null pointer. */
148 static struct casefile *
149 do_internal_sort (struct casereader *reader,
150 const struct sort_criteria *criteria)
152 const struct casefile *src;
153 struct casefile *dst;
154 unsigned long case_cnt;
/* Testing knob (see allow_internal_sort): when it is false, no
   in-memory sort is attempted. */
156 if (!allow_internal_sort)
159 src = casereader_get_casefile (reader);
/* Cases that are not in core cannot be sorted here, except that a
   casefile with zero or one case needs no real sorting. */
160 if (casefile_get_case_cnt (src) > 1 && !casefile_in_core (src))
163 case_cnt = casefile_get_case_cnt (src);
164 dst = casefile_create (casefile_get_value_cnt (src));
/* nmalloc() is used rather than xnmalloc(), presumably so that an
   allocation failure can be tolerated.  NOTE(review): the test of
   CASES is not visible in this listing -- confirm it exists. */
167 struct indexed_case *cases = nmalloc (sizeof *cases, case_cnt);
/* Move every case out of READER into CASES. */
172 for (i = 0; i < case_cnt; i++)
174 bool ok = casereader_read_xfer (reader, &cases[i].c);
/* Sort the cases with compare_indexed_cases() (its auxiliary argument
   is on a line not visible here, presumably CRITERIA), then hand them
   to DST in sorted order. */
180 sort (cases, case_cnt, sizeof *cases, compare_indexed_cases,
183 for (i = 0; i < case_cnt; i++)
184 casefile_append_xfer (dst, &cases[i].c);
/* A write error on DST is treated as failure.  NOTE(review): the error
   path ending in casefile_destroy (dst) is only partly visible in this
   listing. */
185 if (casefile_error (dst))
193 casefile_destroy (dst);
201 /* Compares the variables specified by CRITERIA between the cases
202 at A and B, with a "last resort" comparison for stability, and
203 returns a strcmp()-type result.
   A_ and B_ point to struct indexed_case elements.  CRITERIA_ is a
   struct sort_criteria, presumably the one do_internal_sort() passes
   to sort().  The "last resort" comparison orders cases that are
   equal on all criteria by their IDX members. */
205 compare_indexed_cases (const void *a_, const void *b_, void *criteria_)
207 struct sort_criteria *criteria = criteria_;
208 const struct indexed_case *a = a_;
209 const struct indexed_case *b = b_;
210 int result = compare_record (&a->c, &b->c, criteria);
212 result = a->idx < b->idx ? -1 : a->idx > b->idx;
218 /* Maximum order of merge (external sort only). The maximum
219 reasonable value is about 7. Above that, it would be a good
220 idea to use a heap in merge_once() to select the minimum.
   merge() combines at most this many consecutive runs per pass. */
221 #define MAX_MERGE_ORDER 7
223 /* State of an external sort (struct external_sort): the criteria,
   the case width, and the current list of runs.  merge() repeatedly
   replaces groups of runs in RUNS with their merged result. */
226 const struct sort_criteria *criteria; /* Sort criteria. */
227 size_t value_cnt; /* Size of data in `union value's. */
228 struct casefile **runs; /* Array of runs (initial runs, later merged runs). */
229 size_t run_cnt, run_cap; /* Number of runs, allocated capacity. */
232 /* Prototypes for helper functions. */
233 static int write_runs (struct external_sort *, struct casereader *); /* Nonzero on success. */
234 static struct casefile *merge (struct external_sort *); /* Null on I/O error. */
235 static void destroy_external_sort (struct external_sort *);
237 /* Performs a stable external sort of the cases read from READER
238 according to CRITERIA. Forms initial runs using a heap
239 as a reservoir. Merges the initial runs according to a
240 pattern that assures stability.
   Returns the sorted casefile, or presumably a null pointer if
   casefile_to_disk() fails on the input or if writing or merging the
   runs fails.  NOTE(review): xsrt->run_cnt and xsrt->run_cap are used
   below (xnmalloc() reads run_cap) but their initialization is not
   visible in this listing -- confirm. */
241 static struct casefile *
242 do_external_sort (struct casereader *reader,
243 const struct sort_criteria *criteria)
245 struct external_sort *xsrt;
247 if (!casefile_to_disk (casereader_get_casefile (reader)))
250 xsrt = xmalloc (sizeof *xsrt);
251 xsrt->criteria = criteria;
252 xsrt->value_cnt = casefile_get_value_cnt (casereader_get_casefile (reader));
255 xsrt->runs = xnmalloc (xsrt->run_cap, sizeof *xsrt->runs);
256 if (write_runs (xsrt, reader))
258 struct casefile *output = merge (xsrt);
/* NOTE(review): OUTPUT is xsrt->runs[0], and destroy_external_sort()
   destroys runs[0..run_cnt); merge() must reset run_cnt (line not
   visible) or OUTPUT would be destroyed here -- confirm. */
259 destroy_external_sort (xsrt);
264 destroy_external_sort (xsrt);
/* Destroys XSRT, including every run casefile still listed in
   XSRT->runs[0..run_cnt).  NOTE(review): freeing RUNS and XSRT itself
   is not visible in this listing -- confirm. */
271 destroy_external_sort (struct external_sort *xsrt)
277 for (i = 0; i < xsrt->run_cnt; i++)
278 casefile_destroy (xsrt->runs[i]);
284 /* Replacement selection. */
286 /* Pairs a record with a run number (struct record_run).  These are
   the elements of the heap in struct initial_run_state, ordered by
   compare_record_run_minheap(). */
289 int run; /* Run number of case. */
290 struct ccase record; /* Case data. */
291 size_t idx; /* Case number (for stability). */
294 /* Represents a set of initial runs during an external sort. */
295 struct initial_run_state
297 struct external_sort *xsrt; /* External sort receiving the runs. */
/* The heap occupies records[0..record_cnt).  records[record_cap - 1]
   is a spare case buffer that output_record() swaps with, so the heap
   never holds more than record_cap - 1 records. */
300 struct record_run *records; /* Records arranged as a heap. */
301 size_t record_cnt; /* Current number of records. */
302 size_t record_cap; /* Capacity for records. */
304 /* Run currently being output. */
305 int run; /* Run number. */
306 size_t case_cnt; /* Number of cases so far. */
307 struct casefile *casefile; /* Output file, or a null pointer if no run is open. */
308 struct ccase last_output; /* Record last output; null at the start of a run. */
310 int okay; /* Zero if an error has been encountered. */
/* Helpers for forming initial runs by replacement selection. */
313 static bool destroy_initial_run_state (struct initial_run_state *);
314 static void process_case (struct initial_run_state *, const struct ccase *,
316 static int allocate_cases (struct initial_run_state *);
317 static void output_record (struct initial_run_state *);
318 static void start_run (struct initial_run_state *);
319 static void end_run (struct initial_run_state *);
320 static int compare_record_run (const struct record_run *,
321 const struct record_run *,
322 struct initial_run_state *);
323 static int compare_record_run_minheap (const void *, const void *, void *);
325 /* Reads cases from READER and composes initial runs in XSRT.
   Returns nonzero if successful; do_external_sort() merges the runs
   only on a nonzero return. */
327 write_runs (struct external_sort *xsrt, struct casereader *reader)
329 struct initial_run_state *irs;
334 /* Allocate memory for cases. */
335 irs = xmalloc (sizeof *irs);
338 irs->record_cnt = irs->record_cap = 0;
341 irs->casefile = NULL;
342 case_nullify (&irs->last_output);
344 if (!allocate_cases (irs))
347 /* Create initial runs.  Each input case is copied into the heap by
   process_case() and then destroyed by the loop.  Once the input is
   exhausted, the cases left in the reservoir are drained (that loop's
   body is not visible in this listing).  Finally,
   destroy_initial_run_state() reports any I/O error from closing the
   last run. */
349 for (; irs->okay && casereader_read (reader, &c); case_destroy (&c))
350 process_case (irs, &c, idx++);
351 while (irs->okay && irs->record_cnt > 0)
358 if (!destroy_initial_run_state (irs))
364 /* Add a single case to an initial run.  C is copied into the next
   free heap slot, records[record_cnt], and pushed onto the heap.  A
   case that sorts before the last case already written cannot join
   the current run, so it is assigned to the next run instead. */
366 process_case (struct initial_run_state *irs, const struct ccase *c, size_t idx)
368 struct record_run *rr;
370 /* Compose record_run for this run and add to heap. */
/* records[record_cap - 1] is reserved as the spare buffer used by
   output_record(), so the heap may hold at most record_cap - 1
   records. */
371 assert (irs->record_cnt < irs->record_cap - 1);
372 rr = irs->records + irs->record_cnt++;
373 case_copy (&rr->record, 0, c, 0, irs->xsrt->value_cnt);
376 if (!case_is_null (&irs->last_output)
377 && compare_record (c, &irs->last_output, irs->xsrt->criteria) < 0)
378 rr->run = irs->run + 1;
379 push_heap (irs->records, irs->record_cnt, sizeof *irs->records,
380 compare_record_run_minheap, irs);
382 /* Output a record if the reservoir is full. */
383 if (irs->record_cnt == irs->record_cap - 1 && irs->okay)
387 /* Destroys the initial run state represented by IRS.
388 Returns true if successful, false if an I/O error occurred.
   All record_cap case buffers are destroyed, including the spare
   slot used by output_record().  If a run's casefile is still open,
   it is put to sleep with casefile_sleep(), and that call's result
   is reported. */
390 destroy_initial_run_state (struct initial_run_state *irs)
398 for (i = 0; i < irs->record_cap; i++)
399 case_destroy (&irs->records[i].record);
402 if (irs->casefile != NULL)
403 ok = casefile_sleep (irs->casefile);
409 /* Allocates room for lots of cases as a buffer.
   Sizes the replacement-selection reservoir: as many case buffers as
   fit within the workspace limit reported by get_workspace(), capped
   at max_buffers.  If fewer than min_buffers are obtained, an error is
   reported with msg() and the allocation is treated as failed
   (write_runs() tests the return value with `!'). */
411 allocate_cases (struct initial_run_state *irs)
413 int approx_case_cost; /* Approximate memory cost of one case in bytes. */
414 int max_cases; /* Maximum number of cases to allocate. */
417 /* Allocate as many cases as we can within the workspace limit. */
/* The cost estimate covers the heap element, the case's values, and
   an allowance of four pointers of per-case overhead. */
419 approx_case_cost = (sizeof *irs->records
420 + irs->xsrt->value_cnt * sizeof (union value)
421 + 4 * sizeof (void *));
422 max_cases = get_workspace() / approx_case_cost;
423 if (max_cases > max_buffers)
424 max_cases = max_buffers;
425 irs->records = nmalloc (sizeof *irs->records, max_cases);
/* Create the case buffers one at a time.  Presumably the loop stops
   and max_cases is lowered when case_try_create() fails; those lines
   are not visible in this listing. */
426 if (irs->records != NULL)
427 for (i = 0; i < max_cases; i++)
428 if (!case_try_create (&irs->records[i].record, irs->xsrt->value_cnt))
433 irs->record_cap = max_cases;
435 /* Fail if we didn't allocate an acceptable number of cases. */
436 if (irs->records == NULL || max_cases < min_buffers)
438 msg (SE, _("Out of memory. Could not allocate room for minimum of %d "
439 "cases of %d bytes each. (PSPP workspace is currently "
440 "restricted to a maximum of %d KB.)"),
441 min_buffers, approx_case_cost, get_workspace() / 1024);
447 /* Compares cases A and B on each criterion in CRITERIA, in
448 order, and returns a strcmp()-type result.  Numeric values are
   compared with < and >; string values are compared bytewise with
   memcmp() over the criterion's width.  The result for a criterion
   whose direction is not SRT_ASCEND is negated.
   NOTE(review): the test that chooses numeric versus string
   comparison, the nonzero-result test before the return, and the
   final return for equal cases are not visible in this listing. */
450 compare_record (const struct ccase *a, const struct ccase *b,
451 const struct sort_criteria *criteria)
458 for (i = 0; i < criteria->crit_cnt; i++)
460 const struct sort_criterion *c = &criteria->crits[i];
465 double af = case_num (a, c->fv);
466 double bf = case_num (b, c->fv);
468 result = af < bf ? -1 : af > bf;
471 result = memcmp (case_str (a, c->fv), case_str (b, c->fv), c->width);
474 return c->dir == SRT_ASCEND ? result : -result;
480 /* Compares record-run tuples A and B on run number first, then
481 on record, then on case index.  The sort criteria come from
   IRS->xsrt.  compare_record_run_minheap() negates this result for
   use as the heap ordering. */
483 compare_record_run (const struct record_run *a,
484 const struct record_run *b,
485 struct initial_run_state *irs)
487 int result = a->run < b->run ? -1 : a->run > b->run;
489 result = compare_record (&a->record, &b->record, irs->xsrt->criteria);
491 result = a->idx < b->idx ? -1 : a->idx > b->idx;
495 /* Compares record-run tuples A and B on run number first, then
496 on the current record according to the sort criteria, then on
   case index (as compare_record_run() does), but in descending
   order.  The heap routines are given this comparison so that the
   record_run that sorts first is the one output_record() extracts.
   IRS is the struct initial_run_state. */
499 compare_record_run_minheap (const void *a, const void *b, void *irs)
501 return -compare_record_run (a, b, irs);
504 /* Begins a new initial run, specifically its output file.
   Creates the run's casefile, calls casefile_to_disk() on it, and
   nullifies last_output so that process_case() does not defer any
   case against a record from the previous run.
   NOTE(review): the return value of casefile_to_disk() is ignored
   here, unlike in do_external_sort(). */
506 start_run (struct initial_run_state *irs)
510 irs->casefile = casefile_create (irs->xsrt->value_cnt);
511 casefile_to_disk (irs->casefile);
512 case_nullify (&irs->last_output);
515 /* Ends the current initial run.  If a run is open, its casefile
   is put to sleep and appended to XSRT->runs (the array is grown
   with xnrealloc() when it is full), and IRS->casefile is reset to
   a null pointer.  NOTE(review): the statement run when
   casefile_error() reports an error (presumably clearing irs->okay)
   and the capacity update before xnrealloc() are not visible in this
   listing. */
517 end_run (struct initial_run_state *irs)
519 struct external_sort *xsrt = irs->xsrt;
521 /* Record initial run. */
522 if (irs->casefile != NULL)
524 casefile_sleep (irs->casefile);
525 if (xsrt->run_cnt >= xsrt->run_cap)
528 xsrt->runs = xnrealloc (xsrt->runs,
529 xsrt->run_cap, sizeof *xsrt->runs);
531 xsrt->runs[xsrt->run_cnt++] = irs->casefile;
532 if (casefile_error (irs->casefile))
534 irs->casefile = NULL;
538 /* Writes a record to the current initial run.  Pops the first
   record_run off the heap, starts a new run if necessary, appends
   the record to the run's casefile, and remembers it in
   last_output. */
540 output_record (struct initial_run_state *irs)
542 struct record_run *record_run;
543 struct ccase case_tmp;
545 /* Extract minimum case from heap. */
546 assert (irs->record_cnt > 0);
547 pop_heap (irs->records, irs->record_cnt--, sizeof *irs->records,
548 compare_record_run_minheap, irs);
549 record_run = irs->records + irs->record_cnt;
551 /* Bail if an error has occurred. */
555 /* Start new run if necessary. */
556 assert (record_run->run == irs->run
557 || record_run->run == irs->run + 1);
558 if (record_run->run != irs->run)
563 assert (record_run->run == irs->run);
567 if (irs->casefile != NULL)
568 casefile_append (irs->casefile, &record_run->record);
570 /* This record becomes last_output. */
/* The popped case is swapped with the spare case buffer at
   records[record_cap - 1].  last_output then refers to the spare
   slot's storage, which process_case() never fills (it asserts
   record_cnt < record_cap - 1).  The former spare buffer becomes the
   free slot at records[record_cnt]. */
571 irs->last_output = case_tmp = record_run->record;
572 record_run->record = irs->records[irs->record_cap - 1].record;
573 irs->records[irs->record_cap - 1].record = case_tmp;
/* Helpers for merge(). */
578 static int choose_merge (struct casefile *runs[], int run_cnt, int order);
579 static struct casefile *merge_once (struct external_sort *,
580 struct casefile *[], size_t);
582 /* Repeatedly merges runs until only one is left,
583 and returns the final casefile.
584 Returns a null pointer if an I/O error occurs.
   Each pass merges ORDER consecutive runs, chosen by choose_merge(),
   into slot IDX and drops the other ORDER - 1 slots from RUNS.
   NOTE(review): the assert requires at least one run, so write_runs()
   must record a run even for empty input.  Also, do_external_sort()
   calls destroy_external_sort() right after this returns runs[0], so
   run_cnt must be reset before returning (line not visible in this
   listing) or the result would be destroyed.  Confirm both. */
585 static struct casefile *
586 merge (struct external_sort *xsrt)
588 while (xsrt->run_cnt > 1)
590 int order = min (MAX_MERGE_ORDER, xsrt->run_cnt);
591 int idx = choose_merge (xsrt->runs, xsrt->run_cnt, order);
592 xsrt->runs[idx] = merge_once (xsrt, xsrt->runs + idx, order);
593 remove_range (xsrt->runs, xsrt->run_cnt, sizeof *xsrt->runs,
595 xsrt->run_cnt -= order - 1;
597 if (xsrt->runs[idx] == NULL)
600 assert (xsrt->run_cnt == 1);
602 return xsrt->runs[0];
605 /* Chooses ORDER runs out of the RUN_CNT runs in RUNS to merge,
606 and returns the index of the first one.
608 For stability, we must merge only consecutive runs. For
609 efficiency, we choose the shortest consecutive sequence of
   runs, measured by total case count.
   NOTE(review): case counts are summed in an int, which could
   overflow for very large runs.  The initialization of cur_sum,
   min_idx and min_sum is not visible in this listing. */
612 choose_merge (struct casefile *runs[], int run_cnt, int order)
614 int min_idx, min_sum;
615 int cur_idx, cur_sum;
618 /* Sum up the length of the first ORDER runs. */
620 for (i = 0; i < order; i++)
621 cur_sum += casefile_get_case_cnt (runs[i]);
623 /* Find the shortest group of ORDER runs,
624 using a running total for efficiency. */
627 for (cur_idx = 1; cur_idx + order <= run_cnt; cur_idx++)
629 cur_sum -= casefile_get_case_cnt (runs[cur_idx - 1]);
630 cur_sum += casefile_get_case_cnt (runs[cur_idx + order - 1]);
631 if (cur_sum < min_sum)
641 /* Merges the RUN_CNT initial runs specified in INPUT_FILES into a
642 new run, and returns the new run.
643 Returns a null pointer if an I/O error occurs. */
644 static struct casefile *
645 merge_once (struct external_sort *xsrt,
646 struct casefile **const input_files,
651 struct casefile *file;
652 struct casereader *reader;
657 struct casefile *output = NULL;
660 /* Open input files. */
661 runs = xnmalloc (run_cnt, sizeof *runs);
662 for (i = 0; i < run_cnt; i++)
664 struct run *r = &runs[i];
665 r->file = input_files[i];
666 r->reader = casefile_get_destructive_reader (r->file);
667 if (!casereader_read_xfer (r->reader, &r->ccase))
674 /* Create output file. */
675 output = casefile_create (xsrt->value_cnt);
676 casefile_to_disk (output);
681 struct run *min_run, *run;
685 for (run = runs + 1; run < runs + run_cnt; run++)
686 if (compare_record (&run->ccase, &min_run->ccase, xsrt->criteria) < 0)
689 /* Write minimum to output file. */
690 casefile_append_xfer (output, &min_run->ccase);
692 /* Read another case from minimum run. */
693 if (!casereader_read_xfer (min_run->reader, &min_run->ccase))
695 if (casefile_error (min_run->file) || casefile_error (output))
697 casereader_destroy (min_run->reader);
698 casefile_destroy (min_run->file);
700 remove_element (runs, run_cnt, sizeof *runs, min_run - runs);
705 if (!casefile_sleep (output))
712 for (i = 0; i < run_cnt; i++)
713 casefile_destroy (runs[i].file);
714 casefile_destroy (output);