1 /* PSPP - computes sample statistics.
2 Copyright (C) 1997-9, 2000 Free Software Foundation, Inc.
3 Written by Ben Pfaff <blp@gnu.org>.
5 This program is free software; you can redistribute it and/or
6 modify it under the terms of the GNU General Public License as
7 published by the Free Software Foundation; either version 2 of the
8 License, or (at your option) any later version.
10 This program is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with this program; if not, write to the Free Software
17 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
21 #include <libpspp/message.h>
23 #include <libpspp/alloc.h>
24 #include <data/any-writer.h>
25 #include <data/case.h>
26 #include <data/casefile.h>
27 #include <language/command.h>
28 #include <data/dictionary.h>
29 #include <libpspp/message.h>
30 #include <data/file-handle-def.h>
31 #include <language/data-io/file-handle.h>
32 #include <language/lexer/lexer.h>
33 #include <language/stats/sort-criteria.h>
34 #include <libpspp/misc.h>
35 #include <math/moments.h>
36 #include <libpspp/pool.h>
37 #include <data/settings.h>
38 #include <data/sys-file-writer.h>
39 #include <math/sort.h>
40 #include <libpspp/str.h>
41 #include <data/variable.h>
42 #include <procedure.h>
45 #define _(msgid) gettext (msgid)
47 /* Specifies how to make an aggregate variable. */
50 struct agr_var *next; /* Next in list. */
52 /* Collected during parsing. */
53 struct variable *src; /* Source variable. */
54 struct variable *dest; /* Target variable. */
55 int function; /* Function. */
56 int include_missing; /* 1=Include user-missing values. */
57 union value arg[2]; /* Arguments. */
59 /* Accumulated during AGGREGATE execution. */
64 struct moments1 *moments;
67 /* Aggregation functions. */
70 NONE, SUM, MEAN, SD, MAX, MIN, PGT, PLT, PIN, POUT, FGT, FLT, FIN,
71 FOUT, N, NU, NMISS, NUMISS, FIRST, LAST,
72 N_AGR_FUNCS, N_NO_VARS, NU_NO_VARS,
73 FUNC = 0x1f, /* Function mask. */
74 FSTRING = 1<<5, /* String function bit. */
77 /* Attributes of an aggregation function. */
80 const char *name; /* Aggregation function name. */
81 size_t n_args; /* Number of arguments. */
82 int alpha_type; /* When given ALPHA arguments, output type. */
83 struct fmt_spec format; /* Format spec if alpha_type != ALPHA. */
86 /* Attributes of aggregation functions. */
87 static const struct agr_func agr_func_tab[] =
89 {"<NONE>", 0, -1, {0, 0, 0}},
90 {"SUM", 0, -1, {FMT_F, 8, 2}},
91 {"MEAN", 0, -1, {FMT_F, 8, 2}},
92 {"SD", 0, -1, {FMT_F, 8, 2}},
93 {"MAX", 0, ALPHA, {-1, -1, -1}},
94 {"MIN", 0, ALPHA, {-1, -1, -1}},
95 {"PGT", 1, NUMERIC, {FMT_F, 5, 1}},
96 {"PLT", 1, NUMERIC, {FMT_F, 5, 1}},
97 {"PIN", 2, NUMERIC, {FMT_F, 5, 1}},
98 {"POUT", 2, NUMERIC, {FMT_F, 5, 1}},
99 {"FGT", 1, NUMERIC, {FMT_F, 5, 3}},
100 {"FLT", 1, NUMERIC, {FMT_F, 5, 3}},
101 {"FIN", 2, NUMERIC, {FMT_F, 5, 3}},
102 {"FOUT", 2, NUMERIC, {FMT_F, 5, 3}},
103 {"N", 0, NUMERIC, {FMT_F, 7, 0}},
104 {"NU", 0, NUMERIC, {FMT_F, 7, 0}},
105 {"NMISS", 0, NUMERIC, {FMT_F, 7, 0}},
106 {"NUMISS", 0, NUMERIC, {FMT_F, 7, 0}},
107 {"FIRST", 0, ALPHA, {-1, -1, -1}},
108 {"LAST", 0, ALPHA, {-1, -1, -1}},
109 {NULL, 0, -1, {-1, -1, -1}},
110 {"N", 0, NUMERIC, {FMT_F, 7, 0}},
111 {"NU", 0, NUMERIC, {FMT_F, 7, 0}},
114 /* Missing value types. */
115 enum missing_treatment
117 ITEMWISE, /* Missing values item by item. */
118 COLUMNWISE /* Missing values column by column. */
121 /* An entire AGGREGATE procedure. */
124 /* We have either an output file or a sink. */
125 struct any_writer *writer; /* Output file, or null if none. */
126 struct case_sink *sink; /* Sink, or null if none. */
128 /* Break variables. */
129 struct sort_criteria *sort; /* Sort criteria. */
130 struct variable **break_vars; /* Break variables. */
131 size_t break_var_cnt; /* Number of break variables. */
132 struct ccase break_case; /* Last values of break variables. */
134 enum missing_treatment missing; /* How to treat missing values. */
135 struct agr_var *agr_vars; /* First aggregate variable. */
136 struct dictionary *dict; /* Aggregate dictionary. */
137 int case_cnt; /* Counts aggregated cases. */
138 struct ccase agr_case; /* Aggregate case for output. */
141 static void initialize_aggregate_info (struct agr_proc *,
142 const struct ccase *);
145 static int parse_aggregate_functions (struct agr_proc *);
146 static void agr_destroy (struct agr_proc *);
147 static int aggregate_single_case (struct agr_proc *agr,
148 const struct ccase *input,
149 struct ccase *output);
150 static void dump_aggregate_info (struct agr_proc *agr, struct ccase *output);
152 /* Aggregating to the active file. */
153 static bool agr_to_active_file (struct ccase *, void *aux);
155 /* Aggregating to a system file. */
156 static bool presorted_agr_to_sysfile (struct ccase *, void *aux);
160 /* Parses and executes the AGGREGATE procedure. */
165 struct file_handle *out_file = NULL;
167 bool copy_documents = false;
168 bool presorted = false;
171 memset(&agr, 0 , sizeof (agr));
172 agr.missing = ITEMWISE;
173 case_nullify (&agr.break_case);
175 agr.dict = dict_create ();
176 dict_set_label (agr.dict, dict_get_label (default_dict));
177 dict_set_documents (agr.dict, dict_get_documents (default_dict));
179 /* OUTFILE subcommand must be first. */
180 if (!lex_force_match_id ("OUTFILE"))
183 if (!lex_match ('*'))
185 out_file = fh_parse (FH_REF_FILE | FH_REF_SCRATCH);
186 if (out_file == NULL)
190 /* Read most of the subcommands. */
195 if (lex_match_id ("MISSING"))
198 if (!lex_match_id ("COLUMNWISE"))
200 lex_error (_("while expecting COLUMNWISE"));
203 agr.missing = COLUMNWISE;
205 else if (lex_match_id ("DOCUMENT"))
206 copy_documents = true;
207 else if (lex_match_id ("PRESORTED"))
209 else if (lex_match_id ("BREAK"))
214 agr.sort = sort_parse_criteria (default_dict,
215 &agr.break_vars, &agr.break_var_cnt,
216 &saw_direction, NULL);
217 if (agr.sort == NULL)
220 for (i = 0; i < agr.break_var_cnt; i++)
221 dict_clone_var_assert (agr.dict, agr.break_vars[i],
222 agr.break_vars[i]->name);
224 /* BREAK must follow the options. */
229 lex_error (_("expecting BREAK"));
233 if (presorted && saw_direction)
234 msg (SW, _("When PRESORTED is specified, specifying sorting directions "
235 "with (A) or (D) has no effect. Output data will be sorted "
236 "the same way as the input data."));
238 /* Read in the aggregate functions. */
240 if (!parse_aggregate_functions (&agr))
243 /* Delete documents. */
245 dict_set_documents (agr.dict, NULL);
247 /* Cancel SPLIT FILE. */
248 dict_set_split_vars (agr.dict, NULL, 0);
252 case_create (&agr.agr_case, dict_get_next_value_idx (agr.dict));
254 /* Output to active file or external file? */
255 if (out_file == NULL)
257 /* The active file will be replaced by the aggregated data,
258 so TEMPORARY is moot. */
261 if (agr.sort != NULL && !presorted)
263 if (!sort_active_file_in_place (agr.sort))
267 agr.sink = create_case_sink (&storage_sink_class, agr.dict, NULL);
268 if (agr.sink->class->open != NULL)
269 agr.sink->class->open (agr.sink);
270 vfm_sink = create_case_sink (&null_sink_class, default_dict, NULL);
271 if (!procedure (agr_to_active_file, &agr))
273 if (agr.case_cnt > 0)
275 dump_aggregate_info (&agr, &agr.agr_case);
276 if (!agr.sink->class->write (agr.sink, &agr.agr_case))
279 dict_destroy (default_dict);
280 default_dict = agr.dict;
282 vfm_source = agr.sink->class->make_source (agr.sink);
283 free_case_sink (agr.sink);
287 agr.writer = any_writer_open (out_file, agr.dict);
288 if (agr.writer == NULL)
291 if (agr.sort != NULL && !presorted)
293 /* Sorting is needed. */
294 struct casefile *dst;
295 struct casereader *reader;
299 dst = sort_active_file_to_casefile (agr.sort);
302 reader = casefile_get_destructive_reader (dst);
303 while (ok && casereader_read_xfer (reader, &c))
305 if (aggregate_single_case (&agr, &c, &agr.agr_case))
306 ok = any_writer_write (agr.writer, &agr.agr_case);
309 casereader_destroy (reader);
311 ok = !casefile_error (dst);
312 casefile_destroy (dst);
318 /* Active file is already sorted. */
319 if (!procedure (presorted_agr_to_sysfile, &agr))
323 if (agr.case_cnt > 0)
325 dump_aggregate_info (&agr, &agr.agr_case);
326 any_writer_write (agr.writer, &agr.agr_case);
328 if (any_writer_error (agr.writer))
337 return CMD_CASCADING_FAILURE;
340 /* Parse all the aggregate functions. */
342 parse_aggregate_functions (struct agr_proc *agr)
344 struct agr_var *tail; /* Tail of linked list starting at agr->vars. */
346 /* Parse everything. */
355 const struct agr_func *function;
360 struct variable **src;
374 /* Parse the list of target variables. */
375 while (!lex_match ('='))
377 size_t n_dest_prev = n_dest;
379 if (!parse_DATA_LIST_vars (&dest, &n_dest,
380 PV_APPEND | PV_SINGLE | PV_NO_SCRATCH))
383 /* Assign empty labels. */
387 dest_label = xnrealloc (dest_label, n_dest, sizeof *dest_label);
388 for (j = n_dest_prev; j < n_dest; j++)
389 dest_label[j] = NULL;
392 if (token == T_STRING)
394 ds_truncate (&tokstr, 255);
395 dest_label[n_dest - 1] = xstrdup (ds_c_str (&tokstr));
400 /* Get the name of the aggregation function. */
403 lex_error (_("expecting aggregation function"));
408 if (tokid[strlen (tokid) - 1] == '.')
411 tokid[strlen (tokid) - 1] = 0;
414 for (function = agr_func_tab; function->name; function++)
415 if (!strcasecmp (function->name, tokid))
417 if (NULL == function->name)
419 msg (SE, _("Unknown aggregation function %s."), tokid);
422 func_index = function - agr_func_tab;
425 /* Check for leading lparen. */
426 if (!lex_match ('('))
429 func_index = N_NO_VARS;
430 else if (func_index == NU)
431 func_index = NU_NO_VARS;
434 lex_error (_("expecting `('"));
440 /* Parse list of source variables. */
442 int pv_opts = PV_NO_SCRATCH;
444 if (func_index == SUM || func_index == MEAN || func_index == SD)
445 pv_opts |= PV_NUMERIC;
446 else if (function->n_args)
447 pv_opts |= PV_SAME_TYPE;
449 if (!parse_variables (default_dict, &src, &n_src, pv_opts))
453 /* Parse function arguments, for those functions that
454 require arguments. */
455 if (function->n_args != 0)
456 for (i = 0; i < function->n_args; i++)
461 if (token == T_STRING)
463 arg[i].c = xstrdup (ds_c_str (&tokstr));
466 else if (lex_is_number ())
471 msg (SE, _("Missing argument %d to %s."), i + 1,
478 if (type != src[0]->type)
480 msg (SE, _("Arguments to %s must be of same type as "
481 "source variables."),
487 /* Trailing rparen. */
490 lex_error (_("expecting `)'"));
494 /* Now check that the number of source variables match
495 the number of target variables. If we check earlier
496 than this, the user can get very misleading error
497 message, i.e. `AGGREGATE x=SUM(y t).' will get this
498 error message when a proper message would be more
499 like `unknown variable t'. */
502 msg (SE, _("Number of source variables (%u) does not match "
503 "number of target variables (%u)."),
504 (unsigned) n_src, (unsigned) n_dest);
508 if ((func_index == PIN || func_index == POUT
509 || func_index == FIN || func_index == FOUT)
510 && ((src[0]->type == NUMERIC && arg[0].f > arg[1].f)
511 || (src[0]->type == ALPHA
512 && str_compare_rpad (arg[0].c, arg[1].c) > 0)))
514 union value t = arg[0];
518 msg (SW, _("The value arguments passed to the %s function "
519 "are out-of-order. They will be treated as if "
520 "they had been specified in the correct order."),
525 /* Finally add these to the linked list of aggregation
527 for (i = 0; i < n_dest; i++)
529 struct agr_var *v = xmalloc (sizeof *v);
531 /* Add variable to chain. */
532 if (agr->agr_vars != NULL)
540 /* Create the target variable in the aggregate
543 struct variable *destvar;
545 v->function = func_index;
551 if (src[i]->type == ALPHA)
553 v->function |= FSTRING;
554 v->string = xmalloc (src[i]->width);
557 if (function->alpha_type == ALPHA)
558 destvar = dict_clone_var (agr->dict, v->src, dest[i]);
561 assert (v->src->type == NUMERIC
562 || function->alpha_type == NUMERIC);
563 destvar = dict_create_var (agr->dict, dest[i], 0);
566 if ((func_index == N || func_index == NMISS)
567 && dict_get_weight (default_dict) != NULL)
568 destvar->print = destvar->write = f8_2;
570 destvar->print = destvar->write = function->format;
575 destvar = dict_create_var (agr->dict, dest[i], 0);
576 if (func_index == N_NO_VARS
577 && dict_get_weight (default_dict) != NULL)
578 destvar->print = destvar->write = f8_2;
580 destvar->print = destvar->write = function->format;
585 msg (SE, _("Variable name %s is not unique within the "
586 "aggregate file dictionary, which contains "
587 "the aggregate variables and the break "
597 destvar->label = dest_label[i];
598 dest_label[i] = NULL;
604 v->include_missing = include_missing;
610 if (v->src->type == NUMERIC)
611 for (j = 0; j < function->n_args; j++)
612 v->arg[j].f = arg[j].f;
614 for (j = 0; j < function->n_args; j++)
615 v->arg[j].c = xstrdup (arg[j].c);
619 if (src != NULL && src[0]->type == ALPHA)
620 for (i = 0; i < function->n_args; i++)
630 if (!lex_match ('/'))
635 lex_error ("expecting end of command");
641 for (i = 0; i < n_dest; i++)
644 free (dest_label[i]);
650 if (src && n_src && src[0]->type == ALPHA)
651 for (i = 0; i < function->n_args; i++)
664 agr_destroy (struct agr_proc *agr)
666 struct agr_var *iter, *next;
668 any_writer_close (agr->writer);
669 if (agr->sort != NULL)
670 sort_destroy_criteria (agr->sort);
671 free (agr->break_vars);
672 case_destroy (&agr->break_case);
673 for (iter = agr->agr_vars; iter; iter = next)
677 if (iter->function & FSTRING)
682 n_args = agr_func_tab[iter->function & FUNC].n_args;
683 for (i = 0; i < n_args; i++)
684 free (iter->arg[i].c);
687 else if (iter->function == SD)
688 moments1_destroy (iter->moments);
691 if (agr->dict != NULL)
692 dict_destroy (agr->dict);
694 case_destroy (&agr->agr_case);
699 static void accumulate_aggregate_info (struct agr_proc *,
700 const struct ccase *);
701 static void dump_aggregate_info (struct agr_proc *, struct ccase *);
703 /* Processes a single case INPUT for aggregation. If output is
704 warranted, writes it to OUTPUT and returns nonzero.
705 Otherwise, returns zero and OUTPUT is unmodified. */
707 aggregate_single_case (struct agr_proc *agr,
708 const struct ccase *input, struct ccase *output)
710 bool finished_group = false;
712 if (agr->case_cnt++ == 0)
713 initialize_aggregate_info (agr, input);
714 else if (case_compare (&agr->break_case, input,
715 agr->break_vars, agr->break_var_cnt))
717 dump_aggregate_info (agr, output);
718 finished_group = true;
720 initialize_aggregate_info (agr, input);
723 accumulate_aggregate_info (agr, input);
724 return finished_group;
727 /* Accumulates aggregation data from the case INPUT. */
729 accumulate_aggregate_info (struct agr_proc *agr,
730 const struct ccase *input)
732 struct agr_var *iter;
736 weight = dict_get_case_weight (default_dict, input, &bad_warn);
738 for (iter = agr->agr_vars; iter; iter = iter->next)
741 const union value *v = case_data (input, iter->src->fv);
743 if ((!iter->include_missing
744 && mv_is_value_missing (&iter->src->miss, v))
745 || (iter->include_missing && iter->src->type == NUMERIC
748 switch (iter->function)
751 case NMISS | FSTRING:
752 iter->dbl[0] += weight;
755 case NUMISS | FSTRING:
763 /* This is horrible. There are too many possibilities. */
764 switch (iter->function)
767 iter->dbl[0] += v->f * weight;
771 iter->dbl[0] += v->f * weight;
772 iter->dbl[1] += weight;
775 moments1_add (iter->moments, v->f, weight);
778 iter->dbl[0] = max (iter->dbl[0], v->f);
782 if (memcmp (iter->string, v->s, iter->src->width) < 0)
783 memcpy (iter->string, v->s, iter->src->width);
787 iter->dbl[0] = min (iter->dbl[0], v->f);
791 if (memcmp (iter->string, v->s, iter->src->width) > 0)
792 memcpy (iter->string, v->s, iter->src->width);
797 if (v->f > iter->arg[0].f)
798 iter->dbl[0] += weight;
799 iter->dbl[1] += weight;
803 if (memcmp (iter->arg[0].c, v->s, iter->src->width) < 0)
804 iter->dbl[0] += weight;
805 iter->dbl[1] += weight;
809 if (v->f < iter->arg[0].f)
810 iter->dbl[0] += weight;
811 iter->dbl[1] += weight;
815 if (memcmp (iter->arg[0].c, v->s, iter->src->width) > 0)
816 iter->dbl[0] += weight;
817 iter->dbl[1] += weight;
821 if (iter->arg[0].f <= v->f && v->f <= iter->arg[1].f)
822 iter->dbl[0] += weight;
823 iter->dbl[1] += weight;
827 if (memcmp (iter->arg[0].c, v->s, iter->src->width) <= 0
828 && memcmp (iter->arg[1].c, v->s, iter->src->width) >= 0)
829 iter->dbl[0] += weight;
830 iter->dbl[1] += weight;
834 if (iter->arg[0].f > v->f || v->f > iter->arg[1].f)
835 iter->dbl[0] += weight;
836 iter->dbl[1] += weight;
840 if (memcmp (iter->arg[0].c, v->s, iter->src->width) > 0
841 || memcmp (iter->arg[1].c, v->s, iter->src->width) < 0)
842 iter->dbl[0] += weight;
843 iter->dbl[1] += weight;
847 iter->dbl[0] += weight;
860 case FIRST | FSTRING:
863 memcpy (iter->string, v->s, iter->src->width);
872 memcpy (iter->string, v->s, iter->src->width);
876 case NMISS | FSTRING:
878 case NUMISS | FSTRING:
879 /* Our value is not missing or it would have been
880 caught earlier. Nothing to do. */
886 switch (iter->function)
889 iter->dbl[0] += weight;
900 /* We've come to a record that differs from the previous in one or
901 more of the break variables. Make an output record from the
902 accumulated statistics in the OUTPUT case. */
904 dump_aggregate_info (struct agr_proc *agr, struct ccase *output)
910 for (i = 0; i < agr->break_var_cnt; i++)
912 struct variable *v = agr->break_vars[i];
913 memcpy (case_data_rw (output, value_idx),
914 case_data (&agr->break_case, v->fv),
915 sizeof (union value) * v->nv);
923 for (i = agr->agr_vars; i; i = i->next)
925 union value *v = case_data_rw (output, i->dest->fv);
927 if (agr->missing == COLUMNWISE && i->missing != 0
928 && (i->function & FUNC) != N && (i->function & FUNC) != NU
929 && (i->function & FUNC) != NMISS && (i->function & FUNC) != NUMISS)
931 if (i->dest->type == ALPHA)
932 memset (v->s, ' ', i->dest->width);
941 v->f = i->int1 ? i->dbl[0] : SYSMIS;
944 v->f = i->dbl[1] != 0.0 ? i->dbl[0] / i->dbl[1] : SYSMIS;
950 /* FIXME: we should use two passes. */
951 moments1_calculate (i->moments, NULL, NULL, &variance,
953 if (variance != SYSMIS)
954 v->f = sqrt (variance);
961 v->f = i->int1 ? i->dbl[0] : SYSMIS;
966 memcpy (v->s, i->string, i->dest->width);
968 memset (v->s, ' ', i->dest->width);
978 v->f = i->dbl[1] ? i->dbl[0] / i->dbl[1] : SYSMIS;
988 v->f = i->dbl[1] ? i->dbl[0] / i->dbl[1] * 100.0 : SYSMIS;
1000 v->f = i->int1 ? i->dbl[0] : SYSMIS;
1002 case FIRST | FSTRING:
1003 case LAST | FSTRING:
1005 memcpy (v->s, i->string, i->dest->width);
1007 memset (v->s, ' ', i->dest->width);
1016 case NMISS | FSTRING:
1020 case NUMISS | FSTRING:
1030 /* Resets the state for all the aggregate functions. */
1032 initialize_aggregate_info (struct agr_proc *agr, const struct ccase *input)
1034 struct agr_var *iter;
1036 case_destroy (&agr->break_case);
1037 case_clone (&agr->break_case, input);
1039 for (iter = agr->agr_vars; iter; iter = iter->next)
1042 iter->dbl[0] = iter->dbl[1] = iter->dbl[2] = 0.0;
1043 iter->int1 = iter->int2 = 0;
1044 switch (iter->function)
1047 iter->dbl[0] = DBL_MAX;
1050 memset (iter->string, 255, iter->src->width);
1053 iter->dbl[0] = -DBL_MAX;
1056 memset (iter->string, 0, iter->src->width);
1059 if (iter->moments == NULL)
1060 iter->moments = moments1_create (MOMENT_VARIANCE);
1062 moments1_clear (iter->moments);
1070 /* Aggregate each case as it comes through. Cases which aren't needed
1072 Returns true if successful, false if an I/O error occurred. */
1074 agr_to_active_file (struct ccase *c, void *agr_)
1076 struct agr_proc *agr = agr_;
1078 if (aggregate_single_case (agr, c, &agr->agr_case))
1079 return agr->sink->class->write (agr->sink, &agr->agr_case);
1084 /* Aggregate the current case and output it if we passed a
1087 presorted_agr_to_sysfile (struct ccase *c, void *agr_)
1089 struct agr_proc *agr = agr_;
1091 if (aggregate_single_case (agr, c, &agr->agr_case))
1092 return any_writer_write (agr->writer, &agr->agr_case);