1 /* PSPP - computes sample statistics.
2 Copyright (C) 1997-9, 2000, 2006 Free Software Foundation, Inc.
4 This program is free software; you can redistribute it and/or
5 modify it under the terms of the GNU General Public License as
6 published by the Free Software Foundation; either version 2 of the
7 License, or (at your option) any later version.
9 This program is distributed in the hope that it will be useful, but
10 WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12 General Public License for more details.
14 You should have received a copy of the GNU General Public License
15 along with this program; if not, write to the Free Software
16 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. */
29 #include <data/case-source.h>
30 #include <data/case.h>
31 #include <data/casefile.h>
32 #include <data/fastfile.h>
33 #include <data/casefile-factory.h>
34 #include <data/fastfile-factory.h>
35 #include <data/procedure.h>
36 #include <data/settings.h>
37 #include <data/variable.h>
38 #include <data/storage-stream.h>
39 #include <libpspp/alloc.h>
40 #include <libpspp/array.h>
41 #include <libpspp/assertion.h>
42 #include <libpspp/message.h>
43 #include <libpspp/message.h>
44 #include <libpspp/misc.h>
45 #include <libpspp/str.h>
/* Translation shorthand: _(msgid) expands to gettext (msgid). */
50 #define _(msgid) gettext (msgid)
52 /* These should only be changed for testing purposes. */
/* max_buffers is the upper bound that allocate_cases() applies to the
   number of case buffers it requests. */
54 int max_buffers = INT_MAX;
/* do_internal_sort() tests this flag before attempting an in-memory sort. */
55 bool allow_internal_sort = true;
/* Forward declarations of helpers defined later in this file.
   NOTE(review): the closing lines of the do_internal_sort and
   do_external_sort prototypes are not visible in this extract. */
57 static int compare_record (const struct ccase *, const struct ccase *,
58 const struct sort_criteria *);
59 static struct casefile *do_internal_sort (struct casereader *,
60 const struct sort_criteria *,
61 struct casefile_factory *
63 static struct casefile *do_external_sort (struct casereader *,
64 const struct sort_criteria *,
65 struct casefile_factory *
69 /* Sorts the active file in-place according to CRITERIA.
70 Returns true if successful. */
/* NOTE(review): the return-type line, the braces and the statements taken
   on the failure paths are missing from this extract; the comments below
   describe only the visible statements. */
72 sort_active_file_in_place (struct dataset *ds,
73 const struct sort_criteria *criteria)
75 struct casefile *in, *out;
/* Drop temporary transformations, then run the procedure with no
   callbacks; what happens when procedure() fails is not visible here. */
77 proc_cancel_temporary_transformations (ds);
78 if (!procedure (ds, NULL, NULL))
/* Capture the procedure's output and sort it through a destructive reader,
   creating the result with the dataset's own casefile factory. */
81 in = proc_capture_output (ds);
82 out = sort_execute (casefile_get_destructive_reader (in), criteria,
83 dataset_get_casefile_factory (ds));
/* Install the sorted casefile as the dataset's new source. */
87 proc_set_source (ds, storage_source_create (out));
91 /* Data passed to sort_to_casefile_callback(). */
92 struct sort_to_casefile_cb_data
/* Sort criteria supplied by the caller of sort_active_file_to_casefile(). */
94 const struct sort_criteria *criteria;
/* Result of the sort: initialised to NULL by sort_active_file_to_casefile()
   and assigned by the callback from sort_execute(). */
95 struct casefile *output;
/* Factory taken from the dataset with dataset_get_casefile_factory(). */
96 struct casefile_factory *factory ;
99 /* Sorts casefile CF according to the criteria in CB_DATA. */
/* NOTE(review): the return-type line, the braces and the remaining
   arguments of the sort_execute() call are not visible in this extract. */
101 sort_to_casefile_callback (const struct casefile *cf, void *cb_data_)
/* Recover the typed callback data from the untyped pointer. */
103 struct sort_to_casefile_cb_data *cb_data = cb_data_;
/* Sort through a reader obtained with casefile_get_reader() and keep the
   result in the callback data for the caller to pick up. */
104 cb_data->output = sort_execute (casefile_get_reader (cf, NULL),
/* Success is reported exactly when sort_execute() returned a casefile. */
108 return cb_data->output != NULL;
111 /* Sorts the active file to a separate casefile. If successful,
112 returns the sorted casefile. Returns a null pointer on failure. */
/* NOTE(review): the return-type line and the braces of this function are
   missing from this extract. */
115 sort_active_file_to_casefile (struct dataset *ds,
116 const struct sort_criteria *criteria)
118 struct sort_to_casefile_cb_data cb_data;
120 proc_cancel_temporary_transformations (ds);
/* Package what sort_to_casefile_callback() needs.  The output starts out
   as NULL, so it has a defined value even if the callback never stores
   a result. */
122 cb_data.criteria = criteria;
123 cb_data.output = NULL;
124 cb_data.factory = dataset_get_casefile_factory (ds);
/* If the multipass procedure fails, discard whatever the callback produced.
   NOTE(review): any statement following the destroy on this path (for
   example an early return) is not visible here - confirm that the
   destroyed casefile is not returned. */
125 if (!multipass_procedure (ds, sort_to_casefile_callback, &cb_data))
127 casefile_destroy (cb_data.output);
130 return cb_data.output;
134 /* Reads all the cases from READER, which is destroyed. Sorts
135 the cases according to CRITERIA. Returns the sorted cases in
136 a newly created casefile, which will be created by FACTORY.
137 If FACTORY is NULL, then a local fastfile_factory will be used. */
/* NOTE(review): the return-type line, the end of the parameter list, the
   braces, the final return and any conditions guarding the statements
   below are missing from this extract. */
140 sort_execute (struct casereader *reader,
141 const struct sort_criteria *criteria,
142 struct casefile_factory *factory
/* local_factory stays NULL unless this function creates its own factory;
   it is the pointer handed to fastfile_factory_destroy() at the end. */
145 struct casefile_factory *local_factory = NULL;
146 struct casefile *output ;
/* No factory supplied: create a fastfile factory and use it for this call. */
147 if ( factory == NULL )
148 factory = local_factory = fastfile_factory_create ();
/* Try the internal sort, then the external sort.
   NOTE(review): the line between these two assignments is not visible;
   presumably the external sort runs only when the internal one returned
   a null pointer - confirm. */
150 output = do_internal_sort (reader, criteria, factory);
152 output = do_external_sort (reader, criteria, factory);
/* The reader is destroyed here, as the contract of this function states. */
153 casereader_destroy (reader);
155 fastfile_factory_destroy (local_factory);
160 /* A case and its index. */
/* NOTE(review): the line naming this struct and its braces are not visible;
   do_internal_sort() and compare_indexed_cases() refer to the type as
   struct indexed_case. */
163 struct ccase c; /* Case. */
164 unsigned long idx; /* Index to allow for stable sorting. */
/* Comparison callback handed to sort() by do_internal_sort(); the third
   argument carries the sort criteria. */
167 static int compare_indexed_cases (const void *, const void *, const void *);
169 /* If the data is in memory, do an internal sort and return a new
170 casefile for the data. Otherwise, return a null pointer. */
171 static struct casefile *
172 do_internal_sort (struct casereader *reader,
173 const struct sort_criteria *criteria,
174 struct casefile_factory *factory)
/* NOTE(review): braces, several guards and the return statements are
   missing from this extract; comments describe the visible statements. */
176 const struct casefile *src;
177 struct casefile *dst;
178 unsigned long case_cnt;
/* Internal sorting can be switched off through allow_internal_sort. */
180 if (!allow_internal_sort)
/* A source holding more than one case that is not in core is not handled
   here; a source with at most one case passes this test regardless. */
183 src = casereader_get_casefile (reader);
184 if (casefile_get_case_cnt (src) > 1 && !casefile_in_core (src))
/* Create the destination through FACTORY with the source's value count. */
187 case_cnt = casefile_get_case_cnt (src);
188 dst = factory->create_casefile (factory, casefile_get_value_cnt (src));
/* One indexed_case per input case; nmalloc() is given the element size
   and the element count. */
191 struct indexed_case *cases = nmalloc (sizeof *cases, case_cnt);
/* Read every case from READER into the array. */
196 for (i = 0; i < case_cnt; i++)
198 bool ok = casereader_read_xfer (reader, &cases[i].c);
/* Sort the array with compare_indexed_cases(); the trailing argument of
   this call is not visible in this extract. */
204 sort (cases, case_cnt, sizeof *cases, compare_indexed_cases,
/* Append the sorted cases to the destination in order. */
207 for (i = 0; i < case_cnt; i++)
208 casefile_append_xfer (dst, &cases[i].c);
209 if (casefile_error (dst))
/* NOTE(review): the condition under which DST is destroyed is not visible
   in this extract - confirm which path this belongs to. */
217 casefile_destroy (dst);
225 /* Compares the variables specified by CRITERIA between the cases
226 at A and B, with a "last resort" comparison for stability, and
227 returns a strcmp()-type result. */
/* NOTE(review): the return-type line, the braces, the guard before the
   index comparison and the return statement are not visible here. */
229 compare_indexed_cases (const void *a_, const void *b_, const void *criteria_)
/* Recover the typed arguments from the generic callback signature. */
231 const struct sort_criteria *criteria = criteria_;
232 const struct indexed_case *a = a_;
233 const struct indexed_case *b = b_;
/* Primary ordering: the sort criteria. */
234 int result = compare_record (&a->c, &b->c, criteria);
/* Last resort: order by original index; yields -1, 0 or 1. */
236 result = a->idx < b->idx ? -1 : a->idx > b->idx;
242 /* Maximum order of merge (external sort only). The maximum
243 reasonable value is about 7. Above that, it would be a good
244 idea to use a heap in merge_once() to select the minimum. */
/* merge() uses the smaller of this value and the current run count as the
   number of runs to combine in one pass. */
245 #define MAX_MERGE_ORDER 7
247 /* Results of an external sort. */
/* NOTE(review): the line naming this struct and its braces are not visible;
   the rest of the file refers to the type as struct external_sort.
   destroy_external_sort() destroys the casefiles in runs[0..run_cnt). */
250 const struct sort_criteria *criteria; /* Sort criteria. */
251 size_t value_cnt; /* Size of data in `union value's. */
252 struct casefile **runs; /* Array of initial runs. */
253 size_t run_cnt, run_cap; /* Number of runs, allocated capacity. */
254 struct casefile_factory *factory; /* Factory used to create the result */
257 /* Prototypes for helper functions. */
/* write_runs() builds the initial runs, merge() combines them into one
   casefile, and destroy_external_sort() releases the sort state;
   do_external_sort() calls them in that order. */
258 static int write_runs (struct external_sort *, struct casereader *);
259 static struct casefile *merge (struct external_sort *);
260 static void destroy_external_sort (struct external_sort *);
262 /* Performs a stable external sort of the active file according
263 to the specification in CRITERIA. Forms initial runs using a heap
264 as a reservoir. Merges the initial runs according to a
265 pattern that assures stability. */
266 static struct casefile *
267 do_external_sort (struct casereader *reader,
268 const struct sort_criteria *criteria,
269 struct casefile_factory *factory
/* NOTE(review): the end of the parameter list, the braces, the
   initialisation of run_cnt/run_cap and the return statements are
   missing from this extract. */
272 struct external_sort *xsrt;
/* The source casefile is moved to disk first; the statement executed when
   that fails is not visible here. */
274 if (!casefile_to_disk (casereader_get_casefile (reader)))
/* Set up the sort state from the arguments and the source casefile. */
277 xsrt = xmalloc (sizeof *xsrt);
278 xsrt->criteria = criteria;
279 xsrt->value_cnt = casefile_get_value_cnt (casereader_get_casefile (reader));
/* NOTE(review): run_cap is read here but its assignment is not visible in
   this extract - confirm it is set before this allocation. */
282 xsrt->runs = xnmalloc (xsrt->run_cap, sizeof *xsrt->runs);
283 xsrt->factory = factory;
/* On success the runs are merged into the output; the sort state is
   destroyed on both the visible paths. */
284 if (write_runs (xsrt, reader))
286 struct casefile *output = merge (xsrt);
287 destroy_external_sort (xsrt);
292 destroy_external_sort (xsrt);
/* Releases the external sort state XSRT.
   NOTE(review): the return-type line, the braces and any statements that
   free the runs array or XSRT itself are not visible in this extract. */
299 destroy_external_sort (struct external_sort *xsrt)
/* Destroy every run casefile still recorded in the array. */
305 for (i = 0; i < xsrt->run_cnt; i++)
306 casefile_destroy (xsrt->runs[i]);
312 /* Replacement selection. */
314 /* Pairs a record with a run number. */
/* NOTE(review): the line naming this struct and its braces are not visible;
   the rest of the file refers to the type as struct record_run.
   compare_record_run() orders these by run, then record, then idx. */
317 int run; /* Run number of case. */
318 struct ccase record; /* Case data. */
319 size_t idx; /* Case number (for stability). */
322 /* Represents a set of initial runs during an external sort. */
323 struct initial_run_state
/* Sort being served: supplies value_cnt, the criteria and the array that
   finished runs are recorded in. */
325 struct external_sort *xsrt;
/* Reservoir of buffered records.  process_case() and output_record() keep
   the slot at index record_cap - 1 out of the heap and use it as a spare. */
328 struct record_run *records; /* Records arranged as a heap. */
329 size_t record_cnt; /* Current number of records. */
330 size_t record_cap; /* Capacity for records. */
332 /* Run currently being output. */
333 int run; /* Run number. */
334 size_t case_cnt; /* Number of cases so far. */
335 struct casefile *casefile; /* Output file. */
336 struct ccase last_output; /* Record last output. */
338 int okay; /* Zero if an error has been encountered. */
/* Forward declarations for the replacement-selection helpers below.
   NOTE(review): the final line of the compare_record_run_minheap prototype
   is not visible in this extract. */
341 static bool destroy_initial_run_state (struct initial_run_state *);
342 static void process_case (struct initial_run_state *,
343 const struct ccase *, size_t);
344 static int allocate_cases (struct initial_run_state *);
345 static void output_record (struct initial_run_state *);
346 static void start_run (struct initial_run_state *);
347 static void end_run (struct initial_run_state *);
348 static int compare_record_run (const struct record_run *,
349 const struct record_run *,
350 const struct initial_run_state *);
351 static int compare_record_run_minheap (const void *, const void *,
354 /* Reads cases from READER and composes initial runs in XSRT. */
/* NOTE(review): the return-type line, the braces, the declarations of the
   loop variables, part of the state initialisation, the loop body of the
   drain loop and the return statements are missing from this extract. */
356 write_runs (struct external_sort *xsrt, struct casereader *reader)
358 struct initial_run_state *irs;
363 /* Allocate memory for cases. */
364 irs = xmalloc (sizeof *irs);
/* Start with an empty reservoir, no output file and no last-output case. */
367 irs->record_cnt = irs->record_cap = 0;
370 irs->casefile = NULL;
371 case_nullify (&irs->last_output);
373 if (!allocate_cases (irs))
376 /* Create initial runs. */
/* Feed each case read to process_case() with an increasing index; the case
   is destroyed after each iteration.  The loop stops as soon as okay is
   cleared or the reader has no more cases. */
378 for (; irs->okay && casereader_read (reader, &c); case_destroy (&c))
379 process_case (irs, &c, idx++);
/* Keep going while records remain buffered and no error has occurred; the
   body of this loop is not visible in this extract. */
380 while (irs->okay && irs->record_cnt > 0)
387 if (!destroy_initial_run_state (irs))
393 /* Add a single case to an initial run. */
/* NOTE(review): the return-type line, the end of the parameter list, the
   braces, the assignments made before the run test and the statement
   executed when the reservoir is full are missing from this extract. */
395 process_case (struct initial_run_state *irs, const struct ccase *c,
398 struct record_run *rr;
400 /* Compose record_run for this run and add to heap. */
/* The slot at index record_cap - 1 is never filled here; output_record()
   swaps cases with that slot. */
401 assert (irs->record_cnt < irs->record_cap - 1);
402 rr = irs->records + irs->record_cnt++;
403 case_copy (&rr->record, 0, c, 0, irs->xsrt->value_cnt);
/* A case that compares below the last record written is tagged with the
   next run number rather than the current one. */
406 if (!case_is_null (&irs->last_output)
407 && compare_record (c, &irs->last_output, irs->xsrt->criteria) < 0)
408 rr->run = irs->run + 1;
409 push_heap (irs->records, irs->record_cnt, sizeof *irs->records,
410 compare_record_run_minheap, irs);
412 /* Output a record if the reservoir is full. */
413 if (irs->record_cnt == irs->record_cap - 1 && irs->okay)
417 /* Destroys the initial run state represented by IRS.
418 Returns true if successful, false if an I/O error occurred. */
/* NOTE(review): the return-type line, the braces, the local declarations,
   any frees and the return statement are missing from this extract. */
420 destroy_initial_run_state (struct initial_run_state *irs)
/* Destroy the case held in every allocated slot, spare slot included. */
428 for (i = 0; i < irs->record_cap; i++)
429 case_destroy (&irs->records[i].record);
/* If an output file is still attached, the result of casefile_sleep() on
   it is stored in ok. */
432 if (irs->casefile != NULL)
433 ok = casefile_sleep (irs->casefile);
439 /* Allocates room for lots of cases as a buffer. */
/* NOTE(review): the return-type line, the braces, part of the failure
   handling and the return statements are missing from this extract. */
441 allocate_cases (struct initial_run_state *irs)
443 int approx_case_cost; /* Approximate memory cost of one case in bytes. */
444 int max_cases; /* Maximum number of cases to allocate. */
/* NOTE(review): the comment opened on the next line has lost its closing
   line in this extract.  The visible code below it sizes the buffer as
   get_workspace() / approx_case_cost, clamps that to max_buffers, then
   allocates the record array and creates a case in each slot. */
447 /* Allocate as many cases as we can within the workspace
449 approx_case_cost = (sizeof *irs->records
450 + irs->xsrt->value_cnt * sizeof (union value)
451 + 4 * sizeof (void *));
452 max_cases = get_workspace() / approx_case_cost;
453 if (max_cases > max_buffers)
454 max_cases = max_buffers;
455 irs->records = nmalloc (sizeof *irs->records, max_cases);
456 if (irs->records != NULL)
457 for (i = 0; i < max_cases; i++)
458 if (!case_try_create (&irs->records[i].record, irs->xsrt->value_cnt))
463 irs->record_cap = max_cases;
465 /* Fail if we didn't allocate an acceptable number of cases. */
/* NOTE(review): the message formats all three arguments with %d; confirm
   that min_buffers and get_workspace() / 1024 have type int. */
466 if (irs->records == NULL || max_cases < min_buffers)
468 msg (SE, _("Out of memory. Could not allocate room for minimum of %d "
469 "cases of %d bytes each. (PSPP workspace is currently "
470 "restricted to a maximum of %d KB.)"),
471 min_buffers, approx_case_cost, get_workspace() / 1024);
477 /* Compares cases A and B on the criteria listed in CRITERIA, taken in
478 order, and returns a strcmp()-type result. */
/* NOTE(review): the return-type line, the braces, the test that chooses
   between the numeric and the string comparison, the guard before the
   return inside the loop and the final return are not visible here. */
480 compare_record (const struct ccase *a, const struct ccase *b,
481 const struct sort_criteria *criteria)
488 for (i = 0; i < criteria->crit_cnt; i++)
490 const struct sort_criterion *c = &criteria->crits[i];
/* Numeric comparison of the values at index c->fv; yields -1, 0 or 1. */
495 double af = case_num_idx (a, c->fv);
496 double bf = case_num_idx (b, c->fv);
498 result = af < bf ? -1 : af > bf;
/* String comparison: c->width bytes compared with memcmp(). */
501 result = memcmp (case_str_idx (a, c->fv),
502 case_str_idx (b, c->fv), c->width);
/* The sign is flipped for any direction other than SRT_ASCEND. */
505 return c->dir == SRT_ASCEND ? result : -result;
511 /* Compares record-run tuples A and B on run number first, then
512 on record, then on case index. */
/* NOTE(review): the return-type line, the braces, the guards between the
   three comparisons and the return statement are not visible here. */
514 compare_record_run (const struct record_run *a,
515 const struct record_run *b,
516 const struct initial_run_state *irs)
/* Run number first; yields -1, 0 or 1. */
518 int result = a->run < b->run ? -1 : a->run > b->run;
/* Then the records themselves, under the sort criteria. */
520 result = compare_record (&a->record, &b->record, irs->xsrt->criteria);
/* Finally the case index. */
522 result = a->idx < b->idx ? -1 : a->idx > b->idx;
526 /* Compares record-run tuples A and B on run number first, then
527 on the current record according to the sort criteria, but in descending order. */
/* Heap callback: returns the negation of compare_record_run(), which
   reverses that ordering.  push_heap() and pop_heap() are called with this
   function and the initial_run_state as auxiliary data.
   NOTE(review): the return-type line and braces are not visible here. */
530 compare_record_run_minheap (const void *a, const void *b, const void *irs)
532 return -compare_record_run (a, b, irs);
535 /* Begins a new initial run, specifically its output file. */
/* NOTE(review): the return-type line, the braces and the statements before
   the casefile creation are missing from this extract, and the comment
   inside the body has lost its closing line.  The visible statements
   create a fastfile for the sort's value count, move it to disk and
   nullify last_output. */
537 start_run (struct initial_run_state *irs)
542 /* This casefile is internal to the sort, so don't use the factory
544 irs->casefile = fastfile_create (irs->xsrt->value_cnt);
545 casefile_to_disk (irs->casefile);
546 case_nullify (&irs->last_output);
549 /* Ends the current initial run. */
/* NOTE(review): the return-type line, the braces, the statement that grows
   run_cap and the statement executed on a casefile error are missing
   from this extract. */
551 end_run (struct initial_run_state *irs)
553 struct external_sort *xsrt = irs->xsrt;
555 /* Record initial run. */
556 if (irs->casefile != NULL)
558 casefile_sleep (irs->casefile);
/* The run array is reallocated when it has no free slot left. */
559 if (xsrt->run_cnt >= xsrt->run_cap)
562 xsrt->runs = xnrealloc (xsrt->runs,
563 xsrt->run_cap, sizeof *xsrt->runs);
/* The casefile pointer is stored in the run array and the state's own
   pointer is cleared below. */
565 xsrt->runs[xsrt->run_cnt++] = irs->casefile;
566 if (casefile_error (irs->casefile))
568 irs->casefile = NULL;
572 /* Writes a record to the current initial run. */
/* NOTE(review): the return-type line, the braces, the error bail-out and
   the statements that switch to a new run are missing from this extract. */
574 output_record (struct initial_run_state *irs)
576 struct record_run *record_run;
577 struct ccase case_tmp;
579 /* Extract minimum case from heap. */
580 assert (irs->record_cnt > 0);
/* pop_heap() receives the count before the decrement; the record is then
   taken from the slot at the new record_cnt. */
581 pop_heap (irs->records, irs->record_cnt--, sizeof *irs->records,
582 compare_record_run_minheap, irs);
583 record_run = irs->records + irs->record_cnt;
585 /* Bail if an error has occurred. */
589 /* Start new run if necessary. */
/* A popped record belongs either to the current run or to the next one. */
590 assert (record_run->run == irs->run
591 || record_run->run == irs->run + 1);
592 if (record_run->run != irs->run)
597 assert (record_run->run == irs->run);
601 if (irs->casefile != NULL)
602 casefile_append (irs->casefile, &record_run->record);
604 /* This record becomes last_output. */
/* Three-way exchange: last_output and case_tmp take the written case by
   struct assignment, the popped slot receives the case from the spare
   slot at record_cap - 1, and the spare slot receives the written case. */
605 irs->last_output = case_tmp = record_run->record;
606 record_run->record = irs->records[irs->record_cap - 1].record;
607 irs->records[irs->record_cap - 1].record = case_tmp;
/* Forward declarations for the merge helpers: choose_merge() picks the
   first index of the group to merge and merge_once() merges that group. */
612 static int choose_merge (struct casefile *runs[], int run_cnt, int order);
613 static struct casefile *merge_once (struct external_sort *,
614 struct casefile *[], size_t);
616 /* Repeatedly merges runs until only one is left,
617 and returns the final casefile.
618 Returns a null pointer if an I/O error occurs. */
619 static struct casefile *
620 merge (struct external_sort *xsrt)
/* NOTE(review): the braces, the tail of the remove_range() call, the
   statement executed when merge_once() fails and any statement between
   the final assert and the return are missing from this extract. */
622 while (xsrt->run_cnt > 1)
/* Merge at most MAX_MERGE_ORDER runs per pass, starting at the index
   chosen by choose_merge(). */
624 int order = MIN (MAX_MERGE_ORDER, xsrt->run_cnt);
625 int idx = choose_merge (xsrt->runs, xsrt->run_cnt, order);
/* The merged run replaces the first run of the group; the run count then
   drops by order - 1. */
626 xsrt->runs[idx] = merge_once (xsrt, xsrt->runs + idx, order);
627 remove_range (xsrt->runs, xsrt->run_cnt, sizeof *xsrt->runs,
629 xsrt->run_cnt -= order - 1;
631 if (xsrt->runs[idx] == NULL)
/* NOTE(review): this assert is reached with run_cnt == 0 if no run was
   ever recorded - confirm callers cannot get here with zero runs. */
634 assert (xsrt->run_cnt == 1);
636 return xsrt->runs[0];
639 /* Chooses ORDER runs out of the RUN_CNT runs in RUNS to merge,
640 and returns the index of the first one.
642 For stability, we must merge only consecutive runs. For
643 efficiency, we choose the shortest consecutive sequence of runs. */
/* NOTE(review): the return-type line, the braces, the initialisation of
   the sums and minimum, the body of the final test and the return
   statement are missing from this extract. */
646 choose_merge (struct casefile *runs[], int run_cnt, int order)
/* NOTE(review): the sums are kept in int, while do_internal_sort() stores
   the result of casefile_get_case_cnt() in an unsigned long - confirm
   that overflow is not a concern for large runs. */
648 int min_idx, min_sum;
649 int cur_idx, cur_sum;
652 /* Sum up the length of the first ORDER runs. */
654 for (i = 0; i < order; i++)
655 cur_sum += casefile_get_case_cnt (runs[i]);
657 /* Find the shortest group of ORDER runs,
658 using a running total for efficiency. */
/* Slide the window one run at a time: subtract the run that leaves on the
   left and add the run that enters on the right. */
661 for (cur_idx = 1; cur_idx + order <= run_cnt; cur_idx++)
663 cur_sum -= casefile_get_case_cnt (runs[cur_idx - 1]);
664 cur_sum += casefile_get_case_cnt (runs[cur_idx + order - 1]);
/* Strict comparison: a later group of equal length does not pass it. */
665 if (cur_sum < min_sum)
675 /* Merges the RUN_CNT initial runs specified in INPUT_FILES into a
676 new run, and returns the new run.
677 Returns a null pointer if an I/O error occurs. */
678 static struct casefile *
679 merge_once (struct external_sort *xsrt,
680 struct casefile **const input_files,
/* NOTE(review): the rest of the parameter list, the braces, the header and
   remaining field of the local run structure, several declarations, the
   loop and branch bodies, the success return and the tail of the
   function are missing from this extract. */
685 struct casefile *file;
686 struct casereader *reader;
691 struct casefile *output = NULL;
694 /* Open input files. */
/* One entry per input run: its casefile, a destructive reader over it and
   the first case read from that reader. */
695 runs = xnmalloc (run_cnt, sizeof *runs);
696 for (i = 0; i < run_cnt; i++)
698 struct run *r = &runs[i];
699 r->file = input_files[i];
700 r->reader = casefile_get_destructive_reader (r->file);
701 if (!casereader_read_xfer (r->reader, &r->ccase))
708 /* Create output file. */
709 output = xsrt->factory->create_casefile (xsrt->factory, xsrt->value_cnt);
710 casefile_to_disk (output);
715 struct run *min_run, *run;
/* Linear scan from the second run onward for a case that compares strictly
   below the current minimum's. */
719 for (run = runs + 1; run < runs + run_cnt; run++)
720 if (compare_record (&run->ccase, &min_run->ccase, xsrt->criteria) < 0)
723 /* Write minimum to output file. */
724 casefile_append_xfer (output, &min_run->ccase);
726 /* Read another case from minimum run. */
/* When the minimum run yields no further case, errors are checked on that
   run and on the output, the run's reader and casefile are destroyed and
   its entry is removed from the array. */
727 if (!casereader_read_xfer (min_run->reader, &min_run->ccase))
729 if (casefile_error (min_run->file) || casefile_error (output))
731 casereader_destroy (min_run->reader);
732 casefile_destroy (min_run->file);
734 remove_element (runs, run_cnt, sizeof *runs, min_run - runs);
739 if (!casefile_sleep (output))
/* Destroys the casefiles of the runs still in the array and the output;
   NOTE(review): presumably the error path - its label is not visible. */
746 for (i = 0; i < run_cnt; i++)
747 casefile_destroy (runs[i].file);
748 casefile_destroy (output);