Added custom assert(); Fixed bugs in T-TEST and VALUE LABELS
[pspp-builds.git] / src / aggregate.c
1 /* PSPP - computes sample statistics.
2    Copyright (C) 1997-9, 2000 Free Software Foundation, Inc.
3    Written by Ben Pfaff <blp@gnu.org>.
4
5    This program is free software; you can redistribute it and/or
6    modify it under the terms of the GNU General Public License as
7    published by the Free Software Foundation; either version 2 of the
8    License, or (at your option) any later version.
9
10    This program is distributed in the hope that it will be useful, but
11    WITHOUT ANY WARRANTY; without even the implied warranty of
12    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13    General Public License for more details.
14
15    You should have received a copy of the GNU General Public License
16    along with this program; if not, write to the Free Software
17    Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA
18    02111-1307, USA. */
19
20 #include <config.h>
21 #include "error.h"
22 #include <stdlib.h>
23 #include "alloc.h"
24 #include "command.h"
25 #include "error.h"
26 #include "file-handle.h"
27 #include "lexer.h"
28 #include "misc.h"
29 #include "pool.h"
30 #include "settings.h"
31 #include "sfm.h"
32 #include "sort.h"
33 #include "stats.h"
34 #include "str.h"
35 #include "var.h"
36 #include "vfm.h"
37 #include "vfmP.h"
38
39 /* Specifies how to make an aggregate variable. */
40 struct agr_var
41   {
42     struct agr_var *next;               /* Next in list. */
43
44     /* Collected during parsing. */
45     struct variable *src;       /* Source variable. */
46     struct variable *dest;      /* Target variable. */
47     int function;               /* Function. */
48     int include_missing;        /* 1=Include user-missing values. */
49     union value arg[2];         /* Arguments. */
50
51     /* Accumulated during AGGREGATE execution. */
52     double dbl[3];
53     int int1, int2;
54     char *string;
55     int missing;
56   };
57
58 /* Aggregation functions. */
59 enum
60   {
61     NONE, SUM, MEAN, SD, MAX, MIN, PGT, PLT, PIN, POUT, FGT, FLT, FIN,
62     FOUT, N, NU, NMISS, NUMISS, FIRST, LAST,
63     N_AGR_FUNCS, N_NO_VARS, NU_NO_VARS,
64     FUNC = 0x1f, /* Function mask. */
65     FSTRING = 1<<5, /* String function bit. */
66   };
67
68 /* Attributes of an aggregation function. */
69 struct agr_func
70   {
71     const char *name;           /* Aggregation function name. */
72     int n_args;                 /* Number of arguments. */
73     int alpha_type;             /* When given ALPHA arguments, output type. */
74     struct fmt_spec format;     /* Format spec if alpha_type != ALPHA. */
75   };
76
77 /* Attributes of aggregation functions. */
78 static const struct agr_func agr_func_tab[] = 
79   {
80     {"<NONE>",  0, -1,      {0, 0, 0}},
81     {"SUM",     0, -1,      {FMT_F, 8, 2}},
82     {"MEAN",    0, -1,      {FMT_F, 8, 2}},
83     {"SD",      0, -1,      {FMT_F, 8, 2}},
84     {"MAX",     0, ALPHA,   {-1, -1, -1}}, 
85     {"MIN",     0, ALPHA,   {-1, -1, -1}}, 
86     {"PGT",     1, NUMERIC, {FMT_F, 5, 1}},      
87     {"PLT",     1, NUMERIC, {FMT_F, 5, 1}},       
88     {"PIN",     2, NUMERIC, {FMT_F, 5, 1}},       
89     {"POUT",    2, NUMERIC, {FMT_F, 5, 1}},       
90     {"FGT",     1, NUMERIC, {FMT_F, 5, 3}},       
91     {"FLT",     1, NUMERIC, {FMT_F, 5, 3}},       
92     {"FIN",     2, NUMERIC, {FMT_F, 5, 3}},       
93     {"FOUT",    2, NUMERIC, {FMT_F, 5, 3}},       
94     {"N",       0, NUMERIC, {FMT_F, 7, 0}},       
95     {"NU",      0, NUMERIC, {FMT_F, 7, 0}},       
96     {"NMISS",   0, NUMERIC, {FMT_F, 7, 0}},       
97     {"NUMISS",  0, NUMERIC, {FMT_F, 7, 0}},       
98     {"FIRST",   0, ALPHA,   {-1, -1, -1}}, 
99     {"LAST",    0, ALPHA,   {-1, -1, -1}},
100     {NULL,      0, -1,      {-1, -1, -1}},
101     {"N",       0, NUMERIC, {FMT_F, 7, 0}},
102     {"NU",      0, NUMERIC, {FMT_F, 7, 0}},
103   };
104
105 /* Missing value types. */
106 enum missing_treatment
107   {
108     ITEMWISE,           /* Missing values item by item. */
109     COLUMNWISE          /* Missing values column by column. */
110   };
111
112 /* An entire AGGREGATE procedure. */
113 struct agr_proc 
114   {
115     /* We have either an output file or a sink. */
116     struct file_handle *out_file;       /* Output file, or null if none. */
117     struct case_sink *sink;             /* Sink, or null if none. */
118
119     enum missing_treatment missing;     /* How to treat missing values. */
120     struct sort_cases_pgm *sort;        /* Sort program. */
121     struct agr_var *vars;               /* First aggregate variable. */
122     struct dictionary *dict;            /* Aggregate dictionary. */
123     int case_cnt;                       /* Counts aggregated cases. */
124     union value *prev_break;            /* Last values of break variables. */
125     struct ccase *agr_case;             /* Aggregate case for output. */
126     flt64 *sfm_agr_case;                /* Aggregate case in SFM format. */
127   };
128
129 static void initialize_aggregate_info (struct agr_proc *);
130
131 /* Prototypes. */
132 static int parse_aggregate_functions (struct agr_proc *);
133 static void agr_destroy (struct agr_proc *);
134 static int aggregate_single_case (struct agr_proc *agr,
135                                   const struct ccase *input,
136                                   struct ccase *output);
137 static void dump_aggregate_info (struct agr_proc *agr, struct ccase *output);
138 static int create_sysfile (struct agr_proc *);
139
140 /* Aggregating to the active file. */
141 static int agr_to_active_file (struct ccase *, void *aux);
142
143 /* Aggregating to a system file. */
144 static void write_case_to_sfm (struct agr_proc *agr);
145 static int presorted_agr_to_sysfile (struct ccase *, void *aux);
146 static int sort_agr_to_sysfile (const struct ccase *, void *aux);
147 \f
148 /* Parsing. */
149
150 /* Parses and executes the AGGREGATE procedure. */
151 int
152 cmd_aggregate (void)
153 {
154   struct agr_proc agr;
155
156   /* Have we seen these subcommands? */
157   unsigned seen = 0;
158
159   agr.out_file = NULL;
160   agr.sink = NULL;
161   agr.missing = ITEMWISE;
162   agr.sort = NULL;
163   agr.vars = NULL;
164   agr.dict = NULL;
165   agr.case_cnt = 0;
166   agr.prev_break = NULL;
167   
168   agr.dict = dict_create ();
169   dict_set_label (agr.dict, dict_get_label (default_dict));
170   dict_set_documents (agr.dict, dict_get_documents (default_dict));
171   
172   /* Read most of the subcommands. */
173   for (;;)
174     {
175       lex_match ('/');
176       
177       if (lex_match_id ("OUTFILE"))
178         {
179           if (seen & 1)
180             {
181               msg (SE, _("%s subcommand given multiple times."),"OUTFILE");
182               goto lossage;
183             }
184           seen |= 1;
185               
186           lex_match ('=');
187           if (lex_match ('*'))
188             agr.out_file = NULL;
189           else 
190             {
191               agr.out_file = fh_parse_file_handle ();
192               if (agr.out_file == NULL)
193                 goto lossage;
194             }
195         }
196       else if (lex_match_id ("MISSING"))
197         {
198           lex_match ('=');
199           if (!lex_match_id ("COLUMNWISE"))
200             {
201               lex_error (_("while expecting COLUMNWISE"));
202               goto lossage;
203             }
204           agr.missing = COLUMNWISE;
205         }
206       else if (lex_match_id ("DOCUMENT"))
207         seen |= 2;
208       else if (lex_match_id ("PRESORTED"))
209         seen |= 4;
210       else if (lex_match_id ("BREAK"))
211         {
212           if (seen & 8)
213             {
214               msg (SE, _("%s subcommand given multiple times."),"BREAK");
215               goto lossage;
216             }
217           seen |= 8;
218
219           lex_match ('=');
220           agr.sort = parse_sort ();
221           if (agr.sort == NULL)
222             goto lossage;
223           
224           {
225             int i;
226             
227             for (i = 0; i < agr.sort->var_cnt; i++)
228               {
229                 struct variable *v;
230               
231                 v = dict_clone_var (agr.dict, agr.sort->vars[i],
232                                     agr.sort->vars[i]->name);
233                 assert (v != NULL);
234               }
235           }
236         }
237       else break;
238     }
239
240   /* Check for proper syntax. */
241   if (!(seen & 8))
242     msg (SW, _("BREAK subcommand not specified."));
243       
244   /* Read in the aggregate functions. */
245   if (!parse_aggregate_functions (&agr))
246     goto lossage;
247
248   /* Delete documents. */
249   if (!(seen & 2))
250     dict_set_documents (agr.dict, NULL);
251
252   /* Cancel SPLIT FILE. */
253   dict_set_split_vars (agr.dict, NULL, 0);
254   
255   /* Initialize. */
256   agr.case_cnt = 0;
257   agr.agr_case = xmalloc (dict_get_case_size (agr.dict));
258   initialize_aggregate_info (&agr);
259
260   /* Output to active file or external file? */
261   if (agr.out_file == NULL) 
262     {
263       /* The active file will be replaced by the aggregated data,
264          so TEMPORARY is moot. */
265       cancel_temporary ();
266
267       if (agr.sort != NULL && (seen & 4) == 0)
268         sort_cases (agr.sort, 0);
269
270       agr.sink = create_case_sink (&storage_sink_class, agr.dict, NULL);
271       if (agr.sink->class->open != NULL)
272         agr.sink->class->open (agr.sink);
273       vfm_sink = create_case_sink (&null_sink_class, default_dict, NULL);
274       procedure (agr_to_active_file, &agr);
275       if (agr.case_cnt > 0) 
276         {
277           dump_aggregate_info (&agr, agr.agr_case);
278           agr.sink->class->write (agr.sink, agr.agr_case);
279         }
280       dict_destroy (default_dict);
281       default_dict = agr.dict;
282       agr.dict = NULL;
283       vfm_source = agr.sink->class->make_source (agr.sink);
284       free_case_sink (agr.sink);
285     }
286   else
287     {
288       if (!create_sysfile (&agr))
289         goto lossage;
290
291       if (agr.sort != NULL && (seen & 4) == 0) 
292         {
293           /* Sorting is needed. */
294           sort_cases (agr.sort, 1);
295           read_sort_output (agr.sort, sort_agr_to_sysfile, NULL);
296         }
297       else 
298         {
299           /* Active file is already sorted. */
300           procedure (presorted_agr_to_sysfile, &agr);
301         }
302       
303       if (agr.case_cnt > 0) 
304         {
305           dump_aggregate_info (&agr, agr.agr_case);
306           write_case_to_sfm (&agr);
307         }
308       fh_close_handle (agr.out_file);
309     }
310   
311   agr_destroy (&agr);
312   return CMD_SUCCESS;
313
314 lossage:
315   agr_destroy (&agr);
316   return CMD_FAILURE;
317 }
318
319 /* Create a system file for use in aggregation to an external
320    file. */
321 static int
322 create_sysfile (struct agr_proc *agr)
323 {
324   struct sfm_write_info w;
325   w.h = agr->out_file;
326   w.dict = agr->dict;
327   w.compress = get_scompression();
328   if (!sfm_write_dictionary (&w))
329     return 0;
330
331   agr->sfm_agr_case = xmalloc (sizeof *agr->sfm_agr_case * w.case_size);
332     
333   return 1;
334 }
335
336 /* Parse all the aggregate functions. */
337 static int
338 parse_aggregate_functions (struct agr_proc *agr)
339 {
340   struct agr_var *tail; /* Tail of linked list starting at agr->vars. */
341
342   /* Parse everything. */
343   tail = NULL;
344   for (;;)
345     {
346       char **dest;
347       char **dest_label;
348       int n_dest;
349
350       int include_missing;
351       const struct agr_func *function;
352       int func_index;
353
354       union value arg[2];
355
356       struct variable **src;
357       int n_src;
358
359       int i;
360
361       dest = NULL;
362       dest_label = NULL;
363       n_dest = 0;
364       src = NULL;
365       function = NULL;
366       n_src = 0;
367       arg[0].c = NULL;
368       arg[1].c = NULL;
369
370       /* Parse the list of target variables. */
371       while (!lex_match ('='))
372         {
373           int n_dest_prev = n_dest;
374           
375           if (!parse_DATA_LIST_vars (&dest, &n_dest, PV_APPEND | PV_SINGLE | PV_NO_SCRATCH))
376             goto lossage;
377
378           /* Assign empty labels. */
379           {
380             int j;
381
382             dest_label = xrealloc (dest_label, sizeof *dest_label * n_dest);
383             for (j = n_dest_prev; j < n_dest; j++)
384               dest_label[j] = NULL;
385           }
386           
387           if (token == T_STRING)
388             {
389               ds_truncate (&tokstr, 120);
390               dest_label[n_dest - 1] = xstrdup (ds_value (&tokstr));
391               lex_get ();
392             }
393         }
394
395       /* Get the name of the aggregation function. */
396       if (token != T_ID)
397         {
398           lex_error (_("expecting aggregation function"));
399           goto lossage;
400         }
401
402       include_missing = 0;
403       if (tokid[strlen (tokid) - 1] == '.')
404         {
405           include_missing = 1;
406           tokid[strlen (tokid) - 1] = 0;
407         }
408       
409       for (function = agr_func_tab; function->name; function++)
410         if (!strcmp (function->name, tokid))
411           break;
412       if (NULL == function->name)
413         {
414           msg (SE, _("Unknown aggregation function %s."), tokid);
415           goto lossage;
416         }
417       func_index = function - agr_func_tab;
418       lex_get ();
419
420       /* Check for leading lparen. */
421       if (!lex_match ('('))
422         {
423           if (func_index == N)
424             func_index = N_NO_VARS;
425           else if (func_index == NU)
426             func_index = NU_NO_VARS;
427           else
428             {
429               lex_error (_("expecting `('"));
430               goto lossage;
431             }
432         } else {
433           /* Parse list of source variables. */
434           {
435             int pv_opts = PV_NO_SCRATCH;
436
437             if (func_index == SUM || func_index == MEAN || func_index == SD)
438               pv_opts |= PV_NUMERIC;
439             else if (function->n_args)
440               pv_opts |= PV_SAME_TYPE;
441
442             if (!parse_variables (default_dict, &src, &n_src, pv_opts))
443               goto lossage;
444           }
445
446           /* Parse function arguments, for those functions that
447              require arguments. */
448           if (function->n_args != 0)
449             for (i = 0; i < function->n_args; i++)
450               {
451                 int type;
452             
453                 lex_match (',');
454                 if (token == T_STRING)
455                   {
456                     arg[i].c = xstrdup (ds_value (&tokstr));
457                     type = ALPHA;
458                   }
459                 else if (token == T_NUM)
460                   {
461                     arg[i].f = tokval;
462                     type = NUMERIC;
463                   } else {
464                     msg (SE, _("Missing argument %d to %s."), i + 1, function->name);
465                     goto lossage;
466                   }
467             
468                 lex_get ();
469
470                 if (type != src[0]->type)
471                   {
472                     msg (SE, _("Arguments to %s must be of same type as "
473                                "source variables."),
474                          function->name);
475                     goto lossage;
476                   }
477               }
478
479           /* Trailing rparen. */
480           if (!lex_match(')'))
481             {
482               lex_error (_("expecting `)'"));
483               goto lossage;
484             }
485           
486           /* Now check that the number of source variables match the
487              number of target variables.  Do this here because if we
488              do it earlier then the user can get very misleading error
489              messages; i.e., `AGGREGATE x=SUM(y t).' will get this
490              error message when a proper message would be more like
491              `unknown variable t'. */
492           if (n_src != n_dest)
493             {
494               msg (SE, _("Number of source variables (%d) does not match "
495                          "number of target variables (%d)."),
496                    n_src, n_dest);
497               goto lossage;
498             }
499         }
500         
501       /* Finally add these to the linked list of aggregation
502          variables. */
503       for (i = 0; i < n_dest; i++)
504         {
505           struct agr_var *v = xmalloc (sizeof *v);
506
507           /* Add variable to chain. */
508           if (agr->vars != NULL)
509             tail->next = v;
510           else
511             agr->vars = v;
512           tail = v;
513           tail->next = NULL;
514           
515           /* Create the target variable in the aggregate
516              dictionary. */
517           {
518             struct variable *destvar;
519             
520             v->function = func_index;
521
522             if (src)
523               {
524                 int output_width;
525
526                 v->src = src[i];
527                 
528                 if (src[i]->type == ALPHA)
529                   {
530                     v->function |= FSTRING;
531                     v->string = xmalloc (src[i]->width);
532                   }
533                 
534                 if (v->src->type == NUMERIC || function->alpha_type == NUMERIC)
535                   output_width = 0;
536                 else
537                   output_width = v->src->width;
538
539                 if (function->alpha_type == ALPHA)
540                   destvar = dict_clone_var (agr->dict, v->src, dest[i]);
541                 else
542                   {
543                     destvar = dict_create_var (agr->dict, dest[i], output_width);
544                     if (output_width == 0)
545                       destvar->print = destvar->write = function->format;
546                     if (output_width == 0 && dict_get_weight (default_dict) != NULL
547                         && (func_index == N || func_index == N_NO_VARS
548                             || func_index == NU || func_index == NU_NO_VARS))
549                       {
550                         struct fmt_spec f = {FMT_F, 8, 2};
551                       
552                         destvar->print = destvar->write = f;
553                       }
554                   }
555               } else {
556                 v->src = NULL;
557                 destvar = dict_create_var (agr->dict, dest[i], 0);
558               }
559           
560             if (!destvar)
561               {
562                 msg (SE, _("Variable name %s is not unique within the "
563                            "aggregate file dictionary, which contains "
564                            "the aggregate variables and the break "
565                            "variables."),
566                      dest[i]);
567                 free (dest[i]);
568                 goto lossage;
569               }
570
571             free (dest[i]);
572             destvar->init = 0;
573             if (dest_label[i])
574               {
575                 destvar->label = dest_label[i];
576                 dest_label[i] = NULL;
577               }
578             else if (function->alpha_type == ALPHA)
579               destvar->print = destvar->write = function->format;
580
581             v->dest = destvar;
582           }
583           
584           v->include_missing = include_missing;
585
586           if (v->src != NULL)
587             {
588               int j;
589
590               if (v->src->type == NUMERIC)
591                 for (j = 0; j < function->n_args; j++)
592                   v->arg[j].f = arg[j].f;
593               else
594                 for (j = 0; j < function->n_args; j++)
595                   v->arg[j].c = xstrdup (arg[j].c);
596             }
597         }
598       
599       if (src != NULL && src[0]->type == ALPHA)
600         for (i = 0; i < function->n_args; i++)
601           {
602             free (arg[i].c);
603             arg[i].c = NULL;
604           }
605
606       free (src);
607       free (dest);
608       free (dest_label);
609
610       if (!lex_match ('/'))
611         {
612           if (token == '.')
613             return 1;
614
615           lex_error ("expecting end of command");
616           return 0;
617         }
618       continue;
619       
620     lossage:
621       for (i = 0; i < n_dest; i++)
622         {
623           free (dest[i]);
624           free (dest_label[i]);
625         }
626       free (dest);
627       free (dest_label);
628       free (arg[0].c);
629       free (arg[1].c);
630       if (src && n_src && src[0]->type == ALPHA)
631         for (i = 0; i < function->n_args; i++)
632           {
633             free (arg[i].c);
634             arg[i].c = NULL;
635           }
636       free (src);
637         
638       return 0;
639     }
640 }
641
642 /* Destroys AGR. */
643 static void
644 agr_destroy (struct agr_proc *agr)
645 {
646   struct agr_var *iter, *next;
647
648   if (agr->dict != NULL)
649     dict_destroy (agr->dict);
650   if (agr->sort != NULL)
651     destroy_sort_cases_pgm (agr->sort);
652   for (iter = agr->vars; iter; iter = next)
653     {
654       next = iter->next;
655
656       if (iter->function & FSTRING)
657         {
658           int n_args;
659           int i;
660
661           n_args = agr_func_tab[iter->function & FUNC].n_args;
662           for (i = 0; i < n_args; i++)
663             free (iter->arg[i].c);
664           free (iter->string);
665         }
666       free (iter);
667     }
668   free (agr->prev_break);
669   free (agr->agr_case);
670 }
671 \f
672 /* Execution. */
673
674 static void accumulate_aggregate_info (struct agr_proc *,
675                                        const struct ccase *);
676 static void dump_aggregate_info (struct agr_proc *, struct ccase *);
677
678 /* Processes a single case INPUT for aggregation.  If output is
679    warranted, writes it to OUTPUT and returns nonzero.
680    Otherwise, returns zero and OUTPUT is unmodified. */
681 static int
682 aggregate_single_case (struct agr_proc *agr,
683                        const struct ccase *input, struct ccase *output)
684 {
685   /* The first case always begins a new break group.  We also need to
686      preserve the values of the case for later comparison. */
687   if (agr->case_cnt++ == 0)
688     {
689       int n_elem = 0;
690       
691       {
692         int i;
693
694         for (i = 0; i < agr->sort->var_cnt; i++)
695           n_elem += agr->sort->vars[i]->nv;
696       }
697       
698       agr->prev_break = xmalloc (sizeof *agr->prev_break * n_elem);
699
700       /* Copy INPUT into prev_break. */
701       {
702         union value *iter = agr->prev_break;
703         int i;
704
705         for (i = 0; i < agr->sort->var_cnt; i++)
706           {
707             struct variable *v = agr->sort->vars[i];
708             
709             if (v->type == NUMERIC)
710               (iter++)->f = input->data[v->fv].f;
711             else
712               {
713                 memcpy (iter->s, input->data[v->fv].s, v->width);
714                 iter += v->nv;
715               }
716           }
717       }
718             
719       accumulate_aggregate_info (agr, input);
720         
721       return 0;
722     }
723       
724   /* Compare the value of each break variable to the values on the
725      previous case. */
726   {
727     union value *iter = agr->prev_break;
728     int i;
729     
730     for (i = 0; i < agr->sort->var_cnt; i++)
731       {
732         struct variable *v = agr->sort->vars[i];
733       
734         switch (v->type)
735           {
736           case NUMERIC:
737             if (input->data[v->fv].f != iter->f)
738               goto not_equal;
739             iter++;
740             break;
741           case ALPHA:
742             if (memcmp (input->data[v->fv].s, iter->s, v->width))
743               goto not_equal;
744             iter += v->nv;
745             break;
746           default:
747             assert (0);
748           }
749       }
750   }
751
752   accumulate_aggregate_info (agr, input);
753
754   return 0;
755   
756 not_equal:
757   /* The values of the break variable are different from the values on
758      the previous case.  That means that it's time to dump aggregate
759      info. */
760   dump_aggregate_info (agr, output);
761   initialize_aggregate_info (agr);
762   accumulate_aggregate_info (agr, input);
763
764   /* Copy INPUT into prev_break. */
765   {
766     union value *iter = agr->prev_break;
767     int i;
768
769     for (i = 0; i < agr->sort->var_cnt; i++)
770       {
771         struct variable *v = agr->sort->vars[i];
772             
773         if (v->type == NUMERIC)
774           (iter++)->f = input->data[v->fv].f;
775         else
776           {
777             memcpy (iter->s, input->data[v->fv].s, v->width);
778             iter += v->nv;
779           }
780       }
781   }
782   
783   return 1;
784 }
785
786 /* Accumulates aggregation data from the case INPUT. */
787 static void 
788 accumulate_aggregate_info (struct agr_proc *agr,
789                            const struct ccase *input)
790 {
791   struct agr_var *iter;
792   double weight;
793
794   weight = dict_get_case_weight (default_dict, input);
795
796   for (iter = agr->vars; iter; iter = iter->next)
797     if (iter->src)
798       {
799         const union value *v = &input->data[iter->src->fv];
800
801         if ((!iter->include_missing && is_missing (v, iter->src))
802             || (iter->include_missing && iter->src->type == NUMERIC
803                 && v->f == SYSMIS))
804           {
805             switch (iter->function)
806               {
807               case NMISS:
808                 iter->dbl[0] += weight;
809                 break;
810               case NUMISS:
811                 iter->int1++;
812                 break;
813               }
814             iter->missing = 1;
815             continue;
816           }
817         
818         /* This is horrible.  There are too many possibilities. */
819         switch (iter->function)
820           {
821           case SUM:
822             iter->dbl[0] += v->f;
823             break;
824           case MEAN:
825             iter->dbl[0] += v->f * weight;
826             iter->dbl[1] += weight;
827             break;
828           case SD: 
829             {
830               double product = v->f * weight;
831               iter->dbl[0] += product;
832               iter->dbl[1] += product * v->f;
833               iter->dbl[2] += weight;
834               break; 
835             }
836           case MAX:
837             iter->dbl[0] = max (iter->dbl[0], v->f);
838             iter->int1 = 1;
839             break;
840           case MAX | FSTRING:
841             if (memcmp (iter->string, v->s, iter->src->width) < 0)
842               memcpy (iter->string, v->s, iter->src->width);
843             iter->int1 = 1;
844             break;
845           case MIN:
846             iter->dbl[0] = min (iter->dbl[0], v->f);
847             iter->int1 = 1;
848             break;
849           case MIN | FSTRING:
850             if (memcmp (iter->string, v->s, iter->src->width) > 0)
851               memcpy (iter->string, v->s, iter->src->width);
852             iter->int1 = 1;
853             break;
854           case FGT:
855           case PGT:
856             if (v->f > iter->arg[0].f)
857               iter->dbl[0] += weight;
858             iter->dbl[1] += weight;
859             break;
860           case FGT | FSTRING:
861           case PGT | FSTRING:
862             if (memcmp (iter->arg[0].c, v->s, iter->src->width) < 0)
863               iter->dbl[0] += weight;
864             iter->dbl[1] += weight;
865             break;
866           case FLT:
867           case PLT:
868             if (v->f < iter->arg[0].f)
869               iter->dbl[0] += weight;
870             iter->dbl[1] += weight;
871             break;
872           case FLT | FSTRING:
873           case PLT | FSTRING:
874             if (memcmp (iter->arg[0].c, v->s, iter->src->width) > 0)
875               iter->dbl[0] += weight;
876             iter->dbl[1] += weight;
877             break;
878           case FIN:
879           case PIN:
880             if (iter->arg[0].f <= v->f && v->f <= iter->arg[1].f)
881               iter->dbl[0] += weight;
882             iter->dbl[1] += weight;
883             break;
884           case FIN | FSTRING:
885           case PIN | FSTRING:
886             if (memcmp (iter->arg[0].c, v->s, iter->src->width) <= 0
887                 && memcmp (iter->arg[1].c, v->s, iter->src->width) >= 0)
888               iter->dbl[0] += weight;
889             iter->dbl[1] += weight;
890             break;
891           case FOUT:
892           case POUT:
893             if (iter->arg[0].f > v->f || v->f > iter->arg[1].f)
894               iter->dbl[0] += weight;
895             iter->dbl[1] += weight;
896             break;
897           case FOUT | FSTRING:
898           case POUT | FSTRING:
899             if (memcmp (iter->arg[0].c, v->s, iter->src->width) > 0
900                 && memcmp (iter->arg[1].c, v->s, iter->src->width) < 0)
901               iter->dbl[0] += weight;
902             iter->dbl[1] += weight;
903             break;
904           case N:
905             iter->dbl[0] += weight;
906             break;
907           case NU:
908             iter->int1++;
909             break;
910           case FIRST:
911             if (iter->int1 == 0)
912               {
913                 iter->dbl[0] = v->f;
914                 iter->int1 = 1;
915               }
916             break;
917           case FIRST | FSTRING:
918             if (iter->int1 == 0)
919               {
920                 memcpy (iter->string, v->s, iter->src->width);
921                 iter->int1 = 1;
922               }
923             break;
924           case LAST:
925             iter->dbl[0] = v->f;
926             iter->int1 = 1;
927             break;
928           case LAST | FSTRING:
929             memcpy (iter->string, v->s, iter->src->width);
930             iter->int1 = 1;
931             break;
932           default:
933             assert (0);
934           }
935     } else {
936       switch (iter->function)
937         {
938         case N_NO_VARS:
939           iter->dbl[0] += weight;
940           break;
941         case NU_NO_VARS:
942           iter->int1++;
943           break;
944         default:
945           assert (0);
946         }
947     }
948 }
949
950 /* We've come to a record that differs from the previous in one or
951    more of the break variables.  Make an output record from the
952    accumulated statistics in the OUTPUT case. */
953 static void 
954 dump_aggregate_info (struct agr_proc *agr, struct ccase *output)
955 {
956   {
957     int n_elem = 0;
958     
959     {
960       int i;
961
962       for (i = 0; i < agr->sort->var_cnt; i++)
963         n_elem += agr->sort->vars[i]->nv;
964     }
965     memcpy (output->data, agr->prev_break, sizeof (union value) * n_elem);
966   }
967   
968   {
969     struct agr_var *i;
970   
971     for (i = agr->vars; i; i = i->next)
972       {
973         union value *v = &output->data[i->dest->fv];
974
975         if (agr->missing == COLUMNWISE && i->missing != 0
976             && (i->function & FUNC) != N && (i->function & FUNC) != NU
977             && (i->function & FUNC) != NMISS && (i->function & FUNC) != NUMISS)
978           {
979             if (i->function & FSTRING)
980               memset (v->s, ' ', i->dest->width);
981             else
982               v->f = SYSMIS;
983             continue;
984           }
985         
986         switch (i->function)
987           {
988           case SUM:
989             v->f = i->dbl[0];
990             break;
991           case MEAN:
992             v->f = i->dbl[1] != 0.0 ? i->dbl[0] / i->dbl[1] : SYSMIS;
993             break;
994           case SD:
995             v->f = ((i->dbl[2] > 1.0)
996                     ? calc_stddev (calc_variance (i->dbl, i->dbl[2]))
997                     : SYSMIS);
998             break;
999           case MAX:
1000           case MIN:
1001             v->f = i->int1 ? i->dbl[0] : SYSMIS;
1002             break;
1003           case MAX | FSTRING:
1004           case MIN | FSTRING:
1005             if (i->int1)
1006               memcpy (v->s, i->string, i->dest->width);
1007             else
1008               memset (v->s, ' ', i->dest->width);
1009             break;
1010           case FGT | FSTRING:
1011           case FLT | FSTRING:
1012           case FIN | FSTRING:
1013           case FOUT | FSTRING:
1014             v->f = i->int2 ? (double) i->int1 / (double) i->int2 : SYSMIS;
1015             break;
1016           case FGT:
1017           case FLT:
1018           case FIN:
1019           case FOUT:
1020             v->f = i->dbl[1] ? i->dbl[0] / i->dbl[1] : SYSMIS;
1021             break;
1022           case PGT:
1023           case PGT | FSTRING:
1024           case PLT:
1025           case PLT | FSTRING:
1026           case PIN:
1027           case PIN | FSTRING:
1028           case POUT:
1029           case POUT | FSTRING:
1030             v->f = i->dbl[1] ? i->dbl[0] / i->dbl[1] * 100.0 : SYSMIS;
1031             break;
1032           case N:
1033             v->f = i->dbl[0];
1034             break;
1035           case NU:
1036             v->f = i->int1;
1037             break;
1038           case FIRST:
1039           case LAST:
1040             v->f = i->int1 ? i->dbl[0] : SYSMIS;
1041             break;
1042           case FIRST | FSTRING:
1043           case LAST | FSTRING:
1044             if (i->int1)
1045               memcpy (v->s, i->string, i->dest->width);
1046             else
1047               memset (v->s, ' ', i->dest->width);
1048             break;
1049           case N_NO_VARS:
1050             v->f = i->dbl[0];
1051             break;
1052           case NU_NO_VARS:
1053             v->f = i->int1;
1054             break;
1055           case NMISS:
1056             v->f = i->dbl[0];
1057             break;
1058           case NUMISS:
1059             v->f = i->int1;
1060             break;
1061           default:
1062             assert (0);
1063           }
1064       }
1065   }
1066 }
1067
1068 /* Resets the state for all the aggregate functions. */
1069 static void
1070 initialize_aggregate_info (struct agr_proc *agr)
1071 {
1072   struct agr_var *iter;
1073
1074   for (iter = agr->vars; iter; iter = iter->next)
1075     {
1076       iter->missing = 0;
1077       switch (iter->function)
1078         {
1079         case MIN:
1080           iter->dbl[0] = DBL_MAX;
1081           break;
1082         case MIN | FSTRING:
1083           memset (iter->string, 255, iter->src->width);
1084           break;
1085         case MAX:
1086           iter->dbl[0] = -DBL_MAX;
1087           break;
1088         case MAX | FSTRING:
1089           memset (iter->string, 0, iter->src->width);
1090           break;
1091         default:
1092           iter->dbl[0] = iter->dbl[1] = iter->dbl[2] = 0.0;
1093           iter->int1 = iter->int2 = 0;
1094           break;
1095         }
1096     }
1097 }
1098 \f
1099 /* Aggregate each case as it comes through.  Cases which aren't needed
1100    are dropped. */
1101 static int
1102 agr_to_active_file (struct ccase *c, void *agr_)
1103 {
1104   struct agr_proc *agr = agr_;
1105
1106   if (aggregate_single_case (agr, c, agr->agr_case)) 
1107     agr->sink->class->write (agr->sink, agr->agr_case);
1108
1109   return 1;
1110 }
1111
1112 /* Writes AGR->agr_case to AGR->out_file. */
1113 static void
1114 write_case_to_sfm (struct agr_proc *agr)
1115 {
1116   flt64 *p;
1117   int i;
1118
1119   p = agr->sfm_agr_case;
1120   for (i = 0; i < dict_get_var_cnt (agr->dict); i++)
1121     {
1122       struct variable *v = dict_get_var (agr->dict, i);
1123       
1124       if (v->type == NUMERIC)
1125         {
1126           double src = agr->agr_case->data[v->fv].f;
1127           if (src == SYSMIS)
1128             *p++ = -FLT64_MAX;
1129           else
1130             *p++ = src;
1131         }
1132       else
1133         {
1134           memcpy (p, agr->agr_case->data[v->fv].s, v->width);
1135           memset (&((char *) p)[v->width], ' ',
1136                   REM_RND_UP (v->width, sizeof (flt64)));
1137           p += DIV_RND_UP (v->width, sizeof (flt64));
1138         }
1139     }
1140
1141   sfm_write_case (agr->out_file, agr->sfm_agr_case, p - agr->sfm_agr_case);
1142 }
1143
1144 /* Aggregate the current case and output it if we passed a
1145    breakpoint. */
1146 static int
1147 presorted_agr_to_sysfile (struct ccase *c, void *agr_) 
1148 {
1149   sort_agr_to_sysfile (c, agr_);
1150   return 1;
1151 }
1152
1153 /* Aggregate the current case and output it if we passed a
1154    breakpoint. */
1155 static int
1156 sort_agr_to_sysfile (const struct ccase *c, void *agr_) 
1157 {
1158   struct agr_proc *agr = agr_;
1159
1160   if (aggregate_single_case (agr, c, agr->agr_case)) 
1161     write_case_to_sfm (agr);
1162
1163   return 1;
1164 }