8fe168ecf78175581c92de4d50f4c3ba6bb0b144
[pspp-builds.git] / src / aggregate.c
1 /* PSPP - computes sample statistics.
2    Copyright (C) 1997-9, 2000 Free Software Foundation, Inc.
3    Written by Ben Pfaff <blp@gnu.org>.
4
5    This program is free software; you can redistribute it and/or
6    modify it under the terms of the GNU General Public License as
7    published by the Free Software Foundation; either version 2 of the
8    License, or (at your option) any later version.
9
10    This program is distributed in the hope that it will be useful, but
11    WITHOUT ANY WARRANTY; without even the implied warranty of
12    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13    General Public License for more details.
14
15    You should have received a copy of the GNU General Public License
16    along with this program; if not, write to the Free Software
17    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
18    02110-1301, USA. */
19
20 #include <config.h>
21 #include "error.h"
22 #include <stdlib.h>
23 #include "alloc.h"
24 #include "case.h"
25 #include "casefile.h"
26 #include "command.h"
27 #include "dictionary.h"
28 #include "error.h"
29 #include "file-handle.h"
30 #include "lexer.h"
31 #include "misc.h"
32 #include "moments.h"
33 #include "pool.h"
34 #include "settings.h"
35 #include "sfm-write.h"
36 #include "sort-prs.h"
37 #include "sort.h"
38 #include "str.h"
39 #include "var.h"
40 #include "vfm.h"
41 #include "vfmP.h"
42
43 /* Specifies how to make an aggregate variable. */
44 struct agr_var
45   {
46     struct agr_var *next;               /* Next in list. */
47
48     /* Collected during parsing. */
49     struct variable *src;       /* Source variable. */
50     struct variable *dest;      /* Target variable. */
51     int function;               /* Function. */
52     int include_missing;        /* 1=Include user-missing values. */
53     union value arg[2];         /* Arguments. */
54
55     /* Accumulated during AGGREGATE execution. */
56     double dbl[3];
57     int int1, int2;
58     char *string;
59     int missing;
60     struct moments1 *moments;
61   };
62
63 /* Aggregation functions. */
64 enum
65   {
66     NONE, SUM, MEAN, SD, MAX, MIN, PGT, PLT, PIN, POUT, FGT, FLT, FIN,
67     FOUT, N, NU, NMISS, NUMISS, FIRST, LAST,
68     N_AGR_FUNCS, N_NO_VARS, NU_NO_VARS,
69     FUNC = 0x1f, /* Function mask. */
70     FSTRING = 1<<5, /* String function bit. */
71   };
72
73 /* Attributes of an aggregation function. */
74 struct agr_func
75   {
76     const char *name;           /* Aggregation function name. */
77     int n_args;                 /* Number of arguments. */
78     int alpha_type;             /* When given ALPHA arguments, output type. */
79     struct fmt_spec format;     /* Format spec if alpha_type != ALPHA. */
80   };
81
82 /* Attributes of aggregation functions. */
83 static const struct agr_func agr_func_tab[] = 
84   {
85     {"<NONE>",  0, -1,      {0, 0, 0}},
86     {"SUM",     0, -1,      {FMT_F, 8, 2}},
87     {"MEAN",    0, -1,      {FMT_F, 8, 2}},
88     {"SD",      0, -1,      {FMT_F, 8, 2}},
89     {"MAX",     0, ALPHA,   {-1, -1, -1}}, 
90     {"MIN",     0, ALPHA,   {-1, -1, -1}}, 
91     {"PGT",     1, NUMERIC, {FMT_F, 5, 1}},      
92     {"PLT",     1, NUMERIC, {FMT_F, 5, 1}},       
93     {"PIN",     2, NUMERIC, {FMT_F, 5, 1}},       
94     {"POUT",    2, NUMERIC, {FMT_F, 5, 1}},       
95     {"FGT",     1, NUMERIC, {FMT_F, 5, 3}},       
96     {"FLT",     1, NUMERIC, {FMT_F, 5, 3}},       
97     {"FIN",     2, NUMERIC, {FMT_F, 5, 3}},       
98     {"FOUT",    2, NUMERIC, {FMT_F, 5, 3}},       
99     {"N",       0, NUMERIC, {FMT_F, 7, 0}},       
100     {"NU",      0, NUMERIC, {FMT_F, 7, 0}},       
101     {"NMISS",   0, NUMERIC, {FMT_F, 7, 0}},       
102     {"NUMISS",  0, NUMERIC, {FMT_F, 7, 0}},       
103     {"FIRST",   0, ALPHA,   {-1, -1, -1}}, 
104     {"LAST",    0, ALPHA,   {-1, -1, -1}},
105     {NULL,      0, -1,      {-1, -1, -1}},
106     {"N",       0, NUMERIC, {FMT_F, 7, 0}},
107     {"NU",      0, NUMERIC, {FMT_F, 7, 0}},
108   };
109
110 /* Missing value types. */
111 enum missing_treatment
112   {
113     ITEMWISE,           /* Missing values item by item. */
114     COLUMNWISE          /* Missing values column by column. */
115   };
116
117 /* An entire AGGREGATE procedure. */
118 struct agr_proc 
119   {
120     /* We have either an output file or a sink. */
121     struct sfm_writer *writer;          /* Output file, or null if none. */
122     struct case_sink *sink;             /* Sink, or null if none. */
123
124     /* Break variables. */
125     struct sort_criteria *sort;         /* Sort criteria. */
126     struct variable **break_vars;       /* Break variables. */
127     size_t break_var_cnt;               /* Number of break variables. */
128     struct ccase break_case;            /* Last values of break variables. */
129
130     enum missing_treatment missing;     /* How to treat missing values. */
131     struct agr_var *agr_vars;           /* First aggregate variable. */
132     struct dictionary *dict;            /* Aggregate dictionary. */
133     int case_cnt;                       /* Counts aggregated cases. */
134     struct ccase agr_case;              /* Aggregate case for output. */
135   };
136
137 static void initialize_aggregate_info (struct agr_proc *,
138                                        const struct ccase *);
139
140 /* Prototypes. */
141 static int parse_aggregate_functions (struct agr_proc *);
142 static void agr_destroy (struct agr_proc *);
143 static int aggregate_single_case (struct agr_proc *agr,
144                                   const struct ccase *input,
145                                   struct ccase *output);
146 static void dump_aggregate_info (struct agr_proc *agr, struct ccase *output);
147
148 /* Aggregating to the active file. */
149 static int agr_to_active_file (struct ccase *, void *aux);
150
151 /* Aggregating to a system file. */
152 static int presorted_agr_to_sysfile (struct ccase *, void *aux);
153 \f
154 /* Parsing. */
155
156 /* Parses and executes the AGGREGATE procedure. */
157 int
158 cmd_aggregate (void)
159 {
160   struct agr_proc agr;
161   struct file_handle *out_file = NULL;
162
163   bool copy_documents = false;
164   bool presorted = false;
165   bool saw_direction;
166
167   memset(&agr, 0 , sizeof (agr));
168   agr.missing = ITEMWISE;
169   case_nullify (&agr.break_case);
170   
171   agr.dict = dict_create ();
172   dict_set_label (agr.dict, dict_get_label (default_dict));
173   dict_set_documents (agr.dict, dict_get_documents (default_dict));
174
175   /* OUTFILE subcommand must be first. */
176   if (!lex_force_match_id ("OUTFILE"))
177     goto error;
178   lex_match ('=');
179   if (!lex_match ('*'))
180     {
181       out_file = fh_parse ();
182       if (out_file == NULL)
183         goto error;
184     }
185   
186   /* Read most of the subcommands. */
187   for (;;)
188     {
189       lex_match ('/');
190       
191       if (lex_match_id ("MISSING"))
192         {
193           lex_match ('=');
194           if (!lex_match_id ("COLUMNWISE"))
195             {
196               lex_error (_("while expecting COLUMNWISE"));
197               goto error;
198             }
199           agr.missing = COLUMNWISE;
200         }
201       else if (lex_match_id ("DOCUMENT"))
202         copy_documents = true;
203       else if (lex_match_id ("PRESORTED"))
204         presorted = true;
205       else if (lex_match_id ("BREAK"))
206         {
207           int i;
208
209           lex_match ('=');
210           agr.sort = sort_parse_criteria (default_dict,
211                                           &agr.break_vars, &agr.break_var_cnt,
212                                           &saw_direction, NULL);
213           if (agr.sort == NULL)
214             goto error;
215           
216           for (i = 0; i < agr.break_var_cnt; i++)
217             dict_clone_var_assert (agr.dict, agr.break_vars[i],
218                                    agr.break_vars[i]->name);
219
220           /* BREAK must follow the options. */
221           break;
222         }
223       else
224         {
225           lex_error (_("expecting BREAK"));
226           goto error;
227         }
228     }
229   if (presorted && saw_direction)
230     msg (SW, _("When PRESORTED is specified, specifying sorting directions "
231                "with (A) or (D) has no effect.  Output data will be sorted "
232                "the same way as the input data."));
233       
234   /* Read in the aggregate functions. */
235   lex_match ('/');
236   if (!parse_aggregate_functions (&agr))
237     goto error;
238
239   /* Delete documents. */
240   if (!copy_documents)
241     dict_set_documents (agr.dict, NULL);
242
243   /* Cancel SPLIT FILE. */
244   dict_set_split_vars (agr.dict, NULL, 0);
245   
246   /* Initialize. */
247   agr.case_cnt = 0;
248   case_create (&agr.agr_case, dict_get_next_value_idx (agr.dict));
249
250   /* Output to active file or external file? */
251   if (out_file == NULL) 
252     {
253       /* The active file will be replaced by the aggregated data,
254          so TEMPORARY is moot. */
255       cancel_temporary ();
256
257       if (agr.sort != NULL && !presorted)
258         sort_active_file_in_place (agr.sort);
259
260       agr.sink = create_case_sink (&storage_sink_class, agr.dict, NULL);
261       if (agr.sink->class->open != NULL)
262         agr.sink->class->open (agr.sink);
263       vfm_sink = create_case_sink (&null_sink_class, default_dict, NULL);
264       procedure (agr_to_active_file, &agr);
265       if (agr.case_cnt > 0) 
266         {
267           dump_aggregate_info (&agr, &agr.agr_case);
268           agr.sink->class->write (agr.sink, &agr.agr_case);
269         }
270       dict_destroy (default_dict);
271       default_dict = agr.dict;
272       agr.dict = NULL;
273       vfm_source = agr.sink->class->make_source (agr.sink);
274       free_case_sink (agr.sink);
275     }
276   else
277     {
278       agr.writer = sfm_open_writer (out_file, agr.dict, get_scompression (), 0);
279       if (agr.writer == NULL)
280         goto error;
281       
282       if (agr.sort != NULL && !presorted) 
283         {
284           /* Sorting is needed. */
285           struct casefile *dst;
286           struct casereader *reader;
287           struct ccase c;
288           
289           dst = sort_active_file_to_casefile (agr.sort);
290           if (dst == NULL)
291             goto error;
292           reader = casefile_get_destructive_reader (dst);
293           while (casereader_read_xfer (reader, &c)) 
294             {
295               if (aggregate_single_case (&agr, &c, &agr.agr_case)) 
296                 sfm_write_case (agr.writer, &agr.agr_case);
297               case_destroy (&c);
298             }
299           casereader_destroy (reader);
300           casefile_destroy (dst);
301         }
302       else 
303         {
304           /* Active file is already sorted. */
305           procedure (presorted_agr_to_sysfile, &agr);
306         }
307       
308       if (agr.case_cnt > 0) 
309         {
310           dump_aggregate_info (&agr, &agr.agr_case);
311           sfm_write_case (agr.writer, &agr.agr_case);
312         }
313     }
314   
315   agr_destroy (&agr);
316   return CMD_SUCCESS;
317
318 error:
319   agr_destroy (&agr);
320   return CMD_FAILURE;
321 }
322
323 /* Parse all the aggregate functions. */
324 static int
325 parse_aggregate_functions (struct agr_proc *agr)
326 {
327   struct agr_var *tail; /* Tail of linked list starting at agr->vars. */
328
329   /* Parse everything. */
330   tail = NULL;
331   for (;;)
332     {
333       char **dest;
334       char **dest_label;
335       int n_dest;
336
337       int include_missing;
338       const struct agr_func *function;
339       int func_index;
340
341       union value arg[2];
342
343       struct variable **src;
344       int n_src;
345
346       int i;
347
348       dest = NULL;
349       dest_label = NULL;
350       n_dest = 0;
351       src = NULL;
352       function = NULL;
353       n_src = 0;
354       arg[0].c = NULL;
355       arg[1].c = NULL;
356
357       /* Parse the list of target variables. */
358       while (!lex_match ('='))
359         {
360           int n_dest_prev = n_dest;
361           
362           if (!parse_DATA_LIST_vars (&dest, &n_dest,
363                                      PV_APPEND | PV_SINGLE | PV_NO_SCRATCH))
364             goto error;
365
366           /* Assign empty labels. */
367           {
368             int j;
369
370             dest_label = xrealloc (dest_label, sizeof *dest_label * n_dest);
371             for (j = n_dest_prev; j < n_dest; j++)
372               dest_label[j] = NULL;
373           }
374           
375           if (token == T_STRING)
376             {
377               ds_truncate (&tokstr, 255);
378               dest_label[n_dest - 1] = xstrdup (ds_c_str (&tokstr));
379               lex_get ();
380             }
381         }
382
383       /* Get the name of the aggregation function. */
384       if (token != T_ID)
385         {
386           lex_error (_("expecting aggregation function"));
387           goto error;
388         }
389
390       include_missing = 0;
391       if (tokid[strlen (tokid) - 1] == '.')
392         {
393           include_missing = 1;
394           tokid[strlen (tokid) - 1] = 0;
395         }
396       
397       for (function = agr_func_tab; function->name; function++)
398         if (!strcasecmp (function->name, tokid))
399           break;
400       if (NULL == function->name)
401         {
402           msg (SE, _("Unknown aggregation function %s."), tokid);
403           goto error;
404         }
405       func_index = function - agr_func_tab;
406       lex_get ();
407
408       /* Check for leading lparen. */
409       if (!lex_match ('('))
410         {
411           if (func_index == N)
412             func_index = N_NO_VARS;
413           else if (func_index == NU)
414             func_index = NU_NO_VARS;
415           else
416             {
417               lex_error (_("expecting `('"));
418               goto error;
419             }
420         }
421       else
422         {
423           /* Parse list of source variables. */
424           {
425             int pv_opts = PV_NO_SCRATCH;
426
427             if (func_index == SUM || func_index == MEAN || func_index == SD)
428               pv_opts |= PV_NUMERIC;
429             else if (function->n_args)
430               pv_opts |= PV_SAME_TYPE;
431
432             if (!parse_variables (default_dict, &src, &n_src, pv_opts))
433               goto error;
434           }
435
436           /* Parse function arguments, for those functions that
437              require arguments. */
438           if (function->n_args != 0)
439             for (i = 0; i < function->n_args; i++)
440               {
441                 int type;
442             
443                 lex_match (',');
444                 if (token == T_STRING)
445                   {
446                     arg[i].c = xstrdup (ds_c_str (&tokstr));
447                     type = ALPHA;
448                   }
449                 else if (lex_is_number ())
450                   {
451                     arg[i].f = tokval;
452                     type = NUMERIC;
453                   } else {
454                     msg (SE, _("Missing argument %d to %s."), i + 1,
455                          function->name);
456                     goto error;
457                   }
458             
459                 lex_get ();
460
461                 if (type != src[0]->type)
462                   {
463                     msg (SE, _("Arguments to %s must be of same type as "
464                                "source variables."),
465                          function->name);
466                     goto error;
467                   }
468               }
469
470           /* Trailing rparen. */
471           if (!lex_match(')'))
472             {
473               lex_error (_("expecting `)'"));
474               goto error;
475             }
476           
477           /* Now check that the number of source variables match
478              the number of target variables.  If we check earlier
479              than this, the user can get very misleading error
480              message, i.e. `AGGREGATE x=SUM(y t).' will get this
481              error message when a proper message would be more
482              like `unknown variable t'. */
483           if (n_src != n_dest)
484             {
485               msg (SE, _("Number of source variables (%d) does not match "
486                          "number of target variables (%d)."),
487                    n_src, n_dest);
488               goto error;
489             }
490
491           if ((func_index == PIN || func_index == POUT
492               || func_index == FIN || func_index == FOUT) 
493               && ((src[0]->type == NUMERIC && arg[0].f > arg[1].f)
494                   || (src[0]->type == ALPHA
495                       && str_compare_rpad (arg[0].c, arg[1].c) > 0)))
496             {
497               union value t = arg[0];
498               arg[0] = arg[1];
499               arg[1] = t;
500                   
501               msg (SW, _("The value arguments passed to the %s function "
502                          "are out-of-order.  They will be treated as if "
503                          "they had been specified in the correct order."),
504                    function->name);
505             }
506         }
507         
508       /* Finally add these to the linked list of aggregation
509          variables. */
510       for (i = 0; i < n_dest; i++)
511         {
512           struct agr_var *v = xmalloc (sizeof *v);
513
514           /* Add variable to chain. */
515           if (agr->agr_vars != NULL)
516             tail->next = v;
517           else
518             agr->agr_vars = v;
519           tail = v;
520           tail->next = NULL;
521           v->moments = NULL;
522           
523           /* Create the target variable in the aggregate
524              dictionary. */
525           {
526             struct variable *destvar;
527             
528             v->function = func_index;
529
530             if (src)
531               {
532                 v->src = src[i];
533                 
534                 if (src[i]->type == ALPHA)
535                   {
536                     v->function |= FSTRING;
537                     v->string = xmalloc (src[i]->width);
538                   }
539
540                 if (function->alpha_type == ALPHA)
541                   destvar = dict_clone_var (agr->dict, v->src, dest[i]);
542                 else
543                   {
544                     assert (v->src->type == NUMERIC
545                             || function->alpha_type == NUMERIC);
546                     destvar = dict_create_var (agr->dict, dest[i], 0);
547                     if (destvar != NULL) 
548                       {
549                         if ((func_index == N || func_index == NMISS)
550                             && dict_get_weight (default_dict) != NULL)
551                           destvar->print = destvar->write = f8_2; 
552                         else
553                           destvar->print = destvar->write = function->format;
554                       }
555                   }
556               } else {
557                 v->src = NULL;
558                 destvar = dict_create_var (agr->dict, dest[i], 0);
559                 if (func_index == N_NO_VARS
560                     && dict_get_weight (default_dict) != NULL)
561                   destvar->print = destvar->write = f8_2; 
562                 else
563                   destvar->print = destvar->write = function->format;
564               }
565           
566             if (!destvar)
567               {
568                 msg (SE, _("Variable name %s is not unique within the "
569                            "aggregate file dictionary, which contains "
570                            "the aggregate variables and the break "
571                            "variables."),
572                      dest[i]);
573                 goto error;
574               }
575
576             free (dest[i]);
577             destvar->init = 0;
578             if (dest_label[i])
579               {
580                 destvar->label = dest_label[i];
581                 dest_label[i] = NULL;
582               }
583
584             v->dest = destvar;
585           }
586           
587           v->include_missing = include_missing;
588
589           if (v->src != NULL)
590             {
591               int j;
592
593               if (v->src->type == NUMERIC)
594                 for (j = 0; j < function->n_args; j++)
595                   v->arg[j].f = arg[j].f;
596               else
597                 for (j = 0; j < function->n_args; j++)
598                   v->arg[j].c = xstrdup (arg[j].c);
599             }
600         }
601       
602       if (src != NULL && src[0]->type == ALPHA)
603         for (i = 0; i < function->n_args; i++)
604           {
605             free (arg[i].c);
606             arg[i].c = NULL;
607           }
608
609       free (src);
610       free (dest);
611       free (dest_label);
612
613       if (!lex_match ('/'))
614         {
615           if (token == '.')
616             return 1;
617
618           lex_error ("expecting end of command");
619           return 0;
620         }
621       continue;
622       
623     error:
624       for (i = 0; i < n_dest; i++)
625         {
626           free (dest[i]);
627           free (dest_label[i]);
628         }
629       free (dest);
630       free (dest_label);
631       free (arg[0].c);
632       free (arg[1].c);
633       if (src && n_src && src[0]->type == ALPHA)
634         for (i = 0; i < function->n_args; i++)
635           {
636             free (arg[i].c);
637             arg[i].c = NULL;
638           }
639       free (src);
640         
641       return 0;
642     }
643 }
644
645 /* Destroys AGR. */
646 static void
647 agr_destroy (struct agr_proc *agr)
648 {
649   struct agr_var *iter, *next;
650
651   sfm_close_writer (agr->writer);
652   if (agr->sort != NULL)
653     sort_destroy_criteria (agr->sort);
654   free (agr->break_vars);
655   case_destroy (&agr->break_case);
656   for (iter = agr->agr_vars; iter; iter = next)
657     {
658       next = iter->next;
659
660       if (iter->function & FSTRING)
661         {
662           int n_args;
663           int i;
664
665           n_args = agr_func_tab[iter->function & FUNC].n_args;
666           for (i = 0; i < n_args; i++)
667             free (iter->arg[i].c);
668           free (iter->string);
669         }
670       else if (iter->function == SD)
671         moments1_destroy (iter->moments);
672       free (iter);
673     }
674   if (agr->dict != NULL)
675     dict_destroy (agr->dict);
676
677   case_destroy (&agr->agr_case);
678 }
679 \f
680 /* Execution. */
681
682 static void accumulate_aggregate_info (struct agr_proc *,
683                                        const struct ccase *);
684 static void dump_aggregate_info (struct agr_proc *, struct ccase *);
685
686 /* Processes a single case INPUT for aggregation.  If output is
687    warranted, writes it to OUTPUT and returns nonzero.
688    Otherwise, returns zero and OUTPUT is unmodified. */
689 static int
690 aggregate_single_case (struct agr_proc *agr,
691                        const struct ccase *input, struct ccase *output)
692 {
693   bool finished_group = false;
694   
695   if (agr->case_cnt++ == 0)
696     initialize_aggregate_info (agr, input);
697   else if (case_compare (&agr->break_case, input,
698                          agr->break_vars, agr->break_var_cnt))
699     {
700       dump_aggregate_info (agr, output);
701       finished_group = true;
702
703       initialize_aggregate_info (agr, input);
704     }
705
706   accumulate_aggregate_info (agr, input);
707   return finished_group;
708 }
709
710 /* Accumulates aggregation data from the case INPUT. */
711 static void 
712 accumulate_aggregate_info (struct agr_proc *agr,
713                            const struct ccase *input)
714 {
715   struct agr_var *iter;
716   double weight;
717   int bad_warn = 1;
718
719   weight = dict_get_case_weight (default_dict, input, &bad_warn);
720
721   for (iter = agr->agr_vars; iter; iter = iter->next)
722     if (iter->src)
723       {
724         const union value *v = case_data (input, iter->src->fv);
725
726         if ((!iter->include_missing && is_missing (v, iter->src))
727             || (iter->include_missing && iter->src->type == NUMERIC
728                 && v->f == SYSMIS))
729           {
730             switch (iter->function)
731               {
732               case NMISS:
733               case NMISS | FSTRING:
734                 iter->dbl[0] += weight;
735                 break;
736               case NUMISS:
737               case NUMISS | FSTRING:
738                 iter->int1++;
739                 break;
740               }
741             iter->missing = 1;
742             continue;
743           }
744         
745         /* This is horrible.  There are too many possibilities. */
746         switch (iter->function)
747           {
748           case SUM:
749             iter->dbl[0] += v->f * weight;
750             iter->int1 = 1;
751             break;
752           case MEAN:
753             iter->dbl[0] += v->f * weight;
754             iter->dbl[1] += weight;
755             break;
756           case SD:
757             moments1_add (iter->moments, v->f, weight);
758             break;
759           case MAX:
760             iter->dbl[0] = max (iter->dbl[0], v->f);
761             iter->int1 = 1;
762             break;
763           case MAX | FSTRING:
764             if (memcmp (iter->string, v->s, iter->src->width) < 0)
765               memcpy (iter->string, v->s, iter->src->width);
766             iter->int1 = 1;
767             break;
768           case MIN:
769             iter->dbl[0] = min (iter->dbl[0], v->f);
770             iter->int1 = 1;
771             break;
772           case MIN | FSTRING:
773             if (memcmp (iter->string, v->s, iter->src->width) > 0)
774               memcpy (iter->string, v->s, iter->src->width);
775             iter->int1 = 1;
776             break;
777           case FGT:
778           case PGT:
779             if (v->f > iter->arg[0].f)
780               iter->dbl[0] += weight;
781             iter->dbl[1] += weight;
782             break;
783           case FGT | FSTRING:
784           case PGT | FSTRING:
785             if (memcmp (iter->arg[0].c, v->s, iter->src->width) < 0)
786               iter->dbl[0] += weight;
787             iter->dbl[1] += weight;
788             break;
789           case FLT:
790           case PLT:
791             if (v->f < iter->arg[0].f)
792               iter->dbl[0] += weight;
793             iter->dbl[1] += weight;
794             break;
795           case FLT | FSTRING:
796           case PLT | FSTRING:
797             if (memcmp (iter->arg[0].c, v->s, iter->src->width) > 0)
798               iter->dbl[0] += weight;
799             iter->dbl[1] += weight;
800             break;
801           case FIN:
802           case PIN:
803             if (iter->arg[0].f <= v->f && v->f <= iter->arg[1].f)
804               iter->dbl[0] += weight;
805             iter->dbl[1] += weight;
806             break;
807           case FIN | FSTRING:
808           case PIN | FSTRING:
809             if (memcmp (iter->arg[0].c, v->s, iter->src->width) <= 0
810                 && memcmp (iter->arg[1].c, v->s, iter->src->width) >= 0)
811               iter->dbl[0] += weight;
812             iter->dbl[1] += weight;
813             break;
814           case FOUT:
815           case POUT:
816             if (iter->arg[0].f > v->f || v->f > iter->arg[1].f)
817               iter->dbl[0] += weight;
818             iter->dbl[1] += weight;
819             break;
820           case FOUT | FSTRING:
821           case POUT | FSTRING:
822             if (memcmp (iter->arg[0].c, v->s, iter->src->width) > 0
823                 || memcmp (iter->arg[1].c, v->s, iter->src->width) < 0)
824               iter->dbl[0] += weight;
825             iter->dbl[1] += weight;
826             break;
827           case N:
828           case N | FSTRING:
829             iter->dbl[0] += weight;
830             break;
831           case NU:
832           case NU | FSTRING:
833             iter->int1++;
834             break;
835           case FIRST:
836             if (iter->int1 == 0)
837               {
838                 iter->dbl[0] = v->f;
839                 iter->int1 = 1;
840               }
841             break;
842           case FIRST | FSTRING:
843             if (iter->int1 == 0)
844               {
845                 memcpy (iter->string, v->s, iter->src->width);
846                 iter->int1 = 1;
847               }
848             break;
849           case LAST:
850             iter->dbl[0] = v->f;
851             iter->int1 = 1;
852             break;
853           case LAST | FSTRING:
854             memcpy (iter->string, v->s, iter->src->width);
855             iter->int1 = 1;
856             break;
857           case NMISS:
858           case NMISS | FSTRING:
859           case NUMISS:
860           case NUMISS | FSTRING:
861             /* Our value is not missing or it would have been
862                caught earlier.  Nothing to do. */
863             break;
864           default:
865             assert (0);
866           }
867     } else {
868       switch (iter->function)
869         {
870         case N_NO_VARS:
871           iter->dbl[0] += weight;
872           break;
873         case NU_NO_VARS:
874           iter->int1++;
875           break;
876         default:
877           assert (0);
878         }
879     }
880 }
881
882 /* We've come to a record that differs from the previous in one or
883    more of the break variables.  Make an output record from the
884    accumulated statistics in the OUTPUT case. */
885 static void 
886 dump_aggregate_info (struct agr_proc *agr, struct ccase *output)
887 {
888   {
889     int value_idx = 0;
890     int i;
891
892     for (i = 0; i < agr->break_var_cnt; i++) 
893       {
894         struct variable *v = agr->break_vars[i];
895         memcpy (case_data_rw (output, value_idx),
896                 case_data (&agr->break_case, v->fv),
897                 sizeof (union value) * v->nv);
898         value_idx += v->nv; 
899       }
900   }
901   
902   {
903     struct agr_var *i;
904   
905     for (i = agr->agr_vars; i; i = i->next)
906       {
907         union value *v = case_data_rw (output, i->dest->fv);
908
909         if (agr->missing == COLUMNWISE && i->missing != 0
910             && (i->function & FUNC) != N && (i->function & FUNC) != NU
911             && (i->function & FUNC) != NMISS && (i->function & FUNC) != NUMISS)
912           {
913             if (i->dest->type == ALPHA)
914               memset (v->s, ' ', i->dest->width);
915             else
916               v->f = SYSMIS;
917             continue;
918           }
919         
920         switch (i->function)
921           {
922           case SUM:
923             v->f = i->int1 ? i->dbl[0] : SYSMIS;
924             break;
925           case MEAN:
926             v->f = i->dbl[1] != 0.0 ? i->dbl[0] / i->dbl[1] : SYSMIS;
927             break;
928           case SD:
929             {
930               double variance;
931
932               /* FIXME: we should use two passes. */
933               moments1_calculate (i->moments, NULL, NULL, &variance,
934                                  NULL, NULL);
935               if (variance != SYSMIS)
936                 v->f = sqrt (variance);
937               else
938                 v->f = SYSMIS; 
939             }
940             break;
941           case MAX:
942           case MIN:
943             v->f = i->int1 ? i->dbl[0] : SYSMIS;
944             break;
945           case MAX | FSTRING:
946           case MIN | FSTRING:
947             if (i->int1)
948               memcpy (v->s, i->string, i->dest->width);
949             else
950               memset (v->s, ' ', i->dest->width);
951             break;
952           case FGT:
953           case FGT | FSTRING:
954           case FLT:
955           case FLT | FSTRING:
956           case FIN:
957           case FIN | FSTRING:
958           case FOUT:
959           case FOUT | FSTRING:
960             v->f = i->dbl[1] ? i->dbl[0] / i->dbl[1] : SYSMIS;
961             break;
962           case PGT:
963           case PGT | FSTRING:
964           case PLT:
965           case PLT | FSTRING:
966           case PIN:
967           case PIN | FSTRING:
968           case POUT:
969           case POUT | FSTRING:
970             v->f = i->dbl[1] ? i->dbl[0] / i->dbl[1] * 100.0 : SYSMIS;
971             break;
972           case N:
973           case N | FSTRING:
974             v->f = i->dbl[0];
975             break;
976           case NU:
977           case NU | FSTRING:
978             v->f = i->int1;
979             break;
980           case FIRST:
981           case LAST:
982             v->f = i->int1 ? i->dbl[0] : SYSMIS;
983             break;
984           case FIRST | FSTRING:
985           case LAST | FSTRING:
986             if (i->int1)
987               memcpy (v->s, i->string, i->dest->width);
988             else
989               memset (v->s, ' ', i->dest->width);
990             break;
991           case N_NO_VARS:
992             v->f = i->dbl[0];
993             break;
994           case NU_NO_VARS:
995             v->f = i->int1;
996             break;
997           case NMISS:
998           case NMISS | FSTRING:
999             v->f = i->dbl[0];
1000             break;
1001           case NUMISS:
1002           case NUMISS | FSTRING:
1003             v->f = i->int1;
1004             break;
1005           default:
1006             assert (0);
1007           }
1008       }
1009   }
1010 }
1011
1012 /* Resets the state for all the aggregate functions. */
1013 static void
1014 initialize_aggregate_info (struct agr_proc *agr, const struct ccase *input)
1015 {
1016   struct agr_var *iter;
1017
1018   case_destroy (&agr->break_case);
1019   case_clone (&agr->break_case, input);
1020
1021   for (iter = agr->agr_vars; iter; iter = iter->next)
1022     {
1023       iter->missing = 0;
1024       iter->dbl[0] = iter->dbl[1] = iter->dbl[2] = 0.0;
1025       iter->int1 = iter->int2 = 0;
1026       switch (iter->function)
1027         {
1028         case MIN:
1029           iter->dbl[0] = DBL_MAX;
1030           break;
1031         case MIN | FSTRING:
1032           memset (iter->string, 255, iter->src->width);
1033           break;
1034         case MAX:
1035           iter->dbl[0] = -DBL_MAX;
1036           break;
1037         case MAX | FSTRING:
1038           memset (iter->string, 0, iter->src->width);
1039           break;
1040         case SD:
1041           if (iter->moments == NULL)
1042             iter->moments = moments1_create (MOMENT_VARIANCE);
1043           else
1044             moments1_clear (iter->moments);
1045           break;
1046         default:
1047           break;
1048         }
1049     }
1050 }
1051 \f
1052 /* Aggregate each case as it comes through.  Cases which aren't needed
1053    are dropped. */
1054 static int
1055 agr_to_active_file (struct ccase *c, void *agr_)
1056 {
1057   struct agr_proc *agr = agr_;
1058
1059   if (aggregate_single_case (agr, c, &agr->agr_case)) 
1060     agr->sink->class->write (agr->sink, &agr->agr_case);
1061
1062   return 1;
1063 }
1064
1065 /* Aggregate the current case and output it if we passed a
1066    breakpoint. */
1067 static int
1068 presorted_agr_to_sysfile (struct ccase *c, void *agr_) 
1069 {
1070   struct agr_proc *agr = agr_;
1071
1072   if (aggregate_single_case (agr, c, &agr->agr_case)) 
1073     sfm_write_case (agr->writer, &agr->agr_case);
1074
1075   return 1;
1076 }