Fix some warnings.
[pspp] / src / language / stats / aggregate.c
1 /* PSPP - computes sample statistics.
2    Copyright (C) 1997-9, 2000 Free Software Foundation, Inc.
3    Written by Ben Pfaff <blp@gnu.org>.
4
5    This program is free software; you can redistribute it and/or
6    modify it under the terms of the GNU General Public License as
7    published by the Free Software Foundation; either version 2 of the
8    License, or (at your option) any later version.
9
10    This program is distributed in the hope that it will be useful, but
11    WITHOUT ANY WARRANTY; without even the implied warranty of
12    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13    General Public License for more details.
14
15    You should have received a copy of the GNU General Public License
16    along with this program; if not, write to the Free Software
17    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
18    02110-1301, USA. */
19
20 #include <config.h>
21 #include <libpspp/message.h>
22 #include <stdlib.h>
23 #include <libpspp/alloc.h>
24 #include <data/any-writer.h>
25 #include <data/case.h>
26 #include <data/casefile.h>
27 #include <language/command.h>
28 #include <data/dictionary.h>
29 #include <libpspp/message.h>
30 #include <data/file-handle-def.h>
31 #include <language/data-io/file-handle.h>
32 #include <language/lexer/lexer.h>
33 #include <language/stats/sort-criteria.h>
34 #include <libpspp/misc.h>
35 #include <math/moments.h>
36 #include <libpspp/pool.h>
37 #include <data/settings.h>
38 #include <data/sys-file-writer.h>
39 #include <math/sort.h>
40 #include <libpspp/str.h>
41 #include <data/variable.h>
42 #include <procedure.h>
43
44 #include "gettext.h"
45 #define _(msgid) gettext (msgid)
46
47 /* Specifies how to make an aggregate variable. */
48 struct agr_var
49   {
50     struct agr_var *next;               /* Next in list. */
51
52     /* Collected during parsing. */
53     struct variable *src;       /* Source variable. */
54     struct variable *dest;      /* Target variable. */
55     int function;               /* Function. */
56     int include_missing;        /* 1=Include user-missing values. */
57     union value arg[2];         /* Arguments. */
58
59     /* Accumulated during AGGREGATE execution. */
60     double dbl[3];
61     int int1, int2;
62     char *string;
63     int missing;
64     struct moments1 *moments;
65   };
66
67 /* Aggregation functions. */
68 enum
69   {
70     NONE, SUM, MEAN, SD, MAX, MIN, PGT, PLT, PIN, POUT, FGT, FLT, FIN,
71     FOUT, N, NU, NMISS, NUMISS, FIRST, LAST,
72     N_AGR_FUNCS, N_NO_VARS, NU_NO_VARS,
73     FUNC = 0x1f, /* Function mask. */
74     FSTRING = 1<<5, /* String function bit. */
75   };
76
77 /* Attributes of an aggregation function. */
78 struct agr_func
79   {
80     const char *name;           /* Aggregation function name. */
81     size_t n_args;              /* Number of arguments. */
82     int alpha_type;             /* When given ALPHA arguments, output type. */
83     struct fmt_spec format;     /* Format spec if alpha_type != ALPHA. */
84   };
85
86 /* Attributes of aggregation functions. */
87 static const struct agr_func agr_func_tab[] = 
88   {
89     {"<NONE>",  0, -1,      {0, 0, 0}},
90     {"SUM",     0, -1,      {FMT_F, 8, 2}},
91     {"MEAN",    0, -1,      {FMT_F, 8, 2}},
92     {"SD",      0, -1,      {FMT_F, 8, 2}},
93     {"MAX",     0, ALPHA,   {-1, -1, -1}}, 
94     {"MIN",     0, ALPHA,   {-1, -1, -1}}, 
95     {"PGT",     1, NUMERIC, {FMT_F, 5, 1}},      
96     {"PLT",     1, NUMERIC, {FMT_F, 5, 1}},       
97     {"PIN",     2, NUMERIC, {FMT_F, 5, 1}},       
98     {"POUT",    2, NUMERIC, {FMT_F, 5, 1}},       
99     {"FGT",     1, NUMERIC, {FMT_F, 5, 3}},       
100     {"FLT",     1, NUMERIC, {FMT_F, 5, 3}},       
101     {"FIN",     2, NUMERIC, {FMT_F, 5, 3}},       
102     {"FOUT",    2, NUMERIC, {FMT_F, 5, 3}},       
103     {"N",       0, NUMERIC, {FMT_F, 7, 0}},       
104     {"NU",      0, NUMERIC, {FMT_F, 7, 0}},       
105     {"NMISS",   0, NUMERIC, {FMT_F, 7, 0}},       
106     {"NUMISS",  0, NUMERIC, {FMT_F, 7, 0}},       
107     {"FIRST",   0, ALPHA,   {-1, -1, -1}}, 
108     {"LAST",    0, ALPHA,   {-1, -1, -1}},
109     {NULL,      0, -1,      {-1, -1, -1}},
110     {"N",       0, NUMERIC, {FMT_F, 7, 0}},
111     {"NU",      0, NUMERIC, {FMT_F, 7, 0}},
112   };
113
114 /* Missing value types. */
115 enum missing_treatment
116   {
117     ITEMWISE,           /* Missing values item by item. */
118     COLUMNWISE          /* Missing values column by column. */
119   };
120
121 /* An entire AGGREGATE procedure. */
122 struct agr_proc 
123   {
124     /* We have either an output file or a sink. */
125     struct any_writer *writer;          /* Output file, or null if none. */
126     struct case_sink *sink;             /* Sink, or null if none. */
127
128     /* Break variables. */
129     struct sort_criteria *sort;         /* Sort criteria. */
130     struct variable **break_vars;       /* Break variables. */
131     size_t break_var_cnt;               /* Number of break variables. */
132     struct ccase break_case;            /* Last values of break variables. */
133
134     enum missing_treatment missing;     /* How to treat missing values. */
135     struct agr_var *agr_vars;           /* First aggregate variable. */
136     struct dictionary *dict;            /* Aggregate dictionary. */
137     int case_cnt;                       /* Counts aggregated cases. */
138     struct ccase agr_case;              /* Aggregate case for output. */
139   };
140
141 static void initialize_aggregate_info (struct agr_proc *,
142                                        const struct ccase *);
143
144 /* Prototypes. */
145 static int parse_aggregate_functions (struct agr_proc *);
146 static void agr_destroy (struct agr_proc *);
147 static int aggregate_single_case (struct agr_proc *agr,
148                                   const struct ccase *input,
149                                   struct ccase *output);
150 static void dump_aggregate_info (struct agr_proc *agr, struct ccase *output);
151
152 /* Aggregating to the active file. */
153 static bool agr_to_active_file (struct ccase *, void *aux);
154
155 /* Aggregating to a system file. */
156 static bool presorted_agr_to_sysfile (struct ccase *, void *aux);
157 \f
158 /* Parsing. */
159
160 /* Parses and executes the AGGREGATE procedure. */
161 int
162 cmd_aggregate (void)
163 {
164   struct agr_proc agr;
165   struct file_handle *out_file = NULL;
166
167   bool copy_documents = false;
168   bool presorted = false;
169   bool saw_direction;
170
171   memset(&agr, 0 , sizeof (agr));
172   agr.missing = ITEMWISE;
173   case_nullify (&agr.break_case);
174   
175   agr.dict = dict_create ();
176   dict_set_label (agr.dict, dict_get_label (default_dict));
177   dict_set_documents (agr.dict, dict_get_documents (default_dict));
178
179   /* OUTFILE subcommand must be first. */
180   if (!lex_force_match_id ("OUTFILE"))
181     goto error;
182   lex_match ('=');
183   if (!lex_match ('*'))
184     {
185       out_file = fh_parse (FH_REF_FILE | FH_REF_SCRATCH);
186       if (out_file == NULL)
187         goto error;
188     }
189   
190   /* Read most of the subcommands. */
191   for (;;)
192     {
193       lex_match ('/');
194       
195       if (lex_match_id ("MISSING"))
196         {
197           lex_match ('=');
198           if (!lex_match_id ("COLUMNWISE"))
199             {
200               lex_error (_("while expecting COLUMNWISE"));
201               goto error;
202             }
203           agr.missing = COLUMNWISE;
204         }
205       else if (lex_match_id ("DOCUMENT"))
206         copy_documents = true;
207       else if (lex_match_id ("PRESORTED"))
208         presorted = true;
209       else if (lex_match_id ("BREAK"))
210         {
211           int i;
212
213           lex_match ('=');
214           agr.sort = sort_parse_criteria (default_dict,
215                                           &agr.break_vars, &agr.break_var_cnt,
216                                           &saw_direction, NULL);
217           if (agr.sort == NULL)
218             goto error;
219           
220           for (i = 0; i < agr.break_var_cnt; i++)
221             dict_clone_var_assert (agr.dict, agr.break_vars[i],
222                                    agr.break_vars[i]->name);
223
224           /* BREAK must follow the options. */
225           break;
226         }
227       else
228         {
229           lex_error (_("expecting BREAK"));
230           goto error;
231         }
232     }
233   if (presorted && saw_direction)
234     msg (SW, _("When PRESORTED is specified, specifying sorting directions "
235                "with (A) or (D) has no effect.  Output data will be sorted "
236                "the same way as the input data."));
237       
238   /* Read in the aggregate functions. */
239   lex_match ('/');
240   if (!parse_aggregate_functions (&agr))
241     goto error;
242
243   /* Delete documents. */
244   if (!copy_documents)
245     dict_set_documents (agr.dict, NULL);
246
247   /* Cancel SPLIT FILE. */
248   dict_set_split_vars (agr.dict, NULL, 0);
249   
250   /* Initialize. */
251   agr.case_cnt = 0;
252   case_create (&agr.agr_case, dict_get_next_value_idx (agr.dict));
253
254   /* Output to active file or external file? */
255   if (out_file == NULL) 
256     {
257       /* The active file will be replaced by the aggregated data,
258          so TEMPORARY is moot. */
259       cancel_temporary ();
260
261       if (agr.sort != NULL && !presorted) 
262         {
263           if (!sort_active_file_in_place (agr.sort))
264             goto error;
265         }
266
267       agr.sink = create_case_sink (&storage_sink_class, agr.dict, NULL);
268       if (agr.sink->class->open != NULL)
269         agr.sink->class->open (agr.sink);
270       vfm_sink = create_case_sink (&null_sink_class, default_dict, NULL);
271       if (!procedure (agr_to_active_file, &agr))
272         goto error;
273       if (agr.case_cnt > 0) 
274         {
275           dump_aggregate_info (&agr, &agr.agr_case);
276           if (!agr.sink->class->write (agr.sink, &agr.agr_case))
277             goto error;
278         }
279       dict_destroy (default_dict);
280       default_dict = agr.dict;
281       agr.dict = NULL;
282       vfm_source = agr.sink->class->make_source (agr.sink);
283       free_case_sink (agr.sink);
284     }
285   else
286     {
287       agr.writer = any_writer_open (out_file, agr.dict);
288       if (agr.writer == NULL)
289         goto error;
290       
291       if (agr.sort != NULL && !presorted) 
292         {
293           /* Sorting is needed. */
294           struct casefile *dst;
295           struct casereader *reader;
296           struct ccase c;
297           bool ok = true;
298           
299           dst = sort_active_file_to_casefile (agr.sort);
300           if (dst == NULL)
301             goto error;
302           reader = casefile_get_destructive_reader (dst);
303           while (ok && casereader_read_xfer (reader, &c)) 
304             {
305               if (aggregate_single_case (&agr, &c, &agr.agr_case)) 
306                 ok = any_writer_write (agr.writer, &agr.agr_case);
307               case_destroy (&c);
308             }
309           casereader_destroy (reader);
310           if (ok)
311             ok = !casefile_error (dst);
312           casefile_destroy (dst);
313           if (!ok)
314             goto error;
315         }
316       else 
317         {
318           /* Active file is already sorted. */
319           if (!procedure (presorted_agr_to_sysfile, &agr))
320             goto error;
321         }
322       
323       if (agr.case_cnt > 0) 
324         {
325           dump_aggregate_info (&agr, &agr.agr_case);
326           any_writer_write (agr.writer, &agr.agr_case);
327         }
328       if (any_writer_error (agr.writer))
329         goto error;
330     }
331   
332   agr_destroy (&agr);
333   return CMD_SUCCESS;
334
335 error:
336   agr_destroy (&agr);
337   return CMD_CASCADING_FAILURE;
338 }
339
340 /* Parse all the aggregate functions. */
341 static int
342 parse_aggregate_functions (struct agr_proc *agr)
343 {
344   struct agr_var *tail; /* Tail of linked list starting at agr->vars. */
345
346   /* Parse everything. */
347   tail = NULL;
348   for (;;)
349     {
350       char **dest;
351       char **dest_label;
352       size_t n_dest;
353
354       int include_missing;
355       const struct agr_func *function;
356       int func_index;
357
358       union value arg[2];
359
360       struct variable **src;
361       size_t n_src;
362
363       size_t i;
364
365       dest = NULL;
366       dest_label = NULL;
367       n_dest = 0;
368       src = NULL;
369       function = NULL;
370       n_src = 0;
371       arg[0].c = NULL;
372       arg[1].c = NULL;
373
374       /* Parse the list of target variables. */
375       while (!lex_match ('='))
376         {
377           size_t n_dest_prev = n_dest;
378           
379           if (!parse_DATA_LIST_vars (&dest, &n_dest,
380                                      PV_APPEND | PV_SINGLE | PV_NO_SCRATCH))
381             goto error;
382
383           /* Assign empty labels. */
384           {
385             int j;
386
387             dest_label = xnrealloc (dest_label, n_dest, sizeof *dest_label);
388             for (j = n_dest_prev; j < n_dest; j++)
389               dest_label[j] = NULL;
390           }
391           
392           if (token == T_STRING)
393             {
394               ds_truncate (&tokstr, 255);
395               dest_label[n_dest - 1] = xstrdup (ds_c_str (&tokstr));
396               lex_get ();
397             }
398         }
399
400       /* Get the name of the aggregation function. */
401       if (token != T_ID)
402         {
403           lex_error (_("expecting aggregation function"));
404           goto error;
405         }
406
407       include_missing = 0;
408       if (tokid[strlen (tokid) - 1] == '.')
409         {
410           include_missing = 1;
411           tokid[strlen (tokid) - 1] = 0;
412         }
413       
414       for (function = agr_func_tab; function->name; function++)
415         if (!strcasecmp (function->name, tokid))
416           break;
417       if (NULL == function->name)
418         {
419           msg (SE, _("Unknown aggregation function %s."), tokid);
420           goto error;
421         }
422       func_index = function - agr_func_tab;
423       lex_get ();
424
425       /* Check for leading lparen. */
426       if (!lex_match ('('))
427         {
428           if (func_index == N)
429             func_index = N_NO_VARS;
430           else if (func_index == NU)
431             func_index = NU_NO_VARS;
432           else
433             {
434               lex_error (_("expecting `('"));
435               goto error;
436             }
437         }
438       else
439         {
440           /* Parse list of source variables. */
441           {
442             int pv_opts = PV_NO_SCRATCH;
443
444             if (func_index == SUM || func_index == MEAN || func_index == SD)
445               pv_opts |= PV_NUMERIC;
446             else if (function->n_args)
447               pv_opts |= PV_SAME_TYPE;
448
449             if (!parse_variables (default_dict, &src, &n_src, pv_opts))
450               goto error;
451           }
452
453           /* Parse function arguments, for those functions that
454              require arguments. */
455           if (function->n_args != 0)
456             for (i = 0; i < function->n_args; i++)
457               {
458                 int type;
459             
460                 lex_match (',');
461                 if (token == T_STRING)
462                   {
463                     arg[i].c = xstrdup (ds_c_str (&tokstr));
464                     type = ALPHA;
465                   }
466                 else if (lex_is_number ())
467                   {
468                     arg[i].f = tokval;
469                     type = NUMERIC;
470                   } else {
471                     msg (SE, _("Missing argument %d to %s."), i + 1,
472                          function->name);
473                     goto error;
474                   }
475             
476                 lex_get ();
477
478                 if (type != src[0]->type)
479                   {
480                     msg (SE, _("Arguments to %s must be of same type as "
481                                "source variables."),
482                          function->name);
483                     goto error;
484                   }
485               }
486
487           /* Trailing rparen. */
488           if (!lex_match(')'))
489             {
490               lex_error (_("expecting `)'"));
491               goto error;
492             }
493           
494           /* Now check that the number of source variables match
495              the number of target variables.  If we check earlier
496              than this, the user can get very misleading error
497              message, i.e. `AGGREGATE x=SUM(y t).' will get this
498              error message when a proper message would be more
499              like `unknown variable t'. */
500           if (n_src != n_dest)
501             {
502               msg (SE, _("Number of source variables (%u) does not match "
503                          "number of target variables (%u)."),
504                    (unsigned) n_src, (unsigned) n_dest);
505               goto error;
506             }
507
508           if ((func_index == PIN || func_index == POUT
509               || func_index == FIN || func_index == FOUT) 
510               && ((src[0]->type == NUMERIC && arg[0].f > arg[1].f)
511                   || (src[0]->type == ALPHA
512                       && str_compare_rpad (arg[0].c, arg[1].c) > 0)))
513             {
514               union value t = arg[0];
515               arg[0] = arg[1];
516               arg[1] = t;
517                   
518               msg (SW, _("The value arguments passed to the %s function "
519                          "are out-of-order.  They will be treated as if "
520                          "they had been specified in the correct order."),
521                    function->name);
522             }
523         }
524         
525       /* Finally add these to the linked list of aggregation
526          variables. */
527       for (i = 0; i < n_dest; i++)
528         {
529           struct agr_var *v = xmalloc (sizeof *v);
530
531           /* Add variable to chain. */
532           if (agr->agr_vars != NULL)
533             tail->next = v;
534           else
535             agr->agr_vars = v;
536           tail = v;
537           tail->next = NULL;
538           v->moments = NULL;
539           
540           /* Create the target variable in the aggregate
541              dictionary. */
542           {
543             struct variable *destvar;
544             
545             v->function = func_index;
546
547             if (src)
548               {
549                 v->src = src[i];
550                 
551                 if (src[i]->type == ALPHA)
552                   {
553                     v->function |= FSTRING;
554                     v->string = xmalloc (src[i]->width);
555                   }
556
557                 if (function->alpha_type == ALPHA)
558                   destvar = dict_clone_var (agr->dict, v->src, dest[i]);
559                 else
560                   {
561                     assert (v->src->type == NUMERIC
562                             || function->alpha_type == NUMERIC);
563                     destvar = dict_create_var (agr->dict, dest[i], 0);
564                     if (destvar != NULL) 
565                       {
566                         if ((func_index == N || func_index == NMISS)
567                             && dict_get_weight (default_dict) != NULL)
568                           destvar->print = destvar->write = f8_2; 
569                         else
570                           destvar->print = destvar->write = function->format;
571                       }
572                   }
573               } else {
574                 v->src = NULL;
575                 destvar = dict_create_var (agr->dict, dest[i], 0);
576                 if (func_index == N_NO_VARS
577                     && dict_get_weight (default_dict) != NULL)
578                   destvar->print = destvar->write = f8_2; 
579                 else
580                   destvar->print = destvar->write = function->format;
581               }
582           
583             if (!destvar)
584               {
585                 msg (SE, _("Variable name %s is not unique within the "
586                            "aggregate file dictionary, which contains "
587                            "the aggregate variables and the break "
588                            "variables."),
589                      dest[i]);
590                 goto error;
591               }
592
593             free (dest[i]);
594             destvar->init = 0;
595             if (dest_label[i])
596               {
597                 destvar->label = dest_label[i];
598                 dest_label[i] = NULL;
599               }
600
601             v->dest = destvar;
602           }
603           
604           v->include_missing = include_missing;
605
606           if (v->src != NULL)
607             {
608               int j;
609
610               if (v->src->type == NUMERIC)
611                 for (j = 0; j < function->n_args; j++)
612                   v->arg[j].f = arg[j].f;
613               else
614                 for (j = 0; j < function->n_args; j++)
615                   v->arg[j].c = xstrdup (arg[j].c);
616             }
617         }
618       
619       if (src != NULL && src[0]->type == ALPHA)
620         for (i = 0; i < function->n_args; i++)
621           {
622             free (arg[i].c);
623             arg[i].c = NULL;
624           }
625
626       free (src);
627       free (dest);
628       free (dest_label);
629
630       if (!lex_match ('/'))
631         {
632           if (token == '.')
633             return 1;
634
635           lex_error ("expecting end of command");
636           return 0;
637         }
638       continue;
639       
640     error:
641       for (i = 0; i < n_dest; i++)
642         {
643           free (dest[i]);
644           free (dest_label[i]);
645         }
646       free (dest);
647       free (dest_label);
648       free (arg[0].c);
649       free (arg[1].c);
650       if (src && n_src && src[0]->type == ALPHA)
651         for (i = 0; i < function->n_args; i++)
652           {
653             free (arg[i].c);
654             arg[i].c = NULL;
655           }
656       free (src);
657         
658       return 0;
659     }
660 }
661
662 /* Destroys AGR. */
663 static void
664 agr_destroy (struct agr_proc *agr)
665 {
666   struct agr_var *iter, *next;
667
668   any_writer_close (agr->writer);
669   if (agr->sort != NULL)
670     sort_destroy_criteria (agr->sort);
671   free (agr->break_vars);
672   case_destroy (&agr->break_case);
673   for (iter = agr->agr_vars; iter; iter = next)
674     {
675       next = iter->next;
676
677       if (iter->function & FSTRING)
678         {
679           size_t n_args;
680           size_t i;
681
682           n_args = agr_func_tab[iter->function & FUNC].n_args;
683           for (i = 0; i < n_args; i++)
684             free (iter->arg[i].c);
685           free (iter->string);
686         }
687       else if (iter->function == SD)
688         moments1_destroy (iter->moments);
689       free (iter);
690     }
691   if (agr->dict != NULL)
692     dict_destroy (agr->dict);
693
694   case_destroy (&agr->agr_case);
695 }
696 \f
697 /* Execution. */
698
699 static void accumulate_aggregate_info (struct agr_proc *,
700                                        const struct ccase *);
701 static void dump_aggregate_info (struct agr_proc *, struct ccase *);
702
703 /* Processes a single case INPUT for aggregation.  If output is
704    warranted, writes it to OUTPUT and returns nonzero.
705    Otherwise, returns zero and OUTPUT is unmodified. */
706 static int
707 aggregate_single_case (struct agr_proc *agr,
708                        const struct ccase *input, struct ccase *output)
709 {
710   bool finished_group = false;
711   
712   if (agr->case_cnt++ == 0)
713     initialize_aggregate_info (agr, input);
714   else if (case_compare (&agr->break_case, input,
715                          agr->break_vars, agr->break_var_cnt))
716     {
717       dump_aggregate_info (agr, output);
718       finished_group = true;
719
720       initialize_aggregate_info (agr, input);
721     }
722
723   accumulate_aggregate_info (agr, input);
724   return finished_group;
725 }
726
727 /* Accumulates aggregation data from the case INPUT. */
728 static void 
729 accumulate_aggregate_info (struct agr_proc *agr,
730                            const struct ccase *input)
731 {
732   struct agr_var *iter;
733   double weight;
734   int bad_warn = 1;
735
736   weight = dict_get_case_weight (default_dict, input, &bad_warn);
737
738   for (iter = agr->agr_vars; iter; iter = iter->next)
739     if (iter->src)
740       {
741         const union value *v = case_data (input, iter->src->fv);
742
743         if ((!iter->include_missing
744              && mv_is_value_missing (&iter->src->miss, v))
745             || (iter->include_missing && iter->src->type == NUMERIC
746                 && v->f == SYSMIS))
747           {
748             switch (iter->function)
749               {
750               case NMISS:
751               case NMISS | FSTRING:
752                 iter->dbl[0] += weight;
753                 break;
754               case NUMISS:
755               case NUMISS | FSTRING:
756                 iter->int1++;
757                 break;
758               }
759             iter->missing = 1;
760             continue;
761           }
762         
763         /* This is horrible.  There are too many possibilities. */
764         switch (iter->function)
765           {
766           case SUM:
767             iter->dbl[0] += v->f * weight;
768             iter->int1 = 1;
769             break;
770           case MEAN:
771             iter->dbl[0] += v->f * weight;
772             iter->dbl[1] += weight;
773             break;
774           case SD:
775             moments1_add (iter->moments, v->f, weight);
776             break;
777           case MAX:
778             iter->dbl[0] = max (iter->dbl[0], v->f);
779             iter->int1 = 1;
780             break;
781           case MAX | FSTRING:
782             if (memcmp (iter->string, v->s, iter->src->width) < 0)
783               memcpy (iter->string, v->s, iter->src->width);
784             iter->int1 = 1;
785             break;
786           case MIN:
787             iter->dbl[0] = min (iter->dbl[0], v->f);
788             iter->int1 = 1;
789             break;
790           case MIN | FSTRING:
791             if (memcmp (iter->string, v->s, iter->src->width) > 0)
792               memcpy (iter->string, v->s, iter->src->width);
793             iter->int1 = 1;
794             break;
795           case FGT:
796           case PGT:
797             if (v->f > iter->arg[0].f)
798               iter->dbl[0] += weight;
799             iter->dbl[1] += weight;
800             break;
801           case FGT | FSTRING:
802           case PGT | FSTRING:
803             if (memcmp (iter->arg[0].c, v->s, iter->src->width) < 0)
804               iter->dbl[0] += weight;
805             iter->dbl[1] += weight;
806             break;
807           case FLT:
808           case PLT:
809             if (v->f < iter->arg[0].f)
810               iter->dbl[0] += weight;
811             iter->dbl[1] += weight;
812             break;
813           case FLT | FSTRING:
814           case PLT | FSTRING:
815             if (memcmp (iter->arg[0].c, v->s, iter->src->width) > 0)
816               iter->dbl[0] += weight;
817             iter->dbl[1] += weight;
818             break;
819           case FIN:
820           case PIN:
821             if (iter->arg[0].f <= v->f && v->f <= iter->arg[1].f)
822               iter->dbl[0] += weight;
823             iter->dbl[1] += weight;
824             break;
825           case FIN | FSTRING:
826           case PIN | FSTRING:
827             if (memcmp (iter->arg[0].c, v->s, iter->src->width) <= 0
828                 && memcmp (iter->arg[1].c, v->s, iter->src->width) >= 0)
829               iter->dbl[0] += weight;
830             iter->dbl[1] += weight;
831             break;
832           case FOUT:
833           case POUT:
834             if (iter->arg[0].f > v->f || v->f > iter->arg[1].f)
835               iter->dbl[0] += weight;
836             iter->dbl[1] += weight;
837             break;
838           case FOUT | FSTRING:
839           case POUT | FSTRING:
840             if (memcmp (iter->arg[0].c, v->s, iter->src->width) > 0
841                 || memcmp (iter->arg[1].c, v->s, iter->src->width) < 0)
842               iter->dbl[0] += weight;
843             iter->dbl[1] += weight;
844             break;
845           case N:
846           case N | FSTRING:
847             iter->dbl[0] += weight;
848             break;
849           case NU:
850           case NU | FSTRING:
851             iter->int1++;
852             break;
853           case FIRST:
854             if (iter->int1 == 0)
855               {
856                 iter->dbl[0] = v->f;
857                 iter->int1 = 1;
858               }
859             break;
860           case FIRST | FSTRING:
861             if (iter->int1 == 0)
862               {
863                 memcpy (iter->string, v->s, iter->src->width);
864                 iter->int1 = 1;
865               }
866             break;
867           case LAST:
868             iter->dbl[0] = v->f;
869             iter->int1 = 1;
870             break;
871           case LAST | FSTRING:
872             memcpy (iter->string, v->s, iter->src->width);
873             iter->int1 = 1;
874             break;
875           case NMISS:
876           case NMISS | FSTRING:
877           case NUMISS:
878           case NUMISS | FSTRING:
879             /* Our value is not missing or it would have been
880                caught earlier.  Nothing to do. */
881             break;
882           default:
883             assert (0);
884           }
885     } else {
886       switch (iter->function)
887         {
888         case N_NO_VARS:
889           iter->dbl[0] += weight;
890           break;
891         case NU_NO_VARS:
892           iter->int1++;
893           break;
894         default:
895           assert (0);
896         }
897     }
898 }
899
900 /* We've come to a record that differs from the previous in one or
901    more of the break variables.  Make an output record from the
902    accumulated statistics in the OUTPUT case. */
903 static void 
904 dump_aggregate_info (struct agr_proc *agr, struct ccase *output)
905 {
906   {
907     int value_idx = 0;
908     int i;
909
910     for (i = 0; i < agr->break_var_cnt; i++) 
911       {
912         struct variable *v = agr->break_vars[i];
913         memcpy (case_data_rw (output, value_idx),
914                 case_data (&agr->break_case, v->fv),
915                 sizeof (union value) * v->nv);
916         value_idx += v->nv; 
917       }
918   }
919   
920   {
921     struct agr_var *i;
922   
923     for (i = agr->agr_vars; i; i = i->next)
924       {
925         union value *v = case_data_rw (output, i->dest->fv);
926
927         if (agr->missing == COLUMNWISE && i->missing != 0
928             && (i->function & FUNC) != N && (i->function & FUNC) != NU
929             && (i->function & FUNC) != NMISS && (i->function & FUNC) != NUMISS)
930           {
931             if (i->dest->type == ALPHA)
932               memset (v->s, ' ', i->dest->width);
933             else
934               v->f = SYSMIS;
935             continue;
936           }
937         
938         switch (i->function)
939           {
940           case SUM:
941             v->f = i->int1 ? i->dbl[0] : SYSMIS;
942             break;
943           case MEAN:
944             v->f = i->dbl[1] != 0.0 ? i->dbl[0] / i->dbl[1] : SYSMIS;
945             break;
946           case SD:
947             {
948               double variance;
949
950               /* FIXME: we should use two passes. */
951               moments1_calculate (i->moments, NULL, NULL, &variance,
952                                  NULL, NULL);
953               if (variance != SYSMIS)
954                 v->f = sqrt (variance);
955               else
956                 v->f = SYSMIS; 
957             }
958             break;
959           case MAX:
960           case MIN:
961             v->f = i->int1 ? i->dbl[0] : SYSMIS;
962             break;
963           case MAX | FSTRING:
964           case MIN | FSTRING:
965             if (i->int1)
966               memcpy (v->s, i->string, i->dest->width);
967             else
968               memset (v->s, ' ', i->dest->width);
969             break;
970           case FGT:
971           case FGT | FSTRING:
972           case FLT:
973           case FLT | FSTRING:
974           case FIN:
975           case FIN | FSTRING:
976           case FOUT:
977           case FOUT | FSTRING:
978             v->f = i->dbl[1] ? i->dbl[0] / i->dbl[1] : SYSMIS;
979             break;
980           case PGT:
981           case PGT | FSTRING:
982           case PLT:
983           case PLT | FSTRING:
984           case PIN:
985           case PIN | FSTRING:
986           case POUT:
987           case POUT | FSTRING:
988             v->f = i->dbl[1] ? i->dbl[0] / i->dbl[1] * 100.0 : SYSMIS;
989             break;
990           case N:
991           case N | FSTRING:
992             v->f = i->dbl[0];
993             break;
994           case NU:
995           case NU | FSTRING:
996             v->f = i->int1;
997             break;
998           case FIRST:
999           case LAST:
1000             v->f = i->int1 ? i->dbl[0] : SYSMIS;
1001             break;
1002           case FIRST | FSTRING:
1003           case LAST | FSTRING:
1004             if (i->int1)
1005               memcpy (v->s, i->string, i->dest->width);
1006             else
1007               memset (v->s, ' ', i->dest->width);
1008             break;
1009           case N_NO_VARS:
1010             v->f = i->dbl[0];
1011             break;
1012           case NU_NO_VARS:
1013             v->f = i->int1;
1014             break;
1015           case NMISS:
1016           case NMISS | FSTRING:
1017             v->f = i->dbl[0];
1018             break;
1019           case NUMISS:
1020           case NUMISS | FSTRING:
1021             v->f = i->int1;
1022             break;
1023           default:
1024             assert (0);
1025           }
1026       }
1027   }
1028 }
1029
1030 /* Resets the state for all the aggregate functions. */
1031 static void
1032 initialize_aggregate_info (struct agr_proc *agr, const struct ccase *input)
1033 {
1034   struct agr_var *iter;
1035
1036   case_destroy (&agr->break_case);
1037   case_clone (&agr->break_case, input);
1038
1039   for (iter = agr->agr_vars; iter; iter = iter->next)
1040     {
1041       iter->missing = 0;
1042       iter->dbl[0] = iter->dbl[1] = iter->dbl[2] = 0.0;
1043       iter->int1 = iter->int2 = 0;
1044       switch (iter->function)
1045         {
1046         case MIN:
1047           iter->dbl[0] = DBL_MAX;
1048           break;
1049         case MIN | FSTRING:
1050           memset (iter->string, 255, iter->src->width);
1051           break;
1052         case MAX:
1053           iter->dbl[0] = -DBL_MAX;
1054           break;
1055         case MAX | FSTRING:
1056           memset (iter->string, 0, iter->src->width);
1057           break;
1058         case SD:
1059           if (iter->moments == NULL)
1060             iter->moments = moments1_create (MOMENT_VARIANCE);
1061           else
1062             moments1_clear (iter->moments);
1063           break;
1064         default:
1065           break;
1066         }
1067     }
1068 }
1069 \f
1070 /* Aggregate each case as it comes through.  Cases which aren't needed
1071    are dropped.
1072    Returns true if successful, false if an I/O error occurred. */
1073 static bool
1074 agr_to_active_file (struct ccase *c, void *agr_)
1075 {
1076   struct agr_proc *agr = agr_;
1077
1078   if (aggregate_single_case (agr, c, &agr->agr_case)) 
1079     return agr->sink->class->write (agr->sink, &agr->agr_case);
1080
1081   return true;
1082 }
1083
1084 /* Aggregate the current case and output it if we passed a
1085    breakpoint. */
1086 static bool
1087 presorted_agr_to_sysfile (struct ccase *c, void *agr_) 
1088 {
1089   struct agr_proc *agr = agr_;
1090
1091   if (aggregate_single_case (agr, c, &agr->agr_case)) 
1092     return any_writer_write (agr->writer, &agr->agr_case);
1093   
1094   return true;
1095 }