First phase of making SORT CASES stable (PR 12035).
[pspp-builds.git] / src / aggregate.c
1 /* PSPP - computes sample statistics.
2    Copyright (C) 1997-9, 2000 Free Software Foundation, Inc.
3    Written by Ben Pfaff <blp@gnu.org>.
4
5    This program is free software; you can redistribute it and/or
6    modify it under the terms of the GNU General Public License as
7    published by the Free Software Foundation; either version 2 of the
8    License, or (at your option) any later version.
9
10    This program is distributed in the hope that it will be useful, but
11    WITHOUT ANY WARRANTY; without even the implied warranty of
12    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13    General Public License for more details.
14
15    You should have received a copy of the GNU General Public License
16    along with this program; if not, write to the Free Software
17    Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA
18    02111-1307, USA. */
19
20 #include <config.h>
21 #include "error.h"
22 #include <stdlib.h>
23 #include "alloc.h"
24 #include "case.h"
25 #include "casefile.h"
26 #include "command.h"
27 #include "dictionary.h"
28 #include "error.h"
29 #include "file-handle.h"
30 #include "lexer.h"
31 #include "misc.h"
32 #include "moments.h"
33 #include "pool.h"
34 #include "settings.h"
35 #include "sfm-write.h"
36 #include "sort.h"
37 #include "str.h"
38 #include "var.h"
39 #include "vfm.h"
40 #include "vfmP.h"
41
42 /* Specifies how to make an aggregate variable. */
43 struct agr_var
44   {
45     struct agr_var *next;               /* Next in list. */
46
47     /* Collected during parsing. */
48     struct variable *src;       /* Source variable. */
49     struct variable *dest;      /* Target variable. */
50     int function;               /* Function. */
51     int include_missing;        /* 1=Include user-missing values. */
52     union value arg[2];         /* Arguments. */
53
54     /* Accumulated during AGGREGATE execution. */
55     double dbl[3];
56     int int1, int2;
57     char *string;
58     int missing;
59     struct moments1 *moments;
60   };
61
62 /* Aggregation functions. */
63 enum
64   {
65     NONE, SUM, MEAN, SD, MAX, MIN, PGT, PLT, PIN, POUT, FGT, FLT, FIN,
66     FOUT, N, NU, NMISS, NUMISS, FIRST, LAST,
67     N_AGR_FUNCS, N_NO_VARS, NU_NO_VARS,
68     FUNC = 0x1f, /* Function mask. */
69     FSTRING = 1<<5, /* String function bit. */
70   };
71
72 /* Attributes of an aggregation function. */
73 struct agr_func
74   {
75     const char *name;           /* Aggregation function name. */
76     int n_args;                 /* Number of arguments. */
77     int alpha_type;             /* When given ALPHA arguments, output type. */
78     struct fmt_spec format;     /* Format spec if alpha_type != ALPHA. */
79   };
80
81 /* Attributes of aggregation functions. */
82 static const struct agr_func agr_func_tab[] = 
83   {
84     {"<NONE>",  0, -1,      {0, 0, 0}},
85     {"SUM",     0, -1,      {FMT_F, 8, 2}},
86     {"MEAN",    0, -1,      {FMT_F, 8, 2}},
87     {"SD",      0, -1,      {FMT_F, 8, 2}},
88     {"MAX",     0, ALPHA,   {-1, -1, -1}}, 
89     {"MIN",     0, ALPHA,   {-1, -1, -1}}, 
90     {"PGT",     1, NUMERIC, {FMT_F, 5, 1}},      
91     {"PLT",     1, NUMERIC, {FMT_F, 5, 1}},       
92     {"PIN",     2, NUMERIC, {FMT_F, 5, 1}},       
93     {"POUT",    2, NUMERIC, {FMT_F, 5, 1}},       
94     {"FGT",     1, NUMERIC, {FMT_F, 5, 3}},       
95     {"FLT",     1, NUMERIC, {FMT_F, 5, 3}},       
96     {"FIN",     2, NUMERIC, {FMT_F, 5, 3}},       
97     {"FOUT",    2, NUMERIC, {FMT_F, 5, 3}},       
98     {"N",       0, NUMERIC, {FMT_F, 7, 0}},       
99     {"NU",      0, NUMERIC, {FMT_F, 7, 0}},       
100     {"NMISS",   0, NUMERIC, {FMT_F, 7, 0}},       
101     {"NUMISS",  0, NUMERIC, {FMT_F, 7, 0}},       
102     {"FIRST",   0, ALPHA,   {-1, -1, -1}}, 
103     {"LAST",    0, ALPHA,   {-1, -1, -1}},
104     {NULL,      0, -1,      {-1, -1, -1}},
105     {"N",       0, NUMERIC, {FMT_F, 7, 0}},
106     {"NU",      0, NUMERIC, {FMT_F, 7, 0}},
107   };
108
109 /* Missing value types. */
110 enum missing_treatment
111   {
112     ITEMWISE,           /* Missing values item by item. */
113     COLUMNWISE          /* Missing values column by column. */
114   };
115
116 /* An entire AGGREGATE procedure. */
117 struct agr_proc 
118   {
119     /* We have either an output file or a sink. */
120     struct sfm_writer *writer;          /* Output file, or null if none. */
121     struct case_sink *sink;             /* Sink, or null if none. */
122
123     /* Break variables. */
124     struct sort_criteria *sort;         /* Sort criteria. */
125     struct variable **break_vars;       /* Break variables. */
126     size_t break_var_cnt;               /* Number of break variables. */
127     union value *prev_break;            /* Last values of break variables. */
128
129     enum missing_treatment missing;     /* How to treat missing values. */
130     struct agr_var *agr_vars;           /* First aggregate variable. */
131     struct dictionary *dict;            /* Aggregate dictionary. */
132     int case_cnt;                       /* Counts aggregated cases. */
133     struct ccase agr_case;              /* Aggregate case for output. */
134   };
135
136 static void initialize_aggregate_info (struct agr_proc *);
137
138 /* Prototypes. */
139 static int parse_aggregate_functions (struct agr_proc *);
140 static void agr_destroy (struct agr_proc *);
141 static int aggregate_single_case (struct agr_proc *agr,
142                                   const struct ccase *input,
143                                   struct ccase *output);
144 static void dump_aggregate_info (struct agr_proc *agr, struct ccase *output);
145
146 /* Aggregating to the active file. */
147 static int agr_to_active_file (struct ccase *, void *aux);
148
149 /* Aggregating to a system file. */
150 static int presorted_agr_to_sysfile (struct ccase *, void *aux);
151 \f
152 /* Parsing. */
153
154 /* Parses and executes the AGGREGATE procedure. */
155 int
156 cmd_aggregate (void)
157 {
158   struct agr_proc agr;
159   struct file_handle *out_file = NULL;
160
161   bool copy_documents = false;
162   bool presorted = false;
163   bool saw_direction;
164
165   memset(&agr, 0 , sizeof (agr));
166   agr.missing = ITEMWISE;
167   
168   agr.dict = dict_create ();
169   dict_set_label (agr.dict, dict_get_label (default_dict));
170   dict_set_documents (agr.dict, dict_get_documents (default_dict));
171
172   /* OUTFILE subcommand must be first. */
173   if (!lex_force_match_id ("OUTFILE"))
174     goto error;
175   lex_match ('=');
176   if (!lex_match ('*'))
177     {
178       out_file = fh_parse ();
179       if (out_file == NULL)
180         goto error;
181     }
182   
183   /* Read most of the subcommands. */
184   for (;;)
185     {
186       lex_match ('/');
187       
188       if (lex_match_id ("MISSING"))
189         {
190           lex_match ('=');
191           if (!lex_match_id ("COLUMNWISE"))
192             {
193               lex_error (_("while expecting COLUMNWISE"));
194               goto error;
195             }
196           agr.missing = COLUMNWISE;
197         }
198       else if (lex_match_id ("DOCUMENT"))
199         copy_documents = true;
200       else if (lex_match_id ("PRESORTED"))
201         presorted = true;
202       else if (lex_match_id ("BREAK"))
203         {
204           int i;
205
206           lex_match ('=');
207           agr.sort = sort_parse_criteria (default_dict,
208                                           &agr.break_vars, &agr.break_var_cnt,
209                                           &saw_direction);
210           if (agr.sort == NULL)
211             goto error;
212           
213           for (i = 0; i < agr.break_var_cnt; i++)
214             {
215               struct variable *v = dict_clone_var (agr.dict, agr.break_vars[i],
216                                                    agr.break_vars[i]->name);
217               assert (v != NULL);
218             }
219
220           /* BREAK must follow the options. */
221           break;
222         }
223       else
224         {
225           lex_error (_("expecting BREAK"));
226           goto error;
227         }
228     }
229   if (presorted && saw_direction)
230     msg (SW, _("When PRESORTED is specified, specifying sorting directions "
231                "with (A) or (D) has no effect.  Output data will be sorted "
232                "the same way as the input data."));
233       
234   /* Read in the aggregate functions. */
235   lex_match ('/');
236   if (!parse_aggregate_functions (&agr))
237     goto error;
238
239   /* Delete documents. */
240   if (!copy_documents)
241     dict_set_documents (agr.dict, NULL);
242
243   /* Cancel SPLIT FILE. */
244   dict_set_split_vars (agr.dict, NULL, 0);
245   
246   /* Initialize. */
247   agr.case_cnt = 0;
248   case_create (&agr.agr_case, dict_get_next_value_idx (agr.dict));
249   initialize_aggregate_info (&agr);
250
251   /* Output to active file or external file? */
252   if (out_file == NULL) 
253     {
254       /* The active file will be replaced by the aggregated data,
255          so TEMPORARY is moot. */
256       cancel_temporary ();
257
258       if (agr.sort != NULL && !presorted)
259         sort_active_file_in_place (agr.sort);
260
261       agr.sink = create_case_sink (&storage_sink_class, agr.dict, NULL);
262       if (agr.sink->class->open != NULL)
263         agr.sink->class->open (agr.sink);
264       vfm_sink = create_case_sink (&null_sink_class, default_dict, NULL);
265       procedure (agr_to_active_file, &agr);
266       if (agr.case_cnt > 0) 
267         {
268           dump_aggregate_info (&agr, &agr.agr_case);
269           agr.sink->class->write (agr.sink, &agr.agr_case);
270         }
271       dict_destroy (default_dict);
272       default_dict = agr.dict;
273       agr.dict = NULL;
274       vfm_source = agr.sink->class->make_source (agr.sink);
275       free_case_sink (agr.sink);
276     }
277   else
278     {
279       agr.writer = sfm_open_writer (out_file, agr.dict, get_scompression ());
280       if (agr.writer == NULL)
281         goto error;
282       
283       if (agr.sort != NULL && !presorted) 
284         {
285           /* Sorting is needed. */
286           struct casefile *dst;
287           struct casereader *reader;
288           struct ccase c;
289           
290           dst = sort_active_file_to_casefile (agr.sort);
291           if (dst == NULL)
292             goto error;
293           reader = casefile_get_destructive_reader (dst);
294           while (casereader_read_xfer (reader, &c)) 
295             {
296               if (aggregate_single_case (&agr, &c, &agr.agr_case)) 
297                 sfm_write_case (agr.writer, &agr.agr_case);
298               case_destroy (&c);
299             }
300           casereader_destroy (reader);
301           casefile_destroy (dst);
302         }
303       else 
304         {
305           /* Active file is already sorted. */
306           procedure (presorted_agr_to_sysfile, &agr);
307         }
308       
309       if (agr.case_cnt > 0) 
310         {
311           dump_aggregate_info (&agr, &agr.agr_case);
312           sfm_write_case (agr.writer, &agr.agr_case);
313         }
314     }
315   
316   agr_destroy (&agr);
317   return CMD_SUCCESS;
318
319 error:
320   agr_destroy (&agr);
321   return CMD_FAILURE;
322 }
323
324 /* Parse all the aggregate functions. */
325 static int
326 parse_aggregate_functions (struct agr_proc *agr)
327 {
328   struct agr_var *tail; /* Tail of linked list starting at agr->vars. */
329
330   /* Parse everything. */
331   tail = NULL;
332   for (;;)
333     {
334       char **dest;
335       char **dest_label;
336       int n_dest;
337
338       int include_missing;
339       const struct agr_func *function;
340       int func_index;
341
342       union value arg[2];
343
344       struct variable **src;
345       int n_src;
346
347       int i;
348
349       dest = NULL;
350       dest_label = NULL;
351       n_dest = 0;
352       src = NULL;
353       function = NULL;
354       n_src = 0;
355       arg[0].c = NULL;
356       arg[1].c = NULL;
357
358       /* Parse the list of target variables. */
359       while (!lex_match ('='))
360         {
361           int n_dest_prev = n_dest;
362           
363           if (!parse_DATA_LIST_vars (&dest, &n_dest,
364                                      PV_APPEND | PV_SINGLE | PV_NO_SCRATCH))
365             goto error;
366
367           /* Assign empty labels. */
368           {
369             int j;
370
371             dest_label = xrealloc (dest_label, sizeof *dest_label * n_dest);
372             for (j = n_dest_prev; j < n_dest; j++)
373               dest_label[j] = NULL;
374           }
375           
376           if (token == T_STRING)
377             {
378               ds_truncate (&tokstr, 255);
379               dest_label[n_dest - 1] = xstrdup (ds_c_str (&tokstr));
380               lex_get ();
381             }
382         }
383
384       /* Get the name of the aggregation function. */
385       if (token != T_ID)
386         {
387           lex_error (_("expecting aggregation function"));
388           goto error;
389         }
390
391       include_missing = 0;
392       if (tokid[strlen (tokid) - 1] == '.')
393         {
394           include_missing = 1;
395           tokid[strlen (tokid) - 1] = 0;
396         }
397       
398       for (function = agr_func_tab; function->name; function++)
399         if (!strcmp (function->name, tokid))
400           break;
401       if (NULL == function->name)
402         {
403           msg (SE, _("Unknown aggregation function %s."), tokid);
404           goto error;
405         }
406       func_index = function - agr_func_tab;
407       lex_get ();
408
409       /* Check for leading lparen. */
410       if (!lex_match ('('))
411         {
412           if (func_index == N)
413             func_index = N_NO_VARS;
414           else if (func_index == NU)
415             func_index = NU_NO_VARS;
416           else
417             {
418               lex_error (_("expecting `('"));
419               goto error;
420             }
421         }
422       else
423         {
424           /* Parse list of source variables. */
425           {
426             int pv_opts = PV_NO_SCRATCH;
427
428             if (func_index == SUM || func_index == MEAN || func_index == SD)
429               pv_opts |= PV_NUMERIC;
430             else if (function->n_args)
431               pv_opts |= PV_SAME_TYPE;
432
433             if (!parse_variables (default_dict, &src, &n_src, pv_opts))
434               goto error;
435           }
436
437           /* Parse function arguments, for those functions that
438              require arguments. */
439           if (function->n_args != 0)
440             for (i = 0; i < function->n_args; i++)
441               {
442                 int type;
443             
444                 lex_match (',');
445                 if (token == T_STRING)
446                   {
447                     arg[i].c = xstrdup (ds_c_str (&tokstr));
448                     type = ALPHA;
449                   }
450                 else if (lex_is_number ())
451                   {
452                     arg[i].f = tokval;
453                     type = NUMERIC;
454                   } else {
455                     msg (SE, _("Missing argument %d to %s."), i + 1, function->name);
456                     goto error;
457                   }
458             
459                 lex_get ();
460
461                 if (type != src[0]->type)
462                   {
463                     msg (SE, _("Arguments to %s must be of same type as "
464                                "source variables."),
465                          function->name);
466                     goto error;
467                   }
468               }
469
470           /* Trailing rparen. */
471           if (!lex_match(')'))
472             {
473               lex_error (_("expecting `)'"));
474               goto error;
475             }
476           
477           /* Now check that the number of source variables match
478              the number of target variables.  If we check earlier
479              than this, the user can get very misleading error
480              message, i.e. `AGGREGATE x=SUM(y t).' will get this
481              error message when a proper message would be more
482              like `unknown variable t'. */
483           if (n_src != n_dest)
484             {
485               msg (SE, _("Number of source variables (%d) does not match "
486                          "number of target variables (%d)."),
487                    n_src, n_dest);
488               goto error;
489             }
490
491           if ((func_index == PIN || func_index == POUT
492               || func_index == FIN || func_index == FOUT) 
493               && ((src[0]->type == NUMERIC && arg[0].f > arg[1].f)
494                   || (src[0]->type == ALPHA
495                       && st_compare_pad (arg[0].c, strlen (arg[0].c),
496                                          arg[1].c, strlen (arg[1].c)) > 0)))
497             {
498               union value t = arg[0];
499               arg[0] = arg[1];
500               arg[1] = t;
501                   
502               msg (SW, _("The value arguments passed to the %s function "
503                          "are out-of-order.  They will be treated as if "
504                          "they had been specified in the correct order."),
505                    function->name);
506             }
507         }
508         
509       /* Finally add these to the linked list of aggregation
510          variables. */
511       for (i = 0; i < n_dest; i++)
512         {
513           struct agr_var *v = xmalloc (sizeof *v);
514
515           /* Add variable to chain. */
516           if (agr->agr_vars != NULL)
517             tail->next = v;
518           else
519             agr->agr_vars = v;
520           tail = v;
521           tail->next = NULL;
522           v->moments = NULL;
523           
524           /* Create the target variable in the aggregate
525              dictionary. */
526           {
527             struct variable *destvar;
528             
529             v->function = func_index;
530
531             if (src)
532               {
533                 v->src = src[i];
534                 
535                 if (src[i]->type == ALPHA)
536                   {
537                     v->function |= FSTRING;
538                     v->string = xmalloc (src[i]->width);
539                   }
540
541                 if (function->alpha_type == ALPHA)
542                   destvar = dict_clone_var (agr->dict, v->src, dest[i]);
543                 else if (v->src->type == NUMERIC
544                          || function->alpha_type == NUMERIC)
545                   {
546                     destvar = dict_create_var (agr->dict, dest[i], 0);
547                         
548                     if ((func_index == N
549                             || func_index == N_NO_VARS
550                             || func_index == NMISS)
551                         && dict_get_weight (default_dict) != NULL)
552                       {
553                         static const struct fmt_spec f8_2 = {FMT_F, 8, 2};
554                             
555                         destvar->print = destvar->write = f8_2; 
556                       }
557                     else
558                       destvar->print = destvar->write = function->format;
559                   }
560                 else 
561                   destvar = dict_create_var (agr->dict, dest[i],
562                                              v->src->width);
563               } else {
564                 v->src = NULL;
565                 destvar = dict_create_var (agr->dict, dest[i], 0);
566               }
567           
568             if (!destvar)
569               {
570                 msg (SE, _("Variable name %s is not unique within the "
571                            "aggregate file dictionary, which contains "
572                            "the aggregate variables and the break "
573                            "variables."),
574                      dest[i]);
575                 free (dest[i]);
576                 goto error;
577               }
578
579             free (dest[i]);
580             destvar->init = 0;
581             if (dest_label[i])
582               {
583                 destvar->label = dest_label[i];
584                 dest_label[i] = NULL;
585               }
586
587             v->dest = destvar;
588           }
589           
590           v->include_missing = include_missing;
591
592           if (v->src != NULL)
593             {
594               int j;
595
596               if (v->src->type == NUMERIC)
597                 for (j = 0; j < function->n_args; j++)
598                   v->arg[j].f = arg[j].f;
599               else
600                 for (j = 0; j < function->n_args; j++)
601                   v->arg[j].c = xstrdup (arg[j].c);
602             }
603         }
604       
605       if (src != NULL && src[0]->type == ALPHA)
606         for (i = 0; i < function->n_args; i++)
607           {
608             free (arg[i].c);
609             arg[i].c = NULL;
610           }
611
612       free (src);
613       free (dest);
614       free (dest_label);
615
616       if (!lex_match ('/'))
617         {
618           if (token == '.')
619             return 1;
620
621           lex_error ("expecting end of command");
622           return 0;
623         }
624       continue;
625       
626     error:
627       for (i = 0; i < n_dest; i++)
628         {
629           free (dest[i]);
630           free (dest_label[i]);
631         }
632       free (dest);
633       free (dest_label);
634       free (arg[0].c);
635       free (arg[1].c);
636       if (src && n_src && src[0]->type == ALPHA)
637         for (i = 0; i < function->n_args; i++)
638           {
639             free (arg[i].c);
640             arg[i].c = NULL;
641           }
642       free (src);
643         
644       return 0;
645     }
646 }
647
648 /* Destroys AGR. */
649 static void
650 agr_destroy (struct agr_proc *agr)
651 {
652   struct agr_var *iter, *next;
653
654   sfm_close_writer (agr->writer);
655   if (agr->sort != NULL)
656     sort_destroy_criteria (agr->sort);
657   free (agr->break_vars);
658   free (agr->prev_break);
659   for (iter = agr->agr_vars; iter; iter = next)
660     {
661       next = iter->next;
662
663       if (iter->function & FSTRING)
664         {
665           int n_args;
666           int i;
667
668           n_args = agr_func_tab[iter->function & FUNC].n_args;
669           for (i = 0; i < n_args; i++)
670             free (iter->arg[i].c);
671           free (iter->string);
672         }
673       else if (iter->function == SD)
674         moments1_destroy (iter->moments);
675       free (iter);
676     }
677   if (agr->dict != NULL)
678     dict_destroy (agr->dict);
679
680   case_destroy (&agr->agr_case);
681 }
682 \f
683 /* Execution. */
684
685 static void accumulate_aggregate_info (struct agr_proc *,
686                                        const struct ccase *);
687 static void dump_aggregate_info (struct agr_proc *, struct ccase *);
688
689 /* Processes a single case INPUT for aggregation.  If output is
690    warranted, writes it to OUTPUT and returns nonzero.
691    Otherwise, returns zero and OUTPUT is unmodified. */
692 static int
693 aggregate_single_case (struct agr_proc *agr,
694                        const struct ccase *input, struct ccase *output)
695 {
696   /* The first case always begins a new break group.  We also need to
697      preserve the values of the case for later comparison. */
698   if (agr->case_cnt++ == 0)
699     {
700       int n_elem = 0;
701       
702       {
703         int i;
704
705         for (i = 0; i < agr->break_var_cnt; i++)
706           n_elem += agr->break_vars[i]->nv;
707       }
708       
709       agr->prev_break = xmalloc (sizeof *agr->prev_break * n_elem);
710
711       /* Copy INPUT into prev_break. */
712       {
713         union value *iter = agr->prev_break;
714         int i;
715
716         for (i = 0; i < agr->break_var_cnt; i++)
717           {
718             struct variable *v = agr->break_vars[i];
719             
720             if (v->type == NUMERIC)
721               (iter++)->f = case_num (input, v->fv);
722             else
723               {
724                 memcpy (iter->s, case_str (input, v->fv), v->width);
725                 iter += v->nv;
726               }
727           }
728       }
729             
730       accumulate_aggregate_info (agr, input);
731         
732       return 0;
733     }
734       
735   /* Compare the value of each break variable to the values on the
736      previous case. */
737   {
738     union value *iter = agr->prev_break;
739     int i;
740     
741     for (i = 0; i < agr->break_var_cnt; i++)
742       {
743         struct variable *v = agr->break_vars[i];
744       
745         switch (v->type)
746           {
747           case NUMERIC:
748             if (case_num (input, v->fv) != iter->f)
749               goto not_equal;
750             iter++;
751             break;
752           case ALPHA:
753             if (memcmp (case_str (input, v->fv), iter->s, v->width))
754               goto not_equal;
755             iter += v->nv;
756             break;
757           default:
758             assert (0);
759           }
760       }
761   }
762
763   accumulate_aggregate_info (agr, input);
764
765   return 0;
766   
767 not_equal:
768   /* The values of the break variable are different from the values on
769      the previous case.  That means that it's time to dump aggregate
770      info. */
771   dump_aggregate_info (agr, output);
772   initialize_aggregate_info (agr);
773   accumulate_aggregate_info (agr, input);
774
775   /* Copy INPUT into prev_break. */
776   {
777     union value *iter = agr->prev_break;
778     int i;
779
780     for (i = 0; i < agr->break_var_cnt; i++)
781       {
782         struct variable *v = agr->break_vars[i];
783             
784         if (v->type == NUMERIC)
785           (iter++)->f = case_num (input, v->fv);
786         else
787           {
788             memcpy (iter->s, case_str (input, v->fv), v->width);
789             iter += v->nv;
790           }
791       }
792   }
793   
794   return 1;
795 }
796
797 /* Accumulates aggregation data from the case INPUT. */
798 static void 
799 accumulate_aggregate_info (struct agr_proc *agr,
800                            const struct ccase *input)
801 {
802   struct agr_var *iter;
803   double weight;
804   int bad_warn = 1;
805
806   weight = dict_get_case_weight (default_dict, input, &bad_warn);
807
808   for (iter = agr->agr_vars; iter; iter = iter->next)
809     if (iter->src)
810       {
811         const union value *v = case_data (input, iter->src->fv);
812
813         if ((!iter->include_missing && is_missing (v, iter->src))
814             || (iter->include_missing && iter->src->type == NUMERIC
815                 && v->f == SYSMIS))
816           {
817             switch (iter->function)
818               {
819               case NMISS:
820               case NMISS | FSTRING:
821                 iter->dbl[0] += weight;
822                 break;
823               case NUMISS:
824               case NUMISS | FSTRING:
825                 iter->int1++;
826                 break;
827               }
828             iter->missing = 1;
829             continue;
830           }
831         
832         /* This is horrible.  There are too many possibilities. */
833         switch (iter->function)
834           {
835           case SUM:
836             iter->dbl[0] += v->f * weight;
837             break;
838           case MEAN:
839             iter->dbl[0] += v->f * weight;
840             iter->dbl[1] += weight;
841             break;
842           case SD:
843             moments1_add (iter->moments, v->f, weight);
844             break;
845           case MAX:
846             iter->dbl[0] = max (iter->dbl[0], v->f);
847             iter->int1 = 1;
848             break;
849           case MAX | FSTRING:
850             if (memcmp (iter->string, v->s, iter->src->width) < 0)
851               memcpy (iter->string, v->s, iter->src->width);
852             iter->int1 = 1;
853             break;
854           case MIN:
855             iter->dbl[0] = min (iter->dbl[0], v->f);
856             iter->int1 = 1;
857             break;
858           case MIN | FSTRING:
859             if (memcmp (iter->string, v->s, iter->src->width) > 0)
860               memcpy (iter->string, v->s, iter->src->width);
861             iter->int1 = 1;
862             break;
863           case FGT:
864           case PGT:
865             if (v->f > iter->arg[0].f)
866               iter->dbl[0] += weight;
867             iter->dbl[1] += weight;
868             break;
869           case FGT | FSTRING:
870           case PGT | FSTRING:
871             if (memcmp (iter->arg[0].c, v->s, iter->src->width) < 0)
872               iter->dbl[0] += weight;
873             iter->dbl[1] += weight;
874             break;
875           case FLT:
876           case PLT:
877             if (v->f < iter->arg[0].f)
878               iter->dbl[0] += weight;
879             iter->dbl[1] += weight;
880             break;
881           case FLT | FSTRING:
882           case PLT | FSTRING:
883             if (memcmp (iter->arg[0].c, v->s, iter->src->width) > 0)
884               iter->dbl[0] += weight;
885             iter->dbl[1] += weight;
886             break;
887           case FIN:
888           case PIN:
889             if (iter->arg[0].f <= v->f && v->f <= iter->arg[1].f)
890               iter->dbl[0] += weight;
891             iter->dbl[1] += weight;
892             break;
893           case FIN | FSTRING:
894           case PIN | FSTRING:
895             if (memcmp (iter->arg[0].c, v->s, iter->src->width) <= 0
896                 && memcmp (iter->arg[1].c, v->s, iter->src->width) >= 0)
897               iter->dbl[0] += weight;
898             iter->dbl[1] += weight;
899             break;
900           case FOUT:
901           case POUT:
902             if (iter->arg[0].f > v->f || v->f > iter->arg[1].f)
903               iter->dbl[0] += weight;
904             iter->dbl[1] += weight;
905             break;
906           case FOUT | FSTRING:
907           case POUT | FSTRING:
908             if (memcmp (iter->arg[0].c, v->s, iter->src->width) > 0
909                 || memcmp (iter->arg[1].c, v->s, iter->src->width) < 0)
910               iter->dbl[0] += weight;
911             iter->dbl[1] += weight;
912             break;
913           case N:
914           case N | FSTRING:
915             iter->dbl[0] += weight;
916             break;
917           case NU:
918           case NU | FSTRING:
919             iter->int1++;
920             break;
921           case FIRST:
922             if (iter->int1 == 0)
923               {
924                 iter->dbl[0] = v->f;
925                 iter->int1 = 1;
926               }
927             break;
928           case FIRST | FSTRING:
929             if (iter->int1 == 0)
930               {
931                 memcpy (iter->string, v->s, iter->src->width);
932                 iter->int1 = 1;
933               }
934             break;
935           case LAST:
936             iter->dbl[0] = v->f;
937             iter->int1 = 1;
938             break;
939           case LAST | FSTRING:
940             memcpy (iter->string, v->s, iter->src->width);
941             iter->int1 = 1;
942             break;
943           case NMISS:
944           case NMISS | FSTRING:
945           case NUMISS:
946           case NUMISS | FSTRING:
947             /* Our value is not missing or it would have been
948                caught earlier.  Nothing to do. */
949             break;
950           default:
951             assert (0);
952           }
953     } else {
954       switch (iter->function)
955         {
956         case N_NO_VARS:
957           iter->dbl[0] += weight;
958           break;
959         case NU_NO_VARS:
960           iter->int1++;
961           break;
962         default:
963           assert (0);
964         }
965     }
966 }
967
968 /* We've come to a record that differs from the previous in one or
969    more of the break variables.  Make an output record from the
970    accumulated statistics in the OUTPUT case. */
971 static void 
972 dump_aggregate_info (struct agr_proc *agr, struct ccase *output)
973 {
974   {
975     int value_idx = 0;
976     int i;
977
978     for (i = 0; i < agr->break_var_cnt; i++) 
979       {
980         int nv = agr->break_vars[i]->nv;
981         memcpy (case_data_rw (output, value_idx),
982                 &agr->prev_break[value_idx],
983                 sizeof (union value) * nv);
984         value_idx += nv; 
985       }
986   }
987   
988   {
989     struct agr_var *i;
990   
991     for (i = agr->agr_vars; i; i = i->next)
992       {
993         union value *v = case_data_rw (output, i->dest->fv);
994
995         if (agr->missing == COLUMNWISE && i->missing != 0
996             && (i->function & FUNC) != N && (i->function & FUNC) != NU
997             && (i->function & FUNC) != NMISS && (i->function & FUNC) != NUMISS)
998           {
999             if (i->function & FSTRING)
1000               memset (v->s, ' ', i->dest->width);
1001             else
1002               v->f = SYSMIS;
1003             continue;
1004           }
1005         
1006         switch (i->function)
1007           {
1008           case SUM:
1009             v->f = i->dbl[0];
1010             break;
1011           case MEAN:
1012             v->f = i->dbl[1] != 0.0 ? i->dbl[0] / i->dbl[1] : SYSMIS;
1013             break;
1014           case SD:
1015             {
1016               double variance;
1017
1018               /* FIXME: we should use two passes. */
1019               moments1_calculate (i->moments, NULL, NULL, &variance,
1020                                  NULL, NULL);
1021               if (variance != SYSMIS)
1022                 v->f = sqrt (variance);
1023               else
1024                 v->f = SYSMIS; 
1025             }
1026             break;
1027           case MAX:
1028           case MIN:
1029             v->f = i->int1 ? i->dbl[0] : SYSMIS;
1030             break;
1031           case MAX | FSTRING:
1032           case MIN | FSTRING:
1033             if (i->int1)
1034               memcpy (v->s, i->string, i->dest->width);
1035             else
1036               memset (v->s, ' ', i->dest->width);
1037             break;
1038           case FGT:
1039           case FGT | FSTRING:
1040           case FLT:
1041           case FLT | FSTRING:
1042           case FIN:
1043           case FIN | FSTRING:
1044           case FOUT:
1045           case FOUT | FSTRING:
1046             v->f = i->dbl[1] ? i->dbl[0] / i->dbl[1] : SYSMIS;
1047             break;
1048           case PGT:
1049           case PGT | FSTRING:
1050           case PLT:
1051           case PLT | FSTRING:
1052           case PIN:
1053           case PIN | FSTRING:
1054           case POUT:
1055           case POUT | FSTRING:
1056             v->f = i->dbl[1] ? i->dbl[0] / i->dbl[1] * 100.0 : SYSMIS;
1057             break;
1058           case N:
1059           case N | FSTRING:
1060             v->f = i->dbl[0];
1061             break;
1062           case NU:
1063           case NU | FSTRING:
1064             v->f = i->int1;
1065             break;
1066           case FIRST:
1067           case LAST:
1068             v->f = i->int1 ? i->dbl[0] : SYSMIS;
1069             break;
1070           case FIRST | FSTRING:
1071           case LAST | FSTRING:
1072             if (i->int1)
1073               memcpy (v->s, i->string, i->dest->width);
1074             else
1075               memset (v->s, ' ', i->dest->width);
1076             break;
1077           case N_NO_VARS:
1078             v->f = i->dbl[0];
1079             break;
1080           case NU_NO_VARS:
1081             v->f = i->int1;
1082             break;
1083           case NMISS:
1084           case NMISS | FSTRING:
1085             v->f = i->dbl[0];
1086             break;
1087           case NUMISS:
1088           case NUMISS | FSTRING:
1089             v->f = i->int1;
1090             break;
1091           default:
1092             assert (0);
1093           }
1094       }
1095   }
1096 }
1097
1098 /* Resets the state for all the aggregate functions. */
1099 static void
1100 initialize_aggregate_info (struct agr_proc *agr)
1101 {
1102   struct agr_var *iter;
1103
1104   for (iter = agr->agr_vars; iter; iter = iter->next)
1105     {
1106       iter->missing = 0;
1107       iter->dbl[0] = iter->dbl[1] = iter->dbl[2] = 0.0;
1108       iter->int1 = iter->int2 = 0;
1109       switch (iter->function)
1110         {
1111         case MIN:
1112           iter->dbl[0] = DBL_MAX;
1113           break;
1114         case MIN | FSTRING:
1115           memset (iter->string, 255, iter->src->width);
1116           break;
1117         case MAX:
1118           iter->dbl[0] = -DBL_MAX;
1119           break;
1120         case MAX | FSTRING:
1121           memset (iter->string, 0, iter->src->width);
1122           break;
1123         case SD:
1124           if (iter->moments == NULL)
1125             iter->moments = moments1_create (MOMENT_VARIANCE);
1126           else
1127             moments1_clear (iter->moments);
1128           break;
1129         default:
1130           break;
1131         }
1132     }
1133 }
1134 \f
1135 /* Aggregate each case as it comes through.  Cases which aren't needed
1136    are dropped. */
1137 static int
1138 agr_to_active_file (struct ccase *c, void *agr_)
1139 {
1140   struct agr_proc *agr = agr_;
1141
1142   if (aggregate_single_case (agr, c, &agr->agr_case)) 
1143     agr->sink->class->write (agr->sink, &agr->agr_case);
1144
1145   return 1;
1146 }
1147
1148 /* Aggregate the current case and output it if we passed a
1149    breakpoint. */
1150 static int
1151 presorted_agr_to_sysfile (struct ccase *c, void *agr_) 
1152 {
1153   struct agr_proc *agr = agr_;
1154
1155   if (aggregate_single_case (agr, c, &agr->agr_case)) 
1156     sfm_write_case (agr->writer, &agr->agr_case);
1157
1158   return 1;
1159 }