Fix rest of PR 13054.
[pspp-builds.git] / src / aggregate.c
1 /* PSPP - computes sample statistics.
2    Copyright (C) 1997-9, 2000 Free Software Foundation, Inc.
3    Written by Ben Pfaff <blp@gnu.org>.
4
5    This program is free software; you can redistribute it and/or
6    modify it under the terms of the GNU General Public License as
7    published by the Free Software Foundation; either version 2 of the
8    License, or (at your option) any later version.
9
10    This program is distributed in the hope that it will be useful, but
11    WITHOUT ANY WARRANTY; without even the implied warranty of
12    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13    General Public License for more details.
14
15    You should have received a copy of the GNU General Public License
16    along with this program; if not, write to the Free Software
17    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
18    02110-1301, USA. */
19
20 #include <config.h>
21 #include "error.h"
22 #include <stdlib.h>
23 #include "alloc.h"
24 #include "case.h"
25 #include "casefile.h"
26 #include "command.h"
27 #include "dictionary.h"
28 #include "error.h"
29 #include "file-handle.h"
30 #include "lexer.h"
31 #include "misc.h"
32 #include "moments.h"
33 #include "pool.h"
34 #include "settings.h"
35 #include "sfm-write.h"
36 #include "sort.h"
37 #include "str.h"
38 #include "var.h"
39 #include "vfm.h"
40 #include "vfmP.h"
41
42 /* Specifies how to make an aggregate variable. */
43 struct agr_var
44   {
45     struct agr_var *next;               /* Next in list. */
46
47     /* Collected during parsing. */
48     struct variable *src;       /* Source variable. */
49     struct variable *dest;      /* Target variable. */
50     int function;               /* Function. */
51     int include_missing;        /* 1=Include user-missing values. */
52     union value arg[2];         /* Arguments. */
53
54     /* Accumulated during AGGREGATE execution. */
55     double dbl[3];
56     int int1, int2;
57     char *string;
58     int missing;
59     struct moments1 *moments;
60   };
61
62 /* Aggregation functions. */
63 enum
64   {
65     NONE, SUM, MEAN, SD, MAX, MIN, PGT, PLT, PIN, POUT, FGT, FLT, FIN,
66     FOUT, N, NU, NMISS, NUMISS, FIRST, LAST,
67     N_AGR_FUNCS, N_NO_VARS, NU_NO_VARS,
68     FUNC = 0x1f, /* Function mask. */
69     FSTRING = 1<<5, /* String function bit. */
70   };
71
72 /* Attributes of an aggregation function. */
73 struct agr_func
74   {
75     const char *name;           /* Aggregation function name. */
76     int n_args;                 /* Number of arguments. */
77     int alpha_type;             /* When given ALPHA arguments, output type. */
78     struct fmt_spec format;     /* Format spec if alpha_type != ALPHA. */
79   };
80
81 /* Attributes of aggregation functions. */
82 static const struct agr_func agr_func_tab[] = 
83   {
84     {"<NONE>",  0, -1,      {0, 0, 0}},
85     {"SUM",     0, -1,      {FMT_F, 8, 2}},
86     {"MEAN",    0, -1,      {FMT_F, 8, 2}},
87     {"SD",      0, -1,      {FMT_F, 8, 2}},
88     {"MAX",     0, ALPHA,   {-1, -1, -1}}, 
89     {"MIN",     0, ALPHA,   {-1, -1, -1}}, 
90     {"PGT",     1, NUMERIC, {FMT_F, 5, 1}},      
91     {"PLT",     1, NUMERIC, {FMT_F, 5, 1}},       
92     {"PIN",     2, NUMERIC, {FMT_F, 5, 1}},       
93     {"POUT",    2, NUMERIC, {FMT_F, 5, 1}},       
94     {"FGT",     1, NUMERIC, {FMT_F, 5, 3}},       
95     {"FLT",     1, NUMERIC, {FMT_F, 5, 3}},       
96     {"FIN",     2, NUMERIC, {FMT_F, 5, 3}},       
97     {"FOUT",    2, NUMERIC, {FMT_F, 5, 3}},       
98     {"N",       0, NUMERIC, {FMT_F, 7, 0}},       
99     {"NU",      0, NUMERIC, {FMT_F, 7, 0}},       
100     {"NMISS",   0, NUMERIC, {FMT_F, 7, 0}},       
101     {"NUMISS",  0, NUMERIC, {FMT_F, 7, 0}},       
102     {"FIRST",   0, ALPHA,   {-1, -1, -1}}, 
103     {"LAST",    0, ALPHA,   {-1, -1, -1}},
104     {NULL,      0, -1,      {-1, -1, -1}},
105     {"N",       0, NUMERIC, {FMT_F, 7, 0}},
106     {"NU",      0, NUMERIC, {FMT_F, 7, 0}},
107   };
108
109 /* Missing value types. */
110 enum missing_treatment
111   {
112     ITEMWISE,           /* Missing values item by item. */
113     COLUMNWISE          /* Missing values column by column. */
114   };
115
116 /* An entire AGGREGATE procedure. */
117 struct agr_proc 
118   {
119     /* We have either an output file or a sink. */
120     struct sfm_writer *writer;          /* Output file, or null if none. */
121     struct case_sink *sink;             /* Sink, or null if none. */
122
123     /* Break variables. */
124     struct sort_criteria *sort;         /* Sort criteria. */
125     struct variable **break_vars;       /* Break variables. */
126     size_t break_var_cnt;               /* Number of break variables. */
127     struct ccase break_case;            /* Last values of break variables. */
128
129     enum missing_treatment missing;     /* How to treat missing values. */
130     struct agr_var *agr_vars;           /* First aggregate variable. */
131     struct dictionary *dict;            /* Aggregate dictionary. */
132     int case_cnt;                       /* Counts aggregated cases. */
133     struct ccase agr_case;              /* Aggregate case for output. */
134   };
135
136 static void initialize_aggregate_info (struct agr_proc *,
137                                        const struct ccase *);
138
139 /* Prototypes. */
140 static int parse_aggregate_functions (struct agr_proc *);
141 static void agr_destroy (struct agr_proc *);
142 static int aggregate_single_case (struct agr_proc *agr,
143                                   const struct ccase *input,
144                                   struct ccase *output);
145 static void dump_aggregate_info (struct agr_proc *agr, struct ccase *output);
146
147 /* Aggregating to the active file. */
148 static int agr_to_active_file (struct ccase *, void *aux);
149
150 /* Aggregating to a system file. */
151 static int presorted_agr_to_sysfile (struct ccase *, void *aux);
152 \f
153 /* Parsing. */
154
155 /* Parses and executes the AGGREGATE procedure. */
156 int
157 cmd_aggregate (void)
158 {
159   struct agr_proc agr;
160   struct file_handle *out_file = NULL;
161
162   bool copy_documents = false;
163   bool presorted = false;
164   bool saw_direction;
165
166   memset(&agr, 0 , sizeof (agr));
167   agr.missing = ITEMWISE;
168   case_nullify (&agr.break_case);
169   
170   agr.dict = dict_create ();
171   dict_set_label (agr.dict, dict_get_label (default_dict));
172   dict_set_documents (agr.dict, dict_get_documents (default_dict));
173
174   /* OUTFILE subcommand must be first. */
175   if (!lex_force_match_id ("OUTFILE"))
176     goto error;
177   lex_match ('=');
178   if (!lex_match ('*'))
179     {
180       out_file = fh_parse ();
181       if (out_file == NULL)
182         goto error;
183     }
184   
185   /* Read most of the subcommands. */
186   for (;;)
187     {
188       lex_match ('/');
189       
190       if (lex_match_id ("MISSING"))
191         {
192           lex_match ('=');
193           if (!lex_match_id ("COLUMNWISE"))
194             {
195               lex_error (_("while expecting COLUMNWISE"));
196               goto error;
197             }
198           agr.missing = COLUMNWISE;
199         }
200       else if (lex_match_id ("DOCUMENT"))
201         copy_documents = true;
202       else if (lex_match_id ("PRESORTED"))
203         presorted = true;
204       else if (lex_match_id ("BREAK"))
205         {
206           int i;
207
208           lex_match ('=');
209           agr.sort = sort_parse_criteria (default_dict,
210                                           &agr.break_vars, &agr.break_var_cnt,
211                                           &saw_direction);
212           if (agr.sort == NULL)
213             goto error;
214           
215           for (i = 0; i < agr.break_var_cnt; i++)
216             dict_clone_var_assert (agr.dict, agr.break_vars[i],
217                                    agr.break_vars[i]->name);
218
219           /* BREAK must follow the options. */
220           break;
221         }
222       else
223         {
224           lex_error (_("expecting BREAK"));
225           goto error;
226         }
227     }
228   if (presorted && saw_direction)
229     msg (SW, _("When PRESORTED is specified, specifying sorting directions "
230                "with (A) or (D) has no effect.  Output data will be sorted "
231                "the same way as the input data."));
232       
233   /* Read in the aggregate functions. */
234   lex_match ('/');
235   if (!parse_aggregate_functions (&agr))
236     goto error;
237
238   /* Delete documents. */
239   if (!copy_documents)
240     dict_set_documents (agr.dict, NULL);
241
242   /* Cancel SPLIT FILE. */
243   dict_set_split_vars (agr.dict, NULL, 0);
244   
245   /* Initialize. */
246   agr.case_cnt = 0;
247   case_create (&agr.agr_case, dict_get_next_value_idx (agr.dict));
248
249   /* Output to active file or external file? */
250   if (out_file == NULL) 
251     {
252       /* The active file will be replaced by the aggregated data,
253          so TEMPORARY is moot. */
254       cancel_temporary ();
255
256       if (agr.sort != NULL && !presorted)
257         sort_active_file_in_place (agr.sort);
258
259       agr.sink = create_case_sink (&storage_sink_class, agr.dict, NULL);
260       if (agr.sink->class->open != NULL)
261         agr.sink->class->open (agr.sink);
262       vfm_sink = create_case_sink (&null_sink_class, default_dict, NULL);
263       procedure (agr_to_active_file, &agr);
264       if (agr.case_cnt > 0) 
265         {
266           dump_aggregate_info (&agr, &agr.agr_case);
267           agr.sink->class->write (agr.sink, &agr.agr_case);
268         }
269       dict_destroy (default_dict);
270       default_dict = agr.dict;
271       agr.dict = NULL;
272       vfm_source = agr.sink->class->make_source (agr.sink);
273       free_case_sink (agr.sink);
274     }
275   else
276     {
277       agr.writer = sfm_open_writer (out_file, agr.dict, get_scompression (), 0);
278       if (agr.writer == NULL)
279         goto error;
280       
281       if (agr.sort != NULL && !presorted) 
282         {
283           /* Sorting is needed. */
284           struct casefile *dst;
285           struct casereader *reader;
286           struct ccase c;
287           
288           dst = sort_active_file_to_casefile (agr.sort);
289           if (dst == NULL)
290             goto error;
291           reader = casefile_get_destructive_reader (dst);
292           while (casereader_read_xfer (reader, &c)) 
293             {
294               if (aggregate_single_case (&agr, &c, &agr.agr_case)) 
295                 sfm_write_case (agr.writer, &agr.agr_case);
296               case_destroy (&c);
297             }
298           casereader_destroy (reader);
299           casefile_destroy (dst);
300         }
301       else 
302         {
303           /* Active file is already sorted. */
304           procedure (presorted_agr_to_sysfile, &agr);
305         }
306       
307       if (agr.case_cnt > 0) 
308         {
309           dump_aggregate_info (&agr, &agr.agr_case);
310           sfm_write_case (agr.writer, &agr.agr_case);
311         }
312     }
313   
314   agr_destroy (&agr);
315   return CMD_SUCCESS;
316
317 error:
318   agr_destroy (&agr);
319   return CMD_FAILURE;
320 }
321
322 /* Parse all the aggregate functions. */
323 static int
324 parse_aggregate_functions (struct agr_proc *agr)
325 {
326   struct agr_var *tail; /* Tail of linked list starting at agr->vars. */
327
328   /* Parse everything. */
329   tail = NULL;
330   for (;;)
331     {
332       char **dest;
333       char **dest_label;
334       int n_dest;
335
336       int include_missing;
337       const struct agr_func *function;
338       int func_index;
339
340       union value arg[2];
341
342       struct variable **src;
343       int n_src;
344
345       int i;
346
347       dest = NULL;
348       dest_label = NULL;
349       n_dest = 0;
350       src = NULL;
351       function = NULL;
352       n_src = 0;
353       arg[0].c = NULL;
354       arg[1].c = NULL;
355
356       /* Parse the list of target variables. */
357       while (!lex_match ('='))
358         {
359           int n_dest_prev = n_dest;
360           
361           if (!parse_DATA_LIST_vars (&dest, &n_dest,
362                                      PV_APPEND | PV_SINGLE | PV_NO_SCRATCH))
363             goto error;
364
365           /* Assign empty labels. */
366           {
367             int j;
368
369             dest_label = xrealloc (dest_label, sizeof *dest_label * n_dest);
370             for (j = n_dest_prev; j < n_dest; j++)
371               dest_label[j] = NULL;
372           }
373           
374           if (token == T_STRING)
375             {
376               ds_truncate (&tokstr, 255);
377               dest_label[n_dest - 1] = xstrdup (ds_c_str (&tokstr));
378               lex_get ();
379             }
380         }
381
382       /* Get the name of the aggregation function. */
383       if (token != T_ID)
384         {
385           lex_error (_("expecting aggregation function"));
386           goto error;
387         }
388
389       include_missing = 0;
390       if (tokid[strlen (tokid) - 1] == '.')
391         {
392           include_missing = 1;
393           tokid[strlen (tokid) - 1] = 0;
394         }
395       
396       for (function = agr_func_tab; function->name; function++)
397         if (!strcasecmp (function->name, tokid))
398           break;
399       if (NULL == function->name)
400         {
401           msg (SE, _("Unknown aggregation function %s."), tokid);
402           goto error;
403         }
404       func_index = function - agr_func_tab;
405       lex_get ();
406
407       /* Check for leading lparen. */
408       if (!lex_match ('('))
409         {
410           if (func_index == N)
411             func_index = N_NO_VARS;
412           else if (func_index == NU)
413             func_index = NU_NO_VARS;
414           else
415             {
416               lex_error (_("expecting `('"));
417               goto error;
418             }
419         }
420       else
421         {
422           /* Parse list of source variables. */
423           {
424             int pv_opts = PV_NO_SCRATCH;
425
426             if (func_index == SUM || func_index == MEAN || func_index == SD)
427               pv_opts |= PV_NUMERIC;
428             else if (function->n_args)
429               pv_opts |= PV_SAME_TYPE;
430
431             if (!parse_variables (default_dict, &src, &n_src, pv_opts))
432               goto error;
433           }
434
435           /* Parse function arguments, for those functions that
436              require arguments. */
437           if (function->n_args != 0)
438             for (i = 0; i < function->n_args; i++)
439               {
440                 int type;
441             
442                 lex_match (',');
443                 if (token == T_STRING)
444                   {
445                     arg[i].c = xstrdup (ds_c_str (&tokstr));
446                     type = ALPHA;
447                   }
448                 else if (lex_is_number ())
449                   {
450                     arg[i].f = tokval;
451                     type = NUMERIC;
452                   } else {
453                     msg (SE, _("Missing argument %d to %s."), i + 1,
454                          function->name);
455                     goto error;
456                   }
457             
458                 lex_get ();
459
460                 if (type != src[0]->type)
461                   {
462                     msg (SE, _("Arguments to %s must be of same type as "
463                                "source variables."),
464                          function->name);
465                     goto error;
466                   }
467               }
468
469           /* Trailing rparen. */
470           if (!lex_match(')'))
471             {
472               lex_error (_("expecting `)'"));
473               goto error;
474             }
475           
476           /* Now check that the number of source variables match
477              the number of target variables.  If we check earlier
478              than this, the user can get very misleading error
479              message, i.e. `AGGREGATE x=SUM(y t).' will get this
480              error message when a proper message would be more
481              like `unknown variable t'. */
482           if (n_src != n_dest)
483             {
484               msg (SE, _("Number of source variables (%d) does not match "
485                          "number of target variables (%d)."),
486                    n_src, n_dest);
487               goto error;
488             }
489
490           if ((func_index == PIN || func_index == POUT
491               || func_index == FIN || func_index == FOUT) 
492               && ((src[0]->type == NUMERIC && arg[0].f > arg[1].f)
493                   || (src[0]->type == ALPHA
494                       && str_compare_rpad (arg[0].c, arg[1].c) > 0)))
495             {
496               union value t = arg[0];
497               arg[0] = arg[1];
498               arg[1] = t;
499                   
500               msg (SW, _("The value arguments passed to the %s function "
501                          "are out-of-order.  They will be treated as if "
502                          "they had been specified in the correct order."),
503                    function->name);
504             }
505         }
506         
507       /* Finally add these to the linked list of aggregation
508          variables. */
509       for (i = 0; i < n_dest; i++)
510         {
511           struct agr_var *v = xmalloc (sizeof *v);
512
513           /* Add variable to chain. */
514           if (agr->agr_vars != NULL)
515             tail->next = v;
516           else
517             agr->agr_vars = v;
518           tail = v;
519           tail->next = NULL;
520           v->moments = NULL;
521           
522           /* Create the target variable in the aggregate
523              dictionary. */
524           {
525             struct variable *destvar;
526             
527             v->function = func_index;
528
529             if (src)
530               {
531                 v->src = src[i];
532                 
533                 if (src[i]->type == ALPHA)
534                   {
535                     v->function |= FSTRING;
536                     v->string = xmalloc (src[i]->width);
537                   }
538
539                 if (function->alpha_type == ALPHA)
540                   destvar = dict_clone_var (agr->dict, v->src, dest[i]);
541                 else
542                   {
543                     assert (v->src->type == NUMERIC
544                             || function->alpha_type == NUMERIC);
545                     destvar = dict_create_var (agr->dict, dest[i], 0);
546                     if (destvar != NULL) 
547                       {
548                         if ((func_index == N || func_index == NMISS)
549                             && dict_get_weight (default_dict) != NULL)
550                           destvar->print = destvar->write = f8_2; 
551                         else
552                           destvar->print = destvar->write = function->format;
553                       }
554                   }
555               } else {
556                 v->src = NULL;
557                 destvar = dict_create_var (agr->dict, dest[i], 0);
558                 if (func_index == N_NO_VARS
559                     && dict_get_weight (default_dict) != NULL)
560                   destvar->print = destvar->write = f8_2; 
561                 else
562                   destvar->print = destvar->write = function->format;
563               }
564           
565             if (!destvar)
566               {
567                 msg (SE, _("Variable name %s is not unique within the "
568                            "aggregate file dictionary, which contains "
569                            "the aggregate variables and the break "
570                            "variables."),
571                      dest[i]);
572                 goto error;
573               }
574
575             free (dest[i]);
576             destvar->init = 0;
577             if (dest_label[i])
578               {
579                 destvar->label = dest_label[i];
580                 dest_label[i] = NULL;
581               }
582
583             v->dest = destvar;
584           }
585           
586           v->include_missing = include_missing;
587
588           if (v->src != NULL)
589             {
590               int j;
591
592               if (v->src->type == NUMERIC)
593                 for (j = 0; j < function->n_args; j++)
594                   v->arg[j].f = arg[j].f;
595               else
596                 for (j = 0; j < function->n_args; j++)
597                   v->arg[j].c = xstrdup (arg[j].c);
598             }
599         }
600       
601       if (src != NULL && src[0]->type == ALPHA)
602         for (i = 0; i < function->n_args; i++)
603           {
604             free (arg[i].c);
605             arg[i].c = NULL;
606           }
607
608       free (src);
609       free (dest);
610       free (dest_label);
611
612       if (!lex_match ('/'))
613         {
614           if (token == '.')
615             return 1;
616
617           lex_error ("expecting end of command");
618           return 0;
619         }
620       continue;
621       
622     error:
623       for (i = 0; i < n_dest; i++)
624         {
625           free (dest[i]);
626           free (dest_label[i]);
627         }
628       free (dest);
629       free (dest_label);
630       free (arg[0].c);
631       free (arg[1].c);
632       if (src && n_src && src[0]->type == ALPHA)
633         for (i = 0; i < function->n_args; i++)
634           {
635             free (arg[i].c);
636             arg[i].c = NULL;
637           }
638       free (src);
639         
640       return 0;
641     }
642 }
643
644 /* Destroys AGR. */
645 static void
646 agr_destroy (struct agr_proc *agr)
647 {
648   struct agr_var *iter, *next;
649
650   sfm_close_writer (agr->writer);
651   if (agr->sort != NULL)
652     sort_destroy_criteria (agr->sort);
653   free (agr->break_vars);
654   case_destroy (&agr->break_case);
655   for (iter = agr->agr_vars; iter; iter = next)
656     {
657       next = iter->next;
658
659       if (iter->function & FSTRING)
660         {
661           int n_args;
662           int i;
663
664           n_args = agr_func_tab[iter->function & FUNC].n_args;
665           for (i = 0; i < n_args; i++)
666             free (iter->arg[i].c);
667           free (iter->string);
668         }
669       else if (iter->function == SD)
670         moments1_destroy (iter->moments);
671       free (iter);
672     }
673   if (agr->dict != NULL)
674     dict_destroy (agr->dict);
675
676   case_destroy (&agr->agr_case);
677 }
678 \f
679 /* Execution. */
680
681 static void accumulate_aggregate_info (struct agr_proc *,
682                                        const struct ccase *);
683 static void dump_aggregate_info (struct agr_proc *, struct ccase *);
684
685 /* Processes a single case INPUT for aggregation.  If output is
686    warranted, writes it to OUTPUT and returns nonzero.
687    Otherwise, returns zero and OUTPUT is unmodified. */
688 static int
689 aggregate_single_case (struct agr_proc *agr,
690                        const struct ccase *input, struct ccase *output)
691 {
692   bool finished_group = false;
693   
694   if (agr->case_cnt++ == 0)
695     initialize_aggregate_info (agr, input);
696   else if (case_compare (&agr->break_case, input,
697                          agr->break_vars, agr->break_var_cnt))
698     {
699       dump_aggregate_info (agr, output);
700       finished_group = true;
701
702       initialize_aggregate_info (agr, input);
703     }
704
705   accumulate_aggregate_info (agr, input);
706   return finished_group;
707 }
708
709 /* Accumulates aggregation data from the case INPUT. */
710 static void 
711 accumulate_aggregate_info (struct agr_proc *agr,
712                            const struct ccase *input)
713 {
714   struct agr_var *iter;
715   double weight;
716   int bad_warn = 1;
717
718   weight = dict_get_case_weight (default_dict, input, &bad_warn);
719
720   for (iter = agr->agr_vars; iter; iter = iter->next)
721     if (iter->src)
722       {
723         const union value *v = case_data (input, iter->src->fv);
724
725         if ((!iter->include_missing && is_missing (v, iter->src))
726             || (iter->include_missing && iter->src->type == NUMERIC
727                 && v->f == SYSMIS))
728           {
729             switch (iter->function)
730               {
731               case NMISS:
732               case NMISS | FSTRING:
733                 iter->dbl[0] += weight;
734                 break;
735               case NUMISS:
736               case NUMISS | FSTRING:
737                 iter->int1++;
738                 break;
739               }
740             iter->missing = 1;
741             continue;
742           }
743         
744         /* This is horrible.  There are too many possibilities. */
745         switch (iter->function)
746           {
747           case SUM:
748             iter->dbl[0] += v->f * weight;
749             iter->int1 = 1;
750             break;
751           case MEAN:
752             iter->dbl[0] += v->f * weight;
753             iter->dbl[1] += weight;
754             break;
755           case SD:
756             moments1_add (iter->moments, v->f, weight);
757             break;
758           case MAX:
759             iter->dbl[0] = max (iter->dbl[0], v->f);
760             iter->int1 = 1;
761             break;
762           case MAX | FSTRING:
763             if (memcmp (iter->string, v->s, iter->src->width) < 0)
764               memcpy (iter->string, v->s, iter->src->width);
765             iter->int1 = 1;
766             break;
767           case MIN:
768             iter->dbl[0] = min (iter->dbl[0], v->f);
769             iter->int1 = 1;
770             break;
771           case MIN | FSTRING:
772             if (memcmp (iter->string, v->s, iter->src->width) > 0)
773               memcpy (iter->string, v->s, iter->src->width);
774             iter->int1 = 1;
775             break;
776           case FGT:
777           case PGT:
778             if (v->f > iter->arg[0].f)
779               iter->dbl[0] += weight;
780             iter->dbl[1] += weight;
781             break;
782           case FGT | FSTRING:
783           case PGT | FSTRING:
784             if (memcmp (iter->arg[0].c, v->s, iter->src->width) < 0)
785               iter->dbl[0] += weight;
786             iter->dbl[1] += weight;
787             break;
788           case FLT:
789           case PLT:
790             if (v->f < iter->arg[0].f)
791               iter->dbl[0] += weight;
792             iter->dbl[1] += weight;
793             break;
794           case FLT | FSTRING:
795           case PLT | FSTRING:
796             if (memcmp (iter->arg[0].c, v->s, iter->src->width) > 0)
797               iter->dbl[0] += weight;
798             iter->dbl[1] += weight;
799             break;
800           case FIN:
801           case PIN:
802             if (iter->arg[0].f <= v->f && v->f <= iter->arg[1].f)
803               iter->dbl[0] += weight;
804             iter->dbl[1] += weight;
805             break;
806           case FIN | FSTRING:
807           case PIN | FSTRING:
808             if (memcmp (iter->arg[0].c, v->s, iter->src->width) <= 0
809                 && memcmp (iter->arg[1].c, v->s, iter->src->width) >= 0)
810               iter->dbl[0] += weight;
811             iter->dbl[1] += weight;
812             break;
813           case FOUT:
814           case POUT:
815             if (iter->arg[0].f > v->f || v->f > iter->arg[1].f)
816               iter->dbl[0] += weight;
817             iter->dbl[1] += weight;
818             break;
819           case FOUT | FSTRING:
820           case POUT | FSTRING:
821             if (memcmp (iter->arg[0].c, v->s, iter->src->width) > 0
822                 || memcmp (iter->arg[1].c, v->s, iter->src->width) < 0)
823               iter->dbl[0] += weight;
824             iter->dbl[1] += weight;
825             break;
826           case N:
827           case N | FSTRING:
828             iter->dbl[0] += weight;
829             break;
830           case NU:
831           case NU | FSTRING:
832             iter->int1++;
833             break;
834           case FIRST:
835             if (iter->int1 == 0)
836               {
837                 iter->dbl[0] = v->f;
838                 iter->int1 = 1;
839               }
840             break;
841           case FIRST | FSTRING:
842             if (iter->int1 == 0)
843               {
844                 memcpy (iter->string, v->s, iter->src->width);
845                 iter->int1 = 1;
846               }
847             break;
848           case LAST:
849             iter->dbl[0] = v->f;
850             iter->int1 = 1;
851             break;
852           case LAST | FSTRING:
853             memcpy (iter->string, v->s, iter->src->width);
854             iter->int1 = 1;
855             break;
856           case NMISS:
857           case NMISS | FSTRING:
858           case NUMISS:
859           case NUMISS | FSTRING:
860             /* Our value is not missing or it would have been
861                caught earlier.  Nothing to do. */
862             break;
863           default:
864             assert (0);
865           }
866     } else {
867       switch (iter->function)
868         {
869         case N_NO_VARS:
870           iter->dbl[0] += weight;
871           break;
872         case NU_NO_VARS:
873           iter->int1++;
874           break;
875         default:
876           assert (0);
877         }
878     }
879 }
880
881 /* We've come to a record that differs from the previous in one or
882    more of the break variables.  Make an output record from the
883    accumulated statistics in the OUTPUT case. */
884 static void 
885 dump_aggregate_info (struct agr_proc *agr, struct ccase *output)
886 {
887   {
888     int value_idx = 0;
889     int i;
890
891     for (i = 0; i < agr->break_var_cnt; i++) 
892       {
893         struct variable *v = agr->break_vars[i];
894         memcpy (case_data_rw (output, value_idx),
895                 case_data (&agr->break_case, v->fv),
896                 sizeof (union value) * v->nv);
897         value_idx += v->nv; 
898       }
899   }
900   
901   {
902     struct agr_var *i;
903   
904     for (i = agr->agr_vars; i; i = i->next)
905       {
906         union value *v = case_data_rw (output, i->dest->fv);
907
908         if (agr->missing == COLUMNWISE && i->missing != 0
909             && (i->function & FUNC) != N && (i->function & FUNC) != NU
910             && (i->function & FUNC) != NMISS && (i->function & FUNC) != NUMISS)
911           {
912             if (i->dest->type == ALPHA)
913               memset (v->s, ' ', i->dest->width);
914             else
915               v->f = SYSMIS;
916             continue;
917           }
918         
919         switch (i->function)
920           {
921           case SUM:
922             v->f = i->int1 ? i->dbl[0] : SYSMIS;
923             break;
924           case MEAN:
925             v->f = i->dbl[1] != 0.0 ? i->dbl[0] / i->dbl[1] : SYSMIS;
926             break;
927           case SD:
928             {
929               double variance;
930
931               /* FIXME: we should use two passes. */
932               moments1_calculate (i->moments, NULL, NULL, &variance,
933                                  NULL, NULL);
934               if (variance != SYSMIS)
935                 v->f = sqrt (variance);
936               else
937                 v->f = SYSMIS; 
938             }
939             break;
940           case MAX:
941           case MIN:
942             v->f = i->int1 ? i->dbl[0] : SYSMIS;
943             break;
944           case MAX | FSTRING:
945           case MIN | FSTRING:
946             if (i->int1)
947               memcpy (v->s, i->string, i->dest->width);
948             else
949               memset (v->s, ' ', i->dest->width);
950             break;
951           case FGT:
952           case FGT | FSTRING:
953           case FLT:
954           case FLT | FSTRING:
955           case FIN:
956           case FIN | FSTRING:
957           case FOUT:
958           case FOUT | FSTRING:
959             v->f = i->dbl[1] ? i->dbl[0] / i->dbl[1] : SYSMIS;
960             break;
961           case PGT:
962           case PGT | FSTRING:
963           case PLT:
964           case PLT | FSTRING:
965           case PIN:
966           case PIN | FSTRING:
967           case POUT:
968           case POUT | FSTRING:
969             v->f = i->dbl[1] ? i->dbl[0] / i->dbl[1] * 100.0 : SYSMIS;
970             break;
971           case N:
972           case N | FSTRING:
973             v->f = i->dbl[0];
974             break;
975           case NU:
976           case NU | FSTRING:
977             v->f = i->int1;
978             break;
979           case FIRST:
980           case LAST:
981             v->f = i->int1 ? i->dbl[0] : SYSMIS;
982             break;
983           case FIRST | FSTRING:
984           case LAST | FSTRING:
985             if (i->int1)
986               memcpy (v->s, i->string, i->dest->width);
987             else
988               memset (v->s, ' ', i->dest->width);
989             break;
990           case N_NO_VARS:
991             v->f = i->dbl[0];
992             break;
993           case NU_NO_VARS:
994             v->f = i->int1;
995             break;
996           case NMISS:
997           case NMISS | FSTRING:
998             v->f = i->dbl[0];
999             break;
1000           case NUMISS:
1001           case NUMISS | FSTRING:
1002             v->f = i->int1;
1003             break;
1004           default:
1005             assert (0);
1006           }
1007       }
1008   }
1009 }
1010
1011 /* Resets the state for all the aggregate functions. */
1012 static void
1013 initialize_aggregate_info (struct agr_proc *agr, const struct ccase *input)
1014 {
1015   struct agr_var *iter;
1016
1017   case_destroy (&agr->break_case);
1018   case_clone (&agr->break_case, input);
1019
1020   for (iter = agr->agr_vars; iter; iter = iter->next)
1021     {
1022       iter->missing = 0;
1023       iter->dbl[0] = iter->dbl[1] = iter->dbl[2] = 0.0;
1024       iter->int1 = iter->int2 = 0;
1025       switch (iter->function)
1026         {
1027         case MIN:
1028           iter->dbl[0] = DBL_MAX;
1029           break;
1030         case MIN | FSTRING:
1031           memset (iter->string, 255, iter->src->width);
1032           break;
1033         case MAX:
1034           iter->dbl[0] = -DBL_MAX;
1035           break;
1036         case MAX | FSTRING:
1037           memset (iter->string, 0, iter->src->width);
1038           break;
1039         case SD:
1040           if (iter->moments == NULL)
1041             iter->moments = moments1_create (MOMENT_VARIANCE);
1042           else
1043             moments1_clear (iter->moments);
1044           break;
1045         default:
1046           break;
1047         }
1048     }
1049 }
1050 \f
1051 /* Aggregate each case as it comes through.  Cases which aren't needed
1052    are dropped. */
1053 static int
1054 agr_to_active_file (struct ccase *c, void *agr_)
1055 {
1056   struct agr_proc *agr = agr_;
1057
1058   if (aggregate_single_case (agr, c, &agr->agr_case)) 
1059     agr->sink->class->write (agr->sink, &agr->agr_case);
1060
1061   return 1;
1062 }
1063
1064 /* Aggregate the current case and output it if we passed a
1065    breakpoint. */
1066 static int
1067 presorted_agr_to_sysfile (struct ccase *c, void *agr_) 
1068 {
1069   struct agr_proc *agr = agr_;
1070
1071   if (aggregate_single_case (agr, c, &agr->agr_case)) 
1072     sfm_write_case (agr->writer, &agr->agr_case);
1073
1074   return 1;
1075 }