Instead of making system or portable file readers responsible for
[pspp-builds.git] / src / aggregate.c
1 /* PSPP - computes sample statistics.
2    Copyright (C) 1997-9, 2000 Free Software Foundation, Inc.
3    Written by Ben Pfaff <blp@gnu.org>.
4
5    This program is free software; you can redistribute it and/or
6    modify it under the terms of the GNU General Public License as
7    published by the Free Software Foundation; either version 2 of the
8    License, or (at your option) any later version.
9
10    This program is distributed in the hope that it will be useful, but
11    WITHOUT ANY WARRANTY; without even the implied warranty of
12    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13    General Public License for more details.
14
15    You should have received a copy of the GNU General Public License
16    along with this program; if not, write to the Free Software
17    Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA
18    02111-1307, USA. */
19
20 #include <config.h>
21 #include "error.h"
22 #include <stdlib.h>
23 #include "alloc.h"
24 #include "case.h"
25 #include "casefile.h"
26 #include "command.h"
27 #include "dictionary.h"
28 #include "error.h"
29 #include "file-handle.h"
30 #include "lexer.h"
31 #include "misc.h"
32 #include "moments.h"
33 #include "pool.h"
34 #include "settings.h"
35 #include "sfm-write.h"
36 #include "sort.h"
37 #include "str.h"
38 #include "var.h"
39 #include "vfm.h"
40 #include "vfmP.h"
41
42 /* Specifies how to make an aggregate variable. */
43 struct agr_var
44   {
45     struct agr_var *next;               /* Next in list. */
46
47     /* Collected during parsing. */
48     struct variable *src;       /* Source variable. */
49     struct variable *dest;      /* Target variable. */
50     int function;               /* Function. */
51     int include_missing;        /* 1=Include user-missing values. */
52     union value arg[2];         /* Arguments. */
53
54     /* Accumulated during AGGREGATE execution. */
55     double dbl[3];
56     int int1, int2;
57     char *string;
58     int missing;
59     struct moments1 *moments;
60   };
61
62 /* Aggregation functions. */
63 enum
64   {
65     NONE, SUM, MEAN, SD, MAX, MIN, PGT, PLT, PIN, POUT, FGT, FLT, FIN,
66     FOUT, N, NU, NMISS, NUMISS, FIRST, LAST,
67     N_AGR_FUNCS, N_NO_VARS, NU_NO_VARS,
68     FUNC = 0x1f, /* Function mask. */
69     FSTRING = 1<<5, /* String function bit. */
70   };
71
72 /* Attributes of an aggregation function. */
73 struct agr_func
74   {
75     const char *name;           /* Aggregation function name. */
76     int n_args;                 /* Number of arguments. */
77     int alpha_type;             /* When given ALPHA arguments, output type. */
78     struct fmt_spec format;     /* Format spec if alpha_type != ALPHA. */
79   };
80
81 /* Attributes of aggregation functions. */
82 static const struct agr_func agr_func_tab[] = 
83   {
84     {"<NONE>",  0, -1,      {0, 0, 0}},
85     {"SUM",     0, -1,      {FMT_F, 8, 2}},
86     {"MEAN",    0, -1,      {FMT_F, 8, 2}},
87     {"SD",      0, -1,      {FMT_F, 8, 2}},
88     {"MAX",     0, ALPHA,   {-1, -1, -1}}, 
89     {"MIN",     0, ALPHA,   {-1, -1, -1}}, 
90     {"PGT",     1, NUMERIC, {FMT_F, 5, 1}},      
91     {"PLT",     1, NUMERIC, {FMT_F, 5, 1}},       
92     {"PIN",     2, NUMERIC, {FMT_F, 5, 1}},       
93     {"POUT",    2, NUMERIC, {FMT_F, 5, 1}},       
94     {"FGT",     1, NUMERIC, {FMT_F, 5, 3}},       
95     {"FLT",     1, NUMERIC, {FMT_F, 5, 3}},       
96     {"FIN",     2, NUMERIC, {FMT_F, 5, 3}},       
97     {"FOUT",    2, NUMERIC, {FMT_F, 5, 3}},       
98     {"N",       0, NUMERIC, {FMT_F, 7, 0}},       
99     {"NU",      0, NUMERIC, {FMT_F, 7, 0}},       
100     {"NMISS",   0, NUMERIC, {FMT_F, 7, 0}},       
101     {"NUMISS",  0, NUMERIC, {FMT_F, 7, 0}},       
102     {"FIRST",   0, ALPHA,   {-1, -1, -1}}, 
103     {"LAST",    0, ALPHA,   {-1, -1, -1}},
104     {NULL,      0, -1,      {-1, -1, -1}},
105     {"N",       0, NUMERIC, {FMT_F, 7, 0}},
106     {"NU",      0, NUMERIC, {FMT_F, 7, 0}},
107   };
108
109 /* Missing value types. */
110 enum missing_treatment
111   {
112     ITEMWISE,           /* Missing values item by item. */
113     COLUMNWISE          /* Missing values column by column. */
114   };
115
116 /* An entire AGGREGATE procedure. */
117 struct agr_proc 
118   {
119     /* We have either an output file or a sink. */
120     struct sfm_writer *writer;          /* Output file, or null if none. */
121     struct case_sink *sink;             /* Sink, or null if none. */
122
123     /* Break variables. */
124     struct sort_criteria *sort;         /* Sort criteria. */
125     struct variable **break_vars;       /* Break variables. */
126     size_t break_var_cnt;               /* Number of break variables. */
127     union value *prev_break;            /* Last values of break variables. */
128
129     enum missing_treatment missing;     /* How to treat missing values. */
130     struct agr_var *agr_vars;           /* First aggregate variable. */
131     struct dictionary *dict;            /* Aggregate dictionary. */
132     int case_cnt;                       /* Counts aggregated cases. */
133     struct ccase agr_case;              /* Aggregate case for output. */
134   };
135
136 static void initialize_aggregate_info (struct agr_proc *);
137
138 /* Prototypes. */
139 static int parse_aggregate_functions (struct agr_proc *);
140 static void agr_destroy (struct agr_proc *);
141 static int aggregate_single_case (struct agr_proc *agr,
142                                   const struct ccase *input,
143                                   struct ccase *output);
144 static void dump_aggregate_info (struct agr_proc *agr, struct ccase *output);
145
146 /* Aggregating to the active file. */
147 static int agr_to_active_file (struct ccase *, void *aux);
148
149 /* Aggregating to a system file. */
150 static int presorted_agr_to_sysfile (struct ccase *, void *aux);
151 \f
152 /* Parsing. */
153
154 /* Parses and executes the AGGREGATE procedure. */
155 int
156 cmd_aggregate (void)
157 {
158   struct agr_proc agr;
159   struct file_handle *out_file = NULL;
160
161   /* Have we seen these subcommands? */
162   unsigned seen = 0;
163
164   agr.writer = NULL;
165   agr.sink = NULL;
166   agr.missing = ITEMWISE;
167   agr.sort = NULL;
168   agr.break_vars = NULL;
169   agr.agr_vars = NULL;
170   agr.dict = NULL;
171   agr.case_cnt = 0;
172   agr.prev_break = NULL;
173   
174   agr.dict = dict_create ();
175   dict_set_label (agr.dict, dict_get_label (default_dict));
176   dict_set_documents (agr.dict, dict_get_documents (default_dict));
177   
178   /* Read most of the subcommands. */
179   for (;;)
180     {
181       lex_match ('/');
182       
183       if (lex_match_id ("OUTFILE"))
184         {
185           if (seen & 1)
186             {
187               msg (SE, _("%s subcommand given multiple times."),"OUTFILE");
188               goto error;
189             }
190           seen |= 1;
191               
192           lex_match ('=');
193           if (!lex_match ('*'))
194             {
195               out_file = fh_parse ();
196               if (out_file == NULL)
197                 goto error;
198             }
199         }
200       else if (lex_match_id ("MISSING"))
201         {
202           lex_match ('=');
203           if (!lex_match_id ("COLUMNWISE"))
204             {
205               lex_error (_("while expecting COLUMNWISE"));
206               goto error;
207             }
208           agr.missing = COLUMNWISE;
209         }
210       else if (lex_match_id ("DOCUMENT"))
211         seen |= 2;
212       else if (lex_match_id ("PRESORTED"))
213         seen |= 4;
214       else if (lex_match_id ("BREAK"))
215         {
216           int i;
217
218           if (seen & 8)
219             {
220               msg (SE, _("%s subcommand given multiple times."),"BREAK");
221               goto error;
222             }
223           seen |= 8;
224
225           lex_match ('=');
226           agr.sort = sort_parse_criteria (default_dict,
227                                           &agr.break_vars, &agr.break_var_cnt);
228           if (agr.sort == NULL)
229             goto error;
230           
231           for (i = 0; i < agr.break_var_cnt; i++)
232             {
233               struct variable *v = dict_clone_var (agr.dict, agr.break_vars[i],
234                                                    agr.break_vars[i]->name);
235               assert (v != NULL);
236             }
237         }
238       else break;
239     }
240
241   /* Check for proper syntax. */
242   if (!(seen & 8))
243     msg (SW, _("BREAK subcommand not specified."));
244       
245   /* Read in the aggregate functions. */
246   if (!parse_aggregate_functions (&agr))
247     goto error;
248
249   /* Delete documents. */
250   if (!(seen & 2))
251     dict_set_documents (agr.dict, NULL);
252
253   /* Cancel SPLIT FILE. */
254   dict_set_split_vars (agr.dict, NULL, 0);
255   
256   /* Initialize. */
257   agr.case_cnt = 0;
258   case_create (&agr.agr_case, dict_get_next_value_idx (agr.dict));
259   initialize_aggregate_info (&agr);
260
261   /* Output to active file or external file? */
262   if (out_file == NULL) 
263     {
264       /* The active file will be replaced by the aggregated data,
265          so TEMPORARY is moot. */
266       cancel_temporary ();
267
268       if (agr.sort != NULL && (seen & 4) == 0)
269         sort_active_file_in_place (agr.sort);
270
271       agr.sink = create_case_sink (&storage_sink_class, agr.dict, NULL);
272       if (agr.sink->class->open != NULL)
273         agr.sink->class->open (agr.sink);
274       vfm_sink = create_case_sink (&null_sink_class, default_dict, NULL);
275       procedure (agr_to_active_file, &agr);
276       if (agr.case_cnt > 0) 
277         {
278           dump_aggregate_info (&agr, &agr.agr_case);
279           agr.sink->class->write (agr.sink, &agr.agr_case);
280         }
281       dict_destroy (default_dict);
282       default_dict = agr.dict;
283       agr.dict = NULL;
284       vfm_source = agr.sink->class->make_source (agr.sink);
285       free_case_sink (agr.sink);
286     }
287   else
288     {
289       agr.writer = sfm_open_writer (out_file, agr.dict, get_scompression ());
290       if (agr.writer == NULL)
291         goto error;
292       
293       if (agr.sort != NULL && (seen & 4) == 0) 
294         {
295           /* Sorting is needed. */
296           struct casefile *dst;
297           struct casereader *reader;
298           struct ccase c;
299           
300           dst = sort_active_file_to_casefile (agr.sort);
301           if (dst == NULL)
302             goto error;
303           reader = casefile_get_destructive_reader (dst);
304           while (casereader_read_xfer (reader, &c)) 
305             {
306               if (aggregate_single_case (&agr, &c, &agr.agr_case)) 
307                 sfm_write_case (agr.writer, &agr.agr_case);
308               case_destroy (&c);
309             }
310           casereader_destroy (reader);
311           casefile_destroy (dst);
312         }
313       else 
314         {
315           /* Active file is already sorted. */
316           procedure (presorted_agr_to_sysfile, &agr);
317         }
318       
319       if (agr.case_cnt > 0) 
320         {
321           dump_aggregate_info (&agr, &agr.agr_case);
322           sfm_write_case (agr.writer, &agr.agr_case);
323         }
324     }
325   
326   agr_destroy (&agr);
327   return CMD_SUCCESS;
328
329 error:
330   agr_destroy (&agr);
331   return CMD_FAILURE;
332 }
333
334 /* Parse all the aggregate functions. */
335 static int
336 parse_aggregate_functions (struct agr_proc *agr)
337 {
338   struct agr_var *tail; /* Tail of linked list starting at agr->vars. */
339
340   /* Parse everything. */
341   tail = NULL;
342   for (;;)
343     {
344       char **dest;
345       char **dest_label;
346       int n_dest;
347
348       int include_missing;
349       const struct agr_func *function;
350       int func_index;
351
352       union value arg[2];
353
354       struct variable **src;
355       int n_src;
356
357       int i;
358
359       dest = NULL;
360       dest_label = NULL;
361       n_dest = 0;
362       src = NULL;
363       function = NULL;
364       n_src = 0;
365       arg[0].c = NULL;
366       arg[1].c = NULL;
367
368       /* Parse the list of target variables. */
369       while (!lex_match ('='))
370         {
371           int n_dest_prev = n_dest;
372           
373           if (!parse_DATA_LIST_vars (&dest, &n_dest,
374                                      PV_APPEND | PV_SINGLE | PV_NO_SCRATCH))
375             goto error;
376
377           /* Assign empty labels. */
378           {
379             int j;
380
381             dest_label = xrealloc (dest_label, sizeof *dest_label * n_dest);
382             for (j = n_dest_prev; j < n_dest; j++)
383               dest_label[j] = NULL;
384           }
385           
386           if (token == T_STRING)
387             {
388               ds_truncate (&tokstr, 255);
389               dest_label[n_dest - 1] = xstrdup (ds_c_str (&tokstr));
390               lex_get ();
391             }
392         }
393
394       /* Get the name of the aggregation function. */
395       if (token != T_ID)
396         {
397           lex_error (_("expecting aggregation function"));
398           goto error;
399         }
400
401       include_missing = 0;
402       if (tokid[strlen (tokid) - 1] == '.')
403         {
404           include_missing = 1;
405           tokid[strlen (tokid) - 1] = 0;
406         }
407       
408       for (function = agr_func_tab; function->name; function++)
409         if (!strcmp (function->name, tokid))
410           break;
411       if (NULL == function->name)
412         {
413           msg (SE, _("Unknown aggregation function %s."), tokid);
414           goto error;
415         }
416       func_index = function - agr_func_tab;
417       lex_get ();
418
419       /* Check for leading lparen. */
420       if (!lex_match ('('))
421         {
422           if (func_index == N)
423             func_index = N_NO_VARS;
424           else if (func_index == NU)
425             func_index = NU_NO_VARS;
426           else
427             {
428               lex_error (_("expecting `('"));
429               goto error;
430             }
431         } else {
432           /* Parse list of source variables. */
433           {
434             int pv_opts = PV_NO_SCRATCH;
435
436             if (func_index == SUM || func_index == MEAN || func_index == SD)
437               pv_opts |= PV_NUMERIC;
438             else if (function->n_args)
439               pv_opts |= PV_SAME_TYPE;
440
441             if (!parse_variables (default_dict, &src, &n_src, pv_opts))
442               goto error;
443           }
444
445           /* Parse function arguments, for those functions that
446              require arguments. */
447           if (function->n_args != 0)
448             for (i = 0; i < function->n_args; i++)
449               {
450                 int type;
451             
452                 lex_match (',');
453                 if (token == T_STRING)
454                   {
455                     arg[i].c = xstrdup (ds_c_str (&tokstr));
456                     type = ALPHA;
457                   }
458                 else if (token == T_NUM)
459                   {
460                     arg[i].f = tokval;
461                     type = NUMERIC;
462                   } else {
463                     msg (SE, _("Missing argument %d to %s."), i + 1, function->name);
464                     goto error;
465                   }
466             
467                 lex_get ();
468
469                 if (type != src[0]->type)
470                   {
471                     msg (SE, _("Arguments to %s must be of same type as "
472                                "source variables."),
473                          function->name);
474                     goto error;
475                   }
476               }
477
478           /* Trailing rparen. */
479           if (!lex_match(')'))
480             {
481               lex_error (_("expecting `)'"));
482               goto error;
483             }
484           
485           /* Now check that the number of source variables match the
486              number of target variables.  Do this here because if we
487              do it earlier then the user can get very misleading error
488              messages; i.e., `AGGREGATE x=SUM(y t).' will get this
489              error message when a proper message would be more like
490              `unknown variable t'. */
491           if (n_src != n_dest)
492             {
493               msg (SE, _("Number of source variables (%d) does not match "
494                          "number of target variables (%d)."),
495                    n_src, n_dest);
496               goto error;
497             }
498         }
499         
500       /* Finally add these to the linked list of aggregation
501          variables. */
502       for (i = 0; i < n_dest; i++)
503         {
504           struct agr_var *v = xmalloc (sizeof *v);
505
506           /* Add variable to chain. */
507           if (agr->agr_vars != NULL)
508             tail->next = v;
509           else
510             agr->agr_vars = v;
511           tail = v;
512           tail->next = NULL;
513           v->moments = NULL;
514           
515           /* Create the target variable in the aggregate
516              dictionary. */
517           {
518             struct variable *destvar;
519             
520             v->function = func_index;
521
522             if (src)
523               {
524                 int output_width;
525
526                 v->src = src[i];
527                 
528                 if (src[i]->type == ALPHA)
529                   {
530                     v->function |= FSTRING;
531                     v->string = xmalloc (src[i]->width);
532                   }
533                 
534                 if (v->src->type == NUMERIC || function->alpha_type == NUMERIC)
535                   output_width = 0;
536                 else
537                   output_width = v->src->width;
538
539                 if (function->alpha_type == ALPHA)
540                   destvar = dict_clone_var (agr->dict, v->src, dest[i]);
541                 else
542                   {
543                     destvar = dict_create_var (agr->dict, dest[i], output_width);
544                     if (output_width == 0)
545                       destvar->print = destvar->write = function->format;
546                     if (output_width == 0 && dict_get_weight (default_dict) != NULL
547                         && (func_index == N || func_index == N_NO_VARS
548                             || func_index == NU || func_index == NU_NO_VARS))
549                       {
550                         struct fmt_spec f = {FMT_F, 8, 2};
551                       
552                         destvar->print = destvar->write = f;
553                       }
554                   }
555               } else {
556                 v->src = NULL;
557                 destvar = dict_create_var (agr->dict, dest[i], 0);
558               }
559           
560             if (!destvar)
561               {
562                 msg (SE, _("Variable name %s is not unique within the "
563                            "aggregate file dictionary, which contains "
564                            "the aggregate variables and the break "
565                            "variables."),
566                      dest[i]);
567                 free (dest[i]);
568                 goto error;
569               }
570
571             free (dest[i]);
572             destvar->init = 0;
573             if (dest_label[i])
574               {
575                 destvar->label = dest_label[i];
576                 dest_label[i] = NULL;
577               }
578             else if (function->alpha_type == ALPHA)
579               destvar->print = destvar->write = function->format;
580
581             v->dest = destvar;
582           }
583           
584           v->include_missing = include_missing;
585
586           if (v->src != NULL)
587             {
588               int j;
589
590               if (v->src->type == NUMERIC)
591                 for (j = 0; j < function->n_args; j++)
592                   v->arg[j].f = arg[j].f;
593               else
594                 for (j = 0; j < function->n_args; j++)
595                   v->arg[j].c = xstrdup (arg[j].c);
596             }
597         }
598       
599       if (src != NULL && src[0]->type == ALPHA)
600         for (i = 0; i < function->n_args; i++)
601           {
602             free (arg[i].c);
603             arg[i].c = NULL;
604           }
605
606       free (src);
607       free (dest);
608       free (dest_label);
609
610       if (!lex_match ('/'))
611         {
612           if (token == '.')
613             return 1;
614
615           lex_error ("expecting end of command");
616           return 0;
617         }
618       continue;
619       
620     error:
621       for (i = 0; i < n_dest; i++)
622         {
623           free (dest[i]);
624           free (dest_label[i]);
625         }
626       free (dest);
627       free (dest_label);
628       free (arg[0].c);
629       free (arg[1].c);
630       if (src && n_src && src[0]->type == ALPHA)
631         for (i = 0; i < function->n_args; i++)
632           {
633             free (arg[i].c);
634             arg[i].c = NULL;
635           }
636       free (src);
637         
638       return 0;
639     }
640 }
641
642 /* Destroys AGR. */
643 static void
644 agr_destroy (struct agr_proc *agr)
645 {
646   struct agr_var *iter, *next;
647
648   sfm_close_writer (agr->writer);
649   if (agr->sort != NULL)
650     sort_destroy_criteria (agr->sort);
651   free (agr->break_vars);
652   free (agr->prev_break);
653   for (iter = agr->agr_vars; iter; iter = next)
654     {
655       next = iter->next;
656
657       if (iter->function & FSTRING)
658         {
659           int n_args;
660           int i;
661
662           n_args = agr_func_tab[iter->function & FUNC].n_args;
663           for (i = 0; i < n_args; i++)
664             free (iter->arg[i].c);
665           free (iter->string);
666         }
667       else if (iter->function == SD)
668         moments1_destroy (iter->moments);
669       free (iter);
670     }
671   if (agr->dict != NULL)
672     dict_destroy (agr->dict);
673   case_destroy (&agr->agr_case);
674 }
675 \f
676 /* Execution. */
677
678 static void accumulate_aggregate_info (struct agr_proc *,
679                                        const struct ccase *);
680 static void dump_aggregate_info (struct agr_proc *, struct ccase *);
681
682 /* Processes a single case INPUT for aggregation.  If output is
683    warranted, writes it to OUTPUT and returns nonzero.
684    Otherwise, returns zero and OUTPUT is unmodified. */
685 static int
686 aggregate_single_case (struct agr_proc *agr,
687                        const struct ccase *input, struct ccase *output)
688 {
689   /* The first case always begins a new break group.  We also need to
690      preserve the values of the case for later comparison. */
691   if (agr->case_cnt++ == 0)
692     {
693       int n_elem = 0;
694       
695       {
696         int i;
697
698         for (i = 0; i < agr->break_var_cnt; i++)
699           n_elem += agr->break_vars[i]->nv;
700       }
701       
702       agr->prev_break = xmalloc (sizeof *agr->prev_break * n_elem);
703
704       /* Copy INPUT into prev_break. */
705       {
706         union value *iter = agr->prev_break;
707         int i;
708
709         for (i = 0; i < agr->break_var_cnt; i++)
710           {
711             struct variable *v = agr->break_vars[i];
712             
713             if (v->type == NUMERIC)
714               (iter++)->f = case_num (input, v->fv);
715             else
716               {
717                 memcpy (iter->s, case_str (input, v->fv), v->width);
718                 iter += v->nv;
719               }
720           }
721       }
722             
723       accumulate_aggregate_info (agr, input);
724         
725       return 0;
726     }
727       
728   /* Compare the value of each break variable to the values on the
729      previous case. */
730   {
731     union value *iter = agr->prev_break;
732     int i;
733     
734     for (i = 0; i < agr->break_var_cnt; i++)
735       {
736         struct variable *v = agr->break_vars[i];
737       
738         switch (v->type)
739           {
740           case NUMERIC:
741             if (case_num (input, v->fv) != iter->f)
742               goto not_equal;
743             iter++;
744             break;
745           case ALPHA:
746             if (memcmp (case_str (input, v->fv), iter->s, v->width))
747               goto not_equal;
748             iter += v->nv;
749             break;
750           default:
751             assert (0);
752           }
753       }
754   }
755
756   accumulate_aggregate_info (agr, input);
757
758   return 0;
759   
760 not_equal:
761   /* The values of the break variable are different from the values on
762      the previous case.  That means that it's time to dump aggregate
763      info. */
764   dump_aggregate_info (agr, output);
765   initialize_aggregate_info (agr);
766   accumulate_aggregate_info (agr, input);
767
768   /* Copy INPUT into prev_break. */
769   {
770     union value *iter = agr->prev_break;
771     int i;
772
773     for (i = 0; i < agr->break_var_cnt; i++)
774       {
775         struct variable *v = agr->break_vars[i];
776             
777         if (v->type == NUMERIC)
778           (iter++)->f = case_num (input, v->fv);
779         else
780           {
781             memcpy (iter->s, case_str (input, v->fv), v->width);
782             iter += v->nv;
783           }
784       }
785   }
786   
787   return 1;
788 }
789
790 /* Accumulates aggregation data from the case INPUT. */
791 static void 
792 accumulate_aggregate_info (struct agr_proc *agr,
793                            const struct ccase *input)
794 {
795   struct agr_var *iter;
796   double weight;
797   int bad_warn = 1;
798
799   weight = dict_get_case_weight (default_dict, input, &bad_warn);
800
801   for (iter = agr->agr_vars; iter; iter = iter->next)
802     if (iter->src)
803       {
804         const union value *v = case_data (input, iter->src->fv);
805
806         if ((!iter->include_missing && is_missing (v, iter->src))
807             || (iter->include_missing && iter->src->type == NUMERIC
808                 && v->f == SYSMIS))
809           {
810             switch (iter->function)
811               {
812               case NMISS:
813                 iter->dbl[0] += weight;
814                 break;
815               case NUMISS:
816                 iter->int1++;
817                 break;
818               }
819             iter->missing = 1;
820             continue;
821           }
822         
823         /* This is horrible.  There are too many possibilities. */
824         switch (iter->function)
825           {
826           case SUM:
827             iter->dbl[0] += v->f;
828             break;
829           case MEAN:
830             iter->dbl[0] += v->f * weight;
831             iter->dbl[1] += weight;
832             break;
833           case SD:
834             moments1_add (iter->moments, v->f, weight);
835             break;
836           case MAX:
837             iter->dbl[0] = max (iter->dbl[0], v->f);
838             iter->int1 = 1;
839             break;
840           case MAX | FSTRING:
841             if (memcmp (iter->string, v->s, iter->src->width) < 0)
842               memcpy (iter->string, v->s, iter->src->width);
843             iter->int1 = 1;
844             break;
845           case MIN:
846             iter->dbl[0] = min (iter->dbl[0], v->f);
847             iter->int1 = 1;
848             break;
849           case MIN | FSTRING:
850             if (memcmp (iter->string, v->s, iter->src->width) > 0)
851               memcpy (iter->string, v->s, iter->src->width);
852             iter->int1 = 1;
853             break;
854           case FGT:
855           case PGT:
856             if (v->f > iter->arg[0].f)
857               iter->dbl[0] += weight;
858             iter->dbl[1] += weight;
859             break;
860           case FGT | FSTRING:
861           case PGT | FSTRING:
862             if (memcmp (iter->arg[0].c, v->s, iter->src->width) < 0)
863               iter->dbl[0] += weight;
864             iter->dbl[1] += weight;
865             break;
866           case FLT:
867           case PLT:
868             if (v->f < iter->arg[0].f)
869               iter->dbl[0] += weight;
870             iter->dbl[1] += weight;
871             break;
872           case FLT | FSTRING:
873           case PLT | FSTRING:
874             if (memcmp (iter->arg[0].c, v->s, iter->src->width) > 0)
875               iter->dbl[0] += weight;
876             iter->dbl[1] += weight;
877             break;
878           case FIN:
879           case PIN:
880             if (iter->arg[0].f <= v->f && v->f <= iter->arg[1].f)
881               iter->dbl[0] += weight;
882             iter->dbl[1] += weight;
883             break;
884           case FIN | FSTRING:
885           case PIN | FSTRING:
886             if (memcmp (iter->arg[0].c, v->s, iter->src->width) <= 0
887                 && memcmp (iter->arg[1].c, v->s, iter->src->width) >= 0)
888               iter->dbl[0] += weight;
889             iter->dbl[1] += weight;
890             break;
891           case FOUT:
892           case POUT:
893             if (iter->arg[0].f > v->f || v->f > iter->arg[1].f)
894               iter->dbl[0] += weight;
895             iter->dbl[1] += weight;
896             break;
897           case FOUT | FSTRING:
898           case POUT | FSTRING:
899             if (memcmp (iter->arg[0].c, v->s, iter->src->width) > 0
900                 && memcmp (iter->arg[1].c, v->s, iter->src->width) < 0)
901               iter->dbl[0] += weight;
902             iter->dbl[1] += weight;
903             break;
904           case N:
905             iter->dbl[0] += weight;
906             break;
907           case NU:
908             iter->int1++;
909             break;
910           case FIRST:
911             if (iter->int1 == 0)
912               {
913                 iter->dbl[0] = v->f;
914                 iter->int1 = 1;
915               }
916             break;
917           case FIRST | FSTRING:
918             if (iter->int1 == 0)
919               {
920                 memcpy (iter->string, v->s, iter->src->width);
921                 iter->int1 = 1;
922               }
923             break;
924           case LAST:
925             iter->dbl[0] = v->f;
926             iter->int1 = 1;
927             break;
928           case LAST | FSTRING:
929             memcpy (iter->string, v->s, iter->src->width);
930             iter->int1 = 1;
931             break;
932           default:
933             assert (0);
934           }
935     } else {
936       switch (iter->function)
937         {
938         case N_NO_VARS:
939           iter->dbl[0] += weight;
940           break;
941         case NU_NO_VARS:
942           iter->int1++;
943           break;
944         default:
945           assert (0);
946         }
947     }
948 }
949
950 /* We've come to a record that differs from the previous in one or
951    more of the break variables.  Make an output record from the
952    accumulated statistics in the OUTPUT case. */
953 static void 
954 dump_aggregate_info (struct agr_proc *agr, struct ccase *output)
955 {
956   {
957     int value_idx = 0;
958     int i;
959
960     for (i = 0; i < agr->break_var_cnt; i++) 
961       {
962         int nv = agr->break_vars[i]->nv;
963         memcpy (case_data_rw (output, value_idx),
964                 &agr->prev_break[value_idx],
965                 sizeof (union value) * nv);
966         value_idx += nv; 
967       }
968   }
969   
970   {
971     struct agr_var *i;
972   
973     for (i = agr->agr_vars; i; i = i->next)
974       {
975         union value *v = case_data_rw (output, i->dest->fv);
976
977         if (agr->missing == COLUMNWISE && i->missing != 0
978             && (i->function & FUNC) != N && (i->function & FUNC) != NU
979             && (i->function & FUNC) != NMISS && (i->function & FUNC) != NUMISS)
980           {
981             if (i->function & FSTRING)
982               memset (v->s, ' ', i->dest->width);
983             else
984               v->f = SYSMIS;
985             continue;
986           }
987         
988         switch (i->function)
989           {
990           case SUM:
991             v->f = i->dbl[0];
992             break;
993           case MEAN:
994             v->f = i->dbl[1] != 0.0 ? i->dbl[0] / i->dbl[1] : SYSMIS;
995             break;
996           case SD:
997             {
998               double variance;
999
1000               /* FIXME: we should use two passes. */
1001               moments1_calculate (i->moments, NULL, NULL, &variance,
1002                                  NULL, NULL);
1003               if (variance != SYSMIS)
1004                 v->f = sqrt (variance);
1005               else
1006                 v->f = SYSMIS; 
1007             }
1008             break;
1009           case MAX:
1010           case MIN:
1011             v->f = i->int1 ? i->dbl[0] : SYSMIS;
1012             break;
1013           case MAX | FSTRING:
1014           case MIN | FSTRING:
1015             if (i->int1)
1016               memcpy (v->s, i->string, i->dest->width);
1017             else
1018               memset (v->s, ' ', i->dest->width);
1019             break;
1020           case FGT | FSTRING:
1021           case FLT | FSTRING:
1022           case FIN | FSTRING:
1023           case FOUT | FSTRING:
1024             v->f = i->int2 ? (double) i->int1 / (double) i->int2 : SYSMIS;
1025             break;
1026           case FGT:
1027           case FLT:
1028           case FIN:
1029           case FOUT:
1030             v->f = i->dbl[1] ? i->dbl[0] / i->dbl[1] : SYSMIS;
1031             break;
1032           case PGT:
1033           case PGT | FSTRING:
1034           case PLT:
1035           case PLT | FSTRING:
1036           case PIN:
1037           case PIN | FSTRING:
1038           case POUT:
1039           case POUT | FSTRING:
1040             v->f = i->dbl[1] ? i->dbl[0] / i->dbl[1] * 100.0 : SYSMIS;
1041             break;
1042           case N:
1043             v->f = i->dbl[0];
1044             break;
1045           case NU:
1046             v->f = i->int1;
1047             break;
1048           case FIRST:
1049           case LAST:
1050             v->f = i->int1 ? i->dbl[0] : SYSMIS;
1051             break;
1052           case FIRST | FSTRING:
1053           case LAST | FSTRING:
1054             if (i->int1)
1055               memcpy (v->s, i->string, i->dest->width);
1056             else
1057               memset (v->s, ' ', i->dest->width);
1058             break;
1059           case N_NO_VARS:
1060             v->f = i->dbl[0];
1061             break;
1062           case NU_NO_VARS:
1063             v->f = i->int1;
1064             break;
1065           case NMISS:
1066             v->f = i->dbl[0];
1067             break;
1068           case NUMISS:
1069             v->f = i->int1;
1070             break;
1071           default:
1072             assert (0);
1073           }
1074       }
1075   }
1076 }
1077
1078 /* Resets the state for all the aggregate functions. */
1079 static void
1080 initialize_aggregate_info (struct agr_proc *agr)
1081 {
1082   struct agr_var *iter;
1083
1084   for (iter = agr->agr_vars; iter; iter = iter->next)
1085     {
1086       iter->missing = 0;
1087       switch (iter->function)
1088         {
1089         case MIN:
1090           iter->dbl[0] = DBL_MAX;
1091           break;
1092         case MIN | FSTRING:
1093           memset (iter->string, 255, iter->src->width);
1094           break;
1095         case MAX:
1096           iter->dbl[0] = -DBL_MAX;
1097           break;
1098         case MAX | FSTRING:
1099           memset (iter->string, 0, iter->src->width);
1100           break;
1101         case SD:
1102           if (iter->moments == NULL)
1103             iter->moments = moments1_create (MOMENT_VARIANCE);
1104           else
1105             moments1_clear (iter->moments);
1106           break;
1107         default:
1108           iter->dbl[0] = iter->dbl[1] = iter->dbl[2] = 0.0;
1109           iter->int1 = iter->int2 = 0;
1110           break;
1111         }
1112     }
1113 }
1114 \f
1115 /* Aggregate each case as it comes through.  Cases which aren't needed
1116    are dropped. */
1117 static int
1118 agr_to_active_file (struct ccase *c, void *agr_)
1119 {
1120   struct agr_proc *agr = agr_;
1121
1122   if (aggregate_single_case (agr, c, &agr->agr_case)) 
1123     agr->sink->class->write (agr->sink, &agr->agr_case);
1124
1125   return 1;
1126 }
1127
1128 /* Aggregate the current case and output it if we passed a
1129    breakpoint. */
1130 static int
1131 presorted_agr_to_sysfile (struct ccase *c, void *agr_) 
1132 {
1133   struct agr_proc *agr = agr_;
1134
1135   if (aggregate_single_case (agr, c, &agr->agr_case)) 
1136     sfm_write_case (agr->writer, &agr->agr_case);
1137
1138   return 1;
1139 }