02257f9434f29555bcb2538f5b321a961d099788
[pspp-builds.git] / src / language / stats / aggregate.c
1 /* PSPP - a program for statistical analysis.
2    Copyright (C) 1997-9, 2000, 2006, 2008 Free Software Foundation, Inc.
3
4    This program is free software: you can redistribute it and/or modify
5    it under the terms of the GNU General Public License as published by
6    the Free Software Foundation, either version 3 of the License, or
7    (at your option) any later version.
8
9    This program is distributed in the hope that it will be useful,
10    but WITHOUT ANY WARRANTY; without even the implied warranty of
11    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12    GNU General Public License for more details.
13
14    You should have received a copy of the GNU General Public License
15    along with this program.  If not, see <http://www.gnu.org/licenses/>. */
16
17 #include <config.h>
18
19 #include <stdlib.h>
20
21 #include <data/any-writer.h>
22 #include <data/case.h>
23 #include <data/casegrouper.h>
24 #include <data/casereader.h>
25 #include <data/casewriter.h>
26 #include <data/dictionary.h>
27 #include <data/file-handle-def.h>
28 #include <data/format.h>
29 #include <data/procedure.h>
30 #include <data/settings.h>
31 #include <data/subcase.h>
32 #include <data/sys-file-writer.h>
33 #include <data/variable.h>
34 #include <language/command.h>
35 #include <language/data-io/file-handle.h>
36 #include <language/lexer/lexer.h>
37 #include <language/lexer/variable-parser.h>
38 #include <language/stats/sort-criteria.h>
39 #include <libpspp/assertion.h>
40 #include <libpspp/message.h>
41 #include <libpspp/misc.h>
42 #include <libpspp/pool.h>
43 #include <libpspp/str.h>
44 #include <math/moments.h>
45 #include <math/sort.h>
46 #include <math/statistic.h>
47 #include <math/percentiles.h>
48
49 #include "minmax.h"
50 #include "xalloc.h"
51
52 #include "gettext.h"
53 #define _(msgid) gettext (msgid)
54
55 /* Argument for AGGREGATE function. */
56 union agr_argument
57   {
58     double f;                           /* Numeric. */
59     char *c;                            /* Short or long string. */
60   };
61
62 /* Specifies how to make an aggregate variable. */
63 struct agr_var
64   {
65     struct agr_var *next;               /* Next in list. */
66
67     /* Collected during parsing. */
68     const struct variable *src; /* Source variable. */
69     struct variable *dest;      /* Target variable. */
70     int function;               /* Function. */
71     enum mv_class exclude;      /* Classes of missing values to exclude. */
72     union agr_argument arg[2];  /* Arguments. */
73
74     /* Accumulated during AGGREGATE execution. */
75     double dbl[3];
76     int int1, int2;
77     char *string;
78     bool saw_missing;
79     struct moments1 *moments;
80     double cc;
81
82     struct variable *subject;
83     struct variable *weight;
84     struct casewriter *writer;
85   };
86
87 /* Aggregation functions. */
88 enum
89   {
90     NONE, SUM, MEAN, MEDIAN, SD, MAX, MIN, PGT, PLT, PIN, POUT, FGT, FLT, FIN,
91     FOUT, N, NU, NMISS, NUMISS, FIRST, LAST,
92     N_AGR_FUNCS, N_NO_VARS, NU_NO_VARS,
93     FUNC = 0x1f, /* Function mask. */
94     FSTRING = 1<<5, /* String function bit. */
95   };
96
97 /* Attributes of an aggregation function. */
98 struct agr_func
99   {
100     const char *name;           /* Aggregation function name. */
101     size_t n_args;              /* Number of arguments. */
102     enum val_type alpha_type;   /* When given ALPHA arguments, output type. */
103     struct fmt_spec format;     /* Format spec if alpha_type != ALPHA. */
104   };
105
106 /* Attributes of aggregation functions. */
107 static const struct agr_func agr_func_tab[] =
108   {
109     {"<NONE>",  0, -1,          {0, 0, 0}},
110     {"SUM",     0, -1,          {FMT_F, 8, 2}},
111     {"MEAN",    0, -1,          {FMT_F, 8, 2}},
112     {"MEDIAN",  0, -1,          {FMT_F, 8, 2}},
113     {"SD",      0, -1,          {FMT_F, 8, 2}},
114     {"MAX",     0, VAL_STRING,  {-1, -1, -1}},
115     {"MIN",     0, VAL_STRING,  {-1, -1, -1}},
116     {"PGT",     1, VAL_NUMERIC, {FMT_F, 5, 1}},
117     {"PLT",     1, VAL_NUMERIC, {FMT_F, 5, 1}},
118     {"PIN",     2, VAL_NUMERIC, {FMT_F, 5, 1}},
119     {"POUT",    2, VAL_NUMERIC, {FMT_F, 5, 1}},
120     {"FGT",     1, VAL_NUMERIC, {FMT_F, 5, 3}},
121     {"FLT",     1, VAL_NUMERIC, {FMT_F, 5, 3}},
122     {"FIN",     2, VAL_NUMERIC, {FMT_F, 5, 3}},
123     {"FOUT",    2, VAL_NUMERIC, {FMT_F, 5, 3}},
124     {"N",       0, VAL_NUMERIC, {FMT_F, 7, 0}},
125     {"NU",      0, VAL_NUMERIC, {FMT_F, 7, 0}},
126     {"NMISS",   0, VAL_NUMERIC, {FMT_F, 7, 0}},
127     {"NUMISS",  0, VAL_NUMERIC, {FMT_F, 7, 0}},
128     {"FIRST",   0, VAL_STRING,  {-1, -1, -1}},
129     {"LAST",    0, VAL_STRING,  {-1, -1, -1}},
130     {NULL,      0, -1,          {-1, -1, -1}},
131     {"N",       0, VAL_NUMERIC, {FMT_F, 7, 0}},
132     {"NU",      0, VAL_NUMERIC, {FMT_F, 7, 0}},
133   };
134
135 /* Missing value types. */
136 enum missing_treatment
137   {
138     ITEMWISE,           /* Missing values item by item. */
139     COLUMNWISE          /* Missing values column by column. */
140   };
141
142 /* An entire AGGREGATE procedure. */
143 struct agr_proc
144   {
145     /* Break variables. */
146     struct subcase sort;                /* Sort criteria (break variables). */
147     const struct variable **break_vars;       /* Break variables. */
148     size_t break_var_cnt;               /* Number of break variables. */
149     struct ccase break_case;            /* Last values of break variables. */
150
151     enum missing_treatment missing;     /* How to treat missing values. */
152     struct agr_var *agr_vars;           /* First aggregate variable. */
153     struct dictionary *dict;            /* Aggregate dictionary. */
154     const struct dictionary *src_dict;  /* Dict of the source */
155     int case_cnt;                       /* Counts aggregated cases. */
156   };
157
158 static void initialize_aggregate_info (struct agr_proc *,
159                                        const struct ccase *);
160
161 static void accumulate_aggregate_info (struct agr_proc *,
162                                        const struct ccase *);
163 /* Prototypes. */
164 static bool parse_aggregate_functions (struct lexer *, const struct dictionary *,
165                                        struct agr_proc *);
166 static void agr_destroy (struct agr_proc *);
167 static void dump_aggregate_info (struct agr_proc *agr,
168                                  struct casewriter *output);
169 \f
170 /* Parsing. */
171
172 /* Parses and executes the AGGREGATE procedure. */
173 int
174 cmd_aggregate (struct lexer *lexer, struct dataset *ds)
175 {
176   struct dictionary *dict = dataset_dict (ds);
177   struct agr_proc agr;
178   struct file_handle *out_file = NULL;
179   struct casereader *input = NULL, *group;
180   struct casegrouper *grouper;
181   struct casewriter *output = NULL;
182
183   bool copy_documents = false;
184   bool presorted = false;
185   bool saw_direction;
186   bool ok;
187
188   memset(&agr, 0 , sizeof (agr));
189   agr.missing = ITEMWISE;
190   case_nullify (&agr.break_case);
191
192   agr.dict = dict_create ();
193   agr.src_dict = dict;
194   subcase_init_empty (&agr.sort);
195   dict_set_label (agr.dict, dict_get_label (dict));
196   dict_set_documents (agr.dict, dict_get_documents (dict));
197
198   /* OUTFILE subcommand must be first. */
199   if (!lex_force_match_id (lexer, "OUTFILE"))
200     goto error;
201   lex_match (lexer, '=');
202   if (!lex_match (lexer, '*'))
203     {
204       out_file = fh_parse (lexer, FH_REF_FILE | FH_REF_SCRATCH);
205       if (out_file == NULL)
206         goto error;
207     }
208
209   /* Read most of the subcommands. */
210   for (;;)
211     {
212       lex_match (lexer, '/');
213
214       if (lex_match_id (lexer, "MISSING"))
215         {
216           lex_match (lexer, '=');
217           if (!lex_match_id (lexer, "COLUMNWISE"))
218             {
219               lex_error (lexer, _("while expecting COLUMNWISE"));
220               goto error;
221             }
222           agr.missing = COLUMNWISE;
223         }
224       else if (lex_match_id (lexer, "DOCUMENT"))
225         copy_documents = true;
226       else if (lex_match_id (lexer, "PRESORTED"))
227         presorted = true;
228       else if (lex_match_id (lexer, "BREAK"))
229         {
230           int i;
231
232           lex_match (lexer, '=');
233           if (!parse_sort_criteria (lexer, dict, &agr.sort, &agr.break_vars,
234                                     &saw_direction))
235             goto error;
236           agr.break_var_cnt = subcase_get_n_fields (&agr.sort);
237
238           for (i = 0; i < agr.break_var_cnt; i++)
239             dict_clone_var_assert (agr.dict, agr.break_vars[i],
240                                    var_get_name (agr.break_vars[i]));
241
242           /* BREAK must follow the options. */
243           break;
244         }
245       else
246         {
247           lex_error (lexer, _("expecting BREAK"));
248           goto error;
249         }
250     }
251   if (presorted && saw_direction)
252     msg (SW, _("When PRESORTED is specified, specifying sorting directions "
253                "with (A) or (D) has no effect.  Output data will be sorted "
254                "the same way as the input data."));
255
256   /* Read in the aggregate functions. */
257   lex_match (lexer, '/');
258   if (!parse_aggregate_functions (lexer, dict, &agr))
259     goto error;
260
261   /* Delete documents. */
262   if (!copy_documents)
263     dict_clear_documents (agr.dict);
264
265   /* Cancel SPLIT FILE. */
266   dict_set_split_vars (agr.dict, NULL, 0);
267
268   /* Initialize. */
269   agr.case_cnt = 0;
270
271   if (out_file == NULL)
272     {
273       /* The active file will be replaced by the aggregated data,
274          so TEMPORARY is moot. */
275       proc_cancel_temporary_transformations (ds);
276       proc_discard_output (ds);
277       output = autopaging_writer_create (dict_get_next_value_idx (agr.dict));
278     }
279   else
280     {
281       output = any_writer_open (out_file, agr.dict);
282       if (output == NULL)
283         goto error;
284     }
285
286   input = proc_open (ds);
287   if (!subcase_is_empty (&agr.sort) && !presorted)
288     {
289       input = sort_execute (input, &agr.sort);
290       subcase_clear (&agr.sort);
291     }
292
293   for (grouper = casegrouper_create_vars (input, agr.break_vars,
294                                           agr.break_var_cnt);
295        casegrouper_get_next_group (grouper, &group);
296        casereader_destroy (group))
297     {
298       struct ccase c;
299
300       if (!casereader_peek (group, 0, &c))
301         {
302           casereader_destroy (group);
303           continue;
304         }
305       initialize_aggregate_info (&agr, &c);
306       case_destroy (&c);
307
308       for (; casereader_read (group, &c); case_destroy (&c))
309         accumulate_aggregate_info (&agr, &c);
310       dump_aggregate_info (&agr, output);
311     }
312   if (!casegrouper_destroy (grouper))
313     goto error;
314
315   if (!proc_commit (ds))
316     {
317       input = NULL;
318       goto error;
319     }
320   input = NULL;
321
322   if (out_file == NULL)
323     {
324       struct casereader *next_input = casewriter_make_reader (output);
325       if (next_input == NULL)
326         goto error;
327
328       proc_set_active_file (ds, next_input, agr.dict);
329       agr.dict = NULL;
330     }
331   else
332     {
333       ok = casewriter_destroy (output);
334       output = NULL;
335       if (!ok)
336         goto error;
337     }
338
339   agr_destroy (&agr);
340   fh_unref (out_file);
341   return CMD_SUCCESS;
342
343 error:
344   if (input != NULL)
345     proc_commit (ds);
346   casewriter_destroy (output);
347   agr_destroy (&agr);
348   fh_unref (out_file);
349   return CMD_CASCADING_FAILURE;
350 }
351
352 /* Parse all the aggregate functions. */
353 static bool
354 parse_aggregate_functions (struct lexer *lexer, const struct dictionary *dict,
355                            struct agr_proc *agr)
356 {
357   struct agr_var *tail; /* Tail of linked list starting at agr->vars. */
358
359   /* Parse everything. */
360   tail = NULL;
361   for (;;)
362     {
363       char **dest;
364       char **dest_label;
365       size_t n_dest;
366       struct string function_name;
367
368       enum mv_class exclude;
369       const struct agr_func *function;
370       int func_index;
371
372       union agr_argument arg[2];
373
374       const struct variable **src;
375       size_t n_src;
376
377       size_t i;
378
379       dest = NULL;
380       dest_label = NULL;
381       n_dest = 0;
382       src = NULL;
383       function = NULL;
384       n_src = 0;
385       arg[0].c = NULL;
386       arg[1].c = NULL;
387       ds_init_empty (&function_name);
388
389       /* Parse the list of target variables. */
390       while (!lex_match (lexer, '='))
391         {
392           size_t n_dest_prev = n_dest;
393
394           if (!parse_DATA_LIST_vars (lexer, &dest, &n_dest,
395                                      PV_APPEND | PV_SINGLE | PV_NO_SCRATCH))
396             goto error;
397
398           /* Assign empty labels. */
399           {
400             int j;
401
402             dest_label = xnrealloc (dest_label, n_dest, sizeof *dest_label);
403             for (j = n_dest_prev; j < n_dest; j++)
404               dest_label[j] = NULL;
405           }
406
407
408
409           if (lex_token (lexer) == T_STRING)
410             {
411               struct string label;
412               ds_init_string (&label, lex_tokstr (lexer));
413
414               ds_truncate (&label, 255);
415               dest_label[n_dest - 1] = ds_xstrdup (&label);
416               lex_get (lexer);
417               ds_destroy (&label);
418             }
419         }
420
421       /* Get the name of the aggregation function. */
422       if (lex_token (lexer) != T_ID)
423         {
424           lex_error (lexer, _("expecting aggregation function"));
425           goto error;
426         }
427
428       exclude = MV_ANY;
429
430       ds_assign_string (&function_name, lex_tokstr (lexer));
431
432       ds_chomp (&function_name, '.');
433
434       if (lex_tokid(lexer)[strlen (lex_tokid (lexer)) - 1] == '.')
435         exclude = MV_SYSTEM;
436
437       for (function = agr_func_tab; function->name; function++)
438         if (!strcasecmp (function->name, ds_cstr (&function_name)))
439           break;
440       if (NULL == function->name)
441         {
442           msg (SE, _("Unknown aggregation function %s."),
443                ds_cstr (&function_name));
444           goto error;
445         }
446       ds_destroy (&function_name);
447       func_index = function - agr_func_tab;
448       lex_get (lexer);
449
450       /* Check for leading lparen. */
451       if (!lex_match (lexer, '('))
452         {
453           if (func_index == N)
454             func_index = N_NO_VARS;
455           else if (func_index == NU)
456             func_index = NU_NO_VARS;
457           else
458             {
459               lex_error (lexer, _("expecting `('"));
460               goto error;
461             }
462         }
463       else
464         {
465           /* Parse list of source variables. */
466           {
467             int pv_opts = PV_NO_SCRATCH;
468
469             if (func_index == SUM || func_index == MEAN || func_index == SD)
470               pv_opts |= PV_NUMERIC;
471             else if (function->n_args)
472               pv_opts |= PV_SAME_TYPE;
473
474             if (!parse_variables_const (lexer, dict, &src, &n_src, pv_opts))
475               goto error;
476           }
477
478           /* Parse function arguments, for those functions that
479              require arguments. */
480           if (function->n_args != 0)
481             for (i = 0; i < function->n_args; i++)
482               {
483                 int type;
484
485                 lex_match (lexer, ',');
486                 if (lex_token (lexer) == T_STRING)
487                   {
488                     arg[i].c = ds_xstrdup (lex_tokstr (lexer));
489                     type = VAL_STRING;
490                   }
491                 else if (lex_is_number (lexer))
492                   {
493                     arg[i].f = lex_tokval (lexer);
494                     type = VAL_NUMERIC;
495                   }
496                 else
497                   {
498                     msg (SE, _("Missing argument %zu to %s."),
499                          i + 1, function->name);
500                     goto error;
501                   }
502
503                 lex_get (lexer);
504
505                 if (type != var_get_type (src[0]))
506                   {
507                     msg (SE, _("Arguments to %s must be of same type as "
508                                "source variables."),
509                          function->name);
510                     goto error;
511                   }
512               }
513
514           /* Trailing rparen. */
515           if (!lex_match (lexer, ')'))
516             {
517               lex_error (lexer, _("expecting `)'"));
518               goto error;
519             }
520
521           /* Now check that the number of source variables match
522              the number of target variables.  If we check earlier
523              than this, the user can get very misleading error
524              message, i.e. `AGGREGATE x=SUM(y t).' will get this
525              error message when a proper message would be more
526              like `unknown variable t'. */
527           if (n_src != n_dest)
528             {
529               msg (SE, _("Number of source variables (%zu) does not match "
530                          "number of target variables (%zu)."),
531                     n_src, n_dest);
532               goto error;
533             }
534
535           if ((func_index == PIN || func_index == POUT
536               || func_index == FIN || func_index == FOUT)
537               && (var_is_numeric (src[0])
538                   ? arg[0].f > arg[1].f
539                   : str_compare_rpad (arg[0].c, arg[1].c) > 0))
540             {
541               union agr_argument t = arg[0];
542               arg[0] = arg[1];
543               arg[1] = t;
544
545               msg (SW, _("The value arguments passed to the %s function "
546                          "are out-of-order.  They will be treated as if "
547                          "they had been specified in the correct order."),
548                    function->name);
549             }
550         }
551
552       /* Finally add these to the linked list of aggregation
553          variables. */
554       for (i = 0; i < n_dest; i++)
555         {
556           struct agr_var *v = xzalloc (sizeof *v);
557
558           /* Add variable to chain. */
559           if (agr->agr_vars != NULL)
560             tail->next = v;
561           else
562             agr->agr_vars = v;
563           tail = v;
564           tail->next = NULL;
565           v->moments = NULL;
566
567           /* Create the target variable in the aggregate
568              dictionary. */
569           {
570             struct variable *destvar;
571
572             v->function = func_index;
573
574             if (src)
575               {
576                 v->src = src[i];
577
578                 if (var_is_alpha (src[i]))
579                   {
580                     v->function |= FSTRING;
581                     v->string = xmalloc (var_get_width (src[i]));
582                   }
583
584                 if (function->alpha_type == VAL_STRING)
585                   destvar = dict_clone_var (agr->dict, v->src, dest[i]);
586                 else
587                   {
588                     assert (var_is_numeric (v->src)
589                             || function->alpha_type == VAL_NUMERIC);
590                     destvar = dict_create_var (agr->dict, dest[i], 0);
591                     if (destvar != NULL)
592                       {
593                         struct fmt_spec f;
594                         if ((func_index == N || func_index == NMISS)
595                             && dict_get_weight (dict) != NULL)
596                           f = fmt_for_output (FMT_F, 8, 2);
597                         else
598                           f = function->format;
599                         var_set_both_formats (destvar, &f);
600                       }
601                   }
602               } else {
603                 struct fmt_spec f;
604                 v->src = NULL;
605                 destvar = dict_create_var (agr->dict, dest[i], 0);
606                 if (func_index == N_NO_VARS && dict_get_weight (dict) != NULL)
607                   f = fmt_for_output (FMT_F, 8, 2);
608                 else
609                   f = function->format;
610                 var_set_both_formats (destvar, &f);
611               }
612
613             if (!destvar)
614               {
615                 msg (SE, _("Variable name %s is not unique within the "
616                            "aggregate file dictionary, which contains "
617                            "the aggregate variables and the break "
618                            "variables."),
619                      dest[i]);
620                 goto error;
621               }
622
623             free (dest[i]);
624             if (dest_label[i])
625               var_set_label (destvar, dest_label[i]);
626
627             v->dest = destvar;
628           }
629
630           v->exclude = exclude;
631
632           if (v->src != NULL)
633             {
634               int j;
635
636               if (var_is_numeric (v->src))
637                 for (j = 0; j < function->n_args; j++)
638                   v->arg[j].f = arg[j].f;
639               else
640                 for (j = 0; j < function->n_args; j++)
641                   v->arg[j].c = xstrdup (arg[j].c);
642             }
643         }
644
645       if (src != NULL && var_is_alpha (src[0]))
646         for (i = 0; i < function->n_args; i++)
647           {
648             free (arg[i].c);
649             arg[i].c = NULL;
650           }
651
652       free (src);
653       free (dest);
654       free (dest_label);
655
656       if (!lex_match (lexer, '/'))
657         {
658           if (lex_token (lexer) == '.')
659             return true;
660
661           lex_error (lexer, "expecting end of command");
662           return false;
663         }
664       continue;
665
666     error:
667       ds_destroy (&function_name);
668       for (i = 0; i < n_dest; i++)
669         {
670           free (dest[i]);
671           free (dest_label[i]);
672         }
673       free (dest);
674       free (dest_label);
675       free (arg[0].c);
676       free (arg[1].c);
677       if (src && n_src && var_is_alpha (src[0]))
678         for (i = 0; i < function->n_args; i++)
679           {
680             free (arg[i].c);
681             arg[i].c = NULL;
682           }
683       free (src);
684
685       return false;
686     }
687 }
688
689 /* Destroys AGR. */
690 static void
691 agr_destroy (struct agr_proc *agr)
692 {
693   struct agr_var *iter, *next;
694
695   subcase_destroy (&agr->sort);
696   free (agr->break_vars);
697   case_destroy (&agr->break_case);
698   for (iter = agr->agr_vars; iter; iter = next)
699     {
700       next = iter->next;
701
702       if (iter->function & FSTRING)
703         {
704           size_t n_args;
705           size_t i;
706
707           n_args = agr_func_tab[iter->function & FUNC].n_args;
708           for (i = 0; i < n_args; i++)
709             free (iter->arg[i].c);
710           free (iter->string);
711         }
712       else if (iter->function == SD)
713         moments1_destroy (iter->moments);
714
715       var_destroy (iter->subject);
716       var_destroy (iter->weight);
717
718       free (iter);
719     }
720   if (agr->dict != NULL)
721     dict_destroy (agr->dict);
722 }
723 \f
724 /* Execution. */
725
726 /* Accumulates aggregation data from the case INPUT. */
727 static void
728 accumulate_aggregate_info (struct agr_proc *agr, const struct ccase *input)
729 {
730   struct agr_var *iter;
731   double weight;
732   bool bad_warn = true;
733
734   weight = dict_get_case_weight (agr->src_dict, input, &bad_warn);
735
736   for (iter = agr->agr_vars; iter; iter = iter->next)
737     if (iter->src)
738       {
739         const union value *v = case_data (input, iter->src);
740         int src_width = var_get_width (iter->src);
741
742         if (var_is_value_missing (iter->src, v, iter->exclude))
743           {
744             switch (iter->function)
745               {
746               case NMISS:
747               case NMISS | FSTRING:
748                 iter->dbl[0] += weight;
749                 break;
750               case NUMISS:
751               case NUMISS | FSTRING:
752                 iter->int1++;
753                 break;
754               }
755             iter->saw_missing = true;
756             continue;
757           }
758
759         /* This is horrible.  There are too many possibilities. */
760         switch (iter->function)
761           {
762           case SUM:
763             iter->dbl[0] += v->f * weight;
764             iter->int1 = 1;
765             break;
766           case MEAN:
767             iter->dbl[0] += v->f * weight;
768             iter->dbl[1] += weight;
769             break;
770           case MEDIAN:
771             {
772               double wv ;
773               struct ccase cout;
774               case_create (&cout, 2);
775
776               case_data_rw (&cout, iter->subject)->f =
777                 case_data (input, iter->src)->f;
778
779               wv = dict_get_case_weight (agr->src_dict, input, NULL);
780
781               case_data_rw (&cout, iter->weight)->f = wv;
782
783               iter->cc += wv;
784
785               casewriter_write (iter->writer, &cout);
786               case_destroy (&cout);
787             }
788             break;
789           case SD:
790             moments1_add (iter->moments, v->f, weight);
791             break;
792           case MAX:
793             iter->dbl[0] = MAX (iter->dbl[0], v->f);
794             iter->int1 = 1;
795             break;
796           case MAX | FSTRING:
797             if (memcmp (iter->string, v->s, src_width) < 0)
798               memcpy (iter->string, v->s, src_width);
799             iter->int1 = 1;
800             break;
801           case MIN:
802             iter->dbl[0] = MIN (iter->dbl[0], v->f);
803             iter->int1 = 1;
804             break;
805           case MIN | FSTRING:
806             if (memcmp (iter->string, v->s, src_width) > 0)
807               memcpy (iter->string, v->s, src_width);
808             iter->int1 = 1;
809             break;
810           case FGT:
811           case PGT:
812             if (v->f > iter->arg[0].f)
813               iter->dbl[0] += weight;
814             iter->dbl[1] += weight;
815             break;
816           case FGT | FSTRING:
817           case PGT | FSTRING:
818             if (memcmp (iter->arg[0].c, v->s, src_width) < 0)
819               iter->dbl[0] += weight;
820             iter->dbl[1] += weight;
821             break;
822           case FLT:
823           case PLT:
824             if (v->f < iter->arg[0].f)
825               iter->dbl[0] += weight;
826             iter->dbl[1] += weight;
827             break;
828           case FLT | FSTRING:
829           case PLT | FSTRING:
830             if (memcmp (iter->arg[0].c, v->s, src_width) > 0)
831               iter->dbl[0] += weight;
832             iter->dbl[1] += weight;
833             break;
834           case FIN:
835           case PIN:
836             if (iter->arg[0].f <= v->f && v->f <= iter->arg[1].f)
837               iter->dbl[0] += weight;
838             iter->dbl[1] += weight;
839             break;
840           case FIN | FSTRING:
841           case PIN | FSTRING:
842             if (memcmp (iter->arg[0].c, v->s, src_width) <= 0
843                 && memcmp (iter->arg[1].c, v->s, src_width) >= 0)
844               iter->dbl[0] += weight;
845             iter->dbl[1] += weight;
846             break;
847           case FOUT:
848           case POUT:
849             if (iter->arg[0].f > v->f || v->f > iter->arg[1].f)
850               iter->dbl[0] += weight;
851             iter->dbl[1] += weight;
852             break;
853           case FOUT | FSTRING:
854           case POUT | FSTRING:
855             if (memcmp (iter->arg[0].c, v->s, src_width) > 0
856                 || memcmp (iter->arg[1].c, v->s, src_width) < 0)
857               iter->dbl[0] += weight;
858             iter->dbl[1] += weight;
859             break;
860           case N:
861           case N | FSTRING:
862             iter->dbl[0] += weight;
863             break;
864           case NU:
865           case NU | FSTRING:
866             iter->int1++;
867             break;
868           case FIRST:
869             if (iter->int1 == 0)
870               {
871                 iter->dbl[0] = v->f;
872                 iter->int1 = 1;
873               }
874             break;
875           case FIRST | FSTRING:
876             if (iter->int1 == 0)
877               {
878                 memcpy (iter->string, v->s, src_width);
879                 iter->int1 = 1;
880               }
881             break;
882           case LAST:
883             iter->dbl[0] = v->f;
884             iter->int1 = 1;
885             break;
886           case LAST | FSTRING:
887             memcpy (iter->string, v->s, src_width);
888             iter->int1 = 1;
889             break;
890           case NMISS:
891           case NMISS | FSTRING:
892           case NUMISS:
893           case NUMISS | FSTRING:
894             /* Our value is not missing or it would have been
895                caught earlier.  Nothing to do. */
896             break;
897           default:
898             NOT_REACHED ();
899           }
900     } else {
901       switch (iter->function)
902         {
903         case N_NO_VARS:
904           iter->dbl[0] += weight;
905           break;
906         case NU_NO_VARS:
907           iter->int1++;
908           break;
909         default:
910           NOT_REACHED ();
911         }
912     }
913 }
914
915 /* Writes an aggregated record to OUTPUT. */
916 static void
917 dump_aggregate_info (struct agr_proc *agr, struct casewriter *output)
918 {
919   struct ccase c;
920
921   case_create (&c, dict_get_next_value_idx (agr->dict));
922
923   {
924     int value_idx = 0;
925     int i;
926
927     for (i = 0; i < agr->break_var_cnt; i++)
928       {
929         const struct variable *v = agr->break_vars[i];
930         size_t value_cnt = var_get_value_cnt (v);
931         memcpy (case_data_rw_idx (&c, value_idx),
932                 case_data (&agr->break_case, v),
933                 sizeof (union value) * value_cnt);
934         value_idx += value_cnt;
935       }
936   }
937
938   {
939     struct agr_var *i;
940
941     for (i = agr->agr_vars; i; i = i->next)
942       {
943         union value *v = case_data_rw (&c, i->dest);
944
945
946         if (agr->missing == COLUMNWISE && i->saw_missing
947             && (i->function & FUNC) != N && (i->function & FUNC) != NU
948             && (i->function & FUNC) != NMISS && (i->function & FUNC) != NUMISS)
949           {
950             if (var_is_alpha (i->dest))
951               memset (v->s, ' ', var_get_width (i->dest));
952             else
953               v->f = SYSMIS;
954
955             casewriter_destroy (i->writer);
956
957             continue;
958           }
959
960         switch (i->function)
961           {
962           case SUM:
963             v->f = i->int1 ? i->dbl[0] : SYSMIS;
964             break;
965           case MEAN:
966             v->f = i->dbl[1] != 0.0 ? i->dbl[0] / i->dbl[1] : SYSMIS;
967             break;
968           case MEDIAN:
969             {
970               struct casereader *sorted_reader;
971               struct order_stats *median = percentile_create (0.5, i->cc);
972
973               sorted_reader = casewriter_make_reader (i->writer);
974
975               order_stats_accumulate (&median, 1,
976                                       sorted_reader,
977                                       i->weight,
978                                       i->subject,
979                                       i->exclude);
980
981               v->f = percentile_calculate ((struct percentile *) median,
982                                            PC_HAVERAGE);
983
984               statistic_destroy ((struct statistic *) median);
985             }
986             break;
987           case SD:
988             {
989               double variance;
990
991               /* FIXME: we should use two passes. */
992               moments1_calculate (i->moments, NULL, NULL, &variance,
993                                  NULL, NULL);
994               if (variance != SYSMIS)
995                 v->f = sqrt (variance);
996               else
997                 v->f = SYSMIS;
998             }
999             break;
1000           case MAX:
1001           case MIN:
1002             v->f = i->int1 ? i->dbl[0] : SYSMIS;
1003             break;
1004           case MAX | FSTRING:
1005           case MIN | FSTRING:
1006             if (i->int1)
1007               memcpy (v->s, i->string, var_get_width (i->dest));
1008             else
1009               memset (v->s, ' ', var_get_width (i->dest));
1010             break;
1011           case FGT:
1012           case FGT | FSTRING:
1013           case FLT:
1014           case FLT | FSTRING:
1015           case FIN:
1016           case FIN | FSTRING:
1017           case FOUT:
1018           case FOUT | FSTRING:
1019             v->f = i->dbl[1] ? i->dbl[0] / i->dbl[1] : SYSMIS;
1020             break;
1021           case PGT:
1022           case PGT | FSTRING:
1023           case PLT:
1024           case PLT | FSTRING:
1025           case PIN:
1026           case PIN | FSTRING:
1027           case POUT:
1028           case POUT | FSTRING:
1029             v->f = i->dbl[1] ? i->dbl[0] / i->dbl[1] * 100.0 : SYSMIS;
1030             break;
1031           case N:
1032           case N | FSTRING:
1033             v->f = i->dbl[0];
1034             break;
1035           case NU:
1036           case NU | FSTRING:
1037             v->f = i->int1;
1038             break;
1039           case FIRST:
1040           case LAST:
1041             v->f = i->int1 ? i->dbl[0] : SYSMIS;
1042             break;
1043           case FIRST | FSTRING:
1044           case LAST | FSTRING:
1045             if (i->int1)
1046               memcpy (v->s, i->string, var_get_width (i->dest));
1047             else
1048               memset (v->s, ' ', var_get_width (i->dest));
1049             break;
1050           case N_NO_VARS:
1051             v->f = i->dbl[0];
1052             break;
1053           case NU_NO_VARS:
1054             v->f = i->int1;
1055             break;
1056           case NMISS:
1057           case NMISS | FSTRING:
1058             v->f = i->dbl[0];
1059             break;
1060           case NUMISS:
1061           case NUMISS | FSTRING:
1062             v->f = i->int1;
1063             break;
1064           default:
1065             NOT_REACHED ();
1066           }
1067       }
1068   }
1069
1070   casewriter_write (output, &c);
1071 }
1072
1073 /* Resets the state for all the aggregate functions. */
1074 static void
1075 initialize_aggregate_info (struct agr_proc *agr, const struct ccase *input)
1076 {
1077   struct agr_var *iter;
1078
1079   case_destroy (&agr->break_case);
1080   case_clone (&agr->break_case, input);
1081
1082   for (iter = agr->agr_vars; iter; iter = iter->next)
1083     {
1084       iter->saw_missing = false;
1085       iter->dbl[0] = iter->dbl[1] = iter->dbl[2] = 0.0;
1086       iter->int1 = iter->int2 = 0;
1087       switch (iter->function)
1088         {
1089         case MIN:
1090           iter->dbl[0] = DBL_MAX;
1091           break;
1092         case MIN | FSTRING:
1093           memset (iter->string, 255, var_get_width (iter->src));
1094           break;
1095         case MAX:
1096           iter->dbl[0] = -DBL_MAX;
1097           break;
1098         case MAX | FSTRING:
1099           memset (iter->string, 0, var_get_width (iter->src));
1100           break;
1101         case MEDIAN:
1102           {
1103             struct subcase ordering;
1104
1105             if ( ! iter->subject)
1106               iter->subject = var_create_internal (0);
1107
1108             if ( ! iter->weight)
1109               iter->weight = var_create_internal (1);
1110
1111             subcase_init_var (&ordering, iter->subject, SC_ASCEND);
1112             iter->writer = sort_create_writer (&ordering, 2);
1113             subcase_destroy (&ordering);
1114
1115             iter->cc = 0;
1116           }
1117           break;
1118         case SD:
1119           if (iter->moments == NULL)
1120             iter->moments = moments1_create (MOMENT_VARIANCE);
1121           else
1122             moments1_clear (iter->moments);
1123           break;
1124         default:
1125           break;
1126         }
1127     }
1128 }