fb955330a4102e0aea3f13156573f4d9414a2438
[pspp-builds.git] / src / aggregate.c
1 /* PSPP - computes sample statistics.
2    Copyright (C) 1997-9, 2000 Free Software Foundation, Inc.
3    Written by Ben Pfaff <blp@gnu.org>.
4
5    This program is free software; you can redistribute it and/or
6    modify it under the terms of the GNU General Public License as
7    published by the Free Software Foundation; either version 2 of the
8    License, or (at your option) any later version.
9
10    This program is distributed in the hope that it will be useful, but
11    WITHOUT ANY WARRANTY; without even the implied warranty of
12    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13    General Public License for more details.
14
15    You should have received a copy of the GNU General Public License
16    along with this program; if not, write to the Free Software
17    Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA
18    02111-1307, USA. */
19
20 #include <config.h>
21 #include "error.h"
22 #include <stdlib.h>
23 #include "alloc.h"
24 #include "case.h"
25 #include "casefile.h"
26 #include "command.h"
27 #include "dictionary.h"
28 #include "error.h"
29 #include "file-handle.h"
30 #include "lexer.h"
31 #include "misc.h"
32 #include "moments.h"
33 #include "pool.h"
34 #include "settings.h"
35 #include "sfm-write.h"
36 #include "sort.h"
37 #include "str.h"
38 #include "var.h"
39 #include "vfm.h"
40 #include "vfmP.h"
41
42 /* Specifies how to make an aggregate variable. */
43 struct agr_var
44   {
45     struct agr_var *next;               /* Next in list. */
46
47     /* Collected during parsing. */
48     struct variable *src;       /* Source variable. */
49     struct variable *dest;      /* Target variable. */
50     int function;               /* Function. */
51     int include_missing;        /* 1=Include user-missing values. */
52     union value arg[2];         /* Arguments. */
53
54     /* Accumulated during AGGREGATE execution. */
55     double dbl[3];
56     int int1, int2;
57     char *string;
58     int missing;
59     struct moments1 *moments;
60   };
61
62 /* Aggregation functions. */
63 enum
64   {
65     NONE, SUM, MEAN, SD, MAX, MIN, PGT, PLT, PIN, POUT, FGT, FLT, FIN,
66     FOUT, N, NU, NMISS, NUMISS, FIRST, LAST,
67     N_AGR_FUNCS, N_NO_VARS, NU_NO_VARS,
68     FUNC = 0x1f, /* Function mask. */
69     FSTRING = 1<<5, /* String function bit. */
70   };
71
72 /* Attributes of an aggregation function. */
73 struct agr_func
74   {
75     const char *name;           /* Aggregation function name. */
76     int n_args;                 /* Number of arguments. */
77     int alpha_type;             /* When given ALPHA arguments, output type. */
78     struct fmt_spec format;     /* Format spec if alpha_type != ALPHA. */
79   };
80
81 /* Attributes of aggregation functions. */
82 static const struct agr_func agr_func_tab[] = 
83   {
84     {"<NONE>",  0, -1,      {0, 0, 0}},
85     {"SUM",     0, -1,      {FMT_F, 8, 2}},
86     {"MEAN",    0, -1,      {FMT_F, 8, 2}},
87     {"SD",      0, -1,      {FMT_F, 8, 2}},
88     {"MAX",     0, ALPHA,   {-1, -1, -1}}, 
89     {"MIN",     0, ALPHA,   {-1, -1, -1}}, 
90     {"PGT",     1, NUMERIC, {FMT_F, 5, 1}},      
91     {"PLT",     1, NUMERIC, {FMT_F, 5, 1}},       
92     {"PIN",     2, NUMERIC, {FMT_F, 5, 1}},       
93     {"POUT",    2, NUMERIC, {FMT_F, 5, 1}},       
94     {"FGT",     1, NUMERIC, {FMT_F, 5, 3}},       
95     {"FLT",     1, NUMERIC, {FMT_F, 5, 3}},       
96     {"FIN",     2, NUMERIC, {FMT_F, 5, 3}},       
97     {"FOUT",    2, NUMERIC, {FMT_F, 5, 3}},       
98     {"N",       0, NUMERIC, {FMT_F, 7, 0}},       
99     {"NU",      0, NUMERIC, {FMT_F, 7, 0}},       
100     {"NMISS",   0, NUMERIC, {FMT_F, 7, 0}},       
101     {"NUMISS",  0, NUMERIC, {FMT_F, 7, 0}},       
102     {"FIRST",   0, ALPHA,   {-1, -1, -1}}, 
103     {"LAST",    0, ALPHA,   {-1, -1, -1}},
104     {NULL,      0, -1,      {-1, -1, -1}},
105     {"N",       0, NUMERIC, {FMT_F, 7, 0}},
106     {"NU",      0, NUMERIC, {FMT_F, 7, 0}},
107   };
108
109 /* Missing value types. */
110 enum missing_treatment
111   {
112     ITEMWISE,           /* Missing values item by item. */
113     COLUMNWISE          /* Missing values column by column. */
114   };
115
116 /* An entire AGGREGATE procedure. */
117 struct agr_proc 
118   {
119     /* We have either an output file or a sink. */
120     struct sfm_writer *writer;          /* Output file, or null if none. */
121     struct case_sink *sink;             /* Sink, or null if none. */
122
123     /* Break variables. */
124     struct sort_criteria *sort;         /* Sort criteria. */
125     struct variable **break_vars;       /* Break variables. */
126     size_t break_var_cnt;               /* Number of break variables. */
127     struct ccase break_case;            /* Last values of break variables. */
128
129     enum missing_treatment missing;     /* How to treat missing values. */
130     struct agr_var *agr_vars;           /* First aggregate variable. */
131     struct dictionary *dict;            /* Aggregate dictionary. */
132     int case_cnt;                       /* Counts aggregated cases. */
133     struct ccase agr_case;              /* Aggregate case for output. */
134   };
135
136 static void initialize_aggregate_info (struct agr_proc *,
137                                        const struct ccase *);
138
139 /* Prototypes. */
140 static int parse_aggregate_functions (struct agr_proc *);
141 static void agr_destroy (struct agr_proc *);
142 static int aggregate_single_case (struct agr_proc *agr,
143                                   const struct ccase *input,
144                                   struct ccase *output);
145 static void dump_aggregate_info (struct agr_proc *agr, struct ccase *output);
146
147 /* Aggregating to the active file. */
148 static int agr_to_active_file (struct ccase *, void *aux);
149
150 /* Aggregating to a system file. */
151 static int presorted_agr_to_sysfile (struct ccase *, void *aux);
152 \f
153 /* Parsing. */
154
155 /* Parses and executes the AGGREGATE procedure. */
156 int
157 cmd_aggregate (void)
158 {
159   struct agr_proc agr;
160   struct file_handle *out_file = NULL;
161
162   bool copy_documents = false;
163   bool presorted = false;
164   bool saw_direction;
165
166   memset(&agr, 0 , sizeof (agr));
167   agr.missing = ITEMWISE;
168   case_nullify (&agr.break_case);
169   
170   agr.dict = dict_create ();
171   dict_set_label (agr.dict, dict_get_label (default_dict));
172   dict_set_documents (agr.dict, dict_get_documents (default_dict));
173
174   /* OUTFILE subcommand must be first. */
175   if (!lex_force_match_id ("OUTFILE"))
176     goto error;
177   lex_match ('=');
178   if (!lex_match ('*'))
179     {
180       out_file = fh_parse ();
181       if (out_file == NULL)
182         goto error;
183     }
184   
185   /* Read most of the subcommands. */
186   for (;;)
187     {
188       lex_match ('/');
189       
190       if (lex_match_id ("MISSING"))
191         {
192           lex_match ('=');
193           if (!lex_match_id ("COLUMNWISE"))
194             {
195               lex_error (_("while expecting COLUMNWISE"));
196               goto error;
197             }
198           agr.missing = COLUMNWISE;
199         }
200       else if (lex_match_id ("DOCUMENT"))
201         copy_documents = true;
202       else if (lex_match_id ("PRESORTED"))
203         presorted = true;
204       else if (lex_match_id ("BREAK"))
205         {
206           int i;
207
208           lex_match ('=');
209           agr.sort = sort_parse_criteria (default_dict,
210                                           &agr.break_vars, &agr.break_var_cnt,
211                                           &saw_direction);
212           if (agr.sort == NULL)
213             goto error;
214           
215           for (i = 0; i < agr.break_var_cnt; i++)
216             {
217               struct variable *v = dict_clone_var (agr.dict, agr.break_vars[i],
218                                                    agr.break_vars[i]->name, 
219                                                    agr.break_vars[i]->longname 
220                                                    );
221               assert (v != NULL);
222             }
223
224           /* BREAK must follow the options. */
225           break;
226         }
227       else
228         {
229           lex_error (_("expecting BREAK"));
230           goto error;
231         }
232     }
233   if (presorted && saw_direction)
234     msg (SW, _("When PRESORTED is specified, specifying sorting directions "
235                "with (A) or (D) has no effect.  Output data will be sorted "
236                "the same way as the input data."));
237       
238   /* Read in the aggregate functions. */
239   lex_match ('/');
240   if (!parse_aggregate_functions (&agr))
241     goto error;
242
243   /* Delete documents. */
244   if (!copy_documents)
245     dict_set_documents (agr.dict, NULL);
246
247   /* Cancel SPLIT FILE. */
248   dict_set_split_vars (agr.dict, NULL, 0);
249   
250   /* Initialize. */
251   agr.case_cnt = 0;
252   case_create (&agr.agr_case, dict_get_next_value_idx (agr.dict));
253
254   /* Output to active file or external file? */
255   if (out_file == NULL) 
256     {
257       /* The active file will be replaced by the aggregated data,
258          so TEMPORARY is moot. */
259       cancel_temporary ();
260
261       if (agr.sort != NULL && !presorted)
262         sort_active_file_in_place (agr.sort);
263
264       agr.sink = create_case_sink (&storage_sink_class, agr.dict, NULL);
265       if (agr.sink->class->open != NULL)
266         agr.sink->class->open (agr.sink);
267       vfm_sink = create_case_sink (&null_sink_class, default_dict, NULL);
268       procedure (agr_to_active_file, &agr);
269       if (agr.case_cnt > 0) 
270         {
271           dump_aggregate_info (&agr, &agr.agr_case);
272           agr.sink->class->write (agr.sink, &agr.agr_case);
273         }
274       dict_destroy (default_dict);
275       default_dict = agr.dict;
276       agr.dict = NULL;
277       vfm_source = agr.sink->class->make_source (agr.sink);
278       free_case_sink (agr.sink);
279     }
280   else
281     {
282       agr.writer = sfm_open_writer (out_file, agr.dict, get_scompression (), 0);
283       if (agr.writer == NULL)
284         goto error;
285       
286       if (agr.sort != NULL && !presorted) 
287         {
288           /* Sorting is needed. */
289           struct casefile *dst;
290           struct casereader *reader;
291           struct ccase c;
292           
293           dst = sort_active_file_to_casefile (agr.sort);
294           if (dst == NULL)
295             goto error;
296           reader = casefile_get_destructive_reader (dst);
297           while (casereader_read_xfer (reader, &c)) 
298             {
299               if (aggregate_single_case (&agr, &c, &agr.agr_case)) 
300                 sfm_write_case (agr.writer, &agr.agr_case);
301               case_destroy (&c);
302             }
303           casereader_destroy (reader);
304           casefile_destroy (dst);
305         }
306       else 
307         {
308           /* Active file is already sorted. */
309           procedure (presorted_agr_to_sysfile, &agr);
310         }
311       
312       if (agr.case_cnt > 0) 
313         {
314           dump_aggregate_info (&agr, &agr.agr_case);
315           sfm_write_case (agr.writer, &agr.agr_case);
316         }
317     }
318   
319   agr_destroy (&agr);
320   return CMD_SUCCESS;
321
322 error:
323   agr_destroy (&agr);
324   return CMD_FAILURE;
325 }
326
327 /* Parse all the aggregate functions. */
328 static int
329 parse_aggregate_functions (struct agr_proc *agr)
330 {
331   struct agr_var *tail; /* Tail of linked list starting at agr->vars. */
332
333   /* Parse everything. */
334   tail = NULL;
335   for (;;)
336     {
337       char **dest;
338       char **dest_label;
339       int n_dest;
340
341       int include_missing;
342       const struct agr_func *function;
343       int func_index;
344
345       union value arg[2];
346
347       struct variable **src;
348       int n_src;
349
350       int i;
351
352       dest = NULL;
353       dest_label = NULL;
354       n_dest = 0;
355       src = NULL;
356       function = NULL;
357       n_src = 0;
358       arg[0].c = NULL;
359       arg[1].c = NULL;
360
361       /* Parse the list of target variables. */
362       while (!lex_match ('='))
363         {
364           int n_dest_prev = n_dest;
365           
366           if (!parse_DATA_LIST_vars (&dest, &n_dest,
367                                      PV_APPEND | PV_SINGLE | PV_NO_SCRATCH))
368             goto error;
369
370           /* Assign empty labels. */
371           {
372             int j;
373
374             dest_label = xrealloc (dest_label, sizeof *dest_label * n_dest);
375             for (j = n_dest_prev; j < n_dest; j++)
376               dest_label[j] = NULL;
377           }
378           
379           if (token == T_STRING)
380             {
381               ds_truncate (&tokstr, 255);
382               dest_label[n_dest - 1] = xstrdup (ds_c_str (&tokstr));
383               lex_get ();
384             }
385         }
386
387       /* Get the name of the aggregation function. */
388       if (token != T_ID)
389         {
390           lex_error (_("expecting aggregation function"));
391           goto error;
392         }
393
394       include_missing = 0;
395       if (tokid[strlen (tokid) - 1] == '.')
396         {
397           include_missing = 1;
398           tokid[strlen (tokid) - 1] = 0;
399         }
400       
401       for (function = agr_func_tab; function->name; function++)
402         if (!strcasecmp (function->name, tokid))
403           break;
404       if (NULL == function->name)
405         {
406           msg (SE, _("Unknown aggregation function %s."), tokid);
407           goto error;
408         }
409       func_index = function - agr_func_tab;
410       lex_get ();
411
412       /* Check for leading lparen. */
413       if (!lex_match ('('))
414         {
415           if (func_index == N)
416             func_index = N_NO_VARS;
417           else if (func_index == NU)
418             func_index = NU_NO_VARS;
419           else
420             {
421               lex_error (_("expecting `('"));
422               goto error;
423             }
424         }
425       else
426         {
427           /* Parse list of source variables. */
428           {
429             int pv_opts = PV_NO_SCRATCH;
430
431             if (func_index == SUM || func_index == MEAN || func_index == SD)
432               pv_opts |= PV_NUMERIC;
433             else if (function->n_args)
434               pv_opts |= PV_SAME_TYPE;
435
436             if (!parse_variables (default_dict, &src, &n_src, pv_opts))
437               goto error;
438           }
439
440           /* Parse function arguments, for those functions that
441              require arguments. */
442           if (function->n_args != 0)
443             for (i = 0; i < function->n_args; i++)
444               {
445                 int type;
446             
447                 lex_match (',');
448                 if (token == T_STRING)
449                   {
450                     arg[i].c = xstrdup (ds_c_str (&tokstr));
451                     type = ALPHA;
452                   }
453                 else if (lex_is_number ())
454                   {
455                     arg[i].f = tokval;
456                     type = NUMERIC;
457                   } else {
458                     msg (SE, _("Missing argument %d to %s."), i + 1, function->name);
459                     goto error;
460                   }
461             
462                 lex_get ();
463
464                 if (type != src[0]->type)
465                   {
466                     msg (SE, _("Arguments to %s must be of same type as "
467                                "source variables."),
468                          function->name);
469                     goto error;
470                   }
471               }
472
473           /* Trailing rparen. */
474           if (!lex_match(')'))
475             {
476               lex_error (_("expecting `)'"));
477               goto error;
478             }
479           
480           /* Now check that the number of source variables match
481              the number of target variables.  If we check earlier
482              than this, the user can get very misleading error
483              message, i.e. `AGGREGATE x=SUM(y t).' will get this
484              error message when a proper message would be more
485              like `unknown variable t'. */
486           if (n_src != n_dest)
487             {
488               msg (SE, _("Number of source variables (%d) does not match "
489                          "number of target variables (%d)."),
490                    n_src, n_dest);
491               goto error;
492             }
493
494           if ((func_index == PIN || func_index == POUT
495               || func_index == FIN || func_index == FOUT) 
496               && ((src[0]->type == NUMERIC && arg[0].f > arg[1].f)
497                   || (src[0]->type == ALPHA
498                       && st_compare_pad (arg[0].c, strlen (arg[0].c),
499                                          arg[1].c, strlen (arg[1].c)) > 0)))
500             {
501               union value t = arg[0];
502               arg[0] = arg[1];
503               arg[1] = t;
504                   
505               msg (SW, _("The value arguments passed to the %s function "
506                          "are out-of-order.  They will be treated as if "
507                          "they had been specified in the correct order."),
508                    function->name);
509             }
510         }
511         
512       /* Finally add these to the linked list of aggregation
513          variables. */
514       for (i = 0; i < n_dest; i++)
515         {
516           struct agr_var *v = xmalloc (sizeof *v);
517
518           /* Add variable to chain. */
519           if (agr->agr_vars != NULL)
520             tail->next = v;
521           else
522             agr->agr_vars = v;
523           tail = v;
524           tail->next = NULL;
525           v->moments = NULL;
526           
527           /* Create the target variable in the aggregate
528              dictionary. */
529           {
530             static const struct fmt_spec f8_2 = {FMT_F, 8, 2};
531             struct variable *destvar;
532             
533             v->function = func_index;
534
535             if (src)
536               {
537                 v->src = src[i];
538                 
539                 if (src[i]->type == ALPHA)
540                   {
541                     v->function |= FSTRING;
542                     v->string = xmalloc (src[i]->width);
543                   }
544
545                 if (function->alpha_type == ALPHA)
546                   destvar = dict_clone_var (agr->dict, v->src, 0, dest[i] );
547                 else if (v->src->type == NUMERIC
548                          || function->alpha_type == NUMERIC)
549                   {
550                     destvar = dict_create_var (agr->dict, dest[i], 0);
551                     if (destvar != NULL) 
552                       {
553                         if ((func_index == N || func_index == NMISS)
554                             && dict_get_weight (default_dict) != NULL)
555                           destvar->print = destvar->write = f8_2; 
556                         else
557                           destvar->print = destvar->write = function->format;
558                       }
559                   }
560               } else {
561                 v->src = NULL;
562                 destvar = dict_create_var (agr->dict, dest[i], 0);
563                 if (func_index == N_NO_VARS
564                     && dict_get_weight (default_dict) != NULL)
565                   destvar->print = destvar->write = f8_2; 
566                 else
567                   destvar->print = destvar->write = function->format;
568               }
569           
570             if (!destvar)
571               {
572                 msg (SE, _("Variable name %s is not unique within the "
573                            "aggregate file dictionary, which contains "
574                            "the aggregate variables and the break "
575                            "variables."),
576                      dest[i]);
577                 goto error;
578               }
579
580             free (dest[i]);
581             destvar->init = 0;
582             if (dest_label[i])
583               {
584                 destvar->label = dest_label[i];
585                 dest_label[i] = NULL;
586               }
587
588             v->dest = destvar;
589           }
590           
591           v->include_missing = include_missing;
592
593           if (v->src != NULL)
594             {
595               int j;
596
597               if (v->src->type == NUMERIC)
598                 for (j = 0; j < function->n_args; j++)
599                   v->arg[j].f = arg[j].f;
600               else
601                 for (j = 0; j < function->n_args; j++)
602                   v->arg[j].c = xstrdup (arg[j].c);
603             }
604         }
605       
606       if (src != NULL && src[0]->type == ALPHA)
607         for (i = 0; i < function->n_args; i++)
608           {
609             free (arg[i].c);
610             arg[i].c = NULL;
611           }
612
613       free (src);
614       free (dest);
615       free (dest_label);
616
617       if (!lex_match ('/'))
618         {
619           if (token == '.')
620             return 1;
621
622           lex_error ("expecting end of command");
623           return 0;
624         }
625       continue;
626       
627     error:
628       for (i = 0; i < n_dest; i++)
629         {
630           free (dest[i]);
631           free (dest_label[i]);
632         }
633       free (dest);
634       free (dest_label);
635       free (arg[0].c);
636       free (arg[1].c);
637       if (src && n_src && src[0]->type == ALPHA)
638         for (i = 0; i < function->n_args; i++)
639           {
640             free (arg[i].c);
641             arg[i].c = NULL;
642           }
643       free (src);
644         
645       return 0;
646     }
647 }
648
649 /* Destroys AGR. */
650 static void
651 agr_destroy (struct agr_proc *agr)
652 {
653   struct agr_var *iter, *next;
654
655   sfm_close_writer (agr->writer);
656   if (agr->sort != NULL)
657     sort_destroy_criteria (agr->sort);
658   free (agr->break_vars);
659   case_destroy (&agr->break_case);
660   for (iter = agr->agr_vars; iter; iter = next)
661     {
662       next = iter->next;
663
664       if (iter->function & FSTRING)
665         {
666           int n_args;
667           int i;
668
669           n_args = agr_func_tab[iter->function & FUNC].n_args;
670           for (i = 0; i < n_args; i++)
671             free (iter->arg[i].c);
672           free (iter->string);
673         }
674       else if (iter->function == SD)
675         moments1_destroy (iter->moments);
676       free (iter);
677     }
678   if (agr->dict != NULL)
679     dict_destroy (agr->dict);
680
681   case_destroy (&agr->agr_case);
682 }
683 \f
684 /* Execution. */
685
686 static void accumulate_aggregate_info (struct agr_proc *,
687                                        const struct ccase *);
688 static void dump_aggregate_info (struct agr_proc *, struct ccase *);
689
690 /* Processes a single case INPUT for aggregation.  If output is
691    warranted, writes it to OUTPUT and returns nonzero.
692    Otherwise, returns zero and OUTPUT is unmodified. */
693 static int
694 aggregate_single_case (struct agr_proc *agr,
695                        const struct ccase *input, struct ccase *output)
696 {
697   bool finished_group = false;
698   
699   if (agr->case_cnt++ == 0)
700     initialize_aggregate_info (agr, input);
701   else if (case_compare (&agr->break_case, input,
702                          agr->break_vars, agr->break_var_cnt))
703     {
704       dump_aggregate_info (agr, output);
705       finished_group = true;
706
707       initialize_aggregate_info (agr, input);
708     }
709
710   accumulate_aggregate_info (agr, input);
711   return finished_group;
712 }
713
714 /* Accumulates aggregation data from the case INPUT. */
715 static void 
716 accumulate_aggregate_info (struct agr_proc *agr,
717                            const struct ccase *input)
718 {
719   struct agr_var *iter;
720   double weight;
721   int bad_warn = 1;
722
723   weight = dict_get_case_weight (default_dict, input, &bad_warn);
724
725   for (iter = agr->agr_vars; iter; iter = iter->next)
726     if (iter->src)
727       {
728         const union value *v = case_data (input, iter->src->fv);
729
730         if ((!iter->include_missing && is_missing (v, iter->src))
731             || (iter->include_missing && iter->src->type == NUMERIC
732                 && v->f == SYSMIS))
733           {
734             switch (iter->function)
735               {
736               case NMISS:
737               case NMISS | FSTRING:
738                 iter->dbl[0] += weight;
739                 break;
740               case NUMISS:
741               case NUMISS | FSTRING:
742                 iter->int1++;
743                 break;
744               }
745             iter->missing = 1;
746             continue;
747           }
748         
749         /* This is horrible.  There are too many possibilities. */
750         switch (iter->function)
751           {
752           case SUM:
753             iter->dbl[0] += v->f * weight;
754             iter->int1 = 1;
755             break;
756           case MEAN:
757             iter->dbl[0] += v->f * weight;
758             iter->dbl[1] += weight;
759             break;
760           case SD:
761             moments1_add (iter->moments, v->f, weight);
762             break;
763           case MAX:
764             iter->dbl[0] = max (iter->dbl[0], v->f);
765             iter->int1 = 1;
766             break;
767           case MAX | FSTRING:
768             if (memcmp (iter->string, v->s, iter->src->width) < 0)
769               memcpy (iter->string, v->s, iter->src->width);
770             iter->int1 = 1;
771             break;
772           case MIN:
773             iter->dbl[0] = min (iter->dbl[0], v->f);
774             iter->int1 = 1;
775             break;
776           case MIN | FSTRING:
777             if (memcmp (iter->string, v->s, iter->src->width) > 0)
778               memcpy (iter->string, v->s, iter->src->width);
779             iter->int1 = 1;
780             break;
781           case FGT:
782           case PGT:
783             if (v->f > iter->arg[0].f)
784               iter->dbl[0] += weight;
785             iter->dbl[1] += weight;
786             break;
787           case FGT | FSTRING:
788           case PGT | FSTRING:
789             if (memcmp (iter->arg[0].c, v->s, iter->src->width) < 0)
790               iter->dbl[0] += weight;
791             iter->dbl[1] += weight;
792             break;
793           case FLT:
794           case PLT:
795             if (v->f < iter->arg[0].f)
796               iter->dbl[0] += weight;
797             iter->dbl[1] += weight;
798             break;
799           case FLT | FSTRING:
800           case PLT | FSTRING:
801             if (memcmp (iter->arg[0].c, v->s, iter->src->width) > 0)
802               iter->dbl[0] += weight;
803             iter->dbl[1] += weight;
804             break;
805           case FIN:
806           case PIN:
807             if (iter->arg[0].f <= v->f && v->f <= iter->arg[1].f)
808               iter->dbl[0] += weight;
809             iter->dbl[1] += weight;
810             break;
811           case FIN | FSTRING:
812           case PIN | FSTRING:
813             if (memcmp (iter->arg[0].c, v->s, iter->src->width) <= 0
814                 && memcmp (iter->arg[1].c, v->s, iter->src->width) >= 0)
815               iter->dbl[0] += weight;
816             iter->dbl[1] += weight;
817             break;
818           case FOUT:
819           case POUT:
820             if (iter->arg[0].f > v->f || v->f > iter->arg[1].f)
821               iter->dbl[0] += weight;
822             iter->dbl[1] += weight;
823             break;
824           case FOUT | FSTRING:
825           case POUT | FSTRING:
826             if (memcmp (iter->arg[0].c, v->s, iter->src->width) > 0
827                 || memcmp (iter->arg[1].c, v->s, iter->src->width) < 0)
828               iter->dbl[0] += weight;
829             iter->dbl[1] += weight;
830             break;
831           case N:
832           case N | FSTRING:
833             iter->dbl[0] += weight;
834             break;
835           case NU:
836           case NU | FSTRING:
837             iter->int1++;
838             break;
839           case FIRST:
840             if (iter->int1 == 0)
841               {
842                 iter->dbl[0] = v->f;
843                 iter->int1 = 1;
844               }
845             break;
846           case FIRST | FSTRING:
847             if (iter->int1 == 0)
848               {
849                 memcpy (iter->string, v->s, iter->src->width);
850                 iter->int1 = 1;
851               }
852             break;
853           case LAST:
854             iter->dbl[0] = v->f;
855             iter->int1 = 1;
856             break;
857           case LAST | FSTRING:
858             memcpy (iter->string, v->s, iter->src->width);
859             iter->int1 = 1;
860             break;
861           case NMISS:
862           case NMISS | FSTRING:
863           case NUMISS:
864           case NUMISS | FSTRING:
865             /* Our value is not missing or it would have been
866                caught earlier.  Nothing to do. */
867             break;
868           default:
869             assert (0);
870           }
871     } else {
872       switch (iter->function)
873         {
874         case N_NO_VARS:
875           iter->dbl[0] += weight;
876           break;
877         case NU_NO_VARS:
878           iter->int1++;
879           break;
880         default:
881           assert (0);
882         }
883     }
884 }
885
886 /* We've come to a record that differs from the previous in one or
887    more of the break variables.  Make an output record from the
888    accumulated statistics in the OUTPUT case. */
889 static void 
890 dump_aggregate_info (struct agr_proc *agr, struct ccase *output)
891 {
892   {
893     int value_idx = 0;
894     int i;
895
896     for (i = 0; i < agr->break_var_cnt; i++) 
897       {
898         struct variable *v = agr->break_vars[i];
899         memcpy (case_data_rw (output, value_idx),
900                 case_data (&agr->break_case, v->fv),
901                 sizeof (union value) * v->nv);
902         value_idx += v->nv; 
903       }
904   }
905   
906   {
907     struct agr_var *i;
908   
909     for (i = agr->agr_vars; i; i = i->next)
910       {
911         union value *v = case_data_rw (output, i->dest->fv);
912
913         if (agr->missing == COLUMNWISE && i->missing != 0
914             && (i->function & FUNC) != N && (i->function & FUNC) != NU
915             && (i->function & FUNC) != NMISS && (i->function & FUNC) != NUMISS)
916           {
917             if (i->dest->type == ALPHA)
918               memset (v->s, ' ', i->dest->width);
919             else
920               v->f = SYSMIS;
921             continue;
922           }
923         
924         switch (i->function)
925           {
926           case SUM:
927             v->f = i->int1 ? i->dbl[0] : SYSMIS;
928             break;
929           case MEAN:
930             v->f = i->dbl[1] != 0.0 ? i->dbl[0] / i->dbl[1] : SYSMIS;
931             break;
932           case SD:
933             {
934               double variance;
935
936               /* FIXME: we should use two passes. */
937               moments1_calculate (i->moments, NULL, NULL, &variance,
938                                  NULL, NULL);
939               if (variance != SYSMIS)
940                 v->f = sqrt (variance);
941               else
942                 v->f = SYSMIS; 
943             }
944             break;
945           case MAX:
946           case MIN:
947             v->f = i->int1 ? i->dbl[0] : SYSMIS;
948             break;
949           case MAX | FSTRING:
950           case MIN | FSTRING:
951             if (i->int1)
952               memcpy (v->s, i->string, i->dest->width);
953             else
954               memset (v->s, ' ', i->dest->width);
955             break;
956           case FGT:
957           case FGT | FSTRING:
958           case FLT:
959           case FLT | FSTRING:
960           case FIN:
961           case FIN | FSTRING:
962           case FOUT:
963           case FOUT | FSTRING:
964             v->f = i->dbl[1] ? i->dbl[0] / i->dbl[1] : SYSMIS;
965             break;
966           case PGT:
967           case PGT | FSTRING:
968           case PLT:
969           case PLT | FSTRING:
970           case PIN:
971           case PIN | FSTRING:
972           case POUT:
973           case POUT | FSTRING:
974             v->f = i->dbl[1] ? i->dbl[0] / i->dbl[1] * 100.0 : SYSMIS;
975             break;
976           case N:
977           case N | FSTRING:
978             v->f = i->dbl[0];
979             break;
980           case NU:
981           case NU | FSTRING:
982             v->f = i->int1;
983             break;
984           case FIRST:
985           case LAST:
986             v->f = i->int1 ? i->dbl[0] : SYSMIS;
987             break;
988           case FIRST | FSTRING:
989           case LAST | FSTRING:
990             if (i->int1)
991               memcpy (v->s, i->string, i->dest->width);
992             else
993               memset (v->s, ' ', i->dest->width);
994             break;
995           case N_NO_VARS:
996             v->f = i->dbl[0];
997             break;
998           case NU_NO_VARS:
999             v->f = i->int1;
1000             break;
1001           case NMISS:
1002           case NMISS | FSTRING:
1003             v->f = i->dbl[0];
1004             break;
1005           case NUMISS:
1006           case NUMISS | FSTRING:
1007             v->f = i->int1;
1008             break;
1009           default:
1010             assert (0);
1011           }
1012       }
1013   }
1014 }
1015
1016 /* Resets the state for all the aggregate functions. */
1017 static void
1018 initialize_aggregate_info (struct agr_proc *agr, const struct ccase *input)
1019 {
1020   struct agr_var *iter;
1021
1022   case_destroy (&agr->break_case);
1023   case_clone (&agr->break_case, input);
1024
1025   for (iter = agr->agr_vars; iter; iter = iter->next)
1026     {
1027       iter->missing = 0;
1028       iter->dbl[0] = iter->dbl[1] = iter->dbl[2] = 0.0;
1029       iter->int1 = iter->int2 = 0;
1030       switch (iter->function)
1031         {
1032         case MIN:
1033           iter->dbl[0] = DBL_MAX;
1034           break;
1035         case MIN | FSTRING:
1036           memset (iter->string, 255, iter->src->width);
1037           break;
1038         case MAX:
1039           iter->dbl[0] = -DBL_MAX;
1040           break;
1041         case MAX | FSTRING:
1042           memset (iter->string, 0, iter->src->width);
1043           break;
1044         case SD:
1045           if (iter->moments == NULL)
1046             iter->moments = moments1_create (MOMENT_VARIANCE);
1047           else
1048             moments1_clear (iter->moments);
1049           break;
1050         default:
1051           break;
1052         }
1053     }
1054 }
1055 \f
1056 /* Aggregate each case as it comes through.  Cases which aren't needed
1057    are dropped. */
1058 static int
1059 agr_to_active_file (struct ccase *c, void *agr_)
1060 {
1061   struct agr_proc *agr = agr_;
1062
1063   if (aggregate_single_case (agr, c, &agr->agr_case)) 
1064     agr->sink->class->write (agr->sink, &agr->agr_case);
1065
1066   return 1;
1067 }
1068
1069 /* Aggregate the current case and output it if we passed a
1070    breakpoint. */
1071 static int
1072 presorted_agr_to_sysfile (struct ccase *c, void *agr_) 
1073 {
1074   struct agr_proc *agr = agr_;
1075
1076   if (aggregate_single_case (agr, c, &agr->agr_case)) 
1077     sfm_write_case (agr->writer, &agr->agr_case);
1078
1079   return 1;
1080 }