1 /* PSPP - computes sample statistics.
2 Copyright (C) 1997-9, 2000 Free Software Foundation, Inc.
3 Written by Ben Pfaff <blp@gnu.org>.
5 This program is free software; you can redistribute it and/or
6 modify it under the terms of the GNU General Public License as
7 published by the Free Software Foundation; either version 2 of the
8 License, or (at your option) any later version.
10 This program is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with this program; if not, write to the Free Software
17 Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA
26 #include "file-handle.h"
39 /* Specifies how to make an aggregate variable.  Instances form a
   singly linked list that the rest of this file walks starting from
   the `vars' member of struct agr_proc. */
42 struct agr_var *next; /* Next in list. */
44 /* Collected during parsing. */
45 struct variable *src; /* Source variable. */
46 struct variable *dest; /* Target variable. */
47 int function; /* Function code, ORed with FSTRING for string sources. */
48 int include_missing; /* 1=Include user-missing values. */
49 union value arg[2]; /* Arguments (.f for numeric sources, else .c). */
51 /* Accumulated during AGGREGATE execution. */
58 /* Aggregation functions.  The function values double as indexes
   into agr_func_tab[]. */
61 NONE, SUM, MEAN, SD, MAX, MIN, PGT, PLT, PIN, POUT, FGT, FLT, FIN,
62 FOUT, N, NU, NMISS, NUMISS, FIRST, LAST,
63 N_AGR_FUNCS, N_NO_VARS, NU_NO_VARS,
64 FUNC = 0x1f, /* Mask that extracts the function from a code. */
65 FSTRING = 1<<5, /* Bit set when the source variable is a string. */
68 /* Attributes of an aggregation function (one agr_func_tab[] entry). */
71 const char *name; /* Function name; null in the table terminator. */
72 int n_args; /* Number of arguments parsed after the source variables. */
73 int alpha_type; /* When given ALPHA arguments, output type. */
74 struct fmt_spec format; /* Format spec if alpha_type != ALPHA. */
77 /* Attributes of aggregation functions.  Entries are in the same
   order as the aggregation function enumerators, which are used as
   indexes into this table. */
78 static const struct agr_func agr_func_tab[] =
80 {"<NONE>", 0, -1, {0, 0, 0}},
81 {"SUM", 0, -1, {FMT_F, 8, 2}},
82 {"MEAN", 0, -1, {FMT_F, 8, 2}},
83 {"SD", 0, -1, {FMT_F, 8, 2}},
84 {"MAX", 0, ALPHA, {-1, -1, -1}},
85 {"MIN", 0, ALPHA, {-1, -1, -1}},
86 {"PGT", 1, NUMERIC, {FMT_F, 5, 1}},
87 {"PLT", 1, NUMERIC, {FMT_F, 5, 1}},
88 {"PIN", 2, NUMERIC, {FMT_F, 5, 1}},
89 {"POUT", 2, NUMERIC, {FMT_F, 5, 1}},
90 {"FGT", 1, NUMERIC, {FMT_F, 5, 3}},
91 {"FLT", 1, NUMERIC, {FMT_F, 5, 3}},
92 {"FIN", 2, NUMERIC, {FMT_F, 5, 3}},
93 {"FOUT", 2, NUMERIC, {FMT_F, 5, 3}},
94 {"N", 0, NUMERIC, {FMT_F, 7, 0}},
95 {"NU", 0, NUMERIC, {FMT_F, 7, 0}},
96 {"NMISS", 0, NUMERIC, {FMT_F, 7, 0}},
97 {"NUMISS", 0, NUMERIC, {FMT_F, 7, 0}},
98 {"FIRST", 0, ALPHA, {-1, -1, -1}},
99 {"LAST", 0, ALPHA, {-1, -1, -1}},
/* Slot N_AGR_FUNCS: the null name ends the lookup-by-name scan. */
100 {NULL, 0, -1, {-1, -1, -1}},
/* Slots N_NO_VARS and NU_NO_VARS.  They sit after the terminator, so
   the name scan never reaches them; the parser selects them by index
   when N or NU is written without a parenthesized variable list. */
101 {"N", 0, NUMERIC, {FMT_F, 7, 0}},
102 {"NU", 0, NUMERIC, {FMT_F, 7, 0}},
105 /* Missing value types.  The procedure starts out ITEMWISE and
   switches to COLUMNWISE when the MISSING subcommand asks for it. */
106 enum missing_treatment
108 ITEMWISE, /* Missing values item by item. */
109 COLUMNWISE /* Missing values column by column. */
112 /* An entire AGGREGATE procedure: parsed settings plus the working
   state shared by the per-case callbacks. */
115 /* We have either an output file or a sink. */
116 struct file_handle *out_file; /* Output file, or null if none. */
117 struct case_sink *sink; /* Sink, or null if none. */
119 enum missing_treatment missing; /* How to treat missing values. */
120 struct sort_cases_pgm *sort; /* Sort program from BREAK; lists the break vars. */
121 struct agr_var *vars; /* First aggregate variable. */
122 struct dictionary *dict; /* Aggregate dictionary. */
123 int case_cnt; /* Number of input cases aggregated so far. */
124 union value *prev_break; /* Last values of break variables. */
125 struct ccase *agr_case; /* Aggregate case for output. */
126 flt64 *sfm_agr_case; /* Aggregate case in SFM format (system-file output). */
/* Prototypes for the static helpers used by the AGGREGATE procedure. */
129 static void initialize_aggregate_info (struct agr_proc *);
132 static int parse_aggregate_functions (struct agr_proc *);
133 static void agr_destroy (struct agr_proc *);
134 static int aggregate_single_case (struct agr_proc *agr,
135 const struct ccase *input,
136 struct ccase *output);
137 static void dump_aggregate_info (struct agr_proc *agr, struct ccase *output);
138 static int create_sysfile (struct agr_proc *);
140 /* Aggregating to the active file: case callback handed to
   procedure().  AUX is the struct agr_proc. */
141 static int agr_to_active_file (struct ccase *, void *aux);
143 /* Aggregating to a system file: the case writer and the per-case
   callbacks.  AUX is the struct agr_proc. */
144 static void write_case_to_sfm (struct agr_proc *agr);
145 static int presorted_agr_to_sysfile (struct ccase *, void *aux);
146 static int sort_agr_to_sysfile (const struct ccase *, void *aux);
150 /* Parses and executes the AGGREGATE procedure.
   The subcommands are read first, then the aggregate functions.  The
   aggregated cases either replace the active file (no OUTFILE given)
   or are written to a system file (OUTFILE given). */
156 /* Have we seen these subcommands? */
161 agr.missing = ITEMWISE;
166 agr.prev_break = NULL;
/* The aggregate dictionary starts empty and inherits the label and
   documents of the active dictionary. */
168 agr.dict = dict_create ();
169 dict_set_label (agr.dict, dict_get_label (default_dict));
170 dict_set_documents (agr.dict, dict_get_documents (default_dict));
172 lex_match_id ("AGGREGATE");
174 /* Read most of the subcommands. */
179 if (lex_match_id ("OUTFILE"))
183 msg (SE, _("%s subcommand given multiple times."),"OUTFILE");
193 agr.out_file = fh_parse_file_handle ();
194 if (agr.out_file == NULL)
198 else if (lex_match_id ("MISSING"))
201 if (!lex_match_id ("COLUMNWISE"))
203 lex_error (_("while expecting COLUMNWISE"));
206 agr.missing = COLUMNWISE;
208 else if (lex_match_id ("DOCUMENT"))
210 else if (lex_match_id ("PRESORTED"))
212 else if (lex_match_id ("BREAK"))
216 msg (SE, _("%s subcommand given multiple times."),"BREAK");
222 agr.sort = parse_sort ();
223 if (agr.sort == NULL)
/* Every break variable is cloned into the aggregate dictionary under
   its own name. */
229 for (i = 0; i < agr.sort->var_cnt; i++)
233 v = dict_clone_var (agr.dict, agr.sort->vars[i],
234 agr.sort->vars[i]->name);
242 /* Check for proper syntax. */
244 msg (SW, _("BREAK subcommand not specified."));
246 /* Read in the aggregate functions. */
247 if (!parse_aggregate_functions (&agr))
250 /* Delete documents. */
252 dict_set_documents (agr.dict, NULL);
254 /* Cancel SPLIT FILE. */
255 dict_set_split_vars (agr.dict, NULL, 0);
/* One output case sized for the aggregate dictionary, with all
   accumulators reset before the first input case. */
259 agr.agr_case = xmalloc (dict_get_case_size (agr.dict));
260 initialize_aggregate_info (&agr);
262 /* Output to active file or external file? */
263 if (agr.out_file == NULL)
265 /* The active file will be replaced by the aggregated data,
266 so TEMPORARY is moot. */
/* NOTE(review): bit 4 of `seen' is presumably the PRESORTED flag; the
   line that sets it is not visible here -- confirm. */
269 if (agr.sort != NULL && (seen & 4) == 0)
270 sort_cases (agr.sort, 0);
272 agr.sink = create_case_sink (&storage_sink_class, agr.dict, NULL);
273 if (agr.sink->class->open != NULL)
274 agr.sink->class->open (agr.sink);
275 vfm_sink = create_case_sink (&null_sink_class, default_dict, NULL);
276 procedure (agr_to_active_file, &agr);
/* The last break group is only complete once the input ends, so it
   is dumped and written here. */
277 if (agr.case_cnt > 0)
279 dump_aggregate_info (&agr, agr.agr_case);
280 agr.sink->class->write (agr.sink, agr.agr_case);
282 dict_destroy (default_dict);
283 default_dict = agr.dict;
285 vfm_source = agr.sink->class->make_source (agr.sink);
286 free_case_sink (agr.sink);
290 if (!create_sysfile (&agr))
293 if (agr.sort != NULL && (seen & 4) == 0)
295 /* Sorting is needed. */
296 sort_cases (agr.sort, 1);
/* sort_agr_to_sysfile() converts its aux argument to a
   struct agr_proc * and dereferences it, so it must be handed &agr,
   just like the presorted path below.  Passing a null pointer here
   made the callback dereference null on the first sorted case. */
297 read_sort_output (agr.sort, sort_agr_to_sysfile, &agr);
301 /* Active file is already sorted. */
302 procedure (presorted_agr_to_sysfile, &agr);
/* Flush the final break group to the system file. */
305 if (agr.case_cnt > 0)
307 dump_aggregate_info (&agr, agr.agr_case);
308 write_case_to_sfm (&agr);
310 fh_close_handle (agr.out_file);
321 /* Create a system file for use in aggregation to an external
   file. */
324 create_sysfile (struct agr_proc *agr)
326 struct sfm_write_info w;
/* Compression follows the global set_scompression setting. */
329 w.compress = set_scompression;
330 if (!sfm_write_dictionary (&w))
/* The SFM-format case buffer holds w.case_size elements of the type
   pointed to by agr->sfm_agr_case. */
333 agr->sfm_agr_case = xmalloc (sizeof *agr->sfm_agr_case * w.case_size);
338 /* Parse all the aggregate functions.
   Each specification is a list of target variables (each optionally
   followed by a label string), `=', a function name, and for most
   functions a parenthesized list of source variables and arguments.
   One struct agr_var per target is appended to the list at agr->vars
   and the target is created in agr->dict.
   NOTE(review): the "expecting end of command" message near the end
   of this function is the only diagnostic here not wrapped in _() for
   translation -- confirm whether that is intended.
   NOTE(review): two comments in the second half of this function
   ("Finally add these to the linked list of aggregation" and "Create
   the target variable in the aggregate") have lost their closing
   lines in this listing. */
340 parse_aggregate_functions (struct agr_proc *agr)
342 struct agr_var *tail; /* Tail of linked list starting at agr->vars. */
344 /* Parse everything. */
353 const struct agr_func *function;
358 struct variable **src;
372 /* Parse the list of target variables. */
373 while (!lex_match ('='))
375 int n_dest_prev = n_dest;
377 if (!parse_DATA_LIST_vars (&dest, &n_dest, PV_APPEND | PV_SINGLE | PV_NO_SCRATCH))
380 /* Assign empty labels. */
384 dest_label = xrealloc (dest_label, sizeof *dest_label * n_dest);
385 for (j = n_dest_prev; j < n_dest; j++)
386 dest_label[j] = NULL;
/* A string token here is a label for the most recently parsed target;
   it is cut to 120 characters before being copied. */
389 if (token == T_STRING)
391 ds_truncate (&tokstr, 120);
392 dest_label[n_dest - 1] = xstrdup (ds_value (&tokstr));
397 /* Get the name of the aggregation function. */
400 lex_error (_("expecting aggregation function"));
/* A trailing `.' is stripped from the identifier before lookup.
   NOTE(review): it presumably selects the include-missing form of the
   function; the assignment that records it is not visible here. */
405 if (tokid[strlen (tokid) - 1] == '.')
408 tokid[strlen (tokid) - 1] = 0;
/* Linear lookup by name.  The scan stops at the null-name terminator,
   so the table entries after it are never matched by name. */
411 for (function = agr_func_tab; function->name; function++)
412 if (!strcmp (function->name, tokid))
414 if (NULL == function->name)
416 msg (SE, _("Unknown aggregation function %s."), tokid);
419 func_index = function - agr_func_tab;
422 /* Check for leading lparen. */
423 if (!lex_match ('('))
426 func_index = N_NO_VARS;
427 else if (func_index == NU)
428 func_index = NU_NO_VARS;
431 lex_error (_("expecting `('"));
435 /* Parse list of source variables. */
437 int pv_opts = PV_NO_SCRATCH;
/* SUM, MEAN and SD take numeric sources only; functions that take
   arguments require all sources to be of one type. */
439 if (func_index == SUM || func_index == MEAN || func_index == SD)
440 pv_opts |= PV_NUMERIC;
441 else if (function->n_args)
442 pv_opts |= PV_SAME_TYPE;
444 if (!parse_variables (default_dict, &src, &n_src, pv_opts))
448 /* Parse function arguments, for those functions that
449 require arguments. */
450 if (function->n_args != 0)
451 for (i = 0; i < function->n_args; i++)
456 if (token == T_STRING)
458 arg[i].c = xstrdup (ds_value (&tokstr));
461 else if (token == T_NUM)
466 msg (SE, _("Missing argument %d to %s."), i + 1, function->name);
472 if (type != src[0]->type)
474 msg (SE, _("Arguments to %s must be of same type as "
475 "source variables."),
481 /* Trailing rparen. */
484 lex_error (_("expecting `)'"));
488 /* Now check that the number of source variables match the
489 number of target variables. Do this here because if we
490 do it earlier then the user can get very misleading error
491 messages; i.e., `AGGREGATE x=SUM(y t).' will get this
492 error message when a proper message would be more like
493 `unknown variable t'. */
496 msg (SE, _("Number of source variables (%d) does not match "
497 "number of target variables (%d)."),
503 /* Finally add these to the linked list of aggregation
505 for (i = 0; i < n_dest; i++)
507 struct agr_var *v = xmalloc (sizeof *v);
509 /* Add variable to chain. */
510 if (agr->vars != NULL)
517 /* Create the target variable in the aggregate
520 struct variable *destvar;
522 v->function = func_index;
530 if (src[i]->type == ALPHA)
532 v->function |= FSTRING;
533 v->string = xmalloc (src[i]->width);
536 if (v->src->type == NUMERIC || function->alpha_type == NUMERIC)
539 output_width = v->src->width;
541 if (function->alpha_type == ALPHA)
542 destvar = dict_clone_var (agr->dict, v->src, dest[i]);
545 destvar = dict_create_var (agr->dict, dest[i], output_width);
546 if (output_width == 0)
547 destvar->print = destvar->write = function->format;
548 if (output_width == 0 && dict_get_weight (default_dict) != NULL
549 && (func_index == N || func_index == N_NO_VARS
550 || func_index == NU || func_index == NU_NO_VARS))
552 struct fmt_spec f = {FMT_F, 8, 2};
554 destvar->print = destvar->write = f;
559 destvar = dict_create_var (agr->dict, dest[i], 0);
564 msg (SE, _("Variable name %s is not unique within the "
565 "aggregate file dictionary, which contains "
566 "the aggregate variables and the break "
577 destvar->label = dest_label[i];
578 dest_label[i] = NULL;
580 else if (function->alpha_type == ALPHA)
581 destvar->print = destvar->write = function->format;
586 v->include_missing = include_missing;
592 if (v->src->type == NUMERIC)
593 for (j = 0; j < function->n_args; j++)
594 v->arg[j].f = arg[j].f;
596 for (j = 0; j < function->n_args; j++)
597 v->arg[j].c = xstrdup (arg[j].c);
601 if (src != NULL && src[0]->type == ALPHA)
602 for (i = 0; i < function->n_args; i++)
612 if (!lex_match ('/'))
617 lex_error ("expecting end of command");
623 for (i = 0; i < n_dest; i++)
626 free (dest_label[i]);
632 if (src && n_src && src[0]->type == ALPHA)
633 for (i = 0; i < function->n_args; i++)
/* Releases the resources held by AGR: the aggregate dictionary and
   sort program when present, each aggregate variable's string
   arguments, and the break-value and output-case buffers.
   NOTE(review): no release of agr->sfm_agr_case is visible in this
   listing -- confirm that it is freed somewhere. */
646 agr_destroy (struct agr_proc *agr)
648 struct agr_var *iter, *next;
650 if (agr->dict != NULL)
651 dict_destroy (agr->dict);
652 if (agr->sort != NULL)
653 destroy_sort_cases_pgm (agr->sort);
654 for (iter = agr->vars; iter; iter = next)
/* Only string functions own heap-allocated argument strings. */
658 if (iter->function & FSTRING)
/* Mask off FSTRING to get the table index for the argument count. */
663 n_args = agr_func_tab[iter->function & FUNC].n_args;
664 for (i = 0; i < n_args; i++)
665 free (iter->arg[i].c);
670 free (agr->prev_break);
671 free (agr->agr_case);
/* Helpers used by aggregate_single_case().  dump_aggregate_info() is
   also declared with the other prototypes earlier in this file. */
676 static void accumulate_aggregate_info (struct agr_proc *,
677 const struct ccase *);
678 static void dump_aggregate_info (struct agr_proc *, struct ccase *);
680 /* Processes a single case INPUT for aggregation. If output is
681 warranted, writes it to OUTPUT and returns nonzero.
682 Otherwise, returns zero and OUTPUT is unmodified. */
684 aggregate_single_case (struct agr_proc *agr,
685 const struct ccase *input, struct ccase *output)
687 /* The first case always begins a new break group. We also need to
688 preserve the values of the case for later comparison. */
689 if (agr->case_cnt++ == 0)
/* NOTE(review): agr->sort is dereferenced unconditionally here and
   below, although the caller tests agr.sort != NULL before sorting --
   confirm that a run without BREAK cannot reach this function. */
696 for (i = 0; i < agr->sort->var_cnt; i++)
697 n_elem += agr->sort->vars[i]->nv;
/* prev_break gets one union value per element (nv) of every break
   variable. */
700 agr->prev_break = xmalloc (sizeof *agr->prev_break * n_elem);
702 /* Copy INPUT into prev_break. */
704 union value *iter = agr->prev_break;
707 for (i = 0; i < agr->sort->var_cnt; i++)
709 struct variable *v = agr->sort->vars[i];
711 if (v->type == NUMERIC)
712 (iter++)->f = input->data[v->fv].f;
715 memcpy (iter->s, input->data[v->fv].s, v->width);
721 accumulate_aggregate_info (agr, input);
726 /* Compare the value of each break variable to the values on the
729 union value *iter = agr->prev_break;
732 for (i = 0; i < agr->sort->var_cnt; i++)
734 struct variable *v = agr->sort->vars[i];
739 if (input->data[v->fv].f != iter->f)
744 if (memcmp (input->data[v->fv].s, iter->s, v->width))
754 accumulate_aggregate_info (agr, input);
759 /* The values of the break variable are different from the values on
760 the previous case. That means that it's time to dump aggregate
762 dump_aggregate_info (agr, output);
763 initialize_aggregate_info (agr);
764 accumulate_aggregate_info (agr, input);
766 /* Copy INPUT into prev_break. */
768 union value *iter = agr->prev_break;
771 for (i = 0; i < agr->sort->var_cnt; i++)
773 struct variable *v = agr->sort->vars[i];
775 if (v->type == NUMERIC)
776 (iter++)->f = input->data[v->fv].f;
779 memcpy (iter->s, input->data[v->fv].s, v->width);
788 /* Accumulates aggregation data from the case INPUT.
   For every aggregate variable in AGR the source value is read from
   INPUT and folded into that variable's running state (dbl[], string),
   using the case weight taken from the active dictionary.  Values
   that count as missing for the variable are handled by a separate,
   smaller switch. */
790 accumulate_aggregate_info (struct agr_proc *agr,
791 const struct ccase *input)
793 struct agr_var *iter;
796 weight = dict_get_case_weight (default_dict, input);
798 for (iter = agr->vars; iter; iter = iter->next)
801 const union value *v = &input->data[iter->src->fv];
803 if ((!iter->include_missing && is_missing (v, iter->src))
804 || (iter->include_missing && iter->src->type == NUMERIC
807 switch (iter->function)
810 iter->dbl[0] += weight;
820 /* This is horrible. There are too many possibilities. */
821 switch (iter->function)
/* NOTE(review): this first accumulation adds the raw value, while the
   ones below multiply by the case weight -- confirm that leaving the
   weight out here is intended. */
824 iter->dbl[0] += v->f;
/* Weighted sum in dbl[0], total weight in dbl[1]. */
827 iter->dbl[0] += v->f * weight;
828 iter->dbl[1] += weight;
/* Weighted sum, weighted sum of squares and total weight. */
832 double product = v->f * weight;
833 iter->dbl[0] += product;
834 iter->dbl[1] += product * v->f;
835 iter->dbl[2] += weight;
839 iter->dbl[0] = max (iter->dbl[0], v->f);
843 if (memcmp (iter->string, v->s, iter->src->width) < 0)
844 memcpy (iter->string, v->s, iter->src->width);
848 iter->dbl[0] = min (iter->dbl[0], v->f);
852 if (memcmp (iter->string, v->s, iter->src->width) > 0)
853 memcpy (iter->string, v->s, iter->src->width);
/* In the threshold and range tests below, dbl[0] collects the weight
   of the cases that satisfy the test and dbl[1] the weight of all
   cases examined. */
858 if (v->f > iter->arg[0].f)
859 iter->dbl[0] += weight;
860 iter->dbl[1] += weight;
864 if (memcmp (iter->arg[0].c, v->s, iter->src->width) < 0)
865 iter->dbl[0] += weight;
866 iter->dbl[1] += weight;
870 if (v->f < iter->arg[0].f)
871 iter->dbl[0] += weight;
872 iter->dbl[1] += weight;
876 if (memcmp (iter->arg[0].c, v->s, iter->src->width) > 0)
877 iter->dbl[0] += weight;
878 iter->dbl[1] += weight;
882 if (iter->arg[0].f <= v->f && v->f <= iter->arg[1].f)
883 iter->dbl[0] += weight;
884 iter->dbl[1] += weight;
888 if (memcmp (iter->arg[0].c, v->s, iter->src->width) <= 0
889 && memcmp (iter->arg[1].c, v->s, iter->src->width) >= 0)
890 iter->dbl[0] += weight;
891 iter->dbl[1] += weight;
895 if (iter->arg[0].f > v->f || v->f > iter->arg[1].f)
896 iter->dbl[0] += weight;
897 iter->dbl[1] += weight;
/* Outside the range means below the low bound OR above the high
   bound, mirroring the numeric test just above.  The previous `&&'
   could never hold for an ordered range, so string values were never
   counted as out of range. */
901 if (memcmp (iter->arg[0].c, v->s, iter->src->width) > 0
902 || memcmp (iter->arg[1].c, v->s, iter->src->width) < 0)
903 iter->dbl[0] += weight;
904 iter->dbl[1] += weight;
907 iter->dbl[0] += weight;
919 case FIRST | FSTRING:
922 memcpy (iter->string, v->s, iter->src->width);
931 memcpy (iter->string, v->s, iter->src->width);
938 switch (iter->function)
941 iter->dbl[0] += weight;
952 /* We've come to a record that differs from the previous in one or
953 more of the break variables. Make an output record from the
954 accumulated statistics in the OUTPUT case. */
956 dump_aggregate_info (struct agr_proc *agr, struct ccase *output)
/* The saved break values are copied to the start of OUTPUT's data
   first; their total size is the sum of nv over all break variables. */
964 for (i = 0; i < agr->sort->var_cnt; i++)
965 n_elem += agr->sort->vars[i]->nv;
967 memcpy (output->data, agr->prev_break, sizeof (union value) * n_elem);
973 for (i = agr->vars; i; i = i->next)
975 union value *v = &output->data[i->dest->fv];
/* Under COLUMNWISE treatment, a variable flagged as having seen a
   missing value is handled by this branch instead of the normal
   result code, except for the counting functions N, NU, NMISS and
   NUMISS.  String targets are blanked. */
977 if (agr->missing == COLUMNWISE && i->missing != 0
978 && (i->function & FUNC) != N && (i->function & FUNC) != NU
979 && (i->function & FUNC) != NMISS && (i->function & FUNC) != NUMISS)
981 if (i->function & FSTRING)
982 memset (v->s, ' ', i->dest->width);
/* Mean: weighted sum over total weight, SYSMIS when no weight. */
994 v->f = i->dbl[1] != 0.0 ? i->dbl[0] / i->dbl[1] : SYSMIS;
997 v->f = ((i->dbl[2] > 1.0)
998 ? calc_stddev (calc_variance (i->dbl, i->dbl[2]))
1003 v->f = i->int1 ? i->dbl[0] : SYSMIS;
1008 memcpy (v->s, i->string, i->dest->width);
1010 memset (v->s, ' ', i->dest->width);
1015 case FOUT | FSTRING:
/* The fraction counters for string sources are accumulated in dbl[0]
   (matching weight) and dbl[1] (total weight), exactly as for numeric
   sources.  The old code divided int1 by int2, which the accumulation
   code never updates for these functions, so the result was always
   SYSMIS. */
1016 v->f = i->dbl[1] ? i->dbl[0] / i->dbl[1] : SYSMIS;
1022 v->f = i->dbl[1] ? i->dbl[0] / i->dbl[1] : SYSMIS;
1031 case POUT | FSTRING:
/* Percentages are the same ratio scaled by 100. */
1032 v->f = i->dbl[1] ? i->dbl[0] / i->dbl[1] * 100.0 : SYSMIS;
1042 v->f = i->int1 ? i->dbl[0] : SYSMIS;
1044 case FIRST | FSTRING:
1045 case LAST | FSTRING:
1047 memcpy (v->s, i->string, i->dest->width);
1049 memset (v->s, ' ', i->dest->width);
1070 /* Resets the state for all the aggregate functions.
   Called before the first case and again each time a break group has
   been dumped. */
1072 initialize_aggregate_info (struct agr_proc *agr)
1074 struct agr_var *iter;
1076 for (iter = agr->vars; iter; iter = iter->next)
1079 switch (iter->function)
/* NOTE(review): the case labels are not visible in this listing.  The
   largest possible starting values (DBL_MAX, all-0xFF bytes) are
   presumably the seeds for a running minimum, and the smallest
   (-DBL_MAX, all-zero bytes) the seeds for a running maximum. */
1082 iter->dbl[0] = DBL_MAX;
1085 memset (iter->string, 255, iter->src->width);
1088 iter->dbl[0] = -DBL_MAX;
1091 memset (iter->string, 0, iter->src->width);
/* Zero the numeric accumulators and the integer fields. */
1094 iter->dbl[0] = iter->dbl[1] = iter->dbl[2] = 0.0;
1095 iter->int1 = iter->int2 = 0;
1101 /* Aggregate each case as it comes through.  Cases which aren't needed
   (those that do not complete a break group) write nothing to the sink. */
1104 agr_to_active_file (struct ccase *c, void *agr_)
1106 struct agr_proc *agr = agr_;
/* A nonzero result means agr->agr_case now holds a finished aggregate
   case, which is passed on to the sink. */
1108 if (aggregate_single_case (agr, c, agr->agr_case))
1109 agr->sink->class->write (agr->sink, agr->agr_case);
1114 /* Writes AGR->agr_case to AGR->out_file.
   The case is first converted, variable by variable in dictionary
   order, into the flt64 buffer AGR->sfm_agr_case. */
1116 write_case_to_sfm (struct agr_proc *agr)
1121 p = agr->sfm_agr_case;
1122 for (i = 0; i < dict_get_var_cnt (agr->dict); i++)
1124 struct variable *v = dict_get_var (agr->dict, i);
1126 if (v->type == NUMERIC)
1128 double src = agr->agr_case->data[v->fv].f;
/* String value: copy the bytes, pad with spaces up to a whole number
   of flt64 elements, and advance P by that many elements. */
1136 memcpy (p, agr->agr_case->data[v->fv].s, v->width);
1137 memset (&((char *) p)[v->width], ' ',
1138 REM_RND_UP (v->width, sizeof (flt64)));
1139 p += DIV_RND_UP (v->width, sizeof (flt64));
/* The element count written is the distance P has advanced. */
1143 sfm_write_case (agr->out_file, agr->sfm_agr_case, p - agr->sfm_agr_case);
1146 /* Aggregate the current case and output it if we passed a
   break-group boundary. */
1149 presorted_agr_to_sysfile (struct ccase *c, void *agr_)
/* Delegates to the callback used on the sorting path; AGR_ is passed
   through unchanged. */
1151 sort_agr_to_sysfile (c, agr_);
1155 /* Aggregate the current case and output it if we passed a
   break-group boundary. */
1158 sort_agr_to_sysfile (const struct ccase *c, void *agr_)
/* AGR_ must point to the struct agr_proc; it is dereferenced below. */
1160 struct agr_proc *agr = agr_;
/* A nonzero result means agr->agr_case holds a finished aggregate
   case, which is then written to the system file. */
1162 if (aggregate_single_case (agr, c, agr->agr_case))
1163 write_case_to_sfm (agr);