2b1ae5a00d728b511d861cf9faedebecb1a85bf9
[pspp-builds.git] / src / language / stats / aggregate.c
1 /* PSPP - computes sample statistics.
2    Copyright (C) 1997-9, 2000 Free Software Foundation, Inc.
3    Written by Ben Pfaff <blp@gnu.org>.
4
5    This program is free software; you can redistribute it and/or
6    modify it under the terms of the GNU General Public License as
7    published by the Free Software Foundation; either version 2 of the
8    License, or (at your option) any later version.
9
10    This program is distributed in the hope that it will be useful, but
11    WITHOUT ANY WARRANTY; without even the implied warranty of
12    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13    General Public License for more details.
14
15    You should have received a copy of the GNU General Public License
16    along with this program; if not, write to the Free Software
17    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
18    02110-1301, USA. */
19
20 #include <config.h>
21
22 #include <stdlib.h>
23
24 #include <data/any-writer.h>
25 #include <data/case-sink.h>
26 #include <data/case.h>
27 #include <data/casefile.h>
28 #include <data/dictionary.h>
29 #include <data/file-handle-def.h>
30 #include <data/procedure.h>
31 #include <data/settings.h>
32 #include <data/storage-stream.h>
33 #include <data/sys-file-writer.h>
34 #include <data/variable.h>
35 #include <language/command.h>
36 #include <language/data-io/file-handle.h>
37 #include <language/lexer/lexer.h>
38 #include <language/lexer/variable-parser.h>
39 #include <language/stats/sort-criteria.h>
40 #include <libpspp/alloc.h>
41 #include <libpspp/assertion.h>
42 #include <libpspp/message.h>
43 #include <libpspp/misc.h>
44 #include <libpspp/pool.h>
45 #include <libpspp/str.h>
46 #include <math/moments.h>
47 #include <math/sort.h>
48
49 #include "gettext.h"
50 #define _(msgid) gettext (msgid)
51
52 /* Argument for AGGREGATE function. */
53 union agr_argument
54   {
55     double f;                           /* Numeric. */
56     char *c;                            /* Short or long string. */
57   };
58
59 /* Specifies how to make an aggregate variable. */
60 struct agr_var
61   {
62     struct agr_var *next;               /* Next in list. */
63
64     /* Collected during parsing. */
65     struct variable *src;       /* Source variable. */
66     struct variable *dest;      /* Target variable. */
67     int function;               /* Function. */
68     int include_missing;        /* 1=Include user-missing values. */
69     union agr_argument arg[2];  /* Arguments. */
70
71     /* Accumulated during AGGREGATE execution. */
72     double dbl[3];
73     int int1, int2;
74     char *string;
75     int missing;
76     struct moments1 *moments;
77   };
78
79 /* Aggregation functions. */
80 enum
81   {
82     NONE, SUM, MEAN, SD, MAX, MIN, PGT, PLT, PIN, POUT, FGT, FLT, FIN,
83     FOUT, N, NU, NMISS, NUMISS, FIRST, LAST,
84     N_AGR_FUNCS, N_NO_VARS, NU_NO_VARS,
85     FUNC = 0x1f, /* Function mask. */
86     FSTRING = 1<<5, /* String function bit. */
87   };
88
89 /* Attributes of an aggregation function. */
90 struct agr_func
91   {
92     const char *name;           /* Aggregation function name. */
93     size_t n_args;              /* Number of arguments. */
94     int alpha_type;             /* When given ALPHA arguments, output type. */
95     struct fmt_spec format;     /* Format spec if alpha_type != ALPHA. */
96   };
97
98 /* Attributes of aggregation functions. */
99 static const struct agr_func agr_func_tab[] = 
100   {
101     {"<NONE>",  0, -1,      {0, 0, 0}},
102     {"SUM",     0, -1,      {FMT_F, 8, 2}},
103     {"MEAN",    0, -1,      {FMT_F, 8, 2}},
104     {"SD",      0, -1,      {FMT_F, 8, 2}},
105     {"MAX",     0, ALPHA,   {-1, -1, -1}}, 
106     {"MIN",     0, ALPHA,   {-1, -1, -1}}, 
107     {"PGT",     1, NUMERIC, {FMT_F, 5, 1}},      
108     {"PLT",     1, NUMERIC, {FMT_F, 5, 1}},       
109     {"PIN",     2, NUMERIC, {FMT_F, 5, 1}},       
110     {"POUT",    2, NUMERIC, {FMT_F, 5, 1}},       
111     {"FGT",     1, NUMERIC, {FMT_F, 5, 3}},       
112     {"FLT",     1, NUMERIC, {FMT_F, 5, 3}},       
113     {"FIN",     2, NUMERIC, {FMT_F, 5, 3}},       
114     {"FOUT",    2, NUMERIC, {FMT_F, 5, 3}},       
115     {"N",       0, NUMERIC, {FMT_F, 7, 0}},       
116     {"NU",      0, NUMERIC, {FMT_F, 7, 0}},       
117     {"NMISS",   0, NUMERIC, {FMT_F, 7, 0}},       
118     {"NUMISS",  0, NUMERIC, {FMT_F, 7, 0}},       
119     {"FIRST",   0, ALPHA,   {-1, -1, -1}}, 
120     {"LAST",    0, ALPHA,   {-1, -1, -1}},
121     {NULL,      0, -1,      {-1, -1, -1}},
122     {"N",       0, NUMERIC, {FMT_F, 7, 0}},
123     {"NU",      0, NUMERIC, {FMT_F, 7, 0}},
124   };
125
126 /* Missing value types. */
127 enum missing_treatment
128   {
129     ITEMWISE,           /* Missing values item by item. */
130     COLUMNWISE          /* Missing values column by column. */
131   };
132
133 /* An entire AGGREGATE procedure. */
134 struct agr_proc 
135   {
136     /* We have either an output file or a sink. */
137     struct any_writer *writer;          /* Output file, or null if none. */
138     struct case_sink *sink;             /* Sink, or null if none. */
139
140     /* Break variables. */
141     struct sort_criteria *sort;         /* Sort criteria. */
142     struct variable **break_vars;       /* Break variables. */
143     size_t break_var_cnt;               /* Number of break variables. */
144     struct ccase break_case;            /* Last values of break variables. */
145
146     enum missing_treatment missing;     /* How to treat missing values. */
147     struct agr_var *agr_vars;           /* First aggregate variable. */
148     struct dictionary *dict;            /* Aggregate dictionary. */
149     const struct dictionary *src_dict;  /* Dict of the source */
150     int case_cnt;                       /* Counts aggregated cases. */
151     struct ccase agr_case;              /* Aggregate case for output. */
152   };
153
154 static void initialize_aggregate_info (struct agr_proc *,
155                                        const struct ccase *);
156
157 /* Prototypes. */
158 static bool parse_aggregate_functions (const struct dictionary *,
159                                        struct agr_proc *);
160 static void agr_destroy (struct agr_proc *);
161 static bool aggregate_single_case (struct agr_proc *agr,
162                                    const struct ccase *input,
163                                    struct ccase *output);
164 static void dump_aggregate_info (struct agr_proc *agr, struct ccase *output);
165
166 /* Aggregating to the active file. */
167 static bool agr_to_active_file (const struct ccase *, void *aux, const struct dataset *);
168
169 /* Aggregating to a system file. */
170 static bool presorted_agr_to_sysfile (const struct ccase *, void *aux, const struct dataset *);
171 \f
172 /* Parsing. */
173
174 /* Parses and executes the AGGREGATE procedure. */
175 int
176 cmd_aggregate (struct dataset *ds)
177 {
178   struct dictionary *dict = dataset_dict (ds);
179   struct agr_proc agr;
180   struct file_handle *out_file = NULL;
181
182   bool copy_documents = false;
183   bool presorted = false;
184   bool saw_direction;
185
186   memset(&agr, 0 , sizeof (agr));
187   agr.missing = ITEMWISE;
188   case_nullify (&agr.break_case);
189   
190   agr.dict = dict_create ();
191   agr.src_dict = dict;
192   dict_set_label (agr.dict, dict_get_label (dict));
193   dict_set_documents (agr.dict, dict_get_documents (dict));
194
195   /* OUTFILE subcommand must be first. */
196   if (!lex_force_match_id ("OUTFILE"))
197     goto error;
198   lex_match ('=');
199   if (!lex_match ('*'))
200     {
201       out_file = fh_parse (FH_REF_FILE | FH_REF_SCRATCH);
202       if (out_file == NULL)
203         goto error;
204     }
205   
206   /* Read most of the subcommands. */
207   for (;;)
208     {
209       lex_match ('/');
210       
211       if (lex_match_id ("MISSING"))
212         {
213           lex_match ('=');
214           if (!lex_match_id ("COLUMNWISE"))
215             {
216               lex_error (_("while expecting COLUMNWISE"));
217               goto error;
218             }
219           agr.missing = COLUMNWISE;
220         }
221       else if (lex_match_id ("DOCUMENT"))
222         copy_documents = true;
223       else if (lex_match_id ("PRESORTED"))
224         presorted = true;
225       else if (lex_match_id ("BREAK"))
226         {
227           int i;
228
229           lex_match ('=');
230           agr.sort = sort_parse_criteria (dict,
231                                           &agr.break_vars, &agr.break_var_cnt,
232                                           &saw_direction, NULL);
233           if (agr.sort == NULL)
234             goto error;
235           
236           for (i = 0; i < agr.break_var_cnt; i++)
237             dict_clone_var_assert (agr.dict, agr.break_vars[i],
238                                    agr.break_vars[i]->name);
239
240           /* BREAK must follow the options. */
241           break;
242         }
243       else
244         {
245           lex_error (_("expecting BREAK"));
246           goto error;
247         }
248     }
249   if (presorted && saw_direction)
250     msg (SW, _("When PRESORTED is specified, specifying sorting directions "
251                "with (A) or (D) has no effect.  Output data will be sorted "
252                "the same way as the input data."));
253       
254   /* Read in the aggregate functions. */
255   lex_match ('/');
256   if (!parse_aggregate_functions (dict, &agr))
257     goto error;
258
259   /* Delete documents. */
260   if (!copy_documents)
261     dict_set_documents (agr.dict, NULL);
262
263   /* Cancel SPLIT FILE. */
264   dict_set_split_vars (agr.dict, NULL, 0);
265   
266   /* Initialize. */
267   agr.case_cnt = 0;
268   case_create (&agr.agr_case, dict_get_next_value_idx (agr.dict));
269
270   /* Output to active file or external file? */
271   if (out_file == NULL) 
272     {
273       /* The active file will be replaced by the aggregated data,
274          so TEMPORARY is moot. */
275       proc_cancel_temporary_transformations (ds);
276
277       if (agr.sort != NULL && !presorted) 
278         {
279           if (!sort_active_file_in_place (ds, agr.sort))
280             goto error;
281         }
282
283       agr.sink = create_case_sink (&storage_sink_class, agr.dict, NULL);
284       if (agr.sink->class->open != NULL)
285         agr.sink->class->open (agr.sink);
286       proc_set_sink (ds, 
287                      create_case_sink (&null_sink_class, 
288                                        dict, NULL));
289       if (!procedure (ds, agr_to_active_file, &agr))
290         goto error;
291       if (agr.case_cnt > 0) 
292         {
293           dump_aggregate_info (&agr, &agr.agr_case);
294           if (!agr.sink->class->write (agr.sink, &agr.agr_case))
295             goto error;
296         }
297       discard_variables (ds);
298       dict_destroy (dict);
299       dataset_set_dict (ds, agr.dict);
300       agr.dict = NULL;
301       proc_set_source (ds, 
302                        agr.sink->class->make_source (agr.sink));
303       free_case_sink (agr.sink);
304     }
305   else
306     {
307       agr.writer = any_writer_open (out_file, agr.dict);
308       if (agr.writer == NULL)
309         goto error;
310       
311       if (agr.sort != NULL && !presorted) 
312         {
313           /* Sorting is needed. */
314           struct casefile *dst;
315           struct casereader *reader;
316           struct ccase c;
317           bool ok = true;
318           
319           dst = sort_active_file_to_casefile (ds, agr.sort);
320           if (dst == NULL)
321             goto error;
322           reader = casefile_get_destructive_reader (dst);
323           while (ok && casereader_read_xfer (reader, &c)) 
324             {
325               if (aggregate_single_case (&agr, &c, &agr.agr_case)) 
326                 ok = any_writer_write (agr.writer, &agr.agr_case);
327               case_destroy (&c);
328             }
329           casereader_destroy (reader);
330           if (ok)
331             ok = !casefile_error (dst);
332           casefile_destroy (dst);
333           if (!ok)
334             goto error;
335         }
336       else 
337         {
338           /* Active file is already sorted. */
339           if (!procedure (ds, presorted_agr_to_sysfile, &agr))
340             goto error;
341         }
342       
343       if (agr.case_cnt > 0) 
344         {
345           dump_aggregate_info (&agr, &agr.agr_case);
346           any_writer_write (agr.writer, &agr.agr_case);
347         }
348       if (any_writer_error (agr.writer))
349         goto error;
350     }
351   
352   agr_destroy (&agr);
353   return CMD_SUCCESS;
354
355 error:
356   agr_destroy (&agr);
357   return CMD_CASCADING_FAILURE;
358 }
359
360 /* Parse all the aggregate functions. */
361 static bool
362 parse_aggregate_functions (const struct dictionary *dict, struct agr_proc *agr)
363 {
364   struct agr_var *tail; /* Tail of linked list starting at agr->vars. */
365
366   /* Parse everything. */
367   tail = NULL;
368   for (;;)
369     {
370       char **dest;
371       char **dest_label;
372       size_t n_dest;
373
374       int include_missing;
375       const struct agr_func *function;
376       int func_index;
377
378       union agr_argument arg[2];
379
380       struct variable **src;
381       size_t n_src;
382
383       size_t i;
384
385       dest = NULL;
386       dest_label = NULL;
387       n_dest = 0;
388       src = NULL;
389       function = NULL;
390       n_src = 0;
391       arg[0].c = NULL;
392       arg[1].c = NULL;
393
394       /* Parse the list of target variables. */
395       while (!lex_match ('='))
396         {
397           size_t n_dest_prev = n_dest;
398           
399           if (!parse_DATA_LIST_vars (&dest, &n_dest,
400                                      PV_APPEND | PV_SINGLE | PV_NO_SCRATCH))
401             goto error;
402
403           /* Assign empty labels. */
404           {
405             int j;
406
407             dest_label = xnrealloc (dest_label, n_dest, sizeof *dest_label);
408             for (j = n_dest_prev; j < n_dest; j++)
409               dest_label[j] = NULL;
410           }
411           
412           if (token == T_STRING)
413             {
414               ds_truncate (&tokstr, 255);
415               dest_label[n_dest - 1] = ds_xstrdup (&tokstr);
416               lex_get ();
417             }
418         }
419
420       /* Get the name of the aggregation function. */
421       if (token != T_ID)
422         {
423           lex_error (_("expecting aggregation function"));
424           goto error;
425         }
426
427       include_missing = 0;
428       if (tokid[strlen (tokid) - 1] == '.')
429         {
430           include_missing = 1;
431           tokid[strlen (tokid) - 1] = 0;
432         }
433       
434       for (function = agr_func_tab; function->name; function++)
435         if (!strcasecmp (function->name, tokid))
436           break;
437       if (NULL == function->name)
438         {
439           msg (SE, _("Unknown aggregation function %s."), tokid);
440           goto error;
441         }
442       func_index = function - agr_func_tab;
443       lex_get ();
444
445       /* Check for leading lparen. */
446       if (!lex_match ('('))
447         {
448           if (func_index == N)
449             func_index = N_NO_VARS;
450           else if (func_index == NU)
451             func_index = NU_NO_VARS;
452           else
453             {
454               lex_error (_("expecting `('"));
455               goto error;
456             }
457         }
458       else
459         {
460           /* Parse list of source variables. */
461           {
462             int pv_opts = PV_NO_SCRATCH;
463
464             if (func_index == SUM || func_index == MEAN || func_index == SD)
465               pv_opts |= PV_NUMERIC;
466             else if (function->n_args)
467               pv_opts |= PV_SAME_TYPE;
468
469             if (!parse_variables (dict, &src, &n_src, pv_opts))
470               goto error;
471           }
472
473           /* Parse function arguments, for those functions that
474              require arguments. */
475           if (function->n_args != 0)
476             for (i = 0; i < function->n_args; i++)
477               {
478                 int type;
479             
480                 lex_match (',');
481                 if (token == T_STRING)
482                   {
483                     arg[i].c = ds_xstrdup (&tokstr);
484                     type = ALPHA;
485                   }
486                 else if (lex_is_number ())
487                   {
488                     arg[i].f = tokval;
489                     type = NUMERIC;
490                   } else {
491                     msg (SE, _("Missing argument %d to %s."), i + 1,
492                          function->name);
493                     goto error;
494                   }
495             
496                 lex_get ();
497
498                 if (type != src[0]->type)
499                   {
500                     msg (SE, _("Arguments to %s must be of same type as "
501                                "source variables."),
502                          function->name);
503                     goto error;
504                   }
505               }
506
507           /* Trailing rparen. */
508           if (!lex_match(')'))
509             {
510               lex_error (_("expecting `)'"));
511               goto error;
512             }
513           
514           /* Now check that the number of source variables match
515              the number of target variables.  If we check earlier
516              than this, the user can get very misleading error
517              message, i.e. `AGGREGATE x=SUM(y t).' will get this
518              error message when a proper message would be more
519              like `unknown variable t'. */
520           if (n_src != n_dest)
521             {
522               msg (SE, _("Number of source variables (%u) does not match "
523                          "number of target variables (%u)."),
524                    (unsigned) n_src, (unsigned) n_dest);
525               goto error;
526             }
527
528           if ((func_index == PIN || func_index == POUT
529               || func_index == FIN || func_index == FOUT) 
530               && ((src[0]->type == NUMERIC && arg[0].f > arg[1].f)
531                   || (src[0]->type == ALPHA
532                       && str_compare_rpad (arg[0].c, arg[1].c) > 0)))
533             {
534               union agr_argument t = arg[0];
535               arg[0] = arg[1];
536               arg[1] = t;
537                   
538               msg (SW, _("The value arguments passed to the %s function "
539                          "are out-of-order.  They will be treated as if "
540                          "they had been specified in the correct order."),
541                    function->name);
542             }
543         }
544         
545       /* Finally add these to the linked list of aggregation
546          variables. */
547       for (i = 0; i < n_dest; i++)
548         {
549           struct agr_var *v = xmalloc (sizeof *v);
550
551           /* Add variable to chain. */
552           if (agr->agr_vars != NULL)
553             tail->next = v;
554           else
555             agr->agr_vars = v;
556           tail = v;
557           tail->next = NULL;
558           v->moments = NULL;
559           
560           /* Create the target variable in the aggregate
561              dictionary. */
562           {
563             struct variable *destvar;
564             
565             v->function = func_index;
566
567             if (src)
568               {
569                 v->src = src[i];
570                 
571                 if (src[i]->type == ALPHA)
572                   {
573                     v->function |= FSTRING;
574                     v->string = xmalloc (src[i]->width);
575                   }
576
577                 if (function->alpha_type == ALPHA)
578                   destvar = dict_clone_var (agr->dict, v->src, dest[i]);
579                 else
580                   {
581                     assert (v->src->type == NUMERIC
582                             || function->alpha_type == NUMERIC);
583                     destvar = dict_create_var (agr->dict, dest[i], 0);
584                     if (destvar != NULL) 
585                       {
586                         if ((func_index == N || func_index == NMISS)
587                             && dict_get_weight (dict) != NULL)
588                           destvar->print = destvar->write = f8_2; 
589                         else
590                           destvar->print = destvar->write = function->format;
591                       }
592                   }
593               } else {
594                 v->src = NULL;
595                 destvar = dict_create_var (agr->dict, dest[i], 0);
596                 if (func_index == N_NO_VARS
597                     && dict_get_weight (dict) != NULL)
598                   destvar->print = destvar->write = f8_2; 
599                 else
600                   destvar->print = destvar->write = function->format;
601               }
602           
603             if (!destvar)
604               {
605                 msg (SE, _("Variable name %s is not unique within the "
606                            "aggregate file dictionary, which contains "
607                            "the aggregate variables and the break "
608                            "variables."),
609                      dest[i]);
610                 goto error;
611               }
612
613             free (dest[i]);
614             if (dest_label[i])
615               {
616                 destvar->label = dest_label[i];
617                 dest_label[i] = NULL;
618               }
619
620             v->dest = destvar;
621           }
622           
623           v->include_missing = include_missing;
624
625           if (v->src != NULL)
626             {
627               int j;
628
629               if (v->src->type == NUMERIC)
630                 for (j = 0; j < function->n_args; j++)
631                   v->arg[j].f = arg[j].f;
632               else
633                 for (j = 0; j < function->n_args; j++)
634                   v->arg[j].c = xstrdup (arg[j].c);
635             }
636         }
637       
638       if (src != NULL && src[0]->type == ALPHA)
639         for (i = 0; i < function->n_args; i++)
640           {
641             free (arg[i].c);
642             arg[i].c = NULL;
643           }
644
645       free (src);
646       free (dest);
647       free (dest_label);
648
649       if (!lex_match ('/'))
650         {
651           if (token == '.')
652             return true;
653
654           lex_error ("expecting end of command");
655           return false;
656         }
657       continue;
658       
659     error:
660       for (i = 0; i < n_dest; i++)
661         {
662           free (dest[i]);
663           free (dest_label[i]);
664         }
665       free (dest);
666       free (dest_label);
667       free (arg[0].c);
668       free (arg[1].c);
669       if (src && n_src && src[0]->type == ALPHA)
670         for (i = 0; i < function->n_args; i++)
671           {
672             free (arg[i].c);
673             arg[i].c = NULL;
674           }
675       free (src);
676         
677       return false;
678     }
679 }
680
681 /* Destroys AGR. */
682 static void
683 agr_destroy (struct agr_proc *agr)
684 {
685   struct agr_var *iter, *next;
686
687   any_writer_close (agr->writer);
688   if (agr->sort != NULL)
689     sort_destroy_criteria (agr->sort);
690   free (agr->break_vars);
691   case_destroy (&agr->break_case);
692   for (iter = agr->agr_vars; iter; iter = next)
693     {
694       next = iter->next;
695
696       if (iter->function & FSTRING)
697         {
698           size_t n_args;
699           size_t i;
700
701           n_args = agr_func_tab[iter->function & FUNC].n_args;
702           for (i = 0; i < n_args; i++)
703             free (iter->arg[i].c);
704           free (iter->string);
705         }
706       else if (iter->function == SD)
707         moments1_destroy (iter->moments);
708       free (iter);
709     }
710   if (agr->dict != NULL)
711     dict_destroy (agr->dict);
712
713   case_destroy (&agr->agr_case);
714 }
715 \f
716 /* Execution. */
717
718 static void accumulate_aggregate_info (struct agr_proc *,
719                                        const struct ccase *);
720 static void dump_aggregate_info (struct agr_proc *, struct ccase *);
721
722 /* Processes a single case INPUT for aggregation.  If output is
723    warranted, writes it to OUTPUT and returns true.
724    Otherwise, returns false and OUTPUT is unmodified. */
725 static bool
726 aggregate_single_case (struct agr_proc *agr,
727                        const struct ccase *input, struct ccase *output)
728 {
729   bool finished_group = false;
730   
731   if (agr->case_cnt++ == 0)
732     initialize_aggregate_info (agr, input);
733   else if (case_compare (&agr->break_case, input,
734                          agr->break_vars, agr->break_var_cnt))
735     {
736       dump_aggregate_info (agr, output);
737       finished_group = true;
738
739       initialize_aggregate_info (agr, input);
740     }
741
742   accumulate_aggregate_info (agr, input);
743   return finished_group;
744 }
745
746 /* Accumulates aggregation data from the case INPUT. */
747 static void 
748 accumulate_aggregate_info (struct agr_proc *agr,
749                            const struct ccase *input)
750 {
751   struct agr_var *iter;
752   double weight;
753   bool bad_warn = true;
754
755   weight = dict_get_case_weight (agr->src_dict, input, &bad_warn);
756
757   for (iter = agr->agr_vars; iter; iter = iter->next)
758     if (iter->src)
759       {
760         const union value *v = case_data (input, iter->src->fv);
761
762         if ((!iter->include_missing
763              && mv_is_value_missing (&iter->src->miss, v))
764             || (iter->include_missing && iter->src->type == NUMERIC
765                 && v->f == SYSMIS))
766           {
767             switch (iter->function)
768               {
769               case NMISS:
770               case NMISS | FSTRING:
771                 iter->dbl[0] += weight;
772                 break;
773               case NUMISS:
774               case NUMISS | FSTRING:
775                 iter->int1++;
776                 break;
777               }
778             iter->missing = 1;
779             continue;
780           }
781         
782         /* This is horrible.  There are too many possibilities. */
783         switch (iter->function)
784           {
785           case SUM:
786             iter->dbl[0] += v->f * weight;
787             iter->int1 = 1;
788             break;
789           case MEAN:
790             iter->dbl[0] += v->f * weight;
791             iter->dbl[1] += weight;
792             break;
793           case SD:
794             moments1_add (iter->moments, v->f, weight);
795             break;
796           case MAX:
797             iter->dbl[0] = max (iter->dbl[0], v->f);
798             iter->int1 = 1;
799             break;
800           case MAX | FSTRING:
801             if (memcmp (iter->string, v->s, iter->src->width) < 0)
802               memcpy (iter->string, v->s, iter->src->width);
803             iter->int1 = 1;
804             break;
805           case MIN:
806             iter->dbl[0] = min (iter->dbl[0], v->f);
807             iter->int1 = 1;
808             break;
809           case MIN | FSTRING:
810             if (memcmp (iter->string, v->s, iter->src->width) > 0)
811               memcpy (iter->string, v->s, iter->src->width);
812             iter->int1 = 1;
813             break;
814           case FGT:
815           case PGT:
816             if (v->f > iter->arg[0].f)
817               iter->dbl[0] += weight;
818             iter->dbl[1] += weight;
819             break;
820           case FGT | FSTRING:
821           case PGT | FSTRING:
822             if (memcmp (iter->arg[0].c, v->s, iter->src->width) < 0)
823               iter->dbl[0] += weight;
824             iter->dbl[1] += weight;
825             break;
826           case FLT:
827           case PLT:
828             if (v->f < iter->arg[0].f)
829               iter->dbl[0] += weight;
830             iter->dbl[1] += weight;
831             break;
832           case FLT | FSTRING:
833           case PLT | FSTRING:
834             if (memcmp (iter->arg[0].c, v->s, iter->src->width) > 0)
835               iter->dbl[0] += weight;
836             iter->dbl[1] += weight;
837             break;
838           case FIN:
839           case PIN:
840             if (iter->arg[0].f <= v->f && v->f <= iter->arg[1].f)
841               iter->dbl[0] += weight;
842             iter->dbl[1] += weight;
843             break;
844           case FIN | FSTRING:
845           case PIN | FSTRING:
846             if (memcmp (iter->arg[0].c, v->s, iter->src->width) <= 0
847                 && memcmp (iter->arg[1].c, v->s, iter->src->width) >= 0)
848               iter->dbl[0] += weight;
849             iter->dbl[1] += weight;
850             break;
851           case FOUT:
852           case POUT:
853             if (iter->arg[0].f > v->f || v->f > iter->arg[1].f)
854               iter->dbl[0] += weight;
855             iter->dbl[1] += weight;
856             break;
857           case FOUT | FSTRING:
858           case POUT | FSTRING:
859             if (memcmp (iter->arg[0].c, v->s, iter->src->width) > 0
860                 || memcmp (iter->arg[1].c, v->s, iter->src->width) < 0)
861               iter->dbl[0] += weight;
862             iter->dbl[1] += weight;
863             break;
864           case N:
865           case N | FSTRING:
866             iter->dbl[0] += weight;
867             break;
868           case NU:
869           case NU | FSTRING:
870             iter->int1++;
871             break;
872           case FIRST:
873             if (iter->int1 == 0)
874               {
875                 iter->dbl[0] = v->f;
876                 iter->int1 = 1;
877               }
878             break;
879           case FIRST | FSTRING:
880             if (iter->int1 == 0)
881               {
882                 memcpy (iter->string, v->s, iter->src->width);
883                 iter->int1 = 1;
884               }
885             break;
886           case LAST:
887             iter->dbl[0] = v->f;
888             iter->int1 = 1;
889             break;
890           case LAST | FSTRING:
891             memcpy (iter->string, v->s, iter->src->width);
892             iter->int1 = 1;
893             break;
894           case NMISS:
895           case NMISS | FSTRING:
896           case NUMISS:
897           case NUMISS | FSTRING:
898             /* Our value is not missing or it would have been
899                caught earlier.  Nothing to do. */
900             break;
901           default:
902             NOT_REACHED ();
903           }
904     } else {
905       switch (iter->function)
906         {
907         case N_NO_VARS:
908           iter->dbl[0] += weight;
909           break;
910         case NU_NO_VARS:
911           iter->int1++;
912           break;
913         default:
914           NOT_REACHED ();
915         }
916     }
917 }
918
919 /* We've come to a record that differs from the previous in one or
920    more of the break variables.  Make an output record from the
921    accumulated statistics in the OUTPUT case. */
922 static void 
923 dump_aggregate_info (struct agr_proc *agr, struct ccase *output)
924 {
925   {
926     int value_idx = 0;
927     int i;
928
929     for (i = 0; i < agr->break_var_cnt; i++) 
930       {
931         struct variable *v = agr->break_vars[i];
932         memcpy (case_data_rw (output, value_idx),
933                 case_data (&agr->break_case, v->fv),
934                 sizeof (union value) * v->nv);
935         value_idx += v->nv; 
936       }
937   }
938   
939   {
940     struct agr_var *i;
941   
942     for (i = agr->agr_vars; i; i = i->next)
943       {
944         union value *v = case_data_rw (output, i->dest->fv);
945
946         if (agr->missing == COLUMNWISE && i->missing != 0
947             && (i->function & FUNC) != N && (i->function & FUNC) != NU
948             && (i->function & FUNC) != NMISS && (i->function & FUNC) != NUMISS)
949           {
950             if (i->dest->type == ALPHA)
951               memset (v->s, ' ', i->dest->width);
952             else
953               v->f = SYSMIS;
954             continue;
955           }
956         
957         switch (i->function)
958           {
959           case SUM:
960             v->f = i->int1 ? i->dbl[0] : SYSMIS;
961             break;
962           case MEAN:
963             v->f = i->dbl[1] != 0.0 ? i->dbl[0] / i->dbl[1] : SYSMIS;
964             break;
965           case SD:
966             {
967               double variance;
968
969               /* FIXME: we should use two passes. */
970               moments1_calculate (i->moments, NULL, NULL, &variance,
971                                  NULL, NULL);
972               if (variance != SYSMIS)
973                 v->f = sqrt (variance);
974               else
975                 v->f = SYSMIS; 
976             }
977             break;
978           case MAX:
979           case MIN:
980             v->f = i->int1 ? i->dbl[0] : SYSMIS;
981             break;
982           case MAX | FSTRING:
983           case MIN | FSTRING:
984             if (i->int1)
985               memcpy (v->s, i->string, i->dest->width);
986             else
987               memset (v->s, ' ', i->dest->width);
988             break;
989           case FGT:
990           case FGT | FSTRING:
991           case FLT:
992           case FLT | FSTRING:
993           case FIN:
994           case FIN | FSTRING:
995           case FOUT:
996           case FOUT | FSTRING:
997             v->f = i->dbl[1] ? i->dbl[0] / i->dbl[1] : SYSMIS;
998             break;
999           case PGT:
1000           case PGT | FSTRING:
1001           case PLT:
1002           case PLT | FSTRING:
1003           case PIN:
1004           case PIN | FSTRING:
1005           case POUT:
1006           case POUT | FSTRING:
1007             v->f = i->dbl[1] ? i->dbl[0] / i->dbl[1] * 100.0 : SYSMIS;
1008             break;
1009           case N:
1010           case N | FSTRING:
1011             v->f = i->dbl[0];
1012             break;
1013           case NU:
1014           case NU | FSTRING:
1015             v->f = i->int1;
1016             break;
1017           case FIRST:
1018           case LAST:
1019             v->f = i->int1 ? i->dbl[0] : SYSMIS;
1020             break;
1021           case FIRST | FSTRING:
1022           case LAST | FSTRING:
1023             if (i->int1)
1024               memcpy (v->s, i->string, i->dest->width);
1025             else
1026               memset (v->s, ' ', i->dest->width);
1027             break;
1028           case N_NO_VARS:
1029             v->f = i->dbl[0];
1030             break;
1031           case NU_NO_VARS:
1032             v->f = i->int1;
1033             break;
1034           case NMISS:
1035           case NMISS | FSTRING:
1036             v->f = i->dbl[0];
1037             break;
1038           case NUMISS:
1039           case NUMISS | FSTRING:
1040             v->f = i->int1;
1041             break;
1042           default:
1043             NOT_REACHED ();
1044           }
1045       }
1046   }
1047 }
1048
1049 /* Resets the state for all the aggregate functions. */
1050 static void
1051 initialize_aggregate_info (struct agr_proc *agr, const struct ccase *input)
1052 {
1053   struct agr_var *iter;
1054
1055   case_destroy (&agr->break_case);
1056   case_clone (&agr->break_case, input);
1057
1058   for (iter = agr->agr_vars; iter; iter = iter->next)
1059     {
1060       iter->missing = 0;
1061       iter->dbl[0] = iter->dbl[1] = iter->dbl[2] = 0.0;
1062       iter->int1 = iter->int2 = 0;
1063       switch (iter->function)
1064         {
1065         case MIN:
1066           iter->dbl[0] = DBL_MAX;
1067           break;
1068         case MIN | FSTRING:
1069           memset (iter->string, 255, iter->src->width);
1070           break;
1071         case MAX:
1072           iter->dbl[0] = -DBL_MAX;
1073           break;
1074         case MAX | FSTRING:
1075           memset (iter->string, 0, iter->src->width);
1076           break;
1077         case SD:
1078           if (iter->moments == NULL)
1079             iter->moments = moments1_create (MOMENT_VARIANCE);
1080           else
1081             moments1_clear (iter->moments);
1082           break;
1083         default:
1084           break;
1085         }
1086     }
1087 }
1088 \f
1089 /* Aggregate each case as it comes through.  Cases which aren't needed
1090    are dropped.
1091    Returns true if successful, false if an I/O error occurred. */
1092 static bool
1093 agr_to_active_file (const struct ccase *c, void *agr_, const struct dataset *ds UNUSED)
1094 {
1095   struct agr_proc *agr = agr_;
1096
1097   if (aggregate_single_case (agr, c, &agr->agr_case)) 
1098     return agr->sink->class->write (agr->sink, &agr->agr_case);
1099
1100   return true;
1101 }
1102
1103 /* Aggregate the current case and output it if we passed a
1104    breakpoint. */
1105 static bool
1106 presorted_agr_to_sysfile (const struct ccase *c, void *agr_, const struct dataset *ds UNUSED) 
1107 {
1108   struct agr_proc *agr = agr_;
1109
1110   if (aggregate_single_case (agr, c, &agr->agr_case)) 
1111     return any_writer_write (agr->writer, &agr->agr_case);
1112   
1113   return true;
1114 }