bc554b97bf588845ca0a2fa4e1e794cef200dcbc
[pspp-builds.git] / src / aggregate.c
1 /* PSPP - computes sample statistics.
2    Copyright (C) 1997-9, 2000 Free Software Foundation, Inc.
3    Written by Ben Pfaff <blp@gnu.org>.
4
5    This program is free software; you can redistribute it and/or
6    modify it under the terms of the GNU General Public License as
7    published by the Free Software Foundation; either version 2 of the
8    License, or (at your option) any later version.
9
10    This program is distributed in the hope that it will be useful, but
11    WITHOUT ANY WARRANTY; without even the implied warranty of
12    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13    General Public License for more details.
14
15    You should have received a copy of the GNU General Public License
16    along with this program; if not, write to the Free Software
17    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
18    02110-1301, USA. */
19
20 #include <config.h>
21 #include "error.h"
22 #include <stdlib.h>
23 #include "alloc.h"
24 #include "case.h"
25 #include "casefile.h"
26 #include "command.h"
27 #include "dictionary.h"
28 #include "error.h"
29 #include "file-handle.h"
30 #include "lexer.h"
31 #include "misc.h"
32 #include "moments.h"
33 #include "pool.h"
34 #include "settings.h"
35 #include "sfm-write.h"
36 #include "sort-prs.h"
37 #include "sort.h"
38 #include "str.h"
39 #include "var.h"
40 #include "vfm.h"
41 #include "vfmP.h"
42
43 #include "gettext.h"
44 #define _(msgid) gettext (msgid)
45
46 /* Specifies how to make an aggregate variable. */
47 struct agr_var
48   {
49     struct agr_var *next;               /* Next in list. */
50
51     /* Collected during parsing. */
52     struct variable *src;       /* Source variable. */
53     struct variable *dest;      /* Target variable. */
54     int function;               /* Function. */
55     int include_missing;        /* 1=Include user-missing values. */
56     union value arg[2];         /* Arguments. */
57
58     /* Accumulated during AGGREGATE execution. */
59     double dbl[3];
60     int int1, int2;
61     char *string;
62     int missing;
63     struct moments1 *moments;
64   };
65
66 /* Aggregation functions. */
67 enum
68   {
69     NONE, SUM, MEAN, SD, MAX, MIN, PGT, PLT, PIN, POUT, FGT, FLT, FIN,
70     FOUT, N, NU, NMISS, NUMISS, FIRST, LAST,
71     N_AGR_FUNCS, N_NO_VARS, NU_NO_VARS,
72     FUNC = 0x1f, /* Function mask. */
73     FSTRING = 1<<5, /* String function bit. */
74   };
75
76 /* Attributes of an aggregation function. */
77 struct agr_func
78   {
79     const char *name;           /* Aggregation function name. */
80     int n_args;                 /* Number of arguments. */
81     int alpha_type;             /* When given ALPHA arguments, output type. */
82     struct fmt_spec format;     /* Format spec if alpha_type != ALPHA. */
83   };
84
85 /* Attributes of aggregation functions. */
86 static const struct agr_func agr_func_tab[] = 
87   {
88     {"<NONE>",  0, -1,      {0, 0, 0}},
89     {"SUM",     0, -1,      {FMT_F, 8, 2}},
90     {"MEAN",    0, -1,      {FMT_F, 8, 2}},
91     {"SD",      0, -1,      {FMT_F, 8, 2}},
92     {"MAX",     0, ALPHA,   {-1, -1, -1}}, 
93     {"MIN",     0, ALPHA,   {-1, -1, -1}}, 
94     {"PGT",     1, NUMERIC, {FMT_F, 5, 1}},      
95     {"PLT",     1, NUMERIC, {FMT_F, 5, 1}},       
96     {"PIN",     2, NUMERIC, {FMT_F, 5, 1}},       
97     {"POUT",    2, NUMERIC, {FMT_F, 5, 1}},       
98     {"FGT",     1, NUMERIC, {FMT_F, 5, 3}},       
99     {"FLT",     1, NUMERIC, {FMT_F, 5, 3}},       
100     {"FIN",     2, NUMERIC, {FMT_F, 5, 3}},       
101     {"FOUT",    2, NUMERIC, {FMT_F, 5, 3}},       
102     {"N",       0, NUMERIC, {FMT_F, 7, 0}},       
103     {"NU",      0, NUMERIC, {FMT_F, 7, 0}},       
104     {"NMISS",   0, NUMERIC, {FMT_F, 7, 0}},       
105     {"NUMISS",  0, NUMERIC, {FMT_F, 7, 0}},       
106     {"FIRST",   0, ALPHA,   {-1, -1, -1}}, 
107     {"LAST",    0, ALPHA,   {-1, -1, -1}},
108     {NULL,      0, -1,      {-1, -1, -1}},
109     {"N",       0, NUMERIC, {FMT_F, 7, 0}},
110     {"NU",      0, NUMERIC, {FMT_F, 7, 0}},
111   };
112
113 /* Missing value types. */
114 enum missing_treatment
115   {
116     ITEMWISE,           /* Missing values item by item. */
117     COLUMNWISE          /* Missing values column by column. */
118   };
119
120 /* An entire AGGREGATE procedure. */
121 struct agr_proc 
122   {
123     /* We have either an output file or a sink. */
124     struct sfm_writer *writer;          /* Output file, or null if none. */
125     struct case_sink *sink;             /* Sink, or null if none. */
126
127     /* Break variables. */
128     struct sort_criteria *sort;         /* Sort criteria. */
129     struct variable **break_vars;       /* Break variables. */
130     size_t break_var_cnt;               /* Number of break variables. */
131     struct ccase break_case;            /* Last values of break variables. */
132
133     enum missing_treatment missing;     /* How to treat missing values. */
134     struct agr_var *agr_vars;           /* First aggregate variable. */
135     struct dictionary *dict;            /* Aggregate dictionary. */
136     int case_cnt;                       /* Counts aggregated cases. */
137     struct ccase agr_case;              /* Aggregate case for output. */
138   };
139
140 static void initialize_aggregate_info (struct agr_proc *,
141                                        const struct ccase *);
142
143 /* Prototypes. */
144 static int parse_aggregate_functions (struct agr_proc *);
145 static void agr_destroy (struct agr_proc *);
146 static int aggregate_single_case (struct agr_proc *agr,
147                                   const struct ccase *input,
148                                   struct ccase *output);
149 static void dump_aggregate_info (struct agr_proc *agr, struct ccase *output);
150
151 /* Aggregating to the active file. */
152 static int agr_to_active_file (struct ccase *, void *aux);
153
154 /* Aggregating to a system file. */
155 static int presorted_agr_to_sysfile (struct ccase *, void *aux);
156 \f
157 /* Parsing. */
158
159 /* Parses and executes the AGGREGATE procedure. */
160 int
161 cmd_aggregate (void)
162 {
163   struct agr_proc agr;
164   struct file_handle *out_file = NULL;
165
166   bool copy_documents = false;
167   bool presorted = false;
168   bool saw_direction;
169
170   memset(&agr, 0 , sizeof (agr));
171   agr.missing = ITEMWISE;
172   case_nullify (&agr.break_case);
173   
174   agr.dict = dict_create ();
175   dict_set_label (agr.dict, dict_get_label (default_dict));
176   dict_set_documents (agr.dict, dict_get_documents (default_dict));
177
178   /* OUTFILE subcommand must be first. */
179   if (!lex_force_match_id ("OUTFILE"))
180     goto error;
181   lex_match ('=');
182   if (!lex_match ('*'))
183     {
184       out_file = fh_parse ();
185       if (out_file == NULL)
186         goto error;
187     }
188   
189   /* Read most of the subcommands. */
190   for (;;)
191     {
192       lex_match ('/');
193       
194       if (lex_match_id ("MISSING"))
195         {
196           lex_match ('=');
197           if (!lex_match_id ("COLUMNWISE"))
198             {
199               lex_error (_("while expecting COLUMNWISE"));
200               goto error;
201             }
202           agr.missing = COLUMNWISE;
203         }
204       else if (lex_match_id ("DOCUMENT"))
205         copy_documents = true;
206       else if (lex_match_id ("PRESORTED"))
207         presorted = true;
208       else if (lex_match_id ("BREAK"))
209         {
210           int i;
211
212           lex_match ('=');
213           agr.sort = sort_parse_criteria (default_dict,
214                                           &agr.break_vars, &agr.break_var_cnt,
215                                           &saw_direction, NULL);
216           if (agr.sort == NULL)
217             goto error;
218           
219           for (i = 0; i < agr.break_var_cnt; i++)
220             dict_clone_var_assert (agr.dict, agr.break_vars[i],
221                                    agr.break_vars[i]->name);
222
223           /* BREAK must follow the options. */
224           break;
225         }
226       else
227         {
228           lex_error (_("expecting BREAK"));
229           goto error;
230         }
231     }
232   if (presorted && saw_direction)
233     msg (SW, _("When PRESORTED is specified, specifying sorting directions "
234                "with (A) or (D) has no effect.  Output data will be sorted "
235                "the same way as the input data."));
236       
237   /* Read in the aggregate functions. */
238   lex_match ('/');
239   if (!parse_aggregate_functions (&agr))
240     goto error;
241
242   /* Delete documents. */
243   if (!copy_documents)
244     dict_set_documents (agr.dict, NULL);
245
246   /* Cancel SPLIT FILE. */
247   dict_set_split_vars (agr.dict, NULL, 0);
248   
249   /* Initialize. */
250   agr.case_cnt = 0;
251   case_create (&agr.agr_case, dict_get_next_value_idx (agr.dict));
252
253   /* Output to active file or external file? */
254   if (out_file == NULL) 
255     {
256       /* The active file will be replaced by the aggregated data,
257          so TEMPORARY is moot. */
258       cancel_temporary ();
259
260       if (agr.sort != NULL && !presorted)
261         sort_active_file_in_place (agr.sort);
262
263       agr.sink = create_case_sink (&storage_sink_class, agr.dict, NULL);
264       if (agr.sink->class->open != NULL)
265         agr.sink->class->open (agr.sink);
266       vfm_sink = create_case_sink (&null_sink_class, default_dict, NULL);
267       procedure (agr_to_active_file, &agr);
268       if (agr.case_cnt > 0) 
269         {
270           dump_aggregate_info (&agr, &agr.agr_case);
271           agr.sink->class->write (agr.sink, &agr.agr_case);
272         }
273       dict_destroy (default_dict);
274       default_dict = agr.dict;
275       agr.dict = NULL;
276       vfm_source = agr.sink->class->make_source (agr.sink);
277       free_case_sink (agr.sink);
278     }
279   else
280     {
281       agr.writer = sfm_open_writer (out_file, agr.dict, get_scompression (), 0);
282       if (agr.writer == NULL)
283         goto error;
284       
285       if (agr.sort != NULL && !presorted) 
286         {
287           /* Sorting is needed. */
288           struct casefile *dst;
289           struct casereader *reader;
290           struct ccase c;
291           
292           dst = sort_active_file_to_casefile (agr.sort);
293           if (dst == NULL)
294             goto error;
295           reader = casefile_get_destructive_reader (dst);
296           while (casereader_read_xfer (reader, &c)) 
297             {
298               if (aggregate_single_case (&agr, &c, &agr.agr_case)) 
299                 sfm_write_case (agr.writer, &agr.agr_case);
300               case_destroy (&c);
301             }
302           casereader_destroy (reader);
303           casefile_destroy (dst);
304         }
305       else 
306         {
307           /* Active file is already sorted. */
308           procedure (presorted_agr_to_sysfile, &agr);
309         }
310       
311       if (agr.case_cnt > 0) 
312         {
313           dump_aggregate_info (&agr, &agr.agr_case);
314           sfm_write_case (agr.writer, &agr.agr_case);
315         }
316     }
317   
318   agr_destroy (&agr);
319   return CMD_SUCCESS;
320
321 error:
322   agr_destroy (&agr);
323   return CMD_FAILURE;
324 }
325
326 /* Parse all the aggregate functions. */
327 static int
328 parse_aggregate_functions (struct agr_proc *agr)
329 {
330   struct agr_var *tail; /* Tail of linked list starting at agr->vars. */
331
332   /* Parse everything. */
333   tail = NULL;
334   for (;;)
335     {
336       char **dest;
337       char **dest_label;
338       int n_dest;
339
340       int include_missing;
341       const struct agr_func *function;
342       int func_index;
343
344       union value arg[2];
345
346       struct variable **src;
347       int n_src;
348
349       int i;
350
351       dest = NULL;
352       dest_label = NULL;
353       n_dest = 0;
354       src = NULL;
355       function = NULL;
356       n_src = 0;
357       arg[0].c = NULL;
358       arg[1].c = NULL;
359
360       /* Parse the list of target variables. */
361       while (!lex_match ('='))
362         {
363           int n_dest_prev = n_dest;
364           
365           if (!parse_DATA_LIST_vars (&dest, &n_dest,
366                                      PV_APPEND | PV_SINGLE | PV_NO_SCRATCH))
367             goto error;
368
369           /* Assign empty labels. */
370           {
371             int j;
372
373             dest_label = xrealloc (dest_label, sizeof *dest_label * n_dest);
374             for (j = n_dest_prev; j < n_dest; j++)
375               dest_label[j] = NULL;
376           }
377           
378           if (token == T_STRING)
379             {
380               ds_truncate (&tokstr, 255);
381               dest_label[n_dest - 1] = xstrdup (ds_c_str (&tokstr));
382               lex_get ();
383             }
384         }
385
386       /* Get the name of the aggregation function. */
387       if (token != T_ID)
388         {
389           lex_error (_("expecting aggregation function"));
390           goto error;
391         }
392
393       include_missing = 0;
394       if (tokid[strlen (tokid) - 1] == '.')
395         {
396           include_missing = 1;
397           tokid[strlen (tokid) - 1] = 0;
398         }
399       
400       for (function = agr_func_tab; function->name; function++)
401         if (!strcasecmp (function->name, tokid))
402           break;
403       if (NULL == function->name)
404         {
405           msg (SE, _("Unknown aggregation function %s."), tokid);
406           goto error;
407         }
408       func_index = function - agr_func_tab;
409       lex_get ();
410
411       /* Check for leading lparen. */
412       if (!lex_match ('('))
413         {
414           if (func_index == N)
415             func_index = N_NO_VARS;
416           else if (func_index == NU)
417             func_index = NU_NO_VARS;
418           else
419             {
420               lex_error (_("expecting `('"));
421               goto error;
422             }
423         }
424       else
425         {
426           /* Parse list of source variables. */
427           {
428             int pv_opts = PV_NO_SCRATCH;
429
430             if (func_index == SUM || func_index == MEAN || func_index == SD)
431               pv_opts |= PV_NUMERIC;
432             else if (function->n_args)
433               pv_opts |= PV_SAME_TYPE;
434
435             if (!parse_variables (default_dict, &src, &n_src, pv_opts))
436               goto error;
437           }
438
439           /* Parse function arguments, for those functions that
440              require arguments. */
441           if (function->n_args != 0)
442             for (i = 0; i < function->n_args; i++)
443               {
444                 int type;
445             
446                 lex_match (',');
447                 if (token == T_STRING)
448                   {
449                     arg[i].c = xstrdup (ds_c_str (&tokstr));
450                     type = ALPHA;
451                   }
452                 else if (lex_is_number ())
453                   {
454                     arg[i].f = tokval;
455                     type = NUMERIC;
456                   } else {
457                     msg (SE, _("Missing argument %d to %s."), i + 1,
458                          function->name);
459                     goto error;
460                   }
461             
462                 lex_get ();
463
464                 if (type != src[0]->type)
465                   {
466                     msg (SE, _("Arguments to %s must be of same type as "
467                                "source variables."),
468                          function->name);
469                     goto error;
470                   }
471               }
472
473           /* Trailing rparen. */
474           if (!lex_match(')'))
475             {
476               lex_error (_("expecting `)'"));
477               goto error;
478             }
479           
480           /* Now check that the number of source variables match
481              the number of target variables.  If we check earlier
482              than this, the user can get very misleading error
483              message, i.e. `AGGREGATE x=SUM(y t).' will get this
484              error message when a proper message would be more
485              like `unknown variable t'. */
486           if (n_src != n_dest)
487             {
488               msg (SE, _("Number of source variables (%d) does not match "
489                          "number of target variables (%d)."),
490                    n_src, n_dest);
491               goto error;
492             }
493
494           if ((func_index == PIN || func_index == POUT
495               || func_index == FIN || func_index == FOUT) 
496               && ((src[0]->type == NUMERIC && arg[0].f > arg[1].f)
497                   || (src[0]->type == ALPHA
498                       && str_compare_rpad (arg[0].c, arg[1].c) > 0)))
499             {
500               union value t = arg[0];
501               arg[0] = arg[1];
502               arg[1] = t;
503                   
504               msg (SW, _("The value arguments passed to the %s function "
505                          "are out-of-order.  They will be treated as if "
506                          "they had been specified in the correct order."),
507                    function->name);
508             }
509         }
510         
511       /* Finally add these to the linked list of aggregation
512          variables. */
513       for (i = 0; i < n_dest; i++)
514         {
515           struct agr_var *v = xmalloc (sizeof *v);
516
517           /* Add variable to chain. */
518           if (agr->agr_vars != NULL)
519             tail->next = v;
520           else
521             agr->agr_vars = v;
522           tail = v;
523           tail->next = NULL;
524           v->moments = NULL;
525           
526           /* Create the target variable in the aggregate
527              dictionary. */
528           {
529             struct variable *destvar;
530             
531             v->function = func_index;
532
533             if (src)
534               {
535                 v->src = src[i];
536                 
537                 if (src[i]->type == ALPHA)
538                   {
539                     v->function |= FSTRING;
540                     v->string = xmalloc (src[i]->width);
541                   }
542
543                 if (function->alpha_type == ALPHA)
544                   destvar = dict_clone_var (agr->dict, v->src, dest[i]);
545                 else
546                   {
547                     assert (v->src->type == NUMERIC
548                             || function->alpha_type == NUMERIC);
549                     destvar = dict_create_var (agr->dict, dest[i], 0);
550                     if (destvar != NULL) 
551                       {
552                         if ((func_index == N || func_index == NMISS)
553                             && dict_get_weight (default_dict) != NULL)
554                           destvar->print = destvar->write = f8_2; 
555                         else
556                           destvar->print = destvar->write = function->format;
557                       }
558                   }
559               } else {
560                 v->src = NULL;
561                 destvar = dict_create_var (agr->dict, dest[i], 0);
562                 if (func_index == N_NO_VARS
563                     && dict_get_weight (default_dict) != NULL)
564                   destvar->print = destvar->write = f8_2; 
565                 else
566                   destvar->print = destvar->write = function->format;
567               }
568           
569             if (!destvar)
570               {
571                 msg (SE, _("Variable name %s is not unique within the "
572                            "aggregate file dictionary, which contains "
573                            "the aggregate variables and the break "
574                            "variables."),
575                      dest[i]);
576                 goto error;
577               }
578
579             free (dest[i]);
580             destvar->init = 0;
581             if (dest_label[i])
582               {
583                 destvar->label = dest_label[i];
584                 dest_label[i] = NULL;
585               }
586
587             v->dest = destvar;
588           }
589           
590           v->include_missing = include_missing;
591
592           if (v->src != NULL)
593             {
594               int j;
595
596               if (v->src->type == NUMERIC)
597                 for (j = 0; j < function->n_args; j++)
598                   v->arg[j].f = arg[j].f;
599               else
600                 for (j = 0; j < function->n_args; j++)
601                   v->arg[j].c = xstrdup (arg[j].c);
602             }
603         }
604       
605       if (src != NULL && src[0]->type == ALPHA)
606         for (i = 0; i < function->n_args; i++)
607           {
608             free (arg[i].c);
609             arg[i].c = NULL;
610           }
611
612       free (src);
613       free (dest);
614       free (dest_label);
615
616       if (!lex_match ('/'))
617         {
618           if (token == '.')
619             return 1;
620
621           lex_error ("expecting end of command");
622           return 0;
623         }
624       continue;
625       
626     error:
627       for (i = 0; i < n_dest; i++)
628         {
629           free (dest[i]);
630           free (dest_label[i]);
631         }
632       free (dest);
633       free (dest_label);
634       free (arg[0].c);
635       free (arg[1].c);
636       if (src && n_src && src[0]->type == ALPHA)
637         for (i = 0; i < function->n_args; i++)
638           {
639             free (arg[i].c);
640             arg[i].c = NULL;
641           }
642       free (src);
643         
644       return 0;
645     }
646 }
647
648 /* Destroys AGR. */
649 static void
650 agr_destroy (struct agr_proc *agr)
651 {
652   struct agr_var *iter, *next;
653
654   sfm_close_writer (agr->writer);
655   if (agr->sort != NULL)
656     sort_destroy_criteria (agr->sort);
657   free (agr->break_vars);
658   case_destroy (&agr->break_case);
659   for (iter = agr->agr_vars; iter; iter = next)
660     {
661       next = iter->next;
662
663       if (iter->function & FSTRING)
664         {
665           int n_args;
666           int i;
667
668           n_args = agr_func_tab[iter->function & FUNC].n_args;
669           for (i = 0; i < n_args; i++)
670             free (iter->arg[i].c);
671           free (iter->string);
672         }
673       else if (iter->function == SD)
674         moments1_destroy (iter->moments);
675       free (iter);
676     }
677   if (agr->dict != NULL)
678     dict_destroy (agr->dict);
679
680   case_destroy (&agr->agr_case);
681 }
682 \f
683 /* Execution. */
684
685 static void accumulate_aggregate_info (struct agr_proc *,
686                                        const struct ccase *);
687 static void dump_aggregate_info (struct agr_proc *, struct ccase *);
688
689 /* Processes a single case INPUT for aggregation.  If output is
690    warranted, writes it to OUTPUT and returns nonzero.
691    Otherwise, returns zero and OUTPUT is unmodified. */
692 static int
693 aggregate_single_case (struct agr_proc *agr,
694                        const struct ccase *input, struct ccase *output)
695 {
696   bool finished_group = false;
697   
698   if (agr->case_cnt++ == 0)
699     initialize_aggregate_info (agr, input);
700   else if (case_compare (&agr->break_case, input,
701                          agr->break_vars, agr->break_var_cnt))
702     {
703       dump_aggregate_info (agr, output);
704       finished_group = true;
705
706       initialize_aggregate_info (agr, input);
707     }
708
709   accumulate_aggregate_info (agr, input);
710   return finished_group;
711 }
712
713 /* Accumulates aggregation data from the case INPUT. */
714 static void 
715 accumulate_aggregate_info (struct agr_proc *agr,
716                            const struct ccase *input)
717 {
718   struct agr_var *iter;
719   double weight;
720   int bad_warn = 1;
721
722   weight = dict_get_case_weight (default_dict, input, &bad_warn);
723
724   for (iter = agr->agr_vars; iter; iter = iter->next)
725     if (iter->src)
726       {
727         const union value *v = case_data (input, iter->src->fv);
728
729         if ((!iter->include_missing
730              && mv_is_value_missing (&iter->src->miss, v))
731             || (iter->include_missing && iter->src->type == NUMERIC
732                 && v->f == SYSMIS))
733           {
734             switch (iter->function)
735               {
736               case NMISS:
737               case NMISS | FSTRING:
738                 iter->dbl[0] += weight;
739                 break;
740               case NUMISS:
741               case NUMISS | FSTRING:
742                 iter->int1++;
743                 break;
744               }
745             iter->missing = 1;
746             continue;
747           }
748         
749         /* This is horrible.  There are too many possibilities. */
750         switch (iter->function)
751           {
752           case SUM:
753             iter->dbl[0] += v->f * weight;
754             iter->int1 = 1;
755             break;
756           case MEAN:
757             iter->dbl[0] += v->f * weight;
758             iter->dbl[1] += weight;
759             break;
760           case SD:
761             moments1_add (iter->moments, v->f, weight);
762             break;
763           case MAX:
764             iter->dbl[0] = max (iter->dbl[0], v->f);
765             iter->int1 = 1;
766             break;
767           case MAX | FSTRING:
768             if (memcmp (iter->string, v->s, iter->src->width) < 0)
769               memcpy (iter->string, v->s, iter->src->width);
770             iter->int1 = 1;
771             break;
772           case MIN:
773             iter->dbl[0] = min (iter->dbl[0], v->f);
774             iter->int1 = 1;
775             break;
776           case MIN | FSTRING:
777             if (memcmp (iter->string, v->s, iter->src->width) > 0)
778               memcpy (iter->string, v->s, iter->src->width);
779             iter->int1 = 1;
780             break;
781           case FGT:
782           case PGT:
783             if (v->f > iter->arg[0].f)
784               iter->dbl[0] += weight;
785             iter->dbl[1] += weight;
786             break;
787           case FGT | FSTRING:
788           case PGT | FSTRING:
789             if (memcmp (iter->arg[0].c, v->s, iter->src->width) < 0)
790               iter->dbl[0] += weight;
791             iter->dbl[1] += weight;
792             break;
793           case FLT:
794           case PLT:
795             if (v->f < iter->arg[0].f)
796               iter->dbl[0] += weight;
797             iter->dbl[1] += weight;
798             break;
799           case FLT | FSTRING:
800           case PLT | FSTRING:
801             if (memcmp (iter->arg[0].c, v->s, iter->src->width) > 0)
802               iter->dbl[0] += weight;
803             iter->dbl[1] += weight;
804             break;
805           case FIN:
806           case PIN:
807             if (iter->arg[0].f <= v->f && v->f <= iter->arg[1].f)
808               iter->dbl[0] += weight;
809             iter->dbl[1] += weight;
810             break;
811           case FIN | FSTRING:
812           case PIN | FSTRING:
813             if (memcmp (iter->arg[0].c, v->s, iter->src->width) <= 0
814                 && memcmp (iter->arg[1].c, v->s, iter->src->width) >= 0)
815               iter->dbl[0] += weight;
816             iter->dbl[1] += weight;
817             break;
818           case FOUT:
819           case POUT:
820             if (iter->arg[0].f > v->f || v->f > iter->arg[1].f)
821               iter->dbl[0] += weight;
822             iter->dbl[1] += weight;
823             break;
824           case FOUT | FSTRING:
825           case POUT | FSTRING:
826             if (memcmp (iter->arg[0].c, v->s, iter->src->width) > 0
827                 || memcmp (iter->arg[1].c, v->s, iter->src->width) < 0)
828               iter->dbl[0] += weight;
829             iter->dbl[1] += weight;
830             break;
831           case N:
832           case N | FSTRING:
833             iter->dbl[0] += weight;
834             break;
835           case NU:
836           case NU | FSTRING:
837             iter->int1++;
838             break;
839           case FIRST:
840             if (iter->int1 == 0)
841               {
842                 iter->dbl[0] = v->f;
843                 iter->int1 = 1;
844               }
845             break;
846           case FIRST | FSTRING:
847             if (iter->int1 == 0)
848               {
849                 memcpy (iter->string, v->s, iter->src->width);
850                 iter->int1 = 1;
851               }
852             break;
853           case LAST:
854             iter->dbl[0] = v->f;
855             iter->int1 = 1;
856             break;
857           case LAST | FSTRING:
858             memcpy (iter->string, v->s, iter->src->width);
859             iter->int1 = 1;
860             break;
861           case NMISS:
862           case NMISS | FSTRING:
863           case NUMISS:
864           case NUMISS | FSTRING:
865             /* Our value is not missing or it would have been
866                caught earlier.  Nothing to do. */
867             break;
868           default:
869             assert (0);
870           }
871     } else {
872       switch (iter->function)
873         {
874         case N_NO_VARS:
875           iter->dbl[0] += weight;
876           break;
877         case NU_NO_VARS:
878           iter->int1++;
879           break;
880         default:
881           assert (0);
882         }
883     }
884 }
885
886 /* We've come to a record that differs from the previous in one or
887    more of the break variables.  Make an output record from the
888    accumulated statistics in the OUTPUT case. */
889 static void 
890 dump_aggregate_info (struct agr_proc *agr, struct ccase *output)
891 {
892   {
893     int value_idx = 0;
894     int i;
895
896     for (i = 0; i < agr->break_var_cnt; i++) 
897       {
898         struct variable *v = agr->break_vars[i];
899         memcpy (case_data_rw (output, value_idx),
900                 case_data (&agr->break_case, v->fv),
901                 sizeof (union value) * v->nv);
902         value_idx += v->nv; 
903       }
904   }
905   
906   {
907     struct agr_var *i;
908   
909     for (i = agr->agr_vars; i; i = i->next)
910       {
911         union value *v = case_data_rw (output, i->dest->fv);
912
913         if (agr->missing == COLUMNWISE && i->missing != 0
914             && (i->function & FUNC) != N && (i->function & FUNC) != NU
915             && (i->function & FUNC) != NMISS && (i->function & FUNC) != NUMISS)
916           {
917             if (i->dest->type == ALPHA)
918               memset (v->s, ' ', i->dest->width);
919             else
920               v->f = SYSMIS;
921             continue;
922           }
923         
924         switch (i->function)
925           {
926           case SUM:
927             v->f = i->int1 ? i->dbl[0] : SYSMIS;
928             break;
929           case MEAN:
930             v->f = i->dbl[1] != 0.0 ? i->dbl[0] / i->dbl[1] : SYSMIS;
931             break;
932           case SD:
933             {
934               double variance;
935
936               /* FIXME: we should use two passes. */
937               moments1_calculate (i->moments, NULL, NULL, &variance,
938                                  NULL, NULL);
939               if (variance != SYSMIS)
940                 v->f = sqrt (variance);
941               else
942                 v->f = SYSMIS; 
943             }
944             break;
945           case MAX:
946           case MIN:
947             v->f = i->int1 ? i->dbl[0] : SYSMIS;
948             break;
949           case MAX | FSTRING:
950           case MIN | FSTRING:
951             if (i->int1)
952               memcpy (v->s, i->string, i->dest->width);
953             else
954               memset (v->s, ' ', i->dest->width);
955             break;
956           case FGT:
957           case FGT | FSTRING:
958           case FLT:
959           case FLT | FSTRING:
960           case FIN:
961           case FIN | FSTRING:
962           case FOUT:
963           case FOUT | FSTRING:
964             v->f = i->dbl[1] ? i->dbl[0] / i->dbl[1] : SYSMIS;
965             break;
966           case PGT:
967           case PGT | FSTRING:
968           case PLT:
969           case PLT | FSTRING:
970           case PIN:
971           case PIN | FSTRING:
972           case POUT:
973           case POUT | FSTRING:
974             v->f = i->dbl[1] ? i->dbl[0] / i->dbl[1] * 100.0 : SYSMIS;
975             break;
976           case N:
977           case N | FSTRING:
978             v->f = i->dbl[0];
979             break;
980           case NU:
981           case NU | FSTRING:
982             v->f = i->int1;
983             break;
984           case FIRST:
985           case LAST:
986             v->f = i->int1 ? i->dbl[0] : SYSMIS;
987             break;
988           case FIRST | FSTRING:
989           case LAST | FSTRING:
990             if (i->int1)
991               memcpy (v->s, i->string, i->dest->width);
992             else
993               memset (v->s, ' ', i->dest->width);
994             break;
995           case N_NO_VARS:
996             v->f = i->dbl[0];
997             break;
998           case NU_NO_VARS:
999             v->f = i->int1;
1000             break;
1001           case NMISS:
1002           case NMISS | FSTRING:
1003             v->f = i->dbl[0];
1004             break;
1005           case NUMISS:
1006           case NUMISS | FSTRING:
1007             v->f = i->int1;
1008             break;
1009           default:
1010             assert (0);
1011           }
1012       }
1013   }
1014 }
1015
1016 /* Resets the state for all the aggregate functions. */
1017 static void
1018 initialize_aggregate_info (struct agr_proc *agr, const struct ccase *input)
1019 {
1020   struct agr_var *iter;
1021
1022   case_destroy (&agr->break_case);
1023   case_clone (&agr->break_case, input);
1024
1025   for (iter = agr->agr_vars; iter; iter = iter->next)
1026     {
1027       iter->missing = 0;
1028       iter->dbl[0] = iter->dbl[1] = iter->dbl[2] = 0.0;
1029       iter->int1 = iter->int2 = 0;
1030       switch (iter->function)
1031         {
1032         case MIN:
1033           iter->dbl[0] = DBL_MAX;
1034           break;
1035         case MIN | FSTRING:
1036           memset (iter->string, 255, iter->src->width);
1037           break;
1038         case MAX:
1039           iter->dbl[0] = -DBL_MAX;
1040           break;
1041         case MAX | FSTRING:
1042           memset (iter->string, 0, iter->src->width);
1043           break;
1044         case SD:
1045           if (iter->moments == NULL)
1046             iter->moments = moments1_create (MOMENT_VARIANCE);
1047           else
1048             moments1_clear (iter->moments);
1049           break;
1050         default:
1051           break;
1052         }
1053     }
1054 }
1055 \f
1056 /* Aggregate each case as it comes through.  Cases which aren't needed
1057    are dropped. */
1058 static int
1059 agr_to_active_file (struct ccase *c, void *agr_)
1060 {
1061   struct agr_proc *agr = agr_;
1062
1063   if (aggregate_single_case (agr, c, &agr->agr_case)) 
1064     agr->sink->class->write (agr->sink, &agr->agr_case);
1065
1066   return 1;
1067 }
1068
1069 /* Aggregate the current case and output it if we passed a
1070    breakpoint. */
1071 static int
1072 presorted_agr_to_sysfile (struct ccase *c, void *agr_) 
1073 {
1074   struct agr_proc *agr = agr_;
1075
1076   if (aggregate_single_case (agr, c, &agr->agr_case)) 
1077     sfm_write_case (agr->writer, &agr->agr_case);
1078
1079   return 1;
1080 }