1 /* PSPP - computes sample statistics.
2 Copyright (C) 1997-9, 2000 Free Software Foundation, Inc.
3 Written by Ben Pfaff <blp@gnu.org>.
5 This program is free software; you can redistribute it and/or
6 modify it under the terms of the GNU General Public License as
7 published by the Free Software Foundation; either version 2 of the
8 License, or (at your option) any later version.
10 This program is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with this program; if not, write to the Free Software
17 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
30 #include <data/case-source.h>
31 #include <data/case.h>
32 #include <data/casefile.h>
33 #include <data/fastfile.h>
34 #include <data/procedure.h>
35 #include <data/settings.h>
36 #include <data/variable.h>
37 #include <data/storage-stream.h>
38 #include <language/expressions/public.h>
39 #include <libpspp/alloc.h>
40 #include <libpspp/array.h>
41 #include <libpspp/assertion.h>
42 #include <libpspp/message.h>
43 #include <libpspp/message.h>
44 #include <libpspp/misc.h>
45 #include <libpspp/str.h>
48 #define _(msgid) gettext (msgid)
50 /* These should only be changed for testing purposes. */
/* Upper limit on the number of cases that allocate_cases() will
   buffer in memory for replacement selection during an external
   sort.  The effective limit is the smaller of this and what fits
   in the workspace. */
52 int max_buffers = INT_MAX;
/* If false, do_internal_sort() bails out immediately even when the
   data is in memory, so sorting goes through the external-sort
   code path instead. */
53 bool allow_internal_sort = true;
/* Forward declarations: case comparison, and the two sorting
   strategies tried by sort_execute(). */
55 static int compare_record (const struct ccase *, const struct ccase *,
56 const struct sort_criteria *);
57 static struct casefile *do_internal_sort (struct casereader *,
58 const struct sort_criteria *);
59 static struct casefile *do_external_sort (struct casereader *,
60 const struct sort_criteria *);
62 /* Gets ready to sort the active file by cancelling any temporary
   transformations currently in effect. */
64 prepare_to_sort_active_file (void)
66 proc_cancel_temporary_transformations ();
69 /* Sorts the active file in-place according to CRITERIA.  The
   active file's contents are captured as a casefile, sorted with
   sort_execute(), and the sorted casefile is then installed as the
   active file's new source.
70 Returns nonzero if successful. */
72 sort_active_file_in_place (const struct sort_criteria *criteria)
74 struct casefile *in, *out;
76 prepare_to_sort_active_file ();
/* Run procedure() with null arguments (presumably no per-case
   work), then take over its output with proc_capture_output(). */
77 if (!procedure (NULL, NULL))
80 in = proc_capture_output ();
/* sort_execute() destroys the reader it is given.  A destructive
   reader is used here; NOTE(review): presumably this lets IN's
   storage be released as it is read -- confirm in casefile.h. */
81 out = sort_execute (casefile_get_destructive_reader (in), criteria);
85 proc_set_source (storage_source_create (out));
89 /* Data passed to sort_to_casefile_callback(). */
90 struct sort_to_casefile_cb_data
92 const struct sort_criteria *criteria; /* Criteria to sort by. */
93 struct casefile *output; /* Sorted casefile; null if not produced. */
96 /* Sorts casefile CF according to the criteria in CB_DATA, which
   points to a struct sort_to_casefile_cb_data, and stores the
   sorted casefile in CB_DATA->output.  Returns true if sorting
   produced an output casefile, false otherwise.
   CF is read with casefile_get_reader(), not with the destructive
   reader that sort_active_file_in_place() uses. */
98 sort_to_casefile_callback (const struct casefile *cf, void *cb_data_)
100 struct sort_to_casefile_cb_data *cb_data = cb_data_;
101 cb_data->output = sort_execute (casefile_get_reader (cf), cb_data->criteria);
102 return cb_data->output != NULL;
105 /* Sorts the active file to a separate casefile. If successful,
106 returns the sorted casefile. Returns a null pointer on
   failure, after destroying any output that was produced.
   Unlike sort_active_file_in_place(), this does not install the
   result as the active file's source. */
109 sort_active_file_to_casefile (const struct sort_criteria *criteria)
111 struct sort_to_casefile_cb_data cb_data;
113 prepare_to_sort_active_file ();
115 cb_data.criteria = criteria;
116 cb_data.output = NULL;
/* multipass_procedure() hands the active file's data, as a casefile,
   to sort_to_casefile_callback(), which leaves the result in
   CB_DATA.output. */
117 if (!multipass_procedure (sort_to_casefile_callback, &cb_data))
119 casefile_destroy (cb_data.output);
122 return cb_data.output;
126 /* Reads all the cases from READER, which is destroyed. Sorts
127 the cases according to CRITERIA. Returns the sorted cases in
128 a newly created casefile.
   An in-memory sort is tried first.  If do_internal_sort() cannot
   handle the data (it returns a null pointer), do_external_sort()
   is used instead.  NOTE(review): the guarding test is on a line
   not visible here.  The external sort may itself yield a null
   pointer on failure. */
130 sort_execute (struct casereader *reader, const struct sort_criteria *criteria)
132 struct casefile *output = do_internal_sort (reader, criteria);
134 output = do_external_sort (reader, criteria);
135 casereader_destroy (reader);
139 /* A case and its index.  do_internal_sort() reads each input case
   into one of these.  IDX is compared only as a last resort, when
   the cases are otherwise equal (see compare_indexed_cases()), so
   equal cases keep their index order. */
142 struct ccase c; /* Case. */
143 unsigned long idx; /* Index to allow for stable sorting. */
/* Comparison function that do_internal_sort() passes to sort(). */
146 static int compare_indexed_cases (const void *, const void *, void *);
148 /* If the data is in memory, do an internal sort and return a new
149 casefile for the data. Otherwise, return a null pointer.
   The in-core test only rejects sources with more than one case,
   so an empty or single-case source is always handled here.  The
   internal sort is bypassed entirely when allow_internal_sort is
   false. */
150 static struct casefile *
151 do_internal_sort (struct casereader *reader,
152 const struct sort_criteria *criteria)
154 const struct casefile *src;
155 struct casefile *dst;
156 unsigned long case_cnt;
158 if (!allow_internal_sort)
161 src = casereader_get_casefile (reader);
162 if (casefile_get_case_cnt (src) > 1 && !casefile_in_core (src))
165 case_cnt = casefile_get_case_cnt (src);
166 dst = fastfile_create (casefile_get_value_cnt (src));
/* One indexed_case per input case.  nmalloc() can return a null
   pointer (allocate_cases() checks for this), so this allocation may
   fail.  NOTE(review): the check here is on lines not visible in
   this view. */
169 struct indexed_case *cases = nmalloc (sizeof *cases, case_cnt);
/* Transfer every case out of READER into CASES. */
174 for (i = 0; i < case_cnt; i++)
176 bool ok = casereader_read_xfer (reader, &cases[i].c);
/* Sort by CRITERIA.  compare_indexed_cases() falls back on each
   case's index, which keeps the sort stable. */
182 sort (cases, case_cnt, sizeof *cases, compare_indexed_cases,
/* Append the sorted cases to DST in order. */
185 for (i = 0; i < case_cnt; i++)
186 casefile_append_xfer (dst, &cases[i].c);
187 if (casefile_error (dst))
/* Failure path: the output casefile is discarded. */
195 casefile_destroy (dst);
203 /* Compares the variables specified by CRITERIA between the cases
204 at A and B, with a "last resort" comparison for stability, and
205 returns a strcmp()-type result.  A and B point to struct
   indexed_case, and CRITERIA_ points to struct sort_criteria.  The
   last-resort comparison orders by each case's index. */
207 compare_indexed_cases (const void *a_, const void *b_, void *criteria_)
209 struct sort_criteria *criteria = criteria_;
210 const struct indexed_case *a = a_;
211 const struct indexed_case *b = b_;
212 int result = compare_record (&a->c, &b->c, criteria);
214 result = a->idx < b->idx ? -1 : a->idx > b->idx;
220 /* Maximum order of merge (external sort only). The maximum
221 reasonable value is about 7. Above that, it would be a good
222 idea to use a heap in merge_once() to select the minimum,
   which is currently found by a linear scan over the open runs. */
223 #define MAX_MERGE_ORDER 7
225 /* State of an external sort in progress: the criteria, plus the
   runs written so far.  RUNS first receives the initial runs
   recorded by end_run(); merge() then replaces groups of them with
   their merged output. */
228 const struct sort_criteria *criteria; /* Sort criteria. */
229 size_t value_cnt; /* Size of data in `union value's. */
230 struct casefile **runs; /* Array of initial runs. */
231 size_t run_cnt, run_cap; /* Number of runs, allocated capacity. */
234 /* Prototypes for helper functions used by do_external_sort(). */
235 static int write_runs (struct external_sort *, struct casereader *);
236 static struct casefile *merge (struct external_sort *);
237 static void destroy_external_sort (struct external_sort *);
239 /* Performs a stable external sort of the cases read from READER
240 according to CRITERIA. Forms initial runs using a heap
241 as a reservoir (replacement selection). Merges the initial runs
242 according to a pattern that assures stability: only consecutive
   runs are merged, and ties go to the earlier run (see
   choose_merge() and merge_once()). */
243 static struct casefile *
244 do_external_sort (struct casereader *reader,
245 const struct sort_criteria *criteria)
247 struct external_sort *xsrt;
/* Move the input casefile to disk before forming runs.
   NOTE(review): presumably this frees memory for the reservoir that
   allocate_cases() sizes from the workspace -- confirm. */
249 if (!casefile_to_disk (casereader_get_casefile (reader)))
252 xsrt = xmalloc (sizeof *xsrt);
253 xsrt->criteria = criteria;
254 xsrt->value_cnt = casefile_get_value_cnt (casereader_get_casefile (reader));
257 xsrt->runs = xnmalloc (xsrt->run_cap, sizeof *xsrt->runs);
/* Form the initial runs, then merge them down to a single run. */
258 if (write_runs (xsrt, reader))
260 struct casefile *output = merge (xsrt);
261 destroy_external_sort (xsrt);
266 destroy_external_sort (xsrt);
/* Destroys external sort XSRT, including each of the first
   XSRT->run_cnt casefiles in XSRT->runs.  NOTE(review): presumably
   also frees XSRT->runs and XSRT itself (lines not visible here). */
273 destroy_external_sort (struct external_sort *xsrt)
279 for (i = 0; i < xsrt->run_cnt; i++)
280 casefile_destroy (xsrt->runs[i]);
286 /* Replacement selection: forming the initial runs of an external
   sort. */
288 /* Pairs a record with a run number and its position in the input.
   These are the elements of the heap in struct initial_run_state. */
291 int run; /* Run number of case. */
292 struct ccase record; /* Case data. */
293 size_t idx; /* Case number (for stability). */
296 /* Represents a set of initial runs during an external sort. */
297 struct initial_run_state
299 struct external_sort *xsrt; /* External sort being performed. */
/* RECORDS has RECORD_CAP slots.  The last one (index RECORD_CAP - 1)
   is never part of the heap; output_record() swaps cases through it
   as a spare. */
302 struct record_run *records; /* Records arranged as a heap. */
303 size_t record_cnt; /* Current number of records. */
304 size_t record_cap; /* Capacity for records. */
306 /* Run currently being output. */
307 int run; /* Run number. */
308 size_t case_cnt; /* Number of cases so far. */
309 struct casefile *casefile; /* Output file. */
310 struct ccase last_output; /* Record last output. */
312 int okay; /* Zero if an error has been encountered. */
/* Helpers for forming initial runs by replacement selection. */
315 static bool destroy_initial_run_state (struct initial_run_state *);
316 static void process_case (struct initial_run_state *, const struct ccase *,
318 static int allocate_cases (struct initial_run_state *);
319 static void output_record (struct initial_run_state *);
320 static void start_run (struct initial_run_state *);
321 static void end_run (struct initial_run_state *);
322 static int compare_record_run (const struct record_run *,
323 const struct record_run *,
324 struct initial_run_state *);
325 static int compare_record_run_minheap (const void *, const void *, void *);
327 /* Reads cases from READER and composes initial runs in XSRT, by
   replacement selection.  Each case is numbered in input order so
   that ties can later be broken stably.  Fails if the reservoir of
   cases cannot be allocated or if an I/O error occurs. */
329 write_runs (struct external_sort *xsrt, struct casereader *reader)
331 struct initial_run_state *irs;
336 /* Allocate memory for cases. */
337 irs = xmalloc (sizeof *irs);
340 irs->record_cnt = irs->record_cap = 0;
343 irs->casefile = NULL;
344 case_nullify (&irs->last_output);
346 if (!allocate_cases (irs))
349 /* Create initial runs. */
/* process_case() copies C into the heap, so C is destroyed after
   each iteration.  Reading stops early once an error has been
   recorded in IRS->okay. */
351 for (; irs->okay && casereader_read (reader, &c); case_destroy (&c))
352 process_case (irs, &c, idx++);
/* Drain the records still held in the heap once the input is
   exhausted. */
353 while (irs->okay && irs->record_cnt > 0)
360 if (!destroy_initial_run_state (irs))
366 /* Adds case C, the IDX'th case read, to the reservoir of IRS.  C
   is copied, so the caller keeps ownership of it.  When the
   reservoir becomes full, one record is written out. */
368 process_case (struct initial_run_state *irs, const struct ccase *c, size_t idx)
370 struct record_run *rr;
372 /* Compose record_run for this run and add to heap.  The slot at
   record_cap - 1 is never part of the heap: output_record() uses
   it as a spare case. */
373 assert (irs->record_cnt < irs->record_cap - 1);
374 rr = irs->records + irs->record_cnt++;
375 case_copy (&rr->record, 0, c, 0, irs->xsrt->value_cnt);
/* A case that sorts before the record most recently output cannot
   extend the current run, so it is assigned to the next run. */
378 if (!case_is_null (&irs->last_output)
379 && compare_record (c, &irs->last_output, irs->xsrt->criteria) < 0)
380 rr->run = irs->run + 1;
381 push_heap (irs->records, irs->record_cnt, sizeof *irs->records,
382 compare_record_run_minheap, irs);
384 /* Output a record if the reservoir is full. */
385 if (irs->record_cnt == irs->record_cap - 1 && irs->okay)
389 /* Destroys the initial run state represented by IRS.
390 Returns true if successful, false if an I/O error occurred.
   All RECORD_CAP cases are destroyed, including the spare slot.  If
   a run's output file is still open, it is put to sleep, and the
   result of casefile_sleep() reports any I/O error. */
392 destroy_initial_run_state (struct initial_run_state *irs)
400 for (i = 0; i < irs->record_cap; i++)
401 case_destroy (&irs->records[i].record);
404 if (irs->casefile != NULL)
405 ok = casefile_sleep (irs->casefile);
411 /* Allocates the reservoir of cases used for replacement selection:
   as many cases as fit in the workspace (get_workspace()), but no
   more than max_buffers.  Sets IRS->records and IRS->record_cap.
   Reports an error with msg() if the array cannot be allocated or
   if fewer than min_buffers cases fit. */
413 allocate_cases (struct initial_run_state *irs)
415 int approx_case_cost; /* Approximate memory cost of one case in bytes. */
416 int max_cases; /* Maximum number of cases to allocate. */
419 /* Allocate as many cases as we can within the workspace
   limit.  The per-case cost is a rough estimate: the record_run
   itself, the case's values, and 4 pointers of assumed overhead. */
421 approx_case_cost = (sizeof *irs->records
422 + irs->xsrt->value_cnt * sizeof (union value)
423 + 4 * sizeof (void *));
424 max_cases = get_workspace() / approx_case_cost;
425 if (max_cases > max_buffers)
426 max_cases = max_buffers;
427 irs->records = nmalloc (sizeof *irs->records, max_cases);
/* Create the cases one at a time; case_try_create() may fail
   partway through. */
428 if (irs->records != NULL)
429 for (i = 0; i < max_cases; i++)
430 if (!case_try_create (&irs->records[i].record, irs->xsrt->value_cnt))
435 irs->record_cap = max_cases;
437 /* Fail if we didn't allocate an acceptable number of cases. */
438 if (irs->records == NULL || max_cases < min_buffers)
440 msg (SE, _("Out of memory. Could not allocate room for minimum of %d "
441 "cases of %d bytes each. (PSPP workspace is currently "
442 "restricted to a maximum of %d KB.)"),
443 min_buffers, approx_case_cost, get_workspace() / 1024);
449 /* Compares cases A and B on each criterion in CRITERIA, in order,
450 and returns a strcmp()-type result.  Numeric values are
   compared as doubles.  String values are compared bytewise with
   memcmp() over the criterion's width.  A criterion's result is
   negated unless its direction is SRT_ASCEND.
   NOTE(review): negating a memcmp() result would overflow if it
   were INT_MIN.  The C standard only specifies its sign; typical
   implementations return small byte differences. */
452 compare_record (const struct ccase *a, const struct ccase *b,
453 const struct sort_criteria *criteria)
460 for (i = 0; i < criteria->crit_cnt; i++)
462 const struct sort_criterion *c = &criteria->crits[i];
467 double af = case_num (a, c->fv);
468 double bf = case_num (b, c->fv);
470 result = af < bf ? -1 : af > bf;
473 result = memcmp (case_str (a, c->fv), case_str (b, c->fv), c->width);
476 return c->dir == SRT_ASCEND ? result : -result;
482 /* Compares record-run tuples A and B on run number first, then
483 on record, then on case index, and returns a strcmp()-type
   result.  Records are compared with the sort criteria of IRS's
   external sort. */
485 compare_record_run (const struct record_run *a,
486 const struct record_run *b,
487 struct initial_run_state *irs)
489 int result = a->run < b->run ? -1 : a->run > b->run;
491 result = compare_record (&a->record, &b->record, irs->xsrt->criteria);
493 result = a->idx < b->idx ? -1 : a->idx > b->idx;
497 /* Compares record-run tuples A and B on run number first, then
498 on the current record according to the sort criteria in IRS,
   then on case index, but in descending order.  The result is the
   negation of compare_record_run().  It is passed to push_heap()
   and pop_heap() so that the smallest tuple is extracted first.
   NOTE(review): this assumes those functions maintain a max-heap
   with respect to the comparison; confirm in libpspp/array.h. */
501 compare_record_run_minheap (const void *a, const void *b, void *irs)
503 return -compare_record_run (a, b, irs);
506 /* Begins a new initial run, specifically its output file.  The
   output casefile is created and moved to disk.  last_output is
   nulled, so process_case() does not defer any case to the next run
   until a record of this run has been output. */
508 start_run (struct initial_run_state *irs)
512 irs->casefile = fastfile_create (irs->xsrt->value_cnt);
513 casefile_to_disk (irs->casefile);
514 case_nullify (&irs->last_output);
517 /* Ends the current initial run.  If a run is open, its casefile
   is put to sleep and appended to XSRT->runs, growing the array
   when it is full, and IRS->casefile is cleared.
   NOTE(review): the return value of casefile_sleep() is ignored
   here.  Errors are detected afterwards with casefile_error(). */
519 end_run (struct initial_run_state *irs)
521 struct external_sort *xsrt = irs->xsrt;
523 /* Record initial run. */
524 if (irs->casefile != NULL)
526 casefile_sleep (irs->casefile);
527 if (xsrt->run_cnt >= xsrt->run_cap)
530 xsrt->runs = xnrealloc (xsrt->runs,
531 xsrt->run_cap, sizeof *xsrt->runs);
533 xsrt->runs[xsrt->run_cnt++] = irs->casefile;
534 if (casefile_error (irs->casefile))
536 irs->casefile = NULL;
540 /* Writes a record to the current initial run: removes the
   smallest record from IRS's heap and appends it to the current
   run's output file.  A new run is started when the record belongs
   to the next run. */
542 output_record (struct initial_run_state *irs)
544 struct record_run *record_run;
545 struct ccase case_tmp;
547 /* Extract minimum case from heap. */
548 assert (irs->record_cnt > 0);
549 pop_heap (irs->records, irs->record_cnt--, sizeof *irs->records,
550 compare_record_run_minheap, irs);
551 record_run = irs->records + irs->record_cnt;
553 /* Bail if an error has occurred. */
557 /* Start new run if necessary. */
558 assert (record_run->run == irs->run
559 || record_run->run == irs->run + 1);
560 if (record_run->run != irs->run)
565 assert (record_run->run == irs->run);
569 if (irs->casefile != NULL)
570 casefile_append (irs->casefile, &record_run->record);
572 /* This record becomes last_output.  Its case is swapped with the
   spare case in the last slot (record_cap - 1), where last_output
   keeps referring to it.  The vacated heap slot receives the spare
   case, and the next process_case() call copies into that slot. */
573 irs->last_output = case_tmp = record_run->record;
574 record_run->record = irs->records[irs->record_cap - 1].record;
575 irs->records[irs->record_cap - 1].record = case_tmp;
/* Merging of runs. */
580 static int choose_merge (struct casefile *runs[], int run_cnt, int order);
581 static struct casefile *merge_once (struct external_sort *,
582 struct casefile *[], size_t);
584 /* Repeatedly merges runs until only one is left,
585 and returns the final casefile.
586 Returns a null pointer if an I/O error occurs.
   Each pass merges ORDER consecutive runs (at most MAX_MERGE_ORDER),
   chosen by choose_merge(), into a single run that replaces them in
   XSRT->runs. */
587 static struct casefile *
588 merge (struct external_sort *xsrt)
590 while (xsrt->run_cnt > 1)
592 int order = min (MAX_MERGE_ORDER, xsrt->run_cnt);
593 int idx = choose_merge (xsrt->runs, xsrt->run_cnt, order);
/* The merged run takes the slot of the first input run; the
   following ORDER - 1 slots are then removed. */
594 xsrt->runs[idx] = merge_once (xsrt, xsrt->runs + idx, order);
595 remove_range (xsrt->runs, xsrt->run_cnt, sizeof *xsrt->runs,
597 xsrt->run_cnt -= order - 1;
599 if (xsrt->runs[idx] == NULL)
602 assert (xsrt->run_cnt == 1);
604 return xsrt->runs[0];
607 /* Chooses ORDER runs out of the RUN_CNT runs in RUNS to merge,
608 and returns the index of the first one.
610 For stability, we must merge only consecutive runs. For
611 efficiency, we choose the shortest consecutive sequence of
   ORDER runs, that is, the window of ORDER adjacent runs with the
   fewest cases in total.
   NOTE(review): case counts are summed in int.  casefile_get_case_cnt()
   appears to return a wider count (do_internal_sort() stores it in an
   unsigned long), so very large runs could overflow these sums. */
614 choose_merge (struct casefile *runs[], int run_cnt, int order)
616 int min_idx, min_sum;
617 int cur_idx, cur_sum;
620 /* Sum up the length of the first ORDER runs. */
622 for (i = 0; i < order; i++)
623 cur_sum += casefile_get_case_cnt (runs[i]);
625 /* Find the shortest group of ORDER runs,
626 using a running total for efficiency. */
629 for (cur_idx = 1; cur_idx + order <= run_cnt; cur_idx++)
631 cur_sum -= casefile_get_case_cnt (runs[cur_idx - 1]);
632 cur_sum += casefile_get_case_cnt (runs[cur_idx + order - 1]);
633 if (cur_sum < min_sum)
643 /* Merges the RUN_CNT initial runs specified in INPUT_FILES into a
644 new run, and returns the new run.
645 Returns a null pointer if an I/O error occurs.
   Each input is read with a destructive reader, and each input
   casefile is destroyed as soon as it is exhausted.  The output
   casefile is created on disk. */
646 static struct casefile *
647 merge_once (struct external_sort *xsrt,
648 struct casefile **const input_files,
653 struct casefile *file;
654 struct casereader *reader;
659 struct casefile *output = NULL;
662 /* Open input files. */
663 runs = xnmalloc (run_cnt, sizeof *runs);
664 for (i = 0; i < run_cnt; i++)
666 struct run *r = &runs[i];
667 r->file = input_files[i];
668 r->reader = casefile_get_destructive_reader (r->file);
669 if (!casereader_read_xfer (r->reader, &r->ccase))
676 /* Create output file. */
677 output = fastfile_create (xsrt->value_cnt);
678 casefile_to_disk (output);
683 struct run *min_run, *run;
/* Find the run whose current record is smallest.  Only a strictly
   smaller record replaces the minimum, so on ties the earlier run
   wins, which keeps the merge stable. */
687 for (run = runs + 1; run < runs + run_cnt; run++)
688 if (compare_record (&run->ccase, &min_run->ccase, xsrt->criteria) < 0)
691 /* Write minimum to output file. */
692 casefile_append_xfer (output, &min_run->ccase);
694 /* Read another case from minimum run. */
695 if (!casereader_read_xfer (min_run->reader, &min_run->ccase))
697 if (casefile_error (min_run->file) || casefile_error (output))
699 casereader_destroy (min_run->reader);
700 casefile_destroy (min_run->file);
702 remove_element (runs, run_cnt, sizeof *runs, min_run - runs);
707 if (!casefile_sleep (output))
/* Failure path: the input runs not yet consumed and the output are
   destroyed. */
714 for (i = 0; i < run_cnt; i++)
715 casefile_destroy (runs[i].file);
716 casefile_destroy (output);