Continue reforming procedure execution. In this phase, get rid of
[pspp-builds.git] / src / language / stats / aggregate.c
1 /* PSPP - computes sample statistics.
2    Copyright (C) 1997-9, 2000 Free Software Foundation, Inc.
3    Written by Ben Pfaff <blp@gnu.org>.
4
5    This program is free software; you can redistribute it and/or
6    modify it under the terms of the GNU General Public License as
7    published by the Free Software Foundation; either version 2 of the
8    License, or (at your option) any later version.
9
10    This program is distributed in the hope that it will be useful, but
11    WITHOUT ANY WARRANTY; without even the implied warranty of
12    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13    General Public License for more details.
14
15    You should have received a copy of the GNU General Public License
16    along with this program; if not, write to the Free Software
17    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
18    02110-1301, USA. */
19
20 #include <config.h>
21
22 #include <stdlib.h>
23
24 #include <data/any-writer.h>
25 #include <data/case-sink.h>
26 #include <data/case.h>
27 #include <data/casefile.h>
28 #include <data/dictionary.h>
29 #include <data/file-handle-def.h>
30 #include <procedure.h>
31 #include <data/settings.h>
32 #include <data/storage-stream.h>
33 #include <data/sys-file-writer.h>
34 #include <data/variable.h>
35 #include <language/command.h>
36 #include <language/data-io/file-handle.h>
37 #include <language/lexer/lexer.h>
38 #include <language/stats/sort-criteria.h>
39 #include <libpspp/alloc.h>
40 #include <libpspp/message.h>
41 #include <libpspp/message.h>
42 #include <libpspp/misc.h>
43 #include <libpspp/pool.h>
44 #include <libpspp/str.h>
45 #include <math/moments.h>
46 #include <math/sort.h>
47 #include <procedure.h>
48
49 #include "gettext.h"
50 #define _(msgid) gettext (msgid)
51
52 /* Specifies how to make an aggregate variable. */
53 struct agr_var
54   {
55     struct agr_var *next;               /* Next in list. */
56
57     /* Collected during parsing. */
58     struct variable *src;       /* Source variable. */
59     struct variable *dest;      /* Target variable. */
60     int function;               /* Function. */
61     int include_missing;        /* 1=Include user-missing values. */
62     union value arg[2];         /* Arguments. */
63
64     /* Accumulated during AGGREGATE execution. */
65     double dbl[3];
66     int int1, int2;
67     char *string;
68     int missing;
69     struct moments1 *moments;
70   };
71
72 /* Aggregation functions. */
73 enum
74   {
75     NONE, SUM, MEAN, SD, MAX, MIN, PGT, PLT, PIN, POUT, FGT, FLT, FIN,
76     FOUT, N, NU, NMISS, NUMISS, FIRST, LAST,
77     N_AGR_FUNCS, N_NO_VARS, NU_NO_VARS,
78     FUNC = 0x1f, /* Function mask. */
79     FSTRING = 1<<5, /* String function bit. */
80   };
81
82 /* Attributes of an aggregation function. */
83 struct agr_func
84   {
85     const char *name;           /* Aggregation function name. */
86     size_t n_args;              /* Number of arguments. */
87     int alpha_type;             /* When given ALPHA arguments, output type. */
88     struct fmt_spec format;     /* Format spec if alpha_type != ALPHA. */
89   };
90
91 /* Attributes of aggregation functions. */
92 static const struct agr_func agr_func_tab[] = 
93   {
94     {"<NONE>",  0, -1,      {0, 0, 0}},
95     {"SUM",     0, -1,      {FMT_F, 8, 2}},
96     {"MEAN",    0, -1,      {FMT_F, 8, 2}},
97     {"SD",      0, -1,      {FMT_F, 8, 2}},
98     {"MAX",     0, ALPHA,   {-1, -1, -1}}, 
99     {"MIN",     0, ALPHA,   {-1, -1, -1}}, 
100     {"PGT",     1, NUMERIC, {FMT_F, 5, 1}},      
101     {"PLT",     1, NUMERIC, {FMT_F, 5, 1}},       
102     {"PIN",     2, NUMERIC, {FMT_F, 5, 1}},       
103     {"POUT",    2, NUMERIC, {FMT_F, 5, 1}},       
104     {"FGT",     1, NUMERIC, {FMT_F, 5, 3}},       
105     {"FLT",     1, NUMERIC, {FMT_F, 5, 3}},       
106     {"FIN",     2, NUMERIC, {FMT_F, 5, 3}},       
107     {"FOUT",    2, NUMERIC, {FMT_F, 5, 3}},       
108     {"N",       0, NUMERIC, {FMT_F, 7, 0}},       
109     {"NU",      0, NUMERIC, {FMT_F, 7, 0}},       
110     {"NMISS",   0, NUMERIC, {FMT_F, 7, 0}},       
111     {"NUMISS",  0, NUMERIC, {FMT_F, 7, 0}},       
112     {"FIRST",   0, ALPHA,   {-1, -1, -1}}, 
113     {"LAST",    0, ALPHA,   {-1, -1, -1}},
114     {NULL,      0, -1,      {-1, -1, -1}},
115     {"N",       0, NUMERIC, {FMT_F, 7, 0}},
116     {"NU",      0, NUMERIC, {FMT_F, 7, 0}},
117   };
118
119 /* Missing value types. */
120 enum missing_treatment
121   {
122     ITEMWISE,           /* Missing values item by item. */
123     COLUMNWISE          /* Missing values column by column. */
124   };
125
126 /* An entire AGGREGATE procedure. */
127 struct agr_proc 
128   {
129     /* We have either an output file or a sink. */
130     struct any_writer *writer;          /* Output file, or null if none. */
131     struct case_sink *sink;             /* Sink, or null if none. */
132
133     /* Break variables. */
134     struct sort_criteria *sort;         /* Sort criteria. */
135     struct variable **break_vars;       /* Break variables. */
136     size_t break_var_cnt;               /* Number of break variables. */
137     struct ccase break_case;            /* Last values of break variables. */
138
139     enum missing_treatment missing;     /* How to treat missing values. */
140     struct agr_var *agr_vars;           /* First aggregate variable. */
141     struct dictionary *dict;            /* Aggregate dictionary. */
142     int case_cnt;                       /* Counts aggregated cases. */
143     struct ccase agr_case;              /* Aggregate case for output. */
144   };
145
146 static void initialize_aggregate_info (struct agr_proc *,
147                                        const struct ccase *);
148
149 /* Prototypes. */
150 static int parse_aggregate_functions (struct agr_proc *);
151 static void agr_destroy (struct agr_proc *);
152 static int aggregate_single_case (struct agr_proc *agr,
153                                   const struct ccase *input,
154                                   struct ccase *output);
155 static void dump_aggregate_info (struct agr_proc *agr, struct ccase *output);
156
157 /* Aggregating to the active file. */
158 static bool agr_to_active_file (struct ccase *, void *aux);
159
160 /* Aggregating to a system file. */
161 static bool presorted_agr_to_sysfile (struct ccase *, void *aux);
162 \f
163 /* Parsing. */
164
165 /* Parses and executes the AGGREGATE procedure. */
166 int
167 cmd_aggregate (void)
168 {
169   struct agr_proc agr;
170   struct file_handle *out_file = NULL;
171
172   bool copy_documents = false;
173   bool presorted = false;
174   bool saw_direction;
175
176   memset(&agr, 0 , sizeof (agr));
177   agr.missing = ITEMWISE;
178   case_nullify (&agr.break_case);
179   
180   agr.dict = dict_create ();
181   dict_set_label (agr.dict, dict_get_label (default_dict));
182   dict_set_documents (agr.dict, dict_get_documents (default_dict));
183
184   /* OUTFILE subcommand must be first. */
185   if (!lex_force_match_id ("OUTFILE"))
186     goto error;
187   lex_match ('=');
188   if (!lex_match ('*'))
189     {
190       out_file = fh_parse (FH_REF_FILE | FH_REF_SCRATCH);
191       if (out_file == NULL)
192         goto error;
193     }
194   
195   /* Read most of the subcommands. */
196   for (;;)
197     {
198       lex_match ('/');
199       
200       if (lex_match_id ("MISSING"))
201         {
202           lex_match ('=');
203           if (!lex_match_id ("COLUMNWISE"))
204             {
205               lex_error (_("while expecting COLUMNWISE"));
206               goto error;
207             }
208           agr.missing = COLUMNWISE;
209         }
210       else if (lex_match_id ("DOCUMENT"))
211         copy_documents = true;
212       else if (lex_match_id ("PRESORTED"))
213         presorted = true;
214       else if (lex_match_id ("BREAK"))
215         {
216           int i;
217
218           lex_match ('=');
219           agr.sort = sort_parse_criteria (default_dict,
220                                           &agr.break_vars, &agr.break_var_cnt,
221                                           &saw_direction, NULL);
222           if (agr.sort == NULL)
223             goto error;
224           
225           for (i = 0; i < agr.break_var_cnt; i++)
226             dict_clone_var_assert (agr.dict, agr.break_vars[i],
227                                    agr.break_vars[i]->name);
228
229           /* BREAK must follow the options. */
230           break;
231         }
232       else
233         {
234           lex_error (_("expecting BREAK"));
235           goto error;
236         }
237     }
238   if (presorted && saw_direction)
239     msg (SW, _("When PRESORTED is specified, specifying sorting directions "
240                "with (A) or (D) has no effect.  Output data will be sorted "
241                "the same way as the input data."));
242       
243   /* Read in the aggregate functions. */
244   lex_match ('/');
245   if (!parse_aggregate_functions (&agr))
246     goto error;
247
248   /* Delete documents. */
249   if (!copy_documents)
250     dict_set_documents (agr.dict, NULL);
251
252   /* Cancel SPLIT FILE. */
253   dict_set_split_vars (agr.dict, NULL, 0);
254   
255   /* Initialize. */
256   agr.case_cnt = 0;
257   case_create (&agr.agr_case, dict_get_next_value_idx (agr.dict));
258
259   /* Output to active file or external file? */
260   if (out_file == NULL) 
261     {
262       /* The active file will be replaced by the aggregated data,
263          so TEMPORARY is moot. */
264       proc_cancel_temporary_transformations ();
265
266       if (agr.sort != NULL && !presorted) 
267         {
268           if (!sort_active_file_in_place (agr.sort))
269             goto error;
270         }
271
272       agr.sink = create_case_sink (&storage_sink_class, agr.dict, NULL);
273       if (agr.sink->class->open != NULL)
274         agr.sink->class->open (agr.sink);
275       proc_set_sink (create_case_sink (&null_sink_class, default_dict, NULL));
276       if (!procedure (agr_to_active_file, &agr))
277         goto error;
278       if (agr.case_cnt > 0) 
279         {
280           dump_aggregate_info (&agr, &agr.agr_case);
281           if (!agr.sink->class->write (agr.sink, &agr.agr_case))
282             goto error;
283         }
284       discard_variables ();
285       default_dict = agr.dict;
286       agr.dict = NULL;
287       proc_set_source (agr.sink->class->make_source (agr.sink));
288       free_case_sink (agr.sink);
289     }
290   else
291     {
292       agr.writer = any_writer_open (out_file, agr.dict);
293       if (agr.writer == NULL)
294         goto error;
295       
296       if (agr.sort != NULL && !presorted) 
297         {
298           /* Sorting is needed. */
299           struct casefile *dst;
300           struct casereader *reader;
301           struct ccase c;
302           bool ok = true;
303           
304           dst = sort_active_file_to_casefile (agr.sort);
305           if (dst == NULL)
306             goto error;
307           reader = casefile_get_destructive_reader (dst);
308           while (ok && casereader_read_xfer (reader, &c)) 
309             {
310               if (aggregate_single_case (&agr, &c, &agr.agr_case)) 
311                 ok = any_writer_write (agr.writer, &agr.agr_case);
312               case_destroy (&c);
313             }
314           casereader_destroy (reader);
315           if (ok)
316             ok = !casefile_error (dst);
317           casefile_destroy (dst);
318           if (!ok)
319             goto error;
320         }
321       else 
322         {
323           /* Active file is already sorted. */
324           if (!procedure (presorted_agr_to_sysfile, &agr))
325             goto error;
326         }
327       
328       if (agr.case_cnt > 0) 
329         {
330           dump_aggregate_info (&agr, &agr.agr_case);
331           any_writer_write (agr.writer, &agr.agr_case);
332         }
333       if (any_writer_error (agr.writer))
334         goto error;
335     }
336   
337   agr_destroy (&agr);
338   return CMD_SUCCESS;
339
340 error:
341   agr_destroy (&agr);
342   return CMD_CASCADING_FAILURE;
343 }
344
345 /* Parse all the aggregate functions. */
346 static int
347 parse_aggregate_functions (struct agr_proc *agr)
348 {
349   struct agr_var *tail; /* Tail of linked list starting at agr->vars. */
350
351   /* Parse everything. */
352   tail = NULL;
353   for (;;)
354     {
355       char **dest;
356       char **dest_label;
357       size_t n_dest;
358
359       int include_missing;
360       const struct agr_func *function;
361       int func_index;
362
363       union value arg[2];
364
365       struct variable **src;
366       size_t n_src;
367
368       size_t i;
369
370       dest = NULL;
371       dest_label = NULL;
372       n_dest = 0;
373       src = NULL;
374       function = NULL;
375       n_src = 0;
376       arg[0].c = NULL;
377       arg[1].c = NULL;
378
379       /* Parse the list of target variables. */
380       while (!lex_match ('='))
381         {
382           size_t n_dest_prev = n_dest;
383           
384           if (!parse_DATA_LIST_vars (&dest, &n_dest,
385                                      PV_APPEND | PV_SINGLE | PV_NO_SCRATCH))
386             goto error;
387
388           /* Assign empty labels. */
389           {
390             int j;
391
392             dest_label = xnrealloc (dest_label, n_dest, sizeof *dest_label);
393             for (j = n_dest_prev; j < n_dest; j++)
394               dest_label[j] = NULL;
395           }
396           
397           if (token == T_STRING)
398             {
399               ds_truncate (&tokstr, 255);
400               dest_label[n_dest - 1] = xstrdup (ds_c_str (&tokstr));
401               lex_get ();
402             }
403         }
404
405       /* Get the name of the aggregation function. */
406       if (token != T_ID)
407         {
408           lex_error (_("expecting aggregation function"));
409           goto error;
410         }
411
412       include_missing = 0;
413       if (tokid[strlen (tokid) - 1] == '.')
414         {
415           include_missing = 1;
416           tokid[strlen (tokid) - 1] = 0;
417         }
418       
419       for (function = agr_func_tab; function->name; function++)
420         if (!strcasecmp (function->name, tokid))
421           break;
422       if (NULL == function->name)
423         {
424           msg (SE, _("Unknown aggregation function %s."), tokid);
425           goto error;
426         }
427       func_index = function - agr_func_tab;
428       lex_get ();
429
430       /* Check for leading lparen. */
431       if (!lex_match ('('))
432         {
433           if (func_index == N)
434             func_index = N_NO_VARS;
435           else if (func_index == NU)
436             func_index = NU_NO_VARS;
437           else
438             {
439               lex_error (_("expecting `('"));
440               goto error;
441             }
442         }
443       else
444         {
445           /* Parse list of source variables. */
446           {
447             int pv_opts = PV_NO_SCRATCH;
448
449             if (func_index == SUM || func_index == MEAN || func_index == SD)
450               pv_opts |= PV_NUMERIC;
451             else if (function->n_args)
452               pv_opts |= PV_SAME_TYPE;
453
454             if (!parse_variables (default_dict, &src, &n_src, pv_opts))
455               goto error;
456           }
457
458           /* Parse function arguments, for those functions that
459              require arguments. */
460           if (function->n_args != 0)
461             for (i = 0; i < function->n_args; i++)
462               {
463                 int type;
464             
465                 lex_match (',');
466                 if (token == T_STRING)
467                   {
468                     arg[i].c = xstrdup (ds_c_str (&tokstr));
469                     type = ALPHA;
470                   }
471                 else if (lex_is_number ())
472                   {
473                     arg[i].f = tokval;
474                     type = NUMERIC;
475                   } else {
476                     msg (SE, _("Missing argument %d to %s."), i + 1,
477                          function->name);
478                     goto error;
479                   }
480             
481                 lex_get ();
482
483                 if (type != src[0]->type)
484                   {
485                     msg (SE, _("Arguments to %s must be of same type as "
486                                "source variables."),
487                          function->name);
488                     goto error;
489                   }
490               }
491
492           /* Trailing rparen. */
493           if (!lex_match(')'))
494             {
495               lex_error (_("expecting `)'"));
496               goto error;
497             }
498           
499           /* Now check that the number of source variables match
500              the number of target variables.  If we check earlier
501              than this, the user can get very misleading error
502              message, i.e. `AGGREGATE x=SUM(y t).' will get this
503              error message when a proper message would be more
504              like `unknown variable t'. */
505           if (n_src != n_dest)
506             {
507               msg (SE, _("Number of source variables (%u) does not match "
508                          "number of target variables (%u)."),
509                    (unsigned) n_src, (unsigned) n_dest);
510               goto error;
511             }
512
513           if ((func_index == PIN || func_index == POUT
514               || func_index == FIN || func_index == FOUT) 
515               && ((src[0]->type == NUMERIC && arg[0].f > arg[1].f)
516                   || (src[0]->type == ALPHA
517                       && str_compare_rpad (arg[0].c, arg[1].c) > 0)))
518             {
519               union value t = arg[0];
520               arg[0] = arg[1];
521               arg[1] = t;
522                   
523               msg (SW, _("The value arguments passed to the %s function "
524                          "are out-of-order.  They will be treated as if "
525                          "they had been specified in the correct order."),
526                    function->name);
527             }
528         }
529         
530       /* Finally add these to the linked list of aggregation
531          variables. */
532       for (i = 0; i < n_dest; i++)
533         {
534           struct agr_var *v = xmalloc (sizeof *v);
535
536           /* Add variable to chain. */
537           if (agr->agr_vars != NULL)
538             tail->next = v;
539           else
540             agr->agr_vars = v;
541           tail = v;
542           tail->next = NULL;
543           v->moments = NULL;
544           
545           /* Create the target variable in the aggregate
546              dictionary. */
547           {
548             struct variable *destvar;
549             
550             v->function = func_index;
551
552             if (src)
553               {
554                 v->src = src[i];
555                 
556                 if (src[i]->type == ALPHA)
557                   {
558                     v->function |= FSTRING;
559                     v->string = xmalloc (src[i]->width);
560                   }
561
562                 if (function->alpha_type == ALPHA)
563                   destvar = dict_clone_var (agr->dict, v->src, dest[i]);
564                 else
565                   {
566                     assert (v->src->type == NUMERIC
567                             || function->alpha_type == NUMERIC);
568                     destvar = dict_create_var (agr->dict, dest[i], 0);
569                     if (destvar != NULL) 
570                       {
571                         if ((func_index == N || func_index == NMISS)
572                             && dict_get_weight (default_dict) != NULL)
573                           destvar->print = destvar->write = f8_2; 
574                         else
575                           destvar->print = destvar->write = function->format;
576                       }
577                   }
578               } else {
579                 v->src = NULL;
580                 destvar = dict_create_var (agr->dict, dest[i], 0);
581                 if (func_index == N_NO_VARS
582                     && dict_get_weight (default_dict) != NULL)
583                   destvar->print = destvar->write = f8_2; 
584                 else
585                   destvar->print = destvar->write = function->format;
586               }
587           
588             if (!destvar)
589               {
590                 msg (SE, _("Variable name %s is not unique within the "
591                            "aggregate file dictionary, which contains "
592                            "the aggregate variables and the break "
593                            "variables."),
594                      dest[i]);
595                 goto error;
596               }
597
598             free (dest[i]);
599             if (dest_label[i])
600               {
601                 destvar->label = dest_label[i];
602                 dest_label[i] = NULL;
603               }
604
605             v->dest = destvar;
606           }
607           
608           v->include_missing = include_missing;
609
610           if (v->src != NULL)
611             {
612               int j;
613
614               if (v->src->type == NUMERIC)
615                 for (j = 0; j < function->n_args; j++)
616                   v->arg[j].f = arg[j].f;
617               else
618                 for (j = 0; j < function->n_args; j++)
619                   v->arg[j].c = xstrdup (arg[j].c);
620             }
621         }
622       
623       if (src != NULL && src[0]->type == ALPHA)
624         for (i = 0; i < function->n_args; i++)
625           {
626             free (arg[i].c);
627             arg[i].c = NULL;
628           }
629
630       free (src);
631       free (dest);
632       free (dest_label);
633
634       if (!lex_match ('/'))
635         {
636           if (token == '.')
637             return 1;
638
639           lex_error ("expecting end of command");
640           return 0;
641         }
642       continue;
643       
644     error:
645       for (i = 0; i < n_dest; i++)
646         {
647           free (dest[i]);
648           free (dest_label[i]);
649         }
650       free (dest);
651       free (dest_label);
652       free (arg[0].c);
653       free (arg[1].c);
654       if (src && n_src && src[0]->type == ALPHA)
655         for (i = 0; i < function->n_args; i++)
656           {
657             free (arg[i].c);
658             arg[i].c = NULL;
659           }
660       free (src);
661         
662       return 0;
663     }
664 }
665
666 /* Destroys AGR. */
667 static void
668 agr_destroy (struct agr_proc *agr)
669 {
670   struct agr_var *iter, *next;
671
672   any_writer_close (agr->writer);
673   if (agr->sort != NULL)
674     sort_destroy_criteria (agr->sort);
675   free (agr->break_vars);
676   case_destroy (&agr->break_case);
677   for (iter = agr->agr_vars; iter; iter = next)
678     {
679       next = iter->next;
680
681       if (iter->function & FSTRING)
682         {
683           size_t n_args;
684           size_t i;
685
686           n_args = agr_func_tab[iter->function & FUNC].n_args;
687           for (i = 0; i < n_args; i++)
688             free (iter->arg[i].c);
689           free (iter->string);
690         }
691       else if (iter->function == SD)
692         moments1_destroy (iter->moments);
693       free (iter);
694     }
695   if (agr->dict != NULL)
696     dict_destroy (agr->dict);
697
698   case_destroy (&agr->agr_case);
699 }
700 \f
701 /* Execution. */
702
703 static void accumulate_aggregate_info (struct agr_proc *,
704                                        const struct ccase *);
705 static void dump_aggregate_info (struct agr_proc *, struct ccase *);
706
707 /* Processes a single case INPUT for aggregation.  If output is
708    warranted, writes it to OUTPUT and returns nonzero.
709    Otherwise, returns zero and OUTPUT is unmodified. */
710 static int
711 aggregate_single_case (struct agr_proc *agr,
712                        const struct ccase *input, struct ccase *output)
713 {
714   bool finished_group = false;
715   
716   if (agr->case_cnt++ == 0)
717     initialize_aggregate_info (agr, input);
718   else if (case_compare (&agr->break_case, input,
719                          agr->break_vars, agr->break_var_cnt))
720     {
721       dump_aggregate_info (agr, output);
722       finished_group = true;
723
724       initialize_aggregate_info (agr, input);
725     }
726
727   accumulate_aggregate_info (agr, input);
728   return finished_group;
729 }
730
731 /* Accumulates aggregation data from the case INPUT. */
732 static void 
733 accumulate_aggregate_info (struct agr_proc *agr,
734                            const struct ccase *input)
735 {
736   struct agr_var *iter;
737   double weight;
738   int bad_warn = 1;
739
740   weight = dict_get_case_weight (default_dict, input, &bad_warn);
741
742   for (iter = agr->agr_vars; iter; iter = iter->next)
743     if (iter->src)
744       {
745         const union value *v = case_data (input, iter->src->fv);
746
747         if ((!iter->include_missing
748              && mv_is_value_missing (&iter->src->miss, v))
749             || (iter->include_missing && iter->src->type == NUMERIC
750                 && v->f == SYSMIS))
751           {
752             switch (iter->function)
753               {
754               case NMISS:
755               case NMISS | FSTRING:
756                 iter->dbl[0] += weight;
757                 break;
758               case NUMISS:
759               case NUMISS | FSTRING:
760                 iter->int1++;
761                 break;
762               }
763             iter->missing = 1;
764             continue;
765           }
766         
767         /* This is horrible.  There are too many possibilities. */
768         switch (iter->function)
769           {
770           case SUM:
771             iter->dbl[0] += v->f * weight;
772             iter->int1 = 1;
773             break;
774           case MEAN:
775             iter->dbl[0] += v->f * weight;
776             iter->dbl[1] += weight;
777             break;
778           case SD:
779             moments1_add (iter->moments, v->f, weight);
780             break;
781           case MAX:
782             iter->dbl[0] = max (iter->dbl[0], v->f);
783             iter->int1 = 1;
784             break;
785           case MAX | FSTRING:
786             if (memcmp (iter->string, v->s, iter->src->width) < 0)
787               memcpy (iter->string, v->s, iter->src->width);
788             iter->int1 = 1;
789             break;
790           case MIN:
791             iter->dbl[0] = min (iter->dbl[0], v->f);
792             iter->int1 = 1;
793             break;
794           case MIN | FSTRING:
795             if (memcmp (iter->string, v->s, iter->src->width) > 0)
796               memcpy (iter->string, v->s, iter->src->width);
797             iter->int1 = 1;
798             break;
799           case FGT:
800           case PGT:
801             if (v->f > iter->arg[0].f)
802               iter->dbl[0] += weight;
803             iter->dbl[1] += weight;
804             break;
805           case FGT | FSTRING:
806           case PGT | FSTRING:
807             if (memcmp (iter->arg[0].c, v->s, iter->src->width) < 0)
808               iter->dbl[0] += weight;
809             iter->dbl[1] += weight;
810             break;
811           case FLT:
812           case PLT:
813             if (v->f < iter->arg[0].f)
814               iter->dbl[0] += weight;
815             iter->dbl[1] += weight;
816             break;
817           case FLT | FSTRING:
818           case PLT | FSTRING:
819             if (memcmp (iter->arg[0].c, v->s, iter->src->width) > 0)
820               iter->dbl[0] += weight;
821             iter->dbl[1] += weight;
822             break;
823           case FIN:
824           case PIN:
825             if (iter->arg[0].f <= v->f && v->f <= iter->arg[1].f)
826               iter->dbl[0] += weight;
827             iter->dbl[1] += weight;
828             break;
829           case FIN | FSTRING:
830           case PIN | FSTRING:
831             if (memcmp (iter->arg[0].c, v->s, iter->src->width) <= 0
832                 && memcmp (iter->arg[1].c, v->s, iter->src->width) >= 0)
833               iter->dbl[0] += weight;
834             iter->dbl[1] += weight;
835             break;
836           case FOUT:
837           case POUT:
838             if (iter->arg[0].f > v->f || v->f > iter->arg[1].f)
839               iter->dbl[0] += weight;
840             iter->dbl[1] += weight;
841             break;
842           case FOUT | FSTRING:
843           case POUT | FSTRING:
844             if (memcmp (iter->arg[0].c, v->s, iter->src->width) > 0
845                 || memcmp (iter->arg[1].c, v->s, iter->src->width) < 0)
846               iter->dbl[0] += weight;
847             iter->dbl[1] += weight;
848             break;
849           case N:
850           case N | FSTRING:
851             iter->dbl[0] += weight;
852             break;
853           case NU:
854           case NU | FSTRING:
855             iter->int1++;
856             break;
857           case FIRST:
858             if (iter->int1 == 0)
859               {
860                 iter->dbl[0] = v->f;
861                 iter->int1 = 1;
862               }
863             break;
864           case FIRST | FSTRING:
865             if (iter->int1 == 0)
866               {
867                 memcpy (iter->string, v->s, iter->src->width);
868                 iter->int1 = 1;
869               }
870             break;
871           case LAST:
872             iter->dbl[0] = v->f;
873             iter->int1 = 1;
874             break;
875           case LAST | FSTRING:
876             memcpy (iter->string, v->s, iter->src->width);
877             iter->int1 = 1;
878             break;
879           case NMISS:
880           case NMISS | FSTRING:
881           case NUMISS:
882           case NUMISS | FSTRING:
883             /* Our value is not missing or it would have been
884                caught earlier.  Nothing to do. */
885             break;
886           default:
887             assert (0);
888           }
889     } else {
890       switch (iter->function)
891         {
892         case N_NO_VARS:
893           iter->dbl[0] += weight;
894           break;
895         case NU_NO_VARS:
896           iter->int1++;
897           break;
898         default:
899           assert (0);
900         }
901     }
902 }
903
904 /* We've come to a record that differs from the previous in one or
905    more of the break variables.  Make an output record from the
906    accumulated statistics in the OUTPUT case. */
907 static void 
908 dump_aggregate_info (struct agr_proc *agr, struct ccase *output)
909 {
910   {
911     int value_idx = 0;
912     int i;
913
914     for (i = 0; i < agr->break_var_cnt; i++) 
915       {
916         struct variable *v = agr->break_vars[i];
917         memcpy (case_data_rw (output, value_idx),
918                 case_data (&agr->break_case, v->fv),
919                 sizeof (union value) * v->nv);
920         value_idx += v->nv; 
921       }
922   }
923   
924   {
925     struct agr_var *i;
926   
927     for (i = agr->agr_vars; i; i = i->next)
928       {
929         union value *v = case_data_rw (output, i->dest->fv);
930
931         if (agr->missing == COLUMNWISE && i->missing != 0
932             && (i->function & FUNC) != N && (i->function & FUNC) != NU
933             && (i->function & FUNC) != NMISS && (i->function & FUNC) != NUMISS)
934           {
935             if (i->dest->type == ALPHA)
936               memset (v->s, ' ', i->dest->width);
937             else
938               v->f = SYSMIS;
939             continue;
940           }
941         
942         switch (i->function)
943           {
944           case SUM:
945             v->f = i->int1 ? i->dbl[0] : SYSMIS;
946             break;
947           case MEAN:
948             v->f = i->dbl[1] != 0.0 ? i->dbl[0] / i->dbl[1] : SYSMIS;
949             break;
950           case SD:
951             {
952               double variance;
953
954               /* FIXME: we should use two passes. */
955               moments1_calculate (i->moments, NULL, NULL, &variance,
956                                  NULL, NULL);
957               if (variance != SYSMIS)
958                 v->f = sqrt (variance);
959               else
960                 v->f = SYSMIS; 
961             }
962             break;
963           case MAX:
964           case MIN:
965             v->f = i->int1 ? i->dbl[0] : SYSMIS;
966             break;
967           case MAX | FSTRING:
968           case MIN | FSTRING:
969             if (i->int1)
970               memcpy (v->s, i->string, i->dest->width);
971             else
972               memset (v->s, ' ', i->dest->width);
973             break;
974           case FGT:
975           case FGT | FSTRING:
976           case FLT:
977           case FLT | FSTRING:
978           case FIN:
979           case FIN | FSTRING:
980           case FOUT:
981           case FOUT | FSTRING:
982             v->f = i->dbl[1] ? i->dbl[0] / i->dbl[1] : SYSMIS;
983             break;
984           case PGT:
985           case PGT | FSTRING:
986           case PLT:
987           case PLT | FSTRING:
988           case PIN:
989           case PIN | FSTRING:
990           case POUT:
991           case POUT | FSTRING:
992             v->f = i->dbl[1] ? i->dbl[0] / i->dbl[1] * 100.0 : SYSMIS;
993             break;
994           case N:
995           case N | FSTRING:
996             v->f = i->dbl[0];
997             break;
998           case NU:
999           case NU | FSTRING:
1000             v->f = i->int1;
1001             break;
1002           case FIRST:
1003           case LAST:
1004             v->f = i->int1 ? i->dbl[0] : SYSMIS;
1005             break;
1006           case FIRST | FSTRING:
1007           case LAST | FSTRING:
1008             if (i->int1)
1009               memcpy (v->s, i->string, i->dest->width);
1010             else
1011               memset (v->s, ' ', i->dest->width);
1012             break;
1013           case N_NO_VARS:
1014             v->f = i->dbl[0];
1015             break;
1016           case NU_NO_VARS:
1017             v->f = i->int1;
1018             break;
1019           case NMISS:
1020           case NMISS | FSTRING:
1021             v->f = i->dbl[0];
1022             break;
1023           case NUMISS:
1024           case NUMISS | FSTRING:
1025             v->f = i->int1;
1026             break;
1027           default:
1028             assert (0);
1029           }
1030       }
1031   }
1032 }
1033
1034 /* Resets the state for all the aggregate functions. */
1035 static void
1036 initialize_aggregate_info (struct agr_proc *agr, const struct ccase *input)
1037 {
1038   struct agr_var *iter;
1039
1040   case_destroy (&agr->break_case);
1041   case_clone (&agr->break_case, input);
1042
1043   for (iter = agr->agr_vars; iter; iter = iter->next)
1044     {
1045       iter->missing = 0;
1046       iter->dbl[0] = iter->dbl[1] = iter->dbl[2] = 0.0;
1047       iter->int1 = iter->int2 = 0;
1048       switch (iter->function)
1049         {
1050         case MIN:
1051           iter->dbl[0] = DBL_MAX;
1052           break;
1053         case MIN | FSTRING:
1054           memset (iter->string, 255, iter->src->width);
1055           break;
1056         case MAX:
1057           iter->dbl[0] = -DBL_MAX;
1058           break;
1059         case MAX | FSTRING:
1060           memset (iter->string, 0, iter->src->width);
1061           break;
1062         case SD:
1063           if (iter->moments == NULL)
1064             iter->moments = moments1_create (MOMENT_VARIANCE);
1065           else
1066             moments1_clear (iter->moments);
1067           break;
1068         default:
1069           break;
1070         }
1071     }
1072 }
1073 \f
1074 /* Aggregate each case as it comes through.  Cases which aren't needed
1075    are dropped.
1076    Returns true if successful, false if an I/O error occurred. */
1077 static bool
1078 agr_to_active_file (struct ccase *c, void *agr_)
1079 {
1080   struct agr_proc *agr = agr_;
1081
1082   if (aggregate_single_case (agr, c, &agr->agr_case)) 
1083     return agr->sink->class->write (agr->sink, &agr->agr_case);
1084
1085   return true;
1086 }
1087
1088 /* Aggregate the current case and output it if we passed a
1089    breakpoint. */
1090 static bool
1091 presorted_agr_to_sysfile (struct ccase *c, void *agr_) 
1092 {
1093   struct agr_proc *agr = agr_;
1094
1095   if (aggregate_single_case (agr, c, &agr->agr_case)) 
1096     return any_writer_write (agr->writer, &agr->agr_case);
1097   
1098   return true;
1099 }