Fixed bugs #11722 and #11676
[pspp] / src / aggregate.c
1 /* PSPP - computes sample statistics.
2    Copyright (C) 1997-9, 2000 Free Software Foundation, Inc.
3    Written by Ben Pfaff <blp@gnu.org>.
4
5    This program is free software; you can redistribute it and/or
6    modify it under the terms of the GNU General Public License as
7    published by the Free Software Foundation; either version 2 of the
8    License, or (at your option) any later version.
9
10    This program is distributed in the hope that it will be useful, but
11    WITHOUT ANY WARRANTY; without even the implied warranty of
12    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13    General Public License for more details.
14
15    You should have received a copy of the GNU General Public License
16    along with this program; if not, write to the Free Software
17    Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA
18    02111-1307, USA. */
19
20 #include <config.h>
21 #include "error.h"
22 #include <stdlib.h>
23 #include "alloc.h"
24 #include "case.h"
25 #include "casefile.h"
26 #include "command.h"
27 #include "dictionary.h"
28 #include "error.h"
29 #include "file-handle.h"
30 #include "lexer.h"
31 #include "misc.h"
32 #include "moments.h"
33 #include "pool.h"
34 #include "settings.h"
35 #include "sfm-write.h"
36 #include "sort.h"
37 #include "str.h"
38 #include "var.h"
39 #include "vfm.h"
40 #include "vfmP.h"
41
42 /* Specifies how to make an aggregate variable. */
43 struct agr_var
44   {
45     struct agr_var *next;               /* Next in list. */
46
47     /* Collected during parsing. */
48     struct variable *src;       /* Source variable. */
49     struct variable *dest;      /* Target variable. */
50     int function;               /* Function. */
51     int include_missing;        /* 1=Include user-missing values. */
52     union value arg[2];         /* Arguments. */
53
54     /* Accumulated during AGGREGATE execution. */
55     double dbl[3];
56     int int1, int2;
57     char *string;
58     int missing;
59     struct moments1 *moments;
60   };
61
62 /* Aggregation functions. */
63 enum
64   {
65     NONE, SUM, MEAN, SD, MAX, MIN, PGT, PLT, PIN, POUT, FGT, FLT, FIN,
66     FOUT, N, NU, NMISS, NUMISS, FIRST, LAST,
67     N_AGR_FUNCS, N_NO_VARS, NU_NO_VARS,
68     FUNC = 0x1f, /* Function mask. */
69     FSTRING = 1<<5, /* String function bit. */
70   };
71
72 /* Attributes of an aggregation function. */
73 struct agr_func
74   {
75     const char *name;           /* Aggregation function name. */
76     int n_args;                 /* Number of arguments. */
77     int alpha_type;             /* When given ALPHA arguments, output type. */
78     struct fmt_spec format;     /* Format spec if alpha_type != ALPHA. */
79   };
80
81 /* Attributes of aggregation functions. */
82 static const struct agr_func agr_func_tab[] = 
83   {
84     {"<NONE>",  0, -1,      {0, 0, 0}},
85     {"SUM",     0, -1,      {FMT_F, 8, 2}},
86     {"MEAN",    0, -1,      {FMT_F, 8, 2}},
87     {"SD",      0, -1,      {FMT_F, 8, 2}},
88     {"MAX",     0, ALPHA,   {-1, -1, -1}}, 
89     {"MIN",     0, ALPHA,   {-1, -1, -1}}, 
90     {"PGT",     1, NUMERIC, {FMT_F, 5, 1}},      
91     {"PLT",     1, NUMERIC, {FMT_F, 5, 1}},       
92     {"PIN",     2, NUMERIC, {FMT_F, 5, 1}},       
93     {"POUT",    2, NUMERIC, {FMT_F, 5, 1}},       
94     {"FGT",     1, NUMERIC, {FMT_F, 5, 3}},       
95     {"FLT",     1, NUMERIC, {FMT_F, 5, 3}},       
96     {"FIN",     2, NUMERIC, {FMT_F, 5, 3}},       
97     {"FOUT",    2, NUMERIC, {FMT_F, 5, 3}},       
98     {"N",       0, NUMERIC, {FMT_F, 7, 0}},       
99     {"NU",      0, NUMERIC, {FMT_F, 7, 0}},       
100     {"NMISS",   0, NUMERIC, {FMT_F, 7, 0}},       
101     {"NUMISS",  0, NUMERIC, {FMT_F, 7, 0}},       
102     {"FIRST",   0, ALPHA,   {-1, -1, -1}}, 
103     {"LAST",    0, ALPHA,   {-1, -1, -1}},
104     {NULL,      0, -1,      {-1, -1, -1}},
105     {"N",       0, NUMERIC, {FMT_F, 7, 0}},
106     {"NU",      0, NUMERIC, {FMT_F, 7, 0}},
107   };
108
109 /* Missing value types. */
110 enum missing_treatment
111   {
112     ITEMWISE,           /* Missing values item by item. */
113     COLUMNWISE          /* Missing values column by column. */
114   };
115
116 /* An entire AGGREGATE procedure. */
117 struct agr_proc 
118   {
119     /* We have either an output file or a sink. */
120     struct sfm_writer *writer;          /* Output file, or null if none. */
121     struct case_sink *sink;             /* Sink, or null if none. */
122
123     /* Break variables. */
124     struct sort_criteria *sort;         /* Sort criteria. */
125     struct variable **break_vars;       /* Break variables. */
126     size_t break_var_cnt;               /* Number of break variables. */
127     union value *prev_break;            /* Last values of break variables. */
128
129     enum missing_treatment missing;     /* How to treat missing values. */
130     struct agr_var *agr_vars;           /* First aggregate variable. */
131     struct dictionary *dict;            /* Aggregate dictionary. */
132     int case_cnt;                       /* Counts aggregated cases. */
133     struct ccase agr_case;              /* Aggregate case for output. */
134   };
135
136 static void initialize_aggregate_info (struct agr_proc *);
137
138 /* Prototypes. */
139 static int parse_aggregate_functions (struct agr_proc *);
140 static void agr_destroy (struct agr_proc *);
141 static int aggregate_single_case (struct agr_proc *agr,
142                                   const struct ccase *input,
143                                   struct ccase *output);
144 static void dump_aggregate_info (struct agr_proc *agr, struct ccase *output);
145
146 /* Aggregating to the active file. */
147 static int agr_to_active_file (struct ccase *, void *aux);
148
149 /* Aggregating to a system file. */
150 static int presorted_agr_to_sysfile (struct ccase *, void *aux);
151 \f
152 /* Parsing. */
153
154 /* Parses and executes the AGGREGATE procedure. */
155 int
156 cmd_aggregate (void)
157 {
158   struct agr_proc agr;
159   struct file_handle *out_file = NULL;
160
161   /* Have we seen these subcommands? */
162   unsigned seen = 0;
163
164   memset(&agr, 0 , sizeof (agr));
165   agr.missing = ITEMWISE;
166   
167   agr.dict = dict_create ();
168   dict_set_label (agr.dict, dict_get_label (default_dict));
169   dict_set_documents (agr.dict, dict_get_documents (default_dict));
170   
171   /* Read most of the subcommands. */
172   for (;;)
173     {
174       lex_match ('/');
175       
176       if (lex_match_id ("OUTFILE"))
177         {
178           if (seen & 1)
179             {
180               msg (SE, _("%s subcommand given multiple times."),"OUTFILE");
181               goto error;
182             }
183           seen |= 1;
184               
185           lex_match ('=');
186           if (!lex_match ('*'))
187             {
188               out_file = fh_parse ();
189               if (out_file == NULL)
190                 goto error;
191             }
192         }
193       else if (lex_match_id ("MISSING"))
194         {
195           lex_match ('=');
196           if (!lex_match_id ("COLUMNWISE"))
197             {
198               lex_error (_("while expecting COLUMNWISE"));
199               goto error;
200             }
201           agr.missing = COLUMNWISE;
202         }
203       else if (lex_match_id ("DOCUMENT"))
204         seen |= 2;
205       else if (lex_match_id ("PRESORTED"))
206         seen |= 4;
207       else if (lex_match_id ("BREAK"))
208         {
209           int i;
210
211           if (seen & 8)
212             {
213               msg (SE, _("%s subcommand given multiple times."),"BREAK");
214               goto error;
215             }
216           seen |= 8;
217
218           lex_match ('=');
219           agr.sort = sort_parse_criteria (default_dict,
220                                           &agr.break_vars, &agr.break_var_cnt);
221           if (agr.sort == NULL)
222             goto error;
223           
224           for (i = 0; i < agr.break_var_cnt; i++)
225             {
226               struct variable *v = dict_clone_var (agr.dict, agr.break_vars[i],
227                                                    agr.break_vars[i]->name);
228               assert (v != NULL);
229             }
230         }
231       else break;
232     }
233
234   /* Check for proper syntax. */
235   if (!(seen & 8))
236     msg (SW, _("BREAK subcommand not specified."));
237       
238   /* Read in the aggregate functions. */
239   if (!parse_aggregate_functions (&agr))
240     goto error;
241
242   /* Delete documents. */
243   if (!(seen & 2))
244     dict_set_documents (agr.dict, NULL);
245
246   /* Cancel SPLIT FILE. */
247   dict_set_split_vars (agr.dict, NULL, 0);
248   
249   /* Initialize. */
250   agr.case_cnt = 0;
251   case_create (&agr.agr_case, dict_get_next_value_idx (agr.dict));
252   initialize_aggregate_info (&agr);
253
254   /* Output to active file or external file? */
255   if (out_file == NULL) 
256     {
257       /* The active file will be replaced by the aggregated data,
258          so TEMPORARY is moot. */
259       cancel_temporary ();
260
261       if (agr.sort != NULL && (seen & 4) == 0)
262         sort_active_file_in_place (agr.sort);
263
264       agr.sink = create_case_sink (&storage_sink_class, agr.dict, NULL);
265       if (agr.sink->class->open != NULL)
266         agr.sink->class->open (agr.sink);
267       vfm_sink = create_case_sink (&null_sink_class, default_dict, NULL);
268       procedure (agr_to_active_file, &agr);
269       if (agr.case_cnt > 0) 
270         {
271           dump_aggregate_info (&agr, &agr.agr_case);
272           agr.sink->class->write (agr.sink, &agr.agr_case);
273         }
274       dict_destroy (default_dict);
275       default_dict = agr.dict;
276       agr.dict = NULL;
277       vfm_source = agr.sink->class->make_source (agr.sink);
278       free_case_sink (agr.sink);
279     }
280   else
281     {
282       agr.writer = sfm_open_writer (out_file, agr.dict, get_scompression ());
283       if (agr.writer == NULL)
284         goto error;
285       
286       if (agr.sort != NULL && (seen & 4) == 0) 
287         {
288           /* Sorting is needed. */
289           struct casefile *dst;
290           struct casereader *reader;
291           struct ccase c;
292           
293           dst = sort_active_file_to_casefile (agr.sort);
294           if (dst == NULL)
295             goto error;
296           reader = casefile_get_destructive_reader (dst);
297           while (casereader_read_xfer (reader, &c)) 
298             {
299               if (aggregate_single_case (&agr, &c, &agr.agr_case)) 
300                 sfm_write_case (agr.writer, &agr.agr_case);
301               case_destroy (&c);
302             }
303           casereader_destroy (reader);
304           casefile_destroy (dst);
305         }
306       else 
307         {
308           /* Active file is already sorted. */
309           procedure (presorted_agr_to_sysfile, &agr);
310         }
311       
312       if (agr.case_cnt > 0) 
313         {
314           dump_aggregate_info (&agr, &agr.agr_case);
315           sfm_write_case (agr.writer, &agr.agr_case);
316         }
317     }
318   
319   agr_destroy (&agr);
320   return CMD_SUCCESS;
321
322 error:
323   agr_destroy (&agr);
324   return CMD_FAILURE;
325 }
326
327 /* Parse all the aggregate functions. */
328 static int
329 parse_aggregate_functions (struct agr_proc *agr)
330 {
331   struct agr_var *tail; /* Tail of linked list starting at agr->vars. */
332
333   /* Parse everything. */
334   tail = NULL;
335   for (;;)
336     {
337       char **dest;
338       char **dest_label;
339       int n_dest;
340
341       int include_missing;
342       const struct agr_func *function;
343       int func_index;
344
345       union value arg[2];
346
347       struct variable **src;
348       int n_src;
349
350       int i;
351
352       dest = NULL;
353       dest_label = NULL;
354       n_dest = 0;
355       src = NULL;
356       function = NULL;
357       n_src = 0;
358       arg[0].c = NULL;
359       arg[1].c = NULL;
360
361       /* Parse the list of target variables. */
362       while (!lex_match ('='))
363         {
364           int n_dest_prev = n_dest;
365           
366           if (!parse_DATA_LIST_vars (&dest, &n_dest,
367                                      PV_APPEND | PV_SINGLE | PV_NO_SCRATCH))
368             goto error;
369
370           /* Assign empty labels. */
371           {
372             int j;
373
374             dest_label = xrealloc (dest_label, sizeof *dest_label * n_dest);
375             for (j = n_dest_prev; j < n_dest; j++)
376               dest_label[j] = NULL;
377           }
378           
379           if (token == T_STRING)
380             {
381               ds_truncate (&tokstr, 255);
382               dest_label[n_dest - 1] = xstrdup (ds_c_str (&tokstr));
383               lex_get ();
384             }
385         }
386
387       /* Get the name of the aggregation function. */
388       if (token != T_ID)
389         {
390           lex_error (_("expecting aggregation function"));
391           goto error;
392         }
393
394       include_missing = 0;
395       if (tokid[strlen (tokid) - 1] == '.')
396         {
397           include_missing = 1;
398           tokid[strlen (tokid) - 1] = 0;
399         }
400       
401       for (function = agr_func_tab; function->name; function++)
402         if (!strcmp (function->name, tokid))
403           break;
404       if (NULL == function->name)
405         {
406           msg (SE, _("Unknown aggregation function %s."), tokid);
407           goto error;
408         }
409       func_index = function - agr_func_tab;
410       lex_get ();
411
412       /* Check for leading lparen. */
413       if (!lex_match ('('))
414         {
415           if (func_index == N)
416             func_index = N_NO_VARS;
417           else if (func_index == NU)
418             func_index = NU_NO_VARS;
419           else
420             {
421               lex_error (_("expecting `('"));
422               goto error;
423             }
424         } else {
425           /* Parse list of source variables. */
426           {
427             int pv_opts = PV_NO_SCRATCH;
428
429             if (func_index == SUM || func_index == MEAN || func_index == SD)
430               pv_opts |= PV_NUMERIC;
431             else if (function->n_args)
432               pv_opts |= PV_SAME_TYPE;
433
434             if (!parse_variables (default_dict, &src, &n_src, pv_opts))
435               goto error;
436           }
437
438           /* Parse function arguments, for those functions that
439              require arguments. */
440           if (function->n_args != 0)
441             for (i = 0; i < function->n_args; i++)
442               {
443                 int type;
444             
445                 lex_match (',');
446                 if (token == T_STRING)
447                   {
448                     arg[i].c = xstrdup (ds_c_str (&tokstr));
449                     type = ALPHA;
450                   }
451                 else if (token == T_NUM)
452                   {
453                     arg[i].f = tokval;
454                     type = NUMERIC;
455                   } else {
456                     msg (SE, _("Missing argument %d to %s."), i + 1, function->name);
457                     goto error;
458                   }
459             
460                 lex_get ();
461
462                 if (type != src[0]->type)
463                   {
464                     msg (SE, _("Arguments to %s must be of same type as "
465                                "source variables."),
466                          function->name);
467                     goto error;
468                   }
469               }
470
471           /* Trailing rparen. */
472           if (!lex_match(')'))
473             {
474               lex_error (_("expecting `)'"));
475               goto error;
476             }
477           
478           /* Now check that the number of source variables match the
479              number of target variables.  Do this here because if we
480              do it earlier then the user can get very misleading error
481              messages; i.e., `AGGREGATE x=SUM(y t).' will get this
482              error message when a proper message would be more like
483              `unknown variable t'. */
484           if (n_src != n_dest)
485             {
486               msg (SE, _("Number of source variables (%d) does not match "
487                          "number of target variables (%d)."),
488                    n_src, n_dest);
489               goto error;
490             }
491         }
492         
493       /* Finally add these to the linked list of aggregation
494          variables. */
495       for (i = 0; i < n_dest; i++)
496         {
497           struct agr_var *v = xmalloc (sizeof *v);
498
499           /* Add variable to chain. */
500           if (agr->agr_vars != NULL)
501             tail->next = v;
502           else
503             agr->agr_vars = v;
504           tail = v;
505           tail->next = NULL;
506           v->moments = NULL;
507           
508           /* Create the target variable in the aggregate
509              dictionary. */
510           {
511             struct variable *destvar;
512             
513             v->function = func_index;
514
515             if (src)
516               {
517                 int output_width;
518
519                 v->src = src[i];
520                 
521                 if (src[i]->type == ALPHA)
522                   {
523                     v->function |= FSTRING;
524                     v->string = xmalloc (src[i]->width);
525                   }
526                 
527                 if (v->src->type == NUMERIC || function->alpha_type == NUMERIC)
528                   output_width = 0;
529                 else
530                   output_width = v->src->width;
531
532                 if (function->alpha_type == ALPHA)
533                   destvar = dict_clone_var (agr->dict, v->src, dest[i]);
534                 else
535                   {
536                     destvar = dict_create_var (agr->dict, dest[i], output_width);
537                     if (output_width == 0)
538                       destvar->print = destvar->write = function->format;
539                     if (output_width == 0 && dict_get_weight (default_dict) != NULL
540                         && (func_index == N || func_index == N_NO_VARS
541                             || func_index == NU || func_index == NU_NO_VARS))
542                       {
543                         struct fmt_spec f = {FMT_F, 8, 2};
544                       
545                         destvar->print = destvar->write = f;
546                       }
547                   }
548               } else {
549                 v->src = NULL;
550                 destvar = dict_create_var (agr->dict, dest[i], 0);
551               }
552           
553             if (!destvar)
554               {
555                 msg (SE, _("Variable name %s is not unique within the "
556                            "aggregate file dictionary, which contains "
557                            "the aggregate variables and the break "
558                            "variables."),
559                      dest[i]);
560                 free (dest[i]);
561                 goto error;
562               }
563
564             free (dest[i]);
565             destvar->init = 0;
566             if (dest_label[i])
567               {
568                 destvar->label = dest_label[i];
569                 dest_label[i] = NULL;
570               }
571             else if (function->alpha_type == ALPHA)
572               destvar->print = destvar->write = function->format;
573
574             v->dest = destvar;
575           }
576           
577           v->include_missing = include_missing;
578
579           if (v->src != NULL)
580             {
581               int j;
582
583               if (v->src->type == NUMERIC)
584                 for (j = 0; j < function->n_args; j++)
585                   v->arg[j].f = arg[j].f;
586               else
587                 for (j = 0; j < function->n_args; j++)
588                   v->arg[j].c = xstrdup (arg[j].c);
589             }
590         }
591       
592       if (src != NULL && src[0]->type == ALPHA)
593         for (i = 0; i < function->n_args; i++)
594           {
595             free (arg[i].c);
596             arg[i].c = NULL;
597           }
598
599       free (src);
600       free (dest);
601       free (dest_label);
602
603       if (!lex_match ('/'))
604         {
605           if (token == '.')
606             return 1;
607
608           lex_error ("expecting end of command");
609           return 0;
610         }
611       continue;
612       
613     error:
614       for (i = 0; i < n_dest; i++)
615         {
616           free (dest[i]);
617           free (dest_label[i]);
618         }
619       free (dest);
620       free (dest_label);
621       free (arg[0].c);
622       free (arg[1].c);
623       if (src && n_src && src[0]->type == ALPHA)
624         for (i = 0; i < function->n_args; i++)
625           {
626             free (arg[i].c);
627             arg[i].c = NULL;
628           }
629       free (src);
630         
631       return 0;
632     }
633 }
634
635 /* Destroys AGR. */
636 static void
637 agr_destroy (struct agr_proc *agr)
638 {
639   struct agr_var *iter, *next;
640
641   sfm_close_writer (agr->writer);
642   if (agr->sort != NULL)
643     sort_destroy_criteria (agr->sort);
644   free (agr->break_vars);
645   free (agr->prev_break);
646   for (iter = agr->agr_vars; iter; iter = next)
647     {
648       next = iter->next;
649
650       if (iter->function & FSTRING)
651         {
652           int n_args;
653           int i;
654
655           n_args = agr_func_tab[iter->function & FUNC].n_args;
656           for (i = 0; i < n_args; i++)
657             free (iter->arg[i].c);
658           free (iter->string);
659         }
660       else if (iter->function == SD)
661         moments1_destroy (iter->moments);
662       free (iter);
663     }
664   if (agr->dict != NULL)
665     dict_destroy (agr->dict);
666
667   case_destroy (&agr->agr_case);
668 }
669 \f
670 /* Execution. */
671
672 static void accumulate_aggregate_info (struct agr_proc *,
673                                        const struct ccase *);
674 static void dump_aggregate_info (struct agr_proc *, struct ccase *);
675
676 /* Processes a single case INPUT for aggregation.  If output is
677    warranted, writes it to OUTPUT and returns nonzero.
678    Otherwise, returns zero and OUTPUT is unmodified. */
679 static int
680 aggregate_single_case (struct agr_proc *agr,
681                        const struct ccase *input, struct ccase *output)
682 {
683   /* The first case always begins a new break group.  We also need to
684      preserve the values of the case for later comparison. */
685   if (agr->case_cnt++ == 0)
686     {
687       int n_elem = 0;
688       
689       {
690         int i;
691
692         for (i = 0; i < agr->break_var_cnt; i++)
693           n_elem += agr->break_vars[i]->nv;
694       }
695       
696       agr->prev_break = xmalloc (sizeof *agr->prev_break * n_elem);
697
698       /* Copy INPUT into prev_break. */
699       {
700         union value *iter = agr->prev_break;
701         int i;
702
703         for (i = 0; i < agr->break_var_cnt; i++)
704           {
705             struct variable *v = agr->break_vars[i];
706             
707             if (v->type == NUMERIC)
708               (iter++)->f = case_num (input, v->fv);
709             else
710               {
711                 memcpy (iter->s, case_str (input, v->fv), v->width);
712                 iter += v->nv;
713               }
714           }
715       }
716             
717       accumulate_aggregate_info (agr, input);
718         
719       return 0;
720     }
721       
722   /* Compare the value of each break variable to the values on the
723      previous case. */
724   {
725     union value *iter = agr->prev_break;
726     int i;
727     
728     for (i = 0; i < agr->break_var_cnt; i++)
729       {
730         struct variable *v = agr->break_vars[i];
731       
732         switch (v->type)
733           {
734           case NUMERIC:
735             if (case_num (input, v->fv) != iter->f)
736               goto not_equal;
737             iter++;
738             break;
739           case ALPHA:
740             if (memcmp (case_str (input, v->fv), iter->s, v->width))
741               goto not_equal;
742             iter += v->nv;
743             break;
744           default:
745             assert (0);
746           }
747       }
748   }
749
750   accumulate_aggregate_info (agr, input);
751
752   return 0;
753   
754 not_equal:
755   /* The values of the break variable are different from the values on
756      the previous case.  That means that it's time to dump aggregate
757      info. */
758   dump_aggregate_info (agr, output);
759   initialize_aggregate_info (agr);
760   accumulate_aggregate_info (agr, input);
761
762   /* Copy INPUT into prev_break. */
763   {
764     union value *iter = agr->prev_break;
765     int i;
766
767     for (i = 0; i < agr->break_var_cnt; i++)
768       {
769         struct variable *v = agr->break_vars[i];
770             
771         if (v->type == NUMERIC)
772           (iter++)->f = case_num (input, v->fv);
773         else
774           {
775             memcpy (iter->s, case_str (input, v->fv), v->width);
776             iter += v->nv;
777           }
778       }
779   }
780   
781   return 1;
782 }
783
784 /* Accumulates aggregation data from the case INPUT. */
785 static void 
786 accumulate_aggregate_info (struct agr_proc *agr,
787                            const struct ccase *input)
788 {
789   struct agr_var *iter;
790   double weight;
791   int bad_warn = 1;
792
793   weight = dict_get_case_weight (default_dict, input, &bad_warn);
794
795   for (iter = agr->agr_vars; iter; iter = iter->next)
796     if (iter->src)
797       {
798         const union value *v = case_data (input, iter->src->fv);
799
800         if ((!iter->include_missing && is_missing (v, iter->src))
801             || (iter->include_missing && iter->src->type == NUMERIC
802                 && v->f == SYSMIS))
803           {
804             switch (iter->function)
805               {
806               case NMISS:
807                 iter->dbl[0] += weight;
808                 break;
809               case NUMISS:
810                 iter->int1++;
811                 break;
812               }
813             iter->missing = 1;
814             continue;
815           }
816         
817         /* This is horrible.  There are too many possibilities. */
818         switch (iter->function)
819           {
820           case SUM:
821             iter->dbl[0] += v->f;
822             break;
823           case MEAN:
824             iter->dbl[0] += v->f * weight;
825             iter->dbl[1] += weight;
826             break;
827           case SD:
828             moments1_add (iter->moments, v->f, weight);
829             break;
830           case MAX:
831             iter->dbl[0] = max (iter->dbl[0], v->f);
832             iter->int1 = 1;
833             break;
834           case MAX | FSTRING:
835             if (memcmp (iter->string, v->s, iter->src->width) < 0)
836               memcpy (iter->string, v->s, iter->src->width);
837             iter->int1 = 1;
838             break;
839           case MIN:
840             iter->dbl[0] = min (iter->dbl[0], v->f);
841             iter->int1 = 1;
842             break;
843           case MIN | FSTRING:
844             if (memcmp (iter->string, v->s, iter->src->width) > 0)
845               memcpy (iter->string, v->s, iter->src->width);
846             iter->int1 = 1;
847             break;
848           case FGT:
849           case PGT:
850             if (v->f > iter->arg[0].f)
851               iter->dbl[0] += weight;
852             iter->dbl[1] += weight;
853             break;
854           case FGT | FSTRING:
855           case PGT | FSTRING:
856             if (memcmp (iter->arg[0].c, v->s, iter->src->width) < 0)
857               iter->dbl[0] += weight;
858             iter->dbl[1] += weight;
859             break;
860           case FLT:
861           case PLT:
862             if (v->f < iter->arg[0].f)
863               iter->dbl[0] += weight;
864             iter->dbl[1] += weight;
865             break;
866           case FLT | FSTRING:
867           case PLT | FSTRING:
868             if (memcmp (iter->arg[0].c, v->s, iter->src->width) > 0)
869               iter->dbl[0] += weight;
870             iter->dbl[1] += weight;
871             break;
872           case FIN:
873           case PIN:
874             if (iter->arg[0].f <= v->f && v->f <= iter->arg[1].f)
875               iter->dbl[0] += weight;
876             iter->dbl[1] += weight;
877             break;
878           case FIN | FSTRING:
879           case PIN | FSTRING:
880             if (memcmp (iter->arg[0].c, v->s, iter->src->width) <= 0
881                 && memcmp (iter->arg[1].c, v->s, iter->src->width) >= 0)
882               iter->dbl[0] += weight;
883             iter->dbl[1] += weight;
884             break;
885           case FOUT:
886           case POUT:
887             if (iter->arg[0].f > v->f || v->f > iter->arg[1].f)
888               iter->dbl[0] += weight;
889             iter->dbl[1] += weight;
890             break;
891           case FOUT | FSTRING:
892           case POUT | FSTRING:
893             if (memcmp (iter->arg[0].c, v->s, iter->src->width) > 0
894                 && memcmp (iter->arg[1].c, v->s, iter->src->width) < 0)
895               iter->dbl[0] += weight;
896             iter->dbl[1] += weight;
897             break;
898           case N:
899             iter->dbl[0] += weight;
900             break;
901           case NU:
902             iter->int1++;
903             break;
904           case FIRST:
905             if (iter->int1 == 0)
906               {
907                 iter->dbl[0] = v->f;
908                 iter->int1 = 1;
909               }
910             break;
911           case FIRST | FSTRING:
912             if (iter->int1 == 0)
913               {
914                 memcpy (iter->string, v->s, iter->src->width);
915                 iter->int1 = 1;
916               }
917             break;
918           case LAST:
919             iter->dbl[0] = v->f;
920             iter->int1 = 1;
921             break;
922           case LAST | FSTRING:
923             memcpy (iter->string, v->s, iter->src->width);
924             iter->int1 = 1;
925             break;
926           default:
927             assert (0);
928           }
929     } else {
930       switch (iter->function)
931         {
932         case N_NO_VARS:
933           iter->dbl[0] += weight;
934           break;
935         case NU_NO_VARS:
936           iter->int1++;
937           break;
938         default:
939           assert (0);
940         }
941     }
942 }
943
944 /* We've come to a record that differs from the previous in one or
945    more of the break variables.  Make an output record from the
946    accumulated statistics in the OUTPUT case. */
947 static void 
948 dump_aggregate_info (struct agr_proc *agr, struct ccase *output)
949 {
950   {
951     int value_idx = 0;
952     int i;
953
954     for (i = 0; i < agr->break_var_cnt; i++) 
955       {
956         int nv = agr->break_vars[i]->nv;
957         memcpy (case_data_rw (output, value_idx),
958                 &agr->prev_break[value_idx],
959                 sizeof (union value) * nv);
960         value_idx += nv; 
961       }
962   }
963   
964   {
965     struct agr_var *i;
966   
967     for (i = agr->agr_vars; i; i = i->next)
968       {
969         union value *v = case_data_rw (output, i->dest->fv);
970
971         if (agr->missing == COLUMNWISE && i->missing != 0
972             && (i->function & FUNC) != N && (i->function & FUNC) != NU
973             && (i->function & FUNC) != NMISS && (i->function & FUNC) != NUMISS)
974           {
975             if (i->function & FSTRING)
976               memset (v->s, ' ', i->dest->width);
977             else
978               v->f = SYSMIS;
979             continue;
980           }
981         
982         switch (i->function)
983           {
984           case SUM:
985             v->f = i->dbl[0];
986             break;
987           case MEAN:
988             v->f = i->dbl[1] != 0.0 ? i->dbl[0] / i->dbl[1] : SYSMIS;
989             break;
990           case SD:
991             {
992               double variance;
993
994               /* FIXME: we should use two passes. */
995               moments1_calculate (i->moments, NULL, NULL, &variance,
996                                  NULL, NULL);
997               if (variance != SYSMIS)
998                 v->f = sqrt (variance);
999               else
1000                 v->f = SYSMIS; 
1001             }
1002             break;
1003           case MAX:
1004           case MIN:
1005             v->f = i->int1 ? i->dbl[0] : SYSMIS;
1006             break;
1007           case MAX | FSTRING:
1008           case MIN | FSTRING:
1009             if (i->int1)
1010               memcpy (v->s, i->string, i->dest->width);
1011             else
1012               memset (v->s, ' ', i->dest->width);
1013             break;
1014           case FGT | FSTRING:
1015           case FLT | FSTRING:
1016           case FIN | FSTRING:
1017           case FOUT | FSTRING:
1018             v->f = i->int2 ? (double) i->int1 / (double) i->int2 : SYSMIS;
1019             break;
1020           case FGT:
1021           case FLT:
1022           case FIN:
1023           case FOUT:
1024             v->f = i->dbl[1] ? i->dbl[0] / i->dbl[1] : SYSMIS;
1025             break;
1026           case PGT:
1027           case PGT | FSTRING:
1028           case PLT:
1029           case PLT | FSTRING:
1030           case PIN:
1031           case PIN | FSTRING:
1032           case POUT:
1033           case POUT | FSTRING:
1034             v->f = i->dbl[1] ? i->dbl[0] / i->dbl[1] * 100.0 : SYSMIS;
1035             break;
1036           case N:
1037             v->f = i->dbl[0];
1038             break;
1039           case NU:
1040             v->f = i->int1;
1041             break;
1042           case FIRST:
1043           case LAST:
1044             v->f = i->int1 ? i->dbl[0] : SYSMIS;
1045             break;
1046           case FIRST | FSTRING:
1047           case LAST | FSTRING:
1048             if (i->int1)
1049               memcpy (v->s, i->string, i->dest->width);
1050             else
1051               memset (v->s, ' ', i->dest->width);
1052             break;
1053           case N_NO_VARS:
1054             v->f = i->dbl[0];
1055             break;
1056           case NU_NO_VARS:
1057             v->f = i->int1;
1058             break;
1059           case NMISS:
1060             v->f = i->dbl[0];
1061             break;
1062           case NUMISS:
1063             v->f = i->int1;
1064             break;
1065           default:
1066             assert (0);
1067           }
1068       }
1069   }
1070 }
1071
1072 /* Resets the state for all the aggregate functions. */
1073 static void
1074 initialize_aggregate_info (struct agr_proc *agr)
1075 {
1076   struct agr_var *iter;
1077
1078   for (iter = agr->agr_vars; iter; iter = iter->next)
1079     {
1080       iter->missing = 0;
1081       switch (iter->function)
1082         {
1083         case MIN:
1084           iter->dbl[0] = DBL_MAX;
1085           break;
1086         case MIN | FSTRING:
1087           memset (iter->string, 255, iter->src->width);
1088           break;
1089         case MAX:
1090           iter->dbl[0] = -DBL_MAX;
1091           break;
1092         case MAX | FSTRING:
1093           memset (iter->string, 0, iter->src->width);
1094           break;
1095         case SD:
1096           if (iter->moments == NULL)
1097             iter->moments = moments1_create (MOMENT_VARIANCE);
1098           else
1099             moments1_clear (iter->moments);
1100           break;
1101         default:
1102           iter->dbl[0] = iter->dbl[1] = iter->dbl[2] = 0.0;
1103           iter->int1 = iter->int2 = 0;
1104           break;
1105         }
1106     }
1107 }
1108 \f
1109 /* Aggregate each case as it comes through.  Cases which aren't needed
1110    are dropped. */
1111 static int
1112 agr_to_active_file (struct ccase *c, void *agr_)
1113 {
1114   struct agr_proc *agr = agr_;
1115
1116   if (aggregate_single_case (agr, c, &agr->agr_case)) 
1117     agr->sink->class->write (agr->sink, &agr->agr_case);
1118
1119   return 1;
1120 }
1121
1122 /* Aggregate the current case and output it if we passed a
1123    breakpoint. */
1124 static int
1125 presorted_agr_to_sysfile (struct ccase *c, void *agr_) 
1126 {
1127   struct agr_proc *agr = agr_;
1128
1129   if (aggregate_single_case (agr, c, &agr->agr_case)) 
1130     sfm_write_case (agr->writer, &agr->agr_case);
1131
1132   return 1;
1133 }