a0887e3b2c0877123fbcfa7c3072dbb8d9dee99f
[pspp-builds.git] / src / language / stats / aggregate.c
1 /* PSPP - computes sample statistics.
2    Copyright (C) 1997-9, 2000 Free Software Foundation, Inc.
3    Written by Ben Pfaff <blp@gnu.org>.
4
5    This program is free software; you can redistribute it and/or
6    modify it under the terms of the GNU General Public License as
7    published by the Free Software Foundation; either version 2 of the
8    License, or (at your option) any later version.
9
10    This program is distributed in the hope that it will be useful, but
11    WITHOUT ANY WARRANTY; without even the implied warranty of
12    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13    General Public License for more details.
14
15    You should have received a copy of the GNU General Public License
16    along with this program; if not, write to the Free Software
17    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
18    02110-1301, USA. */
19
20 #include <config.h>
21
22 #include <stdlib.h>
23
24 #include <data/any-writer.h>
25 #include <data/case-sink.h>
26 #include <data/case.h>
27 #include <data/casefile.h>
28 #include <data/dictionary.h>
29 #include <data/file-handle-def.h>
30 #include <data/settings.h>
31 #include <data/storage-stream.h>
32 #include <data/sys-file-writer.h>
33 #include <data/variable.h>
34 #include <language/command.h>
35 #include <language/data-io/file-handle.h>
36 #include <language/lexer/lexer.h>
37 #include <language/stats/sort-criteria.h>
38 #include <libpspp/alloc.h>
39 #include <libpspp/message.h>
40 #include <libpspp/message.h>
41 #include <libpspp/misc.h>
42 #include <libpspp/pool.h>
43 #include <libpspp/str.h>
44 #include <math/moments.h>
45 #include <math/sort.h>
46 #include <procedure.h>
47
48 #include "gettext.h"
49 #define _(msgid) gettext (msgid)
50
51 /* Specifies how to make an aggregate variable. */
52 struct agr_var
53   {
54     struct agr_var *next;               /* Next in list. */
55
56     /* Collected during parsing. */
57     struct variable *src;       /* Source variable. */
58     struct variable *dest;      /* Target variable. */
59     int function;               /* Function. */
60     int include_missing;        /* 1=Include user-missing values. */
61     union value arg[2];         /* Arguments. */
62
63     /* Accumulated during AGGREGATE execution. */
64     double dbl[3];
65     int int1, int2;
66     char *string;
67     int missing;
68     struct moments1 *moments;
69   };
70
71 /* Aggregation functions. */
72 enum
73   {
74     NONE, SUM, MEAN, SD, MAX, MIN, PGT, PLT, PIN, POUT, FGT, FLT, FIN,
75     FOUT, N, NU, NMISS, NUMISS, FIRST, LAST,
76     N_AGR_FUNCS, N_NO_VARS, NU_NO_VARS,
77     FUNC = 0x1f, /* Function mask. */
78     FSTRING = 1<<5, /* String function bit. */
79   };
80
81 /* Attributes of an aggregation function. */
82 struct agr_func
83   {
84     const char *name;           /* Aggregation function name. */
85     size_t n_args;              /* Number of arguments. */
86     int alpha_type;             /* When given ALPHA arguments, output type. */
87     struct fmt_spec format;     /* Format spec if alpha_type != ALPHA. */
88   };
89
90 /* Attributes of aggregation functions. */
91 static const struct agr_func agr_func_tab[] = 
92   {
93     {"<NONE>",  0, -1,      {0, 0, 0}},
94     {"SUM",     0, -1,      {FMT_F, 8, 2}},
95     {"MEAN",    0, -1,      {FMT_F, 8, 2}},
96     {"SD",      0, -1,      {FMT_F, 8, 2}},
97     {"MAX",     0, ALPHA,   {-1, -1, -1}}, 
98     {"MIN",     0, ALPHA,   {-1, -1, -1}}, 
99     {"PGT",     1, NUMERIC, {FMT_F, 5, 1}},      
100     {"PLT",     1, NUMERIC, {FMT_F, 5, 1}},       
101     {"PIN",     2, NUMERIC, {FMT_F, 5, 1}},       
102     {"POUT",    2, NUMERIC, {FMT_F, 5, 1}},       
103     {"FGT",     1, NUMERIC, {FMT_F, 5, 3}},       
104     {"FLT",     1, NUMERIC, {FMT_F, 5, 3}},       
105     {"FIN",     2, NUMERIC, {FMT_F, 5, 3}},       
106     {"FOUT",    2, NUMERIC, {FMT_F, 5, 3}},       
107     {"N",       0, NUMERIC, {FMT_F, 7, 0}},       
108     {"NU",      0, NUMERIC, {FMT_F, 7, 0}},       
109     {"NMISS",   0, NUMERIC, {FMT_F, 7, 0}},       
110     {"NUMISS",  0, NUMERIC, {FMT_F, 7, 0}},       
111     {"FIRST",   0, ALPHA,   {-1, -1, -1}}, 
112     {"LAST",    0, ALPHA,   {-1, -1, -1}},
113     {NULL,      0, -1,      {-1, -1, -1}},
114     {"N",       0, NUMERIC, {FMT_F, 7, 0}},
115     {"NU",      0, NUMERIC, {FMT_F, 7, 0}},
116   };
117
118 /* Missing value types. */
119 enum missing_treatment
120   {
121     ITEMWISE,           /* Missing values item by item. */
122     COLUMNWISE          /* Missing values column by column. */
123   };
124
125 /* An entire AGGREGATE procedure. */
126 struct agr_proc 
127   {
128     /* We have either an output file or a sink. */
129     struct any_writer *writer;          /* Output file, or null if none. */
130     struct case_sink *sink;             /* Sink, or null if none. */
131
132     /* Break variables. */
133     struct sort_criteria *sort;         /* Sort criteria. */
134     struct variable **break_vars;       /* Break variables. */
135     size_t break_var_cnt;               /* Number of break variables. */
136     struct ccase break_case;            /* Last values of break variables. */
137
138     enum missing_treatment missing;     /* How to treat missing values. */
139     struct agr_var *agr_vars;           /* First aggregate variable. */
140     struct dictionary *dict;            /* Aggregate dictionary. */
141     int case_cnt;                       /* Counts aggregated cases. */
142     struct ccase agr_case;              /* Aggregate case for output. */
143   };
144
145 static void initialize_aggregate_info (struct agr_proc *,
146                                        const struct ccase *);
147
148 /* Prototypes. */
149 static int parse_aggregate_functions (struct agr_proc *);
150 static void agr_destroy (struct agr_proc *);
151 static int aggregate_single_case (struct agr_proc *agr,
152                                   const struct ccase *input,
153                                   struct ccase *output);
154 static void dump_aggregate_info (struct agr_proc *agr, struct ccase *output);
155
156 /* Aggregating to the active file. */
157 static bool agr_to_active_file (struct ccase *, void *aux);
158
159 /* Aggregating to a system file. */
160 static bool presorted_agr_to_sysfile (struct ccase *, void *aux);
161 \f
162 /* Parsing. */
163
164 /* Parses and executes the AGGREGATE procedure. */
165 int
166 cmd_aggregate (void)
167 {
168   struct agr_proc agr;
169   struct file_handle *out_file = NULL;
170
171   bool copy_documents = false;
172   bool presorted = false;
173   bool saw_direction;
174
175   memset(&agr, 0 , sizeof (agr));
176   agr.missing = ITEMWISE;
177   case_nullify (&agr.break_case);
178   
179   agr.dict = dict_create ();
180   dict_set_label (agr.dict, dict_get_label (default_dict));
181   dict_set_documents (agr.dict, dict_get_documents (default_dict));
182
183   /* OUTFILE subcommand must be first. */
184   if (!lex_force_match_id ("OUTFILE"))
185     goto error;
186   lex_match ('=');
187   if (!lex_match ('*'))
188     {
189       out_file = fh_parse (FH_REF_FILE | FH_REF_SCRATCH);
190       if (out_file == NULL)
191         goto error;
192     }
193   
194   /* Read most of the subcommands. */
195   for (;;)
196     {
197       lex_match ('/');
198       
199       if (lex_match_id ("MISSING"))
200         {
201           lex_match ('=');
202           if (!lex_match_id ("COLUMNWISE"))
203             {
204               lex_error (_("while expecting COLUMNWISE"));
205               goto error;
206             }
207           agr.missing = COLUMNWISE;
208         }
209       else if (lex_match_id ("DOCUMENT"))
210         copy_documents = true;
211       else if (lex_match_id ("PRESORTED"))
212         presorted = true;
213       else if (lex_match_id ("BREAK"))
214         {
215           int i;
216
217           lex_match ('=');
218           agr.sort = sort_parse_criteria (default_dict,
219                                           &agr.break_vars, &agr.break_var_cnt,
220                                           &saw_direction, NULL);
221           if (agr.sort == NULL)
222             goto error;
223           
224           for (i = 0; i < agr.break_var_cnt; i++)
225             dict_clone_var_assert (agr.dict, agr.break_vars[i],
226                                    agr.break_vars[i]->name);
227
228           /* BREAK must follow the options. */
229           break;
230         }
231       else
232         {
233           lex_error (_("expecting BREAK"));
234           goto error;
235         }
236     }
237   if (presorted && saw_direction)
238     msg (SW, _("When PRESORTED is specified, specifying sorting directions "
239                "with (A) or (D) has no effect.  Output data will be sorted "
240                "the same way as the input data."));
241       
242   /* Read in the aggregate functions. */
243   lex_match ('/');
244   if (!parse_aggregate_functions (&agr))
245     goto error;
246
247   /* Delete documents. */
248   if (!copy_documents)
249     dict_set_documents (agr.dict, NULL);
250
251   /* Cancel SPLIT FILE. */
252   dict_set_split_vars (agr.dict, NULL, 0);
253   
254   /* Initialize. */
255   agr.case_cnt = 0;
256   case_create (&agr.agr_case, dict_get_next_value_idx (agr.dict));
257
258   /* Output to active file or external file? */
259   if (out_file == NULL) 
260     {
261       /* The active file will be replaced by the aggregated data,
262          so TEMPORARY is moot. */
263       cancel_temporary ();
264
265       if (agr.sort != NULL && !presorted) 
266         {
267           if (!sort_active_file_in_place (agr.sort))
268             goto error;
269         }
270
271       agr.sink = create_case_sink (&storage_sink_class, agr.dict, NULL);
272       if (agr.sink->class->open != NULL)
273         agr.sink->class->open (agr.sink);
274       vfm_sink = create_case_sink (&null_sink_class, default_dict, NULL);
275       if (!procedure (agr_to_active_file, &agr))
276         goto error;
277       if (agr.case_cnt > 0) 
278         {
279           dump_aggregate_info (&agr, &agr.agr_case);
280           if (!agr.sink->class->write (agr.sink, &agr.agr_case))
281             goto error;
282         }
283       dict_destroy (default_dict);
284       default_dict = agr.dict;
285       agr.dict = NULL;
286       vfm_source = agr.sink->class->make_source (agr.sink);
287       free_case_sink (agr.sink);
288     }
289   else
290     {
291       agr.writer = any_writer_open (out_file, agr.dict);
292       if (agr.writer == NULL)
293         goto error;
294       
295       if (agr.sort != NULL && !presorted) 
296         {
297           /* Sorting is needed. */
298           struct casefile *dst;
299           struct casereader *reader;
300           struct ccase c;
301           bool ok = true;
302           
303           dst = sort_active_file_to_casefile (agr.sort);
304           if (dst == NULL)
305             goto error;
306           reader = casefile_get_destructive_reader (dst);
307           while (ok && casereader_read_xfer (reader, &c)) 
308             {
309               if (aggregate_single_case (&agr, &c, &agr.agr_case)) 
310                 ok = any_writer_write (agr.writer, &agr.agr_case);
311               case_destroy (&c);
312             }
313           casereader_destroy (reader);
314           if (ok)
315             ok = !casefile_error (dst);
316           casefile_destroy (dst);
317           if (!ok)
318             goto error;
319         }
320       else 
321         {
322           /* Active file is already sorted. */
323           if (!procedure (presorted_agr_to_sysfile, &agr))
324             goto error;
325         }
326       
327       if (agr.case_cnt > 0) 
328         {
329           dump_aggregate_info (&agr, &agr.agr_case);
330           any_writer_write (agr.writer, &agr.agr_case);
331         }
332       if (any_writer_error (agr.writer))
333         goto error;
334     }
335   
336   agr_destroy (&agr);
337   return CMD_SUCCESS;
338
339 error:
340   agr_destroy (&agr);
341   return CMD_CASCADING_FAILURE;
342 }
343
344 /* Parse all the aggregate functions. */
345 static int
346 parse_aggregate_functions (struct agr_proc *agr)
347 {
348   struct agr_var *tail; /* Tail of linked list starting at agr->vars. */
349
350   /* Parse everything. */
351   tail = NULL;
352   for (;;)
353     {
354       char **dest;
355       char **dest_label;
356       size_t n_dest;
357
358       int include_missing;
359       const struct agr_func *function;
360       int func_index;
361
362       union value arg[2];
363
364       struct variable **src;
365       size_t n_src;
366
367       size_t i;
368
369       dest = NULL;
370       dest_label = NULL;
371       n_dest = 0;
372       src = NULL;
373       function = NULL;
374       n_src = 0;
375       arg[0].c = NULL;
376       arg[1].c = NULL;
377
378       /* Parse the list of target variables. */
379       while (!lex_match ('='))
380         {
381           size_t n_dest_prev = n_dest;
382           
383           if (!parse_DATA_LIST_vars (&dest, &n_dest,
384                                      PV_APPEND | PV_SINGLE | PV_NO_SCRATCH))
385             goto error;
386
387           /* Assign empty labels. */
388           {
389             int j;
390
391             dest_label = xnrealloc (dest_label, n_dest, sizeof *dest_label);
392             for (j = n_dest_prev; j < n_dest; j++)
393               dest_label[j] = NULL;
394           }
395           
396           if (token == T_STRING)
397             {
398               ds_truncate (&tokstr, 255);
399               dest_label[n_dest - 1] = xstrdup (ds_c_str (&tokstr));
400               lex_get ();
401             }
402         }
403
404       /* Get the name of the aggregation function. */
405       if (token != T_ID)
406         {
407           lex_error (_("expecting aggregation function"));
408           goto error;
409         }
410
411       include_missing = 0;
412       if (tokid[strlen (tokid) - 1] == '.')
413         {
414           include_missing = 1;
415           tokid[strlen (tokid) - 1] = 0;
416         }
417       
418       for (function = agr_func_tab; function->name; function++)
419         if (!strcasecmp (function->name, tokid))
420           break;
421       if (NULL == function->name)
422         {
423           msg (SE, _("Unknown aggregation function %s."), tokid);
424           goto error;
425         }
426       func_index = function - agr_func_tab;
427       lex_get ();
428
429       /* Check for leading lparen. */
430       if (!lex_match ('('))
431         {
432           if (func_index == N)
433             func_index = N_NO_VARS;
434           else if (func_index == NU)
435             func_index = NU_NO_VARS;
436           else
437             {
438               lex_error (_("expecting `('"));
439               goto error;
440             }
441         }
442       else
443         {
444           /* Parse list of source variables. */
445           {
446             int pv_opts = PV_NO_SCRATCH;
447
448             if (func_index == SUM || func_index == MEAN || func_index == SD)
449               pv_opts |= PV_NUMERIC;
450             else if (function->n_args)
451               pv_opts |= PV_SAME_TYPE;
452
453             if (!parse_variables (default_dict, &src, &n_src, pv_opts))
454               goto error;
455           }
456
457           /* Parse function arguments, for those functions that
458              require arguments. */
459           if (function->n_args != 0)
460             for (i = 0; i < function->n_args; i++)
461               {
462                 int type;
463             
464                 lex_match (',');
465                 if (token == T_STRING)
466                   {
467                     arg[i].c = xstrdup (ds_c_str (&tokstr));
468                     type = ALPHA;
469                   }
470                 else if (lex_is_number ())
471                   {
472                     arg[i].f = tokval;
473                     type = NUMERIC;
474                   } else {
475                     msg (SE, _("Missing argument %d to %s."), i + 1,
476                          function->name);
477                     goto error;
478                   }
479             
480                 lex_get ();
481
482                 if (type != src[0]->type)
483                   {
484                     msg (SE, _("Arguments to %s must be of same type as "
485                                "source variables."),
486                          function->name);
487                     goto error;
488                   }
489               }
490
491           /* Trailing rparen. */
492           if (!lex_match(')'))
493             {
494               lex_error (_("expecting `)'"));
495               goto error;
496             }
497           
498           /* Now check that the number of source variables match
499              the number of target variables.  If we check earlier
500              than this, the user can get very misleading error
501              message, i.e. `AGGREGATE x=SUM(y t).' will get this
502              error message when a proper message would be more
503              like `unknown variable t'. */
504           if (n_src != n_dest)
505             {
506               msg (SE, _("Number of source variables (%u) does not match "
507                          "number of target variables (%u)."),
508                    (unsigned) n_src, (unsigned) n_dest);
509               goto error;
510             }
511
512           if ((func_index == PIN || func_index == POUT
513               || func_index == FIN || func_index == FOUT) 
514               && ((src[0]->type == NUMERIC && arg[0].f > arg[1].f)
515                   || (src[0]->type == ALPHA
516                       && str_compare_rpad (arg[0].c, arg[1].c) > 0)))
517             {
518               union value t = arg[0];
519               arg[0] = arg[1];
520               arg[1] = t;
521                   
522               msg (SW, _("The value arguments passed to the %s function "
523                          "are out-of-order.  They will be treated as if "
524                          "they had been specified in the correct order."),
525                    function->name);
526             }
527         }
528         
529       /* Finally add these to the linked list of aggregation
530          variables. */
531       for (i = 0; i < n_dest; i++)
532         {
533           struct agr_var *v = xmalloc (sizeof *v);
534
535           /* Add variable to chain. */
536           if (agr->agr_vars != NULL)
537             tail->next = v;
538           else
539             agr->agr_vars = v;
540           tail = v;
541           tail->next = NULL;
542           v->moments = NULL;
543           
544           /* Create the target variable in the aggregate
545              dictionary. */
546           {
547             struct variable *destvar;
548             
549             v->function = func_index;
550
551             if (src)
552               {
553                 v->src = src[i];
554                 
555                 if (src[i]->type == ALPHA)
556                   {
557                     v->function |= FSTRING;
558                     v->string = xmalloc (src[i]->width);
559                   }
560
561                 if (function->alpha_type == ALPHA)
562                   destvar = dict_clone_var (agr->dict, v->src, dest[i]);
563                 else
564                   {
565                     assert (v->src->type == NUMERIC
566                             || function->alpha_type == NUMERIC);
567                     destvar = dict_create_var (agr->dict, dest[i], 0);
568                     if (destvar != NULL) 
569                       {
570                         if ((func_index == N || func_index == NMISS)
571                             && dict_get_weight (default_dict) != NULL)
572                           destvar->print = destvar->write = f8_2; 
573                         else
574                           destvar->print = destvar->write = function->format;
575                       }
576                   }
577               } else {
578                 v->src = NULL;
579                 destvar = dict_create_var (agr->dict, dest[i], 0);
580                 if (func_index == N_NO_VARS
581                     && dict_get_weight (default_dict) != NULL)
582                   destvar->print = destvar->write = f8_2; 
583                 else
584                   destvar->print = destvar->write = function->format;
585               }
586           
587             if (!destvar)
588               {
589                 msg (SE, _("Variable name %s is not unique within the "
590                            "aggregate file dictionary, which contains "
591                            "the aggregate variables and the break "
592                            "variables."),
593                      dest[i]);
594                 goto error;
595               }
596
597             free (dest[i]);
598             if (dest_label[i])
599               {
600                 destvar->label = dest_label[i];
601                 dest_label[i] = NULL;
602               }
603
604             v->dest = destvar;
605           }
606           
607           v->include_missing = include_missing;
608
609           if (v->src != NULL)
610             {
611               int j;
612
613               if (v->src->type == NUMERIC)
614                 for (j = 0; j < function->n_args; j++)
615                   v->arg[j].f = arg[j].f;
616               else
617                 for (j = 0; j < function->n_args; j++)
618                   v->arg[j].c = xstrdup (arg[j].c);
619             }
620         }
621       
622       if (src != NULL && src[0]->type == ALPHA)
623         for (i = 0; i < function->n_args; i++)
624           {
625             free (arg[i].c);
626             arg[i].c = NULL;
627           }
628
629       free (src);
630       free (dest);
631       free (dest_label);
632
633       if (!lex_match ('/'))
634         {
635           if (token == '.')
636             return 1;
637
638           lex_error ("expecting end of command");
639           return 0;
640         }
641       continue;
642       
643     error:
644       for (i = 0; i < n_dest; i++)
645         {
646           free (dest[i]);
647           free (dest_label[i]);
648         }
649       free (dest);
650       free (dest_label);
651       free (arg[0].c);
652       free (arg[1].c);
653       if (src && n_src && src[0]->type == ALPHA)
654         for (i = 0; i < function->n_args; i++)
655           {
656             free (arg[i].c);
657             arg[i].c = NULL;
658           }
659       free (src);
660         
661       return 0;
662     }
663 }
664
665 /* Destroys AGR. */
666 static void
667 agr_destroy (struct agr_proc *agr)
668 {
669   struct agr_var *iter, *next;
670
671   any_writer_close (agr->writer);
672   if (agr->sort != NULL)
673     sort_destroy_criteria (agr->sort);
674   free (agr->break_vars);
675   case_destroy (&agr->break_case);
676   for (iter = agr->agr_vars; iter; iter = next)
677     {
678       next = iter->next;
679
680       if (iter->function & FSTRING)
681         {
682           size_t n_args;
683           size_t i;
684
685           n_args = agr_func_tab[iter->function & FUNC].n_args;
686           for (i = 0; i < n_args; i++)
687             free (iter->arg[i].c);
688           free (iter->string);
689         }
690       else if (iter->function == SD)
691         moments1_destroy (iter->moments);
692       free (iter);
693     }
694   if (agr->dict != NULL)
695     dict_destroy (agr->dict);
696
697   case_destroy (&agr->agr_case);
698 }
699 \f
700 /* Execution. */
701
702 static void accumulate_aggregate_info (struct agr_proc *,
703                                        const struct ccase *);
704 static void dump_aggregate_info (struct agr_proc *, struct ccase *);
705
706 /* Processes a single case INPUT for aggregation.  If output is
707    warranted, writes it to OUTPUT and returns nonzero.
708    Otherwise, returns zero and OUTPUT is unmodified. */
709 static int
710 aggregate_single_case (struct agr_proc *agr,
711                        const struct ccase *input, struct ccase *output)
712 {
713   bool finished_group = false;
714   
715   if (agr->case_cnt++ == 0)
716     initialize_aggregate_info (agr, input);
717   else if (case_compare (&agr->break_case, input,
718                          agr->break_vars, agr->break_var_cnt))
719     {
720       dump_aggregate_info (agr, output);
721       finished_group = true;
722
723       initialize_aggregate_info (agr, input);
724     }
725
726   accumulate_aggregate_info (agr, input);
727   return finished_group;
728 }
729
730 /* Accumulates aggregation data from the case INPUT. */
731 static void 
732 accumulate_aggregate_info (struct agr_proc *agr,
733                            const struct ccase *input)
734 {
735   struct agr_var *iter;
736   double weight;
737   int bad_warn = 1;
738
739   weight = dict_get_case_weight (default_dict, input, &bad_warn);
740
741   for (iter = agr->agr_vars; iter; iter = iter->next)
742     if (iter->src)
743       {
744         const union value *v = case_data (input, iter->src->fv);
745
746         if ((!iter->include_missing
747              && mv_is_value_missing (&iter->src->miss, v))
748             || (iter->include_missing && iter->src->type == NUMERIC
749                 && v->f == SYSMIS))
750           {
751             switch (iter->function)
752               {
753               case NMISS:
754               case NMISS | FSTRING:
755                 iter->dbl[0] += weight;
756                 break;
757               case NUMISS:
758               case NUMISS | FSTRING:
759                 iter->int1++;
760                 break;
761               }
762             iter->missing = 1;
763             continue;
764           }
765         
766         /* This is horrible.  There are too many possibilities. */
767         switch (iter->function)
768           {
769           case SUM:
770             iter->dbl[0] += v->f * weight;
771             iter->int1 = 1;
772             break;
773           case MEAN:
774             iter->dbl[0] += v->f * weight;
775             iter->dbl[1] += weight;
776             break;
777           case SD:
778             moments1_add (iter->moments, v->f, weight);
779             break;
780           case MAX:
781             iter->dbl[0] = max (iter->dbl[0], v->f);
782             iter->int1 = 1;
783             break;
784           case MAX | FSTRING:
785             if (memcmp (iter->string, v->s, iter->src->width) < 0)
786               memcpy (iter->string, v->s, iter->src->width);
787             iter->int1 = 1;
788             break;
789           case MIN:
790             iter->dbl[0] = min (iter->dbl[0], v->f);
791             iter->int1 = 1;
792             break;
793           case MIN | FSTRING:
794             if (memcmp (iter->string, v->s, iter->src->width) > 0)
795               memcpy (iter->string, v->s, iter->src->width);
796             iter->int1 = 1;
797             break;
798           case FGT:
799           case PGT:
800             if (v->f > iter->arg[0].f)
801               iter->dbl[0] += weight;
802             iter->dbl[1] += weight;
803             break;
804           case FGT | FSTRING:
805           case PGT | FSTRING:
806             if (memcmp (iter->arg[0].c, v->s, iter->src->width) < 0)
807               iter->dbl[0] += weight;
808             iter->dbl[1] += weight;
809             break;
810           case FLT:
811           case PLT:
812             if (v->f < iter->arg[0].f)
813               iter->dbl[0] += weight;
814             iter->dbl[1] += weight;
815             break;
816           case FLT | FSTRING:
817           case PLT | FSTRING:
818             if (memcmp (iter->arg[0].c, v->s, iter->src->width) > 0)
819               iter->dbl[0] += weight;
820             iter->dbl[1] += weight;
821             break;
822           case FIN:
823           case PIN:
824             if (iter->arg[0].f <= v->f && v->f <= iter->arg[1].f)
825               iter->dbl[0] += weight;
826             iter->dbl[1] += weight;
827             break;
828           case FIN | FSTRING:
829           case PIN | FSTRING:
830             if (memcmp (iter->arg[0].c, v->s, iter->src->width) <= 0
831                 && memcmp (iter->arg[1].c, v->s, iter->src->width) >= 0)
832               iter->dbl[0] += weight;
833             iter->dbl[1] += weight;
834             break;
835           case FOUT:
836           case POUT:
837             if (iter->arg[0].f > v->f || v->f > iter->arg[1].f)
838               iter->dbl[0] += weight;
839             iter->dbl[1] += weight;
840             break;
841           case FOUT | FSTRING:
842           case POUT | FSTRING:
843             if (memcmp (iter->arg[0].c, v->s, iter->src->width) > 0
844                 || memcmp (iter->arg[1].c, v->s, iter->src->width) < 0)
845               iter->dbl[0] += weight;
846             iter->dbl[1] += weight;
847             break;
848           case N:
849           case N | FSTRING:
850             iter->dbl[0] += weight;
851             break;
852           case NU:
853           case NU | FSTRING:
854             iter->int1++;
855             break;
856           case FIRST:
857             if (iter->int1 == 0)
858               {
859                 iter->dbl[0] = v->f;
860                 iter->int1 = 1;
861               }
862             break;
863           case FIRST | FSTRING:
864             if (iter->int1 == 0)
865               {
866                 memcpy (iter->string, v->s, iter->src->width);
867                 iter->int1 = 1;
868               }
869             break;
870           case LAST:
871             iter->dbl[0] = v->f;
872             iter->int1 = 1;
873             break;
874           case LAST | FSTRING:
875             memcpy (iter->string, v->s, iter->src->width);
876             iter->int1 = 1;
877             break;
878           case NMISS:
879           case NMISS | FSTRING:
880           case NUMISS:
881           case NUMISS | FSTRING:
882             /* Our value is not missing or it would have been
883                caught earlier.  Nothing to do. */
884             break;
885           default:
886             assert (0);
887           }
888     } else {
889       switch (iter->function)
890         {
891         case N_NO_VARS:
892           iter->dbl[0] += weight;
893           break;
894         case NU_NO_VARS:
895           iter->int1++;
896           break;
897         default:
898           assert (0);
899         }
900     }
901 }
902
903 /* We've come to a record that differs from the previous in one or
904    more of the break variables.  Make an output record from the
905    accumulated statistics in the OUTPUT case. */
906 static void 
907 dump_aggregate_info (struct agr_proc *agr, struct ccase *output)
908 {
909   {
910     int value_idx = 0;
911     int i;
912
913     for (i = 0; i < agr->break_var_cnt; i++) 
914       {
915         struct variable *v = agr->break_vars[i];
916         memcpy (case_data_rw (output, value_idx),
917                 case_data (&agr->break_case, v->fv),
918                 sizeof (union value) * v->nv);
919         value_idx += v->nv; 
920       }
921   }
922   
923   {
924     struct agr_var *i;
925   
926     for (i = agr->agr_vars; i; i = i->next)
927       {
928         union value *v = case_data_rw (output, i->dest->fv);
929
930         if (agr->missing == COLUMNWISE && i->missing != 0
931             && (i->function & FUNC) != N && (i->function & FUNC) != NU
932             && (i->function & FUNC) != NMISS && (i->function & FUNC) != NUMISS)
933           {
934             if (i->dest->type == ALPHA)
935               memset (v->s, ' ', i->dest->width);
936             else
937               v->f = SYSMIS;
938             continue;
939           }
940         
941         switch (i->function)
942           {
943           case SUM:
944             v->f = i->int1 ? i->dbl[0] : SYSMIS;
945             break;
946           case MEAN:
947             v->f = i->dbl[1] != 0.0 ? i->dbl[0] / i->dbl[1] : SYSMIS;
948             break;
949           case SD:
950             {
951               double variance;
952
953               /* FIXME: we should use two passes. */
954               moments1_calculate (i->moments, NULL, NULL, &variance,
955                                  NULL, NULL);
956               if (variance != SYSMIS)
957                 v->f = sqrt (variance);
958               else
959                 v->f = SYSMIS; 
960             }
961             break;
962           case MAX:
963           case MIN:
964             v->f = i->int1 ? i->dbl[0] : SYSMIS;
965             break;
966           case MAX | FSTRING:
967           case MIN | FSTRING:
968             if (i->int1)
969               memcpy (v->s, i->string, i->dest->width);
970             else
971               memset (v->s, ' ', i->dest->width);
972             break;
973           case FGT:
974           case FGT | FSTRING:
975           case FLT:
976           case FLT | FSTRING:
977           case FIN:
978           case FIN | FSTRING:
979           case FOUT:
980           case FOUT | FSTRING:
981             v->f = i->dbl[1] ? i->dbl[0] / i->dbl[1] : SYSMIS;
982             break;
983           case PGT:
984           case PGT | FSTRING:
985           case PLT:
986           case PLT | FSTRING:
987           case PIN:
988           case PIN | FSTRING:
989           case POUT:
990           case POUT | FSTRING:
991             v->f = i->dbl[1] ? i->dbl[0] / i->dbl[1] * 100.0 : SYSMIS;
992             break;
993           case N:
994           case N | FSTRING:
995             v->f = i->dbl[0];
996             break;
997           case NU:
998           case NU | FSTRING:
999             v->f = i->int1;
1000             break;
1001           case FIRST:
1002           case LAST:
1003             v->f = i->int1 ? i->dbl[0] : SYSMIS;
1004             break;
1005           case FIRST | FSTRING:
1006           case LAST | FSTRING:
1007             if (i->int1)
1008               memcpy (v->s, i->string, i->dest->width);
1009             else
1010               memset (v->s, ' ', i->dest->width);
1011             break;
1012           case N_NO_VARS:
1013             v->f = i->dbl[0];
1014             break;
1015           case NU_NO_VARS:
1016             v->f = i->int1;
1017             break;
1018           case NMISS:
1019           case NMISS | FSTRING:
1020             v->f = i->dbl[0];
1021             break;
1022           case NUMISS:
1023           case NUMISS | FSTRING:
1024             v->f = i->int1;
1025             break;
1026           default:
1027             assert (0);
1028           }
1029       }
1030   }
1031 }
1032
1033 /* Resets the state for all the aggregate functions. */
1034 static void
1035 initialize_aggregate_info (struct agr_proc *agr, const struct ccase *input)
1036 {
1037   struct agr_var *iter;
1038
1039   case_destroy (&agr->break_case);
1040   case_clone (&agr->break_case, input);
1041
1042   for (iter = agr->agr_vars; iter; iter = iter->next)
1043     {
1044       iter->missing = 0;
1045       iter->dbl[0] = iter->dbl[1] = iter->dbl[2] = 0.0;
1046       iter->int1 = iter->int2 = 0;
1047       switch (iter->function)
1048         {
1049         case MIN:
1050           iter->dbl[0] = DBL_MAX;
1051           break;
1052         case MIN | FSTRING:
1053           memset (iter->string, 255, iter->src->width);
1054           break;
1055         case MAX:
1056           iter->dbl[0] = -DBL_MAX;
1057           break;
1058         case MAX | FSTRING:
1059           memset (iter->string, 0, iter->src->width);
1060           break;
1061         case SD:
1062           if (iter->moments == NULL)
1063             iter->moments = moments1_create (MOMENT_VARIANCE);
1064           else
1065             moments1_clear (iter->moments);
1066           break;
1067         default:
1068           break;
1069         }
1070     }
1071 }
1072 \f
1073 /* Aggregate each case as it comes through.  Cases which aren't needed
1074    are dropped.
1075    Returns true if successful, false if an I/O error occurred. */
1076 static bool
1077 agr_to_active_file (struct ccase *c, void *agr_)
1078 {
1079   struct agr_proc *agr = agr_;
1080
1081   if (aggregate_single_case (agr, c, &agr->agr_case)) 
1082     return agr->sink->class->write (agr->sink, &agr->agr_case);
1083
1084   return true;
1085 }
1086
1087 /* Aggregate the current case and output it if we passed a
1088    breakpoint. */
1089 static bool
1090 presorted_agr_to_sysfile (struct ccase *c, void *agr_) 
1091 {
1092   struct agr_proc *agr = agr_;
1093
1094   if (aggregate_single_case (agr, c, &agr->agr_case)) 
1095     return any_writer_write (agr->writer, &agr->agr_case);
1096   
1097   return true;
1098 }