Remove julcal.
[pspp] / src / aggregate.c
1 /* PSPP - computes sample statistics.
2    Copyright (C) 1997-9, 2000 Free Software Foundation, Inc.
3    Written by Ben Pfaff <blp@gnu.org>.
4
5    This program is free software; you can redistribute it and/or
6    modify it under the terms of the GNU General Public License as
7    published by the Free Software Foundation; either version 2 of the
8    License, or (at your option) any later version.
9
10    This program is distributed in the hope that it will be useful, but
11    WITHOUT ANY WARRANTY; without even the implied warranty of
12    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13    General Public License for more details.
14
15    You should have received a copy of the GNU General Public License
16    along with this program; if not, write to the Free Software
17    Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA
18    02111-1307, USA. */
19
20 #include <config.h>
21 #include "error.h"
22 #include <stdlib.h>
23 #include "alloc.h"
24 #include "case.h"
25 #include "casefile.h"
26 #include "command.h"
27 #include "dictionary.h"
28 #include "error.h"
29 #include "file-handle.h"
30 #include "lexer.h"
31 #include "misc.h"
32 #include "moments.h"
33 #include "pool.h"
34 #include "settings.h"
35 #include "sfm-write.h"
36 #include "sort.h"
37 #include "str.h"
38 #include "var.h"
39 #include "vfm.h"
40 #include "vfmP.h"
41
42 /* Specifies how to make an aggregate variable. */
43 struct agr_var
44   {
45     struct agr_var *next;               /* Next in list. */
46
47     /* Collected during parsing. */
48     struct variable *src;       /* Source variable. */
49     struct variable *dest;      /* Target variable. */
50     int function;               /* Function. */
51     int include_missing;        /* 1=Include user-missing values. */
52     union value arg[2];         /* Arguments. */
53
54     /* Accumulated during AGGREGATE execution. */
55     double dbl[3];
56     int int1, int2;
57     char *string;
58     int missing;
59     struct moments1 *moments;
60   };
61
62 /* Aggregation functions. */
63 enum
64   {
65     NONE, SUM, MEAN, SD, MAX, MIN, PGT, PLT, PIN, POUT, FGT, FLT, FIN,
66     FOUT, N, NU, NMISS, NUMISS, FIRST, LAST,
67     N_AGR_FUNCS, N_NO_VARS, NU_NO_VARS,
68     FUNC = 0x1f, /* Function mask. */
69     FSTRING = 1<<5, /* String function bit. */
70   };
71
72 /* Attributes of an aggregation function. */
73 struct agr_func
74   {
75     const char *name;           /* Aggregation function name. */
76     int n_args;                 /* Number of arguments. */
77     int alpha_type;             /* When given ALPHA arguments, output type. */
78     struct fmt_spec format;     /* Format spec if alpha_type != ALPHA. */
79   };
80
81 /* Attributes of aggregation functions. */
82 static const struct agr_func agr_func_tab[] = 
83   {
84     {"<NONE>",  0, -1,      {0, 0, 0}},
85     {"SUM",     0, -1,      {FMT_F, 8, 2}},
86     {"MEAN",    0, -1,      {FMT_F, 8, 2}},
87     {"SD",      0, -1,      {FMT_F, 8, 2}},
88     {"MAX",     0, ALPHA,   {-1, -1, -1}}, 
89     {"MIN",     0, ALPHA,   {-1, -1, -1}}, 
90     {"PGT",     1, NUMERIC, {FMT_F, 5, 1}},      
91     {"PLT",     1, NUMERIC, {FMT_F, 5, 1}},       
92     {"PIN",     2, NUMERIC, {FMT_F, 5, 1}},       
93     {"POUT",    2, NUMERIC, {FMT_F, 5, 1}},       
94     {"FGT",     1, NUMERIC, {FMT_F, 5, 3}},       
95     {"FLT",     1, NUMERIC, {FMT_F, 5, 3}},       
96     {"FIN",     2, NUMERIC, {FMT_F, 5, 3}},       
97     {"FOUT",    2, NUMERIC, {FMT_F, 5, 3}},       
98     {"N",       0, NUMERIC, {FMT_F, 7, 0}},       
99     {"NU",      0, NUMERIC, {FMT_F, 7, 0}},       
100     {"NMISS",   0, NUMERIC, {FMT_F, 7, 0}},       
101     {"NUMISS",  0, NUMERIC, {FMT_F, 7, 0}},       
102     {"FIRST",   0, ALPHA,   {-1, -1, -1}}, 
103     {"LAST",    0, ALPHA,   {-1, -1, -1}},
104     {NULL,      0, -1,      {-1, -1, -1}},
105     {"N",       0, NUMERIC, {FMT_F, 7, 0}},
106     {"NU",      0, NUMERIC, {FMT_F, 7, 0}},
107   };
108
109 /* Missing value types. */
110 enum missing_treatment
111   {
112     ITEMWISE,           /* Missing values item by item. */
113     COLUMNWISE          /* Missing values column by column. */
114   };
115
116 /* An entire AGGREGATE procedure. */
117 struct agr_proc 
118   {
119     /* We have either an output file or a sink. */
120     struct sfm_writer *writer;          /* Output file, or null if none. */
121     struct case_sink *sink;             /* Sink, or null if none. */
122
123     /* Break variables. */
124     struct sort_criteria *sort;         /* Sort criteria. */
125     struct variable **break_vars;       /* Break variables. */
126     size_t break_var_cnt;               /* Number of break variables. */
127     union value *prev_break;            /* Last values of break variables. */
128
129     enum missing_treatment missing;     /* How to treat missing values. */
130     struct agr_var *agr_vars;           /* First aggregate variable. */
131     struct dictionary *dict;            /* Aggregate dictionary. */
132     int case_cnt;                       /* Counts aggregated cases. */
133     struct ccase agr_case;              /* Aggregate case for output. */
134   };
135
136 static void initialize_aggregate_info (struct agr_proc *);
137
138 /* Prototypes. */
139 static int parse_aggregate_functions (struct agr_proc *);
140 static void agr_destroy (struct agr_proc *);
141 static int aggregate_single_case (struct agr_proc *agr,
142                                   const struct ccase *input,
143                                   struct ccase *output);
144 static void dump_aggregate_info (struct agr_proc *agr, struct ccase *output);
145
146 /* Aggregating to the active file. */
147 static int agr_to_active_file (struct ccase *, void *aux);
148
149 /* Aggregating to a system file. */
150 static int presorted_agr_to_sysfile (struct ccase *, void *aux);
151 \f
152 /* Parsing. */
153
154 /* Parses and executes the AGGREGATE procedure. */
155 int
156 cmd_aggregate (void)
157 {
158   struct agr_proc agr;
159   struct file_handle *out_file = NULL;
160
161   /* Have we seen these subcommands? */
162   unsigned seen = 0;
163
164   memset(&agr, 0 , sizeof (agr));
165   agr.missing = ITEMWISE;
166   
167   agr.dict = dict_create ();
168   dict_set_label (agr.dict, dict_get_label (default_dict));
169   dict_set_documents (agr.dict, dict_get_documents (default_dict));
170   
171   /* Read most of the subcommands. */
172   for (;;)
173     {
174       lex_match ('/');
175       
176       if (lex_match_id ("OUTFILE"))
177         {
178           if (seen & 1)
179             {
180               msg (SE, _("%s subcommand given multiple times."),"OUTFILE");
181               goto error;
182             }
183           seen |= 1;
184               
185           lex_match ('=');
186           if (!lex_match ('*'))
187             {
188               out_file = fh_parse ();
189               if (out_file == NULL)
190                 goto error;
191             }
192         }
193       else if (lex_match_id ("MISSING"))
194         {
195           lex_match ('=');
196           if (!lex_match_id ("COLUMNWISE"))
197             {
198               lex_error (_("while expecting COLUMNWISE"));
199               goto error;
200             }
201           agr.missing = COLUMNWISE;
202         }
203       else if (lex_match_id ("DOCUMENT"))
204         seen |= 2;
205       else if (lex_match_id ("PRESORTED"))
206         seen |= 4;
207       else if (lex_match_id ("BREAK"))
208         {
209           int i;
210
211           if (seen & 8)
212             {
213               msg (SE, _("%s subcommand given multiple times."),"BREAK");
214               goto error;
215             }
216           seen |= 8;
217
218           lex_match ('=');
219           agr.sort = sort_parse_criteria (default_dict,
220                                           &agr.break_vars, &agr.break_var_cnt);
221           if (agr.sort == NULL)
222             goto error;
223           
224           for (i = 0; i < agr.break_var_cnt; i++)
225             {
226               struct variable *v = dict_clone_var (agr.dict, agr.break_vars[i],
227                                                    agr.break_vars[i]->name);
228               assert (v != NULL);
229             }
230         }
231       else break;
232     }
233
234   /* Check for proper syntax. */
235   if (!(seen & 8))
236     msg (SW, _("BREAK subcommand not specified."));
237       
238   /* Read in the aggregate functions. */
239   if (!parse_aggregate_functions (&agr))
240     goto error;
241
242   /* Delete documents. */
243   if (!(seen & 2))
244     dict_set_documents (agr.dict, NULL);
245
246   /* Cancel SPLIT FILE. */
247   dict_set_split_vars (agr.dict, NULL, 0);
248   
249   /* Initialize. */
250   agr.case_cnt = 0;
251   case_create (&agr.agr_case, dict_get_next_value_idx (agr.dict));
252   initialize_aggregate_info (&agr);
253
254   /* Output to active file or external file? */
255   if (out_file == NULL) 
256     {
257       /* The active file will be replaced by the aggregated data,
258          so TEMPORARY is moot. */
259       cancel_temporary ();
260
261       if (agr.sort != NULL && (seen & 4) == 0)
262         sort_active_file_in_place (agr.sort);
263
264       agr.sink = create_case_sink (&storage_sink_class, agr.dict, NULL);
265       if (agr.sink->class->open != NULL)
266         agr.sink->class->open (agr.sink);
267       vfm_sink = create_case_sink (&null_sink_class, default_dict, NULL);
268       procedure (agr_to_active_file, &agr);
269       if (agr.case_cnt > 0) 
270         {
271           dump_aggregate_info (&agr, &agr.agr_case);
272           agr.sink->class->write (agr.sink, &agr.agr_case);
273         }
274       dict_destroy (default_dict);
275       default_dict = agr.dict;
276       agr.dict = NULL;
277       vfm_source = agr.sink->class->make_source (agr.sink);
278       free_case_sink (agr.sink);
279     }
280   else
281     {
282       agr.writer = sfm_open_writer (out_file, agr.dict, get_scompression ());
283       if (agr.writer == NULL)
284         goto error;
285       
286       if (agr.sort != NULL && (seen & 4) == 0) 
287         {
288           /* Sorting is needed. */
289           struct casefile *dst;
290           struct casereader *reader;
291           struct ccase c;
292           
293           dst = sort_active_file_to_casefile (agr.sort);
294           if (dst == NULL)
295             goto error;
296           reader = casefile_get_destructive_reader (dst);
297           while (casereader_read_xfer (reader, &c)) 
298             {
299               if (aggregate_single_case (&agr, &c, &agr.agr_case)) 
300                 sfm_write_case (agr.writer, &agr.agr_case);
301               case_destroy (&c);
302             }
303           casereader_destroy (reader);
304           casefile_destroy (dst);
305         }
306       else 
307         {
308           /* Active file is already sorted. */
309           procedure (presorted_agr_to_sysfile, &agr);
310         }
311       
312       if (agr.case_cnt > 0) 
313         {
314           dump_aggregate_info (&agr, &agr.agr_case);
315           sfm_write_case (agr.writer, &agr.agr_case);
316         }
317     }
318   
319   agr_destroy (&agr);
320   return CMD_SUCCESS;
321
322 error:
323   agr_destroy (&agr);
324   return CMD_FAILURE;
325 }
326
327 /* Parse all the aggregate functions. */
328 static int
329 parse_aggregate_functions (struct agr_proc *agr)
330 {
331   struct agr_var *tail; /* Tail of linked list starting at agr->vars. */
332
333   /* Parse everything. */
334   tail = NULL;
335   for (;;)
336     {
337       char **dest;
338       char **dest_label;
339       int n_dest;
340
341       int include_missing;
342       const struct agr_func *function;
343       int func_index;
344
345       union value arg[2];
346
347       struct variable **src;
348       int n_src;
349
350       int i;
351
352       dest = NULL;
353       dest_label = NULL;
354       n_dest = 0;
355       src = NULL;
356       function = NULL;
357       n_src = 0;
358       arg[0].c = NULL;
359       arg[1].c = NULL;
360
361       /* Parse the list of target variables. */
362       while (!lex_match ('='))
363         {
364           int n_dest_prev = n_dest;
365           
366           if (!parse_DATA_LIST_vars (&dest, &n_dest,
367                                      PV_APPEND | PV_SINGLE | PV_NO_SCRATCH))
368             goto error;
369
370           /* Assign empty labels. */
371           {
372             int j;
373
374             dest_label = xrealloc (dest_label, sizeof *dest_label * n_dest);
375             for (j = n_dest_prev; j < n_dest; j++)
376               dest_label[j] = NULL;
377           }
378           
379           if (token == T_STRING)
380             {
381               ds_truncate (&tokstr, 255);
382               dest_label[n_dest - 1] = xstrdup (ds_c_str (&tokstr));
383               lex_get ();
384             }
385         }
386
387       /* Get the name of the aggregation function. */
388       if (token != T_ID)
389         {
390           lex_error (_("expecting aggregation function"));
391           goto error;
392         }
393
394       include_missing = 0;
395       if (tokid[strlen (tokid) - 1] == '.')
396         {
397           include_missing = 1;
398           tokid[strlen (tokid) - 1] = 0;
399         }
400       
401       for (function = agr_func_tab; function->name; function++)
402         if (!strcmp (function->name, tokid))
403           break;
404       if (NULL == function->name)
405         {
406           msg (SE, _("Unknown aggregation function %s."), tokid);
407           goto error;
408         }
409       func_index = function - agr_func_tab;
410       lex_get ();
411
412       /* Check for leading lparen. */
413       if (!lex_match ('('))
414         {
415           if (func_index == N)
416             func_index = N_NO_VARS;
417           else if (func_index == NU)
418             func_index = NU_NO_VARS;
419           else
420             {
421               lex_error (_("expecting `('"));
422               goto error;
423             }
424         }
425       else
426         {
427           /* Parse list of source variables. */
428           {
429             int pv_opts = PV_NO_SCRATCH;
430
431             if (func_index == SUM || func_index == MEAN || func_index == SD)
432               pv_opts |= PV_NUMERIC;
433             else if (function->n_args)
434               pv_opts |= PV_SAME_TYPE;
435
436             if (!parse_variables (default_dict, &src, &n_src, pv_opts))
437               goto error;
438           }
439
440           /* Parse function arguments, for those functions that
441              require arguments. */
442           if (function->n_args != 0)
443             for (i = 0; i < function->n_args; i++)
444               {
445                 int type;
446             
447                 lex_match (',');
448                 if (token == T_STRING)
449                   {
450                     arg[i].c = xstrdup (ds_c_str (&tokstr));
451                     type = ALPHA;
452                   }
453                 else if (lex_is_number ())
454                   {
455                     arg[i].f = tokval;
456                     type = NUMERIC;
457                   } else {
458                     msg (SE, _("Missing argument %d to %s."), i + 1, function->name);
459                     goto error;
460                   }
461             
462                 lex_get ();
463
464                 if (type != src[0]->type)
465                   {
466                     msg (SE, _("Arguments to %s must be of same type as "
467                                "source variables."),
468                          function->name);
469                     goto error;
470                   }
471               }
472
473           /* Trailing rparen. */
474           if (!lex_match(')'))
475             {
476               lex_error (_("expecting `)'"));
477               goto error;
478             }
479           
480           /* Now check that the number of source variables match the
481              number of target variables.  Do this here because if we
482              do it earlier then the user can get very misleading error
483              messages; i.e., `AGGREGATE x=SUM(y t).' will get this
484              error message when a proper message would be more like
485              `unknown variable t'. */
486           if (n_src != n_dest)
487             {
488               msg (SE, _("Number of source variables (%d) does not match "
489                          "number of target variables (%d)."),
490                    n_src, n_dest);
491               goto error;
492             }
493         }
494         
495       /* Finally add these to the linked list of aggregation
496          variables. */
497       for (i = 0; i < n_dest; i++)
498         {
499           struct agr_var *v = xmalloc (sizeof *v);
500
501           /* Add variable to chain. */
502           if (agr->agr_vars != NULL)
503             tail->next = v;
504           else
505             agr->agr_vars = v;
506           tail = v;
507           tail->next = NULL;
508           v->moments = NULL;
509           
510           /* Create the target variable in the aggregate
511              dictionary. */
512           {
513             struct variable *destvar;
514             
515             v->function = func_index;
516
517             if (src)
518               {
519                 v->src = src[i];
520                 
521                 if (src[i]->type == ALPHA)
522                   {
523                     v->function |= FSTRING;
524                     v->string = xmalloc (src[i]->width);
525                   }
526
527                 if (function->alpha_type == ALPHA)
528                   destvar = dict_clone_var (agr->dict, v->src, dest[i]);
529                 else if (v->src->type == NUMERIC
530                          || function->alpha_type == NUMERIC)
531                   {
532                     destvar = dict_create_var (agr->dict, dest[i], 0);
533                         
534                     if ((func_index == N
535                             || func_index == N_NO_VARS
536                             || func_index == NMISS)
537                         && dict_get_weight (default_dict) != NULL)
538                       {
539                         static const struct fmt_spec f8_2 = {FMT_F, 8, 2};
540                             
541                         destvar->print = destvar->write = f8_2; 
542                       }
543                     else
544                       destvar->print = destvar->write = function->format;
545                   }
546                 else 
547                   destvar = dict_create_var (agr->dict, dest[i],
548                                              v->src->width);
549               } else {
550                 v->src = NULL;
551                 destvar = dict_create_var (agr->dict, dest[i], 0);
552               }
553           
554             if (!destvar)
555               {
556                 msg (SE, _("Variable name %s is not unique within the "
557                            "aggregate file dictionary, which contains "
558                            "the aggregate variables and the break "
559                            "variables."),
560                      dest[i]);
561                 free (dest[i]);
562                 goto error;
563               }
564
565             free (dest[i]);
566             destvar->init = 0;
567             if (dest_label[i])
568               {
569                 destvar->label = dest_label[i];
570                 dest_label[i] = NULL;
571               }
572
573             v->dest = destvar;
574           }
575           
576           v->include_missing = include_missing;
577
578           if (v->src != NULL)
579             {
580               int j;
581
582               if (v->src->type == NUMERIC)
583                 for (j = 0; j < function->n_args; j++)
584                   v->arg[j].f = arg[j].f;
585               else
586                 for (j = 0; j < function->n_args; j++)
587                   v->arg[j].c = xstrdup (arg[j].c);
588             }
589         }
590       
591       if (src != NULL && src[0]->type == ALPHA)
592         for (i = 0; i < function->n_args; i++)
593           {
594             free (arg[i].c);
595             arg[i].c = NULL;
596           }
597
598       free (src);
599       free (dest);
600       free (dest_label);
601
602       if (!lex_match ('/'))
603         {
604           if (token == '.')
605             return 1;
606
607           lex_error ("expecting end of command");
608           return 0;
609         }
610       continue;
611       
612     error:
613       for (i = 0; i < n_dest; i++)
614         {
615           free (dest[i]);
616           free (dest_label[i]);
617         }
618       free (dest);
619       free (dest_label);
620       free (arg[0].c);
621       free (arg[1].c);
622       if (src && n_src && src[0]->type == ALPHA)
623         for (i = 0; i < function->n_args; i++)
624           {
625             free (arg[i].c);
626             arg[i].c = NULL;
627           }
628       free (src);
629         
630       return 0;
631     }
632 }
633
634 /* Destroys AGR. */
635 static void
636 agr_destroy (struct agr_proc *agr)
637 {
638   struct agr_var *iter, *next;
639
640   sfm_close_writer (agr->writer);
641   if (agr->sort != NULL)
642     sort_destroy_criteria (agr->sort);
643   free (agr->break_vars);
644   free (agr->prev_break);
645   for (iter = agr->agr_vars; iter; iter = next)
646     {
647       next = iter->next;
648
649       if (iter->function & FSTRING)
650         {
651           int n_args;
652           int i;
653
654           n_args = agr_func_tab[iter->function & FUNC].n_args;
655           for (i = 0; i < n_args; i++)
656             free (iter->arg[i].c);
657           free (iter->string);
658         }
659       else if (iter->function == SD)
660         moments1_destroy (iter->moments);
661       free (iter);
662     }
663   if (agr->dict != NULL)
664     dict_destroy (agr->dict);
665
666   case_destroy (&agr->agr_case);
667 }
668 \f
669 /* Execution. */
670
671 static void accumulate_aggregate_info (struct agr_proc *,
672                                        const struct ccase *);
673 static void dump_aggregate_info (struct agr_proc *, struct ccase *);
674
675 /* Processes a single case INPUT for aggregation.  If output is
676    warranted, writes it to OUTPUT and returns nonzero.
677    Otherwise, returns zero and OUTPUT is unmodified. */
678 static int
679 aggregate_single_case (struct agr_proc *agr,
680                        const struct ccase *input, struct ccase *output)
681 {
682   /* The first case always begins a new break group.  We also need to
683      preserve the values of the case for later comparison. */
684   if (agr->case_cnt++ == 0)
685     {
686       int n_elem = 0;
687       
688       {
689         int i;
690
691         for (i = 0; i < agr->break_var_cnt; i++)
692           n_elem += agr->break_vars[i]->nv;
693       }
694       
695       agr->prev_break = xmalloc (sizeof *agr->prev_break * n_elem);
696
697       /* Copy INPUT into prev_break. */
698       {
699         union value *iter = agr->prev_break;
700         int i;
701
702         for (i = 0; i < agr->break_var_cnt; i++)
703           {
704             struct variable *v = agr->break_vars[i];
705             
706             if (v->type == NUMERIC)
707               (iter++)->f = case_num (input, v->fv);
708             else
709               {
710                 memcpy (iter->s, case_str (input, v->fv), v->width);
711                 iter += v->nv;
712               }
713           }
714       }
715             
716       accumulate_aggregate_info (agr, input);
717         
718       return 0;
719     }
720       
721   /* Compare the value of each break variable to the values on the
722      previous case. */
723   {
724     union value *iter = agr->prev_break;
725     int i;
726     
727     for (i = 0; i < agr->break_var_cnt; i++)
728       {
729         struct variable *v = agr->break_vars[i];
730       
731         switch (v->type)
732           {
733           case NUMERIC:
734             if (case_num (input, v->fv) != iter->f)
735               goto not_equal;
736             iter++;
737             break;
738           case ALPHA:
739             if (memcmp (case_str (input, v->fv), iter->s, v->width))
740               goto not_equal;
741             iter += v->nv;
742             break;
743           default:
744             assert (0);
745           }
746       }
747   }
748
749   accumulate_aggregate_info (agr, input);
750
751   return 0;
752   
753 not_equal:
754   /* The values of the break variable are different from the values on
755      the previous case.  That means that it's time to dump aggregate
756      info. */
757   dump_aggregate_info (agr, output);
758   initialize_aggregate_info (agr);
759   accumulate_aggregate_info (agr, input);
760
761   /* Copy INPUT into prev_break. */
762   {
763     union value *iter = agr->prev_break;
764     int i;
765
766     for (i = 0; i < agr->break_var_cnt; i++)
767       {
768         struct variable *v = agr->break_vars[i];
769             
770         if (v->type == NUMERIC)
771           (iter++)->f = case_num (input, v->fv);
772         else
773           {
774             memcpy (iter->s, case_str (input, v->fv), v->width);
775             iter += v->nv;
776           }
777       }
778   }
779   
780   return 1;
781 }
782
783 /* Accumulates aggregation data from the case INPUT. */
784 static void 
785 accumulate_aggregate_info (struct agr_proc *agr,
786                            const struct ccase *input)
787 {
788   struct agr_var *iter;
789   double weight;
790   int bad_warn = 1;
791
792   weight = dict_get_case_weight (default_dict, input, &bad_warn);
793
794   for (iter = agr->agr_vars; iter; iter = iter->next)
795     if (iter->src)
796       {
797         const union value *v = case_data (input, iter->src->fv);
798
799         if ((!iter->include_missing && is_missing (v, iter->src))
800             || (iter->include_missing && iter->src->type == NUMERIC
801                 && v->f == SYSMIS))
802           {
803             switch (iter->function)
804               {
805               case NMISS:
806                 iter->dbl[0] += weight;
807                 break;
808               case NUMISS:
809                 iter->int1++;
810                 break;
811               }
812             iter->missing = 1;
813             continue;
814           }
815         
816         /* This is horrible.  There are too many possibilities. */
817         switch (iter->function)
818           {
819           case SUM:
820             iter->dbl[0] += v->f;
821             break;
822           case MEAN:
823             iter->dbl[0] += v->f * weight;
824             iter->dbl[1] += weight;
825             break;
826           case SD:
827             moments1_add (iter->moments, v->f, weight);
828             break;
829           case MAX:
830             iter->dbl[0] = max (iter->dbl[0], v->f);
831             iter->int1 = 1;
832             break;
833           case MAX | FSTRING:
834             if (memcmp (iter->string, v->s, iter->src->width) < 0)
835               memcpy (iter->string, v->s, iter->src->width);
836             iter->int1 = 1;
837             break;
838           case MIN:
839             iter->dbl[0] = min (iter->dbl[0], v->f);
840             iter->int1 = 1;
841             break;
842           case MIN | FSTRING:
843             if (memcmp (iter->string, v->s, iter->src->width) > 0)
844               memcpy (iter->string, v->s, iter->src->width);
845             iter->int1 = 1;
846             break;
847           case FGT:
848           case PGT:
849             if (v->f > iter->arg[0].f)
850               iter->dbl[0] += weight;
851             iter->dbl[1] += weight;
852             break;
853           case FGT | FSTRING:
854           case PGT | FSTRING:
855             if (memcmp (iter->arg[0].c, v->s, iter->src->width) < 0)
856               iter->dbl[0] += weight;
857             iter->dbl[1] += weight;
858             break;
859           case FLT:
860           case PLT:
861             if (v->f < iter->arg[0].f)
862               iter->dbl[0] += weight;
863             iter->dbl[1] += weight;
864             break;
865           case FLT | FSTRING:
866           case PLT | FSTRING:
867             if (memcmp (iter->arg[0].c, v->s, iter->src->width) > 0)
868               iter->dbl[0] += weight;
869             iter->dbl[1] += weight;
870             break;
871           case FIN:
872           case PIN:
873             if (iter->arg[0].f <= v->f && v->f <= iter->arg[1].f)
874               iter->dbl[0] += weight;
875             iter->dbl[1] += weight;
876             break;
877           case FIN | FSTRING:
878           case PIN | FSTRING:
879             if (memcmp (iter->arg[0].c, v->s, iter->src->width) <= 0
880                 && memcmp (iter->arg[1].c, v->s, iter->src->width) >= 0)
881               iter->dbl[0] += weight;
882             iter->dbl[1] += weight;
883             break;
884           case FOUT:
885           case POUT:
886             if (iter->arg[0].f > v->f || v->f > iter->arg[1].f)
887               iter->dbl[0] += weight;
888             iter->dbl[1] += weight;
889             break;
890           case FOUT | FSTRING:
891           case POUT | FSTRING:
892             if (memcmp (iter->arg[0].c, v->s, iter->src->width) > 0
893                 && memcmp (iter->arg[1].c, v->s, iter->src->width) < 0)
894               iter->dbl[0] += weight;
895             iter->dbl[1] += weight;
896             break;
897           case N:
898             iter->dbl[0] += weight;
899             break;
900           case NU:
901             iter->int1++;
902             break;
903           case FIRST:
904             if (iter->int1 == 0)
905               {
906                 iter->dbl[0] = v->f;
907                 iter->int1 = 1;
908               }
909             break;
910           case FIRST | FSTRING:
911             if (iter->int1 == 0)
912               {
913                 memcpy (iter->string, v->s, iter->src->width);
914                 iter->int1 = 1;
915               }
916             break;
917           case LAST:
918             iter->dbl[0] = v->f;
919             iter->int1 = 1;
920             break;
921           case LAST | FSTRING:
922             memcpy (iter->string, v->s, iter->src->width);
923             iter->int1 = 1;
924             break;
925           default:
926             assert (0);
927           }
928     } else {
929       switch (iter->function)
930         {
931         case N_NO_VARS:
932           iter->dbl[0] += weight;
933           break;
934         case NU_NO_VARS:
935           iter->int1++;
936           break;
937         default:
938           assert (0);
939         }
940     }
941 }
942
943 /* We've come to a record that differs from the previous in one or
944    more of the break variables.  Make an output record from the
945    accumulated statistics in the OUTPUT case. */
946 static void 
947 dump_aggregate_info (struct agr_proc *agr, struct ccase *output)
948 {
949   {
950     int value_idx = 0;
951     int i;
952
953     for (i = 0; i < agr->break_var_cnt; i++) 
954       {
955         int nv = agr->break_vars[i]->nv;
956         memcpy (case_data_rw (output, value_idx),
957                 &agr->prev_break[value_idx],
958                 sizeof (union value) * nv);
959         value_idx += nv; 
960       }
961   }
962   
963   {
964     struct agr_var *i;
965   
966     for (i = agr->agr_vars; i; i = i->next)
967       {
968         union value *v = case_data_rw (output, i->dest->fv);
969
970         if (agr->missing == COLUMNWISE && i->missing != 0
971             && (i->function & FUNC) != N && (i->function & FUNC) != NU
972             && (i->function & FUNC) != NMISS && (i->function & FUNC) != NUMISS)
973           {
974             if (i->function & FSTRING)
975               memset (v->s, ' ', i->dest->width);
976             else
977               v->f = SYSMIS;
978             continue;
979           }
980         
981         switch (i->function)
982           {
983           case SUM:
984             v->f = i->dbl[0];
985             break;
986           case MEAN:
987             v->f = i->dbl[1] != 0.0 ? i->dbl[0] / i->dbl[1] : SYSMIS;
988             break;
989           case SD:
990             {
991               double variance;
992
993               /* FIXME: we should use two passes. */
994               moments1_calculate (i->moments, NULL, NULL, &variance,
995                                  NULL, NULL);
996               if (variance != SYSMIS)
997                 v->f = sqrt (variance);
998               else
999                 v->f = SYSMIS; 
1000             }
1001             break;
1002           case MAX:
1003           case MIN:
1004             v->f = i->int1 ? i->dbl[0] : SYSMIS;
1005             break;
1006           case MAX | FSTRING:
1007           case MIN | FSTRING:
1008             if (i->int1)
1009               memcpy (v->s, i->string, i->dest->width);
1010             else
1011               memset (v->s, ' ', i->dest->width);
1012             break;
1013           case FGT | FSTRING:
1014           case FLT | FSTRING:
1015           case FIN | FSTRING:
1016           case FOUT | FSTRING:
1017             v->f = i->int2 ? (double) i->int1 / (double) i->int2 : SYSMIS;
1018             break;
1019           case FGT:
1020           case FLT:
1021           case FIN:
1022           case FOUT:
1023             v->f = i->dbl[1] ? i->dbl[0] / i->dbl[1] : SYSMIS;
1024             break;
1025           case PGT:
1026           case PGT | FSTRING:
1027           case PLT:
1028           case PLT | FSTRING:
1029           case PIN:
1030           case PIN | FSTRING:
1031           case POUT:
1032           case POUT | FSTRING:
1033             v->f = i->dbl[1] ? i->dbl[0] / i->dbl[1] * 100.0 : SYSMIS;
1034             break;
1035           case N:
1036             v->f = i->dbl[0];
1037             break;
1038           case NU:
1039             v->f = i->int1;
1040             break;
1041           case FIRST:
1042           case LAST:
1043             v->f = i->int1 ? i->dbl[0] : SYSMIS;
1044             break;
1045           case FIRST | FSTRING:
1046           case LAST | FSTRING:
1047             if (i->int1)
1048               memcpy (v->s, i->string, i->dest->width);
1049             else
1050               memset (v->s, ' ', i->dest->width);
1051             break;
1052           case N_NO_VARS:
1053             v->f = i->dbl[0];
1054             break;
1055           case NU_NO_VARS:
1056             v->f = i->int1;
1057             break;
1058           case NMISS:
1059             v->f = i->dbl[0];
1060             break;
1061           case NUMISS:
1062             v->f = i->int1;
1063             break;
1064           default:
1065             assert (0);
1066           }
1067       }
1068   }
1069 }
1070
1071 /* Resets the state for all the aggregate functions. */
1072 static void
1073 initialize_aggregate_info (struct agr_proc *agr)
1074 {
1075   struct agr_var *iter;
1076
1077   for (iter = agr->agr_vars; iter; iter = iter->next)
1078     {
1079       iter->missing = 0;
1080       switch (iter->function)
1081         {
1082         case MIN:
1083           iter->dbl[0] = DBL_MAX;
1084           break;
1085         case MIN | FSTRING:
1086           memset (iter->string, 255, iter->src->width);
1087           break;
1088         case MAX:
1089           iter->dbl[0] = -DBL_MAX;
1090           break;
1091         case MAX | FSTRING:
1092           memset (iter->string, 0, iter->src->width);
1093           break;
1094         case SD:
1095           if (iter->moments == NULL)
1096             iter->moments = moments1_create (MOMENT_VARIANCE);
1097           else
1098             moments1_clear (iter->moments);
1099           break;
1100         default:
1101           iter->dbl[0] = iter->dbl[1] = iter->dbl[2] = 0.0;
1102           iter->int1 = iter->int2 = 0;
1103           break;
1104         }
1105     }
1106 }
1107 \f
1108 /* Aggregate each case as it comes through.  Cases which aren't needed
1109    are dropped. */
1110 static int
1111 agr_to_active_file (struct ccase *c, void *agr_)
1112 {
1113   struct agr_proc *agr = agr_;
1114
1115   if (aggregate_single_case (agr, c, &agr->agr_case)) 
1116     agr->sink->class->write (agr->sink, &agr->agr_case);
1117
1118   return 1;
1119 }
1120
1121 /* Aggregate the current case and output it if we passed a
1122    breakpoint. */
1123 static int
1124 presorted_agr_to_sysfile (struct ccase *c, void *agr_) 
1125 {
1126   struct agr_proc *agr = agr_;
1127
1128   if (aggregate_single_case (agr, c, &agr->agr_case)) 
1129     sfm_write_case (agr->writer, &agr->agr_case);
1130
1131   return 1;
1132 }