1 /* PSPP - computes sample statistics.
2 Copyright (C) 1997-9, 2000 Free Software Foundation, Inc.
3 Written by Ben Pfaff <blp@gnu.org>.
5 This program is free software; you can redistribute it and/or
6 modify it under the terms of the GNU General Public License as
7 published by the Free Software Foundation; either version 2 of the
8 License, or (at your option) any later version.
10 This program is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with this program; if not, write to the Free Software
17 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
24 #include <data/any-writer.h>
25 #include <data/case-sink.h>
26 #include <data/case.h>
27 #include <data/casefile.h>
28 #include <data/dictionary.h>
29 #include <data/file-handle-def.h>
30 #include <data/procedure.h>
31 #include <data/settings.h>
32 #include <data/storage-stream.h>
33 #include <data/sys-file-writer.h>
34 #include <data/variable.h>
35 #include <language/command.h>
36 #include <language/data-io/file-handle.h>
37 #include <language/lexer/lexer.h>
38 #include <language/lexer/variable-parser.h>
39 #include <language/stats/sort-criteria.h>
40 #include <libpspp/alloc.h>
41 #include <libpspp/assertion.h>
42 #include <libpspp/message.h>
43 #include <libpspp/misc.h>
44 #include <libpspp/pool.h>
45 #include <libpspp/str.h>
46 #include <math/moments.h>
47 #include <math/sort.h>
50 #define _(msgid) gettext (msgid)
52 /* Argument for AGGREGATE function. */
55 double f; /* Numeric. */
56 char *c; /* Short or long string. */
59 /* Specifies how to make an aggregate variable. */
62 struct agr_var *next; /* Next in list. */
64 /* Collected during parsing. */
65 struct variable *src; /* Source variable. */
66 struct variable *dest; /* Target variable. */
67 int function; /* Function. */
68 int include_missing; /* 1=Include user-missing values. */
69 union agr_argument arg[2]; /* Arguments. */
71 /* Accumulated during AGGREGATE execution. */
76 struct moments1 *moments;
79 /* Aggregation functions. */
82 NONE, SUM, MEAN, SD, MAX, MIN, PGT, PLT, PIN, POUT, FGT, FLT, FIN,
83 FOUT, N, NU, NMISS, NUMISS, FIRST, LAST,
84 N_AGR_FUNCS, N_NO_VARS, NU_NO_VARS,
85 FUNC = 0x1f, /* Function mask. */
86 FSTRING = 1<<5, /* String function bit. */
89 /* Attributes of an aggregation function. */
92 const char *name; /* Aggregation function name. */
93 size_t n_args; /* Number of arguments. */
94 int alpha_type; /* When given ALPHA arguments, output type. */
95 struct fmt_spec format; /* Format spec if alpha_type != ALPHA. */
98 /* Attributes of aggregation functions. */
99 static const struct agr_func agr_func_tab[] =
101 {"<NONE>", 0, -1, {0, 0, 0}},
102 {"SUM", 0, -1, {FMT_F, 8, 2}},
103 {"MEAN", 0, -1, {FMT_F, 8, 2}},
104 {"SD", 0, -1, {FMT_F, 8, 2}},
105 {"MAX", 0, ALPHA, {-1, -1, -1}},
106 {"MIN", 0, ALPHA, {-1, -1, -1}},
107 {"PGT", 1, NUMERIC, {FMT_F, 5, 1}},
108 {"PLT", 1, NUMERIC, {FMT_F, 5, 1}},
109 {"PIN", 2, NUMERIC, {FMT_F, 5, 1}},
110 {"POUT", 2, NUMERIC, {FMT_F, 5, 1}},
111 {"FGT", 1, NUMERIC, {FMT_F, 5, 3}},
112 {"FLT", 1, NUMERIC, {FMT_F, 5, 3}},
113 {"FIN", 2, NUMERIC, {FMT_F, 5, 3}},
114 {"FOUT", 2, NUMERIC, {FMT_F, 5, 3}},
115 {"N", 0, NUMERIC, {FMT_F, 7, 0}},
116 {"NU", 0, NUMERIC, {FMT_F, 7, 0}},
117 {"NMISS", 0, NUMERIC, {FMT_F, 7, 0}},
118 {"NUMISS", 0, NUMERIC, {FMT_F, 7, 0}},
119 {"FIRST", 0, ALPHA, {-1, -1, -1}},
120 {"LAST", 0, ALPHA, {-1, -1, -1}},
121 {NULL, 0, -1, {-1, -1, -1}},
122 {"N", 0, NUMERIC, {FMT_F, 7, 0}},
123 {"NU", 0, NUMERIC, {FMT_F, 7, 0}},
126 /* Missing value types. */
127 enum missing_treatment
129 ITEMWISE, /* Missing values item by item. */
130 COLUMNWISE /* Missing values column by column. */
133 /* An entire AGGREGATE procedure. */
136 /* We have either an output file or a sink. */
137 struct any_writer *writer; /* Output file, or null if none. */
138 struct case_sink *sink; /* Sink, or null if none. */
140 /* Break variables. */
141 struct sort_criteria *sort; /* Sort criteria. */
142 struct variable **break_vars; /* Break variables. */
143 size_t break_var_cnt; /* Number of break variables. */
144 struct ccase break_case; /* Last values of break variables. */
146 enum missing_treatment missing; /* How to treat missing values. */
147 struct agr_var *agr_vars; /* First aggregate variable. */
148 struct dictionary *dict; /* Aggregate dictionary. */
149 int case_cnt; /* Counts aggregated cases. */
150 struct ccase agr_case; /* Aggregate case for output. */
153 static void initialize_aggregate_info (struct agr_proc *,
154 const struct ccase *);
157 static bool parse_aggregate_functions (struct agr_proc *);
158 static void agr_destroy (struct agr_proc *);
159 static bool aggregate_single_case (struct agr_proc *agr,
160 const struct ccase *input,
161 struct ccase *output);
162 static void dump_aggregate_info (struct agr_proc *agr, struct ccase *output);
164 /* Aggregating to the active file. */
165 static bool agr_to_active_file (const struct ccase *, void *aux);
167 /* Aggregating to a system file. */
168 static bool presorted_agr_to_sysfile (const struct ccase *, void *aux);
172 /* Parses and executes the AGGREGATE procedure. */
177 struct file_handle *out_file = NULL;
179 bool copy_documents = false;
180 bool presorted = false;
183 memset(&agr, 0 , sizeof (agr));
184 agr.missing = ITEMWISE;
185 case_nullify (&agr.break_case);
187 agr.dict = dict_create ();
188 dict_set_label (agr.dict, dict_get_label (dataset_dict (current_dataset)));
189 dict_set_documents (agr.dict, dict_get_documents (dataset_dict (current_dataset)));
191 /* OUTFILE subcommand must be first. */
192 if (!lex_force_match_id ("OUTFILE"))
195 if (!lex_match ('*'))
197 out_file = fh_parse (FH_REF_FILE | FH_REF_SCRATCH);
198 if (out_file == NULL)
202 /* Read most of the subcommands. */
207 if (lex_match_id ("MISSING"))
210 if (!lex_match_id ("COLUMNWISE"))
212 lex_error (_("while expecting COLUMNWISE"));
215 agr.missing = COLUMNWISE;
217 else if (lex_match_id ("DOCUMENT"))
218 copy_documents = true;
219 else if (lex_match_id ("PRESORTED"))
221 else if (lex_match_id ("BREAK"))
226 agr.sort = sort_parse_criteria (dataset_dict (current_dataset),
227 &agr.break_vars, &agr.break_var_cnt,
228 &saw_direction, NULL);
229 if (agr.sort == NULL)
232 for (i = 0; i < agr.break_var_cnt; i++)
233 dict_clone_var_assert (agr.dict, agr.break_vars[i],
234 agr.break_vars[i]->name);
236 /* BREAK must follow the options. */
241 lex_error (_("expecting BREAK"));
245 if (presorted && saw_direction)
246 msg (SW, _("When PRESORTED is specified, specifying sorting directions "
247 "with (A) or (D) has no effect. Output data will be sorted "
248 "the same way as the input data."));
250 /* Read in the aggregate functions. */
252 if (!parse_aggregate_functions (&agr))
255 /* Delete documents. */
257 dict_set_documents (agr.dict, NULL);
259 /* Cancel SPLIT FILE. */
260 dict_set_split_vars (agr.dict, NULL, 0);
264 case_create (&agr.agr_case, dict_get_next_value_idx (agr.dict));
266 /* Output to active file or external file? */
267 if (out_file == NULL)
269 /* The active file will be replaced by the aggregated data,
270 so TEMPORARY is moot. */
271 proc_cancel_temporary_transformations (current_dataset);
273 if (agr.sort != NULL && !presorted)
275 if (!sort_active_file_in_place (agr.sort))
279 agr.sink = create_case_sink (&storage_sink_class, agr.dict, NULL);
280 if (agr.sink->class->open != NULL)
281 agr.sink->class->open (agr.sink);
282 proc_set_sink (current_dataset,
283 create_case_sink (&null_sink_class,
284 dataset_dict (current_dataset), NULL));
285 if (!procedure (current_dataset,agr_to_active_file, &agr))
287 if (agr.case_cnt > 0)
289 dump_aggregate_info (&agr, &agr.agr_case);
290 if (!agr.sink->class->write (agr.sink, &agr.agr_case))
293 discard_variables (current_dataset);
294 dict_destroy (dataset_dict (current_dataset));
295 dataset_set_dict (current_dataset, agr.dict);
297 proc_set_source (current_dataset,
298 agr.sink->class->make_source (agr.sink));
299 free_case_sink (agr.sink);
303 agr.writer = any_writer_open (out_file, agr.dict);
304 if (agr.writer == NULL)
307 if (agr.sort != NULL && !presorted)
309 /* Sorting is needed. */
310 struct casefile *dst;
311 struct casereader *reader;
315 dst = sort_active_file_to_casefile (agr.sort);
318 reader = casefile_get_destructive_reader (dst);
319 while (ok && casereader_read_xfer (reader, &c))
321 if (aggregate_single_case (&agr, &c, &agr.agr_case))
322 ok = any_writer_write (agr.writer, &agr.agr_case);
325 casereader_destroy (reader);
327 ok = !casefile_error (dst);
328 casefile_destroy (dst);
334 /* Active file is already sorted. */
335 if (!procedure (current_dataset,presorted_agr_to_sysfile, &agr))
339 if (agr.case_cnt > 0)
341 dump_aggregate_info (&agr, &agr.agr_case);
342 any_writer_write (agr.writer, &agr.agr_case);
344 if (any_writer_error (agr.writer))
353 return CMD_CASCADING_FAILURE;
356 /* Parse all the aggregate functions. */
358 parse_aggregate_functions (struct agr_proc *agr)
360 struct agr_var *tail; /* Tail of linked list starting at agr->vars. */
362 /* Parse everything. */
371 const struct agr_func *function;
374 union agr_argument arg[2];
376 struct variable **src;
390 /* Parse the list of target variables. */
391 while (!lex_match ('='))
393 size_t n_dest_prev = n_dest;
395 if (!parse_DATA_LIST_vars (&dest, &n_dest,
396 PV_APPEND | PV_SINGLE | PV_NO_SCRATCH))
399 /* Assign empty labels. */
403 dest_label = xnrealloc (dest_label, n_dest, sizeof *dest_label);
404 for (j = n_dest_prev; j < n_dest; j++)
405 dest_label[j] = NULL;
408 if (token == T_STRING)
410 ds_truncate (&tokstr, 255);
411 dest_label[n_dest - 1] = ds_xstrdup (&tokstr);
416 /* Get the name of the aggregation function. */
419 lex_error (_("expecting aggregation function"));
424 if (tokid[strlen (tokid) - 1] == '.')
427 tokid[strlen (tokid) - 1] = 0;
430 for (function = agr_func_tab; function->name; function++)
431 if (!strcasecmp (function->name, tokid))
433 if (NULL == function->name)
435 msg (SE, _("Unknown aggregation function %s."), tokid);
438 func_index = function - agr_func_tab;
441 /* Check for leading lparen. */
442 if (!lex_match ('('))
445 func_index = N_NO_VARS;
446 else if (func_index == NU)
447 func_index = NU_NO_VARS;
450 lex_error (_("expecting `('"));
456 /* Parse list of source variables. */
458 int pv_opts = PV_NO_SCRATCH;
460 if (func_index == SUM || func_index == MEAN || func_index == SD)
461 pv_opts |= PV_NUMERIC;
462 else if (function->n_args)
463 pv_opts |= PV_SAME_TYPE;
465 if (!parse_variables (dataset_dict (current_dataset), &src, &n_src, pv_opts))
469 /* Parse function arguments, for those functions that
470 require arguments. */
471 if (function->n_args != 0)
472 for (i = 0; i < function->n_args; i++)
477 if (token == T_STRING)
479 arg[i].c = ds_xstrdup (&tokstr);
482 else if (lex_is_number ())
487 msg (SE, _("Missing argument %d to %s."), i + 1,
494 if (type != src[0]->type)
496 msg (SE, _("Arguments to %s must be of same type as "
497 "source variables."),
503 /* Trailing rparen. */
506 lex_error (_("expecting `)'"));
510 /* Now check that the number of source variables match
511 the number of target variables. If we check earlier
512 than this, the user can get very misleading error
513 message, i.e. `AGGREGATE x=SUM(y t).' will get this
514 error message when a proper message would be more
515 like `unknown variable t'. */
518 msg (SE, _("Number of source variables (%u) does not match "
519 "number of target variables (%u)."),
520 (unsigned) n_src, (unsigned) n_dest);
524 if ((func_index == PIN || func_index == POUT
525 || func_index == FIN || func_index == FOUT)
526 && ((src[0]->type == NUMERIC && arg[0].f > arg[1].f)
527 || (src[0]->type == ALPHA
528 && str_compare_rpad (arg[0].c, arg[1].c) > 0)))
530 union agr_argument t = arg[0];
534 msg (SW, _("The value arguments passed to the %s function "
535 "are out-of-order. They will be treated as if "
536 "they had been specified in the correct order."),
541 /* Finally add these to the linked list of aggregation
543 for (i = 0; i < n_dest; i++)
545 struct agr_var *v = xmalloc (sizeof *v);
547 /* Add variable to chain. */
548 if (agr->agr_vars != NULL)
556 /* Create the target variable in the aggregate
559 struct variable *destvar;
561 v->function = func_index;
567 if (src[i]->type == ALPHA)
569 v->function |= FSTRING;
570 v->string = xmalloc (src[i]->width);
573 if (function->alpha_type == ALPHA)
574 destvar = dict_clone_var (agr->dict, v->src, dest[i]);
577 assert (v->src->type == NUMERIC
578 || function->alpha_type == NUMERIC);
579 destvar = dict_create_var (agr->dict, dest[i], 0);
582 if ((func_index == N || func_index == NMISS)
583 && dict_get_weight (dataset_dict (current_dataset)) != NULL)
584 destvar->print = destvar->write = f8_2;
586 destvar->print = destvar->write = function->format;
591 destvar = dict_create_var (agr->dict, dest[i], 0);
592 if (func_index == N_NO_VARS
593 && dict_get_weight (dataset_dict (current_dataset)) != NULL)
594 destvar->print = destvar->write = f8_2;
596 destvar->print = destvar->write = function->format;
601 msg (SE, _("Variable name %s is not unique within the "
602 "aggregate file dictionary, which contains "
603 "the aggregate variables and the break "
612 destvar->label = dest_label[i];
613 dest_label[i] = NULL;
619 v->include_missing = include_missing;
625 if (v->src->type == NUMERIC)
626 for (j = 0; j < function->n_args; j++)
627 v->arg[j].f = arg[j].f;
629 for (j = 0; j < function->n_args; j++)
630 v->arg[j].c = xstrdup (arg[j].c);
634 if (src != NULL && src[0]->type == ALPHA)
635 for (i = 0; i < function->n_args; i++)
645 if (!lex_match ('/'))
650 lex_error ("expecting end of command");
656 for (i = 0; i < n_dest; i++)
659 free (dest_label[i]);
665 if (src && n_src && src[0]->type == ALPHA)
666 for (i = 0; i < function->n_args; i++)
679 agr_destroy (struct agr_proc *agr)
681 struct agr_var *iter, *next;
683 any_writer_close (agr->writer);
684 if (agr->sort != NULL)
685 sort_destroy_criteria (agr->sort);
686 free (agr->break_vars);
687 case_destroy (&agr->break_case);
688 for (iter = agr->agr_vars; iter; iter = next)
692 if (iter->function & FSTRING)
697 n_args = agr_func_tab[iter->function & FUNC].n_args;
698 for (i = 0; i < n_args; i++)
699 free (iter->arg[i].c);
702 else if (iter->function == SD)
703 moments1_destroy (iter->moments);
706 if (agr->dict != NULL)
707 dict_destroy (agr->dict);
709 case_destroy (&agr->agr_case);
714 static void accumulate_aggregate_info (struct agr_proc *,
715 const struct ccase *);
716 static void dump_aggregate_info (struct agr_proc *, struct ccase *);
718 /* Processes a single case INPUT for aggregation. If output is
719 warranted, writes it to OUTPUT and returns true.
720 Otherwise, returns false and OUTPUT is unmodified. */
722 aggregate_single_case (struct agr_proc *agr,
723 const struct ccase *input, struct ccase *output)
725 bool finished_group = false;
727 if (agr->case_cnt++ == 0)
728 initialize_aggregate_info (agr, input);
729 else if (case_compare (&agr->break_case, input,
730 agr->break_vars, agr->break_var_cnt))
732 dump_aggregate_info (agr, output);
733 finished_group = true;
735 initialize_aggregate_info (agr, input);
738 accumulate_aggregate_info (agr, input);
739 return finished_group;
742 /* Accumulates aggregation data from the case INPUT. */
744 accumulate_aggregate_info (struct agr_proc *agr,
745 const struct ccase *input)
747 struct agr_var *iter;
749 bool bad_warn = true;
751 weight = dict_get_case_weight (dataset_dict (current_dataset), input, &bad_warn);
753 for (iter = agr->agr_vars; iter; iter = iter->next)
756 const union value *v = case_data (input, iter->src->fv);
758 if ((!iter->include_missing
759 && mv_is_value_missing (&iter->src->miss, v))
760 || (iter->include_missing && iter->src->type == NUMERIC
763 switch (iter->function)
766 case NMISS | FSTRING:
767 iter->dbl[0] += weight;
770 case NUMISS | FSTRING:
778 /* This is horrible. There are too many possibilities. */
779 switch (iter->function)
782 iter->dbl[0] += v->f * weight;
786 iter->dbl[0] += v->f * weight;
787 iter->dbl[1] += weight;
790 moments1_add (iter->moments, v->f, weight);
793 iter->dbl[0] = max (iter->dbl[0], v->f);
797 if (memcmp (iter->string, v->s, iter->src->width) < 0)
798 memcpy (iter->string, v->s, iter->src->width);
802 iter->dbl[0] = min (iter->dbl[0], v->f);
806 if (memcmp (iter->string, v->s, iter->src->width) > 0)
807 memcpy (iter->string, v->s, iter->src->width);
812 if (v->f > iter->arg[0].f)
813 iter->dbl[0] += weight;
814 iter->dbl[1] += weight;
818 if (memcmp (iter->arg[0].c, v->s, iter->src->width) < 0)
819 iter->dbl[0] += weight;
820 iter->dbl[1] += weight;
824 if (v->f < iter->arg[0].f)
825 iter->dbl[0] += weight;
826 iter->dbl[1] += weight;
830 if (memcmp (iter->arg[0].c, v->s, iter->src->width) > 0)
831 iter->dbl[0] += weight;
832 iter->dbl[1] += weight;
836 if (iter->arg[0].f <= v->f && v->f <= iter->arg[1].f)
837 iter->dbl[0] += weight;
838 iter->dbl[1] += weight;
842 if (memcmp (iter->arg[0].c, v->s, iter->src->width) <= 0
843 && memcmp (iter->arg[1].c, v->s, iter->src->width) >= 0)
844 iter->dbl[0] += weight;
845 iter->dbl[1] += weight;
849 if (iter->arg[0].f > v->f || v->f > iter->arg[1].f)
850 iter->dbl[0] += weight;
851 iter->dbl[1] += weight;
855 if (memcmp (iter->arg[0].c, v->s, iter->src->width) > 0
856 || memcmp (iter->arg[1].c, v->s, iter->src->width) < 0)
857 iter->dbl[0] += weight;
858 iter->dbl[1] += weight;
862 iter->dbl[0] += weight;
875 case FIRST | FSTRING:
878 memcpy (iter->string, v->s, iter->src->width);
887 memcpy (iter->string, v->s, iter->src->width);
891 case NMISS | FSTRING:
893 case NUMISS | FSTRING:
894 /* Our value is not missing or it would have been
895 caught earlier. Nothing to do. */
901 switch (iter->function)
904 iter->dbl[0] += weight;
915 /* We've come to a record that differs from the previous in one or
916 more of the break variables. Make an output record from the
917 accumulated statistics in the OUTPUT case. */
919 dump_aggregate_info (struct agr_proc *agr, struct ccase *output)
925 for (i = 0; i < agr->break_var_cnt; i++)
927 struct variable *v = agr->break_vars[i];
928 memcpy (case_data_rw (output, value_idx),
929 case_data (&agr->break_case, v->fv),
930 sizeof (union value) * v->nv);
938 for (i = agr->agr_vars; i; i = i->next)
940 union value *v = case_data_rw (output, i->dest->fv);
942 if (agr->missing == COLUMNWISE && i->missing != 0
943 && (i->function & FUNC) != N && (i->function & FUNC) != NU
944 && (i->function & FUNC) != NMISS && (i->function & FUNC) != NUMISS)
946 if (i->dest->type == ALPHA)
947 memset (v->s, ' ', i->dest->width);
956 v->f = i->int1 ? i->dbl[0] : SYSMIS;
959 v->f = i->dbl[1] != 0.0 ? i->dbl[0] / i->dbl[1] : SYSMIS;
965 /* FIXME: we should use two passes. */
966 moments1_calculate (i->moments, NULL, NULL, &variance,
968 if (variance != SYSMIS)
969 v->f = sqrt (variance);
976 v->f = i->int1 ? i->dbl[0] : SYSMIS;
981 memcpy (v->s, i->string, i->dest->width);
983 memset (v->s, ' ', i->dest->width);
993 v->f = i->dbl[1] ? i->dbl[0] / i->dbl[1] : SYSMIS;
1002 case POUT | FSTRING:
1003 v->f = i->dbl[1] ? i->dbl[0] / i->dbl[1] * 100.0 : SYSMIS;
1015 v->f = i->int1 ? i->dbl[0] : SYSMIS;
1017 case FIRST | FSTRING:
1018 case LAST | FSTRING:
1020 memcpy (v->s, i->string, i->dest->width);
1022 memset (v->s, ' ', i->dest->width);
1031 case NMISS | FSTRING:
1035 case NUMISS | FSTRING:
1045 /* Resets the state for all the aggregate functions. */
1047 initialize_aggregate_info (struct agr_proc *agr, const struct ccase *input)
1049 struct agr_var *iter;
1051 case_destroy (&agr->break_case);
1052 case_clone (&agr->break_case, input);
1054 for (iter = agr->agr_vars; iter; iter = iter->next)
1057 iter->dbl[0] = iter->dbl[1] = iter->dbl[2] = 0.0;
1058 iter->int1 = iter->int2 = 0;
1059 switch (iter->function)
1062 iter->dbl[0] = DBL_MAX;
1065 memset (iter->string, 255, iter->src->width);
1068 iter->dbl[0] = -DBL_MAX;
1071 memset (iter->string, 0, iter->src->width);
1074 if (iter->moments == NULL)
1075 iter->moments = moments1_create (MOMENT_VARIANCE);
1077 moments1_clear (iter->moments);
1085 /* Aggregate each case as it comes through. Cases which aren't needed
1087 Returns true if successful, false if an I/O error occurred. */
1089 agr_to_active_file (const struct ccase *c, void *agr_)
1091 struct agr_proc *agr = agr_;
1093 if (aggregate_single_case (agr, c, &agr->agr_case))
1094 return agr->sink->class->write (agr->sink, &agr->agr_case);
1099 /* Aggregate the current case and output it if we passed a
1102 presorted_agr_to_sysfile (const struct ccase *c, void *agr_)
1104 struct agr_proc *agr = agr_;
1106 if (aggregate_single_case (agr, c, &agr->agr_case))
1107 return any_writer_write (agr->writer, &agr->agr_case);