Implemented long variable names a la spss V12.
[pspp-builds.git] / src / aggregate.c
1 /* PSPP - computes sample statistics.
2    Copyright (C) 1997-9, 2000 Free Software Foundation, Inc.
3    Written by Ben Pfaff <blp@gnu.org>.
4
5    This program is free software; you can redistribute it and/or
6    modify it under the terms of the GNU General Public License as
7    published by the Free Software Foundation; either version 2 of the
8    License, or (at your option) any later version.
9
10    This program is distributed in the hope that it will be useful, but
11    WITHOUT ANY WARRANTY; without even the implied warranty of
12    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13    General Public License for more details.
14
15    You should have received a copy of the GNU General Public License
16    along with this program; if not, write to the Free Software
17    Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA
18    02111-1307, USA. */
19
20 #include <config.h>
21 #include "error.h"
22 #include <stdlib.h>
23 #include "alloc.h"
24 #include "case.h"
25 #include "casefile.h"
26 #include "command.h"
27 #include "dictionary.h"
28 #include "error.h"
29 #include "file-handle.h"
30 #include "lexer.h"
31 #include "misc.h"
32 #include "moments.h"
33 #include "pool.h"
34 #include "settings.h"
35 #include "sfm-write.h"
36 #include "sort.h"
37 #include "str.h"
38 #include "var.h"
39 #include "vfm.h"
40 #include "vfmP.h"
41
42 /* Specifies how to make an aggregate variable. */
43 struct agr_var
44   {
45     struct agr_var *next;               /* Next in list. */
46
47     /* Collected during parsing. */
48     struct variable *src;       /* Source variable. */
49     struct variable *dest;      /* Target variable. */
50     int function;               /* Function. */
51     int include_missing;        /* 1=Include user-missing values. */
52     union value arg[2];         /* Arguments. */
53
54     /* Accumulated during AGGREGATE execution. */
55     double dbl[3];
56     int int1, int2;
57     char *string;
58     int missing;
59     struct moments1 *moments;
60   };
61
62 /* Aggregation functions. */
63 enum
64   {
65     NONE, SUM, MEAN, SD, MAX, MIN, PGT, PLT, PIN, POUT, FGT, FLT, FIN,
66     FOUT, N, NU, NMISS, NUMISS, FIRST, LAST,
67     N_AGR_FUNCS, N_NO_VARS, NU_NO_VARS,
68     FUNC = 0x1f, /* Function mask. */
69     FSTRING = 1<<5, /* String function bit. */
70   };
71
72 /* Attributes of an aggregation function. */
73 struct agr_func
74   {
75     const char *name;           /* Aggregation function name. */
76     int n_args;                 /* Number of arguments. */
77     int alpha_type;             /* When given ALPHA arguments, output type. */
78     struct fmt_spec format;     /* Format spec if alpha_type != ALPHA. */
79   };
80
81 /* Attributes of aggregation functions. */
82 static const struct agr_func agr_func_tab[] = 
83   {
84     {"<NONE>",  0, -1,      {0, 0, 0}},
85     {"SUM",     0, -1,      {FMT_F, 8, 2}},
86     {"MEAN",    0, -1,      {FMT_F, 8, 2}},
87     {"SD",      0, -1,      {FMT_F, 8, 2}},
88     {"MAX",     0, ALPHA,   {-1, -1, -1}}, 
89     {"MIN",     0, ALPHA,   {-1, -1, -1}}, 
90     {"PGT",     1, NUMERIC, {FMT_F, 5, 1}},      
91     {"PLT",     1, NUMERIC, {FMT_F, 5, 1}},       
92     {"PIN",     2, NUMERIC, {FMT_F, 5, 1}},       
93     {"POUT",    2, NUMERIC, {FMT_F, 5, 1}},       
94     {"FGT",     1, NUMERIC, {FMT_F, 5, 3}},       
95     {"FLT",     1, NUMERIC, {FMT_F, 5, 3}},       
96     {"FIN",     2, NUMERIC, {FMT_F, 5, 3}},       
97     {"FOUT",    2, NUMERIC, {FMT_F, 5, 3}},       
98     {"N",       0, NUMERIC, {FMT_F, 7, 0}},       
99     {"NU",      0, NUMERIC, {FMT_F, 7, 0}},       
100     {"NMISS",   0, NUMERIC, {FMT_F, 7, 0}},       
101     {"NUMISS",  0, NUMERIC, {FMT_F, 7, 0}},       
102     {"FIRST",   0, ALPHA,   {-1, -1, -1}}, 
103     {"LAST",    0, ALPHA,   {-1, -1, -1}},
104     {NULL,      0, -1,      {-1, -1, -1}},
105     {"N",       0, NUMERIC, {FMT_F, 7, 0}},
106     {"NU",      0, NUMERIC, {FMT_F, 7, 0}},
107   };
108
109 /* Missing value types. */
110 enum missing_treatment
111   {
112     ITEMWISE,           /* Missing values item by item. */
113     COLUMNWISE          /* Missing values column by column. */
114   };
115
116 /* An entire AGGREGATE procedure. */
117 struct agr_proc 
118   {
119     /* We have either an output file or a sink. */
120     struct sfm_writer *writer;          /* Output file, or null if none. */
121     struct case_sink *sink;             /* Sink, or null if none. */
122
123     /* Break variables. */
124     struct sort_criteria *sort;         /* Sort criteria. */
125     struct variable **break_vars;       /* Break variables. */
126     size_t break_var_cnt;               /* Number of break variables. */
127     union value *prev_break;            /* Last values of break variables. */
128
129     enum missing_treatment missing;     /* How to treat missing values. */
130     struct agr_var *agr_vars;           /* First aggregate variable. */
131     struct dictionary *dict;            /* Aggregate dictionary. */
132     int case_cnt;                       /* Counts aggregated cases. */
133     struct ccase agr_case;              /* Aggregate case for output. */
134   };
135
136 static void initialize_aggregate_info (struct agr_proc *);
137
138 /* Prototypes. */
139 static int parse_aggregate_functions (struct agr_proc *);
140 static void agr_destroy (struct agr_proc *);
141 static int aggregate_single_case (struct agr_proc *agr,
142                                   const struct ccase *input,
143                                   struct ccase *output);
144 static void dump_aggregate_info (struct agr_proc *agr, struct ccase *output);
145
146 /* Aggregating to the active file. */
147 static int agr_to_active_file (struct ccase *, void *aux);
148
149 /* Aggregating to a system file. */
150 static int presorted_agr_to_sysfile (struct ccase *, void *aux);
151 \f
152 /* Parsing. */
153
154 /* Parses and executes the AGGREGATE procedure. */
155 int
156 cmd_aggregate (void)
157 {
158   struct agr_proc agr;
159   struct file_handle *out_file = NULL;
160
161   bool copy_documents = false;
162   bool presorted = false;
163   bool saw_direction;
164
165   memset(&agr, 0 , sizeof (agr));
166   agr.missing = ITEMWISE;
167   
168   agr.dict = dict_create ();
169   dict_set_label (agr.dict, dict_get_label (default_dict));
170   dict_set_documents (agr.dict, dict_get_documents (default_dict));
171
172   /* OUTFILE subcommand must be first. */
173   if (!lex_force_match_id ("OUTFILE"))
174     goto error;
175   lex_match ('=');
176   if (!lex_match ('*'))
177     {
178       out_file = fh_parse ();
179       if (out_file == NULL)
180         goto error;
181     }
182   
183   /* Read most of the subcommands. */
184   for (;;)
185     {
186       lex_match ('/');
187       
188       if (lex_match_id ("MISSING"))
189         {
190           lex_match ('=');
191           if (!lex_match_id ("COLUMNWISE"))
192             {
193               lex_error (_("while expecting COLUMNWISE"));
194               goto error;
195             }
196           agr.missing = COLUMNWISE;
197         }
198       else if (lex_match_id ("DOCUMENT"))
199         copy_documents = true;
200       else if (lex_match_id ("PRESORTED"))
201         presorted = true;
202       else if (lex_match_id ("BREAK"))
203         {
204           int i;
205
206           lex_match ('=');
207           agr.sort = sort_parse_criteria (default_dict,
208                                           &agr.break_vars, &agr.break_var_cnt,
209                                           &saw_direction);
210           if (agr.sort == NULL)
211             goto error;
212           
213           for (i = 0; i < agr.break_var_cnt; i++)
214             {
215               struct variable *v = dict_clone_var (agr.dict, agr.break_vars[i],
216                                                    agr.break_vars[i]->name, 
217                                                    agr.break_vars[i]->longname 
218                                                    );
219               assert (v != NULL);
220             }
221
222           /* BREAK must follow the options. */
223           break;
224         }
225       else
226         {
227           lex_error (_("expecting BREAK"));
228           goto error;
229         }
230     }
231   if (presorted && saw_direction)
232     msg (SW, _("When PRESORTED is specified, specifying sorting directions "
233                "with (A) or (D) has no effect.  Output data will be sorted "
234                "the same way as the input data."));
235       
236   /* Read in the aggregate functions. */
237   lex_match ('/');
238   if (!parse_aggregate_functions (&agr))
239     goto error;
240
241   /* Delete documents. */
242   if (!copy_documents)
243     dict_set_documents (agr.dict, NULL);
244
245   /* Cancel SPLIT FILE. */
246   dict_set_split_vars (agr.dict, NULL, 0);
247   
248   /* Initialize. */
249   agr.case_cnt = 0;
250   case_create (&agr.agr_case, dict_get_next_value_idx (agr.dict));
251   initialize_aggregate_info (&agr);
252
253   /* Output to active file or external file? */
254   if (out_file == NULL) 
255     {
256       /* The active file will be replaced by the aggregated data,
257          so TEMPORARY is moot. */
258       cancel_temporary ();
259
260       if (agr.sort != NULL && !presorted)
261         sort_active_file_in_place (agr.sort);
262
263       agr.sink = create_case_sink (&storage_sink_class, agr.dict, NULL);
264       if (agr.sink->class->open != NULL)
265         agr.sink->class->open (agr.sink);
266       vfm_sink = create_case_sink (&null_sink_class, default_dict, NULL);
267       procedure (agr_to_active_file, &agr);
268       if (agr.case_cnt > 0) 
269         {
270           dump_aggregate_info (&agr, &agr.agr_case);
271           agr.sink->class->write (agr.sink, &agr.agr_case);
272         }
273       dict_destroy (default_dict);
274       default_dict = agr.dict;
275       agr.dict = NULL;
276       vfm_source = agr.sink->class->make_source (agr.sink);
277       free_case_sink (agr.sink);
278     }
279   else
280     {
281       agr.writer = sfm_open_writer (out_file, agr.dict, get_scompression (), 0);
282       if (agr.writer == NULL)
283         goto error;
284       
285       if (agr.sort != NULL && !presorted) 
286         {
287           /* Sorting is needed. */
288           struct casefile *dst;
289           struct casereader *reader;
290           struct ccase c;
291           
292           dst = sort_active_file_to_casefile (agr.sort);
293           if (dst == NULL)
294             goto error;
295           reader = casefile_get_destructive_reader (dst);
296           while (casereader_read_xfer (reader, &c)) 
297             {
298               if (aggregate_single_case (&agr, &c, &agr.agr_case)) 
299                 sfm_write_case (agr.writer, &agr.agr_case);
300               case_destroy (&c);
301             }
302           casereader_destroy (reader);
303           casefile_destroy (dst);
304         }
305       else 
306         {
307           /* Active file is already sorted. */
308           procedure (presorted_agr_to_sysfile, &agr);
309         }
310       
311       if (agr.case_cnt > 0) 
312         {
313           dump_aggregate_info (&agr, &agr.agr_case);
314           sfm_write_case (agr.writer, &agr.agr_case);
315         }
316     }
317   
318   agr_destroy (&agr);
319   return CMD_SUCCESS;
320
321 error:
322   agr_destroy (&agr);
323   return CMD_FAILURE;
324 }
325
326 /* Parse all the aggregate functions. */
327 static int
328 parse_aggregate_functions (struct agr_proc *agr)
329 {
330   struct agr_var *tail; /* Tail of linked list starting at agr->vars. */
331
332   /* Parse everything. */
333   tail = NULL;
334   for (;;)
335     {
336       char **dest;
337       char **dest_label;
338       int n_dest;
339
340       int include_missing;
341       const struct agr_func *function;
342       int func_index;
343
344       union value arg[2];
345
346       struct variable **src;
347       int n_src;
348
349       int i;
350
351       dest = NULL;
352       dest_label = NULL;
353       n_dest = 0;
354       src = NULL;
355       function = NULL;
356       n_src = 0;
357       arg[0].c = NULL;
358       arg[1].c = NULL;
359
360       /* Parse the list of target variables. */
361       while (!lex_match ('='))
362         {
363           int n_dest_prev = n_dest;
364           
365           if (!parse_DATA_LIST_vars (&dest, &n_dest,
366                                      PV_APPEND | PV_SINGLE | PV_NO_SCRATCH))
367             goto error;
368
369           /* Assign empty labels. */
370           {
371             int j;
372
373             dest_label = xrealloc (dest_label, sizeof *dest_label * n_dest);
374             for (j = n_dest_prev; j < n_dest; j++)
375               dest_label[j] = NULL;
376           }
377           
378           if (token == T_STRING)
379             {
380               ds_truncate (&tokstr, 255);
381               dest_label[n_dest - 1] = xstrdup (ds_c_str (&tokstr));
382               lex_get ();
383             }
384         }
385
386       /* Get the name of the aggregation function. */
387       if (token != T_ID)
388         {
389           lex_error (_("expecting aggregation function"));
390           goto error;
391         }
392
393       include_missing = 0;
394       if (tokid[strlen (tokid) - 1] == '.')
395         {
396           include_missing = 1;
397           tokid[strlen (tokid) - 1] = 0;
398         }
399       
400       for (function = agr_func_tab; function->name; function++)
401         if (!strcasecmp (function->name, tokid))
402           break;
403       if (NULL == function->name)
404         {
405           msg (SE, _("Unknown aggregation function %s."), tokid);
406           goto error;
407         }
408       func_index = function - agr_func_tab;
409       lex_get ();
410
411       /* Check for leading lparen. */
412       if (!lex_match ('('))
413         {
414           if (func_index == N)
415             func_index = N_NO_VARS;
416           else if (func_index == NU)
417             func_index = NU_NO_VARS;
418           else
419             {
420               lex_error (_("expecting `('"));
421               goto error;
422             }
423         }
424       else
425         {
426           /* Parse list of source variables. */
427           {
428             int pv_opts = PV_NO_SCRATCH;
429
430             if (func_index == SUM || func_index == MEAN || func_index == SD)
431               pv_opts |= PV_NUMERIC;
432             else if (function->n_args)
433               pv_opts |= PV_SAME_TYPE;
434
435             if (!parse_variables (default_dict, &src, &n_src, pv_opts))
436               goto error;
437           }
438
439           /* Parse function arguments, for those functions that
440              require arguments. */
441           if (function->n_args != 0)
442             for (i = 0; i < function->n_args; i++)
443               {
444                 int type;
445             
446                 lex_match (',');
447                 if (token == T_STRING)
448                   {
449                     arg[i].c = xstrdup (ds_c_str (&tokstr));
450                     type = ALPHA;
451                   }
452                 else if (lex_is_number ())
453                   {
454                     arg[i].f = tokval;
455                     type = NUMERIC;
456                   } else {
457                     msg (SE, _("Missing argument %d to %s."), i + 1, function->name);
458                     goto error;
459                   }
460             
461                 lex_get ();
462
463                 if (type != src[0]->type)
464                   {
465                     msg (SE, _("Arguments to %s must be of same type as "
466                                "source variables."),
467                          function->name);
468                     goto error;
469                   }
470               }
471
472           /* Trailing rparen. */
473           if (!lex_match(')'))
474             {
475               lex_error (_("expecting `)'"));
476               goto error;
477             }
478           
479           /* Now check that the number of source variables match
480              the number of target variables.  If we check earlier
481              than this, the user can get very misleading error
482              message, i.e. `AGGREGATE x=SUM(y t).' will get this
483              error message when a proper message would be more
484              like `unknown variable t'. */
485           if (n_src != n_dest)
486             {
487               msg (SE, _("Number of source variables (%d) does not match "
488                          "number of target variables (%d)."),
489                    n_src, n_dest);
490               goto error;
491             }
492
493           if ((func_index == PIN || func_index == POUT
494               || func_index == FIN || func_index == FOUT) 
495               && ((src[0]->type == NUMERIC && arg[0].f > arg[1].f)
496                   || (src[0]->type == ALPHA
497                       && st_compare_pad (arg[0].c, strlen (arg[0].c),
498                                          arg[1].c, strlen (arg[1].c)) > 0)))
499             {
500               union value t = arg[0];
501               arg[0] = arg[1];
502               arg[1] = t;
503                   
504               msg (SW, _("The value arguments passed to the %s function "
505                          "are out-of-order.  They will be treated as if "
506                          "they had been specified in the correct order."),
507                    function->name);
508             }
509         }
510         
511       /* Finally add these to the linked list of aggregation
512          variables. */
513       for (i = 0; i < n_dest; i++)
514         {
515           struct agr_var *v = xmalloc (sizeof *v);
516
517           /* Add variable to chain. */
518           if (agr->agr_vars != NULL)
519             tail->next = v;
520           else
521             agr->agr_vars = v;
522           tail = v;
523           tail->next = NULL;
524           v->moments = NULL;
525           
526           /* Create the target variable in the aggregate
527              dictionary. */
528           {
529             static const struct fmt_spec f8_2 = {FMT_F, 8, 2};
530             struct variable *destvar;
531             
532             v->function = func_index;
533
534             if (src)
535               {
536                 v->src = src[i];
537                 
538                 if (src[i]->type == ALPHA)
539                   {
540                     v->function |= FSTRING;
541                     v->string = xmalloc (src[i]->width);
542                   }
543
544                 if (function->alpha_type == ALPHA)
545                   destvar = dict_clone_var (agr->dict, v->src, 0, dest[i] );
546                 else if (v->src->type == NUMERIC
547                          || function->alpha_type == NUMERIC)
548                   {
549                     destvar = dict_create_var (agr->dict, dest[i], 0);
550                     if (destvar != NULL) 
551                       {
552                         if ((func_index == N || func_index == NMISS)
553                             && dict_get_weight (default_dict) != NULL)
554                           destvar->print = destvar->write = f8_2; 
555                         else
556                           destvar->print = destvar->write = function->format;
557                       }
558                   }
559               } else {
560                 v->src = NULL;
561                 destvar = dict_create_var (agr->dict, dest[i], 0);
562                 if (func_index == N_NO_VARS
563                     && dict_get_weight (default_dict) != NULL)
564                   destvar->print = destvar->write = f8_2; 
565                 else
566                   destvar->print = destvar->write = function->format;
567               }
568           
569             if (!destvar)
570               {
571                 msg (SE, _("Variable name %s is not unique within the "
572                            "aggregate file dictionary, which contains "
573                            "the aggregate variables and the break "
574                            "variables."),
575                      dest[i]);
576                 goto error;
577               }
578
579             free (dest[i]);
580             destvar->init = 0;
581             if (dest_label[i])
582               {
583                 destvar->label = dest_label[i];
584                 dest_label[i] = NULL;
585               }
586
587             v->dest = destvar;
588           }
589           
590           v->include_missing = include_missing;
591
592           if (v->src != NULL)
593             {
594               int j;
595
596               if (v->src->type == NUMERIC)
597                 for (j = 0; j < function->n_args; j++)
598                   v->arg[j].f = arg[j].f;
599               else
600                 for (j = 0; j < function->n_args; j++)
601                   v->arg[j].c = xstrdup (arg[j].c);
602             }
603         }
604       
605       if (src != NULL && src[0]->type == ALPHA)
606         for (i = 0; i < function->n_args; i++)
607           {
608             free (arg[i].c);
609             arg[i].c = NULL;
610           }
611
612       free (src);
613       free (dest);
614       free (dest_label);
615
616       if (!lex_match ('/'))
617         {
618           if (token == '.')
619             return 1;
620
621           lex_error ("expecting end of command");
622           return 0;
623         }
624       continue;
625       
626     error:
627       for (i = 0; i < n_dest; i++)
628         {
629           free (dest[i]);
630           free (dest_label[i]);
631         }
632       free (dest);
633       free (dest_label);
634       free (arg[0].c);
635       free (arg[1].c);
636       if (src && n_src && src[0]->type == ALPHA)
637         for (i = 0; i < function->n_args; i++)
638           {
639             free (arg[i].c);
640             arg[i].c = NULL;
641           }
642       free (src);
643         
644       return 0;
645     }
646 }
647
648 /* Destroys AGR. */
649 static void
650 agr_destroy (struct agr_proc *agr)
651 {
652   struct agr_var *iter, *next;
653
654   sfm_close_writer (agr->writer);
655   if (agr->sort != NULL)
656     sort_destroy_criteria (agr->sort);
657   free (agr->break_vars);
658   free (agr->prev_break);
659   for (iter = agr->agr_vars; iter; iter = next)
660     {
661       next = iter->next;
662
663       if (iter->function & FSTRING)
664         {
665           int n_args;
666           int i;
667
668           n_args = agr_func_tab[iter->function & FUNC].n_args;
669           for (i = 0; i < n_args; i++)
670             free (iter->arg[i].c);
671           free (iter->string);
672         }
673       else if (iter->function == SD)
674         moments1_destroy (iter->moments);
675       free (iter);
676     }
677   if (agr->dict != NULL)
678     dict_destroy (agr->dict);
679
680   case_destroy (&agr->agr_case);
681 }
682 \f
683 /* Execution. */
684
685 static void accumulate_aggregate_info (struct agr_proc *,
686                                        const struct ccase *);
687 static void dump_aggregate_info (struct agr_proc *, struct ccase *);
688
689 /* Processes a single case INPUT for aggregation.  If output is
690    warranted, writes it to OUTPUT and returns nonzero.
691    Otherwise, returns zero and OUTPUT is unmodified. */
692 static int
693 aggregate_single_case (struct agr_proc *agr,
694                        const struct ccase *input, struct ccase *output)
695 {
696   /* The first case always begins a new break group.  We also need to
697      preserve the values of the case for later comparison. */
698   if (agr->case_cnt++ == 0)
699     {
700       int n_elem = 0;
701       
702       {
703         int i;
704
705         for (i = 0; i < agr->break_var_cnt; i++)
706           n_elem += agr->break_vars[i]->nv;
707       }
708       
709       agr->prev_break = xmalloc (sizeof *agr->prev_break * n_elem);
710
711       /* Copy INPUT into prev_break. */
712       {
713         union value *iter = agr->prev_break;
714         int i;
715
716         for (i = 0; i < agr->break_var_cnt; i++)
717           {
718             struct variable *v = agr->break_vars[i];
719             
720             if (v->type == NUMERIC)
721               (iter++)->f = case_num (input, v->fv);
722             else
723               {
724                 memcpy (iter->s, case_str (input, v->fv), v->width);
725                 iter += v->nv;
726               }
727           }
728       }
729             
730       accumulate_aggregate_info (agr, input);
731         
732       return 0;
733     }
734       
735   /* Compare the value of each break variable to the values on the
736      previous case. */
737   {
738     union value *iter = agr->prev_break;
739     int i;
740     
741     for (i = 0; i < agr->break_var_cnt; i++)
742       {
743         struct variable *v = agr->break_vars[i];
744       
745         switch (v->type)
746           {
747           case NUMERIC:
748             if (case_num (input, v->fv) != iter->f)
749               goto not_equal;
750             iter++;
751             break;
752           case ALPHA:
753             if (memcmp (case_str (input, v->fv), iter->s, v->width))
754               goto not_equal;
755             iter += v->nv;
756             break;
757           default:
758             assert (0);
759           }
760       }
761   }
762
763   accumulate_aggregate_info (agr, input);
764
765   return 0;
766   
767 not_equal:
768   /* The values of the break variable are different from the values on
769      the previous case.  That means that it's time to dump aggregate
770      info. */
771   dump_aggregate_info (agr, output);
772   initialize_aggregate_info (agr);
773   accumulate_aggregate_info (agr, input);
774
775   /* Copy INPUT into prev_break. */
776   {
777     union value *iter = agr->prev_break;
778     int i;
779
780     for (i = 0; i < agr->break_var_cnt; i++)
781       {
782         struct variable *v = agr->break_vars[i];
783             
784         if (v->type == NUMERIC)
785           (iter++)->f = case_num (input, v->fv);
786         else
787           {
788             memcpy (iter->s, case_str (input, v->fv), v->width);
789             iter += v->nv;
790           }
791       }
792   }
793   
794   return 1;
795 }
796
797 /* Accumulates aggregation data from the case INPUT. */
798 static void 
799 accumulate_aggregate_info (struct agr_proc *agr,
800                            const struct ccase *input)
801 {
802   struct agr_var *iter;
803   double weight;
804   int bad_warn = 1;
805
806   weight = dict_get_case_weight (default_dict, input, &bad_warn);
807
808   for (iter = agr->agr_vars; iter; iter = iter->next)
809     if (iter->src)
810       {
811         const union value *v = case_data (input, iter->src->fv);
812
813         if ((!iter->include_missing && is_missing (v, iter->src))
814             || (iter->include_missing && iter->src->type == NUMERIC
815                 && v->f == SYSMIS))
816           {
817             switch (iter->function)
818               {
819               case NMISS:
820               case NMISS | FSTRING:
821                 iter->dbl[0] += weight;
822                 break;
823               case NUMISS:
824               case NUMISS | FSTRING:
825                 iter->int1++;
826                 break;
827               }
828             iter->missing = 1;
829             continue;
830           }
831         
832         /* This is horrible.  There are too many possibilities. */
833         switch (iter->function)
834           {
835           case SUM:
836             iter->dbl[0] += v->f * weight;
837             iter->int1 = 1;
838             break;
839           case MEAN:
840             iter->dbl[0] += v->f * weight;
841             iter->dbl[1] += weight;
842             break;
843           case SD:
844             moments1_add (iter->moments, v->f, weight);
845             break;
846           case MAX:
847             iter->dbl[0] = max (iter->dbl[0], v->f);
848             iter->int1 = 1;
849             break;
850           case MAX | FSTRING:
851             if (memcmp (iter->string, v->s, iter->src->width) < 0)
852               memcpy (iter->string, v->s, iter->src->width);
853             iter->int1 = 1;
854             break;
855           case MIN:
856             iter->dbl[0] = min (iter->dbl[0], v->f);
857             iter->int1 = 1;
858             break;
859           case MIN | FSTRING:
860             if (memcmp (iter->string, v->s, iter->src->width) > 0)
861               memcpy (iter->string, v->s, iter->src->width);
862             iter->int1 = 1;
863             break;
864           case FGT:
865           case PGT:
866             if (v->f > iter->arg[0].f)
867               iter->dbl[0] += weight;
868             iter->dbl[1] += weight;
869             break;
870           case FGT | FSTRING:
871           case PGT | FSTRING:
872             if (memcmp (iter->arg[0].c, v->s, iter->src->width) < 0)
873               iter->dbl[0] += weight;
874             iter->dbl[1] += weight;
875             break;
876           case FLT:
877           case PLT:
878             if (v->f < iter->arg[0].f)
879               iter->dbl[0] += weight;
880             iter->dbl[1] += weight;
881             break;
882           case FLT | FSTRING:
883           case PLT | FSTRING:
884             if (memcmp (iter->arg[0].c, v->s, iter->src->width) > 0)
885               iter->dbl[0] += weight;
886             iter->dbl[1] += weight;
887             break;
888           case FIN:
889           case PIN:
890             if (iter->arg[0].f <= v->f && v->f <= iter->arg[1].f)
891               iter->dbl[0] += weight;
892             iter->dbl[1] += weight;
893             break;
894           case FIN | FSTRING:
895           case PIN | FSTRING:
896             if (memcmp (iter->arg[0].c, v->s, iter->src->width) <= 0
897                 && memcmp (iter->arg[1].c, v->s, iter->src->width) >= 0)
898               iter->dbl[0] += weight;
899             iter->dbl[1] += weight;
900             break;
901           case FOUT:
902           case POUT:
903             if (iter->arg[0].f > v->f || v->f > iter->arg[1].f)
904               iter->dbl[0] += weight;
905             iter->dbl[1] += weight;
906             break;
907           case FOUT | FSTRING:
908           case POUT | FSTRING:
909             if (memcmp (iter->arg[0].c, v->s, iter->src->width) > 0
910                 || memcmp (iter->arg[1].c, v->s, iter->src->width) < 0)
911               iter->dbl[0] += weight;
912             iter->dbl[1] += weight;
913             break;
914           case N:
915           case N | FSTRING:
916             iter->dbl[0] += weight;
917             break;
918           case NU:
919           case NU | FSTRING:
920             iter->int1++;
921             break;
922           case FIRST:
923             if (iter->int1 == 0)
924               {
925                 iter->dbl[0] = v->f;
926                 iter->int1 = 1;
927               }
928             break;
929           case FIRST | FSTRING:
930             if (iter->int1 == 0)
931               {
932                 memcpy (iter->string, v->s, iter->src->width);
933                 iter->int1 = 1;
934               }
935             break;
936           case LAST:
937             iter->dbl[0] = v->f;
938             iter->int1 = 1;
939             break;
940           case LAST | FSTRING:
941             memcpy (iter->string, v->s, iter->src->width);
942             iter->int1 = 1;
943             break;
944           case NMISS:
945           case NMISS | FSTRING:
946           case NUMISS:
947           case NUMISS | FSTRING:
948             /* Our value is not missing or it would have been
949                caught earlier.  Nothing to do. */
950             break;
951           default:
952             assert (0);
953           }
954     } else {
955       switch (iter->function)
956         {
957         case N_NO_VARS:
958           iter->dbl[0] += weight;
959           break;
960         case NU_NO_VARS:
961           iter->int1++;
962           break;
963         default:
964           assert (0);
965         }
966     }
967 }
968
969 /* We've come to a record that differs from the previous in one or
970    more of the break variables.  Make an output record from the
971    accumulated statistics in the OUTPUT case. */
972 static void 
973 dump_aggregate_info (struct agr_proc *agr, struct ccase *output)
974 {
975   {
976     int value_idx = 0;
977     int i;
978
979     for (i = 0; i < agr->break_var_cnt; i++) 
980       {
981         int nv = agr->break_vars[i]->nv;
982         memcpy (case_data_rw (output, value_idx),
983                 &agr->prev_break[value_idx],
984                 sizeof (union value) * nv);
985         value_idx += nv; 
986       }
987   }
988   
989   {
990     struct agr_var *i;
991   
992     for (i = agr->agr_vars; i; i = i->next)
993       {
994         union value *v = case_data_rw (output, i->dest->fv);
995
996         if (agr->missing == COLUMNWISE && i->missing != 0
997             && (i->function & FUNC) != N && (i->function & FUNC) != NU
998             && (i->function & FUNC) != NMISS && (i->function & FUNC) != NUMISS)
999           {
1000             if (i->dest->type == ALPHA)
1001               memset (v->s, ' ', i->dest->width);
1002             else
1003               v->f = SYSMIS;
1004             continue;
1005           }
1006         
1007         switch (i->function)
1008           {
1009           case SUM:
1010             v->f = i->int1 ? i->dbl[0] : SYSMIS;
1011             break;
1012           case MEAN:
1013             v->f = i->dbl[1] != 0.0 ? i->dbl[0] / i->dbl[1] : SYSMIS;
1014             break;
1015           case SD:
1016             {
1017               double variance;
1018
1019               /* FIXME: we should use two passes. */
1020               moments1_calculate (i->moments, NULL, NULL, &variance,
1021                                  NULL, NULL);
1022               if (variance != SYSMIS)
1023                 v->f = sqrt (variance);
1024               else
1025                 v->f = SYSMIS; 
1026             }
1027             break;
1028           case MAX:
1029           case MIN:
1030             v->f = i->int1 ? i->dbl[0] : SYSMIS;
1031             break;
1032           case MAX | FSTRING:
1033           case MIN | FSTRING:
1034             if (i->int1)
1035               memcpy (v->s, i->string, i->dest->width);
1036             else
1037               memset (v->s, ' ', i->dest->width);
1038             break;
1039           case FGT:
1040           case FGT | FSTRING:
1041           case FLT:
1042           case FLT | FSTRING:
1043           case FIN:
1044           case FIN | FSTRING:
1045           case FOUT:
1046           case FOUT | FSTRING:
1047             v->f = i->dbl[1] ? i->dbl[0] / i->dbl[1] : SYSMIS;
1048             break;
1049           case PGT:
1050           case PGT | FSTRING:
1051           case PLT:
1052           case PLT | FSTRING:
1053           case PIN:
1054           case PIN | FSTRING:
1055           case POUT:
1056           case POUT | FSTRING:
1057             v->f = i->dbl[1] ? i->dbl[0] / i->dbl[1] * 100.0 : SYSMIS;
1058             break;
1059           case N:
1060           case N | FSTRING:
1061             v->f = i->dbl[0];
1062             break;
1063           case NU:
1064           case NU | FSTRING:
1065             v->f = i->int1;
1066             break;
1067           case FIRST:
1068           case LAST:
1069             v->f = i->int1 ? i->dbl[0] : SYSMIS;
1070             break;
1071           case FIRST | FSTRING:
1072           case LAST | FSTRING:
1073             if (i->int1)
1074               memcpy (v->s, i->string, i->dest->width);
1075             else
1076               memset (v->s, ' ', i->dest->width);
1077             break;
1078           case N_NO_VARS:
1079             v->f = i->dbl[0];
1080             break;
1081           case NU_NO_VARS:
1082             v->f = i->int1;
1083             break;
1084           case NMISS:
1085           case NMISS | FSTRING:
1086             v->f = i->dbl[0];
1087             break;
1088           case NUMISS:
1089           case NUMISS | FSTRING:
1090             v->f = i->int1;
1091             break;
1092           default:
1093             assert (0);
1094           }
1095       }
1096   }
1097 }
1098
1099 /* Resets the state for all the aggregate functions. */
1100 static void
1101 initialize_aggregate_info (struct agr_proc *agr)
1102 {
1103   struct agr_var *iter;
1104
1105   for (iter = agr->agr_vars; iter; iter = iter->next)
1106     {
1107       iter->missing = 0;
1108       iter->dbl[0] = iter->dbl[1] = iter->dbl[2] = 0.0;
1109       iter->int1 = iter->int2 = 0;
1110       switch (iter->function)
1111         {
1112         case MIN:
1113           iter->dbl[0] = DBL_MAX;
1114           break;
1115         case MIN | FSTRING:
1116           memset (iter->string, 255, iter->src->width);
1117           break;
1118         case MAX:
1119           iter->dbl[0] = -DBL_MAX;
1120           break;
1121         case MAX | FSTRING:
1122           memset (iter->string, 0, iter->src->width);
1123           break;
1124         case SD:
1125           if (iter->moments == NULL)
1126             iter->moments = moments1_create (MOMENT_VARIANCE);
1127           else
1128             moments1_clear (iter->moments);
1129           break;
1130         default:
1131           break;
1132         }
1133     }
1134 }
1135 \f
1136 /* Aggregate each case as it comes through.  Cases which aren't needed
1137    are dropped. */
1138 static int
1139 agr_to_active_file (struct ccase *c, void *agr_)
1140 {
1141   struct agr_proc *agr = agr_;
1142
1143   if (aggregate_single_case (agr, c, &agr->agr_case)) 
1144     agr->sink->class->write (agr->sink, &agr->agr_case);
1145
1146   return 1;
1147 }
1148
1149 /* Aggregate the current case and output it if we passed a
1150    breakpoint. */
1151 static int
1152 presorted_agr_to_sysfile (struct ccase *c, void *agr_) 
1153 {
1154   struct agr_proc *agr = agr_;
1155
1156   if (aggregate_single_case (agr, c, &agr->agr_case)) 
1157     sfm_write_case (agr->writer, &agr->agr_case);
1158
1159   return 1;
1160 }