1 /* PSPP - computes sample statistics.
2 Copyright (C) 1997-9, 2000 Free Software Foundation, Inc.
3 Written by Ben Pfaff <blp@gnu.org>.
5 This program is free software; you can redistribute it and/or
6 modify it under the terms of the GNU General Public License as
7 published by the Free Software Foundation; either version 2 of the
8 License, or (at your option) any later version.
10 This program is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with this program; if not, write to the Free Software
17 Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA
26 #include "file-handle.h"
39 /* Specifies how to make an aggregate variable. */
42 struct agr_var *next; /* Next in list. */
44 /* Collected during parsing. */
45 struct variable *src; /* Source variable. */
46 struct variable *dest; /* Target variable. */
47 int function; /* Function. */
48 int include_missing; /* 1=Include user-missing values. */
49 union value arg[2]; /* Arguments. */
51 /* Accumulated during AGGREGATE execution. */
58 /* Aggregation functions. */
61 NONE, SUM, MEAN, SD, MAX, MIN, PGT, PLT, PIN, POUT, FGT, FLT, FIN,
62 FOUT, N, NU, NMISS, NUMISS, FIRST, LAST,
63 N_AGR_FUNCS, N_NO_VARS, NU_NO_VARS,
64 FUNC = 0x1f, /* Function mask. */
65 FSTRING = 1<<5, /* String function bit. */
68 /* Attributes of an aggregation function. */
71 const char *name; /* Aggregation function name. */
72 int n_args; /* Number of arguments. */
73 int alpha_type; /* When given ALPHA arguments, output type. */
74 struct fmt_spec format; /* Format spec if alpha_type != ALPHA. */
77 /* Attributes of aggregation functions. */
78 static const struct agr_func agr_func_tab[] =
80 {"<NONE>", 0, -1, {0, 0, 0}},
81 {"SUM", 0, -1, {FMT_F, 8, 2}},
82 {"MEAN", 0, -1, {FMT_F, 8, 2}},
83 {"SD", 0, -1, {FMT_F, 8, 2}},
84 {"MAX", 0, ALPHA, {-1, -1, -1}},
85 {"MIN", 0, ALPHA, {-1, -1, -1}},
86 {"PGT", 1, NUMERIC, {FMT_F, 5, 1}},
87 {"PLT", 1, NUMERIC, {FMT_F, 5, 1}},
88 {"PIN", 2, NUMERIC, {FMT_F, 5, 1}},
89 {"POUT", 2, NUMERIC, {FMT_F, 5, 1}},
90 {"FGT", 1, NUMERIC, {FMT_F, 5, 3}},
91 {"FLT", 1, NUMERIC, {FMT_F, 5, 3}},
92 {"FIN", 2, NUMERIC, {FMT_F, 5, 3}},
93 {"FOUT", 2, NUMERIC, {FMT_F, 5, 3}},
94 {"N", 0, NUMERIC, {FMT_F, 7, 0}},
95 {"NU", 0, NUMERIC, {FMT_F, 7, 0}},
96 {"NMISS", 0, NUMERIC, {FMT_F, 7, 0}},
97 {"NUMISS", 0, NUMERIC, {FMT_F, 7, 0}},
98 {"FIRST", 0, ALPHA, {-1, -1, -1}},
99 {"LAST", 0, ALPHA, {-1, -1, -1}},
100 {NULL, 0, -1, {-1, -1, -1}},
101 {"N", 0, NUMERIC, {FMT_F, 7, 0}},
102 {"NU", 0, NUMERIC, {FMT_F, 7, 0}},
105 /* Missing value types. */
106 enum missing_treatment
108 ITEMWISE, /* Missing values item by item. */
109 COLUMNWISE /* Missing values column by column. */
112 /* An entire AGGREGATE procedure. */
115 /* We have either an output file or a sink. */
116 struct file_handle *out_file; /* Output file, or null if none. */
117 struct case_sink *sink; /* Sink, or null if none. */
119 enum missing_treatment missing; /* How to treat missing values. */
120 struct sort_cases_pgm *sort; /* Sort program. */
121 struct agr_var *vars; /* First aggregate variable. */
122 struct dictionary *dict; /* Aggregate dictionary. */
123 int case_cnt; /* Counts aggregated cases. */
124 union value *prev_break; /* Last values of break variables. */
125 struct ccase *agr_case; /* Aggregate case for output. */
126 flt64 *sfm_agr_case; /* Aggregate case in SFM format. */
129 static void initialize_aggregate_info (struct agr_proc *);
132 static int parse_aggregate_functions (struct agr_proc *);
133 static void agr_destroy (struct agr_proc *);
134 static int aggregate_single_case (struct agr_proc *agr,
135 const struct ccase *input,
136 struct ccase *output);
137 static void dump_aggregate_info (struct agr_proc *agr, struct ccase *output);
138 static int create_sysfile (struct agr_proc *);
140 /* Aggregating to the active file. */
141 static int agr_to_active_file (struct ccase *, void *aux);
143 /* Aggregating to a system file. */
144 static void write_case_to_sfm (struct agr_proc *agr);
145 static int presorted_agr_to_sysfile (struct ccase *, void *aux);
146 static int sort_agr_to_sysfile (const struct ccase *, void *aux);
150 /* Parses and executes the AGGREGATE procedure. */
156 /* Have we seen these subcommands? */
161 agr.missing = ITEMWISE;
166 agr.prev_break = NULL;
168 agr.dict = dict_create ();
169 dict_set_label (agr.dict, dict_get_label (default_dict));
170 dict_set_documents (agr.dict, dict_get_documents (default_dict));
172 /* Read most of the subcommands. */
177 if (lex_match_id ("OUTFILE"))
181 msg (SE, _("%s subcommand given multiple times."),"OUTFILE");
191 agr.out_file = fh_parse_file_handle ();
192 if (agr.out_file == NULL)
196 else if (lex_match_id ("MISSING"))
199 if (!lex_match_id ("COLUMNWISE"))
201 lex_error (_("while expecting COLUMNWISE"));
204 agr.missing = COLUMNWISE;
206 else if (lex_match_id ("DOCUMENT"))
208 else if (lex_match_id ("PRESORTED"))
210 else if (lex_match_id ("BREAK"))
214 msg (SE, _("%s subcommand given multiple times."),"BREAK");
220 agr.sort = parse_sort ();
221 if (agr.sort == NULL)
227 for (i = 0; i < agr.sort->var_cnt; i++)
231 v = dict_clone_var (agr.dict, agr.sort->vars[i],
232 agr.sort->vars[i]->name);
240 /* Check for proper syntax. */
242 msg (SW, _("BREAK subcommand not specified."));
244 /* Read in the aggregate functions. */
245 if (!parse_aggregate_functions (&agr))
248 /* Delete documents. */
250 dict_set_documents (agr.dict, NULL);
252 /* Cancel SPLIT FILE. */
253 dict_set_split_vars (agr.dict, NULL, 0);
257 agr.agr_case = xmalloc (dict_get_case_size (agr.dict));
258 initialize_aggregate_info (&agr);
260 /* Output to active file or external file? */
261 if (agr.out_file == NULL)
263 /* The active file will be replaced by the aggregated data,
264 so TEMPORARY is moot. */
267 if (agr.sort != NULL && (seen & 4) == 0)
268 sort_cases (agr.sort, 0);
270 agr.sink = create_case_sink (&storage_sink_class, agr.dict, NULL);
271 if (agr.sink->class->open != NULL)
272 agr.sink->class->open (agr.sink);
273 vfm_sink = create_case_sink (&null_sink_class, default_dict, NULL);
274 procedure (agr_to_active_file, &agr);
275 if (agr.case_cnt > 0)
277 dump_aggregate_info (&agr, agr.agr_case);
278 agr.sink->class->write (agr.sink, agr.agr_case);
280 dict_destroy (default_dict);
281 default_dict = agr.dict;
283 vfm_source = agr.sink->class->make_source (agr.sink);
284 free_case_sink (agr.sink);
288 if (!create_sysfile (&agr))
291 if (agr.sort != NULL && (seen & 4) == 0)
293 /* Sorting is needed. */
294 sort_cases (agr.sort, 1);
295 read_sort_output (agr.sort, sort_agr_to_sysfile, NULL);
299 /* Active file is already sorted. */
300 procedure (presorted_agr_to_sysfile, &agr);
303 if (agr.case_cnt > 0)
305 dump_aggregate_info (&agr, agr.agr_case);
306 write_case_to_sfm (&agr);
308 fh_close_handle (agr.out_file);
319 /* Create a system file for use in aggregation to an external
322 create_sysfile (struct agr_proc *agr)
324 struct sfm_write_info w;
327 w.compress = get_scompression();
328 if (!sfm_write_dictionary (&w))
331 agr->sfm_agr_case = xmalloc (sizeof *agr->sfm_agr_case * w.case_size);
336 /* Parse all the aggregate functions. */
338 parse_aggregate_functions (struct agr_proc *agr)
340 struct agr_var *tail; /* Tail of linked list starting at agr->vars. */
342 /* Parse everything. */
351 const struct agr_func *function;
356 struct variable **src;
370 /* Parse the list of target variables. */
371 while (!lex_match ('='))
373 int n_dest_prev = n_dest;
375 if (!parse_DATA_LIST_vars (&dest, &n_dest, PV_APPEND | PV_SINGLE | PV_NO_SCRATCH))
378 /* Assign empty labels. */
382 dest_label = xrealloc (dest_label, sizeof *dest_label * n_dest);
383 for (j = n_dest_prev; j < n_dest; j++)
384 dest_label[j] = NULL;
387 if (token == T_STRING)
389 ds_truncate (&tokstr, 120);
390 dest_label[n_dest - 1] = xstrdup (ds_value (&tokstr));
395 /* Get the name of the aggregation function. */
398 lex_error (_("expecting aggregation function"));
403 if (tokid[strlen (tokid) - 1] == '.')
406 tokid[strlen (tokid) - 1] = 0;
409 for (function = agr_func_tab; function->name; function++)
410 if (!strcmp (function->name, tokid))
412 if (NULL == function->name)
414 msg (SE, _("Unknown aggregation function %s."), tokid);
417 func_index = function - agr_func_tab;
420 /* Check for leading lparen. */
421 if (!lex_match ('('))
424 func_index = N_NO_VARS;
425 else if (func_index == NU)
426 func_index = NU_NO_VARS;
429 lex_error (_("expecting `('"));
433 /* Parse list of source variables. */
435 int pv_opts = PV_NO_SCRATCH;
437 if (func_index == SUM || func_index == MEAN || func_index == SD)
438 pv_opts |= PV_NUMERIC;
439 else if (function->n_args)
440 pv_opts |= PV_SAME_TYPE;
442 if (!parse_variables (default_dict, &src, &n_src, pv_opts))
446 /* Parse function arguments, for those functions that
447 require arguments. */
448 if (function->n_args != 0)
449 for (i = 0; i < function->n_args; i++)
454 if (token == T_STRING)
456 arg[i].c = xstrdup (ds_value (&tokstr));
459 else if (token == T_NUM)
464 msg (SE, _("Missing argument %d to %s."), i + 1, function->name);
470 if (type != src[0]->type)
472 msg (SE, _("Arguments to %s must be of same type as "
473 "source variables."),
479 /* Trailing rparen. */
482 lex_error (_("expecting `)'"));
486 /* Now check that the number of source variables match the
487 number of target variables. Do this here because if we
488 do it earlier then the user can get very misleading error
489 messages; i.e., `AGGREGATE x=SUM(y t).' will get this
490 error message when a proper message would be more like
491 `unknown variable t'. */
494 msg (SE, _("Number of source variables (%d) does not match "
495 "number of target variables (%d)."),
501 /* Finally add these to the linked list of aggregation
503 for (i = 0; i < n_dest; i++)
505 struct agr_var *v = xmalloc (sizeof *v);
507 /* Add variable to chain. */
508 if (agr->vars != NULL)
515 /* Create the target variable in the aggregate
518 struct variable *destvar;
520 v->function = func_index;
528 if (src[i]->type == ALPHA)
530 v->function |= FSTRING;
531 v->string = xmalloc (src[i]->width);
534 if (v->src->type == NUMERIC || function->alpha_type == NUMERIC)
537 output_width = v->src->width;
539 if (function->alpha_type == ALPHA)
540 destvar = dict_clone_var (agr->dict, v->src, dest[i]);
543 destvar = dict_create_var (agr->dict, dest[i], output_width);
544 if (output_width == 0)
545 destvar->print = destvar->write = function->format;
546 if (output_width == 0 && dict_get_weight (default_dict) != NULL
547 && (func_index == N || func_index == N_NO_VARS
548 || func_index == NU || func_index == NU_NO_VARS))
550 struct fmt_spec f = {FMT_F, 8, 2};
552 destvar->print = destvar->write = f;
557 destvar = dict_create_var (agr->dict, dest[i], 0);
562 msg (SE, _("Variable name %s is not unique within the "
563 "aggregate file dictionary, which contains "
564 "the aggregate variables and the break "
575 destvar->label = dest_label[i];
576 dest_label[i] = NULL;
578 else if (function->alpha_type == ALPHA)
579 destvar->print = destvar->write = function->format;
584 v->include_missing = include_missing;
590 if (v->src->type == NUMERIC)
591 for (j = 0; j < function->n_args; j++)
592 v->arg[j].f = arg[j].f;
594 for (j = 0; j < function->n_args; j++)
595 v->arg[j].c = xstrdup (arg[j].c);
599 if (src != NULL && src[0]->type == ALPHA)
600 for (i = 0; i < function->n_args; i++)
610 if (!lex_match ('/'))
615 lex_error ("expecting end of command");
621 for (i = 0; i < n_dest; i++)
624 free (dest_label[i]);
630 if (src && n_src && src[0]->type == ALPHA)
631 for (i = 0; i < function->n_args; i++)
644 agr_destroy (struct agr_proc *agr)
646 struct agr_var *iter, *next;
648 if (agr->dict != NULL)
649 dict_destroy (agr->dict);
650 if (agr->sort != NULL)
651 destroy_sort_cases_pgm (agr->sort);
652 for (iter = agr->vars; iter; iter = next)
656 if (iter->function & FSTRING)
661 n_args = agr_func_tab[iter->function & FUNC].n_args;
662 for (i = 0; i < n_args; i++)
663 free (iter->arg[i].c);
668 free (agr->prev_break);
669 free (agr->agr_case);
674 static void accumulate_aggregate_info (struct agr_proc *,
675 const struct ccase *);
676 static void dump_aggregate_info (struct agr_proc *, struct ccase *);
678 /* Processes a single case INPUT for aggregation. If output is
679 warranted, writes it to OUTPUT and returns nonzero.
680 Otherwise, returns zero and OUTPUT is unmodified. */
682 aggregate_single_case (struct agr_proc *agr,
683 const struct ccase *input, struct ccase *output)
685 /* The first case always begins a new break group. We also need to
686 preserve the values of the case for later comparison. */
687 if (agr->case_cnt++ == 0)
694 for (i = 0; i < agr->sort->var_cnt; i++)
695 n_elem += agr->sort->vars[i]->nv;
698 agr->prev_break = xmalloc (sizeof *agr->prev_break * n_elem);
700 /* Copy INPUT into prev_break. */
702 union value *iter = agr->prev_break;
705 for (i = 0; i < agr->sort->var_cnt; i++)
707 struct variable *v = agr->sort->vars[i];
709 if (v->type == NUMERIC)
710 (iter++)->f = input->data[v->fv].f;
713 memcpy (iter->s, input->data[v->fv].s, v->width);
719 accumulate_aggregate_info (agr, input);
724 /* Compare the value of each break variable to the values on the
727 union value *iter = agr->prev_break;
730 for (i = 0; i < agr->sort->var_cnt; i++)
732 struct variable *v = agr->sort->vars[i];
737 if (input->data[v->fv].f != iter->f)
742 if (memcmp (input->data[v->fv].s, iter->s, v->width))
752 accumulate_aggregate_info (agr, input);
757 /* The values of the break variable are different from the values on
758 the previous case. That means that it's time to dump aggregate
760 dump_aggregate_info (agr, output);
761 initialize_aggregate_info (agr);
762 accumulate_aggregate_info (agr, input);
764 /* Copy INPUT into prev_break. */
766 union value *iter = agr->prev_break;
769 for (i = 0; i < agr->sort->var_cnt; i++)
771 struct variable *v = agr->sort->vars[i];
773 if (v->type == NUMERIC)
774 (iter++)->f = input->data[v->fv].f;
777 memcpy (iter->s, input->data[v->fv].s, v->width);
786 /* Accumulates aggregation data from the case INPUT. */
788 accumulate_aggregate_info (struct agr_proc *agr,
789 const struct ccase *input)
791 struct agr_var *iter;
794 weight = dict_get_case_weight (default_dict, input);
796 for (iter = agr->vars; iter; iter = iter->next)
799 const union value *v = &input->data[iter->src->fv];
801 if ((!iter->include_missing && is_missing (v, iter->src))
802 || (iter->include_missing && iter->src->type == NUMERIC
805 switch (iter->function)
808 iter->dbl[0] += weight;
818 /* This is horrible. There are too many possibilities. */
819 switch (iter->function)
822 iter->dbl[0] += v->f;
825 iter->dbl[0] += v->f * weight;
826 iter->dbl[1] += weight;
830 double product = v->f * weight;
831 iter->dbl[0] += product;
832 iter->dbl[1] += product * v->f;
833 iter->dbl[2] += weight;
837 iter->dbl[0] = max (iter->dbl[0], v->f);
841 if (memcmp (iter->string, v->s, iter->src->width) < 0)
842 memcpy (iter->string, v->s, iter->src->width);
846 iter->dbl[0] = min (iter->dbl[0], v->f);
850 if (memcmp (iter->string, v->s, iter->src->width) > 0)
851 memcpy (iter->string, v->s, iter->src->width);
856 if (v->f > iter->arg[0].f)
857 iter->dbl[0] += weight;
858 iter->dbl[1] += weight;
862 if (memcmp (iter->arg[0].c, v->s, iter->src->width) < 0)
863 iter->dbl[0] += weight;
864 iter->dbl[1] += weight;
868 if (v->f < iter->arg[0].f)
869 iter->dbl[0] += weight;
870 iter->dbl[1] += weight;
874 if (memcmp (iter->arg[0].c, v->s, iter->src->width) > 0)
875 iter->dbl[0] += weight;
876 iter->dbl[1] += weight;
880 if (iter->arg[0].f <= v->f && v->f <= iter->arg[1].f)
881 iter->dbl[0] += weight;
882 iter->dbl[1] += weight;
886 if (memcmp (iter->arg[0].c, v->s, iter->src->width) <= 0
887 && memcmp (iter->arg[1].c, v->s, iter->src->width) >= 0)
888 iter->dbl[0] += weight;
889 iter->dbl[1] += weight;
893 if (iter->arg[0].f > v->f || v->f > iter->arg[1].f)
894 iter->dbl[0] += weight;
895 iter->dbl[1] += weight;
899 if (memcmp (iter->arg[0].c, v->s, iter->src->width) > 0
900 && memcmp (iter->arg[1].c, v->s, iter->src->width) < 0)
901 iter->dbl[0] += weight;
902 iter->dbl[1] += weight;
905 iter->dbl[0] += weight;
917 case FIRST | FSTRING:
920 memcpy (iter->string, v->s, iter->src->width);
929 memcpy (iter->string, v->s, iter->src->width);
936 switch (iter->function)
939 iter->dbl[0] += weight;
950 /* We've come to a record that differs from the previous in one or
951 more of the break variables. Make an output record from the
952 accumulated statistics in the OUTPUT case. */
954 dump_aggregate_info (struct agr_proc *agr, struct ccase *output)
962 for (i = 0; i < agr->sort->var_cnt; i++)
963 n_elem += agr->sort->vars[i]->nv;
965 memcpy (output->data, agr->prev_break, sizeof (union value) * n_elem);
971 for (i = agr->vars; i; i = i->next)
973 union value *v = &output->data[i->dest->fv];
975 if (agr->missing == COLUMNWISE && i->missing != 0
976 && (i->function & FUNC) != N && (i->function & FUNC) != NU
977 && (i->function & FUNC) != NMISS && (i->function & FUNC) != NUMISS)
979 if (i->function & FSTRING)
980 memset (v->s, ' ', i->dest->width);
992 v->f = i->dbl[1] != 0.0 ? i->dbl[0] / i->dbl[1] : SYSMIS;
995 v->f = ((i->dbl[2] > 1.0)
996 ? calc_stddev (calc_variance (i->dbl, i->dbl[2]))
1001 v->f = i->int1 ? i->dbl[0] : SYSMIS;
1006 memcpy (v->s, i->string, i->dest->width);
1008 memset (v->s, ' ', i->dest->width);
1013 case FOUT | FSTRING:
1014 v->f = i->int2 ? (double) i->int1 / (double) i->int2 : SYSMIS;
1020 v->f = i->dbl[1] ? i->dbl[0] / i->dbl[1] : SYSMIS;
1029 case POUT | FSTRING:
1030 v->f = i->dbl[1] ? i->dbl[0] / i->dbl[1] * 100.0 : SYSMIS;
1040 v->f = i->int1 ? i->dbl[0] : SYSMIS;
1042 case FIRST | FSTRING:
1043 case LAST | FSTRING:
1045 memcpy (v->s, i->string, i->dest->width);
1047 memset (v->s, ' ', i->dest->width);
1068 /* Resets the state for all the aggregate functions. */
1070 initialize_aggregate_info (struct agr_proc *agr)
1072 struct agr_var *iter;
1074 for (iter = agr->vars; iter; iter = iter->next)
1077 switch (iter->function)
1080 iter->dbl[0] = DBL_MAX;
1083 memset (iter->string, 255, iter->src->width);
1086 iter->dbl[0] = -DBL_MAX;
1089 memset (iter->string, 0, iter->src->width);
1092 iter->dbl[0] = iter->dbl[1] = iter->dbl[2] = 0.0;
1093 iter->int1 = iter->int2 = 0;
1099 /* Aggregate each case as it comes through. Cases which aren't needed
1102 agr_to_active_file (struct ccase *c, void *agr_)
1104 struct agr_proc *agr = agr_;
1106 if (aggregate_single_case (agr, c, agr->agr_case))
1107 agr->sink->class->write (agr->sink, agr->agr_case);
1112 /* Writes AGR->agr_case to AGR->out_file. */
1114 write_case_to_sfm (struct agr_proc *agr)
1119 p = agr->sfm_agr_case;
1120 for (i = 0; i < dict_get_var_cnt (agr->dict); i++)
1122 struct variable *v = dict_get_var (agr->dict, i);
1124 if (v->type == NUMERIC)
1126 double src = agr->agr_case->data[v->fv].f;
1134 memcpy (p, agr->agr_case->data[v->fv].s, v->width);
1135 memset (&((char *) p)[v->width], ' ',
1136 REM_RND_UP (v->width, sizeof (flt64)));
1137 p += DIV_RND_UP (v->width, sizeof (flt64));
1141 sfm_write_case (agr->out_file, agr->sfm_agr_case, p - agr->sfm_agr_case);
1144 /* Aggregate the current case and output it if we passed a
1147 presorted_agr_to_sysfile (struct ccase *c, void *agr_)
1149 sort_agr_to_sysfile (c, agr_);
1153 /* Aggregate the current case and output it if we passed a
1156 sort_agr_to_sysfile (const struct ccase *c, void *agr_)
1158 struct agr_proc *agr = agr_;
1160 if (aggregate_single_case (agr, c, agr->agr_case))
1161 write_case_to_sfm (agr);