Revise.
[pspp] / src / aggregate.c
1 /* PSPP - computes sample statistics.
2    Copyright (C) 1997-9, 2000 Free Software Foundation, Inc.
3    Written by Ben Pfaff <blp@gnu.org>.
4
5    This program is free software; you can redistribute it and/or
6    modify it under the terms of the GNU General Public License as
7    published by the Free Software Foundation; either version 2 of the
8    License, or (at your option) any later version.
9
10    This program is distributed in the hope that it will be useful, but
11    WITHOUT ANY WARRANTY; without even the implied warranty of
12    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13    General Public License for more details.
14
15    You should have received a copy of the GNU General Public License
16    along with this program; if not, write to the Free Software
17    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
18    02110-1301, USA. */
19
20 #include <config.h>
21 #include "error.h"
22 #include <stdlib.h>
23 #include "alloc.h"
24 #include "case.h"
25 #include "casefile.h"
26 #include "command.h"
27 #include "dictionary.h"
28 #include "error.h"
29 #include "file-handle.h"
30 #include "lexer.h"
31 #include "misc.h"
32 #include "moments.h"
33 #include "pool.h"
34 #include "settings.h"
35 #include "sfm-write.h"
36 #include "sort-prs.h"
37 #include "sort.h"
38 #include "str.h"
39 #include "var.h"
40 #include "vfm.h"
41 #include "vfmP.h"
42
43 #include "gettext.h"
44 #define _(msgid) gettext (msgid)
45
46 /* Specifies how to make an aggregate variable. */
47 struct agr_var
48   {
49     struct agr_var *next;               /* Next in list. */
50
51     /* Collected during parsing. */
52     struct variable *src;       /* Source variable. */
53     struct variable *dest;      /* Target variable. */
54     int function;               /* Function. */
55     int include_missing;        /* 1=Include user-missing values. */
56     union value arg[2];         /* Arguments. */
57
58     /* Accumulated during AGGREGATE execution. */
59     double dbl[3];
60     int int1, int2;
61     char *string;
62     int missing;
63     struct moments1 *moments;
64   };
65
66 /* Aggregation functions. */
67 enum
68   {
69     NONE, SUM, MEAN, SD, MAX, MIN, PGT, PLT, PIN, POUT, FGT, FLT, FIN,
70     FOUT, N, NU, NMISS, NUMISS, FIRST, LAST,
71     N_AGR_FUNCS, N_NO_VARS, NU_NO_VARS,
72     FUNC = 0x1f, /* Function mask. */
73     FSTRING = 1<<5, /* String function bit. */
74   };
75
76 /* Attributes of an aggregation function. */
77 struct agr_func
78   {
79     const char *name;           /* Aggregation function name. */
80     int n_args;                 /* Number of arguments. */
81     int alpha_type;             /* When given ALPHA arguments, output type. */
82     struct fmt_spec format;     /* Format spec if alpha_type != ALPHA. */
83   };
84
85 /* Attributes of aggregation functions. */
86 static const struct agr_func agr_func_tab[] = 
87   {
88     {"<NONE>",  0, -1,      {0, 0, 0}},
89     {"SUM",     0, -1,      {FMT_F, 8, 2}},
90     {"MEAN",    0, -1,      {FMT_F, 8, 2}},
91     {"SD",      0, -1,      {FMT_F, 8, 2}},
92     {"MAX",     0, ALPHA,   {-1, -1, -1}}, 
93     {"MIN",     0, ALPHA,   {-1, -1, -1}}, 
94     {"PGT",     1, NUMERIC, {FMT_F, 5, 1}},      
95     {"PLT",     1, NUMERIC, {FMT_F, 5, 1}},       
96     {"PIN",     2, NUMERIC, {FMT_F, 5, 1}},       
97     {"POUT",    2, NUMERIC, {FMT_F, 5, 1}},       
98     {"FGT",     1, NUMERIC, {FMT_F, 5, 3}},       
99     {"FLT",     1, NUMERIC, {FMT_F, 5, 3}},       
100     {"FIN",     2, NUMERIC, {FMT_F, 5, 3}},       
101     {"FOUT",    2, NUMERIC, {FMT_F, 5, 3}},       
102     {"N",       0, NUMERIC, {FMT_F, 7, 0}},       
103     {"NU",      0, NUMERIC, {FMT_F, 7, 0}},       
104     {"NMISS",   0, NUMERIC, {FMT_F, 7, 0}},       
105     {"NUMISS",  0, NUMERIC, {FMT_F, 7, 0}},       
106     {"FIRST",   0, ALPHA,   {-1, -1, -1}}, 
107     {"LAST",    0, ALPHA,   {-1, -1, -1}},
108     {NULL,      0, -1,      {-1, -1, -1}},
109     {"N",       0, NUMERIC, {FMT_F, 7, 0}},
110     {"NU",      0, NUMERIC, {FMT_F, 7, 0}},
111   };
112
113 /* Missing value types. */
114 enum missing_treatment
115   {
116     ITEMWISE,           /* Missing values item by item. */
117     COLUMNWISE          /* Missing values column by column. */
118   };
119
120 /* An entire AGGREGATE procedure. */
121 struct agr_proc 
122   {
123     /* We have either an output file or a sink. */
124     struct sfm_writer *writer;          /* Output file, or null if none. */
125     struct case_sink *sink;             /* Sink, or null if none. */
126
127     /* Break variables. */
128     struct sort_criteria *sort;         /* Sort criteria. */
129     struct variable **break_vars;       /* Break variables. */
130     size_t break_var_cnt;               /* Number of break variables. */
131     struct ccase break_case;            /* Last values of break variables. */
132
133     enum missing_treatment missing;     /* How to treat missing values. */
134     struct agr_var *agr_vars;           /* First aggregate variable. */
135     struct dictionary *dict;            /* Aggregate dictionary. */
136     int case_cnt;                       /* Counts aggregated cases. */
137     struct ccase agr_case;              /* Aggregate case for output. */
138   };
139
140 static void initialize_aggregate_info (struct agr_proc *,
141                                        const struct ccase *);
142
143 /* Prototypes. */
144 static int parse_aggregate_functions (struct agr_proc *);
145 static void agr_destroy (struct agr_proc *);
146 static int aggregate_single_case (struct agr_proc *agr,
147                                   const struct ccase *input,
148                                   struct ccase *output);
149 static void dump_aggregate_info (struct agr_proc *agr, struct ccase *output);
150
151 /* Aggregating to the active file. */
152 static int agr_to_active_file (struct ccase *, void *aux);
153
154 /* Aggregating to a system file. */
155 static int presorted_agr_to_sysfile (struct ccase *, void *aux);
156 \f
157 /* Parsing. */
158
159 /* Parses and executes the AGGREGATE procedure. */
160 int
161 cmd_aggregate (void)
162 {
163   struct agr_proc agr;
164   struct file_handle *out_file = NULL;
165
166   bool copy_documents = false;
167   bool presorted = false;
168   bool saw_direction;
169
170   memset(&agr, 0 , sizeof (agr));
171   agr.missing = ITEMWISE;
172   case_nullify (&agr.break_case);
173   
174   agr.dict = dict_create ();
175   dict_set_label (agr.dict, dict_get_label (default_dict));
176   dict_set_documents (agr.dict, dict_get_documents (default_dict));
177
178   /* OUTFILE subcommand must be first. */
179   if (!lex_force_match_id ("OUTFILE"))
180     goto error;
181   lex_match ('=');
182   if (!lex_match ('*'))
183     {
184       out_file = fh_parse ();
185       if (out_file == NULL)
186         goto error;
187     }
188   
189   /* Read most of the subcommands. */
190   for (;;)
191     {
192       lex_match ('/');
193       
194       if (lex_match_id ("MISSING"))
195         {
196           lex_match ('=');
197           if (!lex_match_id ("COLUMNWISE"))
198             {
199               lex_error (_("while expecting COLUMNWISE"));
200               goto error;
201             }
202           agr.missing = COLUMNWISE;
203         }
204       else if (lex_match_id ("DOCUMENT"))
205         copy_documents = true;
206       else if (lex_match_id ("PRESORTED"))
207         presorted = true;
208       else if (lex_match_id ("BREAK"))
209         {
210           int i;
211
212           lex_match ('=');
213           agr.sort = sort_parse_criteria (default_dict,
214                                           &agr.break_vars, &agr.break_var_cnt,
215                                           &saw_direction, NULL);
216           if (agr.sort == NULL)
217             goto error;
218           
219           for (i = 0; i < agr.break_var_cnt; i++)
220             dict_clone_var_assert (agr.dict, agr.break_vars[i],
221                                    agr.break_vars[i]->name);
222
223           /* BREAK must follow the options. */
224           break;
225         }
226       else
227         {
228           lex_error (_("expecting BREAK"));
229           goto error;
230         }
231     }
232   if (presorted && saw_direction)
233     msg (SW, _("When PRESORTED is specified, specifying sorting directions "
234                "with (A) or (D) has no effect.  Output data will be sorted "
235                "the same way as the input data."));
236       
237   /* Read in the aggregate functions. */
238   lex_match ('/');
239   if (!parse_aggregate_functions (&agr))
240     goto error;
241
242   /* Delete documents. */
243   if (!copy_documents)
244     dict_set_documents (agr.dict, NULL);
245
246   /* Cancel SPLIT FILE. */
247   dict_set_split_vars (agr.dict, NULL, 0);
248   
249   /* Initialize. */
250   agr.case_cnt = 0;
251   case_create (&agr.agr_case, dict_get_next_value_idx (agr.dict));
252
253   /* Output to active file or external file? */
254   if (out_file == NULL) 
255     {
256       /* The active file will be replaced by the aggregated data,
257          so TEMPORARY is moot. */
258       cancel_temporary ();
259
260       if (agr.sort != NULL && !presorted)
261         sort_active_file_in_place (agr.sort);
262
263       agr.sink = create_case_sink (&storage_sink_class, agr.dict, NULL);
264       if (agr.sink->class->open != NULL)
265         agr.sink->class->open (agr.sink);
266       vfm_sink = create_case_sink (&null_sink_class, default_dict, NULL);
267       procedure (agr_to_active_file, &agr);
268       if (agr.case_cnt > 0) 
269         {
270           dump_aggregate_info (&agr, &agr.agr_case);
271           agr.sink->class->write (agr.sink, &agr.agr_case);
272         }
273       dict_destroy (default_dict);
274       default_dict = agr.dict;
275       agr.dict = NULL;
276       vfm_source = agr.sink->class->make_source (agr.sink);
277       free_case_sink (agr.sink);
278     }
279   else
280     {
281       agr.writer = sfm_open_writer (out_file, agr.dict, get_scompression (), 0);
282       if (agr.writer == NULL)
283         goto error;
284       
285       if (agr.sort != NULL && !presorted) 
286         {
287           /* Sorting is needed. */
288           struct casefile *dst;
289           struct casereader *reader;
290           struct ccase c;
291           
292           dst = sort_active_file_to_casefile (agr.sort);
293           if (dst == NULL)
294             goto error;
295           reader = casefile_get_destructive_reader (dst);
296           while (casereader_read_xfer (reader, &c)) 
297             {
298               if (aggregate_single_case (&agr, &c, &agr.agr_case)) 
299                 sfm_write_case (agr.writer, &agr.agr_case);
300               case_destroy (&c);
301             }
302           casereader_destroy (reader);
303           casefile_destroy (dst);
304         }
305       else 
306         {
307           /* Active file is already sorted. */
308           procedure (presorted_agr_to_sysfile, &agr);
309         }
310       
311       if (agr.case_cnt > 0) 
312         {
313           dump_aggregate_info (&agr, &agr.agr_case);
314           sfm_write_case (agr.writer, &agr.agr_case);
315         }
316     }
317   
318   agr_destroy (&agr);
319   return CMD_SUCCESS;
320
321 error:
322   agr_destroy (&agr);
323   return CMD_FAILURE;
324 }
325
326 /* Parse all the aggregate functions. */
327 static int
328 parse_aggregate_functions (struct agr_proc *agr)
329 {
330   struct agr_var *tail; /* Tail of linked list starting at agr->vars. */
331
332   /* Parse everything. */
333   tail = NULL;
334   for (;;)
335     {
336       char **dest;
337       char **dest_label;
338       int n_dest;
339
340       int include_missing;
341       const struct agr_func *function;
342       int func_index;
343
344       union value arg[2];
345
346       struct variable **src;
347       int n_src;
348
349       int i;
350
351       dest = NULL;
352       dest_label = NULL;
353       n_dest = 0;
354       src = NULL;
355       function = NULL;
356       n_src = 0;
357       arg[0].c = NULL;
358       arg[1].c = NULL;
359
360       /* Parse the list of target variables. */
361       while (!lex_match ('='))
362         {
363           int n_dest_prev = n_dest;
364           
365           if (!parse_DATA_LIST_vars (&dest, &n_dest,
366                                      PV_APPEND | PV_SINGLE | PV_NO_SCRATCH))
367             goto error;
368
369           /* Assign empty labels. */
370           {
371             int j;
372
373             dest_label = xrealloc (dest_label, sizeof *dest_label * n_dest);
374             for (j = n_dest_prev; j < n_dest; j++)
375               dest_label[j] = NULL;
376           }
377           
378           if (token == T_STRING)
379             {
380               ds_truncate (&tokstr, 255);
381               dest_label[n_dest - 1] = xstrdup (ds_c_str (&tokstr));
382               lex_get ();
383             }
384         }
385
386       /* Get the name of the aggregation function. */
387       if (token != T_ID)
388         {
389           lex_error (_("expecting aggregation function"));
390           goto error;
391         }
392
393       include_missing = 0;
394       if (tokid[strlen (tokid) - 1] == '.')
395         {
396           include_missing = 1;
397           tokid[strlen (tokid) - 1] = 0;
398         }
399       
400       for (function = agr_func_tab; function->name; function++)
401         if (!strcasecmp (function->name, tokid))
402           break;
403       if (NULL == function->name)
404         {
405           msg (SE, _("Unknown aggregation function %s."), tokid);
406           goto error;
407         }
408       func_index = function - agr_func_tab;
409       lex_get ();
410
411       /* Check for leading lparen. */
412       if (!lex_match ('('))
413         {
414           if (func_index == N)
415             func_index = N_NO_VARS;
416           else if (func_index == NU)
417             func_index = NU_NO_VARS;
418           else
419             {
420               lex_error (_("expecting `('"));
421               goto error;
422             }
423         }
424       else
425         {
426           /* Parse list of source variables. */
427           {
428             int pv_opts = PV_NO_SCRATCH;
429
430             if (func_index == SUM || func_index == MEAN || func_index == SD)
431               pv_opts |= PV_NUMERIC;
432             else if (function->n_args)
433               pv_opts |= PV_SAME_TYPE;
434
435             if (!parse_variables (default_dict, &src, &n_src, pv_opts))
436               goto error;
437           }
438
439           /* Parse function arguments, for those functions that
440              require arguments. */
441           if (function->n_args != 0)
442             for (i = 0; i < function->n_args; i++)
443               {
444                 int type;
445             
446                 lex_match (',');
447                 if (token == T_STRING)
448                   {
449                     arg[i].c = xstrdup (ds_c_str (&tokstr));
450                     type = ALPHA;
451                   }
452                 else if (lex_is_number ())
453                   {
454                     arg[i].f = tokval;
455                     type = NUMERIC;
456                   } else {
457                     msg (SE, _("Missing argument %d to %s."), i + 1,
458                          function->name);
459                     goto error;
460                   }
461             
462                 lex_get ();
463
464                 if (type != src[0]->type)
465                   {
466                     msg (SE, _("Arguments to %s must be of same type as "
467                                "source variables."),
468                          function->name);
469                     goto error;
470                   }
471               }
472
473           /* Trailing rparen. */
474           if (!lex_match(')'))
475             {
476               lex_error (_("expecting `)'"));
477               goto error;
478             }
479           
480           /* Now check that the number of source variables match
481              the number of target variables.  If we check earlier
482              than this, the user can get very misleading error
483              message, i.e. `AGGREGATE x=SUM(y t).' will get this
484              error message when a proper message would be more
485              like `unknown variable t'. */
486           if (n_src != n_dest)
487             {
488               msg (SE, _("Number of source variables (%d) does not match "
489                          "number of target variables (%d)."),
490                    n_src, n_dest);
491               goto error;
492             }
493
494           if ((func_index == PIN || func_index == POUT
495               || func_index == FIN || func_index == FOUT) 
496               && ((src[0]->type == NUMERIC && arg[0].f > arg[1].f)
497                   || (src[0]->type == ALPHA
498                       && str_compare_rpad (arg[0].c, arg[1].c) > 0)))
499             {
500               union value t = arg[0];
501               arg[0] = arg[1];
502               arg[1] = t;
503                   
504               msg (SW, _("The value arguments passed to the %s function "
505                          "are out-of-order.  They will be treated as if "
506                          "they had been specified in the correct order."),
507                    function->name);
508             }
509         }
510         
511       /* Finally add these to the linked list of aggregation
512          variables. */
513       for (i = 0; i < n_dest; i++)
514         {
515           struct agr_var *v = xmalloc (sizeof *v);
516
517           /* Add variable to chain. */
518           if (agr->agr_vars != NULL)
519             tail->next = v;
520           else
521             agr->agr_vars = v;
522           tail = v;
523           tail->next = NULL;
524           v->moments = NULL;
525           
526           /* Create the target variable in the aggregate
527              dictionary. */
528           {
529             struct variable *destvar;
530             
531             v->function = func_index;
532
533             if (src)
534               {
535                 v->src = src[i];
536                 
537                 if (src[i]->type == ALPHA)
538                   {
539                     v->function |= FSTRING;
540                     v->string = xmalloc (src[i]->width);
541                   }
542
543                 if (function->alpha_type == ALPHA)
544                   destvar = dict_clone_var (agr->dict, v->src, dest[i]);
545                 else
546                   {
547                     assert (v->src->type == NUMERIC
548                             || function->alpha_type == NUMERIC);
549                     destvar = dict_create_var (agr->dict, dest[i], 0);
550                     if (destvar != NULL) 
551                       {
552                         if ((func_index == N || func_index == NMISS)
553                             && dict_get_weight (default_dict) != NULL)
554                           destvar->print = destvar->write = f8_2; 
555                         else
556                           destvar->print = destvar->write = function->format;
557                       }
558                   }
559               } else {
560                 v->src = NULL;
561                 destvar = dict_create_var (agr->dict, dest[i], 0);
562                 if (func_index == N_NO_VARS
563                     && dict_get_weight (default_dict) != NULL)
564                   destvar->print = destvar->write = f8_2; 
565                 else
566                   destvar->print = destvar->write = function->format;
567               }
568           
569             if (!destvar)
570               {
571                 msg (SE, _("Variable name %s is not unique within the "
572                            "aggregate file dictionary, which contains "
573                            "the aggregate variables and the break "
574                            "variables."),
575                      dest[i]);
576                 goto error;
577               }
578
579             free (dest[i]);
580             destvar->init = 0;
581             if (dest_label[i])
582               {
583                 destvar->label = dest_label[i];
584                 dest_label[i] = NULL;
585               }
586
587             v->dest = destvar;
588           }
589           
590           v->include_missing = include_missing;
591
592           if (v->src != NULL)
593             {
594               int j;
595
596               if (v->src->type == NUMERIC)
597                 for (j = 0; j < function->n_args; j++)
598                   v->arg[j].f = arg[j].f;
599               else
600                 for (j = 0; j < function->n_args; j++)
601                   v->arg[j].c = xstrdup (arg[j].c);
602             }
603         }
604       
605       if (src != NULL && src[0]->type == ALPHA)
606         for (i = 0; i < function->n_args; i++)
607           {
608             free (arg[i].c);
609             arg[i].c = NULL;
610           }
611
612       free (src);
613       free (dest);
614       free (dest_label);
615
616       if (!lex_match ('/'))
617         {
618           if (token == '.')
619             return 1;
620
621           lex_error ("expecting end of command");
622           return 0;
623         }
624       continue;
625       
626     error:
627       for (i = 0; i < n_dest; i++)
628         {
629           free (dest[i]);
630           free (dest_label[i]);
631         }
632       free (dest);
633       free (dest_label);
634       free (arg[0].c);
635       free (arg[1].c);
636       if (src && n_src && src[0]->type == ALPHA)
637         for (i = 0; i < function->n_args; i++)
638           {
639             free (arg[i].c);
640             arg[i].c = NULL;
641           }
642       free (src);
643         
644       return 0;
645     }
646 }
647
648 /* Destroys AGR. */
649 static void
650 agr_destroy (struct agr_proc *agr)
651 {
652   struct agr_var *iter, *next;
653
654   sfm_close_writer (agr->writer);
655   if (agr->sort != NULL)
656     sort_destroy_criteria (agr->sort);
657   free (agr->break_vars);
658   case_destroy (&agr->break_case);
659   for (iter = agr->agr_vars; iter; iter = next)
660     {
661       next = iter->next;
662
663       if (iter->function & FSTRING)
664         {
665           int n_args;
666           int i;
667
668           n_args = agr_func_tab[iter->function & FUNC].n_args;
669           for (i = 0; i < n_args; i++)
670             free (iter->arg[i].c);
671           free (iter->string);
672         }
673       else if (iter->function == SD)
674         moments1_destroy (iter->moments);
675       free (iter);
676     }
677   if (agr->dict != NULL)
678     dict_destroy (agr->dict);
679
680   case_destroy (&agr->agr_case);
681 }
682 \f
683 /* Execution. */
684
685 static void accumulate_aggregate_info (struct agr_proc *,
686                                        const struct ccase *);
687 static void dump_aggregate_info (struct agr_proc *, struct ccase *);
688
689 /* Processes a single case INPUT for aggregation.  If output is
690    warranted, writes it to OUTPUT and returns nonzero.
691    Otherwise, returns zero and OUTPUT is unmodified. */
692 static int
693 aggregate_single_case (struct agr_proc *agr,
694                        const struct ccase *input, struct ccase *output)
695 {
696   bool finished_group = false;
697   
698   if (agr->case_cnt++ == 0)
699     initialize_aggregate_info (agr, input);
700   else if (case_compare (&agr->break_case, input,
701                          agr->break_vars, agr->break_var_cnt))
702     {
703       dump_aggregate_info (agr, output);
704       finished_group = true;
705
706       initialize_aggregate_info (agr, input);
707     }
708
709   accumulate_aggregate_info (agr, input);
710   return finished_group;
711 }
712
713 /* Accumulates aggregation data from the case INPUT. */
714 static void 
715 accumulate_aggregate_info (struct agr_proc *agr,
716                            const struct ccase *input)
717 {
718   struct agr_var *iter;
719   double weight;
720   int bad_warn = 1;
721
722   weight = dict_get_case_weight (default_dict, input, &bad_warn);
723
724   for (iter = agr->agr_vars; iter; iter = iter->next)
725     if (iter->src)
726       {
727         const union value *v = case_data (input, iter->src->fv);
728
729         if ((!iter->include_missing && is_missing (v, iter->src))
730             || (iter->include_missing && iter->src->type == NUMERIC
731                 && v->f == SYSMIS))
732           {
733             switch (iter->function)
734               {
735               case NMISS:
736               case NMISS | FSTRING:
737                 iter->dbl[0] += weight;
738                 break;
739               case NUMISS:
740               case NUMISS | FSTRING:
741                 iter->int1++;
742                 break;
743               }
744             iter->missing = 1;
745             continue;
746           }
747         
748         /* This is horrible.  There are too many possibilities. */
749         switch (iter->function)
750           {
751           case SUM:
752             iter->dbl[0] += v->f * weight;
753             iter->int1 = 1;
754             break;
755           case MEAN:
756             iter->dbl[0] += v->f * weight;
757             iter->dbl[1] += weight;
758             break;
759           case SD:
760             moments1_add (iter->moments, v->f, weight);
761             break;
762           case MAX:
763             iter->dbl[0] = max (iter->dbl[0], v->f);
764             iter->int1 = 1;
765             break;
766           case MAX | FSTRING:
767             if (memcmp (iter->string, v->s, iter->src->width) < 0)
768               memcpy (iter->string, v->s, iter->src->width);
769             iter->int1 = 1;
770             break;
771           case MIN:
772             iter->dbl[0] = min (iter->dbl[0], v->f);
773             iter->int1 = 1;
774             break;
775           case MIN | FSTRING:
776             if (memcmp (iter->string, v->s, iter->src->width) > 0)
777               memcpy (iter->string, v->s, iter->src->width);
778             iter->int1 = 1;
779             break;
780           case FGT:
781           case PGT:
782             if (v->f > iter->arg[0].f)
783               iter->dbl[0] += weight;
784             iter->dbl[1] += weight;
785             break;
786           case FGT | FSTRING:
787           case PGT | FSTRING:
788             if (memcmp (iter->arg[0].c, v->s, iter->src->width) < 0)
789               iter->dbl[0] += weight;
790             iter->dbl[1] += weight;
791             break;
792           case FLT:
793           case PLT:
794             if (v->f < iter->arg[0].f)
795               iter->dbl[0] += weight;
796             iter->dbl[1] += weight;
797             break;
798           case FLT | FSTRING:
799           case PLT | FSTRING:
800             if (memcmp (iter->arg[0].c, v->s, iter->src->width) > 0)
801               iter->dbl[0] += weight;
802             iter->dbl[1] += weight;
803             break;
804           case FIN:
805           case PIN:
806             if (iter->arg[0].f <= v->f && v->f <= iter->arg[1].f)
807               iter->dbl[0] += weight;
808             iter->dbl[1] += weight;
809             break;
810           case FIN | FSTRING:
811           case PIN | FSTRING:
812             if (memcmp (iter->arg[0].c, v->s, iter->src->width) <= 0
813                 && memcmp (iter->arg[1].c, v->s, iter->src->width) >= 0)
814               iter->dbl[0] += weight;
815             iter->dbl[1] += weight;
816             break;
817           case FOUT:
818           case POUT:
819             if (iter->arg[0].f > v->f || v->f > iter->arg[1].f)
820               iter->dbl[0] += weight;
821             iter->dbl[1] += weight;
822             break;
823           case FOUT | FSTRING:
824           case POUT | FSTRING:
825             if (memcmp (iter->arg[0].c, v->s, iter->src->width) > 0
826                 || memcmp (iter->arg[1].c, v->s, iter->src->width) < 0)
827               iter->dbl[0] += weight;
828             iter->dbl[1] += weight;
829             break;
830           case N:
831           case N | FSTRING:
832             iter->dbl[0] += weight;
833             break;
834           case NU:
835           case NU | FSTRING:
836             iter->int1++;
837             break;
838           case FIRST:
839             if (iter->int1 == 0)
840               {
841                 iter->dbl[0] = v->f;
842                 iter->int1 = 1;
843               }
844             break;
845           case FIRST | FSTRING:
846             if (iter->int1 == 0)
847               {
848                 memcpy (iter->string, v->s, iter->src->width);
849                 iter->int1 = 1;
850               }
851             break;
852           case LAST:
853             iter->dbl[0] = v->f;
854             iter->int1 = 1;
855             break;
856           case LAST | FSTRING:
857             memcpy (iter->string, v->s, iter->src->width);
858             iter->int1 = 1;
859             break;
860           case NMISS:
861           case NMISS | FSTRING:
862           case NUMISS:
863           case NUMISS | FSTRING:
864             /* Our value is not missing or it would have been
865                caught earlier.  Nothing to do. */
866             break;
867           default:
868             assert (0);
869           }
870     } else {
871       switch (iter->function)
872         {
873         case N_NO_VARS:
874           iter->dbl[0] += weight;
875           break;
876         case NU_NO_VARS:
877           iter->int1++;
878           break;
879         default:
880           assert (0);
881         }
882     }
883 }
884
885 /* We've come to a record that differs from the previous in one or
886    more of the break variables.  Make an output record from the
887    accumulated statistics in the OUTPUT case. */
888 static void 
889 dump_aggregate_info (struct agr_proc *agr, struct ccase *output)
890 {
891   {
892     int value_idx = 0;
893     int i;
894
895     for (i = 0; i < agr->break_var_cnt; i++) 
896       {
897         struct variable *v = agr->break_vars[i];
898         memcpy (case_data_rw (output, value_idx),
899                 case_data (&agr->break_case, v->fv),
900                 sizeof (union value) * v->nv);
901         value_idx += v->nv; 
902       }
903   }
904   
905   {
906     struct agr_var *i;
907   
908     for (i = agr->agr_vars; i; i = i->next)
909       {
910         union value *v = case_data_rw (output, i->dest->fv);
911
912         if (agr->missing == COLUMNWISE && i->missing != 0
913             && (i->function & FUNC) != N && (i->function & FUNC) != NU
914             && (i->function & FUNC) != NMISS && (i->function & FUNC) != NUMISS)
915           {
916             if (i->dest->type == ALPHA)
917               memset (v->s, ' ', i->dest->width);
918             else
919               v->f = SYSMIS;
920             continue;
921           }
922         
923         switch (i->function)
924           {
925           case SUM:
926             v->f = i->int1 ? i->dbl[0] : SYSMIS;
927             break;
928           case MEAN:
929             v->f = i->dbl[1] != 0.0 ? i->dbl[0] / i->dbl[1] : SYSMIS;
930             break;
931           case SD:
932             {
933               double variance;
934
935               /* FIXME: we should use two passes. */
936               moments1_calculate (i->moments, NULL, NULL, &variance,
937                                  NULL, NULL);
938               if (variance != SYSMIS)
939                 v->f = sqrt (variance);
940               else
941                 v->f = SYSMIS; 
942             }
943             break;
944           case MAX:
945           case MIN:
946             v->f = i->int1 ? i->dbl[0] : SYSMIS;
947             break;
948           case MAX | FSTRING:
949           case MIN | FSTRING:
950             if (i->int1)
951               memcpy (v->s, i->string, i->dest->width);
952             else
953               memset (v->s, ' ', i->dest->width);
954             break;
955           case FGT:
956           case FGT | FSTRING:
957           case FLT:
958           case FLT | FSTRING:
959           case FIN:
960           case FIN | FSTRING:
961           case FOUT:
962           case FOUT | FSTRING:
963             v->f = i->dbl[1] ? i->dbl[0] / i->dbl[1] : SYSMIS;
964             break;
965           case PGT:
966           case PGT | FSTRING:
967           case PLT:
968           case PLT | FSTRING:
969           case PIN:
970           case PIN | FSTRING:
971           case POUT:
972           case POUT | FSTRING:
973             v->f = i->dbl[1] ? i->dbl[0] / i->dbl[1] * 100.0 : SYSMIS;
974             break;
975           case N:
976           case N | FSTRING:
977             v->f = i->dbl[0];
978             break;
979           case NU:
980           case NU | FSTRING:
981             v->f = i->int1;
982             break;
983           case FIRST:
984           case LAST:
985             v->f = i->int1 ? i->dbl[0] : SYSMIS;
986             break;
987           case FIRST | FSTRING:
988           case LAST | FSTRING:
989             if (i->int1)
990               memcpy (v->s, i->string, i->dest->width);
991             else
992               memset (v->s, ' ', i->dest->width);
993             break;
994           case N_NO_VARS:
995             v->f = i->dbl[0];
996             break;
997           case NU_NO_VARS:
998             v->f = i->int1;
999             break;
1000           case NMISS:
1001           case NMISS | FSTRING:
1002             v->f = i->dbl[0];
1003             break;
1004           case NUMISS:
1005           case NUMISS | FSTRING:
1006             v->f = i->int1;
1007             break;
1008           default:
1009             assert (0);
1010           }
1011       }
1012   }
1013 }
1014
1015 /* Resets the state for all the aggregate functions. */
1016 static void
1017 initialize_aggregate_info (struct agr_proc *agr, const struct ccase *input)
1018 {
1019   struct agr_var *iter;
1020
1021   case_destroy (&agr->break_case);
1022   case_clone (&agr->break_case, input);
1023
1024   for (iter = agr->agr_vars; iter; iter = iter->next)
1025     {
1026       iter->missing = 0;
1027       iter->dbl[0] = iter->dbl[1] = iter->dbl[2] = 0.0;
1028       iter->int1 = iter->int2 = 0;
1029       switch (iter->function)
1030         {
1031         case MIN:
1032           iter->dbl[0] = DBL_MAX;
1033           break;
1034         case MIN | FSTRING:
1035           memset (iter->string, 255, iter->src->width);
1036           break;
1037         case MAX:
1038           iter->dbl[0] = -DBL_MAX;
1039           break;
1040         case MAX | FSTRING:
1041           memset (iter->string, 0, iter->src->width);
1042           break;
1043         case SD:
1044           if (iter->moments == NULL)
1045             iter->moments = moments1_create (MOMENT_VARIANCE);
1046           else
1047             moments1_clear (iter->moments);
1048           break;
1049         default:
1050           break;
1051         }
1052     }
1053 }
1054 \f
1055 /* Aggregate each case as it comes through.  Cases which aren't needed
1056    are dropped. */
1057 static int
1058 agr_to_active_file (struct ccase *c, void *agr_)
1059 {
1060   struct agr_proc *agr = agr_;
1061
1062   if (aggregate_single_case (agr, c, &agr->agr_case)) 
1063     agr->sink->class->write (agr->sink, &agr->agr_case);
1064
1065   return 1;
1066 }
1067
1068 /* Aggregate the current case and output it if we passed a
1069    breakpoint. */
1070 static int
1071 presorted_agr_to_sysfile (struct ccase *c, void *agr_) 
1072 {
1073   struct agr_proc *agr = agr_;
1074
1075   if (aggregate_single_case (agr, c, &agr->agr_case)) 
1076     sfm_write_case (agr->writer, &agr->agr_case);
1077
1078   return 1;
1079 }