1 /* PSPP - computes sample statistics.
2 Copyright (C) 1997-9, 2000 Free Software Foundation, Inc.
3 Written by Ben Pfaff <blp@gnu.org>.
5 This program is free software; you can redistribute it and/or
6 modify it under the terms of the GNU General Public License as
7 published by the Free Software Foundation; either version 2 of the
8 License, or (at your option) any later version.
10 This program is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with this program; if not, write to the Free Software
17 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
02110-1301, USA. */
30 #include <data/case-source.h>
31 #include <data/case.h>
32 #include <data/casefile.h>
33 #include <data/fastfile.h>
34 #include <data/procedure.h>
35 #include <data/settings.h>
36 #include <data/variable.h>
37 #include <data/storage-stream.h>
38 #include <libpspp/alloc.h>
39 #include <libpspp/array.h>
40 #include <libpspp/assertion.h>
41 #include <libpspp/message.h>
42 #include <libpspp/message.h>
43 #include <libpspp/misc.h>
44 #include <libpspp/str.h>
/* Marks MSGID as translatable and returns its translation via
   gettext(). */
49 #define _(msgid) gettext (msgid)
51 /* These should only be changed for testing purposes. */
/* Upper limit on the number of cases allocate_cases() reserves for
   the external sort's reservoir; INT_MAX leaves the workspace size
   as the only effective limit. */
53 int max_buffers = INT_MAX;
/* Checked first by do_internal_sort(); clearing it is meant to force
   every sort down the external-sort path. */
54 bool allow_internal_sort = true;
/* Forward declarations for the sort helpers defined below. */
56 static int compare_record (const struct ccase *, const struct ccase *,
57 const struct sort_criteria *);
58 static struct casefile *do_internal_sort (struct casereader *,
59 const struct sort_criteria *);
60 static struct casefile *do_external_sort (struct casereader *,
61 const struct sort_criteria *);
64 /* Sorts the active file in-place according to CRITERIA.
65 Returns true if successful.

   The active file is first run through procedure(), then its
   output is captured as a casefile, sorted by sort_execute(), and
   the sorted casefile is installed as DS's new data source. */
67 sort_active_file_in_place (struct dataset *ds,
68 const struct sort_criteria *criteria)
70 struct casefile *in, *out;
/* Temporary transformations are cancelled before the data is read,
   as in sort_active_file_to_casefile(). */
72 proc_cancel_temporary_transformations (ds);
/* A pass with no per-case callbacks; presumably this is what gives
   proc_capture_output() something to capture. */
73 if (!procedure (ds, NULL, NULL))
76 in = proc_capture_output (ds);
/* IN is handed over through a destructive reader, which
   sort_execute() destroys when it is done. */
77 out = sort_execute (casefile_get_destructive_reader (in), criteria);
81 proc_set_source (ds, storage_source_create (out));
85 /* Data passed to sort_to_casefile_callback() by
   sort_active_file_to_casefile(). */
86 struct sort_to_casefile_cb_data
88 const struct sort_criteria *criteria; /* In: how to sort. */
89 struct casefile *output; /* Out: sorted cases; null until set. */
92 /* Sorts casefile CF according to the criteria in CB_DATA, which
   points to a struct sort_to_casefile_cb_data, and stores the
   sorted casefile in its OUTPUT member. Returns true if sorting
   produced a casefile, false otherwise. */
94 sort_to_casefile_callback (const struct casefile *cf, void *cb_data_)
96 struct sort_to_casefile_cb_data *cb_data = cb_data_;
/* CF is const, so it is read with casefile_get_reader() rather than
   the destructive reader used by sort_active_file_in_place(). */
97 cb_data->output = sort_execute (casefile_get_reader (cf, NULL), cb_data->criteria);
98 return cb_data->output != NULL;
101 /* Sorts the active file to a separate casefile. If successful,
102 returns the sorted casefile. Returns a null pointer on failure. */
105 sort_active_file_to_casefile (struct dataset *ds,
106 const struct sort_criteria *criteria)
/* CB_DATA carries CRITERIA into sort_to_casefile_callback(), which
   does the sorting, and carries the sorted casefile back out. */
108 struct sort_to_casefile_cb_data cb_data;
110 proc_cancel_temporary_transformations (ds);
112 cb_data.criteria = criteria;
/* OUTPUT starts out null so that the failure path below can destroy
   it whether or not the callback ran (assumes casefile_destroy()
   accepts a null pointer -- confirm). */
113 cb_data.output = NULL;
114 if (!multipass_procedure (ds, sort_to_casefile_callback, &cb_data))
/* The procedure failed: discard any casefile the callback produced. */
116 casefile_destroy (cb_data.output);
119 return cb_data.output;
123 /* Reads all the cases from READER, which is destroyed. Sorts
124 the cases according to CRITERIA. Returns the sorted cases in
125 a newly created casefile, or a null pointer if sorting failed
   (sort_to_casefile_callback() tests for this). */
127 sort_execute (struct casereader *reader, const struct sort_criteria *criteria)
/* Try an in-memory sort first; do_internal_sort() returns a null
   pointer when it declines to sort. */
129 struct casefile *output = do_internal_sort (reader, criteria);
/* External-sort fallback, for when the internal sort declined. */
131 output = do_external_sort (reader, criteria);
132 casereader_destroy (reader);
136 /* A case paired with its position in the input. do_internal_sort()
   sorts an array of these; the position breaks ties between equal
   cases so that the sort is stable. */
139 struct ccase c; /* Case. */
140 unsigned long idx; /* Index to allow for stable sorting. */
/* Comparison function for sort(): the first two arguments point to
   struct indexed_case, the third to struct sort_criteria. */
143 static int compare_indexed_cases (const void *, const void *, const void *);
145 /* If the data is in memory, do an internal sort and return a new
146 casefile for the data. Otherwise, return a null pointer.
   allow_internal_sort is checked before anything else. */
147 static struct casefile *
148 do_internal_sort (struct casereader *reader,
149 const struct sort_criteria *criteria)
151 const struct casefile *src;
152 struct casefile *dst;
153 unsigned long case_cnt;
/* Testing hook (see allow_internal_sort). */
155 if (!allow_internal_sort)
158 src = casereader_get_casefile (reader);
/* The data must be in core, except that a casefile with at most one
   case is always handled here. */
159 if (casefile_get_case_cnt (src) > 1 && !casefile_in_core (src))
162 case_cnt = casefile_get_case_cnt (src);
163 dst = fastfile_create (casefile_get_value_cnt (src));
/* NOTE(review): nmalloc() is used rather than the xnmalloc() used
   elsewhere in this file, so presumably it may return a null
   pointer -- confirm that the result is checked. */
166 struct indexed_case *cases = nmalloc (sizeof *cases, case_cnt);
/* Transfer every case out of READER into CASES. */
171 for (i = 0; i < case_cnt; i++)
173 bool ok = casereader_read_xfer (reader, &cases[i].c);
/* compare_indexed_cases() breaks ties on idx, so cases that compare
   equal under CRITERIA keep their input order. */
179 sort (cases, case_cnt, sizeof *cases, compare_indexed_cases,
/* Transfer the sorted cases into DST. */
182 for (i = 0; i < case_cnt; i++)
183 casefile_append_xfer (dst, &cases[i].c);
184 if (casefile_error (dst))
192 casefile_destroy (dst);
200 /* Compares the variables specified by CRITERIA between the cases
201 at A and B, with a "last resort" comparison for stability, and
202 returns a strcmp()-type result. A_ and B_ point to struct
   indexed_case; CRITERIA_ points to struct sort_criteria. */
204 compare_indexed_cases (const void *a_, const void *b_, const void *criteria_)
206 const struct sort_criteria *criteria = criteria_;
207 const struct indexed_case *a = a_;
208 const struct indexed_case *b = b_;
209 int result = compare_record (&a->c, &b->c, criteria);
/* Last resort: order by input position so that equal records keep
   their original order. IDX is unsigned long, so it is compared
   rather than subtracted. */
211 result = a->idx < b->idx ? -1 : a->idx > b->idx;
217 /* Maximum order of merge (external sort only). The maximum
218 reasonable value is about 7. Above that, it would be a good
219 idea to use a heap in merge_once() to select the minimum, which
   it currently finds with a linear scan. */
220 #define MAX_MERGE_ORDER 7
222 /* State of an external sort: the criteria, the case width, and
   the runs produced so far. */
225 const struct sort_criteria *criteria; /* Sort criteria. */
226 size_t value_cnt; /* Size of data in `union value's. */
227 struct casefile **runs; /* Initial runs, later merged runs. */
228 size_t run_cnt, run_cap; /* Number of runs, allocated capacity. */
231 /* Prototypes for helper functions. write_runs() returns nonzero
   on success; merge() returns a null pointer on an I/O error. */
232 static int write_runs (struct external_sort *, struct casereader *);
233 static struct casefile *merge (struct external_sort *);
234 static void destroy_external_sort (struct external_sort *);
236 /* Performs a stable external sort of the cases read from READER
237 according to CRITERIA. Forms initial runs using a heap
238 as a reservoir. Merges the initial runs according to a
239 pattern that assures stability. */
240 static struct casefile *
241 do_external_sort (struct casereader *reader,
242 const struct sort_criteria *criteria)
244 struct external_sort *xsrt;
/* The source casefile must be movable to disk; presumably the sort
   is abandoned if casefile_to_disk() fails. */
246 if (!casefile_to_disk (casereader_get_casefile (reader)))
249 xsrt = xmalloc (sizeof *xsrt);
250 xsrt->criteria = criteria;
251 xsrt->value_cnt = casefile_get_value_cnt (casereader_get_casefile (reader));
/* The runs array starts with run_cap slots; end_run() enlarges it as
   runs are completed. */
254 xsrt->runs = xnmalloc (xsrt->run_cap, sizeof *xsrt->runs);
255 if (write_runs (xsrt, reader))
/* merge() reduces the runs to one and returns it, or returns a null
   pointer on an I/O error. NOTE(review): destroy_external_sort()
   destroys runs[0..run_cnt), so this relies on merge() having
   released the returned run from XSRT -- confirm. */
257 struct casefile *output = merge (xsrt);
258 destroy_external_sort (xsrt);
/* Forming the initial runs failed. */
263 destroy_external_sort (xsrt);
/* Destroys XSRT together with every run it still holds. */
270 destroy_external_sort (struct external_sort *xsrt)
276 for (i = 0; i < xsrt->run_cnt; i++)
277 casefile_destroy (xsrt->runs[i]);
283 /* Replacement selection: the initial runs of the external sort
   are formed by passing cases through a heap used as a reservoir. */
285 /* Pairs a record with its run number and input position. */
288 int run; /* Run number of case. */
289 struct ccase record; /* Case data. */
290 size_t idx; /* Case number (for stability). */
293 /* Represents a set of initial runs during an external sort. */
294 struct initial_run_state
296 struct external_sort *xsrt; /* Sort whose runs array receives each finished run. */
/* The last slot, records[record_cap - 1], is kept out of the heap:
   output_record() swaps the storage of each record it writes into
   that slot so that LAST_OUTPUT stays valid. */
299 struct record_run *records; /* Records arranged as a heap. */
300 size_t record_cnt; /* Current number of records. */
301 size_t record_cap; /* Capacity for records. */
303 /* Run currently being output. */
304 int run; /* Run number. */
305 size_t case_cnt; /* Number of cases so far. */
306 struct casefile *casefile; /* Output file. */
307 struct ccase last_output; /* Record last output. */
309 int okay; /* Zero if an error has been encountered. */
/* Helpers for forming the initial runs. */
312 static bool destroy_initial_run_state (struct initial_run_state *);
313 static void process_case (struct initial_run_state *,
314 const struct ccase *, size_t);
315 static int allocate_cases (struct initial_run_state *);
316 static void output_record (struct initial_run_state *);
317 static void start_run (struct initial_run_state *);
318 static void end_run (struct initial_run_state *);
319 static int compare_record_run (const struct record_run *,
320 const struct record_run *,
321 const struct initial_run_state *);
322 static int compare_record_run_minheap (const void *, const void *,
325 /* Reads cases from READER and composes initial runs in XSRT.
   Returns nonzero if successful; do_external_sort() treats a zero
   result as failure. */
327 write_runs (struct external_sort *xsrt, struct casereader *reader)
329 struct initial_run_state *irs;
334 /* Allocate memory for cases. */
335 irs = xmalloc (sizeof *irs);
338 irs->record_cnt = irs->record_cap = 0;
341 irs->casefile = NULL;
/* Nothing has been output yet, so process_case() places no run
   constraint on the first cases. */
342 case_nullify (&irs->last_output);
344 if (!allocate_cases (irs))
347 /* Create initial runs. */
/* process_case() copies each case, so the loop destroys C after each
   call. */
349 for (; irs->okay && casereader_read (reader, &c); case_destroy (&c))
350 process_case (irs, &c, idx++);
/* Input exhausted: drain the records left in the reservoir. */
351 while (irs->okay && irs->record_cnt > 0)
/* destroy_initial_run_state() reports I/O errors through its return
   value. */
358 if (!destroy_initial_run_state (irs))
364 /* Add a single case to an initial run. C is copied into the
   reservoir; IDX is C's position in the input (write_runs() passes
   a running count). Once the reservoir is full, a record is
   written out. */
366 process_case (struct initial_run_state *irs, const struct ccase *c,
369 struct record_run *rr;
371 /* Compose record_run for this run and add to heap. */
/* The last slot of RECORDS is reserved (see output_record()), so the
   heap holds at most record_cap - 1 records. */
372 assert (irs->record_cnt < irs->record_cap - 1);
373 rr = irs->records + irs->record_cnt++;
374 case_copy (&rr->record, 0, c, 0, irs->xsrt->value_cnt);
/* A case that sorts before the last record written cannot extend the
   current run, so it is deferred to the next one. */
377 if (!case_is_null (&irs->last_output)
378 && compare_record (c, &irs->last_output, irs->xsrt->criteria) < 0)
379 rr->run = irs->run + 1;
380 push_heap (irs->records, irs->record_cnt, sizeof *irs->records,
381 compare_record_run_minheap, irs);
383 /* Output a record if the reservoir is full. */
384 if (irs->record_cnt == irs->record_cap - 1 && irs->okay)
388 /* Destroys the initial run state represented by IRS.
389 Returns true if successful, false if an I/O error occurred. */
391 destroy_initial_run_state (struct initial_run_state *irs)
/* Destroy all record_cap cases, including the reserved last slot. */
399 for (i = 0; i < irs->record_cap; i++)
400 case_destroy (&irs->records[i].record);
/* A run still open at this point is put to sleep; the result of
   casefile_sleep() is kept in OK. */
403 if (irs->casefile != NULL)
404 ok = casefile_sleep (irs->casefile);
410 /* Allocates the reservoir of cases used to form initial runs: as
   many as fit in the workspace, but no more than max_buffers.
   Reports an error if fewer than min_buffers cases can be had;
   write_runs() treats a zero result as failure. */
412 allocate_cases (struct initial_run_state *irs)
414 int approx_case_cost; /* Approximate memory cost of one case in bytes. */
415 int max_cases; /* Maximum number of cases to allocate. */
/* NOTE(review): both counts are int. If get_workspace() returns a
   wider type, get_workspace() / approx_case_cost may not fit in
   max_cases, and the %d conversions in the error message below
   would not match -- confirm get_workspace()'s return type. */
418 /* Allocate as many cases as we can within the workspace
420 approx_case_cost = (sizeof *irs->records
421 + irs->xsrt->value_cnt * sizeof (union value)
422 + 4 * sizeof (void *));
423 max_cases = get_workspace() / approx_case_cost;
424 if (max_cases > max_buffers)
425 max_cases = max_buffers;
426 irs->records = nmalloc (sizeof *irs->records, max_cases);
427 if (irs->records != NULL)
428 for (i = 0; i < max_cases; i++)
429 if (!case_try_create (&irs->records[i].record, irs->xsrt->value_cnt))
434 irs->record_cap = max_cases;
436 /* Fail if we didn't allocate an acceptable number of cases. */
437 if (irs->records == NULL || max_cases < min_buffers)
439 msg (SE, _("Out of memory. Could not allocate room for minimum of %d "
440 "cases of %d bytes each. (PSPP workspace is currently "
441 "restricted to a maximum of %d KB.)"),
442 min_buffers, approx_case_cost, get_workspace() / 1024);
448 /* Compares cases A and B on each criterion in CRITERIA in turn,
449 and returns a strcmp()-type result. The result of a criterion
   whose direction is not SRT_ASCEND is negated. */
451 compare_record (const struct ccase *a, const struct ccase *b,
452 const struct sort_criteria *criteria)
459 for (i = 0; i < criteria->crit_cnt; i++)
461 const struct sort_criterion *c = &criteria->crits[i];
/* Numeric comparison. Written with < and >, so a NaN (if one can
   occur) compares equal to any value. */
466 double af = case_num_idx (a, c->fv);
467 double bf = case_num_idx (b, c->fv);
469 result = af < bf ? -1 : af > bf;
/* String comparison: bytewise over the criterion's C->WIDTH bytes. */
472 result = memcmp (case_str_idx (a, c->fv),
473 case_str_idx (b, c->fv), c->width);
/* Descending criteria invert the sense of the comparison. */
476 return c->dir == SRT_ASCEND ? result : -result;
482 /* Compares record-run tuples A and B on run number first, then
483 on record, then on case index. Putting run number first makes
   every record of the current run come before any record deferred
   to the next run; the case index keeps equal records in input
   order. IRS supplies the sort criteria. */
485 compare_record_run (const struct record_run *a,
486 const struct record_run *b,
487 const struct initial_run_state *irs)
489 int result = a->run < b->run ? -1 : a->run > b->run;
491 result = compare_record (&a->record, &b->record, irs->xsrt->criteria);
493 result = a->idx < b->idx ? -1 : a->idx > b->idx;
497 /* Compares record-run tuples A and B on run number first, then
498 on the current record according to the sort criteria, but in
descending order, so that pop_heap() with this comparison
extracts the minimum tuple. */
/* A and B point to struct record_run; IRS points to the struct
   initial_run_state whose criteria apply. */
501 compare_record_run_minheap (const void *a, const void *b, const void *irs)
/* Inverting compare_record_run() is what lets output_record() pop the
   smallest tuple. */
503 return -compare_record_run (a, b, irs);
506 /* Begins a new initial run, specifically its output file. */
508 start_run (struct initial_run_state *irs)
512 irs->casefile = fastfile_create (irs->xsrt->value_cnt);
/* NOTE(review): the result of casefile_to_disk() is ignored here,
   although do_external_sort() checks it; confirm that a failure is
   caught later (e.g. by casefile_error() in end_run()). */
513 casefile_to_disk (irs->casefile);
/* The new run has no previous output, so process_case() will not
   defer any case to the next run until a record has been written. */
514 case_nullify (&irs->last_output);
517 /* Ends the current initial run: its casefile is appended to the
   runs array of IRS->XSRT, which takes ownership of it, and IRS
   stops referring to it. */
519 end_run (struct initial_run_state *irs)
521 struct external_sort *xsrt = irs->xsrt;
523 /* Record initial run. */
524 if (irs->casefile != NULL)
/* NOTE(review): casefile_sleep()'s return value is ignored; confirm
   that the casefile_error() check below also catches failures it
   reports. */
526 casefile_sleep (irs->casefile);
/* Make room in XSRT->RUNS if it is full. */
527 if (xsrt->run_cnt >= xsrt->run_cap)
530 xsrt->runs = xnrealloc (xsrt->runs,
531 xsrt->run_cap, sizeof *xsrt->runs);
533 xsrt->runs[xsrt->run_cnt++] = irs->casefile;
534 if (casefile_error (irs->casefile))
536 irs->casefile = NULL;
540 /* Writes a record to the current initial run: the smallest record
   in IRS's heap is removed and appended to the run's casefile,
   starting a new run first if that record belongs to the next one. */
542 output_record (struct initial_run_state *irs)
544 struct record_run *record_run;
545 struct ccase case_tmp;
547 /* Extract minimum case from heap. */
548 assert (irs->record_cnt > 0);
549 pop_heap (irs->records, irs->record_cnt--, sizeof *irs->records,
550 compare_record_run_minheap, irs);
/* After the decrement, the popped record is at index record_cnt. */
551 record_run = irs->records + irs->record_cnt;
553 /* Bail if an error has occurred. */
557 /* Start new run if necessary. */
/* A record belongs either to the current run or to the next. */
558 assert (record_run->run == irs->run
559 || record_run->run == irs->run + 1);
560 if (record_run->run != irs->run)
565 assert (record_run->run == irs->run);
569 if (irs->casefile != NULL)
570 casefile_append (irs->casefile, &record_run->record);
572 /* This record becomes last_output. */
/* Swap storage with the reserved slot records[record_cap - 1]:
   LAST_OUTPUT keeps this record's data, and the vacated heap slot
   takes over the reserved slot's case, so no case is allocated or
   freed here. */
573 irs->last_output = case_tmp = record_run->record;
574 record_run->record = irs->records[irs->record_cap - 1].record;
575 irs->records[irs->record_cap - 1].record = case_tmp;
/* Helpers for the merge phase. */
580 static int choose_merge (struct casefile *runs[], int run_cnt, int order);
581 static struct casefile *merge_once (struct external_sort *,
582 struct casefile *[], size_t);
584 /* Repeatedly merges runs until only one is left,
585 and returns the final casefile.
586 Returns a null pointer if an I/O error occurs. */
587 static struct casefile *
588 merge (struct external_sort *xsrt)
590 while (xsrt->run_cnt > 1)
/* Merge up to MAX_MERGE_ORDER consecutive runs at a time, starting
   at the index chosen by choose_merge(). */
592 int order = MIN (MAX_MERGE_ORDER, xsrt->run_cnt);
593 int idx = choose_merge (xsrt->runs, xsrt->run_cnt, order);
/* The merged run takes the slot of its first input; merge_once()
   consumes the inputs, and the slots of the other ORDER - 1 inputs
   are then removed from the array. */
594 xsrt->runs[idx] = merge_once (xsrt, xsrt->runs + idx, order);
595 remove_range (xsrt->runs, xsrt->run_cnt, sizeof *xsrt->runs,
597 xsrt->run_cnt -= order - 1;
/* merge_once() returns a null pointer on an I/O error. */
599 if (xsrt->runs[idx] == NULL)
602 assert (xsrt->run_cnt == 1);
604 return xsrt->runs[0];
607 /* Chooses ORDER runs out of the RUN_CNT runs in RUNS to merge,
608 and returns the index of the first one.
610 For stability, we must merge only consecutive runs. For
611 efficiency, we choose the consecutive sequence of ORDER runs
that contains the fewest cases in total. */
614 choose_merge (struct casefile *runs[], int run_cnt, int order)
/* NOTE(review): case counts are summed in int. If
   casefile_get_case_cnt() returns unsigned long (do_internal_sort()
   stores its result in one), very large runs could overflow these
   sums -- confirm. */
616 int min_idx, min_sum;
617 int cur_idx, cur_sum;
620 /* Sum up the length of the first ORDER runs. */
622 for (i = 0; i < order; i++)
623 cur_sum += casefile_get_case_cnt (runs[i]);
625 /* Find the shortest group of ORDER runs,
626 using a running total for efficiency. */
629 for (cur_idx = 1; cur_idx + order <= run_cnt; cur_idx++)
/* Slide the window one run to the right: drop the run that left it
   and add the one that entered it. */
631 cur_sum -= casefile_get_case_cnt (runs[cur_idx - 1]);
632 cur_sum += casefile_get_case_cnt (runs[cur_idx + order - 1]);
/* Strict < keeps the leftmost window among equal totals. */
633 if (cur_sum < min_sum)
643 /* Merges the RUN_CNT initial runs specified in INPUT_FILES into a
644 new run, and returns the new run.
645 Returns a null pointer if an I/O error occurs.
   The input casefiles are consumed: each is destroyed once it is
   exhausted, or during cleanup after an error. merge() may also
   pass in runs produced by earlier merges. */
646 static struct casefile *
647 merge_once (struct external_sort *xsrt,
648 struct casefile **const input_files,
/* Per-input state: the run's casefile, its reader, and its current
   case (CCASE). */
653 struct casefile *file;
654 struct casereader *reader;
659 struct casefile *output = NULL;
662 /* Open input files. */
663 runs = xnmalloc (run_cnt, sizeof *runs);
664 for (i = 0; i < run_cnt; i++)
666 struct run *r = &runs[i];
667 r->file = input_files[i];
/* Read each input destructively, priming it with its first case. */
668 r->reader = casefile_get_destructive_reader (r->file);
669 if (!casereader_read_xfer (r->reader, &r->ccase))
676 /* Create output file. */
677 output = fastfile_create (xsrt->value_cnt);
/* NOTE(review): casefile_to_disk()'s result is ignored; presumably
   a failure surfaces through casefile_error (output) below -- confirm. */
678 casefile_to_disk (output);
683 struct run *min_run, *run;
/* Find the run whose current case sorts first. The comparison is
   strict, so among equal cases the earliest run wins; because the
   inputs are consecutive runs in order, this keeps the merge stable.
   With at most MAX_MERGE_ORDER inputs a linear scan suffices. */
687 for (run = runs + 1; run < runs + run_cnt; run++)
688 if (compare_record (&run->ccase, &min_run->ccase, xsrt->criteria) < 0)
691 /* Write minimum to output file. */
692 casefile_append_xfer (output, &min_run->ccase);
694 /* Read another case from minimum run. */
695 if (!casereader_read_xfer (min_run->reader, &min_run->ccase))
/* The run is exhausted: check for I/O errors, then retire it. */
697 if (casefile_error (min_run->file) || casefile_error (output))
699 casereader_destroy (min_run->reader);
700 casefile_destroy (min_run->file);
702 remove_element (runs, run_cnt, sizeof *runs, min_run - runs);
707 if (!casefile_sleep (output))
/* Error cleanup: destroy the inputs still held and the partial
   output. */
714 for (i = 0; i < run_cnt; i++)
715 casefile_destroy (runs[i].file);
716 casefile_destroy (output);