We've had a mix of min, max from libpspp/misc.h and MIN, MAX from
[pspp-builds.git] / src / language / stats / aggregate.c
1 /* PSPP - computes sample statistics.
2    Copyright (C) 1997-9, 2000, 2006 Free Software Foundation, Inc.
3    Written by Ben Pfaff <blp@gnu.org>.
4
5    This program is free software; you can redistribute it and/or
6    modify it under the terms of the GNU General Public License as
7    published by the Free Software Foundation; either version 2 of the
8    License, or (at your option) any later version.
9
10    This program is distributed in the hope that it will be useful, but
11    WITHOUT ANY WARRANTY; without even the implied warranty of
12    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13    General Public License for more details.
14
15    You should have received a copy of the GNU General Public License
16    along with this program; if not, write to the Free Software
17    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
18    02110-1301, USA. */
19
20 #include <config.h>
21
22 #include <stdlib.h>
23
24 #include <data/any-writer.h>
25 #include <data/case-sink.h>
26 #include <data/case.h>
27 #include <data/casefile.h>
28 #include <data/dictionary.h>
29 #include <data/file-handle-def.h>
30 #include <data/procedure.h>
31 #include <data/settings.h>
32 #include <data/storage-stream.h>
33 #include <data/sys-file-writer.h>
34 #include <data/variable.h>
35 #include <language/command.h>
36 #include <language/data-io/file-handle.h>
37 #include <language/lexer/lexer.h>
38 #include <language/lexer/variable-parser.h>
39 #include <language/stats/sort-criteria.h>
40 #include <libpspp/alloc.h>
41 #include <libpspp/assertion.h>
42 #include <libpspp/message.h>
43 #include <libpspp/misc.h>
44 #include <libpspp/pool.h>
45 #include <libpspp/str.h>
46 #include <math/moments.h>
47 #include <math/sort.h>
48
49 #include "minmax.h"
50
51 #include "gettext.h"
52 #define _(msgid) gettext (msgid)
53
54 /* Argument for AGGREGATE function. */
55 union agr_argument
56   {
57     double f;                           /* Numeric. */
58     char *c;                            /* Short or long string. */
59   };
60
61 /* Specifies how to make an aggregate variable. */
62 struct agr_var
63   {
64     struct agr_var *next;               /* Next in list. */
65
66     /* Collected during parsing. */
67     struct variable *src;       /* Source variable. */
68     struct variable *dest;      /* Target variable. */
69     int function;               /* Function. */
70     int include_missing;        /* 1=Include user-missing values. */
71     union agr_argument arg[2];  /* Arguments. */
72
73     /* Accumulated during AGGREGATE execution. */
74     double dbl[3];
75     int int1, int2;
76     char *string;
77     int missing;
78     struct moments1 *moments;
79   };
80
81 /* Aggregation functions. */
82 enum
83   {
84     NONE, SUM, MEAN, SD, MAX, MIN, PGT, PLT, PIN, POUT, FGT, FLT, FIN,
85     FOUT, N, NU, NMISS, NUMISS, FIRST, LAST,
86     N_AGR_FUNCS, N_NO_VARS, NU_NO_VARS,
87     FUNC = 0x1f, /* Function mask. */
88     FSTRING = 1<<5, /* String function bit. */
89   };
90
91 /* Attributes of an aggregation function. */
92 struct agr_func
93   {
94     const char *name;           /* Aggregation function name. */
95     size_t n_args;              /* Number of arguments. */
96     int alpha_type;             /* When given ALPHA arguments, output type. */
97     struct fmt_spec format;     /* Format spec if alpha_type != ALPHA. */
98   };
99
100 /* Attributes of aggregation functions. */
101 static const struct agr_func agr_func_tab[] = 
102   {
103     {"<NONE>",  0, -1,      {0, 0, 0}},
104     {"SUM",     0, -1,      {FMT_F, 8, 2}},
105     {"MEAN",    0, -1,      {FMT_F, 8, 2}},
106     {"SD",      0, -1,      {FMT_F, 8, 2}},
107     {"MAX",     0, ALPHA,   {-1, -1, -1}}, 
108     {"MIN",     0, ALPHA,   {-1, -1, -1}}, 
109     {"PGT",     1, NUMERIC, {FMT_F, 5, 1}},      
110     {"PLT",     1, NUMERIC, {FMT_F, 5, 1}},       
111     {"PIN",     2, NUMERIC, {FMT_F, 5, 1}},       
112     {"POUT",    2, NUMERIC, {FMT_F, 5, 1}},       
113     {"FGT",     1, NUMERIC, {FMT_F, 5, 3}},       
114     {"FLT",     1, NUMERIC, {FMT_F, 5, 3}},       
115     {"FIN",     2, NUMERIC, {FMT_F, 5, 3}},       
116     {"FOUT",    2, NUMERIC, {FMT_F, 5, 3}},       
117     {"N",       0, NUMERIC, {FMT_F, 7, 0}},       
118     {"NU",      0, NUMERIC, {FMT_F, 7, 0}},       
119     {"NMISS",   0, NUMERIC, {FMT_F, 7, 0}},       
120     {"NUMISS",  0, NUMERIC, {FMT_F, 7, 0}},       
121     {"FIRST",   0, ALPHA,   {-1, -1, -1}}, 
122     {"LAST",    0, ALPHA,   {-1, -1, -1}},
123     {NULL,      0, -1,      {-1, -1, -1}},
124     {"N",       0, NUMERIC, {FMT_F, 7, 0}},
125     {"NU",      0, NUMERIC, {FMT_F, 7, 0}},
126   };
127
128 /* Missing value types. */
129 enum missing_treatment
130   {
131     ITEMWISE,           /* Missing values item by item. */
132     COLUMNWISE          /* Missing values column by column. */
133   };
134
135 /* An entire AGGREGATE procedure. */
136 struct agr_proc 
137   {
138     /* We have either an output file or a sink. */
139     struct any_writer *writer;          /* Output file, or null if none. */
140     struct case_sink *sink;             /* Sink, or null if none. */
141
142     /* Break variables. */
143     struct sort_criteria *sort;         /* Sort criteria. */
144     struct variable **break_vars;       /* Break variables. */
145     size_t break_var_cnt;               /* Number of break variables. */
146     struct ccase break_case;            /* Last values of break variables. */
147
148     enum missing_treatment missing;     /* How to treat missing values. */
149     struct agr_var *agr_vars;           /* First aggregate variable. */
150     struct dictionary *dict;            /* Aggregate dictionary. */
151     const struct dictionary *src_dict;  /* Dict of the source */
152     int case_cnt;                       /* Counts aggregated cases. */
153     struct ccase agr_case;              /* Aggregate case for output. */
154   };
155
156 static void initialize_aggregate_info (struct agr_proc *,
157                                        const struct ccase *);
158
159 /* Prototypes. */
160 static bool parse_aggregate_functions (struct lexer *, const struct dictionary *,
161                                        struct agr_proc *);
162 static void agr_destroy (struct agr_proc *);
163 static bool aggregate_single_case (struct agr_proc *agr,
164                                    const struct ccase *input,
165                                    struct ccase *output);
166 static void dump_aggregate_info (struct agr_proc *agr, struct ccase *output);
167
168 /* Aggregating to the active file. */
169 static bool agr_to_active_file (const struct ccase *, void *aux, const struct dataset *);
170
171 /* Aggregating to a system file. */
172 static bool presorted_agr_to_sysfile (const struct ccase *, void *aux, const struct dataset *);
173 \f
174 /* Parsing. */
175
176 /* Parses and executes the AGGREGATE procedure. */
177 int
178 cmd_aggregate (struct lexer *lexer, struct dataset *ds)
179 {
180   struct dictionary *dict = dataset_dict (ds);
181   struct agr_proc agr;
182   struct file_handle *out_file = NULL;
183
184   bool copy_documents = false;
185   bool presorted = false;
186   bool saw_direction;
187
188   memset(&agr, 0 , sizeof (agr));
189   agr.missing = ITEMWISE;
190   case_nullify (&agr.break_case);
191   
192   agr.dict = dict_create ();
193   agr.src_dict = dict;
194   dict_set_label (agr.dict, dict_get_label (dict));
195   dict_set_documents (agr.dict, dict_get_documents (dict));
196
197   /* OUTFILE subcommand must be first. */
198   if (!lex_force_match_id (lexer, "OUTFILE"))
199     goto error;
200   lex_match (lexer, '=');
201   if (!lex_match (lexer, '*'))
202     {
203       out_file = fh_parse (lexer, FH_REF_FILE | FH_REF_SCRATCH);
204       if (out_file == NULL)
205         goto error;
206     }
207   
208   /* Read most of the subcommands. */
209   for (;;)
210     {
211       lex_match (lexer, '/');
212       
213       if (lex_match_id (lexer, "MISSING"))
214         {
215           lex_match (lexer, '=');
216           if (!lex_match_id (lexer, "COLUMNWISE"))
217             {
218               lex_error (lexer, _("while expecting COLUMNWISE"));
219               goto error;
220             }
221           agr.missing = COLUMNWISE;
222         }
223       else if (lex_match_id (lexer, "DOCUMENT"))
224         copy_documents = true;
225       else if (lex_match_id (lexer, "PRESORTED"))
226         presorted = true;
227       else if (lex_match_id (lexer, "BREAK"))
228         {
229           int i;
230
231           lex_match (lexer, '=');
232           agr.sort = sort_parse_criteria (lexer, dict,
233                                           &agr.break_vars, &agr.break_var_cnt,
234                                           &saw_direction, NULL);
235           if (agr.sort == NULL)
236             goto error;
237           
238           for (i = 0; i < agr.break_var_cnt; i++)
239             dict_clone_var_assert (agr.dict, agr.break_vars[i],
240                                    agr.break_vars[i]->name);
241
242           /* BREAK must follow the options. */
243           break;
244         }
245       else
246         {
247           lex_error (lexer, _("expecting BREAK"));
248           goto error;
249         }
250     }
251   if (presorted && saw_direction)
252     msg (SW, _("When PRESORTED is specified, specifying sorting directions "
253                "with (A) or (D) has no effect.  Output data will be sorted "
254                "the same way as the input data."));
255       
256   /* Read in the aggregate functions. */
257   lex_match (lexer, '/');
258   if (!parse_aggregate_functions (lexer, dict, &agr))
259     goto error;
260
261   /* Delete documents. */
262   if (!copy_documents)
263     dict_set_documents (agr.dict, NULL);
264
265   /* Cancel SPLIT FILE. */
266   dict_set_split_vars (agr.dict, NULL, 0);
267   
268   /* Initialize. */
269   agr.case_cnt = 0;
270   case_create (&agr.agr_case, dict_get_next_value_idx (agr.dict));
271
272   /* Output to active file or external file? */
273   if (out_file == NULL) 
274     {
275       /* The active file will be replaced by the aggregated data,
276          so TEMPORARY is moot. */
277       proc_cancel_temporary_transformations (ds);
278
279       if (agr.sort != NULL && !presorted) 
280         {
281           if (!sort_active_file_in_place (ds, agr.sort))
282             goto error;
283         }
284
285       agr.sink = create_case_sink (&storage_sink_class, agr.dict, NULL);
286       if (agr.sink->class->open != NULL)
287         agr.sink->class->open (agr.sink);
288       proc_set_sink (ds, 
289                      create_case_sink (&null_sink_class, 
290                                        dict, NULL));
291       if (!procedure (ds, agr_to_active_file, &agr))
292         goto error;
293       if (agr.case_cnt > 0) 
294         {
295           dump_aggregate_info (&agr, &agr.agr_case);
296           if (!agr.sink->class->write (agr.sink, &agr.agr_case))
297             goto error;
298         }
299       discard_variables (ds);
300       dict_destroy (dict);
301       dataset_set_dict (ds, agr.dict);
302       agr.dict = NULL;
303       proc_set_source (ds, 
304                        agr.sink->class->make_source (agr.sink));
305       free_case_sink (agr.sink);
306     }
307   else
308     {
309       agr.writer = any_writer_open (out_file, agr.dict);
310       if (agr.writer == NULL)
311         goto error;
312       
313       if (agr.sort != NULL && !presorted) 
314         {
315           /* Sorting is needed. */
316           struct casefile *dst;
317           struct casereader *reader;
318           struct ccase c;
319           bool ok = true;
320           
321           dst = sort_active_file_to_casefile (ds, agr.sort);
322           if (dst == NULL)
323             goto error;
324           reader = casefile_get_destructive_reader (dst);
325           while (ok && casereader_read_xfer (reader, &c)) 
326             {
327               if (aggregate_single_case (&agr, &c, &agr.agr_case)) 
328                 ok = any_writer_write (agr.writer, &agr.agr_case);
329               case_destroy (&c);
330             }
331           casereader_destroy (reader);
332           if (ok)
333             ok = !casefile_error (dst);
334           casefile_destroy (dst);
335           if (!ok)
336             goto error;
337         }
338       else 
339         {
340           /* Active file is already sorted. */
341           if (!procedure (ds, presorted_agr_to_sysfile, &agr))
342             goto error;
343         }
344       
345       if (agr.case_cnt > 0) 
346         {
347           dump_aggregate_info (&agr, &agr.agr_case);
348           any_writer_write (agr.writer, &agr.agr_case);
349         }
350       if (any_writer_error (agr.writer))
351         goto error;
352     }
353   
354   agr_destroy (&agr);
355   return CMD_SUCCESS;
356
357 error:
358   agr_destroy (&agr);
359   return CMD_CASCADING_FAILURE;
360 }
361
362 /* Parse all the aggregate functions. */
363 static bool
364 parse_aggregate_functions (struct lexer *lexer, const struct dictionary *dict, struct agr_proc *agr)
365 {
366   struct agr_var *tail; /* Tail of linked list starting at agr->vars. */
367
368   /* Parse everything. */
369   tail = NULL;
370   for (;;)
371     {
372       char **dest;
373       char **dest_label;
374       size_t n_dest;
375       struct string function_name;
376
377       int include_missing;
378       const struct agr_func *function;
379       int func_index;
380
381       union agr_argument arg[2];
382
383       struct variable **src;
384       size_t n_src;
385
386       size_t i;
387
388       dest = NULL;
389       dest_label = NULL;
390       n_dest = 0;
391       src = NULL;
392       function = NULL;
393       n_src = 0;
394       arg[0].c = NULL;
395       arg[1].c = NULL;
396
397       /* Parse the list of target variables. */
398       while (!lex_match (lexer, '='))
399         {
400           size_t n_dest_prev = n_dest;
401           
402           if (!parse_DATA_LIST_vars (lexer, &dest, &n_dest,
403                                      PV_APPEND | PV_SINGLE | PV_NO_SCRATCH))
404             goto error;
405
406           /* Assign empty labels. */
407           {
408             int j;
409
410             dest_label = xnrealloc (dest_label, n_dest, sizeof *dest_label);
411             for (j = n_dest_prev; j < n_dest; j++)
412               dest_label[j] = NULL;
413           }
414
415
416           
417           if (lex_token (lexer) == T_STRING)
418             {
419               struct string label;
420               ds_init_string (&label, lex_tokstr (lexer));
421
422               ds_truncate (&label, 255);
423               dest_label[n_dest - 1] = ds_xstrdup (&label);
424               lex_get (lexer);
425               ds_destroy (&label);
426             }
427         }
428
429       /* Get the name of the aggregation function. */
430       if (lex_token (lexer) != T_ID)
431         {
432           lex_error (lexer, _("expecting aggregation function"));
433           goto error;
434         }
435
436       include_missing = 0;
437
438       ds_init_string (&function_name, lex_tokstr (lexer));
439
440       ds_chomp (&function_name, '.');
441
442       if (lex_tokid(lexer)[strlen (lex_tokid (lexer)) - 1] == '.')
443           include_missing = 1;
444
445       for (function = agr_func_tab; function->name; function++)
446         if (!strcasecmp (function->name, ds_cstr (&function_name)))
447           break;
448       if (NULL == function->name)
449         {
450           msg (SE, _("Unknown aggregation function %s."), 
451                ds_cstr (&function_name));
452           goto error;
453         }
454       ds_destroy (&function_name);
455       func_index = function - agr_func_tab;
456       lex_get (lexer);
457
458       /* Check for leading lparen. */
459       if (!lex_match (lexer, '('))
460         {
461           if (func_index == N)
462             func_index = N_NO_VARS;
463           else if (func_index == NU)
464             func_index = NU_NO_VARS;
465           else
466             {
467               lex_error (lexer, _("expecting `('"));
468               goto error;
469             }
470         }
471       else
472         {
473           /* Parse list of source variables. */
474           {
475             int pv_opts = PV_NO_SCRATCH;
476
477             if (func_index == SUM || func_index == MEAN || func_index == SD)
478               pv_opts |= PV_NUMERIC;
479             else if (function->n_args)
480               pv_opts |= PV_SAME_TYPE;
481
482             if (!parse_variables (lexer, dict, &src, &n_src, pv_opts))
483               goto error;
484           }
485
486           /* Parse function arguments, for those functions that
487              require arguments. */
488           if (function->n_args != 0)
489             for (i = 0; i < function->n_args; i++)
490               {
491                 int type;
492             
493                 lex_match (lexer, ',');
494                 if (lex_token (lexer) == T_STRING)
495                   {
496                     arg[i].c = ds_xstrdup (lex_tokstr (lexer));
497                     type = ALPHA;
498                   }
499                 else if (lex_is_number (lexer))
500                   {
501                     arg[i].f = lex_tokval (lexer);
502                     type = NUMERIC;
503                   } else {
504                     msg (SE, _("Missing argument %d to %s."), i + 1,
505                          function->name);
506                     goto error;
507                   }
508             
509                 lex_get (lexer);
510
511                 if (type != src[0]->type)
512                   {
513                     msg (SE, _("Arguments to %s must be of same type as "
514                                "source variables."),
515                          function->name);
516                     goto error;
517                   }
518               }
519
520           /* Trailing rparen. */
521           if (!lex_match (lexer, ')'))
522             {
523               lex_error (lexer, _("expecting `)'"));
524               goto error;
525             }
526           
527           /* Now check that the number of source variables match
528              the number of target variables.  If we check earlier
529              than this, the user can get very misleading error
530              message, i.e. `AGGREGATE x=SUM(y t).' will get this
531              error message when a proper message would be more
532              like `unknown variable t'. */
533           if (n_src != n_dest)
534             {
535               msg (SE, _("Number of source variables (%u) does not match "
536                          "number of target variables (%u)."),
537                    (unsigned) n_src, (unsigned) n_dest);
538               goto error;
539             }
540
541           if ((func_index == PIN || func_index == POUT
542               || func_index == FIN || func_index == FOUT) 
543               && ((src[0]->type == NUMERIC && arg[0].f > arg[1].f)
544                   || (src[0]->type == ALPHA
545                       && str_compare_rpad (arg[0].c, arg[1].c) > 0)))
546             {
547               union agr_argument t = arg[0];
548               arg[0] = arg[1];
549               arg[1] = t;
550                   
551               msg (SW, _("The value arguments passed to the %s function "
552                          "are out-of-order.  They will be treated as if "
553                          "they had been specified in the correct order."),
554                    function->name);
555             }
556         }
557         
558       /* Finally add these to the linked list of aggregation
559          variables. */
560       for (i = 0; i < n_dest; i++)
561         {
562           struct agr_var *v = xmalloc (sizeof *v);
563
564           /* Add variable to chain. */
565           if (agr->agr_vars != NULL)
566             tail->next = v;
567           else
568             agr->agr_vars = v;
569           tail = v;
570           tail->next = NULL;
571           v->moments = NULL;
572           
573           /* Create the target variable in the aggregate
574              dictionary. */
575           {
576             struct variable *destvar;
577             
578             v->function = func_index;
579
580             if (src)
581               {
582                 v->src = src[i];
583                 
584                 if (src[i]->type == ALPHA)
585                   {
586                     v->function |= FSTRING;
587                     v->string = xmalloc (src[i]->width);
588                   }
589
590                 if (function->alpha_type == ALPHA)
591                   destvar = dict_clone_var (agr->dict, v->src, dest[i]);
592                 else
593                   {
594                     assert (v->src->type == NUMERIC
595                             || function->alpha_type == NUMERIC);
596                     destvar = dict_create_var (agr->dict, dest[i], 0);
597                     if (destvar != NULL) 
598                       {
599                         struct fmt_spec f;
600                         if ((func_index == N || func_index == NMISS)
601                             && dict_get_weight (dict) != NULL)
602                           f = fmt_for_output (FMT_F, 8, 2); 
603                         else
604                           f = function->format;
605                         destvar->print = destvar->write = f;
606                       }
607                   }
608               } else {
609                 struct fmt_spec f;
610                 v->src = NULL;
611                 destvar = dict_create_var (agr->dict, dest[i], 0);
612                 if (func_index == N_NO_VARS && dict_get_weight (dict) != NULL)
613                   f = fmt_for_output (FMT_F, 8, 2); 
614                 else
615                   f = function->format;
616                 destvar->print = destvar->write = f;
617               }
618           
619             if (!destvar)
620               {
621                 msg (SE, _("Variable name %s is not unique within the "
622                            "aggregate file dictionary, which contains "
623                            "the aggregate variables and the break "
624                            "variables."),
625                      dest[i]);
626                 goto error;
627               }
628
629             free (dest[i]);
630             if (dest_label[i])
631               {
632                 destvar->label = dest_label[i];
633                 dest_label[i] = NULL;
634               }
635
636             v->dest = destvar;
637           }
638           
639           v->include_missing = include_missing;
640
641           if (v->src != NULL)
642             {
643               int j;
644
645               if (v->src->type == NUMERIC)
646                 for (j = 0; j < function->n_args; j++)
647                   v->arg[j].f = arg[j].f;
648               else
649                 for (j = 0; j < function->n_args; j++)
650                   v->arg[j].c = xstrdup (arg[j].c);
651             }
652         }
653       
654       if (src != NULL && src[0]->type == ALPHA)
655         for (i = 0; i < function->n_args; i++)
656           {
657             free (arg[i].c);
658             arg[i].c = NULL;
659           }
660
661       free (src);
662       free (dest);
663       free (dest_label);
664
665       if (!lex_match (lexer, '/'))
666         {
667           if (lex_token (lexer) == '.')
668             return true;
669
670           lex_error (lexer, "expecting end of command");
671           return false;
672         }
673       continue;
674       
675     error:
676       ds_destroy (&function_name);
677       for (i = 0; i < n_dest; i++)
678         {
679           free (dest[i]);
680           free (dest_label[i]);
681         }
682       free (dest);
683       free (dest_label);
684       free (arg[0].c);
685       free (arg[1].c);
686       if (src && n_src && src[0]->type == ALPHA)
687         for (i = 0; i < function->n_args; i++)
688           {
689             free (arg[i].c);
690             arg[i].c = NULL;
691           }
692       free (src);
693         
694       return false;
695     }
696 }
697
698 /* Destroys AGR. */
699 static void
700 agr_destroy (struct agr_proc *agr)
701 {
702   struct agr_var *iter, *next;
703
704   any_writer_close (agr->writer);
705   if (agr->sort != NULL)
706     sort_destroy_criteria (agr->sort);
707   free (agr->break_vars);
708   case_destroy (&agr->break_case);
709   for (iter = agr->agr_vars; iter; iter = next)
710     {
711       next = iter->next;
712
713       if (iter->function & FSTRING)
714         {
715           size_t n_args;
716           size_t i;
717
718           n_args = agr_func_tab[iter->function & FUNC].n_args;
719           for (i = 0; i < n_args; i++)
720             free (iter->arg[i].c);
721           free (iter->string);
722         }
723       else if (iter->function == SD)
724         moments1_destroy (iter->moments);
725       free (iter);
726     }
727   if (agr->dict != NULL)
728     dict_destroy (agr->dict);
729
730   case_destroy (&agr->agr_case);
731 }
732 \f
733 /* Execution. */
734
735 static void accumulate_aggregate_info (struct agr_proc *,
736                                        const struct ccase *);
737 static void dump_aggregate_info (struct agr_proc *, struct ccase *);
738
739 /* Processes a single case INPUT for aggregation.  If output is
740    warranted, writes it to OUTPUT and returns true.
741    Otherwise, returns false and OUTPUT is unmodified. */
742 static bool
743 aggregate_single_case (struct agr_proc *agr,
744                        const struct ccase *input, struct ccase *output)
745 {
746   bool finished_group = false;
747   
748   if (agr->case_cnt++ == 0)
749     initialize_aggregate_info (agr, input);
750   else if (case_compare (&agr->break_case, input,
751                          agr->break_vars, agr->break_var_cnt))
752     {
753       dump_aggregate_info (agr, output);
754       finished_group = true;
755
756       initialize_aggregate_info (agr, input);
757     }
758
759   accumulate_aggregate_info (agr, input);
760   return finished_group;
761 }
762
763 /* Accumulates aggregation data from the case INPUT. */
764 static void 
765 accumulate_aggregate_info (struct agr_proc *agr,
766                            const struct ccase *input)
767 {
768   struct agr_var *iter;
769   double weight;
770   bool bad_warn = true;
771
772   weight = dict_get_case_weight (agr->src_dict, input, &bad_warn);
773
774   for (iter = agr->agr_vars; iter; iter = iter->next)
775     if (iter->src)
776       {
777         const union value *v = case_data (input, iter->src->fv);
778
779         if ((!iter->include_missing
780              && mv_is_value_missing (&iter->src->miss, v))
781             || (iter->include_missing && iter->src->type == NUMERIC
782                 && v->f == SYSMIS))
783           {
784             switch (iter->function)
785               {
786               case NMISS:
787               case NMISS | FSTRING:
788                 iter->dbl[0] += weight;
789                 break;
790               case NUMISS:
791               case NUMISS | FSTRING:
792                 iter->int1++;
793                 break;
794               }
795             iter->missing = 1;
796             continue;
797           }
798         
799         /* This is horrible.  There are too many possibilities. */
800         switch (iter->function)
801           {
802           case SUM:
803             iter->dbl[0] += v->f * weight;
804             iter->int1 = 1;
805             break;
806           case MEAN:
807             iter->dbl[0] += v->f * weight;
808             iter->dbl[1] += weight;
809             break;
810           case SD:
811             moments1_add (iter->moments, v->f, weight);
812             break;
813           case MAX:
814             iter->dbl[0] = MAX (iter->dbl[0], v->f);
815             iter->int1 = 1;
816             break;
817           case MAX | FSTRING:
818             if (memcmp (iter->string, v->s, iter->src->width) < 0)
819               memcpy (iter->string, v->s, iter->src->width);
820             iter->int1 = 1;
821             break;
822           case MIN:
823             iter->dbl[0] = MIN (iter->dbl[0], v->f);
824             iter->int1 = 1;
825             break;
826           case MIN | FSTRING:
827             if (memcmp (iter->string, v->s, iter->src->width) > 0)
828               memcpy (iter->string, v->s, iter->src->width);
829             iter->int1 = 1;
830             break;
831           case FGT:
832           case PGT:
833             if (v->f > iter->arg[0].f)
834               iter->dbl[0] += weight;
835             iter->dbl[1] += weight;
836             break;
837           case FGT | FSTRING:
838           case PGT | FSTRING:
839             if (memcmp (iter->arg[0].c, v->s, iter->src->width) < 0)
840               iter->dbl[0] += weight;
841             iter->dbl[1] += weight;
842             break;
843           case FLT:
844           case PLT:
845             if (v->f < iter->arg[0].f)
846               iter->dbl[0] += weight;
847             iter->dbl[1] += weight;
848             break;
849           case FLT | FSTRING:
850           case PLT | FSTRING:
851             if (memcmp (iter->arg[0].c, v->s, iter->src->width) > 0)
852               iter->dbl[0] += weight;
853             iter->dbl[1] += weight;
854             break;
855           case FIN:
856           case PIN:
857             if (iter->arg[0].f <= v->f && v->f <= iter->arg[1].f)
858               iter->dbl[0] += weight;
859             iter->dbl[1] += weight;
860             break;
861           case FIN | FSTRING:
862           case PIN | FSTRING:
863             if (memcmp (iter->arg[0].c, v->s, iter->src->width) <= 0
864                 && memcmp (iter->arg[1].c, v->s, iter->src->width) >= 0)
865               iter->dbl[0] += weight;
866             iter->dbl[1] += weight;
867             break;
868           case FOUT:
869           case POUT:
870             if (iter->arg[0].f > v->f || v->f > iter->arg[1].f)
871               iter->dbl[0] += weight;
872             iter->dbl[1] += weight;
873             break;
874           case FOUT | FSTRING:
875           case POUT | FSTRING:
876             if (memcmp (iter->arg[0].c, v->s, iter->src->width) > 0
877                 || memcmp (iter->arg[1].c, v->s, iter->src->width) < 0)
878               iter->dbl[0] += weight;
879             iter->dbl[1] += weight;
880             break;
881           case N:
882           case N | FSTRING:
883             iter->dbl[0] += weight;
884             break;
885           case NU:
886           case NU | FSTRING:
887             iter->int1++;
888             break;
889           case FIRST:
890             if (iter->int1 == 0)
891               {
892                 iter->dbl[0] = v->f;
893                 iter->int1 = 1;
894               }
895             break;
896           case FIRST | FSTRING:
897             if (iter->int1 == 0)
898               {
899                 memcpy (iter->string, v->s, iter->src->width);
900                 iter->int1 = 1;
901               }
902             break;
903           case LAST:
904             iter->dbl[0] = v->f;
905             iter->int1 = 1;
906             break;
907           case LAST | FSTRING:
908             memcpy (iter->string, v->s, iter->src->width);
909             iter->int1 = 1;
910             break;
911           case NMISS:
912           case NMISS | FSTRING:
913           case NUMISS:
914           case NUMISS | FSTRING:
915             /* Our value is not missing or it would have been
916                caught earlier.  Nothing to do. */
917             break;
918           default:
919             NOT_REACHED ();
920           }
921     } else {
922       switch (iter->function)
923         {
924         case N_NO_VARS:
925           iter->dbl[0] += weight;
926           break;
927         case NU_NO_VARS:
928           iter->int1++;
929           break;
930         default:
931           NOT_REACHED ();
932         }
933     }
934 }
935
936 /* We've come to a record that differs from the previous in one or
937    more of the break variables.  Make an output record from the
938    accumulated statistics in the OUTPUT case. */
939 static void 
940 dump_aggregate_info (struct agr_proc *agr, struct ccase *output)
941 {
942   {
943     int value_idx = 0;
944     int i;
945
946     for (i = 0; i < agr->break_var_cnt; i++) 
947       {
948         struct variable *v = agr->break_vars[i];
949         memcpy (case_data_rw (output, value_idx),
950                 case_data (&agr->break_case, v->fv),
951                 sizeof (union value) * v->nv);
952         value_idx += v->nv; 
953       }
954   }
955   
956   {
957     struct agr_var *i;
958   
959     for (i = agr->agr_vars; i; i = i->next)
960       {
961         union value *v = case_data_rw (output, i->dest->fv);
962
963         if (agr->missing == COLUMNWISE && i->missing != 0
964             && (i->function & FUNC) != N && (i->function & FUNC) != NU
965             && (i->function & FUNC) != NMISS && (i->function & FUNC) != NUMISS)
966           {
967             if (i->dest->type == ALPHA)
968               memset (v->s, ' ', i->dest->width);
969             else
970               v->f = SYSMIS;
971             continue;
972           }
973         
974         switch (i->function)
975           {
976           case SUM:
977             v->f = i->int1 ? i->dbl[0] : SYSMIS;
978             break;
979           case MEAN:
980             v->f = i->dbl[1] != 0.0 ? i->dbl[0] / i->dbl[1] : SYSMIS;
981             break;
982           case SD:
983             {
984               double variance;
985
986               /* FIXME: we should use two passes. */
987               moments1_calculate (i->moments, NULL, NULL, &variance,
988                                  NULL, NULL);
989               if (variance != SYSMIS)
990                 v->f = sqrt (variance);
991               else
992                 v->f = SYSMIS; 
993             }
994             break;
995           case MAX:
996           case MIN:
997             v->f = i->int1 ? i->dbl[0] : SYSMIS;
998             break;
999           case MAX | FSTRING:
1000           case MIN | FSTRING:
1001             if (i->int1)
1002               memcpy (v->s, i->string, i->dest->width);
1003             else
1004               memset (v->s, ' ', i->dest->width);
1005             break;
1006           case FGT:
1007           case FGT | FSTRING:
1008           case FLT:
1009           case FLT | FSTRING:
1010           case FIN:
1011           case FIN | FSTRING:
1012           case FOUT:
1013           case FOUT | FSTRING:
1014             v->f = i->dbl[1] ? i->dbl[0] / i->dbl[1] : SYSMIS;
1015             break;
1016           case PGT:
1017           case PGT | FSTRING:
1018           case PLT:
1019           case PLT | FSTRING:
1020           case PIN:
1021           case PIN | FSTRING:
1022           case POUT:
1023           case POUT | FSTRING:
1024             v->f = i->dbl[1] ? i->dbl[0] / i->dbl[1] * 100.0 : SYSMIS;
1025             break;
1026           case N:
1027           case N | FSTRING:
1028             v->f = i->dbl[0];
1029             break;
1030           case NU:
1031           case NU | FSTRING:
1032             v->f = i->int1;
1033             break;
1034           case FIRST:
1035           case LAST:
1036             v->f = i->int1 ? i->dbl[0] : SYSMIS;
1037             break;
1038           case FIRST | FSTRING:
1039           case LAST | FSTRING:
1040             if (i->int1)
1041               memcpy (v->s, i->string, i->dest->width);
1042             else
1043               memset (v->s, ' ', i->dest->width);
1044             break;
1045           case N_NO_VARS:
1046             v->f = i->dbl[0];
1047             break;
1048           case NU_NO_VARS:
1049             v->f = i->int1;
1050             break;
1051           case NMISS:
1052           case NMISS | FSTRING:
1053             v->f = i->dbl[0];
1054             break;
1055           case NUMISS:
1056           case NUMISS | FSTRING:
1057             v->f = i->int1;
1058             break;
1059           default:
1060             NOT_REACHED ();
1061           }
1062       }
1063   }
1064 }
1065
1066 /* Resets the state for all the aggregate functions. */
1067 static void
1068 initialize_aggregate_info (struct agr_proc *agr, const struct ccase *input)
1069 {
1070   struct agr_var *iter;
1071
1072   case_destroy (&agr->break_case);
1073   case_clone (&agr->break_case, input);
1074
1075   for (iter = agr->agr_vars; iter; iter = iter->next)
1076     {
1077       iter->missing = 0;
1078       iter->dbl[0] = iter->dbl[1] = iter->dbl[2] = 0.0;
1079       iter->int1 = iter->int2 = 0;
1080       switch (iter->function)
1081         {
1082         case MIN:
1083           iter->dbl[0] = DBL_MAX;
1084           break;
1085         case MIN | FSTRING:
1086           memset (iter->string, 255, iter->src->width);
1087           break;
1088         case MAX:
1089           iter->dbl[0] = -DBL_MAX;
1090           break;
1091         case MAX | FSTRING:
1092           memset (iter->string, 0, iter->src->width);
1093           break;
1094         case SD:
1095           if (iter->moments == NULL)
1096             iter->moments = moments1_create (MOMENT_VARIANCE);
1097           else
1098             moments1_clear (iter->moments);
1099           break;
1100         default:
1101           break;
1102         }
1103     }
1104 }
1105 \f
1106 /* Aggregate each case as it comes through.  Cases which aren't needed
1107    are dropped.
1108    Returns true if successful, false if an I/O error occurred. */
1109 static bool
1110 agr_to_active_file (const struct ccase *c, void *agr_, const struct dataset *ds UNUSED)
1111 {
1112   struct agr_proc *agr = agr_;
1113
1114   if (aggregate_single_case (agr, c, &agr->agr_case)) 
1115     return agr->sink->class->write (agr->sink, &agr->agr_case);
1116
1117   return true;
1118 }
1119
1120 /* Aggregate the current case and output it if we passed a
1121    breakpoint. */
1122 static bool
1123 presorted_agr_to_sysfile (const struct ccase *c, void *agr_, 
1124                           const struct dataset *ds UNUSED) 
1125 {
1126   struct agr_proc *agr = agr_;
1127
1128   if (aggregate_single_case (agr, c, &agr->agr_case)) 
1129     return any_writer_write (agr->writer, &agr->agr_case);
1130   
1131   return true;
1132 }