Encapsulated the static data of procedure.[ch] into a single object, to be
[pspp-builds.git] / src / language / stats / aggregate.c
1 /* PSPP - computes sample statistics.
2    Copyright (C) 1997-9, 2000 Free Software Foundation, Inc.
3    Written by Ben Pfaff <blp@gnu.org>.
4
5    This program is free software; you can redistribute it and/or
6    modify it under the terms of the GNU General Public License as
7    published by the Free Software Foundation; either version 2 of the
8    License, or (at your option) any later version.
9
10    This program is distributed in the hope that it will be useful, but
11    WITHOUT ANY WARRANTY; without even the implied warranty of
12    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13    General Public License for more details.
14
15    You should have received a copy of the GNU General Public License
16    along with this program; if not, write to the Free Software
17    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
18    02110-1301, USA. */
19
20 #include <config.h>
21
22 #include <stdlib.h>
23
24 #include <data/any-writer.h>
25 #include <data/case-sink.h>
26 #include <data/case.h>
27 #include <data/casefile.h>
28 #include <data/dictionary.h>
29 #include <data/file-handle-def.h>
30 #include <data/procedure.h>
31 #include <data/settings.h>
32 #include <data/storage-stream.h>
33 #include <data/sys-file-writer.h>
34 #include <data/variable.h>
35 #include <language/command.h>
36 #include <language/data-io/file-handle.h>
37 #include <language/lexer/lexer.h>
38 #include <language/lexer/variable-parser.h>
39 #include <language/stats/sort-criteria.h>
40 #include <libpspp/alloc.h>
41 #include <libpspp/assertion.h>
42 #include <libpspp/message.h>
43 #include <libpspp/misc.h>
44 #include <libpspp/pool.h>
45 #include <libpspp/str.h>
46 #include <math/moments.h>
47 #include <math/sort.h>
48
49 #include "gettext.h"
50 #define _(msgid) gettext (msgid)
51
52 /* Argument for AGGREGATE function. */
53 union agr_argument
54   {
55     double f;                           /* Numeric. */
56     char *c;                            /* Short or long string. */
57   };
58
59 /* Specifies how to make an aggregate variable. */
60 struct agr_var
61   {
62     struct agr_var *next;               /* Next in list. */
63
64     /* Collected during parsing. */
65     struct variable *src;       /* Source variable. */
66     struct variable *dest;      /* Target variable. */
67     int function;               /* Function. */
68     int include_missing;        /* 1=Include user-missing values. */
69     union agr_argument arg[2];  /* Arguments. */
70
71     /* Accumulated during AGGREGATE execution. */
72     double dbl[3];
73     int int1, int2;
74     char *string;
75     int missing;
76     struct moments1 *moments;
77   };
78
79 /* Aggregation functions. */
80 enum
81   {
82     NONE, SUM, MEAN, SD, MAX, MIN, PGT, PLT, PIN, POUT, FGT, FLT, FIN,
83     FOUT, N, NU, NMISS, NUMISS, FIRST, LAST,
84     N_AGR_FUNCS, N_NO_VARS, NU_NO_VARS,
85     FUNC = 0x1f, /* Function mask. */
86     FSTRING = 1<<5, /* String function bit. */
87   };
88
89 /* Attributes of an aggregation function. */
90 struct agr_func
91   {
92     const char *name;           /* Aggregation function name. */
93     size_t n_args;              /* Number of arguments. */
94     int alpha_type;             /* When given ALPHA arguments, output type. */
95     struct fmt_spec format;     /* Format spec if alpha_type != ALPHA. */
96   };
97
98 /* Attributes of aggregation functions. */
99 static const struct agr_func agr_func_tab[] = 
100   {
101     {"<NONE>",  0, -1,      {0, 0, 0}},
102     {"SUM",     0, -1,      {FMT_F, 8, 2}},
103     {"MEAN",    0, -1,      {FMT_F, 8, 2}},
104     {"SD",      0, -1,      {FMT_F, 8, 2}},
105     {"MAX",     0, ALPHA,   {-1, -1, -1}}, 
106     {"MIN",     0, ALPHA,   {-1, -1, -1}}, 
107     {"PGT",     1, NUMERIC, {FMT_F, 5, 1}},      
108     {"PLT",     1, NUMERIC, {FMT_F, 5, 1}},       
109     {"PIN",     2, NUMERIC, {FMT_F, 5, 1}},       
110     {"POUT",    2, NUMERIC, {FMT_F, 5, 1}},       
111     {"FGT",     1, NUMERIC, {FMT_F, 5, 3}},       
112     {"FLT",     1, NUMERIC, {FMT_F, 5, 3}},       
113     {"FIN",     2, NUMERIC, {FMT_F, 5, 3}},       
114     {"FOUT",    2, NUMERIC, {FMT_F, 5, 3}},       
115     {"N",       0, NUMERIC, {FMT_F, 7, 0}},       
116     {"NU",      0, NUMERIC, {FMT_F, 7, 0}},       
117     {"NMISS",   0, NUMERIC, {FMT_F, 7, 0}},       
118     {"NUMISS",  0, NUMERIC, {FMT_F, 7, 0}},       
119     {"FIRST",   0, ALPHA,   {-1, -1, -1}}, 
120     {"LAST",    0, ALPHA,   {-1, -1, -1}},
121     {NULL,      0, -1,      {-1, -1, -1}},
122     {"N",       0, NUMERIC, {FMT_F, 7, 0}},
123     {"NU",      0, NUMERIC, {FMT_F, 7, 0}},
124   };
125
126 /* Missing value types. */
127 enum missing_treatment
128   {
129     ITEMWISE,           /* Missing values item by item. */
130     COLUMNWISE          /* Missing values column by column. */
131   };
132
133 /* An entire AGGREGATE procedure. */
134 struct agr_proc 
135   {
136     /* We have either an output file or a sink. */
137     struct any_writer *writer;          /* Output file, or null if none. */
138     struct case_sink *sink;             /* Sink, or null if none. */
139
140     /* Break variables. */
141     struct sort_criteria *sort;         /* Sort criteria. */
142     struct variable **break_vars;       /* Break variables. */
143     size_t break_var_cnt;               /* Number of break variables. */
144     struct ccase break_case;            /* Last values of break variables. */
145
146     enum missing_treatment missing;     /* How to treat missing values. */
147     struct agr_var *agr_vars;           /* First aggregate variable. */
148     struct dictionary *dict;            /* Aggregate dictionary. */
149     int case_cnt;                       /* Counts aggregated cases. */
150     struct ccase agr_case;              /* Aggregate case for output. */
151   };
152
153 static void initialize_aggregate_info (struct agr_proc *,
154                                        const struct ccase *);
155
156 /* Prototypes. */
157 static bool parse_aggregate_functions (struct agr_proc *);
158 static void agr_destroy (struct agr_proc *);
159 static bool aggregate_single_case (struct agr_proc *agr,
160                                   const struct ccase *input,
161                                   struct ccase *output);
162 static void dump_aggregate_info (struct agr_proc *agr, struct ccase *output);
163
164 /* Aggregating to the active file. */
165 static bool agr_to_active_file (const struct ccase *, void *aux);
166
167 /* Aggregating to a system file. */
168 static bool presorted_agr_to_sysfile (const struct ccase *, void *aux);
169 \f
170 /* Parsing. */
171
172 /* Parses and executes the AGGREGATE procedure. */
173 int
174 cmd_aggregate (void)
175 {
176   struct agr_proc agr;
177   struct file_handle *out_file = NULL;
178
179   bool copy_documents = false;
180   bool presorted = false;
181   bool saw_direction;
182
183   memset(&agr, 0 , sizeof (agr));
184   agr.missing = ITEMWISE;
185   case_nullify (&agr.break_case);
186   
187   agr.dict = dict_create ();
188   dict_set_label (agr.dict, dict_get_label (dataset_dict (current_dataset)));
189   dict_set_documents (agr.dict, dict_get_documents (dataset_dict (current_dataset)));
190
191   /* OUTFILE subcommand must be first. */
192   if (!lex_force_match_id ("OUTFILE"))
193     goto error;
194   lex_match ('=');
195   if (!lex_match ('*'))
196     {
197       out_file = fh_parse (FH_REF_FILE | FH_REF_SCRATCH);
198       if (out_file == NULL)
199         goto error;
200     }
201   
202   /* Read most of the subcommands. */
203   for (;;)
204     {
205       lex_match ('/');
206       
207       if (lex_match_id ("MISSING"))
208         {
209           lex_match ('=');
210           if (!lex_match_id ("COLUMNWISE"))
211             {
212               lex_error (_("while expecting COLUMNWISE"));
213               goto error;
214             }
215           agr.missing = COLUMNWISE;
216         }
217       else if (lex_match_id ("DOCUMENT"))
218         copy_documents = true;
219       else if (lex_match_id ("PRESORTED"))
220         presorted = true;
221       else if (lex_match_id ("BREAK"))
222         {
223           int i;
224
225           lex_match ('=');
226           agr.sort = sort_parse_criteria (dataset_dict (current_dataset),
227                                           &agr.break_vars, &agr.break_var_cnt,
228                                           &saw_direction, NULL);
229           if (agr.sort == NULL)
230             goto error;
231           
232           for (i = 0; i < agr.break_var_cnt; i++)
233             dict_clone_var_assert (agr.dict, agr.break_vars[i],
234                                    agr.break_vars[i]->name);
235
236           /* BREAK must follow the options. */
237           break;
238         }
239       else
240         {
241           lex_error (_("expecting BREAK"));
242           goto error;
243         }
244     }
245   if (presorted && saw_direction)
246     msg (SW, _("When PRESORTED is specified, specifying sorting directions "
247                "with (A) or (D) has no effect.  Output data will be sorted "
248                "the same way as the input data."));
249       
250   /* Read in the aggregate functions. */
251   lex_match ('/');
252   if (!parse_aggregate_functions (&agr))
253     goto error;
254
255   /* Delete documents. */
256   if (!copy_documents)
257     dict_set_documents (agr.dict, NULL);
258
259   /* Cancel SPLIT FILE. */
260   dict_set_split_vars (agr.dict, NULL, 0);
261   
262   /* Initialize. */
263   agr.case_cnt = 0;
264   case_create (&agr.agr_case, dict_get_next_value_idx (agr.dict));
265
266   /* Output to active file or external file? */
267   if (out_file == NULL) 
268     {
269       /* The active file will be replaced by the aggregated data,
270          so TEMPORARY is moot. */
271       proc_cancel_temporary_transformations (current_dataset);
272
273       if (agr.sort != NULL && !presorted) 
274         {
275           if (!sort_active_file_in_place (agr.sort))
276             goto error;
277         }
278
279       agr.sink = create_case_sink (&storage_sink_class, agr.dict, NULL);
280       if (agr.sink->class->open != NULL)
281         agr.sink->class->open (agr.sink);
282       proc_set_sink (current_dataset, 
283                      create_case_sink (&null_sink_class, 
284                                        dataset_dict (current_dataset), NULL));
285       if (!procedure (current_dataset,agr_to_active_file, &agr))
286         goto error;
287       if (agr.case_cnt > 0) 
288         {
289           dump_aggregate_info (&agr, &agr.agr_case);
290           if (!agr.sink->class->write (agr.sink, &agr.agr_case))
291             goto error;
292         }
293       discard_variables (current_dataset);
294       dict_destroy (dataset_dict (current_dataset));
295       dataset_set_dict (current_dataset, agr.dict);
296       agr.dict = NULL;
297       proc_set_source (current_dataset, 
298                        agr.sink->class->make_source (agr.sink));
299       free_case_sink (agr.sink);
300     }
301   else
302     {
303       agr.writer = any_writer_open (out_file, agr.dict);
304       if (agr.writer == NULL)
305         goto error;
306       
307       if (agr.sort != NULL && !presorted) 
308         {
309           /* Sorting is needed. */
310           struct casefile *dst;
311           struct casereader *reader;
312           struct ccase c;
313           bool ok = true;
314           
315           dst = sort_active_file_to_casefile (agr.sort);
316           if (dst == NULL)
317             goto error;
318           reader = casefile_get_destructive_reader (dst);
319           while (ok && casereader_read_xfer (reader, &c)) 
320             {
321               if (aggregate_single_case (&agr, &c, &agr.agr_case)) 
322                 ok = any_writer_write (agr.writer, &agr.agr_case);
323               case_destroy (&c);
324             }
325           casereader_destroy (reader);
326           if (ok)
327             ok = !casefile_error (dst);
328           casefile_destroy (dst);
329           if (!ok)
330             goto error;
331         }
332       else 
333         {
334           /* Active file is already sorted. */
335           if (!procedure (current_dataset,presorted_agr_to_sysfile, &agr))
336             goto error;
337         }
338       
339       if (agr.case_cnt > 0) 
340         {
341           dump_aggregate_info (&agr, &agr.agr_case);
342           any_writer_write (agr.writer, &agr.agr_case);
343         }
344       if (any_writer_error (agr.writer))
345         goto error;
346     }
347   
348   agr_destroy (&agr);
349   return CMD_SUCCESS;
350
351 error:
352   agr_destroy (&agr);
353   return CMD_CASCADING_FAILURE;
354 }
355
356 /* Parse all the aggregate functions. */
357 static bool
358 parse_aggregate_functions (struct agr_proc *agr)
359 {
360   struct agr_var *tail; /* Tail of linked list starting at agr->vars. */
361
362   /* Parse everything. */
363   tail = NULL;
364   for (;;)
365     {
366       char **dest;
367       char **dest_label;
368       size_t n_dest;
369
370       int include_missing;
371       const struct agr_func *function;
372       int func_index;
373
374       union agr_argument arg[2];
375
376       struct variable **src;
377       size_t n_src;
378
379       size_t i;
380
381       dest = NULL;
382       dest_label = NULL;
383       n_dest = 0;
384       src = NULL;
385       function = NULL;
386       n_src = 0;
387       arg[0].c = NULL;
388       arg[1].c = NULL;
389
390       /* Parse the list of target variables. */
391       while (!lex_match ('='))
392         {
393           size_t n_dest_prev = n_dest;
394           
395           if (!parse_DATA_LIST_vars (&dest, &n_dest,
396                                      PV_APPEND | PV_SINGLE | PV_NO_SCRATCH))
397             goto error;
398
399           /* Assign empty labels. */
400           {
401             int j;
402
403             dest_label = xnrealloc (dest_label, n_dest, sizeof *dest_label);
404             for (j = n_dest_prev; j < n_dest; j++)
405               dest_label[j] = NULL;
406           }
407           
408           if (token == T_STRING)
409             {
410               ds_truncate (&tokstr, 255);
411               dest_label[n_dest - 1] = ds_xstrdup (&tokstr);
412               lex_get ();
413             }
414         }
415
416       /* Get the name of the aggregation function. */
417       if (token != T_ID)
418         {
419           lex_error (_("expecting aggregation function"));
420           goto error;
421         }
422
423       include_missing = 0;
424       if (tokid[strlen (tokid) - 1] == '.')
425         {
426           include_missing = 1;
427           tokid[strlen (tokid) - 1] = 0;
428         }
429       
430       for (function = agr_func_tab; function->name; function++)
431         if (!strcasecmp (function->name, tokid))
432           break;
433       if (NULL == function->name)
434         {
435           msg (SE, _("Unknown aggregation function %s."), tokid);
436           goto error;
437         }
438       func_index = function - agr_func_tab;
439       lex_get ();
440
441       /* Check for leading lparen. */
442       if (!lex_match ('('))
443         {
444           if (func_index == N)
445             func_index = N_NO_VARS;
446           else if (func_index == NU)
447             func_index = NU_NO_VARS;
448           else
449             {
450               lex_error (_("expecting `('"));
451               goto error;
452             }
453         }
454       else
455         {
456           /* Parse list of source variables. */
457           {
458             int pv_opts = PV_NO_SCRATCH;
459
460             if (func_index == SUM || func_index == MEAN || func_index == SD)
461               pv_opts |= PV_NUMERIC;
462             else if (function->n_args)
463               pv_opts |= PV_SAME_TYPE;
464
465             if (!parse_variables (dataset_dict (current_dataset), &src, &n_src, pv_opts))
466               goto error;
467           }
468
469           /* Parse function arguments, for those functions that
470              require arguments. */
471           if (function->n_args != 0)
472             for (i = 0; i < function->n_args; i++)
473               {
474                 int type;
475             
476                 lex_match (',');
477                 if (token == T_STRING)
478                   {
479                     arg[i].c = ds_xstrdup (&tokstr);
480                     type = ALPHA;
481                   }
482                 else if (lex_is_number ())
483                   {
484                     arg[i].f = tokval;
485                     type = NUMERIC;
486                   } else {
487                     msg (SE, _("Missing argument %d to %s."), i + 1,
488                          function->name);
489                     goto error;
490                   }
491             
492                 lex_get ();
493
494                 if (type != src[0]->type)
495                   {
496                     msg (SE, _("Arguments to %s must be of same type as "
497                                "source variables."),
498                          function->name);
499                     goto error;
500                   }
501               }
502
503           /* Trailing rparen. */
504           if (!lex_match(')'))
505             {
506               lex_error (_("expecting `)'"));
507               goto error;
508             }
509           
510           /* Now check that the number of source variables match
511              the number of target variables.  If we check earlier
512              than this, the user can get very misleading error
513              message, i.e. `AGGREGATE x=SUM(y t).' will get this
514              error message when a proper message would be more
515              like `unknown variable t'. */
516           if (n_src != n_dest)
517             {
518               msg (SE, _("Number of source variables (%u) does not match "
519                          "number of target variables (%u)."),
520                    (unsigned) n_src, (unsigned) n_dest);
521               goto error;
522             }
523
524           if ((func_index == PIN || func_index == POUT
525               || func_index == FIN || func_index == FOUT) 
526               && ((src[0]->type == NUMERIC && arg[0].f > arg[1].f)
527                   || (src[0]->type == ALPHA
528                       && str_compare_rpad (arg[0].c, arg[1].c) > 0)))
529             {
530               union agr_argument t = arg[0];
531               arg[0] = arg[1];
532               arg[1] = t;
533                   
534               msg (SW, _("The value arguments passed to the %s function "
535                          "are out-of-order.  They will be treated as if "
536                          "they had been specified in the correct order."),
537                    function->name);
538             }
539         }
540         
541       /* Finally add these to the linked list of aggregation
542          variables. */
543       for (i = 0; i < n_dest; i++)
544         {
545           struct agr_var *v = xmalloc (sizeof *v);
546
547           /* Add variable to chain. */
548           if (agr->agr_vars != NULL)
549             tail->next = v;
550           else
551             agr->agr_vars = v;
552           tail = v;
553           tail->next = NULL;
554           v->moments = NULL;
555           
556           /* Create the target variable in the aggregate
557              dictionary. */
558           {
559             struct variable *destvar;
560             
561             v->function = func_index;
562
563             if (src)
564               {
565                 v->src = src[i];
566                 
567                 if (src[i]->type == ALPHA)
568                   {
569                     v->function |= FSTRING;
570                     v->string = xmalloc (src[i]->width);
571                   }
572
573                 if (function->alpha_type == ALPHA)
574                   destvar = dict_clone_var (agr->dict, v->src, dest[i]);
575                 else
576                   {
577                     assert (v->src->type == NUMERIC
578                             || function->alpha_type == NUMERIC);
579                     destvar = dict_create_var (agr->dict, dest[i], 0);
580                     if (destvar != NULL) 
581                       {
582                         if ((func_index == N || func_index == NMISS)
583                             && dict_get_weight (dataset_dict (current_dataset)) != NULL)
584                           destvar->print = destvar->write = f8_2; 
585                         else
586                           destvar->print = destvar->write = function->format;
587                       }
588                   }
589               } else {
590                 v->src = NULL;
591                 destvar = dict_create_var (agr->dict, dest[i], 0);
592                 if (func_index == N_NO_VARS
593                     && dict_get_weight (dataset_dict (current_dataset)) != NULL)
594                   destvar->print = destvar->write = f8_2; 
595                 else
596                   destvar->print = destvar->write = function->format;
597               }
598           
599             if (!destvar)
600               {
601                 msg (SE, _("Variable name %s is not unique within the "
602                            "aggregate file dictionary, which contains "
603                            "the aggregate variables and the break "
604                            "variables."),
605                      dest[i]);
606                 goto error;
607               }
608
609             free (dest[i]);
610             if (dest_label[i])
611               {
612                 destvar->label = dest_label[i];
613                 dest_label[i] = NULL;
614               }
615
616             v->dest = destvar;
617           }
618           
619           v->include_missing = include_missing;
620
621           if (v->src != NULL)
622             {
623               int j;
624
625               if (v->src->type == NUMERIC)
626                 for (j = 0; j < function->n_args; j++)
627                   v->arg[j].f = arg[j].f;
628               else
629                 for (j = 0; j < function->n_args; j++)
630                   v->arg[j].c = xstrdup (arg[j].c);
631             }
632         }
633       
634       if (src != NULL && src[0]->type == ALPHA)
635         for (i = 0; i < function->n_args; i++)
636           {
637             free (arg[i].c);
638             arg[i].c = NULL;
639           }
640
641       free (src);
642       free (dest);
643       free (dest_label);
644
645       if (!lex_match ('/'))
646         {
647           if (token == '.')
648             return true;
649
650           lex_error ("expecting end of command");
651           return false;
652         }
653       continue;
654       
655     error:
656       for (i = 0; i < n_dest; i++)
657         {
658           free (dest[i]);
659           free (dest_label[i]);
660         }
661       free (dest);
662       free (dest_label);
663       free (arg[0].c);
664       free (arg[1].c);
665       if (src && n_src && src[0]->type == ALPHA)
666         for (i = 0; i < function->n_args; i++)
667           {
668             free (arg[i].c);
669             arg[i].c = NULL;
670           }
671       free (src);
672         
673       return false;
674     }
675 }
676
677 /* Destroys AGR. */
678 static void
679 agr_destroy (struct agr_proc *agr)
680 {
681   struct agr_var *iter, *next;
682
683   any_writer_close (agr->writer);
684   if (agr->sort != NULL)
685     sort_destroy_criteria (agr->sort);
686   free (agr->break_vars);
687   case_destroy (&agr->break_case);
688   for (iter = agr->agr_vars; iter; iter = next)
689     {
690       next = iter->next;
691
692       if (iter->function & FSTRING)
693         {
694           size_t n_args;
695           size_t i;
696
697           n_args = agr_func_tab[iter->function & FUNC].n_args;
698           for (i = 0; i < n_args; i++)
699             free (iter->arg[i].c);
700           free (iter->string);
701         }
702       else if (iter->function == SD)
703         moments1_destroy (iter->moments);
704       free (iter);
705     }
706   if (agr->dict != NULL)
707     dict_destroy (agr->dict);
708
709   case_destroy (&agr->agr_case);
710 }
711 \f
712 /* Execution. */
713
714 static void accumulate_aggregate_info (struct agr_proc *,
715                                        const struct ccase *);
716 static void dump_aggregate_info (struct agr_proc *, struct ccase *);
717
718 /* Processes a single case INPUT for aggregation.  If output is
719    warranted, writes it to OUTPUT and returns true.
720    Otherwise, returns false and OUTPUT is unmodified. */
721 static bool
722 aggregate_single_case (struct agr_proc *agr,
723                        const struct ccase *input, struct ccase *output)
724 {
725   bool finished_group = false;
726   
727   if (agr->case_cnt++ == 0)
728     initialize_aggregate_info (agr, input);
729   else if (case_compare (&agr->break_case, input,
730                          agr->break_vars, agr->break_var_cnt))
731     {
732       dump_aggregate_info (agr, output);
733       finished_group = true;
734
735       initialize_aggregate_info (agr, input);
736     }
737
738   accumulate_aggregate_info (agr, input);
739   return finished_group;
740 }
741
742 /* Accumulates aggregation data from the case INPUT. */
743 static void 
744 accumulate_aggregate_info (struct agr_proc *agr,
745                            const struct ccase *input)
746 {
747   struct agr_var *iter;
748   double weight;
749   bool bad_warn = true;
750
751   weight = dict_get_case_weight (dataset_dict (current_dataset), input, &bad_warn);
752
753   for (iter = agr->agr_vars; iter; iter = iter->next)
754     if (iter->src)
755       {
756         const union value *v = case_data (input, iter->src->fv);
757
758         if ((!iter->include_missing
759              && mv_is_value_missing (&iter->src->miss, v))
760             || (iter->include_missing && iter->src->type == NUMERIC
761                 && v->f == SYSMIS))
762           {
763             switch (iter->function)
764               {
765               case NMISS:
766               case NMISS | FSTRING:
767                 iter->dbl[0] += weight;
768                 break;
769               case NUMISS:
770               case NUMISS | FSTRING:
771                 iter->int1++;
772                 break;
773               }
774             iter->missing = 1;
775             continue;
776           }
777         
778         /* This is horrible.  There are too many possibilities. */
779         switch (iter->function)
780           {
781           case SUM:
782             iter->dbl[0] += v->f * weight;
783             iter->int1 = 1;
784             break;
785           case MEAN:
786             iter->dbl[0] += v->f * weight;
787             iter->dbl[1] += weight;
788             break;
789           case SD:
790             moments1_add (iter->moments, v->f, weight);
791             break;
792           case MAX:
793             iter->dbl[0] = max (iter->dbl[0], v->f);
794             iter->int1 = 1;
795             break;
796           case MAX | FSTRING:
797             if (memcmp (iter->string, v->s, iter->src->width) < 0)
798               memcpy (iter->string, v->s, iter->src->width);
799             iter->int1 = 1;
800             break;
801           case MIN:
802             iter->dbl[0] = min (iter->dbl[0], v->f);
803             iter->int1 = 1;
804             break;
805           case MIN | FSTRING:
806             if (memcmp (iter->string, v->s, iter->src->width) > 0)
807               memcpy (iter->string, v->s, iter->src->width);
808             iter->int1 = 1;
809             break;
810           case FGT:
811           case PGT:
812             if (v->f > iter->arg[0].f)
813               iter->dbl[0] += weight;
814             iter->dbl[1] += weight;
815             break;
816           case FGT | FSTRING:
817           case PGT | FSTRING:
818             if (memcmp (iter->arg[0].c, v->s, iter->src->width) < 0)
819               iter->dbl[0] += weight;
820             iter->dbl[1] += weight;
821             break;
822           case FLT:
823           case PLT:
824             if (v->f < iter->arg[0].f)
825               iter->dbl[0] += weight;
826             iter->dbl[1] += weight;
827             break;
828           case FLT | FSTRING:
829           case PLT | FSTRING:
830             if (memcmp (iter->arg[0].c, v->s, iter->src->width) > 0)
831               iter->dbl[0] += weight;
832             iter->dbl[1] += weight;
833             break;
834           case FIN:
835           case PIN:
836             if (iter->arg[0].f <= v->f && v->f <= iter->arg[1].f)
837               iter->dbl[0] += weight;
838             iter->dbl[1] += weight;
839             break;
840           case FIN | FSTRING:
841           case PIN | FSTRING:
842             if (memcmp (iter->arg[0].c, v->s, iter->src->width) <= 0
843                 && memcmp (iter->arg[1].c, v->s, iter->src->width) >= 0)
844               iter->dbl[0] += weight;
845             iter->dbl[1] += weight;
846             break;
847           case FOUT:
848           case POUT:
849             if (iter->arg[0].f > v->f || v->f > iter->arg[1].f)
850               iter->dbl[0] += weight;
851             iter->dbl[1] += weight;
852             break;
853           case FOUT | FSTRING:
854           case POUT | FSTRING:
855             if (memcmp (iter->arg[0].c, v->s, iter->src->width) > 0
856                 || memcmp (iter->arg[1].c, v->s, iter->src->width) < 0)
857               iter->dbl[0] += weight;
858             iter->dbl[1] += weight;
859             break;
860           case N:
861           case N | FSTRING:
862             iter->dbl[0] += weight;
863             break;
864           case NU:
865           case NU | FSTRING:
866             iter->int1++;
867             break;
868           case FIRST:
869             if (iter->int1 == 0)
870               {
871                 iter->dbl[0] = v->f;
872                 iter->int1 = 1;
873               }
874             break;
875           case FIRST | FSTRING:
876             if (iter->int1 == 0)
877               {
878                 memcpy (iter->string, v->s, iter->src->width);
879                 iter->int1 = 1;
880               }
881             break;
882           case LAST:
883             iter->dbl[0] = v->f;
884             iter->int1 = 1;
885             break;
886           case LAST | FSTRING:
887             memcpy (iter->string, v->s, iter->src->width);
888             iter->int1 = 1;
889             break;
890           case NMISS:
891           case NMISS | FSTRING:
892           case NUMISS:
893           case NUMISS | FSTRING:
894             /* Our value is not missing or it would have been
895                caught earlier.  Nothing to do. */
896             break;
897           default:
898             NOT_REACHED ();
899           }
900     } else {
901       switch (iter->function)
902         {
903         case N_NO_VARS:
904           iter->dbl[0] += weight;
905           break;
906         case NU_NO_VARS:
907           iter->int1++;
908           break;
909         default:
910           NOT_REACHED ();
911         }
912     }
913 }
914
915 /* We've come to a record that differs from the previous in one or
916    more of the break variables.  Make an output record from the
917    accumulated statistics in the OUTPUT case. */
918 static void 
919 dump_aggregate_info (struct agr_proc *agr, struct ccase *output)
920 {
921   {
922     int value_idx = 0;
923     int i;
924
925     for (i = 0; i < agr->break_var_cnt; i++) 
926       {
927         struct variable *v = agr->break_vars[i];
928         memcpy (case_data_rw (output, value_idx),
929                 case_data (&agr->break_case, v->fv),
930                 sizeof (union value) * v->nv);
931         value_idx += v->nv; 
932       }
933   }
934   
935   {
936     struct agr_var *i;
937   
938     for (i = agr->agr_vars; i; i = i->next)
939       {
940         union value *v = case_data_rw (output, i->dest->fv);
941
942         if (agr->missing == COLUMNWISE && i->missing != 0
943             && (i->function & FUNC) != N && (i->function & FUNC) != NU
944             && (i->function & FUNC) != NMISS && (i->function & FUNC) != NUMISS)
945           {
946             if (i->dest->type == ALPHA)
947               memset (v->s, ' ', i->dest->width);
948             else
949               v->f = SYSMIS;
950             continue;
951           }
952         
953         switch (i->function)
954           {
955           case SUM:
956             v->f = i->int1 ? i->dbl[0] : SYSMIS;
957             break;
958           case MEAN:
959             v->f = i->dbl[1] != 0.0 ? i->dbl[0] / i->dbl[1] : SYSMIS;
960             break;
961           case SD:
962             {
963               double variance;
964
965               /* FIXME: we should use two passes. */
966               moments1_calculate (i->moments, NULL, NULL, &variance,
967                                  NULL, NULL);
968               if (variance != SYSMIS)
969                 v->f = sqrt (variance);
970               else
971                 v->f = SYSMIS; 
972             }
973             break;
974           case MAX:
975           case MIN:
976             v->f = i->int1 ? i->dbl[0] : SYSMIS;
977             break;
978           case MAX | FSTRING:
979           case MIN | FSTRING:
980             if (i->int1)
981               memcpy (v->s, i->string, i->dest->width);
982             else
983               memset (v->s, ' ', i->dest->width);
984             break;
985           case FGT:
986           case FGT | FSTRING:
987           case FLT:
988           case FLT | FSTRING:
989           case FIN:
990           case FIN | FSTRING:
991           case FOUT:
992           case FOUT | FSTRING:
993             v->f = i->dbl[1] ? i->dbl[0] / i->dbl[1] : SYSMIS;
994             break;
995           case PGT:
996           case PGT | FSTRING:
997           case PLT:
998           case PLT | FSTRING:
999           case PIN:
1000           case PIN | FSTRING:
1001           case POUT:
1002           case POUT | FSTRING:
1003             v->f = i->dbl[1] ? i->dbl[0] / i->dbl[1] * 100.0 : SYSMIS;
1004             break;
1005           case N:
1006           case N | FSTRING:
1007             v->f = i->dbl[0];
1008             break;
1009           case NU:
1010           case NU | FSTRING:
1011             v->f = i->int1;
1012             break;
1013           case FIRST:
1014           case LAST:
1015             v->f = i->int1 ? i->dbl[0] : SYSMIS;
1016             break;
1017           case FIRST | FSTRING:
1018           case LAST | FSTRING:
1019             if (i->int1)
1020               memcpy (v->s, i->string, i->dest->width);
1021             else
1022               memset (v->s, ' ', i->dest->width);
1023             break;
1024           case N_NO_VARS:
1025             v->f = i->dbl[0];
1026             break;
1027           case NU_NO_VARS:
1028             v->f = i->int1;
1029             break;
1030           case NMISS:
1031           case NMISS | FSTRING:
1032             v->f = i->dbl[0];
1033             break;
1034           case NUMISS:
1035           case NUMISS | FSTRING:
1036             v->f = i->int1;
1037             break;
1038           default:
1039             NOT_REACHED ();
1040           }
1041       }
1042   }
1043 }
1044
1045 /* Resets the state for all the aggregate functions. */
1046 static void
1047 initialize_aggregate_info (struct agr_proc *agr, const struct ccase *input)
1048 {
1049   struct agr_var *iter;
1050
1051   case_destroy (&agr->break_case);
1052   case_clone (&agr->break_case, input);
1053
1054   for (iter = agr->agr_vars; iter; iter = iter->next)
1055     {
1056       iter->missing = 0;
1057       iter->dbl[0] = iter->dbl[1] = iter->dbl[2] = 0.0;
1058       iter->int1 = iter->int2 = 0;
1059       switch (iter->function)
1060         {
1061         case MIN:
1062           iter->dbl[0] = DBL_MAX;
1063           break;
1064         case MIN | FSTRING:
1065           memset (iter->string, 255, iter->src->width);
1066           break;
1067         case MAX:
1068           iter->dbl[0] = -DBL_MAX;
1069           break;
1070         case MAX | FSTRING:
1071           memset (iter->string, 0, iter->src->width);
1072           break;
1073         case SD:
1074           if (iter->moments == NULL)
1075             iter->moments = moments1_create (MOMENT_VARIANCE);
1076           else
1077             moments1_clear (iter->moments);
1078           break;
1079         default:
1080           break;
1081         }
1082     }
1083 }
1084 \f
1085 /* Aggregate each case as it comes through.  Cases which aren't needed
1086    are dropped.
1087    Returns true if successful, false if an I/O error occurred. */
1088 static bool
1089 agr_to_active_file (const struct ccase *c, void *agr_)
1090 {
1091   struct agr_proc *agr = agr_;
1092
1093   if (aggregate_single_case (agr, c, &agr->agr_case)) 
1094     return agr->sink->class->write (agr->sink, &agr->agr_case);
1095
1096   return true;
1097 }
1098
1099 /* Aggregate the current case and output it if we passed a
1100    breakpoint. */
1101 static bool
1102 presorted_agr_to_sysfile (const struct ccase *c, void *agr_) 
1103 {
1104   struct agr_proc *agr = agr_;
1105
1106   if (aggregate_single_case (agr, c, &agr->agr_case)) 
1107     return any_writer_write (agr->writer, &agr->agr_case);
1108   
1109   return true;
1110 }