Applied patch #5661
[pspp] / src / language / stats / aggregate.c
1 /* PSPP - computes sample statistics.
2    Copyright (C) 1997-9, 2000, 2006 Free Software Foundation, Inc.
3
4    This program is free software; you can redistribute it and/or
5    modify it under the terms of the GNU General Public License as
6    published by the Free Software Foundation; either version 2 of the
7    License, or (at your option) any later version.
8
9    This program is distributed in the hope that it will be useful, but
10    WITHOUT ANY WARRANTY; without even the implied warranty of
11    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12    General Public License for more details.
13
14    You should have received a copy of the GNU General Public License
15    along with this program; if not, write to the Free Software
16    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
17    02110-1301, USA. */
18
19 #include <config.h>
20
21 #include <stdlib.h>
22
23 #include <data/any-writer.h>
24 #include <data/case-sink.h>
25 #include <data/case.h>
26 #include <data/casefile.h>
27 #include <data/dictionary.h>
28 #include <data/file-handle-def.h>
29 #include <data/format.h>
30 #include <data/procedure.h>
31 #include <data/settings.h>
32 #include <data/storage-stream.h>
33 #include <data/sys-file-writer.h>
34 #include <data/variable.h>
35 #include <language/command.h>
36 #include <language/data-io/file-handle.h>
37 #include <language/lexer/lexer.h>
38 #include <language/lexer/variable-parser.h>
39 #include <language/stats/sort-criteria.h>
40 #include <libpspp/alloc.h>
41 #include <libpspp/assertion.h>
42 #include <libpspp/message.h>
43 #include <libpspp/misc.h>
44 #include <libpspp/pool.h>
45 #include <libpspp/str.h>
46 #include <math/moments.h>
47 #include <math/sort.h>
48
49 #include "minmax.h"
50
51 #include "gettext.h"
52 #define _(msgid) gettext (msgid)
53
54 /* Argument for AGGREGATE function. */
55 union agr_argument
56   {
57     double f;                           /* Numeric. */
58     char *c;                            /* Short or long string. */
59   };
60
61 /* Specifies how to make an aggregate variable. */
62 struct agr_var
63   {
64     struct agr_var *next;               /* Next in list. */
65
66     /* Collected during parsing. */
67     struct variable *src;       /* Source variable. */
68     struct variable *dest;      /* Target variable. */
69     int function;               /* Function. */
70     enum mv_class exclude;      /* Classes of missing values to exclude. */
71     union agr_argument arg[2];  /* Arguments. */
72
73     /* Accumulated during AGGREGATE execution. */
74     double dbl[3];
75     int int1, int2;
76     char *string;
77     bool saw_missing;
78     struct moments1 *moments;
79   };
80
81 /* Aggregation functions. */
82 enum
83   {
84     NONE, SUM, MEAN, SD, MAX, MIN, PGT, PLT, PIN, POUT, FGT, FLT, FIN,
85     FOUT, N, NU, NMISS, NUMISS, FIRST, LAST,
86     N_AGR_FUNCS, N_NO_VARS, NU_NO_VARS,
87     FUNC = 0x1f, /* Function mask. */
88     FSTRING = 1<<5, /* String function bit. */
89   };
90
91 /* Attributes of an aggregation function. */
92 struct agr_func
93   {
94     const char *name;           /* Aggregation function name. */
95     size_t n_args;              /* Number of arguments. */
96     enum var_type alpha_type;   /* When given ALPHA arguments, output type. */
97     struct fmt_spec format;     /* Format spec if alpha_type != ALPHA. */
98   };
99
100 /* Attributes of aggregation functions. */
101 static const struct agr_func agr_func_tab[] = 
102   {
103     {"<NONE>",  0, -1,          {0, 0, 0}},
104     {"SUM",     0, -1,          {FMT_F, 8, 2}},
105     {"MEAN",    0, -1,          {FMT_F, 8, 2}},
106     {"SD",      0, -1,          {FMT_F, 8, 2}},
107     {"MAX",     0, VAR_STRING,  {-1, -1, -1}}, 
108     {"MIN",     0, VAR_STRING,  {-1, -1, -1}}, 
109     {"PGT",     1, VAR_NUMERIC, {FMT_F, 5, 1}},      
110     {"PLT",     1, VAR_NUMERIC, {FMT_F, 5, 1}},       
111     {"PIN",     2, VAR_NUMERIC, {FMT_F, 5, 1}},       
112     {"POUT",    2, VAR_NUMERIC, {FMT_F, 5, 1}},       
113     {"FGT",     1, VAR_NUMERIC, {FMT_F, 5, 3}},       
114     {"FLT",     1, VAR_NUMERIC, {FMT_F, 5, 3}},       
115     {"FIN",     2, VAR_NUMERIC, {FMT_F, 5, 3}},       
116     {"FOUT",    2, VAR_NUMERIC, {FMT_F, 5, 3}},       
117     {"N",       0, VAR_NUMERIC, {FMT_F, 7, 0}},       
118     {"NU",      0, VAR_NUMERIC, {FMT_F, 7, 0}},       
119     {"NMISS",   0, VAR_NUMERIC, {FMT_F, 7, 0}},       
120     {"NUMISS",  0, VAR_NUMERIC, {FMT_F, 7, 0}},       
121     {"FIRST",   0, VAR_STRING,  {-1, -1, -1}}, 
122     {"LAST",    0, VAR_STRING,  {-1, -1, -1}},
123     {NULL,      0, -1,          {-1, -1, -1}},
124     {"N",       0, VAR_NUMERIC, {FMT_F, 7, 0}},
125     {"NU",      0, VAR_NUMERIC, {FMT_F, 7, 0}},
126   };
127
128 /* Missing value types. */
129 enum missing_treatment
130   {
131     ITEMWISE,           /* Missing values item by item. */
132     COLUMNWISE          /* Missing values column by column. */
133   };
134
135 /* An entire AGGREGATE procedure. */
136 struct agr_proc 
137   {
138     /* We have either an output file or a sink. */
139     struct any_writer *writer;          /* Output file, or null if none. */
140     struct case_sink *sink;             /* Sink, or null if none. */
141
142     /* Break variables. */
143     struct sort_criteria *sort;         /* Sort criteria. */
144     struct variable **break_vars;       /* Break variables. */
145     size_t break_var_cnt;               /* Number of break variables. */
146     struct ccase break_case;            /* Last values of break variables. */
147
148     enum missing_treatment missing;     /* How to treat missing values. */
149     struct agr_var *agr_vars;           /* First aggregate variable. */
150     struct dictionary *dict;            /* Aggregate dictionary. */
151     const struct dictionary *src_dict;  /* Dict of the source */
152     int case_cnt;                       /* Counts aggregated cases. */
153     struct ccase agr_case;              /* Aggregate case for output. */
154   };
155
156 static void initialize_aggregate_info (struct agr_proc *,
157                                        const struct ccase *);
158
159 /* Prototypes. */
160 static bool parse_aggregate_functions (struct lexer *, const struct dictionary *,
161                                        struct agr_proc *);
162 static void agr_destroy (struct agr_proc *);
163 static bool aggregate_single_case (struct agr_proc *agr,
164                                    const struct ccase *input,
165                                    struct ccase *output);
166 static void dump_aggregate_info (struct agr_proc *agr, struct ccase *output);
167 \f
168 /* Parsing. */
169
170 /* Parses and executes the AGGREGATE procedure. */
171 int
172 cmd_aggregate (struct lexer *lexer, struct dataset *ds)
173 {
174   struct dictionary *dict = dataset_dict (ds);
175   struct agr_proc agr;
176   struct file_handle *out_file = NULL;
177
178   bool copy_documents = false;
179   bool presorted = false;
180   bool saw_direction;
181
182   memset(&agr, 0 , sizeof (agr));
183   agr.missing = ITEMWISE;
184   case_nullify (&agr.break_case);
185
186   agr.dict = dict_create ();
187   agr.src_dict = dict;
188   dict_set_label (agr.dict, dict_get_label (dict));
189   dict_set_documents (agr.dict, dict_get_documents (dict));
190
191   /* OUTFILE subcommand must be first. */
192   if (!lex_force_match_id (lexer, "OUTFILE"))
193     goto error;
194   lex_match (lexer, '=');
195   if (!lex_match (lexer, '*'))
196     {
197       out_file = fh_parse (lexer, FH_REF_FILE | FH_REF_SCRATCH);
198       if (out_file == NULL)
199         goto error;
200     }
201   
202   /* Read most of the subcommands. */
203   for (;;)
204     {
205       lex_match (lexer, '/');
206       
207       if (lex_match_id (lexer, "MISSING"))
208         {
209           lex_match (lexer, '=');
210           if (!lex_match_id (lexer, "COLUMNWISE"))
211             {
212               lex_error (lexer, _("while expecting COLUMNWISE"));
213               goto error;
214             }
215           agr.missing = COLUMNWISE;
216         }
217       else if (lex_match_id (lexer, "DOCUMENT"))
218         copy_documents = true;
219       else if (lex_match_id (lexer, "PRESORTED"))
220         presorted = true;
221       else if (lex_match_id (lexer, "BREAK"))
222         {
223           int i;
224
225           lex_match (lexer, '=');
226           agr.sort = sort_parse_criteria (lexer, dict,
227                                           &agr.break_vars, &agr.break_var_cnt,
228                                           &saw_direction, NULL);
229           if (agr.sort == NULL)
230             goto error;
231           
232           for (i = 0; i < agr.break_var_cnt; i++)
233             dict_clone_var_assert (agr.dict, agr.break_vars[i],
234                                    var_get_name (agr.break_vars[i]));
235
236           /* BREAK must follow the options. */
237           break;
238         }
239       else
240         {
241           lex_error (lexer, _("expecting BREAK"));
242           goto error;
243         }
244     }
245   if (presorted && saw_direction)
246     msg (SW, _("When PRESORTED is specified, specifying sorting directions "
247                "with (A) or (D) has no effect.  Output data will be sorted "
248                "the same way as the input data."));
249       
250   /* Read in the aggregate functions. */
251   lex_match (lexer, '/');
252   if (!parse_aggregate_functions (lexer, dict, &agr))
253     goto error;
254
255   /* Delete documents. */
256   if (!copy_documents)
257     dict_set_documents (agr.dict, NULL);
258
259   /* Cancel SPLIT FILE. */
260   dict_set_split_vars (agr.dict, NULL, 0);
261   
262   /* Initialize. */
263   agr.case_cnt = 0;
264   case_create (&agr.agr_case, dict_get_next_value_idx (agr.dict));
265
266   /* Output to active file or external file? */
267   if (out_file == NULL) 
268     {
269       struct ccase *c;
270       
271       /* The active file will be replaced by the aggregated data,
272          so TEMPORARY is moot. */
273       proc_cancel_temporary_transformations (ds);
274
275       if (agr.sort != NULL && !presorted) 
276         {
277           if (!sort_active_file_in_place (ds, agr.sort))
278             goto error;
279         }
280
281       agr.sink = create_case_sink (&storage_sink_class, agr.dict,
282                                    dataset_get_casefile_factory (ds),
283                                    NULL);
284       if (agr.sink->class->open != NULL)
285         agr.sink->class->open (agr.sink);
286       proc_set_sink (ds, 
287                      create_case_sink (&null_sink_class, dict,
288                                        dataset_get_casefile_factory (ds),
289                                        NULL));
290       proc_open (ds);
291       while (proc_read (ds, &c))
292         if (aggregate_single_case (&agr, c, &agr.agr_case)) 
293           if (!agr.sink->class->write (agr.sink, &agr.agr_case)) 
294             {
295               proc_close (ds);
296               goto error; 
297             }
298       if (!proc_close (ds))
299         goto error;
300
301       if (agr.case_cnt > 0) 
302         {
303           dump_aggregate_info (&agr, &agr.agr_case);
304           if (!agr.sink->class->write (agr.sink, &agr.agr_case))
305             goto error;
306         }
307       discard_variables (ds);
308       dataset_set_dict (ds, agr.dict);
309       agr.dict = NULL;
310       proc_set_source (ds, agr.sink->class->make_source (agr.sink));
311       free_case_sink (agr.sink);
312     }
313   else
314     {
315       agr.writer = any_writer_open (out_file, agr.dict);
316       if (agr.writer == NULL)
317         goto error;
318       
319       if (agr.sort != NULL && !presorted) 
320         {
321           /* Sorting is needed. */
322           struct casefile *dst;
323           struct casereader *reader;
324           struct ccase c;
325           bool ok = true;
326           
327           dst = sort_active_file_to_casefile (ds, agr.sort);
328           if (dst == NULL)
329             goto error;
330           reader = casefile_get_destructive_reader (dst);
331           while (ok && casereader_read_xfer (reader, &c)) 
332             {
333               if (aggregate_single_case (&agr, &c, &agr.agr_case)) 
334                 ok = any_writer_write (agr.writer, &agr.agr_case);
335               case_destroy (&c);
336             }
337           casereader_destroy (reader);
338           if (ok)
339             ok = !casefile_error (dst);
340           casefile_destroy (dst);
341           if (!ok)
342             goto error;
343         }
344       else 
345         {
346           /* Active file is already sorted. */
347           struct ccase *c;
348           
349           proc_open (ds);
350           while (proc_read (ds, &c))
351             if (aggregate_single_case (&agr, c, &agr.agr_case)) 
352               if (!any_writer_write (agr.writer, &agr.agr_case)) 
353                 {
354                   proc_close (ds);
355                   goto error;
356                 }
357           if (!proc_close (ds))
358             goto error;
359         }
360       
361       if (agr.case_cnt > 0) 
362         {
363           dump_aggregate_info (&agr, &agr.agr_case);
364           any_writer_write (agr.writer, &agr.agr_case);
365         }
366       if (any_writer_error (agr.writer))
367         goto error;
368     }
369   
370   agr_destroy (&agr);
371   return CMD_SUCCESS;
372
373 error:
374   agr_destroy (&agr);
375   return CMD_CASCADING_FAILURE;
376 }
377
378 /* Parse all the aggregate functions. */
379 static bool
380 parse_aggregate_functions (struct lexer *lexer, const struct dictionary *dict, struct agr_proc *agr)
381 {
382   struct agr_var *tail; /* Tail of linked list starting at agr->vars. */
383
384   /* Parse everything. */
385   tail = NULL;
386   for (;;)
387     {
388       char **dest;
389       char **dest_label;
390       size_t n_dest;
391       struct string function_name;
392
393       enum mv_class exclude;
394       const struct agr_func *function;
395       int func_index;
396
397       union agr_argument arg[2];
398
399       struct variable **src;
400       size_t n_src;
401
402       size_t i;
403
404       dest = NULL;
405       dest_label = NULL;
406       n_dest = 0;
407       src = NULL;
408       function = NULL;
409       n_src = 0;
410       arg[0].c = NULL;
411       arg[1].c = NULL;
412
413       /* Parse the list of target variables. */
414       while (!lex_match (lexer, '='))
415         {
416           size_t n_dest_prev = n_dest;
417           
418           if (!parse_DATA_LIST_vars (lexer, &dest, &n_dest,
419                                      PV_APPEND | PV_SINGLE | PV_NO_SCRATCH))
420             goto error;
421
422           /* Assign empty labels. */
423           {
424             int j;
425
426             dest_label = xnrealloc (dest_label, n_dest, sizeof *dest_label);
427             for (j = n_dest_prev; j < n_dest; j++)
428               dest_label[j] = NULL;
429           }
430
431
432           
433           if (lex_token (lexer) == T_STRING)
434             {
435               struct string label;
436               ds_init_string (&label, lex_tokstr (lexer));
437
438               ds_truncate (&label, 255);
439               dest_label[n_dest - 1] = ds_xstrdup (&label);
440               lex_get (lexer);
441               ds_destroy (&label);
442             }
443         }
444
445       /* Get the name of the aggregation function. */
446       if (lex_token (lexer) != T_ID)
447         {
448           lex_error (lexer, _("expecting aggregation function"));
449           goto error;
450         }
451
452       exclude = MV_ANY;
453
454       ds_init_string (&function_name, lex_tokstr (lexer));
455
456       ds_chomp (&function_name, '.');
457
458       if (lex_tokid(lexer)[strlen (lex_tokid (lexer)) - 1] == '.')
459         exclude = MV_SYSTEM;
460
461       for (function = agr_func_tab; function->name; function++)
462         if (!strcasecmp (function->name, ds_cstr (&function_name)))
463           break;
464       if (NULL == function->name)
465         {
466           msg (SE, _("Unknown aggregation function %s."), 
467                ds_cstr (&function_name));
468           goto error;
469         }
470       ds_destroy (&function_name);
471       func_index = function - agr_func_tab;
472       lex_get (lexer);
473
474       /* Check for leading lparen. */
475       if (!lex_match (lexer, '('))
476         {
477           if (func_index == N)
478             func_index = N_NO_VARS;
479           else if (func_index == NU)
480             func_index = NU_NO_VARS;
481           else
482             {
483               lex_error (lexer, _("expecting `('"));
484               goto error;
485             }
486         }
487       else
488         {
489           /* Parse list of source variables. */
490           {
491             int pv_opts = PV_NO_SCRATCH;
492
493             if (func_index == SUM || func_index == MEAN || func_index == SD)
494               pv_opts |= PV_NUMERIC;
495             else if (function->n_args)
496               pv_opts |= PV_SAME_TYPE;
497
498             if (!parse_variables (lexer, dict, &src, &n_src, pv_opts))
499               goto error;
500           }
501
502           /* Parse function arguments, for those functions that
503              require arguments. */
504           if (function->n_args != 0)
505             for (i = 0; i < function->n_args; i++)
506               {
507                 int type;
508             
509                 lex_match (lexer, ',');
510                 if (lex_token (lexer) == T_STRING)
511                   {
512                     arg[i].c = ds_xstrdup (lex_tokstr (lexer));
513                     type = VAR_STRING;
514                   }
515                 else if (lex_is_number (lexer))
516                   {
517                     arg[i].f = lex_tokval (lexer);
518                     type = VAR_NUMERIC;
519                   }
520                 else
521                   {
522                     msg (SE, _("Missing argument %d to %s."), i + 1,
523                          function->name);
524                     goto error;
525                   }
526             
527                 lex_get (lexer);
528
529                 if (type != var_get_type (src[0]))
530                   {
531                     msg (SE, _("Arguments to %s must be of same type as "
532                                "source variables."),
533                          function->name);
534                     goto error;
535                   }
536               }
537
538           /* Trailing rparen. */
539           if (!lex_match (lexer, ')'))
540             {
541               lex_error (lexer, _("expecting `)'"));
542               goto error;
543             }
544           
545           /* Now check that the number of source variables match
546              the number of target variables.  If we check earlier
547              than this, the user can get very misleading error
548              message, i.e. `AGGREGATE x=SUM(y t).' will get this
549              error message when a proper message would be more
550              like `unknown variable t'. */
551           if (n_src != n_dest)
552             {
553               msg (SE, _("Number of source variables (%u) does not match "
554                          "number of target variables (%u)."),
555                    (unsigned) n_src, (unsigned) n_dest);
556               goto error;
557             }
558
559           if ((func_index == PIN || func_index == POUT
560               || func_index == FIN || func_index == FOUT) 
561               && (var_is_numeric (src[0])
562                   ? arg[0].f > arg[1].f
563                   : str_compare_rpad (arg[0].c, arg[1].c) > 0))
564             {
565               union agr_argument t = arg[0];
566               arg[0] = arg[1];
567               arg[1] = t;
568                   
569               msg (SW, _("The value arguments passed to the %s function "
570                          "are out-of-order.  They will be treated as if "
571                          "they had been specified in the correct order."),
572                    function->name);
573             }
574         }
575         
576       /* Finally add these to the linked list of aggregation
577          variables. */
578       for (i = 0; i < n_dest; i++)
579         {
580           struct agr_var *v = xmalloc (sizeof *v);
581
582           /* Add variable to chain. */
583           if (agr->agr_vars != NULL)
584             tail->next = v;
585           else
586             agr->agr_vars = v;
587           tail = v;
588           tail->next = NULL;
589           v->moments = NULL;
590           
591           /* Create the target variable in the aggregate
592              dictionary. */
593           {
594             struct variable *destvar;
595             
596             v->function = func_index;
597
598             if (src)
599               {
600                 v->src = src[i];
601                 
602                 if (var_is_alpha (src[i]))
603                   {
604                     v->function |= FSTRING;
605                     v->string = xmalloc (var_get_width (src[i]));
606                   }
607
608                 if (function->alpha_type == VAR_STRING)
609                   destvar = dict_clone_var (agr->dict, v->src, dest[i]);
610                 else
611                   {
612                     assert (var_is_numeric (v->src)
613                             || function->alpha_type == VAR_NUMERIC);
614                     destvar = dict_create_var (agr->dict, dest[i], 0);
615                     if (destvar != NULL) 
616                       {
617                         struct fmt_spec f;
618                         if ((func_index == N || func_index == NMISS)
619                             && dict_get_weight (dict) != NULL)
620                           f = fmt_for_output (FMT_F, 8, 2); 
621                         else
622                           f = function->format;
623                         var_set_both_formats (destvar, &f);
624                       }
625                   }
626               } else {
627                 struct fmt_spec f;
628                 v->src = NULL;
629                 destvar = dict_create_var (agr->dict, dest[i], 0);
630                 if (func_index == N_NO_VARS && dict_get_weight (dict) != NULL)
631                   f = fmt_for_output (FMT_F, 8, 2); 
632                 else
633                   f = function->format;
634                 var_set_both_formats (destvar, &f);
635               }
636           
637             if (!destvar)
638               {
639                 msg (SE, _("Variable name %s is not unique within the "
640                            "aggregate file dictionary, which contains "
641                            "the aggregate variables and the break "
642                            "variables."),
643                      dest[i]);
644                 goto error;
645               }
646
647             free (dest[i]);
648             if (dest_label[i])
649               var_set_label (destvar, dest_label[i]);
650
651             v->dest = destvar;
652           }
653           
654           v->exclude = exclude;
655
656           if (v->src != NULL)
657             {
658               int j;
659
660               if (var_is_numeric (v->src))
661                 for (j = 0; j < function->n_args; j++)
662                   v->arg[j].f = arg[j].f;
663               else
664                 for (j = 0; j < function->n_args; j++)
665                   v->arg[j].c = xstrdup (arg[j].c);
666             }
667         }
668       
669       if (src != NULL && var_is_alpha (src[0]))
670         for (i = 0; i < function->n_args; i++)
671           {
672             free (arg[i].c);
673             arg[i].c = NULL;
674           }
675
676       free (src);
677       free (dest);
678       free (dest_label);
679
680       if (!lex_match (lexer, '/'))
681         {
682           if (lex_token (lexer) == '.')
683             return true;
684
685           lex_error (lexer, "expecting end of command");
686           return false;
687         }
688       continue;
689       
690     error:
691       ds_destroy (&function_name);
692       for (i = 0; i < n_dest; i++)
693         {
694           free (dest[i]);
695           free (dest_label[i]);
696         }
697       free (dest);
698       free (dest_label);
699       free (arg[0].c);
700       free (arg[1].c);
701       if (src && n_src && var_is_alpha (src[0]))
702         for (i = 0; i < function->n_args; i++)
703           {
704             free (arg[i].c);
705             arg[i].c = NULL;
706           }
707       free (src);
708         
709       return false;
710     }
711 }
712
713 /* Destroys AGR. */
714 static void
715 agr_destroy (struct agr_proc *agr)
716 {
717   struct agr_var *iter, *next;
718
719   any_writer_close (agr->writer);
720   if (agr->sort != NULL)
721     sort_destroy_criteria (agr->sort);
722   free (agr->break_vars);
723   case_destroy (&agr->break_case);
724   for (iter = agr->agr_vars; iter; iter = next)
725     {
726       next = iter->next;
727
728       if (iter->function & FSTRING)
729         {
730           size_t n_args;
731           size_t i;
732
733           n_args = agr_func_tab[iter->function & FUNC].n_args;
734           for (i = 0; i < n_args; i++)
735             free (iter->arg[i].c);
736           free (iter->string);
737         }
738       else if (iter->function == SD)
739         moments1_destroy (iter->moments);
740       free (iter);
741     }
742   if (agr->dict != NULL)
743     dict_destroy (agr->dict);
744
745   case_destroy (&agr->agr_case);
746 }
747 \f
748 /* Execution. */
749
750 static void accumulate_aggregate_info (struct agr_proc *,
751                                        const struct ccase *);
752 static void dump_aggregate_info (struct agr_proc *, struct ccase *);
753
754 /* Processes a single case INPUT for aggregation.  If output is
755    warranted, writes it to OUTPUT and returns true.
756    Otherwise, returns false and OUTPUT is unmodified. */
757 static bool
758 aggregate_single_case (struct agr_proc *agr,
759                        const struct ccase *input, struct ccase *output)
760 {
761   bool finished_group = false;
762   
763   if (agr->case_cnt++ == 0)
764     initialize_aggregate_info (agr, input);
765   else if (case_compare (&agr->break_case, input,
766                          agr->break_vars, agr->break_var_cnt))
767     {
768       dump_aggregate_info (agr, output);
769       finished_group = true;
770
771       initialize_aggregate_info (agr, input);
772     }
773
774   accumulate_aggregate_info (agr, input);
775   return finished_group;
776 }
777
778 /* Accumulates aggregation data from the case INPUT. */
779 static void 
780 accumulate_aggregate_info (struct agr_proc *agr,
781                            const struct ccase *input)
782 {
783   struct agr_var *iter;
784   double weight;
785   bool bad_warn = true;
786
787   weight = dict_get_case_weight (agr->src_dict, input, &bad_warn);
788
789   for (iter = agr->agr_vars; iter; iter = iter->next)
790     if (iter->src)
791       {
792         const union value *v = case_data (input, iter->src);
793         int src_width = var_get_width (iter->src);
794
795         if (var_is_value_missing (iter->src, v, iter->exclude))
796           {
797             switch (iter->function)
798               {
799               case NMISS:
800               case NMISS | FSTRING:
801                 iter->dbl[0] += weight;
802                 break;
803               case NUMISS:
804               case NUMISS | FSTRING:
805                 iter->int1++;
806                 break;
807               }
808             iter->saw_missing = true;
809             continue;
810           }
811         
812         /* This is horrible.  There are too many possibilities. */
813         switch (iter->function)
814           {
815           case SUM:
816             iter->dbl[0] += v->f * weight;
817             iter->int1 = 1;
818             break;
819           case MEAN:
820             iter->dbl[0] += v->f * weight;
821             iter->dbl[1] += weight;
822             break;
823           case SD:
824             moments1_add (iter->moments, v->f, weight);
825             break;
826           case MAX:
827             iter->dbl[0] = MAX (iter->dbl[0], v->f);
828             iter->int1 = 1;
829             break;
830           case MAX | FSTRING:
831             if (memcmp (iter->string, v->s, src_width) < 0)
832               memcpy (iter->string, v->s, src_width);
833             iter->int1 = 1;
834             break;
835           case MIN:
836             iter->dbl[0] = MIN (iter->dbl[0], v->f);
837             iter->int1 = 1;
838             break;
839           case MIN | FSTRING:
840             if (memcmp (iter->string, v->s, src_width) > 0)
841               memcpy (iter->string, v->s, src_width);
842             iter->int1 = 1;
843             break;
844           case FGT:
845           case PGT:
846             if (v->f > iter->arg[0].f)
847               iter->dbl[0] += weight;
848             iter->dbl[1] += weight;
849             break;
850           case FGT | FSTRING:
851           case PGT | FSTRING:
852             if (memcmp (iter->arg[0].c, v->s, src_width) < 0)
853               iter->dbl[0] += weight;
854             iter->dbl[1] += weight;
855             break;
856           case FLT:
857           case PLT:
858             if (v->f < iter->arg[0].f)
859               iter->dbl[0] += weight;
860             iter->dbl[1] += weight;
861             break;
862           case FLT | FSTRING:
863           case PLT | FSTRING:
864             if (memcmp (iter->arg[0].c, v->s, src_width) > 0)
865               iter->dbl[0] += weight;
866             iter->dbl[1] += weight;
867             break;
868           case FIN:
869           case PIN:
870             if (iter->arg[0].f <= v->f && v->f <= iter->arg[1].f)
871               iter->dbl[0] += weight;
872             iter->dbl[1] += weight;
873             break;
874           case FIN | FSTRING:
875           case PIN | FSTRING:
876             if (memcmp (iter->arg[0].c, v->s, src_width) <= 0
877                 && memcmp (iter->arg[1].c, v->s, src_width) >= 0)
878               iter->dbl[0] += weight;
879             iter->dbl[1] += weight;
880             break;
881           case FOUT:
882           case POUT:
883             if (iter->arg[0].f > v->f || v->f > iter->arg[1].f)
884               iter->dbl[0] += weight;
885             iter->dbl[1] += weight;
886             break;
887           case FOUT | FSTRING:
888           case POUT | FSTRING:
889             if (memcmp (iter->arg[0].c, v->s, src_width) > 0
890                 || memcmp (iter->arg[1].c, v->s, src_width) < 0)
891               iter->dbl[0] += weight;
892             iter->dbl[1] += weight;
893             break;
894           case N:
895           case N | FSTRING:
896             iter->dbl[0] += weight;
897             break;
898           case NU:
899           case NU | FSTRING:
900             iter->int1++;
901             break;
902           case FIRST:
903             if (iter->int1 == 0)
904               {
905                 iter->dbl[0] = v->f;
906                 iter->int1 = 1;
907               }
908             break;
909           case FIRST | FSTRING:
910             if (iter->int1 == 0)
911               {
912                 memcpy (iter->string, v->s, src_width);
913                 iter->int1 = 1;
914               }
915             break;
916           case LAST:
917             iter->dbl[0] = v->f;
918             iter->int1 = 1;
919             break;
920           case LAST | FSTRING:
921             memcpy (iter->string, v->s, src_width);
922             iter->int1 = 1;
923             break;
924           case NMISS:
925           case NMISS | FSTRING:
926           case NUMISS:
927           case NUMISS | FSTRING:
928             /* Our value is not missing or it would have been
929                caught earlier.  Nothing to do. */
930             break;
931           default:
932             NOT_REACHED ();
933           }
934     } else {
935       switch (iter->function)
936         {
937         case N_NO_VARS:
938           iter->dbl[0] += weight;
939           break;
940         case NU_NO_VARS:
941           iter->int1++;
942           break;
943         default:
944           NOT_REACHED ();
945         }
946     }
947 }
948
949 /* We've come to a record that differs from the previous in one or
950    more of the break variables.  Make an output record from the
951    accumulated statistics in the OUTPUT case. */
952 static void 
953 dump_aggregate_info (struct agr_proc *agr, struct ccase *output)
954 {
955   {
956     int value_idx = 0;
957     int i;
958
959     for (i = 0; i < agr->break_var_cnt; i++) 
960       {
961         struct variable *v = agr->break_vars[i];
962         size_t value_cnt = var_get_value_cnt (v);
963         memcpy (case_data_rw_idx (output, value_idx),
964                 case_data (&agr->break_case, v),
965                 sizeof (union value) * value_cnt);
966         value_idx += value_cnt; 
967       }
968   }
969   
970   {
971     struct agr_var *i;
972   
973     for (i = agr->agr_vars; i; i = i->next)
974       {
975         union value *v = case_data_rw (output, i->dest);
976
977         if (agr->missing == COLUMNWISE && i->saw_missing
978             && (i->function & FUNC) != N && (i->function & FUNC) != NU
979             && (i->function & FUNC) != NMISS && (i->function & FUNC) != NUMISS)
980           {
981             if (var_is_alpha (i->dest))
982               memset (v->s, ' ', var_get_width (i->dest));
983             else
984               v->f = SYSMIS;
985             continue;
986           }
987         
988         switch (i->function)
989           {
990           case SUM:
991             v->f = i->int1 ? i->dbl[0] : SYSMIS;
992             break;
993           case MEAN:
994             v->f = i->dbl[1] != 0.0 ? i->dbl[0] / i->dbl[1] : SYSMIS;
995             break;
996           case SD:
997             {
998               double variance;
999
1000               /* FIXME: we should use two passes. */
1001               moments1_calculate (i->moments, NULL, NULL, &variance,
1002                                  NULL, NULL);
1003               if (variance != SYSMIS)
1004                 v->f = sqrt (variance);
1005               else
1006                 v->f = SYSMIS; 
1007             }
1008             break;
1009           case MAX:
1010           case MIN:
1011             v->f = i->int1 ? i->dbl[0] : SYSMIS;
1012             break;
1013           case MAX | FSTRING:
1014           case MIN | FSTRING:
1015             if (i->int1)
1016               memcpy (v->s, i->string, var_get_width (i->dest));
1017             else
1018               memset (v->s, ' ', var_get_width (i->dest));
1019             break;
1020           case FGT:
1021           case FGT | FSTRING:
1022           case FLT:
1023           case FLT | FSTRING:
1024           case FIN:
1025           case FIN | FSTRING:
1026           case FOUT:
1027           case FOUT | FSTRING:
1028             v->f = i->dbl[1] ? i->dbl[0] / i->dbl[1] : SYSMIS;
1029             break;
1030           case PGT:
1031           case PGT | FSTRING:
1032           case PLT:
1033           case PLT | FSTRING:
1034           case PIN:
1035           case PIN | FSTRING:
1036           case POUT:
1037           case POUT | FSTRING:
1038             v->f = i->dbl[1] ? i->dbl[0] / i->dbl[1] * 100.0 : SYSMIS;
1039             break;
1040           case N:
1041           case N | FSTRING:
1042             v->f = i->dbl[0];
1043             break;
1044           case NU:
1045           case NU | FSTRING:
1046             v->f = i->int1;
1047             break;
1048           case FIRST:
1049           case LAST:
1050             v->f = i->int1 ? i->dbl[0] : SYSMIS;
1051             break;
1052           case FIRST | FSTRING:
1053           case LAST | FSTRING:
1054             if (i->int1)
1055               memcpy (v->s, i->string, var_get_width (i->dest));
1056             else
1057               memset (v->s, ' ', var_get_width (i->dest));
1058             break;
1059           case N_NO_VARS:
1060             v->f = i->dbl[0];
1061             break;
1062           case NU_NO_VARS:
1063             v->f = i->int1;
1064             break;
1065           case NMISS:
1066           case NMISS | FSTRING:
1067             v->f = i->dbl[0];
1068             break;
1069           case NUMISS:
1070           case NUMISS | FSTRING:
1071             v->f = i->int1;
1072             break;
1073           default:
1074             NOT_REACHED ();
1075           }
1076       }
1077   }
1078 }
1079
1080 /* Resets the state for all the aggregate functions. */
1081 static void
1082 initialize_aggregate_info (struct agr_proc *agr, const struct ccase *input)
1083 {
1084   struct agr_var *iter;
1085
1086   case_destroy (&agr->break_case);
1087   case_clone (&agr->break_case, input);
1088
1089   for (iter = agr->agr_vars; iter; iter = iter->next)
1090     {
1091       iter->saw_missing = false;
1092       iter->dbl[0] = iter->dbl[1] = iter->dbl[2] = 0.0;
1093       iter->int1 = iter->int2 = 0;
1094       switch (iter->function)
1095         {
1096         case MIN:
1097           iter->dbl[0] = DBL_MAX;
1098           break;
1099         case MIN | FSTRING:
1100           memset (iter->string, 255, var_get_width (iter->src));
1101           break;
1102         case MAX:
1103           iter->dbl[0] = -DBL_MAX;
1104           break;
1105         case MAX | FSTRING:
1106           memset (iter->string, 0, var_get_width (iter->src));
1107           break;
1108         case SD:
1109           if (iter->moments == NULL)
1110             iter->moments = moments1_create (MOMENT_VARIANCE);
1111           else
1112             moments1_clear (iter->moments);
1113           break;
1114         default:
1115           break;
1116         }
1117     }
1118 }