Implemented the SHOW command and massaged the SET command to fit
[pspp-builds.git] / src / aggregate.c
1 /* PSPP - computes sample statistics.
2    Copyright (C) 1997-9, 2000 Free Software Foundation, Inc.
3    Written by Ben Pfaff <blp@gnu.org>.
4
5    This program is free software; you can redistribute it and/or
6    modify it under the terms of the GNU General Public License as
7    published by the Free Software Foundation; either version 2 of the
8    License, or (at your option) any later version.
9
10    This program is distributed in the hope that it will be useful, but
11    WITHOUT ANY WARRANTY; without even the implied warranty of
12    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13    General Public License for more details.
14
15    You should have received a copy of the GNU General Public License
16    along with this program; if not, write to the Free Software
17    Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA
18    02111-1307, USA. */
19
20 #include <config.h>
21 #include <assert.h>
22 #include <stdlib.h>
23 #include "alloc.h"
24 #include "command.h"
25 #include "error.h"
26 #include "file-handle.h"
27 #include "lexer.h"
28 #include "misc.h"
29 #include "pool.h"
30 #include "settings.h"
31 #include "sfm.h"
32 #include "sort.h"
33 #include "stats.h"
34 #include "str.h"
35 #include "var.h"
36 #include "vfm.h"
37 #include "vfmP.h"
38
39 /* Specifies how to make an aggregate variable. */
40 struct agr_var
41   {
42     struct agr_var *next;               /* Next in list. */
43
44     /* Collected during parsing. */
45     struct variable *src;       /* Source variable. */
46     struct variable *dest;      /* Target variable. */
47     int function;               /* Function. */
48     int include_missing;        /* 1=Include user-missing values. */
49     union value arg[2];         /* Arguments. */
50
51     /* Accumulated during AGGREGATE execution. */
52     double dbl[3];
53     int int1, int2;
54     char *string;
55     int missing;
56   };
57
58 /* Aggregation functions. */
59 enum
60   {
61     NONE, SUM, MEAN, SD, MAX, MIN, PGT, PLT, PIN, POUT, FGT, FLT, FIN,
62     FOUT, N, NU, NMISS, NUMISS, FIRST, LAST,
63     N_AGR_FUNCS, N_NO_VARS, NU_NO_VARS,
64     FUNC = 0x1f, /* Function mask. */
65     FSTRING = 1<<5, /* String function bit. */
66   };
67
68 /* Attributes of an aggregation function. */
69 struct agr_func
70   {
71     const char *name;           /* Aggregation function name. */
72     int n_args;                 /* Number of arguments. */
73     int alpha_type;             /* When given ALPHA arguments, output type. */
74     struct fmt_spec format;     /* Format spec if alpha_type != ALPHA. */
75   };
76
77 /* Attributes of aggregation functions. */
78 static const struct agr_func agr_func_tab[] = 
79   {
80     {"<NONE>",  0, -1,      {0, 0, 0}},
81     {"SUM",     0, -1,      {FMT_F, 8, 2}},
82     {"MEAN",    0, -1,      {FMT_F, 8, 2}},
83     {"SD",      0, -1,      {FMT_F, 8, 2}},
84     {"MAX",     0, ALPHA,   {-1, -1, -1}}, 
85     {"MIN",     0, ALPHA,   {-1, -1, -1}}, 
86     {"PGT",     1, NUMERIC, {FMT_F, 5, 1}},      
87     {"PLT",     1, NUMERIC, {FMT_F, 5, 1}},       
88     {"PIN",     2, NUMERIC, {FMT_F, 5, 1}},       
89     {"POUT",    2, NUMERIC, {FMT_F, 5, 1}},       
90     {"FGT",     1, NUMERIC, {FMT_F, 5, 3}},       
91     {"FLT",     1, NUMERIC, {FMT_F, 5, 3}},       
92     {"FIN",     2, NUMERIC, {FMT_F, 5, 3}},       
93     {"FOUT",    2, NUMERIC, {FMT_F, 5, 3}},       
94     {"N",       0, NUMERIC, {FMT_F, 7, 0}},       
95     {"NU",      0, NUMERIC, {FMT_F, 7, 0}},       
96     {"NMISS",   0, NUMERIC, {FMT_F, 7, 0}},       
97     {"NUMISS",  0, NUMERIC, {FMT_F, 7, 0}},       
98     {"FIRST",   0, ALPHA,   {-1, -1, -1}}, 
99     {"LAST",    0, ALPHA,   {-1, -1, -1}},
100     {NULL,      0, -1,      {-1, -1, -1}},
101     {"N",       0, NUMERIC, {FMT_F, 7, 0}},
102     {"NU",      0, NUMERIC, {FMT_F, 7, 0}},
103   };
104
105 /* Missing value types. */
106 enum missing_treatment
107   {
108     ITEMWISE,           /* Missing values item by item. */
109     COLUMNWISE          /* Missing values column by column. */
110   };
111
112 /* An entire AGGREGATE procedure. */
113 struct agr_proc 
114   {
115     /* We have either an output file or a sink. */
116     struct file_handle *out_file;       /* Output file, or null if none. */
117     struct case_sink *sink;             /* Sink, or null if none. */
118
119     enum missing_treatment missing;     /* How to treat missing values. */
120     struct sort_cases_pgm *sort;        /* Sort program. */
121     struct agr_var *vars;               /* First aggregate variable. */
122     struct dictionary *dict;            /* Aggregate dictionary. */
123     int case_cnt;                       /* Counts aggregated cases. */
124     union value *prev_break;            /* Last values of break variables. */
125     struct ccase *agr_case;             /* Aggregate case for output. */
126     flt64 *sfm_agr_case;                /* Aggregate case in SFM format. */
127   };
128
129 static void initialize_aggregate_info (struct agr_proc *);
130
131 /* Prototypes. */
132 static int parse_aggregate_functions (struct agr_proc *);
133 static void agr_destroy (struct agr_proc *);
134 static int aggregate_single_case (struct agr_proc *agr,
135                                   const struct ccase *input,
136                                   struct ccase *output);
137 static void dump_aggregate_info (struct agr_proc *agr, struct ccase *output);
138 static int create_sysfile (struct agr_proc *);
139
140 /* Aggregating to the active file. */
141 static int agr_to_active_file (struct ccase *, void *aux);
142
143 /* Aggregating to a system file. */
144 static void write_case_to_sfm (struct agr_proc *agr);
145 static int presorted_agr_to_sysfile (struct ccase *, void *aux);
146 static int sort_agr_to_sysfile (const struct ccase *, void *aux);
147 \f
148 /* Parsing. */
149
150 /* Parses and executes the AGGREGATE procedure. */
151 int
152 cmd_aggregate (void)
153 {
154   struct agr_proc agr;
155
156   /* Have we seen these subcommands? */
157   unsigned seen = 0;
158
159   agr.out_file = NULL;
160   agr.sink = NULL;
161   agr.missing = ITEMWISE;
162   agr.sort = NULL;
163   agr.vars = NULL;
164   agr.dict = NULL;
165   agr.case_cnt = 0;
166   agr.prev_break = NULL;
167   
168   agr.dict = dict_create ();
169   dict_set_label (agr.dict, dict_get_label (default_dict));
170   dict_set_documents (agr.dict, dict_get_documents (default_dict));
171   
172   lex_match_id ("AGGREGATE");
173
174   /* Read most of the subcommands. */
175   for (;;)
176     {
177       lex_match ('/');
178       
179       if (lex_match_id ("OUTFILE"))
180         {
181           if (seen & 1)
182             {
183               msg (SE, _("%s subcommand given multiple times."),"OUTFILE");
184               goto lossage;
185             }
186           seen |= 1;
187               
188           lex_match ('=');
189           if (lex_match ('*'))
190             agr.out_file = NULL;
191           else 
192             {
193               agr.out_file = fh_parse_file_handle ();
194               if (agr.out_file == NULL)
195                 goto lossage;
196             }
197         }
198       else if (lex_match_id ("MISSING"))
199         {
200           lex_match ('=');
201           if (!lex_match_id ("COLUMNWISE"))
202             {
203               lex_error (_("while expecting COLUMNWISE"));
204               goto lossage;
205             }
206           agr.missing = COLUMNWISE;
207         }
208       else if (lex_match_id ("DOCUMENT"))
209         seen |= 2;
210       else if (lex_match_id ("PRESORTED"))
211         seen |= 4;
212       else if (lex_match_id ("BREAK"))
213         {
214           if (seen & 8)
215             {
216               msg (SE, _("%s subcommand given multiple times."),"BREAK");
217               goto lossage;
218             }
219           seen |= 8;
220
221           lex_match ('=');
222           agr.sort = parse_sort ();
223           if (agr.sort == NULL)
224             goto lossage;
225           
226           {
227             int i;
228             
229             for (i = 0; i < agr.sort->var_cnt; i++)
230               {
231                 struct variable *v;
232               
233                 v = dict_clone_var (agr.dict, agr.sort->vars[i],
234                                     agr.sort->vars[i]->name);
235                 assert (v != NULL);
236               }
237           }
238         }
239       else break;
240     }
241
242   /* Check for proper syntax. */
243   if (!(seen & 8))
244     msg (SW, _("BREAK subcommand not specified."));
245       
246   /* Read in the aggregate functions. */
247   if (!parse_aggregate_functions (&agr))
248     goto lossage;
249
250   /* Delete documents. */
251   if (!(seen & 2))
252     dict_set_documents (agr.dict, NULL);
253
254   /* Cancel SPLIT FILE. */
255   dict_set_split_vars (agr.dict, NULL, 0);
256   
257   /* Initialize. */
258   agr.case_cnt = 0;
259   agr.agr_case = xmalloc (dict_get_case_size (agr.dict));
260   initialize_aggregate_info (&agr);
261
262   /* Output to active file or external file? */
263   if (agr.out_file == NULL) 
264     {
265       /* The active file will be replaced by the aggregated data,
266          so TEMPORARY is moot. */
267       cancel_temporary ();
268
269       if (agr.sort != NULL && (seen & 4) == 0)
270         sort_cases (agr.sort, 0);
271
272       agr.sink = create_case_sink (&storage_sink_class, agr.dict, NULL);
273       if (agr.sink->class->open != NULL)
274         agr.sink->class->open (agr.sink);
275       vfm_sink = create_case_sink (&null_sink_class, default_dict, NULL);
276       procedure (agr_to_active_file, &agr);
277       if (agr.case_cnt > 0) 
278         {
279           dump_aggregate_info (&agr, agr.agr_case);
280           agr.sink->class->write (agr.sink, agr.agr_case);
281         }
282       dict_destroy (default_dict);
283       default_dict = agr.dict;
284       agr.dict = NULL;
285       vfm_source = agr.sink->class->make_source (agr.sink);
286       free_case_sink (agr.sink);
287     }
288   else
289     {
290       if (!create_sysfile (&agr))
291         goto lossage;
292
293       if (agr.sort != NULL && (seen & 4) == 0) 
294         {
295           /* Sorting is needed. */
296           sort_cases (agr.sort, 1);
297           read_sort_output (agr.sort, sort_agr_to_sysfile, NULL);
298         }
299       else 
300         {
301           /* Active file is already sorted. */
302           procedure (presorted_agr_to_sysfile, &agr);
303         }
304       
305       if (agr.case_cnt > 0) 
306         {
307           dump_aggregate_info (&agr, agr.agr_case);
308           write_case_to_sfm (&agr);
309         }
310       fh_close_handle (agr.out_file);
311     }
312   
313   agr_destroy (&agr);
314   return CMD_SUCCESS;
315
316 lossage:
317   agr_destroy (&agr);
318   return CMD_FAILURE;
319 }
320
321 /* Create a system file for use in aggregation to an external
322    file. */
323 static int
324 create_sysfile (struct agr_proc *agr)
325 {
326   struct sfm_write_info w;
327   w.h = agr->out_file;
328   w.dict = agr->dict;
329   w.compress = get_scompression();
330   if (!sfm_write_dictionary (&w))
331     return 0;
332
333   agr->sfm_agr_case = xmalloc (sizeof *agr->sfm_agr_case * w.case_size);
334     
335   return 1;
336 }
337
338 /* Parse all the aggregate functions. */
339 static int
340 parse_aggregate_functions (struct agr_proc *agr)
341 {
342   struct agr_var *tail; /* Tail of linked list starting at agr->vars. */
343
344   /* Parse everything. */
345   tail = NULL;
346   for (;;)
347     {
348       char **dest;
349       char **dest_label;
350       int n_dest;
351
352       int include_missing;
353       const struct agr_func *function;
354       int func_index;
355
356       union value arg[2];
357
358       struct variable **src;
359       int n_src;
360
361       int i;
362
363       dest = NULL;
364       dest_label = NULL;
365       n_dest = 0;
366       src = NULL;
367       function = NULL;
368       n_src = 0;
369       arg[0].c = NULL;
370       arg[1].c = NULL;
371
372       /* Parse the list of target variables. */
373       while (!lex_match ('='))
374         {
375           int n_dest_prev = n_dest;
376           
377           if (!parse_DATA_LIST_vars (&dest, &n_dest, PV_APPEND | PV_SINGLE | PV_NO_SCRATCH))
378             goto lossage;
379
380           /* Assign empty labels. */
381           {
382             int j;
383
384             dest_label = xrealloc (dest_label, sizeof *dest_label * n_dest);
385             for (j = n_dest_prev; j < n_dest; j++)
386               dest_label[j] = NULL;
387           }
388           
389           if (token == T_STRING)
390             {
391               ds_truncate (&tokstr, 120);
392               dest_label[n_dest - 1] = xstrdup (ds_value (&tokstr));
393               lex_get ();
394             }
395         }
396
397       /* Get the name of the aggregation function. */
398       if (token != T_ID)
399         {
400           lex_error (_("expecting aggregation function"));
401           goto lossage;
402         }
403
404       include_missing = 0;
405       if (tokid[strlen (tokid) - 1] == '.')
406         {
407           include_missing = 1;
408           tokid[strlen (tokid) - 1] = 0;
409         }
410       
411       for (function = agr_func_tab; function->name; function++)
412         if (!strcmp (function->name, tokid))
413           break;
414       if (NULL == function->name)
415         {
416           msg (SE, _("Unknown aggregation function %s."), tokid);
417           goto lossage;
418         }
419       func_index = function - agr_func_tab;
420       lex_get ();
421
422       /* Check for leading lparen. */
423       if (!lex_match ('('))
424         {
425           if (func_index == N)
426             func_index = N_NO_VARS;
427           else if (func_index == NU)
428             func_index = NU_NO_VARS;
429           else
430             {
431               lex_error (_("expecting `('"));
432               goto lossage;
433             }
434         } else {
435           /* Parse list of source variables. */
436           {
437             int pv_opts = PV_NO_SCRATCH;
438
439             if (func_index == SUM || func_index == MEAN || func_index == SD)
440               pv_opts |= PV_NUMERIC;
441             else if (function->n_args)
442               pv_opts |= PV_SAME_TYPE;
443
444             if (!parse_variables (default_dict, &src, &n_src, pv_opts))
445               goto lossage;
446           }
447
448           /* Parse function arguments, for those functions that
449              require arguments. */
450           if (function->n_args != 0)
451             for (i = 0; i < function->n_args; i++)
452               {
453                 int type;
454             
455                 lex_match (',');
456                 if (token == T_STRING)
457                   {
458                     arg[i].c = xstrdup (ds_value (&tokstr));
459                     type = ALPHA;
460                   }
461                 else if (token == T_NUM)
462                   {
463                     arg[i].f = tokval;
464                     type = NUMERIC;
465                   } else {
466                     msg (SE, _("Missing argument %d to %s."), i + 1, function->name);
467                     goto lossage;
468                   }
469             
470                 lex_get ();
471
472                 if (type != src[0]->type)
473                   {
474                     msg (SE, _("Arguments to %s must be of same type as "
475                                "source variables."),
476                          function->name);
477                     goto lossage;
478                   }
479               }
480
481           /* Trailing rparen. */
482           if (!lex_match(')'))
483             {
484               lex_error (_("expecting `)'"));
485               goto lossage;
486             }
487           
488           /* Now check that the number of source variables match the
489              number of target variables.  Do this here because if we
490              do it earlier then the user can get very misleading error
491              messages; i.e., `AGGREGATE x=SUM(y t).' will get this
492              error message when a proper message would be more like
493              `unknown variable t'. */
494           if (n_src != n_dest)
495             {
496               msg (SE, _("Number of source variables (%d) does not match "
497                          "number of target variables (%d)."),
498                    n_src, n_dest);
499               goto lossage;
500             }
501         }
502         
503       /* Finally add these to the linked list of aggregation
504          variables. */
505       for (i = 0; i < n_dest; i++)
506         {
507           struct agr_var *v = xmalloc (sizeof *v);
508
509           /* Add variable to chain. */
510           if (agr->vars != NULL)
511             tail->next = v;
512           else
513             agr->vars = v;
514           tail = v;
515           tail->next = NULL;
516           
517           /* Create the target variable in the aggregate
518              dictionary. */
519           {
520             struct variable *destvar;
521             
522             v->function = func_index;
523
524             if (src)
525               {
526                 int output_width;
527
528                 v->src = src[i];
529                 
530                 if (src[i]->type == ALPHA)
531                   {
532                     v->function |= FSTRING;
533                     v->string = xmalloc (src[i]->width);
534                   }
535                 
536                 if (v->src->type == NUMERIC || function->alpha_type == NUMERIC)
537                   output_width = 0;
538                 else
539                   output_width = v->src->width;
540
541                 if (function->alpha_type == ALPHA)
542                   destvar = dict_clone_var (agr->dict, v->src, dest[i]);
543                 else
544                   {
545                     destvar = dict_create_var (agr->dict, dest[i], output_width);
546                     if (output_width == 0)
547                       destvar->print = destvar->write = function->format;
548                     if (output_width == 0 && dict_get_weight (default_dict) != NULL
549                         && (func_index == N || func_index == N_NO_VARS
550                             || func_index == NU || func_index == NU_NO_VARS))
551                       {
552                         struct fmt_spec f = {FMT_F, 8, 2};
553                       
554                         destvar->print = destvar->write = f;
555                       }
556                   }
557               } else {
558                 v->src = NULL;
559                 destvar = dict_create_var (agr->dict, dest[i], 0);
560               }
561           
562             if (!destvar)
563               {
564                 msg (SE, _("Variable name %s is not unique within the "
565                            "aggregate file dictionary, which contains "
566                            "the aggregate variables and the break "
567                            "variables."),
568                      dest[i]);
569                 free (dest[i]);
570                 goto lossage;
571               }
572
573             free (dest[i]);
574             destvar->init = 0;
575             if (dest_label[i])
576               {
577                 destvar->label = dest_label[i];
578                 dest_label[i] = NULL;
579               }
580             else if (function->alpha_type == ALPHA)
581               destvar->print = destvar->write = function->format;
582
583             v->dest = destvar;
584           }
585           
586           v->include_missing = include_missing;
587
588           if (v->src != NULL)
589             {
590               int j;
591
592               if (v->src->type == NUMERIC)
593                 for (j = 0; j < function->n_args; j++)
594                   v->arg[j].f = arg[j].f;
595               else
596                 for (j = 0; j < function->n_args; j++)
597                   v->arg[j].c = xstrdup (arg[j].c);
598             }
599         }
600       
601       if (src != NULL && src[0]->type == ALPHA)
602         for (i = 0; i < function->n_args; i++)
603           {
604             free (arg[i].c);
605             arg[i].c = NULL;
606           }
607
608       free (src);
609       free (dest);
610       free (dest_label);
611
612       if (!lex_match ('/'))
613         {
614           if (token == '.')
615             return 1;
616
617           lex_error ("expecting end of command");
618           return 0;
619         }
620       continue;
621       
622     lossage:
623       for (i = 0; i < n_dest; i++)
624         {
625           free (dest[i]);
626           free (dest_label[i]);
627         }
628       free (dest);
629       free (dest_label);
630       free (arg[0].c);
631       free (arg[1].c);
632       if (src && n_src && src[0]->type == ALPHA)
633         for (i = 0; i < function->n_args; i++)
634           {
635             free (arg[i].c);
636             arg[i].c = NULL;
637           }
638       free (src);
639         
640       return 0;
641     }
642 }
643
644 /* Destroys AGR. */
645 static void
646 agr_destroy (struct agr_proc *agr)
647 {
648   struct agr_var *iter, *next;
649
650   if (agr->dict != NULL)
651     dict_destroy (agr->dict);
652   if (agr->sort != NULL)
653     destroy_sort_cases_pgm (agr->sort);
654   for (iter = agr->vars; iter; iter = next)
655     {
656       next = iter->next;
657
658       if (iter->function & FSTRING)
659         {
660           int n_args;
661           int i;
662
663           n_args = agr_func_tab[iter->function & FUNC].n_args;
664           for (i = 0; i < n_args; i++)
665             free (iter->arg[i].c);
666           free (iter->string);
667         }
668       free (iter);
669     }
670   free (agr->prev_break);
671   free (agr->agr_case);
672 }
673 \f
674 /* Execution. */
675
676 static void accumulate_aggregate_info (struct agr_proc *,
677                                        const struct ccase *);
678 static void dump_aggregate_info (struct agr_proc *, struct ccase *);
679
680 /* Processes a single case INPUT for aggregation.  If output is
681    warranted, writes it to OUTPUT and returns nonzero.
682    Otherwise, returns zero and OUTPUT is unmodified. */
683 static int
684 aggregate_single_case (struct agr_proc *agr,
685                        const struct ccase *input, struct ccase *output)
686 {
687   /* The first case always begins a new break group.  We also need to
688      preserve the values of the case for later comparison. */
689   if (agr->case_cnt++ == 0)
690     {
691       int n_elem = 0;
692       
693       {
694         int i;
695
696         for (i = 0; i < agr->sort->var_cnt; i++)
697           n_elem += agr->sort->vars[i]->nv;
698       }
699       
700       agr->prev_break = xmalloc (sizeof *agr->prev_break * n_elem);
701
702       /* Copy INPUT into prev_break. */
703       {
704         union value *iter = agr->prev_break;
705         int i;
706
707         for (i = 0; i < agr->sort->var_cnt; i++)
708           {
709             struct variable *v = agr->sort->vars[i];
710             
711             if (v->type == NUMERIC)
712               (iter++)->f = input->data[v->fv].f;
713             else
714               {
715                 memcpy (iter->s, input->data[v->fv].s, v->width);
716                 iter += v->nv;
717               }
718           }
719       }
720             
721       accumulate_aggregate_info (agr, input);
722         
723       return 0;
724     }
725       
726   /* Compare the value of each break variable to the values on the
727      previous case. */
728   {
729     union value *iter = agr->prev_break;
730     int i;
731     
732     for (i = 0; i < agr->sort->var_cnt; i++)
733       {
734         struct variable *v = agr->sort->vars[i];
735       
736         switch (v->type)
737           {
738           case NUMERIC:
739             if (input->data[v->fv].f != iter->f)
740               goto not_equal;
741             iter++;
742             break;
743           case ALPHA:
744             if (memcmp (input->data[v->fv].s, iter->s, v->width))
745               goto not_equal;
746             iter += v->nv;
747             break;
748           default:
749             assert (0);
750           }
751       }
752   }
753
754   accumulate_aggregate_info (agr, input);
755
756   return 0;
757   
758 not_equal:
759   /* The values of the break variable are different from the values on
760      the previous case.  That means that it's time to dump aggregate
761      info. */
762   dump_aggregate_info (agr, output);
763   initialize_aggregate_info (agr);
764   accumulate_aggregate_info (agr, input);
765
766   /* Copy INPUT into prev_break. */
767   {
768     union value *iter = agr->prev_break;
769     int i;
770
771     for (i = 0; i < agr->sort->var_cnt; i++)
772       {
773         struct variable *v = agr->sort->vars[i];
774             
775         if (v->type == NUMERIC)
776           (iter++)->f = input->data[v->fv].f;
777         else
778           {
779             memcpy (iter->s, input->data[v->fv].s, v->width);
780             iter += v->nv;
781           }
782       }
783   }
784   
785   return 1;
786 }
787
788 /* Accumulates aggregation data from the case INPUT. */
789 static void 
790 accumulate_aggregate_info (struct agr_proc *agr,
791                            const struct ccase *input)
792 {
793   struct agr_var *iter;
794   double weight;
795
796   weight = dict_get_case_weight (default_dict, input);
797
798   for (iter = agr->vars; iter; iter = iter->next)
799     if (iter->src)
800       {
801         const union value *v = &input->data[iter->src->fv];
802
803         if ((!iter->include_missing && is_missing (v, iter->src))
804             || (iter->include_missing && iter->src->type == NUMERIC
805                 && v->f == SYSMIS))
806           {
807             switch (iter->function)
808               {
809               case NMISS:
810                 iter->dbl[0] += weight;
811                 break;
812               case NUMISS:
813                 iter->int1++;
814                 break;
815               }
816             iter->missing = 1;
817             continue;
818           }
819         
820         /* This is horrible.  There are too many possibilities. */
821         switch (iter->function)
822           {
823           case SUM:
824             iter->dbl[0] += v->f;
825             break;
826           case MEAN:
827             iter->dbl[0] += v->f * weight;
828             iter->dbl[1] += weight;
829             break;
830           case SD: 
831             {
832               double product = v->f * weight;
833               iter->dbl[0] += product;
834               iter->dbl[1] += product * v->f;
835               iter->dbl[2] += weight;
836               break; 
837             }
838           case MAX:
839             iter->dbl[0] = max (iter->dbl[0], v->f);
840             iter->int1 = 1;
841             break;
842           case MAX | FSTRING:
843             if (memcmp (iter->string, v->s, iter->src->width) < 0)
844               memcpy (iter->string, v->s, iter->src->width);
845             iter->int1 = 1;
846             break;
847           case MIN:
848             iter->dbl[0] = min (iter->dbl[0], v->f);
849             iter->int1 = 1;
850             break;
851           case MIN | FSTRING:
852             if (memcmp (iter->string, v->s, iter->src->width) > 0)
853               memcpy (iter->string, v->s, iter->src->width);
854             iter->int1 = 1;
855             break;
856           case FGT:
857           case PGT:
858             if (v->f > iter->arg[0].f)
859               iter->dbl[0] += weight;
860             iter->dbl[1] += weight;
861             break;
862           case FGT | FSTRING:
863           case PGT | FSTRING:
864             if (memcmp (iter->arg[0].c, v->s, iter->src->width) < 0)
865               iter->dbl[0] += weight;
866             iter->dbl[1] += weight;
867             break;
868           case FLT:
869           case PLT:
870             if (v->f < iter->arg[0].f)
871               iter->dbl[0] += weight;
872             iter->dbl[1] += weight;
873             break;
874           case FLT | FSTRING:
875           case PLT | FSTRING:
876             if (memcmp (iter->arg[0].c, v->s, iter->src->width) > 0)
877               iter->dbl[0] += weight;
878             iter->dbl[1] += weight;
879             break;
880           case FIN:
881           case PIN:
882             if (iter->arg[0].f <= v->f && v->f <= iter->arg[1].f)
883               iter->dbl[0] += weight;
884             iter->dbl[1] += weight;
885             break;
886           case FIN | FSTRING:
887           case PIN | FSTRING:
888             if (memcmp (iter->arg[0].c, v->s, iter->src->width) <= 0
889                 && memcmp (iter->arg[1].c, v->s, iter->src->width) >= 0)
890               iter->dbl[0] += weight;
891             iter->dbl[1] += weight;
892             break;
893           case FOUT:
894           case POUT:
895             if (iter->arg[0].f > v->f || v->f > iter->arg[1].f)
896               iter->dbl[0] += weight;
897             iter->dbl[1] += weight;
898             break;
899           case FOUT | FSTRING:
900           case POUT | FSTRING:
901             if (memcmp (iter->arg[0].c, v->s, iter->src->width) > 0
902                 && memcmp (iter->arg[1].c, v->s, iter->src->width) < 0)
903               iter->dbl[0] += weight;
904             iter->dbl[1] += weight;
905             break;
906           case N:
907             iter->dbl[0] += weight;
908             break;
909           case NU:
910             iter->int1++;
911             break;
912           case FIRST:
913             if (iter->int1 == 0)
914               {
915                 iter->dbl[0] = v->f;
916                 iter->int1 = 1;
917               }
918             break;
919           case FIRST | FSTRING:
920             if (iter->int1 == 0)
921               {
922                 memcpy (iter->string, v->s, iter->src->width);
923                 iter->int1 = 1;
924               }
925             break;
926           case LAST:
927             iter->dbl[0] = v->f;
928             iter->int1 = 1;
929             break;
930           case LAST | FSTRING:
931             memcpy (iter->string, v->s, iter->src->width);
932             iter->int1 = 1;
933             break;
934           default:
935             assert (0);
936           }
937     } else {
938       switch (iter->function)
939         {
940         case N_NO_VARS:
941           iter->dbl[0] += weight;
942           break;
943         case NU_NO_VARS:
944           iter->int1++;
945           break;
946         default:
947           assert (0);
948         }
949     }
950 }
951
952 /* We've come to a record that differs from the previous in one or
953    more of the break variables.  Make an output record from the
954    accumulated statistics in the OUTPUT case. */
955 static void 
956 dump_aggregate_info (struct agr_proc *agr, struct ccase *output)
957 {
958   {
959     int n_elem = 0;
960     
961     {
962       int i;
963
964       for (i = 0; i < agr->sort->var_cnt; i++)
965         n_elem += agr->sort->vars[i]->nv;
966     }
967     memcpy (output->data, agr->prev_break, sizeof (union value) * n_elem);
968   }
969   
970   {
971     struct agr_var *i;
972   
973     for (i = agr->vars; i; i = i->next)
974       {
975         union value *v = &output->data[i->dest->fv];
976
977         if (agr->missing == COLUMNWISE && i->missing != 0
978             && (i->function & FUNC) != N && (i->function & FUNC) != NU
979             && (i->function & FUNC) != NMISS && (i->function & FUNC) != NUMISS)
980           {
981             if (i->function & FSTRING)
982               memset (v->s, ' ', i->dest->width);
983             else
984               v->f = SYSMIS;
985             continue;
986           }
987         
988         switch (i->function)
989           {
990           case SUM:
991             v->f = i->dbl[0];
992             break;
993           case MEAN:
994             v->f = i->dbl[1] != 0.0 ? i->dbl[0] / i->dbl[1] : SYSMIS;
995             break;
996           case SD:
997             v->f = ((i->dbl[2] > 1.0)
998                     ? calc_stddev (calc_variance (i->dbl, i->dbl[2]))
999                     : SYSMIS);
1000             break;
1001           case MAX:
1002           case MIN:
1003             v->f = i->int1 ? i->dbl[0] : SYSMIS;
1004             break;
1005           case MAX | FSTRING:
1006           case MIN | FSTRING:
1007             if (i->int1)
1008               memcpy (v->s, i->string, i->dest->width);
1009             else
1010               memset (v->s, ' ', i->dest->width);
1011             break;
1012           case FGT | FSTRING:
1013           case FLT | FSTRING:
1014           case FIN | FSTRING:
1015           case FOUT | FSTRING:
1016             v->f = i->int2 ? (double) i->int1 / (double) i->int2 : SYSMIS;
1017             break;
1018           case FGT:
1019           case FLT:
1020           case FIN:
1021           case FOUT:
1022             v->f = i->dbl[1] ? i->dbl[0] / i->dbl[1] : SYSMIS;
1023             break;
1024           case PGT:
1025           case PGT | FSTRING:
1026           case PLT:
1027           case PLT | FSTRING:
1028           case PIN:
1029           case PIN | FSTRING:
1030           case POUT:
1031           case POUT | FSTRING:
1032             v->f = i->dbl[1] ? i->dbl[0] / i->dbl[1] * 100.0 : SYSMIS;
1033             break;
1034           case N:
1035             v->f = i->dbl[0];
1036             break;
1037           case NU:
1038             v->f = i->int1;
1039             break;
1040           case FIRST:
1041           case LAST:
1042             v->f = i->int1 ? i->dbl[0] : SYSMIS;
1043             break;
1044           case FIRST | FSTRING:
1045           case LAST | FSTRING:
1046             if (i->int1)
1047               memcpy (v->s, i->string, i->dest->width);
1048             else
1049               memset (v->s, ' ', i->dest->width);
1050             break;
1051           case N_NO_VARS:
1052             v->f = i->dbl[0];
1053             break;
1054           case NU_NO_VARS:
1055             v->f = i->int1;
1056             break;
1057           case NMISS:
1058             v->f = i->dbl[0];
1059             break;
1060           case NUMISS:
1061             v->f = i->int1;
1062             break;
1063           default:
1064             assert (0);
1065           }
1066       }
1067   }
1068 }
1069
1070 /* Resets the state for all the aggregate functions. */
1071 static void
1072 initialize_aggregate_info (struct agr_proc *agr)
1073 {
1074   struct agr_var *iter;
1075
1076   for (iter = agr->vars; iter; iter = iter->next)
1077     {
1078       iter->missing = 0;
1079       switch (iter->function)
1080         {
1081         case MIN:
1082           iter->dbl[0] = DBL_MAX;
1083           break;
1084         case MIN | FSTRING:
1085           memset (iter->string, 255, iter->src->width);
1086           break;
1087         case MAX:
1088           iter->dbl[0] = -DBL_MAX;
1089           break;
1090         case MAX | FSTRING:
1091           memset (iter->string, 0, iter->src->width);
1092           break;
1093         default:
1094           iter->dbl[0] = iter->dbl[1] = iter->dbl[2] = 0.0;
1095           iter->int1 = iter->int2 = 0;
1096           break;
1097         }
1098     }
1099 }
1100 \f
1101 /* Aggregate each case as it comes through.  Cases which aren't needed
1102    are dropped. */
1103 static int
1104 agr_to_active_file (struct ccase *c, void *agr_)
1105 {
1106   struct agr_proc *agr = agr_;
1107
1108   if (aggregate_single_case (agr, c, agr->agr_case)) 
1109     agr->sink->class->write (agr->sink, agr->agr_case);
1110
1111   return 1;
1112 }
1113
1114 /* Writes AGR->agr_case to AGR->out_file. */
1115 static void
1116 write_case_to_sfm (struct agr_proc *agr)
1117 {
1118   flt64 *p;
1119   int i;
1120
1121   p = agr->sfm_agr_case;
1122   for (i = 0; i < dict_get_var_cnt (agr->dict); i++)
1123     {
1124       struct variable *v = dict_get_var (agr->dict, i);
1125       
1126       if (v->type == NUMERIC)
1127         {
1128           double src = agr->agr_case->data[v->fv].f;
1129           if (src == SYSMIS)
1130             *p++ = -FLT64_MAX;
1131           else
1132             *p++ = src;
1133         }
1134       else
1135         {
1136           memcpy (p, agr->agr_case->data[v->fv].s, v->width);
1137           memset (&((char *) p)[v->width], ' ',
1138                   REM_RND_UP (v->width, sizeof (flt64)));
1139           p += DIV_RND_UP (v->width, sizeof (flt64));
1140         }
1141     }
1142
1143   sfm_write_case (agr->out_file, agr->sfm_agr_case, p - agr->sfm_agr_case);
1144 }
1145
1146 /* Aggregate the current case and output it if we passed a
1147    breakpoint. */
1148 static int
1149 presorted_agr_to_sysfile (struct ccase *c, void *agr_) 
1150 {
1151   sort_agr_to_sysfile (c, agr_);
1152   return 1;
1153 }
1154
1155 /* Aggregate the current case and output it if we passed a
1156    breakpoint. */
1157 static int
1158 sort_agr_to_sysfile (const struct ccase *c, void *agr_) 
1159 {
1160   struct agr_proc *agr = agr_;
1161
1162   if (aggregate_single_case (agr, c, agr->agr_case)) 
1163     write_case_to_sfm (agr);
1164
1165   return 1;
1166 }