Completely rewrite src/data/format.[ch], to achieve better
[pspp-builds.git] / src / language / stats / aggregate.c
1 /* PSPP - computes sample statistics.
2    Copyright (C) 1997-9, 2000, 2006 Free Software Foundation, Inc.
3    Written by Ben Pfaff <blp@gnu.org>.
4
5    This program is free software; you can redistribute it and/or
6    modify it under the terms of the GNU General Public License as
7    published by the Free Software Foundation; either version 2 of the
8    License, or (at your option) any later version.
9
10    This program is distributed in the hope that it will be useful, but
11    WITHOUT ANY WARRANTY; without even the implied warranty of
12    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13    General Public License for more details.
14
15    You should have received a copy of the GNU General Public License
16    along with this program; if not, write to the Free Software
17    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
18    02110-1301, USA. */
19
20 #include <config.h>
21
22 #include <stdlib.h>
23
24 #include <data/any-writer.h>
25 #include <data/case-sink.h>
26 #include <data/case.h>
27 #include <data/casefile.h>
28 #include <data/dictionary.h>
29 #include <data/file-handle-def.h>
30 #include <data/procedure.h>
31 #include <data/settings.h>
32 #include <data/storage-stream.h>
33 #include <data/sys-file-writer.h>
34 #include <data/variable.h>
35 #include <language/command.h>
36 #include <language/data-io/file-handle.h>
37 #include <language/lexer/lexer.h>
38 #include <language/lexer/variable-parser.h>
39 #include <language/stats/sort-criteria.h>
40 #include <libpspp/alloc.h>
41 #include <libpspp/assertion.h>
42 #include <libpspp/message.h>
43 #include <libpspp/misc.h>
44 #include <libpspp/pool.h>
45 #include <libpspp/str.h>
46 #include <math/moments.h>
47 #include <math/sort.h>
48
49 #include "gettext.h"
50 #define _(msgid) gettext (msgid)
51
52 /* Argument for AGGREGATE function. */
53 union agr_argument
54   {
55     double f;                           /* Numeric. */
56     char *c;                            /* Short or long string. */
57   };
58
59 /* Specifies how to make an aggregate variable. */
60 struct agr_var
61   {
62     struct agr_var *next;               /* Next in list. */
63
64     /* Collected during parsing. */
65     struct variable *src;       /* Source variable. */
66     struct variable *dest;      /* Target variable. */
67     int function;               /* Function. */
68     int include_missing;        /* 1=Include user-missing values. */
69     union agr_argument arg[2];  /* Arguments. */
70
71     /* Accumulated during AGGREGATE execution. */
72     double dbl[3];
73     int int1, int2;
74     char *string;
75     int missing;
76     struct moments1 *moments;
77   };
78
79 /* Aggregation functions. */
80 enum
81   {
82     NONE, SUM, MEAN, SD, MAX, MIN, PGT, PLT, PIN, POUT, FGT, FLT, FIN,
83     FOUT, N, NU, NMISS, NUMISS, FIRST, LAST,
84     N_AGR_FUNCS, N_NO_VARS, NU_NO_VARS,
85     FUNC = 0x1f, /* Function mask. */
86     FSTRING = 1<<5, /* String function bit. */
87   };
88
89 /* Attributes of an aggregation function. */
90 struct agr_func
91   {
92     const char *name;           /* Aggregation function name. */
93     size_t n_args;              /* Number of arguments. */
94     int alpha_type;             /* When given ALPHA arguments, output type. */
95     struct fmt_spec format;     /* Format spec if alpha_type != ALPHA. */
96   };
97
98 /* Attributes of aggregation functions. */
99 static const struct agr_func agr_func_tab[] = 
100   {
101     {"<NONE>",  0, -1,      {0, 0, 0}},
102     {"SUM",     0, -1,      {FMT_F, 8, 2}},
103     {"MEAN",    0, -1,      {FMT_F, 8, 2}},
104     {"SD",      0, -1,      {FMT_F, 8, 2}},
105     {"MAX",     0, ALPHA,   {-1, -1, -1}}, 
106     {"MIN",     0, ALPHA,   {-1, -1, -1}}, 
107     {"PGT",     1, NUMERIC, {FMT_F, 5, 1}},      
108     {"PLT",     1, NUMERIC, {FMT_F, 5, 1}},       
109     {"PIN",     2, NUMERIC, {FMT_F, 5, 1}},       
110     {"POUT",    2, NUMERIC, {FMT_F, 5, 1}},       
111     {"FGT",     1, NUMERIC, {FMT_F, 5, 3}},       
112     {"FLT",     1, NUMERIC, {FMT_F, 5, 3}},       
113     {"FIN",     2, NUMERIC, {FMT_F, 5, 3}},       
114     {"FOUT",    2, NUMERIC, {FMT_F, 5, 3}},       
115     {"N",       0, NUMERIC, {FMT_F, 7, 0}},       
116     {"NU",      0, NUMERIC, {FMT_F, 7, 0}},       
117     {"NMISS",   0, NUMERIC, {FMT_F, 7, 0}},       
118     {"NUMISS",  0, NUMERIC, {FMT_F, 7, 0}},       
119     {"FIRST",   0, ALPHA,   {-1, -1, -1}}, 
120     {"LAST",    0, ALPHA,   {-1, -1, -1}},
121     {NULL,      0, -1,      {-1, -1, -1}},
122     {"N",       0, NUMERIC, {FMT_F, 7, 0}},
123     {"NU",      0, NUMERIC, {FMT_F, 7, 0}},
124   };
125
126 /* Missing value types. */
127 enum missing_treatment
128   {
129     ITEMWISE,           /* Missing values item by item. */
130     COLUMNWISE          /* Missing values column by column. */
131   };
132
133 /* An entire AGGREGATE procedure. */
134 struct agr_proc 
135   {
136     /* We have either an output file or a sink. */
137     struct any_writer *writer;          /* Output file, or null if none. */
138     struct case_sink *sink;             /* Sink, or null if none. */
139
140     /* Break variables. */
141     struct sort_criteria *sort;         /* Sort criteria. */
142     struct variable **break_vars;       /* Break variables. */
143     size_t break_var_cnt;               /* Number of break variables. */
144     struct ccase break_case;            /* Last values of break variables. */
145
146     enum missing_treatment missing;     /* How to treat missing values. */
147     struct agr_var *agr_vars;           /* First aggregate variable. */
148     struct dictionary *dict;            /* Aggregate dictionary. */
149     const struct dictionary *src_dict;  /* Dict of the source */
150     int case_cnt;                       /* Counts aggregated cases. */
151     struct ccase agr_case;              /* Aggregate case for output. */
152   };
153
154 static void initialize_aggregate_info (struct agr_proc *,
155                                        const struct ccase *);
156
157 /* Prototypes. */
158 static bool parse_aggregate_functions (const struct dictionary *,
159                                        struct agr_proc *);
160 static void agr_destroy (struct agr_proc *);
161 static bool aggregate_single_case (struct agr_proc *agr,
162                                    const struct ccase *input,
163                                    struct ccase *output);
164 static void dump_aggregate_info (struct agr_proc *agr, struct ccase *output);
165
166 /* Aggregating to the active file. */
167 static bool agr_to_active_file (const struct ccase *, void *aux, const struct dataset *);
168
169 /* Aggregating to a system file. */
170 static bool presorted_agr_to_sysfile (const struct ccase *, void *aux, const struct dataset *);
171 \f
172 /* Parsing. */
173
174 /* Parses and executes the AGGREGATE procedure. */
175 int
176 cmd_aggregate (struct dataset *ds)
177 {
178   struct dictionary *dict = dataset_dict (ds);
179   struct agr_proc agr;
180   struct file_handle *out_file = NULL;
181
182   bool copy_documents = false;
183   bool presorted = false;
184   bool saw_direction;
185
186   memset(&agr, 0 , sizeof (agr));
187   agr.missing = ITEMWISE;
188   case_nullify (&agr.break_case);
189   
190   agr.dict = dict_create ();
191   agr.src_dict = dict;
192   dict_set_label (agr.dict, dict_get_label (dict));
193   dict_set_documents (agr.dict, dict_get_documents (dict));
194
195   /* OUTFILE subcommand must be first. */
196   if (!lex_force_match_id ("OUTFILE"))
197     goto error;
198   lex_match ('=');
199   if (!lex_match ('*'))
200     {
201       out_file = fh_parse (FH_REF_FILE | FH_REF_SCRATCH);
202       if (out_file == NULL)
203         goto error;
204     }
205   
206   /* Read most of the subcommands. */
207   for (;;)
208     {
209       lex_match ('/');
210       
211       if (lex_match_id ("MISSING"))
212         {
213           lex_match ('=');
214           if (!lex_match_id ("COLUMNWISE"))
215             {
216               lex_error (_("while expecting COLUMNWISE"));
217               goto error;
218             }
219           agr.missing = COLUMNWISE;
220         }
221       else if (lex_match_id ("DOCUMENT"))
222         copy_documents = true;
223       else if (lex_match_id ("PRESORTED"))
224         presorted = true;
225       else if (lex_match_id ("BREAK"))
226         {
227           int i;
228
229           lex_match ('=');
230           agr.sort = sort_parse_criteria (dict,
231                                           &agr.break_vars, &agr.break_var_cnt,
232                                           &saw_direction, NULL);
233           if (agr.sort == NULL)
234             goto error;
235           
236           for (i = 0; i < agr.break_var_cnt; i++)
237             dict_clone_var_assert (agr.dict, agr.break_vars[i],
238                                    agr.break_vars[i]->name);
239
240           /* BREAK must follow the options. */
241           break;
242         }
243       else
244         {
245           lex_error (_("expecting BREAK"));
246           goto error;
247         }
248     }
249   if (presorted && saw_direction)
250     msg (SW, _("When PRESORTED is specified, specifying sorting directions "
251                "with (A) or (D) has no effect.  Output data will be sorted "
252                "the same way as the input data."));
253       
254   /* Read in the aggregate functions. */
255   lex_match ('/');
256   if (!parse_aggregate_functions (dict, &agr))
257     goto error;
258
259   /* Delete documents. */
260   if (!copy_documents)
261     dict_set_documents (agr.dict, NULL);
262
263   /* Cancel SPLIT FILE. */
264   dict_set_split_vars (agr.dict, NULL, 0);
265   
266   /* Initialize. */
267   agr.case_cnt = 0;
268   case_create (&agr.agr_case, dict_get_next_value_idx (agr.dict));
269
270   /* Output to active file or external file? */
271   if (out_file == NULL) 
272     {
273       /* The active file will be replaced by the aggregated data,
274          so TEMPORARY is moot. */
275       proc_cancel_temporary_transformations (ds);
276
277       if (agr.sort != NULL && !presorted) 
278         {
279           if (!sort_active_file_in_place (ds, agr.sort))
280             goto error;
281         }
282
283       agr.sink = create_case_sink (&storage_sink_class, agr.dict, NULL);
284       if (agr.sink->class->open != NULL)
285         agr.sink->class->open (agr.sink);
286       proc_set_sink (ds, 
287                      create_case_sink (&null_sink_class, 
288                                        dict, NULL));
289       if (!procedure (ds, agr_to_active_file, &agr))
290         goto error;
291       if (agr.case_cnt > 0) 
292         {
293           dump_aggregate_info (&agr, &agr.agr_case);
294           if (!agr.sink->class->write (agr.sink, &agr.agr_case))
295             goto error;
296         }
297       discard_variables (ds);
298       dict_destroy (dict);
299       dataset_set_dict (ds, agr.dict);
300       agr.dict = NULL;
301       proc_set_source (ds, 
302                        agr.sink->class->make_source (agr.sink));
303       free_case_sink (agr.sink);
304     }
305   else
306     {
307       agr.writer = any_writer_open (out_file, agr.dict);
308       if (agr.writer == NULL)
309         goto error;
310       
311       if (agr.sort != NULL && !presorted) 
312         {
313           /* Sorting is needed. */
314           struct casefile *dst;
315           struct casereader *reader;
316           struct ccase c;
317           bool ok = true;
318           
319           dst = sort_active_file_to_casefile (ds, agr.sort);
320           if (dst == NULL)
321             goto error;
322           reader = casefile_get_destructive_reader (dst);
323           while (ok && casereader_read_xfer (reader, &c)) 
324             {
325               if (aggregate_single_case (&agr, &c, &agr.agr_case)) 
326                 ok = any_writer_write (agr.writer, &agr.agr_case);
327               case_destroy (&c);
328             }
329           casereader_destroy (reader);
330           if (ok)
331             ok = !casefile_error (dst);
332           casefile_destroy (dst);
333           if (!ok)
334             goto error;
335         }
336       else 
337         {
338           /* Active file is already sorted. */
339           if (!procedure (ds, presorted_agr_to_sysfile, &agr))
340             goto error;
341         }
342       
343       if (agr.case_cnt > 0) 
344         {
345           dump_aggregate_info (&agr, &agr.agr_case);
346           any_writer_write (agr.writer, &agr.agr_case);
347         }
348       if (any_writer_error (agr.writer))
349         goto error;
350     }
351   
352   agr_destroy (&agr);
353   return CMD_SUCCESS;
354
355 error:
356   agr_destroy (&agr);
357   return CMD_CASCADING_FAILURE;
358 }
359
360 /* Parse all the aggregate functions. */
361 static bool
362 parse_aggregate_functions (const struct dictionary *dict, struct agr_proc *agr)
363 {
364   struct agr_var *tail; /* Tail of linked list starting at agr->vars. */
365
366   /* Parse everything. */
367   tail = NULL;
368   for (;;)
369     {
370       char **dest;
371       char **dest_label;
372       size_t n_dest;
373
374       int include_missing;
375       const struct agr_func *function;
376       int func_index;
377
378       union agr_argument arg[2];
379
380       struct variable **src;
381       size_t n_src;
382
383       size_t i;
384
385       dest = NULL;
386       dest_label = NULL;
387       n_dest = 0;
388       src = NULL;
389       function = NULL;
390       n_src = 0;
391       arg[0].c = NULL;
392       arg[1].c = NULL;
393
394       /* Parse the list of target variables. */
395       while (!lex_match ('='))
396         {
397           size_t n_dest_prev = n_dest;
398           
399           if (!parse_DATA_LIST_vars (&dest, &n_dest,
400                                      PV_APPEND | PV_SINGLE | PV_NO_SCRATCH))
401             goto error;
402
403           /* Assign empty labels. */
404           {
405             int j;
406
407             dest_label = xnrealloc (dest_label, n_dest, sizeof *dest_label);
408             for (j = n_dest_prev; j < n_dest; j++)
409               dest_label[j] = NULL;
410           }
411           
412           if (token == T_STRING)
413             {
414               ds_truncate (&tokstr, 255);
415               dest_label[n_dest - 1] = ds_xstrdup (&tokstr);
416               lex_get ();
417             }
418         }
419
420       /* Get the name of the aggregation function. */
421       if (token != T_ID)
422         {
423           lex_error (_("expecting aggregation function"));
424           goto error;
425         }
426
427       include_missing = 0;
428       if (tokid[strlen (tokid) - 1] == '.')
429         {
430           include_missing = 1;
431           tokid[strlen (tokid) - 1] = 0;
432         }
433       
434       for (function = agr_func_tab; function->name; function++)
435         if (!strcasecmp (function->name, tokid))
436           break;
437       if (NULL == function->name)
438         {
439           msg (SE, _("Unknown aggregation function %s."), tokid);
440           goto error;
441         }
442       func_index = function - agr_func_tab;
443       lex_get ();
444
445       /* Check for leading lparen. */
446       if (!lex_match ('('))
447         {
448           if (func_index == N)
449             func_index = N_NO_VARS;
450           else if (func_index == NU)
451             func_index = NU_NO_VARS;
452           else
453             {
454               lex_error (_("expecting `('"));
455               goto error;
456             }
457         }
458       else
459         {
460           /* Parse list of source variables. */
461           {
462             int pv_opts = PV_NO_SCRATCH;
463
464             if (func_index == SUM || func_index == MEAN || func_index == SD)
465               pv_opts |= PV_NUMERIC;
466             else if (function->n_args)
467               pv_opts |= PV_SAME_TYPE;
468
469             if (!parse_variables (dict, &src, &n_src, pv_opts))
470               goto error;
471           }
472
473           /* Parse function arguments, for those functions that
474              require arguments. */
475           if (function->n_args != 0)
476             for (i = 0; i < function->n_args; i++)
477               {
478                 int type;
479             
480                 lex_match (',');
481                 if (token == T_STRING)
482                   {
483                     arg[i].c = ds_xstrdup (&tokstr);
484                     type = ALPHA;
485                   }
486                 else if (lex_is_number ())
487                   {
488                     arg[i].f = tokval;
489                     type = NUMERIC;
490                   } else {
491                     msg (SE, _("Missing argument %d to %s."), i + 1,
492                          function->name);
493                     goto error;
494                   }
495             
496                 lex_get ();
497
498                 if (type != src[0]->type)
499                   {
500                     msg (SE, _("Arguments to %s must be of same type as "
501                                "source variables."),
502                          function->name);
503                     goto error;
504                   }
505               }
506
507           /* Trailing rparen. */
508           if (!lex_match(')'))
509             {
510               lex_error (_("expecting `)'"));
511               goto error;
512             }
513           
514           /* Now check that the number of source variables match
515              the number of target variables.  If we check earlier
516              than this, the user can get very misleading error
517              message, i.e. `AGGREGATE x=SUM(y t).' will get this
518              error message when a proper message would be more
519              like `unknown variable t'. */
520           if (n_src != n_dest)
521             {
522               msg (SE, _("Number of source variables (%u) does not match "
523                          "number of target variables (%u)."),
524                    (unsigned) n_src, (unsigned) n_dest);
525               goto error;
526             }
527
528           if ((func_index == PIN || func_index == POUT
529               || func_index == FIN || func_index == FOUT) 
530               && ((src[0]->type == NUMERIC && arg[0].f > arg[1].f)
531                   || (src[0]->type == ALPHA
532                       && str_compare_rpad (arg[0].c, arg[1].c) > 0)))
533             {
534               union agr_argument t = arg[0];
535               arg[0] = arg[1];
536               arg[1] = t;
537                   
538               msg (SW, _("The value arguments passed to the %s function "
539                          "are out-of-order.  They will be treated as if "
540                          "they had been specified in the correct order."),
541                    function->name);
542             }
543         }
544         
545       /* Finally add these to the linked list of aggregation
546          variables. */
547       for (i = 0; i < n_dest; i++)
548         {
549           struct agr_var *v = xmalloc (sizeof *v);
550
551           /* Add variable to chain. */
552           if (agr->agr_vars != NULL)
553             tail->next = v;
554           else
555             agr->agr_vars = v;
556           tail = v;
557           tail->next = NULL;
558           v->moments = NULL;
559           
560           /* Create the target variable in the aggregate
561              dictionary. */
562           {
563             struct variable *destvar;
564             
565             v->function = func_index;
566
567             if (src)
568               {
569                 v->src = src[i];
570                 
571                 if (src[i]->type == ALPHA)
572                   {
573                     v->function |= FSTRING;
574                     v->string = xmalloc (src[i]->width);
575                   }
576
577                 if (function->alpha_type == ALPHA)
578                   destvar = dict_clone_var (agr->dict, v->src, dest[i]);
579                 else
580                   {
581                     assert (v->src->type == NUMERIC
582                             || function->alpha_type == NUMERIC);
583                     destvar = dict_create_var (agr->dict, dest[i], 0);
584                     if (destvar != NULL) 
585                       {
586                         struct fmt_spec f;
587                         if ((func_index == N || func_index == NMISS)
588                             && dict_get_weight (dict) != NULL)
589                           f = fmt_for_output (FMT_F, 8, 2); 
590                         else
591                           f = function->format;
592                         destvar->print = destvar->write = f;
593                       }
594                   }
595               } else {
596                 struct fmt_spec f;
597                 v->src = NULL;
598                 destvar = dict_create_var (agr->dict, dest[i], 0);
599                 if (func_index == N_NO_VARS && dict_get_weight (dict) != NULL)
600                   f = fmt_for_output (FMT_F, 8, 2); 
601                 else
602                   f = function->format;
603                 destvar->print = destvar->write = f;
604               }
605           
606             if (!destvar)
607               {
608                 msg (SE, _("Variable name %s is not unique within the "
609                            "aggregate file dictionary, which contains "
610                            "the aggregate variables and the break "
611                            "variables."),
612                      dest[i]);
613                 goto error;
614               }
615
616             free (dest[i]);
617             if (dest_label[i])
618               {
619                 destvar->label = dest_label[i];
620                 dest_label[i] = NULL;
621               }
622
623             v->dest = destvar;
624           }
625           
626           v->include_missing = include_missing;
627
628           if (v->src != NULL)
629             {
630               int j;
631
632               if (v->src->type == NUMERIC)
633                 for (j = 0; j < function->n_args; j++)
634                   v->arg[j].f = arg[j].f;
635               else
636                 for (j = 0; j < function->n_args; j++)
637                   v->arg[j].c = xstrdup (arg[j].c);
638             }
639         }
640       
641       if (src != NULL && src[0]->type == ALPHA)
642         for (i = 0; i < function->n_args; i++)
643           {
644             free (arg[i].c);
645             arg[i].c = NULL;
646           }
647
648       free (src);
649       free (dest);
650       free (dest_label);
651
652       if (!lex_match ('/'))
653         {
654           if (token == '.')
655             return true;
656
657           lex_error ("expecting end of command");
658           return false;
659         }
660       continue;
661       
662     error:
663       for (i = 0; i < n_dest; i++)
664         {
665           free (dest[i]);
666           free (dest_label[i]);
667         }
668       free (dest);
669       free (dest_label);
670       free (arg[0].c);
671       free (arg[1].c);
672       if (src && n_src && src[0]->type == ALPHA)
673         for (i = 0; i < function->n_args; i++)
674           {
675             free (arg[i].c);
676             arg[i].c = NULL;
677           }
678       free (src);
679         
680       return false;
681     }
682 }
683
684 /* Destroys AGR. */
685 static void
686 agr_destroy (struct agr_proc *agr)
687 {
688   struct agr_var *iter, *next;
689
690   any_writer_close (agr->writer);
691   if (agr->sort != NULL)
692     sort_destroy_criteria (agr->sort);
693   free (agr->break_vars);
694   case_destroy (&agr->break_case);
695   for (iter = agr->agr_vars; iter; iter = next)
696     {
697       next = iter->next;
698
699       if (iter->function & FSTRING)
700         {
701           size_t n_args;
702           size_t i;
703
704           n_args = agr_func_tab[iter->function & FUNC].n_args;
705           for (i = 0; i < n_args; i++)
706             free (iter->arg[i].c);
707           free (iter->string);
708         }
709       else if (iter->function == SD)
710         moments1_destroy (iter->moments);
711       free (iter);
712     }
713   if (agr->dict != NULL)
714     dict_destroy (agr->dict);
715
716   case_destroy (&agr->agr_case);
717 }
718 \f
719 /* Execution. */
720
721 static void accumulate_aggregate_info (struct agr_proc *,
722                                        const struct ccase *);
723 static void dump_aggregate_info (struct agr_proc *, struct ccase *);
724
725 /* Processes a single case INPUT for aggregation.  If output is
726    warranted, writes it to OUTPUT and returns true.
727    Otherwise, returns false and OUTPUT is unmodified. */
728 static bool
729 aggregate_single_case (struct agr_proc *agr,
730                        const struct ccase *input, struct ccase *output)
731 {
732   bool finished_group = false;
733   
734   if (agr->case_cnt++ == 0)
735     initialize_aggregate_info (agr, input);
736   else if (case_compare (&agr->break_case, input,
737                          agr->break_vars, agr->break_var_cnt))
738     {
739       dump_aggregate_info (agr, output);
740       finished_group = true;
741
742       initialize_aggregate_info (agr, input);
743     }
744
745   accumulate_aggregate_info (agr, input);
746   return finished_group;
747 }
748
749 /* Accumulates aggregation data from the case INPUT. */
750 static void 
751 accumulate_aggregate_info (struct agr_proc *agr,
752                            const struct ccase *input)
753 {
754   struct agr_var *iter;
755   double weight;
756   bool bad_warn = true;
757
758   weight = dict_get_case_weight (agr->src_dict, input, &bad_warn);
759
760   for (iter = agr->agr_vars; iter; iter = iter->next)
761     if (iter->src)
762       {
763         const union value *v = case_data (input, iter->src->fv);
764
765         if ((!iter->include_missing
766              && mv_is_value_missing (&iter->src->miss, v))
767             || (iter->include_missing && iter->src->type == NUMERIC
768                 && v->f == SYSMIS))
769           {
770             switch (iter->function)
771               {
772               case NMISS:
773               case NMISS | FSTRING:
774                 iter->dbl[0] += weight;
775                 break;
776               case NUMISS:
777               case NUMISS | FSTRING:
778                 iter->int1++;
779                 break;
780               }
781             iter->missing = 1;
782             continue;
783           }
784         
785         /* This is horrible.  There are too many possibilities. */
786         switch (iter->function)
787           {
788           case SUM:
789             iter->dbl[0] += v->f * weight;
790             iter->int1 = 1;
791             break;
792           case MEAN:
793             iter->dbl[0] += v->f * weight;
794             iter->dbl[1] += weight;
795             break;
796           case SD:
797             moments1_add (iter->moments, v->f, weight);
798             break;
799           case MAX:
800             iter->dbl[0] = max (iter->dbl[0], v->f);
801             iter->int1 = 1;
802             break;
803           case MAX | FSTRING:
804             if (memcmp (iter->string, v->s, iter->src->width) < 0)
805               memcpy (iter->string, v->s, iter->src->width);
806             iter->int1 = 1;
807             break;
808           case MIN:
809             iter->dbl[0] = min (iter->dbl[0], v->f);
810             iter->int1 = 1;
811             break;
812           case MIN | FSTRING:
813             if (memcmp (iter->string, v->s, iter->src->width) > 0)
814               memcpy (iter->string, v->s, iter->src->width);
815             iter->int1 = 1;
816             break;
817           case FGT:
818           case PGT:
819             if (v->f > iter->arg[0].f)
820               iter->dbl[0] += weight;
821             iter->dbl[1] += weight;
822             break;
823           case FGT | FSTRING:
824           case PGT | FSTRING:
825             if (memcmp (iter->arg[0].c, v->s, iter->src->width) < 0)
826               iter->dbl[0] += weight;
827             iter->dbl[1] += weight;
828             break;
829           case FLT:
830           case PLT:
831             if (v->f < iter->arg[0].f)
832               iter->dbl[0] += weight;
833             iter->dbl[1] += weight;
834             break;
835           case FLT | FSTRING:
836           case PLT | FSTRING:
837             if (memcmp (iter->arg[0].c, v->s, iter->src->width) > 0)
838               iter->dbl[0] += weight;
839             iter->dbl[1] += weight;
840             break;
841           case FIN:
842           case PIN:
843             if (iter->arg[0].f <= v->f && v->f <= iter->arg[1].f)
844               iter->dbl[0] += weight;
845             iter->dbl[1] += weight;
846             break;
847           case FIN | FSTRING:
848           case PIN | FSTRING:
849             if (memcmp (iter->arg[0].c, v->s, iter->src->width) <= 0
850                 && memcmp (iter->arg[1].c, v->s, iter->src->width) >= 0)
851               iter->dbl[0] += weight;
852             iter->dbl[1] += weight;
853             break;
854           case FOUT:
855           case POUT:
856             if (iter->arg[0].f > v->f || v->f > iter->arg[1].f)
857               iter->dbl[0] += weight;
858             iter->dbl[1] += weight;
859             break;
860           case FOUT | FSTRING:
861           case POUT | FSTRING:
862             if (memcmp (iter->arg[0].c, v->s, iter->src->width) > 0
863                 || memcmp (iter->arg[1].c, v->s, iter->src->width) < 0)
864               iter->dbl[0] += weight;
865             iter->dbl[1] += weight;
866             break;
867           case N:
868           case N | FSTRING:
869             iter->dbl[0] += weight;
870             break;
871           case NU:
872           case NU | FSTRING:
873             iter->int1++;
874             break;
875           case FIRST:
876             if (iter->int1 == 0)
877               {
878                 iter->dbl[0] = v->f;
879                 iter->int1 = 1;
880               }
881             break;
882           case FIRST | FSTRING:
883             if (iter->int1 == 0)
884               {
885                 memcpy (iter->string, v->s, iter->src->width);
886                 iter->int1 = 1;
887               }
888             break;
889           case LAST:
890             iter->dbl[0] = v->f;
891             iter->int1 = 1;
892             break;
893           case LAST | FSTRING:
894             memcpy (iter->string, v->s, iter->src->width);
895             iter->int1 = 1;
896             break;
897           case NMISS:
898           case NMISS | FSTRING:
899           case NUMISS:
900           case NUMISS | FSTRING:
901             /* Our value is not missing or it would have been
902                caught earlier.  Nothing to do. */
903             break;
904           default:
905             NOT_REACHED ();
906           }
907     } else {
908       switch (iter->function)
909         {
910         case N_NO_VARS:
911           iter->dbl[0] += weight;
912           break;
913         case NU_NO_VARS:
914           iter->int1++;
915           break;
916         default:
917           NOT_REACHED ();
918         }
919     }
920 }
921
922 /* We've come to a record that differs from the previous in one or
923    more of the break variables.  Make an output record from the
924    accumulated statistics in the OUTPUT case. */
925 static void 
926 dump_aggregate_info (struct agr_proc *agr, struct ccase *output)
927 {
928   {
929     int value_idx = 0;
930     int i;
931
932     for (i = 0; i < agr->break_var_cnt; i++) 
933       {
934         struct variable *v = agr->break_vars[i];
935         memcpy (case_data_rw (output, value_idx),
936                 case_data (&agr->break_case, v->fv),
937                 sizeof (union value) * v->nv);
938         value_idx += v->nv; 
939       }
940   }
941   
942   {
943     struct agr_var *i;
944   
945     for (i = agr->agr_vars; i; i = i->next)
946       {
947         union value *v = case_data_rw (output, i->dest->fv);
948
949         if (agr->missing == COLUMNWISE && i->missing != 0
950             && (i->function & FUNC) != N && (i->function & FUNC) != NU
951             && (i->function & FUNC) != NMISS && (i->function & FUNC) != NUMISS)
952           {
953             if (i->dest->type == ALPHA)
954               memset (v->s, ' ', i->dest->width);
955             else
956               v->f = SYSMIS;
957             continue;
958           }
959         
960         switch (i->function)
961           {
962           case SUM:
963             v->f = i->int1 ? i->dbl[0] : SYSMIS;
964             break;
965           case MEAN:
966             v->f = i->dbl[1] != 0.0 ? i->dbl[0] / i->dbl[1] : SYSMIS;
967             break;
968           case SD:
969             {
970               double variance;
971
972               /* FIXME: we should use two passes. */
973               moments1_calculate (i->moments, NULL, NULL, &variance,
974                                  NULL, NULL);
975               if (variance != SYSMIS)
976                 v->f = sqrt (variance);
977               else
978                 v->f = SYSMIS; 
979             }
980             break;
981           case MAX:
982           case MIN:
983             v->f = i->int1 ? i->dbl[0] : SYSMIS;
984             break;
985           case MAX | FSTRING:
986           case MIN | FSTRING:
987             if (i->int1)
988               memcpy (v->s, i->string, i->dest->width);
989             else
990               memset (v->s, ' ', i->dest->width);
991             break;
992           case FGT:
993           case FGT | FSTRING:
994           case FLT:
995           case FLT | FSTRING:
996           case FIN:
997           case FIN | FSTRING:
998           case FOUT:
999           case FOUT | FSTRING:
1000             v->f = i->dbl[1] ? i->dbl[0] / i->dbl[1] : SYSMIS;
1001             break;
1002           case PGT:
1003           case PGT | FSTRING:
1004           case PLT:
1005           case PLT | FSTRING:
1006           case PIN:
1007           case PIN | FSTRING:
1008           case POUT:
1009           case POUT | FSTRING:
1010             v->f = i->dbl[1] ? i->dbl[0] / i->dbl[1] * 100.0 : SYSMIS;
1011             break;
1012           case N:
1013           case N | FSTRING:
1014             v->f = i->dbl[0];
1015             break;
1016           case NU:
1017           case NU | FSTRING:
1018             v->f = i->int1;
1019             break;
1020           case FIRST:
1021           case LAST:
1022             v->f = i->int1 ? i->dbl[0] : SYSMIS;
1023             break;
1024           case FIRST | FSTRING:
1025           case LAST | FSTRING:
1026             if (i->int1)
1027               memcpy (v->s, i->string, i->dest->width);
1028             else
1029               memset (v->s, ' ', i->dest->width);
1030             break;
1031           case N_NO_VARS:
1032             v->f = i->dbl[0];
1033             break;
1034           case NU_NO_VARS:
1035             v->f = i->int1;
1036             break;
1037           case NMISS:
1038           case NMISS | FSTRING:
1039             v->f = i->dbl[0];
1040             break;
1041           case NUMISS:
1042           case NUMISS | FSTRING:
1043             v->f = i->int1;
1044             break;
1045           default:
1046             NOT_REACHED ();
1047           }
1048       }
1049   }
1050 }
1051
1052 /* Resets the state for all the aggregate functions. */
1053 static void
1054 initialize_aggregate_info (struct agr_proc *agr, const struct ccase *input)
1055 {
1056   struct agr_var *iter;
1057
1058   case_destroy (&agr->break_case);
1059   case_clone (&agr->break_case, input);
1060
1061   for (iter = agr->agr_vars; iter; iter = iter->next)
1062     {
1063       iter->missing = 0;
1064       iter->dbl[0] = iter->dbl[1] = iter->dbl[2] = 0.0;
1065       iter->int1 = iter->int2 = 0;
1066       switch (iter->function)
1067         {
1068         case MIN:
1069           iter->dbl[0] = DBL_MAX;
1070           break;
1071         case MIN | FSTRING:
1072           memset (iter->string, 255, iter->src->width);
1073           break;
1074         case MAX:
1075           iter->dbl[0] = -DBL_MAX;
1076           break;
1077         case MAX | FSTRING:
1078           memset (iter->string, 0, iter->src->width);
1079           break;
1080         case SD:
1081           if (iter->moments == NULL)
1082             iter->moments = moments1_create (MOMENT_VARIANCE);
1083           else
1084             moments1_clear (iter->moments);
1085           break;
1086         default:
1087           break;
1088         }
1089     }
1090 }
1091 \f
1092 /* Aggregate each case as it comes through.  Cases which aren't needed
1093    are dropped.
1094    Returns true if successful, false if an I/O error occurred. */
1095 static bool
1096 agr_to_active_file (const struct ccase *c, void *agr_, const struct dataset *ds UNUSED)
1097 {
1098   struct agr_proc *agr = agr_;
1099
1100   if (aggregate_single_case (agr, c, &agr->agr_case)) 
1101     return agr->sink->class->write (agr->sink, &agr->agr_case);
1102
1103   return true;
1104 }
1105
1106 /* Aggregate the current case and output it if we passed a
1107    breakpoint. */
1108 static bool
1109 presorted_agr_to_sysfile (const struct ccase *c, void *agr_, const struct dataset *ds UNUSED) 
1110 {
1111   struct agr_proc *agr = agr_;
1112
1113   if (aggregate_single_case (agr, c, &agr->agr_case)) 
1114     return any_writer_write (agr->writer, &agr->agr_case);
1115   
1116   return true;
1117 }