15376f62b88142c10fadbcba0720519df66a890c
[pspp] / src / language / stats / aggregate.c
1 /* PSPP - a program for statistical analysis.
2    Copyright (C) 1997-9, 2000, 2006, 2008, 2009, 2010 Free Software Foundation, Inc.
3
4    This program is free software: you can redistribute it and/or modify
5    it under the terms of the GNU General Public License as published by
6    the Free Software Foundation, either version 3 of the License, or
7    (at your option) any later version.
8
9    This program is distributed in the hope that it will be useful,
10    but WITHOUT ANY WARRANTY; without even the implied warranty of
11    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12    GNU General Public License for more details.
13
14    You should have received a copy of the GNU General Public License
15    along with this program.  If not, see <http://www.gnu.org/licenses/>. */
16
17 #include <config.h>
18
19 #include <stdlib.h>
20
21 #include <data/any-writer.h>
22 #include <data/case.h>
23 #include <data/casegrouper.h>
24 #include <data/casereader.h>
25 #include <data/casewriter.h>
26 #include <data/dictionary.h>
27 #include <data/file-handle-def.h>
28 #include <data/format.h>
29 #include <data/procedure.h>
30 #include <data/settings.h>
31 #include <data/subcase.h>
32 #include <data/sys-file-writer.h>
33 #include <data/variable.h>
34 #include <language/command.h>
35 #include <language/data-io/file-handle.h>
36 #include <language/lexer/lexer.h>
37 #include <language/lexer/variable-parser.h>
38 #include <language/stats/sort-criteria.h>
39 #include <libpspp/assertion.h>
40 #include <libpspp/message.h>
41 #include <libpspp/misc.h>
42 #include <libpspp/pool.h>
43 #include <libpspp/str.h>
44 #include <math/moments.h>
45 #include <math/sort.h>
46 #include <math/statistic.h>
47 #include <math/percentiles.h>
48
49 #include "aggregate.h"
50
51 #include "minmax.h"
52 #include "xalloc.h"
53
54 #include "gettext.h"
55 #define _(msgid) gettext (msgid)
56 #define N_(msgid) msgid
57
58 /* Argument for AGGREGATE function. */
59 union agr_argument
60   {
61     double f;                           /* Numeric. */
62     char *c;                            /* Short or long string. */
63   };
64
65 /* Specifies how to make an aggregate variable. */
66 struct agr_var
67   {
68     struct agr_var *next;               /* Next in list. */
69
70     /* Collected during parsing. */
71     const struct variable *src; /* Source variable. */
72     struct variable *dest;      /* Target variable. */
73     int function;               /* Function. */
74     enum mv_class exclude;      /* Classes of missing values to exclude. */
75     union agr_argument arg[2];  /* Arguments. */
76
77     /* Accumulated during AGGREGATE execution. */
78     double dbl[3];
79     int int1, int2;
80     char *string;
81     bool saw_missing;
82     struct moments1 *moments;
83     double cc;
84
85     struct variable *subject;
86     struct variable *weight;
87     struct casewriter *writer;
88   };
89
90
91 /* Attributes of aggregation functions. */
92 const struct agr_func agr_func_tab[] =
93   {
94     {"SUM",     N_("Sum of values"), AGR_SV_YES, 0, -1,          {FMT_F, 8, 2}},
95     {"MEAN",    N_("Mean average"), AGR_SV_YES, 0, -1,          {FMT_F, 8, 2}},
96     {"MEDIAN",  N_("Median average"), AGR_SV_YES, 0, -1,          {FMT_F, 8, 2}},
97     {"SD",      N_("Standard deviation"), AGR_SV_YES, 0, -1,          {FMT_F, 8, 2}},
98     {"MAX",     N_("Maximum value"), AGR_SV_YES, 0, VAL_STRING,  {-1, -1, -1}},
99     {"MIN",     N_("Minimum value"), AGR_SV_YES, 0, VAL_STRING,  {-1, -1, -1}},
100     {"PGT",     N_("Percentage greater than"), AGR_SV_YES, 1, VAL_NUMERIC, {FMT_F, 5, 1}},
101     {"PLT",     N_("Percentage less than"), AGR_SV_YES, 1, VAL_NUMERIC, {FMT_F, 5, 1}},
102     {"PIN",     N_("Percentage included in range"), AGR_SV_YES, 2, VAL_NUMERIC, {FMT_F, 5, 1}},
103     {"POUT",    N_("Percentage excluded from range"), AGR_SV_YES, 2, VAL_NUMERIC, {FMT_F, 5, 1}},
104     {"FGT",     N_("Fraction greater than"), AGR_SV_YES, 1, VAL_NUMERIC, {FMT_F, 5, 3}},
105     {"FLT",     N_("Fraction less than"), AGR_SV_YES, 1, VAL_NUMERIC, {FMT_F, 5, 3}},
106     {"FIN",     N_("Fraction included in range"), AGR_SV_YES, 2, VAL_NUMERIC, {FMT_F, 5, 3}},
107     {"FOUT",    N_("Fraction excluded from range"), AGR_SV_YES, 2, VAL_NUMERIC, {FMT_F, 5, 3}},
108     {"N",       N_("Number of cases"), AGR_SV_NO, 0, VAL_NUMERIC, {FMT_F, 7, 0}},
109     {"NU",      N_("Number of cases (unweighted)"), AGR_SV_OPT, 0, VAL_NUMERIC, {FMT_F, 7, 0}},
110     {"NMISS",   N_("Number of missing values"), AGR_SV_YES, 0, VAL_NUMERIC, {FMT_F, 7, 0}},
111     {"NUMISS",  N_("Number of missing values (unweighted)"), AGR_SV_YES, 0, VAL_NUMERIC, {FMT_F, 7, 0}},
112     {"FIRST",   N_("First non-missing value"), AGR_SV_YES, 0, VAL_STRING,  {-1, -1, -1}},
113     {"LAST",    N_("Last non-missing value"), AGR_SV_YES, 0, VAL_STRING,  {-1, -1, -1}},
114     {NULL,      NULL, AGR_SV_NO, 0, -1,          {-1, -1, -1}},
115   };
116
117 /* Missing value types. */
118 enum missing_treatment
119   {
120     ITEMWISE,           /* Missing values item by item. */
121     COLUMNWISE          /* Missing values column by column. */
122   };
123
124 /* An entire AGGREGATE procedure. */
125 struct agr_proc
126   {
127     /* Break variables. */
128     struct subcase sort;                /* Sort criteria (break variables). */
129     const struct variable **break_vars;       /* Break variables. */
130     size_t break_var_cnt;               /* Number of break variables. */
131
132     enum missing_treatment missing;     /* How to treat missing values. */
133     struct agr_var *agr_vars;           /* First aggregate variable. */
134     struct dictionary *dict;            /* Aggregate dictionary. */
135     const struct dictionary *src_dict;  /* Dict of the source */
136     int case_cnt;                       /* Counts aggregated cases. */
137
138     bool add_variables;                 /* True iff the aggregated variables should
139                                            be appended to the existing dictionary */
140   };
141
142 static void initialize_aggregate_info (struct agr_proc *);
143
144 static void accumulate_aggregate_info (struct agr_proc *,
145                                        const struct ccase *);
146 /* Prototypes. */
147 static bool parse_aggregate_functions (struct lexer *, const struct dictionary *,
148                                        struct agr_proc *);
149 static void agr_destroy (struct agr_proc *);
150 static void dump_aggregate_info (const struct agr_proc *agr,
151                                  struct casewriter *output,
152                                  const struct ccase *break_case);
153 \f
154 /* Parsing. */
155
156 /* Parses and executes the AGGREGATE procedure. */
157 int
158 cmd_aggregate (struct lexer *lexer, struct dataset *ds)
159 {
160   struct dictionary *dict = dataset_dict (ds);
161   struct agr_proc agr;
162   struct file_handle *out_file = NULL;
163   struct casereader *input = NULL, *group;
164   struct casegrouper *grouper;
165   struct casewriter *output = NULL;
166
167   bool copy_documents = false;
168   bool presorted = false;
169   bool saw_direction;
170   bool ok;
171
172   memset(&agr, 0 , sizeof (agr));
173   agr.missing = ITEMWISE;
174   agr.src_dict = dict;
175   subcase_init_empty (&agr.sort);
176
177   /* OUTFILE subcommand must be first. */
178   lex_match (lexer, '/');
179   if (!lex_force_match_id (lexer, "OUTFILE"))
180     goto error;
181   lex_match (lexer, '=');
182   if (!lex_match (lexer, '*'))
183     {
184       out_file = fh_parse (lexer, FH_REF_FILE | FH_REF_SCRATCH);
185       if (out_file == NULL)
186         goto error;
187     }
188
189   if (out_file == NULL && lex_match_id (lexer, "MODE"))
190     {
191       lex_match (lexer, '=');
192       if (lex_match_id (lexer, "ADDVARIABLES"))
193         {
194           agr.add_variables = true;
195
196           /* presorted is assumed in ADDVARIABLES mode */
197           presorted = true;
198         }
199       else if (lex_match_id (lexer, "REPLACE"))
200         {
201           agr.add_variables = false;
202         }
203       else
204         goto error;
205     }
206
207   if ( agr.add_variables )
208     agr.dict = dict_clone (dict);
209   else
210     agr.dict = dict_create ();    
211
212   dict_set_label (agr.dict, dict_get_label (dict));
213   dict_set_documents (agr.dict, dict_get_documents (dict));
214
215   /* Read most of the subcommands. */
216   for (;;)
217     {
218       lex_match (lexer, '/');
219
220       if (lex_match_id (lexer, "MISSING"))
221         {
222           lex_match (lexer, '=');
223           if (!lex_match_id (lexer, "COLUMNWISE"))
224             {
225               lex_error (lexer, _("while expecting COLUMNWISE"));
226               goto error;
227             }
228           agr.missing = COLUMNWISE;
229         }
230       else if (lex_match_id (lexer, "DOCUMENT"))
231         copy_documents = true;
232       else if (lex_match_id (lexer, "PRESORTED"))
233         presorted = true;
234       else if (lex_match_id (lexer, "BREAK"))
235         {
236           int i;
237
238           lex_match (lexer, '=');
239           if (!parse_sort_criteria (lexer, dict, &agr.sort, &agr.break_vars,
240                                     &saw_direction))
241             goto error;
242           agr.break_var_cnt = subcase_get_n_fields (&agr.sort);
243
244           if  (! agr.add_variables)
245             for (i = 0; i < agr.break_var_cnt; i++)
246               dict_clone_var_assert (agr.dict, agr.break_vars[i]);
247
248           /* BREAK must follow the options. */
249           break;
250         }
251       else
252         {
253           lex_error (lexer, _("expecting BREAK"));
254           goto error;
255         }
256     }
257   if (presorted && saw_direction)
258     msg (SW, _("When PRESORTED is specified, specifying sorting directions "
259                "with (A) or (D) has no effect.  Output data will be sorted "
260                "the same way as the input data."));
261
262   /* Read in the aggregate functions. */
263   lex_match (lexer, '/');
264   if (!parse_aggregate_functions (lexer, dict, &agr))
265     goto error;
266
267   /* Delete documents. */
268   if (!copy_documents)
269     dict_clear_documents (agr.dict);
270
271   /* Cancel SPLIT FILE. */
272   dict_set_split_vars (agr.dict, NULL, 0);
273
274   /* Initialize. */
275   agr.case_cnt = 0;
276
277   if (out_file == NULL)
278     {
279       /* The active file will be replaced by the aggregated data,
280          so TEMPORARY is moot. */
281       proc_cancel_temporary_transformations (ds);
282       proc_discard_output (ds);
283       output = autopaging_writer_create (dict_get_proto (agr.dict));
284     }
285   else
286     {
287       output = any_writer_open (out_file, agr.dict);
288       if (output == NULL)
289         goto error;
290     }
291
292   input = proc_open (ds);
293   if (!subcase_is_empty (&agr.sort) && !presorted)
294     {
295       input = sort_execute (input, &agr.sort);
296       subcase_clear (&agr.sort);
297     }
298
299   for (grouper = casegrouper_create_vars (input, agr.break_vars,
300                                           agr.break_var_cnt);
301        casegrouper_get_next_group (grouper, &group);
302        casereader_destroy (group))
303     {
304       struct casereader *placeholder = NULL;
305       struct ccase *c = casereader_peek (group, 0);
306
307       if (c == NULL)
308         {
309           casereader_destroy (group);
310           continue;
311         }
312
313       initialize_aggregate_info (&agr);
314
315       if ( agr.add_variables )
316         placeholder = casereader_clone (group);
317
318       {
319         struct ccase *cg;
320         for (; (cg = casereader_read (group)) != NULL; case_unref (cg))
321           accumulate_aggregate_info (&agr, cg);
322       }
323
324
325       if  (agr.add_variables)
326         {
327           struct ccase *cg;
328           for (; (cg = casereader_read (placeholder)) != NULL; case_unref (cg))
329             dump_aggregate_info (&agr, output, cg);
330
331           casereader_destroy (placeholder);
332         }
333       else
334         {
335           dump_aggregate_info (&agr, output, c);
336           case_unref (c);
337         }
338     }
339   if (!casegrouper_destroy (grouper))
340     goto error;
341
342   if (!proc_commit (ds))
343     {
344       input = NULL;
345       goto error;
346     }
347   input = NULL;
348
349   if (out_file == NULL)
350     {
351       struct casereader *next_input = casewriter_make_reader (output);
352       if (next_input == NULL)
353         goto error;
354
355       proc_set_active_file (ds, next_input, agr.dict);
356       agr.dict = NULL;
357     }
358   else
359     {
360       ok = casewriter_destroy (output);
361       output = NULL;
362       if (!ok)
363         goto error;
364     }
365
366   agr_destroy (&agr);
367   fh_unref (out_file);
368   return CMD_SUCCESS;
369
370 error:
371   if (input != NULL)
372     proc_commit (ds);
373   casewriter_destroy (output);
374   agr_destroy (&agr);
375   fh_unref (out_file);
376   return CMD_CASCADING_FAILURE;
377 }
378
379 /* Parse all the aggregate functions. */
380 static bool
381 parse_aggregate_functions (struct lexer *lexer, const struct dictionary *dict,
382                            struct agr_proc *agr)
383 {
384   struct agr_var *tail; /* Tail of linked list starting at agr->vars. */
385
386   /* Parse everything. */
387   tail = NULL;
388   for (;;)
389     {
390       char **dest;
391       char **dest_label;
392       size_t n_dest;
393       struct string function_name;
394
395       enum mv_class exclude;
396       const struct agr_func *function;
397       int func_index;
398
399       union agr_argument arg[2];
400
401       const struct variable **src;
402       size_t n_src;
403
404       size_t i;
405
406       dest = NULL;
407       dest_label = NULL;
408       n_dest = 0;
409       src = NULL;
410       function = NULL;
411       n_src = 0;
412       arg[0].c = NULL;
413       arg[1].c = NULL;
414       ds_init_empty (&function_name);
415
416       /* Parse the list of target variables. */
417       while (!lex_match (lexer, '='))
418         {
419           size_t n_dest_prev = n_dest;
420
421           if (!parse_DATA_LIST_vars (lexer, &dest, &n_dest,
422                                      (PV_APPEND | PV_SINGLE | PV_NO_SCRATCH
423                                       | PV_NO_DUPLICATE)))
424             goto error;
425
426           /* Assign empty labels. */
427           {
428             int j;
429
430             dest_label = xnrealloc (dest_label, n_dest, sizeof *dest_label);
431             for (j = n_dest_prev; j < n_dest; j++)
432               dest_label[j] = NULL;
433           }
434
435
436
437           if (lex_token (lexer) == T_STRING)
438             {
439               struct string label;
440               ds_init_string (&label, lex_tokstr (lexer));
441
442               ds_truncate (&label, 255);
443               dest_label[n_dest - 1] = ds_xstrdup (&label);
444               lex_get (lexer);
445               ds_destroy (&label);
446             }
447         }
448
449       /* Get the name of the aggregation function. */
450       if (lex_token (lexer) != T_ID)
451         {
452           lex_error (lexer, _("expecting aggregation function"));
453           goto error;
454         }
455
456       ds_assign_string (&function_name, lex_tokstr (lexer));
457       exclude = ds_chomp (&function_name, '.') ? MV_SYSTEM : MV_ANY;
458
459       for (function = agr_func_tab; function->name; function++)
460         if (!strcasecmp (function->name, ds_cstr (&function_name)))
461           break;
462       if (NULL == function->name)
463         {
464           msg (SE, _("Unknown aggregation function %s."),
465                ds_cstr (&function_name));
466           goto error;
467         }
468       ds_destroy (&function_name);
469       func_index = function - agr_func_tab;
470       lex_get (lexer);
471
472       /* Check for leading lparen. */
473       if (!lex_match (lexer, '('))
474         {
475           if (function->src_vars == AGR_SV_YES)
476             {
477               lex_error (lexer, _("expecting `('"));
478               goto error;
479             }
480         }
481       else
482         {
483           /* Parse list of source variables. */
484           {
485             int pv_opts = PV_NO_SCRATCH;
486
487             if (func_index == SUM || func_index == MEAN || func_index == SD)
488               pv_opts |= PV_NUMERIC;
489             else if (function->n_args)
490               pv_opts |= PV_SAME_TYPE;
491
492             if (!parse_variables_const (lexer, dict, &src, &n_src, pv_opts))
493               goto error;
494           }
495
496           /* Parse function arguments, for those functions that
497              require arguments. */
498           if (function->n_args != 0)
499             for (i = 0; i < function->n_args; i++)
500               {
501                 int type;
502
503                 lex_match (lexer, ',');
504                 if (lex_token (lexer) == T_STRING)
505                   {
506                     arg[i].c = ds_xstrdup (lex_tokstr (lexer));
507                     type = VAL_STRING;
508                   }
509                 else if (lex_is_number (lexer))
510                   {
511                     arg[i].f = lex_tokval (lexer);
512                     type = VAL_NUMERIC;
513                   }
514                 else
515                   {
516                     msg (SE, _("Missing argument %zu to %s."),
517                          i + 1, function->name);
518                     goto error;
519                   }
520
521                 lex_get (lexer);
522
523                 if (type != var_get_type (src[0]))
524                   {
525                     msg (SE, _("Arguments to %s must be of same type as "
526                                "source variables."),
527                          function->name);
528                     goto error;
529                   }
530               }
531
532           /* Trailing rparen. */
533           if (!lex_match (lexer, ')'))
534             {
535               lex_error (lexer, _("expecting `)'"));
536               goto error;
537             }
538
539           /* Now check that the number of source variables match
540              the number of target variables.  If we check earlier
541              than this, the user can get very misleading error
542              message, i.e. `AGGREGATE x=SUM(y t).' will get this
543              error message when a proper message would be more
544              like `unknown variable t'. */
545           if (n_src != n_dest)
546             {
547               msg (SE, _("Number of source variables (%zu) does not match "
548                          "number of target variables (%zu)."),
549                     n_src, n_dest);
550               goto error;
551             }
552
553           if ((func_index == PIN || func_index == POUT
554               || func_index == FIN || func_index == FOUT)
555               && (var_is_numeric (src[0])
556                   ? arg[0].f > arg[1].f
557                   : str_compare_rpad (arg[0].c, arg[1].c) > 0))
558             {
559               union agr_argument t = arg[0];
560               arg[0] = arg[1];
561               arg[1] = t;
562
563               msg (SW, _("The value arguments passed to the %s function "
564                          "are out-of-order.  They will be treated as if "
565                          "they had been specified in the correct order."),
566                    function->name);
567             }
568         }
569
570       /* Finally add these to the linked list of aggregation
571          variables. */
572       for (i = 0; i < n_dest; i++)
573         {
574           struct agr_var *v = xzalloc (sizeof *v);
575
576           /* Add variable to chain. */
577           if (agr->agr_vars != NULL)
578             tail->next = v;
579           else
580             agr->agr_vars = v;
581           tail = v;
582           tail->next = NULL;
583           v->moments = NULL;
584
585           /* Create the target variable in the aggregate
586              dictionary. */
587           {
588             struct variable *destvar;
589
590             v->function = func_index;
591
592             if (src)
593               {
594                 v->src = src[i];
595
596                 if (var_is_alpha (src[i]))
597                   {
598                     v->function |= FSTRING;
599                     v->string = xmalloc (var_get_width (src[i]));
600                   }
601
602                 if (function->alpha_type == VAL_STRING)
603                   destvar = dict_clone_var_as (agr->dict, v->src, dest[i]);
604                 else
605                   {
606                     assert (var_is_numeric (v->src)
607                             || function->alpha_type == VAL_NUMERIC);
608                     destvar = dict_create_var (agr->dict, dest[i], 0);
609                     if (destvar != NULL)
610                       {
611                         struct fmt_spec f;
612                         if ((func_index == N || func_index == NMISS)
613                             && dict_get_weight (dict) != NULL)
614                           f = fmt_for_output (FMT_F, 8, 2);
615                         else
616                           f = function->format;
617                         var_set_both_formats (destvar, &f);
618                       }
619                   }
620               } else {
621                 struct fmt_spec f;
622                 v->src = NULL;
623                 destvar = dict_create_var (agr->dict, dest[i], 0);
624                 if (destvar != NULL)
625                   {
626                     if ((func_index == N || func_index == NMISS)
627                         && dict_get_weight (dict) != NULL)
628                       f = fmt_for_output (FMT_F, 8, 2);
629                     else
630                       f = function->format;
631                     var_set_both_formats (destvar, &f);
632                   }
633             }
634
635             if (!destvar)
636               {
637                 msg (SE, _("Variable name %s is not unique within the "
638                            "aggregate file dictionary, which contains "
639                            "the aggregate variables and the break "
640                            "variables."),
641                      dest[i]);
642                 goto error;
643               }
644
645             free (dest[i]);
646             if (dest_label[i])
647               var_set_label (destvar, dest_label[i]);
648
649             v->dest = destvar;
650           }
651
652           v->exclude = exclude;
653
654           if (v->src != NULL)
655             {
656               int j;
657
658               if (var_is_numeric (v->src))
659                 for (j = 0; j < function->n_args; j++)
660                   v->arg[j].f = arg[j].f;
661               else
662                 for (j = 0; j < function->n_args; j++)
663                   v->arg[j].c = xstrdup (arg[j].c);
664             }
665         }
666
667       if (src != NULL && var_is_alpha (src[0]))
668         for (i = 0; i < function->n_args; i++)
669           {
670             free (arg[i].c);
671             arg[i].c = NULL;
672           }
673
674       free (src);
675       free (dest);
676       free (dest_label);
677
678       if (!lex_match (lexer, '/'))
679         {
680           if (lex_token (lexer) == '.')
681             return true;
682
683           lex_error (lexer, "expecting end of command");
684           return false;
685         }
686       continue;
687
688     error:
689       ds_destroy (&function_name);
690       for (i = 0; i < n_dest; i++)
691         {
692           free (dest[i]);
693           free (dest_label[i]);
694         }
695       free (dest);
696       free (dest_label);
697       free (arg[0].c);
698       free (arg[1].c);
699       if (src && n_src && var_is_alpha (src[0]))
700         for (i = 0; i < function->n_args; i++)
701           {
702             free (arg[i].c);
703             arg[i].c = NULL;
704           }
705       free (src);
706
707       return false;
708     }
709 }
710
711 /* Destroys AGR. */
712 static void
713 agr_destroy (struct agr_proc *agr)
714 {
715   struct agr_var *iter, *next;
716
717   subcase_destroy (&agr->sort);
718   free (agr->break_vars);
719   for (iter = agr->agr_vars; iter; iter = next)
720     {
721       next = iter->next;
722
723       if (iter->function & FSTRING)
724         {
725           size_t n_args;
726           size_t i;
727
728           n_args = agr_func_tab[iter->function & FUNC].n_args;
729           for (i = 0; i < n_args; i++)
730             free (iter->arg[i].c);
731           free (iter->string);
732         }
733       else if (iter->function == SD)
734         moments1_destroy (iter->moments);
735
736       dict_destroy_internal_var (iter->subject);
737       dict_destroy_internal_var (iter->weight);
738
739       free (iter);
740     }
741   if (agr->dict != NULL)
742     dict_destroy (agr->dict);
743 }
744 \f
745 /* Execution. */
746
747 /* Accumulates aggregation data from the case INPUT. */
748 static void
749 accumulate_aggregate_info (struct agr_proc *agr, const struct ccase *input)
750 {
751   struct agr_var *iter;
752   double weight;
753   bool bad_warn = true;
754
755   weight = dict_get_case_weight (agr->src_dict, input, &bad_warn);
756
757   for (iter = agr->agr_vars; iter; iter = iter->next)
758     if (iter->src)
759       {
760         const union value *v = case_data (input, iter->src);
761         int src_width = var_get_width (iter->src);
762
763         if (var_is_value_missing (iter->src, v, iter->exclude))
764           {
765             switch (iter->function)
766               {
767               case NMISS:
768               case NMISS | FSTRING:
769                 iter->dbl[0] += weight;
770                 break;
771               case NUMISS:
772               case NUMISS | FSTRING:
773                 iter->int1++;
774                 break;
775               }
776             iter->saw_missing = true;
777             continue;
778           }
779
780         /* This is horrible.  There are too many possibilities. */
781         switch (iter->function)
782           {
783           case SUM:
784             iter->dbl[0] += v->f * weight;
785             iter->int1 = 1;
786             break;
787           case MEAN:
788             iter->dbl[0] += v->f * weight;
789             iter->dbl[1] += weight;
790             break;
791           case MEDIAN:
792             {
793               double wv ;
794               struct ccase *cout;
795
796               cout = case_create (casewriter_get_proto (iter->writer));
797
798               case_data_rw (cout, iter->subject)->f
799                 = case_data (input, iter->src)->f;
800
801               wv = dict_get_case_weight (agr->src_dict, input, NULL);
802
803               case_data_rw (cout, iter->weight)->f = wv;
804
805               iter->cc += wv;
806
807               casewriter_write (iter->writer, cout);
808             }
809             break;
810           case SD:
811             moments1_add (iter->moments, v->f, weight);
812             break;
813           case MAX:
814             iter->dbl[0] = MAX (iter->dbl[0], v->f);
815             iter->int1 = 1;
816             break;
817           case MAX | FSTRING:
818             if (memcmp (iter->string, value_str (v, src_width), src_width) < 0)
819               memcpy (iter->string, value_str (v, src_width), src_width);
820             iter->int1 = 1;
821             break;
822           case MIN:
823             iter->dbl[0] = MIN (iter->dbl[0], v->f);
824             iter->int1 = 1;
825             break;
826           case MIN | FSTRING:
827             if (memcmp (iter->string, value_str (v, src_width), src_width) > 0)
828               memcpy (iter->string, value_str (v, src_width), src_width);
829             iter->int1 = 1;
830             break;
831           case FGT:
832           case PGT:
833             if (v->f > iter->arg[0].f)
834               iter->dbl[0] += weight;
835             iter->dbl[1] += weight;
836             break;
837           case FGT | FSTRING:
838           case PGT | FSTRING:
839             if (memcmp (iter->arg[0].c,
840                         value_str (v, src_width), src_width) < 0)
841               iter->dbl[0] += weight;
842             iter->dbl[1] += weight;
843             break;
844           case FLT:
845           case PLT:
846             if (v->f < iter->arg[0].f)
847               iter->dbl[0] += weight;
848             iter->dbl[1] += weight;
849             break;
850           case FLT | FSTRING:
851           case PLT | FSTRING:
852             if (memcmp (iter->arg[0].c,
853                         value_str (v, src_width), src_width) > 0)
854               iter->dbl[0] += weight;
855             iter->dbl[1] += weight;
856             break;
857           case FIN:
858           case PIN:
859             if (iter->arg[0].f <= v->f && v->f <= iter->arg[1].f)
860               iter->dbl[0] += weight;
861             iter->dbl[1] += weight;
862             break;
863           case FIN | FSTRING:
864           case PIN | FSTRING:
865             if (memcmp (iter->arg[0].c,
866                         value_str (v, src_width), src_width) <= 0
867                 && memcmp (iter->arg[1].c,
868                            value_str (v, src_width), src_width) >= 0)
869               iter->dbl[0] += weight;
870             iter->dbl[1] += weight;
871             break;
872           case FOUT:
873           case POUT:
874             if (iter->arg[0].f > v->f || v->f > iter->arg[1].f)
875               iter->dbl[0] += weight;
876             iter->dbl[1] += weight;
877             break;
878           case FOUT | FSTRING:
879           case POUT | FSTRING:
880             if (memcmp (iter->arg[0].c,
881                         value_str (v, src_width), src_width) > 0
882                 || memcmp (iter->arg[1].c,
883                            value_str (v, src_width), src_width) < 0)
884               iter->dbl[0] += weight;
885             iter->dbl[1] += weight;
886             break;
887           case N:
888           case N | FSTRING:
889             iter->dbl[0] += weight;
890             break;
891           case NU:
892           case NU | FSTRING:
893             iter->int1++;
894             break;
895           case FIRST:
896             if (iter->int1 == 0)
897               {
898                 iter->dbl[0] = v->f;
899                 iter->int1 = 1;
900               }
901             break;
902           case FIRST | FSTRING:
903             if (iter->int1 == 0)
904               {
905                 memcpy (iter->string, value_str (v, src_width), src_width);
906                 iter->int1 = 1;
907               }
908             break;
909           case LAST:
910             iter->dbl[0] = v->f;
911             iter->int1 = 1;
912             break;
913           case LAST | FSTRING:
914             memcpy (iter->string, value_str (v, src_width), src_width);
915             iter->int1 = 1;
916             break;
917           case NMISS:
918           case NMISS | FSTRING:
919           case NUMISS:
920           case NUMISS | FSTRING:
921             /* Our value is not missing or it would have been
922                caught earlier.  Nothing to do. */
923             break;
924           default:
925             NOT_REACHED ();
926           }
927       } else {
928       switch (iter->function)
929         {
930         case N:
931           iter->dbl[0] += weight;
932           break;
933         case NU:
934           iter->int1++;
935           break;
936         default:
937           NOT_REACHED ();
938         }
939     }
940 }
941
942 /* Writes an aggregated record to OUTPUT. */
943 static void
944 dump_aggregate_info (const struct agr_proc *agr, struct casewriter *output, const struct ccase *break_case)
945 {
946   struct ccase *c = case_create (dict_get_proto (agr->dict));
947
948   if ( agr->add_variables)
949     {
950       case_copy (c, 0, break_case, 0, dict_get_var_cnt (agr->src_dict));
951     }
952   else
953     {
954       int value_idx = 0;
955       int i;
956
957       for (i = 0; i < agr->break_var_cnt; i++)
958         {
959           const struct variable *v = agr->break_vars[i];
960           value_copy (case_data_rw_idx (c, value_idx),
961                       case_data (break_case, v),
962                       var_get_width (v));
963           value_idx++;
964         }
965     }
966
967   {
968     struct agr_var *i;
969
970     for (i = agr->agr_vars; i; i = i->next)
971       {
972         union value *v = case_data_rw (c, i->dest);
973         int width = var_get_width (i->dest);
974
975         if (agr->missing == COLUMNWISE && i->saw_missing
976             && (i->function & FUNC) != N && (i->function & FUNC) != NU
977             && (i->function & FUNC) != NMISS && (i->function & FUNC) != NUMISS)
978           {
979             value_set_missing (v, width);
980             casewriter_destroy (i->writer);
981             continue;
982           }
983
984         switch (i->function)
985           {
986           case SUM:
987             v->f = i->int1 ? i->dbl[0] : SYSMIS;
988             break;
989           case MEAN:
990             v->f = i->dbl[1] != 0.0 ? i->dbl[0] / i->dbl[1] : SYSMIS;
991             break;
992           case MEDIAN:
993             {
994               if ( i->writer)
995                 {
996                   struct percentile *median = percentile_create (0.5, i->cc);
997                   struct order_stats *os = &median->parent;
998                   struct casereader *sorted_reader = casewriter_make_reader (i->writer);
999                   i->writer = NULL;
1000
1001                   order_stats_accumulate (&os, 1,
1002                                           sorted_reader,
1003                                           i->weight,
1004                                           i->subject,
1005                                           i->exclude);
1006                   i->dbl[0] = percentile_calculate (median, PC_HAVERAGE);
1007                   statistic_destroy (&median->parent.parent);
1008                 }
1009               v->f = i->dbl[0];
1010             }
1011             break;
1012           case SD:
1013             {
1014               double variance;
1015
1016               /* FIXME: we should use two passes. */
1017               moments1_calculate (i->moments, NULL, NULL, &variance,
1018                                  NULL, NULL);
1019               if (variance != SYSMIS)
1020                 v->f = sqrt (variance);
1021               else
1022                 v->f = SYSMIS;
1023             }
1024             break;
1025           case MAX:
1026           case MIN:
1027             v->f = i->int1 ? i->dbl[0] : SYSMIS;
1028             break;
1029           case MAX | FSTRING:
1030           case MIN | FSTRING:
1031             if (i->int1)
1032               memcpy (value_str_rw (v, width), i->string, width);
1033             else
1034               value_set_missing (v, width);
1035             break;
1036           case FGT:
1037           case FGT | FSTRING:
1038           case FLT:
1039           case FLT | FSTRING:
1040           case FIN:
1041           case FIN | FSTRING:
1042           case FOUT:
1043           case FOUT | FSTRING:
1044             v->f = i->dbl[1] ? i->dbl[0] / i->dbl[1] : SYSMIS;
1045             break;
1046           case PGT:
1047           case PGT | FSTRING:
1048           case PLT:
1049           case PLT | FSTRING:
1050           case PIN:
1051           case PIN | FSTRING:
1052           case POUT:
1053           case POUT | FSTRING:
1054             v->f = i->dbl[1] ? i->dbl[0] / i->dbl[1] * 100.0 : SYSMIS;
1055             break;
1056           case N:
1057           case N | FSTRING:
1058               v->f = i->dbl[0];
1059             break;
1060           case NU:
1061           case NU | FSTRING:
1062             v->f = i->int1;
1063             break;
1064           case FIRST:
1065           case LAST:
1066             v->f = i->int1 ? i->dbl[0] : SYSMIS;
1067             break;
1068           case FIRST | FSTRING:
1069           case LAST | FSTRING:
1070             if (i->int1)
1071               memcpy (value_str_rw (v, width), i->string, width);
1072             else
1073               value_set_missing (v, width);
1074             break;
1075           case NMISS:
1076           case NMISS | FSTRING:
1077             v->f = i->dbl[0];
1078             break;
1079           case NUMISS:
1080           case NUMISS | FSTRING:
1081             v->f = i->int1;
1082             break;
1083           default:
1084             NOT_REACHED ();
1085           }
1086       }
1087   }
1088
1089   casewriter_write (output, c);
1090 }
1091
1092 /* Resets the state for all the aggregate functions. */
1093 static void
1094 initialize_aggregate_info (struct agr_proc *agr)
1095 {
1096   struct agr_var *iter;
1097
1098   for (iter = agr->agr_vars; iter; iter = iter->next)
1099     {
1100       iter->saw_missing = false;
1101       iter->dbl[0] = iter->dbl[1] = iter->dbl[2] = 0.0;
1102       iter->int1 = iter->int2 = 0;
1103       switch (iter->function)
1104         {
1105         case MIN:
1106           iter->dbl[0] = DBL_MAX;
1107           break;
1108         case MIN | FSTRING:
1109           memset (iter->string, 255, var_get_width (iter->src));
1110           break;
1111         case MAX:
1112           iter->dbl[0] = -DBL_MAX;
1113           break;
1114         case MAX | FSTRING:
1115           memset (iter->string, 0, var_get_width (iter->src));
1116           break;
1117         case MEDIAN:
1118           {
1119             struct caseproto *proto;
1120             struct subcase ordering;
1121
1122             proto = caseproto_create ();
1123             proto = caseproto_add_width (proto, 0);
1124             proto = caseproto_add_width (proto, 0);
1125
1126             if ( ! iter->subject)
1127               iter->subject = dict_create_internal_var (0, 0);
1128
1129             if ( ! iter->weight)
1130               iter->weight = dict_create_internal_var (1, 0);
1131
1132             subcase_init_var (&ordering, iter->subject, SC_ASCEND);
1133             iter->writer = sort_create_writer (&ordering, proto);
1134             subcase_destroy (&ordering);
1135             caseproto_unref (proto);
1136
1137             iter->cc = 0;
1138           }
1139           break;
1140         case SD:
1141           if (iter->moments == NULL)
1142             iter->moments = moments1_create (MOMENT_VARIANCE);
1143           else
1144             moments1_clear (iter->moments);
1145           break;
1146         default:
1147           break;
1148         }
1149     }
1150 }