fa89cc4934c45a8dbe28254ae123c61634921637
[pspp-builds.git] / src / aggregate.c
1 /* PSPP - computes sample statistics.
2    Copyright (C) 1997-9, 2000 Free Software Foundation, Inc.
3    Written by Ben Pfaff <blp@gnu.org>.
4
5    This program is free software; you can redistribute it and/or
6    modify it under the terms of the GNU General Public License as
7    published by the Free Software Foundation; either version 2 of the
8    License, or (at your option) any later version.
9
10    This program is distributed in the hope that it will be useful, but
11    WITHOUT ANY WARRANTY; without even the implied warranty of
12    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13    General Public License for more details.
14
15    You should have received a copy of the GNU General Public License
16    along with this program; if not, write to the Free Software
17    Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA
18    02111-1307, USA. */
19
20 #include <config.h>
21 #include "error.h"
22 #include <stdlib.h>
23 #include "alloc.h"
24 #include "case.h"
25 #include "casefile.h"
26 #include "command.h"
27 #include "dictionary.h"
28 #include "error.h"
29 #include "file-handle.h"
30 #include "lexer.h"
31 #include "misc.h"
32 #include "moments.h"
33 #include "pool.h"
34 #include "settings.h"
35 #include "sfm-write.h"
36 #include "sort.h"
37 #include "str.h"
38 #include "var.h"
39 #include "vfm.h"
40 #include "vfmP.h"
41
42 /* Specifies how to make an aggregate variable. */
43 struct agr_var
44   {
45     struct agr_var *next;               /* Next in list. */
46
47     /* Collected during parsing. */
48     struct variable *src;       /* Source variable. */
49     struct variable *dest;      /* Target variable. */
50     int function;               /* Function. */
51     int include_missing;        /* 1=Include user-missing values. */
52     union value arg[2];         /* Arguments. */
53
54     /* Accumulated during AGGREGATE execution. */
55     double dbl[3];
56     int int1, int2;
57     char *string;
58     int missing;
59     struct moments1 *moments;
60   };
61
62 /* Aggregation functions. */
63 enum
64   {
65     NONE, SUM, MEAN, SD, MAX, MIN, PGT, PLT, PIN, POUT, FGT, FLT, FIN,
66     FOUT, N, NU, NMISS, NUMISS, FIRST, LAST,
67     N_AGR_FUNCS, N_NO_VARS, NU_NO_VARS,
68     FUNC = 0x1f, /* Function mask. */
69     FSTRING = 1<<5, /* String function bit. */
70   };
71
72 /* Attributes of an aggregation function. */
73 struct agr_func
74   {
75     const char *name;           /* Aggregation function name. */
76     int n_args;                 /* Number of arguments. */
77     int alpha_type;             /* When given ALPHA arguments, output type. */
78     struct fmt_spec format;     /* Format spec if alpha_type != ALPHA. */
79   };
80
81 /* Attributes of aggregation functions. */
82 static const struct agr_func agr_func_tab[] = 
83   {
84     {"<NONE>",  0, -1,      {0, 0, 0}},
85     {"SUM",     0, -1,      {FMT_F, 8, 2}},
86     {"MEAN",    0, -1,      {FMT_F, 8, 2}},
87     {"SD",      0, -1,      {FMT_F, 8, 2}},
88     {"MAX",     0, ALPHA,   {-1, -1, -1}}, 
89     {"MIN",     0, ALPHA,   {-1, -1, -1}}, 
90     {"PGT",     1, NUMERIC, {FMT_F, 5, 1}},      
91     {"PLT",     1, NUMERIC, {FMT_F, 5, 1}},       
92     {"PIN",     2, NUMERIC, {FMT_F, 5, 1}},       
93     {"POUT",    2, NUMERIC, {FMT_F, 5, 1}},       
94     {"FGT",     1, NUMERIC, {FMT_F, 5, 3}},       
95     {"FLT",     1, NUMERIC, {FMT_F, 5, 3}},       
96     {"FIN",     2, NUMERIC, {FMT_F, 5, 3}},       
97     {"FOUT",    2, NUMERIC, {FMT_F, 5, 3}},       
98     {"N",       0, NUMERIC, {FMT_F, 7, 0}},       
99     {"NU",      0, NUMERIC, {FMT_F, 7, 0}},       
100     {"NMISS",   0, NUMERIC, {FMT_F, 7, 0}},       
101     {"NUMISS",  0, NUMERIC, {FMT_F, 7, 0}},       
102     {"FIRST",   0, ALPHA,   {-1, -1, -1}}, 
103     {"LAST",    0, ALPHA,   {-1, -1, -1}},
104     {NULL,      0, -1,      {-1, -1, -1}},
105     {"N",       0, NUMERIC, {FMT_F, 7, 0}},
106     {"NU",      0, NUMERIC, {FMT_F, 7, 0}},
107   };
108
109 /* Missing value types. */
110 enum missing_treatment
111   {
112     ITEMWISE,           /* Missing values item by item. */
113     COLUMNWISE          /* Missing values column by column. */
114   };
115
116 /* An entire AGGREGATE procedure. */
117 struct agr_proc 
118   {
119     /* We have either an output file or a sink. */
120     struct sfm_writer *writer;          /* Output file, or null if none. */
121     struct case_sink *sink;             /* Sink, or null if none. */
122
123     /* Break variables. */
124     struct sort_criteria *sort;         /* Sort criteria. */
125     struct variable **break_vars;       /* Break variables. */
126     size_t break_var_cnt;               /* Number of break variables. */
127     union value *prev_break;            /* Last values of break variables. */
128
129     enum missing_treatment missing;     /* How to treat missing values. */
130     struct agr_var *agr_vars;           /* First aggregate variable. */
131     struct dictionary *dict;            /* Aggregate dictionary. */
132     int case_cnt;                       /* Counts aggregated cases. */
133     struct ccase agr_case;              /* Aggregate case for output. */
134   };
135
136 static void initialize_aggregate_info (struct agr_proc *);
137
138 /* Prototypes. */
139 static int parse_aggregate_functions (struct agr_proc *);
140 static void agr_destroy (struct agr_proc *);
141 static int aggregate_single_case (struct agr_proc *agr,
142                                   const struct ccase *input,
143                                   struct ccase *output);
144 static void dump_aggregate_info (struct agr_proc *agr, struct ccase *output);
145
146 /* Aggregating to the active file. */
147 static int agr_to_active_file (struct ccase *, void *aux);
148
149 /* Aggregating to a system file. */
150 static int presorted_agr_to_sysfile (struct ccase *, void *aux);
151 \f
152 /* Parsing. */
153
154 /* Parses and executes the AGGREGATE procedure. */
155 int
156 cmd_aggregate (void)
157 {
158   struct agr_proc agr;
159   struct file_handle *out_file = NULL;
160
161   bool copy_documents = false;
162   bool presorted = false;
163   bool saw_direction;
164
165   memset(&agr, 0 , sizeof (agr));
166   agr.missing = ITEMWISE;
167   
168   agr.dict = dict_create ();
169   dict_set_label (agr.dict, dict_get_label (default_dict));
170   dict_set_documents (agr.dict, dict_get_documents (default_dict));
171
172   /* OUTFILE subcommand must be first. */
173   if (!lex_force_match_id ("OUTFILE"))
174     goto error;
175   lex_match ('=');
176   if (!lex_match ('*'))
177     {
178       out_file = fh_parse ();
179       if (out_file == NULL)
180         goto error;
181     }
182   
183   /* Read most of the subcommands. */
184   for (;;)
185     {
186       lex_match ('/');
187       
188       if (lex_match_id ("MISSING"))
189         {
190           lex_match ('=');
191           if (!lex_match_id ("COLUMNWISE"))
192             {
193               lex_error (_("while expecting COLUMNWISE"));
194               goto error;
195             }
196           agr.missing = COLUMNWISE;
197         }
198       else if (lex_match_id ("DOCUMENT"))
199         copy_documents = true;
200       else if (lex_match_id ("PRESORTED"))
201         presorted = true;
202       else if (lex_match_id ("BREAK"))
203         {
204           int i;
205
206           lex_match ('=');
207           agr.sort = sort_parse_criteria (default_dict,
208                                           &agr.break_vars, &agr.break_var_cnt,
209                                           &saw_direction);
210           if (agr.sort == NULL)
211             goto error;
212           
213           for (i = 0; i < agr.break_var_cnt; i++)
214             {
215               struct variable *v = dict_clone_var (agr.dict, agr.break_vars[i],
216                                                    agr.break_vars[i]->name);
217               assert (v != NULL);
218             }
219
220           /* BREAK must follow the options. */
221           break;
222         }
223       else
224         {
225           lex_error (_("expecting BREAK"));
226           goto error;
227         }
228     }
229   if (presorted && saw_direction)
230     msg (SW, _("When PRESORTED is specified, specifying sorting directions "
231                "with (A) or (D) has no effect.  Output data will be sorted "
232                "the same way as the input data."));
233       
234   /* Read in the aggregate functions. */
235   lex_match ('/');
236   if (!parse_aggregate_functions (&agr))
237     goto error;
238
239   /* Delete documents. */
240   if (!copy_documents)
241     dict_set_documents (agr.dict, NULL);
242
243   /* Cancel SPLIT FILE. */
244   dict_set_split_vars (agr.dict, NULL, 0);
245   
246   /* Initialize. */
247   agr.case_cnt = 0;
248   case_create (&agr.agr_case, dict_get_next_value_idx (agr.dict));
249   initialize_aggregate_info (&agr);
250
251   /* Output to active file or external file? */
252   if (out_file == NULL) 
253     {
254       /* The active file will be replaced by the aggregated data,
255          so TEMPORARY is moot. */
256       cancel_temporary ();
257
258       if (agr.sort != NULL && !presorted)
259         sort_active_file_in_place (agr.sort);
260
261       agr.sink = create_case_sink (&storage_sink_class, agr.dict, NULL);
262       if (agr.sink->class->open != NULL)
263         agr.sink->class->open (agr.sink);
264       vfm_sink = create_case_sink (&null_sink_class, default_dict, NULL);
265       procedure (agr_to_active_file, &agr);
266       if (agr.case_cnt > 0) 
267         {
268           dump_aggregate_info (&agr, &agr.agr_case);
269           agr.sink->class->write (agr.sink, &agr.agr_case);
270         }
271       dict_destroy (default_dict);
272       default_dict = agr.dict;
273       agr.dict = NULL;
274       vfm_source = agr.sink->class->make_source (agr.sink);
275       free_case_sink (agr.sink);
276     }
277   else
278     {
279       agr.writer = sfm_open_writer (out_file, agr.dict, get_scompression ());
280       if (agr.writer == NULL)
281         goto error;
282       
283       if (agr.sort != NULL && !presorted) 
284         {
285           /* Sorting is needed. */
286           struct casefile *dst;
287           struct casereader *reader;
288           struct ccase c;
289           
290           dst = sort_active_file_to_casefile (agr.sort);
291           if (dst == NULL)
292             goto error;
293           reader = casefile_get_destructive_reader (dst);
294           while (casereader_read_xfer (reader, &c)) 
295             {
296               if (aggregate_single_case (&agr, &c, &agr.agr_case)) 
297                 sfm_write_case (agr.writer, &agr.agr_case);
298               case_destroy (&c);
299             }
300           casereader_destroy (reader);
301           casefile_destroy (dst);
302         }
303       else 
304         {
305           /* Active file is already sorted. */
306           procedure (presorted_agr_to_sysfile, &agr);
307         }
308       
309       if (agr.case_cnt > 0) 
310         {
311           dump_aggregate_info (&agr, &agr.agr_case);
312           sfm_write_case (agr.writer, &agr.agr_case);
313         }
314     }
315   
316   agr_destroy (&agr);
317   return CMD_SUCCESS;
318
319 error:
320   agr_destroy (&agr);
321   return CMD_FAILURE;
322 }
323
324 /* Parse all the aggregate functions. */
325 static int
326 parse_aggregate_functions (struct agr_proc *agr)
327 {
328   struct agr_var *tail; /* Tail of linked list starting at agr->vars. */
329
330   /* Parse everything. */
331   tail = NULL;
332   for (;;)
333     {
334       char **dest;
335       char **dest_label;
336       int n_dest;
337
338       int include_missing;
339       const struct agr_func *function;
340       int func_index;
341
342       union value arg[2];
343
344       struct variable **src;
345       int n_src;
346
347       int i;
348
349       dest = NULL;
350       dest_label = NULL;
351       n_dest = 0;
352       src = NULL;
353       function = NULL;
354       n_src = 0;
355       arg[0].c = NULL;
356       arg[1].c = NULL;
357
358       /* Parse the list of target variables. */
359       while (!lex_match ('='))
360         {
361           int n_dest_prev = n_dest;
362           
363           if (!parse_DATA_LIST_vars (&dest, &n_dest,
364                                      PV_APPEND | PV_SINGLE | PV_NO_SCRATCH))
365             goto error;
366
367           /* Assign empty labels. */
368           {
369             int j;
370
371             dest_label = xrealloc (dest_label, sizeof *dest_label * n_dest);
372             for (j = n_dest_prev; j < n_dest; j++)
373               dest_label[j] = NULL;
374           }
375           
376           if (token == T_STRING)
377             {
378               ds_truncate (&tokstr, 255);
379               dest_label[n_dest - 1] = xstrdup (ds_c_str (&tokstr));
380               lex_get ();
381             }
382         }
383
384       /* Get the name of the aggregation function. */
385       if (token != T_ID)
386         {
387           lex_error (_("expecting aggregation function"));
388           goto error;
389         }
390
391       include_missing = 0;
392       if (tokid[strlen (tokid) - 1] == '.')
393         {
394           include_missing = 1;
395           tokid[strlen (tokid) - 1] = 0;
396         }
397       
398       for (function = agr_func_tab; function->name; function++)
399         if (!strcmp (function->name, tokid))
400           break;
401       if (NULL == function->name)
402         {
403           msg (SE, _("Unknown aggregation function %s."), tokid);
404           goto error;
405         }
406       func_index = function - agr_func_tab;
407       lex_get ();
408
409       /* Check for leading lparen. */
410       if (!lex_match ('('))
411         {
412           if (func_index == N)
413             func_index = N_NO_VARS;
414           else if (func_index == NU)
415             func_index = NU_NO_VARS;
416           else
417             {
418               lex_error (_("expecting `('"));
419               goto error;
420             }
421         }
422       else
423         {
424           /* Parse list of source variables. */
425           {
426             int pv_opts = PV_NO_SCRATCH;
427
428             if (func_index == SUM || func_index == MEAN || func_index == SD)
429               pv_opts |= PV_NUMERIC;
430             else if (function->n_args)
431               pv_opts |= PV_SAME_TYPE;
432
433             if (!parse_variables (default_dict, &src, &n_src, pv_opts))
434               goto error;
435           }
436
437           /* Parse function arguments, for those functions that
438              require arguments. */
439           if (function->n_args != 0)
440             for (i = 0; i < function->n_args; i++)
441               {
442                 int type;
443             
444                 lex_match (',');
445                 if (token == T_STRING)
446                   {
447                     arg[i].c = xstrdup (ds_c_str (&tokstr));
448                     type = ALPHA;
449                   }
450                 else if (lex_is_number ())
451                   {
452                     arg[i].f = tokval;
453                     type = NUMERIC;
454                   } else {
455                     msg (SE, _("Missing argument %d to %s."), i + 1, function->name);
456                     goto error;
457                   }
458             
459                 lex_get ();
460
461                 if (type != src[0]->type)
462                   {
463                     msg (SE, _("Arguments to %s must be of same type as "
464                                "source variables."),
465                          function->name);
466                     goto error;
467                   }
468               }
469
470           /* Trailing rparen. */
471           if (!lex_match(')'))
472             {
473               lex_error (_("expecting `)'"));
474               goto error;
475             }
476           
477           /* Now check that the number of source variables match
478              the number of target variables.  If we check earlier
479              than this, the user can get very misleading error
480              message, i.e. `AGGREGATE x=SUM(y t).' will get this
481              error message when a proper message would be more
482              like `unknown variable t'. */
483           if (n_src != n_dest)
484             {
485               msg (SE, _("Number of source variables (%d) does not match "
486                          "number of target variables (%d)."),
487                    n_src, n_dest);
488               goto error;
489             }
490
491           if ((func_index == PIN || func_index == POUT
492               || func_index == FIN || func_index == FOUT) 
493               && ((src[0]->type == NUMERIC && arg[0].f > arg[1].f)
494                   || (src[0]->type == ALPHA
495                       && st_compare_pad (arg[0].c, strlen (arg[0].c),
496                                          arg[1].c, strlen (arg[1].c)) > 0)))
497             {
498               union value t = arg[0];
499               arg[0] = arg[1];
500               arg[1] = t;
501                   
502               msg (SW, _("The value arguments passed to the %s function "
503                          "are out-of-order.  They will be treated as if "
504                          "they had been specified in the correct order."),
505                    function->name);
506             }
507         }
508         
509       /* Finally add these to the linked list of aggregation
510          variables. */
511       for (i = 0; i < n_dest; i++)
512         {
513           struct agr_var *v = xmalloc (sizeof *v);
514
515           /* Add variable to chain. */
516           if (agr->agr_vars != NULL)
517             tail->next = v;
518           else
519             agr->agr_vars = v;
520           tail = v;
521           tail->next = NULL;
522           v->moments = NULL;
523           
524           /* Create the target variable in the aggregate
525              dictionary. */
526           {
527             static const struct fmt_spec f8_2 = {FMT_F, 8, 2};
528             struct variable *destvar;
529             
530             v->function = func_index;
531
532             if (src)
533               {
534                 v->src = src[i];
535                 
536                 if (src[i]->type == ALPHA)
537                   {
538                     v->function |= FSTRING;
539                     v->string = xmalloc (src[i]->width);
540                   }
541
542                 if (function->alpha_type == ALPHA)
543                   destvar = dict_clone_var (agr->dict, v->src, dest[i]);
544                 else if (v->src->type == NUMERIC
545                          || function->alpha_type == NUMERIC)
546                   {
547                     destvar = dict_create_var (agr->dict, dest[i], 0);
548                     if (destvar != NULL) 
549                       {
550                         if ((func_index == N || func_index == NMISS)
551                             && dict_get_weight (default_dict) != NULL)
552                           destvar->print = destvar->write = f8_2; 
553                         else
554                           destvar->print = destvar->write = function->format;
555                       }
556                   }
557               } else {
558                 v->src = NULL;
559                 destvar = dict_create_var (agr->dict, dest[i], 0);
560                 if (func_index == N_NO_VARS
561                     && dict_get_weight (default_dict) != NULL)
562                   destvar->print = destvar->write = f8_2; 
563                 else
564                   destvar->print = destvar->write = function->format;
565               }
566           
567             if (!destvar)
568               {
569                 msg (SE, _("Variable name %s is not unique within the "
570                            "aggregate file dictionary, which contains "
571                            "the aggregate variables and the break "
572                            "variables."),
573                      dest[i]);
574                 goto error;
575               }
576
577             free (dest[i]);
578             destvar->init = 0;
579             if (dest_label[i])
580               {
581                 destvar->label = dest_label[i];
582                 dest_label[i] = NULL;
583               }
584
585             v->dest = destvar;
586           }
587           
588           v->include_missing = include_missing;
589
590           if (v->src != NULL)
591             {
592               int j;
593
594               if (v->src->type == NUMERIC)
595                 for (j = 0; j < function->n_args; j++)
596                   v->arg[j].f = arg[j].f;
597               else
598                 for (j = 0; j < function->n_args; j++)
599                   v->arg[j].c = xstrdup (arg[j].c);
600             }
601         }
602       
603       if (src != NULL && src[0]->type == ALPHA)
604         for (i = 0; i < function->n_args; i++)
605           {
606             free (arg[i].c);
607             arg[i].c = NULL;
608           }
609
610       free (src);
611       free (dest);
612       free (dest_label);
613
614       if (!lex_match ('/'))
615         {
616           if (token == '.')
617             return 1;
618
619           lex_error ("expecting end of command");
620           return 0;
621         }
622       continue;
623       
624     error:
625       for (i = 0; i < n_dest; i++)
626         {
627           free (dest[i]);
628           free (dest_label[i]);
629         }
630       free (dest);
631       free (dest_label);
632       free (arg[0].c);
633       free (arg[1].c);
634       if (src && n_src && src[0]->type == ALPHA)
635         for (i = 0; i < function->n_args; i++)
636           {
637             free (arg[i].c);
638             arg[i].c = NULL;
639           }
640       free (src);
641         
642       return 0;
643     }
644 }
645
646 /* Destroys AGR. */
647 static void
648 agr_destroy (struct agr_proc *agr)
649 {
650   struct agr_var *iter, *next;
651
652   sfm_close_writer (agr->writer);
653   if (agr->sort != NULL)
654     sort_destroy_criteria (agr->sort);
655   free (agr->break_vars);
656   free (agr->prev_break);
657   for (iter = agr->agr_vars; iter; iter = next)
658     {
659       next = iter->next;
660
661       if (iter->function & FSTRING)
662         {
663           int n_args;
664           int i;
665
666           n_args = agr_func_tab[iter->function & FUNC].n_args;
667           for (i = 0; i < n_args; i++)
668             free (iter->arg[i].c);
669           free (iter->string);
670         }
671       else if (iter->function == SD)
672         moments1_destroy (iter->moments);
673       free (iter);
674     }
675   if (agr->dict != NULL)
676     dict_destroy (agr->dict);
677
678   case_destroy (&agr->agr_case);
679 }
680 \f
681 /* Execution. */
682
683 static void accumulate_aggregate_info (struct agr_proc *,
684                                        const struct ccase *);
685 static void dump_aggregate_info (struct agr_proc *, struct ccase *);
686
687 /* Processes a single case INPUT for aggregation.  If output is
688    warranted, writes it to OUTPUT and returns nonzero.
689    Otherwise, returns zero and OUTPUT is unmodified. */
690 static int
691 aggregate_single_case (struct agr_proc *agr,
692                        const struct ccase *input, struct ccase *output)
693 {
694   /* The first case always begins a new break group.  We also need to
695      preserve the values of the case for later comparison. */
696   if (agr->case_cnt++ == 0)
697     {
698       int n_elem = 0;
699       
700       {
701         int i;
702
703         for (i = 0; i < agr->break_var_cnt; i++)
704           n_elem += agr->break_vars[i]->nv;
705       }
706       
707       agr->prev_break = xmalloc (sizeof *agr->prev_break * n_elem);
708
709       /* Copy INPUT into prev_break. */
710       {
711         union value *iter = agr->prev_break;
712         int i;
713
714         for (i = 0; i < agr->break_var_cnt; i++)
715           {
716             struct variable *v = agr->break_vars[i];
717             
718             if (v->type == NUMERIC)
719               (iter++)->f = case_num (input, v->fv);
720             else
721               {
722                 memcpy (iter->s, case_str (input, v->fv), v->width);
723                 iter += v->nv;
724               }
725           }
726       }
727             
728       accumulate_aggregate_info (agr, input);
729         
730       return 0;
731     }
732       
733   /* Compare the value of each break variable to the values on the
734      previous case. */
735   {
736     union value *iter = agr->prev_break;
737     int i;
738     
739     for (i = 0; i < agr->break_var_cnt; i++)
740       {
741         struct variable *v = agr->break_vars[i];
742       
743         switch (v->type)
744           {
745           case NUMERIC:
746             if (case_num (input, v->fv) != iter->f)
747               goto not_equal;
748             iter++;
749             break;
750           case ALPHA:
751             if (memcmp (case_str (input, v->fv), iter->s, v->width))
752               goto not_equal;
753             iter += v->nv;
754             break;
755           default:
756             assert (0);
757           }
758       }
759   }
760
761   accumulate_aggregate_info (agr, input);
762
763   return 0;
764   
765 not_equal:
766   /* The values of the break variable are different from the values on
767      the previous case.  That means that it's time to dump aggregate
768      info. */
769   dump_aggregate_info (agr, output);
770   initialize_aggregate_info (agr);
771   accumulate_aggregate_info (agr, input);
772
773   /* Copy INPUT into prev_break. */
774   {
775     union value *iter = agr->prev_break;
776     int i;
777
778     for (i = 0; i < agr->break_var_cnt; i++)
779       {
780         struct variable *v = agr->break_vars[i];
781             
782         if (v->type == NUMERIC)
783           (iter++)->f = case_num (input, v->fv);
784         else
785           {
786             memcpy (iter->s, case_str (input, v->fv), v->width);
787             iter += v->nv;
788           }
789       }
790   }
791   
792   return 1;
793 }
794
795 /* Accumulates aggregation data from the case INPUT. */
796 static void 
797 accumulate_aggregate_info (struct agr_proc *agr,
798                            const struct ccase *input)
799 {
800   struct agr_var *iter;
801   double weight;
802   int bad_warn = 1;
803
804   weight = dict_get_case_weight (default_dict, input, &bad_warn);
805
806   for (iter = agr->agr_vars; iter; iter = iter->next)
807     if (iter->src)
808       {
809         const union value *v = case_data (input, iter->src->fv);
810
811         if ((!iter->include_missing && is_missing (v, iter->src))
812             || (iter->include_missing && iter->src->type == NUMERIC
813                 && v->f == SYSMIS))
814           {
815             switch (iter->function)
816               {
817               case NMISS:
818               case NMISS | FSTRING:
819                 iter->dbl[0] += weight;
820                 break;
821               case NUMISS:
822               case NUMISS | FSTRING:
823                 iter->int1++;
824                 break;
825               }
826             iter->missing = 1;
827             continue;
828           }
829         
830         /* This is horrible.  There are too many possibilities. */
831         switch (iter->function)
832           {
833           case SUM:
834             iter->dbl[0] += v->f * weight;
835             iter->int1 = 1;
836             break;
837           case MEAN:
838             iter->dbl[0] += v->f * weight;
839             iter->dbl[1] += weight;
840             break;
841           case SD:
842             moments1_add (iter->moments, v->f, weight);
843             break;
844           case MAX:
845             iter->dbl[0] = max (iter->dbl[0], v->f);
846             iter->int1 = 1;
847             break;
848           case MAX | FSTRING:
849             if (memcmp (iter->string, v->s, iter->src->width) < 0)
850               memcpy (iter->string, v->s, iter->src->width);
851             iter->int1 = 1;
852             break;
853           case MIN:
854             iter->dbl[0] = min (iter->dbl[0], v->f);
855             iter->int1 = 1;
856             break;
857           case MIN | FSTRING:
858             if (memcmp (iter->string, v->s, iter->src->width) > 0)
859               memcpy (iter->string, v->s, iter->src->width);
860             iter->int1 = 1;
861             break;
862           case FGT:
863           case PGT:
864             if (v->f > iter->arg[0].f)
865               iter->dbl[0] += weight;
866             iter->dbl[1] += weight;
867             break;
868           case FGT | FSTRING:
869           case PGT | FSTRING:
870             if (memcmp (iter->arg[0].c, v->s, iter->src->width) < 0)
871               iter->dbl[0] += weight;
872             iter->dbl[1] += weight;
873             break;
874           case FLT:
875           case PLT:
876             if (v->f < iter->arg[0].f)
877               iter->dbl[0] += weight;
878             iter->dbl[1] += weight;
879             break;
880           case FLT | FSTRING:
881           case PLT | FSTRING:
882             if (memcmp (iter->arg[0].c, v->s, iter->src->width) > 0)
883               iter->dbl[0] += weight;
884             iter->dbl[1] += weight;
885             break;
886           case FIN:
887           case PIN:
888             if (iter->arg[0].f <= v->f && v->f <= iter->arg[1].f)
889               iter->dbl[0] += weight;
890             iter->dbl[1] += weight;
891             break;
892           case FIN | FSTRING:
893           case PIN | FSTRING:
894             if (memcmp (iter->arg[0].c, v->s, iter->src->width) <= 0
895                 && memcmp (iter->arg[1].c, v->s, iter->src->width) >= 0)
896               iter->dbl[0] += weight;
897             iter->dbl[1] += weight;
898             break;
899           case FOUT:
900           case POUT:
901             if (iter->arg[0].f > v->f || v->f > iter->arg[1].f)
902               iter->dbl[0] += weight;
903             iter->dbl[1] += weight;
904             break;
905           case FOUT | FSTRING:
906           case POUT | FSTRING:
907             if (memcmp (iter->arg[0].c, v->s, iter->src->width) > 0
908                 || memcmp (iter->arg[1].c, v->s, iter->src->width) < 0)
909               iter->dbl[0] += weight;
910             iter->dbl[1] += weight;
911             break;
912           case N:
913           case N | FSTRING:
914             iter->dbl[0] += weight;
915             break;
916           case NU:
917           case NU | FSTRING:
918             iter->int1++;
919             break;
920           case FIRST:
921             if (iter->int1 == 0)
922               {
923                 iter->dbl[0] = v->f;
924                 iter->int1 = 1;
925               }
926             break;
927           case FIRST | FSTRING:
928             if (iter->int1 == 0)
929               {
930                 memcpy (iter->string, v->s, iter->src->width);
931                 iter->int1 = 1;
932               }
933             break;
934           case LAST:
935             iter->dbl[0] = v->f;
936             iter->int1 = 1;
937             break;
938           case LAST | FSTRING:
939             memcpy (iter->string, v->s, iter->src->width);
940             iter->int1 = 1;
941             break;
942           case NMISS:
943           case NMISS | FSTRING:
944           case NUMISS:
945           case NUMISS | FSTRING:
946             /* Our value is not missing or it would have been
947                caught earlier.  Nothing to do. */
948             break;
949           default:
950             assert (0);
951           }
952     } else {
953       switch (iter->function)
954         {
955         case N_NO_VARS:
956           iter->dbl[0] += weight;
957           break;
958         case NU_NO_VARS:
959           iter->int1++;
960           break;
961         default:
962           assert (0);
963         }
964     }
965 }
966
967 /* We've come to a record that differs from the previous in one or
968    more of the break variables.  Make an output record from the
969    accumulated statistics in the OUTPUT case. */
970 static void 
971 dump_aggregate_info (struct agr_proc *agr, struct ccase *output)
972 {
973   {
974     int value_idx = 0;
975     int i;
976
977     for (i = 0; i < agr->break_var_cnt; i++) 
978       {
979         int nv = agr->break_vars[i]->nv;
980         memcpy (case_data_rw (output, value_idx),
981                 &agr->prev_break[value_idx],
982                 sizeof (union value) * nv);
983         value_idx += nv; 
984       }
985   }
986   
987   {
988     struct agr_var *i;
989   
990     for (i = agr->agr_vars; i; i = i->next)
991       {
992         union value *v = case_data_rw (output, i->dest->fv);
993
994         if (agr->missing == COLUMNWISE && i->missing != 0
995             && (i->function & FUNC) != N && (i->function & FUNC) != NU
996             && (i->function & FUNC) != NMISS && (i->function & FUNC) != NUMISS)
997           {
998             if (i->dest->type == ALPHA)
999               memset (v->s, ' ', i->dest->width);
1000             else
1001               v->f = SYSMIS;
1002             continue;
1003           }
1004         
1005         switch (i->function)
1006           {
1007           case SUM:
1008             v->f = i->int1 ? i->dbl[0] : SYSMIS;
1009             break;
1010           case MEAN:
1011             v->f = i->dbl[1] != 0.0 ? i->dbl[0] / i->dbl[1] : SYSMIS;
1012             break;
1013           case SD:
1014             {
1015               double variance;
1016
1017               /* FIXME: we should use two passes. */
1018               moments1_calculate (i->moments, NULL, NULL, &variance,
1019                                  NULL, NULL);
1020               if (variance != SYSMIS)
1021                 v->f = sqrt (variance);
1022               else
1023                 v->f = SYSMIS; 
1024             }
1025             break;
1026           case MAX:
1027           case MIN:
1028             v->f = i->int1 ? i->dbl[0] : SYSMIS;
1029             break;
1030           case MAX | FSTRING:
1031           case MIN | FSTRING:
1032             if (i->int1)
1033               memcpy (v->s, i->string, i->dest->width);
1034             else
1035               memset (v->s, ' ', i->dest->width);
1036             break;
1037           case FGT:
1038           case FGT | FSTRING:
1039           case FLT:
1040           case FLT | FSTRING:
1041           case FIN:
1042           case FIN | FSTRING:
1043           case FOUT:
1044           case FOUT | FSTRING:
1045             v->f = i->dbl[1] ? i->dbl[0] / i->dbl[1] : SYSMIS;
1046             break;
1047           case PGT:
1048           case PGT | FSTRING:
1049           case PLT:
1050           case PLT | FSTRING:
1051           case PIN:
1052           case PIN | FSTRING:
1053           case POUT:
1054           case POUT | FSTRING:
1055             v->f = i->dbl[1] ? i->dbl[0] / i->dbl[1] * 100.0 : SYSMIS;
1056             break;
1057           case N:
1058           case N | FSTRING:
1059             v->f = i->dbl[0];
1060             break;
1061           case NU:
1062           case NU | FSTRING:
1063             v->f = i->int1;
1064             break;
1065           case FIRST:
1066           case LAST:
1067             v->f = i->int1 ? i->dbl[0] : SYSMIS;
1068             break;
1069           case FIRST | FSTRING:
1070           case LAST | FSTRING:
1071             if (i->int1)
1072               memcpy (v->s, i->string, i->dest->width);
1073             else
1074               memset (v->s, ' ', i->dest->width);
1075             break;
1076           case N_NO_VARS:
1077             v->f = i->dbl[0];
1078             break;
1079           case NU_NO_VARS:
1080             v->f = i->int1;
1081             break;
1082           case NMISS:
1083           case NMISS | FSTRING:
1084             v->f = i->dbl[0];
1085             break;
1086           case NUMISS:
1087           case NUMISS | FSTRING:
1088             v->f = i->int1;
1089             break;
1090           default:
1091             assert (0);
1092           }
1093       }
1094   }
1095 }
1096
1097 /* Resets the state for all the aggregate functions. */
1098 static void
1099 initialize_aggregate_info (struct agr_proc *agr)
1100 {
1101   struct agr_var *iter;
1102
1103   for (iter = agr->agr_vars; iter; iter = iter->next)
1104     {
1105       iter->missing = 0;
1106       iter->dbl[0] = iter->dbl[1] = iter->dbl[2] = 0.0;
1107       iter->int1 = iter->int2 = 0;
1108       switch (iter->function)
1109         {
1110         case MIN:
1111           iter->dbl[0] = DBL_MAX;
1112           break;
1113         case MIN | FSTRING:
1114           memset (iter->string, 255, iter->src->width);
1115           break;
1116         case MAX:
1117           iter->dbl[0] = -DBL_MAX;
1118           break;
1119         case MAX | FSTRING:
1120           memset (iter->string, 0, iter->src->width);
1121           break;
1122         case SD:
1123           if (iter->moments == NULL)
1124             iter->moments = moments1_create (MOMENT_VARIANCE);
1125           else
1126             moments1_clear (iter->moments);
1127           break;
1128         default:
1129           break;
1130         }
1131     }
1132 }
1133 \f
1134 /* Aggregate each case as it comes through.  Cases which aren't needed
1135    are dropped. */
1136 static int
1137 agr_to_active_file (struct ccase *c, void *agr_)
1138 {
1139   struct agr_proc *agr = agr_;
1140
1141   if (aggregate_single_case (agr, c, &agr->agr_case)) 
1142     agr->sink->class->write (agr->sink, &agr->agr_case);
1143
1144   return 1;
1145 }
1146
1147 /* Aggregate the current case and output it if we passed a
1148    breakpoint. */
1149 static int
1150 presorted_agr_to_sysfile (struct ccase *c, void *agr_) 
1151 {
1152   struct agr_proc *agr = agr_;
1153
1154   if (aggregate_single_case (agr, c, &agr->agr_case)) 
1155     sfm_write_case (agr->writer, &agr->agr_case);
1156
1157   return 1;
1158 }