1 /* PSPP - computes sample statistics.
2 Copyright (C) 1997-9, 2000 Free Software Foundation, Inc.
3 Written by Ben Pfaff <blp@gnu.org>.
5 This program is free software; you can redistribute it and/or
6 modify it under the terms of the GNU General Public License as
7 published by the Free Software Foundation; either version 2 of the
8 License, or (at your option) any later version.
10 This program is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with this program; if not, write to the Free Software
17 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
02110-1301, USA. */
24 #include <data/any-writer.h>
25 #include <data/case-sink.h>
26 #include <data/case.h>
27 #include <data/casefile.h>
28 #include <data/dictionary.h>
29 #include <data/file-handle-def.h>
30 #include <data/procedure.h>
31 #include <data/settings.h>
32 #include <data/storage-stream.h>
33 #include <data/sys-file-writer.h>
34 #include <data/variable.h>
35 #include <language/command.h>
36 #include <language/data-io/file-handle.h>
37 #include <language/lexer/lexer.h>
38 #include <language/lexer/variable-parser.h>
39 #include <language/stats/sort-criteria.h>
40 #include <libpspp/alloc.h>
41 #include <libpspp/assertion.h>
42 #include <libpspp/message.h>
43 #include <libpspp/misc.h>
44 #include <libpspp/pool.h>
45 #include <libpspp/str.h>
46 #include <math/moments.h>
47 #include <math/sort.h>
50 #define _(msgid) gettext (msgid)
52 /* Argument for AGGREGATE function.
   Holds one literal argument to an aggregation function: a number in F
   or a string in C.
   NOTE(review): the enclosing declaration lines are not visible in
   this extract; uses elsewhere name the type `union agr_argument'. */
55 double f; /* Numeric. */
56 char *c; /* Short or long string (heap copy; see agr_destroy). */
59 /* Specifies how to make an aggregate variable.
   One of these exists per target variable; they are chained through
   NEXT starting at agr_proc.agr_vars.
   NOTE(review): other code in this file also uses the members
   `missing', `dbl[]', `int1', `int2' and `string'; their declarations
   are not visible in this extract. */
62 struct agr_var *next; /* Next in list. */
64 /* Collected during parsing. */
65 struct variable *src; /* Source variable. */
66 struct variable *dest; /* Target variable. */
67 int function; /* Function index, OR'ed with FSTRING for string sources. */
68 int include_missing; /* 1=Include user-missing values. */
69 union agr_argument arg[2]; /* Arguments. */
71 /* Accumulated during AGGREGATE execution. */
/* Moments accumulator; created lazily in initialize_aggregate_info and
   fed by moments1_add for the SD computation. */
76 struct moments1 *moments;
79 /* Aggregation functions.
   The enumerators up to NU_NO_VARS are in the same order as the rows
   of agr_func_tab, so an enumerator doubles as a table index.
   N_AGR_FUNCS lines up with the table's NULL-name terminator row;
   N_NO_VARS and NU_NO_VARS are the forms of N and NU that the parser
   selects when no parenthesized variable list follows. */
82 NONE, SUM, MEAN, SD, MAX, MIN, PGT, PLT, PIN, POUT, FGT, FLT, FIN,
83 FOUT, N, NU, NMISS, NUMISS, FIRST, LAST,
84 N_AGR_FUNCS, N_NO_VARS, NU_NO_VARS,
85 FUNC = 0x1f, /* Function mask. */
86 FSTRING = 1<<5, /* String function bit. */
89 /* Attributes of an aggregation function.
   One instance per row of agr_func_tab. */
92 const char *name; /* Aggregation function name. */
93 size_t n_args; /* Number of arguments. */
/* In agr_func_tab this is ALPHA, NUMERIC or -1. */
94 int alpha_type; /* When given ALPHA arguments, output type. */
95 struct fmt_spec format; /* Format spec if alpha_type != ALPHA. */
98 /* Attributes of aggregation functions.
   Rows are indexed by the aggregation function enumerators.  Each row
   is {name, n_args, alpha_type, format}. */
99 static const struct agr_func agr_func_tab[] =
101 {"<NONE>", 0, -1, {0, 0, 0}},
102 {"SUM", 0, -1, {FMT_F, 8, 2}},
103 {"MEAN", 0, -1, {FMT_F, 8, 2}},
104 {"SD", 0, -1, {FMT_F, 8, 2}},
105 {"MAX", 0, ALPHA, {-1, -1, -1}},
106 {"MIN", 0, ALPHA, {-1, -1, -1}},
107 {"PGT", 1, NUMERIC, {FMT_F, 5, 1}},
108 {"PLT", 1, NUMERIC, {FMT_F, 5, 1}},
109 {"PIN", 2, NUMERIC, {FMT_F, 5, 1}},
110 {"POUT", 2, NUMERIC, {FMT_F, 5, 1}},
111 {"FGT", 1, NUMERIC, {FMT_F, 5, 3}},
112 {"FLT", 1, NUMERIC, {FMT_F, 5, 3}},
113 {"FIN", 2, NUMERIC, {FMT_F, 5, 3}},
114 {"FOUT", 2, NUMERIC, {FMT_F, 5, 3}},
115 {"N", 0, NUMERIC, {FMT_F, 7, 0}},
116 {"NU", 0, NUMERIC, {FMT_F, 7, 0}},
117 {"NMISS", 0, NUMERIC, {FMT_F, 7, 0}},
118 {"NUMISS", 0, NUMERIC, {FMT_F, 7, 0}},
119 {"FIRST", 0, ALPHA, {-1, -1, -1}},
120 {"LAST", 0, ALPHA, {-1, -1, -1}},
/* NULL name: terminates the by-name lookup loop in
   parse_aggregate_functions (index N_AGR_FUNCS). */
121 {NULL, 0, -1, {-1, -1, -1}},
/* Rows for N_NO_VARS and NU_NO_VARS; placed after the terminator so
   the by-name lookup never reaches them. */
122 {"N", 0, NUMERIC, {FMT_F, 7, 0}},
123 {"NU", 0, NUMERIC, {FMT_F, 7, 0}},
126 /* Missing value types.
   cmd_aggregate starts with ITEMWISE and switches to COLUMNWISE when
   the MISSING subcommand names COLUMNWISE. */
127 enum missing_treatment
129 ITEMWISE, /* Missing values item by item. */
130 COLUMNWISE /* Missing values column by column. */
133 /* An entire AGGREGATE procedure.
   Zero-filled by cmd_aggregate before use and torn down by
   agr_destroy.
   NOTE(review): the `struct agr_proc' header and braces are not
   visible in this extract. */
136 /* We have either an output file or a sink. */
137 struct any_writer *writer; /* Output file, or null if none. */
138 struct case_sink *sink; /* Sink, or null if none. */
140 /* Break variables. */
141 struct sort_criteria *sort; /* Sort criteria. */
142 struct variable **break_vars; /* Break variables. */
143 size_t break_var_cnt; /* Number of break variables. */
144 struct ccase break_case; /* Last values of break variables. */
146 enum missing_treatment missing; /* How to treat missing values. */
147 struct agr_var *agr_vars; /* First aggregate variable. */
148 struct dictionary *dict; /* Aggregate dictionary. */
149 const struct dictionary *src_dict; /* Dict of the source */
/* Incremented once per input case by aggregate_single_case; a nonzero
   value after the run means a final group still has to be dumped. */
150 int case_cnt; /* Counts aggregated cases. */
151 struct ccase agr_case; /* Aggregate case for output. */
/* Forward declarations for the helpers defined later in this file.
   NOTE(review): the parse_aggregate_functions prototype is cut off in
   this extract (its second parameter line is missing). */
154 static void initialize_aggregate_info (struct agr_proc *,
155 const struct ccase *);
158 static bool parse_aggregate_functions (const struct dictionary *,
160 static void agr_destroy (struct agr_proc *);
161 static bool aggregate_single_case (struct agr_proc *agr,
162 const struct ccase *input,
163 struct ccase *output);
164 static void dump_aggregate_info (struct agr_proc *agr, struct ccase *output);
166 /* Aggregating to the active file. */
167 static bool agr_to_active_file (const struct ccase *, void *aux, const struct dataset *);
169 /* Aggregating to a system file. */
170 static bool presorted_agr_to_sysfile (const struct ccase *, void *aux, const struct dataset *);
174 /* Parses and executes the AGGREGATE procedure.

   Visible flow: parse OUTFILE (`*' or a file handle), the optional
   MISSING, DOCUMENT and PRESORTED settings, the BREAK criteria and
   then the aggregate function specifications.  The aggregated cases go
   either to a case sink whose source replaces the active file (no
   output file given) or to a writer opened on the output file.

   NOTE(review): lines are missing from this extract (braces, labels,
   the declaration of `agr', several statements).  The only return
   visible here is CMD_CASCADING_FAILURE; the success path is not
   shown. */
176 cmd_aggregate (struct dataset *ds)
178 struct dictionary *dict = dataset_dict (ds);
180 struct file_handle *out_file = NULL;
182 bool copy_documents = false;
183 bool presorted = false;
/* Start from an all-zero procedure state with item-wise missing-value
   handling and a null break case. */
186 memset(&agr, 0 , sizeof (agr));
187 agr.missing = ITEMWISE;
188 case_nullify (&agr.break_case);
/* The aggregate dictionary starts empty and inherits the source
   dictionary's label and documents. */
190 agr.dict = dict_create ();
192 dict_set_label (agr.dict, dict_get_label (dict));
193 dict_set_documents (agr.dict, dict_get_documents (dict));
195 /* OUTFILE subcommand must be first. */
196 if (!lex_force_match_id ("OUTFILE"))
/* `*' leaves out_file null, which later selects output to the active
   file; anything else must parse as a file or scratch handle. */
199 if (!lex_match ('*'))
201 out_file = fh_parse (FH_REF_FILE | FH_REF_SCRATCH);
202 if (out_file == NULL)
206 /* Read most of the subcommands. */
211 if (lex_match_id ("MISSING"))
214 if (!lex_match_id ("COLUMNWISE"))
216 lex_error (_("while expecting COLUMNWISE"));
219 agr.missing = COLUMNWISE;
221 else if (lex_match_id ("DOCUMENT"))
222 copy_documents = true;
223 else if (lex_match_id ("PRESORTED"))
225 else if (lex_match_id ("BREAK"))
/* The break variables double as the sort criteria.  saw_direction is
   filled in by sort_parse_criteria and is only used for the PRESORTED
   warning below. */
230 agr.sort = sort_parse_criteria (dict,
231 &agr.break_vars, &agr.break_var_cnt,
232 &saw_direction, NULL);
233 if (agr.sort == NULL)
/* Every break variable is cloned into the aggregate dictionary under
   its own name. */
236 for (i = 0; i < agr.break_var_cnt; i++)
237 dict_clone_var_assert (agr.dict, agr.break_vars[i],
238 agr.break_vars[i]->name);
240 /* BREAK must follow the options. */
245 lex_error (_("expecting BREAK"));
249 if (presorted && saw_direction)
250 msg (SW, _("When PRESORTED is specified, specifying sorting directions "
251 "with (A) or (D) has no effect. Output data will be sorted "
252 "the same way as the input data."));
254 /* Read in the aggregate functions. */
256 if (!parse_aggregate_functions (dict, &agr))
/* NOTE(review): the condition guarding this statement is not visible;
   presumably it depends on copy_documents -- confirm. */
259 /* Delete documents. */
261 dict_set_documents (agr.dict, NULL);
263 /* Cancel SPLIT FILE. */
264 dict_set_split_vars (agr.dict, NULL, 0);
/* Allocate the output case with one value slot per value index used by
   the aggregate dictionary. */
268 case_create (&agr.agr_case, dict_get_next_value_idx (agr.dict));
270 /* Output to active file or external file? */
271 if (out_file == NULL)
273 /* The active file will be replaced by the aggregated data,
274 so TEMPORARY is moot. */
275 proc_cancel_temporary_transformations (ds);
277 if (agr.sort != NULL && !presorted)
279 if (!sort_active_file_in_place (ds, agr.sort))
/* Aggregated cases are collected in a storage sink built on the
   aggregate dictionary. */
283 agr.sink = create_case_sink (&storage_sink_class, agr.dict, NULL);
284 if (agr.sink->class->open != NULL)
285 agr.sink->class->open (agr.sink);
287 create_case_sink (&null_sink_class,
289 if (!procedure (ds, agr_to_active_file, &agr))
/* aggregate_single_case only emits a group when the next group starts,
   so the last group must be dumped and written explicitly. */
291 if (agr.case_cnt > 0)
293 dump_aggregate_info (&agr, &agr.agr_case);
294 if (!agr.sink->class->write (agr.sink, &agr.agr_case))
/* Swap in the aggregate dictionary and the sink's source as the new
   active file. */
297 discard_variables (ds);
299 dataset_set_dict (ds, agr.dict);
302 agr.sink->class->make_source (agr.sink));
303 free_case_sink (agr.sink);
/* Output to an external file through a writer built on the aggregate
   dictionary. */
307 agr.writer = any_writer_open (out_file, agr.dict);
308 if (agr.writer == NULL)
311 if (agr.sort != NULL && !presorted)
313 /* Sorting is needed. */
314 struct casefile *dst;
315 struct casereader *reader;
/* Sort into a separate casefile, then feed its cases through the
   aggregator one at a time, writing each completed group. */
319 dst = sort_active_file_to_casefile (ds, agr.sort);
322 reader = casefile_get_destructive_reader (dst);
323 while (ok && casereader_read_xfer (reader, &c))
325 if (aggregate_single_case (&agr, &c, &agr.agr_case))
326 ok = any_writer_write (agr.writer, &agr.agr_case);
329 casereader_destroy (reader);
331 ok = !casefile_error (dst);
332 casefile_destroy (dst);
338 /* Active file is already sorted. */
339 if (!procedure (ds, presorted_agr_to_sysfile, &agr))
/* Flush the final group.  The write result is not tested here; the
   any_writer_error check that follows covers it. */
343 if (agr.case_cnt > 0)
345 dump_aggregate_info (&agr, &agr.agr_case);
346 any_writer_write (agr.writer, &agr.agr_case);
348 if (any_writer_error (agr.writer))
357 return CMD_CASCADING_FAILURE;
360 /* Parse all the aggregate functions.

   For each specification this parses the target variable names (with
   optional string labels), the function name (a trailing `.' is
   stripped), the source variable list and any function arguments, then
   appends one struct agr_var per target to AGR->agr_vars and creates
   the target variable in AGR->dict.  DICT is the source dictionary
   used to look up source variables and the weight variable.

   NOTE(review): lines are missing from this extract (braces, several
   declarations, error-path jumps and the return statements), so the
   exact return values and cleanup order cannot be confirmed here. */
362 parse_aggregate_functions (const struct dictionary *dict, struct agr_proc *agr)
364 struct agr_var *tail; /* Tail of linked list starting at agr->agr_vars. */
366 /* Parse everything. */
375 const struct agr_func *function;
378 union agr_argument arg[2];
380 struct variable **src;
394 /* Parse the list of target variables. */
395 while (!lex_match ('='))
397 size_t n_dest_prev = n_dest;
399 if (!parse_DATA_LIST_vars (&dest, &n_dest,
400 PV_APPEND | PV_SINGLE | PV_NO_SCRATCH))
403 /* Assign empty labels. */
/* Grow dest_label to match dest and null the new slots so every target
   has a (possibly null) label entry. */
407 dest_label = xnrealloc (dest_label, n_dest, sizeof *dest_label);
408 for (j = n_dest_prev; j < n_dest; j++)
409 dest_label[j] = NULL;
/* A string token directly after a target name becomes that target's
   label, truncated to 255 bytes. */
412 if (token == T_STRING)
414 ds_truncate (&tokstr, 255);
415 dest_label[n_dest - 1] = ds_xstrdup (&tokstr);
420 /* Get the name of the aggregation function. */
423 lex_error (_("expecting aggregation function"));
/* A trailing `.' on the function name is removed before the lookup.
   NOTE(review): presumably this also sets include_missing; the
   statement is not visible here -- confirm. */
428 if (tokid[strlen (tokid) - 1] == '.')
431 tokid[strlen (tokid) - 1] = 0;
/* Case-insensitive linear search; stops at the NULL-name row. */
434 for (function = agr_func_tab; function->name; function++)
435 if (!strcasecmp (function->name, tokid))
437 if (NULL == function->name)
439 msg (SE, _("Unknown aggregation function %s."), tokid);
/* The table offset is also the function's enumerator value. */
442 func_index = function - agr_func_tab;
445 /* Check for leading lparen. */
/* Without `(' only the no-variable forms N_NO_VARS / NU_NO_VARS are
   accepted; anything else is an error. */
446 if (!lex_match ('('))
449 func_index = N_NO_VARS;
450 else if (func_index == NU)
451 func_index = NU_NO_VARS;
454 lex_error (_("expecting `('"));
460 /* Parse list of source variables. */
462 int pv_opts = PV_NO_SCRATCH;
/* SUM, MEAN and SD require numeric sources; functions that take
   arguments require all sources to share one type. */
464 if (func_index == SUM || func_index == MEAN || func_index == SD)
465 pv_opts |= PV_NUMERIC;
466 else if (function->n_args)
467 pv_opts |= PV_SAME_TYPE;
469 if (!parse_variables (dict, &src, &n_src, pv_opts))
473 /* Parse function arguments, for those functions that
474 require arguments. */
475 if (function->n_args != 0)
476 for (i = 0; i < function->n_args; i++)
/* String arguments are copied to the heap; numeric ones are handled in
   the lex_is_number branch (body not visible here). */
481 if (token == T_STRING)
483 arg[i].c = ds_xstrdup (&tokstr);
486 else if (lex_is_number ())
491 msg (SE, _("Missing argument %d to %s."), i + 1,
498 if (type != src[0]->type)
500 msg (SE, _("Arguments to %s must be of same type as "
501 "source variables."),
507 /* Trailing rparen. */
510 lex_error (_("expecting `)'"));
514 /* Now check that the number of source variables match
515 the number of target variables. If we check earlier
516 than this, the user can get very misleading error
517 message, i.e. `AGGREGATE x=SUM(y t).' will get this
518 error message when a proper message would be more
519 like `unknown variable t'. */
522 msg (SE, _("Number of source variables (%u) does not match "
523 "number of target variables (%u)."),
524 (unsigned) n_src, (unsigned) n_dest);
/* For the two-argument range functions, detect bounds given high-to-low.
   The visible code captures arg[0] in a temporary and warns; the swap
   statements themselves are not visible in this extract. */
528 if ((func_index == PIN || func_index == POUT
529 || func_index == FIN || func_index == FOUT)
530 && ((src[0]->type == NUMERIC && arg[0].f > arg[1].f)
531 || (src[0]->type == ALPHA
532 && str_compare_rpad (arg[0].c, arg[1].c) > 0)))
534 union agr_argument t = arg[0];
538 msg (SW, _("The value arguments passed to the %s function "
539 "are out-of-order. They will be treated as if "
540 "they had been specified in the correct order."),
545 /* Finally add these to the linked list of aggregation variables. */
547 for (i = 0; i < n_dest; i++)
549 struct agr_var *v = xmalloc (sizeof *v);
551 /* Add variable to chain. */
552 if (agr->agr_vars != NULL)
560 /* Create the target variable in the aggregate dictionary. */
563 struct variable *destvar;
565 v->function = func_index;
/* String sources get the FSTRING bit and a scratch buffer of the
   source's width for MIN/MAX/FIRST/LAST style accumulation. */
571 if (src[i]->type == ALPHA)
573 v->function |= FSTRING;
574 v->string = xmalloc (src[i]->width);
/* Functions whose output follows the source type clone the source
   variable under the target name; the rest create a variable with
   width 0 and apply the function's (or a weighted-count) format. */
577 if (function->alpha_type == ALPHA)
578 destvar = dict_clone_var (agr->dict, v->src, dest[i]);
581 assert (v->src->type == NUMERIC
582 || function->alpha_type == NUMERIC);
583 destvar = dict_create_var (agr->dict, dest[i], 0);
/* With a weight variable in the source dictionary, N and NMISS use
   f8_2 instead of the table format. */
586 if ((func_index == N || func_index == NMISS)
587 && dict_get_weight (dict) != NULL)
588 destvar->print = destvar->write = f8_2;
590 destvar->print = destvar->write = function->format;
595 destvar = dict_create_var (agr->dict, dest[i], 0);
596 if (func_index == N_NO_VARS
597 && dict_get_weight (dict) != NULL)
598 destvar->print = destvar->write = f8_2;
600 destvar->print = destvar->write = function->format;
605 msg (SE, _("Variable name %s is not unique within the "
606 "aggregate file dictionary, which contains "
607 "the aggregate variables and the break "
/* Ownership of the label string moves to the variable; clearing the
   slot keeps the cleanup loop from freeing it again. */
616 destvar->label = dest_label[i];
617 dest_label[i] = NULL;
623 v->include_missing = include_missing;
/* Each agr_var gets its own copy of the arguments: numbers by value,
   strings by xstrdup. */
629 if (v->src->type == NUMERIC)
630 for (j = 0; j < function->n_args; j++)
631 v->arg[j].f = arg[j].f;
633 for (j = 0; j < function->n_args; j++)
634 v->arg[j].c = xstrdup (arg[j].c);
/* Walks the parse-time string arguments; the loop body is not visible
   here (presumably it frees them -- confirm). */
638 if (src != NULL && src[0]->type == ALPHA)
639 for (i = 0; i < function->n_args; i++)
649 if (!lex_match ('/'))
/* NOTE(review): unlike every other message in this file, this string
   is not wrapped in _() for translation. */
654 lex_error ("expecting end of command");
660 for (i = 0; i < n_dest; i++)
663 free (dest_label[i]);
669 if (src && n_src && src[0]->type == ALPHA)
670 for (i = 0; i < function->n_args; i++)
/* Releases the resources held by AGR: closes the writer, destroys the
   sort criteria, frees the break-variable array, destroys the break
   case, releases per-variable state along the agr_vars list, and
   finally destroys the aggregate dictionary (if any) and the aggregate
   case.
   NOTE(review): the statements that free iter->string and the list
   nodes themselves are not visible in this extract. */
683 agr_destroy (struct agr_proc *agr)
685 struct agr_var *iter, *next;
687 any_writer_close (agr->writer);
688 if (agr->sort != NULL)
689 sort_destroy_criteria (agr->sort);
690 free (agr->break_vars);
691 case_destroy (&agr->break_case);
692 for (iter = agr->agr_vars; iter; iter = next)
/* String-source variables own heap copies of their string arguments;
   the argument count comes from the function table. */
696 if (iter->function & FSTRING)
701 n_args = agr_func_tab[iter->function & FUNC].n_args;
702 for (i = 0; i < n_args; i++)
703 free (iter->arg[i].c);
/* Numeric SD is the function that owns a moments accumulator. */
706 else if (iter->function == SD)
707 moments1_destroy (iter->moments);
710 if (agr->dict != NULL)
711 dict_destroy (agr->dict);
713 case_destroy (&agr->agr_case);
/* Forward declarations for the execution helpers.  dump_aggregate_info
   is also declared earlier in the file; the repeat is harmless. */
718 static void accumulate_aggregate_info (struct agr_proc *,
719 const struct ccase *);
720 static void dump_aggregate_info (struct agr_proc *, struct ccase *);
722 /* Processes a single case INPUT for aggregation. If output is
723 warranted, writes it to OUTPUT and returns true.
724 Otherwise, returns false and OUTPUT is unmodified.

   Output is warranted when INPUT differs from the saved break case on
   the break variables: the finished group is dumped to OUTPUT and the
   accumulators are reset before INPUT is accumulated.  The very first
   case only initializes the accumulators. */
726 aggregate_single_case (struct agr_proc *agr,
727 const struct ccase *input, struct ccase *output)
729 bool finished_group = false;
/* The post-increment makes this branch fire only for the first case. */
731 if (agr->case_cnt++ == 0)
732 initialize_aggregate_info (agr, input);
/* A nonzero comparison result means INPUT starts a new group. */
733 else if (case_compare (&agr->break_case, input,
734 agr->break_vars, agr->break_var_cnt))
736 dump_aggregate_info (agr, output);
737 finished_group = true;
739 initialize_aggregate_info (agr, input);
742 accumulate_aggregate_info (agr, input);
743 return finished_group;
746 /* Accumulates aggregation data from the case INPUT.

   For every aggregate variable with a source, the source value is
   first tested for missingness; missing values only feed the
   missing-count functions, all others update the per-function
   accumulators (dbl[], string, moments), scaled by the case weight.

   NOTE(review): most `case' labels, `break' statements and braces are
   missing from this extract, so the statement-to-function mapping
   below is inferred from the expressions and is not annotated per
   line. */
748 accumulate_aggregate_info (struct agr_proc *agr,
749 const struct ccase *input)
751 struct agr_var *iter;
753 bool bad_warn = true;
/* Case weight taken from the source dictionary. */
755 weight = dict_get_case_weight (agr->src_dict, input, &bad_warn);
757 for (iter = agr->agr_vars; iter; iter = iter->next)
760 const union value *v = case_data (input, iter->src->fv);
/* Missing test: without include_missing any missing value counts; with
   it, the numeric test is narrower (its final operand is not visible
   in this extract). */
762 if ((!iter->include_missing
763 && mv_is_value_missing (&iter->src->miss, v))
764 || (iter->include_missing && iter->src->type == NUMERIC
767 switch (iter->function)
770 case NMISS | FSTRING:
771 iter->dbl[0] += weight;
774 case NUMISS | FSTRING:
782 /* This is horrible. There are too many possibilities. */
783 switch (iter->function)
786 iter->dbl[0] += v->f * weight;
790 iter->dbl[0] += v->f * weight;
791 iter->dbl[1] += weight;
794 moments1_add (iter->moments, v->f, weight);
797 iter->dbl[0] = max (iter->dbl[0], v->f);
/* String comparisons use memcmp over the source variable's width. */
801 if (memcmp (iter->string, v->s, iter->src->width) < 0)
802 memcpy (iter->string, v->s, iter->src->width);
806 iter->dbl[0] = min (iter->dbl[0], v->f);
810 if (memcmp (iter->string, v->s, iter->src->width) > 0)
811 memcpy (iter->string, v->s, iter->src->width);
/* Threshold and range functions: dbl[0] accumulates the weight of
   matching cases and dbl[1] the weight of all non-missing cases. */
816 if (v->f > iter->arg[0].f)
817 iter->dbl[0] += weight;
818 iter->dbl[1] += weight;
/* NOTE(review): arg[].c is an xstrdup copy of the argument token; if it
   is shorter than the source width, memcmp over `width' bytes may read
   past that buffer unless it is padded somewhere not visible here --
   confirm. */
822 if (memcmp (iter->arg[0].c, v->s, iter->src->width) < 0)
823 iter->dbl[0] += weight;
824 iter->dbl[1] += weight;
828 if (v->f < iter->arg[0].f)
829 iter->dbl[0] += weight;
830 iter->dbl[1] += weight;
834 if (memcmp (iter->arg[0].c, v->s, iter->src->width) > 0)
835 iter->dbl[0] += weight;
836 iter->dbl[1] += weight;
840 if (iter->arg[0].f <= v->f && v->f <= iter->arg[1].f)
841 iter->dbl[0] += weight;
842 iter->dbl[1] += weight;
846 if (memcmp (iter->arg[0].c, v->s, iter->src->width) <= 0
847 && memcmp (iter->arg[1].c, v->s, iter->src->width) >= 0)
848 iter->dbl[0] += weight;
849 iter->dbl[1] += weight;
853 if (iter->arg[0].f > v->f || v->f > iter->arg[1].f)
854 iter->dbl[0] += weight;
855 iter->dbl[1] += weight;
859 if (memcmp (iter->arg[0].c, v->s, iter->src->width) > 0
860 || memcmp (iter->arg[1].c, v->s, iter->src->width) < 0)
861 iter->dbl[0] += weight;
862 iter->dbl[1] += weight;
866 iter->dbl[0] += weight;
879 case FIRST | FSTRING:
882 memcpy (iter->string, v->s, iter->src->width);
891 memcpy (iter->string, v->s, iter->src->width);
895 case NMISS | FSTRING:
897 case NUMISS | FSTRING:
898 /* Our value is not missing or it would have been
899 caught earlier. Nothing to do. */
/* NOTE(review): this second switch is presumably the branch for
   variables with no source (the N_NO_VARS forms) -- confirm. */
905 switch (iter->function)
908 iter->dbl[0] += weight;
919 /* We've come to a record that differs from the previous in one or
920 more of the break variables. Make an output record from the
921 accumulated statistics in the OUTPUT case.

   The break-variable values are copied from the saved break case
   first; then each aggregate variable's result is stored at its
   destination variable's position in OUTPUT.

   NOTE(review): most `case' labels and braces, and the declarations of
   the loop variables and `value_idx', are missing from this extract. */
923 dump_aggregate_info (struct agr_proc *agr, struct ccase *output)
929 for (i = 0; i < agr->break_var_cnt; i++)
931 struct variable *v = agr->break_vars[i];
/* Copy all nv value slots of the break variable. */
932 memcpy (case_data_rw (output, value_idx),
933 case_data (&agr->break_case, v->fv),
934 sizeof (union value) * v->nv);
942 for (i = agr->agr_vars; i; i = i->next)
944 union value *v = case_data_rw (output, i->dest->fv);
/* Column-wise missing handling: if the variable is flagged missing and
   the function is not one of the counting functions, a string result
   is blanked.  The numeric branch is not visible here. */
946 if (agr->missing == COLUMNWISE && i->missing != 0
947 && (i->function & FUNC) != N && (i->function & FUNC) != NU
948 && (i->function & FUNC) != NMISS && (i->function & FUNC) != NUMISS)
950 if (i->dest->type == ALPHA)
951 memset (v->s, ' ', i->dest->width);
/* int1 being zero yields SYSMIS (numeric) or blanks (string) in the
   statements below. */
960 v->f = i->int1 ? i->dbl[0] : SYSMIS;
/* Ratio of accumulated sum to accumulated weight, or SYSMIS when the
   weight is zero. */
963 v->f = i->dbl[1] != 0.0 ? i->dbl[0] / i->dbl[1] : SYSMIS;
969 /* FIXME: we should use two passes. */
970 moments1_calculate (i->moments, NULL, NULL, &variance,
972 if (variance != SYSMIS)
973 v->f = sqrt (variance);
980 v->f = i->int1 ? i->dbl[0] : SYSMIS;
985 memcpy (v->s, i->string, i->dest->width);
987 memset (v->s, ' ', i->dest->width);
/* Fraction of the accumulated weight that matched; the variant further
   down scales it by 100. */
997 v->f = i->dbl[1] ? i->dbl[0] / i->dbl[1] : SYSMIS;
1006 case POUT | FSTRING:
1007 v->f = i->dbl[1] ? i->dbl[0] / i->dbl[1] * 100.0 : SYSMIS;
1019 v->f = i->int1 ? i->dbl[0] : SYSMIS;
1021 case FIRST | FSTRING:
1022 case LAST | FSTRING:
1024 memcpy (v->s, i->string, i->dest->width);
1026 memset (v->s, ' ', i->dest->width);
1035 case NMISS | FSTRING:
1039 case NUMISS | FSTRING:
1049 /* Resets the state for all the aggregate functions.
   Also replaces the saved break case with a clone of INPUT, the first
   case of the new group.
   NOTE(review): the `case' labels of the switch are missing from this
   extract, so which function gets which seed value is not shown. */
1051 initialize_aggregate_info (struct agr_proc *agr, const struct ccase *input)
1053 struct agr_var *iter;
1055 case_destroy (&agr->break_case);
1056 case_clone (&agr->break_case, input);
1058 for (iter = agr->agr_vars; iter; iter = iter->next)
/* Common reset: zero every numeric accumulator and both int flags. */
1061 iter->dbl[0] = iter->dbl[1] = iter->dbl[2] = 0.0;
1062 iter->int1 = iter->int2 = 0;
1063 switch (iter->function)
/* Extreme seed values: DBL_MAX / all-0xFF bytes and -DBL_MAX /
   all-zero bytes, presumably for the minimum and maximum functions
   respectively -- confirm against the missing labels. */
1066 iter->dbl[0] = DBL_MAX;
1069 memset (iter->string, 255, iter->src->width);
1072 iter->dbl[0] = -DBL_MAX;
1075 memset (iter->string, 0, iter->src->width);
/* The moments accumulator is created on first use; the moments1_clear
   call is presumably the else-branch that resets an existing one. */
1078 if (iter->moments == NULL)
1079 iter->moments = moments1_create (MOMENT_VARIANCE);
1081 moments1_clear (iter->moments);
1089 /* Aggregate each case as it comes through, writing the aggregate
   case to the sink whenever a group is completed.  AGR_ is the
   struct agr_proc passed as auxiliary data; the dataset is unused.
1091 Returns true if successful, false if an I/O error occurred. */
1093 agr_to_active_file (const struct ccase *c, void *agr_, const struct dataset *ds UNUSED)
1095 struct agr_proc *agr = agr_;
/* Only a completed group produces output; the return for the
   no-output case is not visible in this extract. */
1097 if (aggregate_single_case (agr, c, &agr->agr_case))
1098 return agr->sink->class->write (agr->sink, &agr->agr_case);
1103 /* Aggregate the current case and output it if we passed a
   group boundary, i.e. aggregate_single_case reports that a group was
   completed.  AGR_ is the struct agr_proc passed as auxiliary data;
   the dataset is unused.  The return for the no-output case is not
   visible in this extract. */
1106 presorted_agr_to_sysfile (const struct ccase *c, void *agr_, const struct dataset *ds UNUSED)
1108 struct agr_proc *agr = agr_;
1110 if (aggregate_single_case (agr, c, &agr->agr_case))
1111 return any_writer_write (agr->writer, &agr->agr_case);