bcc8a3be09a9467ebb2bc6cfbd7389f2d67e98ce
[pspp] / src / language / stats / aggregate.c
1 /* PSPP - a program for statistical analysis.
2    Copyright (C) 1997-9, 2000, 2006, 2008, 2009, 2010 Free Software Foundation, Inc.
3
4    This program is free software: you can redistribute it and/or modify
5    it under the terms of the GNU General Public License as published by
6    the Free Software Foundation, either version 3 of the License, or
7    (at your option) any later version.
8
9    This program is distributed in the hope that it will be useful,
10    but WITHOUT ANY WARRANTY; without even the implied warranty of
11    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12    GNU General Public License for more details.
13
14    You should have received a copy of the GNU General Public License
15    along with this program.  If not, see <http://www.gnu.org/licenses/>. */
16
17 #include <config.h>
18
19 #include <stdlib.h>
20
21 #include <data/any-writer.h>
22 #include <data/case.h>
23 #include <data/casegrouper.h>
24 #include <data/casereader.h>
25 #include <data/casewriter.h>
26 #include <data/dictionary.h>
27 #include <data/file-handle-def.h>
28 #include <data/format.h>
29 #include <data/procedure.h>
30 #include <data/settings.h>
31 #include <data/subcase.h>
32 #include <data/sys-file-writer.h>
33 #include <data/variable.h>
34 #include <language/command.h>
35 #include <language/data-io/file-handle.h>
36 #include <language/lexer/lexer.h>
37 #include <language/lexer/variable-parser.h>
38 #include <language/stats/sort-criteria.h>
39 #include <libpspp/assertion.h>
40 #include <libpspp/message.h>
41 #include <libpspp/misc.h>
42 #include <libpspp/pool.h>
43 #include <libpspp/str.h>
44 #include <math/moments.h>
45 #include <math/sort.h>
46 #include <math/statistic.h>
47 #include <math/percentiles.h>
48
49 #include "aggregate.h"
50
51 #include "minmax.h"
52 #include "xalloc.h"
53
54 #include "gettext.h"
55 #define _(msgid) gettext (msgid)
56 #define N_(msgid) msgid
57
58 /* Argument for AGGREGATE function. */
59 union agr_argument
60   {
61     double f;                           /* Numeric. */
62     char *c;                            /* Short or long string. */
63   };
64
65 /* Specifies how to make an aggregate variable. */
66 struct agr_var
67   {
68     struct agr_var *next;               /* Next in list. */
69
70     /* Collected during parsing. */
71     const struct variable *src; /* Source variable. */
72     struct variable *dest;      /* Target variable. */
73     int function;               /* Function. */
74     enum mv_class exclude;      /* Classes of missing values to exclude. */
75     union agr_argument arg[2];  /* Arguments. */
76
77     /* Accumulated during AGGREGATE execution. */
78     double dbl[3];
79     int int1, int2;
80     char *string;
81     bool saw_missing;
82     struct moments1 *moments;
83     double cc;
84
85     struct variable *subject;
86     struct variable *weight;
87     struct casewriter *writer;
88   };
89
90
91 /* Attributes of aggregation functions. */
92 const struct agr_func agr_func_tab[] =
93   {
94     {"SUM",     N_("Sum of values"), AGR_SV_YES, 0, -1,          {FMT_F, 8, 2}},
95     {"MEAN",    N_("Mean average"), AGR_SV_YES, 0, -1,          {FMT_F, 8, 2}},
96     {"MEDIAN",  N_("Median average"), AGR_SV_YES, 0, -1,          {FMT_F, 8, 2}},
97     {"SD",      N_("Standard deviation"), AGR_SV_YES, 0, -1,          {FMT_F, 8, 2}},
98     {"MAX",     N_("Maximum value"), AGR_SV_YES, 0, VAL_STRING,  {-1, -1, -1}},
99     {"MIN",     N_("Minimum value"), AGR_SV_YES, 0, VAL_STRING,  {-1, -1, -1}},
100     {"PGT",     N_("Percentage greater than"), AGR_SV_YES, 1, VAL_NUMERIC, {FMT_F, 5, 1}},
101     {"PLT",     N_("Percentage less than"), AGR_SV_YES, 1, VAL_NUMERIC, {FMT_F, 5, 1}},
102     {"PIN",     N_("Percentage included in range"), AGR_SV_YES, 2, VAL_NUMERIC, {FMT_F, 5, 1}},
103     {"POUT",    N_("Percentage excluded from range"), AGR_SV_YES, 2, VAL_NUMERIC, {FMT_F, 5, 1}},
104     {"FGT",     N_("Fraction greater than"), AGR_SV_YES, 1, VAL_NUMERIC, {FMT_F, 5, 3}},
105     {"FLT",     N_("Fraction less than"), AGR_SV_YES, 1, VAL_NUMERIC, {FMT_F, 5, 3}},
106     {"FIN",     N_("Fraction included in range"), AGR_SV_YES, 2, VAL_NUMERIC, {FMT_F, 5, 3}},
107     {"FOUT",    N_("Fraction excluded from range"), AGR_SV_YES, 2, VAL_NUMERIC, {FMT_F, 5, 3}},
108     {"N",       N_("Number of cases"), AGR_SV_NO, 0, VAL_NUMERIC, {FMT_F, 7, 0}},
109     {"NU",      N_("Number of cases (unweighted)"), AGR_SV_OPT, 0, VAL_NUMERIC, {FMT_F, 7, 0}},
110     {"NMISS",   N_("Number of missing values"), AGR_SV_YES, 0, VAL_NUMERIC, {FMT_F, 7, 0}},
111     {"NUMISS",  N_("Number of missing values (unweighted)"), AGR_SV_YES, 0, VAL_NUMERIC, {FMT_F, 7, 0}},
112     {"FIRST",   N_("First non-missing value"), AGR_SV_YES, 0, VAL_STRING,  {-1, -1, -1}},
113     {"LAST",    N_("Last non-missing value"), AGR_SV_YES, 0, VAL_STRING,  {-1, -1, -1}},
114     {NULL,      NULL, AGR_SV_NO, 0, -1,          {-1, -1, -1}},
115   };
116
117 /* Missing value types. */
118 enum missing_treatment
119   {
120     ITEMWISE,           /* Missing values item by item. */
121     COLUMNWISE          /* Missing values column by column. */
122   };
123
124 /* An entire AGGREGATE procedure. */
125 struct agr_proc
126   {
127     /* Break variables. */
128     struct subcase sort;                /* Sort criteria (break variables). */
129     const struct variable **break_vars;       /* Break variables. */
130     size_t break_var_cnt;               /* Number of break variables. */
131
132     enum missing_treatment missing;     /* How to treat missing values. */
133     struct agr_var *agr_vars;           /* First aggregate variable. */
134     struct dictionary *dict;            /* Aggregate dictionary. */
135     const struct dictionary *src_dict;  /* Dict of the source */
136     int case_cnt;                       /* Counts aggregated cases. */
137
138     bool add_variables;                 /* True iff the aggregated variables should
139                                            be appended to the existing dictionary */
140   };
141
142 static void initialize_aggregate_info (struct agr_proc *);
143
144 static void accumulate_aggregate_info (struct agr_proc *,
145                                        const struct ccase *);
146 /* Prototypes. */
147 static bool parse_aggregate_functions (struct lexer *, const struct dictionary *,
148                                        struct agr_proc *);
149 static void agr_destroy (struct agr_proc *);
150 static void dump_aggregate_info (const struct agr_proc *agr,
151                                  struct casewriter *output,
152                                  const struct ccase *break_case);
153 \f
154 /* Parsing. */
155
156 /* Parses and executes the AGGREGATE procedure. */
157 int
158 cmd_aggregate (struct lexer *lexer, struct dataset *ds)
159 {
160   struct dictionary *dict = dataset_dict (ds);
161   struct agr_proc agr;
162   struct file_handle *out_file = NULL;
163   struct casereader *input = NULL, *group;
164   struct casegrouper *grouper;
165   struct casewriter *output = NULL;
166
167   bool copy_documents = false;
168   bool presorted = false;
169   bool saw_direction;
170   bool ok;
171
172   memset(&agr, 0 , sizeof (agr));
173   agr.missing = ITEMWISE;
174   agr.src_dict = dict;
175   subcase_init_empty (&agr.sort);
176
177   /* OUTFILE subcommand must be first. */
178   lex_match (lexer, '/');
179   if (!lex_force_match_id (lexer, "OUTFILE"))
180     goto error;
181   lex_match (lexer, '=');
182   if (!lex_match (lexer, '*'))
183     {
184       out_file = fh_parse (lexer, FH_REF_FILE | FH_REF_SCRATCH);
185       if (out_file == NULL)
186         goto error;
187     }
188
189   if (out_file == NULL && lex_match_id (lexer, "MODE"))
190     {
191       lex_match (lexer, '=');
192       if (lex_match_id (lexer, "ADDVARIABLES"))
193         {
194           agr.add_variables = true;
195
196           /* presorted is assumed in ADDVARIABLES mode */
197           presorted = true;
198         }
199       else if (lex_match_id (lexer, "REPLACE"))
200         {
201           agr.add_variables = false;
202         }
203       else
204         goto error;
205     }
206
207   if ( agr.add_variables )
208     agr.dict = dict_clone (dict);
209   else
210     agr.dict = dict_create ();    
211
212   dict_set_label (agr.dict, dict_get_label (dict));
213   dict_set_documents (agr.dict, dict_get_documents (dict));
214
215   /* Read most of the subcommands. */
216   for (;;)
217     {
218       lex_match (lexer, '/');
219
220       if (lex_match_id (lexer, "MISSING"))
221         {
222           lex_match (lexer, '=');
223           if (!lex_match_id (lexer, "COLUMNWISE"))
224             {
225               lex_error (lexer, _("while expecting COLUMNWISE"));
226               goto error;
227             }
228           agr.missing = COLUMNWISE;
229         }
230       else if (lex_match_id (lexer, "DOCUMENT"))
231         copy_documents = true;
232       else if (lex_match_id (lexer, "PRESORTED"))
233         presorted = true;
234       else if (lex_match_id (lexer, "BREAK"))
235         {
236           int i;
237
238           lex_match (lexer, '=');
239           if (!parse_sort_criteria (lexer, dict, &agr.sort, &agr.break_vars,
240                                     &saw_direction))
241             goto error;
242           agr.break_var_cnt = subcase_get_n_fields (&agr.sort);
243
244           if  (! agr.add_variables)
245             for (i = 0; i < agr.break_var_cnt; i++)
246               dict_clone_var_assert (agr.dict, agr.break_vars[i]);
247
248           /* BREAK must follow the options. */
249           break;
250         }
251       else
252         {
253           lex_error (lexer, _("expecting BREAK"));
254           goto error;
255         }
256     }
257   if (presorted && saw_direction)
258     msg (SW, _("When PRESORTED is specified, specifying sorting directions "
259                "with (A) or (D) has no effect.  Output data will be sorted "
260                "the same way as the input data."));
261
262   /* Read in the aggregate functions. */
263   lex_match (lexer, '/');
264   if (!parse_aggregate_functions (lexer, dict, &agr))
265     goto error;
266
267   /* Delete documents. */
268   if (!copy_documents)
269     dict_clear_documents (agr.dict);
270
271   /* Cancel SPLIT FILE. */
272   dict_set_split_vars (agr.dict, NULL, 0);
273
274   /* Initialize. */
275   agr.case_cnt = 0;
276
277   if (out_file == NULL)
278     {
279       /* The active file will be replaced by the aggregated data,
280          so TEMPORARY is moot. */
281       proc_cancel_temporary_transformations (ds);
282       proc_discard_output (ds);
283       output = autopaging_writer_create (dict_get_proto (agr.dict));
284     }
285   else
286     {
287       output = any_writer_open (out_file, agr.dict);
288       if (output == NULL)
289         goto error;
290     }
291
292   input = proc_open (ds);
293   if (!subcase_is_empty (&agr.sort) && !presorted)
294     {
295       input = sort_execute (input, &agr.sort);
296       subcase_clear (&agr.sort);
297     }
298
299   for (grouper = casegrouper_create_vars (input, agr.break_vars,
300                                           agr.break_var_cnt);
301        casegrouper_get_next_group (grouper, &group);
302        casereader_destroy (group))
303     {
304       struct casereader *placeholder = NULL;
305       struct ccase *c = casereader_peek (group, 0);
306
307       if (c == NULL)
308         {
309           casereader_destroy (group);
310           continue;
311         }
312
313       initialize_aggregate_info (&agr);
314
315       if ( agr.add_variables )
316         placeholder = casereader_clone (group);
317
318       {
319         struct ccase *cg;
320         for (; (cg = casereader_read (group)) != NULL; case_unref (cg))
321           accumulate_aggregate_info (&agr, cg);
322       }
323
324
325       if  (agr.add_variables)
326         {
327           struct ccase *cg;
328           for (; (cg = casereader_read (placeholder)) != NULL; case_unref (cg))
329             dump_aggregate_info (&agr, output, cg);
330
331           casereader_destroy (placeholder);
332         }
333       else
334         {
335           dump_aggregate_info (&agr, output, c);
336           case_unref (c);
337         }
338     }
339   if (!casegrouper_destroy (grouper))
340     goto error;
341
342   if (!proc_commit (ds))
343     {
344       input = NULL;
345       goto error;
346     }
347   input = NULL;
348
349   if (out_file == NULL)
350     {
351       struct casereader *next_input = casewriter_make_reader (output);
352       if (next_input == NULL)
353         goto error;
354
355       proc_set_active_file (ds, next_input, agr.dict);
356       agr.dict = NULL;
357     }
358   else
359     {
360       ok = casewriter_destroy (output);
361       output = NULL;
362       if (!ok)
363         goto error;
364     }
365
366   agr_destroy (&agr);
367   fh_unref (out_file);
368   return CMD_SUCCESS;
369
370 error:
371   if (input != NULL)
372     proc_commit (ds);
373   casewriter_destroy (output);
374   agr_destroy (&agr);
375   fh_unref (out_file);
376   return CMD_CASCADING_FAILURE;
377 }
378
379 /* Parse all the aggregate functions. */
380 static bool
381 parse_aggregate_functions (struct lexer *lexer, const struct dictionary *dict,
382                            struct agr_proc *agr)
383 {
384   struct agr_var *tail; /* Tail of linked list starting at agr->vars. */
385
386   /* Parse everything. */
387   tail = NULL;
388   for (;;)
389     {
390       char **dest;
391       char **dest_label;
392       size_t n_dest;
393       struct string function_name;
394
395       enum mv_class exclude;
396       const struct agr_func *function;
397       int func_index;
398
399       union agr_argument arg[2];
400
401       const struct variable **src;
402       size_t n_src;
403
404       size_t i;
405
406       dest = NULL;
407       dest_label = NULL;
408       n_dest = 0;
409       src = NULL;
410       function = NULL;
411       n_src = 0;
412       arg[0].c = NULL;
413       arg[1].c = NULL;
414       ds_init_empty (&function_name);
415
416       /* Parse the list of target variables. */
417       while (!lex_match (lexer, '='))
418         {
419           size_t n_dest_prev = n_dest;
420
421           if (!parse_DATA_LIST_vars (lexer, &dest, &n_dest,
422                                      (PV_APPEND | PV_SINGLE | PV_NO_SCRATCH
423                                       | PV_NO_DUPLICATE)))
424             goto error;
425
426           /* Assign empty labels. */
427           {
428             int j;
429
430             dest_label = xnrealloc (dest_label, n_dest, sizeof *dest_label);
431             for (j = n_dest_prev; j < n_dest; j++)
432               dest_label[j] = NULL;
433           }
434
435
436
437           if (lex_token (lexer) == T_STRING)
438             {
439               struct string label;
440               ds_init_string (&label, lex_tokstr (lexer));
441
442               ds_truncate (&label, 255);
443               dest_label[n_dest - 1] = ds_xstrdup (&label);
444               lex_get (lexer);
445               ds_destroy (&label);
446             }
447         }
448
449       /* Get the name of the aggregation function. */
450       if (lex_token (lexer) != T_ID)
451         {
452           lex_error (lexer, _("expecting aggregation function"));
453           goto error;
454         }
455
456       exclude = MV_ANY;
457
458       ds_assign_string (&function_name, lex_tokstr (lexer));
459
460       ds_chomp (&function_name, '.');
461
462       if (lex_tokid(lexer)[strlen (lex_tokid (lexer)) - 1] == '.')
463         exclude = MV_SYSTEM;
464
465       for (function = agr_func_tab; function->name; function++)
466         if (!strcasecmp (function->name, ds_cstr (&function_name)))
467           break;
468       if (NULL == function->name)
469         {
470           msg (SE, _("Unknown aggregation function %s."),
471                ds_cstr (&function_name));
472           goto error;
473         }
474       ds_destroy (&function_name);
475       func_index = function - agr_func_tab;
476       lex_get (lexer);
477
478       /* Check for leading lparen. */
479       if (!lex_match (lexer, '('))
480         {
481           if (function->src_vars == AGR_SV_YES)
482             {
483               lex_error (lexer, _("expecting `('"));
484               goto error;
485             }
486         }
487       else
488         {
489           /* Parse list of source variables. */
490           {
491             int pv_opts = PV_NO_SCRATCH;
492
493             if (func_index == SUM || func_index == MEAN || func_index == SD)
494               pv_opts |= PV_NUMERIC;
495             else if (function->n_args)
496               pv_opts |= PV_SAME_TYPE;
497
498             if (!parse_variables_const (lexer, dict, &src, &n_src, pv_opts))
499               goto error;
500           }
501
502           /* Parse function arguments, for those functions that
503              require arguments. */
504           if (function->n_args != 0)
505             for (i = 0; i < function->n_args; i++)
506               {
507                 int type;
508
509                 lex_match (lexer, ',');
510                 if (lex_token (lexer) == T_STRING)
511                   {
512                     arg[i].c = ds_xstrdup (lex_tokstr (lexer));
513                     type = VAL_STRING;
514                   }
515                 else if (lex_is_number (lexer))
516                   {
517                     arg[i].f = lex_tokval (lexer);
518                     type = VAL_NUMERIC;
519                   }
520                 else
521                   {
522                     msg (SE, _("Missing argument %zu to %s."),
523                          i + 1, function->name);
524                     goto error;
525                   }
526
527                 lex_get (lexer);
528
529                 if (type != var_get_type (src[0]))
530                   {
531                     msg (SE, _("Arguments to %s must be of same type as "
532                                "source variables."),
533                          function->name);
534                     goto error;
535                   }
536               }
537
538           /* Trailing rparen. */
539           if (!lex_match (lexer, ')'))
540             {
541               lex_error (lexer, _("expecting `)'"));
542               goto error;
543             }
544
545           /* Now check that the number of source variables match
546              the number of target variables.  If we check earlier
547              than this, the user can get very misleading error
548              message, i.e. `AGGREGATE x=SUM(y t).' will get this
549              error message when a proper message would be more
550              like `unknown variable t'. */
551           if (n_src != n_dest)
552             {
553               msg (SE, _("Number of source variables (%zu) does not match "
554                          "number of target variables (%zu)."),
555                     n_src, n_dest);
556               goto error;
557             }
558
559           if ((func_index == PIN || func_index == POUT
560               || func_index == FIN || func_index == FOUT)
561               && (var_is_numeric (src[0])
562                   ? arg[0].f > arg[1].f
563                   : str_compare_rpad (arg[0].c, arg[1].c) > 0))
564             {
565               union agr_argument t = arg[0];
566               arg[0] = arg[1];
567               arg[1] = t;
568
569               msg (SW, _("The value arguments passed to the %s function "
570                          "are out-of-order.  They will be treated as if "
571                          "they had been specified in the correct order."),
572                    function->name);
573             }
574         }
575
576       /* Finally add these to the linked list of aggregation
577          variables. */
578       for (i = 0; i < n_dest; i++)
579         {
580           struct agr_var *v = xzalloc (sizeof *v);
581
582           /* Add variable to chain. */
583           if (agr->agr_vars != NULL)
584             tail->next = v;
585           else
586             agr->agr_vars = v;
587           tail = v;
588           tail->next = NULL;
589           v->moments = NULL;
590
591           /* Create the target variable in the aggregate
592              dictionary. */
593           {
594             struct variable *destvar;
595
596             v->function = func_index;
597
598             if (src)
599               {
600                 v->src = src[i];
601
602                 if (var_is_alpha (src[i]))
603                   {
604                     v->function |= FSTRING;
605                     v->string = xmalloc (var_get_width (src[i]));
606                   }
607
608                 if (function->alpha_type == VAL_STRING)
609                   destvar = dict_clone_var_as (agr->dict, v->src, dest[i]);
610                 else
611                   {
612                     assert (var_is_numeric (v->src)
613                             || function->alpha_type == VAL_NUMERIC);
614                     destvar = dict_create_var (agr->dict, dest[i], 0);
615                     if (destvar != NULL)
616                       {
617                         struct fmt_spec f;
618                         if ((func_index == N || func_index == NMISS)
619                             && dict_get_weight (dict) != NULL)
620                           f = fmt_for_output (FMT_F, 8, 2);
621                         else
622                           f = function->format;
623                         var_set_both_formats (destvar, &f);
624                       }
625                   }
626               } else {
627                 struct fmt_spec f;
628                 v->src = NULL;
629                 destvar = dict_create_var (agr->dict, dest[i], 0);
630                 if (destvar != NULL)
631                   {
632                     if ((func_index == N || func_index == NMISS)
633                         && dict_get_weight (dict) != NULL)
634                       f = fmt_for_output (FMT_F, 8, 2);
635                     else
636                       f = function->format;
637                     var_set_both_formats (destvar, &f);
638                   }
639             }
640
641             if (!destvar)
642               {
643                 msg (SE, _("Variable name %s is not unique within the "
644                            "aggregate file dictionary, which contains "
645                            "the aggregate variables and the break "
646                            "variables."),
647                      dest[i]);
648                 goto error;
649               }
650
651             free (dest[i]);
652             if (dest_label[i])
653               var_set_label (destvar, dest_label[i]);
654
655             v->dest = destvar;
656           }
657
658           v->exclude = exclude;
659
660           if (v->src != NULL)
661             {
662               int j;
663
664               if (var_is_numeric (v->src))
665                 for (j = 0; j < function->n_args; j++)
666                   v->arg[j].f = arg[j].f;
667               else
668                 for (j = 0; j < function->n_args; j++)
669                   v->arg[j].c = xstrdup (arg[j].c);
670             }
671         }
672
673       if (src != NULL && var_is_alpha (src[0]))
674         for (i = 0; i < function->n_args; i++)
675           {
676             free (arg[i].c);
677             arg[i].c = NULL;
678           }
679
680       free (src);
681       free (dest);
682       free (dest_label);
683
684       if (!lex_match (lexer, '/'))
685         {
686           if (lex_token (lexer) == '.')
687             return true;
688
689           lex_error (lexer, "expecting end of command");
690           return false;
691         }
692       continue;
693
694     error:
695       ds_destroy (&function_name);
696       for (i = 0; i < n_dest; i++)
697         {
698           free (dest[i]);
699           free (dest_label[i]);
700         }
701       free (dest);
702       free (dest_label);
703       free (arg[0].c);
704       free (arg[1].c);
705       if (src && n_src && var_is_alpha (src[0]))
706         for (i = 0; i < function->n_args; i++)
707           {
708             free (arg[i].c);
709             arg[i].c = NULL;
710           }
711       free (src);
712
713       return false;
714     }
715 }
716
717 /* Destroys AGR. */
718 static void
719 agr_destroy (struct agr_proc *agr)
720 {
721   struct agr_var *iter, *next;
722
723   subcase_destroy (&agr->sort);
724   free (agr->break_vars);
725   for (iter = agr->agr_vars; iter; iter = next)
726     {
727       next = iter->next;
728
729       if (iter->function & FSTRING)
730         {
731           size_t n_args;
732           size_t i;
733
734           n_args = agr_func_tab[iter->function & FUNC].n_args;
735           for (i = 0; i < n_args; i++)
736             free (iter->arg[i].c);
737           free (iter->string);
738         }
739       else if (iter->function == SD)
740         moments1_destroy (iter->moments);
741
742       dict_destroy_internal_var (iter->subject);
743       dict_destroy_internal_var (iter->weight);
744
745       free (iter);
746     }
747   if (agr->dict != NULL)
748     dict_destroy (agr->dict);
749 }
750 \f
751 /* Execution. */
752
753 /* Accumulates aggregation data from the case INPUT. */
754 static void
755 accumulate_aggregate_info (struct agr_proc *agr, const struct ccase *input)
756 {
757   struct agr_var *iter;
758   double weight;
759   bool bad_warn = true;
760
761   weight = dict_get_case_weight (agr->src_dict, input, &bad_warn);
762
763   for (iter = agr->agr_vars; iter; iter = iter->next)
764     if (iter->src)
765       {
766         const union value *v = case_data (input, iter->src);
767         int src_width = var_get_width (iter->src);
768
769         if (var_is_value_missing (iter->src, v, iter->exclude))
770           {
771             switch (iter->function)
772               {
773               case NMISS:
774               case NMISS | FSTRING:
775                 iter->dbl[0] += weight;
776                 break;
777               case NUMISS:
778               case NUMISS | FSTRING:
779                 iter->int1++;
780                 break;
781               }
782             iter->saw_missing = true;
783             continue;
784           }
785
786         /* This is horrible.  There are too many possibilities. */
787         switch (iter->function)
788           {
789           case SUM:
790             iter->dbl[0] += v->f * weight;
791             iter->int1 = 1;
792             break;
793           case MEAN:
794             iter->dbl[0] += v->f * weight;
795             iter->dbl[1] += weight;
796             break;
797           case MEDIAN:
798             {
799               double wv ;
800               struct ccase *cout;
801
802               cout = case_create (casewriter_get_proto (iter->writer));
803
804               case_data_rw (cout, iter->subject)->f
805                 = case_data (input, iter->src)->f;
806
807               wv = dict_get_case_weight (agr->src_dict, input, NULL);
808
809               case_data_rw (cout, iter->weight)->f = wv;
810
811               iter->cc += wv;
812
813               casewriter_write (iter->writer, cout);
814             }
815             break;
816           case SD:
817             moments1_add (iter->moments, v->f, weight);
818             break;
819           case MAX:
820             iter->dbl[0] = MAX (iter->dbl[0], v->f);
821             iter->int1 = 1;
822             break;
823           case MAX | FSTRING:
824             if (memcmp (iter->string, value_str (v, src_width), src_width) < 0)
825               memcpy (iter->string, value_str (v, src_width), src_width);
826             iter->int1 = 1;
827             break;
828           case MIN:
829             iter->dbl[0] = MIN (iter->dbl[0], v->f);
830             iter->int1 = 1;
831             break;
832           case MIN | FSTRING:
833             if (memcmp (iter->string, value_str (v, src_width), src_width) > 0)
834               memcpy (iter->string, value_str (v, src_width), src_width);
835             iter->int1 = 1;
836             break;
837           case FGT:
838           case PGT:
839             if (v->f > iter->arg[0].f)
840               iter->dbl[0] += weight;
841             iter->dbl[1] += weight;
842             break;
843           case FGT | FSTRING:
844           case PGT | FSTRING:
845             if (memcmp (iter->arg[0].c,
846                         value_str (v, src_width), src_width) < 0)
847               iter->dbl[0] += weight;
848             iter->dbl[1] += weight;
849             break;
850           case FLT:
851           case PLT:
852             if (v->f < iter->arg[0].f)
853               iter->dbl[0] += weight;
854             iter->dbl[1] += weight;
855             break;
856           case FLT | FSTRING:
857           case PLT | FSTRING:
858             if (memcmp (iter->arg[0].c,
859                         value_str (v, src_width), src_width) > 0)
860               iter->dbl[0] += weight;
861             iter->dbl[1] += weight;
862             break;
863           case FIN:
864           case PIN:
865             if (iter->arg[0].f <= v->f && v->f <= iter->arg[1].f)
866               iter->dbl[0] += weight;
867             iter->dbl[1] += weight;
868             break;
869           case FIN | FSTRING:
870           case PIN | FSTRING:
871             if (memcmp (iter->arg[0].c,
872                         value_str (v, src_width), src_width) <= 0
873                 && memcmp (iter->arg[1].c,
874                            value_str (v, src_width), src_width) >= 0)
875               iter->dbl[0] += weight;
876             iter->dbl[1] += weight;
877             break;
878           case FOUT:
879           case POUT:
880             if (iter->arg[0].f > v->f || v->f > iter->arg[1].f)
881               iter->dbl[0] += weight;
882             iter->dbl[1] += weight;
883             break;
884           case FOUT | FSTRING:
885           case POUT | FSTRING:
886             if (memcmp (iter->arg[0].c,
887                         value_str (v, src_width), src_width) > 0
888                 || memcmp (iter->arg[1].c,
889                            value_str (v, src_width), src_width) < 0)
890               iter->dbl[0] += weight;
891             iter->dbl[1] += weight;
892             break;
893           case N:
894           case N | FSTRING:
895             iter->dbl[0] += weight;
896             break;
897           case NU:
898           case NU | FSTRING:
899             iter->int1++;
900             break;
901           case FIRST:
902             if (iter->int1 == 0)
903               {
904                 iter->dbl[0] = v->f;
905                 iter->int1 = 1;
906               }
907             break;
908           case FIRST | FSTRING:
909             if (iter->int1 == 0)
910               {
911                 memcpy (iter->string, value_str (v, src_width), src_width);
912                 iter->int1 = 1;
913               }
914             break;
915           case LAST:
916             iter->dbl[0] = v->f;
917             iter->int1 = 1;
918             break;
919           case LAST | FSTRING:
920             memcpy (iter->string, value_str (v, src_width), src_width);
921             iter->int1 = 1;
922             break;
923           case NMISS:
924           case NMISS | FSTRING:
925           case NUMISS:
926           case NUMISS | FSTRING:
927             /* Our value is not missing or it would have been
928                caught earlier.  Nothing to do. */
929             break;
930           default:
931             NOT_REACHED ();
932           }
933       } else {
934       switch (iter->function)
935         {
936         case N:
937           iter->dbl[0] += weight;
938           break;
939         case NU:
940           iter->int1++;
941           break;
942         default:
943           NOT_REACHED ();
944         }
945     }
946 }
947
948 /* Writes an aggregated record to OUTPUT. */
949 static void
950 dump_aggregate_info (const struct agr_proc *agr, struct casewriter *output, const struct ccase *break_case)
951 {
952   struct ccase *c = case_create (dict_get_proto (agr->dict));
953
954   if ( agr->add_variables)
955     {
956       case_copy (c, 0, break_case, 0, dict_get_var_cnt (agr->src_dict));
957     }
958   else
959     {
960       int value_idx = 0;
961       int i;
962
963       for (i = 0; i < agr->break_var_cnt; i++)
964         {
965           const struct variable *v = agr->break_vars[i];
966           value_copy (case_data_rw_idx (c, value_idx),
967                       case_data (break_case, v),
968                       var_get_width (v));
969           value_idx++;
970         }
971     }
972
973   {
974     struct agr_var *i;
975
976     for (i = agr->agr_vars; i; i = i->next)
977       {
978         union value *v = case_data_rw (c, i->dest);
979         int width = var_get_width (i->dest);
980
981         if (agr->missing == COLUMNWISE && i->saw_missing
982             && (i->function & FUNC) != N && (i->function & FUNC) != NU
983             && (i->function & FUNC) != NMISS && (i->function & FUNC) != NUMISS)
984           {
985             value_set_missing (v, width);
986             casewriter_destroy (i->writer);
987             continue;
988           }
989
990         switch (i->function)
991           {
992           case SUM:
993             v->f = i->int1 ? i->dbl[0] : SYSMIS;
994             break;
995           case MEAN:
996             v->f = i->dbl[1] != 0.0 ? i->dbl[0] / i->dbl[1] : SYSMIS;
997             break;
998           case MEDIAN:
999             {
1000               if ( i->writer)
1001                 {
1002                   struct percentile *median = percentile_create (0.5, i->cc);
1003                   struct order_stats *os = &median->parent;
1004                   struct casereader *sorted_reader = casewriter_make_reader (i->writer);
1005                   i->writer = NULL;
1006
1007                   order_stats_accumulate (&os, 1,
1008                                           sorted_reader,
1009                                           i->weight,
1010                                           i->subject,
1011                                           i->exclude);
1012                   i->dbl[0] = percentile_calculate (median, PC_HAVERAGE);
1013                   statistic_destroy (&median->parent.parent);
1014                 }
1015               v->f = i->dbl[0];
1016             }
1017             break;
1018           case SD:
1019             {
1020               double variance;
1021
1022               /* FIXME: we should use two passes. */
1023               moments1_calculate (i->moments, NULL, NULL, &variance,
1024                                  NULL, NULL);
1025               if (variance != SYSMIS)
1026                 v->f = sqrt (variance);
1027               else
1028                 v->f = SYSMIS;
1029             }
1030             break;
1031           case MAX:
1032           case MIN:
1033             v->f = i->int1 ? i->dbl[0] : SYSMIS;
1034             break;
1035           case MAX | FSTRING:
1036           case MIN | FSTRING:
1037             if (i->int1)
1038               memcpy (value_str_rw (v, width), i->string, width);
1039             else
1040               value_set_missing (v, width);
1041             break;
1042           case FGT:
1043           case FGT | FSTRING:
1044           case FLT:
1045           case FLT | FSTRING:
1046           case FIN:
1047           case FIN | FSTRING:
1048           case FOUT:
1049           case FOUT | FSTRING:
1050             v->f = i->dbl[1] ? i->dbl[0] / i->dbl[1] : SYSMIS;
1051             break;
1052           case PGT:
1053           case PGT | FSTRING:
1054           case PLT:
1055           case PLT | FSTRING:
1056           case PIN:
1057           case PIN | FSTRING:
1058           case POUT:
1059           case POUT | FSTRING:
1060             v->f = i->dbl[1] ? i->dbl[0] / i->dbl[1] * 100.0 : SYSMIS;
1061             break;
1062           case N:
1063           case N | FSTRING:
1064               v->f = i->dbl[0];
1065             break;
1066           case NU:
1067           case NU | FSTRING:
1068             v->f = i->int1;
1069             break;
1070           case FIRST:
1071           case LAST:
1072             v->f = i->int1 ? i->dbl[0] : SYSMIS;
1073             break;
1074           case FIRST | FSTRING:
1075           case LAST | FSTRING:
1076             if (i->int1)
1077               memcpy (value_str_rw (v, width), i->string, width);
1078             else
1079               value_set_missing (v, width);
1080             break;
1081           case NMISS:
1082           case NMISS | FSTRING:
1083             v->f = i->dbl[0];
1084             break;
1085           case NUMISS:
1086           case NUMISS | FSTRING:
1087             v->f = i->int1;
1088             break;
1089           default:
1090             NOT_REACHED ();
1091           }
1092       }
1093   }
1094
1095   casewriter_write (output, c);
1096 }
1097
1098 /* Resets the state for all the aggregate functions. */
1099 static void
1100 initialize_aggregate_info (struct agr_proc *agr)
1101 {
1102   struct agr_var *iter;
1103
1104   for (iter = agr->agr_vars; iter; iter = iter->next)
1105     {
1106       iter->saw_missing = false;
1107       iter->dbl[0] = iter->dbl[1] = iter->dbl[2] = 0.0;
1108       iter->int1 = iter->int2 = 0;
1109       switch (iter->function)
1110         {
1111         case MIN:
1112           iter->dbl[0] = DBL_MAX;
1113           break;
1114         case MIN | FSTRING:
1115           memset (iter->string, 255, var_get_width (iter->src));
1116           break;
1117         case MAX:
1118           iter->dbl[0] = -DBL_MAX;
1119           break;
1120         case MAX | FSTRING:
1121           memset (iter->string, 0, var_get_width (iter->src));
1122           break;
1123         case MEDIAN:
1124           {
1125             struct caseproto *proto;
1126             struct subcase ordering;
1127
1128             proto = caseproto_create ();
1129             proto = caseproto_add_width (proto, 0);
1130             proto = caseproto_add_width (proto, 0);
1131
1132             if ( ! iter->subject)
1133               iter->subject = dict_create_internal_var (0, 0);
1134
1135             if ( ! iter->weight)
1136               iter->weight = dict_create_internal_var (1, 0);
1137
1138             subcase_init_var (&ordering, iter->subject, SC_ASCEND);
1139             iter->writer = sort_create_writer (&ordering, proto);
1140             subcase_destroy (&ordering);
1141             caseproto_unref (proto);
1142
1143             iter->cc = 0;
1144           }
1145           break;
1146         case SD:
1147           if (iter->moments == NULL)
1148             iter->moments = moments1_create (MOMENT_VARIANCE);
1149           else
1150             moments1_clear (iter->moments);
1151           break;
1152         default:
1153           break;
1154         }
1155     }
1156 }