1 /* PSPP - a program for statistical analysis.
2 Copyright (C) 1997-9, 2000, 2006, 2008, 2009, 2010, 2011, 2012, 2014 Free Software Foundation, Inc.
4 This program is free software: you can redistribute it and/or modify
5 it under the terms of the GNU General Public License as published by
6 the Free Software Foundation, either version 3 of the License, or
7 (at your option) any later version.
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 GNU General Public License for more details.
14 You should have received a copy of the GNU General Public License
15 along with this program. If not, see <http://www.gnu.org/licenses/>. */
19 #include "language/commands/aggregate.h"
23 #include "data/any-writer.h"
24 #include "data/case.h"
25 #include "data/casegrouper.h"
26 #include "data/casereader.h"
27 #include "data/casewriter.h"
28 #include "data/dataset.h"
29 #include "data/dictionary.h"
30 #include "data/file-handle-def.h"
31 #include "data/format.h"
32 #include "data/settings.h"
33 #include "data/subcase.h"
34 #include "data/sys-file-writer.h"
35 #include "data/variable.h"
36 #include "language/command.h"
37 #include "language/commands/file-handle.h"
38 #include "language/lexer/lexer.h"
39 #include "language/lexer/variable-parser.h"
40 #include "language/commands/sort-criteria.h"
41 #include "libpspp/assertion.h"
42 #include "libpspp/i18n.h"
43 #include "libpspp/message.h"
44 #include "libpspp/misc.h"
45 #include "libpspp/pool.h"
46 #include "libpspp/str.h"
47 #include "math/moments.h"
48 #include "math/percentiles.h"
49 #include "math/sort.h"
50 #include "math/statistic.h"
52 #include "gl/c-strcase.h"
53 #include "gl/minmax.h"
54 #include "gl/xalloc.h"
/* Message shorthands: _() passes MSGID through gettext(), while N_() expands
   to MSGID unchanged. */
57 #define _(msgid) gettext (msgid)
58 #define N_(msgid) msgid
60 /* Argument for AGGREGATE function.
62 Only one of the members is used, so this could be a union, but it's simpler
to keep it as a struct. */
66 double f; /* Numeric. */
67 struct substring s; /* String. */
70 /* Specifies how to make an aggregate variable. */
73 /* Collected during parsing. */
74 const struct variable *src; /* Source variable (NULL when the parser had no source list). */
75 struct variable *dest; /* Target variable, created in the aggregate dictionary. */
76 enum agr_function function; /* Function (index into agr_func_tab). */
77 enum mv_class exclude; /* Classes of missing values to exclude. */
78 struct agr_argument arg[2]; /* Arguments. */
80 /* Accumulated during AGGREGATE execution. */
82 double W; /* Total non-missing weight. */
/* Running moments: fed by moments1_add() in accumulate_aggregate_info(),
   read by moments1_calculate() in dump_aggregate_info(), and destroyed in
   agr_destroy() when the function is AGRF_SD. */
86 struct moments1 *moments;
/* The next four members are set up together in initialize_aggregate_info():
   a private dictionary holding the "subject" and "weight" variables, and a
   sorting case writer ordered by "subject".  accumulate_aggregate_info()
   writes (source value, case weight) pairs to the writer and
   dump_aggregate_info() reads them back to compute the 0.5 percentile. */
88 struct dictionary *dict;
89 struct variable *subject;
90 struct variable *weight;
91 struct casewriter *writer;
94 /* Attributes of aggregation functions. */
95 const struct agr_func agr_func_tab[] =
/* Each AGRF(...) use expands to a designated initializer stored at index
   ENUM.  The entry's format type is FMT_F when W is positive and -1
   otherwise; W and D are stored as the format's width and decimals. */
97 #define AGRF(ENUM, NAME, DESCRIPTION, SRC_VARS, N_ARGS, ALPHA_TYPE, W, D) \
98 [ENUM] = { NAME, DESCRIPTION, SRC_VARS, N_ARGS, ALPHA_TYPE, \
99 { .type = (W) > 0 ? FMT_F : -1, .w = W, .d = D } },
/* Sentinel entry: parse_agr_func_name() stops scanning the table at the
   first entry whose name is NULL. */
102 {NULL, NULL, AGR_SV_NO, 0, -1, {-1, -1, -1}},
105 /* Missing value types.  Stored in struct agr_proc's `missing' member. */
106 enum missing_treatment
108 ITEMWISE, /* Missing values item by item. */
109 COLUMNWISE /* Missing values column by column: in dump_aggregate_info(),
   an aggregate variable that saw a missing value gets a missing result,
   except for the N, NU, NMISS and NUMISS functions. */
112 /* An entire AGGREGATE procedure.  One instance lives on the stack of
   cmd_aggregate() and is released with agr_destroy(). */
115 /* Break variables. */
116 struct subcase sort; /* Sort criteria (break variables). */
117 const struct variable **break_vars; /* Break variables (array freed in agr_destroy()). */
118 size_t break_n_vars; /* Number of break variables. */
120 enum missing_treatment missing; /* How to treat missing values. */
121 struct agr_var *agr_vars; /* Aggregate variables (array grown by the parser with x2nrealloc()). */
123 struct dictionary *dict; /* Aggregate dictionary. */
124 const struct dictionary *src_dict; /* Dictionary of the source data; consulted for the
   case weight and for the number of source variables. */
125 int n_cases; /* Counts aggregated cases. */
127 bool add_variables; /* True iff the aggregated variables should
128 be appended to the existing dictionary */
/* Forward declarations of the static helpers defined further down in this
   file. */
131 static void initialize_aggregate_info (struct agr_proc *);
133 static void accumulate_aggregate_info (struct agr_proc *,
134 const struct ccase *);
136 static bool parse_aggregate_functions (struct lexer *, const struct dictionary *,
138 static void agr_destroy (struct agr_proc *);
139 static void dump_aggregate_info (const struct agr_proc *agr,
140 struct casewriter *output,
141 const struct ccase *break_case);
145 /* Parses and executes the AGGREGATE procedure.

   LEXER supplies the command syntax and DS is the dataset whose cases are
   aggregated.  The visible flow is: parse OUTFILE and MODE, then the
   optional /MISSING, /DOCUMENT and /PRESORTED subcommands, then /BREAK and
   the aggregate functions; open an output writer; sort the input by the
   break variables unless PRESORTED was given; finally accumulate and dump
   one group of cases at a time.  The failure path visible at the end
   returns CMD_CASCADING_FAILURE. */
147 cmd_aggregate (struct lexer *lexer, struct dataset *ds)
149 struct dictionary *dict = dataset_dict (ds);
150 struct agr_proc agr = {
154 struct file_handle *out_file = NULL;
155 struct casereader *input = NULL;
156 struct casewriter *output = NULL;
158 bool copy_documents = false;
159 bool presorted = false;
160 int addvariables_ofs = 0;
162 /* OUTFILE subcommand must be first. */
163 if (lex_match_phrase (lexer, "/OUTFILE") || lex_match_id (lexer, "OUTFILE"))
165 lex_match (lexer, T_EQUALS);
/* Anything other than `*' is parsed as a file handle. */
166 if (!lex_match (lexer, T_ASTERISK))
168 out_file = fh_parse (lexer, FH_REF_FILE, dataset_session (ds));
169 if (out_file == NULL)
/* MODE is only examined while no output file handle has been set. */
173 if (!out_file && lex_match_id (lexer, "MODE"))
175 lex_match (lexer, T_EQUALS);
176 if (lex_match_id (lexer, "ADDVARIABLES"))
/* Remember where the ADDVARIABLES token was so that a later diagnostic can
   point at it. */
178 addvariables_ofs = lex_ofs (lexer) - 1;
179 agr.add_variables = true;
182 else if (lex_match_id (lexer, "REPLACE"))
183 agr.add_variables = false;
186 lex_error_expecting (lexer, "ADDVARIABLES", "REPLACE");
/* NOTE(review): the branch enclosing the next assignment is not visible
   here; presumably it is the default when OUTFILE is omitted -- confirm. */
193 agr.add_variables = true;
/* COLUMNWISE is the only keyword accepted after /MISSING. */
197 if (lex_match_phrase (lexer, "/MISSING"))
199 lex_match (lexer, T_EQUALS);
200 if (!lex_match_id (lexer, "COLUMNWISE"))
202 lex_error_expecting (lexer, "COLUMNWISE");
205 agr.missing = COLUMNWISE;
208 int presorted_ofs = 0;
210 if (lex_match_phrase (lexer, "/DOCUMENT"))
211 copy_documents = true;
212 else if (lex_match_phrase (lexer, "/PRESORTED"))
215 presorted_ofs = lex_ofs (lexer) - 1;
/* With add_variables the aggregate dictionary starts as a clone of DICT;
   the alternative below creates an empty dictionary in DICT's encoding. */
220 if (agr.add_variables)
221 agr.dict = dict_clone (dict);
223 agr.dict = dict_create (dict_get_encoding (dict));
225 dict_set_label (agr.dict, dict_get_label (dict));
226 dict_set_documents (agr.dict, dict_get_documents (dict));
228 if (lex_match_phrase (lexer, "/BREAK"))
230 lex_match (lexer, T_EQUALS);
/* break_start..break_end bracket the break variable tokens for use in the
   diagnostics below. */
232 int break_start = lex_ofs (lexer);
233 if (!parse_sort_criteria (lexer, dict, &agr.sort, &agr.break_vars,
236 int break_end = lex_ofs (lexer) - 1;
237 agr.break_n_vars = subcase_get_n_fields (&agr.sort);
/* Without add_variables, the break variables are cloned into the aggregate
   dictionary. */
239 if (! agr.add_variables)
240 for (size_t i = 0; i < agr.break_n_vars; i++)
241 dict_clone_var_assert (agr.dict, agr.break_vars[i]);
/* Warn that explicit sort directions are ignored for presorted input, then
   add a note explaining why the input counts as presorted. */
243 if (presorted && saw_direction)
245 lex_ofs_msg (lexer, SW, break_start, break_end,
246 _("When the input data is presorted, specifying "
247 "sorting directions with (A) or (D) has no effect. "
248 "Output data will be sorted the same way as the "
251 lex_ofs_msg (lexer, SN, presorted_ofs, presorted_ofs,
252 _("The PRESORTED subcommand state that the "
253 "input data is presorted."));
/* NOTE(review): the message above reads "state"; grammatically it should be
   "states".  It is a runtime string, so it is left unchanged here. */
254 else if (addvariables_ofs)
255 lex_ofs_msg (lexer, SN, addvariables_ofs, addvariables_ofs,
256 _("ADDVARIABLES implies that the input data "
259 msg (SN, _("The input data must be presorted because the "
260 "OUTFILE subcommand is not specified."));
264 /* Read in the aggregate functions. */
265 if (!parse_aggregate_functions (lexer, dict, &agr))
268 /* Delete documents. */
270 dict_clear_documents (agr.dict);
272 /* Cancel SPLIT FILE. */
273 dict_clear_split_vars (agr.dict);
278 if (out_file == NULL)
280 /* The active dataset will be replaced by the aggregated data,
281 so TEMPORARY is moot. */
282 proc_make_temporary_transformations_permanent (ds);
283 proc_discard_output (ds);
284 output = autopaging_writer_create (dict_get_proto (agr.dict));
288 output = any_writer_open (out_file, agr.dict);
293 input = proc_open (ds);
/* Sort by the break variables unless there are none or the input was
   declared presorted. */
294 if (!subcase_is_empty (&agr.sort) && !presorted)
296 input = sort_execute (input, &agr.sort);
297 subcase_clear (&agr.sort);
/* Each loop iteration handles one group handed out by the casegrouper that
   was created over the break variables; the group reader is destroyed by
   the loop's increment expression. */
300 struct casegrouper *grouper;
301 struct casereader *group;
302 for (grouper = casegrouper_create_vars (input, agr.break_vars,
304 casegrouper_get_next_group (grouper, &group);
305 casereader_destroy (group))
307 struct casereader *placeholder = NULL;
/* Peek at the group's first case; it supplies the break values when one
   record per group is written. */
308 struct ccase *c = casereader_peek (group, 0);
312 casereader_destroy (group);
316 initialize_aggregate_info (&agr);
/* When adding variables, keep a clone of the group so that its cases can be
   read a second time after accumulation. */
318 if (agr.add_variables)
319 placeholder = casereader_clone (group);
323 for (; (cg = casereader_read (group)) != NULL; case_unref (cg))
324 accumulate_aggregate_info (&agr, cg);
/* With add_variables, one output case is written per input case; the
   alternative writes a single case for the group based on C. */
328 if (agr.add_variables)
331 for (; (cg = casereader_read (placeholder)) != NULL; case_unref (cg))
332 dump_aggregate_info (&agr, output, cg);
334 casereader_destroy (placeholder);
338 dump_aggregate_info (&agr, output, c);
342 if (!casegrouper_destroy (grouper))
345 bool ok = proc_commit (ds);
/* With no output file, the cases written so far become the dataset's new
   source and agr.dict becomes its dictionary. */
350 if (out_file == NULL)
352 struct casereader *next_input = casewriter_make_reader (output);
353 if (next_input == NULL)
356 dataset_set_dict (ds, agr.dict);
357 dataset_set_source (ds, next_input);
362 ok = casewriter_destroy (output);
375 casewriter_destroy (output);
378 return CMD_CASCADING_FAILURE;
/* Parses the name of an aggregation function at LEXER's current token.

   The token must be an identifier.  *EXCLUDE is set to MV_SYSTEM when
   ss_chomp_byte() removes a '.' from the name and to MV_ANY otherwise.  The
   remaining name is compared case-insensitively against agr_func_tab; on a
   match the entry's index is stored in *FUNC_INDEX.  An error is reported
   through LEXER when the token is not an identifier or matches no entry.
   The caller tests the result as a boolean success flag. */
382 parse_agr_func_name (struct lexer *lexer, int *func_index,
383 enum mv_class *exclude)
385 if (lex_token (lexer) != T_ID)
387 lex_error (lexer, _("Syntax error expecting aggregation function."));
391 struct substring name = lex_tokss (lexer);
392 *exclude = ss_chomp_byte (&name, '.') ? MV_SYSTEM : MV_ANY;
/* The table ends at the entry whose name is NULL. */
394 for (const struct agr_func *f = agr_func_tab; f->name; f++)
395 if (ss_equals_case (ss_cstr (f->name), name))
397 *func_index = f - agr_func_tab;
401 lex_error (lexer, _("Unknown aggregation function %s."), lex_tokcstr (lexer));
405 /* Parse all the aggregate functions.

   Each specification has the visible shape: a list of target variable
   names (each optionally followed by a string label), `=', a function
   name, and an optional parenthesized list of source variables and
   arguments.  Every parsed target is appended to AGR->agr_vars and created
   as a variable in AGR->dict.  DICT is the dictionary in which source
   variables are looked up.  cmd_aggregate() tests the result as a boolean
   success flag. */
407 parse_aggregate_functions (struct lexer *lexer, const struct dictionary *dict,
408 struct agr_proc *agr)
410 if (!lex_force_match (lexer, T_SLASH))
/* NOTE(review): starting_n_vars is taken from DICT but is compared below
   against variable indexes in AGR->dict.  Without add_variables these are
   different dictionaries, so the "Duplicate target variable name" branch
   may not be reached for a repeated target -- confirm. */
413 size_t starting_n_vars = dict_get_n_vars (dict);
414 size_t allocated_agr_vars = 0;
416 /* Parse everything. */
420 char **dest_label = NULL;
423 struct agr_argument arg[2] = { { .f = 0 }, { .f = 0 } };
425 const struct variable **src = NULL;
427 /* Parse the list of target variables. */
428 int dst_start_ofs = lex_ofs (lexer);
429 while (!lex_match (lexer, T_EQUALS))
431 size_t n_vars_prev = n_vars;
433 if (!parse_DATA_LIST_vars (lexer, dict, &dest, &n_vars,
434 (PV_APPEND | PV_SINGLE | PV_NO_SCRATCH
438 /* Assign empty labels. */
439 dest_label = xnrealloc (dest_label, n_vars, sizeof *dest_label);
440 for (size_t j = n_vars_prev; j < n_vars; j++)
441 dest_label[j] = NULL;
/* A string token here is stored as the label of the most recently parsed
   target variable. */
443 if (lex_is_string (lexer))
445 dest_label[n_vars - 1] = xstrdup (lex_tokcstr (lexer));
/* The loop above ends by consuming `=', so the last target token lies two
   positions back. */
449 int dst_end_ofs = lex_ofs (lexer) - 2;
451 /* Get the name of the aggregation function. */
453 enum mv_class exclude;
454 if (!parse_agr_func_name (lexer, &func_index, &exclude))
456 const struct agr_func *function = &agr_func_tab[func_index];
458 /* Check for leading lparen. */
459 if (!lex_match (lexer, T_LPAREN))
/* For a function that requires source variables, lex_force_match() is
   called again for the missing parenthesis; its result is marked UNUSED. */
461 if (function->src_vars == AGR_SV_YES)
463 bool ok UNUSED = lex_force_match (lexer, T_LPAREN);
469 /* Parse list of source variables. */
/* SUM, MEAN, MEDIAN and SD require numeric sources; any other function
   that takes arguments requires all sources to be of the same type. */
470 int pv_opts = PV_NO_SCRATCH;
471 if (func_index == AGRF_SUM || func_index == AGRF_MEAN
472 || func_index == AGRF_MEDIAN || func_index == AGRF_SD)
473 pv_opts |= PV_NUMERIC;
474 else if (function->n_args)
475 pv_opts |= PV_SAME_TYPE;
477 int src_start_ofs = lex_ofs (lexer);
479 if (!parse_variables_const (lexer, dict, &src, &n_src, pv_opts))
481 int src_end_ofs = lex_ofs (lexer) - 1;
483 /* Parse function arguments, for those functions that
484 require arguments. */
485 int args_start_ofs = 0;
486 if (function->n_args != 0)
487 for (size_t i = 0; i < function->n_args; i++)
489 lex_match (lexer, T_COMMA);
492 if (lex_is_string (lexer))
494 else if (lex_is_number (lexer))
498 lex_error (lexer, _("Missing argument %zu to %s."),
499 i + 1, function->name);
/* The argument's type must equal the type of the first source variable. */
503 if (type != var_get_type (src[0]))
505 msg (SE, _("Arguments to %s must be of same type as "
506 "source variables."),
508 if (type == VAL_NUMERIC)
510 lex_next_msg (lexer, SN, 0, 0,
511 _("The argument is numeric."));
512 lex_ofs_msg (lexer, SN, src_start_ofs, src_end_ofs,
513 _("The variables have string type."));
517 lex_next_msg (lexer, SN, 0, 0,
518 _("The argument is a string."));
519 lex_ofs_msg (lexer, SN, src_start_ofs, src_end_ofs,
520 _("The variables are numeric."));
526 args_start_ofs = lex_ofs (lexer);
/* Numeric arguments are stored as doubles; the alternative recodes the
   token text from UTF-8 into the aggregate dictionary's encoding. */
527 if (type == VAL_NUMERIC)
528 arg[i].f = lex_tokval (lexer);
530 arg[i].s = recode_substring_pool (dict_get_encoding (agr->dict),
531 "UTF-8", lex_tokss (lexer),
535 int args_end_ofs = lex_ofs (lexer) - 1;
537 /* Trailing rparen. */
538 if (!lex_force_match (lexer, T_RPAREN))
541 /* Now check that the number of source variables match
542 the number of target variables. If we check earlier
543 than this, the user can get very misleading error
544 message, i.e. `AGGREGATE x=SUM(y t).' will get this
545 error message when a proper message would be more
546 like `unknown variable t'. */
549 msg (SE, _("Number of source variables (%zu) does not match "
550 "number of target variables (%zu)."),
552 lex_ofs_msg (lexer, SN, src_start_ofs, src_end_ofs,
553 _("These are the source variables."));
554 lex_ofs_msg (lexer, SN, dst_start_ofs, dst_end_ofs,
555 _("These are the target variables."));
/* For PIN, POUT, FIN and FOUT, detect a first argument greater than the
   second (numeric comparison, or right-padded buffer comparison for
   strings); a warning is issued saying that they are treated as if given
   in the correct order. */
559 if ((func_index == AGRF_PIN || func_index == AGRF_POUT
560 || func_index == AGRF_FIN || func_index == AGRF_FOUT)
561 && (var_is_numeric (src[0])
562 ? arg[0].f > arg[1].f
563 : buf_compare_rpad (arg[0].s.string, arg[0].s.length,
564 arg[1].s.string, arg[1].s.length) > 0))
566 struct agr_argument tmp = arg[0];
570 lex_ofs_msg (lexer, SW, args_start_ofs, args_end_ofs,
571 _("The value arguments passed to the %s function "
572 "are out of order. They will be treated as if "
573 "they had been specified in the correct order."),
578 /* Finally add these to the aggregation variables. */
579 for (size_t i = 0; i < n_vars; i++)
/* Reject a target name that already exists in the aggregate dictionary;
   the message depends on where the existing variable came from. */
581 const struct variable *existing_var = dict_lookup_var (agr->dict,
585 if (var_get_dict_index (existing_var) >= starting_n_vars)
586 lex_ofs_error (lexer, dst_start_ofs, dst_end_ofs,
587 _("Duplicate target variable name %s."),
589 else if (agr->add_variables)
590 lex_ofs_error (lexer, dst_start_ofs, dst_end_ofs,
591 _("Variable name %s duplicates the name of a "
592 "variable in the active file dictionary."),
595 lex_ofs_error (lexer, dst_start_ofs, dst_end_ofs,
596 _("Variable name %s duplicates the name of a "
597 "break variable."), dest[i]);
/* Grow the agr_vars array on demand and claim the next slot. */
602 if (agr->n_agr_vars >= allocated_agr_vars)
603 agr->agr_vars = x2nrealloc (agr->agr_vars, &allocated_agr_vars,
604 sizeof *agr->agr_vars);
605 struct agr_var *v = &agr->agr_vars[agr->n_agr_vars++];
606 *v = (struct agr_var) {
609 .function = func_index,
610 .src = src ? src[i] : NULL,
613 /* Create the target variable in the aggregate dictionary. */
/* String sources get a scratch buffer as wide as the source variable. */
614 if (v->src && var_is_alpha (v->src))
615 v->string = xmalloc (var_get_width (v->src));
/* Functions whose alpha_type is VAL_STRING clone the source variable under
   the target name; the alternative creates a new width-0 variable. */
617 if (v->src && function->alpha_type == VAL_STRING)
618 v->dest = dict_clone_var_as_assert (agr->dict, v->src, dest[i]);
621 v->dest = dict_create_var_assert (agr->dict, dest[i], 0);
/* N and NMISS use format F8.2 when the source dictionary has a weight
   variable; otherwise the function table's format is applied. */
624 if ((func_index == AGRF_N || func_index == AGRF_NMISS)
625 && dict_get_weight (dict) != NULL)
626 f = fmt_for_output (FMT_F, 8, 2);
628 f = function->format;
629 var_set_both_formats (v->dest, f);
632 var_set_label (v->dest, dest_label[i]);
/* Each aggregate variable receives its own copy of any string argument. */
635 for (size_t j = 0; j < function->n_args; j++)
636 v->arg[j] = (struct agr_argument) {
638 .s = arg[j].s.string ? ss_clone (arg[j].s) : ss_empty (),
642 ss_dealloc (&arg[0].s);
643 ss_dealloc (&arg[1].s);
646 for (size_t i = 0; i < n_vars; i++)
649 free (dest_label[i]);
654 if (!lex_match (lexer, T_SLASH))
656 if (lex_token (lexer) == T_ENDCMD)
659 lex_error (lexer, "Syntax error expecting end of command.");
/* NOTE(review): unlike the other lex_error() calls in this file, the message
   above is not wrapped in _() -- confirm whether that is intended. */
/* The statements below repeat the release of the target names' labels and
   of the argument strings. */
665 for (size_t i = 0; i < n_vars; i++)
668 free (dest_label[i]);
672 ss_dealloc (&arg[0].s);
673 ss_dealloc (&arg[1].s);
/* Releases the resources held by AGR: the sort criteria, the break variable
   array, each aggregate variable's argument strings, moments (for AGRF_SD)
   and private dictionary, the agr_vars array itself, and finally the
   aggregate dictionary if one was created.  AGR itself is not freed. */
682 agr_destroy (struct agr_proc *agr)
684 subcase_uninit (&agr->sort);
685 free (agr->break_vars);
686 for (size_t i = 0; i < agr->n_agr_vars; i++)
688 struct agr_var *av = &agr->agr_vars[i];
690 ss_dealloc (&av->arg[0].s);
691 ss_dealloc (&av->arg[1].s);
694 if (av->function == AGRF_SD)
695 moments1_destroy (av->moments);
697 dict_unref (av->dict);
699 free (agr->agr_vars);
700 if (agr->dict != NULL)
701 dict_unref (agr->dict);
706 /* Accumulates aggregation data from the case INPUT.

   The case weight is obtained once from AGR->src_dict.  Then, for each
   aggregate variable, the source value is read from INPUT and folded into
   that variable's running state according to its function. */
708 accumulate_aggregate_info (struct agr_proc *agr, const struct ccase *input)
710 bool bad_warn = true;
711 double weight = dict_get_case_weight (agr->src_dict, input, &bad_warn);
712 for (size_t i = 0; i < agr->n_agr_vars; i++)
714 struct agr_var *av = &agr->agr_vars[i];
717 bool is_string = var_is_alpha (av->src);
718 const union value *v = case_data (input, av->src);
719 int src_width = var_get_width (av->src);
/* VS is a substring form of the value, built with value_ss() when the
   source has nonzero width. */
720 const struct substring vs = (src_width > 0
721 ? value_ss (v, src_width)
/* A value whose missing class is among those in av->exclude takes the
   missing-value path, which records that a missing value was seen. */
724 if (var_is_value_missing (av->src, v) & av->exclude)
726 switch (av->function)
760 av->saw_missing = true;
764 /* This is horrible. There are too many possibilities. */
766 switch (av->function)
769 av->dbl += v->f * weight;
774 av->dbl += v->f * weight;
/* Queue a (source value, case weight) pair on the sorting writer;
   dump_aggregate_info() later reads it back to compute a percentile. */
779 struct ccase *cout = case_create (casewriter_get_proto (av->writer));
780 *case_num_rw (cout, av->subject) = case_num (input, av->src);
781 *case_num_rw (cout, av->weight) = weight;
782 casewriter_write (av->writer, cout);
787 moments1_add (av->moments, v->f, weight);
/* Running maximum: numeric in av->dbl, or a bytewise comparison against the
   av->string buffer. */
792 av->dbl = MAX (av->dbl, v->f);
793 else if (memcmp (av->string, v->s, src_width) < 0)
794 memcpy (av->string, v->s, src_width);
/* Running minimum, mirroring the maximum above. */
800 av->dbl = MIN (av->dbl, v->f);
801 else if (memcmp (av->string, v->s, src_width) > 0)
802 memcpy (av->string, v->s, src_width);
803 av->dbl = MIN (av->dbl, v->f);
/* NOTE(review): the statement just above repeats the numeric MIN update from
   three lines earlier and, as far as is visible here, is not guarded by the
   is_string test -- confirm whether it is intended. */
/* The comparisons below test the value against arg[0] (and arg[1] for the
   two-sided forms), using ss_compare_rpad() for strings and direct
   comparison for numbers. */
811 ? ss_compare_rpad (av->arg[0].s, vs) < 0
812 : v->f > av->arg[0].f)
820 ? ss_compare_rpad (av->arg[0].s, vs) > 0
821 : v->f < av->arg[0].f)
829 ? (ss_compare_rpad (av->arg[0].s, vs) <= 0
830 && ss_compare_rpad (av->arg[1].s, vs) >= 0)
831 : av->arg[0].f <= v->f && v->f <= av->arg[1].f)
839 ? (ss_compare_rpad (av->arg[0].s, vs) > 0
840 || ss_compare_rpad (av->arg[1].s, vs) < 0)
841 : av->arg[0].f > v->f || v->f > av->arg[1].f)
857 memcpy (av->string, v->s, src_width);
866 memcpy (av->string, v->s, src_width);
874 /* Our value is not missing or it would have been
875 caught earlier. Nothing to do. */
882 switch (av->function)
919 /* Writes an aggregated record to OUTPUT.

   A new case is created with the prototype of AGR->dict.  With
   add_variables, the first dict_get_n_vars (AGR->src_dict) values are
   copied from BREAK_CASE; the loop that follows instead copies only the
   break variables' values.  Each aggregate variable's result is then
   stored in its destination variable and the case is written to OUTPUT. */
921 dump_aggregate_info (const struct agr_proc *agr, struct casewriter *output, const struct ccase *break_case)
923 struct ccase *c = case_create (dict_get_proto (agr->dict));
925 if (agr->add_variables)
927 case_copy (c, 0, break_case, 0, dict_get_n_vars (agr->src_dict));
933 for (size_t i = 0; i < agr->break_n_vars; i++)
935 const struct variable *v = agr->break_vars[i];
936 value_copy (case_data_rw_idx (c, value_idx),
937 case_data (break_case, v),
943 for (size_t i = 0; i < agr->n_agr_vars; i++)
945 struct agr_var *av = &agr->agr_vars[i];
946 union value *v = case_data_rw (c, av->dest);
947 int width = var_get_width (av->dest);
/* COLUMNWISE: a variable that saw any missing value yields a missing
   result, except for the N, NU, NMISS and NUMISS functions.  The
   variable's case writer is destroyed on this path. */
949 if (agr->missing == COLUMNWISE && av->saw_missing
950 && av->function != AGRF_N
951 && av->function != AGRF_NU
952 && av->function != AGRF_NMISS
953 && av->function != AGRF_NUMISS)
955 value_set_missing (v, width);
956 casewriter_destroy (av->writer);
960 switch (av->function)
/* The accumulated value is reported only when av->int1 is nonzero;
   otherwise the result is SYSMIS. */
963 v->f = av->int1 ? av->dbl : SYSMIS;
/* Accumulated value divided by total weight; SYSMIS when the weight is 0. */
967 v->f = av->W != 0.0 ? av->dbl / av->W : SYSMIS;
/* Turn the sorting writer into a reader and accumulate it into a 0.5
   percentile order statistic, evaluated with PC_HAVERAGE. */
974 struct percentile *median = percentile_create (0.5, av->W);
975 struct order_stats *os = &median->parent;
976 struct casereader *sorted_reader = casewriter_make_reader (av->writer);
979 order_stats_accumulate (&os, 1,
984 av->dbl = percentile_calculate (median, PC_HAVERAGE);
985 statistic_destroy (&median->parent.parent);
/* Square root of the variance from the moments; SYSMIS if the variance is
   SYSMIS. */
995 moments1_calculate (av->moments, NULL, NULL, &variance,
997 v->f = variance != SYSMIS ? sqrt (variance) : SYSMIS;
1006 v->f = av->int1 ? av->dbl : SYSMIS;
1010 memcpy (v->s, av->string, width);
1012 value_set_missing (v, width);
/* Accumulated value as a fraction of the total weight, and the same value
   scaled by 100; SYSMIS when the weight is 0. */
1020 v->f = av->W ? av->dbl / av->W : SYSMIS;
1027 v->f = av->W ? av->dbl / av->W * 100.0 : SYSMIS;
1052 casewriter_write (output, c);
1055 /* Resets the state for all the aggregate functions.

   cmd_aggregate() calls this before accumulating each group of cases. */
1057 initialize_aggregate_info (struct agr_proc *agr)
1059 for (size_t i = 0; i < agr->n_agr_vars; i++)
1061 struct agr_var *av = &agr->agr_vars[i];
1062 av->saw_missing = false;
1063 av->dbl = av->W = 0.0;
/* WIDTH is the source variable's width, or 0 when there is no source. */
1066 int width = av->src ? var_get_width (av->src) : 0;
1067 switch (av->function)
/* Fill the string buffer with 0xff bytes.  NOTE(review): presumably the
   starting value for the string minimum, which accumulation replaces
   whenever memcmp() finds a smaller value -- confirm, as the case label is
   not visible here. */
1073 memset (av->string, 255, width);
/* Fill the string buffer with zero bytes (presumably the starting value for
   the string maximum -- confirm). */
1080 memset (av->string, 0, width);
/* Build a two-column, width-0 case prototype, a private UTF-8 dictionary
   with "subject" and "weight" variables, and a sorting writer ordered
   ascending by "subject".  The temporary ordering and the local prototype
   reference are released once the writer exists. */
1085 struct caseproto *proto = caseproto_create ();
1086 proto = caseproto_add_width (proto, 0);
1087 proto = caseproto_add_width (proto, 0);
1090 av->dict = dict_create ("UTF-8");
1092 av->subject = dict_create_var (av->dict, "subject", 0);
1094 av->weight = dict_create_var (av->dict, "weight", 0);
1096 struct subcase ordering;
1097 subcase_init_var (&ordering, av->subject, SC_ASCEND);
1098 av->writer = sort_create_writer (&ordering, proto);
1099 subcase_uninit (&ordering);
1100 caseproto_unref (proto);
/* The moments object is created the first time it is needed.
   NOTE(review): the clear below presumably runs on later calls only -- the
   enclosing branch is not visible here. */
1105 if (av->moments == NULL)
1106 av->moments = moments1_create (MOMENT_VARIANCE);
1108 moments1_clear (av->moments);