5e4be3bcd9643bb4ae3014c46c4caee454b4e254
[pspp-builds.git] / src / language / stats / aggregate.c
1 /* PSPP - computes sample statistics.
2    Copyright (C) 1997-9, 2000, 2006 Free Software Foundation, Inc.
3    Written by Ben Pfaff <blp@gnu.org>.
4
5    This program is free software; you can redistribute it and/or
6    modify it under the terms of the GNU General Public License as
7    published by the Free Software Foundation; either version 2 of the
8    License, or (at your option) any later version.
9
10    This program is distributed in the hope that it will be useful, but
11    WITHOUT ANY WARRANTY; without even the implied warranty of
12    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13    General Public License for more details.
14
15    You should have received a copy of the GNU General Public License
16    along with this program; if not, write to the Free Software
17    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
18    02110-1301, USA. */
19
20 #include <config.h>
21
22 #include <stdlib.h>
23
24 #include <data/any-writer.h>
25 #include <data/case-sink.h>
26 #include <data/case.h>
27 #include <data/casefile.h>
28 #include <data/dictionary.h>
29 #include <data/file-handle-def.h>
30 #include <data/format.h>
31 #include <data/procedure.h>
32 #include <data/settings.h>
33 #include <data/storage-stream.h>
34 #include <data/sys-file-writer.h>
35 #include <data/variable.h>
36 #include <language/command.h>
37 #include <language/data-io/file-handle.h>
38 #include <language/lexer/lexer.h>
39 #include <language/lexer/variable-parser.h>
40 #include <language/stats/sort-criteria.h>
41 #include <libpspp/alloc.h>
42 #include <libpspp/assertion.h>
43 #include <libpspp/message.h>
44 #include <libpspp/misc.h>
45 #include <libpspp/pool.h>
46 #include <libpspp/str.h>
47 #include <math/moments.h>
48 #include <math/sort.h>
49
50 #include "minmax.h"
51
52 #include "gettext.h"
53 #define _(msgid) gettext (msgid)
54
55 /* Argument for AGGREGATE function. */
56 union agr_argument
57   {
58     double f;                           /* Numeric. */
59     char *c;                            /* Short or long string. */
60   };
61
62 /* Specifies how to make an aggregate variable. */
63 struct agr_var
64   {
65     struct agr_var *next;               /* Next in list. */
66
67     /* Collected during parsing. */
68     struct variable *src;       /* Source variable. */
69     struct variable *dest;      /* Target variable. */
70     int function;               /* Function. */
71     int include_missing;        /* 1=Include user-missing values. */
72     union agr_argument arg[2];  /* Arguments. */
73
74     /* Accumulated during AGGREGATE execution. */
75     double dbl[3];
76     int int1, int2;
77     char *string;
78     int missing;
79     struct moments1 *moments;
80   };
81
82 /* Aggregation functions. */
83 enum
84   {
85     NONE, SUM, MEAN, SD, MAX, MIN, PGT, PLT, PIN, POUT, FGT, FLT, FIN,
86     FOUT, N, NU, NMISS, NUMISS, FIRST, LAST,
87     N_AGR_FUNCS, N_NO_VARS, NU_NO_VARS,
88     FUNC = 0x1f, /* Function mask. */
89     FSTRING = 1<<5, /* String function bit. */
90   };
91
92 /* Attributes of an aggregation function. */
93 struct agr_func
94   {
95     const char *name;           /* Aggregation function name. */
96     size_t n_args;              /* Number of arguments. */
97     enum var_type alpha_type;   /* When given ALPHA arguments, output type. */
98     struct fmt_spec format;     /* Format spec if alpha_type != ALPHA. */
99   };
100
101 /* Attributes of aggregation functions. */
102 static const struct agr_func agr_func_tab[] = 
103   {
104     {"<NONE>",  0, -1,          {0, 0, 0}},
105     {"SUM",     0, -1,          {FMT_F, 8, 2}},
106     {"MEAN",    0, -1,          {FMT_F, 8, 2}},
107     {"SD",      0, -1,          {FMT_F, 8, 2}},
108     {"MAX",     0, VAR_STRING,  {-1, -1, -1}}, 
109     {"MIN",     0, VAR_STRING,  {-1, -1, -1}}, 
110     {"PGT",     1, VAR_NUMERIC, {FMT_F, 5, 1}},      
111     {"PLT",     1, VAR_NUMERIC, {FMT_F, 5, 1}},       
112     {"PIN",     2, VAR_NUMERIC, {FMT_F, 5, 1}},       
113     {"POUT",    2, VAR_NUMERIC, {FMT_F, 5, 1}},       
114     {"FGT",     1, VAR_NUMERIC, {FMT_F, 5, 3}},       
115     {"FLT",     1, VAR_NUMERIC, {FMT_F, 5, 3}},       
116     {"FIN",     2, VAR_NUMERIC, {FMT_F, 5, 3}},       
117     {"FOUT",    2, VAR_NUMERIC, {FMT_F, 5, 3}},       
118     {"N",       0, VAR_NUMERIC, {FMT_F, 7, 0}},       
119     {"NU",      0, VAR_NUMERIC, {FMT_F, 7, 0}},       
120     {"NMISS",   0, VAR_NUMERIC, {FMT_F, 7, 0}},       
121     {"NUMISS",  0, VAR_NUMERIC, {FMT_F, 7, 0}},       
122     {"FIRST",   0, VAR_STRING,  {-1, -1, -1}}, 
123     {"LAST",    0, VAR_STRING,  {-1, -1, -1}},
124     {NULL,      0, -1,          {-1, -1, -1}},
125     {"N",       0, VAR_NUMERIC, {FMT_F, 7, 0}},
126     {"NU",      0, VAR_NUMERIC, {FMT_F, 7, 0}},
127   };
128
129 /* Missing value types. */
130 enum missing_treatment
131   {
132     ITEMWISE,           /* Missing values item by item. */
133     COLUMNWISE          /* Missing values column by column. */
134   };
135
136 /* An entire AGGREGATE procedure. */
137 struct agr_proc 
138   {
139     /* We have either an output file or a sink. */
140     struct any_writer *writer;          /* Output file, or null if none. */
141     struct case_sink *sink;             /* Sink, or null if none. */
142
143     /* Break variables. */
144     struct sort_criteria *sort;         /* Sort criteria. */
145     struct variable **break_vars;       /* Break variables. */
146     size_t break_var_cnt;               /* Number of break variables. */
147     struct ccase break_case;            /* Last values of break variables. */
148
149     enum missing_treatment missing;     /* How to treat missing values. */
150     struct agr_var *agr_vars;           /* First aggregate variable. */
151     struct dictionary *dict;            /* Aggregate dictionary. */
152     const struct dictionary *src_dict;  /* Dict of the source */
153     int case_cnt;                       /* Counts aggregated cases. */
154     struct ccase agr_case;              /* Aggregate case for output. */
155   };
156
157 static void initialize_aggregate_info (struct agr_proc *,
158                                        const struct ccase *);
159
160 /* Prototypes. */
161 static bool parse_aggregate_functions (struct lexer *, const struct dictionary *,
162                                        struct agr_proc *);
163 static void agr_destroy (struct agr_proc *);
164 static bool aggregate_single_case (struct agr_proc *agr,
165                                    const struct ccase *input,
166                                    struct ccase *output);
167 static void dump_aggregate_info (struct agr_proc *agr, struct ccase *output);
168
169 /* Aggregating to the active file. */
170 static bool agr_to_active_file (const struct ccase *, void *aux, const struct dataset *);
171
172 /* Aggregating to a system file. */
173 static bool presorted_agr_to_sysfile (const struct ccase *, void *aux, const struct dataset *);
174 \f
175 /* Parsing. */
176
177 /* Parses and executes the AGGREGATE procedure. */
178 int
179 cmd_aggregate (struct lexer *lexer, struct dataset *ds)
180 {
181   struct dictionary *dict = dataset_dict (ds);
182   struct agr_proc agr;
183   struct file_handle *out_file = NULL;
184
185   bool copy_documents = false;
186   bool presorted = false;
187   bool saw_direction;
188
189   memset(&agr, 0 , sizeof (agr));
190   agr.missing = ITEMWISE;
191   case_nullify (&agr.break_case);
192   
193   agr.dict = dict_create ();
194   agr.src_dict = dict;
195   dict_set_label (agr.dict, dict_get_label (dict));
196   dict_set_documents (agr.dict, dict_get_documents (dict));
197
198   /* OUTFILE subcommand must be first. */
199   if (!lex_force_match_id (lexer, "OUTFILE"))
200     goto error;
201   lex_match (lexer, '=');
202   if (!lex_match (lexer, '*'))
203     {
204       out_file = fh_parse (lexer, FH_REF_FILE | FH_REF_SCRATCH);
205       if (out_file == NULL)
206         goto error;
207     }
208   
209   /* Read most of the subcommands. */
210   for (;;)
211     {
212       lex_match (lexer, '/');
213       
214       if (lex_match_id (lexer, "MISSING"))
215         {
216           lex_match (lexer, '=');
217           if (!lex_match_id (lexer, "COLUMNWISE"))
218             {
219               lex_error (lexer, _("while expecting COLUMNWISE"));
220               goto error;
221             }
222           agr.missing = COLUMNWISE;
223         }
224       else if (lex_match_id (lexer, "DOCUMENT"))
225         copy_documents = true;
226       else if (lex_match_id (lexer, "PRESORTED"))
227         presorted = true;
228       else if (lex_match_id (lexer, "BREAK"))
229         {
230           int i;
231
232           lex_match (lexer, '=');
233           agr.sort = sort_parse_criteria (lexer, dict,
234                                           &agr.break_vars, &agr.break_var_cnt,
235                                           &saw_direction, NULL);
236           if (agr.sort == NULL)
237             goto error;
238           
239           for (i = 0; i < agr.break_var_cnt; i++)
240             dict_clone_var_assert (agr.dict, agr.break_vars[i],
241                                    var_get_name (agr.break_vars[i]));
242
243           /* BREAK must follow the options. */
244           break;
245         }
246       else
247         {
248           lex_error (lexer, _("expecting BREAK"));
249           goto error;
250         }
251     }
252   if (presorted && saw_direction)
253     msg (SW, _("When PRESORTED is specified, specifying sorting directions "
254                "with (A) or (D) has no effect.  Output data will be sorted "
255                "the same way as the input data."));
256       
257   /* Read in the aggregate functions. */
258   lex_match (lexer, '/');
259   if (!parse_aggregate_functions (lexer, dict, &agr))
260     goto error;
261
262   /* Delete documents. */
263   if (!copy_documents)
264     dict_set_documents (agr.dict, NULL);
265
266   /* Cancel SPLIT FILE. */
267   dict_set_split_vars (agr.dict, NULL, 0);
268   
269   /* Initialize. */
270   agr.case_cnt = 0;
271   case_create (&agr.agr_case, dict_get_next_value_idx (agr.dict));
272
273   /* Output to active file or external file? */
274   if (out_file == NULL) 
275     {
276       /* The active file will be replaced by the aggregated data,
277          so TEMPORARY is moot. */
278       proc_cancel_temporary_transformations (ds);
279
280       if (agr.sort != NULL && !presorted) 
281         {
282           if (!sort_active_file_in_place (ds, agr.sort))
283             goto error;
284         }
285
286       agr.sink = create_case_sink (&storage_sink_class, agr.dict, NULL);
287       if (agr.sink->class->open != NULL)
288         agr.sink->class->open (agr.sink);
289       proc_set_sink (ds, 
290                      create_case_sink (&null_sink_class, 
291                                        dict, NULL));
292       if (!procedure (ds, agr_to_active_file, &agr))
293         goto error;
294       if (agr.case_cnt > 0) 
295         {
296           dump_aggregate_info (&agr, &agr.agr_case);
297           if (!agr.sink->class->write (agr.sink, &agr.agr_case))
298             goto error;
299         }
300       discard_variables (ds);
301       dict_destroy (dict);
302       dataset_set_dict (ds, agr.dict);
303       agr.dict = NULL;
304       proc_set_source (ds, 
305                        agr.sink->class->make_source (agr.sink));
306       free_case_sink (agr.sink);
307     }
308   else
309     {
310       agr.writer = any_writer_open (out_file, agr.dict);
311       if (agr.writer == NULL)
312         goto error;
313       
314       if (agr.sort != NULL && !presorted) 
315         {
316           /* Sorting is needed. */
317           struct casefile *dst;
318           struct casereader *reader;
319           struct ccase c;
320           bool ok = true;
321           
322           dst = sort_active_file_to_casefile (ds, agr.sort);
323           if (dst == NULL)
324             goto error;
325           reader = casefile_get_destructive_reader (dst);
326           while (ok && casereader_read_xfer (reader, &c)) 
327             {
328               if (aggregate_single_case (&agr, &c, &agr.agr_case)) 
329                 ok = any_writer_write (agr.writer, &agr.agr_case);
330               case_destroy (&c);
331             }
332           casereader_destroy (reader);
333           if (ok)
334             ok = !casefile_error (dst);
335           casefile_destroy (dst);
336           if (!ok)
337             goto error;
338         }
339       else 
340         {
341           /* Active file is already sorted. */
342           if (!procedure (ds, presorted_agr_to_sysfile, &agr))
343             goto error;
344         }
345       
346       if (agr.case_cnt > 0) 
347         {
348           dump_aggregate_info (&agr, &agr.agr_case);
349           any_writer_write (agr.writer, &agr.agr_case);
350         }
351       if (any_writer_error (agr.writer))
352         goto error;
353     }
354   
355   agr_destroy (&agr);
356   return CMD_SUCCESS;
357
358 error:
359   agr_destroy (&agr);
360   return CMD_CASCADING_FAILURE;
361 }
362
363 /* Parse all the aggregate functions. */
364 static bool
365 parse_aggregate_functions (struct lexer *lexer, const struct dictionary *dict, struct agr_proc *agr)
366 {
367   struct agr_var *tail; /* Tail of linked list starting at agr->vars. */
368
369   /* Parse everything. */
370   tail = NULL;
371   for (;;)
372     {
373       char **dest;
374       char **dest_label;
375       size_t n_dest;
376       struct string function_name;
377
378       int include_missing;
379       const struct agr_func *function;
380       int func_index;
381
382       union agr_argument arg[2];
383
384       struct variable **src;
385       size_t n_src;
386
387       size_t i;
388
389       dest = NULL;
390       dest_label = NULL;
391       n_dest = 0;
392       src = NULL;
393       function = NULL;
394       n_src = 0;
395       arg[0].c = NULL;
396       arg[1].c = NULL;
397
398       /* Parse the list of target variables. */
399       while (!lex_match (lexer, '='))
400         {
401           size_t n_dest_prev = n_dest;
402           
403           if (!parse_DATA_LIST_vars (lexer, &dest, &n_dest,
404                                      PV_APPEND | PV_SINGLE | PV_NO_SCRATCH))
405             goto error;
406
407           /* Assign empty labels. */
408           {
409             int j;
410
411             dest_label = xnrealloc (dest_label, n_dest, sizeof *dest_label);
412             for (j = n_dest_prev; j < n_dest; j++)
413               dest_label[j] = NULL;
414           }
415
416
417           
418           if (lex_token (lexer) == T_STRING)
419             {
420               struct string label;
421               ds_init_string (&label, lex_tokstr (lexer));
422
423               ds_truncate (&label, 255);
424               dest_label[n_dest - 1] = ds_xstrdup (&label);
425               lex_get (lexer);
426               ds_destroy (&label);
427             }
428         }
429
430       /* Get the name of the aggregation function. */
431       if (lex_token (lexer) != T_ID)
432         {
433           lex_error (lexer, _("expecting aggregation function"));
434           goto error;
435         }
436
437       include_missing = 0;
438
439       ds_init_string (&function_name, lex_tokstr (lexer));
440
441       ds_chomp (&function_name, '.');
442
443       if (lex_tokid(lexer)[strlen (lex_tokid (lexer)) - 1] == '.')
444           include_missing = 1;
445
446       for (function = agr_func_tab; function->name; function++)
447         if (!strcasecmp (function->name, ds_cstr (&function_name)))
448           break;
449       if (NULL == function->name)
450         {
451           msg (SE, _("Unknown aggregation function %s."), 
452                ds_cstr (&function_name));
453           goto error;
454         }
455       ds_destroy (&function_name);
456       func_index = function - agr_func_tab;
457       lex_get (lexer);
458
459       /* Check for leading lparen. */
460       if (!lex_match (lexer, '('))
461         {
462           if (func_index == N)
463             func_index = N_NO_VARS;
464           else if (func_index == NU)
465             func_index = NU_NO_VARS;
466           else
467             {
468               lex_error (lexer, _("expecting `('"));
469               goto error;
470             }
471         }
472       else
473         {
474           /* Parse list of source variables. */
475           {
476             int pv_opts = PV_NO_SCRATCH;
477
478             if (func_index == SUM || func_index == MEAN || func_index == SD)
479               pv_opts |= PV_NUMERIC;
480             else if (function->n_args)
481               pv_opts |= PV_SAME_TYPE;
482
483             if (!parse_variables (lexer, dict, &src, &n_src, pv_opts))
484               goto error;
485           }
486
487           /* Parse function arguments, for those functions that
488              require arguments. */
489           if (function->n_args != 0)
490             for (i = 0; i < function->n_args; i++)
491               {
492                 int type;
493             
494                 lex_match (lexer, ',');
495                 if (lex_token (lexer) == T_STRING)
496                   {
497                     arg[i].c = ds_xstrdup (lex_tokstr (lexer));
498                     type = VAR_STRING;
499                   }
500                 else if (lex_is_number (lexer))
501                   {
502                     arg[i].f = lex_tokval (lexer);
503                     type = VAR_NUMERIC;
504                   }
505                 else
506                   {
507                     msg (SE, _("Missing argument %d to %s."), i + 1,
508                          function->name);
509                     goto error;
510                   }
511             
512                 lex_get (lexer);
513
514                 if (type != var_get_type (src[0]))
515                   {
516                     msg (SE, _("Arguments to %s must be of same type as "
517                                "source variables."),
518                          function->name);
519                     goto error;
520                   }
521               }
522
523           /* Trailing rparen. */
524           if (!lex_match (lexer, ')'))
525             {
526               lex_error (lexer, _("expecting `)'"));
527               goto error;
528             }
529           
530           /* Now check that the number of source variables match
531              the number of target variables.  If we check earlier
532              than this, the user can get very misleading error
533              message, i.e. `AGGREGATE x=SUM(y t).' will get this
534              error message when a proper message would be more
535              like `unknown variable t'. */
536           if (n_src != n_dest)
537             {
538               msg (SE, _("Number of source variables (%u) does not match "
539                          "number of target variables (%u)."),
540                    (unsigned) n_src, (unsigned) n_dest);
541               goto error;
542             }
543
544           if ((func_index == PIN || func_index == POUT
545               || func_index == FIN || func_index == FOUT) 
546               && (var_is_numeric (src[0])
547                   ? arg[0].f > arg[1].f
548                   : str_compare_rpad (arg[0].c, arg[1].c) > 0))
549             {
550               union agr_argument t = arg[0];
551               arg[0] = arg[1];
552               arg[1] = t;
553                   
554               msg (SW, _("The value arguments passed to the %s function "
555                          "are out-of-order.  They will be treated as if "
556                          "they had been specified in the correct order."),
557                    function->name);
558             }
559         }
560         
561       /* Finally add these to the linked list of aggregation
562          variables. */
563       for (i = 0; i < n_dest; i++)
564         {
565           struct agr_var *v = xmalloc (sizeof *v);
566
567           /* Add variable to chain. */
568           if (agr->agr_vars != NULL)
569             tail->next = v;
570           else
571             agr->agr_vars = v;
572           tail = v;
573           tail->next = NULL;
574           v->moments = NULL;
575           
576           /* Create the target variable in the aggregate
577              dictionary. */
578           {
579             struct variable *destvar;
580             
581             v->function = func_index;
582
583             if (src)
584               {
585                 v->src = src[i];
586                 
587                 if (var_is_alpha (src[i]))
588                   {
589                     v->function |= FSTRING;
590                     v->string = xmalloc (var_get_width (src[i]));
591                   }
592
593                 if (function->alpha_type == VAR_STRING)
594                   destvar = dict_clone_var (agr->dict, v->src, dest[i]);
595                 else
596                   {
597                     assert (var_is_numeric (v->src)
598                             || function->alpha_type == VAR_NUMERIC);
599                     destvar = dict_create_var (agr->dict, dest[i], 0);
600                     if (destvar != NULL) 
601                       {
602                         struct fmt_spec f;
603                         if ((func_index == N || func_index == NMISS)
604                             && dict_get_weight (dict) != NULL)
605                           f = fmt_for_output (FMT_F, 8, 2); 
606                         else
607                           f = function->format;
608                         var_set_both_formats (destvar, &f);
609                       }
610                   }
611               } else {
612                 struct fmt_spec f;
613                 v->src = NULL;
614                 destvar = dict_create_var (agr->dict, dest[i], 0);
615                 if (func_index == N_NO_VARS && dict_get_weight (dict) != NULL)
616                   f = fmt_for_output (FMT_F, 8, 2); 
617                 else
618                   f = function->format;
619                 var_set_both_formats (destvar, &f);
620               }
621           
622             if (!destvar)
623               {
624                 msg (SE, _("Variable name %s is not unique within the "
625                            "aggregate file dictionary, which contains "
626                            "the aggregate variables and the break "
627                            "variables."),
628                      dest[i]);
629                 goto error;
630               }
631
632             free (dest[i]);
633             if (dest_label[i])
634               var_set_label (destvar, dest_label[i]);
635
636             v->dest = destvar;
637           }
638           
639           v->include_missing = include_missing;
640
641           if (v->src != NULL)
642             {
643               int j;
644
645               if (var_is_numeric (v->src))
646                 for (j = 0; j < function->n_args; j++)
647                   v->arg[j].f = arg[j].f;
648               else
649                 for (j = 0; j < function->n_args; j++)
650                   v->arg[j].c = xstrdup (arg[j].c);
651             }
652         }
653       
654       if (src != NULL && var_is_alpha (src[0]))
655         for (i = 0; i < function->n_args; i++)
656           {
657             free (arg[i].c);
658             arg[i].c = NULL;
659           }
660
661       free (src);
662       free (dest);
663       free (dest_label);
664
665       if (!lex_match (lexer, '/'))
666         {
667           if (lex_token (lexer) == '.')
668             return true;
669
670           lex_error (lexer, "expecting end of command");
671           return false;
672         }
673       continue;
674       
675     error:
676       ds_destroy (&function_name);
677       for (i = 0; i < n_dest; i++)
678         {
679           free (dest[i]);
680           free (dest_label[i]);
681         }
682       free (dest);
683       free (dest_label);
684       free (arg[0].c);
685       free (arg[1].c);
686       if (src && n_src && var_is_alpha (src[0]))
687         for (i = 0; i < function->n_args; i++)
688           {
689             free (arg[i].c);
690             arg[i].c = NULL;
691           }
692       free (src);
693         
694       return false;
695     }
696 }
697
698 /* Destroys AGR. */
699 static void
700 agr_destroy (struct agr_proc *agr)
701 {
702   struct agr_var *iter, *next;
703
704   any_writer_close (agr->writer);
705   if (agr->sort != NULL)
706     sort_destroy_criteria (agr->sort);
707   free (agr->break_vars);
708   case_destroy (&agr->break_case);
709   for (iter = agr->agr_vars; iter; iter = next)
710     {
711       next = iter->next;
712
713       if (iter->function & FSTRING)
714         {
715           size_t n_args;
716           size_t i;
717
718           n_args = agr_func_tab[iter->function & FUNC].n_args;
719           for (i = 0; i < n_args; i++)
720             free (iter->arg[i].c);
721           free (iter->string);
722         }
723       else if (iter->function == SD)
724         moments1_destroy (iter->moments);
725       free (iter);
726     }
727   if (agr->dict != NULL)
728     dict_destroy (agr->dict);
729
730   case_destroy (&agr->agr_case);
731 }
732 \f
733 /* Execution. */
734
735 static void accumulate_aggregate_info (struct agr_proc *,
736                                        const struct ccase *);
737 static void dump_aggregate_info (struct agr_proc *, struct ccase *);
738
739 /* Processes a single case INPUT for aggregation.  If output is
740    warranted, writes it to OUTPUT and returns true.
741    Otherwise, returns false and OUTPUT is unmodified. */
742 static bool
743 aggregate_single_case (struct agr_proc *agr,
744                        const struct ccase *input, struct ccase *output)
745 {
746   bool finished_group = false;
747   
748   if (agr->case_cnt++ == 0)
749     initialize_aggregate_info (agr, input);
750   else if (case_compare (&agr->break_case, input,
751                          agr->break_vars, agr->break_var_cnt))
752     {
753       dump_aggregate_info (agr, output);
754       finished_group = true;
755
756       initialize_aggregate_info (agr, input);
757     }
758
759   accumulate_aggregate_info (agr, input);
760   return finished_group;
761 }
762
763 /* Accumulates aggregation data from the case INPUT. */
764 static void 
765 accumulate_aggregate_info (struct agr_proc *agr,
766                            const struct ccase *input)
767 {
768   struct agr_var *iter;
769   double weight;
770   bool bad_warn = true;
771
772   weight = dict_get_case_weight (agr->src_dict, input, &bad_warn);
773
774   for (iter = agr->agr_vars; iter; iter = iter->next)
775     if (iter->src)
776       {
777         const union value *v = case_data (input, iter->src);
778         int src_width = var_get_width (iter->src);
779
780         if (iter->include_missing
781             ? var_is_numeric (iter->src) && v->f == SYSMIS
782             : var_is_value_missing (iter->src, v))
783           {
784             switch (iter->function)
785               {
786               case NMISS:
787               case NMISS | FSTRING:
788                 iter->dbl[0] += weight;
789                 break;
790               case NUMISS:
791               case NUMISS | FSTRING:
792                 iter->int1++;
793                 break;
794               }
795             iter->missing = 1;
796             continue;
797           }
798         
799         /* This is horrible.  There are too many possibilities. */
800         switch (iter->function)
801           {
802           case SUM:
803             iter->dbl[0] += v->f * weight;
804             iter->int1 = 1;
805             break;
806           case MEAN:
807             iter->dbl[0] += v->f * weight;
808             iter->dbl[1] += weight;
809             break;
810           case SD:
811             moments1_add (iter->moments, v->f, weight);
812             break;
813           case MAX:
814             iter->dbl[0] = MAX (iter->dbl[0], v->f);
815             iter->int1 = 1;
816             break;
817           case MAX | FSTRING:
818             if (memcmp (iter->string, v->s, src_width) < 0)
819               memcpy (iter->string, v->s, src_width);
820             iter->int1 = 1;
821             break;
822           case MIN:
823             iter->dbl[0] = MIN (iter->dbl[0], v->f);
824             iter->int1 = 1;
825             break;
826           case MIN | FSTRING:
827             if (memcmp (iter->string, v->s, src_width) > 0)
828               memcpy (iter->string, v->s, src_width);
829             iter->int1 = 1;
830             break;
831           case FGT:
832           case PGT:
833             if (v->f > iter->arg[0].f)
834               iter->dbl[0] += weight;
835             iter->dbl[1] += weight;
836             break;
837           case FGT | FSTRING:
838           case PGT | FSTRING:
839             if (memcmp (iter->arg[0].c, v->s, src_width) < 0)
840               iter->dbl[0] += weight;
841             iter->dbl[1] += weight;
842             break;
843           case FLT:
844           case PLT:
845             if (v->f < iter->arg[0].f)
846               iter->dbl[0] += weight;
847             iter->dbl[1] += weight;
848             break;
849           case FLT | FSTRING:
850           case PLT | FSTRING:
851             if (memcmp (iter->arg[0].c, v->s, src_width) > 0)
852               iter->dbl[0] += weight;
853             iter->dbl[1] += weight;
854             break;
855           case FIN:
856           case PIN:
857             if (iter->arg[0].f <= v->f && v->f <= iter->arg[1].f)
858               iter->dbl[0] += weight;
859             iter->dbl[1] += weight;
860             break;
861           case FIN | FSTRING:
862           case PIN | FSTRING:
863             if (memcmp (iter->arg[0].c, v->s, src_width) <= 0
864                 && memcmp (iter->arg[1].c, v->s, src_width) >= 0)
865               iter->dbl[0] += weight;
866             iter->dbl[1] += weight;
867             break;
868           case FOUT:
869           case POUT:
870             if (iter->arg[0].f > v->f || v->f > iter->arg[1].f)
871               iter->dbl[0] += weight;
872             iter->dbl[1] += weight;
873             break;
874           case FOUT | FSTRING:
875           case POUT | FSTRING:
876             if (memcmp (iter->arg[0].c, v->s, src_width) > 0
877                 || memcmp (iter->arg[1].c, v->s, src_width) < 0)
878               iter->dbl[0] += weight;
879             iter->dbl[1] += weight;
880             break;
881           case N:
882           case N | FSTRING:
883             iter->dbl[0] += weight;
884             break;
885           case NU:
886           case NU | FSTRING:
887             iter->int1++;
888             break;
889           case FIRST:
890             if (iter->int1 == 0)
891               {
892                 iter->dbl[0] = v->f;
893                 iter->int1 = 1;
894               }
895             break;
896           case FIRST | FSTRING:
897             if (iter->int1 == 0)
898               {
899                 memcpy (iter->string, v->s, src_width);
900                 iter->int1 = 1;
901               }
902             break;
903           case LAST:
904             iter->dbl[0] = v->f;
905             iter->int1 = 1;
906             break;
907           case LAST | FSTRING:
908             memcpy (iter->string, v->s, src_width);
909             iter->int1 = 1;
910             break;
911           case NMISS:
912           case NMISS | FSTRING:
913           case NUMISS:
914           case NUMISS | FSTRING:
915             /* Our value is not missing or it would have been
916                caught earlier.  Nothing to do. */
917             break;
918           default:
919             NOT_REACHED ();
920           }
921     } else {
922       switch (iter->function)
923         {
924         case N_NO_VARS:
925           iter->dbl[0] += weight;
926           break;
927         case NU_NO_VARS:
928           iter->int1++;
929           break;
930         default:
931           NOT_REACHED ();
932         }
933     }
934 }
935
936 /* We've come to a record that differs from the previous in one or
937    more of the break variables.  Make an output record from the
938    accumulated statistics in the OUTPUT case. */
939 static void 
940 dump_aggregate_info (struct agr_proc *agr, struct ccase *output)
941 {
942   {
943     int value_idx = 0;
944     int i;
945
946     for (i = 0; i < agr->break_var_cnt; i++) 
947       {
948         struct variable *v = agr->break_vars[i];
949         size_t value_cnt = var_get_value_cnt (v);
950         memcpy (case_data_rw_idx (output, value_idx),
951                 case_data (&agr->break_case, v),
952                 sizeof (union value) * value_cnt);
953         value_idx += value_cnt; 
954       }
955   }
956   
957   {
958     struct agr_var *i;
959   
960     for (i = agr->agr_vars; i; i = i->next)
961       {
962         union value *v = case_data_rw (output, i->dest);
963
964         if (agr->missing == COLUMNWISE && i->missing != 0
965             && (i->function & FUNC) != N && (i->function & FUNC) != NU
966             && (i->function & FUNC) != NMISS && (i->function & FUNC) != NUMISS)
967           {
968             if (var_is_alpha (i->dest))
969               memset (v->s, ' ', var_get_width (i->dest));
970             else
971               v->f = SYSMIS;
972             continue;
973           }
974         
975         switch (i->function)
976           {
977           case SUM:
978             v->f = i->int1 ? i->dbl[0] : SYSMIS;
979             break;
980           case MEAN:
981             v->f = i->dbl[1] != 0.0 ? i->dbl[0] / i->dbl[1] : SYSMIS;
982             break;
983           case SD:
984             {
985               double variance;
986
987               /* FIXME: we should use two passes. */
988               moments1_calculate (i->moments, NULL, NULL, &variance,
989                                  NULL, NULL);
990               if (variance != SYSMIS)
991                 v->f = sqrt (variance);
992               else
993                 v->f = SYSMIS; 
994             }
995             break;
996           case MAX:
997           case MIN:
998             v->f = i->int1 ? i->dbl[0] : SYSMIS;
999             break;
1000           case MAX | FSTRING:
1001           case MIN | FSTRING:
1002             if (i->int1)
1003               memcpy (v->s, i->string, var_get_width (i->dest));
1004             else
1005               memset (v->s, ' ', var_get_width (i->dest));
1006             break;
1007           case FGT:
1008           case FGT | FSTRING:
1009           case FLT:
1010           case FLT | FSTRING:
1011           case FIN:
1012           case FIN | FSTRING:
1013           case FOUT:
1014           case FOUT | FSTRING:
1015             v->f = i->dbl[1] ? i->dbl[0] / i->dbl[1] : SYSMIS;
1016             break;
1017           case PGT:
1018           case PGT | FSTRING:
1019           case PLT:
1020           case PLT | FSTRING:
1021           case PIN:
1022           case PIN | FSTRING:
1023           case POUT:
1024           case POUT | FSTRING:
1025             v->f = i->dbl[1] ? i->dbl[0] / i->dbl[1] * 100.0 : SYSMIS;
1026             break;
1027           case N:
1028           case N | FSTRING:
1029             v->f = i->dbl[0];
1030             break;
1031           case NU:
1032           case NU | FSTRING:
1033             v->f = i->int1;
1034             break;
1035           case FIRST:
1036           case LAST:
1037             v->f = i->int1 ? i->dbl[0] : SYSMIS;
1038             break;
1039           case FIRST | FSTRING:
1040           case LAST | FSTRING:
1041             if (i->int1)
1042               memcpy (v->s, i->string, var_get_width (i->dest));
1043             else
1044               memset (v->s, ' ', var_get_width (i->dest));
1045             break;
1046           case N_NO_VARS:
1047             v->f = i->dbl[0];
1048             break;
1049           case NU_NO_VARS:
1050             v->f = i->int1;
1051             break;
1052           case NMISS:
1053           case NMISS | FSTRING:
1054             v->f = i->dbl[0];
1055             break;
1056           case NUMISS:
1057           case NUMISS | FSTRING:
1058             v->f = i->int1;
1059             break;
1060           default:
1061             NOT_REACHED ();
1062           }
1063       }
1064   }
1065 }
1066
1067 /* Resets the state for all the aggregate functions. */
1068 static void
1069 initialize_aggregate_info (struct agr_proc *agr, const struct ccase *input)
1070 {
1071   struct agr_var *iter;
1072
1073   case_destroy (&agr->break_case);
1074   case_clone (&agr->break_case, input);
1075
1076   for (iter = agr->agr_vars; iter; iter = iter->next)
1077     {
1078       iter->missing = 0;
1079       iter->dbl[0] = iter->dbl[1] = iter->dbl[2] = 0.0;
1080       iter->int1 = iter->int2 = 0;
1081       switch (iter->function)
1082         {
1083         case MIN:
1084           iter->dbl[0] = DBL_MAX;
1085           break;
1086         case MIN | FSTRING:
1087           memset (iter->string, 255, var_get_width (iter->src));
1088           break;
1089         case MAX:
1090           iter->dbl[0] = -DBL_MAX;
1091           break;
1092         case MAX | FSTRING:
1093           memset (iter->string, 0, var_get_width (iter->src));
1094           break;
1095         case SD:
1096           if (iter->moments == NULL)
1097             iter->moments = moments1_create (MOMENT_VARIANCE);
1098           else
1099             moments1_clear (iter->moments);
1100           break;
1101         default:
1102           break;
1103         }
1104     }
1105 }
1106 \f
1107 /* Aggregate each case as it comes through.  Cases which aren't needed
1108    are dropped.
1109    Returns true if successful, false if an I/O error occurred. */
1110 static bool
1111 agr_to_active_file (const struct ccase *c, void *agr_, const struct dataset *ds UNUSED)
1112 {
1113   struct agr_proc *agr = agr_;
1114
1115   if (aggregate_single_case (agr, c, &agr->agr_case)) 
1116     return agr->sink->class->write (agr->sink, &agr->agr_case);
1117
1118   return true;
1119 }
1120
1121 /* Aggregate the current case and output it if we passed a
1122    breakpoint. */
1123 static bool
1124 presorted_agr_to_sysfile (const struct ccase *c, void *agr_, 
1125                           const struct dataset *ds UNUSED) 
1126 {
1127   struct agr_proc *agr = agr_;
1128
1129   if (aggregate_single_case (agr, c, &agr->agr_case)) 
1130     return any_writer_write (agr->writer, &agr->agr_case);
1131   
1132   return true;
1133 }