AGGREGATE: Use array instead of linked list.
[pspp] / src / language / stats / aggregate.c
1 /* PSPP - a program for statistical analysis.
2    Copyright (C) 1997-9, 2000, 2006, 2008, 2009, 2010, 2011, 2012, 2014 Free Software Foundation, Inc.
3
4    This program is free software: you can redistribute it and/or modify
5    it under the terms of the GNU General Public License as published by
6    the Free Software Foundation, either version 3 of the License, or
7    (at your option) any later version.
8
9    This program is distributed in the hope that it will be useful,
10    but WITHOUT ANY WARRANTY; without even the implied warranty of
11    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12    GNU General Public License for more details.
13
14    You should have received a copy of the GNU General Public License
15    along with this program.  If not, see <http://www.gnu.org/licenses/>. */
16
17 #include <config.h>
18
19 #include "language/stats/aggregate.h"
20
21 #include <stdlib.h>
22
23 #include "data/any-writer.h"
24 #include "data/case.h"
25 #include "data/casegrouper.h"
26 #include "data/casereader.h"
27 #include "data/casewriter.h"
28 #include "data/dataset.h"
29 #include "data/dictionary.h"
30 #include "data/file-handle-def.h"
31 #include "data/format.h"
32 #include "data/settings.h"
33 #include "data/subcase.h"
34 #include "data/sys-file-writer.h"
35 #include "data/variable.h"
36 #include "language/command.h"
37 #include "language/data-io/file-handle.h"
38 #include "language/lexer/lexer.h"
39 #include "language/lexer/variable-parser.h"
40 #include "language/stats/sort-criteria.h"
41 #include "libpspp/assertion.h"
42 #include "libpspp/i18n.h"
43 #include "libpspp/message.h"
44 #include "libpspp/misc.h"
45 #include "libpspp/pool.h"
46 #include "libpspp/str.h"
47 #include "math/moments.h"
48 #include "math/percentiles.h"
49 #include "math/sort.h"
50 #include "math/statistic.h"
51
52 #include "gl/c-strcase.h"
53 #include "gl/minmax.h"
54 #include "gl/xalloc.h"
55
56 #include "gettext.h"
57 #define _(msgid) gettext (msgid)
58 #define N_(msgid) msgid
59
60 /* Argument for AGGREGATE function. */
61 union agr_argument
62   {
63     double f;                           /* Numeric. */
64     char *c;                            /* Short or long string. */
65   };
66
67 /* Specifies how to make an aggregate variable. */
68 struct agr_var
69   {
70     /* Collected during parsing. */
71     const struct variable *src; /* Source variable. */
72     struct variable *dest;      /* Target variable. */
73     int function;               /* Function. */
74     enum mv_class exclude;      /* Classes of missing values to exclude. */
75     union agr_argument arg[2];  /* Arguments. */
76
77     /* Accumulated during AGGREGATE execution. */
78     double dbl[3];
79     int int1;
80     char *string;
81     bool saw_missing;
82     struct moments1 *moments;
83     double cc;
84
85     struct variable *subject;
86     struct variable *weight;
87     struct casewriter *writer;
88   };
89
90
91 /* Attributes of aggregation functions. */
92 const struct agr_func agr_func_tab[] =
93   {
94     {"SUM",     N_("Sum of values"),                         AGR_SV_YES, 0, -1,          { .type = FMT_F, .w = 8, .d = 2 }},
95     {"MEAN",    N_("Mean average"),                          AGR_SV_YES, 0, -1,          { .type = FMT_F, .w = 8, .d = 2 }},
96     {"MEDIAN",  N_("Median average"),                        AGR_SV_YES, 0, -1,          { .type = FMT_F, .w = 8, .d = 2 }},
97     {"SD",      N_("Standard deviation"),                    AGR_SV_YES, 0, -1,          { .type = FMT_F, .w = 8, .d = 2 }},
98     {"MAX",     N_("Maximum value"),                         AGR_SV_YES, 0, VAL_STRING,  {-1, -1, -1}},
99     {"MIN",     N_("Minimum value"),                         AGR_SV_YES, 0, VAL_STRING,  {-1, -1, -1}},
100     {"PGT",     N_("Percentage greater than"),               AGR_SV_YES, 1, VAL_NUMERIC, { .type = FMT_F, .w = 5, .d = 1 }},
101     {"PLT",     N_("Percentage less than"),                  AGR_SV_YES, 1, VAL_NUMERIC, { .type = FMT_F, .w = 5, .d = 1 }},
102     {"PIN",     N_("Percentage included in range"),          AGR_SV_YES, 2, VAL_NUMERIC, { .type = FMT_F, .w = 5, .d = 1 }},
103     {"POUT",    N_("Percentage excluded from range"),        AGR_SV_YES, 2, VAL_NUMERIC, { .type = FMT_F, .w = 5, .d = 1 }},
104     {"FGT",     N_("Fraction greater than"),                 AGR_SV_YES, 1, VAL_NUMERIC, { .type = FMT_F, .w = 5, .d = 3 }},
105     {"FLT",     N_("Fraction less than"),                    AGR_SV_YES, 1, VAL_NUMERIC, { .type = FMT_F, .w = 5, .d = 3 }},
106     {"FIN",     N_("Fraction included in range"),            AGR_SV_YES, 2, VAL_NUMERIC, { .type = FMT_F, .w = 5, .d = 3 }},
107     {"FOUT",    N_("Fraction excluded from range"),          AGR_SV_YES, 2, VAL_NUMERIC, { .type = FMT_F, .w = 5, .d = 3 }},
108     {"N",       N_("Number of cases"),                       AGR_SV_NO,  0, VAL_NUMERIC, { .type = FMT_F, .w = 7, .d = 0 }},
109     {"NU",      N_("Number of cases (unweighted)"),          AGR_SV_OPT, 0, VAL_NUMERIC, { .type = FMT_F, .w = 7, .d = 0 }},
110     {"NMISS",   N_("Number of missing values"),              AGR_SV_YES, 0, VAL_NUMERIC, { .type = FMT_F, .w = 7, .d = 0 }},
111     {"NUMISS",  N_("Number of missing values (unweighted)"), AGR_SV_YES, 0, VAL_NUMERIC, { .type = FMT_F, .w = 7, .d = 0 }},
112     {"FIRST",   N_("First non-missing value"),               AGR_SV_YES, 0, VAL_STRING,  {-1, -1, -1}},
113     {"LAST",    N_("Last non-missing value"),                AGR_SV_YES, 0, VAL_STRING,  {-1, -1, -1}},
114     {NULL,      NULL,                                        AGR_SV_NO,  0, -1,          {-1, -1, -1}},
115   };
116
117 /* Missing value types. */
118 enum missing_treatment
119   {
120     ITEMWISE,           /* Missing values item by item. */
121     COLUMNWISE          /* Missing values column by column. */
122   };
123
124 /* An entire AGGREGATE procedure. */
125 struct agr_proc
126   {
127     /* Break variables. */
128     struct subcase sort;                /* Sort criteria (break variables). */
129     const struct variable **break_vars;       /* Break variables. */
130     size_t break_n_vars;                /* Number of break variables. */
131
132     enum missing_treatment missing;     /* How to treat missing values. */
133     struct agr_var *agr_vars;           /* Aggregate variables. */
134     size_t n_agr_vars;
135     struct dictionary *dict;            /* Aggregate dictionary. */
136     const struct dictionary *src_dict;  /* Dict of the source */
137     int n_cases;                        /* Counts aggregated cases. */
138
139     bool add_variables;                 /* True iff the aggregated variables should
140                                            be appended to the existing dictionary */
141   };
142
143 static void initialize_aggregate_info (struct agr_proc *);
144
145 static void accumulate_aggregate_info (struct agr_proc *,
146                                        const struct ccase *);
147 /* Prototypes. */
148 static bool parse_aggregate_functions (struct lexer *, const struct dictionary *,
149                                        struct agr_proc *);
150 static void agr_destroy (struct agr_proc *);
151 static void dump_aggregate_info (const struct agr_proc *agr,
152                                  struct casewriter *output,
153                                  const struct ccase *break_case);
154 \f
155 /* Parsing. */
156
157 /* Parses and executes the AGGREGATE procedure. */
158 int
159 cmd_aggregate (struct lexer *lexer, struct dataset *ds)
160 {
161   struct dictionary *dict = dataset_dict (ds);
162   struct agr_proc agr;
163   struct file_handle *out_file = NULL;
164   struct casereader *input = NULL, *group;
165   struct casegrouper *grouper;
166   struct casewriter *output = NULL;
167
168   bool copy_documents = false;
169   bool presorted = false;
170   bool saw_direction;
171   bool ok;
172
173   memset(&agr, 0 , sizeof (agr));
174   agr.missing = ITEMWISE;
175   agr.src_dict = dict;
176   subcase_init_empty (&agr.sort);
177
178   /* OUTFILE subcommand must be first. */
179   lex_match (lexer, T_SLASH);
180   if (!lex_force_match_id (lexer, "OUTFILE"))
181     goto error;
182   lex_match (lexer, T_EQUALS);
183   if (!lex_match (lexer, T_ASTERISK))
184     {
185       out_file = fh_parse (lexer, FH_REF_FILE, dataset_session (ds));
186       if (out_file == NULL)
187         goto error;
188     }
189
190   if (out_file == NULL && lex_match_id (lexer, "MODE"))
191     {
192       lex_match (lexer, T_EQUALS);
193       if (lex_match_id (lexer, "ADDVARIABLES"))
194         {
195           agr.add_variables = true;
196
197           /* presorted is assumed in ADDVARIABLES mode */
198           presorted = true;
199         }
200       else if (lex_match_id (lexer, "REPLACE"))
201         {
202           agr.add_variables = false;
203         }
204       else
205         goto error;
206     }
207
208   if (agr.add_variables)
209     agr.dict = dict_clone (dict);
210   else
211     agr.dict = dict_create (dict_get_encoding (dict));
212
213   dict_set_label (agr.dict, dict_get_label (dict));
214   dict_set_documents (agr.dict, dict_get_documents (dict));
215
216   /* Read most of the subcommands. */
217   for (;;)
218     {
219       lex_match (lexer, T_SLASH);
220
221       if (lex_match_id (lexer, "MISSING"))
222         {
223           lex_match (lexer, T_EQUALS);
224           if (!lex_match_id (lexer, "COLUMNWISE"))
225             {
226               lex_error_expecting (lexer, "COLUMNWISE");
227               goto error;
228             }
229           agr.missing = COLUMNWISE;
230         }
231       else if (lex_match_id (lexer, "DOCUMENT"))
232         copy_documents = true;
233       else if (lex_match_id (lexer, "PRESORTED"))
234         presorted = true;
235       else if (lex_force_match_id (lexer, "BREAK"))
236         {
237           int i;
238
239           lex_match (lexer, T_EQUALS);
240           if (!parse_sort_criteria (lexer, dict, &agr.sort, &agr.break_vars,
241                                     &saw_direction))
242             goto error;
243           agr.break_n_vars = subcase_get_n_fields (&agr.sort);
244
245           if  (! agr.add_variables)
246             for (i = 0; i < agr.break_n_vars; i++)
247               dict_clone_var_assert (agr.dict, agr.break_vars[i]);
248
249           /* BREAK must follow the options. */
250           break;
251         }
252       else
253         goto error;
254
255     }
256   if (presorted && saw_direction)
257     msg (SW, _("When PRESORTED is specified, specifying sorting directions "
258                "with (A) or (D) has no effect.  Output data will be sorted "
259                "the same way as the input data."));
260
261   /* Read in the aggregate functions. */
262   lex_match (lexer, T_SLASH);
263   if (!parse_aggregate_functions (lexer, dict, &agr))
264     goto error;
265
266   /* Delete documents. */
267   if (!copy_documents)
268     dict_clear_documents (agr.dict);
269
270   /* Cancel SPLIT FILE. */
271   dict_clear_split_vars (agr.dict);
272
273   /* Initialize. */
274   agr.n_cases = 0;
275
276   if (out_file == NULL)
277     {
278       /* The active dataset will be replaced by the aggregated data,
279          so TEMPORARY is moot. */
280       proc_cancel_temporary_transformations (ds);
281       proc_discard_output (ds);
282       output = autopaging_writer_create (dict_get_proto (agr.dict));
283     }
284   else
285     {
286       output = any_writer_open (out_file, agr.dict);
287       if (output == NULL)
288         goto error;
289     }
290
291   input = proc_open (ds);
292   if (!subcase_is_empty (&agr.sort) && !presorted)
293     {
294       input = sort_execute (input, &agr.sort);
295       subcase_clear (&agr.sort);
296     }
297
298   for (grouper = casegrouper_create_vars (input, agr.break_vars,
299                                           agr.break_n_vars);
300        casegrouper_get_next_group (grouper, &group);
301        casereader_destroy (group))
302     {
303       struct casereader *placeholder = NULL;
304       struct ccase *c = casereader_peek (group, 0);
305
306       if (c == NULL)
307         {
308           casereader_destroy (group);
309           continue;
310         }
311
312       initialize_aggregate_info (&agr);
313
314       if (agr.add_variables)
315         placeholder = casereader_clone (group);
316
317       {
318         struct ccase *cg;
319         for (; (cg = casereader_read (group)) != NULL; case_unref (cg))
320           accumulate_aggregate_info (&agr, cg);
321       }
322
323
324       if  (agr.add_variables)
325         {
326           struct ccase *cg;
327           for (; (cg = casereader_read (placeholder)) != NULL; case_unref (cg))
328             dump_aggregate_info (&agr, output, cg);
329
330           casereader_destroy (placeholder);
331         }
332       else
333         {
334           dump_aggregate_info (&agr, output, c);
335         }
336       case_unref (c);
337     }
338   if (!casegrouper_destroy (grouper))
339     goto error;
340
341   if (!proc_commit (ds))
342     {
343       input = NULL;
344       goto error;
345     }
346   input = NULL;
347
348   if (out_file == NULL)
349     {
350       struct casereader *next_input = casewriter_make_reader (output);
351       if (next_input == NULL)
352         goto error;
353
354       dataset_set_dict (ds, agr.dict);
355       dataset_set_source (ds, next_input);
356       agr.dict = NULL;
357     }
358   else
359     {
360       ok = casewriter_destroy (output);
361       output = NULL;
362       if (!ok)
363         goto error;
364     }
365
366   agr_destroy (&agr);
367   fh_unref (out_file);
368   return CMD_SUCCESS;
369
370 error:
371   if (input != NULL)
372     proc_commit (ds);
373   casewriter_destroy (output);
374   agr_destroy (&agr);
375   fh_unref (out_file);
376   return CMD_CASCADING_FAILURE;
377 }
378
379 /* Parse all the aggregate functions. */
380 static bool
381 parse_aggregate_functions (struct lexer *lexer, const struct dictionary *dict,
382                            struct agr_proc *agr)
383 {
384   size_t allocated_agr_vars = 0;
385
386   /* Parse everything. */
387   for (;;)
388     {
389       char **dest;
390       char **dest_label;
391       size_t n_dest;
392       struct string function_name;
393
394       enum mv_class exclude;
395       const struct agr_func *function;
396       int func_index;
397
398       union agr_argument arg[2];
399
400       const struct variable **src;
401       size_t n_src;
402
403       size_t i;
404
405       dest = NULL;
406       dest_label = NULL;
407       n_dest = 0;
408       src = NULL;
409       function = NULL;
410       n_src = 0;
411       arg[0].c = NULL;
412       arg[1].c = NULL;
413       ds_init_empty (&function_name);
414
415       /* Parse the list of target variables. */
416       while (!lex_match (lexer, T_EQUALS))
417         {
418           size_t n_dest_prev = n_dest;
419
420           if (!parse_DATA_LIST_vars (lexer, dict, &dest, &n_dest,
421                                      (PV_APPEND | PV_SINGLE | PV_NO_SCRATCH
422                                       | PV_NO_DUPLICATE)))
423             goto error;
424
425           /* Assign empty labels. */
426           {
427             int j;
428
429             dest_label = xnrealloc (dest_label, n_dest, sizeof *dest_label);
430             for (j = n_dest_prev; j < n_dest; j++)
431               dest_label[j] = NULL;
432           }
433
434
435
436           if (lex_is_string (lexer))
437             {
438               dest_label[n_dest - 1] = xstrdup (lex_tokcstr (lexer));
439               lex_get (lexer);
440             }
441         }
442
443       /* Get the name of the aggregation function. */
444       if (lex_token (lexer) != T_ID)
445         {
446           lex_error (lexer, _("Syntax error expecting aggregation function."));
447           goto error;
448         }
449
450       ds_assign_substring (&function_name, lex_tokss (lexer));
451       exclude = ds_chomp_byte (&function_name, '.') ? MV_SYSTEM : MV_ANY;
452
453       for (function = agr_func_tab; function->name; function++)
454         if (!c_strcasecmp (function->name, ds_cstr (&function_name)))
455           break;
456       if (NULL == function->name)
457         {
458           lex_error (lexer, _("Unknown aggregation function %s."),
459                      ds_cstr (&function_name));
460           goto error;
461         }
462       ds_destroy (&function_name);
463       func_index = function - agr_func_tab;
464       lex_get (lexer);
465
466       /* Check for leading lparen. */
467       if (!lex_match (lexer, T_LPAREN))
468         {
469           if (function->src_vars == AGR_SV_YES)
470             {
471               goto error;
472             }
473         }
474       else
475         {
476           /* Parse list of source variables. */
477           int pv_opts = PV_NO_SCRATCH;
478           if (func_index == SUM || func_index == MEAN || func_index == SD)
479             pv_opts |= PV_NUMERIC;
480           else if (function->n_args)
481             pv_opts |= PV_SAME_TYPE;
482
483           int vars_start_ofs = lex_ofs (lexer);
484           if (!parse_variables_const (lexer, dict, &src, &n_src, pv_opts))
485             goto error;
486           int vars_end_ofs = lex_ofs (lexer) - 1;
487
488           /* Parse function arguments, for those functions that
489              require arguments. */
490           int args_start_ofs = 0;
491           if (function->n_args != 0)
492             for (i = 0; i < function->n_args; i++)
493               {
494                 int type;
495
496                 lex_match (lexer, T_COMMA);
497                 if (i == 0)
498                   args_start_ofs = lex_ofs (lexer);
499                 if (lex_is_string (lexer))
500                   {
501                     arg[i].c = recode_string (dict_get_encoding (agr->dict),
502                                               "UTF-8", lex_tokcstr (lexer),
503                                               -1);
504                     type = VAL_STRING;
505                   }
506                 else if (lex_is_number (lexer))
507                   {
508                     arg[i].f = lex_tokval (lexer);
509                     type = VAL_NUMERIC;
510                   }
511                 else
512                   {
513                     lex_error (lexer, _("Missing argument %zu to %s."),
514                                i + 1, function->name);
515                     goto error;
516                   }
517                 if (type != var_get_type (src[0]))
518                   {
519                     msg (SE, _("Arguments to %s must be of same type as "
520                                "source variables."),
521                          function->name);
522                     if (type == VAL_NUMERIC)
523                       {
524                         lex_next_msg (lexer, SN, 0, 0,
525                                       _("The argument is numeric."));
526                         lex_ofs_msg (lexer, SN, vars_start_ofs, vars_end_ofs,
527                                      _("The variables have string type."));
528                       }
529                     else
530                       {
531                         lex_next_msg (lexer, SN, 0, 0,
532                                       _("The argument is a string."));
533                         lex_ofs_msg (lexer, SN, vars_start_ofs, vars_end_ofs,
534                                      _("The variables are numeric."));
535                       }
536                     goto error;
537                   }
538
539                 lex_get (lexer);
540               }
541           int args_end_ofs = lex_ofs (lexer) - 1;
542
543           /* Trailing rparen. */
544           if (!lex_force_match (lexer, T_RPAREN))
545             goto error;
546
547           /* Now check that the number of source variables match
548              the number of target variables.  If we check earlier
549              than this, the user can get very misleading error
550              message, i.e. `AGGREGATE x=SUM(y t).' will get this
551              error message when a proper message would be more
552              like `unknown variable t'. */
553           if (n_src != n_dest)
554             {
555               msg (SE, _("Number of source variables (%zu) does not match "
556                          "number of target variables (%zu)."),
557                     n_src, n_dest);
558               goto error;
559             }
560
561           if ((func_index == PIN || func_index == POUT
562               || func_index == FIN || func_index == FOUT)
563               && (var_is_numeric (src[0])
564                   ? arg[0].f > arg[1].f
565                   : str_compare_rpad (arg[0].c, arg[1].c) > 0))
566             {
567               union agr_argument t = arg[0];
568               arg[0] = arg[1];
569               arg[1] = t;
570
571               lex_ofs_msg (lexer, SW, args_start_ofs, args_end_ofs,
572                            _("The value arguments passed to the %s function "
573                              "are out of order.  They will be treated as if "
574                              "they had been specified in the correct order."),
575                            function->name);
576             }
577         }
578
579       /* Finally add these to the linked list of aggregation
580          variables. */
581       for (i = 0; i < n_dest; i++)
582         {
583           if (agr->n_agr_vars >= allocated_agr_vars)
584             agr->agr_vars = x2nrealloc (agr->agr_vars, &allocated_agr_vars,
585                                         sizeof *agr->agr_vars);
586           struct agr_var *v = &agr->agr_vars[agr->n_agr_vars++];
587           *v = (struct agr_var) {
588             .moments = NULL,
589             .src = src ? src[i] : NULL,
590             .exclude = exclude,
591           };
592
593           /* Create the target variable in the aggregate
594              dictionary. */
595           {
596             struct variable *destvar;
597
598             v->function = func_index;
599
600             if (src)
601               {
602                 if (var_is_alpha (src[i]))
603                   {
604                     v->function |= FSTRING;
605                     v->string = xmalloc (var_get_width (src[i]));
606                   }
607
608                 if (function->alpha_type == VAL_STRING)
609                   destvar = dict_clone_var_as (agr->dict, v->src, dest[i]);
610                 else
611                   {
612                     assert (var_is_numeric (v->src)
613                             || function->alpha_type == VAL_NUMERIC);
614                     destvar = dict_create_var (agr->dict, dest[i], 0);
615                     if (destvar != NULL)
616                       {
617                         struct fmt_spec f;
618                         if ((func_index == N || func_index == NMISS)
619                             && dict_get_weight (dict) != NULL)
620                           f = fmt_for_output (FMT_F, 8, 2);
621                         else
622                           f = function->format;
623                         var_set_both_formats (destvar, &f);
624                       }
625                   }
626               } else {
627                 struct fmt_spec f;
628                 destvar = dict_create_var (agr->dict, dest[i], 0);
629                 if (destvar != NULL)
630                   {
631                     if ((func_index == N || func_index == NMISS)
632                         && dict_get_weight (dict) != NULL)
633                       f = fmt_for_output (FMT_F, 8, 2);
634                     else
635                       f = function->format;
636                     var_set_both_formats (destvar, &f);
637                   }
638             }
639
640             if (!destvar)
641               {
642                 msg (SE, _("Variable name %s is not unique within the "
643                            "aggregate file dictionary, which contains "
644                            "the aggregate variables and the break "
645                            "variables."),
646                      dest[i]);
647                 goto error;
648               }
649
650             free (dest[i]);
651             if (dest_label[i])
652               var_set_label (destvar, dest_label[i]);
653
654             v->dest = destvar;
655           }
656
657           if (v->src != NULL)
658             {
659               int j;
660
661               if (var_is_numeric (v->src))
662                 for (j = 0; j < function->n_args; j++)
663                   v->arg[j].f = arg[j].f;
664               else
665                 for (j = 0; j < function->n_args; j++)
666                   v->arg[j].c = xstrdup (arg[j].c);
667             }
668         }
669
670       if (src != NULL && var_is_alpha (src[0]))
671         for (i = 0; i < function->n_args; i++)
672           {
673             free (arg[i].c);
674             arg[i].c = NULL;
675           }
676
677       free (src);
678       free (dest);
679       free (dest_label);
680
681       if (!lex_match (lexer, T_SLASH))
682         {
683           if (lex_token (lexer) == T_ENDCMD)
684             return true;
685
686           lex_error (lexer, "Syntax error expecting end of command.");
687           return false;
688         }
689       continue;
690
691     error:
692       ds_destroy (&function_name);
693       for (i = 0; i < n_dest; i++)
694         {
695           free (dest[i]);
696           free (dest_label[i]);
697         }
698       free (dest);
699       free (dest_label);
700       free (arg[0].c);
701       free (arg[1].c);
702       if (src && n_src && var_is_alpha (src[0]))
703         for (i = 0; i < function->n_args; i++)
704           {
705             free (arg[i].c);
706             arg[i].c = NULL;
707           }
708       free (src);
709
710       return false;
711     }
712 }
713
714 /* Destroys AGR. */
715 static void
716 agr_destroy (struct agr_proc *agr)
717 {
718   subcase_uninit (&agr->sort);
719   free (agr->break_vars);
720   for (size_t i = 0; i < agr->n_agr_vars; i++)
721     {
722       struct agr_var *av = &agr->agr_vars[i];
723
724       if (av->function & FSTRING)
725         {
726           size_t n_args = agr_func_tab[av->function & FUNC].n_args;
727           for (size_t i = 0; i < n_args; i++)
728             free (av->arg[i].c);
729           free (av->string);
730         }
731       else if (av->function == SD)
732         moments1_destroy (av->moments);
733
734       dict_destroy_internal_var (av->subject);
735       dict_destroy_internal_var (av->weight);
736     }
737   free (agr->agr_vars);
738   if (agr->dict != NULL)
739     dict_unref (agr->dict);
740 }
741 \f
742 /* Execution. */
743
744 /* Accumulates aggregation data from the case INPUT. */
745 static void
746 accumulate_aggregate_info (struct agr_proc *agr, const struct ccase *input)
747 {
748   double weight;
749   bool bad_warn = true;
750
751   weight = dict_get_case_weight (agr->src_dict, input, &bad_warn);
752
753   for (size_t i = 0; i < agr->n_agr_vars; i++)
754     {
755       struct agr_var *av = &agr->agr_vars[i];
756       if (av->src)
757         {
758           const union value *v = case_data (input, av->src);
759           int src_width = var_get_width (av->src);
760
761           if (var_is_value_missing (av->src, v) & av->exclude)
762             {
763               switch (av->function)
764                 {
765                 case NMISS:
766                 case NMISS | FSTRING:
767                   av->dbl[0] += weight;
768                   break;
769                 case NUMISS:
770                 case NUMISS | FSTRING:
771                   av->int1++;
772                   break;
773                 }
774               av->saw_missing = true;
775               continue;
776             }
777
778           /* This is horrible.  There are too many possibilities. */
779           switch (av->function)
780             {
781             case SUM:
782               av->dbl[0] += v->f * weight;
783               av->int1 = 1;
784               break;
785             case MEAN:
786               av->dbl[0] += v->f * weight;
787               av->dbl[1] += weight;
788               break;
789             case MEDIAN:
790               {
791                 double wv ;
792                 struct ccase *cout;
793
794                 cout = case_create (casewriter_get_proto (av->writer));
795
796                 *case_num_rw (cout, av->subject) = case_num (input, av->src);
797
798                 wv = dict_get_case_weight (agr->src_dict, input, NULL);
799
800                 *case_num_rw (cout, av->weight) = wv;
801
802                 av->cc += wv;
803
804                 casewriter_write (av->writer, cout);
805               }
806               break;
807             case SD:
808               moments1_add (av->moments, v->f, weight);
809               break;
810             case MAX:
811               av->dbl[0] = MAX (av->dbl[0], v->f);
812               av->int1 = 1;
813               break;
814             case MAX | FSTRING:
815               /* Need to do some kind of Unicode collation thingy here */
816               if (memcmp (av->string, v->s, src_width) < 0)
817                 memcpy (av->string, v->s, src_width);
818               av->int1 = 1;
819               break;
820             case MIN:
821               av->dbl[0] = MIN (av->dbl[0], v->f);
822               av->int1 = 1;
823               break;
824             case MIN | FSTRING:
825               if (memcmp (av->string, v->s, src_width) > 0)
826                 memcpy (av->string, v->s, src_width);
827               av->int1 = 1;
828               break;
829             case FGT:
830             case PGT:
831               if (v->f > av->arg[0].f)
832                 av->dbl[0] += weight;
833               av->dbl[1] += weight;
834               break;
835             case FGT | FSTRING:
836             case PGT | FSTRING:
837               if (memcmp (av->arg[0].c, v->s, src_width) < 0)
838                 av->dbl[0] += weight;
839               av->dbl[1] += weight;
840               break;
841             case FLT:
842             case PLT:
843               if (v->f < av->arg[0].f)
844                 av->dbl[0] += weight;
845               av->dbl[1] += weight;
846               break;
847             case FLT | FSTRING:
848             case PLT | FSTRING:
849               if (memcmp (av->arg[0].c, v->s, src_width) > 0)
850                 av->dbl[0] += weight;
851               av->dbl[1] += weight;
852               break;
853             case FIN:
854             case PIN:
855               if (av->arg[0].f <= v->f && v->f <= av->arg[1].f)
856                 av->dbl[0] += weight;
857               av->dbl[1] += weight;
858               break;
859             case FIN | FSTRING:
860             case PIN | FSTRING:
861               if (memcmp (av->arg[0].c, v->s, src_width) <= 0
862                   && memcmp (av->arg[1].c, v->s, src_width) >= 0)
863                 av->dbl[0] += weight;
864               av->dbl[1] += weight;
865               break;
866             case FOUT:
867             case POUT:
868               if (av->arg[0].f > v->f || v->f > av->arg[1].f)
869                 av->dbl[0] += weight;
870               av->dbl[1] += weight;
871               break;
872             case FOUT | FSTRING:
873             case POUT | FSTRING:
874               if (memcmp (av->arg[0].c, v->s, src_width) > 0
875                   || memcmp (av->arg[1].c, v->s, src_width) < 0)
876                 av->dbl[0] += weight;
877               av->dbl[1] += weight;
878               break;
879             case N:
880             case N | FSTRING:
881               av->dbl[0] += weight;
882               break;
883             case NU:
884             case NU | FSTRING:
885               av->int1++;
886               break;
887             case FIRST:
888               if (av->int1 == 0)
889                 {
890                   av->dbl[0] = v->f;
891                   av->int1 = 1;
892                 }
893               break;
894             case FIRST | FSTRING:
895               if (av->int1 == 0)
896                 {
897                   memcpy (av->string, v->s, src_width);
898                   av->int1 = 1;
899                 }
900               break;
901             case LAST:
902               av->dbl[0] = v->f;
903               av->int1 = 1;
904               break;
905             case LAST | FSTRING:
906               memcpy (av->string, v->s, src_width);
907               av->int1 = 1;
908               break;
909             case NMISS:
910             case NMISS | FSTRING:
911             case NUMISS:
912             case NUMISS | FSTRING:
913               /* Our value is not missing or it would have been
914                  caught earlier.  Nothing to do. */
915               break;
916             default:
917               NOT_REACHED ();
918             }
919         }
920       else
921         {
922           switch (av->function)
923             {
924             case N:
925               av->dbl[0] += weight;
926               break;
927             case NU:
928               av->int1++;
929               break;
930             default:
931               NOT_REACHED ();
932             }
933         }
934     }
935 }
936
937 /* Writes an aggregated record to OUTPUT. */
938 static void
939 dump_aggregate_info (const struct agr_proc *agr, struct casewriter *output, const struct ccase *break_case)
940 {
941   struct ccase *c = case_create (dict_get_proto (agr->dict));
942
943   if (agr->add_variables)
944     {
945       case_copy (c, 0, break_case, 0, dict_get_n_vars (agr->src_dict));
946     }
947   else
948     {
949       int value_idx = 0;
950       int i;
951
952       for (i = 0; i < agr->break_n_vars; i++)
953         {
954           const struct variable *v = agr->break_vars[i];
955           value_copy (case_data_rw_idx (c, value_idx),
956                       case_data (break_case, v),
957                       var_get_width (v));
958           value_idx++;
959         }
960     }
961
962   for (size_t i = 0; i < agr->n_agr_vars; i++)
963     {
964       struct agr_var *av = &agr->agr_vars[i];
965       union value *v = case_data_rw (c, av->dest);
966       int width = var_get_width (av->dest);
967
968       if (agr->missing == COLUMNWISE && av->saw_missing
969           && (av->function & FUNC) != N && (av->function & FUNC) != NU
970           && (av->function & FUNC) != NMISS && (av->function & FUNC) != NUMISS)
971         {
972           value_set_missing (v, width);
973           casewriter_destroy (av->writer);
974           continue;
975         }
976
977       switch (av->function)
978         {
979         case SUM:
980           v->f = av->int1 ? av->dbl[0] : SYSMIS;
981           break;
982         case MEAN:
983           v->f = av->dbl[1] != 0.0 ? av->dbl[0] / av->dbl[1] : SYSMIS;
984           break;
985         case MEDIAN:
986           {
987             if (av->writer)
988               {
989                 struct percentile *median = percentile_create (0.5, av->cc);
990                 struct order_stats *os = &median->parent;
991                 struct casereader *sorted_reader = casewriter_make_reader (av->writer);
992                 av->writer = NULL;
993
994                 order_stats_accumulate (&os, 1,
995                                         sorted_reader,
996                                         av->weight,
997                                         av->subject,
998                                         av->exclude);
999                 av->dbl[0] = percentile_calculate (median, PC_HAVERAGE);
1000                 statistic_destroy (&median->parent.parent);
1001               }
1002             v->f = av->dbl[0];
1003           }
1004           break;
1005         case SD:
1006           {
1007             double variance;
1008
1009             /* FIXME: we should use two passes. */
1010             moments1_calculate (av->moments, NULL, NULL, &variance,
1011                                 NULL, NULL);
1012             if (variance != SYSMIS)
1013               v->f = sqrt (variance);
1014             else
1015               v->f = SYSMIS;
1016           }
1017           break;
1018         case MAX:
1019         case MIN:
1020           v->f = av->int1 ? av->dbl[0] : SYSMIS;
1021           break;
1022         case MAX | FSTRING:
1023         case MIN | FSTRING:
1024           if (av->int1)
1025             memcpy (v->s, av->string, width);
1026           else
1027             value_set_missing (v, width);
1028           break;
1029         case FGT:
1030         case FGT | FSTRING:
1031         case FLT:
1032         case FLT | FSTRING:
1033         case FIN:
1034         case FIN | FSTRING:
1035         case FOUT:
1036         case FOUT | FSTRING:
1037           v->f = av->dbl[1] ? av->dbl[0] / av->dbl[1] : SYSMIS;
1038           break;
1039         case PGT:
1040         case PGT | FSTRING:
1041         case PLT:
1042         case PLT | FSTRING:
1043         case PIN:
1044         case PIN | FSTRING:
1045         case POUT:
1046         case POUT | FSTRING:
1047           v->f = av->dbl[1] ? av->dbl[0] / av->dbl[1] * 100.0 : SYSMIS;
1048           break;
1049         case N:
1050         case N | FSTRING:
1051           v->f = av->dbl[0];
1052           break;
1053         case NU:
1054         case NU | FSTRING:
1055           v->f = av->int1;
1056           break;
1057         case FIRST:
1058         case LAST:
1059           v->f = av->int1 ? av->dbl[0] : SYSMIS;
1060           break;
1061         case FIRST | FSTRING:
1062         case LAST | FSTRING:
1063           if (av->int1)
1064             memcpy (v->s, av->string, width);
1065           else
1066             value_set_missing (v, width);
1067           break;
1068         case NMISS:
1069         case NMISS | FSTRING:
1070           v->f = av->dbl[0];
1071           break;
1072         case NUMISS:
1073         case NUMISS | FSTRING:
1074           v->f = av->int1;
1075           break;
1076         default:
1077           NOT_REACHED ();
1078         }
1079     }
1080
1081   casewriter_write (output, c);
1082 }
1083
1084 /* Resets the state for all the aggregate functions. */
1085 static void
1086 initialize_aggregate_info (struct agr_proc *agr)
1087 {
1088   for (size_t i = 0; i < agr->n_agr_vars; i++)
1089     {
1090       struct agr_var *av = &agr->agr_vars[i];
1091       av->saw_missing = false;
1092       av->dbl[0] = av->dbl[1] = av->dbl[2] = 0.0;
1093       av->int1 = 0;
1094       switch (av->function)
1095         {
1096         case MIN:
1097           av->dbl[0] = DBL_MAX;
1098           break;
1099         case MIN | FSTRING:
1100           memset (av->string, 255, var_get_width (av->src));
1101           break;
1102         case MAX:
1103           av->dbl[0] = -DBL_MAX;
1104           break;
1105         case MAX | FSTRING:
1106           memset (av->string, 0, var_get_width (av->src));
1107           break;
1108         case MEDIAN:
1109           {
1110             struct caseproto *proto;
1111             struct subcase ordering;
1112
1113             proto = caseproto_create ();
1114             proto = caseproto_add_width (proto, 0);
1115             proto = caseproto_add_width (proto, 0);
1116
1117             if (! av->subject)
1118               av->subject = dict_create_internal_var (0, 0);
1119
1120             if (! av->weight)
1121               av->weight = dict_create_internal_var (1, 0);
1122
1123             subcase_init_var (&ordering, av->subject, SC_ASCEND);
1124             av->writer = sort_create_writer (&ordering, proto);
1125             subcase_uninit (&ordering);
1126             caseproto_unref (proto);
1127
1128             av->cc = 0;
1129           }
1130           break;
1131         case SD:
1132           if (av->moments == NULL)
1133             av->moments = moments1_create (MOMENT_VARIANCE);
1134           else
1135             moments1_clear (av->moments);
1136           break;
1137         default:
1138           break;
1139         }
1140     }
1141 }