CROSSTABS: Handle case where all cases in a crosstabulation are missing.
[pspp] / src / language / stats / aggregate.c
1 /* PSPP - a program for statistical analysis.
2    Copyright (C) 1997-9, 2000, 2006, 2008, 2009, 2010 Free Software Foundation, Inc.
3
4    This program is free software: you can redistribute it and/or modify
5    it under the terms of the GNU General Public License as published by
6    the Free Software Foundation, either version 3 of the License, or
7    (at your option) any later version.
8
9    This program is distributed in the hope that it will be useful,
10    but WITHOUT ANY WARRANTY; without even the implied warranty of
11    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12    GNU General Public License for more details.
13
14    You should have received a copy of the GNU General Public License
15    along with this program.  If not, see <http://www.gnu.org/licenses/>. */
16
17 #include <config.h>
18
19 #include <stdlib.h>
20
21 #include <data/any-writer.h>
22 #include <data/case.h>
23 #include <data/casegrouper.h>
24 #include <data/casereader.h>
25 #include <data/casewriter.h>
26 #include <data/dictionary.h>
27 #include <data/file-handle-def.h>
28 #include <data/format.h>
29 #include <data/procedure.h>
30 #include <data/settings.h>
31 #include <data/subcase.h>
32 #include <data/sys-file-writer.h>
33 #include <data/variable.h>
34 #include <language/command.h>
35 #include <language/data-io/file-handle.h>
36 #include <language/lexer/lexer.h>
37 #include <language/lexer/variable-parser.h>
38 #include <language/stats/sort-criteria.h>
39 #include <libpspp/assertion.h>
40 #include <libpspp/message.h>
41 #include <libpspp/misc.h>
42 #include <libpspp/pool.h>
43 #include <libpspp/str.h>
44 #include <math/moments.h>
45 #include <math/sort.h>
46 #include <math/statistic.h>
47 #include <math/percentiles.h>
48
49 #include "aggregate.h"
50
51 #include "minmax.h"
52 #include "xalloc.h"
53
54 #include "gettext.h"
55 #define _(msgid) gettext (msgid)
56 #define N_(msgid) msgid
57
58 /* Argument for AGGREGATE function. */
59 union agr_argument
60   {
61     double f;                           /* Numeric. */
62     char *c;                            /* Short or long string. */
63   };
64
65 /* Specifies how to make an aggregate variable. */
66 struct agr_var
67   {
68     struct agr_var *next;               /* Next in list. */
69
70     /* Collected during parsing. */
71     const struct variable *src; /* Source variable. */
72     struct variable *dest;      /* Target variable. */
73     int function;               /* Function. */
74     enum mv_class exclude;      /* Classes of missing values to exclude. */
75     union agr_argument arg[2];  /* Arguments. */
76
77     /* Accumulated during AGGREGATE execution. */
78     double dbl[3];
79     int int1, int2;
80     char *string;
81     bool saw_missing;
82     struct moments1 *moments;
83     double cc;
84
85     struct variable *subject;
86     struct variable *weight;
87     struct casewriter *writer;
88   };
89
90
91 /* Attributes of aggregation functions. */
92 const struct agr_func agr_func_tab[] =
93   {
94     {"SUM",     N_("Sum of values"), AGR_SV_YES, 0, -1,          {FMT_F, 8, 2}},
95     {"MEAN",    N_("Mean average"), AGR_SV_YES, 0, -1,          {FMT_F, 8, 2}},
96     {"MEDIAN",  N_("Median average"), AGR_SV_YES, 0, -1,          {FMT_F, 8, 2}},
97     {"SD",      N_("Standard deviation"), AGR_SV_YES, 0, -1,          {FMT_F, 8, 2}},
98     {"MAX",     N_("Maximum value"), AGR_SV_YES, 0, VAL_STRING,  {-1, -1, -1}},
99     {"MIN",     N_("Minimum value"), AGR_SV_YES, 0, VAL_STRING,  {-1, -1, -1}},
100     {"PGT",     N_("Percentage greater than"), AGR_SV_YES, 1, VAL_NUMERIC, {FMT_F, 5, 1}},
101     {"PLT",     N_("Percentage less than"), AGR_SV_YES, 1, VAL_NUMERIC, {FMT_F, 5, 1}},
102     {"PIN",     N_("Percentage included in range"), AGR_SV_YES, 2, VAL_NUMERIC, {FMT_F, 5, 1}},
103     {"POUT",    N_("Percentage excluded from range"), AGR_SV_YES, 2, VAL_NUMERIC, {FMT_F, 5, 1}},
104     {"FGT",     N_("Fraction greater than"), AGR_SV_YES, 1, VAL_NUMERIC, {FMT_F, 5, 3}},
105     {"FLT",     N_("Fraction less than"), AGR_SV_YES, 1, VAL_NUMERIC, {FMT_F, 5, 3}},
106     {"FIN",     N_("Fraction included in range"), AGR_SV_YES, 2, VAL_NUMERIC, {FMT_F, 5, 3}},
107     {"FOUT",    N_("Fraction excluded from range"), AGR_SV_YES, 2, VAL_NUMERIC, {FMT_F, 5, 3}},
108     {"N",       N_("Number of cases"), AGR_SV_NO, 0, VAL_NUMERIC, {FMT_F, 7, 0}},
109     {"NU",      N_("Number of cases (unweighted)"), AGR_SV_OPT, 0, VAL_NUMERIC, {FMT_F, 7, 0}},
110     {"NMISS",   N_("Number of missing values"), AGR_SV_YES, 0, VAL_NUMERIC, {FMT_F, 7, 0}},
111     {"NUMISS",  N_("Number of missing values (unweighted)"), AGR_SV_YES, 0, VAL_NUMERIC, {FMT_F, 7, 0}},
112     {"FIRST",   N_("First non-missing value"), AGR_SV_YES, 0, VAL_STRING,  {-1, -1, -1}},
113     {"LAST",    N_("Last non-missing value"), AGR_SV_YES, 0, VAL_STRING,  {-1, -1, -1}},
114     {NULL,      NULL, AGR_SV_NO, 0, -1,          {-1, -1, -1}},
115   };
116
117 /* Missing value types. */
118 enum missing_treatment
119   {
120     ITEMWISE,           /* Missing values item by item. */
121     COLUMNWISE          /* Missing values column by column. */
122   };
123
124 /* An entire AGGREGATE procedure. */
125 struct agr_proc
126   {
127     /* Break variables. */
128     struct subcase sort;                /* Sort criteria (break variables). */
129     const struct variable **break_vars;       /* Break variables. */
130     size_t break_var_cnt;               /* Number of break variables. */
131
132     enum missing_treatment missing;     /* How to treat missing values. */
133     struct agr_var *agr_vars;           /* First aggregate variable. */
134     struct dictionary *dict;            /* Aggregate dictionary. */
135     const struct dictionary *src_dict;  /* Dict of the source */
136     int case_cnt;                       /* Counts aggregated cases. */
137
138     bool add_variables;                 /* True iff the aggregated variables should
139                                            be appended to the existing dictionary */
140   };
141
142 static void initialize_aggregate_info (struct agr_proc *);
143
144 static void accumulate_aggregate_info (struct agr_proc *,
145                                        const struct ccase *);
146 /* Prototypes. */
147 static bool parse_aggregate_functions (struct lexer *, const struct dictionary *,
148                                        struct agr_proc *);
149 static void agr_destroy (struct agr_proc *);
150 static void dump_aggregate_info (const struct agr_proc *agr,
151                                  struct casewriter *output,
152                                  const struct ccase *break_case);
153 \f
154 /* Parsing. */
155
156 /* Parses and executes the AGGREGATE procedure. */
157 int
158 cmd_aggregate (struct lexer *lexer, struct dataset *ds)
159 {
160   struct dictionary *dict = dataset_dict (ds);
161   struct agr_proc agr;
162   struct file_handle *out_file = NULL;
163   struct casereader *input = NULL, *group;
164   struct casegrouper *grouper;
165   struct casewriter *output = NULL;
166
167   bool copy_documents = false;
168   bool presorted = false;
169   bool saw_direction;
170   bool ok;
171
172   memset(&agr, 0 , sizeof (agr));
173   agr.missing = ITEMWISE;
174   agr.src_dict = dict;
175   subcase_init_empty (&agr.sort);
176
177   /* OUTFILE subcommand must be first. */
178   lex_match (lexer, '/');
179   if (!lex_force_match_id (lexer, "OUTFILE"))
180     goto error;
181   lex_match (lexer, '=');
182   if (!lex_match (lexer, '*'))
183     {
184       out_file = fh_parse (lexer, FH_REF_FILE | FH_REF_SCRATCH);
185       if (out_file == NULL)
186         goto error;
187     }
188
189   if (out_file == NULL && lex_match_id (lexer, "MODE"))
190     {
191       lex_match (lexer, '=');
192       if (lex_match_id (lexer, "ADDVARIABLES"))
193         {
194           agr.add_variables = true;
195
196           /* presorted is assumed in ADDVARIABLES mode */
197           presorted = true;
198         }
199       else if (lex_match_id (lexer, "REPLACE"))
200         {
201           agr.add_variables = false;
202         }
203       else
204         goto error;
205     }
206
207   if ( agr.add_variables )
208     agr.dict = dict_clone (dict);
209   else
210     agr.dict = dict_create ();    
211
212   dict_set_label (agr.dict, dict_get_label (dict));
213   dict_set_documents (agr.dict, dict_get_documents (dict));
214
215   /* Read most of the subcommands. */
216   for (;;)
217     {
218       lex_match (lexer, '/');
219
220       if (lex_match_id (lexer, "MISSING"))
221         {
222           lex_match (lexer, '=');
223           if (!lex_match_id (lexer, "COLUMNWISE"))
224             {
225               lex_error (lexer, _("expecting %s"), "COLUMNWISE");
226               goto error;
227             }
228           agr.missing = COLUMNWISE;
229         }
230       else if (lex_match_id (lexer, "DOCUMENT"))
231         copy_documents = true;
232       else if (lex_match_id (lexer, "PRESORTED"))
233         presorted = true;
234       else if (lex_force_match_id (lexer, "BREAK"))
235         {
236           int i;
237
238           lex_match (lexer, '=');
239           if (!parse_sort_criteria (lexer, dict, &agr.sort, &agr.break_vars,
240                                     &saw_direction))
241             goto error;
242           agr.break_var_cnt = subcase_get_n_fields (&agr.sort);
243
244           if  (! agr.add_variables)
245             for (i = 0; i < agr.break_var_cnt; i++)
246               dict_clone_var_assert (agr.dict, agr.break_vars[i]);
247
248           /* BREAK must follow the options. */
249           break;
250         }
251       else
252         goto error;
253
254     }
255   if (presorted && saw_direction)
256     msg (SW, _("When PRESORTED is specified, specifying sorting directions "
257                "with (A) or (D) has no effect.  Output data will be sorted "
258                "the same way as the input data."));
259
260   /* Read in the aggregate functions. */
261   lex_match (lexer, '/');
262   if (!parse_aggregate_functions (lexer, dict, &agr))
263     goto error;
264
265   /* Delete documents. */
266   if (!copy_documents)
267     dict_clear_documents (agr.dict);
268
269   /* Cancel SPLIT FILE. */
270   dict_set_split_vars (agr.dict, NULL, 0);
271
272   /* Initialize. */
273   agr.case_cnt = 0;
274
275   if (out_file == NULL)
276     {
277       /* The active file will be replaced by the aggregated data,
278          so TEMPORARY is moot. */
279       proc_cancel_temporary_transformations (ds);
280       proc_discard_output (ds);
281       output = autopaging_writer_create (dict_get_proto (agr.dict));
282     }
283   else
284     {
285       output = any_writer_open (out_file, agr.dict);
286       if (output == NULL)
287         goto error;
288     }
289
290   input = proc_open (ds);
291   if (!subcase_is_empty (&agr.sort) && !presorted)
292     {
293       input = sort_execute (input, &agr.sort);
294       subcase_clear (&agr.sort);
295     }
296
297   for (grouper = casegrouper_create_vars (input, agr.break_vars,
298                                           agr.break_var_cnt);
299        casegrouper_get_next_group (grouper, &group);
300        casereader_destroy (group))
301     {
302       struct casereader *placeholder = NULL;
303       struct ccase *c = casereader_peek (group, 0);
304
305       if (c == NULL)
306         {
307           casereader_destroy (group);
308           continue;
309         }
310
311       initialize_aggregate_info (&agr);
312
313       if ( agr.add_variables )
314         placeholder = casereader_clone (group);
315
316       {
317         struct ccase *cg;
318         for (; (cg = casereader_read (group)) != NULL; case_unref (cg))
319           accumulate_aggregate_info (&agr, cg);
320       }
321
322
323       if  (agr.add_variables)
324         {
325           struct ccase *cg;
326           for (; (cg = casereader_read (placeholder)) != NULL; case_unref (cg))
327             dump_aggregate_info (&agr, output, cg);
328
329           casereader_destroy (placeholder);
330         }
331       else
332         {
333           dump_aggregate_info (&agr, output, c);
334           case_unref (c);
335         }
336     }
337   if (!casegrouper_destroy (grouper))
338     goto error;
339
340   if (!proc_commit (ds))
341     {
342       input = NULL;
343       goto error;
344     }
345   input = NULL;
346
347   if (out_file == NULL)
348     {
349       struct casereader *next_input = casewriter_make_reader (output);
350       if (next_input == NULL)
351         goto error;
352
353       proc_set_active_file (ds, next_input, agr.dict);
354       agr.dict = NULL;
355     }
356   else
357     {
358       ok = casewriter_destroy (output);
359       output = NULL;
360       if (!ok)
361         goto error;
362     }
363
364   agr_destroy (&agr);
365   fh_unref (out_file);
366   return CMD_SUCCESS;
367
368 error:
369   if (input != NULL)
370     proc_commit (ds);
371   casewriter_destroy (output);
372   agr_destroy (&agr);
373   fh_unref (out_file);
374   return CMD_CASCADING_FAILURE;
375 }
376
377 /* Parse all the aggregate functions. */
378 static bool
379 parse_aggregate_functions (struct lexer *lexer, const struct dictionary *dict,
380                            struct agr_proc *agr)
381 {
382   struct agr_var *tail; /* Tail of linked list starting at agr->vars. */
383
384   /* Parse everything. */
385   tail = NULL;
386   for (;;)
387     {
388       char **dest;
389       char **dest_label;
390       size_t n_dest;
391       struct string function_name;
392
393       enum mv_class exclude;
394       const struct agr_func *function;
395       int func_index;
396
397       union agr_argument arg[2];
398
399       const struct variable **src;
400       size_t n_src;
401
402       size_t i;
403
404       dest = NULL;
405       dest_label = NULL;
406       n_dest = 0;
407       src = NULL;
408       function = NULL;
409       n_src = 0;
410       arg[0].c = NULL;
411       arg[1].c = NULL;
412       ds_init_empty (&function_name);
413
414       /* Parse the list of target variables. */
415       while (!lex_match (lexer, '='))
416         {
417           size_t n_dest_prev = n_dest;
418
419           if (!parse_DATA_LIST_vars (lexer, &dest, &n_dest,
420                                      (PV_APPEND | PV_SINGLE | PV_NO_SCRATCH
421                                       | PV_NO_DUPLICATE)))
422             goto error;
423
424           /* Assign empty labels. */
425           {
426             int j;
427
428             dest_label = xnrealloc (dest_label, n_dest, sizeof *dest_label);
429             for (j = n_dest_prev; j < n_dest; j++)
430               dest_label[j] = NULL;
431           }
432
433
434
435           if (lex_is_string (lexer))
436             {
437               struct string label;
438               ds_init_string (&label, lex_tokstr (lexer));
439
440               ds_truncate (&label, 255);
441               dest_label[n_dest - 1] = ds_xstrdup (&label);
442               lex_get (lexer);
443               ds_destroy (&label);
444             }
445         }
446
447       /* Get the name of the aggregation function. */
448       if (lex_token (lexer) != T_ID)
449         {
450           lex_error (lexer, _("expecting aggregation function"));
451           goto error;
452         }
453
454       ds_assign_string (&function_name, lex_tokstr (lexer));
455       exclude = ds_chomp (&function_name, '.') ? MV_SYSTEM : MV_ANY;
456
457       for (function = agr_func_tab; function->name; function++)
458         if (!strcasecmp (function->name, ds_cstr (&function_name)))
459           break;
460       if (NULL == function->name)
461         {
462           msg (SE, _("Unknown aggregation function %s."),
463                ds_cstr (&function_name));
464           goto error;
465         }
466       ds_destroy (&function_name);
467       func_index = function - agr_func_tab;
468       lex_get (lexer);
469
470       /* Check for leading lparen. */
471       if (!lex_match (lexer, '('))
472         {
473           if (function->src_vars == AGR_SV_YES)
474             {
475               lex_force_match (lexer, '(');
476               goto error;
477             }
478         }
479       else
480         {
481           /* Parse list of source variables. */
482           {
483             int pv_opts = PV_NO_SCRATCH;
484
485             if (func_index == SUM || func_index == MEAN || func_index == SD)
486               pv_opts |= PV_NUMERIC;
487             else if (function->n_args)
488               pv_opts |= PV_SAME_TYPE;
489
490             if (!parse_variables_const (lexer, dict, &src, &n_src, pv_opts))
491               goto error;
492           }
493
494           /* Parse function arguments, for those functions that
495              require arguments. */
496           if (function->n_args != 0)
497             for (i = 0; i < function->n_args; i++)
498               {
499                 int type;
500
501                 lex_match (lexer, ',');
502                 if (lex_is_string (lexer))
503                   {
504                     arg[i].c = ds_xstrdup (lex_tokstr (lexer));
505                     type = VAL_STRING;
506                   }
507                 else if (lex_is_number (lexer))
508                   {
509                     arg[i].f = lex_tokval (lexer);
510                     type = VAL_NUMERIC;
511                   }
512                 else
513                   {
514                     msg (SE, _("Missing argument %zu to %s."),
515                          i + 1, function->name);
516                     goto error;
517                   }
518
519                 lex_get (lexer);
520
521                 if (type != var_get_type (src[0]))
522                   {
523                     msg (SE, _("Arguments to %s must be of same type as "
524                                "source variables."),
525                          function->name);
526                     goto error;
527                   }
528               }
529
530           /* Trailing rparen. */
531           if (!lex_force_match (lexer, ')'))
532             goto error;
533
534           /* Now check that the number of source variables match
535              the number of target variables.  If we check earlier
536              than this, the user can get very misleading error
537              message, i.e. `AGGREGATE x=SUM(y t).' will get this
538              error message when a proper message would be more
539              like `unknown variable t'. */
540           if (n_src != n_dest)
541             {
542               msg (SE, _("Number of source variables (%zu) does not match "
543                          "number of target variables (%zu)."),
544                     n_src, n_dest);
545               goto error;
546             }
547
548           if ((func_index == PIN || func_index == POUT
549               || func_index == FIN || func_index == FOUT)
550               && (var_is_numeric (src[0])
551                   ? arg[0].f > arg[1].f
552                   : str_compare_rpad (arg[0].c, arg[1].c) > 0))
553             {
554               union agr_argument t = arg[0];
555               arg[0] = arg[1];
556               arg[1] = t;
557
558               msg (SW, _("The value arguments passed to the %s function "
559                          "are out-of-order.  They will be treated as if "
560                          "they had been specified in the correct order."),
561                    function->name);
562             }
563         }
564
565       /* Finally add these to the linked list of aggregation
566          variables. */
567       for (i = 0; i < n_dest; i++)
568         {
569           struct agr_var *v = xzalloc (sizeof *v);
570
571           /* Add variable to chain. */
572           if (agr->agr_vars != NULL)
573             tail->next = v;
574           else
575             agr->agr_vars = v;
576           tail = v;
577           tail->next = NULL;
578           v->moments = NULL;
579
580           /* Create the target variable in the aggregate
581              dictionary. */
582           {
583             struct variable *destvar;
584
585             v->function = func_index;
586
587             if (src)
588               {
589                 v->src = src[i];
590
591                 if (var_is_alpha (src[i]))
592                   {
593                     v->function |= FSTRING;
594                     v->string = xmalloc (var_get_width (src[i]));
595                   }
596
597                 if (function->alpha_type == VAL_STRING)
598                   destvar = dict_clone_var_as (agr->dict, v->src, dest[i]);
599                 else
600                   {
601                     assert (var_is_numeric (v->src)
602                             || function->alpha_type == VAL_NUMERIC);
603                     destvar = dict_create_var (agr->dict, dest[i], 0);
604                     if (destvar != NULL)
605                       {
606                         struct fmt_spec f;
607                         if ((func_index == N || func_index == NMISS)
608                             && dict_get_weight (dict) != NULL)
609                           f = fmt_for_output (FMT_F, 8, 2);
610                         else
611                           f = function->format;
612                         var_set_both_formats (destvar, &f);
613                       }
614                   }
615               } else {
616                 struct fmt_spec f;
617                 v->src = NULL;
618                 destvar = dict_create_var (agr->dict, dest[i], 0);
619                 if (destvar != NULL)
620                   {
621                     if ((func_index == N || func_index == NMISS)
622                         && dict_get_weight (dict) != NULL)
623                       f = fmt_for_output (FMT_F, 8, 2);
624                     else
625                       f = function->format;
626                     var_set_both_formats (destvar, &f);
627                   }
628             }
629
630             if (!destvar)
631               {
632                 msg (SE, _("Variable name %s is not unique within the "
633                            "aggregate file dictionary, which contains "
634                            "the aggregate variables and the break "
635                            "variables."),
636                      dest[i]);
637                 goto error;
638               }
639
640             free (dest[i]);
641             if (dest_label[i])
642               var_set_label (destvar, dest_label[i]);
643
644             v->dest = destvar;
645           }
646
647           v->exclude = exclude;
648
649           if (v->src != NULL)
650             {
651               int j;
652
653               if (var_is_numeric (v->src))
654                 for (j = 0; j < function->n_args; j++)
655                   v->arg[j].f = arg[j].f;
656               else
657                 for (j = 0; j < function->n_args; j++)
658                   v->arg[j].c = xstrdup (arg[j].c);
659             }
660         }
661
662       if (src != NULL && var_is_alpha (src[0]))
663         for (i = 0; i < function->n_args; i++)
664           {
665             free (arg[i].c);
666             arg[i].c = NULL;
667           }
668
669       free (src);
670       free (dest);
671       free (dest_label);
672
673       if (!lex_match (lexer, '/'))
674         {
675           if (lex_token (lexer) == '.')
676             return true;
677
678           lex_error (lexer, "expecting end of command");
679           return false;
680         }
681       continue;
682
683     error:
684       ds_destroy (&function_name);
685       for (i = 0; i < n_dest; i++)
686         {
687           free (dest[i]);
688           free (dest_label[i]);
689         }
690       free (dest);
691       free (dest_label);
692       free (arg[0].c);
693       free (arg[1].c);
694       if (src && n_src && var_is_alpha (src[0]))
695         for (i = 0; i < function->n_args; i++)
696           {
697             free (arg[i].c);
698             arg[i].c = NULL;
699           }
700       free (src);
701
702       return false;
703     }
704 }
705
706 /* Destroys AGR. */
707 static void
708 agr_destroy (struct agr_proc *agr)
709 {
710   struct agr_var *iter, *next;
711
712   subcase_destroy (&agr->sort);
713   free (agr->break_vars);
714   for (iter = agr->agr_vars; iter; iter = next)
715     {
716       next = iter->next;
717
718       if (iter->function & FSTRING)
719         {
720           size_t n_args;
721           size_t i;
722
723           n_args = agr_func_tab[iter->function & FUNC].n_args;
724           for (i = 0; i < n_args; i++)
725             free (iter->arg[i].c);
726           free (iter->string);
727         }
728       else if (iter->function == SD)
729         moments1_destroy (iter->moments);
730
731       dict_destroy_internal_var (iter->subject);
732       dict_destroy_internal_var (iter->weight);
733
734       free (iter);
735     }
736   if (agr->dict != NULL)
737     dict_destroy (agr->dict);
738 }
739 \f
740 /* Execution. */
741
742 /* Accumulates aggregation data from the case INPUT. */
743 static void
744 accumulate_aggregate_info (struct agr_proc *agr, const struct ccase *input)
745 {
746   struct agr_var *iter;
747   double weight;
748   bool bad_warn = true;
749
750   weight = dict_get_case_weight (agr->src_dict, input, &bad_warn);
751
752   for (iter = agr->agr_vars; iter; iter = iter->next)
753     if (iter->src)
754       {
755         const union value *v = case_data (input, iter->src);
756         int src_width = var_get_width (iter->src);
757
758         if (var_is_value_missing (iter->src, v, iter->exclude))
759           {
760             switch (iter->function)
761               {
762               case NMISS:
763               case NMISS | FSTRING:
764                 iter->dbl[0] += weight;
765                 break;
766               case NUMISS:
767               case NUMISS | FSTRING:
768                 iter->int1++;
769                 break;
770               }
771             iter->saw_missing = true;
772             continue;
773           }
774
775         /* This is horrible.  There are too many possibilities. */
776         switch (iter->function)
777           {
778           case SUM:
779             iter->dbl[0] += v->f * weight;
780             iter->int1 = 1;
781             break;
782           case MEAN:
783             iter->dbl[0] += v->f * weight;
784             iter->dbl[1] += weight;
785             break;
786           case MEDIAN:
787             {
788               double wv ;
789               struct ccase *cout;
790
791               cout = case_create (casewriter_get_proto (iter->writer));
792
793               case_data_rw (cout, iter->subject)->f
794                 = case_data (input, iter->src)->f;
795
796               wv = dict_get_case_weight (agr->src_dict, input, NULL);
797
798               case_data_rw (cout, iter->weight)->f = wv;
799
800               iter->cc += wv;
801
802               casewriter_write (iter->writer, cout);
803             }
804             break;
805           case SD:
806             moments1_add (iter->moments, v->f, weight);
807             break;
808           case MAX:
809             iter->dbl[0] = MAX (iter->dbl[0], v->f);
810             iter->int1 = 1;
811             break;
812           case MAX | FSTRING:
813             if (memcmp (iter->string, value_str (v, src_width), src_width) < 0)
814               memcpy (iter->string, value_str (v, src_width), src_width);
815             iter->int1 = 1;
816             break;
817           case MIN:
818             iter->dbl[0] = MIN (iter->dbl[0], v->f);
819             iter->int1 = 1;
820             break;
821           case MIN | FSTRING:
822             if (memcmp (iter->string, value_str (v, src_width), src_width) > 0)
823               memcpy (iter->string, value_str (v, src_width), src_width);
824             iter->int1 = 1;
825             break;
826           case FGT:
827           case PGT:
828             if (v->f > iter->arg[0].f)
829               iter->dbl[0] += weight;
830             iter->dbl[1] += weight;
831             break;
832           case FGT | FSTRING:
833           case PGT | FSTRING:
834             if (memcmp (iter->arg[0].c,
835                         value_str (v, src_width), src_width) < 0)
836               iter->dbl[0] += weight;
837             iter->dbl[1] += weight;
838             break;
839           case FLT:
840           case PLT:
841             if (v->f < iter->arg[0].f)
842               iter->dbl[0] += weight;
843             iter->dbl[1] += weight;
844             break;
845           case FLT | FSTRING:
846           case PLT | FSTRING:
847             if (memcmp (iter->arg[0].c,
848                         value_str (v, src_width), src_width) > 0)
849               iter->dbl[0] += weight;
850             iter->dbl[1] += weight;
851             break;
852           case FIN:
853           case PIN:
854             if (iter->arg[0].f <= v->f && v->f <= iter->arg[1].f)
855               iter->dbl[0] += weight;
856             iter->dbl[1] += weight;
857             break;
858           case FIN | FSTRING:
859           case PIN | FSTRING:
860             if (memcmp (iter->arg[0].c,
861                         value_str (v, src_width), src_width) <= 0
862                 && memcmp (iter->arg[1].c,
863                            value_str (v, src_width), src_width) >= 0)
864               iter->dbl[0] += weight;
865             iter->dbl[1] += weight;
866             break;
867           case FOUT:
868           case POUT:
869             if (iter->arg[0].f > v->f || v->f > iter->arg[1].f)
870               iter->dbl[0] += weight;
871             iter->dbl[1] += weight;
872             break;
873           case FOUT | FSTRING:
874           case POUT | FSTRING:
875             if (memcmp (iter->arg[0].c,
876                         value_str (v, src_width), src_width) > 0
877                 || memcmp (iter->arg[1].c,
878                            value_str (v, src_width), src_width) < 0)
879               iter->dbl[0] += weight;
880             iter->dbl[1] += weight;
881             break;
882           case N:
883           case N | FSTRING:
884             iter->dbl[0] += weight;
885             break;
886           case NU:
887           case NU | FSTRING:
888             iter->int1++;
889             break;
890           case FIRST:
891             if (iter->int1 == 0)
892               {
893                 iter->dbl[0] = v->f;
894                 iter->int1 = 1;
895               }
896             break;
897           case FIRST | FSTRING:
898             if (iter->int1 == 0)
899               {
900                 memcpy (iter->string, value_str (v, src_width), src_width);
901                 iter->int1 = 1;
902               }
903             break;
904           case LAST:
905             iter->dbl[0] = v->f;
906             iter->int1 = 1;
907             break;
908           case LAST | FSTRING:
909             memcpy (iter->string, value_str (v, src_width), src_width);
910             iter->int1 = 1;
911             break;
912           case NMISS:
913           case NMISS | FSTRING:
914           case NUMISS:
915           case NUMISS | FSTRING:
916             /* Our value is not missing or it would have been
917                caught earlier.  Nothing to do. */
918             break;
919           default:
920             NOT_REACHED ();
921           }
922       } else {
923       switch (iter->function)
924         {
925         case N:
926           iter->dbl[0] += weight;
927           break;
928         case NU:
929           iter->int1++;
930           break;
931         default:
932           NOT_REACHED ();
933         }
934     }
935 }
936
937 /* Writes an aggregated record to OUTPUT. */
938 static void
939 dump_aggregate_info (const struct agr_proc *agr, struct casewriter *output, const struct ccase *break_case)
940 {
941   struct ccase *c = case_create (dict_get_proto (agr->dict));
942
943   if ( agr->add_variables)
944     {
945       case_copy (c, 0, break_case, 0, dict_get_var_cnt (agr->src_dict));
946     }
947   else
948     {
949       int value_idx = 0;
950       int i;
951
952       for (i = 0; i < agr->break_var_cnt; i++)
953         {
954           const struct variable *v = agr->break_vars[i];
955           value_copy (case_data_rw_idx (c, value_idx),
956                       case_data (break_case, v),
957                       var_get_width (v));
958           value_idx++;
959         }
960     }
961
962   {
963     struct agr_var *i;
964
965     for (i = agr->agr_vars; i; i = i->next)
966       {
967         union value *v = case_data_rw (c, i->dest);
968         int width = var_get_width (i->dest);
969
970         if (agr->missing == COLUMNWISE && i->saw_missing
971             && (i->function & FUNC) != N && (i->function & FUNC) != NU
972             && (i->function & FUNC) != NMISS && (i->function & FUNC) != NUMISS)
973           {
974             value_set_missing (v, width);
975             casewriter_destroy (i->writer);
976             continue;
977           }
978
979         switch (i->function)
980           {
981           case SUM:
982             v->f = i->int1 ? i->dbl[0] : SYSMIS;
983             break;
984           case MEAN:
985             v->f = i->dbl[1] != 0.0 ? i->dbl[0] / i->dbl[1] : SYSMIS;
986             break;
987           case MEDIAN:
988             {
989               if ( i->writer)
990                 {
991                   struct percentile *median = percentile_create (0.5, i->cc);
992                   struct order_stats *os = &median->parent;
993                   struct casereader *sorted_reader = casewriter_make_reader (i->writer);
994                   i->writer = NULL;
995
996                   order_stats_accumulate (&os, 1,
997                                           sorted_reader,
998                                           i->weight,
999                                           i->subject,
1000                                           i->exclude);
1001                   i->dbl[0] = percentile_calculate (median, PC_HAVERAGE);
1002                   statistic_destroy (&median->parent.parent);
1003                 }
1004               v->f = i->dbl[0];
1005             }
1006             break;
1007           case SD:
1008             {
1009               double variance;
1010
1011               /* FIXME: we should use two passes. */
1012               moments1_calculate (i->moments, NULL, NULL, &variance,
1013                                  NULL, NULL);
1014               if (variance != SYSMIS)
1015                 v->f = sqrt (variance);
1016               else
1017                 v->f = SYSMIS;
1018             }
1019             break;
1020           case MAX:
1021           case MIN:
1022             v->f = i->int1 ? i->dbl[0] : SYSMIS;
1023             break;
1024           case MAX | FSTRING:
1025           case MIN | FSTRING:
1026             if (i->int1)
1027               memcpy (value_str_rw (v, width), i->string, width);
1028             else
1029               value_set_missing (v, width);
1030             break;
1031           case FGT:
1032           case FGT | FSTRING:
1033           case FLT:
1034           case FLT | FSTRING:
1035           case FIN:
1036           case FIN | FSTRING:
1037           case FOUT:
1038           case FOUT | FSTRING:
1039             v->f = i->dbl[1] ? i->dbl[0] / i->dbl[1] : SYSMIS;
1040             break;
1041           case PGT:
1042           case PGT | FSTRING:
1043           case PLT:
1044           case PLT | FSTRING:
1045           case PIN:
1046           case PIN | FSTRING:
1047           case POUT:
1048           case POUT | FSTRING:
1049             v->f = i->dbl[1] ? i->dbl[0] / i->dbl[1] * 100.0 : SYSMIS;
1050             break;
1051           case N:
1052           case N | FSTRING:
1053               v->f = i->dbl[0];
1054             break;
1055           case NU:
1056           case NU | FSTRING:
1057             v->f = i->int1;
1058             break;
1059           case FIRST:
1060           case LAST:
1061             v->f = i->int1 ? i->dbl[0] : SYSMIS;
1062             break;
1063           case FIRST | FSTRING:
1064           case LAST | FSTRING:
1065             if (i->int1)
1066               memcpy (value_str_rw (v, width), i->string, width);
1067             else
1068               value_set_missing (v, width);
1069             break;
1070           case NMISS:
1071           case NMISS | FSTRING:
1072             v->f = i->dbl[0];
1073             break;
1074           case NUMISS:
1075           case NUMISS | FSTRING:
1076             v->f = i->int1;
1077             break;
1078           default:
1079             NOT_REACHED ();
1080           }
1081       }
1082   }
1083
1084   casewriter_write (output, c);
1085 }
1086
1087 /* Resets the state for all the aggregate functions. */
1088 static void
1089 initialize_aggregate_info (struct agr_proc *agr)
1090 {
1091   struct agr_var *iter;
1092
1093   for (iter = agr->agr_vars; iter; iter = iter->next)
1094     {
1095       iter->saw_missing = false;
1096       iter->dbl[0] = iter->dbl[1] = iter->dbl[2] = 0.0;
1097       iter->int1 = iter->int2 = 0;
1098       switch (iter->function)
1099         {
1100         case MIN:
1101           iter->dbl[0] = DBL_MAX;
1102           break;
1103         case MIN | FSTRING:
1104           memset (iter->string, 255, var_get_width (iter->src));
1105           break;
1106         case MAX:
1107           iter->dbl[0] = -DBL_MAX;
1108           break;
1109         case MAX | FSTRING:
1110           memset (iter->string, 0, var_get_width (iter->src));
1111           break;
1112         case MEDIAN:
1113           {
1114             struct caseproto *proto;
1115             struct subcase ordering;
1116
1117             proto = caseproto_create ();
1118             proto = caseproto_add_width (proto, 0);
1119             proto = caseproto_add_width (proto, 0);
1120
1121             if ( ! iter->subject)
1122               iter->subject = dict_create_internal_var (0, 0);
1123
1124             if ( ! iter->weight)
1125               iter->weight = dict_create_internal_var (1, 0);
1126
1127             subcase_init_var (&ordering, iter->subject, SC_ASCEND);
1128             iter->writer = sort_create_writer (&ordering, proto);
1129             subcase_destroy (&ordering);
1130             caseproto_unref (proto);
1131
1132             iter->cc = 0;
1133           }
1134           break;
1135         case SD:
1136           if (iter->moments == NULL)
1137             iter->moments = moments1_create (MOMENT_VARIANCE);
1138           else
1139             moments1_clear (iter->moments);
1140           break;
1141         default:
1142           break;
1143         }
1144     }
1145 }