1 /* PSPP - computes sample statistics.
2 Copyright (C) 1997-9, 2000 Free Software Foundation, Inc.
3 Written by Ben Pfaff <blp@gnu.org>.
5 This program is free software; you can redistribute it and/or
6 modify it under the terms of the GNU General Public License as
7 published by the Free Software Foundation; either version 2 of the
8 License, or (at your option) any later version.
10 This program is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with this program; if not, write to the Free Software
17 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
21 #include <libpspp/message.h>
23 #include <libpspp/alloc.h>
24 #include <data/any-writer.h>
25 #include <data/case.h>
26 #include <data/casefile.h>
27 #include <language/command.h>
28 #include <data/dictionary.h>
29 #include <libpspp/message.h>
30 #include <data/file-handle-def.h>
31 #include <language/data-io/file-handle.h>
32 #include <language/lexer/lexer.h>
33 #include <language/stats/sort-criteria.h>
34 #include <libpspp/misc.h>
35 #include <math/moments.h>
36 #include <libpspp/pool.h>
37 #include <data/settings.h>
38 #include <data/sys-file-writer.h>
39 #include <math/sort.h>
40 #include <libpspp/str.h>
41 #include <data/variable.h>
42 #include <procedure.h>
45 #define _(msgid) gettext (msgid)
47 /* Specifies how to make an aggregate variable. */
50 struct agr_var *next; /* Next in list. */
52 /* Collected during parsing. */
53 struct variable *src; /* Source variable. */
54 struct variable *dest; /* Target variable. */
55 int function; /* Function. */
56 int include_missing; /* 1=Include user-missing values. */
57 union value arg[2]; /* Arguments. */
59 /* Accumulated during AGGREGATE execution. */
64 struct moments1 *moments;
67 /* Aggregation functions. */
70 NONE, SUM, MEAN, SD, MAX, MIN, PGT, PLT, PIN, POUT, FGT, FLT, FIN,
71 FOUT, N, NU, NMISS, NUMISS, FIRST, LAST,
72 N_AGR_FUNCS, N_NO_VARS, NU_NO_VARS,
73 FUNC = 0x1f, /* Function mask. */
74 FSTRING = 1<<5, /* String function bit. */
77 /* Attributes of an aggregation function. */
80 const char *name; /* Aggregation function name. */
81 size_t n_args; /* Number of arguments. */
82 int alpha_type; /* When given ALPHA arguments, output type. */
83 struct fmt_spec format; /* Format spec if alpha_type != ALPHA. */
86 /* Attributes of aggregation functions. */
87 static const struct agr_func agr_func_tab[] =
89 {"<NONE>", 0, -1, {0, 0, 0}},
90 {"SUM", 0, -1, {FMT_F, 8, 2}},
91 {"MEAN", 0, -1, {FMT_F, 8, 2}},
92 {"SD", 0, -1, {FMT_F, 8, 2}},
93 {"MAX", 0, ALPHA, {-1, -1, -1}},
94 {"MIN", 0, ALPHA, {-1, -1, -1}},
95 {"PGT", 1, NUMERIC, {FMT_F, 5, 1}},
96 {"PLT", 1, NUMERIC, {FMT_F, 5, 1}},
97 {"PIN", 2, NUMERIC, {FMT_F, 5, 1}},
98 {"POUT", 2, NUMERIC, {FMT_F, 5, 1}},
99 {"FGT", 1, NUMERIC, {FMT_F, 5, 3}},
100 {"FLT", 1, NUMERIC, {FMT_F, 5, 3}},
101 {"FIN", 2, NUMERIC, {FMT_F, 5, 3}},
102 {"FOUT", 2, NUMERIC, {FMT_F, 5, 3}},
103 {"N", 0, NUMERIC, {FMT_F, 7, 0}},
104 {"NU", 0, NUMERIC, {FMT_F, 7, 0}},
105 {"NMISS", 0, NUMERIC, {FMT_F, 7, 0}},
106 {"NUMISS", 0, NUMERIC, {FMT_F, 7, 0}},
107 {"FIRST", 0, ALPHA, {-1, -1, -1}},
108 {"LAST", 0, ALPHA, {-1, -1, -1}},
109 {NULL, 0, -1, {-1, -1, -1}},
110 {"N", 0, NUMERIC, {FMT_F, 7, 0}},
111 {"NU", 0, NUMERIC, {FMT_F, 7, 0}},
114 /* Missing value types. */
115 enum missing_treatment
117 ITEMWISE, /* Missing values item by item. */
118 COLUMNWISE /* Missing values column by column. */
121 /* An entire AGGREGATE procedure. */
124 /* We have either an output file or a sink. */
125 struct any_writer *writer; /* Output file, or null if none. */
126 struct case_sink *sink; /* Sink, or null if none. */
128 /* Break variables. */
129 struct sort_criteria *sort; /* Sort criteria. */
130 struct variable **break_vars; /* Break variables. */
131 size_t break_var_cnt; /* Number of break variables. */
132 struct ccase break_case; /* Last values of break variables. */
134 enum missing_treatment missing; /* How to treat missing values. */
135 struct agr_var *agr_vars; /* First aggregate variable. */
136 struct dictionary *dict; /* Aggregate dictionary. */
137 int case_cnt; /* Counts aggregated cases. */
138 struct ccase agr_case; /* Aggregate case for output. */
141 static void initialize_aggregate_info (struct agr_proc *,
142 const struct ccase *);
145 static int parse_aggregate_functions (struct agr_proc *);
146 static void agr_destroy (struct agr_proc *);
147 static int aggregate_single_case (struct agr_proc *agr,
148 const struct ccase *input,
149 struct ccase *output);
150 static void dump_aggregate_info (struct agr_proc *agr, struct ccase *output);
152 /* Aggregating to the active file. */
153 static bool agr_to_active_file (struct ccase *, void *aux);
155 /* Aggregating to a system file. */
156 static bool presorted_agr_to_sysfile (struct ccase *, void *aux);
160 /* Parses and executes the AGGREGATE procedure. */
165 struct file_handle *out_file = NULL;
167 bool copy_documents = false;
168 bool presorted = false;
171 memset(&agr, 0 , sizeof (agr));
172 agr.missing = ITEMWISE;
173 case_nullify (&agr.break_case);
175 agr.dict = dict_create ();
176 dict_set_label (agr.dict, dict_get_label (default_dict));
177 dict_set_documents (agr.dict, dict_get_documents (default_dict));
179 /* OUTFILE subcommand must be first. */
180 if (!lex_force_match_id ("OUTFILE"))
183 if (!lex_match ('*'))
185 out_file = fh_parse (FH_REF_FILE | FH_REF_SCRATCH);
186 if (out_file == NULL)
190 /* Read most of the subcommands. */
195 if (lex_match_id ("MISSING"))
198 if (!lex_match_id ("COLUMNWISE"))
200 lex_error (_("while expecting COLUMNWISE"));
203 agr.missing = COLUMNWISE;
205 else if (lex_match_id ("DOCUMENT"))
206 copy_documents = true;
207 else if (lex_match_id ("PRESORTED"))
209 else if (lex_match_id ("BREAK"))
214 agr.sort = sort_parse_criteria (default_dict,
215 &agr.break_vars, &agr.break_var_cnt,
216 &saw_direction, NULL);
217 if (agr.sort == NULL)
220 for (i = 0; i < agr.break_var_cnt; i++)
221 dict_clone_var_assert (agr.dict, agr.break_vars[i],
222 agr.break_vars[i]->name);
224 /* BREAK must follow the options. */
229 lex_error (_("expecting BREAK"));
233 if (presorted && saw_direction)
234 msg (SW, _("When PRESORTED is specified, specifying sorting directions "
235 "with (A) or (D) has no effect. Output data will be sorted "
236 "the same way as the input data."));
238 /* Read in the aggregate functions. */
240 if (!parse_aggregate_functions (&agr))
243 /* Delete documents. */
245 dict_set_documents (agr.dict, NULL);
247 /* Cancel SPLIT FILE. */
248 dict_set_split_vars (agr.dict, NULL, 0);
252 case_create (&agr.agr_case, dict_get_next_value_idx (agr.dict));
254 /* Output to active file or external file? */
255 if (out_file == NULL)
257 /* The active file will be replaced by the aggregated data,
258 so TEMPORARY is moot. */
261 if (agr.sort != NULL && !presorted)
263 if (!sort_active_file_in_place (agr.sort))
267 agr.sink = create_case_sink (&storage_sink_class, agr.dict, NULL);
268 if (agr.sink->class->open != NULL)
269 agr.sink->class->open (agr.sink);
270 vfm_sink = create_case_sink (&null_sink_class, default_dict, NULL);
271 if (!procedure (agr_to_active_file, &agr))
273 if (agr.case_cnt > 0)
275 dump_aggregate_info (&agr, &agr.agr_case);
276 if (!agr.sink->class->write (agr.sink, &agr.agr_case))
279 dict_destroy (default_dict);
280 default_dict = agr.dict;
282 vfm_source = agr.sink->class->make_source (agr.sink);
283 free_case_sink (agr.sink);
287 agr.writer = any_writer_open (out_file, agr.dict);
288 if (agr.writer == NULL)
291 if (agr.sort != NULL && !presorted)
293 /* Sorting is needed. */
294 struct casefile *dst;
295 struct casereader *reader;
299 dst = sort_active_file_to_casefile (agr.sort);
302 reader = casefile_get_destructive_reader (dst);
303 while (ok && casereader_read_xfer (reader, &c))
305 if (aggregate_single_case (&agr, &c, &agr.agr_case))
306 ok = any_writer_write (agr.writer, &agr.agr_case);
309 casereader_destroy (reader);
311 ok = !casefile_error (dst);
312 casefile_destroy (dst);
318 /* Active file is already sorted. */
319 if (!procedure (presorted_agr_to_sysfile, &agr))
323 if (agr.case_cnt > 0)
325 dump_aggregate_info (&agr, &agr.agr_case);
326 any_writer_write (agr.writer, &agr.agr_case);
328 if (any_writer_error (agr.writer))
337 return CMD_CASCADING_FAILURE;
340 /* Parse all the aggregate functions. */
342 parse_aggregate_functions (struct agr_proc *agr)
344 struct agr_var *tail; /* Tail of linked list starting at agr->vars. */
346 /* Parse everything. */
355 const struct agr_func *function;
360 struct variable **src;
374 /* Parse the list of target variables. */
375 while (!lex_match ('='))
377 size_t n_dest_prev = n_dest;
379 if (!parse_DATA_LIST_vars (&dest, &n_dest,
380 PV_APPEND | PV_SINGLE | PV_NO_SCRATCH))
383 /* Assign empty labels. */
387 dest_label = xnrealloc (dest_label, n_dest, sizeof *dest_label);
388 for (j = n_dest_prev; j < n_dest; j++)
389 dest_label[j] = NULL;
392 if (token == T_STRING)
394 ds_truncate (&tokstr, 255);
395 dest_label[n_dest - 1] = xstrdup (ds_c_str (&tokstr));
400 /* Get the name of the aggregation function. */
403 lex_error (_("expecting aggregation function"));
408 if (tokid[strlen (tokid) - 1] == '.')
411 tokid[strlen (tokid) - 1] = 0;
414 for (function = agr_func_tab; function->name; function++)
415 if (!strcasecmp (function->name, tokid))
417 if (NULL == function->name)
419 msg (SE, _("Unknown aggregation function %s."), tokid);
422 func_index = function - agr_func_tab;
425 /* Check for leading lparen. */
426 if (!lex_match ('('))
429 func_index = N_NO_VARS;
430 else if (func_index == NU)
431 func_index = NU_NO_VARS;
434 lex_error (_("expecting `('"));
440 /* Parse list of source variables. */
442 int pv_opts = PV_NO_SCRATCH;
444 if (func_index == SUM || func_index == MEAN || func_index == SD)
445 pv_opts |= PV_NUMERIC;
446 else if (function->n_args)
447 pv_opts |= PV_SAME_TYPE;
449 if (!parse_variables (default_dict, &src, &n_src, pv_opts))
453 /* Parse function arguments, for those functions that
454 require arguments. */
455 if (function->n_args != 0)
456 for (i = 0; i < function->n_args; i++)
461 if (token == T_STRING)
463 arg[i].c = xstrdup (ds_c_str (&tokstr));
466 else if (lex_is_number ())
471 msg (SE, _("Missing argument %d to %s."), i + 1,
478 if (type != src[0]->type)
480 msg (SE, _("Arguments to %s must be of same type as "
481 "source variables."),
487 /* Trailing rparen. */
490 lex_error (_("expecting `)'"));
494 /* Now check that the number of source variables match
495 the number of target variables. If we check earlier
496 than this, the user can get very misleading error
497 message, i.e. `AGGREGATE x=SUM(y t).' will get this
498 error message when a proper message would be more
499 like `unknown variable t'. */
502 msg (SE, _("Number of source variables (%u) does not match "
503 "number of target variables (%u)."),
504 (unsigned) n_src, (unsigned) n_dest);
508 if ((func_index == PIN || func_index == POUT
509 || func_index == FIN || func_index == FOUT)
510 && ((src[0]->type == NUMERIC && arg[0].f > arg[1].f)
511 || (src[0]->type == ALPHA
512 && str_compare_rpad (arg[0].c, arg[1].c) > 0)))
514 union value t = arg[0];
518 msg (SW, _("The value arguments passed to the %s function "
519 "are out-of-order. They will be treated as if "
520 "they had been specified in the correct order."),
525 /* Finally add these to the linked list of aggregation
527 for (i = 0; i < n_dest; i++)
529 struct agr_var *v = xmalloc (sizeof *v);
531 /* Add variable to chain. */
532 if (agr->agr_vars != NULL)
540 /* Create the target variable in the aggregate
543 struct variable *destvar;
545 v->function = func_index;
551 if (src[i]->type == ALPHA)
553 v->function |= FSTRING;
554 v->string = xmalloc (src[i]->width);
557 if (function->alpha_type == ALPHA)
558 destvar = dict_clone_var (agr->dict, v->src, dest[i]);
561 assert (v->src->type == NUMERIC
562 || function->alpha_type == NUMERIC);
563 destvar = dict_create_var (agr->dict, dest[i], 0);
566 if ((func_index == N || func_index == NMISS)
567 && dict_get_weight (default_dict) != NULL)
568 destvar->print = destvar->write = f8_2;
570 destvar->print = destvar->write = function->format;
575 destvar = dict_create_var (agr->dict, dest[i], 0);
576 if (func_index == N_NO_VARS
577 && dict_get_weight (default_dict) != NULL)
578 destvar->print = destvar->write = f8_2;
580 destvar->print = destvar->write = function->format;
585 msg (SE, _("Variable name %s is not unique within the "
586 "aggregate file dictionary, which contains "
587 "the aggregate variables and the break "
596 destvar->label = dest_label[i];
597 dest_label[i] = NULL;
603 v->include_missing = include_missing;
609 if (v->src->type == NUMERIC)
610 for (j = 0; j < function->n_args; j++)
611 v->arg[j].f = arg[j].f;
613 for (j = 0; j < function->n_args; j++)
614 v->arg[j].c = xstrdup (arg[j].c);
618 if (src != NULL && src[0]->type == ALPHA)
619 for (i = 0; i < function->n_args; i++)
629 if (!lex_match ('/'))
634 lex_error ("expecting end of command");
640 for (i = 0; i < n_dest; i++)
643 free (dest_label[i]);
649 if (src && n_src && src[0]->type == ALPHA)
650 for (i = 0; i < function->n_args; i++)
663 agr_destroy (struct agr_proc *agr)
665 struct agr_var *iter, *next;
667 any_writer_close (agr->writer);
668 if (agr->sort != NULL)
669 sort_destroy_criteria (agr->sort);
670 free (agr->break_vars);
671 case_destroy (&agr->break_case);
672 for (iter = agr->agr_vars; iter; iter = next)
676 if (iter->function & FSTRING)
681 n_args = agr_func_tab[iter->function & FUNC].n_args;
682 for (i = 0; i < n_args; i++)
683 free (iter->arg[i].c);
686 else if (iter->function == SD)
687 moments1_destroy (iter->moments);
690 if (agr->dict != NULL)
691 dict_destroy (agr->dict);
693 case_destroy (&agr->agr_case);
698 static void accumulate_aggregate_info (struct agr_proc *,
699 const struct ccase *);
700 static void dump_aggregate_info (struct agr_proc *, struct ccase *);
702 /* Processes a single case INPUT for aggregation. If output is
703 warranted, writes it to OUTPUT and returns nonzero.
704 Otherwise, returns zero and OUTPUT is unmodified. */
706 aggregate_single_case (struct agr_proc *agr,
707 const struct ccase *input, struct ccase *output)
709 bool finished_group = false;
711 if (agr->case_cnt++ == 0)
712 initialize_aggregate_info (agr, input);
713 else if (case_compare (&agr->break_case, input,
714 agr->break_vars, agr->break_var_cnt))
716 dump_aggregate_info (agr, output);
717 finished_group = true;
719 initialize_aggregate_info (agr, input);
722 accumulate_aggregate_info (agr, input);
723 return finished_group;
726 /* Accumulates aggregation data from the case INPUT. */
728 accumulate_aggregate_info (struct agr_proc *agr,
729 const struct ccase *input)
731 struct agr_var *iter;
735 weight = dict_get_case_weight (default_dict, input, &bad_warn);
737 for (iter = agr->agr_vars; iter; iter = iter->next)
740 const union value *v = case_data (input, iter->src->fv);
742 if ((!iter->include_missing
743 && mv_is_value_missing (&iter->src->miss, v))
744 || (iter->include_missing && iter->src->type == NUMERIC
747 switch (iter->function)
750 case NMISS | FSTRING:
751 iter->dbl[0] += weight;
754 case NUMISS | FSTRING:
762 /* This is horrible. There are too many possibilities. */
763 switch (iter->function)
766 iter->dbl[0] += v->f * weight;
770 iter->dbl[0] += v->f * weight;
771 iter->dbl[1] += weight;
774 moments1_add (iter->moments, v->f, weight);
777 iter->dbl[0] = max (iter->dbl[0], v->f);
781 if (memcmp (iter->string, v->s, iter->src->width) < 0)
782 memcpy (iter->string, v->s, iter->src->width);
786 iter->dbl[0] = min (iter->dbl[0], v->f);
790 if (memcmp (iter->string, v->s, iter->src->width) > 0)
791 memcpy (iter->string, v->s, iter->src->width);
796 if (v->f > iter->arg[0].f)
797 iter->dbl[0] += weight;
798 iter->dbl[1] += weight;
802 if (memcmp (iter->arg[0].c, v->s, iter->src->width) < 0)
803 iter->dbl[0] += weight;
804 iter->dbl[1] += weight;
808 if (v->f < iter->arg[0].f)
809 iter->dbl[0] += weight;
810 iter->dbl[1] += weight;
814 if (memcmp (iter->arg[0].c, v->s, iter->src->width) > 0)
815 iter->dbl[0] += weight;
816 iter->dbl[1] += weight;
820 if (iter->arg[0].f <= v->f && v->f <= iter->arg[1].f)
821 iter->dbl[0] += weight;
822 iter->dbl[1] += weight;
826 if (memcmp (iter->arg[0].c, v->s, iter->src->width) <= 0
827 && memcmp (iter->arg[1].c, v->s, iter->src->width) >= 0)
828 iter->dbl[0] += weight;
829 iter->dbl[1] += weight;
833 if (iter->arg[0].f > v->f || v->f > iter->arg[1].f)
834 iter->dbl[0] += weight;
835 iter->dbl[1] += weight;
839 if (memcmp (iter->arg[0].c, v->s, iter->src->width) > 0
840 || memcmp (iter->arg[1].c, v->s, iter->src->width) < 0)
841 iter->dbl[0] += weight;
842 iter->dbl[1] += weight;
846 iter->dbl[0] += weight;
859 case FIRST | FSTRING:
862 memcpy (iter->string, v->s, iter->src->width);
871 memcpy (iter->string, v->s, iter->src->width);
875 case NMISS | FSTRING:
877 case NUMISS | FSTRING:
878 /* Our value is not missing or it would have been
879 caught earlier. Nothing to do. */
885 switch (iter->function)
888 iter->dbl[0] += weight;
899 /* We've come to a record that differs from the previous in one or
900 more of the break variables. Make an output record from the
901 accumulated statistics in the OUTPUT case. */
903 dump_aggregate_info (struct agr_proc *agr, struct ccase *output)
909 for (i = 0; i < agr->break_var_cnt; i++)
911 struct variable *v = agr->break_vars[i];
912 memcpy (case_data_rw (output, value_idx),
913 case_data (&agr->break_case, v->fv),
914 sizeof (union value) * v->nv);
922 for (i = agr->agr_vars; i; i = i->next)
924 union value *v = case_data_rw (output, i->dest->fv);
926 if (agr->missing == COLUMNWISE && i->missing != 0
927 && (i->function & FUNC) != N && (i->function & FUNC) != NU
928 && (i->function & FUNC) != NMISS && (i->function & FUNC) != NUMISS)
930 if (i->dest->type == ALPHA)
931 memset (v->s, ' ', i->dest->width);
940 v->f = i->int1 ? i->dbl[0] : SYSMIS;
943 v->f = i->dbl[1] != 0.0 ? i->dbl[0] / i->dbl[1] : SYSMIS;
949 /* FIXME: we should use two passes. */
950 moments1_calculate (i->moments, NULL, NULL, &variance,
952 if (variance != SYSMIS)
953 v->f = sqrt (variance);
960 v->f = i->int1 ? i->dbl[0] : SYSMIS;
965 memcpy (v->s, i->string, i->dest->width);
967 memset (v->s, ' ', i->dest->width);
977 v->f = i->dbl[1] ? i->dbl[0] / i->dbl[1] : SYSMIS;
987 v->f = i->dbl[1] ? i->dbl[0] / i->dbl[1] * 100.0 : SYSMIS;
999 v->f = i->int1 ? i->dbl[0] : SYSMIS;
1001 case FIRST | FSTRING:
1002 case LAST | FSTRING:
1004 memcpy (v->s, i->string, i->dest->width);
1006 memset (v->s, ' ', i->dest->width);
1015 case NMISS | FSTRING:
1019 case NUMISS | FSTRING:
1029 /* Resets the state for all the aggregate functions. */
1031 initialize_aggregate_info (struct agr_proc *agr, const struct ccase *input)
1033 struct agr_var *iter;
1035 case_destroy (&agr->break_case);
1036 case_clone (&agr->break_case, input);
1038 for (iter = agr->agr_vars; iter; iter = iter->next)
1041 iter->dbl[0] = iter->dbl[1] = iter->dbl[2] = 0.0;
1042 iter->int1 = iter->int2 = 0;
1043 switch (iter->function)
1046 iter->dbl[0] = DBL_MAX;
1049 memset (iter->string, 255, iter->src->width);
1052 iter->dbl[0] = -DBL_MAX;
1055 memset (iter->string, 0, iter->src->width);
1058 if (iter->moments == NULL)
1059 iter->moments = moments1_create (MOMENT_VARIANCE);
1061 moments1_clear (iter->moments);
1069 /* Aggregate each case as it comes through. Cases which aren't needed
1071 Returns true if successful, false if an I/O error occurred. */
1073 agr_to_active_file (struct ccase *c, void *agr_)
1075 struct agr_proc *agr = agr_;
1077 if (aggregate_single_case (agr, c, &agr->agr_case))
1078 return agr->sink->class->write (agr->sink, &agr->agr_case);
1083 /* Aggregate the current case and output it if we passed a
1086 presorted_agr_to_sysfile (struct ccase *c, void *agr_)
1088 struct agr_proc *agr = agr_;
1090 if (aggregate_single_case (agr, c, &agr->agr_case))
1091 return any_writer_write (agr->writer, &agr->agr_case);