1 /* PSPP - computes sample statistics.
2 Copyright (C) 1997-9, 2000 Free Software Foundation, Inc.
3 Written by Ben Pfaff <blp@gnu.org>.
5 This program is free software; you can redistribute it and/or
6 modify it under the terms of the GNU General Public License as
7 published by the Free Software Foundation; either version 2 of the
8 License, or (at your option) any later version.
10 This program is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with this program; if not, write to the Free Software
17 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
24 #include <data/any-writer.h>
25 #include <data/case-sink.h>
26 #include <data/case.h>
27 #include <data/casefile.h>
28 #include <data/dictionary.h>
29 #include <data/file-handle-def.h>
30 #include <data/procedure.h>
31 #include <data/settings.h>
32 #include <data/storage-stream.h>
33 #include <data/sys-file-writer.h>
34 #include <data/variable.h>
35 #include <language/command.h>
36 #include <language/data-io/file-handle.h>
37 #include <language/lexer/lexer.h>
38 #include <language/stats/sort-criteria.h>
39 #include <libpspp/alloc.h>
40 #include <libpspp/message.h>
41 #include <libpspp/message.h>
42 #include <libpspp/misc.h>
43 #include <libpspp/pool.h>
44 #include <libpspp/str.h>
45 #include <math/moments.h>
46 #include <math/sort.h>
49 #define _(msgid) gettext (msgid)
51 /* Argument for AGGREGATE function. */
54 double f; /* Numeric. */
55 char *c; /* Short or long string. */
58 /* Specifies how to make an aggregate variable. */
61 struct agr_var *next; /* Next in list. */
63 /* Collected during parsing. */
64 struct variable *src; /* Source variable. */
65 struct variable *dest; /* Target variable. */
66 int function; /* Function. */
67 int include_missing; /* 1=Include user-missing values. */
68 union agr_argument arg[2]; /* Arguments. */
70 /* Accumulated during AGGREGATE execution. */
75 struct moments1 *moments;
78 /* Aggregation functions. */
81 NONE, SUM, MEAN, SD, MAX, MIN, PGT, PLT, PIN, POUT, FGT, FLT, FIN,
82 FOUT, N, NU, NMISS, NUMISS, FIRST, LAST,
83 N_AGR_FUNCS, N_NO_VARS, NU_NO_VARS,
84 FUNC = 0x1f, /* Function mask. */
85 FSTRING = 1<<5, /* String function bit. */
88 /* Attributes of an aggregation function. */
91 const char *name; /* Aggregation function name. */
92 size_t n_args; /* Number of arguments. */
93 int alpha_type; /* When given ALPHA arguments, output type. */
94 struct fmt_spec format; /* Format spec if alpha_type != ALPHA. */
97 /* Attributes of aggregation functions. */
98 static const struct agr_func agr_func_tab[] =
100 {"<NONE>", 0, -1, {0, 0, 0}},
101 {"SUM", 0, -1, {FMT_F, 8, 2}},
102 {"MEAN", 0, -1, {FMT_F, 8, 2}},
103 {"SD", 0, -1, {FMT_F, 8, 2}},
104 {"MAX", 0, ALPHA, {-1, -1, -1}},
105 {"MIN", 0, ALPHA, {-1, -1, -1}},
106 {"PGT", 1, NUMERIC, {FMT_F, 5, 1}},
107 {"PLT", 1, NUMERIC, {FMT_F, 5, 1}},
108 {"PIN", 2, NUMERIC, {FMT_F, 5, 1}},
109 {"POUT", 2, NUMERIC, {FMT_F, 5, 1}},
110 {"FGT", 1, NUMERIC, {FMT_F, 5, 3}},
111 {"FLT", 1, NUMERIC, {FMT_F, 5, 3}},
112 {"FIN", 2, NUMERIC, {FMT_F, 5, 3}},
113 {"FOUT", 2, NUMERIC, {FMT_F, 5, 3}},
114 {"N", 0, NUMERIC, {FMT_F, 7, 0}},
115 {"NU", 0, NUMERIC, {FMT_F, 7, 0}},
116 {"NMISS", 0, NUMERIC, {FMT_F, 7, 0}},
117 {"NUMISS", 0, NUMERIC, {FMT_F, 7, 0}},
118 {"FIRST", 0, ALPHA, {-1, -1, -1}},
119 {"LAST", 0, ALPHA, {-1, -1, -1}},
120 {NULL, 0, -1, {-1, -1, -1}},
121 {"N", 0, NUMERIC, {FMT_F, 7, 0}},
122 {"NU", 0, NUMERIC, {FMT_F, 7, 0}},
125 /* Missing value types. */
126 enum missing_treatment
128 ITEMWISE, /* Missing values item by item. */
129 COLUMNWISE /* Missing values column by column. */
132 /* An entire AGGREGATE procedure. */
135 /* We have either an output file or a sink. */
136 struct any_writer *writer; /* Output file, or null if none. */
137 struct case_sink *sink; /* Sink, or null if none. */
139 /* Break variables. */
140 struct sort_criteria *sort; /* Sort criteria. */
141 struct variable **break_vars; /* Break variables. */
142 size_t break_var_cnt; /* Number of break variables. */
143 struct ccase break_case; /* Last values of break variables. */
145 enum missing_treatment missing; /* How to treat missing values. */
146 struct agr_var *agr_vars; /* First aggregate variable. */
147 struct dictionary *dict; /* Aggregate dictionary. */
148 int case_cnt; /* Counts aggregated cases. */
149 struct ccase agr_case; /* Aggregate case for output. */
152 static void initialize_aggregate_info (struct agr_proc *,
153 const struct ccase *);
156 static int parse_aggregate_functions (struct agr_proc *);
157 static void agr_destroy (struct agr_proc *);
158 static int aggregate_single_case (struct agr_proc *agr,
159 const struct ccase *input,
160 struct ccase *output);
161 static void dump_aggregate_info (struct agr_proc *agr, struct ccase *output);
163 /* Aggregating to the active file. */
164 static bool agr_to_active_file (const struct ccase *, void *aux);
166 /* Aggregating to a system file. */
167 static bool presorted_agr_to_sysfile (const struct ccase *, void *aux);
171 /* Parses and executes the AGGREGATE procedure. */
176 struct file_handle *out_file = NULL;
178 bool copy_documents = false;
179 bool presorted = false;
182 memset(&agr, 0 , sizeof (agr));
183 agr.missing = ITEMWISE;
184 case_nullify (&agr.break_case);
186 agr.dict = dict_create ();
187 dict_set_label (agr.dict, dict_get_label (default_dict));
188 dict_set_documents (agr.dict, dict_get_documents (default_dict));
190 /* OUTFILE subcommand must be first. */
191 if (!lex_force_match_id ("OUTFILE"))
194 if (!lex_match ('*'))
196 out_file = fh_parse (FH_REF_FILE | FH_REF_SCRATCH);
197 if (out_file == NULL)
201 /* Read most of the subcommands. */
206 if (lex_match_id ("MISSING"))
209 if (!lex_match_id ("COLUMNWISE"))
211 lex_error (_("while expecting COLUMNWISE"));
214 agr.missing = COLUMNWISE;
216 else if (lex_match_id ("DOCUMENT"))
217 copy_documents = true;
218 else if (lex_match_id ("PRESORTED"))
220 else if (lex_match_id ("BREAK"))
225 agr.sort = sort_parse_criteria (default_dict,
226 &agr.break_vars, &agr.break_var_cnt,
227 &saw_direction, NULL);
228 if (agr.sort == NULL)
231 for (i = 0; i < agr.break_var_cnt; i++)
232 dict_clone_var_assert (agr.dict, agr.break_vars[i],
233 agr.break_vars[i]->name);
235 /* BREAK must follow the options. */
240 lex_error (_("expecting BREAK"));
244 if (presorted && saw_direction)
245 msg (SW, _("When PRESORTED is specified, specifying sorting directions "
246 "with (A) or (D) has no effect. Output data will be sorted "
247 "the same way as the input data."));
249 /* Read in the aggregate functions. */
251 if (!parse_aggregate_functions (&agr))
254 /* Delete documents. */
256 dict_set_documents (agr.dict, NULL);
258 /* Cancel SPLIT FILE. */
259 dict_set_split_vars (agr.dict, NULL, 0);
263 case_create (&agr.agr_case, dict_get_next_value_idx (agr.dict));
265 /* Output to active file or external file? */
266 if (out_file == NULL)
268 /* The active file will be replaced by the aggregated data,
269 so TEMPORARY is moot. */
270 proc_cancel_temporary_transformations ();
272 if (agr.sort != NULL && !presorted)
274 if (!sort_active_file_in_place (agr.sort))
278 agr.sink = create_case_sink (&storage_sink_class, agr.dict, NULL);
279 if (agr.sink->class->open != NULL)
280 agr.sink->class->open (agr.sink);
281 proc_set_sink (create_case_sink (&null_sink_class, default_dict, NULL));
282 if (!procedure (agr_to_active_file, &agr))
284 if (agr.case_cnt > 0)
286 dump_aggregate_info (&agr, &agr.agr_case);
287 if (!agr.sink->class->write (agr.sink, &agr.agr_case))
290 discard_variables ();
291 dict_destroy (default_dict);
292 default_dict = agr.dict;
294 proc_set_source (agr.sink->class->make_source (agr.sink));
295 free_case_sink (agr.sink);
299 agr.writer = any_writer_open (out_file, agr.dict);
300 if (agr.writer == NULL)
303 if (agr.sort != NULL && !presorted)
305 /* Sorting is needed. */
306 struct casefile *dst;
307 struct casereader *reader;
311 dst = sort_active_file_to_casefile (agr.sort);
314 reader = casefile_get_destructive_reader (dst);
315 while (ok && casereader_read_xfer (reader, &c))
317 if (aggregate_single_case (&agr, &c, &agr.agr_case))
318 ok = any_writer_write (agr.writer, &agr.agr_case);
321 casereader_destroy (reader);
323 ok = !casefile_error (dst);
324 casefile_destroy (dst);
330 /* Active file is already sorted. */
331 if (!procedure (presorted_agr_to_sysfile, &agr))
335 if (agr.case_cnt > 0)
337 dump_aggregate_info (&agr, &agr.agr_case);
338 any_writer_write (agr.writer, &agr.agr_case);
340 if (any_writer_error (agr.writer))
349 return CMD_CASCADING_FAILURE;
352 /* Parse all the aggregate functions. */
354 parse_aggregate_functions (struct agr_proc *agr)
356 struct agr_var *tail; /* Tail of linked list starting at agr->vars. */
358 /* Parse everything. */
367 const struct agr_func *function;
370 union agr_argument arg[2];
372 struct variable **src;
386 /* Parse the list of target variables. */
387 while (!lex_match ('='))
389 size_t n_dest_prev = n_dest;
391 if (!parse_DATA_LIST_vars (&dest, &n_dest,
392 PV_APPEND | PV_SINGLE | PV_NO_SCRATCH))
395 /* Assign empty labels. */
399 dest_label = xnrealloc (dest_label, n_dest, sizeof *dest_label);
400 for (j = n_dest_prev; j < n_dest; j++)
401 dest_label[j] = NULL;
404 if (token == T_STRING)
406 ds_truncate (&tokstr, 255);
407 dest_label[n_dest - 1] = xstrdup (ds_c_str (&tokstr));
412 /* Get the name of the aggregation function. */
415 lex_error (_("expecting aggregation function"));
420 if (tokid[strlen (tokid) - 1] == '.')
423 tokid[strlen (tokid) - 1] = 0;
426 for (function = agr_func_tab; function->name; function++)
427 if (!strcasecmp (function->name, tokid))
429 if (NULL == function->name)
431 msg (SE, _("Unknown aggregation function %s."), tokid);
434 func_index = function - agr_func_tab;
437 /* Check for leading lparen. */
438 if (!lex_match ('('))
441 func_index = N_NO_VARS;
442 else if (func_index == NU)
443 func_index = NU_NO_VARS;
446 lex_error (_("expecting `('"));
452 /* Parse list of source variables. */
454 int pv_opts = PV_NO_SCRATCH;
456 if (func_index == SUM || func_index == MEAN || func_index == SD)
457 pv_opts |= PV_NUMERIC;
458 else if (function->n_args)
459 pv_opts |= PV_SAME_TYPE;
461 if (!parse_variables (default_dict, &src, &n_src, pv_opts))
465 /* Parse function arguments, for those functions that
466 require arguments. */
467 if (function->n_args != 0)
468 for (i = 0; i < function->n_args; i++)
473 if (token == T_STRING)
475 arg[i].c = xstrdup (ds_c_str (&tokstr));
478 else if (lex_is_number ())
483 msg (SE, _("Missing argument %d to %s."), i + 1,
490 if (type != src[0]->type)
492 msg (SE, _("Arguments to %s must be of same type as "
493 "source variables."),
499 /* Trailing rparen. */
502 lex_error (_("expecting `)'"));
506 /* Now check that the number of source variables match
507 the number of target variables. If we check earlier
508 than this, the user can get very misleading error
509 message, i.e. `AGGREGATE x=SUM(y t).' will get this
510 error message when a proper message would be more
511 like `unknown variable t'. */
514 msg (SE, _("Number of source variables (%u) does not match "
515 "number of target variables (%u)."),
516 (unsigned) n_src, (unsigned) n_dest);
520 if ((func_index == PIN || func_index == POUT
521 || func_index == FIN || func_index == FOUT)
522 && ((src[0]->type == NUMERIC && arg[0].f > arg[1].f)
523 || (src[0]->type == ALPHA
524 && str_compare_rpad (arg[0].c, arg[1].c) > 0)))
526 union agr_argument t = arg[0];
530 msg (SW, _("The value arguments passed to the %s function "
531 "are out-of-order. They will be treated as if "
532 "they had been specified in the correct order."),
537 /* Finally add these to the linked list of aggregation
539 for (i = 0; i < n_dest; i++)
541 struct agr_var *v = xmalloc (sizeof *v);
543 /* Add variable to chain. */
544 if (agr->agr_vars != NULL)
552 /* Create the target variable in the aggregate
555 struct variable *destvar;
557 v->function = func_index;
563 if (src[i]->type == ALPHA)
565 v->function |= FSTRING;
566 v->string = xmalloc (src[i]->width);
569 if (function->alpha_type == ALPHA)
570 destvar = dict_clone_var (agr->dict, v->src, dest[i]);
573 assert (v->src->type == NUMERIC
574 || function->alpha_type == NUMERIC);
575 destvar = dict_create_var (agr->dict, dest[i], 0);
578 if ((func_index == N || func_index == NMISS)
579 && dict_get_weight (default_dict) != NULL)
580 destvar->print = destvar->write = f8_2;
582 destvar->print = destvar->write = function->format;
587 destvar = dict_create_var (agr->dict, dest[i], 0);
588 if (func_index == N_NO_VARS
589 && dict_get_weight (default_dict) != NULL)
590 destvar->print = destvar->write = f8_2;
592 destvar->print = destvar->write = function->format;
597 msg (SE, _("Variable name %s is not unique within the "
598 "aggregate file dictionary, which contains "
599 "the aggregate variables and the break "
608 destvar->label = dest_label[i];
609 dest_label[i] = NULL;
615 v->include_missing = include_missing;
621 if (v->src->type == NUMERIC)
622 for (j = 0; j < function->n_args; j++)
623 v->arg[j].f = arg[j].f;
625 for (j = 0; j < function->n_args; j++)
626 v->arg[j].c = xstrdup (arg[j].c);
630 if (src != NULL && src[0]->type == ALPHA)
631 for (i = 0; i < function->n_args; i++)
641 if (!lex_match ('/'))
646 lex_error ("expecting end of command");
652 for (i = 0; i < n_dest; i++)
655 free (dest_label[i]);
661 if (src && n_src && src[0]->type == ALPHA)
662 for (i = 0; i < function->n_args; i++)
675 agr_destroy (struct agr_proc *agr)
677 struct agr_var *iter, *next;
679 any_writer_close (agr->writer);
680 if (agr->sort != NULL)
681 sort_destroy_criteria (agr->sort);
682 free (agr->break_vars);
683 case_destroy (&agr->break_case);
684 for (iter = agr->agr_vars; iter; iter = next)
688 if (iter->function & FSTRING)
693 n_args = agr_func_tab[iter->function & FUNC].n_args;
694 for (i = 0; i < n_args; i++)
695 free (iter->arg[i].c);
698 else if (iter->function == SD)
699 moments1_destroy (iter->moments);
702 if (agr->dict != NULL)
703 dict_destroy (agr->dict);
705 case_destroy (&agr->agr_case);
710 static void accumulate_aggregate_info (struct agr_proc *,
711 const struct ccase *);
712 static void dump_aggregate_info (struct agr_proc *, struct ccase *);
714 /* Processes a single case INPUT for aggregation. If output is
715 warranted, writes it to OUTPUT and returns nonzero.
716 Otherwise, returns zero and OUTPUT is unmodified. */
718 aggregate_single_case (struct agr_proc *agr,
719 const struct ccase *input, struct ccase *output)
721 bool finished_group = false;
723 if (agr->case_cnt++ == 0)
724 initialize_aggregate_info (agr, input);
725 else if (case_compare (&agr->break_case, input,
726 agr->break_vars, agr->break_var_cnt))
728 dump_aggregate_info (agr, output);
729 finished_group = true;
731 initialize_aggregate_info (agr, input);
734 accumulate_aggregate_info (agr, input);
735 return finished_group;
738 /* Accumulates aggregation data from the case INPUT. */
740 accumulate_aggregate_info (struct agr_proc *agr,
741 const struct ccase *input)
743 struct agr_var *iter;
747 weight = dict_get_case_weight (default_dict, input, &bad_warn);
749 for (iter = agr->agr_vars; iter; iter = iter->next)
752 const union value *v = case_data (input, iter->src->fv);
754 if ((!iter->include_missing
755 && mv_is_value_missing (&iter->src->miss, v))
756 || (iter->include_missing && iter->src->type == NUMERIC
759 switch (iter->function)
762 case NMISS | FSTRING:
763 iter->dbl[0] += weight;
766 case NUMISS | FSTRING:
774 /* This is horrible. There are too many possibilities. */
775 switch (iter->function)
778 iter->dbl[0] += v->f * weight;
782 iter->dbl[0] += v->f * weight;
783 iter->dbl[1] += weight;
786 moments1_add (iter->moments, v->f, weight);
789 iter->dbl[0] = max (iter->dbl[0], v->f);
793 if (memcmp (iter->string, v->s, iter->src->width) < 0)
794 memcpy (iter->string, v->s, iter->src->width);
798 iter->dbl[0] = min (iter->dbl[0], v->f);
802 if (memcmp (iter->string, v->s, iter->src->width) > 0)
803 memcpy (iter->string, v->s, iter->src->width);
808 if (v->f > iter->arg[0].f)
809 iter->dbl[0] += weight;
810 iter->dbl[1] += weight;
814 if (memcmp (iter->arg[0].c, v->s, iter->src->width) < 0)
815 iter->dbl[0] += weight;
816 iter->dbl[1] += weight;
820 if (v->f < iter->arg[0].f)
821 iter->dbl[0] += weight;
822 iter->dbl[1] += weight;
826 if (memcmp (iter->arg[0].c, v->s, iter->src->width) > 0)
827 iter->dbl[0] += weight;
828 iter->dbl[1] += weight;
832 if (iter->arg[0].f <= v->f && v->f <= iter->arg[1].f)
833 iter->dbl[0] += weight;
834 iter->dbl[1] += weight;
838 if (memcmp (iter->arg[0].c, v->s, iter->src->width) <= 0
839 && memcmp (iter->arg[1].c, v->s, iter->src->width) >= 0)
840 iter->dbl[0] += weight;
841 iter->dbl[1] += weight;
845 if (iter->arg[0].f > v->f || v->f > iter->arg[1].f)
846 iter->dbl[0] += weight;
847 iter->dbl[1] += weight;
851 if (memcmp (iter->arg[0].c, v->s, iter->src->width) > 0
852 || memcmp (iter->arg[1].c, v->s, iter->src->width) < 0)
853 iter->dbl[0] += weight;
854 iter->dbl[1] += weight;
858 iter->dbl[0] += weight;
871 case FIRST | FSTRING:
874 memcpy (iter->string, v->s, iter->src->width);
883 memcpy (iter->string, v->s, iter->src->width);
887 case NMISS | FSTRING:
889 case NUMISS | FSTRING:
890 /* Our value is not missing or it would have been
891 caught earlier. Nothing to do. */
897 switch (iter->function)
900 iter->dbl[0] += weight;
911 /* We've come to a record that differs from the previous in one or
912 more of the break variables. Make an output record from the
913 accumulated statistics in the OUTPUT case. */
915 dump_aggregate_info (struct agr_proc *agr, struct ccase *output)
921 for (i = 0; i < agr->break_var_cnt; i++)
923 struct variable *v = agr->break_vars[i];
924 memcpy (case_data_rw (output, value_idx),
925 case_data (&agr->break_case, v->fv),
926 sizeof (union value) * v->nv);
934 for (i = agr->agr_vars; i; i = i->next)
936 union value *v = case_data_rw (output, i->dest->fv);
938 if (agr->missing == COLUMNWISE && i->missing != 0
939 && (i->function & FUNC) != N && (i->function & FUNC) != NU
940 && (i->function & FUNC) != NMISS && (i->function & FUNC) != NUMISS)
942 if (i->dest->type == ALPHA)
943 memset (v->s, ' ', i->dest->width);
952 v->f = i->int1 ? i->dbl[0] : SYSMIS;
955 v->f = i->dbl[1] != 0.0 ? i->dbl[0] / i->dbl[1] : SYSMIS;
961 /* FIXME: we should use two passes. */
962 moments1_calculate (i->moments, NULL, NULL, &variance,
964 if (variance != SYSMIS)
965 v->f = sqrt (variance);
972 v->f = i->int1 ? i->dbl[0] : SYSMIS;
977 memcpy (v->s, i->string, i->dest->width);
979 memset (v->s, ' ', i->dest->width);
989 v->f = i->dbl[1] ? i->dbl[0] / i->dbl[1] : SYSMIS;
999 v->f = i->dbl[1] ? i->dbl[0] / i->dbl[1] * 100.0 : SYSMIS;
1011 v->f = i->int1 ? i->dbl[0] : SYSMIS;
1013 case FIRST | FSTRING:
1014 case LAST | FSTRING:
1016 memcpy (v->s, i->string, i->dest->width);
1018 memset (v->s, ' ', i->dest->width);
1027 case NMISS | FSTRING:
1031 case NUMISS | FSTRING:
1041 /* Resets the state for all the aggregate functions. */
1043 initialize_aggregate_info (struct agr_proc *agr, const struct ccase *input)
1045 struct agr_var *iter;
1047 case_destroy (&agr->break_case);
1048 case_clone (&agr->break_case, input);
1050 for (iter = agr->agr_vars; iter; iter = iter->next)
1053 iter->dbl[0] = iter->dbl[1] = iter->dbl[2] = 0.0;
1054 iter->int1 = iter->int2 = 0;
1055 switch (iter->function)
1058 iter->dbl[0] = DBL_MAX;
1061 memset (iter->string, 255, iter->src->width);
1064 iter->dbl[0] = -DBL_MAX;
1067 memset (iter->string, 0, iter->src->width);
1070 if (iter->moments == NULL)
1071 iter->moments = moments1_create (MOMENT_VARIANCE);
1073 moments1_clear (iter->moments);
1081 /* Aggregate each case as it comes through. Cases which aren't needed
1083 Returns true if successful, false if an I/O error occurred. */
1085 agr_to_active_file (const struct ccase *c, void *agr_)
1087 struct agr_proc *agr = agr_;
1089 if (aggregate_single_case (agr, c, &agr->agr_case))
1090 return agr->sink->class->write (agr->sink, &agr->agr_case);
1095 /* Aggregate the current case and output it if we passed a
1098 presorted_agr_to_sysfile (const struct ccase *c, void *agr_)
1100 struct agr_proc *agr = agr_;
1102 if (aggregate_single_case (agr, c, &agr->agr_case))
1103 return any_writer_write (agr->writer, &agr->agr_case);