1 /* PSPP - computes sample statistics.
2 Copyright (C) 1997-9, 2000 Free Software Foundation, Inc.
3 Written by Ben Pfaff <blp@gnu.org>.
5 This program is free software; you can redistribute it and/or
6 modify it under the terms of the GNU General Public License as
7 published by the Free Software Foundation; either version 2 of the
8 License, or (at your option) any later version.
10 This program is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with this program; if not, write to the Free Software
17 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
24 #include <data/any-writer.h>
25 #include <data/case-sink.h>
26 #include <data/case.h>
27 #include <data/casefile.h>
28 #include <data/dictionary.h>
29 #include <data/file-handle-def.h>
30 #include <procedure.h>
31 #include <data/settings.h>
32 #include <data/storage-stream.h>
33 #include <data/sys-file-writer.h>
34 #include <data/variable.h>
35 #include <language/command.h>
36 #include <language/data-io/file-handle.h>
37 #include <language/lexer/lexer.h>
38 #include <language/stats/sort-criteria.h>
39 #include <libpspp/alloc.h>
40 #include <libpspp/message.h>
41 #include <libpspp/message.h>
42 #include <libpspp/misc.h>
43 #include <libpspp/pool.h>
44 #include <libpspp/str.h>
45 #include <math/moments.h>
46 #include <math/sort.h>
47 #include <procedure.h>
50 #define _(msgid) gettext (msgid)
52 /* Specifies how to make an aggregate variable. */
55 struct agr_var *next; /* Next in list. */
57 /* Collected during parsing. */
58 struct variable *src; /* Source variable. */
59 struct variable *dest; /* Target variable. */
60 int function; /* Function. */
61 int include_missing; /* 1=Include user-missing values. */
62 union value arg[2]; /* Arguments. */
64 /* Accumulated during AGGREGATE execution. */
69 struct moments1 *moments;
72 /* Aggregation functions. */
75 NONE, SUM, MEAN, SD, MAX, MIN, PGT, PLT, PIN, POUT, FGT, FLT, FIN,
76 FOUT, N, NU, NMISS, NUMISS, FIRST, LAST,
77 N_AGR_FUNCS, N_NO_VARS, NU_NO_VARS,
78 FUNC = 0x1f, /* Function mask. */
79 FSTRING = 1<<5, /* String function bit. */
82 /* Attributes of an aggregation function. */
85 const char *name; /* Aggregation function name. */
86 size_t n_args; /* Number of arguments. */
87 int alpha_type; /* When given ALPHA arguments, output type. */
88 struct fmt_spec format; /* Format spec if alpha_type != ALPHA. */
91 /* Attributes of aggregation functions. */
92 static const struct agr_func agr_func_tab[] =
94 {"<NONE>", 0, -1, {0, 0, 0}},
95 {"SUM", 0, -1, {FMT_F, 8, 2}},
96 {"MEAN", 0, -1, {FMT_F, 8, 2}},
97 {"SD", 0, -1, {FMT_F, 8, 2}},
98 {"MAX", 0, ALPHA, {-1, -1, -1}},
99 {"MIN", 0, ALPHA, {-1, -1, -1}},
100 {"PGT", 1, NUMERIC, {FMT_F, 5, 1}},
101 {"PLT", 1, NUMERIC, {FMT_F, 5, 1}},
102 {"PIN", 2, NUMERIC, {FMT_F, 5, 1}},
103 {"POUT", 2, NUMERIC, {FMT_F, 5, 1}},
104 {"FGT", 1, NUMERIC, {FMT_F, 5, 3}},
105 {"FLT", 1, NUMERIC, {FMT_F, 5, 3}},
106 {"FIN", 2, NUMERIC, {FMT_F, 5, 3}},
107 {"FOUT", 2, NUMERIC, {FMT_F, 5, 3}},
108 {"N", 0, NUMERIC, {FMT_F, 7, 0}},
109 {"NU", 0, NUMERIC, {FMT_F, 7, 0}},
110 {"NMISS", 0, NUMERIC, {FMT_F, 7, 0}},
111 {"NUMISS", 0, NUMERIC, {FMT_F, 7, 0}},
112 {"FIRST", 0, ALPHA, {-1, -1, -1}},
113 {"LAST", 0, ALPHA, {-1, -1, -1}},
114 {NULL, 0, -1, {-1, -1, -1}},
115 {"N", 0, NUMERIC, {FMT_F, 7, 0}},
116 {"NU", 0, NUMERIC, {FMT_F, 7, 0}},
119 /* Missing value types. */
120 enum missing_treatment
122 ITEMWISE, /* Missing values item by item. */
123 COLUMNWISE /* Missing values column by column. */
126 /* An entire AGGREGATE procedure. */
129 /* We have either an output file or a sink. */
130 struct any_writer *writer; /* Output file, or null if none. */
131 struct case_sink *sink; /* Sink, or null if none. */
133 /* Break variables. */
134 struct sort_criteria *sort; /* Sort criteria. */
135 struct variable **break_vars; /* Break variables. */
136 size_t break_var_cnt; /* Number of break variables. */
137 struct ccase break_case; /* Last values of break variables. */
139 enum missing_treatment missing; /* How to treat missing values. */
140 struct agr_var *agr_vars; /* First aggregate variable. */
141 struct dictionary *dict; /* Aggregate dictionary. */
142 int case_cnt; /* Counts aggregated cases. */
143 struct ccase agr_case; /* Aggregate case for output. */
146 static void initialize_aggregate_info (struct agr_proc *,
147 const struct ccase *);
150 static int parse_aggregate_functions (struct agr_proc *);
151 static void agr_destroy (struct agr_proc *);
152 static int aggregate_single_case (struct agr_proc *agr,
153 const struct ccase *input,
154 struct ccase *output);
155 static void dump_aggregate_info (struct agr_proc *agr, struct ccase *output);
157 /* Aggregating to the active file. */
158 static bool agr_to_active_file (struct ccase *, void *aux);
160 /* Aggregating to a system file. */
161 static bool presorted_agr_to_sysfile (struct ccase *, void *aux);
165 /* Parses and executes the AGGREGATE procedure. */
170 struct file_handle *out_file = NULL;
172 bool copy_documents = false;
173 bool presorted = false;
176 memset(&agr, 0 , sizeof (agr));
177 agr.missing = ITEMWISE;
178 case_nullify (&agr.break_case);
180 agr.dict = dict_create ();
181 dict_set_label (agr.dict, dict_get_label (default_dict));
182 dict_set_documents (agr.dict, dict_get_documents (default_dict));
184 /* OUTFILE subcommand must be first. */
185 if (!lex_force_match_id ("OUTFILE"))
188 if (!lex_match ('*'))
190 out_file = fh_parse (FH_REF_FILE | FH_REF_SCRATCH);
191 if (out_file == NULL)
195 /* Read most of the subcommands. */
200 if (lex_match_id ("MISSING"))
203 if (!lex_match_id ("COLUMNWISE"))
205 lex_error (_("while expecting COLUMNWISE"));
208 agr.missing = COLUMNWISE;
210 else if (lex_match_id ("DOCUMENT"))
211 copy_documents = true;
212 else if (lex_match_id ("PRESORTED"))
214 else if (lex_match_id ("BREAK"))
219 agr.sort = sort_parse_criteria (default_dict,
220 &agr.break_vars, &agr.break_var_cnt,
221 &saw_direction, NULL);
222 if (agr.sort == NULL)
225 for (i = 0; i < agr.break_var_cnt; i++)
226 dict_clone_var_assert (agr.dict, agr.break_vars[i],
227 agr.break_vars[i]->name);
229 /* BREAK must follow the options. */
234 lex_error (_("expecting BREAK"));
238 if (presorted && saw_direction)
239 msg (SW, _("When PRESORTED is specified, specifying sorting directions "
240 "with (A) or (D) has no effect. Output data will be sorted "
241 "the same way as the input data."));
243 /* Read in the aggregate functions. */
245 if (!parse_aggregate_functions (&agr))
248 /* Delete documents. */
250 dict_set_documents (agr.dict, NULL);
252 /* Cancel SPLIT FILE. */
253 dict_set_split_vars (agr.dict, NULL, 0);
257 case_create (&agr.agr_case, dict_get_next_value_idx (agr.dict));
259 /* Output to active file or external file? */
260 if (out_file == NULL)
262 /* The active file will be replaced by the aggregated data,
263 so TEMPORARY is moot. */
264 proc_cancel_temporary_transformations ();
266 if (agr.sort != NULL && !presorted)
268 if (!sort_active_file_in_place (agr.sort))
272 agr.sink = create_case_sink (&storage_sink_class, agr.dict, NULL);
273 if (agr.sink->class->open != NULL)
274 agr.sink->class->open (agr.sink);
275 proc_set_sink (create_case_sink (&null_sink_class, default_dict, NULL));
276 if (!procedure (agr_to_active_file, &agr))
278 if (agr.case_cnt > 0)
280 dump_aggregate_info (&agr, &agr.agr_case);
281 if (!agr.sink->class->write (agr.sink, &agr.agr_case))
284 discard_variables ();
285 default_dict = agr.dict;
287 proc_set_source (agr.sink->class->make_source (agr.sink));
288 free_case_sink (agr.sink);
292 agr.writer = any_writer_open (out_file, agr.dict);
293 if (agr.writer == NULL)
296 if (agr.sort != NULL && !presorted)
298 /* Sorting is needed. */
299 struct casefile *dst;
300 struct casereader *reader;
304 dst = sort_active_file_to_casefile (agr.sort);
307 reader = casefile_get_destructive_reader (dst);
308 while (ok && casereader_read_xfer (reader, &c))
310 if (aggregate_single_case (&agr, &c, &agr.agr_case))
311 ok = any_writer_write (agr.writer, &agr.agr_case);
314 casereader_destroy (reader);
316 ok = !casefile_error (dst);
317 casefile_destroy (dst);
323 /* Active file is already sorted. */
324 if (!procedure (presorted_agr_to_sysfile, &agr))
328 if (agr.case_cnt > 0)
330 dump_aggregate_info (&agr, &agr.agr_case);
331 any_writer_write (agr.writer, &agr.agr_case);
333 if (any_writer_error (agr.writer))
342 return CMD_CASCADING_FAILURE;
345 /* Parse all the aggregate functions. */
347 parse_aggregate_functions (struct agr_proc *agr)
349 struct agr_var *tail; /* Tail of linked list starting at agr->vars. */
351 /* Parse everything. */
360 const struct agr_func *function;
365 struct variable **src;
379 /* Parse the list of target variables. */
380 while (!lex_match ('='))
382 size_t n_dest_prev = n_dest;
384 if (!parse_DATA_LIST_vars (&dest, &n_dest,
385 PV_APPEND | PV_SINGLE | PV_NO_SCRATCH))
388 /* Assign empty labels. */
392 dest_label = xnrealloc (dest_label, n_dest, sizeof *dest_label);
393 for (j = n_dest_prev; j < n_dest; j++)
394 dest_label[j] = NULL;
397 if (token == T_STRING)
399 ds_truncate (&tokstr, 255);
400 dest_label[n_dest - 1] = xstrdup (ds_c_str (&tokstr));
405 /* Get the name of the aggregation function. */
408 lex_error (_("expecting aggregation function"));
413 if (tokid[strlen (tokid) - 1] == '.')
416 tokid[strlen (tokid) - 1] = 0;
419 for (function = agr_func_tab; function->name; function++)
420 if (!strcasecmp (function->name, tokid))
422 if (NULL == function->name)
424 msg (SE, _("Unknown aggregation function %s."), tokid);
427 func_index = function - agr_func_tab;
430 /* Check for leading lparen. */
431 if (!lex_match ('('))
434 func_index = N_NO_VARS;
435 else if (func_index == NU)
436 func_index = NU_NO_VARS;
439 lex_error (_("expecting `('"));
445 /* Parse list of source variables. */
447 int pv_opts = PV_NO_SCRATCH;
449 if (func_index == SUM || func_index == MEAN || func_index == SD)
450 pv_opts |= PV_NUMERIC;
451 else if (function->n_args)
452 pv_opts |= PV_SAME_TYPE;
454 if (!parse_variables (default_dict, &src, &n_src, pv_opts))
458 /* Parse function arguments, for those functions that
459 require arguments. */
460 if (function->n_args != 0)
461 for (i = 0; i < function->n_args; i++)
466 if (token == T_STRING)
468 arg[i].c = xstrdup (ds_c_str (&tokstr));
471 else if (lex_is_number ())
476 msg (SE, _("Missing argument %d to %s."), i + 1,
483 if (type != src[0]->type)
485 msg (SE, _("Arguments to %s must be of same type as "
486 "source variables."),
492 /* Trailing rparen. */
495 lex_error (_("expecting `)'"));
499 /* Now check that the number of source variables match
500 the number of target variables. If we check earlier
501 than this, the user can get very misleading error
502 message, i.e. `AGGREGATE x=SUM(y t).' will get this
503 error message when a proper message would be more
504 like `unknown variable t'. */
507 msg (SE, _("Number of source variables (%u) does not match "
508 "number of target variables (%u)."),
509 (unsigned) n_src, (unsigned) n_dest);
513 if ((func_index == PIN || func_index == POUT
514 || func_index == FIN || func_index == FOUT)
515 && ((src[0]->type == NUMERIC && arg[0].f > arg[1].f)
516 || (src[0]->type == ALPHA
517 && str_compare_rpad (arg[0].c, arg[1].c) > 0)))
519 union value t = arg[0];
523 msg (SW, _("The value arguments passed to the %s function "
524 "are out-of-order. They will be treated as if "
525 "they had been specified in the correct order."),
530 /* Finally add these to the linked list of aggregation
532 for (i = 0; i < n_dest; i++)
534 struct agr_var *v = xmalloc (sizeof *v);
536 /* Add variable to chain. */
537 if (agr->agr_vars != NULL)
545 /* Create the target variable in the aggregate
548 struct variable *destvar;
550 v->function = func_index;
556 if (src[i]->type == ALPHA)
558 v->function |= FSTRING;
559 v->string = xmalloc (src[i]->width);
562 if (function->alpha_type == ALPHA)
563 destvar = dict_clone_var (agr->dict, v->src, dest[i]);
566 assert (v->src->type == NUMERIC
567 || function->alpha_type == NUMERIC);
568 destvar = dict_create_var (agr->dict, dest[i], 0);
571 if ((func_index == N || func_index == NMISS)
572 && dict_get_weight (default_dict) != NULL)
573 destvar->print = destvar->write = f8_2;
575 destvar->print = destvar->write = function->format;
580 destvar = dict_create_var (agr->dict, dest[i], 0);
581 if (func_index == N_NO_VARS
582 && dict_get_weight (default_dict) != NULL)
583 destvar->print = destvar->write = f8_2;
585 destvar->print = destvar->write = function->format;
590 msg (SE, _("Variable name %s is not unique within the "
591 "aggregate file dictionary, which contains "
592 "the aggregate variables and the break "
601 destvar->label = dest_label[i];
602 dest_label[i] = NULL;
608 v->include_missing = include_missing;
614 if (v->src->type == NUMERIC)
615 for (j = 0; j < function->n_args; j++)
616 v->arg[j].f = arg[j].f;
618 for (j = 0; j < function->n_args; j++)
619 v->arg[j].c = xstrdup (arg[j].c);
623 if (src != NULL && src[0]->type == ALPHA)
624 for (i = 0; i < function->n_args; i++)
634 if (!lex_match ('/'))
639 lex_error ("expecting end of command");
645 for (i = 0; i < n_dest; i++)
648 free (dest_label[i]);
654 if (src && n_src && src[0]->type == ALPHA)
655 for (i = 0; i < function->n_args; i++)
668 agr_destroy (struct agr_proc *agr)
670 struct agr_var *iter, *next;
672 any_writer_close (agr->writer);
673 if (agr->sort != NULL)
674 sort_destroy_criteria (agr->sort);
675 free (agr->break_vars);
676 case_destroy (&agr->break_case);
677 for (iter = agr->agr_vars; iter; iter = next)
681 if (iter->function & FSTRING)
686 n_args = agr_func_tab[iter->function & FUNC].n_args;
687 for (i = 0; i < n_args; i++)
688 free (iter->arg[i].c);
691 else if (iter->function == SD)
692 moments1_destroy (iter->moments);
695 if (agr->dict != NULL)
696 dict_destroy (agr->dict);
698 case_destroy (&agr->agr_case);
703 static void accumulate_aggregate_info (struct agr_proc *,
704 const struct ccase *);
705 static void dump_aggregate_info (struct agr_proc *, struct ccase *);
707 /* Processes a single case INPUT for aggregation. If output is
708 warranted, writes it to OUTPUT and returns nonzero.
709 Otherwise, returns zero and OUTPUT is unmodified. */
711 aggregate_single_case (struct agr_proc *agr,
712 const struct ccase *input, struct ccase *output)
714 bool finished_group = false;
716 if (agr->case_cnt++ == 0)
717 initialize_aggregate_info (agr, input);
718 else if (case_compare (&agr->break_case, input,
719 agr->break_vars, agr->break_var_cnt))
721 dump_aggregate_info (agr, output);
722 finished_group = true;
724 initialize_aggregate_info (agr, input);
727 accumulate_aggregate_info (agr, input);
728 return finished_group;
731 /* Accumulates aggregation data from the case INPUT. */
733 accumulate_aggregate_info (struct agr_proc *agr,
734 const struct ccase *input)
736 struct agr_var *iter;
740 weight = dict_get_case_weight (default_dict, input, &bad_warn);
742 for (iter = agr->agr_vars; iter; iter = iter->next)
745 const union value *v = case_data (input, iter->src->fv);
747 if ((!iter->include_missing
748 && mv_is_value_missing (&iter->src->miss, v))
749 || (iter->include_missing && iter->src->type == NUMERIC
752 switch (iter->function)
755 case NMISS | FSTRING:
756 iter->dbl[0] += weight;
759 case NUMISS | FSTRING:
767 /* This is horrible. There are too many possibilities. */
768 switch (iter->function)
771 iter->dbl[0] += v->f * weight;
775 iter->dbl[0] += v->f * weight;
776 iter->dbl[1] += weight;
779 moments1_add (iter->moments, v->f, weight);
782 iter->dbl[0] = max (iter->dbl[0], v->f);
786 if (memcmp (iter->string, v->s, iter->src->width) < 0)
787 memcpy (iter->string, v->s, iter->src->width);
791 iter->dbl[0] = min (iter->dbl[0], v->f);
795 if (memcmp (iter->string, v->s, iter->src->width) > 0)
796 memcpy (iter->string, v->s, iter->src->width);
801 if (v->f > iter->arg[0].f)
802 iter->dbl[0] += weight;
803 iter->dbl[1] += weight;
807 if (memcmp (iter->arg[0].c, v->s, iter->src->width) < 0)
808 iter->dbl[0] += weight;
809 iter->dbl[1] += weight;
813 if (v->f < iter->arg[0].f)
814 iter->dbl[0] += weight;
815 iter->dbl[1] += weight;
819 if (memcmp (iter->arg[0].c, v->s, iter->src->width) > 0)
820 iter->dbl[0] += weight;
821 iter->dbl[1] += weight;
825 if (iter->arg[0].f <= v->f && v->f <= iter->arg[1].f)
826 iter->dbl[0] += weight;
827 iter->dbl[1] += weight;
831 if (memcmp (iter->arg[0].c, v->s, iter->src->width) <= 0
832 && memcmp (iter->arg[1].c, v->s, iter->src->width) >= 0)
833 iter->dbl[0] += weight;
834 iter->dbl[1] += weight;
838 if (iter->arg[0].f > v->f || v->f > iter->arg[1].f)
839 iter->dbl[0] += weight;
840 iter->dbl[1] += weight;
844 if (memcmp (iter->arg[0].c, v->s, iter->src->width) > 0
845 || memcmp (iter->arg[1].c, v->s, iter->src->width) < 0)
846 iter->dbl[0] += weight;
847 iter->dbl[1] += weight;
851 iter->dbl[0] += weight;
864 case FIRST | FSTRING:
867 memcpy (iter->string, v->s, iter->src->width);
876 memcpy (iter->string, v->s, iter->src->width);
880 case NMISS | FSTRING:
882 case NUMISS | FSTRING:
883 /* Our value is not missing or it would have been
884 caught earlier. Nothing to do. */
890 switch (iter->function)
893 iter->dbl[0] += weight;
904 /* We've come to a record that differs from the previous in one or
905 more of the break variables. Make an output record from the
906 accumulated statistics in the OUTPUT case. */
908 dump_aggregate_info (struct agr_proc *agr, struct ccase *output)
914 for (i = 0; i < agr->break_var_cnt; i++)
916 struct variable *v = agr->break_vars[i];
917 memcpy (case_data_rw (output, value_idx),
918 case_data (&agr->break_case, v->fv),
919 sizeof (union value) * v->nv);
927 for (i = agr->agr_vars; i; i = i->next)
929 union value *v = case_data_rw (output, i->dest->fv);
931 if (agr->missing == COLUMNWISE && i->missing != 0
932 && (i->function & FUNC) != N && (i->function & FUNC) != NU
933 && (i->function & FUNC) != NMISS && (i->function & FUNC) != NUMISS)
935 if (i->dest->type == ALPHA)
936 memset (v->s, ' ', i->dest->width);
945 v->f = i->int1 ? i->dbl[0] : SYSMIS;
948 v->f = i->dbl[1] != 0.0 ? i->dbl[0] / i->dbl[1] : SYSMIS;
954 /* FIXME: we should use two passes. */
955 moments1_calculate (i->moments, NULL, NULL, &variance,
957 if (variance != SYSMIS)
958 v->f = sqrt (variance);
965 v->f = i->int1 ? i->dbl[0] : SYSMIS;
970 memcpy (v->s, i->string, i->dest->width);
972 memset (v->s, ' ', i->dest->width);
982 v->f = i->dbl[1] ? i->dbl[0] / i->dbl[1] : SYSMIS;
992 v->f = i->dbl[1] ? i->dbl[0] / i->dbl[1] * 100.0 : SYSMIS;
1004 v->f = i->int1 ? i->dbl[0] : SYSMIS;
1006 case FIRST | FSTRING:
1007 case LAST | FSTRING:
1009 memcpy (v->s, i->string, i->dest->width);
1011 memset (v->s, ' ', i->dest->width);
1020 case NMISS | FSTRING:
1024 case NUMISS | FSTRING:
1034 /* Resets the state for all the aggregate functions. */
1036 initialize_aggregate_info (struct agr_proc *agr, const struct ccase *input)
1038 struct agr_var *iter;
1040 case_destroy (&agr->break_case);
1041 case_clone (&agr->break_case, input);
1043 for (iter = agr->agr_vars; iter; iter = iter->next)
1046 iter->dbl[0] = iter->dbl[1] = iter->dbl[2] = 0.0;
1047 iter->int1 = iter->int2 = 0;
1048 switch (iter->function)
1051 iter->dbl[0] = DBL_MAX;
1054 memset (iter->string, 255, iter->src->width);
1057 iter->dbl[0] = -DBL_MAX;
1060 memset (iter->string, 0, iter->src->width);
1063 if (iter->moments == NULL)
1064 iter->moments = moments1_create (MOMENT_VARIANCE);
1066 moments1_clear (iter->moments);
1074 /* Aggregate each case as it comes through. Cases which aren't needed
1076 Returns true if successful, false if an I/O error occurred. */
1078 agr_to_active_file (struct ccase *c, void *agr_)
1080 struct agr_proc *agr = agr_;
1082 if (aggregate_single_case (agr, c, &agr->agr_case))
1083 return agr->sink->class->write (agr->sink, &agr->agr_case);
1088 /* Aggregate the current case and output it if we passed a
1091 presorted_agr_to_sysfile (struct ccase *c, void *agr_)
1093 struct agr_proc *agr = agr_;
1095 if (aggregate_single_case (agr, c, &agr->agr_case))
1096 return any_writer_write (agr->writer, &agr->agr_case);