9d925707c2d819235f03a42cd5b9fed4ea4d1a28
[pspp] / src / aggregate.c
1 /* PSPP - computes sample statistics.
2    Copyright (C) 1997-9, 2000 Free Software Foundation, Inc.
3    Written by Ben Pfaff <blp@gnu.org>.
4
5    This program is free software; you can redistribute it and/or
6    modify it under the terms of the GNU General Public License as
7    published by the Free Software Foundation; either version 2 of the
8    License, or (at your option) any later version.
9
10    This program is distributed in the hope that it will be useful, but
11    WITHOUT ANY WARRANTY; without even the implied warranty of
12    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13    General Public License for more details.
14
15    You should have received a copy of the GNU General Public License
16    along with this program; if not, write to the Free Software
17    Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA
18    02111-1307, USA. */
19
20 #include <config.h>
21 #include "error.h"
22 #include <stdlib.h>
23 #include "alloc.h"
24 #include "case.h"
25 #include "casefile.h"
26 #include "command.h"
27 #include "dictionary.h"
28 #include "error.h"
29 #include "file-handle.h"
30 #include "lexer.h"
31 #include "misc.h"
32 #include "moments.h"
33 #include "pool.h"
34 #include "settings.h"
35 #include "sfm-write.h"
36 #include "sort.h"
37 #include "str.h"
38 #include "var.h"
39 #include "vfm.h"
40 #include "vfmP.h"
41
42 /* Specifies how to make an aggregate variable. */
43 struct agr_var
44   {
45     struct agr_var *next;               /* Next in list. */
46
47     /* Collected during parsing. */
48     struct variable *src;       /* Source variable. */
49     struct variable *dest;      /* Target variable. */
50     int function;               /* Function. */
51     int include_missing;        /* 1=Include user-missing values. */
52     union value arg[2];         /* Arguments. */
53
54     /* Accumulated during AGGREGATE execution. */
55     double dbl[3];
56     int int1, int2;
57     char *string;
58     int missing;
59     struct moments1 *moments;
60   };
61
62 /* Aggregation functions. */
63 enum
64   {
65     NONE, SUM, MEAN, SD, MAX, MIN, PGT, PLT, PIN, POUT, FGT, FLT, FIN,
66     FOUT, N, NU, NMISS, NUMISS, FIRST, LAST,
67     N_AGR_FUNCS, N_NO_VARS, NU_NO_VARS,
68     FUNC = 0x1f, /* Function mask. */
69     FSTRING = 1<<5, /* String function bit. */
70   };
71
72 /* Attributes of an aggregation function. */
73 struct agr_func
74   {
75     const char *name;           /* Aggregation function name. */
76     int n_args;                 /* Number of arguments. */
77     int alpha_type;             /* When given ALPHA arguments, output type. */
78     struct fmt_spec format;     /* Format spec if alpha_type != ALPHA. */
79   };
80
81 /* Attributes of aggregation functions. */
82 static const struct agr_func agr_func_tab[] = 
83   {
84     {"<NONE>",  0, -1,      {0, 0, 0}},
85     {"SUM",     0, -1,      {FMT_F, 8, 2}},
86     {"MEAN",    0, -1,      {FMT_F, 8, 2}},
87     {"SD",      0, -1,      {FMT_F, 8, 2}},
88     {"MAX",     0, ALPHA,   {-1, -1, -1}}, 
89     {"MIN",     0, ALPHA,   {-1, -1, -1}}, 
90     {"PGT",     1, NUMERIC, {FMT_F, 5, 1}},      
91     {"PLT",     1, NUMERIC, {FMT_F, 5, 1}},       
92     {"PIN",     2, NUMERIC, {FMT_F, 5, 1}},       
93     {"POUT",    2, NUMERIC, {FMT_F, 5, 1}},       
94     {"FGT",     1, NUMERIC, {FMT_F, 5, 3}},       
95     {"FLT",     1, NUMERIC, {FMT_F, 5, 3}},       
96     {"FIN",     2, NUMERIC, {FMT_F, 5, 3}},       
97     {"FOUT",    2, NUMERIC, {FMT_F, 5, 3}},       
98     {"N",       0, NUMERIC, {FMT_F, 7, 0}},       
99     {"NU",      0, NUMERIC, {FMT_F, 7, 0}},       
100     {"NMISS",   0, NUMERIC, {FMT_F, 7, 0}},       
101     {"NUMISS",  0, NUMERIC, {FMT_F, 7, 0}},       
102     {"FIRST",   0, ALPHA,   {-1, -1, -1}}, 
103     {"LAST",    0, ALPHA,   {-1, -1, -1}},
104     {NULL,      0, -1,      {-1, -1, -1}},
105     {"N",       0, NUMERIC, {FMT_F, 7, 0}},
106     {"NU",      0, NUMERIC, {FMT_F, 7, 0}},
107   };
108
109 /* Missing value types. */
110 enum missing_treatment
111   {
112     ITEMWISE,           /* Missing values item by item. */
113     COLUMNWISE          /* Missing values column by column. */
114   };
115
116 /* An entire AGGREGATE procedure. */
117 struct agr_proc 
118   {
119     /* We have either an output file or a sink. */
120     struct sfm_writer *writer;          /* Output file, or null if none. */
121     struct case_sink *sink;             /* Sink, or null if none. */
122
123     /* Break variables. */
124     struct sort_criteria *sort;         /* Sort criteria. */
125     struct variable **break_vars;       /* Break variables. */
126     size_t break_var_cnt;               /* Number of break variables. */
127     union value *prev_break;            /* Last values of break variables. */
128
129     enum missing_treatment missing;     /* How to treat missing values. */
130     struct agr_var *agr_vars;           /* First aggregate variable. */
131     struct dictionary *dict;            /* Aggregate dictionary. */
132     int case_cnt;                       /* Counts aggregated cases. */
133     struct ccase agr_case;              /* Aggregate case for output. */
134   };
135
136 static void initialize_aggregate_info (struct agr_proc *);
137
138 /* Prototypes. */
139 static int parse_aggregate_functions (struct agr_proc *);
140 static void agr_destroy (struct agr_proc *);
141 static int aggregate_single_case (struct agr_proc *agr,
142                                   const struct ccase *input,
143                                   struct ccase *output);
144 static void dump_aggregate_info (struct agr_proc *agr, struct ccase *output);
145
146 /* Aggregating to the active file. */
147 static int agr_to_active_file (struct ccase *, void *aux);
148
149 /* Aggregating to a system file. */
150 static int presorted_agr_to_sysfile (struct ccase *, void *aux);
151 \f
152 /* Parsing. */
153
154 /* Parses and executes the AGGREGATE procedure. */
155 int
156 cmd_aggregate (void)
157 {
158   struct agr_proc agr;
159   struct file_handle *out_file = NULL;
160
161   bool copy_documents = false;
162   bool presorted = false;
163   bool saw_direction;
164
165   memset(&agr, 0 , sizeof (agr));
166   agr.missing = ITEMWISE;
167   
168   agr.dict = dict_create ();
169   dict_set_label (agr.dict, dict_get_label (default_dict));
170   dict_set_documents (agr.dict, dict_get_documents (default_dict));
171
172   /* OUTFILE subcommand must be first. */
173   if (!lex_force_match_id ("OUTFILE"))
174     goto error;
175   lex_match ('=');
176   if (!lex_match ('*'))
177     {
178       out_file = fh_parse ();
179       if (out_file == NULL)
180         goto error;
181     }
182   
183   /* Read most of the subcommands. */
184   for (;;)
185     {
186       lex_match ('/');
187       
188       if (lex_match_id ("MISSING"))
189         {
190           lex_match ('=');
191           if (!lex_match_id ("COLUMNWISE"))
192             {
193               lex_error (_("while expecting COLUMNWISE"));
194               goto error;
195             }
196           agr.missing = COLUMNWISE;
197         }
198       else if (lex_match_id ("DOCUMENT"))
199         copy_documents = true;
200       else if (lex_match_id ("PRESORTED"))
201         presorted = true;
202       else if (lex_match_id ("BREAK"))
203         {
204           int i;
205
206           lex_match ('=');
207           agr.sort = sort_parse_criteria (default_dict,
208                                           &agr.break_vars, &agr.break_var_cnt,
209                                           &saw_direction);
210           if (agr.sort == NULL)
211             goto error;
212           
213           for (i = 0; i < agr.break_var_cnt; i++)
214             {
215               struct variable *v = dict_clone_var (agr.dict, agr.break_vars[i],
216                                                    agr.break_vars[i]->name);
217               assert (v != NULL);
218             }
219
220           /* BREAK must follow the options. */
221           break;
222         }
223       else
224         {
225           lex_error (_("expecting BREAK"));
226           goto error;
227         }
228     }
229   if (presorted && saw_direction)
230     msg (SW, _("When PRESORTED is specified, specifying sorting directions "
231                "with (A) or (D) has no effect.  Output data will be sorted "
232                "the same way as the input data."));
233       
234   /* Read in the aggregate functions. */
235   lex_match ('/');
236   if (!parse_aggregate_functions (&agr))
237     goto error;
238
239   /* Delete documents. */
240   if (!copy_documents)
241     dict_set_documents (agr.dict, NULL);
242
243   /* Cancel SPLIT FILE. */
244   dict_set_split_vars (agr.dict, NULL, 0);
245   
246   /* Initialize. */
247   agr.case_cnt = 0;
248   case_create (&agr.agr_case, dict_get_next_value_idx (agr.dict));
249   initialize_aggregate_info (&agr);
250
251   /* Output to active file or external file? */
252   if (out_file == NULL) 
253     {
254       /* The active file will be replaced by the aggregated data,
255          so TEMPORARY is moot. */
256       cancel_temporary ();
257
258       if (agr.sort != NULL && !presorted)
259         sort_active_file_in_place (agr.sort);
260
261       agr.sink = create_case_sink (&storage_sink_class, agr.dict, NULL);
262       if (agr.sink->class->open != NULL)
263         agr.sink->class->open (agr.sink);
264       vfm_sink = create_case_sink (&null_sink_class, default_dict, NULL);
265       procedure (agr_to_active_file, &agr);
266       if (agr.case_cnt > 0) 
267         {
268           dump_aggregate_info (&agr, &agr.agr_case);
269           agr.sink->class->write (agr.sink, &agr.agr_case);
270         }
271       dict_destroy (default_dict);
272       default_dict = agr.dict;
273       agr.dict = NULL;
274       vfm_source = agr.sink->class->make_source (agr.sink);
275       free_case_sink (agr.sink);
276     }
277   else
278     {
279       agr.writer = sfm_open_writer (out_file, agr.dict, get_scompression ());
280       if (agr.writer == NULL)
281         goto error;
282       
283       if (agr.sort != NULL && !presorted) 
284         {
285           /* Sorting is needed. */
286           struct casefile *dst;
287           struct casereader *reader;
288           struct ccase c;
289           
290           dst = sort_active_file_to_casefile (agr.sort);
291           if (dst == NULL)
292             goto error;
293           reader = casefile_get_destructive_reader (dst);
294           while (casereader_read_xfer (reader, &c)) 
295             {
296               if (aggregate_single_case (&agr, &c, &agr.agr_case)) 
297                 sfm_write_case (agr.writer, &agr.agr_case);
298               case_destroy (&c);
299             }
300           casereader_destroy (reader);
301           casefile_destroy (dst);
302         }
303       else 
304         {
305           /* Active file is already sorted. */
306           procedure (presorted_agr_to_sysfile, &agr);
307         }
308       
309       if (agr.case_cnt > 0) 
310         {
311           dump_aggregate_info (&agr, &agr.agr_case);
312           sfm_write_case (agr.writer, &agr.agr_case);
313         }
314     }
315   
316   agr_destroy (&agr);
317   return CMD_SUCCESS;
318
319 error:
320   agr_destroy (&agr);
321   return CMD_FAILURE;
322 }
323
324 /* Parse all the aggregate functions. */
325 static int
326 parse_aggregate_functions (struct agr_proc *agr)
327 {
328   struct agr_var *tail; /* Tail of linked list starting at agr->vars. */
329
330   /* Parse everything. */
331   tail = NULL;
332   for (;;)
333     {
334       char **dest;
335       char **dest_label;
336       int n_dest;
337
338       int include_missing;
339       const struct agr_func *function;
340       int func_index;
341
342       union value arg[2];
343
344       struct variable **src;
345       int n_src;
346
347       int i;
348
349       dest = NULL;
350       dest_label = NULL;
351       n_dest = 0;
352       src = NULL;
353       function = NULL;
354       n_src = 0;
355       arg[0].c = NULL;
356       arg[1].c = NULL;
357
358       /* Parse the list of target variables. */
359       while (!lex_match ('='))
360         {
361           int n_dest_prev = n_dest;
362           
363           if (!parse_DATA_LIST_vars (&dest, &n_dest,
364                                      PV_APPEND | PV_SINGLE | PV_NO_SCRATCH))
365             goto error;
366
367           /* Assign empty labels. */
368           {
369             int j;
370
371             dest_label = xrealloc (dest_label, sizeof *dest_label * n_dest);
372             for (j = n_dest_prev; j < n_dest; j++)
373               dest_label[j] = NULL;
374           }
375           
376           if (token == T_STRING)
377             {
378               ds_truncate (&tokstr, 255);
379               dest_label[n_dest - 1] = xstrdup (ds_c_str (&tokstr));
380               lex_get ();
381             }
382         }
383
384       /* Get the name of the aggregation function. */
385       if (token != T_ID)
386         {
387           lex_error (_("expecting aggregation function"));
388           goto error;
389         }
390
391       include_missing = 0;
392       if (tokid[strlen (tokid) - 1] == '.')
393         {
394           include_missing = 1;
395           tokid[strlen (tokid) - 1] = 0;
396         }
397       
398       for (function = agr_func_tab; function->name; function++)
399         if (!strcmp (function->name, tokid))
400           break;
401       if (NULL == function->name)
402         {
403           msg (SE, _("Unknown aggregation function %s."), tokid);
404           goto error;
405         }
406       func_index = function - agr_func_tab;
407       lex_get ();
408
409       /* Check for leading lparen. */
410       if (!lex_match ('('))
411         {
412           if (func_index == N)
413             func_index = N_NO_VARS;
414           else if (func_index == NU)
415             func_index = NU_NO_VARS;
416           else
417             {
418               lex_error (_("expecting `('"));
419               goto error;
420             }
421         }
422       else
423         {
424           /* Parse list of source variables. */
425           {
426             int pv_opts = PV_NO_SCRATCH;
427
428             if (func_index == SUM || func_index == MEAN || func_index == SD)
429               pv_opts |= PV_NUMERIC;
430             else if (function->n_args)
431               pv_opts |= PV_SAME_TYPE;
432
433             if (!parse_variables (default_dict, &src, &n_src, pv_opts))
434               goto error;
435           }
436
437           /* Parse function arguments, for those functions that
438              require arguments. */
439           if (function->n_args != 0)
440             for (i = 0; i < function->n_args; i++)
441               {
442                 int type;
443             
444                 lex_match (',');
445                 if (token == T_STRING)
446                   {
447                     arg[i].c = xstrdup (ds_c_str (&tokstr));
448                     type = ALPHA;
449                   }
450                 else if (lex_is_number ())
451                   {
452                     arg[i].f = tokval;
453                     type = NUMERIC;
454                   } else {
455                     msg (SE, _("Missing argument %d to %s."), i + 1, function->name);
456                     goto error;
457                   }
458             
459                 lex_get ();
460
461                 if (type != src[0]->type)
462                   {
463                     msg (SE, _("Arguments to %s must be of same type as "
464                                "source variables."),
465                          function->name);
466                     goto error;
467                   }
468               }
469
470           /* Trailing rparen. */
471           if (!lex_match(')'))
472             {
473               lex_error (_("expecting `)'"));
474               goto error;
475             }
476           
477           /* Now check that the number of source variables match
478              the number of target variables.  If we check earlier
479              than this, the user can get very misleading error
480              message, i.e. `AGGREGATE x=SUM(y t).' will get this
481              error message when a proper message would be more
482              like `unknown variable t'. */
483           if (n_src != n_dest)
484             {
485               msg (SE, _("Number of source variables (%d) does not match "
486                          "number of target variables (%d)."),
487                    n_src, n_dest);
488               goto error;
489             }
490
491           if ((func_index == PIN || func_index == POUT
492               || func_index == FIN || func_index == FOUT) 
493               && ((src[0]->type == NUMERIC && arg[0].f > arg[1].f)
494                   || (src[0]->type == ALPHA
495                       && st_compare_pad (arg[0].c, strlen (arg[0].c),
496                                          arg[1].c, strlen (arg[1].c)) > 0)))
497             {
498               union value t = arg[0];
499               arg[0] = arg[1];
500               arg[1] = t;
501                   
502               msg (SW, _("The value arguments passed to the %s function "
503                          "are out-of-order.  They will be treated as if "
504                          "they had been specified in the correct order."),
505                    function->name);
506             }
507         }
508         
509       /* Finally add these to the linked list of aggregation
510          variables. */
511       for (i = 0; i < n_dest; i++)
512         {
513           struct agr_var *v = xmalloc (sizeof *v);
514
515           /* Add variable to chain. */
516           if (agr->agr_vars != NULL)
517             tail->next = v;
518           else
519             agr->agr_vars = v;
520           tail = v;
521           tail->next = NULL;
522           v->moments = NULL;
523           
524           /* Create the target variable in the aggregate
525              dictionary. */
526           {
527             struct variable *destvar;
528             
529             v->function = func_index;
530
531             if (src)
532               {
533                 v->src = src[i];
534                 
535                 if (src[i]->type == ALPHA)
536                   {
537                     v->function |= FSTRING;
538                     v->string = xmalloc (src[i]->width);
539                   }
540
541                 if (function->alpha_type == ALPHA)
542                   destvar = dict_clone_var (agr->dict, v->src, dest[i]);
543                 else if (v->src->type == NUMERIC
544                          || function->alpha_type == NUMERIC)
545                   {
546                     destvar = dict_create_var (agr->dict, dest[i], 0);
547                         
548                     if ((func_index == N
549                             || func_index == N_NO_VARS
550                             || func_index == NMISS)
551                         && dict_get_weight (default_dict) != NULL)
552                       {
553                         static const struct fmt_spec f8_2 = {FMT_F, 8, 2};
554                             
555                         destvar->print = destvar->write = f8_2; 
556                       }
557                     else
558                       destvar->print = destvar->write = function->format;
559                   }
560                 else 
561                   destvar = dict_create_var (agr->dict, dest[i],
562                                              v->src->width);
563               } else {
564                 v->src = NULL;
565                 destvar = dict_create_var (agr->dict, dest[i], 0);
566               }
567           
568             if (!destvar)
569               {
570                 msg (SE, _("Variable name %s is not unique within the "
571                            "aggregate file dictionary, which contains "
572                            "the aggregate variables and the break "
573                            "variables."),
574                      dest[i]);
575                 free (dest[i]);
576                 goto error;
577               }
578
579             free (dest[i]);
580             destvar->init = 0;
581             if (dest_label[i])
582               {
583                 destvar->label = dest_label[i];
584                 dest_label[i] = NULL;
585               }
586
587             v->dest = destvar;
588           }
589           
590           v->include_missing = include_missing;
591
592           if (v->src != NULL)
593             {
594               int j;
595
596               if (v->src->type == NUMERIC)
597                 for (j = 0; j < function->n_args; j++)
598                   v->arg[j].f = arg[j].f;
599               else
600                 for (j = 0; j < function->n_args; j++)
601                   v->arg[j].c = xstrdup (arg[j].c);
602             }
603         }
604       
605       if (src != NULL && src[0]->type == ALPHA)
606         for (i = 0; i < function->n_args; i++)
607           {
608             free (arg[i].c);
609             arg[i].c = NULL;
610           }
611
612       free (src);
613       free (dest);
614       free (dest_label);
615
616       if (!lex_match ('/'))
617         {
618           if (token == '.')
619             return 1;
620
621           lex_error ("expecting end of command");
622           return 0;
623         }
624       continue;
625       
626     error:
627       for (i = 0; i < n_dest; i++)
628         {
629           free (dest[i]);
630           free (dest_label[i]);
631         }
632       free (dest);
633       free (dest_label);
634       free (arg[0].c);
635       free (arg[1].c);
636       if (src && n_src && src[0]->type == ALPHA)
637         for (i = 0; i < function->n_args; i++)
638           {
639             free (arg[i].c);
640             arg[i].c = NULL;
641           }
642       free (src);
643         
644       return 0;
645     }
646 }
647
648 /* Destroys AGR. */
649 static void
650 agr_destroy (struct agr_proc *agr)
651 {
652   struct agr_var *iter, *next;
653
654   sfm_close_writer (agr->writer);
655   if (agr->sort != NULL)
656     sort_destroy_criteria (agr->sort);
657   free (agr->break_vars);
658   free (agr->prev_break);
659   for (iter = agr->agr_vars; iter; iter = next)
660     {
661       next = iter->next;
662
663       if (iter->function & FSTRING)
664         {
665           int n_args;
666           int i;
667
668           n_args = agr_func_tab[iter->function & FUNC].n_args;
669           for (i = 0; i < n_args; i++)
670             free (iter->arg[i].c);
671           free (iter->string);
672         }
673       else if (iter->function == SD)
674         moments1_destroy (iter->moments);
675       free (iter);
676     }
677   if (agr->dict != NULL)
678     dict_destroy (agr->dict);
679
680   case_destroy (&agr->agr_case);
681 }
682 \f
683 /* Execution. */
684
685 static void accumulate_aggregate_info (struct agr_proc *,
686                                        const struct ccase *);
687 static void dump_aggregate_info (struct agr_proc *, struct ccase *);
688
689 /* Processes a single case INPUT for aggregation.  If output is
690    warranted, writes it to OUTPUT and returns nonzero.
691    Otherwise, returns zero and OUTPUT is unmodified. */
692 static int
693 aggregate_single_case (struct agr_proc *agr,
694                        const struct ccase *input, struct ccase *output)
695 {
696   /* The first case always begins a new break group.  We also need to
697      preserve the values of the case for later comparison. */
698   if (agr->case_cnt++ == 0)
699     {
700       int n_elem = 0;
701       
702       {
703         int i;
704
705         for (i = 0; i < agr->break_var_cnt; i++)
706           n_elem += agr->break_vars[i]->nv;
707       }
708       
709       agr->prev_break = xmalloc (sizeof *agr->prev_break * n_elem);
710
711       /* Copy INPUT into prev_break. */
712       {
713         union value *iter = agr->prev_break;
714         int i;
715
716         for (i = 0; i < agr->break_var_cnt; i++)
717           {
718             struct variable *v = agr->break_vars[i];
719             
720             if (v->type == NUMERIC)
721               (iter++)->f = case_num (input, v->fv);
722             else
723               {
724                 memcpy (iter->s, case_str (input, v->fv), v->width);
725                 iter += v->nv;
726               }
727           }
728       }
729             
730       accumulate_aggregate_info (agr, input);
731         
732       return 0;
733     }
734       
735   /* Compare the value of each break variable to the values on the
736      previous case. */
737   {
738     union value *iter = agr->prev_break;
739     int i;
740     
741     for (i = 0; i < agr->break_var_cnt; i++)
742       {
743         struct variable *v = agr->break_vars[i];
744       
745         switch (v->type)
746           {
747           case NUMERIC:
748             if (case_num (input, v->fv) != iter->f)
749               goto not_equal;
750             iter++;
751             break;
752           case ALPHA:
753             if (memcmp (case_str (input, v->fv), iter->s, v->width))
754               goto not_equal;
755             iter += v->nv;
756             break;
757           default:
758             assert (0);
759           }
760       }
761   }
762
763   accumulate_aggregate_info (agr, input);
764
765   return 0;
766   
767 not_equal:
768   /* The values of the break variable are different from the values on
769      the previous case.  That means that it's time to dump aggregate
770      info. */
771   dump_aggregate_info (agr, output);
772   initialize_aggregate_info (agr);
773   accumulate_aggregate_info (agr, input);
774
775   /* Copy INPUT into prev_break. */
776   {
777     union value *iter = agr->prev_break;
778     int i;
779
780     for (i = 0; i < agr->break_var_cnt; i++)
781       {
782         struct variable *v = agr->break_vars[i];
783             
784         if (v->type == NUMERIC)
785           (iter++)->f = case_num (input, v->fv);
786         else
787           {
788             memcpy (iter->s, case_str (input, v->fv), v->width);
789             iter += v->nv;
790           }
791       }
792   }
793   
794   return 1;
795 }
796
797 /* Accumulates aggregation data from the case INPUT. */
798 static void 
799 accumulate_aggregate_info (struct agr_proc *agr,
800                            const struct ccase *input)
801 {
802   struct agr_var *iter;
803   double weight;
804   int bad_warn = 1;
805
806   weight = dict_get_case_weight (default_dict, input, &bad_warn);
807
808   for (iter = agr->agr_vars; iter; iter = iter->next)
809     if (iter->src)
810       {
811         const union value *v = case_data (input, iter->src->fv);
812
813         if ((!iter->include_missing && is_missing (v, iter->src))
814             || (iter->include_missing && iter->src->type == NUMERIC
815                 && v->f == SYSMIS))
816           {
817             switch (iter->function)
818               {
819               case NMISS:
820                 iter->dbl[0] += weight;
821                 break;
822               case NUMISS:
823                 iter->int1++;
824                 break;
825               }
826             iter->missing = 1;
827             continue;
828           }
829         
830         /* This is horrible.  There are too many possibilities. */
831         switch (iter->function)
832           {
833           case SUM:
834             iter->dbl[0] += v->f * weight;
835             break;
836           case MEAN:
837             iter->dbl[0] += v->f * weight;
838             iter->dbl[1] += weight;
839             break;
840           case SD:
841             moments1_add (iter->moments, v->f, weight);
842             break;
843           case MAX:
844             iter->dbl[0] = max (iter->dbl[0], v->f);
845             iter->int1 = 1;
846             break;
847           case MAX | FSTRING:
848             if (memcmp (iter->string, v->s, iter->src->width) < 0)
849               memcpy (iter->string, v->s, iter->src->width);
850             iter->int1 = 1;
851             break;
852           case MIN:
853             iter->dbl[0] = min (iter->dbl[0], v->f);
854             iter->int1 = 1;
855             break;
856           case MIN | FSTRING:
857             if (memcmp (iter->string, v->s, iter->src->width) > 0)
858               memcpy (iter->string, v->s, iter->src->width);
859             iter->int1 = 1;
860             break;
861           case FGT:
862           case PGT:
863             if (v->f > iter->arg[0].f)
864               iter->dbl[0] += weight;
865             iter->dbl[1] += weight;
866             break;
867           case FGT | FSTRING:
868           case PGT | FSTRING:
869             if (memcmp (iter->arg[0].c, v->s, iter->src->width) < 0)
870               iter->dbl[0] += weight;
871             iter->dbl[1] += weight;
872             break;
873           case FLT:
874           case PLT:
875             if (v->f < iter->arg[0].f)
876               iter->dbl[0] += weight;
877             iter->dbl[1] += weight;
878             break;
879           case FLT | FSTRING:
880           case PLT | FSTRING:
881             if (memcmp (iter->arg[0].c, v->s, iter->src->width) > 0)
882               iter->dbl[0] += weight;
883             iter->dbl[1] += weight;
884             break;
885           case FIN:
886           case PIN:
887             if (iter->arg[0].f <= v->f && v->f <= iter->arg[1].f)
888               iter->dbl[0] += weight;
889             iter->dbl[1] += weight;
890             break;
891           case FIN | FSTRING:
892           case PIN | FSTRING:
893             if (memcmp (iter->arg[0].c, v->s, iter->src->width) <= 0
894                 && memcmp (iter->arg[1].c, v->s, iter->src->width) >= 0)
895               iter->dbl[0] += weight;
896             iter->dbl[1] += weight;
897             break;
898           case FOUT:
899           case POUT:
900             if (iter->arg[0].f > v->f || v->f > iter->arg[1].f)
901               iter->dbl[0] += weight;
902             iter->dbl[1] += weight;
903             break;
904           case FOUT | FSTRING:
905           case POUT | FSTRING:
906             if (memcmp (iter->arg[0].c, v->s, iter->src->width) > 0
907                 && memcmp (iter->arg[1].c, v->s, iter->src->width) < 0)
908               iter->dbl[0] += weight;
909             iter->dbl[1] += weight;
910             break;
911           case N:
912           case N | FSTRING:
913             iter->dbl[0] += weight;
914             break;
915           case NU:
916           case NU | FSTRING:
917             iter->int1++;
918             break;
919           case FIRST:
920             if (iter->int1 == 0)
921               {
922                 iter->dbl[0] = v->f;
923                 iter->int1 = 1;
924               }
925             break;
926           case FIRST | FSTRING:
927             if (iter->int1 == 0)
928               {
929                 memcpy (iter->string, v->s, iter->src->width);
930                 iter->int1 = 1;
931               }
932             break;
933           case LAST:
934             iter->dbl[0] = v->f;
935             iter->int1 = 1;
936             break;
937           case LAST | FSTRING:
938             memcpy (iter->string, v->s, iter->src->width);
939             iter->int1 = 1;
940             break;
941           case NMISS:
942           case NMISS | FSTRING:
943           case NUMISS:
944           case NUMISS | FSTRING:
945             /* Our value is not missing or it would have been
946                caught earlier.  Nothing to do. */
947             break;
948           default:
949             assert (0);
950           }
951     } else {
952       switch (iter->function)
953         {
954         case N_NO_VARS:
955           iter->dbl[0] += weight;
956           break;
957         case NU_NO_VARS:
958           iter->int1++;
959           break;
960         default:
961           assert (0);
962         }
963     }
964 }
965
966 /* We've come to a record that differs from the previous in one or
967    more of the break variables.  Make an output record from the
968    accumulated statistics in the OUTPUT case. */
969 static void 
970 dump_aggregate_info (struct agr_proc *agr, struct ccase *output)
971 {
972   {
973     int value_idx = 0;
974     int i;
975
976     for (i = 0; i < agr->break_var_cnt; i++) 
977       {
978         int nv = agr->break_vars[i]->nv;
979         memcpy (case_data_rw (output, value_idx),
980                 &agr->prev_break[value_idx],
981                 sizeof (union value) * nv);
982         value_idx += nv; 
983       }
984   }
985   
986   {
987     struct agr_var *i;
988   
989     for (i = agr->agr_vars; i; i = i->next)
990       {
991         union value *v = case_data_rw (output, i->dest->fv);
992
993         if (agr->missing == COLUMNWISE && i->missing != 0
994             && (i->function & FUNC) != N && (i->function & FUNC) != NU
995             && (i->function & FUNC) != NMISS && (i->function & FUNC) != NUMISS)
996           {
997             if (i->function & FSTRING)
998               memset (v->s, ' ', i->dest->width);
999             else
1000               v->f = SYSMIS;
1001             continue;
1002           }
1003         
1004         switch (i->function)
1005           {
1006           case SUM:
1007             v->f = i->dbl[0];
1008             break;
1009           case MEAN:
1010             v->f = i->dbl[1] != 0.0 ? i->dbl[0] / i->dbl[1] : SYSMIS;
1011             break;
1012           case SD:
1013             {
1014               double variance;
1015
1016               /* FIXME: we should use two passes. */
1017               moments1_calculate (i->moments, NULL, NULL, &variance,
1018                                  NULL, NULL);
1019               if (variance != SYSMIS)
1020                 v->f = sqrt (variance);
1021               else
1022                 v->f = SYSMIS; 
1023             }
1024             break;
1025           case MAX:
1026           case MIN:
1027             v->f = i->int1 ? i->dbl[0] : SYSMIS;
1028             break;
1029           case MAX | FSTRING:
1030           case MIN | FSTRING:
1031             if (i->int1)
1032               memcpy (v->s, i->string, i->dest->width);
1033             else
1034               memset (v->s, ' ', i->dest->width);
1035             break;
1036           case FGT | FSTRING:
1037           case FLT | FSTRING:
1038           case FIN | FSTRING:
1039           case FOUT | FSTRING:
1040             v->f = i->int2 ? (double) i->int1 / (double) i->int2 : SYSMIS;
1041             break;
1042           case FGT:
1043           case FLT:
1044           case FIN:
1045           case FOUT:
1046             v->f = i->dbl[1] ? i->dbl[0] / i->dbl[1] : SYSMIS;
1047             break;
1048           case PGT:
1049           case PGT | FSTRING:
1050           case PLT:
1051           case PLT | FSTRING:
1052           case PIN:
1053           case PIN | FSTRING:
1054           case POUT:
1055           case POUT | FSTRING:
1056             v->f = i->dbl[1] ? i->dbl[0] / i->dbl[1] * 100.0 : SYSMIS;
1057             break;
1058           case N:
1059           case N | FSTRING:
1060             v->f = i->dbl[0];
1061             break;
1062           case NU:
1063           case NU | FSTRING:
1064             v->f = i->int1;
1065             break;
1066           case FIRST:
1067           case LAST:
1068             v->f = i->int1 ? i->dbl[0] : SYSMIS;
1069             break;
1070           case FIRST | FSTRING:
1071           case LAST | FSTRING:
1072             if (i->int1)
1073               memcpy (v->s, i->string, i->dest->width);
1074             else
1075               memset (v->s, ' ', i->dest->width);
1076             break;
1077           case N_NO_VARS:
1078             v->f = i->dbl[0];
1079             break;
1080           case NU_NO_VARS:
1081             v->f = i->int1;
1082             break;
1083           case NMISS:
1084           case NMISS | FSTRING:
1085             v->f = i->dbl[0];
1086             break;
1087           case NUMISS:
1088           case NUMISS | FSTRING:
1089             v->f = i->int1;
1090             break;
1091           default:
1092             assert (0);
1093           }
1094       }
1095   }
1096 }
1097
1098 /* Resets the state for all the aggregate functions. */
1099 static void
1100 initialize_aggregate_info (struct agr_proc *agr)
1101 {
1102   struct agr_var *iter;
1103
1104   for (iter = agr->agr_vars; iter; iter = iter->next)
1105     {
1106       iter->missing = 0;
1107       switch (iter->function)
1108         {
1109         case MIN:
1110           iter->dbl[0] = DBL_MAX;
1111           iter->int1 = 0;
1112           break;
1113         case MIN | FSTRING:
1114           memset (iter->string, 255, iter->src->width);
1115           break;
1116         case MAX:
1117           iter->dbl[0] = -DBL_MAX;
1118           iter->int1 = 0;
1119           break;
1120         case MAX | FSTRING:
1121           memset (iter->string, 0, iter->src->width);
1122           break;
1123         case SD:
1124           if (iter->moments == NULL)
1125             iter->moments = moments1_create (MOMENT_VARIANCE);
1126           else
1127             moments1_clear (iter->moments);
1128           break;
1129         default:
1130           iter->dbl[0] = iter->dbl[1] = iter->dbl[2] = 0.0;
1131           iter->int1 = iter->int2 = 0;
1132           break;
1133         }
1134     }
1135 }
1136 \f
1137 /* Aggregate each case as it comes through.  Cases which aren't needed
1138    are dropped. */
1139 static int
1140 agr_to_active_file (struct ccase *c, void *agr_)
1141 {
1142   struct agr_proc *agr = agr_;
1143
1144   if (aggregate_single_case (agr, c, &agr->agr_case)) 
1145     agr->sink->class->write (agr->sink, &agr->agr_case);
1146
1147   return 1;
1148 }
1149
1150 /* Aggregate the current case and output it if we passed a
1151    breakpoint. */
1152 static int
1153 presorted_agr_to_sysfile (struct ccase *c, void *agr_) 
1154 {
1155   struct agr_proc *agr = agr_;
1156
1157   if (aggregate_single_case (agr, c, &agr->agr_case)) 
1158     sfm_write_case (agr->writer, &agr->agr_case);
1159
1160   return 1;
1161 }