154256f5e59b96260b2c4ea713f427ee44e7c78a
[pspp-builds.git] / src / aggregate.c
1 /* PSPP - computes sample statistics.
2    Copyright (C) 1997-9, 2000 Free Software Foundation, Inc.
3    Written by Ben Pfaff <blp@gnu.org>.
4
5    This program is free software; you can redistribute it and/or
6    modify it under the terms of the GNU General Public License as
7    published by the Free Software Foundation; either version 2 of the
8    License, or (at your option) any later version.
9
10    This program is distributed in the hope that it will be useful, but
11    WITHOUT ANY WARRANTY; without even the implied warranty of
12    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13    General Public License for more details.
14
15    You should have received a copy of the GNU General Public License
16    along with this program; if not, write to the Free Software
17    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
18    02110-1301, USA. */
19
20 #include <config.h>
21 #include "error.h"
22 #include <stdlib.h>
23 #include "alloc.h"
24 #include "case.h"
25 #include "casefile.h"
26 #include "command.h"
27 #include "dictionary.h"
28 #include "error.h"
29 #include "file-handle.h"
30 #include "lexer.h"
31 #include "misc.h"
32 #include "moments.h"
33 #include "pool.h"
34 #include "settings.h"
35 #include "sfm-write.h"
36 #include "sort.h"
37 #include "str.h"
38 #include "var.h"
39 #include "vfm.h"
40 #include "vfmP.h"
41
42 /* Specifies how to make an aggregate variable. */
43 struct agr_var
44   {
45     struct agr_var *next;               /* Next in list. */
46
47     /* Collected during parsing. */
48     struct variable *src;       /* Source variable. */
49     struct variable *dest;      /* Target variable. */
50     int function;               /* Function. */
51     int include_missing;        /* 1=Include user-missing values. */
52     union value arg[2];         /* Arguments. */
53
54     /* Accumulated during AGGREGATE execution. */
55     double dbl[3];
56     int int1, int2;
57     char *string;
58     int missing;
59     struct moments1 *moments;
60   };
61
62 /* Aggregation functions. */
63 enum
64   {
65     NONE, SUM, MEAN, SD, MAX, MIN, PGT, PLT, PIN, POUT, FGT, FLT, FIN,
66     FOUT, N, NU, NMISS, NUMISS, FIRST, LAST,
67     N_AGR_FUNCS, N_NO_VARS, NU_NO_VARS,
68     FUNC = 0x1f, /* Function mask. */
69     FSTRING = 1<<5, /* String function bit. */
70   };
71
72 /* Attributes of an aggregation function. */
73 struct agr_func
74   {
75     const char *name;           /* Aggregation function name. */
76     int n_args;                 /* Number of arguments. */
77     int alpha_type;             /* When given ALPHA arguments, output type. */
78     struct fmt_spec format;     /* Format spec if alpha_type != ALPHA. */
79   };
80
81 /* Attributes of aggregation functions. */
82 static const struct agr_func agr_func_tab[] = 
83   {
84     {"<NONE>",  0, -1,      {0, 0, 0}},
85     {"SUM",     0, -1,      {FMT_F, 8, 2}},
86     {"MEAN",    0, -1,      {FMT_F, 8, 2}},
87     {"SD",      0, -1,      {FMT_F, 8, 2}},
88     {"MAX",     0, ALPHA,   {-1, -1, -1}}, 
89     {"MIN",     0, ALPHA,   {-1, -1, -1}}, 
90     {"PGT",     1, NUMERIC, {FMT_F, 5, 1}},      
91     {"PLT",     1, NUMERIC, {FMT_F, 5, 1}},       
92     {"PIN",     2, NUMERIC, {FMT_F, 5, 1}},       
93     {"POUT",    2, NUMERIC, {FMT_F, 5, 1}},       
94     {"FGT",     1, NUMERIC, {FMT_F, 5, 3}},       
95     {"FLT",     1, NUMERIC, {FMT_F, 5, 3}},       
96     {"FIN",     2, NUMERIC, {FMT_F, 5, 3}},       
97     {"FOUT",    2, NUMERIC, {FMT_F, 5, 3}},       
98     {"N",       0, NUMERIC, {FMT_F, 7, 0}},       
99     {"NU",      0, NUMERIC, {FMT_F, 7, 0}},       
100     {"NMISS",   0, NUMERIC, {FMT_F, 7, 0}},       
101     {"NUMISS",  0, NUMERIC, {FMT_F, 7, 0}},       
102     {"FIRST",   0, ALPHA,   {-1, -1, -1}}, 
103     {"LAST",    0, ALPHA,   {-1, -1, -1}},
104     {NULL,      0, -1,      {-1, -1, -1}},
105     {"N",       0, NUMERIC, {FMT_F, 7, 0}},
106     {"NU",      0, NUMERIC, {FMT_F, 7, 0}},
107   };
108
109 /* Missing value types. */
110 enum missing_treatment
111   {
112     ITEMWISE,           /* Missing values item by item. */
113     COLUMNWISE          /* Missing values column by column. */
114   };
115
116 /* An entire AGGREGATE procedure. */
117 struct agr_proc 
118   {
119     /* We have either an output file or a sink. */
120     struct sfm_writer *writer;          /* Output file, or null if none. */
121     struct case_sink *sink;             /* Sink, or null if none. */
122
123     /* Break variables. */
124     struct sort_criteria *sort;         /* Sort criteria. */
125     struct variable **break_vars;       /* Break variables. */
126     size_t break_var_cnt;               /* Number of break variables. */
127     struct ccase break_case;            /* Last values of break variables. */
128
129     enum missing_treatment missing;     /* How to treat missing values. */
130     struct agr_var *agr_vars;           /* First aggregate variable. */
131     struct dictionary *dict;            /* Aggregate dictionary. */
132     int case_cnt;                       /* Counts aggregated cases. */
133     struct ccase agr_case;              /* Aggregate case for output. */
134   };
135
136 static void initialize_aggregate_info (struct agr_proc *,
137                                        const struct ccase *);
138
139 /* Prototypes. */
140 static int parse_aggregate_functions (struct agr_proc *);
141 static void agr_destroy (struct agr_proc *);
142 static int aggregate_single_case (struct agr_proc *agr,
143                                   const struct ccase *input,
144                                   struct ccase *output);
145 static void dump_aggregate_info (struct agr_proc *agr, struct ccase *output);
146
147 /* Aggregating to the active file. */
148 static int agr_to_active_file (struct ccase *, void *aux);
149
150 /* Aggregating to a system file. */
151 static int presorted_agr_to_sysfile (struct ccase *, void *aux);
152 \f
153 /* Parsing. */
154
155 /* Parses and executes the AGGREGATE procedure. */
156 int
157 cmd_aggregate (void)
158 {
159   struct agr_proc agr;
160   struct file_handle *out_file = NULL;
161
162   bool copy_documents = false;
163   bool presorted = false;
164   bool saw_direction;
165
166   memset(&agr, 0 , sizeof (agr));
167   agr.missing = ITEMWISE;
168   case_nullify (&agr.break_case);
169   
170   agr.dict = dict_create ();
171   dict_set_label (agr.dict, dict_get_label (default_dict));
172   dict_set_documents (agr.dict, dict_get_documents (default_dict));
173
174   /* OUTFILE subcommand must be first. */
175   if (!lex_force_match_id ("OUTFILE"))
176     goto error;
177   lex_match ('=');
178   if (!lex_match ('*'))
179     {
180       out_file = fh_parse ();
181       if (out_file == NULL)
182         goto error;
183     }
184   
185   /* Read most of the subcommands. */
186   for (;;)
187     {
188       lex_match ('/');
189       
190       if (lex_match_id ("MISSING"))
191         {
192           lex_match ('=');
193           if (!lex_match_id ("COLUMNWISE"))
194             {
195               lex_error (_("while expecting COLUMNWISE"));
196               goto error;
197             }
198           agr.missing = COLUMNWISE;
199         }
200       else if (lex_match_id ("DOCUMENT"))
201         copy_documents = true;
202       else if (lex_match_id ("PRESORTED"))
203         presorted = true;
204       else if (lex_match_id ("BREAK"))
205         {
206           int i;
207
208           lex_match ('=');
209           agr.sort = sort_parse_criteria (default_dict,
210                                           &agr.break_vars, &agr.break_var_cnt,
211                                           &saw_direction);
212           if (agr.sort == NULL)
213             goto error;
214           
215           for (i = 0; i < agr.break_var_cnt; i++)
216             dict_clone_var_assert (agr.dict, agr.break_vars[i],
217                                    agr.break_vars[i]->name);
218
219           /* BREAK must follow the options. */
220           break;
221         }
222       else
223         {
224           lex_error (_("expecting BREAK"));
225           goto error;
226         }
227     }
228   if (presorted && saw_direction)
229     msg (SW, _("When PRESORTED is specified, specifying sorting directions "
230                "with (A) or (D) has no effect.  Output data will be sorted "
231                "the same way as the input data."));
232       
233   /* Read in the aggregate functions. */
234   lex_match ('/');
235   if (!parse_aggregate_functions (&agr))
236     goto error;
237
238   /* Delete documents. */
239   if (!copy_documents)
240     dict_set_documents (agr.dict, NULL);
241
242   /* Cancel SPLIT FILE. */
243   dict_set_split_vars (agr.dict, NULL, 0);
244   
245   /* Initialize. */
246   agr.case_cnt = 0;
247   case_create (&agr.agr_case, dict_get_next_value_idx (agr.dict));
248
249   /* Output to active file or external file? */
250   if (out_file == NULL) 
251     {
252       /* The active file will be replaced by the aggregated data,
253          so TEMPORARY is moot. */
254       cancel_temporary ();
255
256       if (agr.sort != NULL && !presorted)
257         sort_active_file_in_place (agr.sort);
258
259       agr.sink = create_case_sink (&storage_sink_class, agr.dict, NULL);
260       if (agr.sink->class->open != NULL)
261         agr.sink->class->open (agr.sink);
262       vfm_sink = create_case_sink (&null_sink_class, default_dict, NULL);
263       procedure (agr_to_active_file, &agr);
264       if (agr.case_cnt > 0) 
265         {
266           dump_aggregate_info (&agr, &agr.agr_case);
267           agr.sink->class->write (agr.sink, &agr.agr_case);
268         }
269       dict_destroy (default_dict);
270       default_dict = agr.dict;
271       agr.dict = NULL;
272       vfm_source = agr.sink->class->make_source (agr.sink);
273       free_case_sink (agr.sink);
274     }
275   else
276     {
277       agr.writer = sfm_open_writer (out_file, agr.dict, get_scompression (), 0);
278       if (agr.writer == NULL)
279         goto error;
280       
281       if (agr.sort != NULL && !presorted) 
282         {
283           /* Sorting is needed. */
284           struct casefile *dst;
285           struct casereader *reader;
286           struct ccase c;
287           
288           dst = sort_active_file_to_casefile (agr.sort);
289           if (dst == NULL)
290             goto error;
291           reader = casefile_get_destructive_reader (dst);
292           while (casereader_read_xfer (reader, &c)) 
293             {
294               if (aggregate_single_case (&agr, &c, &agr.agr_case)) 
295                 sfm_write_case (agr.writer, &agr.agr_case);
296               case_destroy (&c);
297             }
298           casereader_destroy (reader);
299           casefile_destroy (dst);
300         }
301       else 
302         {
303           /* Active file is already sorted. */
304           procedure (presorted_agr_to_sysfile, &agr);
305         }
306       
307       if (agr.case_cnt > 0) 
308         {
309           dump_aggregate_info (&agr, &agr.agr_case);
310           sfm_write_case (agr.writer, &agr.agr_case);
311         }
312     }
313   
314   agr_destroy (&agr);
315   return CMD_SUCCESS;
316
317 error:
318   agr_destroy (&agr);
319   return CMD_FAILURE;
320 }
321
322 /* Parse all the aggregate functions. */
323 static int
324 parse_aggregate_functions (struct agr_proc *agr)
325 {
326   struct agr_var *tail; /* Tail of linked list starting at agr->vars. */
327
328   /* Parse everything. */
329   tail = NULL;
330   for (;;)
331     {
332       char **dest;
333       char **dest_label;
334       int n_dest;
335
336       int include_missing;
337       const struct agr_func *function;
338       int func_index;
339
340       union value arg[2];
341
342       struct variable **src;
343       int n_src;
344
345       int i;
346
347       dest = NULL;
348       dest_label = NULL;
349       n_dest = 0;
350       src = NULL;
351       function = NULL;
352       n_src = 0;
353       arg[0].c = NULL;
354       arg[1].c = NULL;
355
356       /* Parse the list of target variables. */
357       while (!lex_match ('='))
358         {
359           int n_dest_prev = n_dest;
360           
361           if (!parse_DATA_LIST_vars (&dest, &n_dest,
362                                      PV_APPEND | PV_SINGLE | PV_NO_SCRATCH))
363             goto error;
364
365           /* Assign empty labels. */
366           {
367             int j;
368
369             dest_label = xrealloc (dest_label, sizeof *dest_label * n_dest);
370             for (j = n_dest_prev; j < n_dest; j++)
371               dest_label[j] = NULL;
372           }
373           
374           if (token == T_STRING)
375             {
376               ds_truncate (&tokstr, 255);
377               dest_label[n_dest - 1] = xstrdup (ds_c_str (&tokstr));
378               lex_get ();
379             }
380         }
381
382       /* Get the name of the aggregation function. */
383       if (token != T_ID)
384         {
385           lex_error (_("expecting aggregation function"));
386           goto error;
387         }
388
389       include_missing = 0;
390       if (tokid[strlen (tokid) - 1] == '.')
391         {
392           include_missing = 1;
393           tokid[strlen (tokid) - 1] = 0;
394         }
395       
396       for (function = agr_func_tab; function->name; function++)
397         if (!strcasecmp (function->name, tokid))
398           break;
399       if (NULL == function->name)
400         {
401           msg (SE, _("Unknown aggregation function %s."), tokid);
402           goto error;
403         }
404       func_index = function - agr_func_tab;
405       lex_get ();
406
407       /* Check for leading lparen. */
408       if (!lex_match ('('))
409         {
410           if (func_index == N)
411             func_index = N_NO_VARS;
412           else if (func_index == NU)
413             func_index = NU_NO_VARS;
414           else
415             {
416               lex_error (_("expecting `('"));
417               goto error;
418             }
419         }
420       else
421         {
422           /* Parse list of source variables. */
423           {
424             int pv_opts = PV_NO_SCRATCH;
425
426             if (func_index == SUM || func_index == MEAN || func_index == SD)
427               pv_opts |= PV_NUMERIC;
428             else if (function->n_args)
429               pv_opts |= PV_SAME_TYPE;
430
431             if (!parse_variables (default_dict, &src, &n_src, pv_opts))
432               goto error;
433           }
434
435           /* Parse function arguments, for those functions that
436              require arguments. */
437           if (function->n_args != 0)
438             for (i = 0; i < function->n_args; i++)
439               {
440                 int type;
441             
442                 lex_match (',');
443                 if (token == T_STRING)
444                   {
445                     arg[i].c = xstrdup (ds_c_str (&tokstr));
446                     type = ALPHA;
447                   }
448                 else if (lex_is_number ())
449                   {
450                     arg[i].f = tokval;
451                     type = NUMERIC;
452                   } else {
453                     msg (SE, _("Missing argument %d to %s."), i + 1,
454                          function->name);
455                     goto error;
456                   }
457             
458                 lex_get ();
459
460                 if (type != src[0]->type)
461                   {
462                     msg (SE, _("Arguments to %s must be of same type as "
463                                "source variables."),
464                          function->name);
465                     goto error;
466                   }
467               }
468
469           /* Trailing rparen. */
470           if (!lex_match(')'))
471             {
472               lex_error (_("expecting `)'"));
473               goto error;
474             }
475           
476           /* Now check that the number of source variables match
477              the number of target variables.  If we check earlier
478              than this, the user can get very misleading error
479              message, i.e. `AGGREGATE x=SUM(y t).' will get this
480              error message when a proper message would be more
481              like `unknown variable t'. */
482           if (n_src != n_dest)
483             {
484               msg (SE, _("Number of source variables (%d) does not match "
485                          "number of target variables (%d)."),
486                    n_src, n_dest);
487               goto error;
488             }
489
490           if ((func_index == PIN || func_index == POUT
491               || func_index == FIN || func_index == FOUT) 
492               && ((src[0]->type == NUMERIC && arg[0].f > arg[1].f)
493                   || (src[0]->type == ALPHA
494                       && st_compare_pad (arg[0].c, strlen (arg[0].c),
495                                          arg[1].c, strlen (arg[1].c)) > 0)))
496             {
497               union value t = arg[0];
498               arg[0] = arg[1];
499               arg[1] = t;
500                   
501               msg (SW, _("The value arguments passed to the %s function "
502                          "are out-of-order.  They will be treated as if "
503                          "they had been specified in the correct order."),
504                    function->name);
505             }
506         }
507         
508       /* Finally add these to the linked list of aggregation
509          variables. */
510       for (i = 0; i < n_dest; i++)
511         {
512           struct agr_var *v = xmalloc (sizeof *v);
513
514           /* Add variable to chain. */
515           if (agr->agr_vars != NULL)
516             tail->next = v;
517           else
518             agr->agr_vars = v;
519           tail = v;
520           tail->next = NULL;
521           v->moments = NULL;
522           
523           /* Create the target variable in the aggregate
524              dictionary. */
525           {
526             static const struct fmt_spec f8_2 = {FMT_F, 8, 2};
527             struct variable *destvar;
528             
529             v->function = func_index;
530
531             if (src)
532               {
533                 v->src = src[i];
534                 
535                 if (src[i]->type == ALPHA)
536                   {
537                     v->function |= FSTRING;
538                     v->string = xmalloc (src[i]->width);
539                   }
540
541                 if (function->alpha_type == ALPHA)
542                   destvar = dict_clone_var (agr->dict, v->src, dest[i]);
543                 else
544                   {
545                     assert (v->src->type == NUMERIC
546                             || function->alpha_type == NUMERIC);
547                     destvar = dict_create_var (agr->dict, dest[i], 0);
548                     if (destvar != NULL) 
549                       {
550                         if ((func_index == N || func_index == NMISS)
551                             && dict_get_weight (default_dict) != NULL)
552                           destvar->print = destvar->write = f8_2; 
553                         else
554                           destvar->print = destvar->write = function->format;
555                       }
556                   }
557               } else {
558                 v->src = NULL;
559                 destvar = dict_create_var (agr->dict, dest[i], 0);
560                 if (func_index == N_NO_VARS
561                     && dict_get_weight (default_dict) != NULL)
562                   destvar->print = destvar->write = f8_2; 
563                 else
564                   destvar->print = destvar->write = function->format;
565               }
566           
567             if (!destvar)
568               {
569                 msg (SE, _("Variable name %s is not unique within the "
570                            "aggregate file dictionary, which contains "
571                            "the aggregate variables and the break "
572                            "variables."),
573                      dest[i]);
574                 goto error;
575               }
576
577             free (dest[i]);
578             destvar->init = 0;
579             if (dest_label[i])
580               {
581                 destvar->label = dest_label[i];
582                 dest_label[i] = NULL;
583               }
584
585             v->dest = destvar;
586           }
587           
588           v->include_missing = include_missing;
589
590           if (v->src != NULL)
591             {
592               int j;
593
594               if (v->src->type == NUMERIC)
595                 for (j = 0; j < function->n_args; j++)
596                   v->arg[j].f = arg[j].f;
597               else
598                 for (j = 0; j < function->n_args; j++)
599                   v->arg[j].c = xstrdup (arg[j].c);
600             }
601         }
602       
603       if (src != NULL && src[0]->type == ALPHA)
604         for (i = 0; i < function->n_args; i++)
605           {
606             free (arg[i].c);
607             arg[i].c = NULL;
608           }
609
610       free (src);
611       free (dest);
612       free (dest_label);
613
614       if (!lex_match ('/'))
615         {
616           if (token == '.')
617             return 1;
618
619           lex_error ("expecting end of command");
620           return 0;
621         }
622       continue;
623       
624     error:
625       for (i = 0; i < n_dest; i++)
626         {
627           free (dest[i]);
628           free (dest_label[i]);
629         }
630       free (dest);
631       free (dest_label);
632       free (arg[0].c);
633       free (arg[1].c);
634       if (src && n_src && src[0]->type == ALPHA)
635         for (i = 0; i < function->n_args; i++)
636           {
637             free (arg[i].c);
638             arg[i].c = NULL;
639           }
640       free (src);
641         
642       return 0;
643     }
644 }
645
646 /* Destroys AGR. */
647 static void
648 agr_destroy (struct agr_proc *agr)
649 {
650   struct agr_var *iter, *next;
651
652   sfm_close_writer (agr->writer);
653   if (agr->sort != NULL)
654     sort_destroy_criteria (agr->sort);
655   free (agr->break_vars);
656   case_destroy (&agr->break_case);
657   for (iter = agr->agr_vars; iter; iter = next)
658     {
659       next = iter->next;
660
661       if (iter->function & FSTRING)
662         {
663           int n_args;
664           int i;
665
666           n_args = agr_func_tab[iter->function & FUNC].n_args;
667           for (i = 0; i < n_args; i++)
668             free (iter->arg[i].c);
669           free (iter->string);
670         }
671       else if (iter->function == SD)
672         moments1_destroy (iter->moments);
673       free (iter);
674     }
675   if (agr->dict != NULL)
676     dict_destroy (agr->dict);
677
678   case_destroy (&agr->agr_case);
679 }
680 \f
681 /* Execution. */
682
683 static void accumulate_aggregate_info (struct agr_proc *,
684                                        const struct ccase *);
685 static void dump_aggregate_info (struct agr_proc *, struct ccase *);
686
687 /* Processes a single case INPUT for aggregation.  If output is
688    warranted, writes it to OUTPUT and returns nonzero.
689    Otherwise, returns zero and OUTPUT is unmodified. */
690 static int
691 aggregate_single_case (struct agr_proc *agr,
692                        const struct ccase *input, struct ccase *output)
693 {
694   bool finished_group = false;
695   
696   if (agr->case_cnt++ == 0)
697     initialize_aggregate_info (agr, input);
698   else if (case_compare (&agr->break_case, input,
699                          agr->break_vars, agr->break_var_cnt))
700     {
701       dump_aggregate_info (agr, output);
702       finished_group = true;
703
704       initialize_aggregate_info (agr, input);
705     }
706
707   accumulate_aggregate_info (agr, input);
708   return finished_group;
709 }
710
711 /* Accumulates aggregation data from the case INPUT. */
712 static void 
713 accumulate_aggregate_info (struct agr_proc *agr,
714                            const struct ccase *input)
715 {
716   struct agr_var *iter;
717   double weight;
718   int bad_warn = 1;
719
720   weight = dict_get_case_weight (default_dict, input, &bad_warn);
721
722   for (iter = agr->agr_vars; iter; iter = iter->next)
723     if (iter->src)
724       {
725         const union value *v = case_data (input, iter->src->fv);
726
727         if ((!iter->include_missing && is_missing (v, iter->src))
728             || (iter->include_missing && iter->src->type == NUMERIC
729                 && v->f == SYSMIS))
730           {
731             switch (iter->function)
732               {
733               case NMISS:
734               case NMISS | FSTRING:
735                 iter->dbl[0] += weight;
736                 break;
737               case NUMISS:
738               case NUMISS | FSTRING:
739                 iter->int1++;
740                 break;
741               }
742             iter->missing = 1;
743             continue;
744           }
745         
746         /* This is horrible.  There are too many possibilities. */
747         switch (iter->function)
748           {
749           case SUM:
750             iter->dbl[0] += v->f * weight;
751             iter->int1 = 1;
752             break;
753           case MEAN:
754             iter->dbl[0] += v->f * weight;
755             iter->dbl[1] += weight;
756             break;
757           case SD:
758             moments1_add (iter->moments, v->f, weight);
759             break;
760           case MAX:
761             iter->dbl[0] = max (iter->dbl[0], v->f);
762             iter->int1 = 1;
763             break;
764           case MAX | FSTRING:
765             if (memcmp (iter->string, v->s, iter->src->width) < 0)
766               memcpy (iter->string, v->s, iter->src->width);
767             iter->int1 = 1;
768             break;
769           case MIN:
770             iter->dbl[0] = min (iter->dbl[0], v->f);
771             iter->int1 = 1;
772             break;
773           case MIN | FSTRING:
774             if (memcmp (iter->string, v->s, iter->src->width) > 0)
775               memcpy (iter->string, v->s, iter->src->width);
776             iter->int1 = 1;
777             break;
778           case FGT:
779           case PGT:
780             if (v->f > iter->arg[0].f)
781               iter->dbl[0] += weight;
782             iter->dbl[1] += weight;
783             break;
784           case FGT | FSTRING:
785           case PGT | FSTRING:
786             if (memcmp (iter->arg[0].c, v->s, iter->src->width) < 0)
787               iter->dbl[0] += weight;
788             iter->dbl[1] += weight;
789             break;
790           case FLT:
791           case PLT:
792             if (v->f < iter->arg[0].f)
793               iter->dbl[0] += weight;
794             iter->dbl[1] += weight;
795             break;
796           case FLT | FSTRING:
797           case PLT | FSTRING:
798             if (memcmp (iter->arg[0].c, v->s, iter->src->width) > 0)
799               iter->dbl[0] += weight;
800             iter->dbl[1] += weight;
801             break;
802           case FIN:
803           case PIN:
804             if (iter->arg[0].f <= v->f && v->f <= iter->arg[1].f)
805               iter->dbl[0] += weight;
806             iter->dbl[1] += weight;
807             break;
808           case FIN | FSTRING:
809           case PIN | FSTRING:
810             if (memcmp (iter->arg[0].c, v->s, iter->src->width) <= 0
811                 && memcmp (iter->arg[1].c, v->s, iter->src->width) >= 0)
812               iter->dbl[0] += weight;
813             iter->dbl[1] += weight;
814             break;
815           case FOUT:
816           case POUT:
817             if (iter->arg[0].f > v->f || v->f > iter->arg[1].f)
818               iter->dbl[0] += weight;
819             iter->dbl[1] += weight;
820             break;
821           case FOUT | FSTRING:
822           case POUT | FSTRING:
823             if (memcmp (iter->arg[0].c, v->s, iter->src->width) > 0
824                 || memcmp (iter->arg[1].c, v->s, iter->src->width) < 0)
825               iter->dbl[0] += weight;
826             iter->dbl[1] += weight;
827             break;
828           case N:
829           case N | FSTRING:
830             iter->dbl[0] += weight;
831             break;
832           case NU:
833           case NU | FSTRING:
834             iter->int1++;
835             break;
836           case FIRST:
837             if (iter->int1 == 0)
838               {
839                 iter->dbl[0] = v->f;
840                 iter->int1 = 1;
841               }
842             break;
843           case FIRST | FSTRING:
844             if (iter->int1 == 0)
845               {
846                 memcpy (iter->string, v->s, iter->src->width);
847                 iter->int1 = 1;
848               }
849             break;
850           case LAST:
851             iter->dbl[0] = v->f;
852             iter->int1 = 1;
853             break;
854           case LAST | FSTRING:
855             memcpy (iter->string, v->s, iter->src->width);
856             iter->int1 = 1;
857             break;
858           case NMISS:
859           case NMISS | FSTRING:
860           case NUMISS:
861           case NUMISS | FSTRING:
862             /* Our value is not missing or it would have been
863                caught earlier.  Nothing to do. */
864             break;
865           default:
866             assert (0);
867           }
868     } else {
869       switch (iter->function)
870         {
871         case N_NO_VARS:
872           iter->dbl[0] += weight;
873           break;
874         case NU_NO_VARS:
875           iter->int1++;
876           break;
877         default:
878           assert (0);
879         }
880     }
881 }
882
883 /* We've come to a record that differs from the previous in one or
884    more of the break variables.  Make an output record from the
885    accumulated statistics in the OUTPUT case. */
886 static void 
887 dump_aggregate_info (struct agr_proc *agr, struct ccase *output)
888 {
889   {
890     int value_idx = 0;
891     int i;
892
893     for (i = 0; i < agr->break_var_cnt; i++) 
894       {
895         struct variable *v = agr->break_vars[i];
896         memcpy (case_data_rw (output, value_idx),
897                 case_data (&agr->break_case, v->fv),
898                 sizeof (union value) * v->nv);
899         value_idx += v->nv; 
900       }
901   }
902   
903   {
904     struct agr_var *i;
905   
906     for (i = agr->agr_vars; i; i = i->next)
907       {
908         union value *v = case_data_rw (output, i->dest->fv);
909
910         if (agr->missing == COLUMNWISE && i->missing != 0
911             && (i->function & FUNC) != N && (i->function & FUNC) != NU
912             && (i->function & FUNC) != NMISS && (i->function & FUNC) != NUMISS)
913           {
914             if (i->dest->type == ALPHA)
915               memset (v->s, ' ', i->dest->width);
916             else
917               v->f = SYSMIS;
918             continue;
919           }
920         
921         switch (i->function)
922           {
923           case SUM:
924             v->f = i->int1 ? i->dbl[0] : SYSMIS;
925             break;
926           case MEAN:
927             v->f = i->dbl[1] != 0.0 ? i->dbl[0] / i->dbl[1] : SYSMIS;
928             break;
929           case SD:
930             {
931               double variance;
932
933               /* FIXME: we should use two passes. */
934               moments1_calculate (i->moments, NULL, NULL, &variance,
935                                  NULL, NULL);
936               if (variance != SYSMIS)
937                 v->f = sqrt (variance);
938               else
939                 v->f = SYSMIS; 
940             }
941             break;
942           case MAX:
943           case MIN:
944             v->f = i->int1 ? i->dbl[0] : SYSMIS;
945             break;
946           case MAX | FSTRING:
947           case MIN | FSTRING:
948             if (i->int1)
949               memcpy (v->s, i->string, i->dest->width);
950             else
951               memset (v->s, ' ', i->dest->width);
952             break;
953           case FGT:
954           case FGT | FSTRING:
955           case FLT:
956           case FLT | FSTRING:
957           case FIN:
958           case FIN | FSTRING:
959           case FOUT:
960           case FOUT | FSTRING:
961             v->f = i->dbl[1] ? i->dbl[0] / i->dbl[1] : SYSMIS;
962             break;
963           case PGT:
964           case PGT | FSTRING:
965           case PLT:
966           case PLT | FSTRING:
967           case PIN:
968           case PIN | FSTRING:
969           case POUT:
970           case POUT | FSTRING:
971             v->f = i->dbl[1] ? i->dbl[0] / i->dbl[1] * 100.0 : SYSMIS;
972             break;
973           case N:
974           case N | FSTRING:
975             v->f = i->dbl[0];
976             break;
977           case NU:
978           case NU | FSTRING:
979             v->f = i->int1;
980             break;
981           case FIRST:
982           case LAST:
983             v->f = i->int1 ? i->dbl[0] : SYSMIS;
984             break;
985           case FIRST | FSTRING:
986           case LAST | FSTRING:
987             if (i->int1)
988               memcpy (v->s, i->string, i->dest->width);
989             else
990               memset (v->s, ' ', i->dest->width);
991             break;
992           case N_NO_VARS:
993             v->f = i->dbl[0];
994             break;
995           case NU_NO_VARS:
996             v->f = i->int1;
997             break;
998           case NMISS:
999           case NMISS | FSTRING:
1000             v->f = i->dbl[0];
1001             break;
1002           case NUMISS:
1003           case NUMISS | FSTRING:
1004             v->f = i->int1;
1005             break;
1006           default:
1007             assert (0);
1008           }
1009       }
1010   }
1011 }
1012
1013 /* Resets the state for all the aggregate functions. */
1014 static void
1015 initialize_aggregate_info (struct agr_proc *agr, const struct ccase *input)
1016 {
1017   struct agr_var *iter;
1018
1019   case_destroy (&agr->break_case);
1020   case_clone (&agr->break_case, input);
1021
1022   for (iter = agr->agr_vars; iter; iter = iter->next)
1023     {
1024       iter->missing = 0;
1025       iter->dbl[0] = iter->dbl[1] = iter->dbl[2] = 0.0;
1026       iter->int1 = iter->int2 = 0;
1027       switch (iter->function)
1028         {
1029         case MIN:
1030           iter->dbl[0] = DBL_MAX;
1031           break;
1032         case MIN | FSTRING:
1033           memset (iter->string, 255, iter->src->width);
1034           break;
1035         case MAX:
1036           iter->dbl[0] = -DBL_MAX;
1037           break;
1038         case MAX | FSTRING:
1039           memset (iter->string, 0, iter->src->width);
1040           break;
1041         case SD:
1042           if (iter->moments == NULL)
1043             iter->moments = moments1_create (MOMENT_VARIANCE);
1044           else
1045             moments1_clear (iter->moments);
1046           break;
1047         default:
1048           break;
1049         }
1050     }
1051 }
1052 \f
1053 /* Aggregate each case as it comes through.  Cases which aren't needed
1054    are dropped. */
1055 static int
1056 agr_to_active_file (struct ccase *c, void *agr_)
1057 {
1058   struct agr_proc *agr = agr_;
1059
1060   if (aggregate_single_case (agr, c, &agr->agr_case)) 
1061     agr->sink->class->write (agr->sink, &agr->agr_case);
1062
1063   return 1;
1064 }
1065
1066 /* Aggregate the current case and output it if we passed a
1067    breakpoint. */
1068 static int
1069 presorted_agr_to_sysfile (struct ccase *c, void *agr_) 
1070 {
1071   struct agr_proc *agr = agr_;
1072
1073   if (aggregate_single_case (agr, c, &agr->agr_case)) 
1074     sfm_write_case (agr->writer, &agr->agr_case);
1075
1076   return 1;
1077 }