1 /* PSPP - computes sample statistics.
2 Copyright (C) 1997-9, 2000 Free Software Foundation, Inc.
3 Written by Ben Pfaff <blp@gnu.org>.
5 This program is free software; you can redistribute it and/or
6 modify it under the terms of the GNU General Public License as
7 published by the Free Software Foundation; either version 2 of the
8 License, or (at your option) any later version.
10 This program is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with this program; if not, write to the Free Software
17 Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA
27 #include "dictionary.h"
29 #include "file-handle.h"
35 #include "sfm-write.h"
42 /* Specifies how to make an aggregate variable. */
/* One of these is allocated per target variable by
   parse_aggregate_functions() and linked into agr_proc.agr_vars.
   NOTE(review): the struct header and several members that other
   functions in this file use (dbl[], int1, int2, string, missing) are
   not visible in this excerpt. */
45 struct agr_var *next; /* Next in list; a null pointer ends the chain. */
47 /* Collected during parsing. */
48 struct variable *src; /* Source variable. */
49 struct variable *dest; /* Target variable. */
50 int function; /* Function index, ORed with FSTRING for ALPHA sources. */
51 int include_missing; /* 1=Include user-missing values. */
52 union value arg[2]; /* Arguments: .f for numeric sources, .c (heap copy) for string sources. */
54 /* Accumulated during AGGREGATE execution. */
59 struct moments1 *moments; /* Moment accumulator; created in initialize_aggregate_info(), destroyed in agr_destroy() for SD. */
62 /* Aggregation functions. */
/* The function enumerators double as indexes into agr_func_tab[]
   (the parser computes the index as `function - agr_func_tab'), so the
   two lists must stay in the same order.  When the function name is not
   followed by `(' the parser switches NU to NU_NO_VARS and selects
   N_NO_VARS (presumably for N; that test is not visible here). */
65 NONE, SUM, MEAN, SD, MAX, MIN, PGT, PLT, PIN, POUT, FGT, FLT, FIN,
66 FOUT, N, NU, NMISS, NUMISS, FIRST, LAST,
67 N_AGR_FUNCS, N_NO_VARS, NU_NO_VARS,
68 FUNC = 0x1f, /* Function mask: the low five bits hold the function index. */
69 FSTRING = 1<<5, /* String function bit, set when the source variable is ALPHA. */
72 /* Attributes of an aggregation function. */
/* One entry of agr_func_tab[]. */
75 const char *name; /* Aggregation function name; a null name ends the lookup loop. */
76 int n_args; /* Number of arguments parsed after the source variables. */
77 int alpha_type; /* When given ALPHA arguments, output type (ALPHA, NUMERIC, or -1 in the table). */
78 struct fmt_spec format; /* Format spec if alpha_type != ALPHA. */
81 /* Attributes of aggregation functions. */
/* Indexed by the aggregation function enumerators, so the row order
   must match the enum.  Name lookup in parse_aggregate_functions()
   walks the table until it reaches the row whose name is NULL; the two
   rows after that sentinel sit at positions N_NO_VARS and NU_NO_VARS
   and are therefore never found by name. */
82 static const struct agr_func agr_func_tab[] =
84 {"<NONE>", 0, -1, {0, 0, 0}},
85 {"SUM", 0, -1, {FMT_F, 8, 2}},
86 {"MEAN", 0, -1, {FMT_F, 8, 2}},
87 {"SD", 0, -1, {FMT_F, 8, 2}},
88 {"MAX", 0, ALPHA, {-1, -1, -1}},
89 {"MIN", 0, ALPHA, {-1, -1, -1}},
90 {"PGT", 1, NUMERIC, {FMT_F, 5, 1}},
91 {"PLT", 1, NUMERIC, {FMT_F, 5, 1}},
92 {"PIN", 2, NUMERIC, {FMT_F, 5, 1}},
93 {"POUT", 2, NUMERIC, {FMT_F, 5, 1}},
94 {"FGT", 1, NUMERIC, {FMT_F, 5, 3}},
95 {"FLT", 1, NUMERIC, {FMT_F, 5, 3}},
96 {"FIN", 2, NUMERIC, {FMT_F, 5, 3}},
97 {"FOUT", 2, NUMERIC, {FMT_F, 5, 3}},
98 {"N", 0, NUMERIC, {FMT_F, 7, 0}},
99 {"NU", 0, NUMERIC, {FMT_F, 7, 0}},
100 {"NMISS", 0, NUMERIC, {FMT_F, 7, 0}},
101 {"NUMISS", 0, NUMERIC, {FMT_F, 7, 0}},
102 {"FIRST", 0, ALPHA, {-1, -1, -1}},
103 {"LAST", 0, ALPHA, {-1, -1, -1}},
104 {NULL, 0, -1, {-1, -1, -1}}, /* Sentinel at index N_AGR_FUNCS. */
105 {"N", 0, NUMERIC, {FMT_F, 7, 0}}, /* Index N_NO_VARS. */
106 {"NU", 0, NUMERIC, {FMT_F, 7, 0}}, /* Index NU_NO_VARS. */
109 /* Missing value types. */
/* ITEMWISE is the default set by the command parser; COLUMNWISE is
   selected by the MISSING subcommand.  Under COLUMNWISE,
   dump_aggregate_info() takes a separate branch for any function other
   than N, NU, NMISS and NUMISS whose `missing' flag is nonzero. */
110 enum missing_treatment
112 ITEMWISE, /* Missing values item by item. */
113 COLUMNWISE /* Missing values column by column. */
116 /* An entire AGGREGATE procedure. */
/* The command parser zero-fills this structure with memset() before
   populating it, so unused pointers start out null. */
119 /* We have either an output file or a sink. */
120 struct sfm_writer *writer; /* Output file, or null if none. */
121 struct case_sink *sink; /* Sink, or null if none. */
123 /* Break variables. */
124 struct sort_criteria *sort; /* Sort criteria. */
125 struct variable **break_vars; /* Break variables. */
126 size_t break_var_cnt; /* Number of break variables. */
127 union value *prev_break; /* Last values of break variables; allocated on the first case. */
129 enum missing_treatment missing; /* How to treat missing values. */
130 struct agr_var *agr_vars; /* First aggregate variable. */
131 struct dictionary *dict; /* Aggregate dictionary. */
132 int case_cnt; /* Counts aggregated cases. */
133 struct ccase agr_case; /* Aggregate case for output. */
/* Forward declarations of helpers defined later in this file. */
136 static void initialize_aggregate_info (struct agr_proc *);
139 static int parse_aggregate_functions (struct agr_proc *);
140 static void agr_destroy (struct agr_proc *);
141 static int aggregate_single_case (struct agr_proc *agr,
142 const struct ccase *input,
143 struct ccase *output);
144 static void dump_aggregate_info (struct agr_proc *agr, struct ccase *output);
146 /* Aggregating to the active file. */
147 static int agr_to_active_file (struct ccase *, void *aux);
149 /* Aggregating to a system file. */
150 static int presorted_agr_to_sysfile (struct ccase *, void *aux);
154 /* Parses and executes the AGGREGATE procedure. */
/* Outline: parse the OUTFILE, MISSING, DOCUMENT, PRESORTED and BREAK
   subcommands, parse the aggregate functions, then aggregate either
   into a replacement active file (no output file handle) or into a
   system file.
   NOTE(review): the function header, several local declarations and the
   error/cleanup paths are not visible in this excerpt. */
159 struct file_handle *out_file = NULL;
161 /* Have we seen these subcommands? */
/* Start from an all-zero state; only the missing-value mode needs a
   non-zero default. */
164 memset(&agr, 0 , sizeof (agr));
165 agr.missing = ITEMWISE;
/* The aggregate dictionary starts out with the active dictionary's
   label and documents. */
167 agr.dict = dict_create ();
168 dict_set_label (agr.dict, dict_get_label (default_dict));
169 dict_set_documents (agr.dict, dict_get_documents (default_dict));
171 /* Read most of the subcommands. */
176 if (lex_match_id ("OUTFILE"))
180 msg (SE, _("%s subcommand given multiple times."),"OUTFILE");
/* `*' leaves out_file null; anything else is parsed as a file handle. */
186 if (!lex_match ('*'))
188 out_file = fh_parse ();
189 if (out_file == NULL)
193 else if (lex_match_id ("MISSING"))
196 if (!lex_match_id ("COLUMNWISE"))
198 lex_error (_("while expecting COLUMNWISE"));
201 agr.missing = COLUMNWISE;
203 else if (lex_match_id ("DOCUMENT"))
205 else if (lex_match_id ("PRESORTED"))
207 else if (lex_match_id ("BREAK"))
213 msg (SE, _("%s subcommand given multiple times."),"BREAK");
/* The break variables are parsed as sort criteria, which also fills in
   break_vars and break_var_cnt. */
219 agr.sort = sort_parse_criteria (default_dict,
220 &agr.break_vars, &agr.break_var_cnt);
221 if (agr.sort == NULL)
/* Each break variable is cloned into the aggregate dictionary under
   its own name. */
224 for (i = 0; i < agr.break_var_cnt; i++)
226 struct variable *v = dict_clone_var (agr.dict, agr.break_vars[i],
227 agr.break_vars[i]->name);
234 /* Check for proper syntax. */
236 msg (SW, _("BREAK subcommand not specified."));
238 /* Read in the aggregate functions. */
239 if (!parse_aggregate_functions (&agr))
242 /* Delete documents. */
244 dict_set_documents (agr.dict, NULL);
246 /* Cancel SPLIT FILE. */
247 dict_set_split_vars (agr.dict, NULL, 0);
/* The output case is sized from the finished aggregate dictionary, and
   the accumulators are reset before the first case arrives. */
251 case_create (&agr.agr_case, dict_get_next_value_idx (agr.dict));
252 initialize_aggregate_info (&agr);
254 /* Output to active file or external file? */
255 if (out_file == NULL)
257 /* The active file will be replaced by the aggregated data,
258 so TEMPORARY is moot. */
/* NOTE(review): `seen & 4' is a magic bit test; presumably bit 4 records
   that PRESORTED was given (the code that sets it is not visible
   here).  A named constant would make this clearer. */
261 if (agr.sort != NULL && (seen & 4) == 0)
262 sort_active_file_in_place (agr.sort);
/* Aggregated cases are written to agr.sink by agr_to_active_file();
   the procedure's own sink is a null_sink_class sink. */
264 agr.sink = create_case_sink (&storage_sink_class, agr.dict, NULL);
265 if (agr.sink->class->open != NULL)
266 agr.sink->class->open (agr.sink);
267 vfm_sink = create_case_sink (&null_sink_class, default_dict, NULL);
268 procedure (agr_to_active_file, &agr);
/* aggregate_single_case() only emits a group when the next group
   starts, so the final group is written here. */
269 if (agr.case_cnt > 0)
271 dump_aggregate_info (&agr, &agr.agr_case);
272 agr.sink->class->write (agr.sink, &agr.agr_case);
/* The aggregate dictionary and the sink's contents become the new
   active dictionary and data source. */
274 dict_destroy (default_dict);
275 default_dict = agr.dict;
277 vfm_source = agr.sink->class->make_source (agr.sink);
278 free_case_sink (agr.sink);
282 agr.writer = sfm_open_writer (out_file, agr.dict, get_scompression ());
283 if (agr.writer == NULL)
286 if (agr.sort != NULL && (seen & 4) == 0)
288 /* Sorting is needed. */
289 struct casefile *dst;
290 struct casereader *reader;
/* The active file is sorted into a separate casefile, which is then
   read back and aggregated one case at a time. */
293 dst = sort_active_file_to_casefile (agr.sort);
296 reader = casefile_get_destructive_reader (dst);
297 while (casereader_read_xfer (reader, &c))
299 if (aggregate_single_case (&agr, &c, &agr.agr_case))
300 sfm_write_case (agr.writer, &agr.agr_case);
303 casereader_destroy (reader);
304 casefile_destroy (dst);
308 /* Active file is already sorted. */
309 procedure (presorted_agr_to_sysfile, &agr);
/* As in the active-file path, the final break group is written here. */
312 if (agr.case_cnt > 0)
314 dump_aggregate_info (&agr, &agr.agr_case);
315 sfm_write_case (agr.writer, &agr.agr_case);
327 /* Parse all the aggregate functions. */
/* Each specification consists of a list of target variables (the last
   may be followed by a label string), `=', a function name, and for
   most functions a parenthesized source variable list plus arguments.
   One agr_var per target is appended to AGR's list and the target is
   created in AGR's dictionary.
   NOTE(review): many lines of this function are not visible in this
   excerpt.  Also, the "expecting end of command" message passed to
   lex_error() near the end is not wrapped in _() like the other
   messages in this file. */
329 parse_aggregate_functions (struct agr_proc *agr)
331 struct agr_var *tail; /* Tail of linked list starting at agr->agr_vars. */
333 /* Parse everything. */
342 const struct agr_func *function;
347 struct variable **src;
361 /* Parse the list of target variables. */
362 while (!lex_match ('='))
364 int n_dest_prev = n_dest;
366 if (!parse_DATA_LIST_vars (&dest, &n_dest,
367 PV_APPEND | PV_SINGLE | PV_NO_SCRATCH))
370 /* Assign empty labels. */
/* dest_label grows in step with dest; only the new slots are cleared. */
374 dest_label = xrealloc (dest_label, sizeof *dest_label * n_dest);
375 for (j = n_dest_prev; j < n_dest; j++)
376 dest_label[j] = NULL;
/* A string token here is the label of the most recently parsed
   target; it is truncated to 255 characters before being copied. */
379 if (token == T_STRING)
381 ds_truncate (&tokstr, 255);
382 dest_label[n_dest - 1] = xstrdup (ds_c_str (&tokstr));
387 /* Get the name of the aggregation function. */
390 lex_error (_("expecting aggregation function"));
/* A trailing `.' is stripped from the name before the table lookup
   (presumably it also sets include_missing; that line is not visible
   here). */
395 if (tokid[strlen (tokid) - 1] == '.')
398 tokid[strlen (tokid) - 1] = 0;
/* Linear search by name, stopping at the table's null-name row. */
401 for (function = agr_func_tab; function->name; function++)
402 if (!strcmp (function->name, tokid))
404 if (NULL == function->name)
406 msg (SE, _("Unknown aggregation function %s."), tokid);
409 func_index = function - agr_func_tab;
412 /* Check for leading lparen. */
413 if (!lex_match ('('))
416 func_index = N_NO_VARS;
417 else if (func_index == NU)
418 func_index = NU_NO_VARS;
421 lex_error (_("expecting `('"));
427 /* Parse list of source variables. */
429 int pv_opts = PV_NO_SCRATCH;
/* SUM, MEAN and SD accept numeric sources only; functions that take
   arguments require all sources to share one type. */
431 if (func_index == SUM || func_index == MEAN || func_index == SD)
432 pv_opts |= PV_NUMERIC;
433 else if (function->n_args)
434 pv_opts |= PV_SAME_TYPE;
436 if (!parse_variables (default_dict, &src, &n_src, pv_opts))
440 /* Parse function arguments, for those functions that
441 require arguments. */
442 if (function->n_args != 0)
443 for (i = 0; i < function->n_args; i++)
448 if (token == T_STRING)
/* String arguments are duplicated on the heap. */
450 arg[i].c = xstrdup (ds_c_str (&tokstr));
453 else if (lex_is_number ())
458 msg (SE, _("Missing argument %d to %s."), i + 1, function->name);
/* The argument type is checked against the first source variable. */
464 if (type != src[0]->type)
466 msg (SE, _("Arguments to %s must be of same type as "
467 "source variables."),
473 /* Trailing rparen. */
476 lex_error (_("expecting `)'"));
480 /* Now check that the number of source variables match the
481 number of target variables. Do this here because if we
482 do it earlier then the user can get very misleading error
483 messages; i.e., `AGGREGATE x=SUM(y t).' will get this
484 error message when a proper message would be more like
485 `unknown variable t'. */
488 msg (SE, _("Number of source variables (%d) does not match "
489 "number of target variables (%d)."),
495 /* Finally add these to the linked list of aggregation
497 for (i = 0; i < n_dest; i++)
499 struct agr_var *v = xmalloc (sizeof *v);
501 /* Add variable to chain. */
502 if (agr->agr_vars != NULL)
510 /* Create the target variable in the aggregate
513 struct variable *destvar;
515 v->function = func_index;
521 if (src[i]->type == ALPHA)
523 v->function |= FSTRING;
524 v->string = xmalloc (src[i]->width);
527 if (function->alpha_type == ALPHA)
528 destvar = dict_clone_var (agr->dict, v->src, dest[i]);
529 else if (v->src->type == NUMERIC
530 || function->alpha_type == NUMERIC)
532 destvar = dict_create_var (agr->dict, dest[i], 0);
535 || func_index == N_NO_VARS
536 || func_index == NMISS)
537 && dict_get_weight (default_dict) != NULL)
539 static const struct fmt_spec f8_2 = {FMT_F, 8, 2};
541 destvar->print = destvar->write = f8_2;
544 destvar->print = destvar->write = function->format;
547 destvar = dict_create_var (agr->dict, dest[i],
551 destvar = dict_create_var (agr->dict, dest[i], 0);
556 msg (SE, _("Variable name %s is not unique within the "
557 "aggregate file dictionary, which contains "
558 "the aggregate variables and the break "
569 destvar->label = dest_label[i];
570 dest_label[i] = NULL;
576 v->include_missing = include_missing;
582 if (v->src->type == NUMERIC)
583 for (j = 0; j < function->n_args; j++)
584 v->arg[j].f = arg[j].f;
586 for (j = 0; j < function->n_args; j++)
587 v->arg[j].c = xstrdup (arg[j].c);
591 if (src != NULL && src[0]->type == ALPHA)
592 for (i = 0; i < function->n_args; i++)
602 if (!lex_match ('/'))
607 lex_error ("expecting end of command");
613 for (i = 0; i < n_dest; i++)
616 free (dest_label[i]);
622 if (src && n_src && src[0]->type == ALPHA)
623 for (i = 0; i < function->n_args; i++)
/* Releases the resources held by AGR: the system file writer, the sort
   criteria, the break variable arrays, every aggregate variable on the
   list, the aggregate dictionary (if still set) and the output case.
   NOTE(review): several lines of this function are not visible in this
   excerpt. */
636 agr_destroy (struct agr_proc *agr)
638 struct agr_var *iter, *next;
/* NOTE(review): the writer is passed unconditionally although it is
   null when aggregating to the active file; this assumes
   sfm_close_writer() accepts a null writer -- confirm. */
640 sfm_close_writer (agr->writer);
641 if (agr->sort != NULL)
642 sort_destroy_criteria (agr->sort);
643 free (agr->break_vars);
644 free (agr->prev_break);
/* `next' is used as the loop step, presumably saved before each node
   is freed (those lines are not visible here). */
645 for (iter = agr->agr_vars; iter; iter = next)
/* String functions own heap copies of their arguments; the number of
   arguments comes from the function table. */
649 if (iter->function & FSTRING)
654 n_args = agr_func_tab[iter->function & FUNC].n_args;
655 for (i = 0; i < n_args; i++)
656 free (iter->arg[i].c);
/* SD is the only function whose moments accumulator is destroyed here. */
659 else if (iter->function == SD)
660 moments1_destroy (iter->moments);
663 if (agr->dict != NULL)
664 dict_destroy (agr->dict);
666 case_destroy (&agr->agr_case);
/* Forward declarations for the execution helpers.  dump_aggregate_info()
   is also declared, with named parameters, earlier in this file. */
671 static void accumulate_aggregate_info (struct agr_proc *,
672 const struct ccase *);
673 static void dump_aggregate_info (struct agr_proc *, struct ccase *);
675 /* Processes a single case INPUT for aggregation. If output is
676 warranted, writes it to OUTPUT and returns nonzero.
677 Otherwise, returns zero and OUTPUT is unmodified. */
679 aggregate_single_case (struct agr_proc *agr,
680 const struct ccase *input, struct ccase *output)
682 /* The first case always begins a new break group. We also need to
683 preserve the values of the case for later comparison. */
/* case_cnt is incremented on every call; zero beforehand identifies
   the very first case. */
684 if (agr->case_cnt++ == 0)
/* prev_break gets one element per value slot (nv) of every break
   variable. */
691 for (i = 0; i < agr->break_var_cnt; i++)
692 n_elem += agr->break_vars[i]->nv;
695 agr->prev_break = xmalloc (sizeof *agr->prev_break * n_elem);
697 /* Copy INPUT into prev_break. */
699 union value *iter = agr->prev_break;
702 for (i = 0; i < agr->break_var_cnt; i++)
704 struct variable *v = agr->break_vars[i];
/* Numeric values take one element; string values are copied by width
   starting at iter (the advance of iter past a string is not visible
   in this excerpt). */
706 if (v->type == NUMERIC)
707 (iter++)->f = case_num (input, v->fv);
710 memcpy (iter->s, case_str (input, v->fv), v->width);
716 accumulate_aggregate_info (agr, input);
721 /* Compare the value of each break variable to the values on the
724 union value *iter = agr->prev_break;
727 for (i = 0; i < agr->break_var_cnt; i++)
729 struct variable *v = agr->break_vars[i];
734 if (case_num (input, v->fv) != iter->f)
739 if (memcmp (case_str (input, v->fv), iter->s, v->width))
749 accumulate_aggregate_info (agr, input);
754 /* The values of the break variable are different from the values on
755 the previous case. That means that it's time to dump aggregate
757 dump_aggregate_info (agr, output);
758 initialize_aggregate_info (agr);
759 accumulate_aggregate_info (agr, input);
761 /* Copy INPUT into prev_break. */
/* Same copy as in the first-case branch, now recording the break
   values of the group that has just begun. */
763 union value *iter = agr->prev_break;
766 for (i = 0; i < agr->break_var_cnt; i++)
768 struct variable *v = agr->break_vars[i];
770 if (v->type == NUMERIC)
771 (iter++)->f = case_num (input, v->fv);
774 memcpy (iter->s, case_str (input, v->fv), v->width);
783 /* Accumulates aggregation data from the case INPUT. */
/* Every aggregate variable on AGR's list is updated from its source
   value in INPUT, using the case weight obtained from the active
   dictionary.
   NOTE(review): the case labels and several statements of this
   function are not visible in this excerpt, so the per-branch notes
   below identify functions by position only. */
785 accumulate_aggregate_info (struct agr_proc *agr,
786 const struct ccase *input)
788 struct agr_var *iter;
792 weight = dict_get_case_weight (default_dict, input, &bad_warn);
794 for (iter = agr->agr_vars; iter; iter = iter->next)
797 const union value *v = case_data (input, iter->src->fv);
/* Missing source values are handled separately, in their own switch. */
799 if ((!iter->include_missing && is_missing (v, iter->src))
800 || (iter->include_missing && iter->src->type == NUMERIC
803 switch (iter->function)
806 iter->dbl[0] += weight;
816 /* This is horrible. There are too many possibilities. */
817 switch (iter->function)
/* NOTE(review): this accumulation (presumably SUM) does not apply
   `weight', unlike the weighted sum kept for the mean just below.
   Confirm whether an unweighted sum is intended. */
820 iter->dbl[0] += v->f;
/* Weighted sum in dbl[0], total weight in dbl[1]. */
823 iter->dbl[0] += v->f * weight;
824 iter->dbl[1] += weight;
827 moments1_add (iter->moments, v->f, weight);
830 iter->dbl[0] = max (iter->dbl[0], v->f);
/* String extremes are chosen by bytewise memcmp() over the source
   width. */
834 if (memcmp (iter->string, v->s, iter->src->width) < 0)
835 memcpy (iter->string, v->s, iter->src->width);
839 iter->dbl[0] = min (iter->dbl[0], v->f);
843 if (memcmp (iter->string, v->s, iter->src->width) > 0)
844 memcpy (iter->string, v->s, iter->src->width);
/* In the comparison branches below, dbl[0] gains the weight of cases
   that satisfy the test and dbl[1] gains the weight of every case
   that reaches the branch. */
849 if (v->f > iter->arg[0].f)
850 iter->dbl[0] += weight;
851 iter->dbl[1] += weight;
855 if (memcmp (iter->arg[0].c, v->s, iter->src->width) < 0)
856 iter->dbl[0] += weight;
857 iter->dbl[1] += weight;
861 if (v->f < iter->arg[0].f)
862 iter->dbl[0] += weight;
863 iter->dbl[1] += weight;
867 if (memcmp (iter->arg[0].c, v->s, iter->src->width) > 0)
868 iter->dbl[0] += weight;
869 iter->dbl[1] += weight;
873 if (iter->arg[0].f <= v->f && v->f <= iter->arg[1].f)
874 iter->dbl[0] += weight;
875 iter->dbl[1] += weight;
879 if (memcmp (iter->arg[0].c, v->s, iter->src->width) <= 0
880 && memcmp (iter->arg[1].c, v->s, iter->src->width) >= 0)
881 iter->dbl[0] += weight;
882 iter->dbl[1] += weight;
886 if (iter->arg[0].f > v->f || v->f > iter->arg[1].f)
887 iter->dbl[0] += weight;
888 iter->dbl[1] += weight;
/* NOTE(review): this looks like the string counterpart of the numeric
   "outside the range" test just above, which joins its two conditions
   with ||.  Joined with && the test can only succeed when arg[0]
   compares greater than arg[1], so it probably should be || as well.
   Confirm and fix. */
892 if (memcmp (iter->arg[0].c, v->s, iter->src->width) > 0
893 && memcmp (iter->arg[1].c, v->s, iter->src->width) < 0)
894 iter->dbl[0] += weight;
895 iter->dbl[1] += weight;
898 iter->dbl[0] += weight;
910 case FIRST | FSTRING:
913 memcpy (iter->string, v->s, iter->src->width);
922 memcpy (iter->string, v->s, iter->src->width);
/* A second switch on the function, accumulating the case weight. */
929 switch (iter->function)
932 iter->dbl[0] += weight;
943 /* We've come to a record that differs from the previous in one or
944 more of the break variables. Make an output record from the
945 accumulated statistics in the OUTPUT case. */
/* NOTE(review): most case labels and several statements of this
   function are not visible in this excerpt. */
947 dump_aggregate_info (struct agr_proc *agr, struct ccase *output)
/* First the saved break values are copied into OUTPUT, nv value slots
   per break variable. */
953 for (i = 0; i < agr->break_var_cnt; i++)
955 int nv = agr->break_vars[i]->nv;
956 memcpy (case_data_rw (output, value_idx),
957 &agr->prev_break[value_idx],
958 sizeof (union value) * nv);
/* Then each aggregate variable's result is stored at its target
   variable's position in OUTPUT.  Here `i' is a list pointer, so it
   must be a different declaration from the integer index used above. */
966 for (i = agr->agr_vars; i; i = i->next)
968 union value *v = case_data_rw (output, i->dest->fv);
/* Under COLUMNWISE, a nonzero `missing' flag diverts every function
   except the four counting functions to a separate branch. */
970 if (agr->missing == COLUMNWISE && i->missing != 0
971 && (i->function & FUNC) != N && (i->function & FUNC) != NU
972 && (i->function & FUNC) != NMISS && (i->function & FUNC) != NUMISS)
974 if (i->function & FSTRING)
975 memset (v->s, ' ', i->dest->width);
/* Weighted mean: weighted sum over total weight, or SYSMIS when the
   total weight is zero. */
987 v->f = i->dbl[1] != 0.0 ? i->dbl[0] / i->dbl[1] : SYSMIS;
993 /* FIXME: we should use two passes. */
994 moments1_calculate (i->moments, NULL, NULL, &variance,
996 if (variance != SYSMIS)
997 v->f = sqrt (variance);
/* int1 is tested as a flag here: zero yields SYSMIS (numeric) or
   spaces (string). */
1004 v->f = i->int1 ? i->dbl[0] : SYSMIS;
1009 memcpy (v->s, i->string, i->dest->width);
1011 memset (v->s, ' ', i->dest->width);
1016 case FOUT | FSTRING:
/* NOTE(review): this fraction is computed from int1/int2, but every
   visible branch of accumulate_aggregate_info() that compares a string
   value against arg[] adds to dbl[0]/dbl[1].  If int2 is never
   incremented for these functions the result is always SYSMIS.
   Confirm which counters are intended. */
1017 v->f = i->int2 ? (double) i->int1 / (double) i->int2 : SYSMIS;
1023 v->f = i->dbl[1] ? i->dbl[0] / i->dbl[1] : SYSMIS;
1032 case POUT | FSTRING:
/* Same ratio as the fraction above, scaled by 100. */
1033 v->f = i->dbl[1] ? i->dbl[0] / i->dbl[1] * 100.0 : SYSMIS;
1043 v->f = i->int1 ? i->dbl[0] : SYSMIS;
1045 case FIRST | FSTRING:
1046 case LAST | FSTRING:
1048 memcpy (v->s, i->string, i->dest->width);
1050 memset (v->s, ' ', i->dest->width);
1071 /* Resets the state for all the aggregate functions. */
/* Called once before the first case and again each time a break group
   has been dumped.
   NOTE(review): the case labels of the switch are not visible in this
   excerpt, so the notes below identify functions by the way the
   accumulators are used elsewhere in this file. */
1073 initialize_aggregate_info (struct agr_proc *agr)
1075 struct agr_var *iter;
1077 for (iter = agr->agr_vars; iter; iter = iter->next)
1080 switch (iter->function)
/* Starting values for a running min(): the largest double, or a
   string of all-255 bytes. */
1083 iter->dbl[0] = DBL_MAX;
1086 memset (iter->string, 255, iter->src->width);
/* Starting values for a running max(): the most negative double, or a
   string of all-zero bytes. */
1089 iter->dbl[0] = -DBL_MAX;
1092 memset (iter->string, 0, iter->src->width);
/* The moments accumulator is created the first time it is needed;
   moments1_clear() resets it (presumably in an `else' that is not
   visible here). */
1095 if (iter->moments == NULL)
1096 iter->moments = moments1_create (MOMENT_VARIANCE);
1098 moments1_clear (iter->moments);
/* These accumulators are reset to zero. */
1101 iter->dbl[0] = iter->dbl[1] = iter->dbl[2] = 0.0;
1102 iter->int1 = iter->int2 = 0;
/* Case callback handed to procedure() when aggregating into the active
   file.  AGR_ is the struct agr_proc; when aggregate_single_case()
   reports a completed break group, the aggregate case is written to
   the procedure's sink. */
1108 /* Aggregate each case as it comes through. Cases which aren't needed
1111 agr_to_active_file (struct ccase *c, void *agr_)
1113 struct agr_proc *agr = agr_;
1115 if (aggregate_single_case (agr, c, &agr->agr_case))
1116 agr->sink->class->write (agr->sink, &agr->agr_case);
/* Case callback handed to procedure() when the active file is already
   sorted and the output goes to a system file.  AGR_ is the struct
   agr_proc; when aggregate_single_case() reports a completed break
   group, the aggregate case is written with sfm_write_case(). */
1121 /* Aggregate the current case and output it if we passed a
1124 presorted_agr_to_sysfile (struct ccase *c, void *agr_)
1126 struct agr_proc *agr = agr_;
1128 if (aggregate_single_case (agr, c, &agr->agr_case))
1129 sfm_write_case (agr->writer, &agr->agr_case);