1 /* PSPP - computes sample statistics.
2 Copyright (C) 1997-9, 2000 Free Software Foundation, Inc.
3 Written by Ben Pfaff <blp@gnu.org>.
5 This program is free software; you can redistribute it and/or
6 modify it under the terms of the GNU General Public License as
7 published by the Free Software Foundation; either version 2 of the
8 License, or (at your option) any later version.
10 This program is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with this program; if not, write to the Free Software
17 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
24 #include <data/any-writer.h>
25 #include <data/case-sink.h>
26 #include <data/case.h>
27 #include <data/casefile.h>
28 #include <data/dictionary.h>
29 #include <data/file-handle-def.h>
30 #include <data/procedure.h>
31 #include <data/settings.h>
32 #include <data/storage-stream.h>
33 #include <data/sys-file-writer.h>
34 #include <data/variable.h>
35 #include <language/command.h>
36 #include <language/data-io/file-handle.h>
37 #include <language/lexer/lexer.h>
38 #include <language/lexer/variable-parser.h>
39 #include <language/stats/sort-criteria.h>
40 #include <libpspp/alloc.h>
41 #include <libpspp/assertion.h>
42 #include <libpspp/message.h>
43 #include <libpspp/misc.h>
44 #include <libpspp/pool.h>
45 #include <libpspp/str.h>
46 #include <math/moments.h>
47 #include <math/sort.h>
50 #define _(msgid) gettext (msgid)
52 /* Argument for AGGREGATE function. */
55 double f; /* Numeric. */
56 char *c; /* Short or long string. */
59 /* Specifies how to make an aggregate variable. */
62 struct agr_var *next; /* Next in list. */
64 /* Collected during parsing. */
65 struct variable *src; /* Source variable. */
66 struct variable *dest; /* Target variable. */
67 int function; /* Function. */
68 int include_missing; /* 1=Include user-missing values. */
69 union agr_argument arg[2]; /* Arguments. */
71 /* Accumulated during AGGREGATE execution. */
76 struct moments1 *moments;
79 /* Aggregation functions. */
82 NONE, SUM, MEAN, SD, MAX, MIN, PGT, PLT, PIN, POUT, FGT, FLT, FIN,
83 FOUT, N, NU, NMISS, NUMISS, FIRST, LAST,
84 N_AGR_FUNCS, N_NO_VARS, NU_NO_VARS,
85 FUNC = 0x1f, /* Function mask. */
86 FSTRING = 1<<5, /* String function bit. */
89 /* Attributes of an aggregation function. */
92 const char *name; /* Aggregation function name. */
93 size_t n_args; /* Number of arguments. */
94 int alpha_type; /* When given ALPHA arguments, output type. */
95 struct fmt_spec format; /* Format spec if alpha_type != ALPHA. */
98 /* Attributes of aggregation functions. */
99 static const struct agr_func agr_func_tab[] =
101 {"<NONE>", 0, -1, {0, 0, 0}},
102 {"SUM", 0, -1, {FMT_F, 8, 2}},
103 {"MEAN", 0, -1, {FMT_F, 8, 2}},
104 {"SD", 0, -1, {FMT_F, 8, 2}},
105 {"MAX", 0, ALPHA, {-1, -1, -1}},
106 {"MIN", 0, ALPHA, {-1, -1, -1}},
107 {"PGT", 1, NUMERIC, {FMT_F, 5, 1}},
108 {"PLT", 1, NUMERIC, {FMT_F, 5, 1}},
109 {"PIN", 2, NUMERIC, {FMT_F, 5, 1}},
110 {"POUT", 2, NUMERIC, {FMT_F, 5, 1}},
111 {"FGT", 1, NUMERIC, {FMT_F, 5, 3}},
112 {"FLT", 1, NUMERIC, {FMT_F, 5, 3}},
113 {"FIN", 2, NUMERIC, {FMT_F, 5, 3}},
114 {"FOUT", 2, NUMERIC, {FMT_F, 5, 3}},
115 {"N", 0, NUMERIC, {FMT_F, 7, 0}},
116 {"NU", 0, NUMERIC, {FMT_F, 7, 0}},
117 {"NMISS", 0, NUMERIC, {FMT_F, 7, 0}},
118 {"NUMISS", 0, NUMERIC, {FMT_F, 7, 0}},
119 {"FIRST", 0, ALPHA, {-1, -1, -1}},
120 {"LAST", 0, ALPHA, {-1, -1, -1}},
121 {NULL, 0, -1, {-1, -1, -1}},
122 {"N", 0, NUMERIC, {FMT_F, 7, 0}},
123 {"NU", 0, NUMERIC, {FMT_F, 7, 0}},
126 /* Missing value types. */
127 enum missing_treatment
129 ITEMWISE, /* Missing values item by item. */
130 COLUMNWISE /* Missing values column by column. */
133 /* An entire AGGREGATE procedure. */
136 /* We have either an output file or a sink. */
137 struct any_writer *writer; /* Output file, or null if none. */
138 struct case_sink *sink; /* Sink, or null if none. */
140 /* Break variables. */
141 struct sort_criteria *sort; /* Sort criteria. */
142 struct variable **break_vars; /* Break variables. */
143 size_t break_var_cnt; /* Number of break variables. */
144 struct ccase break_case; /* Last values of break variables. */
146 enum missing_treatment missing; /* How to treat missing values. */
147 struct agr_var *agr_vars; /* First aggregate variable. */
148 struct dictionary *dict; /* Aggregate dictionary. */
149 int case_cnt; /* Counts aggregated cases. */
150 struct ccase agr_case; /* Aggregate case for output. */
153 static void initialize_aggregate_info (struct agr_proc *,
154 const struct ccase *);
157 static int parse_aggregate_functions (struct agr_proc *);
158 static void agr_destroy (struct agr_proc *);
159 static int aggregate_single_case (struct agr_proc *agr,
160 const struct ccase *input,
161 struct ccase *output);
162 static void dump_aggregate_info (struct agr_proc *agr, struct ccase *output);
164 /* Aggregating to the active file. */
165 static bool agr_to_active_file (const struct ccase *, void *aux);
167 /* Aggregating to a system file. */
168 static bool presorted_agr_to_sysfile (const struct ccase *, void *aux);
172 /* Parses and executes the AGGREGATE procedure. */
177 struct file_handle *out_file = NULL;
179 bool copy_documents = false;
180 bool presorted = false;
183 memset(&agr, 0 , sizeof (agr));
184 agr.missing = ITEMWISE;
185 case_nullify (&agr.break_case);
187 agr.dict = dict_create ();
188 dict_set_label (agr.dict, dict_get_label (default_dict));
189 dict_set_documents (agr.dict, dict_get_documents (default_dict));
191 /* OUTFILE subcommand must be first. */
192 if (!lex_force_match_id ("OUTFILE"))
195 if (!lex_match ('*'))
197 out_file = fh_parse (FH_REF_FILE | FH_REF_SCRATCH);
198 if (out_file == NULL)
202 /* Read most of the subcommands. */
207 if (lex_match_id ("MISSING"))
210 if (!lex_match_id ("COLUMNWISE"))
212 lex_error (_("while expecting COLUMNWISE"));
215 agr.missing = COLUMNWISE;
217 else if (lex_match_id ("DOCUMENT"))
218 copy_documents = true;
219 else if (lex_match_id ("PRESORTED"))
221 else if (lex_match_id ("BREAK"))
226 agr.sort = sort_parse_criteria (default_dict,
227 &agr.break_vars, &agr.break_var_cnt,
228 &saw_direction, NULL);
229 if (agr.sort == NULL)
232 for (i = 0; i < agr.break_var_cnt; i++)
233 dict_clone_var_assert (agr.dict, agr.break_vars[i],
234 agr.break_vars[i]->name);
236 /* BREAK must follow the options. */
241 lex_error (_("expecting BREAK"));
245 if (presorted && saw_direction)
246 msg (SW, _("When PRESORTED is specified, specifying sorting directions "
247 "with (A) or (D) has no effect. Output data will be sorted "
248 "the same way as the input data."));
250 /* Read in the aggregate functions. */
252 if (!parse_aggregate_functions (&agr))
255 /* Delete documents. */
257 dict_set_documents (agr.dict, NULL);
259 /* Cancel SPLIT FILE. */
260 dict_set_split_vars (agr.dict, NULL, 0);
264 case_create (&agr.agr_case, dict_get_next_value_idx (agr.dict));
266 /* Output to active file or external file? */
267 if (out_file == NULL)
269 /* The active file will be replaced by the aggregated data,
270 so TEMPORARY is moot. */
271 proc_cancel_temporary_transformations ();
273 if (agr.sort != NULL && !presorted)
275 if (!sort_active_file_in_place (agr.sort))
279 agr.sink = create_case_sink (&storage_sink_class, agr.dict, NULL);
280 if (agr.sink->class->open != NULL)
281 agr.sink->class->open (agr.sink);
282 proc_set_sink (create_case_sink (&null_sink_class, default_dict, NULL));
283 if (!procedure (agr_to_active_file, &agr))
285 if (agr.case_cnt > 0)
287 dump_aggregate_info (&agr, &agr.agr_case);
288 if (!agr.sink->class->write (agr.sink, &agr.agr_case))
291 discard_variables ();
292 dict_destroy (default_dict);
293 default_dict = agr.dict;
295 proc_set_source (agr.sink->class->make_source (agr.sink));
296 free_case_sink (agr.sink);
300 agr.writer = any_writer_open (out_file, agr.dict);
301 if (agr.writer == NULL)
304 if (agr.sort != NULL && !presorted)
306 /* Sorting is needed. */
307 struct casefile *dst;
308 struct casereader *reader;
312 dst = sort_active_file_to_casefile (agr.sort);
315 reader = casefile_get_destructive_reader (dst);
316 while (ok && casereader_read_xfer (reader, &c))
318 if (aggregate_single_case (&agr, &c, &agr.agr_case))
319 ok = any_writer_write (agr.writer, &agr.agr_case);
322 casereader_destroy (reader);
324 ok = !casefile_error (dst);
325 casefile_destroy (dst);
331 /* Active file is already sorted. */
332 if (!procedure (presorted_agr_to_sysfile, &agr))
336 if (agr.case_cnt > 0)
338 dump_aggregate_info (&agr, &agr.agr_case);
339 any_writer_write (agr.writer, &agr.agr_case);
341 if (any_writer_error (agr.writer))
350 return CMD_CASCADING_FAILURE;
353 /* Parse all the aggregate functions. */
355 parse_aggregate_functions (struct agr_proc *agr)
357 struct agr_var *tail; /* Tail of linked list starting at agr->vars. */
359 /* Parse everything. */
368 const struct agr_func *function;
371 union agr_argument arg[2];
373 struct variable **src;
387 /* Parse the list of target variables. */
388 while (!lex_match ('='))
390 size_t n_dest_prev = n_dest;
392 if (!parse_DATA_LIST_vars (&dest, &n_dest,
393 PV_APPEND | PV_SINGLE | PV_NO_SCRATCH))
396 /* Assign empty labels. */
400 dest_label = xnrealloc (dest_label, n_dest, sizeof *dest_label);
401 for (j = n_dest_prev; j < n_dest; j++)
402 dest_label[j] = NULL;
405 if (token == T_STRING)
407 ds_truncate (&tokstr, 255);
408 dest_label[n_dest - 1] = ds_xstrdup (&tokstr);
413 /* Get the name of the aggregation function. */
416 lex_error (_("expecting aggregation function"));
421 if (tokid[strlen (tokid) - 1] == '.')
424 tokid[strlen (tokid) - 1] = 0;
427 for (function = agr_func_tab; function->name; function++)
428 if (!strcasecmp (function->name, tokid))
430 if (NULL == function->name)
432 msg (SE, _("Unknown aggregation function %s."), tokid);
435 func_index = function - agr_func_tab;
438 /* Check for leading lparen. */
439 if (!lex_match ('('))
442 func_index = N_NO_VARS;
443 else if (func_index == NU)
444 func_index = NU_NO_VARS;
447 lex_error (_("expecting `('"));
453 /* Parse list of source variables. */
455 int pv_opts = PV_NO_SCRATCH;
457 if (func_index == SUM || func_index == MEAN || func_index == SD)
458 pv_opts |= PV_NUMERIC;
459 else if (function->n_args)
460 pv_opts |= PV_SAME_TYPE;
462 if (!parse_variables (default_dict, &src, &n_src, pv_opts))
466 /* Parse function arguments, for those functions that
467 require arguments. */
468 if (function->n_args != 0)
469 for (i = 0; i < function->n_args; i++)
474 if (token == T_STRING)
476 arg[i].c = ds_xstrdup (&tokstr);
479 else if (lex_is_number ())
484 msg (SE, _("Missing argument %d to %s."), i + 1,
491 if (type != src[0]->type)
493 msg (SE, _("Arguments to %s must be of same type as "
494 "source variables."),
500 /* Trailing rparen. */
503 lex_error (_("expecting `)'"));
507 /* Now check that the number of source variables match
508 the number of target variables. If we check earlier
509 than this, the user can get very misleading error
510 message, i.e. `AGGREGATE x=SUM(y t).' will get this
511 error message when a proper message would be more
512 like `unknown variable t'. */
515 msg (SE, _("Number of source variables (%u) does not match "
516 "number of target variables (%u)."),
517 (unsigned) n_src, (unsigned) n_dest);
521 if ((func_index == PIN || func_index == POUT
522 || func_index == FIN || func_index == FOUT)
523 && ((src[0]->type == NUMERIC && arg[0].f > arg[1].f)
524 || (src[0]->type == ALPHA
525 && str_compare_rpad (arg[0].c, arg[1].c) > 0)))
527 union agr_argument t = arg[0];
531 msg (SW, _("The value arguments passed to the %s function "
532 "are out-of-order. They will be treated as if "
533 "they had been specified in the correct order."),
538 /* Finally add these to the linked list of aggregation
540 for (i = 0; i < n_dest; i++)
542 struct agr_var *v = xmalloc (sizeof *v);
544 /* Add variable to chain. */
545 if (agr->agr_vars != NULL)
553 /* Create the target variable in the aggregate
556 struct variable *destvar;
558 v->function = func_index;
564 if (src[i]->type == ALPHA)
566 v->function |= FSTRING;
567 v->string = xmalloc (src[i]->width);
570 if (function->alpha_type == ALPHA)
571 destvar = dict_clone_var (agr->dict, v->src, dest[i]);
574 assert (v->src->type == NUMERIC
575 || function->alpha_type == NUMERIC);
576 destvar = dict_create_var (agr->dict, dest[i], 0);
579 if ((func_index == N || func_index == NMISS)
580 && dict_get_weight (default_dict) != NULL)
581 destvar->print = destvar->write = f8_2;
583 destvar->print = destvar->write = function->format;
588 destvar = dict_create_var (agr->dict, dest[i], 0);
589 if (func_index == N_NO_VARS
590 && dict_get_weight (default_dict) != NULL)
591 destvar->print = destvar->write = f8_2;
593 destvar->print = destvar->write = function->format;
598 msg (SE, _("Variable name %s is not unique within the "
599 "aggregate file dictionary, which contains "
600 "the aggregate variables and the break "
609 destvar->label = dest_label[i];
610 dest_label[i] = NULL;
616 v->include_missing = include_missing;
622 if (v->src->type == NUMERIC)
623 for (j = 0; j < function->n_args; j++)
624 v->arg[j].f = arg[j].f;
626 for (j = 0; j < function->n_args; j++)
627 v->arg[j].c = xstrdup (arg[j].c);
631 if (src != NULL && src[0]->type == ALPHA)
632 for (i = 0; i < function->n_args; i++)
642 if (!lex_match ('/'))
647 lex_error ("expecting end of command");
653 for (i = 0; i < n_dest; i++)
656 free (dest_label[i]);
662 if (src && n_src && src[0]->type == ALPHA)
663 for (i = 0; i < function->n_args; i++)
676 agr_destroy (struct agr_proc *agr)
678 struct agr_var *iter, *next;
680 any_writer_close (agr->writer);
681 if (agr->sort != NULL)
682 sort_destroy_criteria (agr->sort);
683 free (agr->break_vars);
684 case_destroy (&agr->break_case);
685 for (iter = agr->agr_vars; iter; iter = next)
689 if (iter->function & FSTRING)
694 n_args = agr_func_tab[iter->function & FUNC].n_args;
695 for (i = 0; i < n_args; i++)
696 free (iter->arg[i].c);
699 else if (iter->function == SD)
700 moments1_destroy (iter->moments);
703 if (agr->dict != NULL)
704 dict_destroy (agr->dict);
706 case_destroy (&agr->agr_case);
711 static void accumulate_aggregate_info (struct agr_proc *,
712 const struct ccase *);
713 static void dump_aggregate_info (struct agr_proc *, struct ccase *);
715 /* Processes a single case INPUT for aggregation. If output is
716 warranted, writes it to OUTPUT and returns nonzero.
717 Otherwise, returns zero and OUTPUT is unmodified. */
719 aggregate_single_case (struct agr_proc *agr,
720 const struct ccase *input, struct ccase *output)
722 bool finished_group = false;
724 if (agr->case_cnt++ == 0)
725 initialize_aggregate_info (agr, input);
726 else if (case_compare (&agr->break_case, input,
727 agr->break_vars, agr->break_var_cnt))
729 dump_aggregate_info (agr, output);
730 finished_group = true;
732 initialize_aggregate_info (agr, input);
735 accumulate_aggregate_info (agr, input);
736 return finished_group;
739 /* Accumulates aggregation data from the case INPUT. */
741 accumulate_aggregate_info (struct agr_proc *agr,
742 const struct ccase *input)
744 struct agr_var *iter;
748 weight = dict_get_case_weight (default_dict, input, &bad_warn);
750 for (iter = agr->agr_vars; iter; iter = iter->next)
753 const union value *v = case_data (input, iter->src->fv);
755 if ((!iter->include_missing
756 && mv_is_value_missing (&iter->src->miss, v))
757 || (iter->include_missing && iter->src->type == NUMERIC
760 switch (iter->function)
763 case NMISS | FSTRING:
764 iter->dbl[0] += weight;
767 case NUMISS | FSTRING:
775 /* This is horrible. There are too many possibilities. */
776 switch (iter->function)
779 iter->dbl[0] += v->f * weight;
783 iter->dbl[0] += v->f * weight;
784 iter->dbl[1] += weight;
787 moments1_add (iter->moments, v->f, weight);
790 iter->dbl[0] = max (iter->dbl[0], v->f);
794 if (memcmp (iter->string, v->s, iter->src->width) < 0)
795 memcpy (iter->string, v->s, iter->src->width);
799 iter->dbl[0] = min (iter->dbl[0], v->f);
803 if (memcmp (iter->string, v->s, iter->src->width) > 0)
804 memcpy (iter->string, v->s, iter->src->width);
809 if (v->f > iter->arg[0].f)
810 iter->dbl[0] += weight;
811 iter->dbl[1] += weight;
815 if (memcmp (iter->arg[0].c, v->s, iter->src->width) < 0)
816 iter->dbl[0] += weight;
817 iter->dbl[1] += weight;
821 if (v->f < iter->arg[0].f)
822 iter->dbl[0] += weight;
823 iter->dbl[1] += weight;
827 if (memcmp (iter->arg[0].c, v->s, iter->src->width) > 0)
828 iter->dbl[0] += weight;
829 iter->dbl[1] += weight;
833 if (iter->arg[0].f <= v->f && v->f <= iter->arg[1].f)
834 iter->dbl[0] += weight;
835 iter->dbl[1] += weight;
839 if (memcmp (iter->arg[0].c, v->s, iter->src->width) <= 0
840 && memcmp (iter->arg[1].c, v->s, iter->src->width) >= 0)
841 iter->dbl[0] += weight;
842 iter->dbl[1] += weight;
846 if (iter->arg[0].f > v->f || v->f > iter->arg[1].f)
847 iter->dbl[0] += weight;
848 iter->dbl[1] += weight;
852 if (memcmp (iter->arg[0].c, v->s, iter->src->width) > 0
853 || memcmp (iter->arg[1].c, v->s, iter->src->width) < 0)
854 iter->dbl[0] += weight;
855 iter->dbl[1] += weight;
859 iter->dbl[0] += weight;
872 case FIRST | FSTRING:
875 memcpy (iter->string, v->s, iter->src->width);
884 memcpy (iter->string, v->s, iter->src->width);
888 case NMISS | FSTRING:
890 case NUMISS | FSTRING:
891 /* Our value is not missing or it would have been
892 caught earlier. Nothing to do. */
898 switch (iter->function)
901 iter->dbl[0] += weight;
912 /* We've come to a record that differs from the previous in one or
913 more of the break variables. Make an output record from the
914 accumulated statistics in the OUTPUT case. */
916 dump_aggregate_info (struct agr_proc *agr, struct ccase *output)
922 for (i = 0; i < agr->break_var_cnt; i++)
924 struct variable *v = agr->break_vars[i];
925 memcpy (case_data_rw (output, value_idx),
926 case_data (&agr->break_case, v->fv),
927 sizeof (union value) * v->nv);
935 for (i = agr->agr_vars; i; i = i->next)
937 union value *v = case_data_rw (output, i->dest->fv);
939 if (agr->missing == COLUMNWISE && i->missing != 0
940 && (i->function & FUNC) != N && (i->function & FUNC) != NU
941 && (i->function & FUNC) != NMISS && (i->function & FUNC) != NUMISS)
943 if (i->dest->type == ALPHA)
944 memset (v->s, ' ', i->dest->width);
953 v->f = i->int1 ? i->dbl[0] : SYSMIS;
956 v->f = i->dbl[1] != 0.0 ? i->dbl[0] / i->dbl[1] : SYSMIS;
962 /* FIXME: we should use two passes. */
963 moments1_calculate (i->moments, NULL, NULL, &variance,
965 if (variance != SYSMIS)
966 v->f = sqrt (variance);
973 v->f = i->int1 ? i->dbl[0] : SYSMIS;
978 memcpy (v->s, i->string, i->dest->width);
980 memset (v->s, ' ', i->dest->width);
990 v->f = i->dbl[1] ? i->dbl[0] / i->dbl[1] : SYSMIS;
1000 v->f = i->dbl[1] ? i->dbl[0] / i->dbl[1] * 100.0 : SYSMIS;
1012 v->f = i->int1 ? i->dbl[0] : SYSMIS;
1014 case FIRST | FSTRING:
1015 case LAST | FSTRING:
1017 memcpy (v->s, i->string, i->dest->width);
1019 memset (v->s, ' ', i->dest->width);
1028 case NMISS | FSTRING:
1032 case NUMISS | FSTRING:
1042 /* Resets the state for all the aggregate functions. */
1044 initialize_aggregate_info (struct agr_proc *agr, const struct ccase *input)
1046 struct agr_var *iter;
1048 case_destroy (&agr->break_case);
1049 case_clone (&agr->break_case, input);
1051 for (iter = agr->agr_vars; iter; iter = iter->next)
1054 iter->dbl[0] = iter->dbl[1] = iter->dbl[2] = 0.0;
1055 iter->int1 = iter->int2 = 0;
1056 switch (iter->function)
1059 iter->dbl[0] = DBL_MAX;
1062 memset (iter->string, 255, iter->src->width);
1065 iter->dbl[0] = -DBL_MAX;
1068 memset (iter->string, 0, iter->src->width);
1071 if (iter->moments == NULL)
1072 iter->moments = moments1_create (MOMENT_VARIANCE);
1074 moments1_clear (iter->moments);
1082 /* Aggregate each case as it comes through. Cases which aren't needed
1084 Returns true if successful, false if an I/O error occurred. */
1086 agr_to_active_file (const struct ccase *c, void *agr_)
1088 struct agr_proc *agr = agr_;
1090 if (aggregate_single_case (agr, c, &agr->agr_case))
1091 return agr->sink->class->write (agr->sink, &agr->agr_case);
1096 /* Aggregate the current case and output it if we passed a
1099 presorted_agr_to_sysfile (const struct ccase *c, void *agr_)
1101 struct agr_proc *agr = agr_;
1103 if (aggregate_single_case (agr, c, &agr->agr_case))
1104 return any_writer_write (agr->writer, &agr->agr_case);