1 /* PSPP - computes sample statistics.
2 Copyright (C) 1997-9, 2000 Free Software Foundation, Inc.
3 Written by Ben Pfaff <blp@gnu.org>.
5 This program is free software; you can redistribute it and/or
6 modify it under the terms of the GNU General Public License as
7 published by the Free Software Foundation; either version 2 of the
8 License, or (at your option) any later version.
10 This program is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with this program; if not, write to the Free Software
17 Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA
26 #include "algorithm.h"
44 #include <sys/types.h>
51 #include "debug-print.h"
53 /* Other prototypes. */
54 static int compare_record (const union value *, const union value *,
55 const struct sort_cases_pgm *, int *idx_to_fv);
56 static int compare_case_lists (const void *, const void *, void *);
57 static struct internal_sort *do_internal_sort (struct sort_cases_pgm *,
59 static void destroy_internal_sort (struct internal_sort *);
60 static struct external_sort *do_external_sort (struct sort_cases_pgm *,
62 static void destroy_external_sort (struct external_sort *);
63 struct sort_cases_pgm *parse_sort (void);
65 /* Performs the SORT CASES procedures. */
69 struct sort_cases_pgm *scp;
72 lex_match_id ("SORT");
73 lex_match_id ("CASES");
82 msg (SE, _("SORT CASES may not be used after TEMPORARY. "
83 "Temporary transformations will be made permanent."));
87 success = sort_cases (scp, 0);
88 destroy_sort_cases_pgm (scp);
90 return lex_end_of_command ();
95 /* Parses a list of sort keys and returns a struct sort_cases_pgm
96 based on it. Returns a null pointer on error. */
97 struct sort_cases_pgm *
100 struct sort_cases_pgm *scp;
102 scp = xmalloc (sizeof *scp);
112 int prev_var_cnt = scp->var_cnt;
113 enum sort_direction direction = SRT_ASCEND;
116 if (!parse_variables (default_dict, &scp->vars, &scp->var_cnt,
117 PV_NO_DUPLICATE | PV_APPEND | PV_NO_SCRATCH))
120 /* Sort direction. */
123 if (lex_match_id ("D") || lex_match_id ("DOWN"))
124 direction = SRT_DESCEND;
125 else if (!lex_match_id ("A") && !lex_match_id ("UP"))
127 msg (SE, _("`A' or `D' expected inside parentheses."));
130 if (!lex_match (')'))
132 msg (SE, _("`)' expected."));
136 scp->dirs = xrealloc (scp->dirs, sizeof *scp->dirs * scp->var_cnt);
137 for (; prev_var_cnt < scp->var_cnt; prev_var_cnt++)
138 scp->dirs[prev_var_cnt] = direction;
140 while (token != '.' && token != '/');
145 destroy_sort_cases_pgm (scp);
149 /* Destroys a SORT CASES program. */
151 destroy_sort_cases_pgm (struct sort_cases_pgm *scp)
155 assert (scp->ref_cnt > 0);
156 if (--scp->ref_cnt > 0)
161 destroy_internal_sort (scp->isrt);
162 destroy_external_sort (scp->xsrt);
167 /* Sorts the active file based on the key variables specified in
168 SCP->vars[] (SCP->var_cnt of them). The output is either to the
169 active file, if SEPARATE is zero, or to a separate file, if
170 SEPARATE is nonzero. In the latter case the output cases can be
171 read with a call to read_sort_output(). (In the former case the
172 output cases should be dealt with through the usual vfm interface.)
174 The caller is responsible for freeing vars[]. */
176 sort_cases (struct sort_cases_pgm *scp, int separate)
179 = sizeof (union value) * dict_get_compacted_value_cnt (default_dict);
181 /* Not sure this is necessary but it's good to be safe. */
182 if (separate && case_source_is_class (vfm_source, &sort_source_class))
183 procedure (NULL, NULL);
185 /* SORT CASES cancels PROCESS IF. */
186 expr_free (process_if_expr);
187 process_if_expr = NULL;
189 /* Try an internal sort first. */
190 scp->isrt = do_internal_sort (scp, separate);
191 if (scp->isrt != NULL)
194 /* Fall back to an external sort. */
195 if (vfm_source != NULL
196 && case_source_is_class (vfm_source, &storage_source_class))
197 storage_source_to_disk (vfm_source);
198 scp->xsrt = do_external_sort (scp, separate);
199 if (scp->xsrt != NULL)
202 destroy_sort_cases_pgm (scp);
206 /* Results of an internal sort. */
209 struct case_list **results;
212 /* If the data is in memory, do an internal sort. Return
214 static struct internal_sort *
215 do_internal_sort (struct sort_cases_pgm *scp, int separate)
217 struct internal_sort *isrt;
219 isrt = xmalloc (sizeof *isrt);
220 isrt->results = NULL;
222 if (case_source_is_class (vfm_source, &storage_source_class)
223 && !storage_source_on_disk (vfm_source))
225 struct case_list *case_list;
226 struct case_list **case_array;
230 case_cnt = vfm_source->class->count (vfm_source);
234 if (case_cnt > set_max_workspace / sizeof *case_array)
237 case_list = storage_source_get_cases (vfm_source);
238 case_array = malloc (sizeof *case_array * (case_cnt + 1));
239 if (case_array == NULL)
242 for (i = 0; case_list != NULL; i++)
244 case_array[i] = case_list;
245 case_list = case_list->next;
247 assert (i == case_cnt);
248 case_array[case_cnt] = NULL;
250 sort (case_array, case_cnt, sizeof *case_array,
251 compare_case_lists, scp);
255 storage_source_set_cases (vfm_source, case_array[0]);
256 for (i = 1; i <= case_cnt; i++)
257 case_array[i - 1]->next = case_array[i];
261 isrt->results = case_array;
271 /* Destroys an internal sort result. */
273 destroy_internal_sort (struct internal_sort *isrt)
277 free (isrt->results);
282 /* Compares the VAR_CNT variables in VARS[] between the
283 `case_list's at A and B, and returns a strcmp()-type
286 compare_case_lists (const void *a_, const void *b_, void *scp_)
288 struct sort_cases_pgm *scp = scp_;
289 struct case_list *const *pa = a_;
290 struct case_list *const *pb = b_;
291 struct case_list *a = *pa;
292 struct case_list *b = *pb;
294 return compare_record (a->c.data, b->c.data, scp, NULL);
299 /* Maximum order of merge. If you increase this, then you should
300 use a heap for comparing cases during merge. */
301 #define MAX_MERGE_ORDER 7
303 /* Minimum total number of records for buffers. */
304 #define MIN_BUFFER_TOTAL_SIZE_RECS 64
306 /* Minimum single input buffer size, in bytes and records. */
307 #define MIN_BUFFER_SIZE_BYTES 4096
308 #define MIN_BUFFER_SIZE_RECS 16
310 #if MIN_BUFFER_SIZE_RECS * 2 + 16 > MIN_BUFFER_TOTAL_SIZE_RECS
311 #error MIN_BUFFER_SIZE_RECS and MIN_BUFFER_TOTAL_SIZE_RECS do not make sense.
314 /* An initial run and its length. */
317 int file_idx; /* File index. */
318 size_t case_cnt; /* Number of cases. */
321 /* Compares initial runs A and B by length, for ordering runs in descending order of length: returns -1 if A has more cases than B, 1 if fewer, 0 if equal. */
323 compare_initial_runs (const void *a_, const void *b_, void *aux UNUSED)
325 const struct initial_run *a = a_;
326 const struct initial_run *b = b_;
328 return a->case_cnt > b->case_cnt ? -1 : a->case_cnt <b->case_cnt;
331 /* Results of an external sort. */
334 struct sort_cases_pgm *scp; /* SORT CASES info. */
335 struct initial_run *initial_runs; /* Array of initial runs. */
336 size_t run_cnt, run_cap; /* Number of runs, allocated capacity. */
337 char *temp_dir; /* Temporary file directory name. */
338 char *temp_name; /* Name of a temporary file. */
339 int next_file_idx; /* Lowest unused file index. */
342 /* Prototypes for helper functions. */
343 static void sort_sink_write (struct case_sink *, const struct ccase *);
344 static int write_initial_runs (struct external_sort *, int separate);
345 static int init_external_sort (struct external_sort *);
346 static int merge (struct external_sort *);
347 static void rmdir_temp_dir (struct external_sort *);
348 static void remove_temp_file (struct external_sort *xsrt, int file_idx);
350 /* Performs an external sort of the active file according to the
351 specification in SCP. Forms initial runs using a heap as a
352 reservoir. Determines the optimum merge pattern via Huffman's
353 method (see Knuth vol. 3, 2nd edition, p. 365-366), and merges
354 according to that pattern. */
355 static struct external_sort *
356 do_external_sort (struct sort_cases_pgm *scp, int separate)
358 struct external_sort *xsrt;
361 xsrt = xmalloc (sizeof *xsrt);
363 if (!init_external_sort (xsrt))
365 if (!write_initial_runs (xsrt, separate))
375 /* Don't destroy anything because we'll need it for reading
381 destroy_external_sort (xsrt);
388 destroy_external_sort (struct external_sort *xsrt)
394 for (i = 0; i < xsrt->run_cnt; i++)
395 remove_temp_file (xsrt, xsrt->initial_runs[i].file_idx);
396 rmdir_temp_dir (xsrt);
397 free (xsrt->temp_dir);
398 free (xsrt->temp_name);
399 free (xsrt->initial_runs);
405 /* Creates and returns the name of a temporary directory. */
409 const char *parent_dir;
412 if (getenv ("TMPDIR") != NULL)
413 parent_dir = getenv ("TMPDIR");
415 parent_dir = P_tmpdir;
417 temp_dir = xmalloc (strlen (parent_dir) + 32);
418 sprintf (temp_dir, "%s%cpsppXXXXXX", parent_dir, DIR_SEPARATOR);
419 if (mkdtemp (temp_dir) == NULL)
421 msg (SE, _("%s: Creating temporary directory: %s."),
422 temp_dir, strerror (errno));
429 #else /* !HAVE_MKDTEMP */
430 /* Creates directory DIR. */
432 do_mkdir (const char *dir)
435 return mkdir (dir, S_IRWXU);
441 /* Creates and returns the name of a temporary directory. */
447 for (i = 0; i < 100; i++)
449 char temp_dir[L_tmpnam + 1];
450 if (tmpnam (temp_dir) == NULL)
452 msg (SE, _("Generating temporary directory name failed: %s."),
456 else if (do_mkdir (temp_dir) == 0)
457 return xstrdup (temp_dir);
460 msg (SE, _("Creating temporary directory failed: %s."), strerror (errno));
463 #endif /* !HAVE_MKDTEMP */
465 /* Sets up to open temporary files. */
467 init_external_sort (struct external_sort *xsrt)
470 xsrt->temp_dir = NULL;
471 xsrt->next_file_idx = 0;
476 xsrt->initial_runs = xmalloc (sizeof *xsrt->initial_runs * xsrt->run_cap);
478 /* Temporary directory. */
479 xsrt->temp_dir = make_temp_dir ();
480 xsrt->temp_name = NULL;
481 if (xsrt->temp_dir == NULL)
483 xsrt->temp_name = xmalloc (strlen (xsrt->temp_dir) + 64);
488 /* Returns nonzero if we should return an error even though the
489 operation succeeded. Useful for testing. */
491 simulate_error (void)
493 static int op_err_cnt = -1;
496 if (op_err_cnt == -1 || op_cnt++ < op_err_cnt)
505 /* Removes the directory created for temporary files, if one
508 rmdir_temp_dir (struct external_sort *xsrt)
510 if (xsrt->temp_dir != NULL && rmdir (xsrt->temp_dir) == -1)
512 msg (SE, _("%s: Error removing directory for temporary files: %s."),
513 xsrt->temp_dir, strerror (errno));
514 xsrt->temp_dir = NULL;
518 /* Returns the name of temporary file number FILE_IDX for XSRT.
519 The name is written into XSRT->temp_name, which each call overwrites, so copy it if it must survive the next call. */
521 get_temp_file_name (struct external_sort *xsrt, int file_idx)
523 assert (xsrt->temp_dir != NULL);
524 sprintf (xsrt->temp_name, "%s%c%04d",
525 xsrt->temp_dir, DIR_SEPARATOR, file_idx);
526 return xsrt->temp_name;
529 /* Opens temporary file numbered FILE_IDX for XSRT with mode MODE
530 and returns the FILE *. */
532 open_temp_file (struct external_sort *xsrt, int file_idx, const char *mode)
537 temp_file = get_temp_file_name (xsrt, file_idx);
539 file = fopen (temp_file, mode);
540 if (simulate_error () || file == NULL)
541 msg (SE, _("%s: Error opening temporary file for %s: %s."),
542 temp_file, mode[0] == 'r' ? "reading" : "writing",
548 /* Closes FILE, which is the temporary file numbered FILE_IDX
549 under XSRT. Returns nonzero only if successful. */
551 close_temp_file (struct external_sort *xsrt, int file_idx, FILE *file)
555 char *temp_file = get_temp_file_name (xsrt, file_idx);
556 if (simulate_error () || fclose (file) == EOF)
558 msg (SE, _("%s: Error closing temporary file: %s."),
559 temp_file, strerror (errno));
566 /* Delete temporary file numbered FILE_IDX for XSRT. */
568 remove_temp_file (struct external_sort *xsrt, int file_idx)
572 char *temp_file = get_temp_file_name (xsrt, file_idx);
573 if (simulate_error () || remove (temp_file) != 0)
574 msg (SE, _("%s: Error removing temporary file: %s."),
575 temp_file, strerror (errno));
579 /* Writes SIZE bytes from buffer DATA into FILE, which is
580 temporary file numbered FILE_IDX from XSRT. */
582 write_temp_file (struct external_sort *xsrt, int file_idx,
583 FILE *file, const void *data, size_t size)
585 if (!simulate_error () && fwrite (data, size, 1, file) == 1)
589 char *temp_file = get_temp_file_name (xsrt, file_idx);
590 msg (SE, _("%s: Error writing temporary file: %s."),
591 temp_file, strerror (errno));
596 /* Reads SIZE bytes from FILE into buffer DATA. FILE is
597 temporary file numbered FILE_IDX from XSRT. */
599 read_temp_file (struct external_sort *xsrt, int file_idx,
600 FILE *file, void *data, size_t size)
602 if (!simulate_error () && fread (data, size, 1, file) == 1)
606 char *temp_file = get_temp_file_name (xsrt, file_idx);
608 msg (SE, _("%s: Error reading temporary file: %s."),
609 temp_file, strerror (errno));
611 msg (SE, _("%s: Unexpected end of temporary file."),
617 /* Replacement selection. */
619 /* Pairs a record with a run number. */
622 int run; /* Run number of case. */
623 struct case_list *record; /* Case data. */
626 /* Represents a set of initial runs during an external sort. */
627 struct initial_run_state
629 struct external_sort *xsrt;
631 int *idx_to_fv; /* Translation table copied from sink. */
634 struct record_run *records; /* Records arranged as a heap. */
635 size_t record_cnt; /* Current number of records. */
636 size_t record_cap; /* Capacity for records. */
637 struct case_list *free_list;/* Cases not in heap. */
639 /* Run currently being output. */
640 int file_idx; /* Temporary file number. */
641 size_t case_cnt; /* Number of cases so far. */
642 FILE *output_file; /* Output file. */
643 struct case_list *last_output;/* Record last output. */
645 int okay; /* Zero if an error has been encountered. */
648 static const struct case_sink_class sort_sink_class;
650 static void destroy_initial_run_state (struct initial_run_state *irs);
651 static int allocate_cases (struct initial_run_state *);
652 static struct case_list *grab_case (struct initial_run_state *);
653 static void release_case (struct initial_run_state *, struct case_list *);
654 static void output_record (struct initial_run_state *irs);
655 static void start_run (struct initial_run_state *irs);
656 static void end_run (struct initial_run_state *irs);
657 static int compare_record_run (const struct record_run *,
658 const struct record_run *,
659 struct initial_run_state *);
660 static int compare_record_run_minheap (const void *, const void *, void *);
662 /* Writes initial runs for XSRT, sending them to a separate file
663 if SEPARATE is nonzero. */
665 write_initial_runs (struct external_sort *xsrt, int separate)
667 struct initial_run_state *irs;
670 /* Allocate memory for cases. */
671 irs = xmalloc (sizeof *irs);
674 irs->record_cnt = irs->record_cap = 0;
675 irs->free_list = NULL;
676 irs->output_file = NULL;
677 irs->last_output = NULL;
681 if (!allocate_cases (irs))
684 /* Create case sink. */
687 if (vfm_sink != NULL && vfm_sink->class->destroy != NULL)
688 vfm_sink->class->destroy (vfm_sink);
689 vfm_sink = create_case_sink (&sort_sink_class, default_dict, irs);
690 xsrt->scp->ref_cnt++;
693 /* Create initial runs. */
695 procedure (NULL, NULL);
696 irs->idx_to_fv = NULL;
697 while (irs->record_cnt > 0 && irs->okay)
704 destroy_initial_run_state (irs);
709 /* Add a single case to an initial run. */
711 sort_sink_write (struct case_sink *sink, const struct ccase *c)
713 struct initial_run_state *irs = sink->aux;
714 struct record_run *new_record_run;
719 irs->idx_to_fv = sink->idx_to_fv;
721 /* Compose record_run for this run and add to heap. */
722 assert (irs->record_cnt < irs->record_cap);
723 new_record_run = irs->records + irs->record_cnt++;
724 new_record_run->record = grab_case (irs);
725 memcpy (new_record_run->record->c.data, c->data, irs->xsrt->scp->case_size);
726 new_record_run->run = irs->file_idx;
727 if (irs->last_output != NULL
728 && compare_record (c->data, irs->last_output->c.data,
729 irs->xsrt->scp, sink->idx_to_fv) < 0)
730 new_record_run->run = irs->xsrt->next_file_idx;
731 push_heap (irs->records, irs->record_cnt, sizeof *irs->records,
732 compare_record_run_minheap, irs);
734 /* Output a record if the reservoir is full. */
735 if (irs->record_cnt == irs->record_cap && irs->okay)
739 /* Destroys the initial run state represented by IRS. */
741 destroy_initial_run_state (struct initial_run_state *irs)
743 struct case_list *iter, *next;
749 /* Release cases to free list. */
750 for (i = 0; i < irs->record_cnt; i++)
751 release_case (irs, irs->records[i].record);
752 if (irs->last_output != NULL)
753 release_case (irs, irs->last_output);
755 /* Free cases in free list. */
756 for (iter = irs->free_list; iter != NULL; iter = next)
763 if (irs->output_file != NULL)
764 close_temp_file (irs->xsrt, irs->file_idx, irs->output_file);
769 /* Allocates room for lots of cases as a buffer. */
771 allocate_cases (struct initial_run_state *irs)
773 int approx_case_cost; /* Approximate memory cost of one case in bytes. */
774 int max_cases; /* Maximum number of cases to allocate. */
777 /* Allocate as many cases as we can within the workspace
779 approx_case_cost = (sizeof *irs->records
780 + sizeof *irs->free_list
781 + irs->xsrt->scp->case_size
782 + 4 * sizeof (void *));
783 max_cases = set_max_workspace / approx_case_cost;
784 irs->records = malloc (sizeof *irs->records * max_cases);
785 for (i = 0; i < max_cases; i++)
788 c = malloc (sizeof *c
789 + irs->xsrt->scp->case_size
790 - sizeof (union value));
796 release_case (irs, c);
799 /* irs->records gets all but one of the allocated cases.
800 The extra is used for last_output. */
801 irs->record_cap = max_cases - 1;
803 /* Fail if we didn't allocate an acceptable number of cases. */
804 if (irs->records == NULL || max_cases < MIN_BUFFER_TOTAL_SIZE_RECS)
806 msg (SE, _("Out of memory. Could not allocate room for minimum of %d "
807 "cases of %d bytes each. (PSPP workspace is currently "
808 "restricted to a maximum of %d KB.)"),
809 MIN_BUFFER_TOTAL_SIZE_RECS, approx_case_cost, set_max_workspace / 1024);
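815 /* Compares the SCP->var_cnt variables in SCP->vars[] between the `value's at
816 A and B, honoring each key's direction in SCP->dirs[], and returns a strcmp()-type result. If IDX_TO_FV is nonnull, it translates each variable's index into the position of its value within A and B. */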
818 compare_record (const union value *a, const union value *b,
819 const struct sort_cases_pgm *scp,
827 for (i = 0; i < scp->var_cnt; i++)
829 struct variable *v = scp->vars[i];
833 if (idx_to_fv != NULL)
834 fv = idx_to_fv[v->index];
838 if (v->type == NUMERIC)
843 result = af < bf ? -1 : af > bf;
846 result = memcmp (a[fv].s, b[fv].s, v->width);
850 if (scp->dirs[i] == SRT_DESCEND)
859 /* Compares record-run tuples A and B on run number first, then
860 on the current record according to SCP. */
862 compare_record_run (const struct record_run *a,
863 const struct record_run *b,
864 struct initial_run_state *irs)
866 if (a->run != b->run)
867 return a->run > b->run ? 1 : -1;
869 return compare_record (a->record->c.data, b->record->c.data,
870 irs->xsrt->scp, irs->idx_to_fv);
873 /* Compares record-run tuples A and B on run number first, then
874 on the current record according to SCP, but in descending
877 compare_record_run_minheap (const void *a, const void *b, void *irs)
879 return -compare_record_run (a, b, irs);
882 /* Begins a new initial run, specifically its output file. */
884 start_run (struct initial_run_state *irs)
886 irs->file_idx = irs->xsrt->next_file_idx++;
888 irs->output_file = open_temp_file (irs->xsrt, irs->file_idx, "wb");
889 if (irs->output_file == NULL)
891 if (irs->last_output != NULL)
893 release_case (irs, irs->last_output);
894 irs->last_output = NULL;
898 /* Ends the current initial run. */
900 end_run (struct initial_run_state *irs)
902 struct external_sort *xsrt = irs->xsrt;
904 /* Record initial run. */
905 if (xsrt->run_cnt >= xsrt->run_cap)
909 = xrealloc (xsrt->initial_runs,
910 sizeof *xsrt->initial_runs * xsrt->run_cap);
912 xsrt->initial_runs[xsrt->run_cnt].file_idx = irs->file_idx;
913 xsrt->initial_runs[xsrt->run_cnt].case_cnt = irs->case_cnt;
916 /* Close file handle. */
917 if (irs->output_file != NULL
918 && !close_temp_file (irs->xsrt, irs->file_idx, irs->output_file))
920 irs->output_file = NULL;
923 /* Writes a record to the current initial run. */
925 output_record (struct initial_run_state *irs)
927 struct record_run *record_run;
929 /* Extract minimum case from heap. */
930 assert (irs->record_cnt > 0);
931 pop_heap (irs->records, irs->record_cnt--, sizeof *irs->records,
932 compare_record_run_minheap, irs);
933 record_run = irs->records + irs->record_cnt;
935 /* Bail if an error has occurred. */
939 /* Start new run if necessary. */
940 assert (record_run->run == irs->file_idx
941 || record_run->run == irs->xsrt->next_file_idx);
942 if (record_run->run != irs->file_idx)
947 assert (record_run->run == irs->file_idx);
951 if (irs->output_file != NULL
952 && !write_temp_file (irs->xsrt, irs->file_idx, irs->output_file,
953 &record_run->record->c, irs->xsrt->scp->case_size))
956 /* This record becomes last_output. */
957 if (irs->last_output != NULL)
958 release_case (irs, irs->last_output);
959 irs->last_output = record_run->record;
962 /* Gets a case from the free list in IRS. It is an error to call
963 this function if the free list is empty. */
964 static struct case_list *
965 grab_case (struct initial_run_state *irs)
969 assert (irs != NULL);
970 assert (irs->free_list != NULL);
973 irs->free_list = c->next;
977 /* Returns C to the free list in IRS. */
979 release_case (struct initial_run_state *irs, struct case_list *c)
981 assert (irs != NULL);
984 c->next = irs->free_list;
990 /* State of merging initial runs. */
993 struct external_sort *xsrt; /* External sort state. */
994 struct ccase **cases; /* Buffers. */
995 size_t case_cnt; /* Number of buffers. */
999 static int merge_once (struct merge_state *,
1000 const struct initial_run[], size_t,
1001 struct initial_run *);
1002 static int fill_run_buffer (struct merge_state *, struct run *);
1003 static int mod (int, int);
1005 /* Performs a series of P-way merges of initial runs
1008 merge (struct external_sort *xsrt)
1010 struct merge_state mrg; /* State of merge. */
1011 size_t approx_case_cost; /* Approximate memory cost of one case. */
1012 int max_order; /* Maximum order of merge. */
1013 size_t dummy_run_cnt; /* Number of dummy runs to insert. */
1019 /* Allocate as many cases as possible into cases. */
1020 approx_case_cost = (sizeof *mrg.cases
1021 + xsrt->scp->case_size + 4 * sizeof (void *));
1022 mrg.case_cnt = set_max_workspace / approx_case_cost;
1023 mrg.cases = malloc (sizeof *mrg.cases * mrg.case_cnt);
1024 if (mrg.cases == NULL)
1026 for (i = 0; i < mrg.case_cnt; i++)
1028 mrg.cases[i] = malloc (xsrt->scp->case_size);
1029 if (mrg.cases[i] == NULL)
1035 if (mrg.case_cnt < MIN_BUFFER_TOTAL_SIZE_RECS)
1037 msg (SE, _("Out of memory. Could not allocate room for minimum of %d "
1038 "cases of %d bytes each. (PSPP workspace is currently "
1039 "restricted to a maximum of %d KB.)"),
1040 MIN_BUFFER_TOTAL_SIZE_RECS, approx_case_cost, set_max_workspace / 1024);
1044 /* Determine maximum order of merge. */
1045 max_order = MAX_MERGE_ORDER;
1046 if (mrg.case_cnt / max_order < MIN_BUFFER_SIZE_RECS)
1047 max_order = mrg.case_cnt / MIN_BUFFER_SIZE_RECS;
1048 else if (mrg.case_cnt / max_order * xsrt->scp->case_size
1049 < MIN_BUFFER_SIZE_BYTES)
1050 max_order = mrg.case_cnt / (MIN_BUFFER_SIZE_BYTES / xsrt->scp->case_size);
1053 if (max_order > xsrt->run_cnt)
1054 max_order = xsrt->run_cnt;
1056 /* Repeatedly merge the P shortest existing runs until only one run
1058 make_heap (xsrt->initial_runs, xsrt->run_cnt, sizeof *xsrt->initial_runs,
1059 compare_initial_runs, NULL);
1060 dummy_run_cnt = mod (1 - (int) xsrt->run_cnt, max_order - 1);
1061 assert (max_order == 1
1062 || (xsrt->run_cnt + dummy_run_cnt) % (max_order - 1) == 1);
1063 while (xsrt->run_cnt > 1)
1065 struct initial_run output_run;
1069 /* Choose order of merge (max_order after first merge). */
1070 order = max_order - dummy_run_cnt;
1073 /* Choose runs to merge. */
1074 assert (xsrt->run_cnt >= order);
1075 for (i = 0; i < order; i++)
1076 pop_heap (xsrt->initial_runs, xsrt->run_cnt--,
1077 sizeof *xsrt->initial_runs,
1078 compare_initial_runs, NULL);
1081 if (!merge_once (&mrg, xsrt->initial_runs + xsrt->run_cnt, order,
1085 /* Add output run to heap. */
1086 xsrt->initial_runs[xsrt->run_cnt++] = output_run;
1087 push_heap (xsrt->initial_runs, xsrt->run_cnt, sizeof *xsrt->initial_runs,
1088 compare_initial_runs, NULL);
1091 /* Exactly one run is left, which contains the entire sorted
1092 file. We could use it to find a total case count. */
1093 assert (xsrt->run_cnt == 1);
1098 for (i = 0; i < mrg.case_cnt; i++)
1099 free (mrg.cases[i]);
1105 /* Modulo function as defined by Knuth. */
1113 else if (x > 0 && y > 0)
1115 else if (x < 0 && y > 0)
1116 return y - (-x) % y;
1121 /* A run of data for use in merging. */
1124 FILE *file; /* File that contains run. */
1125 int file_idx; /* Index of file that contains run. */
1126 struct ccase **buffer; /* Case buffer. */
1127 struct ccase **buffer_head; /* First unconsumed case in buffer. */
1128 struct ccase **buffer_tail; /* One past last unconsumed case in buffer. */
1129 size_t buffer_cap; /* Number of cases buffer can hold. */
1130 size_t unread_case_cnt; /* Number of cases not yet read. */
1133 /* Merges the RUN_CNT initial runs specified in INPUT_RUNS into a
1134 new run. Returns nonzero only if successful. Adds an entry
1135 to MRG->xsrt->runs for the output file if and only if the
1136 output file is actually created. Always deletes all the input
1139 merge_once (struct merge_state *mrg,
1140 const struct initial_run input_runs[],
1142 struct initial_run *output_run)
1144 struct run runs[MAX_MERGE_ORDER];
1145 FILE *output_file = NULL;
1149 /* Initialize runs[]. */
1150 for (i = 0; i < run_cnt; i++)
1152 runs[i].file = NULL;
1153 runs[i].file_idx = input_runs[i].file_idx;
1154 runs[i].buffer = mrg->cases + mrg->case_cnt / run_cnt * i;
1155 runs[i].buffer_head = runs[i].buffer;
1156 runs[i].buffer_tail = runs[i].buffer;
1157 runs[i].buffer_cap = mrg->case_cnt / run_cnt;
1158 runs[i].unread_case_cnt = input_runs[i].case_cnt;
1161 /* Open input files. */
1162 for (i = 0; i < run_cnt; i++)
1164 runs[i].file = open_temp_file (mrg->xsrt, runs[i].file_idx, "rb");
1165 if (runs[i].file == NULL)
1169 /* Create output file and count cases to be output. */
1170 output_run->file_idx = mrg->xsrt->next_file_idx++;
1171 output_run->case_cnt = 0;
1172 for (i = 0; i < run_cnt; i++)
1173 output_run->case_cnt += input_runs[i].case_cnt;
1174 output_file = open_temp_file (mrg->xsrt, output_run->file_idx, "wb");
1175 if (output_file == NULL)
1178 /* Prime buffers. */
1179 for (i = 0; i < run_cnt; i++)
1180 if (!fill_run_buffer (mrg, runs + i))
1186 struct run *min_run;
1190 for (i = 1; i < run_cnt; i++)
1191 if (compare_record ((*runs[i].buffer_head)->data,
1192 (*min_run->buffer_head)->data,
1193 mrg->xsrt->scp, NULL) < 0)
1196 /* Write minimum to output file. */
1197 if (!write_temp_file (mrg->xsrt, min_run->file_idx, output_file,
1198 (*min_run->buffer_head)->data,
1199 mrg->xsrt->scp->case_size))
1202 /* Remove case from buffer. */
1203 if (++min_run->buffer_head >= min_run->buffer_tail)
1205 /* Buffer is empty. Fill from file. */
1206 if (!fill_run_buffer (mrg, min_run))
1209 /* If buffer is still empty, delete its run. */
1210 if (min_run->buffer_head >= min_run->buffer_tail)
1212 close_temp_file (mrg->xsrt, min_run->file_idx, min_run->file);
1213 remove_temp_file (mrg->xsrt, min_run->file_idx);
1214 *min_run = runs[--run_cnt];
1216 /* We could donate the now-unused buffer space to
1222 /* Close output file. */
1223 close_temp_file (mrg->xsrt, output_run->file_idx, output_file);
1228 /* Close and remove output file. */
1229 if (output_file != NULL)
1231 close_temp_file (mrg->xsrt, output_run->file_idx, output_file);
1232 remove_temp_file (mrg->xsrt, output_run->file_idx);
1235 /* Close and remove any remaining input runs. */
1236 for (i = 0; i < run_cnt; i++)
1238 close_temp_file (mrg->xsrt, runs[i].file_idx, runs[i].file);
1239 remove_temp_file (mrg->xsrt, runs[i].file_idx);
1245 /* Reads as many cases as possible into RUN's buffer.
1246 Returns nonzero unless a disk error occurs. */
1248 fill_run_buffer (struct merge_state *mrg, struct run *run)
1250 run->buffer_head = run->buffer_tail = run->buffer;
1251 while (run->unread_case_cnt > 0
1252 && run->buffer_tail < run->buffer + run->buffer_cap)
1254 if (!read_temp_file (mrg->xsrt, run->file_idx, run->file,
1255 (*run->buffer_tail)->data,
1256 mrg->xsrt->scp->case_size))
1259 run->unread_case_cnt--;
1266 static struct case_source *
1267 sort_sink_make_source (struct case_sink *sink)
1269 struct initial_run_state *irs = sink->aux;
1271 return create_case_source (&sort_source_class, default_dict,
1275 static const struct case_sink_class sort_sink_class =
1281 sort_sink_make_source,
1284 struct sort_source_aux
1286 struct sort_cases_pgm *scp;
1288 write_case_func *write_case;
1289 write_case_data wc_data;
1292 /* Passes C to the write_case function. */
1294 sort_source_read_helper (const struct ccase *src, void *aux_)
1296 struct sort_source_aux *aux = aux_;
1298 memcpy (aux->dst, src, aux->scp->case_size);
1299 return aux->write_case (aux->wc_data);
1302 /* Reads all the records from the source stream and passes them
1305 sort_source_read (struct case_source *source,
1307 write_case_func *write_case, write_case_data wc_data)
1309 struct sort_cases_pgm *scp = source->aux;
1310 struct sort_source_aux aux;
1314 aux.write_case = write_case;
1315 aux.wc_data = wc_data;
1317 read_sort_output (scp, sort_source_read_helper, &aux);
1320 static void read_internal_sort_output (struct internal_sort *isrt,
1321 read_sort_output_func *, void *aux);
1322 static void read_external_sort_output (struct external_sort *xsrt,
1323 read_sort_output_func *, void *aux);
1325 /* Reads all the records from the output stream and passes them to the
1326 function provided, which must have an interface identical to
1329 read_sort_output (struct sort_cases_pgm *scp,
1330 read_sort_output_func *output_func, void *aux)
1332 assert ((scp->isrt != NULL) + (scp->xsrt != NULL) <= 1);
1333 if (scp->isrt != NULL)
1334 read_internal_sort_output (scp->isrt, output_func, aux);
1335 else if (scp->xsrt != NULL)
1336 read_external_sort_output (scp->xsrt, output_func, aux);
1339 /* No results. Probably an external sort that failed. */
1344 read_internal_sort_output (struct internal_sort *isrt,
1345 read_sort_output_func *output_func,
1348 struct case_list **p;
1350 for (p = isrt->results; *p; p++)
1351 if (!output_func (&(*p)->c, aux))
1353 free (isrt->results);
1357 read_external_sort_output (struct external_sort *xsrt,
1358 read_sort_output_func *output_func, void *aux)
1365 assert (xsrt->run_cnt == 1);
1366 file_idx = xsrt->initial_runs[0].file_idx;
1368 file = open_temp_file (xsrt, file_idx, "rb");
1375 c = xmalloc (xsrt->scp->case_size);
1376 for (i = 0; i < xsrt->initial_runs[0].case_cnt; i++)
1378 if (!read_temp_file (xsrt, file_idx, file, c, xsrt->scp->case_size))
1384 if (!output_func (c, aux))
1391 sort_source_destroy (struct case_source *source)
1393 struct sort_cases_pgm *scp = source->aux;
1395 destroy_sort_cases_pgm (scp);
1398 const struct case_source_class sort_source_class =
1403 sort_source_destroy,