2a4eda87ad042092f81d3eccd93c4bb46c2f4025
[pspp-builds.git] / src / language / stats / aggregate.c
1 /* PSPP - a program for statistical analysis.
2    Copyright (C) 1997-9, 2000, 2006, 2008, 2009, 2010, 2011 Free Software Foundation, Inc.
3
4    This program is free software: you can redistribute it and/or modify
5    it under the terms of the GNU General Public License as published by
6    the Free Software Foundation, either version 3 of the License, or
7    (at your option) any later version.
8
9    This program is distributed in the hope that it will be useful,
10    but WITHOUT ANY WARRANTY; without even the implied warranty of
11    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12    GNU General Public License for more details.
13
14    You should have received a copy of the GNU General Public License
15    along with this program.  If not, see <http://www.gnu.org/licenses/>. */
16
17 #include <config.h>
18
19 #include "language/stats/aggregate.h"
20
21 #include <stdlib.h>
22
23 #include "data/any-writer.h"
24 #include "data/case.h"
25 #include "data/casegrouper.h"
26 #include "data/casereader.h"
27 #include "data/casewriter.h"
28 #include "data/dictionary.h"
29 #include "data/file-handle-def.h"
30 #include "data/format.h"
31 #include "data/procedure.h"
32 #include "data/settings.h"
33 #include "data/subcase.h"
34 #include "data/sys-file-writer.h"
35 #include "data/variable.h"
36 #include "language/command.h"
37 #include "language/data-io/file-handle.h"
38 #include "language/lexer/lexer.h"
39 #include "language/lexer/variable-parser.h"
40 #include "language/stats/sort-criteria.h"
41 #include "libpspp/assertion.h"
42 #include "libpspp/i18n.h"
43 #include "libpspp/message.h"
44 #include "libpspp/misc.h"
45 #include "libpspp/pool.h"
46 #include "libpspp/str.h"
47 #include "math/moments.h"
48 #include "math/percentiles.h"
49 #include "math/sort.h"
50 #include "math/statistic.h"
51
52 #include "gl/minmax.h"
53 #include "gl/xalloc.h"
54
55 #include "gettext.h"
56 #define _(msgid) gettext (msgid)
57 #define N_(msgid) msgid
58
59 /* Argument for AGGREGATE function. */
60 union agr_argument
61   {
62     double f;                           /* Numeric. */
63     char *c;                            /* Short or long string. */
64   };
65
66 /* Specifies how to make an aggregate variable. */
67 struct agr_var
68   {
69     struct agr_var *next;               /* Next in list. */
70
71     /* Collected during parsing. */
72     const struct variable *src; /* Source variable. */
73     struct variable *dest;      /* Target variable. */
74     int function;               /* Function. */
75     enum mv_class exclude;      /* Classes of missing values to exclude. */
76     union agr_argument arg[2];  /* Arguments. */
77
78     /* Accumulated during AGGREGATE execution. */
79     double dbl[3];
80     int int1, int2;
81     char *string;
82     bool saw_missing;
83     struct moments1 *moments;
84     double cc;
85
86     struct variable *subject;
87     struct variable *weight;
88     struct casewriter *writer;
89   };
90
91
92 /* Attributes of aggregation functions. */
93 const struct agr_func agr_func_tab[] =
94   {
95     {"SUM",     N_("Sum of values"),                         AGR_SV_YES, 0, -1,          {FMT_F, 8, 2}},
96     {"MEAN",    N_("Mean average"),                          AGR_SV_YES, 0, -1,          {FMT_F, 8, 2}},
97     {"MEDIAN",  N_("Median average"),                        AGR_SV_YES, 0, -1,          {FMT_F, 8, 2}},
98     {"SD",      N_("Standard deviation"),                    AGR_SV_YES, 0, -1,          {FMT_F, 8, 2}},
99     {"MAX",     N_("Maximum value"),                         AGR_SV_YES, 0, VAL_STRING,  {-1, -1, -1}},
100     {"MIN",     N_("Minimum value"),                         AGR_SV_YES, 0, VAL_STRING,  {-1, -1, -1}},
101     {"PGT",     N_("Percentage greater than"),               AGR_SV_YES, 1, VAL_NUMERIC, {FMT_F, 5, 1}},
102     {"PLT",     N_("Percentage less than"),                  AGR_SV_YES, 1, VAL_NUMERIC, {FMT_F, 5, 1}},
103     {"PIN",     N_("Percentage included in range"),          AGR_SV_YES, 2, VAL_NUMERIC, {FMT_F, 5, 1}},
104     {"POUT",    N_("Percentage excluded from range"),        AGR_SV_YES, 2, VAL_NUMERIC, {FMT_F, 5, 1}},
105     {"FGT",     N_("Fraction greater than"),                 AGR_SV_YES, 1, VAL_NUMERIC, {FMT_F, 5, 3}},
106     {"FLT",     N_("Fraction less than"),                    AGR_SV_YES, 1, VAL_NUMERIC, {FMT_F, 5, 3}},
107     {"FIN",     N_("Fraction included in range"),            AGR_SV_YES, 2, VAL_NUMERIC, {FMT_F, 5, 3}},
108     {"FOUT",    N_("Fraction excluded from range"),          AGR_SV_YES, 2, VAL_NUMERIC, {FMT_F, 5, 3}},
109     {"N",       N_("Number of cases"),                       AGR_SV_NO,  0, VAL_NUMERIC, {FMT_F, 7, 0}},
110     {"NU",      N_("Number of cases (unweighted)"),          AGR_SV_OPT, 0, VAL_NUMERIC, {FMT_F, 7, 0}},
111     {"NMISS",   N_("Number of missing values"),              AGR_SV_YES, 0, VAL_NUMERIC, {FMT_F, 7, 0}},
112     {"NUMISS",  N_("Number of missing values (unweighted)"), AGR_SV_YES, 0, VAL_NUMERIC, {FMT_F, 7, 0}},
113     {"FIRST",   N_("First non-missing value"),               AGR_SV_YES, 0, VAL_STRING,  {-1, -1, -1}},
114     {"LAST",    N_("Last non-missing value"),                AGR_SV_YES, 0, VAL_STRING,  {-1, -1, -1}},
115     {NULL,      NULL,                                        AGR_SV_NO,  0, -1,          {-1, -1, -1}},
116   };
117
118 /* Missing value types. */
119 enum missing_treatment
120   {
121     ITEMWISE,           /* Missing values item by item. */
122     COLUMNWISE          /* Missing values column by column. */
123   };
124
125 /* An entire AGGREGATE procedure. */
126 struct agr_proc
127   {
128     /* Break variables. */
129     struct subcase sort;                /* Sort criteria (break variables). */
130     const struct variable **break_vars;       /* Break variables. */
131     size_t break_var_cnt;               /* Number of break variables. */
132
133     enum missing_treatment missing;     /* How to treat missing values. */
134     struct agr_var *agr_vars;           /* First aggregate variable. */
135     struct dictionary *dict;            /* Aggregate dictionary. */
136     const struct dictionary *src_dict;  /* Dict of the source */
137     int case_cnt;                       /* Counts aggregated cases. */
138
139     bool add_variables;                 /* True iff the aggregated variables should
140                                            be appended to the existing dictionary */
141   };
142
143 static void initialize_aggregate_info (struct agr_proc *);
144
145 static void accumulate_aggregate_info (struct agr_proc *,
146                                        const struct ccase *);
147 /* Prototypes. */
148 static bool parse_aggregate_functions (struct lexer *, const struct dictionary *,
149                                        struct agr_proc *);
150 static void agr_destroy (struct agr_proc *);
151 static void dump_aggregate_info (const struct agr_proc *agr,
152                                  struct casewriter *output,
153                                  const struct ccase *break_case);
154 \f
155 /* Parsing. */
156
157 /* Parses and executes the AGGREGATE procedure. */
158 int
159 cmd_aggregate (struct lexer *lexer, struct dataset *ds)
160 {
161   struct dictionary *dict = dataset_dict (ds);
162   struct agr_proc agr;
163   struct file_handle *out_file = NULL;
164   struct casereader *input = NULL, *group;
165   struct casegrouper *grouper;
166   struct casewriter *output = NULL;
167
168   bool copy_documents = false;
169   bool presorted = false;
170   bool saw_direction;
171   bool ok;
172
173   memset(&agr, 0 , sizeof (agr));
174   agr.missing = ITEMWISE;
175   agr.src_dict = dict;
176   subcase_init_empty (&agr.sort);
177
178   /* OUTFILE subcommand must be first. */
179   lex_match (lexer, T_SLASH);
180   if (!lex_force_match_id (lexer, "OUTFILE"))
181     goto error;
182   lex_match (lexer, T_EQUALS);
183   if (!lex_match (lexer, T_ASTERISK))
184     {
185       out_file = fh_parse (lexer, FH_REF_FILE | FH_REF_SCRATCH);
186       if (out_file == NULL)
187         goto error;
188     }
189
190   if (out_file == NULL && lex_match_id (lexer, "MODE"))
191     {
192       lex_match (lexer, T_EQUALS);
193       if (lex_match_id (lexer, "ADDVARIABLES"))
194         {
195           agr.add_variables = true;
196
197           /* presorted is assumed in ADDVARIABLES mode */
198           presorted = true;
199         }
200       else if (lex_match_id (lexer, "REPLACE"))
201         {
202           agr.add_variables = false;
203         }
204       else
205         goto error;
206     }
207
208   if ( agr.add_variables )
209     agr.dict = dict_clone (dict);
210   else
211     agr.dict = dict_create ();    
212
213   dict_set_label (agr.dict, dict_get_label (dict));
214   dict_set_documents (agr.dict, dict_get_documents (dict));
215
216   /* Read most of the subcommands. */
217   for (;;)
218     {
219       lex_match (lexer, T_SLASH);
220
221       if (lex_match_id (lexer, "MISSING"))
222         {
223           lex_match (lexer, T_EQUALS);
224           if (!lex_match_id (lexer, "COLUMNWISE"))
225             {
226               lex_error (lexer, _("expecting %s"), "COLUMNWISE");
227               goto error;
228             }
229           agr.missing = COLUMNWISE;
230         }
231       else if (lex_match_id (lexer, "DOCUMENT"))
232         copy_documents = true;
233       else if (lex_match_id (lexer, "PRESORTED"))
234         presorted = true;
235       else if (lex_force_match_id (lexer, "BREAK"))
236         {
237           int i;
238
239           lex_match (lexer, T_EQUALS);
240           if (!parse_sort_criteria (lexer, dict, &agr.sort, &agr.break_vars,
241                                     &saw_direction))
242             goto error;
243           agr.break_var_cnt = subcase_get_n_fields (&agr.sort);
244
245           if  (! agr.add_variables)
246             for (i = 0; i < agr.break_var_cnt; i++)
247               dict_clone_var_assert (agr.dict, agr.break_vars[i]);
248
249           /* BREAK must follow the options. */
250           break;
251         }
252       else
253         goto error;
254
255     }
256   if (presorted && saw_direction)
257     msg (SW, _("When PRESORTED is specified, specifying sorting directions "
258                "with (A) or (D) has no effect.  Output data will be sorted "
259                "the same way as the input data."));
260
261   /* Read in the aggregate functions. */
262   lex_match (lexer, T_SLASH);
263   if (!parse_aggregate_functions (lexer, dict, &agr))
264     goto error;
265
266   /* Delete documents. */
267   if (!copy_documents)
268     dict_clear_documents (agr.dict);
269
270   /* Cancel SPLIT FILE. */
271   dict_set_split_vars (agr.dict, NULL, 0);
272
273   /* Initialize. */
274   agr.case_cnt = 0;
275
276   if (out_file == NULL)
277     {
278       /* The active file will be replaced by the aggregated data,
279          so TEMPORARY is moot. */
280       proc_cancel_temporary_transformations (ds);
281       proc_discard_output (ds);
282       output = autopaging_writer_create (dict_get_proto (agr.dict));
283     }
284   else
285     {
286       output = any_writer_open (out_file, agr.dict);
287       if (output == NULL)
288         goto error;
289     }
290
291   input = proc_open (ds);
292   if (!subcase_is_empty (&agr.sort) && !presorted)
293     {
294       input = sort_execute (input, &agr.sort);
295       subcase_clear (&agr.sort);
296     }
297
298   for (grouper = casegrouper_create_vars (input, agr.break_vars,
299                                           agr.break_var_cnt);
300        casegrouper_get_next_group (grouper, &group);
301        casereader_destroy (group))
302     {
303       struct casereader *placeholder = NULL;
304       struct ccase *c = casereader_peek (group, 0);
305
306       if (c == NULL)
307         {
308           casereader_destroy (group);
309           continue;
310         }
311
312       initialize_aggregate_info (&agr);
313
314       if ( agr.add_variables )
315         placeholder = casereader_clone (group);
316
317       {
318         struct ccase *cg;
319         for (; (cg = casereader_read (group)) != NULL; case_unref (cg))
320           accumulate_aggregate_info (&agr, cg);
321       }
322
323
324       if  (agr.add_variables)
325         {
326           struct ccase *cg;
327           for (; (cg = casereader_read (placeholder)) != NULL; case_unref (cg))
328             dump_aggregate_info (&agr, output, cg);
329
330           casereader_destroy (placeholder);
331         }
332       else
333         {
334           dump_aggregate_info (&agr, output, c);
335           case_unref (c);
336         }
337     }
338   if (!casegrouper_destroy (grouper))
339     goto error;
340
341   if (!proc_commit (ds))
342     {
343       input = NULL;
344       goto error;
345     }
346   input = NULL;
347
348   if (out_file == NULL)
349     {
350       struct casereader *next_input = casewriter_make_reader (output);
351       if (next_input == NULL)
352         goto error;
353
354       proc_set_active_file (ds, next_input, agr.dict);
355       agr.dict = NULL;
356     }
357   else
358     {
359       ok = casewriter_destroy (output);
360       output = NULL;
361       if (!ok)
362         goto error;
363     }
364
365   agr_destroy (&agr);
366   fh_unref (out_file);
367   return CMD_SUCCESS;
368
369 error:
370   if (input != NULL)
371     proc_commit (ds);
372   casewriter_destroy (output);
373   agr_destroy (&agr);
374   fh_unref (out_file);
375   return CMD_CASCADING_FAILURE;
376 }
377
378 /* Parse all the aggregate functions. */
379 static bool
380 parse_aggregate_functions (struct lexer *lexer, const struct dictionary *dict,
381                            struct agr_proc *agr)
382 {
383   struct agr_var *tail; /* Tail of linked list starting at agr->vars. */
384
385   /* Parse everything. */
386   tail = NULL;
387   for (;;)
388     {
389       char **dest;
390       char **dest_label;
391       size_t n_dest;
392       struct string function_name;
393
394       enum mv_class exclude;
395       const struct agr_func *function;
396       int func_index;
397
398       union agr_argument arg[2];
399
400       const struct variable **src;
401       size_t n_src;
402
403       size_t i;
404
405       dest = NULL;
406       dest_label = NULL;
407       n_dest = 0;
408       src = NULL;
409       function = NULL;
410       n_src = 0;
411       arg[0].c = NULL;
412       arg[1].c = NULL;
413       ds_init_empty (&function_name);
414
415       /* Parse the list of target variables. */
416       while (!lex_match (lexer, T_EQUALS))
417         {
418           size_t n_dest_prev = n_dest;
419
420           if (!parse_DATA_LIST_vars (lexer, dict, &dest, &n_dest,
421                                      (PV_APPEND | PV_SINGLE | PV_NO_SCRATCH
422                                       | PV_NO_DUPLICATE)))
423             goto error;
424
425           /* Assign empty labels. */
426           {
427             int j;
428
429             dest_label = xnrealloc (dest_label, n_dest, sizeof *dest_label);
430             for (j = n_dest_prev; j < n_dest; j++)
431               dest_label[j] = NULL;
432           }
433
434
435
436           if (lex_is_string (lexer))
437             {
438               dest_label[n_dest - 1] = xstrdup (lex_tokcstr (lexer));
439               lex_get (lexer);
440             }
441         }
442
443       /* Get the name of the aggregation function. */
444       if (lex_token (lexer) != T_ID)
445         {
446           lex_error (lexer, _("expecting aggregation function"));
447           goto error;
448         }
449
450       ds_assign_substring (&function_name, lex_tokss (lexer));
451       exclude = ds_chomp_byte (&function_name, '.') ? MV_SYSTEM : MV_ANY;
452
453       for (function = agr_func_tab; function->name; function++)
454         if (!strcasecmp (function->name, ds_cstr (&function_name)))
455           break;
456       if (NULL == function->name)
457         {
458           msg (SE, _("Unknown aggregation function %s."),
459                ds_cstr (&function_name));
460           goto error;
461         }
462       ds_destroy (&function_name);
463       func_index = function - agr_func_tab;
464       lex_get (lexer);
465
466       /* Check for leading lparen. */
467       if (!lex_match (lexer, T_LPAREN))
468         {
469           if (function->src_vars == AGR_SV_YES)
470             {
471               lex_force_match (lexer, T_LPAREN);
472               goto error;
473             }
474         }
475       else
476         {
477           /* Parse list of source variables. */
478           {
479             int pv_opts = PV_NO_SCRATCH;
480
481             if (func_index == SUM || func_index == MEAN || func_index == SD)
482               pv_opts |= PV_NUMERIC;
483             else if (function->n_args)
484               pv_opts |= PV_SAME_TYPE;
485
486             if (!parse_variables_const (lexer, dict, &src, &n_src, pv_opts))
487               goto error;
488           }
489
490           /* Parse function arguments, for those functions that
491              require arguments. */
492           if (function->n_args != 0)
493             for (i = 0; i < function->n_args; i++)
494               {
495                 int type;
496
497                 lex_match (lexer, T_COMMA);
498                 if (lex_is_string (lexer))
499                   {
500                     arg[i].c = recode_string (dict_get_encoding (agr->dict),
501                                               "UTF-8", lex_tokcstr (lexer),
502                                               -1);
503                     type = VAL_STRING;
504                   }
505                 else if (lex_is_number (lexer))
506                   {
507                     arg[i].f = lex_tokval (lexer);
508                     type = VAL_NUMERIC;
509                   }
510                 else
511                   {
512                     msg (SE, _("Missing argument %zu to %s."),
513                          i + 1, function->name);
514                     goto error;
515                   }
516
517                 lex_get (lexer);
518
519                 if (type != var_get_type (src[0]))
520                   {
521                     msg (SE, _("Arguments to %s must be of same type as "
522                                "source variables."),
523                          function->name);
524                     goto error;
525                   }
526               }
527
528           /* Trailing rparen. */
529           if (!lex_force_match (lexer, T_RPAREN))
530             goto error;
531
532           /* Now check that the number of source variables match
533              the number of target variables.  If we check earlier
534              than this, the user can get very misleading error
535              message, i.e. `AGGREGATE x=SUM(y t).' will get this
536              error message when a proper message would be more
537              like `unknown variable t'. */
538           if (n_src != n_dest)
539             {
540               msg (SE, _("Number of source variables (%zu) does not match "
541                          "number of target variables (%zu)."),
542                     n_src, n_dest);
543               goto error;
544             }
545
546           if ((func_index == PIN || func_index == POUT
547               || func_index == FIN || func_index == FOUT)
548               && (var_is_numeric (src[0])
549                   ? arg[0].f > arg[1].f
550                   : str_compare_rpad (arg[0].c, arg[1].c) > 0))
551             {
552               union agr_argument t = arg[0];
553               arg[0] = arg[1];
554               arg[1] = t;
555
556               msg (SW, _("The value arguments passed to the %s function "
557                          "are out-of-order.  They will be treated as if "
558                          "they had been specified in the correct order."),
559                    function->name);
560             }
561         }
562
563       /* Finally add these to the linked list of aggregation
564          variables. */
565       for (i = 0; i < n_dest; i++)
566         {
567           struct agr_var *v = xzalloc (sizeof *v);
568
569           /* Add variable to chain. */
570           if (agr->agr_vars != NULL)
571             tail->next = v;
572           else
573             agr->agr_vars = v;
574           tail = v;
575           tail->next = NULL;
576           v->moments = NULL;
577
578           /* Create the target variable in the aggregate
579              dictionary. */
580           {
581             struct variable *destvar;
582
583             v->function = func_index;
584
585             if (src)
586               {
587                 v->src = src[i];
588
589                 if (var_is_alpha (src[i]))
590                   {
591                     v->function |= FSTRING;
592                     v->string = xmalloc (var_get_width (src[i]));
593                   }
594
595                 if (function->alpha_type == VAL_STRING)
596                   destvar = dict_clone_var_as (agr->dict, v->src, dest[i]);
597                 else
598                   {
599                     assert (var_is_numeric (v->src)
600                             || function->alpha_type == VAL_NUMERIC);
601                     destvar = dict_create_var (agr->dict, dest[i], 0);
602                     if (destvar != NULL)
603                       {
604                         struct fmt_spec f;
605                         if ((func_index == N || func_index == NMISS)
606                             && dict_get_weight (dict) != NULL)
607                           f = fmt_for_output (FMT_F, 8, 2);
608                         else
609                           f = function->format;
610                         var_set_both_formats (destvar, &f);
611                       }
612                   }
613               } else {
614                 struct fmt_spec f;
615                 v->src = NULL;
616                 destvar = dict_create_var (agr->dict, dest[i], 0);
617                 if (destvar != NULL)
618                   {
619                     if ((func_index == N || func_index == NMISS)
620                         && dict_get_weight (dict) != NULL)
621                       f = fmt_for_output (FMT_F, 8, 2);
622                     else
623                       f = function->format;
624                     var_set_both_formats (destvar, &f);
625                   }
626             }
627
628             if (!destvar)
629               {
630                 msg (SE, _("Variable name %s is not unique within the "
631                            "aggregate file dictionary, which contains "
632                            "the aggregate variables and the break "
633                            "variables."),
634                      dest[i]);
635                 goto error;
636               }
637
638             free (dest[i]);
639             if (dest_label[i])
640               var_set_label (destvar, dest_label[i],
641                              dict_get_encoding (agr->dict), true);
642
643             v->dest = destvar;
644           }
645
646           v->exclude = exclude;
647
648           if (v->src != NULL)
649             {
650               int j;
651
652               if (var_is_numeric (v->src))
653                 for (j = 0; j < function->n_args; j++)
654                   v->arg[j].f = arg[j].f;
655               else
656                 for (j = 0; j < function->n_args; j++)
657                   v->arg[j].c = xstrdup (arg[j].c);
658             }
659         }
660
661       if (src != NULL && var_is_alpha (src[0]))
662         for (i = 0; i < function->n_args; i++)
663           {
664             free (arg[i].c);
665             arg[i].c = NULL;
666           }
667
668       free (src);
669       free (dest);
670       free (dest_label);
671
672       if (!lex_match (lexer, T_SLASH))
673         {
674           if (lex_token (lexer) == T_ENDCMD)
675             return true;
676
677           lex_error (lexer, "expecting end of command");
678           return false;
679         }
680       continue;
681
682     error:
683       ds_destroy (&function_name);
684       for (i = 0; i < n_dest; i++)
685         {
686           free (dest[i]);
687           free (dest_label[i]);
688         }
689       free (dest);
690       free (dest_label);
691       free (arg[0].c);
692       free (arg[1].c);
693       if (src && n_src && var_is_alpha (src[0]))
694         for (i = 0; i < function->n_args; i++)
695           {
696             free (arg[i].c);
697             arg[i].c = NULL;
698           }
699       free (src);
700
701       return false;
702     }
703 }
704
705 /* Destroys AGR. */
706 static void
707 agr_destroy (struct agr_proc *agr)
708 {
709   struct agr_var *iter, *next;
710
711   subcase_destroy (&agr->sort);
712   free (agr->break_vars);
713   for (iter = agr->agr_vars; iter; iter = next)
714     {
715       next = iter->next;
716
717       if (iter->function & FSTRING)
718         {
719           size_t n_args;
720           size_t i;
721
722           n_args = agr_func_tab[iter->function & FUNC].n_args;
723           for (i = 0; i < n_args; i++)
724             free (iter->arg[i].c);
725           free (iter->string);
726         }
727       else if (iter->function == SD)
728         moments1_destroy (iter->moments);
729
730       dict_destroy_internal_var (iter->subject);
731       dict_destroy_internal_var (iter->weight);
732
733       free (iter);
734     }
735   if (agr->dict != NULL)
736     dict_destroy (agr->dict);
737 }
738 \f
739 /* Execution. */
740
741 /* Accumulates aggregation data from the case INPUT. */
742 static void
743 accumulate_aggregate_info (struct agr_proc *agr, const struct ccase *input)
744 {
745   struct agr_var *iter;
746   double weight;
747   bool bad_warn = true;
748
749   weight = dict_get_case_weight (agr->src_dict, input, &bad_warn);
750
751   for (iter = agr->agr_vars; iter; iter = iter->next)
752     if (iter->src)
753       {
754         const union value *v = case_data (input, iter->src);
755         int src_width = var_get_width (iter->src);
756
757         if (var_is_value_missing (iter->src, v, iter->exclude))
758           {
759             switch (iter->function)
760               {
761               case NMISS:
762               case NMISS | FSTRING:
763                 iter->dbl[0] += weight;
764                 break;
765               case NUMISS:
766               case NUMISS | FSTRING:
767                 iter->int1++;
768                 break;
769               }
770             iter->saw_missing = true;
771             continue;
772           }
773
774         /* This is horrible.  There are too many possibilities. */
775         switch (iter->function)
776           {
777           case SUM:
778             iter->dbl[0] += v->f * weight;
779             iter->int1 = 1;
780             break;
781           case MEAN:
782             iter->dbl[0] += v->f * weight;
783             iter->dbl[1] += weight;
784             break;
785           case MEDIAN:
786             {
787               double wv ;
788               struct ccase *cout;
789
790               cout = case_create (casewriter_get_proto (iter->writer));
791
792               case_data_rw (cout, iter->subject)->f
793                 = case_data (input, iter->src)->f;
794
795               wv = dict_get_case_weight (agr->src_dict, input, NULL);
796
797               case_data_rw (cout, iter->weight)->f = wv;
798
799               iter->cc += wv;
800
801               casewriter_write (iter->writer, cout);
802             }
803             break;
804           case SD:
805             moments1_add (iter->moments, v->f, weight);
806             break;
807           case MAX:
808             iter->dbl[0] = MAX (iter->dbl[0], v->f);
809             iter->int1 = 1;
810             break;
811           case MAX | FSTRING:
812             /* Need to do some kind of Unicode collation thingy here */
813             if (memcmp (iter->string, value_str (v, src_width), src_width) < 0)
814               memcpy (iter->string, value_str (v, src_width), src_width);
815             iter->int1 = 1;
816             break;
817           case MIN:
818             iter->dbl[0] = MIN (iter->dbl[0], v->f);
819             iter->int1 = 1;
820             break;
821           case MIN | FSTRING:
822             if (memcmp (iter->string, value_str (v, src_width), src_width) > 0)
823               memcpy (iter->string, value_str (v, src_width), src_width);
824             iter->int1 = 1;
825             break;
826           case FGT:
827           case PGT:
828             if (v->f > iter->arg[0].f)
829               iter->dbl[0] += weight;
830             iter->dbl[1] += weight;
831             break;
832           case FGT | FSTRING:
833           case PGT | FSTRING:
834             if (memcmp (iter->arg[0].c,
835                         value_str (v, src_width), src_width) < 0)
836               iter->dbl[0] += weight;
837             iter->dbl[1] += weight;
838             break;
839           case FLT:
840           case PLT:
841             if (v->f < iter->arg[0].f)
842               iter->dbl[0] += weight;
843             iter->dbl[1] += weight;
844             break;
845           case FLT | FSTRING:
846           case PLT | FSTRING:
847             if (memcmp (iter->arg[0].c,
848                         value_str (v, src_width), src_width) > 0)
849               iter->dbl[0] += weight;
850             iter->dbl[1] += weight;
851             break;
852           case FIN:
853           case PIN:
854             if (iter->arg[0].f <= v->f && v->f <= iter->arg[1].f)
855               iter->dbl[0] += weight;
856             iter->dbl[1] += weight;
857             break;
858           case FIN | FSTRING:
859           case PIN | FSTRING:
860             if (memcmp (iter->arg[0].c,
861                         value_str (v, src_width), src_width) <= 0
862                 && memcmp (iter->arg[1].c,
863                            value_str (v, src_width), src_width) >= 0)
864               iter->dbl[0] += weight;
865             iter->dbl[1] += weight;
866             break;
867           case FOUT:
868           case POUT:
869             if (iter->arg[0].f > v->f || v->f > iter->arg[1].f)
870               iter->dbl[0] += weight;
871             iter->dbl[1] += weight;
872             break;
873           case FOUT | FSTRING:
874           case POUT | FSTRING:
875             if (memcmp (iter->arg[0].c,
876                         value_str (v, src_width), src_width) > 0
877                 || memcmp (iter->arg[1].c,
878                            value_str (v, src_width), src_width) < 0)
879               iter->dbl[0] += weight;
880             iter->dbl[1] += weight;
881             break;
882           case N:
883           case N | FSTRING:
884             iter->dbl[0] += weight;
885             break;
886           case NU:
887           case NU | FSTRING:
888             iter->int1++;
889             break;
890           case FIRST:
891             if (iter->int1 == 0)
892               {
893                 iter->dbl[0] = v->f;
894                 iter->int1 = 1;
895               }
896             break;
897           case FIRST | FSTRING:
898             if (iter->int1 == 0)
899               {
900                 memcpy (iter->string, value_str (v, src_width), src_width);
901                 iter->int1 = 1;
902               }
903             break;
904           case LAST:
905             iter->dbl[0] = v->f;
906             iter->int1 = 1;
907             break;
908           case LAST | FSTRING:
909             memcpy (iter->string, value_str (v, src_width), src_width);
910             iter->int1 = 1;
911             break;
912           case NMISS:
913           case NMISS | FSTRING:
914           case NUMISS:
915           case NUMISS | FSTRING:
916             /* Our value is not missing or it would have been
917                caught earlier.  Nothing to do. */
918             break;
919           default:
920             NOT_REACHED ();
921           }
922       } else {
923       switch (iter->function)
924         {
925         case N:
926           iter->dbl[0] += weight;
927           break;
928         case NU:
929           iter->int1++;
930           break;
931         default:
932           NOT_REACHED ();
933         }
934     }
935 }
936
937 /* Writes an aggregated record to OUTPUT. */
938 static void
939 dump_aggregate_info (const struct agr_proc *agr, struct casewriter *output, const struct ccase *break_case)
940 {
941   struct ccase *c = case_create (dict_get_proto (agr->dict));
942
943   if ( agr->add_variables)
944     {
945       case_copy (c, 0, break_case, 0, dict_get_var_cnt (agr->src_dict));
946     }
947   else
948     {
949       int value_idx = 0;
950       int i;
951
952       for (i = 0; i < agr->break_var_cnt; i++)
953         {
954           const struct variable *v = agr->break_vars[i];
955           value_copy (case_data_rw_idx (c, value_idx),
956                       case_data (break_case, v),
957                       var_get_width (v));
958           value_idx++;
959         }
960     }
961
962   {
963     struct agr_var *i;
964
965     for (i = agr->agr_vars; i; i = i->next)
966       {
967         union value *v = case_data_rw (c, i->dest);
968         int width = var_get_width (i->dest);
969
970         if (agr->missing == COLUMNWISE && i->saw_missing
971             && (i->function & FUNC) != N && (i->function & FUNC) != NU
972             && (i->function & FUNC) != NMISS && (i->function & FUNC) != NUMISS)
973           {
974             value_set_missing (v, width);
975             casewriter_destroy (i->writer);
976             continue;
977           }
978
979         switch (i->function)
980           {
981           case SUM:
982             v->f = i->int1 ? i->dbl[0] : SYSMIS;
983             break;
984           case MEAN:
985             v->f = i->dbl[1] != 0.0 ? i->dbl[0] / i->dbl[1] : SYSMIS;
986             break;
987           case MEDIAN:
988             {
989               if ( i->writer)
990                 {
991                   struct percentile *median = percentile_create (0.5, i->cc);
992                   struct order_stats *os = &median->parent;
993                   struct casereader *sorted_reader = casewriter_make_reader (i->writer);
994                   i->writer = NULL;
995
996                   order_stats_accumulate (&os, 1,
997                                           sorted_reader,
998                                           i->weight,
999                                           i->subject,
1000                                           i->exclude);
1001                   i->dbl[0] = percentile_calculate (median, PC_HAVERAGE);
1002                   statistic_destroy (&median->parent.parent);
1003                 }
1004               v->f = i->dbl[0];
1005             }
1006             break;
1007           case SD:
1008             {
1009               double variance;
1010
1011               /* FIXME: we should use two passes. */
1012               moments1_calculate (i->moments, NULL, NULL, &variance,
1013                                  NULL, NULL);
1014               if (variance != SYSMIS)
1015                 v->f = sqrt (variance);
1016               else
1017                 v->f = SYSMIS;
1018             }
1019             break;
1020           case MAX:
1021           case MIN:
1022             v->f = i->int1 ? i->dbl[0] : SYSMIS;
1023             break;
1024           case MAX | FSTRING:
1025           case MIN | FSTRING:
1026             if (i->int1)
1027               memcpy (value_str_rw (v, width), i->string, width);
1028             else
1029               value_set_missing (v, width);
1030             break;
1031           case FGT:
1032           case FGT | FSTRING:
1033           case FLT:
1034           case FLT | FSTRING:
1035           case FIN:
1036           case FIN | FSTRING:
1037           case FOUT:
1038           case FOUT | FSTRING:
1039             v->f = i->dbl[1] ? i->dbl[0] / i->dbl[1] : SYSMIS;
1040             break;
1041           case PGT:
1042           case PGT | FSTRING:
1043           case PLT:
1044           case PLT | FSTRING:
1045           case PIN:
1046           case PIN | FSTRING:
1047           case POUT:
1048           case POUT | FSTRING:
1049             v->f = i->dbl[1] ? i->dbl[0] / i->dbl[1] * 100.0 : SYSMIS;
1050             break;
1051           case N:
1052           case N | FSTRING:
1053               v->f = i->dbl[0];
1054             break;
1055           case NU:
1056           case NU | FSTRING:
1057             v->f = i->int1;
1058             break;
1059           case FIRST:
1060           case LAST:
1061             v->f = i->int1 ? i->dbl[0] : SYSMIS;
1062             break;
1063           case FIRST | FSTRING:
1064           case LAST | FSTRING:
1065             if (i->int1)
1066               memcpy (value_str_rw (v, width), i->string, width);
1067             else
1068               value_set_missing (v, width);
1069             break;
1070           case NMISS:
1071           case NMISS | FSTRING:
1072             v->f = i->dbl[0];
1073             break;
1074           case NUMISS:
1075           case NUMISS | FSTRING:
1076             v->f = i->int1;
1077             break;
1078           default:
1079             NOT_REACHED ();
1080           }
1081       }
1082   }
1083
1084   casewriter_write (output, c);
1085 }
1086
1087 /* Resets the state for all the aggregate functions. */
1088 static void
1089 initialize_aggregate_info (struct agr_proc *agr)
1090 {
1091   struct agr_var *iter;
1092
1093   for (iter = agr->agr_vars; iter; iter = iter->next)
1094     {
1095       iter->saw_missing = false;
1096       iter->dbl[0] = iter->dbl[1] = iter->dbl[2] = 0.0;
1097       iter->int1 = iter->int2 = 0;
1098       switch (iter->function)
1099         {
1100         case MIN:
1101           iter->dbl[0] = DBL_MAX;
1102           break;
1103         case MIN | FSTRING:
1104           memset (iter->string, 255, var_get_width (iter->src));
1105           break;
1106         case MAX:
1107           iter->dbl[0] = -DBL_MAX;
1108           break;
1109         case MAX | FSTRING:
1110           memset (iter->string, 0, var_get_width (iter->src));
1111           break;
1112         case MEDIAN:
1113           {
1114             struct caseproto *proto;
1115             struct subcase ordering;
1116
1117             proto = caseproto_create ();
1118             proto = caseproto_add_width (proto, 0);
1119             proto = caseproto_add_width (proto, 0);
1120
1121             if ( ! iter->subject)
1122               iter->subject = dict_create_internal_var (0, 0);
1123
1124             if ( ! iter->weight)
1125               iter->weight = dict_create_internal_var (1, 0);
1126
1127             subcase_init_var (&ordering, iter->subject, SC_ASCEND);
1128             iter->writer = sort_create_writer (&ordering, proto);
1129             subcase_destroy (&ordering);
1130             caseproto_unref (proto);
1131
1132             iter->cc = 0;
1133           }
1134           break;
1135         case SD:
1136           if (iter->moments == NULL)
1137             iter->moments = moments1_create (MOMENT_VARIANCE);
1138           else
1139             moments1_clear (iter->moments);
1140           break;
1141         default:
1142           break;
1143         }
1144     }
1145 }