1 /* PSPP - computes sample statistics.
2 Copyright (C) 1997-9, 2000 Free Software Foundation, Inc.
3 Written by Ben Pfaff <blp@gnu.org>.
5 This program is free software; you can redistribute it and/or
6 modify it under the terms of the GNU General Public License as
7 published by the Free Software Foundation; either version 2 of the
8 License, or (at your option) any later version.
10 This program is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with this program; if not, write to the Free Software
17 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
30 #include <data/case-source.h>
31 #include <data/case.h>
32 #include <data/casefile.h>
33 #include <data/settings.h>
34 #include <data/variable.h>
35 #include <data/storage-stream.h>
36 #include <language/expressions/public.h>
37 #include <libpspp/alloc.h>
38 #include <libpspp/array.h>
39 #include <libpspp/message.h>
40 #include <libpspp/message.h>
41 #include <libpspp/misc.h>
42 #include <libpspp/str.h>
43 #include <procedure.h>
46 #define _(msgid) gettext (msgid)
48 /* These should only be changed for testing purposes. */
/* max_buffers caps the number of reservoir cases allocate_cases()
   reserves for the external sort.  Setting allow_internal_sort to
   false presumably makes do_internal_sort() decline (return a null
   pointer), so that sort_execute() uses the external sort even for
   data that is in memory. */
50 int max_buffers = INT_MAX;
51 bool allow_internal_sort = true;
/* Forward declarations of sort helpers defined later in this file. */
53 static int compare_record (const struct ccase *, const struct ccase *,
54 const struct sort_criteria *);
55 static struct casefile *do_internal_sort (struct casereader *,
56 const struct sort_criteria *);
57 static struct casefile *do_external_sort (struct casereader *,
58 const struct sort_criteria *);
60 /* Gets ready to sort the active file, either in-place or to a
63 prepare_to_sort_active_file (void)
67 /* Cancel temporary transformations and PROCESS IF. */
70 expr_free (process_if_expr);
71 process_if_expr = NULL;
73 /* Make sure source cases are in a storage source. */
74 ok = procedure (NULL, NULL);
75 assert (case_source_is_class (vfm_source, &storage_source_class));
/* After procedure() runs, vfm_source must be a storage source.  Both
   callers, sort_active_file_in_place() and
   sort_active_file_to_casefile(), pass it straight to
   storage_source_get_casefile() once this function reports success.
   NOTE(review): OK is presumably the return value; the return
   statement is not shown here -- confirm. */
80 /* Sorts the active file in-place according to CRITERIA.
81 Returns nonzero if successful. */
/* The active file's casefile is read through a destructive reader.
   vfm_source is then replaced by a storage source that wraps the
   sorted casefile returned by sort_execute(). */
83 sort_active_file_in_place (const struct sort_criteria *criteria)
85 struct casefile *src, *dst;
87 if (!prepare_to_sort_active_file ())
90 src = storage_source_get_casefile (vfm_source);
91 dst = sort_execute (casefile_get_destructive_reader (src), criteria);
/* NOTE(review): sort_execute() can presumably yield a null pointer
   (merge() returns one on I/O error).  The handling of dst == NULL is
   not shown between here and storage_source_create() -- confirm that
   it exists. */
92 free_case_source (vfm_source);
98 vfm_source = storage_source_create (dst);
102 /* Sorts the active file to a separate casefile. If successful,
103 returns the sorted casefile. Returns a null pointer on
failure (NOTE(review): the end of this sentence is missing from this
copy -- confirm the original wording). The active file is read with
casefile_get_reader() rather than a destructive reader, so, unlike
with sort_active_file_in_place(), it presumably remains usable afterwards.
106 sort_active_file_to_casefile (const struct sort_criteria *criteria)
108 struct casefile *src;
110 if (!prepare_to_sort_active_file ())
113 src = storage_source_get_casefile (vfm_source);
114 return sort_execute (casefile_get_reader (src), criteria);
118 /* Reads all the cases from READER, which is destroyed. Sorts
119 the cases according to CRITERIA. Returns the sorted cases in
120 a newly created casefile. */
/* The in-memory sort is tried first.  do_internal_sort() returns a
   null pointer when it declines (data not in memory, or
   allow_internal_sort is false).  do_external_sort() is presumably
   called only in that case; the guard is not shown here.  READER is
   then destroyed by this function. */
122 sort_execute (struct casereader *reader, const struct sort_criteria *criteria)
124 struct casefile *output = do_internal_sort (reader, criteria);
126 output = do_external_sort (reader, criteria);
127 casereader_destroy (reader);
131 /* A case and its index. */
/* Element type of the array sorted by do_internal_sort().
   compare_indexed_cases() compares IDX (presumably the case's position
   in the input) to break ties between equal cases, which makes the
   in-memory sort stable. */
134 struct ccase c; /* Case. */
135 unsigned long idx; /* Index to allow for stable sorting. */
138 static int compare_indexed_cases (const void *, const void *, void *);
140 /* If the data is in memory, do an internal sort and return a new
141 casefile for the data. Otherwise, return a null pointer. */
/* The internal sort is presumably also skipped when allow_internal_sort
   is false.  A source with at most one case is accepted even if it is
   not in core.  Cases are read into an array of struct indexed_case
   (the _xfer calls presumably transfer rather than copy), sorted with
   compare_indexed_cases(), and appended to a new casefile DST.  The
   error path (casefile_error) presumably destroys DST. */
142 static struct casefile *
143 do_internal_sort (struct casereader *reader,
144 const struct sort_criteria *criteria)
146 const struct casefile *src;
147 struct casefile *dst;
148 unsigned long case_cnt;
150 if (!allow_internal_sort)
153 src = casereader_get_casefile (reader);
154 if (casefile_get_case_cnt (src) > 1 && !casefile_in_core (src))
157 case_cnt = casefile_get_case_cnt (src);
158 dst = casefile_create (casefile_get_value_cnt (src));
/* NOTE(review): nmalloc() is used here rather than xnmalloc(), and
   allocate_cases() tests nmalloc()'s result against NULL, so CASES can
   presumably be a null pointer for large inputs.  The check for that
   is not shown here -- confirm. */
161 struct indexed_case *cases = nmalloc (sizeof *cases, case_cnt);
166 for (i = 0; i < case_cnt; i++)
168 bool ok = casereader_read_xfer (reader, &cases[i].c);
174 sort (cases, case_cnt, sizeof *cases, compare_indexed_cases,
177 for (i = 0; i < case_cnt; i++)
178 casefile_append_xfer (dst, &cases[i].c);
179 if (casefile_error (dst))
187 casefile_destroy (dst);
195 /* Compares the variables specified by CRITERIA between the cases
196 at A and B, with a "last resort" comparison for stability, and
197 returns a strcmp()-type result. */
/* Comparison callback for sort() in do_internal_sort().  A_ and B_
   point to struct indexed_case elements.  CRITERIA_ is the
   struct sort_criteria the caller presumably passes through sort(). */
199 compare_indexed_cases (const void *a_, const void *b_, void *criteria_)
201 struct sort_criteria *criteria = criteria_;
202 const struct indexed_case *a = a_;
203 const struct indexed_case *b = b_;
204 int result = compare_record (&a->c, &b->c, criteria);
/* Last resort: order by original index.  This is presumably done only
   when compare_record() returned 0; the guard is not shown. */
206 result = a->idx < b->idx ? -1 : a->idx > b->idx;
212 /* Maximum order of merge (external sort only). The maximum
213 reasonable value is about 7. Above that, it would be a good
214 idea to use a heap in merge_once() to select the minimum. */
/* merge_once() currently finds the minimum with a linear scan over the
   open runs, so each output case costs O(order) comparisons. */
215 #define MAX_MERGE_ORDER 7
217 /* State of an external sort: the criteria, the case width, and the runs produced so far. */
220 const struct sort_criteria *criteria; /* Sort criteria. */
221 size_t value_cnt; /* Size of data in `union value's. */
222 struct casefile **runs; /* Runs: initial runs, then merge results. */
223 size_t run_cnt, run_cap; /* Number of runs, allocated capacity. */
226 /* Prototypes for helper functions. */
227 static int write_runs (struct external_sort *, struct casereader *);
228 static struct casefile *merge (struct external_sort *);
229 static void destroy_external_sort (struct external_sort *);
231 /* Performs a stable external sort of the cases read from READER
232 according to CRITERIA. Forms initial runs using a heap
233 as a reservoir. Merges the initial runs according to a
234 pattern that assures stability. */
/* Returns the merged casefile from merge(), which is a null pointer on
   I/O error.  When write_runs() fails, a null pointer is presumably
   returned as well (that return is not shown). */
235 static struct casefile *
236 do_external_sort (struct casereader *reader,
237 const struct sort_criteria *criteria)
239 struct external_sort *xsrt;
/* NOTE(review): the branch taken when casefile_to_disk() fails is not
   shown here -- confirm what it returns. */
241 if (!casefile_to_disk (casereader_get_casefile (reader)))
244 xsrt = xmalloc (sizeof *xsrt);
245 xsrt->criteria = criteria;
246 xsrt->value_cnt = casefile_get_value_cnt (casereader_get_casefile (reader));
249 xsrt->runs = xnmalloc (xsrt->run_cap, sizeof *xsrt->runs);
250 if (write_runs (xsrt, reader))
252 struct casefile *output = merge (xsrt);
253 destroy_external_sort (xsrt);
258 destroy_external_sort (xsrt);
/* Releases XSRT.  Destroys each of the xsrt->run_cnt casefiles still
   listed in xsrt->runs; the rest of the cleanup is not shown here. */
265 destroy_external_sort (struct external_sort *xsrt)
271 for (i = 0; i < xsrt->run_cnt; i++)
272 casefile_destroy (xsrt->runs[i]);
278 /* Replacement selection. */
/* Initial runs are formed by replacement selection.  Cases wait in a
   heap ordered by (run, record, idx).  A newly read case that sorts
   before the case most recently written cannot join the current run,
   so it is tagged for the next run; this keeps each run sorted. */
280 /* Pairs a record with a run number. */
283 int run; /* Run number of case. */
284 struct ccase record; /* Case data. */
285 size_t idx; /* Case number (for stability). */
288 /* Represents a set of initial runs during an external sort. */
289 struct initial_run_state
291 struct external_sort *xsrt;
294 struct record_run *records; /* Records arranged as a heap. */
295 size_t record_cnt; /* Current number of records. */
296 size_t record_cap; /* Capacity for records. */
/* The heap never holds more than record_cap - 1 records.  The last
   slot, records[record_cap - 1], holds the case most recently written;
   last_output is a struct copy of that slot's ccase. */
298 /* Run currently being output. */
299 int run; /* Run number. */
300 size_t case_cnt; /* Number of cases so far. */
301 struct casefile *casefile; /* Output file. */
302 struct ccase last_output; /* Record last output. */
304 int okay; /* Zero if an error has been encountered. */
/* NOTE(review): sort_sink_class is declared here but is not referenced
   by any code shown in this section -- confirm it is still needed. */
307 static const struct case_sink_class sort_sink_class;
309 static bool destroy_initial_run_state (struct initial_run_state *);
310 static void process_case (struct initial_run_state *, const struct ccase *,
312 static int allocate_cases (struct initial_run_state *);
313 static void output_record (struct initial_run_state *);
314 static void start_run (struct initial_run_state *);
315 static void end_run (struct initial_run_state *);
316 static int compare_record_run (const struct record_run *,
317 const struct record_run *,
318 struct initial_run_state *);
319 static int compare_record_run_minheap (const void *, const void *, void *);
321 /* Reads cases from READER and composes initial runs in XSRT. */
/* Callers treat a nonzero return as success; do_external_sort() merges
   only in that case.  Each input case is copied into the reservoir by
   process_case() together with its input position (idx++).  Once
   READER is exhausted, or irs->okay is cleared, the final loop
   presumably drains the remaining heap records into runs. */
323 write_runs (struct external_sort *xsrt, struct casereader *reader)
325 struct initial_run_state *irs;
330 /* Allocate memory for cases. */
331 irs = xmalloc (sizeof *irs);
334 irs->record_cnt = irs->record_cap = 0;
337 irs->casefile = NULL;
338 case_nullify (&irs->last_output);
340 if (!allocate_cases (irs))
343 /* Create initial runs. */
345 for (; irs->okay && casereader_read (reader, &c); case_destroy (&c))
346 process_case (irs, &c, idx++);
347 while (irs->okay && irs->record_cnt > 0)
354 if (!destroy_initial_run_state (irs))
360 /* Add a single case to an initial run. */
/* C is copied into the next free heap slot.  IDX, the case's input
   position, is presumably stored in rr->idx for tie-breaking; that
   assignment is not shown.  If C sorts before the case most recently
   written, it cannot join the current run and is tagged with the next
   run number.  Once the heap holds record_cap - 1 records, one record
   is written out, presumably via output_record(). */
362 process_case (struct initial_run_state *irs, const struct ccase *c, size_t idx)
364 struct record_run *rr;
366 /* Compose record_run for this run and add to heap. */
/* records[record_cap - 1] is reserved for the last output case, so the
   heap may use at most record_cap - 1 slots. */
367 assert (irs->record_cnt < irs->record_cap - 1);
368 rr = irs->records + irs->record_cnt++;
369 case_copy (&rr->record, 0, c, 0, irs->xsrt->value_cnt);
372 if (!case_is_null (&irs->last_output)
373 && compare_record (c, &irs->last_output, irs->xsrt->criteria) < 0)
374 rr->run = irs->run + 1;
375 push_heap (irs->records, irs->record_cnt, sizeof *irs->records,
376 compare_record_run_minheap, irs);
378 /* Output a record if the reservoir is full. */
379 if (irs->record_cnt == irs->record_cap - 1 && irs->okay)
383 /* Destroys the initial run state represented by IRS.
384 Returns true if successful, false if an I/O error occurred. */
/* All record_cap reservoir cases are destroyed, including the spare
   slot whose ccase last_output copies.  If a run file is still open,
   the result of casefile_sleep() on it is stored in OK, which is
   presumably the return value. */
386 destroy_initial_run_state (struct initial_run_state *irs)
394 for (i = 0; i < irs->record_cap; i++)
395 case_destroy (&irs->records[i].record);
398 if (irs->casefile != NULL)
399 ok = casefile_sleep (irs->casefile);
405 /* Allocates room for lots of cases as a buffer. */
/* Sizes the replacement-selection reservoir: as many cases as fit in
   get_workspace() bytes at approx_case_cost bytes each, capped at
   max_buffers.  The 4 * sizeof (void *) term is presumably an allowance
   for per-case allocation overhead -- confirm.  If the array cannot be
   allocated, or fewer than min_buffers cases fit, an error is reported
   with msg().  Callers treat a zero return as failure. */
407 allocate_cases (struct initial_run_state *irs)
409 int approx_case_cost; /* Approximate memory cost of one case in bytes. */
410 int max_cases; /* Maximum number of cases to allocate. */
413 /* Allocate as many cases as we can within the workspace
415 approx_case_cost = (sizeof *irs->records
416 + irs->xsrt->value_cnt * sizeof (union value)
417 + 4 * sizeof (void *));
418 max_cases = get_workspace() / approx_case_cost;
419 if (max_cases > max_buffers)
420 max_cases = max_buffers;
421 irs->records = nmalloc (sizeof *irs->records, max_cases);
422 if (irs->records != NULL)
423 for (i = 0; i < max_cases; i++)
424 if (!case_try_create (&irs->records[i].record, irs->xsrt->value_cnt))
429 irs->record_cap = max_cases;
431 /* Fail if we didn't allocate an acceptable number of cases. */
432 if (irs->records == NULL || max_cases < min_buffers)
434 msg (SE, _("Out of memory. Could not allocate room for minimum of %d "
435 "cases of %d bytes each. (PSPP workspace is currently "
436 "restricted to a maximum of %d KB.)"),
437 min_buffers, approx_case_cost, get_workspace() / 1024);
443 /* Compares cases A and B on the sort criteria in CRITERIA, in
444 order, and returns a strcmp()-type result. */
/* Numeric values (case_num()) are compared with < and >.  String values
   (case_str()) are compared byte by byte with memcmp() over c->width
   bytes.  The result is negated unless c->dir is SRT_ASCEND.
   NOTE(review): the tests that choose the numeric or string path, and
   that move on to the next criterion when values are equal, are not
   shown here. */
446 compare_record (const struct ccase *a, const struct ccase *b,
447 const struct sort_criteria *criteria)
454 for (i = 0; i < criteria->crit_cnt; i++)
456 const struct sort_criterion *c = &criteria->crits[i];
461 double af = case_num (a, c->fv);
462 double bf = case_num (b, c->fv);
464 result = af < bf ? -1 : af > bf;
467 result = memcmp (case_str (a, c->fv), case_str (b, c->fv), c->width);
470 return c->dir == SRT_ASCEND ? result : -result;
476 /* Compares record-run tuples A and B on run number first, then
477 on record, then on case index. */
/* IRS supplies the sort criteria (irs->xsrt->criteria) used for the
   record comparison. */
479 compare_record_run (const struct record_run *a,
480 const struct record_run *b,
481 struct initial_run_state *irs)
483 int result = a->run < b->run ? -1 : a->run > b->run;
485 result = compare_record (&a->record, &b->record, irs->xsrt->criteria);
487 result = a->idx < b->idx ? -1 : a->idx > b->idx;
491 /* Compares record-run tuples A and B on run number first, then
492 on the current record according to the sort criteria, then on case index, but in descending
495 compare_record_run_minheap (const void *a, const void *b, void *irs)
497 return -compare_record_run (a, b, irs);
500 /* Begins a new initial run, specifically its output file. */
/* The new run's casefile is sent to disk with casefile_to_disk().
   last_output is nulled so that process_case() does not compare
   incoming cases against a case from the previous run.
   NOTE(review): casefile_to_disk()'s result is ignored here, while
   do_external_sort() tests it -- confirm that is intended. */
502 start_run (struct initial_run_state *irs)
506 irs->casefile = casefile_create (irs->xsrt->value_cnt);
507 casefile_to_disk (irs->casefile);
508 case_nullify (&irs->last_output);
511 /* Ends the current initial run. */
/* If a run file is open, it is:
   - put to sleep;
   - appended to xsrt->runs, which grows when full;
   - checked with casefile_error();
   - released by IRS (irs->casefile is reset to NULL).
   NOTE(review): casefile_sleep()'s return value is discarded here,
   whereas destroy_initial_run_state() and merge_once() check it. */
513 end_run (struct initial_run_state *irs)
515 struct external_sort *xsrt = irs->xsrt;
517 /* Record initial run. */
518 if (irs->casefile != NULL)
520 casefile_sleep (irs->casefile);
521 if (xsrt->run_cnt >= xsrt->run_cap)
524 xsrt->runs = xnrealloc (xsrt->runs,
525 xsrt->run_cap, sizeof *xsrt->runs);
527 xsrt->runs[xsrt->run_cnt++] = irs->casefile;
528 if (casefile_error (irs->casefile))
530 irs->casefile = NULL;
534 /* Writes a record to the current initial run. */
/* Pops the smallest (run, record, idx) tuple off the heap.  If that
   tuple belongs to the next run, the next run is started first
   (presumably via end_run() and start_run()).  The case is then
   appended to the current run's casefile. */
536 output_record (struct initial_run_state *irs)
538 struct record_run *record_run;
539 struct ccase case_tmp;
541 /* Extract minimum case from heap. */
542 assert (irs->record_cnt > 0);
543 pop_heap (irs->records, irs->record_cnt--, sizeof *irs->records,
544 compare_record_run_minheap, irs);
545 record_run = irs->records + irs->record_cnt;
547 /* Bail if an error has occurred. */
551 /* Start new run if necessary. */
552 assert (record_run->run == irs->run
553 || record_run->run == irs->run + 1);
554 if (record_run->run != irs->run)
559 assert (record_run->run == irs->run);
563 if (irs->casefile != NULL)
564 casefile_append (irs->casefile, &record_run->record);
566 /* This record becomes last_output. */
/* The written case is swapped into the spare slot
   records[record_cap - 1], which lies outside the heap, and last_output
   becomes a struct copy of it.  The spare slot's previous ccase moves
   into the vacated heap position for reuse.  Only struct ccase values
   are exchanged here. */
567 irs->last_output = case_tmp = record_run->record;
568 record_run->record = irs->records[irs->record_cap - 1].record;
569 irs->records[irs->record_cap - 1].record = case_tmp;
/* Helpers for merge(). */
574 static int choose_merge (struct casefile *runs[], int run_cnt, int order);
575 static struct casefile *merge_once (struct external_sort *,
576 struct casefile *[], size_t);
578 /* Repeatedly merges runs until only one is left,
579 and returns the final casefile.
580 Returns a null pointer if an I/O error occurs. */
/* Each pass merges min (MAX_MERGE_ORDER, run_cnt) consecutive runs
   chosen by choose_merge().  The result replaces the first of them in
   xsrt->runs, and the other ORDER - 1 entries are removed.
   NOTE(review): do_external_sort() calls destroy_external_sort() on
   XSRT after this returns, and that function destroys the first
   run_cnt entries of xsrt->runs.  The returned runs[0] must therefore
   be detached first (e.g. by resetting run_cnt); that step is not shown
   here -- confirm. */
581 static struct casefile *
582 merge (struct external_sort *xsrt)
584 while (xsrt->run_cnt > 1)
586 int order = min (MAX_MERGE_ORDER, xsrt->run_cnt);
587 int idx = choose_merge (xsrt->runs, xsrt->run_cnt, order);
588 xsrt->runs[idx] = merge_once (xsrt, xsrt->runs + idx, order);
589 remove_range (xsrt->runs, xsrt->run_cnt, sizeof *xsrt->runs,
591 xsrt->run_cnt -= order - 1;
593 if (xsrt->runs[idx] == NULL)
596 assert (xsrt->run_cnt == 1);
598 return xsrt->runs[0];
601 /* Chooses ORDER runs out of the RUN_CNT runs in RUNS to merge,
602 and returns the index of the first one.
604 For stability, we must merge only consecutive runs. For
605 efficiency, we choose the shortest consecutive sequence of
ORDER runs, measured by their total number of cases.
608 choose_merge (struct casefile *runs[], int run_cnt, int order)
610 int min_idx, min_sum;
611 int cur_idx, cur_sum;
614 /* Sum up the length of the first ORDER runs. */
/* NOTE(review): run lengths are summed in int, while do_internal_sort()
   stores casefile_get_case_cnt() in an unsigned long.  Very large runs
   could overflow these sums -- confirm the return type.  Because the
   comparison below is strict, the earliest group wins ties. */
616 for (i = 0; i < order; i++)
617 cur_sum += casefile_get_case_cnt (runs[i]);
619 /* Find the shortest group of ORDER runs,
620 using a running total for efficiency. */
623 for (cur_idx = 1; cur_idx + order <= run_cnt; cur_idx++)
625 cur_sum -= casefile_get_case_cnt (runs[cur_idx - 1]);
626 cur_sum += casefile_get_case_cnt (runs[cur_idx + order - 1]);
627 if (cur_sum < min_sum)
637 /* Merges the RUN_CNT initial runs specified in INPUT_FILES into a
638 new run, and returns the new run.
639 Returns a null pointer if an I/O error occurs. */
/* The input runs are read through destructive readers, and each one is
   destroyed as soon as it is exhausted.  The output is a new casefile,
   sent to disk with casefile_to_disk() and put to sleep at the end.  On
   the error path, the remaining input files and the output are
   destroyed. */
640 static struct casefile *
641 merge_once (struct external_sort *xsrt,
642 struct casefile **const input_files,
647 struct casefile *file;
648 struct casereader *reader;
653 struct casefile *output = NULL;
656 /* Open input files. */
657 runs = xnmalloc (run_cnt, sizeof *runs);
658 for (i = 0; i < run_cnt; i++)
660 struct run *r = &runs[i];
661 r->file = input_files[i];
662 r->reader = casefile_get_destructive_reader (r->file);
663 if (!casereader_read_xfer (r->reader, &r->ccase))
670 /* Create output file. */
671 output = casefile_create (xsrt->value_cnt);
672 casefile_to_disk (output);
677 struct run *min_run, *run;
/* Linear scan for the smallest current case.  The comparison is strict,
   so on ties the earlier run keeps priority (assuming min_run starts at
   runs[0] and remove_element() preserves order).  Together with
   choose_merge() picking consecutive runs, this keeps the merge stable. */
681 for (run = runs + 1; run < runs + run_cnt; run++)
682 if (compare_record (&run->ccase, &min_run->ccase, xsrt->criteria) < 0)
685 /* Write minimum to output file. */
686 casefile_append_xfer (output, &min_run->ccase);
688 /* Read another case from minimum run. */
689 if (!casereader_read_xfer (min_run->reader, &min_run->ccase))
691 if (casefile_error (min_run->file) || casefile_error (output))
693 casereader_destroy (min_run->reader);
694 casefile_destroy (min_run->file);
696 remove_element (runs, run_cnt, sizeof *runs, min_run - runs);
701 if (!casefile_sleep (output))
708 for (i = 0; i < run_cnt; i++)
709 casefile_destroy (runs[i].file);
710 casefile_destroy (output);