Fix memory leak in AGGREGATE.
[pspp-builds.git] / src / language / stats / aggregate.c
1 /* PSPP - computes sample statistics.
2    Copyright (C) 1997-9, 2000 Free Software Foundation, Inc.
3    Written by Ben Pfaff <blp@gnu.org>.
4
5    This program is free software; you can redistribute it and/or
6    modify it under the terms of the GNU General Public License as
7    published by the Free Software Foundation; either version 2 of the
8    License, or (at your option) any later version.
9
10    This program is distributed in the hope that it will be useful, but
11    WITHOUT ANY WARRANTY; without even the implied warranty of
12    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13    General Public License for more details.
14
15    You should have received a copy of the GNU General Public License
16    along with this program; if not, write to the Free Software
17    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
18    02110-1301, USA. */
19
20 #include <config.h>
21
22 #include <stdlib.h>
23
24 #include <data/any-writer.h>
25 #include <data/case-sink.h>
26 #include <data/case.h>
27 #include <data/casefile.h>
28 #include <data/dictionary.h>
29 #include <data/file-handle-def.h>
30 #include <data/procedure.h>
31 #include <data/settings.h>
32 #include <data/storage-stream.h>
33 #include <data/sys-file-writer.h>
34 #include <data/variable.h>
35 #include <language/command.h>
36 #include <language/data-io/file-handle.h>
37 #include <language/lexer/lexer.h>
38 #include <language/stats/sort-criteria.h>
39 #include <libpspp/alloc.h>
40 #include <libpspp/message.h>
41 #include <libpspp/message.h>
42 #include <libpspp/misc.h>
43 #include <libpspp/pool.h>
44 #include <libpspp/str.h>
45 #include <math/moments.h>
46 #include <math/sort.h>
47
48 #include "gettext.h"
49 #define _(msgid) gettext (msgid)
50
51 /* Argument for AGGREGATE function. */
52 union agr_argument
53   {
54     double f;                           /* Numeric. */
55     char *c;                            /* Short or long string. */
56   };
57
58 /* Specifies how to make an aggregate variable. */
59 struct agr_var
60   {
61     struct agr_var *next;               /* Next in list. */
62
63     /* Collected during parsing. */
64     struct variable *src;       /* Source variable. */
65     struct variable *dest;      /* Target variable. */
66     int function;               /* Function. */
67     int include_missing;        /* 1=Include user-missing values. */
68     union agr_argument arg[2];  /* Arguments. */
69
70     /* Accumulated during AGGREGATE execution. */
71     double dbl[3];
72     int int1, int2;
73     char *string;
74     int missing;
75     struct moments1 *moments;
76   };
77
78 /* Aggregation functions. */
79 enum
80   {
81     NONE, SUM, MEAN, SD, MAX, MIN, PGT, PLT, PIN, POUT, FGT, FLT, FIN,
82     FOUT, N, NU, NMISS, NUMISS, FIRST, LAST,
83     N_AGR_FUNCS, N_NO_VARS, NU_NO_VARS,
84     FUNC = 0x1f, /* Function mask. */
85     FSTRING = 1<<5, /* String function bit. */
86   };
87
88 /* Attributes of an aggregation function. */
89 struct agr_func
90   {
91     const char *name;           /* Aggregation function name. */
92     size_t n_args;              /* Number of arguments. */
93     int alpha_type;             /* When given ALPHA arguments, output type. */
94     struct fmt_spec format;     /* Format spec if alpha_type != ALPHA. */
95   };
96
97 /* Attributes of aggregation functions. */
98 static const struct agr_func agr_func_tab[] = 
99   {
100     {"<NONE>",  0, -1,      {0, 0, 0}},
101     {"SUM",     0, -1,      {FMT_F, 8, 2}},
102     {"MEAN",    0, -1,      {FMT_F, 8, 2}},
103     {"SD",      0, -1,      {FMT_F, 8, 2}},
104     {"MAX",     0, ALPHA,   {-1, -1, -1}}, 
105     {"MIN",     0, ALPHA,   {-1, -1, -1}}, 
106     {"PGT",     1, NUMERIC, {FMT_F, 5, 1}},      
107     {"PLT",     1, NUMERIC, {FMT_F, 5, 1}},       
108     {"PIN",     2, NUMERIC, {FMT_F, 5, 1}},       
109     {"POUT",    2, NUMERIC, {FMT_F, 5, 1}},       
110     {"FGT",     1, NUMERIC, {FMT_F, 5, 3}},       
111     {"FLT",     1, NUMERIC, {FMT_F, 5, 3}},       
112     {"FIN",     2, NUMERIC, {FMT_F, 5, 3}},       
113     {"FOUT",    2, NUMERIC, {FMT_F, 5, 3}},       
114     {"N",       0, NUMERIC, {FMT_F, 7, 0}},       
115     {"NU",      0, NUMERIC, {FMT_F, 7, 0}},       
116     {"NMISS",   0, NUMERIC, {FMT_F, 7, 0}},       
117     {"NUMISS",  0, NUMERIC, {FMT_F, 7, 0}},       
118     {"FIRST",   0, ALPHA,   {-1, -1, -1}}, 
119     {"LAST",    0, ALPHA,   {-1, -1, -1}},
120     {NULL,      0, -1,      {-1, -1, -1}},
121     {"N",       0, NUMERIC, {FMT_F, 7, 0}},
122     {"NU",      0, NUMERIC, {FMT_F, 7, 0}},
123   };
124
125 /* Missing value types. */
126 enum missing_treatment
127   {
128     ITEMWISE,           /* Missing values item by item. */
129     COLUMNWISE          /* Missing values column by column. */
130   };
131
132 /* An entire AGGREGATE procedure. */
133 struct agr_proc 
134   {
135     /* We have either an output file or a sink. */
136     struct any_writer *writer;          /* Output file, or null if none. */
137     struct case_sink *sink;             /* Sink, or null if none. */
138
139     /* Break variables. */
140     struct sort_criteria *sort;         /* Sort criteria. */
141     struct variable **break_vars;       /* Break variables. */
142     size_t break_var_cnt;               /* Number of break variables. */
143     struct ccase break_case;            /* Last values of break variables. */
144
145     enum missing_treatment missing;     /* How to treat missing values. */
146     struct agr_var *agr_vars;           /* First aggregate variable. */
147     struct dictionary *dict;            /* Aggregate dictionary. */
148     int case_cnt;                       /* Counts aggregated cases. */
149     struct ccase agr_case;              /* Aggregate case for output. */
150   };
151
152 static void initialize_aggregate_info (struct agr_proc *,
153                                        const struct ccase *);
154
155 /* Prototypes. */
156 static int parse_aggregate_functions (struct agr_proc *);
157 static void agr_destroy (struct agr_proc *);
158 static int aggregate_single_case (struct agr_proc *agr,
159                                   const struct ccase *input,
160                                   struct ccase *output);
161 static void dump_aggregate_info (struct agr_proc *agr, struct ccase *output);
162
163 /* Aggregating to the active file. */
164 static bool agr_to_active_file (const struct ccase *, void *aux);
165
166 /* Aggregating to a system file. */
167 static bool presorted_agr_to_sysfile (const struct ccase *, void *aux);
168 \f
169 /* Parsing. */
170
171 /* Parses and executes the AGGREGATE procedure. */
172 int
173 cmd_aggregate (void)
174 {
175   struct agr_proc agr;
176   struct file_handle *out_file = NULL;
177
178   bool copy_documents = false;
179   bool presorted = false;
180   bool saw_direction;
181
182   memset(&agr, 0 , sizeof (agr));
183   agr.missing = ITEMWISE;
184   case_nullify (&agr.break_case);
185   
186   agr.dict = dict_create ();
187   dict_set_label (agr.dict, dict_get_label (default_dict));
188   dict_set_documents (agr.dict, dict_get_documents (default_dict));
189
190   /* OUTFILE subcommand must be first. */
191   if (!lex_force_match_id ("OUTFILE"))
192     goto error;
193   lex_match ('=');
194   if (!lex_match ('*'))
195     {
196       out_file = fh_parse (FH_REF_FILE | FH_REF_SCRATCH);
197       if (out_file == NULL)
198         goto error;
199     }
200   
201   /* Read most of the subcommands. */
202   for (;;)
203     {
204       lex_match ('/');
205       
206       if (lex_match_id ("MISSING"))
207         {
208           lex_match ('=');
209           if (!lex_match_id ("COLUMNWISE"))
210             {
211               lex_error (_("while expecting COLUMNWISE"));
212               goto error;
213             }
214           agr.missing = COLUMNWISE;
215         }
216       else if (lex_match_id ("DOCUMENT"))
217         copy_documents = true;
218       else if (lex_match_id ("PRESORTED"))
219         presorted = true;
220       else if (lex_match_id ("BREAK"))
221         {
222           int i;
223
224           lex_match ('=');
225           agr.sort = sort_parse_criteria (default_dict,
226                                           &agr.break_vars, &agr.break_var_cnt,
227                                           &saw_direction, NULL);
228           if (agr.sort == NULL)
229             goto error;
230           
231           for (i = 0; i < agr.break_var_cnt; i++)
232             dict_clone_var_assert (agr.dict, agr.break_vars[i],
233                                    agr.break_vars[i]->name);
234
235           /* BREAK must follow the options. */
236           break;
237         }
238       else
239         {
240           lex_error (_("expecting BREAK"));
241           goto error;
242         }
243     }
244   if (presorted && saw_direction)
245     msg (SW, _("When PRESORTED is specified, specifying sorting directions "
246                "with (A) or (D) has no effect.  Output data will be sorted "
247                "the same way as the input data."));
248       
249   /* Read in the aggregate functions. */
250   lex_match ('/');
251   if (!parse_aggregate_functions (&agr))
252     goto error;
253
254   /* Delete documents. */
255   if (!copy_documents)
256     dict_set_documents (agr.dict, NULL);
257
258   /* Cancel SPLIT FILE. */
259   dict_set_split_vars (agr.dict, NULL, 0);
260   
261   /* Initialize. */
262   agr.case_cnt = 0;
263   case_create (&agr.agr_case, dict_get_next_value_idx (agr.dict));
264
265   /* Output to active file or external file? */
266   if (out_file == NULL) 
267     {
268       /* The active file will be replaced by the aggregated data,
269          so TEMPORARY is moot. */
270       proc_cancel_temporary_transformations ();
271
272       if (agr.sort != NULL && !presorted) 
273         {
274           if (!sort_active_file_in_place (agr.sort))
275             goto error;
276         }
277
278       agr.sink = create_case_sink (&storage_sink_class, agr.dict, NULL);
279       if (agr.sink->class->open != NULL)
280         agr.sink->class->open (agr.sink);
281       proc_set_sink (create_case_sink (&null_sink_class, default_dict, NULL));
282       if (!procedure (agr_to_active_file, &agr))
283         goto error;
284       if (agr.case_cnt > 0) 
285         {
286           dump_aggregate_info (&agr, &agr.agr_case);
287           if (!agr.sink->class->write (agr.sink, &agr.agr_case))
288             goto error;
289         }
290       discard_variables ();
291       dict_destroy (default_dict);
292       default_dict = agr.dict;
293       agr.dict = NULL;
294       proc_set_source (agr.sink->class->make_source (agr.sink));
295       free_case_sink (agr.sink);
296     }
297   else
298     {
299       agr.writer = any_writer_open (out_file, agr.dict);
300       if (agr.writer == NULL)
301         goto error;
302       
303       if (agr.sort != NULL && !presorted) 
304         {
305           /* Sorting is needed. */
306           struct casefile *dst;
307           struct casereader *reader;
308           struct ccase c;
309           bool ok = true;
310           
311           dst = sort_active_file_to_casefile (agr.sort);
312           if (dst == NULL)
313             goto error;
314           reader = casefile_get_destructive_reader (dst);
315           while (ok && casereader_read_xfer (reader, &c)) 
316             {
317               if (aggregate_single_case (&agr, &c, &agr.agr_case)) 
318                 ok = any_writer_write (agr.writer, &agr.agr_case);
319               case_destroy (&c);
320             }
321           casereader_destroy (reader);
322           if (ok)
323             ok = !casefile_error (dst);
324           casefile_destroy (dst);
325           if (!ok)
326             goto error;
327         }
328       else 
329         {
330           /* Active file is already sorted. */
331           if (!procedure (presorted_agr_to_sysfile, &agr))
332             goto error;
333         }
334       
335       if (agr.case_cnt > 0) 
336         {
337           dump_aggregate_info (&agr, &agr.agr_case);
338           any_writer_write (agr.writer, &agr.agr_case);
339         }
340       if (any_writer_error (agr.writer))
341         goto error;
342     }
343   
344   agr_destroy (&agr);
345   return CMD_SUCCESS;
346
347 error:
348   agr_destroy (&agr);
349   return CMD_CASCADING_FAILURE;
350 }
351
352 /* Parse all the aggregate functions. */
353 static int
354 parse_aggregate_functions (struct agr_proc *agr)
355 {
356   struct agr_var *tail; /* Tail of linked list starting at agr->vars. */
357
358   /* Parse everything. */
359   tail = NULL;
360   for (;;)
361     {
362       char **dest;
363       char **dest_label;
364       size_t n_dest;
365
366       int include_missing;
367       const struct agr_func *function;
368       int func_index;
369
370       union agr_argument arg[2];
371
372       struct variable **src;
373       size_t n_src;
374
375       size_t i;
376
377       dest = NULL;
378       dest_label = NULL;
379       n_dest = 0;
380       src = NULL;
381       function = NULL;
382       n_src = 0;
383       arg[0].c = NULL;
384       arg[1].c = NULL;
385
386       /* Parse the list of target variables. */
387       while (!lex_match ('='))
388         {
389           size_t n_dest_prev = n_dest;
390           
391           if (!parse_DATA_LIST_vars (&dest, &n_dest,
392                                      PV_APPEND | PV_SINGLE | PV_NO_SCRATCH))
393             goto error;
394
395           /* Assign empty labels. */
396           {
397             int j;
398
399             dest_label = xnrealloc (dest_label, n_dest, sizeof *dest_label);
400             for (j = n_dest_prev; j < n_dest; j++)
401               dest_label[j] = NULL;
402           }
403           
404           if (token == T_STRING)
405             {
406               ds_truncate (&tokstr, 255);
407               dest_label[n_dest - 1] = xstrdup (ds_c_str (&tokstr));
408               lex_get ();
409             }
410         }
411
412       /* Get the name of the aggregation function. */
413       if (token != T_ID)
414         {
415           lex_error (_("expecting aggregation function"));
416           goto error;
417         }
418
419       include_missing = 0;
420       if (tokid[strlen (tokid) - 1] == '.')
421         {
422           include_missing = 1;
423           tokid[strlen (tokid) - 1] = 0;
424         }
425       
426       for (function = agr_func_tab; function->name; function++)
427         if (!strcasecmp (function->name, tokid))
428           break;
429       if (NULL == function->name)
430         {
431           msg (SE, _("Unknown aggregation function %s."), tokid);
432           goto error;
433         }
434       func_index = function - agr_func_tab;
435       lex_get ();
436
437       /* Check for leading lparen. */
438       if (!lex_match ('('))
439         {
440           if (func_index == N)
441             func_index = N_NO_VARS;
442           else if (func_index == NU)
443             func_index = NU_NO_VARS;
444           else
445             {
446               lex_error (_("expecting `('"));
447               goto error;
448             }
449         }
450       else
451         {
452           /* Parse list of source variables. */
453           {
454             int pv_opts = PV_NO_SCRATCH;
455
456             if (func_index == SUM || func_index == MEAN || func_index == SD)
457               pv_opts |= PV_NUMERIC;
458             else if (function->n_args)
459               pv_opts |= PV_SAME_TYPE;
460
461             if (!parse_variables (default_dict, &src, &n_src, pv_opts))
462               goto error;
463           }
464
465           /* Parse function arguments, for those functions that
466              require arguments. */
467           if (function->n_args != 0)
468             for (i = 0; i < function->n_args; i++)
469               {
470                 int type;
471             
472                 lex_match (',');
473                 if (token == T_STRING)
474                   {
475                     arg[i].c = xstrdup (ds_c_str (&tokstr));
476                     type = ALPHA;
477                   }
478                 else if (lex_is_number ())
479                   {
480                     arg[i].f = tokval;
481                     type = NUMERIC;
482                   } else {
483                     msg (SE, _("Missing argument %d to %s."), i + 1,
484                          function->name);
485                     goto error;
486                   }
487             
488                 lex_get ();
489
490                 if (type != src[0]->type)
491                   {
492                     msg (SE, _("Arguments to %s must be of same type as "
493                                "source variables."),
494                          function->name);
495                     goto error;
496                   }
497               }
498
499           /* Trailing rparen. */
500           if (!lex_match(')'))
501             {
502               lex_error (_("expecting `)'"));
503               goto error;
504             }
505           
506           /* Now check that the number of source variables match
507              the number of target variables.  If we check earlier
508              than this, the user can get very misleading error
509              message, i.e. `AGGREGATE x=SUM(y t).' will get this
510              error message when a proper message would be more
511              like `unknown variable t'. */
512           if (n_src != n_dest)
513             {
514               msg (SE, _("Number of source variables (%u) does not match "
515                          "number of target variables (%u)."),
516                    (unsigned) n_src, (unsigned) n_dest);
517               goto error;
518             }
519
520           if ((func_index == PIN || func_index == POUT
521               || func_index == FIN || func_index == FOUT) 
522               && ((src[0]->type == NUMERIC && arg[0].f > arg[1].f)
523                   || (src[0]->type == ALPHA
524                       && str_compare_rpad (arg[0].c, arg[1].c) > 0)))
525             {
526               union agr_argument t = arg[0];
527               arg[0] = arg[1];
528               arg[1] = t;
529                   
530               msg (SW, _("The value arguments passed to the %s function "
531                          "are out-of-order.  They will be treated as if "
532                          "they had been specified in the correct order."),
533                    function->name);
534             }
535         }
536         
537       /* Finally add these to the linked list of aggregation
538          variables. */
539       for (i = 0; i < n_dest; i++)
540         {
541           struct agr_var *v = xmalloc (sizeof *v);
542
543           /* Add variable to chain. */
544           if (agr->agr_vars != NULL)
545             tail->next = v;
546           else
547             agr->agr_vars = v;
548           tail = v;
549           tail->next = NULL;
550           v->moments = NULL;
551           
552           /* Create the target variable in the aggregate
553              dictionary. */
554           {
555             struct variable *destvar;
556             
557             v->function = func_index;
558
559             if (src)
560               {
561                 v->src = src[i];
562                 
563                 if (src[i]->type == ALPHA)
564                   {
565                     v->function |= FSTRING;
566                     v->string = xmalloc (src[i]->width);
567                   }
568
569                 if (function->alpha_type == ALPHA)
570                   destvar = dict_clone_var (agr->dict, v->src, dest[i]);
571                 else
572                   {
573                     assert (v->src->type == NUMERIC
574                             || function->alpha_type == NUMERIC);
575                     destvar = dict_create_var (agr->dict, dest[i], 0);
576                     if (destvar != NULL) 
577                       {
578                         if ((func_index == N || func_index == NMISS)
579                             && dict_get_weight (default_dict) != NULL)
580                           destvar->print = destvar->write = f8_2; 
581                         else
582                           destvar->print = destvar->write = function->format;
583                       }
584                   }
585               } else {
586                 v->src = NULL;
587                 destvar = dict_create_var (agr->dict, dest[i], 0);
588                 if (func_index == N_NO_VARS
589                     && dict_get_weight (default_dict) != NULL)
590                   destvar->print = destvar->write = f8_2; 
591                 else
592                   destvar->print = destvar->write = function->format;
593               }
594           
595             if (!destvar)
596               {
597                 msg (SE, _("Variable name %s is not unique within the "
598                            "aggregate file dictionary, which contains "
599                            "the aggregate variables and the break "
600                            "variables."),
601                      dest[i]);
602                 goto error;
603               }
604
605             free (dest[i]);
606             if (dest_label[i])
607               {
608                 destvar->label = dest_label[i];
609                 dest_label[i] = NULL;
610               }
611
612             v->dest = destvar;
613           }
614           
615           v->include_missing = include_missing;
616
617           if (v->src != NULL)
618             {
619               int j;
620
621               if (v->src->type == NUMERIC)
622                 for (j = 0; j < function->n_args; j++)
623                   v->arg[j].f = arg[j].f;
624               else
625                 for (j = 0; j < function->n_args; j++)
626                   v->arg[j].c = xstrdup (arg[j].c);
627             }
628         }
629       
630       if (src != NULL && src[0]->type == ALPHA)
631         for (i = 0; i < function->n_args; i++)
632           {
633             free (arg[i].c);
634             arg[i].c = NULL;
635           }
636
637       free (src);
638       free (dest);
639       free (dest_label);
640
641       if (!lex_match ('/'))
642         {
643           if (token == '.')
644             return 1;
645
646           lex_error ("expecting end of command");
647           return 0;
648         }
649       continue;
650       
651     error:
652       for (i = 0; i < n_dest; i++)
653         {
654           free (dest[i]);
655           free (dest_label[i]);
656         }
657       free (dest);
658       free (dest_label);
659       free (arg[0].c);
660       free (arg[1].c);
661       if (src && n_src && src[0]->type == ALPHA)
662         for (i = 0; i < function->n_args; i++)
663           {
664             free (arg[i].c);
665             arg[i].c = NULL;
666           }
667       free (src);
668         
669       return 0;
670     }
671 }
672
673 /* Destroys AGR. */
674 static void
675 agr_destroy (struct agr_proc *agr)
676 {
677   struct agr_var *iter, *next;
678
679   any_writer_close (agr->writer);
680   if (agr->sort != NULL)
681     sort_destroy_criteria (agr->sort);
682   free (agr->break_vars);
683   case_destroy (&agr->break_case);
684   for (iter = agr->agr_vars; iter; iter = next)
685     {
686       next = iter->next;
687
688       if (iter->function & FSTRING)
689         {
690           size_t n_args;
691           size_t i;
692
693           n_args = agr_func_tab[iter->function & FUNC].n_args;
694           for (i = 0; i < n_args; i++)
695             free (iter->arg[i].c);
696           free (iter->string);
697         }
698       else if (iter->function == SD)
699         moments1_destroy (iter->moments);
700       free (iter);
701     }
702   if (agr->dict != NULL)
703     dict_destroy (agr->dict);
704
705   case_destroy (&agr->agr_case);
706 }
707 \f
708 /* Execution. */
709
710 static void accumulate_aggregate_info (struct agr_proc *,
711                                        const struct ccase *);
712 static void dump_aggregate_info (struct agr_proc *, struct ccase *);
713
714 /* Processes a single case INPUT for aggregation.  If output is
715    warranted, writes it to OUTPUT and returns nonzero.
716    Otherwise, returns zero and OUTPUT is unmodified. */
717 static int
718 aggregate_single_case (struct agr_proc *agr,
719                        const struct ccase *input, struct ccase *output)
720 {
721   bool finished_group = false;
722   
723   if (agr->case_cnt++ == 0)
724     initialize_aggregate_info (agr, input);
725   else if (case_compare (&agr->break_case, input,
726                          agr->break_vars, agr->break_var_cnt))
727     {
728       dump_aggregate_info (agr, output);
729       finished_group = true;
730
731       initialize_aggregate_info (agr, input);
732     }
733
734   accumulate_aggregate_info (agr, input);
735   return finished_group;
736 }
737
738 /* Accumulates aggregation data from the case INPUT. */
739 static void 
740 accumulate_aggregate_info (struct agr_proc *agr,
741                            const struct ccase *input)
742 {
743   struct agr_var *iter;
744   double weight;
745   int bad_warn = 1;
746
747   weight = dict_get_case_weight (default_dict, input, &bad_warn);
748
749   for (iter = agr->agr_vars; iter; iter = iter->next)
750     if (iter->src)
751       {
752         const union value *v = case_data (input, iter->src->fv);
753
754         if ((!iter->include_missing
755              && mv_is_value_missing (&iter->src->miss, v))
756             || (iter->include_missing && iter->src->type == NUMERIC
757                 && v->f == SYSMIS))
758           {
759             switch (iter->function)
760               {
761               case NMISS:
762               case NMISS | FSTRING:
763                 iter->dbl[0] += weight;
764                 break;
765               case NUMISS:
766               case NUMISS | FSTRING:
767                 iter->int1++;
768                 break;
769               }
770             iter->missing = 1;
771             continue;
772           }
773         
774         /* This is horrible.  There are too many possibilities. */
775         switch (iter->function)
776           {
777           case SUM:
778             iter->dbl[0] += v->f * weight;
779             iter->int1 = 1;
780             break;
781           case MEAN:
782             iter->dbl[0] += v->f * weight;
783             iter->dbl[1] += weight;
784             break;
785           case SD:
786             moments1_add (iter->moments, v->f, weight);
787             break;
788           case MAX:
789             iter->dbl[0] = max (iter->dbl[0], v->f);
790             iter->int1 = 1;
791             break;
792           case MAX | FSTRING:
793             if (memcmp (iter->string, v->s, iter->src->width) < 0)
794               memcpy (iter->string, v->s, iter->src->width);
795             iter->int1 = 1;
796             break;
797           case MIN:
798             iter->dbl[0] = min (iter->dbl[0], v->f);
799             iter->int1 = 1;
800             break;
801           case MIN | FSTRING:
802             if (memcmp (iter->string, v->s, iter->src->width) > 0)
803               memcpy (iter->string, v->s, iter->src->width);
804             iter->int1 = 1;
805             break;
806           case FGT:
807           case PGT:
808             if (v->f > iter->arg[0].f)
809               iter->dbl[0] += weight;
810             iter->dbl[1] += weight;
811             break;
812           case FGT | FSTRING:
813           case PGT | FSTRING:
814             if (memcmp (iter->arg[0].c, v->s, iter->src->width) < 0)
815               iter->dbl[0] += weight;
816             iter->dbl[1] += weight;
817             break;
818           case FLT:
819           case PLT:
820             if (v->f < iter->arg[0].f)
821               iter->dbl[0] += weight;
822             iter->dbl[1] += weight;
823             break;
824           case FLT | FSTRING:
825           case PLT | FSTRING:
826             if (memcmp (iter->arg[0].c, v->s, iter->src->width) > 0)
827               iter->dbl[0] += weight;
828             iter->dbl[1] += weight;
829             break;
830           case FIN:
831           case PIN:
832             if (iter->arg[0].f <= v->f && v->f <= iter->arg[1].f)
833               iter->dbl[0] += weight;
834             iter->dbl[1] += weight;
835             break;
836           case FIN | FSTRING:
837           case PIN | FSTRING:
838             if (memcmp (iter->arg[0].c, v->s, iter->src->width) <= 0
839                 && memcmp (iter->arg[1].c, v->s, iter->src->width) >= 0)
840               iter->dbl[0] += weight;
841             iter->dbl[1] += weight;
842             break;
843           case FOUT:
844           case POUT:
845             if (iter->arg[0].f > v->f || v->f > iter->arg[1].f)
846               iter->dbl[0] += weight;
847             iter->dbl[1] += weight;
848             break;
849           case FOUT | FSTRING:
850           case POUT | FSTRING:
851             if (memcmp (iter->arg[0].c, v->s, iter->src->width) > 0
852                 || memcmp (iter->arg[1].c, v->s, iter->src->width) < 0)
853               iter->dbl[0] += weight;
854             iter->dbl[1] += weight;
855             break;
856           case N:
857           case N | FSTRING:
858             iter->dbl[0] += weight;
859             break;
860           case NU:
861           case NU | FSTRING:
862             iter->int1++;
863             break;
864           case FIRST:
865             if (iter->int1 == 0)
866               {
867                 iter->dbl[0] = v->f;
868                 iter->int1 = 1;
869               }
870             break;
871           case FIRST | FSTRING:
872             if (iter->int1 == 0)
873               {
874                 memcpy (iter->string, v->s, iter->src->width);
875                 iter->int1 = 1;
876               }
877             break;
878           case LAST:
879             iter->dbl[0] = v->f;
880             iter->int1 = 1;
881             break;
882           case LAST | FSTRING:
883             memcpy (iter->string, v->s, iter->src->width);
884             iter->int1 = 1;
885             break;
886           case NMISS:
887           case NMISS | FSTRING:
888           case NUMISS:
889           case NUMISS | FSTRING:
890             /* Our value is not missing or it would have been
891                caught earlier.  Nothing to do. */
892             break;
893           default:
894             assert (0);
895           }
896     } else {
897       switch (iter->function)
898         {
899         case N_NO_VARS:
900           iter->dbl[0] += weight;
901           break;
902         case NU_NO_VARS:
903           iter->int1++;
904           break;
905         default:
906           assert (0);
907         }
908     }
909 }
910
911 /* We've come to a record that differs from the previous in one or
912    more of the break variables.  Make an output record from the
913    accumulated statistics in the OUTPUT case. */
914 static void 
915 dump_aggregate_info (struct agr_proc *agr, struct ccase *output)
916 {
917   {
918     int value_idx = 0;
919     int i;
920
921     for (i = 0; i < agr->break_var_cnt; i++) 
922       {
923         struct variable *v = agr->break_vars[i];
924         memcpy (case_data_rw (output, value_idx),
925                 case_data (&agr->break_case, v->fv),
926                 sizeof (union value) * v->nv);
927         value_idx += v->nv; 
928       }
929   }
930   
931   {
932     struct agr_var *i;
933   
934     for (i = agr->agr_vars; i; i = i->next)
935       {
936         union value *v = case_data_rw (output, i->dest->fv);
937
938         if (agr->missing == COLUMNWISE && i->missing != 0
939             && (i->function & FUNC) != N && (i->function & FUNC) != NU
940             && (i->function & FUNC) != NMISS && (i->function & FUNC) != NUMISS)
941           {
942             if (i->dest->type == ALPHA)
943               memset (v->s, ' ', i->dest->width);
944             else
945               v->f = SYSMIS;
946             continue;
947           }
948         
949         switch (i->function)
950           {
951           case SUM:
952             v->f = i->int1 ? i->dbl[0] : SYSMIS;
953             break;
954           case MEAN:
955             v->f = i->dbl[1] != 0.0 ? i->dbl[0] / i->dbl[1] : SYSMIS;
956             break;
957           case SD:
958             {
959               double variance;
960
961               /* FIXME: we should use two passes. */
962               moments1_calculate (i->moments, NULL, NULL, &variance,
963                                  NULL, NULL);
964               if (variance != SYSMIS)
965                 v->f = sqrt (variance);
966               else
967                 v->f = SYSMIS; 
968             }
969             break;
970           case MAX:
971           case MIN:
972             v->f = i->int1 ? i->dbl[0] : SYSMIS;
973             break;
974           case MAX | FSTRING:
975           case MIN | FSTRING:
976             if (i->int1)
977               memcpy (v->s, i->string, i->dest->width);
978             else
979               memset (v->s, ' ', i->dest->width);
980             break;
981           case FGT:
982           case FGT | FSTRING:
983           case FLT:
984           case FLT | FSTRING:
985           case FIN:
986           case FIN | FSTRING:
987           case FOUT:
988           case FOUT | FSTRING:
989             v->f = i->dbl[1] ? i->dbl[0] / i->dbl[1] : SYSMIS;
990             break;
991           case PGT:
992           case PGT | FSTRING:
993           case PLT:
994           case PLT | FSTRING:
995           case PIN:
996           case PIN | FSTRING:
997           case POUT:
998           case POUT | FSTRING:
999             v->f = i->dbl[1] ? i->dbl[0] / i->dbl[1] * 100.0 : SYSMIS;
1000             break;
1001           case N:
1002           case N | FSTRING:
1003             v->f = i->dbl[0];
1004             break;
1005           case NU:
1006           case NU | FSTRING:
1007             v->f = i->int1;
1008             break;
1009           case FIRST:
1010           case LAST:
1011             v->f = i->int1 ? i->dbl[0] : SYSMIS;
1012             break;
1013           case FIRST | FSTRING:
1014           case LAST | FSTRING:
1015             if (i->int1)
1016               memcpy (v->s, i->string, i->dest->width);
1017             else
1018               memset (v->s, ' ', i->dest->width);
1019             break;
1020           case N_NO_VARS:
1021             v->f = i->dbl[0];
1022             break;
1023           case NU_NO_VARS:
1024             v->f = i->int1;
1025             break;
1026           case NMISS:
1027           case NMISS | FSTRING:
1028             v->f = i->dbl[0];
1029             break;
1030           case NUMISS:
1031           case NUMISS | FSTRING:
1032             v->f = i->int1;
1033             break;
1034           default:
1035             assert (0);
1036           }
1037       }
1038   }
1039 }
1040
1041 /* Resets the state for all the aggregate functions. */
1042 static void
1043 initialize_aggregate_info (struct agr_proc *agr, const struct ccase *input)
1044 {
1045   struct agr_var *iter;
1046
1047   case_destroy (&agr->break_case);
1048   case_clone (&agr->break_case, input);
1049
1050   for (iter = agr->agr_vars; iter; iter = iter->next)
1051     {
1052       iter->missing = 0;
1053       iter->dbl[0] = iter->dbl[1] = iter->dbl[2] = 0.0;
1054       iter->int1 = iter->int2 = 0;
1055       switch (iter->function)
1056         {
1057         case MIN:
1058           iter->dbl[0] = DBL_MAX;
1059           break;
1060         case MIN | FSTRING:
1061           memset (iter->string, 255, iter->src->width);
1062           break;
1063         case MAX:
1064           iter->dbl[0] = -DBL_MAX;
1065           break;
1066         case MAX | FSTRING:
1067           memset (iter->string, 0, iter->src->width);
1068           break;
1069         case SD:
1070           if (iter->moments == NULL)
1071             iter->moments = moments1_create (MOMENT_VARIANCE);
1072           else
1073             moments1_clear (iter->moments);
1074           break;
1075         default:
1076           break;
1077         }
1078     }
1079 }
1080 \f
1081 /* Aggregate each case as it comes through.  Cases which aren't needed
1082    are dropped.
1083    Returns true if successful, false if an I/O error occurred. */
1084 static bool
1085 agr_to_active_file (const struct ccase *c, void *agr_)
1086 {
1087   struct agr_proc *agr = agr_;
1088
1089   if (aggregate_single_case (agr, c, &agr->agr_case)) 
1090     return agr->sink->class->write (agr->sink, &agr->agr_case);
1091
1092   return true;
1093 }
1094
1095 /* Aggregate the current case and output it if we passed a
1096    breakpoint. */
1097 static bool
1098 presorted_agr_to_sysfile (const struct ccase *c, void *agr_) 
1099 {
1100   struct agr_proc *agr = agr_;
1101
1102   if (aggregate_single_case (agr, c, &agr->agr_case)) 
1103     return any_writer_write (agr->writer, &agr->agr_case);
1104   
1105   return true;
1106 }