1 /* PSPP - computes sample statistics.
2 Copyright (C) 1997-9, 2000 Free Software Foundation, Inc.
3 Written by Ben Pfaff <blp@gnu.org>.
5 This program is free software; you can redistribute it and/or
6 modify it under the terms of the GNU General Public License as
7 published by the Free Software Foundation; either version 2 of the
8 License, or (at your option) any later version.
10 This program is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with this program; if not, write to the Free Software
17 Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA
27 #include "dictionary.h"
29 #include "file-handle.h"
35 #include "sfm-write.h"
42 /* Specifies how to make an aggregate variable.
   parse_aggregate_functions allocates one of these for every target
   variable and links it into the chain that starts at the agr_vars
   member of struct agr_proc. */
45 struct agr_var *next; /* Next in list. */
47 /* Collected during parsing. */
48 struct variable *src; /* Source variable. */
49 struct variable *dest; /* Target variable. */
50 int function; /* Function code; FSTRING is OR'd in when SRC is a string variable. */
51 int include_missing; /* 1=Include user-missing values. */
52 union value arg[2]; /* Arguments: .f for numeric sources, otherwise .c, an xstrdup'd copy. */
54 /* Accumulated during AGGREGATE execution. */
59 struct moments1 *moments; /* Fed by moments1_add; agr_destroy releases it when function is SD. */
62 /* Aggregation functions.
   The enumerators from NONE through NU_NO_VARS are listed in the same
   order as the rows of agr_func_tab, so a function code can be used
   directly as an index into that table. */
65 NONE, SUM, MEAN, SD, MAX, MIN, PGT, PLT, PIN, POUT, FGT, FLT, FIN,
66 FOUT, N, NU, NMISS, NUMISS, FIRST, LAST,
67 N_AGR_FUNCS, N_NO_VARS, NU_NO_VARS,
/* N_AGR_FUNCS lines up with the table row whose name is NULL;
   N_NO_VARS and NU_NO_VARS line up with the "N" and "NU" rows that
   come after it. */
68 FUNC = 0x1f, /* Function mask: `function & FUNC' drops FSTRING and leaves the table index. */
69 FSTRING = 1<<5, /* String function bit. */
72 /* Attributes of an aggregation function.
   One row of agr_func_tab. */
75 const char *name; /* Aggregation function name; compared with strcasecmp, and NULL stops the name search. */
76 int n_args; /* Number of arguments. */
77 int alpha_type; /* When given ALPHA arguments, output type. */
78 struct fmt_spec format; /* Format spec if alpha_type != ALPHA. */
81 /* Attributes of aggregation functions.
   Each row is {name, n_args, alpha_type, format}.  The name lookup in
   parse_aggregate_functions walks the table until it reaches the row
   whose name is NULL, so the two rows after that one can never be
   matched by name; the parser reaches them by switching func_index
   to N_NO_VARS or NU_NO_VARS. */
82 static const struct agr_func agr_func_tab[] =
84 {"<NONE>", 0, -1, {0, 0, 0}},
85 {"SUM", 0, -1, {FMT_F, 8, 2}},
86 {"MEAN", 0, -1, {FMT_F, 8, 2}},
87 {"SD", 0, -1, {FMT_F, 8, 2}},
88 {"MAX", 0, ALPHA, {-1, -1, -1}},
89 {"MIN", 0, ALPHA, {-1, -1, -1}},
90 {"PGT", 1, NUMERIC, {FMT_F, 5, 1}},
91 {"PLT", 1, NUMERIC, {FMT_F, 5, 1}},
92 {"PIN", 2, NUMERIC, {FMT_F, 5, 1}},
93 {"POUT", 2, NUMERIC, {FMT_F, 5, 1}},
94 {"FGT", 1, NUMERIC, {FMT_F, 5, 3}},
95 {"FLT", 1, NUMERIC, {FMT_F, 5, 3}},
96 {"FIN", 2, NUMERIC, {FMT_F, 5, 3}},
97 {"FOUT", 2, NUMERIC, {FMT_F, 5, 3}},
98 {"N", 0, NUMERIC, {FMT_F, 7, 0}},
99 {"NU", 0, NUMERIC, {FMT_F, 7, 0}},
100 {"NMISS", 0, NUMERIC, {FMT_F, 7, 0}},
101 {"NUMISS", 0, NUMERIC, {FMT_F, 7, 0}},
102 {"FIRST", 0, ALPHA, {-1, -1, -1}},
103 {"LAST", 0, ALPHA, {-1, -1, -1}},
104 {NULL, 0, -1, {-1, -1, -1}},
105 {"N", 0, NUMERIC, {FMT_F, 7, 0}},
106 {"NU", 0, NUMERIC, {FMT_F, 7, 0}},
109 /* Missing value types.
   The procedure starts out with ITEMWISE; the MISSING subcommand
   accepts only COLUMNWISE and switches to it. */
110 enum missing_treatment
112 ITEMWISE, /* Missing values item by item. */
113 COLUMNWISE /* Missing values column by column. */
116 /* An entire AGGREGATE procedure.
   The command parser fills this in, and the per-case callbacks
   receive a pointer to it as their auxiliary argument. */
119 /* We have either an output file or a sink. */
120 struct sfm_writer *writer; /* Output file, or null if none. */
121 struct case_sink *sink; /* Sink, or null if none. */
123 /* Break variables. */
124 struct sort_criteria *sort; /* Sort criteria. */
125 struct variable **break_vars; /* Break variables. */
126 size_t break_var_cnt; /* Number of break variables. */
127 union value *prev_break; /* Last values of break variables; allocated when the first case arrives. */
129 enum missing_treatment missing; /* How to treat missing values. */
130 struct agr_var *agr_vars; /* First aggregate variable. */
131 struct dictionary *dict; /* Aggregate dictionary. */
132 int case_cnt; /* Counts aggregated cases; nonzero at the end means a last group must still be dumped. */
133 struct ccase agr_case; /* Aggregate case for output. */
136 static void initialize_aggregate_info (struct agr_proc *); /* Resets all accumulators. */
139 static int parse_aggregate_functions (struct agr_proc *); /* Zero return means the parse failed. */
140 static void agr_destroy (struct agr_proc *);
141 static int aggregate_single_case (struct agr_proc *agr,
142 const struct ccase *input,
143 struct ccase *output); /* Nonzero return: OUTPUT holds a finished group. */
144 static void dump_aggregate_info (struct agr_proc *agr, struct ccase *output);
146 /* Aggregating to the active file.
   Passed to procedure(); AUX is the struct agr_proc. */
147 static int agr_to_active_file (struct ccase *, void *aux);
149 /* Aggregating to a system file.
   Passed to procedure() when the active file needs no sorting; AUX is
   the struct agr_proc. */
150 static int presorted_agr_to_sysfile (struct ccase *, void *aux);
154 /* Parses and executes the AGGREGATE procedure.
   Syntax is consumed in a fixed order: OUTFILE first (`*' selects the
   active file, anything else is parsed as a file handle), then the
   MISSING, DOCUMENT and PRESORTED options, then BREAK, and finally
   the aggregate function specifications.  Execution then either
   collects the aggregated cases in a storage sink that becomes the
   new active file, or writes them to a system file. */
159 struct file_handle *out_file = NULL;
161 bool copy_documents = false;
162 bool presorted = false;
/* Start from an all-zero procedure so that every pointer member is
   null until it is explicitly set. */
165 memset(&agr, 0 , sizeof (agr));
166 agr.missing = ITEMWISE;
/* The aggregate dictionary begins with the active file's label and
   documents. */
168 agr.dict = dict_create ();
169 dict_set_label (agr.dict, dict_get_label (default_dict));
170 dict_set_documents (agr.dict, dict_get_documents (default_dict));
172 /* OUTFILE subcommand must be first. */
173 if (!lex_force_match_id ("OUTFILE"))
/* `*' means the active file, in which case out_file stays null. */
176 if (!lex_match ('*'))
178 out_file = fh_parse ();
179 if (out_file == NULL)
183 /* Read most of the subcommands. */
188 if (lex_match_id ("MISSING"))
/* COLUMNWISE is the only keyword accepted after MISSING. */
191 if (!lex_match_id ("COLUMNWISE"))
193 lex_error (_("while expecting COLUMNWISE"));
196 agr.missing = COLUMNWISE;
198 else if (lex_match_id ("DOCUMENT"))
199 copy_documents = true;
200 else if (lex_match_id ("PRESORTED"))
202 else if (lex_match_id ("BREAK"))
/* The break variables double as the sort criteria. */
207 agr.sort = sort_parse_criteria (default_dict,
208 &agr.break_vars, &agr.break_var_cnt,
210 if (agr.sort == NULL)
/* Each break variable is cloned into the aggregate dictionary. */
213 for (i = 0; i < agr.break_var_cnt; i++)
215 struct variable *v = dict_clone_var (agr.dict, agr.break_vars[i],
216 agr.break_vars[i]->name,
217 agr.break_vars[i]->longname
222 /* BREAK must follow the options. */
227 lex_error (_("expecting BREAK"));
231 if (presorted && saw_direction)
232 msg (SW, _("When PRESORTED is specified, specifying sorting directions "
233 "with (A) or (D) has no effect. Output data will be sorted "
234 "the same way as the input data."));
236 /* Read in the aggregate functions. */
238 if (!parse_aggregate_functions (&agr))
241 /* Delete documents. */
243 dict_set_documents (agr.dict, NULL);
245 /* Cancel SPLIT FILE. */
246 dict_set_split_vars (agr.dict, NULL, 0);
/* The output case is sized from the aggregate dictionary, and the
   accumulators are reset before the first input case. */
250 case_create (&agr.agr_case, dict_get_next_value_idx (agr.dict));
251 initialize_aggregate_info (&agr);
253 /* Output to active file or external file? */
254 if (out_file == NULL)
256 /* The active file will be replaced by the aggregated data,
257 so TEMPORARY is moot. */
/* Sort on the break variables unless PRESORTED was given. */
260 if (agr.sort != NULL && !presorted)
261 sort_active_file_in_place (agr.sort);
/* Aggregated cases are written to a storage sink built on the
   aggregate dictionary; the procedure's own sink is a null sink. */
263 agr.sink = create_case_sink (&storage_sink_class, agr.dict, NULL);
264 if (agr.sink->class->open != NULL)
265 agr.sink->class->open (agr.sink);
266 vfm_sink = create_case_sink (&null_sink_class, default_dict, NULL);
267 procedure (agr_to_active_file, &agr);
/* The last break group has no following case to trigger its output,
   so it is dumped here. */
268 if (agr.case_cnt > 0)
270 dump_aggregate_info (&agr, &agr.agr_case);
271 agr.sink->class->write (agr.sink, &agr.agr_case);
/* The aggregate dictionary replaces the default dictionary, and the
   storage sink supplies the new source. */
273 dict_destroy (default_dict);
274 default_dict = agr.dict;
276 vfm_source = agr.sink->class->make_source (agr.sink);
277 free_case_sink (agr.sink);
281 agr.writer = sfm_open_writer (out_file, agr.dict, get_scompression (), 0);
282 if (agr.writer == NULL)
285 if (agr.sort != NULL && !presorted)
287 /* Sorting is needed. */
288 struct casefile *dst;
289 struct casereader *reader;
/* Sort into a separate casefile, then aggregate while reading it
   back. */
292 dst = sort_active_file_to_casefile (agr.sort);
295 reader = casefile_get_destructive_reader (dst);
296 while (casereader_read_xfer (reader, &c))
298 if (aggregate_single_case (&agr, &c, &agr.agr_case))
299 sfm_write_case (agr.writer, &agr.agr_case);
302 casereader_destroy (reader);
303 casefile_destroy (dst);
307 /* Active file is already sorted. */
308 procedure (presorted_agr_to_sysfile, &agr);
/* Write out the final break group, if any case was read. */
311 if (agr.case_cnt > 0)
313 dump_aggregate_info (&agr, &agr.agr_case);
314 sfm_write_case (agr.writer, &agr.agr_case);
326 /* Parse all the aggregate functions.
   Each specification is a list of target variables (each optionally
   followed by a string label), an `=', a function name, and usually a
   parenthesized list of source variables plus any value arguments
   the function takes.  Specifications are separated by `/'.
   AGR receives one struct agr_var per target variable, and the
   target variables are created in AGR's dictionary. */
328 parse_aggregate_functions (struct agr_proc *agr)
330 struct agr_var *tail; /* Tail of linked list starting at agr->agr_vars. */
332 /* Parse everything. */
341 const struct agr_func *function;
346 struct variable **src;
360 /* Parse the list of target variables. */
361 while (!lex_match ('='))
363 int n_dest_prev = n_dest;
365 if (!parse_DATA_LIST_vars (&dest, &n_dest,
366 PV_APPEND | PV_SINGLE | PV_NO_SCRATCH))
369 /* Assign empty labels. */
373 dest_label = xrealloc (dest_label, sizeof *dest_label * n_dest);
374 for (j = n_dest_prev; j < n_dest; j++)
375 dest_label[j] = NULL;
/* A string token here is the label for the most recently parsed
   target; it is cut to 255 characters before being copied. */
378 if (token == T_STRING)
380 ds_truncate (&tokstr, 255);
381 dest_label[n_dest - 1] = xstrdup (ds_c_str (&tokstr));
386 /* Get the name of the aggregation function. */
389 lex_error (_("expecting aggregation function"));
/* A trailing `.' is removed from the name before the table lookup. */
394 if (tokid[strlen (tokid) - 1] == '.')
397 tokid[strlen (tokid) - 1] = 0;
/* Case-insensitive search that ends at the NULL-name row. */
400 for (function = agr_func_tab; function->name; function++)
401 if (!strcasecmp (function->name, tokid))
403 if (NULL == function->name)
405 msg (SE, _("Unknown aggregation function %s."), tokid);
/* The row's position in the table is also its function code. */
408 func_index = function - agr_func_tab;
411 /* Check for leading lparen. */
412 if (!lex_match ('('))
415 func_index = N_NO_VARS;
416 else if (func_index == NU)
417 func_index = NU_NO_VARS;
420 lex_error (_("expecting `('"));
426 /* Parse list of source variables. */
428 int pv_opts = PV_NO_SCRATCH;
/* SUM, MEAN and SD take numeric sources only; functions with value
   arguments need all their sources to be of one type. */
430 if (func_index == SUM || func_index == MEAN || func_index == SD)
431 pv_opts |= PV_NUMERIC;
432 else if (function->n_args)
433 pv_opts |= PV_SAME_TYPE;
435 if (!parse_variables (default_dict, &src, &n_src, pv_opts))
439 /* Parse function arguments, for those functions that
440 require arguments. */
441 if (function->n_args != 0)
442 for (i = 0; i < function->n_args; i++)
447 if (token == T_STRING)
449 arg[i].c = xstrdup (ds_c_str (&tokstr));
452 else if (lex_is_number ())
457 msg (SE, _("Missing argument %d to %s."), i + 1, function->name);
463 if (type != src[0]->type)
465 msg (SE, _("Arguments to %s must be of same type as "
466 "source variables."),
472 /* Trailing rparen. */
475 lex_error (_("expecting `)'"));
479 /* Now check that the number of source variables match
480 the number of target variables. If we check earlier
481 than this, the user can get a very misleading error
482 message, i.e. `AGGREGATE x=SUM(y t).' will get this
483 error message when a proper message would be more
484 like `unknown variable t'. */
487 msg (SE, _("Number of source variables (%d) does not match "
488 "number of target variables (%d)."),
/* The range functions take a low and a high bound; bounds given in
   the wrong order draw the warning below instead of an error. */
493 if ((func_index == PIN || func_index == POUT
494 || func_index == FIN || func_index == FOUT)
495 && ((src[0]->type == NUMERIC && arg[0].f > arg[1].f)
496 || (src[0]->type == ALPHA
497 && st_compare_pad (arg[0].c, strlen (arg[0].c),
498 arg[1].c, strlen (arg[1].c)) > 0)))
500 union value t = arg[0];
504 msg (SW, _("The value arguments passed to the %s function "
505 "are out-of-order. They will be treated as if "
506 "they had been specified in the correct order."),
511 /* Finally add these to the linked list of aggregation
513 for (i = 0; i < n_dest; i++)
515 struct agr_var *v = xmalloc (sizeof *v);
517 /* Add variable to chain. */
518 if (agr->agr_vars != NULL)
526 /* Create the target variable in the aggregate
529 static const struct fmt_spec f8_2 = {FMT_F, 8, 2};
530 struct variable *destvar;
532 v->function = func_index;
538 if (src[i]->type == ALPHA)
540 v->function |= FSTRING;
541 v->string = xmalloc (src[i]->width);
544 if (function->alpha_type == ALPHA)
545 destvar = dict_clone_var (agr->dict, v->src, 0, dest[i] );
546 else if (v->src->type == NUMERIC
547 || function->alpha_type == NUMERIC)
549 destvar = dict_create_var (agr->dict, dest[i], 0);
552 if ((func_index == N || func_index == NMISS)
553 && dict_get_weight (default_dict) != NULL)
554 destvar->print = destvar->write = f8_2;
556 destvar->print = destvar->write = function->format;
561 destvar = dict_create_var (agr->dict, dest[i], 0);
562 if (func_index == N_NO_VARS
563 && dict_get_weight (default_dict) != NULL)
564 destvar->print = destvar->write = f8_2;
566 destvar->print = destvar->write = function->format;
571 msg (SE, _("Variable name %s is not unique within the "
572 "aggregate file dictionary, which contains "
573 "the aggregate variables and the break "
583 destvar->label = dest_label[i];
584 dest_label[i] = NULL;
590 v->include_missing = include_missing;
596 if (v->src->type == NUMERIC)
597 for (j = 0; j < function->n_args; j++)
598 v->arg[j].f = arg[j].f;
600 for (j = 0; j < function->n_args; j++)
601 v->arg[j].c = xstrdup (arg[j].c);
605 if (src != NULL && src[0]->type == ALPHA)
606 for (i = 0; i < function->n_args; i++)
616 if (!lex_match ('/'))
621 lex_error ("expecting end of command");
627 for (i = 0; i < n_dest; i++)
630 free (dest_label[i]);
636 if (src && n_src && src[0]->type == ALPHA)
637 for (i = 0; i < function->n_args; i++)
650 agr_destroy (struct agr_proc *agr)
/* Releases what AGR owns: the system file writer, the sort criteria,
   the break variable array, the saved break values, per-variable
   argument strings and moment accumulators, the aggregate dictionary
   (when AGR still points at one), and the output case. */
652 struct agr_var *iter, *next;
/* NOTE(review): the writer is null when output goes to the active
   file, so this relies on sfm_close_writer accepting null -- confirm. */
654 sfm_close_writer (agr->writer);
655 if (agr->sort != NULL)
656 sort_destroy_criteria (agr->sort);
657 free (agr->break_vars);
658 free (agr->prev_break);
659 for (iter = agr->agr_vars; iter; iter = next)
/* String functions hold xstrdup'd argument copies; the argument
   count comes from the table row selected by the masked code. */
663 if (iter->function & FSTRING)
668 n_args = agr_func_tab[iter->function & FUNC].n_args;
669 for (i = 0; i < n_args; i++)
670 free (iter->arg[i].c);
/* Numeric SD is the only other case with something to release. */
673 else if (iter->function == SD)
674 moments1_destroy (iter->moments);
677 if (agr->dict != NULL)
678 dict_destroy (agr->dict);
680 case_destroy (&agr->agr_case);
/* Helpers used by aggregate_single_case: one adds a case to the
   running accumulators, the other turns them into an output case. */
685 static void accumulate_aggregate_info (struct agr_proc *,
686 const struct ccase *);
687 static void dump_aggregate_info (struct agr_proc *, struct ccase *);
689 /* Processes a single case INPUT for aggregation. If output is
690 warranted, writes it to OUTPUT and returns nonzero.
691 Otherwise, returns zero and OUTPUT is unmodified.
   Output is warranted when INPUT's break values differ from those
   saved in AGR from the previous case; OUTPUT then describes the
   group that just ended, and INPUT starts the next one. */
693 aggregate_single_case (struct agr_proc *agr,
694 const struct ccase *input, struct ccase *output)
696 /* The first case always begins a new break group. We also need to
697 preserve the values of the case for later comparison. */
698 if (agr->case_cnt++ == 0)
/* prev_break needs one union value for every value slot that the
   break variables occupy. */
705 for (i = 0; i < agr->break_var_cnt; i++)
706 n_elem += agr->break_vars[i]->nv;
709 agr->prev_break = xmalloc (sizeof *agr->prev_break * n_elem);
711 /* Copy INPUT into prev_break. */
713 union value *iter = agr->prev_break;
716 for (i = 0; i < agr->break_var_cnt; i++)
718 struct variable *v = agr->break_vars[i];
/* Numeric values take one slot; string values are copied as WIDTH
   raw bytes. */
720 if (v->type == NUMERIC)
721 (iter++)->f = case_num (input, v->fv);
724 memcpy (iter->s, case_str (input, v->fv), v->width);
730 accumulate_aggregate_info (agr, input);
735 /* Compare the value of each break variable to the values on the
738 union value *iter = agr->prev_break;
741 for (i = 0; i < agr->break_var_cnt; i++)
743 struct variable *v = agr->break_vars[i];
748 if (case_num (input, v->fv) != iter->f)
753 if (memcmp (case_str (input, v->fv), iter->s, v->width))
763 accumulate_aggregate_info (agr, input);
768 /* The values of the break variable are different from the values on
769 the previous case. That means that it's time to dump aggregate
771 dump_aggregate_info (agr, output);
772 initialize_aggregate_info (agr);
773 accumulate_aggregate_info (agr, input);
775 /* Copy INPUT into prev_break. */
777 union value *iter = agr->prev_break;
780 for (i = 0; i < agr->break_var_cnt; i++)
782 struct variable *v = agr->break_vars[i];
/* Same layout as the copy made for the first case. */
784 if (v->type == NUMERIC)
785 (iter++)->f = case_num (input, v->fv);
788 memcpy (iter->s, case_str (input, v->fv), v->width);
797 /* Accumulates aggregation data from the case INPUT.
   Every aggregate variable in AGR is updated from its source value
   in INPUT, using the case weight taken from the default dictionary.
   For the functions that compare against arguments, dbl[0] collects
   the weight of the cases that satisfy the comparison and dbl[1]
   collects the weight of every case that reaches the comparison. */
799 accumulate_aggregate_info (struct agr_proc *agr,
800 const struct ccase *input)
802 struct agr_var *iter;
806 weight = dict_get_case_weight (default_dict, input, &bad_warn);
808 for (iter = agr->agr_vars; iter; iter = iter->next)
811 const union value *v = case_data (input, iter->src->fv);
/* Missing source values are handled first.  When user-missing
   values are not included, is_missing() decides; when they are
   included, a separate test applies to numeric sources. */
813 if ((!iter->include_missing && is_missing (v, iter->src))
814 || (iter->include_missing && iter->src->type == NUMERIC
817 switch (iter->function)
820 case NMISS | FSTRING:
821 iter->dbl[0] += weight;
824 case NUMISS | FSTRING:
832 /* This is horrible. There are too many possibilities. */
833 switch (iter->function)
/* Weighted sum. */
836 iter->dbl[0] += v->f * weight;
/* Weighted sum in dbl[0], total weight in dbl[1]. */
840 iter->dbl[0] += v->f * weight;
841 iter->dbl[1] += weight;
844 moments1_add (iter->moments, v->f, weight);
/* Running maximum: numeric, then the string form, which compares
   raw bytes over the source width. */
847 iter->dbl[0] = max (iter->dbl[0], v->f);
851 if (memcmp (iter->string, v->s, iter->src->width) < 0)
852 memcpy (iter->string, v->s, iter->src->width);
/* Running minimum, in the same two forms. */
856 iter->dbl[0] = min (iter->dbl[0], v->f);
860 if (memcmp (iter->string, v->s, iter->src->width) > 0)
861 memcpy (iter->string, v->s, iter->src->width);
/* Value greater than the argument. */
866 if (v->f > iter->arg[0].f)
867 iter->dbl[0] += weight;
868 iter->dbl[1] += weight;
872 if (memcmp (iter->arg[0].c, v->s, iter->src->width) < 0)
873 iter->dbl[0] += weight;
874 iter->dbl[1] += weight;
/* Value less than the argument. */
878 if (v->f < iter->arg[0].f)
879 iter->dbl[0] += weight;
880 iter->dbl[1] += weight;
884 if (memcmp (iter->arg[0].c, v->s, iter->src->width) > 0)
885 iter->dbl[0] += weight;
886 iter->dbl[1] += weight;
/* Value inside the closed range [arg[0], arg[1]]. */
890 if (iter->arg[0].f <= v->f && v->f <= iter->arg[1].f)
891 iter->dbl[0] += weight;
892 iter->dbl[1] += weight;
896 if (memcmp (iter->arg[0].c, v->s, iter->src->width) <= 0
897 && memcmp (iter->arg[1].c, v->s, iter->src->width) >= 0)
898 iter->dbl[0] += weight;
899 iter->dbl[1] += weight;
/* Value outside the closed range [arg[0], arg[1]]. */
903 if (iter->arg[0].f > v->f || v->f > iter->arg[1].f)
904 iter->dbl[0] += weight;
905 iter->dbl[1] += weight;
909 if (memcmp (iter->arg[0].c, v->s, iter->src->width) > 0
910 || memcmp (iter->arg[1].c, v->s, iter->src->width) < 0)
911 iter->dbl[0] += weight;
912 iter->dbl[1] += weight;
916 iter->dbl[0] += weight;
929 case FIRST | FSTRING:
932 memcpy (iter->string, v->s, iter->src->width);
941 memcpy (iter->string, v->s, iter->src->width);
945 case NMISS | FSTRING:
947 case NUMISS | FSTRING:
948 /* Our value is not missing or it would have been
949 caught earlier. Nothing to do. */
955 switch (iter->function)
958 iter->dbl[0] += weight;
969 /* We've come to a record that differs from the previous in one or
970 more of the break variables. Make an output record from the
971 accumulated statistics in the OUTPUT case.
   The saved break values are copied into OUTPUT first, and then each
   aggregate variable's result is stored at its destination
   variable's position. */
973 dump_aggregate_info (struct agr_proc *agr, struct ccase *output)
/* Break values come from prev_break, which holds the values of the
   group that is ending. */
979 for (i = 0; i < agr->break_var_cnt; i++)
981 int nv = agr->break_vars[i]->nv;
982 memcpy (case_data_rw (output, value_idx),
983 &agr->prev_break[value_idx],
984 sizeof (union value) * nv);
992 for (i = agr->agr_vars; i; i = i->next)
994 union value *v = case_data_rw (output, i->dest->fv);
/* Under COLUMNWISE, a nonzero missing indicator overrides the
   result for every function except the four counting functions;
   string destinations are filled with spaces. */
996 if (agr->missing == COLUMNWISE && i->missing != 0
997 && (i->function & FUNC) != N && (i->function & FUNC) != NU
998 && (i->function & FUNC) != NMISS && (i->function & FUNC) != NUMISS)
1000 if (i->dest->type == ALPHA)
1001 memset (v->s, ' ', i->dest->width);
1007 switch (i->function)
/* int1 selects between the accumulated value and SYSMIS.
   NOTE(review): presumably int1 records that a value was seen --
   confirm where it is set. */
1010 v->f = i->int1 ? i->dbl[0] : SYSMIS;
/* Weighted sum over total weight; SYSMIS when the weight is zero. */
1013 v->f = i->dbl[1] != 0.0 ? i->dbl[0] / i->dbl[1] : SYSMIS;
1019 /* FIXME: we should use two passes. */
1020 moments1_calculate (i->moments, NULL, NULL, &variance,
1022 if (variance != SYSMIS)
1023 v->f = sqrt (variance);
1030 v->f = i->int1 ? i->dbl[0] : SYSMIS;
1035 memcpy (v->s, i->string, i->dest->width);
1037 memset (v->s, ' ', i->dest->width);
/* Fraction of the weight that satisfied the comparison. */
1046 case FOUT | FSTRING:
1047 v->f = i->dbl[1] ? i->dbl[0] / i->dbl[1] : SYSMIS;
/* The same ratio scaled to a percentage. */
1056 case POUT | FSTRING:
1057 v->f = i->dbl[1] ? i->dbl[0] / i->dbl[1] * 100.0 : SYSMIS;
1069 v->f = i->int1 ? i->dbl[0] : SYSMIS;
1071 case FIRST | FSTRING:
1072 case LAST | FSTRING:
1074 memcpy (v->s, i->string, i->dest->width);
1076 memset (v->s, ' ', i->dest->width);
1085 case NMISS | FSTRING:
1089 case NUMISS | FSTRING:
1099 /* Resets the state for all the aggregate functions.
   Called once before the first case and again each time a break
   group has been dumped. */
1101 initialize_aggregate_info (struct agr_proc *agr)
1103 struct agr_var *iter;
1105 for (iter = agr->agr_vars; iter; iter = iter->next)
/* Common reset; the switch below then overrides the starting values
   for the functions that need something other than zero. */
1108 iter->dbl[0] = iter->dbl[1] = iter->dbl[2] = 0.0;
1109 iter->int1 = iter->int2 = 0;
1110 switch (iter->function)
/* DBL_MAX and all-255 bytes are starting points that the first
   value replaces under min() and the `> 0' memcmp test --
   presumably the MIN cases. */
1113 iter->dbl[0] = DBL_MAX;
1116 memset (iter->string, 255, iter->src->width);
/* -DBL_MAX and all-zero bytes are the matching starting points for
   max() and the `< 0' memcmp test -- presumably the MAX cases. */
1119 iter->dbl[0] = -DBL_MAX;
1122 memset (iter->string, 0, iter->src->width);
/* The moment accumulator is created on first use and cleared on
   later resets. */
1125 if (iter->moments == NULL)
1126 iter->moments = moments1_create (MOMENT_VARIANCE);
1128 moments1_clear (iter->moments);
/* Callback passed to procedure() when the aggregated data replaces
   the active file.  C is the current input case and AGR_ is the
   struct agr_proc.  When aggregate_single_case reports that a break
   group has ended, the aggregate case is written to AGR's sink. */
1136 /* Aggregate each case as it comes through. Cases which aren't needed
1139 agr_to_active_file (struct ccase *c, void *agr_)
1141 struct agr_proc *agr = agr_;
1143 if (aggregate_single_case (agr, c, &agr->agr_case))
1144 agr->sink->class->write (agr->sink, &agr->agr_case);
/* Callback passed to procedure() when output goes to a system file
   and the active file needs no sorting.  C is the current input case
   and AGR_ is the struct agr_proc.  When aggregate_single_case
   reports that a break group has ended, the aggregate case is
   written with AGR's system file writer. */
1149 /* Aggregate the current case and output it if we passed a
1152 presorted_agr_to_sysfile (struct ccase *c, void *agr_)
1154 struct agr_proc *agr = agr_;
1156 if (aggregate_single_case (agr, c, &agr->agr_case))
1157 sfm_write_case (agr->writer, &agr->agr_case);