1 /* PSPP - computes sample statistics.
2 Copyright (C) 1997-9, 2000 Free Software Foundation, Inc.
3 Written by Ben Pfaff <blp@gnu.org>.
5 This program is free software; you can redistribute it and/or
6 modify it under the terms of the GNU General Public License as
7 published by the Free Software Foundation; either version 2 of the
8 License, or (at your option) any later version.
10 This program is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with this program; if not, write to the Free Software
17 Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA
26 #include "file-handle.h"
39 /* Specifies how to make an aggregate variable. */
42 struct agr_var *next; /* Next in list. */
44 /* Collected during parsing. */
45 struct variable *src; /* Source variable. */
46 struct variable *dest; /* Target variable. */
47 int function; /* Function. */
48 int include_missing; /* 1=Include user-missing values. */
49 union value arg[2]; /* Arguments. */
51 /* Accumulated during AGGREGATE execution. */
56 struct moments *moments;
59 /* Aggregation functions. */
62 NONE, SUM, MEAN, SD, MAX, MIN, PGT, PLT, PIN, POUT, FGT, FLT, FIN,
63 FOUT, N, NU, NMISS, NUMISS, FIRST, LAST,
64 N_AGR_FUNCS, N_NO_VARS, NU_NO_VARS,
65 FUNC = 0x1f, /* Function mask. */
66 FSTRING = 1<<5, /* String function bit. */
69 /* Attributes of an aggregation function. */
72 const char *name; /* Aggregation function name. */
73 int n_args; /* Number of arguments. */
74 int alpha_type; /* When given ALPHA arguments, output type. */
75 struct fmt_spec format; /* Format spec if alpha_type != ALPHA. */
78 /* Attributes of aggregation functions. */
79 static const struct agr_func agr_func_tab[] =
81 {"<NONE>", 0, -1, {0, 0, 0}},
82 {"SUM", 0, -1, {FMT_F, 8, 2}},
83 {"MEAN", 0, -1, {FMT_F, 8, 2}},
84 {"SD", 0, -1, {FMT_F, 8, 2}},
85 {"MAX", 0, ALPHA, {-1, -1, -1}},
86 {"MIN", 0, ALPHA, {-1, -1, -1}},
87 {"PGT", 1, NUMERIC, {FMT_F, 5, 1}},
88 {"PLT", 1, NUMERIC, {FMT_F, 5, 1}},
89 {"PIN", 2, NUMERIC, {FMT_F, 5, 1}},
90 {"POUT", 2, NUMERIC, {FMT_F, 5, 1}},
91 {"FGT", 1, NUMERIC, {FMT_F, 5, 3}},
92 {"FLT", 1, NUMERIC, {FMT_F, 5, 3}},
93 {"FIN", 2, NUMERIC, {FMT_F, 5, 3}},
94 {"FOUT", 2, NUMERIC, {FMT_F, 5, 3}},
95 {"N", 0, NUMERIC, {FMT_F, 7, 0}},
96 {"NU", 0, NUMERIC, {FMT_F, 7, 0}},
97 {"NMISS", 0, NUMERIC, {FMT_F, 7, 0}},
98 {"NUMISS", 0, NUMERIC, {FMT_F, 7, 0}},
99 {"FIRST", 0, ALPHA, {-1, -1, -1}},
100 {"LAST", 0, ALPHA, {-1, -1, -1}},
101 {NULL, 0, -1, {-1, -1, -1}},
102 {"N", 0, NUMERIC, {FMT_F, 7, 0}},
103 {"NU", 0, NUMERIC, {FMT_F, 7, 0}},
106 /* Missing value types. */
107 enum missing_treatment
109 ITEMWISE, /* Missing values item by item. */
110 COLUMNWISE /* Missing values column by column. */
113 /* An entire AGGREGATE procedure. */
116 /* We have either an output file or a sink. */
117 struct file_handle *out_file; /* Output file, or null if none. */
118 struct case_sink *sink; /* Sink, or null if none. */
120 enum missing_treatment missing; /* How to treat missing values. */
121 struct sort_cases_pgm *sort; /* Sort program. */
122 struct agr_var *vars; /* First aggregate variable. */
123 struct dictionary *dict; /* Aggregate dictionary. */
124 int case_cnt; /* Counts aggregated cases. */
125 union value *prev_break; /* Last values of break variables. */
126 struct ccase *agr_case; /* Aggregate case for output. */
127 flt64 *sfm_agr_case; /* Aggregate case in SFM format. */
130 static void initialize_aggregate_info (struct agr_proc *);
133 static int parse_aggregate_functions (struct agr_proc *);
134 static void agr_destroy (struct agr_proc *);
135 static int aggregate_single_case (struct agr_proc *agr,
136 const struct ccase *input,
137 struct ccase *output);
138 static void dump_aggregate_info (struct agr_proc *agr, struct ccase *output);
139 static int create_sysfile (struct agr_proc *);
141 /* Aggregating to the active file. */
142 static int agr_to_active_file (struct ccase *, void *aux);
144 /* Aggregating to a system file. */
145 static void write_case_to_sfm (struct agr_proc *agr);
146 static int presorted_agr_to_sysfile (struct ccase *, void *aux);
147 static int sort_agr_to_sysfile (const struct ccase *, void *aux);
151 /* Parses and executes the AGGREGATE procedure. */
157 /* Have we seen these subcommands? */
162 agr.missing = ITEMWISE;
167 agr.prev_break = NULL;
169 agr.dict = dict_create ();
170 dict_set_label (agr.dict, dict_get_label (default_dict));
171 dict_set_documents (agr.dict, dict_get_documents (default_dict));
173 /* Read most of the subcommands. */
178 if (lex_match_id ("OUTFILE"))
182 msg (SE, _("%s subcommand given multiple times."),"OUTFILE");
192 agr.out_file = fh_parse_file_handle ();
193 if (agr.out_file == NULL)
197 else if (lex_match_id ("MISSING"))
200 if (!lex_match_id ("COLUMNWISE"))
202 lex_error (_("while expecting COLUMNWISE"));
205 agr.missing = COLUMNWISE;
207 else if (lex_match_id ("DOCUMENT"))
209 else if (lex_match_id ("PRESORTED"))
211 else if (lex_match_id ("BREAK"))
215 msg (SE, _("%s subcommand given multiple times."),"BREAK");
221 agr.sort = parse_sort ();
222 if (agr.sort == NULL)
228 for (i = 0; i < agr.sort->var_cnt; i++)
232 v = dict_clone_var (agr.dict, agr.sort->vars[i],
233 agr.sort->vars[i]->name);
241 /* Check for proper syntax. */
243 msg (SW, _("BREAK subcommand not specified."));
245 /* Read in the aggregate functions. */
246 if (!parse_aggregate_functions (&agr))
249 /* Delete documents. */
251 dict_set_documents (agr.dict, NULL);
253 /* Cancel SPLIT FILE. */
254 dict_set_split_vars (agr.dict, NULL, 0);
258 agr.agr_case = xmalloc (dict_get_case_size (agr.dict));
259 initialize_aggregate_info (&agr);
261 /* Output to active file or external file? */
262 if (agr.out_file == NULL)
264 /* The active file will be replaced by the aggregated data,
265 so TEMPORARY is moot. */
268 if (agr.sort != NULL && (seen & 4) == 0)
269 sort_cases (agr.sort, 0);
271 agr.sink = create_case_sink (&storage_sink_class, agr.dict, NULL);
272 if (agr.sink->class->open != NULL)
273 agr.sink->class->open (agr.sink);
274 vfm_sink = create_case_sink (&null_sink_class, default_dict, NULL);
275 procedure (agr_to_active_file, &agr);
276 if (agr.case_cnt > 0)
278 dump_aggregate_info (&agr, agr.agr_case);
279 agr.sink->class->write (agr.sink, agr.agr_case);
281 dict_destroy (default_dict);
282 default_dict = agr.dict;
284 vfm_source = agr.sink->class->make_source (agr.sink);
285 free_case_sink (agr.sink);
289 if (!create_sysfile (&agr))
292 if (agr.sort != NULL && (seen & 4) == 0)
294 /* Sorting is needed. */
295 sort_cases (agr.sort, 1);
296 read_sort_output (agr.sort, sort_agr_to_sysfile, NULL);
300 /* Active file is already sorted. */
301 procedure (presorted_agr_to_sysfile, &agr);
304 if (agr.case_cnt > 0)
306 dump_aggregate_info (&agr, agr.agr_case);
307 write_case_to_sfm (&agr);
309 fh_close_handle (agr.out_file);
320 /* Create a system file for use in aggregation to an external
323 create_sysfile (struct agr_proc *agr)
325 struct sfm_write_info w;
328 w.compress = get_scompression();
329 if (!sfm_write_dictionary (&w))
332 agr->sfm_agr_case = xmalloc (sizeof *agr->sfm_agr_case * w.case_size);
337 /* Parse all the aggregate functions. */
339 parse_aggregate_functions (struct agr_proc *agr)
341 struct agr_var *tail; /* Tail of linked list starting at agr->vars. */
343 /* Parse everything. */
352 const struct agr_func *function;
357 struct variable **src;
371 /* Parse the list of target variables. */
372 while (!lex_match ('='))
374 int n_dest_prev = n_dest;
376 if (!parse_DATA_LIST_vars (&dest, &n_dest, PV_APPEND | PV_SINGLE | PV_NO_SCRATCH))
379 /* Assign empty labels. */
383 dest_label = xrealloc (dest_label, sizeof *dest_label * n_dest);
384 for (j = n_dest_prev; j < n_dest; j++)
385 dest_label[j] = NULL;
388 if (token == T_STRING)
390 ds_truncate (&tokstr, 120);
391 dest_label[n_dest - 1] = xstrdup (ds_value (&tokstr));
396 /* Get the name of the aggregation function. */
399 lex_error (_("expecting aggregation function"));
404 if (tokid[strlen (tokid) - 1] == '.')
407 tokid[strlen (tokid) - 1] = 0;
410 for (function = agr_func_tab; function->name; function++)
411 if (!strcmp (function->name, tokid))
413 if (NULL == function->name)
415 msg (SE, _("Unknown aggregation function %s."), tokid);
418 func_index = function - agr_func_tab;
421 /* Check for leading lparen. */
422 if (!lex_match ('('))
425 func_index = N_NO_VARS;
426 else if (func_index == NU)
427 func_index = NU_NO_VARS;
430 lex_error (_("expecting `('"));
434 /* Parse list of source variables. */
436 int pv_opts = PV_NO_SCRATCH;
438 if (func_index == SUM || func_index == MEAN || func_index == SD)
439 pv_opts |= PV_NUMERIC;
440 else if (function->n_args)
441 pv_opts |= PV_SAME_TYPE;
443 if (!parse_variables (default_dict, &src, &n_src, pv_opts))
447 /* Parse function arguments, for those functions that
448 require arguments. */
449 if (function->n_args != 0)
450 for (i = 0; i < function->n_args; i++)
455 if (token == T_STRING)
457 arg[i].c = xstrdup (ds_value (&tokstr));
460 else if (token == T_NUM)
465 msg (SE, _("Missing argument %d to %s."), i + 1, function->name);
471 if (type != src[0]->type)
473 msg (SE, _("Arguments to %s must be of same type as "
474 "source variables."),
480 /* Trailing rparen. */
483 lex_error (_("expecting `)'"));
487 /* Now check that the number of source variables match the
488 number of target variables. Do this here because if we
489 do it earlier then the user can get very misleading error
490 messages; i.e., `AGGREGATE x=SUM(y t).' will get this
491 error message when a proper message would be more like
492 `unknown variable t'. */
495 msg (SE, _("Number of source variables (%d) does not match "
496 "number of target variables (%d)."),
502 /* Finally add these to the linked list of aggregation
504 for (i = 0; i < n_dest; i++)
506 struct agr_var *v = xmalloc (sizeof *v);
508 /* Add variable to chain. */
509 if (agr->vars != NULL)
517 /* Create the target variable in the aggregate
520 struct variable *destvar;
522 v->function = func_index;
530 if (src[i]->type == ALPHA)
532 v->function |= FSTRING;
533 v->string = xmalloc (src[i]->width);
536 if (v->src->type == NUMERIC || function->alpha_type == NUMERIC)
539 output_width = v->src->width;
541 if (function->alpha_type == ALPHA)
542 destvar = dict_clone_var (agr->dict, v->src, dest[i]);
545 destvar = dict_create_var (agr->dict, dest[i], output_width);
546 if (output_width == 0)
547 destvar->print = destvar->write = function->format;
548 if (output_width == 0 && dict_get_weight (default_dict) != NULL
549 && (func_index == N || func_index == N_NO_VARS
550 || func_index == NU || func_index == NU_NO_VARS))
552 struct fmt_spec f = {FMT_F, 8, 2};
554 destvar->print = destvar->write = f;
559 destvar = dict_create_var (agr->dict, dest[i], 0);
564 msg (SE, _("Variable name %s is not unique within the "
565 "aggregate file dictionary, which contains "
566 "the aggregate variables and the break "
577 destvar->label = dest_label[i];
578 dest_label[i] = NULL;
580 else if (function->alpha_type == ALPHA)
581 destvar->print = destvar->write = function->format;
586 v->include_missing = include_missing;
592 if (v->src->type == NUMERIC)
593 for (j = 0; j < function->n_args; j++)
594 v->arg[j].f = arg[j].f;
596 for (j = 0; j < function->n_args; j++)
597 v->arg[j].c = xstrdup (arg[j].c);
601 if (src != NULL && src[0]->type == ALPHA)
602 for (i = 0; i < function->n_args; i++)
612 if (!lex_match ('/'))
617 lex_error ("expecting end of command");
623 for (i = 0; i < n_dest; i++)
626 free (dest_label[i]);
632 if (src && n_src && src[0]->type == ALPHA)
633 for (i = 0; i < function->n_args; i++)
646 agr_destroy (struct agr_proc *agr)
648 struct agr_var *iter, *next;
650 if (agr->dict != NULL)
651 dict_destroy (agr->dict);
652 if (agr->sort != NULL)
653 destroy_sort_cases_pgm (agr->sort);
654 for (iter = agr->vars; iter; iter = next)
658 if (iter->function & FSTRING)
663 n_args = agr_func_tab[iter->function & FUNC].n_args;
664 for (i = 0; i < n_args; i++)
665 free (iter->arg[i].c);
668 else if (iter->function == SD)
669 moments_destroy (iter->moments);
672 free (agr->prev_break);
673 free (agr->agr_case);
678 static void accumulate_aggregate_info (struct agr_proc *,
679 const struct ccase *);
680 static void dump_aggregate_info (struct agr_proc *, struct ccase *);
682 /* Processes a single case INPUT for aggregation. If output is
683 warranted, writes it to OUTPUT and returns nonzero.
684 Otherwise, returns zero and OUTPUT is unmodified. */
686 aggregate_single_case (struct agr_proc *agr,
687 const struct ccase *input, struct ccase *output)
689 /* The first case always begins a new break group. We also need to
690 preserve the values of the case for later comparison. */
691 if (agr->case_cnt++ == 0)
698 for (i = 0; i < agr->sort->var_cnt; i++)
699 n_elem += agr->sort->vars[i]->nv;
702 agr->prev_break = xmalloc (sizeof *agr->prev_break * n_elem);
704 /* Copy INPUT into prev_break. */
706 union value *iter = agr->prev_break;
709 for (i = 0; i < agr->sort->var_cnt; i++)
711 struct variable *v = agr->sort->vars[i];
713 if (v->type == NUMERIC)
714 (iter++)->f = input->data[v->fv].f;
717 memcpy (iter->s, input->data[v->fv].s, v->width);
723 accumulate_aggregate_info (agr, input);
728 /* Compare the value of each break variable to the values on the
731 union value *iter = agr->prev_break;
734 for (i = 0; i < agr->sort->var_cnt; i++)
736 struct variable *v = agr->sort->vars[i];
741 if (input->data[v->fv].f != iter->f)
746 if (memcmp (input->data[v->fv].s, iter->s, v->width))
756 accumulate_aggregate_info (agr, input);
761 /* The values of the break variable are different from the values on
762 the previous case. That means that it's time to dump aggregate
764 dump_aggregate_info (agr, output);
765 initialize_aggregate_info (agr);
766 accumulate_aggregate_info (agr, input);
768 /* Copy INPUT into prev_break. */
770 union value *iter = agr->prev_break;
773 for (i = 0; i < agr->sort->var_cnt; i++)
775 struct variable *v = agr->sort->vars[i];
777 if (v->type == NUMERIC)
778 (iter++)->f = input->data[v->fv].f;
781 memcpy (iter->s, input->data[v->fv].s, v->width);
790 /* Accumulates aggregation data from the case INPUT. */
792 accumulate_aggregate_info (struct agr_proc *agr,
793 const struct ccase *input)
795 struct agr_var *iter;
798 weight = dict_get_case_weight (default_dict, input);
800 for (iter = agr->vars; iter; iter = iter->next)
803 const union value *v = &input->data[iter->src->fv];
805 if ((!iter->include_missing && is_missing (v, iter->src))
806 || (iter->include_missing && iter->src->type == NUMERIC
809 switch (iter->function)
812 iter->dbl[0] += weight;
822 /* This is horrible. There are too many possibilities. */
823 switch (iter->function)
826 iter->dbl[0] += v->f;
829 iter->dbl[0] += v->f * weight;
830 iter->dbl[1] += weight;
833 moments_pass_two (iter->moments, v->f, weight);
836 iter->dbl[0] = max (iter->dbl[0], v->f);
840 if (memcmp (iter->string, v->s, iter->src->width) < 0)
841 memcpy (iter->string, v->s, iter->src->width);
845 iter->dbl[0] = min (iter->dbl[0], v->f);
849 if (memcmp (iter->string, v->s, iter->src->width) > 0)
850 memcpy (iter->string, v->s, iter->src->width);
855 if (v->f > iter->arg[0].f)
856 iter->dbl[0] += weight;
857 iter->dbl[1] += weight;
861 if (memcmp (iter->arg[0].c, v->s, iter->src->width) < 0)
862 iter->dbl[0] += weight;
863 iter->dbl[1] += weight;
867 if (v->f < iter->arg[0].f)
868 iter->dbl[0] += weight;
869 iter->dbl[1] += weight;
873 if (memcmp (iter->arg[0].c, v->s, iter->src->width) > 0)
874 iter->dbl[0] += weight;
875 iter->dbl[1] += weight;
879 if (iter->arg[0].f <= v->f && v->f <= iter->arg[1].f)
880 iter->dbl[0] += weight;
881 iter->dbl[1] += weight;
885 if (memcmp (iter->arg[0].c, v->s, iter->src->width) <= 0
886 && memcmp (iter->arg[1].c, v->s, iter->src->width) >= 0)
887 iter->dbl[0] += weight;
888 iter->dbl[1] += weight;
892 if (iter->arg[0].f > v->f || v->f > iter->arg[1].f)
893 iter->dbl[0] += weight;
894 iter->dbl[1] += weight;
898 if (memcmp (iter->arg[0].c, v->s, iter->src->width) > 0
899 && memcmp (iter->arg[1].c, v->s, iter->src->width) < 0)
900 iter->dbl[0] += weight;
901 iter->dbl[1] += weight;
904 iter->dbl[0] += weight;
916 case FIRST | FSTRING:
919 memcpy (iter->string, v->s, iter->src->width);
928 memcpy (iter->string, v->s, iter->src->width);
935 switch (iter->function)
938 iter->dbl[0] += weight;
949 /* We've come to a record that differs from the previous in one or
950 more of the break variables. Make an output record from the
951 accumulated statistics in the OUTPUT case. */
953 dump_aggregate_info (struct agr_proc *agr, struct ccase *output)
961 for (i = 0; i < agr->sort->var_cnt; i++)
962 n_elem += agr->sort->vars[i]->nv;
964 memcpy (output->data, agr->prev_break, sizeof (union value) * n_elem);
970 for (i = agr->vars; i; i = i->next)
972 union value *v = &output->data[i->dest->fv];
974 if (agr->missing == COLUMNWISE && i->missing != 0
975 && (i->function & FUNC) != N && (i->function & FUNC) != NU
976 && (i->function & FUNC) != NMISS && (i->function & FUNC) != NUMISS)
978 if (i->function & FSTRING)
979 memset (v->s, ' ', i->dest->width);
991 v->f = i->dbl[1] != 0.0 ? i->dbl[0] / i->dbl[1] : SYSMIS;
997 /* FIXME: we should use two passes. */
998 moments_calculate (i->moments, NULL, NULL, &variance,
1000 if (variance != SYSMIS)
1001 v->f = sqrt (variance);
1008 v->f = i->int1 ? i->dbl[0] : SYSMIS;
1013 memcpy (v->s, i->string, i->dest->width);
1015 memset (v->s, ' ', i->dest->width);
1020 case FOUT | FSTRING:
1021 v->f = i->int2 ? (double) i->int1 / (double) i->int2 : SYSMIS;
1027 v->f = i->dbl[1] ? i->dbl[0] / i->dbl[1] : SYSMIS;
1036 case POUT | FSTRING:
1037 v->f = i->dbl[1] ? i->dbl[0] / i->dbl[1] * 100.0 : SYSMIS;
1047 v->f = i->int1 ? i->dbl[0] : SYSMIS;
1049 case FIRST | FSTRING:
1050 case LAST | FSTRING:
1052 memcpy (v->s, i->string, i->dest->width);
1054 memset (v->s, ' ', i->dest->width);
1075 /* Resets the state for all the aggregate functions. */
1077 initialize_aggregate_info (struct agr_proc *agr)
1079 struct agr_var *iter;
1081 for (iter = agr->vars; iter; iter = iter->next)
1084 switch (iter->function)
1087 iter->dbl[0] = DBL_MAX;
1090 memset (iter->string, 255, iter->src->width);
1093 iter->dbl[0] = -DBL_MAX;
1096 memset (iter->string, 0, iter->src->width);
1099 if (iter->moments == NULL)
1100 iter->moments = moments_create (MOMENT_VARIANCE);
1102 moments_clear (iter->moments);
1105 iter->dbl[0] = iter->dbl[1] = iter->dbl[2] = 0.0;
1106 iter->int1 = iter->int2 = 0;
1112 /* Aggregate each case as it comes through. Cases which aren't needed
1115 agr_to_active_file (struct ccase *c, void *agr_)
1117 struct agr_proc *agr = agr_;
1119 if (aggregate_single_case (agr, c, agr->agr_case))
1120 agr->sink->class->write (agr->sink, agr->agr_case);
1125 /* Writes AGR->agr_case to AGR->out_file. */
1127 write_case_to_sfm (struct agr_proc *agr)
1132 p = agr->sfm_agr_case;
1133 for (i = 0; i < dict_get_var_cnt (agr->dict); i++)
1135 struct variable *v = dict_get_var (agr->dict, i);
1137 if (v->type == NUMERIC)
1139 double src = agr->agr_case->data[v->fv].f;
1147 memcpy (p, agr->agr_case->data[v->fv].s, v->width);
1148 memset (&((char *) p)[v->width], ' ',
1149 REM_RND_UP (v->width, sizeof (flt64)));
1150 p += DIV_RND_UP (v->width, sizeof (flt64));
1154 sfm_write_case (agr->out_file, agr->sfm_agr_case, p - agr->sfm_agr_case);
1157 /* Aggregate the current case and output it if we passed a
1160 presorted_agr_to_sysfile (struct ccase *c, void *agr_)
1162 sort_agr_to_sysfile (c, agr_);
1166 /* Aggregate the current case and output it if we passed a
1169 sort_agr_to_sysfile (const struct ccase *c, void *agr_)
1171 struct agr_proc *agr = agr_;
1173 if (aggregate_single_case (agr, c, agr->agr_case))
1174 write_case_to_sfm (agr);