AGGREGATE: Add MODE=ADDVARIABLES subcommand.
[pspp] / src / language / stats / aggregate.c
1 /* PSPP - a program for statistical analysis.
2    Copyright (C) 1997-9, 2000, 2006, 2008, 2009, 2010 Free Software Foundation, Inc.
3
4    This program is free software: you can redistribute it and/or modify
5    it under the terms of the GNU General Public License as published by
6    the Free Software Foundation, either version 3 of the License, or
7    (at your option) any later version.
8
9    This program is distributed in the hope that it will be useful,
10    but WITHOUT ANY WARRANTY; without even the implied warranty of
11    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12    GNU General Public License for more details.
13
14    You should have received a copy of the GNU General Public License
15    along with this program.  If not, see <http://www.gnu.org/licenses/>. */
16
17 #include <config.h>
18
19 #include <stdlib.h>
20
21 #include <data/any-writer.h>
22 #include <data/case.h>
23 #include <data/casegrouper.h>
24 #include <data/casereader.h>
25 #include <data/casewriter.h>
26 #include <data/dictionary.h>
27 #include <data/file-handle-def.h>
28 #include <data/format.h>
29 #include <data/procedure.h>
30 #include <data/settings.h>
31 #include <data/subcase.h>
32 #include <data/sys-file-writer.h>
33 #include <data/variable.h>
34 #include <language/command.h>
35 #include <language/data-io/file-handle.h>
36 #include <language/lexer/lexer.h>
37 #include <language/lexer/variable-parser.h>
38 #include <language/stats/sort-criteria.h>
39 #include <libpspp/assertion.h>
40 #include <libpspp/message.h>
41 #include <libpspp/misc.h>
42 #include <libpspp/pool.h>
43 #include <libpspp/str.h>
44 #include <math/moments.h>
45 #include <math/sort.h>
46 #include <math/statistic.h>
47 #include <math/percentiles.h>
48
49 #include "minmax.h"
50 #include "xalloc.h"
51
52 #include "gettext.h"
53 #define _(msgid) gettext (msgid)
54
55 /* Argument for AGGREGATE function. */
56 union agr_argument
57   {
58     double f;                           /* Numeric. */
59     char *c;                            /* Short or long string. */
60   };
61
62 /* Specifies how to make an aggregate variable. */
63 struct agr_var
64   {
65     struct agr_var *next;               /* Next in list. */
66
67     /* Collected during parsing. */
68     const struct variable *src; /* Source variable. */
69     struct variable *dest;      /* Target variable. */
70     int function;               /* Function. */
71     enum mv_class exclude;      /* Classes of missing values to exclude. */
72     union agr_argument arg[2];  /* Arguments. */
73
74     /* Accumulated during AGGREGATE execution. */
75     double dbl[3];
76     int int1, int2;
77     char *string;
78     bool saw_missing;
79     struct moments1 *moments;
80     double cc;
81
82     struct variable *subject;
83     struct variable *weight;
84     struct casewriter *writer;
85   };
86
87 /* Aggregation functions. */
88 enum
89   {
90     NONE, SUM, MEAN, MEDIAN, SD, MAX, MIN, PGT, PLT, PIN, POUT, FGT, FLT, FIN,
91     FOUT, N, NU, NMISS, NUMISS, FIRST, LAST,
92     N_AGR_FUNCS, N_NO_VARS, NU_NO_VARS,
93     FUNC = 0x1f, /* Function mask. */
94     FSTRING = 1<<5, /* String function bit. */
95   };
96
97 /* Attributes of an aggregation function. */
98 struct agr_func
99   {
100     const char *name;           /* Aggregation function name. */
101     size_t n_args;              /* Number of arguments. */
102     enum val_type alpha_type;   /* When given ALPHA arguments, output type. */
103     struct fmt_spec format;     /* Format spec if alpha_type != ALPHA. */
104   };
105
106 /* Attributes of aggregation functions. */
107 static const struct agr_func agr_func_tab[] =
108   {
109     {"<NONE>",  0, -1,          {0, 0, 0}},
110     {"SUM",     0, -1,          {FMT_F, 8, 2}},
111     {"MEAN",    0, -1,          {FMT_F, 8, 2}},
112     {"MEDIAN",  0, -1,          {FMT_F, 8, 2}},
113     {"SD",      0, -1,          {FMT_F, 8, 2}},
114     {"MAX",     0, VAL_STRING,  {-1, -1, -1}},
115     {"MIN",     0, VAL_STRING,  {-1, -1, -1}},
116     {"PGT",     1, VAL_NUMERIC, {FMT_F, 5, 1}},
117     {"PLT",     1, VAL_NUMERIC, {FMT_F, 5, 1}},
118     {"PIN",     2, VAL_NUMERIC, {FMT_F, 5, 1}},
119     {"POUT",    2, VAL_NUMERIC, {FMT_F, 5, 1}},
120     {"FGT",     1, VAL_NUMERIC, {FMT_F, 5, 3}},
121     {"FLT",     1, VAL_NUMERIC, {FMT_F, 5, 3}},
122     {"FIN",     2, VAL_NUMERIC, {FMT_F, 5, 3}},
123     {"FOUT",    2, VAL_NUMERIC, {FMT_F, 5, 3}},
124     {"N",       0, VAL_NUMERIC, {FMT_F, 7, 0}},
125     {"NU",      0, VAL_NUMERIC, {FMT_F, 7, 0}},
126     {"NMISS",   0, VAL_NUMERIC, {FMT_F, 7, 0}},
127     {"NUMISS",  0, VAL_NUMERIC, {FMT_F, 7, 0}},
128     {"FIRST",   0, VAL_STRING,  {-1, -1, -1}},
129     {"LAST",    0, VAL_STRING,  {-1, -1, -1}},
130     {NULL,      0, -1,          {-1, -1, -1}},
131     {"N",       0, VAL_NUMERIC, {FMT_F, 7, 0}},
132     {"NU",      0, VAL_NUMERIC, {FMT_F, 7, 0}},
133   };
134
135 /* Missing value types. */
136 enum missing_treatment
137   {
138     ITEMWISE,           /* Missing values item by item. */
139     COLUMNWISE          /* Missing values column by column. */
140   };
141
142 /* An entire AGGREGATE procedure. */
143 struct agr_proc
144   {
145     /* Break variables. */
146     struct subcase sort;                /* Sort criteria (break variables). */
147     const struct variable **break_vars;       /* Break variables. */
148     size_t break_var_cnt;               /* Number of break variables. */
149
150     enum missing_treatment missing;     /* How to treat missing values. */
151     struct agr_var *agr_vars;           /* First aggregate variable. */
152     struct dictionary *dict;            /* Aggregate dictionary. */
153     const struct dictionary *src_dict;  /* Dict of the source */
154     int case_cnt;                       /* Counts aggregated cases. */
155
156     bool add_variables;                 /* True iff the aggregated variables should
157                                            be appended to the existing dictionary */
158   };
159
160 static void initialize_aggregate_info (struct agr_proc *);
161
162 static void accumulate_aggregate_info (struct agr_proc *,
163                                        const struct ccase *);
164 /* Prototypes. */
165 static bool parse_aggregate_functions (struct lexer *, const struct dictionary *,
166                                        struct agr_proc *);
167 static void agr_destroy (struct agr_proc *);
168 static void dump_aggregate_info (const struct agr_proc *agr,
169                                  struct casewriter *output,
170                                  const struct ccase *break_case);
171 \f
172 /* Parsing. */
173
174 /* Parses and executes the AGGREGATE procedure. */
175 int
176 cmd_aggregate (struct lexer *lexer, struct dataset *ds)
177 {
178   struct dictionary *dict = dataset_dict (ds);
179   struct agr_proc agr;
180   struct file_handle *out_file = NULL;
181   struct casereader *input = NULL, *group;
182   struct casegrouper *grouper;
183   struct casewriter *output = NULL;
184
185   bool copy_documents = false;
186   bool presorted = false;
187   bool saw_direction;
188   bool ok;
189
190   memset(&agr, 0 , sizeof (agr));
191   agr.missing = ITEMWISE;
192   agr.src_dict = dict;
193   subcase_init_empty (&agr.sort);
194
195   /* OUTFILE subcommand must be first. */
196   lex_match (lexer, '/');
197   if (!lex_force_match_id (lexer, "OUTFILE"))
198     goto error;
199   lex_match (lexer, '=');
200   if (!lex_match (lexer, '*'))
201     {
202       out_file = fh_parse (lexer, FH_REF_FILE | FH_REF_SCRATCH);
203       if (out_file == NULL)
204         goto error;
205     }
206
207   if (out_file == NULL && lex_match_id (lexer, "MODE"))
208     {
209       lex_match (lexer, '=');
210       if (lex_match_id (lexer, "ADDVARIABLES"))
211         {
212           agr.add_variables = true;
213
214           /* presorted is assumed in ADDVARIABLES mode */
215           presorted = true;
216         }
217       else if (lex_match_id (lexer, "REPLACE"))
218         {
219           agr.add_variables = false;
220         }
221       else
222         goto error;
223     }
224
225   if ( agr.add_variables )
226     agr.dict = dict_clone (dict);
227   else
228     agr.dict = dict_create ();    
229
230   dict_set_label (agr.dict, dict_get_label (dict));
231   dict_set_documents (agr.dict, dict_get_documents (dict));
232
233   /* Read most of the subcommands. */
234   for (;;)
235     {
236       lex_match (lexer, '/');
237
238       if (lex_match_id (lexer, "MISSING"))
239         {
240           lex_match (lexer, '=');
241           if (!lex_match_id (lexer, "COLUMNWISE"))
242             {
243               lex_error (lexer, _("while expecting COLUMNWISE"));
244               goto error;
245             }
246           agr.missing = COLUMNWISE;
247         }
248       else if (lex_match_id (lexer, "DOCUMENT"))
249         copy_documents = true;
250       else if (lex_match_id (lexer, "PRESORTED"))
251         presorted = true;
252       else if (lex_match_id (lexer, "BREAK"))
253         {
254           int i;
255
256           lex_match (lexer, '=');
257           if (!parse_sort_criteria (lexer, dict, &agr.sort, &agr.break_vars,
258                                     &saw_direction))
259             goto error;
260           agr.break_var_cnt = subcase_get_n_fields (&agr.sort);
261
262           if  (! agr.add_variables)
263             for (i = 0; i < agr.break_var_cnt; i++)
264               dict_clone_var_assert (agr.dict, agr.break_vars[i]);
265
266           /* BREAK must follow the options. */
267           break;
268         }
269       else
270         {
271           lex_error (lexer, _("expecting BREAK"));
272           goto error;
273         }
274     }
275   if (presorted && saw_direction)
276     msg (SW, _("When PRESORTED is specified, specifying sorting directions "
277                "with (A) or (D) has no effect.  Output data will be sorted "
278                "the same way as the input data."));
279
280   /* Read in the aggregate functions. */
281   lex_match (lexer, '/');
282   if (!parse_aggregate_functions (lexer, dict, &agr))
283     goto error;
284
285   /* Delete documents. */
286   if (!copy_documents)
287     dict_clear_documents (agr.dict);
288
289   /* Cancel SPLIT FILE. */
290   dict_set_split_vars (agr.dict, NULL, 0);
291
292   /* Initialize. */
293   agr.case_cnt = 0;
294
295   if (out_file == NULL)
296     {
297       /* The active file will be replaced by the aggregated data,
298          so TEMPORARY is moot. */
299       proc_cancel_temporary_transformations (ds);
300       proc_discard_output (ds);
301       output = autopaging_writer_create (dict_get_proto (agr.dict));
302     }
303   else
304     {
305       output = any_writer_open (out_file, agr.dict);
306       if (output == NULL)
307         goto error;
308     }
309
310   input = proc_open (ds);
311   if (!subcase_is_empty (&agr.sort) && !presorted)
312     {
313       input = sort_execute (input, &agr.sort);
314       subcase_clear (&agr.sort);
315     }
316
317   for (grouper = casegrouper_create_vars (input, agr.break_vars,
318                                           agr.break_var_cnt);
319        casegrouper_get_next_group (grouper, &group);
320        casereader_destroy (group))
321     {
322       struct casereader *placeholder = NULL;
323       struct ccase *c = casereader_peek (group, 0);
324
325       if (c == NULL)
326         {
327           casereader_destroy (group);
328           continue;
329         }
330
331       initialize_aggregate_info (&agr);
332
333       if ( agr.add_variables )
334         placeholder = casereader_clone (group);
335
336       {
337         struct ccase *cg;
338         for (; (cg = casereader_read (group)) != NULL; case_unref (cg))
339           accumulate_aggregate_info (&agr, cg);
340       }
341
342
343       if  (agr.add_variables)
344         {
345           struct ccase *cg;
346           for (; (cg = casereader_read (placeholder)) != NULL; case_unref (cg))
347             dump_aggregate_info (&agr, output, cg);
348
349           casereader_destroy (placeholder);
350         }
351       else
352         {
353           dump_aggregate_info (&agr, output, c);
354           case_unref (c);
355         }
356     }
357   if (!casegrouper_destroy (grouper))
358     goto error;
359
360   if (!proc_commit (ds))
361     {
362       input = NULL;
363       goto error;
364     }
365   input = NULL;
366
367   if (out_file == NULL)
368     {
369       struct casereader *next_input = casewriter_make_reader (output);
370       if (next_input == NULL)
371         goto error;
372
373       proc_set_active_file (ds, next_input, agr.dict);
374       agr.dict = NULL;
375     }
376   else
377     {
378       ok = casewriter_destroy (output);
379       output = NULL;
380       if (!ok)
381         goto error;
382     }
383
384   agr_destroy (&agr);
385   fh_unref (out_file);
386   return CMD_SUCCESS;
387
388 error:
389   if (input != NULL)
390     proc_commit (ds);
391   casewriter_destroy (output);
392   agr_destroy (&agr);
393   fh_unref (out_file);
394   return CMD_CASCADING_FAILURE;
395 }
396
397 /* Parse all the aggregate functions. */
398 static bool
399 parse_aggregate_functions (struct lexer *lexer, const struct dictionary *dict,
400                            struct agr_proc *agr)
401 {
402   struct agr_var *tail; /* Tail of linked list starting at agr->vars. */
403
404   /* Parse everything. */
405   tail = NULL;
406   for (;;)
407     {
408       char **dest;
409       char **dest_label;
410       size_t n_dest;
411       struct string function_name;
412
413       enum mv_class exclude;
414       const struct agr_func *function;
415       int func_index;
416
417       union agr_argument arg[2];
418
419       const struct variable **src;
420       size_t n_src;
421
422       size_t i;
423
424       dest = NULL;
425       dest_label = NULL;
426       n_dest = 0;
427       src = NULL;
428       function = NULL;
429       n_src = 0;
430       arg[0].c = NULL;
431       arg[1].c = NULL;
432       ds_init_empty (&function_name);
433
434       /* Parse the list of target variables. */
435       while (!lex_match (lexer, '='))
436         {
437           size_t n_dest_prev = n_dest;
438
439           if (!parse_DATA_LIST_vars (lexer, &dest, &n_dest,
440                                      (PV_APPEND | PV_SINGLE | PV_NO_SCRATCH
441                                       | PV_NO_DUPLICATE)))
442             goto error;
443
444           /* Assign empty labels. */
445           {
446             int j;
447
448             dest_label = xnrealloc (dest_label, n_dest, sizeof *dest_label);
449             for (j = n_dest_prev; j < n_dest; j++)
450               dest_label[j] = NULL;
451           }
452
453
454
455           if (lex_token (lexer) == T_STRING)
456             {
457               struct string label;
458               ds_init_string (&label, lex_tokstr (lexer));
459
460               ds_truncate (&label, 255);
461               dest_label[n_dest - 1] = ds_xstrdup (&label);
462               lex_get (lexer);
463               ds_destroy (&label);
464             }
465         }
466
467       /* Get the name of the aggregation function. */
468       if (lex_token (lexer) != T_ID)
469         {
470           lex_error (lexer, _("expecting aggregation function"));
471           goto error;
472         }
473
474       exclude = MV_ANY;
475
476       ds_assign_string (&function_name, lex_tokstr (lexer));
477
478       ds_chomp (&function_name, '.');
479
480       if (lex_tokid(lexer)[strlen (lex_tokid (lexer)) - 1] == '.')
481         exclude = MV_SYSTEM;
482
483       for (function = agr_func_tab; function->name; function++)
484         if (!strcasecmp (function->name, ds_cstr (&function_name)))
485           break;
486       if (NULL == function->name)
487         {
488           msg (SE, _("Unknown aggregation function %s."),
489                ds_cstr (&function_name));
490           goto error;
491         }
492       ds_destroy (&function_name);
493       func_index = function - agr_func_tab;
494       lex_get (lexer);
495
496       /* Check for leading lparen. */
497       if (!lex_match (lexer, '('))
498         {
499           if (func_index == N)
500             func_index = N_NO_VARS;
501           else if (func_index == NU)
502             func_index = NU_NO_VARS;
503           else
504             {
505               lex_error (lexer, _("expecting `('"));
506               goto error;
507             }
508         }
509       else
510         {
511           /* Parse list of source variables. */
512           {
513             int pv_opts = PV_NO_SCRATCH;
514
515             if (func_index == SUM || func_index == MEAN || func_index == SD)
516               pv_opts |= PV_NUMERIC;
517             else if (function->n_args)
518               pv_opts |= PV_SAME_TYPE;
519
520             if (!parse_variables_const (lexer, dict, &src, &n_src, pv_opts))
521               goto error;
522           }
523
524           /* Parse function arguments, for those functions that
525              require arguments. */
526           if (function->n_args != 0)
527             for (i = 0; i < function->n_args; i++)
528               {
529                 int type;
530
531                 lex_match (lexer, ',');
532                 if (lex_token (lexer) == T_STRING)
533                   {
534                     arg[i].c = ds_xstrdup (lex_tokstr (lexer));
535                     type = VAL_STRING;
536                   }
537                 else if (lex_is_number (lexer))
538                   {
539                     arg[i].f = lex_tokval (lexer);
540                     type = VAL_NUMERIC;
541                   }
542                 else
543                   {
544                     msg (SE, _("Missing argument %zu to %s."),
545                          i + 1, function->name);
546                     goto error;
547                   }
548
549                 lex_get (lexer);
550
551                 if (type != var_get_type (src[0]))
552                   {
553                     msg (SE, _("Arguments to %s must be of same type as "
554                                "source variables."),
555                          function->name);
556                     goto error;
557                   }
558               }
559
560           /* Trailing rparen. */
561           if (!lex_match (lexer, ')'))
562             {
563               lex_error (lexer, _("expecting `)'"));
564               goto error;
565             }
566
567           /* Now check that the number of source variables match
568              the number of target variables.  If we check earlier
569              than this, the user can get very misleading error
570              message, i.e. `AGGREGATE x=SUM(y t).' will get this
571              error message when a proper message would be more
572              like `unknown variable t'. */
573           if (n_src != n_dest)
574             {
575               msg (SE, _("Number of source variables (%zu) does not match "
576                          "number of target variables (%zu)."),
577                     n_src, n_dest);
578               goto error;
579             }
580
581           if ((func_index == PIN || func_index == POUT
582               || func_index == FIN || func_index == FOUT)
583               && (var_is_numeric (src[0])
584                   ? arg[0].f > arg[1].f
585                   : str_compare_rpad (arg[0].c, arg[1].c) > 0))
586             {
587               union agr_argument t = arg[0];
588               arg[0] = arg[1];
589               arg[1] = t;
590
591               msg (SW, _("The value arguments passed to the %s function "
592                          "are out-of-order.  They will be treated as if "
593                          "they had been specified in the correct order."),
594                    function->name);
595             }
596         }
597
598       /* Finally add these to the linked list of aggregation
599          variables. */
600       for (i = 0; i < n_dest; i++)
601         {
602           struct agr_var *v = xzalloc (sizeof *v);
603
604           /* Add variable to chain. */
605           if (agr->agr_vars != NULL)
606             tail->next = v;
607           else
608             agr->agr_vars = v;
609           tail = v;
610           tail->next = NULL;
611           v->moments = NULL;
612
613           /* Create the target variable in the aggregate
614              dictionary. */
615           {
616             struct variable *destvar;
617
618             v->function = func_index;
619
620             if (src)
621               {
622                 v->src = src[i];
623
624                 if (var_is_alpha (src[i]))
625                   {
626                     v->function |= FSTRING;
627                     v->string = xmalloc (var_get_width (src[i]));
628                   }
629
630                 if (function->alpha_type == VAL_STRING)
631                   destvar = dict_clone_var_as (agr->dict, v->src, dest[i]);
632                 else
633                   {
634                     assert (var_is_numeric (v->src)
635                             || function->alpha_type == VAL_NUMERIC);
636                     destvar = dict_create_var (agr->dict, dest[i], 0);
637                     if (destvar != NULL)
638                       {
639                         struct fmt_spec f;
640                         if ((func_index == N || func_index == NMISS)
641                             && dict_get_weight (dict) != NULL)
642                           f = fmt_for_output (FMT_F, 8, 2);
643                         else
644                           f = function->format;
645                         var_set_both_formats (destvar, &f);
646                       }
647                   }
648               } else {
649                 struct fmt_spec f;
650                 v->src = NULL;
651                 destvar = dict_create_var (agr->dict, dest[i], 0);
652                 if (func_index == N_NO_VARS && dict_get_weight (dict) != NULL)
653                   f = fmt_for_output (FMT_F, 8, 2);
654                 else
655                   f = function->format;
656                 var_set_both_formats (destvar, &f);
657               }
658
659             if (!destvar)
660               {
661                 msg (SE, _("Variable name %s is not unique within the "
662                            "aggregate file dictionary, which contains "
663                            "the aggregate variables and the break "
664                            "variables."),
665                      dest[i]);
666                 goto error;
667               }
668
669             free (dest[i]);
670             if (dest_label[i])
671               var_set_label (destvar, dest_label[i]);
672
673             v->dest = destvar;
674           }
675
676           v->exclude = exclude;
677
678           if (v->src != NULL)
679             {
680               int j;
681
682               if (var_is_numeric (v->src))
683                 for (j = 0; j < function->n_args; j++)
684                   v->arg[j].f = arg[j].f;
685               else
686                 for (j = 0; j < function->n_args; j++)
687                   v->arg[j].c = xstrdup (arg[j].c);
688             }
689         }
690
691       if (src != NULL && var_is_alpha (src[0]))
692         for (i = 0; i < function->n_args; i++)
693           {
694             free (arg[i].c);
695             arg[i].c = NULL;
696           }
697
698       free (src);
699       free (dest);
700       free (dest_label);
701
702       if (!lex_match (lexer, '/'))
703         {
704           if (lex_token (lexer) == '.')
705             return true;
706
707           lex_error (lexer, "expecting end of command");
708           return false;
709         }
710       continue;
711
712     error:
713       ds_destroy (&function_name);
714       for (i = 0; i < n_dest; i++)
715         {
716           free (dest[i]);
717           free (dest_label[i]);
718         }
719       free (dest);
720       free (dest_label);
721       free (arg[0].c);
722       free (arg[1].c);
723       if (src && n_src && var_is_alpha (src[0]))
724         for (i = 0; i < function->n_args; i++)
725           {
726             free (arg[i].c);
727             arg[i].c = NULL;
728           }
729       free (src);
730
731       return false;
732     }
733 }
734
735 /* Destroys AGR. */
736 static void
737 agr_destroy (struct agr_proc *agr)
738 {
739   struct agr_var *iter, *next;
740
741   subcase_destroy (&agr->sort);
742   free (agr->break_vars);
743   for (iter = agr->agr_vars; iter; iter = next)
744     {
745       next = iter->next;
746
747       if (iter->function & FSTRING)
748         {
749           size_t n_args;
750           size_t i;
751
752           n_args = agr_func_tab[iter->function & FUNC].n_args;
753           for (i = 0; i < n_args; i++)
754             free (iter->arg[i].c);
755           free (iter->string);
756         }
757       else if (iter->function == SD)
758         moments1_destroy (iter->moments);
759
760       dict_destroy_internal_var (iter->subject);
761       dict_destroy_internal_var (iter->weight);
762
763       free (iter);
764     }
765   if (agr->dict != NULL)
766     dict_destroy (agr->dict);
767 }
768 \f
769 /* Execution. */
770
771 /* Accumulates aggregation data from the case INPUT. */
772 static void
773 accumulate_aggregate_info (struct agr_proc *agr, const struct ccase *input)
774 {
775   struct agr_var *iter;
776   double weight;
777   bool bad_warn = true;
778
779   weight = dict_get_case_weight (agr->src_dict, input, &bad_warn);
780
781   for (iter = agr->agr_vars; iter; iter = iter->next)
782     if (iter->src)
783       {
784         const union value *v = case_data (input, iter->src);
785         int src_width = var_get_width (iter->src);
786
787         if (var_is_value_missing (iter->src, v, iter->exclude))
788           {
789             switch (iter->function)
790               {
791               case NMISS:
792               case NMISS | FSTRING:
793                 iter->dbl[0] += weight;
794                 break;
795               case NUMISS:
796               case NUMISS | FSTRING:
797                 iter->int1++;
798                 break;
799               }
800             iter->saw_missing = true;
801             continue;
802           }
803
804         /* This is horrible.  There are too many possibilities. */
805         switch (iter->function)
806           {
807           case SUM:
808             iter->dbl[0] += v->f * weight;
809             iter->int1 = 1;
810             break;
811           case MEAN:
812             iter->dbl[0] += v->f * weight;
813             iter->dbl[1] += weight;
814             break;
815           case MEDIAN:
816             {
817               double wv ;
818               struct ccase *cout;
819
820               cout = case_create (casewriter_get_proto (iter->writer));
821
822               case_data_rw (cout, iter->subject)->f
823                 = case_data (input, iter->src)->f;
824
825               wv = dict_get_case_weight (agr->src_dict, input, NULL);
826
827               case_data_rw (cout, iter->weight)->f = wv;
828
829               iter->cc += wv;
830
831               casewriter_write (iter->writer, cout);
832             }
833             break;
834           case SD:
835             moments1_add (iter->moments, v->f, weight);
836             break;
837           case MAX:
838             iter->dbl[0] = MAX (iter->dbl[0], v->f);
839             iter->int1 = 1;
840             break;
841           case MAX | FSTRING:
842             if (memcmp (iter->string, value_str (v, src_width), src_width) < 0)
843               memcpy (iter->string, value_str (v, src_width), src_width);
844             iter->int1 = 1;
845             break;
846           case MIN:
847             iter->dbl[0] = MIN (iter->dbl[0], v->f);
848             iter->int1 = 1;
849             break;
850           case MIN | FSTRING:
851             if (memcmp (iter->string, value_str (v, src_width), src_width) > 0)
852               memcpy (iter->string, value_str (v, src_width), src_width);
853             iter->int1 = 1;
854             break;
855           case FGT:
856           case PGT:
857             if (v->f > iter->arg[0].f)
858               iter->dbl[0] += weight;
859             iter->dbl[1] += weight;
860             break;
861           case FGT | FSTRING:
862           case PGT | FSTRING:
863             if (memcmp (iter->arg[0].c,
864                         value_str (v, src_width), src_width) < 0)
865               iter->dbl[0] += weight;
866             iter->dbl[1] += weight;
867             break;
868           case FLT:
869           case PLT:
870             if (v->f < iter->arg[0].f)
871               iter->dbl[0] += weight;
872             iter->dbl[1] += weight;
873             break;
874           case FLT | FSTRING:
875           case PLT | FSTRING:
876             if (memcmp (iter->arg[0].c,
877                         value_str (v, src_width), src_width) > 0)
878               iter->dbl[0] += weight;
879             iter->dbl[1] += weight;
880             break;
881           case FIN:
882           case PIN:
883             if (iter->arg[0].f <= v->f && v->f <= iter->arg[1].f)
884               iter->dbl[0] += weight;
885             iter->dbl[1] += weight;
886             break;
887           case FIN | FSTRING:
888           case PIN | FSTRING:
889             if (memcmp (iter->arg[0].c,
890                         value_str (v, src_width), src_width) <= 0
891                 && memcmp (iter->arg[1].c,
892                            value_str (v, src_width), src_width) >= 0)
893               iter->dbl[0] += weight;
894             iter->dbl[1] += weight;
895             break;
896           case FOUT:
897           case POUT:
898             if (iter->arg[0].f > v->f || v->f > iter->arg[1].f)
899               iter->dbl[0] += weight;
900             iter->dbl[1] += weight;
901             break;
902           case FOUT | FSTRING:
903           case POUT | FSTRING:
904             if (memcmp (iter->arg[0].c,
905                         value_str (v, src_width), src_width) > 0
906                 || memcmp (iter->arg[1].c,
907                            value_str (v, src_width), src_width) < 0)
908               iter->dbl[0] += weight;
909             iter->dbl[1] += weight;
910             break;
911           case N:
912           case N | FSTRING:
913             iter->dbl[0] += weight;
914             break;
915           case NU:
916           case NU | FSTRING:
917             iter->int1++;
918             break;
919           case FIRST:
920             if (iter->int1 == 0)
921               {
922                 iter->dbl[0] = v->f;
923                 iter->int1 = 1;
924               }
925             break;
926           case FIRST | FSTRING:
927             if (iter->int1 == 0)
928               {
929                 memcpy (iter->string, value_str (v, src_width), src_width);
930                 iter->int1 = 1;
931               }
932             break;
933           case LAST:
934             iter->dbl[0] = v->f;
935             iter->int1 = 1;
936             break;
937           case LAST | FSTRING:
938             memcpy (iter->string, value_str (v, src_width), src_width);
939             iter->int1 = 1;
940             break;
941           case NMISS:
942           case NMISS | FSTRING:
943           case NUMISS:
944           case NUMISS | FSTRING:
945             /* Our value is not missing or it would have been
946                caught earlier.  Nothing to do. */
947             break;
948           default:
949             NOT_REACHED ();
950           }
951     } else {
952       switch (iter->function)
953         {
954         case N_NO_VARS:
955           iter->dbl[0] += weight;
956           break;
957         case NU_NO_VARS:
958           iter->int1++;
959           break;
960         default:
961           NOT_REACHED ();
962         }
963     }
964 }
965
966 /* Writes an aggregated record to OUTPUT. */
967 static void
968 dump_aggregate_info (const struct agr_proc *agr, struct casewriter *output, const struct ccase *break_case)
969 {
970   struct ccase *c = case_create (dict_get_proto (agr->dict));
971
972   if ( agr->add_variables)
973     {
974       case_copy (c, 0, break_case, 0, dict_get_var_cnt (agr->src_dict));
975     }
976   else
977     {
978       int value_idx = 0;
979       int i;
980
981       for (i = 0; i < agr->break_var_cnt; i++)
982         {
983           const struct variable *v = agr->break_vars[i];
984           value_copy (case_data_rw_idx (c, value_idx),
985                       case_data (break_case, v),
986                       var_get_width (v));
987           value_idx++;
988         }
989     }
990
991   {
992     struct agr_var *i;
993
994     for (i = agr->agr_vars; i; i = i->next)
995       {
996         union value *v = case_data_rw (c, i->dest);
997         int width = var_get_width (i->dest);
998
999         if (agr->missing == COLUMNWISE && i->saw_missing
1000             && (i->function & FUNC) != N && (i->function & FUNC) != NU
1001             && (i->function & FUNC) != NMISS && (i->function & FUNC) != NUMISS)
1002           {
1003             value_set_missing (v, width);
1004             casewriter_destroy (i->writer);
1005             continue;
1006           }
1007
1008         switch (i->function)
1009           {
1010           case SUM:
1011             v->f = i->int1 ? i->dbl[0] : SYSMIS;
1012             break;
1013           case MEAN:
1014             v->f = i->dbl[1] != 0.0 ? i->dbl[0] / i->dbl[1] : SYSMIS;
1015             break;
1016           case MEDIAN:
1017             {
1018               struct casereader *sorted_reader;
1019               struct percentile *median = percentile_create (0.5, i->cc);
1020               struct order_stats *os = &median->parent;
1021
1022               sorted_reader = casewriter_make_reader (i->writer);
1023
1024               order_stats_accumulate (&os, 1,
1025                                       sorted_reader,
1026                                       i->weight,
1027                                       i->subject,
1028                                       i->exclude);
1029
1030               v->f = percentile_calculate (median, PC_HAVERAGE);
1031
1032               statistic_destroy (&median->parent.parent);
1033             }
1034             break;
1035           case SD:
1036             {
1037               double variance;
1038
1039               /* FIXME: we should use two passes. */
1040               moments1_calculate (i->moments, NULL, NULL, &variance,
1041                                  NULL, NULL);
1042               if (variance != SYSMIS)
1043                 v->f = sqrt (variance);
1044               else
1045                 v->f = SYSMIS;
1046             }
1047             break;
1048           case MAX:
1049           case MIN:
1050             v->f = i->int1 ? i->dbl[0] : SYSMIS;
1051             break;
1052           case MAX | FSTRING:
1053           case MIN | FSTRING:
1054             if (i->int1)
1055               memcpy (value_str_rw (v, width), i->string, width);
1056             else
1057               value_set_missing (v, width);
1058             break;
1059           case FGT:
1060           case FGT | FSTRING:
1061           case FLT:
1062           case FLT | FSTRING:
1063           case FIN:
1064           case FIN | FSTRING:
1065           case FOUT:
1066           case FOUT | FSTRING:
1067             v->f = i->dbl[1] ? i->dbl[0] / i->dbl[1] : SYSMIS;
1068             break;
1069           case PGT:
1070           case PGT | FSTRING:
1071           case PLT:
1072           case PLT | FSTRING:
1073           case PIN:
1074           case PIN | FSTRING:
1075           case POUT:
1076           case POUT | FSTRING:
1077             v->f = i->dbl[1] ? i->dbl[0] / i->dbl[1] * 100.0 : SYSMIS;
1078             break;
1079           case N:
1080           case N | FSTRING:
1081             v->f = i->dbl[0];
1082             break;
1083           case NU:
1084           case NU | FSTRING:
1085             v->f = i->int1;
1086             break;
1087           case FIRST:
1088           case LAST:
1089             v->f = i->int1 ? i->dbl[0] : SYSMIS;
1090             break;
1091           case FIRST | FSTRING:
1092           case LAST | FSTRING:
1093             if (i->int1)
1094               memcpy (value_str_rw (v, width), i->string, width);
1095             else
1096               value_set_missing (v, width);
1097             break;
1098           case N_NO_VARS:
1099             v->f = i->dbl[0];
1100             break;
1101           case NU_NO_VARS:
1102             v->f = i->int1;
1103             break;
1104           case NMISS:
1105           case NMISS | FSTRING:
1106             v->f = i->dbl[0];
1107             break;
1108           case NUMISS:
1109           case NUMISS | FSTRING:
1110             v->f = i->int1;
1111             break;
1112           default:
1113             NOT_REACHED ();
1114           }
1115       }
1116   }
1117
1118   casewriter_write (output, c);
1119 }
1120
1121 /* Resets the state for all the aggregate functions. */
1122 static void
1123 initialize_aggregate_info (struct agr_proc *agr)
1124 {
1125   struct agr_var *iter;
1126
1127   for (iter = agr->agr_vars; iter; iter = iter->next)
1128     {
1129       iter->saw_missing = false;
1130       iter->dbl[0] = iter->dbl[1] = iter->dbl[2] = 0.0;
1131       iter->int1 = iter->int2 = 0;
1132       switch (iter->function)
1133         {
1134         case MIN:
1135           iter->dbl[0] = DBL_MAX;
1136           break;
1137         case MIN | FSTRING:
1138           memset (iter->string, 255, var_get_width (iter->src));
1139           break;
1140         case MAX:
1141           iter->dbl[0] = -DBL_MAX;
1142           break;
1143         case MAX | FSTRING:
1144           memset (iter->string, 0, var_get_width (iter->src));
1145           break;
1146         case MEDIAN:
1147           {
1148             struct caseproto *proto;
1149             struct subcase ordering;
1150
1151             proto = caseproto_create ();
1152             proto = caseproto_add_width (proto, 0);
1153             proto = caseproto_add_width (proto, 0);
1154
1155             if ( ! iter->subject)
1156               iter->subject = dict_create_internal_var (0, 0);
1157
1158             if ( ! iter->weight)
1159               iter->weight = dict_create_internal_var (1, 0);
1160
1161             subcase_init_var (&ordering, iter->subject, SC_ASCEND);
1162             iter->writer = sort_create_writer (&ordering, proto);
1163             subcase_destroy (&ordering);
1164             caseproto_unref (proto);
1165
1166             iter->cc = 0;
1167           }
1168           break;
1169         case SD:
1170           if (iter->moments == NULL)
1171             iter->moments = moments1_create (MOMENT_VARIANCE);
1172           else
1173             moments1_clear (iter->moments);
1174           break;
1175         default:
1176           break;
1177         }
1178     }
1179 }