Fix memory leaks.
[pspp-builds.git] / src / aggregate.c
1 /* PSPP - computes sample statistics.
2    Copyright (C) 1997-9, 2000 Free Software Foundation, Inc.
3    Written by Ben Pfaff <blp@gnu.org>.
4
5    This program is free software; you can redistribute it and/or
6    modify it under the terms of the GNU General Public License as
7    published by the Free Software Foundation; either version 2 of the
8    License, or (at your option) any later version.
9
10    This program is distributed in the hope that it will be useful, but
11    WITHOUT ANY WARRANTY; without even the implied warranty of
12    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13    General Public License for more details.
14
15    You should have received a copy of the GNU General Public License
16    along with this program; if not, write to the Free Software
17    Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA
18    02111-1307, USA. */
19
20 #include <config.h>
21 #include "error.h"
22 #include <stdlib.h>
23 #include "alloc.h"
24 #include "case.h"
25 #include "casefile.h"
26 #include "command.h"
27 #include "error.h"
28 #include "file-handle.h"
29 #include "lexer.h"
30 #include "misc.h"
31 #include "moments.h"
32 #include "pool.h"
33 #include "settings.h"
34 #include "sfm.h"
35 #include "sort.h"
36 #include "str.h"
37 #include "var.h"
38 #include "vfm.h"
39 #include "vfmP.h"
40
41 /* Specifies how to make an aggregate variable. */
42 struct agr_var
43   {
44     struct agr_var *next;               /* Next in list. */
45
46     /* Collected during parsing. */
47     struct variable *src;       /* Source variable. */
48     struct variable *dest;      /* Target variable. */
49     int function;               /* Function. */
50     int include_missing;        /* 1=Include user-missing values. */
51     union value arg[2];         /* Arguments. */
52
53     /* Accumulated during AGGREGATE execution. */
54     double dbl[3];
55     int int1, int2;
56     char *string;
57     int missing;
58     struct moments1 *moments;
59   };
60
61 /* Aggregation functions. */
62 enum
63   {
64     NONE, SUM, MEAN, SD, MAX, MIN, PGT, PLT, PIN, POUT, FGT, FLT, FIN,
65     FOUT, N, NU, NMISS, NUMISS, FIRST, LAST,
66     N_AGR_FUNCS, N_NO_VARS, NU_NO_VARS,
67     FUNC = 0x1f, /* Function mask. */
68     FSTRING = 1<<5, /* String function bit. */
69   };
70
71 /* Attributes of an aggregation function. */
72 struct agr_func
73   {
74     const char *name;           /* Aggregation function name. */
75     int n_args;                 /* Number of arguments. */
76     int alpha_type;             /* When given ALPHA arguments, output type. */
77     struct fmt_spec format;     /* Format spec if alpha_type != ALPHA. */
78   };
79
80 /* Attributes of aggregation functions. */
81 static const struct agr_func agr_func_tab[] = 
82   {
83     {"<NONE>",  0, -1,      {0, 0, 0}},
84     {"SUM",     0, -1,      {FMT_F, 8, 2}},
85     {"MEAN",    0, -1,      {FMT_F, 8, 2}},
86     {"SD",      0, -1,      {FMT_F, 8, 2}},
87     {"MAX",     0, ALPHA,   {-1, -1, -1}}, 
88     {"MIN",     0, ALPHA,   {-1, -1, -1}}, 
89     {"PGT",     1, NUMERIC, {FMT_F, 5, 1}},      
90     {"PLT",     1, NUMERIC, {FMT_F, 5, 1}},       
91     {"PIN",     2, NUMERIC, {FMT_F, 5, 1}},       
92     {"POUT",    2, NUMERIC, {FMT_F, 5, 1}},       
93     {"FGT",     1, NUMERIC, {FMT_F, 5, 3}},       
94     {"FLT",     1, NUMERIC, {FMT_F, 5, 3}},       
95     {"FIN",     2, NUMERIC, {FMT_F, 5, 3}},       
96     {"FOUT",    2, NUMERIC, {FMT_F, 5, 3}},       
97     {"N",       0, NUMERIC, {FMT_F, 7, 0}},       
98     {"NU",      0, NUMERIC, {FMT_F, 7, 0}},       
99     {"NMISS",   0, NUMERIC, {FMT_F, 7, 0}},       
100     {"NUMISS",  0, NUMERIC, {FMT_F, 7, 0}},       
101     {"FIRST",   0, ALPHA,   {-1, -1, -1}}, 
102     {"LAST",    0, ALPHA,   {-1, -1, -1}},
103     {NULL,      0, -1,      {-1, -1, -1}},
104     {"N",       0, NUMERIC, {FMT_F, 7, 0}},
105     {"NU",      0, NUMERIC, {FMT_F, 7, 0}},
106   };
107
108 /* Missing value types. */
109 enum missing_treatment
110   {
111     ITEMWISE,           /* Missing values item by item. */
112     COLUMNWISE          /* Missing values column by column. */
113   };
114
115 /* An entire AGGREGATE procedure. */
116 struct agr_proc 
117   {
118     /* We have either an output file or a sink. */
119     struct file_handle *out_file;       /* Output file, or null if none. */
120     struct case_sink *sink;             /* Sink, or null if none. */
121
122     /* Break variables. */
123     struct sort_criteria *sort;         /* Sort criteria. */
124     struct variable **break_vars;       /* Break variables. */
125     size_t break_var_cnt;               /* Number of break variables. */
126     union value *prev_break;            /* Last values of break variables. */
127
128     enum missing_treatment missing;     /* How to treat missing values. */
129     struct agr_var *agr_vars;           /* First aggregate variable. */
130     struct dictionary *dict;            /* Aggregate dictionary. */
131     int case_cnt;                       /* Counts aggregated cases. */
132     struct ccase agr_case;              /* Aggregate case for output. */
133     flt64 *sfm_agr_case;                /* Aggregate case in SFM format. */
134   };
135
136 static void initialize_aggregate_info (struct agr_proc *);
137
138 /* Prototypes. */
139 static int parse_aggregate_functions (struct agr_proc *);
140 static void agr_destroy (struct agr_proc *);
141 static int aggregate_single_case (struct agr_proc *agr,
142                                   const struct ccase *input,
143                                   struct ccase *output);
144 static void dump_aggregate_info (struct agr_proc *agr, struct ccase *output);
145 static int create_sysfile (struct agr_proc *);
146
147 /* Aggregating to the active file. */
148 static int agr_to_active_file (struct ccase *, void *aux);
149
150 /* Aggregating to a system file. */
151 static void write_case_to_sfm (struct agr_proc *agr);
152 static int presorted_agr_to_sysfile (struct ccase *, void *aux);
153 \f
154 /* Parsing. */
155
156 /* Parses and executes the AGGREGATE procedure. */
157 int
158 cmd_aggregate (void)
159 {
160   struct agr_proc agr;
161
162   /* Have we seen these subcommands? */
163   unsigned seen = 0;
164
165   agr.out_file = NULL;
166   agr.sink = NULL;
167   agr.missing = ITEMWISE;
168   agr.sort = NULL;
169   agr.break_vars = NULL;
170   agr.agr_vars = NULL;
171   agr.dict = NULL;
172   agr.case_cnt = 0;
173   agr.prev_break = NULL;
174   
175   agr.dict = dict_create ();
176   dict_set_label (agr.dict, dict_get_label (default_dict));
177   dict_set_documents (agr.dict, dict_get_documents (default_dict));
178   
179   /* Read most of the subcommands. */
180   for (;;)
181     {
182       lex_match ('/');
183       
184       if (lex_match_id ("OUTFILE"))
185         {
186           if (seen & 1)
187             {
188               msg (SE, _("%s subcommand given multiple times."),"OUTFILE");
189               goto lossage;
190             }
191           seen |= 1;
192               
193           lex_match ('=');
194           if (lex_match ('*'))
195             agr.out_file = NULL;
196           else 
197             {
198               agr.out_file = fh_parse_file_handle ();
199               if (agr.out_file == NULL)
200                 goto lossage;
201             }
202         }
203       else if (lex_match_id ("MISSING"))
204         {
205           lex_match ('=');
206           if (!lex_match_id ("COLUMNWISE"))
207             {
208               lex_error (_("while expecting COLUMNWISE"));
209               goto lossage;
210             }
211           agr.missing = COLUMNWISE;
212         }
213       else if (lex_match_id ("DOCUMENT"))
214         seen |= 2;
215       else if (lex_match_id ("PRESORTED"))
216         seen |= 4;
217       else if (lex_match_id ("BREAK"))
218         {
219           int i;
220
221           if (seen & 8)
222             {
223               msg (SE, _("%s subcommand given multiple times."),"BREAK");
224               goto lossage;
225             }
226           seen |= 8;
227
228           lex_match ('=');
229           agr.sort = sort_parse_criteria (default_dict,
230                                           &agr.break_vars, &agr.break_var_cnt);
231           if (agr.sort == NULL)
232             goto lossage;
233           
234           for (i = 0; i < agr.break_var_cnt; i++)
235             {
236               struct variable *v = dict_clone_var (agr.dict, agr.break_vars[i],
237                                                    agr.break_vars[i]->name);
238               assert (v != NULL);
239             }
240         }
241       else break;
242     }
243
244   /* Check for proper syntax. */
245   if (!(seen & 8))
246     msg (SW, _("BREAK subcommand not specified."));
247       
248   /* Read in the aggregate functions. */
249   if (!parse_aggregate_functions (&agr))
250     goto lossage;
251
252   /* Delete documents. */
253   if (!(seen & 2))
254     dict_set_documents (agr.dict, NULL);
255
256   /* Cancel SPLIT FILE. */
257   dict_set_split_vars (agr.dict, NULL, 0);
258   
259   /* Initialize. */
260   agr.case_cnt = 0;
261   case_create (&agr.agr_case, dict_get_next_value_idx (agr.dict));
262   initialize_aggregate_info (&agr);
263
264   /* Output to active file or external file? */
265   if (agr.out_file == NULL) 
266     {
267       /* The active file will be replaced by the aggregated data,
268          so TEMPORARY is moot. */
269       cancel_temporary ();
270
271       if (agr.sort != NULL && (seen & 4) == 0)
272         sort_active_file_in_place (agr.sort);
273
274       agr.sink = create_case_sink (&storage_sink_class, agr.dict, NULL);
275       if (agr.sink->class->open != NULL)
276         agr.sink->class->open (agr.sink);
277       vfm_sink = create_case_sink (&null_sink_class, default_dict, NULL);
278       procedure (agr_to_active_file, &agr);
279       if (agr.case_cnt > 0) 
280         {
281           dump_aggregate_info (&agr, &agr.agr_case);
282           agr.sink->class->write (agr.sink, &agr.agr_case);
283         }
284       dict_destroy (default_dict);
285       default_dict = agr.dict;
286       agr.dict = NULL;
287       vfm_source = agr.sink->class->make_source (agr.sink);
288       free_case_sink (agr.sink);
289     }
290   else
291     {
292       if (!create_sysfile (&agr))
293         goto lossage;
294
295       if (agr.sort != NULL && (seen & 4) == 0) 
296         {
297           /* Sorting is needed. */
298           struct casefile *dst;
299           struct casereader *reader;
300           struct ccase c;
301           
302           dst = sort_active_file_to_casefile (agr.sort);
303           if (dst == NULL)
304             goto lossage;
305           reader = casefile_get_destructive_reader (dst);
306           while (casereader_read_xfer (reader, &c)) 
307             {
308               if (aggregate_single_case (&agr, &c, &agr.agr_case)) 
309                 write_case_to_sfm (&agr);
310               case_destroy (&c);
311             }
312           casereader_destroy (reader);
313           casefile_destroy (dst);
314         }
315       else 
316         {
317           /* Active file is already sorted. */
318           procedure (presorted_agr_to_sysfile, &agr);
319         }
320       
321       if (agr.case_cnt > 0) 
322         {
323           dump_aggregate_info (&agr, &agr.agr_case);
324           write_case_to_sfm (&agr);
325         }
326       fh_close_handle (agr.out_file);
327     }
328   
329   agr_destroy (&agr);
330   return CMD_SUCCESS;
331
332 lossage:
333   agr_destroy (&agr);
334   return CMD_FAILURE;
335 }
336
337 /* Create a system file for use in aggregation to an external
338    file. */
339 static int
340 create_sysfile (struct agr_proc *agr)
341 {
342   struct sfm_write_info w;
343   w.h = agr->out_file;
344   w.dict = agr->dict;
345   w.compress = get_scompression();
346   if (!sfm_write_dictionary (&w))
347     return 0;
348
349   agr->sfm_agr_case = xmalloc (sizeof *agr->sfm_agr_case * w.case_size);
350     
351   return 1;
352 }
353
354 /* Parse all the aggregate functions. */
355 static int
356 parse_aggregate_functions (struct agr_proc *agr)
357 {
358   struct agr_var *tail; /* Tail of linked list starting at agr->vars. */
359
360   /* Parse everything. */
361   tail = NULL;
362   for (;;)
363     {
364       char **dest;
365       char **dest_label;
366       int n_dest;
367
368       int include_missing;
369       const struct agr_func *function;
370       int func_index;
371
372       union value arg[2];
373
374       struct variable **src;
375       int n_src;
376
377       int i;
378
379       dest = NULL;
380       dest_label = NULL;
381       n_dest = 0;
382       src = NULL;
383       function = NULL;
384       n_src = 0;
385       arg[0].c = NULL;
386       arg[1].c = NULL;
387
388       /* Parse the list of target variables. */
389       while (!lex_match ('='))
390         {
391           int n_dest_prev = n_dest;
392           
393           if (!parse_DATA_LIST_vars (&dest, &n_dest, PV_APPEND | PV_SINGLE | PV_NO_SCRATCH))
394             goto lossage;
395
396           /* Assign empty labels. */
397           {
398             int j;
399
400             dest_label = xrealloc (dest_label, sizeof *dest_label * n_dest);
401             for (j = n_dest_prev; j < n_dest; j++)
402               dest_label[j] = NULL;
403           }
404           
405           if (token == T_STRING)
406             {
407               ds_truncate (&tokstr, 255);
408               dest_label[n_dest - 1] = xstrdup (ds_c_str (&tokstr));
409               lex_get ();
410             }
411         }
412
413       /* Get the name of the aggregation function. */
414       if (token != T_ID)
415         {
416           lex_error (_("expecting aggregation function"));
417           goto lossage;
418         }
419
420       include_missing = 0;
421       if (tokid[strlen (tokid) - 1] == '.')
422         {
423           include_missing = 1;
424           tokid[strlen (tokid) - 1] = 0;
425         }
426       
427       for (function = agr_func_tab; function->name; function++)
428         if (!strcmp (function->name, tokid))
429           break;
430       if (NULL == function->name)
431         {
432           msg (SE, _("Unknown aggregation function %s."), tokid);
433           goto lossage;
434         }
435       func_index = function - agr_func_tab;
436       lex_get ();
437
438       /* Check for leading lparen. */
439       if (!lex_match ('('))
440         {
441           if (func_index == N)
442             func_index = N_NO_VARS;
443           else if (func_index == NU)
444             func_index = NU_NO_VARS;
445           else
446             {
447               lex_error (_("expecting `('"));
448               goto lossage;
449             }
450         } else {
451           /* Parse list of source variables. */
452           {
453             int pv_opts = PV_NO_SCRATCH;
454
455             if (func_index == SUM || func_index == MEAN || func_index == SD)
456               pv_opts |= PV_NUMERIC;
457             else if (function->n_args)
458               pv_opts |= PV_SAME_TYPE;
459
460             if (!parse_variables (default_dict, &src, &n_src, pv_opts))
461               goto lossage;
462           }
463
464           /* Parse function arguments, for those functions that
465              require arguments. */
466           if (function->n_args != 0)
467             for (i = 0; i < function->n_args; i++)
468               {
469                 int type;
470             
471                 lex_match (',');
472                 if (token == T_STRING)
473                   {
474                     arg[i].c = xstrdup (ds_c_str (&tokstr));
475                     type = ALPHA;
476                   }
477                 else if (token == T_NUM)
478                   {
479                     arg[i].f = tokval;
480                     type = NUMERIC;
481                   } else {
482                     msg (SE, _("Missing argument %d to %s."), i + 1, function->name);
483                     goto lossage;
484                   }
485             
486                 lex_get ();
487
488                 if (type != src[0]->type)
489                   {
490                     msg (SE, _("Arguments to %s must be of same type as "
491                                "source variables."),
492                          function->name);
493                     goto lossage;
494                   }
495               }
496
497           /* Trailing rparen. */
498           if (!lex_match(')'))
499             {
500               lex_error (_("expecting `)'"));
501               goto lossage;
502             }
503           
504           /* Now check that the number of source variables match the
505              number of target variables.  Do this here because if we
506              do it earlier then the user can get very misleading error
507              messages; i.e., `AGGREGATE x=SUM(y t).' will get this
508              error message when a proper message would be more like
509              `unknown variable t'. */
510           if (n_src != n_dest)
511             {
512               msg (SE, _("Number of source variables (%d) does not match "
513                          "number of target variables (%d)."),
514                    n_src, n_dest);
515               goto lossage;
516             }
517         }
518         
519       /* Finally add these to the linked list of aggregation
520          variables. */
521       for (i = 0; i < n_dest; i++)
522         {
523           struct agr_var *v = xmalloc (sizeof *v);
524
525           /* Add variable to chain. */
526           if (agr->agr_vars != NULL)
527             tail->next = v;
528           else
529             agr->agr_vars = v;
530           tail = v;
531           tail->next = NULL;
532           v->moments = NULL;
533           
534           /* Create the target variable in the aggregate
535              dictionary. */
536           {
537             struct variable *destvar;
538             
539             v->function = func_index;
540
541             if (src)
542               {
543                 int output_width;
544
545                 v->src = src[i];
546                 
547                 if (src[i]->type == ALPHA)
548                   {
549                     v->function |= FSTRING;
550                     v->string = xmalloc (src[i]->width);
551                   }
552                 
553                 if (v->src->type == NUMERIC || function->alpha_type == NUMERIC)
554                   output_width = 0;
555                 else
556                   output_width = v->src->width;
557
558                 if (function->alpha_type == ALPHA)
559                   destvar = dict_clone_var (agr->dict, v->src, dest[i]);
560                 else
561                   {
562                     destvar = dict_create_var (agr->dict, dest[i], output_width);
563                     if (output_width == 0)
564                       destvar->print = destvar->write = function->format;
565                     if (output_width == 0 && dict_get_weight (default_dict) != NULL
566                         && (func_index == N || func_index == N_NO_VARS
567                             || func_index == NU || func_index == NU_NO_VARS))
568                       {
569                         struct fmt_spec f = {FMT_F, 8, 2};
570                       
571                         destvar->print = destvar->write = f;
572                       }
573                   }
574               } else {
575                 v->src = NULL;
576                 destvar = dict_create_var (agr->dict, dest[i], 0);
577               }
578           
579             if (!destvar)
580               {
581                 msg (SE, _("Variable name %s is not unique within the "
582                            "aggregate file dictionary, which contains "
583                            "the aggregate variables and the break "
584                            "variables."),
585                      dest[i]);
586                 free (dest[i]);
587                 goto lossage;
588               }
589
590             free (dest[i]);
591             destvar->init = 0;
592             if (dest_label[i])
593               {
594                 destvar->label = dest_label[i];
595                 dest_label[i] = NULL;
596               }
597             else if (function->alpha_type == ALPHA)
598               destvar->print = destvar->write = function->format;
599
600             v->dest = destvar;
601           }
602           
603           v->include_missing = include_missing;
604
605           if (v->src != NULL)
606             {
607               int j;
608
609               if (v->src->type == NUMERIC)
610                 for (j = 0; j < function->n_args; j++)
611                   v->arg[j].f = arg[j].f;
612               else
613                 for (j = 0; j < function->n_args; j++)
614                   v->arg[j].c = xstrdup (arg[j].c);
615             }
616         }
617       
618       if (src != NULL && src[0]->type == ALPHA)
619         for (i = 0; i < function->n_args; i++)
620           {
621             free (arg[i].c);
622             arg[i].c = NULL;
623           }
624
625       free (src);
626       free (dest);
627       free (dest_label);
628
629       if (!lex_match ('/'))
630         {
631           if (token == '.')
632             return 1;
633
634           lex_error ("expecting end of command");
635           return 0;
636         }
637       continue;
638       
639     lossage:
640       for (i = 0; i < n_dest; i++)
641         {
642           free (dest[i]);
643           free (dest_label[i]);
644         }
645       free (dest);
646       free (dest_label);
647       free (arg[0].c);
648       free (arg[1].c);
649       if (src && n_src && src[0]->type == ALPHA)
650         for (i = 0; i < function->n_args; i++)
651           {
652             free (arg[i].c);
653             arg[i].c = NULL;
654           }
655       free (src);
656         
657       return 0;
658     }
659 }
660
661 /* Destroys AGR. */
662 static void
663 agr_destroy (struct agr_proc *agr)
664 {
665   struct agr_var *iter, *next;
666
667   if (agr->dict != NULL)
668     dict_destroy (agr->dict);
669   if (agr->sort != NULL)
670     sort_destroy_criteria (agr->sort);
671   free (agr->break_vars);
672   for (iter = agr->agr_vars; iter; iter = next)
673     {
674       next = iter->next;
675
676       if (iter->function & FSTRING)
677         {
678           int n_args;
679           int i;
680
681           n_args = agr_func_tab[iter->function & FUNC].n_args;
682           for (i = 0; i < n_args; i++)
683             free (iter->arg[i].c);
684           free (iter->string);
685         }
686       else if (iter->function == SD)
687         moments1_destroy (iter->moments);
688       free (iter);
689     }
690   free (agr->prev_break);
691   case_destroy (&agr->agr_case);
692 }
693 \f
694 /* Execution. */
695
696 static void accumulate_aggregate_info (struct agr_proc *,
697                                        const struct ccase *);
698 static void dump_aggregate_info (struct agr_proc *, struct ccase *);
699
700 /* Processes a single case INPUT for aggregation.  If output is
701    warranted, writes it to OUTPUT and returns nonzero.
702    Otherwise, returns zero and OUTPUT is unmodified. */
703 static int
704 aggregate_single_case (struct agr_proc *agr,
705                        const struct ccase *input, struct ccase *output)
706 {
707   /* The first case always begins a new break group.  We also need to
708      preserve the values of the case for later comparison. */
709   if (agr->case_cnt++ == 0)
710     {
711       int n_elem = 0;
712       
713       {
714         int i;
715
716         for (i = 0; i < agr->break_var_cnt; i++)
717           n_elem += agr->break_vars[i]->nv;
718       }
719       
720       agr->prev_break = xmalloc (sizeof *agr->prev_break * n_elem);
721
722       /* Copy INPUT into prev_break. */
723       {
724         union value *iter = agr->prev_break;
725         int i;
726
727         for (i = 0; i < agr->break_var_cnt; i++)
728           {
729             struct variable *v = agr->break_vars[i];
730             
731             if (v->type == NUMERIC)
732               (iter++)->f = case_num (input, v->fv);
733             else
734               {
735                 memcpy (iter->s, case_str (input, v->fv), v->width);
736                 iter += v->nv;
737               }
738           }
739       }
740             
741       accumulate_aggregate_info (agr, input);
742         
743       return 0;
744     }
745       
746   /* Compare the value of each break variable to the values on the
747      previous case. */
748   {
749     union value *iter = agr->prev_break;
750     int i;
751     
752     for (i = 0; i < agr->break_var_cnt; i++)
753       {
754         struct variable *v = agr->break_vars[i];
755       
756         switch (v->type)
757           {
758           case NUMERIC:
759             if (case_num (input, v->fv) != iter->f)
760               goto not_equal;
761             iter++;
762             break;
763           case ALPHA:
764             if (memcmp (case_str (input, v->fv), iter->s, v->width))
765               goto not_equal;
766             iter += v->nv;
767             break;
768           default:
769             assert (0);
770           }
771       }
772   }
773
774   accumulate_aggregate_info (agr, input);
775
776   return 0;
777   
778 not_equal:
779   /* The values of the break variable are different from the values on
780      the previous case.  That means that it's time to dump aggregate
781      info. */
782   dump_aggregate_info (agr, output);
783   initialize_aggregate_info (agr);
784   accumulate_aggregate_info (agr, input);
785
786   /* Copy INPUT into prev_break. */
787   {
788     union value *iter = agr->prev_break;
789     int i;
790
791     for (i = 0; i < agr->break_var_cnt; i++)
792       {
793         struct variable *v = agr->break_vars[i];
794             
795         if (v->type == NUMERIC)
796           (iter++)->f = case_num (input, v->fv);
797         else
798           {
799             memcpy (iter->s, case_str (input, v->fv), v->width);
800             iter += v->nv;
801           }
802       }
803   }
804   
805   return 1;
806 }
807
808 /* Accumulates aggregation data from the case INPUT. */
809 static void 
810 accumulate_aggregate_info (struct agr_proc *agr,
811                            const struct ccase *input)
812 {
813   struct agr_var *iter;
814   double weight;
815   int bad_warn = 1;
816
817   weight = dict_get_case_weight (default_dict, input, &bad_warn);
818
819   for (iter = agr->agr_vars; iter; iter = iter->next)
820     if (iter->src)
821       {
822         const union value *v = case_data (input, iter->src->fv);
823
824         if ((!iter->include_missing && is_missing (v, iter->src))
825             || (iter->include_missing && iter->src->type == NUMERIC
826                 && v->f == SYSMIS))
827           {
828             switch (iter->function)
829               {
830               case NMISS:
831                 iter->dbl[0] += weight;
832                 break;
833               case NUMISS:
834                 iter->int1++;
835                 break;
836               }
837             iter->missing = 1;
838             continue;
839           }
840         
841         /* This is horrible.  There are too many possibilities. */
842         switch (iter->function)
843           {
844           case SUM:
845             iter->dbl[0] += v->f;
846             break;
847           case MEAN:
848             iter->dbl[0] += v->f * weight;
849             iter->dbl[1] += weight;
850             break;
851           case SD:
852             moments1_add (iter->moments, v->f, weight);
853             break;
854           case MAX:
855             iter->dbl[0] = max (iter->dbl[0], v->f);
856             iter->int1 = 1;
857             break;
858           case MAX | FSTRING:
859             if (memcmp (iter->string, v->s, iter->src->width) < 0)
860               memcpy (iter->string, v->s, iter->src->width);
861             iter->int1 = 1;
862             break;
863           case MIN:
864             iter->dbl[0] = min (iter->dbl[0], v->f);
865             iter->int1 = 1;
866             break;
867           case MIN | FSTRING:
868             if (memcmp (iter->string, v->s, iter->src->width) > 0)
869               memcpy (iter->string, v->s, iter->src->width);
870             iter->int1 = 1;
871             break;
872           case FGT:
873           case PGT:
874             if (v->f > iter->arg[0].f)
875               iter->dbl[0] += weight;
876             iter->dbl[1] += weight;
877             break;
878           case FGT | FSTRING:
879           case PGT | FSTRING:
880             if (memcmp (iter->arg[0].c, v->s, iter->src->width) < 0)
881               iter->dbl[0] += weight;
882             iter->dbl[1] += weight;
883             break;
884           case FLT:
885           case PLT:
886             if (v->f < iter->arg[0].f)
887               iter->dbl[0] += weight;
888             iter->dbl[1] += weight;
889             break;
890           case FLT | FSTRING:
891           case PLT | FSTRING:
892             if (memcmp (iter->arg[0].c, v->s, iter->src->width) > 0)
893               iter->dbl[0] += weight;
894             iter->dbl[1] += weight;
895             break;
896           case FIN:
897           case PIN:
898             if (iter->arg[0].f <= v->f && v->f <= iter->arg[1].f)
899               iter->dbl[0] += weight;
900             iter->dbl[1] += weight;
901             break;
902           case FIN | FSTRING:
903           case PIN | FSTRING:
904             if (memcmp (iter->arg[0].c, v->s, iter->src->width) <= 0
905                 && memcmp (iter->arg[1].c, v->s, iter->src->width) >= 0)
906               iter->dbl[0] += weight;
907             iter->dbl[1] += weight;
908             break;
909           case FOUT:
910           case POUT:
911             if (iter->arg[0].f > v->f || v->f > iter->arg[1].f)
912               iter->dbl[0] += weight;
913             iter->dbl[1] += weight;
914             break;
915           case FOUT | FSTRING:
916           case POUT | FSTRING:
917             if (memcmp (iter->arg[0].c, v->s, iter->src->width) > 0
918                 && memcmp (iter->arg[1].c, v->s, iter->src->width) < 0)
919               iter->dbl[0] += weight;
920             iter->dbl[1] += weight;
921             break;
922           case N:
923             iter->dbl[0] += weight;
924             break;
925           case NU:
926             iter->int1++;
927             break;
928           case FIRST:
929             if (iter->int1 == 0)
930               {
931                 iter->dbl[0] = v->f;
932                 iter->int1 = 1;
933               }
934             break;
935           case FIRST | FSTRING:
936             if (iter->int1 == 0)
937               {
938                 memcpy (iter->string, v->s, iter->src->width);
939                 iter->int1 = 1;
940               }
941             break;
942           case LAST:
943             iter->dbl[0] = v->f;
944             iter->int1 = 1;
945             break;
946           case LAST | FSTRING:
947             memcpy (iter->string, v->s, iter->src->width);
948             iter->int1 = 1;
949             break;
950           default:
951             assert (0);
952           }
953     } else {
954       switch (iter->function)
955         {
956         case N_NO_VARS:
957           iter->dbl[0] += weight;
958           break;
959         case NU_NO_VARS:
960           iter->int1++;
961           break;
962         default:
963           assert (0);
964         }
965     }
966 }
967
968 /* We've come to a record that differs from the previous in one or
969    more of the break variables.  Make an output record from the
970    accumulated statistics in the OUTPUT case. */
971 static void 
972 dump_aggregate_info (struct agr_proc *agr, struct ccase *output)
973 {
974   {
975     int value_idx = 0;
976     int i;
977
978     for (i = 0; i < agr->break_var_cnt; i++) 
979       {
980         int nv = agr->break_vars[i]->nv;
981         memcpy (case_data_rw (output, value_idx),
982                 &agr->prev_break[value_idx],
983                 sizeof (union value) * nv);
984         value_idx += nv; 
985       }
986   }
987   
988   {
989     struct agr_var *i;
990   
991     for (i = agr->agr_vars; i; i = i->next)
992       {
993         union value *v = case_data_rw (output, i->dest->fv);
994
995         if (agr->missing == COLUMNWISE && i->missing != 0
996             && (i->function & FUNC) != N && (i->function & FUNC) != NU
997             && (i->function & FUNC) != NMISS && (i->function & FUNC) != NUMISS)
998           {
999             if (i->function & FSTRING)
1000               memset (v->s, ' ', i->dest->width);
1001             else
1002               v->f = SYSMIS;
1003             continue;
1004           }
1005         
1006         switch (i->function)
1007           {
1008           case SUM:
1009             v->f = i->dbl[0];
1010             break;
1011           case MEAN:
1012             v->f = i->dbl[1] != 0.0 ? i->dbl[0] / i->dbl[1] : SYSMIS;
1013             break;
1014           case SD:
1015             {
1016               double variance;
1017
1018               /* FIXME: we should use two passes. */
1019               moments1_calculate (i->moments, NULL, NULL, &variance,
1020                                  NULL, NULL);
1021               if (variance != SYSMIS)
1022                 v->f = sqrt (variance);
1023               else
1024                 v->f = SYSMIS; 
1025             }
1026             break;
1027           case MAX:
1028           case MIN:
1029             v->f = i->int1 ? i->dbl[0] : SYSMIS;
1030             break;
1031           case MAX | FSTRING:
1032           case MIN | FSTRING:
1033             if (i->int1)
1034               memcpy (v->s, i->string, i->dest->width);
1035             else
1036               memset (v->s, ' ', i->dest->width);
1037             break;
1038           case FGT | FSTRING:
1039           case FLT | FSTRING:
1040           case FIN | FSTRING:
1041           case FOUT | FSTRING:
1042             v->f = i->int2 ? (double) i->int1 / (double) i->int2 : SYSMIS;
1043             break;
1044           case FGT:
1045           case FLT:
1046           case FIN:
1047           case FOUT:
1048             v->f = i->dbl[1] ? i->dbl[0] / i->dbl[1] : SYSMIS;
1049             break;
1050           case PGT:
1051           case PGT | FSTRING:
1052           case PLT:
1053           case PLT | FSTRING:
1054           case PIN:
1055           case PIN | FSTRING:
1056           case POUT:
1057           case POUT | FSTRING:
1058             v->f = i->dbl[1] ? i->dbl[0] / i->dbl[1] * 100.0 : SYSMIS;
1059             break;
1060           case N:
1061             v->f = i->dbl[0];
1062             break;
1063           case NU:
1064             v->f = i->int1;
1065             break;
1066           case FIRST:
1067           case LAST:
1068             v->f = i->int1 ? i->dbl[0] : SYSMIS;
1069             break;
1070           case FIRST | FSTRING:
1071           case LAST | FSTRING:
1072             if (i->int1)
1073               memcpy (v->s, i->string, i->dest->width);
1074             else
1075               memset (v->s, ' ', i->dest->width);
1076             break;
1077           case N_NO_VARS:
1078             v->f = i->dbl[0];
1079             break;
1080           case NU_NO_VARS:
1081             v->f = i->int1;
1082             break;
1083           case NMISS:
1084             v->f = i->dbl[0];
1085             break;
1086           case NUMISS:
1087             v->f = i->int1;
1088             break;
1089           default:
1090             assert (0);
1091           }
1092       }
1093   }
1094 }
1095
1096 /* Resets the state for all the aggregate functions. */
1097 static void
1098 initialize_aggregate_info (struct agr_proc *agr)
1099 {
1100   struct agr_var *iter;
1101
1102   for (iter = agr->agr_vars; iter; iter = iter->next)
1103     {
1104       iter->missing = 0;
1105       switch (iter->function)
1106         {
1107         case MIN:
1108           iter->dbl[0] = DBL_MAX;
1109           break;
1110         case MIN | FSTRING:
1111           memset (iter->string, 255, iter->src->width);
1112           break;
1113         case MAX:
1114           iter->dbl[0] = -DBL_MAX;
1115           break;
1116         case MAX | FSTRING:
1117           memset (iter->string, 0, iter->src->width);
1118           break;
1119         case SD:
1120           if (iter->moments == NULL)
1121             iter->moments = moments1_create (MOMENT_VARIANCE);
1122           else
1123             moments1_clear (iter->moments);
1124           break;
1125         default:
1126           iter->dbl[0] = iter->dbl[1] = iter->dbl[2] = 0.0;
1127           iter->int1 = iter->int2 = 0;
1128           break;
1129         }
1130     }
1131 }
1132 \f
1133 /* Aggregate each case as it comes through.  Cases which aren't needed
1134    are dropped. */
1135 static int
1136 agr_to_active_file (struct ccase *c, void *agr_)
1137 {
1138   struct agr_proc *agr = agr_;
1139
1140   if (aggregate_single_case (agr, c, &agr->agr_case)) 
1141     agr->sink->class->write (agr->sink, &agr->agr_case);
1142
1143   return 1;
1144 }
1145
1146 /* Writes AGR->agr_case to AGR->out_file. */
1147 static void
1148 write_case_to_sfm (struct agr_proc *agr)
1149 {
1150   flt64 *p;
1151   int i;
1152
1153   p = agr->sfm_agr_case;
1154   for (i = 0; i < dict_get_var_cnt (agr->dict); i++)
1155     {
1156       struct variable *v = dict_get_var (agr->dict, i);
1157       
1158       if (v->type == NUMERIC)
1159         {
1160           double src = case_num (&agr->agr_case, v->fv);
1161           if (src == SYSMIS)
1162             *p++ = -FLT64_MAX;
1163           else
1164             *p++ = src;
1165         }
1166       else
1167         {
1168           memcpy (p, case_str (&agr->agr_case, v->fv), v->width);
1169           memset (&((char *) p)[v->width], ' ',
1170                   REM_RND_UP (v->width, sizeof (flt64)));
1171           p += DIV_RND_UP (v->width, sizeof (flt64));
1172         }
1173     }
1174
1175   sfm_write_case (agr->out_file, agr->sfm_agr_case, p - agr->sfm_agr_case);
1176 }
1177
1178 /* Aggregate the current case and output it if we passed a
1179    breakpoint. */
1180 static int
1181 presorted_agr_to_sysfile (struct ccase *c, void *agr_) 
1182 {
1183   struct agr_proc *agr = agr_;
1184
1185   if (aggregate_single_case (agr, c, &agr->agr_case)) 
1186     write_case_to_sfm (agr);
1187
1188   return 1;
1189 }