Categorical value cache added
[pspp-builds.git] / src / aggregate.c
1 /* PSPP - computes sample statistics.
2    Copyright (C) 1997-9, 2000 Free Software Foundation, Inc.
3    Written by Ben Pfaff <blp@gnu.org>.
4
5    This program is free software; you can redistribute it and/or
6    modify it under the terms of the GNU General Public License as
7    published by the Free Software Foundation; either version 2 of the
8    License, or (at your option) any later version.
9
10    This program is distributed in the hope that it will be useful, but
11    WITHOUT ANY WARRANTY; without even the implied warranty of
12    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13    General Public License for more details.
14
15    You should have received a copy of the GNU General Public License
16    along with this program; if not, write to the Free Software
17    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
18    02110-1301, USA. */
19
20 #include <config.h>
21 #include "error.h"
22 #include <stdlib.h>
23 #include "alloc.h"
24 #include "case.h"
25 #include "casefile.h"
26 #include "command.h"
27 #include "dictionary.h"
28 #include "error.h"
29 #include "file-handle.h"
30 #include "lexer.h"
31 #include "misc.h"
32 #include "moments.h"
33 #include "pool.h"
34 #include "settings.h"
35 #include "sfm-write.h"
36 #include "sort-prs.h"
37 #include "sort.h"
38 #include "str.h"
39 #include "var.h"
40 #include "vfm.h"
41 #include "vfmP.h"
42
43 #include "gettext.h"
44 #define _(msgid) gettext (msgid)
45
46 /* Specifies how to make an aggregate variable. */
47 struct agr_var
48   {
49     struct agr_var *next;               /* Next in list. */
50
51     /* Collected during parsing. */
52     struct variable *src;       /* Source variable. */
53     struct variable *dest;      /* Target variable. */
54     int function;               /* Function. */
55     int include_missing;        /* 1=Include user-missing values. */
56     union value arg[2];         /* Arguments. */
57
58     /* Accumulated during AGGREGATE execution. */
59     double dbl[3];
60     int int1, int2;
61     char *string;
62     int missing;
63     struct moments1 *moments;
64   };
65
66 /* Aggregation functions. */
67 enum
68   {
69     NONE, SUM, MEAN, SD, MAX, MIN, PGT, PLT, PIN, POUT, FGT, FLT, FIN,
70     FOUT, N, NU, NMISS, NUMISS, FIRST, LAST,
71     N_AGR_FUNCS, N_NO_VARS, NU_NO_VARS,
72     FUNC = 0x1f, /* Function mask. */
73     FSTRING = 1<<5, /* String function bit. */
74   };
75
76 /* Attributes of an aggregation function. */
77 struct agr_func
78   {
79     const char *name;           /* Aggregation function name. */
80     size_t n_args;              /* Number of arguments. */
81     int alpha_type;             /* When given ALPHA arguments, output type. */
82     struct fmt_spec format;     /* Format spec if alpha_type != ALPHA. */
83   };
84
85 /* Attributes of aggregation functions. */
86 static const struct agr_func agr_func_tab[] = 
87   {
88     {"<NONE>",  0, -1,      {0, 0, 0}},
89     {"SUM",     0, -1,      {FMT_F, 8, 2}},
90     {"MEAN",    0, -1,      {FMT_F, 8, 2}},
91     {"SD",      0, -1,      {FMT_F, 8, 2}},
92     {"MAX",     0, ALPHA,   {-1, -1, -1}}, 
93     {"MIN",     0, ALPHA,   {-1, -1, -1}}, 
94     {"PGT",     1, NUMERIC, {FMT_F, 5, 1}},      
95     {"PLT",     1, NUMERIC, {FMT_F, 5, 1}},       
96     {"PIN",     2, NUMERIC, {FMT_F, 5, 1}},       
97     {"POUT",    2, NUMERIC, {FMT_F, 5, 1}},       
98     {"FGT",     1, NUMERIC, {FMT_F, 5, 3}},       
99     {"FLT",     1, NUMERIC, {FMT_F, 5, 3}},       
100     {"FIN",     2, NUMERIC, {FMT_F, 5, 3}},       
101     {"FOUT",    2, NUMERIC, {FMT_F, 5, 3}},       
102     {"N",       0, NUMERIC, {FMT_F, 7, 0}},       
103     {"NU",      0, NUMERIC, {FMT_F, 7, 0}},       
104     {"NMISS",   0, NUMERIC, {FMT_F, 7, 0}},       
105     {"NUMISS",  0, NUMERIC, {FMT_F, 7, 0}},       
106     {"FIRST",   0, ALPHA,   {-1, -1, -1}}, 
107     {"LAST",    0, ALPHA,   {-1, -1, -1}},
108     {NULL,      0, -1,      {-1, -1, -1}},
109     {"N",       0, NUMERIC, {FMT_F, 7, 0}},
110     {"NU",      0, NUMERIC, {FMT_F, 7, 0}},
111   };
112
113 /* Missing value types. */
114 enum missing_treatment
115   {
116     ITEMWISE,           /* Missing values item by item. */
117     COLUMNWISE          /* Missing values column by column. */
118   };
119
120 /* An entire AGGREGATE procedure. */
121 struct agr_proc 
122   {
123     /* We have either an output file or a sink. */
124     struct sfm_writer *writer;          /* Output file, or null if none. */
125     struct case_sink *sink;             /* Sink, or null if none. */
126
127     /* Break variables. */
128     struct sort_criteria *sort;         /* Sort criteria. */
129     struct variable **break_vars;       /* Break variables. */
130     size_t break_var_cnt;               /* Number of break variables. */
131     struct ccase break_case;            /* Last values of break variables. */
132
133     enum missing_treatment missing;     /* How to treat missing values. */
134     struct agr_var *agr_vars;           /* First aggregate variable. */
135     struct dictionary *dict;            /* Aggregate dictionary. */
136     int case_cnt;                       /* Counts aggregated cases. */
137     struct ccase agr_case;              /* Aggregate case for output. */
138   };
139
140 static void initialize_aggregate_info (struct agr_proc *,
141                                        const struct ccase *);
142
143 /* Prototypes. */
144 static int parse_aggregate_functions (struct agr_proc *);
145 static void agr_destroy (struct agr_proc *);
146 static int aggregate_single_case (struct agr_proc *agr,
147                                   const struct ccase *input,
148                                   struct ccase *output);
149 static void dump_aggregate_info (struct agr_proc *agr, struct ccase *output);
150
151 /* Aggregating to the active file. */
152 static int agr_to_active_file (struct ccase *, void *aux);
153
154 /* Aggregating to a system file. */
155 static int presorted_agr_to_sysfile (struct ccase *, void *aux);
156 \f
157 /* Parsing. */
158
159 /* Parses and executes the AGGREGATE procedure. */
160 int
161 cmd_aggregate (void)
162 {
163   struct agr_proc agr;
164   struct file_handle *out_file = NULL;
165
166   bool copy_documents = false;
167   bool presorted = false;
168   bool saw_direction;
169
170   memset(&agr, 0 , sizeof (agr));
171   agr.missing = ITEMWISE;
172   case_nullify (&agr.break_case);
173   
174   agr.dict = dict_create ();
175   dict_set_label (agr.dict, dict_get_label (default_dict));
176   dict_set_documents (agr.dict, dict_get_documents (default_dict));
177
178   /* OUTFILE subcommand must be first. */
179   if (!lex_force_match_id ("OUTFILE"))
180     goto error;
181   lex_match ('=');
182   if (!lex_match ('*'))
183     {
184       out_file = fh_parse ();
185       if (out_file == NULL)
186         goto error;
187     }
188   
189   /* Read most of the subcommands. */
190   for (;;)
191     {
192       lex_match ('/');
193       
194       if (lex_match_id ("MISSING"))
195         {
196           lex_match ('=');
197           if (!lex_match_id ("COLUMNWISE"))
198             {
199               lex_error (_("while expecting COLUMNWISE"));
200               goto error;
201             }
202           agr.missing = COLUMNWISE;
203         }
204       else if (lex_match_id ("DOCUMENT"))
205         copy_documents = true;
206       else if (lex_match_id ("PRESORTED"))
207         presorted = true;
208       else if (lex_match_id ("BREAK"))
209         {
210           int i;
211
212           lex_match ('=');
213           agr.sort = sort_parse_criteria (default_dict,
214                                           &agr.break_vars, &agr.break_var_cnt,
215                                           &saw_direction, NULL);
216           if (agr.sort == NULL)
217             goto error;
218           
219           for (i = 0; i < agr.break_var_cnt; i++)
220             dict_clone_var_assert (agr.dict, agr.break_vars[i],
221                                    agr.break_vars[i]->name);
222
223           /* BREAK must follow the options. */
224           break;
225         }
226       else
227         {
228           lex_error (_("expecting BREAK"));
229           goto error;
230         }
231     }
232   if (presorted && saw_direction)
233     msg (SW, _("When PRESORTED is specified, specifying sorting directions "
234                "with (A) or (D) has no effect.  Output data will be sorted "
235                "the same way as the input data."));
236       
237   /* Read in the aggregate functions. */
238   lex_match ('/');
239   if (!parse_aggregate_functions (&agr))
240     goto error;
241
242   /* Delete documents. */
243   if (!copy_documents)
244     dict_set_documents (agr.dict, NULL);
245
246   /* Cancel SPLIT FILE. */
247   dict_set_split_vars (agr.dict, NULL, 0);
248   
249   /* Initialize. */
250   agr.case_cnt = 0;
251   case_create (&agr.agr_case, dict_get_next_value_idx (agr.dict));
252
253   /* Output to active file or external file? */
254   if (out_file == NULL) 
255     {
256       /* The active file will be replaced by the aggregated data,
257          so TEMPORARY is moot. */
258       cancel_temporary ();
259
260       if (agr.sort != NULL && !presorted)
261         sort_active_file_in_place (agr.sort);
262
263       agr.sink = create_case_sink (&storage_sink_class, agr.dict, NULL);
264       if (agr.sink->class->open != NULL)
265         agr.sink->class->open (agr.sink);
266       vfm_sink = create_case_sink (&null_sink_class, default_dict, NULL);
267       procedure (agr_to_active_file, &agr);
268       if (agr.case_cnt > 0) 
269         {
270           dump_aggregate_info (&agr, &agr.agr_case);
271           agr.sink->class->write (agr.sink, &agr.agr_case);
272         }
273       dict_destroy (default_dict);
274       default_dict = agr.dict;
275       agr.dict = NULL;
276       vfm_source = agr.sink->class->make_source (agr.sink);
277       free_case_sink (agr.sink);
278     }
279   else
280     {
281       agr.writer = sfm_open_writer (out_file, agr.dict,
282                                     sfm_writer_default_options ());
283       if (agr.writer == NULL)
284         goto error;
285       
286       if (agr.sort != NULL && !presorted) 
287         {
288           /* Sorting is needed. */
289           struct casefile *dst;
290           struct casereader *reader;
291           struct ccase c;
292           
293           dst = sort_active_file_to_casefile (agr.sort);
294           if (dst == NULL)
295             goto error;
296           reader = casefile_get_destructive_reader (dst);
297           while (casereader_read_xfer (reader, &c)) 
298             {
299               if (aggregate_single_case (&agr, &c, &agr.agr_case)) 
300                 sfm_write_case (agr.writer, &agr.agr_case);
301               case_destroy (&c);
302             }
303           casereader_destroy (reader);
304           casefile_destroy (dst);
305         }
306       else 
307         {
308           /* Active file is already sorted. */
309           procedure (presorted_agr_to_sysfile, &agr);
310         }
311       
312       if (agr.case_cnt > 0) 
313         {
314           dump_aggregate_info (&agr, &agr.agr_case);
315           sfm_write_case (agr.writer, &agr.agr_case);
316         }
317     }
318   
319   agr_destroy (&agr);
320   return CMD_SUCCESS;
321
322 error:
323   agr_destroy (&agr);
324   return CMD_FAILURE;
325 }
326
327 /* Parse all the aggregate functions. */
328 static int
329 parse_aggregate_functions (struct agr_proc *agr)
330 {
331   struct agr_var *tail; /* Tail of linked list starting at agr->vars. */
332
333   /* Parse everything. */
334   tail = NULL;
335   for (;;)
336     {
337       char **dest;
338       char **dest_label;
339       size_t n_dest;
340
341       int include_missing;
342       const struct agr_func *function;
343       int func_index;
344
345       union value arg[2];
346
347       struct variable **src;
348       size_t n_src;
349
350       size_t i;
351
352       dest = NULL;
353       dest_label = NULL;
354       n_dest = 0;
355       src = NULL;
356       function = NULL;
357       n_src = 0;
358       arg[0].c = NULL;
359       arg[1].c = NULL;
360
361       /* Parse the list of target variables. */
362       while (!lex_match ('='))
363         {
364           size_t n_dest_prev = n_dest;
365           
366           if (!parse_DATA_LIST_vars (&dest, &n_dest,
367                                      PV_APPEND | PV_SINGLE | PV_NO_SCRATCH))
368             goto error;
369
370           /* Assign empty labels. */
371           {
372             int j;
373
374             dest_label = xnrealloc (dest_label, n_dest, sizeof *dest_label);
375             for (j = n_dest_prev; j < n_dest; j++)
376               dest_label[j] = NULL;
377           }
378           
379           if (token == T_STRING)
380             {
381               ds_truncate (&tokstr, 255);
382               dest_label[n_dest - 1] = xstrdup (ds_c_str (&tokstr));
383               lex_get ();
384             }
385         }
386
387       /* Get the name of the aggregation function. */
388       if (token != T_ID)
389         {
390           lex_error (_("expecting aggregation function"));
391           goto error;
392         }
393
394       include_missing = 0;
395       if (tokid[strlen (tokid) - 1] == '.')
396         {
397           include_missing = 1;
398           tokid[strlen (tokid) - 1] = 0;
399         }
400       
401       for (function = agr_func_tab; function->name; function++)
402         if (!strcasecmp (function->name, tokid))
403           break;
404       if (NULL == function->name)
405         {
406           msg (SE, _("Unknown aggregation function %s."), tokid);
407           goto error;
408         }
409       func_index = function - agr_func_tab;
410       lex_get ();
411
412       /* Check for leading lparen. */
413       if (!lex_match ('('))
414         {
415           if (func_index == N)
416             func_index = N_NO_VARS;
417           else if (func_index == NU)
418             func_index = NU_NO_VARS;
419           else
420             {
421               lex_error (_("expecting `('"));
422               goto error;
423             }
424         }
425       else
426         {
427           /* Parse list of source variables. */
428           {
429             int pv_opts = PV_NO_SCRATCH;
430
431             if (func_index == SUM || func_index == MEAN || func_index == SD)
432               pv_opts |= PV_NUMERIC;
433             else if (function->n_args)
434               pv_opts |= PV_SAME_TYPE;
435
436             if (!parse_variables (default_dict, &src, &n_src, pv_opts))
437               goto error;
438           }
439
440           /* Parse function arguments, for those functions that
441              require arguments. */
442           if (function->n_args != 0)
443             for (i = 0; i < function->n_args; i++)
444               {
445                 int type;
446             
447                 lex_match (',');
448                 if (token == T_STRING)
449                   {
450                     arg[i].c = xstrdup (ds_c_str (&tokstr));
451                     type = ALPHA;
452                   }
453                 else if (lex_is_number ())
454                   {
455                     arg[i].f = tokval;
456                     type = NUMERIC;
457                   } else {
458                     msg (SE, _("Missing argument %d to %s."), i + 1,
459                          function->name);
460                     goto error;
461                   }
462             
463                 lex_get ();
464
465                 if (type != src[0]->type)
466                   {
467                     msg (SE, _("Arguments to %s must be of same type as "
468                                "source variables."),
469                          function->name);
470                     goto error;
471                   }
472               }
473
474           /* Trailing rparen. */
475           if (!lex_match(')'))
476             {
477               lex_error (_("expecting `)'"));
478               goto error;
479             }
480           
481           /* Now check that the number of source variables match
482              the number of target variables.  If we check earlier
483              than this, the user can get very misleading error
484              message, i.e. `AGGREGATE x=SUM(y t).' will get this
485              error message when a proper message would be more
486              like `unknown variable t'. */
487           if (n_src != n_dest)
488             {
489               msg (SE, _("Number of source variables (%u) does not match "
490                          "number of target variables (%u)."),
491                    (unsigned) n_src, (unsigned) n_dest);
492               goto error;
493             }
494
495           if ((func_index == PIN || func_index == POUT
496               || func_index == FIN || func_index == FOUT) 
497               && ((src[0]->type == NUMERIC && arg[0].f > arg[1].f)
498                   || (src[0]->type == ALPHA
499                       && str_compare_rpad (arg[0].c, arg[1].c) > 0)))
500             {
501               union value t = arg[0];
502               arg[0] = arg[1];
503               arg[1] = t;
504                   
505               msg (SW, _("The value arguments passed to the %s function "
506                          "are out-of-order.  They will be treated as if "
507                          "they had been specified in the correct order."),
508                    function->name);
509             }
510         }
511         
512       /* Finally add these to the linked list of aggregation
513          variables. */
514       for (i = 0; i < n_dest; i++)
515         {
516           struct agr_var *v = xmalloc (sizeof *v);
517
518           /* Add variable to chain. */
519           if (agr->agr_vars != NULL)
520             tail->next = v;
521           else
522             agr->agr_vars = v;
523           tail = v;
524           tail->next = NULL;
525           v->moments = NULL;
526           
527           /* Create the target variable in the aggregate
528              dictionary. */
529           {
530             struct variable *destvar;
531             
532             v->function = func_index;
533
534             if (src)
535               {
536                 v->src = src[i];
537                 
538                 if (src[i]->type == ALPHA)
539                   {
540                     v->function |= FSTRING;
541                     v->string = xmalloc (src[i]->width);
542                   }
543
544                 if (function->alpha_type == ALPHA)
545                   destvar = dict_clone_var (agr->dict, v->src, dest[i]);
546                 else
547                   {
548                     assert (v->src->type == NUMERIC
549                             || function->alpha_type == NUMERIC);
550                     destvar = dict_create_var (agr->dict, dest[i], 0);
551                     if (destvar != NULL) 
552                       {
553                         if ((func_index == N || func_index == NMISS)
554                             && dict_get_weight (default_dict) != NULL)
555                           destvar->print = destvar->write = f8_2; 
556                         else
557                           destvar->print = destvar->write = function->format;
558                       }
559                   }
560               } else {
561                 v->src = NULL;
562                 destvar = dict_create_var (agr->dict, dest[i], 0);
563                 if (func_index == N_NO_VARS
564                     && dict_get_weight (default_dict) != NULL)
565                   destvar->print = destvar->write = f8_2; 
566                 else
567                   destvar->print = destvar->write = function->format;
568               }
569           
570             if (!destvar)
571               {
572                 msg (SE, _("Variable name %s is not unique within the "
573                            "aggregate file dictionary, which contains "
574                            "the aggregate variables and the break "
575                            "variables."),
576                      dest[i]);
577                 goto error;
578               }
579
580             free (dest[i]);
581             destvar->init = 0;
582             if (dest_label[i])
583               {
584                 destvar->label = dest_label[i];
585                 dest_label[i] = NULL;
586               }
587
588             v->dest = destvar;
589           }
590           
591           v->include_missing = include_missing;
592
593           if (v->src != NULL)
594             {
595               int j;
596
597               if (v->src->type == NUMERIC)
598                 for (j = 0; j < function->n_args; j++)
599                   v->arg[j].f = arg[j].f;
600               else
601                 for (j = 0; j < function->n_args; j++)
602                   v->arg[j].c = xstrdup (arg[j].c);
603             }
604         }
605       
606       if (src != NULL && src[0]->type == ALPHA)
607         for (i = 0; i < function->n_args; i++)
608           {
609             free (arg[i].c);
610             arg[i].c = NULL;
611           }
612
613       free (src);
614       free (dest);
615       free (dest_label);
616
617       if (!lex_match ('/'))
618         {
619           if (token == '.')
620             return 1;
621
622           lex_error ("expecting end of command");
623           return 0;
624         }
625       continue;
626       
627     error:
628       for (i = 0; i < n_dest; i++)
629         {
630           free (dest[i]);
631           free (dest_label[i]);
632         }
633       free (dest);
634       free (dest_label);
635       free (arg[0].c);
636       free (arg[1].c);
637       if (src && n_src && src[0]->type == ALPHA)
638         for (i = 0; i < function->n_args; i++)
639           {
640             free (arg[i].c);
641             arg[i].c = NULL;
642           }
643       free (src);
644         
645       return 0;
646     }
647 }
648
649 /* Destroys AGR. */
650 static void
651 agr_destroy (struct agr_proc *agr)
652 {
653   struct agr_var *iter, *next;
654
655   sfm_close_writer (agr->writer);
656   if (agr->sort != NULL)
657     sort_destroy_criteria (agr->sort);
658   free (agr->break_vars);
659   case_destroy (&agr->break_case);
660   for (iter = agr->agr_vars; iter; iter = next)
661     {
662       next = iter->next;
663
664       if (iter->function & FSTRING)
665         {
666           size_t n_args;
667           size_t i;
668
669           n_args = agr_func_tab[iter->function & FUNC].n_args;
670           for (i = 0; i < n_args; i++)
671             free (iter->arg[i].c);
672           free (iter->string);
673         }
674       else if (iter->function == SD)
675         moments1_destroy (iter->moments);
676       free (iter);
677     }
678   if (agr->dict != NULL)
679     dict_destroy (agr->dict);
680
681   case_destroy (&agr->agr_case);
682 }
683 \f
684 /* Execution. */
685
686 static void accumulate_aggregate_info (struct agr_proc *,
687                                        const struct ccase *);
688 static void dump_aggregate_info (struct agr_proc *, struct ccase *);
689
690 /* Processes a single case INPUT for aggregation.  If output is
691    warranted, writes it to OUTPUT and returns nonzero.
692    Otherwise, returns zero and OUTPUT is unmodified. */
693 static int
694 aggregate_single_case (struct agr_proc *agr,
695                        const struct ccase *input, struct ccase *output)
696 {
697   bool finished_group = false;
698   
699   if (agr->case_cnt++ == 0)
700     initialize_aggregate_info (agr, input);
701   else if (case_compare (&agr->break_case, input,
702                          agr->break_vars, agr->break_var_cnt))
703     {
704       dump_aggregate_info (agr, output);
705       finished_group = true;
706
707       initialize_aggregate_info (agr, input);
708     }
709
710   accumulate_aggregate_info (agr, input);
711   return finished_group;
712 }
713
714 /* Accumulates aggregation data from the case INPUT. */
715 static void 
716 accumulate_aggregate_info (struct agr_proc *agr,
717                            const struct ccase *input)
718 {
719   struct agr_var *iter;
720   double weight;
721   int bad_warn = 1;
722
723   weight = dict_get_case_weight (default_dict, input, &bad_warn);
724
725   for (iter = agr->agr_vars; iter; iter = iter->next)
726     if (iter->src)
727       {
728         const union value *v = case_data (input, iter->src->fv);
729
730         if ((!iter->include_missing
731              && mv_is_value_missing (&iter->src->miss, v))
732             || (iter->include_missing && iter->src->type == NUMERIC
733                 && v->f == SYSMIS))
734           {
735             switch (iter->function)
736               {
737               case NMISS:
738               case NMISS | FSTRING:
739                 iter->dbl[0] += weight;
740                 break;
741               case NUMISS:
742               case NUMISS | FSTRING:
743                 iter->int1++;
744                 break;
745               }
746             iter->missing = 1;
747             continue;
748           }
749         
750         /* This is horrible.  There are too many possibilities. */
751         switch (iter->function)
752           {
753           case SUM:
754             iter->dbl[0] += v->f * weight;
755             iter->int1 = 1;
756             break;
757           case MEAN:
758             iter->dbl[0] += v->f * weight;
759             iter->dbl[1] += weight;
760             break;
761           case SD:
762             moments1_add (iter->moments, v->f, weight);
763             break;
764           case MAX:
765             iter->dbl[0] = max (iter->dbl[0], v->f);
766             iter->int1 = 1;
767             break;
768           case MAX | FSTRING:
769             if (memcmp (iter->string, v->s, iter->src->width) < 0)
770               memcpy (iter->string, v->s, iter->src->width);
771             iter->int1 = 1;
772             break;
773           case MIN:
774             iter->dbl[0] = min (iter->dbl[0], v->f);
775             iter->int1 = 1;
776             break;
777           case MIN | FSTRING:
778             if (memcmp (iter->string, v->s, iter->src->width) > 0)
779               memcpy (iter->string, v->s, iter->src->width);
780             iter->int1 = 1;
781             break;
782           case FGT:
783           case PGT:
784             if (v->f > iter->arg[0].f)
785               iter->dbl[0] += weight;
786             iter->dbl[1] += weight;
787             break;
788           case FGT | FSTRING:
789           case PGT | FSTRING:
790             if (memcmp (iter->arg[0].c, v->s, iter->src->width) < 0)
791               iter->dbl[0] += weight;
792             iter->dbl[1] += weight;
793             break;
794           case FLT:
795           case PLT:
796             if (v->f < iter->arg[0].f)
797               iter->dbl[0] += weight;
798             iter->dbl[1] += weight;
799             break;
800           case FLT | FSTRING:
801           case PLT | FSTRING:
802             if (memcmp (iter->arg[0].c, v->s, iter->src->width) > 0)
803               iter->dbl[0] += weight;
804             iter->dbl[1] += weight;
805             break;
806           case FIN:
807           case PIN:
808             if (iter->arg[0].f <= v->f && v->f <= iter->arg[1].f)
809               iter->dbl[0] += weight;
810             iter->dbl[1] += weight;
811             break;
812           case FIN | FSTRING:
813           case PIN | FSTRING:
814             if (memcmp (iter->arg[0].c, v->s, iter->src->width) <= 0
815                 && memcmp (iter->arg[1].c, v->s, iter->src->width) >= 0)
816               iter->dbl[0] += weight;
817             iter->dbl[1] += weight;
818             break;
819           case FOUT:
820           case POUT:
821             if (iter->arg[0].f > v->f || v->f > iter->arg[1].f)
822               iter->dbl[0] += weight;
823             iter->dbl[1] += weight;
824             break;
825           case FOUT | FSTRING:
826           case POUT | FSTRING:
827             if (memcmp (iter->arg[0].c, v->s, iter->src->width) > 0
828                 || memcmp (iter->arg[1].c, v->s, iter->src->width) < 0)
829               iter->dbl[0] += weight;
830             iter->dbl[1] += weight;
831             break;
832           case N:
833           case N | FSTRING:
834             iter->dbl[0] += weight;
835             break;
836           case NU:
837           case NU | FSTRING:
838             iter->int1++;
839             break;
840           case FIRST:
841             if (iter->int1 == 0)
842               {
843                 iter->dbl[0] = v->f;
844                 iter->int1 = 1;
845               }
846             break;
847           case FIRST | FSTRING:
848             if (iter->int1 == 0)
849               {
850                 memcpy (iter->string, v->s, iter->src->width);
851                 iter->int1 = 1;
852               }
853             break;
854           case LAST:
855             iter->dbl[0] = v->f;
856             iter->int1 = 1;
857             break;
858           case LAST | FSTRING:
859             memcpy (iter->string, v->s, iter->src->width);
860             iter->int1 = 1;
861             break;
862           case NMISS:
863           case NMISS | FSTRING:
864           case NUMISS:
865           case NUMISS | FSTRING:
866             /* Our value is not missing or it would have been
867                caught earlier.  Nothing to do. */
868             break;
869           default:
870             assert (0);
871           }
872     } else {
873       switch (iter->function)
874         {
875         case N_NO_VARS:
876           iter->dbl[0] += weight;
877           break;
878         case NU_NO_VARS:
879           iter->int1++;
880           break;
881         default:
882           assert (0);
883         }
884     }
885 }
886
887 /* We've come to a record that differs from the previous in one or
888    more of the break variables.  Make an output record from the
889    accumulated statistics in the OUTPUT case. */
890 static void 
891 dump_aggregate_info (struct agr_proc *agr, struct ccase *output)
892 {
893   {
894     int value_idx = 0;
895     int i;
896
897     for (i = 0; i < agr->break_var_cnt; i++) 
898       {
899         struct variable *v = agr->break_vars[i];
900         memcpy (case_data_rw (output, value_idx),
901                 case_data (&agr->break_case, v->fv),
902                 sizeof (union value) * v->nv);
903         value_idx += v->nv; 
904       }
905   }
906   
907   {
908     struct agr_var *i;
909   
910     for (i = agr->agr_vars; i; i = i->next)
911       {
912         union value *v = case_data_rw (output, i->dest->fv);
913
914         if (agr->missing == COLUMNWISE && i->missing != 0
915             && (i->function & FUNC) != N && (i->function & FUNC) != NU
916             && (i->function & FUNC) != NMISS && (i->function & FUNC) != NUMISS)
917           {
918             if (i->dest->type == ALPHA)
919               memset (v->s, ' ', i->dest->width);
920             else
921               v->f = SYSMIS;
922             continue;
923           }
924         
925         switch (i->function)
926           {
927           case SUM:
928             v->f = i->int1 ? i->dbl[0] : SYSMIS;
929             break;
930           case MEAN:
931             v->f = i->dbl[1] != 0.0 ? i->dbl[0] / i->dbl[1] : SYSMIS;
932             break;
933           case SD:
934             {
935               double variance;
936
937               /* FIXME: we should use two passes. */
938               moments1_calculate (i->moments, NULL, NULL, &variance,
939                                  NULL, NULL);
940               if (variance != SYSMIS)
941                 v->f = sqrt (variance);
942               else
943                 v->f = SYSMIS; 
944             }
945             break;
946           case MAX:
947           case MIN:
948             v->f = i->int1 ? i->dbl[0] : SYSMIS;
949             break;
950           case MAX | FSTRING:
951           case MIN | FSTRING:
952             if (i->int1)
953               memcpy (v->s, i->string, i->dest->width);
954             else
955               memset (v->s, ' ', i->dest->width);
956             break;
957           case FGT:
958           case FGT | FSTRING:
959           case FLT:
960           case FLT | FSTRING:
961           case FIN:
962           case FIN | FSTRING:
963           case FOUT:
964           case FOUT | FSTRING:
965             v->f = i->dbl[1] ? i->dbl[0] / i->dbl[1] : SYSMIS;
966             break;
967           case PGT:
968           case PGT | FSTRING:
969           case PLT:
970           case PLT | FSTRING:
971           case PIN:
972           case PIN | FSTRING:
973           case POUT:
974           case POUT | FSTRING:
975             v->f = i->dbl[1] ? i->dbl[0] / i->dbl[1] * 100.0 : SYSMIS;
976             break;
977           case N:
978           case N | FSTRING:
979             v->f = i->dbl[0];
980             break;
981           case NU:
982           case NU | FSTRING:
983             v->f = i->int1;
984             break;
985           case FIRST:
986           case LAST:
987             v->f = i->int1 ? i->dbl[0] : SYSMIS;
988             break;
989           case FIRST | FSTRING:
990           case LAST | FSTRING:
991             if (i->int1)
992               memcpy (v->s, i->string, i->dest->width);
993             else
994               memset (v->s, ' ', i->dest->width);
995             break;
996           case N_NO_VARS:
997             v->f = i->dbl[0];
998             break;
999           case NU_NO_VARS:
1000             v->f = i->int1;
1001             break;
1002           case NMISS:
1003           case NMISS | FSTRING:
1004             v->f = i->dbl[0];
1005             break;
1006           case NUMISS:
1007           case NUMISS | FSTRING:
1008             v->f = i->int1;
1009             break;
1010           default:
1011             assert (0);
1012           }
1013       }
1014   }
1015 }
1016
1017 /* Resets the state for all the aggregate functions. */
1018 static void
1019 initialize_aggregate_info (struct agr_proc *agr, const struct ccase *input)
1020 {
1021   struct agr_var *iter;
1022
1023   case_destroy (&agr->break_case);
1024   case_clone (&agr->break_case, input);
1025
1026   for (iter = agr->agr_vars; iter; iter = iter->next)
1027     {
1028       iter->missing = 0;
1029       iter->dbl[0] = iter->dbl[1] = iter->dbl[2] = 0.0;
1030       iter->int1 = iter->int2 = 0;
1031       switch (iter->function)
1032         {
1033         case MIN:
1034           iter->dbl[0] = DBL_MAX;
1035           break;
1036         case MIN | FSTRING:
1037           memset (iter->string, 255, iter->src->width);
1038           break;
1039         case MAX:
1040           iter->dbl[0] = -DBL_MAX;
1041           break;
1042         case MAX | FSTRING:
1043           memset (iter->string, 0, iter->src->width);
1044           break;
1045         case SD:
1046           if (iter->moments == NULL)
1047             iter->moments = moments1_create (MOMENT_VARIANCE);
1048           else
1049             moments1_clear (iter->moments);
1050           break;
1051         default:
1052           break;
1053         }
1054     }
1055 }
1056 \f
1057 /* Aggregate each case as it comes through.  Cases which aren't needed
1058    are dropped. */
1059 static int
1060 agr_to_active_file (struct ccase *c, void *agr_)
1061 {
1062   struct agr_proc *agr = agr_;
1063
1064   if (aggregate_single_case (agr, c, &agr->agr_case)) 
1065     agr->sink->class->write (agr->sink, &agr->agr_case);
1066
1067   return 1;
1068 }
1069
1070 /* Aggregate the current case and output it if we passed a
1071    breakpoint. */
1072 static int
1073 presorted_agr_to_sysfile (struct ccase *c, void *agr_) 
1074 {
1075   struct agr_proc *agr = agr_;
1076
1077   if (aggregate_single_case (agr, c, &agr->agr_case)) 
1078     sfm_write_case (agr->writer, &agr->agr_case);
1079
1080   return 1;
1081 }