1 /* PSPP - computes sample statistics.
2 Copyright (C) 1997-9, 2000 Free Software Foundation, Inc.
3 Written by Ben Pfaff <blp@gnu.org>.
5 This program is free software; you can redistribute it and/or
6 modify it under the terms of the GNU General Public License as
7 published by the Free Software Foundation; either version 2 of the
8 License, or (at your option) any later version.
10 This program is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with this program; if not, write to the Free Software
17 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
24 #include <data/any-writer.h>
25 #include <data/case-sink.h>
26 #include <data/case.h>
27 #include <data/casefile.h>
28 #include <data/dictionary.h>
29 #include <data/file-handle-def.h>
30 #include <data/settings.h>
31 #include <data/storage-stream.h>
32 #include <data/sys-file-writer.h>
33 #include <data/variable.h>
34 #include <language/command.h>
35 #include <language/data-io/file-handle.h>
36 #include <language/lexer/lexer.h>
37 #include <language/stats/sort-criteria.h>
38 #include <libpspp/alloc.h>
39 #include <libpspp/message.h>
40 #include <libpspp/message.h>
41 #include <libpspp/misc.h>
42 #include <libpspp/pool.h>
43 #include <libpspp/str.h>
44 #include <math/moments.h>
45 #include <math/sort.h>
46 #include <procedure.h>
49 #define _(msgid) gettext (msgid)
51 /* Specifies how to make an aggregate variable. */
54 struct agr_var *next; /* Next in list. */
56 /* Collected during parsing. */
57 struct variable *src; /* Source variable. */
58 struct variable *dest; /* Target variable. */
59 int function; /* Function. */
60 int include_missing; /* 1=Include user-missing values. */
61 union value arg[2]; /* Arguments. */
63 /* Accumulated during AGGREGATE execution. */
68 struct moments1 *moments;
71 /* Aggregation functions. */
74 NONE, SUM, MEAN, SD, MAX, MIN, PGT, PLT, PIN, POUT, FGT, FLT, FIN,
75 FOUT, N, NU, NMISS, NUMISS, FIRST, LAST,
76 N_AGR_FUNCS, N_NO_VARS, NU_NO_VARS,
77 FUNC = 0x1f, /* Function mask. */
78 FSTRING = 1<<5, /* String function bit. */
81 /* Attributes of an aggregation function. */
84 const char *name; /* Aggregation function name. */
85 size_t n_args; /* Number of arguments. */
86 int alpha_type; /* When given ALPHA arguments, output type. */
87 struct fmt_spec format; /* Format spec if alpha_type != ALPHA. */
90 /* Attributes of aggregation functions. */
91 static const struct agr_func agr_func_tab[] =
93 {"<NONE>", 0, -1, {0, 0, 0}},
94 {"SUM", 0, -1, {FMT_F, 8, 2}},
95 {"MEAN", 0, -1, {FMT_F, 8, 2}},
96 {"SD", 0, -1, {FMT_F, 8, 2}},
97 {"MAX", 0, ALPHA, {-1, -1, -1}},
98 {"MIN", 0, ALPHA, {-1, -1, -1}},
99 {"PGT", 1, NUMERIC, {FMT_F, 5, 1}},
100 {"PLT", 1, NUMERIC, {FMT_F, 5, 1}},
101 {"PIN", 2, NUMERIC, {FMT_F, 5, 1}},
102 {"POUT", 2, NUMERIC, {FMT_F, 5, 1}},
103 {"FGT", 1, NUMERIC, {FMT_F, 5, 3}},
104 {"FLT", 1, NUMERIC, {FMT_F, 5, 3}},
105 {"FIN", 2, NUMERIC, {FMT_F, 5, 3}},
106 {"FOUT", 2, NUMERIC, {FMT_F, 5, 3}},
107 {"N", 0, NUMERIC, {FMT_F, 7, 0}},
108 {"NU", 0, NUMERIC, {FMT_F, 7, 0}},
109 {"NMISS", 0, NUMERIC, {FMT_F, 7, 0}},
110 {"NUMISS", 0, NUMERIC, {FMT_F, 7, 0}},
111 {"FIRST", 0, ALPHA, {-1, -1, -1}},
112 {"LAST", 0, ALPHA, {-1, -1, -1}},
113 {NULL, 0, -1, {-1, -1, -1}},
114 {"N", 0, NUMERIC, {FMT_F, 7, 0}},
115 {"NU", 0, NUMERIC, {FMT_F, 7, 0}},
118 /* Missing value types. */
119 enum missing_treatment
121 ITEMWISE, /* Missing values item by item. */
122 COLUMNWISE /* Missing values column by column. */
125 /* An entire AGGREGATE procedure. */
128 /* We have either an output file or a sink. */
129 struct any_writer *writer; /* Output file, or null if none. */
130 struct case_sink *sink; /* Sink, or null if none. */
132 /* Break variables. */
133 struct sort_criteria *sort; /* Sort criteria. */
134 struct variable **break_vars; /* Break variables. */
135 size_t break_var_cnt; /* Number of break variables. */
136 struct ccase break_case; /* Last values of break variables. */
138 enum missing_treatment missing; /* How to treat missing values. */
139 struct agr_var *agr_vars; /* First aggregate variable. */
140 struct dictionary *dict; /* Aggregate dictionary. */
141 int case_cnt; /* Counts aggregated cases. */
142 struct ccase agr_case; /* Aggregate case for output. */
145 static void initialize_aggregate_info (struct agr_proc *,
146 const struct ccase *);
149 static int parse_aggregate_functions (struct agr_proc *);
150 static void agr_destroy (struct agr_proc *);
151 static int aggregate_single_case (struct agr_proc *agr,
152 const struct ccase *input,
153 struct ccase *output);
154 static void dump_aggregate_info (struct agr_proc *agr, struct ccase *output);
156 /* Aggregating to the active file. */
157 static bool agr_to_active_file (struct ccase *, void *aux);
159 /* Aggregating to a system file. */
160 static bool presorted_agr_to_sysfile (struct ccase *, void *aux);
164 /* Parses and executes the AGGREGATE procedure. */
169 struct file_handle *out_file = NULL;
171 bool copy_documents = false;
172 bool presorted = false;
175 memset(&agr, 0 , sizeof (agr));
176 agr.missing = ITEMWISE;
177 case_nullify (&agr.break_case);
179 agr.dict = dict_create ();
180 dict_set_label (agr.dict, dict_get_label (default_dict));
181 dict_set_documents (agr.dict, dict_get_documents (default_dict));
183 /* OUTFILE subcommand must be first. */
184 if (!lex_force_match_id ("OUTFILE"))
187 if (!lex_match ('*'))
189 out_file = fh_parse (FH_REF_FILE | FH_REF_SCRATCH);
190 if (out_file == NULL)
194 /* Read most of the subcommands. */
199 if (lex_match_id ("MISSING"))
202 if (!lex_match_id ("COLUMNWISE"))
204 lex_error (_("while expecting COLUMNWISE"));
207 agr.missing = COLUMNWISE;
209 else if (lex_match_id ("DOCUMENT"))
210 copy_documents = true;
211 else if (lex_match_id ("PRESORTED"))
213 else if (lex_match_id ("BREAK"))
218 agr.sort = sort_parse_criteria (default_dict,
219 &agr.break_vars, &agr.break_var_cnt,
220 &saw_direction, NULL);
221 if (agr.sort == NULL)
224 for (i = 0; i < agr.break_var_cnt; i++)
225 dict_clone_var_assert (agr.dict, agr.break_vars[i],
226 agr.break_vars[i]->name);
228 /* BREAK must follow the options. */
233 lex_error (_("expecting BREAK"));
237 if (presorted && saw_direction)
238 msg (SW, _("When PRESORTED is specified, specifying sorting directions "
239 "with (A) or (D) has no effect. Output data will be sorted "
240 "the same way as the input data."));
242 /* Read in the aggregate functions. */
244 if (!parse_aggregate_functions (&agr))
247 /* Delete documents. */
249 dict_set_documents (agr.dict, NULL);
251 /* Cancel SPLIT FILE. */
252 dict_set_split_vars (agr.dict, NULL, 0);
256 case_create (&agr.agr_case, dict_get_next_value_idx (agr.dict));
258 /* Output to active file or external file? */
259 if (out_file == NULL)
261 /* The active file will be replaced by the aggregated data,
262 so TEMPORARY is moot. */
265 if (agr.sort != NULL && !presorted)
267 if (!sort_active_file_in_place (agr.sort))
271 agr.sink = create_case_sink (&storage_sink_class, agr.dict, NULL);
272 if (agr.sink->class->open != NULL)
273 agr.sink->class->open (agr.sink);
274 vfm_sink = create_case_sink (&null_sink_class, default_dict, NULL);
275 if (!procedure (agr_to_active_file, &agr))
277 if (agr.case_cnt > 0)
279 dump_aggregate_info (&agr, &agr.agr_case);
280 if (!agr.sink->class->write (agr.sink, &agr.agr_case))
283 dict_destroy (default_dict);
284 default_dict = agr.dict;
286 vfm_source = agr.sink->class->make_source (agr.sink);
287 free_case_sink (agr.sink);
291 agr.writer = any_writer_open (out_file, agr.dict);
292 if (agr.writer == NULL)
295 if (agr.sort != NULL && !presorted)
297 /* Sorting is needed. */
298 struct casefile *dst;
299 struct casereader *reader;
303 dst = sort_active_file_to_casefile (agr.sort);
306 reader = casefile_get_destructive_reader (dst);
307 while (ok && casereader_read_xfer (reader, &c))
309 if (aggregate_single_case (&agr, &c, &agr.agr_case))
310 ok = any_writer_write (agr.writer, &agr.agr_case);
313 casereader_destroy (reader);
315 ok = !casefile_error (dst);
316 casefile_destroy (dst);
322 /* Active file is already sorted. */
323 if (!procedure (presorted_agr_to_sysfile, &agr))
327 if (agr.case_cnt > 0)
329 dump_aggregate_info (&agr, &agr.agr_case);
330 any_writer_write (agr.writer, &agr.agr_case);
332 if (any_writer_error (agr.writer))
341 return CMD_CASCADING_FAILURE;
344 /* Parse all the aggregate functions. */
346 parse_aggregate_functions (struct agr_proc *agr)
348 struct agr_var *tail; /* Tail of linked list starting at agr->vars. */
350 /* Parse everything. */
359 const struct agr_func *function;
364 struct variable **src;
378 /* Parse the list of target variables. */
379 while (!lex_match ('='))
381 size_t n_dest_prev = n_dest;
383 if (!parse_DATA_LIST_vars (&dest, &n_dest,
384 PV_APPEND | PV_SINGLE | PV_NO_SCRATCH))
387 /* Assign empty labels. */
391 dest_label = xnrealloc (dest_label, n_dest, sizeof *dest_label);
392 for (j = n_dest_prev; j < n_dest; j++)
393 dest_label[j] = NULL;
396 if (token == T_STRING)
398 ds_truncate (&tokstr, 255);
399 dest_label[n_dest - 1] = xstrdup (ds_c_str (&tokstr));
404 /* Get the name of the aggregation function. */
407 lex_error (_("expecting aggregation function"));
412 if (tokid[strlen (tokid) - 1] == '.')
415 tokid[strlen (tokid) - 1] = 0;
418 for (function = agr_func_tab; function->name; function++)
419 if (!strcasecmp (function->name, tokid))
421 if (NULL == function->name)
423 msg (SE, _("Unknown aggregation function %s."), tokid);
426 func_index = function - agr_func_tab;
429 /* Check for leading lparen. */
430 if (!lex_match ('('))
433 func_index = N_NO_VARS;
434 else if (func_index == NU)
435 func_index = NU_NO_VARS;
438 lex_error (_("expecting `('"));
444 /* Parse list of source variables. */
446 int pv_opts = PV_NO_SCRATCH;
448 if (func_index == SUM || func_index == MEAN || func_index == SD)
449 pv_opts |= PV_NUMERIC;
450 else if (function->n_args)
451 pv_opts |= PV_SAME_TYPE;
453 if (!parse_variables (default_dict, &src, &n_src, pv_opts))
457 /* Parse function arguments, for those functions that
458 require arguments. */
459 if (function->n_args != 0)
460 for (i = 0; i < function->n_args; i++)
465 if (token == T_STRING)
467 arg[i].c = xstrdup (ds_c_str (&tokstr));
470 else if (lex_is_number ())
475 msg (SE, _("Missing argument %d to %s."), i + 1,
482 if (type != src[0]->type)
484 msg (SE, _("Arguments to %s must be of same type as "
485 "source variables."),
491 /* Trailing rparen. */
494 lex_error (_("expecting `)'"));
498 /* Now check that the number of source variables match
499 the number of target variables. If we check earlier
500 than this, the user can get very misleading error
501 message, i.e. `AGGREGATE x=SUM(y t).' will get this
502 error message when a proper message would be more
503 like `unknown variable t'. */
506 msg (SE, _("Number of source variables (%u) does not match "
507 "number of target variables (%u)."),
508 (unsigned) n_src, (unsigned) n_dest);
512 if ((func_index == PIN || func_index == POUT
513 || func_index == FIN || func_index == FOUT)
514 && ((src[0]->type == NUMERIC && arg[0].f > arg[1].f)
515 || (src[0]->type == ALPHA
516 && str_compare_rpad (arg[0].c, arg[1].c) > 0)))
518 union value t = arg[0];
522 msg (SW, _("The value arguments passed to the %s function "
523 "are out-of-order. They will be treated as if "
524 "they had been specified in the correct order."),
529 /* Finally add these to the linked list of aggregation
531 for (i = 0; i < n_dest; i++)
533 struct agr_var *v = xmalloc (sizeof *v);
535 /* Add variable to chain. */
536 if (agr->agr_vars != NULL)
544 /* Create the target variable in the aggregate
547 struct variable *destvar;
549 v->function = func_index;
555 if (src[i]->type == ALPHA)
557 v->function |= FSTRING;
558 v->string = xmalloc (src[i]->width);
561 if (function->alpha_type == ALPHA)
562 destvar = dict_clone_var (agr->dict, v->src, dest[i]);
565 assert (v->src->type == NUMERIC
566 || function->alpha_type == NUMERIC);
567 destvar = dict_create_var (agr->dict, dest[i], 0);
570 if ((func_index == N || func_index == NMISS)
571 && dict_get_weight (default_dict) != NULL)
572 destvar->print = destvar->write = f8_2;
574 destvar->print = destvar->write = function->format;
579 destvar = dict_create_var (agr->dict, dest[i], 0);
580 if (func_index == N_NO_VARS
581 && dict_get_weight (default_dict) != NULL)
582 destvar->print = destvar->write = f8_2;
584 destvar->print = destvar->write = function->format;
589 msg (SE, _("Variable name %s is not unique within the "
590 "aggregate file dictionary, which contains "
591 "the aggregate variables and the break "
600 destvar->label = dest_label[i];
601 dest_label[i] = NULL;
607 v->include_missing = include_missing;
613 if (v->src->type == NUMERIC)
614 for (j = 0; j < function->n_args; j++)
615 v->arg[j].f = arg[j].f;
617 for (j = 0; j < function->n_args; j++)
618 v->arg[j].c = xstrdup (arg[j].c);
622 if (src != NULL && src[0]->type == ALPHA)
623 for (i = 0; i < function->n_args; i++)
633 if (!lex_match ('/'))
638 lex_error ("expecting end of command");
644 for (i = 0; i < n_dest; i++)
647 free (dest_label[i]);
653 if (src && n_src && src[0]->type == ALPHA)
654 for (i = 0; i < function->n_args; i++)
667 agr_destroy (struct agr_proc *agr)
669 struct agr_var *iter, *next;
671 any_writer_close (agr->writer);
672 if (agr->sort != NULL)
673 sort_destroy_criteria (agr->sort);
674 free (agr->break_vars);
675 case_destroy (&agr->break_case);
676 for (iter = agr->agr_vars; iter; iter = next)
680 if (iter->function & FSTRING)
685 n_args = agr_func_tab[iter->function & FUNC].n_args;
686 for (i = 0; i < n_args; i++)
687 free (iter->arg[i].c);
690 else if (iter->function == SD)
691 moments1_destroy (iter->moments);
694 if (agr->dict != NULL)
695 dict_destroy (agr->dict);
697 case_destroy (&agr->agr_case);
702 static void accumulate_aggregate_info (struct agr_proc *,
703 const struct ccase *);
704 static void dump_aggregate_info (struct agr_proc *, struct ccase *);
706 /* Processes a single case INPUT for aggregation. If output is
707 warranted, writes it to OUTPUT and returns nonzero.
708 Otherwise, returns zero and OUTPUT is unmodified. */
710 aggregate_single_case (struct agr_proc *agr,
711 const struct ccase *input, struct ccase *output)
713 bool finished_group = false;
715 if (agr->case_cnt++ == 0)
716 initialize_aggregate_info (agr, input);
717 else if (case_compare (&agr->break_case, input,
718 agr->break_vars, agr->break_var_cnt))
720 dump_aggregate_info (agr, output);
721 finished_group = true;
723 initialize_aggregate_info (agr, input);
726 accumulate_aggregate_info (agr, input);
727 return finished_group;
730 /* Accumulates aggregation data from the case INPUT. */
732 accumulate_aggregate_info (struct agr_proc *agr,
733 const struct ccase *input)
735 struct agr_var *iter;
739 weight = dict_get_case_weight (default_dict, input, &bad_warn);
741 for (iter = agr->agr_vars; iter; iter = iter->next)
744 const union value *v = case_data (input, iter->src->fv);
746 if ((!iter->include_missing
747 && mv_is_value_missing (&iter->src->miss, v))
748 || (iter->include_missing && iter->src->type == NUMERIC
751 switch (iter->function)
754 case NMISS | FSTRING:
755 iter->dbl[0] += weight;
758 case NUMISS | FSTRING:
766 /* This is horrible. There are too many possibilities. */
767 switch (iter->function)
770 iter->dbl[0] += v->f * weight;
774 iter->dbl[0] += v->f * weight;
775 iter->dbl[1] += weight;
778 moments1_add (iter->moments, v->f, weight);
781 iter->dbl[0] = max (iter->dbl[0], v->f);
785 if (memcmp (iter->string, v->s, iter->src->width) < 0)
786 memcpy (iter->string, v->s, iter->src->width);
790 iter->dbl[0] = min (iter->dbl[0], v->f);
794 if (memcmp (iter->string, v->s, iter->src->width) > 0)
795 memcpy (iter->string, v->s, iter->src->width);
800 if (v->f > iter->arg[0].f)
801 iter->dbl[0] += weight;
802 iter->dbl[1] += weight;
806 if (memcmp (iter->arg[0].c, v->s, iter->src->width) < 0)
807 iter->dbl[0] += weight;
808 iter->dbl[1] += weight;
812 if (v->f < iter->arg[0].f)
813 iter->dbl[0] += weight;
814 iter->dbl[1] += weight;
818 if (memcmp (iter->arg[0].c, v->s, iter->src->width) > 0)
819 iter->dbl[0] += weight;
820 iter->dbl[1] += weight;
824 if (iter->arg[0].f <= v->f && v->f <= iter->arg[1].f)
825 iter->dbl[0] += weight;
826 iter->dbl[1] += weight;
830 if (memcmp (iter->arg[0].c, v->s, iter->src->width) <= 0
831 && memcmp (iter->arg[1].c, v->s, iter->src->width) >= 0)
832 iter->dbl[0] += weight;
833 iter->dbl[1] += weight;
837 if (iter->arg[0].f > v->f || v->f > iter->arg[1].f)
838 iter->dbl[0] += weight;
839 iter->dbl[1] += weight;
843 if (memcmp (iter->arg[0].c, v->s, iter->src->width) > 0
844 || memcmp (iter->arg[1].c, v->s, iter->src->width) < 0)
845 iter->dbl[0] += weight;
846 iter->dbl[1] += weight;
850 iter->dbl[0] += weight;
863 case FIRST | FSTRING:
866 memcpy (iter->string, v->s, iter->src->width);
875 memcpy (iter->string, v->s, iter->src->width);
879 case NMISS | FSTRING:
881 case NUMISS | FSTRING:
882 /* Our value is not missing or it would have been
883 caught earlier. Nothing to do. */
889 switch (iter->function)
892 iter->dbl[0] += weight;
903 /* We've come to a record that differs from the previous in one or
904 more of the break variables. Make an output record from the
905 accumulated statistics in the OUTPUT case. */
907 dump_aggregate_info (struct agr_proc *agr, struct ccase *output)
913 for (i = 0; i < agr->break_var_cnt; i++)
915 struct variable *v = agr->break_vars[i];
916 memcpy (case_data_rw (output, value_idx),
917 case_data (&agr->break_case, v->fv),
918 sizeof (union value) * v->nv);
926 for (i = agr->agr_vars; i; i = i->next)
928 union value *v = case_data_rw (output, i->dest->fv);
930 if (agr->missing == COLUMNWISE && i->missing != 0
931 && (i->function & FUNC) != N && (i->function & FUNC) != NU
932 && (i->function & FUNC) != NMISS && (i->function & FUNC) != NUMISS)
934 if (i->dest->type == ALPHA)
935 memset (v->s, ' ', i->dest->width);
944 v->f = i->int1 ? i->dbl[0] : SYSMIS;
947 v->f = i->dbl[1] != 0.0 ? i->dbl[0] / i->dbl[1] : SYSMIS;
953 /* FIXME: we should use two passes. */
954 moments1_calculate (i->moments, NULL, NULL, &variance,
956 if (variance != SYSMIS)
957 v->f = sqrt (variance);
964 v->f = i->int1 ? i->dbl[0] : SYSMIS;
969 memcpy (v->s, i->string, i->dest->width);
971 memset (v->s, ' ', i->dest->width);
981 v->f = i->dbl[1] ? i->dbl[0] / i->dbl[1] : SYSMIS;
991 v->f = i->dbl[1] ? i->dbl[0] / i->dbl[1] * 100.0 : SYSMIS;
1003 v->f = i->int1 ? i->dbl[0] : SYSMIS;
1005 case FIRST | FSTRING:
1006 case LAST | FSTRING:
1008 memcpy (v->s, i->string, i->dest->width);
1010 memset (v->s, ' ', i->dest->width);
1019 case NMISS | FSTRING:
1023 case NUMISS | FSTRING:
1033 /* Resets the state for all the aggregate functions. */
1035 initialize_aggregate_info (struct agr_proc *agr, const struct ccase *input)
1037 struct agr_var *iter;
1039 case_destroy (&agr->break_case);
1040 case_clone (&agr->break_case, input);
1042 for (iter = agr->agr_vars; iter; iter = iter->next)
1045 iter->dbl[0] = iter->dbl[1] = iter->dbl[2] = 0.0;
1046 iter->int1 = iter->int2 = 0;
1047 switch (iter->function)
1050 iter->dbl[0] = DBL_MAX;
1053 memset (iter->string, 255, iter->src->width);
1056 iter->dbl[0] = -DBL_MAX;
1059 memset (iter->string, 0, iter->src->width);
1062 if (iter->moments == NULL)
1063 iter->moments = moments1_create (MOMENT_VARIANCE);
1065 moments1_clear (iter->moments);
1073 /* Aggregate each case as it comes through. Cases which aren't needed
1075 Returns true if successful, false if an I/O error occurred. */
1077 agr_to_active_file (struct ccase *c, void *agr_)
1079 struct agr_proc *agr = agr_;
1081 if (aggregate_single_case (agr, c, &agr->agr_case))
1082 return agr->sink->class->write (agr->sink, &agr->agr_case);
1087 /* Aggregate the current case and output it if we passed a
1090 presorted_agr_to_sysfile (struct ccase *c, void *agr_)
1092 struct agr_proc *agr = agr_;
1094 if (aggregate_single_case (agr, c, &agr->agr_case))
1095 return any_writer_write (agr->writer, &agr->agr_case);