1 /* PSPP - computes sample statistics.
2 Copyright (C) 1997-9, 2000 Free Software Foundation, Inc.
3 Written by Ben Pfaff <blp@gnu.org>.
5 This program is free software; you can redistribute it and/or
6 modify it under the terms of the GNU General Public License as
7 published by the Free Software Foundation; either version 2 of the
8 License, or (at your option) any later version.
10 This program is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with this program; if not, write to the Free Software
17 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
24 #include <data/any-writer.h>
25 #include <data/case-sink.h>
26 #include <data/case.h>
27 #include <data/casefile.h>
28 #include <data/dictionary.h>
29 #include <data/file-handle-def.h>
30 #include <data/procedure.h>
31 #include <data/settings.h>
32 #include <data/storage-stream.h>
33 #include <data/sys-file-writer.h>
34 #include <data/variable.h>
35 #include <language/command.h>
36 #include <language/data-io/file-handle.h>
37 #include <language/lexer/lexer.h>
38 #include <language/stats/sort-criteria.h>
39 #include <libpspp/alloc.h>
40 #include <libpspp/message.h>
41 #include <libpspp/message.h>
42 #include <libpspp/misc.h>
43 #include <libpspp/pool.h>
44 #include <libpspp/str.h>
45 #include <math/moments.h>
46 #include <math/sort.h>
49 #define _(msgid) gettext (msgid)
51 /* Argument for AGGREGATE function. */
54 double f; /* Numeric. */
55 char *c; /* Short or long string. */
58 /* Specifies how to make an aggregate variable. */
61 struct agr_var *next; /* Next in list. */
63 /* Collected during parsing. */
64 struct variable *src; /* Source variable. */
65 struct variable *dest; /* Target variable. */
66 int function; /* Function. */
67 int include_missing; /* 1=Include user-missing values. */
68 union agr_argument arg[2]; /* Arguments. */
70 /* Accumulated during AGGREGATE execution. */
75 struct moments1 *moments;
78 /* Aggregation functions. */
81 NONE, SUM, MEAN, SD, MAX, MIN, PGT, PLT, PIN, POUT, FGT, FLT, FIN,
82 FOUT, N, NU, NMISS, NUMISS, FIRST, LAST,
83 N_AGR_FUNCS, N_NO_VARS, NU_NO_VARS,
84 FUNC = 0x1f, /* Function mask. */
85 FSTRING = 1<<5, /* String function bit. */
88 /* Attributes of an aggregation function. */
91 const char *name; /* Aggregation function name. */
92 size_t n_args; /* Number of arguments. */
93 int alpha_type; /* When given ALPHA arguments, output type. */
94 struct fmt_spec format; /* Format spec if alpha_type != ALPHA. */
97 /* Attributes of aggregation functions. */
/* Each row is {name, n_args, alpha_type, format}, matching the field order
   of struct agr_func.  Rows appear in the same order as the aggregation
   function enumeration, so a function code (masked with FUNC) can be used
   directly as an index into this table, and a table offset can be used as
   a function code. */
98 static const struct agr_func agr_func_tab[] =
100 {"<NONE>", 0, -1, {0, 0, 0}},
101 {"SUM", 0, -1, {FMT_F, 8, 2}},
102 {"MEAN", 0, -1, {FMT_F, 8, 2}},
103 {"SD", 0, -1, {FMT_F, 8, 2}},
104 {"MAX", 0, ALPHA, {-1, -1, -1}},
105 {"MIN", 0, ALPHA, {-1, -1, -1}},
106 {"PGT", 1, NUMERIC, {FMT_F, 5, 1}},
107 {"PLT", 1, NUMERIC, {FMT_F, 5, 1}},
108 {"PIN", 2, NUMERIC, {FMT_F, 5, 1}},
109 {"POUT", 2, NUMERIC, {FMT_F, 5, 1}},
110 {"FGT", 1, NUMERIC, {FMT_F, 5, 3}},
111 {"FLT", 1, NUMERIC, {FMT_F, 5, 3}},
112 {"FIN", 2, NUMERIC, {FMT_F, 5, 3}},
113 {"FOUT", 2, NUMERIC, {FMT_F, 5, 3}},
114 {"N", 0, NUMERIC, {FMT_F, 7, 0}},
115 {"NU", 0, NUMERIC, {FMT_F, 7, 0}},
116 {"NMISS", 0, NUMERIC, {FMT_F, 7, 0}},
117 {"NUMISS", 0, NUMERIC, {FMT_F, 7, 0}},
118 {"FIRST", 0, ALPHA, {-1, -1, -1}},
119 {"LAST", 0, ALPHA, {-1, -1, -1}},
/* Sentinel row (index N_AGR_FUNCS): the by-name search in
   parse_aggregate_functions stops at the first null name, so the two rows
   that follow are never matched by name.  They sit at indexes N_NO_VARS and
   NU_NO_VARS, which the parser selects itself when no `(' follows the
   function name. */
120 {NULL, 0, -1, {-1, -1, -1}},
121 {"N", 0, NUMERIC, {FMT_F, 7, 0}},
122 {"NU", 0, NUMERIC, {FMT_F, 7, 0}},
125 /* Missing value types. */
/* ITEMWISE is the value the AGGREGATE parser starts with; COLUMNWISE is
   selected when the MISSING subcommand names COLUMNWISE. */
126 enum missing_treatment
128 ITEMWISE, /* Missing values item by item. */
129 COLUMNWISE /* Missing values column by column. */
132 /* An entire AGGREGATE procedure. */
135 /* We have either an output file or a sink. */
136 struct any_writer *writer; /* Output file, or null if none. */
137 struct case_sink *sink; /* Sink, or null if none. */
139 /* Break variables. */
140 struct sort_criteria *sort; /* Sort criteria. */
141 struct variable **break_vars; /* Break variables. */
142 size_t break_var_cnt; /* Number of break variables. */
143 struct ccase break_case; /* Last values of break variables. */
145 enum missing_treatment missing; /* How to treat missing values. */
146 struct agr_var *agr_vars; /* First aggregate variable. */
147 struct dictionary *dict; /* Aggregate dictionary. */
148 int case_cnt; /* Counts aggregated cases. */
149 struct ccase agr_case; /* Aggregate case for output. */
152 static void initialize_aggregate_info (struct agr_proc *,
153 const struct ccase *);
156 static int parse_aggregate_functions (struct agr_proc *);
157 static void agr_destroy (struct agr_proc *);
158 static int aggregate_single_case (struct agr_proc *agr,
159 const struct ccase *input,
160 struct ccase *output);
161 static void dump_aggregate_info (struct agr_proc *agr, struct ccase *output);
163 /* Aggregating to the active file. */
164 static bool agr_to_active_file (const struct ccase *, void *aux);
166 /* Aggregating to a system file. */
167 static bool presorted_agr_to_sysfile (const struct ccase *, void *aux);
171 /* Parses and executes the AGGREGATE procedure. */
176 struct file_handle *out_file = NULL;
178 bool copy_documents = false;
179 bool presorted = false;
182 memset(&agr, 0 , sizeof (agr));
183 agr.missing = ITEMWISE;
184 case_nullify (&agr.break_case);
186 agr.dict = dict_create ();
187 dict_set_label (agr.dict, dict_get_label (default_dict));
188 dict_set_documents (agr.dict, dict_get_documents (default_dict));
190 /* OUTFILE subcommand must be first. */
191 if (!lex_force_match_id ("OUTFILE"))
194 if (!lex_match ('*'))
196 out_file = fh_parse (FH_REF_FILE | FH_REF_SCRATCH);
197 if (out_file == NULL)
201 /* Read most of the subcommands. */
206 if (lex_match_id ("MISSING"))
209 if (!lex_match_id ("COLUMNWISE"))
211 lex_error (_("while expecting COLUMNWISE"));
214 agr.missing = COLUMNWISE;
216 else if (lex_match_id ("DOCUMENT"))
217 copy_documents = true;
218 else if (lex_match_id ("PRESORTED"))
220 else if (lex_match_id ("BREAK"))
225 agr.sort = sort_parse_criteria (default_dict,
226 &agr.break_vars, &agr.break_var_cnt,
227 &saw_direction, NULL);
228 if (agr.sort == NULL)
231 for (i = 0; i < agr.break_var_cnt; i++)
232 dict_clone_var_assert (agr.dict, agr.break_vars[i],
233 agr.break_vars[i]->name);
235 /* BREAK must follow the options. */
240 lex_error (_("expecting BREAK"));
244 if (presorted && saw_direction)
245 msg (SW, _("When PRESORTED is specified, specifying sorting directions "
246 "with (A) or (D) has no effect. Output data will be sorted "
247 "the same way as the input data."));
249 /* Read in the aggregate functions. */
251 if (!parse_aggregate_functions (&agr))
254 /* Delete documents. */
256 dict_set_documents (agr.dict, NULL);
258 /* Cancel SPLIT FILE. */
259 dict_set_split_vars (agr.dict, NULL, 0);
263 case_create (&agr.agr_case, dict_get_next_value_idx (agr.dict));
265 /* Output to active file or external file? */
266 if (out_file == NULL)
268 /* The active file will be replaced by the aggregated data,
269 so TEMPORARY is moot. */
270 proc_cancel_temporary_transformations ();
272 if (agr.sort != NULL && !presorted)
274 if (!sort_active_file_in_place (agr.sort))
278 agr.sink = create_case_sink (&storage_sink_class, agr.dict, NULL);
279 if (agr.sink->class->open != NULL)
280 agr.sink->class->open (agr.sink);
281 proc_set_sink (create_case_sink (&null_sink_class, default_dict, NULL));
282 if (!procedure (agr_to_active_file, &agr))
284 if (agr.case_cnt > 0)
286 dump_aggregate_info (&agr, &agr.agr_case);
287 if (!agr.sink->class->write (agr.sink, &agr.agr_case))
290 discard_variables ();
291 default_dict = agr.dict;
293 proc_set_source (agr.sink->class->make_source (agr.sink));
294 free_case_sink (agr.sink);
298 agr.writer = any_writer_open (out_file, agr.dict);
299 if (agr.writer == NULL)
302 if (agr.sort != NULL && !presorted)
304 /* Sorting is needed. */
305 struct casefile *dst;
306 struct casereader *reader;
310 dst = sort_active_file_to_casefile (agr.sort);
313 reader = casefile_get_destructive_reader (dst);
314 while (ok && casereader_read_xfer (reader, &c))
316 if (aggregate_single_case (&agr, &c, &agr.agr_case))
317 ok = any_writer_write (agr.writer, &agr.agr_case);
320 casereader_destroy (reader);
322 ok = !casefile_error (dst);
323 casefile_destroy (dst);
329 /* Active file is already sorted. */
330 if (!procedure (presorted_agr_to_sysfile, &agr))
334 if (agr.case_cnt > 0)
336 dump_aggregate_info (&agr, &agr.agr_case);
337 any_writer_write (agr.writer, &agr.agr_case);
339 if (any_writer_error (agr.writer))
348 return CMD_CASCADING_FAILURE;
351 /* Parse all the aggregate functions. */
/* For each specification this reads the target variable names (each
   optionally followed by a string label), the function name, the source
   variable list and any function arguments, then appends one struct agr_var
   per target to AGR's list and creates the target variable in AGR's
   dictionary.
   NOTE(review): a number of lines (braces, labels, some statements) are not
   present in this listing, so only the visible steps are annotated. */
353 parse_aggregate_functions (struct agr_proc *agr)
355 struct agr_var *tail; /* Tail of linked list starting at agr->agr_vars. */
357 /* Parse everything. */
366 const struct agr_func *function;
369 union agr_argument arg[2];
371 struct variable **src;
385 /* Parse the list of target variables. */
386 while (!lex_match ('='))
388 size_t n_dest_prev = n_dest;
390 if (!parse_DATA_LIST_vars (&dest, &n_dest,
391 PV_APPEND | PV_SINGLE | PV_NO_SCRATCH))
394 /* Assign empty labels. */
398 dest_label = xnrealloc (dest_label, n_dest, sizeof *dest_label);
399 for (j = n_dest_prev; j < n_dest; j++)
400 dest_label[j] = NULL;
/* A string token here is taken as the label of the most recently parsed
   target; it is cut to 255 with ds_truncate before being copied. */
403 if (token == T_STRING)
405 ds_truncate (&tokstr, 255);
406 dest_label[n_dest - 1] = xstrdup (ds_c_str (&tokstr));
411 /* Get the name of the aggregation function. */
414 lex_error (_("expecting aggregation function"));
/* A trailing `.' on the function name is stripped before the lookup. */
419 if (tokid[strlen (tokid) - 1] == '.')
422 tokid[strlen (tokid) - 1] = 0;
/* Case-insensitive linear search of agr_func_tab.  The loop stops at the
   row whose name is null; reaching it means the name is unknown. */
425 for (function = agr_func_tab; function->name; function++)
426 if (!strcasecmp (function->name, tokid))
428 if (NULL == function->name)
430 msg (SE, _("Unknown aggregation function %s."), tokid);
/* The function code is the row's offset within the table. */
433 func_index = function - agr_func_tab;
436 /* Check for leading lparen. */
437 if (!lex_match ('('))
440 func_index = N_NO_VARS;
441 else if (func_index == NU)
442 func_index = NU_NO_VARS;
445 lex_error (_("expecting `('"));
451 /* Parse list of source variables. */
453 int pv_opts = PV_NO_SCRATCH;
/* SUM, MEAN and SD request PV_NUMERIC; any other function that takes
   arguments requests PV_SAME_TYPE for its source list. */
455 if (func_index == SUM || func_index == MEAN || func_index == SD)
456 pv_opts |= PV_NUMERIC;
457 else if (function->n_args)
458 pv_opts |= PV_SAME_TYPE;
460 if (!parse_variables (default_dict, &src, &n_src, pv_opts))
464 /* Parse function arguments, for those functions that
465 require arguments. */
466 if (function->n_args != 0)
467 for (i = 0; i < function->n_args; i++)
472 if (token == T_STRING)
474 arg[i].c = xstrdup (ds_c_str (&tokstr));
477 else if (lex_is_number ())
482 msg (SE, _("Missing argument %d to %s."), i + 1,
/* Each argument's type is checked against the first source variable. */
489 if (type != src[0]->type)
491 msg (SE, _("Arguments to %s must be of same type as "
492 "source variables."),
498 /* Trailing rparen. */
501 lex_error (_("expecting `)'"));
505 /* Now check that the number of source variables match
506 the number of target variables. If we check earlier
507 than this, the user can get very misleading error
508 message, i.e. `AGGREGATE x=SUM(y t).' will get this
509 error message when a proper message would be more
510 like `unknown variable t'. */
513 msg (SE, _("Number of source variables (%u) does not match "
514 "number of target variables (%u)."),
515 (unsigned) n_src, (unsigned) n_dest);
/* For the two-argument range functions, a first argument that compares
   greater than the second only draws a warning; per the message text the
   pair is then treated as if given in the correct order. */
519 if ((func_index == PIN || func_index == POUT
520 || func_index == FIN || func_index == FOUT)
521 && ((src[0]->type == NUMERIC && arg[0].f > arg[1].f)
522 || (src[0]->type == ALPHA
523 && str_compare_rpad (arg[0].c, arg[1].c) > 0)))
525 union agr_argument t = arg[0];
529 msg (SW, _("The value arguments passed to the %s function "
530 "are out-of-order. They will be treated as if "
531 "they had been specified in the correct order."),
536 /* Finally add these to the linked list of aggregation
538 for (i = 0; i < n_dest; i++)
540 struct agr_var *v = xmalloc (sizeof *v);
542 /* Add variable to chain. */
543 if (agr->agr_vars != NULL)
551 /* Create the target variable in the aggregate
554 struct variable *destvar;
556 v->function = func_index;
562 if (src[i]->type == ALPHA)
564 v->function |= FSTRING;
565 v->string = xmalloc (src[i]->width);
568 if (function->alpha_type == ALPHA)
569 destvar = dict_clone_var (agr->dict, v->src, dest[i]);
572 assert (v->src->type == NUMERIC
573 || function->alpha_type == NUMERIC);
574 destvar = dict_create_var (agr->dict, dest[i], 0);
577 if ((func_index == N || func_index == NMISS)
578 && dict_get_weight (default_dict) != NULL)
579 destvar->print = destvar->write = f8_2;
581 destvar->print = destvar->write = function->format;
586 destvar = dict_create_var (agr->dict, dest[i], 0);
587 if (func_index == N_NO_VARS
588 && dict_get_weight (default_dict) != NULL)
589 destvar->print = destvar->write = f8_2;
591 destvar->print = destvar->write = function->format;
596 msg (SE, _("Variable name %s is not unique within the "
597 "aggregate file dictionary, which contains "
598 "the aggregate variables and the break "
607 destvar->label = dest_label[i];
608 dest_label[i] = NULL;
614 v->include_missing = include_missing;
620 if (v->src->type == NUMERIC)
621 for (j = 0; j < function->n_args; j++)
622 v->arg[j].f = arg[j].f;
624 for (j = 0; j < function->n_args; j++)
625 v->arg[j].c = xstrdup (arg[j].c);
629 if (src != NULL && src[0]->type == ALPHA)
630 for (i = 0; i < function->n_args; i++)
640 if (!lex_match ('/'))
645 lex_error ("expecting end of command");
651 for (i = 0; i < n_dest; i++)
654 free (dest_label[i]);
660 if (src && n_src && src[0]->type == ALPHA)
661 for (i = 0; i < function->n_args; i++)
/* Releases what AGR holds: the output writer, the sort criteria, the break
   variable array, the saved break case, the per-variable function state,
   the aggregate dictionary and the aggregate output case. */
674 agr_destroy (struct agr_proc *agr)
676 struct agr_var *iter, *next;
/* agr->writer is handed to any_writer_close without a null test here. */
678 any_writer_close (agr->writer);
679 if (agr->sort != NULL)
680 sort_destroy_criteria (agr->sort);
681 free (agr->break_vars);
682 case_destroy (&agr->break_case);
683 for (iter = agr->agr_vars; iter; iter = next)
/* String-typed functions own heap copies of their string arguments; the
   argument count comes from the function's row in agr_func_tab. */
687 if (iter->function & FSTRING)
692 n_args = agr_func_tab[iter->function & FUNC].n_args;
693 for (i = 0; i < n_args; i++)
694 free (iter->arg[i].c);
/* SD is the function that owns a moments accumulator. */
697 else if (iter->function == SD)
698 moments1_destroy (iter->moments);
701 if (agr->dict != NULL)
702 dict_destroy (agr->dict);
704 case_destroy (&agr->agr_case);
709 static void accumulate_aggregate_info (struct agr_proc *,
710 const struct ccase *);
711 static void dump_aggregate_info (struct agr_proc *, struct ccase *);
713 /* Processes a single case INPUT for aggregation. If output is
714 warranted, writes it to OUTPUT and returns nonzero.
715 Otherwise, returns zero and OUTPUT is unmodified. */
/* A group ends when INPUT's break-variable values differ from those saved
   in agr->break_case.  The finished group is dumped to OUTPUT first, then
   the accumulators are reset and INPUT is counted as the first case of the
   next group. */
717 aggregate_single_case (struct agr_proc *agr,
718 const struct ccase *input, struct ccase *output)
720 bool finished_group = false;
/* The very first case only starts a group; there is nothing to dump yet. */
722 if (agr->case_cnt++ == 0)
723 initialize_aggregate_info (agr, input);
/* A nonzero case_compare result means the break values changed. */
724 else if (case_compare (&agr->break_case, input,
725 agr->break_vars, agr->break_var_cnt))
727 dump_aggregate_info (agr, output);
728 finished_group = true;
730 initialize_aggregate_info (agr, input);
/* INPUT always contributes to the group that is current at this point. */
733 accumulate_aggregate_info (agr, input);
734 return finished_group;
737 /* Accumulates aggregation data from the case INPUT. */
/* Walks every aggregate variable of AGR and folds INPUT's source value into
   that variable's running state (dbl[], string or moments), scaled by the
   case weight obtained from default_dict.  String values are compared and
   copied with memcmp/memcpy over the source variable's width.
   NOTE(review): most `case' labels are absent from this listing, so the
   statements below are not individually attributed to functions. */
739 accumulate_aggregate_info (struct agr_proc *agr,
740 const struct ccase *input)
742 struct agr_var *iter;
746 weight = dict_get_case_weight (default_dict, input, &bad_warn);
748 for (iter = agr->agr_vars; iter; iter = iter->next)
751 const union value *v = case_data (input, iter->src->fv);
/* Missing-value test; which rule applies depends on include_missing.
   NOTE(review): part of this condition is not visible here. */
753 if ((!iter->include_missing
754 && mv_is_value_missing (&iter->src->miss, v))
755 || (iter->include_missing && iter->src->type == NUMERIC
758 switch (iter->function)
761 case NMISS | FSTRING:
762 iter->dbl[0] += weight;
765 case NUMISS | FSTRING:
773 /* This is horrible. There are too many possibilities. */
774 switch (iter->function)
777 iter->dbl[0] += v->f * weight;
781 iter->dbl[0] += v->f * weight;
782 iter->dbl[1] += weight;
785 moments1_add (iter->moments, v->f, weight);
788 iter->dbl[0] = max (iter->dbl[0], v->f);
792 if (memcmp (iter->string, v->s, iter->src->width) < 0)
793 memcpy (iter->string, v->s, iter->src->width);
797 iter->dbl[0] = min (iter->dbl[0], v->f);
801 if (memcmp (iter->string, v->s, iter->src->width) > 0)
802 memcpy (iter->string, v->s, iter->src->width);
/* In the comparisons that follow, dbl[0] gains the weight only when the
   test against the function argument(s) holds, while dbl[1] gains it for
   every case that reaches the statement. */
807 if (v->f > iter->arg[0].f)
808 iter->dbl[0] += weight;
809 iter->dbl[1] += weight;
813 if (memcmp (iter->arg[0].c, v->s, iter->src->width) < 0)
814 iter->dbl[0] += weight;
815 iter->dbl[1] += weight;
819 if (v->f < iter->arg[0].f)
820 iter->dbl[0] += weight;
821 iter->dbl[1] += weight;
825 if (memcmp (iter->arg[0].c, v->s, iter->src->width) > 0)
826 iter->dbl[0] += weight;
827 iter->dbl[1] += weight;
831 if (iter->arg[0].f <= v->f && v->f <= iter->arg[1].f)
832 iter->dbl[0] += weight;
833 iter->dbl[1] += weight;
837 if (memcmp (iter->arg[0].c, v->s, iter->src->width) <= 0
838 && memcmp (iter->arg[1].c, v->s, iter->src->width) >= 0)
839 iter->dbl[0] += weight;
840 iter->dbl[1] += weight;
844 if (iter->arg[0].f > v->f || v->f > iter->arg[1].f)
845 iter->dbl[0] += weight;
846 iter->dbl[1] += weight;
850 if (memcmp (iter->arg[0].c, v->s, iter->src->width) > 0
851 || memcmp (iter->arg[1].c, v->s, iter->src->width) < 0)
852 iter->dbl[0] += weight;
853 iter->dbl[1] += weight;
857 iter->dbl[0] += weight;
870 case FIRST | FSTRING:
873 memcpy (iter->string, v->s, iter->src->width);
882 memcpy (iter->string, v->s, iter->src->width);
886 case NMISS | FSTRING:
888 case NUMISS | FSTRING:
889 /* Our value is not missing or it would have been
890 caught earlier. Nothing to do. */
/* NOTE(review): this final switch appears to serve functions that have no
   source variable -- confirm against the complete file. */
896 switch (iter->function)
899 iter->dbl[0] += weight;
910 /* We've come to a record that differs from the previous in one or
911 more of the break variables. Make an output record from the
912 accumulated statistics in the OUTPUT case. */
/* NOTE(review): most `case' labels are absent from this listing, so the
   result statements below are not individually attributed to functions. */
914 dump_aggregate_info (struct agr_proc *agr, struct ccase *output)
/* First copy each break variable's values (v->nv of them) from the saved
   break case into OUTPUT. */
920 for (i = 0; i < agr->break_var_cnt; i++)
922 struct variable *v = agr->break_vars[i];
923 memcpy (case_data_rw (output, value_idx),
924 case_data (&agr->break_case, v->fv),
925 sizeof (union value) * v->nv);
/* Then compute one result per aggregate variable. */
933 for (i = agr->agr_vars; i; i = i->next)
935 union value *v = case_data_rw (output, i->dest->fv);
/* Under COLUMNWISE treatment, a variable whose `missing' flag is nonzero
   gets a blank string result; the counting functions N, NU, NMISS and
   NUMISS are exempt.  (The numeric branch is not visible in this
   listing.) */
937 if (agr->missing == COLUMNWISE && i->missing != 0
938 && (i->function & FUNC) != N && (i->function & FUNC) != NU
939 && (i->function & FUNC) != NMISS && (i->function & FUNC) != NUMISS)
941 if (i->dest->type == ALPHA)
942 memset (v->s, ' ', i->dest->width);
951 v->f = i->int1 ? i->dbl[0] : SYSMIS;
/* Quotients fall back to SYSMIS when the denominator dbl[1] is zero. */
954 v->f = i->dbl[1] != 0.0 ? i->dbl[0] / i->dbl[1] : SYSMIS;
960 /* FIXME: we should use two passes. */
961 moments1_calculate (i->moments, NULL, NULL, &variance,
963 if (variance != SYSMIS)
964 v->f = sqrt (variance);
971 v->f = i->int1 ? i->dbl[0] : SYSMIS;
976 memcpy (v->s, i->string, i->dest->width);
978 memset (v->s, ' ', i->dest->width);
988 v->f = i->dbl[1] ? i->dbl[0] / i->dbl[1] : SYSMIS;
998 v->f = i->dbl[1] ? i->dbl[0] / i->dbl[1] * 100.0 : SYSMIS;
1010 v->f = i->int1 ? i->dbl[0] : SYSMIS;
1012 case FIRST | FSTRING:
1013 case LAST | FSTRING:
1015 memcpy (v->s, i->string, i->dest->width);
1017 memset (v->s, ' ', i->dest->width);
1026 case NMISS | FSTRING:
1030 case NUMISS | FSTRING:
1040 /* Resets the state for all the aggregate functions. */
/* Also replaces agr->break_case with a clone of INPUT, so that INPUT's
   break values identify the group that starts here. */
1042 initialize_aggregate_info (struct agr_proc *agr, const struct ccase *input)
1044 struct agr_var *iter;
1046 case_destroy (&agr->break_case);
1047 case_clone (&agr->break_case, input);
1049 for (iter = agr->agr_vars; iter; iter = iter->next)
/* Zero every accumulator, then apply the per-function seeds below.
   NOTE(review): the `case' labels for the seeds are absent from this
   listing; the extreme starting values are presumably those of the MIN and
   MAX functions -- confirm against the complete file. */
1052 iter->dbl[0] = iter->dbl[1] = iter->dbl[2] = 0.0;
1053 iter->int1 = iter->int2 = 0;
1054 switch (iter->function)
1057 iter->dbl[0] = DBL_MAX;
1060 memset (iter->string, 255, iter->src->width);
1063 iter->dbl[0] = -DBL_MAX;
1066 memset (iter->string, 0, iter->src->width);
/* The moments accumulator is created on first use and cleared otherwise. */
1069 if (iter->moments == NULL)
1070 iter->moments = moments1_create (MOMENT_VARIANCE);
1072 moments1_clear (iter->moments);
1080 /* Aggregate each case as it comes through. Cases which aren't needed
1082 Returns true if successful, false if an I/O error occurred. */
/* AGR_ is the struct agr_proc supplied as the callback's auxiliary data.
   When C closes a group, the finished aggregate case is written to
   agr->sink and the write's result is returned. */
1084 agr_to_active_file (const struct ccase *c, void *agr_)
1086 struct agr_proc *agr = agr_;
1088 if (aggregate_single_case (agr, c, &agr->agr_case))
1089 return agr->sink->class->write (agr->sink, &agr->agr_case);
1094 /* Aggregate the current case and output it if we passed a
1097 presorted_agr_to_sysfile (const struct ccase *c, void *agr_)
1099 struct agr_proc *agr = agr_;
1101 if (aggregate_single_case (agr, c, &agr->agr_case))
1102 return any_writer_write (agr->writer, &agr->agr_case);