Change terminology from "active file" to "active dataset".
[pspp-builds.git] / src / language / stats / aggregate.c
1 /* PSPP - a program for statistical analysis.
2    Copyright (C) 1997-9, 2000, 2006, 2008, 2009, 2010, 2011 Free Software Foundation, Inc.
3
4    This program is free software: you can redistribute it and/or modify
5    it under the terms of the GNU General Public License as published by
6    the Free Software Foundation, either version 3 of the License, or
7    (at your option) any later version.
8
9    This program is distributed in the hope that it will be useful,
10    but WITHOUT ANY WARRANTY; without even the implied warranty of
11    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12    GNU General Public License for more details.
13
14    You should have received a copy of the GNU General Public License
15    along with this program.  If not, see <http://www.gnu.org/licenses/>. */
16
17 #include <config.h>
18
19 #include "language/stats/aggregate.h"
20
21 #include <stdlib.h>
22
23 #include "data/any-writer.h"
24 #include "data/case.h"
25 #include "data/casegrouper.h"
26 #include "data/casereader.h"
27 #include "data/casewriter.h"
28 #include "data/dataset.h"
29 #include "data/dictionary.h"
30 #include "data/file-handle-def.h"
31 #include "data/format.h"
32 #include "data/settings.h"
33 #include "data/subcase.h"
34 #include "data/sys-file-writer.h"
35 #include "data/variable.h"
36 #include "language/command.h"
37 #include "language/data-io/file-handle.h"
38 #include "language/lexer/lexer.h"
39 #include "language/lexer/variable-parser.h"
40 #include "language/stats/sort-criteria.h"
41 #include "libpspp/assertion.h"
42 #include "libpspp/i18n.h"
43 #include "libpspp/message.h"
44 #include "libpspp/misc.h"
45 #include "libpspp/pool.h"
46 #include "libpspp/str.h"
47 #include "math/moments.h"
48 #include "math/percentiles.h"
49 #include "math/sort.h"
50 #include "math/statistic.h"
51
52 #include "gl/minmax.h"
53 #include "gl/xalloc.h"
54
55 #include "gettext.h"
56 #define _(msgid) gettext (msgid)
57 #define N_(msgid) msgid
58
59 /* Argument for AGGREGATE function. */
60 union agr_argument
61   {
62     double f;                           /* Numeric. */
63     char *c;                            /* Short or long string. */
64   };
65
66 /* Specifies how to make an aggregate variable. */
67 struct agr_var
68   {
69     struct agr_var *next;               /* Next in list. */
70
71     /* Collected during parsing. */
72     const struct variable *src; /* Source variable. */
73     struct variable *dest;      /* Target variable. */
74     int function;               /* Function. */
75     enum mv_class exclude;      /* Classes of missing values to exclude. */
76     union agr_argument arg[2];  /* Arguments. */
77
78     /* Accumulated during AGGREGATE execution. */
79     double dbl[3];
80     int int1, int2;
81     char *string;
82     bool saw_missing;
83     struct moments1 *moments;
84     double cc;
85
86     struct variable *subject;
87     struct variable *weight;
88     struct casewriter *writer;
89   };
90
91
92 /* Attributes of aggregation functions. */
93 const struct agr_func agr_func_tab[] =
94   {
95     {"SUM",     N_("Sum of values"),                         AGR_SV_YES, 0, -1,          {FMT_F, 8, 2}},
96     {"MEAN",    N_("Mean average"),                          AGR_SV_YES, 0, -1,          {FMT_F, 8, 2}},
97     {"MEDIAN",  N_("Median average"),                        AGR_SV_YES, 0, -1,          {FMT_F, 8, 2}},
98     {"SD",      N_("Standard deviation"),                    AGR_SV_YES, 0, -1,          {FMT_F, 8, 2}},
99     {"MAX",     N_("Maximum value"),                         AGR_SV_YES, 0, VAL_STRING,  {-1, -1, -1}},
100     {"MIN",     N_("Minimum value"),                         AGR_SV_YES, 0, VAL_STRING,  {-1, -1, -1}},
101     {"PGT",     N_("Percentage greater than"),               AGR_SV_YES, 1, VAL_NUMERIC, {FMT_F, 5, 1}},
102     {"PLT",     N_("Percentage less than"),                  AGR_SV_YES, 1, VAL_NUMERIC, {FMT_F, 5, 1}},
103     {"PIN",     N_("Percentage included in range"),          AGR_SV_YES, 2, VAL_NUMERIC, {FMT_F, 5, 1}},
104     {"POUT",    N_("Percentage excluded from range"),        AGR_SV_YES, 2, VAL_NUMERIC, {FMT_F, 5, 1}},
105     {"FGT",     N_("Fraction greater than"),                 AGR_SV_YES, 1, VAL_NUMERIC, {FMT_F, 5, 3}},
106     {"FLT",     N_("Fraction less than"),                    AGR_SV_YES, 1, VAL_NUMERIC, {FMT_F, 5, 3}},
107     {"FIN",     N_("Fraction included in range"),            AGR_SV_YES, 2, VAL_NUMERIC, {FMT_F, 5, 3}},
108     {"FOUT",    N_("Fraction excluded from range"),          AGR_SV_YES, 2, VAL_NUMERIC, {FMT_F, 5, 3}},
109     {"N",       N_("Number of cases"),                       AGR_SV_NO,  0, VAL_NUMERIC, {FMT_F, 7, 0}},
110     {"NU",      N_("Number of cases (unweighted)"),          AGR_SV_OPT, 0, VAL_NUMERIC, {FMT_F, 7, 0}},
111     {"NMISS",   N_("Number of missing values"),              AGR_SV_YES, 0, VAL_NUMERIC, {FMT_F, 7, 0}},
112     {"NUMISS",  N_("Number of missing values (unweighted)"), AGR_SV_YES, 0, VAL_NUMERIC, {FMT_F, 7, 0}},
113     {"FIRST",   N_("First non-missing value"),               AGR_SV_YES, 0, VAL_STRING,  {-1, -1, -1}},
114     {"LAST",    N_("Last non-missing value"),                AGR_SV_YES, 0, VAL_STRING,  {-1, -1, -1}},
115     {NULL,      NULL,                                        AGR_SV_NO,  0, -1,          {-1, -1, -1}},
116   };
117
118 /* Missing value types. */
119 enum missing_treatment
120   {
121     ITEMWISE,           /* Missing values item by item. */
122     COLUMNWISE          /* Missing values column by column. */
123   };
124
125 /* An entire AGGREGATE procedure. */
126 struct agr_proc
127   {
128     /* Break variables. */
129     struct subcase sort;                /* Sort criteria (break variables). */
130     const struct variable **break_vars;       /* Break variables. */
131     size_t break_var_cnt;               /* Number of break variables. */
132
133     enum missing_treatment missing;     /* How to treat missing values. */
134     struct agr_var *agr_vars;           /* First aggregate variable. */
135     struct dictionary *dict;            /* Aggregate dictionary. */
136     const struct dictionary *src_dict;  /* Dict of the source */
137     int case_cnt;                       /* Counts aggregated cases. */
138
139     bool add_variables;                 /* True iff the aggregated variables should
140                                            be appended to the existing dictionary */
141   };
142
143 static void initialize_aggregate_info (struct agr_proc *);
144
145 static void accumulate_aggregate_info (struct agr_proc *,
146                                        const struct ccase *);
147 /* Prototypes. */
148 static bool parse_aggregate_functions (struct lexer *, const struct dictionary *,
149                                        struct agr_proc *);
150 static void agr_destroy (struct agr_proc *);
151 static void dump_aggregate_info (const struct agr_proc *agr,
152                                  struct casewriter *output,
153                                  const struct ccase *break_case);
154 \f
155 /* Parsing. */
156
157 /* Parses and executes the AGGREGATE procedure. */
158 int
159 cmd_aggregate (struct lexer *lexer, struct dataset *ds)
160 {
161   struct dictionary *dict = dataset_dict (ds);
162   struct agr_proc agr;
163   struct file_handle *out_file = NULL;
164   struct casereader *input = NULL, *group;
165   struct casegrouper *grouper;
166   struct casewriter *output = NULL;
167
168   bool copy_documents = false;
169   bool presorted = false;
170   bool saw_direction;
171   bool ok;
172
173   memset(&agr, 0 , sizeof (agr));
174   agr.missing = ITEMWISE;
175   agr.src_dict = dict;
176   subcase_init_empty (&agr.sort);
177
178   /* OUTFILE subcommand must be first. */
179   lex_match (lexer, T_SLASH);
180   if (!lex_force_match_id (lexer, "OUTFILE"))
181     goto error;
182   lex_match (lexer, T_EQUALS);
183   if (!lex_match (lexer, T_ASTERISK))
184     {
185       out_file = fh_parse (lexer, FH_REF_FILE | FH_REF_SCRATCH);
186       if (out_file == NULL)
187         goto error;
188     }
189
190   if (out_file == NULL && lex_match_id (lexer, "MODE"))
191     {
192       lex_match (lexer, T_EQUALS);
193       if (lex_match_id (lexer, "ADDVARIABLES"))
194         {
195           agr.add_variables = true;
196
197           /* presorted is assumed in ADDVARIABLES mode */
198           presorted = true;
199         }
200       else if (lex_match_id (lexer, "REPLACE"))
201         {
202           agr.add_variables = false;
203         }
204       else
205         goto error;
206     }
207
208   if ( agr.add_variables )
209     agr.dict = dict_clone (dict);
210   else
211     agr.dict = dict_create ();    
212
213   dict_set_label (agr.dict, dict_get_label (dict));
214   dict_set_documents (agr.dict, dict_get_documents (dict));
215
216   /* Read most of the subcommands. */
217   for (;;)
218     {
219       lex_match (lexer, T_SLASH);
220
221       if (lex_match_id (lexer, "MISSING"))
222         {
223           lex_match (lexer, T_EQUALS);
224           if (!lex_match_id (lexer, "COLUMNWISE"))
225             {
226               lex_error (lexer, _("expecting %s"), "COLUMNWISE");
227               goto error;
228             }
229           agr.missing = COLUMNWISE;
230         }
231       else if (lex_match_id (lexer, "DOCUMENT"))
232         copy_documents = true;
233       else if (lex_match_id (lexer, "PRESORTED"))
234         presorted = true;
235       else if (lex_force_match_id (lexer, "BREAK"))
236         {
237           int i;
238
239           lex_match (lexer, T_EQUALS);
240           if (!parse_sort_criteria (lexer, dict, &agr.sort, &agr.break_vars,
241                                     &saw_direction))
242             goto error;
243           agr.break_var_cnt = subcase_get_n_fields (&agr.sort);
244
245           if  (! agr.add_variables)
246             for (i = 0; i < agr.break_var_cnt; i++)
247               dict_clone_var_assert (agr.dict, agr.break_vars[i]);
248
249           /* BREAK must follow the options. */
250           break;
251         }
252       else
253         goto error;
254
255     }
256   if (presorted && saw_direction)
257     msg (SW, _("When PRESORTED is specified, specifying sorting directions "
258                "with (A) or (D) has no effect.  Output data will be sorted "
259                "the same way as the input data."));
260
261   /* Read in the aggregate functions. */
262   lex_match (lexer, T_SLASH);
263   if (!parse_aggregate_functions (lexer, dict, &agr))
264     goto error;
265
266   /* Delete documents. */
267   if (!copy_documents)
268     dict_clear_documents (agr.dict);
269
270   /* Cancel SPLIT FILE. */
271   dict_set_split_vars (agr.dict, NULL, 0);
272
273   /* Initialize. */
274   agr.case_cnt = 0;
275
276   if (out_file == NULL)
277     {
278       /* The active dataset will be replaced by the aggregated data,
279          so TEMPORARY is moot. */
280       proc_cancel_temporary_transformations (ds);
281       proc_discard_output (ds);
282       output = autopaging_writer_create (dict_get_proto (agr.dict));
283     }
284   else
285     {
286       output = any_writer_open (out_file, agr.dict);
287       if (output == NULL)
288         goto error;
289     }
290
291   input = proc_open (ds);
292   if (!subcase_is_empty (&agr.sort) && !presorted)
293     {
294       input = sort_execute (input, &agr.sort);
295       subcase_clear (&agr.sort);
296     }
297
298   for (grouper = casegrouper_create_vars (input, agr.break_vars,
299                                           agr.break_var_cnt);
300        casegrouper_get_next_group (grouper, &group);
301        casereader_destroy (group))
302     {
303       struct casereader *placeholder = NULL;
304       struct ccase *c = casereader_peek (group, 0);
305
306       if (c == NULL)
307         {
308           casereader_destroy (group);
309           continue;
310         }
311
312       initialize_aggregate_info (&agr);
313
314       if ( agr.add_variables )
315         placeholder = casereader_clone (group);
316
317       {
318         struct ccase *cg;
319         for (; (cg = casereader_read (group)) != NULL; case_unref (cg))
320           accumulate_aggregate_info (&agr, cg);
321       }
322
323
324       if  (agr.add_variables)
325         {
326           struct ccase *cg;
327           for (; (cg = casereader_read (placeholder)) != NULL; case_unref (cg))
328             dump_aggregate_info (&agr, output, cg);
329
330           casereader_destroy (placeholder);
331         }
332       else
333         {
334           dump_aggregate_info (&agr, output, c);
335           case_unref (c);
336         }
337     }
338   if (!casegrouper_destroy (grouper))
339     goto error;
340
341   if (!proc_commit (ds))
342     {
343       input = NULL;
344       goto error;
345     }
346   input = NULL;
347
348   if (out_file == NULL)
349     {
350       struct casereader *next_input = casewriter_make_reader (output);
351       if (next_input == NULL)
352         goto error;
353
354       dataset_set_dict (ds, agr.dict);
355       dataset_set_source (ds, next_input);
356       agr.dict = NULL;
357     }
358   else
359     {
360       ok = casewriter_destroy (output);
361       output = NULL;
362       if (!ok)
363         goto error;
364     }
365
366   agr_destroy (&agr);
367   fh_unref (out_file);
368   return CMD_SUCCESS;
369
370 error:
371   if (input != NULL)
372     proc_commit (ds);
373   casewriter_destroy (output);
374   agr_destroy (&agr);
375   fh_unref (out_file);
376   return CMD_CASCADING_FAILURE;
377 }
378
379 /* Parse all the aggregate functions. */
380 static bool
381 parse_aggregate_functions (struct lexer *lexer, const struct dictionary *dict,
382                            struct agr_proc *agr)
383 {
384   struct agr_var *tail; /* Tail of linked list starting at agr->vars. */
385
386   /* Parse everything. */
387   tail = NULL;
388   for (;;)
389     {
390       char **dest;
391       char **dest_label;
392       size_t n_dest;
393       struct string function_name;
394
395       enum mv_class exclude;
396       const struct agr_func *function;
397       int func_index;
398
399       union agr_argument arg[2];
400
401       const struct variable **src;
402       size_t n_src;
403
404       size_t i;
405
406       dest = NULL;
407       dest_label = NULL;
408       n_dest = 0;
409       src = NULL;
410       function = NULL;
411       n_src = 0;
412       arg[0].c = NULL;
413       arg[1].c = NULL;
414       ds_init_empty (&function_name);
415
416       /* Parse the list of target variables. */
417       while (!lex_match (lexer, T_EQUALS))
418         {
419           size_t n_dest_prev = n_dest;
420
421           if (!parse_DATA_LIST_vars (lexer, dict, &dest, &n_dest,
422                                      (PV_APPEND | PV_SINGLE | PV_NO_SCRATCH
423                                       | PV_NO_DUPLICATE)))
424             goto error;
425
426           /* Assign empty labels. */
427           {
428             int j;
429
430             dest_label = xnrealloc (dest_label, n_dest, sizeof *dest_label);
431             for (j = n_dest_prev; j < n_dest; j++)
432               dest_label[j] = NULL;
433           }
434
435
436
437           if (lex_is_string (lexer))
438             {
439               dest_label[n_dest - 1] = xstrdup (lex_tokcstr (lexer));
440               lex_get (lexer);
441             }
442         }
443
444       /* Get the name of the aggregation function. */
445       if (lex_token (lexer) != T_ID)
446         {
447           lex_error (lexer, _("expecting aggregation function"));
448           goto error;
449         }
450
451       ds_assign_substring (&function_name, lex_tokss (lexer));
452       exclude = ds_chomp_byte (&function_name, '.') ? MV_SYSTEM : MV_ANY;
453
454       for (function = agr_func_tab; function->name; function++)
455         if (!strcasecmp (function->name, ds_cstr (&function_name)))
456           break;
457       if (NULL == function->name)
458         {
459           msg (SE, _("Unknown aggregation function %s."),
460                ds_cstr (&function_name));
461           goto error;
462         }
463       ds_destroy (&function_name);
464       func_index = function - agr_func_tab;
465       lex_get (lexer);
466
467       /* Check for leading lparen. */
468       if (!lex_match (lexer, T_LPAREN))
469         {
470           if (function->src_vars == AGR_SV_YES)
471             {
472               lex_force_match (lexer, T_LPAREN);
473               goto error;
474             }
475         }
476       else
477         {
478           /* Parse list of source variables. */
479           {
480             int pv_opts = PV_NO_SCRATCH;
481
482             if (func_index == SUM || func_index == MEAN || func_index == SD)
483               pv_opts |= PV_NUMERIC;
484             else if (function->n_args)
485               pv_opts |= PV_SAME_TYPE;
486
487             if (!parse_variables_const (lexer, dict, &src, &n_src, pv_opts))
488               goto error;
489           }
490
491           /* Parse function arguments, for those functions that
492              require arguments. */
493           if (function->n_args != 0)
494             for (i = 0; i < function->n_args; i++)
495               {
496                 int type;
497
498                 lex_match (lexer, T_COMMA);
499                 if (lex_is_string (lexer))
500                   {
501                     arg[i].c = recode_string (dict_get_encoding (agr->dict),
502                                               "UTF-8", lex_tokcstr (lexer),
503                                               -1);
504                     type = VAL_STRING;
505                   }
506                 else if (lex_is_number (lexer))
507                   {
508                     arg[i].f = lex_tokval (lexer);
509                     type = VAL_NUMERIC;
510                   }
511                 else
512                   {
513                     msg (SE, _("Missing argument %zu to %s."),
514                          i + 1, function->name);
515                     goto error;
516                   }
517
518                 lex_get (lexer);
519
520                 if (type != var_get_type (src[0]))
521                   {
522                     msg (SE, _("Arguments to %s must be of same type as "
523                                "source variables."),
524                          function->name);
525                     goto error;
526                   }
527               }
528
529           /* Trailing rparen. */
530           if (!lex_force_match (lexer, T_RPAREN))
531             goto error;
532
533           /* Now check that the number of source variables match
534              the number of target variables.  If we check earlier
535              than this, the user can get very misleading error
536              message, i.e. `AGGREGATE x=SUM(y t).' will get this
537              error message when a proper message would be more
538              like `unknown variable t'. */
539           if (n_src != n_dest)
540             {
541               msg (SE, _("Number of source variables (%zu) does not match "
542                          "number of target variables (%zu)."),
543                     n_src, n_dest);
544               goto error;
545             }
546
547           if ((func_index == PIN || func_index == POUT
548               || func_index == FIN || func_index == FOUT)
549               && (var_is_numeric (src[0])
550                   ? arg[0].f > arg[1].f
551                   : str_compare_rpad (arg[0].c, arg[1].c) > 0))
552             {
553               union agr_argument t = arg[0];
554               arg[0] = arg[1];
555               arg[1] = t;
556
557               msg (SW, _("The value arguments passed to the %s function "
558                          "are out-of-order.  They will be treated as if "
559                          "they had been specified in the correct order."),
560                    function->name);
561             }
562         }
563
564       /* Finally add these to the linked list of aggregation
565          variables. */
566       for (i = 0; i < n_dest; i++)
567         {
568           struct agr_var *v = xzalloc (sizeof *v);
569
570           /* Add variable to chain. */
571           if (agr->agr_vars != NULL)
572             tail->next = v;
573           else
574             agr->agr_vars = v;
575           tail = v;
576           tail->next = NULL;
577           v->moments = NULL;
578
579           /* Create the target variable in the aggregate
580              dictionary. */
581           {
582             struct variable *destvar;
583
584             v->function = func_index;
585
586             if (src)
587               {
588                 v->src = src[i];
589
590                 if (var_is_alpha (src[i]))
591                   {
592                     v->function |= FSTRING;
593                     v->string = xmalloc (var_get_width (src[i]));
594                   }
595
596                 if (function->alpha_type == VAL_STRING)
597                   destvar = dict_clone_var_as (agr->dict, v->src, dest[i]);
598                 else
599                   {
600                     assert (var_is_numeric (v->src)
601                             || function->alpha_type == VAL_NUMERIC);
602                     destvar = dict_create_var (agr->dict, dest[i], 0);
603                     if (destvar != NULL)
604                       {
605                         struct fmt_spec f;
606                         if ((func_index == N || func_index == NMISS)
607                             && dict_get_weight (dict) != NULL)
608                           f = fmt_for_output (FMT_F, 8, 2);
609                         else
610                           f = function->format;
611                         var_set_both_formats (destvar, &f);
612                       }
613                   }
614               } else {
615                 struct fmt_spec f;
616                 v->src = NULL;
617                 destvar = dict_create_var (agr->dict, dest[i], 0);
618                 if (destvar != NULL)
619                   {
620                     if ((func_index == N || func_index == NMISS)
621                         && dict_get_weight (dict) != NULL)
622                       f = fmt_for_output (FMT_F, 8, 2);
623                     else
624                       f = function->format;
625                     var_set_both_formats (destvar, &f);
626                   }
627             }
628
629             if (!destvar)
630               {
631                 msg (SE, _("Variable name %s is not unique within the "
632                            "aggregate file dictionary, which contains "
633                            "the aggregate variables and the break "
634                            "variables."),
635                      dest[i]);
636                 goto error;
637               }
638
639             free (dest[i]);
640             if (dest_label[i])
641               var_set_label (destvar, dest_label[i],
642                              dict_get_encoding (agr->dict), true);
643
644             v->dest = destvar;
645           }
646
647           v->exclude = exclude;
648
649           if (v->src != NULL)
650             {
651               int j;
652
653               if (var_is_numeric (v->src))
654                 for (j = 0; j < function->n_args; j++)
655                   v->arg[j].f = arg[j].f;
656               else
657                 for (j = 0; j < function->n_args; j++)
658                   v->arg[j].c = xstrdup (arg[j].c);
659             }
660         }
661
662       if (src != NULL && var_is_alpha (src[0]))
663         for (i = 0; i < function->n_args; i++)
664           {
665             free (arg[i].c);
666             arg[i].c = NULL;
667           }
668
669       free (src);
670       free (dest);
671       free (dest_label);
672
673       if (!lex_match (lexer, T_SLASH))
674         {
675           if (lex_token (lexer) == T_ENDCMD)
676             return true;
677
678           lex_error (lexer, "expecting end of command");
679           return false;
680         }
681       continue;
682
683     error:
684       ds_destroy (&function_name);
685       for (i = 0; i < n_dest; i++)
686         {
687           free (dest[i]);
688           free (dest_label[i]);
689         }
690       free (dest);
691       free (dest_label);
692       free (arg[0].c);
693       free (arg[1].c);
694       if (src && n_src && var_is_alpha (src[0]))
695         for (i = 0; i < function->n_args; i++)
696           {
697             free (arg[i].c);
698             arg[i].c = NULL;
699           }
700       free (src);
701
702       return false;
703     }
704 }
705
706 /* Destroys AGR. */
707 static void
708 agr_destroy (struct agr_proc *agr)
709 {
710   struct agr_var *iter, *next;
711
712   subcase_destroy (&agr->sort);
713   free (agr->break_vars);
714   for (iter = agr->agr_vars; iter; iter = next)
715     {
716       next = iter->next;
717
718       if (iter->function & FSTRING)
719         {
720           size_t n_args;
721           size_t i;
722
723           n_args = agr_func_tab[iter->function & FUNC].n_args;
724           for (i = 0; i < n_args; i++)
725             free (iter->arg[i].c);
726           free (iter->string);
727         }
728       else if (iter->function == SD)
729         moments1_destroy (iter->moments);
730
731       dict_destroy_internal_var (iter->subject);
732       dict_destroy_internal_var (iter->weight);
733
734       free (iter);
735     }
736   if (agr->dict != NULL)
737     dict_destroy (agr->dict);
738 }
739 \f
740 /* Execution. */
741
742 /* Accumulates aggregation data from the case INPUT. */
743 static void
744 accumulate_aggregate_info (struct agr_proc *agr, const struct ccase *input)
745 {
746   struct agr_var *iter;
747   double weight;
748   bool bad_warn = true;
749
750   weight = dict_get_case_weight (agr->src_dict, input, &bad_warn);
751
752   for (iter = agr->agr_vars; iter; iter = iter->next)
753     if (iter->src)
754       {
755         const union value *v = case_data (input, iter->src);
756         int src_width = var_get_width (iter->src);
757
758         if (var_is_value_missing (iter->src, v, iter->exclude))
759           {
760             switch (iter->function)
761               {
762               case NMISS:
763               case NMISS | FSTRING:
764                 iter->dbl[0] += weight;
765                 break;
766               case NUMISS:
767               case NUMISS | FSTRING:
768                 iter->int1++;
769                 break;
770               }
771             iter->saw_missing = true;
772             continue;
773           }
774
775         /* This is horrible.  There are too many possibilities. */
776         switch (iter->function)
777           {
778           case SUM:
779             iter->dbl[0] += v->f * weight;
780             iter->int1 = 1;
781             break;
782           case MEAN:
783             iter->dbl[0] += v->f * weight;
784             iter->dbl[1] += weight;
785             break;
786           case MEDIAN:
787             {
788               double wv ;
789               struct ccase *cout;
790
791               cout = case_create (casewriter_get_proto (iter->writer));
792
793               case_data_rw (cout, iter->subject)->f
794                 = case_data (input, iter->src)->f;
795
796               wv = dict_get_case_weight (agr->src_dict, input, NULL);
797
798               case_data_rw (cout, iter->weight)->f = wv;
799
800               iter->cc += wv;
801
802               casewriter_write (iter->writer, cout);
803             }
804             break;
805           case SD:
806             moments1_add (iter->moments, v->f, weight);
807             break;
808           case MAX:
809             iter->dbl[0] = MAX (iter->dbl[0], v->f);
810             iter->int1 = 1;
811             break;
812           case MAX | FSTRING:
813             /* Need to do some kind of Unicode collation thingy here */
814             if (memcmp (iter->string, value_str (v, src_width), src_width) < 0)
815               memcpy (iter->string, value_str (v, src_width), src_width);
816             iter->int1 = 1;
817             break;
818           case MIN:
819             iter->dbl[0] = MIN (iter->dbl[0], v->f);
820             iter->int1 = 1;
821             break;
822           case MIN | FSTRING:
823             if (memcmp (iter->string, value_str (v, src_width), src_width) > 0)
824               memcpy (iter->string, value_str (v, src_width), src_width);
825             iter->int1 = 1;
826             break;
827           case FGT:
828           case PGT:
829             if (v->f > iter->arg[0].f)
830               iter->dbl[0] += weight;
831             iter->dbl[1] += weight;
832             break;
833           case FGT | FSTRING:
834           case PGT | FSTRING:
835             if (memcmp (iter->arg[0].c,
836                         value_str (v, src_width), src_width) < 0)
837               iter->dbl[0] += weight;
838             iter->dbl[1] += weight;
839             break;
840           case FLT:
841           case PLT:
842             if (v->f < iter->arg[0].f)
843               iter->dbl[0] += weight;
844             iter->dbl[1] += weight;
845             break;
846           case FLT | FSTRING:
847           case PLT | FSTRING:
848             if (memcmp (iter->arg[0].c,
849                         value_str (v, src_width), src_width) > 0)
850               iter->dbl[0] += weight;
851             iter->dbl[1] += weight;
852             break;
853           case FIN:
854           case PIN:
855             if (iter->arg[0].f <= v->f && v->f <= iter->arg[1].f)
856               iter->dbl[0] += weight;
857             iter->dbl[1] += weight;
858             break;
859           case FIN | FSTRING:
860           case PIN | FSTRING:
861             if (memcmp (iter->arg[0].c,
862                         value_str (v, src_width), src_width) <= 0
863                 && memcmp (iter->arg[1].c,
864                            value_str (v, src_width), src_width) >= 0)
865               iter->dbl[0] += weight;
866             iter->dbl[1] += weight;
867             break;
868           case FOUT:
869           case POUT:
870             if (iter->arg[0].f > v->f || v->f > iter->arg[1].f)
871               iter->dbl[0] += weight;
872             iter->dbl[1] += weight;
873             break;
874           case FOUT | FSTRING:
875           case POUT | FSTRING:
876             if (memcmp (iter->arg[0].c,
877                         value_str (v, src_width), src_width) > 0
878                 || memcmp (iter->arg[1].c,
879                            value_str (v, src_width), src_width) < 0)
880               iter->dbl[0] += weight;
881             iter->dbl[1] += weight;
882             break;
883           case N:
884           case N | FSTRING:
885             iter->dbl[0] += weight;
886             break;
887           case NU:
888           case NU | FSTRING:
889             iter->int1++;
890             break;
891           case FIRST:
892             if (iter->int1 == 0)
893               {
894                 iter->dbl[0] = v->f;
895                 iter->int1 = 1;
896               }
897             break;
898           case FIRST | FSTRING:
899             if (iter->int1 == 0)
900               {
901                 memcpy (iter->string, value_str (v, src_width), src_width);
902                 iter->int1 = 1;
903               }
904             break;
905           case LAST:
906             iter->dbl[0] = v->f;
907             iter->int1 = 1;
908             break;
909           case LAST | FSTRING:
910             memcpy (iter->string, value_str (v, src_width), src_width);
911             iter->int1 = 1;
912             break;
913           case NMISS:
914           case NMISS | FSTRING:
915           case NUMISS:
916           case NUMISS | FSTRING:
917             /* Our value is not missing or it would have been
918                caught earlier.  Nothing to do. */
919             break;
920           default:
921             NOT_REACHED ();
922           }
923       } else {
924       switch (iter->function)
925         {
926         case N:
927           iter->dbl[0] += weight;
928           break;
929         case NU:
930           iter->int1++;
931           break;
932         default:
933           NOT_REACHED ();
934         }
935     }
936 }
937
938 /* Writes an aggregated record to OUTPUT. */
939 static void
940 dump_aggregate_info (const struct agr_proc *agr, struct casewriter *output, const struct ccase *break_case)
941 {
942   struct ccase *c = case_create (dict_get_proto (agr->dict));
943
944   if ( agr->add_variables)
945     {
946       case_copy (c, 0, break_case, 0, dict_get_var_cnt (agr->src_dict));
947     }
948   else
949     {
950       int value_idx = 0;
951       int i;
952
953       for (i = 0; i < agr->break_var_cnt; i++)
954         {
955           const struct variable *v = agr->break_vars[i];
956           value_copy (case_data_rw_idx (c, value_idx),
957                       case_data (break_case, v),
958                       var_get_width (v));
959           value_idx++;
960         }
961     }
962
963   {
964     struct agr_var *i;
965
966     for (i = agr->agr_vars; i; i = i->next)
967       {
968         union value *v = case_data_rw (c, i->dest);
969         int width = var_get_width (i->dest);
970
971         if (agr->missing == COLUMNWISE && i->saw_missing
972             && (i->function & FUNC) != N && (i->function & FUNC) != NU
973             && (i->function & FUNC) != NMISS && (i->function & FUNC) != NUMISS)
974           {
975             value_set_missing (v, width);
976             casewriter_destroy (i->writer);
977             continue;
978           }
979
980         switch (i->function)
981           {
982           case SUM:
983             v->f = i->int1 ? i->dbl[0] : SYSMIS;
984             break;
985           case MEAN:
986             v->f = i->dbl[1] != 0.0 ? i->dbl[0] / i->dbl[1] : SYSMIS;
987             break;
988           case MEDIAN:
989             {
990               if ( i->writer)
991                 {
992                   struct percentile *median = percentile_create (0.5, i->cc);
993                   struct order_stats *os = &median->parent;
994                   struct casereader *sorted_reader = casewriter_make_reader (i->writer);
995                   i->writer = NULL;
996
997                   order_stats_accumulate (&os, 1,
998                                           sorted_reader,
999                                           i->weight,
1000                                           i->subject,
1001                                           i->exclude);
1002                   i->dbl[0] = percentile_calculate (median, PC_HAVERAGE);
1003                   statistic_destroy (&median->parent.parent);
1004                 }
1005               v->f = i->dbl[0];
1006             }
1007             break;
1008           case SD:
1009             {
1010               double variance;
1011
1012               /* FIXME: we should use two passes. */
1013               moments1_calculate (i->moments, NULL, NULL, &variance,
1014                                  NULL, NULL);
1015               if (variance != SYSMIS)
1016                 v->f = sqrt (variance);
1017               else
1018                 v->f = SYSMIS;
1019             }
1020             break;
1021           case MAX:
1022           case MIN:
1023             v->f = i->int1 ? i->dbl[0] : SYSMIS;
1024             break;
1025           case MAX | FSTRING:
1026           case MIN | FSTRING:
1027             if (i->int1)
1028               memcpy (value_str_rw (v, width), i->string, width);
1029             else
1030               value_set_missing (v, width);
1031             break;
1032           case FGT:
1033           case FGT | FSTRING:
1034           case FLT:
1035           case FLT | FSTRING:
1036           case FIN:
1037           case FIN | FSTRING:
1038           case FOUT:
1039           case FOUT | FSTRING:
1040             v->f = i->dbl[1] ? i->dbl[0] / i->dbl[1] : SYSMIS;
1041             break;
1042           case PGT:
1043           case PGT | FSTRING:
1044           case PLT:
1045           case PLT | FSTRING:
1046           case PIN:
1047           case PIN | FSTRING:
1048           case POUT:
1049           case POUT | FSTRING:
1050             v->f = i->dbl[1] ? i->dbl[0] / i->dbl[1] * 100.0 : SYSMIS;
1051             break;
1052           case N:
1053           case N | FSTRING:
1054               v->f = i->dbl[0];
1055             break;
1056           case NU:
1057           case NU | FSTRING:
1058             v->f = i->int1;
1059             break;
1060           case FIRST:
1061           case LAST:
1062             v->f = i->int1 ? i->dbl[0] : SYSMIS;
1063             break;
1064           case FIRST | FSTRING:
1065           case LAST | FSTRING:
1066             if (i->int1)
1067               memcpy (value_str_rw (v, width), i->string, width);
1068             else
1069               value_set_missing (v, width);
1070             break;
1071           case NMISS:
1072           case NMISS | FSTRING:
1073             v->f = i->dbl[0];
1074             break;
1075           case NUMISS:
1076           case NUMISS | FSTRING:
1077             v->f = i->int1;
1078             break;
1079           default:
1080             NOT_REACHED ();
1081           }
1082       }
1083   }
1084
1085   casewriter_write (output, c);
1086 }
1087
1088 /* Resets the state for all the aggregate functions. */
1089 static void
1090 initialize_aggregate_info (struct agr_proc *agr)
1091 {
1092   struct agr_var *iter;
1093
1094   for (iter = agr->agr_vars; iter; iter = iter->next)
1095     {
1096       iter->saw_missing = false;
1097       iter->dbl[0] = iter->dbl[1] = iter->dbl[2] = 0.0;
1098       iter->int1 = iter->int2 = 0;
1099       switch (iter->function)
1100         {
1101         case MIN:
1102           iter->dbl[0] = DBL_MAX;
1103           break;
1104         case MIN | FSTRING:
1105           memset (iter->string, 255, var_get_width (iter->src));
1106           break;
1107         case MAX:
1108           iter->dbl[0] = -DBL_MAX;
1109           break;
1110         case MAX | FSTRING:
1111           memset (iter->string, 0, var_get_width (iter->src));
1112           break;
1113         case MEDIAN:
1114           {
1115             struct caseproto *proto;
1116             struct subcase ordering;
1117
1118             proto = caseproto_create ();
1119             proto = caseproto_add_width (proto, 0);
1120             proto = caseproto_add_width (proto, 0);
1121
1122             if ( ! iter->subject)
1123               iter->subject = dict_create_internal_var (0, 0);
1124
1125             if ( ! iter->weight)
1126               iter->weight = dict_create_internal_var (1, 0);
1127
1128             subcase_init_var (&ordering, iter->subject, SC_ASCEND);
1129             iter->writer = sort_create_writer (&ordering, proto);
1130             subcase_destroy (&ordering);
1131             caseproto_unref (proto);
1132
1133             iter->cc = 0;
1134           }
1135           break;
1136         case SD:
1137           if (iter->moments == NULL)
1138             iter->moments = moments1_create (MOMENT_VARIANCE);
1139           else
1140             moments1_clear (iter->moments);
1141           break;
1142         default:
1143           break;
1144         }
1145     }
1146 }