Add multipass procedures. Add two-pass moments calculation. Rewrite
[pspp] / src / aggregate.c
1 /* PSPP - computes sample statistics.
2    Copyright (C) 1997-9, 2000 Free Software Foundation, Inc.
3    Written by Ben Pfaff <blp@gnu.org>.
4
5    This program is free software; you can redistribute it and/or
6    modify it under the terms of the GNU General Public License as
7    published by the Free Software Foundation; either version 2 of the
8    License, or (at your option) any later version.
9
10    This program is distributed in the hope that it will be useful, but
11    WITHOUT ANY WARRANTY; without even the implied warranty of
12    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13    General Public License for more details.
14
15    You should have received a copy of the GNU General Public License
16    along with this program; if not, write to the Free Software
17    Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA
18    02111-1307, USA. */
19
20 #include <config.h>
21 #include "error.h"
22 #include <stdlib.h>
23 #include "alloc.h"
24 #include "command.h"
25 #include "error.h"
26 #include "file-handle.h"
27 #include "lexer.h"
28 #include "misc.h"
29 #include "moments.h"
30 #include "pool.h"
31 #include "settings.h"
32 #include "sfm.h"
33 #include "sort.h"
34 #include "str.h"
35 #include "var.h"
36 #include "vfm.h"
37 #include "vfmP.h"
38
39 /* Specifies how to make an aggregate variable. */
40 struct agr_var
41   {
42     struct agr_var *next;               /* Next in list. */
43
44     /* Collected during parsing. */
45     struct variable *src;       /* Source variable. */
46     struct variable *dest;      /* Target variable. */
47     int function;               /* Function. */
48     int include_missing;        /* 1=Include user-missing values. */
49     union value arg[2];         /* Arguments. */
50
51     /* Accumulated during AGGREGATE execution. */
52     double dbl[3];
53     int int1, int2;
54     char *string;
55     int missing;
56     struct moments *moments;
57   };
58
59 /* Aggregation functions. */
60 enum
61   {
62     NONE, SUM, MEAN, SD, MAX, MIN, PGT, PLT, PIN, POUT, FGT, FLT, FIN,
63     FOUT, N, NU, NMISS, NUMISS, FIRST, LAST,
64     N_AGR_FUNCS, N_NO_VARS, NU_NO_VARS,
65     FUNC = 0x1f, /* Function mask. */
66     FSTRING = 1<<5, /* String function bit. */
67   };
68
69 /* Attributes of an aggregation function. */
70 struct agr_func
71   {
72     const char *name;           /* Aggregation function name. */
73     int n_args;                 /* Number of arguments. */
74     int alpha_type;             /* When given ALPHA arguments, output type. */
75     struct fmt_spec format;     /* Format spec if alpha_type != ALPHA. */
76   };
77
78 /* Attributes of aggregation functions. */
79 static const struct agr_func agr_func_tab[] = 
80   {
81     {"<NONE>",  0, -1,      {0, 0, 0}},
82     {"SUM",     0, -1,      {FMT_F, 8, 2}},
83     {"MEAN",    0, -1,      {FMT_F, 8, 2}},
84     {"SD",      0, -1,      {FMT_F, 8, 2}},
85     {"MAX",     0, ALPHA,   {-1, -1, -1}}, 
86     {"MIN",     0, ALPHA,   {-1, -1, -1}}, 
87     {"PGT",     1, NUMERIC, {FMT_F, 5, 1}},      
88     {"PLT",     1, NUMERIC, {FMT_F, 5, 1}},       
89     {"PIN",     2, NUMERIC, {FMT_F, 5, 1}},       
90     {"POUT",    2, NUMERIC, {FMT_F, 5, 1}},       
91     {"FGT",     1, NUMERIC, {FMT_F, 5, 3}},       
92     {"FLT",     1, NUMERIC, {FMT_F, 5, 3}},       
93     {"FIN",     2, NUMERIC, {FMT_F, 5, 3}},       
94     {"FOUT",    2, NUMERIC, {FMT_F, 5, 3}},       
95     {"N",       0, NUMERIC, {FMT_F, 7, 0}},       
96     {"NU",      0, NUMERIC, {FMT_F, 7, 0}},       
97     {"NMISS",   0, NUMERIC, {FMT_F, 7, 0}},       
98     {"NUMISS",  0, NUMERIC, {FMT_F, 7, 0}},       
99     {"FIRST",   0, ALPHA,   {-1, -1, -1}}, 
100     {"LAST",    0, ALPHA,   {-1, -1, -1}},
101     {NULL,      0, -1,      {-1, -1, -1}},
102     {"N",       0, NUMERIC, {FMT_F, 7, 0}},
103     {"NU",      0, NUMERIC, {FMT_F, 7, 0}},
104   };
105
106 /* Missing value types. */
107 enum missing_treatment
108   {
109     ITEMWISE,           /* Missing values item by item. */
110     COLUMNWISE          /* Missing values column by column. */
111   };
112
113 /* An entire AGGREGATE procedure. */
114 struct agr_proc 
115   {
116     /* We have either an output file or a sink. */
117     struct file_handle *out_file;       /* Output file, or null if none. */
118     struct case_sink *sink;             /* Sink, or null if none. */
119
120     enum missing_treatment missing;     /* How to treat missing values. */
121     struct sort_cases_pgm *sort;        /* Sort program. */
122     struct agr_var *vars;               /* First aggregate variable. */
123     struct dictionary *dict;            /* Aggregate dictionary. */
124     int case_cnt;                       /* Counts aggregated cases. */
125     union value *prev_break;            /* Last values of break variables. */
126     struct ccase *agr_case;             /* Aggregate case for output. */
127     flt64 *sfm_agr_case;                /* Aggregate case in SFM format. */
128   };
129
130 static void initialize_aggregate_info (struct agr_proc *);
131
132 /* Prototypes. */
133 static int parse_aggregate_functions (struct agr_proc *);
134 static void agr_destroy (struct agr_proc *);
135 static int aggregate_single_case (struct agr_proc *agr,
136                                   const struct ccase *input,
137                                   struct ccase *output);
138 static void dump_aggregate_info (struct agr_proc *agr, struct ccase *output);
139 static int create_sysfile (struct agr_proc *);
140
141 /* Aggregating to the active file. */
142 static int agr_to_active_file (struct ccase *, void *aux);
143
144 /* Aggregating to a system file. */
145 static void write_case_to_sfm (struct agr_proc *agr);
146 static int presorted_agr_to_sysfile (struct ccase *, void *aux);
147 static int sort_agr_to_sysfile (const struct ccase *, void *aux);
148 \f
149 /* Parsing. */
150
151 /* Parses and executes the AGGREGATE procedure. */
152 int
153 cmd_aggregate (void)
154 {
155   struct agr_proc agr;
156
157   /* Have we seen these subcommands? */
158   unsigned seen = 0;
159
160   agr.out_file = NULL;
161   agr.sink = NULL;
162   agr.missing = ITEMWISE;
163   agr.sort = NULL;
164   agr.vars = NULL;
165   agr.dict = NULL;
166   agr.case_cnt = 0;
167   agr.prev_break = NULL;
168   
169   agr.dict = dict_create ();
170   dict_set_label (agr.dict, dict_get_label (default_dict));
171   dict_set_documents (agr.dict, dict_get_documents (default_dict));
172   
173   /* Read most of the subcommands. */
174   for (;;)
175     {
176       lex_match ('/');
177       
178       if (lex_match_id ("OUTFILE"))
179         {
180           if (seen & 1)
181             {
182               msg (SE, _("%s subcommand given multiple times."),"OUTFILE");
183               goto lossage;
184             }
185           seen |= 1;
186               
187           lex_match ('=');
188           if (lex_match ('*'))
189             agr.out_file = NULL;
190           else 
191             {
192               agr.out_file = fh_parse_file_handle ();
193               if (agr.out_file == NULL)
194                 goto lossage;
195             }
196         }
197       else if (lex_match_id ("MISSING"))
198         {
199           lex_match ('=');
200           if (!lex_match_id ("COLUMNWISE"))
201             {
202               lex_error (_("while expecting COLUMNWISE"));
203               goto lossage;
204             }
205           agr.missing = COLUMNWISE;
206         }
207       else if (lex_match_id ("DOCUMENT"))
208         seen |= 2;
209       else if (lex_match_id ("PRESORTED"))
210         seen |= 4;
211       else if (lex_match_id ("BREAK"))
212         {
213           if (seen & 8)
214             {
215               msg (SE, _("%s subcommand given multiple times."),"BREAK");
216               goto lossage;
217             }
218           seen |= 8;
219
220           lex_match ('=');
221           agr.sort = parse_sort ();
222           if (agr.sort == NULL)
223             goto lossage;
224           
225           {
226             int i;
227             
228             for (i = 0; i < agr.sort->var_cnt; i++)
229               {
230                 struct variable *v;
231               
232                 v = dict_clone_var (agr.dict, agr.sort->vars[i],
233                                     agr.sort->vars[i]->name);
234                 assert (v != NULL);
235               }
236           }
237         }
238       else break;
239     }
240
241   /* Check for proper syntax. */
242   if (!(seen & 8))
243     msg (SW, _("BREAK subcommand not specified."));
244       
245   /* Read in the aggregate functions. */
246   if (!parse_aggregate_functions (&agr))
247     goto lossage;
248
249   /* Delete documents. */
250   if (!(seen & 2))
251     dict_set_documents (agr.dict, NULL);
252
253   /* Cancel SPLIT FILE. */
254   dict_set_split_vars (agr.dict, NULL, 0);
255   
256   /* Initialize. */
257   agr.case_cnt = 0;
258   agr.agr_case = xmalloc (dict_get_case_size (agr.dict));
259   initialize_aggregate_info (&agr);
260
261   /* Output to active file or external file? */
262   if (agr.out_file == NULL) 
263     {
264       /* The active file will be replaced by the aggregated data,
265          so TEMPORARY is moot. */
266       cancel_temporary ();
267
268       if (agr.sort != NULL && (seen & 4) == 0)
269         sort_cases (agr.sort, 0);
270
271       agr.sink = create_case_sink (&storage_sink_class, agr.dict, NULL);
272       if (agr.sink->class->open != NULL)
273         agr.sink->class->open (agr.sink);
274       vfm_sink = create_case_sink (&null_sink_class, default_dict, NULL);
275       procedure (agr_to_active_file, &agr);
276       if (agr.case_cnt > 0) 
277         {
278           dump_aggregate_info (&agr, agr.agr_case);
279           agr.sink->class->write (agr.sink, agr.agr_case);
280         }
281       dict_destroy (default_dict);
282       default_dict = agr.dict;
283       agr.dict = NULL;
284       vfm_source = agr.sink->class->make_source (agr.sink);
285       free_case_sink (agr.sink);
286     }
287   else
288     {
289       if (!create_sysfile (&agr))
290         goto lossage;
291
292       if (agr.sort != NULL && (seen & 4) == 0) 
293         {
294           /* Sorting is needed. */
295           sort_cases (agr.sort, 1);
296           read_sort_output (agr.sort, sort_agr_to_sysfile, NULL);
297         }
298       else 
299         {
300           /* Active file is already sorted. */
301           procedure (presorted_agr_to_sysfile, &agr);
302         }
303       
304       if (agr.case_cnt > 0) 
305         {
306           dump_aggregate_info (&agr, agr.agr_case);
307           write_case_to_sfm (&agr);
308         }
309       fh_close_handle (agr.out_file);
310     }
311   
312   agr_destroy (&agr);
313   return CMD_SUCCESS;
314
315 lossage:
316   agr_destroy (&agr);
317   return CMD_FAILURE;
318 }
319
320 /* Create a system file for use in aggregation to an external
321    file. */
322 static int
323 create_sysfile (struct agr_proc *agr)
324 {
325   struct sfm_write_info w;
326   w.h = agr->out_file;
327   w.dict = agr->dict;
328   w.compress = get_scompression();
329   if (!sfm_write_dictionary (&w))
330     return 0;
331
332   agr->sfm_agr_case = xmalloc (sizeof *agr->sfm_agr_case * w.case_size);
333     
334   return 1;
335 }
336
337 /* Parse all the aggregate functions. */
338 static int
339 parse_aggregate_functions (struct agr_proc *agr)
340 {
341   struct agr_var *tail; /* Tail of linked list starting at agr->vars. */
342
343   /* Parse everything. */
344   tail = NULL;
345   for (;;)
346     {
347       char **dest;
348       char **dest_label;
349       int n_dest;
350
351       int include_missing;
352       const struct agr_func *function;
353       int func_index;
354
355       union value arg[2];
356
357       struct variable **src;
358       int n_src;
359
360       int i;
361
362       dest = NULL;
363       dest_label = NULL;
364       n_dest = 0;
365       src = NULL;
366       function = NULL;
367       n_src = 0;
368       arg[0].c = NULL;
369       arg[1].c = NULL;
370
371       /* Parse the list of target variables. */
372       while (!lex_match ('='))
373         {
374           int n_dest_prev = n_dest;
375           
376           if (!parse_DATA_LIST_vars (&dest, &n_dest, PV_APPEND | PV_SINGLE | PV_NO_SCRATCH))
377             goto lossage;
378
379           /* Assign empty labels. */
380           {
381             int j;
382
383             dest_label = xrealloc (dest_label, sizeof *dest_label * n_dest);
384             for (j = n_dest_prev; j < n_dest; j++)
385               dest_label[j] = NULL;
386           }
387           
388           if (token == T_STRING)
389             {
390               ds_truncate (&tokstr, 120);
391               dest_label[n_dest - 1] = xstrdup (ds_value (&tokstr));
392               lex_get ();
393             }
394         }
395
396       /* Get the name of the aggregation function. */
397       if (token != T_ID)
398         {
399           lex_error (_("expecting aggregation function"));
400           goto lossage;
401         }
402
403       include_missing = 0;
404       if (tokid[strlen (tokid) - 1] == '.')
405         {
406           include_missing = 1;
407           tokid[strlen (tokid) - 1] = 0;
408         }
409       
410       for (function = agr_func_tab; function->name; function++)
411         if (!strcmp (function->name, tokid))
412           break;
413       if (NULL == function->name)
414         {
415           msg (SE, _("Unknown aggregation function %s."), tokid);
416           goto lossage;
417         }
418       func_index = function - agr_func_tab;
419       lex_get ();
420
421       /* Check for leading lparen. */
422       if (!lex_match ('('))
423         {
424           if (func_index == N)
425             func_index = N_NO_VARS;
426           else if (func_index == NU)
427             func_index = NU_NO_VARS;
428           else
429             {
430               lex_error (_("expecting `('"));
431               goto lossage;
432             }
433         } else {
434           /* Parse list of source variables. */
435           {
436             int pv_opts = PV_NO_SCRATCH;
437
438             if (func_index == SUM || func_index == MEAN || func_index == SD)
439               pv_opts |= PV_NUMERIC;
440             else if (function->n_args)
441               pv_opts |= PV_SAME_TYPE;
442
443             if (!parse_variables (default_dict, &src, &n_src, pv_opts))
444               goto lossage;
445           }
446
447           /* Parse function arguments, for those functions that
448              require arguments. */
449           if (function->n_args != 0)
450             for (i = 0; i < function->n_args; i++)
451               {
452                 int type;
453             
454                 lex_match (',');
455                 if (token == T_STRING)
456                   {
457                     arg[i].c = xstrdup (ds_value (&tokstr));
458                     type = ALPHA;
459                   }
460                 else if (token == T_NUM)
461                   {
462                     arg[i].f = tokval;
463                     type = NUMERIC;
464                   } else {
465                     msg (SE, _("Missing argument %d to %s."), i + 1, function->name);
466                     goto lossage;
467                   }
468             
469                 lex_get ();
470
471                 if (type != src[0]->type)
472                   {
473                     msg (SE, _("Arguments to %s must be of same type as "
474                                "source variables."),
475                          function->name);
476                     goto lossage;
477                   }
478               }
479
480           /* Trailing rparen. */
481           if (!lex_match(')'))
482             {
483               lex_error (_("expecting `)'"));
484               goto lossage;
485             }
486           
487           /* Now check that the number of source variables match the
488              number of target variables.  Do this here because if we
489              do it earlier then the user can get very misleading error
490              messages; i.e., `AGGREGATE x=SUM(y t).' will get this
491              error message when a proper message would be more like
492              `unknown variable t'. */
493           if (n_src != n_dest)
494             {
495               msg (SE, _("Number of source variables (%d) does not match "
496                          "number of target variables (%d)."),
497                    n_src, n_dest);
498               goto lossage;
499             }
500         }
501         
502       /* Finally add these to the linked list of aggregation
503          variables. */
504       for (i = 0; i < n_dest; i++)
505         {
506           struct agr_var *v = xmalloc (sizeof *v);
507
508           /* Add variable to chain. */
509           if (agr->vars != NULL)
510             tail->next = v;
511           else
512             agr->vars = v;
513           tail = v;
514           tail->next = NULL;
515           v->moments = NULL;
516           
517           /* Create the target variable in the aggregate
518              dictionary. */
519           {
520             struct variable *destvar;
521             
522             v->function = func_index;
523
524             if (src)
525               {
526                 int output_width;
527
528                 v->src = src[i];
529                 
530                 if (src[i]->type == ALPHA)
531                   {
532                     v->function |= FSTRING;
533                     v->string = xmalloc (src[i]->width);
534                   }
535                 
536                 if (v->src->type == NUMERIC || function->alpha_type == NUMERIC)
537                   output_width = 0;
538                 else
539                   output_width = v->src->width;
540
541                 if (function->alpha_type == ALPHA)
542                   destvar = dict_clone_var (agr->dict, v->src, dest[i]);
543                 else
544                   {
545                     destvar = dict_create_var (agr->dict, dest[i], output_width);
546                     if (output_width == 0)
547                       destvar->print = destvar->write = function->format;
548                     if (output_width == 0 && dict_get_weight (default_dict) != NULL
549                         && (func_index == N || func_index == N_NO_VARS
550                             || func_index == NU || func_index == NU_NO_VARS))
551                       {
552                         struct fmt_spec f = {FMT_F, 8, 2};
553                       
554                         destvar->print = destvar->write = f;
555                       }
556                   }
557               } else {
558                 v->src = NULL;
559                 destvar = dict_create_var (agr->dict, dest[i], 0);
560               }
561           
562             if (!destvar)
563               {
564                 msg (SE, _("Variable name %s is not unique within the "
565                            "aggregate file dictionary, which contains "
566                            "the aggregate variables and the break "
567                            "variables."),
568                      dest[i]);
569                 free (dest[i]);
570                 goto lossage;
571               }
572
573             free (dest[i]);
574             destvar->init = 0;
575             if (dest_label[i])
576               {
577                 destvar->label = dest_label[i];
578                 dest_label[i] = NULL;
579               }
580             else if (function->alpha_type == ALPHA)
581               destvar->print = destvar->write = function->format;
582
583             v->dest = destvar;
584           }
585           
586           v->include_missing = include_missing;
587
588           if (v->src != NULL)
589             {
590               int j;
591
592               if (v->src->type == NUMERIC)
593                 for (j = 0; j < function->n_args; j++)
594                   v->arg[j].f = arg[j].f;
595               else
596                 for (j = 0; j < function->n_args; j++)
597                   v->arg[j].c = xstrdup (arg[j].c);
598             }
599         }
600       
601       if (src != NULL && src[0]->type == ALPHA)
602         for (i = 0; i < function->n_args; i++)
603           {
604             free (arg[i].c);
605             arg[i].c = NULL;
606           }
607
608       free (src);
609       free (dest);
610       free (dest_label);
611
612       if (!lex_match ('/'))
613         {
614           if (token == '.')
615             return 1;
616
617           lex_error ("expecting end of command");
618           return 0;
619         }
620       continue;
621       
622     lossage:
623       for (i = 0; i < n_dest; i++)
624         {
625           free (dest[i]);
626           free (dest_label[i]);
627         }
628       free (dest);
629       free (dest_label);
630       free (arg[0].c);
631       free (arg[1].c);
632       if (src && n_src && src[0]->type == ALPHA)
633         for (i = 0; i < function->n_args; i++)
634           {
635             free (arg[i].c);
636             arg[i].c = NULL;
637           }
638       free (src);
639         
640       return 0;
641     }
642 }
643
644 /* Destroys AGR. */
645 static void
646 agr_destroy (struct agr_proc *agr)
647 {
648   struct agr_var *iter, *next;
649
650   if (agr->dict != NULL)
651     dict_destroy (agr->dict);
652   if (agr->sort != NULL)
653     destroy_sort_cases_pgm (agr->sort);
654   for (iter = agr->vars; iter; iter = next)
655     {
656       next = iter->next;
657
658       if (iter->function & FSTRING)
659         {
660           int n_args;
661           int i;
662
663           n_args = agr_func_tab[iter->function & FUNC].n_args;
664           for (i = 0; i < n_args; i++)
665             free (iter->arg[i].c);
666           free (iter->string);
667         }
668       else if (iter->function == SD)
669         moments_destroy (iter->moments);
670       free (iter);
671     }
672   free (agr->prev_break);
673   free (agr->agr_case);
674 }
675 \f
676 /* Execution. */
677
678 static void accumulate_aggregate_info (struct agr_proc *,
679                                        const struct ccase *);
680 static void dump_aggregate_info (struct agr_proc *, struct ccase *);
681
682 /* Processes a single case INPUT for aggregation.  If output is
683    warranted, writes it to OUTPUT and returns nonzero.
684    Otherwise, returns zero and OUTPUT is unmodified. */
685 static int
686 aggregate_single_case (struct agr_proc *agr,
687                        const struct ccase *input, struct ccase *output)
688 {
689   /* The first case always begins a new break group.  We also need to
690      preserve the values of the case for later comparison. */
691   if (agr->case_cnt++ == 0)
692     {
693       int n_elem = 0;
694       
695       {
696         int i;
697
698         for (i = 0; i < agr->sort->var_cnt; i++)
699           n_elem += agr->sort->vars[i]->nv;
700       }
701       
702       agr->prev_break = xmalloc (sizeof *agr->prev_break * n_elem);
703
704       /* Copy INPUT into prev_break. */
705       {
706         union value *iter = agr->prev_break;
707         int i;
708
709         for (i = 0; i < agr->sort->var_cnt; i++)
710           {
711             struct variable *v = agr->sort->vars[i];
712             
713             if (v->type == NUMERIC)
714               (iter++)->f = input->data[v->fv].f;
715             else
716               {
717                 memcpy (iter->s, input->data[v->fv].s, v->width);
718                 iter += v->nv;
719               }
720           }
721       }
722             
723       accumulate_aggregate_info (agr, input);
724         
725       return 0;
726     }
727       
728   /* Compare the value of each break variable to the values on the
729      previous case. */
730   {
731     union value *iter = agr->prev_break;
732     int i;
733     
734     for (i = 0; i < agr->sort->var_cnt; i++)
735       {
736         struct variable *v = agr->sort->vars[i];
737       
738         switch (v->type)
739           {
740           case NUMERIC:
741             if (input->data[v->fv].f != iter->f)
742               goto not_equal;
743             iter++;
744             break;
745           case ALPHA:
746             if (memcmp (input->data[v->fv].s, iter->s, v->width))
747               goto not_equal;
748             iter += v->nv;
749             break;
750           default:
751             assert (0);
752           }
753       }
754   }
755
756   accumulate_aggregate_info (agr, input);
757
758   return 0;
759   
760 not_equal:
761   /* The values of the break variable are different from the values on
762      the previous case.  That means that it's time to dump aggregate
763      info. */
764   dump_aggregate_info (agr, output);
765   initialize_aggregate_info (agr);
766   accumulate_aggregate_info (agr, input);
767
768   /* Copy INPUT into prev_break. */
769   {
770     union value *iter = agr->prev_break;
771     int i;
772
773     for (i = 0; i < agr->sort->var_cnt; i++)
774       {
775         struct variable *v = agr->sort->vars[i];
776             
777         if (v->type == NUMERIC)
778           (iter++)->f = input->data[v->fv].f;
779         else
780           {
781             memcpy (iter->s, input->data[v->fv].s, v->width);
782             iter += v->nv;
783           }
784       }
785   }
786   
787   return 1;
788 }
789
790 /* Accumulates aggregation data from the case INPUT. */
791 static void 
792 accumulate_aggregate_info (struct agr_proc *agr,
793                            const struct ccase *input)
794 {
795   struct agr_var *iter;
796   double weight;
797
798   weight = dict_get_case_weight (default_dict, input);
799
800   for (iter = agr->vars; iter; iter = iter->next)
801     if (iter->src)
802       {
803         const union value *v = &input->data[iter->src->fv];
804
805         if ((!iter->include_missing && is_missing (v, iter->src))
806             || (iter->include_missing && iter->src->type == NUMERIC
807                 && v->f == SYSMIS))
808           {
809             switch (iter->function)
810               {
811               case NMISS:
812                 iter->dbl[0] += weight;
813                 break;
814               case NUMISS:
815                 iter->int1++;
816                 break;
817               }
818             iter->missing = 1;
819             continue;
820           }
821         
822         /* This is horrible.  There are too many possibilities. */
823         switch (iter->function)
824           {
825           case SUM:
826             iter->dbl[0] += v->f;
827             break;
828           case MEAN:
829             iter->dbl[0] += v->f * weight;
830             iter->dbl[1] += weight;
831             break;
832           case SD:
833             moments_pass_two (iter->moments, v->f, weight);
834             break;
835           case MAX:
836             iter->dbl[0] = max (iter->dbl[0], v->f);
837             iter->int1 = 1;
838             break;
839           case MAX | FSTRING:
840             if (memcmp (iter->string, v->s, iter->src->width) < 0)
841               memcpy (iter->string, v->s, iter->src->width);
842             iter->int1 = 1;
843             break;
844           case MIN:
845             iter->dbl[0] = min (iter->dbl[0], v->f);
846             iter->int1 = 1;
847             break;
848           case MIN | FSTRING:
849             if (memcmp (iter->string, v->s, iter->src->width) > 0)
850               memcpy (iter->string, v->s, iter->src->width);
851             iter->int1 = 1;
852             break;
853           case FGT:
854           case PGT:
855             if (v->f > iter->arg[0].f)
856               iter->dbl[0] += weight;
857             iter->dbl[1] += weight;
858             break;
859           case FGT | FSTRING:
860           case PGT | FSTRING:
861             if (memcmp (iter->arg[0].c, v->s, iter->src->width) < 0)
862               iter->dbl[0] += weight;
863             iter->dbl[1] += weight;
864             break;
865           case FLT:
866           case PLT:
867             if (v->f < iter->arg[0].f)
868               iter->dbl[0] += weight;
869             iter->dbl[1] += weight;
870             break;
871           case FLT | FSTRING:
872           case PLT | FSTRING:
873             if (memcmp (iter->arg[0].c, v->s, iter->src->width) > 0)
874               iter->dbl[0] += weight;
875             iter->dbl[1] += weight;
876             break;
877           case FIN:
878           case PIN:
879             if (iter->arg[0].f <= v->f && v->f <= iter->arg[1].f)
880               iter->dbl[0] += weight;
881             iter->dbl[1] += weight;
882             break;
883           case FIN | FSTRING:
884           case PIN | FSTRING:
885             if (memcmp (iter->arg[0].c, v->s, iter->src->width) <= 0
886                 && memcmp (iter->arg[1].c, v->s, iter->src->width) >= 0)
887               iter->dbl[0] += weight;
888             iter->dbl[1] += weight;
889             break;
890           case FOUT:
891           case POUT:
892             if (iter->arg[0].f > v->f || v->f > iter->arg[1].f)
893               iter->dbl[0] += weight;
894             iter->dbl[1] += weight;
895             break;
896           case FOUT | FSTRING:
897           case POUT | FSTRING:
898             if (memcmp (iter->arg[0].c, v->s, iter->src->width) > 0
899                 && memcmp (iter->arg[1].c, v->s, iter->src->width) < 0)
900               iter->dbl[0] += weight;
901             iter->dbl[1] += weight;
902             break;
903           case N:
904             iter->dbl[0] += weight;
905             break;
906           case NU:
907             iter->int1++;
908             break;
909           case FIRST:
910             if (iter->int1 == 0)
911               {
912                 iter->dbl[0] = v->f;
913                 iter->int1 = 1;
914               }
915             break;
916           case FIRST | FSTRING:
917             if (iter->int1 == 0)
918               {
919                 memcpy (iter->string, v->s, iter->src->width);
920                 iter->int1 = 1;
921               }
922             break;
923           case LAST:
924             iter->dbl[0] = v->f;
925             iter->int1 = 1;
926             break;
927           case LAST | FSTRING:
928             memcpy (iter->string, v->s, iter->src->width);
929             iter->int1 = 1;
930             break;
931           default:
932             assert (0);
933           }
934     } else {
935       switch (iter->function)
936         {
937         case N_NO_VARS:
938           iter->dbl[0] += weight;
939           break;
940         case NU_NO_VARS:
941           iter->int1++;
942           break;
943         default:
944           assert (0);
945         }
946     }
947 }
948
949 /* We've come to a record that differs from the previous in one or
950    more of the break variables.  Make an output record from the
951    accumulated statistics in the OUTPUT case. */
952 static void 
953 dump_aggregate_info (struct agr_proc *agr, struct ccase *output)
954 {
955   {
956     int n_elem = 0;
957     
958     {
959       int i;
960
961       for (i = 0; i < agr->sort->var_cnt; i++)
962         n_elem += agr->sort->vars[i]->nv;
963     }
964     memcpy (output->data, agr->prev_break, sizeof (union value) * n_elem);
965   }
966   
967   {
968     struct agr_var *i;
969   
970     for (i = agr->vars; i; i = i->next)
971       {
972         union value *v = &output->data[i->dest->fv];
973
974         if (agr->missing == COLUMNWISE && i->missing != 0
975             && (i->function & FUNC) != N && (i->function & FUNC) != NU
976             && (i->function & FUNC) != NMISS && (i->function & FUNC) != NUMISS)
977           {
978             if (i->function & FSTRING)
979               memset (v->s, ' ', i->dest->width);
980             else
981               v->f = SYSMIS;
982             continue;
983           }
984         
985         switch (i->function)
986           {
987           case SUM:
988             v->f = i->dbl[0];
989             break;
990           case MEAN:
991             v->f = i->dbl[1] != 0.0 ? i->dbl[0] / i->dbl[1] : SYSMIS;
992             break;
993           case SD:
994             {
995               double variance;
996
997               /* FIXME: we should use two passes. */
998               moments_calculate (i->moments, NULL, NULL, &variance,
999                                  NULL, NULL);
1000               if (variance != SYSMIS)
1001                 v->f = sqrt (variance);
1002               else
1003                 v->f = SYSMIS; 
1004             }
1005             break;
1006           case MAX:
1007           case MIN:
1008             v->f = i->int1 ? i->dbl[0] : SYSMIS;
1009             break;
1010           case MAX | FSTRING:
1011           case MIN | FSTRING:
1012             if (i->int1)
1013               memcpy (v->s, i->string, i->dest->width);
1014             else
1015               memset (v->s, ' ', i->dest->width);
1016             break;
1017           case FGT | FSTRING:
1018           case FLT | FSTRING:
1019           case FIN | FSTRING:
1020           case FOUT | FSTRING:
1021             v->f = i->int2 ? (double) i->int1 / (double) i->int2 : SYSMIS;
1022             break;
1023           case FGT:
1024           case FLT:
1025           case FIN:
1026           case FOUT:
1027             v->f = i->dbl[1] ? i->dbl[0] / i->dbl[1] : SYSMIS;
1028             break;
1029           case PGT:
1030           case PGT | FSTRING:
1031           case PLT:
1032           case PLT | FSTRING:
1033           case PIN:
1034           case PIN | FSTRING:
1035           case POUT:
1036           case POUT | FSTRING:
1037             v->f = i->dbl[1] ? i->dbl[0] / i->dbl[1] * 100.0 : SYSMIS;
1038             break;
1039           case N:
1040             v->f = i->dbl[0];
1041             break;
1042           case NU:
1043             v->f = i->int1;
1044             break;
1045           case FIRST:
1046           case LAST:
1047             v->f = i->int1 ? i->dbl[0] : SYSMIS;
1048             break;
1049           case FIRST | FSTRING:
1050           case LAST | FSTRING:
1051             if (i->int1)
1052               memcpy (v->s, i->string, i->dest->width);
1053             else
1054               memset (v->s, ' ', i->dest->width);
1055             break;
1056           case N_NO_VARS:
1057             v->f = i->dbl[0];
1058             break;
1059           case NU_NO_VARS:
1060             v->f = i->int1;
1061             break;
1062           case NMISS:
1063             v->f = i->dbl[0];
1064             break;
1065           case NUMISS:
1066             v->f = i->int1;
1067             break;
1068           default:
1069             assert (0);
1070           }
1071       }
1072   }
1073 }
1074
1075 /* Resets the state for all the aggregate functions. */
1076 static void
1077 initialize_aggregate_info (struct agr_proc *agr)
1078 {
1079   struct agr_var *iter;
1080
1081   for (iter = agr->vars; iter; iter = iter->next)
1082     {
1083       iter->missing = 0;
1084       switch (iter->function)
1085         {
1086         case MIN:
1087           iter->dbl[0] = DBL_MAX;
1088           break;
1089         case MIN | FSTRING:
1090           memset (iter->string, 255, iter->src->width);
1091           break;
1092         case MAX:
1093           iter->dbl[0] = -DBL_MAX;
1094           break;
1095         case MAX | FSTRING:
1096           memset (iter->string, 0, iter->src->width);
1097           break;
1098         case SD:
1099           if (iter->moments == NULL)
1100             iter->moments = moments_create (MOMENT_VARIANCE);
1101           else
1102             moments_clear (iter->moments);
1103           break;
1104         default:
1105           iter->dbl[0] = iter->dbl[1] = iter->dbl[2] = 0.0;
1106           iter->int1 = iter->int2 = 0;
1107           break;
1108         }
1109     }
1110 }
1111 \f
1112 /* Aggregate each case as it comes through.  Cases which aren't needed
1113    are dropped. */
1114 static int
1115 agr_to_active_file (struct ccase *c, void *agr_)
1116 {
1117   struct agr_proc *agr = agr_;
1118
1119   if (aggregate_single_case (agr, c, agr->agr_case)) 
1120     agr->sink->class->write (agr->sink, agr->agr_case);
1121
1122   return 1;
1123 }
1124
1125 /* Writes AGR->agr_case to AGR->out_file. */
1126 static void
1127 write_case_to_sfm (struct agr_proc *agr)
1128 {
1129   flt64 *p;
1130   int i;
1131
1132   p = agr->sfm_agr_case;
1133   for (i = 0; i < dict_get_var_cnt (agr->dict); i++)
1134     {
1135       struct variable *v = dict_get_var (agr->dict, i);
1136       
1137       if (v->type == NUMERIC)
1138         {
1139           double src = agr->agr_case->data[v->fv].f;
1140           if (src == SYSMIS)
1141             *p++ = -FLT64_MAX;
1142           else
1143             *p++ = src;
1144         }
1145       else
1146         {
1147           memcpy (p, agr->agr_case->data[v->fv].s, v->width);
1148           memset (&((char *) p)[v->width], ' ',
1149                   REM_RND_UP (v->width, sizeof (flt64)));
1150           p += DIV_RND_UP (v->width, sizeof (flt64));
1151         }
1152     }
1153
1154   sfm_write_case (agr->out_file, agr->sfm_agr_case, p - agr->sfm_agr_case);
1155 }
1156
1157 /* Aggregate the current case and output it if we passed a
1158    breakpoint. */
1159 static int
1160 presorted_agr_to_sysfile (struct ccase *c, void *agr_) 
1161 {
1162   sort_agr_to_sysfile (c, agr_);
1163   return 1;
1164 }
1165
1166 /* Aggregate the current case and output it if we passed a
1167    breakpoint. */
1168 static int
1169 sort_agr_to_sysfile (const struct ccase *c, void *agr_) 
1170 {
1171   struct agr_proc *agr = agr_;
1172
1173   if (aggregate_single_case (agr, c, agr->agr_case)) 
1174     write_case_to_sfm (agr);
1175
1176   return 1;
1177 }