Changed include paths to be explicitly specified in the #include directive.
[pspp-builds.git] / src / language / stats / aggregate.c
1 /* PSPP - computes sample statistics.
2    Copyright (C) 1997-9, 2000 Free Software Foundation, Inc.
3    Written by Ben Pfaff <blp@gnu.org>.
4
5    This program is free software; you can redistribute it and/or
6    modify it under the terms of the GNU General Public License as
7    published by the Free Software Foundation; either version 2 of the
8    License, or (at your option) any later version.
9
10    This program is distributed in the hope that it will be useful, but
11    WITHOUT ANY WARRANTY; without even the implied warranty of
12    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13    General Public License for more details.
14
15    You should have received a copy of the GNU General Public License
16    along with this program; if not, write to the Free Software
17    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
18    02110-1301, USA. */
19
20 #include <config.h>
21 #include <libpspp/message.h>
22 #include <stdlib.h>
23 #include <libpspp/alloc.h>
24 #include <data/any-writer.h>
25 #include <data/case.h>
26 #include <data/casefile.h>
27 #include <language/command.h>
28 #include <data/dictionary.h>
29 #include <libpspp/message.h>
30 #include <data/file-handle-def.h>
31 #include <language/lexer/lexer.h>
32 #include <libpspp/misc.h>
33 #include <math/moments.h>
34 #include <libpspp/pool.h>
35 #include <data/settings.h>
36 #include <data/sys-file-writer.h>
37 #include <math/sort.h>
38 #include <libpspp/str.h>
39 #include <data/variable.h>
40 #include <procedure.h>
41
42 #include "gettext.h"
43 #define _(msgid) gettext (msgid)
44
45 /* Specifies how to make an aggregate variable. */
46 struct agr_var
47   {
48     struct agr_var *next;               /* Next in list. */
49
50     /* Collected during parsing. */
51     struct variable *src;       /* Source variable. */
52     struct variable *dest;      /* Target variable. */
53     int function;               /* Function. */
54     int include_missing;        /* 1=Include user-missing values. */
55     union value arg[2];         /* Arguments. */
56
57     /* Accumulated during AGGREGATE execution. */
58     double dbl[3];
59     int int1, int2;
60     char *string;
61     int missing;
62     struct moments1 *moments;
63   };
64
65 /* Aggregation functions. */
66 enum
67   {
68     NONE, SUM, MEAN, SD, MAX, MIN, PGT, PLT, PIN, POUT, FGT, FLT, FIN,
69     FOUT, N, NU, NMISS, NUMISS, FIRST, LAST,
70     N_AGR_FUNCS, N_NO_VARS, NU_NO_VARS,
71     FUNC = 0x1f, /* Function mask. */
72     FSTRING = 1<<5, /* String function bit. */
73   };
74
75 /* Attributes of an aggregation function. */
76 struct agr_func
77   {
78     const char *name;           /* Aggregation function name. */
79     size_t n_args;              /* Number of arguments. */
80     int alpha_type;             /* When given ALPHA arguments, output type. */
81     struct fmt_spec format;     /* Format spec if alpha_type != ALPHA. */
82   };
83
84 /* Attributes of aggregation functions. */
85 static const struct agr_func agr_func_tab[] = 
86   {
87     {"<NONE>",  0, -1,      {0, 0, 0}},
88     {"SUM",     0, -1,      {FMT_F, 8, 2}},
89     {"MEAN",    0, -1,      {FMT_F, 8, 2}},
90     {"SD",      0, -1,      {FMT_F, 8, 2}},
91     {"MAX",     0, ALPHA,   {-1, -1, -1}}, 
92     {"MIN",     0, ALPHA,   {-1, -1, -1}}, 
93     {"PGT",     1, NUMERIC, {FMT_F, 5, 1}},      
94     {"PLT",     1, NUMERIC, {FMT_F, 5, 1}},       
95     {"PIN",     2, NUMERIC, {FMT_F, 5, 1}},       
96     {"POUT",    2, NUMERIC, {FMT_F, 5, 1}},       
97     {"FGT",     1, NUMERIC, {FMT_F, 5, 3}},       
98     {"FLT",     1, NUMERIC, {FMT_F, 5, 3}},       
99     {"FIN",     2, NUMERIC, {FMT_F, 5, 3}},       
100     {"FOUT",    2, NUMERIC, {FMT_F, 5, 3}},       
101     {"N",       0, NUMERIC, {FMT_F, 7, 0}},       
102     {"NU",      0, NUMERIC, {FMT_F, 7, 0}},       
103     {"NMISS",   0, NUMERIC, {FMT_F, 7, 0}},       
104     {"NUMISS",  0, NUMERIC, {FMT_F, 7, 0}},       
105     {"FIRST",   0, ALPHA,   {-1, -1, -1}}, 
106     {"LAST",    0, ALPHA,   {-1, -1, -1}},
107     {NULL,      0, -1,      {-1, -1, -1}},
108     {"N",       0, NUMERIC, {FMT_F, 7, 0}},
109     {"NU",      0, NUMERIC, {FMT_F, 7, 0}},
110   };
111
112 /* Missing value types. */
113 enum missing_treatment
114   {
115     ITEMWISE,           /* Missing values item by item. */
116     COLUMNWISE          /* Missing values column by column. */
117   };
118
119 /* An entire AGGREGATE procedure. */
120 struct agr_proc 
121   {
122     /* We have either an output file or a sink. */
123     struct any_writer *writer;          /* Output file, or null if none. */
124     struct case_sink *sink;             /* Sink, or null if none. */
125
126     /* Break variables. */
127     struct sort_criteria *sort;         /* Sort criteria. */
128     struct variable **break_vars;       /* Break variables. */
129     size_t break_var_cnt;               /* Number of break variables. */
130     struct ccase break_case;            /* Last values of break variables. */
131
132     enum missing_treatment missing;     /* How to treat missing values. */
133     struct agr_var *agr_vars;           /* First aggregate variable. */
134     struct dictionary *dict;            /* Aggregate dictionary. */
135     int case_cnt;                       /* Counts aggregated cases. */
136     struct ccase agr_case;              /* Aggregate case for output. */
137   };
138
139 static void initialize_aggregate_info (struct agr_proc *,
140                                        const struct ccase *);
141
142 /* Prototypes. */
143 static int parse_aggregate_functions (struct agr_proc *);
144 static void agr_destroy (struct agr_proc *);
145 static int aggregate_single_case (struct agr_proc *agr,
146                                   const struct ccase *input,
147                                   struct ccase *output);
148 static void dump_aggregate_info (struct agr_proc *agr, struct ccase *output);
149
150 /* Aggregating to the active file. */
151 static bool agr_to_active_file (struct ccase *, void *aux);
152
153 /* Aggregating to a system file. */
154 static bool presorted_agr_to_sysfile (struct ccase *, void *aux);
155 \f
156 /* Parsing. */
157
158 /* Parses and executes the AGGREGATE procedure. */
159 int
160 cmd_aggregate (void)
161 {
162   struct agr_proc agr;
163   struct file_handle *out_file = NULL;
164
165   bool copy_documents = false;
166   bool presorted = false;
167   bool saw_direction;
168
169   memset(&agr, 0 , sizeof (agr));
170   agr.missing = ITEMWISE;
171   case_nullify (&agr.break_case);
172   
173   agr.dict = dict_create ();
174   dict_set_label (agr.dict, dict_get_label (default_dict));
175   dict_set_documents (agr.dict, dict_get_documents (default_dict));
176
177   /* OUTFILE subcommand must be first. */
178   if (!lex_force_match_id ("OUTFILE"))
179     goto error;
180   lex_match ('=');
181   if (!lex_match ('*'))
182     {
183       out_file = fh_parse (FH_REF_FILE | FH_REF_SCRATCH);
184       if (out_file == NULL)
185         goto error;
186     }
187   
188   /* Read most of the subcommands. */
189   for (;;)
190     {
191       lex_match ('/');
192       
193       if (lex_match_id ("MISSING"))
194         {
195           lex_match ('=');
196           if (!lex_match_id ("COLUMNWISE"))
197             {
198               lex_error (_("while expecting COLUMNWISE"));
199               goto error;
200             }
201           agr.missing = COLUMNWISE;
202         }
203       else if (lex_match_id ("DOCUMENT"))
204         copy_documents = true;
205       else if (lex_match_id ("PRESORTED"))
206         presorted = true;
207       else if (lex_match_id ("BREAK"))
208         {
209           int i;
210
211           lex_match ('=');
212           agr.sort = sort_parse_criteria (default_dict,
213                                           &agr.break_vars, &agr.break_var_cnt,
214                                           &saw_direction, NULL);
215           if (agr.sort == NULL)
216             goto error;
217           
218           for (i = 0; i < agr.break_var_cnt; i++)
219             dict_clone_var_assert (agr.dict, agr.break_vars[i],
220                                    agr.break_vars[i]->name);
221
222           /* BREAK must follow the options. */
223           break;
224         }
225       else
226         {
227           lex_error (_("expecting BREAK"));
228           goto error;
229         }
230     }
231   if (presorted && saw_direction)
232     msg (SW, _("When PRESORTED is specified, specifying sorting directions "
233                "with (A) or (D) has no effect.  Output data will be sorted "
234                "the same way as the input data."));
235       
236   /* Read in the aggregate functions. */
237   lex_match ('/');
238   if (!parse_aggregate_functions (&agr))
239     goto error;
240
241   /* Delete documents. */
242   if (!copy_documents)
243     dict_set_documents (agr.dict, NULL);
244
245   /* Cancel SPLIT FILE. */
246   dict_set_split_vars (agr.dict, NULL, 0);
247   
248   /* Initialize. */
249   agr.case_cnt = 0;
250   case_create (&agr.agr_case, dict_get_next_value_idx (agr.dict));
251
252   /* Output to active file or external file? */
253   if (out_file == NULL) 
254     {
255       /* The active file will be replaced by the aggregated data,
256          so TEMPORARY is moot. */
257       cancel_temporary ();
258
259       if (agr.sort != NULL && !presorted) 
260         {
261           if (!sort_active_file_in_place (agr.sort))
262             goto error;
263         }
264
265       agr.sink = create_case_sink (&storage_sink_class, agr.dict, NULL);
266       if (agr.sink->class->open != NULL)
267         agr.sink->class->open (agr.sink);
268       vfm_sink = create_case_sink (&null_sink_class, default_dict, NULL);
269       if (!procedure (agr_to_active_file, &agr))
270         goto error;
271       if (agr.case_cnt > 0) 
272         {
273           dump_aggregate_info (&agr, &agr.agr_case);
274           if (!agr.sink->class->write (agr.sink, &agr.agr_case))
275             goto error;
276         }
277       dict_destroy (default_dict);
278       default_dict = agr.dict;
279       agr.dict = NULL;
280       vfm_source = agr.sink->class->make_source (agr.sink);
281       free_case_sink (agr.sink);
282     }
283   else
284     {
285       agr.writer = any_writer_open (out_file, agr.dict);
286       if (agr.writer == NULL)
287         goto error;
288       
289       if (agr.sort != NULL && !presorted) 
290         {
291           /* Sorting is needed. */
292           struct casefile *dst;
293           struct casereader *reader;
294           struct ccase c;
295           bool ok = true;
296           
297           dst = sort_active_file_to_casefile (agr.sort);
298           if (dst == NULL)
299             goto error;
300           reader = casefile_get_destructive_reader (dst);
301           while (ok && casereader_read_xfer (reader, &c)) 
302             {
303               if (aggregate_single_case (&agr, &c, &agr.agr_case)) 
304                 ok = any_writer_write (agr.writer, &agr.agr_case);
305               case_destroy (&c);
306             }
307           casereader_destroy (reader);
308           if (ok)
309             ok = !casefile_error (dst);
310           casefile_destroy (dst);
311           if (!ok)
312             goto error;
313         }
314       else 
315         {
316           /* Active file is already sorted. */
317           if (!procedure (presorted_agr_to_sysfile, &agr))
318             goto error;
319         }
320       
321       if (agr.case_cnt > 0) 
322         {
323           dump_aggregate_info (&agr, &agr.agr_case);
324           any_writer_write (agr.writer, &agr.agr_case);
325         }
326       if (any_writer_error (agr.writer))
327         goto error;
328     }
329   
330   agr_destroy (&agr);
331   return CMD_SUCCESS;
332
333 error:
334   agr_destroy (&agr);
335   return CMD_CASCADING_FAILURE;
336 }
337
338 /* Parse all the aggregate functions. */
339 static int
340 parse_aggregate_functions (struct agr_proc *agr)
341 {
342   struct agr_var *tail; /* Tail of linked list starting at agr->vars. */
343
344   /* Parse everything. */
345   tail = NULL;
346   for (;;)
347     {
348       char **dest;
349       char **dest_label;
350       size_t n_dest;
351
352       int include_missing;
353       const struct agr_func *function;
354       int func_index;
355
356       union value arg[2];
357
358       struct variable **src;
359       size_t n_src;
360
361       size_t i;
362
363       dest = NULL;
364       dest_label = NULL;
365       n_dest = 0;
366       src = NULL;
367       function = NULL;
368       n_src = 0;
369       arg[0].c = NULL;
370       arg[1].c = NULL;
371
372       /* Parse the list of target variables. */
373       while (!lex_match ('='))
374         {
375           size_t n_dest_prev = n_dest;
376           
377           if (!parse_DATA_LIST_vars (&dest, &n_dest,
378                                      PV_APPEND | PV_SINGLE | PV_NO_SCRATCH))
379             goto error;
380
381           /* Assign empty labels. */
382           {
383             int j;
384
385             dest_label = xnrealloc (dest_label, n_dest, sizeof *dest_label);
386             for (j = n_dest_prev; j < n_dest; j++)
387               dest_label[j] = NULL;
388           }
389           
390           if (token == T_STRING)
391             {
392               ds_truncate (&tokstr, 255);
393               dest_label[n_dest - 1] = xstrdup (ds_c_str (&tokstr));
394               lex_get ();
395             }
396         }
397
398       /* Get the name of the aggregation function. */
399       if (token != T_ID)
400         {
401           lex_error (_("expecting aggregation function"));
402           goto error;
403         }
404
405       include_missing = 0;
406       if (tokid[strlen (tokid) - 1] == '.')
407         {
408           include_missing = 1;
409           tokid[strlen (tokid) - 1] = 0;
410         }
411       
412       for (function = agr_func_tab; function->name; function++)
413         if (!strcasecmp (function->name, tokid))
414           break;
415       if (NULL == function->name)
416         {
417           msg (SE, _("Unknown aggregation function %s."), tokid);
418           goto error;
419         }
420       func_index = function - agr_func_tab;
421       lex_get ();
422
423       /* Check for leading lparen. */
424       if (!lex_match ('('))
425         {
426           if (func_index == N)
427             func_index = N_NO_VARS;
428           else if (func_index == NU)
429             func_index = NU_NO_VARS;
430           else
431             {
432               lex_error (_("expecting `('"));
433               goto error;
434             }
435         }
436       else
437         {
438           /* Parse list of source variables. */
439           {
440             int pv_opts = PV_NO_SCRATCH;
441
442             if (func_index == SUM || func_index == MEAN || func_index == SD)
443               pv_opts |= PV_NUMERIC;
444             else if (function->n_args)
445               pv_opts |= PV_SAME_TYPE;
446
447             if (!parse_variables (default_dict, &src, &n_src, pv_opts))
448               goto error;
449           }
450
451           /* Parse function arguments, for those functions that
452              require arguments. */
453           if (function->n_args != 0)
454             for (i = 0; i < function->n_args; i++)
455               {
456                 int type;
457             
458                 lex_match (',');
459                 if (token == T_STRING)
460                   {
461                     arg[i].c = xstrdup (ds_c_str (&tokstr));
462                     type = ALPHA;
463                   }
464                 else if (lex_is_number ())
465                   {
466                     arg[i].f = tokval;
467                     type = NUMERIC;
468                   } else {
469                     msg (SE, _("Missing argument %d to %s."), i + 1,
470                          function->name);
471                     goto error;
472                   }
473             
474                 lex_get ();
475
476                 if (type != src[0]->type)
477                   {
478                     msg (SE, _("Arguments to %s must be of same type as "
479                                "source variables."),
480                          function->name);
481                     goto error;
482                   }
483               }
484
485           /* Trailing rparen. */
486           if (!lex_match(')'))
487             {
488               lex_error (_("expecting `)'"));
489               goto error;
490             }
491           
492           /* Now check that the number of source variables match
493              the number of target variables.  If we check earlier
494              than this, the user can get very misleading error
495              message, i.e. `AGGREGATE x=SUM(y t).' will get this
496              error message when a proper message would be more
497              like `unknown variable t'. */
498           if (n_src != n_dest)
499             {
500               msg (SE, _("Number of source variables (%u) does not match "
501                          "number of target variables (%u)."),
502                    (unsigned) n_src, (unsigned) n_dest);
503               goto error;
504             }
505
506           if ((func_index == PIN || func_index == POUT
507               || func_index == FIN || func_index == FOUT) 
508               && ((src[0]->type == NUMERIC && arg[0].f > arg[1].f)
509                   || (src[0]->type == ALPHA
510                       && str_compare_rpad (arg[0].c, arg[1].c) > 0)))
511             {
512               union value t = arg[0];
513               arg[0] = arg[1];
514               arg[1] = t;
515                   
516               msg (SW, _("The value arguments passed to the %s function "
517                          "are out-of-order.  They will be treated as if "
518                          "they had been specified in the correct order."),
519                    function->name);
520             }
521         }
522         
523       /* Finally add these to the linked list of aggregation
524          variables. */
525       for (i = 0; i < n_dest; i++)
526         {
527           struct agr_var *v = xmalloc (sizeof *v);
528
529           /* Add variable to chain. */
530           if (agr->agr_vars != NULL)
531             tail->next = v;
532           else
533             agr->agr_vars = v;
534           tail = v;
535           tail->next = NULL;
536           v->moments = NULL;
537           
538           /* Create the target variable in the aggregate
539              dictionary. */
540           {
541             struct variable *destvar;
542             
543             v->function = func_index;
544
545             if (src)
546               {
547                 v->src = src[i];
548                 
549                 if (src[i]->type == ALPHA)
550                   {
551                     v->function |= FSTRING;
552                     v->string = xmalloc (src[i]->width);
553                   }
554
555                 if (function->alpha_type == ALPHA)
556                   destvar = dict_clone_var (agr->dict, v->src, dest[i]);
557                 else
558                   {
559                     assert (v->src->type == NUMERIC
560                             || function->alpha_type == NUMERIC);
561                     destvar = dict_create_var (agr->dict, dest[i], 0);
562                     if (destvar != NULL) 
563                       {
564                         if ((func_index == N || func_index == NMISS)
565                             && dict_get_weight (default_dict) != NULL)
566                           destvar->print = destvar->write = f8_2; 
567                         else
568                           destvar->print = destvar->write = function->format;
569                       }
570                   }
571               } else {
572                 v->src = NULL;
573                 destvar = dict_create_var (agr->dict, dest[i], 0);
574                 if (func_index == N_NO_VARS
575                     && dict_get_weight (default_dict) != NULL)
576                   destvar->print = destvar->write = f8_2; 
577                 else
578                   destvar->print = destvar->write = function->format;
579               }
580           
581             if (!destvar)
582               {
583                 msg (SE, _("Variable name %s is not unique within the "
584                            "aggregate file dictionary, which contains "
585                            "the aggregate variables and the break "
586                            "variables."),
587                      dest[i]);
588                 goto error;
589               }
590
591             free (dest[i]);
592             destvar->init = 0;
593             if (dest_label[i])
594               {
595                 destvar->label = dest_label[i];
596                 dest_label[i] = NULL;
597               }
598
599             v->dest = destvar;
600           }
601           
602           v->include_missing = include_missing;
603
604           if (v->src != NULL)
605             {
606               int j;
607
608               if (v->src->type == NUMERIC)
609                 for (j = 0; j < function->n_args; j++)
610                   v->arg[j].f = arg[j].f;
611               else
612                 for (j = 0; j < function->n_args; j++)
613                   v->arg[j].c = xstrdup (arg[j].c);
614             }
615         }
616       
617       if (src != NULL && src[0]->type == ALPHA)
618         for (i = 0; i < function->n_args; i++)
619           {
620             free (arg[i].c);
621             arg[i].c = NULL;
622           }
623
624       free (src);
625       free (dest);
626       free (dest_label);
627
628       if (!lex_match ('/'))
629         {
630           if (token == '.')
631             return 1;
632
633           lex_error ("expecting end of command");
634           return 0;
635         }
636       continue;
637       
638     error:
639       for (i = 0; i < n_dest; i++)
640         {
641           free (dest[i]);
642           free (dest_label[i]);
643         }
644       free (dest);
645       free (dest_label);
646       free (arg[0].c);
647       free (arg[1].c);
648       if (src && n_src && src[0]->type == ALPHA)
649         for (i = 0; i < function->n_args; i++)
650           {
651             free (arg[i].c);
652             arg[i].c = NULL;
653           }
654       free (src);
655         
656       return 0;
657     }
658 }
659
660 /* Destroys AGR. */
661 static void
662 agr_destroy (struct agr_proc *agr)
663 {
664   struct agr_var *iter, *next;
665
666   any_writer_close (agr->writer);
667   if (agr->sort != NULL)
668     sort_destroy_criteria (agr->sort);
669   free (agr->break_vars);
670   case_destroy (&agr->break_case);
671   for (iter = agr->agr_vars; iter; iter = next)
672     {
673       next = iter->next;
674
675       if (iter->function & FSTRING)
676         {
677           size_t n_args;
678           size_t i;
679
680           n_args = agr_func_tab[iter->function & FUNC].n_args;
681           for (i = 0; i < n_args; i++)
682             free (iter->arg[i].c);
683           free (iter->string);
684         }
685       else if (iter->function == SD)
686         moments1_destroy (iter->moments);
687       free (iter);
688     }
689   if (agr->dict != NULL)
690     dict_destroy (agr->dict);
691
692   case_destroy (&agr->agr_case);
693 }
694 \f
695 /* Execution. */
696
697 static void accumulate_aggregate_info (struct agr_proc *,
698                                        const struct ccase *);
699 static void dump_aggregate_info (struct agr_proc *, struct ccase *);
700
701 /* Processes a single case INPUT for aggregation.  If output is
702    warranted, writes it to OUTPUT and returns nonzero.
703    Otherwise, returns zero and OUTPUT is unmodified. */
704 static int
705 aggregate_single_case (struct agr_proc *agr,
706                        const struct ccase *input, struct ccase *output)
707 {
708   bool finished_group = false;
709   
710   if (agr->case_cnt++ == 0)
711     initialize_aggregate_info (agr, input);
712   else if (case_compare (&agr->break_case, input,
713                          agr->break_vars, agr->break_var_cnt))
714     {
715       dump_aggregate_info (agr, output);
716       finished_group = true;
717
718       initialize_aggregate_info (agr, input);
719     }
720
721   accumulate_aggregate_info (agr, input);
722   return finished_group;
723 }
724
725 /* Accumulates aggregation data from the case INPUT. */
726 static void 
727 accumulate_aggregate_info (struct agr_proc *agr,
728                            const struct ccase *input)
729 {
730   struct agr_var *iter;
731   double weight;
732   int bad_warn = 1;
733
734   weight = dict_get_case_weight (default_dict, input, &bad_warn);
735
736   for (iter = agr->agr_vars; iter; iter = iter->next)
737     if (iter->src)
738       {
739         const union value *v = case_data (input, iter->src->fv);
740
741         if ((!iter->include_missing
742              && mv_is_value_missing (&iter->src->miss, v))
743             || (iter->include_missing && iter->src->type == NUMERIC
744                 && v->f == SYSMIS))
745           {
746             switch (iter->function)
747               {
748               case NMISS:
749               case NMISS | FSTRING:
750                 iter->dbl[0] += weight;
751                 break;
752               case NUMISS:
753               case NUMISS | FSTRING:
754                 iter->int1++;
755                 break;
756               }
757             iter->missing = 1;
758             continue;
759           }
760         
761         /* This is horrible.  There are too many possibilities. */
762         switch (iter->function)
763           {
764           case SUM:
765             iter->dbl[0] += v->f * weight;
766             iter->int1 = 1;
767             break;
768           case MEAN:
769             iter->dbl[0] += v->f * weight;
770             iter->dbl[1] += weight;
771             break;
772           case SD:
773             moments1_add (iter->moments, v->f, weight);
774             break;
775           case MAX:
776             iter->dbl[0] = max (iter->dbl[0], v->f);
777             iter->int1 = 1;
778             break;
779           case MAX | FSTRING:
780             if (memcmp (iter->string, v->s, iter->src->width) < 0)
781               memcpy (iter->string, v->s, iter->src->width);
782             iter->int1 = 1;
783             break;
784           case MIN:
785             iter->dbl[0] = min (iter->dbl[0], v->f);
786             iter->int1 = 1;
787             break;
788           case MIN | FSTRING:
789             if (memcmp (iter->string, v->s, iter->src->width) > 0)
790               memcpy (iter->string, v->s, iter->src->width);
791             iter->int1 = 1;
792             break;
793           case FGT:
794           case PGT:
795             if (v->f > iter->arg[0].f)
796               iter->dbl[0] += weight;
797             iter->dbl[1] += weight;
798             break;
799           case FGT | FSTRING:
800           case PGT | FSTRING:
801             if (memcmp (iter->arg[0].c, v->s, iter->src->width) < 0)
802               iter->dbl[0] += weight;
803             iter->dbl[1] += weight;
804             break;
805           case FLT:
806           case PLT:
807             if (v->f < iter->arg[0].f)
808               iter->dbl[0] += weight;
809             iter->dbl[1] += weight;
810             break;
811           case FLT | FSTRING:
812           case PLT | FSTRING:
813             if (memcmp (iter->arg[0].c, v->s, iter->src->width) > 0)
814               iter->dbl[0] += weight;
815             iter->dbl[1] += weight;
816             break;
817           case FIN:
818           case PIN:
819             if (iter->arg[0].f <= v->f && v->f <= iter->arg[1].f)
820               iter->dbl[0] += weight;
821             iter->dbl[1] += weight;
822             break;
823           case FIN | FSTRING:
824           case PIN | FSTRING:
825             if (memcmp (iter->arg[0].c, v->s, iter->src->width) <= 0
826                 && memcmp (iter->arg[1].c, v->s, iter->src->width) >= 0)
827               iter->dbl[0] += weight;
828             iter->dbl[1] += weight;
829             break;
830           case FOUT:
831           case POUT:
832             if (iter->arg[0].f > v->f || v->f > iter->arg[1].f)
833               iter->dbl[0] += weight;
834             iter->dbl[1] += weight;
835             break;
836           case FOUT | FSTRING:
837           case POUT | FSTRING:
838             if (memcmp (iter->arg[0].c, v->s, iter->src->width) > 0
839                 || memcmp (iter->arg[1].c, v->s, iter->src->width) < 0)
840               iter->dbl[0] += weight;
841             iter->dbl[1] += weight;
842             break;
843           case N:
844           case N | FSTRING:
845             iter->dbl[0] += weight;
846             break;
847           case NU:
848           case NU | FSTRING:
849             iter->int1++;
850             break;
851           case FIRST:
852             if (iter->int1 == 0)
853               {
854                 iter->dbl[0] = v->f;
855                 iter->int1 = 1;
856               }
857             break;
858           case FIRST | FSTRING:
859             if (iter->int1 == 0)
860               {
861                 memcpy (iter->string, v->s, iter->src->width);
862                 iter->int1 = 1;
863               }
864             break;
865           case LAST:
866             iter->dbl[0] = v->f;
867             iter->int1 = 1;
868             break;
869           case LAST | FSTRING:
870             memcpy (iter->string, v->s, iter->src->width);
871             iter->int1 = 1;
872             break;
873           case NMISS:
874           case NMISS | FSTRING:
875           case NUMISS:
876           case NUMISS | FSTRING:
877             /* Our value is not missing or it would have been
878                caught earlier.  Nothing to do. */
879             break;
880           default:
881             assert (0);
882           }
883     } else {
884       switch (iter->function)
885         {
886         case N_NO_VARS:
887           iter->dbl[0] += weight;
888           break;
889         case NU_NO_VARS:
890           iter->int1++;
891           break;
892         default:
893           assert (0);
894         }
895     }
896 }
897
898 /* We've come to a record that differs from the previous in one or
899    more of the break variables.  Make an output record from the
900    accumulated statistics in the OUTPUT case. */
901 static void 
902 dump_aggregate_info (struct agr_proc *agr, struct ccase *output)
903 {
904   {
905     int value_idx = 0;
906     int i;
907
908     for (i = 0; i < agr->break_var_cnt; i++) 
909       {
910         struct variable *v = agr->break_vars[i];
911         memcpy (case_data_rw (output, value_idx),
912                 case_data (&agr->break_case, v->fv),
913                 sizeof (union value) * v->nv);
914         value_idx += v->nv; 
915       }
916   }
917   
918   {
919     struct agr_var *i;
920   
921     for (i = agr->agr_vars; i; i = i->next)
922       {
923         union value *v = case_data_rw (output, i->dest->fv);
924
925         if (agr->missing == COLUMNWISE && i->missing != 0
926             && (i->function & FUNC) != N && (i->function & FUNC) != NU
927             && (i->function & FUNC) != NMISS && (i->function & FUNC) != NUMISS)
928           {
929             if (i->dest->type == ALPHA)
930               memset (v->s, ' ', i->dest->width);
931             else
932               v->f = SYSMIS;
933             continue;
934           }
935         
936         switch (i->function)
937           {
938           case SUM:
939             v->f = i->int1 ? i->dbl[0] : SYSMIS;
940             break;
941           case MEAN:
942             v->f = i->dbl[1] != 0.0 ? i->dbl[0] / i->dbl[1] : SYSMIS;
943             break;
944           case SD:
945             {
946               double variance;
947
948               /* FIXME: we should use two passes. */
949               moments1_calculate (i->moments, NULL, NULL, &variance,
950                                  NULL, NULL);
951               if (variance != SYSMIS)
952                 v->f = sqrt (variance);
953               else
954                 v->f = SYSMIS; 
955             }
956             break;
957           case MAX:
958           case MIN:
959             v->f = i->int1 ? i->dbl[0] : SYSMIS;
960             break;
961           case MAX | FSTRING:
962           case MIN | FSTRING:
963             if (i->int1)
964               memcpy (v->s, i->string, i->dest->width);
965             else
966               memset (v->s, ' ', i->dest->width);
967             break;
968           case FGT:
969           case FGT | FSTRING:
970           case FLT:
971           case FLT | FSTRING:
972           case FIN:
973           case FIN | FSTRING:
974           case FOUT:
975           case FOUT | FSTRING:
976             v->f = i->dbl[1] ? i->dbl[0] / i->dbl[1] : SYSMIS;
977             break;
978           case PGT:
979           case PGT | FSTRING:
980           case PLT:
981           case PLT | FSTRING:
982           case PIN:
983           case PIN | FSTRING:
984           case POUT:
985           case POUT | FSTRING:
986             v->f = i->dbl[1] ? i->dbl[0] / i->dbl[1] * 100.0 : SYSMIS;
987             break;
988           case N:
989           case N | FSTRING:
990             v->f = i->dbl[0];
991             break;
992           case NU:
993           case NU | FSTRING:
994             v->f = i->int1;
995             break;
996           case FIRST:
997           case LAST:
998             v->f = i->int1 ? i->dbl[0] : SYSMIS;
999             break;
1000           case FIRST | FSTRING:
1001           case LAST | FSTRING:
1002             if (i->int1)
1003               memcpy (v->s, i->string, i->dest->width);
1004             else
1005               memset (v->s, ' ', i->dest->width);
1006             break;
1007           case N_NO_VARS:
1008             v->f = i->dbl[0];
1009             break;
1010           case NU_NO_VARS:
1011             v->f = i->int1;
1012             break;
1013           case NMISS:
1014           case NMISS | FSTRING:
1015             v->f = i->dbl[0];
1016             break;
1017           case NUMISS:
1018           case NUMISS | FSTRING:
1019             v->f = i->int1;
1020             break;
1021           default:
1022             assert (0);
1023           }
1024       }
1025   }
1026 }
1027
1028 /* Resets the state for all the aggregate functions. */
1029 static void
1030 initialize_aggregate_info (struct agr_proc *agr, const struct ccase *input)
1031 {
1032   struct agr_var *iter;
1033
1034   case_destroy (&agr->break_case);
1035   case_clone (&agr->break_case, input);
1036
1037   for (iter = agr->agr_vars; iter; iter = iter->next)
1038     {
1039       iter->missing = 0;
1040       iter->dbl[0] = iter->dbl[1] = iter->dbl[2] = 0.0;
1041       iter->int1 = iter->int2 = 0;
1042       switch (iter->function)
1043         {
1044         case MIN:
1045           iter->dbl[0] = DBL_MAX;
1046           break;
1047         case MIN | FSTRING:
1048           memset (iter->string, 255, iter->src->width);
1049           break;
1050         case MAX:
1051           iter->dbl[0] = -DBL_MAX;
1052           break;
1053         case MAX | FSTRING:
1054           memset (iter->string, 0, iter->src->width);
1055           break;
1056         case SD:
1057           if (iter->moments == NULL)
1058             iter->moments = moments1_create (MOMENT_VARIANCE);
1059           else
1060             moments1_clear (iter->moments);
1061           break;
1062         default:
1063           break;
1064         }
1065     }
1066 }
1067 \f
1068 /* Aggregate each case as it comes through.  Cases which aren't needed
1069    are dropped.
1070    Returns true if successful, false if an I/O error occurred. */
1071 static bool
1072 agr_to_active_file (struct ccase *c, void *agr_)
1073 {
1074   struct agr_proc *agr = agr_;
1075
1076   if (aggregate_single_case (agr, c, &agr->agr_case)) 
1077     return agr->sink->class->write (agr->sink, &agr->agr_case);
1078
1079   return true;
1080 }
1081
1082 /* Aggregate the current case and output it if we passed a
1083    breakpoint. */
1084 static bool
1085 presorted_agr_to_sysfile (struct ccase *c, void *agr_) 
1086 {
1087   struct agr_proc *agr = agr_;
1088
1089   if (aggregate_single_case (agr, c, &agr->agr_case)) 
1090     return any_writer_write (agr->writer, &agr->agr_case);
1091   
1092   return true;
1093 }