7f34309c5354da7cdcc31f4c1d7ccb12ecead7a5
[pspp-builds.git] / src / aggregate.c
1 /* PSPP - computes sample statistics.
2    Copyright (C) 1997-9, 2000 Free Software Foundation, Inc.
3    Written by Ben Pfaff <blp@gnu.org>.
4
5    This program is free software; you can redistribute it and/or
6    modify it under the terms of the GNU General Public License as
7    published by the Free Software Foundation; either version 2 of the
8    License, or (at your option) any later version.
9
10    This program is distributed in the hope that it will be useful, but
11    WITHOUT ANY WARRANTY; without even the implied warranty of
12    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13    General Public License for more details.
14
15    You should have received a copy of the GNU General Public License
16    along with this program; if not, write to the Free Software
17    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
18    02110-1301, USA. */
19
20 #include <config.h>
21 #include "error.h"
22 #include <stdlib.h>
23 #include "alloc.h"
24 #include "case.h"
25 #include "casefile.h"
26 #include "command.h"
27 #include "dictionary.h"
28 #include "error.h"
29 #include "file-handle.h"
30 #include "lexer.h"
31 #include "misc.h"
32 #include "moments.h"
33 #include "pool.h"
34 #include "settings.h"
35 #include "sfm-write.h"
36 #include "sort.h"
37 #include "str.h"
38 #include "var.h"
39 #include "vfm.h"
40 #include "vfmP.h"
41
42 /* Specifies how to make an aggregate variable. */
43 struct agr_var
44   {
45     struct agr_var *next;               /* Next in list. */
46
47     /* Collected during parsing. */
48     struct variable *src;       /* Source variable. */
49     struct variable *dest;      /* Target variable. */
50     int function;               /* Function. */
51     int include_missing;        /* 1=Include user-missing values. */
52     union value arg[2];         /* Arguments. */
53
54     /* Accumulated during AGGREGATE execution. */
55     double dbl[3];
56     int int1, int2;
57     char *string;
58     int missing;
59     struct moments1 *moments;
60   };
61
62 /* Aggregation functions. */
63 enum
64   {
65     NONE, SUM, MEAN, SD, MAX, MIN, PGT, PLT, PIN, POUT, FGT, FLT, FIN,
66     FOUT, N, NU, NMISS, NUMISS, FIRST, LAST,
67     N_AGR_FUNCS, N_NO_VARS, NU_NO_VARS,
68     FUNC = 0x1f, /* Function mask. */
69     FSTRING = 1<<5, /* String function bit. */
70   };
71
72 /* Attributes of an aggregation function. */
73 struct agr_func
74   {
75     const char *name;           /* Aggregation function name. */
76     int n_args;                 /* Number of arguments. */
77     int alpha_type;             /* When given ALPHA arguments, output type. */
78     struct fmt_spec format;     /* Format spec if alpha_type != ALPHA. */
79   };
80
81 /* Attributes of aggregation functions. */
82 static const struct agr_func agr_func_tab[] = 
83   {
84     {"<NONE>",  0, -1,      {0, 0, 0}},
85     {"SUM",     0, -1,      {FMT_F, 8, 2}},
86     {"MEAN",    0, -1,      {FMT_F, 8, 2}},
87     {"SD",      0, -1,      {FMT_F, 8, 2}},
88     {"MAX",     0, ALPHA,   {-1, -1, -1}}, 
89     {"MIN",     0, ALPHA,   {-1, -1, -1}}, 
90     {"PGT",     1, NUMERIC, {FMT_F, 5, 1}},      
91     {"PLT",     1, NUMERIC, {FMT_F, 5, 1}},       
92     {"PIN",     2, NUMERIC, {FMT_F, 5, 1}},       
93     {"POUT",    2, NUMERIC, {FMT_F, 5, 1}},       
94     {"FGT",     1, NUMERIC, {FMT_F, 5, 3}},       
95     {"FLT",     1, NUMERIC, {FMT_F, 5, 3}},       
96     {"FIN",     2, NUMERIC, {FMT_F, 5, 3}},       
97     {"FOUT",    2, NUMERIC, {FMT_F, 5, 3}},       
98     {"N",       0, NUMERIC, {FMT_F, 7, 0}},       
99     {"NU",      0, NUMERIC, {FMT_F, 7, 0}},       
100     {"NMISS",   0, NUMERIC, {FMT_F, 7, 0}},       
101     {"NUMISS",  0, NUMERIC, {FMT_F, 7, 0}},       
102     {"FIRST",   0, ALPHA,   {-1, -1, -1}}, 
103     {"LAST",    0, ALPHA,   {-1, -1, -1}},
104     {NULL,      0, -1,      {-1, -1, -1}},
105     {"N",       0, NUMERIC, {FMT_F, 7, 0}},
106     {"NU",      0, NUMERIC, {FMT_F, 7, 0}},
107   };
108
109 /* Missing value types. */
110 enum missing_treatment
111   {
112     ITEMWISE,           /* Missing values item by item. */
113     COLUMNWISE          /* Missing values column by column. */
114   };
115
116 /* An entire AGGREGATE procedure. */
117 struct agr_proc 
118   {
119     /* We have either an output file or a sink. */
120     struct sfm_writer *writer;          /* Output file, or null if none. */
121     struct case_sink *sink;             /* Sink, or null if none. */
122
123     /* Break variables. */
124     struct sort_criteria *sort;         /* Sort criteria. */
125     struct variable **break_vars;       /* Break variables. */
126     size_t break_var_cnt;               /* Number of break variables. */
127     struct ccase break_case;            /* Last values of break variables. */
128
129     enum missing_treatment missing;     /* How to treat missing values. */
130     struct agr_var *agr_vars;           /* First aggregate variable. */
131     struct dictionary *dict;            /* Aggregate dictionary. */
132     int case_cnt;                       /* Counts aggregated cases. */
133     struct ccase agr_case;              /* Aggregate case for output. */
134   };
135
136 static void initialize_aggregate_info (struct agr_proc *,
137                                        const struct ccase *);
138
139 /* Prototypes. */
140 static int parse_aggregate_functions (struct agr_proc *);
141 static void agr_destroy (struct agr_proc *);
142 static int aggregate_single_case (struct agr_proc *agr,
143                                   const struct ccase *input,
144                                   struct ccase *output);
145 static void dump_aggregate_info (struct agr_proc *agr, struct ccase *output);
146
147 /* Aggregating to the active file. */
148 static int agr_to_active_file (struct ccase *, void *aux);
149
150 /* Aggregating to a system file. */
151 static int presorted_agr_to_sysfile (struct ccase *, void *aux);
152 \f
153 /* Parsing. */
154
155 /* Parses and executes the AGGREGATE procedure. */
156 int
157 cmd_aggregate (void)
158 {
159   struct agr_proc agr;
160   struct file_handle *out_file = NULL;
161
162   bool copy_documents = false;
163   bool presorted = false;
164   bool saw_direction;
165
166   memset(&agr, 0 , sizeof (agr));
167   agr.missing = ITEMWISE;
168   case_nullify (&agr.break_case);
169   
170   agr.dict = dict_create ();
171   dict_set_label (agr.dict, dict_get_label (default_dict));
172   dict_set_documents (agr.dict, dict_get_documents (default_dict));
173
174   /* OUTFILE subcommand must be first. */
175   if (!lex_force_match_id ("OUTFILE"))
176     goto error;
177   lex_match ('=');
178   if (!lex_match ('*'))
179     {
180       out_file = fh_parse ();
181       if (out_file == NULL)
182         goto error;
183     }
184   
185   /* Read most of the subcommands. */
186   for (;;)
187     {
188       lex_match ('/');
189       
190       if (lex_match_id ("MISSING"))
191         {
192           lex_match ('=');
193           if (!lex_match_id ("COLUMNWISE"))
194             {
195               lex_error (_("while expecting COLUMNWISE"));
196               goto error;
197             }
198           agr.missing = COLUMNWISE;
199         }
200       else if (lex_match_id ("DOCUMENT"))
201         copy_documents = true;
202       else if (lex_match_id ("PRESORTED"))
203         presorted = true;
204       else if (lex_match_id ("BREAK"))
205         {
206           int i;
207
208           lex_match ('=');
209           agr.sort = sort_parse_criteria (default_dict,
210                                           &agr.break_vars, &agr.break_var_cnt,
211                                           &saw_direction);
212           if (agr.sort == NULL)
213             goto error;
214           
215           for (i = 0; i < agr.break_var_cnt; i++)
216             dict_clone_var_assert (agr.dict, agr.break_vars[i],
217                                    agr.break_vars[i]->name);
218
219           /* BREAK must follow the options. */
220           break;
221         }
222       else
223         {
224           lex_error (_("expecting BREAK"));
225           goto error;
226         }
227     }
228   if (presorted && saw_direction)
229     msg (SW, _("When PRESORTED is specified, specifying sorting directions "
230                "with (A) or (D) has no effect.  Output data will be sorted "
231                "the same way as the input data."));
232       
233   /* Read in the aggregate functions. */
234   lex_match ('/');
235   if (!parse_aggregate_functions (&agr))
236     goto error;
237
238   /* Delete documents. */
239   if (!copy_documents)
240     dict_set_documents (agr.dict, NULL);
241
242   /* Cancel SPLIT FILE. */
243   dict_set_split_vars (agr.dict, NULL, 0);
244   
245   /* Initialize. */
246   agr.case_cnt = 0;
247   case_create (&agr.agr_case, dict_get_next_value_idx (agr.dict));
248
249   /* Output to active file or external file? */
250   if (out_file == NULL) 
251     {
252       /* The active file will be replaced by the aggregated data,
253          so TEMPORARY is moot. */
254       cancel_temporary ();
255
256       if (agr.sort != NULL && !presorted)
257         sort_active_file_in_place (agr.sort);
258
259       agr.sink = create_case_sink (&storage_sink_class, agr.dict, NULL);
260       if (agr.sink->class->open != NULL)
261         agr.sink->class->open (agr.sink);
262       vfm_sink = create_case_sink (&null_sink_class, default_dict, NULL);
263       procedure (agr_to_active_file, &agr);
264       if (agr.case_cnt > 0) 
265         {
266           dump_aggregate_info (&agr, &agr.agr_case);
267           agr.sink->class->write (agr.sink, &agr.agr_case);
268         }
269       dict_destroy (default_dict);
270       default_dict = agr.dict;
271       agr.dict = NULL;
272       vfm_source = agr.sink->class->make_source (agr.sink);
273       free_case_sink (agr.sink);
274     }
275   else
276     {
277       agr.writer = sfm_open_writer (out_file, agr.dict, get_scompression (), 0);
278       if (agr.writer == NULL)
279         goto error;
280       
281       if (agr.sort != NULL && !presorted) 
282         {
283           /* Sorting is needed. */
284           struct casefile *dst;
285           struct casereader *reader;
286           struct ccase c;
287           
288           dst = sort_active_file_to_casefile (agr.sort);
289           if (dst == NULL)
290             goto error;
291           reader = casefile_get_destructive_reader (dst);
292           while (casereader_read_xfer (reader, &c)) 
293             {
294               if (aggregate_single_case (&agr, &c, &agr.agr_case)) 
295                 sfm_write_case (agr.writer, &agr.agr_case);
296               case_destroy (&c);
297             }
298           casereader_destroy (reader);
299           casefile_destroy (dst);
300         }
301       else 
302         {
303           /* Active file is already sorted. */
304           procedure (presorted_agr_to_sysfile, &agr);
305         }
306       
307       if (agr.case_cnt > 0) 
308         {
309           dump_aggregate_info (&agr, &agr.agr_case);
310           sfm_write_case (agr.writer, &agr.agr_case);
311         }
312     }
313   
314   agr_destroy (&agr);
315   return CMD_SUCCESS;
316
317 error:
318   agr_destroy (&agr);
319   return CMD_FAILURE;
320 }
321
322 /* Parse all the aggregate functions. */
323 static int
324 parse_aggregate_functions (struct agr_proc *agr)
325 {
326   struct agr_var *tail; /* Tail of linked list starting at agr->vars. */
327
328   /* Parse everything. */
329   tail = NULL;
330   for (;;)
331     {
332       char **dest;
333       char **dest_label;
334       int n_dest;
335
336       int include_missing;
337       const struct agr_func *function;
338       int func_index;
339
340       union value arg[2];
341
342       struct variable **src;
343       int n_src;
344
345       int i;
346
347       dest = NULL;
348       dest_label = NULL;
349       n_dest = 0;
350       src = NULL;
351       function = NULL;
352       n_src = 0;
353       arg[0].c = NULL;
354       arg[1].c = NULL;
355
356       /* Parse the list of target variables. */
357       while (!lex_match ('='))
358         {
359           int n_dest_prev = n_dest;
360           
361           if (!parse_DATA_LIST_vars (&dest, &n_dest,
362                                      PV_APPEND | PV_SINGLE | PV_NO_SCRATCH))
363             goto error;
364
365           /* Assign empty labels. */
366           {
367             int j;
368
369             dest_label = xrealloc (dest_label, sizeof *dest_label * n_dest);
370             for (j = n_dest_prev; j < n_dest; j++)
371               dest_label[j] = NULL;
372           }
373           
374           if (token == T_STRING)
375             {
376               ds_truncate (&tokstr, 255);
377               dest_label[n_dest - 1] = xstrdup (ds_c_str (&tokstr));
378               lex_get ();
379             }
380         }
381
382       /* Get the name of the aggregation function. */
383       if (token != T_ID)
384         {
385           lex_error (_("expecting aggregation function"));
386           goto error;
387         }
388
389       include_missing = 0;
390       if (tokid[strlen (tokid) - 1] == '.')
391         {
392           include_missing = 1;
393           tokid[strlen (tokid) - 1] = 0;
394         }
395       
396       for (function = agr_func_tab; function->name; function++)
397         if (!strcasecmp (function->name, tokid))
398           break;
399       if (NULL == function->name)
400         {
401           msg (SE, _("Unknown aggregation function %s."), tokid);
402           goto error;
403         }
404       func_index = function - agr_func_tab;
405       lex_get ();
406
407       /* Check for leading lparen. */
408       if (!lex_match ('('))
409         {
410           if (func_index == N)
411             func_index = N_NO_VARS;
412           else if (func_index == NU)
413             func_index = NU_NO_VARS;
414           else
415             {
416               lex_error (_("expecting `('"));
417               goto error;
418             }
419         }
420       else
421         {
422           /* Parse list of source variables. */
423           {
424             int pv_opts = PV_NO_SCRATCH;
425
426             if (func_index == SUM || func_index == MEAN || func_index == SD)
427               pv_opts |= PV_NUMERIC;
428             else if (function->n_args)
429               pv_opts |= PV_SAME_TYPE;
430
431             if (!parse_variables (default_dict, &src, &n_src, pv_opts))
432               goto error;
433           }
434
435           /* Parse function arguments, for those functions that
436              require arguments. */
437           if (function->n_args != 0)
438             for (i = 0; i < function->n_args; i++)
439               {
440                 int type;
441             
442                 lex_match (',');
443                 if (token == T_STRING)
444                   {
445                     arg[i].c = xstrdup (ds_c_str (&tokstr));
446                     type = ALPHA;
447                   }
448                 else if (lex_is_number ())
449                   {
450                     arg[i].f = tokval;
451                     type = NUMERIC;
452                   } else {
453                     msg (SE, _("Missing argument %d to %s."), i + 1,
454                          function->name);
455                     goto error;
456                   }
457             
458                 lex_get ();
459
460                 if (type != src[0]->type)
461                   {
462                     msg (SE, _("Arguments to %s must be of same type as "
463                                "source variables."),
464                          function->name);
465                     goto error;
466                   }
467               }
468
469           /* Trailing rparen. */
470           if (!lex_match(')'))
471             {
472               lex_error (_("expecting `)'"));
473               goto error;
474             }
475           
476           /* Now check that the number of source variables match
477              the number of target variables.  If we check earlier
478              than this, the user can get very misleading error
479              message, i.e. `AGGREGATE x=SUM(y t).' will get this
480              error message when a proper message would be more
481              like `unknown variable t'. */
482           if (n_src != n_dest)
483             {
484               msg (SE, _("Number of source variables (%d) does not match "
485                          "number of target variables (%d)."),
486                    n_src, n_dest);
487               goto error;
488             }
489
490           if ((func_index == PIN || func_index == POUT
491               || func_index == FIN || func_index == FOUT) 
492               && ((src[0]->type == NUMERIC && arg[0].f > arg[1].f)
493                   || (src[0]->type == ALPHA
494                       && str_compare_rpad (arg[0].c, arg[1].c) > 0)))
495             {
496               union value t = arg[0];
497               arg[0] = arg[1];
498               arg[1] = t;
499                   
500               msg (SW, _("The value arguments passed to the %s function "
501                          "are out-of-order.  They will be treated as if "
502                          "they had been specified in the correct order."),
503                    function->name);
504             }
505         }
506         
507       /* Finally add these to the linked list of aggregation
508          variables. */
509       for (i = 0; i < n_dest; i++)
510         {
511           struct agr_var *v = xmalloc (sizeof *v);
512
513           /* Add variable to chain. */
514           if (agr->agr_vars != NULL)
515             tail->next = v;
516           else
517             agr->agr_vars = v;
518           tail = v;
519           tail->next = NULL;
520           v->moments = NULL;
521           
522           /* Create the target variable in the aggregate
523              dictionary. */
524           {
525             static const struct fmt_spec f8_2 = {FMT_F, 8, 2};
526             struct variable *destvar;
527             
528             v->function = func_index;
529
530             if (src)
531               {
532                 v->src = src[i];
533                 
534                 if (src[i]->type == ALPHA)
535                   {
536                     v->function |= FSTRING;
537                     v->string = xmalloc (src[i]->width);
538                   }
539
540                 if (function->alpha_type == ALPHA)
541                   destvar = dict_clone_var (agr->dict, v->src, dest[i]);
542                 else
543                   {
544                     assert (v->src->type == NUMERIC
545                             || function->alpha_type == NUMERIC);
546                     destvar = dict_create_var (agr->dict, dest[i], 0);
547                     if (destvar != NULL) 
548                       {
549                         if ((func_index == N || func_index == NMISS)
550                             && dict_get_weight (default_dict) != NULL)
551                           destvar->print = destvar->write = f8_2; 
552                         else
553                           destvar->print = destvar->write = function->format;
554                       }
555                   }
556               } else {
557                 v->src = NULL;
558                 destvar = dict_create_var (agr->dict, dest[i], 0);
559                 if (func_index == N_NO_VARS
560                     && dict_get_weight (default_dict) != NULL)
561                   destvar->print = destvar->write = f8_2; 
562                 else
563                   destvar->print = destvar->write = function->format;
564               }
565           
566             if (!destvar)
567               {
568                 msg (SE, _("Variable name %s is not unique within the "
569                            "aggregate file dictionary, which contains "
570                            "the aggregate variables and the break "
571                            "variables."),
572                      dest[i]);
573                 goto error;
574               }
575
576             free (dest[i]);
577             destvar->init = 0;
578             if (dest_label[i])
579               {
580                 destvar->label = dest_label[i];
581                 dest_label[i] = NULL;
582               }
583
584             v->dest = destvar;
585           }
586           
587           v->include_missing = include_missing;
588
589           if (v->src != NULL)
590             {
591               int j;
592
593               if (v->src->type == NUMERIC)
594                 for (j = 0; j < function->n_args; j++)
595                   v->arg[j].f = arg[j].f;
596               else
597                 for (j = 0; j < function->n_args; j++)
598                   v->arg[j].c = xstrdup (arg[j].c);
599             }
600         }
601       
602       if (src != NULL && src[0]->type == ALPHA)
603         for (i = 0; i < function->n_args; i++)
604           {
605             free (arg[i].c);
606             arg[i].c = NULL;
607           }
608
609       free (src);
610       free (dest);
611       free (dest_label);
612
613       if (!lex_match ('/'))
614         {
615           if (token == '.')
616             return 1;
617
618           lex_error ("expecting end of command");
619           return 0;
620         }
621       continue;
622       
623     error:
624       for (i = 0; i < n_dest; i++)
625         {
626           free (dest[i]);
627           free (dest_label[i]);
628         }
629       free (dest);
630       free (dest_label);
631       free (arg[0].c);
632       free (arg[1].c);
633       if (src && n_src && src[0]->type == ALPHA)
634         for (i = 0; i < function->n_args; i++)
635           {
636             free (arg[i].c);
637             arg[i].c = NULL;
638           }
639       free (src);
640         
641       return 0;
642     }
643 }
644
645 /* Destroys AGR. */
646 static void
647 agr_destroy (struct agr_proc *agr)
648 {
649   struct agr_var *iter, *next;
650
651   sfm_close_writer (agr->writer);
652   if (agr->sort != NULL)
653     sort_destroy_criteria (agr->sort);
654   free (agr->break_vars);
655   case_destroy (&agr->break_case);
656   for (iter = agr->agr_vars; iter; iter = next)
657     {
658       next = iter->next;
659
660       if (iter->function & FSTRING)
661         {
662           int n_args;
663           int i;
664
665           n_args = agr_func_tab[iter->function & FUNC].n_args;
666           for (i = 0; i < n_args; i++)
667             free (iter->arg[i].c);
668           free (iter->string);
669         }
670       else if (iter->function == SD)
671         moments1_destroy (iter->moments);
672       free (iter);
673     }
674   if (agr->dict != NULL)
675     dict_destroy (agr->dict);
676
677   case_destroy (&agr->agr_case);
678 }
679 \f
680 /* Execution. */
681
682 static void accumulate_aggregate_info (struct agr_proc *,
683                                        const struct ccase *);
684 static void dump_aggregate_info (struct agr_proc *, struct ccase *);
685
686 /* Processes a single case INPUT for aggregation.  If output is
687    warranted, writes it to OUTPUT and returns nonzero.
688    Otherwise, returns zero and OUTPUT is unmodified. */
689 static int
690 aggregate_single_case (struct agr_proc *agr,
691                        const struct ccase *input, struct ccase *output)
692 {
693   bool finished_group = false;
694   
695   if (agr->case_cnt++ == 0)
696     initialize_aggregate_info (agr, input);
697   else if (case_compare (&agr->break_case, input,
698                          agr->break_vars, agr->break_var_cnt))
699     {
700       dump_aggregate_info (agr, output);
701       finished_group = true;
702
703       initialize_aggregate_info (agr, input);
704     }
705
706   accumulate_aggregate_info (agr, input);
707   return finished_group;
708 }
709
710 /* Accumulates aggregation data from the case INPUT. */
711 static void 
712 accumulate_aggregate_info (struct agr_proc *agr,
713                            const struct ccase *input)
714 {
715   struct agr_var *iter;
716   double weight;
717   int bad_warn = 1;
718
719   weight = dict_get_case_weight (default_dict, input, &bad_warn);
720
721   for (iter = agr->agr_vars; iter; iter = iter->next)
722     if (iter->src)
723       {
724         const union value *v = case_data (input, iter->src->fv);
725
726         if ((!iter->include_missing && is_missing (v, iter->src))
727             || (iter->include_missing && iter->src->type == NUMERIC
728                 && v->f == SYSMIS))
729           {
730             switch (iter->function)
731               {
732               case NMISS:
733               case NMISS | FSTRING:
734                 iter->dbl[0] += weight;
735                 break;
736               case NUMISS:
737               case NUMISS | FSTRING:
738                 iter->int1++;
739                 break;
740               }
741             iter->missing = 1;
742             continue;
743           }
744         
745         /* This is horrible.  There are too many possibilities. */
746         switch (iter->function)
747           {
748           case SUM:
749             iter->dbl[0] += v->f * weight;
750             iter->int1 = 1;
751             break;
752           case MEAN:
753             iter->dbl[0] += v->f * weight;
754             iter->dbl[1] += weight;
755             break;
756           case SD:
757             moments1_add (iter->moments, v->f, weight);
758             break;
759           case MAX:
760             iter->dbl[0] = max (iter->dbl[0], v->f);
761             iter->int1 = 1;
762             break;
763           case MAX | FSTRING:
764             if (memcmp (iter->string, v->s, iter->src->width) < 0)
765               memcpy (iter->string, v->s, iter->src->width);
766             iter->int1 = 1;
767             break;
768           case MIN:
769             iter->dbl[0] = min (iter->dbl[0], v->f);
770             iter->int1 = 1;
771             break;
772           case MIN | FSTRING:
773             if (memcmp (iter->string, v->s, iter->src->width) > 0)
774               memcpy (iter->string, v->s, iter->src->width);
775             iter->int1 = 1;
776             break;
777           case FGT:
778           case PGT:
779             if (v->f > iter->arg[0].f)
780               iter->dbl[0] += weight;
781             iter->dbl[1] += weight;
782             break;
783           case FGT | FSTRING:
784           case PGT | FSTRING:
785             if (memcmp (iter->arg[0].c, v->s, iter->src->width) < 0)
786               iter->dbl[0] += weight;
787             iter->dbl[1] += weight;
788             break;
789           case FLT:
790           case PLT:
791             if (v->f < iter->arg[0].f)
792               iter->dbl[0] += weight;
793             iter->dbl[1] += weight;
794             break;
795           case FLT | FSTRING:
796           case PLT | FSTRING:
797             if (memcmp (iter->arg[0].c, v->s, iter->src->width) > 0)
798               iter->dbl[0] += weight;
799             iter->dbl[1] += weight;
800             break;
801           case FIN:
802           case PIN:
803             if (iter->arg[0].f <= v->f && v->f <= iter->arg[1].f)
804               iter->dbl[0] += weight;
805             iter->dbl[1] += weight;
806             break;
807           case FIN | FSTRING:
808           case PIN | FSTRING:
809             if (memcmp (iter->arg[0].c, v->s, iter->src->width) <= 0
810                 && memcmp (iter->arg[1].c, v->s, iter->src->width) >= 0)
811               iter->dbl[0] += weight;
812             iter->dbl[1] += weight;
813             break;
814           case FOUT:
815           case POUT:
816             if (iter->arg[0].f > v->f || v->f > iter->arg[1].f)
817               iter->dbl[0] += weight;
818             iter->dbl[1] += weight;
819             break;
820           case FOUT | FSTRING:
821           case POUT | FSTRING:
822             if (memcmp (iter->arg[0].c, v->s, iter->src->width) > 0
823                 || memcmp (iter->arg[1].c, v->s, iter->src->width) < 0)
824               iter->dbl[0] += weight;
825             iter->dbl[1] += weight;
826             break;
827           case N:
828           case N | FSTRING:
829             iter->dbl[0] += weight;
830             break;
831           case NU:
832           case NU | FSTRING:
833             iter->int1++;
834             break;
835           case FIRST:
836             if (iter->int1 == 0)
837               {
838                 iter->dbl[0] = v->f;
839                 iter->int1 = 1;
840               }
841             break;
842           case FIRST | FSTRING:
843             if (iter->int1 == 0)
844               {
845                 memcpy (iter->string, v->s, iter->src->width);
846                 iter->int1 = 1;
847               }
848             break;
849           case LAST:
850             iter->dbl[0] = v->f;
851             iter->int1 = 1;
852             break;
853           case LAST | FSTRING:
854             memcpy (iter->string, v->s, iter->src->width);
855             iter->int1 = 1;
856             break;
857           case NMISS:
858           case NMISS | FSTRING:
859           case NUMISS:
860           case NUMISS | FSTRING:
861             /* Our value is not missing or it would have been
862                caught earlier.  Nothing to do. */
863             break;
864           default:
865             assert (0);
866           }
867     } else {
868       switch (iter->function)
869         {
870         case N_NO_VARS:
871           iter->dbl[0] += weight;
872           break;
873         case NU_NO_VARS:
874           iter->int1++;
875           break;
876         default:
877           assert (0);
878         }
879     }
880 }
881
882 /* We've come to a record that differs from the previous in one or
883    more of the break variables.  Make an output record from the
884    accumulated statistics in the OUTPUT case. */
885 static void 
886 dump_aggregate_info (struct agr_proc *agr, struct ccase *output)
887 {
888   {
889     int value_idx = 0;
890     int i;
891
892     for (i = 0; i < agr->break_var_cnt; i++) 
893       {
894         struct variable *v = agr->break_vars[i];
895         memcpy (case_data_rw (output, value_idx),
896                 case_data (&agr->break_case, v->fv),
897                 sizeof (union value) * v->nv);
898         value_idx += v->nv; 
899       }
900   }
901   
902   {
903     struct agr_var *i;
904   
905     for (i = agr->agr_vars; i; i = i->next)
906       {
907         union value *v = case_data_rw (output, i->dest->fv);
908
909         if (agr->missing == COLUMNWISE && i->missing != 0
910             && (i->function & FUNC) != N && (i->function & FUNC) != NU
911             && (i->function & FUNC) != NMISS && (i->function & FUNC) != NUMISS)
912           {
913             if (i->dest->type == ALPHA)
914               memset (v->s, ' ', i->dest->width);
915             else
916               v->f = SYSMIS;
917             continue;
918           }
919         
920         switch (i->function)
921           {
922           case SUM:
923             v->f = i->int1 ? i->dbl[0] : SYSMIS;
924             break;
925           case MEAN:
926             v->f = i->dbl[1] != 0.0 ? i->dbl[0] / i->dbl[1] : SYSMIS;
927             break;
928           case SD:
929             {
930               double variance;
931
932               /* FIXME: we should use two passes. */
933               moments1_calculate (i->moments, NULL, NULL, &variance,
934                                  NULL, NULL);
935               if (variance != SYSMIS)
936                 v->f = sqrt (variance);
937               else
938                 v->f = SYSMIS; 
939             }
940             break;
941           case MAX:
942           case MIN:
943             v->f = i->int1 ? i->dbl[0] : SYSMIS;
944             break;
945           case MAX | FSTRING:
946           case MIN | FSTRING:
947             if (i->int1)
948               memcpy (v->s, i->string, i->dest->width);
949             else
950               memset (v->s, ' ', i->dest->width);
951             break;
952           case FGT:
953           case FGT | FSTRING:
954           case FLT:
955           case FLT | FSTRING:
956           case FIN:
957           case FIN | FSTRING:
958           case FOUT:
959           case FOUT | FSTRING:
960             v->f = i->dbl[1] ? i->dbl[0] / i->dbl[1] : SYSMIS;
961             break;
962           case PGT:
963           case PGT | FSTRING:
964           case PLT:
965           case PLT | FSTRING:
966           case PIN:
967           case PIN | FSTRING:
968           case POUT:
969           case POUT | FSTRING:
970             v->f = i->dbl[1] ? i->dbl[0] / i->dbl[1] * 100.0 : SYSMIS;
971             break;
972           case N:
973           case N | FSTRING:
974             v->f = i->dbl[0];
975             break;
976           case NU:
977           case NU | FSTRING:
978             v->f = i->int1;
979             break;
980           case FIRST:
981           case LAST:
982             v->f = i->int1 ? i->dbl[0] : SYSMIS;
983             break;
984           case FIRST | FSTRING:
985           case LAST | FSTRING:
986             if (i->int1)
987               memcpy (v->s, i->string, i->dest->width);
988             else
989               memset (v->s, ' ', i->dest->width);
990             break;
991           case N_NO_VARS:
992             v->f = i->dbl[0];
993             break;
994           case NU_NO_VARS:
995             v->f = i->int1;
996             break;
997           case NMISS:
998           case NMISS | FSTRING:
999             v->f = i->dbl[0];
1000             break;
1001           case NUMISS:
1002           case NUMISS | FSTRING:
1003             v->f = i->int1;
1004             break;
1005           default:
1006             assert (0);
1007           }
1008       }
1009   }
1010 }
1011
1012 /* Resets the state for all the aggregate functions. */
1013 static void
1014 initialize_aggregate_info (struct agr_proc *agr, const struct ccase *input)
1015 {
1016   struct agr_var *iter;
1017
1018   case_destroy (&agr->break_case);
1019   case_clone (&agr->break_case, input);
1020
1021   for (iter = agr->agr_vars; iter; iter = iter->next)
1022     {
1023       iter->missing = 0;
1024       iter->dbl[0] = iter->dbl[1] = iter->dbl[2] = 0.0;
1025       iter->int1 = iter->int2 = 0;
1026       switch (iter->function)
1027         {
1028         case MIN:
1029           iter->dbl[0] = DBL_MAX;
1030           break;
1031         case MIN | FSTRING:
1032           memset (iter->string, 255, iter->src->width);
1033           break;
1034         case MAX:
1035           iter->dbl[0] = -DBL_MAX;
1036           break;
1037         case MAX | FSTRING:
1038           memset (iter->string, 0, iter->src->width);
1039           break;
1040         case SD:
1041           if (iter->moments == NULL)
1042             iter->moments = moments1_create (MOMENT_VARIANCE);
1043           else
1044             moments1_clear (iter->moments);
1045           break;
1046         default:
1047           break;
1048         }
1049     }
1050 }
1051 \f
1052 /* Aggregate each case as it comes through.  Cases which aren't needed
1053    are dropped. */
1054 static int
1055 agr_to_active_file (struct ccase *c, void *agr_)
1056 {
1057   struct agr_proc *agr = agr_;
1058
1059   if (aggregate_single_case (agr, c, &agr->agr_case)) 
1060     agr->sink->class->write (agr->sink, &agr->agr_case);
1061
1062   return 1;
1063 }
1064
1065 /* Aggregate the current case and output it if we passed a
1066    breakpoint. */
1067 static int
1068 presorted_agr_to_sysfile (struct ccase *c, void *agr_) 
1069 {
1070   struct agr_proc *agr = agr_;
1071
1072   if (aggregate_single_case (agr, c, &agr->agr_case)) 
1073     sfm_write_case (agr->writer, &agr->agr_case);
1074
1075   return 1;
1076 }