sack works
[pspp] / src / language / commands / aggregate.c
1 /* PSPP - a program for statistical analysis.
2    Copyright (C) 1997-9, 2000, 2006, 2008, 2009, 2010, 2011, 2012, 2014 Free Software Foundation, Inc.
3
4    This program is free software: you can redistribute it and/or modify
5    it under the terms of the GNU General Public License as published by
6    the Free Software Foundation, either version 3 of the License, or
7    (at your option) any later version.
8
9    This program is distributed in the hope that it will be useful,
10    but WITHOUT ANY WARRANTY; without even the implied warranty of
11    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12    GNU General Public License for more details.
13
14    You should have received a copy of the GNU General Public License
15    along with this program.  If not, see <http://www.gnu.org/licenses/>. */
16
17 #include <config.h>
18
19 #include "language/commands/aggregate.h"
20
21 #include <stdlib.h>
22
23 #include "data/any-writer.h"
24 #include "data/case.h"
25 #include "data/casegrouper.h"
26 #include "data/casereader.h"
27 #include "data/casewriter.h"
28 #include "data/dataset.h"
29 #include "data/dictionary.h"
30 #include "data/file-handle-def.h"
31 #include "data/format.h"
32 #include "data/settings.h"
33 #include "data/subcase.h"
34 #include "data/sys-file-writer.h"
35 #include "data/variable.h"
36 #include "language/command.h"
37 #include "language/commands/file-handle.h"
38 #include "language/lexer/lexer.h"
39 #include "language/lexer/variable-parser.h"
40 #include "language/commands/sort-criteria.h"
41 #include "libpspp/assertion.h"
42 #include "libpspp/i18n.h"
43 #include "libpspp/message.h"
44 #include "libpspp/misc.h"
45 #include "libpspp/pool.h"
46 #include "libpspp/str.h"
47 #include "math/moments.h"
48 #include "math/percentiles.h"
49 #include "math/sort.h"
50 #include "math/statistic.h"
51
52 #include "gl/c-strcase.h"
53 #include "gl/minmax.h"
54 #include "gl/xalloc.h"
55
56 #include "gettext.h"
57 #define _(msgid) gettext (msgid)
58 #define N_(msgid) msgid
59
60 /* Argument for AGGREGATE function.
61
62    Only one of the members is used, so this could be a union, but it's simpler
63    to just have both. */
64 struct agr_argument
65   {
66     double f;                           /* Numeric. */
67     struct substring s;                 /* String. */
68   };
69
70 /* Specifies how to make an aggregate variable. */
71 struct agr_var
72   {
73     /* Collected during parsing. */
74     const struct variable *src; /* Source variable. */
75     struct variable *dest;      /* Target variable. */
76     enum agr_function function; /* Function. */
77     enum mv_class exclude;      /* Classes of missing values to exclude. */
78     struct agr_argument arg[2]; /* Arguments. */
79
80     /* Accumulated during AGGREGATE execution. */
81     double dbl;
82     double W;                   /* Total non-missing weight. */
83     int int1;
84     char *string;
85     bool saw_missing;
86     struct moments1 *moments;
87
88     struct dictionary *dict;
89     struct variable *subject;
90     struct variable *weight;
91     struct casewriter *writer;
92   };
93
94 /* Attributes of aggregation functions. */
95 const struct agr_func agr_func_tab[] =
96   {
97 #define AGRF(ENUM, NAME, DESCRIPTION, SRC_VARS, N_ARGS, ALPHA_TYPE, W, D) \
98     [ENUM] = { NAME, DESCRIPTION, SRC_VARS, N_ARGS, ALPHA_TYPE, \
99                { .type = (W) > 0 ? FMT_F : -1, .w = W, .d = D } },
100 AGGREGATE_FUNCTIONS
101 #undef AGRF
102     {NULL, NULL, AGR_SV_NO, 0, -1, {-1, -1, -1}},
103   };
104
105 /* Missing value types. */
106 enum missing_treatment
107   {
108     ITEMWISE,           /* Missing values item by item. */
109     COLUMNWISE          /* Missing values column by column. */
110   };
111
112 /* An entire AGGREGATE procedure. */
113 struct agr_proc
114   {
115     /* Break variables. */
116     struct subcase sort;                /* Sort criteria (break variables). */
117     const struct variable **break_vars;       /* Break variables. */
118     size_t break_n_vars;                /* Number of break variables. */
119
120     enum missing_treatment missing;     /* How to treat missing values. */
121     struct agr_var *agr_vars;           /* Aggregate variables. */
122     size_t n_agr_vars;
123     struct dictionary *dict;            /* Aggregate dictionary. */
124     const struct dictionary *src_dict;  /* Dict of the source */
125     int n_cases;                        /* Counts aggregated cases. */
126
127     bool add_variables;                 /* True iff the aggregated variables should
128                                            be appended to the existing dictionary */
129   };
130
131 static void initialize_aggregate_info (struct agr_proc *);
132
133 static void accumulate_aggregate_info (struct agr_proc *,
134                                        const struct ccase *);
135 /* Prototypes. */
136 static bool parse_aggregate_functions (struct lexer *, const struct dictionary *,
137                                        struct agr_proc *);
138 static void agr_destroy (struct agr_proc *);
139 static void dump_aggregate_info (const struct agr_proc *agr,
140                                  struct casewriter *output,
141                                  const struct ccase *break_case);
142 \f
143 /* Parsing. */
144
145 /* Parses and executes the AGGREGATE procedure. */
146 int
147 cmd_aggregate (struct lexer *lexer, struct dataset *ds)
148 {
149   struct dictionary *dict = dataset_dict (ds);
150   struct agr_proc agr = {
151     .missing = ITEMWISE,
152     .src_dict = dict,
153   };
154   struct file_handle *out_file = NULL;
155   struct casereader *input = NULL;
156   struct casewriter *output = NULL;
157
158   bool copy_documents = false;
159   bool presorted = false;
160   int addvariables_ofs = 0;
161
162   /* OUTFILE subcommand must be first. */
163   if (lex_match_phrase (lexer, "/OUTFILE") || lex_match_id (lexer, "OUTFILE"))
164     {
165       lex_match (lexer, T_EQUALS);
166       if (!lex_match (lexer, T_ASTERISK))
167         {
168           out_file = fh_parse (lexer, FH_REF_FILE, dataset_session (ds));
169           if (out_file == NULL)
170             goto error;
171         }
172
173       if (!out_file && lex_match_id (lexer, "MODE"))
174         {
175           lex_match (lexer, T_EQUALS);
176           if (lex_match_id (lexer, "ADDVARIABLES"))
177             {
178               addvariables_ofs = lex_ofs (lexer) - 1;
179               agr.add_variables = true;
180               presorted = true;
181             }
182           else if (lex_match_id (lexer, "REPLACE"))
183             agr.add_variables = false;
184           else
185             {
186               lex_error_expecting (lexer, "ADDVARIABLES", "REPLACE");
187               goto error;
188             }
189         }
190     }
191   else
192     {
193       agr.add_variables = true;
194       presorted = true;
195     }
196
197   if (lex_match_phrase (lexer, "/MISSING"))
198     {
199       lex_match (lexer, T_EQUALS);
200       if (!lex_match_id (lexer, "COLUMNWISE"))
201         {
202           lex_error_expecting (lexer, "COLUMNWISE");
203           goto error;
204         }
205       agr.missing = COLUMNWISE;
206     }
207
208   int presorted_ofs = 0;
209   for (;;)
210     if (lex_match_phrase (lexer, "/DOCUMENT"))
211       copy_documents = true;
212     else if (lex_match_phrase (lexer, "/PRESORTED"))
213       {
214         presorted = true;
215         presorted_ofs = lex_ofs (lexer) - 1;
216       }
217     else
218       break;
219
220   if (agr.add_variables)
221     agr.dict = dict_clone (dict);
222   else
223     agr.dict = dict_create (dict_get_encoding (dict));
224
225   dict_set_label (agr.dict, dict_get_label (dict));
226   dict_set_documents (agr.dict, dict_get_documents (dict));
227
228   if (lex_match_phrase (lexer, "/BREAK"))
229     {
230       lex_match (lexer, T_EQUALS);
231       bool saw_direction;
232       int break_start = lex_ofs (lexer);
233       if (!parse_sort_criteria (lexer, dict, &agr.sort, &agr.break_vars,
234                                 &saw_direction))
235         goto error;
236       int break_end = lex_ofs (lexer) - 1;
237       agr.break_n_vars = subcase_get_n_fields (&agr.sort);
238
239       if  (! agr.add_variables)
240         for (size_t i = 0; i < agr.break_n_vars; i++)
241           dict_clone_var_assert (agr.dict, agr.break_vars[i]);
242
243       if (presorted && saw_direction)
244         {
245           lex_ofs_msg (lexer, SW, break_start, break_end,
246                        _("When the input data is presorted, specifying "
247                          "sorting directions with (A) or (D) has no effect.  "
248                          "Output data will be sorted the same way as the "
249                          "input data."));
250           if (presorted_ofs)
251             lex_ofs_msg (lexer, SN, presorted_ofs, presorted_ofs,
252                          _("The PRESORTED subcommand state that the "
253                            "input data is presorted."));
254           else if (addvariables_ofs)
255             lex_ofs_msg (lexer, SN, addvariables_ofs, addvariables_ofs,
256                          _("ADDVARIABLES implies that the input data "
257                            "is presorted."));
258           else
259             msg (SN, _("The input data must be presorted because the "
260                        "OUTFILE subcommand is not specified."));
261         }
262     }
263
264   /* Read in the aggregate functions. */
265   if (!parse_aggregate_functions (lexer, dict, &agr))
266     goto error;
267
268   /* Delete documents. */
269   if (!copy_documents)
270     dict_clear_documents (agr.dict);
271
272   /* Cancel SPLIT FILE. */
273   dict_clear_split_vars (agr.dict);
274
275   /* Initialize. */
276   agr.n_cases = 0;
277
278   if (out_file == NULL)
279     {
280       /* The active dataset will be replaced by the aggregated data,
281          so TEMPORARY is moot. */
282       proc_make_temporary_transformations_permanent (ds);
283       proc_discard_output (ds);
284       output = autopaging_writer_create (dict_get_proto (agr.dict));
285     }
286   else
287     {
288       output = any_writer_open (out_file, agr.dict);
289       if (output == NULL)
290         goto error;
291     }
292
293   input = proc_open (ds);
294   if (!subcase_is_empty (&agr.sort) && !presorted)
295     {
296       input = sort_execute (input, &agr.sort);
297       subcase_clear (&agr.sort);
298     }
299
300   struct casegrouper *grouper;
301   struct casereader *group;
302   for (grouper = casegrouper_create_vars (input, agr.break_vars,
303                                           agr.break_n_vars);
304        casegrouper_get_next_group (grouper, &group);
305        casereader_destroy (group))
306     {
307       struct casereader *placeholder = NULL;
308       struct ccase *c = casereader_peek (group, 0);
309
310       if (c == NULL)
311         {
312           casereader_destroy (group);
313           continue;
314         }
315
316       initialize_aggregate_info (&agr);
317
318       if (agr.add_variables)
319         placeholder = casereader_clone (group);
320
321       {
322         struct ccase *cg;
323         for (; (cg = casereader_read (group)) != NULL; case_unref (cg))
324           accumulate_aggregate_info (&agr, cg);
325       }
326
327
328       if  (agr.add_variables)
329         {
330           struct ccase *cg;
331           for (; (cg = casereader_read (placeholder)) != NULL; case_unref (cg))
332             dump_aggregate_info (&agr, output, cg);
333
334           casereader_destroy (placeholder);
335         }
336       else
337         {
338           dump_aggregate_info (&agr, output, c);
339         }
340       case_unref (c);
341     }
342   if (!casegrouper_destroy (grouper))
343     goto error;
344
345   bool ok = proc_commit (ds);
346   input = NULL;
347   if (!ok)
348     goto error;
349
350   if (out_file == NULL)
351     {
352       struct casereader *next_input = casewriter_make_reader (output);
353       if (next_input == NULL)
354         goto error;
355
356       dataset_set_dict (ds, agr.dict);
357       dataset_set_source (ds, next_input);
358       agr.dict = NULL;
359     }
360   else
361     {
362       ok = casewriter_destroy (output);
363       output = NULL;
364       if (!ok)
365         goto error;
366     }
367
368   agr_destroy (&agr);
369   fh_unref (out_file);
370   return CMD_SUCCESS;
371
372 error:
373   if (input != NULL)
374     proc_commit (ds);
375   casewriter_destroy (output);
376   agr_destroy (&agr);
377   fh_unref (out_file);
378   return CMD_CASCADING_FAILURE;
379 }
380
381 static bool
382 parse_agr_func_name (struct lexer *lexer, int *func_index,
383                      enum mv_class *exclude)
384 {
385   if (lex_token (lexer) != T_ID)
386     {
387       lex_error (lexer, _("Syntax error expecting aggregation function."));
388       return false;
389     }
390
391   struct substring name = lex_tokss (lexer);
392   *exclude = ss_chomp_byte (&name, '.') ? MV_SYSTEM : MV_ANY;
393
394   for (const struct agr_func *f = agr_func_tab; f->name; f++)
395     if (ss_equals_case (ss_cstr (f->name), name))
396       {
397         *func_index = f - agr_func_tab;
398         lex_get (lexer);
399         return true;
400       }
401   lex_error (lexer, _("Unknown aggregation function %s."), lex_tokcstr (lexer));
402   return false;
403 }
404
405 /* Parse all the aggregate functions. */
406 static bool
407 parse_aggregate_functions (struct lexer *lexer, const struct dictionary *dict,
408                            struct agr_proc *agr)
409 {
410   if (!lex_force_match (lexer, T_SLASH))
411     return false;
412
413   size_t starting_n_vars = dict_get_n_vars (dict);
414   size_t allocated_agr_vars = 0;
415
416   /* Parse everything. */
417   for (;;)
418     {
419       char **dest = NULL;
420       char **dest_label = NULL;
421       size_t n_vars = 0;
422
423       struct agr_argument arg[2] = { { .f = 0 }, { .f = 0 } };
424
425       const struct variable **src = NULL;
426
427       /* Parse the list of target variables. */
428       int dst_start_ofs = lex_ofs (lexer);
429       while (!lex_match (lexer, T_EQUALS))
430         {
431           size_t n_vars_prev = n_vars;
432
433           if (!parse_DATA_LIST_vars (lexer, dict, &dest, &n_vars,
434                                      (PV_APPEND | PV_SINGLE | PV_NO_SCRATCH
435                                       | PV_NO_DUPLICATE)))
436             goto error;
437
438           /* Assign empty labels. */
439           dest_label = xnrealloc (dest_label, n_vars, sizeof *dest_label);
440           for (size_t j = n_vars_prev; j < n_vars; j++)
441             dest_label[j] = NULL;
442
443           if (lex_is_string (lexer))
444             {
445               dest_label[n_vars - 1] = xstrdup (lex_tokcstr (lexer));
446               lex_get (lexer);
447             }
448         }
449       int dst_end_ofs = lex_ofs (lexer) - 2;
450
451       /* Get the name of the aggregation function. */
452       int func_index;
453       enum mv_class exclude;
454       if (!parse_agr_func_name (lexer, &func_index, &exclude))
455         goto error;
456       const struct agr_func *function = &agr_func_tab[func_index];
457
458       /* Check for leading lparen. */
459       if (!lex_match (lexer, T_LPAREN))
460         {
461           if (function->src_vars == AGR_SV_YES)
462             {
463               bool ok UNUSED = lex_force_match (lexer, T_LPAREN);
464               goto error;
465             }
466         }
467       else
468         {
469           /* Parse list of source variables. */
470           int pv_opts = PV_NO_SCRATCH;
471           if (func_index == AGRF_SUM || func_index == AGRF_MEAN
472               || func_index == AGRF_MEDIAN || func_index == AGRF_SD)
473             pv_opts |= PV_NUMERIC;
474           else if (function->n_args)
475             pv_opts |= PV_SAME_TYPE;
476
477           int src_start_ofs = lex_ofs (lexer);
478           size_t n_src;
479           if (!parse_variables_const (lexer, dict, &src, &n_src, pv_opts))
480             goto error;
481           int src_end_ofs = lex_ofs (lexer) - 1;
482
483           /* Parse function arguments, for those functions that
484              require arguments. */
485           int args_start_ofs = 0;
486           if (function->n_args != 0)
487             for (size_t i = 0; i < function->n_args; i++)
488               {
489                 lex_match (lexer, T_COMMA);
490
491                 enum val_type type;
492                 if (lex_is_string (lexer))
493                   type = VAL_STRING;
494                 else if (lex_is_number (lexer))
495                   type = VAL_NUMERIC;
496                 else
497                   {
498                     lex_error (lexer, _("Missing argument %zu to %s."),
499                                i + 1, function->name);
500                     goto error;
501                   }
502
503                 if (type != var_get_type (src[0]))
504                   {
505                     msg (SE, _("Arguments to %s must be of same type as "
506                                "source variables."),
507                          function->name);
508                     if (type == VAL_NUMERIC)
509                       {
510                         lex_next_msg (lexer, SN, 0, 0,
511                                       _("The argument is numeric."));
512                         lex_ofs_msg (lexer, SN, src_start_ofs, src_end_ofs,
513                                      _("The variables have string type."));
514                       }
515                     else
516                       {
517                         lex_next_msg (lexer, SN, 0, 0,
518                                       _("The argument is a string."));
519                         lex_ofs_msg (lexer, SN, src_start_ofs, src_end_ofs,
520                                      _("The variables are numeric."));
521                       }
522                     goto error;
523                   }
524
525                 if (i == 0)
526                   args_start_ofs = lex_ofs (lexer);
527                 if (type == VAL_NUMERIC)
528                   arg[i].f = lex_tokval (lexer);
529                 else
530                   arg[i].s = recode_substring_pool (dict_get_encoding (agr->dict),
531                                                     "UTF-8", lex_tokss (lexer),
532                                                     NULL);
533                 lex_get (lexer);
534               }
535           int args_end_ofs = lex_ofs (lexer) - 1;
536
537           /* Trailing rparen. */
538           if (!lex_force_match (lexer, T_RPAREN))
539             goto error;
540
541           /* Now check that the number of source variables match
542              the number of target variables.  If we check earlier
543              than this, the user can get very misleading error
544              message, i.e. `AGGREGATE x=SUM(y t).' will get this
545              error message when a proper message would be more
546              like `unknown variable t'. */
547           if (n_src != n_vars)
548             {
549               msg (SE, _("Number of source variables (%zu) does not match "
550                          "number of target variables (%zu)."),
551                    n_src, n_vars);
552               lex_ofs_msg (lexer, SN, src_start_ofs, src_end_ofs,
553                            _("These are the source variables."));
554               lex_ofs_msg (lexer, SN, dst_start_ofs, dst_end_ofs,
555                            _("These are the target variables."));
556               goto error;
557             }
558
559           if ((func_index == AGRF_PIN || func_index == AGRF_POUT
560               || func_index == AGRF_FIN || func_index == AGRF_FOUT)
561               && (var_is_numeric (src[0])
562                   ? arg[0].f > arg[1].f
563                   : buf_compare_rpad (arg[0].s.string, arg[0].s.length,
564                                       arg[1].s.string, arg[1].s.length) > 0))
565             {
566               struct agr_argument tmp = arg[0];
567               arg[0] = arg[1];
568               arg[1] = tmp;
569
570               lex_ofs_msg (lexer, SW, args_start_ofs, args_end_ofs,
571                            _("The value arguments passed to the %s function "
572                              "are out of order.  They will be treated as if "
573                              "they had been specified in the correct order."),
574                            function->name);
575             }
576         }
577
578       /* Finally add these to the aggregation variables. */
579       for (size_t i = 0; i < n_vars; i++)
580         {
581           const struct variable *existing_var = dict_lookup_var (agr->dict,
582                                                                  dest[i]);
583           if (existing_var)
584             {
585               if (var_get_dict_index (existing_var) >= starting_n_vars)
586                 lex_ofs_error (lexer, dst_start_ofs, dst_end_ofs,
587                                _("Duplicate target variable name %s."),
588                                dest[i]);
589               else if (agr->add_variables)
590                 lex_ofs_error (lexer, dst_start_ofs, dst_end_ofs,
591                                _("Variable name %s duplicates the name of a "
592                                  "variable in the active file dictionary."),
593                                dest[i]);
594               else
595                 lex_ofs_error (lexer, dst_start_ofs, dst_end_ofs,
596                                _("Variable name %s duplicates the name of a "
597                                  "break variable."), dest[i]);
598               goto error;
599             }
600
601           /* Add variable. */
602           if (agr->n_agr_vars >= allocated_agr_vars)
603             agr->agr_vars = x2nrealloc (agr->agr_vars, &allocated_agr_vars,
604                                         sizeof *agr->agr_vars);
605           struct agr_var *v = &agr->agr_vars[agr->n_agr_vars++];
606           *v = (struct agr_var) {
607             .exclude = exclude,
608             .moments = NULL,
609             .function = func_index,
610             .src = src ? src[i] : NULL,
611           };
612
613           /* Create the target variable in the aggregate dictionary. */
614           if (v->src && var_is_alpha (v->src))
615             v->string = xmalloc (var_get_width (v->src));
616
617           if (v->src && function->alpha_type == VAL_STRING)
618             v->dest = dict_clone_var_as_assert (agr->dict, v->src, dest[i]);
619           else
620             {
621               v->dest = dict_create_var_assert (agr->dict, dest[i], 0);
622
623               struct fmt_spec f;
624               if ((func_index == AGRF_N || func_index == AGRF_NMISS)
625                   && dict_get_weight (dict) != NULL)
626                 f = fmt_for_output (FMT_F, 8, 2);
627               else
628                 f = function->format;
629               var_set_both_formats (v->dest, f);
630             }
631           if (dest_label[i])
632             var_set_label (v->dest, dest_label[i]);
633
634           if (v->src != NULL)
635             for (size_t j = 0; j < function->n_args; j++)
636               v->arg[j] = (struct agr_argument) {
637                 .f = arg[j].f,
638                 .s = arg[j].s.string ? ss_clone (arg[j].s) : ss_empty (),
639               };
640         }
641
642       ss_dealloc (&arg[0].s);
643       ss_dealloc (&arg[1].s);
644
645       free (src);
646       for (size_t i = 0; i < n_vars; i++)
647         {
648           free (dest[i]);
649           free (dest_label[i]);
650         }
651       free (dest);
652       free (dest_label);
653
654       if (!lex_match (lexer, T_SLASH))
655         {
656           if (lex_token (lexer) == T_ENDCMD)
657             return true;
658
659           lex_error (lexer, "Syntax error expecting end of command.");
660           return false;
661         }
662       continue;
663
664     error:
665       for (size_t i = 0; i < n_vars; i++)
666         {
667           free (dest[i]);
668           free (dest_label[i]);
669         }
670       free (dest);
671       free (dest_label);
672       ss_dealloc (&arg[0].s);
673       ss_dealloc (&arg[1].s);
674       free (src);
675
676       return false;
677     }
678 }
679
680 /* Destroys AGR. */
681 static void
682 agr_destroy (struct agr_proc *agr)
683 {
684   subcase_uninit (&agr->sort);
685   free (agr->break_vars);
686   for (size_t i = 0; i < agr->n_agr_vars; i++)
687     {
688       struct agr_var *av = &agr->agr_vars[i];
689
690       ss_dealloc (&av->arg[0].s);
691       ss_dealloc (&av->arg[1].s);
692       free (av->string);
693
694       if (av->function == AGRF_SD)
695         moments1_destroy (av->moments);
696
697       dict_unref (av->dict);
698     }
699   free (agr->agr_vars);
700   if (agr->dict != NULL)
701     dict_unref (agr->dict);
702 }
703 \f
704 /* Execution. */
705
706 /* Accumulates aggregation data from the case INPUT. */
707 static void
708 accumulate_aggregate_info (struct agr_proc *agr, const struct ccase *input)
709 {
710   bool bad_warn = true;
711   double weight = dict_get_case_weight (agr->src_dict, input, &bad_warn);
712   for (size_t i = 0; i < agr->n_agr_vars; i++)
713     {
714       struct agr_var *av = &agr->agr_vars[i];
715       if (av->src)
716         {
717           bool is_string = var_is_alpha (av->src);
718           const union value *v = case_data (input, av->src);
719           int src_width = var_get_width (av->src);
720           const struct substring vs = (src_width > 0
721                                        ? value_ss (v, src_width)
722                                        : ss_empty ());
723
724           if (var_is_value_missing (av->src, v) & av->exclude)
725             {
726               switch (av->function)
727                 {
728                 case AGRF_NMISS:
729                   av->dbl += weight;
730                   break;
731
732                 case AGRF_NUMISS:
733                   av->int1++;
734                   break;
735
736                 case AGRF_SUM:
737                 case AGRF_MEAN:
738                 case AGRF_MEDIAN:
739                 case AGRF_SD:
740                 case AGRF_MAX:
741                 case AGRF_MIN:
742                 case AGRF_PGT:
743                 case AGRF_PLT:
744                 case AGRF_PIN:
745                 case AGRF_POUT:
746                 case AGRF_FGT:
747                 case AGRF_FLT:
748                 case AGRF_FIN:
749                 case AGRF_FOUT:
750                 case AGRF_CGT:
751                 case AGRF_CLT:
752                 case AGRF_CIN:
753                 case AGRF_COUT:
754                 case AGRF_N:
755                 case AGRF_NU:
756                 case AGRF_FIRST:
757                 case AGRF_LAST:
758                   break;
759                 }
760               av->saw_missing = true;
761               continue;
762             }
763
764           /* This is horrible.  There are too many possibilities. */
765           av->W += weight;
766           switch (av->function)
767             {
768             case AGRF_SUM:
769               av->dbl += v->f * weight;
770               av->int1 = 1;
771               break;
772
773             case AGRF_MEAN:
774               av->dbl += v->f * weight;
775               break;
776
777             case AGRF_MEDIAN:
778               {
779                 struct ccase *cout = case_create (casewriter_get_proto (av->writer));
780                 *case_num_rw (cout, av->subject) = case_num (input, av->src);
781                 *case_num_rw (cout, av->weight) = weight;
782                 casewriter_write (av->writer, cout);
783               }
784               break;
785
786             case AGRF_SD:
787               moments1_add (av->moments, v->f, weight);
788               break;
789
790             case AGRF_MAX:
791               if (!is_string)
792                 av->dbl = MAX (av->dbl, v->f);
793               else if (memcmp (av->string, v->s, src_width) < 0)
794                 memcpy (av->string, v->s, src_width);
795               av->int1 = 1;
796               break;
797
798             case AGRF_MIN:
799               if (!is_string)
800                 av->dbl = MIN (av->dbl, v->f);
801               else if (memcmp (av->string, v->s, src_width) > 0)
802                 memcpy (av->string, v->s, src_width);
803               av->dbl = MIN (av->dbl, v->f);
804               av->int1 = 1;
805               break;
806
807             case AGRF_FGT:
808             case AGRF_PGT:
809             case AGRF_CGT:
810               if (is_string
811                   ? ss_compare_rpad (av->arg[0].s, vs) < 0
812                   : v->f > av->arg[0].f)
813                 av->dbl += weight;
814               break;
815
816             case AGRF_FLT:
817             case AGRF_PLT:
818             case AGRF_CLT:
819               if (is_string
820                   ? ss_compare_rpad (av->arg[0].s, vs) > 0
821                   : v->f < av->arg[0].f)
822                 av->dbl += weight;
823               break;
824
825             case AGRF_FIN:
826             case AGRF_PIN:
827             case AGRF_CIN:
828               if (is_string
829                   ? (ss_compare_rpad (av->arg[0].s, vs) <= 0
830                      && ss_compare_rpad (av->arg[1].s, vs) >= 0)
831                   : av->arg[0].f <= v->f && v->f <= av->arg[1].f)
832                 av->dbl += weight;
833               break;
834
835             case AGRF_FOUT:
836             case AGRF_POUT:
837             case AGRF_COUT:
838               if (is_string
839                   ? (ss_compare_rpad (av->arg[0].s, vs) > 0
840                      || ss_compare_rpad (av->arg[1].s, vs) < 0)
841                   : av->arg[0].f > v->f || v->f > av->arg[1].f)
842                 av->dbl += weight;
843               break;
844
845             case AGRF_N:
846               av->dbl += weight;
847               break;
848
849             case AGRF_NU:
850               av->int1++;
851               break;
852
853             case AGRF_FIRST:
854               if (av->int1 == 0)
855                 {
856                   if (is_string)
857                     memcpy (av->string, v->s, src_width);
858                   else
859                     av->dbl = v->f;
860                   av->int1 = 1;
861                 }
862               break;
863
864             case AGRF_LAST:
865               if (is_string)
866                 memcpy (av->string, v->s, src_width);
867               else
868                 av->dbl = v->f;
869               av->int1 = 1;
870               break;
871
872             case AGRF_NMISS:
873             case AGRF_NUMISS:
874               /* Our value is not missing or it would have been
875                  caught earlier.  Nothing to do. */
876               break;
877             }
878         }
879       else
880         {
881           av->W += weight;
882           switch (av->function)
883             {
884             case AGRF_N:
885               break;
886
887             case AGRF_NU:
888               av->int1++;
889               break;
890
891             case AGRF_SUM:
892             case AGRF_MEAN:
893             case AGRF_MEDIAN:
894             case AGRF_SD:
895             case AGRF_MAX:
896             case AGRF_MIN:
897             case AGRF_PGT:
898             case AGRF_PLT:
899             case AGRF_PIN:
900             case AGRF_POUT:
901             case AGRF_FGT:
902             case AGRF_FLT:
903             case AGRF_FIN:
904             case AGRF_FOUT:
905             case AGRF_CGT:
906             case AGRF_CLT:
907             case AGRF_CIN:
908             case AGRF_COUT:
909             case AGRF_NMISS:
910             case AGRF_NUMISS:
911             case AGRF_FIRST:
912             case AGRF_LAST:
913               NOT_REACHED ();
914             }
915         }
916     }
917 }
918
919 /* Writes an aggregated record to OUTPUT. */
920 static void
921 dump_aggregate_info (const struct agr_proc *agr, struct casewriter *output, const struct ccase *break_case)
922 {
923   struct ccase *c = case_create (dict_get_proto (agr->dict));
924
925   if (agr->add_variables)
926     {
927       case_copy (c, 0, break_case, 0, dict_get_n_vars (agr->src_dict));
928     }
929   else
930     {
931       int value_idx = 0;
932
933       for (size_t i = 0; i < agr->break_n_vars; i++)
934         {
935           const struct variable *v = agr->break_vars[i];
936           value_copy (case_data_rw_idx (c, value_idx),
937                       case_data (break_case, v),
938                       var_get_width (v));
939           value_idx++;
940         }
941     }
942
943   for (size_t i = 0; i < agr->n_agr_vars; i++)
944     {
945       struct agr_var *av = &agr->agr_vars[i];
946       union value *v = case_data_rw (c, av->dest);
947       int width = var_get_width (av->dest);
948
949       if (agr->missing == COLUMNWISE && av->saw_missing
950           && av->function != AGRF_N
951           && av->function != AGRF_NU
952           && av->function != AGRF_NMISS
953           && av->function != AGRF_NUMISS)
954         {
955           value_set_missing (v, width);
956           casewriter_destroy (av->writer);
957           continue;
958         }
959
960       switch (av->function)
961         {
962         case AGRF_SUM:
963           v->f = av->int1 ? av->dbl : SYSMIS;
964           break;
965
966         case AGRF_MEAN:
967           v->f = av->W != 0.0 ? av->dbl / av->W : SYSMIS;
968           break;
969
970         case AGRF_MEDIAN:
971           {
972             if (av->writer)
973               {
974                 struct percentile *median = percentile_create (0.5, av->W);
975                 struct order_stats *os = &median->parent;
976                 struct casereader *sorted_reader = casewriter_make_reader (av->writer);
977                 av->writer = NULL;
978
979                 order_stats_accumulate (&os, 1,
980                                         sorted_reader,
981                                         av->weight,
982                                         av->subject,
983                                         av->exclude);
984                 av->dbl = percentile_calculate (median, PC_HAVERAGE);
985                 statistic_destroy (&median->parent.parent);
986               }
987             v->f = av->dbl;
988           }
989           break;
990
991         case AGRF_SD:
992           {
993             double variance;
994
995             moments1_calculate (av->moments, NULL, NULL, &variance,
996                                 NULL, NULL);
997             v->f = variance != SYSMIS ? sqrt (variance) : SYSMIS;
998           }
999           break;
1000
1001         case AGRF_MAX:
1002         case AGRF_MIN:
1003         case AGRF_FIRST:
1004         case AGRF_LAST:
1005           if (!width)
1006             v->f = av->int1 ? av->dbl : SYSMIS;
1007           else
1008             {
1009               if (av->int1)
1010                 memcpy (v->s, av->string, width);
1011               else
1012                 value_set_missing (v, width);
1013             }
1014           break;
1015
1016         case AGRF_FGT:
1017         case AGRF_FLT:
1018         case AGRF_FIN:
1019         case AGRF_FOUT:
1020           v->f = av->W ? av->dbl / av->W : SYSMIS;
1021           break;
1022
1023         case AGRF_PGT:
1024         case AGRF_PLT:
1025         case AGRF_PIN:
1026         case AGRF_POUT:
1027           v->f = av->W ? av->dbl / av->W * 100.0 : SYSMIS;
1028           break;
1029
1030         case AGRF_CGT:
1031         case AGRF_CLT:
1032         case AGRF_CIN:
1033         case AGRF_COUT:
1034           v->f = av->dbl;
1035           break;
1036
1037         case AGRF_N:
1038           v->f = av->W;
1039           break;
1040
1041         case AGRF_NU:
1042         case AGRF_NUMISS:
1043           v->f = av->int1;
1044           break;
1045
1046         case AGRF_NMISS:
1047           v->f = av->dbl;
1048           break;
1049         }
1050     }
1051
1052   casewriter_write (output, c);
1053 }
1054
1055 /* Resets the state for all the aggregate functions. */
1056 static void
1057 initialize_aggregate_info (struct agr_proc *agr)
1058 {
1059   for (size_t i = 0; i < agr->n_agr_vars; i++)
1060     {
1061       struct agr_var *av = &agr->agr_vars[i];
1062       av->saw_missing = false;
1063       av->dbl = av->W = 0.0;
1064       av->int1 = 0;
1065
1066       int width = av->src ? var_get_width (av->src) : 0;
1067       switch (av->function)
1068         {
1069         case AGRF_MIN:
1070           if (!width)
1071             av->dbl = DBL_MAX;
1072           else
1073             memset (av->string, 255, width);
1074           break;
1075
1076         case AGRF_MAX:
1077           if (!width)
1078             av->dbl = -DBL_MAX;
1079           else
1080             memset (av->string, 0, width);
1081           break;
1082
1083         case AGRF_MEDIAN:
1084           {
1085             struct caseproto *proto = caseproto_create ();
1086             proto = caseproto_add_width (proto, 0);
1087             proto = caseproto_add_width (proto, 0);
1088
1089             if (!av->dict)
1090               av->dict = dict_create ("UTF-8");
1091             if (! av->subject)
1092               av->subject = dict_create_var (av->dict, "subject", 0);
1093             if (! av->weight)
1094               av->weight = dict_create_var (av->dict, "weight", 0);
1095
1096             struct subcase ordering;
1097             subcase_init_var (&ordering, av->subject, SC_ASCEND);
1098             av->writer = sort_create_writer (&ordering, proto);
1099             subcase_uninit (&ordering);
1100             caseproto_unref (proto);
1101           }
1102           break;
1103
1104         case AGRF_SD:
1105           if (av->moments == NULL)
1106             av->moments = moments1_create (MOMENT_VARIANCE);
1107           else
1108             moments1_clear (av->moments);
1109           break;
1110
1111         case AGRF_SUM:
1112         case AGRF_MEAN:
1113         case AGRF_PGT:
1114         case AGRF_PLT:
1115         case AGRF_PIN:
1116         case AGRF_POUT:
1117         case AGRF_FGT:
1118         case AGRF_FLT:
1119         case AGRF_FIN:
1120         case AGRF_FOUT:
1121         case AGRF_CGT:
1122         case AGRF_CLT:
1123         case AGRF_CIN:
1124         case AGRF_COUT:
1125         case AGRF_N:
1126         case AGRF_NU:
1127         case AGRF_NMISS:
1128         case AGRF_NUMISS:
1129         case AGRF_FIRST:
1130         case AGRF_LAST:
1131           break;
1132         }
1133     }
1134 }