1 /* PSPP - computes sample statistics.
2 Copyright (C) 1997-9, 2000 Free Software Foundation, Inc.
3 Written by Ben Pfaff <blp@gnu.org>.
5 This program is free software; you can redistribute it and/or
6 modify it under the terms of the GNU General Public License as
7 published by the Free Software Foundation; either version 2 of the
8 License, or (at your option) any later version.
10 This program is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with this program; if not, write to the Free Software
17 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
   02110-1301, USA. */
30 #include <data/case-source.h>
31 #include <data/case.h>
32 #include <data/casefile.h>
33 #include <data/procedure.h>
34 #include <data/settings.h>
35 #include <data/variable.h>
36 #include <data/storage-stream.h>
37 #include <language/expressions/public.h>
38 #include <libpspp/alloc.h>
39 #include <libpspp/array.h>
40 #include <libpspp/message.h>
41 #include <libpspp/message.h>
42 #include <libpspp/misc.h>
43 #include <libpspp/str.h>
46 #define _(msgid) gettext (msgid)
48 /* These should only be changed for testing purposes. */
/* Cap on the number of cases that allocate_cases() will buffer. */
50 int max_buffers = INT_MAX;
/* Tested at the start of do_internal_sort(). */
51 bool allow_internal_sort = true;
/* Forward declarations; the definitions appear later in this file. */
53 static int compare_record (const struct ccase *, const struct ccase *,
54 const struct sort_criteria *);
55 static struct casefile *do_internal_sort (struct casereader *,
56 const struct sort_criteria *);
57 static struct casefile *do_external_sort (struct casereader *,
58 const struct sort_criteria *);
60 /* Get ready to sort the active file: cancels temporary
   transformations, then releases process_if_expr and resets it
   to a null pointer. */
62 prepare_to_sort_active_file (void)
64 proc_cancel_temporary_transformations ();
/* Release the expression and clear the pointer that referred to it. */
65 expr_free (process_if_expr);
66 process_if_expr = NULL;
69 /* Sorts the active file in-place according to CRITERIA.
70 Returns nonzero if successful.

   The output captured from the procedure is read through a
   destructive reader, sorted by sort_execute(), and the result
   is installed as the new source via storage_source_create(). */
72 sort_active_file_in_place (const struct sort_criteria *criteria)
74 struct casefile *in, *out;
76 prepare_to_sort_active_file ();
77 if (!procedure (NULL, NULL))
/* Capture what the procedure produced and sort it. */
80 in = proc_capture_output ();
81 out = sort_execute (casefile_get_destructive_reader (in), criteria);
/* Install the sorted cases as the data source. */
85 proc_set_source (storage_source_create (out));
89 /* Data passed to sort_to_casefile_callback(). */
90 struct sort_to_casefile_cb_data
92 const struct sort_criteria *criteria; /* Criteria handed to sort_execute(). */
93 struct casefile *output; /* Casefile returned by sort_execute(). */
96 /* Sorts casefile CF according to the criteria in CB_DATA_, which
   is used as a pointer to a struct sort_to_casefile_cb_data.
   The casefile returned by sort_execute() is stored in that
   structure's `output' member.  Returns nonzero if it is
   non-null, zero otherwise. */
98 sort_to_casefile_callback (const struct casefile *cf, void *cb_data_)
100 struct sort_to_casefile_cb_data *cb_data = cb_data_;
101 cb_data->output = sort_execute (casefile_get_reader (cf), cb_data->criteria);
102 return cb_data->output != NULL;
105 /* Sorts the active file to a separate casefile. If successful,
106 returns the sorted casefile. Returns a null pointer on
   failure. */
109 sort_active_file_to_casefile (const struct sort_criteria *criteria)
111 struct sort_to_casefile_cb_data cb_data;
113 prepare_to_sort_active_file ();
/* The output member starts out null; sort_to_casefile_callback()
   overwrites it with the casefile returned by sort_execute(). */
115 cb_data.criteria = criteria;
116 cb_data.output = NULL;
117 multipass_procedure (sort_to_casefile_callback, &cb_data);
/* Whatever the callback stored (or the initial null) is the result. */
119 return cb_data.output;
123 /* Reads all the cases from READER, which is destroyed. Sorts
124 the cases according to CRITERIA. Returns the sorted cases in
125 a newly created casefile.

   do_internal_sort() is tried first.  NOTE(review): presumably
   do_external_sort() is used only when the internal sort yields
   a null pointer -- confirm. */
127 sort_execute (struct casereader *reader, const struct sort_criteria *criteria)
129 struct casefile *output = do_internal_sort (reader, criteria);
131 output = do_external_sort (reader, criteria);
/* READER is destroyed here regardless of which sort produced OUTPUT. */
132 casereader_destroy (reader);
136 /* A case and its index.  This is the element type of the array
   that do_internal_sort() sorts. */
139 struct ccase c; /* Case. */
140 unsigned long idx; /* Index to allow for stable sorting. */
/* Comparison function that do_internal_sort() passes to sort(). */
143 static int compare_indexed_cases (const void *, const void *, void *);
145 /* If the data is in memory, do an internal sort and return a new
146 casefile for the data. Otherwise, return a null pointer.

   READER supplies the cases to sort and CRITERIA the sort
   order. */
147 static struct casefile *
148 do_internal_sort (struct casereader *reader,
149 const struct sort_criteria *criteria)
151 const struct casefile *src;
152 struct casefile *dst;
153 unsigned long case_cnt;
/* Testing switch; see allow_internal_sort. */
155 if (!allow_internal_sort)
/* Check for a source that holds more than one case and is not in core. */
158 src = casereader_get_casefile (reader);
159 if (casefile_get_case_cnt (src) > 1 && !casefile_in_core (src))
/* DST gets the same value count as SRC. */
162 case_cnt = casefile_get_case_cnt (src);
163 dst = casefile_create (casefile_get_value_cnt (src));
/* One struct indexed_case per case in SRC. */
166 struct indexed_case *cases = nmalloc (sizeof *cases, case_cnt);
/* Read every case from READER into the array. */
171 for (i = 0; i < case_cnt; i++)
173 bool ok = casereader_read_xfer (reader, &cases[i].c);
/* compare_indexed_cases() uses the idx member as its last-resort
   key, which is what makes this sort stable. */
179 sort (cases, case_cnt, sizeof *cases, compare_indexed_cases,
182 for (i = 0; i < case_cnt; i++)
183 casefile_append_xfer (dst, &cases[i].c);
184 if (casefile_error (dst))
192 casefile_destroy (dst);
200 /* Compares the variables specified by CRITERIA between the cases
201 at A and B, with a "last resort" comparison for stability, and
202 returns a strcmp()-type result.

   A_ and B_ are used as pointers to struct indexed_case and
   CRITERIA_ as a pointer to struct sort_criteria. */
204 compare_indexed_cases (const void *a_, const void *b_, void *criteria_)
206 struct sort_criteria *criteria = criteria_;
207 const struct indexed_case *a = a_;
208 const struct indexed_case *b = b_;
209 int result = compare_record (&a->c, &b->c, criteria);
/* Last resort: order by the index stored with each case. */
211 result = a->idx < b->idx ? -1 : a->idx > b->idx;
217 /* Maximum order of merge (external sort only). The maximum
218 reasonable value is about 7. Above that, it would be a good
219 idea to use a heap in merge_once() to select the minimum.

   merge() uses this as the upper bound on the number of runs
   handed to a single merge_once() call. */
220 #define MAX_MERGE_ORDER 7
222 /* Results of an external sort.  Allocated by do_external_sort()
   and passed to write_runs(), merge() and
   destroy_external_sort(). */
225 const struct sort_criteria *criteria; /* Sort criteria. */
226 size_t value_cnt; /* Size of data in `union value's. */
227 struct casefile **runs; /* Array of initial runs. */
228 size_t run_cnt, run_cap; /* Number of runs, allocated capacity. */
231 /* Prototypes for the helper functions called by do_external_sort(). */
232 static int write_runs (struct external_sort *, struct casereader *);
233 static struct casefile *merge (struct external_sort *);
234 static void destroy_external_sort (struct external_sort *);
236 /* Performs a stable external sort of the cases read from READER
237 according to CRITERIA. Forms initial runs using a heap
238 as a reservoir. Merges the initial runs according to a
239 pattern that assures stability. */
240 static struct casefile *
241 do_external_sort (struct casereader *reader,
242 const struct sort_criteria *criteria)
244 struct external_sort *xsrt;
/* Ask the reader's casefile to go to disk before sorting. */
246 if (!casefile_to_disk (casereader_get_casefile (reader)))
/* Set up the sort state: criteria, value count and run array. */
249 xsrt = xmalloc (sizeof *xsrt);
250 xsrt->criteria = criteria;
251 xsrt->value_cnt = casefile_get_value_cnt (casereader_get_casefile (reader));
254 xsrt->runs = xnmalloc (xsrt->run_cap, sizeof *xsrt->runs);
/* Form the initial runs; if that works, merge them. */
255 if (write_runs (xsrt, reader))
257 struct casefile *output = merge (xsrt);
258 destroy_external_sort (xsrt);
/* XSRT is destroyed on this path as well. */
263 destroy_external_sort (xsrt);
/* Destroys each of the XSRT->run_cnt casefiles listed in
   XSRT->runs. */
270 destroy_external_sort (struct external_sort *xsrt)
276 for (i = 0; i < xsrt->run_cnt; i++)
277 casefile_destroy (xsrt->runs[i]);
283 /* Replacement selection. */
285 /* Pairs a record with a run number.  Elements of this type make
   up the heap in struct initial_run_state and are ordered by
   compare_record_run(). */
288 int run; /* Run number of case. */
289 struct ccase record; /* Case data. */
290 size_t idx; /* Case number (for stability). */
293 /* Represents a set of initial runs during an external sort.
   Set up by write_runs() and torn down by
   destroy_initial_run_state(). */
294 struct initial_run_state
296 struct external_sort *xsrt; /* Sort that supplies criteria and value count and receives finished runs. */
299 struct record_run *records; /* Records arranged as a heap. */
300 size_t record_cnt; /* Current number of records. */
301 size_t record_cap; /* Capacity for records. */
/* records[record_cap - 1] is kept outside the heap: output_record()
   swaps the case it has just output into that slot. */
303 /* Run currently being output. */
304 int run; /* Run number. */
305 size_t case_cnt; /* Number of cases so far. */
306 struct casefile *casefile; /* Output file. */
307 struct ccase last_output; /* Record last output. */
309 int okay; /* Zero if an error has been encountered. */
/* Forward declarations for the initial-run helpers defined below. */
312 static bool destroy_initial_run_state (struct initial_run_state *);
313 static void process_case (struct initial_run_state *, const struct ccase *,
315 static int allocate_cases (struct initial_run_state *);
316 static void output_record (struct initial_run_state *);
317 static void start_run (struct initial_run_state *);
318 static void end_run (struct initial_run_state *);
319 static int compare_record_run (const struct record_run *,
320 const struct record_run *,
321 struct initial_run_state *);
322 static int compare_record_run_minheap (const void *, const void *, void *);
324 /* Reads cases from READER and composes initial runs in XSRT.
   do_external_sort() proceeds to merge() only when the value
   returned here is nonzero. */
326 write_runs (struct external_sort *xsrt, struct casereader *reader)
328 struct initial_run_state *irs;
333 /* Allocate memory for cases. */
334 irs = xmalloc (sizeof *irs);
/* Start with an empty reservoir, no output file, and no
   previously output case. */
337 irs->record_cnt = irs->record_cap = 0;
340 irs->casefile = NULL;
341 case_nullify (&irs->last_output);
343 if (!allocate_cases (irs))
346 /* Create initial runs. */
/* Each case read is passed to process_case() together with its
   position in the input; reading stops once irs->okay is zero. */
348 for (; irs->okay && casereader_read (reader, &c); case_destroy (&c))
349 process_case (irs, &c, idx++);
/* Continue while records remain in the reservoir and no error
   has been flagged. */
350 while (irs->okay && irs->record_cnt > 0)
357 if (!destroy_initial_run_state (irs))
363 /* Add a single case to an initial run.  C is copied into the
   next free slot of IRS's heap; write_runs() passes the case's
   position in the input as IDX. */
365 process_case (struct initial_run_state *irs, const struct ccase *c, size_t idx)
367 struct record_run *rr;
369 /* Compose record_run for this run and add to heap. */
/* The heap may hold at most record_cap - 1 records. */
370 assert (irs->record_cnt < irs->record_cap - 1);
371 rr = irs->records + irs->record_cnt++;
372 case_copy (&rr->record, 0, c, 0, irs->xsrt->value_cnt);
/* A case that sorts before the case most recently output is
   assigned to the run after the current one. */
375 if (!case_is_null (&irs->last_output)
376 && compare_record (c, &irs->last_output, irs->xsrt->criteria) < 0)
377 rr->run = irs->run + 1;
378 push_heap (irs->records, irs->record_cnt, sizeof *irs->records,
379 compare_record_run_minheap, irs);
381 /* Output a record if the reservoir is full. */
382 if (irs->record_cnt == irs->record_cap - 1 && irs->okay)
386 /* Destroys the initial run state represented by IRS.
387 Returns true if successful, false if an I/O error occurred. */
389 destroy_initial_run_state (struct initial_run_state *irs)
/* All record_cap slots are destroyed, not only the record_cnt
   currently in the heap. */
397 for (i = 0; i < irs->record_cap; i++)
398 case_destroy (&irs->records[i].record);
/* If an output casefile is still attached, the result of putting
   it to sleep is kept in OK. */
401 if (irs->casefile != NULL)
402 ok = casefile_sleep (irs->casefile);
408 /* Allocates room for lots of cases as a buffer.

   The number of cases requested is the workspace size divided by
   an approximate per-case cost, capped at max_buffers.
   IRS->records receives the array and IRS->record_cap is set
   from the resulting case count.  An out-of-memory message is
   issued through msg() if IRS->records is null or the count is
   below min_buffers.  write_runs() treats a zero return value as
   failure. */
410 allocate_cases (struct initial_run_state *irs)
412 int approx_case_cost; /* Approximate memory cost of one case in bytes. */
413 int max_cases; /* Maximum number of cases to allocate. */
416 /* Allocate as many cases as we can within the workspace
418 approx_case_cost = (sizeof *irs->records
419 + irs->xsrt->value_cnt * sizeof (union value)
420 + 4 * sizeof (void *));
421 max_cases = get_workspace() / approx_case_cost;
422 if (max_cases > max_buffers)
423 max_cases = max_buffers;
424 irs->records = nmalloc (sizeof *irs->records, max_cases);
425 if (irs->records != NULL)
426 for (i = 0; i < max_cases; i++)
427 if (!case_try_create (&irs->records[i].record, irs->xsrt->value_cnt))
432 irs->record_cap = max_cases;
434 /* Fail if we didn't allocate an acceptable number of cases. */
435 if (irs->records == NULL || max_cases < min_buffers)
437 msg (SE, _("Out of memory. Could not allocate room for minimum of %d "
438 "cases of %d bytes each. (PSPP workspace is currently "
439 "restricted to a maximum of %d KB.)"),
440 min_buffers, approx_case_cost, get_workspace() / 1024);
446 /* Compares cases A and B on the sort criteria in CRITERIA
447 and returns a strcmp()-type result.

   Each criterion names a value by its `fv' member.  Numeric
   values are compared as doubles and string values with memcmp()
   over the criterion's width.  The sign of the result is
   reversed for a criterion whose direction is not
   SRT_ASCEND. */
449 compare_record (const struct ccase *a, const struct ccase *b,
450 const struct sort_criteria *criteria)
457 for (i = 0; i < criteria->crit_cnt; i++)
459 const struct sort_criterion *c = &criteria->crits[i];
/* Numeric comparison yielding -1, 0 or 1. */
464 double af = case_num (a, c->fv);
465 double bf = case_num (b, c->fv);
467 result = af < bf ? -1 : af > bf;
/* String comparison over c->width bytes. */
470 result = memcmp (case_str (a, c->fv), case_str (b, c->fv), c->width);
/* Flip the sign unless the criterion is ascending. */
473 return c->dir == SRT_ASCEND ? result : -result;
479 /* Compares record-run tuples A and B on run number first, then
480 on record, then on case index.  Records are compared with
   compare_record() using the criteria of IRS's external sort.
   Returns a strcmp()-type result. */
482 compare_record_run (const struct record_run *a,
483 const struct record_run *b,
484 struct initial_run_state *irs)
/* Run number. */
486 int result = a->run < b->run ? -1 : a->run > b->run;
/* Record contents. */
488 result = compare_record (&a->record, &b->record, irs->xsrt->criteria);
/* Case index. */
490 result = a->idx < b->idx ? -1 : a->idx > b->idx;
494 /* Compares record-run tuples A and B on run number first, then
495 on the current record according to the sort criteria, but in
   descending order. */
/* Returns the negation of compare_record_run (A, B, IRS).  This
   is the comparison function handed to push_heap() and
   pop_heap(). */
498 compare_record_run_minheap (const void *a, const void *b, void *irs)
500 return -compare_record_run (a, b, irs);
503 /* Begins a new initial run, specifically its output file.
   Creates a casefile with the sort's value count, calls
   casefile_to_disk() on it, and nullifies IRS->last_output. */
505 start_run (struct initial_run_state *irs)
509 irs->casefile = casefile_create (irs->xsrt->value_cnt);
510 casefile_to_disk (irs->casefile);
/* With last_output null, process_case() skips its comparison
   against the previously output case. */
511 case_nullify (&irs->last_output);
514 /* Ends the current initial run.  If IRS has an output casefile,
   it is put to sleep and appended to the run array of IRS's
   external sort.  IRS->casefile is null afterward. */
516 end_run (struct initial_run_state *irs)
518 struct external_sort *xsrt = irs->xsrt;
520 /* Record initial run. */
521 if (irs->casefile != NULL)
523 casefile_sleep (irs->casefile);
/* Reallocate the run array when it is full. */
524 if (xsrt->run_cnt >= xsrt->run_cap)
527 xsrt->runs = xnrealloc (xsrt->runs,
528 xsrt->run_cap, sizeof *xsrt->runs);
/* From here on the casefile is reachable through xsrt->runs,
   where destroy_external_sort() will destroy it. */
530 xsrt->runs[xsrt->run_cnt++] = irs->casefile;
531 if (casefile_error (irs->casefile))
533 irs->casefile = NULL;
537 /* Writes a record to the current initial run.  The record is
   the one that pop_heap() extracts from IRS's heap, which must
   not be empty. */
539 output_record (struct initial_run_state *irs)
541 struct record_run *record_run;
542 struct ccase case_tmp;
544 /* Extract minimum case from heap. */
545 assert (irs->record_cnt > 0);
546 pop_heap (irs->records, irs->record_cnt--, sizeof *irs->records,
547 compare_record_run_minheap, irs);
/* The extracted element is the one just past the shrunken heap. */
548 record_run = irs->records + irs->record_cnt;
550 /* Bail if an error has occurred. */
554 /* Start new run if necessary. */
/* The extracted record belongs either to the current run or to
   the one immediately after it. */
555 assert (record_run->run == irs->run
556 || record_run->run == irs->run + 1);
557 if (record_run->run != irs->run)
562 assert (record_run->run == irs->run);
566 if (irs->casefile != NULL)
567 casefile_append (irs->casefile, &record_run->record);
569 /* This record becomes last_output. */
/* Swap the case just output with the one in the spare last slot,
   records[record_cap - 1]; last_output keeps referring to the
   case that was output. */
570 irs->last_output = case_tmp = record_run->record;
571 record_run->record = irs->records[irs->record_cap - 1].record;
572 irs->records[irs->record_cap - 1].record = case_tmp;
/* Forward declarations for the helpers that merge() calls. */
577 static int choose_merge (struct casefile *runs[], int run_cnt, int order);
578 static struct casefile *merge_once (struct external_sort *,
579 struct casefile *[], size_t);
581 /* Repeatedly merges runs until only one is left,
582 and returns the final casefile.
583 Returns a null pointer if an I/O error occurs. */
584 static struct casefile *
585 merge (struct external_sort *xsrt)
587 while (xsrt->run_cnt > 1)
/* Merge at most MAX_MERGE_ORDER runs at a time, starting at the
   index picked by choose_merge(). */
589 int order = min (MAX_MERGE_ORDER, xsrt->run_cnt);
590 int idx = choose_merge (xsrt->runs, xsrt->run_cnt, order);
/* The merged run is stored where the first of its inputs was. */
591 xsrt->runs[idx] = merge_once (xsrt, xsrt->runs + idx, order);
/* ORDER runs have become one, so the run count drops by ORDER - 1. */
592 remove_range (xsrt->runs, xsrt->run_cnt, sizeof *xsrt->runs,
594 xsrt->run_cnt -= order - 1;
/* merge_once() returns a null pointer on I/O error. */
596 if (xsrt->runs[idx] == NULL)
599 assert (xsrt->run_cnt == 1);
601 return xsrt->runs[0];
604 /* Chooses ORDER runs out of the RUN_CNT runs in RUNS to merge,
605 and returns the index of the first one.
607 For stability, we must merge only consecutive runs. For
608 efficiency, we choose the shortest consecutive sequence of
   runs. */
611 choose_merge (struct casefile *runs[], int run_cnt, int order)
613 int min_idx, min_sum;
614 int cur_idx, cur_sum;
617 /* Sum up the length of the first ORDER runs. */
619 for (i = 0; i < order; i++)
620 cur_sum += casefile_get_case_cnt (runs[i]);
622 /* Find the shortest group of ORDER runs,
623 using a running total for efficiency. */
626 for (cur_idx = 1; cur_idx + order <= run_cnt; cur_idx++)
/* Slide the window one run to the right: subtract the run that
   left it and add the run that entered it. */
628 cur_sum -= casefile_get_case_cnt (runs[cur_idx - 1]);
629 cur_sum += casefile_get_case_cnt (runs[cur_idx + order - 1]);
/* Only a strictly smaller total passes this test. */
630 if (cur_sum < min_sum)
640 /* Merges the RUN_CNT initial runs specified in INPUT_FILES into a
641 new run, and returns the new run.
642 Returns a null pointer if an I/O error occurs.

   Each input is read through a destructive reader.  Cases are
   compared with compare_record() using XSRT's criteria, and the
   output casefile is created with XSRT's value count. */
643 static struct casefile *
644 merge_once (struct external_sort *xsrt,
645 struct casefile **const input_files,
650 struct casefile *file;
651 struct casereader *reader;
656 struct casefile *output = NULL;
659 /* Open input files. */
660 runs = xnmalloc (run_cnt, sizeof *runs);
661 for (i = 0; i < run_cnt; i++)
663 struct run *r = &runs[i];
664 r->file = input_files[i];
665 r->reader = casefile_get_destructive_reader (r->file);
/* Read the run's first case right away. */
666 if (!casereader_read_xfer (r->reader, &r->ccase))
673 /* Create output file. */
674 output = casefile_create (xsrt->value_cnt);
675 casefile_to_disk (output);
680 struct run *min_run, *run;
/* Linear scan over the remaining runs; a run replaces the
   current minimum only when its case compares strictly less. */
684 for (run = runs + 1; run < runs + run_cnt; run++)
685 if (compare_record (&run->ccase, &min_run->ccase, xsrt->criteria) < 0)
688 /* Write minimum to output file. */
689 casefile_append_xfer (output, &min_run->ccase);
691 /* Read another case from minimum run. */
692 if (!casereader_read_xfer (min_run->reader, &min_run->ccase))
/* A failed read means either an I/O error or an exhausted run. */
694 if (casefile_error (min_run->file) || casefile_error (output))
/* Release the run's reader and casefile and drop it from RUNS. */
696 casereader_destroy (min_run->reader);
697 casefile_destroy (min_run->file);
699 remove_element (runs, run_cnt, sizeof *runs, min_run - runs);
704 if (!casefile_sleep (output))
/* NOTE(review): presumably the I/O-error cleanup path -- confirm.
   Destroys the casefiles of the runs still listed and the output
   casefile. */
711 for (i = 0; i < run_cnt; i++)
712 casefile_destroy (runs[i].file);
713 casefile_destroy (output);