Remove "Written by Ben Pfaff <blp@gnu.org>" lines everywhere.
[pspp-builds.git] / src / language / stats / aggregate.c
1 /* PSPP - computes sample statistics.
2    Copyright (C) 1997-9, 2000, 2006 Free Software Foundation, Inc.
3
4    This program is free software; you can redistribute it and/or
5    modify it under the terms of the GNU General Public License as
6    published by the Free Software Foundation; either version 2 of the
7    License, or (at your option) any later version.
8
9    This program is distributed in the hope that it will be useful, but
10    WITHOUT ANY WARRANTY; without even the implied warranty of
11    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12    General Public License for more details.
13
14    You should have received a copy of the GNU General Public License
15    along with this program; if not, write to the Free Software
16    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
17    02110-1301, USA. */
18
19 #include <config.h>
20
21 #include <stdlib.h>
22
23 #include <data/any-writer.h>
24 #include <data/case-sink.h>
25 #include <data/case.h>
26 #include <data/casefile.h>
27 #include <data/dictionary.h>
28 #include <data/file-handle-def.h>
29 #include <data/format.h>
30 #include <data/procedure.h>
31 #include <data/settings.h>
32 #include <data/storage-stream.h>
33 #include <data/sys-file-writer.h>
34 #include <data/variable.h>
35 #include <language/command.h>
36 #include <language/data-io/file-handle.h>
37 #include <language/lexer/lexer.h>
38 #include <language/lexer/variable-parser.h>
39 #include <language/stats/sort-criteria.h>
40 #include <libpspp/alloc.h>
41 #include <libpspp/assertion.h>
42 #include <libpspp/message.h>
43 #include <libpspp/misc.h>
44 #include <libpspp/pool.h>
45 #include <libpspp/str.h>
46 #include <math/moments.h>
47 #include <math/sort.h>
48
49 #include "minmax.h"
50
51 #include "gettext.h"
52 #define _(msgid) gettext (msgid)
53
54 /* Argument for AGGREGATE function. */
55 union agr_argument
56   {
57     double f;                           /* Numeric. */
58     char *c;                            /* Short or long string. */
59   };
60
61 /* Specifies how to make an aggregate variable. */
62 struct agr_var
63   {
64     struct agr_var *next;               /* Next in list. */
65
66     /* Collected during parsing. */
67     struct variable *src;       /* Source variable. */
68     struct variable *dest;      /* Target variable. */
69     int function;               /* Function. */
70     int include_missing;        /* 1=Include user-missing values. */
71     union agr_argument arg[2];  /* Arguments. */
72
73     /* Accumulated during AGGREGATE execution. */
74     double dbl[3];
75     int int1, int2;
76     char *string;
77     int missing;
78     struct moments1 *moments;
79   };
80
81 /* Aggregation functions. */
82 enum
83   {
84     NONE, SUM, MEAN, SD, MAX, MIN, PGT, PLT, PIN, POUT, FGT, FLT, FIN,
85     FOUT, N, NU, NMISS, NUMISS, FIRST, LAST,
86     N_AGR_FUNCS, N_NO_VARS, NU_NO_VARS,
87     FUNC = 0x1f, /* Function mask. */
88     FSTRING = 1<<5, /* String function bit. */
89   };
90
91 /* Attributes of an aggregation function. */
92 struct agr_func
93   {
94     const char *name;           /* Aggregation function name. */
95     size_t n_args;              /* Number of arguments. */
96     enum var_type alpha_type;   /* When given ALPHA arguments, output type. */
97     struct fmt_spec format;     /* Format spec if alpha_type != ALPHA. */
98   };
99
100 /* Attributes of aggregation functions. */
101 static const struct agr_func agr_func_tab[] = 
102   {
103     {"<NONE>",  0, -1,          {0, 0, 0}},
104     {"SUM",     0, -1,          {FMT_F, 8, 2}},
105     {"MEAN",    0, -1,          {FMT_F, 8, 2}},
106     {"SD",      0, -1,          {FMT_F, 8, 2}},
107     {"MAX",     0, VAR_STRING,  {-1, -1, -1}}, 
108     {"MIN",     0, VAR_STRING,  {-1, -1, -1}}, 
109     {"PGT",     1, VAR_NUMERIC, {FMT_F, 5, 1}},      
110     {"PLT",     1, VAR_NUMERIC, {FMT_F, 5, 1}},       
111     {"PIN",     2, VAR_NUMERIC, {FMT_F, 5, 1}},       
112     {"POUT",    2, VAR_NUMERIC, {FMT_F, 5, 1}},       
113     {"FGT",     1, VAR_NUMERIC, {FMT_F, 5, 3}},       
114     {"FLT",     1, VAR_NUMERIC, {FMT_F, 5, 3}},       
115     {"FIN",     2, VAR_NUMERIC, {FMT_F, 5, 3}},       
116     {"FOUT",    2, VAR_NUMERIC, {FMT_F, 5, 3}},       
117     {"N",       0, VAR_NUMERIC, {FMT_F, 7, 0}},       
118     {"NU",      0, VAR_NUMERIC, {FMT_F, 7, 0}},       
119     {"NMISS",   0, VAR_NUMERIC, {FMT_F, 7, 0}},       
120     {"NUMISS",  0, VAR_NUMERIC, {FMT_F, 7, 0}},       
121     {"FIRST",   0, VAR_STRING,  {-1, -1, -1}}, 
122     {"LAST",    0, VAR_STRING,  {-1, -1, -1}},
123     {NULL,      0, -1,          {-1, -1, -1}},
124     {"N",       0, VAR_NUMERIC, {FMT_F, 7, 0}},
125     {"NU",      0, VAR_NUMERIC, {FMT_F, 7, 0}},
126   };
127
128 /* Missing value types. */
129 enum missing_treatment
130   {
131     ITEMWISE,           /* Missing values item by item. */
132     COLUMNWISE          /* Missing values column by column. */
133   };
134
135 /* An entire AGGREGATE procedure. */
136 struct agr_proc 
137   {
138     /* We have either an output file or a sink. */
139     struct any_writer *writer;          /* Output file, or null if none. */
140     struct case_sink *sink;             /* Sink, or null if none. */
141
142     /* Break variables. */
143     struct sort_criteria *sort;         /* Sort criteria. */
144     struct variable **break_vars;       /* Break variables. */
145     size_t break_var_cnt;               /* Number of break variables. */
146     struct ccase break_case;            /* Last values of break variables. */
147
148     enum missing_treatment missing;     /* How to treat missing values. */
149     struct agr_var *agr_vars;           /* First aggregate variable. */
150     struct dictionary *dict;            /* Aggregate dictionary. */
151     const struct dictionary *src_dict;  /* Dict of the source */
152     int case_cnt;                       /* Counts aggregated cases. */
153     struct ccase agr_case;              /* Aggregate case for output. */
154   };
155
156 static void initialize_aggregate_info (struct agr_proc *,
157                                        const struct ccase *);
158
159 /* Prototypes. */
160 static bool parse_aggregate_functions (struct lexer *, const struct dictionary *,
161                                        struct agr_proc *);
162 static void agr_destroy (struct agr_proc *);
163 static bool aggregate_single_case (struct agr_proc *agr,
164                                    const struct ccase *input,
165                                    struct ccase *output);
166 static void dump_aggregate_info (struct agr_proc *agr, struct ccase *output);
167
168 /* Aggregating to the active file. */
169 static bool agr_to_active_file (const struct ccase *, void *aux, const struct dataset *);
170
171 /* Aggregating to a system file. */
172 static bool presorted_agr_to_sysfile (const struct ccase *, void *aux, const struct dataset *);
173 \f
174 /* Parsing. */
175
176 /* Parses and executes the AGGREGATE procedure. */
177 int
178 cmd_aggregate (struct lexer *lexer, struct dataset *ds)
179 {
180   struct dictionary *dict = dataset_dict (ds);
181   struct agr_proc agr;
182   struct file_handle *out_file = NULL;
183
184   bool copy_documents = false;
185   bool presorted = false;
186   bool saw_direction;
187
188   memset(&agr, 0 , sizeof (agr));
189   agr.missing = ITEMWISE;
190   case_nullify (&agr.break_case);
191   
192   agr.dict = dict_create ();
193   agr.src_dict = dict;
194   dict_set_label (agr.dict, dict_get_label (dict));
195   dict_set_documents (agr.dict, dict_get_documents (dict));
196
197   /* OUTFILE subcommand must be first. */
198   if (!lex_force_match_id (lexer, "OUTFILE"))
199     goto error;
200   lex_match (lexer, '=');
201   if (!lex_match (lexer, '*'))
202     {
203       out_file = fh_parse (lexer, FH_REF_FILE | FH_REF_SCRATCH);
204       if (out_file == NULL)
205         goto error;
206     }
207   
208   /* Read most of the subcommands. */
209   for (;;)
210     {
211       lex_match (lexer, '/');
212       
213       if (lex_match_id (lexer, "MISSING"))
214         {
215           lex_match (lexer, '=');
216           if (!lex_match_id (lexer, "COLUMNWISE"))
217             {
218               lex_error (lexer, _("while expecting COLUMNWISE"));
219               goto error;
220             }
221           agr.missing = COLUMNWISE;
222         }
223       else if (lex_match_id (lexer, "DOCUMENT"))
224         copy_documents = true;
225       else if (lex_match_id (lexer, "PRESORTED"))
226         presorted = true;
227       else if (lex_match_id (lexer, "BREAK"))
228         {
229           int i;
230
231           lex_match (lexer, '=');
232           agr.sort = sort_parse_criteria (lexer, dict,
233                                           &agr.break_vars, &agr.break_var_cnt,
234                                           &saw_direction, NULL);
235           if (agr.sort == NULL)
236             goto error;
237           
238           for (i = 0; i < agr.break_var_cnt; i++)
239             dict_clone_var_assert (agr.dict, agr.break_vars[i],
240                                    var_get_name (agr.break_vars[i]));
241
242           /* BREAK must follow the options. */
243           break;
244         }
245       else
246         {
247           lex_error (lexer, _("expecting BREAK"));
248           goto error;
249         }
250     }
251   if (presorted && saw_direction)
252     msg (SW, _("When PRESORTED is specified, specifying sorting directions "
253                "with (A) or (D) has no effect.  Output data will be sorted "
254                "the same way as the input data."));
255       
256   /* Read in the aggregate functions. */
257   lex_match (lexer, '/');
258   if (!parse_aggregate_functions (lexer, dict, &agr))
259     goto error;
260
261   /* Delete documents. */
262   if (!copy_documents)
263     dict_set_documents (agr.dict, NULL);
264
265   /* Cancel SPLIT FILE. */
266   dict_set_split_vars (agr.dict, NULL, 0);
267   
268   /* Initialize. */
269   agr.case_cnt = 0;
270   case_create (&agr.agr_case, dict_get_next_value_idx (agr.dict));
271
272   /* Output to active file or external file? */
273   if (out_file == NULL) 
274     {
275       /* The active file will be replaced by the aggregated data,
276          so TEMPORARY is moot. */
277       proc_cancel_temporary_transformations (ds);
278
279       if (agr.sort != NULL && !presorted) 
280         {
281           if (!sort_active_file_in_place (ds, agr.sort))
282             goto error;
283         }
284
285       agr.sink = create_case_sink (&storage_sink_class, agr.dict, NULL);
286       if (agr.sink->class->open != NULL)
287         agr.sink->class->open (agr.sink);
288       proc_set_sink (ds, 
289                      create_case_sink (&null_sink_class, 
290                                        dict, NULL));
291       if (!procedure (ds, agr_to_active_file, &agr))
292         goto error;
293       if (agr.case_cnt > 0) 
294         {
295           dump_aggregate_info (&agr, &agr.agr_case);
296           if (!agr.sink->class->write (agr.sink, &agr.agr_case))
297             goto error;
298         }
299       discard_variables (ds);
300       dict_destroy (dict);
301       dataset_set_dict (ds, agr.dict);
302       agr.dict = NULL;
303       proc_set_source (ds, 
304                        agr.sink->class->make_source (agr.sink));
305       free_case_sink (agr.sink);
306     }
307   else
308     {
309       agr.writer = any_writer_open (out_file, agr.dict);
310       if (agr.writer == NULL)
311         goto error;
312       
313       if (agr.sort != NULL && !presorted) 
314         {
315           /* Sorting is needed. */
316           struct casefile *dst;
317           struct casereader *reader;
318           struct ccase c;
319           bool ok = true;
320           
321           dst = sort_active_file_to_casefile (ds, agr.sort);
322           if (dst == NULL)
323             goto error;
324           reader = casefile_get_destructive_reader (dst);
325           while (ok && casereader_read_xfer (reader, &c)) 
326             {
327               if (aggregate_single_case (&agr, &c, &agr.agr_case)) 
328                 ok = any_writer_write (agr.writer, &agr.agr_case);
329               case_destroy (&c);
330             }
331           casereader_destroy (reader);
332           if (ok)
333             ok = !casefile_error (dst);
334           casefile_destroy (dst);
335           if (!ok)
336             goto error;
337         }
338       else 
339         {
340           /* Active file is already sorted. */
341           if (!procedure (ds, presorted_agr_to_sysfile, &agr))
342             goto error;
343         }
344       
345       if (agr.case_cnt > 0) 
346         {
347           dump_aggregate_info (&agr, &agr.agr_case);
348           any_writer_write (agr.writer, &agr.agr_case);
349         }
350       if (any_writer_error (agr.writer))
351         goto error;
352     }
353   
354   agr_destroy (&agr);
355   return CMD_SUCCESS;
356
357 error:
358   agr_destroy (&agr);
359   return CMD_CASCADING_FAILURE;
360 }
361
362 /* Parse all the aggregate functions. */
363 static bool
364 parse_aggregate_functions (struct lexer *lexer, const struct dictionary *dict, struct agr_proc *agr)
365 {
366   struct agr_var *tail; /* Tail of linked list starting at agr->vars. */
367
368   /* Parse everything. */
369   tail = NULL;
370   for (;;)
371     {
372       char **dest;
373       char **dest_label;
374       size_t n_dest;
375       struct string function_name;
376
377       int include_missing;
378       const struct agr_func *function;
379       int func_index;
380
381       union agr_argument arg[2];
382
383       struct variable **src;
384       size_t n_src;
385
386       size_t i;
387
388       dest = NULL;
389       dest_label = NULL;
390       n_dest = 0;
391       src = NULL;
392       function = NULL;
393       n_src = 0;
394       arg[0].c = NULL;
395       arg[1].c = NULL;
396
397       /* Parse the list of target variables. */
398       while (!lex_match (lexer, '='))
399         {
400           size_t n_dest_prev = n_dest;
401           
402           if (!parse_DATA_LIST_vars (lexer, &dest, &n_dest,
403                                      PV_APPEND | PV_SINGLE | PV_NO_SCRATCH))
404             goto error;
405
406           /* Assign empty labels. */
407           {
408             int j;
409
410             dest_label = xnrealloc (dest_label, n_dest, sizeof *dest_label);
411             for (j = n_dest_prev; j < n_dest; j++)
412               dest_label[j] = NULL;
413           }
414
415
416           
417           if (lex_token (lexer) == T_STRING)
418             {
419               struct string label;
420               ds_init_string (&label, lex_tokstr (lexer));
421
422               ds_truncate (&label, 255);
423               dest_label[n_dest - 1] = ds_xstrdup (&label);
424               lex_get (lexer);
425               ds_destroy (&label);
426             }
427         }
428
429       /* Get the name of the aggregation function. */
430       if (lex_token (lexer) != T_ID)
431         {
432           lex_error (lexer, _("expecting aggregation function"));
433           goto error;
434         }
435
436       include_missing = 0;
437
438       ds_init_string (&function_name, lex_tokstr (lexer));
439
440       ds_chomp (&function_name, '.');
441
442       if (lex_tokid(lexer)[strlen (lex_tokid (lexer)) - 1] == '.')
443           include_missing = 1;
444
445       for (function = agr_func_tab; function->name; function++)
446         if (!strcasecmp (function->name, ds_cstr (&function_name)))
447           break;
448       if (NULL == function->name)
449         {
450           msg (SE, _("Unknown aggregation function %s."), 
451                ds_cstr (&function_name));
452           goto error;
453         }
454       ds_destroy (&function_name);
455       func_index = function - agr_func_tab;
456       lex_get (lexer);
457
458       /* Check for leading lparen. */
459       if (!lex_match (lexer, '('))
460         {
461           if (func_index == N)
462             func_index = N_NO_VARS;
463           else if (func_index == NU)
464             func_index = NU_NO_VARS;
465           else
466             {
467               lex_error (lexer, _("expecting `('"));
468               goto error;
469             }
470         }
471       else
472         {
473           /* Parse list of source variables. */
474           {
475             int pv_opts = PV_NO_SCRATCH;
476
477             if (func_index == SUM || func_index == MEAN || func_index == SD)
478               pv_opts |= PV_NUMERIC;
479             else if (function->n_args)
480               pv_opts |= PV_SAME_TYPE;
481
482             if (!parse_variables (lexer, dict, &src, &n_src, pv_opts))
483               goto error;
484           }
485
486           /* Parse function arguments, for those functions that
487              require arguments. */
488           if (function->n_args != 0)
489             for (i = 0; i < function->n_args; i++)
490               {
491                 int type;
492             
493                 lex_match (lexer, ',');
494                 if (lex_token (lexer) == T_STRING)
495                   {
496                     arg[i].c = ds_xstrdup (lex_tokstr (lexer));
497                     type = VAR_STRING;
498                   }
499                 else if (lex_is_number (lexer))
500                   {
501                     arg[i].f = lex_tokval (lexer);
502                     type = VAR_NUMERIC;
503                   }
504                 else
505                   {
506                     msg (SE, _("Missing argument %d to %s."), i + 1,
507                          function->name);
508                     goto error;
509                   }
510             
511                 lex_get (lexer);
512
513                 if (type != var_get_type (src[0]))
514                   {
515                     msg (SE, _("Arguments to %s must be of same type as "
516                                "source variables."),
517                          function->name);
518                     goto error;
519                   }
520               }
521
522           /* Trailing rparen. */
523           if (!lex_match (lexer, ')'))
524             {
525               lex_error (lexer, _("expecting `)'"));
526               goto error;
527             }
528           
529           /* Now check that the number of source variables match
530              the number of target variables.  If we check earlier
531              than this, the user can get very misleading error
532              message, i.e. `AGGREGATE x=SUM(y t).' will get this
533              error message when a proper message would be more
534              like `unknown variable t'. */
535           if (n_src != n_dest)
536             {
537               msg (SE, _("Number of source variables (%u) does not match "
538                          "number of target variables (%u)."),
539                    (unsigned) n_src, (unsigned) n_dest);
540               goto error;
541             }
542
543           if ((func_index == PIN || func_index == POUT
544               || func_index == FIN || func_index == FOUT) 
545               && (var_is_numeric (src[0])
546                   ? arg[0].f > arg[1].f
547                   : str_compare_rpad (arg[0].c, arg[1].c) > 0))
548             {
549               union agr_argument t = arg[0];
550               arg[0] = arg[1];
551               arg[1] = t;
552                   
553               msg (SW, _("The value arguments passed to the %s function "
554                          "are out-of-order.  They will be treated as if "
555                          "they had been specified in the correct order."),
556                    function->name);
557             }
558         }
559         
560       /* Finally add these to the linked list of aggregation
561          variables. */
562       for (i = 0; i < n_dest; i++)
563         {
564           struct agr_var *v = xmalloc (sizeof *v);
565
566           /* Add variable to chain. */
567           if (agr->agr_vars != NULL)
568             tail->next = v;
569           else
570             agr->agr_vars = v;
571           tail = v;
572           tail->next = NULL;
573           v->moments = NULL;
574           
575           /* Create the target variable in the aggregate
576              dictionary. */
577           {
578             struct variable *destvar;
579             
580             v->function = func_index;
581
582             if (src)
583               {
584                 v->src = src[i];
585                 
586                 if (var_is_alpha (src[i]))
587                   {
588                     v->function |= FSTRING;
589                     v->string = xmalloc (var_get_width (src[i]));
590                   }
591
592                 if (function->alpha_type == VAR_STRING)
593                   destvar = dict_clone_var (agr->dict, v->src, dest[i]);
594                 else
595                   {
596                     assert (var_is_numeric (v->src)
597                             || function->alpha_type == VAR_NUMERIC);
598                     destvar = dict_create_var (agr->dict, dest[i], 0);
599                     if (destvar != NULL) 
600                       {
601                         struct fmt_spec f;
602                         if ((func_index == N || func_index == NMISS)
603                             && dict_get_weight (dict) != NULL)
604                           f = fmt_for_output (FMT_F, 8, 2); 
605                         else
606                           f = function->format;
607                         var_set_both_formats (destvar, &f);
608                       }
609                   }
610               } else {
611                 struct fmt_spec f;
612                 v->src = NULL;
613                 destvar = dict_create_var (agr->dict, dest[i], 0);
614                 if (func_index == N_NO_VARS && dict_get_weight (dict) != NULL)
615                   f = fmt_for_output (FMT_F, 8, 2); 
616                 else
617                   f = function->format;
618                 var_set_both_formats (destvar, &f);
619               }
620           
621             if (!destvar)
622               {
623                 msg (SE, _("Variable name %s is not unique within the "
624                            "aggregate file dictionary, which contains "
625                            "the aggregate variables and the break "
626                            "variables."),
627                      dest[i]);
628                 goto error;
629               }
630
631             free (dest[i]);
632             if (dest_label[i])
633               var_set_label (destvar, dest_label[i]);
634
635             v->dest = destvar;
636           }
637           
638           v->include_missing = include_missing;
639
640           if (v->src != NULL)
641             {
642               int j;
643
644               if (var_is_numeric (v->src))
645                 for (j = 0; j < function->n_args; j++)
646                   v->arg[j].f = arg[j].f;
647               else
648                 for (j = 0; j < function->n_args; j++)
649                   v->arg[j].c = xstrdup (arg[j].c);
650             }
651         }
652       
653       if (src != NULL && var_is_alpha (src[0]))
654         for (i = 0; i < function->n_args; i++)
655           {
656             free (arg[i].c);
657             arg[i].c = NULL;
658           }
659
660       free (src);
661       free (dest);
662       free (dest_label);
663
664       if (!lex_match (lexer, '/'))
665         {
666           if (lex_token (lexer) == '.')
667             return true;
668
669           lex_error (lexer, "expecting end of command");
670           return false;
671         }
672       continue;
673       
674     error:
675       ds_destroy (&function_name);
676       for (i = 0; i < n_dest; i++)
677         {
678           free (dest[i]);
679           free (dest_label[i]);
680         }
681       free (dest);
682       free (dest_label);
683       free (arg[0].c);
684       free (arg[1].c);
685       if (src && n_src && var_is_alpha (src[0]))
686         for (i = 0; i < function->n_args; i++)
687           {
688             free (arg[i].c);
689             arg[i].c = NULL;
690           }
691       free (src);
692         
693       return false;
694     }
695 }
696
697 /* Destroys AGR. */
698 static void
699 agr_destroy (struct agr_proc *agr)
700 {
701   struct agr_var *iter, *next;
702
703   any_writer_close (agr->writer);
704   if (agr->sort != NULL)
705     sort_destroy_criteria (agr->sort);
706   free (agr->break_vars);
707   case_destroy (&agr->break_case);
708   for (iter = agr->agr_vars; iter; iter = next)
709     {
710       next = iter->next;
711
712       if (iter->function & FSTRING)
713         {
714           size_t n_args;
715           size_t i;
716
717           n_args = agr_func_tab[iter->function & FUNC].n_args;
718           for (i = 0; i < n_args; i++)
719             free (iter->arg[i].c);
720           free (iter->string);
721         }
722       else if (iter->function == SD)
723         moments1_destroy (iter->moments);
724       free (iter);
725     }
726   if (agr->dict != NULL)
727     dict_destroy (agr->dict);
728
729   case_destroy (&agr->agr_case);
730 }
731 \f
732 /* Execution. */
733
734 static void accumulate_aggregate_info (struct agr_proc *,
735                                        const struct ccase *);
736 static void dump_aggregate_info (struct agr_proc *, struct ccase *);
737
738 /* Processes a single case INPUT for aggregation.  If output is
739    warranted, writes it to OUTPUT and returns true.
740    Otherwise, returns false and OUTPUT is unmodified. */
741 static bool
742 aggregate_single_case (struct agr_proc *agr,
743                        const struct ccase *input, struct ccase *output)
744 {
745   bool finished_group = false;
746   
747   if (agr->case_cnt++ == 0)
748     initialize_aggregate_info (agr, input);
749   else if (case_compare (&agr->break_case, input,
750                          agr->break_vars, agr->break_var_cnt))
751     {
752       dump_aggregate_info (agr, output);
753       finished_group = true;
754
755       initialize_aggregate_info (agr, input);
756     }
757
758   accumulate_aggregate_info (agr, input);
759   return finished_group;
760 }
761
762 /* Accumulates aggregation data from the case INPUT. */
763 static void 
764 accumulate_aggregate_info (struct agr_proc *agr,
765                            const struct ccase *input)
766 {
767   struct agr_var *iter;
768   double weight;
769   bool bad_warn = true;
770
771   weight = dict_get_case_weight (agr->src_dict, input, &bad_warn);
772
773   for (iter = agr->agr_vars; iter; iter = iter->next)
774     if (iter->src)
775       {
776         const union value *v = case_data (input, iter->src);
777         int src_width = var_get_width (iter->src);
778
779         if (iter->include_missing
780             ? var_is_numeric (iter->src) && v->f == SYSMIS
781             : var_is_value_missing (iter->src, v))
782           {
783             switch (iter->function)
784               {
785               case NMISS:
786               case NMISS | FSTRING:
787                 iter->dbl[0] += weight;
788                 break;
789               case NUMISS:
790               case NUMISS | FSTRING:
791                 iter->int1++;
792                 break;
793               }
794             iter->missing = 1;
795             continue;
796           }
797         
798         /* This is horrible.  There are too many possibilities. */
799         switch (iter->function)
800           {
801           case SUM:
802             iter->dbl[0] += v->f * weight;
803             iter->int1 = 1;
804             break;
805           case MEAN:
806             iter->dbl[0] += v->f * weight;
807             iter->dbl[1] += weight;
808             break;
809           case SD:
810             moments1_add (iter->moments, v->f, weight);
811             break;
812           case MAX:
813             iter->dbl[0] = MAX (iter->dbl[0], v->f);
814             iter->int1 = 1;
815             break;
816           case MAX | FSTRING:
817             if (memcmp (iter->string, v->s, src_width) < 0)
818               memcpy (iter->string, v->s, src_width);
819             iter->int1 = 1;
820             break;
821           case MIN:
822             iter->dbl[0] = MIN (iter->dbl[0], v->f);
823             iter->int1 = 1;
824             break;
825           case MIN | FSTRING:
826             if (memcmp (iter->string, v->s, src_width) > 0)
827               memcpy (iter->string, v->s, src_width);
828             iter->int1 = 1;
829             break;
830           case FGT:
831           case PGT:
832             if (v->f > iter->arg[0].f)
833               iter->dbl[0] += weight;
834             iter->dbl[1] += weight;
835             break;
836           case FGT | FSTRING:
837           case PGT | FSTRING:
838             if (memcmp (iter->arg[0].c, v->s, src_width) < 0)
839               iter->dbl[0] += weight;
840             iter->dbl[1] += weight;
841             break;
842           case FLT:
843           case PLT:
844             if (v->f < iter->arg[0].f)
845               iter->dbl[0] += weight;
846             iter->dbl[1] += weight;
847             break;
848           case FLT | FSTRING:
849           case PLT | FSTRING:
850             if (memcmp (iter->arg[0].c, v->s, src_width) > 0)
851               iter->dbl[0] += weight;
852             iter->dbl[1] += weight;
853             break;
854           case FIN:
855           case PIN:
856             if (iter->arg[0].f <= v->f && v->f <= iter->arg[1].f)
857               iter->dbl[0] += weight;
858             iter->dbl[1] += weight;
859             break;
860           case FIN | FSTRING:
861           case PIN | FSTRING:
862             if (memcmp (iter->arg[0].c, v->s, src_width) <= 0
863                 && memcmp (iter->arg[1].c, v->s, src_width) >= 0)
864               iter->dbl[0] += weight;
865             iter->dbl[1] += weight;
866             break;
867           case FOUT:
868           case POUT:
869             if (iter->arg[0].f > v->f || v->f > iter->arg[1].f)
870               iter->dbl[0] += weight;
871             iter->dbl[1] += weight;
872             break;
873           case FOUT | FSTRING:
874           case POUT | FSTRING:
875             if (memcmp (iter->arg[0].c, v->s, src_width) > 0
876                 || memcmp (iter->arg[1].c, v->s, src_width) < 0)
877               iter->dbl[0] += weight;
878             iter->dbl[1] += weight;
879             break;
880           case N:
881           case N | FSTRING:
882             iter->dbl[0] += weight;
883             break;
884           case NU:
885           case NU | FSTRING:
886             iter->int1++;
887             break;
888           case FIRST:
889             if (iter->int1 == 0)
890               {
891                 iter->dbl[0] = v->f;
892                 iter->int1 = 1;
893               }
894             break;
895           case FIRST | FSTRING:
896             if (iter->int1 == 0)
897               {
898                 memcpy (iter->string, v->s, src_width);
899                 iter->int1 = 1;
900               }
901             break;
902           case LAST:
903             iter->dbl[0] = v->f;
904             iter->int1 = 1;
905             break;
906           case LAST | FSTRING:
907             memcpy (iter->string, v->s, src_width);
908             iter->int1 = 1;
909             break;
910           case NMISS:
911           case NMISS | FSTRING:
912           case NUMISS:
913           case NUMISS | FSTRING:
914             /* Our value is not missing or it would have been
915                caught earlier.  Nothing to do. */
916             break;
917           default:
918             NOT_REACHED ();
919           }
920     } else {
921       switch (iter->function)
922         {
923         case N_NO_VARS:
924           iter->dbl[0] += weight;
925           break;
926         case NU_NO_VARS:
927           iter->int1++;
928           break;
929         default:
930           NOT_REACHED ();
931         }
932     }
933 }
934
935 /* We've come to a record that differs from the previous in one or
936    more of the break variables.  Make an output record from the
937    accumulated statistics in the OUTPUT case. */
938 static void 
939 dump_aggregate_info (struct agr_proc *agr, struct ccase *output)
940 {
941   {
942     int value_idx = 0;
943     int i;
944
945     for (i = 0; i < agr->break_var_cnt; i++) 
946       {
947         struct variable *v = agr->break_vars[i];
948         size_t value_cnt = var_get_value_cnt (v);
949         memcpy (case_data_rw_idx (output, value_idx),
950                 case_data (&agr->break_case, v),
951                 sizeof (union value) * value_cnt);
952         value_idx += value_cnt; 
953       }
954   }
955   
956   {
957     struct agr_var *i;
958   
959     for (i = agr->agr_vars; i; i = i->next)
960       {
961         union value *v = case_data_rw (output, i->dest);
962
963         if (agr->missing == COLUMNWISE && i->missing != 0
964             && (i->function & FUNC) != N && (i->function & FUNC) != NU
965             && (i->function & FUNC) != NMISS && (i->function & FUNC) != NUMISS)
966           {
967             if (var_is_alpha (i->dest))
968               memset (v->s, ' ', var_get_width (i->dest));
969             else
970               v->f = SYSMIS;
971             continue;
972           }
973         
974         switch (i->function)
975           {
976           case SUM:
977             v->f = i->int1 ? i->dbl[0] : SYSMIS;
978             break;
979           case MEAN:
980             v->f = i->dbl[1] != 0.0 ? i->dbl[0] / i->dbl[1] : SYSMIS;
981             break;
982           case SD:
983             {
984               double variance;
985
986               /* FIXME: we should use two passes. */
987               moments1_calculate (i->moments, NULL, NULL, &variance,
988                                  NULL, NULL);
989               if (variance != SYSMIS)
990                 v->f = sqrt (variance);
991               else
992                 v->f = SYSMIS; 
993             }
994             break;
995           case MAX:
996           case MIN:
997             v->f = i->int1 ? i->dbl[0] : SYSMIS;
998             break;
999           case MAX | FSTRING:
1000           case MIN | FSTRING:
1001             if (i->int1)
1002               memcpy (v->s, i->string, var_get_width (i->dest));
1003             else
1004               memset (v->s, ' ', var_get_width (i->dest));
1005             break;
1006           case FGT:
1007           case FGT | FSTRING:
1008           case FLT:
1009           case FLT | FSTRING:
1010           case FIN:
1011           case FIN | FSTRING:
1012           case FOUT:
1013           case FOUT | FSTRING:
1014             v->f = i->dbl[1] ? i->dbl[0] / i->dbl[1] : SYSMIS;
1015             break;
1016           case PGT:
1017           case PGT | FSTRING:
1018           case PLT:
1019           case PLT | FSTRING:
1020           case PIN:
1021           case PIN | FSTRING:
1022           case POUT:
1023           case POUT | FSTRING:
1024             v->f = i->dbl[1] ? i->dbl[0] / i->dbl[1] * 100.0 : SYSMIS;
1025             break;
1026           case N:
1027           case N | FSTRING:
1028             v->f = i->dbl[0];
1029             break;
1030           case NU:
1031           case NU | FSTRING:
1032             v->f = i->int1;
1033             break;
1034           case FIRST:
1035           case LAST:
1036             v->f = i->int1 ? i->dbl[0] : SYSMIS;
1037             break;
1038           case FIRST | FSTRING:
1039           case LAST | FSTRING:
1040             if (i->int1)
1041               memcpy (v->s, i->string, var_get_width (i->dest));
1042             else
1043               memset (v->s, ' ', var_get_width (i->dest));
1044             break;
1045           case N_NO_VARS:
1046             v->f = i->dbl[0];
1047             break;
1048           case NU_NO_VARS:
1049             v->f = i->int1;
1050             break;
1051           case NMISS:
1052           case NMISS | FSTRING:
1053             v->f = i->dbl[0];
1054             break;
1055           case NUMISS:
1056           case NUMISS | FSTRING:
1057             v->f = i->int1;
1058             break;
1059           default:
1060             NOT_REACHED ();
1061           }
1062       }
1063   }
1064 }
1065
1066 /* Resets the state for all the aggregate functions. */
1067 static void
1068 initialize_aggregate_info (struct agr_proc *agr, const struct ccase *input)
1069 {
1070   struct agr_var *iter;
1071
1072   case_destroy (&agr->break_case);
1073   case_clone (&agr->break_case, input);
1074
1075   for (iter = agr->agr_vars; iter; iter = iter->next)
1076     {
1077       iter->missing = 0;
1078       iter->dbl[0] = iter->dbl[1] = iter->dbl[2] = 0.0;
1079       iter->int1 = iter->int2 = 0;
1080       switch (iter->function)
1081         {
1082         case MIN:
1083           iter->dbl[0] = DBL_MAX;
1084           break;
1085         case MIN | FSTRING:
1086           memset (iter->string, 255, var_get_width (iter->src));
1087           break;
1088         case MAX:
1089           iter->dbl[0] = -DBL_MAX;
1090           break;
1091         case MAX | FSTRING:
1092           memset (iter->string, 0, var_get_width (iter->src));
1093           break;
1094         case SD:
1095           if (iter->moments == NULL)
1096             iter->moments = moments1_create (MOMENT_VARIANCE);
1097           else
1098             moments1_clear (iter->moments);
1099           break;
1100         default:
1101           break;
1102         }
1103     }
1104 }
1105 \f
1106 /* Aggregate each case as it comes through.  Cases which aren't needed
1107    are dropped.
1108    Returns true if successful, false if an I/O error occurred. */
1109 static bool
1110 agr_to_active_file (const struct ccase *c, void *agr_, const struct dataset *ds UNUSED)
1111 {
1112   struct agr_proc *agr = agr_;
1113
1114   if (aggregate_single_case (agr, c, &agr->agr_case)) 
1115     return agr->sink->class->write (agr->sink, &agr->agr_case);
1116
1117   return true;
1118 }
1119
1120 /* Aggregate the current case and output it if we passed a
1121    breakpoint. */
1122 static bool
1123 presorted_agr_to_sysfile (const struct ccase *c, void *agr_, 
1124                           const struct dataset *ds UNUSED) 
1125 {
1126   struct agr_proc *agr = agr_;
1127
1128   if (aggregate_single_case (agr, c, &agr->agr_case)) 
1129     return any_writer_write (agr->writer, &agr->agr_case);
1130   
1131   return true;
1132 }