297d2abee4cc495cc17f1aaedd4da0d9e00f53bf
[pspp-builds.git] / src / language / stats / aggregate.c
1 /* PSPP - computes sample statistics.
2    Copyright (C) 1997-9, 2000, 2006 Free Software Foundation, Inc.
3
4    This program is free software; you can redistribute it and/or
5    modify it under the terms of the GNU General Public License as
6    published by the Free Software Foundation; either version 2 of the
7    License, or (at your option) any later version.
8
9    This program is distributed in the hope that it will be useful, but
10    WITHOUT ANY WARRANTY; without even the implied warranty of
11    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12    General Public License for more details.
13
14    You should have received a copy of the GNU General Public License
15    along with this program; if not, write to the Free Software
16    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
17    02110-1301, USA. */
18
19 #include <config.h>
20
21 #include <stdlib.h>
22
23 #include <data/any-writer.h>
24 #include <data/case-sink.h>
25 #include <data/case.h>
26 #include <data/casefile.h>
27 #include <data/dictionary.h>
28 #include <data/file-handle-def.h>
29 #include <data/format.h>
30 #include <data/procedure.h>
31 #include <data/settings.h>
32 #include <data/storage-stream.h>
33 #include <data/sys-file-writer.h>
34 #include <data/variable.h>
35 #include <language/command.h>
36 #include <language/data-io/file-handle.h>
37 #include <language/lexer/lexer.h>
38 #include <language/lexer/variable-parser.h>
39 #include <language/stats/sort-criteria.h>
40 #include <libpspp/alloc.h>
41 #include <libpspp/assertion.h>
42 #include <libpspp/message.h>
43 #include <libpspp/misc.h>
44 #include <libpspp/pool.h>
45 #include <libpspp/str.h>
46 #include <math/moments.h>
47 #include <math/sort.h>
48
49 #include "minmax.h"
50
51 #include "gettext.h"
52 #define _(msgid) gettext (msgid)
53
54 /* Argument for AGGREGATE function. */
55 union agr_argument
56   {
57     double f;                           /* Numeric. */
58     char *c;                            /* Short or long string. */
59   };
60
61 /* Specifies how to make an aggregate variable. */
62 struct agr_var
63   {
64     struct agr_var *next;               /* Next in list. */
65
66     /* Collected during parsing. */
67     const struct variable *src; /* Source variable. */
68     struct variable *dest;      /* Target variable. */
69     int function;               /* Function. */
70     enum mv_class exclude;      /* Classes of missing values to exclude. */
71     union agr_argument arg[2];  /* Arguments. */
72
73     /* Accumulated during AGGREGATE execution. */
74     double dbl[3];
75     int int1, int2;
76     char *string;
77     bool saw_missing;
78     struct moments1 *moments;
79   };
80
81 /* Aggregation functions. */
82 enum
83   {
84     NONE, SUM, MEAN, SD, MAX, MIN, PGT, PLT, PIN, POUT, FGT, FLT, FIN,
85     FOUT, N, NU, NMISS, NUMISS, FIRST, LAST,
86     N_AGR_FUNCS, N_NO_VARS, NU_NO_VARS,
87     FUNC = 0x1f, /* Function mask. */
88     FSTRING = 1<<5, /* String function bit. */
89   };
90
91 /* Attributes of an aggregation function. */
92 struct agr_func
93   {
94     const char *name;           /* Aggregation function name. */
95     size_t n_args;              /* Number of arguments. */
96     enum var_type alpha_type;   /* When given ALPHA arguments, output type. */
97     struct fmt_spec format;     /* Format spec if alpha_type != ALPHA. */
98   };
99
100 /* Attributes of aggregation functions. */
101 static const struct agr_func agr_func_tab[] = 
102   {
103     {"<NONE>",  0, -1,          {0, 0, 0}},
104     {"SUM",     0, -1,          {FMT_F, 8, 2}},
105     {"MEAN",    0, -1,          {FMT_F, 8, 2}},
106     {"SD",      0, -1,          {FMT_F, 8, 2}},
107     {"MAX",     0, VAR_STRING,  {-1, -1, -1}}, 
108     {"MIN",     0, VAR_STRING,  {-1, -1, -1}}, 
109     {"PGT",     1, VAR_NUMERIC, {FMT_F, 5, 1}},      
110     {"PLT",     1, VAR_NUMERIC, {FMT_F, 5, 1}},       
111     {"PIN",     2, VAR_NUMERIC, {FMT_F, 5, 1}},       
112     {"POUT",    2, VAR_NUMERIC, {FMT_F, 5, 1}},       
113     {"FGT",     1, VAR_NUMERIC, {FMT_F, 5, 3}},       
114     {"FLT",     1, VAR_NUMERIC, {FMT_F, 5, 3}},       
115     {"FIN",     2, VAR_NUMERIC, {FMT_F, 5, 3}},       
116     {"FOUT",    2, VAR_NUMERIC, {FMT_F, 5, 3}},       
117     {"N",       0, VAR_NUMERIC, {FMT_F, 7, 0}},       
118     {"NU",      0, VAR_NUMERIC, {FMT_F, 7, 0}},       
119     {"NMISS",   0, VAR_NUMERIC, {FMT_F, 7, 0}},       
120     {"NUMISS",  0, VAR_NUMERIC, {FMT_F, 7, 0}},       
121     {"FIRST",   0, VAR_STRING,  {-1, -1, -1}}, 
122     {"LAST",    0, VAR_STRING,  {-1, -1, -1}},
123     {NULL,      0, -1,          {-1, -1, -1}},
124     {"N",       0, VAR_NUMERIC, {FMT_F, 7, 0}},
125     {"NU",      0, VAR_NUMERIC, {FMT_F, 7, 0}},
126   };
127
128 /* Missing value types. */
129 enum missing_treatment
130   {
131     ITEMWISE,           /* Missing values item by item. */
132     COLUMNWISE          /* Missing values column by column. */
133   };
134
135 /* An entire AGGREGATE procedure. */
136 struct agr_proc 
137   {
138     /* We have either an output file or a sink. */
139     struct any_writer *writer;          /* Output file, or null if none. */
140     struct case_sink *sink;             /* Sink, or null if none. */
141
142     /* Break variables. */
143     struct sort_criteria *sort;         /* Sort criteria. */
144     const struct variable **break_vars;       /* Break variables. */
145     size_t break_var_cnt;               /* Number of break variables. */
146     struct ccase break_case;            /* Last values of break variables. */
147
148     enum missing_treatment missing;     /* How to treat missing values. */
149     struct agr_var *agr_vars;           /* First aggregate variable. */
150     struct dictionary *dict;            /* Aggregate dictionary. */
151     const struct dictionary *src_dict;  /* Dict of the source */
152     int case_cnt;                       /* Counts aggregated cases. */
153     struct ccase agr_case;              /* Aggregate case for output. */
154   };
155
156 static void initialize_aggregate_info (struct agr_proc *,
157                                        const struct ccase *);
158
159 /* Prototypes. */
160 static bool parse_aggregate_functions (struct lexer *, const struct dictionary *,
161                                        struct agr_proc *);
162 static void agr_destroy (struct agr_proc *);
163 static bool aggregate_single_case (struct agr_proc *agr,
164                                    const struct ccase *input,
165                                    struct ccase *output);
166 static void dump_aggregate_info (struct agr_proc *agr, struct ccase *output);
167 \f
168 /* Parsing. */
169
170 /* Parses and executes the AGGREGATE procedure. */
171 int
172 cmd_aggregate (struct lexer *lexer, struct dataset *ds)
173 {
174   struct dictionary *dict = dataset_dict (ds);
175   struct agr_proc agr;
176   struct file_handle *out_file = NULL;
177
178   bool copy_documents = false;
179   bool presorted = false;
180   bool saw_direction;
181
182   memset(&agr, 0 , sizeof (agr));
183   agr.missing = ITEMWISE;
184   case_nullify (&agr.break_case);
185
186   agr.dict = dict_create ();
187   agr.src_dict = dict;
188   dict_set_label (agr.dict, dict_get_label (dict));
189   dict_set_documents (agr.dict, dict_get_documents (dict));
190
191   /* OUTFILE subcommand must be first. */
192   if (!lex_force_match_id (lexer, "OUTFILE"))
193     goto error;
194   lex_match (lexer, '=');
195   if (!lex_match (lexer, '*'))
196     {
197       out_file = fh_parse (lexer, FH_REF_FILE | FH_REF_SCRATCH);
198       if (out_file == NULL)
199         goto error;
200     }
201   
202   /* Read most of the subcommands. */
203   for (;;)
204     {
205       lex_match (lexer, '/');
206       
207       if (lex_match_id (lexer, "MISSING"))
208         {
209           lex_match (lexer, '=');
210           if (!lex_match_id (lexer, "COLUMNWISE"))
211             {
212               lex_error (lexer, _("while expecting COLUMNWISE"));
213               goto error;
214             }
215           agr.missing = COLUMNWISE;
216         }
217       else if (lex_match_id (lexer, "DOCUMENT"))
218         copy_documents = true;
219       else if (lex_match_id (lexer, "PRESORTED"))
220         presorted = true;
221       else if (lex_match_id (lexer, "BREAK"))
222         {
223           int i;
224
225           lex_match (lexer, '=');
226           agr.sort = sort_parse_criteria (lexer, dict,
227                                           &agr.break_vars, &agr.break_var_cnt,
228                                           &saw_direction, NULL);
229           if (agr.sort == NULL)
230             goto error;
231           
232           for (i = 0; i < agr.break_var_cnt; i++)
233             dict_clone_var_assert (agr.dict, agr.break_vars[i],
234                                    var_get_name (agr.break_vars[i]));
235
236           /* BREAK must follow the options. */
237           break;
238         }
239       else
240         {
241           lex_error (lexer, _("expecting BREAK"));
242           goto error;
243         }
244     }
245   if (presorted && saw_direction)
246     msg (SW, _("When PRESORTED is specified, specifying sorting directions "
247                "with (A) or (D) has no effect.  Output data will be sorted "
248                "the same way as the input data."));
249       
250   /* Read in the aggregate functions. */
251   lex_match (lexer, '/');
252   if (!parse_aggregate_functions (lexer, dict, &agr))
253     goto error;
254
255   /* Delete documents. */
256   if (!copy_documents)
257     dict_clear_documents (agr.dict);
258
259   /* Cancel SPLIT FILE. */
260   dict_set_split_vars (agr.dict, NULL, 0);
261   
262   /* Initialize. */
263   agr.case_cnt = 0;
264   case_create (&agr.agr_case, dict_get_next_value_idx (agr.dict));
265
266   /* Output to active file or external file? */
267   if (out_file == NULL) 
268     {
269       struct ccase *c;
270       
271       /* The active file will be replaced by the aggregated data,
272          so TEMPORARY is moot. */
273       proc_cancel_temporary_transformations (ds);
274
275       if (agr.sort != NULL && !presorted) 
276         {
277           if (!sort_active_file_in_place (ds, agr.sort))
278             goto error;
279         }
280
281       agr.sink = create_case_sink (&storage_sink_class, agr.dict,
282                                    dataset_get_casefile_factory (ds),
283                                    NULL);
284       if (agr.sink->class->open != NULL)
285         agr.sink->class->open (agr.sink);
286       proc_set_sink (ds, 
287                      create_case_sink (&null_sink_class, dict,
288                                        dataset_get_casefile_factory (ds),
289                                        NULL));
290       proc_open (ds);
291       while (proc_read (ds, &c))
292         if (aggregate_single_case (&agr, c, &agr.agr_case)) 
293           if (!agr.sink->class->write (agr.sink, &agr.agr_case)) 
294             {
295               proc_close (ds);
296               goto error; 
297             }
298       if (!proc_close (ds))
299         goto error;
300
301       if (agr.case_cnt > 0) 
302         {
303           dump_aggregate_info (&agr, &agr.agr_case);
304           if (!agr.sink->class->write (agr.sink, &agr.agr_case))
305             goto error;
306         }
307       discard_variables (ds);
308       dataset_set_dict (ds, agr.dict);
309       agr.dict = NULL;
310       proc_set_source (ds, agr.sink->class->make_source (agr.sink));
311       free_case_sink (agr.sink);
312     }
313   else
314     {
315       agr.writer = any_writer_open (out_file, agr.dict);
316       if (agr.writer == NULL)
317         goto error;
318       
319       if (agr.sort != NULL && !presorted) 
320         {
321           /* Sorting is needed. */
322           struct casefile *dst;
323           struct casereader *reader;
324           struct ccase c;
325           bool ok = true;
326           
327           dst = sort_active_file_to_casefile (ds, agr.sort);
328           if (dst == NULL)
329             goto error;
330           reader = casefile_get_destructive_reader (dst);
331           while (ok && casereader_read_xfer (reader, &c)) 
332             {
333               if (aggregate_single_case (&agr, &c, &agr.agr_case)) 
334                 ok = any_writer_write (agr.writer, &agr.agr_case);
335               case_destroy (&c);
336             }
337           casereader_destroy (reader);
338           if (ok)
339             ok = !casefile_error (dst);
340           casefile_destroy (dst);
341           if (!ok)
342             goto error;
343         }
344       else 
345         {
346           /* Active file is already sorted. */
347           struct ccase *c;
348           
349           proc_open (ds);
350           while (proc_read (ds, &c))
351             if (aggregate_single_case (&agr, c, &agr.agr_case)) 
352               if (!any_writer_write (agr.writer, &agr.agr_case)) 
353                 {
354                   proc_close (ds);
355                   goto error;
356                 }
357           if (!proc_close (ds))
358             goto error;
359         }
360       
361       if (agr.case_cnt > 0) 
362         {
363           dump_aggregate_info (&agr, &agr.agr_case);
364           any_writer_write (agr.writer, &agr.agr_case);
365         }
366       if (any_writer_error (agr.writer))
367         goto error;
368     }
369   
370   agr_destroy (&agr);
371   return CMD_SUCCESS;
372
373 error:
374   agr_destroy (&agr);
375   return CMD_CASCADING_FAILURE;
376 }
377
378 /* Parse all the aggregate functions. */
379 static bool
380 parse_aggregate_functions (struct lexer *lexer, const struct dictionary *dict, struct agr_proc *agr)
381 {
382   struct agr_var *tail; /* Tail of linked list starting at agr->vars. */
383
384   /* Parse everything. */
385   tail = NULL;
386   for (;;)
387     {
388       char **dest;
389       char **dest_label;
390       size_t n_dest;
391       struct string function_name;
392
393       enum mv_class exclude;
394       const struct agr_func *function;
395       int func_index;
396
397       union agr_argument arg[2];
398
399       const struct variable **src;
400       size_t n_src;
401
402       size_t i;
403
404       dest = NULL;
405       dest_label = NULL;
406       n_dest = 0;
407       src = NULL;
408       function = NULL;
409       n_src = 0;
410       arg[0].c = NULL;
411       arg[1].c = NULL;
412       ds_init_empty (&function_name);
413
414       /* Parse the list of target variables. */
415       while (!lex_match (lexer, '='))
416         {
417           size_t n_dest_prev = n_dest;
418           
419           if (!parse_DATA_LIST_vars (lexer, &dest, &n_dest,
420                                      PV_APPEND | PV_SINGLE | PV_NO_SCRATCH))
421             goto error;
422
423           /* Assign empty labels. */
424           {
425             int j;
426
427             dest_label = xnrealloc (dest_label, n_dest, sizeof *dest_label);
428             for (j = n_dest_prev; j < n_dest; j++)
429               dest_label[j] = NULL;
430           }
431
432
433           
434           if (lex_token (lexer) == T_STRING)
435             {
436               struct string label;
437               ds_init_string (&label, lex_tokstr (lexer));
438
439               ds_truncate (&label, 255);
440               dest_label[n_dest - 1] = ds_xstrdup (&label);
441               lex_get (lexer);
442               ds_destroy (&label);
443             }
444         }
445
446       /* Get the name of the aggregation function. */
447       if (lex_token (lexer) != T_ID)
448         {
449           lex_error (lexer, _("expecting aggregation function"));
450           goto error;
451         }
452
453       exclude = MV_ANY;
454
455       ds_assign_string (&function_name, lex_tokstr (lexer));
456
457       ds_chomp (&function_name, '.');
458
459       if (lex_tokid(lexer)[strlen (lex_tokid (lexer)) - 1] == '.')
460         exclude = MV_SYSTEM;
461
462       for (function = agr_func_tab; function->name; function++)
463         if (!strcasecmp (function->name, ds_cstr (&function_name)))
464           break;
465       if (NULL == function->name)
466         {
467           msg (SE, _("Unknown aggregation function %s."), 
468                ds_cstr (&function_name));
469           goto error;
470         }
471       ds_destroy (&function_name);
472       func_index = function - agr_func_tab;
473       lex_get (lexer);
474
475       /* Check for leading lparen. */
476       if (!lex_match (lexer, '('))
477         {
478           if (func_index == N)
479             func_index = N_NO_VARS;
480           else if (func_index == NU)
481             func_index = NU_NO_VARS;
482           else
483             {
484               lex_error (lexer, _("expecting `('"));
485               goto error;
486             }
487         }
488       else
489         {
490           /* Parse list of source variables. */
491           {
492             int pv_opts = PV_NO_SCRATCH;
493
494             if (func_index == SUM || func_index == MEAN || func_index == SD)
495               pv_opts |= PV_NUMERIC;
496             else if (function->n_args)
497               pv_opts |= PV_SAME_TYPE;
498
499             if (!parse_variables_const (lexer, dict, &src, &n_src, pv_opts))
500               goto error;
501           }
502
503           /* Parse function arguments, for those functions that
504              require arguments. */
505           if (function->n_args != 0)
506             for (i = 0; i < function->n_args; i++)
507               {
508                 int type;
509             
510                 lex_match (lexer, ',');
511                 if (lex_token (lexer) == T_STRING)
512                   {
513                     arg[i].c = ds_xstrdup (lex_tokstr (lexer));
514                     type = VAR_STRING;
515                   }
516                 else if (lex_is_number (lexer))
517                   {
518                     arg[i].f = lex_tokval (lexer);
519                     type = VAR_NUMERIC;
520                   }
521                 else
522                   {
523                     msg (SE, _("Missing argument %d to %s."),
524                          (int) i + 1, function->name);
525                     goto error;
526                   }
527             
528                 lex_get (lexer);
529
530                 if (type != var_get_type (src[0]))
531                   {
532                     msg (SE, _("Arguments to %s must be of same type as "
533                                "source variables."),
534                          function->name);
535                     goto error;
536                   }
537               }
538
539           /* Trailing rparen. */
540           if (!lex_match (lexer, ')'))
541             {
542               lex_error (lexer, _("expecting `)'"));
543               goto error;
544             }
545           
546           /* Now check that the number of source variables match
547              the number of target variables.  If we check earlier
548              than this, the user can get very misleading error
549              message, i.e. `AGGREGATE x=SUM(y t).' will get this
550              error message when a proper message would be more
551              like `unknown variable t'. */
552           if (n_src != n_dest)
553             {
554               msg (SE, _("Number of source variables (%u) does not match "
555                          "number of target variables (%u)."),
556                    (unsigned) n_src, (unsigned) n_dest);
557               goto error;
558             }
559
560           if ((func_index == PIN || func_index == POUT
561               || func_index == FIN || func_index == FOUT) 
562               && (var_is_numeric (src[0])
563                   ? arg[0].f > arg[1].f
564                   : str_compare_rpad (arg[0].c, arg[1].c) > 0))
565             {
566               union agr_argument t = arg[0];
567               arg[0] = arg[1];
568               arg[1] = t;
569                   
570               msg (SW, _("The value arguments passed to the %s function "
571                          "are out-of-order.  They will be treated as if "
572                          "they had been specified in the correct order."),
573                    function->name);
574             }
575         }
576         
577       /* Finally add these to the linked list of aggregation
578          variables. */
579       for (i = 0; i < n_dest; i++)
580         {
581           struct agr_var *v = xmalloc (sizeof *v);
582
583           /* Add variable to chain. */
584           if (agr->agr_vars != NULL)
585             tail->next = v;
586           else
587             agr->agr_vars = v;
588           tail = v;
589           tail->next = NULL;
590           v->moments = NULL;
591           
592           /* Create the target variable in the aggregate
593              dictionary. */
594           {
595             struct variable *destvar;
596             
597             v->function = func_index;
598
599             if (src)
600               {
601                 v->src = src[i];
602                 
603                 if (var_is_alpha (src[i]))
604                   {
605                     v->function |= FSTRING;
606                     v->string = xmalloc (var_get_width (src[i]));
607                   }
608
609                 if (function->alpha_type == VAR_STRING)
610                   destvar = dict_clone_var (agr->dict, v->src, dest[i]);
611                 else
612                   {
613                     assert (var_is_numeric (v->src)
614                             || function->alpha_type == VAR_NUMERIC);
615                     destvar = dict_create_var (agr->dict, dest[i], 0);
616                     if (destvar != NULL) 
617                       {
618                         struct fmt_spec f;
619                         if ((func_index == N || func_index == NMISS)
620                             && dict_get_weight (dict) != NULL)
621                           f = fmt_for_output (FMT_F, 8, 2); 
622                         else
623                           f = function->format;
624                         var_set_both_formats (destvar, &f);
625                       }
626                   }
627               } else {
628                 struct fmt_spec f;
629                 v->src = NULL;
630                 destvar = dict_create_var (agr->dict, dest[i], 0);
631                 if (func_index == N_NO_VARS && dict_get_weight (dict) != NULL)
632                   f = fmt_for_output (FMT_F, 8, 2); 
633                 else
634                   f = function->format;
635                 var_set_both_formats (destvar, &f);
636               }
637           
638             if (!destvar)
639               {
640                 msg (SE, _("Variable name %s is not unique within the "
641                            "aggregate file dictionary, which contains "
642                            "the aggregate variables and the break "
643                            "variables."),
644                      dest[i]);
645                 goto error;
646               }
647
648             free (dest[i]);
649             if (dest_label[i])
650               var_set_label (destvar, dest_label[i]);
651
652             v->dest = destvar;
653           }
654           
655           v->exclude = exclude;
656
657           if (v->src != NULL)
658             {
659               int j;
660
661               if (var_is_numeric (v->src))
662                 for (j = 0; j < function->n_args; j++)
663                   v->arg[j].f = arg[j].f;
664               else
665                 for (j = 0; j < function->n_args; j++)
666                   v->arg[j].c = xstrdup (arg[j].c);
667             }
668         }
669       
670       if (src != NULL && var_is_alpha (src[0]))
671         for (i = 0; i < function->n_args; i++)
672           {
673             free (arg[i].c);
674             arg[i].c = NULL;
675           }
676
677       free (src);
678       free (dest);
679       free (dest_label);
680
681       if (!lex_match (lexer, '/'))
682         {
683           if (lex_token (lexer) == '.')
684             return true;
685
686           lex_error (lexer, "expecting end of command");
687           return false;
688         }
689       continue;
690       
691     error:
692       ds_destroy (&function_name);
693       for (i = 0; i < n_dest; i++)
694         {
695           free (dest[i]);
696           free (dest_label[i]);
697         }
698       free (dest);
699       free (dest_label);
700       free (arg[0].c);
701       free (arg[1].c);
702       if (src && n_src && var_is_alpha (src[0]))
703         for (i = 0; i < function->n_args; i++)
704           {
705             free (arg[i].c);
706             arg[i].c = NULL;
707           }
708       free (src);
709         
710       return false;
711     }
712 }
713
714 /* Destroys AGR. */
715 static void
716 agr_destroy (struct agr_proc *agr)
717 {
718   struct agr_var *iter, *next;
719
720   any_writer_close (agr->writer);
721   if (agr->sort != NULL)
722     sort_destroy_criteria (agr->sort);
723   free (agr->break_vars);
724   case_destroy (&agr->break_case);
725   for (iter = agr->agr_vars; iter; iter = next)
726     {
727       next = iter->next;
728
729       if (iter->function & FSTRING)
730         {
731           size_t n_args;
732           size_t i;
733
734           n_args = agr_func_tab[iter->function & FUNC].n_args;
735           for (i = 0; i < n_args; i++)
736             free (iter->arg[i].c);
737           free (iter->string);
738         }
739       else if (iter->function == SD)
740         moments1_destroy (iter->moments);
741       free (iter);
742     }
743   if (agr->dict != NULL)
744     dict_destroy (agr->dict);
745
746   case_destroy (&agr->agr_case);
747 }
748 \f
749 /* Execution. */
750
751 static void accumulate_aggregate_info (struct agr_proc *,
752                                        const struct ccase *);
753 static void dump_aggregate_info (struct agr_proc *, struct ccase *);
754
755 /* Processes a single case INPUT for aggregation.  If output is
756    warranted, writes it to OUTPUT and returns true.
757    Otherwise, returns false and OUTPUT is unmodified. */
758 static bool
759 aggregate_single_case (struct agr_proc *agr,
760                        const struct ccase *input, struct ccase *output)
761 {
762   bool finished_group = false;
763   
764   if (agr->case_cnt++ == 0)
765     initialize_aggregate_info (agr, input);
766   else if (case_compare (&agr->break_case, input,
767                          agr->break_vars, agr->break_var_cnt))
768     {
769       dump_aggregate_info (agr, output);
770       finished_group = true;
771
772       initialize_aggregate_info (agr, input);
773     }
774
775   accumulate_aggregate_info (agr, input);
776   return finished_group;
777 }
778
779 /* Accumulates aggregation data from the case INPUT. */
780 static void 
781 accumulate_aggregate_info (struct agr_proc *agr,
782                            const struct ccase *input)
783 {
784   struct agr_var *iter;
785   double weight;
786   bool bad_warn = true;
787
788   weight = dict_get_case_weight (agr->src_dict, input, &bad_warn);
789
790   for (iter = agr->agr_vars; iter; iter = iter->next)
791     if (iter->src)
792       {
793         const union value *v = case_data (input, iter->src);
794         int src_width = var_get_width (iter->src);
795
796         if (var_is_value_missing (iter->src, v, iter->exclude))
797           {
798             switch (iter->function)
799               {
800               case NMISS:
801               case NMISS | FSTRING:
802                 iter->dbl[0] += weight;
803                 break;
804               case NUMISS:
805               case NUMISS | FSTRING:
806                 iter->int1++;
807                 break;
808               }
809             iter->saw_missing = true;
810             continue;
811           }
812         
813         /* This is horrible.  There are too many possibilities. */
814         switch (iter->function)
815           {
816           case SUM:
817             iter->dbl[0] += v->f * weight;
818             iter->int1 = 1;
819             break;
820           case MEAN:
821             iter->dbl[0] += v->f * weight;
822             iter->dbl[1] += weight;
823             break;
824           case SD:
825             moments1_add (iter->moments, v->f, weight);
826             break;
827           case MAX:
828             iter->dbl[0] = MAX (iter->dbl[0], v->f);
829             iter->int1 = 1;
830             break;
831           case MAX | FSTRING:
832             if (memcmp (iter->string, v->s, src_width) < 0)
833               memcpy (iter->string, v->s, src_width);
834             iter->int1 = 1;
835             break;
836           case MIN:
837             iter->dbl[0] = MIN (iter->dbl[0], v->f);
838             iter->int1 = 1;
839             break;
840           case MIN | FSTRING:
841             if (memcmp (iter->string, v->s, src_width) > 0)
842               memcpy (iter->string, v->s, src_width);
843             iter->int1 = 1;
844             break;
845           case FGT:
846           case PGT:
847             if (v->f > iter->arg[0].f)
848               iter->dbl[0] += weight;
849             iter->dbl[1] += weight;
850             break;
851           case FGT | FSTRING:
852           case PGT | FSTRING:
853             if (memcmp (iter->arg[0].c, v->s, src_width) < 0)
854               iter->dbl[0] += weight;
855             iter->dbl[1] += weight;
856             break;
857           case FLT:
858           case PLT:
859             if (v->f < iter->arg[0].f)
860               iter->dbl[0] += weight;
861             iter->dbl[1] += weight;
862             break;
863           case FLT | FSTRING:
864           case PLT | FSTRING:
865             if (memcmp (iter->arg[0].c, v->s, src_width) > 0)
866               iter->dbl[0] += weight;
867             iter->dbl[1] += weight;
868             break;
869           case FIN:
870           case PIN:
871             if (iter->arg[0].f <= v->f && v->f <= iter->arg[1].f)
872               iter->dbl[0] += weight;
873             iter->dbl[1] += weight;
874             break;
875           case FIN | FSTRING:
876           case PIN | FSTRING:
877             if (memcmp (iter->arg[0].c, v->s, src_width) <= 0
878                 && memcmp (iter->arg[1].c, v->s, src_width) >= 0)
879               iter->dbl[0] += weight;
880             iter->dbl[1] += weight;
881             break;
882           case FOUT:
883           case POUT:
884             if (iter->arg[0].f > v->f || v->f > iter->arg[1].f)
885               iter->dbl[0] += weight;
886             iter->dbl[1] += weight;
887             break;
888           case FOUT | FSTRING:
889           case POUT | FSTRING:
890             if (memcmp (iter->arg[0].c, v->s, src_width) > 0
891                 || memcmp (iter->arg[1].c, v->s, src_width) < 0)
892               iter->dbl[0] += weight;
893             iter->dbl[1] += weight;
894             break;
895           case N:
896           case N | FSTRING:
897             iter->dbl[0] += weight;
898             break;
899           case NU:
900           case NU | FSTRING:
901             iter->int1++;
902             break;
903           case FIRST:
904             if (iter->int1 == 0)
905               {
906                 iter->dbl[0] = v->f;
907                 iter->int1 = 1;
908               }
909             break;
910           case FIRST | FSTRING:
911             if (iter->int1 == 0)
912               {
913                 memcpy (iter->string, v->s, src_width);
914                 iter->int1 = 1;
915               }
916             break;
917           case LAST:
918             iter->dbl[0] = v->f;
919             iter->int1 = 1;
920             break;
921           case LAST | FSTRING:
922             memcpy (iter->string, v->s, src_width);
923             iter->int1 = 1;
924             break;
925           case NMISS:
926           case NMISS | FSTRING:
927           case NUMISS:
928           case NUMISS | FSTRING:
929             /* Our value is not missing or it would have been
930                caught earlier.  Nothing to do. */
931             break;
932           default:
933             NOT_REACHED ();
934           }
935     } else {
936       switch (iter->function)
937         {
938         case N_NO_VARS:
939           iter->dbl[0] += weight;
940           break;
941         case NU_NO_VARS:
942           iter->int1++;
943           break;
944         default:
945           NOT_REACHED ();
946         }
947     }
948 }
949
950 /* We've come to a record that differs from the previous in one or
951    more of the break variables.  Make an output record from the
952    accumulated statistics in the OUTPUT case. */
953 static void 
954 dump_aggregate_info (struct agr_proc *agr, struct ccase *output)
955 {
956   {
957     int value_idx = 0;
958     int i;
959
960     for (i = 0; i < agr->break_var_cnt; i++) 
961       {
962         const struct variable *v = agr->break_vars[i];
963         size_t value_cnt = var_get_value_cnt (v);
964         memcpy (case_data_rw_idx (output, value_idx),
965                 case_data (&agr->break_case, v),
966                 sizeof (union value) * value_cnt);
967         value_idx += value_cnt; 
968       }
969   }
970   
971   {
972     struct agr_var *i;
973   
974     for (i = agr->agr_vars; i; i = i->next)
975       {
976         union value *v = case_data_rw (output, i->dest);
977
978         if (agr->missing == COLUMNWISE && i->saw_missing
979             && (i->function & FUNC) != N && (i->function & FUNC) != NU
980             && (i->function & FUNC) != NMISS && (i->function & FUNC) != NUMISS)
981           {
982             if (var_is_alpha (i->dest))
983               memset (v->s, ' ', var_get_width (i->dest));
984             else
985               v->f = SYSMIS;
986             continue;
987           }
988         
989         switch (i->function)
990           {
991           case SUM:
992             v->f = i->int1 ? i->dbl[0] : SYSMIS;
993             break;
994           case MEAN:
995             v->f = i->dbl[1] != 0.0 ? i->dbl[0] / i->dbl[1] : SYSMIS;
996             break;
997           case SD:
998             {
999               double variance;
1000
1001               /* FIXME: we should use two passes. */
1002               moments1_calculate (i->moments, NULL, NULL, &variance,
1003                                  NULL, NULL);
1004               if (variance != SYSMIS)
1005                 v->f = sqrt (variance);
1006               else
1007                 v->f = SYSMIS; 
1008             }
1009             break;
1010           case MAX:
1011           case MIN:
1012             v->f = i->int1 ? i->dbl[0] : SYSMIS;
1013             break;
1014           case MAX | FSTRING:
1015           case MIN | FSTRING:
1016             if (i->int1)
1017               memcpy (v->s, i->string, var_get_width (i->dest));
1018             else
1019               memset (v->s, ' ', var_get_width (i->dest));
1020             break;
1021           case FGT:
1022           case FGT | FSTRING:
1023           case FLT:
1024           case FLT | FSTRING:
1025           case FIN:
1026           case FIN | FSTRING:
1027           case FOUT:
1028           case FOUT | FSTRING:
1029             v->f = i->dbl[1] ? i->dbl[0] / i->dbl[1] : SYSMIS;
1030             break;
1031           case PGT:
1032           case PGT | FSTRING:
1033           case PLT:
1034           case PLT | FSTRING:
1035           case PIN:
1036           case PIN | FSTRING:
1037           case POUT:
1038           case POUT | FSTRING:
1039             v->f = i->dbl[1] ? i->dbl[0] / i->dbl[1] * 100.0 : SYSMIS;
1040             break;
1041           case N:
1042           case N | FSTRING:
1043             v->f = i->dbl[0];
1044             break;
1045           case NU:
1046           case NU | FSTRING:
1047             v->f = i->int1;
1048             break;
1049           case FIRST:
1050           case LAST:
1051             v->f = i->int1 ? i->dbl[0] : SYSMIS;
1052             break;
1053           case FIRST | FSTRING:
1054           case LAST | FSTRING:
1055             if (i->int1)
1056               memcpy (v->s, i->string, var_get_width (i->dest));
1057             else
1058               memset (v->s, ' ', var_get_width (i->dest));
1059             break;
1060           case N_NO_VARS:
1061             v->f = i->dbl[0];
1062             break;
1063           case NU_NO_VARS:
1064             v->f = i->int1;
1065             break;
1066           case NMISS:
1067           case NMISS | FSTRING:
1068             v->f = i->dbl[0];
1069             break;
1070           case NUMISS:
1071           case NUMISS | FSTRING:
1072             v->f = i->int1;
1073             break;
1074           default:
1075             NOT_REACHED ();
1076           }
1077       }
1078   }
1079 }
1080
1081 /* Resets the state for all the aggregate functions. */
1082 static void
1083 initialize_aggregate_info (struct agr_proc *agr, const struct ccase *input)
1084 {
1085   struct agr_var *iter;
1086
1087   case_destroy (&agr->break_case);
1088   case_clone (&agr->break_case, input);
1089
1090   for (iter = agr->agr_vars; iter; iter = iter->next)
1091     {
1092       iter->saw_missing = false;
1093       iter->dbl[0] = iter->dbl[1] = iter->dbl[2] = 0.0;
1094       iter->int1 = iter->int2 = 0;
1095       switch (iter->function)
1096         {
1097         case MIN:
1098           iter->dbl[0] = DBL_MAX;
1099           break;
1100         case MIN | FSTRING:
1101           memset (iter->string, 255, var_get_width (iter->src));
1102           break;
1103         case MAX:
1104           iter->dbl[0] = -DBL_MAX;
1105           break;
1106         case MAX | FSTRING:
1107           memset (iter->string, 0, var_get_width (iter->src));
1108           break;
1109         case SD:
1110           if (iter->moments == NULL)
1111             iter->moments = moments1_create (MOMENT_VARIANCE);
1112           else
1113             moments1_clear (iter->moments);
1114           break;
1115         default:
1116           break;
1117         }
1118     }
1119 }