Get rid of `char *c' member in union value, for cleanliness.
[pspp] / src / language / stats / aggregate.c
1 /* PSPP - computes sample statistics.
2    Copyright (C) 1997-9, 2000 Free Software Foundation, Inc.
3    Written by Ben Pfaff <blp@gnu.org>.
4
5    This program is free software; you can redistribute it and/or
6    modify it under the terms of the GNU General Public License as
7    published by the Free Software Foundation; either version 2 of the
8    License, or (at your option) any later version.
9
10    This program is distributed in the hope that it will be useful, but
11    WITHOUT ANY WARRANTY; without even the implied warranty of
12    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13    General Public License for more details.
14
15    You should have received a copy of the GNU General Public License
16    along with this program; if not, write to the Free Software
17    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
18    02110-1301, USA. */
19
20 #include <config.h>
21
22 #include <stdlib.h>
23
24 #include <data/any-writer.h>
25 #include <data/case-sink.h>
26 #include <data/case.h>
27 #include <data/casefile.h>
28 #include <data/dictionary.h>
29 #include <data/file-handle-def.h>
30 #include <data/procedure.h>
31 #include <data/settings.h>
32 #include <data/storage-stream.h>
33 #include <data/sys-file-writer.h>
34 #include <data/variable.h>
35 #include <language/command.h>
36 #include <language/data-io/file-handle.h>
37 #include <language/lexer/lexer.h>
38 #include <language/stats/sort-criteria.h>
39 #include <libpspp/alloc.h>
40 #include <libpspp/message.h>
41 #include <libpspp/message.h>
42 #include <libpspp/misc.h>
43 #include <libpspp/pool.h>
44 #include <libpspp/str.h>
45 #include <math/moments.h>
46 #include <math/sort.h>
47
48 #include "gettext.h"
49 #define _(msgid) gettext (msgid)
50
51 /* Argument for AGGREGATE function. */
52 union agr_argument
53   {
54     double f;                           /* Numeric. */
55     char *c;                            /* Short or long string. */
56   };
57
58 /* Specifies how to make an aggregate variable. */
59 struct agr_var
60   {
61     struct agr_var *next;               /* Next in list. */
62
63     /* Collected during parsing. */
64     struct variable *src;       /* Source variable. */
65     struct variable *dest;      /* Target variable. */
66     int function;               /* Function. */
67     int include_missing;        /* 1=Include user-missing values. */
68     union agr_argument arg[2];  /* Arguments. */
69
70     /* Accumulated during AGGREGATE execution. */
71     double dbl[3];
72     int int1, int2;
73     char *string;
74     int missing;
75     struct moments1 *moments;
76   };
77
78 /* Aggregation functions. */
79 enum
80   {
81     NONE, SUM, MEAN, SD, MAX, MIN, PGT, PLT, PIN, POUT, FGT, FLT, FIN,
82     FOUT, N, NU, NMISS, NUMISS, FIRST, LAST,
83     N_AGR_FUNCS, N_NO_VARS, NU_NO_VARS,
84     FUNC = 0x1f, /* Function mask. */
85     FSTRING = 1<<5, /* String function bit. */
86   };
87
88 /* Attributes of an aggregation function. */
89 struct agr_func
90   {
91     const char *name;           /* Aggregation function name. */
92     size_t n_args;              /* Number of arguments. */
93     int alpha_type;             /* When given ALPHA arguments, output type. */
94     struct fmt_spec format;     /* Format spec if alpha_type != ALPHA. */
95   };
96
97 /* Attributes of aggregation functions. */
98 static const struct agr_func agr_func_tab[] = 
99   {
100     {"<NONE>",  0, -1,      {0, 0, 0}},
101     {"SUM",     0, -1,      {FMT_F, 8, 2}},
102     {"MEAN",    0, -1,      {FMT_F, 8, 2}},
103     {"SD",      0, -1,      {FMT_F, 8, 2}},
104     {"MAX",     0, ALPHA,   {-1, -1, -1}}, 
105     {"MIN",     0, ALPHA,   {-1, -1, -1}}, 
106     {"PGT",     1, NUMERIC, {FMT_F, 5, 1}},      
107     {"PLT",     1, NUMERIC, {FMT_F, 5, 1}},       
108     {"PIN",     2, NUMERIC, {FMT_F, 5, 1}},       
109     {"POUT",    2, NUMERIC, {FMT_F, 5, 1}},       
110     {"FGT",     1, NUMERIC, {FMT_F, 5, 3}},       
111     {"FLT",     1, NUMERIC, {FMT_F, 5, 3}},       
112     {"FIN",     2, NUMERIC, {FMT_F, 5, 3}},       
113     {"FOUT",    2, NUMERIC, {FMT_F, 5, 3}},       
114     {"N",       0, NUMERIC, {FMT_F, 7, 0}},       
115     {"NU",      0, NUMERIC, {FMT_F, 7, 0}},       
116     {"NMISS",   0, NUMERIC, {FMT_F, 7, 0}},       
117     {"NUMISS",  0, NUMERIC, {FMT_F, 7, 0}},       
118     {"FIRST",   0, ALPHA,   {-1, -1, -1}}, 
119     {"LAST",    0, ALPHA,   {-1, -1, -1}},
120     {NULL,      0, -1,      {-1, -1, -1}},
121     {"N",       0, NUMERIC, {FMT_F, 7, 0}},
122     {"NU",      0, NUMERIC, {FMT_F, 7, 0}},
123   };
124
125 /* Missing value types. */
126 enum missing_treatment
127   {
128     ITEMWISE,           /* Missing values item by item. */
129     COLUMNWISE          /* Missing values column by column. */
130   };
131
132 /* An entire AGGREGATE procedure. */
133 struct agr_proc 
134   {
135     /* We have either an output file or a sink. */
136     struct any_writer *writer;          /* Output file, or null if none. */
137     struct case_sink *sink;             /* Sink, or null if none. */
138
139     /* Break variables. */
140     struct sort_criteria *sort;         /* Sort criteria. */
141     struct variable **break_vars;       /* Break variables. */
142     size_t break_var_cnt;               /* Number of break variables. */
143     struct ccase break_case;            /* Last values of break variables. */
144
145     enum missing_treatment missing;     /* How to treat missing values. */
146     struct agr_var *agr_vars;           /* First aggregate variable. */
147     struct dictionary *dict;            /* Aggregate dictionary. */
148     int case_cnt;                       /* Counts aggregated cases. */
149     struct ccase agr_case;              /* Aggregate case for output. */
150   };
151
152 static void initialize_aggregate_info (struct agr_proc *,
153                                        const struct ccase *);
154
155 /* Prototypes. */
156 static int parse_aggregate_functions (struct agr_proc *);
157 static void agr_destroy (struct agr_proc *);
158 static int aggregate_single_case (struct agr_proc *agr,
159                                   const struct ccase *input,
160                                   struct ccase *output);
161 static void dump_aggregate_info (struct agr_proc *agr, struct ccase *output);
162
163 /* Aggregating to the active file. */
164 static bool agr_to_active_file (const struct ccase *, void *aux);
165
166 /* Aggregating to a system file. */
167 static bool presorted_agr_to_sysfile (const struct ccase *, void *aux);
168 \f
169 /* Parsing. */
170
171 /* Parses and executes the AGGREGATE procedure. */
172 int
173 cmd_aggregate (void)
174 {
175   struct agr_proc agr;
176   struct file_handle *out_file = NULL;
177
178   bool copy_documents = false;
179   bool presorted = false;
180   bool saw_direction;
181
182   memset(&agr, 0 , sizeof (agr));
183   agr.missing = ITEMWISE;
184   case_nullify (&agr.break_case);
185   
186   agr.dict = dict_create ();
187   dict_set_label (agr.dict, dict_get_label (default_dict));
188   dict_set_documents (agr.dict, dict_get_documents (default_dict));
189
190   /* OUTFILE subcommand must be first. */
191   if (!lex_force_match_id ("OUTFILE"))
192     goto error;
193   lex_match ('=');
194   if (!lex_match ('*'))
195     {
196       out_file = fh_parse (FH_REF_FILE | FH_REF_SCRATCH);
197       if (out_file == NULL)
198         goto error;
199     }
200   
201   /* Read most of the subcommands. */
202   for (;;)
203     {
204       lex_match ('/');
205       
206       if (lex_match_id ("MISSING"))
207         {
208           lex_match ('=');
209           if (!lex_match_id ("COLUMNWISE"))
210             {
211               lex_error (_("while expecting COLUMNWISE"));
212               goto error;
213             }
214           agr.missing = COLUMNWISE;
215         }
216       else if (lex_match_id ("DOCUMENT"))
217         copy_documents = true;
218       else if (lex_match_id ("PRESORTED"))
219         presorted = true;
220       else if (lex_match_id ("BREAK"))
221         {
222           int i;
223
224           lex_match ('=');
225           agr.sort = sort_parse_criteria (default_dict,
226                                           &agr.break_vars, &agr.break_var_cnt,
227                                           &saw_direction, NULL);
228           if (agr.sort == NULL)
229             goto error;
230           
231           for (i = 0; i < agr.break_var_cnt; i++)
232             dict_clone_var_assert (agr.dict, agr.break_vars[i],
233                                    agr.break_vars[i]->name);
234
235           /* BREAK must follow the options. */
236           break;
237         }
238       else
239         {
240           lex_error (_("expecting BREAK"));
241           goto error;
242         }
243     }
244   if (presorted && saw_direction)
245     msg (SW, _("When PRESORTED is specified, specifying sorting directions "
246                "with (A) or (D) has no effect.  Output data will be sorted "
247                "the same way as the input data."));
248       
249   /* Read in the aggregate functions. */
250   lex_match ('/');
251   if (!parse_aggregate_functions (&agr))
252     goto error;
253
254   /* Delete documents. */
255   if (!copy_documents)
256     dict_set_documents (agr.dict, NULL);
257
258   /* Cancel SPLIT FILE. */
259   dict_set_split_vars (agr.dict, NULL, 0);
260   
261   /* Initialize. */
262   agr.case_cnt = 0;
263   case_create (&agr.agr_case, dict_get_next_value_idx (agr.dict));
264
265   /* Output to active file or external file? */
266   if (out_file == NULL) 
267     {
268       /* The active file will be replaced by the aggregated data,
269          so TEMPORARY is moot. */
270       proc_cancel_temporary_transformations ();
271
272       if (agr.sort != NULL && !presorted) 
273         {
274           if (!sort_active_file_in_place (agr.sort))
275             goto error;
276         }
277
278       agr.sink = create_case_sink (&storage_sink_class, agr.dict, NULL);
279       if (agr.sink->class->open != NULL)
280         agr.sink->class->open (agr.sink);
281       proc_set_sink (create_case_sink (&null_sink_class, default_dict, NULL));
282       if (!procedure (agr_to_active_file, &agr))
283         goto error;
284       if (agr.case_cnt > 0) 
285         {
286           dump_aggregate_info (&agr, &agr.agr_case);
287           if (!agr.sink->class->write (agr.sink, &agr.agr_case))
288             goto error;
289         }
290       discard_variables ();
291       default_dict = agr.dict;
292       agr.dict = NULL;
293       proc_set_source (agr.sink->class->make_source (agr.sink));
294       free_case_sink (agr.sink);
295     }
296   else
297     {
298       agr.writer = any_writer_open (out_file, agr.dict);
299       if (agr.writer == NULL)
300         goto error;
301       
302       if (agr.sort != NULL && !presorted) 
303         {
304           /* Sorting is needed. */
305           struct casefile *dst;
306           struct casereader *reader;
307           struct ccase c;
308           bool ok = true;
309           
310           dst = sort_active_file_to_casefile (agr.sort);
311           if (dst == NULL)
312             goto error;
313           reader = casefile_get_destructive_reader (dst);
314           while (ok && casereader_read_xfer (reader, &c)) 
315             {
316               if (aggregate_single_case (&agr, &c, &agr.agr_case)) 
317                 ok = any_writer_write (agr.writer, &agr.agr_case);
318               case_destroy (&c);
319             }
320           casereader_destroy (reader);
321           if (ok)
322             ok = !casefile_error (dst);
323           casefile_destroy (dst);
324           if (!ok)
325             goto error;
326         }
327       else 
328         {
329           /* Active file is already sorted. */
330           if (!procedure (presorted_agr_to_sysfile, &agr))
331             goto error;
332         }
333       
334       if (agr.case_cnt > 0) 
335         {
336           dump_aggregate_info (&agr, &agr.agr_case);
337           any_writer_write (agr.writer, &agr.agr_case);
338         }
339       if (any_writer_error (agr.writer))
340         goto error;
341     }
342   
343   agr_destroy (&agr);
344   return CMD_SUCCESS;
345
346 error:
347   agr_destroy (&agr);
348   return CMD_CASCADING_FAILURE;
349 }
350
351 /* Parse all the aggregate functions. */
352 static int
353 parse_aggregate_functions (struct agr_proc *agr)
354 {
355   struct agr_var *tail; /* Tail of linked list starting at agr->vars. */
356
357   /* Parse everything. */
358   tail = NULL;
359   for (;;)
360     {
361       char **dest;
362       char **dest_label;
363       size_t n_dest;
364
365       int include_missing;
366       const struct agr_func *function;
367       int func_index;
368
369       union agr_argument arg[2];
370
371       struct variable **src;
372       size_t n_src;
373
374       size_t i;
375
376       dest = NULL;
377       dest_label = NULL;
378       n_dest = 0;
379       src = NULL;
380       function = NULL;
381       n_src = 0;
382       arg[0].c = NULL;
383       arg[1].c = NULL;
384
385       /* Parse the list of target variables. */
386       while (!lex_match ('='))
387         {
388           size_t n_dest_prev = n_dest;
389           
390           if (!parse_DATA_LIST_vars (&dest, &n_dest,
391                                      PV_APPEND | PV_SINGLE | PV_NO_SCRATCH))
392             goto error;
393
394           /* Assign empty labels. */
395           {
396             int j;
397
398             dest_label = xnrealloc (dest_label, n_dest, sizeof *dest_label);
399             for (j = n_dest_prev; j < n_dest; j++)
400               dest_label[j] = NULL;
401           }
402           
403           if (token == T_STRING)
404             {
405               ds_truncate (&tokstr, 255);
406               dest_label[n_dest - 1] = xstrdup (ds_c_str (&tokstr));
407               lex_get ();
408             }
409         }
410
411       /* Get the name of the aggregation function. */
412       if (token != T_ID)
413         {
414           lex_error (_("expecting aggregation function"));
415           goto error;
416         }
417
418       include_missing = 0;
419       if (tokid[strlen (tokid) - 1] == '.')
420         {
421           include_missing = 1;
422           tokid[strlen (tokid) - 1] = 0;
423         }
424       
425       for (function = agr_func_tab; function->name; function++)
426         if (!strcasecmp (function->name, tokid))
427           break;
428       if (NULL == function->name)
429         {
430           msg (SE, _("Unknown aggregation function %s."), tokid);
431           goto error;
432         }
433       func_index = function - agr_func_tab;
434       lex_get ();
435
436       /* Check for leading lparen. */
437       if (!lex_match ('('))
438         {
439           if (func_index == N)
440             func_index = N_NO_VARS;
441           else if (func_index == NU)
442             func_index = NU_NO_VARS;
443           else
444             {
445               lex_error (_("expecting `('"));
446               goto error;
447             }
448         }
449       else
450         {
451           /* Parse list of source variables. */
452           {
453             int pv_opts = PV_NO_SCRATCH;
454
455             if (func_index == SUM || func_index == MEAN || func_index == SD)
456               pv_opts |= PV_NUMERIC;
457             else if (function->n_args)
458               pv_opts |= PV_SAME_TYPE;
459
460             if (!parse_variables (default_dict, &src, &n_src, pv_opts))
461               goto error;
462           }
463
464           /* Parse function arguments, for those functions that
465              require arguments. */
466           if (function->n_args != 0)
467             for (i = 0; i < function->n_args; i++)
468               {
469                 int type;
470             
471                 lex_match (',');
472                 if (token == T_STRING)
473                   {
474                     arg[i].c = xstrdup (ds_c_str (&tokstr));
475                     type = ALPHA;
476                   }
477                 else if (lex_is_number ())
478                   {
479                     arg[i].f = tokval;
480                     type = NUMERIC;
481                   } else {
482                     msg (SE, _("Missing argument %d to %s."), i + 1,
483                          function->name);
484                     goto error;
485                   }
486             
487                 lex_get ();
488
489                 if (type != src[0]->type)
490                   {
491                     msg (SE, _("Arguments to %s must be of same type as "
492                                "source variables."),
493                          function->name);
494                     goto error;
495                   }
496               }
497
498           /* Trailing rparen. */
499           if (!lex_match(')'))
500             {
501               lex_error (_("expecting `)'"));
502               goto error;
503             }
504           
505           /* Now check that the number of source variables match
506              the number of target variables.  If we check earlier
507              than this, the user can get very misleading error
508              message, i.e. `AGGREGATE x=SUM(y t).' will get this
509              error message when a proper message would be more
510              like `unknown variable t'. */
511           if (n_src != n_dest)
512             {
513               msg (SE, _("Number of source variables (%u) does not match "
514                          "number of target variables (%u)."),
515                    (unsigned) n_src, (unsigned) n_dest);
516               goto error;
517             }
518
519           if ((func_index == PIN || func_index == POUT
520               || func_index == FIN || func_index == FOUT) 
521               && ((src[0]->type == NUMERIC && arg[0].f > arg[1].f)
522                   || (src[0]->type == ALPHA
523                       && str_compare_rpad (arg[0].c, arg[1].c) > 0)))
524             {
525               union agr_argument t = arg[0];
526               arg[0] = arg[1];
527               arg[1] = t;
528                   
529               msg (SW, _("The value arguments passed to the %s function "
530                          "are out-of-order.  They will be treated as if "
531                          "they had been specified in the correct order."),
532                    function->name);
533             }
534         }
535         
536       /* Finally add these to the linked list of aggregation
537          variables. */
538       for (i = 0; i < n_dest; i++)
539         {
540           struct agr_var *v = xmalloc (sizeof *v);
541
542           /* Add variable to chain. */
543           if (agr->agr_vars != NULL)
544             tail->next = v;
545           else
546             agr->agr_vars = v;
547           tail = v;
548           tail->next = NULL;
549           v->moments = NULL;
550           
551           /* Create the target variable in the aggregate
552              dictionary. */
553           {
554             struct variable *destvar;
555             
556             v->function = func_index;
557
558             if (src)
559               {
560                 v->src = src[i];
561                 
562                 if (src[i]->type == ALPHA)
563                   {
564                     v->function |= FSTRING;
565                     v->string = xmalloc (src[i]->width);
566                   }
567
568                 if (function->alpha_type == ALPHA)
569                   destvar = dict_clone_var (agr->dict, v->src, dest[i]);
570                 else
571                   {
572                     assert (v->src->type == NUMERIC
573                             || function->alpha_type == NUMERIC);
574                     destvar = dict_create_var (agr->dict, dest[i], 0);
575                     if (destvar != NULL) 
576                       {
577                         if ((func_index == N || func_index == NMISS)
578                             && dict_get_weight (default_dict) != NULL)
579                           destvar->print = destvar->write = f8_2; 
580                         else
581                           destvar->print = destvar->write = function->format;
582                       }
583                   }
584               } else {
585                 v->src = NULL;
586                 destvar = dict_create_var (agr->dict, dest[i], 0);
587                 if (func_index == N_NO_VARS
588                     && dict_get_weight (default_dict) != NULL)
589                   destvar->print = destvar->write = f8_2; 
590                 else
591                   destvar->print = destvar->write = function->format;
592               }
593           
594             if (!destvar)
595               {
596                 msg (SE, _("Variable name %s is not unique within the "
597                            "aggregate file dictionary, which contains "
598                            "the aggregate variables and the break "
599                            "variables."),
600                      dest[i]);
601                 goto error;
602               }
603
604             free (dest[i]);
605             if (dest_label[i])
606               {
607                 destvar->label = dest_label[i];
608                 dest_label[i] = NULL;
609               }
610
611             v->dest = destvar;
612           }
613           
614           v->include_missing = include_missing;
615
616           if (v->src != NULL)
617             {
618               int j;
619
620               if (v->src->type == NUMERIC)
621                 for (j = 0; j < function->n_args; j++)
622                   v->arg[j].f = arg[j].f;
623               else
624                 for (j = 0; j < function->n_args; j++)
625                   v->arg[j].c = xstrdup (arg[j].c);
626             }
627         }
628       
629       if (src != NULL && src[0]->type == ALPHA)
630         for (i = 0; i < function->n_args; i++)
631           {
632             free (arg[i].c);
633             arg[i].c = NULL;
634           }
635
636       free (src);
637       free (dest);
638       free (dest_label);
639
640       if (!lex_match ('/'))
641         {
642           if (token == '.')
643             return 1;
644
645           lex_error ("expecting end of command");
646           return 0;
647         }
648       continue;
649       
650     error:
651       for (i = 0; i < n_dest; i++)
652         {
653           free (dest[i]);
654           free (dest_label[i]);
655         }
656       free (dest);
657       free (dest_label);
658       free (arg[0].c);
659       free (arg[1].c);
660       if (src && n_src && src[0]->type == ALPHA)
661         for (i = 0; i < function->n_args; i++)
662           {
663             free (arg[i].c);
664             arg[i].c = NULL;
665           }
666       free (src);
667         
668       return 0;
669     }
670 }
671
672 /* Destroys AGR. */
673 static void
674 agr_destroy (struct agr_proc *agr)
675 {
676   struct agr_var *iter, *next;
677
678   any_writer_close (agr->writer);
679   if (agr->sort != NULL)
680     sort_destroy_criteria (agr->sort);
681   free (agr->break_vars);
682   case_destroy (&agr->break_case);
683   for (iter = agr->agr_vars; iter; iter = next)
684     {
685       next = iter->next;
686
687       if (iter->function & FSTRING)
688         {
689           size_t n_args;
690           size_t i;
691
692           n_args = agr_func_tab[iter->function & FUNC].n_args;
693           for (i = 0; i < n_args; i++)
694             free (iter->arg[i].c);
695           free (iter->string);
696         }
697       else if (iter->function == SD)
698         moments1_destroy (iter->moments);
699       free (iter);
700     }
701   if (agr->dict != NULL)
702     dict_destroy (agr->dict);
703
704   case_destroy (&agr->agr_case);
705 }
706 \f
707 /* Execution. */
708
709 static void accumulate_aggregate_info (struct agr_proc *,
710                                        const struct ccase *);
711 static void dump_aggregate_info (struct agr_proc *, struct ccase *);
712
713 /* Processes a single case INPUT for aggregation.  If output is
714    warranted, writes it to OUTPUT and returns nonzero.
715    Otherwise, returns zero and OUTPUT is unmodified. */
716 static int
717 aggregate_single_case (struct agr_proc *agr,
718                        const struct ccase *input, struct ccase *output)
719 {
720   bool finished_group = false;
721   
722   if (agr->case_cnt++ == 0)
723     initialize_aggregate_info (agr, input);
724   else if (case_compare (&agr->break_case, input,
725                          agr->break_vars, agr->break_var_cnt))
726     {
727       dump_aggregate_info (agr, output);
728       finished_group = true;
729
730       initialize_aggregate_info (agr, input);
731     }
732
733   accumulate_aggregate_info (agr, input);
734   return finished_group;
735 }
736
737 /* Accumulates aggregation data from the case INPUT. */
738 static void 
739 accumulate_aggregate_info (struct agr_proc *agr,
740                            const struct ccase *input)
741 {
742   struct agr_var *iter;
743   double weight;
744   int bad_warn = 1;
745
746   weight = dict_get_case_weight (default_dict, input, &bad_warn);
747
748   for (iter = agr->agr_vars; iter; iter = iter->next)
749     if (iter->src)
750       {
751         const union value *v = case_data (input, iter->src->fv);
752
753         if ((!iter->include_missing
754              && mv_is_value_missing (&iter->src->miss, v))
755             || (iter->include_missing && iter->src->type == NUMERIC
756                 && v->f == SYSMIS))
757           {
758             switch (iter->function)
759               {
760               case NMISS:
761               case NMISS | FSTRING:
762                 iter->dbl[0] += weight;
763                 break;
764               case NUMISS:
765               case NUMISS | FSTRING:
766                 iter->int1++;
767                 break;
768               }
769             iter->missing = 1;
770             continue;
771           }
772         
773         /* This is horrible.  There are too many possibilities. */
774         switch (iter->function)
775           {
776           case SUM:
777             iter->dbl[0] += v->f * weight;
778             iter->int1 = 1;
779             break;
780           case MEAN:
781             iter->dbl[0] += v->f * weight;
782             iter->dbl[1] += weight;
783             break;
784           case SD:
785             moments1_add (iter->moments, v->f, weight);
786             break;
787           case MAX:
788             iter->dbl[0] = max (iter->dbl[0], v->f);
789             iter->int1 = 1;
790             break;
791           case MAX | FSTRING:
792             if (memcmp (iter->string, v->s, iter->src->width) < 0)
793               memcpy (iter->string, v->s, iter->src->width);
794             iter->int1 = 1;
795             break;
796           case MIN:
797             iter->dbl[0] = min (iter->dbl[0], v->f);
798             iter->int1 = 1;
799             break;
800           case MIN | FSTRING:
801             if (memcmp (iter->string, v->s, iter->src->width) > 0)
802               memcpy (iter->string, v->s, iter->src->width);
803             iter->int1 = 1;
804             break;
805           case FGT:
806           case PGT:
807             if (v->f > iter->arg[0].f)
808               iter->dbl[0] += weight;
809             iter->dbl[1] += weight;
810             break;
811           case FGT | FSTRING:
812           case PGT | FSTRING:
813             if (memcmp (iter->arg[0].c, v->s, iter->src->width) < 0)
814               iter->dbl[0] += weight;
815             iter->dbl[1] += weight;
816             break;
817           case FLT:
818           case PLT:
819             if (v->f < iter->arg[0].f)
820               iter->dbl[0] += weight;
821             iter->dbl[1] += weight;
822             break;
823           case FLT | FSTRING:
824           case PLT | FSTRING:
825             if (memcmp (iter->arg[0].c, v->s, iter->src->width) > 0)
826               iter->dbl[0] += weight;
827             iter->dbl[1] += weight;
828             break;
829           case FIN:
830           case PIN:
831             if (iter->arg[0].f <= v->f && v->f <= iter->arg[1].f)
832               iter->dbl[0] += weight;
833             iter->dbl[1] += weight;
834             break;
835           case FIN | FSTRING:
836           case PIN | FSTRING:
837             if (memcmp (iter->arg[0].c, v->s, iter->src->width) <= 0
838                 && memcmp (iter->arg[1].c, v->s, iter->src->width) >= 0)
839               iter->dbl[0] += weight;
840             iter->dbl[1] += weight;
841             break;
842           case FOUT:
843           case POUT:
844             if (iter->arg[0].f > v->f || v->f > iter->arg[1].f)
845               iter->dbl[0] += weight;
846             iter->dbl[1] += weight;
847             break;
848           case FOUT | FSTRING:
849           case POUT | FSTRING:
850             if (memcmp (iter->arg[0].c, v->s, iter->src->width) > 0
851                 || memcmp (iter->arg[1].c, v->s, iter->src->width) < 0)
852               iter->dbl[0] += weight;
853             iter->dbl[1] += weight;
854             break;
855           case N:
856           case N | FSTRING:
857             iter->dbl[0] += weight;
858             break;
859           case NU:
860           case NU | FSTRING:
861             iter->int1++;
862             break;
863           case FIRST:
864             if (iter->int1 == 0)
865               {
866                 iter->dbl[0] = v->f;
867                 iter->int1 = 1;
868               }
869             break;
870           case FIRST | FSTRING:
871             if (iter->int1 == 0)
872               {
873                 memcpy (iter->string, v->s, iter->src->width);
874                 iter->int1 = 1;
875               }
876             break;
877           case LAST:
878             iter->dbl[0] = v->f;
879             iter->int1 = 1;
880             break;
881           case LAST | FSTRING:
882             memcpy (iter->string, v->s, iter->src->width);
883             iter->int1 = 1;
884             break;
885           case NMISS:
886           case NMISS | FSTRING:
887           case NUMISS:
888           case NUMISS | FSTRING:
889             /* Our value is not missing or it would have been
890                caught earlier.  Nothing to do. */
891             break;
892           default:
893             assert (0);
894           }
895     } else {
896       switch (iter->function)
897         {
898         case N_NO_VARS:
899           iter->dbl[0] += weight;
900           break;
901         case NU_NO_VARS:
902           iter->int1++;
903           break;
904         default:
905           assert (0);
906         }
907     }
908 }
909
910 /* We've come to a record that differs from the previous in one or
911    more of the break variables.  Make an output record from the
912    accumulated statistics in the OUTPUT case. */
913 static void 
914 dump_aggregate_info (struct agr_proc *agr, struct ccase *output)
915 {
916   {
917     int value_idx = 0;
918     int i;
919
920     for (i = 0; i < agr->break_var_cnt; i++) 
921       {
922         struct variable *v = agr->break_vars[i];
923         memcpy (case_data_rw (output, value_idx),
924                 case_data (&agr->break_case, v->fv),
925                 sizeof (union value) * v->nv);
926         value_idx += v->nv; 
927       }
928   }
929   
930   {
931     struct agr_var *i;
932   
933     for (i = agr->agr_vars; i; i = i->next)
934       {
935         union value *v = case_data_rw (output, i->dest->fv);
936
937         if (agr->missing == COLUMNWISE && i->missing != 0
938             && (i->function & FUNC) != N && (i->function & FUNC) != NU
939             && (i->function & FUNC) != NMISS && (i->function & FUNC) != NUMISS)
940           {
941             if (i->dest->type == ALPHA)
942               memset (v->s, ' ', i->dest->width);
943             else
944               v->f = SYSMIS;
945             continue;
946           }
947         
948         switch (i->function)
949           {
950           case SUM:
951             v->f = i->int1 ? i->dbl[0] : SYSMIS;
952             break;
953           case MEAN:
954             v->f = i->dbl[1] != 0.0 ? i->dbl[0] / i->dbl[1] : SYSMIS;
955             break;
956           case SD:
957             {
958               double variance;
959
960               /* FIXME: we should use two passes. */
961               moments1_calculate (i->moments, NULL, NULL, &variance,
962                                  NULL, NULL);
963               if (variance != SYSMIS)
964                 v->f = sqrt (variance);
965               else
966                 v->f = SYSMIS; 
967             }
968             break;
969           case MAX:
970           case MIN:
971             v->f = i->int1 ? i->dbl[0] : SYSMIS;
972             break;
973           case MAX | FSTRING:
974           case MIN | FSTRING:
975             if (i->int1)
976               memcpy (v->s, i->string, i->dest->width);
977             else
978               memset (v->s, ' ', i->dest->width);
979             break;
980           case FGT:
981           case FGT | FSTRING:
982           case FLT:
983           case FLT | FSTRING:
984           case FIN:
985           case FIN | FSTRING:
986           case FOUT:
987           case FOUT | FSTRING:
988             v->f = i->dbl[1] ? i->dbl[0] / i->dbl[1] : SYSMIS;
989             break;
990           case PGT:
991           case PGT | FSTRING:
992           case PLT:
993           case PLT | FSTRING:
994           case PIN:
995           case PIN | FSTRING:
996           case POUT:
997           case POUT | FSTRING:
998             v->f = i->dbl[1] ? i->dbl[0] / i->dbl[1] * 100.0 : SYSMIS;
999             break;
1000           case N:
1001           case N | FSTRING:
1002             v->f = i->dbl[0];
1003             break;
1004           case NU:
1005           case NU | FSTRING:
1006             v->f = i->int1;
1007             break;
1008           case FIRST:
1009           case LAST:
1010             v->f = i->int1 ? i->dbl[0] : SYSMIS;
1011             break;
1012           case FIRST | FSTRING:
1013           case LAST | FSTRING:
1014             if (i->int1)
1015               memcpy (v->s, i->string, i->dest->width);
1016             else
1017               memset (v->s, ' ', i->dest->width);
1018             break;
1019           case N_NO_VARS:
1020             v->f = i->dbl[0];
1021             break;
1022           case NU_NO_VARS:
1023             v->f = i->int1;
1024             break;
1025           case NMISS:
1026           case NMISS | FSTRING:
1027             v->f = i->dbl[0];
1028             break;
1029           case NUMISS:
1030           case NUMISS | FSTRING:
1031             v->f = i->int1;
1032             break;
1033           default:
1034             assert (0);
1035           }
1036       }
1037   }
1038 }
1039
1040 /* Resets the state for all the aggregate functions. */
1041 static void
1042 initialize_aggregate_info (struct agr_proc *agr, const struct ccase *input)
1043 {
1044   struct agr_var *iter;
1045
1046   case_destroy (&agr->break_case);
1047   case_clone (&agr->break_case, input);
1048
1049   for (iter = agr->agr_vars; iter; iter = iter->next)
1050     {
1051       iter->missing = 0;
1052       iter->dbl[0] = iter->dbl[1] = iter->dbl[2] = 0.0;
1053       iter->int1 = iter->int2 = 0;
1054       switch (iter->function)
1055         {
1056         case MIN:
1057           iter->dbl[0] = DBL_MAX;
1058           break;
1059         case MIN | FSTRING:
1060           memset (iter->string, 255, iter->src->width);
1061           break;
1062         case MAX:
1063           iter->dbl[0] = -DBL_MAX;
1064           break;
1065         case MAX | FSTRING:
1066           memset (iter->string, 0, iter->src->width);
1067           break;
1068         case SD:
1069           if (iter->moments == NULL)
1070             iter->moments = moments1_create (MOMENT_VARIANCE);
1071           else
1072             moments1_clear (iter->moments);
1073           break;
1074         default:
1075           break;
1076         }
1077     }
1078 }
1079 \f
1080 /* Aggregate each case as it comes through.  Cases which aren't needed
1081    are dropped.
1082    Returns true if successful, false if an I/O error occurred. */
1083 static bool
1084 agr_to_active_file (const struct ccase *c, void *agr_)
1085 {
1086   struct agr_proc *agr = agr_;
1087
1088   if (aggregate_single_case (agr, c, &agr->agr_case)) 
1089     return agr->sink->class->write (agr->sink, &agr->agr_case);
1090
1091   return true;
1092 }
1093
1094 /* Aggregate the current case and output it if we passed a
1095    breakpoint. */
1096 static bool
1097 presorted_agr_to_sysfile (const struct ccase *c, void *agr_) 
1098 {
1099   struct agr_proc *agr = agr_;
1100
1101   if (aggregate_single_case (agr, c, &agr->agr_case)) 
1102     return any_writer_write (agr->writer, &agr->agr_case);
1103   
1104   return true;
1105 }