79d90fdd9d9fb58b6e18e0cd9c5de7a539705417
[pspp-builds.git] / src / language / stats / aggregate.c
1 /* PSPP - computes sample statistics.
2    Copyright (C) 1997-9, 2000, 2006 Free Software Foundation, Inc.
3
4    This program is free software; you can redistribute it and/or
5    modify it under the terms of the GNU General Public License as
6    published by the Free Software Foundation; either version 2 of the
7    License, or (at your option) any later version.
8
9    This program is distributed in the hope that it will be useful, but
10    WITHOUT ANY WARRANTY; without even the implied warranty of
11    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12    General Public License for more details.
13
14    You should have received a copy of the GNU General Public License
15    along with this program; if not, write to the Free Software
16    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
17    02110-1301, USA. */
18
19 #include <config.h>
20
21 #include <stdlib.h>
22
23 #include <data/any-writer.h>
24 #include <data/case-sink.h>
25 #include <data/case.h>
26 #include <data/casefile.h>
27 #include <data/dictionary.h>
28 #include <data/file-handle-def.h>
29 #include <data/format.h>
30 #include <data/procedure.h>
31 #include <data/settings.h>
32 #include <data/storage-stream.h>
33 #include <data/sys-file-writer.h>
34 #include <data/variable.h>
35 #include <language/command.h>
36 #include <language/data-io/file-handle.h>
37 #include <language/lexer/lexer.h>
38 #include <language/lexer/variable-parser.h>
39 #include <language/stats/sort-criteria.h>
40 #include <libpspp/alloc.h>
41 #include <libpspp/assertion.h>
42 #include <libpspp/message.h>
43 #include <libpspp/misc.h>
44 #include <libpspp/pool.h>
45 #include <libpspp/str.h>
46 #include <math/moments.h>
47 #include <math/sort.h>
48
49 #include "minmax.h"
50
51 #include "gettext.h"
52 #define _(msgid) gettext (msgid)
53
54 /* Argument for AGGREGATE function. */
55 union agr_argument
56   {
57     double f;                           /* Numeric. */
58     char *c;                            /* Short or long string. */
59   };
60
61 /* Specifies how to make an aggregate variable. */
62 struct agr_var
63   {
64     struct agr_var *next;               /* Next in list. */
65
66     /* Collected during parsing. */
67     struct variable *src;       /* Source variable. */
68     struct variable *dest;      /* Target variable. */
69     int function;               /* Function. */
70     int include_missing;        /* 1=Include user-missing values. */
71     union agr_argument arg[2];  /* Arguments. */
72
73     /* Accumulated during AGGREGATE execution. */
74     double dbl[3];
75     int int1, int2;
76     char *string;
77     int missing;
78     struct moments1 *moments;
79   };
80
81 /* Aggregation functions. */
82 enum
83   {
84     NONE, SUM, MEAN, SD, MAX, MIN, PGT, PLT, PIN, POUT, FGT, FLT, FIN,
85     FOUT, N, NU, NMISS, NUMISS, FIRST, LAST,
86     N_AGR_FUNCS, N_NO_VARS, NU_NO_VARS,
87     FUNC = 0x1f, /* Function mask. */
88     FSTRING = 1<<5, /* String function bit. */
89   };
90
91 /* Attributes of an aggregation function. */
92 struct agr_func
93   {
94     const char *name;           /* Aggregation function name. */
95     size_t n_args;              /* Number of arguments. */
96     enum var_type alpha_type;   /* When given ALPHA arguments, output type. */
97     struct fmt_spec format;     /* Format spec if alpha_type != ALPHA. */
98   };
99
100 /* Attributes of aggregation functions. */
101 static const struct agr_func agr_func_tab[] = 
102   {
103     {"<NONE>",  0, -1,          {0, 0, 0}},
104     {"SUM",     0, -1,          {FMT_F, 8, 2}},
105     {"MEAN",    0, -1,          {FMT_F, 8, 2}},
106     {"SD",      0, -1,          {FMT_F, 8, 2}},
107     {"MAX",     0, VAR_STRING,  {-1, -1, -1}}, 
108     {"MIN",     0, VAR_STRING,  {-1, -1, -1}}, 
109     {"PGT",     1, VAR_NUMERIC, {FMT_F, 5, 1}},      
110     {"PLT",     1, VAR_NUMERIC, {FMT_F, 5, 1}},       
111     {"PIN",     2, VAR_NUMERIC, {FMT_F, 5, 1}},       
112     {"POUT",    2, VAR_NUMERIC, {FMT_F, 5, 1}},       
113     {"FGT",     1, VAR_NUMERIC, {FMT_F, 5, 3}},       
114     {"FLT",     1, VAR_NUMERIC, {FMT_F, 5, 3}},       
115     {"FIN",     2, VAR_NUMERIC, {FMT_F, 5, 3}},       
116     {"FOUT",    2, VAR_NUMERIC, {FMT_F, 5, 3}},       
117     {"N",       0, VAR_NUMERIC, {FMT_F, 7, 0}},       
118     {"NU",      0, VAR_NUMERIC, {FMT_F, 7, 0}},       
119     {"NMISS",   0, VAR_NUMERIC, {FMT_F, 7, 0}},       
120     {"NUMISS",  0, VAR_NUMERIC, {FMT_F, 7, 0}},       
121     {"FIRST",   0, VAR_STRING,  {-1, -1, -1}}, 
122     {"LAST",    0, VAR_STRING,  {-1, -1, -1}},
123     {NULL,      0, -1,          {-1, -1, -1}},
124     {"N",       0, VAR_NUMERIC, {FMT_F, 7, 0}},
125     {"NU",      0, VAR_NUMERIC, {FMT_F, 7, 0}},
126   };
127
128 /* Missing value types. */
129 enum missing_treatment
130   {
131     ITEMWISE,           /* Missing values item by item. */
132     COLUMNWISE          /* Missing values column by column. */
133   };
134
135 /* An entire AGGREGATE procedure. */
136 struct agr_proc 
137   {
138     /* We have either an output file or a sink. */
139     struct any_writer *writer;          /* Output file, or null if none. */
140     struct case_sink *sink;             /* Sink, or null if none. */
141
142     /* Break variables. */
143     struct sort_criteria *sort;         /* Sort criteria. */
144     struct variable **break_vars;       /* Break variables. */
145     size_t break_var_cnt;               /* Number of break variables. */
146     struct ccase break_case;            /* Last values of break variables. */
147
148     enum missing_treatment missing;     /* How to treat missing values. */
149     struct agr_var *agr_vars;           /* First aggregate variable. */
150     struct dictionary *dict;            /* Aggregate dictionary. */
151     const struct dictionary *src_dict;  /* Dict of the source */
152     int case_cnt;                       /* Counts aggregated cases. */
153     struct ccase agr_case;              /* Aggregate case for output. */
154   };
155
156 static void initialize_aggregate_info (struct agr_proc *,
157                                        const struct ccase *);
158
159 /* Prototypes. */
160 static bool parse_aggregate_functions (struct lexer *, const struct dictionary *,
161                                        struct agr_proc *);
162 static void agr_destroy (struct agr_proc *);
163 static bool aggregate_single_case (struct agr_proc *agr,
164                                    const struct ccase *input,
165                                    struct ccase *output);
166 static void dump_aggregate_info (struct agr_proc *agr, struct ccase *output);
167 \f
168 /* Parsing. */
169
170 /* Parses and executes the AGGREGATE procedure. */
171 int
172 cmd_aggregate (struct lexer *lexer, struct dataset *ds)
173 {
174   struct dictionary *dict = dataset_dict (ds);
175   struct agr_proc agr;
176   struct file_handle *out_file = NULL;
177
178   bool copy_documents = false;
179   bool presorted = false;
180   bool saw_direction;
181
182   memset(&agr, 0 , sizeof (agr));
183   agr.missing = ITEMWISE;
184   case_nullify (&agr.break_case);
185
186   agr.dict = dict_create ();
187   agr.src_dict = dict;
188   dict_set_label (agr.dict, dict_get_label (dict));
189   dict_set_documents (agr.dict, dict_get_documents (dict));
190
191   /* OUTFILE subcommand must be first. */
192   if (!lex_force_match_id (lexer, "OUTFILE"))
193     goto error;
194   lex_match (lexer, '=');
195   if (!lex_match (lexer, '*'))
196     {
197       out_file = fh_parse (lexer, FH_REF_FILE | FH_REF_SCRATCH);
198       if (out_file == NULL)
199         goto error;
200     }
201   
202   /* Read most of the subcommands. */
203   for (;;)
204     {
205       lex_match (lexer, '/');
206       
207       if (lex_match_id (lexer, "MISSING"))
208         {
209           lex_match (lexer, '=');
210           if (!lex_match_id (lexer, "COLUMNWISE"))
211             {
212               lex_error (lexer, _("while expecting COLUMNWISE"));
213               goto error;
214             }
215           agr.missing = COLUMNWISE;
216         }
217       else if (lex_match_id (lexer, "DOCUMENT"))
218         copy_documents = true;
219       else if (lex_match_id (lexer, "PRESORTED"))
220         presorted = true;
221       else if (lex_match_id (lexer, "BREAK"))
222         {
223           int i;
224
225           lex_match (lexer, '=');
226           agr.sort = sort_parse_criteria (lexer, dict,
227                                           &agr.break_vars, &agr.break_var_cnt,
228                                           &saw_direction, NULL);
229           if (agr.sort == NULL)
230             goto error;
231           
232           for (i = 0; i < agr.break_var_cnt; i++)
233             dict_clone_var_assert (agr.dict, agr.break_vars[i],
234                                    var_get_name (agr.break_vars[i]));
235
236           /* BREAK must follow the options. */
237           break;
238         }
239       else
240         {
241           lex_error (lexer, _("expecting BREAK"));
242           goto error;
243         }
244     }
245   if (presorted && saw_direction)
246     msg (SW, _("When PRESORTED is specified, specifying sorting directions "
247                "with (A) or (D) has no effect.  Output data will be sorted "
248                "the same way as the input data."));
249       
250   /* Read in the aggregate functions. */
251   lex_match (lexer, '/');
252   if (!parse_aggregate_functions (lexer, dict, &agr))
253     goto error;
254
255   /* Delete documents. */
256   if (!copy_documents)
257     dict_set_documents (agr.dict, NULL);
258
259   /* Cancel SPLIT FILE. */
260   dict_set_split_vars (agr.dict, NULL, 0);
261   
262   /* Initialize. */
263   agr.case_cnt = 0;
264   case_create (&agr.agr_case, dict_get_next_value_idx (agr.dict));
265
266   /* Output to active file or external file? */
267   if (out_file == NULL) 
268     {
269       struct ccase *c;
270       
271       /* The active file will be replaced by the aggregated data,
272          so TEMPORARY is moot. */
273       proc_cancel_temporary_transformations (ds);
274
275       if (agr.sort != NULL && !presorted) 
276         {
277           if (!sort_active_file_in_place (ds, agr.sort))
278             goto error;
279         }
280
281       agr.sink = create_case_sink (&storage_sink_class, agr.dict, NULL);
282       if (agr.sink->class->open != NULL)
283         agr.sink->class->open (agr.sink);
284       proc_set_sink (ds, 
285                      create_case_sink (&null_sink_class, dict, NULL));
286       proc_open (ds);
287       while (proc_read (ds, &c))
288         if (aggregate_single_case (&agr, c, &agr.agr_case)) 
289           if (!agr.sink->class->write (agr.sink, &agr.agr_case)) 
290             {
291               proc_close (ds);
292               goto error; 
293             }
294       if (!proc_close (ds))
295         goto error;
296       if (agr.case_cnt > 0) 
297         {
298           dump_aggregate_info (&agr, &agr.agr_case);
299           if (!agr.sink->class->write (agr.sink, &agr.agr_case))
300             goto error;
301         }
302       discard_variables (ds);
303       dict_destroy (dict);
304       dataset_set_dict (ds, agr.dict);
305       agr.dict = NULL;
306       proc_set_source (ds, agr.sink->class->make_source (agr.sink));
307       free_case_sink (agr.sink);
308     }
309   else
310     {
311       agr.writer = any_writer_open (out_file, agr.dict);
312       if (agr.writer == NULL)
313         goto error;
314       
315       if (agr.sort != NULL && !presorted) 
316         {
317           /* Sorting is needed. */
318           struct casefile *dst;
319           struct casereader *reader;
320           struct ccase c;
321           bool ok = true;
322           
323           dst = sort_active_file_to_casefile (ds, agr.sort);
324           if (dst == NULL)
325             goto error;
326           reader = casefile_get_destructive_reader (dst);
327           while (ok && casereader_read_xfer (reader, &c)) 
328             {
329               if (aggregate_single_case (&agr, &c, &agr.agr_case)) 
330                 ok = any_writer_write (agr.writer, &agr.agr_case);
331               case_destroy (&c);
332             }
333           casereader_destroy (reader);
334           if (ok)
335             ok = !casefile_error (dst);
336           casefile_destroy (dst);
337           if (!ok)
338             goto error;
339         }
340       else 
341         {
342           /* Active file is already sorted. */
343           struct ccase *c;
344           
345           proc_open (ds);
346           while (proc_read (ds, &c))
347             if (aggregate_single_case (&agr, c, &agr.agr_case)) 
348               if (!any_writer_write (agr.writer, &agr.agr_case)) 
349                 {
350                   proc_close (ds);
351                   goto error;
352                 }
353           if (!proc_close (ds))
354             goto error;
355         }
356       
357       if (agr.case_cnt > 0) 
358         {
359           dump_aggregate_info (&agr, &agr.agr_case);
360           any_writer_write (agr.writer, &agr.agr_case);
361         }
362       if (any_writer_error (agr.writer))
363         goto error;
364     }
365   
366   agr_destroy (&agr);
367   return CMD_SUCCESS;
368
369 error:
370   agr_destroy (&agr);
371   return CMD_CASCADING_FAILURE;
372 }
373
374 /* Parse all the aggregate functions. */
375 static bool
376 parse_aggregate_functions (struct lexer *lexer, const struct dictionary *dict, struct agr_proc *agr)
377 {
378   struct agr_var *tail; /* Tail of linked list starting at agr->vars. */
379
380   /* Parse everything. */
381   tail = NULL;
382   for (;;)
383     {
384       char **dest;
385       char **dest_label;
386       size_t n_dest;
387       struct string function_name;
388
389       int include_missing;
390       const struct agr_func *function;
391       int func_index;
392
393       union agr_argument arg[2];
394
395       struct variable **src;
396       size_t n_src;
397
398       size_t i;
399
400       dest = NULL;
401       dest_label = NULL;
402       n_dest = 0;
403       src = NULL;
404       function = NULL;
405       n_src = 0;
406       arg[0].c = NULL;
407       arg[1].c = NULL;
408
409       /* Parse the list of target variables. */
410       while (!lex_match (lexer, '='))
411         {
412           size_t n_dest_prev = n_dest;
413           
414           if (!parse_DATA_LIST_vars (lexer, &dest, &n_dest,
415                                      PV_APPEND | PV_SINGLE | PV_NO_SCRATCH))
416             goto error;
417
418           /* Assign empty labels. */
419           {
420             int j;
421
422             dest_label = xnrealloc (dest_label, n_dest, sizeof *dest_label);
423             for (j = n_dest_prev; j < n_dest; j++)
424               dest_label[j] = NULL;
425           }
426
427
428           
429           if (lex_token (lexer) == T_STRING)
430             {
431               struct string label;
432               ds_init_string (&label, lex_tokstr (lexer));
433
434               ds_truncate (&label, 255);
435               dest_label[n_dest - 1] = ds_xstrdup (&label);
436               lex_get (lexer);
437               ds_destroy (&label);
438             }
439         }
440
441       /* Get the name of the aggregation function. */
442       if (lex_token (lexer) != T_ID)
443         {
444           lex_error (lexer, _("expecting aggregation function"));
445           goto error;
446         }
447
448       include_missing = 0;
449
450       ds_init_string (&function_name, lex_tokstr (lexer));
451
452       ds_chomp (&function_name, '.');
453
454       if (lex_tokid(lexer)[strlen (lex_tokid (lexer)) - 1] == '.')
455           include_missing = 1;
456
457       for (function = agr_func_tab; function->name; function++)
458         if (!strcasecmp (function->name, ds_cstr (&function_name)))
459           break;
460       if (NULL == function->name)
461         {
462           msg (SE, _("Unknown aggregation function %s."), 
463                ds_cstr (&function_name));
464           goto error;
465         }
466       ds_destroy (&function_name);
467       func_index = function - agr_func_tab;
468       lex_get (lexer);
469
470       /* Check for leading lparen. */
471       if (!lex_match (lexer, '('))
472         {
473           if (func_index == N)
474             func_index = N_NO_VARS;
475           else if (func_index == NU)
476             func_index = NU_NO_VARS;
477           else
478             {
479               lex_error (lexer, _("expecting `('"));
480               goto error;
481             }
482         }
483       else
484         {
485           /* Parse list of source variables. */
486           {
487             int pv_opts = PV_NO_SCRATCH;
488
489             if (func_index == SUM || func_index == MEAN || func_index == SD)
490               pv_opts |= PV_NUMERIC;
491             else if (function->n_args)
492               pv_opts |= PV_SAME_TYPE;
493
494             if (!parse_variables (lexer, dict, &src, &n_src, pv_opts))
495               goto error;
496           }
497
498           /* Parse function arguments, for those functions that
499              require arguments. */
500           if (function->n_args != 0)
501             for (i = 0; i < function->n_args; i++)
502               {
503                 int type;
504             
505                 lex_match (lexer, ',');
506                 if (lex_token (lexer) == T_STRING)
507                   {
508                     arg[i].c = ds_xstrdup (lex_tokstr (lexer));
509                     type = VAR_STRING;
510                   }
511                 else if (lex_is_number (lexer))
512                   {
513                     arg[i].f = lex_tokval (lexer);
514                     type = VAR_NUMERIC;
515                   }
516                 else
517                   {
518                     msg (SE, _("Missing argument %d to %s."), i + 1,
519                          function->name);
520                     goto error;
521                   }
522             
523                 lex_get (lexer);
524
525                 if (type != var_get_type (src[0]))
526                   {
527                     msg (SE, _("Arguments to %s must be of same type as "
528                                "source variables."),
529                          function->name);
530                     goto error;
531                   }
532               }
533
534           /* Trailing rparen. */
535           if (!lex_match (lexer, ')'))
536             {
537               lex_error (lexer, _("expecting `)'"));
538               goto error;
539             }
540           
541           /* Now check that the number of source variables match
542              the number of target variables.  If we check earlier
543              than this, the user can get very misleading error
544              message, i.e. `AGGREGATE x=SUM(y t).' will get this
545              error message when a proper message would be more
546              like `unknown variable t'. */
547           if (n_src != n_dest)
548             {
549               msg (SE, _("Number of source variables (%u) does not match "
550                          "number of target variables (%u)."),
551                    (unsigned) n_src, (unsigned) n_dest);
552               goto error;
553             }
554
555           if ((func_index == PIN || func_index == POUT
556               || func_index == FIN || func_index == FOUT) 
557               && (var_is_numeric (src[0])
558                   ? arg[0].f > arg[1].f
559                   : str_compare_rpad (arg[0].c, arg[1].c) > 0))
560             {
561               union agr_argument t = arg[0];
562               arg[0] = arg[1];
563               arg[1] = t;
564                   
565               msg (SW, _("The value arguments passed to the %s function "
566                          "are out-of-order.  They will be treated as if "
567                          "they had been specified in the correct order."),
568                    function->name);
569             }
570         }
571         
572       /* Finally add these to the linked list of aggregation
573          variables. */
574       for (i = 0; i < n_dest; i++)
575         {
576           struct agr_var *v = xmalloc (sizeof *v);
577
578           /* Add variable to chain. */
579           if (agr->agr_vars != NULL)
580             tail->next = v;
581           else
582             agr->agr_vars = v;
583           tail = v;
584           tail->next = NULL;
585           v->moments = NULL;
586           
587           /* Create the target variable in the aggregate
588              dictionary. */
589           {
590             struct variable *destvar;
591             
592             v->function = func_index;
593
594             if (src)
595               {
596                 v->src = src[i];
597                 
598                 if (var_is_alpha (src[i]))
599                   {
600                     v->function |= FSTRING;
601                     v->string = xmalloc (var_get_width (src[i]));
602                   }
603
604                 if (function->alpha_type == VAR_STRING)
605                   destvar = dict_clone_var (agr->dict, v->src, dest[i]);
606                 else
607                   {
608                     assert (var_is_numeric (v->src)
609                             || function->alpha_type == VAR_NUMERIC);
610                     destvar = dict_create_var (agr->dict, dest[i], 0);
611                     if (destvar != NULL) 
612                       {
613                         struct fmt_spec f;
614                         if ((func_index == N || func_index == NMISS)
615                             && dict_get_weight (dict) != NULL)
616                           f = fmt_for_output (FMT_F, 8, 2); 
617                         else
618                           f = function->format;
619                         var_set_both_formats (destvar, &f);
620                       }
621                   }
622               } else {
623                 struct fmt_spec f;
624                 v->src = NULL;
625                 destvar = dict_create_var (agr->dict, dest[i], 0);
626                 if (func_index == N_NO_VARS && dict_get_weight (dict) != NULL)
627                   f = fmt_for_output (FMT_F, 8, 2); 
628                 else
629                   f = function->format;
630                 var_set_both_formats (destvar, &f);
631               }
632           
633             if (!destvar)
634               {
635                 msg (SE, _("Variable name %s is not unique within the "
636                            "aggregate file dictionary, which contains "
637                            "the aggregate variables and the break "
638                            "variables."),
639                      dest[i]);
640                 goto error;
641               }
642
643             free (dest[i]);
644             if (dest_label[i])
645               var_set_label (destvar, dest_label[i]);
646
647             v->dest = destvar;
648           }
649           
650           v->include_missing = include_missing;
651
652           if (v->src != NULL)
653             {
654               int j;
655
656               if (var_is_numeric (v->src))
657                 for (j = 0; j < function->n_args; j++)
658                   v->arg[j].f = arg[j].f;
659               else
660                 for (j = 0; j < function->n_args; j++)
661                   v->arg[j].c = xstrdup (arg[j].c);
662             }
663         }
664       
665       if (src != NULL && var_is_alpha (src[0]))
666         for (i = 0; i < function->n_args; i++)
667           {
668             free (arg[i].c);
669             arg[i].c = NULL;
670           }
671
672       free (src);
673       free (dest);
674       free (dest_label);
675
676       if (!lex_match (lexer, '/'))
677         {
678           if (lex_token (lexer) == '.')
679             return true;
680
681           lex_error (lexer, "expecting end of command");
682           return false;
683         }
684       continue;
685       
686     error:
687       ds_destroy (&function_name);
688       for (i = 0; i < n_dest; i++)
689         {
690           free (dest[i]);
691           free (dest_label[i]);
692         }
693       free (dest);
694       free (dest_label);
695       free (arg[0].c);
696       free (arg[1].c);
697       if (src && n_src && var_is_alpha (src[0]))
698         for (i = 0; i < function->n_args; i++)
699           {
700             free (arg[i].c);
701             arg[i].c = NULL;
702           }
703       free (src);
704         
705       return false;
706     }
707 }
708
709 /* Destroys AGR. */
710 static void
711 agr_destroy (struct agr_proc *agr)
712 {
713   struct agr_var *iter, *next;
714
715   any_writer_close (agr->writer);
716   if (agr->sort != NULL)
717     sort_destroy_criteria (agr->sort);
718   free (agr->break_vars);
719   case_destroy (&agr->break_case);
720   for (iter = agr->agr_vars; iter; iter = next)
721     {
722       next = iter->next;
723
724       if (iter->function & FSTRING)
725         {
726           size_t n_args;
727           size_t i;
728
729           n_args = agr_func_tab[iter->function & FUNC].n_args;
730           for (i = 0; i < n_args; i++)
731             free (iter->arg[i].c);
732           free (iter->string);
733         }
734       else if (iter->function == SD)
735         moments1_destroy (iter->moments);
736       free (iter);
737     }
738   if (agr->dict != NULL)
739     dict_destroy (agr->dict);
740
741   case_destroy (&agr->agr_case);
742 }
743 \f
744 /* Execution. */
745
746 static void accumulate_aggregate_info (struct agr_proc *,
747                                        const struct ccase *);
748 static void dump_aggregate_info (struct agr_proc *, struct ccase *);
749
750 /* Processes a single case INPUT for aggregation.  If output is
751    warranted, writes it to OUTPUT and returns true.
752    Otherwise, returns false and OUTPUT is unmodified. */
753 static bool
754 aggregate_single_case (struct agr_proc *agr,
755                        const struct ccase *input, struct ccase *output)
756 {
757   bool finished_group = false;
758   
759   if (agr->case_cnt++ == 0)
760     initialize_aggregate_info (agr, input);
761   else if (case_compare (&agr->break_case, input,
762                          agr->break_vars, agr->break_var_cnt))
763     {
764       dump_aggregate_info (agr, output);
765       finished_group = true;
766
767       initialize_aggregate_info (agr, input);
768     }
769
770   accumulate_aggregate_info (agr, input);
771   return finished_group;
772 }
773
774 /* Accumulates aggregation data from the case INPUT. */
775 static void 
776 accumulate_aggregate_info (struct agr_proc *agr,
777                            const struct ccase *input)
778 {
779   struct agr_var *iter;
780   double weight;
781   bool bad_warn = true;
782
783   weight = dict_get_case_weight (agr->src_dict, input, &bad_warn);
784
785   for (iter = agr->agr_vars; iter; iter = iter->next)
786     if (iter->src)
787       {
788         const union value *v = case_data (input, iter->src);
789         int src_width = var_get_width (iter->src);
790
791         if (iter->include_missing
792             ? var_is_numeric (iter->src) && v->f == SYSMIS
793             : var_is_value_missing (iter->src, v))
794           {
795             switch (iter->function)
796               {
797               case NMISS:
798               case NMISS | FSTRING:
799                 iter->dbl[0] += weight;
800                 break;
801               case NUMISS:
802               case NUMISS | FSTRING:
803                 iter->int1++;
804                 break;
805               }
806             iter->missing = 1;
807             continue;
808           }
809         
810         /* This is horrible.  There are too many possibilities. */
811         switch (iter->function)
812           {
813           case SUM:
814             iter->dbl[0] += v->f * weight;
815             iter->int1 = 1;
816             break;
817           case MEAN:
818             iter->dbl[0] += v->f * weight;
819             iter->dbl[1] += weight;
820             break;
821           case SD:
822             moments1_add (iter->moments, v->f, weight);
823             break;
824           case MAX:
825             iter->dbl[0] = MAX (iter->dbl[0], v->f);
826             iter->int1 = 1;
827             break;
828           case MAX | FSTRING:
829             if (memcmp (iter->string, v->s, src_width) < 0)
830               memcpy (iter->string, v->s, src_width);
831             iter->int1 = 1;
832             break;
833           case MIN:
834             iter->dbl[0] = MIN (iter->dbl[0], v->f);
835             iter->int1 = 1;
836             break;
837           case MIN | FSTRING:
838             if (memcmp (iter->string, v->s, src_width) > 0)
839               memcpy (iter->string, v->s, src_width);
840             iter->int1 = 1;
841             break;
842           case FGT:
843           case PGT:
844             if (v->f > iter->arg[0].f)
845               iter->dbl[0] += weight;
846             iter->dbl[1] += weight;
847             break;
848           case FGT | FSTRING:
849           case PGT | FSTRING:
850             if (memcmp (iter->arg[0].c, v->s, src_width) < 0)
851               iter->dbl[0] += weight;
852             iter->dbl[1] += weight;
853             break;
854           case FLT:
855           case PLT:
856             if (v->f < iter->arg[0].f)
857               iter->dbl[0] += weight;
858             iter->dbl[1] += weight;
859             break;
860           case FLT | FSTRING:
861           case PLT | FSTRING:
862             if (memcmp (iter->arg[0].c, v->s, src_width) > 0)
863               iter->dbl[0] += weight;
864             iter->dbl[1] += weight;
865             break;
866           case FIN:
867           case PIN:
868             if (iter->arg[0].f <= v->f && v->f <= iter->arg[1].f)
869               iter->dbl[0] += weight;
870             iter->dbl[1] += weight;
871             break;
872           case FIN | FSTRING:
873           case PIN | FSTRING:
874             if (memcmp (iter->arg[0].c, v->s, src_width) <= 0
875                 && memcmp (iter->arg[1].c, v->s, src_width) >= 0)
876               iter->dbl[0] += weight;
877             iter->dbl[1] += weight;
878             break;
879           case FOUT:
880           case POUT:
881             if (iter->arg[0].f > v->f || v->f > iter->arg[1].f)
882               iter->dbl[0] += weight;
883             iter->dbl[1] += weight;
884             break;
885           case FOUT | FSTRING:
886           case POUT | FSTRING:
887             if (memcmp (iter->arg[0].c, v->s, src_width) > 0
888                 || memcmp (iter->arg[1].c, v->s, src_width) < 0)
889               iter->dbl[0] += weight;
890             iter->dbl[1] += weight;
891             break;
892           case N:
893           case N | FSTRING:
894             iter->dbl[0] += weight;
895             break;
896           case NU:
897           case NU | FSTRING:
898             iter->int1++;
899             break;
900           case FIRST:
901             if (iter->int1 == 0)
902               {
903                 iter->dbl[0] = v->f;
904                 iter->int1 = 1;
905               }
906             break;
907           case FIRST | FSTRING:
908             if (iter->int1 == 0)
909               {
910                 memcpy (iter->string, v->s, src_width);
911                 iter->int1 = 1;
912               }
913             break;
914           case LAST:
915             iter->dbl[0] = v->f;
916             iter->int1 = 1;
917             break;
918           case LAST | FSTRING:
919             memcpy (iter->string, v->s, src_width);
920             iter->int1 = 1;
921             break;
922           case NMISS:
923           case NMISS | FSTRING:
924           case NUMISS:
925           case NUMISS | FSTRING:
926             /* Our value is not missing or it would have been
927                caught earlier.  Nothing to do. */
928             break;
929           default:
930             NOT_REACHED ();
931           }
932     } else {
933       switch (iter->function)
934         {
935         case N_NO_VARS:
936           iter->dbl[0] += weight;
937           break;
938         case NU_NO_VARS:
939           iter->int1++;
940           break;
941         default:
942           NOT_REACHED ();
943         }
944     }
945 }
946
947 /* We've come to a record that differs from the previous in one or
948    more of the break variables.  Make an output record from the
949    accumulated statistics in the OUTPUT case. */
950 static void 
951 dump_aggregate_info (struct agr_proc *agr, struct ccase *output)
952 {
953   {
954     int value_idx = 0;
955     int i;
956
957     for (i = 0; i < agr->break_var_cnt; i++) 
958       {
959         struct variable *v = agr->break_vars[i];
960         size_t value_cnt = var_get_value_cnt (v);
961         memcpy (case_data_rw_idx (output, value_idx),
962                 case_data (&agr->break_case, v),
963                 sizeof (union value) * value_cnt);
964         value_idx += value_cnt; 
965       }
966   }
967   
968   {
969     struct agr_var *i;
970   
971     for (i = agr->agr_vars; i; i = i->next)
972       {
973         union value *v = case_data_rw (output, i->dest);
974
975         if (agr->missing == COLUMNWISE && i->missing != 0
976             && (i->function & FUNC) != N && (i->function & FUNC) != NU
977             && (i->function & FUNC) != NMISS && (i->function & FUNC) != NUMISS)
978           {
979             if (var_is_alpha (i->dest))
980               memset (v->s, ' ', var_get_width (i->dest));
981             else
982               v->f = SYSMIS;
983             continue;
984           }
985         
986         switch (i->function)
987           {
988           case SUM:
989             v->f = i->int1 ? i->dbl[0] : SYSMIS;
990             break;
991           case MEAN:
992             v->f = i->dbl[1] != 0.0 ? i->dbl[0] / i->dbl[1] : SYSMIS;
993             break;
994           case SD:
995             {
996               double variance;
997
998               /* FIXME: we should use two passes. */
999               moments1_calculate (i->moments, NULL, NULL, &variance,
1000                                  NULL, NULL);
1001               if (variance != SYSMIS)
1002                 v->f = sqrt (variance);
1003               else
1004                 v->f = SYSMIS; 
1005             }
1006             break;
1007           case MAX:
1008           case MIN:
1009             v->f = i->int1 ? i->dbl[0] : SYSMIS;
1010             break;
1011           case MAX | FSTRING:
1012           case MIN | FSTRING:
1013             if (i->int1)
1014               memcpy (v->s, i->string, var_get_width (i->dest));
1015             else
1016               memset (v->s, ' ', var_get_width (i->dest));
1017             break;
1018           case FGT:
1019           case FGT | FSTRING:
1020           case FLT:
1021           case FLT | FSTRING:
1022           case FIN:
1023           case FIN | FSTRING:
1024           case FOUT:
1025           case FOUT | FSTRING:
1026             v->f = i->dbl[1] ? i->dbl[0] / i->dbl[1] : SYSMIS;
1027             break;
1028           case PGT:
1029           case PGT | FSTRING:
1030           case PLT:
1031           case PLT | FSTRING:
1032           case PIN:
1033           case PIN | FSTRING:
1034           case POUT:
1035           case POUT | FSTRING:
1036             v->f = i->dbl[1] ? i->dbl[0] / i->dbl[1] * 100.0 : SYSMIS;
1037             break;
1038           case N:
1039           case N | FSTRING:
1040             v->f = i->dbl[0];
1041             break;
1042           case NU:
1043           case NU | FSTRING:
1044             v->f = i->int1;
1045             break;
1046           case FIRST:
1047           case LAST:
1048             v->f = i->int1 ? i->dbl[0] : SYSMIS;
1049             break;
1050           case FIRST | FSTRING:
1051           case LAST | FSTRING:
1052             if (i->int1)
1053               memcpy (v->s, i->string, var_get_width (i->dest));
1054             else
1055               memset (v->s, ' ', var_get_width (i->dest));
1056             break;
1057           case N_NO_VARS:
1058             v->f = i->dbl[0];
1059             break;
1060           case NU_NO_VARS:
1061             v->f = i->int1;
1062             break;
1063           case NMISS:
1064           case NMISS | FSTRING:
1065             v->f = i->dbl[0];
1066             break;
1067           case NUMISS:
1068           case NUMISS | FSTRING:
1069             v->f = i->int1;
1070             break;
1071           default:
1072             NOT_REACHED ();
1073           }
1074       }
1075   }
1076 }
1077
1078 /* Resets the state for all the aggregate functions. */
1079 static void
1080 initialize_aggregate_info (struct agr_proc *agr, const struct ccase *input)
1081 {
1082   struct agr_var *iter;
1083
1084   case_destroy (&agr->break_case);
1085   case_clone (&agr->break_case, input);
1086
1087   for (iter = agr->agr_vars; iter; iter = iter->next)
1088     {
1089       iter->missing = 0;
1090       iter->dbl[0] = iter->dbl[1] = iter->dbl[2] = 0.0;
1091       iter->int1 = iter->int2 = 0;
1092       switch (iter->function)
1093         {
1094         case MIN:
1095           iter->dbl[0] = DBL_MAX;
1096           break;
1097         case MIN | FSTRING:
1098           memset (iter->string, 255, var_get_width (iter->src));
1099           break;
1100         case MAX:
1101           iter->dbl[0] = -DBL_MAX;
1102           break;
1103         case MAX | FSTRING:
1104           memset (iter->string, 0, var_get_width (iter->src));
1105           break;
1106         case SD:
1107           if (iter->moments == NULL)
1108             iter->moments = moments1_create (MOMENT_VARIANCE);
1109           else
1110             moments1_clear (iter->moments);
1111           break;
1112         default:
1113           break;
1114         }
1115     }
1116 }