1 /* PSPP - computes sample statistics.
2 Copyright (C) 1997-9, 2000 Free Software Foundation, Inc.
3 Written by Ben Pfaff <blp@gnu.org>.
5 This program is free software; you can redistribute it and/or
6 modify it under the terms of the GNU General Public License as
7 published by the Free Software Foundation; either version 2 of the
8 License, or (at your option) any later version.
10 This program is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with this program; if not, write to the Free Software
17 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. */
30 #include <data/case-source.h>
31 #include <data/case.h>
32 #include <data/casefile.h>
33 #include <data/fastfile.h>
34 #include <data/procedure.h>
35 #include <data/settings.h>
36 #include <data/variable.h>
37 #include <data/storage-stream.h>
38 #include <language/expressions/public.h>
39 #include <libpspp/alloc.h>
40 #include <libpspp/array.h>
41 #include <libpspp/assertion.h>
42 #include <libpspp/message.h>
43 #include <libpspp/message.h>
44 #include <libpspp/misc.h>
45 #include <libpspp/str.h>
/* Translation shorthand: _(msgid) expands to gettext (msgid). */
48 #define _(msgid) gettext (msgid)
50 /* Tuning knobs for the sort.  These should only be changed for
   testing purposes. */
/* Upper bound that allocate_cases() applies to the number of case
   buffers it sets up. */
52 int max_buffers = INT_MAX;
/* Tested first thing by do_internal_sort(); presumably a false value
   makes it decline so that the external sort runs (the branch body
   is not visible in this copy). */
53 bool allow_internal_sort = true;
/* Forward declarations for helpers defined later in this file. */
55 static int compare_record (const struct ccase *, const struct ccase *,
56 const struct sort_criteria *);
57 static struct casefile *do_internal_sort (struct casereader *,
58 const struct sort_criteria *);
59 static struct casefile *do_external_sort (struct casereader *,
60 const struct sort_criteria *);
63 /* Sorts the active file of dataset DS in place according to
   CRITERIA.  Returns true if successful.

   NOTE(review): the return type, braces and failure branches of
   this definition are not visible in this copy; the comments below
   describe only the statements that are shown. */
66 sort_active_file_in_place (struct dataset *ds,
67 const struct sort_criteria *criteria)
69 struct casefile *in, *out;
/* Cancel temporary transformations before running the procedure. */
71 proc_cancel_temporary_transformations (ds);
72 if (!procedure (ds, NULL, NULL))
/* Take the procedure's output as IN and sort it through a
   destructive reader; OUT is whatever sort_execute() returns. */
75 in = proc_capture_output (ds);
76 out = sort_execute (casefile_get_destructive_reader (in), criteria);
/* Make the sorted casefile the dataset's new source. */
80 proc_set_source (ds, storage_source_create (out));
84 /* Data passed to sort_to_casefile_callback() through its untyped
   CB_DATA_ argument. */
85 struct sort_to_casefile_cb_data
87 const struct sort_criteria *criteria; /* In: criteria to sort by. */
88 struct casefile *output; /* Out: result of sort_execute(). */
91 /* Callback handed to multipass_procedure() by
   sort_active_file_to_casefile().  Sorts casefile CF according to
   the criteria in CB_DATA_, stores the resulting casefile in the
   callback data's OUTPUT member, and returns nonzero only if that
   result is non-null. */
93 sort_to_casefile_callback (const struct casefile *cf, void *cb_data_)
/* CB_DATA_ is really a struct sort_to_casefile_cb_data. */
95 struct sort_to_casefile_cb_data *cb_data = cb_data_;
/* CF is read through a reader obtained with casefile_get_reader(). */
96 cb_data->output = sort_execute (casefile_get_reader (cf), cb_data->criteria);
97 return cb_data->output != NULL;
100 /* Sorts the active file to a separate casefile. If successful,
101 returns the sorted casefile. Returns a null pointer on failure. */
104 sort_active_file_to_casefile (struct dataset *ds,
105 const struct sort_criteria *criteria)
107 struct sort_to_casefile_cb_data cb_data;
109 proc_cancel_temporary_transformations (ds);
/* Hand the criteria to the callback and start with no output. */
111 cb_data.criteria = criteria;
112 cb_data.output = NULL;
/* sort_to_casefile_callback() fills in cb_data.output. */
113 if (!multipass_procedure (ds, sort_to_casefile_callback, &cb_data))
/* Presumably the failure branch (its braces are not visible in this
   copy): discard whatever output was produced. */
115 casefile_destroy (cb_data.output);
118 return cb_data.output;
122 /* Reads all the cases from READER, which is destroyed.  Sorts
   the cases according to CRITERIA.  Returns the sorted cases in
   a newly created casefile.

   The in-memory sort is tried first; do_external_sort() is the
   alternative.  NOTE(review): the condition guarding the external
   sort and the return statement are not visible in this copy --
   presumably the external sort runs only when the internal sort
   returned a null pointer. */
126 sort_execute (struct casereader *reader, const struct sort_criteria *criteria)
/* First attempt: internal sort. */
128 struct casefile *output = do_internal_sort (reader, criteria);
/* Alternative: external sort. */
130 output = do_external_sort (reader, criteria);
/* READER is destroyed here, as promised to the caller. */
131 casereader_destroy (reader);
135 /* A case paired with an index; the element type of the array that
   do_internal_sort() sorts. */
138 struct ccase c; /* Case. */
139 unsigned long idx; /* Tie-breaker read by compare_indexed_cases()
                          to allow for stable sorting. */
/* Comparison callback that do_internal_sort() passes to sort(). */
142 static int compare_indexed_cases (const void *, const void *, void *);
144 /* If the data is in memory, do an internal sort and return a new
   casefile for the data.  Otherwise, return a null pointer.

   READER supplies the cases.  NOTE(review): this copy of the
   function is missing lines (braces, early returns, the remainder
   of the sort() call), so the comments below cover only the visible
   statements. */
146 static struct casefile *
147 do_internal_sort (struct casereader *reader,
148 const struct sort_criteria *criteria)
150 const struct casefile *src; /* Casefile behind READER. */
151 struct casefile *dst; /* Sorted output. */
152 unsigned long case_cnt; /* Number of cases in SRC. */
/* Testing hook: see allow_internal_sort. */
154 if (!allow_internal_sort)
157 src = casereader_get_casefile (reader);
/* More than one case and not held in core: presumably the point at
   which the function gives up (branch body not shown). */
158 if (casefile_get_case_cnt (src) > 1 && !casefile_in_core (src))
161 case_cnt = casefile_get_case_cnt (src);
/* The output casefile gets the same value count as the input. */
162 dst = fastfile_create (casefile_get_value_cnt (src));
/* One array element per case.  NOTE(review): allocate_cases() checks
   nmalloc() for a null result; confirm the same check exists here on
   a line that is not shown. */
165 struct indexed_case *cases = nmalloc (sizeof *cases, case_cnt);
/* Transfer every case out of READER into the array. */
170 for (i = 0; i < case_cnt; i++)
172 bool ok = casereader_read_xfer (reader, &cases[i].c);
/* Sort the array with compare_indexed_cases() as the ordering. */
178 sort (cases, case_cnt, sizeof *cases, compare_indexed_cases,
181 for (i = 0; i < case_cnt; i++)
182 casefile_append_xfer (dst, &cases[i].c);
183 if (casefile_error (dst))
/* DST is destroyed here; presumably this is an error path. */
191 casefile_destroy (dst);
199 /* sort() callback.  Compares the variables specified by CRITERIA_
   between the indexed cases at A_ and B_, with a "last resort"
   comparison on their indexes for stability, and returns a
   strcmp()-type result. */
203 compare_indexed_cases (const void *a_, const void *b_, void *criteria_)
/* Recover the typed pointers from the untyped callback arguments. */
205 struct sort_criteria *criteria = criteria_;
206 const struct indexed_case *a = a_;
207 const struct indexed_case *b = b_;
/* Primary ordering: the sort criteria. */
208 int result = compare_record (&a->c, &b->c, criteria);
/* Last resort: order by index, yielding -1, 0 or 1.  NOTE(review):
   the guard on this statement is not visible in this copy;
   presumably it applies only when RESULT is 0. */
210 result = a->idx < b->idx ? -1 : a->idx > b->idx;
216 /* Maximum order of merge (external sort only).  merge() combines
   at most this many runs per call to merge_once().  The maximum
   reasonable value is about 7.  Above that, it would be a good
   idea to use a heap in merge_once() to select the minimum. */
219 #define MAX_MERGE_ORDER 7
221 /* State and results of an external sort: the criteria, the size of
   each case, and the runs recorded by end_run() and later combined
   by merge(). */
224 const struct sort_criteria *criteria; /* Sort criteria. */
225 size_t value_cnt; /* Size of data in `union value's. */
226 struct casefile **runs; /* Array of runs; destroy_external_sort()
                              destroys the first run_cnt of them. */
227 size_t run_cnt, run_cap; /* Number of runs, allocated capacity. */
230 /* Prototypes for the helper functions that do_external_sort()
   calls. */
231 static int write_runs (struct external_sort *, struct casereader *);
232 static struct casefile *merge (struct external_sort *);
233 static void destroy_external_sort (struct external_sort *);
235 /* Performs a stable external sort of the cases from READER
   according to CRITERIA.  Forms initial runs using a heap as a
   reservoir (write_runs()), then merges the initial runs according
   to a pattern that assures stability (merge()).

   NOTE(review): the return statements and braces of this function
   are not visible in this copy. */
239 static struct casefile *
240 do_external_sort (struct casereader *reader,
241 const struct sort_criteria *criteria)
243 struct external_sort *xsrt;
/* casefile_to_disk() is applied to the casefile behind READER; a
   false result is handled by a branch that is not shown. */
245 if (!casefile_to_disk (casereader_get_casefile (reader)))
/* Set up the sort state.  NOTE(review): the initialization of
   run_cnt and run_cap is not shown here, yet run_cap sizes the
   allocation below. */
248 xsrt = xmalloc (sizeof *xsrt);
249 xsrt->criteria = criteria;
250 xsrt->value_cnt = casefile_get_value_cnt (casereader_get_casefile (reader));
253 xsrt->runs = xnmalloc (xsrt->run_cap, sizeof *xsrt->runs);
/* Form the initial runs, merge them into OUTPUT, and release the
   sort state. */
254 if (write_runs (xsrt, reader))
256 struct casefile *output = merge (xsrt);
257 destroy_external_sort (xsrt);
/* The sort state is released on the other path as well. */
262 destroy_external_sort (xsrt);
/* Destroys each of the run_cnt casefiles recorded in XSRT's run
   array.  NOTE(review): the rest of this function (declarations,
   any freeing of XSRT itself) is not visible in this copy. */
269 destroy_external_sort (struct external_sort *xsrt)
275 for (i = 0; i < xsrt->run_cnt; i++)
276 casefile_destroy (xsrt->runs[i]);
282 /* Replacement selection. */
284 /* Pairs a record with a run number.  This is the element type of
   the heap in struct initial_run_state; compare_record_run() orders
   elements by run, then record, then idx. */
287 int run; /* Run number of case. */
288 struct ccase record; /* Case data. */
289 size_t idx; /* Case number (for stability). */
292 /* Represents a set of initial runs during an external sort: the
   heap of buffered records plus the run currently being written. */
293 struct initial_run_state
295 struct external_sort *xsrt; /* Sort that the runs belong to. */
/* Reservoir.  The slot at index record_cap - 1 stays outside the
   heap: process_case() asserts that record_cnt is below it, and
   output_record() swaps cases through it. */
298 struct record_run *records; /* Records arranged as a heap. */
299 size_t record_cnt; /* Current number of records. */
300 size_t record_cap; /* Capacity for records. */
302 /* Run currently being output. */
303 int run; /* Run number. */
304 size_t case_cnt; /* Number of cases so far. */
305 struct casefile *casefile; /* Output file. */
306 struct ccase last_output; /* Record last output. */
308 int okay; /* Zero if an error has been encountered. */
/* Prototypes for the replacement-selection helpers defined below. */
311 static bool destroy_initial_run_state (struct initial_run_state *);
312 static void process_case (struct initial_run_state *, const struct ccase *,
314 static int allocate_cases (struct initial_run_state *);
315 static void output_record (struct initial_run_state *);
316 static void start_run (struct initial_run_state *);
317 static void end_run (struct initial_run_state *);
318 static int compare_record_run (const struct record_run *,
319 const struct record_run *,
320 struct initial_run_state *);
321 static int compare_record_run_minheap (const void *, const void *, void *);
323 /* Reads cases from READER and composes initial runs in XSRT.

   NOTE(review): several declarations, field initializations, loop
   bodies and the return statements are not visible in this copy. */
325 write_runs (struct external_sort *xsrt, struct casereader *reader)
327 struct initial_run_state *irs;
332 /* Allocate the run state and reset the fields shown here. */
333 irs = xmalloc (sizeof *irs);
336 irs->record_cnt = irs->record_cap = 0;
339 irs->casefile = NULL;
340 case_nullify (&irs->last_output);
/* Set up the record buffers; a zero result presumably means that
   allocation failed. */
342 if (!allocate_cases (irs))
345 /* Create initial runs: feed every case from READER to
   process_case(), numbering the cases with IDX, for as long as no
   error has been flagged in IRS. */
347 for (; irs->okay && casereader_read (reader, &c); case_destroy (&c))
348 process_case (irs, &c, idx++);
/* READER is exhausted; keep going while records remain buffered. */
349 while (irs->okay && irs->record_cnt > 0)
/* Tear down the run state; a false result means an I/O error. */
356 if (!destroy_initial_run_state (irs))
362 /* Adds the single case C, whose case number is IDX, to the heap of
   records in IRS, tagging it with the run it can be written to.

   NOTE(review): the default assignments to the new record's fields
   and the body of the final test are not visible in this copy. */
364 process_case (struct initial_run_state *irs, const struct ccase *c, size_t idx)
366 struct record_run *rr;
368 /* Compose record_run for this run and add to heap.  The slot at
   index record_cap - 1 is never taken by the heap. */
369 assert (irs->record_cnt < irs->record_cap - 1);
370 rr = irs->records + irs->record_cnt++;
371 case_copy (&rr->record, 0, c, 0, irs->xsrt->value_cnt);
/* A case that sorts before the record last written is assigned to
   the run after the current one. */
374 if (!case_is_null (&irs->last_output)
375 && compare_record (c, &irs->last_output, irs->xsrt->criteria) < 0)
376 rr->run = irs->run + 1;
377 push_heap (irs->records, irs->record_cnt, sizeof *irs->records,
378 compare_record_run_minheap, irs);
380 /* Output a record if the reservoir is full. */
381 if (irs->record_cnt == irs->record_cap - 1 && irs->okay)
385 /* Destroys the initial run state represented by IRS: destroys the
   case in every record slot and puts any run casefile that is still
   attached to sleep.
   Returns true if successful, false if an I/O error occurred. */
388 destroy_initial_run_state (struct initial_run_state *irs)
/* Every one of the record_cap slots is visited, not only the
   record_cnt slots currently in the heap. */
396 for (i = 0; i < irs->record_cap; i++)
397 case_destroy (&irs->records[i].record);
/* OK takes the result of casefile_sleep(). */
400 if (irs->casefile != NULL)
401 ok = casefile_sleep (irs->casefile);
407 /* Allocates room for lots of cases as a buffer: sizes the record
   array of IRS from the workspace limit (capped by max_buffers),
   tries to create a case in each slot, and reports an error if the
   array is missing or fewer than min_buffers cases were set up.

   NOTE(review): in this copy the "Allocate as many cases" comment in
   the body has lost its closing delimiter, and the declaration of
   min_buffers and the return statements are not shown.  The error
   message formats get_workspace() / 1024 with %d; confirm that the
   argument types match. */
409 allocate_cases (struct initial_run_state *irs)
411 int approx_case_cost; /* Approximate memory cost of one case in bytes. */
412 int max_cases; /* Maximum number of cases to allocate. */
415 /* Allocate as many cases as we can within the workspace
417 approx_case_cost = (sizeof *irs->records
418 + irs->xsrt->value_cnt * sizeof (union value)
419 + 4 * sizeof (void *));
420 max_cases = get_workspace() / approx_case_cost;
421 if (max_cases > max_buffers)
422 max_cases = max_buffers;
423 irs->records = nmalloc (sizeof *irs->records, max_cases);
424 if (irs->records != NULL)
425 for (i = 0; i < max_cases; i++)
426 if (!case_try_create (&irs->records[i].record, irs->xsrt->value_cnt))
431 irs->record_cap = max_cases;
433 /* Fail if we didn't allocate an acceptable number of cases. */
434 if (irs->records == NULL || max_cases < min_buffers)
436 msg (SE, _("Out of memory. Could not allocate room for minimum of %d "
437 "cases of %d bytes each. (PSPP workspace is currently "
438 "restricted to a maximum of %d KB.)"),
439 min_buffers, approx_case_cost, get_workspace() / 1024);
445 /* Compares cases A and B on each criterion in CRITERIA in turn
   and returns a strcmp()-type result, negated for criteria whose
   direction is not SRT_ASCEND.

   NOTE(review): the test that chooses between the numeric and the
   string comparison, and the test guarding the return inside the
   loop, are not visible in this copy. */
448 compare_record (const struct ccase *a, const struct ccase *b,
449 const struct sort_criteria *criteria)
456 for (i = 0; i < criteria->crit_cnt; i++)
458 const struct sort_criterion *c = &criteria->crits[i];
/* Numeric comparison of the values at index c->fv. */
463 double af = case_num (a, c->fv);
464 double bf = case_num (b, c->fv);
466 result = af < bf ? -1 : af > bf;
/* String comparison: byte-wise over c->width bytes. */
469 result = memcmp (case_str (a, c->fv), case_str (b, c->fv), c->width);
/* Flip the sign unless the criterion is ascending. */
472 return c->dir == SRT_ASCEND ? result : -result;
478 /* Compares record-run tuples A and B on run number first, then
   on record (using the sort criteria reached through IRS), then
   on case index.  Each step yields a strcmp()-type result.

   NOTE(review): the guards between the three steps are not visible
   in this copy; presumably each later step runs only while RESULT
   is still 0. */
481 compare_record_run (const struct record_run *a,
482 const struct record_run *b,
483 struct initial_run_state *irs)
485 int result = a->run < b->run ? -1 : a->run > b->run;
487 result = compare_record (&a->record, &b->record, irs->xsrt->criteria);
489 result = a->idx < b->idx ? -1 : a->idx > b->idx;
493 /* Compares record-run tuples A and B on run number first, then
494 on the current record according to the sort criteria, but in descending order. */
497 compare_record_run_minheap (const void *a, const void *b, void *irs)
/* This is the comparison passed to push_heap() and pop_heap().
   Negating compare_record_run() reverses its ordering. */
499 return -compare_record_run (a, b, irs);
502 /* Begins a new initial run, specifically its output file: creates
   a casefile with the sort's value count, applies casefile_to_disk()
   to it, and clears last_output.

   NOTE(review): the result of casefile_to_disk() is not checked on
   the line shown here, although do_external_sort() does test it;
   other statements of this function are not visible in this copy. */
504 start_run (struct initial_run_state *irs)
508 irs->casefile = fastfile_create (irs->xsrt->value_cnt);
509 casefile_to_disk (irs->casefile);
510 case_nullify (&irs->last_output);
513 /* Ends the current initial run: puts its casefile to sleep,
   appends it to the run array of the external sort, and clears the
   casefile pointer in IRS.

   NOTE(review): braces, the line that enlarges run_cap, and the
   body of the error test are not visible in this copy. */
515 end_run (struct initial_run_state *irs)
517 struct external_sort *xsrt = irs->xsrt;
519 /* Record initial run. */
520 if (irs->casefile != NULL)
/* The result of casefile_sleep() is not used on this line; the
   error state is queried with casefile_error() further down. */
522 casefile_sleep (irs->casefile);
/* Grow the run array when it is full. */
523 if (xsrt->run_cnt >= xsrt->run_cap)
526 xsrt->runs = xnrealloc (xsrt->runs,
527 xsrt->run_cap, sizeof *xsrt->runs);
529 xsrt->runs[xsrt->run_cnt++] = irs->casefile;
530 if (casefile_error (irs->casefile))
/* The run array now refers to the casefile (destroy_external_sort()
   destroys it), so IRS drops its pointer. */
532 irs->casefile = NULL;
536 /* Writes a record to the current initial run: pops a record off
   the heap, appends it to the run's casefile, and remembers it as
   last_output.

   NOTE(review): the error bail-out and the statements that switch
   to a new run are not visible in this copy. */
538 output_record (struct initial_run_state *irs)
540 struct record_run *record_run;
541 struct ccase case_tmp;
543 /* Extract minimum case from heap.  RECORD_RUN is then taken from
   the slot just past the shrunken heap. */
544 assert (irs->record_cnt > 0);
545 pop_heap (irs->records, irs->record_cnt--, sizeof *irs->records,
546 compare_record_run_minheap, irs);
547 record_run = irs->records + irs->record_cnt;
549 /* Bail if an error has occurred. */
553 /* Start new run if necessary.  A popped record belongs either to
   the current run or to the one right after it. */
554 assert (record_run->run == irs->run
555 || record_run->run == irs->run + 1);
556 if (record_run->run != irs->run)
561 assert (record_run->run == irs->run);
/* Append the record to the run's casefile, if there is one. */
565 if (irs->casefile != NULL)
566 casefile_append (irs->casefile, &record_run->record);
568 /* This record becomes last_output.  Its case is swapped with the
   one in the spare slot at index record_cap - 1, so the heap slot
   receives a different case while last_output keeps the one just
   written. */
569 irs->last_output = case_tmp = record_run->record;
570 record_run->record = irs->records[irs->record_cap - 1].record;
571 irs->records[irs->record_cap - 1].record = case_tmp;
/* Prototypes for the helpers that merge() calls. */
576 static int choose_merge (struct casefile *runs[], int run_cnt, int order);
577 static struct casefile *merge_once (struct external_sort *,
578 struct casefile *[], size_t);
580 /* Repeatedly merges runs until only one is left,
   and returns the final casefile.
   Returns a null pointer if an I/O error occurs.

   NOTE(review): braces, the rest of the remove_range() call and
   the body of the null test are not visible in this copy. */
583 static struct casefile *
584 merge (struct external_sort *xsrt)
586 while (xsrt->run_cnt > 1)
/* Merge up to MAX_MERGE_ORDER runs per pass, starting at the index
   picked by choose_merge(). */
588 int order = min (MAX_MERGE_ORDER, xsrt->run_cnt);
589 int idx = choose_merge (xsrt->runs, xsrt->run_cnt, order);
/* The merged run is stored where the first of its inputs was. */
590 xsrt->runs[idx] = merge_once (xsrt, xsrt->runs + idx, order);
591 remove_range (xsrt->runs, xsrt->run_cnt, sizeof *xsrt->runs,
593 xsrt->run_cnt -= order - 1; /* ORDER runs became one. */
/* merge_once() returns a null pointer if an I/O error occurs. */
595 if (xsrt->runs[idx] == NULL)
598 assert (xsrt->run_cnt == 1);
600 return xsrt->runs[0];
603 /* Chooses ORDER runs out of the RUN_CNT runs in RUNS to merge,
604 and returns the index of the first one.
606 For stability, we must merge only consecutive runs. For
607 efficiency, we choose the shortest consecutive sequence of runs. */
610 choose_merge (struct casefile *runs[], int run_cnt, int order)
/* NOTE(review): the sums below are ints, while do_internal_sort()
   stores the result of casefile_get_case_cnt() in an unsigned long;
   confirm that very long runs cannot overflow them.  The
   initializations of the sums, the update of the minimum and the
   return statement are not visible in this copy. */
612 int min_idx, min_sum; /* Best window so far: start, total cases. */
613 int cur_idx, cur_sum; /* Window under consideration. */
616 /* Sum up the length of the first ORDER runs. */
618 for (i = 0; i < order; i++)
619 cur_sum += casefile_get_case_cnt (runs[i]);
621 /* Find the shortest group of ORDER runs, using a running total
   for efficiency: each step subtracts the run that left the window
   and adds the run that entered it. */
625 for (cur_idx = 1; cur_idx + order <= run_cnt; cur_idx++)
627 cur_sum -= casefile_get_case_cnt (runs[cur_idx - 1]);
628 cur_sum += casefile_get_case_cnt (runs[cur_idx + order - 1]);
629 if (cur_sum < min_sum)
639 /* Merges the RUN_CNT initial runs specified in INPUT_FILES into a
640 new run, and returns the new run.
641 Returns a null pointer if an I/O error occurs. */
642 static struct casefile *
643 merge_once (struct external_sort *xsrt,
644 struct casefile **const input_files,
649 struct casefile *file;
650 struct casereader *reader;
655 struct casefile *output = NULL;
658 /* Open input files. */
659 runs = xnmalloc (run_cnt, sizeof *runs);
660 for (i = 0; i < run_cnt; i++)
662 struct run *r = &runs[i];
663 r->file = input_files[i];
664 r->reader = casefile_get_destructive_reader (r->file);
665 if (!casereader_read_xfer (r->reader, &r->ccase))
672 /* Create output file. */
673 output = fastfile_create (xsrt->value_cnt);
674 casefile_to_disk (output);
679 struct run *min_run, *run;
683 for (run = runs + 1; run < runs + run_cnt; run++)
684 if (compare_record (&run->ccase, &min_run->ccase, xsrt->criteria) < 0)
687 /* Write minimum to output file. */
688 casefile_append_xfer (output, &min_run->ccase);
690 /* Read another case from minimum run. */
691 if (!casereader_read_xfer (min_run->reader, &min_run->ccase))
693 if (casefile_error (min_run->file) || casefile_error (output))
695 casereader_destroy (min_run->reader);
696 casefile_destroy (min_run->file);
698 remove_element (runs, run_cnt, sizeof *runs, min_run - runs);
703 if (!casefile_sleep (output))
710 for (i = 0; i < run_cnt; i++)
711 casefile_destroy (runs[i].file);
712 casefile_destroy (output);