1 /* PSPP - computes sample statistics.
2 Copyright (C) 1997-9, 2000 Free Software Foundation, Inc.
3 Written by Ben Pfaff <blp@gnu.org>.
5 This program is free software; you can redistribute it and/or
6 modify it under the terms of the GNU General Public License as
7 published by the Free Software Foundation; either version 2 of the
8 License, or (at your option) any later version.
10 This program is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with this program; if not, write to the Free Software
17 Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA
26 #include "algorithm.h"
39 #include "workspace.h"
46 #include <sys/types.h>
53 #include "debug-print.h"
55 /* Other prototypes. */
56 static int compare_record (const union value *, const union value *,
57 const struct sort_cases_pgm *, int *idx_to_fv);
58 static int compare_cases (const struct ccase *, const struct ccase *, void *);
59 static int compare_case_dblptrs (const void *, const void *, void *);
60 static struct internal_sort *do_internal_sort (struct sort_cases_pgm *,
62 static void destroy_internal_sort (struct internal_sort *);
63 static struct external_sort *do_external_sort (struct sort_cases_pgm *,
65 static void destroy_external_sort (struct external_sort *);
66 struct sort_cases_pgm *parse_sort (void);
68 /* Performs the SORT CASES procedures. */
72 struct sort_cases_pgm *scp;
83 msg (SE, _("SORT CASES may not be used after TEMPORARY. "
84 "Temporary transformations will be made permanent."));
88 success = sort_cases (scp, 0);
89 destroy_sort_cases_pgm (scp);
91 return lex_end_of_command ();
96 /* Parses a list of sort keys and returns a struct sort_cases_pgm
97 based on it. Returns a null pointer on error. */
98 struct sort_cases_pgm *
101 struct sort_cases_pgm *scp;
103 scp = xmalloc (sizeof *scp);
113 int prev_var_cnt = scp->var_cnt;
114 enum sort_direction direction = SRT_ASCEND;
117 if (!parse_variables (default_dict, &scp->vars, &scp->var_cnt,
118 PV_NO_DUPLICATE | PV_APPEND | PV_NO_SCRATCH))
121 /* Sort direction. */
124 if (lex_match_id ("D") || lex_match_id ("DOWN"))
125 direction = SRT_DESCEND;
126 else if (!lex_match_id ("A") && !lex_match_id ("UP"))
128 msg (SE, _("`A' or `D' expected inside parentheses."));
131 if (!lex_match (')'))
133 msg (SE, _("`)' expected."));
137 scp->dirs = xrealloc (scp->dirs, sizeof *scp->dirs * scp->var_cnt);
138 for (; prev_var_cnt < scp->var_cnt; prev_var_cnt++)
139 scp->dirs[prev_var_cnt] = direction;
141 while (token != '.' && token != '/');
146 destroy_sort_cases_pgm (scp);
150 /* Destroys a SORT CASES program. */
152 destroy_sort_cases_pgm (struct sort_cases_pgm *scp)
156 assert (scp->ref_cnt > 0);
157 if (--scp->ref_cnt > 0)
162 destroy_internal_sort (scp->isrt);
163 destroy_external_sort (scp->xsrt);
168 /* Sorts the active file based on the key variables specified in
169 SCP (its vars and var_cnt members).
171 If SEPARATE is zero, then output goes to the active file. The
172 output cases can be read through the usual VFM interfaces.
174 If SEPARATE is nonzero, then output goes to a separate file.
175 The output cases can be read with a call to
178 The caller is responsible for freeing SCP. */
180 sort_cases (struct sort_cases_pgm *scp, int separate)
183 = sizeof (union value) * dict_get_compacted_value_cnt (default_dict);
185 /* Not sure this is necessary but it's good to be safe. */
186 if (separate && case_source_is_class (vfm_source, &sort_source_class))
187 procedure (NULL, NULL);
189 /* SORT CASES cancels PROCESS IF. */
190 expr_free (process_if_expr);
191 process_if_expr = NULL;
193 /* Try an internal sort first. */
194 scp->isrt = do_internal_sort (scp, separate);
195 if (scp->isrt != NULL)
198 /* Fall back to an external sort. */
199 scp->xsrt = do_external_sort (scp, separate);
200 if (scp->xsrt != NULL)
203 destroy_sort_cases_pgm (scp);
207 /* Results of an internal sort.
208 Used only for sorting to a separate file. */
211 const struct ccase **cases;
215 /* If the data is in memory, do an internal sort. Return
217 static struct internal_sort *
218 do_internal_sort (struct sort_cases_pgm *scp, int separate)
220 struct internal_sort *isrt;
222 isrt = xmalloc (sizeof *isrt);
226 if (case_source_is_class (vfm_source, &storage_source_class))
228 struct casefile *casefile = storage_source_get_casefile (vfm_source);
232 if (!casefile_sort (casefile, compare_cases, scp))
237 /* FIXME FIXME FIXME.
238 This is crap because the casefile could get flushed
239 to disk between the time we sort it and we use it
240 later, causing invalid pointer accesses.
241 The right solution is probably to extend casefiles
242 to support duplication. */
243 struct casereader *reader;
246 if (!casefile_in_core (casefile))
249 isrt->case_cnt = casefile_get_case_cnt (casefile);
250 isrt->cases = workspace_malloc (sizeof *isrt->cases
252 if (isrt->cases == NULL)
255 reader = casefile_get_reader (casefile);
256 for (case_idx = 0; case_idx < isrt->case_cnt; case_idx++)
258 casereader_read (reader, &isrt->cases[case_idx]);
259 assert (isrt->cases[case_idx] != NULL);
261 casereader_destroy (reader);
263 sort (isrt->cases, isrt->case_cnt, casefile_get_case_size (casefile),
264 compare_case_dblptrs, scp);
275 /* Destroys an internal sort result. */
277 destroy_internal_sort (struct internal_sort *isrt)
281 workspace_free (isrt->cases, sizeof *isrt->cases * isrt->case_cnt);
286 /* Compares the variables specified by SCP between the cases at A
287 and B, and returns a strcmp()-type result. */
289 compare_cases (const struct ccase *a, const struct ccase *b,
292 struct sort_cases_pgm *scp = scp_;
294 return compare_record (a->data, b->data, scp, NULL);
297 /* Compares the variables specified by SCP between the cases at A
298 and B, and returns a strcmp()-type result. */
300 compare_case_dblptrs (const void *a_, const void *b_, void *scp_)
302 struct sort_cases_pgm *scp = scp_;
303 struct ccase *const *pa = a_;
304 struct ccase *const *pb = b_;
305 struct ccase *a = *pa;
306 struct ccase *b = *pb;
308 return compare_record (a->data, b->data, scp, NULL);
313 /* Maximum order of merge. If you increase this, then you should
314 use a heap for comparing cases during merge. */
315 #define MAX_MERGE_ORDER 7
317 /* Minimum total number of records for buffers. */
318 #define MIN_BUFFER_TOTAL_SIZE_RECS 64
320 /* Minimum single input buffer size, in bytes and records. */
321 #define MIN_BUFFER_SIZE_BYTES 4096
322 #define MIN_BUFFER_SIZE_RECS 16
324 #if MIN_BUFFER_SIZE_RECS * 2 + 16 > MIN_BUFFER_TOTAL_SIZE_RECS
325 #error MIN_BUFFER_SIZE_RECS and MIN_BUFFER_TOTAL_SIZE_RECS do not make sense.
328 /* An initial run and its length. */
331 int file_idx; /* File index. */
332 size_t case_cnt; /* Number of cases. */
335 /* Sorts initial runs A and B in decending order by length. */
337 compare_initial_runs (const void *a_, const void *b_, void *aux UNUSED)
339 const struct initial_run *a = a_;
340 const struct initial_run *b = b_;
342 return a->case_cnt > b->case_cnt ? -1 : a->case_cnt <b->case_cnt;
345 /* Results of an external sort. */
348 struct sort_cases_pgm *scp; /* SORT CASES info. */
349 struct initial_run *initial_runs; /* Array of initial runs. */
350 size_t run_cnt, run_cap; /* Number of runs, allocated capacity. */
351 char *temp_dir; /* Temporary file directory name. */
352 char *temp_name; /* Name of a temporary file. */
353 int next_file_idx; /* Lowest unused file index. */
356 /* Prototypes for helper functions. */
357 static void sort_sink_write (struct case_sink *, const struct ccase *);
358 static int write_initial_runs (struct external_sort *, int separate);
359 static int init_external_sort (struct external_sort *);
360 static int merge (struct external_sort *);
361 static void rmdir_temp_dir (struct external_sort *);
362 static void remove_temp_file (struct external_sort *xsrt, int file_idx);
364 /* Performs an external sort of the active file according to the
365 specification in SCP. Forms initial runs using a heap as a
366 reservoir. Determines the optimum merge pattern via Huffman's
367 method (see Knuth vol. 3, 2nd edition, p. 365-366), and merges
368 according to that pattern. */
369 static struct external_sort *
370 do_external_sort (struct sort_cases_pgm *scp, int separate)
372 struct external_sort *xsrt;
375 if (vfm_source != NULL
376 && case_source_is_class (vfm_source, &storage_source_class))
377 casefile_to_disk (storage_source_get_casefile (vfm_source));
379 xsrt = xmalloc (sizeof *xsrt);
381 if (!init_external_sort (xsrt))
383 if (!write_initial_runs (xsrt, separate))
393 /* Don't destroy anything because we'll need it for reading
399 destroy_external_sort (xsrt);
406 destroy_external_sort (struct external_sort *xsrt)
412 for (i = 0; i < xsrt->run_cnt; i++)
413 remove_temp_file (xsrt, xsrt->initial_runs[i].file_idx);
414 rmdir_temp_dir (xsrt);
415 free (xsrt->temp_dir);
416 free (xsrt->temp_name);
417 free (xsrt->initial_runs);
423 /* Creates and returns the name of a temporary directory. */
427 const char *parent_dir;
430 if (getenv ("TMPDIR") != NULL)
431 parent_dir = getenv ("TMPDIR");
433 parent_dir = P_tmpdir;
435 temp_dir = xmalloc (strlen (parent_dir) + 32);
436 sprintf (temp_dir, "%s%cpsppXXXXXX", parent_dir, DIR_SEPARATOR);
437 if (mkdtemp (temp_dir) == NULL)
439 msg (SE, _("%s: Creating temporary directory: %s."),
440 temp_dir, strerror (errno));
447 #else /* !HAVE_MKDTEMP */
448 /* Creates directory DIR. */
450 do_mkdir (const char *dir)
453 return mkdir (dir, S_IRWXU);
459 /* Creates and returns the name of a temporary directory. */
465 for (i = 0; i < 100; i++)
467 char temp_dir[L_tmpnam + 1];
468 if (tmpnam (temp_dir) == NULL)
470 msg (SE, _("Generating temporary directory name failed: %s."),
474 else if (do_mkdir (temp_dir) == 0)
475 return xstrdup (temp_dir);
478 msg (SE, _("Creating temporary directory failed: %s."), strerror (errno));
481 #endif /* !HAVE_MKDTEMP */
483 /* Sets up to open temporary files. */
485 init_external_sort (struct external_sort *xsrt)
488 xsrt->temp_dir = NULL;
489 xsrt->next_file_idx = 0;
494 xsrt->initial_runs = xmalloc (sizeof *xsrt->initial_runs * xsrt->run_cap);
496 /* Temporary directory. */
497 xsrt->temp_dir = make_temp_dir ();
498 xsrt->temp_name = NULL;
499 if (xsrt->temp_dir == NULL)
501 xsrt->temp_name = xmalloc (strlen (xsrt->temp_dir) + 64);
506 /* Returns nonzero if we should return an error even though the
507 operation succeeded. Useful for testing. */
509 simulate_error (void)
511 static int op_err_cnt = -1;
514 if (op_err_cnt == -1 || op_cnt++ < op_err_cnt)
523 /* Removes the directory created for temporary files, if one
526 rmdir_temp_dir (struct external_sort *xsrt)
528 if (xsrt->temp_dir != NULL && rmdir (xsrt->temp_dir) == -1)
530 msg (SW, _("%s: Error removing directory for temporary files: %s."),
531 xsrt->temp_dir, strerror (errno));
532 xsrt->temp_dir = NULL;
536 /* Returns the name of temporary file number FILE_IDX for XSRT.
537 The name is written into a static buffer, so be careful. */
539 get_temp_file_name (struct external_sort *xsrt, int file_idx)
541 assert (xsrt->temp_dir != NULL);
542 sprintf (xsrt->temp_name, "%s%c%04d",
543 xsrt->temp_dir, DIR_SEPARATOR, file_idx);
544 return xsrt->temp_name;
547 /* Opens temporary file numbered FILE_IDX for XSRT with mode MODE
548 and returns the FILE *. */
550 open_temp_file (struct external_sort *xsrt, int file_idx, const char *mode)
555 temp_file = get_temp_file_name (xsrt, file_idx);
557 file = fopen (temp_file, mode);
558 if (simulate_error () || file == NULL)
559 msg (SE, _("%s: Error opening temporary file for %s: %s."),
560 temp_file, mode[0] == 'r' ? "reading" : "writing",
566 /* Closes FILE, which is the temporary file numbered FILE_IDX
567 under XSRT. Returns nonzero only if successful. */
569 close_temp_file (struct external_sort *xsrt, int file_idx, FILE *file)
573 char *temp_file = get_temp_file_name (xsrt, file_idx);
574 if (simulate_error () || fclose (file) == EOF)
576 msg (SE, _("%s: Error closing temporary file: %s."),
577 temp_file, strerror (errno));
584 /* Delete temporary file numbered FILE_IDX for XSRT. */
586 remove_temp_file (struct external_sort *xsrt, int file_idx)
590 char *temp_file = get_temp_file_name (xsrt, file_idx);
591 if (simulate_error () || remove (temp_file) != 0)
592 msg (SW, _("%s: Error removing temporary file: %s."),
593 temp_file, strerror (errno));
597 /* Writes SIZE bytes from buffer DATA into FILE, which is
598 temporary file numbered FILE_IDX from XSRT. */
600 write_temp_file (struct external_sort *xsrt, int file_idx,
601 FILE *file, const void *data, size_t size)
603 if (!simulate_error () && fwrite (data, size, 1, file) == 1)
607 char *temp_file = get_temp_file_name (xsrt, file_idx);
608 msg (SE, _("%s: Error writing temporary file: %s."),
609 temp_file, strerror (errno));
614 /* Reads SIZE bytes into buffer DATA from FILE, which is
615 temporary file numbered FILE_IDX from XSRT. */
617 read_temp_file (struct external_sort *xsrt, int file_idx,
618 FILE *file, void *data, size_t size)
620 if (!simulate_error () && fread (data, size, 1, file) == 1)
624 char *temp_file = get_temp_file_name (xsrt, file_idx);
626 msg (SE, _("%s: Error reading temporary file: %s."),
627 temp_file, strerror (errno));
629 msg (SE, _("%s: Unexpected end of temporary file."),
635 /* Replacement selection. */
637 /* Pairs a record with a run number. */
640 int run; /* Run number of case. */
641 struct case_list *record; /* Case data. */
644 /* Represents a set of initial runs during an external sort. */
645 struct initial_run_state
647 struct external_sort *xsrt;
649 int *idx_to_fv; /* Translation table copied from sink. */
652 struct record_run *records; /* Records arranged as a heap. */
653 size_t record_cnt; /* Current number of records. */
654 size_t record_cap; /* Capacity for records. */
655 struct case_list *free_list;/* Cases not in heap. */
657 /* Run currently being output. */
658 int file_idx; /* Temporary file number. */
659 size_t case_cnt; /* Number of cases so far. */
660 FILE *output_file; /* Output file. */
661 struct case_list *last_output;/* Record last output. */
663 int okay; /* Zero if an error has been encountered. */
666 static const struct case_sink_class sort_sink_class;
668 static void destroy_initial_run_state (struct initial_run_state *irs);
669 static int allocate_cases (struct initial_run_state *);
670 static struct case_list *grab_case (struct initial_run_state *);
671 static void release_case (struct initial_run_state *, struct case_list *);
672 static void output_record (struct initial_run_state *irs);
673 static void start_run (struct initial_run_state *irs);
674 static void end_run (struct initial_run_state *irs);
675 static int compare_record_run (const struct record_run *,
676 const struct record_run *,
677 struct initial_run_state *);
678 static int compare_record_run_minheap (const void *, const void *, void *);
680 /* Writes initial runs for XSRT, sending them to a separate file
681 if SEPARATE is nonzero. */
683 write_initial_runs (struct external_sort *xsrt, int separate)
685 struct initial_run_state *irs;
688 /* Allocate memory for cases. */
689 irs = xmalloc (sizeof *irs);
692 irs->record_cnt = irs->record_cap = 0;
693 irs->free_list = NULL;
694 irs->output_file = NULL;
695 irs->last_output = NULL;
699 if (!allocate_cases (irs))
702 /* Create case sink. */
705 if (vfm_sink != NULL && vfm_sink->class->destroy != NULL)
706 vfm_sink->class->destroy (vfm_sink);
707 vfm_sink = create_case_sink (&sort_sink_class, default_dict, irs);
708 xsrt->scp->ref_cnt++;
711 /* Create initial runs. */
713 procedure (NULL, NULL);
714 irs->idx_to_fv = NULL;
715 while (irs->record_cnt > 0 && irs->okay)
722 destroy_initial_run_state (irs);
727 /* Add a single case to an initial run. */
729 sort_sink_write (struct case_sink *sink, const struct ccase *c)
731 struct initial_run_state *irs = sink->aux;
732 struct record_run *new_record_run;
737 irs->idx_to_fv = sink->idx_to_fv;
739 /* Compose record_run for this run and add to heap. */
740 assert (irs->record_cnt < irs->record_cap);
741 new_record_run = irs->records + irs->record_cnt++;
742 new_record_run->record = grab_case (irs);
743 memcpy (new_record_run->record->c.data, c->data, irs->xsrt->scp->case_size);
744 new_record_run->run = irs->file_idx;
745 if (irs->last_output != NULL
746 && compare_record (c->data, irs->last_output->c.data,
747 irs->xsrt->scp, sink->idx_to_fv) < 0)
748 new_record_run->run = irs->xsrt->next_file_idx;
749 push_heap (irs->records, irs->record_cnt, sizeof *irs->records,
750 compare_record_run_minheap, irs);
752 /* Output a record if the reservoir is full. */
753 if (irs->record_cnt == irs->record_cap && irs->okay)
757 /* Destroys the initial run state represented by IRS. */
759 destroy_initial_run_state (struct initial_run_state *irs)
761 struct case_list *iter, *next;
767 /* Release cases to free list. */
768 for (i = 0; i < irs->record_cnt; i++)
769 release_case (irs, irs->records[i].record);
770 if (irs->last_output != NULL)
771 release_case (irs, irs->last_output);
773 /* Free cases in free list. */
774 for (iter = irs->free_list; iter != NULL; iter = next)
781 if (irs->output_file != NULL)
782 close_temp_file (irs->xsrt, irs->file_idx, irs->output_file);
787 /* Allocates room for lots of cases as a buffer. */
789 allocate_cases (struct initial_run_state *irs)
791 int approx_case_cost; /* Approximate memory cost of one case in bytes. */
792 int max_cases; /* Maximum number of cases to allocate. */
795 /* Allocate as many cases as we can within the workspace
797 approx_case_cost = (sizeof *irs->records
798 + sizeof *irs->free_list
799 + irs->xsrt->scp->case_size
800 + 4 * sizeof (void *));
801 max_cases = get_max_workspace() / approx_case_cost;
802 irs->records = malloc (sizeof *irs->records * max_cases);
803 for (i = 0; i < max_cases; i++)
806 c = malloc (sizeof *c
807 + irs->xsrt->scp->case_size
808 - sizeof (union value));
814 release_case (irs, c);
817 /* irs->records gets all but one of the allocated cases.
818 The extra is used for last_output. */
819 irs->record_cap = max_cases - 1;
821 /* Fail if we didn't allocate an acceptable number of cases. */
822 if (irs->records == NULL || max_cases < MIN_BUFFER_TOTAL_SIZE_RECS)
824 msg (SE, _("Out of memory. Could not allocate room for minimum of %d "
825 "cases of %d bytes each. (PSPP workspace is currently "
826 "restricted to a maximum of %d KB.)"),
827 MIN_BUFFER_TOTAL_SIZE_RECS, approx_case_cost, get_max_workspace() / 1024);
833 /* Compares the VAR_CNT variables in VARS[] between the `value's at
834 A and B, and returns a strcmp()-type result. */
836 compare_record (const union value *a, const union value *b,
837 const struct sort_cases_pgm *scp,
845 for (i = 0; i < scp->var_cnt; i++)
847 struct variable *v = scp->vars[i];
851 if (idx_to_fv != NULL)
852 fv = idx_to_fv[v->index];
856 if (v->type == NUMERIC)
861 result = af < bf ? -1 : af > bf;
864 result = memcmp (a[fv].s, b[fv].s, v->width);
868 if (scp->dirs[i] == SRT_DESCEND)
877 /* Compares record-run tuples A and B on run number first, then
878 on the current record according to SCP. */
880 compare_record_run (const struct record_run *a,
881 const struct record_run *b,
882 struct initial_run_state *irs)
884 if (a->run != b->run)
885 return a->run > b->run ? 1 : -1;
887 return compare_record (a->record->c.data, b->record->c.data,
888 irs->xsrt->scp, irs->idx_to_fv);
/* Heap comparison callback: the negation of compare_record_run(),
   so record-run tuples A and B are ordered on run number and
   then on sort keys, but in the reverse direction.  IRS is
   handed on to compare_record_run() unchanged. */
static int
compare_record_run_minheap (const void *a, const void *b, void *irs)
{
  int forward = compare_record_run (a, b, irs);

  return -forward;
}
900 /* Begins a new initial run, specifically its output file. */
902 start_run (struct initial_run_state *irs)
904 irs->file_idx = irs->xsrt->next_file_idx++;
906 irs->output_file = open_temp_file (irs->xsrt, irs->file_idx, "wb");
907 if (irs->output_file == NULL)
909 if (irs->last_output != NULL)
911 release_case (irs, irs->last_output);
912 irs->last_output = NULL;
916 /* Ends the current initial run. */
918 end_run (struct initial_run_state *irs)
920 struct external_sort *xsrt = irs->xsrt;
922 /* Record initial run. */
923 if (xsrt->run_cnt >= xsrt->run_cap)
927 = xrealloc (xsrt->initial_runs,
928 sizeof *xsrt->initial_runs * xsrt->run_cap);
930 xsrt->initial_runs[xsrt->run_cnt].file_idx = irs->file_idx;
931 xsrt->initial_runs[xsrt->run_cnt].case_cnt = irs->case_cnt;
934 /* Close file handle. */
935 if (irs->output_file != NULL
936 && !close_temp_file (irs->xsrt, irs->file_idx, irs->output_file))
938 irs->output_file = NULL;
941 /* Writes a record to the current initial run. */
943 output_record (struct initial_run_state *irs)
945 struct record_run *record_run;
947 /* Extract minimum case from heap. */
948 assert (irs->record_cnt > 0);
949 pop_heap (irs->records, irs->record_cnt--, sizeof *irs->records,
950 compare_record_run_minheap, irs);
951 record_run = irs->records + irs->record_cnt;
953 /* Bail if an error has occurred. */
957 /* Start new run if necessary. */
958 assert (record_run->run == irs->file_idx
959 || record_run->run == irs->xsrt->next_file_idx);
960 if (record_run->run != irs->file_idx)
965 assert (record_run->run == irs->file_idx);
969 if (irs->output_file != NULL
970 && !write_temp_file (irs->xsrt, irs->file_idx, irs->output_file,
971 &record_run->record->c, irs->xsrt->scp->case_size))
974 /* This record becomes last_output. */
975 if (irs->last_output != NULL)
976 release_case (irs, irs->last_output);
977 irs->last_output = record_run->record;
980 /* Gets a case from the free list in IRS. It is an error to call
981 this function if the free list is empty. */
982 static struct case_list *
983 grab_case (struct initial_run_state *irs)
987 assert (irs != NULL);
988 assert (irs->free_list != NULL);
991 irs->free_list = c->next;
995 /* Returns C to the free list in IRS. */
997 release_case (struct initial_run_state *irs, struct case_list *c)
999 assert (irs != NULL);
1002 c->next = irs->free_list;
1008 /* State of merging initial runs. */
1011 struct external_sort *xsrt; /* External sort state. */
1012 struct ccase **cases; /* Buffers. */
1013 size_t case_cnt; /* Number of buffers. */
1017 static int merge_once (struct merge_state *,
1018 const struct initial_run[], size_t,
1019 struct initial_run *);
1020 static int fill_run_buffer (struct merge_state *, struct run *);
1021 static int mod (int, int);
1023 /* Performs a series of P-way merges of initial runs
1026 merge (struct external_sort *xsrt)
1028 struct merge_state mrg; /* State of merge. */
1029 size_t approx_case_cost; /* Approximate memory cost of one case. */
1030 int max_order; /* Maximum order of merge. */
1031 size_t dummy_run_cnt; /* Number of dummy runs to insert. */
1037 /* Allocate as many cases as possible into cases. */
1038 approx_case_cost = (sizeof *mrg.cases
1039 + xsrt->scp->case_size + 4 * sizeof (void *));
1040 mrg.case_cnt = get_max_workspace() / approx_case_cost;
1041 mrg.cases = malloc (sizeof *mrg.cases * mrg.case_cnt);
1042 if (mrg.cases == NULL)
1044 for (i = 0; i < mrg.case_cnt; i++)
1046 mrg.cases[i] = malloc (xsrt->scp->case_size);
1047 if (mrg.cases[i] == NULL)
1053 if (mrg.case_cnt < MIN_BUFFER_TOTAL_SIZE_RECS)
1055 msg (SE, _("Out of memory. Could not allocate room for minimum of %d "
1056 "cases of %d bytes each. (PSPP workspace is currently "
1057 "restricted to a maximum of %d KB.)"),
1058 MIN_BUFFER_TOTAL_SIZE_RECS, approx_case_cost, get_max_workspace() / 1024);
1062 /* Determine maximum order of merge. */
1063 max_order = MAX_MERGE_ORDER;
1064 if (mrg.case_cnt / max_order < MIN_BUFFER_SIZE_RECS)
1065 max_order = mrg.case_cnt / MIN_BUFFER_SIZE_RECS;
1066 else if (mrg.case_cnt / max_order * xsrt->scp->case_size
1067 < MIN_BUFFER_SIZE_BYTES)
1068 max_order = mrg.case_cnt / (MIN_BUFFER_SIZE_BYTES / xsrt->scp->case_size);
1071 if (max_order > xsrt->run_cnt)
1072 max_order = xsrt->run_cnt;
1074 /* Repeatedly merge the P shortest existing runs until only one run
1076 make_heap (xsrt->initial_runs, xsrt->run_cnt, sizeof *xsrt->initial_runs,
1077 compare_initial_runs, NULL);
1078 dummy_run_cnt = mod (1 - (int) xsrt->run_cnt, max_order - 1);
1079 assert (max_order == 1
1080 || (xsrt->run_cnt + dummy_run_cnt) % (max_order - 1) == 1);
1081 while (xsrt->run_cnt > 1)
1083 struct initial_run output_run;
1087 /* Choose order of merge (max_order after first merge). */
1088 order = max_order - dummy_run_cnt;
1091 /* Choose runs to merge. */
1092 assert (xsrt->run_cnt >= order);
1093 for (i = 0; i < order; i++)
1094 pop_heap (xsrt->initial_runs, xsrt->run_cnt--,
1095 sizeof *xsrt->initial_runs,
1096 compare_initial_runs, NULL);
1099 if (!merge_once (&mrg, xsrt->initial_runs + xsrt->run_cnt, order,
1103 /* Add output run to heap. */
1104 xsrt->initial_runs[xsrt->run_cnt++] = output_run;
1105 push_heap (xsrt->initial_runs, xsrt->run_cnt, sizeof *xsrt->initial_runs,
1106 compare_initial_runs, NULL);
1109 /* Exactly one run is left, which contains the entire sorted
1110 file. We could use it to find a total case count. */
1111 assert (xsrt->run_cnt == 1);
1116 for (i = 0; i < mrg.case_cnt; i++)
1117 free (mrg.cases[i]);
1123 /* Modulo function as defined by Knuth. */
1131 else if (x > 0 && y > 0)
1133 else if (x < 0 && y > 0)
1134 return y - (-x) % y;
1140 /* A run of data for use in merging. */
1143 FILE *file; /* File that contains run. */
1144 int file_idx; /* Index of file that contains run. */
1145 struct ccase **buffer; /* Case buffer. */
1146 struct ccase **buffer_head; /* First unconsumed case in buffer. */
1147 struct ccase **buffer_tail; /* One past last unconsumed case in buffer. */
1148 size_t buffer_cap; /* Number of cases buffer can hold. */
1149 size_t unread_case_cnt; /* Number of cases not yet read. */
1152 /* Merges the RUN_CNT initial runs specified in INPUT_RUNS into a
1153 new run. Returns nonzero only if successful. Adds an entry
1154 to MRG->xsrt->runs for the output file if and only if the
1155 output file is actually created. Always deletes all the input
1158 merge_once (struct merge_state *mrg,
1159 const struct initial_run input_runs[],
1161 struct initial_run *output_run)
1163 struct run runs[MAX_MERGE_ORDER];
1164 FILE *output_file = NULL;
1168 /* Initialize runs[]. */
1169 for (i = 0; i < run_cnt; i++)
1171 runs[i].file = NULL;
1172 runs[i].file_idx = input_runs[i].file_idx;
1173 runs[i].buffer = mrg->cases + mrg->case_cnt / run_cnt * i;
1174 runs[i].buffer_head = runs[i].buffer;
1175 runs[i].buffer_tail = runs[i].buffer;
1176 runs[i].buffer_cap = mrg->case_cnt / run_cnt;
1177 runs[i].unread_case_cnt = input_runs[i].case_cnt;
1180 /* Open input files. */
1181 for (i = 0; i < run_cnt; i++)
1183 runs[i].file = open_temp_file (mrg->xsrt, runs[i].file_idx, "rb");
1184 if (runs[i].file == NULL)
1188 /* Create output file and count cases to be output. */
1189 output_run->file_idx = mrg->xsrt->next_file_idx++;
1190 output_run->case_cnt = 0;
1191 for (i = 0; i < run_cnt; i++)
1192 output_run->case_cnt += input_runs[i].case_cnt;
1193 output_file = open_temp_file (mrg->xsrt, output_run->file_idx, "wb");
1194 if (output_file == NULL)
1197 /* Prime buffers. */
1198 for (i = 0; i < run_cnt; i++)
1199 if (!fill_run_buffer (mrg, runs + i))
1205 struct run *min_run;
1209 for (i = 1; i < run_cnt; i++)
1210 if (compare_record ((*runs[i].buffer_head)->data,
1211 (*min_run->buffer_head)->data,
1212 mrg->xsrt->scp, NULL) < 0)
1215 /* Write minimum to output file. */
1216 if (!write_temp_file (mrg->xsrt, min_run->file_idx, output_file,
1217 (*min_run->buffer_head)->data,
1218 mrg->xsrt->scp->case_size))
1221 /* Remove case from buffer. */
1222 if (++min_run->buffer_head >= min_run->buffer_tail)
1224 /* Buffer is empty. Fill from file. */
1225 if (!fill_run_buffer (mrg, min_run))
1228 /* If buffer is still empty, delete its run. */
1229 if (min_run->buffer_head >= min_run->buffer_tail)
1231 close_temp_file (mrg->xsrt, min_run->file_idx, min_run->file);
1232 remove_temp_file (mrg->xsrt, min_run->file_idx);
1233 *min_run = runs[--run_cnt];
1235 /* We could donate the now-unused buffer space to
1241 /* Close output file. */
1242 close_temp_file (mrg->xsrt, output_run->file_idx, output_file);
1247 /* Close and remove output file. */
1248 if (output_file != NULL)
1250 close_temp_file (mrg->xsrt, output_run->file_idx, output_file);
1251 remove_temp_file (mrg->xsrt, output_run->file_idx);
1254 /* Close and remove any remaining input runs. */
1255 for (i = 0; i < run_cnt; i++)
1257 close_temp_file (mrg->xsrt, runs[i].file_idx, runs[i].file);
1258 remove_temp_file (mrg->xsrt, runs[i].file_idx);
1264 /* Reads as many cases as possible into RUN's buffer.
1265 Returns nonzero unless a disk error occurs. */
1267 fill_run_buffer (struct merge_state *mrg, struct run *run)
1269 run->buffer_head = run->buffer_tail = run->buffer;
1270 while (run->unread_case_cnt > 0
1271 && run->buffer_tail < run->buffer + run->buffer_cap)
1273 if (!read_temp_file (mrg->xsrt, run->file_idx, run->file,
1274 (*run->buffer_tail)->data,
1275 mrg->xsrt->scp->case_size))
1278 run->unread_case_cnt--;
1285 static struct case_source *
1286 sort_sink_make_source (struct case_sink *sink)
1288 struct initial_run_state *irs = sink->aux;
1290 return create_case_source (&sort_source_class, default_dict,
1294 static const struct case_sink_class sort_sink_class =
1300 sort_sink_make_source,
1303 struct sort_source_aux
1305 struct sort_cases_pgm *scp;
1307 write_case_func *write_case;
1308 write_case_data wc_data;
1311 /* Passes C to the write_case function. */
1313 sort_source_read_helper (const struct ccase *src, void *aux_)
1315 struct sort_source_aux *aux = aux_;
1317 memcpy (aux->dst, src, aux->scp->case_size);
1318 return aux->write_case (aux->wc_data);
1321 /* Reads all the records from the source stream and passes them
1324 sort_source_read (struct case_source *source,
1326 write_case_func *write_case, write_case_data wc_data)
1328 struct sort_cases_pgm *scp = source->aux;
1329 struct sort_source_aux aux;
1333 aux.write_case = write_case;
1334 aux.wc_data = wc_data;
1336 read_sort_output (scp, sort_source_read_helper, &aux);
1339 static void read_internal_sort_output (struct internal_sort *isrt,
1340 read_sort_output_func *, void *aux);
1341 static void read_external_sort_output (struct external_sort *xsrt,
1342 read_sort_output_func *, void *aux);
1344 /* Reads all the records from the output stream and passes them to the
1345 function provided, which must have an interface identical to
1348 read_sort_output (struct sort_cases_pgm *scp,
1349 read_sort_output_func *output_func, void *aux)
1351 assert ((scp->isrt != NULL) + (scp->xsrt != NULL) <= 1);
1352 if (scp->isrt != NULL)
1353 read_internal_sort_output (scp->isrt, output_func, aux);
1354 else if (scp->xsrt != NULL)
1355 read_external_sort_output (scp->xsrt, output_func, aux);
1358 /* No results. Probably an external sort that failed. */
1363 read_internal_sort_output (struct internal_sort *isrt,
1364 read_sort_output_func *output_func,
1369 for (case_idx = 0; case_idx < isrt->case_cnt; case_idx++)
1370 if (!output_func (isrt->cases[case_idx], aux))
1375 read_external_sort_output (struct external_sort *xsrt,
1376 read_sort_output_func *output_func, void *aux)
1383 assert (xsrt->run_cnt == 1);
1384 file_idx = xsrt->initial_runs[0].file_idx;
1386 file = open_temp_file (xsrt, file_idx, "rb");
1393 c = xmalloc (xsrt->scp->case_size);
1394 for (i = 0; i < xsrt->initial_runs[0].case_cnt; i++)
1396 if (!read_temp_file (xsrt, file_idx, file, c, xsrt->scp->case_size))
1402 if (!output_func (c, aux))
1406 close_temp_file (xsrt, file_idx, file);
1410 sort_source_destroy (struct case_source *source)
1412 struct sort_cases_pgm *scp = source->aux;
1414 destroy_sort_cases_pgm (scp);
1417 const struct case_source_class sort_source_class =
1422 sort_source_destroy,