Changed int to bool in dict_get_weight and sort_active_file_in_place
[pspp-builds.git] / src / language / stats / aggregate.c
1 /* PSPP - computes sample statistics.
2    Copyright (C) 1997-9, 2000 Free Software Foundation, Inc.
3    Written by Ben Pfaff <blp@gnu.org>.
4
5    This program is free software; you can redistribute it and/or
6    modify it under the terms of the GNU General Public License as
7    published by the Free Software Foundation; either version 2 of the
8    License, or (at your option) any later version.
9
10    This program is distributed in the hope that it will be useful, but
11    WITHOUT ANY WARRANTY; without even the implied warranty of
12    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13    General Public License for more details.
14
15    You should have received a copy of the GNU General Public License
16    along with this program; if not, write to the Free Software
17    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
18    02110-1301, USA. */
19
20 #include <config.h>
21
22 #include <stdlib.h>
23
24 #include <data/any-writer.h>
25 #include <data/case-sink.h>
26 #include <data/case.h>
27 #include <data/casefile.h>
28 #include <data/dictionary.h>
29 #include <data/file-handle-def.h>
30 #include <data/procedure.h>
31 #include <data/settings.h>
32 #include <data/storage-stream.h>
33 #include <data/sys-file-writer.h>
34 #include <data/variable.h>
35 #include <language/command.h>
36 #include <language/data-io/file-handle.h>
37 #include <language/lexer/lexer.h>
38 #include <language/lexer/variable-parser.h>
39 #include <language/stats/sort-criteria.h>
40 #include <libpspp/alloc.h>
41 #include <libpspp/assertion.h>
42 #include <libpspp/message.h>
43 #include <libpspp/misc.h>
44 #include <libpspp/pool.h>
45 #include <libpspp/str.h>
46 #include <math/moments.h>
47 #include <math/sort.h>
48
49 #include "gettext.h"
50 #define _(msgid) gettext (msgid)
51
52 /* Argument for AGGREGATE function. */
53 union agr_argument
54   {
55     double f;                           /* Numeric. */
56     char *c;                            /* Short or long string. */
57   };
58
59 /* Specifies how to make an aggregate variable. */
60 struct agr_var
61   {
62     struct agr_var *next;               /* Next in list. */
63
64     /* Collected during parsing. */
65     struct variable *src;       /* Source variable. */
66     struct variable *dest;      /* Target variable. */
67     int function;               /* Function. */
68     int include_missing;        /* 1=Include user-missing values. */
69     union agr_argument arg[2];  /* Arguments. */
70
71     /* Accumulated during AGGREGATE execution. */
72     double dbl[3];
73     int int1, int2;
74     char *string;
75     int missing;
76     struct moments1 *moments;
77   };
78
79 /* Aggregation functions. */
80 enum
81   {
82     NONE, SUM, MEAN, SD, MAX, MIN, PGT, PLT, PIN, POUT, FGT, FLT, FIN,
83     FOUT, N, NU, NMISS, NUMISS, FIRST, LAST,
84     N_AGR_FUNCS, N_NO_VARS, NU_NO_VARS,
85     FUNC = 0x1f, /* Function mask. */
86     FSTRING = 1<<5, /* String function bit. */
87   };
88
89 /* Attributes of an aggregation function. */
90 struct agr_func
91   {
92     const char *name;           /* Aggregation function name. */
93     size_t n_args;              /* Number of arguments. */
94     int alpha_type;             /* When given ALPHA arguments, output type. */
95     struct fmt_spec format;     /* Format spec if alpha_type != ALPHA. */
96   };
97
98 /* Attributes of aggregation functions. */
99 static const struct agr_func agr_func_tab[] = 
100   {
101     {"<NONE>",  0, -1,      {0, 0, 0}},
102     {"SUM",     0, -1,      {FMT_F, 8, 2}},
103     {"MEAN",    0, -1,      {FMT_F, 8, 2}},
104     {"SD",      0, -1,      {FMT_F, 8, 2}},
105     {"MAX",     0, ALPHA,   {-1, -1, -1}}, 
106     {"MIN",     0, ALPHA,   {-1, -1, -1}}, 
107     {"PGT",     1, NUMERIC, {FMT_F, 5, 1}},      
108     {"PLT",     1, NUMERIC, {FMT_F, 5, 1}},       
109     {"PIN",     2, NUMERIC, {FMT_F, 5, 1}},       
110     {"POUT",    2, NUMERIC, {FMT_F, 5, 1}},       
111     {"FGT",     1, NUMERIC, {FMT_F, 5, 3}},       
112     {"FLT",     1, NUMERIC, {FMT_F, 5, 3}},       
113     {"FIN",     2, NUMERIC, {FMT_F, 5, 3}},       
114     {"FOUT",    2, NUMERIC, {FMT_F, 5, 3}},       
115     {"N",       0, NUMERIC, {FMT_F, 7, 0}},       
116     {"NU",      0, NUMERIC, {FMT_F, 7, 0}},       
117     {"NMISS",   0, NUMERIC, {FMT_F, 7, 0}},       
118     {"NUMISS",  0, NUMERIC, {FMT_F, 7, 0}},       
119     {"FIRST",   0, ALPHA,   {-1, -1, -1}}, 
120     {"LAST",    0, ALPHA,   {-1, -1, -1}},
121     {NULL,      0, -1,      {-1, -1, -1}},
122     {"N",       0, NUMERIC, {FMT_F, 7, 0}},
123     {"NU",      0, NUMERIC, {FMT_F, 7, 0}},
124   };
125
126 /* Missing value types. */
127 enum missing_treatment
128   {
129     ITEMWISE,           /* Missing values item by item. */
130     COLUMNWISE          /* Missing values column by column. */
131   };
132
133 /* An entire AGGREGATE procedure. */
134 struct agr_proc 
135   {
136     /* We have either an output file or a sink. */
137     struct any_writer *writer;          /* Output file, or null if none. */
138     struct case_sink *sink;             /* Sink, or null if none. */
139
140     /* Break variables. */
141     struct sort_criteria *sort;         /* Sort criteria. */
142     struct variable **break_vars;       /* Break variables. */
143     size_t break_var_cnt;               /* Number of break variables. */
144     struct ccase break_case;            /* Last values of break variables. */
145
146     enum missing_treatment missing;     /* How to treat missing values. */
147     struct agr_var *agr_vars;           /* First aggregate variable. */
148     struct dictionary *dict;            /* Aggregate dictionary. */
149     int case_cnt;                       /* Counts aggregated cases. */
150     struct ccase agr_case;              /* Aggregate case for output. */
151   };
152
153 static void initialize_aggregate_info (struct agr_proc *,
154                                        const struct ccase *);
155
156 /* Prototypes. */
157 static int parse_aggregate_functions (struct agr_proc *);
158 static void agr_destroy (struct agr_proc *);
159 static int aggregate_single_case (struct agr_proc *agr,
160                                   const struct ccase *input,
161                                   struct ccase *output);
162 static void dump_aggregate_info (struct agr_proc *agr, struct ccase *output);
163
164 /* Aggregating to the active file. */
165 static bool agr_to_active_file (const struct ccase *, void *aux);
166
167 /* Aggregating to a system file. */
168 static bool presorted_agr_to_sysfile (const struct ccase *, void *aux);
169 \f
170 /* Parsing. */
171
172 /* Parses and executes the AGGREGATE procedure. */
173 int
174 cmd_aggregate (void)
175 {
176   struct agr_proc agr;
177   struct file_handle *out_file = NULL;
178
179   bool copy_documents = false;
180   bool presorted = false;
181   bool saw_direction;
182
183   memset(&agr, 0 , sizeof (agr));
184   agr.missing = ITEMWISE;
185   case_nullify (&agr.break_case);
186   
187   agr.dict = dict_create ();
188   dict_set_label (agr.dict, dict_get_label (default_dict));
189   dict_set_documents (agr.dict, dict_get_documents (default_dict));
190
191   /* OUTFILE subcommand must be first. */
192   if (!lex_force_match_id ("OUTFILE"))
193     goto error;
194   lex_match ('=');
195   if (!lex_match ('*'))
196     {
197       out_file = fh_parse (FH_REF_FILE | FH_REF_SCRATCH);
198       if (out_file == NULL)
199         goto error;
200     }
201   
202   /* Read most of the subcommands. */
203   for (;;)
204     {
205       lex_match ('/');
206       
207       if (lex_match_id ("MISSING"))
208         {
209           lex_match ('=');
210           if (!lex_match_id ("COLUMNWISE"))
211             {
212               lex_error (_("while expecting COLUMNWISE"));
213               goto error;
214             }
215           agr.missing = COLUMNWISE;
216         }
217       else if (lex_match_id ("DOCUMENT"))
218         copy_documents = true;
219       else if (lex_match_id ("PRESORTED"))
220         presorted = true;
221       else if (lex_match_id ("BREAK"))
222         {
223           int i;
224
225           lex_match ('=');
226           agr.sort = sort_parse_criteria (default_dict,
227                                           &agr.break_vars, &agr.break_var_cnt,
228                                           &saw_direction, NULL);
229           if (agr.sort == NULL)
230             goto error;
231           
232           for (i = 0; i < agr.break_var_cnt; i++)
233             dict_clone_var_assert (agr.dict, agr.break_vars[i],
234                                    agr.break_vars[i]->name);
235
236           /* BREAK must follow the options. */
237           break;
238         }
239       else
240         {
241           lex_error (_("expecting BREAK"));
242           goto error;
243         }
244     }
245   if (presorted && saw_direction)
246     msg (SW, _("When PRESORTED is specified, specifying sorting directions "
247                "with (A) or (D) has no effect.  Output data will be sorted "
248                "the same way as the input data."));
249       
250   /* Read in the aggregate functions. */
251   lex_match ('/');
252   if (!parse_aggregate_functions (&agr))
253     goto error;
254
255   /* Delete documents. */
256   if (!copy_documents)
257     dict_set_documents (agr.dict, NULL);
258
259   /* Cancel SPLIT FILE. */
260   dict_set_split_vars (agr.dict, NULL, 0);
261   
262   /* Initialize. */
263   agr.case_cnt = 0;
264   case_create (&agr.agr_case, dict_get_next_value_idx (agr.dict));
265
266   /* Output to active file or external file? */
267   if (out_file == NULL) 
268     {
269       /* The active file will be replaced by the aggregated data,
270          so TEMPORARY is moot. */
271       proc_cancel_temporary_transformations ();
272
273       if (agr.sort != NULL && !presorted) 
274         {
275           if (!sort_active_file_in_place (agr.sort))
276             goto error;
277         }
278
279       agr.sink = create_case_sink (&storage_sink_class, agr.dict, NULL);
280       if (agr.sink->class->open != NULL)
281         agr.sink->class->open (agr.sink);
282       proc_set_sink (create_case_sink (&null_sink_class, default_dict, NULL));
283       if (!procedure (agr_to_active_file, &agr))
284         goto error;
285       if (agr.case_cnt > 0) 
286         {
287           dump_aggregate_info (&agr, &agr.agr_case);
288           if (!agr.sink->class->write (agr.sink, &agr.agr_case))
289             goto error;
290         }
291       discard_variables ();
292       dict_destroy (default_dict);
293       default_dict = agr.dict;
294       agr.dict = NULL;
295       proc_set_source (agr.sink->class->make_source (agr.sink));
296       free_case_sink (agr.sink);
297     }
298   else
299     {
300       agr.writer = any_writer_open (out_file, agr.dict);
301       if (agr.writer == NULL)
302         goto error;
303       
304       if (agr.sort != NULL && !presorted) 
305         {
306           /* Sorting is needed. */
307           struct casefile *dst;
308           struct casereader *reader;
309           struct ccase c;
310           bool ok = true;
311           
312           dst = sort_active_file_to_casefile (agr.sort);
313           if (dst == NULL)
314             goto error;
315           reader = casefile_get_destructive_reader (dst);
316           while (ok && casereader_read_xfer (reader, &c)) 
317             {
318               if (aggregate_single_case (&agr, &c, &agr.agr_case)) 
319                 ok = any_writer_write (agr.writer, &agr.agr_case);
320               case_destroy (&c);
321             }
322           casereader_destroy (reader);
323           if (ok)
324             ok = !casefile_error (dst);
325           casefile_destroy (dst);
326           if (!ok)
327             goto error;
328         }
329       else 
330         {
331           /* Active file is already sorted. */
332           if (!procedure (presorted_agr_to_sysfile, &agr))
333             goto error;
334         }
335       
336       if (agr.case_cnt > 0) 
337         {
338           dump_aggregate_info (&agr, &agr.agr_case);
339           any_writer_write (agr.writer, &agr.agr_case);
340         }
341       if (any_writer_error (agr.writer))
342         goto error;
343     }
344   
345   agr_destroy (&agr);
346   return CMD_SUCCESS;
347
348 error:
349   agr_destroy (&agr);
350   return CMD_CASCADING_FAILURE;
351 }
352
353 /* Parse all the aggregate functions. */
354 static int
355 parse_aggregate_functions (struct agr_proc *agr)
356 {
357   struct agr_var *tail; /* Tail of linked list starting at agr->vars. */
358
359   /* Parse everything. */
360   tail = NULL;
361   for (;;)
362     {
363       char **dest;
364       char **dest_label;
365       size_t n_dest;
366
367       int include_missing;
368       const struct agr_func *function;
369       int func_index;
370
371       union agr_argument arg[2];
372
373       struct variable **src;
374       size_t n_src;
375
376       size_t i;
377
378       dest = NULL;
379       dest_label = NULL;
380       n_dest = 0;
381       src = NULL;
382       function = NULL;
383       n_src = 0;
384       arg[0].c = NULL;
385       arg[1].c = NULL;
386
387       /* Parse the list of target variables. */
388       while (!lex_match ('='))
389         {
390           size_t n_dest_prev = n_dest;
391           
392           if (!parse_DATA_LIST_vars (&dest, &n_dest,
393                                      PV_APPEND | PV_SINGLE | PV_NO_SCRATCH))
394             goto error;
395
396           /* Assign empty labels. */
397           {
398             int j;
399
400             dest_label = xnrealloc (dest_label, n_dest, sizeof *dest_label);
401             for (j = n_dest_prev; j < n_dest; j++)
402               dest_label[j] = NULL;
403           }
404           
405           if (token == T_STRING)
406             {
407               ds_truncate (&tokstr, 255);
408               dest_label[n_dest - 1] = ds_xstrdup (&tokstr);
409               lex_get ();
410             }
411         }
412
413       /* Get the name of the aggregation function. */
414       if (token != T_ID)
415         {
416           lex_error (_("expecting aggregation function"));
417           goto error;
418         }
419
420       include_missing = 0;
421       if (tokid[strlen (tokid) - 1] == '.')
422         {
423           include_missing = 1;
424           tokid[strlen (tokid) - 1] = 0;
425         }
426       
427       for (function = agr_func_tab; function->name; function++)
428         if (!strcasecmp (function->name, tokid))
429           break;
430       if (NULL == function->name)
431         {
432           msg (SE, _("Unknown aggregation function %s."), tokid);
433           goto error;
434         }
435       func_index = function - agr_func_tab;
436       lex_get ();
437
438       /* Check for leading lparen. */
439       if (!lex_match ('('))
440         {
441           if (func_index == N)
442             func_index = N_NO_VARS;
443           else if (func_index == NU)
444             func_index = NU_NO_VARS;
445           else
446             {
447               lex_error (_("expecting `('"));
448               goto error;
449             }
450         }
451       else
452         {
453           /* Parse list of source variables. */
454           {
455             int pv_opts = PV_NO_SCRATCH;
456
457             if (func_index == SUM || func_index == MEAN || func_index == SD)
458               pv_opts |= PV_NUMERIC;
459             else if (function->n_args)
460               pv_opts |= PV_SAME_TYPE;
461
462             if (!parse_variables (default_dict, &src, &n_src, pv_opts))
463               goto error;
464           }
465
466           /* Parse function arguments, for those functions that
467              require arguments. */
468           if (function->n_args != 0)
469             for (i = 0; i < function->n_args; i++)
470               {
471                 int type;
472             
473                 lex_match (',');
474                 if (token == T_STRING)
475                   {
476                     arg[i].c = ds_xstrdup (&tokstr);
477                     type = ALPHA;
478                   }
479                 else if (lex_is_number ())
480                   {
481                     arg[i].f = tokval;
482                     type = NUMERIC;
483                   } else {
484                     msg (SE, _("Missing argument %d to %s."), i + 1,
485                          function->name);
486                     goto error;
487                   }
488             
489                 lex_get ();
490
491                 if (type != src[0]->type)
492                   {
493                     msg (SE, _("Arguments to %s must be of same type as "
494                                "source variables."),
495                          function->name);
496                     goto error;
497                   }
498               }
499
500           /* Trailing rparen. */
501           if (!lex_match(')'))
502             {
503               lex_error (_("expecting `)'"));
504               goto error;
505             }
506           
507           /* Now check that the number of source variables match
508              the number of target variables.  If we check earlier
509              than this, the user can get very misleading error
510              message, i.e. `AGGREGATE x=SUM(y t).' will get this
511              error message when a proper message would be more
512              like `unknown variable t'. */
513           if (n_src != n_dest)
514             {
515               msg (SE, _("Number of source variables (%u) does not match "
516                          "number of target variables (%u)."),
517                    (unsigned) n_src, (unsigned) n_dest);
518               goto error;
519             }
520
521           if ((func_index == PIN || func_index == POUT
522               || func_index == FIN || func_index == FOUT) 
523               && ((src[0]->type == NUMERIC && arg[0].f > arg[1].f)
524                   || (src[0]->type == ALPHA
525                       && str_compare_rpad (arg[0].c, arg[1].c) > 0)))
526             {
527               union agr_argument t = arg[0];
528               arg[0] = arg[1];
529               arg[1] = t;
530                   
531               msg (SW, _("The value arguments passed to the %s function "
532                          "are out-of-order.  They will be treated as if "
533                          "they had been specified in the correct order."),
534                    function->name);
535             }
536         }
537         
538       /* Finally add these to the linked list of aggregation
539          variables. */
540       for (i = 0; i < n_dest; i++)
541         {
542           struct agr_var *v = xmalloc (sizeof *v);
543
544           /* Add variable to chain. */
545           if (agr->agr_vars != NULL)
546             tail->next = v;
547           else
548             agr->agr_vars = v;
549           tail = v;
550           tail->next = NULL;
551           v->moments = NULL;
552           
553           /* Create the target variable in the aggregate
554              dictionary. */
555           {
556             struct variable *destvar;
557             
558             v->function = func_index;
559
560             if (src)
561               {
562                 v->src = src[i];
563                 
564                 if (src[i]->type == ALPHA)
565                   {
566                     v->function |= FSTRING;
567                     v->string = xmalloc (src[i]->width);
568                   }
569
570                 if (function->alpha_type == ALPHA)
571                   destvar = dict_clone_var (agr->dict, v->src, dest[i]);
572                 else
573                   {
574                     assert (v->src->type == NUMERIC
575                             || function->alpha_type == NUMERIC);
576                     destvar = dict_create_var (agr->dict, dest[i], 0);
577                     if (destvar != NULL) 
578                       {
579                         if ((func_index == N || func_index == NMISS)
580                             && dict_get_weight (default_dict) != NULL)
581                           destvar->print = destvar->write = f8_2; 
582                         else
583                           destvar->print = destvar->write = function->format;
584                       }
585                   }
586               } else {
587                 v->src = NULL;
588                 destvar = dict_create_var (agr->dict, dest[i], 0);
589                 if (func_index == N_NO_VARS
590                     && dict_get_weight (default_dict) != NULL)
591                   destvar->print = destvar->write = f8_2; 
592                 else
593                   destvar->print = destvar->write = function->format;
594               }
595           
596             if (!destvar)
597               {
598                 msg (SE, _("Variable name %s is not unique within the "
599                            "aggregate file dictionary, which contains "
600                            "the aggregate variables and the break "
601                            "variables."),
602                      dest[i]);
603                 goto error;
604               }
605
606             free (dest[i]);
607             if (dest_label[i])
608               {
609                 destvar->label = dest_label[i];
610                 dest_label[i] = NULL;
611               }
612
613             v->dest = destvar;
614           }
615           
616           v->include_missing = include_missing;
617
618           if (v->src != NULL)
619             {
620               int j;
621
622               if (v->src->type == NUMERIC)
623                 for (j = 0; j < function->n_args; j++)
624                   v->arg[j].f = arg[j].f;
625               else
626                 for (j = 0; j < function->n_args; j++)
627                   v->arg[j].c = xstrdup (arg[j].c);
628             }
629         }
630       
631       if (src != NULL && src[0]->type == ALPHA)
632         for (i = 0; i < function->n_args; i++)
633           {
634             free (arg[i].c);
635             arg[i].c = NULL;
636           }
637
638       free (src);
639       free (dest);
640       free (dest_label);
641
642       if (!lex_match ('/'))
643         {
644           if (token == '.')
645             return 1;
646
647           lex_error ("expecting end of command");
648           return 0;
649         }
650       continue;
651       
652     error:
653       for (i = 0; i < n_dest; i++)
654         {
655           free (dest[i]);
656           free (dest_label[i]);
657         }
658       free (dest);
659       free (dest_label);
660       free (arg[0].c);
661       free (arg[1].c);
662       if (src && n_src && src[0]->type == ALPHA)
663         for (i = 0; i < function->n_args; i++)
664           {
665             free (arg[i].c);
666             arg[i].c = NULL;
667           }
668       free (src);
669         
670       return 0;
671     }
672 }
673
674 /* Destroys AGR. */
675 static void
676 agr_destroy (struct agr_proc *agr)
677 {
678   struct agr_var *iter, *next;
679
680   any_writer_close (agr->writer);
681   if (agr->sort != NULL)
682     sort_destroy_criteria (agr->sort);
683   free (agr->break_vars);
684   case_destroy (&agr->break_case);
685   for (iter = agr->agr_vars; iter; iter = next)
686     {
687       next = iter->next;
688
689       if (iter->function & FSTRING)
690         {
691           size_t n_args;
692           size_t i;
693
694           n_args = agr_func_tab[iter->function & FUNC].n_args;
695           for (i = 0; i < n_args; i++)
696             free (iter->arg[i].c);
697           free (iter->string);
698         }
699       else if (iter->function == SD)
700         moments1_destroy (iter->moments);
701       free (iter);
702     }
703   if (agr->dict != NULL)
704     dict_destroy (agr->dict);
705
706   case_destroy (&agr->agr_case);
707 }
708 \f
709 /* Execution. */
710
711 static void accumulate_aggregate_info (struct agr_proc *,
712                                        const struct ccase *);
713 static void dump_aggregate_info (struct agr_proc *, struct ccase *);
714
715 /* Processes a single case INPUT for aggregation.  If output is
716    warranted, writes it to OUTPUT and returns nonzero.
717    Otherwise, returns zero and OUTPUT is unmodified. */
718 static int
719 aggregate_single_case (struct agr_proc *agr,
720                        const struct ccase *input, struct ccase *output)
721 {
722   bool finished_group = false;
723   
724   if (agr->case_cnt++ == 0)
725     initialize_aggregate_info (agr, input);
726   else if (case_compare (&agr->break_case, input,
727                          agr->break_vars, agr->break_var_cnt))
728     {
729       dump_aggregate_info (agr, output);
730       finished_group = true;
731
732       initialize_aggregate_info (agr, input);
733     }
734
735   accumulate_aggregate_info (agr, input);
736   return finished_group;
737 }
738
739 /* Accumulates aggregation data from the case INPUT. */
740 static void 
741 accumulate_aggregate_info (struct agr_proc *agr,
742                            const struct ccase *input)
743 {
744   struct agr_var *iter;
745   double weight;
746   bool bad_warn = true;
747
748   weight = dict_get_case_weight (default_dict, input, &bad_warn);
749
750   for (iter = agr->agr_vars; iter; iter = iter->next)
751     if (iter->src)
752       {
753         const union value *v = case_data (input, iter->src->fv);
754
755         if ((!iter->include_missing
756              && mv_is_value_missing (&iter->src->miss, v))
757             || (iter->include_missing && iter->src->type == NUMERIC
758                 && v->f == SYSMIS))
759           {
760             switch (iter->function)
761               {
762               case NMISS:
763               case NMISS | FSTRING:
764                 iter->dbl[0] += weight;
765                 break;
766               case NUMISS:
767               case NUMISS | FSTRING:
768                 iter->int1++;
769                 break;
770               }
771             iter->missing = 1;
772             continue;
773           }
774         
775         /* This is horrible.  There are too many possibilities. */
776         switch (iter->function)
777           {
778           case SUM:
779             iter->dbl[0] += v->f * weight;
780             iter->int1 = 1;
781             break;
782           case MEAN:
783             iter->dbl[0] += v->f * weight;
784             iter->dbl[1] += weight;
785             break;
786           case SD:
787             moments1_add (iter->moments, v->f, weight);
788             break;
789           case MAX:
790             iter->dbl[0] = max (iter->dbl[0], v->f);
791             iter->int1 = 1;
792             break;
793           case MAX | FSTRING:
794             if (memcmp (iter->string, v->s, iter->src->width) < 0)
795               memcpy (iter->string, v->s, iter->src->width);
796             iter->int1 = 1;
797             break;
798           case MIN:
799             iter->dbl[0] = min (iter->dbl[0], v->f);
800             iter->int1 = 1;
801             break;
802           case MIN | FSTRING:
803             if (memcmp (iter->string, v->s, iter->src->width) > 0)
804               memcpy (iter->string, v->s, iter->src->width);
805             iter->int1 = 1;
806             break;
807           case FGT:
808           case PGT:
809             if (v->f > iter->arg[0].f)
810               iter->dbl[0] += weight;
811             iter->dbl[1] += weight;
812             break;
813           case FGT | FSTRING:
814           case PGT | FSTRING:
815             if (memcmp (iter->arg[0].c, v->s, iter->src->width) < 0)
816               iter->dbl[0] += weight;
817             iter->dbl[1] += weight;
818             break;
819           case FLT:
820           case PLT:
821             if (v->f < iter->arg[0].f)
822               iter->dbl[0] += weight;
823             iter->dbl[1] += weight;
824             break;
825           case FLT | FSTRING:
826           case PLT | FSTRING:
827             if (memcmp (iter->arg[0].c, v->s, iter->src->width) > 0)
828               iter->dbl[0] += weight;
829             iter->dbl[1] += weight;
830             break;
831           case FIN:
832           case PIN:
833             if (iter->arg[0].f <= v->f && v->f <= iter->arg[1].f)
834               iter->dbl[0] += weight;
835             iter->dbl[1] += weight;
836             break;
837           case FIN | FSTRING:
838           case PIN | FSTRING:
839             if (memcmp (iter->arg[0].c, v->s, iter->src->width) <= 0
840                 && memcmp (iter->arg[1].c, v->s, iter->src->width) >= 0)
841               iter->dbl[0] += weight;
842             iter->dbl[1] += weight;
843             break;
844           case FOUT:
845           case POUT:
846             if (iter->arg[0].f > v->f || v->f > iter->arg[1].f)
847               iter->dbl[0] += weight;
848             iter->dbl[1] += weight;
849             break;
850           case FOUT | FSTRING:
851           case POUT | FSTRING:
852             if (memcmp (iter->arg[0].c, v->s, iter->src->width) > 0
853                 || memcmp (iter->arg[1].c, v->s, iter->src->width) < 0)
854               iter->dbl[0] += weight;
855             iter->dbl[1] += weight;
856             break;
857           case N:
858           case N | FSTRING:
859             iter->dbl[0] += weight;
860             break;
861           case NU:
862           case NU | FSTRING:
863             iter->int1++;
864             break;
865           case FIRST:
866             if (iter->int1 == 0)
867               {
868                 iter->dbl[0] = v->f;
869                 iter->int1 = 1;
870               }
871             break;
872           case FIRST | FSTRING:
873             if (iter->int1 == 0)
874               {
875                 memcpy (iter->string, v->s, iter->src->width);
876                 iter->int1 = 1;
877               }
878             break;
879           case LAST:
880             iter->dbl[0] = v->f;
881             iter->int1 = 1;
882             break;
883           case LAST | FSTRING:
884             memcpy (iter->string, v->s, iter->src->width);
885             iter->int1 = 1;
886             break;
887           case NMISS:
888           case NMISS | FSTRING:
889           case NUMISS:
890           case NUMISS | FSTRING:
891             /* Our value is not missing or it would have been
892                caught earlier.  Nothing to do. */
893             break;
894           default:
895             NOT_REACHED ();
896           }
897     } else {
898       switch (iter->function)
899         {
900         case N_NO_VARS:
901           iter->dbl[0] += weight;
902           break;
903         case NU_NO_VARS:
904           iter->int1++;
905           break;
906         default:
907           NOT_REACHED ();
908         }
909     }
910 }
911
912 /* We've come to a record that differs from the previous in one or
913    more of the break variables.  Make an output record from the
914    accumulated statistics in the OUTPUT case. */
915 static void 
916 dump_aggregate_info (struct agr_proc *agr, struct ccase *output)
917 {
918   {
919     int value_idx = 0;
920     int i;
921
922     for (i = 0; i < agr->break_var_cnt; i++) 
923       {
924         struct variable *v = agr->break_vars[i];
925         memcpy (case_data_rw (output, value_idx),
926                 case_data (&agr->break_case, v->fv),
927                 sizeof (union value) * v->nv);
928         value_idx += v->nv; 
929       }
930   }
931   
932   {
933     struct agr_var *i;
934   
935     for (i = agr->agr_vars; i; i = i->next)
936       {
937         union value *v = case_data_rw (output, i->dest->fv);
938
939         if (agr->missing == COLUMNWISE && i->missing != 0
940             && (i->function & FUNC) != N && (i->function & FUNC) != NU
941             && (i->function & FUNC) != NMISS && (i->function & FUNC) != NUMISS)
942           {
943             if (i->dest->type == ALPHA)
944               memset (v->s, ' ', i->dest->width);
945             else
946               v->f = SYSMIS;
947             continue;
948           }
949         
950         switch (i->function)
951           {
952           case SUM:
953             v->f = i->int1 ? i->dbl[0] : SYSMIS;
954             break;
955           case MEAN:
956             v->f = i->dbl[1] != 0.0 ? i->dbl[0] / i->dbl[1] : SYSMIS;
957             break;
958           case SD:
959             {
960               double variance;
961
962               /* FIXME: we should use two passes. */
963               moments1_calculate (i->moments, NULL, NULL, &variance,
964                                  NULL, NULL);
965               if (variance != SYSMIS)
966                 v->f = sqrt (variance);
967               else
968                 v->f = SYSMIS; 
969             }
970             break;
971           case MAX:
972           case MIN:
973             v->f = i->int1 ? i->dbl[0] : SYSMIS;
974             break;
975           case MAX | FSTRING:
976           case MIN | FSTRING:
977             if (i->int1)
978               memcpy (v->s, i->string, i->dest->width);
979             else
980               memset (v->s, ' ', i->dest->width);
981             break;
982           case FGT:
983           case FGT | FSTRING:
984           case FLT:
985           case FLT | FSTRING:
986           case FIN:
987           case FIN | FSTRING:
988           case FOUT:
989           case FOUT | FSTRING:
990             v->f = i->dbl[1] ? i->dbl[0] / i->dbl[1] : SYSMIS;
991             break;
992           case PGT:
993           case PGT | FSTRING:
994           case PLT:
995           case PLT | FSTRING:
996           case PIN:
997           case PIN | FSTRING:
998           case POUT:
999           case POUT | FSTRING:
1000             v->f = i->dbl[1] ? i->dbl[0] / i->dbl[1] * 100.0 : SYSMIS;
1001             break;
1002           case N:
1003           case N | FSTRING:
1004             v->f = i->dbl[0];
1005             break;
1006           case NU:
1007           case NU | FSTRING:
1008             v->f = i->int1;
1009             break;
1010           case FIRST:
1011           case LAST:
1012             v->f = i->int1 ? i->dbl[0] : SYSMIS;
1013             break;
1014           case FIRST | FSTRING:
1015           case LAST | FSTRING:
1016             if (i->int1)
1017               memcpy (v->s, i->string, i->dest->width);
1018             else
1019               memset (v->s, ' ', i->dest->width);
1020             break;
1021           case N_NO_VARS:
1022             v->f = i->dbl[0];
1023             break;
1024           case NU_NO_VARS:
1025             v->f = i->int1;
1026             break;
1027           case NMISS:
1028           case NMISS | FSTRING:
1029             v->f = i->dbl[0];
1030             break;
1031           case NUMISS:
1032           case NUMISS | FSTRING:
1033             v->f = i->int1;
1034             break;
1035           default:
1036             NOT_REACHED ();
1037           }
1038       }
1039   }
1040 }
1041
1042 /* Resets the state for all the aggregate functions. */
1043 static void
1044 initialize_aggregate_info (struct agr_proc *agr, const struct ccase *input)
1045 {
1046   struct agr_var *iter;
1047
1048   case_destroy (&agr->break_case);
1049   case_clone (&agr->break_case, input);
1050
1051   for (iter = agr->agr_vars; iter; iter = iter->next)
1052     {
1053       iter->missing = 0;
1054       iter->dbl[0] = iter->dbl[1] = iter->dbl[2] = 0.0;
1055       iter->int1 = iter->int2 = 0;
1056       switch (iter->function)
1057         {
1058         case MIN:
1059           iter->dbl[0] = DBL_MAX;
1060           break;
1061         case MIN | FSTRING:
1062           memset (iter->string, 255, iter->src->width);
1063           break;
1064         case MAX:
1065           iter->dbl[0] = -DBL_MAX;
1066           break;
1067         case MAX | FSTRING:
1068           memset (iter->string, 0, iter->src->width);
1069           break;
1070         case SD:
1071           if (iter->moments == NULL)
1072             iter->moments = moments1_create (MOMENT_VARIANCE);
1073           else
1074             moments1_clear (iter->moments);
1075           break;
1076         default:
1077           break;
1078         }
1079     }
1080 }
1081 \f
1082 /* Aggregate each case as it comes through.  Cases which aren't needed
1083    are dropped.
1084    Returns true if successful, false if an I/O error occurred. */
1085 static bool
1086 agr_to_active_file (const struct ccase *c, void *agr_)
1087 {
1088   struct agr_proc *agr = agr_;
1089
1090   if (aggregate_single_case (agr, c, &agr->agr_case)) 
1091     return agr->sink->class->write (agr->sink, &agr->agr_case);
1092
1093   return true;
1094 }
1095
1096 /* Aggregate the current case and output it if we passed a
1097    breakpoint. */
1098 static bool
1099 presorted_agr_to_sysfile (const struct ccase *c, void *agr_) 
1100 {
1101   struct agr_proc *agr = agr_;
1102
1103   if (aggregate_single_case (agr, c, &agr->agr_case)) 
1104     return any_writer_write (agr->writer, &agr->agr_case);
1105   
1106   return true;
1107 }