1 /* PSPP - computes sample statistics.
2 Copyright (C) 1997-9, 2000 Free Software Foundation, Inc.
3 Written by Ben Pfaff <blp@gnu.org>.
5 This program is free software; you can redistribute it and/or
6 modify it under the terms of the GNU General Public License as
7 published by the Free Software Foundation; either version 2 of the
8 License, or (at your option) any later version.
10 This program is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with this program; if not, write to the Free Software
17 Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA
28 #include "file-handle.h"
41 /* Specifies how to make an aggregate variable. */
44 struct agr_var *next; /* Next in list. */
46 /* Collected during parsing. */
47 struct variable *src; /* Source variable. */
48 struct variable *dest; /* Target variable. */
49 int function; /* Function. */
50 int include_missing; /* 1=Include user-missing values. */
51 union value arg[2]; /* Arguments. */
53 /* Accumulated during AGGREGATE execution. */
58 struct moments1 *moments;
61 /* Aggregation functions. */
64 NONE, SUM, MEAN, SD, MAX, MIN, PGT, PLT, PIN, POUT, FGT, FLT, FIN,
65 FOUT, N, NU, NMISS, NUMISS, FIRST, LAST,
66 N_AGR_FUNCS, N_NO_VARS, NU_NO_VARS,
67 FUNC = 0x1f, /* Function mask. */
68 FSTRING = 1<<5, /* String function bit. */
71 /* Attributes of an aggregation function. */
74 const char *name; /* Aggregation function name. */
75 int n_args; /* Number of arguments. */
76 int alpha_type; /* When given ALPHA arguments, output type. */
77 struct fmt_spec format; /* Format spec if alpha_type != ALPHA. */
80 /* Attributes of aggregation functions. */
81 static const struct agr_func agr_func_tab[] =
83 {"<NONE>", 0, -1, {0, 0, 0}},
84 {"SUM", 0, -1, {FMT_F, 8, 2}},
85 {"MEAN", 0, -1, {FMT_F, 8, 2}},
86 {"SD", 0, -1, {FMT_F, 8, 2}},
87 {"MAX", 0, ALPHA, {-1, -1, -1}},
88 {"MIN", 0, ALPHA, {-1, -1, -1}},
89 {"PGT", 1, NUMERIC, {FMT_F, 5, 1}},
90 {"PLT", 1, NUMERIC, {FMT_F, 5, 1}},
91 {"PIN", 2, NUMERIC, {FMT_F, 5, 1}},
92 {"POUT", 2, NUMERIC, {FMT_F, 5, 1}},
93 {"FGT", 1, NUMERIC, {FMT_F, 5, 3}},
94 {"FLT", 1, NUMERIC, {FMT_F, 5, 3}},
95 {"FIN", 2, NUMERIC, {FMT_F, 5, 3}},
96 {"FOUT", 2, NUMERIC, {FMT_F, 5, 3}},
97 {"N", 0, NUMERIC, {FMT_F, 7, 0}},
98 {"NU", 0, NUMERIC, {FMT_F, 7, 0}},
99 {"NMISS", 0, NUMERIC, {FMT_F, 7, 0}},
100 {"NUMISS", 0, NUMERIC, {FMT_F, 7, 0}},
101 {"FIRST", 0, ALPHA, {-1, -1, -1}},
102 {"LAST", 0, ALPHA, {-1, -1, -1}},
103 {NULL, 0, -1, {-1, -1, -1}},
104 {"N", 0, NUMERIC, {FMT_F, 7, 0}},
105 {"NU", 0, NUMERIC, {FMT_F, 7, 0}},
108 /* Missing value types. */
109 enum missing_treatment
111 ITEMWISE, /* Missing values item by item. */
112 COLUMNWISE /* Missing values column by column. */
115 /* An entire AGGREGATE procedure. */
118 /* We have either an output file or a sink. */
119 struct file_handle *out_file; /* Output file, or null if none. */
120 struct case_sink *sink; /* Sink, or null if none. */
122 /* Break variables. */
123 struct sort_criteria *sort; /* Sort criteria. */
124 struct variable **break_vars; /* Break variables. */
125 size_t break_var_cnt; /* Number of break variables. */
126 union value *prev_break; /* Last values of break variables. */
128 enum missing_treatment missing; /* How to treat missing values. */
129 struct agr_var *agr_vars; /* First aggregate variable. */
130 struct dictionary *dict; /* Aggregate dictionary. */
131 int case_cnt; /* Counts aggregated cases. */
132 struct ccase agr_case; /* Aggregate case for output. */
133 flt64 *sfm_agr_case; /* Aggregate case in SFM format. */
136 static void initialize_aggregate_info (struct agr_proc *);
139 static int parse_aggregate_functions (struct agr_proc *);
140 static void agr_destroy (struct agr_proc *);
141 static int aggregate_single_case (struct agr_proc *agr,
142 const struct ccase *input,
143 struct ccase *output);
144 static void dump_aggregate_info (struct agr_proc *agr, struct ccase *output);
145 static int create_sysfile (struct agr_proc *);
147 /* Aggregating to the active file. */
148 static int agr_to_active_file (struct ccase *, void *aux);
150 /* Aggregating to a system file. */
151 static void write_case_to_sfm (struct agr_proc *agr);
152 static int presorted_agr_to_sysfile (struct ccase *, void *aux);
156 /* Parses and executes the AGGREGATE procedure. */
162 /* Have we seen these subcommands? */
167 agr.missing = ITEMWISE;
169 agr.break_vars = NULL;
173 agr.prev_break = NULL;
175 agr.dict = dict_create ();
176 dict_set_label (agr.dict, dict_get_label (default_dict));
177 dict_set_documents (agr.dict, dict_get_documents (default_dict));
179 /* Read most of the subcommands. */
184 if (lex_match_id ("OUTFILE"))
188 msg (SE, _("%s subcommand given multiple times."),"OUTFILE");
198 agr.out_file = fh_parse_file_handle ();
199 if (agr.out_file == NULL)
203 else if (lex_match_id ("MISSING"))
206 if (!lex_match_id ("COLUMNWISE"))
208 lex_error (_("while expecting COLUMNWISE"));
211 agr.missing = COLUMNWISE;
213 else if (lex_match_id ("DOCUMENT"))
215 else if (lex_match_id ("PRESORTED"))
217 else if (lex_match_id ("BREAK"))
223 msg (SE, _("%s subcommand given multiple times."),"BREAK");
229 agr.sort = sort_parse_criteria (default_dict,
230 &agr.break_vars, &agr.break_var_cnt);
231 if (agr.sort == NULL)
234 for (i = 0; i < agr.break_var_cnt; i++)
236 struct variable *v = dict_clone_var (agr.dict, agr.break_vars[i],
237 agr.break_vars[i]->name);
244 /* Check for proper syntax. */
246 msg (SW, _("BREAK subcommand not specified."));
248 /* Read in the aggregate functions. */
249 if (!parse_aggregate_functions (&agr))
252 /* Delete documents. */
254 dict_set_documents (agr.dict, NULL);
256 /* Cancel SPLIT FILE. */
257 dict_set_split_vars (agr.dict, NULL, 0);
261 case_create (&agr.agr_case, dict_get_next_value_idx (agr.dict));
262 initialize_aggregate_info (&agr);
264 /* Output to active file or external file? */
265 if (agr.out_file == NULL)
267 /* The active file will be replaced by the aggregated data,
268 so TEMPORARY is moot. */
271 if (agr.sort != NULL && (seen & 4) == 0)
272 sort_active_file_in_place (agr.sort);
274 agr.sink = create_case_sink (&storage_sink_class, agr.dict, NULL);
275 if (agr.sink->class->open != NULL)
276 agr.sink->class->open (agr.sink);
277 vfm_sink = create_case_sink (&null_sink_class, default_dict, NULL);
278 procedure (agr_to_active_file, &agr);
279 if (agr.case_cnt > 0)
281 dump_aggregate_info (&agr, &agr.agr_case);
282 agr.sink->class->write (agr.sink, &agr.agr_case);
284 dict_destroy (default_dict);
285 default_dict = agr.dict;
287 vfm_source = agr.sink->class->make_source (agr.sink);
288 free_case_sink (agr.sink);
292 if (!create_sysfile (&agr))
295 if (agr.sort != NULL && (seen & 4) == 0)
297 /* Sorting is needed. */
298 struct casefile *dst;
299 struct casereader *reader;
302 dst = sort_active_file_to_casefile (agr.sort);
305 reader = casefile_get_destructive_reader (dst);
306 while (casereader_read_xfer (reader, &c))
308 if (aggregate_single_case (&agr, &c, &agr.agr_case))
309 write_case_to_sfm (&agr);
312 casereader_destroy (reader);
313 casefile_destroy (dst);
317 /* Active file is already sorted. */
318 procedure (presorted_agr_to_sysfile, &agr);
321 if (agr.case_cnt > 0)
323 dump_aggregate_info (&agr, &agr.agr_case);
324 write_case_to_sfm (&agr);
326 fh_close_handle (agr.out_file);
337 /* Create a system file for use in aggregation to an external
340 create_sysfile (struct agr_proc *agr)
342 struct sfm_write_info w;
345 w.compress = get_scompression();
346 if (!sfm_write_dictionary (&w))
349 agr->sfm_agr_case = xmalloc (sizeof *agr->sfm_agr_case * w.case_size);
354 /* Parse all the aggregate functions. */
356 parse_aggregate_functions (struct agr_proc *agr)
358 struct agr_var *tail; /* Tail of linked list starting at agr->vars. */
360 /* Parse everything. */
369 const struct agr_func *function;
374 struct variable **src;
388 /* Parse the list of target variables. */
389 while (!lex_match ('='))
391 int n_dest_prev = n_dest;
393 if (!parse_DATA_LIST_vars (&dest, &n_dest, PV_APPEND | PV_SINGLE | PV_NO_SCRATCH))
396 /* Assign empty labels. */
400 dest_label = xrealloc (dest_label, sizeof *dest_label * n_dest);
401 for (j = n_dest_prev; j < n_dest; j++)
402 dest_label[j] = NULL;
405 if (token == T_STRING)
407 ds_truncate (&tokstr, 255);
408 dest_label[n_dest - 1] = xstrdup (ds_c_str (&tokstr));
413 /* Get the name of the aggregation function. */
416 lex_error (_("expecting aggregation function"));
421 if (tokid[strlen (tokid) - 1] == '.')
424 tokid[strlen (tokid) - 1] = 0;
427 for (function = agr_func_tab; function->name; function++)
428 if (!strcmp (function->name, tokid))
430 if (NULL == function->name)
432 msg (SE, _("Unknown aggregation function %s."), tokid);
435 func_index = function - agr_func_tab;
438 /* Check for leading lparen. */
439 if (!lex_match ('('))
442 func_index = N_NO_VARS;
443 else if (func_index == NU)
444 func_index = NU_NO_VARS;
447 lex_error (_("expecting `('"));
451 /* Parse list of source variables. */
453 int pv_opts = PV_NO_SCRATCH;
455 if (func_index == SUM || func_index == MEAN || func_index == SD)
456 pv_opts |= PV_NUMERIC;
457 else if (function->n_args)
458 pv_opts |= PV_SAME_TYPE;
460 if (!parse_variables (default_dict, &src, &n_src, pv_opts))
464 /* Parse function arguments, for those functions that
465 require arguments. */
466 if (function->n_args != 0)
467 for (i = 0; i < function->n_args; i++)
472 if (token == T_STRING)
474 arg[i].c = xstrdup (ds_c_str (&tokstr));
477 else if (token == T_NUM)
482 msg (SE, _("Missing argument %d to %s."), i + 1, function->name);
488 if (type != src[0]->type)
490 msg (SE, _("Arguments to %s must be of same type as "
491 "source variables."),
497 /* Trailing rparen. */
500 lex_error (_("expecting `)'"));
504 /* Now check that the number of source variables match the
505 number of target variables. Do this here because if we
506 do it earlier then the user can get very misleading error
507 messages; i.e., `AGGREGATE x=SUM(y t).' will get this
508 error message when a proper message would be more like
509 `unknown variable t'. */
512 msg (SE, _("Number of source variables (%d) does not match "
513 "number of target variables (%d)."),
519 /* Finally add these to the linked list of aggregation
521 for (i = 0; i < n_dest; i++)
523 struct agr_var *v = xmalloc (sizeof *v);
525 /* Add variable to chain. */
526 if (agr->agr_vars != NULL)
534 /* Create the target variable in the aggregate
537 struct variable *destvar;
539 v->function = func_index;
547 if (src[i]->type == ALPHA)
549 v->function |= FSTRING;
550 v->string = xmalloc (src[i]->width);
553 if (v->src->type == NUMERIC || function->alpha_type == NUMERIC)
556 output_width = v->src->width;
558 if (function->alpha_type == ALPHA)
559 destvar = dict_clone_var (agr->dict, v->src, dest[i]);
562 destvar = dict_create_var (agr->dict, dest[i], output_width);
563 if (output_width == 0)
564 destvar->print = destvar->write = function->format;
565 if (output_width == 0 && dict_get_weight (default_dict) != NULL
566 && (func_index == N || func_index == N_NO_VARS
567 || func_index == NU || func_index == NU_NO_VARS))
569 struct fmt_spec f = {FMT_F, 8, 2};
571 destvar->print = destvar->write = f;
576 destvar = dict_create_var (agr->dict, dest[i], 0);
581 msg (SE, _("Variable name %s is not unique within the "
582 "aggregate file dictionary, which contains "
583 "the aggregate variables and the break "
594 destvar->label = dest_label[i];
595 dest_label[i] = NULL;
597 else if (function->alpha_type == ALPHA)
598 destvar->print = destvar->write = function->format;
603 v->include_missing = include_missing;
609 if (v->src->type == NUMERIC)
610 for (j = 0; j < function->n_args; j++)
611 v->arg[j].f = arg[j].f;
613 for (j = 0; j < function->n_args; j++)
614 v->arg[j].c = xstrdup (arg[j].c);
618 if (src != NULL && src[0]->type == ALPHA)
619 for (i = 0; i < function->n_args; i++)
629 if (!lex_match ('/'))
634 lex_error ("expecting end of command");
640 for (i = 0; i < n_dest; i++)
643 free (dest_label[i]);
649 if (src && n_src && src[0]->type == ALPHA)
650 for (i = 0; i < function->n_args; i++)
663 agr_destroy (struct agr_proc *agr)
665 struct agr_var *iter, *next;
667 if (agr->dict != NULL)
668 dict_destroy (agr->dict);
669 if (agr->sort != NULL)
670 sort_destroy_criteria (agr->sort);
671 free (agr->break_vars);
672 for (iter = agr->agr_vars; iter; iter = next)
676 if (iter->function & FSTRING)
681 n_args = agr_func_tab[iter->function & FUNC].n_args;
682 for (i = 0; i < n_args; i++)
683 free (iter->arg[i].c);
686 else if (iter->function == SD)
687 moments1_destroy (iter->moments);
690 free (agr->prev_break);
691 case_destroy (&agr->agr_case);
696 static void accumulate_aggregate_info (struct agr_proc *,
697 const struct ccase *);
698 static void dump_aggregate_info (struct agr_proc *, struct ccase *);
700 /* Processes a single case INPUT for aggregation. If output is
701 warranted, writes it to OUTPUT and returns nonzero.
702 Otherwise, returns zero and OUTPUT is unmodified. */
704 aggregate_single_case (struct agr_proc *agr,
705 const struct ccase *input, struct ccase *output)
707 /* The first case always begins a new break group. We also need to
708 preserve the values of the case for later comparison. */
709 if (agr->case_cnt++ == 0)
716 for (i = 0; i < agr->break_var_cnt; i++)
717 n_elem += agr->break_vars[i]->nv;
720 agr->prev_break = xmalloc (sizeof *agr->prev_break * n_elem);
722 /* Copy INPUT into prev_break. */
724 union value *iter = agr->prev_break;
727 for (i = 0; i < agr->break_var_cnt; i++)
729 struct variable *v = agr->break_vars[i];
731 if (v->type == NUMERIC)
732 (iter++)->f = case_num (input, v->fv);
735 memcpy (iter->s, case_str (input, v->fv), v->width);
741 accumulate_aggregate_info (agr, input);
746 /* Compare the value of each break variable to the values on the
749 union value *iter = agr->prev_break;
752 for (i = 0; i < agr->break_var_cnt; i++)
754 struct variable *v = agr->break_vars[i];
759 if (case_num (input, v->fv) != iter->f)
764 if (memcmp (case_str (input, v->fv), iter->s, v->width))
774 accumulate_aggregate_info (agr, input);
779 /* The values of the break variable are different from the values on
780 the previous case. That means that it's time to dump aggregate
782 dump_aggregate_info (agr, output);
783 initialize_aggregate_info (agr);
784 accumulate_aggregate_info (agr, input);
786 /* Copy INPUT into prev_break. */
788 union value *iter = agr->prev_break;
791 for (i = 0; i < agr->break_var_cnt; i++)
793 struct variable *v = agr->break_vars[i];
795 if (v->type == NUMERIC)
796 (iter++)->f = case_num (input, v->fv);
799 memcpy (iter->s, case_str (input, v->fv), v->width);
808 /* Accumulates aggregation data from the case INPUT. */
810 accumulate_aggregate_info (struct agr_proc *agr,
811 const struct ccase *input)
813 struct agr_var *iter;
817 weight = dict_get_case_weight (default_dict, input, &bad_warn);
819 for (iter = agr->agr_vars; iter; iter = iter->next)
822 const union value *v = case_data (input, iter->src->fv);
824 if ((!iter->include_missing && is_missing (v, iter->src))
825 || (iter->include_missing && iter->src->type == NUMERIC
828 switch (iter->function)
831 iter->dbl[0] += weight;
841 /* This is horrible. There are too many possibilities. */
842 switch (iter->function)
845 iter->dbl[0] += v->f;
848 iter->dbl[0] += v->f * weight;
849 iter->dbl[1] += weight;
852 moments1_add (iter->moments, v->f, weight);
855 iter->dbl[0] = max (iter->dbl[0], v->f);
859 if (memcmp (iter->string, v->s, iter->src->width) < 0)
860 memcpy (iter->string, v->s, iter->src->width);
864 iter->dbl[0] = min (iter->dbl[0], v->f);
868 if (memcmp (iter->string, v->s, iter->src->width) > 0)
869 memcpy (iter->string, v->s, iter->src->width);
874 if (v->f > iter->arg[0].f)
875 iter->dbl[0] += weight;
876 iter->dbl[1] += weight;
880 if (memcmp (iter->arg[0].c, v->s, iter->src->width) < 0)
881 iter->dbl[0] += weight;
882 iter->dbl[1] += weight;
886 if (v->f < iter->arg[0].f)
887 iter->dbl[0] += weight;
888 iter->dbl[1] += weight;
892 if (memcmp (iter->arg[0].c, v->s, iter->src->width) > 0)
893 iter->dbl[0] += weight;
894 iter->dbl[1] += weight;
898 if (iter->arg[0].f <= v->f && v->f <= iter->arg[1].f)
899 iter->dbl[0] += weight;
900 iter->dbl[1] += weight;
904 if (memcmp (iter->arg[0].c, v->s, iter->src->width) <= 0
905 && memcmp (iter->arg[1].c, v->s, iter->src->width) >= 0)
906 iter->dbl[0] += weight;
907 iter->dbl[1] += weight;
911 if (iter->arg[0].f > v->f || v->f > iter->arg[1].f)
912 iter->dbl[0] += weight;
913 iter->dbl[1] += weight;
917 if (memcmp (iter->arg[0].c, v->s, iter->src->width) > 0
918 && memcmp (iter->arg[1].c, v->s, iter->src->width) < 0)
919 iter->dbl[0] += weight;
920 iter->dbl[1] += weight;
923 iter->dbl[0] += weight;
935 case FIRST | FSTRING:
938 memcpy (iter->string, v->s, iter->src->width);
947 memcpy (iter->string, v->s, iter->src->width);
954 switch (iter->function)
957 iter->dbl[0] += weight;
968 /* We've come to a record that differs from the previous in one or
969 more of the break variables. Make an output record from the
970 accumulated statistics in the OUTPUT case. */
972 dump_aggregate_info (struct agr_proc *agr, struct ccase *output)
978 for (i = 0; i < agr->break_var_cnt; i++)
980 int nv = agr->break_vars[i]->nv;
981 memcpy (case_data_rw (output, value_idx),
982 &agr->prev_break[value_idx],
983 sizeof (union value) * nv);
991 for (i = agr->agr_vars; i; i = i->next)
993 union value *v = case_data_rw (output, i->dest->fv);
995 if (agr->missing == COLUMNWISE && i->missing != 0
996 && (i->function & FUNC) != N && (i->function & FUNC) != NU
997 && (i->function & FUNC) != NMISS && (i->function & FUNC) != NUMISS)
999 if (i->function & FSTRING)
1000 memset (v->s, ' ', i->dest->width);
1006 switch (i->function)
1012 v->f = i->dbl[1] != 0.0 ? i->dbl[0] / i->dbl[1] : SYSMIS;
1018 /* FIXME: we should use two passes. */
1019 moments1_calculate (i->moments, NULL, NULL, &variance,
1021 if (variance != SYSMIS)
1022 v->f = sqrt (variance);
1029 v->f = i->int1 ? i->dbl[0] : SYSMIS;
1034 memcpy (v->s, i->string, i->dest->width);
1036 memset (v->s, ' ', i->dest->width);
1041 case FOUT | FSTRING:
1042 v->f = i->int2 ? (double) i->int1 / (double) i->int2 : SYSMIS;
1048 v->f = i->dbl[1] ? i->dbl[0] / i->dbl[1] : SYSMIS;
1057 case POUT | FSTRING:
1058 v->f = i->dbl[1] ? i->dbl[0] / i->dbl[1] * 100.0 : SYSMIS;
1068 v->f = i->int1 ? i->dbl[0] : SYSMIS;
1070 case FIRST | FSTRING:
1071 case LAST | FSTRING:
1073 memcpy (v->s, i->string, i->dest->width);
1075 memset (v->s, ' ', i->dest->width);
1096 /* Resets the state for all the aggregate functions. */
1098 initialize_aggregate_info (struct agr_proc *agr)
1100 struct agr_var *iter;
1102 for (iter = agr->agr_vars; iter; iter = iter->next)
1105 switch (iter->function)
1108 iter->dbl[0] = DBL_MAX;
1111 memset (iter->string, 255, iter->src->width);
1114 iter->dbl[0] = -DBL_MAX;
1117 memset (iter->string, 0, iter->src->width);
1120 if (iter->moments == NULL)
1121 iter->moments = moments1_create (MOMENT_VARIANCE);
1123 moments1_clear (iter->moments);
1126 iter->dbl[0] = iter->dbl[1] = iter->dbl[2] = 0.0;
1127 iter->int1 = iter->int2 = 0;
1133 /* Aggregate each case as it comes through. Cases which aren't needed
1136 agr_to_active_file (struct ccase *c, void *agr_)
1138 struct agr_proc *agr = agr_;
1140 if (aggregate_single_case (agr, c, &agr->agr_case))
1141 agr->sink->class->write (agr->sink, &agr->agr_case);
1146 /* Writes AGR->agr_case to AGR->out_file. */
1148 write_case_to_sfm (struct agr_proc *agr)
1153 p = agr->sfm_agr_case;
1154 for (i = 0; i < dict_get_var_cnt (agr->dict); i++)
1156 struct variable *v = dict_get_var (agr->dict, i);
1158 if (v->type == NUMERIC)
1160 double src = case_num (&agr->agr_case, v->fv);
1168 memcpy (p, case_str (&agr->agr_case, v->fv), v->width);
1169 memset (&((char *) p)[v->width], ' ',
1170 REM_RND_UP (v->width, sizeof (flt64)));
1171 p += DIV_RND_UP (v->width, sizeof (flt64));
1175 sfm_write_case (agr->out_file, agr->sfm_agr_case, p - agr->sfm_agr_case);
1178 /* Aggregate the current case and output it if we passed a
1181 presorted_agr_to_sysfile (struct ccase *c, void *agr_)
1183 struct agr_proc *agr = agr_;
1185 if (aggregate_single_case (agr, c, &agr->agr_case))
1186 write_case_to_sfm (agr);